// 异步处理闭包函数, // 开启若干协程,监听闭包函数通道 // 实现任务异步处理功能 package coroutine import ( "leafstalk/log" "sync" ) // TaskFuncDispatcher 异步任务调度器,用于管理和执行异步任务 type TaskFuncDispatcher struct { name string numWorkers int chTasks chan func() // 传递闭包函数的通道,用于执行异步任务 wg sync.WaitGroup // cancel context.CancelFunc } // NewTaskFuncDispatcher 创建一个异步任务调度器实例 func NewTaskFuncDispatcher(name string, numWorkers int, bufferSize int) *TaskFuncDispatcher { return &TaskFuncDispatcher{ name: name, numWorkers: numWorkers, chTasks: make(chan func(), bufferSize), } } // Start 启动异步任务调度器 func (td *TaskFuncDispatcher) Start() { // ctx, cancel := context.WithCancel(context.Background()) // td.cancel = cancel for range td.numWorkers { td.wg.Add(1) go func() { defer td.wg.Done() td.run() }() } } // Stop 停止异步任务调度器,等待所有任务完成 func (td *TaskFuncDispatcher) Stop() { close(td.chTasks) td.wg.Wait() } // AddTask 添加任务到任务通道,通道满时需要等待 func (td *TaskFuncDispatcher) AddTask(task func()) { td.chTasks <- task } // TryAddTask 尝试添加任务到任务通道,通道满时直接返回添加是否成功的结果 func (td *TaskFuncDispatcher) TryAddTask(task func()) bool { select { case td.chTasks <- task: return true default: return false } } // run 异步任务调度器的核心逻辑 func (td *TaskFuncDispatcher) run() { safeDo := func(task func()) { defer func() { if err := recover(); err != nil { // 处理 panic,例如记录日志 log.Errorf("TaskFuncDispatcher panic error %s: %v", td.name, err) } }() task() } for task := range td.chTasks { safeDo(task) } } func example2() { taskDispatcher := NewTaskFuncDispatcher("MyTask", 3, 10) taskDispatcher.Start() taskDispatcher.AddTask(func() { // 处理任务1 }) taskDispatcher.AddTask(func() { // 处理任务2 }) taskDispatcher.Stop() }