keyqueue.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
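// Package smallheap stores data items in a min-heap and implements a
// time-ordered queue on top of it that supports adding, removing, and
// updating entries.
//
// Last modified: 2025-03-29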
  4. package smallheap
  5. import "leafstalk/log"
  6. type KeyItem interface {
  7. GetKey() int64
  8. }
  9. // 排序、唯一KEY都是int64
  10. type QueueItem[T KeyItem] struct {
  11. index int
  12. sortVal int64
  13. val T
  14. }
  15. func (t *QueueItem[T]) GetSortValue() int64 {
  16. return t.sortVal
  17. }
  18. func (t *QueueItem[T]) SetSortValue(val int64) {
  19. t.sortVal = val
  20. }
  21. func (t *QueueItem[T]) SetIndex(index int) {
  22. t.index = index
  23. }
  24. func (t *QueueItem[T]) GetIndex() int {
  25. return t.index
  26. }
  27. func (t *QueueItem[T]) GetUniqueKey() int64 {
  28. return t.val.GetKey()
  29. }
  30. // 时序队列
  31. type OrderQueue[T2 KeyItem] struct {
  32. SmallHeap *SmallHeap[*QueueItem[T2]]
  33. }
  34. func NewOrderQueue[T2 KeyItem]() *OrderQueue[T2] {
  35. it := new(OrderQueue[T2])
  36. it.SmallHeap = NewSmallHeap[*QueueItem[T2]]()
  37. return it
  38. }
  39. // 添加定时触发对象
  40. // 要求objId运行期间必须唯一
  41. func (d *OrderQueue[T2]) Add(obj T2, tick int64) int64 {
  42. it := new(QueueItem[T2])
  43. it.sortVal = tick
  44. it.val = obj
  45. d.SmallHeap.Add(it)
  46. return obj.GetKey()
  47. }
  48. func (d *OrderQueue[T2]) Set(obj T2, tick int64) {
  49. t, ok := d.SmallHeap.GetByUniqueKey(obj.GetKey())
  50. if ok {
  51. t.val = obj
  52. return
  53. }
  54. it := new(QueueItem[T2])
  55. it.sortVal = tick
  56. it.val = obj
  57. d.SmallHeap.Add(it)
  58. }
  59. func (d *OrderQueue[T2]) SetSortVal(key int64, tick int64) {
  60. d.SmallHeap.UpdateSortVal(key, tick)
  61. }
  62. func (d *OrderQueue[T2]) Update(obj T2, tick int64) {
  63. t, ok := d.SmallHeap.GetByUniqueKey(obj.GetKey())
  64. if ok {
  65. t.val = obj
  66. t.sortVal = tick
  67. d.SmallHeap.UpdateSortVal(obj.GetKey(), tick)
  68. return
  69. }
  70. it := new(QueueItem[T2])
  71. it.sortVal = tick
  72. it.val = obj
  73. d.SmallHeap.Add(it)
  74. }
  75. func (d *OrderQueue[T2]) Get(key int64) (T2, bool) {
  76. t, ok := d.SmallHeap.GetByUniqueKey(key)
  77. if ok {
  78. return t.val, true
  79. }
  80. var tv T2
  81. return tv, false
  82. }
  83. func (d *OrderQueue[T2]) Remove(id int64) (T2, error) {
  84. t, err := d.SmallHeap.RemoveByUniqueKey(id)
  85. if err != nil {
  86. var t T2
  87. return t, err
  88. }
  89. return t.val, nil
  90. }
  91. func (d *OrderQueue[T2]) Pop() (T2, error) {
  92. t, err := d.SmallHeap.Remove(0)
  93. if err != nil {
  94. var t T2
  95. return t, err
  96. }
  97. return t.val, nil
  98. }
  99. func (d *OrderQueue[T2]) PeekFirst() (T2, int64, error) {
  100. t, err := d.SmallHeap.Get(0)
  101. if err != nil {
  102. var t T2
  103. return t, 0, err
  104. }
  105. return t.val, t.sortVal, nil
  106. }
  107. func safeCall[T KeyItem](callBack func(T), t T) {
  108. defer func() {
  109. if err := recover(); err != nil {
  110. log.Warnf("checkTimeout safeCall error. %v", err)
  111. }
  112. }()
  113. callBack(t)
  114. }
  115. func (d *OrderQueue[T2]) ClearSmallHeadQueue(std int64, callBack func(T2)) {
  116. num := d.SmallHeap.Size()
  117. for i := 0; i < num; i += 1 {
  118. t, err := d.SmallHeap.Get(0)
  119. if err != nil {
  120. break
  121. }
  122. if t.sortVal <= std {
  123. d.SmallHeap.Remove(0)
  124. if callBack != nil {
  125. safeCall(callBack, t.val)
  126. }
  127. } else {
  128. break
  129. }
  130. }
  131. }
  132. func (d *OrderQueue[T2]) ClearBiglHeadQueue(std int64, callBack func(T2)) {
  133. num := d.SmallHeap.Size()
  134. for i := 0; i < num; i += 1 {
  135. t, err := d.SmallHeap.Get(0)
  136. if err != nil {
  137. break
  138. }
  139. if t.sortVal >= std {
  140. d.SmallHeap.Remove(0)
  141. if callBack != nil {
  142. safeCall(callBack, t.val)
  143. }
  144. } else {
  145. break
  146. }
  147. }
  148. }