123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- // 添加玩家所在的渠道
- // 对于新玩家,查询其登录渠道,保存起来,供后面其他分析用
- // 从游戏库中查询新玩家,分析adv_origin_logs表生成
- package jobs
- import (
- "fmt"
- "gadmin/config"
- "gadmin/internal/gorm/model"
- "gadmin/internal/gorm/query"
- "gadmin/utility/player"
- "os"
- "sync"
- "time"
- "github.com/sirupsen/logrus"
- "gorm.io/gen"
- )
// jSyncChannel is the job that records which channel each new player came
// from. The embedded mutex serializes synchronization passes: Run waits for
// it, Trigger gives up if it is already held.
type jSyncChannel struct {
	sync.Mutex
}

// SyncChannel is the shared, package-level instance of the channel
// synchronization job.
var SyncChannel = &jSyncChannel{}
- func (j *jSyncChannel) Run() {
- logrus.Info("SyncChannel Run.....")
- if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" {
- logrus.Warnf("测试环境禁止同步")
- return
- }
- j.Lock()
- defer j.Unlock()
- for serverId, _ := range config.GDBGroup {
- j.sync(serverId)
- }
- }
- func (j *jSyncChannel) Trigger() {
- logrus.Info("SyncChannel Trigger.....")
- if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" {
- logrus.Warnf("测试环境禁止同步")
- return
- }
- ok := j.TryLock()
- if !ok {
- logrus.Info("SyncChannel进程正在运行,跳过...")
- return
- }
- defer j.Unlock()
- for serverId, _ := range config.GDBGroup {
- j.sync(serverId)
- }
- }
- func (j *jSyncChannel) sync(serverId int) {
- DB, err := player.GetDBByServerID(serverId)
- if err != nil {
- logrus.Warningf("SyncChannel GetDBByServerID err:%v", err)
- return
- }
- var (
- key = fmt.Sprintf("channel_sync_player_id_%v", serverId)
- lastId = config.FirstOrCreate(key, "sync")
- newLastId = lastId
- u = query.Use(config.GDBGroup[DB]).PlayerAttr
- results []*model.PlayerAttr
- )
- err = u.Select(u.ID, u.Playerid).Where(u.ID.Gt(lastId)).Order(u.ID).FindInBatches(&results, 500, func(tx gen.Dao, batch int) error {
- playerIds := make([]int64, len(results))
- playerMap := make(map[int64]struct{})
- for i, item := range results {
- if _, ok := playerMap[item.Playerid]; ok {
- continue
- }
- playerMap[item.Playerid] = struct{}{}
- playerIds[i] = item.Playerid
- }
- j.instChannels(playerIds)
- newLastId = results[len(results)-1].ID
- return nil
- })
- if err != nil {
- logrus.Warnf("SyncChannel sync err:%+v", err)
- return
- }
- if newLastId != lastId {
- config.Save(key, newLastId)
- }
- logrus.Info("SyncChannel sync success..")
- }
- func (j *jSyncChannel) instChannels(playerIds []int64) {
- playerChannels := make([]*model.PlayerChannel, 0)
- config.DB.Where("`playerid` in (?)", playerIds).Find(&playerChannels)
- //如果所查询的用户都存在数据 直接跳过
- if len(playerChannels) == len(playerIds) {
- return
- }
- existChannel := make(map[int64]struct{})
- for _, item := range playerChannels {
- existChannel[item.Playerid] = struct{}{}
- }
- //获取不存在channel信息的用户 并创建channel信息
- playerChannelDataMap := make(map[int64]*model.PlayerChannel)
- advPlayerIds := make([]int64, 0)
- for _, playerId := range playerIds {
- if _, ok := existChannel[playerId]; ok {
- //存在则跳过
- continue
- }
- //adv所用id
- advPlayerIds = append(advPlayerIds, playerId)
- playerChannelDataMap[playerId] = &model.PlayerChannel{
- Playerid: playerId,
- ChannelID: "0",
- }
- }
- //获取adv信息
- advModel := &model.AdvOriginLog{}
- advQuery := query.Use(config.DB.Scopes(model.TableOfYearMonth(advModel.TableName(), time.Now()))).AdvOriginLog.Table("")
- advList, err := advQuery.Where(advQuery.Userid.In(advPlayerIds...), advQuery.NewUser.Eq(1)).Find()
- if err != nil {
- return
- }
- //如果有adv信息 填入channelData
- existAdv := make(map[int64]*model.AdvOriginLog)
- for _, item := range advList {
- existAdv[item.Userid] = item
- }
- needUpdateAdvInfo := make(map[int64]string)
- for _, advPlayerId := range advPlayerIds {
- if advInfo, ok := existAdv[advPlayerId]; ok {
- //如果有adv信息 填入channelData
- playerChannelDataMap[advPlayerId].ChannelID = advInfo.Traceid
- //如果traceid已经存在需要则需要更新
- if advInfo.Traceid != "0" && advInfo.Traceid != "" {
- //组装需要更新的数据
- needUpdateAdvInfo[advPlayerId] = advInfo.Traceid
- }
- }
- }
- //更新
- j.batchSaveAdvChannelEvent(needUpdateAdvInfo)
- //保存
- playerChannelData := make([]*model.PlayerChannel, len(playerChannelDataMap))
- i := 0
- for _, item := range playerChannelDataMap {
- playerChannelData[i] = item
- i++
- }
- config.DB.Create(&playerChannelData)
- }
- func (j *jSyncChannel) batchSaveAdvChannelEvent(UpdateData map[int64]string) {
- //每个user对应的traceId还不一样 只能循环更新了
- for playerId, traceId := range UpdateData {
- // 登录日志
- config.DB.Model(&model.LoginLog{}).
- Where("`user_id` = ? and `channel_id` != ?", playerId, traceId).
- Update("channel_id", traceId)
- // 充值订单
- config.DB.Model(&model.Order{}).
- Where("`player_id` = ? and `channel_id` != ?", playerId, traceId).
- Update("channel_id", traceId)
- }
- }
- func (j *jSyncChannel) instChannel(userId int64) {
- var (
- models *model.PlayerChannel
- advModel *model.AdvOriginLog
- )
- config.DB.Where("`playerid` = ?", userId).First(&models)
- if models == nil || models.ID == 0 {
- user := model.PlayerChannel{Playerid: userId, ChannelID: "0"}
- config.DB.Where("`userid` = ? and `new_user` = 1", userId).First(&advModel)
- if advModel != nil {
- user.ChannelID = advModel.Traceid
- if advModel.Traceid != "0" && advModel.Traceid != "" {
- j.saveAdvChannelEvent(userId, advModel.Traceid)
- }
- }
- config.DB.Create(&user)
- }
- }
- // saveAdvChannelEvent 更新已存在数据的相关统计,因为有可能渠道更新前相关统计数据就已经存在
- func (j *jSyncChannel) saveAdvChannelEvent(userId int64, channelID string) {
- // 登录日志
- config.DB.Model(&model.LoginLog{}).
- Where("`user_id` = ? and `channel_id` != ?", userId, channelID).
- Update("channel_id", channelID)
- // 充值订单
- config.DB.Model(&model.Order{}).
- Where("`player_id` = ? and `channel_id` != ?", userId, channelID).
- Update("channel_id", channelID)
- }
|