cache.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. package memcache
  2. import (
  3. "errors"
  4. "fmt"
  5. "leafstalk/covenant/monitor"
  6. "leafstalk/module"
  7. "leafstalk/otherutils"
  8. "reflect"
  9. "strconv"
  10. "sync"
  11. "time"
  12. cach2 "github.com/patrickmn/go-cache"
  13. )
  // ItemParam is the key type accepted by a DataLoader. The string returned by
  // IdString is what the loader uses as the cache key and as the key for
  // tracking loads that are already in flight.
  type ItemParam interface {
  	// IdString returns the identifier rendered as a string.
  	IdString() string
  }
  // PlatPlayer is an ItemParam made of a numeric player id and a platform tag.
  // Only PlayerId takes part in the id string.
  type PlatPlayer struct {
  	PlayerId int64  // numeric player id; the sole input of IdString
  	Plat     string // platform tag carried along with the request; not part of the key
  }

  // IdString returns PlayerId formatted as a base-10 string.
  func (p *PlatPlayer) IdString() string {
  	id := p.PlayerId
  	return strconv.FormatInt(id, 10)
  }
  // LoadId is a plain int64 identifier that provides IdString, so it can be
  // used as a loader key without wrapping it in a struct.
  type LoadId int64

  // IdString returns the id formatted as a base-10 string.
  func (l LoadId) IdString() string {
  	n := int64(l)
  	return strconv.FormatInt(n, 10)
  }
  28. // 数据加载器类 K CacheItemParam,
  29. type DataLoader[P ItemParam, T any] struct {
  30. name string
  31. allCacheItems *cach2.Cache
  32. tokens otherutils.RoutineTokens
  33. loadFilter otherutils.FilterString
  34. playerPool *sync.Pool
  35. usePool bool
  36. // 这几项需要赋值
  37. Skeleton *module.Skeleton
  38. NewItem func(param P) (T, error)
  39. LoadItem func(param P) (T, error)
  40. PrePocess func(T, P) bool
  41. }
  42. func (dl *DataLoader[P, T]) isPointerNil(val T) bool {
  43. vi := reflect.ValueOf(val)
  44. return vi.IsNil()
  45. }
  46. // 创建新的数据加载器
  47. func NewDataLoader[P ItemParam, T any](aliveTl time.Duration, useParamPool bool) *DataLoader[P, T] {
  48. var tmp *T
  49. val := reflect.TypeOf(tmp)
  50. if val == nil || val.Kind() != reflect.Ptr {
  51. return nil
  52. }
  53. name := val.Elem().Name()
  54. dl := new(DataLoader[P, T])
  55. dl.allCacheItems = cach2.New(aliveTl, 10*time.Minute)
  56. dl.tokens.Init(10)
  57. dl.name = name
  58. if useParamPool {
  59. var tmp2 P
  60. msgType := reflect.TypeOf(tmp2)
  61. if msgType != nil && msgType.Kind() == reflect.Ptr {
  62. dl.usePool = true
  63. dl.playerPool = &sync.Pool{
  64. New: func() interface{} {
  65. return reflect.New(msgType.Elem()).Interface()
  66. },
  67. }
  68. }
  69. }
  70. return dl
  71. }
  72. func (dl *DataLoader[P, T]) GetCacheData(strId string) (df T) {
  73. if v, ok := dl.allCacheItems.Get(strId); ok {
  74. v2, _ := v.(T)
  75. return v2
  76. }
  77. return
  78. }
  79. func (dl *DataLoader[P, T]) SetCacheData(strId string, v T) {
  80. dl.allCacheItems.SetDefault(strId, v)
  81. }
  82. func (dl *DataLoader[P, T]) DeleteCacheData(strId string) {
  83. dl.allCacheItems.Delete(strId)
  84. }
  85. func (dl *DataLoader[P, T]) Items() []T {
  86. var lst []T
  87. kvs := dl.allCacheItems.Items()
  88. for _, v := range kvs {
  89. v2, ok := v.Object.(T)
  90. if ok {
  91. lst = append(lst, v2)
  92. }
  93. }
  94. return lst
  95. }
  96. func (dl *DataLoader[P, T]) SetEvictedCallBack(f func(string, T)) {
  97. dl.allCacheItems.OnEvicted(func(key string, val interface{}) {
  98. v2, _ := val.(T)
  99. f(key, v2)
  100. })
  101. }
  // TODO: a function that updates the duration still needs to be implemented.

  // callBackFunDl bundles the callbacks of a request that arrived while a load
  // for the same id was already running, so they can be replayed once that
  // load finishes.
  type callBackFunDl[T any] struct {
  	// fprocess is called with the item on success.
  	fprocess func(p T)
  	// fErrProcess is called with an error code and error on failure.
  	fErrProcess func(errorCode int, err error)
  	// allowCreate records whether this request may create the item when the
  	// load finds none.
  	allowCreate bool
  }
  108. func (dl *DataLoader[P, T]) NewItemParam() P {
  109. if !dl.usePool {
  110. var t P
  111. return t
  112. }
  113. player := dl.playerPool.Get().(P)
  114. return player
  115. }
  116. func (dl *DataLoader[P, T]) putParamPool(param P) {
  117. dl.playerPool.Put(param)
  118. }
  119. func (dl *DataLoader[P, T]) LoadAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error)) {
  120. dl.loadAndProcess(param, fprocess, fErrProcess, true)
  121. }
  122. func (dl *DataLoader[P, T]) LoadOnlyAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error)) {
  123. dl.loadAndProcess(param, fprocess, fErrProcess, false)
  124. }
  125. func (dl *DataLoader[P, T]) loadAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error), allowCreate bool) {
  126. st := time.Now()
  127. //找到玩家
  128. strId := (param).IdString()
  129. p := dl.GetCacheData(strId)
  130. if !dl.isPointerNil(p) {
  131. if dl.usePool {
  132. defer dl.putParamPool(param)
  133. }
  134. if !dl.PrePocess(p, param) {
  135. fErrProcess(101, nil)
  136. return
  137. }
  138. dl.SetCacheData(strId, p)
  139. fprocess(p)
  140. monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess", dl.name), strId, st)
  141. return
  142. }
  143. //正在加载
  144. if dl.loadFilter.IsExist(strId) {
  145. if dl.usePool {
  146. defer dl.putParamPool(param)
  147. }
  148. t := new(callBackFunDl[T])
  149. t.fprocess = fprocess
  150. t.fErrProcess = fErrProcess
  151. t.allowCreate = allowCreate
  152. dl.loadFilter.AppendTask(strId, t)
  153. return
  154. }
  155. dl.loadFilter.Add(strId)
  156. var err error
  157. dl.Skeleton.Go(func() {
  158. //协程加载
  159. dl.tokens.Get()
  160. defer dl.tokens.Release()
  161. p, err = dl.LoadItem(param)
  162. }, func() {
  163. // exitReasonNoItem := false
  164. var errCode int
  165. defer func() {
  166. if dl.usePool {
  167. defer dl.putParamPool(param)
  168. }
  169. lst := dl.loadFilter.Remove(strId)
  170. // 仅加载后,发现不存在,但其他处理需要创建新对象,此时创建新对象
  171. for _, v := range lst {
  172. cft := v.(*callBackFunDl[T])
  173. if errCode == 104 && cft.allowCreate {
  174. p, err = dl.NewItem(param)
  175. if err != nil {
  176. errCode = 106
  177. fErrProcess(errCode, err)
  178. continue
  179. }
  180. if v := dl.GetCacheData(strId); !dl.isPointerNil(v) {
  181. p = v
  182. }
  183. if !dl.PrePocess(p, param) {
  184. errCode = 107
  185. err = errors.New("errCode:105")
  186. fErrProcess(errCode, err)
  187. continue
  188. }
  189. dl.SetCacheData(strId, p)
  190. // err = nil
  191. errCode = 0
  192. }
  193. if err != nil {
  194. cft.fErrProcess(errCode, err)
  195. } else {
  196. cft.fprocess(p)
  197. }
  198. }
  199. monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess", dl.name), strId, st)
  200. }()
  201. if err != nil {
  202. errCode = 102
  203. fErrProcess(errCode, err)
  204. return
  205. }
  206. if dl.isPointerNil(p) {
  207. if allowCreate {
  208. p, err = dl.NewItem(param)
  209. if err != nil {
  210. errCode = 103
  211. fErrProcess(errCode, err)
  212. return
  213. }
  214. } else {
  215. // exitReasonNoItem = true
  216. errCode = 104
  217. err = errors.New("errCode:104")
  218. fErrProcess(errCode, err)
  219. return
  220. }
  221. }
  222. if v := dl.GetCacheData(strId); !dl.isPointerNil(v) {
  223. p = v
  224. }
  225. if !dl.PrePocess(p, param) {
  226. errCode = 105
  227. err = errors.New("errCode:105")
  228. fErrProcess(errCode, err)
  229. return
  230. }
  231. dl.SetCacheData(strId, p)
  232. fprocess(p)
  233. })
  234. }
  235. func (dl *DataLoader[P, T]) IsNoExistErrorCode(code int) bool {
  236. return (code == 104)
  237. }
  238. func (dl *DataLoader[P, T]) Go(loadMany func(), fprocess func()) {
  239. st := time.Now()
  240. dl.Skeleton.Go(func() {
  241. //协程加载
  242. dl.tokens.Get()
  243. defer dl.tokens.Release()
  244. loadMany()
  245. }, func() {
  246. defer func() {
  247. monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess Go", dl.name), "many", st)
  248. }()
  249. fprocess()
  250. })
  251. }
  // Usage example 1: pointer params taken from the loader's param pool.
  //
  //	allCurioPlayers = memcache.NewDataLoader[*memcache.PlatPlayer, *model.CurioMake](5*time.Minute, true)
  //	allCurioPlayers.NewItem = NewPlayer
  //	allCurioPlayers.LoadItem = LoadPlayer
  //	allCurioPlayers.PrePocess = preProcess
  //	allCurioPlayers.Skeleton = skeleton
  //
  //	func NewPlayer(param *memcache.PlatPlayer) (*model.CurioMake, error) {}
  //	func LoadPlayer(param *memcache.PlatPlayer) (*model.CurioMake, error) {}
  //	func preProcess(p *model.CurioMake, param *memcache.PlatPlayer) bool {}
  //
  // Usage example 2: value params (LoadId), no param pool. The item type must
  // still be a pointer.
  //
  //	var allPlayerCache *memcache.DataLoader[memcache.LoadId, *ChapterPlayer]
  //
  //	func InitCache(skeleton *module.Skeleton) {
  //		allPlayerCache = memcache.NewDataLoader[memcache.LoadId, *ChapterPlayer](5*time.Minute, false)
  //		allPlayerCache.NewItem = NewPlayer
  //		allPlayerCache.LoadItem = LoadPlayer
  //		allPlayerCache.PrePocess = preProcess
  //		allPlayerCache.Skeleton = skeleton
  //	}
  //
  //	func NewPlayer(param memcache.LoadId) (*ChapterPlayer, error) { return nil, nil }
  //	func LoadPlayer(param memcache.LoadId) (*ChapterPlayer, error) { return nil, nil }
  //	func preProcess(p *ChapterPlayer, param memcache.LoadId) bool { return true }
  //
  // Issuing a request with a pooled param (example 1):
  //
  //	p := allCurioPlayers.NewItemParam()
  //	p.PlayerId = request.PlayerId
  //	p.Plat = request.ClientPlat
  //	allCurioPlayers.LoadAndProcess(p, processSucess, processError)