resources: Simplify resource Converger and Startup code

This takes the Converged initialization and Startup patterns that are
common in all resources, and bakes it into the core engine. This way
resource writing is much more concise and there is less boilerplate!
This commit is contained in:
James Shubin
2016-12-11 22:22:53 -05:00
parent e519811893
commit 36b916f27f
14 changed files with 102 additions and 192 deletions

View File

@@ -415,7 +415,9 @@ func (g *Graph) Worker(v *Vertex) error {
}
// TODO: reset the watch retry count after some amount of success
v.Res.RegisterConverger()
e := v.Res.Watch(processChan)
v.Res.UnregisterConverger()
if e == nil { // exit signal
err = nil // clean exit
break

View File

@@ -25,7 +25,6 @@ import (
"log"
"os/exec"
"strings"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util"
@@ -116,17 +115,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
cuid := obj.Converger() // get the converger uid used to report status
var send = false // send event?
var exit = false
@@ -169,6 +158,11 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
bufioch, errch = obj.BufioChanScanner(scanner)
}
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
for {
obj.SetState(ResStateWatching) // reset
select {
@@ -199,15 +193,10 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
}
// do all our event sending all together to avoid duplicate msgs
if send {
startup = true // startup finished
send = false
// it is okay to invalidate the clean state on poke too
obj.StateOK(false) // something made state dirty

View File

@@ -30,7 +30,6 @@ import (
"path"
"path/filepath"
"strings"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/recwatch"
@@ -147,17 +146,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
cuid := obj.Converger() // get the converger uid used to report status
var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse)
@@ -166,6 +155,11 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
}
defer obj.recWatcher.Close()
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event?
var exit = false
@@ -200,16 +194,10 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
}
// do all our event sending all together to avoid duplicate msgs
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...

View File

@@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"log"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util"
@@ -114,17 +113,7 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
cuid := obj.Converger() // get the converger uid used to report status
// if we share the bus with others, we will get each others messages!!
bus, err := util.SystemBusPrivateUsable() // don't share the bus connection!
@@ -142,6 +131,11 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
signals := make(chan *dbus.Signal, 10) // closed by dbus package
bus.Signal(signals)
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event?
for {
@@ -164,15 +158,10 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
}
// do all our event sending all together to avoid duplicate msgs
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {

View File

@@ -23,7 +23,6 @@ import (
"log"
"regexp"
"strings"
"time"
"github.com/purpleidea/mgmt/event"
@@ -141,16 +140,11 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
cuid := obj.Converger() // get the converger uid used to report status
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event?
@@ -168,15 +162,10 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
}
// do all our event sending all together to avoid duplicate msgs
if send {
startup = true // startup finished
send = false
// only do this on certain types of events
//obj.isStateOK = false // something made state dirty

View File

@@ -20,7 +20,6 @@ package resources
import (
"encoding/gob"
"log"
"time"
"github.com/purpleidea/mgmt/event"
)
@@ -65,16 +64,11 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
cuid := obj.Converger() // get the converger uid used to report status
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event?
@@ -92,15 +86,10 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
}
// do all our event sending all together to avoid duplicate msgs
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...

View File

@@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"log"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util"
@@ -101,17 +100,7 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
}
// 1/2 the resolution of converged timeout
return time.After(time.Duration(500) * time.Millisecond)
}
cuid := obj.Converger() // get the converger uid used to report status
// this resource depends on systemd ensure that it's running
if !systemdUtil.IsRunningSystemd() {
@@ -135,6 +124,11 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
buschan := make(chan *dbus.Signal, 10)
bus.Signal(buschan)
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false
var exit = false
@@ -165,16 +159,10 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
}
// do all our event sending all together to avoid duplicate msgs
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...

View File

@@ -27,7 +27,6 @@ import (
"os"
"path"
"strings"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/recwatch"
@@ -174,17 +173,7 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
cuid := obj.Converger() // get the converger uid used to report status
var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false)
@@ -193,6 +182,11 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
}
defer obj.recWatcher.Close()
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event?
var exit = false
for {
@@ -220,15 +214,10 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
}
// do all our event sending all together to avoid duplicate msgs
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...

View File

@@ -23,7 +23,6 @@ import (
"log"
"path"
"strings"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/resources/packagekit"
@@ -115,17 +114,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
cuid := obj.Converger() // get the converger uid used to report status
bus := packagekit.NewBus()
if bus == nil {
@@ -138,6 +127,11 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
return errwrap.Wrapf(err, "Error adding signal match")
}
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false // send event?
var exit = false
@@ -175,16 +169,10 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
}
// do all our event sending all together to avoid duplicate msgs
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...

