123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- package nacos
- import (
- "context"
- "errors"
- "github.com/nacos-group/nacos-sdk-go/v2/clients"
- "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
- "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
- "github.com/nacos-group/nacos-sdk-go/v2/model"
- "github.com/nacos-group/nacos-sdk-go/v2/vo"
- "leafstalk/log"
- )
- type CenterConfig struct {
- NamespaceId string // 空间命名
- DataID string // 数据ID
- Group string // 分组
- Servers []*ConfigServer // 集群服务器
- ChangeCall func(data string) // 变化回调
- }
// ConfigServer is the address of a single server in the cluster.
type ConfigServer struct {
	// IpAddr is the IP address of the server.
	IpAddr string
	// Port is the port of the server.
	Port uint64
}
- // ConfigClient 配置中心客户端
- type ConfigClient struct {
- *CenterConfig // 配置
- config_client.IConfigClient // 客户端api
- context.Context // 上下文
- updateCh chan string // 数据更新chan
- data string // 当前数据
- }
- func NewConfigClient(ctx context.Context, cfg *CenterConfig) (cli *ConfigClient, err error) {
- if cfg == nil || len(cfg.NamespaceId) == 0 || len(cfg.DataID) == 0 || len(cfg.Group) == 0 {
- err = errors.New("configuration check failed")
- return
- }
- cli = new(ConfigClient)
- cli.updateCh = make(chan string)
- cli.CenterConfig = cfg
- cli.Context = ctx
- cli.IConfigClient, err = createConfigClient(cfg)
- if err != nil {
- return
- }
- cli.data, err = cli.GetConfig()
- return
- }
- // createConfigClient create config client.
- func createConfigClient(cfg *CenterConfig) (iClient config_client.IConfigClient, err error) {
- clientConfig := *constant.NewClientConfig(
- constant.WithNamespaceId(cfg.NamespaceId), //When namespace is public, fill in the blank string here.
- constant.WithTimeoutMs(5000),
- constant.WithBeatInterval(1000),
- constant.WithNotLoadCacheAtStart(true),
- constant.WithLogDir("./tmp/nacos/log"),
- constant.WithCacheDir("./tmp/nacos/cache"),
- constant.WithLogLevel("debug"),
- )
- var serverConfigs []constant.ServerConfig
- for _, serv := range cfg.Servers {
- c := constant.NewServerConfig(
- serv.IpAddr,
- serv.Port,
- constant.WithScheme("http"),
- constant.WithContextPath("/nacos"),
- )
- serverConfigs = append(serverConfigs, *c)
- }
- // Another way of create config client for dynamic configuration (recommend)
- return clients.NewConfigClient(
- vo.NacosClientParam{
- ClientConfig: &clientConfig,
- ServerConfigs: serverConfigs,
- },
- )
- }
- // GetIClient get client api
- func (c *ConfigClient) GetIClient() config_client.IConfigClient {
- return c.IConfigClient
- }
- // GetData get data
- func (c *ConfigClient) GetData() string {
- return c.data
- }
- // GetConfig get config info
- func (c *ConfigClient) GetConfig() (string, error) {
- return c.IConfigClient.GetConfig(vo.ConfigParam{
- DataId: c.DataID,
- Group: c.Group,
- })
- }
- // PublishConfig publish config
- func (c *ConfigClient) PublishConfig(content string) (bool, error) {
- return c.IConfigClient.PublishConfig(vo.ConfigParam{
- DataId: c.DataID,
- Group: c.Group,
- Content: content,
- })
- }
- // DeleteConfig delete config
- func (c *ConfigClient) DeleteConfig() (bool, error) {
- return c.IConfigClient.DeleteConfig(vo.ConfigParam{
- DataId: c.DataID,
- Group: c.Group,
- })
- }
- // CancelListenConfig Cancel the listening of config change event
- func (c *ConfigClient) CancelListenConfig() error {
- return c.IConfigClient.CancelListenConfig(vo.ConfigParam{
- DataId: c.DataID,
- Group: c.Group,
- })
- }
- // SearchConfig Search config
- func (c *ConfigClient) SearchConfig(search string, pageNo, pageSize int) (*model.ConfigPage, error) {
- return c.IConfigClient.SearchConfig(vo.SearchConfigParam{
- DataId: c.DataID,
- Group: c.Group,
- Search: search,
- PageNo: pageNo,
- PageSize: pageSize,
- })
- }
- // ListenChanges Listen config change event
- func (c *ConfigClient) ListenChanges() {
- err := c.IConfigClient.ListenConfig(vo.ConfigParam{
- DataId: c.DataID,
- Group: c.Group,
- OnChange: func(namespace, group, dataId, data string) {
- log.WithField("nacos", "config").Warn("OnChange group:" + group + ", dataId:" + dataId + ", data:" + data)
- c.data = data
- c.updateCh <- data
- },
- })
- if err != nil {
- log.WithField("nacos", "config").Fatalf("ListenChanges err:%+v", err)
- return
- }
- go func() {
- for {
- select {
- case newData := <-c.updateCh:
- log.WithField("nacos", "config").Warnf("Received updated config:%v", newData)
- // 进行其他操作,例如重新加载配置、进行灰度发布等
- if c.ChangeCall != nil {
- c.ChangeCall(newData)
- }
- case <-c.Context.Done():
- if err = c.CancelListenConfig(); err != nil {
- log.WithField("nacos", "config").Warnf("ListenChanges CancelListenConfig err:%+v", err)
- }
- log.Warn("ListenChanges Done.")
- return
- }
- }
- }()
- }
|