// 最小堆存放数据项 // 并实现了一个时序队列,支持添加、删除、修改等操作 // 修改时间:2025-03-29 package smallheap import "leafstalk/log" type KeyItem interface { GetKey() int64 } // 排序、唯一KEY都是int64 type QueueItem[T KeyItem] struct { index int sortVal int64 val T } func (t *QueueItem[T]) GetSortValue() int64 { return t.sortVal } func (t *QueueItem[T]) SetSortValue(val int64) { t.sortVal = val } func (t *QueueItem[T]) SetIndex(index int) { t.index = index } func (t *QueueItem[T]) GetIndex() int { return t.index } func (t *QueueItem[T]) GetUniqueKey() int64 { return t.val.GetKey() } // 时序队列 type OrderQueue[T2 KeyItem] struct { SmallHeap *SmallHeap[*QueueItem[T2]] } func NewOrderQueue[T2 KeyItem]() *OrderQueue[T2] { it := new(OrderQueue[T2]) it.SmallHeap = NewSmallHeap[*QueueItem[T2]]() return it } // 添加定时触发对象 // 要求objId运行期间必须唯一 func (d *OrderQueue[T2]) Add(obj T2, tick int64) int64 { it := new(QueueItem[T2]) it.sortVal = tick it.val = obj d.SmallHeap.Add(it) return obj.GetKey() } func (d *OrderQueue[T2]) Set(obj T2, tick int64) { t, ok := d.SmallHeap.GetByUniqueKey(obj.GetKey()) if ok { t.val = obj return } it := new(QueueItem[T2]) it.sortVal = tick it.val = obj d.SmallHeap.Add(it) } func (d *OrderQueue[T2]) SetSortVal(key int64, tick int64) { d.SmallHeap.UpdateSortVal(key, tick) } func (d *OrderQueue[T2]) Update(obj T2, tick int64) { t, ok := d.SmallHeap.GetByUniqueKey(obj.GetKey()) if ok { t.val = obj t.sortVal = tick d.SmallHeap.UpdateSortVal(obj.GetKey(), tick) return } it := new(QueueItem[T2]) it.sortVal = tick it.val = obj d.SmallHeap.Add(it) } func (d *OrderQueue[T2]) Get(key int64) (T2, bool) { t, ok := d.SmallHeap.GetByUniqueKey(key) if ok { return t.val, true } var tv T2 return tv, false } func (d *OrderQueue[T2]) Remove(id int64) (T2, error) { t, err := d.SmallHeap.RemoveByUniqueKey(id) if err != nil { var t T2 return t, err } return t.val, nil } func (d *OrderQueue[T2]) Pop() (T2, error) { t, err := d.SmallHeap.Remove(0) if err != nil { var t T2 return t, err } return t.val, nil } func (d *OrderQueue[T2]) PeekFirst() (T2, int64, error) { t, err := d.SmallHeap.Get(0) if err != nil { var t T2 return t, 0, err } return t.val, t.sortVal, nil } func safeCall[T KeyItem](callBack func(T), t T) { defer func() { if err := recover(); err != nil { log.Warnf("checkTimeout safeCall error. %v", err) } }() callBack(t) } func (d *OrderQueue[T2]) ClearSmallHeadQueue(std int64, callBack func(T2)) { num := d.SmallHeap.Size() for i := 0; i < num; i += 1 { t, err := d.SmallHeap.Get(0) if err != nil { break } if t.sortVal <= std { d.SmallHeap.Remove(0) if callBack != nil { safeCall(callBack, t.val) } } else { break } } } func (d *OrderQueue[T2]) ClearBiglHeadQueue(std int64, callBack func(T2)) { num := d.SmallHeap.Size() for i := 0; i < num; i += 1 { t, err := d.SmallHeap.Get(0) if err != nil { break } if t.sortVal >= std { d.SmallHeap.Remove(0) if callBack != nil { safeCall(callBack, t.val) } } else { break } } }