timequeue.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // 最小堆存放数据项
  2. // 并实现了一个时序队列,支持添加、删除、修改等操作
  3. // 修改时间:2024-12-26
  4. package smallheap
  5. import (
  6. "context"
  7. "leafstalk/log"
  8. "sync"
  9. "time"
  10. )
  11. // 排序、唯一KEY都是int64
  12. type Int64SortObject[T any] struct {
  13. index int
  14. SortKey int64
  15. uniqueKey int64
  16. val T
  17. }
  18. func (t *Int64SortObject[T]) GetSortKey() int64 {
  19. return t.SortKey
  20. }
  21. func (t *Int64SortObject[T]) SetIndex(index int) {
  22. t.index = index
  23. }
  24. func (t *Int64SortObject[T]) GetIndex() int {
  25. return t.index
  26. }
  27. func (t *Int64SortObject[T]) GetUniqueKey() int64 {
  28. return t.uniqueKey
  29. }
  30. // 时序队列
  31. type TimeOrderQueue[T2 any] struct {
  32. allObjects *SmallHeap[*Int64SortObject[T2]]
  33. callBack func(t T2)
  34. wg sync.WaitGroup
  35. cancel context.CancelFunc
  36. }
  37. func NewTimeOrderQueue[T2 any](checkPeriod time.Duration, callBack func(t T2)) *TimeOrderQueue[T2] {
  38. df := new(TimeOrderQueue[T2])
  39. df.allObjects = NewSmallHeap[*Int64SortObject[T2]]()
  40. ctx, cancel := context.WithCancel(context.Background())
  41. df.cancel = cancel
  42. df.callBack = callBack
  43. df.wg.Add(1)
  44. go func() {
  45. defer df.wg.Done()
  46. df.checkOnTime(ctx, checkPeriod)
  47. }()
  48. return df
  49. }
  50. func (d *TimeOrderQueue[T2]) Stop() {
  51. // 关闭存储协程
  52. if d.cancel != nil {
  53. d.cancel()
  54. d.cancel = nil
  55. }
  56. d.wg.Wait()
  57. }
  58. func (d *TimeOrderQueue[T2]) GetAll() map[int64]T2 {
  59. res := make(map[int64]T2)
  60. for k, v := range d.allObjects.QueryTable {
  61. res[k] = v.val
  62. }
  63. return res
  64. }
  65. // func (d *TimeOrderQueue[T2]) NewId() int64 {
  66. // return d.allObjects.NewId()
  67. // }
  68. // 添加定时触发对象
  69. // 要求objId运行期间必须唯一
  70. func (d *TimeOrderQueue[T2]) AddTimeOrderObject(delay int64, obj T2, objId int64) int64 {
  71. ct := time.Now().Unix()
  72. ct += delay
  73. // if objId == 0 {
  74. // objId = d.NewId()
  75. // }
  76. it := new(Int64SortObject[T2])
  77. it.SortKey = ct
  78. it.val = obj
  79. it.uniqueKey = objId
  80. d.allObjects.Add(it)
  81. return objId
  82. }
  83. func (d *TimeOrderQueue[T2]) RemoveTimeOrderObject(id int64) T2 {
  84. t := d.allObjects.RemoveByUniqueKey(id)
  85. return t.val
  86. }
  87. // 更新延迟时间,从调用函数时开始计算
  88. func (d *TimeOrderQueue[T2]) UpdateDelayTime(id int64, delay int64) bool {
  89. it, ok := d.allObjects.GetByUniqueKey(id)
  90. if !ok {
  91. return ok
  92. }
  93. ct := time.Now().Unix()
  94. ct += delay
  95. it.SortKey = ct
  96. return d.allObjects.Resort(it)
  97. }
  98. func (d *TimeOrderQueue[T2]) checkOnTime(ctx context.Context, period time.Duration) {
  99. safeCall := func(t T2) {
  100. if d.callBack == nil {
  101. return
  102. }
  103. defer func() {
  104. if err := recover(); err != nil {
  105. log.Warnf("checkTimeout safeCall error. %v", err)
  106. }
  107. }()
  108. d.callBack(t)
  109. }
  110. if period < time.Millisecond {
  111. period = 50 * time.Millisecond
  112. }
  113. ticker := time.NewTicker(period)
  114. for {
  115. var ct time.Time
  116. select {
  117. case <-ctx.Done():
  118. ticker.Stop()
  119. log.Warnf("checkTimeout exit.")
  120. return
  121. case ct = <-ticker.C:
  122. }
  123. cst := ct.Unix()
  124. for {
  125. // 是否到时间
  126. t, err := d.allObjects.Get(0)
  127. if err != nil {
  128. break
  129. }
  130. if t.SortKey > cst {
  131. break
  132. }
  133. // 取第一个
  134. d.allObjects.Remove(0)
  135. safeCall(t.val)
  136. }
  137. }
  138. }