gapi: Move separate etcd Watch method into GAPI

This cleans up the API to not have a special case for etcd anymore. In
particular, this also adds the requirement that the GAPI must generate
an event on startup as soon as it is ready to generate a graph.
This commit is contained in:
James Shubin
2017-03-20 15:10:27 -04:00
parent 66d9c7091c
commit 6fd5623b1f
6 changed files with 62 additions and 40 deletions

View File

@@ -2096,11 +2096,12 @@ func Leader(obj *EmbdEtcd) (string, error) {
return "", fmt.Errorf("members map is not current") // not found return "", fmt.Errorf("members map is not current") // not found
} }
// WatchAll returns a channel that outputs a true bool when activity occurs // WatchResources returns a channel that outputs events when exported resources
// change.
// TODO: Filter our watch (on the server side if possible) based on the // TODO: Filter our watch (on the server side if possible) based on the
// collection prefixes and filters that we care about... // collection prefixes and filters that we care about...
func WatchAll(obj *EmbdEtcd) chan bool { func WatchResources(obj *EmbdEtcd) chan error {
ch := make(chan bool, 1) // buffer it so we can measure it ch := make(chan error, 1) // buffer it so we can measure it
path := fmt.Sprintf("/%s/exported/", NS) path := fmt.Sprintf("/%s/exported/", NS)
callback := func(re *RE) error { callback := func(re *RE) error {
// TODO: is this even needed? it used to happen on conn errors // TODO: is this even needed? it used to happen on conn errors
@@ -2118,7 +2119,7 @@ func WatchAll(obj *EmbdEtcd) chan bool {
// this check avoids multiple events all queueing up and then // this check avoids multiple events all queueing up and then
// being released continuously long after the changes stopped // being released continuously long after the changes stopped
// do not block! // do not block!
ch <- true // event ch <- nil // event
} }
return nil return nil
} }

View File

@@ -27,6 +27,12 @@ type World struct {
EmbdEtcd *EmbdEtcd EmbdEtcd *EmbdEtcd
} }
// ResWatch returns a channel which spits out events on possible exported
// resource changes.
func (obj *World) ResWatch() chan error {
return WatchResources(obj.EmbdEtcd)
}
// ResExport exports a list of resources under our hostname namespace. // ResExport exports a list of resources under our hostname namespace.
// Subsequent calls replace the previously set collection atomically. // Subsequent calls replace the previously set collection atomically.
func (obj *World) ResExport(resourceList []resources.Res) error { func (obj *World) ResExport(resourceList []resources.Res) error {

View File

@@ -372,25 +372,12 @@ func (obj *Main) Run() error {
exitchan := make(chan struct{}) // exit on close exitchan := make(chan struct{}) // exit on close
go func() { go func() {
startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
log.Println("Main: Etcd: Starting...")
etcdChan := etcd.WatchAll(EmbdEtcd)
first := true // first loop or not first := true // first loop or not
for { for {
log.Println("Main: Waiting...") log.Println("Main: Waiting...")
// The GAPI should always kick off an event on Next() at
// startup when (and if) it indeed has a graph to share!
select { select {
case <-startChan: // kick the loop once at start
startChan = nil // disable
// pass
case b := <-etcdChan:
if !b { // ignore the message
continue
}
// everything else passes through to cause a compile!
case err, ok := <-gapiChan: case err, ok := <-gapiChan:
if !ok { // channel closed if !ok { // channel closed
if obj.Flags.Debug { if obj.Flags.Debug {
@@ -399,14 +386,21 @@ func (obj *Main) Run() error {
gapiChan = nil // disable it gapiChan = nil // disable it
continue continue
} }
// the gapi lets us send an error to the channel
// this means there was a failure, but not fatal
if err != nil { if err != nil {
obj.Exit(err) // trigger exit log.Printf("Main: Error with graph stream: %v", err)
continue // wait for exitchan // TODO: consider adding an option to
// exit on stream errors...
//obj.Exit(err) // trigger exit
continue // wait for exitchan or another event
} }
if obj.NoWatch { // extra safety for bad GAPI's if obj.NoWatch { // extra safety for bad GAPI's
log.Printf("Main: GAPI stream should be quiet with NoWatch!") // fix the GAPI! log.Printf("Main: GAPI stream should be quiet with NoWatch!") // fix the GAPI!
continue // no stream events should be sent continue // no stream events should be sent
} }
// everything else passes through to cause a compile!
case <-exitchan: case <-exitchan:
return return

View File

@@ -91,21 +91,27 @@ func (obj *GAPI) Next() chan error {
ch <- fmt.Errorf("the puppet GAPI is not initialized") ch <- fmt.Errorf("the puppet GAPI is not initialized")
return return
} }
startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
pChan := puppetChan() pChan := puppetChan()
for { for {
select { select {
case <-startChan: // kick the loop once at start
startChan = nil // disable
// pass
case _, ok := <-pChan: case _, ok := <-pChan:
if !ok { // the channel closed! if !ok { // the channel closed!
return return
} }
log.Printf("Puppet: Generating new graph...") case <-obj.closeChan:
pChan = puppetChan() // TODO: okay to update interval in case it changed? return
select { }
case ch <- nil: // trigger a run (send a msg)
// unblock if we exit while waiting to send! log.Printf("Puppet: Generating new graph...")
case <-obj.closeChan: pChan = puppetChan() // TODO: okay to update interval in case it changed?
return select {
} case ch <- nil: // trigger a run (send a msg)
// unblock if we exit while waiting to send!
case <-obj.closeChan: case <-obj.closeChan:
return return
} }

View File

@@ -75,6 +75,7 @@ const refreshPathToken = "refresh"
// the GAPI to store state and exchange information throughout the cluster. It // the GAPI to store state and exchange information throughout the cluster. It
// is the interface each machine uses to communicate with the rest of the world. // is the interface each machine uses to communicate with the rest of the world.
type World interface { // TODO: is there a better name for this interface? type World interface { // TODO: is there a better name for this interface?
ResWatch() chan error
ResExport([]Res) error ResExport([]Res) error
// FIXME: should this method take a "filter" data struct instead of many args? // FIXME: should this method take a "filter" data struct instead of many args?
ResCollect(hostnameFilter, kindFilter []string) ([]Res, error) ResCollect(hostnameFilter, kindFilter []string) ([]Res, error)

View File

@@ -90,23 +90,37 @@ func (obj *GAPI) Next() chan error {
ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") ch <- fmt.Errorf("yamlgraph: GAPI is not initialized")
return return
} }
startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
watchChan := obj.data.World.ResWatch()
configChan := obj.configWatcher.ConfigWatch(*obj.File) // simple configChan := obj.configWatcher.ConfigWatch(*obj.File) // simple
for { for {
var err error
var ok bool
select { select {
case err, ok := <-configChan: // returns nil events on ok! case <-startChan: // kick the loop once at start
startChan = nil // disable
// pass
case err, ok = <-watchChan:
if !ok {
return
}
case err, ok = <-configChan: // returns nil events on ok!
if !ok { // the channel closed! if !ok { // the channel closed!
return return
} }
log.Printf("yamlgraph: Generating new graph...") case <-obj.closeChan:
select { return
case ch <- err: // trigger a run (send a msg) }
if err != nil {
return log.Printf("yamlgraph: Generating new graph...")
} select {
// unblock if we exit while waiting to send! case ch <- err: // trigger a run (send a msg)
case <-obj.closeChan: // TODO: if the error is really bad, we could:
return //if err != nil {
} // return
//}
// unblock if we exit while waiting to send!
case <-obj.closeChan: case <-obj.closeChan:
return return
} }