From 4f34f7083ba72899174329a0378f277ccf2e8a7b Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 16 Jan 2017 18:53:46 -0500 Subject: [PATCH] resources: rate limiting: Implement resource rate limiting This adds rate limiting with the limit and burst meta parameters. The limits apply to how often the Process check is called. As a result, it might get called more often than there are Watch events due to possible Poke/BackPoke events. This system might need to get rethought in the future depending on its usefulness. --- docs/documentation.md | 13 +++++++++++++ examples/file4.yaml | 13 +++++++++++++ pgraph/actions.go | 32 ++++++++++++++++++++++++++++++++ resources/resources.go | 26 ++++++++++++++++++++------ 4 files changed, 78 insertions(+), 6 deletions(-) create mode 100644 examples/file4.yaml diff --git a/docs/documentation.md b/docs/documentation.md index c584b8d4..4cbcf139 100644 --- a/docs/documentation.md +++ b/docs/documentation.md @@ -468,6 +468,19 @@ of `K` seconds to still converge when `J <= K`, as long as `I > J || I > K`, which is another way of saying that if the resource finally settles down to give the graph enough time, it can probably converge. +#### Limit +Float. Maximum rate of `CheckApply` runs started per second. Useful to limit +an especially _eventful_ process from causing excessive checks to run. This +defaults to `+Infinity` which adds no limiting. If you change this value, you +will also need to change the `Burst` value to a non-zero value. Please see the +[rate](https://godoc.org/golang.org/x/time/rate) package for more information. + +#### Burst +Integer. Burst is the maximum number of runs which can happen without invoking +the rate limiter as designated by the `Limit` value. If the `Limit` is not set +to `+Infinity`, this must be a non-zero value. Please see the +[rate](https://godoc.org/golang.org/x/time/rate) package for more information. + ### Graph definition file graph.yaml is the compiled graph definition file. The format is currently undocumented, but by looking through the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples) diff --git a/examples/file4.yaml b/examples/file4.yaml new file mode 100644 index 00000000..116aaa04 --- /dev/null +++ b/examples/file4.yaml @@ -0,0 +1,13 @@ +--- +graph: mygraph +resources: + file: + - name: file1 + path: "/tmp/mgmt/f1" + meta: + limit: 0.5 + burst: 3 + content: | + i am f1 + state: exists +edges: [] diff --git a/pgraph/actions.go b/pgraph/actions.go index c2d7cec8..0a4cea60 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -29,6 +29,7 @@ import ( multierr "github.com/hashicorp/go-multierror" errwrap "github.com/pkg/errors" + "golang.org/x/time/rate" ) // GetTimestamp returns the timestamp of a vertex @@ -327,6 +328,8 @@ func (g *Graph) Worker(v *Vertex) error { var delay = time.Duration(v.Meta().Delay) * time.Millisecond var retry = v.Meta().Retry // number of tries left, -1 for infinite + var limiter = rate.NewLimiter(v.Meta().Limit, v.Meta().Burst) + limited := false Loop: for { @@ -347,6 +350,35 @@ func (g *Graph) Worker(v *Vertex) error { continue } + // catch invalid rates + if v.Meta().Burst == 0 && !(v.Meta().Limit == rate.Inf) { // blocked + e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName()) + v.SendEvent(event.EventExit, &SentinelErr{e}) + ev.ACK() // ready for next message + continue + } + + // rate limit + // FIXME: consider skipping rate limit check if + // the event is a poke instead of a watch event + if !limited && !(v.Meta().Limit == rate.Inf) { // skip over the playback event... + now := time.Now() + r := limiter.ReserveN(now, 1) // one event + // r.OK() seems to always be true here! + d := r.DelayFrom(now) + if d > 0 { // delay + limited = true + playback = true + log.Printf("%s[%s]: Limited (rate: %v/sec, burst: %d, next: %v)", v.Kind(), v.GetName(), v.Meta().Limit, v.Meta().Burst, d) + // start the timer... + timer.Reset(d) + waiting = true // waiting for retry timer + ev.ACK() + continue + } // otherwise, we run directly! + } + limited = false // let one through + running = true go func(ev *event.Event) { if e := g.Process(v); e != nil { diff --git a/resources/resources.go b/resources/resources.go index 26639977..e27503f7 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -34,6 +34,7 @@ import ( "github.com/purpleidea/mgmt/event" errwrap "github.com/pkg/errors" + "golang.org/x/time/rate" ) //go:generate stringer -type=ResState -output=resstate_stringer.go @@ -93,9 +94,11 @@ type MetaParams struct { // NOTE: there are separate Watch and CheckApply retry and delay values, // but I've decided to use the same ones for both until there's a proper // reason to want to do something differently for the Watch errors. - Retry int16 `yaml:"retry"` // metaparam, number of times to retry on error. -1 for infinite - Delay uint64 `yaml:"delay"` // metaparam, number of milliseconds to wait between retries - Poll uint32 `yaml:"poll"` // metaparam, number of seconds between poll interval, 0 to watch. + Retry int16 `yaml:"retry"` // metaparam, number of times to retry on error. -1 for infinite + Delay uint64 `yaml:"delay"` // metaparam, number of milliseconds to wait between retries + Poll uint32 `yaml:"poll"` // metaparam, number of seconds between poll intervals, 0 to watch + Limit rate.Limit `yaml:"limit"` // metaparam, number of events per second to allow through + Burst int `yaml:"burst"` // metaparam, number of events to allow in a burst } // UnmarshalYAML is the custom unmarshal handler for the MetaParams struct. It @@ -117,9 +120,11 @@ var DefaultMetaParams = MetaParams{ AutoEdge: true, AutoGroup: true, Noop: false, - Retry: 0, // TODO: is this a good default? - Delay: 0, // TODO: is this a good default? - Poll: 0, // defaults to watching for events + Retry: 0, // TODO: is this a good default? + Delay: 0, // TODO: is this a good default? + Poll: 0, // defaults to watching for events + Limit: rate.Inf, // defaults to no limit + Burst: 0, // no burst needed on an infinite rate // TODO: is this a good default? } // The Base interface is everything that is common to all resources. @@ -244,6 +249,9 @@ func (obj *BaseUID) Reversed() bool { // Validate reports any problems with the struct definition. func (obj *BaseRes) Validate() error { + if obj.Meta().Burst == 0 && !(obj.Meta().Limit == rate.Inf) { // blocked + return fmt.Errorf("Permanently limited (rate != Inf, burst: 0)") + } return nil } @@ -437,6 +445,12 @@ func (obj *BaseRes) Compare(res Res) bool { if obj.Meta().Poll != res.Meta().Poll { return false } + if obj.Meta().Limit != res.Meta().Limit { + return false + } + if obj.Meta().Burst != res.Meta().Burst { + return false + } return true }