123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- package etcd
- import (
- "context"
- "fmt"
- "time"
- )
- // 创建租约,需要维持租约
- func (sd *EtcdService) CreateLease() (*LeaseItem, error) {
- cli := sd.EtcdCli
- if cli == nil {
- return nil, fmt.Errorf("no client")
- }
- ctx2, cancel2 := context.WithTimeout(sd.coreCtx, time.Second*5)
- l, err := cli.Grant(ctx2, int64(sd.ttl))
- cancel2()
- if err != nil {
- return nil, err
- }
- fmt.Println("new lease:", l.ID)
- //一次调用一直保持,直到通道关闭或出错为止
- ctx, cancel3 := context.WithCancel(sd.coreCtx)
- kac, err := cli.KeepAlive(ctx, l.ID)
- if err != nil || kac == nil {
- cancel3()
- return nil, err
- }
- li := new(LeaseItem)
- li.LeaseID = l.ID
- li.CancelKeep = cancel3
- li.ChanKeepAliveResponse = kac
- return li, nil
- // // go func(kac <-chan *clientv3.LeaseKeepAliveResponse) {
- // // defer close(donec)
- // // //for resp := range kac {
- // // for range kac {
- // // // eat messages until keep alive channel closes
- // // //log.Infoln(resp.ID)
- // // }
- // // // log.Infoln("keepalive donec")
- // // }(kac)
- // if _, ok := <-kac; !ok {
- // return errors.New("lease chan shutdown")
- // }
- // // sd.chLease <- kac
- // return nil
- }
- // 取消租约
- func (sd *EtcdService) revokeLease(l *LeaseItem) error {
- ctx, cancel := context.WithTimeout(sd.coreCtx, time.Duration(sd.ttl)*time.Second)
- _, err := sd.EtcdCli.Revoke(ctx, l.LeaseID)
- cancel()
- return err
- }
|