package otherutils import ( "context" "errors" "sync/atomic" "time" ) var ( // ErrTokenBucketClosed 表示令牌桶已关闭 ErrTokenBucketClosed = errors.New("token bucket is closed") // ErrAcquireTimeout 表示获取令牌超时 ErrAcquireTimeout = errors.New("acquire token timeout") ) // RoutineTokens 是一个用于控制协程数量的令牌桶 // 它是并发安全的,可以用于限制同时运行的协程数量 type RoutineTokens struct { semaRoutine chan struct{} closed atomic.Bool } // NewRoutineTokens 创建一个新的令牌桶,tokenNum指定最大协程数 // 如果tokenNum <= 0,则默认设置为1 func NewRoutineTokens(tokenNum int) *RoutineTokens { if tokenNum <= 0 { tokenNum = 1 } return &RoutineTokens{ semaRoutine: make(chan struct{}, tokenNum), } } // Acquire 获取一个令牌,如果没有可用令牌会阻塞等待 // 如果令牌桶已关闭,返回ErrTokenBucketClosed func (m *RoutineTokens) Acquire() error { if m.closed.Load() { return ErrTokenBucketClosed } m.semaRoutine <- struct{}{} return nil } // AcquireWithTimeout 尝试在指定时间内获取令牌 // timeout为等待超时时间,如果为0则立即返回 func (m *RoutineTokens) AcquireWithTimeout(timeout time.Duration) error { if m.closed.Load() { return ErrTokenBucketClosed } timer := time.NewTimer(timeout) defer timer.Stop() select { case m.semaRoutine <- struct{}{}: return nil case <-timer.C: return ErrAcquireTimeout } } // AcquireWithContext 尝试在context取消前获取令牌 func (m *RoutineTokens) AcquireWithContext(ctx context.Context) error { if m.closed.Load() { return ErrTokenBucketClosed } select { case m.semaRoutine <- struct{}{}: return nil case <-ctx.Done(): return ctx.Err() } } // TryAcquire 尝试获取一个令牌,如果没有可用令牌立即返回false // 如果令牌桶已关闭,返回error func (m *RoutineTokens) TryAcquire() (bool, error) { if m.closed.Load() { return false, ErrTokenBucketClosed } select { case m.semaRoutine <- struct{}{}: return true, nil default: return false, nil } } // Release 释放一个令牌 func (m *RoutineTokens) Release() { select { case <-m.semaRoutine: default: } } // Close 关闭令牌桶,释放资源 // 关闭后的令牌桶不能再使用 // 此方法是幂等的,可以安全地多次调用 func (m *RoutineTokens) Close() { if !m.closed.Swap(true) { close(m.semaRoutine) } } // Available 返回当前可用的令牌数量 func (m *RoutineTokens) Available() int { return cap(m.semaRoutine) - len(m.semaRoutine) }