cron.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package server
  2. import (
  3. "gadmin/internal/admin/forms"
  4. "gadmin/internal/admin/service"
  5. "github.com/gin-gonic/gin"
  6. "github.com/robfig/cron/v3"
  7. "net/http/httptest"
  8. "sync"
  9. "time"
  10. )
  11. var cronTab *cron.Cron
  12. func RegisterJob() {
  13. //cronTab.AddFunc("@every 300s", RefreshConsumptionStatistics) // 每300秒刷新一次消费统计缓存
  14. cronTab.AddFunc("@every 10s", CheckNotice) // 每10秒检查一次是否有需要推送的广播
  15. cronTab.AddFunc("@every 10s", CheckEmail) // 每10秒检查一次是否有需要推送的邮件
  16. }
  17. func cronStart() {
  18. cronTab = cron.New()
  19. RegisterJob()
  20. cronTab.Start()
  21. }
  22. var (
  23. checkNoticeLock sync.Mutex
  24. checkEmailLock sync.Mutex
  25. )
  26. func CheckEmail() {
  27. /* ok := checkEmailLock.TryLock()
  28. if !ok {
  29. logrus.Info("CheckNotice进程正在运行,跳过...")
  30. return
  31. }
  32. defer checkEmailLock.Unlock()
  33. var (
  34. ctx = context.TODO()
  35. q = query.Use(config.DB).AdminEmail
  36. lists []*model.AdminEmail
  37. now = time.Now().Unix()
  38. err error
  39. )
  40. if err = q.WithContext(ctx).Where(q.Status.In(consts.EmailStatusWait)).Where(q.SendAt.Lte(now)).Order(q.ID.Desc()).Scan(&lists); err != nil {
  41. logrus.Warnf("CheckEmail Scan lists err:%+v", err)
  42. return
  43. }
  44. if len(lists) == 0 {
  45. return
  46. }
  47. for _, v := range lists {
  48. logrus.Infof("CheckEmail SendEmailToWorld data:%+v", utility.DumpToJSON(v))
  49. var q2 = query.Use(config.DB).AdminEmail
  50. err = service.AdminEmail.SendEmailToWorld(v)
  51. if err != nil {
  52. logrus.Warnf("CheckEmail 邮件发送失败:%+v", err)
  53. _, err2 := q2.WithContext(ctx).Where(q2.ID.Eq(v.ID)).Updates(model.AdminEmail{Status: consts.EmailStatusErr})
  54. if err2 != nil {
  55. logrus.Warnf("CheckEmail 邮件更新失败:%+v", err2)
  56. }
  57. continue
  58. }
  59. // 发送成功
  60. _, err = q2.WithContext(ctx).Where(q2.ID.Eq(v.ID)).Updates(model.AdminEmail{
  61. Status: consts.EmailStatusSent,
  62. SendAt: time.Now().Unix(),
  63. })
  64. if err != nil {
  65. logrus.Warnf("CheckEmail 邮件更新失败-2:%+v", err)
  66. }
  67. }*/
  68. }
  69. func CheckNotice() {
  70. /* ok := checkNoticeLock.TryLock()
  71. if !ok {
  72. logrus.Info("CheckNotice进程正在运行,跳过...")
  73. return
  74. }
  75. defer checkNoticeLock.Unlock()
  76. var (
  77. ctx = context.TODO()
  78. q = query.Use(config.DB).AdminNotice
  79. lists []*model.AdminNotice
  80. err error
  81. normLabel = "stopServerNotice" // 普通广播
  82. stopServerLabel = "stopServerNoticeEnd" // 最后一次停服广播
  83. allServerIds = service.ServerOption.GetServerIds()
  84. )
  85. err = q.WithContext(ctx).
  86. Where(q.Environment.Eq(os.Getenv("GIN_MODE"))).
  87. Where(q.Status.In(consts.NoticeStatusRunning, consts.NoticeStatusNotStarted)).
  88. Order(q.ID.Desc()).Scan(&lists)
  89. if err != nil {
  90. logrus.Warnf("CheckNotice Scan lists err:%+v", err)
  91. return
  92. }
  93. if len(lists) == 0 {
  94. //logrus.Info("CheckNotice 没有正在运行的广播..")
  95. return
  96. }
  97. updateStatus := func(models *model.AdminNotice) {
  98. _, err = query.Use(config.DB).AdminNotice.WithContext(ctx).
  99. Where(q.ID.Eq(models.ID)).
  100. Where(q.Environment.Eq(os.Getenv("GIN_MODE"))).
  101. Updates(&model.AdminNotice{LastSendAt: models.LastSendAt, Status: models.Status, UpdatedAt: time.Now().Unix()})
  102. if err != nil {
  103. logrus.Warnf("CheckNotice updateStatus err:%+v", err)
  104. return
  105. }
  106. return
  107. }
  108. sendToWorld := func(serverIds []int, msgId, typ, content string) {
  109. for _, sId := range serverIds {
  110. DB, err := player.GetDBByServerID(sId)
  111. if err != nil {
  112. logrus.Warnf("CheckNotice GetDBByServerID2 GetDBByServerID err:%+v", err)
  113. continue
  114. }
  115. msgItem := msg.GmPlacard{
  116. MsgId: msgId,
  117. Type: typ,
  118. Content: content,
  119. }
  120. //err = gate.SendToWorld(0, &msgItem)
  121. var resp *msg.ResponseGmPlacard
  122. res, err := config.GmNats.GmRequest(DB, "GmPlacard", msgItem)
  123. if err != nil {
  124. logrus.Warnf("CheckNotice GmRequest err:%+v", err)
  125. continue
  126. }
  127. if err = json.Unmarshal(res, &resp); err != nil {
  128. logrus.Warnf("CheckNotice nats Unmarshal err:%+v", err)
  129. continue
  130. }
  131. logrus.Infof("CheckNotice sendToWorld ok!! DB:%v, msg:%+v err:%+v", DB, msgItem, err)
  132. }
  133. return
  134. }
  135. for _, v := range lists {
  136. now := time.Now().Unix()
  137. var serverIds []int
  138. if v.ServerIds == "" {
  139. serverIds = allServerIds
  140. } else {
  141. if err := json.Unmarshal([]byte(v.ServerIds), &serverIds); err != nil {
  142. logrus.Warnf("CheckNotice Unmarshal err:%+v", err)
  143. continue
  144. }
  145. // 所有服务器
  146. for _, serverId := range serverIds {
  147. if serverId == 0 {
  148. serverIds = allServerIds
  149. break
  150. }
  151. }
  152. }
  153. // 未开始的广播
  154. if v.StartAt > now {
  155. continue
  156. }
  157. // 如果最后时间小于当前时间,不执行了
  158. if v.EndAt < now {
  159. v.Status = consts.NoticeStatusStopped
  160. // 关服广播
  161. if v.NoticeType == consts.NoticeTypeStopSev {
  162. sendToWorld(serverIds, v.MsgID, stopServerLabel, "")
  163. }
  164. updateStatus(v)
  165. continue
  166. }
  167. // 判断是否满足发送间隔
  168. if v.LastSendAt+v.SendInterval*60 < now {
  169. v.LastSendAt = now
  170. // 发送广播
  171. sendToWorld(serverIds, v.MsgID, normLabel, v.Content)
  172. if consts.NoticeStatusNotStarted == v.Status {
  173. v.Status = consts.NoticeStatusRunning
  174. }
  175. updateStatus(v)
  176. continue
  177. }
  178. }*/
  179. }
  180. func RefreshConsumptionStatistics() {
  181. var (
  182. ctx, _ = gin.CreateTestContext(httptest.NewRecorder())
  183. currentTime = time.Now()
  184. endTime = time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), 0, 0, 0, 0, currentTime.Location()).UnixMilli()
  185. startTime = endTime - 86400000*7
  186. endDate = time.UnixMilli(endTime).Format("2006-01-02")
  187. startDate = time.UnixMilli(startTime).Format("2006-01-02")
  188. req forms.ConsumptionStatisticsReq
  189. )
  190. req = forms.ConsumptionStatisticsReq{
  191. ListReq: forms.ListReq{Page: 1, PerPage: 15}, Type: 1, Source: 0,
  192. //Createtime: []int64{1662566400000, 1663171200000},
  193. //Date: []string{"2022-09-08", "2022-09-15"},
  194. }
  195. req.Createtime = []int64{startTime, endTime}
  196. req.Date = []string{startDate, endDate}
  197. // 钻石
  198. service.ChangedLogs.Statistics(ctx, req)
  199. // 金币
  200. req.Type = 2
  201. service.ChangedLogs.Statistics(ctx, req)
  202. }