routinetoken.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package otherutils
  2. import (
  3. "context"
  4. "errors"
  5. "sync/atomic"
  6. "time"
  7. )
  8. var (
  9. // ErrTokenBucketClosed 表示令牌桶已关闭
  10. ErrTokenBucketClosed = errors.New("token bucket is closed")
  11. // ErrAcquireTimeout 表示获取令牌超时
  12. ErrAcquireTimeout = errors.New("acquire token timeout")
  13. )
  14. // RoutineTokens 是一个用于控制协程数量的令牌桶
  15. // 它是并发安全的,可以用于限制同时运行的协程数量
  16. type RoutineTokens struct {
  17. semaRoutine chan struct{}
  18. closed atomic.Bool
  19. }
  20. // NewRoutineTokens 创建一个新的令牌桶,tokenNum指定最大协程数
  21. // 如果tokenNum <= 0,则默认设置为1
  22. func NewRoutineTokens(tokenNum int) *RoutineTokens {
  23. if tokenNum <= 0 {
  24. tokenNum = 1
  25. }
  26. return &RoutineTokens{
  27. semaRoutine: make(chan struct{}, tokenNum),
  28. }
  29. }
  30. // Acquire 获取一个令牌,如果没有可用令牌会阻塞等待
  31. // 如果令牌桶已关闭,返回ErrTokenBucketClosed
  32. func (m *RoutineTokens) Acquire() error {
  33. if m.closed.Load() {
  34. return ErrTokenBucketClosed
  35. }
  36. m.semaRoutine <- struct{}{}
  37. return nil
  38. }
  39. // AcquireWithTimeout 尝试在指定时间内获取令牌
  40. // timeout为等待超时时间,如果为0则立即返回
  41. func (m *RoutineTokens) AcquireWithTimeout(timeout time.Duration) error {
  42. if m.closed.Load() {
  43. return ErrTokenBucketClosed
  44. }
  45. timer := time.NewTimer(timeout)
  46. defer timer.Stop()
  47. select {
  48. case m.semaRoutine <- struct{}{}:
  49. return nil
  50. case <-timer.C:
  51. return ErrAcquireTimeout
  52. }
  53. }
  54. // AcquireWithContext 尝试在context取消前获取令牌
  55. func (m *RoutineTokens) AcquireWithContext(ctx context.Context) error {
  56. if m.closed.Load() {
  57. return ErrTokenBucketClosed
  58. }
  59. select {
  60. case m.semaRoutine <- struct{}{}:
  61. return nil
  62. case <-ctx.Done():
  63. return ctx.Err()
  64. }
  65. }
  66. // TryAcquire 尝试获取一个令牌,如果没有可用令牌立即返回false
  67. // 如果令牌桶已关闭,返回error
  68. func (m *RoutineTokens) TryAcquire() (bool, error) {
  69. if m.closed.Load() {
  70. return false, ErrTokenBucketClosed
  71. }
  72. select {
  73. case m.semaRoutine <- struct{}{}:
  74. return true, nil
  75. default:
  76. return false, nil
  77. }
  78. }
  79. // Release 释放一个令牌
  80. func (m *RoutineTokens) Release() {
  81. select {
  82. case <-m.semaRoutine:
  83. default:
  84. }
  85. }
  86. // Close 关闭令牌桶,释放资源
  87. // 关闭后的令牌桶不能再使用
  88. // 此方法是幂等的,可以安全地多次调用
  89. func (m *RoutineTokens) Close() {
  90. if !m.closed.Swap(true) {
  91. close(m.semaRoutine)
  92. }
  93. }
  94. // Available 返回当前可用的令牌数量
  95. func (m *RoutineTokens) Available() int {
  96. return cap(m.semaRoutine) - len(m.semaRoutine)
  97. }