// 最小堆存放数据项 // 并实现了一个时序队列,支持添加、删除、修改等操作 // 修改时间:2024-12-26 package smallheap import ( "container/heap" "context" "leafstalk/log" "leafstalk/otherutils/snowflake" "sync" "time" ) type SortItem interface { SetIndex(index int) GetIndex() int GetSortKey() int64 GetUniqueKey() int64 } type SmallHeap[T SortItem] struct { Items []T QueryTable map[int64]T sequence *snowflake.Node //1MS生成400万个,不分节点 } func (h SmallHeap[T]) Len() int { return len(h.Items) } func (h SmallHeap[T]) Less(i, j int) bool { return h.Items[i].GetSortKey() < h.Items[j].GetSortKey() } func (h SmallHeap[T]) Swap(i, j int) { h.Items[i], h.Items[j] = h.Items[j], h.Items[i] h.Items[i].SetIndex(i) h.Items[j].SetIndex(j) } func (h *SmallHeap[T]) Push(x interface{}) { v, ok := x.(T) if ok { v.SetIndex(len(h.Items)) h.Items = append(h.Items, v) } } func (h *SmallHeap[T]) Pop() interface{} { n := len(h.Items) x := h.Items[n-1] h.Items = h.Items[:n-1] return x } func NewSmallHeap[T SortItem]() *SmallHeap[T] { sh := new(SmallHeap[T]) sh.Items = make([]T, 0, 16) sh.QueryTable = make(map[int64]T) seq, err := snowflake.NewMsNode(0, 0) if err != nil { panic("NewSmallHeap init sequence error") return nil } sh.sequence = seq heap.Init(sh) return sh } func (h *SmallHeap[T]) Clear() { h.Items = make([]T, 0) h.QueryTable = make(map[int64]T) } // 使用接口 // 添加 func (h *SmallHeap[T]) Add(it T) { heap.Push(h, it) k := it.GetUniqueKey() h.QueryTable[k] = it } // 删除 func (h *SmallHeap[T]) Remove(index int) T { if index < 0 || index >= h.Len() { var t T return t } it := heap.Remove(h, index) v, ok := it.(T) if ok { v.SetIndex(-1) delete(h.QueryTable, v.GetUniqueKey()) return v } var t T return t } func (h *SmallHeap[T]) RemoveByUniqueKey(uniqueKey int64) T { v, ok := h.QueryTable[uniqueKey] if !ok { var t T return t } return h.Remove(v.GetIndex()) } // 修改 func (h *SmallHeap[T]) Update(t1 T) bool { old, ok := h.GetByUniqueKey(t1.GetUniqueKey()) if ok { heap.Fix(h, old.GetIndex()) return true } return false } // 查看第一个 func (h *SmallHeap[T]) Get(index int) T { if index >= 0 && index < h.Len() { return h.Items[index] } var t T return t } func (h *SmallHeap[T]) GetByUniqueKey(uniqueKey int64) (T, bool) { v, ok := h.QueryTable[uniqueKey] return v, ok } // 生成唯一ID,1MS生成400万个 func (h *SmallHeap[T]) NewId() int64 { return h.sequence.Generate().Int64() } // 排序、唯一KEY都是int64 type Int64SortObject[T any] struct { index int SortKey int64 uniqueKey int64 val T } func (t *Int64SortObject[T]) GetSortKey() int64 { return t.SortKey } func (t *Int64SortObject[T]) SetIndex(index int) { t.index = index } func (t *Int64SortObject[T]) GetIndex() int { return t.index } func (t *Int64SortObject[T]) GetUniqueKey() int64 { return t.uniqueKey } // 时序队列 type TimeOrderQueue[T2 any] struct { allObjects *SmallHeap[*Int64SortObject[T2]] callBack func(t T2) wg sync.WaitGroup cancel context.CancelFunc } func NewTimeOrderQueue[T2 any](checkPeriod time.Duration, callBack func(t T2)) *TimeOrderQueue[T2] { df := new(TimeOrderQueue[T2]) df.allObjects = NewSmallHeap[*Int64SortObject[T2]]() ctx, cancel := context.WithCancel(context.Background()) df.cancel = cancel df.callBack = callBack df.wg.Add(1) go func() { defer df.wg.Done() df.checkOnTime(ctx, checkPeriod) }() return df } func (d *TimeOrderQueue[T2]) Stop() { // 关闭存储协程 if d.cancel != nil { d.cancel() d.cancel = nil } d.wg.Wait() } func (d *TimeOrderQueue[T2]) GetAll() map[int64]T2 { res := make(map[int64]T2) for k, v := range d.allObjects.QueryTable { res[k] = v.val } return res } func (d *TimeOrderQueue[T2]) NewId() int64 { return d.allObjects.NewId() } // 添加定时触发对象 // 要求objId运行期间必须唯一 func (d *TimeOrderQueue[T2]) AddTimeOrderObject(delay int64, obj T2, objId int64) int64 { ct := time.Now().Unix() ct += delay if objId == 0 { objId = d.NewId() } it := new(Int64SortObject[T2]) it.SortKey = ct it.val = obj it.uniqueKey = objId d.allObjects.Add(it) return objId } func (d *TimeOrderQueue[T2]) RemoveTimeOrderObject(id int64) T2 { t := d.allObjects.RemoveByUniqueKey(id) return t.val } // 更新延迟时间,从调用函数时开始计算 func (d *TimeOrderQueue[T2]) UpdateDelayTime(id int64, delay int64) bool { it, ok := d.allObjects.GetByUniqueKey(id) if !ok { return ok } ct := time.Now().Unix() ct += delay it.SortKey = ct return d.allObjects.Update(it) } func (d *TimeOrderQueue[T2]) checkOnTime(ctx context.Context, period time.Duration) { safeCall := func(t T2) { if d.callBack == nil { return } defer func() { if err := recover(); err != nil { log.Warnf("checkTimeout safeCall error. %v", err) } }() d.callBack(t) } if period < time.Millisecond { period = 50 * time.Millisecond } ticker := time.NewTicker(period) for { var ct time.Time select { case <-ctx.Done(): ticker.Stop() log.Warnf("checkTimeout exit.") return case ct = <-ticker.C: } cst := ct.Unix() for { // 是否到时间 t := d.allObjects.Get(0) if t == nil { break } if t.SortKey > cst { break } // 取第一个 d.allObjects.Remove(0) safeCall(t.val) } } }