diff --git a/docs/resource-guide.md b/docs/resource-guide.md index fdc21060..41b10c1c 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -273,7 +273,7 @@ sending out erroneous `Event` messages to keep things alive until it finishes. #### Example ```golang // Watch is the listener and main loop for this resource. -func (obj *FooRes) Watch(processChan chan *event.Event) error { +func (obj *FooRes) Watch() error { // setup the Foo resource var err error if err, obj.foo = OpenFoo(); err != nil { @@ -282,7 +282,7 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error { defer obj.whatever.CloseFoo() // shutdown our // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } @@ -311,7 +311,7 @@ func (obj *FooRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() // send the event! } } } diff --git a/pgraph/actions.go b/pgraph/actions.go index 4c0e53f6..77ee5a8c 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -235,8 +235,8 @@ func (g *Graph) Process(v *Vertex) error { // would simplify the resources by removing boilerplate if v.Meta().Poll > 0 { if !checkOK { // something changed, restart timer - cuid := v.Res.ConvergerUID() // get the converger uid used to report status - cuid.ResetTimer() // activity! + cuid, _, _ := v.Res.ConvergerUIDs() // get the converger uid used to report status + cuid.ResetTimer() // activity! if g.Flags.Debug { log.Printf("%s[%s]: Converger: ResetTimer", obj.Kind(), obj.GetName()) } @@ -306,6 +306,149 @@ func (obj *SentinelErr) Error() string { return obj.err.Error() } +// innerWorker is the CheckApply runner that reads from processChan. 
+func (g *Graph) innerWorker(v *Vertex) { + obj := v.Res + running := false + done := make(chan struct{}) + playback := false // do we need to run another one? + _, wcuid, pcuid := obj.ConvergerUIDs() // get extra cuids (worker, process) + + waiting := false + var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration + if !timer.Stop() { + <-timer.C // unnecessary, shouldn't happen + } + + var delay = time.Duration(v.Meta().Delay) * time.Millisecond + var retry = v.Meta().Retry // number of tries left, -1 for infinite + var limiter = rate.NewLimiter(v.Meta().Limit, v.Meta().Burst) + limited := false + +Loop: + for { + select { + case ev, ok := <-obj.ProcessChan(): // must use like this + if !ok { // processChan closed, let's exit + break Loop // no event, so no ack! + } + if v.Res.Meta().Poll == 0 { // skip for polling + wcuid.SetConverged(false) + } + + // if process started, but no action yet, skip! + if v.Res.GetState() == resources.ResStateProcess { + if g.Flags.Debug { + log.Printf("%s[%s]: Skipped event!", v.Kind(), v.GetName()) + } + ev.ACK() // ready for next message + continue + } + + // if running, we skip running a new execution! + // if waiting, we skip running a new execution! + if running || waiting { + if g.Flags.Debug { + log.Printf("%s[%s]: Playback added!", v.Kind(), v.GetName()) + } + playback = true + ev.ACK() // ready for next message + continue + } + + // catch invalid rates + if v.Meta().Burst == 0 && !(v.Meta().Limit == rate.Inf) { // blocked + e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName()) + v.SendEvent(event.EventExit, &SentinelErr{e}) + ev.ACK() // ready for next message + continue + } + + // rate limit + // FIXME: consider skipping rate limit check if + // the event is a poke instead of a watch event + if !limited && !(v.Meta().Limit == rate.Inf) { // skip over the playback event... 
+ now := time.Now() + r := limiter.ReserveN(now, 1) // one event + // r.OK() seems to always be true here! + d := r.DelayFrom(now) + if d > 0 { // delay + limited = true + playback = true + log.Printf("%s[%s]: Limited (rate: %v/sec, burst: %d, next: %v)", v.Kind(), v.GetName(), v.Meta().Limit, v.Meta().Burst, d) + // start the timer... + timer.Reset(d) + waiting = true // waiting for retry timer + ev.ACK() + continue + } // otherwise, we run directly! + } + limited = false // let one through + + running = true + go func(ev *event.Event) { + pcuid.SetConverged(false) // "block" Process + if e := g.Process(v); e != nil { + playback = true + log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e) + if retry == 0 { + // wrap the error in the sentinel + v.SendEvent(event.EventExit, &SentinelErr{e}) + return + } + if retry > 0 { // don't decrement the -1 + retry-- + } + log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry) + // start the timer... + timer.Reset(delay) + waiting = true // waiting for retry timer + return + } + retry = v.Meta().Retry // reset on success + close(done) // trigger + }(ev) + ev.ACK() // sync (now mostly useless) + + case <-timer.C: + if v.Res.Meta().Poll == 0 { // skip for polling + wcuid.SetConverged(false) + } + waiting = false + if !timer.Stop() { + //<-timer.C // blocks, docs are wrong! + } + log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName()) + close(done) + + // a CheckApply run (with possibly retry pause) finished + case <-done: + if v.Res.Meta().Poll == 0 { // skip for polling + wcuid.SetConverged(false) + } + if g.Flags.Debug { + log.Printf("%s[%s]: CheckApply finished!", v.Kind(), v.GetName()) + } + done = make(chan struct{}) // reset + // re-send this event, to trigger a CheckApply() + if playback { + playback = false + // this lock avoids us sending to + // channel after we've closed it! 
+ // TODO: can this experience indefinite postponement ? + // see: https://github.com/golang/go/issues/11506 + go obj.Event() // replay a new event + } + running = false + pcuid.SetConverged(true) // "unblock" Process + + case <-wcuid.ConvergedTimer(): + wcuid.SetConverged(true) // converged! + continue + } + } +} + // Worker is the common run frontend of the vertex. It handles all of the retry // and retry delay common code, and ultimately returns the final status of this // vertex execution. @@ -316,172 +459,30 @@ func (g *Graph) Worker(v *Vertex) error { // the Watch() function about which graph it is // running on, which isolates things nicely... obj := v.Res - - // run the init (should match 1-1 with Close function if this succeeds) + if g.Flags.Debug { + log.Printf("%s[%s]: Worker: Running", v.Kind(), v.GetName()) + defer log.Printf("%s[%s]: Worker: Stopped", v.Kind(), v.GetName()) + } + // run the init (should match 1-1 with Close function) if err := obj.Init(); err != nil { + // always exit the worker function by finishing with Close() + if e := obj.Close(); e != nil { + err = multierr.Append(err, e) // list of errors + } return errwrap.Wrapf(err, "could not Init() resource") } - lock := &sync.Mutex{} // lock around processChan closing and sending - finished := false // did we close processChan ? - processChan := make(chan *event.Event) - // if the CheckApply run takes longer than the converged // timeout, we could inappropriately converge mid-apply! // avoid this by blocking convergence with a fake report // we also add a similar blocker around the worker loop! - wcuid := obj.Converger().Register() // get an extra cuid for the worker! - defer wcuid.Unregister() - wcuid.SetConverged(true) // starts off false, and waits for loop timeout - pcuid := obj.Converger().Register() // get an extra cuid for the process - defer pcuid.Unregister() + _, wcuid, pcuid := obj.ConvergerUIDs() // get extra cuids (worker, process) + // XXX: put these in Init() ? 
+ wcuid.SetConverged(true) // starts off false, and waits for loop timeout pcuid.SetConverged(true) // starts off true, because it's not running... - go func() { - running := false - done := make(chan struct{}) - playback := false // do we need to run another one? + go g.innerWorker(v) - waiting := false - var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration - if !timer.Stop() { - <-timer.C // unnecessary, shouldn't happen - } - - var delay = time.Duration(v.Meta().Delay) * time.Millisecond - var retry = v.Meta().Retry // number of tries left, -1 for infinite - var limiter = rate.NewLimiter(v.Meta().Limit, v.Meta().Burst) - limited := false - - Loop: - for { - select { - case ev, ok := <-processChan: // must use like this - if !ok { // processChan closed, let's exit - break Loop // no event, so no ack! - } - if v.Res.Meta().Poll == 0 { // skip for polling - wcuid.SetConverged(false) - } - - // if process started, but no action yet, skip! - if v.Res.GetState() == resources.ResStateProcess { - if g.Flags.Debug { - log.Printf("%s[%s]: Skipped event!", v.Kind(), v.GetName()) - } - ev.ACK() // ready for next message - continue - } - - // if running, we skip running a new execution! - // if waiting, we skip running a new execution! - if running || waiting { - if g.Flags.Debug { - log.Printf("%s[%s]: Playback added!", v.Kind(), v.GetName()) - } - playback = true - ev.ACK() // ready for next message - continue - } - - // catch invalid rates - if v.Meta().Burst == 0 && !(v.Meta().Limit == rate.Inf) { // blocked - e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName()) - v.SendEvent(event.EventExit, &SentinelErr{e}) - ev.ACK() // ready for next message - continue - } - - // rate limit - // FIXME: consider skipping rate limit check if - // the event is a poke instead of a watch event - if !limited && !(v.Meta().Limit == rate.Inf) { // skip over the playback event... 
- now := time.Now() - r := limiter.ReserveN(now, 1) // one event - // r.OK() seems to always be true here! - d := r.DelayFrom(now) - if d > 0 { // delay - limited = true - playback = true - log.Printf("%s[%s]: Limited (rate: %v/sec, burst: %d, next: %v)", v.Kind(), v.GetName(), v.Meta().Limit, v.Meta().Burst, d) - // start the timer... - timer.Reset(d) - waiting = true // waiting for retry timer - ev.ACK() - continue - } // otherwise, we run directly! - } - limited = false // let one through - - running = true - go func(ev *event.Event) { - pcuid.SetConverged(false) // "block" Process - if e := g.Process(v); e != nil { - playback = true - log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e) - if retry == 0 { - // wrap the error in the sentinel - v.SendEvent(event.EventExit, &SentinelErr{e}) - return - } - if retry > 0 { // don't decrement the -1 - retry-- - } - log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry) - // start the timer... - timer.Reset(delay) - waiting = true // waiting for retry timer - return - } - retry = v.Meta().Retry // reset on success - close(done) // trigger - }(ev) - ev.ACK() // sync (now mostly useless) - - case <-timer.C: - if v.Res.Meta().Poll == 0 { // skip for polling - wcuid.SetConverged(false) - } - waiting = false - if !timer.Stop() { - //<-timer.C // blocks, docs are wrong! - } - log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName()) - close(done) - - // a CheckApply run (with possibly retry pause) finished - case <-done: - if v.Res.Meta().Poll == 0 { // skip for polling - wcuid.SetConverged(false) - } - if g.Flags.Debug { - log.Printf("%s[%s]: CheckApply finished!", v.Kind(), v.GetName()) - } - done = make(chan struct{}) // reset - // re-send this event, to trigger a CheckApply() - if playback { - playback = false - // this lock avoids us sending to - // channel after we've closed it! 
- lock.Lock() - go func() { - if !finished { - // TODO: can this experience indefinite postponement ? - // see: https://github.com/golang/go/issues/11506 - obj.Event(processChan) // replay a new event - } - lock.Unlock() - }() - } - running = false - pcuid.SetConverged(true) // "unblock" Process - - case <-wcuid.ConvergedTimer(): - wcuid.SetConverged(true) // converged! - continue - } - } - }() var err error // propagate the error up (this is a permanent BAD error!) // the watch delay runs inside of the Watch resource loop, so that it // can still process signals and exit if needed. It shouldn't run any @@ -545,24 +546,22 @@ func (g *Graph) Worker(v *Vertex) error { // NOTE: we can avoid the send if running Watch guarantees // one CheckApply event on startup! //if pendingSendEvent { // TODO: should this become a list in the future? - // if exit, err := obj.DoSend(processChan, ""); exit || err != nil { + // if err := obj.Event(); err != nil { // return err // we exit or bubble up a NACK... // } //} } // TODO: reset the watch retry count after some amount of success - v.Res.RegisterConverger() var e error if v.Res.Meta().Poll > 0 { // poll instead of watching :( - cuid := v.Res.ConvergerUID() // get the converger uid used to report status + cuid, _, _ := v.Res.ConvergerUIDs() // get the converger uid used to report status cuid.StartTimer() - e = v.Res.Poll(processChan) + e = v.Res.Poll() cuid.StopTimer() // clean up nicely } else { - e = v.Res.Watch(processChan) // run the watch normally + e = v.Res.Watch() // run the watch normally } - v.Res.UnregisterConverger() if e == nil { // exit signal err = nil // clean exit break @@ -586,10 +585,6 @@ func (g *Graph) Worker(v *Vertex) error { // by getting the Watch resource to send one event once it's up! //v.SendEvent(eventPoke, false, false) } - lock.Lock() // lock to avoid a send when closed! 
- finished = true - close(processChan) - lock.Unlock() // close resource and return possible errors if any if e := obj.Close(); err == nil { @@ -635,6 +630,7 @@ func (g *Graph) Start(first bool) { // start or continue // TODO: if a sufficient number of workers error, // should something be done? Should these restart // after perma-failure if we have a graph change? + log.Printf("%s[%s]: Started", vv.Kind(), vv.GetName()) if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err) return diff --git a/resources/augeas.go b/resources/augeas.go index 89c82025..e8e4f08d 100644 --- a/resources/augeas.go +++ b/resources/augeas.go @@ -25,7 +25,6 @@ import ( "os" "strings" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/recwatch" errwrap "github.com/pkg/errors" @@ -101,7 +100,7 @@ func (obj *AugeasRes) Init() error { // Watch is the primary listener for this resource and it outputs events. // Taken from the File resource. // FIXME: DRY - This is taken from the file resource -func (obj *AugeasRes) Watch(processChan chan *event.Event) error { +func (obj *AugeasRes) Watch() error { var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.File, false) if err != nil { @@ -110,7 +109,7 @@ func (obj *AugeasRes) Watch(processChan chan *event.Event) error { defer obj.recWatcher.Close() // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... 
} @@ -146,7 +145,7 @@ func (obj *AugeasRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/exec.go b/resources/exec.go index 09ef4a07..7266bdd4 100644 --- a/resources/exec.go +++ b/resources/exec.go @@ -26,7 +26,6 @@ import ( "os/exec" "strings" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" errwrap "github.com/pkg/errors" @@ -98,7 +97,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan } // Watch is the primary listener for this resource and it outputs events. -func (obj *ExecRes) Watch(processChan chan *event.Event) error { +func (obj *ExecRes) Watch() error { var send = false // send event? var exit *error bufioch, errch := make(chan string), make(chan error) @@ -141,7 +140,7 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error { } // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } @@ -174,7 +173,7 @@ func (obj *ExecRes) Watch(processChan chan *event.Event) error { send = false // it is okay to invalidate the clean state on poke too obj.StateOK(false) // something made state dirty - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/file.go b/resources/file.go index 8664f3aa..90305fd2 100644 --- a/resources/file.go +++ b/resources/file.go @@ -34,7 +34,6 @@ import ( "strings" "syscall" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/util" @@ -181,7 +180,7 @@ func (obj *FileRes) GetPath() string { // If the Watch returns an error, it means that something has gone wrong, and it // must be restarted. On a clean exit it returns nil. // FIXME: Also watch the source directory when using obj.Source !!! 
-func (obj *FileRes) Watch(processChan chan *event.Event) error { +func (obj *FileRes) Watch() error { var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, obj.Recurse) if err != nil { @@ -190,7 +189,7 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error { defer obj.recWatcher.Close() // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } @@ -226,7 +225,7 @@ func (obj *FileRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/hostname.go b/resources/hostname.go index d39b86dd..896445b4 100644 --- a/resources/hostname.go +++ b/resources/hostname.go @@ -23,7 +23,6 @@ import ( "fmt" "log" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" "github.com/godbus/dbus" @@ -102,7 +101,7 @@ func (obj *HostnameRes) Init() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *HostnameRes) Watch(processChan chan *event.Event) error { +func (obj *HostnameRes) Watch() error { // if we share the bus with others, we will get each others messages!! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! if err != nil { @@ -120,7 +119,7 @@ func (obj *HostnameRes) Watch(processChan chan *event.Event) error { bus.Signal(signals) // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... 
} @@ -144,7 +143,7 @@ func (obj *HostnameRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/msg.go b/resources/msg.go index 48ae4dee..5d06b03e 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -24,8 +24,6 @@ import ( "regexp" "strings" - "github.com/purpleidea/mgmt/event" - "github.com/coreos/go-systemd/journal" ) @@ -122,9 +120,9 @@ func (obj *MsgRes) journalPriority() journal.Priority { } // Watch is the primary listener for this resource and it outputs events. -func (obj *MsgRes) Watch(processChan chan *event.Event) error { +func (obj *MsgRes) Watch() error { // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } @@ -142,7 +140,7 @@ func (obj *MsgRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/noop.go b/resources/noop.go index 9b41fbb4..990eb136 100644 --- a/resources/noop.go +++ b/resources/noop.go @@ -21,8 +21,6 @@ import ( "encoding/gob" "fmt" "log" - - "github.com/purpleidea/mgmt/event" ) func init() { @@ -56,9 +54,9 @@ func (obj *NoopRes) Init() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *NoopRes) Watch(processChan chan *event.Event) error { +func (obj *NoopRes) Watch() error { // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... 
} @@ -76,7 +74,7 @@ func (obj *NoopRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/nspawn.go b/resources/nspawn.go index bc984d14..7056d280 100644 --- a/resources/nspawn.go +++ b/resources/nspawn.go @@ -23,7 +23,6 @@ import ( "fmt" "log" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" systemdUtil "github.com/coreos/go-systemd/util" @@ -98,7 +97,7 @@ func (obj *NspawnRes) Init() error { } // Watch for state changes and sends a message to the bus if there is a change -func (obj *NspawnRes) Watch(processChan chan *event.Event) error { +func (obj *NspawnRes) Watch() error { // this resource depends on systemd ensure that it's running if !systemdUtil.IsRunningSystemd() { return fmt.Errorf("Systemd is not running.") @@ -122,7 +121,7 @@ func (obj *NspawnRes) Watch(processChan chan *event.Event) error { bus.Signal(buschan) // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } @@ -155,7 +154,7 @@ func (obj *NspawnRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/password.go b/resources/password.go index 69ad1c45..a5a489bb 100644 --- a/resources/password.go +++ b/resources/password.go @@ -28,7 +28,6 @@ import ( "path" "strings" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/recwatch" errwrap "github.com/pkg/errors" @@ -165,7 +164,7 @@ Loop: } // Watch is the primary listener for this resource and it outputs events. 
-func (obj *PasswordRes) Watch(processChan chan *event.Event) error { +func (obj *PasswordRes) Watch() error { var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false) if err != nil { @@ -174,7 +173,7 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error { defer obj.recWatcher.Close() // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } @@ -203,7 +202,7 @@ func (obj *PasswordRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/pkg.go b/resources/pkg.go index daa04e99..e74817b0 100644 --- a/resources/pkg.go +++ b/resources/pkg.go @@ -24,7 +24,6 @@ import ( "path" "strings" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/resources/packagekit" "github.com/purpleidea/mgmt/util" @@ -104,7 +103,7 @@ func (obj *PkgRes) Init() error { // It uses the PackageKit UpdatesChanged signal to watch for changes. // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch(processChan chan *event.Event) error { +func (obj *PkgRes) Watch() error { bus := packagekit.NewBus() if bus == nil { return fmt.Errorf("Can't connect to PackageKit bus.") @@ -117,7 +116,7 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error { } // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... 
} @@ -156,7 +155,7 @@ func (obj *PkgRes) Watch(processChan chan *event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/resources.go b/resources/resources.go index ec1feca9..490d0032 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -141,12 +141,10 @@ type Base interface { AssociateData(*Data) IsWorking() bool Converger() converger.Converger - RegisterConverger() - UnregisterConverger() - ConvergerUID() converger.ConvergerUID + ConvergerUIDs() (converger.ConvergerUID, converger.ConvergerUID, converger.ConvergerUID) GetState() ResState SetState(ResState) - Event(chan *event.Event) error + Event() error SendEvent(event.EventName, error) error ReadEvent(*event.Event) (*error, bool) Refresh() bool // is there a pending refresh to run? @@ -161,10 +159,11 @@ type Base interface { GetGroup() []Res // return everyone grouped inside me SetGroup([]Res) VarDir(string) (string, error) - Running(chan *event.Event) error // notify the engine that Watch started - Started() <-chan struct{} // returns when the resource has started + Running() error // notify the engine that Watch started + Started() <-chan struct{} // returns when the resource has started Starter(bool) - Poll(chan *event.Event) error // poll alternative to watching :( + Poll() error // poll alternative to watching :( + ProcessChan() chan *event.Event Prometheus() *prometheus.Prometheus } @@ -175,8 +174,8 @@ type Res interface { Validate() error Init() error Close() error - UIDs() []ResUID // most resources only return one - Watch(chan *event.Event) error // send on channel to signal process() events + UIDs() []ResUID // most resources only return one + Watch() error // send on channel to signal process() events CheckApply(apply bool) (checkOK bool, err error) AutoEdges() AutoEdge Compare(Res) bool @@ -190,23 +189,28 @@ type BaseRes struct { MetaParams MetaParams `yaml:"meta"` 
// struct of all the metaparams Recv map[string]*Send // mapping of key to receive on from value - kind string - mutex *sync.Mutex // locks around sending and closing of events channel - events chan *event.Event - converger converger.Converger // converged tracking - cuid converger.ConvergerUID - prometheus *prometheus.Prometheus - prefix string // base prefix for this resource - debug bool - state ResState - working bool // is the Worker() loop running ? - started chan struct{} // closed when worker is started/running - isStarted bool // did the started chan already close? - starter bool // does this have indegree == 0 ? XXX: usually? - isStateOK bool // whether the state is okay based on events or not - isGrouped bool // am i contained within a group? - grouped []Res // list of any grouped resources - refresh bool // does this resource have a refresh to run? + kind string + mutex *sync.Mutex // locks around sending and closing of events channel + events chan *event.Event + converger converger.Converger // converged tracking + cuid converger.ConvergerUID + wcuid converger.ConvergerUID + pcuid converger.ConvergerUID + prometheus *prometheus.Prometheus + prefix string // base prefix for this resource + debug bool + state ResState + working bool // is the Worker() loop running ? + started chan struct{} // closed when worker is started/running + isStarted bool // did the started chan already close? + starter bool // does this have indegree == 0 ? XXX: usually? + isStateOK bool // whether the state is okay based on events or not + isGrouped bool // am i contained within a group? + grouped []Res // list of any grouped resources + processLock *sync.Mutex + processDone bool + processChan chan *event.Event + refresh bool // does this resource have a refresh to run? 
//refreshState StatefulBool // TODO: future stateful bool } @@ -291,7 +295,13 @@ func (obj *BaseRes) Init() error { if obj.kind == "" { return fmt.Errorf("Resource did not set kind!") } + + obj.cuid = obj.converger.Register() + obj.wcuid = obj.converger.Register() // get a cuid for the worker! + obj.pcuid = obj.converger.Register() // get a cuid for the process + obj.mutex = &sync.Mutex{} + obj.working = true // Worker method should now be running... obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events obj.started = make(chan struct{}) // closes when started @@ -303,6 +313,10 @@ func (obj *BaseRes) Init() error { obj.Meta().Limit = rate.Inf } + obj.processLock = &sync.Mutex{} // lock around processChan closing and sending + obj.processDone = false // did we close processChan ? + obj.processChan = make(chan *event.Event) + //dir, err := obj.VarDir("") //if err != nil { // return errwrap.Wrapf(err, "VarDir failed in Init()") @@ -310,7 +324,6 @@ func (obj *BaseRes) Init() error { // TODO: this StatefulBool implementation could be eventually swappable //obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)} - obj.working = true // Worker method should now be running... return nil } @@ -319,10 +332,21 @@ func (obj *BaseRes) Close() error { if obj.debug { log.Printf("%s[%s]: Close()", obj.Kind(), obj.GetName()) } + + obj.processLock.Lock() // lock to avoid a send when closed! + obj.processDone = true + close(obj.processChan) + obj.processLock.Unlock() + obj.mutex.Lock() obj.working = false // Worker method should now be closing... close(obj.events) // this is where we properly close this channel! obj.mutex.Unlock() + + obj.pcuid.Unregister() + obj.wcuid.Unregister() + obj.cuid.Unregister() + return nil } @@ -375,22 +399,11 @@ func (obj *BaseRes) Converger() converger.Converger { return obj.converger } -// RegisterConverger sets up the cuid for the resource. 
This is a helper -// function for the engine, and shouldn't be called by the resources directly. -func (obj *BaseRes) RegisterConverger() { - obj.cuid = obj.converger.Register() -} - -// UnregisterConverger tears down the cuid for the resource. This is a helper -// function for the engine, and shouldn't be called by the resources directly. -func (obj *BaseRes) UnregisterConverger() { - obj.cuid.Unregister() -} - -// ConvergerUID returns the ConvergerUID for the resource. This should be called -// by the Watch method of the resource to set the converged state. -func (obj *BaseRes) ConvergerUID() converger.ConvergerUID { - return obj.cuid +// ConvergerUIDs returns the ConvergerUIDs for the resource. This is called by +// the various methods that need one of these ConvergerUIDs. They are registered +// by the Init method and unregistered on the resource Close. +func (obj *BaseRes) ConvergerUIDs() (cuid converger.ConvergerUID, wcuid converger.ConvergerUID, pcuid converger.ConvergerUID) { + return obj.cuid, obj.wcuid, obj.pcuid } // GetState returns the state of the resource. @@ -416,6 +429,11 @@ func (obj *BaseRes) StateOK(b bool) { obj.isStateOK = b } +// ProcessChan returns the chan that resources send events to. Internal API! +func (obj *BaseRes) ProcessChan() chan *event.Event { + return obj.processChan +} + // GroupCmp compares two resources and decides if they're suitable for grouping // You'll probably want to override this method when implementing a resource... func (obj *BaseRes) GroupCmp(res Res) bool { @@ -528,15 +546,15 @@ func (obj *BaseRes) Started() <-chan struct{} { return obj.started } func (obj *BaseRes) Starter(b bool) { obj.starter = b } // Poll is the watch replacement for when we want to poll, which outputs events. 
-func (obj *BaseRes) Poll(processChan chan *event.Event) error { - cuid := obj.ConvergerUID() // get the converger uid used to report status +func (obj *BaseRes) Poll() error { + cuid, _, _ := obj.ConvergerUIDs() // get the converger uid used to report status // create a time.Ticker for the given interval ticker := time.NewTicker(time.Duration(obj.Meta().Poll) * time.Second) defer ticker.Stop() // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } cuid.SetConverged(false) // quickly stop any converge due to Running() @@ -559,7 +577,7 @@ func (obj *BaseRes) Poll(processChan chan *event.Event) error { if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/sendrecv.go b/resources/sendrecv.go index bde89c74..96f51639 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -29,9 +29,13 @@ import ( ) // Event sends off an event, but doesn't block the incoming event queue. -func (obj *BaseRes) Event(processChan chan *event.Event) error { +func (obj *BaseRes) Event() error { resp := event.NewResp() - processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process + obj.processLock.Lock() + if !obj.processDone { + obj.processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process + } + obj.processLock.Unlock() return resp.Wait() } @@ -97,14 +101,14 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { // Running is called by the Watch method of the resource once it has started up. // This signals to the engine to kick off the initial CheckApply resource check. -func (obj *BaseRes) Running(processChan chan *event.Event) error { +func (obj *BaseRes) Running() error { // TODO: If a non-polling resource wants to use the converger, then it // should probably tell Running (via an arg) to not do this. 
Currently // it is a very unlikely race that could cause an early converge if the // converge timeout is very short ( ~ 1s) and the Watch method doesn't // immediately SetConverged(false) to stop possible early termination. if obj.Meta().Poll == 0 { // if not polling, unblock this... - cuid := obj.ConvergerUID() + cuid, _, _ := obj.ConvergerUIDs() cuid.SetConverged(true) // a reasonable initial assumption } @@ -116,7 +120,7 @@ func (obj *BaseRes) Running(processChan chan *event.Event) error { var err error if obj.starter { // vertices of indegree == 0 should send initial pokes - err = obj.Event(processChan) // trigger a CheckApply + err = obj.Event() // trigger a CheckApply } return err // bubble up any possible error (or nil) } diff --git a/resources/svc.go b/resources/svc.go index 35fcdfcd..0dba98fc 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -24,7 +24,6 @@ import ( "fmt" "log" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/util" systemd "github.com/coreos/go-systemd/dbus" // change namespace @@ -71,7 +70,7 @@ func (obj *SvcRes) Init() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *SvcRes) Watch(processChan chan *event.Event) error { +func (obj *SvcRes) Watch() error { // obj.Name: svc name if !systemdUtil.IsRunningSystemd() { return fmt.Errorf("Systemd is not running.") @@ -96,7 +95,7 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error { bus.Signal(buschan) // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK...
} @@ -197,7 +196,7 @@ func (obj *SvcRes) Watch(processChan chan *event.Event) error { if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/timer.go b/resources/timer.go index 3f8c087a..dfe366e8 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -22,8 +22,6 @@ import ( "fmt" "log" "time" - - "github.com/purpleidea/mgmt/event" ) func init() { @@ -70,13 +68,13 @@ func (obj *TimerRes) newTicker() *time.Ticker { } // Watch is the primary listener for this resource and it outputs events. -func (obj *TimerRes) Watch(processChan chan *event.Event) error { +func (obj *TimerRes) Watch() error { // create a time.Ticker for the given interval obj.ticker = obj.newTicker() defer obj.ticker.Stop() // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } @@ -96,7 +94,7 @@ func (obj *TimerRes) Watch(processChan chan *event.Event) error { if send { send = false - obj.Event(processChan) + obj.Event() } } } diff --git a/resources/virt.go b/resources/virt.go index cd79c35e..88fb8b6c 100644 --- a/resources/virt.go +++ b/resources/virt.go @@ -29,8 +29,6 @@ import ( "strings" "time" - "github.com/purpleidea/mgmt/event" - multierr "github.com/hashicorp/go-multierror" "github.com/libvirt/libvirt-go" libvirtxml "github.com/libvirt/libvirt-go-xml" @@ -250,7 +248,7 @@ func (obj *VirtRes) connect() (conn *libvirt.Connect, err error) { } // Watch is the primary listener for this resource and it outputs events. -func (obj *VirtRes) Watch(processChan chan *event.Event) error { +func (obj *VirtRes) Watch() error { domChan := make(chan libvirt.DomainEventType) // TODO: do we need to buffer this? 
gaChan := make(chan *libvirt.DomainEventAgentLifecycle) errorChan := make(chan error) @@ -308,7 +306,7 @@ func (obj *VirtRes) Watch(processChan chan *event.Event) error { defer obj.conn.DomainEventDeregister(gaCallbackID) // notify engine that we're running - if err := obj.Running(processChan); err != nil { + if err := obj.Running(); err != nil { return err // bubble up a NACK... } @@ -400,7 +398,7 @@ func (obj *VirtRes) Watch(processChan chan *event.Event) error { if send { send = false - obj.Event(processChan) + obj.Event() } } }