diff --git a/examples/lib/exec-send-recv.go b/examples/lib/exec-send-recv.go index 191fdcea..41f5ad5d 100644 --- a/examples/lib/exec-send-recv.go +++ b/examples/lib/exec-send-recv.go @@ -124,14 +124,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -159,7 +163,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt-subgraph0.go b/examples/lib/libmgmt-subgraph0.go index 31805043..d06291c1 100644 --- a/examples/lib/libmgmt-subgraph0.go +++ b/examples/lib/libmgmt-subgraph0.go @@ -133,14 +133,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -168,7 +172,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt-subgraph1.go b/examples/lib/libmgmt-subgraph1.go index d17c95c3..48f7def9 100644 --- a/examples/lib/libmgmt-subgraph1.go +++ b/examples/lib/libmgmt-subgraph1.go @@ -121,14 +121,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -156,7 +160,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt1.go b/examples/lib/libmgmt1.go index c74cde1a..6622d660 100644 --- a/examples/lib/libmgmt1.go +++ b/examples/lib/libmgmt1.go @@ -88,15 +88,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") - return + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next } startChan := make(chan struct{}) // start signal close(startChan) // kick it off! @@ -123,7 +126,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt2.go b/examples/lib/libmgmt2.go index 77a56b52..dbc36fc8 100644 --- a/examples/lib/libmgmt2.go +++ b/examples/lib/libmgmt2.go @@ -83,15 +83,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") - return + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next } startChan := make(chan struct{}) // start signal close(startChan) // kick it off! @@ -118,7 +121,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt3.go b/examples/lib/libmgmt3.go index c91594b5..abfc0ca5 100644 --- a/examples/lib/libmgmt3.go +++ b/examples/lib/libmgmt3.go @@ -127,15 +127,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") - return + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next } startChan := make(chan struct{}) // start signal close(startChan) // kick it off! @@ -162,7 +165,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/gapi/gapi.go b/gapi/gapi.go index 0b1c0714..63c5f828 100644 --- a/gapi/gapi.go +++ b/gapi/gapi.go @@ -33,10 +33,23 @@ type Data struct { // NOTE: we can add more fields here if needed by GAPI endpoints } +// Next describes the particular response the GAPI implementer wishes to emit. +type Next struct { + // FIXME: the Fast pause parameter should eventually get replaced with a + // "SwitchMethod" parameter or similar that instead lets the implementer + // choose between fast pause, slow pause, and interrupt. Interrupt could + // be a future extension to the Resource API that lets an Interrupt() be + // called if we want to exit immediately from the CheckApply part of the + // resource for some reason. For now we'll keep this simple with a bool. + Fast bool // run a fast pause to switch? + Exit bool // should we cause the program to exit? (specify err or not) + Err error // if something goes wrong (use with or without exit!) +} + // GAPI is a Graph API that represents incoming graphs and change streams. type GAPI interface { Init(Data) error // initializes the GAPI and passes in useful data Graph() (*pgraph.Graph, error) // returns the most recent pgraph - Next() chan error // returns a stream of switch events + Next() chan Next // returns a stream of switch events Close() error // shutdown the GAPI } diff --git a/lib/main.go b/lib/main.go index d19efd87..940660b8 100644 --- a/lib/main.go +++ b/lib/main.go @@ -380,7 +380,7 @@ func (obj *Main) Run() error { Debug: obj.Flags.Debug, } - var gapiChan chan error // stream events are nil errors + var gapiChan chan gapi.Next // stream events contain some instructions! if obj.GAPI != nil { data := gapi.Data{ Hostname: hostname, @@ -405,8 +405,9 @@ func (obj *Main) Run() error { log.Println("Main: Waiting...") // The GAPI should always kick off an event on Next() at // startup when (and if) it indeed has a graph to share! + fastPause := false select { - case err, ok := <-gapiChan: + case next, ok := <-gapiChan: if !ok { // channel closed if obj.Flags.Debug { log.Printf("Main: GAPI exited") @@ -415,17 +416,22 @@ func (obj *Main) Run() error { continue } + // if we've been asked to exit... + if next.Exit { + obj.Exit(next.Err) // trigger exit + continue // wait for exitchan + } + // the gapi lets us send an error to the channel // this means there was a failure, but not fatal - if err != nil { + if err := next.Err; err != nil { log.Printf("Main: Error with graph stream: %v", err) - // TODO: consider adding an option to - // exit on stream errors... - //obj.Exit(err) // trigger exit - continue // wait for exitchan or another event + continue // wait for another event } // everything else passes through to cause a compile! + fastPause = next.Fast // should we pause fast? + case <-exitchan: return } @@ -438,8 +444,8 @@ func (obj *Main) Run() error { // we need the vertices to be paused to work on them, so // run graph vertex LOCK... if !first { // TODO: we can flatten this check out I think - converger.Pause() // FIXME: add sync wait? - graph.Pause(false) // sync + converger.Pause() // FIXME: add sync wait? + graph.Pause(fastPause) // sync //graph.UnGroup() // FIXME: implement me if needed! } diff --git a/puppet/gapi.go b/puppet/gapi.go index caf2e6fd..dd1b7e48 100644 --- a/puppet/gapi.go +++ b/puppet/gapi.go @@ -75,17 +75,21 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan error { +func (obj *GAPI) Next() chan gapi.Next { puppetChan := func() <-chan time.Time { // helper function return time.Tick(time.Duration(RefreshInterval(obj.PuppetConf)) * time.Second) } - ch := make(chan error) + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("the puppet GAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("the puppet GAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -119,8 +123,12 @@ func (obj *GAPI) Next() chan error { } else { pChan = puppetChan() // TODO: okay to update interval in case it changed? } + next := gapi.Next{ + //Exit: true, // TODO: for permanent shutdown! + Err: nil, + } select { - case ch <- nil: // trigger a run (send a msg) + case ch <- next: // trigger a run (send a msg) // unblock if we exit while waiting to send! case <-obj.closeChan: return diff --git a/test/shell/libmgmt-change1.go b/test/shell/libmgmt-change1.go index 7b9abb4f..e16965d0 100644 --- a/test/shell/libmgmt-change1.go +++ b/test/shell/libmgmt-change1.go @@ -75,35 +75,39 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("%s: MyGAPI is not initialized", obj.Name) + next := gapi.Next{ + Err: fmt.Errorf("%s: MyGAPI is not initialized", obj.Name), + Exit: true, // exit, b/c programming error? + } + ch <- next return } log.Printf("%s: Generating a bunch of new graphs...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} time.Sleep(1 * time.Second) log.Printf("%s: Done generating graphs!", obj.Name) }() diff --git a/yamlgraph/gapi.go b/yamlgraph/gapi.go index 91496b5b..eee0256b 100644 --- a/yamlgraph/gapi.go +++ b/yamlgraph/gapi.go @@ -77,14 +77,18 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan error { - ch := make(chan error) +func (obj *GAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("yamlgraph: GAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -122,8 +126,12 @@ func (obj *GAPI) Next() chan error { } log.Printf("yamlgraph: Generating new graph...") + next := gapi.Next{ + //Exit: true, // TODO: for permanent shutdown! + Err: err, + } select { - case ch <- err: // trigger a run (send a msg) + case ch <- next: // trigger a run (send a msg) // TODO: if the error is really bad, we could: //if err != nil { // return diff --git a/yamlgraph2/gapi.go b/yamlgraph2/gapi.go index 6f50029e..2d9b6c01 100644 --- a/yamlgraph2/gapi.go +++ b/yamlgraph2/gapi.go @@ -77,14 +77,18 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan error { - ch := make(chan error) +func (obj *GAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("yamlgraph: GAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -122,8 +126,12 @@ func (obj *GAPI) Next() chan error { } log.Printf("yamlgraph: Generating new graph...") + next := gapi.Next{ + //Exit: true, // TODO: for permanent shutdown! + Err: err, + } select { - case ch <- err: // trigger a run (send a msg) + case ch <- next: // trigger a run (send a msg) // TODO: if the error is really bad, we could: //if err != nil { // return