feat: opentelemetry integration, removed self designed tracing (#1111)
* feat: opentelemetry integration, removed self designed tracing * feat: support zipkin on opentelemetry integration * feat: support zipkin on opentelemetry integration, enable it in conf * style: format code * fix: support logx without exporter configured * fix: check return values * refactor: simplify code * refactor: simplify opentelemetry integration * ci: fix staticcheck errors
This commit is contained in:
@@ -68,7 +68,6 @@ func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
|
||||
grpc.WithBlock(),
|
||||
WithUnaryClientInterceptors(
|
||||
clientinterceptors.UnaryTracingInterceptor,
|
||||
clientinterceptors.UnaryOpenTracingInterceptor(),
|
||||
clientinterceptors.DurationInterceptor,
|
||||
clientinterceptors.PrometheusInterceptor,
|
||||
clientinterceptors.BreakerInterceptor,
|
||||
@@ -76,7 +75,6 @@ func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {
|
||||
),
|
||||
WithStreamClientInterceptors(
|
||||
clientinterceptors.StreamTracingInterceptor,
|
||||
clientinterceptors.StreamOpenTracingInterceptor(),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@@ -1,92 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/trace/opentelemetry"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
gcodes "google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// UnaryOpenTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry.
|
||||
func UnaryOpenTracingInterceptor() grpc.UnaryClientInterceptor {
|
||||
propagator := otel.GetTextMapPropagator()
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
if !opentelemetry.Enabled() {
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
|
||||
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
|
||||
metadataCopy := requestMetadata.Copy()
|
||||
tr := otel.Tracer(opentelemetry.TraceName)
|
||||
name, attr := opentelemetry.SpanInfo(method, cc.Target())
|
||||
ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
|
||||
trace.WithAttributes(attr...))
|
||||
defer span.End()
|
||||
|
||||
opentelemetry.Inject(ctx, propagator, &metadataCopy)
|
||||
ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
|
||||
opentelemetry.MessageSent.Event(ctx, 1, req)
|
||||
opentelemetry.MessageReceived.Event(ctx, 1, reply)
|
||||
|
||||
if err := invoker(ctx, method, req, reply, cc, opts...); err != nil {
|
||||
s, _ := status.FromError(err)
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
|
||||
return err
|
||||
}
|
||||
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// StreamOpenTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry.
|
||||
func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor {
|
||||
propagator := otel.GetTextMapPropagator()
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
|
||||
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
if !opentelemetry.Enabled() {
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
}
|
||||
|
||||
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
|
||||
metadataCopy := requestMetadata.Copy()
|
||||
tr := otel.Tracer(opentelemetry.TraceName)
|
||||
name, attr := opentelemetry.SpanInfo(method, cc.Target())
|
||||
ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
|
||||
trace.WithAttributes(attr...))
|
||||
opentelemetry.Inject(ctx, propagator, &metadataCopy)
|
||||
ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
|
||||
s, err := streamer(ctx, desc, cc, method, opts...)
|
||||
if err != nil {
|
||||
grpcStatus, _ := status.FromError(err)
|
||||
span.SetStatus(codes.Error, grpcStatus.Message())
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(grpcStatus.Code()))
|
||||
span.End()
|
||||
return s, err
|
||||
}
|
||||
|
||||
stream := opentelemetry.WrapClientStream(ctx, s, desc)
|
||||
|
||||
go func() {
|
||||
if err := <-stream.Finished; err != nil {
|
||||
s, _ := status.FromError(err)
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
|
||||
} else {
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
|
||||
}
|
||||
|
||||
span.End()
|
||||
}()
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/trace/opentelemetry"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func TestOpenTracingInterceptor(t *testing.T) {
|
||||
opentelemetry.StartAgent(opentelemetry.Config{
|
||||
Name: "go-zero-test",
|
||||
Endpoint: "http://localhost:14268/api/traces",
|
||||
Batcher: "jaeger",
|
||||
Sampler: 1.0,
|
||||
})
|
||||
|
||||
cc := new(grpc.ClientConn)
|
||||
err := UnaryOpenTracingInterceptor()(context.Background(), "/ListUser", nil, nil, cc,
|
||||
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
opts ...grpc.CallOption) error {
|
||||
return nil
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
@@ -2,40 +2,206 @@ package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/trace"
|
||||
ztrace "github.com/tal-tech/go-zero/core/trace"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
gcodes "google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// UnaryTracingInterceptor is an interceptor that handles tracing.
|
||||
const (
|
||||
receiveEndEvent streamEventType = iota
|
||||
errorEvent
|
||||
)
|
||||
|
||||
// UnaryTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry.
|
||||
func UnaryTracingInterceptor(ctx context.Context, method string, req, reply interface{},
|
||||
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
ctx, span := trace.StartClientSpan(ctx, cc.Target(), method)
|
||||
defer span.Finish()
|
||||
ctx, span := startSpan(ctx, method, cc.Target())
|
||||
defer span.End()
|
||||
|
||||
var pairs []string
|
||||
span.Visit(func(key, val string) bool {
|
||||
pairs = append(pairs, key, val)
|
||||
return true
|
||||
})
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, pairs...)
|
||||
ztrace.MessageSent.Event(ctx, 1, req)
|
||||
ztrace.MessageReceived.Event(ctx, 1, reply)
|
||||
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
if err := invoker(ctx, method, req, reply, cc, opts...); err != nil {
|
||||
s, ok := status.FromError(err)
|
||||
if ok {
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
|
||||
} else {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
|
||||
return nil
|
||||
}
|
||||
|
||||
// StreamTracingInterceptor is an interceptor that handles tracing for stream requests.
|
||||
// StreamTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry.
|
||||
func StreamTracingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
|
||||
method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
ctx, span := trace.StartClientSpan(ctx, cc.Target(), method)
|
||||
defer span.Finish()
|
||||
ctx, span := startSpan(ctx, method, cc.Target())
|
||||
s, err := streamer(ctx, desc, cc, method, opts...)
|
||||
if err != nil {
|
||||
st, ok := status.FromError(err)
|
||||
if ok {
|
||||
span.SetStatus(codes.Error, st.Message())
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(st.Code()))
|
||||
} else {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
}
|
||||
span.End()
|
||||
return s, err
|
||||
}
|
||||
|
||||
var pairs []string
|
||||
span.Visit(func(key, val string) bool {
|
||||
pairs = append(pairs, key, val)
|
||||
return true
|
||||
})
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, pairs...)
|
||||
stream := wrapClientStream(ctx, s, desc)
|
||||
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
go func() {
|
||||
if err := <-stream.Finished; err != nil {
|
||||
s, ok := status.FromError(err)
|
||||
if ok {
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
} else {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
}
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
|
||||
} else {
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
|
||||
}
|
||||
|
||||
span.End()
|
||||
}()
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
type (
|
||||
streamEventType int
|
||||
|
||||
streamEvent struct {
|
||||
Type streamEventType
|
||||
Err error
|
||||
}
|
||||
|
||||
clientStream struct {
|
||||
grpc.ClientStream
|
||||
Finished chan error
|
||||
desc *grpc.StreamDesc
|
||||
events chan streamEvent
|
||||
eventsDone chan struct{}
|
||||
receivedMessageID int
|
||||
sentMessageID int
|
||||
}
|
||||
)
|
||||
|
||||
func (w *clientStream) RecvMsg(m interface{}) error {
|
||||
err := w.ClientStream.RecvMsg(m)
|
||||
if err == nil && !w.desc.ServerStreams {
|
||||
w.sendStreamEvent(receiveEndEvent, nil)
|
||||
} else if err == io.EOF {
|
||||
w.sendStreamEvent(receiveEndEvent, nil)
|
||||
} else if err != nil {
|
||||
w.sendStreamEvent(errorEvent, err)
|
||||
} else {
|
||||
w.receivedMessageID++
|
||||
ztrace.MessageReceived.Event(w.Context(), w.receivedMessageID, m)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *clientStream) SendMsg(m interface{}) error {
|
||||
err := w.ClientStream.SendMsg(m)
|
||||
w.sentMessageID++
|
||||
ztrace.MessageSent.Event(w.Context(), w.sentMessageID, m)
|
||||
if err != nil {
|
||||
w.sendStreamEvent(errorEvent, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *clientStream) Header() (metadata.MD, error) {
|
||||
md, err := w.ClientStream.Header()
|
||||
if err != nil {
|
||||
w.sendStreamEvent(errorEvent, err)
|
||||
}
|
||||
|
||||
return md, err
|
||||
}
|
||||
|
||||
func (w *clientStream) CloseSend() error {
|
||||
err := w.ClientStream.CloseSend()
|
||||
if err != nil {
|
||||
w.sendStreamEvent(errorEvent, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
|
||||
select {
|
||||
case <-w.eventsDone:
|
||||
case w.events <- streamEvent{Type: eventType, Err: err}:
|
||||
}
|
||||
}
|
||||
|
||||
func startSpan(ctx context.Context, method, target string) (context.Context, trace.Span) {
|
||||
var md metadata.MD
|
||||
requestMetadata, ok := metadata.FromOutgoingContext(ctx)
|
||||
if ok {
|
||||
md = requestMetadata.Copy()
|
||||
} else {
|
||||
md = metadata.MD{}
|
||||
}
|
||||
tr := otel.Tracer(ztrace.TraceName)
|
||||
name, attr := ztrace.SpanInfo(method, target)
|
||||
ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
|
||||
trace.WithAttributes(attr...))
|
||||
ztrace.Inject(ctx, otel.GetTextMapPropagator(), &md)
|
||||
ctx = metadata.NewOutgoingContext(ctx, md)
|
||||
|
||||
return ctx, span
|
||||
}
|
||||
|
||||
// wrapClientStream wraps s with given ctx and desc.
|
||||
func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
|
||||
events := make(chan streamEvent)
|
||||
eventsDone := make(chan struct{})
|
||||
finished := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer close(eventsDone)
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-events:
|
||||
switch event.Type {
|
||||
case receiveEndEvent:
|
||||
finished <- nil
|
||||
return
|
||||
case errorEvent:
|
||||
finished <- event.Err
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
finished <- ctx.Err()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return &clientStream{
|
||||
ClientStream: s,
|
||||
desc: desc,
|
||||
events: events,
|
||||
eventsDone: eventsDone,
|
||||
Finished: finished,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,9 +9,25 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/trace"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TestOpenTracingInterceptor(t *testing.T) {
|
||||
trace.StartAgent(trace.Config{
|
||||
Name: "go-zero-test",
|
||||
Endpoint: "http://localhost:14268/api/traces",
|
||||
Batcher: "jaeger",
|
||||
Sampler: 1.0,
|
||||
})
|
||||
|
||||
cc := new(grpc.ClientConn)
|
||||
err := UnaryTracingInterceptor(context.Background(), "/ListUser", nil, nil, cc,
|
||||
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
opts ...grpc.CallOption) error {
|
||||
return nil
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestUnaryTracingInterceptor(t *testing.T) {
|
||||
var run int32
|
||||
var wg sync.WaitGroup
|
||||
@@ -50,14 +66,8 @@ func TestUnaryTracingInterceptor_GrpcFormat(t *testing.T) {
|
||||
var run int32
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
md := metadata.New(map[string]string{
|
||||
"foo": "bar",
|
||||
})
|
||||
carrier, err := trace.Inject(trace.GrpcFormat, md)
|
||||
assert.Nil(t, err)
|
||||
ctx, _ := trace.StartServerSpan(context.Background(), carrier, "user", "/foo")
|
||||
cc := new(grpc.ClientConn)
|
||||
err = UnaryTracingInterceptor(ctx, "/foo", nil, nil, cc,
|
||||
err := UnaryTracingInterceptor(context.Background(), "/foo", nil, nil, cc,
|
||||
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
opts ...grpc.CallOption) error {
|
||||
defer wg.Done()
|
||||
@@ -73,14 +83,8 @@ func TestStreamTracingInterceptor_GrpcFormat(t *testing.T) {
|
||||
var run int32
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
md := metadata.New(map[string]string{
|
||||
"foo": "bar",
|
||||
})
|
||||
carrier, err := trace.Inject(trace.GrpcFormat, md)
|
||||
assert.Nil(t, err)
|
||||
ctx, _ := trace.StartServerSpan(context.Background(), carrier, "user", "/foo")
|
||||
cc := new(grpc.ClientConn)
|
||||
_, err = StreamTracingInterceptor(ctx, nil, cc, "/foo",
|
||||
_, err := StreamTracingInterceptor(context.Background(), nil, cc, "/foo",
|
||||
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
|
||||
opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
defer wg.Done()
|
||||
|
||||
@@ -54,17 +54,15 @@ func (s *rpcServer) Start(register RegisterFn) error {
|
||||
}
|
||||
|
||||
unaryInterceptors := []grpc.UnaryServerInterceptor{
|
||||
serverinterceptors.UnaryTracingInterceptor(s.name),
|
||||
serverinterceptors.UnaryOpenTracingInterceptor(),
|
||||
serverinterceptors.UnaryCrashInterceptor(),
|
||||
serverinterceptors.UnaryTracingInterceptor,
|
||||
serverinterceptors.UnaryCrashInterceptor,
|
||||
serverinterceptors.UnaryStatInterceptor(s.metrics),
|
||||
serverinterceptors.UnaryPrometheusInterceptor(),
|
||||
serverinterceptors.UnaryBreakerInterceptor(),
|
||||
serverinterceptors.UnaryPrometheusInterceptor,
|
||||
serverinterceptors.UnaryBreakerInterceptor,
|
||||
}
|
||||
unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
|
||||
streamInterceptors := []grpc.StreamServerInterceptor{
|
||||
serverinterceptors.StreamTracingInterceptor(s.name),
|
||||
serverinterceptors.StreamOpenTracingInterceptor(),
|
||||
serverinterceptors.StreamTracingInterceptor,
|
||||
serverinterceptors.StreamCrashInterceptor,
|
||||
serverinterceptors.StreamBreakerInterceptor,
|
||||
}
|
||||
|
||||
@@ -18,16 +18,14 @@ func StreamBreakerInterceptor(srv interface{}, stream grpc.ServerStream, info *g
|
||||
}
|
||||
|
||||
// UnaryBreakerInterceptor is an interceptor that acts as a circuit breaker.
|
||||
func UnaryBreakerInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
breakerName := info.FullMethod
|
||||
err = breaker.DoWithAcceptable(breakerName, func() error {
|
||||
var err error
|
||||
resp, err = handler(ctx, req)
|
||||
return err
|
||||
}, codes.Acceptable)
|
||||
func UnaryBreakerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
breakerName := info.FullMethod
|
||||
err = breaker.DoWithAcceptable(breakerName, func() error {
|
||||
var err error
|
||||
resp, err = handler(ctx, req)
|
||||
return err
|
||||
}, codes.Acceptable)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@@ -21,8 +21,7 @@ func TestStreamBreakerInterceptor(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUnaryBreakerInterceptor(t *testing.T) {
|
||||
interceptor := UnaryBreakerInterceptor()
|
||||
_, err := interceptor(nil, nil, &grpc.UnaryServerInfo{
|
||||
_, err := UnaryBreakerInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "any",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return nil, status.New(codes.DeadlineExceeded, "any").Err()
|
||||
|
||||
@@ -21,15 +21,13 @@ func StreamCrashInterceptor(srv interface{}, stream grpc.ServerStream, info *grp
|
||||
}
|
||||
|
||||
// UnaryCrashInterceptor catches panics in processing unary requests and recovers.
|
||||
func UnaryCrashInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
defer handleCrash(func(r interface{}) {
|
||||
err = toPanicError(r)
|
||||
})
|
||||
func UnaryCrashInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
defer handleCrash(func(r interface{}) {
|
||||
err = toPanicError(r)
|
||||
})
|
||||
|
||||
return handler(ctx, req)
|
||||
}
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
func handleCrash(handler func(interface{})) {
|
||||
|
||||
@@ -22,8 +22,7 @@ func TestStreamCrashInterceptor(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUnaryCrashInterceptor(t *testing.T) {
|
||||
interceptor := UnaryCrashInterceptor()
|
||||
_, err := interceptor(context.Background(), nil, nil,
|
||||
_, err := UnaryCrashInterceptor(context.Background(), nil, nil,
|
||||
func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
panic("mock panic")
|
||||
})
|
||||
|
||||
@@ -1,82 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/trace/opentelemetry"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/baggage"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
gcodes "google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// UnaryOpenTracingInterceptor returns a grpc.UnaryServerInterceptor for opentelemetry.
|
||||
func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor {
|
||||
propagator := otel.GetTextMapPropagator()
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (interface{}, error) {
|
||||
if !opentelemetry.Enabled() {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
requestMetadata, _ := metadata.FromIncomingContext(ctx)
|
||||
metadataCopy := requestMetadata.Copy()
|
||||
bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy)
|
||||
ctx = baggage.ContextWithBaggage(ctx, bags)
|
||||
tr := otel.Tracer(opentelemetry.TraceName)
|
||||
name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx))
|
||||
ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name,
|
||||
trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
|
||||
defer span.End()
|
||||
|
||||
opentelemetry.MessageReceived.Event(ctx, 1, req)
|
||||
resp, err := handler(ctx, req)
|
||||
if err != nil {
|
||||
s, _ := status.FromError(err)
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
|
||||
opentelemetry.MessageSent.Event(ctx, 1, s.Proto())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
|
||||
opentelemetry.MessageSent.Event(ctx, 1, resp)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
|
||||
// StreamOpenTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry.
|
||||
func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor {
|
||||
propagator := otel.GetTextMapPropagator()
|
||||
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
ctx := ss.Context()
|
||||
if !opentelemetry.Enabled() {
|
||||
return handler(srv, opentelemetry.WrapServerStream(ctx, ss))
|
||||
}
|
||||
|
||||
requestMetadata, _ := metadata.FromIncomingContext(ctx)
|
||||
metadataCopy := requestMetadata.Copy()
|
||||
bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy)
|
||||
ctx = baggage.ContextWithBaggage(ctx, bags)
|
||||
tr := otel.Tracer(opentelemetry.TraceName)
|
||||
name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx))
|
||||
ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name,
|
||||
trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
|
||||
defer span.End()
|
||||
|
||||
if err := handler(srv, opentelemetry.WrapServerStream(ctx, ss)); err != nil {
|
||||
s, _ := status.FromError(err)
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
|
||||
return err
|
||||
}
|
||||
|
||||
span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/trace/opentelemetry"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func TestUnaryOpenTracingInterceptor_Disable(t *testing.T) {
|
||||
interceptor := UnaryOpenTracingInterceptor()
|
||||
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "/",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestUnaryOpenTracingInterceptor_Enabled(t *testing.T) {
|
||||
opentelemetry.StartAgent(opentelemetry.Config{
|
||||
Name: "go-zero-test",
|
||||
Endpoint: "http://localhost:14268/api/traces",
|
||||
Batcher: "jaeger",
|
||||
Sampler: 1.0,
|
||||
})
|
||||
interceptor := UnaryOpenTracingInterceptor()
|
||||
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "/package.TestService.GetUser",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
@@ -33,18 +33,16 @@ var (
|
||||
})
|
||||
)
|
||||
|
||||
// UnaryPrometheusInterceptor returns a func that reports to the prometheus server.
|
||||
func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
|
||||
interface{}, error) {
|
||||
if !prometheus.Enabled() {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
startTime := timex.Now()
|
||||
resp, err := handler(ctx, req)
|
||||
metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
|
||||
metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
|
||||
return resp, err
|
||||
// UnaryPrometheusInterceptor reports the statistics to the prometheus server.
|
||||
func UnaryPrometheusInterceptor(ctx context.Context, req interface{},
|
||||
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
if !prometheus.Enabled() {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
startTime := timex.Now()
|
||||
resp, err := handler(ctx, req)
|
||||
metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
|
||||
metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@@ -10,8 +10,7 @@ import (
|
||||
)
|
||||
|
||||
func TestUnaryPromMetricInterceptor_Disabled(t *testing.T) {
|
||||
interceptor := UnaryPrometheusInterceptor()
|
||||
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
_, err := UnaryPrometheusInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "/",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
@@ -24,8 +23,7 @@ func TestUnaryPromMetricInterceptor_Enabled(t *testing.T) {
|
||||
Host: "localhost",
|
||||
Path: "/",
|
||||
})
|
||||
interceptor := UnaryPrometheusInterceptor()
|
||||
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
_, err := UnaryPrometheusInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "/",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
|
||||
@@ -3,50 +3,116 @@ package serverinterceptors
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/trace"
|
||||
ztrace "github.com/tal-tech/go-zero/core/trace"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/baggage"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
gcodes "google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// UnaryTracingInterceptor returns a grpc.UnaryServerInterceptor
|
||||
// that handles tracing with given service name.
|
||||
func UnaryTracingInterceptor(serviceName string) grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
// UnaryTracingInterceptor is a grpc.UnaryServerInterceptor for opentelemetry.
|
||||
func UnaryTracingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (interface{}, error) {
|
||||
ctx, span := startSpan(ctx, info.FullMethod)
|
||||
defer span.End()
|
||||
|
||||
carrier, err := trace.Extract(trace.GrpcFormat, md)
|
||||
if err != nil {
|
||||
return handler(ctx, req)
|
||||
ztrace.MessageReceived.Event(ctx, 1, req)
|
||||
resp, err := handler(ctx, req)
|
||||
if err != nil {
|
||||
s, ok := status.FromError(err)
|
||||
if ok {
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
|
||||
ztrace.MessageSent.Event(ctx, 1, s.Proto())
|
||||
} else {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
}
|
||||
|
||||
ctx, span := trace.StartServerSpan(ctx, carrier, serviceName, info.FullMethod)
|
||||
defer span.Finish()
|
||||
return handler(ctx, req)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
|
||||
ztrace.MessageSent.Event(ctx, 1, resp)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// StreamTracingInterceptor returns a grpc.StreamServerInterceptor
|
||||
// that handles tracing with given service name.
|
||||
func StreamTracingInterceptor(serviceName string) grpc.StreamServerInterceptor {
|
||||
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
ctx := ss.Context()
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return handler(srv, ss)
|
||||
}
|
||||
// StreamTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry.
|
||||
func StreamTracingInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
ctx, span := startSpan(ss.Context(), info.FullMethod)
|
||||
defer span.End()
|
||||
|
||||
carrier, err := trace.Extract(trace.GrpcFormat, md)
|
||||
if err != nil {
|
||||
return handler(srv, ss)
|
||||
if err := handler(srv, wrapServerStream(ctx, ss)); err != nil {
|
||||
s, ok := status.FromError(err)
|
||||
if ok {
|
||||
span.SetStatus(codes.Error, s.Message())
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))
|
||||
} else {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
_, span := trace.StartServerSpan(ctx, carrier, serviceName, info.FullMethod)
|
||||
defer span.Finish()
|
||||
return handler(srv, ss)
|
||||
span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))
|
||||
return nil
|
||||
}
|
||||
|
||||
// serverStream wraps around the embedded grpc.ServerStream,
|
||||
// and intercepts the RecvMsg and SendMsg method call.
|
||||
type serverStream struct {
|
||||
grpc.ServerStream
|
||||
ctx context.Context
|
||||
receivedMessageID int
|
||||
sentMessageID int
|
||||
}
|
||||
|
||||
func (w *serverStream) Context() context.Context {
|
||||
return w.ctx
|
||||
}
|
||||
|
||||
func (w *serverStream) RecvMsg(m interface{}) error {
|
||||
err := w.ServerStream.RecvMsg(m)
|
||||
if err == nil {
|
||||
w.receivedMessageID++
|
||||
ztrace.MessageReceived.Event(w.Context(), w.receivedMessageID, m)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *serverStream) SendMsg(m interface{}) error {
|
||||
err := w.ServerStream.SendMsg(m)
|
||||
w.sentMessageID++
|
||||
ztrace.MessageSent.Event(w.Context(), w.sentMessageID, m)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func startSpan(ctx context.Context, method string) (context.Context, trace.Span) {
|
||||
var md metadata.MD
|
||||
requestMetadata, ok := metadata.FromIncomingContext(ctx)
|
||||
if ok {
|
||||
md = requestMetadata.Copy()
|
||||
} else {
|
||||
md = metadata.MD{}
|
||||
}
|
||||
bags, spanCtx := ztrace.Extract(ctx, otel.GetTextMapPropagator(), &md)
|
||||
ctx = baggage.ContextWithBaggage(ctx, bags)
|
||||
tr := otel.Tracer(ztrace.TraceName)
|
||||
name, attr := ztrace.SpanInfo(method, ztrace.PeerFromCtx(ctx))
|
||||
|
||||
return tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name,
|
||||
trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
|
||||
}
|
||||
|
||||
// wrapServerStream wraps the given grpc.ServerStream with the given context.
|
||||
func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
|
||||
return &serverStream{
|
||||
ServerStream: ss,
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,17 +7,40 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/trace/tracespec"
|
||||
"github.com/tal-tech/go-zero/core/trace"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TestUnaryOpenTracingInterceptor_Disable(t *testing.T) {
|
||||
_, err := UnaryTracingInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "/",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestUnaryOpenTracingInterceptor_Enabled(t *testing.T) {
|
||||
trace.StartAgent(trace.Config{
|
||||
Name: "go-zero-test",
|
||||
Endpoint: "http://localhost:14268/api/traces",
|
||||
Batcher: "jaeger",
|
||||
Sampler: 1.0,
|
||||
})
|
||||
_, err := UnaryTracingInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "/package.TestService.GetUser",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestUnaryTracingInterceptor(t *testing.T) {
|
||||
interceptor := UnaryTracingInterceptor("foo")
|
||||
var run int32
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
_, err := UnaryTracingInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "/",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
defer wg.Done()
|
||||
@@ -29,49 +52,14 @@ func TestUnaryTracingInterceptor(t *testing.T) {
|
||||
assert.Equal(t, int32(1), atomic.LoadInt32(&run))
|
||||
}
|
||||
|
||||
func TestUnaryTracingInterceptor_GrpcFormat(t *testing.T) {
|
||||
interceptor := UnaryTracingInterceptor("foo")
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
var md metadata.MD
|
||||
ctx := metadata.NewIncomingContext(context.Background(), md)
|
||||
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{
|
||||
FullMethod: "/",
|
||||
}, func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
defer wg.Done()
|
||||
assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).TraceId()) > 0)
|
||||
assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).SpanId()) > 0)
|
||||
return nil, nil
|
||||
})
|
||||
wg.Wait()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestStreamTracingInterceptor(t *testing.T) {
|
||||
interceptor := StreamTracingInterceptor("foo")
|
||||
var run int32
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
err := interceptor(nil, new(mockedServerStream), nil,
|
||||
func(srv interface{}, stream grpc.ServerStream) error {
|
||||
defer wg.Done()
|
||||
atomic.AddInt32(&run, 1)
|
||||
return nil
|
||||
})
|
||||
wg.Wait()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, int32(1), atomic.LoadInt32(&run))
|
||||
}
|
||||
|
||||
func TestStreamTracingInterceptor_GrpcFormat(t *testing.T) {
|
||||
interceptor := StreamTracingInterceptor("foo")
|
||||
var run int32
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
var md metadata.MD
|
||||
ctx := metadata.NewIncomingContext(context.Background(), md)
|
||||
stream := mockedServerStream{ctx: ctx}
|
||||
err := interceptor(nil, &stream, &grpc.StreamServerInfo{
|
||||
err := StreamTracingInterceptor(nil, &stream, &grpc.StreamServerInfo{
|
||||
FullMethod: "/foo",
|
||||
}, func(srv interface{}, stream grpc.ServerStream) error {
|
||||
defer wg.Done()
|
||||
|
||||
@@ -52,7 +52,7 @@ func TestServer(t *testing.T) {
|
||||
}, func(server *grpc.Server) {
|
||||
})
|
||||
srv.AddOptions(grpc.ConnectionTimeout(time.Hour))
|
||||
srv.AddUnaryInterceptors(serverinterceptors.UnaryCrashInterceptor())
|
||||
srv.AddUnaryInterceptors(serverinterceptors.UnaryCrashInterceptor)
|
||||
srv.AddStreamInterceptors(serverinterceptors.StreamCrashInterceptor)
|
||||
go srv.Start()
|
||||
srv.Stop()
|
||||
@@ -94,7 +94,7 @@ func TestServer_HasEtcd(t *testing.T) {
|
||||
}, func(server *grpc.Server) {
|
||||
})
|
||||
srv.AddOptions(grpc.ConnectionTimeout(time.Hour))
|
||||
srv.AddUnaryInterceptors(serverinterceptors.UnaryCrashInterceptor())
|
||||
srv.AddUnaryInterceptors(serverinterceptors.UnaryCrashInterceptor)
|
||||
srv.AddStreamInterceptors(serverinterceptors.StreamCrashInterceptor)
|
||||
go srv.Start()
|
||||
srv.Stop()
|
||||
|
||||
Reference in New Issue
Block a user