fix golint issues (#992)

This commit is contained in:
Kevin Wan
2021-09-04 12:16:30 +08:00
committed by GitHub
parent 20f665ede8
commit 9bdadf2381
14 changed files with 68 additions and 77 deletions

View File

@@ -3,43 +3,38 @@ package opentelemetry
import ( import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
grpc_codes "google.golang.org/grpc/codes" gcodes "google.golang.org/grpc/codes"
) )
const ( const (
// GRPCStatusCodeKey is convention for numeric status code of a gRPC request. // GRPCStatusCodeKey is convention for numeric status code of a gRPC request.
GRPCStatusCodeKey = attribute.Key("rpc.grpc.status_code") GRPCStatusCodeKey = attribute.Key("rpc.grpc.status_code")
// RPCNameKey is the name of message transmitted or received.
// Name of message transmitted or received.
RPCNameKey = attribute.Key("name") RPCNameKey = attribute.Key("name")
// RPCMessageTypeKey is the type of message transmitted or received.
// Type of message transmitted or received.
RPCMessageTypeKey = attribute.Key("message.type") RPCMessageTypeKey = attribute.Key("message.type")
// RPCMessageIDKey is the identifier of message transmitted or received.
// Identifier of message transmitted or received.
RPCMessageIDKey = attribute.Key("message.id") RPCMessageIDKey = attribute.Key("message.id")
// RPCMessageCompressedSizeKey is the compressed size of the message transmitted or received in bytes.
// The compressed size of the message transmitted or received in bytes.
RPCMessageCompressedSizeKey = attribute.Key("message.compressed_size") RPCMessageCompressedSizeKey = attribute.Key("message.compressed_size")
// RPCMessageUncompressedSizeKey is the uncompressed size of the message
// The uncompressed size of the message transmitted or received in // transmitted or received in bytes.
// bytes.
RPCMessageUncompressedSizeKey = attribute.Key("message.uncompressed_size") RPCMessageUncompressedSizeKey = attribute.Key("message.uncompressed_size")
) )
// Semantic conventions for common RPC attributes. // Semantic conventions for common RPC attributes.
var ( var (
// Semantic convention for gRPC as the remoting system. // RPCSystemGRPC is the semantic convention for gRPC as the remoting system.
RPCSystemGRPC = semconv.RPCSystemKey.String("grpc") RPCSystemGRPC = semconv.RPCSystemKey.String("grpc")
// RPCNameMessage is the semantic convention for a message named message.
// Semantic convention for a message named message.
RPCNameMessage = RPCNameKey.String("message") RPCNameMessage = RPCNameKey.String("message")
// RPCMessageTypeSent is the semantic conventions for sent RPC message types.
// Semantic conventions for RPC message types. RPCMessageTypeSent = RPCMessageTypeKey.String("SENT")
RPCMessageTypeSent = RPCMessageTypeKey.String("SENT") // RPCMessageTypeReceived is the semantic conventions for the received RPC message types.
RPCMessageTypeReceived = RPCMessageTypeKey.String("RECEIVED") RPCMessageTypeReceived = RPCMessageTypeKey.String("RECEIVED")
) )
func StatusCodeAttr(c grpc_codes.Code) attribute.KeyValue { // StatusCodeAttr returns a attribute.KeyValue that represents the give c.
func StatusCodeAttr(c gcodes.Code) attribute.KeyValue {
return GRPCStatusCodeKey.Int64(int64(c)) return GRPCStatusCodeKey.Int64(int64(c))
} }

View File

