package memcache

import (
	"fmt"
	"reflect"
	"strconv"
	"time"

	"leafstalk/covenant/monitor"
	"leafstalk/module"
	"leafstalk/otherutils"

	cach2 "github.com/patrickmn/go-cache"
)

// DirtyChecker is the constraint for values stored in an AppendCache.
// LoadAndProcess calls IsDirty on a cached value to decide whether it can be
// served directly or must first be refreshed through LoadDirty.
type DirtyChecker interface {
	IsDirty() bool
}

// AppendCache is an expiring in-memory cache of per-player values keyed by
// player ID. Cache misses and dirty entries are loaded asynchronously through
// the skeleton, and concurrent requests for the same player are queued behind
// the load that is already in flight.
type AppendCache[T DirtyChecker] struct {
	allPlayers *cach2.Cache             // backing store; keys are decimal player IDs
	tokens     *otherutils.RoutineTokens // acquired/released around every background load
	loadFilter otherutils.FilterInt64    // tracks in-flight loads and their queued callbacks
	skeleton   *module.Skeleton          // runs the load and then its completion callback
	name       string                    // element type name, used in monitor labels

	// Load fetches the value for id. It is called on a cache miss.
	// Must be set before LoadAndProcess is used.
	Load func(id int64) (T, error)
	// New builds a fresh value when Load succeeds but returns the zero value.
	// Must be set before LoadAndProcess is used.
	New func(playerId int64) T
	// PreProcess is run on a value right before it is (re)stored in the cache
	// and handed to the caller. It is optional; a nil PreProcess is skipped.
	PreProcess func(T)
	// LoadDirty refreshes a cached value whose IsDirty reports true.
	// Must be set if cached values can ever become dirty.
	LoadDirty func(v T) error
}

// NewAppendCache creates a cache whose entries use aliveTl as their default
// expiration, with a 10-minute cleanup interval passed to go-cache and 10
// routine tokens for background loads. The caller must assign Load and New
// (and LoadDirty / PreProcess when needed) on the returned cache.
func NewAppendCache[T DirtyChecker](aliveTl time.Duration, skeleton *module.Skeleton) *AppendCache[T] {
	c := new(AppendCache[T])
	c.skeleton = skeleton
	c.allPlayers = cach2.New(aliveTl, 10*time.Minute)
	c.tokens = otherutils.NewRoutineTokens(10)

	// Derive the type from *T rather than from a zero T value: TypeOf on a
	// zero value yields nil when T is an interface type, and calling Elem on a
	// non-pointer type panics. Going through *T is safe for every T.
	typ := reflect.TypeOf((*T)(nil)).Elem()
	if typ.Kind() == reflect.Ptr {
		typ = typ.Elem()
	}
	c.name = typ.Name()
	return c
}

// callBackFun2 holds the success and error callbacks of a request that
// arrived while a load for the same player was already in flight.
type callBackFun2[T any] struct {
	fprocess    func(p T)
	fErrProcess func(playerId int64, err error)
}

// GetCacheData returns the cached value for playerID. The boolean is false
// when there is no entry or the stored object is not a T.
func (c *AppendCache[T]) GetCacheData(playerID int64) (T, bool) {
	key := strconv.FormatInt(playerID, 10)
	if obj, found := c.allPlayers.Get(key); found {
		if val, isT := obj.(T); isT {
			return val, true
		}
	}
	var zeroVal T
	return zeroVal, false
}

// SetCacheData stores v under playerID using the cache's default expiration.
func (c *AppendCache[T]) SetCacheData(playerID int64, v T) {
	c.allPlayers.SetDefault(strconv.FormatInt(playerID, 10), v)
}

// DeleteCacheData removes the entry for playerID, if any.
func (c *AppendCache[T]) DeleteCacheData(playerID int64) {
	c.allPlayers.Delete(strconv.FormatInt(playerID, 10))
}

// FlushCacheData removes every entry from the cache.
func (c *AppendCache[T]) FlushCacheData() {
	c.allPlayers.Flush()
}

// Items invokes callBack for every entry returned by the backing cache's
// Items snapshot. Entries whose key is not a decimal int64 or whose object is
// not a T are skipped.
func (c *AppendCache[T]) Items(callBack func(playerId int64, val T)) {
	for key, item := range c.allPlayers.Items() {
		id, err := strconv.ParseInt(key, 10, 64)
		if err != nil {
			continue
		}
		if val, isT := item.Object.(T); isT {
			callBack(id, val)
		}
	}
}

// LoadAndProcess obtains the value for playerId and passes it to fprocess.
//
//   - Cached and clean: PreProcess runs, the entry is re-stored (which renews
//     its expiration) and fprocess is called synchronously.
//   - A load for playerId is already in flight: the callbacks are queued and
//     invoked when that load completes.
//   - Otherwise: a load is started through the skeleton. A dirty cached value
//     is refreshed with LoadDirty, a missing one is fetched with Load. If the
//     load succeeds with a zero value, New creates a fresh one.
//
// On failure fErrProcess is called instead of fprocess, for the initiating
// request and for every queued one.
func (c *AppendCache[T]) LoadAndProcess(playerId int64, fprocess func(p T), fErrProcess func(playerId int64, err error)) {
	st := time.Now()

	// Look the player up in the cache.
	p, has := c.GetCacheData(playerId)
	isDirty := has && p.IsDirty()
	if has && !isDirty {
		if c.PreProcess != nil {
			c.PreProcess(p)
		}
		c.SetCacheData(playerId, p)
		fprocess(p)
		monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess", c.name), playerId, st)
		return
	}

	// A load is already in progress: queue behind it.
	if c.loadFilter.IsExist(playerId) {
		c.loadFilter.AppendTask(playerId, &callBackFun2[T]{
			fprocess:    fprocess,
			fErrProcess: fErrProcess,
		})
		return
	}
	c.loadFilter.Add(playerId)

	var err error
	c.skeleton.Go(func() {
		// Load in a goroutine, bounded by the routine tokens.
		c.tokens.Acquire()
		defer c.tokens.Release()

		if isDirty {
			err = c.LoadDirty(p)
		} else {
			p, err = c.Load(playerId)
		}
	}, func() {
		// Runs after the initiating request has been served (or has failed):
		// release the in-flight marker and serve every queued request with
		// the same outcome.
		defer func() {
			for _, task := range c.loadFilter.Remove(playerId) {
				waiter, ok := task.(*callBackFun2[T])
				if !ok {
					continue
				}
				if err != nil {
					waiter.fErrProcess(playerId, err)
				} else {
					waiter.fprocess(p)
				}
			}
			monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess1", c.name), playerId, st)
		}()

		if err != nil {
			fErrProcess(playerId, err)
			return
		}

		// Nothing stored for this player yet: start from a fresh value.
		if isZero2(p) {
			p = c.New(playerId)
		}
		if c.PreProcess != nil {
			c.PreProcess(p)
		}
		c.SetCacheData(playerId, p)
		fprocess(p)
	})
}

// isZero2 reports whether v is the zero value of its type. A nil interface
// value yields an invalid reflect.Value, on which IsZero would panic, so it is
// treated as zero explicitly.
func isZero2[T any](v T) bool {
	rv := reflect.ValueOf(v)
	return !rv.IsValid() || rv.IsZero()
}

// func IsNil(i interface{}) bool {
// 	if i == nil {
// 		return true
// 	}
// 	switch reflect.TypeOf(i).Kind() {
// 	case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice:
// 		return reflect.ValueOf(i).IsNil()
// 	}
// 	return false
// }

