nacos_config.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package nacos
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/nacos-group/nacos-sdk-go/v2/clients"
  6. "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
  7. "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
  8. "github.com/nacos-group/nacos-sdk-go/v2/model"
  9. "github.com/nacos-group/nacos-sdk-go/v2/vo"
  10. "leafstalk/log"
  11. )
  12. type CenterConfig struct {
  13. NamespaceId string // 空间命名
  14. DataID string // 数据ID
  15. Group string // 分组
  16. Servers []*ConfigServer // 集群服务器
  17. ChangeCall func(data string) // 变化回调
  18. }
  19. type ConfigServer struct {
  20. IpAddr string // ip地址
  21. Port uint64 // 端口
  22. }
  23. // ConfigClient 配置中心客户端
  24. type ConfigClient struct {
  25. *CenterConfig // 配置
  26. config_client.IConfigClient // 客户端api
  27. context.Context // 上下文
  28. updateCh chan string // 数据更新chan
  29. data string // 当前数据
  30. }
  31. func NewConfigClient(ctx context.Context, cfg *CenterConfig) (cli *ConfigClient, err error) {
  32. if cfg == nil || len(cfg.NamespaceId) == 0 || len(cfg.DataID) == 0 || len(cfg.Group) == 0 {
  33. err = errors.New("configuration check failed")
  34. return
  35. }
  36. cli = new(ConfigClient)
  37. cli.updateCh = make(chan string)
  38. cli.CenterConfig = cfg
  39. cli.Context = ctx
  40. cli.IConfigClient, err = createConfigClient(cfg)
  41. if err != nil {
  42. return
  43. }
  44. cli.data, err = cli.GetConfig()
  45. return
  46. }
  47. // createConfigClient create config client.
  48. func createConfigClient(cfg *CenterConfig) (iClient config_client.IConfigClient, err error) {
  49. clientConfig := *constant.NewClientConfig(
  50. constant.WithNamespaceId(cfg.NamespaceId), //When namespace is public, fill in the blank string here.
  51. constant.WithTimeoutMs(5000),
  52. constant.WithBeatInterval(1000),
  53. constant.WithNotLoadCacheAtStart(true),
  54. constant.WithLogDir("./tmp/nacos/log"),
  55. constant.WithCacheDir("./tmp/nacos/cache"),
  56. constant.WithLogLevel("debug"),
  57. )
  58. var serverConfigs []constant.ServerConfig
  59. for _, serv := range cfg.Servers {
  60. c := constant.NewServerConfig(
  61. serv.IpAddr,
  62. serv.Port,
  63. constant.WithScheme("http"),
  64. constant.WithContextPath("/nacos"),
  65. )
  66. serverConfigs = append(serverConfigs, *c)
  67. }
  68. // Another way of create config client for dynamic configuration (recommend)
  69. return clients.NewConfigClient(
  70. vo.NacosClientParam{
  71. ClientConfig: &clientConfig,
  72. ServerConfigs: serverConfigs,
  73. },
  74. )
  75. }
  76. // GetIClient get client api
  77. func (c *ConfigClient) GetIClient() config_client.IConfigClient {
  78. return c.IConfigClient
  79. }
  80. // GetData get data
  81. func (c *ConfigClient) GetData() string {
  82. return c.data
  83. }
  84. // GetConfig get config info
  85. func (c *ConfigClient) GetConfig() (string, error) {
  86. return c.IConfigClient.GetConfig(vo.ConfigParam{
  87. DataId: c.DataID,
  88. Group: c.Group,
  89. })
  90. }
  91. // PublishConfig publish config
  92. func (c *ConfigClient) PublishConfig(content string) (bool, error) {
  93. return c.IConfigClient.PublishConfig(vo.ConfigParam{
  94. DataId: c.DataID,
  95. Group: c.Group,
  96. Content: content,
  97. })
  98. }
  99. // DeleteConfig delete config
  100. func (c *ConfigClient) DeleteConfig() (bool, error) {
  101. return c.IConfigClient.DeleteConfig(vo.ConfigParam{
  102. DataId: c.DataID,
  103. Group: c.Group,
  104. })
  105. }
  106. // CancelListenConfig Cancel the listening of config change event
  107. func (c *ConfigClient) CancelListenConfig() error {
  108. return c.IConfigClient.CancelListenConfig(vo.ConfigParam{
  109. DataId: c.DataID,
  110. Group: c.Group,
  111. })
  112. }
  113. // SearchConfig Search config
  114. func (c *ConfigClient) SearchConfig(search string, pageNo, pageSize int) (*model.ConfigPage, error) {
  115. return c.IConfigClient.SearchConfig(vo.SearchConfigParam{
  116. DataId: c.DataID,
  117. Group: c.Group,
  118. Search: search,
  119. PageNo: pageNo,
  120. PageSize: pageSize,
  121. })
  122. }
  123. // ListenChanges Listen config change event
  124. func (c *ConfigClient) ListenChanges() {
  125. err := c.IConfigClient.ListenConfig(vo.ConfigParam{
  126. DataId: c.DataID,
  127. Group: c.Group,
  128. OnChange: func(namespace, group, dataId, data string) {
  129. log.WithField("nacos", "config").Warn("OnChange group:" + group + ", dataId:" + dataId + ", data:" + data)
  130. c.data = data
  131. c.updateCh <- data
  132. },
  133. })
  134. if err != nil {
  135. log.WithField("nacos", "config").Fatalf("ListenChanges err:%+v", err)
  136. return
  137. }
  138. go func() {
  139. for {
  140. select {
  141. case newData := <-c.updateCh:
  142. log.WithField("nacos", "config").Warnf("Received updated config:%v", newData)
  143. // 进行其他操作,例如重新加载配置、进行灰度发布等
  144. if c.ChangeCall != nil {
  145. c.ChangeCall(newData)
  146. }
  147. case <-c.Context.Done():
  148. if err = c.CancelListenConfig(); err != nil {
  149. log.WithField("nacos", "config").Warnf("ListenChanges CancelListenConfig err:%+v", err)
  150. }
  151. log.Warn("ListenChanges Done.")
  152. return
  153. }
  154. }
  155. }()
  156. }