// A min-heap that stores data items, with a time-ordered queue implemented
// on top of it that supports add, remove, update and similar operations.
// Last modified: 2025-03-29
- package smallheap
- import "leafstalk/log"
// KeyItem is the constraint for values held by the queue: every value must
// be able to report an int64 key through GetKey.
type KeyItem interface {
	// GetKey returns the value's int64 key.
	GetKey() int64
}
- // 排序、唯一KEY都是int64
- type QueueItem[T KeyItem] struct {
- index int
- sortVal int64
- val T
- }
- func (t *QueueItem[T]) GetSortValue() int64 {
- return t.sortVal
- }
- func (t *QueueItem[T]) SetSortValue(val int64) {
- t.sortVal = val
- }
- func (t *QueueItem[T]) SetIndex(index int) {
- t.index = index
- }
- func (t *QueueItem[T]) GetIndex() int {
- return t.index
- }
- func (t *QueueItem[T]) GetUniqueKey() int64 {
- return t.val.GetKey()
- }
- // 时序队列
- type OrderQueue[T2 KeyItem] struct {
- SmallHeap *SmallHeap[*QueueItem[T2]]
- }
- func NewOrderQueue[T2 KeyItem]() *OrderQueue[T2] {
- it := new(OrderQueue[T2])
- it.SmallHeap = NewSmallHeap[*QueueItem[T2]]()
- return it
- }
- // 添加定时触发对象
- // 要求objId运行期间必须唯一
- func (d *OrderQueue[T2]) Add(obj T2, tick int64) int64 {
- it := new(QueueItem[T2])
- it.sortVal = tick
- it.val = obj
- d.SmallHeap.Add(it)
- return obj.GetKey()
- }
- func (d *OrderQueue[T2]) Set(obj T2, tick int64) {
- t, ok := d.SmallHeap.GetByUniqueKey(obj.GetKey())
- if ok {
- t.val = obj
- return
- }
- it := new(QueueItem[T2])
- it.sortVal = tick
- it.val = obj
- d.SmallHeap.Add(it)
- }
- func (d *OrderQueue[T2]) SetSortVal(key int64, tick int64) {
- d.SmallHeap.UpdateSortVal(key, tick)
- }
- func (d *OrderQueue[T2]) Update(obj T2, tick int64) {
- t, ok := d.SmallHeap.GetByUniqueKey(obj.GetKey())
- if ok {
- t.val = obj
- t.sortVal = tick
- d.SmallHeap.UpdateSortVal(obj.GetKey(), tick)
- return
- }
- it := new(QueueItem[T2])
- it.sortVal = tick
- it.val = obj
- d.SmallHeap.Add(it)
- }
- func (d *OrderQueue[T2]) Get(key int64) (T2, bool) {
- t, ok := d.SmallHeap.GetByUniqueKey(key)
- if ok {
- return t.val, true
- }
- var tv T2
- return tv, false
- }
- func (d *OrderQueue[T2]) Remove(id int64) (T2, error) {
- t, err := d.SmallHeap.RemoveByUniqueKey(id)
- if err != nil {
- var t T2
- return t, err
- }
- return t.val, nil
- }
- func (d *OrderQueue[T2]) Pop() (T2, error) {
- t, err := d.SmallHeap.Remove(0)
- if err != nil {
- var t T2
- return t, err
- }
- return t.val, nil
- }
- func (d *OrderQueue[T2]) PeekFirst() (T2, int64, error) {
- t, err := d.SmallHeap.Get(0)
- if err != nil {
- var t T2
- return t, 0, err
- }
- return t.val, t.sortVal, nil
- }
- func safeCall[T KeyItem](callBack func(T), t T) {
- defer func() {
- if err := recover(); err != nil {
- log.Warnf("checkTimeout safeCall error. %v", err)
- }
- }()
- callBack(t)
- }
- func (d *OrderQueue[T2]) ClearSmallHeadQueue(std int64, callBack func(T2)) {
- num := d.SmallHeap.Size()
- for i := 0; i < num; i += 1 {
- t, err := d.SmallHeap.Get(0)
- if err != nil {
- break
- }
- if t.sortVal <= std {
- d.SmallHeap.Remove(0)
- if callBack != nil {
- safeCall(callBack, t.val)
- }
- } else {
- break
- }
- }
- }
- func (d *OrderQueue[T2]) ClearBiglHeadQueue(std int64, callBack func(T2)) {
- num := d.SmallHeap.Size()
- for i := 0; i < num; i += 1 {
- t, err := d.SmallHeap.Get(0)
- if err != nil {
- break
- }
- if t.sortVal >= std {
- d.SmallHeap.Remove(0)
- if callBack != nil {
- safeCall(callBack, t.val)
- }
- } else {
- break
- }
- }
- }
|