From 54296da64775fda6e416e0887bf56f875b32e0b4 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Wed, 25 Jan 2017 11:30:47 -0500 Subject: [PATCH] converger: Remove converger boilerplate from the resources This simplifies the resource code by now removing all the converger related material. Happy resource writing! --- docs/resource-guide.md | 22 ++++++++-------------- resources/exec.go | 9 --------- resources/file.go | 8 -------- resources/hostname.go | 8 -------- resources/msg.go | 7 ------- resources/noop.go | 7 ------- resources/nspawn.go | 7 ------- resources/password.go | 8 -------- resources/pkg.go | 8 -------- resources/resources.go | 1 + resources/sendrecv.go | 16 ++++++++++++---- resources/svc.go | 14 -------------- resources/timer.go | 7 ------- resources/virt.go | 11 ----------- 14 files changed, 21 insertions(+), 112 deletions(-) diff --git a/docs/resource-guide.md b/docs/resource-guide.md index 82b7250c..7577cda6 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -279,9 +279,8 @@ The lifetime of most resources `Watch` method should be spent in an infinite loop that is bounded by a `select` call. The `select` call is the point where our method hands back control to the engine (and the kernel) so that we can sleep until something of interest wakes us up. In this loop we must process -events from the engine via the `<-obj.Events()` call, wait for the converged -timeout with `<-cuid.ConvergedTimer()`, and receive events for our resource -itself! +events from the engine via the `<-obj.Events()` call, and receive events for our +resource itself! #### Events If we receive an internal event from the `<-obj.Events()` method, we can read it @@ -300,8 +299,8 @@ or from before `mgmt` was running. It does this by calling the `Running` method. #### Converged The engine might be asked to shutdown when the entire state of the system has -not seen any changes for some duration of time. In order for the engine to be -able to make this determination, each resource must report its converged state. +not seen any changes for some duration of time. The engine can determine this +automatically, but each resource can block this if it is absolutely necessary. To do this, the `Watch` method should get the `ConvergedUID` handle that has been prepared for it by the engine. This is done by calling the `ConvergerUID` method on the resource object. The result can be used to set the converged @@ -312,12 +311,14 @@ Instead of interacting with the `ConvergedUID` with these two methods, we can instead use the `StartTimer` and `ResetTimer` methods which accomplish the same thing, but provide a `select`-free interface for different coding situations. +This particular facility is most likely not required for most resources. It may +prove to be useful if a resource wants to start off a long operation, but avoid +sending out erroneous `Event` messages to keep things alive until it finishes. + #### Example ```golang // Watch is the listener and main loop for this resource. func (obj *FooRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - // setup the Foo resource var err error if err, obj.foo = OpenFoo(); err != nil { @@ -335,7 +336,6 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error { for { select { case event := <-obj.Events(): - cuid.SetConverged(false) // we avoid sending events on unpause if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit @@ -345,18 +345,12 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error { case event := <-obj.foo.Events: if is_an_event { send = true // used below - cuid.SetConverged(false) obj.StateOK(false) // dirty } // event errors case err := <-obj.foo.Errors: - cuuid.SetConverged(false) return err // will cause a retry or permanent failure - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/exec.go b/resources/exec.go index b09c7e25..36e24291 100644 --- a/resources/exec.go +++ b/resources/exec.go @@ -114,8 +114,6 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan // Watch is the primary listener for this resource and it outputs events. func (obj *ExecRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - var send = false // send event? var exit *error bufioch, errch := make(chan string), make(chan error) @@ -165,7 +163,6 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error { for { select { case text := <-bufioch: - cuid.SetConverged(false) // each time we get a line of output, we loop! log.Printf("%s[%s]: Watch output: %s", obj.Kind(), obj.GetName(), text) if text != "" { @@ -173,7 +170,6 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error { } case err := <-errch: - cuid.SetConverged(false) if err == nil { // EOF // FIXME: add an "if watch command ends/crashes" // restart or generate error option @@ -183,14 +179,9 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error { return errwrap.Wrapf(err, "Unknown error") case event := <-obj.Events(): - cuid.SetConverged(false) if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/file.go b/resources/file.go index b80f2341..f21d77ec 100644 --- a/resources/file.go +++ b/resources/file.go @@ -148,8 +148,6 @@ func (obj *FileRes) GetPath() string { // must be restarted. On a clean exit it returns nil. // FIXME: Also watch the source directory when using obj.Source !!! func (obj *FileRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse) if err != nil { @@ -175,7 +173,6 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error { if !ok { // channel shutdown return nil } - cuid.SetConverged(false) if err := event.Error; err != nil { return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.Kind(), obj.GetName()) } @@ -186,15 +183,10 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error { obj.StateOK(false) // dirty case event := <-obj.Events(): - cuid.SetConverged(false) if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } //obj.StateOK(false) // dirty // these events don't invalidate state - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/hostname.go b/resources/hostname.go index a8dbd2db..be0c9966 100644 --- a/resources/hostname.go +++ b/resources/hostname.go @@ -112,8 +112,6 @@ func (obj *HostnameRes) Init() error { // Watch is the primary listener for this resource and it outputs events. func (obj *HostnameRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - // if we share the bus with others, we will get each others messages!! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! if err != nil { @@ -140,22 +138,16 @@ func (obj *HostnameRes) Watch(processChan chan *event.Event) error { for { select { case <-signals: - cuid.SetConverged(false) send = true obj.StateOK(false) // dirty case event := <-obj.Events(): - cuid.SetConverged(false) // we avoid sending events on unpause if exit, _ := obj.ReadEvent(event); exit != nil { return *exit // exit } send = true obj.StateOK(false) // dirty - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/msg.go b/resources/msg.go index 4b1ba42c..4320469c 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -140,8 +140,6 @@ func (obj *MsgRes) journalPriority() journal.Priority { // Watch is the primary listener for this resource and it outputs events. func (obj *MsgRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - // notify engine that we're running if err := obj.Running(processChan); err != nil { return err // bubble up a NACK... @@ -152,15 +150,10 @@ func (obj *MsgRes) Watch(processChan chan *event.Event) error { for { select { case event := <-obj.Events(): - cuid.SetConverged(false) // we avoid sending events on unpause if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/noop.go b/resources/noop.go index 659b76f1..a8141adb 100644 --- a/resources/noop.go +++ b/resources/noop.go @@ -64,8 +64,6 @@ func (obj *NoopRes) Init() error { // Watch is the primary listener for this resource and it outputs events. func (obj *NoopRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - // notify engine that we're running if err := obj.Running(processChan); err != nil { return err // bubble up a NACK... @@ -76,15 +74,10 @@ func (obj *NoopRes) Watch(processChan chan *event.Event) error { for { select { case event := <-obj.Events(): - cuid.SetConverged(false) // we avoid sending events on unpause if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/nspawn.go b/resources/nspawn.go index 1d3ed245..93c82362 100644 --- a/resources/nspawn.go +++ b/resources/nspawn.go @@ -107,8 +107,6 @@ func (obj *NspawnRes) Init() error { // Watch for state changes and sends a message to the bus if there is a change func (obj *NspawnRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - // this resource depends on systemd ensure that it's running if !systemdUtil.IsRunningSystemd() { return fmt.Errorf("Systemd is not running.") @@ -157,14 +155,9 @@ func (obj *NspawnRes) Watch(processChan chan *event.Event) error { } case event := <-obj.Events(): - cuid.SetConverged(false) if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/password.go b/resources/password.go index 07cf5f62..2ce18ec6 100644 --- a/resources/password.go +++ b/resources/password.go @@ -174,8 +174,6 @@ Loop: // Watch is the primary listener for this resource and it outputs events. func (obj *PasswordRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false) if err != nil { @@ -197,7 +195,6 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error { if !ok { // channel shutdown return nil } - cuid.SetConverged(false) if err := event.Error; err != nil { return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.Kind(), obj.GetName()) } @@ -205,15 +202,10 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error { obj.StateOK(false) // dirty case event := <-obj.Events(): - cuid.SetConverged(false) // we avoid sending events on unpause if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/pkg.go b/resources/pkg.go index f2f68569..fc8eea3e 100644 --- a/resources/pkg.go +++ b/resources/pkg.go @@ -116,8 +116,6 @@ func (obj *PkgRes) Init() error { // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 func (obj *PkgRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - bus := packagekit.NewBus() if bus == nil { return fmt.Errorf("Can't connect to PackageKit bus.") @@ -144,8 +142,6 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error { select { case event := <-ch: - cuid.SetConverged(false) - // FIXME: ask packagekit for info on what packages changed if obj.debug { log.Printf("%s: Event: %v", obj.fmtNames(obj.getNames()), event.Name) @@ -161,15 +157,11 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error { obj.StateOK(false) // dirty case event := <-obj.Events(): - cuid.SetConverged(false) if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } //obj.StateOK(false) // these events don't invalidate state - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } // do all our event sending all together to avoid duplicate msgs diff --git a/resources/resources.go b/resources/resources.go index ac52f92d..1c1ea93d 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -530,6 +530,7 @@ func (obj *BaseRes) Poll(processChan chan *event.Event) error { if err := obj.Running(processChan); err != nil { return err // bubble up a NACK... } + cuid.SetConverged(false) // quickly stop any converge due to Running() var send = false var exit *error diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 76b4fd68..0ed0dcb4 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -98,10 +98,18 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { // Running is called by the Watch method of the resource once it has started up. // This signals to the engine to kick off the initial CheckApply resource check. func (obj *BaseRes) Running(processChan chan *event.Event) error { - obj.StateOK(false) // assume we're initially dirty - cuid := obj.ConvergerUID() // get the converger uid used to report status - cuid.SetConverged(false) // a reasonable initial assumption - close(obj.started) // send started signal + // TODO: If a non-polling resource wants to use the converger, then it + // should probably tell Running (via an arg) to not do this. Currently + // it is a very unlikey race that could cause an early converge if the + // converge timeout is very short ( ~ 1s) and the Watch method doesn't + // immediately SetConverged(false) to stop possible early termination. + if obj.Meta().Poll == 0 { // if not polling, unblock this... + cuid := obj.ConvergerUID() + cuid.SetConverged(true) // a reasonable initial assumption + } + + obj.StateOK(false) // assume we're initially dirty + close(obj.started) // send started signal var err error if obj.starter { // vertices of indegree == 0 should send initial pokes diff --git a/resources/svc.go b/resources/svc.go index c243a6b7..66bcddd2 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -80,8 +80,6 @@ func (obj *SvcRes) Init() error { // Watch is the primary listener for this resource and it outputs events. func (obj *SvcRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - // obj.Name: svc name if !systemdUtil.IsRunningSystemd() { return fmt.Errorf("Systemd is not running.") @@ -155,19 +153,13 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error { select { case <-buschan: // XXX: wait for new units event to unstick - cuid.SetConverged(false) // loop so that we can see the changed invalid signal log.Printf("Svc[%s]->DaemonReload()", svc) case event := <-obj.Events(): - cuid.SetConverged(false) if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } } else { if !activeSet { @@ -202,18 +194,12 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error { obj.StateOK(false) // dirty case err := <-subErrors: - cuid.SetConverged(false) return errwrap.Wrapf(err, "Unknown %s[%s] error", obj.Kind(), obj.GetName()) case event := <-obj.Events(): - cuid.SetConverged(false) if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } } diff --git a/resources/timer.go b/resources/timer.go index 5be83379..3ae6266e 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -78,8 +78,6 @@ func (obj *TimerRes) newTicker() *time.Ticker { // Watch is the primary listener for this resource and it outputs events. func (obj *TimerRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - // create a time.Ticker for the given interval obj.ticker = obj.newTicker() defer obj.ticker.Stop() @@ -98,14 +96,9 @@ func (obj *TimerRes) Watch(processChan chan *event.Event) error { log.Printf("%s[%s]: received tick", obj.Kind(), obj.GetName()) case event := <-obj.Events(): - cuid.SetConverged(false) if exit, _ := obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) - continue } if send { diff --git a/resources/virt.go b/resources/virt.go index dc034404..50a89926 100644 --- a/resources/virt.go +++ b/resources/virt.go @@ -156,8 +156,6 @@ func (obj *VirtRes) connect() (conn *libvirt.Connect, err error) { // Watch is the primary listener for this resource and it outputs events. func (obj *VirtRes) Watch(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status - conn, err := obj.connect() if err != nil { return fmt.Errorf("Connection to libvirt failed with: %s", err) @@ -251,23 +249,14 @@ func (obj *VirtRes) Watch(processChan chan *event.Event) error { obj.StateOK(false) // dirty send = true } - if send { - cuid.SetConverged(false) - } case err := <-errorChan: - cuid.SetConverged(false) return fmt.Errorf("Unknown %s[%s] libvirt error: %s", obj.Kind(), obj.GetName(), err) case event := <-obj.Events(): - cuid.SetConverged(false) if exit, send = obj.ReadEvent(event); exit != nil { return *exit // exit } - - case <-cuid.ConvergedTimer(): - cuid.SetConverged(true) // converged! - continue } if send {