export cache package, add client interceptor customization

This commit is contained in:
kevin
2020-09-29 17:25:49 +08:00
parent dbca20e3df
commit d1b303fe7e
19 changed files with 70 additions and 79 deletions

129
core/stores/cache/cache.go vendored Normal file
View File

@@ -0,0 +1,129 @@
package cache
import (
"fmt"
"log"
"time"
"github.com/tal-tech/go-zero/core/errorx"
"github.com/tal-tech/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/syncx"
)
type (
Cache interface {
DelCache(keys ...string) error
GetCache(key string, v interface{}) error
SetCache(key string, v interface{}) error
SetCacheWithExpire(key string, v interface{}, expire time.Duration) error
Take(v interface{}, key string, query func(v interface{}) error) error
TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error
}
cacheCluster struct {
dispatcher *hash.ConsistentHash
errNotFound error
}
)
func NewCache(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error,
opts ...Option) Cache {
if len(c) == 0 || TotalWeights(c) <= 0 {
log.Fatal("no cache nodes")
}
if len(c) == 1 {
return NewCacheNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
}
dispatcher := hash.NewConsistentHash()
for _, node := range c {
cn := NewCacheNode(node.NewRedis(), barrier, st, errNotFound, opts...)
dispatcher.AddWithWeight(cn, node.Weight)
}
return cacheCluster{
dispatcher: dispatcher,
errNotFound: errNotFound,
}
}
func (cc cacheCluster) DelCache(keys ...string) error {
switch len(keys) {
case 0:
return nil
case 1:
key := keys[0]
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).DelCache(key)
default:
var be errorx.BatchError
nodes := make(map[interface{}][]string)
for _, key := range keys {
c, ok := cc.dispatcher.Get(key)
if !ok {
be.Add(fmt.Errorf("key %q not found", key))
continue
}
nodes[c] = append(nodes[c], key)
}
for c, ks := range nodes {
if err := c.(Cache).DelCache(ks...); err != nil {
be.Add(err)
}
}
return be.Err()
}
}
func (cc cacheCluster) GetCache(key string, v interface{}) error {
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).GetCache(key, v)
}
func (cc cacheCluster) SetCache(key string, v interface{}) error {
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).SetCache(key, v)
}
func (cc cacheCluster) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).SetCacheWithExpire(key, v, expire)
}
func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error {
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).Take(v, key, query)
}
func (cc cacheCluster) TakeWithExpire(v interface{}, key string,
query func(v interface{}, expire time.Duration) error) error {
c, ok := cc.dispatcher.Get(key)
if !ok {
return cc.errNotFound
}
return c.(Cache).TakeWithExpire(v, key, query)
}

200
core/stores/cache/cache_test.go vendored Normal file
View File

