taskfuncdispatcher.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. // 异步处理闭包函数,
  2. // 开启若干协程,监听闭包函数通道
  3. // 实现任务异步处理功能
  4. package coroutine
  5. import (
  6. "leafstalk/log"
  7. "sync"
  8. )
  9. // TaskFuncDispatcher 异步任务调度器,用于管理和执行异步任务
  10. type TaskFuncDispatcher struct {
  11. name string
  12. numWorkers int
  13. chTasks chan func() // 传递闭包函数的通道,用于执行异步任务
  14. wg sync.WaitGroup
  15. // cancel context.CancelFunc
  16. }
  17. // NewTaskFuncDispatcher 创建一个异步任务调度器实例
  18. func NewTaskFuncDispatcher(name string, numWorkers int, bufferSize int) *TaskFuncDispatcher {
  19. return &TaskFuncDispatcher{
  20. name: name,
  21. numWorkers: numWorkers,
  22. chTasks: make(chan func(), bufferSize),
  23. }
  24. }
  25. // Start 启动异步任务调度器
  26. func (td *TaskFuncDispatcher) Start() {
  27. // ctx, cancel := context.WithCancel(context.Background())
  28. // td.cancel = cancel
  29. for range td.numWorkers {
  30. td.wg.Add(1)
  31. go func() {
  32. defer td.wg.Done()
  33. td.run()
  34. }()
  35. }
  36. }
  37. // Stop 停止异步任务调度器,等待所有任务完成
  38. func (td *TaskFuncDispatcher) Stop() {
  39. close(td.chTasks)
  40. td.wg.Wait()
  41. }
  42. // AddTask 添加任务到任务通道,通道满时需要等待
  43. func (td *TaskFuncDispatcher) AddTask(task func()) {
  44. td.chTasks <- task
  45. }
  46. // TryAddTask 尝试添加任务到任务通道,通道满时直接返回添加是否成功的结果
  47. func (td *TaskFuncDispatcher) TryAddTask(task func()) bool {
  48. select {
  49. case td.chTasks <- task:
  50. return true
  51. default:
  52. return false
  53. }
  54. }
  55. // run 异步任务调度器的核心逻辑
  56. func (td *TaskFuncDispatcher) run() {
  57. safeDo := func(task func()) {
  58. defer func() {
  59. if err := recover(); err != nil {
  60. // 处理 panic,例如记录日志
  61. log.Errorf("TaskFuncDispatcher panic error %s: %v", td.name, err)
  62. }
  63. }()
  64. task()
  65. }
  66. for task := range td.chTasks {
  67. safeDo(task)
  68. }
  69. }
  70. func example2() {
  71. taskDispatcher := NewTaskFuncDispatcher("MyTask", 3, 10)
  72. taskDispatcher.Start()
  73. taskDispatcher.AddTask(func() {
  74. // 处理任务1
  75. })
  76. taskDispatcher.AddTask(func() {
  77. // 处理任务2
  78. })
  79. taskDispatcher.Stop()
  80. }