timercache.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package timercache
  2. //package timer
  3. // https://blog.csdn.net/li_101357/article/details/90111230
  4. // https://studygolang.com/articles/13173
  5. // https://github.com/caucy/timeloop/blob/master/timer/timer.go
  6. // https://studygolang.com/articles/13060
  7. import (
  8. "container/heap"
  9. "runtime"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. const (
  15. stateWaitting = 1
  16. stateDeleted = 2
  17. stateDone = 3
  18. //stateModify = 2
  19. MIN_TIMER = 20 * time.Millisecond
  20. )
  21. type TimerEntry struct {
  22. lock sync.Mutex
  23. runTime time.Time // 到期时间
  24. Interval time.Duration // 调用间隔
  25. callback func(e *TimerEntry) // 回调方法
  26. key interface{} //key
  27. val interface{} //数据
  28. index int //在切片中的索引
  29. state int32 //运行状态
  30. }
  31. // time heap
  32. type timerCache struct {
  33. locker *sync.Mutex
  34. timers []*TimerEntry
  35. stop chan struct{}
  36. preCallback func(e *TimerEntry) bool
  37. entryPool *sync.Pool
  38. }
  39. type TimerCache struct {
  40. *timerCache
  41. }
  42. func New() *TimerCache {
  43. cron := newCache()
  44. cw := &TimerCache{cron}
  45. runtime.SetFinalizer(cw, Stop)
  46. return cw
  47. }
  48. // Stop halts cron instant c from running.
  49. func Stop(c *TimerCache) {
  50. c.stop <- struct{}{}
  51. }
  52. func newCache() *timerCache {
  53. tc := timerCache{
  54. timers: make([]*TimerEntry, 0),
  55. stop: make(chan struct{}, 1),
  56. locker: new(sync.Mutex),
  57. entryPool: new(sync.Pool),
  58. }
  59. tc.entryPool.New = func() interface{} {
  60. return new(TimerEntry)
  61. }
  62. heap.Init(&tc)
  63. go tc.run()
  64. return &tc
  65. }
  66. func (e *TimerEntry) GetKey() interface{} {
  67. return e.key
  68. }
  69. func (e *TimerEntry) SetVal(val interface{}, callback func(e *TimerEntry)) {
  70. e.lock.Lock()
  71. defer e.lock.Unlock()
  72. e.val = val
  73. e.callback = callback
  74. }
  75. func (e *TimerEntry) GetVal() interface{} {
  76. e.lock.Lock()
  77. defer e.lock.Unlock()
  78. return e.val
  79. }
  80. //heap.Interface
  81. func (h timerCache) Len() int {
  82. return len(h.timers)
  83. }
  84. func (h timerCache) Less(i, j int) bool {
  85. t1, t2 := h.timers[i].runTime, h.timers[j].runTime
  86. return t1.Before(t2)
  87. }
  88. func (h timerCache) Swap(i, j int) {
  89. h.timers[i], h.timers[j] = h.timers[j], h.timers[i]
  90. h.timers[i].index = i
  91. h.timers[j].index = j
  92. }
  93. func (h *timerCache) Push(x interface{}) {
  94. n := len(h.timers)
  95. item := x.(*TimerEntry)
  96. item.index = n
  97. h.timers = append(h.timers, item)
  98. }
  99. func (h *timerCache) Pop() interface{} {
  100. l := len(h.timers)
  101. item := h.timers[l-1]
  102. item.index = -1
  103. h.timers = h.timers[:l-1]
  104. return item
  105. }
  106. //operator
  107. func (h *timerCache) SetPrecallBack(callback func(e *TimerEntry) bool) {
  108. h.preCallback = callback
  109. }
  110. func (h *timerCache) Add(d time.Duration, key interface{}, val interface{}, callBack func(e *TimerEntry)) *TimerEntry {
  111. if d < MIN_TIMER {
  112. d = MIN_TIMER
  113. }
  114. t := h.entryPool.Get().(*TimerEntry)
  115. t.runTime = time.Now().Add(d)
  116. t.Interval = d
  117. t.key = key
  118. t.val = val
  119. t.callback = callBack
  120. t.state = stateWaitting
  121. h.locker.Lock()
  122. defer h.locker.Unlock()
  123. heap.Push(h, t)
  124. return t
  125. }
  126. func (h *timerCache) Remove(e *TimerEntry) bool {
  127. h.locker.Lock()
  128. defer h.locker.Unlock()
  129. atomic.StoreInt32(&e.state, stateDeleted)
  130. if e.index >= 0 {
  131. heap.Remove(h, e.index)
  132. h.entryPool.Put(e)
  133. return true
  134. }
  135. return false
  136. }
  137. func (h *timerCache) run() {
  138. ticker := time.NewTicker(time.Millisecond * 500)
  139. for {
  140. select {
  141. case now := <-ticker.C:
  142. h.execTask(now)
  143. case <-h.stop:
  144. ticker.Stop()
  145. return // terminate go-routine.
  146. }
  147. }
  148. }
  149. func (h *timerCache) execTask(now time.Time) {
  150. //取超时项
  151. dl := func() []*TimerEntry {
  152. dl := make([]*TimerEntry, 0)
  153. h.locker.Lock()
  154. defer h.locker.Unlock()
  155. if len(h.timers) <= 0 {
  156. return dl
  157. }
  158. for i := 0; i < 1000000; i++ {
  159. item := h.timers[0]
  160. if !item.runTime.After(now) {
  161. it := heap.Pop(h)
  162. item := it.(*TimerEntry)
  163. dl = append(dl, item)
  164. if len(h.timers) <= 0 {
  165. break
  166. }
  167. } else {
  168. break
  169. }
  170. }
  171. return dl
  172. }()
  173. if len(dl) <= 0 {
  174. return
  175. }
  176. execCallBack := func(item *TimerEntry) {
  177. defer func() {
  178. if err := recover(); err != nil {
  179. //log.Wa("%v", err)
  180. _ = err
  181. }
  182. }()
  183. if h.preCallback != nil {
  184. if h.preCallback(item) {
  185. item.callback(item)
  186. }
  187. return
  188. }
  189. item.callback(item)
  190. }
  191. //执行回调
  192. for _, item := range dl {
  193. if atomic.CompareAndSwapInt32(&item.state, stateWaitting, stateDone) {
  194. execCallBack(item)
  195. h.entryPool.Put(item)
  196. }
  197. }
  198. }