diff --git a/core/executors/bulkexecutor.go b/core/executors/bulkexecutor.go index 4b7e6184..c1de4b5c 100644 --- a/core/executors/bulkexecutor.go +++ b/core/executors/bulkexecutor.go @@ -5,8 +5,12 @@ import "time" const defaultBulkTasks = 1000 type ( + // BulkOption defines the method to customize a BulkExecutor. BulkOption func(options *bulkOptions) + // A BulkExecutor is an executor that can execute tasks on either requirement meets: + // 1. up to given size of tasks + // 2. flush interval time elapsed BulkExecutor struct { executor *PeriodicalExecutor container *bulkContainer @@ -18,6 +22,7 @@ type ( } ) +// NewBulkExecutor returns a BulkExecutor. func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor { options := newBulkOptions() for _, opt := range opts { @@ -36,25 +41,30 @@ func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor { return executor } +// Add adds task into be. func (be *BulkExecutor) Add(task interface{}) error { be.executor.Add(task) return nil } +// Flush forces be to flush and execute tasks. func (be *BulkExecutor) Flush() { be.executor.Flush() } +// Wait waits be to done with the task execution. func (be *BulkExecutor) Wait() { be.executor.Wait() } +// WithBulkTasks customizes a BulkExecutor with given tasks limit. func WithBulkTasks(tasks int) BulkOption { return func(options *bulkOptions) { options.cachedTasks = tasks } } +// WithBulkInterval customizes a BulkExecutor with given flush interval. func WithBulkInterval(duration time.Duration) BulkOption { return func(options *bulkOptions) { options.flushInterval = duration diff --git a/core/executors/chunkexecutor.go b/core/executors/chunkexecutor.go index 226e0e8a..b78ec339 100644 --- a/core/executors/chunkexecutor.go +++ b/core/executors/chunkexecutor.go @@ -5,8 +5,12 @@ import "time" const defaultChunkSize = 1024 * 1024 // 1M type ( + // ChunkOption defines the method to customize a ChunkExecutor. ChunkOption func(options *chunkOptions) + // A ChunkExecutor is an executor to execute tasks when either requirement meets: + // 1. up to given chunk size + // 2. flush interval elapsed ChunkExecutor struct { executor *PeriodicalExecutor container *chunkContainer @@ -18,6 +22,7 @@ type ( } ) +// NewChunkExecutor returns a ChunkExecutor. func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor { options := newChunkOptions() for _, opt := range opts { @@ -36,6 +41,7 @@ func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor { return executor } +// Add adds task with given chunk size into ce. func (ce *ChunkExecutor) Add(task interface{}, size int) error { ce.executor.Add(chunk{ val: task, @@ -44,20 +50,24 @@ func (ce *ChunkExecutor) Add(task interface{}, size int) error { return nil } +// Flush forces ce to flush and execute tasks. func (ce *ChunkExecutor) Flush() { ce.executor.Flush() } +// Wait waits the execution to be done. func (ce *ChunkExecutor) Wait() { ce.executor.Wait() } +// WithChunkBytes customizes a ChunkExecutor with the given chunk size. func WithChunkBytes(size int) ChunkOption { return func(options *chunkOptions) { options.chunkSize = size } } +// WithFlushInterval customizes a ChunkExecutor with the given flush interval. func WithFlushInterval(duration time.Duration) ChunkOption { return func(options *chunkOptions) { options.flushInterval = duration diff --git a/core/executors/delayexecutor.go b/core/executors/delayexecutor.go index bddd342b..b0e9d968 100644 --- a/core/executors/delayexecutor.go +++ b/core/executors/delayexecutor.go @@ -7,6 +7,7 @@ import ( "github.com/tal-tech/go-zero/core/threading" ) +// A DelayExecutor delays a tasks on given delay interval. type DelayExecutor struct { fn func() delay time.Duration @@ -14,6 +15,7 @@ type DelayExecutor struct { lock sync.Mutex } +// NewDelayExecutor returns a DelayExecutor with given fn and delay. func NewDelayExecutor(fn func(), delay time.Duration) *DelayExecutor { return &DelayExecutor{ fn: fn, @@ -21,6 +23,7 @@ func NewDelayExecutor(fn func(), delay time.Duration) *DelayExecutor { } } +// Trigger triggers the task to be executed after given delay, safe to trigger more than once. func (de *DelayExecutor) Trigger() { de.lock.Lock() defer de.lock.Unlock() diff --git a/core/executors/lessexecutor.go b/core/executors/lessexecutor.go index c9a3d4ea..3584ebb7 100644 --- a/core/executors/lessexecutor.go +++ b/core/executors/lessexecutor.go @@ -7,11 +7,13 @@ import ( "github.com/tal-tech/go-zero/core/timex" ) +// A LessExecutor is an executor to limit execution once within given time interval. type LessExecutor struct { threshold time.Duration lastTime *syncx.AtomicDuration } +// NewLessExecutor returns a LessExecutor with given threshold as time interval. func NewLessExecutor(threshold time.Duration) *LessExecutor { return &LessExecutor{ threshold: threshold, @@ -19,6 +21,8 @@ func NewLessExecutor(threshold time.Duration) *LessExecutor { } } +// DoOrDiscard executes or discards the task depends on if +// another task was executed within the time interval. func (le *LessExecutor) DoOrDiscard(execute func()) bool { now := timex.Now() lastTime := le.lastTime.Load() diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index 4e7be6d9..20488283 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -28,6 +28,7 @@ type ( RemoveAll() interface{} } + // A PeriodicalExecutor is an executor that periodically execute tasks. PeriodicalExecutor struct { commander chan interface{} interval time.Duration @@ -43,6 +44,7 @@ type ( } ) +// NewPeriodicalExecutor returns a PeriodicalExecutor with given interval and container. func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor { executor := &PeriodicalExecutor{ // buffer 1 to let the caller go quickly @@ -61,6 +63,7 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per return executor } +// Add adds tasks into pe. func (pe *PeriodicalExecutor) Add(task interface{}) { if vals, ok := pe.addAndCheck(task); ok { pe.commander <- vals @@ -68,6 +71,7 @@ func (pe *PeriodicalExecutor) Add(task interface{}) { } } +// Flush forces pe to execute tasks. func (pe *PeriodicalExecutor) Flush() bool { pe.enterExecution() return pe.executeTasks(func() interface{} { @@ -77,12 +81,14 @@ func (pe *PeriodicalExecutor) Flush() bool { }()) } +// Sync lets caller to run fn thread-safe with pe, especially for the underlying container. func (pe *PeriodicalExecutor) Sync(fn func()) { pe.lock.Lock() defer pe.lock.Unlock() fn() } +// Wait waits the execution to be done. func (pe *PeriodicalExecutor) Wait() { pe.Flush() pe.wgBarrier.Guard(func() { diff --git a/core/executors/vars.go b/core/executors/vars.go index 7d91c15c..fa0ab9d5 100644 --- a/core/executors/vars.go +++ b/core/executors/vars.go @@ -4,4 +4,5 @@ import "time" const defaultFlushInterval = time.Second +// Execute defines the method to execute tasks. type Execute func(tasks []interface{}) diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index 59dd1657..7fd02ea3 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -72,6 +72,24 @@ func NewRedis(redisAddr, redisType string, redisPass ...string) *Redis { } } +// BitCount is redis bitcount command implementation. +func (s *Redis) BitCount(key string, start, end int64) (val int64, err error) { + err = s.brk.DoWithAcceptable(func() error { + conn, err := getRedis(s) + if err != nil { + return err + } + + val, err = conn.BitCount(key, &red.BitCount{ + Start: start, + End: end, + }).Result() + return err + }, acceptable) + + return +} + // Blpop uses passed in redis connection to execute blocking queries. // Doesn't benefit from pooling redis connections of blocking queries func (s *Redis) Blpop(redisNode RedisNode, key string) (string, error) { @@ -108,23 +126,6 @@ func (s *Redis) BlpopEx(redisNode RedisNode, key string) (string, bool, error) { return vals[1], true, nil } -func (s *Redis) BitCount(key string, start, end int64) (val int64, err error) { - err = s.brk.DoWithAcceptable(func() error { - conn, err := getRedis(s) - if err != nil { - return err - } - - val, err = conn.BitCount(key, &red.BitCount{ - Start: start, - End: end, - }).Result() - return err - }, acceptable) - - return -} - func (s *Redis) Del(keys ...string) (val int, err error) { err = s.brk.DoWithAcceptable(func() error { conn, err := getRedis(s)