diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index 6ceea15c..91dcc847 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -93,15 +93,12 @@ func (pe *PeriodicalExecutor) Wait() { func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) { pe.lock.Lock() defer func() { - var start bool if !pe.guarded { pe.guarded = true - start = true + // defer to unlock quickly + defer pe.backgroundFlush() } pe.lock.Unlock() - if start { - pe.backgroundFlush() - } }() if pe.container.AddTask(task) { @@ -148,14 +145,12 @@ func (pe *PeriodicalExecutor) cleanup() (stop bool) { pe.guarded = false if atomic.LoadInt32(&pe.inflight) == 0 { stop = true + // defer to unlock quickly + // flush again to avoid missing tasks + defer pe.Flush() } pe.lock.Unlock() - if stop { - // flush again to avoid missing tasks - pe.Flush() - } - return }