@@ -9,6 +9,13 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
const (
receiveEndEvent streamEventType = iota
errorEvent
)
var _ = proto.Marshal
type streamEventType int type streamEventType int
type streamEvent struct { type streamEvent struct {
@@ -16,11 +23,6 @@ type streamEvent struct {
Err error Err error
} }
const (
receiveEndEvent streamEventType = iota
errorEvent
)
type clientStream struct { type clientStream struct {
grpc.ClientStream grpc.ClientStream
@@ -33,11 +35,8 @@ type clientStream struct {
sentMessageID int sentMessageID int
} }
var _ = proto.Marshal
func (w *clientStream) RecvMsg(m interface{}) error { func (w *clientStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m) err := w.ClientStream.RecvMsg(m)
if err == nil && !w.desc.ServerStreams { if err == nil && !w.desc.ServerStreams {
w.sendStreamEvent(receiveEndEvent, nil) w.sendStreamEvent(receiveEndEvent, nil)
} else if err == io.EOF { } else if err == io.EOF {
@@ -54,10 +53,8 @@ func (w *clientStream) RecvMsg(m interface{}) error {
func (w *clientStream) SendMsg(m interface{}) error { func (w *clientStream) SendMsg(m interface{}) error {
err := w.ClientStream.SendMsg(m) err := w.ClientStream.SendMsg(m)
w.sentMessageID++ w.sentMessageID++
MessageSent.Event(w.Context(), w.sentMessageID, m) MessageSent.Event(w.Context(), w.sentMessageID, m)
if err != nil { if err != nil {
w.sendStreamEvent(errorEvent, err) w.sendStreamEvent(errorEvent, err)
} }
@@ -90,6 +87,7 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
} }
} }
// WrapClientStream wraps s with given ctx and desc.
func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { func WrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
events := make(chan streamEvent) events := make(chan streamEvent)
eventsDone := make(chan struct{}) eventsDone := make(chan struct{})

View File

@@ -1,5 +1,6 @@
package opentelemetry package opentelemetry
// TraceName represents the tracing name.
const TraceName = "go-zero" const TraceName = "go-zero"
// A Config is a opentelemetry config. // A Config is a opentelemetry config.

View File

@@ -9,7 +9,9 @@ import (
) )
var ( var (
MessageSent = messageType(RPCMessageTypeSent) // MessageSent is the type of sent messages.
MessageSent = messageType(RPCMessageTypeSent)
// MessageReceived is the type of received messages.
MessageReceived = messageType(RPCMessageTypeReceived) MessageReceived = messageType(RPCMessageTypeReceived)
) )

View File

@@ -40,6 +40,7 @@ func (w *serverStream) SendMsg(m interface{}) error {
return err return err
} }
// WrapServerStream wraps the given grpc.ServerStream with the given context.
func WrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream { func WrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
return &serverStream{ return &serverStream{
ServerStream: ss, ServerStream: ss,

View File

@@ -36,12 +36,14 @@ func (s *metadataSupplier) Keys() []string {
return out return out
} }
// Inject injects the metadata into ctx.
func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) { func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) {
p.Inject(ctx, &metadataSupplier{ p.Inject(ctx, &metadataSupplier{
metadata: metadata, metadata: metadata,
}) })
} }
// Extract extracts the metadata from ctx.
func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) (baggage.Baggage, trace.SpanContext) { func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) (baggage.Baggage, trace.SpanContext) {
ctx = p.Extract(ctx, &metadataSupplier{ ctx = p.Extract(ctx, &metadataSupplier{
metadata: metadata, metadata: metadata,

View File

@@ -10,14 +10,17 @@ import (
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
) )
// PeerFromCtx returns the peer from ctx.
func PeerFromCtx(ctx context.Context) string { func PeerFromCtx(ctx context.Context) string {
p, ok := peer.FromContext(ctx) p, ok := peer.FromContext(ctx)
if !ok { if !ok {
return "" return ""
} }
return p.Addr.String() return p.Addr.String()
} }
// SpanInfo returns the span info.
func SpanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) { func SpanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) {
attrs := []attribute.KeyValue{RPCSystemGRPC} attrs := []attribute.KeyValue{RPCSystemGRPC}
name, mAttrs := ParseFullMethod(fullMethod) name, mAttrs := ParseFullMethod(fullMethod)
@@ -26,6 +29,7 @@ func SpanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) {
return name, attrs return name, attrs
} }
// ParseFullMethod returns the method name and attributes.
func ParseFullMethod(fullMethod string) (string, []attribute.KeyValue) { func ParseFullMethod(fullMethod string) (string, []attribute.KeyValue) {
name := strings.TrimLeft(fullMethod, "/") name := strings.TrimLeft(fullMethod, "/")
parts := strings.SplitN(name, "/", 2) parts := strings.SplitN(name, "/", 2)
@@ -41,9 +45,11 @@ func ParseFullMethod(fullMethod string) (string, []attribute.KeyValue) {
if method := parts[1]; method != "" { if method := parts[1]; method != "" {
attrs = append(attrs, semconv.RPCMethodKey.String(method)) attrs = append(attrs, semconv.RPCMethodKey.String(method))
} }
return name, attrs return name, attrs
} }
// PeerAttr returns the peer attributes.
func PeerAttr(addr string) []attribute.KeyValue { func PeerAttr(addr string) []attribute.KeyValue {
host, port, err := net.SplitHostPort(addr) host, port, err := net.SplitHostPort(addr)
if err != nil { if err != nil {

View File

@@ -51,6 +51,7 @@ func init() {
}() }()
} }
// Done returns the channel that notifies the process quitting.
func Done() <-chan struct{} { func Done() <-chan struct{} {
return done return done
} }

View File

@@ -1,6 +1,7 @@
package trace package trace
const ( const (
// TraceIdKey is the trace id header.
TraceIdKey = "X-Trace-ID" TraceIdKey = "X-Trace-ID"
spanIdKey = "X-Span-ID" spanIdKey = "X-Span-ID"

View File

@@ -10,6 +10,7 @@ import (
oteltrace "go.opentelemetry.io/otel/trace" oteltrace "go.opentelemetry.io/otel/trace"
) )
// OtelHandler return a middleware that process the opentelemetry.
func OtelHandler(path string) func(http.Handler) http.Handler { func OtelHandler(path string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler {
if !opentelemetry.Enabled() { if !opentelemetry.Enabled() {

View File

@@ -13,9 +13,11 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
// OpenTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry.
func OpenTracingInterceptor() grpc.UnaryClientInterceptor { func OpenTracingInterceptor() grpc.UnaryClientInterceptor {
propagator := otel.GetTextMapPropagator() propagator := otel.GetTextMapPropagator()
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if !opentelemetry.Enabled() { if !opentelemetry.Enabled() {
return invoker(ctx, method, req, reply, cc, opts...) return invoker(ctx, method, req, reply, cc, opts...)
} }
@@ -24,13 +26,8 @@ func OpenTracingInterceptor() grpc.UnaryClientInterceptor {
metadataCopy := requestMetadata.Copy() metadataCopy := requestMetadata.Copy()
tr := otel.Tracer(opentelemetry.TraceName) tr := otel.Tracer(opentelemetry.TraceName)
name, attr := opentelemetry.SpanInfo(method, cc.Target()) name, attr := opentelemetry.SpanInfo(method, cc.Target())
ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
var span trace.Span trace.WithAttributes(attr...))
ctx, span = tr.Start(ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attr...),
)
defer span.End() defer span.End()
opentelemetry.Inject(ctx, propagator, &metadataCopy) opentelemetry.Inject(ctx, propagator, &metadataCopy)
@@ -50,30 +47,23 @@ func OpenTracingInterceptor() grpc.UnaryClientInterceptor {
} }
} }
// StreamOpenTracingInterceptor returns a grpc.StreamClientInterceptor for opentelemetry.
func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor { func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor {
propagator := otel.GetTextMapPropagator() propagator := otel.GetTextMapPropagator()
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
if !opentelemetry.Enabled() { if !opentelemetry.Enabled() {
return streamer(ctx, desc, cc, method, opts...) return streamer(ctx, desc, cc, method, opts...)
} }
requestMetadata, _ := metadata.FromOutgoingContext(ctx) requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy() metadataCopy := requestMetadata.Copy()
tr := otel.Tracer(opentelemetry.TraceName) tr := otel.Tracer(opentelemetry.TraceName)
name, attr := opentelemetry.SpanInfo(method, cc.Target()) name, attr := opentelemetry.SpanInfo(method, cc.Target())
var span trace.Span ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),
ctx, span = tr.Start( trace.WithAttributes(attr...))
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attr...),
)
opentelemetry.Inject(ctx, propagator, &metadataCopy) opentelemetry.Inject(ctx, propagator, &metadataCopy)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy) ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
s, err := streamer(ctx, desc, cc, method, opts...) s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil { if err != nil {
grpcStatus, _ := status.FromError(err) grpcStatus, _ := status.FromError(err)
@@ -82,12 +72,11 @@ func StreamOpenTracingInterceptor() grpc.StreamClientInterceptor {
span.End() span.End()
return s, err return s, err
} }
stream := opentelemetry.WrapClientStream(ctx, s, desc) stream := opentelemetry.WrapClientStream(ctx, s, desc)
go func() { go func() {
err := <-stream.Finished if err := <-stream.Finished; err != nil {
if err != nil {
s, _ := status.FromError(err) s, _ := status.FromError(err)
span.SetStatus(codes.Error, s.Message()) span.SetStatus(codes.Error, s.Message())
span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))

View File

@@ -8,12 +8,14 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
) )
// EventHandler is ResourceEventHandler implementation.
type EventHandler struct { type EventHandler struct {
update func([]string) update func([]string)
endpoints map[string]lang.PlaceholderType endpoints map[string]lang.PlaceholderType
lock sync.Mutex lock sync.Mutex
} }
// NewEventHandler returns an EventHandler.
func NewEventHandler(update func([]string)) *EventHandler { func NewEventHandler(update func([]string)) *EventHandler {
return &EventHandler{ return &EventHandler{
update: update, update: update,
@@ -21,6 +23,7 @@ func NewEventHandler(update func([]string)) *EventHandler {
} }
} }
// OnAdd handles the endpoints add events.
func (h *EventHandler) OnAdd(obj interface{}) { func (h *EventHandler) OnAdd(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints) endpoints, ok := obj.(*v1.Endpoints)
if !ok { if !ok {
@@ -46,6 +49,7 @@ func (h *EventHandler) OnAdd(obj interface{}) {
} }
} }
// OnDelete handles the endpoints delete events.
func (h *EventHandler) OnDelete(obj interface{}) { func (h *EventHandler) OnDelete(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints) endpoints, ok := obj.(*v1.Endpoints)
if !ok { if !ok {
@@ -71,6 +75,7 @@ func (h *EventHandler) OnDelete(obj interface{}) {
} }
} }
// OnUpdate handles the endpoints update events.
func (h *EventHandler) OnUpdate(oldObj, newObj interface{}) { func (h *EventHandler) OnUpdate(oldObj, newObj interface{}) {
oldEndpoints, ok := oldObj.(*v1.Endpoints) oldEndpoints, ok := oldObj.(*v1.Endpoints)
if !ok { if !ok {
@@ -91,6 +96,7 @@ func (h *EventHandler) OnUpdate(oldObj, newObj interface{}) {
h.Update(newEndpoints) h.Update(newEndpoints)
} }
// Update updates the endpoints.
func (h *EventHandler) Update(endpoints *v1.Endpoints) { func (h *EventHandler) Update(endpoints *v1.Endpoints) {
h.lock.Lock() h.lock.Lock()
defer h.lock.Unlock() defer h.lock.Unlock()

View File

@@ -15,12 +15,14 @@ const (
var emptyService Service var emptyService Service
// Service represents a service with namespace, name and port.
type Service struct { type Service struct {
Namespace string Namespace string
Name string Name string
Port int Port int
} }
// ParseTarget parses the resolver.Target.
func ParseTarget(target resolver.Target) (Service, error) { func ParseTarget(target resolver.Target) (Service, error) {
var service Service var service Service
service.Namespace = target.Authority service.Namespace = target.Authority

View File

@@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
// UnaryOpenTracingInterceptor returns a grpc.UnaryServerInterceptor for opentelemetry.
func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor { func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor {
propagator := otel.GetTextMapPropagator() propagator := otel.GetTextMapPropagator()
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
@@ -24,20 +25,12 @@ func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor {
requestMetadata, _ := metadata.FromIncomingContext(ctx) requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy() metadataCopy := requestMetadata.Copy()
bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy) bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy)
ctx = baggage.ContextWithBaggage(ctx, bags) ctx = baggage.ContextWithBaggage(ctx, bags)
tr := otel.Tracer(opentelemetry.TraceName) tr := otel.Tracer(opentelemetry.TraceName)
name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx)) name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx))
ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name,
var span trace.Span trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
ctx, span = tr.Start(
trace.ContextWithRemoteSpanContext(ctx, spanCtx),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
)
defer span.End() defer span.End()
opentelemetry.MessageReceived.Event(ctx, 1, req) opentelemetry.MessageReceived.Event(ctx, 1, req)
@@ -57,6 +50,7 @@ func UnaryOpenTracingInterceptor() grpc.UnaryServerInterceptor {
} }
} }
// StreamOpenTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry.
func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor { func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor {
propagator := otel.GetTextMapPropagator() propagator := otel.GetTextMapPropagator()
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
@@ -67,30 +61,22 @@ func StreamOpenTracingInterceptor() grpc.StreamServerInterceptor {
requestMetadata, _ := metadata.FromIncomingContext(ctx) requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy() metadataCopy := requestMetadata.Copy()
bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy) bags, spanCtx := opentelemetry.Extract(ctx, propagator, &metadataCopy)
ctx = baggage.ContextWithBaggage(ctx, bags) ctx = baggage.ContextWithBaggage(ctx, bags)
tr := otel.Tracer(opentelemetry.TraceName) tr := otel.Tracer(opentelemetry.TraceName)
name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx)) name, attr := opentelemetry.SpanInfo(info.FullMethod, opentelemetry.PeerFromCtx(ctx))
ctx, span := tr.Start( ctx, span := tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name,
trace.ContextWithRemoteSpanContext(ctx, spanCtx), trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
)
defer span.End() defer span.End()
err := handler(srv, opentelemetry.WrapServerStream(ctx, ss)) if err := handler(srv, opentelemetry.WrapServerStream(ctx, ss)); err != nil {
if err != nil {
s, _ := status.FromError(err) s, _ := status.FromError(err)
span.SetStatus(codes.Error, s.Message()) span.SetStatus(codes.Error, s.Message())
span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code())) span.SetAttributes(opentelemetry.StatusCodeAttr(s.Code()))
} else { return err
span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
} }
return err span.SetAttributes(opentelemetry.StatusCodeAttr(gcodes.OK))
return nil
} }
} }