changed.go 7.4 KB


  1. package jobs
  2. import (
  3. "encoding/json"
  4. "gadmin/config"
  5. "gadmin/internal/admin/service"
  6. "gadmin/internal/gorm/model"
  7. "gadmin/internal/gorm/query"
  8. "gadmin/utility"
  9. "github.com/sirupsen/logrus"
  10. "sync"
  11. "time"
  12. )
  13. var Changed = new(jChanged)
  14. type jChanged struct {
  15. sync.RWMutex
  16. }
  17. func (j *jChanged) Run() {
  18. logrus.Info("jChanged Run.....")
  19. //if os.Getenv("GIN_MODE") == "release" {
  20. // j.RLock()
  21. // defer j.RUnlock()
  22. //
  23. // for serverId, _ := range config.GDBGroup {
  24. // j.updateSts(serverId, 1) // 钻石
  25. // j.updateSts(serverId, 2) // 金币
  26. // j.updatePlayer(serverId, 99) // 玩家消费统计
  27. // }
  28. //} else {
  29. // logrus.Warn("测试环境无需运行,跳过..")
  30. //}
  31. }
  32. func (j *jChanged) updateSts(serverId int, t int64) {
  33. take, err := j.getTakeDate(serverId, t)
  34. if err != nil {
  35. return
  36. }
  37. if take == nil {
  38. return
  39. }
  40. channelIds, err := service.Channel.Ids()
  41. if err != nil {
  42. logrus.Warningf("Channel.Ids,err:%v", err)
  43. return
  44. }
  45. for _, channelId := range channelIds {
  46. // 这里同时更新一下昨天的,保证昨天的是完整统计
  47. j.updateStatistics(serverId, t, take.Date.AddDate(0, 0, -1), channelId)
  48. j.updateStatistics(serverId, t, take.Date, channelId)
  49. }
  50. }
  51. func (j *jChanged) updateStatistics(serverId int, t int64, date time.Time, channelId string) {
  52. logrus.Infof("updateStatistics serverId:%v, t:%v, 统计日期:%+v, channelId:%v", serverId, t, date, channelId)
  53. type ChangedStatistic struct {
  54. ID int64
  55. Type int32
  56. Source int32
  57. Amount int64
  58. Players []int64
  59. Counts int64
  60. Date string
  61. }
  62. var (
  63. q = query.Use(config.DB).ChangedLog
  64. m = q.Where(q.Date.Eq(date)).Where(q.ChannelID.Eq(channelId), q.ServerID.Eq(int32(serverId)))
  65. saveLists []*ChangedStatistic
  66. )
  67. if t == 1 {
  68. m = m.Where(q.Diamond.Lt(0))
  69. } else {
  70. m = m.Where(q.Coin.Lt(0))
  71. }
  72. lists, err := m.Find()
  73. if err != nil {
  74. logrus.Warnf("updateStatistics Find err:%+v", err)
  75. return
  76. }
  77. addAmount := func(statistic *ChangedStatistic, v *model.ChangedLog, t int64) int64 {
  78. var amount int64
  79. if statistic == nil {
  80. statistic = new(ChangedStatistic)
  81. }
  82. if t == 1 {
  83. amount = statistic.Amount + v.Diamond
  84. } else {
  85. amount = statistic.Amount + v.Coin
  86. }
  87. return amount
  88. }
  89. for _, v := range lists {
  90. var exist bool
  91. for _, statistic := range saveLists {
  92. if statistic.Source == v.Source {
  93. exist = true
  94. statistic.Amount = addAmount(statistic, v, t)
  95. statistic.Players = append(statistic.Players, v.UserID)
  96. statistic.Counts++
  97. }
  98. }
  99. if exist == false {
  100. saveLists = append(saveLists, &ChangedStatistic{
  101. Type: int32(t),
  102. Source: v.Source,
  103. Amount: addAmount(nil, v, t),
  104. Players: []int64{v.UserID},
  105. Counts: 1,
  106. Date: v.Date.Format("2006-01-02"),
  107. })
  108. }
  109. }
  110. for _, vv := range saveLists {
  111. vv.Players = utility.UniqueInt64s(vv.Players)
  112. var (
  113. qs = query.Use(config.DB).ChangedStatistic
  114. stm *model.ChangedStatistic
  115. data model.ChangedStatistic
  116. )
  117. if err = qs.Where(qs.Type.Eq(int32(t)), qs.Source.Eq(vv.Source), qs.Date.Eq(vv.Date), qs.ChannelID.Eq(channelId), qs.ServerID.Eq(int32(serverId))).Scan(&stm); err != nil {
  118. logrus.Warnf("saveLists Scan err:%+v", err)
  119. return
  120. }
  121. b, _ := json.Marshal(vv.Players)
  122. data.ServerID = int32(serverId)
  123. data.ChannelID = channelId
  124. data.Type = int32(t)
  125. data.Source = vv.Source
  126. data.Amount = vv.Amount
  127. data.Players = string(b)
  128. data.Counts = vv.Counts
  129. data.Date = vv.Date
  130. logrus.Warnf("data:%+v", data)
  131. if stm != nil && stm.ID > 0 {
  132. if _, err = query.Use(config.DB).ChangedStatistic.Where(qs.ID.Eq(stm.ID)).Updates(&data); err != nil {
  133. logrus.Warnf("saveLists Updates err:%+v", err)
  134. return
  135. }
  136. } else {
  137. if err = query.Use(config.DB).ChangedStatistic.Create(&data); err != nil {
  138. logrus.Warnf("saveLists Create err:%+v", err)
  139. return
  140. }
  141. }
  142. }
  143. var lastData = new(model.ChangedLog)
  144. if len(lists) == 0 {
  145. lastData.Date = date
  146. }
  147. if len(lists) == 1 {
  148. lastData = lists[0]
  149. }
  150. if len(lists) > 1 {
  151. lastData = lists[len(lists)-1]
  152. }
  153. var qsc = query.Use(config.DB).ChangedSync
  154. _, err = query.Use(config.DB).ChangedSync.Where(qsc.Type.Eq(t)).Updates(&model.ChangedSync{
  155. ServerID: int32(serverId),
  156. LastID: lastData.ID,
  157. LastSyncTime: lastData.Date,
  158. UpdatedAt: time.Now(),
  159. })
  160. if err != nil {
  161. logrus.Warnf("updateStatistics ChangedSync Updates err:%+v", err)
  162. return
  163. }
  164. }
  165. func (j *jChanged) updatePlayer(serverId int, t int64) {
  166. take, err := j.getSyncModel(serverId, t)
  167. if err != nil {
  168. logrus.Info("updatePlayer getLastDate err :", err)
  169. return
  170. }
  171. if take == nil {
  172. return
  173. }
  174. var (
  175. q = query.Use(config.DB).ChangedLog
  176. m = q.Where(q.ID.Gt(take.LastID), q.ServerID.Eq(int32(serverId)))
  177. )
  178. lists, err := m.Find()
  179. if err != nil {
  180. logrus.Warnf("updatePlayer Find err:%+v", err)
  181. return
  182. }
  183. savePlayerConsumption := func(c *model.ChangedLog) error {
  184. var (
  185. qp = query.Use(config.DB).ChangedPlayer
  186. stm *model.ChangedPlayer
  187. )
  188. if err = qp.Where(qp.Playerid.Eq(c.UserID)).Scan(&stm); err != nil {
  189. logrus.Warnf("savePlayerConsumption Scan err:%+v", err)
  190. return err
  191. }
  192. if stm != nil && stm.ID > 0 {
  193. // 已更新过的 直接跳过
  194. if stm.LastCid >= c.ID {
  195. return nil
  196. }
  197. if _, err = query.Use(config.DB).ChangedPlayer.Where(qp.ID.Eq(stm.ID)).Updates(&model.ChangedPlayer{
  198. ServerID: int32(serverId),
  199. ExpendCoin: stm.ExpendCoin + c.Coin,
  200. ExpendDiamond: stm.ExpendDiamond + c.Diamond,
  201. LastCid: c.ID,
  202. UpdatedAt: time.Unix(int64(c.Time), 0),
  203. }); err != nil {
  204. logrus.Warnf("savePlayerConsumption Updates err:%+v", err)
  205. return err
  206. }
  207. } else {
  208. if err = query.Use(config.DB).ChangedPlayer.Create(&model.ChangedPlayer{
  209. ServerID: int32(serverId),
  210. Playerid: c.UserID,
  211. ExpendCoin: c.Coin,
  212. ExpendDiamond: c.Diamond,
  213. LastCid: c.ID,
  214. UpdatedAt: time.Unix(int64(c.Time), 0),
  215. CreatedAt: time.Unix(int64(c.Time), 0),
  216. }); err != nil {
  217. logrus.Warnf("savePlayerConsumption Create err:%+v", err)
  218. return err
  219. }
  220. }
  221. return nil
  222. }
  223. for _, v := range lists {
  224. if v.Coin < 0 || v.Diamond < 0 {
  225. if err := savePlayerConsumption(v); err != nil {
  226. return
  227. }
  228. }
  229. }
  230. if len(lists) == 0 {
  231. return
  232. }
  233. var lastData = new(model.ChangedLog)
  234. if len(lists) == 1 {
  235. lastData = lists[0]
  236. } else {
  237. lastData = lists[len(lists)-1]
  238. }
  239. var qsc = query.Use(config.DB).ChangedSync
  240. _, err = query.Use(config.DB).ChangedSync.Where(qsc.Type.Eq(t), qsc.ServerID.Eq(int32(serverId))).Updates(&model.ChangedSync{
  241. LastID: lastData.ID,
  242. LastSyncTime: lastData.Date,
  243. UpdatedAt: time.Now(),
  244. })
  245. if err != nil {
  246. logrus.Warnf("updatePlayer ChangedSync Updates err:%+v", err)
  247. return
  248. }
  249. }
  250. func (j *jChanged) getTakeDate(serverId int, t int64) (*model.ChangedLog, error) {
  251. mod, err := j.getSyncModel(serverId, t)
  252. if err != nil {
  253. logrus.Warn("getLastDate err :", err)
  254. return nil, err
  255. }
  256. if mod == nil {
  257. logrus.Warn("getLastDate mod == nil")
  258. return nil, nil
  259. }
  260. q := query.Use(config.DB).ChangedLog
  261. take, err := q.Where(q.Date.Gt(mod.LastSyncTime), q.ServerID.Eq(int32(serverId))).Take()
  262. if err != nil {
  263. logrus.Warn("getLastDate Take err :", err)
  264. return nil, err
  265. }
  266. return take, nil
  267. }
  268. // 获取同步模型
  269. func (j *jChanged) getSyncModel(serverId int, t int64) (m *model.ChangedSync, err error) {
  270. var q = query.Use(config.DB).ChangedSync
  271. if err = q.Where(q.Type.Eq(t), q.ServerID.Eq(int32(serverId))).Scan(&m); err != nil {
  272. return
  273. }
  274. return m, nil
  275. }