From 5bc01e4bfd28cd205182702332b87aaf3679043b Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sun, 3 Jan 2021 19:54:11 +0800 Subject: [PATCH] set guarded to false only on quitting background flush (#342) * set guarded to false only on quitting background flush * set guarded to false only on quitting background flush, cont. --- core/executors/periodicalexecutor.go | 32 ++++++++++++---------------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index 91dcc847..2a2990c8 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -111,6 +111,9 @@ func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) func (pe *PeriodicalExecutor) backgroundFlush() { threading.GoSafe(func() { + // flush before quit goroutine to avoid missing tasks + defer pe.Flush() + ticker := pe.newTicker(pe.interval) defer ticker.Stop() @@ -130,30 +133,17 @@ func (pe *PeriodicalExecutor) backgroundFlush() { commanded = false } else if pe.Flush() { last = timex.Now() - } else if timex.Since(last) > pe.interval*idleRound { - if pe.cleanup() { - return - } + } else if pe.shallQuit(last) { + pe.lock.Lock() + pe.guarded = false + pe.lock.Unlock() + return } } } }) } -func (pe *PeriodicalExecutor) cleanup() (stop bool) { - pe.lock.Lock() - pe.guarded = false - if atomic.LoadInt32(&pe.inflight) == 0 { - stop = true - // defer to unlock quickly - // flush again to avoid missing tasks - defer pe.Flush() - } - pe.lock.Unlock() - - return -} - func (pe *PeriodicalExecutor) doneExecution() { pe.waitGroup.Done() } @@ -189,3 +179,9 @@ func (pe *PeriodicalExecutor) hasTasks(tasks interface{}) bool { return true } } + +func (pe *PeriodicalExecutor) shallQuit(last time.Duration) bool { + idleEnough := timex.Since(last) > pe.interval*idleRound + noPending := atomic.LoadInt32(&pe.inflight) == 0 + return idleEnough && noPending +}