// Package timercache implements a coarse-grained timer queue backed by a
// binary min-heap ordered by expiry time.
//
// A single background goroutine polls the heap on a fixed tick and runs the
// callback of every entry whose deadline has passed.
//
// Background reading:
//   - https://blog.csdn.net/li_101357/article/details/90111230
//   - https://studygolang.com/articles/13173
//   - https://github.com/caucy/timeloop/blob/master/timer/timer.go
//   - https://studygolang.com/articles/13060
package timercache

import (
	"container/heap"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

// Lifecycle states stored in TimerEntry.state.
const (
	// stateWaitting marks an entry that is queued and has not fired yet.
	stateWaitting = 1

	// stateDeleted marks an entry cancelled through Remove.
	stateDeleted = 2

	// stateDone marks an entry claimed by the dispatcher for execution.
	stateDone = 3
)

// MIN_TIMER is the smallest delay Add accepts; shorter delays are raised to it.
const MIN_TIMER = 20 * time.Millisecond

// TimerEntry is one scheduled item.
//
// Entries are recycled through a sync.Pool once they have fired or been
// removed, so a caller should not keep using an entry after that point.
type TimerEntry struct {
	// lock guards val and callback.
	lock sync.Mutex

	// runTime is the instant at which the entry expires.
	runTime time.Time

	// Interval is the delay passed to Add, after clamping to MIN_TIMER.
	Interval time.Duration

	// callback is invoked when the entry expires.
	callback func(e *TimerEntry)

	// key is the caller-supplied key.
	key interface{}

	// val is the caller-supplied payload.
	val interface{}

	// index is the entry's position in the heap slice, or -1 once popped.
	index int

	// state is one of the state* constants and is accessed atomically.
	state int32
}

// timerCache is the heap of pending entries plus the dispatcher's plumbing.
// It implements heap.Interface.
type timerCache struct {
	// locker guards timers and preCallback.
	locker *sync.Mutex

	// timers is the heap storage, ordered by runTime.
	timers []*TimerEntry

	// stop signals the dispatcher goroutine to exit.
	stop chan struct{}

	// preCallback, when set, decides whether an expired entry's callback runs.
	preCallback func(e *TimerEntry) bool

	// entryPool recycles TimerEntry values.
	entryPool *sync.Pool
}

var _ heap.Interface = (*timerCache)(nil)

// TimerCache is the handle returned by New. It wraps the internal cache so
// that a finalizer on the wrapper can stop the dispatcher goroutine, which
// only references the inner timerCache.
type TimerCache struct {
	*timerCache
}

// New creates a timer cache and starts its dispatcher goroutine. A finalizer
// calls Stop when the returned wrapper is garbage collected.
func New() *TimerCache {
	tc := &TimerCache{timerCache: newCache()}
	runtime.SetFinalizer(tc, Stop)
	return tc
}

// Stop asks the dispatcher goroutine of c to exit.
//
// It never blocks: if a stop signal is already pending the call is a no-op,
// so calling it several times (or in addition to the finalizer) is safe.
func Stop(c *TimerCache) {
	if c == nil || c.timerCache == nil {
		return
	}
	select {
	case c.stop <- struct{}{}:
	default:
		// A stop signal is already queued; nothing more to do.
	}
}

// newCache builds the internal cache and launches the dispatcher goroutine.
func newCache() *timerCache {
	tc := &timerCache{
		locker:    new(sync.Mutex),
		stop:      make(chan struct{}, 1),
		entryPool: new(sync.Pool),
	}
	tc.entryPool.New = func() interface{} { return new(TimerEntry) }
	heap.Init(tc)
	go tc.run()
	return tc
}

// GetKey returns the key supplied to Add.
func (e *TimerEntry) GetKey() interface{} {
	return e.key
}

// SetVal replaces both the payload and the callback of the entry.
func (e *TimerEntry) SetVal(val interface{}, callback func(e *TimerEntry)) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.val = val
	e.callback = callback
}

// GetVal returns the current payload of the entry.
func (e *TimerEntry) GetVal() interface{} {
	e.lock.Lock()
	defer e.lock.Unlock()
	return e.val
}

// Len implements heap.Interface. It does not take the cache lock.
func (h *timerCache) Len() int {
	return len(h.timers)
}

// Less implements heap.Interface; the entry that expires first sorts first.
func (h *timerCache) Less(i, j int) bool {
	return h.timers[i].runTime.Before(h.timers[j].runTime)
}

// Swap implements heap.Interface and keeps each entry's index in sync.
func (h *timerCache) Swap(i, j int) {
	h.timers[i], h.timers[j] = h.timers[j], h.timers[i]
	h.timers[i].index = i
	h.timers[j].index = j
}

