diff --git a/.gitignore b/.gitignore index 234a2cda..985ce47c 100644 --- a/.gitignore +++ b/.gitignore @@ -15,9 +15,6 @@ **/.DS_Store **/logs -# ignore adhoc test code -**/adhoc - # gitlab ci .cache diff --git a/core/logx/tracelogger.go b/core/logx/tracelogger.go index 5d48c6de..c02e0c9d 100644 --- a/core/logx/tracelogger.go +++ b/core/logx/tracelogger.go @@ -7,7 +7,6 @@ import ( "time" "github.com/tal-tech/go-zero/core/timex" - "github.com/tal-tech/go-zero/core/trace/tracespec" "go.opentelemetry.io/otel/trace" ) @@ -94,35 +93,19 @@ func WithContext(ctx context.Context) Logger { } func spanIdFromContext(ctx context.Context) string { - span := trace.SpanFromContext(ctx) - if span.IsRecording() { - spanCtx := span.SpanContext() - if spanCtx.IsValid() { - return spanCtx.SpanID().String() - } + spanCtx := trace.SpanContextFromContext(ctx) + if spanCtx.HasSpanID() { + return spanCtx.SpanID().String() } - t, ok := ctx.Value(tracespec.TracingKey).(tracespec.Trace) - if !ok { - return "" - } - - return t.SpanId() + return "" } func traceIdFromContext(ctx context.Context) string { - span := trace.SpanFromContext(ctx) - if span.IsRecording() { - spanCtx := span.SpanContext() - if spanCtx.IsValid() { - return span.SpanContext().TraceID().String() - } + spanCtx := trace.SpanContextFromContext(ctx) + if spanCtx.HasTraceID() { + return spanCtx.TraceID().String() } - t, ok := ctx.Value(tracespec.TracingKey).(tracespec.Trace) - if !ok { - return "" - } - - return t.TraceId() + return "" } diff --git a/core/logx/tracelogger_test.go b/core/logx/tracelogger_test.go index da7be2e7..c05c0f86 100644 --- a/core/logx/tracelogger_test.go +++ b/core/logx/tracelogger_test.go @@ -9,71 +9,90 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/tal-tech/go-zero/core/trace/tracespec" + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) const ( - mockTraceID = "mock-trace-id" - mockSpanID = "mock-span-id" + traceKey = "trace" + spanKey = "span" ) -var mock tracespec.Trace = new(mockTrace) - func TestTraceLog(t *testing.T) { var buf mockWriter atomic.StoreUint32(&initialized, 1) - ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock) + otp := otel.GetTracerProvider() + tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) + otel.SetTracerProvider(tp) + defer otel.SetTracerProvider(otp) + + ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar") WithContext(ctx).(*traceLogger).write(&buf, levelInfo, testlog) - assert.True(t, strings.Contains(buf.String(), mockTraceID)) - assert.True(t, strings.Contains(buf.String(), mockSpanID)) + assert.True(t, strings.Contains(buf.String(), traceKey)) + assert.True(t, strings.Contains(buf.String(), spanKey)) } func TestTraceError(t *testing.T) { var buf mockWriter atomic.StoreUint32(&initialized, 1) errorLog = newLogWriter(log.New(&buf, "", flags)) - ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock) + otp := otel.GetTracerProvider() + tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) + otel.SetTracerProvider(tp) + defer otel.SetTracerProvider(otp) + + ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar") l := WithContext(ctx).(*traceLogger) SetLevel(InfoLevel) l.WithDuration(time.Second).Error(testlog) - assert.True(t, strings.Contains(buf.String(), mockTraceID)) - assert.True(t, strings.Contains(buf.String(), mockSpanID)) + assert.True(t, strings.Contains(buf.String(), traceKey)) + assert.True(t, strings.Contains(buf.String(), spanKey)) buf.Reset() l.WithDuration(time.Second).Errorf(testlog) - assert.True(t, strings.Contains(buf.String(), mockTraceID)) - assert.True(t, strings.Contains(buf.String(), mockSpanID)) + assert.True(t, strings.Contains(buf.String(), traceKey)) + assert.True(t, strings.Contains(buf.String(), spanKey)) } func TestTraceInfo(t *testing.T) { var buf mockWriter atomic.StoreUint32(&initialized, 1) infoLog = newLogWriter(log.New(&buf, "", flags)) - ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock) + otp := otel.GetTracerProvider() + tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) + otel.SetTracerProvider(tp) + defer otel.SetTracerProvider(otp) + + ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar") l := WithContext(ctx).(*traceLogger) SetLevel(InfoLevel) l.WithDuration(time.Second).Info(testlog) - assert.True(t, strings.Contains(buf.String(), mockTraceID)) - assert.True(t, strings.Contains(buf.String(), mockSpanID)) + assert.True(t, strings.Contains(buf.String(), traceKey)) + assert.True(t, strings.Contains(buf.String(), spanKey)) buf.Reset() l.WithDuration(time.Second).Infof(testlog) - assert.True(t, strings.Contains(buf.String(), mockTraceID)) - assert.True(t, strings.Contains(buf.String(), mockSpanID)) + assert.True(t, strings.Contains(buf.String(), traceKey)) + assert.True(t, strings.Contains(buf.String(), spanKey)) } func TestTraceSlow(t *testing.T) { var buf mockWriter atomic.StoreUint32(&initialized, 1) slowLog = newLogWriter(log.New(&buf, "", flags)) - ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock) + otp := otel.GetTracerProvider() + tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) + otel.SetTracerProvider(tp) + defer otel.SetTracerProvider(otp) + + ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar") l := WithContext(ctx).(*traceLogger) SetLevel(InfoLevel) l.WithDuration(time.Second).Slow(testlog) - assert.True(t, strings.Contains(buf.String(), mockTraceID)) - assert.True(t, strings.Contains(buf.String(), mockSpanID)) + assert.True(t, strings.Contains(buf.String(), traceKey)) + assert.True(t, strings.Contains(buf.String(), spanKey)) buf.Reset() l.WithDuration(time.Second).Slowf(testlog) - assert.True(t, strings.Contains(buf.String(), mockTraceID)) - assert.True(t, strings.Contains(buf.String(), mockSpanID)) + assert.True(t, strings.Contains(buf.String(), traceKey)) + assert.True(t, strings.Contains(buf.String(), spanKey)) } func TestTraceWithoutContext(t *testing.T) { @@ -83,34 +102,10 @@ func TestTraceWithoutContext(t *testing.T) { l := WithContext(context.Background()).(*traceLogger) SetLevel(InfoLevel) l.WithDuration(time.Second).Info(testlog) - assert.False(t, strings.Contains(buf.String(), mockTraceID)) - assert.False(t, strings.Contains(buf.String(), mockSpanID)) + assert.False(t, strings.Contains(buf.String(), traceKey)) + assert.False(t, strings.Contains(buf.String(), spanKey)) buf.Reset() l.WithDuration(time.Second).Infof(testlog) - assert.False(t, strings.Contains(buf.String(), mockTraceID)) - assert.False(t, strings.Contains(buf.String(), mockSpanID)) -} - -type mockTrace struct{} - -func (t mockTrace) TraceId() string { - return mockTraceID -} - -func (t mockTrace) SpanId() string { - return mockSpanID -} - -func (t mockTrace) Finish() { -} - -func (t mockTrace) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) { - return nil, nil -} - -func (t mockTrace) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) { - return nil, nil -} - -func (t mockTrace) Visit(fn func(key, val string) bool) { + assert.False(t, strings.Contains(buf.String(), traceKey)) + assert.False(t, strings.Contains(buf.String(), spanKey)) } diff --git a/core/service/serviceconf.go b/core/service/serviceconf.go index 6496aab1..577b0663 100644 --- a/core/service/serviceconf.go +++ b/core/service/serviceconf.go @@ -7,7 +7,7 @@ import ( "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/prometheus" "github.com/tal-tech/go-zero/core/stat" - "github.com/tal-tech/go-zero/core/trace/opentelemetry" + "github.com/tal-tech/go-zero/core/trace" ) const ( @@ -27,10 +27,10 @@ const ( type ServiceConf struct { Name string Log logx.LogConf - Mode string `json:",default=pro,options=dev|test|rt|pre|pro"` - MetricsUrl string `json:",optional"` - Prometheus prometheus.Config `json:",optional"` - Telemetry opentelemetry.Config `json:",optional"` + Mode string `json:",default=pro,options=dev|test|rt|pre|pro"` + MetricsUrl string `json:",optional"` + Prometheus prometheus.Config `json:",optional"` + Telemetry trace.Config `json:",optional"` } // MustSetUp sets up the service, exits on error. @@ -55,7 +55,7 @@ func (sc ServiceConf) SetUp() error { if len(sc.Telemetry.Name) == 0 { sc.Telemetry.Name = sc.Name } - opentelemetry.StartAgent(sc.Telemetry) + trace.StartAgent(sc.Telemetry) if len(sc.MetricsUrl) > 0 { stat.SetReportWriter(stat.NewRemoteWriter(sc.MetricsUrl)) diff --git a/core/trace/agent.go b/core/trace/agent.go new file mode 100644 index 00000000..af4f046f --- /dev/null +++ b/core/trace/agent.go @@ -0,0 +1,69 @@ +package trace + +import ( + "fmt" + "sync" + + "github.com/tal-tech/go-zero/core/logx" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/exporters/zipkin" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" +) + +const ( + kindJaeger = "jaeger" + kindZipkin = "zipkin" +) + +var once sync.Once + +// StartAgent starts a opentelemetry agent. +func StartAgent(c Config) { + once.Do(func() { + startAgent(c) + }) +} + +func createExporter(c Config) (sdktrace.SpanExporter, error) { + // Just support jaeger now, more for later + switch c.Batcher { + case kindJaeger: + return jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(c.Endpoint))) + case kindZipkin: + return zipkin.New(c.Endpoint) + default: + return nil, fmt.Errorf("unknown exporter: %s", c.Batcher) + } +} + +func startAgent(c Config) { + opts := []sdktrace.TracerProviderOption{ + // Set the sampling rate based on the parent span to 100% + sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))), + // Record information about this application in an Resource. + sdktrace.WithResource(resource.NewSchemaless(semconv.ServiceNameKey.String(c.Name))), + } + + if len(c.Endpoint) > 0 { + exp, err := createExporter(c) + if err != nil { + logx.Error(err) + return + } + + // Always be sure to batch in production. + opts = append(opts, sdktrace.WithBatcher(exp)) + } + + tp := sdktrace.NewTracerProvider(opts...) + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, propagation.Baggage{})) + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + logx.Errorf("[otel] error: %v", err) + })) +} diff --git a/core/trace/opentelemetry/attributes.go b/core/trace/attributes.go similarity index 98% rename from core/trace/opentelemetry/attributes.go rename to core/trace/attributes.go index 45bfd0ea..a0c9dfd3 100644 --- a/core/trace/opentelemetry/attributes.go +++ b/core/trace/attributes.go @@ -1,4 +1,4 @@ -package opentelemetry +package trace import ( "go.opentelemetry.io/otel/attribute" diff --git a/core/trace/carrier.go b/core/trace/carrier.go deleted file mode 100644 index 4308b8a9..00000000 --- a/core/trace/carrier.go +++ /dev/null @@ -1,43 +0,0 @@ -package trace - -import ( - "errors" - "net/http" - "strings" -) - -// ErrInvalidCarrier indicates an error that the carrier is invalid. -var ErrInvalidCarrier = errors.New("invalid carrier") - -type ( - // Carrier interface wraps the Get and Set method. - Carrier interface { - Get(key string) string - Set(key, value string) - } - - httpCarrier http.Header - // grpc metadata takes keys as case insensitive - grpcCarrier map[string][]string -) - -func (h httpCarrier) Get(key string) string { - return http.Header(h).Get(key) -} - -func (h httpCarrier) Set(key, val string) { - http.Header(h).Set(key, val) -} - -func (g grpcCarrier) Get(key string) string { - if vals, ok := g[strings.ToLower(key)]; ok && len(vals) > 0 { - return vals[0] - } - - return "" -} - -func (g grpcCarrier) Set(key, val string) { - key = strings.ToLower(key) - g[key] = append(g[key], val) -} diff --git a/core/trace/carrier_test.go b/core/trace/carrier_test.go deleted file mode 100644 index 4d9f0326..00000000 --- a/core/trace/carrier_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package trace - -import ( - "net/http" - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/tal-tech/go-zero/core/stringx" -) - -func TestHttpCarrier(t *testing.T) { - tests := []map[string]string{ - {}, - { - "first": "a", - "second": "b", - }, - } - - for _, test := range tests { - t.Run(stringx.RandId(), func(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "http://localhost", nil) - carrier := httpCarrier(req.Header) - for k, v := range test { - carrier.Set(k, v) - } - for k, v := range test { - assert.Equal(t, v, carrier.Get(k)) - } - assert.Equal(t, "", carrier.Get("none")) - }) - } -} - -func TestGrpcCarrier(t *testing.T) { - tests := []map[string]string{ - {}, - { - "first": "a", - "second": "b", - }, - } - - for _, test := range tests { - t.Run(stringx.RandId(), func(t *testing.T) { - m := make(map[string][]string) - carrier := grpcCarrier(m) - for k, v := range test { - carrier.Set(k, v) - } - for k, v := range test { - assert.Equal(t, v, carrier.Get(k)) - } - assert.Equal(t, "", carrier.Get("none")) - }) - } -} diff --git a/core/trace/opentelemetry/config.go b/core/trace/config.go similarity index 75% rename from core/trace/opentelemetry/config.go rename to core/trace/config.go index 0d3b2689..ff18bca1 100644 --- a/core/trace/opentelemetry/config.go +++ b/core/trace/config.go @@ -1,4 +1,4 @@ -package opentelemetry +package trace // TraceName represents the tracing name. const TraceName = "go-zero" @@ -8,5 +8,5 @@ type Config struct { Name string `json:",optional"` Endpoint string `json:",optional"` Sampler float64 `json:",default=1.0"` - Batcher string `json:",default=jaeger"` + Batcher string `json:",default=jaeger,options=jaeger|zipkin"` } diff --git a/core/trace/constants.go b/core/trace/constants.go deleted file mode 100644 index 33be4a19..00000000 --- a/core/trace/constants.go +++ /dev/null @@ -1,8 +0,0 @@ -package trace - -const ( - // TraceIdKey is the trace id header. - TraceIdKey = "X-Trace-ID" - - spanIdKey = "X-Span-ID" -) diff --git a/core/trace/opentelemetry/message.go b/core/trace/message.go similarity index 97% rename from core/trace/opentelemetry/message.go rename to core/trace/message.go index f637dbfa..de0c5f90 100644 --- a/core/trace/opentelemetry/message.go +++ b/core/trace/message.go @@ -1,4 +1,4 @@ -package opentelemetry +package trace import ( "context" diff --git a/core/trace/noop.go b/core/trace/noop.go deleted file mode 100644 index 5df47fc3..00000000 --- a/core/trace/noop.go +++ /dev/null @@ -1,33 +0,0 @@ -package trace - -import ( - "context" - - "github.com/tal-tech/go-zero/core/trace/tracespec" -) - -var emptyNoopSpan = noopSpan{} - -type noopSpan struct{} - -func (s noopSpan) Finish() { -} - -func (s noopSpan) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) { - return ctx, emptyNoopSpan -} - -func (s noopSpan) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) { - return ctx, emptyNoopSpan -} - -func (s noopSpan) SpanId() string { - return "" -} - -func (s noopSpan) TraceId() string { - return "" -} - -func (s noopSpan) Visit(fn func(key, val string) bool) { -} diff --git a/core/trace/noop_test.go b/core/trace/noop_test.go deleted file mode 100644 index 10bde427..00000000 --- a/core/trace/noop_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package trace - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestNoopSpan_Fork(t *testing.T) { - ctx, span := emptyNoopSpan.Fork(context.Background(), "", "") - assert.Equal(t, emptyNoopSpan, span) - assert.Equal(t, context.Background(), ctx) -} - -func TestNoopSpan_Follow(t *testing.T) { - ctx, span := emptyNoopSpan.Follow(context.Background(), "", "") - assert.Equal(t, emptyNoopSpan, span) - assert.Equal(t, context.Background(), ctx) -} - -func TestNoopSpan(t *testing.T) { - emptyNoopSpan.Visit(func(key, val string) bool { - assert.Fail(t, "should not go here") - return true - }) - - ctx, span := emptyNoopSpan.Follow(context.Background(), "", "") - assert.Equal(t, context.Background(), ctx) - assert.Equal(t, "", span.TraceId()) - assert.Equal(t, "", span.SpanId()) -} diff --git a/core/trace/opentelemetry/agent.go b/core/trace/opentelemetry/agent.go deleted file mode 100644 index d9825fe2..00000000 --- a/core/trace/opentelemetry/agent.go +++ /dev/null @@ -1,62 +0,0 @@ -package opentelemetry - -import ( - "sync" - - "github.com/tal-tech/go-zero/core/logx" - "github.com/tal-tech/go-zero/core/syncx" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/jaeger" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" -) - -var ( - once sync.Once - enabled syncx.AtomicBool -) - -// Enabled returns if opentelemetry is enabled. -func Enabled() bool { - return enabled.True() -} - -// StartAgent starts a opentelemetry agent. -func StartAgent(c Config) { - once.Do(func() { - if len(c.Endpoint) == 0 { - return - } - - // Just support jaeger now - if c.Batcher != "jaeger" { - return - } - - exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(c.Endpoint))) - if err != nil { - logx.Error(err) - return - } - - tp := sdktrace.NewTracerProvider( - // Set the sampling rate based on the parent span to 100% - sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))), - // Always be sure to batch in production. - sdktrace.WithBatcher(exp), - // Record information about this application in an Resource. - sdktrace.WithResource(resource.NewSchemaless(semconv.ServiceNameKey.String(c.Name))), - ) - - otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, propagation.Baggage{})) - otel.SetErrorHandler(otel.ErrorHandlerFunc(func(e error) { - logx.Errorf("[otel] error: %v", err) - })) - - enabled.Set(true) - }) -} diff --git a/core/trace/opentelemetry/clientstream.go b/core/trace/opentelemetry/clientstream.go deleted file mode 100644 index c6c5456d..00000000 --- a/core/trace/opentelemetry/clientstream.go +++ /dev/null @@ -1,121 +0,0 @@ -package opentelemetry - -import ( - "context" - "io" - - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" -) - -const ( - receiveEndEvent streamEventType = iota - errorEvent -) - -type ( - streamEventType int - - streamEvent struct { - Type streamEventType - Err error - } - - clientStream struct { - grpc.ClientStream - Finished chan error - desc *grpc.StreamDesc - events chan streamEvent - eventsDone chan struct{} - receivedMessageID int - sentMessageID int - } -) - -func (w *clientStream) RecvMsg(m interface{}) error { - err := w.ClientStream.RecvMsg(m) - if err == nil && !w.desc.ServerStreams { - w.sendStreamEvent(receiveEndEvent, nil) - } else if err == io.EOF { - w.sendStreamEvent(receiveEndEvent, nil) - } else if err != nil { - w.sendStreamEvent(errorEvent, err) - } else { - w.receivedMessageID++ - MessageReceived.Event(w.Context(), w.receivedMessageID, m) - } - - return err -} - -func (w *clientStream) SendMsg(m interface{}) error { - err := w.ClientStream.SendMsg(m) - w.sentMessageID++ - MessageSent.Event(w.Context(), w.sentMessageID, m) - if err != nil { - w.sendStreamEvent(errorEvent, err) - } - - return err -} - -func (w *clientStream) Header() (metadata.MD, error) { - md, err := w.ClientStream.Header() - if err != nil { - w.sendStreamEvent(errorEvent, err) - } - - return md, err -} - -func (w *clientStream) CloseSend() error { - err := w.ClientStream.CloseSend() - if err != nil { - w.sendStreamEvent(errorEvent, err) - } - - return err -} - -func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { - select { - case <-w.eventsDone: - case w.events <- streamEvent{Type: eventType, Err: err}: - } -} - -// WrapClientStream wraps s with given ctx and desc. -func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { - events := make(chan streamEvent) - eventsDone := make(chan struct{}) - finished := make(chan error) - - go func() { - defer close(eventsDone) - - for { - select { - case event := <-events: - switch event.Type { - case receiveEndEvent: - finished <- nil - return - case errorEvent: - finished <- event.Err - return - } - case <-ctx.Done(): - finished <- ctx.Err() - return - } - } - }() - - return &clientStream{ - ClientStream: s, - desc: desc, - events: events, - eventsDone: eventsDone, - Finished: finished, - } -} diff --git a/core/trace/opentelemetry/serverstream.go b/core/trace/opentelemetry/serverstream.go deleted file mode 100644 index d67146bc..00000000 --- a/core/trace/opentelemetry/serverstream.go +++ /dev/null @@ -1,47 +0,0 @@ -package opentelemetry - -import ( - "context" - - "google.golang.org/grpc" -) - -// serverStream wraps around the embedded grpc.ServerStream, -// and intercepts the RecvMsg and SendMsg method call. -type serverStream struct { - grpc.ServerStream - ctx context.Context - - receivedMessageID int - sentMessageID int -} - -func (w *serverStream) Context() context.Context { - return w.ctx -} - -func (w *serverStream) RecvMsg(m interface{}) error { - err := w.ServerStream.RecvMsg(m) - if err == nil { - w.receivedMessageID++ - MessageReceived.Event(w.Context(), w.receivedMessageID, m) - } - - return err -} - -func (w *serverStream) SendMsg(m interface{}) error { - err := w.ServerStream.SendMsg(m) - w.sentMessageID++ - MessageSent.Event(w.Context(), w.sentMessageID, m) - - return err -} - -// WrapServerStream wraps the given grpc.ServerStream with the given context. -func WrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream { - return &serverStream{ - ServerStream: ss, - ctx: ctx, - } -} diff --git a/core/trace/propagator.go b/core/trace/propagator.go deleted file mode 100644 index 9d18979d..00000000 --- a/core/trace/propagator.go +++ /dev/null @@ -1,90 +0,0 @@ -package trace - -import ( - "net/http" - - "google.golang.org/grpc/metadata" -) - -const ( - // HttpFormat means http carrier format. - HttpFormat = iota - // GrpcFormat means grpc carrier format. - GrpcFormat -) - -var ( - emptyHttpPropagator httpPropagator - emptyGrpcPropagator grpcPropagator -) - -type ( - // Propagator interface wraps the Extract and Inject methods. - Propagator interface { - Extract(carrier interface{}) (Carrier, error) - Inject(carrier interface{}) (Carrier, error) - } - - httpPropagator struct{} - grpcPropagator struct{} -) - -func (h httpPropagator) Extract(carrier interface{}) (Carrier, error) { - if c, ok := carrier.(http.Header); ok { - return httpCarrier(c), nil - } - - return nil, ErrInvalidCarrier -} - -func (h httpPropagator) Inject(carrier interface{}) (Carrier, error) { - if c, ok := carrier.(http.Header); ok { - return httpCarrier(c), nil - } - - return nil, ErrInvalidCarrier -} - -func (g grpcPropagator) Extract(carrier interface{}) (Carrier, error) { - if c, ok := carrier.(metadata.MD); ok { - return grpcCarrier(c), nil - } - - return nil, ErrInvalidCarrier -} - -func (g grpcPropagator) Inject(carrier interface{}) (Carrier, error) { - if c, ok := carrier.(metadata.MD); ok { - return grpcCarrier(c), nil - } - - return nil, ErrInvalidCarrier -} - -// Extract extracts tracing information from carrier with given format. -func Extract(format, carrier interface{}) (Carrier, error) { - switch v := format.(type) { - case int: - if v == HttpFormat { - return emptyHttpPropagator.Extract(carrier) - } else if v == GrpcFormat { - return emptyGrpcPropagator.Extract(carrier) - } - } - - return nil, ErrInvalidCarrier -} - -// Inject injects tracing information into carrier with given format. -func Inject(format, carrier interface{}) (Carrier, error) { - switch v := format.(type) { - case int: - if v == HttpFormat { - return emptyHttpPropagator.Inject(carrier) - } else if v == GrpcFormat { - return emptyGrpcPropagator.Inject(carrier) - } - } - - return nil, ErrInvalidCarrier -} diff --git a/core/trace/propagator_test.go b/core/trace/propagator_test.go deleted file mode 100644 index 4dd4e60f..00000000 --- a/core/trace/propagator_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package trace - -import ( - "net/http" - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/assert" - "google.golang.org/grpc/metadata" -) - -func TestHttpPropagator_Extract(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "http://localhost", nil) - req.Header.Set(TraceIdKey, "trace") - req.Header.Set(spanIdKey, "span") - carrier, err := Extract(HttpFormat, req.Header) - assert.Nil(t, err) - assert.Equal(t, "trace", carrier.Get(TraceIdKey)) - assert.Equal(t, "span", carrier.Get(spanIdKey)) - - _, err = Extract(HttpFormat, req) - assert.Equal(t, ErrInvalidCarrier, err) -} - -func TestHttpPropagator_Inject(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "http://localhost", nil) - req.Header.Set(TraceIdKey, "trace") - req.Header.Set(spanIdKey, "span") - carrier, err := Inject(HttpFormat, req.Header) - assert.Nil(t, err) - assert.Equal(t, "trace", carrier.Get(TraceIdKey)) - assert.Equal(t, "span", carrier.Get(spanIdKey)) - - _, err = Inject(HttpFormat, req) - assert.Equal(t, ErrInvalidCarrier, err) -} - -func TestGrpcPropagator_Extract(t *testing.T) { - md := metadata.New(map[string]string{ - TraceIdKey: "trace", - spanIdKey: "span", - }) - carrier, err := Extract(GrpcFormat, md) - assert.Nil(t, err) - assert.Equal(t, "trace", carrier.Get(TraceIdKey)) - assert.Equal(t, "span", carrier.Get(spanIdKey)) - - _, err = Extract(GrpcFormat, 1) - assert.Equal(t, ErrInvalidCarrier, err) - _, err = Extract(nil, 1) - assert.Equal(t, ErrInvalidCarrier, err) -} - -func TestGrpcPropagator_Inject(t *testing.T) { - md := metadata.New(map[string]string{ - TraceIdKey: "trace", - spanIdKey: "span", - }) - carrier, err := Inject(GrpcFormat, md) - assert.Nil(t, err) - assert.Equal(t, "trace", carrier.Get(TraceIdKey)) - assert.Equal(t, "span", carrier.Get(spanIdKey)) - - _, err = Inject(GrpcFormat, 1) - assert.Equal(t, ErrInvalidCarrier, err) - _, err = Inject(nil, 1) - assert.Equal(t, ErrInvalidCarrier, err) -} diff --git a/core/trace/span.go b/core/trace/span.go deleted file mode 100644 index 1174283e..00000000 --- a/core/trace/span.go +++ /dev/null @@ -1,150 +0,0 @@ -package trace - -import ( - "context" - "fmt" - "strconv" - "strings" - "time" - - "github.com/tal-tech/go-zero/core/stringx" - "github.com/tal-tech/go-zero/core/timex" - "github.com/tal-tech/go-zero/core/trace/tracespec" -) - -const ( - initSpanId = "0" - clientFlag = "client" - serverFlag = "server" - spanSepRune = '.' -) - -var spanSep = string([]byte{spanSepRune}) - -// A Span is a calling span that connects caller and callee. -type Span struct { - ctx spanContext - serviceName string - operationName string - startTime time.Time - flag string - children int -} - -func newServerSpan(carrier Carrier, serviceName, operationName string) tracespec.Trace { - traceId := stringx.TakeWithPriority(func() string { - if carrier != nil { - return carrier.Get(TraceIdKey) - } - return "" - }, stringx.RandId) - spanId := stringx.TakeWithPriority(func() string { - if carrier != nil { - return carrier.Get(spanIdKey) - } - return "" - }, func() string { - return initSpanId - }) - - return &Span{ - ctx: spanContext{ - traceId: traceId, - spanId: spanId, - }, - serviceName: serviceName, - operationName: operationName, - startTime: timex.Time(), - flag: serverFlag, - } -} - -// Finish finishes the calling span. -func (s *Span) Finish() { -} - -// Follow follows the tracing service and operation names in context. -func (s *Span) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) { - span := &Span{ - ctx: spanContext{ - traceId: s.ctx.traceId, - spanId: s.followSpanId(), - }, - serviceName: serviceName, - operationName: operationName, - startTime: timex.Time(), - flag: s.flag, - } - return context.WithValue(ctx, tracespec.TracingKey, span), span -} - -// Fork forks the tracing service and operation names in context. -func (s *Span) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) { - span := &Span{ - ctx: spanContext{ - traceId: s.ctx.traceId, - spanId: s.forkSpanId(), - }, - serviceName: serviceName, - operationName: operationName, - startTime: timex.Time(), - flag: clientFlag, - } - return context.WithValue(ctx, tracespec.TracingKey, span), span -} - -// SpanId returns the span id. -func (s *Span) SpanId() string { - return s.ctx.SpanId() -} - -// TraceId returns the trace id. -func (s *Span) TraceId() string { - return s.ctx.TraceId() -} - -// Visit visits the span using fn. -func (s *Span) Visit(fn func(key, val string) bool) { - s.ctx.Visit(fn) -} - -func (s *Span) forkSpanId() string { - s.children++ - return fmt.Sprintf("%s.%d", s.ctx.spanId, s.children) -} - -func (s *Span) followSpanId() string { - fields := strings.FieldsFunc(s.ctx.spanId, func(r rune) bool { - return r == spanSepRune - }) - if len(fields) == 0 { - return s.ctx.spanId - } - - last := fields[len(fields)-1] - val, err := strconv.Atoi(last) - if err != nil { - return s.ctx.spanId - } - - last = strconv.Itoa(val + 1) - fields[len(fields)-1] = last - - return strings.Join(fields, spanSep) -} - -// StartClientSpan starts the client span with given context, service and operation names. -func StartClientSpan(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) { - if span, ok := ctx.Value(tracespec.TracingKey).(*Span); ok { - return span.Fork(ctx, serviceName, operationName) - } - - return ctx, emptyNoopSpan -} - -// StartServerSpan starts the server span with given context, carrier, service and operation names. -func StartServerSpan(ctx context.Context, carrier Carrier, serviceName, operationName string) ( - context.Context, tracespec.Trace) { - span := newServerSpan(carrier, serviceName, operationName) - return context.WithValue(ctx, tracespec.TracingKey, span), span -} diff --git a/core/trace/span_test.go b/core/trace/span_test.go deleted file mode 100644 index 8323b226..00000000 --- a/core/trace/span_test.go +++ /dev/null @@ -1,139 +0,0 @@ -package trace - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/tal-tech/go-zero/core/stringx" - "github.com/tal-tech/go-zero/core/trace/tracespec" - "google.golang.org/grpc/metadata" -) - -func TestClientSpan(t *testing.T) { - span := newServerSpan(nil, "service", "operation") - ctx := context.WithValue(context.Background(), tracespec.TracingKey, span) - ctx, span = StartClientSpan(ctx, "entrance", "operation") - defer span.Finish() - assert.Equal(t, span, ctx.Value(tracespec.TracingKey)) - - const serviceName = "authorization" - const operationName = "verification" - ctx, childSpan := span.Fork(ctx, serviceName, operationName) - defer childSpan.Finish() - - assert.Equal(t, childSpan, ctx.Value(tracespec.TracingKey)) - assert.Equal(t, getSpan(span).TraceId(), getSpan(childSpan).TraceId()) - assert.Equal(t, "0.1.1", getSpan(childSpan).SpanId()) - assert.Equal(t, serviceName, childSpan.(*Span).serviceName) - assert.Equal(t, operationName, childSpan.(*Span).operationName) - assert.Equal(t, clientFlag, childSpan.(*Span).flag) -} - -func TestClientSpan_WithoutTrace(t *testing.T) { - ctx, span := StartClientSpan(context.Background(), "entrance", "operation") - defer span.Finish() - assert.Equal(t, emptyNoopSpan, span) - assert.Equal(t, context.Background(), ctx) -} - -func TestServerSpan(t *testing.T) { - ctx, span := StartServerSpan(context.Background(), nil, "service", "operation") - defer span.Finish() - assert.Equal(t, span, ctx.Value(tracespec.TracingKey)) - - const serviceName = "authorization" - const operationName = "verification" - ctx, childSpan := span.Fork(ctx, serviceName, operationName) - defer childSpan.Finish() - - assert.Equal(t, childSpan, ctx.Value(tracespec.TracingKey)) - assert.Equal(t, getSpan(span).TraceId(), getSpan(childSpan).TraceId()) - assert.Equal(t, "0.1", getSpan(childSpan).SpanId()) - assert.Equal(t, serviceName, childSpan.(*Span).serviceName) - assert.Equal(t, operationName, childSpan.(*Span).operationName) - assert.Equal(t, clientFlag, childSpan.(*Span).flag) -} - -func TestServerSpan_WithCarrier(t *testing.T) { - md := metadata.New(map[string]string{ - TraceIdKey: "a", - spanIdKey: "0.1", - }) - ctx, span := StartServerSpan(context.Background(), grpcCarrier(md), "service", "operation") - defer span.Finish() - assert.Equal(t, span, ctx.Value(tracespec.TracingKey)) - - const serviceName = "authorization" - const operationName = "verification" - ctx, childSpan := span.Fork(ctx, serviceName, operationName) - defer childSpan.Finish() - - assert.Equal(t, childSpan, ctx.Value(tracespec.TracingKey)) - assert.Equal(t, getSpan(span).TraceId(), getSpan(childSpan).TraceId()) - assert.Equal(t, "0.1.1", getSpan(childSpan).SpanId()) - assert.Equal(t, serviceName, childSpan.(*Span).serviceName) - assert.Equal(t, operationName, childSpan.(*Span).operationName) - assert.Equal(t, clientFlag, childSpan.(*Span).flag) -} - -func TestSpan_Follow(t *testing.T) { - tests := []struct { - span string - expectSpan string - }{ - { - "0.1", - "0.2", - }, - { - "0", - "1", - }, - { - "a", - "a", - }, - } - - for _, test := range tests { - t.Run(stringx.RandId(), func(t *testing.T) { - md := metadata.New(map[string]string{ - TraceIdKey: "a", - spanIdKey: test.span, - }) - ctx, span := StartServerSpan(context.Background(), grpcCarrier(md), - "service", "operation") - defer span.Finish() - assert.Equal(t, span, ctx.Value(tracespec.TracingKey)) - - const serviceName = "authorization" - const operationName = "verification" - ctx, childSpan := span.Follow(ctx, serviceName, operationName) - defer childSpan.Finish() - - assert.Equal(t, childSpan, ctx.Value(tracespec.TracingKey)) - assert.Equal(t, getSpan(span).TraceId(), getSpan(childSpan).TraceId()) - assert.Equal(t, test.expectSpan, getSpan(childSpan).SpanId()) - assert.Equal(t, serviceName, childSpan.(*Span).serviceName) - assert.Equal(t, operationName, childSpan.(*Span).operationName) - assert.Equal(t, span.(*Span).flag, childSpan.(*Span).flag) - }) - } -} - -func TestSpan_Visit(t *testing.T) { - var run bool - span := newServerSpan(nil, "service", "operation") - span.Visit(func(key, val string) bool { - assert.True(t, len(key) > 0) - assert.True(t, len(val) > 0) - run = true - return true - }) - assert.True(t, run) -} - -func getSpan(span tracespec.Trace) tracespec.Trace { - return span.(*Span) -} diff --git a/core/trace/spancontext.go b/core/trace/spancontext.go deleted file mode 100644 index 7fc82257..00000000 --- a/core/trace/spancontext.go +++ /dev/null @@ -1,19 +0,0 @@ -package trace - -type spanContext struct { - traceId string - spanId string -} - -func (sc spanContext) TraceId() string { - return sc.traceId -} - -func (sc spanContext) SpanId() string { - return sc.spanId -} - -func (sc spanContext) Visit(fn func(key, val string) bool) { - fn(TraceIdKey, sc.traceId) - fn(spanIdKey, sc.spanId) -} diff --git a/core/trace/opentelemetry/tracer.go b/core/trace/tracer.go similarity index 98% rename from core/trace/opentelemetry/tracer.go rename to core/trace/tracer.go index f70021e0..109c1ba2 100644 --- a/core/trace/opentelemetry/tracer.go +++ b/core/trace/tracer.go @@ -1,4 +1,4 @@ -package opentelemetry +package trace import ( "context" diff --git a/core/trace/opentelemetry/tracer_test.go b/core/trace/tracer_test.go similarity index 99% rename from core/trace/opentelemetry/tracer_test.go rename to core/trace/tracer_test.go index cfe43d33..8f0085ff 100644 --- a/core/trace/opentelemetry/tracer_test.go +++ b/core/trace/tracer_test.go @@ -1,4 +1,4 @@ -package opentelemetry +package trace import ( "context" diff --git a/core/trace/tracespec/keys.go b/core/trace/tracespec/keys.go deleted file mode 100644 index c6da8c79..00000000 --- a/core/trace/tracespec/keys.go +++ /dev/null @@ -1,12 +0,0 @@ -package tracespec - -// TracingKey is tracing key for context -var TracingKey = contextKey("X-Trace") - -// contextKey a type for context key -type contextKey string - -// String returns a context will reveal a fair amount of information about it. -func (c contextKey) String() string { - return "trace/tracespec context key " + string(c) -} diff --git a/core/trace/tracespec/spancontext.go b/core/trace/tracespec/spancontext.go deleted file mode 100644 index cb91d888..00000000 --- a/core/trace/tracespec/spancontext.go +++ /dev/null @@ -1,8 +0,0 @@ -package tracespec - -// SpanContext interface that represents a span context. -type SpanContext interface { - TraceId() string - SpanId() string - Visit(fn func(key, val string) bool) -} diff --git a/core/trace/tracespec/trace.go b/core/trace/tracespec/trace.go deleted file mode 100644 index b0576ce2..00000000 --- a/core/trace/tracespec/trace.go +++ /dev/null @@ -1,11 +0,0 @@ -package tracespec - -import "context" - -// Trace interface represents a tracing. -type Trace interface { - SpanContext - Finish() - Fork(ctx context.Context, serviceName, operationName string) (context.Context, Trace) - Follow(ctx context.Context, serviceName, operationName string) (context.Context, Trace) -} diff --git a/core/trace/opentelemetry/utils.go b/core/trace/utils.go similarity index 98% rename from core/trace/opentelemetry/utils.go rename to core/trace/utils.go index e1074c13..9477375e 100644 --- a/core/trace/opentelemetry/utils.go +++ b/core/trace/utils.go @@ -1,4 +1,4 @@ -package opentelemetry +package trace import ( "context" diff --git a/core/trace/opentelemetry/utils_test.go b/core/trace/utils_test.go similarity index 98% rename from core/trace/opentelemetry/utils_test.go rename to core/trace/utils_test.go index a064a329..e6bb9590 100644 --- a/core/trace/opentelemetry/utils_test.go +++ b/core/trace/utils_test.go @@ -1,4 +1,4 @@ -package opentelemetry +package trace import ( "testing" diff --git a/core/trace/vars.go b/core/trace/vars.go new file mode 100644 index 00000000..d8cee290 --- /dev/null +++ b/core/trace/vars.go @@ -0,0 +1,8 @@ +package trace + +import "net/http" + +// TraceIdKey is the trace id header. +// https://www.w3.org/TR/trace-context/#trace-id +// May change it to trace-id afterwards. +var TraceIdKey = http.CanonicalHeaderKey("x-trace-id") diff --git a/go.mod b/go.mod index de6d68e2..369674db 100644 --- a/go.mod +++ b/go.mod @@ -35,13 +35,14 @@ require ( github.com/zeromicro/protobuf v0.0.0-20210921042113-636cd51f0c35 go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 - go.opentelemetry.io/otel v1.0.0 - go.opentelemetry.io/otel/exporters/jaeger v1.0.0 - go.opentelemetry.io/otel/sdk v1.0.0 - go.opentelemetry.io/otel/trace v1.0.0 + go.opentelemetry.io/otel v1.0.1 + go.opentelemetry.io/otel/exporters/jaeger v1.0.1 + go.opentelemetry.io/otel/exporters/zipkin v1.0.1 + go.opentelemetry.io/otel/sdk v1.0.1 + go.opentelemetry.io/otel/trace v1.0.1 go.uber.org/automaxprocs v1.3.0 golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b // indirect - golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 // indirect + golang.org/x/sys v0.0.0-20211002104244-808efd93c36d // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac google.golang.org/genproto v0.0.0-20210928142010-c7af6a1a74c9 // indirect diff --git a/go.sum b/go.sum index 481ba643..7f8cd559 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -88,6 +90,9 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/proto v1.9.0 h1:l0QiNT6Qs7Yj0Mb4X6dnWBQer4ebei2BFcgQLbGqUDc= @@ -178,6 +183,7 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -207,6 +213,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I= github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= @@ -281,12 +289,17 @@ github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/openzipkin/zipkin-go v0.2.5 h1:UwtQQx2pyPIgWYHRg+epgdx1/HnBQTgN3/oIYEJTQzU= +github.com/openzipkin/zipkin-go v0.2.5/go.mod h1:KpXfKdgRDnnhsxw4pNIH9Md5lyFqKUa4YDFlwRYAMyE= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.1+incompatible h1:Yq0up0149Hh5Ekhm/91lgkZuD1ZDnXNM26bycpTzYBM= github.com/pierrec/lz4 v2.5.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -294,6 +307,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -315,6 +329,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -331,6 +346,7 @@ github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTd github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= @@ -365,14 +381,16 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI= -go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= -go.opentelemetry.io/otel/exporters/jaeger v1.0.0 h1:cLhx8llHw02h5JTqGqaRbYn+QVKHmrzD9vEbKnSPk5U= -go.opentelemetry.io/otel/exporters/jaeger v1.0.0/go.mod h1:q10N1AolE1JjqKrFJK2tYw0iZpmX+HBaXBtuCzRnBGQ= -go.opentelemetry.io/otel/sdk v1.0.0 h1:BNPMYUONPNbLneMttKSjQhOTlFLOD9U22HNG1KrIN2Y= -go.opentelemetry.io/otel/sdk v1.0.0/go.mod h1:PCrDHlSy5x1kjezSdL37PhbFUMjrsLRshJ2zCzeXwbM= -go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4= -go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs= +go.opentelemetry.io/otel v1.0.1 h1:4XKyXmfqJLOQ7feyV5DB6gsBFZ0ltB8vLtp6pj4JIcc= +go.opentelemetry.io/otel v1.0.1/go.mod h1:OPEOD4jIT2SlZPMmwT6FqZz2C0ZNdQqiWcoK6M0SNFU= +go.opentelemetry.io/otel/exporters/jaeger v1.0.1 h1:fg9udWIWWJMAT+Gq2ATFd/DFy3OZvKEZy9VK2amxvkw= +go.opentelemetry.io/otel/exporters/jaeger v1.0.1/go.mod h1:85Ym3qknJdIdfRzYS9Ofy9NeLi9gKPFzFDBEHCKpfXI= +go.opentelemetry.io/otel/exporters/zipkin v1.0.1 h1:Li6OvM1Po5qrP+HnXlZa+FyLkMun7JG4R0vTAch12qs= +go.opentelemetry.io/otel/exporters/zipkin v1.0.1/go.mod h1:KXb2W6IVINSd/rKugSARqP3TsByxngvea3B1vm5ju74= +go.opentelemetry.io/otel/sdk v1.0.1 h1:wXxFEWGo7XfXupPwVJvTBOaPBC9FEg0wB8hMNrKk+cA= +go.opentelemetry.io/otel/sdk v1.0.1/go.mod h1:HrdXne+BiwsOHYYkBE5ysIcv2bvdZstxzmCQhxTcZkI= +go.opentelemetry.io/otel/trace v1.0.1 h1:StTeIH6Q3G4r0Fiw34LTokUFESZgIDUr0qIJ7mKmAfw= +go.opentelemetry.io/otel/trace v1.0.1/go.mod h1:5g4i4fKLaX2BQpSBsxw8YYcgKpMMSW3x7ZTuYBr3sUk= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -509,8 +527,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 h1:foEbQz/B0Oz6YIqu/69kfXPYeFQAuuMYFkjaqXzl5Wo= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211002104244-808efd93c36d h1:SABT8Vei3iTiu+Gy8KOzpSNz+W1EQ5YBCRtiEETxF+0= +golang.org/x/sys v0.0.0-20211002104244-808efd93c36d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -614,6 +632,7 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= diff --git a/rest/engine.go b/rest/engine.go index 4ed8e0cd..216d3cad 100644 --- a/rest/engine.go +++ b/rest/engine.go @@ -107,8 +107,7 @@ func (s *engine) bindFeaturedRoutes(router httpx.Router, fr featuredRoutes, metr func (s *engine) bindRoute(fr featuredRoutes, router httpx.Router, metrics *stat.Metrics, route Route, verifier func(chain alice.Chain) alice.Chain) error { chain := alice.New( - handler.TracingHandler, - handler.OtelHandler(s.conf.Name, route.Path), + handler.TracingHandler(s.conf.Name, route.Path), s.getLogHandler(), handler.PrometheusHandler(route.Path), handler.MaxConns(s.conf.MaxConns), diff --git a/rest/handler/otelhandler.go b/rest/handler/otelhandler.go deleted file mode 100644 index 9f41d484..00000000 --- a/rest/handler/otelhandler.go +++ /dev/null @@ -1,37 +0,0 @@ -package handler - -import ( - "net/http" - - "github.com/tal-tech/go-zero/core/trace/opentelemetry" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - oteltrace "go.opentelemetry.io/otel/trace" -) - -// OtelHandler return a middleware that process the opentelemetry. -func OtelHandler(serviceName, path string) func(http.Handler) http.Handler { - return func(next http.Handler) http.Handler { - if !opentelemetry.Enabled() { - return next - } - - propagator := otel.GetTextMapPropagator() - tracer := otel.GetTracerProvider().Tracer(opentelemetry.TraceName) - - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header)) - spanCtx, span := tracer.Start( - ctx, - path, - oteltrace.WithSpanKind(oteltrace.SpanKindServer), - oteltrace.WithAttributes(semconv.HTTPServerAttributesFromHTTPRequest( - serviceName, path, r)...), - ) - defer span.End() - - next.ServeHTTP(w, r.WithContext(spanCtx)) - }) - } -} diff --git a/rest/handler/otelhandler_test.go b/rest/handler/otelhandler_test.go deleted file mode 100644 index 8812046d..00000000 --- a/rest/handler/otelhandler_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package handler - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/tal-tech/go-zero/core/trace/opentelemetry" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/trace" -) - -func TestOtelHandler(t *testing.T) { - opentelemetry.StartAgent(opentelemetry.Config{ - Name: "go-zero-test", - Endpoint: "http://localhost:14268/api/traces", - Batcher: "jaeger", - Sampler: 1.0, - }) - - ts := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header)) - spanCtx := trace.SpanContextFromContext(ctx) - assert.Equal(t, true, spanCtx.IsValid()) - }), - ) - defer ts.Close() - - client := ts.Client() - err := func(ctx context.Context) error { - ctx, span := otel.Tracer("httptrace/client").Start(ctx, "test") - defer span.End() - - req, _ := http.NewRequest("GET", ts.URL, nil) - otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header)) - - res, err := client.Do(req) - assert.Equal(t, err, nil) - _ = res.Body.Close() - return nil - }(context.Background()) - - assert.Equal(t, err, nil) -} diff --git a/rest/handler/tracinghandler.go b/rest/handler/tracinghandler.go index 73dbe531..8936e928 100644 --- a/rest/handler/tracinghandler.go +++ b/rest/handler/tracinghandler.go @@ -3,26 +3,37 @@ package handler import ( "net/http" - "github.com/tal-tech/go-zero/core/logx" - "github.com/tal-tech/go-zero/core/sysx" "github.com/tal-tech/go-zero/core/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + oteltrace "go.opentelemetry.io/otel/trace" ) -// TracingHandler returns a middleware that traces the request. -func TracingHandler(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - carrier, err := trace.Extract(trace.HttpFormat, r.Header) - // ErrInvalidCarrier means no trace id was set in http header - if err != nil && err != trace.ErrInvalidCarrier { - logx.Error(err) - } +// TracingHandler return a middleware that process the opentelemetry. +func TracingHandler(serviceName, path string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + propagator := otel.GetTextMapPropagator() + tracer := otel.GetTracerProvider().Tracer(trace.TraceName) - ctx, span := trace.StartServerSpan(r.Context(), carrier, sysx.Hostname(), r.RequestURI) - defer span.Finish() - r = r.WithContext(ctx) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header)) + spanCtx, span := tracer.Start( + ctx, + path, + oteltrace.WithSpanKind(oteltrace.SpanKindServer), + oteltrace.WithAttributes(semconv.HTTPServerAttributesFromHTTPRequest( + serviceName, path, r)...), + ) + defer span.End() - // conveniently tracking error messages - w.Header().Set(trace.TraceIdKey, span.TraceId()) - next.ServeHTTP(w, r) - }) + // convenient for tracking error messages + sc := span.SpanContext() + if sc.HasTraceID() { + w.Header().Set(trace.TraceIdKey, sc.TraceID().String()) + } + + next.ServeHTTP(w, r.WithContext(spanCtx)) + }) + } } diff --git a/rest/handler/tracinghandler_test.go b/rest/handler/tracinghandler_test.go index 0d5e0fbd..f4e0fbb3 100644 --- a/rest/handler/tracinghandler_test.go +++ b/rest/handler/tracinghandler_test.go @@ -1,31 +1,48 @@ package handler import ( + "context" "net/http" "net/http/httptest" "testing" "github.com/stretchr/testify/assert" - "github.com/tal-tech/go-zero/core/stringx" - "github.com/tal-tech/go-zero/core/trace" - "github.com/tal-tech/go-zero/core/trace/tracespec" + ztrace "github.com/tal-tech/go-zero/core/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" ) -func TestTracingHandler(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, "http://localhost", nil) +func TestOtelHandler(t *testing.T) { + ztrace.StartAgent(ztrace.Config{ + Name: "go-zero-test", + Endpoint: "http://localhost:14268/api/traces", + Batcher: "jaeger", + Sampler: 1.0, + }) - traceId := stringx.RandId() - req.Header.Set(trace.TraceIdKey, traceId) + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header)) + spanCtx := trace.SpanContextFromContext(ctx) + assert.Equal(t, true, spanCtx.IsValid()) + }), + ) + defer ts.Close() - handler := TracingHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - span, ok := r.Context().Value(tracespec.TracingKey).(tracespec.Trace) - assert.True(t, ok) - assert.Equal(t, traceId, span.TraceId()) - })) + client := ts.Client() + err := func(ctx context.Context) error { + ctx, span := otel.Tracer("httptrace/client").Start(ctx, "test") + defer span.End() - resp := httptest.NewRecorder() - handler.ServeHTTP(resp, req) + req, _ := http.NewRequest("GET", ts.URL, nil) + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header)) - assert.Equal(t, http.StatusOK, resp.Code) - assert.Equal(t, traceId, resp.Header().Get(trace.TraceIdKey)) + res, err := client.Do(req) + assert.Equal(t, err, nil) + _ = res.Body.Close() + return nil + }(context.Background()) + + assert.Equal(t, err, nil) } diff --git a/zrpc/internal/client.go b/zrpc/internal/client.go index e8a9c28f..4b942598 100644 --- a/zrpc/internal/client.go +++ b/zrpc/internal/client.go @@ -68,7 +68,6 @@ func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption { grpc.WithBlock(), WithUnaryClientInterceptors( clientinterceptors.UnaryTracingInterceptor, - clientinterceptors.UnaryOpenTracingInterceptor(), clientinterceptors.DurationInterceptor, clientinterceptors.PrometheusInterceptor, clientinterceptors.BreakerInterceptor, @@ -76,7 +75,6 @@ func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption { ), WithStreamClientInterceptors( clientinterceptors.StreamTracingInterceptor, - clientinterceptors.StreamOpenTracingInterceptor(), ), } diff --git a/zrpc/internal/clientinterceptors/opentracinginterceptor.go b/zrpc/internal/clientinterceptors/opentracinginterceptor.go deleted file mode 100644 index 04bc55b8..00000000 --- a/zrpc/internal/clientinterceptors/opentracinginterceptor.go +++ /dev/null @@ -1,92 +0,0 @@ -package clientinterceptors - -import ( - "context" - - "github.com/tal-tech/go-zero/core/trace/opentelemetry" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc" - gcodes "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" -) - -// UnaryOpenTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry. -func UnaryOpenTracingInterceptor() grpc.UnaryClientInterceptor { - propagator := otel.GetTextMapPropagator() - return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, - invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if !opentelemetry.Enabled() { - return invoker(ctx, method, req, reply, cc, opts...) - } - - requestMetadata, _ := metadata.FromOutgoingContext(ctx) - metadataCopy := requestMetadata.Copy() - tr := otel.Tracer(opentelemetry.TraceName) - name, attr := opentelemetry.SpanInfo(method, cc.Target()) - ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(attr...)) - defer span.End() - - opentelemetry.Inject(ctx, propagator, &metadataCopy) - ctx = metadata.NewOutgoingContext(ctx, metadataCopy) - opentelemetry.MessageSent.Event(ctx, 1, req) - opentelemetry.MessageReceived.Event(ctx, 1, reply) - - if err := invoker(ctx, method, req, reply, cc, opts...); err != nil { - s, _ := status.FromError(err) - span.SetStatus(codes.Error, s.Message()) - span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) - return err - } - - span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK)) - return nil - } -} - -// StreamOpenTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry. -func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor { - propagator := otel.GetTextMapPropagator() - return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, - streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - if !opentelemetry.Enabled() { - return streamer(ctx, desc, cc, method, opts...) - } - - requestMetadata, _ := metadata.FromOutgoingContext(ctx) - metadataCopy := requestMetadata.Copy() - tr := otel.Tracer(opentelemetry.TraceName) - name, attr := opentelemetry.SpanInfo(method, cc.Target()) - ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(attr...)) - opentelemetry.Inject(ctx, propagator, &metadataCopy) - ctx = metadata.NewOutgoingContext(ctx, metadataCopy) - s, err := streamer(ctx, desc, cc, method, opts...) - if err != nil { - grpcStatus, _ := status.FromError(err) - span.SetStatus(codes.Error, grpcStatus.Message()) - span.SetAttributes(opentelemetry.StatusCodeAttr(grpcStatus.Code())) - span.End() - return s, err - } - - stream := opentelemetry.WrapClientStream(ctx, s, desc) - - go func() { - if err := <-stream.Finished; err != nil { - s, _ := status.FromError(err) - span.SetStatus(codes.Error, s.Message()) - span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) - } else { - span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK)) - } - - span.End() - }() - - return stream, nil - } -} diff --git a/zrpc/internal/clientinterceptors/opentracinginterceptor_test.go b/zrpc/internal/clientinterceptors/opentracinginterceptor_test.go deleted file mode 100644 index 1ab5466c..00000000 --- a/zrpc/internal/clientinterceptors/opentracinginterceptor_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package clientinterceptors - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/tal-tech/go-zero/core/trace/opentelemetry" - "google.golang.org/grpc" -) - -func TestOpenTracingInterceptor(t *testing.T) { - opentelemetry.StartAgent(opentelemetry.Config{ - Name: "go-zero-test", - Endpoint: "http://localhost:14268/api/traces", - Batcher: "jaeger", - Sampler: 1.0, - }) - - cc := new(grpc.ClientConn) - err := UnaryOpenTracingInterceptor()(context.Background(), "/ListUser", nil, nil, cc, - func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, - opts ...grpc.CallOption) error { - return nil - }) - assert.Nil(t, err) -} diff --git a/zrpc/internal/clientinterceptors/tracinginterceptor.go b/zrpc/internal/clientinterceptors/tracinginterceptor.go index abf134c1..6567deed 100644 --- a/zrpc/internal/clientinterceptors/tracinginterceptor.go +++ b/zrpc/internal/clientinterceptors/tracinginterceptor.go @@ -2,40 +2,206 @@ package clientinterceptors import ( "context" + "io" - "github.com/tal-tech/go-zero/core/trace" + ztrace "github.com/tal-tech/go-zero/core/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" + gcodes "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) -// UnaryTracingInterceptor is an interceptor that handles tracing. +const ( + receiveEndEvent streamEventType = iota + errorEvent +) + +// UnaryTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry. func UnaryTracingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ctx, span := trace.StartClientSpan(ctx, cc.Target(), method) - defer span.Finish() + ctx, span := startSpan(ctx, method, cc.Target()) + defer span.End() - var pairs []string - span.Visit(func(key, val string) bool { - pairs = append(pairs, key, val) - return true - }) - ctx = metadata.AppendToOutgoingContext(ctx, pairs...) + ztrace.MessageSent.Event(ctx, 1, req) + ztrace.MessageReceived.Event(ctx, 1, reply) - return invoker(ctx, method, req, reply, cc, opts...) + if err := invoker(ctx, method, req, reply, cc, opts...); err != nil { + s, ok := status.FromError(err) + if ok { + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(ztrace.StatusCodeAttr(s.Code())) + } else { + span.SetStatus(codes.Error, err.Error()) + } + return err + } + + span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK)) + return nil } -// StreamTracingInterceptor is an interceptor that handles tracing for stream requests. +// StreamTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry. func StreamTracingInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - ctx, span := trace.StartClientSpan(ctx, cc.Target(), method) - defer span.Finish() + ctx, span := startSpan(ctx, method, cc.Target()) + s, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + st, ok := status.FromError(err) + if ok { + span.SetStatus(codes.Error, st.Message()) + span.SetAttributes(ztrace.StatusCodeAttr(st.Code())) + } else { + span.SetStatus(codes.Error, err.Error()) + } + span.End() + return s, err + } - var pairs []string - span.Visit(func(key, val string) bool { - pairs = append(pairs, key, val) - return true - }) - ctx = metadata.AppendToOutgoingContext(ctx, pairs...) + stream := wrapClientStream(ctx, s, desc) - return streamer(ctx, desc, cc, method, opts...) + go func() { + if err := <-stream.Finished; err != nil { + s, ok := status.FromError(err) + if ok { + span.SetStatus(codes.Error, s.Message()) + } else { + span.SetStatus(codes.Error, err.Error()) + } + span.SetAttributes(ztrace.StatusCodeAttr(s.Code())) + } else { + span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK)) + } + + span.End() + }() + + return stream, nil +} + +type ( + streamEventType int + + streamEvent struct { + Type streamEventType + Err error + } + + clientStream struct { + grpc.ClientStream + Finished chan error + desc *grpc.StreamDesc + events chan streamEvent + eventsDone chan struct{} + receivedMessageID int + sentMessageID int + } +) + +func (w *clientStream) RecvMsg(m interface{}) error { + err := w.ClientStream.RecvMsg(m) + if err == nil && !w.desc.ServerStreams { + w.sendStreamEvent(receiveEndEvent, nil) + } else if err == io.EOF { + w.sendStreamEvent(receiveEndEvent, nil) + } else if err != nil { + w.sendStreamEvent(errorEvent, err) + } else { + w.receivedMessageID++ + ztrace.MessageReceived.Event(w.Context(), w.receivedMessageID, m) + } + + return err +} + +func (w *clientStream) SendMsg(m interface{}) error { + err := w.ClientStream.SendMsg(m) + w.sentMessageID++ + ztrace.MessageSent.Event(w.Context(), w.sentMessageID, m) + if err != nil { + w.sendStreamEvent(errorEvent, err) + } + + return err +} + +func (w *clientStream) Header() (metadata.MD, error) { + md, err := w.ClientStream.Header() + if err != nil { + w.sendStreamEvent(errorEvent, err) + } + + return md, err +} + +func (w *clientStream) CloseSend() error { + err := w.ClientStream.CloseSend() + if err != nil { + w.sendStreamEvent(errorEvent, err) + } + + return err +} + +func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { + select { + case <-w.eventsDone: + case w.events <- streamEvent{Type: eventType, Err: err}: + } +} + +func startSpan(ctx context.Context, method, target string) (context.Context, trace.Span) { + var md metadata.MD + requestMetadata, ok := metadata.FromOutgoingContext(ctx) + if ok { + md = requestMetadata.Copy() + } else { + md = metadata.MD{} + } + tr := otel.Tracer(ztrace.TraceName) + name, attr := ztrace.SpanInfo(method, target) + ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attr...)) + ztrace.Inject(ctx, otel.GetTextMapPropagator(), &md) + ctx = metadata.NewOutgoingContext(ctx, md) + + return ctx, span +} + +// wrapClientStream wraps s with given ctx and desc. +func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { + events := make(chan streamEvent) + eventsDone := make(chan struct{}) + finished := make(chan error) + + go func() { + defer close(eventsDone) + + for { + select { + case event := <-events: + switch event.Type { + case receiveEndEvent: + finished <- nil + return + case errorEvent: + finished <- event.Err + return + } + case <-ctx.Done(): + finished <- ctx.Err() + return + } + } + }() + + return &clientStream{ + ClientStream: s, + desc: desc, + events: events, + eventsDone: eventsDone, + Finished: finished, + } } diff --git a/zrpc/internal/clientinterceptors/tracinginterceptor_test.go b/zrpc/internal/clientinterceptors/tracinginterceptor_test.go index 1c448d96..5f154950 100644 --- a/zrpc/internal/clientinterceptors/tracinginterceptor_test.go +++ b/zrpc/internal/clientinterceptors/tracinginterceptor_test.go @@ -9,9 +9,25 @@ import ( "github.com/stretchr/testify/assert" "github.com/tal-tech/go-zero/core/trace" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" ) +func TestOpenTracingInterceptor(t *testing.T) { + trace.StartAgent(trace.Config{ + Name: "go-zero-test", + Endpoint: "http://localhost:14268/api/traces", + Batcher: "jaeger", + Sampler: 1.0, + }) + + cc := new(grpc.ClientConn) + err := UnaryTracingInterceptor(context.Background(), "/ListUser", nil, nil, cc, + func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, + opts ...grpc.CallOption) error { + return nil + }) + assert.Nil(t, err) +} + func TestUnaryTracingInterceptor(t *testing.T) { var run int32 var wg sync.WaitGroup @@ -50,14 +66,8 @@ func TestUnaryTracingInterceptor_GrpcFormat(t *testing.T) { var run int32 var wg sync.WaitGroup wg.Add(1) - md := metadata.New(map[string]string{ - "foo": "bar", - }) - carrier, err := trace.Inject(trace.GrpcFormat, md) - assert.Nil(t, err) - ctx, _ := trace.StartServerSpan(context.Background(), carrier, "user", "/foo") cc := new(grpc.ClientConn) - err = UnaryTracingInterceptor(ctx, "/foo", nil, nil, cc, + err := UnaryTracingInterceptor(context.Background(), "/foo", nil, nil, cc, func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { defer wg.Done() @@ -73,14 +83,8 @@ func TestStreamTracingInterceptor_GrpcFormat(t *testing.T) { var run int32 var wg sync.WaitGroup wg.Add(1) - md := metadata.New(map[string]string{ - "foo": "bar", - }) - carrier, err := trace.Inject(trace.GrpcFormat, md) - assert.Nil(t, err) - ctx, _ := trace.StartServerSpan(context.Background(), carrier, "user", "/foo") cc := new(grpc.ClientConn) - _, err = StreamTracingInterceptor(ctx, nil, cc, "/foo", + _, err := StreamTracingInterceptor(context.Background(), nil, cc, "/foo", func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { defer wg.Done() diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index b71f6c2f..784e6854 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -54,17 +54,15 @@ func (s *rpcServer) Start(register RegisterFn) error { } unaryInterceptors := []grpc.UnaryServerInterceptor{ - serverinterceptors.UnaryTracingInterceptor(s.name), - serverinterceptors.UnaryOpenTracingInterceptor(), - serverinterceptors.UnaryCrashInterceptor(), + serverinterceptors.UnaryTracingInterceptor, + serverinterceptors.UnaryCrashInterceptor, serverinterceptors.UnaryStatInterceptor(s.metrics), - serverinterceptors.UnaryPrometheusInterceptor(), - serverinterceptors.UnaryBreakerInterceptor(), + serverinterceptors.UnaryPrometheusInterceptor, + serverinterceptors.UnaryBreakerInterceptor, } unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...) streamInterceptors := []grpc.StreamServerInterceptor{ - serverinterceptors.StreamTracingInterceptor(s.name), - serverinterceptors.StreamOpenTracingInterceptor(), + serverinterceptors.StreamTracingInterceptor, serverinterceptors.StreamCrashInterceptor, serverinterceptors.StreamBreakerInterceptor, } diff --git a/zrpc/internal/serverinterceptors/breakerinterceptor.go b/zrpc/internal/serverinterceptors/breakerinterceptor.go index 7a87f253..b1f783d9 100644 --- a/zrpc/internal/serverinterceptors/breakerinterceptor.go +++ b/zrpc/internal/serverinterceptors/breakerinterceptor.go @@ -18,16 +18,14 @@ func StreamBreakerInterceptor(srv interface{}, stream grpc.ServerStream, info *g } // UnaryBreakerInterceptor is an interceptor that acts as a circuit breaker. -func UnaryBreakerInterceptor() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler) (resp interface{}, err error) { - breakerName := info.FullMethod - err = breaker.DoWithAcceptable(breakerName, func() error { - var err error - resp, err = handler(ctx, req) - return err - }, codes.Acceptable) +func UnaryBreakerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler) (resp interface{}, err error) { + breakerName := info.FullMethod + err = breaker.DoWithAcceptable(breakerName, func() error { + var err error + resp, err = handler(ctx, req) + return err + }, codes.Acceptable) - return resp, err - } + return resp, err } diff --git a/zrpc/internal/serverinterceptors/breakerinterceptor_test.go b/zrpc/internal/serverinterceptors/breakerinterceptor_test.go index 222b929a..50dc110b 100644 --- a/zrpc/internal/serverinterceptors/breakerinterceptor_test.go +++ b/zrpc/internal/serverinterceptors/breakerinterceptor_test.go @@ -21,8 +21,7 @@ func TestStreamBreakerInterceptor(t *testing.T) { } func TestUnaryBreakerInterceptor(t *testing.T) { - interceptor := UnaryBreakerInterceptor() - _, err := interceptor(nil, nil, &grpc.UnaryServerInfo{ + _, err := UnaryBreakerInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ FullMethod: "any", }, func(ctx context.Context, req interface{}) (interface{}, error) { return nil, status.New(codes.DeadlineExceeded, "any").Err() diff --git a/zrpc/internal/serverinterceptors/crashinterceptor.go b/zrpc/internal/serverinterceptors/crashinterceptor.go index 77e7a361..5a27036b 100644 --- a/zrpc/internal/serverinterceptors/crashinterceptor.go +++ b/zrpc/internal/serverinterceptors/crashinterceptor.go @@ -21,15 +21,13 @@ func StreamCrashInterceptor(srv interface{}, stream grpc.ServerStream, info *grp } // UnaryCrashInterceptor catches panics in processing unary requests and recovers. -func UnaryCrashInterceptor() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler) (resp interface{}, err error) { - defer handleCrash(func(r interface{}) { - err = toPanicError(r) - }) +func UnaryCrashInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler) (resp interface{}, err error) { + defer handleCrash(func(r interface{}) { + err = toPanicError(r) + }) - return handler(ctx, req) - } + return handler(ctx, req) } func handleCrash(handler func(interface{})) { diff --git a/zrpc/internal/serverinterceptors/crashinterceptor_test.go b/zrpc/internal/serverinterceptors/crashinterceptor_test.go index 3e66b764..fe5916ef 100644 --- a/zrpc/internal/serverinterceptors/crashinterceptor_test.go +++ b/zrpc/internal/serverinterceptors/crashinterceptor_test.go @@ -22,8 +22,7 @@ func TestStreamCrashInterceptor(t *testing.T) { } func TestUnaryCrashInterceptor(t *testing.T) { - interceptor := UnaryCrashInterceptor() - _, err := interceptor(context.Background(), nil, nil, + _, err := UnaryCrashInterceptor(context.Background(), nil, nil, func(ctx context.Context, req interface{}) (interface{}, error) { panic("mock panic") }) diff --git a/zrpc/internal/serverinterceptors/opentracinginterceptor.go b/zrpc/internal/serverinterceptors/opentracinginterceptor.go deleted file mode 100644 index 42342a10..00000000 --- a/zrpc/internal/serverinterceptors/opentracinginterceptor.go +++ /dev/null @@ -1,82 +0,0 @@ -package serverinterceptors - -import ( - "context" - - "github.com/tal-tech/go-zero/core/trace/opentelemetry" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/baggage" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc" - gcodes "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" -) - -// UnaryOpenTracingInterceptor returns a grpc.UnaryServerInterceptor for opentelemetry. -func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor { - propagator := otel.GetTextMapPropagator() - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler) (interface{}, error) { - if !opentelemetry.Enabled() { - return handler(ctx, req) - } - - requestMetadata, _ := metadata.FromIncomingContext(ctx) - metadataCopy := requestMetadata.Copy() - bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy) - ctx = baggage.ContextWithBaggage(ctx, bags) - tr := otel.Tracer(opentelemetry.TraceName) - name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx)) - ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name, - trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...)) - defer span.End() - - opentelemetry.MessageReceived.Event(ctx, 1, req) - resp, err := handler(ctx, req) - if err != nil { - s, _ := status.FromError(err) - span.SetStatus(codes.Error, s.Message()) - span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) - opentelemetry.MessageSent.Event(ctx, 1, s.Proto()) - return nil, err - } - - span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK)) - opentelemetry.MessageSent.Event(ctx, 1, resp) - - return resp, nil - } -} - -// StreamOpenTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry. -func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor { - propagator := otel.GetTextMapPropagator() - return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - ctx := ss.Context() - if !opentelemetry.Enabled() { - return handler(srv, opentelemetry.WrapServerStream(ctx, ss)) - } - - requestMetadata, _ := metadata.FromIncomingContext(ctx) - metadataCopy := requestMetadata.Copy() - bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy) - ctx = baggage.ContextWithBaggage(ctx, bags) - tr := otel.Tracer(opentelemetry.TraceName) - name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx)) - ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name, - trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...)) - defer span.End() - - if err := handler(srv, opentelemetry.WrapServerStream(ctx, ss)); err != nil { - s, _ := status.FromError(err) - span.SetStatus(codes.Error, s.Message()) - span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) - return err - } - - span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK)) - return nil - } -} diff --git a/zrpc/internal/serverinterceptors/opentracinginterceptor_test.go b/zrpc/internal/serverinterceptors/opentracinginterceptor_test.go deleted file mode 100644 index 8e95a59e..00000000 --- a/zrpc/internal/serverinterceptors/opentracinginterceptor_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package serverinterceptors - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/tal-tech/go-zero/core/trace/opentelemetry" - "google.golang.org/grpc" -) - -func TestUnaryOpenTracingInterceptor_Disable(t *testing.T) { - interceptor := UnaryOpenTracingInterceptor() - _, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{ - FullMethod: "/", - }, func(ctx context.Context, req interface{}) (interface{}, error) { - return nil, nil - }) - assert.Nil(t, err) -} - -func TestUnaryOpenTracingInterceptor_Enabled(t *testing.T) { - opentelemetry.StartAgent(opentelemetry.Config{ - Name: "go-zero-test", - Endpoint: "http://localhost:14268/api/traces", - Batcher: "jaeger", - Sampler: 1.0, - }) - interceptor := UnaryOpenTracingInterceptor() - _, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{ - FullMethod: "/package.TestService.GetUser", - }, func(ctx context.Context, req interface{}) (interface{}, error) { - return nil, nil - }) - assert.Nil(t, err) -} diff --git a/zrpc/internal/serverinterceptors/prometheusinterceptor.go b/zrpc/internal/serverinterceptors/prometheusinterceptor.go index decc0d59..5ccd0c07 100644 --- a/zrpc/internal/serverinterceptors/prometheusinterceptor.go +++ b/zrpc/internal/serverinterceptors/prometheusinterceptor.go @@ -33,18 +33,16 @@ var ( }) ) -// UnaryPrometheusInterceptor returns a func that reports to the prometheus server. -func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) ( - interface{}, error) { - if !prometheus.Enabled() { - return handler(ctx, req) - } - - startTime := timex.Now() - resp, err := handler(ctx, req) - metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod) - metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err)))) - return resp, err +// UnaryPrometheusInterceptor reports the statistics to the prometheus server. +func UnaryPrometheusInterceptor(ctx context.Context, req interface{}, + info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if !prometheus.Enabled() { + return handler(ctx, req) } + + startTime := timex.Now() + resp, err := handler(ctx, req) + metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod) + metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err)))) + return resp, err } diff --git a/zrpc/internal/serverinterceptors/prometheusinterceptor_test.go b/zrpc/internal/serverinterceptors/prometheusinterceptor_test.go index 5eb1840e..18b1887b 100644 --- a/zrpc/internal/serverinterceptors/prometheusinterceptor_test.go +++ b/zrpc/internal/serverinterceptors/prometheusinterceptor_test.go @@ -10,8 +10,7 @@ import ( ) func TestUnaryPromMetricInterceptor_Disabled(t *testing.T) { - interceptor := UnaryPrometheusInterceptor() - _, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + _, err := UnaryPrometheusInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ FullMethod: "/", }, func(ctx context.Context, req interface{}) (interface{}, error) { return nil, nil @@ -24,8 +23,7 @@ func TestUnaryPromMetricInterceptor_Enabled(t *testing.T) { Host: "localhost", Path: "/", }) - interceptor := UnaryPrometheusInterceptor() - _, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + _, err := UnaryPrometheusInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ FullMethod: "/", }, func(ctx context.Context, req interface{}) (interface{}, error) { return nil, nil diff --git a/zrpc/internal/serverinterceptors/tracinginterceptor.go b/zrpc/internal/serverinterceptors/tracinginterceptor.go index 0cd653a6..6950ef56 100644 --- a/zrpc/internal/serverinterceptors/tracinginterceptor.go +++ b/zrpc/internal/serverinterceptors/tracinginterceptor.go @@ -3,50 +3,116 @@ package serverinterceptors import ( "context" - "github.com/tal-tech/go-zero/core/trace" + ztrace "github.com/tal-tech/go-zero/core/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/baggage" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" + gcodes "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) -// UnaryTracingInterceptor returns a grpc.UnaryServerInterceptor -// that handles tracing with given service name. -func UnaryTracingInterceptor(serviceName string) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler) (resp interface{}, err error) { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return handler(ctx, req) - } +// UnaryTracingInterceptor is a grpc.UnaryServerInterceptor for opentelemetry. +func UnaryTracingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler) (interface{}, error) { + ctx, span := startSpan(ctx, info.FullMethod) + defer span.End() - carrier, err := trace.Extract(trace.GrpcFormat, md) - if err != nil { - return handler(ctx, req) + ztrace.MessageReceived.Event(ctx, 1, req) + resp, err := handler(ctx, req) + if err != nil { + s, ok := status.FromError(err) + if ok { + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(ztrace.StatusCodeAttr(s.Code())) + ztrace.MessageSent.Event(ctx, 1, s.Proto()) + } else { + span.SetStatus(codes.Error, err.Error()) } - - ctx, span := trace.StartServerSpan(ctx, carrier, serviceName, info.FullMethod) - defer span.Finish() - return handler(ctx, req) + return nil, err } + + span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK)) + ztrace.MessageSent.Event(ctx, 1, resp) + + return resp, nil } -// StreamTracingInterceptor returns a grpc.StreamServerInterceptor -// that handles tracing with given service name. -func StreamTracingInterceptor(serviceName string) grpc.StreamServerInterceptor { - return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, - handler grpc.StreamHandler) error { - ctx := ss.Context() - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return handler(srv, ss) - } +// StreamTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry. +func StreamTracingInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, + handler grpc.StreamHandler) error { + ctx, span := startSpan(ss.Context(), info.FullMethod) + defer span.End() - carrier, err := trace.Extract(trace.GrpcFormat, md) - if err != nil { - return handler(srv, ss) + if err := handler(srv, wrapServerStream(ctx, ss)); err != nil { + s, ok := status.FromError(err) + if ok { + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(ztrace.StatusCodeAttr(s.Code())) + } else { + span.SetStatus(codes.Error, err.Error()) } + return err + } - _, span := trace.StartServerSpan(ctx, carrier, serviceName, info.FullMethod) - defer span.Finish() - return handler(srv, ss) + span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK)) + return nil +} + +// serverStream wraps around the embedded grpc.ServerStream, +// and intercepts the RecvMsg and SendMsg method call. +type serverStream struct { + grpc.ServerStream + ctx context.Context + receivedMessageID int + sentMessageID int +} + +func (w *serverStream) Context() context.Context { + return w.ctx +} + +func (w *serverStream) RecvMsg(m interface{}) error { + err := w.ServerStream.RecvMsg(m) + if err == nil { + w.receivedMessageID++ + ztrace.MessageReceived.Event(w.Context(), w.receivedMessageID, m) + } + + return err +} + +func (w *serverStream) SendMsg(m interface{}) error { + err := w.ServerStream.SendMsg(m) + w.sentMessageID++ + ztrace.MessageSent.Event(w.Context(), w.sentMessageID, m) + + return err +} + +func startSpan(ctx context.Context, method string) (context.Context, trace.Span) { + var md metadata.MD + requestMetadata, ok := metadata.FromIncomingContext(ctx) + if ok { + md = requestMetadata.Copy() + } else { + md = metadata.MD{} + } + bags, spanCtx := ztrace.Extract(ctx, otel.GetTextMapPropagator(), &md) + ctx = baggage.ContextWithBaggage(ctx, bags) + tr := otel.Tracer(ztrace.TraceName) + name, attr := ztrace.SpanInfo(method, ztrace.PeerFromCtx(ctx)) + + return tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name, + trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...)) +} + +// wrapServerStream wraps the given grpc.ServerStream with the given context. +func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream { + return &serverStream{ + ServerStream: ss, + ctx: ctx, } } diff --git a/zrpc/internal/serverinterceptors/tracinginterceptor_test.go b/zrpc/internal/serverinterceptors/tracinginterceptor_test.go index e1bae3d4..cf3c2da6 100644 --- a/zrpc/internal/serverinterceptors/tracinginterceptor_test.go +++ b/zrpc/internal/serverinterceptors/tracinginterceptor_test.go @@ -7,17 +7,40 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/tal-tech/go-zero/core/trace/tracespec" + "github.com/tal-tech/go-zero/core/trace" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) +func TestUnaryOpenTracingInterceptor_Disable(t *testing.T) { + _, err := UnaryTracingInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + FullMethod: "/", + }, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + assert.Nil(t, err) +} + +func TestUnaryOpenTracingInterceptor_Enabled(t *testing.T) { + trace.StartAgent(trace.Config{ + Name: "go-zero-test", + Endpoint: "http://localhost:14268/api/traces", + Batcher: "jaeger", + Sampler: 1.0, + }) + _, err := UnaryTracingInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + FullMethod: "/package.TestService.GetUser", + }, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + assert.Nil(t, err) +} + func TestUnaryTracingInterceptor(t *testing.T) { - interceptor := UnaryTracingInterceptor("foo") var run int32 var wg sync.WaitGroup wg.Add(1) - _, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + _, err := UnaryTracingInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ FullMethod: "/", }, func(ctx context.Context, req interface{}) (interface{}, error) { defer wg.Done() @@ -29,49 +52,14 @@ func TestUnaryTracingInterceptor(t *testing.T) { assert.Equal(t, int32(1), atomic.LoadInt32(&run)) } -func TestUnaryTracingInterceptor_GrpcFormat(t *testing.T) { - interceptor := UnaryTracingInterceptor("foo") - var wg sync.WaitGroup - wg.Add(1) - var md metadata.MD - ctx := metadata.NewIncomingContext(context.Background(), md) - _, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{ - FullMethod: "/", - }, func(ctx context.Context, req interface{}) (interface{}, error) { - defer wg.Done() - assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).TraceId()) > 0) - assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).SpanId()) > 0) - return nil, nil - }) - wg.Wait() - assert.Nil(t, err) -} - -func TestStreamTracingInterceptor(t *testing.T) { - interceptor := StreamTracingInterceptor("foo") - var run int32 - var wg sync.WaitGroup - wg.Add(1) - err := interceptor(nil, new(mockedServerStream), nil, - func(srv interface{}, stream grpc.ServerStream) error { - defer wg.Done() - atomic.AddInt32(&run, 1) - return nil - }) - wg.Wait() - assert.Nil(t, err) - assert.Equal(t, int32(1), atomic.LoadInt32(&run)) -} - func TestStreamTracingInterceptor_GrpcFormat(t *testing.T) { - interceptor := StreamTracingInterceptor("foo") var run int32 var wg sync.WaitGroup wg.Add(1) var md metadata.MD ctx := metadata.NewIncomingContext(context.Background(), md) stream := mockedServerStream{ctx: ctx} - err := interceptor(nil, &stream, &grpc.StreamServerInfo{ + err := StreamTracingInterceptor(nil, &stream, &grpc.StreamServerInfo{ FullMethod: "/foo", }, func(srv interface{}, stream grpc.ServerStream) error { defer wg.Done() diff --git a/zrpc/server_test.go b/zrpc/server_test.go index 7d7a1f9a..4f24c68f 100644 --- a/zrpc/server_test.go +++ b/zrpc/server_test.go @@ -52,7 +52,7 @@ func TestServer(t *testing.T) { }, func(server *grpc.Server) { }) srv.AddOptions(grpc.ConnectionTimeout(time.Hour)) - srv.AddUnaryInterceptors(serverinterceptors.UnaryCrashInterceptor()) + srv.AddUnaryInterceptors(serverinterceptors.UnaryCrashInterceptor) srv.AddStreamInterceptors(serverinterceptors.StreamCrashInterceptor) go srv.Start() srv.Stop() @@ -94,7 +94,7 @@ func TestServer_HasEtcd(t *testing.T) { }, func(server *grpc.Server) { }) srv.AddOptions(grpc.ConnectionTimeout(time.Hour)) - srv.AddUnaryInterceptors(serverinterceptors.UnaryCrashInterceptor()) + srv.AddUnaryInterceptors(serverinterceptors.UnaryCrashInterceptor) srv.AddStreamInterceptors(serverinterceptors.StreamCrashInterceptor) go srv.Start() srv.Stop()