heap.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package redisserver
  2. import (
  3. "container/heap"
  4. "fmt"
  5. "sync"
  6. )
// SaveItem is the contract for values stored in a SortQueue. CalcUniqueKey
// joins the two method results as "<TableName>.<GetUniqueKey>" to identify an
// entry inside the queue.
type SaveItem interface {
	// TableName returns the first component of the entry's unique key.
	TableName() string
	// GetUniqueKey returns the second component of the entry's unique key.
	GetUniqueKey() string
}
// SaveCallBack is a callback that takes no arguments and returns nothing.
// NOTE(review): it is not referenced in the visible part of this file —
// confirm it is still used elsewhere in the package.
type SaveCallBack func()
// SortItem is one heap entry: a SaveItem plus the bookkeeping that SortQueue
// needs to order it and locate it.
type SortItem struct {
	value      SaveItem      // payload returned by PopItem, RemoveItem, First and Last
	tick       int64         // ordering key; the smallest tick sits at the heap root
	index      int           // current position in SortQueue.heap; Pop sets it to -1
	chEndEvent chan struct{} // optional channel, created when a caller passes needEndEvent
}
// SortQueue is a min-heap of SortItem ordered by tick, paired with a map from
// unique key to item so an existing entry can be found without scanning.
//
// The embedded Mutex is taken by Set, SetUtil, PopItem, RemoveItem, First and
// Last. The heap.Interface methods (Len, Less, Swap, Push, Pop) and Clear do
// not lock.
type SortQueue struct {
	sync.Mutex
	heap     []*SortItem          // heap-ordered storage; index 0 holds the smallest tick
	itemsMap map[string]*SortItem // CalcUniqueKey(value) -> the item stored in heap
}
  23. // 接口
  24. func (sq *SortQueue) Len() int {
  25. return len(sq.heap)
  26. }
  27. // 根最小
  28. func (sq *SortQueue) Less(i, j int) bool {
  29. return sq.heap[i].tick < sq.heap[j].tick
  30. }
  31. func (sq *SortQueue) Swap(i, j int) {
  32. sq.heap[i], sq.heap[j] = sq.heap[j], sq.heap[i]
  33. sq.heap[i].index = i
  34. sq.heap[j].index = j
  35. }
  36. // 元素放在队列尾部
  37. func (sq *SortQueue) Push(x interface{}) {
  38. n := len(sq.heap)
  39. item := x.(*SortItem)
  40. item.index = n
  41. sq.heap = append(sq.heap, item)
  42. }
  43. // 弹出最后一个元素
  44. func (sq *SortQueue) Pop() interface{} {
  45. n := len(sq.heap)
  46. item := sq.heap[n-1]
  47. item.index = -1
  48. sq.heap = sq.heap[0 : n-1]
  49. return item
  50. }
  51. // 使用
  52. func NewSortQueue(len int) *SortQueue {
  53. sq := new(SortQueue)
  54. sq.heap = make([]*SortItem, 0, len)
  55. heap.Init(sq)
  56. sq.itemsMap = make(map[string]*SortItem)
  57. return sq
  58. }
  59. func CalcUniqueKey(item SaveItem) string {
  60. tab := item.TableName()
  61. id := item.GetUniqueKey()
  62. return fmt.Sprintf("%s.%s", tab, id)
  63. }
  64. func (sq *SortQueue) Clear() {
  65. sq.heap = make([]*SortItem, 0)
  66. heap.Init(sq)
  67. sq.itemsMap = make(map[string]*SortItem)
  68. }
  69. // 插入一条记录
  70. func (sq *SortQueue) Set(si SaveItem, tick int64, needEndEvent bool) chan struct{} {
  71. sq.Lock()
  72. defer sq.Unlock()
  73. key := CalcUniqueKey(si)
  74. if v, ok := sq.itemsMap[key]; ok {
  75. v.value = si
  76. if needEndEvent {
  77. if v.chEndEvent == nil {
  78. v.chEndEvent = make(chan struct{})
  79. }
  80. }
  81. if v.tick > tick {
  82. v.tick = tick
  83. heap.Fix(sq, v.index)
  84. }
  85. return v.chEndEvent
  86. }
  87. item := new(SortItem)
  88. item.value = si
  89. item.tick = tick
  90. if needEndEvent {
  91. if item.chEndEvent == nil {
  92. item.chEndEvent = make(chan struct{})
  93. }
  94. }
  95. heap.Push(sq, item)
  96. sq.itemsMap[key] = item
  97. return item.chEndEvent
  98. }
  99. // 更新记录时间戳、增加事件通知
  100. func (sq *SortQueue) SetUtil(si SaveItem, tick int64, needEndEvent bool) chan struct{} {
  101. sq.Lock()
  102. defer sq.Unlock()
  103. key := CalcUniqueKey(si)
  104. if v, ok := sq.itemsMap[key]; ok {
  105. if needEndEvent {
  106. if v.chEndEvent == nil {
  107. v.chEndEvent = make(chan struct{})
  108. }
  109. }
  110. if v.tick > tick {
  111. v.tick = tick
  112. heap.Fix(sq, v.index)
  113. }
  114. return v.chEndEvent
  115. }
  116. return nil
  117. }
  118. // 移除最小值记录
  119. func (sq *SortQueue) PopItem() (SaveItem, chan struct{}) {
  120. sq.Lock()
  121. defer sq.Unlock()
  122. if len(sq.heap) == 0 {
  123. return nil, nil
  124. }
  125. it := heap.Pop(sq).(*SortItem)
  126. key := CalcUniqueKey(it.value)
  127. delete(sq.itemsMap, key)
  128. return it.value, it.chEndEvent
  129. }
  130. // 移除指定记录
  131. func (sq *SortQueue) RemoveItem(item SaveItem) SaveItem {
  132. sq.Lock()
  133. defer sq.Unlock()
  134. if len(sq.heap) == 0 {
  135. return nil
  136. }
  137. key := CalcUniqueKey(item)
  138. if v, ok := sq.itemsMap[key]; ok {
  139. heap.Remove(sq, v.index)
  140. delete(sq.itemsMap, key)
  141. return v.value
  142. }
  143. return nil
  144. }
  145. // 查看二叉树根记录,就是最小值记录
  146. func (sq *SortQueue) First() (SaveItem, int64) {
  147. sq.Lock()
  148. defer sq.Unlock()
  149. if len(sq.heap) == 0 {
  150. return nil, 0
  151. }
  152. item := sq.heap[0]
  153. return item.value, item.tick
  154. }
  155. // 查看二叉树最后节点信息,不一定是最大值
  156. func (sq *SortQueue) Last() (SaveItem, int64) {
  157. sq.Lock()
  158. defer sq.Unlock()
  159. cnt := sq.Len()
  160. if cnt == 0 {
  161. return nil, 0
  162. }
  163. item := sq.heap[cnt-1]
  164. return item.value, item.tick
  165. }
// example is an empty placeholder; its commented-out body sketches how a
// SortQueue is filled, inspected and drained.
//
// NOTE(review): the sketch no longer matches the API in this file — Set takes
// a bool as its third argument (the sketch passes nil) and PopItem returns two
// values (the sketch chains a method call on the result). NewYearFifteen and
// rand are not declared in the visible part of this file. Update the sketch
// before uncommenting it.
func example() {
	// saveSq := NewSortQueue(100)
	// for i := 0; i < 10; i += 1 {
	// it := new(NewYearFifteen)
	// it.PlayerId = 1 + int64(i)
	// rd := rand.Int63n(1000)
	// saveSq.Set(it, rd, nil)
	// }
	// fmt.Println(saveSq.First())
	// fmt.Println(saveSq.Last())
	// for _, v := range saveSq.heap {
	// fmt.Println(v.tick, v.value.GetUniqueKey())
	// }
	// cnt := saveSq.Len()
	// for i := 0; i < cnt; i += 1 {
	// fmt.Println(saveSq.PopItem().GetUniqueKey())
	// }
}