chore: add unit tests (#1615)
* test: add more tests * test: add more tests
This commit is contained in:
@@ -23,7 +23,7 @@ func TestBaseRpcServer_AddStreamInterceptors(t *testing.T) {
|
||||
server := newBaseRpcServer("foo", &rpcServerOptions{metrics: metrics})
|
||||
server.SetName("bar")
|
||||
var vals []int
|
||||
f := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
f := func(_ interface{}, _ grpc.ServerStream, _ *grpc.StreamServerInfo, _ grpc.StreamHandler) error {
|
||||
vals = append(vals, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,13 +9,13 @@ import (
|
||||
|
||||
// StreamAuthorizeInterceptor returns a func that uses given authenticator in processing stream requests.
|
||||
func StreamAuthorizeInterceptor(authenticator *auth.Authenticator) grpc.StreamServerInterceptor {
|
||||
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
return func(svr interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
if err := authenticator.Authenticate(stream.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return handler(srv, stream)
|
||||
return handler(svr, stream)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ func TestStreamAuthorizeInterceptor(t *testing.T) {
|
||||
})
|
||||
ctx := metadata.NewIncomingContext(context.Background(), md)
|
||||
stream := mockedStream{ctx: ctx}
|
||||
err = interceptor(nil, stream, nil, func(srv interface{}, stream grpc.ServerStream) error {
|
||||
err = interceptor(nil, stream, nil, func(_ interface{}, _ grpc.ServerStream) error {
|
||||
return nil
|
||||
})
|
||||
if test.hasError {
|
||||
|
||||
@@ -9,11 +9,11 @@ import (
|
||||
)
|
||||
|
||||
// StreamBreakerInterceptor is an interceptor that acts as a circuit breaker.
|
||||
func StreamBreakerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
func StreamBreakerInterceptor(svr interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) (err error) {
|
||||
breakerName := info.FullMethod
|
||||
return breaker.DoWithAcceptable(breakerName, func() error {
|
||||
return handler(srv, stream)
|
||||
return handler(svr, stream)
|
||||
}, codes.Acceptable)
|
||||
}
|
||||
|
||||
|
||||
@@ -13,8 +13,7 @@ import (
|
||||
func TestStreamBreakerInterceptor(t *testing.T) {
|
||||
err := StreamBreakerInterceptor(nil, nil, &grpc.StreamServerInfo{
|
||||
FullMethod: "any",
|
||||
}, func(
|
||||
srv interface{}, stream grpc.ServerStream) error {
|
||||
}, func(_ interface{}, _ grpc.ServerStream) error {
|
||||
return status.New(codes.DeadlineExceeded, "any").Err()
|
||||
})
|
||||
assert.NotNil(t, err)
|
||||
@@ -23,7 +22,7 @@ func TestStreamBreakerInterceptor(t *testing.T) {
|
||||
func TestUnaryBreakerInterceptor(t *testing.T) {
|
||||
_, err := UnaryBreakerInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "any",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
}, func(_ context.Context, _ interface{}) (interface{}, error) {
|
||||
return nil, status.New(codes.DeadlineExceeded, "any").Err()
|
||||
})
|
||||
assert.NotNil(t, err)
|
||||
|
||||
@@ -11,17 +11,17 @@ import (
|
||||
)
|
||||
|
||||
// StreamCrashInterceptor catches panics in processing stream requests and recovers.
|
||||
func StreamCrashInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
func StreamCrashInterceptor(svr interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) (err error) {
|
||||
defer handleCrash(func(r interface{}) {
|
||||
err = toPanicError(r)
|
||||
})
|
||||
|
||||
return handler(srv, stream)
|
||||
return handler(svr, stream)
|
||||
}
|
||||
|
||||
// UnaryCrashInterceptor catches panics in processing unary requests and recovers.
|
||||
func UnaryCrashInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
func UnaryCrashInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
defer handleCrash(func(r interface{}) {
|
||||
err = toPanicError(r)
|
||||
|
||||
@@ -15,7 +15,7 @@ func init() {
|
||||
|
||||
func TestStreamCrashInterceptor(t *testing.T) {
|
||||
err := StreamCrashInterceptor(nil, nil, nil, func(
|
||||
srv interface{}, stream grpc.ServerStream) error {
|
||||
svr interface{}, stream grpc.ServerStream) error {
|
||||
panic("mock panic")
|
||||
})
|
||||
assert.NotNil(t, err)
|
||||
|
||||
@@ -41,12 +41,12 @@ func UnaryTracingInterceptor(ctx context.Context, req interface{}, info *grpc.Un
|
||||
}
|
||||
|
||||
// StreamTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry.
|
||||
func StreamTracingInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
func StreamTracingInterceptor(svr interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
ctx, span := startSpan(ss.Context(), info.FullMethod)
|
||||
defer span.End()
|
||||
|
||||
if err := handler(srv, wrapServerStream(ctx, ss)); err != nil {
|
||||
if err := handler(svr, wrapServerStream(ctx, ss)); err != nil {
|
||||
s, ok := status.FromError(err)
|
||||
if ok {
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
|
||||
@@ -101,7 +101,7 @@ func TestStreamTracingInterceptor_GrpcFormat(t *testing.T) {
|
||||
stream := mockedServerStream{ctx: ctx}
|
||||
err := StreamTracingInterceptor(nil, &stream, &grpc.StreamServerInfo{
|
||||
FullMethod: "/foo",
|
||||
}, func(srv interface{}, stream grpc.ServerStream) error {
|
||||
}, func(svr interface{}, stream grpc.ServerStream) error {
|
||||
defer wg.Done()
|
||||
atomic.AddInt32(&run, 1)
|
||||
return nil
|
||||
@@ -138,7 +138,7 @@ func TestStreamTracingInterceptor_FinishWithGrpcError(t *testing.T) {
|
||||
stream := mockedServerStream{ctx: ctx}
|
||||
err := StreamTracingInterceptor(nil, &stream, &grpc.StreamServerInfo{
|
||||
FullMethod: "/foo",
|
||||
}, func(srv interface{}, stream grpc.ServerStream) error {
|
||||
}, func(svr interface{}, stream grpc.ServerStream) error {
|
||||
defer wg.Done()
|
||||
return test.err
|
||||
})
|
||||
@@ -175,7 +175,7 @@ func TestStreamTracingInterceptor_WithError(t *testing.T) {
|
||||
stream := mockedServerStream{ctx: ctx}
|
||||
err := StreamTracingInterceptor(nil, &stream, &grpc.StreamServerInfo{
|
||||
FullMethod: "/foo",
|
||||
}, func(srv interface{}, stream grpc.ServerStream) error {
|
||||
}, func(svr interface{}, stream grpc.ServerStream) error {
|
||||
defer wg.Done()
|
||||
return test.err
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user