123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
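// Package smallheap keeps items in a min-heap and builds a time-ordered
// queue on top of it that supports adding, removing, and updating entries.
//
// Last modified: 2024-12-26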
- package smallheap
- import (
- "context"
- "leafstalk/log"
- "sync"
- "time"
- )
// Int64SortObject wraps a payload of type T together with an int64 sort key
// and an int64 unique key.
type Int64SortObject[T any] struct {
	// index is the position last stored through SetIndex
	// (presumably the slot inside the owning heap — confirm against SmallHeap).
	index int
	// SortKey is the value the object is ordered by.
	SortKey int64
	// uniqueKey identifies the object.
	uniqueKey int64
	// val is the wrapped payload.
	val T
}

// GetSortKey returns the object's sort key.
func (o *Int64SortObject[T]) GetSortKey() int64 {
	return o.SortKey
}

// SetIndex stores index as the object's current position.
func (o *Int64SortObject[T]) SetIndex(index int) {
	o.index = index
}

// GetIndex returns the position last stored by SetIndex.
func (o *Int64SortObject[T]) GetIndex() int {
	return o.index
}

// GetUniqueKey returns the object's unique key.
func (o *Int64SortObject[T]) GetUniqueKey() int64 {
	return o.uniqueKey
}
- // 时序队列
- type TimeOrderQueue[T2 any] struct {
- allObjects *SmallHeap[*Int64SortObject[T2]]
- callBack func(t T2)
- wg sync.WaitGroup
- cancel context.CancelFunc
- }
- func NewTimeOrderQueue[T2 any](checkPeriod time.Duration, callBack func(t T2)) *TimeOrderQueue[T2] {
- df := new(TimeOrderQueue[T2])
- df.allObjects = NewSmallHeap[*Int64SortObject[T2]]()
- ctx, cancel := context.WithCancel(context.Background())
- df.cancel = cancel
- df.callBack = callBack
- df.wg.Add(1)
- go func() {
- defer df.wg.Done()
- df.checkOnTime(ctx, checkPeriod)
- }()
- return df
- }
- func (d *TimeOrderQueue[T2]) Stop() {
- // 关闭存储协程
- if d.cancel != nil {
- d.cancel()
- d.cancel = nil
- }
- d.wg.Wait()
- }
- func (d *TimeOrderQueue[T2]) GetAll() map[int64]T2 {
- res := make(map[int64]T2)
- for k, v := range d.allObjects.QueryTable {
- res[k] = v.val
- }
- return res
- }
- // func (d *TimeOrderQueue[T2]) NewId() int64 {
- // return d.allObjects.NewId()
- // }
- // 添加定时触发对象
- // 要求objId运行期间必须唯一
- func (d *TimeOrderQueue[T2]) AddTimeOrderObject(delay int64, obj T2, objId int64) int64 {
- ct := time.Now().Unix()
- ct += delay
- // if objId == 0 {
- // objId = d.NewId()
- // }
- it := new(Int64SortObject[T2])
- it.SortKey = ct
- it.val = obj
- it.uniqueKey = objId
- d.allObjects.Add(it)
- return objId
- }
- func (d *TimeOrderQueue[T2]) RemoveTimeOrderObject(id int64) T2 {
- t := d.allObjects.RemoveByUniqueKey(id)
- return t.val
- }
- // 更新延迟时间,从调用函数时开始计算
- func (d *TimeOrderQueue[T2]) UpdateDelayTime(id int64, delay int64) bool {
- it, ok := d.allObjects.GetByUniqueKey(id)
- if !ok {
- return ok
- }
- ct := time.Now().Unix()
- ct += delay
- it.SortKey = ct
- return d.allObjects.Resort(it)
- }
- func (d *TimeOrderQueue[T2]) checkOnTime(ctx context.Context, period time.Duration) {
- safeCall := func(t T2) {
- if d.callBack == nil {
- return
- }
- defer func() {
- if err := recover(); err != nil {
- log.Warnf("checkTimeout safeCall error. %v", err)
- }
- }()
- d.callBack(t)
- }
- if period < time.Millisecond {
- period = 50 * time.Millisecond
- }
- ticker := time.NewTicker(period)
- for {
- var ct time.Time
- select {
- case <-ctx.Done():
- ticker.Stop()
- log.Warnf("checkTimeout exit.")
- return
- case ct = <-ticker.C:
- }
- cst := ct.Unix()
- for {
- // 是否到时间
- t, err := d.allObjects.Get(0)
- if err != nil {
- break
- }
- if t.SortKey > cst {
- break
- }
- // 取第一个
- d.allObjects.Remove(0)
- safeCall(t.val)
- }
- }
- }
|