converger: Remove converger boilerplate from the resources

This simplifies the resource code by now removing all the converger
related material. Happy resource writing!
This commit is contained in:
James Shubin
2017-01-25 11:30:47 -05:00
parent 357102fdb5
commit 54296da647
14 changed files with 21 additions and 112 deletions

View File

@@ -279,9 +279,8 @@ The lifetime of most resources `Watch` method should be spent in an infinite
loop that is bounded by a `select` call. The `select` call is the point where loop that is bounded by a `select` call. The `select` call is the point where
our method hands back control to the engine (and the kernel) so that we can our method hands back control to the engine (and the kernel) so that we can
sleep until something of interest wakes us up. In this loop we must process sleep until something of interest wakes us up. In this loop we must process
events from the engine via the `<-obj.Events()` call, wait for the converged events from the engine via the `<-obj.Events()` call, and receive events for our
timeout with `<-cuid.ConvergedTimer()`, and receive events for our resource resource itself!
itself!
#### Events #### Events
If we receive an internal event from the `<-obj.Events()` method, we can read it If we receive an internal event from the `<-obj.Events()` method, we can read it
@@ -300,8 +299,8 @@ or from before `mgmt` was running. It does this by calling the `Running` method.
#### Converged #### Converged
The engine might be asked to shutdown when the entire state of the system has The engine might be asked to shutdown when the entire state of the system has
not seen any changes for some duration of time. In order for the engine to be not seen any changes for some duration of time. The engine can determine this
able to make this determination, each resource must report its converged state. automatically, but each resource can block this if it is absolutely necessary.
To do this, the `Watch` method should get the `ConvergedUID` handle that has To do this, the `Watch` method should get the `ConvergedUID` handle that has
been prepared for it by the engine. This is done by calling the `ConvergerUID` been prepared for it by the engine. This is done by calling the `ConvergerUID`
method on the resource object. The result can be used to set the converged method on the resource object. The result can be used to set the converged
@@ -312,12 +311,14 @@ Instead of interacting with the `ConvergedUID` with these two methods, we can
instead use the `StartTimer` and `ResetTimer` methods which accomplish the same instead use the `StartTimer` and `ResetTimer` methods which accomplish the same
thing, but provide a `select`-free interface for different coding situations. thing, but provide a `select`-free interface for different coding situations.
This particular facility is most likely not required for most resources. It may
prove to be useful if a resource wants to start off a long operation, but avoid
sending out erroneous `Event` messages to keep things alive until it finishes.
#### Example #### Example
```golang ```golang
// Watch is the listener and main loop for this resource. // Watch is the listener and main loop for this resource.
func (obj *FooRes) Watch(processChan chan *event.Event) error { func (obj *FooRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
// setup the Foo resource // setup the Foo resource
var err error var err error
if err, obj.foo = OpenFoo(); err != nil { if err, obj.foo = OpenFoo(); err != nil {
@@ -335,7 +336,6 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error {
for { for {
select { select {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
@@ -345,18 +345,12 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error {
case event := <-obj.foo.Events: case event := <-obj.foo.Events:
if is_an_event { if is_an_event {
send = true // used below send = true // used below
cuid.SetConverged(false)
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
} }
// event errors // event errors
case err := <-obj.foo.Errors: case err := <-obj.foo.Errors:
cuuid.SetConverged(false)
return err // will cause a retry or permanent failure return err // will cause a retry or permanent failure
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -114,8 +114,6 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *ExecRes) Watch(processChan chan *event.Event) error { func (obj *ExecRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
var send = false // send event? var send = false // send event?
var exit *error var exit *error
bufioch, errch := make(chan string), make(chan error) bufioch, errch := make(chan string), make(chan error)
@@ -165,7 +163,6 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error {
for { for {
select { select {
case text := <-bufioch: case text := <-bufioch:
cuid.SetConverged(false)
// each time we get a line of output, we loop! // each time we get a line of output, we loop!
log.Printf("%s[%s]: Watch output: %s", obj.Kind(), obj.GetName(), text) log.Printf("%s[%s]: Watch output: %s", obj.Kind(), obj.GetName(), text)
if text != "" { if text != "" {
@@ -173,7 +170,6 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error {
} }
case err := <-errch: case err := <-errch:
cuid.SetConverged(false)
if err == nil { // EOF if err == nil { // EOF
// FIXME: add an "if watch command ends/crashes" // FIXME: add an "if watch command ends/crashes"
// restart or generate error option // restart or generate error option
@@ -183,14 +179,9 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error {
return errwrap.Wrapf(err, "Unknown error") return errwrap.Wrapf(err, "Unknown error")
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -148,8 +148,6 @@ func (obj *FileRes) GetPath() string {
// must be restarted. On a clean exit it returns nil. // must be restarted. On a clean exit it returns nil.
// FIXME: Also watch the source directory when using obj.Source !!! // FIXME: Also watch the source directory when using obj.Source !!!
func (obj *FileRes) Watch(processChan chan *event.Event) error { func (obj *FileRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
var err error var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse) obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse)
if err != nil { if err != nil {
@@ -175,7 +173,6 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error {
if !ok { // channel shutdown if !ok { // channel shutdown
return nil return nil
} }
cuid.SetConverged(false)
if err := event.Error; err != nil { if err := event.Error; err != nil {
return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.Kind(), obj.GetName()) return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.Kind(), obj.GetName())
} }
@@ -186,15 +183,10 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error {
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
//obj.StateOK(false) // dirty // these events don't invalidate state //obj.StateOK(false) // dirty // these events don't invalidate state
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -112,8 +112,6 @@ func (obj *HostnameRes) Init() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *HostnameRes) Watch(processChan chan *event.Event) error { func (obj *HostnameRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
// if we share the bus with others, we will get each others messages!! // if we share the bus with others, we will get each others messages!!
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
if err != nil { if err != nil {
@@ -140,22 +138,16 @@ func (obj *HostnameRes) Watch(processChan chan *event.Event) error {
for { for {
select { select {
case <-signals: case <-signals:
cuid.SetConverged(false)
send = true send = true
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, _ := obj.ReadEvent(event); exit != nil { if exit, _ := obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
send = true send = true
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -140,8 +140,6 @@ func (obj *MsgRes) journalPriority() journal.Priority {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *MsgRes) Watch(processChan chan *event.Event) error { func (obj *MsgRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
@@ -152,15 +150,10 @@ func (obj *MsgRes) Watch(processChan chan *event.Event) error {
for { for {
select { select {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -64,8 +64,6 @@ func (obj *NoopRes) Init() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *NoopRes) Watch(processChan chan *event.Event) error { func (obj *NoopRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
// notify engine that we're running // notify engine that we're running
if err := obj.Running(processChan); err != nil { if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
@@ -76,15 +74,10 @@ func (obj *NoopRes) Watch(processChan chan *event.Event) error {
for { for {
select { select {
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -107,8 +107,6 @@ func (obj *NspawnRes) Init() error {
// Watch for state changes and sends a message to the bus if there is a change // Watch for state changes and sends a message to the bus if there is a change
func (obj *NspawnRes) Watch(processChan chan *event.Event) error { func (obj *NspawnRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
// this resource depends on systemd ensure that it's running // this resource depends on systemd ensure that it's running
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
return fmt.Errorf("Systemd is not running.") return fmt.Errorf("Systemd is not running.")
@@ -157,14 +155,9 @@ func (obj *NspawnRes) Watch(processChan chan *event.Event) error {
} }
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -174,8 +174,6 @@ Loop:
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *PasswordRes) Watch(processChan chan *event.Event) error { func (obj *PasswordRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
var err error var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false) obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false)
if err != nil { if err != nil {
@@ -197,7 +195,6 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error {
if !ok { // channel shutdown if !ok { // channel shutdown
return nil return nil
} }
cuid.SetConverged(false)
if err := event.Error; err != nil { if err := event.Error; err != nil {
return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.Kind(), obj.GetName()) return errwrap.Wrapf(err, "Unknown %s[%s] watcher error", obj.Kind(), obj.GetName())
} }
@@ -205,15 +202,10 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error {
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
// we avoid sending events on unpause // we avoid sending events on unpause
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -116,8 +116,6 @@ func (obj *PkgRes) Init() error {
// TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/109
// TODO: https://github.com/hughsie/PackageKit/issues/110 // TODO: https://github.com/hughsie/PackageKit/issues/110
func (obj *PkgRes) Watch(processChan chan *event.Event) error { func (obj *PkgRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
bus := packagekit.NewBus() bus := packagekit.NewBus()
if bus == nil { if bus == nil {
return fmt.Errorf("Can't connect to PackageKit bus.") return fmt.Errorf("Can't connect to PackageKit bus.")
@@ -144,8 +142,6 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error {
select { select {
case event := <-ch: case event := <-ch:
cuid.SetConverged(false)
// FIXME: ask packagekit for info on what packages changed // FIXME: ask packagekit for info on what packages changed
if obj.debug { if obj.debug {
log.Printf("%s: Event: %v", obj.fmtNames(obj.getNames()), event.Name) log.Printf("%s: Event: %v", obj.fmtNames(obj.getNames()), event.Name)
@@ -161,15 +157,11 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error {
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
//obj.StateOK(false) // these events don't invalidate state //obj.StateOK(false) // these events don't invalidate state
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
// do all our event sending all together to avoid duplicate msgs // do all our event sending all together to avoid duplicate msgs

View File

@@ -530,6 +530,7 @@ func (obj *BaseRes) Poll(processChan chan *event.Event) error {
if err := obj.Running(processChan); err != nil { if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK... return err // bubble up a NACK...
} }
cuid.SetConverged(false) // quickly stop any converge due to Running()
var send = false var send = false
var exit *error var exit *error

View File

@@ -98,9 +98,17 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) {
// Running is called by the Watch method of the resource once it has started up. // Running is called by the Watch method of the resource once it has started up.
// This signals to the engine to kick off the initial CheckApply resource check. // This signals to the engine to kick off the initial CheckApply resource check.
func (obj *BaseRes) Running(processChan chan *event.Event) error { func (obj *BaseRes) Running(processChan chan *event.Event) error {
// TODO: If a non-polling resource wants to use the converger, then it
// should probably tell Running (via an arg) to not do this. Currently
// it is a very unlikey race that could cause an early converge if the
// converge timeout is very short ( ~ 1s) and the Watch method doesn't
// immediately SetConverged(false) to stop possible early termination.
if obj.Meta().Poll == 0 { // if not polling, unblock this...
cuid := obj.ConvergerUID()
cuid.SetConverged(true) // a reasonable initial assumption
}
obj.StateOK(false) // assume we're initially dirty obj.StateOK(false) // assume we're initially dirty
cuid := obj.ConvergerUID() // get the converger uid used to report status
cuid.SetConverged(false) // a reasonable initial assumption
close(obj.started) // send started signal close(obj.started) // send started signal
var err error var err error

View File

@@ -80,8 +80,6 @@ func (obj *SvcRes) Init() error {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *SvcRes) Watch(processChan chan *event.Event) error { func (obj *SvcRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
// obj.Name: svc name // obj.Name: svc name
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
return fmt.Errorf("Systemd is not running.") return fmt.Errorf("Systemd is not running.")
@@ -155,19 +153,13 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error {
select { select {
case <-buschan: // XXX: wait for new units event to unstick case <-buschan: // XXX: wait for new units event to unstick
cuid.SetConverged(false)
// loop so that we can see the changed invalid signal // loop so that we can see the changed invalid signal
log.Printf("Svc[%s]->DaemonReload()", svc) log.Printf("Svc[%s]->DaemonReload()", svc)
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
} else { } else {
if !activeSet { if !activeSet {
@@ -202,18 +194,12 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error {
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
case err := <-subErrors: case err := <-subErrors:
cuid.SetConverged(false)
return errwrap.Wrapf(err, "Unknown %s[%s] error", obj.Kind(), obj.GetName()) return errwrap.Wrapf(err, "Unknown %s[%s] error", obj.Kind(), obj.GetName())
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
} }

View File

@@ -78,8 +78,6 @@ func (obj *TimerRes) newTicker() *time.Ticker {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *TimerRes) Watch(processChan chan *event.Event) error { func (obj *TimerRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
// create a time.Ticker for the given interval // create a time.Ticker for the given interval
obj.ticker = obj.newTicker() obj.ticker = obj.newTicker()
defer obj.ticker.Stop() defer obj.ticker.Stop()
@@ -98,14 +96,9 @@ func (obj *TimerRes) Watch(processChan chan *event.Event) error {
log.Printf("%s[%s]: received tick", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: received tick", obj.Kind(), obj.GetName())
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
if exit, _ := obj.ReadEvent(event); exit != nil { if exit, _ := obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true)
continue
} }
if send { if send {

View File

@@ -156,8 +156,6 @@ func (obj *VirtRes) connect() (conn *libvirt.Connect, err error) {
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
func (obj *VirtRes) Watch(processChan chan *event.Event) error { func (obj *VirtRes) Watch(processChan chan *event.Event) error {
cuid := obj.ConvergerUID() // get the converger uid used to report status
conn, err := obj.connect() conn, err := obj.connect()
if err != nil { if err != nil {
return fmt.Errorf("Connection to libvirt failed with: %s", err) return fmt.Errorf("Connection to libvirt failed with: %s", err)
@@ -251,23 +249,14 @@ func (obj *VirtRes) Watch(processChan chan *event.Event) error {
obj.StateOK(false) // dirty obj.StateOK(false) // dirty
send = true send = true
} }
if send {
cuid.SetConverged(false)
}
case err := <-errorChan: case err := <-errorChan:
cuid.SetConverged(false)
return fmt.Errorf("Unknown %s[%s] libvirt error: %s", obj.Kind(), obj.GetName(), err) return fmt.Errorf("Unknown %s[%s] libvirt error: %s", obj.Kind(), obj.GetName(), err)
case event := <-obj.Events(): case event := <-obj.Events():
cuid.SetConverged(false)
if exit, send = obj.ReadEvent(event); exit != nil { if exit, send = obj.ReadEvent(event); exit != nil {
return *exit // exit return *exit // exit
} }
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
} }
if send { if send {