123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- package memcache
- import (
- "errors"
- "fmt"
- "leafstalk/covenant/monitor"
- "leafstalk/module"
- "leafstalk/otherutils"
- "reflect"
- "strconv"
- "sync"
- "time"
- cach2 "github.com/patrickmn/go-cache"
- )
// ItemParam is the lookup key handed to a DataLoader. Any parameter type
// qualifies as long as it can render itself as the string id under which the
// item is cached.
type ItemParam interface {
	// IdString returns the string id identifying the item to load.
	IdString() string
}
// PlatPlayer is an ItemParam made of a player id plus the platform string
// supplied with the request.
type PlatPlayer struct {
	PlayerId int64
	Plat     string
}

// IdString returns the decimal representation of PlayerId; Plat does not take
// part in the id.
func (p *PlatPlayer) IdString() string {
	id := p.PlayerId
	return strconv.FormatInt(id, 10)
}
// LoadId is an ItemParam for items identified by nothing but an int64.
type LoadId int64

// IdString returns the id formatted as a base-10 string.
func (l LoadId) IdString() string {
	raw := int64(l)
	return strconv.FormatInt(raw, 10)
}
- // 数据加载器类 K CacheItemParam,
- type DataLoader[P ItemParam, T any] struct {
- name string
- allCacheItems *cach2.Cache
- tokens otherutils.RoutineTokens
- loadFilter otherutils.FilterString
- playerPool *sync.Pool
- usePool bool
- // 这几项需要赋值
- Skeleton *module.Skeleton
- NewItem func(param P) (T, error)
- LoadItem func(param P) (T, error)
- PrePocess func(T, P) bool
- }
- func (dl *DataLoader[P, T]) isPointerNil(val T) bool {
- vi := reflect.ValueOf(val)
- return vi.IsNil()
- }
- // 创建新的数据加载器
- func NewDataLoader[P ItemParam, T any](aliveTl time.Duration, useParamPool bool) *DataLoader[P, T] {
- var tmp *T
- val := reflect.TypeOf(tmp)
- if val == nil || val.Kind() != reflect.Ptr {
- return nil
- }
- name := val.Elem().Name()
- dl := new(DataLoader[P, T])
- dl.allCacheItems = cach2.New(aliveTl, 10*time.Minute)
- dl.tokens.Init(10)
- dl.name = name
- if useParamPool {
- var tmp2 P
- msgType := reflect.TypeOf(tmp2)
- if msgType != nil && msgType.Kind() == reflect.Ptr {
- dl.usePool = true
- dl.playerPool = &sync.Pool{
- New: func() interface{} {
- return reflect.New(msgType.Elem()).Interface()
- },
- }
- }
- }
- return dl
- }
- func (dl *DataLoader[P, T]) GetCacheData(strId string) (df T) {
- if v, ok := dl.allCacheItems.Get(strId); ok {
- v2, _ := v.(T)
- return v2
- }
- return
- }
- func (dl *DataLoader[P, T]) SetCacheData(strId string, v T) {
- dl.allCacheItems.SetDefault(strId, v)
- }
- func (dl *DataLoader[P, T]) DeleteCacheData(strId string) {
- dl.allCacheItems.Delete(strId)
- }
- func (dl *DataLoader[P, T]) Items() []T {
- var lst []T
- kvs := dl.allCacheItems.Items()
- for _, v := range kvs {
- v2, ok := v.Object.(T)
- if ok {
- lst = append(lst, v2)
- }
- }
- return lst
- }
- func (dl *DataLoader[P, T]) SetEvictedCallBack(f func(string, T)) {
- dl.allCacheItems.OnEvicted(func(key string, val interface{}) {
- v2, _ := val.(T)
- f(key, v2)
- })
- }
// TODO: a function for updating the cache lifetime still needs to be implemented.

