123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- package memcache
- import (
- "fmt"
- "leafstalk/covenant/monitor"
- "leafstalk/log"
- "leafstalk/module"
- "leafstalk/otherutils"
- "reflect"
- "strconv"
- "time"
- cach2 "github.com/patrickmn/go-cache"
- )
- type GenericCache[T interface{}] struct {
- allPlayers *cach2.Cache
- tokens *otherutils.RoutineTokens
- loadFilter otherutils.FilterInt64
- skeleton *module.Skeleton
- name string
- Load func(id int64) (*T, error)
- New func(playerId int64) *T
- PreProcess func(*T)
- }
- func NewGenericCache[T interface{}](aliveTl time.Duration, skeleton *module.Skeleton) *GenericCache[T] {
- c := new(GenericCache[T])
- c.skeleton = skeleton
- c.allPlayers = cach2.New(aliveTl, 10*time.Minute)
- c.tokens = otherutils.NewRoutineTokens(10)
- var tmp *T
- val := reflect.TypeOf(tmp)
- c.name = val.Elem().Name()
- return c
- }
- type callBackFun[T interface{}] struct {
- fprocess func(p *T)
- fErrProcess func(playerId int64, err error)
- }
- func (c *GenericCache[T]) GetCacheData(playerID int64) *T {
- k := strconv.FormatInt(playerID, 10)
- if v, ok := c.allPlayers.Get(k); ok {
- if v2, ok := v.(*T); ok {
- return v2
- }
- }
- return nil
- }
- func (c *GenericCache[T]) SetCacheData(playerID int64, v *T) {
- k2 := strconv.FormatInt(playerID, 10)
- c.allPlayers.SetDefault(k2, v)
- }
- func (c *GenericCache[T]) DeleteCacheData(playerID int64) {
- k2 := strconv.FormatInt(playerID, 10)
- c.allPlayers.Delete(k2)
- }
- func (c *GenericCache[T]) Items(callBack func(playerId int64, val *T)) {
- items := c.allPlayers.Items()
- for k, v := range items {
- k2, err := strconv.ParseInt(k, 10, 64)
- if err != nil {
- continue
- }
- v2, ok2 := v.Object.(*T)
- if ok2 {
- callBack(k2, v2)
- }
- }
- }
- func (c *GenericCache[T]) LoadAndProcess(playerId int64, fprocess func(p *T), fErrProcess func(playerId int64, err error)) {
- //找到玩家
- st := time.Now()
- p := c.GetCacheData(playerId)
- if p != nil {
- c.PreProcess(p)
- c.SetCacheData(playerId, p)
- fprocess(p)
- monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess", c.name), playerId, st)
- return
- }
- //正在加载
- if c.loadFilter.IsExist(playerId) {
- t := new(callBackFun[T])
- t.fprocess = fprocess
- t.fErrProcess = fErrProcess
- c.loadFilter.AppendTask(playerId, t)
- return
- }
- c.loadFilter.Add(playerId)
- var err error
- c.skeleton.Go(func() {
- //协程加载
- c.tokens.Acquire()
- defer c.tokens.Release()
- p, err = c.Load(playerId)
- if err != nil {
- log.Errorf("Load %s error %v", c.name, err)
- }
- }, func() {
- defer func() {
- s := c.loadFilter.Remove(playerId)
- for _, v := range s {
- cft := v.(*callBackFun[T])
- if err != nil {
- cft.fErrProcess(playerId, err)
- } else {
- cft.fprocess(p)
- }
- }
- monitor.GoLoadTimeoutWarn(fmt.Sprintf("%s.LoadAndProcess1", c.name), playerId, st)
- }()
- if err != nil {
- fErrProcess(playerId, err)
- return
- }
- old := c.GetCacheData(playerId)
- if old != nil {
- p = old
- }
- if IsNil(p) || p == nil {
- p = c.New(playerId)
- }
- c.PreProcess(p)
- c.SetCacheData(playerId, p)
- fprocess(p)
- })
- }
- func IsNil(i interface{}) bool {
- if i == nil {
- return true
- }
- switch reflect.TypeOf(i).Kind() {
- case reflect.Ptr, reflect.Map, reflect.Array, reflect.Chan, reflect.Slice:
- return reflect.ValueOf(i).IsNil()
- }
- return false
- }
- func (c *GenericCache[T]) LoadManyAndNoCacheProcess(loadMany func() (map[int64]*T, error), fprocess func(players []*T), fErrProcess func(err error)) {
- var ps map[int64]*T
- var err error
- c.skeleton.Go(func() {
- //协程加载
- c.tokens.Acquire()
- defer c.tokens.Release()
- ps, err = loadMany()
- }, func() {
- defer func() {
- }()
- if err != nil {
- fErrProcess(err)
- return
- }
- lst := make([]*T, 0, len(ps))
- for id, v := range ps {
- old := c.GetCacheData(id)
- if old != nil {
- v = old
- }
- c.PreProcess(v)
- // c.SetCacheData(id, v)
- lst = append(lst, v)
- }
- fprocess(lst)
- })
- }
- func (c *GenericCache[T]) LoadMultiAndProcess(playerId1 int64, playerId2 int64, fprocess func(p1 *T, p2 *T), fErrProcess func(err error)) {
- c.LoadAndProcess(playerId1, func(p1 *T) {
- c.LoadAndProcess(playerId2, func(p2 *T) {
- fprocess(p1, p2)
- }, func(playerId int64, err error) {
- fErrProcess(err)
- })
- }, func(playerId int64, err error) {
- fErrProcess(err)
- })
- }
- // func NewPlayer(playerId int64) *PlayerTasks {
- // return nil
- // }
- // func LoadPlayer(playerId int64) (*PlayerTasks, error) {
- // return nil, nil
- // }
- // func preProcess(p *PlayerTasks) {
- // }
- // allPayPlayers = memcache.NewCache[PlayerTasks](5*time.Minute, m.Skeleton) //.Init(LoadPlayer, NewObj, PrePocess)
- // allPayPlayers.New = NewPlayer
- // allPayPlayers.Load = LoadPlayer
- // allPayPlayers.PreProcess = preProcess
- type PlayerTasks struct {
- PlayerID int64
- Tasks []string
- }
- func NewPlayer(playerId int64) *PlayerTasks {
- return &PlayerTasks{
- PlayerID: playerId,
- Tasks: []string{"Task1", "Task2"},
- }
- }
- // 实现 Load 函数,用于加载 PlayerTasks 对象
- func LoadPlayer(playerId int64) (*PlayerTasks, error) {
- // 模拟从数据库或其他存储中加载数据
- return &PlayerTasks{
- PlayerID: playerId,
- Tasks: []string{"Task3", "Task4"},
- }, nil
- }
- // 实现 PreProcess 函数,用于预处理 PlayerTasks 对象
- func preProcess(p *PlayerTasks) {
- // 模拟一些预处理操作
- p.Tasks = append(p.Tasks, "PreprocessedTask")
- return
- }
- func example3() {
- // 初始化 Skeleton(假设 Skeleton 已经定义)
- skeleton := &module.Skeleton{}
- // 创建一个 GenericCache 实例,缓存存活时间为 5 分钟
- cache := NewGenericCache[PlayerTasks](5*time.Minute, skeleton)
- // 设置 New、Load 和 PreProcess 函数
- cache.New = NewPlayer
- cache.Load = LoadPlayer
- cache.PreProcess = preProcess
- // 加载并处理玩家数据
- playerID := int64(123)
- cache.LoadAndProcess(playerID, func(p *PlayerTasks) {
- // 处理加载成功的玩家数据
- fmt.Printf("Player %d tasks: %v\n", p.PlayerID, p.Tasks)
- }, func(playerId int64, err error) {
- // 处理加载失败的情况
- fmt.Printf("Failed to load player %d, error code: %d\n", playerId, err)
- })
- // 等待一段时间,以便协程完成
- time.Sleep(1 * time.Second)
- }
|