timequeue.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // 最小堆存放数据项
  2. // 并实现了一个时序队列,支持添加、删除、修改等操作
  3. // 修改时间:2024-12-26
  4. package smallheap
  5. import (
  6. "context"
  7. "leafstalk/log"
  8. "sync"
  9. "time"
  10. )
  11. // 排序、唯一KEY都是int64
  12. type Int64SortObject[T any] struct {
  13. index int
  14. SortKey int64
  15. uniqueKey int64
  16. val T
  17. }
  18. func (t *Int64SortObject[T]) GetSortValue() int64 {
  19. return t.SortKey
  20. }
  21. func (t *Int64SortObject[T]) SetSortValue(val int64) {
  22. t.SortKey = val
  23. }
  24. func (t *Int64SortObject[T]) SetIndex(index int) {
  25. t.index = index
  26. }
  27. func (t *Int64SortObject[T]) GetIndex() int {
  28. return t.index
  29. }
  30. func (t *Int64SortObject[T]) GetUniqueKey() int64 {
  31. return t.uniqueKey
  32. }
  33. // 时序队列
  34. type TimeOrderQueue[T2 any] struct {
  35. allObjects *SmallHeap[*Int64SortObject[T2]]
  36. callBack func(t T2)
  37. wg sync.WaitGroup
  38. cancel context.CancelFunc
  39. }
  40. func NewTimeOrderQueue[T2 any](checkPeriod time.Duration, callBack func(t T2)) *TimeOrderQueue[T2] {
  41. df := new(TimeOrderQueue[T2])
  42. df.allObjects = NewSmallHeap[*Int64SortObject[T2]]()
  43. ctx, cancel := context.WithCancel(context.Background())
  44. df.cancel = cancel
  45. df.callBack = callBack
  46. df.wg.Add(1)
  47. go func() {
  48. defer df.wg.Done()
  49. df.checkOnTime(ctx, checkPeriod)
  50. }()
  51. return df
  52. }
  53. func (d *TimeOrderQueue[T2]) Stop() {
  54. // 关闭存储协程
  55. if d.cancel != nil {
  56. d.cancel()
  57. d.cancel = nil
  58. }
  59. d.wg.Wait()
  60. }
  61. func (d *TimeOrderQueue[T2]) GetAll() map[int64]T2 {
  62. res := make(map[int64]T2)
  63. for k, v := range d.allObjects.QueryTable {
  64. res[k] = v.val
  65. }
  66. return res
  67. }
  68. // func (d *TimeOrderQueue[T2]) NewId() int64 {
  69. // return d.allObjects.NewId()
  70. // }
  71. // 添加定时触发对象
  72. // 要求objId运行期间必须唯一
  73. func (d *TimeOrderQueue[T2]) AddTimeOrderObject(delay int64, obj T2, objId int64) int64 {
  74. ct := time.Now().Unix()
  75. ct += delay
  76. // if objId == 0 {
  77. // objId = d.NewId()
  78. // }
  79. it := new(Int64SortObject[T2])
  80. it.SortKey = ct
  81. it.val = obj
  82. it.uniqueKey = objId
  83. d.allObjects.Add(it)
  84. return objId
  85. }
  86. func (d *TimeOrderQueue[T2]) RemoveTimeOrderObject(id int64) (T2, error) {
  87. t, err := d.allObjects.RemoveByUniqueKey(id)
  88. if err != nil {
  89. var t T2
  90. return t, err
  91. }
  92. return t.val, nil
  93. }
  94. // 更新延迟时间,从调用函数时开始计算
  95. func (d *TimeOrderQueue[T2]) UpdateDelayTime(id int64, delay int64) bool {
  96. ct := time.Now().Unix()
  97. ct += delay
  98. return d.allObjects.UpdateSortVal(id, ct)
  99. }
  100. func (d *TimeOrderQueue[T2]) checkOnTime(ctx context.Context, period time.Duration) {
  101. safeCall := func(t T2) {
  102. if d.callBack == nil {
  103. return
  104. }
  105. defer func() {
  106. if err := recover(); err != nil {
  107. log.Warnf("checkTimeout safeCall error. %v", err)
  108. }
  109. }()
  110. d.callBack(t)
  111. }
  112. if period < time.Millisecond {
  113. period = 50 * time.Millisecond
  114. }
  115. ticker := time.NewTicker(period)
  116. for {
  117. var ct time.Time
  118. select {
  119. case <-ctx.Done():
  120. ticker.Stop()
  121. log.Warnf("checkTimeout exit.")
  122. return
  123. case ct = <-ticker.C:
  124. }
  125. cst := ct.Unix()
  126. for {
  127. // 是否到时间
  128. t, err := d.allObjects.Get(0)
  129. if err != nil {
  130. break
  131. }
  132. if t.SortKey > cst {
  133. break
  134. }
  135. // 取第一个
  136. d.allObjects.Remove(0)
  137. safeCall(t.val)
  138. }
  139. }
  140. }