engine: graph: Clean up pause/resume code

There's always the fear that there is either a panic or a deadlock in
the highly concurrent engine resource code. I have not seen one recently
and I've been running some pretty concurrent tests. In the meantime, and
with my hopefully improved knowledge of concurrency, I decided to
rewrite some of the "uglier" parts of the engine. I think it is a lot
clearer now, and it is much less likely that there is a concurrency issue.

This has been tested by running the examples/lang/fastcount.mcl example.
This commit is contained in:
James Shubin
2023-08-30 21:17:05 -04:00
parent 2edae22a65
commit 2773a621a2
3 changed files with 82 additions and 55 deletions

View File

@@ -364,8 +364,14 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
var reserv *rate.Reservation
var reterr error
var failed bool // has Process permanently failed?
var closed bool // has the resumeSignal channel closed?
Loop:
for { // process loop
// This is the main select where things happen and where we exit
// from. It's similar to the two "satellite" selects which we
// might spend some time in if we're retrying or rate limiting.
// This select is also the main event receiver and is also the
// only place where we read from the poke channel.
select {
case err, ok := <-obj.state[vertex].eventsChan: // read from watch channel
if !ok {
@@ -394,9 +400,30 @@ Loop:
obj.Logf("poke received")
}
reserv = nil // we didn't receive a real event here...
}
if failed { // don't Process anymore if we've already failed...
continue Loop
case _, ok := <-obj.state[vertex].pauseSignal: // one message
if !ok {
obj.state[vertex].pauseSignal = nil
continue // this is not a new pause message
}
// NOTE: If we allowed a doneCtx below to let us out
// of the resumeSignal wait, then we could loop around
// and run this again, causing a panic. Instead of this
// being made safe with a sync.Once, we instead run a
// close() call inside of the vertexRemoveFn function,
// which should unblock resumeSignal so we can shutdown.
obj.state[vertex].pausedAck.Ack() // send ack
// we are paused now, and waiting for resume or exit...
select {
case _, closed = <-obj.state[vertex].resumeSignal: // channel closes
// resumed!
// pass through to allow a Process to try to run
// TODO: consider adding this fast pause here...
//if obj.fastPause {
// obj.Logf("fast pausing on resume")
// continue
//}
}
}
// drop redundant pokes
@@ -408,31 +435,8 @@ Loop:
}
}
// pause if one was requested...
select {
case <-obj.state[vertex].pauseSignal: // channel closes
// NOTE: If we allowed a doneCtx below to let us out
// of the resumeSignal wait, then we could loop around
// and run this again, causing a panic. Instead of this
// being made safe with a sync.Once, we instead run a
// Resume() call inside of the vertexRemoveFn function,
// which should unblock it when we're going to need to.
obj.state[vertex].pausedAck.Ack() // send ack
// we are paused now, and waiting for resume or exit...
select {
case <-obj.state[vertex].resumeSignal: // channel closes
// resumed!
// pass through to allow a Process to try to run
// TODO: consider adding this fast pause here...
//if obj.fastPause {
// obj.Logf("fast pausing on resume")
// continue
//}
}
default:
// no pause requested, keep going...
}
if failed { // don't Process anymore if we've already failed...
// don't Process anymore if we've already failed or shutdown...
if failed || closed {
continue Loop
}
@@ -446,6 +450,9 @@ Loop:
timer := time.NewTimer(time.Duration(d) * time.Millisecond)
LimitWait:
for {
// This "satellite" select doesn't need a poke
// channel because we're already in "event
// received" mode, and poke doesn't block.
select {
case <-timer.C: // the wait is over
break LimitWait
@@ -471,7 +478,8 @@ Loop:
timer.Stop() // it's nice to cleanup
obj.state[vertex].init.Logf("rate limiting expired!")
}
if failed { // don't Process anymore if we've already failed...
// don't Process anymore if we've already failed or shutdown...
if failed || closed {
continue Loop
}
// end of limit delay
@@ -486,6 +494,10 @@ Loop:
timer := time.NewTimer(time.Duration(delay) * time.Millisecond)
RetryWait:
for {
// This "satellite" select doesn't need
// a poke channel because we're already
// in "event received" mode, and poke
// doesn't block.
select {
case <-timer.C: // the wait is over
break RetryWait
@@ -512,7 +524,8 @@ Loop:
delay = 0 // reset
obj.state[vertex].init.Logf("the CheckApply delay expired!")
}
if failed { // don't Process anymore if we've already failed...
// don't Process anymore if we've already failed or shutdown...
if failed || closed {
continue Loop
}