@@ -0,0 +1,200 @@
package cache
import (
"encoding/json"
"fmt"
"math"
"strconv"
"testing"
"time"
"github.com/alicebob/miniredis"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/errorx"
"github.com/tal-tech/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/syncx"
)
type mockedNode struct {
vals map[string][]byte
errNotFound error
}
func (mc *mockedNode) DelCache(keys ...string) error {
var be errorx.BatchError
for _, key := range keys {
if _, ok := mc.vals[key]; !ok {
be.Add(mc.errNotFound)
} else {
delete(mc.vals, key)
}
}
return be.Err()
}
func (mc *mockedNode) GetCache(key string, v interface{}) error {
bs, ok := mc.vals[key]
if ok {
return json.Unmarshal(bs, v)
}
return mc.errNotFound
}
func (mc *mockedNode) SetCache(key string, v interface{}) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
mc.vals[key] = data
return nil
}
func (mc *mockedNode) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
return mc.SetCache(key, v)
}
func (mc *mockedNode) Take(v interface{}, key string, query func(v interface{}) error) error {
if _, ok := mc.vals[key]; ok {
return mc.GetCache(key, v)
}
if err := query(v); err != nil {
return err
}
return mc.SetCache(key, v)
}
func (mc *mockedNode) TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error {
return mc.Take(v, key, func(v interface{}) error {
return query(v, 0)
})
}
func TestCache_SetDel(t *testing.T) {
const total = 1000
r1 := miniredis.NewMiniRedis()
assert.Nil(t, r1.Start())
defer r1.Close()
r2 := miniredis.NewMiniRedis()
assert.Nil(t, r2.Start())
defer r2.Close()
conf := ClusterConf{
{
RedisConf: redis.RedisConf{
Host: r1.Addr(),
Type: redis.NodeType,
},
Weight: 100,
},
{
RedisConf: redis.RedisConf{
Host: r2.Addr(),
Type: redis.NodeType,
},
Weight: 100,
},
}
c := NewCache(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
for i := 0; i < total; i++ {
if i%2 == 0 {
assert.Nil(t, c.SetCache(fmt.Sprintf("key/%d", i), i))
} else {
assert.Nil(t, c.SetCacheWithExpire(fmt.Sprintf("key/%d", i), i, 0))
}
}
for i := 0; i < total; i++ {
var v int
assert.Nil(t, c.GetCache(fmt.Sprintf("key/%d", i), &v))
assert.Equal(t, i, v)
}
for i := 0; i < total; i++ {
assert.Nil(t, c.DelCache(fmt.Sprintf("key/%d", i)))
}
for i := 0; i < total; i++ {
var v int
assert.Equal(t, errPlaceholder, c.GetCache(fmt.Sprintf("key/%d", i), &v))
assert.Equal(t, 0, v)
}
}
func TestCache_Balance(t *testing.T) {
const (
numNodes = 100
total = 10000
)
dispatcher := hash.NewConsistentHash()
maps := make([]map[string][]byte, numNodes)
for i := 0; i < numNodes; i++ {
maps[i] = map[string][]byte{
strconv.Itoa(i): []byte(strconv.Itoa(i)),
}
}
for i := 0; i < numNodes; i++ {
dispatcher.AddWithWeight(&mockedNode{
vals: maps[i],
errNotFound: errPlaceholder,
}, 100)
}
c := cacheCluster{
dispatcher: dispatcher,
errNotFound: errPlaceholder,
}
for i := 0; i < total; i++ {
assert.Nil(t, c.SetCache(strconv.Itoa(i), i))
}
counts := make(map[int]int)
for i, m := range maps {
counts[i] = len(m)
}
entropy := calcEntropy(counts, total)
assert.True(t, len(counts) > 1)
assert.True(t, entropy > .95, fmt.Sprintf("entropy should be greater than 0.95, but got %.2f", entropy))
for i := 0; i < total; i++ {
var v int
assert.Nil(t, c.GetCache(strconv.Itoa(i), &v))
assert.Equal(t, i, v)
}
for i := 0; i < total/10; i++ {
assert.Nil(t, c.DelCache(strconv.Itoa(i*10), strconv.Itoa(i*10+1), strconv.Itoa(i*10+2)))
assert.Nil(t, c.DelCache(strconv.Itoa(i*10+9)))
}
var count int
for i := 0; i < total/10; i++ {
var val int
if i%2 == 0 {
assert.Nil(t, c.Take(&val, strconv.Itoa(i*10), func(v interface{}) error {
*v.(*int) = i
count++
return nil
}))
} else {
assert.Nil(t, c.TakeWithExpire(&val, strconv.Itoa(i*10), func(v interface{}, expire time.Duration) error {
*v.(*int) = i
count++
return nil
}))
}
assert.Equal(t, i, val)
}
assert.Equal(t, total/10, count)
}
func calcEntropy(m map[int]int, total int) float64 {
var entropy float64
for _, v := range m {
proba := float64(v) / float64(total)
entropy -= proba * math.Log2(proba)
}
return entropy / math.Log2(float64(len(m)))
}

View File

@@ -1,5 +1,3 @@
package cache
import "github.com/tal-tech/go-zero/core/stores/internal"
type CacheConf = internal.ClusterConf
type CacheConf = ClusterConf

208
core/stores/cache/cachenode.go vendored Normal file
View File

@@ -0,0 +1,208 @@
package cache
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"time"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/syncx"
)
const (
notFoundPlaceholder = "*"
// make the expiry unstable to avoid lots of cached items expire at the same time
// make the unstable expiry to be [0.95, 1.05] * seconds
expiryDeviation = 0.05
)
// indicates there is no such value associate with the key
var errPlaceholder = errors.New("placeholder")
type cacheNode struct {
rds *redis.Redis
expiry time.Duration
notFoundExpiry time.Duration
barrier syncx.SharedCalls
r *rand.Rand
lock *sync.Mutex
unstableExpiry mathx.Unstable
stat *CacheStat
errNotFound error
}
func NewCacheNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat,
errNotFound error, opts ...Option) Cache {
o := newOptions(opts...)
return cacheNode{
rds: rds,
expiry: o.Expiry,
notFoundExpiry: o.NotFoundExpiry,
barrier: barrier,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: st,
errNotFound: errNotFound,
}
}
func (c cacheNode) DelCache(keys ...string) error {
if len(keys) == 0 {
return nil
}
if _, err := c.rds.Del(keys...); err != nil {
logx.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
c.asyncRetryDelCache(keys...)
}
return nil
}
func (c cacheNode) GetCache(key string, v interface{}) error {
if err := c.doGetCache(key, v); err == errPlaceholder {
return c.errNotFound
} else {
return err
}
}
func (c cacheNode) SetCache(key string, v interface{}) error {
return c.SetCacheWithExpire(key, v, c.aroundDuration(c.expiry))
}
func (c cacheNode) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
return c.rds.Setex(key, string(data), int(expire.Seconds()))
}
func (c cacheNode) String() string {
return c.rds.Addr
}
func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error {
return c.doTake(v, key, query, func(v interface{}) error {
return c.SetCache(key, v)
})
}
func (c cacheNode) TakeWithExpire(v interface{}, key string,
query func(v interface{}, expire time.Duration) error) error {
expire := c.aroundDuration(c.expiry)
return c.doTake(v, key, func(v interface{}) error {
return query(v, expire)
}, func(v interface{}) error {
return c.SetCacheWithExpire(key, v, expire)
})
}
func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
return c.unstableExpiry.AroundDuration(duration)
}
func (c cacheNode) asyncRetryDelCache(keys ...string) {
AddCleanTask(func() error {
_, err := c.rds.Del(keys...)
return err
}, keys...)
}
func (c cacheNode) doGetCache(key string, v interface{}) error {
c.stat.IncrementTotal()
data, err := c.rds.Get(key)
if err != nil {
c.stat.IncrementMiss()
return err
}
if len(data) == 0 {
c.stat.IncrementMiss()
return c.errNotFound
}
c.stat.IncrementHit()
if data == notFoundPlaceholder {
return errPlaceholder
}
return c.processCache(key, data, v)
}
func (c cacheNode) doTake(v interface{}, key string, query func(v interface{}) error,
cacheVal func(v interface{}) error) error {
val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
if err := c.doGetCache(key, v); err != nil {
if err == errPlaceholder {
return nil, c.errNotFound
} else if err != c.errNotFound {
// why we just return the error instead of query from db,
// because we don't allow the disaster pass to the dbs.
// fail fast, in case we bring down the dbs.
return nil, err
}
if err = query(v); err == c.errNotFound {
if err = c.setCacheWithNotFound(key); err != nil {
logx.Error(err)
}
return nil, c.errNotFound
} else if err != nil {
c.stat.IncrementDbFails()
return nil, err
}
if err = cacheVal(v); err != nil {
logx.Error(err)
}
}
return json.Marshal(v)
})
if err != nil {
return err
}
if fresh {
return nil
} else {
// got the result from previous ongoing query
c.stat.IncrementTotal()
c.stat.IncrementHit()
}
return json.Unmarshal(val.([]byte), v)
}
func (c cacheNode) processCache(key string, data string, v interface{}) error {
err := json.Unmarshal([]byte(data), v)
if err == nil {
return nil
}
report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
c.rds.Addr, key, data, err)
logx.Error(report)
stat.Report(report)
if _, e := c.rds.Del(key); e != nil {
logx.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
c.rds.Addr, key, data, e)
}
// returns errNotFound to reload the value by the given queryFn
return c.errNotFound
}
func (c cacheNode) setCacheWithNotFound(key string) error {
return c.rds.Setex(key, notFoundPlaceholder, int(c.aroundDuration(c.notFoundExpiry).Seconds()))
}

