diff --git a/pgraph/actions.go b/pgraph/actions.go index 4b7b6f6c..9e811f40 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -415,7 +415,9 @@ func (g *Graph) Worker(v *Vertex) error { } // TODO: reset the watch retry count after some amount of success + v.Res.RegisterConverger() e := v.Res.Watch(processChan) + v.Res.UnregisterConverger() if e == nil { // exit signal err = nil // clean exit break diff --git a/resources/exec.go b/resources/exec.go index 10576754..11a25b68 100644 --- a/resources/exec.go +++ b/resources/exec.go @@ -25,7 +25,6 @@ import ( "log" "os/exec" "strings" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" @@ -116,17 +115,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status var send = false // send event? var exit = false @@ -169,6 +158,11 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error { bufioch, errch = obj.BufioChanScanner(scanner) } + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + for { obj.SetState(ResStateWatching) // reset select { @@ -199,15 +193,10 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false // it is okay to invalidate the clean state on poke too obj.StateOK(false) // something made state dirty diff --git a/resources/file.go b/resources/file.go index e828cd1e..1c03b5da 100644 --- a/resources/file.go +++ b/resources/file.go @@ -30,7 +30,6 @@ import ( "path" "path/filepath" "strings" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/recwatch" @@ -147,17 +146,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse) @@ -166,6 +155,11 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { } defer obj.recWatcher.Close() + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false // send event? var exit = false @@ -200,16 +194,10 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/hostname.go b/resources/hostname.go index 737a30f4..d235a130 100644 --- a/resources/hostname.go +++ b/resources/hostname.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "log" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" @@ -114,17 +113,7 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status // if we share the bus with others, we will get each others messages!! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! @@ -142,6 +131,11 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error { signals := make(chan *dbus.Signal, 10) // closed by dbus package bus.Signal(signals) + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false // send event? for { @@ -164,15 +158,10 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { diff --git a/resources/msg.go b/resources/msg.go index 2bf956fd..64e29a9d 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -23,7 +23,6 @@ import ( "log" "regexp" "strings" - "time" "github.com/purpleidea/mgmt/event" @@ -141,16 +140,11 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() + cuid := obj.Converger() // get the converger uid used to report status - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... } var send = false // send event? @@ -168,15 +162,10 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false // only do this on certain types of events //obj.isStateOK = false // something made state dirty diff --git a/resources/noop.go b/resources/noop.go index ed17a8ff..3e98c098 100644 --- a/resources/noop.go +++ b/resources/noop.go @@ -20,7 +20,6 @@ package resources import ( "encoding/gob" "log" - "time" "github.com/purpleidea/mgmt/event" ) @@ -65,16 +64,11 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() + cuid := obj.Converger() // get the converger uid used to report status - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... } var send = false // send event? @@ -92,15 +86,10 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/nspawn.go b/resources/nspawn.go index f1ebcf3a..58a0a884 100644 --- a/resources/nspawn.go +++ b/resources/nspawn.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "log" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" @@ -101,17 +100,7 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - } - // 1/2 the resolution of converged timeout - return time.After(time.Duration(500) * time.Millisecond) - } + cuid := obj.Converger() // get the converger uid used to report status // this resource depends on systemd ensure that it's running if !systemdUtil.IsRunningSystemd() { @@ -135,6 +124,11 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { buschan := make(chan *dbus.Signal, 10) bus.Signal(buschan) + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false var exit = false @@ -165,16 +159,10 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/password.go b/resources/password.go index 589f6ca5..26bd0dad 100644 --- a/resources/password.go +++ b/resources/password.go @@ -27,7 +27,6 @@ import ( "os" "path" "strings" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/recwatch" @@ -174,17 +173,7 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false) @@ -193,6 +182,11 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { } defer obj.recWatcher.Close() + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false // send event? var exit = false for { @@ -220,15 +214,10 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/pkg.go b/resources/pkg.go index 38e02d46..7b176852 100644 --- a/resources/pkg.go +++ b/resources/pkg.go @@ -23,7 +23,6 @@ import ( "log" "path" "strings" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/resources/packagekit" @@ -115,17 +114,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status bus := packagekit.NewBus() if bus == nil { @@ -138,6 +127,11 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { return errwrap.Wrapf(err, "Error adding signal match") } + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false // send event? var exit = false @@ -175,16 +169,10 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } // do all our event sending all together to avoid duplicate msgs if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/resources.go b/resources/resources.go index 098ff5de..1eefd351 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -130,6 +130,9 @@ type Base interface { AssociateData(*Data) IsWatching() bool SetWatching(bool) + RegisterConverger() + UnregisterConverger() + Converger() converger.ConvergerUID GetState() ResState SetState(ResState) DoSend(chan event.Event, string) (bool, error) @@ -147,6 +150,7 @@ type Base interface { GetGroup() []Res // return everyone grouped inside me SetGroup([]Res) VarDir(string) (string, error) + Running(chan event.Event) error // notify the engine that Watch started } // Res is the minimum interface you need to implement to define a new resource. @@ -171,7 +175,8 @@ type BaseRes struct { kind string events chan event.Event converger converger.Converger // converged tracking - prefix string // base prefix for this resource + cuid converger.ConvergerUID + prefix string // base prefix for this resource debug bool state ResState watching bool // is Watch() loop running ? @@ -285,6 +290,24 @@ func (obj *BaseRes) SetWatching(b bool) { obj.watching = b } +// RegisterConverger sets up the cuid for the resource. This is a helper +// function for the engine, and shouldn't be called by the resources directly. +func (obj *BaseRes) RegisterConverger() { + obj.cuid = obj.converger.Register() +} + +// UnregisterConverger tears down the cuid for the resource. This is a helper +// function for the engine, and shouldn't be called by the resources directly. +func (obj *BaseRes) UnregisterConverger() { + obj.cuid.Unregister() +} + +// Converger returns the ConvergerUID for the resource. This should be called +// by the Watch method of the resource to set the converged state. +func (obj *BaseRes) Converger() converger.ConvergerUID { + return obj.cuid +} + // GetState returns the state of the resource. func (obj *BaseRes) GetState() ResState { return obj.state diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 9df54cda..93c0e88b 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -108,6 +108,19 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) { return true, false // required to keep the stupid go compiler happy } +// Running is called by the Watch method of the resource once it has started up. +// This signals to the engine to kick off the initial CheckApply resource check. +func (obj *BaseRes) Running(processChan chan event.Event) error { + obj.StateOK(false) // assume we're initially dirty + cuid := obj.Converger() // get the converger uid used to report status + cuid.SetConverged(false) // a reasonable initial assumption + + // FIXME: exit return value is unused atm, so ignore it for now... + //if exit, err := obj.DoSend(processChan, ""); exit || err != nil { + _, err := obj.DoSend(processChan, "") + return err // bubble up any possible error (or nil) +} + // Send points to a value that a resource will send. type Send struct { Res Res // a handle to the resource which is sending a value diff --git a/resources/svc.go b/resources/svc.go index ee16b9d7..cb2b7c36 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -23,7 +23,6 @@ import ( "encoding/gob" "fmt" "log" - "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" @@ -81,17 +80,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status // obj.Name: svc name if !systemdUtil.IsRunningSystemd() { @@ -116,6 +105,11 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { buschan := make(chan *dbus.Signal, 10) bus.Signal(buschan) + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name var send = false // send event? var exit = false @@ -175,11 +169,6 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } } else { if !activeSet { @@ -227,16 +216,10 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } } if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/timer.go b/resources/timer.go index 7c35f7b8..0f022298 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -79,22 +79,17 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status // create a time.Ticker for the given interval obj.ticker = obj.newTicker() defer obj.ticker.Stop() + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false for { @@ -113,13 +108,9 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true } + if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil { return err // we exit or bubble up a NACK... diff --git a/resources/virt.go b/resources/virt.go index 0c3afc74..c67ac0ea 100644 --- a/resources/virt.go +++ b/resources/virt.go @@ -137,17 +137,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { } obj.SetWatching(true) defer obj.SetWatching(false) - cuid := obj.converger.Register() - defer cuid.Unregister() - - var startup bool - Startup := func(block bool) <-chan time.Time { - if block { - return nil // blocks forever - //return make(chan time.Time) // blocks forever - } - return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout - } + cuid := obj.Converger() // get the converger uid used to report status conn, err := obj.connect() if err != nil { @@ -203,6 +193,11 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { ) defer conn.DomainEventDeregister(callbackID) + // notify engine that we're running + if err := obj.Running(processChan); err != nil { + return err // bubble up a NACK... + } + var send = false var exit = false @@ -260,15 +255,9 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { case <-cuid.ConvergedTimer(): cuid.SetConverged(true) // converged! continue - - case <-Startup(startup): - cuid.SetConverged(false) - send = true - obj.StateOK(false) // dirty } if send { - startup = true // startup finished send = false if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK...