pool.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
// Goroutine pool.
// Goals:
// Modules that need concurrency reuse pooled goroutines instead of each
// module constantly spawning new ones: Get a worker, run the task
// (Do or <-func()), then Put the worker back when finished.
// Last modified: 2024-11-28
  6. package goroutine
import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"leafstalk/otherutils/snowflake"
)
  13. type Task func()
  14. type Worker struct {
  15. id int64
  16. taskChan chan Task
  17. quitChan chan struct{}
  18. lastUsed int64
  19. }
  20. func NewWorker(id int64) *Worker {
  21. return &Worker{
  22. id: id,
  23. taskChan: make(chan Task),
  24. quitChan: make(chan struct{}),
  25. lastUsed: time.Now().Unix(),
  26. }
  27. }
  28. func (w *Worker) Start(wg *sync.WaitGroup) {
  29. wg.Add(1)
  30. go func() {
  31. defer wg.Done()
  32. for {
  33. select {
  34. case task := <-w.taskChan:
  35. w.safeDo(task)
  36. case <-w.quitChan:
  37. return
  38. }
  39. }
  40. }()
  41. }
  42. func (w *Worker) safeDo(task Task) {
  43. defer func() {
  44. if err := recover(); err != nil {
  45. fmt.Println("Recovered from panic:", err)
  46. }
  47. }()
  48. task()
  49. w.lastUsed = time.Now().Unix()
  50. }
  51. func (w *Worker) Stop() {
  52. close(w.quitChan)
  53. }
  54. func (w *Worker) AsyncDo(task Task) {
  55. w.taskChan <- task
  56. }
  57. type WorkerPool struct {
  58. workersMap map[int64]*Worker // 使用map存储worker,键为worker的ID
  59. idleWorkers []int64 // 改为存储worker的ID的切片
  60. maxIdleWorkers int
  61. MinIdleWorkers int
  62. mu sync.Mutex
  63. wg sync.WaitGroup
  64. IdSequence *snowflake.Node
  65. quitChan chan struct{}
  66. }
  67. func NewWorkerPool(maxIdleWorkers, minIdleWorkers int) *WorkerPool {
  68. startTs := time.Date(2020, 1, 1, 0, 0, 0, 0, time.Local)
  69. seq, err := snowflake.NewNode(0, 0, 22, 1, startTs.Unix())
  70. if err != nil {
  71. return nil
  72. }
  73. if minIdleWorkers < 0 || maxIdleWorkers <= 0 || minIdleWorkers > maxIdleWorkers {
  74. return nil
  75. }
  76. p := &WorkerPool{
  77. workersMap: make(map[int64]*Worker),
  78. idleWorkers: make([]int64, 0, maxIdleWorkers),
  79. maxIdleWorkers: maxIdleWorkers,
  80. MinIdleWorkers: minIdleWorkers,
  81. IdSequence: seq,
  82. quitChan: make(chan struct{}),
  83. }
  84. p.StartIdleCheck()
  85. return p
  86. }
  87. func (p *WorkerPool) Get() *Worker {
  88. p.mu.Lock()
  89. defer p.mu.Unlock()
  90. if len(p.idleWorkers) > 0 {
  91. id := p.idleWorkers[len(p.idleWorkers)-1]
  92. p.idleWorkers = p.idleWorkers[:len(p.idleWorkers)-1]
  93. return p.workersMap[id]
  94. }
  95. id := p.IdSequence.Generate().Int64()
  96. w := NewWorker(id)
  97. p.workersMap[w.id] = w
  98. w.Start(&p.wg)
  99. return w
  100. }
  101. func (p *WorkerPool) Put(w *Worker) {
  102. p.mu.Lock()
  103. defer p.mu.Unlock()
  104. if len(p.idleWorkers) < p.maxIdleWorkers {
  105. p.idleWorkers = append(p.idleWorkers, w.id)
  106. } else {
  107. w.Stop()
  108. delete(p.workersMap, w.id)
  109. }
  110. }
  111. func (p *WorkerPool) AsyncDo(task Task) {
  112. worker := p.Get()
  113. worker.taskChan <- func() {
  114. defer p.Put(worker)
  115. task()
  116. }
  117. }
  118. func (p *WorkerPool) StartIdleCheck() {
  119. p.wg.Add(1)
  120. go func() {
  121. defer p.wg.Done()
  122. ticker := time.NewTicker(1 * time.Minute)
  123. for {
  124. select {
  125. case <-ticker.C:
  126. p.checkIdleWorkers()
  127. case <-p.quitChan:
  128. ticker.Stop()
  129. return
  130. }
  131. }
  132. }()
  133. }
  134. func (p *WorkerPool) checkIdleWorkers() {
  135. p.mu.Lock()
  136. defer p.mu.Unlock()
  137. if len(p.idleWorkers) <= p.MinIdleWorkers {
  138. return
  139. }
  140. maxCnt := len(p.idleWorkers) - p.MinIdleWorkers
  141. now := time.Now().Unix()
  142. cnt := 0
  143. lst := p.idleWorkers[:0]
  144. for i := 0; i < len(p.idleWorkers); i += 1 {
  145. if cnt >= maxCnt {
  146. lst = append(lst, p.idleWorkers[i:]...)
  147. break
  148. }
  149. id := p.idleWorkers[i]
  150. w := p.workersMap[id]
  151. if now-w.lastUsed > 5*60 {
  152. w.Stop()
  153. delete(p.workersMap, id)
  154. cnt += 1
  155. } else {
  156. lst = append(lst, p.idleWorkers[i:]...)
  157. break
  158. }
  159. }
  160. oldlen := len(p.idleWorkers)
  161. newLen := len(lst)
  162. if newLen != oldlen {
  163. p.idleWorkers = lst
  164. clear(p.idleWorkers[newLen:oldlen])
  165. }
  166. }
  167. func (p *WorkerPool) Close() {
  168. p.mu.Lock()
  169. defer p.mu.Unlock()
  170. close(p.quitChan)
  171. // Stop all workers
  172. for _, w := range p.workersMap {
  173. w.Stop()
  174. }
  175. // Wait for all workers to finish
  176. p.wg.Wait()
  177. p.workersMap = make(map[int64]*Worker)
  178. p.idleWorkers = make([]int64, 0, p.maxIdleWorkers)
  179. }
  180. func testWorkerPool() { //testWorkerPool()
  181. pool := NewWorkerPool(5, 2)
  182. defer pool.Close()
  183. for i := 0; i < 100; i++ {
  184. worker := pool.Get()
  185. if worker != nil {
  186. taskId := i
  187. worker.taskChan <- func() {
  188. fmt.Printf("Worker %d is processing task %d\n", worker.id, taskId)
  189. time.Sleep(time.Second) // Simulate work
  190. pool.Put(worker)
  191. }
  192. } else {
  193. fmt.Println("No available workers")
  194. }
  195. time.Sleep(time.Millisecond * 50)
  196. }
  197. // Give some time for tasks to complete
  198. time.Sleep(5 * time.Minute)
  199. }