diff --git a/core/stores/cache/cachenode_test.go b/core/stores/cache/cachenode_test.go index 6e16a102..f286639b 100644 --- a/core/stores/cache/cachenode_test.go +++ b/core/stores/cache/cachenode_test.go @@ -1,6 +1,3 @@ -//go:build !race - -// Disable data race detection is because of the timingWheel in cacheNode. package cache import ( @@ -58,16 +55,16 @@ func TestCacheNode_DelCache(t *testing.T) { }) t.Run("del cache with errors", func(t *testing.T) { - old := timingWheel + old := timingWheel.Load() ticker := timex.NewFakeTicker() - var err error - timingWheel, err = collection.NewTimingWheelWithTicker( + tw, err := collection.NewTimingWheelWithTicker( time.Millisecond, timingWheelSlots, func(key, value any) { clean(key, value) }, ticker) + timingWheel.Store(tw) assert.NoError(t, err) t.Cleanup(func() { - timingWheel = old + timingWheel.Store(old) }) r, err := miniredis.Run() diff --git a/core/stores/cache/cleaner.go b/core/stores/cache/cleaner.go index ce0b50a4..24ec5780 100644 --- a/core/stores/cache/cleaner.go +++ b/core/stores/cache/cleaner.go @@ -2,6 +2,7 @@ package cache import ( "fmt" + "sync/atomic" "time" "github.com/zeromicro/go-zero/core/collection" @@ -19,7 +20,8 @@ const ( ) var ( - timingWheel *collection.TimingWheel + // use atomic to avoid data race in unit tests + timingWheel atomic.Value taskRunner = threading.NewTaskRunner(cleanWorkers) ) @@ -30,22 +32,27 @@ type delayTask struct { } func init() { - var err error - timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean) + tw, err := collection.NewTimingWheel(time.Second, timingWheelSlots, clean) logx.Must(err) + timingWheel.Store(tw) proc.AddShutdownListener(func() { - timingWheel.Drain(clean) + if err := tw.Drain(clean); err != nil { + logx.Errorf("failed to drain timing wheel: %v", err) + } }) } // AddCleanTask adds a clean task on given keys. func AddCleanTask(task func() error, keys ...string) { - timingWheel.SetTimer(stringx.Randn(taskKeyLen), delayTask{ + tw := timingWheel.Load().(*collection.TimingWheel) + if err := tw.SetTimer(stringx.Randn(taskKeyLen), delayTask{ delay: time.Second, task: task, keys: keys, - }, time.Second) + }, time.Second); err != nil { + logx.Errorf("failed to set timer for keys: %q, error: %v", formatKeys(keys), err) + } } func clean(key, value any) { @@ -59,7 +66,10 @@ func clean(key, value any) { next, ok := nextDelay(dt.delay) if ok { dt.delay = next - timingWheel.SetTimer(key, dt, next) + tw := timingWheel.Load().(*collection.TimingWheel) + if err = tw.SetTimer(key, dt, next); err != nil { + logx.Errorf("failed to set timer for key: %s, error: %v", key, err) + } } else { msg := fmt.Sprintf("retried but failed to clear cache with keys: %q, error: %v", formatKeys(dt.keys), err) diff --git a/core/stores/cache/cleaner_test.go b/core/stores/cache/cleaner_test.go index 73f6b3f5..1497b9eb 100644 --- a/core/stores/cache/cleaner_test.go +++ b/core/stores/cache/cleaner_test.go @@ -5,7 +5,9 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/zeromicro/go-zero/core/collection" "github.com/zeromicro/go-zero/core/proc" + "github.com/zeromicro/go-zero/core/timex" ) func TestNextDelay(t *testing.T) { @@ -49,6 +51,18 @@ func TestNextDelay(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + old := timingWheel.Load() + ticker := timex.NewFakeTicker() + tw, err := collection.NewTimingWheelWithTicker( + time.Millisecond, timingWheelSlots, func(key, value any) { + clean(key, value) + }, ticker) + timingWheel.Store(tw) + assert.NoError(t, err) + t.Cleanup(func() { + timingWheel.Store(old) + }) + next, ok := nextDelay(test.input) assert.Equal(t, test.ok, ok) assert.Equal(t, test.output, next)