package etcd

import (
	"context"
	"fmt"
	"time"
)

// CreateLease grants a new lease with a TTL of sd.ttl and starts keeping it
// alive.
//
// On success the returned LeaseItem carries the lease ID, the cancel function
// of the keep-alive context (CancelKeep) and the keep-alive response channel
// (ChanKeepAliveResponse). The caller owns that channel and is expected to
// consume it; this method does not start a goroutine to drain it.
//
// An error is returned when the client is nil, when the grant fails, or when
// the keep-alive cannot be started. A nil LeaseItem is never returned together
// with a nil error.
func (sd *EtcdService) CreateLease() (*LeaseItem, error) {
	cli := sd.EtcdCli
	if cli == nil {
		return nil, fmt.Errorf("no client")
	}

	// Bound the grant request so a stalled cluster cannot block the caller
	// for longer than five seconds.
	grantCtx, cancelGrant := context.WithTimeout(sd.coreCtx, time.Second*5)
	l, err := cli.Grant(grantCtx, int64(sd.ttl))
	cancelGrant()
	if err != nil {
		return nil, err
	}
	fmt.Println("new lease:", l.ID)

	// A single KeepAlive call keeps the lease alive until the channel is
	// closed or an error occurs. The context is derived from coreCtx without
	// a deadline; it is cancelled either via CancelKeep or when coreCtx ends.
	keepCtx, cancelKeep := context.WithCancel(sd.coreCtx)
	kac, err := cli.KeepAlive(keepCtx, l.ID)
	if err != nil {
		cancelKeep()
		return nil, err
	}
	if kac == nil {
		// Without this branch a nil channel with a nil error would surface
		// as (nil, nil), which callers cannot tell apart from success.
		cancelKeep()
		return nil, fmt.Errorf("keepalive channel is nil for lease %v", l.ID)
	}

	return &LeaseItem{
		LeaseID:               l.ID,
		CancelKeep:            cancelKeep,
		ChanKeepAliveResponse: kac,
	}, nil
}

// revokeLease revokes the lease identified by l.LeaseID.
//
// The request is bounded by a timeout of sd.ttl seconds derived from
// sd.coreCtx. An error is returned when the client or the lease item is nil,
// or when the revoke request itself fails.
func (sd *EtcdService) revokeLease(l *LeaseItem) error {
	cli := sd.EtcdCli
	if cli == nil {
		return fmt.Errorf("no client")
	}
	if l == nil {
		return fmt.Errorf("nil lease")
	}

	ctx, cancel := context.WithTimeout(sd.coreCtx, time.Duration(sd.ttl)*time.Second)
	defer cancel()

	_, err := cli.Revoke(ctx, l.LeaseID)
	return err
}