// Push implements heap.Interface. Use Add to schedule an entry.
func (h *timerCache) Push(x interface{}) {
	item := x.(*TimerEntry)
	item.index = len(h.timers)
	h.timers = append(h.timers, item)
}

// Pop implements heap.Interface. Use Remove to cancel an entry.
func (h *timerCache) Pop() interface{} {
	last := len(h.timers) - 1
	item := h.timers[last]
	h.timers[last] = nil // drop the reference so the backing array does not retain it
	h.timers = h.timers[:last]
	item.index = -1
	return item
}

// SetPrecallBack installs a hook that runs before each expired entry's
// callback; the callback is skipped when the hook returns false.
func (h *timerCache) SetPrecallBack(callback func(e *TimerEntry) bool) {
	// The dispatcher goroutine is already running, so the write is guarded.
	h.locker.Lock()
	defer h.locker.Unlock()
	h.preCallback = callback
}

// Add schedules callBack to run once d has elapsed and returns the entry.
//
// Delays shorter than MIN_TIMER are raised to MIN_TIMER. Expiry is checked on
// the dispatcher's tick, so the callback may run later than d.
func (h *timerCache) Add(d time.Duration, key interface{}, val interface{}, callBack func(e *TimerEntry)) *TimerEntry {
	if d < MIN_TIMER {
		d = MIN_TIMER
	}

	t := h.entryPool.Get().(*TimerEntry)
	t.runTime = time.Now().Add(d)
	t.Interval = d
	t.key = key
	t.val = val
	t.callback = callBack
	atomic.StoreInt32(&t.state, stateWaitting)

	h.locker.Lock()
	defer h.locker.Unlock()
	heap.Push(h, t)
	return t
}

// Remove cancels e. It reports whether the entry was still queued in this
// cache; it returns false when e is nil, has already been popped for
// execution, or is not the entry stored at its recorded heap position.
func (h *timerCache) Remove(e *TimerEntry) bool {
	if e == nil {
		return false
	}

	h.locker.Lock()
	defer h.locker.Unlock()

	// Mark first so that an entry already popped by the dispatcher is skipped.
	atomic.StoreInt32(&e.state, stateDeleted)

	if e.index < 0 || e.index >= len(h.timers) || h.timers[e.index] != e {
		return false
	}
	heap.Remove(h, e.index)
	h.entryPool.Put(e)
	return true
}

// run is the dispatcher loop: it fires expired entries on every tick until a
// stop signal arrives.
func (h *timerCache) run() {
	const pollInterval = 500 * time.Millisecond

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case now := <-ticker.C:
			h.execTask(now)
		case <-h.stop:
			return
		}
	}
}

// execTask pops every entry that expired at or before now and runs its
// callback outside the cache lock, then returns the entry to the pool.
func (h *timerCache) execTask(now time.Time) {
	expired, pre := h.popExpired(now)
	for _, item := range expired {
		// Skip entries that Remove marked as deleted after they were popped.
		if !atomic.CompareAndSwapInt32(&item.state, stateWaitting, stateDone) {
			continue
		}
		h.fire(item, pre)
		h.entryPool.Put(item)
	}
}

// popExpired removes the entries due at or before now from the heap and
// returns them together with a snapshot of the pre-callback hook.
func (h *timerCache) popExpired(now time.Time) ([]*TimerEntry, func(e *TimerEntry) bool) {
	// Upper bound on entries handled per tick, so the lock is not held forever.
	const maxPerTick = 1000000

	h.locker.Lock()
	defer h.locker.Unlock()

	var expired []*TimerEntry
	for len(h.timers) > 0 && len(expired) < maxPerTick {
		if h.timers[0].runTime.After(now) {
			break
		}
		expired = append(expired, heap.Pop(h).(*TimerEntry))
	}
	return expired, h.preCallback
}

// fire runs the pre-callback hook (if any) and then the entry's callback,
// recovering from panics raised by either.
func (h *timerCache) fire(item *TimerEntry, pre func(e *TimerEntry) bool) {
	defer func() {
		// A panicking callback must not kill the dispatcher goroutine; the
		// panic value is intentionally discarded.
		_ = recover()
	}()

	if pre != nil && !pre(item) {
		return
	}

	// Read the callback under the same lock SetVal uses, but call it unlocked
	// so the callback itself may call SetVal or GetVal.
	item.lock.Lock()
	cb := item.callback
	item.lock.Unlock()

	if cb != nil {
		cb(item)
	}
}