resources: Add poll metaparameter

This allows a resource to use polling instead of the event based
mechanism. This isn't recommended, but it could be useful, and it was
certainly fun to code!
This commit is contained in:
James Shubin
2016-12-23 23:48:33 -05:00
parent 6ad8ac0b6b
commit b921aabbed
5 changed files with 129 additions and 7 deletions

View File

@@ -449,6 +449,23 @@ they could have separate values, I've decided to use the same ones for both
until there's a proper reason to want to do something differently for the Watch
errors.
####Poll
Integer. Number of seconds to wait between `CheckApply` checks. If this is
greater than zero, then the standard event based `Watch` mechanism for this
resource is replaced with a simple polling mechanism. In general, this is not
recommended, unless you have a very good reason for doing so.
Please keep in mind that if you have a resource which changes every `I` seconds,
and you poll it every `J` seconds, and you've asked for a converged timeout of
`K` seconds, and `I <= J <= K`, then your graph will likely never converge.
When polling, the system detects that a resource is not converged if its
`CheckApply` method returns false. This allows a resource which changes every
`I` seconds, and which is polled every `J` seconds, and with a converged timeout
of `K` seconds to still converge when `J <= K`, as long as `I > J || I > K`,
which is another way of saying that if the resource finally settles down to give
the graph enough time, it can probably converge.
###Graph definition file
graph.yaml is the compiled graph definition file. The format is currently
undocumented, but by looking through the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples)

24
examples/poll1.yaml Normal file
View File

@@ -0,0 +1,24 @@
---
graph: mygraph
resources:
file:
- name: file1
meta:
poll: 5
path: "/tmp/mgmt/f1"
content: |
i am f1
state: exists
- name: file2
path: "/tmp/mgmt/f2"
content: |
i am f2
state: exists
- name: file3
meta:
poll: 1
path: "/tmp/mgmt/f3"
content: |
i am f3
state: exists
edges: []

View File

