package jobs import ( "encoding/json" "gadmin/config" "gadmin/internal/gorm/model" "gadmin/internal/gorm/query" "github.com/sirupsen/logrus" "gorm.io/gen" "os" "sync" "time" ) var ( Migrate = new(jMigrate) migrateUserLock sync.RWMutex migrateUserMap = make(map[int64]int64, 0) ) type jMigrate struct { sync.Mutex } func (j *jMigrate) Run(table string) { logrus.Infof("Migrate Run table:%v.....", table) if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" { logrus.Warnf("测试环境禁止同步") return } j.Lock() defer j.Unlock() switch table { case "changed_logs": j.migrateChangedLogs() case "changed_player": j.migrateChangedPlayer() case "chapter_logs_user_details": j.migrateChapterLogsUserDetails() case "gem_player": j.migrateGemPlayer() case "orders": j.migrateOrders() case "player_channel": j.migratePlayerChannel() case "redeem_received": j.migrateRedeemReceived() case "changed_statistics": j.migrateChangedStatistics() case "login_logs": j.migrateLoginLogs() case "chapter": j.migrateChapter() case "chapter_logs": j.migrateChapterLogs() case "advertisement_logs": j.migrateAdvertisementLogs() case "disconnect_logs": j.migrateDisconnectLogs() default: logrus.Warnf("不支持的迁移表:%v", table) } } func (j *jMigrate) migrateChangedStatistics() { var ( results []*model.ChangedStatistic m = query.Use(config.DB).ChangedStatistic ) err := m.Select(m.ID, m.Players). FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error { for _, result := range results { var ( players []int64 newPlayers []int64 ) if err := json.Unmarshal([]byte(result.Players), &players); err != nil { logrus.Errorf("migrateChangedStatistics Unmarshal err:%+v, result:%+v", err, result) continue } for _, player := range players { newPlayerId := j.getNewPlayerId(player) if newPlayerId > 0 { newPlayers = append(newPlayers, newPlayerId) } else { newPlayers = append(newPlayers, player) } } marshal, err := json.Marshal(newPlayers) if err != nil { logrus.Errorf("migrateChangedStatistics Marshal err:%+v, result:%+v", err, result) continue } if _, err = query.Use(config.DB).ChangedStatistic.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedStatistic{Players: string(marshal)}); err != nil { panic(err) } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateChangedStatistics finished") } func (j *jMigrate) migrateRedeemReceived() { var ( results []*model.RedeemReceived m = query.Use(config.DB).RedeemReceived ) err := m.Select(m.ID, m.Playerid). FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(result.Playerid); newPlayerId > 0 { if _, err := query.Use(config.DB).RedeemReceived.Where(m.ID.Eq(result.ID)).Updates(&model.RedeemReceived{Playerid: newPlayerId}); err != nil { panic(err) } } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateRedeemReceived finished") } func (j *jMigrate) migratePlayerChannel() { var ( results []*model.PlayerChannel m = query.Use(config.DB).PlayerChannel ) err := m.Select(m.ID, m.Playerid). FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(int64(result.Playerid)); newPlayerId > 0 { if _, err := query.Use(config.DB).PlayerChannel.Where(m.ID.Eq(result.ID)).Updates(&model.PlayerChannel{Playerid: newPlayerId}); err != nil { panic(err) } } } return nil }) if err != nil { panic(err) } logrus.Warnf("migratePlayerChannel finished") } func (j *jMigrate) migrateOrders() { var ( results []*model.Order m = query.Use(config.DB).Order ) err := m.Select(m.ID, m.PlayerID). FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(int64(result.PlayerID)); newPlayerId > 0 { if _, err := query.Use(config.DB).Order.Where(m.ID.Eq(result.ID)).Updates(&model.Order{PlayerID: newPlayerId}); err != nil { panic(err) } } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateOrders finished") } func (j *jMigrate) migrateGemPlayer() { var ( results []*model.GemPlayer m = query.Use(config.DB).GemPlayer ) err := m.Select(m.ID, m.UserID). FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 { if _, err := query.Use(config.DB).GemPlayer.Where(m.ID.Eq(result.ID)).Updates(&model.GemPlayer{UserID: newPlayerId}); err != nil { panic(err) } } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateGemPlayer finished") } func (j *jMigrate) migrateChapterLogsUserDetails() { var tables = []string{"202304"} // 分表 "", "202302", for _, table := range tables { tab := config.DB.Scopes(model.ChapterLogsUserDetailTableSetDate(table)) logrus.Warnf("正在更新表:%v", table) var ( results []*model.ChapterLogsUserDetail m = query.Use(tab).ChapterLogsUserDetail.Table("") ) err := m.Select(m.ID, m.UserID). FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 { if _, err := query.Use(tab).ChapterLogsUserDetail.Where(m.ID.Eq(result.ID)).Updates(&model.ChapterLogsUserDetail{UserID: newPlayerId}); err != nil { panic(err) } } } return nil }) if err != nil { panic(err) } } logrus.Warnf("migrateChapterLogsUserDetails finished") } func (j *jMigrate) migrateChangedPlayer() { var ( results []*model.ChangedPlayer m = query.Use(config.DB).ChangedPlayer ) err := m.Select(m.ID, m.Playerid). FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(result.Playerid); newPlayerId > 0 { if _, err := query.Use(config.DB).ChangedPlayer.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedPlayer{Playerid: newPlayerId}); err != nil { panic(err) } } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateChangedPlayer finished") } func (j *jMigrate) migrateLoginLogs() { var ( results []*model.LoginLog m = query.Use(config.DB).LoginLog ) err := m.Select(m.ID, m.UserID). FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error { logrus.Warnf("results:%+v", len(results)) for _, result := range results { if result.UserID == 0 { logrus.Warnf("跳过...") continue } if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 { if _, err := query.Use(config.DB).LoginLog.Where(m.ID.Eq(result.ID)).Updates(&model.LoginLog{UserID: newPlayerId}); err != nil { panic(err) } } else { logrus.Warnf("跳过1...") } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateLoginLog finished") } func (j *jMigrate) migrateChangedLogs() { var ( results []*model.ChangedLog m = query.Use(config.DB).ChangedLog ) err := m.Select(m.ID, m.UserID).Where(m.ID.Gt(22554361)). FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 { if _, err := query.Use(config.DB).ChangedLog.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedLog{UserID: newPlayerId}); err != nil { panic(err) } } else { logrus.Warnf("跳过1...") } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateChangedLogs finished") } func (j *jMigrate) migrateChapter() { var ( results []*model.Chapter m = query.Use(config.DB).Chapter ) err := m.Select(m.ID, m.PlayerID). FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(int64(result.PlayerID)); newPlayerId > 0 { if _, err := query.Use(config.DB).Chapter.Where(m.ID.Eq(result.ID)).Updates(&model.Chapter{PlayerID: newPlayerId}); err != nil { panic(err) } } else { logrus.Warnf("跳过1...") } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateChapter finished") } func (j *jMigrate) migrateChapterLogs() { var tables = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 100, 101, 102, 103, 104, 105} // 分表 "", "202302", for _, table := range tables { tab := config.DB.Scopes(model.ChapterLogTableSetNum(table)) logrus.Warnf("migrateChapterLogs 正在更新表:%v", table) var ( results []*model.ChapterLog m = query.Use(tab).ChapterLog.Table("") ) err := m.Select(m.ID, m.UserID, m.EventAt). FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error { for _, result := range results { if table == 0 && result.ID < 46557844 { continue } // 只更新03-22~04-22 if result.EventAt < 1679414400 { continue } if result.EventAt > 1682092800 { continue } if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 { if _, err := query.Use(tab).ChapterLog.Where(m.ID.Eq(result.ID)).Updates(&model.ChapterLog{UserID: newPlayerId}); err != nil { panic(err) } } } return nil }) if err != nil { panic(err) } } logrus.Warnf("migrateChapterLogs finished") } func (j *jMigrate) migrateAdvertisementLogs() { var ( results []*model.AdvertisementLog m = query.Use(config.DB).AdvertisementLog ) err := m.Select(m.ID, m.UserID). FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error { for _, result := range results { if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 { if _, err := query.Use(config.DB).AdvertisementLog.Where(m.ID.Eq(result.ID)).Updates(&model.AdvertisementLog{UserID: newPlayerId}); err != nil { panic(err) } } else { logrus.Warnf("跳过1...") } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateAdvertisementLogs finished") } func (j *jMigrate) migrateDisconnectLogs() { var ( disModel = &model.DisconnectLog{} results []*model.DisconnectLog m = query.Use(config.DB.Scopes(model.TableOfYearMonth(disModel.TableName(), time.Now()))).DisconnectLog.Table("") ) err := m.Select(m.ID, m.Userid, m.EventAt).Where(m.ID.Gt(3052443)). FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error { for _, result := range results { // 只更新03-25~04-22 if result.EventAt < 1679673600 { continue } if result.EventAt > 1682092800 { continue } if newPlayerId := j.getNewPlayerId(result.Userid); newPlayerId > 0 { if _, err := query.Use(config.DB.Scopes(model.TableOfYearMonth(disModel.TableName(), time.Now()))).DisconnectLog.Table("").Where(m.ID.Eq(result.ID)).Updates(&model.DisconnectLog{Userid: newPlayerId}); err != nil { panic(err) } } else { logrus.Warnf("跳过1...") } } return nil }) if err != nil { panic(err) } logrus.Warnf("migrateDisconnectLogs finished") } func (j *jMigrate) getNewPlayerId(oldPlayerId int64) int64 { //if val, ok := migrateUserMap[oldPlayerId]; ok { // return val //} // //migrateUserLock.RLock() //defer migrateUserLock.RUnlock() // //var user *model.MigratePlayer //config.MigrateDB.Select("playerid").Where("oldPlayerId = ?", oldPlayerId).First(&user) //if user == nil { // logrus.Warnf("getNewPlayerId user == nil, oldPlayerId:%v", oldPlayerId) // return 0 //} // //migrateUserMap[oldPlayerId] = user.Playerid //if len(migrateUserMap) >= 30000 { // migrateUserMap = make(map[int64]int64, 0) //} // //return user.Playerid return 0 }