From 69ea42500000904f7f95d8d1ca8868e05e56fab4 Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 7 Aug 2020 14:43:11 +0800 Subject: [PATCH] avoid multiply zero on calculation load --- rpcx/internal/balancer/p2c/p2c.go | 40 +++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/rpcx/internal/balancer/p2c/p2c.go b/rpcx/internal/balancer/p2c/p2c.go index d1028155..8194b858 100644 --- a/rpcx/internal/balancer/p2c/p2c.go +++ b/rpcx/internal/balancer/p2c/p2c.go @@ -2,12 +2,16 @@ package p2c import ( "context" + "fmt" "math" "math/rand" + "strings" "sync" "sync/atomic" "time" + "zero/core/logx" + "zero/core/syncx" "zero/core/timex" "zero/rpcx/internal/codes" @@ -24,6 +28,7 @@ const ( throttleSuccess = initSuccess / 2 penalty = int64(math.MaxInt32) pickTimes = 3 + logInterval = time.Minute ) func init() { @@ -54,12 +59,14 @@ func (b *p2cPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) return &p2cPicker{ conns: conns, r: rand.New(rand.NewSource(time.Now().UnixNano())), + stamp: syncx.NewAtomicDuration(), } } type p2cPicker struct { conns []*subConn r *rand.Rand + stamp *syncx.AtomicDuration lock sync.Mutex } @@ -95,6 +102,7 @@ func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) ( } atomic.AddInt64(&chosen.inflight, 1) + atomic.AddInt64(&chosen.requests, 1) return chosen.conn, p.buildDoneFunc(chosen), nil } @@ -102,14 +110,14 @@ func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) { start := int64(timex.Now()) return func(info balancer.DoneInfo) { atomic.AddInt64(&c.inflight, -1) - now := int64(timex.Now()) + now := timex.Now() last := atomic.SwapInt64(&c.last, int64(now)) - td := now - last + td := int64(now) - last if td < 0 { td = 0 } w := math.Exp(float64(-td) / float64(decayTime)) - lag := now - start + lag := int64(now) - start if lag < 0 { lag = 0 } @@ -124,6 +132,13 @@ func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) { } osucc := atomic.LoadUint64(&c.success) atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w))) + + stamp := p.stamp.Load() + if now-stamp >= logInterval { + if p.stamp.CompareAndSwap(stamp, now) { + p.logStats() + } + } } } @@ -147,12 +162,28 @@ func (p *p2cPicker) choose(c1, c2 *subConn) *subConn { } } +func (p *p2cPicker) logStats() { + var stats []string + + p.lock.Lock() + defer p.lock.Unlock() + + for _, conn := range p.conns { + fmt.Println(conn.lag, conn.inflight) + stats = append(stats, fmt.Sprintf("conn: %s, load: %d, reqs: %d", + conn.addr.Addr, conn.load(), atomic.SwapInt64(&conn.requests, 0))) + } + + logx.Statf("p2c - %s", strings.Join(stats, "; ")) +} + type subConn struct { addr resolver.Address conn balancer.SubConn lag uint64 inflight int64 success uint64 + requests int64 last int64 pick int64 } @@ -162,8 +193,9 @@ func (c *subConn) healthy() bool { } func (c *subConn) load() int64 { + // plus one to avoid multiply zero lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1))) - load := lag * atomic.LoadInt64(&c.inflight) + load := lag * (atomic.LoadInt64(&c.inflight) + 1) if load == 0 { return penalty } else {