append.go 6.5 KB


  1. package memcache
  2. import (
  3. "fmt"
  4. "leafstalk/covenant/monitor"
  5. "leafstalk/module"
  6. "leafstalk/otherutils"
  7. "reflect"
  8. "strconv"
  9. "time"
  10. cach2 "github.com/patrickmn/go-cache"
  11. )
  12. type DirtyChecker interface {
  13. IsDirty() bool
  14. }
  15. type AppendCache[T DirtyChecker] struct {
  16. allPlayers *cach2.Cache
  17. tokens *otherutils.RoutineTokens
  18. loadFilter otherutils.FilterInt64
  19. skeleton *module.Skeleton
  20. name string
  21. Load func(id int64) (T, error)
  22. New func(playerId int64) T
  23. PreProcess func(T)
  24. LoadDirty func(v T) error
  25. }
  26. func NewAppendCache[T DirtyChecker](aliveTl time.Duration, skeleton *module.Skeleton) *AppendCache[T] {
  27. c := new(AppendCache[T])
  28. c.skeleton = skeleton
  29. c.allPlayers = cach2.New(aliveTl, 10*time.Minute)
  30. c.tokens = otherutils.NewRoutineTokens(10)
  31. var tmp T
  32. val := reflect.TypeOf(tmp)
  33. c.name = val.Elem().Name()
  34. return c
  35. }
  36. type callBackFun2[T interface{}] struct {
  37. fprocess func(p T)
  38. fErrProcess func(playerId int64, err error)
  39. }
  40. func (c *AppendCache[T]) GetCacheData(playerID int64) (T, bool) {
  41. k := strconv.FormatInt(playerID, 10)
  42. if v, ok := c.allPlayers.Get(k); ok {
  43. if v2, ok := v.(T); ok {
  44. return v2, true
  45. }
  46. }
  47. var zeroVal T
  48. return zeroVal, false
  49. }
  50. func (c *AppendCache[T]) SetCacheData(playerID int64, v T) {
  51. k2 := strconv.FormatInt(playerID, 10)
  52. c.allPlayers.SetDefault(k2, v)
  53. }
  54. func (c *AppendCache[T]) DeleteCacheData(playerID int64) {
  55. k2 := strconv.FormatInt(playerID, 10)
  56. c.allPlayers.Delete(k2)
  57. }
  58. func (c *AppendCache[T]) FlushCacheData() {
  59. c.allPlayers.Flush()
  60. }
  61. func (c *AppendCache[T]) Items(callBack func(playerId int64, val T)) {
  62. items := c.allPlayers.Items()
  63. for k, v := range items {
  64. k2, err := strconv.ParseInt(k, 10, 64)
  65. if err != nil {
  66. continue
  67. }
  68. v2, ok2 := v.Object.(T)
  69. if ok2 {
  70. callBack(k2, v2)
  71. }
  72. }
  73. }
  74. func (c *AppendCache[T]) LoadAndProcess(playerId int64, fprocess func(p T), fErrProcess func(playerId int64, err error)) {
  75. //找到玩家
  76. isDirty := false
  77. st := time.Now()
  78. p, has := c.GetCacheData(playerId)
  79. if has {
  80. isDirty = p.IsDirty()
  81. if !isDirty {
  82. c.PreProcess(p)
  83. c.SetCacheData(playerId, p)
  84. fprocess(p)
  85. monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess", c.name), playerId, st)
  86. return
  87. }
  88. }
  89. //正在加载
  90. if c.loadFilter.IsExist(playerId) {
  91. t := new(callBackFun2[T])
  92. t.fprocess = fprocess
  93. t.fErrProcess = fErrProcess
  94. c.loadFilter.AppendTask(playerId, t)
  95. return
  96. }
  97. c.loadFilter.Add(playerId)
  98. var err error
  99. c.skeleton.Go(func() {
  100. //协程加载
  101. c.tokens.Acquire()
  102. defer c.tokens.Release()
  103. if isDirty {
  104. err = c.LoadDirty(p)
  105. } else {
  106. p, err = c.Load(playerId)
  107. }
  108. }, func() {
  109. defer func() {
  110. s := c.loadFilter.Remove(playerId)
  111. for _, v := range s {
  112. cft := v.(*callBackFun2[T])
  113. if err != nil {
  114. cft.fErrProcess(playerId, err)
  115. } else {
  116. cft.fprocess(p)
  117. }
  118. }
  119. monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess1", c.name), playerId, st)
  120. }()
  121. if err != nil {
  122. fErrProcess(playerId, err)
  123. return
  124. }
  125. if isZero2(p) {
  126. p = c.New(playerId)
  127. }
  128. c.PreProcess(p)
  129. c.SetCacheData(playerId, p)
  130. fprocess(p)
  131. })
  132. }
  133. func isZero2[T any](v T) bool {
  134. return reflect.ValueOf(v).IsZero()
  135. }
  136. // func IsNil(i interface{}) bool {
  137. // if i == nil {
  138. // return true
  139. // }
  140. // switch reflect.TypeOf(i).Kind() {
  141. // case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice:
  142. // return reflect.ValueOf(i).IsNil()
  143. // }
  144. // return false
  145. // }
  146. // func (c *AppendCache[T]) LoadManyAndNoCacheProcess(loadMany func() (map[int64]*T, error), fprocess func(players []*T), fErrProcess func(err error)) {
  147. // var ps map[int64]*T
  148. // var err error
  149. // c.skeleton.Go(func() {
  150. // //协程加载
  151. // c.tokens.Acquire()
  152. // defer c.tokens.Release()
  153. // ps, err = loadMany()
  154. // }, func() {
  155. // defer func() {
  156. // }()
  157. // if err != nil {
  158. // fErrProcess(err)
  159. // return
  160. // }
  161. // lst := make([]*T, 0, len(ps))
  162. // for id, v := range ps {
  163. // old := c.GetCacheData(id)
  164. // if old != nil {
  165. // v = old
  166. // }
  167. // c.PreProcess(v)
  168. // // c.SetCacheData(id, v)
  169. // lst = append(lst, v)
  170. // }
  171. // fprocess(lst)
  172. // })
  173. // }
  174. // func (c *AppendCache[T]) LoadMultiAndProcess(playerId1 int64, playerId2 int64, fprocess func(p1 *T, p2 *T), fErrProcess func(err error)) {
  175. // c.LoadAndProcess(playerId1, func(p1 *T) {
  176. // c.LoadAndProcess(playerId2, func(p2 *T) {
  177. // fprocess(p1, p2)
  178. // }, func(playerId int64, err error) {
  179. // fErrProcess(err)
  180. // })
  181. // }, func(playerId int64, err error) {
  182. // fErrProcess(err)
  183. // })
  184. // }
  185. // func NewPlayer(playerId int64) *PlayerTasks {
  186. // return nil
  187. // }
  188. // func LoadPlayer(playerId int64) (*PlayerTasks, error) {
  189. // return nil, nil
  190. // }
  191. // func preProcess(p *PlayerTasks) {
  192. // }
  193. // allPayPlayers = memcache.NewCache[PlayerTasks](5*time.Minute, m.Skeleton) //.Init(LoadPlayer, NewObj, PrePocess)
  194. // allPayPlayers.New = NewPlayer
  195. // allPayPlayers.Load = LoadPlayer
  196. // allPayPlayers.PreProcess = preProcess
  197. // type PlayerTasks struct {
  198. // PlayerID int64
  199. // Tasks []string
  200. // }
  201. // func NewPlayer(playerId int64) *PlayerTasks {
  202. // return &PlayerTasks{
  203. // PlayerID: playerId,
  204. // Tasks: []string{"Task1", "Task2"},
  205. // }
  206. // }
  207. // // 实现 Load 函数,用于加载 PlayerTasks 对象
  208. // func LoadPlayer(playerId int64) (*PlayerTasks, error) {
  209. // // 模拟从数据库或其他存储中加载数据
  210. // return &PlayerTasks{
  211. // PlayerID: playerId,
  212. // Tasks: []string{"Task3", "Task4"},
  213. // }, nil
  214. // }
  215. // // 实现 PreProcess 函数,用于预处理 PlayerTasks 对象
  216. // func preProcess(p *PlayerTasks) bool {
  217. // // 模拟一些预处理操作
  218. // p.Tasks = append(p.Tasks, "PreprocessedTask")
  219. // return true
  220. // }
  221. // func example3() {
  222. // // 初始化 Skeleton(假设 Skeleton 已经定义)
  223. // skeleton := &module.Skeleton{}
  224. // // 创建一个 AppendCache 实例,缓存存活时间为 5 分钟
  225. // cache := NewGenericCache[PlayerTasks](5*time.Minute, skeleton)
  226. // // 设置 New、Load 和 PreProcess 函数
  227. // cache.New = NewPlayer
  228. // cache.Load = LoadPlayer
  229. // cache.PreProcess = preProcess
  230. // // 加载并处理玩家数据
  231. // playerID := int64(123)
  232. // cache.LoadAndProcess(playerID, func(p *PlayerTasks) {
  233. // // 处理加载成功的玩家数据
  234. // fmt.Printf("Player %d tasks: %v\n", p.PlayerID, p.Tasks)
  235. // }, func(playerId int64, err error) {
  236. // // 处理加载失败的情况
  237. // fmt.Printf("Failed to load player %d, error code: %d\n", playerId, err)
  238. // })
  239. // // 等待一段时间,以便协程完成
  240. // time.Sleep(1 * time.Second)
  241. // }