engine: graph: Give retry channel its own signal

This just makes the copy+pasting less confusing.
This commit is contained in:
James Shubin
2023-09-02 00:56:06 -04:00
parent 41493993a2
commit cf49a9a6e7
2 changed files with 9 additions and 3 deletions

View File

@@ -292,6 +292,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
case <-obj.state[vertex].processDone:
case <-obj.state[vertex].watchDone:
case <-obj.state[vertex].limitDone:
case <-obj.state[vertex].retryDone:
case <-obj.state[vertex].removeDone:
case <-obj.state[vertex].eventsDone:
}
@@ -528,7 +529,7 @@ Loop:
}
if e != nil {
failed = true
close(obj.state[vertex].limitDone) // causes doneCtx to cancel
close(obj.state[vertex].retryDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, e) // permanent failure
break RetryWait
}

View File

@@ -73,11 +73,15 @@ type State struct {
processDone chan struct{}
// watchDone is closed when the Watch function fails permanently, and we
// close this to signal we should definitely exit. (Often redundant.)
watchDone chan struct{} // could be shared with limitDone
watchDone chan struct{} // could be shared with limitDone or retryDone
// limitDone is closed when the Watch function fails permanently, and we
// close this to signal we should definitely exit. This happens inside
// of the limit loop of the Process section of Worker.
limitDone chan struct{} // could be shared with watchDone
limitDone chan struct{} // could be shared with watchDone or retryDone
// retryDone is closed when the Watch function fails permanently, and we
// close this to signal we should definitely exit. This happens inside
// of the retry loop of the Process section of Worker.
retryDone chan struct{} // could be shared with watchDone or limitDone
// removeDone is closed when the vertexRemoveFn method asks for an exit.
// This happens when we're switching graphs. The switch to an "empty" is
// the equivalent of asking for a final shutdown.
@@ -141,6 +145,7 @@ func (obj *State) Init() error {
obj.processDone = make(chan struct{})
obj.watchDone = make(chan struct{})
obj.limitDone = make(chan struct{})
obj.retryDone = make(chan struct{})
obj.removeDone = make(chan struct{})
obj.eventsDone = make(chan struct{})