resources: rate limiting: Implement resource rate limiting
This adds rate limiting with the limit and burst meta parameters. The limits apply to how often the Process check is called. As a result, it might get called more often than there are Watch events due to possible Poke/BackPoke events. This system might need to get rethought in the future depending on its usefulness.
This commit is contained in:
@@ -468,6 +468,19 @@ of `K` seconds to still converge when `J <= K`, as long as `I > J || I > K`,
|
||||
which is another way of saying that if the resource finally settles down to give
|
||||
the graph enough time, it can probably converge.
|
||||
|
||||
#### Limit
|
||||
Float. Maximum rate of `CheckApply` runs started per second. Useful to limit
|
||||
an especially _eventful_ process from causing excessive checks to run. This
|
||||
defaults to `+Infinity` which adds no limiting. If you change this value, you
|
||||
will also need to change the `Burst` value to a non-zero value. Please see the
|
||||
[rate](https://godoc.org/golang.org/x/time/rate) package for more information.
|
||||
|
||||
#### Burst
|
||||
Integer. Burst is the maximum number of runs which can happen without invoking
|
||||
the rate limiter as designated by the `Limit` value. If the `Limit` is not set
|
||||
to `+Infinity`, this must be a non-zero value. Please see the
|
||||
[rate](https://godoc.org/golang.org/x/time/rate) package for more information.
|
||||
|
||||
### Graph definition file
|
||||
graph.yaml is the compiled graph definition file. The format is currently
|
||||
undocumented, but by looking through the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples)
|
||||
|
||||
13
examples/file4.yaml
Normal file
13
examples/file4.yaml
Normal file
@@ -0,0 +1,13 @@
|
||||
---
|
||||
graph: mygraph
|
||||
resources:
|
||||
file:
|
||||
- name: file1
|
||||
path: "/tmp/mgmt/f1"
|
||||
meta:
|
||||
limit: 0.5
|
||||
burst: 3
|
||||
content: |
|
||||
i am f1
|
||||
state: exists
|
||||
edges: []
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
|
||||
multierr "github.com/hashicorp/go-multierror"
|
||||
errwrap "github.com/pkg/errors"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// GetTimestamp returns the timestamp of a vertex
|
||||
@@ -327,6 +328,8 @@ func (g *Graph) Worker(v *Vertex) error {
|
||||
|
||||
var delay = time.Duration(v.Meta().Delay) * time.Millisecond
|
||||
var retry = v.Meta().Retry // number of tries left, -1 for infinite
|
||||
var limiter = rate.NewLimiter(v.Meta().Limit, v.Meta().Burst)
|
||||
limited := false
|
||||
|
||||
Loop:
|
||||
for {
|
||||
@@ -347,6 +350,35 @@ func (g *Graph) Worker(v *Vertex) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// catch invalid rates
|
||||
if v.Meta().Burst == 0 && !(v.Meta().Limit == rate.Inf) { // blocked
|
||||
e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName())
|
||||
v.SendEvent(event.EventExit, &SentinelErr{e})
|
||||
ev.ACK() // ready for next message
|
||||
continue
|
||||
}
|
||||
|
||||
// rate limit
|
||||
// FIXME: consider skipping rate limit check if
|
||||
// the event is a poke instead of a watch event
|
||||
if !limited && !(v.Meta().Limit == rate.Inf) { // skip over the playback event...
|
||||
now := time.Now()
|
||||
r := limiter.ReserveN(now, 1) // one event
|
||||
// r.OK() seems to always be true here!
|
||||
d := r.DelayFrom(now)
|
||||
if d > 0 { // delay
|
||||
limited = true
|
||||
playback = true
|
||||
log.Printf("%s[%s]: Limited (rate: %v/sec, burst: %d, next: %v)", v.Kind(), v.GetName(), v.Meta().Limit, v.Meta().Burst, d)
|
||||
// start the timer...
|
||||
timer.Reset(d)
|
||||
waiting = true // waiting for retry timer
|
||||
ev.ACK()
|
||||
continue
|
||||
} // otherwise, we run directly!
|
||||
}
|
||||
limited = false // let one through
|
||||
|
||||
running = true
|
||||
go func(ev *event.Event) {
|
||||
if e := g.Process(v); e != nil {
|
||||
|
||||
@@ -34,6 +34,7 @@ import (
|
||||
"github.com/purpleidea/mgmt/event"
|
||||
|
||||
errwrap "github.com/pkg/errors"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
//go:generate stringer -type=ResState -output=resstate_stringer.go
|
||||
@@ -93,9 +94,11 @@ type MetaParams struct {
|
||||
// NOTE: there are separate Watch and CheckApply retry and delay values,
|
||||
// but I've decided to use the same ones for both until there's a proper
|
||||
// reason to want to do something differently for the Watch errors.
|
||||
Retry int16 `yaml:"retry"` // metaparam, number of times to retry on error. -1 for infinite
|
||||
Delay uint64 `yaml:"delay"` // metaparam, number of milliseconds to wait between retries
|
||||
Poll uint32 `yaml:"poll"` // metaparam, number of seconds between poll interval, 0 to watch.
|
||||
Retry int16 `yaml:"retry"` // metaparam, number of times to retry on error. -1 for infinite
|
||||
Delay uint64 `yaml:"delay"` // metaparam, number of milliseconds to wait between retries
|
||||
Poll uint32 `yaml:"poll"` // metaparam, number of seconds between poll intervals, 0 to watch
|
||||
Limit rate.Limit `yaml:"limit"` // metaparam, number of events per second to allow through
|
||||
Burst int `yaml:"burst"` // metaparam, number of events to allow in a burst
|
||||
}
|
||||
|
||||
// UnmarshalYAML is the custom unmarshal handler for the MetaParams struct. It
|
||||
@@ -117,9 +120,11 @@ var DefaultMetaParams = MetaParams{
|
||||
AutoEdge: true,
|
||||
AutoGroup: true,
|
||||
Noop: false,
|
||||
Retry: 0, // TODO: is this a good default?
|
||||
Delay: 0, // TODO: is this a good default?
|
||||
Poll: 0, // defaults to watching for events
|
||||
Retry: 0, // TODO: is this a good default?
|
||||
Delay: 0, // TODO: is this a good default?
|
||||
Poll: 0, // defaults to watching for events
|
||||
Limit: rate.Inf, // defaults to no limit
|
||||
Burst: 0, // no burst needed on an infinite rate // TODO: is this a good default?
|
||||
}
|
||||
|
||||
// The Base interface is everything that is common to all resources.
|
||||
@@ -244,6 +249,9 @@ func (obj *BaseUID) Reversed() bool {
|
||||
|
||||
// Validate reports any problems with the struct definition.
|
||||
func (obj *BaseRes) Validate() error {
|
||||
if obj.Meta().Burst == 0 && !(obj.Meta().Limit == rate.Inf) { // blocked
|
||||
return fmt.Errorf("Permanently limited (rate != Inf, burst: 0)")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -437,6 +445,12 @@ func (obj *BaseRes) Compare(res Res) bool {
|
||||
if obj.Meta().Poll != res.Meta().Poll {
|
||||
return false
|
||||
}
|
||||
if obj.Meta().Limit != res.Meta().Limit {
|
||||
return false
|
||||
}
|
||||
if obj.Meta().Burst != res.Meta().Burst {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user