chore: reorg imports, format code, make MaxRetires default to 0 (#1165)
* chore: reverse the order of stopping services * chore: reverse the order of stopping services * chore: reorg imports, format code * chore: format code, and refactor * feat: change MaxRetries default to 0, disable retry
This commit is contained in:
@@ -1,9 +1,10 @@
|
|||||||
package backoff
|
package backoff
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWaitBetween(t *testing.T) {
|
func TestWaitBetween(t *testing.T) {
|
||||||
|
|||||||
@@ -1,14 +1,14 @@
|
|||||||
package retry
|
package retry
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/retry/backoff"
|
"github.com/tal-tech/go-zero/core/retry/backoff"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// WithDisable disables the retry behaviour on this call, or this interceptor.
|
// WithDisable disables the retry behaviour on this call, or this interceptor.
|
||||||
//
|
// It's semantically the same to `WithMax(0)`
|
||||||
// Its semantically the same to `WithMax`
|
|
||||||
func WithDisable() *CallOption {
|
func WithDisable() *CallOption {
|
||||||
return WithMax(0)
|
return WithMax(0)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,14 +3,15 @@ package retry
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/tal-tech/go-zero/core/logx"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRetryWithDisable(t *testing.T) {
|
func TestRetryWithDisable(t *testing.T) {
|
||||||
@@ -33,7 +34,6 @@ func TestRetryWithBackoff(t *testing.T) {
|
|||||||
return time.Millisecond
|
return time.Millisecond
|
||||||
}))
|
}))
|
||||||
assert.EqualValues(t, time.Millisecond, retryCallOptions.backoffFunc(1))
|
assert.EqualValues(t, time.Millisecond, retryCallOptions.backoffFunc(1))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRetryWithCodes(t *testing.T) {
|
func TestRetryWithCodes(t *testing.T) {
|
||||||
@@ -50,7 +50,6 @@ func TestRetryWithPerRetryTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Test_waitRetryBackoff(t *testing.T) {
|
func Test_waitRetryBackoff(t *testing.T) {
|
||||||
|
|
||||||
opt := &options{perCallTimeout: time.Second, backoffFunc: func(attempt int) time.Duration {
|
opt := &options{perCallTimeout: time.Second, backoffFunc: func(attempt int) time.Duration {
|
||||||
return time.Second
|
return time.Second
|
||||||
}}
|
}}
|
||||||
@@ -76,7 +75,6 @@ func Test_perCallContext(t *testing.T) {
|
|||||||
md, ok := metadata.FromOutgoingContext(callContext)
|
md, ok := metadata.FromOutgoingContext(callContext)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
assert.EqualValues(t, metadata.MD{"1": {"1"}, AttemptMetadataKey: {"1"}}, md)
|
assert.EqualValues(t, metadata.MD{"1": {"1"}, AttemptMetadataKey: {"1"}}, md)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_filterCallOptions(t *testing.T) {
|
func Test_filterCallOptions(t *testing.T) {
|
||||||
@@ -88,5 +86,4 @@ func Test_filterCallOptions(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.EqualValues(t, []grpc.CallOption{grpcEmptyCallOpt}, options)
|
assert.EqualValues(t, []grpc.CallOption{grpcEmptyCallOpt}, options)
|
||||||
assert.EqualValues(t, []*CallOption{retryCallOpt}, retryCallOptions)
|
assert.EqualValues(t, []*CallOption{retryCallOpt}, retryCallOptions)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,16 +2,15 @@ package retry
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/tal-tech/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/retry/backoff"
|
"github.com/tal-tech/go-zero/core/retry/backoff"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const AttemptMetadataKey = "x-retry-attempt"
|
const AttemptMetadataKey = "x-retry-attempt"
|
||||||
@@ -38,6 +37,7 @@ type (
|
|||||||
codes []codes.Code
|
codes []codes.Code
|
||||||
backoffFunc backoff.Func
|
backoffFunc backoff.Func
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallOption is a grpc.CallOption that is local to grpc retry.
|
// CallOption is a grpc.CallOption that is local to grpc retry.
|
||||||
CallOption struct {
|
CallOption struct {
|
||||||
grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
|
grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
|
||||||
@@ -65,6 +65,7 @@ func waitRetryBackoff(logger logx.Logger, attempt int, ctx context.Context, retr
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -73,11 +74,13 @@ func isRetriable(err error, retryOptions *options) bool {
|
|||||||
if isContextError(err) {
|
if isContextError(err) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, code := range retryOptions.codes {
|
for _, code := range retryOptions.codes {
|
||||||
if code == errCode {
|
if code == errCode {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,6 +93,7 @@ func reuseOrNewWithCallOptions(opt *options, retryCallOptions []*CallOption) *op
|
|||||||
if len(retryCallOptions) == 0 {
|
if len(retryCallOptions) == 0 {
|
||||||
return opt
|
return opt
|
||||||
}
|
}
|
||||||
|
|
||||||
return parseRetryCallOptions(opt, retryCallOptions...)
|
return parseRetryCallOptions(opt, retryCallOptions...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -97,6 +101,7 @@ func parseRetryCallOptions(opt *options, opts ...*CallOption) *options {
|
|||||||
for _, option := range opts {
|
for _, option := range opts {
|
||||||
option.apply(opt)
|
option.apply(opt)
|
||||||
}
|
}
|
||||||
|
|
||||||
return opt
|
return opt
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,7 +127,7 @@ func extractIncomingAndClone(ctx context.Context) metadata.MD {
|
|||||||
if !ok {
|
if !ok {
|
||||||
return metadata.MD{}
|
return metadata.MD{}
|
||||||
}
|
}
|
||||||
// clone
|
|
||||||
return md.Copy()
|
return md.Copy()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,6 +139,7 @@ func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOp
|
|||||||
grpcOptions = append(grpcOptions, opt)
|
grpcOptions = append(grpcOptions, opt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return grpcOptions, retryOptions
|
return grpcOptions, retryOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,6 +151,7 @@ func Do(ctx context.Context, call func(ctx context.Context, opts ...grpc.CallOpt
|
|||||||
if callOpts.max == 0 {
|
if callOpts.max == 0 {
|
||||||
return call(ctx, opts...)
|
return call(ctx, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for attempt := 0; attempt <= callOpts.max; attempt++ {
|
for attempt := 0; attempt <= callOpts.max; attempt++ {
|
||||||
if err := waitRetryBackoff(logger, attempt, ctx, callOpts); err != nil {
|
if err := waitRetryBackoff(logger, attempt, ctx, callOpts); err != nil {
|
||||||
@@ -157,6 +164,7 @@ func Do(ctx context.Context, call func(ctx context.Context, opts ...grpc.CallOpt
|
|||||||
if lastErr == nil {
|
if lastErr == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if attempt == 0 {
|
if attempt == 0 {
|
||||||
logger.Errorf("grpc call failed, got err: %v", lastErr)
|
logger.Errorf("grpc call failed, got err: %v", lastErr)
|
||||||
} else {
|
} else {
|
||||||
@@ -175,5 +183,6 @@ func Do(ctx context.Context, call func(ctx context.Context, opts ...grpc.CallOpt
|
|||||||
return lastErr
|
return lastErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return lastErr
|
return lastErr
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,11 +2,12 @@ package retry
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDo(t *testing.T) {
|
func TestDo(t *testing.T) {
|
||||||
@@ -16,10 +17,8 @@ func TestDo(t *testing.T) {
|
|||||||
err := Do(context.Background(), func(ctx context.Context, opts ...grpc.CallOption) error {
|
err := Do(context.Background(), func(ctx context.Context, opts ...grpc.CallOption) error {
|
||||||
count++
|
count++
|
||||||
return status.Error(codes.ResourceExhausted, "ResourceExhausted")
|
return status.Error(codes.ResourceExhausted, "ResourceExhausted")
|
||||||
|
|
||||||
}, WithMax(i))
|
}, WithMax(i))
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
assert.Equal(t, i+1, count)
|
assert.Equal(t, i+1, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ type (
|
|||||||
// setting 0 means no timeout
|
// setting 0 means no timeout
|
||||||
Timeout int64 `json:",default=2000"`
|
Timeout int64 `json:",default=2000"`
|
||||||
CpuThreshold int64 `json:",default=900,range=[0:1000]"`
|
CpuThreshold int64 `json:",default=900,range=[0:1000]"`
|
||||||
MaxRetries int `json:",range=[0:]"`
|
MaxRetries int `json:",default=0,range=[0:]"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// A RpcClientConf is a rpc client config.
|
// A RpcClientConf is a rpc client config.
|
||||||
|
|||||||
@@ -2,16 +2,22 @@ package clientinterceptors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/retry"
|
"github.com/tal-tech/go-zero/core/retry"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RetryInterceptor retry interceptor
|
// RetryInterceptor retry interceptor
|
||||||
func RetryInterceptor(enable bool) grpc.UnaryClientInterceptor {
|
func RetryInterceptor(enable bool) grpc.UnaryClientInterceptor {
|
||||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
if !enable {
|
||||||
if !enable {
|
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||||
|
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||||
return invoker(ctx, method, req, reply, cc, opts...)
|
return invoker(ctx, method, req, reply, cc, opts...)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||||
|
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||||
return retry.Do(ctx, func(ctx context.Context, callOpts ...grpc.CallOption) error {
|
return retry.Do(ctx, func(ctx context.Context, callOpts ...grpc.CallOption) error {
|
||||||
return invoker(ctx, method, req, reply, cc, callOpts...)
|
return invoker(ctx, method, req, reply, cc, callOpts...)
|
||||||
}, opts...)
|
}, opts...)
|
||||||
|
|||||||
@@ -2,12 +2,13 @@ package clientinterceptors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/retry"
|
"github.com/tal-tech/go-zero/core/retry"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRetryInterceptor_WithMax(t *testing.T) {
|
func TestRetryInterceptor_WithMax(t *testing.T) {
|
||||||
@@ -16,12 +17,12 @@ func TestRetryInterceptor_WithMax(t *testing.T) {
|
|||||||
count := 0
|
count := 0
|
||||||
cc := new(grpc.ClientConn)
|
cc := new(grpc.ClientConn)
|
||||||
err := RetryInterceptor(true)(context.Background(), "/1", nil, nil, cc,
|
err := RetryInterceptor(true)(context.Background(), "/1", nil, nil, cc,
|
||||||
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
|
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||||
|
opts ...grpc.CallOption) error {
|
||||||
count++
|
count++
|
||||||
return status.Error(codes.ResourceExhausted, "ResourceExhausted")
|
return status.Error(codes.ResourceExhausted, "ResourceExhausted")
|
||||||
}, retry.WithMax(i))
|
}, retry.WithMax(i))
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
assert.Equal(t, i+1, count)
|
assert.Equal(t, i+1, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,17 +2,19 @@ package serverinterceptors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/tal-tech/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/retry"
|
"github.com/tal-tech/go-zero/core/retry"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"strconv"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func RetryInterceptor(maxAttempt int) grpc.UnaryServerInterceptor {
|
func RetryInterceptor(maxAttempt int) grpc.UnaryServerInterceptor {
|
||||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||||
|
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||||
var md metadata.MD
|
var md metadata.MD
|
||||||
requestMd, ok := metadata.FromIncomingContext(ctx)
|
requestMd, ok := metadata.FromIncomingContext(ctx)
|
||||||
if ok {
|
if ok {
|
||||||
|
|||||||
@@ -2,17 +2,20 @@ package serverinterceptors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/retry"
|
"github.com/tal-tech/go-zero/core/retry"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRetryInterceptor(t *testing.T) {
|
func TestRetryInterceptor(t *testing.T) {
|
||||||
t.Run("retries exceeded", func(t *testing.T) {
|
t.Run("retries exceeded", func(t *testing.T) {
|
||||||
interceptor := RetryInterceptor(2)
|
interceptor := RetryInterceptor(2)
|
||||||
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{retry.AttemptMetadataKey: "3"}))
|
ctx := metadata.NewIncomingContext(context.Background(),
|
||||||
resp, err := interceptor(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
|
metadata.New(map[string]string{retry.AttemptMetadataKey: "3"}))
|
||||||
|
resp, err := interceptor(ctx, nil, nil, func(ctx context.Context,
|
||||||
|
req interface{}) (interface{}, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
@@ -21,8 +24,10 @@ func TestRetryInterceptor(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("reasonable retries", func(t *testing.T) {
|
t.Run("reasonable retries", func(t *testing.T) {
|
||||||
interceptor := RetryInterceptor(2)
|
interceptor := RetryInterceptor(2)
|
||||||
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{retry.AttemptMetadataKey: "2"}))
|
ctx := metadata.NewIncomingContext(context.Background(),
|
||||||
resp, err := interceptor(ctx, nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
|
metadata.New(map[string]string{retry.AttemptMetadataKey: "2"}))
|
||||||
|
resp, err := interceptor(ctx, nil, nil, func(ctx context.Context,
|
||||||
|
req interface{}) (interface{}, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
@@ -30,11 +35,11 @@ func TestRetryInterceptor(t *testing.T) {
|
|||||||
})
|
})
|
||||||
t.Run("no retries", func(t *testing.T) {
|
t.Run("no retries", func(t *testing.T) {
|
||||||
interceptor := RetryInterceptor(0)
|
interceptor := RetryInterceptor(0)
|
||||||
resp, err := interceptor(context.Background(), nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) {
|
resp, err := interceptor(context.Background(), nil, nil,
|
||||||
return nil, nil
|
func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
})
|
return nil, nil
|
||||||
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Nil(t, resp)
|
assert.Nil(t, resp)
|
||||||
})
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user