package memcache import ( "fmt" "leafstalk/covenant/monitor" "leafstalk/log" "leafstalk/module" "leafstalk/otherutils" "reflect" "strconv" "time" cach2 "github.com/patrickmn/go-cache" ) type GenericCache[T interface{}] struct { allPlayers *cach2.Cache tokens *otherutils.RoutineTokens loadFilter otherutils.FilterInt64 skeleton *module.Skeleton name string Load func(id int64) (*T, error) New func(playerId int64) *T PreProcess func(*T) } func NewGenericCache[T interface{}](aliveTl time.Duration, skeleton *module.Skeleton) *GenericCache[T] { c := new(GenericCache[T]) c.skeleton = skeleton c.allPlayers = cach2.New(aliveTl, 10*time.Minute) c.tokens = otherutils.NewRoutineTokens(10) var tmp *T val := reflect.TypeOf(tmp) c.name = val.Elem().Name() return c } type callBackFun[T interface{}] struct { fprocess func(p *T) fErrProcess func(playerId int64, err error) } func (c *GenericCache[T]) GetCacheData(playerID int64) *T { k := strconv.FormatInt(playerID, 10) if v, ok := c.allPlayers.Get(k); ok { if v2, ok := v.(*T); ok { return v2 } } return nil } func (c *GenericCache[T]) SetCacheData(playerID int64, v *T) { k2 := strconv.FormatInt(playerID, 10) c.allPlayers.SetDefault(k2, v) } func (c *GenericCache[T]) DeleteCacheData(playerID int64) { k2 := strconv.FormatInt(playerID, 10) c.allPlayers.Delete(k2) } func (c *GenericCache[T]) Items(callBack func(playerId int64, val *T)) { items := c.allPlayers.Items() for k, v := range items { k2, err := strconv.ParseInt(k, 10, 64) if err != nil { continue } v2, ok2 := v.Object.(*T) if ok2 { callBack(k2, v2) } } } func (c *GenericCache[T]) LoadAndProcess(playerId int64, fprocess func(p *T), fErrProcess func(playerId int64, err error)) { //找到玩家 st := time.Now() p := c.GetCacheData(playerId) if p != nil { c.PreProcess(p) c.SetCacheData(playerId, p) fprocess(p) monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess", c.name), playerId, st) return } //正在加载 if c.loadFilter.IsExist(playerId) { t := new(callBackFun[T]) t.fprocess = fprocess t.fErrProcess = fErrProcess c.loadFilter.AppendTask(playerId, t) return } c.loadFilter.Add(playerId) var err error c.skeleton.Go(func() { //协程加载 c.tokens.Acquire() defer c.tokens.Release() p, err = c.Load(playerId) if err != nil { log.Errorf("Load %s error %v", c.name, err) } }, func() { defer func() { s := c.loadFilter.Remove(playerId) for _, v := range s { cft := v.(*callBackFun[T]) if err != nil { cft.fErrProcess(playerId, err) } else { cft.fprocess(p) } } monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess1", c.name), playerId, st) }() if err != nil { fErrProcess(playerId, err) return } old := c.GetCacheData(playerId) if old != nil { p = old } if IsNil(p) || p == nil { p = c.New(playerId) } c.PreProcess(p) c.SetCacheData(playerId, p) fprocess(p) }) } func IsNil(i interface{}) bool { if i == nil { return true } switch reflect.TypeOf(i).Kind() { case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice: return reflect.ValueOf(i).IsNil() } return false } func (c *GenericCache[T]) LoadManyAndNoCacheProcess(loadMany func() (map[int64]*T, error), fprocess func(players []*T), fErrProcess func(err error)) { var ps map[int64]*T var err error c.skeleton.Go(func() { //协程加载 c.tokens.Acquire() defer c.tokens.Release() ps, err = loadMany() }, func() { defer func() { }() if err != nil { fErrProcess(err) return } lst := make([]*T, 0, len(ps)) for id, v := range ps { old := c.GetCacheData(id) if old != nil { v = old } c.PreProcess(v) // c.SetCacheData(id, v) lst = append(lst, v) } fprocess(lst) }) } func (c *GenericCache[T]) LoadMultiAndProcess(playerId1 int64, playerId2 int64, fprocess func(p1 *T, p2 *T), fErrProcess func(err error)) { c.LoadAndProcess(playerId1, func(p1 *T) { c.LoadAndProcess(playerId2, func(p2 *T) { fprocess(p1, p2) }, func(playerId int64, err error) { fErrProcess(err) }) }, func(playerId int64, err error) { fErrProcess(err) }) } // func NewPlayer(playerId int64) *PlayerTasks { // return nil // } // func LoadPlayer(playerId int64) (*PlayerTasks, error) { // return nil, nil // } // func preProcess(p *PlayerTasks) { // } // allPayPlayers = memcache.NewCache[PlayerTasks](5*time.Minute, m.Skeleton) //.Init(LoadPlayer, NewObj, PrePocess) // allPayPlayers.New = NewPlayer // allPayPlayers.Load = LoadPlayer // allPayPlayers.PreProcess = preProcess type PlayerTasks struct { PlayerID int64 Tasks []string } func NewPlayer(playerId int64) *PlayerTasks { return &PlayerTasks{ PlayerID: playerId, Tasks: []string{"Task1", "Task2"}, } } // 实现 Load 函数,用于加载 PlayerTasks 对象 func LoadPlayer(playerId int64) (*PlayerTasks, error) { // 模拟从数据库或其他存储中加载数据 return &PlayerTasks{ PlayerID: playerId, Tasks: []string{"Task3", "Task4"}, }, nil } // 实现 PreProcess 函数,用于预处理 PlayerTasks 对象 func preProcess(p *PlayerTasks) { // 模拟一些预处理操作 p.Tasks = append(p.Tasks, "PreprocessedTask") return } func example3() { // 初始化 Skeleton(假设 Skeleton 已经定义) skeleton := &module.Skeleton{} // 创建一个 GenericCache 实例,缓存存活时间为 5 分钟 cache := NewGenericCache[PlayerTasks](5*time.Minute, skeleton) // 设置 New、Load 和 PreProcess 函数 cache.New = NewPlayer cache.Load = LoadPlayer cache.PreProcess = preProcess // 加载并处理玩家数据 playerID := int64(123) cache.LoadAndProcess(playerID, func(p *PlayerTasks) { // 处理加载成功的玩家数据 fmt.Printf("Player %d tasks: %v\n", p.PlayerID, p.Tasks) }, func(playerId int64, err error) { // 处理加载失败的情况 fmt.Printf("Failed to load player %d, error code: %d\n", playerId, err) }) // 等待一段时间,以便协程完成 time.Sleep(1 * time.Second) }