65
core/stores/cache/cachenode_test.go vendored Normal file
View File

@@ -0,0 +1,65 @@
package cache
import (
"errors"
"math/rand"
"sync"
"testing"
"time"
"github.com/alicebob/miniredis"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/stores/redis"
)
func init() {
logx.Disable()
stat.SetReporter(nil)
}
func TestCacheNode_DelCache(t *testing.T) {
s, err := miniredis.Run()
assert.Nil(t, err)
defer s.Close()
cn := cacheNode{
rds: redis.NewRedis(s.Addr(), redis.NodeType),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
errNotFound: errors.New("any"),
}
assert.Nil(t, cn.DelCache())
assert.Nil(t, cn.DelCache([]string{}...))
assert.Nil(t, cn.DelCache(make([]string, 0)...))
cn.SetCache("first", "one")
assert.Nil(t, cn.DelCache("first"))
cn.SetCache("first", "one")
cn.SetCache("second", "two")
assert.Nil(t, cn.DelCache("first", "second"))
}
func TestCacheNode_InvalidCache(t *testing.T) {
s, err := miniredis.Run()
assert.Nil(t, err)
defer s.Close()
cn := cacheNode{
rds: redis.NewRedis(s.Addr(), redis.NodeType),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewCacheStat("any"),
errNotFound: errors.New("any"),
}
s.Set("any", "value")
var str string
assert.NotNil(t, cn.GetCache("any", &str))
assert.Equal(t, "", str)
_, err = s.Get("any")
assert.Equal(t, miniredis.ErrKeyNotFound, err)
}

View File

