// Package goroutine implements a worker (goroutine) pool.
//
// Goal: modules that need concurrency should not keep spawning new
// goroutines. When a module has work to run it takes a worker from the
// pool (Get), runs the task (Do, or by sending a func on the worker's
// task channel), and then returns the worker to the pool (Put).
//
// Last modified: 2024-11-28
package goroutine

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "leafstalk/otherutils/snowflake"
)

// Task is a unit of work executed by a Worker.
type Task func()

// Worker owns a single goroutine that executes the tasks sent on taskChan.
type Worker struct {
    id       int64
    taskChan chan Task
    quitChan chan struct{}
    lastUsed int64 // Unix seconds of the last completed task; accessed atomically
}

// NewWorker creates a worker with the given ID. Start must be called
// before tasks are sent to it.
func NewWorker(id int64) *Worker {
    return &Worker{
        id:       id,
        taskChan: make(chan Task),
        quitChan: make(chan struct{}),
        lastUsed: time.Now().Unix(),
    }
}

// Start launches the worker goroutine. It runs tasks from taskChan until
// Stop is called.
func (w *Worker) Start(wg *sync.WaitGroup) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case task := <-w.taskChan:
                w.safeDo(task)
            case <-w.quitChan:
                return
            }
        }
    }()
}

// safeDo runs the task, recovering from panics so that a misbehaving task
// cannot kill the worker goroutine, and records when the task finished.
func (w *Worker) safeDo(task Task) {
    defer func() {
        if err := recover(); err != nil {
            fmt.Println("Recovered from panic:", err)
        }
        // Store atomically: checkIdleWorkers reads lastUsed from another goroutine.
        atomic.StoreInt64(&w.lastUsed, time.Now().Unix())
    }()
    task()
}

// Stop signals the worker goroutine to exit.
func (w *Worker) Stop() {
    close(w.quitChan)
}

// AsyncDo submits a task to the worker. It blocks until the worker accepts it.
func (w *Worker) AsyncDo(task Task) {
    w.taskChan <- task
}

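// Illustrative sketch only (not part of the original code): driving a single
// Worker directly. The function name exampleWorkerUsage and the done channel
// are hypothetical.
func exampleWorkerUsage() {
    var wg sync.WaitGroup
    w := NewWorker(1)
    w.Start(&wg)

    done := make(chan struct{})
    w.AsyncDo(func() {
        fmt.Println("task executed by worker", w.id)
        close(done)
    })
    <-done // wait for the task to finish

    w.Stop()  // ask the worker goroutine to exit
    wg.Wait() // wait until it has exited
}
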
// WorkerPool reuses workers so that callers do not spawn a goroutine per task.
type WorkerPool struct {
    workersMap     map[int64]*Worker // all live workers, keyed by worker ID
    idleWorkers    []int64           // IDs of currently idle workers, used as a LIFO stack
    maxIdleWorkers int
    MinIdleWorkers int
    mu             sync.Mutex
    wg             sync.WaitGroup
    IdSequence     *snowflake.Node
    quitChan       chan struct{}
}

// NewWorkerPool creates a pool that keeps at most maxIdleWorkers idle
// workers and trims down toward minIdleWorkers during idle checks.
// It returns nil if the arguments are invalid or the ID generator
// cannot be created.
func NewWorkerPool(maxIdleWorkers, minIdleWorkers int) *WorkerPool {
    if minIdleWorkers < 0 || maxIdleWorkers <= 0 || minIdleWorkers > maxIdleWorkers {
        return nil
    }
    startTs := time.Date(2020, 1, 1, 0, 0, 0, 0, time.Local)
    seq, err := snowflake.NewNode(0, 0, 22, 1, startTs.Unix())
    if err != nil {
        return nil
    }
    p := &WorkerPool{
        workersMap:     make(map[int64]*Worker),
        idleWorkers:    make([]int64, 0, maxIdleWorkers),
        maxIdleWorkers: maxIdleWorkers,
        MinIdleWorkers: minIdleWorkers,
        IdSequence:     seq,
        quitChan:       make(chan struct{}),
    }
    p.StartIdleCheck()
    return p
}

// Get returns an idle worker, or creates and starts a new one if none is idle.
func (p *WorkerPool) Get() *Worker {
    p.mu.Lock()
    defer p.mu.Unlock()
    if len(p.idleWorkers) > 0 {
        // Pop the most recently returned worker from the idle stack.
        id := p.idleWorkers[len(p.idleWorkers)-1]
        p.idleWorkers = p.idleWorkers[:len(p.idleWorkers)-1]
        return p.workersMap[id]
    }
    id := p.IdSequence.Generate().Int64()
    w := NewWorker(id)
    p.workersMap[w.id] = w
    w.Start(&p.wg)
    return w
}

// Put returns a worker to the idle list. If the list is already full,
// the worker is stopped and removed from the pool instead.
func (p *WorkerPool) Put(w *Worker) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if len(p.idleWorkers) < p.maxIdleWorkers {
        p.idleWorkers = append(p.idleWorkers, w.id)
    } else {
        w.Stop()
        delete(p.workersMap, w.id)
    }
}

// AsyncDo runs task on a pooled worker and returns the worker to the pool
// once the task has finished. It blocks until a worker accepts the task.
func (p *WorkerPool) AsyncDo(task Task) {
    worker := p.Get()
    worker.taskChan <- func() {
        defer p.Put(worker)
        task()
    }
}

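// Illustrative sketch only (the helper exampleAsyncDo and its done channel
// are hypothetical): AsyncDo is fire-and-forget, so callers that need to
// know when a task has finished must add their own signalling.
func exampleAsyncDo(p *WorkerPool) {
    done := make(chan struct{})
    p.AsyncDo(func() {
        fmt.Println("task running on a pooled worker")
        close(done)
    })
    <-done // AsyncDo itself only blocks until a worker accepts the task
}
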
// StartIdleCheck launches a goroutine that periodically trims idle workers.
// It stops when the pool is closed.
func (p *WorkerPool) StartIdleCheck() {
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        ticker := time.NewTicker(1 * time.Minute)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                p.checkIdleWorkers()
            case <-p.quitChan:
                return
            }
        }
    }()
}

// checkIdleWorkers stops idle workers that have not run a task for more
// than five minutes, while keeping at least MinIdleWorkers of them.
func (p *WorkerPool) checkIdleWorkers() {
    p.mu.Lock()
    defer p.mu.Unlock()
    if len(p.idleWorkers) <= p.MinIdleWorkers {
        return
    }
    maxCnt := len(p.idleWorkers) - p.MinIdleWorkers
    now := time.Now().Unix()
    cnt := 0
    // Filter the idle list in place, reusing its backing array.
    // The front of the list holds the entries that were returned earliest.
    lst := p.idleWorkers[:0]
    for i := 0; i < len(p.idleWorkers); i++ {
        if cnt >= maxCnt {
            // Enough workers removed; keep the rest as-is.
            lst = append(lst, p.idleWorkers[i:]...)
            break
        }
        id := p.idleWorkers[i]
        w := p.workersMap[id]
        if now-atomic.LoadInt64(&w.lastUsed) > 5*60 {
            w.Stop()
            delete(p.workersMap, id)
            cnt++
        } else {
            // This worker is still fresh; keep it and everything after it.
            lst = append(lst, p.idleWorkers[i:]...)
            break
        }
    }
    oldLen := len(p.idleWorkers)
    newLen := len(lst)
    if newLen != oldLen {
        p.idleWorkers = lst
        // Zero the abandoned tail of the shared backing array.
        clear(p.idleWorkers[newLen:oldLen])
    }
}

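// A minimal, generic sketch of the in-place filter idiom used above
// (hypothetical helper, not part of the pool): reuse the slice's backing
// array via s[:0] and append only the elements to keep.
func keepPositive(s []int64) []int64 {
    kept := s[:0] // shares s's backing array, so filtering allocates nothing
    for _, v := range s {
        if v > 0 {
            kept = append(kept, v)
        }
    }
    return kept
}
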
// Close stops the idle checker and every worker, then waits for all of
// their goroutines to exit. The pool must not be used after Close.
func (p *WorkerPool) Close() {
    p.mu.Lock()
    close(p.quitChan)
    // Stop all workers.
    for _, w := range p.workersMap {
        w.Stop()
    }
    p.workersMap = make(map[int64]*Worker)
    p.idleWorkers = make([]int64, 0, p.maxIdleWorkers)
    // Unlock before waiting: the idle-check goroutine may need the mutex to
    // finish its current check, so waiting while holding it could deadlock.
    p.mu.Unlock()
    // Wait for the workers and the idle checker to finish.
    p.wg.Wait()
}

// testWorkerPool is a manual smoke test for the pool.
func testWorkerPool() {
    pool := NewWorkerPool(5, 2)
    defer pool.Close()
    for i := 0; i < 100; i++ {
        worker := pool.Get()
        if worker != nil {
            taskId := i
            worker.taskChan <- func() {
                fmt.Printf("Worker %d is processing task %d\n", worker.id, taskId)
                time.Sleep(time.Second) // Simulate work
                pool.Put(worker)
            }
        } else {
            fmt.Println("No available workers")
        }
        time.Sleep(time.Millisecond * 50)
    }
    // Leave time for the tasks to finish and for the idle checker
    // (1-minute tick, 5-minute expiry) to reclaim excess idle workers.
    time.Sleep(5 * time.Minute)
}