// func (c *AppendCache[T]) LoadManyAndNoCacheProcess(loadMany func() (map[int64]*T, error), fprocess func(players []*T), fErrProcess func(err error)) {
// 	var ps map[int64]*T
// 	var err error
// 	c.skeleton.Go(func() {
// 		// load in a goroutine
// 		c.tokens.Acquire()
// 		defer c.tokens.Release()
// 		ps, err = loadMany()
// 	}, func() {
// 		defer func() {
// 		}()
// 		if err != nil {
// 			fErrProcess(err)
// 			return
// 		}
// 		lst := make([]*T, 0, len(ps))
// 		for id, v := range ps {
// 			old := c.GetCacheData(id)
// 			if old != nil {
// 				v = old
// 			}
// 			c.PreProcess(v)
// 			// c.SetCacheData(id, v)
// 			lst = append(lst, v)
// 		}
// 		fprocess(lst)
// 	})
// }

// func (c *AppendCache[T]) LoadMultiAndProcess(playerId1 int64, playerId2 int64, fprocess func(p1 *T, p2 *T), fErrProcess func(err error)) {
// 	c.LoadAndProcess(playerId1, func(p1 *T) {
// 		c.LoadAndProcess(playerId2, func(p2 *T) {
// 			fprocess(p1, p2)
// 		}, func(playerId int64, err error) {
// 			fErrProcess(err)
// 		})
// 	}, func(playerId int64, err error) {
// 		fErrProcess(err)
// 	})
// }

// func NewPlayer(playerId int64) *PlayerTasks {
// 	return nil
// }
// func LoadPlayer(playerId int64) (*PlayerTasks, error) {
// 	return nil, nil
// }
// func preProcess(p *PlayerTasks) {
// }
// allPayPlayers = memcache.NewCache[PlayerTasks](5*time.Minute, m.Skeleton) //.Init(LoadPlayer, NewObj, PrePocess)
// allPayPlayers.New = NewPlayer
// allPayPlayers.Load = LoadPlayer
// allPayPlayers.PreProcess = preProcess

// type PlayerTasks struct {
// 	PlayerID int64
// 	Tasks    []string
// }

// func NewPlayer(playerId int64) *PlayerTasks {
// 	return &PlayerTasks{
// 		PlayerID: playerId,
// 		Tasks:    []string{"Task1", "Task2"},
// 	}
// }

// // Load implementation: loads a PlayerTasks object.
// func LoadPlayer(playerId int64) (*PlayerTasks, error) {
// 	// Simulate loading the data from a database or other storage.
// 	return &PlayerTasks{
// 		PlayerID: playerId,
// 		Tasks:    []string{"Task3", "Task4"},
// 	}, nil
// }

// // PreProcess implementation: pre-processes a PlayerTasks object.
// func preProcess(p *PlayerTasks) bool {
// 	// Simulate some pre-processing work.
// 	p.Tasks = append(p.Tasks, "PreprocessedTask")
// 	return true
// }

// func example3() {
// 	// Initialize the Skeleton (assuming Skeleton is already defined).
// 	skeleton := &module.Skeleton{}
// 	// Create an AppendCache instance with a 5-minute entry lifetime.
// 	cache := NewGenericCache[PlayerTasks](5*time.Minute, skeleton)
// 	// Set the New, Load and PreProcess functions.
// 	cache.New = NewPlayer
// 	cache.Load = LoadPlayer
// 	cache.PreProcess = preProcess
// 	// Load and process the player data.
// 	playerID := int64(123)
// 	cache.LoadAndProcess(playerID, func(p *PlayerTasks) {
// 		// Handle successfully loaded player data.
// 		fmt.Printf("Player %d tasks: %v\n", p.PlayerID, p.Tasks)
// 	}, func(playerId int64, err error) {
// 		// Handle the load failure.
// 		fmt.Printf("Failed to load player %d, error code: %d\n", playerId, err)
// 	})
// 	// Wait a while so the goroutine can finish.
// 	time.Sleep(1 * time.Second)
// }