View File

@@ -130,6 +130,9 @@ type Base interface {
AssociateData(*Data)
IsWatching() bool
SetWatching(bool)
RegisterConverger()
UnregisterConverger()
Converger() converger.ConvergerUID
GetState() ResState
SetState(ResState)
DoSend(chan event.Event, string) (bool, error)
@@ -147,6 +150,7 @@ type Base interface {
GetGroup() []Res // return everyone grouped inside me
SetGroup([]Res)
VarDir(string) (string, error)
Running(chan event.Event) error // notify the engine that Watch started
}
// Res is the minimum interface you need to implement to define a new resource.
@@ -171,7 +175,8 @@ type BaseRes struct {
kind string
events chan event.Event
converger converger.Converger // converged tracking
prefix string // base prefix for this resource
cuid converger.ConvergerUID
prefix string // base prefix for this resource
debug bool
state ResState
watching bool // is Watch() loop running ?
@@ -285,6 +290,24 @@ func (obj *BaseRes) SetWatching(b bool) {
obj.watching = b
}
// RegisterConverger sets up the cuid for the resource. This is a helper
// function for the engine, and shouldn't be called by the resources directly.
func (obj *BaseRes) RegisterConverger() {
obj.cuid = obj.converger.Register()
}
// UnregisterConverger tears down the cuid for the resource. This is a helper
// function for the engine, and shouldn't be called by the resources directly.
func (obj *BaseRes) UnregisterConverger() {
obj.cuid.Unregister()
}
// Converger returns the ConvergerUID for the resource. This should be called
// by the Watch method of the resource to set the converged state.
func (obj *BaseRes) Converger() converger.ConvergerUID {
return obj.cuid
}
// GetState returns the state of the resource.
func (obj *BaseRes) GetState() ResState {
return obj.state

View File

@@ -108,6 +108,19 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) {
return true, false // required to keep the stupid go compiler happy
}
// Running is called by the Watch method of the resource once it has started up.
// This signals to the engine to kick off the initial CheckApply resource check.
func (obj *BaseRes) Running(processChan chan event.Event) error {
obj.StateOK(false) // assume we're initially dirty
cuid := obj.Converger() // get the converger uid used to report status
cuid.SetConverged(false) // a reasonable initial assumption
// FIXME: exit return value is unused atm, so ignore it for now...
//if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
_, err := obj.DoSend(processChan, "")
return err // bubble up any possible error (or nil)
}
// Send points to a value that a resource will send.
type Send struct {
Res Res // a handle to the resource which is sending a value

View File

@@ -23,7 +23,6 @@ import (
"encoding/gob"
"fmt"
"log"
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/util"
@@ -81,17 +80,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
cuid := obj.Converger() // get the converger uid used to report status
// obj.Name: svc name
if !systemdUtil.IsRunningSystemd() {
@@ -116,6 +105,11 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
buschan := make(chan *dbus.Signal, 10)
bus.Signal(buschan)
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name
var send = false // send event?
var exit = false
@@ -175,11 +169,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
}
} else {
if !activeSet {
@@ -227,16 +216,10 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
}
}
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...

View File

@@ -79,22 +79,17 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
cuid := obj.Converger() // get the converger uid used to report status
// create a time.Ticker for the given interval
obj.ticker = obj.newTicker()
defer obj.ticker.Stop()
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false
for {
@@ -113,13 +108,9 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true)
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
}
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil {
return err // we exit or bubble up a NACK...

View File

@@ -137,17 +137,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
}
obj.SetWatching(true)
defer obj.SetWatching(false)
cuid := obj.converger.Register()
defer cuid.Unregister()
var startup bool
Startup := func(block bool) <-chan time.Time {
if block {
return nil // blocks forever
//return make(chan time.Time) // blocks forever
}
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
}
cuid := obj.Converger() // get the converger uid used to report status
conn, err := obj.connect()
if err != nil {
@@ -203,6 +193,11 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
)
defer conn.DomainEventDeregister(callbackID)
// notify engine that we're running
if err := obj.Running(processChan); err != nil {
return err // bubble up a NACK...
}
var send = false
var exit = false
@@ -260,15 +255,9 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
case <-cuid.ConvergedTimer():
cuid.SetConverged(true) // converged!
continue
case <-Startup(startup):
cuid.SetConverged(false)
send = true
obj.StateOK(false) // dirty
}
if send {
startup = true // startup finished
send = false
if exit, err := obj.DoSend(processChan, ""); exit || err != nil {
return err // we exit or bubble up a NACK...