diff --git a/gapi/empty/empty.go b/gapi/empty/empty.go index e25e4256..47c31c4e 100644 --- a/gapi/empty/empty.go +++ b/gapi/empty/empty.go @@ -30,6 +30,7 @@ package empty import ( + "context" "fmt" "sync" @@ -62,8 +63,8 @@ type GAPI struct { data *gapi.Data initialized bool - closeChan chan struct{} wg *sync.WaitGroup // sync group for tunnel go routines + err error } // Cli takes an *Info struct, and returns our deploy if activated, and if there @@ -91,7 +92,6 @@ func (obj *GAPI) Init(data *gapi.Data) error { return fmt.Errorf("already initialized") } obj.data = data // store for later - obj.closeChan = make(chan struct{}) obj.wg = &sync.WaitGroup{} obj.initialized = true return nil @@ -104,64 +104,58 @@ func (obj *GAPI) Info() *gapi.InfoResult { } } -// Graph returns a current Graph. -func (obj *GAPI) Graph() (*pgraph.Graph, error) { - if !obj.initialized { - return nil, fmt.Errorf("%s: GAPI is not initialized", Name) - } - - obj.data.Logf("generating empty graph...") - g, err := pgraph.NewGraph("empty") - if err != nil { - return nil, err - } - - return g, nil -} - // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan gapi.Next { +func (obj *GAPI) Next(ctx context.Context) chan gapi.Next { ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { + err := fmt.Errorf("%s: GAPI is not initialized", Name) next := gapi.Next{ - Err: fmt.Errorf("%s: GAPI is not initialized", Name), + Err: err, Exit: true, // exit, b/c programming error? } select { case ch <- next: - case <-obj.closeChan: + case <-ctx.Done(): + obj.err = ctx.Err() + return } + obj.err = err + return + } + + obj.data.Logf("generating empty graph...") + g, err := pgraph.NewGraph("empty") + if err != nil { + obj.err = err return } // send only one event next := gapi.Next{ - Exit: false, - Err: nil, + Graph: g, + Exit: false, + Err: nil, } select { case ch <- next: // trigger a run (send a msg) // pass // unblock if we exit while waiting to send! - case <-obj.closeChan: + case <-ctx.Done(): + obj.err = ctx.Err() return } }() return ch } -// Close shuts down the lang GAPI. -func (obj *GAPI) Close() error { - if !obj.initialized { - return fmt.Errorf("%s: GAPI is not initialized", Name) - } - close(obj.closeChan) +// Err will contain the last error when Next shuts down. It waits for all the +// running processes to exit before it returns. +func (obj *GAPI) Err() error { obj.wg.Wait() - obj.initialized = false // closed = true - return nil + return obj.err } diff --git a/gapi/gapi.go b/gapi/gapi.go index d6f30e0f..8d2a0cda 100644 --- a/gapi/gapi.go +++ b/gapi/gapi.go @@ -31,6 +31,7 @@ package gapi import ( + "context" "encoding/gob" "fmt" @@ -111,6 +112,9 @@ type Data struct { // Next describes the particular response the GAPI implementer wishes to emit. type Next struct { + // Graph returns the current resource graph. + Graph *pgraph.Graph + // FIXME: the Fast pause parameter should eventually get replaced with a // "SwitchMethod" parameter or similar that instead lets the implementer // choose between fast pause, slow pause, and interrupt. Interrupt could @@ -144,18 +148,11 @@ type GAPI interface { // Info returns some data about the GAPI implementation. Info() *InfoResult - // Graph returns the most recent pgraph. This is called by the engine on - // every event from Next(). - Graph() (*pgraph.Graph, error) + // Next returns a stream of events. Each next event contains a resource + // graph. + Next(ctx context.Context) chan Next - // Next returns a stream of switch events. The engine will run Graph() - // to build a new graph after every Next event. - // TODO: add context for shutting down to the input and change Close to Cleanup - Next() chan Next - - // Close shuts down the GAPI. It asks the GAPI to close, and must cause - // Next() to unblock even if is currently blocked and waiting to send a - // new event. - // TODO: change Close to Cleanup - Close() error + // Err will contain the last error when Next shuts down. It waits for + // all the running processes to exit before it returns. + Err() error } diff --git a/lang/gapi/gapi.go b/lang/gapi/gapi.go index 11e0d04f..9b4992c0 100644 --- a/lang/gapi/gapi.go +++ b/lang/gapi/gapi.go @@ -77,7 +77,6 @@ type GAPI struct { Data *lang.Data lang *lang.Lang // lang struct - wgRun *sync.WaitGroup ctx context.Context cancel func() reterr error @@ -86,8 +85,8 @@ type GAPI struct { // can not be used inside the Cli(...) method. data *gapi.Data initialized bool - closeChan chan struct{} wg *sync.WaitGroup // sync group for tunnel go routines + err error } // Cli takes an *Info struct, and returns our deploy if activated, and if there @@ -511,17 +510,9 @@ func (obj *GAPI) Init(data *gapi.Data) error { return fmt.Errorf("the InputURI param must be specified") } obj.data = data // store for later - obj.closeChan = make(chan struct{}) obj.wg = &sync.WaitGroup{} obj.initialized = true - return nil -} -// LangInit is a wrapper around the lang Init method. -func (obj *GAPI) LangInit(ctx context.Context) error { - if obj.lang != nil { - return nil // already ran init, close first! - } if obj.InputURI == "-" { return fmt.Errorf("stdin passthrough is not supported at this time") } @@ -548,39 +539,11 @@ func (obj *GAPI) LangInit(ctx context.Context) error { obj.data.Logf(Name+": "+format, v...) }, } - if err := lang.Init(ctx); err != nil { + if err := lang.Init(context.TODO()); err != nil { // XXX: CTX? return errwrap.Wrapf(err, "can't init the lang") } obj.lang = lang // once we can't fail, store the struct... - // XXX: I'm certain I've probably got a deadlock or race somewhere here - // or in lib/main.go so we'll fix it with an API fixup and rewrite soon - obj.wgRun = &sync.WaitGroup{} - obj.ctx, obj.cancel = context.WithCancel(context.Background()) - obj.wgRun.Add(1) - go func() { - defer obj.wgRun.Done() - obj.reterr = obj.lang.Run(obj.ctx) - if obj.reterr == nil { - return - } - // XXX: Temporary extra logging for catching bugs! - obj.data.Logf(Name+": %+v", obj.reterr) - }() - - return nil -} - -// LangClose is a wrapper around the lang Close method. -func (obj *GAPI) LangClose() error { - if obj.lang != nil { - obj.cancel() - obj.wgRun.Wait() - err := obj.lang.Cleanup() - err = errwrap.Append(err, obj.reterr) // from obj.lang.Run - obj.lang = nil // clear it to avoid double closing - return errwrap.Wrapf(err, "can't close the lang") // nil passthrough - } return nil } @@ -591,33 +554,21 @@ func (obj *GAPI) Info() *gapi.InfoResult { } } -// Graph returns a current Graph. -func (obj *GAPI) Graph() (*pgraph.Graph, error) { - if !obj.initialized { - return nil, fmt.Errorf("%s: GAPI is not initialized", Name) - } - - g, err := obj.lang.Interpret() - if err != nil { - return nil, errwrap.Wrapf(err, "%s: interpret error", Name) - } - - return g, nil -} - // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan gapi.Next { - // TODO: This ctx stuff is temporary until we improve the Next() API. - ctx, cancel := context.WithCancel(context.Background()) +func (obj *GAPI) Next(ctx context.Context) chan gapi.Next { + ch := make(chan gapi.Next) + obj.wg.Add(1) go func() { + defer obj.lang.Cleanup() // after everyone closes + defer obj.wg.Wait() // wait before cleanup defer obj.wg.Done() - select { - case <-obj.closeChan: - cancel() // close the ctx to unblock type unification - } + err := obj.lang.Run(ctx) + // XXX: Temporary extra logging for catching bugs! + obj.data.Logf(Name+": %+v", err) + obj.err = err }() - ch := make(chan gapi.Next) + obj.wg.Add(1) go func() { defer obj.wg.Done() @@ -629,99 +580,38 @@ func (obj *GAPI) Next() chan gapi.Next { } select { case ch <- next: - case <-obj.closeChan: + case <-ctx.Done(): + obj.err = ctx.Err() } return } - startChan := make(chan struct{}) // start signal - close(startChan) // kick it off! - streamChan := make(<-chan error) - //defer obj.LangClose() // close any old lang + streamChan := obj.lang.Stream(ctx) var ok bool for { - var err error - var langSwap bool // do we need to swap the lang object? + var graph *pgraph.Graph select { - // TODO: this should happen in ConfigWatch instead :) - case <-startChan: // kick the loop once at start - startChan = nil // disable - err = nil // set nil as the message to send - langSwap = true - - case err, ok = <-streamChan: // a variable changed + case graph, ok = <-streamChan: // a variable changed if !ok { // the channel closed! return } - case <-obj.closeChan: + case <-ctx.Done(): + obj.err = ctx.Err() return } obj.data.Logf("generating new graph...") - // skip this to pass through the err if present - // XXX: redo this old garbage code - if langSwap && err == nil { - obj.data.Logf("swap!") - // run up to these three but fail on err - if e := obj.LangClose(); e != nil { // close any old lang - err = e // pass through the err - } else if e := obj.LangInit(ctx); e != nil { // init the new one! - err = e // pass through the err - - // Always run LangClose after LangInit - // when done. This is currently needed - // because we should tell the lang obj - // to shut down all the running facts. - if e := obj.LangClose(); e != nil { - err = errwrap.Append(err, e) // list of errors - } - } else { - - if obj.data.NoStreamWatch { // TODO: do we want to allow this for the lang? - obj.data.Logf("warning: language will not stream") - // send only one event - limitChan := make(chan error) - obj.wg.Add(1) - go func() { - defer obj.wg.Done() - defer close(limitChan) - select { - // only one - case err, ok := <-obj.lang.Stream(): - if !ok { - return - } - select { - case limitChan <- err: - case <-obj.closeChan: - return - } - case <-obj.closeChan: - return - } - }() - streamChan = limitChan - } else { - // stream for lang events - streamChan = obj.lang.Stream() // update stream - } - continue // wait for stream to trigger - } - } - next := gapi.Next{ - Exit: err != nil, // TODO: do we want to shutdown? - Err: err, + Graph: graph, } select { case ch <- next: // trigger a run (send a msg) - if err != nil { - return - } + // unblock if we exit while waiting to send! - case <-obj.closeChan: + case <-ctx.Done(): + obj.err = ctx.Err() return } } @@ -729,14 +619,9 @@ func (obj *GAPI) Next() chan gapi.Next { return ch } -// Close shuts down the lang GAPI. -func (obj *GAPI) Close() error { - if !obj.initialized { - return fmt.Errorf("%s: GAPI is not initialized", Name) - } - close(obj.closeChan) +// Err will contain the last error when Next shuts down. It waits for all the +// running processes to exit before it returns. +func (obj *GAPI) Err() error { obj.wg.Wait() - obj.LangClose() // close lang, esp. if blocked in Stream() wait - obj.initialized = false // closed = true - return nil + return obj.err } diff --git a/lib/main.go b/lib/main.go index 371a2fba..26e07452 100644 --- a/lib/main.go +++ b/lib/main.go @@ -703,6 +703,10 @@ func (obj *Main) Run() error { // The GAPI should always kick off an event on Next() at // startup when (and if) it indeed has a graph to share! fastPause := false + + var next gapi.Next // active GAPI next struct + var ok bool + select { case deploy, ok := <-deployChan: if !ok { // channel closed @@ -711,10 +715,6 @@ func (obj *Main) Run() error { if gapiImpl != nil { // currently running... gapiChan = nil - if err := gapiImpl.Close(); err != nil { - err = errwrap.Wrapf(err, "the gapi closed poorly") - Logf("deploy: gapi: final close failed: %+v", err) - } } if started { @@ -741,10 +741,6 @@ func (obj *Main) Run() error { if gapiImpl != nil { // currently running... gapiChan = nil - if err := gapiImpl.Close(); err != nil { - err = errwrap.Wrapf(err, "the gapi closed poorly") - Logf("deploy: gapi: close failed: %+v", err) - } } gapiImpl = gapiObj // copy it to active @@ -770,17 +766,18 @@ func (obj *Main) Run() error { if err := gapiImpl.Init(data); err != nil { Logf("gapi: init failed: %+v", err) // TODO: consider running previous GAPI? - } else { - if obj.Debug { - Logf("gapi: next...") - } - // this must generate at least one event for it to work - gapiChan = gapiImpl.Next() // stream of graph switch events! - gapiInfoResult = gapiImpl.Info() + continue } + + if obj.Debug { + Logf("gapi: next...") + } + // this must generate at least one event for it to work + gapiChan = gapiImpl.Next(exitCtx) // stream of graph switch events! + gapiInfoResult = gapiImpl.Info() continue - case next, ok := <-gapiChan: + case next, ok = <-gapiChan: if !ok { // channel closed if obj.Debug { Logf("gapi exited") @@ -819,11 +816,7 @@ func (obj *Main) Run() error { // make the graph from yaml, lib, puppet->yaml, or mcl! timing = time.Now() - newGraph, err := gapiImpl.Graph() // generate graph! - if err != nil { - Logf("error creating new graph: %+v", err) - continue - } + newGraph := next.Graph // get graph! Logf("new graph took: %s", time.Since(timing)) if obj.Debug { Logf("new graph: %+v", newGraph) @@ -1012,10 +1005,6 @@ func (obj *Main) Run() error { // block gapi until a newDeploy comes in... if gapiImpl != nil { // currently running... gapiChan = nil - if err := gapiImpl.Close(); err != nil { - err = errwrap.Wrapf(err, "the gapi closed poorly") - Logf("deploy: gapi: close failed: %+v", err) - } } continue // stay paused } diff --git a/puppet/gapi.go b/puppet/gapi.go index a5303f2b..f796848e 100644 --- a/puppet/gapi.go +++ b/puppet/gapi.go @@ -30,6 +30,7 @@ package puppet import ( + "context" "fmt" "os" "strings" @@ -76,8 +77,8 @@ type GAPI struct { puppetConf string data *gapi.Data initialized bool - closeChan chan struct{} wg sync.WaitGroup + err error } // Cli takes an *Info struct, and returns our deploy if activated, and if there @@ -239,7 +240,6 @@ func (obj *GAPI) Init(data *gapi.Data) error { } } - obj.closeChan = make(chan struct{}) obj.initialized = true return nil } @@ -251,8 +251,8 @@ func (obj *GAPI) Info() *gapi.InfoResult { } } -// Graph returns a current Graph. -func (obj *GAPI) Graph() (*pgraph.Graph, error) { +// graph returns a current Graph. +func (obj *GAPI) graph() (*pgraph.Graph, error) { if !obj.initialized { return nil, fmt.Errorf("%s: GAPI is not initialized", Name) } @@ -268,21 +268,29 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan gapi.Next { +func (obj *GAPI) Next(ctx context.Context) chan gapi.Next { puppetChan := func() <-chan time.Time { // helper function return time.Tick(time.Duration(obj.refreshInterval()) * time.Second) } ch := make(chan gapi.Next) obj.wg.Add(1) go func() { + defer obj.cleanup() defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { + err := fmt.Errorf("%s: GAPI is not initialized", Name) next := gapi.Next{ - Err: fmt.Errorf("%s: GAPI is not initialized", Name), + Err: err, Exit: true, // exit, b/c programming error? } - ch <- next + select { + case ch <- next: + case <-ctx.Done(): + obj.err = ctx.Err() + return + } + obj.err = err return } startChan := make(chan struct{}) // start signal @@ -304,24 +312,34 @@ func (obj *GAPI) Next() chan gapi.Next { if !ok { // the channel closed! return } - case <-obj.closeChan: + case <-ctx.Done(): + obj.err = ctx.Err() return } - obj.data.Logf("generating new graph...") if obj.data.NoStreamWatch { pChan = nil } else { pChan = puppetChan() // TODO: okay to update interval in case it changed? } + + obj.data.Logf("generating new graph...") + g, err := obj.graph() + if err != nil { + obj.err = err + return + } + next := gapi.Next{ + Graph: g, //Exit: true, // TODO: for permanent shutdown! Err: nil, } select { case ch <- next: // trigger a run (send a msg) // unblock if we exit while waiting to send! - case <-obj.closeChan: + case <-ctx.Done(): + obj.err = ctx.Err() return } } @@ -329,8 +347,15 @@ func (obj *GAPI) Next() chan gapi.Next { return ch } -// Close shuts down the Puppet GAPI. -func (obj *GAPI) Close() error { +// Err will contain the last error when Next shuts down. It waits for all the +// running processes to exit before it returns. +func (obj *GAPI) Err() error { + obj.wg.Wait() + return obj.err +} + +// cleanup cleans up the Puppet GAPI. +func (obj *GAPI) cleanup() error { if !obj.initialized { return fmt.Errorf("%s: GAPI is not initialized", Name) } @@ -347,7 +372,6 @@ func (obj *GAPI) Close() error { os.Remove(obj.puppetConf) } - close(obj.closeChan) obj.wg.Wait() obj.initialized = false // closed = true return nil diff --git a/yamlgraph/gapi.go b/yamlgraph/gapi.go index d823359b..c9b4667a 100644 --- a/yamlgraph/gapi.go +++ b/yamlgraph/gapi.go @@ -30,6 +30,7 @@ package yamlgraph import ( + "context" "fmt" "sync" @@ -60,6 +61,7 @@ type GAPI struct { initialized bool closeChan chan struct{} wg sync.WaitGroup // sync group for tunnel go routines + err error } // Cli takes an *Info struct, and returns our deploy if activated, and if there @@ -120,8 +122,8 @@ func (obj *GAPI) Info() *gapi.InfoResult { } } -// Graph returns a current Graph. -func (obj *GAPI) Graph() (*pgraph.Graph, error) { +// graph returns a current Graph. +func (obj *GAPI) graph() (*pgraph.Graph, error) { if !obj.initialized { return nil, fmt.Errorf("%s: GAPI is not initialized", Name) } @@ -154,23 +156,27 @@ func (obj *GAPI) Graph() (*pgraph.Graph, error) { } // Next returns nil errors every time there could be a new graph. -func (obj *GAPI) Next() chan gapi.Next { +func (obj *GAPI) Next(ctx context.Context) chan gapi.Next { ch := make(chan gapi.Next) obj.wg.Add(1) go func() { defer obj.wg.Done() defer close(ch) // this will run before the obj.wg.Done() if !obj.initialized { + err := fmt.Errorf("%s: GAPI is not initialized", Name) next := gapi.Next{ - Err: fmt.Errorf("%s: GAPI is not initialized", Name), + Err: err, Exit: true, // exit, b/c programming error? } - ch <- next + select { + case ch <- next: + case <-ctx.Done(): + obj.err = ctx.Err() + return + } + obj.err = err return } - // FIXME: add timeout to context - //ctx, cancel := context.WithCancel(context.Background()) - //defer cancel() startChan := make(chan struct{}) // start signal close(startChan) // kick it off! @@ -203,12 +209,19 @@ func (obj *GAPI) Next() chan gapi.Next { if !ok { return } - case <-obj.closeChan: + case <-ctx.Done(): return } obj.data.Logf("generating new graph...") + g, err := obj.graph() + if err != nil { + obj.err = err + return + } + next := gapi.Next{ + Graph: g, //Exit: true, // TODO: for permanent shutdown! Err: err, } @@ -219,7 +232,7 @@ func (obj *GAPI) Next() chan gapi.Next { // return //} // unblock if we exit while waiting to send! - case <-obj.closeChan: + case <-ctx.Done(): return } } @@ -227,13 +240,9 @@ func (obj *GAPI) Next() chan gapi.Next { return ch } -// Close shuts down the yamlgraph GAPI. -func (obj *GAPI) Close() error { - if !obj.initialized { - return fmt.Errorf("%s: GAPI is not initialized", Name) - } - close(obj.closeChan) +// Err will contain the last error when Next shuts down. It waits for all the +// running processes to exit before it returns. +func (obj *GAPI) Err() error { obj.wg.Wait() - obj.initialized = false // closed = true - return nil + return obj.err }