From 4c3bf9fc7ad689716df2aafd660f3180097ce699 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Thu, 14 Feb 2019 15:01:22 -0500 Subject: [PATCH] engine: Add the retry loop around Watch This adds back the retry loop around Watch. This is done as a separate commit so you can more easily see the logic of the retry magic. --- engine/graph/actions.go | 61 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 31d73c6b..5c5d5f89 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -286,12 +286,61 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { close(obj.state[vertex].doneChan) }() - obj.Logf("Watch(%s)", vertex) - err := res.Watch() // run the watch normally - obj.Logf("Watch(%s): Exited(%+v)", vertex, err) - if err == nil { // || err == engine.ErrClosed - return // exited cleanly, we're done - } + var err error + var retry = res.MetaParams().Retry // lookup the retry value + var delay uint64 + for { // retry loop + // a retry-delay was requested, wait, but don't block events! + if delay > 0 { + errDelayExpired := engine.Error("delay exit") + err = func() error { // slim watch main loop + timer := time.NewTimer(time.Duration(delay) * time.Millisecond) + defer obj.state[vertex].init.Logf("the Watch delay expired!") + defer timer.Stop() // it's nice to cleanup + for { + select { + case <-timer.C: // the wait is over + return errDelayExpired // special + + case <-obj.state[vertex].init.Done: + return nil + } + } + }() + if err == errDelayExpired { + delay = 0 // reset + continue + } + } else if interval := res.MetaParams().Poll; interval > 0 { // poll instead of watching :( + obj.state[vertex].cuid.StartTimer() + err = obj.state[vertex].poll(interval) + obj.state[vertex].cuid.StopTimer() // clean up nicely + } else { + obj.state[vertex].cuid.StartTimer() + obj.Logf("Watch(%s)", vertex) + err = res.Watch() // run the watch normally + obj.Logf("Watch(%s): Exited(%+v)", vertex, err) + obj.state[vertex].cuid.StopTimer() // clean up nicely + } + if err == nil { // || err == engine.ErrClosed + return // exited cleanly, we're done + } + // we've got an error... + delay = res.MetaParams().Delay + + if retry < 0 { // infinite retries + continue + } + if retry > 0 { // don't decrement past 0 + retry-- + obj.state[vertex].init.Logf("retrying Watch after %.4f seconds (%d left)", float64(delay)/1000, retry) + continue + } + //if retry == 0 { // optional + // err = errwrap.Wrapf(err, "permanent watch error") + //} + break // break out of this and send the error + } // for retry loop // this section sends an error... // If the CheckApply loop exits and THEN the Watch fails with an