sync_customer_service_chat_log.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package jobs
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "gadmin/config"
  7. "gadmin/internal/gorm/model"
  8. "gadmin/internal/gorm/query"
  9. "gadmin/package/platformPay"
  10. "github.com/jinzhu/now"
  11. "github.com/nahid/gohttp"
  12. "github.com/sirupsen/logrus"
  13. "gorm.io/gorm"
  14. "os"
  15. "strconv"
  16. "time"
  17. )
  18. //拉取客服聊天记录
  19. type ChatRecord struct {
  20. RecordList []record `json:"recordlist,omitempty"`
  21. Number int `json:"number,omitempty"`
  22. MsgId int `json:"msgid,omitempty"`
  23. }
  24. type record struct {
  25. OpenId string `json:"openid,omitempty"`
  26. OperCode int `json:"opercode,omitempty"`
  27. Text string `json:"text,omitempty"`
  28. Time int64 `json:"time,omitempty"`
  29. Worker string `json:"worker,omitempty"`
  30. }
  31. var SyncCustomerServiceChatLog = new(JobCustomerServiceChatLog)
  32. type JobCustomerServiceChatLog struct{}
  33. func (j *JobCustomerServiceChatLog) Run() {
  34. if os.Getenv("ADMIN_PLATFORM") != "wechat" {
  35. logrus.Warn("客服聊天记录目前只有微信平台")
  36. return
  37. }
  38. logrus.Info("保存客服聊天记录开始...")
  39. var (
  40. ctx = context.Background()
  41. chatLogQuery = query.Use(config.DB).CustomerServiceChatLog
  42. chatLogModel = chatLogQuery.WithContext(ctx)
  43. startDay = now.With(time.Now().AddDate(0, -2, 0))
  44. startTime = startDay.BeginningOfDay().Unix()
  45. endTime = startDay.BeginningOfDay().Add(time.Hour * 24).Unix()
  46. msgId = 1
  47. )
  48. //获取最后一次拉取时间点
  49. lastLog, err := chatLogModel.Order(chatLogQuery.Time.Desc()).First()
  50. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  51. logrus.Errorf("JobCustomerServiceChatLog 获取mysql客服记录失败:%s", err.Error())
  52. return
  53. }
  54. if lastLog != nil {
  55. startTime = lastLog.Time.Unix()
  56. endTime = time.Unix(startTime, 0).Add(time.Hour * 24).Unix()
  57. }
  58. //获取聊天记录
  59. if config.LogRedis.Exists("chatLog:wx").Val() == 0 {
  60. config.LogRedis.HMSet("chatLog:wx", map[string]interface{}{
  61. "startTime": startTime,
  62. "endTime": endTime,
  63. "msgId": msgId,
  64. })
  65. config.LogRedis.Expire("chatLog:wx", time.Minute*25)
  66. }
  67. //处理数据
  68. dataProcess := func(recordList []record) []*model.CustomerServiceChatLog {
  69. openIdMap := make(map[string]struct{})
  70. saveModels := make([]*model.CustomerServiceChatLog, len(recordList))
  71. playersMap := make(map[string]model.UserAccount)
  72. for _, item := range recordList {
  73. var player model.UserAccount
  74. openId := fmt.Sprintf("%s@%s", item.OpenId, "wx")
  75. if _, ok := openIdMap[openId]; ok {
  76. continue
  77. }
  78. openIdMap[openId] = struct{}{}
  79. err := config.LDB.WithContext(ctx).Scopes(model.UserAccountTableByKey(&model.UserAccount{}, openId)).Where("openid = ?", openId).First(&player).Error
  80. if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
  81. logrus.Errorf("JobCustomerServiceChatLog 查询用户信息失败: %v", err)
  82. return nil
  83. }
  84. playersMap[openId] = player
  85. }
  86. for i, item := range recordList {
  87. openId := fmt.Sprintf("%s@%s", item.OpenId, "wx")
  88. //客服、玩家
  89. operType := 1
  90. if item.OperCode == 2002 {
  91. operType = 2
  92. }
  93. player := playersMap[openId]
  94. saveModel := &model.CustomerServiceChatLog{
  95. Type: int32(operType),
  96. AccID: player.AccID,
  97. OpenID: openId,
  98. PlayerName: player.Nickname,
  99. Text: item.Text,
  100. Time: time.Unix(item.Time, 0),
  101. CreatedAt: time.Now(),
  102. }
  103. saveModels[i] = saveModel
  104. }
  105. return saveModels
  106. }
  107. for config.LogRedis.Exists("chatLog:wx").Val() == 1 {
  108. params := config.LogRedis.HGetAll("chatLog:wx").Val()
  109. startTime, _ = strconv.ParseInt(params["startTime"], 10, 64)
  110. endTime, _ = strconv.ParseInt(params["endTime"], 10, 64)
  111. msgId, _ = strconv.Atoi(params["msgId"])
  112. logRecord, err := j.getChatLog(startTime, endTime, msgId, 10000)
  113. if err != nil {
  114. logrus.Errorf("JobCustomerServiceChatLog 获取客服记录失败:%s", err.Error())
  115. return
  116. }
  117. if len(logRecord.RecordList) == 0 {
  118. continue
  119. }
  120. saveModels := dataProcess(logRecord.RecordList)
  121. err = chatLogModel.CreateInBatches(saveModels, 1000)
  122. if err != nil {
  123. logrus.Errorf("JobCustomerServiceChatLog 保存客服聊天记录失败: %v", err)
  124. return
  125. }
  126. }
  127. logrus.Info("保存客服聊天记录完成...")
  128. }
  129. func (j *JobCustomerServiceChatLog) getChatLog(startTime, endTime int64, msgId, number int) (ChatRecord, error) {
  130. var resBody ChatRecord
  131. //获取accessToken
  132. token, _, err := platformPay.QueryAccessToken(1, 0)
  133. if err != nil {
  134. return resBody, fmt.Errorf("获取AccessToken失败: %s", err.Error())
  135. }
  136. url := fmt.Sprintf("%s?access_token=%s", "https://api.weixin.qq.com/customservice/msgrecord/getmsglist", token)
  137. http := gohttp.NewRequest()
  138. formData := make(map[string]interface{})
  139. formData["starttime"] = startTime
  140. formData["endtime"] = endTime
  141. formData["msgid"] = msgId
  142. formData["number"] = number
  143. post, err := http.JSON(formData).Post(url)
  144. if err != nil {
  145. return resBody, fmt.Errorf("获取客服聊天记录失败: %s", err.Error())
  146. }
  147. err = post.UnmarshalBody(&resBody)
  148. if err != nil {
  149. return resBody, fmt.Errorf("解析客服聊天记录失败: %s", err.Error())
  150. }
  151. //msgId入redis
  152. if number == resBody.Number {
  153. rdsChatLog := make(map[string]interface{})
  154. rdsChatLog["msgId"] = resBody.MsgId
  155. rdsChatLog["startTime"] = startTime
  156. rdsChatLog["endTime"] = endTime
  157. config.LogRedis.HMSet("chatLog:wx", rdsChatLog)
  158. config.LogRedis.Expire("chatLog:wx", time.Minute*25)
  159. } else if len(resBody.RecordList) == 0 {
  160. startTimeDay := time.Unix(startTime, 0)
  161. today := time.Now()
  162. d1 := time.Date(startTimeDay.Year(), startTimeDay.Month(), startTimeDay.Day(), 0, 0, 0, 0, time.Local)
  163. d2 := time.Date(today.Year(), today.Month(), today.Day(), 0, 0, 0, 0, time.Local)
  164. if d1.Equal(d2) {
  165. //已经同步到今天了 并且没有数据 则删除key
  166. config.LogRedis.Del("chatLog:wx")
  167. } else {
  168. rdsChatLog := make(map[string]interface{})
  169. startDay := startTimeDay.Add(time.Hour * 24)
  170. rdsChatLog["startTime"] = startDay.Unix()
  171. rdsChatLog["endTime"] = startDay.Add(time.Hour * 24).Unix()
  172. config.LogRedis.HMSet("chatLog:wx", rdsChatLog)
  173. config.LogRedis.Expire("chatLog:wx", time.Minute*25)
  174. }
  175. } else {
  176. config.LogRedis.Del("chatLog:wx")
  177. }
  178. return resBody, nil
  179. }