// Package chat // 通道传递数据,调用指定的协程函数进行处理 // 1-N个协程处理一种事情 package coroutine import ( "leafstalk/log" "sync" ) // DataTaskRoutine 异步任务协程类 type DataTaskRoutine[T any] struct { name string numWorkers int chTasks chan T wg sync.WaitGroup // cancel context.CancelFunc taskFunc func(T) // 任务函数,接收数据作为参数 } // NewDataTaskRoutine 创建一个异步任务协程实例 func NewDataTaskRoutine[T any](name string, numWorkers int, bufferSize int, taskFunc func(T)) *DataTaskRoutine[T] { return &DataTaskRoutine[T]{ name: name, numWorkers: numWorkers, chTasks: make(chan T, bufferSize), taskFunc: taskFunc, } } // Start 启动异步任务协程 func (tr *DataTaskRoutine[T]) Start() { // ctx, cancel := context.WithCancel(context.Background()) // tr.cancel = cancel for range tr.numWorkers { tr.wg.Add(1) go func() { defer tr.wg.Done() tr.run() }() } } // Stop 停止异步任务协程,等待协程退出 func (tr *DataTaskRoutine[T]) Stop() { close(tr.chTasks) tr.wg.Wait() // tr.cancel() } // AddTask 添加任务到任务通道,通道满时需要等待 func (tr *DataTaskRoutine[T]) AddTask(task T) { tr.chTasks <- task } // TryAddTask 尝试添加任务到任务通道,通道满时直接返回添加是否成功的结果 func (tr *DataTaskRoutine[T]) TryAddTask(task T) bool { select { case tr.chTasks <- task: return true default: return false } } func (tr *DataTaskRoutine[T]) TryGetTask(n int) []T { var lst []T for range n { select { case t := <-tr.chTasks: lst = append(lst, t) default: return lst } } return lst } // run 异步任务协程的核心逻辑 func (tr *DataTaskRoutine[T]) run() { safeDo := func(task T) { defer func() { if err := recover(); err != nil { // 处理 panic,例如记录日志 log.Errorf("DataTaskRoutine Recovered panic %s: %v", tr.name, err) } }() tr.taskFunc(task) } for task := range tr.chTasks { if tr.taskFunc != nil { safeDo(task) } } } func DataTaskRoutineExample() { DataTaskRoutine := NewDataTaskRoutine[string]("MyTask", 3, 10, func(task string) { // 处理任务 }) DataTaskRoutine.Start() DataTaskRoutine.AddTask("data1") DataTaskRoutine.AddTask("data2") DataTaskRoutine.Stop() }