// 协程池 // 实现目标: // 模块使用时,不用每个模块不断启动新协程 // 需要协程处理业务时,获取一个协程(Get),处理事情(Do或 <-func()),处理完,返还协程(Put) // 修改时间: 20241128 package goroutine import ( "fmt" "leafstalk/otherutils/snowflake" "sync" "time" ) type Task func() type Worker struct { id int64 taskChan chan Task quitChan chan struct{} lastUsed int64 } func NewWorker(id int64) *Worker { return &Worker{ id: id, taskChan: make(chan Task), quitChan: make(chan struct{}), lastUsed: time.Now().Unix(), } } func (w *Worker) Start(wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() for { select { case task := <-w.taskChan: w.safeDo(task) case <-w.quitChan: return } } }() } func (w *Worker) safeDo(task Task) { defer func() { if err := recover(); err != nil { fmt.Println("Recovered from panic:", err) } }() task() w.lastUsed = time.Now().Unix() } func (w *Worker) Stop() { close(w.quitChan) } func (w *Worker) AsyncDo(task Task) { w.taskChan <- task } type WorkerPool struct { workersMap map[int64]*Worker // 使用map存储worker,键为worker的ID idleWorkers []int64 // 改为存储worker的ID的切片 maxIdleWorkers int MinIdleWorkers int mu sync.Mutex wg sync.WaitGroup IdSequence *snowflake.Node quitChan chan struct{} } func NewWorkerPool(maxIdleWorkers, minIdleWorkers int) *WorkerPool { startTs := time.Date(2020, 1, 1, 0, 0, 0, 0, time.Local) seq, err := snowflake.NewNode(0, 0, 22, 1, startTs.Unix()) if err != nil { return nil } if minIdleWorkers < 0 || maxIdleWorkers <= 0 || minIdleWorkers > maxIdleWorkers { return nil } p := &WorkerPool{ workersMap: make(map[int64]*Worker), idleWorkers: make([]int64, 0, maxIdleWorkers), maxIdleWorkers: maxIdleWorkers, MinIdleWorkers: minIdleWorkers, IdSequence: seq, quitChan: make(chan struct{}), } p.StartIdleCheck() return p } func (p *WorkerPool) Get() *Worker { p.mu.Lock() defer p.mu.Unlock() if len(p.idleWorkers) > 0 { id := p.idleWorkers[len(p.idleWorkers)-1] p.idleWorkers = p.idleWorkers[:len(p.idleWorkers)-1] return p.workersMap[id] } id := p.IdSequence.Generate().Int64() w := NewWorker(id) p.workersMap[w.id] = w w.Start(&p.wg) return w } func (p *WorkerPool) Put(w *Worker) { p.mu.Lock() defer p.mu.Unlock() if len(p.idleWorkers) < p.maxIdleWorkers { p.idleWorkers = append(p.idleWorkers, w.id) } else { w.Stop() delete(p.workersMap, w.id) } } func (p *WorkerPool) AsyncDo(task Task) { worker := p.Get() worker.taskChan <- func() { defer p.Put(worker) task() } } func (p *WorkerPool) StartIdleCheck() { p.wg.Add(1) go func() { defer p.wg.Done() ticker := time.NewTicker(1 * time.Minute) for { select { case <-ticker.C: p.checkIdleWorkers() case <-p.quitChan: ticker.Stop() return } } }() } func (p *WorkerPool) checkIdleWorkers() { p.mu.Lock() defer p.mu.Unlock() if len(p.idleWorkers) <= p.MinIdleWorkers { return } maxCnt := len(p.idleWorkers) - p.MinIdleWorkers now := time.Now().Unix() cnt := 0 lst := p.idleWorkers[:0] for i := 0; i < len(p.idleWorkers); i += 1 { if cnt >= maxCnt { lst = append(lst, p.idleWorkers[i:]...) break } id := p.idleWorkers[i] w := p.workersMap[id] if now-w.lastUsed > 5*60 { w.Stop() delete(p.workersMap, id) cnt += 1 } else { lst = append(lst, p.idleWorkers[i:]...) break } } oldlen := len(p.idleWorkers) newLen := len(lst) if newLen != oldlen { p.idleWorkers = lst clear(p.idleWorkers[newLen:oldlen]) } } func (p *WorkerPool) Close() { p.mu.Lock() defer p.mu.Unlock() close(p.quitChan) // Stop all workers for _, w := range p.workersMap { w.Stop() } // Wait for all workers to finish p.wg.Wait() p.workersMap = make(map[int64]*Worker) p.idleWorkers = make([]int64, 0, p.maxIdleWorkers) } func testWorkerPool() { //testWorkerPool() pool := NewWorkerPool(5, 2) defer pool.Close() for i := 0; i < 100; i++ { worker := pool.Get() if worker != nil { taskId := i worker.taskChan <- func() { fmt.Printf("Worker %d is processing task %d\n", worker.id, taskId) time.Sleep(time.Second) // Simulate work pool.Put(worker) } } else { fmt.Println("No available workers") } time.Sleep(time.Millisecond * 50) } // Give some time for tasks to complete time.Sleep(5 * time.Minute) }