From 4b9066eda664bde0f7283ce2b75ad035448d77db Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sat, 1 Oct 2022 20:55:25 +0800 Subject: [PATCH] chore: better shedding algorithm, make sure recover from shedding (#2476) * backup * chore: better shedding algorithm, make sure recover from shedding --- core/load/adaptiveshedder.go | 26 +++++++++++++++----------- core/load/adaptiveshedder_test.go | 10 +++++++--- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/core/load/adaptiveshedder.go b/core/load/adaptiveshedder.go index c25743b9..f925ec4d 100644 --- a/core/load/adaptiveshedder.go +++ b/core/load/adaptiveshedder.go @@ -17,7 +17,7 @@ import ( const ( defaultBuckets = 50 defaultWindow = time.Second * 5 - // using 1000m notation, 900m is like 80%, keep it as var for unit test + // using 1000m notation, 900m is like 90%, keep it as var for unit test defaultCpuThreshold = 900 defaultMinRt = float64(time.Second / time.Millisecond) // moving average hyperparameter beta for calculating requests on the fly @@ -70,7 +70,7 @@ type ( flying int64 avgFlying float64 avgFlyingLock syncx.SpinLock - dropTime *syncx.AtomicDuration + overloadTime *syncx.AtomicDuration droppedRecently *syncx.AtomicBool passCounter *collection.RollingWindow rtCounter *collection.RollingWindow @@ -106,7 +106,7 @@ func NewAdaptiveShedder(opts ...ShedderOption) Shedder { return &adaptiveShedder{ cpuThreshold: options.cpuThreshold, windows: int64(time.Second / bucketDuration), - dropTime: syncx.NewAtomicDuration(), + overloadTime: syncx.NewAtomicDuration(), droppedRecently: syncx.NewAtomicBool(), passCounter: collection.NewRollingWindow(options.buckets, bucketDuration, collection.IgnoreCurrentBucket()), @@ -118,7 +118,6 @@ func NewAdaptiveShedder(opts ...ShedderOption) Shedder { // Allow implements Shedder.Allow. func (as *adaptiveShedder) Allow() (Promise, error) { if as.shouldDrop() { - as.dropTime.Set(timex.Now()) as.droppedRecently.Set(true) return nil, ErrServiceOverloaded @@ -215,21 +214,26 @@ func (as *adaptiveShedder) stillHot() bool { return false } - dropTime := as.dropTime.Load() - if dropTime == 0 { + overloadTime := as.overloadTime.Load() + if overloadTime == 0 { return false } - hot := timex.Since(dropTime) < coolOffDuration - if !hot { - as.droppedRecently.Set(false) + if timex.Since(overloadTime) < coolOffDuration { + return true } - return hot + as.droppedRecently.Set(false) + return false } func (as *adaptiveShedder) systemOverloaded() bool { - return systemOverloadChecker(as.cpuThreshold) + if !systemOverloadChecker(as.cpuThreshold) { + return false + } + + as.overloadTime.Set(timex.Now()) + return true } // WithBuckets customizes the Shedder with given number of buckets. diff --git a/core/load/adaptiveshedder_test.go b/core/load/adaptiveshedder_test.go index cb711bbe..94789322 100644 --- a/core/load/adaptiveshedder_test.go +++ b/core/load/adaptiveshedder_test.go @@ -13,6 +13,7 @@ import ( "github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/stat" "github.com/zeromicro/go-zero/core/syncx" + "github.com/zeromicro/go-zero/core/timex" ) const ( @@ -136,7 +137,7 @@ func TestAdaptiveShedderShouldDrop(t *testing.T) { passCounter: passCounter, rtCounter: rtCounter, windows: buckets, - dropTime: syncx.NewAtomicDuration(), + overloadTime: syncx.NewAtomicDuration(), droppedRecently: syncx.NewAtomicBool(), } // cpu >= 800, inflight < maxPass @@ -190,12 +191,15 @@ func TestAdaptiveShedderStillHot(t *testing.T) { passCounter: passCounter, rtCounter: rtCounter, windows: buckets, - dropTime: syncx.NewAtomicDuration(), + overloadTime: syncx.NewAtomicDuration(), droppedRecently: syncx.ForAtomicBool(true), } assert.False(t, shedder.stillHot()) - shedder.dropTime.Set(-coolOffDuration * 2) + shedder.overloadTime.Set(-coolOffDuration * 2) assert.False(t, shedder.stillHot()) + shedder.droppedRecently.Set(true) + shedder.overloadTime.Set(timex.Now()) + assert.True(t, shedder.stillHot()) } func BenchmarkAdaptiveShedder_Allow(b *testing.B) {