package memcache import ( "errors" "fmt" "leafstalk/covenant/monitor" "leafstalk/module" "leafstalk/otherutils" "reflect" "strconv" "sync" "time" cach2 "github.com/patrickmn/go-cache" ) type ItemParam interface { IdString() string } type PlatPlayer struct { PlayerId int64 Plat string } func (p *PlatPlayer) IdString() string { return strconv.FormatInt(p.PlayerId, 10) } type LoadId int64 func (l LoadId) IdString() string { return strconv.FormatInt(int64(l), 10) } // 数据加载器类 K CacheItemParam, type DataLoader[P ItemParam, T any] struct { name string allCacheItems *cach2.Cache tokens otherutils.RoutineTokens loadFilter otherutils.FilterString playerPool *sync.Pool usePool bool // 这几项需要赋值 Skeleton *module.Skeleton NewItem func(param P) (T, error) LoadItem func(param P) (T, error) PrePocess func(T, P) bool } func (dl *DataLoader[P, T]) isPointerNil(val T) bool { vi := reflect.ValueOf(val) return vi.IsNil() } // 创建新的数据加载器 func NewDataLoader[P ItemParam, T any](aliveTl time.Duration, useParamPool bool) *DataLoader[P, T] { var tmp *T val := reflect.TypeOf(tmp) if val == nil || val.Kind() != reflect.Ptr { return nil } name := val.Elem().Name() dl := new(DataLoader[P, T]) dl.allCacheItems = cach2.New(aliveTl, 10*time.Minute) dl.tokens.Init(10) dl.name = name if useParamPool { var tmp2 P msgType := reflect.TypeOf(tmp2) if msgType != nil && msgType.Kind() == reflect.Ptr { dl.usePool = true dl.playerPool = &sync.Pool{ New: func() interface{} { return reflect.New(msgType.Elem()).Interface() }, } } } return dl } func (dl *DataLoader[P, T]) GetCacheData(strId string) (df T) { if v, ok := dl.allCacheItems.Get(strId); ok { v2, _ := v.(T) return v2 } return } func (dl *DataLoader[P, T]) SetCacheData(strId string, v T) { dl.allCacheItems.SetDefault(strId, v) } func (dl *DataLoader[P, T]) DeleteCacheData(strId string) { dl.allCacheItems.Delete(strId) } func (dl *DataLoader[P, T]) Items() []T { var lst []T kvs := dl.allCacheItems.Items() for _, v := range kvs { v2, ok := v.Object.(T) if ok { lst = append(lst, v2) } } return lst } func (dl *DataLoader[P, T]) SetEvictedCallBack(f func(string, T)) { dl.allCacheItems.OnEvicted(func(key string, val interface{}) { v2, _ := val.(T) f(key, v2) }) } // 需要实现一个更新时长的函数 type callBackFunDl[T any] struct { fprocess func(p T) fErrProcess func(errorCode int, err error) allowCreate bool } func (dl *DataLoader[P, T]) NewItemParam() P { if !dl.usePool { var t P return t } player := dl.playerPool.Get().(P) return player } func (dl *DataLoader[P, T]) putParamPool(param P) { dl.playerPool.Put(param) } func (dl *DataLoader[P, T]) LoadAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error)) { dl.loadAndProcess(param, fprocess, fErrProcess, true) } func (dl *DataLoader[P, T]) LoadOnlyAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error)) { dl.loadAndProcess(param, fprocess, fErrProcess, false) } func (dl *DataLoader[P, T]) loadAndProcess(param P, fprocess func(p T), fErrProcess func(errorCode int, err error), allowCreate bool) { st := time.Now() //找到玩家 strId := (param).IdString() p := dl.GetCacheData(strId) if !dl.isPointerNil(p) { if dl.usePool { defer dl.putParamPool(param) } if !dl.PrePocess(p, param) { fErrProcess(101, nil) return } dl.SetCacheData(strId, p) fprocess(p) monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess", dl.name), strId, st) return } //正在加载 if dl.loadFilter.IsExist(strId) { if dl.usePool { defer dl.putParamPool(param) } t := new(callBackFunDl[T]) t.fprocess = fprocess t.fErrProcess = fErrProcess t.allowCreate = allowCreate dl.loadFilter.AppendTask(strId, t) return } dl.loadFilter.Add(strId) var err error dl.Skeleton.Go(func() { //协程加载 dl.tokens.Get() defer dl.tokens.Release() p, err = dl.LoadItem(param) }, func() { // exitReasonNoItem := false var errCode int defer func() { if dl.usePool { defer dl.putParamPool(param) } lst := dl.loadFilter.Remove(strId) // 仅加载后,发现不存在,但其他处理需要创建新对象,此时创建新对象 for _, v := range lst { cft := v.(*callBackFunDl[T]) if errCode == 104 && cft.allowCreate { p, err = dl.NewItem(param) if err != nil { errCode = 106 fErrProcess(errCode, err) continue } if v := dl.GetCacheData(strId); !dl.isPointerNil(v) { p = v } if !dl.PrePocess(p, param) { errCode = 107 err = errors.New("errCode:105") fErrProcess(errCode, err) continue } dl.SetCacheData(strId, p) // err = nil errCode = 0 } if err != nil { cft.fErrProcess(errCode, err) } else { cft.fprocess(p) } } monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess", dl.name), strId, st) }() if err != nil { errCode = 102 fErrProcess(errCode, err) return } if dl.isPointerNil(p) { if allowCreate { p, err = dl.NewItem(param) if err != nil { errCode = 103 fErrProcess(errCode, err) return } } else { // exitReasonNoItem = true errCode = 104 err = errors.New("errCode:104") fErrProcess(errCode, err) return } } if v := dl.GetCacheData(strId); !dl.isPointerNil(v) { p = v } if !dl.PrePocess(p, param) { errCode = 105 err = errors.New("errCode:105") fErrProcess(errCode, err) return } dl.SetCacheData(strId, p) fprocess(p) }) } func (dl *DataLoader[P, T]) IsNoExistErrorCode(code int) bool { return (code == 104) } func (dl *DataLoader[P, T]) Go(loadMany func(), fprocess func()) { st := time.Now() dl.Skeleton.Go(func() { //协程加载 dl.tokens.Get() defer dl.tokens.Release() loadMany() }, func() { defer func() { monitor.GoLoadTimeoutWarn2(fmt.Sprintf("%s.LoadAndProcess Go", dl.name), "many", st) }() fprocess() }) } // idiomsCache = NewDataLoader[*PlatPlayer, model.IdiomMatch](5 * time.Minute) // idiomsCache.NewItem = NewObj // idiomsCache.LoadItem = LoadPlayer // idiomsCache.PrePocess = PrePocess // idiomsCache.Skeleton = skeleton // func NewPlayer(param *memcache.PlatPlayer) (*model.CurioMake, error){} //func LoadPlayer(param *memcache.PlatPlayer) (*model.CurioMake, error) {} //func preProcess(p *model.CurioMake, param *memcache.PlatPlayer) bool {} // var allPlayerCache *memcache.DataLoader[memcache.LoadId, ChapterPlayer] // func InitCache(skeleton *module.Skeleton) { // allPlayerCache = memcache.NewDataLoader[memcache.LoadId, ChapterPlayer](5*time.Minute, false) // allPlayerCache.NewItem = NewPlayer // allPlayerCache.LoadItem = LoadPlayer // allPlayerCache.PrePocess = preProcess // allPlayerCache.Skeleton = skeleton // } // func NewPlayer(param memcache.LoadId) (*ChapterPlayer, error) { return nil, nil } // func LoadPlayer(param memcache.LoadId) (*ChapterPlayer, error) { return nil, nil } // func preProcess(p *ChapterPlayer, param memcache.LoadId) bool { return true } // p := allCurioPlayers.NewItemParam() // p.PlayerId = request.PlayerId // p.Plat = request.ClientPlat // allCurioPlayers.LoadAndProcess(p, processSucess, processError)