You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
60 lines
1.3 KiB
60 lines
1.3 KiB
package limit |
|
|
|
import ( |
|
"context" |
|
"time" |
|
|
|
"go-common/library/container/queue/aqm" |
|
"go-common/library/log" |
|
"go-common/library/rate" |
|
"go-common/library/rate/vegas" |
|
) |
|
|
|
var _ rate.Limiter = &Limiter{} |
|
|
|
// New returns a new Limiter that allows events up to adaptive rtt. |
|
func New(c *aqm.Config) *Limiter { |
|
l := &Limiter{ |
|
rate: vegas.New(), |
|
queue: aqm.New(c), |
|
} |
|
go func() { |
|
ticker := time.NewTicker(time.Second * 1) |
|
defer ticker.Stop() |
|
for { |
|
<-ticker.C |
|
v := l.rate.Stat() |
|
q := l.queue.Stat() |
|
log.Info("rate/limit: limit(%d) inFlight(%d) minRtt(%v) rtt(%v) codel packets(%d)", v.Limit, v.InFlight, v.MinRTT, v.LastRTT, q.Packets) |
|
} |
|
}() |
|
return l |
|
} |
|
|
|
// Limiter use tcp vegas + codel for adaptive limit. |
|
type Limiter struct { |
|
rate *vegas.Vegas |
|
queue *aqm.Queue |
|
} |
|
|
|
// Allow immplemnet rate.Limiter. |
|
// if error is returned,no need to call done() |
|
func (l *Limiter) Allow(ctx context.Context) (func(rate.Op), error) { |
|
var ( |
|
done func(time.Time, rate.Op) |
|
err error |
|
ok bool |
|
) |
|
if done, ok = l.rate.Acquire(); !ok { |
|
// NOTE exceed max inflight, use queue |
|
if err = l.queue.Push(ctx); err != nil { |
|
done(time.Time{}, rate.Ignore) |
|
return func(rate.Op) {}, err |
|
} |
|
} |
|
start := time.Now() |
|
return func(op rate.Op) { |
|
done(start, op) |
|
l.queue.Pop() |
|
}, nil |
|
}
|
|
|