// 添加玩家所在的渠道 // 对于新玩家,查询其登录渠道,保存起来,供后面其他分析用 // 从游戏库中查询新玩家,分析adv_origin_logs表生成 package jobs import ( "fmt" "gadmin/config" "gadmin/internal/gorm/model" "gadmin/internal/gorm/query" "gadmin/utility/player" "os" "sync" "time" "github.com/sirupsen/logrus" "gorm.io/gen" ) var SyncChannel = new(jSyncChannel) type jSyncChannel struct { sync.Mutex } func (j *jSyncChannel) Run() { logrus.Info("SyncChannel Run.....") if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" { logrus.Warnf("测试环境禁止同步") return } j.Lock() defer j.Unlock() for serverId, _ := range config.GDBGroup { j.sync(serverId) } } func (j *jSyncChannel) Trigger() { logrus.Info("SyncChannel Trigger.....") if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" { logrus.Warnf("测试环境禁止同步") return } ok := j.TryLock() if !ok { logrus.Info("SyncChannel进程正在运行,跳过...") return } defer j.Unlock() for serverId, _ := range config.GDBGroup { j.sync(serverId) } } func (j *jSyncChannel) sync(serverId int) { DB, err := player.GetDBByServerID(serverId) if err != nil { logrus.Warningf("SyncChannel GetDBByServerID err:%v", err) return } var ( key = fmt.Sprintf("channel_sync_player_id_%v", serverId) lastId = config.FirstOrCreate(key, "sync") newLastId = lastId u = query.Use(config.GDBGroup[DB]).PlayerAttr results []*model.PlayerAttr ) err = u.Select(u.ID, u.Playerid).Where(u.ID.Gt(lastId)).Order(u.ID).FindInBatches(&results, 500, func(tx gen.Dao, batch int) error { playerIds := make([]int64, len(results)) playerMap := make(map[int64]struct{}) for i, item := range results { if _, ok := playerMap[item.Playerid]; ok { continue } playerMap[item.Playerid] = struct{}{} playerIds[i] = item.Playerid } j.instChannels(playerIds) newLastId = results[len(results)-1].ID return nil }) if err != nil { logrus.Warnf("SyncChannel sync err:%+v", err) return } if newLastId != lastId { config.Save(key, newLastId) } logrus.Info("SyncChannel sync success..") } func (j *jSyncChannel) instChannels(playerIds []int64) { playerChannels := make([]*model.PlayerChannel, 0) config.DB.Where("`playerid` in (?)", playerIds).Find(&playerChannels) //如果所查询的用户都存在数据 直接跳过 if len(playerChannels) == len(playerIds) { return } existChannel := make(map[int64]struct{}) for _, item := range playerChannels { existChannel[item.Playerid] = struct{}{} } //获取不存在channel信息的用户 并创建channel信息 playerChannelDataMap := make(map[int64]*model.PlayerChannel) advPlayerIds := make([]int64, 0) for _, playerId := range playerIds { if _, ok := existChannel[playerId]; ok { //存在则跳过 continue } //adv所用id advPlayerIds = append(advPlayerIds, playerId) playerChannelDataMap[playerId] = &model.PlayerChannel{ Playerid: playerId, ChannelID: "0", } } //获取adv信息 advModel := &model.AdvOriginLog{} advQuery := query.Use(config.DB.Scopes(model.TableOfYearMonth(advModel.TableName(), time.Now()))).AdvOriginLog.Table("") advList, err := advQuery.Where(advQuery.Userid.In(advPlayerIds...), advQuery.NewUser.Eq(1)).Find() if err != nil { return } //如果有adv信息 填入channelData existAdv := make(map[int64]*model.AdvOriginLog) for _, item := range advList { existAdv[item.Userid] = item } needUpdateAdvInfo := make(map[int64]string) for _, advPlayerId := range advPlayerIds { if advInfo, ok := existAdv[advPlayerId]; ok { //如果有adv信息 填入channelData playerChannelDataMap[advPlayerId].ChannelID = advInfo.Traceid //如果traceid已经存在需要则需要更新 if advInfo.Traceid != "0" && advInfo.Traceid != "" { //组装需要更新的数据 needUpdateAdvInfo[advPlayerId] = advInfo.Traceid } } } //更新 j.batchSaveAdvChannelEvent(needUpdateAdvInfo) //保存 playerChannelData := make([]*model.PlayerChannel, len(playerChannelDataMap)) i := 0 for _, item := range playerChannelDataMap { playerChannelData[i] = item i++ } config.DB.Create(&playerChannelData) } func (j *jSyncChannel) batchSaveAdvChannelEvent(UpdateData map[int64]string) { //每个user对应的traceId还不一样 只能循环更新了 for playerId, traceId := range UpdateData { // 登录日志 config.DB.Model(&model.LoginLog{}). Where("`user_id` = ? and `channel_id` != ?", playerId, traceId). Update("channel_id", traceId) // 充值订单 config.DB.Model(&model.Order{}). Where("`player_id` = ? and `channel_id` != ?", playerId, traceId). Update("channel_id", traceId) } } func (j *jSyncChannel) instChannel(userId int64) { var ( models *model.PlayerChannel advModel *model.AdvOriginLog ) config.DB.Where("`playerid` = ?", userId).First(&models) if models == nil || models.ID == 0 { user := model.PlayerChannel{Playerid: userId, ChannelID: "0"} config.DB.Where("`userid` = ? and `new_user` = 1", userId).First(&advModel) if advModel != nil { user.ChannelID = advModel.Traceid if advModel.Traceid != "0" && advModel.Traceid != "" { j.saveAdvChannelEvent(userId, advModel.Traceid) } } config.DB.Create(&user) } } // saveAdvChannelEvent 更新已存在数据的相关统计,因为有可能渠道更新前相关统计数据就已经存在 func (j *jSyncChannel) saveAdvChannelEvent(userId int64, channelID string) { // 登录日志 config.DB.Model(&model.LoginLog{}). Where("`user_id` = ? and `channel_id` != ?", userId, channelID). Update("channel_id", channelID) // 充值订单 config.DB.Model(&model.Order{}). Where("`player_id` = ? and `channel_id` != ?", userId, channelID). Update("channel_id", channelID) }