diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 552f523e..ee898c66 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -292,6 +292,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { case <-obj.state[vertex].processDone: case <-obj.state[vertex].watchDone: case <-obj.state[vertex].limitDone: + case <-obj.state[vertex].retryDone: case <-obj.state[vertex].removeDone: case <-obj.state[vertex].eventsDone: } @@ -528,7 +529,7 @@ Loop: } if e != nil { failed = true - close(obj.state[vertex].limitDone) // causes doneCtx to cancel + close(obj.state[vertex].retryDone) // causes doneCtx to cancel reterr = errwrap.Append(reterr, e) // permanent failure break RetryWait } diff --git a/engine/graph/state.go b/engine/graph/state.go index 1b9321a2..c759f6c2 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -73,11 +73,15 @@ type State struct { processDone chan struct{} // watchDone is closed when the Watch function fails permanently, and we // close this to signal we should definitely exit. (Often redundant.) - watchDone chan struct{} // could be shared with limitDone + watchDone chan struct{} // could be shared with limitDone or retryDone // limitDone is closed when the Watch function fails permanently, and we // close this to signal we should definitely exit. This happens inside // of the limit loop of the Process section of Worker. - limitDone chan struct{} // could be shared with watchDone + limitDone chan struct{} // could be shared with watchDone or retryDone + // retryDone is closed when the Watch function fails permanently, and we + // close this to signal we should definitely exit. This happens inside + // of the retry loop of the Process section of Worker. + retryDone chan struct{} // could be shared with watchDone or limitDone // removeDone is closed when the vertexRemoveFn method asks for an exit. // This happens when we're switching graphs. The switch to an "empty" is // the equivalent of asking for a final shutdown. @@ -141,6 +145,7 @@ func (obj *State) Init() error { obj.processDone = make(chan struct{}) obj.watchDone = make(chan struct{}) obj.limitDone = make(chan struct{}) + obj.retryDone = make(chan struct{}) obj.removeDone = make(chan struct{}) obj.eventsDone = make(chan struct{})