feat: slow threshold customizable in zrpc (#1191)
* feat: slow threshold customizable in rest * feat: slow threshold customizable in rest * feat: slow threshold customizable in rest * feat: slow threshold customizable in zrpc
This commit is contained in:
@@ -11,6 +11,9 @@ import (
|
|||||||
"github.com/tal-tech/go-zero/rest/router"
|
"github.com/tal-tech/go-zero/rest/router"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SetSlowThreshold sets the slow threshold.
|
||||||
|
var SetSlowThreshold = handler.SetSlowThreshold
|
||||||
|
|
||||||
type (
|
type (
|
||||||
runOptions struct {
|
runOptions struct {
|
||||||
start func(*engine) error
|
start func(*engine) error
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/tal-tech/go-zero/core/discov"
|
"github.com/tal-tech/go-zero/core/discov"
|
||||||
"github.com/tal-tech/go-zero/zrpc/internal"
|
"github.com/tal-tech/go-zero/zrpc/internal"
|
||||||
"github.com/tal-tech/go-zero/zrpc/internal/auth"
|
"github.com/tal-tech/go-zero/zrpc/internal/auth"
|
||||||
|
"github.com/tal-tech/go-zero/zrpc/internal/clientinterceptors"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -101,3 +102,8 @@ func NewClientWithTarget(target string, opts ...ClientOption) (Client, error) {
|
|||||||
func (rc *RpcClient) Conn() *grpc.ClientConn {
|
func (rc *RpcClient) Conn() *grpc.ClientConn {
|
||||||
return rc.client.Conn()
|
return rc.client.Conn()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetClientSlowThreshold sets the slow threshold on client side.
|
||||||
|
func SetClientSlowThreshold(threshold time.Duration) {
|
||||||
|
clientinterceptors.SetSlowThreshold(threshold)
|
||||||
|
}
|
||||||
|
|||||||
@@ -108,6 +108,8 @@ func TestDepositServer_Deposit(t *testing.T) {
|
|||||||
tarConfClient,
|
tarConfClient,
|
||||||
targetClient,
|
targetClient,
|
||||||
}
|
}
|
||||||
|
SetClientSlowThreshold(time.Second)
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
tt := tt
|
tt := tt
|
||||||
for _, client := range clients {
|
for _, client := range clients {
|
||||||
|
|||||||
@@ -6,11 +6,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/tal-tech/go-zero/core/logx"
|
||||||
|
"github.com/tal-tech/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/tal-tech/go-zero/core/timex"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
const slowThreshold = time.Millisecond * 500
|
const defaultSlowThreshold = time.Millisecond * 500
|
||||||
|
|
||||||
|
var slowThreshold = syncx.ForAtomicDuration(defaultSlowThreshold)
|
||||||
|
|
||||||
// DurationInterceptor is an interceptor that logs the processing time.
|
// DurationInterceptor is an interceptor that logs the processing time.
|
||||||
func DurationInterceptor(ctx context.Context, method string, req, reply interface{},
|
func DurationInterceptor(ctx context.Context, method string, req, reply interface{},
|
||||||
@@ -23,7 +26,7 @@ func DurationInterceptor(ctx context.Context, method string, req, reply interfac
|
|||||||
serverName, req, err.Error())
|
serverName, req, err.Error())
|
||||||
} else {
|
} else {
|
||||||
elapsed := timex.Since(start)
|
elapsed := timex.Since(start)
|
||||||
if elapsed > slowThreshold {
|
if elapsed > slowThreshold.Load() {
|
||||||
logx.WithContext(ctx).WithDuration(elapsed).Slowf("[RPC] ok - slowcall - %s - %v - %v",
|
logx.WithContext(ctx).WithDuration(elapsed).Slowf("[RPC] ok - slowcall - %s - %v - %v",
|
||||||
serverName, req, reply)
|
serverName, req, reply)
|
||||||
}
|
}
|
||||||
@@ -31,3 +34,8 @@ func DurationInterceptor(ctx context.Context, method string, req, reply interfac
|
|||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetSlowThreshold sets the slow threshold.
|
||||||
|
func SetSlowThreshold(threshold time.Duration) {
|
||||||
|
slowThreshold.Set(threshold)
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@@ -35,3 +36,9 @@ func TestDurationInterceptor(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSetSlowThreshold(t *testing.T) {
|
||||||
|
assert.Equal(t, defaultSlowThreshold, slowThreshold.Load())
|
||||||
|
SetSlowThreshold(time.Second)
|
||||||
|
assert.Equal(t, time.Second, slowThreshold.Load())
|
||||||
|
}
|
||||||
|
|||||||
@@ -7,12 +7,20 @@ import (
|
|||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/tal-tech/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/tal-tech/go-zero/core/stat"
|
||||||
|
"github.com/tal-tech/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/tal-tech/go-zero/core/timex"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
const serverSlowThreshold = time.Millisecond * 500
|
const defaultSlowThreshold = time.Millisecond * 500
|
||||||
|
|
||||||
|
var slowThreshold = syncx.ForAtomicDuration(defaultSlowThreshold)
|
||||||
|
|
||||||
|
// SetSlowThreshold sets the slow threshold.
|
||||||
|
func SetSlowThreshold(threshold time.Duration) {
|
||||||
|
slowThreshold.Set(threshold)
|
||||||
|
}
|
||||||
|
|
||||||
// UnaryStatInterceptor returns a func that uses given metrics to report stats.
|
// UnaryStatInterceptor returns a func that uses given metrics to report stats.
|
||||||
func UnaryStatInterceptor(metrics *stat.Metrics) grpc.UnaryServerInterceptor {
|
func UnaryStatInterceptor(metrics *stat.Metrics) grpc.UnaryServerInterceptor {
|
||||||
@@ -44,7 +52,7 @@ func logDuration(ctx context.Context, method string, req interface{}, duration t
|
|||||||
content, err := json.Marshal(req)
|
content, err := json.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logx.WithContext(ctx).Errorf("%s - %s", addr, err.Error())
|
logx.WithContext(ctx).Errorf("%s - %s", addr, err.Error())
|
||||||
} else if duration > serverSlowThreshold {
|
} else if duration > slowThreshold.Load() {
|
||||||
logx.WithContext(ctx).WithDuration(duration).Slowf("[RPC] slowcall - %s - %s - %s",
|
logx.WithContext(ctx).WithDuration(duration).Slowf("[RPC] slowcall - %s - %s - %s",
|
||||||
addr, method, string(content))
|
addr, method, string(content))
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -13,6 +13,12 @@ import (
|
|||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestSetSlowThreshold(t *testing.T) {
|
||||||
|
assert.Equal(t, defaultSlowThreshold, slowThreshold.Load())
|
||||||
|
SetSlowThreshold(time.Second)
|
||||||
|
assert.Equal(t, time.Second, slowThreshold.Load())
|
||||||
|
}
|
||||||
|
|
||||||
func TestUnaryStatInterceptor(t *testing.T) {
|
func TestUnaryStatInterceptor(t *testing.T) {
|
||||||
metrics := stat.NewMetrics("mock")
|
metrics := stat.NewMetrics("mock")
|
||||||
interceptor := UnaryStatInterceptor(metrics)
|
interceptor := UnaryStatInterceptor(metrics)
|
||||||
|
|||||||
@@ -95,6 +95,11 @@ func (rs *RpcServer) Stop() {
|
|||||||
logx.Close()
|
logx.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetServerSlowThreshold sets the slow threshold on server side.
|
||||||
|
func SetServerSlowThreshold(threshold time.Duration) {
|
||||||
|
serverinterceptors.SetSlowThreshold(threshold)
|
||||||
|
}
|
||||||
|
|
||||||
func setupInterceptors(server internal.Server, c RpcServerConf, metrics *stat.Metrics) error {
|
func setupInterceptors(server internal.Server, c RpcServerConf, metrics *stat.Metrics) error {
|
||||||
if c.CpuThreshold > 0 {
|
if c.CpuThreshold > 0 {
|
||||||
shedder := load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold))
|
shedder := load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold))
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ func TestServer_setupInterceptors(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestServer(t *testing.T) {
|
func TestServer(t *testing.T) {
|
||||||
|
SetServerSlowThreshold(time.Second)
|
||||||
srv := MustNewServer(RpcServerConf{
|
srv := MustNewServer(RpcServerConf{
|
||||||
ServiceConf: service.ServiceConf{
|
ServiceConf: service.ServiceConf{
|
||||||
Log: logx.LogConf{
|
Log: logx.LogConf{
|
||||||
|
|||||||
Reference in New Issue
Block a user