lib, gapi: Next method of GAPI should generate first event

This puts the generation of the initial event into the Next method of
the GAPI. If it does not happen, then we will never get a graph. This is
important because this notifies the GAPI when we're actually ready to
try and generate a graph, rather than blocking on the Graph method if we
have a long compile for example.

This is also required for the etcd watch cleanup.
This commit is contained in:
James Shubin
2017-04-10 03:15:26 -04:00
parent 6fd5623b1f
commit a4858be967
10 changed files with 170 additions and 79 deletions

View File

@@ -89,9 +89,6 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
// Next returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *MyGAPI) Next() chan error { func (obj *MyGAPI) Next() chan error {
if obj.data.NoWatch || obj.Interval <= 0 {
return nil
}
ch := make(chan error) ch := make(chan error)
obj.wg.Add(1) obj.wg.Add(1)
go func() { go func() {
@@ -101,19 +98,32 @@ func (obj *MyGAPI) Next() chan error {
ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized")
return return
} }
startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
ticker := make(<-chan time.Time)
if obj.data.NoStreamWatch || obj.Interval <= 0 {
ticker = nil
} else {
// arbitrarily change graph every interval seconds // arbitrarily change graph every interval seconds
ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) t := time.NewTicker(time.Duration(obj.Interval) * time.Second)
defer ticker.Stop() defer t.Stop()
ticker = t.C
}
for { for {
select { select {
case <-ticker.C: case <-startChan: // kick the loop once at start
log.Printf("libmgmt: Generating new graph...") startChan = nil // disable
select { // pass
case ch <- nil: // trigger a run case <-ticker:
// pass
case <-obj.closeChan: case <-obj.closeChan:
return return
} }
log.Printf("libmgmt: Generating new graph...")
select {
case ch <- nil: // trigger a run
case <-obj.closeChan: case <-obj.closeChan:
return return
} }

View File

@@ -82,9 +82,6 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
// Next returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *MyGAPI) Next() chan error { func (obj *MyGAPI) Next() chan error {
if obj.data.NoWatch || obj.Interval <= 0 {
return nil
}
ch := make(chan error) ch := make(chan error)
obj.wg.Add(1) obj.wg.Add(1)
go func() { go func() {
@@ -94,19 +91,32 @@ func (obj *MyGAPI) Next() chan error {
ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized")
return return
} }
startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
ticker := make(<-chan time.Time)
if obj.data.NoStreamWatch || obj.Interval <= 0 {
ticker = nil
} else {
// arbitrarily change graph every interval seconds // arbitrarily change graph every interval seconds
ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) t := time.NewTicker(time.Duration(obj.Interval) * time.Second)
defer ticker.Stop() defer t.Stop()
ticker = t.C
}
for { for {
select { select {
case <-ticker.C: case <-startChan: // kick the loop once at start
log.Printf("libmgmt: Generating new graph...") startChan = nil // disable
select { // pass
case ch <- nil: // trigger a run case <-ticker:
// pass
case <-obj.closeChan: case <-obj.closeChan:
return return
} }
log.Printf("libmgmt: Generating new graph...")
select {
case ch <- nil: // trigger a run
case <-obj.closeChan: case <-obj.closeChan:
return return
} }

View File

@@ -129,9 +129,6 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
// Next returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *MyGAPI) Next() chan error { func (obj *MyGAPI) Next() chan error {
if obj.data.NoWatch || obj.Interval <= 0 {
return nil
}
ch := make(chan error) ch := make(chan error)
obj.wg.Add(1) obj.wg.Add(1)
go func() { go func() {
@@ -141,19 +138,32 @@ func (obj *MyGAPI) Next() chan error {
ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized")
return return
} }
startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
ticker := make(<-chan time.Time)
if obj.data.NoStreamWatch || obj.Interval <= 0 {
ticker = nil
} else {
// arbitrarily change graph every interval seconds // arbitrarily change graph every interval seconds
ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) t := time.NewTicker(time.Duration(obj.Interval) * time.Second)
defer ticker.Stop() defer t.Stop()
ticker = t.C
}
for { for {
select { select {
case <-ticker.C: case <-startChan: // kick the loop once at start
log.Printf("libmgmt: Generating new graph...") startChan = nil // disable
select { // pass
case ch <- nil: // trigger a run case <-ticker:
// pass
case <-obj.closeChan: case <-obj.closeChan:
return return
} }
log.Printf("libmgmt: Generating new graph...")
select {
case ch <- nil: // trigger a run
case <-obj.closeChan: case <-obj.closeChan:
return return
} }

View File

@@ -28,7 +28,8 @@ type Data struct {
Hostname string // uuid for the host, required for GAPI Hostname string // uuid for the host, required for GAPI
World resources.World World resources.World
Noop bool Noop bool
NoWatch bool NoConfigWatch bool
NoStreamWatch bool
// NOTE: we can add more fields here if needed by GAPI endpoints // NOTE: we can add more fields here if needed by GAPI endpoints
} }

View File

@@ -92,6 +92,9 @@ func run(c *cli.Context) error {
obj.Remotes = c.StringSlice("remote") // FIXME: GAPI-ify somehow? obj.Remotes = c.StringSlice("remote") // FIXME: GAPI-ify somehow?
obj.NoWatch = c.Bool("no-watch") obj.NoWatch = c.Bool("no-watch")
obj.NoConfigWatch = c.Bool("no-config-watch")
obj.NoStreamWatch = c.Bool("no-stream-watch")
obj.Noop = c.Bool("noop") obj.Noop = c.Bool("noop")
obj.Sema = c.Int("sema") obj.Sema = c.Int("sema")
obj.Graphviz = c.String("graphviz") obj.Graphviz = c.String("graphviz")
@@ -237,8 +240,17 @@ func CLI(program, version string, flags Flags) error {
cli.BoolFlag{ cli.BoolFlag{
Name: "no-watch", Name: "no-watch",
Usage: "do not update graph under any switch events",
},
cli.BoolFlag{
Name: "no-config-watch",
Usage: "do not update graph on config switch events",
},
cli.BoolFlag{
Name: "no-stream-watch",
Usage: "do not update graph on stream switch events", Usage: "do not update graph on stream switch events",
}, },
cli.BoolFlag{ cli.BoolFlag{
Name: "noop", Name: "noop",
Usage: "globally force all resources into no-op mode", Usage: "globally force all resources into no-op mode",

View File

@@ -65,7 +65,10 @@ type Main struct {
GAPI gapi.GAPI // graph API interface struct GAPI gapi.GAPI // graph API interface struct
Remotes []string // list of remote graph definitions to run Remotes []string // list of remote graph definitions to run
NoWatch bool // do not update graph on watched graph definition file changes NoWatch bool // do not change graph under any circumstances
NoConfigWatch bool // do not update graph due to config changes
NoStreamWatch bool // do not update graph due to stream changes
Noop bool // globally force all resources into no-op mode Noop bool // globally force all resources into no-op mode
Sema int // add a semaphore with this lock count to each resource Sema int // add a semaphore with this lock count to each resource
Graphviz string // output file for graphviz data Graphviz string // output file for graphviz data
@@ -112,6 +115,15 @@ func (obj *Main) Init() error {
return fmt.Errorf("choosing a prefix and the request for a tmp prefix is illogical") return fmt.Errorf("choosing a prefix and the request for a tmp prefix is illogical")
} }
// if we've turned off watching, then be explicit and disable them all!
// if all the watches are disabled, then it's equivalent to no watching
if obj.NoWatch {
obj.NoConfigWatch = true
obj.NoStreamWatch = true
} else if obj.NoConfigWatch && obj.NoStreamWatch {
obj.NoWatch = true
}
obj.idealClusterSize = uint16(obj.IdealClusterSize) obj.idealClusterSize = uint16(obj.IdealClusterSize)
if obj.IdealClusterSize < 0 { // value is undefined, set to the default if obj.IdealClusterSize < 0 { // value is undefined, set to the default
obj.idealClusterSize = etcd.DefaultIdealClusterSize obj.idealClusterSize = etcd.DefaultIdealClusterSize
@@ -361,11 +373,14 @@ func (obj *Main) Run() error {
Hostname: hostname, Hostname: hostname,
World: world, World: world,
Noop: obj.Noop, Noop: obj.Noop,
NoWatch: obj.NoWatch, //NoWatch: obj.NoWatch,
NoConfigWatch: obj.NoConfigWatch,
NoStreamWatch: obj.NoStreamWatch,
} }
if err := obj.GAPI.Init(data); err != nil { if err := obj.GAPI.Init(data); err != nil {
obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err)) obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err))
} else if !obj.NoWatch { } else {
// this must generate at least one event for it to work
gapiChan = obj.GAPI.Next() // stream of graph switch events! gapiChan = obj.GAPI.Next() // stream of graph switch events!
} }
} }
@@ -396,10 +411,6 @@ func (obj *Main) Run() error {
//obj.Exit(err) // trigger exit //obj.Exit(err) // trigger exit
continue // wait for exitchan or another event continue // wait for exitchan or another event
} }
if obj.NoWatch { // extra safety for bad GAPI's
log.Printf("Main: GAPI stream should be quiet with NoWatch!") // fix the GAPI!
continue // no stream events should be sent
}
// everything else passes through to cause a compile! // everything else passes through to cause a compile!
case <-exitchan: case <-exitchan:
@@ -509,7 +520,7 @@ func (obj *Main) Run() error {
configWatcher := recwatch.NewConfigWatcher() configWatcher := recwatch.NewConfigWatcher()
configWatcher.Flags = recwatch.Flags{Debug: obj.Flags.Debug} configWatcher.Flags = recwatch.Flags{Debug: obj.Flags.Debug}
events := configWatcher.Events() events := configWatcher.Events()
if !obj.NoWatch { if !obj.NoWatch { // FIXME: fit this into a clean GAPI?
configWatcher.Add(obj.Remotes...) // add all the files... configWatcher.Add(obj.Remotes...) // add all the files...
} else { } else {
events = nil // signal that no-watch is true events = nil // signal that no-watch is true

View File

@@ -76,9 +76,6 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
// Next returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *GAPI) Next() chan error { func (obj *GAPI) Next() chan error {
if obj.data.NoWatch {
return nil
}
puppetChan := func() <-chan time.Time { // helper function puppetChan := func() <-chan time.Time { // helper function
return time.Tick(time.Duration(RefreshInterval(obj.PuppetConf)) * time.Second) return time.Tick(time.Duration(RefreshInterval(obj.PuppetConf)) * time.Second)
} }
@@ -93,7 +90,16 @@ func (obj *GAPI) Next() chan error {
} }
startChan := make(chan struct{}) // start signal startChan := make(chan struct{}) // start signal
close(startChan) // kick it off! close(startChan) // kick it off!
pChan := puppetChan()
pChan := make(<-chan time.Time)
// NOTE: we don't look at obj.data.NoConfigWatch since emulating
// puppet means we do not switch graphs on code changes anyways.
if obj.data.NoStreamWatch {
pChan = nil
} else {
pChan = puppetChan()
}
for { for {
select { select {
case <-startChan: // kick the loop once at start case <-startChan: // kick the loop once at start
@@ -108,7 +114,11 @@ func (obj *GAPI) Next() chan error {
} }
log.Printf("Puppet: Generating new graph...") log.Printf("Puppet: Generating new graph...")
if obj.data.NoStreamWatch {
pChan = nil
} else {
pChan = puppetChan() // TODO: okay to update interval in case it changed? pChan = puppetChan() // TODO: okay to update interval in case it changed?
}
select { select {
case ch <- nil: // trigger a run (send a msg) case ch <- nil: // trigger a run (send a msg)
// unblock if we exit while waiting to send! // unblock if we exit while waiting to send!

View File

@@ -74,9 +74,6 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
// Next returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *MyGAPI) Next() chan error { func (obj *MyGAPI) Next() chan error {
if obj.data.NoWatch || obj.Interval <= 0 {
return nil
}
ch := make(chan error) ch := make(chan error)
obj.wg.Add(1) obj.wg.Add(1)
go func() { go func() {

View File

@@ -78,9 +78,6 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
// Next returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *GAPI) Next() chan error { func (obj *GAPI) Next() chan error {
if obj.data.NoWatch {
return nil
}
ch := make(chan error) ch := make(chan error)
obj.wg.Add(1) obj.wg.Add(1)
go func() { go func() {
@@ -92,8 +89,19 @@ func (obj *GAPI) Next() chan error {
} }
startChan := make(chan struct{}) // start signal startChan := make(chan struct{}) // start signal
close(startChan) // kick it off! close(startChan) // kick it off!
watchChan := obj.data.World.ResWatch()
configChan := obj.configWatcher.ConfigWatch(*obj.File) // simple watchChan, configChan := make(chan error), make(chan error)
if obj.data.NoConfigWatch {
configChan = nil
} else {
configChan = obj.configWatcher.ConfigWatch(*obj.File) // simple
}
if obj.data.NoStreamWatch {
watchChan = nil
} else {
watchChan = obj.data.World.ResWatch()
}
for { for {
var err error var err error
var ok bool var ok bool

View File

@@ -78,9 +78,6 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) {
// Next returns nil errors every time there could be a new graph. // Next returns nil errors every time there could be a new graph.
func (obj *GAPI) Next() chan error { func (obj *GAPI) Next() chan error {
if obj.data.NoWatch {
return nil
}
ch := make(chan error) ch := make(chan error)
obj.wg.Add(1) obj.wg.Add(1)
go func() { go func() {
@@ -90,23 +87,48 @@ func (obj *GAPI) Next() chan error {
ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") ch <- fmt.Errorf("yamlgraph: GAPI is not initialized")
return return
} }
configChan := obj.configWatcher.ConfigWatch(*obj.File) // simple startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
watchChan, configChan := make(chan error), make(chan error)
if obj.data.NoConfigWatch {
configChan = nil
} else {
configChan = obj.configWatcher.ConfigWatch(*obj.File) // simple
}
if obj.data.NoStreamWatch {
watchChan = nil
} else {
watchChan = obj.data.World.ResWatch()
}
for { for {
var err error
var ok bool
select { select {
case err, ok := <-configChan: // returns nil events on ok! case <-startChan: // kick the loop once at start
startChan = nil // disable
// pass
case err, ok = <-watchChan:
if !ok {
return
}
case err, ok = <-configChan: // returns nil events on ok!
if !ok { // the channel closed! if !ok { // the channel closed!
return return
} }
log.Printf("yamlgraph: Generating new graph...")
select {
case ch <- err: // trigger a run (send a msg)
if err != nil {
return
}
// unblock if we exit while waiting to send!
case <-obj.closeChan: case <-obj.closeChan:
return return
} }
log.Printf("yamlgraph: Generating new graph...")
select {
case ch <- err: // trigger a run (send a msg)
// TODO: if the error is really bad, we could:
//if err != nil {
// return
//}
// unblock if we exit while waiting to send!
case <-obj.closeChan: case <-obj.closeChan:
return return
} }