cache.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package memcache
  2. import (
  3. "errors"
  4. "fmt"
  5. "leafstalk/covenant/monitor"
  6. "leafstalk/module"
  7. "leafstalk/otherutils"
  8. "reflect"
  9. "strconv"
  10. "sync"
  11. "time"
  12. cach2 "github.com/patrickmn/go-cache"
  13. )
  // ErrInvalidType is the sentinel error for a type argument that is required
  // to be a pointer but is not.
  var ErrInvalidType = errors.New(
  	"invalid type: must be a pointer",
  )
  // ItemParam is the constraint for load parameters: a parameter must be able
  // to render the identifier of the item it refers to as a string. That string
  // is used as the cache key by DataLoader.
  type ItemParam interface {
  	// IdString returns the identifier in string form.
  	IdString() string
  }
  // PlatPlayer is a load parameter that identifies a player together with the
  // platform string the request came from.
  type PlatPlayer struct {
  	PlayerId int64
  	Plat     string
  }

  // IdString returns PlayerId formatted as a base-10 string. Plat does not
  // take part in the identifier.
  func (p *PlatPlayer) IdString() string {
  	const decimal = 10
  	id := p.PlayerId
  	return strconv.FormatInt(id, decimal)
  }
  // LoadId is a plain numeric load parameter.
  type LoadId int64

  // IdString returns the numeric value formatted as a base-10 string.
  func (l LoadId) IdString() string {
  	const decimal = 10
  	raw := int64(l)
  	return strconv.FormatInt(raw, decimal)
  }
  29. // 数据加载器类 K CacheItemParam,
  30. type DataLoader[P ItemParam, T any] struct {
  31. name string
  32. allCacheItems *cach2.Cache
  33. tokens *otherutils.RoutineTokens
  34. loadFilter otherutils.FilterString
  35. playerPool *sync.Pool
  36. usePool bool
  37. // 这几项需要赋值
  38. Skeleton *module.Skeleton
  39. NewItem func(param P) (T, error)
  40. LoadItem func(param P) (T, error)
  41. PreProcess func(T, P) bool
  42. }
  43. func (dl *DataLoader[P, T]) isPointerNil(val T) bool {
  44. vi := reflect.ValueOf(val)
  45. return vi.IsNil()
  46. }
  47. // 创建新的数据加载器
  48. func NewDataLoader[P ItemParam, T any](aliveTl time.Duration, useParamPool bool) *DataLoader[P, T] {
  49. var tmp *T
  50. val := reflect.TypeOf(tmp)
  51. name := val.Elem().Name()
  52. dl := new(DataLoader[P, T])
  53. dl.allCacheItems = cach2.New(aliveTl, 10*time.Minute)
  54. dl.tokens = otherutils.NewRoutineTokens(10)
  55. dl.name = name
  56. if useParamPool {
  57. var tmp2 P
  58. msgType := reflect.TypeOf(tmp2)
  59. if msgType != nil && msgType.Kind() == reflect.Ptr {
  60. dl.usePool = true
  61. dl.playerPool = &sync.Pool{
  62. New: func() interface{} {
  63. return reflect.New(msgType.Elem()).Interface()
  64. },
  65. }
  66. }
  67. }
  68. return dl
  69. }
  70. func (dl *DataLoader[P, T]) GetCacheData(strId string) (T, bool) {
  71. if v, ok := dl.allCacheItems.Get(strId); ok {
  72. v2, _ := v.(T)
  73. return v2, true
  74. }
  75. var zero T
  76. return zero, false
  77. }
  78. func (dl *DataLoader[P, T]) SetCacheData(strId string, v T) {
  79. dl.allCacheItems.SetDefault(strId, v)
  80. }
  81. func (dl *DataLoader[P, T]) DeleteCacheData(strId string) {
  82. dl.allCacheItems.Delete(strId)
  83. }
  84. func (dl *DataLoader[P, T]) Items() []T {
  85. var lst []T
  86. kvs := dl.allCacheItems.Items()
  87. for _, v := range kvs {
  88. v2, ok := v.Object.(T)
  89. if ok {
  90. lst = append(lst, v2)
  91. }
  92. }
  93. return lst
  94. }
  95. func (dl *DataLoader[P, T]) SetEvictedCallBack(f func(string, T)) {
  96. dl.allCacheItems.OnEvicted(func(key string, val interface{}) {
  97. v2, _ := val.(T)
  98. f(key, v2)
  99. })
  100. }
  // TODO: a function that updates the duration still needs to be implemented.

  // callBackFunDl bundles the callbacks of a request that arrived while a load
  // for the same id was already in flight.
  type callBackFunDl[T any] struct {
  	// fprocess is called with the item on success.
  	fprocess func(p T)
  	// fErrProcess is called with an error code and error on failure.
  	fErrProcess func(errorCode int, err error)
  	// allowCreate records whether the request permits creating a missing item.
  	allowCreate bool
  }
  107. func (dl *DataLoader[P, T]) NewItemParam() P {
  108. if !dl.usePool {
  109. var t P
  110. return t
  111. }
  112. player := dl.playerPool.Get().(P)
  113. return player
  114. }
  115. func (dl *DataLoader[P, T]) putParamPool(param P) {
  116. if dl.usePool {
  117. dl.playerPool.Put(param)
  118. }
  119. }
  120. func (dl *DataLoader[P, T]) LoadAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error)) {
  121. dl.loadAndProcess(param, fprocess, fErrProcess, true)
  122. }
  123. func (dl *DataLoader[P, T]) LoadOnlyAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error)) {
  124. dl.loadAndProcess(param, fprocess, fErrProcess, false)
  125. }
  126. func (dl *DataLoader[P, T]) loadAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error), allowCreate bool) {
  127. st := time.Now()
  128. // calcErrorCode := errorcoder.GetCalcInterfaceErrMethod(errorcoder.ErrStatusLoadProcess)
  129. //找到玩家
  130. strId := param.IdString()
  131. p, found := dl.GetCacheData(strId)
  132. if found {
  133. if dl.usePool {
  134. defer dl.putParamPool(param)
  135. }
  136. if !dl.PreProcess(p, param) {
  137. errorCode := 101 //calcErrorCode(1)
  138. fErrProcess(errorCode, nil)
  139. return
  140. }
  141. dl.SetCacheData(strId, p)
  142. fprocess(p)
  143. monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess", dl.name), strId, st)
  144. return
  145. }
  146. //正在加载
  147. if dl.loadFilter.IsExist(strId) {
  148. if dl.usePool {
  149. defer dl.putParamPool(param)
  150. }
  151. t := new(callBackFunDl[T])
  152. t.fprocess = fprocess
  153. t.fErrProcess = fErrProcess
  154. t.allowCreate = allowCreate
  155. dl.loadFilter.AppendTask(strId, t)
  156. return
  157. }
  158. dl.loadFilter.Add(strId)
  159. var err error
  160. dl.Skeleton.Go(func() {
  161. //协程加载
  162. dl.tokens.Acquire()
  163. defer dl.tokens.Release()
  164. p, err = dl.LoadItem(param)
  165. }, func() {
  166. noFoundObject := false
  167. var errCode int
  168. defer func() {
  169. if dl.usePool {
  170. defer dl.putParamPool(param)
  171. }
  172. lst := dl.loadFilter.Remove(strId)
  173. // 仅加载后,发现不存在,但其他处理需要创建新对象,此时创建新对象
  174. for _, v := range lst {
  175. cft := v.(*callBackFunDl[T])
  176. if noFoundObject && cft.allowCreate {
  177. p, err = dl.NewItem(param)
  178. if err != nil {
  179. errCode = 103
  180. fErrProcess(errCode, err)
  181. continue
  182. }
  183. if v, found := dl.GetCacheData(strId); found {
  184. p = v
  185. }
  186. if !dl.PreProcess(p, param) {
  187. errCode = 105
  188. err = errors.New("errCode:105")
  189. fErrProcess(errCode, err)
  190. continue
  191. }
  192. dl.SetCacheData(strId, p)
  193. // err = nil
  194. errCode = 0
  195. }
  196. if err != nil {
  197. cft.fErrProcess(errCode, err)
  198. } else {
  199. cft.fprocess(p)
  200. }
  201. }
  202. monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess", dl.name), strId, st)
  203. }()
  204. if err != nil {
  205. errCode = 102
  206. fErrProcess(errCode, err)
  207. return
  208. }
  209. if dl.isPointerNil(p) {
  210. if allowCreate {
  211. p, err = dl.NewItem(param)
  212. if err != nil {
  213. errCode = 103
  214. fErrProcess(errCode, err)
  215. return
  216. }
  217. } else {
  218. noFoundObject = true
  219. errCode = 104
  220. err = errors.New("errCode:104")
  221. fErrProcess(errCode, err)
  222. return
  223. }
  224. }
  225. if v, found := dl.GetCacheData(strId); found {
  226. p = v
  227. }
  228. if !dl.PreProcess(p, param) {
  229. errCode = 105
  230. err = errors.New("errCode:105")
  231. fErrProcess(errCode, err)
  232. return
  233. }
  234. fprocess(p)
  235. dl.SetCacheData(strId, p)
  236. })
  237. }
  238. func (dl *DataLoader[P, T]) IsNoExistErrorCode(code int) bool {
  239. return (code == 104)
  240. }
  241. func (dl *DataLoader[P, T]) Go(loadMany func(), fprocess func()) {
  242. st := time.Now()
  243. dl.Skeleton.Go(func() {
  244. //协程加载
  245. dl.tokens.Acquire()
  246. defer dl.tokens.Release()
  247. loadMany()
  248. }, func() {
  249. defer func() {
  250. monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess Go", dl.name), "many", st)
  251. }()
  252. fprocess()
  253. })
  254. }
  // Usage example with a pooled pointer parameter:
  //
  //	idiomsCache = NewDataLoader[*PlatPlayer, *model.CurioMake](5*time.Minute, true)
  //	idiomsCache.NewItem = NewPlayer
  //	idiomsCache.LoadItem = LoadPlayer
  //	idiomsCache.PreProcess = preProcess
  //	idiomsCache.Skeleton = skeleton
  //
  //	func NewPlayer(param *memcache.PlatPlayer) (*model.CurioMake, error) {}
  //	func LoadPlayer(param *memcache.PlatPlayer) (*model.CurioMake, error) {}
  //	func preProcess(p *model.CurioMake, param *memcache.PlatPlayer) bool {}
  // Usage example with a value parameter and no pool:
  //
  //	var allPlayerCache *memcache.DataLoader[memcache.LoadId, *ChapterPlayer]
  //
  //	func InitCache(skeleton *module.Skeleton) {
  //		allPlayerCache = memcache.NewDataLoader[memcache.LoadId, *ChapterPlayer](5*time.Minute, false)
  //		allPlayerCache.NewItem = NewPlayer
  //		allPlayerCache.LoadItem = LoadPlayer
  //		allPlayerCache.PreProcess = preProcess
  //		allPlayerCache.Skeleton = skeleton
  //	}
  //
  //	func NewPlayer(param memcache.LoadId) (*ChapterPlayer, error) { return nil, nil }
  //	func LoadPlayer(param memcache.LoadId) (*ChapterPlayer, error) { return nil, nil }
  //	func preProcess(p *ChapterPlayer, param memcache.LoadId) bool { return true }
  // Issuing a request with a pooled parameter:
  //
  //	p := allCurioPlayers.NewItemParam()
  //	p.PlayerId = request.PlayerId
  //	p.Plat = request.ClientPlat
  //	allCurioPlayers.LoadAndProcess(p, processSuccess, processError)