lease.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package etcd
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. )
  7. // 创建租约,需要维持租约
  8. func (sd *EtcdService) CreateLease() (*LeaseItem, error) {
  9. cli := sd.EtcdCli
  10. if cli == nil {
  11. return nil, fmt.Errorf("no client")
  12. }
  13. ctx2, cancel2 := context.WithTimeout(sd.coreCtx, time.Second*5)
  14. l, err := cli.Grant(ctx2, int64(sd.ttl))
  15. cancel2()
  16. if err != nil {
  17. return nil, err
  18. }
  19. fmt.Println("new lease:", l.ID)
  20. //一次调用一直保持,直到通道关闭或出错为止
  21. ctx, cancel3 := context.WithCancel(sd.coreCtx)
  22. kac, err := cli.KeepAlive(ctx, l.ID)
  23. if err != nil || kac == nil {
  24. cancel3()
  25. return nil, err
  26. }
  27. li := new(LeaseItem)
  28. li.LeaseID = l.ID
  29. li.CancelKeep = cancel3
  30. li.ChanKeepAliveResponse = kac
  31. return li, nil
  32. // // go func(kac <-chan *clientv3.LeaseKeepAliveResponse) {
  33. // // defer close(donec)
  34. // // //for resp := range kac {
  35. // // for range kac {
  36. // // // eat messages until keep alive channel closes
  37. // // //log.Infoln(resp.ID)
  38. // // }
  39. // // // log.Infoln("keepalive donec")
  40. // // }(kac)
  41. // if _, ok := <-kac; !ok {
  42. // return errors.New("lease chan shutdown")
  43. // }
  44. // // sd.chLease <- kac
  45. // return nil
  46. }
  47. // 取消租约
  48. func (sd *EtcdService) revokeLease(l *LeaseItem) error {
  49. ctx, cancel := context.WithTimeout(sd.coreCtx, time.Duration(sd.ttl)*time.Second)
  50. _, err := sd.EtcdCli.Revoke(ctx, l.LeaseID)
  51. cancel()
  52. return err
  53. }