From 36b916f27f89d73ecb6bf1ae8ff7fe82a0b8e3fa Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sun, 11 Dec 2016 22:22:53 -0500 Subject: [PATCH] resources: Simplify resource Converger and Startup code This takes the Converged initialization and Startup patterns that are common in all resources, and bakes it into the core engine. This way resource writing is much more concise and there is less boilerplate! --- pgraph/actions.go | 2 ++ resources/exec.go | 23 ++++++----------------- resources/file.go | 24 ++++++------------------ resources/hostname.go | 23 ++++++----------------- resources/msg.go | 19 ++++--------------- resources/noop.go | 19 ++++--------------- resources/nspawn.go | 24 ++++++------------------ resources/password.go | 23 ++++++----------------- resources/pkg.go | 24 ++++++------------------ resources/resources.go | 25 ++++++++++++++++++++++++- resources/sendrecv.go | 13 +++++++++++++ resources/svc.go | 29 ++++++----------------------- resources/timer.go | 23 +++++++---------------- resources/virt.go | 23 ++++++----------------- 14 files changed, 102 insertions(+), 192 deletions(-) diff --git a/pgraph/actions.go b/pgraph/actions.go index 4b7b6f6c..9e811f40 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -415,7 +415,9 @@ func (g *Graph) Worker(v *Vertex) error { } // TODO: reset the watch retry count after some amount of success + v.Res.RegisterConverger() e := v.Res.Watch(processChan) + v.Res.UnregisterConverger() if e == nil { // exit signal err = nil // clean exit break diff --git a/resources/exec.go b/resources/exec.go index 10576754..11a25b68 100644 --- a/resources/exec.go +++ b/resources/exec.go @@ -25,7 +25,6 @@ import ( "log" "os/exec" "strings" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" @@ -116,17 +115,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status var send = false // send event? var exit = false @@ -169,6 +158,11 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error { bufioch, errch = obj.BufioChanScanner(scanner) } + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + for { obj.SetState(ResStateWatching) // reset select { @@ -199,15 +193,10 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false // it is okay to invalidate the clean state on poke too obj.StateOK(false) // something made state dirty diff --git a/resources/file.go b/resources/file.go index e828cd1e..1c03b5da 100644 --- a/resources/file.go +++ b/resources/file.go @@ -30,7 +30,6 @@ import ( "path" "path/filepath" "strings" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/recwatch" @@ -147,17 +146,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse) @@ -166,6 +155,11 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { } defer obj.recWatcher.Close() + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false // send event? var exit = false @@ -200,16 +194,10 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/hostname.go b/resources/hostname.go index 737a30f4..d235a130 100644 --- a/resources/hostname.go +++ b/resources/hostname.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "log" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" @@ -114,17 +113,7 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status // if we share the bus with others, we will get each others messages!! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! @@ -142,6 +131,11 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error { signals := make(chan *dbus.Signal, 10) // closed by dbus package bus.Signal(signals) + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false // send event? for { @@ -164,15 +158,10 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { diff --git a/resources/msg.go b/resources/msg.go index 2bf956fd..64e29a9d 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -23,7 +23,6 @@ import ( "log" "regexp" "strings" - "time" "github.com/purpleidea/mgmt/event" @@ -141,16 +140,11 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() + cuid := obj.Converger() // get the converger uid used to report status - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... } var send = false // send event? @@ -168,15 +162,10 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false // only do this on certain types of events //obj.isStateOK = false // something made state dirty diff --git a/resources/noop.go b/resources/noop.go index ed17a8ff..3e98c098 100644 --- a/resources/noop.go +++ b/resources/noop.go @@ -20,7 +20,6 @@ package resources import ( "encoding/gob" "log" - "time" "github.com/purpleidea/mgmt/event" ) @@ -65,16 +64,11 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() + cuid := obj.Converger() // get the converger uid used to report status - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... } var send = false // send event? @@ -92,15 +86,10 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/nspawn.go b/resources/nspawn.go index f1ebcf3a..58a0a884 100644 --- a/resources/nspawn.go +++ b/resources/nspawn.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "log" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" @@ -101,17 +100,7 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - } - // 1/2 the resolution of converged timeout - return time.After(time.Duration(500) * time.Millisecond) - } + cuid := obj.Converger() // get the converger uid used to report status // this resource depends on systemd ensure that it's running if !systemdUtil.IsRunningSystemd() { @@ -135,6 +124,11 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { buschan := make(chan *dbus.Signal, 10) bus.Signal(buschan) + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false var exit = false @@ -165,16 +159,10 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/password.go b/resources/password.go index 589f6ca5..26bd0dad 100644 --- a/resources/password.go +++ b/resources/password.go @@ -27,7 +27,6 @@ import ( "os" "path" "strings" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/recwatch" @@ -174,17 +173,7 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false) @@ -193,6 +182,11 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { } defer obj.recWatcher.Close() + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false // send event? var exit = false for { @@ -220,15 +214,10 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/pkg.go b/resources/pkg.go index 38e02d46..7b176852 100644 --- a/resources/pkg.go +++ b/resources/pkg.go @@ -23,7 +23,6 @@ import ( "log" "path" "strings" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/resources/packagekit" @@ -115,17 +114,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status bus := packagekit.NewBus() if bus == nil { @@ -138,6 +127,11 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { return errwrap.Wrapf(err, "Error adding signal match") } + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false // send event? var exit = false @@ -175,16 +169,10 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/resources.go b/resources/resources.go index 098ff5de..1eefd351 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -130,6 +130,9 @@ type Base interface { AssociateData(*Data) IsWatching() bool SetWatching(bool) + RegisterConverger() + UnregisterConverger() + Converger() converger.ConvergerUID GetState() ResState SetState(ResState) DoSend(chan event.Event, string) (bool, error) @@ -147,6 +150,7 @@ type Base interface { GetGroup() []Res // return everyone grouped inside me SetGroup([]Res) VarDir(string) (string, error) + Running(chan event.Event) error // notify the engine that Watch started } // Res is the minimum interface you need to implement to define a new resource. @@ -171,7 +175,8 @@ type BaseRes struct { kind string events chan event.Event converger converger.Converger // converged tracking - prefix string // base prefix for this resource + cuid converger.ConvergerUID + prefix string // base prefix for this resource debug bool state ResState watching bool // is Watch() loop running ? @@ -285,6 +290,24 @@ func (obj *BaseRes) SetWatching(b bool) { obj.watching = b } +// RegisterConverger sets up the cuid for the resource. This is a helper +// function for the engine, and shouldn't be called by the resources directly. +func (obj *BaseRes) RegisterConverger() { + obj.cuid = obj.converger.Register() +} + +// UnregisterConverger tears down the cuid for the resource. This is a helper +// function for the engine, and shouldn't be called by the resources directly. +func (obj *BaseRes) UnregisterConverger() { + obj.cuid.Unregister() +} + +// Converger returns the ConvergerUID for the resource. This should be called +// by the Watch method of the resource to set the converged state. +func (obj *BaseRes) Converger() converger.ConvergerUID { + return obj.cuid +} + // GetState returns the state of the resource. func (obj *BaseRes) GetState() ResState { return obj.state diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 9df54cda..93c0e88b 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -108,6 +108,19 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) { return true, false // required to keep the stupid go compiler happy } +// Running is called by the Watch method of the resource once it has started up. +// This signals to the engine to kick off the initial CheckApply resource check. +func (obj *BaseRes) Running(processChan chan event.Event) error { + obj.StateOK(false) // assume we're initially dirty + cuid := obj.Converger() // get the converger uid used to report status + cuid.SetConverged(false) // a reasonable initial assumption + + // FIXME: exit return value is unused atm, so ignore it for now... + //if exit, err := obj.DoSend(processChan, ""); exit || err != nil { + _, err := obj.DoSend(processChan, "") + return err // bubble up any possible error (or nil) +} + // Send points to a value that a resource will send. type Send struct { Res Res // a handle to the resource which is sending a value diff --git a/resources/svc.go b/resources/svc.go index ee16b9d7..cb2b7c36 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -23,7 +23,6 @@ import ( "encoding/gob" "fmt" "log" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" @@ -81,17 +80,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status // obj.Name: svc name if !systemdUtil.IsRunningSystemd() { @@ -116,6 +105,11 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { buschan := make(chan *dbus.Signal, 10) bus.Signal(buschan) + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name var send = false // send event? var exit = false @@ -175,11 +169,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } } else { if !activeSet { @@ -227,16 +216,10 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } } if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/timer.go b/resources/timer.go index 7c35f7b8..0f022298 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -79,22 +79,17 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status // create a time.Ticker for the given interval obj.ticker = obj.newTicker() defer obj.ticker.Stop() + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false for { @@ -113,13 +108,9 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } + if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/virt.go b/resources/virt.go index 0c3afc74..c67ac0ea 100644 --- a/resources/virt.go +++ b/resources/virt.go @@ -137,17 +137,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status conn, err := obj.connect() if err != nil { @@ -203,6 +193,11 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { ) defer conn.DomainEventDeregister(callbackID) + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false var exit = false @@ -260,15 +255,9 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK...