123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- package jobs
- import (
- "context"
- "errors"
- "fmt"
- "gadmin/config"
- "gadmin/internal/gorm/model"
- "gadmin/internal/gorm/query"
- "gadmin/package/platformPay"
- "github.com/jinzhu/now"
- "github.com/nahid/gohttp"
- "github.com/sirupsen/logrus"
- "gorm.io/gorm"
- "os"
- "strconv"
- "time"
- )
- //拉取客服聊天记录
- type ChatRecord struct {
- RecordList []record `json:"recordlist,omitempty"`
- Number int `json:"number,omitempty"`
- MsgId int `json:"msgid,omitempty"`
- }
// record is a single chat message as it appears in ChatRecord.RecordList.
type record struct {
	// OpenId identifies the player; Run suffixes it with "@wx" before looking
	// up the account.
	OpenId string `json:"openid,omitempty"`
	// OperCode is the operation code of the message; Run maps 2002 to type 2
	// and every other value to type 1.
	OperCode int `json:"opercode,omitempty"`
	// Text is the message content.
	Text string `json:"text,omitempty"`
	// Time is the message time in Unix seconds (converted with time.Unix).
	Time int64 `json:"time,omitempty"`
	// Worker is decoded from the response but not read by the code in this file.
	Worker string `json:"worker,omitempty"`
}
// JobCustomerServiceChatLog is the job that synchronises customer-service chat
// records; it carries no state of its own.
type JobCustomerServiceChatLog struct{}

// SyncCustomerServiceChatLog is the shared package-level instance of the job.
var SyncCustomerServiceChatLog = &JobCustomerServiceChatLog{}
- func (j *JobCustomerServiceChatLog) Run() {
- if os.Getenv("ADMIN_PLATFORM") != "wechat" {
- logrus.Warn("客服聊天记录目前只有微信平台")
- return
- }
- logrus.Info("保存客服聊天记录开始...")
- var (
- ctx = context.Background()
- chatLogQuery = query.Use(config.DB).CustomerServiceChatLog
- chatLogModel = chatLogQuery.WithContext(ctx)
- startDay = now.With(time.Now().AddDate(0, -2, 0))
- startTime = startDay.BeginningOfDay().Unix()
- endTime = startDay.BeginningOfDay().Add(time.Hour * 24).Unix()
- msgId = 1
- )
- //获取最后一次拉取时间点
- lastLog, err := chatLogModel.Order(chatLogQuery.Time.Desc()).First()
- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
- logrus.Errorf("JobCustomerServiceChatLog 获取mysql客服记录失败:%s", err.Error())
- return
- }
- if lastLog != nil {
- startTime = lastLog.Time.Unix()
- endTime = time.Unix(startTime, 0).Add(time.Hour * 24).Unix()
- }
- //获取聊天记录
- if config.LogRedis.Exists("chatLog:wx").Val() == 0 {
- config.LogRedis.HMSet("chatLog:wx", map[string]interface{}{
- "startTime": startTime,
- "endTime": endTime,
- "msgId": msgId,
- })
- config.LogRedis.Expire("chatLog:wx", time.Minute*25)
- }
- //处理数据
- dataProcess := func(recordList []record) []*model.CustomerServiceChatLog {
- openIdMap := make(map[string]struct{})
- saveModels := make([]*model.CustomerServiceChatLog, len(recordList))
- playersMap := make(map[string]model.UserAccount)
- for _, item := range recordList {
- var player model.UserAccount
- openId := fmt.Sprintf("%s@%s", item.OpenId, "wx")
- if _, ok := openIdMap[openId]; ok {
- continue
- }
- openIdMap[openId] = struct{}{}
- err := config.LDB.WithContext(ctx).Scopes(model.UserAccountTableByKey(&model.UserAccount{}, openId)).Where("openid = ?", openId).First(&player).Error
- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
- logrus.Errorf("JobCustomerServiceChatLog 查询用户信息失败: %v", err)
- return nil
- }
- playersMap[openId] = player
- }
- for i, item := range recordList {
- openId := fmt.Sprintf("%s@%s", item.OpenId, "wx")
- //客服、玩家
- operType := 1
- if item.OperCode == 2002 {
- operType = 2
- }
- player := playersMap[openId]
- saveModel := &model.CustomerServiceChatLog{
- Type: int32(operType),
- AccID: player.AccID,
- OpenID: openId,
- PlayerName: player.Nickname,
- Text: item.Text,
- Time: time.Unix(item.Time, 0),
- CreatedAt: time.Now(),
- }
- saveModels[i] = saveModel
- }
- return saveModels
- }
- for config.LogRedis.Exists("chatLog:wx").Val() == 1 {
- params := config.LogRedis.HGetAll("chatLog:wx").Val()
- startTime, _ = strconv.ParseInt(params["startTime"], 10, 64)
- endTime, _ = strconv.ParseInt(params["endTime"], 10, 64)
- msgId, _ = strconv.Atoi(params["msgId"])
- logRecord, err := j.getChatLog(startTime, endTime, msgId, 10000)
- if err != nil {
- logrus.Errorf("JobCustomerServiceChatLog 获取客服记录失败:%s", err.Error())
- return
- }
- if len(logRecord.RecordList) == 0 {
- continue
- }
- saveModels := dataProcess(logRecord.RecordList)
- err = chatLogModel.CreateInBatches(saveModels, 1000)
- if err != nil {
- logrus.Errorf("JobCustomerServiceChatLog 保存客服聊天记录失败: %v", err)
- return
- }
- }
- logrus.Info("保存客服聊天记录完成...")
- }
- func (j *JobCustomerServiceChatLog) getChatLog(startTime, endTime int64, msgId, number int) (ChatRecord, error) {
- var resBody ChatRecord
- //获取accessToken
- token, _, err := platformPay.QueryAccessToken(1, 0)
- if err != nil {
- return resBody, fmt.Errorf("获取AccessToken失败: %s", err.Error())
- }
- url := fmt.Sprintf("%s?access_token=%s", "https://api.weixin.qq.com/customservice/msgrecord/getmsglist", token)
- http := gohttp.NewRequest()
- formData := make(map[string]interface{})
- formData["starttime"] = startTime
- formData["endtime"] = endTime
- formData["msgid"] = msgId
- formData["number"] = number
- post, err := http.JSON(formData).Post(url)
- if err != nil {
- return resBody, fmt.Errorf("获取客服聊天记录失败: %s", err.Error())
- }
- err = post.UnmarshalBody(&resBody)
- if err != nil {
- return resBody, fmt.Errorf("解析客服聊天记录失败: %s", err.Error())
- }
- //msgId入redis
- if number == resBody.Number {
- rdsChatLog := make(map[string]interface{})
- rdsChatLog["msgId"] = resBody.MsgId
- rdsChatLog["startTime"] = startTime
- rdsChatLog["endTime"] = endTime
- config.LogRedis.HMSet("chatLog:wx", rdsChatLog)
- config.LogRedis.Expire("chatLog:wx", time.Minute*25)
- } else if len(resBody.RecordList) == 0 {
- startTimeDay := time.Unix(startTime, 0)
- today := time.Now()
- d1 := time.Date(startTimeDay.Year(), startTimeDay.Month(), startTimeDay.Day(), 0, 0, 0, 0, time.Local)
- d2 := time.Date(today.Year(), today.Month(), today.Day(), 0, 0, 0, 0, time.Local)
- if d1.Equal(d2) {
- //已经同步到今天了 并且没有数据 则删除key
- config.LogRedis.Del("chatLog:wx")
- } else {
- rdsChatLog := make(map[string]interface{})
- startDay := startTimeDay.Add(time.Hour * 24)
- rdsChatLog["startTime"] = startDay.Unix()
- rdsChatLog["endTime"] = startDay.Add(time.Hour * 24).Unix()
- config.LogRedis.HMSet("chatLog:wx", rdsChatLog)
- config.LogRedis.Expire("chatLog:wx", time.Minute*25)
- }
- } else {
- config.LogRedis.Del("chatLog:wx")
- }
- return resBody, nil
- }
|