diff --git a/etcd/etcd.go b/etcd/etcd.go index 9d821a1d..e1df69c9 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -2096,11 +2096,12 @@ func Leader(obj *EmbdEtcd) (string, error) { return "", fmt.Errorf("members map is not current") // not found } -// WatchAll returns a channel that outputs a true bool when activity occurs +// WatchResources returns a channel that outputs events when exported resources +// change. // TODO: Filter our watch (on the server side if possible) based on the // collection prefixes and filters that we care about... -func WatchAll(obj *EmbdEtcd) chan bool { - ch := make(chan bool, 1) // buffer it so we can measure it +func WatchResources(obj *EmbdEtcd) chan error { + ch := make(chan error, 1) // buffer it so we can measure it path := fmt.Sprintf("/%s/exported/", NS) callback := func(re *RE) error { // TODO: is this even needed? it used to happen on conn errors @@ -2118,7 +2119,7 @@ func WatchAll(obj *EmbdEtcd) chan bool { // this check avoids multiple events all queueing up and then // being released continuously long after the changes stopped // do not block! - ch <- true // event + ch <- nil // event } return nil } diff --git a/etcd/world.go b/etcd/world.go index 8c69c1b5..b28a0302 100644 --- a/etcd/world.go +++ b/etcd/world.go @@ -27,6 +27,12 @@ type World struct { EmbdEtcd *EmbdEtcd } +// ResWatch returns a channel which spits out events on possible exported +// resource changes. +func (obj *World) ResWatch() chan error { + return WatchResources(obj.EmbdEtcd) +} + // ResExport exports a list of resources under our hostname namespace. // Subsequent calls replace the previously set collection atomically. func (obj *World) ResExport(resourceList []resources.Res) error { diff --git a/lib/main.go b/lib/main.go index a9f38f06..4b79f4ad 100644 --- a/lib/main.go +++ b/lib/main.go @@ -372,25 +372,12 @@ func (obj *Main) Run() error { exitchan := make(chan struct{}) // exit on close go func() { - startChan := make(chan struct{}) // start signal - close(startChan) // kick it off! - - log.Println("Main: Etcd: Starting...") - etcdChan := etcd.WatchAll(EmbdEtcd) first := true // first loop or not for { log.Println("Main: Waiting...") + // The GAPI should always kick off an event on Next() at + // startup when (and if) it indeed has a graph to share! select { - case <-startChan: // kick the loop once at start - startChan = nil // disable - // pass - - case b := <-etcdChan: - if !b { // ignore the message - continue - } - // everything else passes through to cause a compile! - case err, ok := <-gapiChan: if !ok { // channel closed if obj.Flags.Debug { @@ -399,14 +386,21 @@ func (obj *Main) Run() error { gapiChan = nil // disable it continue } + + // the gapi lets us send an error to the channel + // this means there was a failure, but not fatal if err != nil { - obj.Exit(err) // trigger exit - continue // wait for exitchan + log.Printf("Main: Error with graph stream: %v", err) + // TODO: consider adding an option to + // exit on stream errors... + //obj.Exit(err) // trigger exit + continue // wait for exitchan or another event } if obj.NoWatch { // extra safety for bad GAPI's log.Printf("Main: GAPI stream should be quiet with NoWatch!") // fix the GAPI! continue // no stream events should be sent } + // everything else passes through to cause a compile! case <-exitchan: return diff --git a/puppet/gapi.go b/puppet/gapi.go index 63823317..a2c4b840 100644 --- a/puppet/gapi.go +++ b/puppet/gapi.go @@ -91,21 +91,27 @@ func (obj *GAPI) Next() chan error { ch <- fmt.Errorf("the puppet GAPI is not initialized") return } + startChan := make(chan struct{}) // start signal + close(startChan) // kick it off! pChan := puppetChan() for { select { + case <-startChan: // kick the loop once at start + startChan = nil // disable + // pass case _, ok := <-pChan: if !ok { // the channel closed! return } - log.Printf("Puppet: Generating new graph...") - pChan = puppetChan() // TODO: okay to update interval in case it changed? - select { - case ch <- nil: // trigger a run (send a msg) - // unblock if we exit while waiting to send! - case <-obj.closeChan: - return - } + case <-obj.closeChan: + return + } + + log.Printf("Puppet: Generating new graph...") + pChan = puppetChan() // TODO: okay to update interval in case it changed? + select { + case ch <- nil: // trigger a run (send a msg) + // unblock if we exit while waiting to send! case <-obj.closeChan: return } diff --git a/resources/resources.go b/resources/resources.go index 50ef51e5..7965f4ca 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -75,6 +75,7 @@ const refreshPathToken = "refresh" // the GAPI to store state and exchange information throughout the cluster. It // is the interface each machine uses to communicate with the rest of the world. type World interface { // TODO: is there a better name for this interface? + ResWatch() chan error ResExport([]Res) error // FIXME: should this method take a "filter" data struct instead of many args? ResCollect(hostnameFilter, kindFilter []string) ([]Res, error) diff --git a/yamlgraph/gapi.go b/yamlgraph/gapi.go index 25a6f895..0144fe84 100644 --- a/yamlgraph/gapi.go +++ b/yamlgraph/gapi.go @@ -90,23 +90,37 @@ func (obj *GAPI) Next() chan error { ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") return } + startChan := make(chan struct{}) // start signal + close(startChan) // kick it off! + watchChan := obj.data.World.ResWatch() configChan := obj.configWatcher.ConfigWatch(*obj.File) // simple for { + var err error + var ok bool select { - case err, ok := <-configChan: // returns nil events on ok! + case <-startChan: // kick the loop once at start + startChan = nil // disable + // pass + case err, ok = <-watchChan: + if !ok { + return + } + case err, ok = <-configChan: // returns nil events on ok! if !ok { // the channel closed! return } - log.Printf("yamlgraph: Generating new graph...") - select { - case ch <- err: // trigger a run (send a msg) - if err != nil { - return - } - // unblock if we exit while waiting to send! - case <-obj.closeChan: - return - } + case <-obj.closeChan: + return + } + + log.Printf("yamlgraph: Generating new graph...") + select { + case ch <- err: // trigger a run (send a msg) + // TODO: if the error is really bad, we could: + //if err != nil { + // return + //} + // unblock if we exit while waiting to send! case <-obj.closeChan: return }