migrate.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. package jobs
  2. import (
  3. "encoding/json"
  4. "gadmin/config"
  5. "gadmin/internal/gorm/model"
  6. "gadmin/internal/gorm/query"
  7. "github.com/sirupsen/logrus"
  8. "gorm.io/gen"
  9. "os"
  10. "sync"
  11. "time"
  12. )
  13. var (
  14. Migrate = new(jMigrate)
  15. migrateUserLock sync.RWMutex
  16. migrateUserMap = make(map[int64]int64, 0)
  17. )
  18. type jMigrate struct {
  19. sync.Mutex
  20. }
  21. func (j *jMigrate) Run(table string) {
  22. logrus.Infof("Migrate Run table:%v.....", table)
  23. if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" {
  24. logrus.Warnf("测试环境禁止同步")
  25. return
  26. }
  27. j.Lock()
  28. defer j.Unlock()
  29. switch table {
  30. case "changed_logs":
  31. j.migrateChangedLogs()
  32. case "changed_player":
  33. j.migrateChangedPlayer()
  34. case "chapter_logs_user_details":
  35. j.migrateChapterLogsUserDetails()
  36. case "gem_player":
  37. j.migrateGemPlayer()
  38. case "orders":
  39. j.migrateOrders()
  40. case "player_channel":
  41. j.migratePlayerChannel()
  42. case "redeem_received":
  43. j.migrateRedeemReceived()
  44. case "changed_statistics":
  45. j.migrateChangedStatistics()
  46. case "login_logs":
  47. j.migrateLoginLogs()
  48. case "chapter":
  49. j.migrateChapter()
  50. case "chapter_logs":
  51. j.migrateChapterLogs()
  52. case "advertisement_logs":
  53. j.migrateAdvertisementLogs()
  54. case "disconnect_logs":
  55. j.migrateDisconnectLogs()
  56. default:
  57. logrus.Warnf("不支持的迁移表:%v", table)
  58. }
  59. }
  60. func (j *jMigrate) migrateChangedStatistics() {
  61. var (
  62. results []*model.ChangedStatistic
  63. m = query.Use(config.DB).ChangedStatistic
  64. )
  65. err := m.Select(m.ID, m.Players).
  66. FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
  67. for _, result := range results {
  68. var (
  69. players []int64
  70. newPlayers []int64
  71. )
  72. if err := json.Unmarshal([]byte(result.Players), &players); err != nil {
  73. logrus.Errorf("migrateChangedStatistics Unmarshal err:%+v, result:%+v", err, result)
  74. continue
  75. }
  76. for _, player := range players {
  77. newPlayerId := j.getNewPlayerId(player)
  78. if newPlayerId > 0 {
  79. newPlayers = append(newPlayers, newPlayerId)
  80. } else {
  81. newPlayers = append(newPlayers, player)
  82. }
  83. }
  84. marshal, err := json.Marshal(newPlayers)
  85. if err != nil {
  86. logrus.Errorf("migrateChangedStatistics Marshal err:%+v, result:%+v", err, result)
  87. continue
  88. }
  89. if _, err = query.Use(config.DB).ChangedStatistic.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedStatistic{Players: string(marshal)}); err != nil {
  90. panic(err)
  91. }
  92. }
  93. return nil
  94. })
  95. if err != nil {
  96. panic(err)
  97. }
  98. logrus.Warnf("migrateChangedStatistics finished")
  99. }
  100. func (j *jMigrate) migrateRedeemReceived() {
  101. var (
  102. results []*model.RedeemReceived
  103. m = query.Use(config.DB).RedeemReceived
  104. )
  105. err := m.Select(m.ID, m.Playerid).
  106. FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
  107. for _, result := range results {
  108. if newPlayerId := j.getNewPlayerId(result.Playerid); newPlayerId > 0 {
  109. if _, err := query.Use(config.DB).RedeemReceived.Where(m.ID.Eq(result.ID)).Updates(&model.RedeemReceived{Playerid: newPlayerId}); err != nil {
  110. panic(err)
  111. }
  112. }
  113. }
  114. return nil
  115. })
  116. if err != nil {
  117. panic(err)
  118. }
  119. logrus.Warnf("migrateRedeemReceived finished")
  120. }
  121. func (j *jMigrate) migratePlayerChannel() {
  122. var (
  123. results []*model.PlayerChannel
  124. m = query.Use(config.DB).PlayerChannel
  125. )
  126. err := m.Select(m.ID, m.Playerid).
  127. FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
  128. for _, result := range results {
  129. if newPlayerId := j.getNewPlayerId(int64(result.Playerid)); newPlayerId > 0 {
  130. if _, err := query.Use(config.DB).PlayerChannel.Where(m.ID.Eq(result.ID)).Updates(&model.PlayerChannel{Playerid: newPlayerId}); err != nil {
  131. panic(err)
  132. }
  133. }
  134. }
  135. return nil
  136. })
  137. if err != nil {
  138. panic(err)
  139. }
  140. logrus.Warnf("migratePlayerChannel finished")
  141. }
  142. func (j *jMigrate) migrateOrders() {
  143. var (
  144. results []*model.Order
  145. m = query.Use(config.DB).Order
  146. )
  147. err := m.Select(m.ID, m.PlayerID).
  148. FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
  149. for _, result := range results {
  150. if newPlayerId := j.getNewPlayerId(int64(result.PlayerID)); newPlayerId > 0 {
  151. if _, err := query.Use(config.DB).Order.Where(m.ID.Eq(result.ID)).Updates(&model.Order{PlayerID: newPlayerId}); err != nil {
  152. panic(err)
  153. }
  154. }
  155. }
  156. return nil
  157. })
  158. if err != nil {
  159. panic(err)
  160. }
  161. logrus.Warnf("migrateOrders finished")
  162. }
  163. func (j *jMigrate) migrateGemPlayer() {
  164. var (
  165. results []*model.GemPlayer
  166. m = query.Use(config.DB).GemPlayer
  167. )
  168. err := m.Select(m.ID, m.UserID).
  169. FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
  170. for _, result := range results {
  171. if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
  172. if _, err := query.Use(config.DB).GemPlayer.Where(m.ID.Eq(result.ID)).Updates(&model.GemPlayer{UserID: newPlayerId}); err != nil {
  173. panic(err)
  174. }
  175. }
  176. }
  177. return nil
  178. })
  179. if err != nil {
  180. panic(err)
  181. }
  182. logrus.Warnf("migrateGemPlayer finished")
  183. }
  184. func (j *jMigrate) migrateChapterLogsUserDetails() {
  185. var tables = []string{"202304"} // 分表 "", "202302",
  186. for _, table := range tables {
  187. tab := config.DB.Scopes(model.ChapterLogsUserDetailTableSetDate(table))
  188. logrus.Warnf("正在更新表:%v", table)
  189. var (
  190. results []*model.ChapterLogsUserDetail
  191. m = query.Use(tab).ChapterLogsUserDetail.Table("")
  192. )
  193. err := m.Select(m.ID, m.UserID).
  194. FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
  195. for _, result := range results {
  196. if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
  197. if _, err := query.Use(tab).ChapterLogsUserDetail.Where(m.ID.Eq(result.ID)).Updates(&model.ChapterLogsUserDetail{UserID: newPlayerId}); err != nil {
  198. panic(err)
  199. }
  200. }
  201. }
  202. return nil
  203. })
  204. if err != nil {
  205. panic(err)
  206. }
  207. }
  208. logrus.Warnf("migrateChapterLogsUserDetails finished")
  209. }
  210. func (j *jMigrate) migrateChangedPlayer() {
  211. var (
  212. results []*model.ChangedPlayer
  213. m = query.Use(config.DB).ChangedPlayer
  214. )
  215. err := m.Select(m.ID, m.Playerid).
  216. FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
  217. for _, result := range results {
  218. if newPlayerId := j.getNewPlayerId(result.Playerid); newPlayerId > 0 {
  219. if _, err := query.Use(config.DB).ChangedPlayer.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedPlayer{Playerid: newPlayerId}); err != nil {
  220. panic(err)
  221. }
  222. }
  223. }
  224. return nil
  225. })
  226. if err != nil {
  227. panic(err)
  228. }
  229. logrus.Warnf("migrateChangedPlayer finished")
  230. }
  231. func (j *jMigrate) migrateLoginLogs() {
  232. var (
  233. results []*model.LoginLog
  234. m = query.Use(config.DB).LoginLog
  235. )
  236. err := m.Select(m.ID, m.UserID).
  237. FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
  238. logrus.Warnf("results:%+v", len(results))
  239. for _, result := range results {
  240. if result.UserID == 0 {
  241. logrus.Warnf("跳过...")
  242. continue
  243. }
  244. if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
  245. if _, err := query.Use(config.DB).LoginLog.Where(m.ID.Eq(result.ID)).Updates(&model.LoginLog{UserID: newPlayerId}); err != nil {
  246. panic(err)
  247. }
  248. } else {
  249. logrus.Warnf("跳过1...")
  250. }
  251. }
  252. return nil
  253. })
  254. if err != nil {
  255. panic(err)
  256. }
  257. logrus.Warnf("migrateLoginLog finished")
  258. }
  259. func (j *jMigrate) migrateChangedLogs() {
  260. var (
  261. results []*model.ChangedLog
  262. m = query.Use(config.DB).ChangedLog
  263. )
  264. err := m.Select(m.ID, m.UserID).Where(m.ID.Gt(22554361)).
  265. FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
  266. for _, result := range results {
  267. if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
  268. if _, err := query.Use(config.DB).ChangedLog.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedLog{UserID: newPlayerId}); err != nil {
  269. panic(err)
  270. }
  271. } else {
  272. logrus.Warnf("跳过1...")
  273. }
  274. }
  275. return nil
  276. })
  277. if err != nil {
  278. panic(err)
  279. }
  280. logrus.Warnf("migrateChangedLogs finished")
  281. }
  282. func (j *jMigrate) migrateChapter() {
  283. var (
  284. results []*model.Chapter
  285. m = query.Use(config.DB).Chapter
  286. )
  287. err := m.Select(m.ID, m.PlayerID).
  288. FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
  289. for _, result := range results {
  290. if newPlayerId := j.getNewPlayerId(int64(result.PlayerID)); newPlayerId > 0 {
  291. if _, err := query.Use(config.DB).Chapter.Where(m.ID.Eq(result.ID)).Updates(&model.Chapter{PlayerID: newPlayerId}); err != nil {
  292. panic(err)
  293. }
  294. } else {
  295. logrus.Warnf("跳过1...")
  296. }
  297. }
  298. return nil
  299. })
  300. if err != nil {
  301. panic(err)
  302. }
  303. logrus.Warnf("migrateChapter finished")
  304. }
  305. func (j *jMigrate) migrateChapterLogs() {
  306. var tables = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 100, 101, 102, 103, 104, 105} // 分表 "", "202302",
  307. for _, table := range tables {
  308. tab := config.DB.Scopes(model.ChapterLogTableSetNum(table))
  309. logrus.Warnf("migrateChapterLogs 正在更新表:%v", table)
  310. var (
  311. results []*model.ChapterLog
  312. m = query.Use(tab).ChapterLog.Table("")
  313. )
  314. err := m.Select(m.ID, m.UserID, m.EventAt).
  315. FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
  316. for _, result := range results {
  317. if table == 0 && result.ID < 46557844 {
  318. continue
  319. }
  320. // 只更新03-22~04-22
  321. if result.EventAt < 1679414400 {
  322. continue
  323. }
  324. if result.EventAt > 1682092800 {
  325. continue
  326. }
  327. if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
  328. if _, err := query.Use(tab).ChapterLog.Where(m.ID.Eq(result.ID)).Updates(&model.ChapterLog{UserID: newPlayerId}); err != nil {
  329. panic(err)
  330. }
  331. }
  332. }
  333. return nil
  334. })
  335. if err != nil {
  336. panic(err)
  337. }
  338. }
  339. logrus.Warnf("migrateChapterLogs finished")
  340. }
  341. func (j *jMigrate) migrateAdvertisementLogs() {
  342. var (
  343. results []*model.AdvertisementLog
  344. m = query.Use(config.DB).AdvertisementLog
  345. )
  346. err := m.Select(m.ID, m.UserID).
  347. FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
  348. for _, result := range results {
  349. if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
  350. if _, err := query.Use(config.DB).AdvertisementLog.Where(m.ID.Eq(result.ID)).Updates(&model.AdvertisementLog{UserID: newPlayerId}); err != nil {
  351. panic(err)
  352. }
  353. } else {
  354. logrus.Warnf("跳过1...")
  355. }
  356. }
  357. return nil
  358. })
  359. if err != nil {
  360. panic(err)
  361. }
  362. logrus.Warnf("migrateAdvertisementLogs finished")
  363. }
  364. func (j *jMigrate) migrateDisconnectLogs() {
  365. var (
  366. disModel = &model.DisconnectLog{}
  367. results []*model.DisconnectLog
  368. m = query.Use(config.DB.Scopes(model.TableOfYearMonth(disModel.TableName(), time.Now()))).DisconnectLog.Table("")
  369. )
  370. err := m.Select(m.ID, m.Userid, m.EventAt).Where(m.ID.Gt(3052443)).
  371. FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
  372. for _, result := range results {
  373. // 只更新03-25~04-22
  374. if result.EventAt < 1679673600 {
  375. continue
  376. }
  377. if result.EventAt > 1682092800 {
  378. continue
  379. }
  380. if newPlayerId := j.getNewPlayerId(result.Userid); newPlayerId > 0 {
  381. if _, err := query.Use(config.DB.Scopes(model.TableOfYearMonth(disModel.TableName(), time.Now()))).DisconnectLog.Table("").Where(m.ID.Eq(result.ID)).Updates(&model.DisconnectLog{Userid: newPlayerId}); err != nil {
  382. panic(err)
  383. }
  384. } else {
  385. logrus.Warnf("跳过1...")
  386. }
  387. }
  388. return nil
  389. })
  390. if err != nil {
  391. panic(err)
  392. }
  393. logrus.Warnf("migrateDisconnectLogs finished")
  394. }