@@ -209,8 +209,30 @@ func (g *Graph) Process(v *Vertex) error {
// run the CheckApply!
} else {
// if the CheckApply run takes longer than the converged
// timeout, we could inappropriately converge mid-apply!
// avoid this by blocking convergence with a fake report
block := obj.Converger().Register() // get an extra cuid
block.SetConverged(false) // block while CheckApply runs!
// if this fails, don't UpdateTimestamp()
checkOK, err = obj.CheckApply(!noop)
block.SetConverged(true) // unblock
block.Unregister()
// TODO: Can the `Poll` converged timeout tracking be a
// more general method for all converged timeouts? this
// would simplify the resources by removing boilerplate
if v.Meta().Poll > 0 {
if !checkOK { // something changed, restart timer
cuid := v.Res.ConvergerUID() // get the converger uid used to report status
cuid.ResetTimer() // activity!
if g.Flags.Debug {
log.Printf("%s[%s]: Converger: ResetTimer", obj.Kind(), obj.GetName())
}
}
}
}
if checkOK && err != nil { // should never return this way
@@ -419,7 +441,15 @@ func (g *Graph) Worker(v *Vertex) error {
// TODO: reset the watch retry count after some amount of success
v.Res.RegisterConverger()
e := v.Res.Watch(processChan)
var e error
if v.Meta().Poll > 0 { // poll instead of watching :(
cuid := v.Res.ConvergerUID() // get the converger uid used to report status
cuid.StartTimer()
e = v.Res.Poll(processChan)
cuid.StopTimer() // clean up nicely
} else {
e = v.Res.Watch(processChan) // run the watch normally
}
v.Res.UnregisterConverger()
if e == nil { // exit signal
err = nil // clean exit

View File

@@ -26,6 +26,7 @@ import (
"log"
"os"
"path"
"time"
// TODO: should each resource be a sub-package?
"github.com/purpleidea/mgmt/converger"
@@ -93,6 +94,7 @@ type MetaParams struct {
// reason to want to do something differently for the Watch errors.
Retry int16 `yaml:"retry"` // metaparam, number of times to retry on error. -1 for infinite
Delay uint64 `yaml:"delay"` // metaparam, number of milliseconds to wait between retries
Poll uint32 `yaml:"poll"` // metaparam, number of seconds between poll interval, 0 to watch.
}
// UnmarshalYAML is the custom unmarshal handler for the MetaParams struct. It
@@ -116,6 +118,7 @@ var DefaultMetaParams = MetaParams{
Noop: false,
Retry: 0, // TODO: is this a good default?
Delay: 0, // TODO: is this a good default?
Poll: 0, // defaults to watching for events
}
// The Base interface is everything that is common to all resources.
@@ -130,6 +133,7 @@ type Base interface {
AssociateData(*Data)
IsWatching() bool
SetWatching(bool)
Converger() converger.Converger
RegisterConverger()
UnregisterConverger()
ConvergerUID() converger.ConvergerUID
@@ -153,6 +157,7 @@ type Base interface {
Running(chan event.Event) error // notify the engine that Watch started
Started() <-chan struct{} // returns when the resource has started
Starter(bool)
Poll(chan event.Event) error // poll alternative to watching :(
}
// Res is the minimum interface you need to implement to define a new resource.
@@ -295,6 +300,12 @@ func (obj *BaseRes) SetWatching(b bool) {
obj.watching = b
}
// Converger returns the converger object used by the system. It can be used to
// register new convergers if needed.
func (obj *BaseRes) Converger() converger.Converger {
return obj.converger
}
// RegisterConverger sets up the cuid for the resource. This is a helper
// function for the engine, and shouldn't be called by the resources directly.
func (obj *BaseRes) RegisterConverger() {
@@ -399,6 +410,9 @@ func (obj *BaseRes) Compare(res Res) bool {
if obj.Meta().Delay != res.Meta().Delay {
return false
}
if obj.Meta().Poll != res.Meta().Poll {
return false
}
return true
}
@@ -438,6 +452,45 @@ func (obj *BaseRes) Started() <-chan struct{} { return obj.started }
// If we have an indegree of 0, we'll need to be a poke initiator in the graph.
func (obj *BaseRes) Starter(b bool) { obj.starter = b }
// Poll is the watch replacement for when we want to poll, which outputs events.
func (obj *BaseRes) Poll(processChan chan event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
// create a time.Ticker for the given interval
ticker := time.NewTicker(time.Duration(obj.Meta().Poll) * time.Second)
defer ticker.Stop()
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false
var exit = false
for {
obj.SetState(ResStateWatching)
select {
case <-ticker.C: // received the timer event
log.Printf("%s[%s]: polling...", obj.Kind(), obj.GetName())
send = true
obj.StateOK(false) // dirty
case event := <-obj.Events():
cuid.ResetTimer() // important
if exit, send = obj.ReadEvent(&event); exit {
return nil
}
}
if send {
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...
}
}
}
}
// ResToB64 encodes a resource to a base64 encoded string (after serialization)
func ResToB64(res Res) (string, error) {
b := bytes.Buffer{}

View File

@@ -62,8 +62,6 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
if ev.GetActivity() { // if previous node did work, and we were notified...
//obj.StateOK(false) // not necessarily
poke = true // poke!
// XXX: this should be elsewhere in case Watch isn't used (eg: Polling instead...)
// XXX: unless this is used in our "fallback" polling implementation???
//obj.SetRefresh(true) // TODO: is this redundant?
}
@@ -111,10 +109,10 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
// Running is called by the Watch method of the resource once it has started up.
// This signals to the engine to kick off the initial CheckApply resource check.
func (obj *BaseRes) Running(processChan chan event.Event) error {
obj.StateOK(false) // assume we're initially dirty
cuid := obj.ConvergerUID() // get the converger uid used to report status
cuid.SetConverged(false) // a reasonable initial assumption
close(obj.started) // send started signal
obj.StateOK(false) // assume we're initially dirty
cuid := obj.ConvergerUID() // get the converger uid used to report status
cuid.SetConverged(false) // a reasonable initial assumption
close(obj.started) // send started signal
// FIXME: exit return value is unused atm, so ignore it for now...
//if exit, err := obj.DoSend(processChan, ""); exit || err != nil {