123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463 |
- package jobs
- import (
- "encoding/json"
- "gadmin/config"
- "gadmin/internal/gorm/model"
- "gadmin/internal/gorm/query"
- "github.com/sirupsen/logrus"
- "gorm.io/gen"
- "os"
- "sync"
- "time"
- )
// jMigrate groups the player-id migration jobs. Run takes the embedded mutex,
// so at most one migration executes at a time per instance.
type jMigrate struct {
	sync.Mutex
}

// Migrate is the shared job instance used to trigger migrations.
var Migrate = &jMigrate{}

// migrateUserLock and migrateUserMap back the old-id -> new-id cache that is
// only referenced from the commented-out lookup in getNewPlayerId.
var (
	migrateUserLock sync.RWMutex
	migrateUserMap  = map[int64]int64{}
)
- func (j *jMigrate) Run(table string) {
- logrus.Infof("Migrate Run table:%v.....", table)
- if os.Getenv("GIN_MODE") != "release" && os.Getenv("ADMIN_IS_LOCAL") != "1" {
- logrus.Warnf("测试环境禁止同步")
- return
- }
- j.Lock()
- defer j.Unlock()
- switch table {
- case "changed_logs":
- j.migrateChangedLogs()
- case "changed_player":
- j.migrateChangedPlayer()
- case "chapter_logs_user_details":
- j.migrateChapterLogsUserDetails()
- case "gem_player":
- j.migrateGemPlayer()
- case "orders":
- j.migrateOrders()
- case "player_channel":
- j.migratePlayerChannel()
- case "redeem_received":
- j.migrateRedeemReceived()
- case "changed_statistics":
- j.migrateChangedStatistics()
- case "login_logs":
- j.migrateLoginLogs()
- case "chapter":
- j.migrateChapter()
- case "chapter_logs":
- j.migrateChapterLogs()
- case "advertisement_logs":
- j.migrateAdvertisementLogs()
- case "disconnect_logs":
- j.migrateDisconnectLogs()
- default:
- logrus.Warnf("不支持的迁移表:%v", table)
- }
- }
- func (j *jMigrate) migrateChangedStatistics() {
- var (
- results []*model.ChangedStatistic
- m = query.Use(config.DB).ChangedStatistic
- )
- err := m.Select(m.ID, m.Players).
- FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- var (
- players []int64
- newPlayers []int64
- )
- if err := json.Unmarshal([]byte(result.Players), &players); err != nil {
- logrus.Errorf("migrateChangedStatistics Unmarshal err:%+v, result:%+v", err, result)
- continue
- }
- for _, player := range players {
- newPlayerId := j.getNewPlayerId(player)
- if newPlayerId > 0 {
- newPlayers = append(newPlayers, newPlayerId)
- } else {
- newPlayers = append(newPlayers, player)
- }
- }
- marshal, err := json.Marshal(newPlayers)
- if err != nil {
- logrus.Errorf("migrateChangedStatistics Marshal err:%+v, result:%+v", err, result)
- continue
- }
- if _, err = query.Use(config.DB).ChangedStatistic.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedStatistic{Players: string(marshal)}); err != nil {
- panic(err)
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateChangedStatistics finished")
- }
- func (j *jMigrate) migrateRedeemReceived() {
- var (
- results []*model.RedeemReceived
- m = query.Use(config.DB).RedeemReceived
- )
- err := m.Select(m.ID, m.Playerid).
- FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(result.Playerid); newPlayerId > 0 {
- if _, err := query.Use(config.DB).RedeemReceived.Where(m.ID.Eq(result.ID)).Updates(&model.RedeemReceived{Playerid: newPlayerId}); err != nil {
- panic(err)
- }
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateRedeemReceived finished")
- }
- func (j *jMigrate) migratePlayerChannel() {
- var (
- results []*model.PlayerChannel
- m = query.Use(config.DB).PlayerChannel
- )
- err := m.Select(m.ID, m.Playerid).
- FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(int64(result.Playerid)); newPlayerId > 0 {
- if _, err := query.Use(config.DB).PlayerChannel.Where(m.ID.Eq(result.ID)).Updates(&model.PlayerChannel{Playerid: newPlayerId}); err != nil {
- panic(err)
- }
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migratePlayerChannel finished")
- }
- func (j *jMigrate) migrateOrders() {
- var (
- results []*model.Order
- m = query.Use(config.DB).Order
- )
- err := m.Select(m.ID, m.PlayerID).
- FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(int64(result.PlayerID)); newPlayerId > 0 {
- if _, err := query.Use(config.DB).Order.Where(m.ID.Eq(result.ID)).Updates(&model.Order{PlayerID: newPlayerId}); err != nil {
- panic(err)
- }
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateOrders finished")
- }
- func (j *jMigrate) migrateGemPlayer() {
- var (
- results []*model.GemPlayer
- m = query.Use(config.DB).GemPlayer
- )
- err := m.Select(m.ID, m.UserID).
- FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
- if _, err := query.Use(config.DB).GemPlayer.Where(m.ID.Eq(result.ID)).Updates(&model.GemPlayer{UserID: newPlayerId}); err != nil {
- panic(err)
- }
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateGemPlayer finished")
- }
- func (j *jMigrate) migrateChapterLogsUserDetails() {
- var tables = []string{"202304"} // 分表 "", "202302",
- for _, table := range tables {
- tab := config.DB.Scopes(model.ChapterLogsUserDetailTableSetDate(table))
- logrus.Warnf("正在更新表:%v", table)
- var (
- results []*model.ChapterLogsUserDetail
- m = query.Use(tab).ChapterLogsUserDetail.Table("")
- )
- err := m.Select(m.ID, m.UserID).
- FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
- if _, err := query.Use(tab).ChapterLogsUserDetail.Where(m.ID.Eq(result.ID)).Updates(&model.ChapterLogsUserDetail{UserID: newPlayerId}); err != nil {
- panic(err)
- }
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- }
- logrus.Warnf("migrateChapterLogsUserDetails finished")
- }
- func (j *jMigrate) migrateChangedPlayer() {
- var (
- results []*model.ChangedPlayer
- m = query.Use(config.DB).ChangedPlayer
- )
- err := m.Select(m.ID, m.Playerid).
- FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(result.Playerid); newPlayerId > 0 {
- if _, err := query.Use(config.DB).ChangedPlayer.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedPlayer{Playerid: newPlayerId}); err != nil {
- panic(err)
- }
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateChangedPlayer finished")
- }
- func (j *jMigrate) migrateLoginLogs() {
- var (
- results []*model.LoginLog
- m = query.Use(config.DB).LoginLog
- )
- err := m.Select(m.ID, m.UserID).
- FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
- logrus.Warnf("results:%+v", len(results))
- for _, result := range results {
- if result.UserID == 0 {
- logrus.Warnf("跳过...")
- continue
- }
- if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
- if _, err := query.Use(config.DB).LoginLog.Where(m.ID.Eq(result.ID)).Updates(&model.LoginLog{UserID: newPlayerId}); err != nil {
- panic(err)
- }
- } else {
- logrus.Warnf("跳过1...")
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateLoginLog finished")
- }
- func (j *jMigrate) migrateChangedLogs() {
- var (
- results []*model.ChangedLog
- m = query.Use(config.DB).ChangedLog
- )
- err := m.Select(m.ID, m.UserID).Where(m.ID.Gt(22554361)).
- FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
- if _, err := query.Use(config.DB).ChangedLog.Where(m.ID.Eq(result.ID)).Updates(&model.ChangedLog{UserID: newPlayerId}); err != nil {
- panic(err)
- }
- } else {
- logrus.Warnf("跳过1...")
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateChangedLogs finished")
- }
- func (j *jMigrate) migrateChapter() {
- var (
- results []*model.Chapter
- m = query.Use(config.DB).Chapter
- )
- err := m.Select(m.ID, m.PlayerID).
- FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(int64(result.PlayerID)); newPlayerId > 0 {
- if _, err := query.Use(config.DB).Chapter.Where(m.ID.Eq(result.ID)).Updates(&model.Chapter{PlayerID: newPlayerId}); err != nil {
- panic(err)
- }
- } else {
- logrus.Warnf("跳过1...")
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateChapter finished")
- }
- func (j *jMigrate) migrateChapterLogs() {
- var tables = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 100, 101, 102, 103, 104, 105} // 分表 "", "202302",
- for _, table := range tables {
- tab := config.DB.Scopes(model.ChapterLogTableSetNum(table))
- logrus.Warnf("migrateChapterLogs 正在更新表:%v", table)
- var (
- results []*model.ChapterLog
- m = query.Use(tab).ChapterLog.Table("")
- )
- err := m.Select(m.ID, m.UserID, m.EventAt).
- FindInBatches(&results, 2000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if table == 0 && result.ID < 46557844 {
- continue
- }
- // 只更新03-22~04-22
- if result.EventAt < 1679414400 {
- continue
- }
- if result.EventAt > 1682092800 {
- continue
- }
- if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
- if _, err := query.Use(tab).ChapterLog.Where(m.ID.Eq(result.ID)).Updates(&model.ChapterLog{UserID: newPlayerId}); err != nil {
- panic(err)
- }
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- }
- logrus.Warnf("migrateChapterLogs finished")
- }
- func (j *jMigrate) migrateAdvertisementLogs() {
- var (
- results []*model.AdvertisementLog
- m = query.Use(config.DB).AdvertisementLog
- )
- err := m.Select(m.ID, m.UserID).
- FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- if newPlayerId := j.getNewPlayerId(result.UserID); newPlayerId > 0 {
- if _, err := query.Use(config.DB).AdvertisementLog.Where(m.ID.Eq(result.ID)).Updates(&model.AdvertisementLog{UserID: newPlayerId}); err != nil {
- panic(err)
- }
- } else {
- logrus.Warnf("跳过1...")
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateAdvertisementLogs finished")
- }
- func (j *jMigrate) migrateDisconnectLogs() {
- var (
- disModel = &model.DisconnectLog{}
- results []*model.DisconnectLog
- m = query.Use(config.DB.Scopes(model.TableOfYearMonth(disModel.TableName(), time.Now()))).DisconnectLog.Table("")
- )
- err := m.Select(m.ID, m.Userid, m.EventAt).Where(m.ID.Gt(3052443)).
- FindInBatches(&results, 1000, func(tx gen.Dao, batch int) error {
- for _, result := range results {
- // 只更新03-25~04-22
- if result.EventAt < 1679673600 {
- continue
- }
- if result.EventAt > 1682092800 {
- continue
- }
- if newPlayerId := j.getNewPlayerId(result.Userid); newPlayerId > 0 {
- if _, err := query.Use(config.DB.Scopes(model.TableOfYearMonth(disModel.TableName(), time.Now()))).DisconnectLog.Table("").Where(m.ID.Eq(result.ID)).Updates(&model.DisconnectLog{Userid: newPlayerId}); err != nil {
- panic(err)
- }
- } else {
- logrus.Warnf("跳过1...")
- }
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- logrus.Warnf("migrateDisconnectLogs finished")
- }
- func (j *jMigrate) getNewPlayerId(oldPlayerId int64) int64 {
- //if val, ok := migrateUserMap[oldPlayerId]; ok {
- // return val
- //}
- //
- //migrateUserLock.RLock()
- //defer migrateUserLock.RUnlock()
- //
- //var user *model.MigratePlayer
- //config.MigrateDB.Select("playerid").Where("oldPlayerId = ?", oldPlayerId).First(&user)
- //if user == nil {
- // logrus.Warnf("getNewPlayerId user == nil, oldPlayerId:%v", oldPlayerId)
- // return 0
- //}
- //
- //migrateUserMap[oldPlayerId] = user.Playerid
- //if len(migrateUserMap) >= 30000 {
- // migrateUserMap = make(map[int64]int64, 0)
- //}
- //
- //return user.Playerid
- return 0
- }
|