diff --git a/examples/lib/libmgmt1.go b/examples/lib/libmgmt1.go index 510bb7ac..c74cde1a 100644 --- a/examples/lib/libmgmt1.go +++ b/examples/lib/libmgmt1.go @@ -89,9 +89,6 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { // Next returns nil errors every time there could be a new graph. func (obj *MyGAPI) Next() chan error { - if obj.data.NoWatch || obj.Interval <= 0 { - return nil - } ch := make(chan error) obj.wg.Add(1) go func() { @@ -101,19 +98,32 @@ func (obj *MyGAPI) Next() chan error { ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") return } + startChan := make(chan struct{}) // start signal + close(startChan) // kick it off! - // arbitrarily change graph every interval seconds - ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) - defer ticker.Stop() + ticker := make(<-chan time.Time) + if obj.data.NoStreamWatch || obj.Interval <= 0 { + ticker = nil + } else { + // arbitrarily change graph every interval seconds + t := time.NewTicker(time.Duration(obj.Interval) * time.Second) + defer t.Stop() + ticker = t.C + } for { select { - case <-ticker.C: - log.Printf("libmgmt: Generating new graph...") - select { - case ch <- nil: // trigger a run - case <-obj.closeChan: - return - } + case <-startChan: // kick the loop once at start + startChan = nil // disable + // pass + case <-ticker: + // pass + case <-obj.closeChan: + return + } + + log.Printf("libmgmt: Generating new graph...") + select { + case ch <- nil: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt2.go b/examples/lib/libmgmt2.go index 9d815846..bbd6d007 100644 --- a/examples/lib/libmgmt2.go +++ b/examples/lib/libmgmt2.go @@ -82,9 +82,6 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { // Next returns nil errors every time there could be a new graph. func (obj *MyGAPI) Next() chan error { - if obj.data.NoWatch || obj.Interval <= 0 { - return nil - } ch := make(chan error) obj.wg.Add(1) go func() { @@ -94,19 +91,32 @@ func (obj *MyGAPI) Next() chan error { ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") return } + startChan := make(chan struct{}) // start signal + close(startChan) // kick it off! - // arbitrarily change graph every interval seconds - ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) - defer ticker.Stop() + ticker := make(<-chan time.Time) + if obj.data.NoStreamWatch || obj.Interval <= 0 { + ticker = nil + } else { + // arbitrarily change graph every interval seconds + t := time.NewTicker(time.Duration(obj.Interval) * time.Second) + defer t.Stop() + ticker = t.C + } for { select { - case <-ticker.C: - log.Printf("libmgmt: Generating new graph...") - select { - case ch <- nil: // trigger a run - case <-obj.closeChan: - return - } + case <-startChan: // kick the loop once at start + startChan = nil // disable + // pass + case <-ticker: + // pass + case <-obj.closeChan: + return + } + + log.Printf("libmgmt: Generating new graph...") + select { + case ch <- nil: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt3.go b/examples/lib/libmgmt3.go index c6fd090d..329acdb9 100644 --- a/examples/lib/libmgmt3.go +++ b/examples/lib/libmgmt3.go @@ -129,9 +129,6 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { // Next returns nil errors every time there could be a new graph. func (obj *MyGAPI) Next() chan error { - if obj.data.NoWatch || obj.Interval <= 0 { - return nil - } ch := make(chan error) obj.wg.Add(1) go func() { @@ -141,19 +138,32 @@ func (obj *MyGAPI) Next() chan error { ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") return } + startChan := make(chan struct{}) // start signal + close(startChan) // kick it off! - // arbitrarily change graph every interval seconds - ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) - defer ticker.Stop() + ticker := make(<-chan time.Time) + if obj.data.NoStreamWatch || obj.Interval <= 0 { + ticker = nil + } else { + // arbitrarily change graph every interval seconds + t := time.NewTicker(time.Duration(obj.Interval) * time.Second) + defer t.Stop() + ticker = t.C + } for { select { - case <-ticker.C: - log.Printf("libmgmt: Generating new graph...") - select { - case ch <- nil: // trigger a run - case <-obj.closeChan: - return - } + case <-startChan: // kick the loop once at start + startChan = nil // disable + // pass + case <-ticker: + // pass + case <-obj.closeChan: + return + } + + log.Printf("libmgmt: Generating new graph...") + select { + case ch <- nil: // trigger a run case <-obj.closeChan: return } diff --git a/gapi/gapi.go b/gapi/gapi.go index d2c2efb0..0b1c0714 100644 --- a/gapi/gapi.go +++ b/gapi/gapi.go @@ -25,10 +25,11 @@ import ( // Data is the set of input values passed into the GAPI structs via Init. type Data struct { - Hostname string // uuid for the host, required for GAPI - World resources.World - Noop bool - NoWatch bool + Hostname string // uuid for the host, required for GAPI + World resources.World + Noop bool + NoConfigWatch bool + NoStreamWatch bool // NOTE: we can add more fields here if needed by GAPI endpoints } diff --git a/lib/cli.go b/lib/cli.go index 27bbf2dc..559355b3 100644 --- a/lib/cli.go +++ b/lib/cli.go @@ -92,6 +92,9 @@ func run(c *cli.Context) error { obj.Remotes = c.StringSlice("remote") // FIXME: GAPI-ify somehow? obj.NoWatch = c.Bool("no-watch") + obj.NoConfigWatch = c.Bool("no-config-watch") + obj.NoStreamWatch = c.Bool("no-stream-watch") + obj.Noop = c.Bool("noop") obj.Sema = c.Int("sema") obj.Graphviz = c.String("graphviz") @@ -237,8 +240,17 @@ func CLI(program, version string, flags Flags) error { cli.BoolFlag{ Name: "no-watch", + Usage: "do not update graph under any switch events", + }, + cli.BoolFlag{ + Name: "no-config-watch", + Usage: "do not update graph on config switch events", + }, + cli.BoolFlag{ + Name: "no-stream-watch", Usage: "do not update graph on stream switch events", }, + cli.BoolFlag{ Name: "noop", Usage: "globally force all resources into no-op mode", diff --git a/lib/main.go b/lib/main.go index 4b79f4ad..b8cbc35a 100644 --- a/lib/main.go +++ b/lib/main.go @@ -65,7 +65,10 @@ type Main struct { GAPI gapi.GAPI // graph API interface struct Remotes []string // list of remote graph definitions to run - NoWatch bool // do not update graph on watched graph definition file changes + NoWatch bool // do not change graph under any circumstances + NoConfigWatch bool // do not update graph due to config changes + NoStreamWatch bool // do not update graph due to stream changes + Noop bool // globally force all resources into no-op mode Sema int // add a semaphore with this lock count to each resource Graphviz string // output file for graphviz data @@ -112,6 +115,15 @@ func (obj *Main) Init() error { return fmt.Errorf("choosing a prefix and the request for a tmp prefix is illogical") } + // if we've turned off watching, then be explicit and disable them all! + // if all the watches are disabled, then it's equivalent to no watching + if obj.NoWatch { + obj.NoConfigWatch = true + obj.NoStreamWatch = true + } else if obj.NoConfigWatch && obj.NoStreamWatch { + obj.NoWatch = true + } + obj.idealClusterSize = uint16(obj.IdealClusterSize) if obj.IdealClusterSize < 0 { // value is undefined, set to the default obj.idealClusterSize = etcd.DefaultIdealClusterSize @@ -361,11 +373,14 @@ func (obj *Main) Run() error { Hostname: hostname, World: world, Noop: obj.Noop, - NoWatch: obj.NoWatch, + //NoWatch: obj.NoWatch, + NoConfigWatch: obj.NoConfigWatch, + NoStreamWatch: obj.NoStreamWatch, } if err := obj.GAPI.Init(data); err != nil { obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err)) - } else if !obj.NoWatch { + } else { + // this must generate at least one event for it to work gapiChan = obj.GAPI.Next() // stream of graph switch events! } } @@ -396,10 +411,6 @@ func (obj *Main) Run() error { //obj.Exit(err) // trigger exit continue // wait for exitchan or another event } - if obj.NoWatch { // extra safety for bad GAPI's - log.Printf("Main: GAPI stream should be quiet with NoWatch!") // fix the GAPI! - continue // no stream events should be sent - } // everything else passes through to cause a compile! case <-exitchan: @@ -509,7 +520,7 @@ func (obj *Main) Run() error { configWatcher := recwatch.NewConfigWatcher() configWatcher.Flags = recwatch.Flags{Debug: obj.Flags.Debug} events := configWatcher.Events() - if !obj.NoWatch { + if !obj.NoWatch { // FIXME: fit this into a clean GAPI? configWatcher.Add(obj.Remotes...) // add all the files... } else { events = nil // signal that no-watch is true diff --git a/puppet/gapi.go b/puppet/gapi.go index a2c4b840..caf2e6fd 100644 --- a/puppet/gapi.go +++ b/puppet/gapi.go @@ -76,9 +76,6 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { // Next returns nil errors every time there could be a new graph. func (obj *GAPI) Next() chan error { - if obj.data.NoWatch { - return nil - } puppetChan := func() <-chan time.Time { // helper function return time.Tick(time.Duration(RefreshInterval(obj.PuppetConf)) * time.Second) } @@ -93,7 +90,16 @@ func (obj *GAPI) Next() chan error { } startChan := make(chan struct{}) // start signal close(startChan) // kick it off! - pChan := puppetChan() + + pChan := make(<-chan time.Time) + // NOTE: we don't look at obj.data.NoConfigWatch since emulating + // puppet means we do not switch graphs on code changes anyways. + if obj.data.NoStreamWatch { + pChan = nil + } else { + pChan = puppetChan() + } + for { select { case <-startChan: // kick the loop once at start @@ -108,7 +114,11 @@ func (obj *GAPI) Next() chan error { } log.Printf("Puppet: Generating new graph...") - pChan = puppetChan() // TODO: okay to update interval in case it changed? + if obj.data.NoStreamWatch { + pChan = nil + } else { + pChan = puppetChan() // TODO: okay to update interval in case it changed? + } select { case ch <- nil: // trigger a run (send a msg) // unblock if we exit while waiting to send! diff --git a/test/shell/libmgmt-change1.go b/test/shell/libmgmt-change1.go index e13feb0d..086d8985 100644 --- a/test/shell/libmgmt-change1.go +++ b/test/shell/libmgmt-change1.go @@ -74,9 +74,6 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { // Next returns nil errors every time there could be a new graph. func (obj *MyGAPI) Next() chan error { - if obj.data.NoWatch || obj.Interval <= 0 { - return nil - } ch := make(chan error) obj.wg.Add(1) go func() { diff --git a/yamlgraph/gapi.go b/yamlgraph/gapi.go index 0144fe84..91496b5b 100644 --- a/yamlgraph/gapi.go +++ b/yamlgraph/gapi.go @@ -78,9 +78,6 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { // Next returns nil errors every time there could be a new graph. func (obj *GAPI) Next() chan error { - if obj.data.NoWatch { - return nil - } ch := make(chan error) obj.wg.Add(1) go func() { @@ -92,8 +89,19 @@ func (obj *GAPI) Next() chan error { } startChan := make(chan struct{}) // start signal close(startChan) // kick it off! - watchChan := obj.data.World.ResWatch() - configChan := obj.configWatcher.ConfigWatch(*obj.File) // simple + + watchChan, configChan := make(chan error), make(chan error) + if obj.data.NoConfigWatch { + configChan = nil + } else { + configChan = obj.configWatcher.ConfigWatch(*obj.File) // simple + } + if obj.data.NoStreamWatch { + watchChan = nil + } else { + watchChan = obj.data.World.ResWatch() + } + for { var err error var ok bool diff --git a/yamlgraph2/gapi.go b/yamlgraph2/gapi.go index bc71ebdf..6f50029e 100644 --- a/yamlgraph2/gapi.go +++ b/yamlgraph2/gapi.go @@ -78,9 +78,6 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { // Next returns nil errors every time there could be a new graph. func (obj *GAPI) Next() chan error { - if obj.data.NoWatch { - return nil - } ch := make(chan error) obj.wg.Add(1) go func() { @@ -90,23 +87,48 @@ func (obj *GAPI) Next() chan error { ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") return } - configChan := obj.configWatcher.ConfigWatch(*obj.File) // simple + startChan := make(chan struct{}) // start signal + close(startChan) // kick it off! + + watchChan, configChan := make(chan error), make(chan error) + if obj.data.NoConfigWatch { + configChan = nil + } else { + configChan = obj.configWatcher.ConfigWatch(*obj.File) // simple + } + if obj.data.NoStreamWatch { + watchChan = nil + } else { + watchChan = obj.data.World.ResWatch() + } + for { + var err error + var ok bool select { - case err, ok := <-configChan: // returns nil events on ok! + case <-startChan: // kick the loop once at start + startChan = nil // disable + // pass + case err, ok = <-watchChan: + if !ok { + return + } + case err, ok = <-configChan: // returns nil events on ok! if !ok { // the channel closed! return } - log.Printf("yamlgraph: Generating new graph...") - select { - case ch <- err: // trigger a run (send a msg) - if err != nil { - return - } - // unblock if we exit while waiting to send! - case <-obj.closeChan: - return - } + case <-obj.closeChan: + return + } + + log.Printf("yamlgraph: Generating new graph...") + select { + case ch <- err: // trigger a run (send a msg) + // TODO: if the error is really bad, we could: + //if err != nil { + // return + //} + // unblock if we exit while waiting to send! case <-obj.closeChan: return }