// callBackFunDl bundles the callbacks of a request that arrived while the same
// id was already being loaded; it is queued and replayed once that load ends.
type callBackFunDl[T any] struct {
	// fprocess is called with the item on success.
	fprocess func(p T)
	// fErrProcess is called with an error code and error on failure.
	fErrProcess func(errorCode int, err error)
	// allowCreate tells whether this request may create the item when the
	// load found none.
	allowCreate bool
}
- func (dl *DataLoader[P, T]) NewItemParam() P {
- if !dl.usePool {
- var t P
- return t
- }
- player := dl.playerPool.Get().(P)
- return player
- }
- func (dl *DataLoader[P, T]) putParamPool(param P) {
- dl.playerPool.Put(param)
- }
- func (dl *DataLoader[P, T]) LoadAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error)) {
- dl.loadAndProcess(param, fprocess, fErrProcess, true)
- }
- func (dl *DataLoader[P, T]) LoadOnlyAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error)) {
- dl.loadAndProcess(param, fprocess, fErrProcess, false)
- }
- func (dl *DataLoader[P, T]) loadAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error), allowCreate bool) {
- st := time.Now()
- //找到玩家
- strId := (param).IdString()
- p := dl.GetCacheData(strId)
- if !dl.isPointerNil(p) {
- if dl.usePool {
- defer dl.putParamPool(param)
- }
- if !dl.PrePocess(p, param) {
- fErrProcess(101, nil)
- return
- }
- dl.SetCacheData(strId, p)
- fprocess(p)
- monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess", dl.name), strId, st)
- return
- }
- //正在加载
- if dl.loadFilter.IsExist(strId) {
- if dl.usePool {
- defer dl.putParamPool(param)
- }
- t := new(callBackFunDl[T])
- t.fprocess = fprocess
- t.fErrProcess = fErrProcess
- t.allowCreate = allowCreate
- dl.loadFilter.AppendTask(strId, t)
- return
- }
- dl.loadFilter.Add(strId)
- var err error
- dl.Skeleton.Go(func() {
- //协程加载
- dl.tokens.Get()
- defer dl.tokens.Release()
- p, err = dl.LoadItem(param)
- }, func() {
- // exitReasonNoItem := false
- var errCode int
- defer func() {
- if dl.usePool {
- defer dl.putParamPool(param)
- }
- lst := dl.loadFilter.Remove(strId)
- // 仅加载后,发现不存在,但其他处理需要创建新对象,此时创建新对象
- for _, v := range lst {
- cft := v.(*callBackFunDl[T])
- if errCode == 104 && cft.allowCreate {
- p, err = dl.NewItem(param)
- if err != nil {
- errCode = 106
- fErrProcess(errCode, err)
- continue
- }
- if v := dl.GetCacheData(strId); !dl.isPointerNil(v) {
- p = v
- }
- if !dl.PrePocess(p, param) {
- errCode = 107
- err = errors.New("errCode:105")
- fErrProcess(errCode, err)
- continue
- }
- dl.SetCacheData(strId, p)
- // err = nil
- errCode = 0
- }
- if err != nil {
- cft.fErrProcess(errCode, err)
- } else {
- cft.fprocess(p)
- }
- }
- monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess", dl.name), strId, st)
- }()
- if err != nil {
- errCode = 102
- fErrProcess(errCode, err)
- return
- }
- if dl.isPointerNil(p) {
- if allowCreate {
- p, err = dl.NewItem(param)
- if err != nil {
- errCode = 103
- fErrProcess(errCode, err)
- return
- }
- } else {
- // exitReasonNoItem = true
- errCode = 104
- err = errors.New("errCode:104")
- fErrProcess(errCode, err)
- return
- }
- }
- if v := dl.GetCacheData(strId); !dl.isPointerNil(v) {
- p = v
- }
- if !dl.PrePocess(p, param) {
- errCode = 105
- err = errors.New("errCode:105")
- fErrProcess(errCode, err)
- return
- }
- dl.SetCacheData(strId, p)
- fprocess(p)
- })
- }
- func (dl *DataLoader[P, T]) IsNoExistErrorCode(code int) bool {
- return (code == 104)
- }
- func (dl *DataLoader[P, T]) Go(loadMany func(), fprocess func()) {
- st := time.Now()
- dl.Skeleton.Go(func() {
- //协程加载
- dl.tokens.Get()
- defer dl.tokens.Release()
- loadMany()
- }, func() {
- defer func() {
- monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess Go", dl.name), "many", st)
- }()
- fprocess()
- })
- }
- // idiomsCache = NewDataLoader[*PlatPlayer, *model.IdiomMatch](5*time.Minute, true)
- // idiomsCache.NewItem = NewObj
- // idiomsCache.LoadItem = LoadPlayer
- // idiomsCache.PrePocess = PrePocess
- // idiomsCache.Skeleton = skeleton
- // func NewPlayer(param *memcache.PlatPlayer) (*model.CurioMake, error){}
- //func LoadPlayer(param *memcache.PlatPlayer) (*model.CurioMake, error) {}
- //func preProcess(p *model.CurioMake, param *memcache.PlatPlayer) bool {}
- // var allPlayerCache *memcache.DataLoader[memcache.LoadId, *ChapterPlayer]
- // func InitCache(skeleton *module.Skeleton) {
- // allPlayerCache = memcache.NewDataLoader[memcache.LoadId, *ChapterPlayer](5*time.Minute, false)
- // allPlayerCache.NewItem = NewPlayer
- // allPlayerCache.LoadItem = LoadPlayer
- // allPlayerCache.PrePocess = preProcess
- // allPlayerCache.Skeleton = skeleton
- // }
- // func NewPlayer(param memcache.LoadId) (*ChapterPlayer, error) { return nil, nil }
- // func LoadPlayer(param memcache.LoadId) (*ChapterPlayer, error) { return nil, nil }
- // func preProcess(p *ChapterPlayer, param memcache.LoadId) bool { return true }
- // p := allCurioPlayers.NewItemParam()
- // p.PlayerId = request.PlayerId
- // p.Plat = request.ClientPlat
- // allCurioPlayers.LoadAndProcess(p, processSucess, processError)
|