timer.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. // 最小堆存放数据项
  2. // 并实现了一个时序队列,支持添加、删除、修改等操作
  3. // 修改时间:2024-12-26
  4. package smallheap
  5. import (
  6. "container/heap"
  7. "context"
  8. "leafstalk/log"
  9. "leafstalk/otherutils/snowflake"
  10. "sync"
  11. "time"
  12. )
  // SortItem is the contract an element must satisfy to be stored in a
  // SmallHeap.
  type SortItem interface {
      // GetSortKey returns the value the heap orders elements by.
      GetSortKey() int64
      // GetUniqueKey returns the key under which the element is indexed in
      // the heap's lookup table.
      GetUniqueKey() int64
      // GetIndex returns the position most recently recorded via SetIndex.
      GetIndex() int
      // SetIndex records the element's current position in the heap slice.
      SetIndex(index int)
  }
  19. type SmallHeap[T SortItem] struct {
  20. Items []T
  21. QueryTable map[int64]T
  22. sequence *snowflake.Node //1MS生成400万个,不分节点
  23. }
  24. func (h SmallHeap[T]) Len() int {
  25. return len(h.Items)
  26. }
  27. func (h SmallHeap[T]) Less(i, j int) bool {
  28. return h.Items[i].GetSortKey() < h.Items[j].GetSortKey()
  29. }
  30. func (h SmallHeap[T]) Swap(i, j int) {
  31. h.Items[i], h.Items[j] = h.Items[j], h.Items[i]
  32. h.Items[i].SetIndex(i)
  33. h.Items[j].SetIndex(j)
  34. }
  35. func (h *SmallHeap[T]) Push(x interface{}) {
  36. v, ok := x.(T)
  37. if ok {
  38. v.SetIndex(len(h.Items))
  39. h.Items = append(h.Items, v)
  40. }
  41. }
  42. func (h *SmallHeap[T]) Pop() interface{} {
  43. n := len(h.Items)
  44. x := h.Items[n-1]
  45. h.Items = h.Items[:n-1]
  46. return x
  47. }
  48. func NewSmallHeap[T SortItem]() *SmallHeap[T] {
  49. sh := new(SmallHeap[T])
  50. sh.Items = make([]T, 0, 16)
  51. sh.QueryTable = make(map[int64]T)
  52. seq, err := snowflake.NewMsNode(0, 0)
  53. if err != nil {
  54. panic("NewSmallHeap init sequence error")
  55. return nil
  56. }
  57. sh.sequence = seq
  58. heap.Init(sh)
  59. return sh
  60. }
  61. func (h *SmallHeap[T]) Clear() {
  62. h.Items = make([]T, 0)
  63. h.QueryTable = make(map[int64]T)
  64. }
  65. // 使用接口
  66. // 添加
  67. func (h *SmallHeap[T]) Add(it T) {
  68. heap.Push(h, it)
  69. k := it.GetUniqueKey()
  70. h.QueryTable[k] = it
  71. }
  72. // 删除
  73. func (h *SmallHeap[T]) Remove(index int) T {
  74. if index < 0 || index >= h.Len() {
  75. var t T
  76. return t
  77. }
  78. it := heap.Remove(h, index)
  79. v, ok := it.(T)
  80. if ok {
  81. v.SetIndex(-1)
  82. delete(h.QueryTable, v.GetUniqueKey())
  83. return v
  84. }
  85. var t T
  86. return t
  87. }
  88. func (h *SmallHeap[T]) RemoveByUniqueKey(uniqueKey int64) T {
  89. v, ok := h.QueryTable[uniqueKey]
  90. if !ok {
  91. var t T
  92. return t
  93. }
  94. return h.Remove(v.GetIndex())
  95. }
  96. // 修改
  97. func (h *SmallHeap[T]) Update(t1 T) bool {
  98. old, ok := h.GetByUniqueKey(t1.GetUniqueKey())
  99. if ok {
  100. heap.Fix(h, old.GetIndex())
  101. return true
  102. }
  103. return false
  104. }
  105. // 查看第一个
  106. func (h *SmallHeap[T]) Get(index int) T {
  107. if index >= 0 && index < h.Len() {
  108. return h.Items[index]
  109. }
  110. var t T
  111. return t
  112. }
  113. func (h *SmallHeap[T]) GetByUniqueKey(uniqueKey int64) (T, bool) {
  114. v, ok := h.QueryTable[uniqueKey]
  115. return v, ok
  116. }
  117. // 生成唯一ID,1MS生成400万个
  118. func (h *SmallHeap[T]) NewId() int64 {
  119. return h.sequence.Generate().Int64()
  120. }
  // Int64SortObject is a heap element whose sort key and unique key are both
  // int64 and which carries a payload of type T.
  type Int64SortObject[T any] struct {
      index     int   // position reported by GetIndex, written by SetIndex
      SortKey   int64 // value returned by GetSortKey
      uniqueKey int64 // value returned by GetUniqueKey
      val       T     // payload
  }

  // GetUniqueKey returns the object's unique key.
  func (o *Int64SortObject[T]) GetUniqueKey() int64 {
      return o.uniqueKey
  }

  // GetSortKey returns the object's sort key.
  func (o *Int64SortObject[T]) GetSortKey() int64 {
      return o.SortKey
  }

  // GetIndex returns the index last stored with SetIndex.
  func (o *Int64SortObject[T]) GetIndex() int {
      return o.index
  }

  // SetIndex stores index on the object.
  func (o *Int64SortObject[T]) SetIndex(index int) {
      o.index = index
  }
  140. // 时序队列
  141. type TimeOrderQueue[T2 any] struct {
  142. allObjects *SmallHeap[*Int64SortObject[T2]]
  143. callBack func(t T2)
  144. wg sync.WaitGroup
  145. cancel context.CancelFunc
  146. }
  147. func NewTimeOrderQueue[T2 any](checkPeriod time.Duration, callBack func(t T2)) *TimeOrderQueue[T2] {
  148. df := new(TimeOrderQueue[T2])
  149. df.allObjects = NewSmallHeap[*Int64SortObject[T2]]()
  150. ctx, cancel := context.WithCancel(context.Background())
  151. df.cancel = cancel
  152. df.callBack = callBack
  153. df.wg.Add(1)
  154. go func() {
  155. defer df.wg.Done()
  156. df.checkOnTime(ctx, checkPeriod)
  157. }()
  158. return df
  159. }
  160. func (d *TimeOrderQueue[T2]) Stop() {
  161. // 关闭存储协程
  162. if d.cancel != nil {
  163. d.cancel()
  164. d.cancel = nil
  165. }
  166. d.wg.Wait()
  167. }
  168. func (d *TimeOrderQueue[T2]) GetAll() map[int64]T2 {
  169. res := make(map[int64]T2)
  170. for k, v := range d.allObjects.QueryTable {
  171. res[k] = v.val
  172. }
  173. return res
  174. }
  175. func (d *TimeOrderQueue[T2]) NewId() int64 {
  176. return d.allObjects.NewId()
  177. }
  178. // 添加定时触发对象
  179. // 要求objId运行期间必须唯一
  180. func (d *TimeOrderQueue[T2]) AddTimeOrderObject(delay int64, obj T2, objId int64) int64 {
  181. ct := time.Now().Unix()
  182. ct += delay
  183. if objId == 0 {
  184. objId = d.NewId()
  185. }
  186. it := new(Int64SortObject[T2])
  187. it.SortKey = ct
  188. it.val = obj
  189. it.uniqueKey = objId
  190. d.allObjects.Add(it)
  191. return objId
  192. }
  193. func (d *TimeOrderQueue[T2]) RemoveTimeOrderObject(id int64) T2 {
  194. t := d.allObjects.RemoveByUniqueKey(id)
  195. return t.val
  196. }
  197. // 更新延迟时间,从调用函数时开始计算
  198. func (d *TimeOrderQueue[T2]) UpdateDelayTime(id int64, delay int64) bool {
  199. it, ok := d.allObjects.GetByUniqueKey(id)
  200. if !ok {
  201. return ok
  202. }
  203. ct := time.Now().Unix()
  204. ct += delay
  205. it.SortKey = ct
  206. return d.allObjects.Update(it)
  207. }
  208. func (d *TimeOrderQueue[T2]) checkOnTime(ctx context.Context, period time.Duration) {
  209. safeCall := func(t T2) {
  210. if d.callBack == nil {
  211. return
  212. }
  213. defer func() {
  214. if err := recover(); err != nil {
  215. log.Warnf("checkTimeout safeCall error. %v", err)
  216. }
  217. }()
  218. d.callBack(t)
  219. }
  220. if period < time.Millisecond {
  221. period = 50 * time.Millisecond
  222. }
  223. ticker := time.NewTicker(period)
  224. for {
  225. var ct time.Time
  226. select {
  227. case <-ctx.Done():
  228. ticker.Stop()
  229. log.Warnf("checkTimeout exit.")
  230. return
  231. case ct = <-ticker.C:
  232. }
  233. cst := ct.Unix()
  234. for {
  235. // 是否到时间
  236. t := d.allObjects.Get(0)
  237. if t == nil {
  238. break
  239. }
  240. if t.SortKey > cst {
  241. break
  242. }
  243. // 取第一个
  244. d.allObjects.Remove(0)
  245. safeCall(t.val)
  246. }
  247. }
  248. }