From 9cbaa892d3cf7738667cfc81569ef579a5266a24 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Fri, 2 Jun 2017 03:39:42 -0400 Subject: [PATCH] gapi: Allow the GAPI implementer to specify fast and exit This allows the implementer of the GAPI to specify three parameters for every Next message sent on the channel. The Fast parameter tells the agent if it should do the pause quickly or if it should finish the sequence. A quick pause means that it will cause a pause immediately after the currently running resources finish, where as a slow (default) pause will allow the wave of execution to finish. This is usually preferred in scenarios where complex graphs are used where we want each step to complete. The Exit parameter tells the engine to exit, and the Err parameter tells the engine that an error occurred. --- examples/lib/exec-send-recv.go | 12 ++++++++---- examples/lib/libmgmt-subgraph0.go | 12 ++++++++---- examples/lib/libmgmt-subgraph1.go | 12 ++++++++---- examples/lib/libmgmt1.go | 13 ++++++++----- examples/lib/libmgmt2.go | 13 ++++++++----- examples/lib/libmgmt3.go | 13 ++++++++----- gapi/gapi.go | 15 ++++++++++++++- lib/main.go | 24 +++++++++++++++--------- puppet/gapi.go | 16 ++++++++++++---- test/shell/libmgmt-change1.go | 28 ++++++++++++++++------------ yamlgraph/gapi.go | 16 ++++++++++++---- yamlgraph2/gapi.go | 16 ++++++++++++---- 12 files changed, 129 insertions(+), 61 deletions(-) diff --git a/examples/lib/exec-send-recv.go b/examples/lib/exec-send-recv.go index 191fdcea..41f5ad5d 100644 --- a/examples/lib/exec-send-recv.go +++ b/examples/lib/exec-send-recv.go @@ -124,14 +124,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -159,7 +163,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt-subgraph0.go b/examples/lib/libmgmt-subgraph0.go index 31805043..d06291c1 100644 --- a/examples/lib/libmgmt-subgraph0.go +++ b/examples/lib/libmgmt-subgraph0.go @@ -133,14 +133,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -168,7 +172,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt-subgraph1.go b/examples/lib/libmgmt-subgraph1.go index d17c95c3..48f7def9 100644 --- a/examples/lib/libmgmt-subgraph1.go +++ b/examples/lib/libmgmt-subgraph1.go @@ -121,14 +121,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -156,7 +160,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt1.go b/examples/lib/libmgmt1.go index c74cde1a..6622d660 100644 --- a/examples/lib/libmgmt1.go +++ b/examples/lib/libmgmt1.go @@ -88,15 +88,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") - return + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next } startChan := make(chan struct{}) // start signal close(startChan) // kick it off! @@ -123,7 +126,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt2.go b/examples/lib/libmgmt2.go index 77a56b52..dbc36fc8 100644 --- a/examples/lib/libmgmt2.go +++ b/examples/lib/libmgmt2.go @@ -83,15 +83,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") - return + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next } startChan := make(chan struct{}) // start signal close(startChan) // kick it off! @@ -118,7 +121,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/examples/lib/libmgmt3.go b/examples/lib/libmgmt3.go index c91594b5..abfc0ca5 100644 --- a/examples/lib/libmgmt3.go +++ b/examples/lib/libmgmt3.go @@ -127,15 +127,18 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") - return + next := gapi.Next{ + Err: fmt.Errorf("libmgmt: MyGAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next } startChan := make(chan struct{}) // start signal close(startChan) // kick it off! @@ -162,7 +165,7 @@ func (obj *MyGAPI) Next() chan error { log.Printf("libmgmt: Generating new graph...") select { - case ch <- nil: // trigger a run + case ch <- gapi.Next{}: // trigger a run case <-obj.closeChan: return } diff --git a/gapi/gapi.go b/gapi/gapi.go index 0b1c0714..63c5f828 100644 --- a/gapi/gapi.go +++ b/gapi/gapi.go @@ -33,10 +33,23 @@ type Data struct { // NOTE: we can add more fields here if needed by GAPI endpoints } +// Next describes the particular response the GAPI implementer wishes to emit. +type Next struct { + // FIXME: the Fast pause parameter should eventually get replaced with a + // "SwitchMethod" parameter or similar that instead lets the implementer + // choose between fast pause, slow pause, and interrupt. Interrupt could + // be a future extension to the Resource API that lets an Interrupt() be + // called if we want to exit immediately from the CheckApply part of the + // resource for some reason. For now we'll keep this simple with a bool. + Fast bool // run a fast pause to switch? + Exit bool // should we cause the program to exit? (specify err or not) + Err error // if something goes wrong (use with or without exit!) +} + // GAPI is a Graph API that represents incoming graphs and change streams. type GAPI interface { Init(Data) error // initializes the GAPI and passes in useful data Graph() (*pgraph.Graph, error) // returns the most recent pgraph - Next() chan error // returns a stream of switch events + Next() chan Next // returns a stream of switch events Close() error // shutdown the GAPI } diff --git a/lib/main.go b/lib/main.go index d19efd87..940660b8 100644 --- a/lib/main.go +++ b/lib/main.go @@ -380,7 +380,7 @@ func (obj *Main) Run() error { Debug: obj.Flags.Debug, } - var gapiChan chan error // stream events are nil errors + var gapiChan chan gapi.Next // stream events contain some instructions! if obj.GAPI != nil { data := gapi.Data{ Hostname: hostname, @@ -405,8 +405,9 @@ func (obj *Main) Run() error { log.Println("Main: Waiting...") // The GAPI should always kick off an event on Next() at // startup when (and if) it indeed has a graph to share! + fastPause := false select { - case err, ok := <-gapiChan: + case next, ok := <-gapiChan: if !ok { // channel closed if obj.Flags.Debug { log.Printf("Main: GAPI exited") @@ -415,17 +416,22 @@ func (obj *Main) Run() error { continue } + // if we've been asked to exit... + if next.Exit { + obj.Exit(next.Err) // trigger exit + continue // wait for exitchan + } + // the gapi lets us send an error to the channel // this means there was a failure, but not fatal - if err != nil { + if err := next.Err; err != nil { log.Printf("Main: Error with graph stream: %v", err) - // TODO: consider adding an option to - // exit on stream errors... - //obj.Exit(err) // trigger exit - continue // wait for exitchan or another event + continue // wait for another event } // everything else passes through to cause a compile! + fastPause = next.Fast // should we pause fast? + case <-exitchan: return } @@ -438,8 +444,8 @@ func (obj *Main) Run() error { // we need the vertices to be paused to work on them, so // run graph vertex LOCK... if !first { // TODO: we can flatten this check out I think - converger.Pause() // FIXME: add sync wait? - graph.Pause(false) // sync + converger.Pause() // FIXME: add sync wait? + graph.Pause(fastPause) // sync //graph.UnGroup() // FIXME: implement me if needed! } diff --git a/puppet/gapi.go b/puppet/gapi.go index caf2e6fd..dd1b7e48 100644 --- a/puppet/gapi.go +++ b/puppet/gapi.go @@ -75,17 +75,21 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan error { +func (obj *GAPI) Next() chan gapi.Next { puppetChan := func() <-chan time.Time { // helper function return time.Tick(time.Duration(RefreshInterval(obj.PuppetConf)) * time.Second) } - ch := make(chan error) + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("the puppet GAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("the puppet GAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -119,8 +123,12 @@ func (obj *GAPI) Next() chan error { } else { pChan = puppetChan() // TODO: okay to update interval in case it changed? } + next := gapi.Next{ + //Exit: true, // TODO: for permanent shutdown! + Err: nil, + } select { - case ch <- nil: // trigger a run (send a msg) + case ch <- next: // trigger a run (send a msg) // unblock if we exit while waiting to send! case <-obj.closeChan: return diff --git a/test/shell/libmgmt-change1.go b/test/shell/libmgmt-change1.go index 7b9abb4f..e16965d0 100644 --- a/test/shell/libmgmt-change1.go +++ b/test/shell/libmgmt-change1.go @@ -75,35 +75,39 @@ func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *MyGAPI) Next() chan error { - ch := make(chan error) +func (obj *MyGAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("%s: MyGAPI is not initialized", obj.Name) + next := gapi.Next{ + Err: fmt.Errorf("%s: MyGAPI is not initialized", obj.Name), + Exit: true, // exit, b/c programming error? + } + ch <- next return } log.Printf("%s: Generating a bunch of new graphs...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} log.Printf("%s: New graph...", obj.Name) - ch <- nil + ch <- gapi.Next{} time.Sleep(1 * time.Second) log.Printf("%s: Done generating graphs!", obj.Name) }() diff --git a/yamlgraph/gapi.go b/yamlgraph/gapi.go index 91496b5b..eee0256b 100644 --- a/yamlgraph/gapi.go +++ b/yamlgraph/gapi.go @@ -77,14 +77,18 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan error { - ch := make(chan error) +func (obj *GAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("yamlgraph: GAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -122,8 +126,12 @@ func (obj *GAPI) Next() chan error { } log.Printf("yamlgraph: Generating new graph...") + next := gapi.Next{ + //Exit: true, // TODO: for permanent shutdown! + Err: err, + } select { - case ch <- err: // trigger a run (send a msg) + case ch <- next: // trigger a run (send a msg) // TODO: if the error is really bad, we could: //if err != nil { // return diff --git a/yamlgraph2/gapi.go b/yamlgraph2/gapi.go index 6f50029e..2d9b6c01 100644 --- a/yamlgraph2/gapi.go +++ b/yamlgraph2/gapi.go @@ -77,14 +77,18 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan error { - ch := make(chan error) +func (obj *GAPI) Next() chan gapi.Next { + ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { - ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") + next := gapi.Next{ + Err: fmt.Errorf("yamlgraph: GAPI is not initialized"), + Exit: true, // exit, b/c programming error? + } + ch <- next return } startChan := make(chan struct{}) // start signal @@ -122,8 +126,12 @@ func (obj *GAPI) Next() chan error { } log.Printf("yamlgraph: Generating new graph...") + next := gapi.Next{ + //Exit: true, // TODO: for permanent shutdown! + Err: err, + } select { - case ch <- err: // trigger a run (send a msg) + case ch <- next: // trigger a run (send a msg) // TODO: if the error is really bad, we could: //if err != nil { // return