// 最小堆存放数据项 // 并实现了一个时序队列,支持添加、删除、修改等操作 // 修改时间:2024-12-26 package smallheap import ( "context" "leafstalk/log" "sync" "time" ) // 排序、唯一KEY都是int64 type Int64SortObject[T any] struct { index int SortKey int64 uniqueKey int64 val T } func (t *Int64SortObject[T]) GetSortKey() int64 { return t.SortKey } func (t *Int64SortObject[T]) SetIndex(index int) { t.index = index } func (t *Int64SortObject[T]) GetIndex() int { return t.index } func (t *Int64SortObject[T]) GetUniqueKey() int64 { return t.uniqueKey } // 时序队列 type TimeOrderQueue[T2 any] struct { allObjects *SmallHeap[*Int64SortObject[T2]] callBack func(t T2) wg sync.WaitGroup cancel context.CancelFunc } func NewTimeOrderQueue[T2 any](checkPeriod time.Duration, callBack func(t T2)) *TimeOrderQueue[T2] { df := new(TimeOrderQueue[T2]) df.allObjects = NewSmallHeap[*Int64SortObject[T2]]() ctx, cancel := context.WithCancel(context.Background()) df.cancel = cancel df.callBack = callBack df.wg.Add(1) go func() { defer df.wg.Done() df.checkOnTime(ctx, checkPeriod) }() return df } func (d *TimeOrderQueue[T2]) Stop() { // 关闭存储协程 if d.cancel != nil { d.cancel() d.cancel = nil } d.wg.Wait() } func (d *TimeOrderQueue[T2]) GetAll() map[int64]T2 { res := make(map[int64]T2) for k, v := range d.allObjects.QueryTable { res[k] = v.val } return res } // func (d *TimeOrderQueue[T2]) NewId() int64 { // return d.allObjects.NewId() // } // 添加定时触发对象 // 要求objId运行期间必须唯一 func (d *TimeOrderQueue[T2]) AddTimeOrderObject(delay int64, obj T2, objId int64) int64 { ct := time.Now().Unix() ct += delay // if objId == 0 { // objId = d.NewId() // } it := new(Int64SortObject[T2]) it.SortKey = ct it.val = obj it.uniqueKey = objId d.allObjects.Add(it) return objId } func (d *TimeOrderQueue[T2]) RemoveTimeOrderObject(id int64) T2 { t := d.allObjects.RemoveByUniqueKey(id) return t.val } // 更新延迟时间,从调用函数时开始计算 func (d *TimeOrderQueue[T2]) UpdateDelayTime(id int64, delay int64) bool { it, ok := d.allObjects.GetByUniqueKey(id) if !ok { return ok } ct := time.Now().Unix() ct += delay it.SortKey = ct return d.allObjects.Resort(it) } func (d *TimeOrderQueue[T2]) checkOnTime(ctx context.Context, period time.Duration) { safeCall := func(t T2) { if d.callBack == nil { return } defer func() { if err := recover(); err != nil { log.Warnf("checkTimeout safeCall error. %v", err) } }() d.callBack(t) } if period < time.Millisecond { period = 50 * time.Millisecond } ticker := time.NewTicker(period) for { var ct time.Time select { case <-ctx.Done(): ticker.Stop() log.Warnf("checkTimeout exit.") return case ct = <-ticker.C: } cst := ct.Unix() for { // 是否到时间 t, err := d.allObjects.Get(0) if err != nil { break } if t.SortKey > cst { break } // 取第一个 d.allObjects.Remove(0) safeCall(t.val) } } }