diff --git a/engine/error.go b/engine/error.go index 9dcff239..8fbbeb8b 100644 --- a/engine/error.go +++ b/engine/error.go @@ -26,4 +26,7 @@ func (e Error) Error() string { return string(e) } const ( // ErrClosed means we couldn't complete a task because we had closed. ErrClosed = Error("closed") + + // ErrBackPoke means we're postponing due to a needed backpoke. + ErrBackPoke = Error("backpoke") ) diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 35e8a13c..ce80c4fb 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -85,7 +85,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error { } wg.Wait() - return nil // can't continue until timestamp is in sequence + + // can't continue until timestamp is in sequence, defer for now + return engine.ErrBackPoke } // semaphores! @@ -564,14 +566,22 @@ Loop: if obj.Debug { obj.Logf("Process(%s)", vertex) } + backPoke := false err = obj.Process(obj.state[vertex].doneCtx, vertex) - if obj.Debug { + if err == engine.ErrBackPoke { + backPoke = true + err = nil // for future code safety + } + if obj.Debug && backPoke { + obj.Logf("Process(%s): BackPoke!", vertex) + } + if obj.Debug && !backPoke { obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err)) } - if err == nil && res.MetaParams().RetryReset { // reset it on success! + if err == nil && !backPoke && res.MetaParams().RetryReset { // reset it on success! obj.metas[engine.PtrUID(res)].CheckApplyRetry = res.MetaParams().Retry // lookup the retry value } - if err == nil { + if err == nil || backPoke { break RetryLoop } // we've got an error...