diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 5c9f91af..a05c1ed0 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -39,6 +39,7 @@ along with this program. If not, see . 5. [Resources - All built-in primitives](#resources) 6. [Usage/FAQ - Notes on usage and frequently asked questions](#usage-and-frequently-asked-questions) 7. [Reference - Detailed reference](#reference) + * [Meta parameters](#meta-parameters) * [Graph definition file](#graph-definition-file) * [Command line](#command-line) 8. [Examples - Example configurations](#examples) @@ -207,6 +208,13 @@ lets them respond in real-time to converge to the desired state. This property allows you to build more complex resources that you probably hadn't considered in the past. +In addition to the resource specific properties, there are resource properties +(otherwise known as parameters) which can apply to every resource. These are +called [meta parameters](#meta-parameters) and are listed separately. Certain +meta parameters aren't very useful when combined with certain resources, but +in general, it should be fairly obvious, such as when combining the `noop` meta +parameter with the [Noop](#Noop) resource. + * [Exec](#Exec): Execute shell commands on the system. * [File](#File): Manage files and directories. * [Noop](#Noop): A simple resource that does nothing. @@ -359,9 +367,40 @@ information on these options, please view the source at: If you feel that a well used option needs documenting here, please patch it! ###Overview of reference +* [Meta parameters](#meta-parameters): List of available resource meta parameters. * [Graph definition file](#graph-definition-file): Main graph definition file. * [Command line](#command-line): Command line parameters. +###Meta parameters +These meta parameters are special parameters (or properties) which can apply to +any resource. The usefulness of doing so will depend on the particular meta +parameter and resource combination. + +####AutoEdge +Boolean. Should we generate auto edges for this resource? + +####AutoGroup +Boolean. Should we attempt to automatically group this resource with others? + +####Noop +Boolean. Should the Apply portion of the CheckApply method of the resource +make any changes? Noop is a concatenation of no-operation. + +####Retry +Integer. The number of times to retry running the resource on error. Use -1 for +infinite. This currently applies for both the Watch operation (which can fail) +and for the CheckApply operation. While they could have separate values, I've +decided to use the same ones for both until there's a proper reason to want to +do something differently for the Watch errors. + +####Delay +Integer. Number of milliseconds to wait between retries. The same value is +shared between the Watch and CheckApply retries. This currently applies for both +the Watch operation (which can fail) and for the CheckApply operation. While +they could have separate values, I've decided to use the same ones for both +until there's a proper reason to want to do something differently for the Watch +errors. + ###Graph definition file graph.yaml is the compiled graph definition file. The format is currently undocumented, but by looking through the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples) diff --git a/examples/file3.yaml b/examples/file3.yaml new file mode 100644 index 00000000..68c40d04 --- /dev/null +++ b/examples/file3.yaml @@ -0,0 +1,14 @@ +--- +graph: mygraph +comment: You can test Watch and CheckApply failures with chmod ugo-r and chmod ugo-w. +resources: + file: + - name: file1 + path: "/tmp/mgmt/f1" + meta: + retry: 3 + delay: 5000 + content: | + i am f1 + state: exists +edges: [] diff --git a/exec.go b/exec.go index b63c7a42..0dc8ae09 100644 --- a/exec.go +++ b/exec.go @@ -107,7 +107,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan } // Watch is the primary listener for this resource and it outputs events. -func (obj *ExecRes) Watch(processChan chan Event, delay time.Duration) error { +func (obj *ExecRes) Watch(processChan chan Event) error { if obj.IsWatching() { return nil } @@ -116,59 +116,13 @@ func (obj *ExecRes) Watch(processChan chan Event, delay time.Duration) error { cuuid := obj.converger.Register() defer cuuid.Unregister() - var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas - doSend = func() (bool, error) { - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - select { - case e := <-resp: // wait for the ACK() - if e != nil { // we got a NACK - return true, e // exit with error - } - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return true, nil // exit, without error - } else if send { - return doSend() // recurse - } - } - return false, nil // return, no error or exit signal - } - - // if a retry-delay was requested, wait, but don't block our events! - if delay > 0 { - var pendingSendEvent bool - timer := time.NewTimer(delay) - Loop: - for { - select { - case <-timer.C: // the wait is over - break Loop // critical - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return nil // exit - } else if send { - // NOTE: see long comment in the file resource - //if exit, err := doSend(); exit || err != nil { - // return err // we exit or bubble up a NACK... - //} - pendingSendEvent = true // all events are identical for now... - } - } - } - timer.Stop() // it's nice to cleanup - log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) - if pendingSendEvent { // TODO: should this become a list in the future? - if exit, err := doSend(); exit || err != nil { - return err // we exit or bubble up a NACK... - } + var startup bool + Startup := func(block bool) <-chan time.Time { + if block { + return nil // blocks forever + //return make(chan time.Time) // blocks forever } + return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } var send = false // send event? @@ -242,14 +196,19 @@ func (obj *ExecRes) Watch(processChan chan Event, delay time.Duration) error { case <-cuuid.ConvergedTimer(): cuuid.SetConverged(true) // converged! continue + + case <-Startup(startup): + cuuid.SetConverged(false) + send = true } // do all our event sending all together to avoid duplicate msgs if send { + startup = true // startup finished send = false // it is okay to invalidate the clean state on poke too obj.isStateOK = false // something made state dirty - if exit, err := doSend(); exit || err != nil { + if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... } } diff --git a/file.go b/file.go index 999cbccd..8c0bd9ab 100644 --- a/file.go +++ b/file.go @@ -166,10 +166,9 @@ func (obj *FileRes) addSubFolders(p string) error { // This one is a file watcher for files and directories. // Modify with caution, it is probably important to write some test cases first! // If the Watch returns an error, it means that something has gone wrong, and it -// must be restarted. On a clean exit it returns nil. The delay parameter asks -// it to respect this pause duration before trying to watch again. +// must be restarted. On a clean exit it returns nil. // FIXME: Also watch the source directory when using obj.Source !!! -func (obj *FileRes) Watch(processChan chan Event, delay time.Duration) error { +func (obj *FileRes) Watch(processChan chan Event) error { if obj.IsWatching() { return nil // TODO: should this be an error? } @@ -178,73 +177,13 @@ func (obj *FileRes) Watch(processChan chan Event, delay time.Duration) error { cuuid := obj.converger.Register() defer cuuid.Unregister() - var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas - doSend = func() (bool, error) { - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - select { - case e := <-resp: // wait for the ACK() - if e != nil { // we got a NACK - return true, e // exit with error - } - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return true, nil // exit, without error - } else if send { - return doSend() // recurse - } - } - return false, nil // return, no error or exit signal - } - - // if a retry-delay was requested, wait, but don't block our events! - if delay > 0 { - var pendingSendEvent bool - timer := time.NewTimer(delay) - Loop: - for { - select { - case <-timer.C: // the wait is over - break Loop // critical - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return nil // exit - } else if send { - // if we dive down this rabbit hole, our - // timer.C won't get seen until we get out! - // in this situation, the Watch() is blocked - // from performing until CheckApply returns - // successfully, or errors out. This isn't - // so bad, but we should document it. Is it - // possible that some resource *needs* Watch - // to run to be able to execute a CheckApply? - // That situation shouldn't be common, and - // should probably not be allowed. Can we - // avoid it though? - //if exit, err := doSend(); exit || err != nil { - // return err // we exit or bubble up a NACK... - //} - // Instead of doing the above, we can - // add events to a pending list, and - // when we finish the delay, we can run - // them. - pendingSendEvent = true // all events are identical for now... - } - } - } - timer.Stop() // it's nice to cleanup - log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) - if pendingSendEvent { // TODO: should this become a list in the future? - if exit, err := doSend(); exit || err != nil { - return err // we exit or bubble up a NACK... - } + var startup bool + Startup := func(block bool) <-chan time.Time { + if block { + return nil // blocks forever + //return make(chan time.Time) // blocks forever } + return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } var safename = path.Clean(obj.path) // no trailing slash @@ -418,17 +357,23 @@ func (obj *FileRes) Watch(processChan chan Event, delay time.Duration) error { case <-cuuid.ConvergedTimer(): cuuid.SetConverged(true) // converged! continue + + case <-Startup(startup): + cuuid.SetConverged(false) + send = true + dirty = true } // do all our event sending all together to avoid duplicate msgs if send { + startup = true // startup finished send = false // only invalid state on certain types of events if dirty { dirty = false obj.isStateOK = false // something made state dirty } - if exit, err := doSend(); exit || err != nil { + if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... } } diff --git a/main.go b/main.go index 3450b19a..fe65f7d3 100644 --- a/main.go +++ b/main.go @@ -231,7 +231,7 @@ func run(c *cli.Context) error { converger.SetStateFn(convergerStateFn) } - exitchan := make(chan Event) // exit event + exitchan := make(chan struct{}) // exit on close go func() { startchan := make(chan struct{}) // start signal go func() { startchan <- struct{}{} }() @@ -268,8 +268,7 @@ func run(c *cli.Context) error { } // XXX: case compile_event: ... // ... - case msg := <-exitchan: - msg.ACK() + case <-exitchan: return } @@ -384,16 +383,13 @@ func run(c *cli.Context) error { G.Exit() // tell all the children to exit // tell inner main loop to exit - resp := NewResp() - go func() { exitchan <- Event{eventExit, resp, "", false} }() + close(exitchan) // cleanup etcd main loop last so it can process everything first if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd log.Printf("Etcd exited poorly with: %v", err) } - resp.ACKWait() // let inner main loop finish cleanly just in case - if DEBUG { log.Printf("Graph: %v", G) } diff --git a/noop.go b/noop.go index 12bcb750..2d354304 100644 --- a/noop.go +++ b/noop.go @@ -58,7 +58,7 @@ func (obj *NoopRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. -func (obj *NoopRes) Watch(processChan chan Event, delay time.Duration) error { +func (obj *NoopRes) Watch(processChan chan Event) error { if obj.IsWatching() { return nil // TODO: should this be an error? } @@ -67,59 +67,13 @@ func (obj *NoopRes) Watch(processChan chan Event, delay time.Duration) error { cuuid := obj.converger.Register() defer cuuid.Unregister() - var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas - doSend = func() (bool, error) { - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - select { - case e := <-resp: // wait for the ACK() - if e != nil { // we got a NACK - return true, e // exit with error - } - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return true, nil // exit, without error - } else if send { - return doSend() // recurse - } - } - return false, nil // return, no error or exit signal - } - - // if a retry-delay was requested, wait, but don't block our events! - if delay > 0 { - var pendingSendEvent bool - timer := time.NewTimer(delay) - Loop: - for { - select { - case <-timer.C: // the wait is over - break Loop // critical - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return nil // exit - } else if send { - // NOTE: see long comment in the file resource - //if exit, err := doSend(); exit || err != nil { - // return err // we exit or bubble up a NACK... - //} - pendingSendEvent = true // all events are identical for now... - } - } - } - timer.Stop() // it's nice to cleanup - log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) - if pendingSendEvent { // TODO: should this become a list in the future? - if exit, err := doSend(); exit || err != nil { - return err // we exit or bubble up a NACK... - } + var startup bool + Startup := func(block bool) <-chan time.Time { + if block { + return nil // blocks forever + //return make(chan time.Time) // blocks forever } + return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } var send = false // send event? @@ -137,14 +91,19 @@ func (obj *NoopRes) Watch(processChan chan Event, delay time.Duration) error { case <-cuuid.ConvergedTimer(): cuuid.SetConverged(true) // converged! continue + + case <-Startup(startup): + cuuid.SetConverged(false) + send = true } // do all our event sending all together to avoid duplicate msgs if send { + startup = true // startup finished send = false // only do this on certain types of events //obj.isStateOK = false // something made state dirty - if exit, err := doSend(); exit || err != nil { + if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... } } diff --git a/pgraph.go b/pgraph.go index f759001f..b2e30954 100644 --- a/pgraph.go +++ b/pgraph.go @@ -793,6 +793,7 @@ func (g *Graph) Worker(v *Vertex) error { // this avoids us having to pass the data into // the Watch() function about which graph it is // running on, which isolates things nicely... + obj := v.Res chanProcess := make(chan Event) go func() { running := false @@ -847,6 +848,7 @@ func (g *Graph) Worker(v *Vertex) error { //<-timer.C // blocks, docs are wrong! } running = false + log.Printf("%s[%s]: CheckApply delay expired!", v.Kind(), v.GetName()) // re-send this failed event, to trigger a CheckApply() go func() { chanProcess <- saved }() // TODO: should we send a fake event instead? @@ -864,8 +866,61 @@ func (g *Graph) Worker(v *Vertex) error { var watchRetry int16 = v.Meta().Retry // number of tries left, -1 for infinite // watch blocks until it ends, & errors to retry for { + // TODO: do we have to stop the converged-timeout when in this block (perhaps we're in the delay block!) + // TODO: should we setup/manage some of the converged timeout stuff in here anyways? + + // if a retry-delay was requested, wait, but don't block our events! + if watchDelay > 0 { + //var pendingSendEvent bool + timer := time.NewTimer(watchDelay) + Loop: + for { + select { + case <-timer.C: // the wait is over + break Loop // critical + + // TODO: resources could have a separate exit channel to avoid this complexity!? + case event := <-obj.Events(): + // NOTE: this code should match the similar Res code! + //cuuid.SetConverged(false) // TODO ? + if exit, send := obj.ReadEvent(&event); exit { + return nil // exit + } else if send { + // if we dive down this rabbit hole, our + // timer.C won't get seen until we get out! + // in this situation, the Watch() is blocked + // from performing until CheckApply returns + // successfully, or errors out. This isn't + // so bad, but we should document it. Is it + // possible that some resource *needs* Watch + // to run to be able to execute a CheckApply? + // That situation shouldn't be common, and + // should probably not be allowed. Can we + // avoid it though? + //if exit, err := doSend(); exit || err != nil { + // return err // we exit or bubble up a NACK... + //} + // Instead of doing the above, we can + // add events to a pending list, and + // when we finish the delay, we can run + // them. + //pendingSendEvent = true // all events are identical for now... + } + } + } + timer.Stop() // it's nice to cleanup + log.Printf("%s[%s]: Watch delay expired!", v.Kind(), v.GetName()) + // NOTE: we can avoid the send if running Watch guarantees + // one CheckApply event on startup! + //if pendingSendEvent { // TODO: should this become a list in the future? + // if exit, err := obj.DoSend(chanProcess, ""); exit || err != nil { + // return err // we exit or bubble up a NACK... + // } + //} + } + // TODO: reset the watch retry count after some amount of success - e := v.Res.Watch(chanProcess, watchDelay) + e := v.Res.Watch(chanProcess) if e == nil { // exit signal err = nil // clean exit break @@ -884,11 +939,10 @@ func (g *Graph) Worker(v *Vertex) error { } watchDelay = time.Duration(v.Meta().Delay) * time.Millisecond log.Printf("%v[%v]: Watch: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), watchDelay.Seconds(), watchRetry) - // We trigger a CheckApply if watch restarts, so that we catch - // any possible events that happened while down. NOTE: this is - // only flood-safe if the Watch resource de-duplicates similar - // send event messages. It does for now, rethink this later... - v.SendEvent(eventPoke, false, false) + // We need to trigger a CheckApply after Watch restarts, so that + // we catch any lost events that happened while down. We do this + // by getting the Watch resource to send one event once it's up! + //v.SendEvent(eventPoke, false, false) } close(chanProcess) return err @@ -966,6 +1020,7 @@ func (g *Graph) Exit() { t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... // turn off the taps... + // XXX: consider instead doing this by closing the Res.events channel instead? // XXX: do this by sending an exit signal, and then returning // when we hit the 'default' in the select statement! // XXX: we can do this to quiesce, but it's not necessary now diff --git a/pkg.go b/pkg.go index 2d906835..89e435b2 100644 --- a/pkg.go +++ b/pkg.go @@ -109,7 +109,7 @@ func (obj *PkgRes) Validate() bool { // It uses the PackageKit UpdatesChanged signal to watch for changes. // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch(processChan chan Event, delay time.Duration) error { +func (obj *PkgRes) Watch(processChan chan Event) error { if obj.IsWatching() { return nil } @@ -118,59 +118,13 @@ func (obj *PkgRes) Watch(processChan chan Event, delay time.Duration) error { cuuid := obj.converger.Register() defer cuuid.Unregister() - var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas - doSend = func() (bool, error) { - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - select { - case e := <-resp: // wait for the ACK() - if e != nil { // we got a NACK - return true, e // exit with error - } - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return true, nil // exit, without error - } else if send { - return doSend() // recurse - } - } - return false, nil // return, no error or exit signal - } - - // if a retry-delay was requested, wait, but don't block our events! - if delay > 0 { - var pendingSendEvent bool - timer := time.NewTimer(delay) - Loop: - for { - select { - case <-timer.C: // the wait is over - break Loop // critical - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return nil // exit - } else if send { - // NOTE: see long comment in the file resource - //if exit, err := doSend(); exit || err != nil { - // return err // we exit or bubble up a NACK... - //} - pendingSendEvent = true // all events are identical for now... - } - } - } - timer.Stop() // it's nice to cleanup - log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) - if pendingSendEvent { // TODO: should this become a list in the future? - if exit, err := doSend(); exit || err != nil { - return err // we exit or bubble up a NACK... - } + var startup bool + Startup := func(block bool) <-chan time.Time { + if block { + return nil // blocks forever + //return make(chan time.Time) // blocks forever } + return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } bus := NewBus() @@ -222,17 +176,23 @@ func (obj *PkgRes) Watch(processChan chan Event, delay time.Duration) error { case <-cuuid.ConvergedTimer(): cuuid.SetConverged(true) // converged! continue + + case <-Startup(startup): + cuuid.SetConverged(false) + send = true + dirty = true } // do all our event sending all together to avoid duplicate msgs if send { + startup = true // startup finished send = false // only invalid state on certain types of events if dirty { dirty = false obj.isStateOK = false // something made state dirty } - if exit, err := doSend(); exit || err != nil { + if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... } } diff --git a/resources.go b/resources.go index 2f1f643f..27aa07ca 100644 --- a/resources.go +++ b/resources.go @@ -23,7 +23,6 @@ import ( "encoding/gob" "fmt" "log" - "time" ) //go:generate stringer -type=resState -output=resstate_stringer.go @@ -80,11 +79,13 @@ type Base interface { setKind(string) Kind() string Meta() *MetaParams + Events() chan Event AssociateData(Converger) IsWatching() bool SetWatching(bool) GetState() resState SetState(resState) + DoSend(chan Event, string) (bool, error) SendEvent(eventName, bool, bool) bool ReadEvent(*Event) (bool, bool) // TODO: optional here? GroupCmp(Res) bool // TODO: is there a better name for this? @@ -100,8 +101,8 @@ type Res interface { Base // include everything from the Base interface Init() //Validate() bool // TODO: this might one day be added - GetUUIDs() []ResUUID // most resources only return one - Watch(chan Event, time.Duration) error // send on channel to signal process() events + GetUUIDs() []ResUUID // most resources only return one + Watch(chan Event) error // send on channel to signal process() events CheckApply(bool) (bool, error) AutoEdges() AutoEdge Compare(Res) bool @@ -193,6 +194,11 @@ func (obj *BaseRes) Meta() *MetaParams { return &obj.MetaParams } +// Events returns the channel of events to listen on. +func (obj *BaseRes) Events() chan Event { + return obj.events +} + // AssociateData associates some data with the object in question. func (obj *BaseRes) AssociateData(converger Converger) { obj.converger = converger @@ -221,6 +227,30 @@ func (obj *BaseRes) SetState(state resState) { obj.state = state } +// DoSend sends off an event, but doesn't block the incoming event queue. It can +// also recursively call itself when events need processing during the wait. +// I'm not completely comfortable with this fn, but it will have to do for now. +func (obj *BaseRes) DoSend(processChan chan Event, comment string) (bool, error) { + resp := NewResp() + processChan <- Event{eventNil, resp, comment, true} // trigger process + select { + case e := <-resp: // wait for the ACK() + if e != nil { // we got a NACK + return true, e // exit with error + } + + case event := <-obj.events: + // NOTE: this code should match the similar code below! + //cuuid.SetConverged(false) // TODO ? + if exit, send := obj.ReadEvent(&event); exit { + return true, nil // exit, without error + } else if send { + return obj.DoSend(processChan, comment) // recurse + } + } + return false, nil // return, no error or exit signal +} + // SendEvent pushes an event into the message queue for a particular vertex func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool { // TODO: isn't this race-y ? diff --git a/svc.go b/svc.go index 0bcacb9c..969a114f 100644 --- a/svc.go +++ b/svc.go @@ -72,7 +72,7 @@ func (obj *SvcRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. -func (obj *SvcRes) Watch(processChan chan Event, delay time.Duration) error { +func (obj *SvcRes) Watch(processChan chan Event) error { if obj.IsWatching() { return nil } @@ -81,59 +81,13 @@ func (obj *SvcRes) Watch(processChan chan Event, delay time.Duration) error { cuuid := obj.converger.Register() defer cuuid.Unregister() - var doSend func() (bool, error) // lol, golang doesn't support recursive lambdas - doSend = func() (bool, error) { - resp := NewResp() - processChan <- Event{eventNil, resp, "", true} // trigger process - select { - case e := <-resp: // wait for the ACK() - if e != nil { // we got a NACK - return true, e // exit with error - } - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return true, nil // exit, without error - } else if send { - return doSend() // recurse - } - } - return false, nil // return, no error or exit signal - } - - // if a retry-delay was requested, wait, but don't block our events! - if delay > 0 { - var pendingSendEvent bool - timer := time.NewTimer(delay) - Loop: - for { - select { - case <-timer.C: // the wait is over - break Loop // critical - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return nil // exit - } else if send { - // NOTE: see long comment in the file resource - //if exit, err := doSend(); exit || err != nil { - // return err // we exit or bubble up a NACK... - //} - pendingSendEvent = true // all events are identical for now... - } - } - } - timer.Stop() // it's nice to cleanup - log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) - if pendingSendEvent { // TODO: should this become a list in the future? - if exit, err := doSend(); exit || err != nil { - return err // we exit or bubble up a NACK... - } + var startup bool + Startup := func(block bool) <-chan time.Time { + if block { + return nil // blocks forever + //return make(chan time.Time) // blocks forever } + return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } // obj.Name: svc name @@ -222,6 +176,11 @@ func (obj *SvcRes) Watch(processChan chan Event, delay time.Duration) error { case <-cuuid.ConvergedTimer(): cuuid.SetConverged(true) // converged! continue + + case <-Startup(startup): + cuuid.SetConverged(false) + send = true + dirty = true } } else { if !activeSet { @@ -268,16 +227,22 @@ func (obj *SvcRes) Watch(processChan chan Event, delay time.Duration) error { case <-cuuid.ConvergedTimer(): cuuid.SetConverged(true) // converged! continue + + case <-Startup(startup): + cuuid.SetConverged(false) + send = true + dirty = true } } if send { + startup = true // startup finished send = false if dirty { dirty = false obj.isStateOK = false // something made state dirty } - if exit, err := doSend(); exit || err != nil { + if exit, err := obj.DoSend(processChan, ""); exit || err != nil { return err // we exit or bubble up a NACK... } } diff --git a/timer.go b/timer.go index dea911f1..d6bb413d 100644 --- a/timer.go +++ b/timer.go @@ -65,7 +65,7 @@ func (obj *TimerRes) Validate() bool { } // Watch is the primary listener for this resource and it outputs events. -func (obj *TimerRes) Watch(processChan chan Event, delay time.Duration) error { +func (obj *TimerRes) Watch(processChan chan Event) error { if obj.IsWatching() { return nil } @@ -74,59 +74,13 @@ func (obj *TimerRes) Watch(processChan chan Event, delay time.Duration) error { cuuid := obj.converger.Register() defer cuuid.Unregister() - var doSend func(string) (bool, error) // lol, golang doesn't support recursive lambdas - doSend = func(comment string) (bool, error) { - resp := NewResp() - processChan <- Event{eventNil, resp, comment, true} // trigger process - select { - case e := <-resp: // wait for the ACK() - if e != nil { // we got a NACK - return true, e // exit with error - } - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return true, nil // exit, without error - } else if send { - return doSend(comment) // recurse - } - } - return false, nil // return, no error or exit signal - } - - // if a retry-delay was requested, wait, but don't block our events! - if delay > 0 { - var pendingSendEvent bool - timer := time.NewTimer(delay) - Loop: - for { - select { - case <-timer.C: // the wait is over - break Loop // critical - - case event := <-obj.events: - // NOTE: this code should match the similar code below! - cuuid.SetConverged(false) - if exit, send := obj.ReadEvent(&event); exit { - return nil // exit - } else if send { - // NOTE: see long comment in the file resource - //if exit, err := doSend(); exit || err != nil { - // return err // we exit or bubble up a NACK... - //} - pendingSendEvent = true // all events are identical for now... - } - } - } - timer.Stop() // it's nice to cleanup - log.Printf("%s[%s]: Delay expired!", obj.Kind(), obj.GetName()) - if pendingSendEvent { // TODO: should this become a list in the future? - if exit, err := doSend("pending delayed event"); exit || err != nil { - return err // we exit or bubble up a NACK... - } + var startup bool + Startup := func(block bool) <-chan time.Time { + if block { + return nil // blocks forever + //return make(chan time.Time) // blocks forever } + return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } // Create a time.Ticker for the given interval @@ -149,11 +103,16 @@ func (obj *TimerRes) Watch(processChan chan Event, delay time.Duration) error { case <-cuuid.ConvergedTimer(): cuuid.SetConverged(true) continue + + case <-Startup(startup): + cuuid.SetConverged(false) + send = true } if send { + startup = true // startup finished send = false obj.isStateOK = false - if exit, err := doSend("timer ticked"); exit || err != nil { + if exit, err := obj.DoSend(processChan, "timer ticked"); exit || err != nil { return err // we exit or bubble up a NACK... } }