// Package jobs contains background jobs; this file holds the job that
// back-fills the channel ID column on log tables.
package jobs

import (
	"fmt"
	"sync"

	"gadmin/config"
	"gadmin/internal/gorm/model"
	"gadmin/internal/gorm/query"
	"gadmin/utility/player"

	"github.com/sirupsen/logrus"
	"gorm.io/gen"
)

// joinChannelSince is the Unix timestamp (2023-04-05T16:00:00Z) used as the
// lower bound for rows to back-fill. The original comment described it as
// "data from 4.6 onward".
const joinChannelSince = 1680710400

// joinChannelBatchSize is the number of rows loaded per FindInBatches page.
const joinChannelBatchSize = 50000

// JoinChannel is the shared instance of the channel-ID back-fill job.
var JoinChannel = new(jJoinChannel)

// jJoinChannel back-fills ChannelID on log rows by looking up each row's user.
//
// NOTE(review): the embedded RWMutex is only ever read-locked by the methods
// below, so concurrent runs are not mutually exclusive. Confirm whether an
// exclusive Lock was intended.
type jJoinChannel struct {
	sync.RWMutex
}

// Run logs the start of the job and back-fills the online-duration log table.
func (j *jJoinChannel) Run() {
	logrus.Info("jJoinChannel Run.....")
	j.handle()
}

// SetTable back-fills the given chapter-log shard table. Only the table names
// listed below are accepted; any other value is logged and ignored.
func (j *jJoinChannel) SetTable(table string) {
	switch table {
	case "chapter_logs_0", "chapter_logs_1", "chapter_logs_2", "chapter_logs_3",
		"chapter_logs_4", "chapter_logs_5", "chapter_logs_6", "chapter_logs_7",
		"chapter_logs_8", "chapter_logs_9", "chapter_logs_10", "chapter_logs_11",
		"chapter_logs_12", "chapter_logs_13",
		"chapter_logs_100", "chapter_logs_101", "chapter_logs_102", "chapter_logs_103",
		"chapter_logs_104", "chapter_logs_105", "chapter_logs_106":
		j.chapterLogs(table)
	default:
		logrus.Infof("jJoinChannel SetTable 无效参数:%+v", table)
	}
}

// chapterLogs walks the given chapter-log table in batches, starting at the
// first row whose EventAt is at or after joinChannelSince, and writes the
// user's channel ID onto every row that has a non-zero UserID and a channel
// other than "0". Errors are logged, not returned.
func (j *jJoinChannel) chapterLogs(table string) {
	j.RLock()
	defer j.RUnlock()

	logrus.Warnf("正在更新表:%v", table)

	var (
		tab     = config.DB.Scopes(model.ChapterLogTableSetName(table))
		results []*model.ChapterLog
		u       = query.Use(tab).ChapterLog.Table("")
		q       = query.Use(tab).ChapterLog.Table("")
	)

	// Locate the lowest-ID row at or after the cutoff so the scan can be
	// bounded by primary key.
	first, err := q.Where(q.EventAt.Gte(joinChannelSince)).Order(q.ID).Limit(1).First()
	if err != nil {
		// Previously this error was dropped silently. The fallback below is
		// kept, but the reason for it is now visible in the logs.
		logrus.WithField("form", "JoinChannel").Warnf("First err: %+v", err)
	}
	if first == nil {
		// NOTE(review): falling back to a zero ID makes the scan cover the
		// whole table when no qualifying row is found. Confirm this is wanted.
		first = new(model.ChapterLog)
	}

	// The order must be ascending by primary key: FindInBatches fetches the
	// next page with "primary key > last row of the previous page". With a
	// descending order the last row is the smallest ID, so the second page
	// re-reads the first one and the loop stops after about one batch.
	err = u.Select(u.ID, u.UserID).
		Where(u.ID.Gte(first.ID)).
		Order(u.ID).
		FindInBatches(&results, joinChannelBatchSize, func(tx gen.Dao, batch int) error {
			for _, result := range results {
				if result.UserID == 0 {
					continue
				}

				channelID := player.GetUserChannel(result.UserID)
				//userCreatedAt := player.GetUserStamp(int64(result.PlayerID))
				//if userCreatedAt == 0 {
				//	fmt.Printf("result.PlayerID:%v, userCreatedAt= 0 \n", result.PlayerID)
				//}

				// "0" is skipped; presumably it means "no channel known".
				if channelID == "0" {
					continue
				}

				// NOTE(review): the payload type is OnlineDurationLog although
				// the query targets the chapter-log table. Left as-is; confirm
				// it is intentional.
				if _, err := u.Where(u.ID.Eq(result.ID)).Updates(&model.OnlineDurationLog{
					ChannelID: channelID,
					//UserCreatedAt: userCreatedAt,
				}); err != nil {
					fmt.Printf("Updates err: %+v", err)
					logrus.WithField("form", "JoinChannel").Warnf("Updates err: %+v", err)
					return err
				}
			}
			return nil
		})
	if err != nil {
		fmt.Printf("FindInBatches err:%+v", err)
		logrus.WithField("form", "JoinChannel").Errorf("FindInBatches err:%+v", err)
	}

	logrus.Infof("渠道ID导入完成!")
}

// handle walks the online-duration log table in batches, restricted to rows
// whose LoginAt is at or after joinChannelSince, and writes the user's channel
// ID onto every row with a non-zero UserID. Errors are logged, not returned.
func (j *jJoinChannel) handle() {
	j.RLock()
	defer j.RUnlock()

	var (
		results []*model.OnlineDurationLog
		u       = query.Use(config.DB).OnlineDurationLog
		//startTime = now.With(time.Now().AddDate(0, -1, 0)).Time
	)
	//logrus.Infof("开始重新导入渠道ID,date:%v, Unix:%+v", startTime, startTime.Unix())

	// Ascending primary-key order is required for FindInBatches to page
	// correctly. It continues with "primary key > last row of the previous
	// page", which skips almost everything under a descending order.
	err := u.Select(u.ID, u.UserID).
		Where(u.LoginAt.Gte(int32(joinChannelSince))).
		Order(u.ID).
		FindInBatches(&results, joinChannelBatchSize, func(tx gen.Dao, batch int) error {
			for _, result := range results {
				if result.UserID == 0 {
					continue
				}

				channelID := player.GetUserChannel(result.UserID)
				//userCreatedAt := player.GetUserStamp(int64(result.PlayerID))
				//if userCreatedAt == 0 {
				//	fmt.Printf("result.PlayerID:%v, userCreatedAt= 0 \n", result.PlayerID)
				//}

				// NOTE(review): unlike chapterLogs, a channelID of "0" is not
				// skipped here. Confirm whether that difference is intended.
				if _, err := u.Where(u.ID.Eq(result.ID)).Updates(&model.OnlineDurationLog{
					ChannelID: channelID,
					//UserCreatedAt: userCreatedAt,
				}); err != nil {
					fmt.Printf("Updates err: %+v", err)
					logrus.WithField("form", "JoinChannel").Warnf("Updates err: %+v", err)
					return err
				}
			}
			return nil
		})
	if err != nil {
		fmt.Printf("FindInBatches err:%+v", err)
		logrus.WithField("form", "JoinChannel").Errorf("FindInBatches err:%+v", err)
	}

	logrus.Infof("渠道ID导入完成!")
}