|
- package etcd
- import (
- "context"
- "errors"
- "fmt"
- "log"
- "time"
- "leafstalk/conf"
- clientv3 "go.etcd.io/etcd/client/v3"
- "go.etcd.io/etcd/client/v3/namespace"
- )
// EtcdServerConfig holds the connection settings used to build the etcd
// client and to namespace every key written by this package.
type EtcdServerConfig struct {
	// servers is the list of etcd endpoints handed to the client.
	servers []string
	// DialTimeout bounds how long the client waits while connecting.
	DialTimeout time.Duration
	// User and Pass are only applied to the client when both are non-empty.
	User string
	Pass string
	// Prefix is the namespace prepended to KV, watcher and lease operations.
	Prefix string
}
- type LeaseItem struct {
- LeaseID clientv3.LeaseID
- ChanKeepAliveResponse <-chan *clientv3.LeaseKeepAliveResponse
- CancelKeep context.CancelFunc
- Value *Server
- }
- type EtcdService struct {
- config *conf.Config
- EtcdCli *clientv3.Client
- ttl int
- serviceCfg EtcdServerConfig
- serverManager ServerManager
- chExitEtcd chan struct{}
- // leaseID clientv3.LeaseID
- coreCtx context.Context
- // chLease chan (<-chan *clientv3.LeaseKeepAliveResponse)
- chRegister chan *Server
- allLeaseItem map[int64]*LeaseItem
- // wg sync.WaitGroup
- // wgWatch sync.WaitGroup
- // cancelWatch context.CancelFunc
- }
- func New(config *conf.Config, cli *clientv3.Client) (*EtcdService, error) {
- var client *clientv3.Client
- if cli != nil {
- client = cli
- }
- sd := &EtcdService{
- config: config,
- EtcdCli: client,
- }
- servers := config.GetStringSlice("etcd.addr")
- if len(servers) == 0 {
- servers = append(servers, "127.0.0.1:2379")
- }
- prefix := config.GetString("projectName")
- prefix += "/"
- sd.configure(servers, prefix)
- sd.chExitEtcd = make(chan struct{})
- sd.allLeaseItem = make(map[int64]*LeaseItem, 0)
- if err := sd.InitEtcd(); err != nil {
- log.Fatalln(err)
- }
- sd.chRegister = make(chan *Server, 1)
- return sd, nil
- }
- func (sd *EtcdService) configure(servers []string, prefix string) {
- sd.serviceCfg.servers = servers //[]string{"127.0.0.1:2379"}
- sd.serviceCfg.DialTimeout = time.Second * 5
- sd.ttl = 5 //60
- sd.serviceCfg.Prefix = prefix //"grave/"
- }
- func (sd *EtcdService) InitEtcd() error {
- var cli *clientv3.Client
- var err error
- if sd.EtcdCli == nil {
- cli, err = sd.NewClient()
- if err != nil {
- return err
- }
- sd.EtcdCli = cli
- }
- sd.coreCtx = cli.Ctx()
- // namespaced etcd :)
- sd.EtcdCli.KV = namespace.NewKV(sd.EtcdCli.KV, sd.serviceCfg.Prefix)
- sd.EtcdCli.Watcher = namespace.NewWatcher(sd.EtcdCli.Watcher, sd.serviceCfg.Prefix)
- sd.EtcdCli.Lease = namespace.NewLease(sd.EtcdCli.Lease, sd.serviceCfg.Prefix)
- return nil
- }
- func (sd *EtcdService) NewClient() (*clientv3.Client, error) {
- config := clientv3.Config{
- Endpoints: sd.serviceCfg.servers,
- DialTimeout: sd.serviceCfg.DialTimeout,
- }
- if sd.serviceCfg.User != "" && sd.serviceCfg.Pass != "" {
- config.Username = sd.serviceCfg.User
- config.Password = sd.serviceCfg.Pass
- }
- var cli *clientv3.Client
- var err error
- cli, err = clientv3.New(config)
- if err != nil {
- return nil, err
- }
- return cli, nil
- }
- // 监听传过来的租约通道,创建监听租约的协程
- // 监听应用退出
- // 监听租约协程退出
- func (sd *EtcdService) KeepServerOnline(s *Server) error {
- ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*5)
- err := sd.RegisterServer(ctx, s)
- cancel()
- if err != nil {
- return err
- }
- go func() {
- for {
- select {
- case s := <-sd.chRegister:
- sd.RegisterServer(sd.coreCtx, s)
- case <-sd.coreCtx.Done():
- return
- }
- }
- }()
- return nil
- }
- func (sd *EtcdService) RegisterServer(ctx context.Context, s *Server) error {
- ticker := time.NewTicker(time.Second)
- defer ticker.Stop()
- for {
- if err := sd.registerServer(s); err == nil {
- return nil
- } else {
- fmt.Printf("registerServer error. %v", err)
- }
- // 主动控制时,返回
- select {
- case <-ctx.Done():
- return errors.New("etcd core closed")
- default:
- }
- // 网络断开时,重试
- select {
- case <-ticker.C:
- case <-ctx.Done():
- return errors.New("etcd core closed")
- }
- }
- // sd.chRegister = make(chan struct{})
- // if err := sd.registerServer(); err != nil {
- // return err
- // }
- // return nil
- }
- // 注册服务器,并保持登录状态
- func (sd *EtcdService) registerServer(s *Server) error {
- li, err := sd.CreateLease()
- if err != nil {
- return err
- }
- opts := []clientv3.OpOption{}
- opts = append(opts, clientv3.WithLease(clientv3.LeaseID(li.LeaseID)))
- ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*5)
- _, err = sd.EtcdCli.Put(
- ctx,
- s.getKey(),
- s.AsJSONString(),
- opts...,
- )
- cancel()
- if err != nil {
- sd.revokeLease(li)
- return err
- }
- li.Value = s
- sd.allLeaseItem[s.ID] = li
- go func() {
- for {
- select {
- case _, ok := <-li.ChanKeepAliveResponse:
- if !ok {
- sd.chRegister <- li.Value
- return
- }
- case <-sd.coreCtx.Done():
- return
- }
- }
- }()
- return nil
- }
- // 更新服务器信息
- func (sd *EtcdService) UpdateServer(s *Server) error {
- if li, ok := sd.allLeaseItem[s.ID]; ok {
- opts := []clientv3.OpOption{}
- opts = append(opts, clientv3.WithLease(clientv3.LeaseID(li.LeaseID)))
- ctx, cancel := context.WithTimeout(sd.coreCtx, time.Second*1)
- _, err := sd.EtcdCli.Put(
- ctx,
- s.getKey(),
- s.AsJSONString(),
- opts...,
- )
- cancel()
- if err != nil {
- return err
- }
- }
- return nil
- }
- // // 租约通道断开,需要注册新租约
- // // 取消注册,不再需要保持租约在线
- // // 服务关闭
- // func (sd *EtcdService) KeepLease(chLar <-chan *clientv3.LeaseKeepAliveResponse) {
- // defer func() {
- // sd.wg.Done()
- // }()
- // clearLeaseCh := func() {
- // for {
- // select {
- // case <-sd.chLease:
- // default:
- // return
- // }
- // }
- // }
- // for {
- // select {
- // case lid, ok := <-chLar:
- // if ok {
- // fmt.Println(lid)
- // continue
- // }
- // fmt.Println("lease chan close")
- // case <-sd.coreCtx.Done():
- // fmt.Println("client chan close")
- // case <-sd.chExitEtcd:
- // sd.revokeLease()
- // fmt.Println("etcd chan close")
- // return
- // case <-sd.chRegister:
- // sd.revokeLease()
- // fmt.Println("register chan close")
- // return
- // }
- // time.Sleep(time.Second)
- // fmt.Println(1111115)
- // // logger.Infoln("KeepLease RegisterServer")
- // if err := sd.registerServer(); err != nil {
- // clearLeaseCh()
- // // logger.Error("KeepLease error: ", err)
- // } else {
- // break
- // }
- // }
- // fmt.Println("KeepLease exit")
- // }
|