|
- package jobs
- import (
- "encoding/json"
- "gadmin/config"
- "gadmin/internal/admin/service"
- "gadmin/internal/gorm/model"
- "gadmin/internal/gorm/query"
- "gadmin/utility"
- "github.com/sirupsen/logrus"
- "sync"
- "time"
- )
// jChanged is the job type behind Changed. It embeds sync.RWMutex, so the
// lock methods (Lock, RLock, ...) are available directly on the job value.
type jChanged struct {
	sync.RWMutex
}

// Changed is the package-level instance of the jChanged job.
var Changed = &jChanged{}
- func (j *jChanged) Run() {
- logrus.Info("jChanged Run.....")
- //if os.Getenv("GIN_MODE") == "release" {
- // j.RLock()
- // defer j.RUnlock()
- //
- // for serverId, _ := range config.GDBGroup {
- // j.updateSts(serverId, 1) // 钻石
- // j.updateSts(serverId, 2) // 金币
- // j.updatePlayer(serverId, 99) // 玩家消费统计
- // }
- //} else {
- // logrus.Warn("测试环境无需运行,跳过..")
- //}
- }
- func (j *jChanged) updateSts(serverId int, t int64) {
- take, err := j.getTakeDate(serverId, t)
- if err != nil {
- return
- }
- if take == nil {
- return
- }
- channelIds, err := service.Channel.Ids()
- if err != nil {
- logrus.Warningf("Channel.Ids,err:%v", err)
- return
- }
- for _, channelId := range channelIds {
- // 这里同时更新一下昨天的,保证昨天的是完整统计
- j.updateStatistics(serverId, t, take.Date.AddDate(0, 0, -1), channelId)
- j.updateStatistics(serverId, t, take.Date, channelId)
- }
- }
- func (j *jChanged) updateStatistics(serverId int, t int64, date time.Time, channelId string) {
- logrus.Infof("updateStatistics serverId:%v, t:%v, 统计日期:%+v, channelId:%v", serverId, t, date, channelId)
- type ChangedStatistic struct {
- ID int64
- Type int32
- Source int32
- Amount int64
- Players []int64
- Counts int64
- Date string
- }
- var (
- q = query.Use(config.DB).ChangedLog
- m = q.Where(q.Date.Eq(date)).Where(q.ChannelID.Eq(channelId), q.ServerID.Eq(int32(serverId)))
- saveLists []*ChangedStatistic
- )
- if t == 1 {
- m = m.Where(q.Diamond.Lt(0))
- } else {
- m = m.Where(q.Coin.Lt(0))
- }
- lists, err := m.Find()
- if err != nil {
- logrus.Warnf("updateStatistics Find err:%+v", err)
- return
- }
- addAmount := func(statistic *ChangedStatistic, v *model.ChangedLog, t int64) int64 {
- var amount int64
- if statistic == nil {
- statistic = new(ChangedStatistic)
- }
- if t == 1 {
- amount = statistic.Amount + v.Diamond
- } else {
- amount = statistic.Amount + v.Coin
- }
- return amount
- }
- for _, v := range lists {
- var exist bool
- for _, statistic := range saveLists {
- if statistic.Source == v.Source {
- exist = true
- statistic.Amount = addAmount(statistic, v, t)
- statistic.Players = append(statistic.Players, v.UserID)
- statistic.Counts++
- }
- }
- if exist == false {
- saveLists = append(saveLists, &ChangedStatistic{
- Type: int32(t),
- Source: v.Source,
- Amount: addAmount(nil, v, t),
- Players: []int64{v.UserID},
- Counts: 1,
- Date: v.Date.Format("2006-01-02"),
- })
- }
- }
- for _, vv := range saveLists {
- vv.Players = utility.UniqueInt64s(vv.Players)
- var (
- qs = query.Use(config.DB).ChangedStatistic
- stm *model.ChangedStatistic
- data model.ChangedStatistic
- )
- if err = qs.Where(qs.Type.Eq(int32(t)), qs.Source.Eq(vv.Source), qs.Date.Eq(vv.Date), qs.ChannelID.Eq(channelId), qs.ServerID.Eq(int32(serverId))).Scan(&stm); err != nil {
- logrus.Warnf("saveLists Scan err:%+v", err)
- return
- }
- b, _ := json.Marshal(vv.Players)
- data.ServerID = int32(serverId)
- data.ChannelID = channelId
- data.Type = int32(t)
- data.Source = vv.Source
- data.Amount = vv.Amount
- data.Players = string(b)
- data.Counts = vv.Counts
- data.Date = vv.Date
- logrus.Warnf("data:%+v", data)
- if stm != nil && stm.ID > 0 {
- if _, err = query.Use(config.DB).ChangedStatistic.Where(qs.ID.Eq(stm.ID)).Updates(&data); err != nil {
- logrus.Warnf("saveLists Updates err:%+v", err)
- return
- }
- } else {
- if err = query.Use(config.DB).ChangedStatistic.Create(&data); err != nil {
- logrus.Warnf("saveLists Create err:%+v", err)
- return
- }
- }
- }
- var lastData = new(model.ChangedLog)
- if len(lists) == 0 {
- lastData.Date = date
- }
- if len(lists) == 1 {
- lastData = lists[0]
- }
- if len(lists) > 1 {
- lastData = lists[len(lists)-1]
- }
- var qsc = query.Use(config.DB).ChangedSync
- _, err = query.Use(config.DB).ChangedSync.Where(qsc.Type.Eq(t)).Updates(&model.ChangedSync{
- ServerID: int32(serverId),
- LastID: lastData.ID,
- LastSyncTime: lastData.Date,
- UpdatedAt: time.Now(),
- })
- if err != nil {
- logrus.Warnf("updateStatistics ChangedSync Updates err:%+v", err)
- return
- }
- }
- func (j *jChanged) updatePlayer(serverId int, t int64) {
- take, err := j.getSyncModel(serverId, t)
- if err != nil {
- logrus.Info("updatePlayer getLastDate err :", err)
- return
- }
- if take == nil {
- return
- }
- var (
- q = query.Use(config.DB).ChangedLog
- m = q.Where(q.ID.Gt(take.LastID), q.ServerID.Eq(int32(serverId)))
- )
- lists, err := m.Find()
- if err != nil {
- logrus.Warnf("updatePlayer Find err:%+v", err)
- return
- }
- savePlayerConsumption := func(c *model.ChangedLog) error {
- var (
- qp = query.Use(config.DB).ChangedPlayer
- stm *model.ChangedPlayer
- )
- if err = qp.Where(qp.Playerid.Eq(c.UserID)).Scan(&stm); err != nil {
- logrus.Warnf("savePlayerConsumption Scan err:%+v", err)
- return err
- }
- if stm != nil && stm.ID > 0 {
- // 已更新过的 直接跳过
- if stm.LastCid >= c.ID {
- return nil
- }
- if _, err = query.Use(config.DB).ChangedPlayer.Where(qp.ID.Eq(stm.ID)).Updates(&model.ChangedPlayer{
- ServerID: int32(serverId),
- ExpendCoin: stm.ExpendCoin + c.Coin,
- ExpendDiamond: stm.ExpendDiamond + c.Diamond,
- LastCid: c.ID,
- UpdatedAt: time.Unix(int64(c.Time), 0),
- }); err != nil {
- logrus.Warnf("savePlayerConsumption Updates err:%+v", err)
- return err
- }
- } else {
- if err = query.Use(config.DB).ChangedPlayer.Create(&model.ChangedPlayer{
- ServerID: int32(serverId),
- Playerid: c.UserID,
- ExpendCoin: c.Coin,
- ExpendDiamond: c.Diamond,
- LastCid: c.ID,
- UpdatedAt: time.Unix(int64(c.Time), 0),
- CreatedAt: time.Unix(int64(c.Time), 0),
- }); err != nil {
- logrus.Warnf("savePlayerConsumption Create err:%+v", err)
- return err
- }
- }
- return nil
- }
- for _, v := range lists {
- if v.Coin < 0 || v.Diamond < 0 {
- if err := savePlayerConsumption(v); err != nil {
- return
- }
- }
- }
- if len(lists) == 0 {
- return
- }
- var lastData = new(model.ChangedLog)
- if len(lists) == 1 {
- lastData = lists[0]
- } else {
- lastData = lists[len(lists)-1]
- }
- var qsc = query.Use(config.DB).ChangedSync
- _, err = query.Use(config.DB).ChangedSync.Where(qsc.Type.Eq(t), qsc.ServerID.Eq(int32(serverId))).Updates(&model.ChangedSync{
- LastID: lastData.ID,
- LastSyncTime: lastData.Date,
- UpdatedAt: time.Now(),
- })
- if err != nil {
- logrus.Warnf("updatePlayer ChangedSync Updates err:%+v", err)
- return
- }
- }
- func (j *jChanged) getTakeDate(serverId int, t int64) (*model.ChangedLog, error) {
- mod, err := j.getSyncModel(serverId, t)
- if err != nil {
- logrus.Warn("getLastDate err :", err)
- return nil, err
- }
- if mod == nil {
- logrus.Warn("getLastDate mod == nil")
- return nil, nil
- }
- q := query.Use(config.DB).ChangedLog
- take, err := q.Where(q.Date.Gt(mod.LastSyncTime), q.ServerID.Eq(int32(serverId))).Take()
- if err != nil {
- logrus.Warn("getLastDate Take err :", err)
- return nil, err
- }
- return take, nil
- }
- // 获取同步模型
- func (j *jChanged) getSyncModel(serverId int, t int64) (m *model.ChangedSync, err error) {
- var q = query.Use(config.DB).ChangedSync
- if err = q.Where(q.Type.Eq(t), q.ServerID.Eq(int32(serverId))).Scan(&m); err != nil {
- return
- }
- return m, nil
- }
|