engine: Add the retry loop around Process

This adds back the retry loop around Process. This is done as a
separate commit so you can more easily see the logic of the retry magic
This commit is similar but different to the earlier commit adding retry
around Watch.
This commit is contained in:
James Shubin
2019-02-14 16:17:52 -05:00
parent f06e87377c
commit aa165b5e17

View File

@@ -477,7 +477,46 @@ Loop:
} }
// end of limit delay // end of limit delay
// retry...
var err error var err error
var retry = res.MetaParams().Retry // lookup the retry value
var delay uint64
RetryLoop:
for { // retry loop
if delay > 0 {
timer := time.NewTimer(time.Duration(delay) * time.Millisecond)
RetryWait:
for {
select {
case <-timer.C: // the wait is over
break RetryWait
// consume other events while we're waiting...
case e, ok := <-obj.state[vertex].eventsChan: // read from watch channel
if !ok {
return reterr // we only return when chan closes
}
if e != nil {
failed = true
close(obj.state[vertex].limitDone) // causes doneChan to close
reterr = multierr.Append(reterr, e) // permanent failure
break RetryWait
}
if obj.Debug {
obj.Logf("event received in retry")
}
// TODO: does this get added in properly?
limiter.ReserveN(time.Now(), 1) // one event
}
}
timer.Stop() // it's nice to cleanup
delay = 0 // reset
obj.state[vertex].init.Logf("the CheckApply delay expired!")
}
if failed { // don't Process anymore if we've already failed...
continue Loop
}
if obj.Debug { if obj.Debug {
obj.Logf("Process(%s)", vertex) obj.Logf("Process(%s)", vertex)
} }
@@ -485,15 +524,33 @@ Loop:
if obj.Debug { if obj.Debug {
obj.Logf("Process(%s): Return(%+v)", vertex, err) obj.Logf("Process(%s): Return(%+v)", vertex, err)
} }
if err == nil {
break RetryLoop
}
// we've got an error...
delay = res.MetaParams().Delay
// It is important that we shutdown the Watch loop if this dies. if retry < 0 { // infinite retries
// If Process fails permanently, we ask it to exit right here... continue
if err != nil { }
if retry > 0 { // don't decrement past 0
retry--
obj.state[vertex].init.Logf("retrying CheckApply after %.4f seconds (%d left)", float64(delay)/1000, retry)
continue
}
//if retry == 0 { // optional
// err = errwrap.Wrapf(err, "permanent process error")
//}
// It is important that we shutdown the Watch loop if
// this dies. If Process fails permanently, we ask it
// to exit right here... (It happens when we loop...)
failed = true failed = true
close(obj.state[vertex].processDone) // causes doneChan to close close(obj.state[vertex].processDone) // causes doneChan to close
reterr = multierr.Append(reterr, err) // permanent failure reterr = multierr.Append(reterr, err) // permanent failure
continue continue
}
} // retry loop
// When this Process loop exits, it's because something has // When this Process loop exits, it's because something has
// caused Watch() to shutdown (even if it's our permanent // caused Watch() to shutdown (even if it's our permanent