feature(retry): Delete retry mechanism (#1279)
This commit is contained in:
@@ -18,8 +18,6 @@ var (
|
||||
WithNonBlock = internal.WithNonBlock
|
||||
// WithTimeout is an alias of internal.WithTimeout.
|
||||
WithTimeout = internal.WithTimeout
|
||||
// WithRetry is an alias of internal.WithRetry.
|
||||
WithRetry = internal.WithRetry
|
||||
// WithTransportCredentials return a func to make the gRPC calls secured with given credentials.
|
||||
WithTransportCredentials = internal.WithTransportCredentials
|
||||
// WithUnaryClientInterceptor is an alias of internal.WithUnaryClientInterceptor.
|
||||
@@ -63,9 +61,7 @@ func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
|
||||
if c.Timeout > 0 {
|
||||
opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
|
||||
}
|
||||
if c.Retry {
|
||||
opts = append(opts, WithRetry())
|
||||
}
|
||||
|
||||
opts = append(opts, options...)
|
||||
|
||||
var target string
|
||||
|
||||
@@ -102,7 +102,6 @@ func TestDepositServer_Deposit(t *testing.T) {
|
||||
App: "foo",
|
||||
Token: "bar",
|
||||
Timeout: 1000,
|
||||
Retry: true,
|
||||
},
|
||||
WithDialOption(grpc.WithContextDialer(dialer())),
|
||||
WithUnaryClientInterceptor(func(ctx context.Context, method string, req, reply interface{},
|
||||
|
||||
@@ -18,7 +18,6 @@ type (
|
||||
// setting 0 means no timeout
|
||||
Timeout int64 `json:",default=2000"`
|
||||
CpuThreshold int64 `json:",default=900,range=[0:1000]"`
|
||||
MaxRetries int `json:",default=0,range=[0:]"`
|
||||
}
|
||||
|
||||
// A RpcClientConf is a rpc client config.
|
||||
@@ -29,7 +28,6 @@ type (
|
||||
App string `json:",optional"`
|
||||
Token string `json:",optional"`
|
||||
NonBlock bool `json:",optional"`
|
||||
Retry bool `json:",optional"` // grpc auto retry
|
||||
Timeout int64 `json:",default=2000"`
|
||||
}
|
||||
)
|
||||
|
||||
@@ -34,7 +34,6 @@ type (
|
||||
NonBlock bool
|
||||
Timeout time.Duration
|
||||
Secure bool
|
||||
Retry bool
|
||||
DialOptions []grpc.DialOption
|
||||
}
|
||||
|
||||
@@ -83,7 +82,6 @@ func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
|
||||
clientinterceptors.PrometheusInterceptor,
|
||||
clientinterceptors.BreakerInterceptor,
|
||||
clientinterceptors.TimeoutInterceptor(cliOpts.Timeout),
|
||||
clientinterceptors.RetryInterceptor(cliOpts.Retry),
|
||||
),
|
||||
WithStreamClientInterceptors(
|
||||
clientinterceptors.StreamTracingInterceptor,
|
||||
@@ -136,13 +134,6 @@ func WithTimeout(timeout time.Duration) ClientOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithRetry returns a func to customize a ClientOptions with auto retry.
|
||||
func WithRetry() ClientOption {
|
||||
return func(options *ClientOptions) {
|
||||
options.Retry = true
|
||||
}
|
||||
}
|
||||
|
||||
// WithTransportCredentials return a func to make the gRPC calls secured with given credentials.
|
||||
func WithTransportCredentials(creds credentials.TransportCredentials) ClientOption {
|
||||
return func(options *ClientOptions) {
|
||||
|
||||
@@ -24,13 +24,6 @@ func TestWithTimeout(t *testing.T) {
|
||||
assert.Equal(t, time.Second, options.Timeout)
|
||||
}
|
||||
|
||||
func TestWithRetry(t *testing.T) {
|
||||
var options ClientOptions
|
||||
opt := WithRetry()
|
||||
opt(&options)
|
||||
assert.True(t, options.Retry)
|
||||
}
|
||||
|
||||
func TestWithNonBlock(t *testing.T) {
|
||||
var options ClientOptions
|
||||
opt := WithNonBlock()
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/retry"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// RetryInterceptor retry interceptor
|
||||
func RetryInterceptor(enable bool) grpc.UnaryClientInterceptor {
|
||||
if !enable {
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
return retry.Do(ctx, func(ctx context.Context, callOpts ...grpc.CallOption) error {
|
||||
return invoker(ctx, method, req, reply, cc, callOpts...)
|
||||
}, opts...)
|
||||
}
|
||||
}
|
||||
@@ -1,28 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/retry"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func TestRetryInterceptor_WithMax(t *testing.T) {
|
||||
n := 4
|
||||
for i := 0; i < n; i++ {
|
||||
count := 0
|
||||
cc := new(grpc.ClientConn)
|
||||
err := RetryInterceptor(true)(context.Background(), "/1", nil, nil, cc,
|
||||
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
opts ...grpc.CallOption) error {
|
||||
count++
|
||||
return status.Error(codes.ResourceExhausted, "ResourceExhausted")
|
||||
}, retry.WithMax(i))
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, i+1, count)
|
||||
}
|
||||
}
|
||||
@@ -14,8 +14,7 @@ type (
|
||||
ServerOption func(options *rpcServerOptions)
|
||||
|
||||
rpcServerOptions struct {
|
||||
metrics *stat.Metrics
|
||||
MaxRetries int
|
||||
metrics *stat.Metrics
|
||||
}
|
||||
|
||||
rpcServer struct {
|
||||
@@ -56,7 +55,6 @@ func (s *rpcServer) Start(register RegisterFn) error {
|
||||
|
||||
unaryInterceptors := []grpc.UnaryServerInterceptor{
|
||||
serverinterceptors.UnaryTracingInterceptor,
|
||||
serverinterceptors.RetryInterceptor(s.maxRetries),
|
||||
serverinterceptors.UnaryCrashInterceptor,
|
||||
serverinterceptors.UnaryStatInterceptor(s.metrics),
|
||||
serverinterceptors.UnaryPrometheusInterceptor,
|
||||
@@ -89,10 +87,3 @@ func WithMetrics(metrics *stat.Metrics) ServerOption {
|
||||
options.metrics = metrics
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxRetries returns a func that sets a max retries to a Server.
|
||||
func WithMaxRetries(maxRetries int) ServerOption {
|
||||
return func(options *rpcServerOptions) {
|
||||
options.MaxRetries = maxRetries
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ type (
|
||||
baseRpcServer struct {
|
||||
address string
|
||||
metrics *stat.Metrics
|
||||
maxRetries int
|
||||
options []grpc.ServerOption
|
||||
streamInterceptors []grpc.StreamServerInterceptor
|
||||
unaryInterceptors []grpc.UnaryServerInterceptor
|
||||
@@ -30,9 +29,8 @@ type (
|
||||
|
||||
func newBaseRpcServer(address string, rpcServerOpts *rpcServerOptions) *baseRpcServer {
|
||||
return &baseRpcServer{
|
||||
address: address,
|
||||
metrics: rpcServerOpts.metrics,
|
||||
maxRetries: rpcServerOpts.MaxRetries,
|
||||
address: address,
|
||||
metrics: rpcServerOpts.metrics,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,35 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/retry"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func RetryInterceptor(maxAttempt int) grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
var md metadata.MD
|
||||
requestMd, ok := metadata.FromIncomingContext(ctx)
|
||||
if ok {
|
||||
md = requestMd.Copy()
|
||||
attemptMd := md.Get(retry.AttemptMetadataKey)
|
||||
if len(attemptMd) != 0 && attemptMd[0] != "" {
|
||||
if attempt, err := strconv.Atoi(attemptMd[0]); err == nil {
|
||||
if attempt > maxAttempt {
|
||||
logx.WithContext(ctx).Errorf("retries exceeded:%d, max retries:%d", attempt, maxAttempt)
|
||||
return nil, status.Error(codes.FailedPrecondition, "Retries exceeded")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
@@ -1,45 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/retry"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TestRetryInterceptor(t *testing.T) {
|
||||
t.Run("retries exceeded", func(t *testing.T) {
|
||||
interceptor := RetryInterceptor(2)
|
||||
ctx := metadata.NewIncomingContext(context.Background(),
|
||||
metadata.New(map[string]string{retry.AttemptMetadataKey: "3"}))
|
||||
resp, err := interceptor(ctx, nil, nil, func(ctx context.Context,
|
||||
req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
})
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, resp)
|
||||
})
|
||||
|
||||
t.Run("reasonable retries", func(t *testing.T) {
|
||||
interceptor := RetryInterceptor(2)
|
||||
ctx := metadata.NewIncomingContext(context.Background(),
|
||||
metadata.New(map[string]string{retry.AttemptMetadataKey: "2"}))
|
||||
resp, err := interceptor(ctx, nil, nil, func(ctx context.Context,
|
||||
req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, resp)
|
||||
})
|
||||
t.Run("no retries", func(t *testing.T) {
|
||||
interceptor := RetryInterceptor(0)
|
||||
resp, err := interceptor(context.Background(), nil, nil,
|
||||
func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, resp)
|
||||
})
|
||||
}
|
||||
@@ -40,7 +40,6 @@ func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error
|
||||
metrics := stat.NewMetrics(c.ListenOn)
|
||||
serverOptions := []internal.ServerOption{
|
||||
internal.WithMetrics(metrics),
|
||||
internal.WithMaxRetries(c.MaxRetries),
|
||||
}
|
||||
|
||||
if c.HasEtcd() {
|
||||
|
||||
Reference in New Issue
Block a user