servercustomer.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package etcd
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. clientv3 "go.etcd.io/etcd/client/v3"
  8. )
  9. // func (sd *EtcdService) StopWatch() {
  10. // sd.cancelWatch()
  11. // sd.wgWatch.Wait()
  12. // }
  13. // func (sd *EtcdService) WatchServer(sm ServerManager) {
  14. // sd.wgWatch.Wait()
  15. // sd.wgWatch.Add(1)
  16. // go sd.watchServer(sm)
  17. // }
  18. // func (sd *EtcdService) watchServer(sm ServerManager) {
  19. // defer func() {
  20. // sd.wgWatch.Done()
  21. // }()
  22. // ctx, cancelWatch := context.WithCancel(sd.coreCtx)
  23. // chW := sd.EtcdCli.Watch(ctx, "servers/", clientv3.WithPrefix())
  24. // sd.cancelWatch = cancelWatch
  25. // go func() {
  26. // for {
  27. // select {
  28. // case wResp, ok := <-chW:
  29. // if wResp.Err() != nil {
  30. // fmt.Printf("etcd watcher response error: %s", wResp.Err())
  31. // // logger.Warnf("etcd watcher response error: %s", wResp.Err())
  32. // }
  33. // if !ok {
  34. // // logger.Error("etcd watcher died")
  35. // return
  36. // }
  37. // for _, ev := range wResp.Events {
  38. // switch ev.Type {
  39. // case clientv3.EventTypePut:
  40. // s, _ := PutServer(ev.Kv.Key, ev.Kv.Value)
  41. // if s != nil {
  42. // sm.AddServer(s)
  43. // }
  44. // case clientv3.EventTypeDelete:
  45. // id, _ := DeleteServer(ev.Kv.Key, ev.Kv.Value)
  46. // if len(id) > 0 {
  47. // sm.DeleteServer(id)
  48. // }
  49. // }
  50. // }
  51. // case <-sd.coreCtx.Done():
  52. // return
  53. // case <-sd.chExitEtcd:
  54. // cancelWatch()
  55. // return
  56. // }
  57. // }
  58. // }()
  59. // }
  60. func (sd *EtcdService) SyncServer() ([]*Server, error) {
  61. keys, err := sd.EtcdCli.Get(
  62. context.TODO(),
  63. "servers/gate/",
  64. clientv3.WithPrefix(),
  65. //clientv3.WithKeysOnly(),
  66. )
  67. if err != nil {
  68. return nil, err
  69. }
  70. var lst []*Server
  71. for _, kv := range keys.Kvs {
  72. fmt.Println("sync server", string(kv.Key), string(kv.Value))
  73. if s, err := ParseEtcdServer(kv.Value); err == nil {
  74. lst = append(lst, s)
  75. }
  76. }
  77. return lst, nil
  78. }
  79. func (sd *EtcdService) Get(k string) (map[string]string, error) {
  80. ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
  81. key := "servers/" + k
  82. keys, err := sd.EtcdCli.Get(
  83. ctx,
  84. key,
  85. clientv3.WithPrefix(),
  86. )
  87. if err != nil {
  88. return nil, err
  89. }
  90. maps := make(map[string]string)
  91. for _, kv := range keys.Kvs {
  92. ik := string(kv.Key)
  93. v := string(kv.Value)
  94. fmt.Println("EtcdService Get", ik, v)
  95. nik := strings.TrimRight(ik, "/")
  96. n := strings.LastIndexByte(nik, '/')
  97. var nk string
  98. if n >= 0 {
  99. nk = nik[n+1:]
  100. } else {
  101. nk = nik
  102. }
  103. maps[nk] = v
  104. }
  105. return maps, nil
  106. }