package jobs import ( "encoding/json" "gadmin/config" "gadmin/internal/admin/service" "gadmin/internal/gorm/model" "gadmin/internal/gorm/query" "gadmin/utility" "github.com/sirupsen/logrus" "sync" "time" ) var Changed = new(jChanged) type jChanged struct { sync.RWMutex } func (j *jChanged) Run() { logrus.Info("jChanged Run.....") //if os.Getenv("GIN_MODE") == "release" { // j.RLock() // defer j.RUnlock() // // for serverId, _ := range config.GDBGroup { // j.updateSts(serverId, 1) // 钻石 // j.updateSts(serverId, 2) // 金币 // j.updatePlayer(serverId, 99) // 玩家消费统计 // } //} else { // logrus.Warn("测试环境无需运行,跳过..") //} } func (j *jChanged) updateSts(serverId int, t int64) { take, err := j.getTakeDate(serverId, t) if err != nil { return } if take == nil { return } channelIds, err := service.Channel.Ids() if err != nil { logrus.Warningf("Channel.Ids,err:%v", err) return } for _, channelId := range channelIds { // 这里同时更新一下昨天的,保证昨天的是完整统计 j.updateStatistics(serverId, t, take.Date.AddDate(0, 0, -1), channelId) j.updateStatistics(serverId, t, take.Date, channelId) } } func (j *jChanged) updateStatistics(serverId int, t int64, date time.Time, channelId string) { logrus.Infof("updateStatistics serverId:%v, t:%v, 统计日期:%+v, channelId:%v", serverId, t, date, channelId) type ChangedStatistic struct { ID int64 Type int32 Source int32 Amount int64 Players []int64 Counts int64 Date string } var ( q = query.Use(config.DB).ChangedLog m = q.Where(q.Date.Eq(date)).Where(q.ChannelID.Eq(channelId), q.ServerID.Eq(int32(serverId))) saveLists []*ChangedStatistic ) if t == 1 { m = m.Where(q.Diamond.Lt(0)) } else { m = m.Where(q.Coin.Lt(0)) } lists, err := m.Find() if err != nil { logrus.Warnf("updateStatistics Find err:%+v", err) return } addAmount := func(statistic *ChangedStatistic, v *model.ChangedLog, t int64) int64 { var amount int64 if statistic == nil { statistic = new(ChangedStatistic) } if t == 1 { amount = statistic.Amount + v.Diamond } else { amount = statistic.Amount + v.Coin } return amount } for _, v := range lists { var exist bool for _, statistic := range saveLists { if statistic.Source == v.Source { exist = true statistic.Amount = addAmount(statistic, v, t) statistic.Players = append(statistic.Players, v.UserID) statistic.Counts++ } } if exist == false { saveLists = append(saveLists, &ChangedStatistic{ Type: int32(t), Source: v.Source, Amount: addAmount(nil, v, t), Players: []int64{v.UserID}, Counts: 1, Date: v.Date.Format("2006-01-02"), }) } } for _, vv := range saveLists { vv.Players = utility.UniqueInt64s(vv.Players) var ( qs = query.Use(config.DB).ChangedStatistic stm *model.ChangedStatistic data model.ChangedStatistic ) if err = qs.Where(qs.Type.Eq(int32(t)), qs.Source.Eq(vv.Source), qs.Date.Eq(vv.Date), qs.ChannelID.Eq(channelId), qs.ServerID.Eq(int32(serverId))).Scan(&stm); err != nil { logrus.Warnf("saveLists Scan err:%+v", err) return } b, _ := json.Marshal(vv.Players) data.ServerID = int32(serverId) data.ChannelID = channelId data.Type = int32(t) data.Source = vv.Source data.Amount = vv.Amount data.Players = string(b) data.Counts = vv.Counts data.Date = vv.Date logrus.Warnf("data:%+v", data) if stm != nil && stm.ID > 0 { if _, err = query.Use(config.DB).ChangedStatistic.Where(qs.ID.Eq(stm.ID)).Updates(&data); err != nil { logrus.Warnf("saveLists Updates err:%+v", err) return } } else { if err = query.Use(config.DB).ChangedStatistic.Create(&data); err != nil { logrus.Warnf("saveLists Create err:%+v", err) return } } } var lastData = new(model.ChangedLog) if len(lists) == 0 { lastData.Date = date } if len(lists) == 1 { lastData = lists[0] } if len(lists) > 1 { lastData = lists[len(lists)-1] } var qsc = query.Use(config.DB).ChangedSync _, err = query.Use(config.DB).ChangedSync.Where(qsc.Type.Eq(t)).Updates(&model.ChangedSync{ ServerID: int32(serverId), LastID: lastData.ID, LastSyncTime: lastData.Date, UpdatedAt: time.Now(), }) if err != nil { logrus.Warnf("updateStatistics ChangedSync Updates err:%+v", err) return } } func (j *jChanged) updatePlayer(serverId int, t int64) { take, err := j.getSyncModel(serverId, t) if err != nil { logrus.Info("updatePlayer getLastDate err :", err) return } if take == nil { return } var ( q = query.Use(config.DB).ChangedLog m = q.Where(q.ID.Gt(take.LastID), q.ServerID.Eq(int32(serverId))) ) lists, err := m.Find() if err != nil { logrus.Warnf("updatePlayer Find err:%+v", err) return } savePlayerConsumption := func(c *model.ChangedLog) error { var ( qp = query.Use(config.DB).ChangedPlayer stm *model.ChangedPlayer ) if err = qp.Where(qp.Playerid.Eq(c.UserID)).Scan(&stm); err != nil { logrus.Warnf("savePlayerConsumption Scan err:%+v", err) return err } if stm != nil && stm.ID > 0 { // 已更新过的 直接跳过 if stm.LastCid >= c.ID { return nil } if _, err = query.Use(config.DB).ChangedPlayer.Where(qp.ID.Eq(stm.ID)).Updates(&model.ChangedPlayer{ ServerID: int32(serverId), ExpendCoin: stm.ExpendCoin + c.Coin, ExpendDiamond: stm.ExpendDiamond + c.Diamond, LastCid: c.ID, UpdatedAt: time.Unix(int64(c.Time), 0), }); err != nil { logrus.Warnf("savePlayerConsumption Updates err:%+v", err) return err } } else { if err = query.Use(config.DB).ChangedPlayer.Create(&model.ChangedPlayer{ ServerID: int32(serverId), Playerid: c.UserID, ExpendCoin: c.Coin, ExpendDiamond: c.Diamond, LastCid: c.ID, UpdatedAt: time.Unix(int64(c.Time), 0), CreatedAt: time.Unix(int64(c.Time), 0), }); err != nil { logrus.Warnf("savePlayerConsumption Create err:%+v", err) return err } } return nil } for _, v := range lists { if v.Coin < 0 || v.Diamond < 0 { if err := savePlayerConsumption(v); err != nil { return } } } if len(lists) == 0 { return } var lastData = new(model.ChangedLog) if len(lists) == 1 { lastData = lists[0] } else { lastData = lists[len(lists)-1] } var qsc = query.Use(config.DB).ChangedSync _, err = query.Use(config.DB).ChangedSync.Where(qsc.Type.Eq(t), qsc.ServerID.Eq(int32(serverId))).Updates(&model.ChangedSync{ LastID: lastData.ID, LastSyncTime: lastData.Date, UpdatedAt: time.Now(), }) if err != nil { logrus.Warnf("updatePlayer ChangedSync Updates err:%+v", err) return } } func (j *jChanged) getTakeDate(serverId int, t int64) (*model.ChangedLog, error) { mod, err := j.getSyncModel(serverId, t) if err != nil { logrus.Warn("getLastDate err :", err) return nil, err } if mod == nil { logrus.Warn("getLastDate mod == nil") return nil, nil } q := query.Use(config.DB).ChangedLog take, err := q.Where(q.Date.Gt(mod.LastSyncTime), q.ServerID.Eq(int32(serverId))).Take() if err != nil { logrus.Warn("getLastDate Take err :", err) return nil, err } return take, nil } // 获取同步模型 func (j *jChanged) getSyncModel(serverId int, t int64) (m *model.ChangedSync, err error) { var q = query.Use(config.DB).ChangedSync if err = q.Where(q.Type.Eq(t), q.ServerID.Eq(int32(serverId))).Scan(&m); err != nil { return } return m, nil }