package jobs import ( "context" "errors" "fmt" "gadmin/config" "gadmin/internal/gorm/model" "gadmin/internal/gorm/query" "gadmin/package/platformPay" "github.com/jinzhu/now" "github.com/nahid/gohttp" "github.com/sirupsen/logrus" "gorm.io/gorm" "os" "strconv" "time" ) //拉取客服聊天记录 type ChatRecord struct { RecordList []record `json:"recordlist,omitempty"` Number int `json:"number,omitempty"` MsgId int `json:"msgid,omitempty"` } type record struct { OpenId string `json:"openid,omitempty"` OperCode int `json:"opercode,omitempty"` Text string `json:"text,omitempty"` Time int64 `json:"time,omitempty"` Worker string `json:"worker,omitempty"` } var SyncCustomerServiceChatLog = new(JobCustomerServiceChatLog) type JobCustomerServiceChatLog struct{} func (j *JobCustomerServiceChatLog) Run() { if os.Getenv("ADMIN_PLATFORM") != "wechat" { logrus.Warn("客服聊天记录目前只有微信平台") return } logrus.Info("保存客服聊天记录开始...") var ( ctx = context.Background() chatLogQuery = query.Use(config.DB).CustomerServiceChatLog chatLogModel = chatLogQuery.WithContext(ctx) startDay = now.With(time.Now().AddDate(0, -2, 0)) startTime = startDay.BeginningOfDay().Unix() endTime = startDay.BeginningOfDay().Add(time.Hour * 24).Unix() msgId = 1 ) //获取最后一次拉取时间点 lastLog, err := chatLogModel.Order(chatLogQuery.Time.Desc()).First() if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { logrus.Errorf("JobCustomerServiceChatLog 获取mysql客服记录失败:%s", err.Error()) return } if lastLog != nil { startTime = lastLog.Time.Unix() endTime = time.Unix(startTime, 0).Add(time.Hour * 24).Unix() } //获取聊天记录 if config.LogRedis.Exists("chatLog:wx").Val() == 0 { config.LogRedis.HMSet("chatLog:wx", map[string]interface{}{ "startTime": startTime, "endTime": endTime, "msgId": msgId, }) config.LogRedis.Expire("chatLog:wx", time.Minute*25) } //处理数据 dataProcess := func(recordList []record) []*model.CustomerServiceChatLog { openIdMap := make(map[string]struct{}) saveModels := make([]*model.CustomerServiceChatLog, len(recordList)) playersMap := make(map[string]model.UserAccount) for _, item := range recordList { var player model.UserAccount openId := fmt.Sprintf("%s@%s", item.OpenId, "wx") if _, ok := openIdMap[openId]; ok { continue } openIdMap[openId] = struct{}{} err := config.LDB.WithContext(ctx).Scopes(model.UserAccountTableByKey(&model.UserAccount{}, openId)).Where("openid = ?", openId).First(&player).Error if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { logrus.Errorf("JobCustomerServiceChatLog 查询用户信息失败: %v", err) return nil } playersMap[openId] = player } for i, item := range recordList { openId := fmt.Sprintf("%s@%s", item.OpenId, "wx") //客服、玩家 operType := 1 if item.OperCode == 2002 { operType = 2 } player := playersMap[openId] saveModel := &model.CustomerServiceChatLog{ Type: int32(operType), AccID: player.AccID, OpenID: openId, PlayerName: player.Nickname, Text: item.Text, Time: time.Unix(item.Time, 0), CreatedAt: time.Now(), } saveModels[i] = saveModel } return saveModels } for config.LogRedis.Exists("chatLog:wx").Val() == 1 { params := config.LogRedis.HGetAll("chatLog:wx").Val() startTime, _ = strconv.ParseInt(params["startTime"], 10, 64) endTime, _ = strconv.ParseInt(params["endTime"], 10, 64) msgId, _ = strconv.Atoi(params["msgId"]) logRecord, err := j.getChatLog(startTime, endTime, msgId, 10000) if err != nil { logrus.Errorf("JobCustomerServiceChatLog 获取客服记录失败:%s", err.Error()) return } if len(logRecord.RecordList) == 0 { continue } saveModels := dataProcess(logRecord.RecordList) err = chatLogModel.CreateInBatches(saveModels, 1000) if err != nil { logrus.Errorf("JobCustomerServiceChatLog 保存客服聊天记录失败: %v", err) return } } logrus.Info("保存客服聊天记录完成...") } func (j *JobCustomerServiceChatLog) getChatLog(startTime, endTime int64, msgId, number int) (ChatRecord, error) { var resBody ChatRecord //获取accessToken token, _, err := platformPay.QueryAccessToken(1, 0) if err != nil { return resBody, fmt.Errorf("获取AccessToken失败: %s", err.Error()) } url := fmt.Sprintf("%s?access_token=%s", "https://api.weixin.qq.com/customservice/msgrecord/getmsglist", token) http := gohttp.NewRequest() formData := make(map[string]interface{}) formData["starttime"] = startTime formData["endtime"] = endTime formData["msgid"] = msgId formData["number"] = number post, err := http.JSON(formData).Post(url) if err != nil { return resBody, fmt.Errorf("获取客服聊天记录失败: %s", err.Error()) } err = post.UnmarshalBody(&resBody) if err != nil { return resBody, fmt.Errorf("解析客服聊天记录失败: %s", err.Error()) } //msgId入redis if number == resBody.Number { rdsChatLog := make(map[string]interface{}) rdsChatLog["msgId"] = resBody.MsgId rdsChatLog["startTime"] = startTime rdsChatLog["endTime"] = endTime config.LogRedis.HMSet("chatLog:wx", rdsChatLog) config.LogRedis.Expire("chatLog:wx", time.Minute*25) } else if len(resBody.RecordList) == 0 { startTimeDay := time.Unix(startTime, 0) today := time.Now() d1 := time.Date(startTimeDay.Year(), startTimeDay.Month(), startTimeDay.Day(), 0, 0, 0, 0, time.Local) d2 := time.Date(today.Year(), today.Month(), today.Day(), 0, 0, 0, 0, time.Local) if d1.Equal(d2) { //已经同步到今天了 并且没有数据 则删除key config.LogRedis.Del("chatLog:wx") } else { rdsChatLog := make(map[string]interface{}) startDay := startTimeDay.Add(time.Hour * 24) rdsChatLog["startTime"] = startDay.Unix() rdsChatLog["endTime"] = startDay.Add(time.Hour * 24).Unix() config.LogRedis.HMSet("chatLog:wx", rdsChatLog) config.LogRedis.Expire("chatLog:wx", time.Minute*25) } } else { config.LogRedis.Del("chatLog:wx") } return resBody, nil }