chore: remove directive for tests (#3257)
This commit is contained in:
11
core/stores/cache/cachenode_test.go
vendored
11
core/stores/cache/cachenode_test.go
vendored
@@ -1,6 +1,3 @@
|
|||||||
//go:build !race
|
|
||||||
|
|
||||||
// Disable data race detection is because of the timingWheel in cacheNode.
|
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -58,16 +55,16 @@ func TestCacheNode_DelCache(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("del cache with errors", func(t *testing.T) {
|
t.Run("del cache with errors", func(t *testing.T) {
|
||||||
old := timingWheel
|
old := timingWheel.Load()
|
||||||
ticker := timex.NewFakeTicker()
|
ticker := timex.NewFakeTicker()
|
||||||
var err error
|
tw, err := collection.NewTimingWheelWithTicker(
|
||||||
timingWheel, err = collection.NewTimingWheelWithTicker(
|
|
||||||
time.Millisecond, timingWheelSlots, func(key, value any) {
|
time.Millisecond, timingWheelSlots, func(key, value any) {
|
||||||
clean(key, value)
|
clean(key, value)
|
||||||
}, ticker)
|
}, ticker)
|
||||||
|
timingWheel.Store(tw)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
timingWheel = old
|
timingWheel.Store(old)
|
||||||
})
|
})
|
||||||
|
|
||||||
r, err := miniredis.Run()
|
r, err := miniredis.Run()
|
||||||
|
|||||||
24
core/stores/cache/cleaner.go
vendored
24
core/stores/cache/cleaner.go
vendored
@@ -2,6 +2,7 @@ package cache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/zeromicro/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
@@ -19,7 +20,8 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
timingWheel *collection.TimingWheel
|
// use atomic to avoid data race in unit tests
|
||||||
|
timingWheel atomic.Value
|
||||||
taskRunner = threading.NewTaskRunner(cleanWorkers)
|
taskRunner = threading.NewTaskRunner(cleanWorkers)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -30,22 +32,27 @@ type delayTask struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
var err error
|
tw, err := collection.NewTimingWheel(time.Second, timingWheelSlots, clean)
|
||||||
timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean)
|
|
||||||
logx.Must(err)
|
logx.Must(err)
|
||||||
|
timingWheel.Store(tw)
|
||||||
|
|
||||||
proc.AddShutdownListener(func() {
|
proc.AddShutdownListener(func() {
|
||||||
timingWheel.Drain(clean)
|
if err := tw.Drain(clean); err != nil {
|
||||||
|
logx.Errorf("failed to drain timing wheel: %v", err)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddCleanTask adds a clean task on given keys.
|
// AddCleanTask adds a clean task on given keys.
|
||||||
func AddCleanTask(task func() error, keys ...string) {
|
func AddCleanTask(task func() error, keys ...string) {
|
||||||
timingWheel.SetTimer(stringx.Randn(taskKeyLen), delayTask{
|
tw := timingWheel.Load().(*collection.TimingWheel)
|
||||||
|
if err := tw.SetTimer(stringx.Randn(taskKeyLen), delayTask{
|
||||||
delay: time.Second,
|
delay: time.Second,
|
||||||
task: task,
|
task: task,
|
||||||
keys: keys,
|
keys: keys,
|
||||||
}, time.Second)
|
}, time.Second); err != nil {
|
||||||
|
logx.Errorf("failed to set timer for keys: %q, error: %v", formatKeys(keys), err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func clean(key, value any) {
|
func clean(key, value any) {
|
||||||
@@ -59,7 +66,10 @@ func clean(key, value any) {
|
|||||||
next, ok := nextDelay(dt.delay)
|
next, ok := nextDelay(dt.delay)
|
||||||
if ok {
|
if ok {
|
||||||
dt.delay = next
|
dt.delay = next
|
||||||
timingWheel.SetTimer(key, dt, next)
|
tw := timingWheel.Load().(*collection.TimingWheel)
|
||||||
|
if err = tw.SetTimer(key, dt, next); err != nil {
|
||||||
|
logx.Errorf("failed to set timer for key: %s, error: %v", key, err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
msg := fmt.Sprintf("retried but failed to clear cache with keys: %q, error: %v",
|
msg := fmt.Sprintf("retried but failed to clear cache with keys: %q, error: %v",
|
||||||
formatKeys(dt.keys), err)
|
formatKeys(dt.keys), err)
|
||||||
|
|||||||
14
core/stores/cache/cleaner_test.go
vendored
14
core/stores/cache/cleaner_test.go
vendored
@@ -5,7 +5,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/zeromicro/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNextDelay(t *testing.T) {
|
func TestNextDelay(t *testing.T) {
|
||||||
@@ -49,6 +51,18 @@ func TestNextDelay(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
old := timingWheel.Load()
|
||||||
|
ticker := timex.NewFakeTicker()
|
||||||
|
tw, err := collection.NewTimingWheelWithTicker(
|
||||||
|
time.Millisecond, timingWheelSlots, func(key, value any) {
|
||||||
|
clean(key, value)
|
||||||
|
}, ticker)
|
||||||
|
timingWheel.Store(tw)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
timingWheel.Store(old)
|
||||||
|
})
|
||||||
|
|
||||||
next, ok := nextDelay(test.input)
|
next, ok := nextDelay(test.input)
|
||||||
assert.Equal(t, test.ok, ok)
|
assert.Equal(t, test.ok, ok)
|
||||||
assert.Equal(t, test.output, next)
|
assert.Equal(t, test.output, next)
|
||||||
|
|||||||
Reference in New Issue
Block a user