fix golint issues in core/collection, refine cache interface (#475)
This commit is contained in:
@@ -23,8 +23,10 @@ const (
|
||||
var emptyLruCache = emptyLru{}
|
||||
|
||||
type (
|
||||
// CacheOption defines the method to customize a Cache.
|
||||
CacheOption func(cache *Cache)
|
||||
|
||||
// A Cache object is a in-memory cache.
|
||||
Cache struct {
|
||||
name string
|
||||
lock sync.Mutex
|
||||
@@ -38,6 +40,7 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
// NewCache returns a Cache with given expire.
|
||||
func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
|
||||
cache := &Cache{
|
||||
data: make(map[string]interface{}),
|
||||
@@ -72,6 +75,7 @@ func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
// Del deletes the item with the given key from c.
|
||||
func (c *Cache) Del(key string) {
|
||||
c.lock.Lock()
|
||||
delete(c.data, key)
|
||||
@@ -80,6 +84,7 @@ func (c *Cache) Del(key string) {
|
||||
c.timingWheel.RemoveTimer(key)
|
||||
}
|
||||
|
||||
// Get returns the item with the given key from c.
|
||||
func (c *Cache) Get(key string) (interface{}, bool) {
|
||||
value, ok := c.doGet(key)
|
||||
if ok {
|
||||
@@ -91,6 +96,7 @@ func (c *Cache) Get(key string) (interface{}, bool) {
|
||||
return value, ok
|
||||
}
|
||||
|
||||
// Set sets value into c with key.
|
||||
func (c *Cache) Set(key string, value interface{}) {
|
||||
c.lock.Lock()
|
||||
_, ok := c.data[key]
|
||||
@@ -106,6 +112,9 @@ func (c *Cache) Set(key string, value interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// Take returns the item with the given key.
|
||||
// If the item is in c, return it directly.
|
||||
// If not, use fetch method to get the item, set into c and return it.
|
||||
func (c *Cache) Take(key string, fetch func() (interface{}, error)) (interface{}, error) {
|
||||
if val, ok := c.doGet(key); ok {
|
||||
c.stats.IncrementHit()
|
||||
@@ -167,6 +176,7 @@ func (c *Cache) size() int {
|
||||
return len(c.data)
|
||||
}
|
||||
|
||||
// WithLimit customizes a Cache with items up to limit.
|
||||
func WithLimit(limit int) CacheOption {
|
||||
return func(cache *Cache) {
|
||||
if limit > 0 {
|
||||
@@ -175,6 +185,7 @@ func WithLimit(limit int) CacheOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithName customizes a Cache with the given name.
|
||||
func WithName(name string) CacheOption {
|
||||
return func(cache *Cache) {
|
||||
cache.name = name
|
||||
|
||||
@@ -2,6 +2,7 @@ package collection
|
||||
|
||||
import "sync"
|
||||
|
||||
// A Queue is a FIFO queue.
|
||||
type Queue struct {
|
||||
lock sync.Mutex
|
||||
elements []interface{}
|
||||
@@ -11,6 +12,7 @@ type Queue struct {
|
||||
count int
|
||||
}
|
||||
|
||||
// NewQueue returns a Queue object.
|
||||
func NewQueue(size int) *Queue {
|
||||
return &Queue{
|
||||
elements: make([]interface{}, size),
|
||||
@@ -18,6 +20,7 @@ func NewQueue(size int) *Queue {
|
||||
}
|
||||
}
|
||||
|
||||
// Empty checks if q is empty.
|
||||
func (q *Queue) Empty() bool {
|
||||
q.lock.Lock()
|
||||
empty := q.count == 0
|
||||
@@ -26,6 +29,7 @@ func (q *Queue) Empty() bool {
|
||||
return empty
|
||||
}
|
||||
|
||||
// Put puts element into q at the last position.
|
||||
func (q *Queue) Put(element interface{}) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
@@ -44,6 +48,7 @@ func (q *Queue) Put(element interface{}) {
|
||||
q.count++
|
||||
}
|
||||
|
||||
// Take takes the first element out of q if not empty.
|
||||
func (q *Queue) Take() (interface{}, bool) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
@@ -2,12 +2,14 @@ package collection
|
||||
|
||||
import "sync"
|
||||
|
||||
// A Ring can be used as fixed size ring.
|
||||
type Ring struct {
|
||||
elements []interface{}
|
||||
index int
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// NewRing returns a Ring object with the given size n.
|
||||
func NewRing(n int) *Ring {
|
||||
if n < 1 {
|
||||
panic("n should be greater than 0")
|
||||
@@ -18,6 +20,7 @@ func NewRing(n int) *Ring {
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds v into r.
|
||||
func (r *Ring) Add(v interface{}) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
@@ -26,6 +29,7 @@ func (r *Ring) Add(v interface{}) {
|
||||
r.index++
|
||||
}
|
||||
|
||||
// Take takes all items from r.
|
||||
func (r *Ring) Take() []interface{} {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
@@ -18,6 +18,7 @@ type SafeMap struct {
|
||||
dirtyNew map[interface{}]interface{}
|
||||
}
|
||||
|
||||
// NewSafeMap returns a SafeMap.
|
||||
func NewSafeMap() *SafeMap {
|
||||
return &SafeMap{
|
||||
dirtyOld: make(map[interface{}]interface{}),
|
||||
@@ -25,6 +26,7 @@ func NewSafeMap() *SafeMap {
|
||||
}
|
||||
}
|
||||
|
||||
// Del deletes the value with the given key from m.
|
||||
func (m *SafeMap) Del(key interface{}) {
|
||||
m.lock.Lock()
|
||||
if _, ok := m.dirtyOld[key]; ok {
|
||||
@@ -53,6 +55,7 @@ func (m *SafeMap) Del(key interface{}) {
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
// Get gets the value with the given key from m.
|
||||
func (m *SafeMap) Get(key interface{}) (interface{}, bool) {
|
||||
m.lock.RLock()
|
||||
defer m.lock.RUnlock()
|
||||
@@ -65,6 +68,7 @@ func (m *SafeMap) Get(key interface{}) (interface{}, bool) {
|
||||
return val, ok
|
||||
}
|
||||
|
||||
// Set sets the value into m with the given key.
|
||||
func (m *SafeMap) Set(key, value interface{}) {
|
||||
m.lock.Lock()
|
||||
if m.deletionOld <= maxDeletion {
|
||||
@@ -83,6 +87,7 @@ func (m *SafeMap) Set(key, value interface{}) {
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
// Size returns the size of m.
|
||||
func (m *SafeMap) Size() int {
|
||||
m.lock.RLock()
|
||||
size := len(m.dirtyOld) + len(m.dirtyNew)
|
||||
|
||||
@@ -21,6 +21,7 @@ type Set struct {
|
||||
tp int
|
||||
}
|
||||
|
||||
// NewSet returns a managed Set, can only put the values with the same type.
|
||||
func NewSet() *Set {
|
||||
return &Set{
|
||||
data: make(map[interface{}]lang.PlaceholderType),
|
||||
@@ -28,6 +29,7 @@ func NewSet() *Set {
|
||||
}
|
||||
}
|
||||
|
||||
// NewUnmanagedSet returns a unmanaged Set, which can put values with different types.
|
||||
func NewUnmanagedSet() *Set {
|
||||
return &Set{
|
||||
data: make(map[interface{}]lang.PlaceholderType),
|
||||
@@ -35,42 +37,49 @@ func NewUnmanagedSet() *Set {
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds i into s.
|
||||
func (s *Set) Add(i ...interface{}) {
|
||||
for _, each := range i {
|
||||
s.add(each)
|
||||
}
|
||||
}
|
||||
|
||||
// AddInt adds int values ii into s.
|
||||
func (s *Set) AddInt(ii ...int) {
|
||||
for _, each := range ii {
|
||||
s.add(each)
|
||||
}
|
||||
}
|
||||
|
||||
// AddInt64 adds int64 values ii into s.
|
||||
func (s *Set) AddInt64(ii ...int64) {
|
||||
for _, each := range ii {
|
||||
s.add(each)
|
||||
}
|
||||
}
|
||||
|
||||
// AddUint adds uint values ii into s.
|
||||
func (s *Set) AddUint(ii ...uint) {
|
||||
for _, each := range ii {
|
||||
s.add(each)
|
||||
}
|
||||
}
|
||||
|
||||
// AddUint64 adds uint64 values ii into s.
|
||||
func (s *Set) AddUint64(ii ...uint64) {
|
||||
for _, each := range ii {
|
||||
s.add(each)
|
||||
}
|
||||
}
|
||||
|
||||
// AddStr adds string values ss into s.
|
||||
func (s *Set) AddStr(ss ...string) {
|
||||
for _, each := range ss {
|
||||
s.add(each)
|
||||
}
|
||||
}
|
||||
|
||||
// Contains checks if i is in s.
|
||||
func (s *Set) Contains(i interface{}) bool {
|
||||
if len(s.data) == 0 {
|
||||
return false
|
||||
@@ -81,6 +90,7 @@ func (s *Set) Contains(i interface{}) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
// Keys returns the keys in s.
|
||||
func (s *Set) Keys() []interface{} {
|
||||
var keys []interface{}
|
||||
|
||||
@@ -91,6 +101,7 @@ func (s *Set) Keys() []interface{} {
|
||||
return keys
|
||||
}
|
||||
|
||||
// KeysInt returns the int keys in s.
|
||||
func (s *Set) KeysInt() []int {
|
||||
var keys []int
|
||||
|
||||
@@ -105,6 +116,7 @@ func (s *Set) KeysInt() []int {
|
||||
return keys
|
||||
}
|
||||
|
||||
// KeysInt64 returns int64 keys in s.
|
||||
func (s *Set) KeysInt64() []int64 {
|
||||
var keys []int64
|
||||
|
||||
@@ -119,6 +131,7 @@ func (s *Set) KeysInt64() []int64 {
|
||||
return keys
|
||||
}
|
||||
|
||||
// KeysUint returns uint keys in s.
|
||||
func (s *Set) KeysUint() []uint {
|
||||
var keys []uint
|
||||
|
||||
@@ -133,6 +146,7 @@ func (s *Set) KeysUint() []uint {
|
||||
return keys
|
||||
}
|
||||
|
||||
// KeysUint64 returns uint64 keys in s.
|
||||
func (s *Set) KeysUint64() []uint64 {
|
||||
var keys []uint64
|
||||
|
||||
@@ -147,6 +161,7 @@ func (s *Set) KeysUint64() []uint64 {
|
||||
return keys
|
||||
}
|
||||
|
||||
// KeysStr returns string keys in s.
|
||||
func (s *Set) KeysStr() []string {
|
||||
var keys []string
|
||||
|
||||
@@ -161,11 +176,13 @@ func (s *Set) KeysStr() []string {
|
||||
return keys
|
||||
}
|
||||
|
||||
// Remove removes i from s.
|
||||
func (s *Set) Remove(i interface{}) {
|
||||
s.validate(i)
|
||||
delete(s.data, i)
|
||||
}
|
||||
|
||||
// Count returns the number of items in s.
|
||||
func (s *Set) Count() int {
|
||||
return len(s.data)
|
||||
}
|
||||
|
||||
@@ -13,8 +13,10 @@ import (
|
||||
const drainWorkers = 8
|
||||
|
||||
type (
|
||||
// Execute defines the method to execute the task.
|
||||
Execute func(key, value interface{})
|
||||
|
||||
// A TimingWheel is a timing wheel object to schedule tasks.
|
||||
TimingWheel struct {
|
||||
interval time.Duration
|
||||
ticker timex.Ticker
|
||||
@@ -54,6 +56,7 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
// NewTimingWheel returns a TimingWheel.
|
||||
func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {
|
||||
if interval <= 0 || numSlots <= 0 || execute == nil {
|
||||
return nil, fmt.Errorf("interval: %v, slots: %d, execute: %p", interval, numSlots, execute)
|
||||
@@ -85,10 +88,12 @@ func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execu
|
||||
return tw, nil
|
||||
}
|
||||
|
||||
// Drain drains all items and executes them.
|
||||
func (tw *TimingWheel) Drain(fn func(key, value interface{})) {
|
||||
tw.drainChannel <- fn
|
||||
}
|
||||
|
||||
// MoveTimer moves the task with the given key to the given delay.
|
||||
func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration) {
|
||||
if delay <= 0 || key == nil {
|
||||
return
|
||||
@@ -100,6 +105,7 @@ func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveTimer removes the task with the given key.
|
||||
func (tw *TimingWheel) RemoveTimer(key interface{}) {
|
||||
if key == nil {
|
||||
return
|
||||
@@ -108,6 +114,7 @@ func (tw *TimingWheel) RemoveTimer(key interface{}) {
|
||||
tw.removeChannel <- key
|
||||
}
|
||||
|
||||
// SetTimer sets the task value with the given key to the delay.
|
||||
func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration) {
|
||||
if delay <= 0 || key == nil {
|
||||
return
|
||||
@@ -122,6 +129,7 @@ func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops tw.
|
||||
func (tw *TimingWheel) Stop() {
|
||||
close(tw.stopChannel)
|
||||
}
|
||||
|
||||
32
core/stores/cache/cache.go
vendored
32
core/stores/cache/cache.go
vendored
@@ -12,11 +12,11 @@ import (
|
||||
|
||||
type (
|
||||
Cache interface {
|
||||
DelCache(keys ...string) error
|
||||
GetCache(key string, v interface{}) error
|
||||
Del(keys ...string) error
|
||||
Get(key string, v interface{}) error
|
||||
IsNotFound(err error) bool
|
||||
SetCache(key string, v interface{}) error
|
||||
SetCacheWithExpire(key string, v interface{}, expire time.Duration) error
|
||||
Set(key string, v interface{}) error
|
||||
SetWithExpire(key string, v interface{}, expire time.Duration) error
|
||||
Take(v interface{}, key string, query func(v interface{}) error) error
|
||||
TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error
|
||||
}
|
||||
@@ -27,19 +27,19 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
func NewCache(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error,
|
||||
func New(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error,
|
||||
opts ...Option) Cache {
|
||||
if len(c) == 0 || TotalWeights(c) <= 0 {
|
||||
log.Fatal("no cache nodes")
|
||||
}
|
||||
|
||||
if len(c) == 1 {
|
||||
return NewCacheNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
|
||||
return NewNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
|
||||
}
|
||||
|
||||
dispatcher := hash.NewConsistentHash()
|
||||
for _, node := range c {
|
||||
cn := NewCacheNode(node.NewRedis(), barrier, st, errNotFound, opts...)
|
||||
cn := NewNode(node.NewRedis(), barrier, st, errNotFound, opts...)
|
||||
dispatcher.AddWithWeight(cn, node.Weight)
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ func NewCache(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFou
|
||||
}
|
||||
}
|
||||
|
||||
func (cc cacheCluster) DelCache(keys ...string) error {
|
||||
func (cc cacheCluster) Del(keys ...string) error {
|
||||
switch len(keys) {
|
||||
case 0:
|
||||
return nil
|
||||
@@ -60,7 +60,7 @@ func (cc cacheCluster) DelCache(keys ...string) error {
|
||||
return cc.errNotFound
|
||||
}
|
||||
|
||||
return c.(Cache).DelCache(key)
|
||||
return c.(Cache).Del(key)
|
||||
default:
|
||||
var be errorx.BatchError
|
||||
nodes := make(map[interface{}][]string)
|
||||
@@ -74,7 +74,7 @@ func (cc cacheCluster) DelCache(keys ...string) error {
|
||||
nodes[c] = append(nodes[c], key)
|
||||
}
|
||||
for c, ks := range nodes {
|
||||
if err := c.(Cache).DelCache(ks...); err != nil {
|
||||
if err := c.(Cache).Del(ks...); err != nil {
|
||||
be.Add(err)
|
||||
}
|
||||
}
|
||||
@@ -83,35 +83,35 @@ func (cc cacheCluster) DelCache(keys ...string) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (cc cacheCluster) GetCache(key string, v interface{}) error {
|
||||
func (cc cacheCluster) Get(key string, v interface{}) error {
|
||||
c, ok := cc.dispatcher.Get(key)
|
||||
if !ok {
|
||||
return cc.errNotFound
|
||||
}
|
||||
|
||||
return c.(Cache).GetCache(key, v)
|
||||
return c.(Cache).Get(key, v)
|
||||
}
|
||||
|
||||
func (cc cacheCluster) IsNotFound(err error) bool {
|
||||
return err == cc.errNotFound
|
||||
}
|
||||
|
||||
func (cc cacheCluster) SetCache(key string, v interface{}) error {
|
||||
func (cc cacheCluster) Set(key string, v interface{}) error {
|
||||
c, ok := cc.dispatcher.Get(key)
|
||||
if !ok {
|
||||
return cc.errNotFound
|
||||
}
|
||||
|
||||
return c.(Cache).SetCache(key, v)
|
||||
return c.(Cache).Set(key, v)
|
||||
}
|
||||
|
||||
func (cc cacheCluster) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
|
||||
func (cc cacheCluster) SetWithExpire(key string, v interface{}, expire time.Duration) error {
|
||||
c, ok := cc.dispatcher.Get(key)
|
||||
if !ok {
|
||||
return cc.errNotFound
|
||||
}
|
||||
|
||||
return c.(Cache).SetCacheWithExpire(key, v, expire)
|
||||
return c.(Cache).SetWithExpire(key, v, expire)
|
||||
}
|
||||
|
||||
func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error {
|
||||
|
||||
60
core/stores/cache/cache_test.go
vendored
60
core/stores/cache/cache_test.go
vendored
@@ -21,7 +21,7 @@ type mockedNode struct {
|
||||
errNotFound error
|
||||
}
|
||||
|
||||
func (mc *mockedNode) DelCache(keys ...string) error {
|
||||
func (mc *mockedNode) Del(keys ...string) error {
|
||||
var be errorx.BatchError
|
||||
for _, key := range keys {
|
||||
if _, ok := mc.vals[key]; !ok {
|
||||
@@ -33,7 +33,7 @@ func (mc *mockedNode) DelCache(keys ...string) error {
|
||||
return be.Err()
|
||||
}
|
||||
|
||||
func (mc *mockedNode) GetCache(key string, v interface{}) error {
|
||||
func (mc *mockedNode) Get(key string, v interface{}) error {
|
||||
bs, ok := mc.vals[key]
|
||||
if ok {
|
||||
return json.Unmarshal(bs, v)
|
||||
@@ -46,7 +46,7 @@ func (mc *mockedNode) IsNotFound(err error) bool {
|
||||
return err == mc.errNotFound
|
||||
}
|
||||
|
||||
func (mc *mockedNode) SetCache(key string, v interface{}) error {
|
||||
func (mc *mockedNode) Set(key string, v interface{}) error {
|
||||
data, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -56,20 +56,20 @@ func (mc *mockedNode) SetCache(key string, v interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *mockedNode) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
|
||||
return mc.SetCache(key, v)
|
||||
func (mc *mockedNode) SetWithExpire(key string, v interface{}, expire time.Duration) error {
|
||||
return mc.Set(key, v)
|
||||
}
|
||||
|
||||
func (mc *mockedNode) Take(v interface{}, key string, query func(v interface{}) error) error {
|
||||
if _, ok := mc.vals[key]; ok {
|
||||
return mc.GetCache(key, v)
|
||||
return mc.Get(key, v)
|
||||
}
|
||||
|
||||
if err := query(v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return mc.SetCache(key, v)
|
||||
return mc.Set(key, v)
|
||||
}
|
||||
|
||||
func (mc *mockedNode) TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error {
|
||||
@@ -102,26 +102,26 @@ func TestCache_SetDel(t *testing.T) {
|
||||
Weight: 100,
|
||||
},
|
||||
}
|
||||
c := NewCache(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
|
||||
c := New(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
|
||||
for i := 0; i < total; i++ {
|
||||
if i%2 == 0 {
|
||||
assert.Nil(t, c.SetCache(fmt.Sprintf("key/%d", i), i))
|
||||
assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))
|
||||
} else {
|
||||
assert.Nil(t, c.SetCacheWithExpire(fmt.Sprintf("key/%d", i), i, 0))
|
||||
assert.Nil(t, c.SetWithExpire(fmt.Sprintf("key/%d", i), i, 0))
|
||||
}
|
||||
}
|
||||
for i := 0; i < total; i++ {
|
||||
var v int
|
||||
assert.Nil(t, c.GetCache(fmt.Sprintf("key/%d", i), &v))
|
||||
assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &v))
|
||||
assert.Equal(t, i, v)
|
||||
}
|
||||
assert.Nil(t, c.DelCache())
|
||||
assert.Nil(t, c.Del())
|
||||
for i := 0; i < total; i++ {
|
||||
assert.Nil(t, c.DelCache(fmt.Sprintf("key/%d", i)))
|
||||
assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i)))
|
||||
}
|
||||
for i := 0; i < total; i++ {
|
||||
var v int
|
||||
assert.True(t, c.IsNotFound(c.GetCache(fmt.Sprintf("key/%d", i), &v)))
|
||||
assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &v)))
|
||||
assert.Equal(t, 0, v)
|
||||
}
|
||||
}
|
||||
@@ -140,26 +140,26 @@ func TestCache_OneNode(t *testing.T) {
|
||||
Weight: 100,
|
||||
},
|
||||
}
|
||||
c := NewCache(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
|
||||
c := New(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
|
||||
for i := 0; i < total; i++ {
|
||||
if i%2 == 0 {
|
||||
assert.Nil(t, c.SetCache(fmt.Sprintf("key/%d", i), i))
|
||||
assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))
|
||||
} else {
|
||||
assert.Nil(t, c.SetCacheWithExpire(fmt.Sprintf("key/%d", i), i, 0))
|
||||
assert.Nil(t, c.SetWithExpire(fmt.Sprintf("key/%d", i), i, 0))
|
||||
}
|
||||
}
|
||||
for i := 0; i < total; i++ {
|
||||
var v int
|
||||
assert.Nil(t, c.GetCache(fmt.Sprintf("key/%d", i), &v))
|
||||
assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &v))
|
||||
assert.Equal(t, i, v)
|
||||
}
|
||||
assert.Nil(t, c.DelCache())
|
||||
assert.Nil(t, c.Del())
|
||||
for i := 0; i < total; i++ {
|
||||
assert.Nil(t, c.DelCache(fmt.Sprintf("key/%d", i)))
|
||||
assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i)))
|
||||
}
|
||||
for i := 0; i < total; i++ {
|
||||
var v int
|
||||
assert.True(t, c.IsNotFound(c.GetCache(fmt.Sprintf("key/%d", i), &v)))
|
||||
assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &v)))
|
||||
assert.Equal(t, 0, v)
|
||||
}
|
||||
}
|
||||
@@ -188,7 +188,7 @@ func TestCache_Balance(t *testing.T) {
|
||||
errNotFound: errPlaceholder,
|
||||
}
|
||||
for i := 0; i < total; i++ {
|
||||
assert.Nil(t, c.SetCache(strconv.Itoa(i), i))
|
||||
assert.Nil(t, c.Set(strconv.Itoa(i), i))
|
||||
}
|
||||
|
||||
counts := make(map[int]int)
|
||||
@@ -201,13 +201,13 @@ func TestCache_Balance(t *testing.T) {
|
||||
|
||||
for i := 0; i < total; i++ {
|
||||
var v int
|
||||
assert.Nil(t, c.GetCache(strconv.Itoa(i), &v))
|
||||
assert.Nil(t, c.Get(strconv.Itoa(i), &v))
|
||||
assert.Equal(t, i, v)
|
||||
}
|
||||
|
||||
for i := 0; i < total/10; i++ {
|
||||
assert.Nil(t, c.DelCache(strconv.Itoa(i*10), strconv.Itoa(i*10+1), strconv.Itoa(i*10+2)))
|
||||
assert.Nil(t, c.DelCache(strconv.Itoa(i*10+9)))
|
||||
assert.Nil(t, c.Del(strconv.Itoa(i*10), strconv.Itoa(i*10+1), strconv.Itoa(i*10+2)))
|
||||
assert.Nil(t, c.Del(strconv.Itoa(i*10+9)))
|
||||
}
|
||||
|
||||
var count int
|
||||
@@ -237,11 +237,11 @@ func TestCacheNoNode(t *testing.T) {
|
||||
dispatcher: dispatcher,
|
||||
errNotFound: errPlaceholder,
|
||||
}
|
||||
assert.NotNil(t, c.DelCache("foo"))
|
||||
assert.NotNil(t, c.DelCache("foo", "bar", "any"))
|
||||
assert.NotNil(t, c.GetCache("foo", nil))
|
||||
assert.NotNil(t, c.SetCache("foo", nil))
|
||||
assert.NotNil(t, c.SetCacheWithExpire("foo", nil, time.Second))
|
||||
assert.NotNil(t, c.Del("foo"))
|
||||
assert.NotNil(t, c.Del("foo", "bar", "any"))
|
||||
assert.NotNil(t, c.Get("foo", nil))
|
||||
assert.NotNil(t, c.Set("foo", nil))
|
||||
assert.NotNil(t, c.SetWithExpire("foo", nil, time.Second))
|
||||
assert.NotNil(t, c.Take(nil, "foo", func(v interface{}) error {
|
||||
return nil
|
||||
}))
|
||||
|
||||
18
core/stores/cache/cachenode.go
vendored
18
core/stores/cache/cachenode.go
vendored
@@ -37,13 +37,13 @@ type cacheNode struct {
|
||||
errNotFound error
|
||||
}
|
||||
|
||||
// NewCacheNode returns a cacheNode.
|
||||
// NewNode returns a cacheNode.
|
||||
// rds is the underlying redis node or cluster.
|
||||
// barrier is the barrier that maybe shared with other cache nodes on cache cluster.
|
||||
// st is used to stat the cache.
|
||||
// errNotFound defines the error that returned on cache not found.
|
||||
// opts are the options that customize the cacheNode.
|
||||
func NewCacheNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat,
|
||||
func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat,
|
||||
errNotFound error, opts ...Option) Cache {
|
||||
o := newOptions(opts...)
|
||||
return cacheNode{
|
||||
@@ -60,7 +60,7 @@ func NewCacheNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat,
|
||||
}
|
||||
|
||||
// DelCache deletes cached values with keys.
|
||||
func (c cacheNode) DelCache(keys ...string) error {
|
||||
func (c cacheNode) Del(keys ...string) error {
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -74,7 +74,7 @@ func (c cacheNode) DelCache(keys ...string) error {
|
||||
}
|
||||
|
||||
// GetCache gets the cache with key and fills into v.
|
||||
func (c cacheNode) GetCache(key string, v interface{}) error {
|
||||
func (c cacheNode) Get(key string, v interface{}) error {
|
||||
err := c.doGetCache(key, v)
|
||||
if err == errPlaceholder {
|
||||
return c.errNotFound
|
||||
@@ -89,12 +89,12 @@ func (c cacheNode) IsNotFound(err error) bool {
|
||||
}
|
||||
|
||||
// SetCache sets the cache with key and v, using c.expiry.
|
||||
func (c cacheNode) SetCache(key string, v interface{}) error {
|
||||
return c.SetCacheWithExpire(key, v, c.aroundDuration(c.expiry))
|
||||
func (c cacheNode) Set(key string, v interface{}) error {
|
||||
return c.SetWithExpire(key, v, c.aroundDuration(c.expiry))
|
||||
}
|
||||
|
||||
// SetCacheWithExpire sets the cache with key and v, using given expire.
|
||||
func (c cacheNode) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
|
||||
func (c cacheNode) SetWithExpire(key string, v interface{}, expire time.Duration) error {
|
||||
data, err := jsonx.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -112,7 +112,7 @@ func (c cacheNode) String() string {
|
||||
// query from DB and set cache using c.expiry, then return the result.
|
||||
func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error {
|
||||
return c.doTake(v, key, query, func(v interface{}) error {
|
||||
return c.SetCache(key, v)
|
||||
return c.Set(key, v)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -124,7 +124,7 @@ func (c cacheNode) TakeWithExpire(v interface{}, key string, query func(v interf
|
||||
return c.doTake(v, key, func(v interface{}) error {
|
||||
return query(v, expire)
|
||||
}, func(v interface{}) error {
|
||||
return c.SetCacheWithExpire(key, v, expire)
|
||||
return c.SetWithExpire(key, v, expire)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
30
core/stores/cache/cachenode_test.go
vendored
30
core/stores/cache/cachenode_test.go
vendored
@@ -39,14 +39,14 @@ func TestCacheNode_DelCache(t *testing.T) {
|
||||
stat: NewCacheStat("any"),
|
||||
errNotFound: errTestNotFound,
|
||||
}
|
||||
assert.Nil(t, cn.DelCache())
|
||||
assert.Nil(t, cn.DelCache([]string{}...))
|
||||
assert.Nil(t, cn.DelCache(make([]string, 0)...))
|
||||
cn.SetCache("first", "one")
|
||||
assert.Nil(t, cn.DelCache("first"))
|
||||
cn.SetCache("first", "one")
|
||||
cn.SetCache("second", "two")
|
||||
assert.Nil(t, cn.DelCache("first", "second"))
|
||||
assert.Nil(t, cn.Del())
|
||||
assert.Nil(t, cn.Del([]string{}...))
|
||||
assert.Nil(t, cn.Del(make([]string, 0)...))
|
||||
cn.Set("first", "one")
|
||||
assert.Nil(t, cn.Del("first"))
|
||||
cn.Set("first", "one")
|
||||
cn.Set("second", "two")
|
||||
assert.Nil(t, cn.Del("first", "second"))
|
||||
}
|
||||
|
||||
func TestCacheNode_InvalidCache(t *testing.T) {
|
||||
@@ -64,7 +64,7 @@ func TestCacheNode_InvalidCache(t *testing.T) {
|
||||
}
|
||||
s.Set("any", "value")
|
||||
var str string
|
||||
assert.NotNil(t, cn.GetCache("any", &str))
|
||||
assert.NotNil(t, cn.Get("any", &str))
|
||||
assert.Equal(t, "", str)
|
||||
_, err = s.Get("any")
|
||||
assert.Equal(t, miniredis.ErrKeyNotFound, err)
|
||||
@@ -91,7 +91,7 @@ func TestCacheNode_Take(t *testing.T) {
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "value", str)
|
||||
assert.Nil(t, cn.GetCache("any", &str))
|
||||
assert.Nil(t, cn.Get("any", &str))
|
||||
val, err := store.Get("any")
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, `"value"`, val)
|
||||
@@ -116,7 +116,7 @@ func TestCacheNode_TakeNotFound(t *testing.T) {
|
||||
return errTestNotFound
|
||||
})
|
||||
assert.True(t, cn.IsNotFound(err))
|
||||
assert.True(t, cn.IsNotFound(cn.GetCache("any", &str)))
|
||||
assert.True(t, cn.IsNotFound(cn.Get("any", &str)))
|
||||
val, err := store.Get("any")
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, `*`, val)
|
||||
@@ -126,7 +126,7 @@ func TestCacheNode_TakeNotFound(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
assert.True(t, cn.IsNotFound(err))
|
||||
assert.True(t, cn.IsNotFound(cn.GetCache("any", &str)))
|
||||
assert.True(t, cn.IsNotFound(cn.Get("any", &str)))
|
||||
|
||||
store.Del("any")
|
||||
var errDummy = errors.New("dummy")
|
||||
@@ -157,7 +157,7 @@ func TestCacheNode_TakeWithExpire(t *testing.T) {
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "value", str)
|
||||
assert.Nil(t, cn.GetCache("any", &str))
|
||||
assert.Nil(t, cn.Get("any", &str))
|
||||
val, err := store.Get("any")
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, `"value"`, val)
|
||||
@@ -200,8 +200,8 @@ func TestCacheValueWithBigInt(t *testing.T) {
|
||||
value int64 = 323427211229009810
|
||||
)
|
||||
|
||||
assert.Nil(t, cn.SetCache(key, value))
|
||||
assert.Nil(t, cn.Set(key, value))
|
||||
var val interface{}
|
||||
assert.Nil(t, cn.GetCache(key, &val))
|
||||
assert.Nil(t, cn.Get(key, &val))
|
||||
assert.Equal(t, strconv.FormatInt(value, 10), fmt.Sprintf("%v", val))
|
||||
}
|
||||
|
||||
@@ -36,11 +36,11 @@ func (c *cachedCollection) Count(query interface{}) (int, error) {
|
||||
}
|
||||
|
||||
func (c *cachedCollection) DelCache(keys ...string) error {
|
||||
return c.cache.DelCache(keys...)
|
||||
return c.cache.Del(keys...)
|
||||
}
|
||||
|
||||
func (c *cachedCollection) GetCache(key string, v interface{}) error {
|
||||
return c.cache.GetCache(key, v)
|
||||
return c.cache.Get(key, v)
|
||||
}
|
||||
|
||||
func (c *cachedCollection) FindAllNoCache(v interface{}, query interface{}, opts ...QueryOption) error {
|
||||
@@ -125,7 +125,7 @@ func (c *cachedCollection) RemoveIdNoCache(id interface{}) error {
|
||||
}
|
||||
|
||||
func (c *cachedCollection) SetCache(key string, v interface{}) error {
|
||||
return c.cache.SetCache(key, v)
|
||||
return c.cache.Set(key, v)
|
||||
}
|
||||
|
||||
func (c *cachedCollection) Update(selector, update interface{}, keys ...string) error {
|
||||
|
||||
@@ -34,7 +34,7 @@ func TestCollection_Count(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
defer clean()
|
||||
|
||||
cach := cache.NewCacheNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
c := newCollection(dummyConn{}, cach)
|
||||
val, err := c.Count("any")
|
||||
assert.Nil(t, err)
|
||||
@@ -98,7 +98,7 @@ func TestStat(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
defer clean()
|
||||
|
||||
cach := cache.NewCacheNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
c := newCollection(dummyConn{}, cach)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
@@ -121,7 +121,7 @@ func TestStatCacheFails(t *testing.T) {
|
||||
defer log.SetOutput(os.Stdout)
|
||||
|
||||
r := redis.NewRedis("localhost:59999", redis.NodeType)
|
||||
cach := cache.NewCacheNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
c := newCollection(dummyConn{}, cach)
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
@@ -142,7 +142,7 @@ func TestStatDbFails(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
defer clean()
|
||||
|
||||
cach := cache.NewCacheNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
c := newCollection(dummyConn{}, cach)
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
@@ -164,7 +164,7 @@ func TestStatFromMemory(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
defer clean()
|
||||
|
||||
cach := cache.NewCacheNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
||||
c := newCollection(dummyConn{}, cach)
|
||||
|
||||
var all sync.WaitGroup
|
||||
|
||||
@@ -34,14 +34,14 @@ func MustNewModel(url, collection string, c cache.CacheConf, opts ...cache.Optio
|
||||
}
|
||||
|
||||
func NewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) (*Model, error) {
|
||||
c := cache.NewCacheNode(rds, sharedCalls, stats, mgo.ErrNotFound, opts...)
|
||||
c := cache.NewNode(rds, sharedCalls, stats, mgo.ErrNotFound, opts...)
|
||||
return createModel(url, collection, c, func(collection mongo.Collection) *cachedCollection {
|
||||
return newCollection(collection, c)
|
||||
})
|
||||
}
|
||||
|
||||
func NewModel(url, collection string, conf cache.CacheConf, opts ...cache.Option) (*Model, error) {
|
||||
c := cache.NewCache(conf, sharedCalls, stats, mgo.ErrNotFound, opts...)
|
||||
c := cache.New(conf, sharedCalls, stats, mgo.ErrNotFound, opts...)
|
||||
return createModel(url, collection, c, func(collection mongo.Collection) *cachedCollection {
|
||||
return newCollection(collection, c)
|
||||
})
|
||||
@@ -54,11 +54,11 @@ func (mm *Model) Count(query interface{}) (int, error) {
|
||||
}
|
||||
|
||||
func (mm *Model) DelCache(keys ...string) error {
|
||||
return mm.cache.DelCache(keys...)
|
||||
return mm.cache.Del(keys...)
|
||||
}
|
||||
|
||||
func (mm *Model) GetCache(key string, v interface{}) error {
|
||||
return mm.cache.GetCache(key, v)
|
||||
return mm.cache.Get(key, v)
|
||||
}
|
||||
|
||||
func (mm *Model) GetCollection(session *mgo.Session) *cachedCollection {
|
||||
@@ -144,7 +144,7 @@ func (mm *Model) RemoveIdNoCache(id interface{}) error {
|
||||
}
|
||||
|
||||
func (mm *Model) SetCache(key string, v interface{}) error {
|
||||
return mm.cache.SetCache(key, v)
|
||||
return mm.cache.Set(key, v)
|
||||
}
|
||||
|
||||
func (mm *Model) Update(selector, update interface{}, keys ...string) error {
|
||||
|
||||
@@ -36,23 +36,23 @@ type (
|
||||
func NewNodeConn(db sqlx.SqlConn, rds *redis.Redis, opts ...cache.Option) CachedConn {
|
||||
return CachedConn{
|
||||
db: db,
|
||||
cache: cache.NewCacheNode(rds, exclusiveCalls, stats, sql.ErrNoRows, opts...),
|
||||
cache: cache.NewNode(rds, exclusiveCalls, stats, sql.ErrNoRows, opts...),
|
||||
}
|
||||
}
|
||||
|
||||
func NewConn(db sqlx.SqlConn, c cache.CacheConf, opts ...cache.Option) CachedConn {
|
||||
return CachedConn{
|
||||
db: db,
|
||||
cache: cache.NewCache(c, exclusiveCalls, stats, sql.ErrNoRows, opts...),
|
||||
cache: cache.New(c, exclusiveCalls, stats, sql.ErrNoRows, opts...),
|
||||
}
|
||||
}
|
||||
|
||||
func (cc CachedConn) DelCache(keys ...string) error {
|
||||
return cc.cache.DelCache(keys...)
|
||||
return cc.cache.Del(keys...)
|
||||
}
|
||||
|
||||
func (cc CachedConn) GetCache(key string, v interface{}) error {
|
||||
return cc.cache.GetCache(key, v)
|
||||
return cc.cache.Get(key, v)
|
||||
}
|
||||
|
||||
func (cc CachedConn) Exec(exec ExecFn, keys ...string) (sql.Result, error) {
|
||||
@@ -90,7 +90,7 @@ func (cc CachedConn) QueryRowIndex(v interface{}, key string, keyer func(primary
|
||||
}
|
||||
|
||||
found = true
|
||||
return cc.cache.SetCacheWithExpire(keyer(primaryKey), v, expire+cacheSafeGapBetweenIndexAndPrimary)
|
||||
return cc.cache.SetWithExpire(keyer(primaryKey), v, expire+cacheSafeGapBetweenIndexAndPrimary)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -114,7 +114,7 @@ func (cc CachedConn) QueryRowsNoCache(v interface{}, q string, args ...interface
|
||||
}
|
||||
|
||||
func (cc CachedConn) SetCache(key string, v interface{}) error {
|
||||
return cc.cache.SetCache(key, v)
|
||||
return cc.cache.Set(key, v)
|
||||
}
|
||||
|
||||
func (cc CachedConn) Transact(fn func(sqlx.Session) error) error {
|
||||
|
||||
Reference in New Issue
Block a user