diff --git a/core/retry/backoff/backoff_test.go b/core/retry/backoff/backoff_test.go index c6b7483b..73a64471 100644 --- a/core/retry/backoff/backoff_test.go +++ b/core/retry/backoff/backoff_test.go @@ -1,9 +1,10 @@ package backoff import ( - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestWaitBetween(t *testing.T) { diff --git a/core/retry/options.go b/core/retry/options.go index 115473bf..be4a633d 100644 --- a/core/retry/options.go +++ b/core/retry/options.go @@ -1,14 +1,14 @@ package retry import ( + "time" + "github.com/tal-tech/go-zero/core/retry/backoff" "google.golang.org/grpc/codes" - "time" ) // WithDisable disables the retry behaviour on this call, or this interceptor. -// -// Its semantically the same to `WithMax` +// It's semantically the same to `WithMax(0)` func WithDisable() *CallOption { return WithMax(0) } diff --git a/core/retry/options_test.go b/core/retry/options_test.go index adcf25ae..6ac0bf4b 100644 --- a/core/retry/options_test.go +++ b/core/retry/options_test.go @@ -3,14 +3,15 @@ package retry import ( "context" "errors" + "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/tal-tech/go-zero/core/logx" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "testing" - "time" ) func TestRetryWithDisable(t *testing.T) { @@ -33,7 +34,6 @@ func TestRetryWithBackoff(t *testing.T) { return time.Millisecond })) assert.EqualValues(t, time.Millisecond, retryCallOptions.backoffFunc(1)) - } func TestRetryWithCodes(t *testing.T) { @@ -50,7 +50,6 @@ func TestRetryWithPerRetryTimeout(t *testing.T) { } func Test_waitRetryBackoff(t *testing.T) { - opt := &options{perCallTimeout: time.Second, backoffFunc: func(attempt int) time.Duration { return time.Second }} @@ -76,7 +75,6 @@ func Test_perCallContext(t *testing.T) { md, ok := metadata.FromOutgoingContext(callContext) assert.True(t, ok) assert.EqualValues(t, metadata.MD{"1": {"1"}, AttemptMetadataKey: {"1"}}, md) - } func Test_filterCallOptions(t *testing.T) { @@ -88,5 +86,4 @@ func Test_filterCallOptions(t *testing.T) { }) assert.EqualValues(t, []grpc.CallOption{grpcEmptyCallOpt}, options) assert.EqualValues(t, []*CallOption{retryCallOpt}, retryCallOptions) - } diff --git a/core/retry/retryinterceptor.go b/core/retry/retryinterceptor.go index f85bb2f7..435a1979 100644 --- a/core/retry/retryinterceptor.go +++ b/core/retry/retryinterceptor.go @@ -2,16 +2,15 @@ package retry import ( "context" + "strconv" + "time" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/retry/backoff" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - - "strconv" - "time" ) const AttemptMetadataKey = "x-retry-attempt" @@ -38,6 +37,7 @@ type ( codes []codes.Code backoffFunc backoff.Func } + // CallOption is a grpc.CallOption that is local to grpc retry. CallOption struct { grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic. @@ -65,6 +65,7 @@ func waitRetryBackoff(logger logx.Logger, attempt int, ctx context.Context, retr } } } + return nil } @@ -73,11 +74,13 @@ func isRetriable(err error, retryOptions *options) bool { if isContextError(err) { return false } + for _, code := range retryOptions.codes { if code == errCode { return true } } + return false } @@ -90,6 +93,7 @@ func reuseOrNewWithCallOptions(opt *options, retryCallOptions []*CallOption) *op if len(retryCallOptions) == 0 { return opt } + return parseRetryCallOptions(opt, retryCallOptions...) } @@ -97,6 +101,7 @@ func parseRetryCallOptions(opt *options, opts ...*CallOption) *options { for _, option := range opts { option.apply(opt) } + return opt } @@ -122,7 +127,7 @@ func extractIncomingAndClone(ctx context.Context) metadata.MD { if !ok { return metadata.MD{} } - // clone + return md.Copy() } @@ -134,6 +139,7 @@ func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOp grpcOptions = append(grpcOptions, opt) } } + return grpcOptions, retryOptions } @@ -145,6 +151,7 @@ func Do(ctx context.Context, call func(ctx context.Context, opts ...grpc.CallOpt if callOpts.max == 0 { return call(ctx, opts...) } + var lastErr error for attempt := 0; attempt <= callOpts.max; attempt++ { if err := waitRetryBackoff(logger, attempt, ctx, callOpts); err != nil { @@ -157,6 +164,7 @@ func Do(ctx context.Context, call func(ctx context.Context, opts ...grpc.CallOpt if lastErr == nil { return nil } + if attempt == 0 { logger.Errorf("grpc call failed, got err: %v", lastErr) } else { @@ -175,5 +183,6 @@ func Do(ctx context.Context, call func(ctx context.Context, opts ...grpc.CallOpt return lastErr } } + return lastErr } diff --git a/core/retry/retryinterceptor_test.go b/core/retry/retryinterceptor_test.go index f5a5756f..d5c3043b 100644 --- a/core/retry/retryinterceptor_test.go +++ b/core/retry/retryinterceptor_test.go @@ -2,11 +2,12 @@ package retry import ( "context" + "testing" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "testing" ) func TestDo(t *testing.T) { @@ -16,10 +17,8 @@ func TestDo(t *testing.T) { err := Do(context.Background(), func(ctx context.Context, opts ...grpc.CallOption) error { count++ return status.Error(codes.ResourceExhausted, "ResourceExhausted") - }, WithMax(i)) assert.Error(t, err) assert.Equal(t, i+1, count) } - } diff --git a/zrpc/config.go b/zrpc/config.go index 36cbcdba..07b72724 100644 --- a/zrpc/config.go +++ b/zrpc/config.go @@ -18,7 +18,7 @@ type ( // setting 0 means no timeout Timeout int64 `json:",default=2000"` CpuThreshold int64 `json:",default=900,range=[0:1000]"` - MaxRetries int `json:",range=[0:]"` + MaxRetries int `json:",default=0,range=[0:]"` } // A RpcClientConf is a rpc client config. diff --git a/zrpc/internal/clientinterceptors/retryinterceptor.go b/zrpc/internal/clientinterceptors/retryinterceptor.go index a5cdee6c..55ba26f6 100644 --- a/zrpc/internal/clientinterceptors/retryinterceptor.go +++ b/zrpc/internal/clientinterceptors/retryinterceptor.go @@ -2,16 +2,22 @@ package clientinterceptors import ( "context" + "github.com/tal-tech/go-zero/core/retry" "google.golang.org/grpc" ) // RetryInterceptor retry interceptor func RetryInterceptor(enable bool) grpc.UnaryClientInterceptor { - return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if !enable { + if !enable { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return invoker(ctx, method, req, reply, cc, opts...) } + } + + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return retry.Do(ctx, func(ctx context.Context, callOpts ...grpc.CallOption) error { return invoker(ctx, method, req, reply, cc, callOpts...) }, opts...) diff --git a/zrpc/internal/clientinterceptors/retryinterceptor_test.go b/zrpc/internal/clientinterceptors/retryinterceptor_test.go index 7952cb78..d8274105 100644 --- a/zrpc/internal/clientinterceptors/retryinterceptor_test.go +++ b/zrpc/internal/clientinterceptors/retryinterceptor_test.go @@ -2,12 +2,13 @@ package clientinterceptors import ( "context" + "testing" + "github.com/stretchr/testify/assert" "github.com/tal-tech/go-zero/core/retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "testing" ) func TestRetryInterceptor_WithMax(t *testing.T) { @@ -16,12 +17,12 @@ func TestRetryInterceptor_WithMax(t *testing.T) { count := 0 cc := new(grpc.ClientConn) err := RetryInterceptor(true)(context.Background(), "/1", nil, nil, cc, - func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, + opts ...grpc.CallOption) error { count++ return status.Error(codes.ResourceExhausted, "ResourceExhausted") }, retry.WithMax(i)) assert.Error(t, err) assert.Equal(t, i+1, count) } - } diff --git a/zrpc/internal/serverinterceptors/retryinterceptor.go b/zrpc/internal/serverinterceptors/retryinterceptor.go index 401d099f..aacaf5cc 100644 --- a/zrpc/internal/serverinterceptors/retryinterceptor.go +++ b/zrpc/internal/serverinterceptors/retryinterceptor.go @@ -2,17 +2,19 @@ package serverinterceptors import ( "context" + "strconv" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "strconv" ) func RetryInterceptor(maxAttempt int) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler) (resp interface{}, err error) { var md metadata.MD requestMd, ok := metadata.FromIncomingContext(ctx) if ok { diff --git a/zrpc/internal/serverinterceptors/retryinterceptor_test.go b/zrpc/internal/serverinterceptors/retryinterceptor_test.go index 1f0c601b..c2ea4afd 100644 --- a/zrpc/internal/serverinterceptors/retryinterceptor_test.go +++ b/zrpc/internal/serverinterceptors/retryinterceptor_test.go @@ -2,17 +2,20 @@ package serverinterceptors import ( "context" + "testing" + "github.com/stretchr/testify/assert" "github.com/tal-tech/go-zero/core/retry" "google.golang.org/grpc/metadata" - "testing" ) func TestRetryInterceptor(t *testing.T) { t.Run("retries exceeded", func(t *testing.T) { interceptor := RetryInterceptor(2) - ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{retry.AttemptMetadataKey: "3"})) - resp, err := interceptor(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + ctx := metadata.NewIncomingContext(context.Background(), + metadata.New(map[string]string{retry.AttemptMetadataKey: "3"})) + resp, err := interceptor(ctx, nil, nil, func(ctx context.Context, + req interface{}) (interface{}, error) { return nil, nil }) assert.Error(t, err) @@ -21,8 +24,10 @@ func TestRetryInterceptor(t *testing.T) { t.Run("reasonable retries", func(t *testing.T) { interceptor := RetryInterceptor(2) - ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{retry.AttemptMetadataKey: "2"})) - resp, err := interceptor(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + ctx := metadata.NewIncomingContext(context.Background(), + metadata.New(map[string]string{retry.AttemptMetadataKey: "2"})) + resp, err := interceptor(ctx, nil, nil, func(ctx context.Context, + req interface{}) (interface{}, error) { return nil, nil }) assert.NoError(t, err) @@ -30,11 +35,11 @@ func TestRetryInterceptor(t *testing.T) { }) t.Run("no retries", func(t *testing.T) { interceptor := RetryInterceptor(0) - resp, err := interceptor(context.Background(), nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { - return nil, nil - }) + resp, err := interceptor(context.Background(), nil, nil, + func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) assert.NoError(t, err) assert.Nil(t, resp) }) - }