package etcd import ( "context" "errors" "fmt" "log" "time" "leafstalk/conf" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/namespace" ) type EtcdServerConfig struct { servers []string DialTimeout time.Duration User string Pass string Prefix string } type LeaseItem struct { LeaseID clientv3.LeaseID ChanKeepAliveResponse <-chan *clientv3.LeaseKeepAliveResponse CancelKeep context.CancelFunc Value *Server } type EtcdService struct { config *conf.Config EtcdCli *clientv3.Client ttl int serviceCfg EtcdServerConfig serverManager ServerManager chExitEtcd chan struct{} // leaseID clientv3.LeaseID coreCtx context.Context // chLease chan (<-chan *clientv3.LeaseKeepAliveResponse) chRegister chan *Server allLeaseItem map[int64]*LeaseItem // wg sync.WaitGroup // wgWatch sync.WaitGroup // cancelWatch context.CancelFunc } func New(config *conf.Config, cli *clientv3.Client) (*EtcdService, error) { var client *clientv3.Client if cli != nil { client = cli } sd := &EtcdService{ config: config, EtcdCli: client, } servers := config.GetStringSlice("etcd.addr") if len(servers) == 0 { servers = append(servers, "127.0.0.1:2379") } prefix := config.GetString("projectName") prefix += "/" sd.configure(servers, prefix) sd.chExitEtcd = make(chan struct{}) sd.allLeaseItem = make(map[int64]*LeaseItem, 0) if err := sd.InitEtcd(); err != nil { log.Fatalln(err) } sd.chRegister = make(chan *Server, 1) return sd, nil } func (sd *EtcdService) configure(servers []string, prefix string) { sd.serviceCfg.servers = servers //[]string{"127.0.0.1:2379"} sd.serviceCfg.DialTimeout = time.Second * 5 sd.ttl = 5 //60 sd.serviceCfg.Prefix = prefix //"grave/" } func (sd *EtcdService) InitEtcd() error { var cli *clientv3.Client var err error if sd.EtcdCli == nil { cli, err = sd.NewClient() if err != nil { return err } sd.EtcdCli = cli } sd.coreCtx = cli.Ctx() // namespaced etcd :) sd.EtcdCli.KV = namespace.NewKV(sd.EtcdCli.KV, sd.serviceCfg.Prefix) sd.EtcdCli.Watcher = namespace.NewWatcher(sd.EtcdCli.Watcher, sd.serviceCfg.Prefix) sd.EtcdCli.Lease = namespace.NewLease(sd.EtcdCli.Lease, sd.serviceCfg.Prefix) return nil } func (sd *EtcdService) NewClient() (*clientv3.Client, error) { config := clientv3.Config{ Endpoints: sd.serviceCfg.servers, DialTimeout: sd.serviceCfg.DialTimeout, } if sd.serviceCfg.User != "" && sd.serviceCfg.Pass != "" { config.Username = sd.serviceCfg.User config.Password = sd.serviceCfg.Pass } var cli *clientv3.Client var err error cli, err = clientv3.New(config) if err != nil { return nil, err } return cli, nil } // 监听传过来的租约通道,创建监听租约的协程 // 监听应用退出 // 监听租约协程退出 func (sd *EtcdService) KeepServerOnline(s *Server) error { ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*5) err := sd.RegisterServer(ctx, s) cancel() if err != nil { return err } go func() { for { select { case s := <-sd.chRegister: sd.RegisterServer(sd.coreCtx, s) case <-sd.coreCtx.Done(): return } } }() return nil } func (sd *EtcdService) RegisterServer(ctx context.Context, s *Server) error { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { if err := sd.registerServer(s); err == nil { return nil } else { fmt.Printf("registerServer error. %v", err) } // 主动控制时,返回 select { case <-ctx.Done(): return errors.New("etcd core closed") default: } // 网络断开时,重试 select { case <-ticker.C: case <-ctx.Done(): return errors.New("etcd core closed") } } // sd.chRegister = make(chan struct{}) // if err := sd.registerServer(); err != nil { // return err // } // return nil } // 注册服务器,并保持登录状态 func (sd *EtcdService) registerServer(s *Server) error { li, err := sd.CreateLease() if err != nil { return err } opts := []clientv3.OpOption{} opts = append(opts, clientv3.WithLease(clientv3.LeaseID(li.LeaseID))) ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*5) _, err = sd.EtcdCli.Put( ctx, s.getKey(), s.AsJSONString(), opts..., ) cancel() if err != nil { sd.revokeLease(li) return err } li.Value = s sd.allLeaseItem[s.ID] = li go func() { for { select { case _, ok := <-li.ChanKeepAliveResponse: if !ok { sd.chRegister <- li.Value return } case <-sd.coreCtx.Done(): return } } }() return nil } // 更新服务器信息 func (sd *EtcdService) UpdateServer(s *Server) error { if li, ok := sd.allLeaseItem[s.ID]; ok { opts := []clientv3.OpOption{} opts = append(opts, clientv3.WithLease(clientv3.LeaseID(li.LeaseID))) ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*1) _, err := sd.EtcdCli.Put( ctx, s.getKey(), s.AsJSONString(), opts..., ) cancel() if err != nil { return err } } return nil } // // 租约通道断开,需要注册新租约 // // 取消注册,不再需要保持租约在线 // // 服务关闭 // func (sd *EtcdService) KeepLease(chLar <-chan *clientv3.LeaseKeepAliveResponse) { // defer func() { // sd.wg.Done() // }() // clearLeaseCh := func() { // for { // select { // case <-sd.chLease: // default: // return // } // } // } // for { // select { // case lid, ok := <-chLar: // if ok { // fmt.Println(lid) // continue // } // fmt.Println("lease chan close") // case <-sd.coreCtx.Done(): // fmt.Println("client chan close") // case <-sd.chExitEtcd: // sd.revokeLease() // fmt.Println("etcd chan close") // return // case <-sd.chRegister: // sd.revokeLease() // fmt.Println("register chan close") // return // } // time.Sleep(time.Second) // fmt.Println(1111115) // // logger.Infoln("KeepLease RegisterServer") // if err := sd.registerServer(); err != nil { // clearLeaseCh() // // logger.Error("KeepLease error: ", err) // } else { // break // } // } // fmt.Println("KeepLease exit") // }