package dbserver import ( "container/heap" "fmt" "sync" ) type SaveItem interface { TableName() string GetUniqueKey() string } type SaveCallBack func() type SortItem struct { value SaveItem tick int64 index int chEndEvent chan struct{} } type SortQueue struct { sync.Mutex heap []*SortItem itemsMap map[string]*SortItem } // 接口 func (sq *SortQueue) Len() int { return len(sq.heap) } // 根最小 func (sq *SortQueue) Less(i, j int) bool { return sq.heap[i].tick < sq.heap[j].tick } func (sq *SortQueue) Swap(i, j int) { sq.heap[i], sq.heap[j] = sq.heap[j], sq.heap[i] sq.heap[i].index = i sq.heap[j].index = j } // 元素放在队列尾部 func (sq *SortQueue) Push(x interface{}) { n := len(sq.heap) item := x.(*SortItem) item.index = n sq.heap = append(sq.heap, item) } // 弹出最后一个元素 func (sq *SortQueue) Pop() interface{} { n := len(sq.heap) item := sq.heap[n-1] item.index = -1 sq.heap = sq.heap[0 : n-1] return item } // 使用 func NewSortQueue(len int) *SortQueue { sq := new(SortQueue) sq.heap = make([]*SortItem, 0, len) heap.Init(sq) sq.itemsMap = make(map[string]*SortItem) return sq } func CalcUniqueKey(item SaveItem) string { tab := item.TableName() id := item.GetUniqueKey() return fmt.Sprintf("%s.%s", tab, id) } // 插入一条记录 func (sq *SortQueue) Set(si SaveItem, tick int64, needEndEvent bool) chan struct{} { sq.Lock() defer sq.Unlock() key := CalcUniqueKey(si) if v, ok := sq.itemsMap[key]; ok { v.value = si if needEndEvent { if v.chEndEvent == nil { v.chEndEvent = make(chan struct{}) } } if v.tick > tick { v.tick = tick heap.Fix(sq, v.index) } return v.chEndEvent } item := new(SortItem) item.value = si item.tick = tick if needEndEvent { if item.chEndEvent == nil { item.chEndEvent = make(chan struct{}) } } heap.Push(sq, item) sq.itemsMap[key] = item return item.chEndEvent } // 插入一条记录 func (sq *SortQueue) SetUtil(si SaveItem, tick int64, needEndEvent bool) chan struct{} { sq.Lock() defer sq.Unlock() key := CalcUniqueKey(si) if v, ok := sq.itemsMap[key]; ok { if needEndEvent { if v.chEndEvent == nil { v.chEndEvent = make(chan struct{}) } } if v.tick > tick { v.tick = tick heap.Fix(sq, v.index) } return v.chEndEvent } return nil } // 移除最小值记录 func (sq *SortQueue) PopItem() (SaveItem, chan struct{}) { sq.Lock() defer sq.Unlock() if len(sq.heap) == 0 { return nil, nil } it := heap.Pop(sq).(*SortItem) key := CalcUniqueKey(it.value) delete(sq.itemsMap, key) return it.value, it.chEndEvent } // 移除指定记录 func (sq *SortQueue) RemoveItem(item SaveItem) SaveItem { sq.Lock() defer sq.Unlock() if len(sq.heap) == 0 { return nil } key := CalcUniqueKey(item) if v, ok := sq.itemsMap[key]; ok { heap.Remove(sq, v.index) delete(sq.itemsMap, key) return v.value } return nil } // 查看二叉树根记录,就是最小值记录 func (sq *SortQueue) First() (SaveItem, int64) { sq.Lock() defer sq.Unlock() if len(sq.heap) == 0 { return nil, 0 } item := sq.heap[0] return item.value, item.tick } // 查看二叉树最后节点信息,不一定是最大值 func (sq *SortQueue) Last() (SaveItem, int64) { sq.Lock() defer sq.Unlock() cnt := sq.Len() if cnt == 0 { return nil, 0 } item := sq.heap[cnt-1] return item.value, item.tick } func example() { // saveSq := NewSortQueue(100) // for i := 0; i < 10; i += 1 { // it := new(NewYearFifteen) // it.PlayerId = 1 + int64(i) // rd := rand.Int63n(1000) // saveSq.Set(it, rd, nil) // } // fmt.Println(saveSq.First()) // fmt.Println(saveSq.Last()) // for _, v := range saveSq.heap { // fmt.Println(v.tick, v.value.GetUniqueKey()) // } // cnt := saveSq.Len() // for i := 0; i < cnt; i += 1 { // fmt.Println(saveSq.PopItem().GetUniqueKey()) // } }