nacos.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package conf
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "leafstalk/log"
  7. "strings"
  8. "github.com/nacos-group/nacos-sdk-go/v2/clients"
  9. "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
  10. "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
  11. "github.com/nacos-group/nacos-sdk-go/v2/vo"
  12. "github.com/spf13/viper"
  13. )
// LocalNacosConfig is the locally stored Nacos connection configuration.
// LoadSysConfig decodes it from the "nacos" key of the local config file.
type LocalNacosConfig struct {
	NamespaceId string          `json:"namespaceId"` // Nacos namespace ID; NewNacosConfigCenter rejects an empty value
	LineGroup   string          `json:"lineGroup"`   // line group; the default Nacos group LoadSysConfig fetches from
	PublicGroup string          `json:"publicGroup"` // public (shared) group; NOTE(review): only referenced by commented-out code in this file
	UserName    string          `json:"userName"`    // user name handed to the Nacos client
	PassWord    string          `json:"passWord"`    // password handed to the Nacos client
	Servers     []*ConfigServer `json:"servers"`     // Nacos cluster servers
}
// ConfigServer is the address of one Nacos server in the cluster.
type ConfigServer struct {
	IpAddr string `json:"ipAddr"` // IP address
	Port   uint64 `json:"port"`   // port
}
// ConfigCenterAgent is the config-center agent: a thin wrapper around a Nacos
// config client.
type ConfigCenterAgent struct {
	NacosClient config_client.IConfigClient // underlying Nacos config client API
}
var (
	// configCenterAgent is the package-level agent. LoadSysConfig assigns it
	// and ListenConfigChanged reads it.
	configCenterAgent *ConfigCenterAgent
	// NacosConfig is the Nacos configuration decoded by LoadSysConfig; it is
	// assigned once the agent has been created successfully.
	NacosConfig *LocalNacosConfig
)
  35. // 配置中心代理
  36. func NewNacosConfigCenter(cfg *LocalNacosConfig, dataDirName string) (cli *ConfigCenterAgent, err error) {
  37. // 基本配置检查
  38. if cfg == nil || len(cfg.NamespaceId) == 0 { // len(cfg.DataID) == 0 ||
  39. err = errors.New("configuration check failed")
  40. return
  41. }
  42. // nacos客户端
  43. nacosClient, err := createNacosClient(cfg, dataDirName)
  44. if err != nil {
  45. return nil, err
  46. }
  47. cli = new(ConfigCenterAgent)
  48. cli.NacosClient = nacosClient
  49. return
  50. }
  51. // createNacosClient create config client.
  52. func createNacosClient(cfg *LocalNacosConfig, dataDirName string) (iClient config_client.IConfigClient, err error) {
  53. clientConfig := *constant.NewClientConfig(
  54. constant.WithNamespaceId(cfg.NamespaceId), //When namespace is public, fill in the blank string here.
  55. constant.WithTimeoutMs(5000),
  56. constant.WithBeatInterval(1000),
  57. constant.WithNotLoadCacheAtStart(true),
  58. constant.WithLogDir(fmt.Sprintf("../nacos/%s/log", dataDirName)),
  59. constant.WithCacheDir(fmt.Sprintf("../nacos/%s/cache", dataDirName)),
  60. constant.WithLogLevel("info"),
  61. constant.WithUsername(cfg.UserName),
  62. constant.WithPassword(cfg.PassWord),
  63. )
  64. var serverConfigs []constant.ServerConfig
  65. for _, serv := range cfg.Servers {
  66. c := constant.NewServerConfig(
  67. serv.IpAddr,
  68. serv.Port,
  69. constant.WithScheme("http"),
  70. constant.WithContextPath("/nacos"),
  71. )
  72. serverConfigs = append(serverConfigs, *c)
  73. }
  74. // Another way of create config client for dynamic configuration (recommend)
  75. return clients.NewConfigClient(
  76. vo.NacosClientParam{
  77. ClientConfig: &clientConfig,
  78. ServerConfigs: serverConfigs,
  79. },
  80. )
  81. }
  82. // GetIClient get client api
  83. func (c *ConfigCenterAgent) GetIClient() config_client.IConfigClient {
  84. return c.NacosClient
  85. }
  86. // GetConfig get config info
  87. func (c *ConfigCenterAgent) GetConfig(dataId string, group string) (string, error) {
  88. return c.NacosClient.GetConfig(vo.ConfigParam{
  89. DataId: dataId,
  90. Group: group,
  91. })
  92. }
  93. // PublishConfig publish config
  94. func (c *ConfigCenterAgent) PublishConfig(content string, dataId string, group string) (bool, error) {
  95. return c.NacosClient.PublishConfig(vo.ConfigParam{
  96. DataId: dataId,
  97. Group: group,
  98. Content: content,
  99. })
  100. }
  101. // DeleteConfig delete config
  102. func (c *ConfigCenterAgent) DeleteConfig(dataId string, group string) (bool, error) {
  103. return c.NacosClient.DeleteConfig(vo.ConfigParam{
  104. DataId: dataId,
  105. Group: group,
  106. })
  107. }
  108. // CancelListenConfig Cancel the listening of config change event
  109. func (c *ConfigCenterAgent) CancelListenConfig(dataId string, group string) error {
  110. return c.NacosClient.CancelListenConfig(vo.ConfigParam{
  111. DataId: dataId,
  112. Group: group,
  113. })
  114. }
  115. // ListenChanges Listen config change event
  116. func (c *ConfigCenterAgent) ListenChanges(dataId string, group string, onChange func(namespace, group, dataId, data string)) error {
  117. return c.NacosClient.ListenConfig(vo.ConfigParam{
  118. DataId: dataId,
  119. Group: group,
  120. OnChange: onChange,
  121. })
  122. }
  123. func ListenConfigChanged(group string, dataId string, callback func(group, dataId string, newContent string)) error {
  124. content1, err := configCenterAgent.GetConfig(dataId, group)
  125. if err != nil {
  126. return err
  127. }
  128. callback(group, dataId, content1)
  129. // 开始监听动态配置变化
  130. err = configCenterAgent.ListenChanges(dataId, group, func(namespace, group, dataId, newContent string) {
  131. defer func() {
  132. if err := recover(); err != nil {
  133. log.Warnf("ListenConfigChanged ListenChanges error %v", err)
  134. }
  135. }()
  136. callback(group, dataId, newContent)
  137. })
  138. if err != nil {
  139. return err
  140. }
  141. return nil
  142. }
  143. // LoadSysConfig 加载系统配置
  144. // 先从本地拿到nacos配置,然后到nacos中拿系统配置
  145. func LoadSysConfig(cfgFile string, appName string) (sub1 *Config, err error) {
  146. conf1 := viper.New()
  147. conf1.SetConfigFile(cfgFile) //"../conf/grave.json"
  148. if err = conf1.ReadInConfig(); err != nil {
  149. log.Warnf("LoadSysConfig ReadInConfig error %v", err)
  150. return
  151. }
  152. // 取nacos相关配置
  153. var nacosCfg *LocalNacosConfig
  154. if err = conf1.UnmarshalKey("nacos", &nacosCfg); err != nil {
  155. log.Warnf("LoadSysConfig UnmarshalKey error %v", err)
  156. return
  157. }
  158. if nacosCfg == nil {
  159. return nil, errors.New("no config")
  160. }
  161. log.Infof("LoadSysConfig nacosCfg %v", *nacosCfg)
  162. // 构成配置中心
  163. cfgCenter, err := NewNacosConfigCenter(nacosCfg, appName)
  164. if err != nil {
  165. log.Warnf("LoadSysConfig NewNacosConfigCenter error %v", err)
  166. return
  167. }
  168. configCenterAgent = cfgCenter
  169. NacosConfig = nacosCfg
  170. // // 初始化动态配置事件
  171. // err = initDynamicEvent(cfgCenter, nacosCfg.PublicGroup)
  172. // if err != nil {
  173. // log.Warnf("LoadSysConfig initDynamicEvent error %v", err)
  174. // return
  175. // }
  176. // 从配置中心拿静态配置
  177. content, err := cfgCenter.GetConfig(appName, nacosCfg.LineGroup)
  178. if err != nil {
  179. log.Warnf("LoadSysConfig GetConfig %v error %v", nacosCfg.LineGroup, err)
  180. return
  181. }
  182. if err = conf1.MergeConfig(strings.NewReader(content)); err != nil {
  183. log.Warnf("LoadSysConfig MergeConfig %v error %v %v", nacosCfg.LineGroup, content, err)
  184. return
  185. }
  186. // // 共享组配置
  187. // content2, err2 := cfgCenter.GetConfig(appName, nacosCfg.PublicGroup)
  188. // if err2 != nil {
  189. // log.Warnf("LoadSysConfig GetConfig %v error %v", nacosCfg.PublicGroup, err2)
  190. // return
  191. // }
  192. // if err = conf1.MergeConfig(strings.NewReader(content2)); err != nil {
  193. // log.Warnf("LoadSysConfig MergeConfig %v error %v %v", nacosCfg.PublicGroup, content, err)
  194. // return
  195. // }
  196. // 拿各模块配置
  197. includeList := conf1.GetStringSlice("include")
  198. for _, v := range includeList {
  199. dataId := v
  200. lineGroup := nacosCfg.LineGroup
  201. ss := strings.Split(v, ".")
  202. if len(ss) == 2 {
  203. lineGroup = ss[0]
  204. dataId = ss[1]
  205. }
  206. // 先获取数据项
  207. var subContent string
  208. if subContent, err = cfgCenter.GetConfig(dataId, lineGroup); err != nil {
  209. log.Warnf("LoadSysConfig GetConfig %v error2 %v", dataId, err)
  210. return nil, err
  211. }
  212. var data map[string]interface{}
  213. if err = json.Unmarshal([]byte(subContent), &data); err != nil {
  214. log.Warnf("LoadSysConfig Unmarshal error2 %v", err)
  215. err = fmt.Errorf("load include sub:%v, err:%+v", dataId, err)
  216. return nil, err
  217. }
  218. conf1.Set(dataId, data)
  219. }
  220. // 展示
  221. for _, key := range conf1.AllKeys() {
  222. value := conf1.Get(key)
  223. fmt.Printf("%s: %v\n", key, value)
  224. }
  225. sub1 = NewConfig(conf1)
  226. return
  227. }
  228. // ["etcd", "nats", "sharedRedis", "lines.etcd", "lines.nats.[gmSbject.line1]", "lines.redis.[line1]" ]
  229. // 从.[分割,前面是数据项;后面是可选项
  230. // 数据项单项:默认数据组做数据组 单项值做数据ID
  231. // 数据项双项:第一项是数据组,第二项是数据ID
  232. // 可选项:可以有多个可选项
  233. func parseIncludeItem(itemVal string, defaultLineGroup string) (lineGroup string, dataId string, opts []string, err error) {
  234. splitDataId := func(val string) (group string, dataId string, err error) {
  235. vals := strings.Split(val, ".")
  236. if len(vals) == 1 {
  237. return defaultLineGroup, vals[0], nil
  238. } else if len(vals) == 2 {
  239. return vals[0], vals[1], nil
  240. }
  241. return "", "", errors.New("error dataId format")
  242. }
  243. // 用.[分割字符串
  244. vals := strings.Split(itemVal, ".[")
  245. num := len(vals)
  246. if num != 1 && num != 2 {
  247. return "", "", nil, errors.New("error dataId format1")
  248. }
  249. if num == 2 {
  250. vals[1] = strings.TrimRight(vals[1], "]")
  251. }
  252. // 先分割数据项
  253. lineGroup, dataId, err = splitDataId(vals[0])
  254. if err != nil {
  255. return "", "", nil, errors.New("error dataId format2")
  256. }
  257. if num == 1 {
  258. return lineGroup, dataId, nil, nil
  259. }
  260. // 分割选项
  261. opts = strings.Split(vals[1], ",")
  262. return lineGroup, dataId, opts, nil
  263. }
  264. // "lines.redis.[line1]"; "lines.redis.[db.line1]"; "lines.redis.[db.line1, addr.line1]"
  265. // 所有非可选项 + 可选项 做数据项的值.可以有多个可选项. 针对每个子可选项:
  266. // 分割后只有一项:单项值做数据项的值;只有一项时,不能有其他可选项,例如不能lines.redis.[line1, addr.line1]
  267. // 分割后有2项:第二项的值提取出来,做第一项的值
  268. func adjustOpts(dataId string, data map[string]interface{}, opts []string) (map[string]interface{}, error) {
  269. for _, opt := range opts {
  270. path := strings.Split(opt, ".")
  271. if len(path) == 1 {
  272. // 子项做主项,只能一个,并且没有兄弟子项
  273. if len(opts) != 1 {
  274. return nil, errors.New("errors opts format1")
  275. }
  276. data[dataId] = data[opt]
  277. return data, nil
  278. } else if len(path) == 2 {
  279. // 孙子项替换子项 lines.redis.[db.line1]
  280. childKey := path[0]
  281. grandsonKey := path[1]
  282. childVal, ok := data[childKey]
  283. if !ok {
  284. return nil, errors.New("errors opts format2")
  285. }
  286. grandsons, ok := childVal.(map[string]interface{})
  287. if !ok {
  288. return nil, errors.New("errors opts format3")
  289. }
  290. data[childKey], ok = grandsons[grandsonKey]
  291. if !ok {
  292. return nil, errors.New("errors opts format4")
  293. }
  294. continue
  295. }
  296. return nil, errors.New("errors opts formatn")
  297. }
  298. return data, nil
  299. }