serverprovider.go 6.1 KB


  1. package etcd
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "time"
  8. "leafstalk/conf"
  9. clientv3 "go.etcd.io/etcd/client/v3"
  10. "go.etcd.io/etcd/client/v3/namespace"
  11. )
  // EtcdServerConfig holds the connection settings used to build the etcd
  // client and to namespace every key the service touches.
  type EtcdServerConfig struct {
  	// servers is the list of etcd endpoints passed to the client.
  	servers []string
  	// DialTimeout bounds how long the client waits when dialing etcd.
  	DialTimeout time.Duration
  	// User and Pass are applied to the client only when both are non-empty.
  	// Prefix is prepended (via the namespace wrappers) to KV, Watcher and
  	// Lease operations.
  	User, Pass, Prefix string
  }
  19. type LeaseItem struct {
  20. LeaseID clientv3.LeaseID
  21. ChanKeepAliveResponse <-chan *clientv3.LeaseKeepAliveResponse
  22. CancelKeep context.CancelFunc
  23. Value *Server
  24. }
  25. type EtcdService struct {
  26. config *conf.Config
  27. EtcdCli *clientv3.Client
  28. ttl int
  29. serviceCfg EtcdServerConfig
  30. serverManager ServerManager
  31. chExitEtcd chan struct{}
  32. // leaseID clientv3.LeaseID
  33. coreCtx context.Context
  34. // chLease chan (<-chan *clientv3.LeaseKeepAliveResponse)
  35. chRegister chan *Server
  36. allLeaseItem map[int64]*LeaseItem
  37. // wg sync.WaitGroup
  38. // wgWatch sync.WaitGroup
  39. // cancelWatch context.CancelFunc
  40. }
  41. func New(config *conf.Config, cli *clientv3.Client) (*EtcdService, error) {
  42. var client *clientv3.Client
  43. if cli != nil {
  44. client = cli
  45. }
  46. sd := &EtcdService{
  47. config: config,
  48. EtcdCli: client,
  49. }
  50. servers := config.GetStringSlice("etcd.addr")
  51. if len(servers) == 0 {
  52. servers = append(servers, "127.0.0.1:2379")
  53. }
  54. prefix := config.GetString("projectName")
  55. prefix += "/"
  56. sd.configure(servers, prefix)
  57. sd.chExitEtcd = make(chan struct{})
  58. sd.allLeaseItem = make(map[int64]*LeaseItem, 0)
  59. if err := sd.InitEtcd(); err != nil {
  60. log.Fatalln(err)
  61. }
  62. sd.chRegister = make(chan *Server, 1)
  63. return sd, nil
  64. }
  65. func (sd *EtcdService) configure(servers []string, prefix string) {
  66. sd.serviceCfg.servers = servers //[]string{"127.0.0.1:2379"}
  67. sd.serviceCfg.DialTimeout = time.Second * 5
  68. sd.ttl = 5 //60
  69. sd.serviceCfg.Prefix = prefix //"grave/"
  70. }
  71. func (sd *EtcdService) InitEtcd() error {
  72. var cli *clientv3.Client
  73. var err error
  74. if sd.EtcdCli == nil {
  75. cli, err = sd.NewClient()
  76. if err != nil {
  77. return err
  78. }
  79. sd.EtcdCli = cli
  80. }
  81. sd.coreCtx = cli.Ctx()
  82. // namespaced etcd :)
  83. sd.EtcdCli.KV = namespace.NewKV(sd.EtcdCli.KV, sd.serviceCfg.Prefix)
  84. sd.EtcdCli.Watcher = namespace.NewWatcher(sd.EtcdCli.Watcher, sd.serviceCfg.Prefix)
  85. sd.EtcdCli.Lease = namespace.NewLease(sd.EtcdCli.Lease, sd.serviceCfg.Prefix)
  86. return nil
  87. }
  88. func (sd *EtcdService) NewClient() (*clientv3.Client, error) {
  89. config := clientv3.Config{
  90. Endpoints: sd.serviceCfg.servers,
  91. DialTimeout: sd.serviceCfg.DialTimeout,
  92. }
  93. if sd.serviceCfg.User != "" && sd.serviceCfg.Pass != "" {
  94. config.Username = sd.serviceCfg.User
  95. config.Password = sd.serviceCfg.Pass
  96. }
  97. var cli *clientv3.Client
  98. var err error
  99. cli, err = clientv3.New(config)
  100. if err != nil {
  101. return nil, err
  102. }
  103. return cli, nil
  104. }
  105. // 监听传过来的租约通道,创建监听租约的协程
  106. // 监听应用退出
  107. // 监听租约协程退出
  108. func (sd *EtcdService) KeepServerOnline(s *Server) error {
  109. ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*5)
  110. err := sd.RegisterServer(ctx, s)
  111. cancel()
  112. if err != nil {
  113. return err
  114. }
  115. go func() {
  116. for {
  117. select {
  118. case s := <-sd.chRegister:
  119. sd.RegisterServer(sd.coreCtx, s)
  120. case <-sd.coreCtx.Done():
  121. return
  122. }
  123. }
  124. }()
  125. return nil
  126. }
  127. func (sd *EtcdService) RegisterServer(ctx context.Context, s *Server) error {
  128. ticker := time.NewTicker(time.Second)
  129. defer ticker.Stop()
  130. for {
  131. if err := sd.registerServer(s); err == nil {
  132. return nil
  133. } else {
  134. fmt.Printf("registerServer error. %v", err)
  135. }
  136. // 主动控制时,返回
  137. select {
  138. case <-ctx.Done():
  139. return errors.New("etcd core closed")
  140. default:
  141. }
  142. // 网络断开时,重试
  143. select {
  144. case <-ticker.C:
  145. case <-ctx.Done():
  146. return errors.New("etcd core closed")
  147. }
  148. }
  149. // sd.chRegister = make(chan struct{})
  150. // if err := sd.registerServer(); err != nil {
  151. // return err
  152. // }
  153. // return nil
  154. }
  155. // 注册服务器,并保持登录状态
  156. func (sd *EtcdService) registerServer(s *Server) error {
  157. li, err := sd.CreateLease()
  158. if err != nil {
  159. return err
  160. }
  161. opts := []clientv3.OpOption{}
  162. opts = append(opts, clientv3.WithLease(clientv3.LeaseID(li.LeaseID)))
  163. ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*5)
  164. _, err = sd.EtcdCli.Put(
  165. ctx,
  166. s.getKey(),
  167. s.AsJSONString(),
  168. opts...,
  169. )
  170. cancel()
  171. if err != nil {
  172. sd.revokeLease(li)
  173. return err
  174. }
  175. li.Value = s
  176. sd.allLeaseItem[s.ID] = li
  177. go func() {
  178. for {
  179. select {
  180. case _, ok := <-li.ChanKeepAliveResponse:
  181. if !ok {
  182. sd.chRegister <- li.Value
  183. return
  184. }
  185. case <-sd.coreCtx.Done():
  186. return
  187. }
  188. }
  189. }()
  190. return nil
  191. }
  192. // 更新服务器信息
  193. func (sd *EtcdService) UpdateServer(s *Server) error {
  194. if li, ok := sd.allLeaseItem[s.ID]; ok {
  195. opts := []clientv3.OpOption{}
  196. opts = append(opts, clientv3.WithLease(clientv3.LeaseID(li.LeaseID)))
  197. ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*1)
  198. _, err := sd.EtcdCli.Put(
  199. ctx,
  200. s.getKey(),
  201. s.AsJSONString(),
  202. opts...,
  203. )
  204. cancel()
  205. if err != nil {
  206. return err
  207. }
  208. }
  209. return nil
  210. }
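  211. // // The lease channel was closed: a new lease must be registered.
  212. // // Registration was cancelled: the lease no longer needs to be kept alive.
  213. // // The service is shutting down.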
  214. // func (sd *EtcdService) KeepLease(chLar <-chan *clientv3.LeaseKeepAliveResponse) {
  215. // defer func() {
  216. // sd.wg.Done()
  217. // }()
  218. // clearLeaseCh := func() {
  219. // for {
  220. // select {
  221. // case <-sd.chLease:
  222. // default:
  223. // return
  224. // }
  225. // }
  226. // }
  227. // for {
  228. // select {
  229. // case lid, ok := <-chLar:
  230. // if ok {
  231. // fmt.Println(lid)
  232. // continue
  233. // }
  234. // fmt.Println("lease chan close")
  235. // case <-sd.coreCtx.Done():
  236. // fmt.Println("client chan close")
  237. // case <-sd.chExitEtcd:
  238. // sd.revokeLease()
  239. // fmt.Println("etcd chan close")
  240. // return
  241. // case <-sd.chRegister:
  242. // sd.revokeLease()
  243. // fmt.Println("register chan close")
  244. // return
  245. // }
  246. // time.Sleep(time.Second)
  247. // fmt.Println(1111115)
  248. // // logger.Infoln("KeepLease RegisterServer")
  249. // if err := sd.registerServer(); err != nil {
  250. // clearLeaseCh()
  251. // // logger.Error("KeepLease error: ", err)
  252. // } else {
  253. // break
  254. // }
  255. // }
  256. // fmt.Println("KeepLease exit")
  257. // }