123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- // Package chat
- // 通道传递数据,调用指定的协程函数进行处理
- // 1-N个协程处理一种事情
- package coroutine
- import (
- "leafstalk/log"
- "sync"
- )
- // DataTaskRoutine 异步任务协程类
- type DataTaskRoutine[T any] struct {
- name string
- numWorkers int
- chTasks chan T
- wg sync.WaitGroup
- // cancel context.CancelFunc
- taskFunc func(T) // 任务函数,接收数据作为参数
- }
- // NewDataTaskRoutine 创建一个异步任务协程实例
- func NewDataTaskRoutine[T any](name string, numWorkers int, bufferSize int, taskFunc func(T)) *DataTaskRoutine[T] {
- return &DataTaskRoutine[T]{
- name: name,
- numWorkers: numWorkers,
- chTasks: make(chan T, bufferSize),
- taskFunc: taskFunc,
- }
- }
- // Start 启动异步任务协程
- func (tr *DataTaskRoutine[T]) Start() {
- // ctx, cancel := context.WithCancel(context.Background())
- // tr.cancel = cancel
- for range tr.numWorkers {
- tr.wg.Add(1)
- go func() {
- defer tr.wg.Done()
- tr.run()
- }()
- }
- }
- // Stop 停止异步任务协程,等待协程退出
- func (tr *DataTaskRoutine[T]) Stop() {
- close(tr.chTasks)
- tr.wg.Wait()
- // tr.cancel()
- }
- // AddTask 添加任务到任务通道,通道满时需要等待
- func (tr *DataTaskRoutine[T]) AddTask(task T) {
- tr.chTasks <- task
- }
- // TryAddTask 尝试添加任务到任务通道,通道满时直接返回添加是否成功的结果
- func (tr *DataTaskRoutine[T]) TryAddTask(task T) bool {
- select {
- case tr.chTasks <- task:
- return true
- default:
- return false
- }
- }
- func (tr *DataTaskRoutine[T]) TryGetTask(n int) []T {
- var lst []T
- for range n {
- select {
- case t := <-tr.chTasks:
- lst = append(lst, t)
- default:
- return lst
- }
- }
- return lst
- }
- // run 异步任务协程的核心逻辑
- func (tr *DataTaskRoutine[T]) run() {
- safeDo := func(task T) {
- defer func() {
- if err := recover(); err != nil {
- // 处理 panic,例如记录日志
- log.Errorf("DataTaskRoutine Recovered panic %s: %v", tr.name, err)
- }
- }()
- tr.taskFunc(task)
- }
- for task := range tr.chTasks {
- if tr.taskFunc != nil {
- safeDo(task)
- }
- }
- }
- func DataTaskRoutineExample() {
- DataTaskRoutine := NewDataTaskRoutine[string]("MyTask", 3, 10, func(task string) {
- // 处理任务
- })
- DataTaskRoutine.Start()
- DataTaskRoutine.AddTask("data1")
- DataTaskRoutine.AddTask("data2")
- DataTaskRoutine.Stop()
- }
|