heap.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package dbserver
  2. import (
  3. "container/heap"
  4. "fmt"
  5. "sync"
  6. )
  7. type SaveItem interface {
  8. TableName() string
  9. GetUniqueKey() string
  10. }
  11. type SaveCallBack func()
  12. type SortItem struct {
  13. value SaveItem
  14. tick int64
  15. index int
  16. chEndEvent chan struct{}
  17. }
  18. type SortQueue struct {
  19. sync.Mutex
  20. heap []*SortItem
  21. itemsMap map[string]*SortItem
  22. }
  23. // 接口
  24. func (sq *SortQueue) Len() int {
  25. return len(sq.heap)
  26. }
  27. // 根最小
  28. func (sq *SortQueue) Less(i, j int) bool {
  29. return sq.heap[i].tick < sq.heap[j].tick
  30. }
  31. func (sq *SortQueue) Swap(i, j int) {
  32. sq.heap[i], sq.heap[j] = sq.heap[j], sq.heap[i]
  33. sq.heap[i].index = i
  34. sq.heap[j].index = j
  35. }
  36. // 元素放在队列尾部
  37. func (sq *SortQueue) Push(x interface{}) {
  38. n := len(sq.heap)
  39. item := x.(*SortItem)
  40. item.index = n
  41. sq.heap = append(sq.heap, item)
  42. }
  43. // 弹出最后一个元素
  44. func (sq *SortQueue) Pop() interface{} {
  45. n := len(sq.heap)
  46. item := sq.heap[n-1]
  47. item.index = -1
  48. sq.heap = sq.heap[0 : n-1]
  49. return item
  50. }
  51. // 使用
  52. func NewSortQueue(len int) *SortQueue {
  53. sq := new(SortQueue)
  54. sq.heap = make([]*SortItem, 0, len)
  55. heap.Init(sq)
  56. sq.itemsMap = make(map[string]*SortItem)
  57. return sq
  58. }
  59. func CalcUniqueKey(item SaveItem) string {
  60. tab := item.TableName()
  61. id := item.GetUniqueKey()
  62. return fmt.Sprintf("%s.%s", tab, id)
  63. }
  64. // 插入一条记录
  65. func (sq *SortQueue) Set(si SaveItem, tick int64, needEndEvent bool) chan struct{} {
  66. sq.Lock()
  67. defer sq.Unlock()
  68. key := CalcUniqueKey(si)
  69. if v, ok := sq.itemsMap[key]; ok {
  70. v.value = si
  71. if needEndEvent {
  72. if v.chEndEvent == nil {
  73. v.chEndEvent = make(chan struct{})
  74. }
  75. }
  76. if v.tick > tick {
  77. v.tick = tick
  78. heap.Fix(sq, v.index)
  79. }
  80. return v.chEndEvent
  81. }
  82. item := new(SortItem)
  83. item.value = si
  84. item.tick = tick
  85. if needEndEvent {
  86. if item.chEndEvent == nil {
  87. item.chEndEvent = make(chan struct{})
  88. }
  89. }
  90. heap.Push(sq, item)
  91. sq.itemsMap[key] = item
  92. return item.chEndEvent
  93. }
  94. // 插入一条记录
  95. func (sq *SortQueue) SetUtil(si SaveItem, tick int64, needEndEvent bool) chan struct{} {
  96. sq.Lock()
  97. defer sq.Unlock()
  98. key := CalcUniqueKey(si)
  99. if v, ok := sq.itemsMap[key]; ok {
  100. if needEndEvent {
  101. if v.chEndEvent == nil {
  102. v.chEndEvent = make(chan struct{})
  103. }
  104. }
  105. if v.tick > tick {
  106. v.tick = tick
  107. heap.Fix(sq, v.index)
  108. }
  109. return v.chEndEvent
  110. }
  111. return nil
  112. }
  113. // 移除最小值记录
  114. func (sq *SortQueue) PopItem() (SaveItem, chan struct{}) {
  115. sq.Lock()
  116. defer sq.Unlock()
  117. if len(sq.heap) == 0 {
  118. return nil, nil
  119. }
  120. it := heap.Pop(sq).(*SortItem)
  121. key := CalcUniqueKey(it.value)
  122. delete(sq.itemsMap, key)
  123. return it.value, it.chEndEvent
  124. }
  125. // 移除指定记录
  126. func (sq *SortQueue) RemoveItem(item SaveItem) SaveItem {
  127. sq.Lock()
  128. defer sq.Unlock()
  129. if len(sq.heap) == 0 {
  130. return nil
  131. }
  132. key := CalcUniqueKey(item)
  133. if v, ok := sq.itemsMap[key]; ok {
  134. heap.Remove(sq, v.index)
  135. delete(sq.itemsMap, key)
  136. return v.value
  137. }
  138. return nil
  139. }
  140. // 查看二叉树根记录,就是最小值记录
  141. func (sq *SortQueue) First() (SaveItem, int64) {
  142. sq.Lock()
  143. defer sq.Unlock()
  144. if len(sq.heap) == 0 {
  145. return nil, 0
  146. }
  147. item := sq.heap[0]
  148. return item.value, item.tick
  149. }
  150. // 查看二叉树最后节点信息,不一定是最大值
  151. func (sq *SortQueue) Last() (SaveItem, int64) {
  152. sq.Lock()
  153. defer sq.Unlock()
  154. cnt := sq.Len()
  155. if cnt == 0 {
  156. return nil, 0
  157. }
  158. item := sq.heap[cnt-1]
  159. return item.value, item.tick
  160. }
  161. func example() {
  162. // saveSq := NewSortQueue(100)
  163. // for i := 0; i < 10; i += 1 {
  164. // it := new(NewYearFifteen)
  165. // it.PlayerId = 1 + int64(i)
  166. // rd := rand.Int63n(1000)
  167. // saveSq.Set(it, rd, nil)
  168. // }
  169. // fmt.Println(saveSq.First())
  170. // fmt.Println(saveSq.Last())
  171. // for _, v := range saveSq.heap {
  172. // fmt.Println(v.tick, v.value.GetUniqueKey())
  173. // }
  174. // cnt := saveSq.Len()
  175. // for i := 0; i < cnt; i += 1 {
  176. // fmt.Println(saveSq.PopItem().GetUniqueKey())
  177. // }
  178. }