sequencetask.go 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package sequencetask
  2. import (
  3. "leafstalk/log"
  4. "sync"
  5. )
  6. const (
  7. defaultFunLen = 1000
  8. )
  9. type TaskQueue struct {
  10. TaskQueueLen int
  11. allFuns chan func()
  12. doneRun chan struct{}
  13. wg sync.WaitGroup
  14. }
  15. func (t *TaskQueue) StartSequenceTaskProcess(taskQueueLen int) {
  16. t.TaskQueueLen = taskQueueLen
  17. if t.TaskQueueLen == 0 {
  18. t.TaskQueueLen = defaultFunLen
  19. }
  20. t.allFuns = make(chan func(), t.TaskQueueLen)
  21. t.wg.Add(1)
  22. go t.run()
  23. }
  24. func (t *TaskQueue) StopSequenceTaskProcess() {
  25. close(t.doneRun)
  26. close(t.allFuns)
  27. t.wg.Wait()
  28. }
  29. func (t *TaskQueue) AddTask(f func()) bool {
  30. if len(t.allFuns) >= t.TaskQueueLen {
  31. return false
  32. }
  33. t.allFuns <- f
  34. return true
  35. }
  36. func (t *TaskQueue) run() {
  37. defer t.wg.Done()
  38. t.doneRun = make(chan struct{})
  39. safeExec := func(f func()) {
  40. defer func() {
  41. if err := recover(); err != nil {
  42. log.Warnln("timercall exec panic:", err)
  43. }
  44. }()
  45. f()
  46. }
  47. for {
  48. select {
  49. case <-t.doneRun:
  50. return
  51. case f, ok := <-t.allFuns:
  52. if !ok {
  53. return
  54. }
  55. safeExec(f)
  56. }
  57. }
  58. }