123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- package memcache
- import (
- "fmt"
- "leafstalk/covenant/monitor"
- "leafstalk/module"
- "leafstalk/otherutils"
- "reflect"
- "strconv"
- "time"
- cach2 "github.com/patrickmn/go-cache"
- )
- type DirtyChecker interface {
- IsDirty() bool
- }
- type AppendCache[T DirtyChecker] struct {
- allPlayers *cach2.Cache
- tokens *otherutils.RoutineTokens
- loadFilter otherutils.FilterInt64
- skeleton *module.Skeleton
- name string
- Load func(id int64) (T, error)
- New func(playerId int64) T
- PreProcess func(T)
- LoadDirty func(v T) error
- }
- func NewAppendCache[T DirtyChecker](aliveTl time.Duration, skeleton *module.Skeleton) *AppendCache[T] {
- c := new(AppendCache[T])
- c.skeleton = skeleton
- c.allPlayers = cach2.New(aliveTl, 10*time.Minute)
- c.tokens = otherutils.NewRoutineTokens(10)
- var tmp T
- val := reflect.TypeOf(tmp)
- c.name = val.Elem().Name()
- return c
- }
- type callBackFun2[T interface{}] struct {
- fprocess func(p T)
- fErrProcess func(playerId int64, err error)
- }
- func (c *AppendCache[T]) GetCacheData(playerID int64) (T, bool) {
- k := strconv.FormatInt(playerID, 10)
- if v, ok := c.allPlayers.Get(k); ok {
- if v2, ok := v.(T); ok {
- return v2, true
- }
- }
- var zeroVal T
- return zeroVal, false
- }
- func (c *AppendCache[T]) SetCacheData(playerID int64, v T) {
- k2 := strconv.FormatInt(playerID, 10)
- c.allPlayers.SetDefault(k2, v)
- }
- func (c *AppendCache[T]) DeleteCacheData(playerID int64) {
- k2 := strconv.FormatInt(playerID, 10)
- c.allPlayers.Delete(k2)
- }
- func (c *AppendCache[T]) FlushCacheData() {
- c.allPlayers.Flush()
- }
- func (c *AppendCache[T]) Items(callBack func(playerId int64, val T)) {
- items := c.allPlayers.Items()
- for k, v := range items {
- k2, err := strconv.ParseInt(k, 10, 64)
- if err != nil {
- continue
- }
- v2, ok2 := v.Object.(T)
- if ok2 {
- callBack(k2, v2)
- }
- }
- }
- func (c *AppendCache[T]) LoadAndProcess(playerId int64, fprocess func(p T), fErrProcess func(playerId int64, err error)) {
- //找到玩家
- isDirty := false
- st := time.Now()
- p, has := c.GetCacheData(playerId)
- if has {
- isDirty = p.IsDirty()
- if !isDirty {
- c.PreProcess(p)
- c.SetCacheData(playerId, p)
- fprocess(p)
- monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess", c.name), playerId, st)
- return
- }
- }
- //正在加载
- if c.loadFilter.IsExist(playerId) {
- t := new(callBackFun2[T])
- t.fprocess = fprocess
- t.fErrProcess = fErrProcess
- c.loadFilter.AppendTask(playerId, t)
- return
- }
- c.loadFilter.Add(playerId)
- var err error
- c.skeleton.Go(func() {
- //协程加载
- c.tokens.Acquire()
- defer c.tokens.Release()
- if isDirty {
- err = c.LoadDirty(p)
- } else {
- p, err = c.Load(playerId)
- }
- }, func() {
- defer func() {
- s := c.loadFilter.Remove(playerId)
- for _, v := range s {
- cft := v.(*callBackFun2[T])
- if err != nil {
- cft.fErrProcess(playerId, err)
- } else {
- cft.fprocess(p)
- }
- }
- monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess1", c.name), playerId, st)
- }()
- if err != nil {
- fErrProcess(playerId, err)
- return
- }
- if isZero2(p) {
- p = c.New(playerId)
- }
- c.PreProcess(p)
- c.SetCacheData(playerId, p)
- fprocess(p)
- })
- }
- func isZero2[T any](v T) bool {
- return reflect.ValueOf(v).IsZero()
- }
- // func IsNil(i interface{}) bool {
- // if i == nil {
- // return true
- // }
- // switch reflect.TypeOf(i).Kind() {
- // case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice:
- // return reflect.ValueOf(i).IsNil()
- // }
- // return false
- // }
- // func (c *AppendCache[T]) LoadManyAndNoCacheProcess(loadMany func() (map[int64]*T, error), fprocess func(players []*T), fErrProcess func(err error)) {
- // var ps map[int64]*T
- // var err error
- // c.skeleton.Go(func() {
- // //协程加载
- // c.tokens.Acquire()
- // defer c.tokens.Release()
- // ps, err = loadMany()
- // }, func() {
- // defer func() {
- // }()
- // if err != nil {
- // fErrProcess(err)
- // return
- // }
- // lst := make([]*T, 0, len(ps))
- // for id, v := range ps {
- // old := c.GetCacheData(id)
- // if old != nil {
- // v = old
- // }
- // c.PreProcess(v)
- // // c.SetCacheData(id, v)
- // lst = append(lst, v)
- // }
- // fprocess(lst)
- // })
- // }
- // func (c *AppendCache[T]) LoadMultiAndProcess(playerId1 int64, playerId2 int64, fprocess func(p1 *T, p2 *T), fErrProcess func(err error)) {
- // c.LoadAndProcess(playerId1, func(p1 *T) {
- // c.LoadAndProcess(playerId2, func(p2 *T) {
- // fprocess(p1, p2)
- // }, func(playerId int64, err error) {
- // fErrProcess(err)
- // })
- // }, func(playerId int64, err error) {
- // fErrProcess(err)
- // })
- // }
- // func NewPlayer(playerId int64) *PlayerTasks {
- // return nil
- // }
- // func LoadPlayer(playerId int64) (*PlayerTasks, error) {
- // return nil, nil
- // }
- // func preProcess(p *PlayerTasks) {
- // }
- // allPayPlayers = memcache.NewCache[PlayerTasks](5*time.Minute, m.Skeleton) //.Init(LoadPlayer, NewObj, PrePocess)
- // allPayPlayers.New = NewPlayer
- // allPayPlayers.Load = LoadPlayer
- // allPayPlayers.PreProcess = preProcess
- // type PlayerTasks struct {
- // PlayerID int64
- // Tasks []string
- // }
- // func NewPlayer(playerId int64) *PlayerTasks {
- // return &PlayerTasks{
- // PlayerID: playerId,
- // Tasks: []string{"Task1", "Task2"},
- // }
- // }
- // // 实现 Load 函数,用于加载 PlayerTasks 对象
- // func LoadPlayer(playerId int64) (*PlayerTasks, error) {
- // // 模拟从数据库或其他存储中加载数据
- // return &PlayerTasks{
- // PlayerID: playerId,
- // Tasks: []string{"Task3", "Task4"},
- // }, nil
- // }
- // // 实现 PreProcess 函数,用于预处理 PlayerTasks 对象
- // func preProcess(p *PlayerTasks) bool {
- // // 模拟一些预处理操作
- // p.Tasks = append(p.Tasks, "PreprocessedTask")
- // return true
- // }
- // func example3() {
- // // 初始化 Skeleton(假设 Skeleton 已经定义)
- // skeleton := &module.Skeleton{}
- // // 创建一个 AppendCache 实例,缓存存活时间为 5 分钟
- // cache := NewGenericCache[PlayerTasks](5*time.Minute, skeleton)
- // // 设置 New、Load 和 PreProcess 函数
- // cache.New = NewPlayer
- // cache.Load = LoadPlayer
- // cache.PreProcess = preProcess
- // // 加载并处理玩家数据
- // playerID := int64(123)
- // cache.LoadAndProcess(playerID, func(p *PlayerTasks) {
- // // 处理加载成功的玩家数据
- // fmt.Printf("Player %d tasks: %v\n", p.PlayerID, p.Tasks)
- // }, func(playerId int64, err error) {
- // // 处理加载失败的情况
- // fmt.Printf("Failed to load player %d, error code: %d\n", playerId, err)
- // })
- // // 等待一段时间,以便协程完成
- // time.Sleep(1 * time.Second)
- // }
|