@@ -1,21 +1,45 @@
package cache
import (
"time"
import "time"
"github.com/tal-tech/go-zero/core/stores/internal"
const (
defaultExpiry = time.Hour * 24 * 7
defaultNotFoundExpiry = time.Minute
)
type Option = internal.Option
type (
Options struct {
Expiry time.Duration
NotFoundExpiry time.Duration
}
Option func(o *Options)
)
func newOptions(opts ...Option) Options {
var o Options
for _, opt := range opts {
opt(&o)
}
if o.Expiry <= 0 {
o.Expiry = defaultExpiry
}
if o.NotFoundExpiry <= 0 {
o.NotFoundExpiry = defaultNotFoundExpiry
}
return o
}
func WithExpiry(expiry time.Duration) Option {
return func(o *internal.Options) {
return func(o *Options) {
o.Expiry = expiry
}
}
func WithNotFoundExpiry(expiry time.Duration) Option {
return func(o *internal.Options) {
return func(o *Options) {
o.NotFoundExpiry = expiry
}
}

67
core/stores/cache/cachestat.go vendored Normal file
View File

@@ -0,0 +1,67 @@
package cache
import (
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/logx"
)
const statInterval = time.Minute
type CacheStat struct {
name string
// export the fields to let the unit tests working,
// reside in internal package, doesn't matter.
Total uint64
Hit uint64
Miss uint64
DbFails uint64
}
func NewCacheStat(name string) *CacheStat {
ret := &CacheStat{
name: name,
}
go ret.statLoop()
return ret
}
func (cs *CacheStat) IncrementTotal() {
atomic.AddUint64(&cs.Total, 1)
}
func (cs *CacheStat) IncrementHit() {
atomic.AddUint64(&cs.Hit, 1)
}
func (cs *CacheStat) IncrementMiss() {
atomic.AddUint64(&cs.Miss, 1)
}
func (cs *CacheStat) IncrementDbFails() {
atomic.AddUint64(&cs.DbFails, 1)
}
func (cs *CacheStat) statLoop() {
ticker := time.NewTicker(statInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
total := atomic.SwapUint64(&cs.Total, 0)
if total == 0 {
continue
}
hit := atomic.SwapUint64(&cs.Hit, 0)
percent := 100 * float32(hit) / float32(total)
miss := atomic.SwapUint64(&cs.Miss, 0)
dbf := atomic.SwapUint64(&cs.DbFails, 0)
logx.Statf("dbcache(%s) - qpm: %d, hit_ratio: %.1f%%, hit: %d, miss: %d, db_fails: %d",
cs.name, total, percent, hit, miss, dbf)
}
}
}

84
core/stores/cache/cleaner.go vendored Normal file
View File

@@ -0,0 +1,84 @@
package cache
import (
"fmt"
"time"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/threading"
)
const (
timingWheelSlots = 300
cleanWorkers = 5
taskKeyLen = 8
)
var (
timingWheel *collection.TimingWheel
taskRunner = threading.NewTaskRunner(cleanWorkers)
)
type delayTask struct {
delay time.Duration
task func() error
keys []string
}
func init() {
var err error
timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean)
logx.Must(err)
proc.AddShutdownListener(func() {
timingWheel.Drain(clean)
})
}
func AddCleanTask(task func() error, keys ...string) {
timingWheel.SetTimer(stringx.Randn(taskKeyLen), delayTask{
delay: time.Second,
task: task,
keys: keys,
}, time.Second)
}
func clean(key, value interface{}) {
taskRunner.Schedule(func() {
dt := value.(delayTask)
err := dt.task()
if err == nil {
return
}
next, ok := nextDelay(dt.delay)
if ok {
dt.delay = next
timingWheel.SetTimer(key, dt, next)
} else {
msg := fmt.Sprintf("retried but failed to clear cache with keys: %q, error: %v",
formatKeys(dt.keys), err)
logx.Error(msg)
stat.Report(msg)
}
})
}
func nextDelay(delay time.Duration) (time.Duration, bool) {
switch delay {
case time.Second:
return time.Second * 5, true
case time.Second * 5:
return time.Minute, true
case time.Minute:
return time.Minute * 5, true
case time.Minute * 5:
return time.Hour, true
default:
return 0, false
}
}

12
core/stores/cache/config.go vendored Normal file
View File

@@ -0,0 +1,12 @@
package cache
import "github.com/tal-tech/go-zero/core/stores/redis"
type (
ClusterConf []NodeConf
NodeConf struct {
redis.RedisConf
Weight int `json:",default=100"`
}
)

22
core/stores/cache/util.go vendored Normal file
View File

@@ -0,0 +1,22 @@
package cache
import "strings"
const keySeparator = ","
func TotalWeights(c []NodeConf) int {
var weights int
for _, node := range c {
if node.Weight < 0 {
node.Weight = 0
}
weights += node.Weight
}
return weights
}
func formatKeys(keys []string) string {
return strings.Join(keys, keySeparator)
}