diff --git a/docs/resource-guide.md b/docs/resource-guide.md index 4c321bc8..7840d02e 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -33,6 +33,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. * [Default - Get an empty resource with defaults](#default) * [Validate - Validate the values of a resource struct](#validate) * [Init - Initialize the resource](#init) + * [Close - Clean up the resource](#close) * [CheckApply - Check and apply resource state](#checkapply) * [Watch - Detect resource changes](#watch) * [Compare - Compare resource with another](#compare) @@ -133,6 +134,31 @@ this. In other words, you should expect `Validate` to have run first, but you shouldn't allow `Init` to dangerously `rm -rf /$the_world` if your code only checks `$the_world` in `Validate`. Remember to always program safely! +### Close +```golang +Close() error +``` + +This is called to clean up after the resource. It is usually not necessary, but +can be useful if you'd like to properly close a persistent connection that you +opened in the `Init` method and were using throughout the resource. + +#### Example +```golang +// Close runs some cleanup code for this resource. +func (obj *FooRes) Close() error { + + obj.Conn.Close() // ignore error in this case + + return obj.BaseRes.Close() // call base close, b/c we're overriding +} +``` + +You should probably check the return errors of your internal methods, and pass +on an error if something went wrong. Remember to always call the base `Close` +method! If you plan to return early if you hit an internal error, then at least +call it with a defer! + ### CheckApply ```golang CheckApply(apply bool) (checkOK bool, err error) @@ -210,12 +236,12 @@ will likely find the state to now be correct. ### Watch ```golang -Watch(chan Event) error +Watch(chan *Event) error ``` `Watch` is a main loop that runs and sends messages when it detects that the state of the resource might have changed. 
To send a message you should write to -the input `Event` channel using the `DoSend` helper method. The Watch function +the input event channel using the `Event` helper method. The Watch function should run continuously until a shutdown message is received. If at any time something goes wrong, you should return an error, and the `mgmt` engine will handle possibly restarting the main loop based on the `retry` meta parameters. @@ -250,7 +276,7 @@ itself! If we receive an internal event from the `<-obj.Events()` method, we can read it with the ReadEvent helper function. This function tells us if we should shutdown our resource, and if we should generate an event. When we want to send an event, -we use the `DoSend` helper function. It is also important to mark the resource +we use the `Event` helper function. It is also important to mark the resource state as `dirty` if we believe it might have changed. We do this with the `StateOK(false)` function. @@ -278,7 +304,7 @@ thing, but provide a `select`-free interface for different coding situations. #### Example ```golang // Watch is the listener and main loop for this resource. -func (obj *FooRes) Watch(processChan chan event.Event) error { +func (obj *FooRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status // setup the Foo resource @@ -294,15 +320,15 @@ func (obj *FooRes) Watch(processChan chan event.Event) error { } var send = false // send event? - var exit = false + var exit *error for { obj.SetState(ResStateWatching) // reset select { case event := <-obj.Events(): cuid.SetConverged(false) // we avoid sending events on unpause - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } // the actual events! 
@@ -326,9 +352,7 @@ func (obj *FooRes) Watch(processChan chan event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/event/event.go b/event/event.go index d13051eb..4d619be5 100644 --- a/event/event.go +++ b/event/event.go @@ -46,8 +46,7 @@ type Event struct { Name EventName Resp Resp // channel to send an ack response on, nil to skip //Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on - Msg string // some words for fun - Activity bool // did something interesting happen? + Err error // store an error in our event } // ACK sends a single acknowledgement on the channel if one was requested. @@ -80,7 +79,7 @@ func NewResp() Resp { // ACK sends a true value to resp. func (resp Resp) ACK() { if resp != nil { - resp <- nil + resp <- nil // TODO: close instead? } } @@ -114,7 +113,7 @@ func (resp Resp) ACKWait() { } } -// GetActivity returns the activity value. -func (event *Event) GetActivity() bool { - return event.Activity +// Error returns the stored error value. +func (event *Event) Error() error { + return event.Err } diff --git a/pgraph/actions.go b/pgraph/actions.go index cb1f2971..c2d7cec8 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -27,6 +27,7 @@ import ( "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/resources" + multierr "github.com/hashicorp/go-multierror" errwrap "github.com/pkg/errors" ) @@ -76,11 +77,9 @@ func (g *Graph) Poke(v *Vertex, activity bool) error { wg.Add(1) go func(nn *Vertex) error { defer wg.Done() - edge := g.Adjacency[v][nn] // lookup - notify := edge.Notify && edge.Refresh() - - // FIXME: is it okay that this is sync? 
- nn.SendEvent(event.EventPoke, true, notify) + //edge := g.Adjacency[v][nn] // lookup + //notify := edge.Notify && edge.Refresh() + nn.SendEvent(event.EventPoke, nil) // TODO: check return value? return nil // never error for now... }(n) @@ -110,8 +109,7 @@ func (g *Graph) BackPoke(v *Vertex) { if g.Flags.Debug { log.Printf("%s[%s]: BackPoke: %s[%s]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) } - // FIXME: is it okay that this is sync? - n.SendEvent(event.EventBackPoke, true, false) + n.SendEvent(event.EventBackPoke, nil) } else { if g.Flags.Debug { log.Printf("%s[%s]: BackPoke: %s[%s]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) @@ -311,68 +309,98 @@ func (g *Graph) Worker(v *Vertex) error { // the Watch() function about which graph it is // running on, which isolates things nicely... obj := v.Res - // TODO: is there a better system for the `Watching` flag? - obj.SetWatching(true) - defer obj.SetWatching(false) - processChan := make(chan event.Event) + obj.SetWorking(true) // gets set to false in Res.Close() method at end... + + lock := &sync.Mutex{} // lock around processChan closing and sending + finished := false // did we close processChan ? + processChan := make(chan *event.Event) go func() { running := false + done := make(chan struct{}) + playback := false // do we need to run another one? + + waiting := false var timer = time.NewTimer(time.Duration(math.MaxInt64)) // longest duration if !timer.Stop() { <-timer.C // unnecessary, shouldn't happen } + var delay = time.Duration(v.Meta().Delay) * time.Millisecond var retry = v.Meta().Retry // number of tries left, -1 for infinite - var saved event.Event + Loop: for { // this has to be synchronous, because otherwise the Res // event loop will keep running and change state, // causing the converged timeout to fire! select { - case event, ok := <-processChan: // must use like this - if running && ok { - // we got an event that wasn't a close, - // while we were waiting for the timer! 
- // if this happens, it might be a bug:( - log.Fatalf("%s[%s]: Worker: Unexpected event: %+v", v.Kind(), v.GetName(), event) - } + case ev, ok := <-processChan: // must use like this if !ok { // processChan closed, let's exit break Loop // no event, so no ack! } - // the above mentioned synchronous part, is the - // running of this function, paired with an ack. - if e := g.Process(v); e != nil { - saved = event - log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e) - if retry == 0 { - // wrap the error in the sentinel - event.ACKNACK(&SentinelErr{e}) // fail the Watch() - break Loop - } - if retry > 0 { // don't decrement the -1 - retry-- - } - log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry) - // start the timer... - timer.Reset(delay) - running = true + // if running, we skip running a new execution! + // if waiting, we skip running a new execution! + if running || waiting { + playback = true + ev.ACK() // ready for next message continue } - retry = v.Meta().Retry // reset on success - event.ACK() // sync + + running = true + go func(ev *event.Event) { + if e := g.Process(v); e != nil { + playback = true + log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e) + if retry == 0 { + // wrap the error in the sentinel + v.SendEvent(event.EventExit, &SentinelErr{e}) + return + } + if retry > 0 { // don't decrement the -1 + retry-- + } + log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry) + // start the timer... + timer.Reset(delay) + waiting = true // waiting for retry timer + return + } + retry = v.Meta().Retry // reset on success + close(done) // trigger + }(ev) + ev.ACK() // sync (now mostly useless) case <-timer.C: + waiting = false if !timer.Stop() { //<-timer.C // blocks, docs are wrong! 
} - running = false log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName()) - // re-send this failed event, to trigger a CheckApply() - go func() { processChan <- saved }() - // TODO: should we send a fake event instead? - //saved = nil + close(done) + + // a CheckApply run (with possibly retry pause) finished + case <-done: + if g.Flags.Debug { + log.Printf("%s[%s]: CheckApply finished!", v.Kind(), v.GetName()) + } + done = make(chan struct{}) // reset + // re-send this event, to trigger a CheckApply() + if playback { + playback = false + // this lock avoids us sending to + // channel after we've closed it! + lock.Lock() + go func() { + if !finished { + // TODO: can this experience indefinite postponement ? + // see: https://github.com/golang/go/issues/11506 + obj.Event(processChan) // replay a new event + } + lock.Unlock() + }() + } + running = false } } }() @@ -403,8 +431,14 @@ func (g *Graph) Worker(v *Vertex) error { case event := <-obj.Events(): // NOTE: this code should match the similar Res code! //cuid.SetConverged(false) // TODO: ? - if exit, send := obj.ReadEvent(&event); exit { - return nil // exit + if exit, send := obj.ReadEvent(event); exit != nil { + err := *exit // exit err + if e := obj.Close(); err == nil { + err = e + } else if e != nil { + err = multierr.Append(err, e) // list of errors + } + return err // exit } else if send { // if we dive down this rabbit hole, our // timer.C won't get seen until we get out! 
@@ -442,7 +476,7 @@ func (g *Graph) Worker(v *Vertex) error { // TODO: reset the watch retry count after some amount of success v.Res.RegisterConverger() var e error - if v.Meta().Poll > 0 { // poll instead of watching :( + if v.Res.Meta().Poll > 0 { // poll instead of watching :( cuid := v.Res.ConvergerUID() // get the converger uid used to report status cuid.StartTimer() e = v.Res.Poll(processChan) @@ -474,7 +508,17 @@ func (g *Graph) Worker(v *Vertex) error { // by getting the Watch resource to send one event once it's up! //v.SendEvent(eventPoke, false, false) } + lock.Lock() // lock to avoid a send when closed! + finished = true close(processChan) + lock.Unlock() + + // close resource and return possible errors if any + if e := obj.Close(); err == nil { + err = e + } else if e != nil { + err = multierr.Append(err, e) // list of errors + } return err } @@ -504,7 +548,7 @@ func (g *Graph) Start(first bool) { // start or continue v.Res.Starter(true) // let the startup code know to poke } - if !v.Res.IsWatching() { // if Watch() is not running... + if !v.Res.IsWorking() { // if Worker() is not running... g.wg.Add(1) // must pass in value to avoid races... // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ @@ -529,7 +573,7 @@ func (g *Graph) Start(first bool) { // start or continue }(v) if !first { // unpause! - v.Res.SendEvent(event.EventStart, true, false) // sync! + v.Res.SendEvent(event.EventStart, nil) // sync! } } @@ -542,7 +586,7 @@ func (g *Graph) Pause() { defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState()) t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... - v.SendEvent(event.EventPause, true, false) + v.SendEvent(event.EventPause, nil) } } @@ -559,7 +603,7 @@ func (g *Graph) Exit() { // when we hit the 'default' in the select statement! 
// XXX: we can do this to quiesce, but it's not necessary now - v.SendEvent(event.EventExit, true, false) + v.SendEvent(event.EventExit, nil) } g.wg.Wait() // for now, this doesn't need to be a separate Wait() method } diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 1d285e88..ba0053ab 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -567,7 +567,7 @@ func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) { for v := range oldGraph.Adjacency { if !VertexContains(v, vertexKeep) { // wait for exit before starting new graph! - v.SendEvent(event.EventExit, true, false) + v.SendEvent(event.EventExit, nil) // sync oldGraph.DeleteVertex(v) } } diff --git a/resources/exec.go b/resources/exec.go index a9175634..32771e78 100644 --- a/resources/exec.go +++ b/resources/exec.go @@ -113,11 +113,11 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan } // Watch is the primary listener for this resource and it outputs events. -func (obj *ExecRes) Watch(processChan chan event.Event) error { +func (obj *ExecRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status var send = false // send event? - var exit = false + var exit *error bufioch, errch := make(chan string), make(chan error) if obj.WatchCmd != "" { @@ -185,8 +185,8 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -199,9 +199,7 @@ func (obj *ExecRes) Watch(processChan chan event.Event) error { send = false // it is okay to invalidate the clean state on poke too obj.StateOK(false) // something made state dirty - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... 
- } + obj.Event(processChan) } } } diff --git a/resources/file.go b/resources/file.go index a1b1691f..5ee49efd 100644 --- a/resources/file.go +++ b/resources/file.go @@ -147,7 +147,7 @@ func (obj *FileRes) GetPath() string { // If the Watch returns an error, it means that something has gone wrong, and it // must be restarted. On a clean exit it returns nil. // FIXME: Also watch the source directory when using obj.Source !!! -func (obj *FileRes) Watch(processChan chan event.Event) error { +func (obj *FileRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status var err error @@ -163,7 +163,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { } var send = false // send event? - var exit = false + var exit *error for { if obj.debug { @@ -188,8 +188,8 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } //obj.StateOK(false) // dirty // these events don't invalidate state @@ -201,9 +201,7 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/hostname.go b/resources/hostname.go index 95f31337..2078e357 100644 --- a/resources/hostname.go +++ b/resources/hostname.go @@ -111,7 +111,7 @@ func (obj *HostnameRes) Init() error { } // Watch is the primary listener for this resource and it outputs events. 
-func (obj *HostnameRes) Watch(processChan chan event.Event) error { +func (obj *HostnameRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status // if we share the bus with others, we will get each others messages!! @@ -148,8 +148,8 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) // we avoid sending events on unpause - if exit, _ := obj.ReadEvent(&event); exit { - return nil // exit + if exit, _ := obj.ReadEvent(event); exit != nil { + return *exit // exit } send = true obj.StateOK(false) // dirty @@ -162,10 +162,7 @@ func (obj *HostnameRes) Watch(processChan chan event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/msg.go b/resources/msg.go index 49a7eb06..78174e05 100644 --- a/resources/msg.go +++ b/resources/msg.go @@ -139,7 +139,7 @@ func (obj *MsgRes) journalPriority() journal.Priority { } // Watch is the primary listener for this resource and it outputs events. -func (obj *MsgRes) Watch(processChan chan event.Event) error { +func (obj *MsgRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status // notify engine that we're running @@ -148,15 +148,15 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { } var send = false // send event? 
- var exit = false + var exit *error for { obj.SetState(ResStateWatching) // reset select { case event := <-obj.Events(): cuid.SetConverged(false) // we avoid sending events on unpause - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -167,9 +167,7 @@ func (obj *MsgRes) Watch(processChan chan event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/noop.go b/resources/noop.go index f2f8e915..7b4b149d 100644 --- a/resources/noop.go +++ b/resources/noop.go @@ -63,7 +63,7 @@ func (obj *NoopRes) Init() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *NoopRes) Watch(processChan chan event.Event) error { +func (obj *NoopRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status // notify engine that we're running @@ -72,15 +72,15 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error { } var send = false // send event? - var exit = false + var exit *error for { obj.SetState(ResStateWatching) // reset select { case event := <-obj.Events(): cuid.SetConverged(false) // we avoid sending events on unpause - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -91,9 +91,7 @@ func (obj *NoopRes) Watch(processChan chan event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... 
- } + obj.Event(processChan) } } } diff --git a/resources/nspawn.go b/resources/nspawn.go index 02f8fec3..04f7a59c 100644 --- a/resources/nspawn.go +++ b/resources/nspawn.go @@ -102,7 +102,7 @@ func (obj *NspawnRes) Init() error { } // Watch for state changes and sends a message to the bus if there is a change -func (obj *NspawnRes) Watch(processChan chan event.Event) error { +func (obj *NspawnRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status // this resource depends on systemd ensure that it's running @@ -133,7 +133,7 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { } var send = false - var exit = false + var exit *error for { obj.SetState(ResStateWatching) @@ -155,8 +155,8 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -167,9 +167,7 @@ func (obj *NspawnRes) Watch(processChan chan event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/password.go b/resources/password.go index 20eb3e6b..2e6281dc 100644 --- a/resources/password.go +++ b/resources/password.go @@ -173,7 +173,7 @@ Loop: } // Watch is the primary listener for this resource and it outputs events. 
-func (obj *PasswordRes) Watch(processChan chan event.Event) error { +func (obj *PasswordRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status var err error @@ -189,7 +189,7 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { } var send = false // send event? - var exit = false + var exit *error for { obj.SetState(ResStateWatching) // reset select { @@ -208,8 +208,8 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) // we avoid sending events on unpause - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -220,9 +220,7 @@ func (obj *PasswordRes) Watch(processChan chan event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/pkg.go b/resources/pkg.go index 065fa993..06a8db37 100644 --- a/resources/pkg.go +++ b/resources/pkg.go @@ -115,7 +115,7 @@ func (obj *PkgRes) Init() error { // It uses the PackageKit UpdatesChanged signal to watch for changes. // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch(processChan chan event.Event) error { +func (obj *PkgRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status bus := packagekit.NewBus() @@ -135,7 +135,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { } var send = false // send event? 
- var exit = false + var exit *error for { if obj.debug { @@ -163,8 +163,8 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } //obj.StateOK(false) // these events don't invalidate state @@ -176,9 +176,7 @@ func (obj *PkgRes) Watch(processChan chan event.Event) error { // do all our event sending all together to avoid duplicate msgs if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/resources.go b/resources/resources.go index ff01c146..f17ba0e9 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -26,6 +26,7 @@ import ( "log" "os" "path" + "sync" "time" // TODO: should each resource be a sub-package? @@ -129,19 +130,19 @@ type Base interface { SetKind(string) Kind() string Meta() *MetaParams - Events() chan event.Event + Events() chan *event.Event AssociateData(*Data) - IsWatching() bool - SetWatching(bool) + IsWorking() bool + SetWorking(bool) Converger() converger.Converger RegisterConverger() UnregisterConverger() ConvergerUID() converger.ConvergerUID GetState() ResState SetState(ResState) - DoSend(chan event.Event, string) (bool, error) - SendEvent(event.EventName, bool, bool) bool - ReadEvent(*event.Event) (bool, bool) // TODO: optional here? + Event(chan *event.Event) error + SendEvent(event.EventName, error) error + ReadEvent(*event.Event) (*error, bool) Refresh() bool // is there a pending refresh to run? 
SetRefresh(bool) // set the refresh state of this resource SendRecv(Res) (map[string]bool, error) // send->recv data passing function @@ -154,10 +155,10 @@ type Base interface { GetGroup() []Res // return everyone grouped inside me SetGroup([]Res) VarDir(string) (string, error) - Running(chan event.Event) error // notify the engine that Watch started - Started() <-chan struct{} // returns when the resource has started + Running(chan *event.Event) error // notify the engine that Watch started + Started() <-chan struct{} // returns when the resource has started Starter(bool) - Poll(chan event.Event) error // poll alternative to watching :( + Poll(chan *event.Event) error // poll alternative to watching :( } // Res is the minimum interface you need to implement to define a new resource. @@ -166,8 +167,9 @@ type Res interface { Default() Res // return a struct with sane defaults as a Res Validate() error Init() error - GetUIDs() []ResUID // most resources only return one - Watch(chan event.Event) error // send on channel to signal process() events + Close() error + GetUIDs() []ResUID // most resources only return one + Watch(chan *event.Event) error // send on channel to signal process() events CheckApply(apply bool) (checkOK bool, err error) AutoEdges() AutoEdge Compare(Res) bool @@ -182,13 +184,14 @@ type BaseRes struct { Recv map[string]*Send // mapping of key to receive on from value kind string - events chan event.Event + mutex *sync.Mutex // locks around sending and closing of events channel + events chan *event.Event converger converger.Converger // converged tracking cuid converger.ConvergerUID prefix string // base prefix for this resource debug bool state ResState - watching bool // is Watch() loop running ? + working bool // is the Worker() loop running ? started chan struct{} // closed when worker is started/running starter bool // does this have indegree == 0 ? XXX: usually? 
isStateOK bool // whether the state is okay based on events or not @@ -244,8 +247,9 @@ func (obj *BaseRes) Init() error { if obj.kind == "" { return fmt.Errorf("Resource did not set kind!") } - obj.events = make(chan event.Event) // unbuffered chan to avoid stale events - obj.started = make(chan struct{}) // closes when started + obj.mutex = &sync.Mutex{} + obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events + obj.started = make(chan struct{}) // closes when started //dir, err := obj.VarDir("") //if err != nil { // return errwrap.Wrapf(err, "VarDir failed in Init()") @@ -255,6 +259,15 @@ func (obj *BaseRes) Init() error { return nil } +// Close shuts down and performs any cleanup. +func (obj *BaseRes) Close() error { + obj.mutex.Lock() + obj.working = false // obj.SetWorking(false) + close(obj.events) // this is where we properly close this channel! + obj.mutex.Unlock() + return nil +} + // GetName is used by all the resources to Get the name. func (obj *BaseRes) GetName() string { return obj.Name @@ -281,7 +294,7 @@ func (obj *BaseRes) Meta() *MetaParams { } // Events returns the channel of events to listen on. -func (obj *BaseRes) Events() chan event.Event { +func (obj *BaseRes) Events() chan *event.Event { return obj.events } @@ -292,14 +305,18 @@ func (obj *BaseRes) AssociateData(data *Data) { obj.debug = data.Debug } -// IsWatching tells us if the Worker() function is running. -func (obj *BaseRes) IsWatching() bool { - return obj.watching +// IsWorking tells us if the Worker() function is running. +func (obj *BaseRes) IsWorking() bool { + obj.mutex.Lock() + defer obj.mutex.Unlock() + return obj.working } -// SetWatching stores the status of if the Worker() function is running. -func (obj *BaseRes) SetWatching(b bool) { - obj.watching = b +// SetWorking tracks the state of if Worker() function is running. 
+func (obj *BaseRes) SetWorking(b bool) { + obj.mutex.Lock() + defer obj.mutex.Unlock() + obj.working = b } // Converger returns the converger object used by the system. It can be used to @@ -455,7 +472,7 @@ func (obj *BaseRes) Started() <-chan struct{} { return obj.started } func (obj *BaseRes) Starter(b bool) { obj.starter = b } // Poll is the watch replacement for when we want to poll, which outputs events. -func (obj *BaseRes) Poll(processChan chan event.Event) error { +func (obj *BaseRes) Poll(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status // create a time.Ticker for the given interval @@ -468,7 +485,7 @@ func (obj *BaseRes) Poll(processChan chan event.Event) error { } var send = false - var exit = false + var exit *error for { obj.SetState(ResStateWatching) select { @@ -479,16 +496,14 @@ func (obj *BaseRes) Poll(processChan chan event.Event) error { case event := <-obj.Events(): cuid.ResetTimer() // important - if exit, send = obj.ReadEvent(&event); exit { - return nil + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } } if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/sendrecv.go b/resources/sendrecv.go index f629471e..76b4fd68 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -28,97 +28,84 @@ import ( errwrap "github.com/pkg/errors" ) -// SendEvent pushes an event into the message queue for a particular vertex -func (obj *BaseRes) SendEvent(ev event.EventName, sync bool, activity bool) bool { - // TODO: isn't this race-y ? 
- if !obj.IsWatching() { // element has already exited - return false // if we don't return, we'll block on the send - } - if !sync { - obj.events <- event.Event{Name: ev, Resp: nil, Msg: "", Activity: activity} - return true - } - +// Event sends off an event, but doesn't block the incoming event queue. +func (obj *BaseRes) Event(processChan chan *event.Event) error { resp := event.NewResp() - obj.events <- event.Event{Name: ev, Resp: resp, Msg: "", Activity: activity} - resp.ACKWait() // waits until true (nil) value - return true + processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process + return resp.Wait() } -// DoSend sends off an event, but doesn't block the incoming event queue. -func (obj *BaseRes) DoSend(processChan chan event.Event, comment string) (exit bool, err error) { +// SendEvent pushes an event into the message queue for a particular vertex. +func (obj *BaseRes) SendEvent(ev event.EventName, err error) error { resp := event.NewResp() - processChan <- event.Event{Name: event.EventNil, Resp: resp, Activity: false, Msg: comment} // trigger process - e := resp.Wait() - return false, e // XXX: at the moment, we don't use the exit bool. + obj.mutex.Lock() + if !obj.working { + obj.mutex.Unlock() + return fmt.Errorf("resource worker is not running") + } + obj.events <- &event.Event{Name: ev, Resp: resp, Err: err} + obj.mutex.Unlock() + resp.ACKWait() // waits until true (nil) value + return nil } // ReadEvent processes events when a select gets one, and handles the pause // code too! The return values specify if we should exit and poke respectively. -func (obj *BaseRes) ReadEvent(ev *event.Event) (exit, send bool) { +func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { ev.ACK() - var poke bool - // ensure that a CheckApply runs by sending with a dirty state... - if ev.GetActivity() { // if previous node did work, and we were notified... - //obj.StateOK(false) // not necessarily - poke = true // poke! 
- //obj.SetRefresh(true) // TODO: is this redundant? - } + err := ev.Error() switch ev.Name { case event.EventStart: - send = true || poke - return + return nil, true case event.EventPoke: - send = true || poke - return + return nil, true case event.EventBackPoke: - send = true || poke - return // forward poking in response to a back poke! + return nil, true // forward poking in response to a back poke! case event.EventExit: // FIXME: what do we do if we have a pending refresh (poke) and an exit? - return true, false + return &err, false case event.EventPause: // wait for next event to continue select { case e, ok := <-obj.Events(): if !ok { // shutdown - return true, false + err := error(nil) + return &err, false } e.ACK() + err := e.Error() if e.Name == event.EventExit { - return true, false + return &err, false } else if e.Name == event.EventStart { // eventContinue - return false, false // don't poke on unpause! - } else { - // if we get a poke event here, it's a bug! - log.Fatalf("%s[%s]: Unknown event: %v, while paused!", obj.Kind(), obj.GetName(), e) + return nil, false // don't poke on unpause! } + // if we get a poke event here, it's a bug! + err = fmt.Errorf("%s[%s]: Unknown event: %v, while paused!", obj.Kind(), obj.GetName(), e) + panic(err) // TODO: return a special sentinel instead? + //return &err, false } - - default: - log.Fatal("Unknown event: ", ev) } - return true, false // required to keep the stupid go compiler happy + err = fmt.Errorf("Unknown event: %v", ev) + panic(err) // TODO: return a special sentinel instead? + //return &err, false } // Running is called by the Watch method of the resource once it has started up. // This signals to the engine to kick off the initial CheckApply resource check. 
-func (obj *BaseRes) Running(processChan chan event.Event) error { +func (obj *BaseRes) Running(processChan chan *event.Event) error { obj.StateOK(false) // assume we're initially dirty cuid := obj.ConvergerUID() // get the converger uid used to report status cuid.SetConverged(false) // a reasonable initial assumption close(obj.started) // send started signal - // FIXME: exit return value is unused atm, so ignore it for now... - //if exit, err := obj.DoSend(processChan, ""); exit || err != nil { var err error if obj.starter { // vertices of indegree == 0 should send initial pokes - _, err = obj.DoSend(processChan, "") // trigger a CheckApply + err = obj.Event(processChan) // trigger a CheckApply } return err // bubble up any possible error (or nil) } diff --git a/resources/svc.go b/resources/svc.go index 7ad734ee..fc096ff7 100644 --- a/resources/svc.go +++ b/resources/svc.go @@ -79,7 +79,7 @@ func (obj *SvcRes) Init() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *SvcRes) Watch(processChan chan event.Event) error { +func (obj *SvcRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status // obj.Name: svc name @@ -112,7 +112,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { var svc = fmt.Sprintf("%s.service", obj.Name) // systemd name var send = false // send event? - var exit = false + var exit *error var invalid = false // does the svc exist or not? 
var previous bool // previous invalid value set := conn.NewSubscriptionSet() // no error should be returned @@ -162,8 +162,8 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -209,8 +209,8 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -221,9 +221,7 @@ func (obj *SvcRes) Watch(processChan chan event.Event) error { if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/timer.go b/resources/timer.go index e073ae51..6e5647a2 100644 --- a/resources/timer.go +++ b/resources/timer.go @@ -77,7 +77,7 @@ func (obj *TimerRes) newTicker() *time.Ticker { } // Watch is the primary listener for this resource and it outputs events. 
-func (obj *TimerRes) Watch(processChan chan event.Event) error { +func (obj *TimerRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status // create a time.Ticker for the given interval @@ -100,8 +100,8 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) - if exit, _ := obj.ReadEvent(&event); exit { - return nil + if exit, _ := obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -111,9 +111,7 @@ func (obj *TimerRes) Watch(processChan chan event.Event) error { if send { send = false - if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } } diff --git a/resources/virt.go b/resources/virt.go index 10eaef28..7fc47e0f 100644 --- a/resources/virt.go +++ b/resources/virt.go @@ -155,7 +155,7 @@ func (obj *VirtRes) connect() (conn *libvirt.Connect, err error) { } // Watch is the primary listener for this resource and it outputs events. 
-func (obj *VirtRes) Watch(processChan chan event.Event) error { +func (obj *VirtRes) Watch(processChan chan *event.Event) error { cuid := obj.ConvergerUID() // get the converger uid used to report status conn, err := obj.connect() @@ -209,7 +209,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { } var send = false - var exit = false + var exit *error // if ptr exists, that is the exit error to return for { select { @@ -261,8 +261,8 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { case event := <-obj.Events(): cuid.SetConverged(false) - if exit, send = obj.ReadEvent(&event); exit { - return nil // exit + if exit, send = obj.ReadEvent(event); exit != nil { + return *exit // exit } case <-cuid.ConvergedTimer(): @@ -272,9 +272,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { if send { send = false - if exit, err := obj.DoSend(processChan, ""); exit || err != nil { - return err // we exit or bubble up a NACK... - } + obj.Event(processChan) } } }