engine: Add the retry loop around Watch

This adds back the retry loop around Watch. This is done as a separate
commit so you can more easily see the logic of the retry magic.
This commit is contained in:
James Shubin
2019-02-14 15:01:22 -05:00
parent 253ed78cc6
commit 4c3bf9fc7a

View File

@@ -286,12 +286,61 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
close(obj.state[vertex].doneChan) close(obj.state[vertex].doneChan)
}() }()
var err error
var retry = res.MetaParams().Retry // lookup the retry value
var delay uint64
for { // retry loop
// a retry-delay was requested, wait, but don't block events!
if delay > 0 {
errDelayExpired := engine.Error("delay exit")
err = func() error { // slim watch main loop
timer := time.NewTimer(time.Duration(delay) * time.Millisecond)
defer obj.state[vertex].init.Logf("the Watch delay expired!")
defer timer.Stop() // it's nice to cleanup
for {
select {
case <-timer.C: // the wait is over
return errDelayExpired // special
case <-obj.state[vertex].init.Done:
return nil
}
}
}()
if err == errDelayExpired {
delay = 0 // reset
continue
}
} else if interval := res.MetaParams().Poll; interval > 0 { // poll instead of watching :(
obj.state[vertex].cuid.StartTimer()
err = obj.state[vertex].poll(interval)
obj.state[vertex].cuid.StopTimer() // clean up nicely
} else {
obj.state[vertex].cuid.StartTimer()
obj.Logf("Watch(%s)", vertex) obj.Logf("Watch(%s)", vertex)
err := res.Watch() // run the watch normally err = res.Watch() // run the watch normally
obj.Logf("Watch(%s): Exited(%+v)", vertex, err) obj.Logf("Watch(%s): Exited(%+v)", vertex, err)
obj.state[vertex].cuid.StopTimer() // clean up nicely
}
if err == nil { // || err == engine.ErrClosed if err == nil { // || err == engine.ErrClosed
return // exited cleanly, we're done return // exited cleanly, we're done
} }
// we've got an error...
delay = res.MetaParams().Delay
if retry < 0 { // infinite retries
continue
}
if retry > 0 { // don't decrement past 0
retry--
obj.state[vertex].init.Logf("retrying Watch after %.4f seconds (%d left)", float64(delay)/1000, retry)
continue
}
//if retry == 0 { // optional
// err = errwrap.Wrapf(err, "permanent watch error")
//}
break // break out of this and send the error
} // for retry loop
// this section sends an error... // this section sends an error...
// If the CheckApply loop exits and THEN the Watch fails with an // If the CheckApply loop exits and THEN the Watch fails with an