diff --git a/engine/graph/actions.go b/engine/graph/actions.go index bce9f16b..4f464bd4 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -477,23 +477,80 @@ Loop: } // end of limit delay + // retry... var err error - if obj.Debug { - obj.Logf("Process(%s)", vertex) - } - err = obj.Process(vertex) - if obj.Debug { - obj.Logf("Process(%s): Return(%+v)", vertex, err) - } + var retry = res.MetaParams().Retry // lookup the retry value + var delay uint64 + RetryLoop: + for { // retry loop + if delay > 0 { + timer := time.NewTimer(time.Duration(delay) * time.Millisecond) + RetryWait: + for { + select { + case <-timer.C: // the wait is over + break RetryWait - // It is important that we shutdown the Watch loop if this dies. - // If Process fails permanently, we ask it to exit right here... - if err != nil { + // consume other events while we're waiting... + case e, ok := <-obj.state[vertex].eventsChan: // read from watch channel + if !ok { + return reterr // we only return when chan closes + } + if e != nil { + failed = true + close(obj.state[vertex].limitDone) // causes doneChan to close + reterr = multierr.Append(reterr, e) // permanent failure + break RetryWait + } + if obj.Debug { + obj.Logf("event received in retry") + } + // TODO: does this get added in properly? + limiter.ReserveN(time.Now(), 1) // one event + } + } + timer.Stop() // it's nice to cleanup + delay = 0 // reset + obj.state[vertex].init.Logf("the CheckApply delay expired!") + } + if failed { // don't Process anymore if we've already failed... + continue Loop + } + + if obj.Debug { + obj.Logf("Process(%s)", vertex) + } + err = obj.Process(vertex) + if obj.Debug { + obj.Logf("Process(%s): Return(%+v)", vertex, err) + } + if err == nil { + break RetryLoop + } + // we've got an error... + delay = res.MetaParams().Delay + + if retry < 0 { // infinite retries + continue + } + if retry > 0 { // don't decrement past 0 + retry-- + obj.state[vertex].init.Logf("retrying CheckApply after %.4f seconds (%d left)", float64(delay)/1000, retry) + continue + } + //if retry == 0 { // optional + // err = errwrap.Wrapf(err, "permanent process error") + //} + + // It is important that we shutdown the Watch loop if + // this dies. If Process fails permanently, we ask it + // to exit right here... (It happens when we loop...) failed = true close(obj.state[vertex].processDone) // causes doneChan to close reterr = multierr.Append(reterr, err) // permanent failure continue - } + + } // retry loop // When this Process loop exits, it's because something has // caused Watch() to shutdown (even if it's our permanent