feat: update go-redis to v8, support ctx in redis methods (#1507)

* feat: update go-redis to v8, support ctx in redis methods

* fix compile errors

* chore: remove unused const

* chore: add tracing log on redis
This commit is contained in:
Kevin Wan
2022-02-09 11:06:06 +08:00
committed by GitHub
parent 77482c8946
commit 822ee2e1c5
15 changed files with 980 additions and 264 deletions

View File

@@ -23,10 +23,9 @@ func TestPeriodLimit_RedisUnavailable(t *testing.T) {
const (
seconds = 1
total = 100
quota = 5
)
l := NewPeriodLimit(seconds, quota, redis.NewRedis(s.Addr(), redis.NodeType), "periodlimit")
l := NewPeriodLimit(seconds, quota, redis.New(s.Addr()), "periodlimit")
s.Close()
val, err := l.Take("first")
assert.NotNil(t, err)

83
core/stores/redis/hook.go Normal file
View File

@@ -0,0 +1,83 @@
package redis
import (
"context"
"strings"
"time"
red "github.com/go-redis/redis/v8"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mapping"
"github.com/zeromicro/go-zero/core/timex"
)
var (
startTimeKey = contextKey("startTime")
durationHook = hook{}
)
type (
contextKey string
hook struct{}
)
func (h hook) BeforeProcess(ctx context.Context, _ red.Cmder) (context.Context, error) {
return context.WithValue(ctx, startTimeKey, timex.Now()), nil
}
func (h hook) AfterProcess(ctx context.Context, cmd red.Cmder) error {
val := ctx.Value(startTimeKey)
if val == nil {
return nil
}
start, ok := val.(time.Duration)
if !ok {
return nil
}
duration := timex.Since(start)
if duration > slowThreshold.Load() {
logDuration(ctx, cmd, duration)
}
return nil
}
func (h hook) BeforeProcessPipeline(ctx context.Context, _ []red.Cmder) (context.Context, error) {
return context.WithValue(ctx, startTimeKey, timex.Now()), nil
}
func (h hook) AfterProcessPipeline(ctx context.Context, cmds []red.Cmder) error {
if len(cmds) == 0 {
return nil
}
val := ctx.Value(startTimeKey)
if val == nil {
return nil
}
start, ok := val.(time.Duration)
if !ok {
return nil
}
duration := timex.Since(start)
if duration > slowThreshold.Load()*time.Duration(len(cmds)) {
logDuration(ctx, cmds[0], duration)
}
return nil
}
func logDuration(ctx context.Context, cmd red.Cmder, duration time.Duration) {
var buf strings.Builder
for i, arg := range cmd.Args() {
if i > 0 {
buf.WriteByte(' ')
}
buf.WriteString(mapping.Repr(arg))
}
logx.WithContext(ctx).WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
}

View File

@@ -0,0 +1,138 @@
package redis
import (
"context"
"log"
"strings"
"testing"
"time"
red "github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
)
func TestHookProcessCase1(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
ctx, err := durationHook.BeforeProcess(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, durationHook.AfterProcess(ctx, red.NewCmd(context.Background())))
assert.False(t, strings.Contains(buf.String(), "slow"))
}
func TestHookProcessCase2(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
ctx, err := durationHook.BeforeProcess(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
time.Sleep(slowThreshold.Load() + time.Millisecond)
assert.Nil(t, durationHook.AfterProcess(ctx, red.NewCmd(context.Background(), "foo", "bar")))
assert.True(t, strings.Contains(buf.String(), "slow"))
}
func TestHookProcessCase3(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
assert.Nil(t, durationHook.AfterProcess(context.Background(), red.NewCmd(context.Background())))
assert.True(t, buf.Len() == 0)
}
func TestHookProcessCase4(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
ctx := context.WithValue(context.Background(), startTimeKey, "foo")
assert.Nil(t, durationHook.AfterProcess(ctx, red.NewCmd(context.Background())))
assert.True(t, buf.Len() == 0)
}
func TestHookProcessPipelineCase1(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
ctx, err := durationHook.BeforeProcessPipeline(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, durationHook.AfterProcessPipeline(ctx, []red.Cmder{
red.NewCmd(context.Background()),
}))
assert.False(t, strings.Contains(buf.String(), "slow"))
}
func TestHookProcessPipelineCase2(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
ctx, err := durationHook.BeforeProcessPipeline(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
time.Sleep(slowThreshold.Load() + time.Millisecond)
assert.Nil(t, durationHook.AfterProcessPipeline(ctx, []red.Cmder{
red.NewCmd(context.Background(), "foo", "bar"),
}))
assert.True(t, strings.Contains(buf.String(), "slow"))
}
func TestHookProcessPipelineCase3(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
assert.Nil(t, durationHook.AfterProcessPipeline(context.Background(), []red.Cmder{
red.NewCmd(context.Background()),
}))
assert.True(t, buf.Len() == 0)
}
func TestHookProcessPipelineCase4(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
ctx := context.WithValue(context.Background(), startTimeKey, "foo")
assert.Nil(t, durationHook.AfterProcessPipeline(ctx, []red.Cmder{
red.NewCmd(context.Background()),
}))
assert.True(t, buf.Len() == 0)
}
func TestHookProcessPipelineCase5(t *testing.T) {
writer := log.Writer()
var buf strings.Builder
log.SetOutput(&buf)
defer log.SetOutput(writer)
ctx := context.WithValue(context.Background(), startTimeKey, "foo")
assert.Nil(t, durationHook.AfterProcessPipeline(ctx, nil))
assert.True(t, buf.Len() == 0)
}

View File

@@ -1,32 +0,0 @@
package redis
import (
"strings"
red "github.com/go-redis/redis"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mapping"
"github.com/zeromicro/go-zero/core/timex"
)
func checkDuration(proc func(red.Cmder) error) func(red.Cmder) error {
return func(cmd red.Cmder) error {
start := timex.Now()
defer func() {
duration := timex.Since(start)
if duration > slowThreshold.Load() {
var buf strings.Builder
for i, arg := range cmd.Args() {
if i > 0 {
buf.WriteByte(' ')
}
buf.WriteString(mapping.Repr(arg))
}
logx.WithDuration(duration).Slowf("[REDIS] slowcall on executing: %s", buf.String())
}
}()
return proc(cmd)
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,7 @@
package redis
import (
"context"
"crypto/tls"
"errors"
"io"
@@ -9,7 +10,7 @@ import (
"time"
"github.com/alicebob/miniredis/v2"
red "github.com/go-redis/redis"
red "github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx"
@@ -964,13 +965,14 @@ func TestRedis_SortedSet(t *testing.T) {
assert.NotNil(t, err)
client.Zadd("second", 2, "aa")
client.Zadd("third", 3, "bbb")
val, err = client.Zunionstore("union", ZStore{
val, err = client.Zunionstore("union", &ZStore{
Keys: []string{"second", "third"},
Weights: []float64{1, 2},
Aggregate: "SUM",
}, "second", "third")
})
assert.Nil(t, err)
assert.Equal(t, int64(2), val)
_, err = New(client.Addr, badType()).Zunionstore("union", ZStore{})
_, err = New(client.Addr, badType()).Zunionstore("union", &ZStore{})
assert.NotNil(t, err)
vals, err = client.Zrange("union", 0, 10000)
assert.Nil(t, err)
@@ -988,9 +990,9 @@ func TestRedis_Pipelined(t *testing.T) {
}))
err := client.Pipelined(
func(pipe Pipeliner) error {
pipe.Incr("pipelined_counter")
pipe.Expire("pipelined_counter", time.Hour)
pipe.ZAdd("zadd", Z{Score: 12, Member: "zadd"})
pipe.Incr(context.Background(), "pipelined_counter")
pipe.Expire(context.Background(), "pipelined_counter", time.Hour)
pipe.ZAdd(context.Background(), "zadd", &Z{Score: 12, Member: "zadd"})
return nil
},
)
@@ -1187,6 +1189,6 @@ type mockedNode struct {
RedisNode
}
func (n mockedNode) BLPop(timeout time.Duration, keys ...string) *red.StringSliceCmd {
return red.NewStringSliceCmd("foo", "bar")
func (n mockedNode) BLPop(ctx context.Context, timeout time.Duration, keys ...string) *red.StringSliceCmd {
return red.NewStringSliceCmd(context.Background(), "foo", "bar")
}

View File

@@ -3,7 +3,7 @@ package redis
import (
"fmt"
red "github.com/go-redis/redis"
red "github.com/go-redis/redis/v8"
"github.com/zeromicro/go-zero/core/logx"
)

View File

@@ -4,7 +4,7 @@ import (
"crypto/tls"
"io"
red "github.com/go-redis/redis"
red "github.com/go-redis/redis/v8"
"github.com/zeromicro/go-zero/core/syncx"
)
@@ -32,7 +32,8 @@ func getClient(r *Redis) (*red.Client, error) {
MinIdleConns: idleConns,
TLSConfig: tlsConfig,
})
store.WrapProcess(checkDuration)
store.AddHook(durationHook)
return store, nil
})
if err != nil {

View File

@@ -4,7 +4,7 @@ import (
"crypto/tls"
"io"
red "github.com/go-redis/redis"
red "github.com/go-redis/redis/v8"
"github.com/zeromicro/go-zero/core/syncx"
)
@@ -25,7 +25,7 @@ func getCluster(r *Redis) (*red.ClusterClient, error) {
MinIdleConns: idleConns,
TLSConfig: tlsConfig,
})
store.WrapProcess(checkDuration)
store.AddHook(durationHook)
return store, nil
})

View File

@@ -5,7 +5,7 @@ import (
"sync/atomic"
"time"
red "github.com/go-redis/redis"
red "github.com/go-redis/redis/v8"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx"
)

View File

@@ -17,10 +17,12 @@ func CreateRedis() (r *redis.Redis, clean func(), err error) {
return redis.New(mr.Addr()), func() {
ch := make(chan lang.PlaceholderType)
go func() {
mr.Close()
close(ch)
}()
select {
case <-ch:
case <-time.After(time.Second):