123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- package timercache
- //package timer
- // https://blog.csdn.net/li_101357/article/details/90111230
- // https://studygolang.com/articles/13173
- // https://github.com/caucy/timeloop/blob/master/timer/timer.go
- // https://studygolang.com/articles/13060
- import (
- "container/heap"
- "runtime"
- "sync"
- "sync/atomic"
- "time"
- )
- const (
- stateWaitting = 1
- stateDeleted = 2
- stateDone = 3
- //stateModify = 2
- MIN_TIMER = 20 * time.Millisecond
- )
- type TimerEntry struct {
- lock sync.Mutex
- runTime time.Time // 到期时间
- Interval time.Duration // 调用间隔
- callback func(e *TimerEntry) // 回调方法
- key interface{} //key
- val interface{} //数据
- index int //在切片中的索引
- state int32 //运行状态
- }
- // time heap
- type timerCache struct {
- locker *sync.Mutex
- timers []*TimerEntry
- stop chan struct{}
- preCallback func(e *TimerEntry) bool
- entryPool *sync.Pool
- }
- type TimerCache struct {
- *timerCache
- }
- func New() *TimerCache {
- cron := newCache()
- cw := &TimerCache{cron}
- runtime.SetFinalizer(cw, Stop)
- return cw
- }
- // Stop halts cron instant c from running.
- func Stop(c *TimerCache) {
- c.stop <- struct{}{}
- }
- func newCache() *timerCache {
- tc := timerCache{
- timers: make([]*TimerEntry, 0),
- stop: make(chan struct{}, 1),
- locker: new(sync.Mutex),
- entryPool: new(sync.Pool),
- }
- tc.entryPool.New = func() interface{} {
- return new(TimerEntry)
- }
- heap.Init(&tc)
- go tc.run()
- return &tc
- }
- func (e *TimerEntry) GetKey() interface{} {
- return e.key
- }
- func (e *TimerEntry) SetVal(val interface{}, callback func(e *TimerEntry)) {
- e.lock.Lock()
- defer e.lock.Unlock()
- e.val = val
- e.callback = callback
- }
- func (e *TimerEntry) GetVal() interface{} {
- e.lock.Lock()
- defer e.lock.Unlock()
- return e.val
- }
- //heap.Interface
- func (h timerCache) Len() int {
- return len(h.timers)
- }
- func (h timerCache) Less(i, j int) bool {
- t1, t2 := h.timers[i].runTime, h.timers[j].runTime
- return t1.Before(t2)
- }
- func (h timerCache) Swap(i, j int) {
- h.timers[i], h.timers[j] = h.timers[j], h.timers[i]
- h.timers[i].index = i
- h.timers[j].index = j
- }
- func (h *timerCache) Push(x interface{}) {
- n := len(h.timers)
- item := x.(*TimerEntry)
- item.index = n
- h.timers = append(h.timers, item)
- }
- func (h *timerCache) Pop() interface{} {
- l := len(h.timers)
- item := h.timers[l-1]
- item.index = -1
- h.timers = h.timers[:l-1]
- return item
- }
- //operator
- func (h *timerCache) SetPrecallBack(callback func(e *TimerEntry) bool) {
- h.preCallback = callback
- }
- func (h *timerCache) Add(d time.Duration, key interface{}, val interface{}, callBack func(e *TimerEntry)) *TimerEntry {
- if d < MIN_TIMER {
- d = MIN_TIMER
- }
- t := h.entryPool.Get().(*TimerEntry)
- t.runTime = time.Now().Add(d)
- t.Interval = d
- t.key = key
- t.val = val
- t.callback = callBack
- t.state = stateWaitting
- h.locker.Lock()
- defer h.locker.Unlock()
- heap.Push(h, t)
- return t
- }
- func (h *timerCache) Remove(e *TimerEntry) bool {
- h.locker.Lock()
- defer h.locker.Unlock()
- atomic.StoreInt32(&e.state, stateDeleted)
- if e.index >= 0 {
- heap.Remove(h, e.index)
- h.entryPool.Put(e)
- return true
- }
- return false
- }
- func (h *timerCache) run() {
- ticker := time.NewTicker(time.Millisecond * 500)
- for {
- select {
- case now := <-ticker.C:
- h.execTask(now)
- case <-h.stop:
- ticker.Stop()
- return // terminate go-routine.
- }
- }
- }
- func (h *timerCache) execTask(now time.Time) {
- //取超时项
- dl := func() []*TimerEntry {
- dl := make([]*TimerEntry, 0)
- h.locker.Lock()
- defer h.locker.Unlock()
- if len(h.timers) <= 0 {
- return dl
- }
- for i := 0; i < 1000000; i++ {
- item := h.timers[0]
- if !item.runTime.After(now) {
- it := heap.Pop(h)
- item := it.(*TimerEntry)
- dl = append(dl, item)
- if len(h.timers) <= 0 {
- break
- }
- } else {
- break
- }
- }
- return dl
- }()
- if len(dl) <= 0 {
- return
- }
- execCallBack := func(item *TimerEntry) {
- defer func() {
- if err := recover(); err != nil {
- //log.Wa("%v", err)
- _ = err
- }
- }()
- if h.preCallback != nil {
- if h.preCallback(item) {
- item.callback(item)
- }
- return
- }
- item.callback(item)
- }
- //执行回调
- for _, item := range dl {
- if atomic.CompareAndSwapInt32(&item.state, stateWaitting, stateDone) {
- execCallBack(item)
- h.entryPool.Put(item)
- }
- }
- }
|