optimize: shedding algorithm performance (#3908)

This commit is contained in:
Kevin Wan
2024-02-15 20:22:22 +08:00
committed by GitHub
parent 8ceb2885db
commit 9c17499757
8 changed files with 605 additions and 18 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
@@ -21,8 +22,11 @@ const (
defaultCpuThreshold = 900
defaultMinRt = float64(time.Second / time.Millisecond)
// moving average hyperparameter beta for calculating requests on the fly
flyingBeta = 0.9
coolOffDuration = time.Second
flyingBeta = 0.9
coolOffDuration = time.Second
cpuMax = 1000 // millicpu
millisecondsPerSecond = 1000
overloadFactorLowerBound = 0.1
)
var (
@@ -66,7 +70,7 @@ type (
adaptiveShedder struct {
cpuThreshold int64
windows int64
windowScale float64
flying int64
avgFlying float64
avgFlyingLock syncx.SpinLock
@@ -105,7 +109,7 @@ func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
bucketDuration := options.window / time.Duration(options.buckets)
return &adaptiveShedder{
cpuThreshold: options.cpuThreshold,
windows: int64(time.Second / bucketDuration),
windowScale: float64(time.Second) / float64(bucketDuration) / millisecondsPerSecond,
overloadTime: syncx.NewAtomicDuration(),
droppedRecently: syncx.NewAtomicBool(),
passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
@@ -149,16 +153,17 @@ func (as *adaptiveShedder) highThru() bool {
as.avgFlyingLock.Lock()
avgFlying := as.avgFlying
as.avgFlyingLock.Unlock()
maxFlight := as.maxFlight()
return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
maxFlight := as.maxFlight() * as.overloadFactor()
return avgFlying > maxFlight && float64(atomic.LoadInt64(&as.flying)) > maxFlight
}
func (as *adaptiveShedder) maxFlight() int64 {
func (as *adaptiveShedder) maxFlight() float64 {
// windows = buckets per second
// maxQPS = maxPASS * windows
// minRT = min average response time in milliseconds
// maxQPS * minRT / milliseconds_per_second
return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
// allowedFlying = maxQPS * minRT / milliseconds_per_second
maxFlight := float64(as.maxPass()) * as.minRt() * as.windowScale
return mathx.AtLeast(maxFlight, 1)
}
func (as *adaptiveShedder) maxPass() int64 {
@@ -174,6 +179,8 @@ func (as *adaptiveShedder) maxPass() int64 {
}
func (as *adaptiveShedder) minRt() float64 {
// if no requests in previous windows, return defaultMinRt,
// its a reasonable large value to avoid dropping requests.
result := defaultMinRt
as.rtCounter.Reduce(func(b *collection.Bucket) {
@@ -190,6 +197,13 @@ func (as *adaptiveShedder) minRt() float64 {
return result
}
func (as *adaptiveShedder) overloadFactor() float64 {
// as.cpuThreshold must be less than cpuMax
factor := (cpuMax - float64(stat.CpuUsage())) / (cpuMax - float64(as.cpuThreshold))
// at least accept 10% of acceptable requests even cpu is highly overloaded.
return mathx.Between(factor, overloadFactorLowerBound, 1)
}
func (as *adaptiveShedder) shouldDrop() bool {
if as.systemOverloaded() || as.stillHot() {
if as.highThru() {