From f06e87377cd7f2bec07717679640e82f6a9c6746 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Thu, 14 Feb 2019 15:51:27 -0500 Subject: [PATCH] engine: Add limit delay before Process can run This adds back the limit delay around Process. --- engine/graph/actions.go | 55 +++++++++++++++++++++++++++++++++++++++++ engine/graph/state.go | 7 +++++- 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 5c5d5f89..bce9f16b 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -28,6 +28,7 @@ import ( multierr "github.com/hashicorp/go-multierror" errwrap "github.com/pkg/errors" + "golang.org/x/time/rate" ) // OKTimestamp returns true if this vertex can run right now. @@ -250,6 +251,11 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { return fmt.Errorf("vertex is not a resource") } + // bonus safety check + if res.MetaParams().Burst == 0 && !(res.MetaParams().Limit == rate.Inf) { // blocked + return fmt.Errorf("permanently limited (rate != Inf, burst = 0)") + } + //defer close(obj.state[vertex].stopped) // done signal obj.state[vertex].cuid = obj.Converger.Register() @@ -278,6 +284,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { select { case <-obj.state[vertex].processDone: case <-obj.state[vertex].watchDone: + case <-obj.state[vertex].limitDone: case <-obj.state[vertex].removeDone: case <-obj.state[vertex].eventsDone: } @@ -354,6 +361,8 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { // If this exits cleanly, we must unblock the reverse-multiplexer. // I think this additional close is unnecessary, but it's not harmful. defer close(obj.state[vertex].eventsDone) // causes doneChan to close + limiter := rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst) + var reserv *rate.Reservation var reterr error var failed bool // has Process permanently failed? Loop: @@ -375,6 +384,8 @@ Loop: if obj.Debug { obj.Logf("event received") } + reserv = limiter.ReserveN(time.Now(), 1) // one event + // reserv.OK() seems to always be true here! case _, ok := <-obj.state[vertex].pokeChan: // read from buffered poke channel if !ok { // we never close it @@ -383,6 +394,7 @@ Loop: if obj.Debug { obj.Logf("poke received") } + reserv = nil // we didn't receive a real event here... } if failed { // don't Process anymore if we've already failed... continue Loop @@ -421,6 +433,49 @@ Loop: default: // no pause requested, keep going... } + if failed { // don't Process anymore if we've already failed... + continue Loop + } + + // limit delay + d := time.Duration(0) + if reserv != nil { + d = reserv.DelayFrom(time.Now()) + } + if reserv != nil && d > 0 { // delay + obj.state[vertex].init.Logf("limited (rate: %v/sec, burst: %d, next: %v)", res.MetaParams().Limit, res.MetaParams().Burst, d) + timer := time.NewTimer(time.Duration(d) * time.Millisecond) + LimitWait: + for { + select { + case <-timer.C: // the wait is over + break LimitWait + + // consume other events while we're waiting... + case e, ok := <-obj.state[vertex].eventsChan: // read from watch channel + if !ok { + return reterr // we only return when chan closes + } + if e != nil { + failed = true + close(obj.state[vertex].limitDone) // causes doneChan to close + reterr = multierr.Append(reterr, e) // permanent failure + break LimitWait + } + if obj.Debug { + obj.Logf("event received in limit") + } + // TODO: does this get added in properly? + limiter.ReserveN(time.Now(), 1) // one event + } + } + timer.Stop() // it's nice to cleanup + obj.state[vertex].init.Logf("rate limiting expired!") + } + if failed { // don't Process anymore if we've already failed... + continue Loop + } + // end of limit delay var err error if obj.Debug { diff --git a/engine/graph/state.go b/engine/graph/state.go index 4c98e0ba..a13d63a4 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -69,7 +69,11 @@ type State struct { processDone chan struct{} // watchDone is closed when the Watch function fails permanently, and we // close this to signal we should definitely exit. (Often redundant.) - watchDone chan struct{} + watchDone chan struct{} // could be shared with limitDone + // limitDone is closed when the Watch function fails permanently, and we + // close this to signal we should definitely exit. This happens inside + // of the limit loop of the Process section of Worker. + limitDone chan struct{} // could be shared with watchDone // removeDone is closed when the vertexRemoveFn method asks for an exit. // This happens when we're switching graphs. The switch to an "empty" is // the equivalent of asking for a final shutdown. @@ -131,6 +135,7 @@ func (obj *State) Init() error { obj.processDone = make(chan struct{}) obj.watchDone = make(chan struct{}) + obj.limitDone = make(chan struct{}) obj.removeDone = make(chan struct{}) obj.eventsDone = make(chan struct{})