datataskroutine.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. // Package chat
  2. // 通道传递数据,调用指定的协程函数进行处理
  3. // 1-N个协程处理一种事情
  4. package coroutine
  5. import (
  6. "leafstalk/log"
  7. "sync"
  8. )
  9. // DataTaskRoutine 异步任务协程类
  10. type DataTaskRoutine[T any] struct {
  11. name string
  12. numWorkers int
  13. chTasks chan T
  14. wg sync.WaitGroup
  15. // cancel context.CancelFunc
  16. taskFunc func(T) // 任务函数,接收数据作为参数
  17. }
  18. // NewDataTaskRoutine 创建一个异步任务协程实例
  19. func NewDataTaskRoutine[T any](name string, numWorkers int, bufferSize int, taskFunc func(T)) *DataTaskRoutine[T] {
  20. return &DataTaskRoutine[T]{
  21. name: name,
  22. numWorkers: numWorkers,
  23. chTasks: make(chan T, bufferSize),
  24. taskFunc: taskFunc,
  25. }
  26. }
  27. // Start 启动异步任务协程
  28. func (tr *DataTaskRoutine[T]) Start() {
  29. // ctx, cancel := context.WithCancel(context.Background())
  30. // tr.cancel = cancel
  31. for range tr.numWorkers {
  32. tr.wg.Add(1)
  33. go func() {
  34. defer tr.wg.Done()
  35. tr.run()
  36. }()
  37. }
  38. }
  39. // Stop 停止异步任务协程,等待协程退出
  40. func (tr *DataTaskRoutine[T]) Stop() {
  41. close(tr.chTasks)
  42. tr.wg.Wait()
  43. // tr.cancel()
  44. }
  45. // AddTask 添加任务到任务通道,通道满时需要等待
  46. func (tr *DataTaskRoutine[T]) AddTask(task T) {
  47. tr.chTasks <- task
  48. }
  49. // TryAddTask 尝试添加任务到任务通道,通道满时直接返回添加是否成功的结果
  50. func (tr *DataTaskRoutine[T]) TryAddTask(task T) bool {
  51. select {
  52. case tr.chTasks <- task:
  53. return true
  54. default:
  55. return false
  56. }
  57. }
  58. func (tr *DataTaskRoutine[T]) TryGetTask(n int) []T {
  59. var lst []T
  60. for range n {
  61. select {
  62. case t := <-tr.chTasks:
  63. lst = append(lst, t)
  64. default:
  65. return lst
  66. }
  67. }
  68. return lst
  69. }
  70. // run 异步任务协程的核心逻辑
  71. func (tr *DataTaskRoutine[T]) run() {
  72. safeDo := func(task T) {
  73. defer func() {
  74. if err := recover(); err != nil {
  75. // 处理 panic,例如记录日志
  76. log.Errorf("DataTaskRoutine Recovered panic %s: %v", tr.name, err)
  77. }
  78. }()
  79. tr.taskFunc(task)
  80. }
  81. for task := range tr.chTasks {
  82. if tr.taskFunc != nil {
  83. safeDo(task)
  84. }
  85. }
  86. }
  87. func DataTaskRoutineExample() {
  88. DataTaskRoutine := NewDataTaskRoutine[string]("MyTask", 3, 10, func(task string) {
  89. // 处理任务
  90. })
  91. DataTaskRoutine.Start()
  92. DataTaskRoutine.AddTask("data1")
  93. DataTaskRoutine.AddTask("data2")
  94. DataTaskRoutine.Stop()
  95. }