engine: resources, graph: Change the done channel into a ctx

This is part one of porting Watch to context.
This commit is contained in:
James Shubin
2023-08-07 19:44:41 -04:00
parent 5eac48094b
commit 53a878bf61
34 changed files with 73 additions and 66 deletions

View File

@@ -272,7 +272,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
defer close(obj.state[vertex].eventsChan) // we close this on behalf of res
// This is a close reverse-multiplexer. If any of the channels
// close, then it will cause the doneChan to close. That way,
// close, then it will cause the doneCtx to cancel. That way,
// multiple different folks can send a close signal, without
// every worrying about duplicate channel close panics.
obj.state[vertex].wg.Add(1)
@@ -289,7 +289,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
}
// the main "done" signal gets activated here!
close(obj.state[vertex].doneChan)
obj.state[vertex].doneCtxCancel() // cancels doneCtx
}()
var err error
@@ -308,7 +308,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
case <-timer.C: // the wait is over
return errDelayExpired // special
case <-obj.state[vertex].init.Done:
case <-obj.state[vertex].init.DoneCtx.Done():
return nil
}
}
@@ -359,7 +359,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
// If this exits cleanly, we must unblock the reverse-multiplexer.
// I think this additional close is unnecessary, but it's not harmful.
defer close(obj.state[vertex].eventsDone) // causes doneChan to close
defer close(obj.state[vertex].eventsDone) // causes doneCtx to cancel
limiter := rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst)
var reserv *rate.Reservation
var reterr error
@@ -376,7 +376,7 @@ Loop:
// we then save so we can return it to the caller of us.
if err != nil {
failed = true
close(obj.state[vertex].watchDone) // causes doneChan to close
close(obj.state[vertex].watchDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, err) // permanent failure
continue
}
@@ -411,7 +411,7 @@ Loop:
// pause if one was requested...
select {
case <-obj.state[vertex].pauseSignal: // channel closes
// NOTE: If we allowed a doneChan below to let us out
// NOTE: If we allowed a doneCtx below to let us out
// of the resumeSignal wait, then we could loop around
// and run this again, causing a panic. Instead of this
// being made safe with a sync.Once, we instead run a
@@ -457,7 +457,7 @@ Loop:
}
if e != nil {
failed = true
close(obj.state[vertex].limitDone) // causes doneChan to close
close(obj.state[vertex].limitDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, e) // permanent failure
break LimitWait
}
@@ -497,7 +497,7 @@ Loop:
}
if e != nil {
failed = true
close(obj.state[vertex].limitDone) // causes doneChan to close
close(obj.state[vertex].limitDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, e) // permanent failure
break RetryWait
}
@@ -545,7 +545,7 @@ Loop:
// this dies. If Process fails permanently, we ask it
// to exit right here... (It happens when we loop...)
failed = true
close(obj.state[vertex].processDone) // causes doneChan to close
close(obj.state[vertex].processDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, err) // permanent failure
continue