You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
143 lines
3.4 KiB
143 lines
3.4 KiB
/* |
|
Copyright 2014 The Kubernetes Authors. |
|
|
|
Licensed under the Apache License, Version 2.0 (the "License"); |
|
you may not use this file except in compliance with the License. |
|
You may obtain a copy of the License at |
|
|
|
http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
Unless required by applicable law or agreed to in writing, software |
|
distributed under the License is distributed on an "AS IS" BASIS, |
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
See the License for the specific language governing permissions and |
|
limitations under the License. |
|
*/ |
|
|
|
package flowcontrol |
|
|
|
import ( |
|
"sync" |
|
"time" |
|
|
|
"golang.org/x/time/rate" |
|
) |
|
|
|
type RateLimiter interface { |
|
// TryAccept returns true if a token is taken immediately. Otherwise, |
|
// it returns false. |
|
TryAccept() bool |
|
// Accept returns once a token becomes available. |
|
Accept() |
|
// Stop stops the rate limiter, subsequent calls to CanAccept will return false |
|
Stop() |
|
// QPS returns QPS of this rate limiter |
|
QPS() float32 |
|
} |
|
|
|
type tokenBucketRateLimiter struct { |
|
limiter *rate.Limiter |
|
clock Clock |
|
qps float32 |
|
} |
|
|
|
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach. |
|
// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a |
|
// smoothed qps rate of 'qps'. |
|
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'. |
|
// The maximum number of tokens in the bucket is capped at 'burst'. |
|
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { |
|
limiter := rate.NewLimiter(rate.Limit(qps), burst) |
|
return newTokenBucketRateLimiter(limiter, realClock{}, qps) |
|
} |
|
|
|
// An injectable, mockable clock interface. |
|
type Clock interface { |
|
Now() time.Time |
|
Sleep(time.Duration) |
|
} |
|
|
|
type realClock struct{} |
|
|
|
func (realClock) Now() time.Time { |
|
return time.Now() |
|
} |
|
func (realClock) Sleep(d time.Duration) { |
|
time.Sleep(d) |
|
} |
|
|
|
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter |
|
// but allows an injectable clock, for testing. |
|
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter { |
|
limiter := rate.NewLimiter(rate.Limit(qps), burst) |
|
return newTokenBucketRateLimiter(limiter, c, qps) |
|
} |
|
|
|
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter { |
|
return &tokenBucketRateLimiter{ |
|
limiter: limiter, |
|
clock: c, |
|
qps: qps, |
|
} |
|
} |
|
|
|
func (t *tokenBucketRateLimiter) TryAccept() bool { |
|
return t.limiter.AllowN(t.clock.Now(), 1) |
|
} |
|
|
|
// Accept will block until a token becomes available |
|
func (t *tokenBucketRateLimiter) Accept() { |
|
now := t.clock.Now() |
|
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now)) |
|
} |
|
|
|
func (t *tokenBucketRateLimiter) Stop() { |
|
} |
|
|
|
func (t *tokenBucketRateLimiter) QPS() float32 { |
|
return t.qps |
|
} |
|
|
|
type fakeAlwaysRateLimiter struct{} |
|
|
|
func NewFakeAlwaysRateLimiter() RateLimiter { |
|
return &fakeAlwaysRateLimiter{} |
|
} |
|
|
|
func (t *fakeAlwaysRateLimiter) TryAccept() bool { |
|
return true |
|
} |
|
|
|
func (t *fakeAlwaysRateLimiter) Stop() {} |
|
|
|
func (t *fakeAlwaysRateLimiter) Accept() {} |
|
|
|
func (t *fakeAlwaysRateLimiter) QPS() float32 { |
|
return 1 |
|
} |
|
|
|
type fakeNeverRateLimiter struct { |
|
wg sync.WaitGroup |
|
} |
|
|
|
func NewFakeNeverRateLimiter() RateLimiter { |
|
rl := fakeNeverRateLimiter{} |
|
rl.wg.Add(1) |
|
return &rl |
|
} |
|
|
|
func (t *fakeNeverRateLimiter) TryAccept() bool { |
|
return false |
|
} |
|
|
|
func (t *fakeNeverRateLimiter) Stop() { |
|
t.wg.Done() |
|
} |
|
|
|
func (t *fakeNeverRateLimiter) Accept() { |
|
t.wg.Wait() |
|
} |
|
|
|
func (t *fakeNeverRateLimiter) QPS() float32 { |
|
return 1 |
|
}
|
|
|