engine: graph: Handle the back poke differently
A back poke is the deferral or delay of a Process/CheckApply. This is because we notice that we're not truly ready to CheckApply due to some timestamp issue. When Process errors, we should accept that, but not treat it as a success.
This commit is contained in:
@@ -26,4 +26,7 @@ func (e Error) Error() string { return string(e) }
|
|||||||
const (
|
const (
|
||||||
// ErrClosed means we couldn't complete a task because we had closed.
|
// ErrClosed means we couldn't complete a task because we had closed.
|
||||||
ErrClosed = Error("closed")
|
ErrClosed = Error("closed")
|
||||||
|
|
||||||
|
// ErrBackPoke means we're postponing due to a needed backpoke.
|
||||||
|
ErrBackPoke = Error("backpoke")
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -85,7 +85,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
|
|||||||
|
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
return nil // can't continue until timestamp is in sequence
|
|
||||||
|
// can't continue until timestamp is in sequence, defer for now
|
||||||
|
return engine.ErrBackPoke
|
||||||
}
|
}
|
||||||
|
|
||||||
// semaphores!
|
// semaphores!
|
||||||
@@ -564,14 +566,22 @@ Loop:
|
|||||||
if obj.Debug {
|
if obj.Debug {
|
||||||
obj.Logf("Process(%s)", vertex)
|
obj.Logf("Process(%s)", vertex)
|
||||||
}
|
}
|
||||||
|
backPoke := false
|
||||||
err = obj.Process(obj.state[vertex].doneCtx, vertex)
|
err = obj.Process(obj.state[vertex].doneCtx, vertex)
|
||||||
if obj.Debug {
|
if err == engine.ErrBackPoke {
|
||||||
|
backPoke = true
|
||||||
|
err = nil // for future code safety
|
||||||
|
}
|
||||||
|
if obj.Debug && backPoke {
|
||||||
|
obj.Logf("Process(%s): BackPoke!", vertex)
|
||||||
|
}
|
||||||
|
if obj.Debug && !backPoke {
|
||||||
obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err))
|
obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err))
|
||||||
}
|
}
|
||||||
if err == nil && res.MetaParams().RetryReset { // reset it on success!
|
if err == nil && !backPoke && res.MetaParams().RetryReset { // reset it on success!
|
||||||
obj.metas[engine.PtrUID(res)].CheckApplyRetry = res.MetaParams().Retry // lookup the retry value
|
obj.metas[engine.PtrUID(res)].CheckApplyRetry = res.MetaParams().Retry // lookup the retry value
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil || backPoke {
|
||||||
break RetryLoop
|
break RetryLoop
|
||||||
}
|
}
|
||||||
// we've got an error...
|
// we've got an error...
|
||||||
|
|||||||
Reference in New Issue
Block a user