sync_channel.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // 添加玩家所在的渠道
  2. // 对于新玩家,查询其登录渠道,保存起来,供后面其他分析用
  3. // 从游戏库中查询新玩家,分析adv_origin_logs表生成
  4. package jobs
  5. import (
  6. "fmt"
  7. "gadmin/config"
  8. "gadmin/internal/gorm/model"
  9. "gadmin/internal/gorm/query"
  10. "gadmin/utility/player"
  11. "os"
  12. "sync"
  13. "time"
  14. "github.com/sirupsen/logrus"
  15. "gorm.io/gen"
  16. )
  17. var SyncChannel = new(jSyncChannel)
  18. type jSyncChannel struct {
  19. sync.Mutex
  20. }
  21. func (j *jSyncChannel) Run() {
  22. logrus.Info("SyncChannel Run.....")
  23. if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" {
  24. logrus.Warnf("测试环境禁止同步")
  25. return
  26. }
  27. j.Lock()
  28. defer j.Unlock()
  29. for serverId, _ := range config.GDBGroup {
  30. j.sync(serverId)
  31. }
  32. }
  33. func (j *jSyncChannel) Trigger() {
  34. logrus.Info("SyncChannel Trigger.....")
  35. if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" {
  36. logrus.Warnf("测试环境禁止同步")
  37. return
  38. }
  39. ok := j.TryLock()
  40. if !ok {
  41. logrus.Info("SyncChannel进程正在运行,跳过...")
  42. return
  43. }
  44. defer j.Unlock()
  45. for serverId, _ := range config.GDBGroup {
  46. j.sync(serverId)
  47. }
  48. }
  49. func (j *jSyncChannel) sync(serverId int) {
  50. DB, err := player.GetDBByServerID(serverId)
  51. if err != nil {
  52. logrus.Warningf("SyncChannel GetDBByServerID err:%v", err)
  53. return
  54. }
  55. var (
  56. key = fmt.Sprintf("channel_sync_player_id_%v", serverId)
  57. lastId = config.FirstOrCreate(key, "sync")
  58. newLastId = lastId
  59. u = query.Use(config.GDBGroup[DB]).PlayerAttr
  60. results []*model.PlayerAttr
  61. )
  62. err = u.Select(u.ID, u.Playerid).Where(u.ID.Gt(lastId)).Order(u.ID).FindInBatches(&results, 500, func(tx gen.Dao, batch int) error {
  63. playerIds := make([]int64, len(results))
  64. playerMap := make(map[int64]struct{})
  65. for i, item := range results {
  66. if _, ok := playerMap[item.Playerid]; ok {
  67. continue
  68. }
  69. playerMap[item.Playerid] = struct{}{}
  70. playerIds[i] = item.Playerid
  71. }
  72. j.instChannels(playerIds)
  73. newLastId = results[len(results)-1].ID
  74. return nil
  75. })
  76. if err != nil {
  77. logrus.Warnf("SyncChannel sync err:%+v", err)
  78. return
  79. }
  80. if newLastId != lastId {
  81. config.Save(key, newLastId)
  82. }
  83. logrus.Info("SyncChannel sync success..")
  84. }
  85. func (j *jSyncChannel) instChannels(playerIds []int64) {
  86. playerChannels := make([]*model.PlayerChannel, 0)
  87. config.DB.Where("`playerid` in (?)", playerIds).Find(&playerChannels)
  88. //如果所查询的用户都存在数据 直接跳过
  89. if len(playerChannels) == len(playerIds) {
  90. return
  91. }
  92. existChannel := make(map[int64]struct{})
  93. for _, item := range playerChannels {
  94. existChannel[item.Playerid] = struct{}{}
  95. }
  96. //获取不存在channel信息的用户 并创建channel信息
  97. playerChannelDataMap := make(map[int64]*model.PlayerChannel)
  98. advPlayerIds := make([]int64, 0)
  99. for _, playerId := range playerIds {
  100. if _, ok := existChannel[playerId]; ok {
  101. //存在则跳过
  102. continue
  103. }
  104. //adv所用id
  105. advPlayerIds = append(advPlayerIds, playerId)
  106. playerChannelDataMap[playerId] = &model.PlayerChannel{
  107. Playerid: playerId,
  108. ChannelID: "0",
  109. }
  110. }
  111. //获取adv信息
  112. advModel := &model.AdvOriginLog{}
  113. advQuery := query.Use(config.DB.Scopes(model.TableOfYearMonth(advModel.TableName(), time.Now()))).AdvOriginLog.Table("")
  114. advList, err := advQuery.Where(advQuery.Userid.In(advPlayerIds...), advQuery.NewUser.Eq(1)).Find()
  115. if err != nil {
  116. return
  117. }
  118. //如果有adv信息 填入channelData
  119. existAdv := make(map[int64]*model.AdvOriginLog)
  120. for _, item := range advList {
  121. existAdv[item.Userid] = item
  122. }
  123. needUpdateAdvInfo := make(map[int64]string)
  124. for _, advPlayerId := range advPlayerIds {
  125. if advInfo, ok := existAdv[advPlayerId]; ok {
  126. //如果有adv信息 填入channelData
  127. playerChannelDataMap[advPlayerId].ChannelID = advInfo.Traceid
  128. //如果traceid已经存在需要则需要更新
  129. if advInfo.Traceid != "0" && advInfo.Traceid != "" {
  130. //组装需要更新的数据
  131. needUpdateAdvInfo[advPlayerId] = advInfo.Traceid
  132. }
  133. }
  134. }
  135. //更新
  136. j.batchSaveAdvChannelEvent(needUpdateAdvInfo)
  137. //保存
  138. playerChannelData := make([]*model.PlayerChannel, len(playerChannelDataMap))
  139. i := 0
  140. for _, item := range playerChannelDataMap {
  141. playerChannelData[i] = item
  142. i++
  143. }
  144. config.DB.Create(&playerChannelData)
  145. }
  146. func (j *jSyncChannel) batchSaveAdvChannelEvent(UpdateData map[int64]string) {
  147. //每个user对应的traceId还不一样 只能循环更新了
  148. for playerId, traceId := range UpdateData {
  149. // 登录日志
  150. config.DB.Model(&model.LoginLog{}).
  151. Where("`user_id` = ? and `channel_id` != ?", playerId, traceId).
  152. Update("channel_id", traceId)
  153. // 充值订单
  154. config.DB.Model(&model.Order{}).
  155. Where("`player_id` = ? and `channel_id` != ?", playerId, traceId).
  156. Update("channel_id", traceId)
  157. }
  158. }
  159. func (j *jSyncChannel) instChannel(userId int64) {
  160. var (
  161. models *model.PlayerChannel
  162. advModel *model.AdvOriginLog
  163. )
  164. config.DB.Where("`playerid` = ?", userId).First(&models)
  165. if models == nil || models.ID == 0 {
  166. user := model.PlayerChannel{Playerid: userId, ChannelID: "0"}
  167. config.DB.Where("`userid` = ? and `new_user` = 1", userId).First(&advModel)
  168. if advModel != nil {
  169. user.ChannelID = advModel.Traceid
  170. if advModel.Traceid != "0" && advModel.Traceid != "" {
  171. j.saveAdvChannelEvent(userId, advModel.Traceid)
  172. }
  173. }
  174. config.DB.Create(&user)
  175. }
  176. }
  177. // saveAdvChannelEvent 更新已存在数据的相关统计,因为有可能渠道更新前相关统计数据就已经存在
  178. func (j *jSyncChannel) saveAdvChannelEvent(userId int64, channelID string) {
  179. // 登录日志
  180. config.DB.Model(&model.LoginLog{}).
  181. Where("`user_id` = ? and `channel_id` != ?", userId, channelID).
  182. Update("channel_id", channelID)
  183. // 充值订单
  184. config.DB.Model(&model.Order{}).
  185. Where("`player_id` = ? and `channel_id` != ?", userId, channelID).
  186. Update("channel_id", channelID)
  187. }