nacos_dynamic_event.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package conf
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "leafstalk/log"
  6. "reflect"
  7. "sync"
  8. "leafstalk/otherutils/deepcopy"
  9. )
  10. // EventFunc 动态配置事件回调函数类型定义
  11. type EventFunc func(key string, val interface{})
  12. // event 动态事件结构体
  13. type event struct {
  14. sync.Mutex
  15. config map[string]interface{} // 本地缓存配置。用于初始化和对比新配置中的差异项,当存在差异项时进行回调处理使用
  16. list map[string][]EventFunc // 所有事件的列表
  17. }
  18. var dynamicEvent *event // 动态事件实例
  19. // initDynamicEvent 初始化动态配置事件
  20. func initDynamicEvent(cfgCenter *ConfigCenterAgent, group string) (err error) {
  21. dynamicEvent = &event{
  22. list: make(map[string][]EventFunc),
  23. }
  24. // 初始化时先获取一次动态配置,将其保存到本地
  25. content, err := cfgCenter.GetConfig("dynamicConfig", group)
  26. if err != nil {
  27. return
  28. }
  29. if err = json.Unmarshal([]byte(content), &dynamicEvent.config); err != nil {
  30. log.Warnf("initDynamicEvent error %v", content)
  31. return
  32. }
  33. if dynamicEvent.config == nil {
  34. err = errors.New("exception in obtaining dynamic config, config is nil")
  35. return
  36. }
  37. // 开始监听动态配置变化
  38. return cfgCenter.ListenChanges("dynamicConfig", group, func(namespace, group, dataId, newContent string) {
  39. var newConfig map[string]interface{}
  40. if err = json.Unmarshal([]byte(newContent), &newConfig); err != nil {
  41. // 解析配置出错,打印错误日志并返回
  42. log.Errorf("error parsing dynamic config for listen, err:%+v, newContent:%v", err, newContent)
  43. return
  44. }
  45. log.Infof("initDynamicEvent content %v", newContent)
  46. keys := CompareMaps(dynamicEvent.config, newConfig)
  47. if len(keys) == 0 {
  48. return
  49. }
  50. dynamicEvent.Lock()
  51. defer dynamicEvent.Unlock()
  52. dynamicEvent.config = newConfig
  53. for _, key := range keys {
  54. dynamicEvent.CallBack(key)
  55. }
  56. })
  57. }
  58. // CallBack 触发一个分组的事件回调
  59. func (e *event) CallBack(key string) {
  60. val := e.config[key]
  61. if cbfs, ok := e.list[key]; ok {
  62. for _, f := range cbfs {
  63. val2, err := deepcopy.Copy(val)
  64. if err != nil {
  65. log.Errorf("CallBack deepcopy error %v", err)
  66. }
  67. f(key, val2)
  68. }
  69. }
  70. }
  71. // CompareMaps 检查old 和 new 中每个 key 的值是否相等,返回不相等的key列表
  72. func CompareMaps(old, new map[string]interface{}) (diffKeys []string) {
  73. if new == nil {
  74. return
  75. }
  76. for key := range old {
  77. value2, ok := new[key]
  78. if ok && !reflect.DeepEqual(old[key], value2) {
  79. diffKeys = append(diffKeys, key)
  80. }
  81. }
  82. for key := range new {
  83. if _, ok := old[key]; !ok {
  84. diffKeys = append(diffKeys, key)
  85. }
  86. }
  87. return diffKeys
  88. }
  89. // RegisterAndCall 往一个分组中注册事件并触发一次事件回调
  90. // 这里的 keyGroup 表示一个配置键,同一个键可以有多个回调方法注册。
  91. // 当键发生变化时,会触发该键下所有注册的回调事件。
  92. func RegisterAndCall(key string, callback EventFunc) {
  93. dynamicEvent.Lock()
  94. defer dynamicEvent.Unlock()
  95. val := dynamicEvent.config[key]
  96. val2, err := deepcopy.Copy(val)
  97. if err != nil {
  98. log.Fatalf("RegisterAndCall deepcopy error %v", err)
  99. }
  100. dynamicEvent.list[key] = append(dynamicEvent.list[key], callback)
  101. callback(key, val2)
  102. }