engine: Add limit delay before Process can run
This adds back the limit delay around Process.
This commit is contained in:
@@ -28,6 +28,7 @@ import (
|
|||||||
|
|
||||||
multierr "github.com/hashicorp/go-multierror"
|
multierr "github.com/hashicorp/go-multierror"
|
||||||
errwrap "github.com/pkg/errors"
|
errwrap "github.com/pkg/errors"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OKTimestamp returns true if this vertex can run right now.
|
// OKTimestamp returns true if this vertex can run right now.
|
||||||
@@ -250,6 +251,11 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
|
|||||||
return fmt.Errorf("vertex is not a resource")
|
return fmt.Errorf("vertex is not a resource")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// bonus safety check
|
||||||
|
if res.MetaParams().Burst == 0 && !(res.MetaParams().Limit == rate.Inf) { // blocked
|
||||||
|
return fmt.Errorf("permanently limited (rate != Inf, burst = 0)")
|
||||||
|
}
|
||||||
|
|
||||||
//defer close(obj.state[vertex].stopped) // done signal
|
//defer close(obj.state[vertex].stopped) // done signal
|
||||||
|
|
||||||
obj.state[vertex].cuid = obj.Converger.Register()
|
obj.state[vertex].cuid = obj.Converger.Register()
|
||||||
@@ -278,6 +284,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
|
|||||||
select {
|
select {
|
||||||
case <-obj.state[vertex].processDone:
|
case <-obj.state[vertex].processDone:
|
||||||
case <-obj.state[vertex].watchDone:
|
case <-obj.state[vertex].watchDone:
|
||||||
|
case <-obj.state[vertex].limitDone:
|
||||||
case <-obj.state[vertex].removeDone:
|
case <-obj.state[vertex].removeDone:
|
||||||
case <-obj.state[vertex].eventsDone:
|
case <-obj.state[vertex].eventsDone:
|
||||||
}
|
}
|
||||||
@@ -354,6 +361,8 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
|
|||||||
// If this exits cleanly, we must unblock the reverse-multiplexer.
|
// If this exits cleanly, we must unblock the reverse-multiplexer.
|
||||||
// I think this additional close is unnecessary, but it's not harmful.
|
// I think this additional close is unnecessary, but it's not harmful.
|
||||||
defer close(obj.state[vertex].eventsDone) // causes doneChan to close
|
defer close(obj.state[vertex].eventsDone) // causes doneChan to close
|
||||||
|
limiter := rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst)
|
||||||
|
var reserv *rate.Reservation
|
||||||
var reterr error
|
var reterr error
|
||||||
var failed bool // has Process permanently failed?
|
var failed bool // has Process permanently failed?
|
||||||
Loop:
|
Loop:
|
||||||
@@ -375,6 +384,8 @@ Loop:
|
|||||||
if obj.Debug {
|
if obj.Debug {
|
||||||
obj.Logf("event received")
|
obj.Logf("event received")
|
||||||
}
|
}
|
||||||
|
reserv = limiter.ReserveN(time.Now(), 1) // one event
|
||||||
|
// reserv.OK() seems to always be true here!
|
||||||
|
|
||||||
case _, ok := <-obj.state[vertex].pokeChan: // read from buffered poke channel
|
case _, ok := <-obj.state[vertex].pokeChan: // read from buffered poke channel
|
||||||
if !ok { // we never close it
|
if !ok { // we never close it
|
||||||
@@ -383,6 +394,7 @@ Loop:
|
|||||||
if obj.Debug {
|
if obj.Debug {
|
||||||
obj.Logf("poke received")
|
obj.Logf("poke received")
|
||||||
}
|
}
|
||||||
|
reserv = nil // we didn't receive a real event here...
|
||||||
}
|
}
|
||||||
if failed { // don't Process anymore if we've already failed...
|
if failed { // don't Process anymore if we've already failed...
|
||||||
continue Loop
|
continue Loop
|
||||||
@@ -421,6 +433,49 @@ Loop:
|
|||||||
default:
|
default:
|
||||||
// no pause requested, keep going...
|
// no pause requested, keep going...
|
||||||
}
|
}
|
||||||
|
if failed { // don't Process anymore if we've already failed...
|
||||||
|
continue Loop
|
||||||
|
}
|
||||||
|
|
||||||
|
// limit delay
|
||||||
|
d := time.Duration(0)
|
||||||
|
if reserv != nil {
|
||||||
|
d = reserv.DelayFrom(time.Now())
|
||||||
|
}
|
||||||
|
if reserv != nil && d > 0 { // delay
|
||||||
|
obj.state[vertex].init.Logf("limited (rate: %v/sec, burst: %d, next: %v)", res.MetaParams().Limit, res.MetaParams().Burst, d)
|
||||||
|
timer := time.NewTimer(time.Duration(d) * time.Millisecond)
|
||||||
|
LimitWait:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-timer.C: // the wait is over
|
||||||
|
break LimitWait
|
||||||
|
|
||||||
|
// consume other events while we're waiting...
|
||||||
|
case e, ok := <-obj.state[vertex].eventsChan: // read from watch channel
|
||||||
|
if !ok {
|
||||||
|
return reterr // we only return when chan closes
|
||||||
|
}
|
||||||
|
if e != nil {
|
||||||
|
failed = true
|
||||||
|
close(obj.state[vertex].limitDone) // causes doneChan to close
|
||||||
|
reterr = multierr.Append(reterr, e) // permanent failure
|
||||||
|
break LimitWait
|
||||||
|
}
|
||||||
|
if obj.Debug {
|
||||||
|
obj.Logf("event received in limit")
|
||||||
|
}
|
||||||
|
// TODO: does this get added in properly?
|
||||||
|
limiter.ReserveN(time.Now(), 1) // one event
|
||||||
|
}
|
||||||
|
}
|
||||||
|
timer.Stop() // it's nice to cleanup
|
||||||
|
obj.state[vertex].init.Logf("rate limiting expired!")
|
||||||
|
}
|
||||||
|
if failed { // don't Process anymore if we've already failed...
|
||||||
|
continue Loop
|
||||||
|
}
|
||||||
|
// end of limit delay
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if obj.Debug {
|
if obj.Debug {
|
||||||
|
|||||||
@@ -69,7 +69,11 @@ type State struct {
|
|||||||
processDone chan struct{}
|
processDone chan struct{}
|
||||||
// watchDone is closed when the Watch function fails permanently, and we
|
// watchDone is closed when the Watch function fails permanently, and we
|
||||||
// close this to signal we should definitely exit. (Often redundant.)
|
// close this to signal we should definitely exit. (Often redundant.)
|
||||||
watchDone chan struct{}
|
watchDone chan struct{} // could be shared with limitDone
|
||||||
|
// limitDone is closed when the Watch function fails permanently, and we
|
||||||
|
// close this to signal we should definitely exit. This happens inside
|
||||||
|
// of the limit loop of the Process section of Worker.
|
||||||
|
limitDone chan struct{} // could be shared with watchDone
|
||||||
// removeDone is closed when the vertexRemoveFn method asks for an exit.
|
// removeDone is closed when the vertexRemoveFn method asks for an exit.
|
||||||
// This happens when we're switching graphs. The switch to an "empty" is
|
// This happens when we're switching graphs. The switch to an "empty" is
|
||||||
// the equivalent of asking for a final shutdown.
|
// the equivalent of asking for a final shutdown.
|
||||||
@@ -131,6 +135,7 @@ func (obj *State) Init() error {
|
|||||||
|
|
||||||
obj.processDone = make(chan struct{})
|
obj.processDone = make(chan struct{})
|
||||||
obj.watchDone = make(chan struct{})
|
obj.watchDone = make(chan struct{})
|
||||||
|
obj.limitDone = make(chan struct{})
|
||||||
obj.removeDone = make(chan struct{})
|
obj.removeDone = make(chan struct{})
|
||||||
obj.eventsDone = make(chan struct{})
|
obj.eventsDone = make(chan struct{})
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user