// getNewPlayerId returns the post-migration player ID for oldPlayerId.
// Callers in this file treat any result <= 0 as "no mapping".
//
// The lookup is currently disabled: the previous implementation is kept
// below as commented-out code and the function always returns 0.
//
// NOTE(review): if the commented-out lookup is ever restored, note that it
// reads migrateUserMap before taking any lock and writes to the map while
// holding only migrateUserLock's read lock; both need proper locking.
func (j *jMigrate) getNewPlayerId(oldPlayerId int64) int64 {
	// Disabled implementation: check the in-memory cache, otherwise look the
	// player up in config.MigrateDB, cache the result, and reset the cache
	// once it reaches 30000 entries.
	//if val, ok := migrateUserMap[oldPlayerId]; ok {
	// return val
	//}
	//
	//migrateUserLock.RLock()
	//defer migrateUserLock.RUnlock()
	//
	//var user *model.MigratePlayer
	//config.MigrateDB.Select("playerid").Where("oldPlayerId = ?", oldPlayerId).First(&user)
	//if user == nil {
	// logrus.Warnf("getNewPlayerId user == nil, oldPlayerId:%v", oldPlayerId)
	// return 0
	//}
	//
	//migrateUserMap[oldPlayerId] = user.Playerid
	//if len(migrateUserMap) >= 30000 {
	// migrateUserMap = make(map[int64]int64, 0)
	//}
	//
	//return user.Playerid
	return 0
}