12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- // 异步处理闭包函数,
- // 开启若干协程,监听闭包函数通道
- // 实现任务异步处理功能
- package coroutine
- import (
- "leafstalk/log"
- "sync"
- )
- // TaskFuncDispatcher 异步任务调度器,用于管理和执行异步任务
- type TaskFuncDispatcher struct {
- name string
- numWorkers int
- chTasks chan func() // 传递闭包函数的通道,用于执行异步任务
- wg sync.WaitGroup
- // cancel context.CancelFunc
- }
- // NewTaskFuncDispatcher 创建一个异步任务调度器实例
- func NewTaskFuncDispatcher(name string, numWorkers int, bufferSize int) *TaskFuncDispatcher {
- return &TaskFuncDispatcher{
- name: name,
- numWorkers: numWorkers,
- chTasks: make(chan func(), bufferSize),
- }
- }
- // Start 启动异步任务调度器
- func (td *TaskFuncDispatcher) Start() {
- // ctx, cancel := context.WithCancel(context.Background())
- // td.cancel = cancel
- for range td.numWorkers {
- td.wg.Add(1)
- go func() {
- defer td.wg.Done()
- td.run()
- }()
- }
- }
- // Stop 停止异步任务调度器,等待所有任务完成
- func (td *TaskFuncDispatcher) Stop() {
- close(td.chTasks)
- td.wg.Wait()
- }
- // AddTask 添加任务到任务通道,通道满时需要等待
- func (td *TaskFuncDispatcher) AddTask(task func()) {
- td.chTasks <- task
- }
- // TryAddTask 尝试添加任务到任务通道,通道满时直接返回添加是否成功的结果
- func (td *TaskFuncDispatcher) TryAddTask(task func()) bool {
- select {
- case td.chTasks <- task:
- return true
- default:
- return false
- }
- }
- // run 异步任务调度器的核心逻辑
- func (td *TaskFuncDispatcher) run() {
- safeDo := func(task func()) {
- defer func() {
- if err := recover(); err != nil {
- // 处理 panic,例如记录日志
- log.Errorf("TaskFuncDispatcher panic error %s: %v", td.name, err)
- }
- }()
- task()
- }
- for task := range td.chTasks {
- safeDo(task)
- }
- }
- func example2() {
- taskDispatcher := NewTaskFuncDispatcher("MyTask", 3, 10)
- taskDispatcher.Start()
- taskDispatcher.AddTask(func() {
- // 处理任务1
- })
- taskDispatcher.AddTask(func() {
- // 处理任务2
- })
- taskDispatcher.Stop()
- }
|