refactor: refactor trace in redis & sql & mongo (#1865)
* refactor: refactor tracing in redis & sql & mongo Signed-off-by: chenquan <chenquan.dev@gmail.com> * fix: fix some tests Signed-off-by: chenquan <chenquan.dev@gmail.com> * refactor: add missing content Signed-off-by: chenquan <chenquan.dev@gmail.com> * refactor: adjust `log` and `return` Signed-off-by: chenquan <chenquan.dev@gmail.com> * refactor: reformat code Signed-off-by: chenquan <chenquan.dev@gmail.com> * refactor: reformat code Signed-off-by: chenquan <chenquan.dev@gmail.com> * refactor: reformat code Signed-off-by: chenquan <chenquan.dev@gmail.com> * refactor: simpler span name Signed-off-by: chenquan <chenquan.dev@gmail.com> * refactor: fix a bug Signed-off-by: chenquan <chenquan.dev@gmail.com> * refactor: fix a bug Signed-off-by: chenquan <chenquan.dev@gmail.com>
This commit is contained in:
@@ -8,18 +8,35 @@ import (
|
||||
"github.com/zeromicro/go-zero/core/breaker"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/trace"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
mopt "go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
|
||||
"go.opentelemetry.io/otel"
|
||||
tracesdk "go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultSlowThreshold = time.Millisecond * 500
|
||||
// spanName is the span name of the mongo calls.
|
||||
spanName = "mongo"
|
||||
|
||||
// mongodb method names
|
||||
aggregate = "Aggregate"
|
||||
bulkWrite = "BulkWrite"
|
||||
countDocuments = "CountDocuments"
|
||||
deleteMany = "DeleteMany"
|
||||
deleteOne = "DeleteOne"
|
||||
distinct = "Distinct"
|
||||
estimatedDocumentCount = "EstimatedDocumentCount"
|
||||
find = "Find"
|
||||
findOne = "FindOne"
|
||||
findOneAndDelete = "FindOneAndDelete"
|
||||
findOneAndReplace = "FindOneAndReplace"
|
||||
findOneAndUpdate = "FindOneAndUpdate"
|
||||
insertMany = "InsertMany"
|
||||
insertOne = "InsertOne"
|
||||
replaceOne = "ReplaceOne"
|
||||
updateByID = "UpdateByID"
|
||||
updateMany = "UpdateMany"
|
||||
updateOne = "UpdateOne"
|
||||
)
|
||||
|
||||
// ErrNotFound is an alias of mongo.ErrNoDocuments
|
||||
@@ -120,341 +137,397 @@ func newCollection(collection *mongo.Collection, brk breaker.Breaker) Collection
|
||||
|
||||
func (c *decoratedCollection) Aggregate(ctx context.Context, pipeline interface{},
|
||||
opts ...*mopt.AggregateOptions) (cur *mongo.Cursor, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, aggregate)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple("Aggregate", starTime, err)
|
||||
c.logDurationSimple(ctx, aggregate, starTime, err)
|
||||
}()
|
||||
|
||||
cur, err = c.Collection.Aggregate(ctx, pipeline, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel,
|
||||
opts ...*mopt.BulkWriteOptions) (res *mongo.BulkWriteResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, bulkWrite)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple("BulkWrite", startTime, err)
|
||||
c.logDurationSimple(ctx, bulkWrite, startTime, err)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.BulkWrite(ctx, models, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) CountDocuments(ctx context.Context, filter interface{},
|
||||
opts ...*mopt.CountOptions) (count int64, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, countDocuments)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple("CountDocuments", startTime, err)
|
||||
c.logDurationSimple(ctx, countDocuments, startTime, err)
|
||||
}()
|
||||
|
||||
count, err = c.Collection.CountDocuments(ctx, filter, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) DeleteMany(ctx context.Context, filter interface{},
|
||||
opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, deleteMany)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple("DeleteMany", startTime, err)
|
||||
c.logDurationSimple(ctx, deleteMany, startTime, err)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.DeleteMany(ctx, filter, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) DeleteOne(ctx context.Context, filter interface{},
|
||||
opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, deleteOne)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("DeleteOne", startTime, err, filter)
|
||||
c.logDuration(ctx, deleteOne, startTime, err, filter)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.DeleteOne(ctx, filter, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) Distinct(ctx context.Context, fieldName string, filter interface{},
|
||||
opts ...*mopt.DistinctOptions) (val []interface{}, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, distinct)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple("Distinct", startTime, err)
|
||||
c.logDurationSimple(ctx, distinct, startTime, err)
|
||||
}()
|
||||
|
||||
val, err = c.Collection.Distinct(ctx, fieldName, filter, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) EstimatedDocumentCount(ctx context.Context,
|
||||
opts ...*mopt.EstimatedDocumentCountOptions) (val int64, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, estimatedDocumentCount)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple("EstimatedDocumentCount", startTime, err)
|
||||
c.logDurationSimple(ctx, estimatedDocumentCount, startTime, err)
|
||||
}()
|
||||
|
||||
val, err = c.Collection.EstimatedDocumentCount(ctx, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) Find(ctx context.Context, filter interface{},
|
||||
opts ...*mopt.FindOptions) (cur *mongo.Cursor, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, find)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("Find", startTime, err, filter)
|
||||
c.logDuration(ctx, find, startTime, err, filter)
|
||||
}()
|
||||
|
||||
cur, err = c.Collection.Find(ctx, filter, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) FindOne(ctx context.Context, filter interface{},
|
||||
opts ...*mopt.FindOneOptions) (res *mongo.SingleResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, findOne)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("FindOne", startTime, err, filter)
|
||||
c.logDuration(ctx, findOne, startTime, err, filter)
|
||||
}()
|
||||
|
||||
res = c.Collection.FindOne(ctx, filter, opts...)
|
||||
err = res.Err()
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) FindOneAndDelete(ctx context.Context, filter interface{},
|
||||
opts ...*mopt.FindOneAndDeleteOptions) (res *mongo.SingleResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, findOneAndDelete)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("FindOneAndDelete", startTime, err, filter)
|
||||
c.logDuration(ctx, findOneAndDelete, startTime, err, filter)
|
||||
}()
|
||||
|
||||
res = c.Collection.FindOneAndDelete(ctx, filter, opts...)
|
||||
err = res.Err()
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) FindOneAndReplace(ctx context.Context, filter interface{},
|
||||
replacement interface{}, opts ...*mopt.FindOneAndReplaceOptions) (
|
||||
res *mongo.SingleResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, findOneAndReplace)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("FindOneAndReplace", startTime, err, filter, replacement)
|
||||
c.logDuration(ctx, findOneAndReplace, startTime, err, filter, replacement)
|
||||
}()
|
||||
|
||||
res = c.Collection.FindOneAndReplace(ctx, filter, replacement, opts...)
|
||||
err = res.Err()
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{},
|
||||
opts ...*mopt.FindOneAndUpdateOptions) (res *mongo.SingleResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, findOneAndUpdate)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("FindOneAndUpdate", startTime, err, filter, update)
|
||||
c.logDuration(ctx, findOneAndUpdate, startTime, err, filter, update)
|
||||
}()
|
||||
|
||||
res = c.Collection.FindOneAndUpdate(ctx, filter, update, opts...)
|
||||
err = res.Err()
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) InsertMany(ctx context.Context, documents []interface{},
|
||||
opts ...*mopt.InsertManyOptions) (res *mongo.InsertManyResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, insertMany)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple("InsertMany", startTime, err)
|
||||
c.logDurationSimple(ctx, insertMany, startTime, err)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.InsertMany(ctx, documents, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) InsertOne(ctx context.Context, document interface{},
|
||||
opts ...*mopt.InsertOneOptions) (res *mongo.InsertOneResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, insertOne)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("InsertOne", startTime, err, document)
|
||||
c.logDuration(ctx, insertOne, startTime, err, document)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.InsertOne(ctx, document, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) ReplaceOne(ctx context.Context, filter interface{}, replacement interface{},
|
||||
opts ...*mopt.ReplaceOptions) (res *mongo.UpdateResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, replaceOne)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("ReplaceOne", startTime, err, filter, replacement)
|
||||
c.logDuration(ctx, replaceOne, startTime, err, filter, replacement)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.ReplaceOne(ctx, filter, replacement, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) UpdateByID(ctx context.Context, id interface{}, update interface{},
|
||||
opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, updateByID)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("UpdateByID", startTime, err, id, update)
|
||||
c.logDuration(ctx, updateByID, startTime, err, id, update)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.UpdateByID(ctx, id, update, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) UpdateMany(ctx context.Context, filter interface{}, update interface{},
|
||||
opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, updateMany)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple("UpdateMany", startTime, err)
|
||||
c.logDurationSimple(ctx, updateMany, startTime, err)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.UpdateMany(ctx, filter, update, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{},
|
||||
opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, updateOne)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration("UpdateOne", startTime, err, filter, update)
|
||||
c.logDuration(ctx, updateOne, startTime, err, filter, update)
|
||||
}()
|
||||
|
||||
res, err = c.Collection.UpdateOne(ctx, filter, update, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) logDuration(method string, startTime time.Duration, err error,
|
||||
func (c *decoratedCollection) logDuration(ctx context.Context, method string, startTime time.Duration, err error,
|
||||
docs ...interface{}) {
|
||||
duration := timex.Since(startTime)
|
||||
logger := logx.WithContext(ctx).WithDuration(duration)
|
||||
|
||||
content, e := json.Marshal(docs)
|
||||
if e != nil {
|
||||
logx.Error(err)
|
||||
logger.Error(err)
|
||||
} else if err != nil {
|
||||
if duration > slowThreshold.Load() {
|
||||
logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
|
||||
logger.Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
|
||||
c.name, method, err.Error(), string(content))
|
||||
} else {
|
||||
logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s",
|
||||
logger.Infof("mongo(%s) - %s - fail(%s) - %s",
|
||||
c.name, method, err.Error(), string(content))
|
||||
}
|
||||
} else {
|
||||
if duration > slowThreshold.Load() {
|
||||
logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
|
||||
logger.Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
|
||||
c.name, method, string(content))
|
||||
} else {
|
||||
logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content))
|
||||
logger.Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *decoratedCollection) logDurationSimple(method string, startTime time.Duration, err error) {
|
||||
logDuration(c.name, method, startTime, err)
|
||||
func (c *decoratedCollection) logDurationSimple(ctx context.Context, method string, startTime time.Duration, err error) {
|
||||
logDuration(ctx, c.name, method, startTime, err)
|
||||
}
|
||||
|
||||
func (p keepablePromise) accept(err error) error {
|
||||
@@ -483,8 +556,3 @@ func acceptable(err error) bool {
|
||||
err == session.ErrAbortTwice || err == session.ErrCommitAfterAbort ||
|
||||
err == session.ErrUnackWCUnsupported || err == session.ErrSnapshotTransaction
|
||||
}
|
||||
|
||||
func startSpan(ctx context.Context) (context.Context, tracesdk.Span) {
|
||||
tracer := otel.GetTracerProvider().Tracer(trace.TraceName)
|
||||
return tracer.Start(ctx, spanName)
|
||||
}
|
||||
|
||||
@@ -588,21 +588,21 @@ func Test_DecoratedCollectionLogDuration(t *testing.T) {
|
||||
}()
|
||||
|
||||
buf.Reset()
|
||||
c.logDuration("foo", time.Millisecond, nil, "bar")
|
||||
c.logDuration(context.Background(), "foo", time.Millisecond, nil, "bar")
|
||||
assert.Contains(t, buf.String(), "foo")
|
||||
assert.Contains(t, buf.String(), "bar")
|
||||
|
||||
buf.Reset()
|
||||
c.logDuration("foo", time.Millisecond, errors.New("bar"), make(chan int))
|
||||
c.logDuration(context.Background(), "foo", time.Millisecond, errors.New("bar"), make(chan int))
|
||||
assert.Contains(t, buf.String(), "bar")
|
||||
|
||||
buf.Reset()
|
||||
c.logDuration("foo", slowThreshold.Load()+time.Millisecond, errors.New("bar"))
|
||||
c.logDuration(context.Background(), "foo", slowThreshold.Load()+time.Millisecond, errors.New("bar"))
|
||||
assert.Contains(t, buf.String(), "foo")
|
||||
assert.Contains(t, buf.String(), "slowcall")
|
||||
|
||||
buf.Reset()
|
||||
c.logDuration("foo", slowThreshold.Load()+time.Millisecond, nil)
|
||||
c.logDuration(context.Background(), "foo", slowThreshold.Load()+time.Millisecond, nil)
|
||||
assert.Contains(t, buf.String(), "foo")
|
||||
assert.Contains(t, buf.String(), "slowcall")
|
||||
}
|
||||
@@ -630,15 +630,15 @@ func (d *dropBreaker) Allow() (breaker.Promise, error) {
|
||||
return nil, errDummy
|
||||
}
|
||||
|
||||
func (d *dropBreaker) Do(req func() error) error {
|
||||
func (d *dropBreaker) Do(_ func() error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dropBreaker) DoWithAcceptable(req func() error, acceptable breaker.Acceptable) error {
|
||||
func (d *dropBreaker) DoWithAcceptable(_ func() error, _ breaker.Acceptable) error {
|
||||
return errDummy
|
||||
}
|
||||
|
||||
func (d *dropBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
|
||||
func (d *dropBreaker) DoWithFallback(_ func() error, _ func(err error) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,14 @@ import (
|
||||
mopt "go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
const (
|
||||
startSession = "StartSession"
|
||||
abortTransaction = "AbortTransaction"
|
||||
commitTransaction = "CommitTransaction"
|
||||
withTransaction = "WithTransaction"
|
||||
endSession = "EndSession"
|
||||
)
|
||||
|
||||
type (
|
||||
// Model is a mongodb store model that represents a collection.
|
||||
Model struct {
|
||||
@@ -23,7 +31,8 @@ type (
|
||||
|
||||
wrappedSession struct {
|
||||
mongo.Session
|
||||
brk breaker.Breaker
|
||||
name string
|
||||
brk breaker.Breaker
|
||||
}
|
||||
)
|
||||
|
||||
@@ -66,7 +75,7 @@ func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session,
|
||||
err = m.brk.DoWithAcceptable(func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(m.name, "StartSession", starTime, err)
|
||||
logDuration(context.Background(), m.name, startSession, starTime, err)
|
||||
}()
|
||||
|
||||
session, sessionErr := m.cli.StartSession(opts...)
|
||||
@@ -76,11 +85,13 @@ func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session,
|
||||
|
||||
sess = &wrappedSession{
|
||||
Session: session,
|
||||
name: m.name,
|
||||
brk: m.brk,
|
||||
}
|
||||
|
||||
return nil
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -169,33 +180,57 @@ func (m *Model) FindOneAndUpdate(ctx context.Context, v, filter interface{}, upd
|
||||
return res.Decode(v)
|
||||
}
|
||||
|
||||
func (w *wrappedSession) AbortTransaction(ctx context.Context) error {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
// AbortTransaction implements the mongo.Session interface.
|
||||
func (w *wrappedSession) AbortTransaction(ctx context.Context) (err error) {
|
||||
ctx, span := startSpan(ctx, abortTransaction)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
return w.brk.DoWithAcceptable(func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(ctx, w.name, abortTransaction, starTime, err)
|
||||
}()
|
||||
|
||||
return w.Session.AbortTransaction(ctx)
|
||||
}, acceptable)
|
||||
}
|
||||
|
||||
func (w *wrappedSession) CommitTransaction(ctx context.Context) error {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
// CommitTransaction implements the mongo.Session interface.
|
||||
func (w *wrappedSession) CommitTransaction(ctx context.Context) (err error) {
|
||||
ctx, span := startSpan(ctx, commitTransaction)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
return w.brk.DoWithAcceptable(func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(ctx, w.name, commitTransaction, starTime, err)
|
||||
}()
|
||||
|
||||
return w.Session.CommitTransaction(ctx)
|
||||
}, acceptable)
|
||||
}
|
||||
|
||||
// WithTransaction implements the mongo.Session interface.
|
||||
func (w *wrappedSession) WithTransaction(
|
||||
ctx context.Context,
|
||||
fn func(sessCtx mongo.SessionContext) (interface{}, error),
|
||||
opts ...*mopt.TransactionOptions,
|
||||
) (res interface{}, err error) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
ctx, span := startSpan(ctx, withTransaction)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = w.brk.DoWithAcceptable(func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(ctx, w.name, withTransaction, starTime, err)
|
||||
}()
|
||||
|
||||
res, err = w.Session.WithTransaction(ctx, fn, opts...)
|
||||
return err
|
||||
}, acceptable)
|
||||
@@ -203,11 +238,20 @@ func (w *wrappedSession) WithTransaction(
|
||||
return
|
||||
}
|
||||
|
||||
// EndSession implements the mongo.Session interface.
|
||||
func (w *wrappedSession) EndSession(ctx context.Context) {
|
||||
ctx, span := startSpan(ctx)
|
||||
defer span.End()
|
||||
var err error
|
||||
ctx, span := startSpan(ctx, endSession)
|
||||
defer func() {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = w.brk.DoWithAcceptable(func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(ctx, w.name, endSession, starTime, err)
|
||||
}()
|
||||
|
||||
_ = w.brk.DoWithAcceptable(func() error {
|
||||
w.Session.EndSession(ctx)
|
||||
return nil
|
||||
}, acceptable)
|
||||
|
||||
37
core/stores/mon/trace.go
Normal file
37
core/stores/mon/trace.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package mon
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/trace"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
var mongoCmdAttributeKey = attribute.Key("mongo.cmd")
|
||||
|
||||
func startSpan(ctx context.Context, cmd string) (context.Context, oteltrace.Span) {
|
||||
tracer := otel.GetTracerProvider().Tracer(trace.TraceName)
|
||||
ctx, span := tracer.Start(ctx,
|
||||
spanName,
|
||||
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
|
||||
)
|
||||
span.SetAttributes(mongoCmdAttributeKey.String(cmd))
|
||||
return ctx, span
|
||||
}
|
||||
|
||||
func endSpan(span oteltrace.Span, err error) {
|
||||
defer span.End()
|
||||
|
||||
if err == nil || err == mongo.ErrNoDocuments ||
|
||||
err == mongo.ErrNilValue || err == mongo.ErrNilDocument {
|
||||
span.SetStatus(codes.Ok, "")
|
||||
return
|
||||
}
|
||||
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
span.RecordError(err)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package mon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -15,11 +16,12 @@ func FormatAddr(hosts []string) string {
|
||||
return strings.Join(hosts, mongoAddrSep)
|
||||
}
|
||||
|
||||
func logDuration(name, method string, startTime time.Duration, err error) {
|
||||
func logDuration(ctx context.Context, name, method string, startTime time.Duration, err error) {
|
||||
duration := timex.Since(startTime)
|
||||
logger := logx.WithContext(ctx).WithDuration(duration)
|
||||
if err != nil {
|
||||
logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s)", name, method, err.Error())
|
||||
logger.Infof("mongo(%s) - %s - fail(%s)", name, method, err.Error())
|
||||
} else {
|
||||
logx.WithDuration(duration).Infof("mongo(%s) - %s - ok", name, method)
|
||||
logger.Infof("mongo(%s) - %s - ok", name, method)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package mon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
@@ -50,12 +51,12 @@ func Test_logDuration(t *testing.T) {
|
||||
}()
|
||||
|
||||
buf.Reset()
|
||||
logDuration("foo", "bar", time.Millisecond, nil)
|
||||
logDuration(context.Background(), "foo", "bar", time.Millisecond, nil)
|
||||
assert.Contains(t, buf.String(), "foo")
|
||||
assert.Contains(t, buf.String(), "bar")
|
||||
|
||||
buf.Reset()
|
||||
logDuration("foo", "bar", time.Millisecond, errors.New("bar"))
|
||||
logDuration(context.Background(), "foo", "bar", time.Millisecond, errors.New("bar"))
|
||||
assert.Contains(t, buf.String(), "foo")
|
||||
assert.Contains(t, buf.String(), "bar")
|
||||
assert.Contains(t, buf.String(), "fail")
|
||||
|
||||
Reference in New Issue
Block a user