generic.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package memcache
  2. import (
  3. "fmt"
  4. "leafstalk/covenant/monitor"
  5. "leafstalk/log"
  6. "leafstalk/module"
  7. "leafstalk/otherutils"
  8. "reflect"
  9. "strconv"
  10. "time"
  11. cach2 "github.com/patrickmn/go-cache"
  12. )
  13. type GenericCache[T interface{}] struct {
  14. allPlayers *cach2.Cache
  15. tokens *otherutils.RoutineTokens
  16. loadFilter otherutils.FilterInt64
  17. skeleton *module.Skeleton
  18. name string
  19. Load func(id int64) (*T, error)
  20. New func(playerId int64) *T
  21. PreProcess func(*T)
  22. }
  23. func NewGenericCache[T interface{}](aliveTl time.Duration, skeleton *module.Skeleton) *GenericCache[T] {
  24. c := new(GenericCache[T])
  25. c.skeleton = skeleton
  26. c.allPlayers = cach2.New(aliveTl, 10*time.Minute)
  27. c.tokens = otherutils.NewRoutineTokens(10)
  28. var tmp *T
  29. val := reflect.TypeOf(tmp)
  30. c.name = val.Elem().Name()
  31. return c
  32. }
  33. type callBackFun[T interface{}] struct {
  34. fprocess func(p *T)
  35. fErrProcess func(playerId int64, err error)
  36. }
  37. func (c *GenericCache[T]) GetCacheData(playerID int64) *T {
  38. k := strconv.FormatInt(playerID, 10)
  39. if v, ok := c.allPlayers.Get(k); ok {
  40. if v2, ok := v.(*T); ok {
  41. return v2
  42. }
  43. }
  44. return nil
  45. }
  46. func (c *GenericCache[T]) SetCacheData(playerID int64, v *T) {
  47. k2 := strconv.FormatInt(playerID, 10)
  48. c.allPlayers.SetDefault(k2, v)
  49. }
  50. func (c *GenericCache[T]) DeleteCacheData(playerID int64) {
  51. k2 := strconv.FormatInt(playerID, 10)
  52. c.allPlayers.Delete(k2)
  53. }
  54. func (c *GenericCache[T]) Items(callBack func(playerId int64, val *T)) {
  55. items := c.allPlayers.Items()
  56. for k, v := range items {
  57. k2, err := strconv.ParseInt(k, 10, 64)
  58. if err != nil {
  59. continue
  60. }
  61. v2, ok2 := v.Object.(*T)
  62. if ok2 {
  63. callBack(k2, v2)
  64. }
  65. }
  66. }
  67. func (c *GenericCache[T]) LoadAndProcess(playerId int64, fprocess func(p *T), fErrProcess func(playerId int64, err error)) {
  68. //找到玩家
  69. st := time.Now()
  70. p := c.GetCacheData(playerId)
  71. if p != nil {
  72. c.PreProcess(p)
  73. c.SetCacheData(playerId, p)
  74. fprocess(p)
  75. monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess", c.name), playerId, st)
  76. return
  77. }
  78. //正在加载
  79. if c.loadFilter.IsExist(playerId) {
  80. t := new(callBackFun[T])
  81. t.fprocess = fprocess
  82. t.fErrProcess = fErrProcess
  83. c.loadFilter.AppendTask(playerId, t)
  84. return
  85. }
  86. c.loadFilter.Add(playerId)
  87. var err error
  88. c.skeleton.Go(func() {
  89. //协程加载
  90. c.tokens.Acquire()
  91. defer c.tokens.Release()
  92. p, err = c.Load(playerId)
  93. if err != nil {
  94. log.Errorf("Load %s error %v", c.name, err)
  95. }
  96. }, func() {
  97. defer func() {
  98. s := c.loadFilter.Remove(playerId)
  99. for _, v := range s {
  100. cft := v.(*callBackFun[T])
  101. if err != nil {
  102. cft.fErrProcess(playerId, err)
  103. } else {
  104. cft.fprocess(p)
  105. }
  106. }
  107. monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess1", c.name), playerId, st)
  108. }()
  109. if err != nil {
  110. fErrProcess(playerId, err)
  111. return
  112. }
  113. old := c.GetCacheData(playerId)
  114. if old != nil {
  115. p = old
  116. }
  117. if IsNil(p) || p == nil {
  118. p = c.New(playerId)
  119. }
  120. c.PreProcess(p)
  121. c.SetCacheData(playerId, p)
  122. fprocess(p)
  123. })
  124. }
  125. func IsNil(i interface{}) bool {
  126. if i == nil {
  127. return true
  128. }
  129. switch reflect.TypeOf(i).Kind() {
  130. case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice:
  131. return reflect.ValueOf(i).IsNil()
  132. }
  133. return false
  134. }
  135. func (c *GenericCache[T]) LoadManyAndNoCacheProcess(loadMany func() (map[int64]*T, error), fprocess func(players []*T), fErrProcess func(err error)) {
  136. var ps map[int64]*T
  137. var err error
  138. c.skeleton.Go(func() {
  139. //协程加载
  140. c.tokens.Acquire()
  141. defer c.tokens.Release()
  142. ps, err = loadMany()
  143. }, func() {
  144. defer func() {
  145. }()
  146. if err != nil {
  147. fErrProcess(err)
  148. return
  149. }
  150. lst := make([]*T, 0, len(ps))
  151. for id, v := range ps {
  152. old := c.GetCacheData(id)
  153. if old != nil {
  154. v = old
  155. }
  156. c.PreProcess(v)
  157. // c.SetCacheData(id, v)
  158. lst = append(lst, v)
  159. }
  160. fprocess(lst)
  161. })
  162. }
  163. func (c *GenericCache[T]) LoadMultiAndProcess(playerId1 int64, playerId2 int64, fprocess func(p1 *T, p2 *T), fErrProcess func(err error)) {
  164. c.LoadAndProcess(playerId1, func(p1 *T) {
  165. c.LoadAndProcess(playerId2, func(p2 *T) {
  166. fprocess(p1, p2)
  167. }, func(playerId int64, err error) {
  168. fErrProcess(err)
  169. })
  170. }, func(playerId int64, err error) {
  171. fErrProcess(err)
  172. })
  173. }
  174. // func NewPlayer(playerId int64) *PlayerTasks {
  175. // return nil
  176. // }
  177. // func LoadPlayer(playerId int64) (*PlayerTasks, error) {
  178. // return nil, nil
  179. // }
  180. // func preProcess(p *PlayerTasks) {
  181. // }
  182. // allPayPlayers = memcache.NewCache[PlayerTasks](5*time.Minute, m.Skeleton) //.Init(LoadPlayer, NewObj, PrePocess)
  183. // allPayPlayers.New = NewPlayer
  184. // allPayPlayers.Load = LoadPlayer
  185. // allPayPlayers.PreProcess = preProcess
  186. type PlayerTasks struct {
  187. PlayerID int64
  188. Tasks []string
  189. }
  190. func NewPlayer(playerId int64) *PlayerTasks {
  191. return &PlayerTasks{
  192. PlayerID: playerId,
  193. Tasks: []string{"Task1", "Task2"},
  194. }
  195. }
  196. // 实现 Load 函数,用于加载 PlayerTasks 对象
  197. func LoadPlayer(playerId int64) (*PlayerTasks, error) {
  198. // 模拟从数据库或其他存储中加载数据
  199. return &PlayerTasks{
  200. PlayerID: playerId,
  201. Tasks: []string{"Task3", "Task4"},
  202. }, nil
  203. }
  204. // 实现 PreProcess 函数,用于预处理 PlayerTasks 对象
  205. func preProcess(p *PlayerTasks) {
  206. // 模拟一些预处理操作
  207. p.Tasks = append(p.Tasks, "PreprocessedTask")
  208. return
  209. }
  210. func example3() {
  211. // 初始化 Skeleton(假设 Skeleton 已经定义)
  212. skeleton := &module.Skeleton{}
  213. // 创建一个 GenericCache 实例,缓存存活时间为 5 分钟
  214. cache := NewGenericCache[PlayerTasks](5*time.Minute, skeleton)
  215. // 设置 New、Load 和 PreProcess 函数
  216. cache.New = NewPlayer
  217. cache.Load = LoadPlayer
  218. cache.PreProcess = preProcess
  219. // 加载并处理玩家数据
  220. playerID := int64(123)
  221. cache.LoadAndProcess(playerID, func(p *PlayerTasks) {
  222. // 处理加载成功的玩家数据
  223. fmt.Printf("Player %d tasks: %v\n", p.PlayerID, p.Tasks)
  224. }, func(playerId int64, err error) {
  225. // 处理加载失败的情况
  226. fmt.Printf("Failed to load player %d, error code: %d\n", playerId, err)
  227. })
  228. // 等待一段时间,以便协程完成
  229. time.Sleep(1 * time.Second)
  230. }