feat: upgrade go-redis to v9 (#3088)

Co-authored-by: cong <zhangcong1992@gmail.com>
This commit is contained in:
Kevin Wan
2024-01-13 22:40:58 +08:00
committed by GitHub
parent 7822a4c1cb
commit 368caa7608
13 changed files with 133 additions and 226 deletions

View File

@@ -7,9 +7,8 @@ import (
"strings"
"time"
red "github.com/go-redis/redis/v8"
red "github.com/redis/go-redis/v9"
"github.com/zeromicro/go-zero/core/breaker"
"github.com/zeromicro/go-zero/core/errorx"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mapping"
"github.com/zeromicro/go-zero/core/timex"
@@ -23,93 +22,65 @@ import (
const spanName = "redis"
var (
startTimeKey = contextKey("startTime")
durationHook = hook{}
redisCmdsAttributeKey = attribute.Key("redis.cmds")
)
type (
contextKey string
hook struct{}
)
type hook struct{}
func (h hook) BeforeProcess(ctx context.Context, cmd red.Cmder) (context.Context, error) {
return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmd), nil
func (h hook) DialHook(next red.DialHook) red.DialHook {
// no need to implement
return next
}
func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error {
err := cmd.Err()
h.endSpan(ctx, err)
func (h hook) ProcessHook(next red.ProcessHook) red.ProcessHook {
return func(ctx context.Context, cmd red.Cmder) error {
start := timex.Now()
ctx, endSpan := h.startSpan(ctx, cmd)
val := ctx.Value(startTimeKey)
if val == nil {
return nil
}
err := next(ctx, cmd)
start, ok := val.(time.Duration)
if !ok {
return nil
}
endSpan(err)
duration := timex.Since(start)
duration := timex.Since(start)
if duration > slowThreshold.Load() {
logDuration(ctx, []red.Cmder{cmd}, duration)
metricSlowCount.Inc(cmd.Name())
}
metricReqDur.ObserveFloat(float64(duration)/float64(time.Millisecond), cmd.Name())
if msg := formatError(err); len(msg) > 0 {
metricReqErr.Inc(cmd.Name(), msg)
}
return nil
}
func (h hook) BeforeProcessPipeline(ctx context.Context, cmds []red.Cmder) (context.Context, error) {
if len(cmds) == 0 {
return ctx, nil
}
return h.startSpan(context.WithValue(ctx, startTimeKey, timex.Now()), cmds...), nil
}
func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error {
if len(cmds) == 0 {
return nil
}
batchError := errorx.BatchError{}
for _, cmd := range cmds {
err := cmd.Err()
if err == nil {
continue
if duration > slowThreshold.Load() {
logDuration(ctx, []red.Cmder{cmd}, duration)
metricSlowCount.Inc(cmd.Name())
}
batchError.Add(err)
}
h.endSpan(ctx, batchError.Err())
metricReqDur.Observe(duration.Milliseconds(), cmd.Name())
if msg := formatError(err); len(msg) > 0 {
metricReqErr.Inc(cmd.Name(), msg)
}
val := ctx.Value(startTimeKey)
if val == nil {
return nil
return err
}
}
start, ok := val.(time.Duration)
if !ok {
return nil
func (h hook) ProcessPipelineHook(next red.ProcessPipelineHook) red.ProcessPipelineHook {
return func(ctx context.Context, cmds []red.Cmder) error {
if len(cmds) == 0 {
return next(ctx, cmds)
}
start := timex.Now()
ctx, endSpan := h.startSpan(ctx, cmds...)
err := next(ctx, cmds)
endSpan(err)
duration := timex.Since(start)
if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
logDuration(ctx, cmds, duration)
}
metricReqDur.Observe(duration.Milliseconds(), "Pipeline")
if msg := formatError(err); len(msg) > 0 {
metricReqErr.Inc("Pipeline", msg)
}
return err
}
duration := timex.Since(start)
if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
logDuration(ctx, cmds, duration)
}
metricReqDur.Observe(duration.Milliseconds(), "Pipeline")
if msg := formatError(batchError.Err()); len(msg) > 0 {
metricReqErr.Inc("Pipeline", msg)
}
return nil
}
func formatError(err error) string {
@@ -152,7 +123,7 @@ func logDuration(ctx context.Context, cmds []red.Cmder, duration time.Duration)
logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
}
func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context {
func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) (context.Context, func(err error)) {
tracer := trace.TracerFromContext(ctx)
ctx, span := tracer.Start(ctx,
@@ -166,18 +137,15 @@ func (h hook) startSpan(ctx context.Context, cmds ...red.Cmder) context.Context
}
span.SetAttributes(redisCmdsAttributeKey.StringSlice(cmdStrs))
return ctx
}
return ctx, func(err error) {
defer span.End()
func (h hook) endSpan(ctx context.Context, err error) {
span := oteltrace.SpanFromContext(ctx)
defer span.End()
if err == nil || err == red.Nil {
span.SetStatus(codes.Ok, "")
return
}
if err == nil || err == red.Nil {
span.SetStatus(codes.Ok, "")
return
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}