diff --git a/docs/function-guide.md b/docs/function-guide.md index 354ae2b9..c5496b43 100644 --- a/docs/function-guide.md +++ b/docs/function-guide.md @@ -239,27 +239,6 @@ use in the other methods. // Init runs some startup code for this function. func (obj *FooFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) // shutdown signal - return nil -} -``` - -### Close - -```golang -Close() error -``` - -This is called to cleanup the function. It usually causes the stream to -shutdown. Even if `Stream()` decided to shutdown early, it might still get -called. It is usually called by the engine to tell the function to shutdown. - -#### Example - -```golang -// Close runs some shutdown code for this function and turns off the stream. -func (obj *FooFunc) Close() error { - close(obj.closeChan) // send a signal to tell the stream to close return nil } ``` @@ -267,23 +246,24 @@ func (obj *FooFunc) Close() error { ### Stream ```golang -Stream() error +Stream(context.Context) error ``` `Stream` is where the real _work_ is done. This method is started by the language function engine. It will run this function while simultaneously sending -it values on the `input` channel. It will only send a complete set of input +it values on the `Input` channel. It will only send a complete set of input values. You should send a value to the output channel when you have decided that one should be produced. Make sure to only use input values of the expected type as declared in the `Info` struct, and send values of the similarly declared appropriate return type. Failure to do so will may result in a panic and -sadness. +sadness. You must shutdown if the input context cancels. You must close the +`Output` channel if you are done generating new values and/or when you shutdown. #### Example ```golang // Stream returns the single value that was generated and then closes. -func (obj *FooFunc) Stream() error { +func (obj *FooFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes var result string for { @@ -300,7 +280,7 @@ func (obj *FooFunc) Stream() error { result = fmt.Sprintf("the input is: %d", ix) - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -309,7 +289,7 @@ func (obj *FooFunc) Stream() error { V: result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } @@ -340,8 +320,6 @@ type FooFunc struct { init *interfaces.Init // this space can be used if needed - - closeChan chan struct{} // shutdown signal } ``` diff --git a/docs/language-guide.md b/docs/language-guide.md index 09bd648e..a94d6983 100644 --- a/docs/language-guide.md +++ b/docs/language-guide.md @@ -683,7 +683,7 @@ Please see the example functions in ### Stream ```golang -Stream() error +Stream(context.Context) error ``` Stream is called by the function engine when it is ready for your function to @@ -692,23 +692,8 @@ value. Failure to produce at least one value will probably cause the function engine to hang waiting for your output. This function must close the `Output` channel when it has no more values to send. The engine will close the `Input` channel when it has no more values to send. This may or may not influence -whether or not you close the `Output` channel. - -#### Example - -```golang -Please see the example functions in -[lang/funcs/core/](https://github.com/purpleidea/mgmt/tree/master/lang/funcs/core/). -``` - -### Close - -```golang -Close() error -``` - -Close asks the particular function to shutdown its `Stream()` function and -return. +whether or not you close the `Output` channel. You must shutdown if the input +context cancels. #### Example diff --git a/lang/funcs/contains_polyfunc.go b/lang/funcs/contains_func.go similarity index 90% rename from lang/funcs/contains_polyfunc.go rename to lang/funcs/contains_func.go index 28a51936..e0b21d39 100644 --- a/lang/funcs/contains_polyfunc.go +++ b/lang/funcs/contains_func.go @@ -18,6 +18,7 @@ package funcs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -37,29 +38,27 @@ const ( ) func init() { - Register(ContainsFuncName, func() interfaces.Func { return &ContainsPolyFunc{} }) // must register the func and name + Register(ContainsFuncName, func() interfaces.Func { return &ContainsFunc{} }) // must register the func and name } -// ContainsPolyFunc returns true if a value is found in a list. Otherwise false. -type ContainsPolyFunc struct { +// ContainsFunc returns true if a value is found in a list. Otherwise false. +type ContainsFunc struct { Type *types.Type // this is the type of value stored in our list init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *ContainsPolyFunc) String() string { +func (obj *ContainsFunc) String() string { return ContainsFuncName } // ArgGen returns the Nth arg name for this function. -func (obj *ContainsPolyFunc) ArgGen(index int) (string, error) { +func (obj *ContainsFunc) ArgGen(index int) (string, error) { seq := []string{containsArgNameNeedle, containsArgNameHaystack} if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) @@ -68,7 +67,7 @@ func (obj *ContainsPolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *ContainsPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *ContainsFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -139,7 +138,7 @@ func (obj *ContainsPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant // invariant. This can only happen once, because by then we'll have given all // the new information we can, and falsely producing redundant information is a // good way to stall the solver if it thinks it keeps learning more things! -func (obj *ContainsPolyFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHaystack, dummyOut interfaces.Expr) func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) { +func (obj *ContainsFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHaystack, dummyOut interfaces.Expr) func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) { return func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) { for _, invariant := range fnInvariants { // search for this special type of invariant @@ -241,7 +240,7 @@ func (obj *ContainsPolyFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHay // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *ContainsPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *ContainsFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { // TODO: return `variant` as arg for now -- maybe there's a better way? variant := []*types.Type{types.NewType("func(needle variant, haystack variant) bool")} @@ -292,7 +291,7 @@ func (obj *ContainsPolyFunc) Polymorphisms(partialType *types.Type, partialValue // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *ContainsPolyFunc) Build(typ *types.Type) error { +func (obj *ContainsFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if typ.Kind != types.KindFunc { return fmt.Errorf("input type must be of kind func") @@ -335,7 +334,7 @@ func (obj *ContainsPolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *ContainsPolyFunc) Validate() error { +func (obj *ContainsFunc) Validate() error { if obj.Type == nil { // build must be run first return fmt.Errorf("type is still unspecified") } @@ -344,7 +343,7 @@ func (obj *ContainsPolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *ContainsPolyFunc) Info() *interfaces.Info { +func (obj *ContainsFunc) Info() *interfaces.Info { var sig *types.Type if obj.Type != nil { // don't panic if called speculatively s := obj.Type.String() @@ -359,14 +358,13 @@ func (obj *ContainsPolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *ContainsPolyFunc) Init(init *interfaces.Init) error { +func (obj *ContainsFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *ContainsPolyFunc) Stream() error { +func (obj *ContainsFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -397,20 +395,14 @@ func (obj *ContainsPolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ContainsPolyFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/core_test.go b/lang/funcs/core/core_test.go index d519f11d..33ebd3ca 100644 --- a/lang/funcs/core/core_test.go +++ b/lang/funcs/core/core_test.go @@ -583,6 +583,9 @@ func TestLiveFuncExec0(t *testing.T) { valueptrch := make(chan int) // which Nth value are we at? killTimeline := make(chan struct{}) // ask timeline to exit + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // wait for close signals wg.Add(1) go func() { @@ -617,7 +620,7 @@ func TestLiveFuncExec0(t *testing.T) { if debug { logf("Running func") } - err := handle.Stream() // sends to output chan + err := handle.Stream(ctx) // sends to output chan t.Logf("test #%d: stream exited with: %+v", index, err) if debug { logf("Exiting func") @@ -740,12 +743,8 @@ func TestLiveFuncExec0(t *testing.T) { t.Logf("test #%d: timeline finished", index) close(argch) - t.Logf("test #%d: running Close", index) - if err := handle.Close(); err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: could not close func: %+v", index, err) - return - } + t.Logf("test #%d: running cancel", index) + cancel() }() // read everything diff --git a/lang/funcs/core/datetime/now_fact.go b/lang/funcs/core/datetime/now_fact.go index 3d182a9c..80bae764 100644 --- a/lang/funcs/core/datetime/now_fact.go +++ b/lang/funcs/core/datetime/now_fact.go @@ -18,6 +18,7 @@ package coredatetime import ( + "context" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -36,8 +37,7 @@ func init() { // DateTimeFact is a fact which returns the current date and time. type DateTimeFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -62,12 +62,11 @@ func (obj *DateTimeFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *DateTimeFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. -func (obj *DateTimeFact) Stream() error { +func (obj *DateTimeFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // always signal when we're done // XXX: this might be an interesting fact to write because: // 1) will the sleeps from the ticker be in sync with the second ticker? @@ -87,7 +86,7 @@ func (obj *DateTimeFact) Stream() error { startChan = nil // disable case <-ticker.C: // received the timer event // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -95,14 +94,8 @@ func (obj *DateTimeFact) Stream() error { case obj.init.Output <- &types.IntValue{ // seconds since 1970... V: time.Now().Unix(), // .UTC() not necessary }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *DateTimeFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/deploy/abspath_func.go b/lang/funcs/core/deploy/abspath_func.go index 557d092c..b51a8e84 100644 --- a/lang/funcs/core/deploy/abspath_func.go +++ b/lang/funcs/core/deploy/abspath_func.go @@ -18,6 +18,7 @@ package coredeploy import ( + "context" "fmt" "strings" @@ -49,8 +50,6 @@ type AbsPathFunc struct { path *string // the active path result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -91,7 +90,6 @@ func (obj *AbsPathFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *AbsPathFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) if obj.data == nil { // programming error return fmt.Errorf("missing function data") @@ -100,7 +98,7 @@ func (obj *AbsPathFunc) Init(init *interfaces.Init) error { } // Stream returns the changing values that this func has over time. -func (obj *AbsPathFunc) Stream() error { +func (obj *AbsPathFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -145,7 +143,7 @@ func (obj *AbsPathFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -153,14 +151,8 @@ func (obj *AbsPathFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *AbsPathFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/deploy/readfile_func.go b/lang/funcs/core/deploy/readfile_func.go index 6d0a922c..57d89fd5 100644 --- a/lang/funcs/core/deploy/readfile_func.go +++ b/lang/funcs/core/deploy/readfile_func.go @@ -18,6 +18,7 @@ package coredeploy import ( + "context" "fmt" "strings" @@ -50,8 +51,6 @@ type ReadFileFunc struct { filename *string // the active filename result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -92,7 +91,6 @@ func (obj *ReadFileFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *ReadFileFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) if obj.data == nil { // programming error return fmt.Errorf("missing function data") @@ -101,7 +99,7 @@ func (obj *ReadFileFunc) Init(init *interfaces.Init) error { } // Stream returns the changing values that this func has over time. -func (obj *ReadFileFunc) Stream() error { +func (obj *ReadFileFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -159,7 +157,7 @@ func (obj *ReadFileFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -167,14 +165,8 @@ func (obj *ReadFileFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ReadFileFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/deploy/readfileabs_func.go b/lang/funcs/core/deploy/readfileabs_func.go index 523b8fda..e42e92b7 100644 --- a/lang/funcs/core/deploy/readfileabs_func.go +++ b/lang/funcs/core/deploy/readfileabs_func.go @@ -18,6 +18,7 @@ package coredeploy import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -50,8 +51,6 @@ type ReadFileAbsFunc struct { filename *string // the active filename result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -92,7 +91,6 @@ func (obj *ReadFileAbsFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *ReadFileAbsFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) if obj.data == nil { // programming error return fmt.Errorf("missing function data") @@ -101,7 +99,7 @@ func (obj *ReadFileAbsFunc) Init(init *interfaces.Init) error { } // Stream returns the changing values that this func has over time. -func (obj *ReadFileAbsFunc) Stream() error { +func (obj *ReadFileAbsFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -145,7 +143,7 @@ func (obj *ReadFileAbsFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -153,14 +151,8 @@ func (obj *ReadFileAbsFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ReadFileAbsFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/example/flipflop_fact.go b/lang/funcs/core/example/flipflop_fact.go index efd17c23..aa8a27e0 100644 --- a/lang/funcs/core/example/flipflop_fact.go +++ b/lang/funcs/core/example/flipflop_fact.go @@ -18,6 +18,7 @@ package coreexample import ( + "context" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -38,9 +39,8 @@ func init() { // and is not meant for serious computing. This would be better served by a flip // function which you could specify an interval for. type FlipFlopFact struct { - init *facts.Init - value bool - closeChan chan struct{} + init *facts.Init + value bool } // String returns a simple name for this fact. This is needed so this struct can @@ -65,12 +65,11 @@ func (obj *FlipFlopFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *FlipFlopFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. -func (obj *FlipFlopFact) Stream() error { +func (obj *FlipFlopFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // always signal when we're done // TODO: don't hard code 5 sec interval ticker := time.NewTicker(time.Duration(5) * time.Second) @@ -85,7 +84,7 @@ func (obj *FlipFlopFact) Stream() error { startChan = nil // disable case <-ticker.C: // received the timer event // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -93,16 +92,10 @@ func (obj *FlipFlopFact) Stream() error { case obj.init.Output <- &types.BoolValue{ // flip V: obj.value, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } obj.value = !obj.value // flip it } } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *FlipFlopFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/example/vumeter_func.go b/lang/funcs/core/example/vumeter_func.go index ac11dae7..2e0db7fd 100644 --- a/lang/funcs/core/example/vumeter_func.go +++ b/lang/funcs/core/example/vumeter_func.go @@ -59,8 +59,6 @@ type VUMeterFunc struct { peak float64 result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -133,12 +131,11 @@ func (obj *VUMeterFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *VUMeterFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *VUMeterFunc) Stream() error { +func (obj *VUMeterFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes ticker := newTicker() defer ticker.Stop() @@ -222,7 +219,7 @@ func (obj *VUMeterFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -230,18 +227,12 @@ func (obj *VUMeterFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *VUMeterFunc) Close() error { - close(obj.closeChan) - return nil -} - func newTicker() *time.Ticker { return time.NewTicker(time.Duration(1) * time.Second) } diff --git a/lang/funcs/core/fmt/printf_func.go b/lang/funcs/core/fmt/printf_func.go index fb615280..c998b76c 100644 --- a/lang/funcs/core/fmt/printf_func.go +++ b/lang/funcs/core/fmt/printf_func.go @@ -18,6 +18,7 @@ package corefmt import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -55,8 +56,6 @@ type PrintfFunc struct { last types.Value // last value received to use for diff result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -367,12 +366,11 @@ func (obj *PrintfFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *PrintfFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *PrintfFunc) Stream() error { +func (obj *PrintfFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -409,7 +407,7 @@ func (obj *PrintfFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -417,18 +415,12 @@ func (obj *PrintfFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *PrintfFunc) Close() error { - close(obj.closeChan) - return nil -} - // valueToString prints our values how we expect for printf. // FIXME: if this turns out to be useful, add it to the types package. func valueToString(value types.Value) string { diff --git a/lang/funcs/core/iter/map_func.go b/lang/funcs/core/iter/map_func.go index 1981f20a..c09eb818 100644 --- a/lang/funcs/core/iter/map_func.go +++ b/lang/funcs/core/iter/map_func.go @@ -18,6 +18,7 @@ package coreiter import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -62,8 +63,6 @@ type MapFunc struct { function func([]types.Value) (types.Value, error) result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -556,12 +555,11 @@ func (obj *MapFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *MapFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *MapFunc) Stream() error { +func (obj *MapFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes rtyp := types.NewType(fmt.Sprintf("[]%s", obj.RType.String())) for { @@ -613,21 +611,15 @@ func (obj *MapFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *MapFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/os/readfile_func.go b/lang/funcs/core/os/readfile_func.go index 8e37872e..fc65801d 100644 --- a/lang/funcs/core/os/readfile_func.go +++ b/lang/funcs/core/os/readfile_func.go @@ -18,6 +18,7 @@ package coreos import ( + "context" "fmt" "io/ioutil" "sync" @@ -55,8 +56,6 @@ type ReadFileFunc struct { wg *sync.WaitGroup result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -94,13 +93,13 @@ func (obj *ReadFileFunc) Init(init *interfaces.Init) error { obj.init = init obj.events = make(chan error) obj.wg = &sync.WaitGroup{} - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *ReadFileFunc) Stream() error { +func (obj *ReadFileFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes + defer close(obj.events) // clean up for fun defer obj.wg.Wait() defer func() { if obj.recWatcher != nil { @@ -182,7 +181,7 @@ func (obj *ReadFileFunc) Stream() error { case obj.events <- err: // send event... - case <-obj.closeChan: + case <-ctx.Done(): // don't block here on shutdown return } @@ -215,7 +214,7 @@ func (obj *ReadFileFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -223,16 +222,8 @@ func (obj *ReadFileFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ReadFileFunc) Close() error { - close(obj.closeChan) - obj.wg.Wait() // block so we don't exit by the closure of obj.events - close(obj.events) // clean up for fun - return nil -} diff --git a/lang/funcs/core/os/system_func.go b/lang/funcs/core/os/system_func.go index 442562b5..0ad428dd 100644 --- a/lang/funcs/core/os/system_func.go +++ b/lang/funcs/core/os/system_func.go @@ -49,10 +49,8 @@ func init() { // after the other, the downstream resources might not run for every line unless // the "Meta:realize" metaparam is set to true. type SystemFunc struct { - init *interfaces.Init - - closeChan chan struct{} - cancel context.CancelFunc + init *interfaces.Init + cancel context.CancelFunc } // String returns a simple name for this function. This is needed so this struct @@ -89,12 +87,15 @@ func (obj *SystemFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *SystemFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *SystemFunc) Stream() error { +func (obj *SystemFunc) Stream(ctx context.Context) error { + // XXX: this implementation is a bit awkward especially with the port to + // the Stream(context.Context) signature change. This is a straight port + // but we could refactor this eventually. + // Close the output chan to signal that no more values are coming. defer close(obj.init.Output) @@ -115,7 +116,7 @@ func (obj *SystemFunc) Stream() error { // Kill the current process, if any. A new cancel function is created // each time a new process is started. - var ctx context.Context + var innerCtx context.Context defer func() { if obj.cancel == nil { return @@ -132,7 +133,7 @@ func (obj *SystemFunc) Stream() error { select { case <-processedChan: return nil - case <-obj.closeChan: + case <-ctx.Done(): return nil } } @@ -147,8 +148,8 @@ func (obj *SystemFunc) Stream() error { // Run the command, connecting it to ctx so we can kill // it if needed, and to two Readers so we can read its // stdout and stderr. - ctx, obj.cancel = context.WithCancel(context.Background()) - cmd := exec.CommandContext(ctx, "sh", "-c", shellCommand) + innerCtx, obj.cancel = context.WithCancel(context.Background()) + cmd := exec.CommandContext(innerCtx, "sh", "-c", shellCommand) stdoutReader, err := cmd.StdoutPipe() if err != nil { return err @@ -205,14 +206,8 @@ func (obj *SystemFunc) Stream() error { wg.Wait() close(processedChan) }() - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *SystemFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/random1_func.go b/lang/funcs/core/random1_func.go index 376e5224..982817c9 100644 --- a/lang/funcs/core/random1_func.go +++ b/lang/funcs/core/random1_func.go @@ -18,6 +18,7 @@ package core // TODO: should this be in its own individual package? import ( + "context" "crypto/rand" "fmt" "math/big" @@ -56,8 +57,6 @@ type Random1Func struct { init *interfaces.Init finished bool // did we send the random string? - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -119,12 +118,11 @@ func generate(length uint16) (string, error) { // Init runs some startup code for this function. func (obj *Random1Func) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the single value that was generated and then closes. -func (obj *Random1Func) Stream() error { +func (obj *Random1Func) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes var result string for { @@ -153,7 +151,7 @@ func (obj *Random1Func) Stream() error { return err // no errwrap needed b/c helper func } - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -164,14 +162,8 @@ func (obj *Random1Func) Stream() error { // we only send one value, then wait for input to close obj.finished = true - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *Random1Func) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/sys/cpucount_fact.go b/lang/funcs/core/sys/cpucount_fact.go index d8e0d03e..b8911d5b 100644 --- a/lang/funcs/core/sys/cpucount_fact.go +++ b/lang/funcs/core/sys/cpucount_fact.go @@ -20,6 +20,7 @@ package coresys import ( + "context" "io/ioutil" "regexp" "strconv" @@ -50,8 +51,7 @@ func init() { // CPUCountFact is a fact that returns the current CPU count. type CPUCountFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -67,18 +67,16 @@ func (obj *CPUCountFact) Info() *facts.Info { } } -// Init runs startup code for this fact. Initializes the closeChan and sets the -// facts.Init variable. +// Init runs startup code for this fact and sets the facts.Init variable. func (obj *CPUCountFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. It will // first poll sysfs to get the initial cpu count, and then receives UEvents from // the kernel as CPUs are added/removed. -func (obj CPUCountFact) Stream() error { +func (obj CPUCountFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // signal when we're done ss, err := socketset.NewSocketSet(rtmGrps, socketFile, unix.NETLINK_KOBJECT_UEVENT) @@ -152,7 +150,7 @@ func (obj CPUCountFact) Stream() error { continue } } - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -167,18 +165,12 @@ func (obj CPUCountFact) Stream() error { }: once = true // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs cleanup code for the fact and turns off the Stream. -func (obj *CPUCountFact) Close() error { - close(obj.closeChan) - return nil -} - // getCPUCount looks in sysfs to get the number of CPUs that are online. func getCPUCount() (int64, error) { dat, err := ioutil.ReadFile("/sys/devices/system/cpu/online") diff --git a/lang/funcs/core/sys/cpucount_fact_test.go b/lang/funcs/core/sys/cpucount_fact_test.go index ea4a4424..46f32cc1 100644 --- a/lang/funcs/core/sys/cpucount_fact_test.go +++ b/lang/funcs/core/sys/cpucount_fact_test.go @@ -20,6 +20,7 @@ package coresys import ( + "context" "testing" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -41,8 +42,9 @@ func TestSimple(t *testing.T) { return } + ctx, cancel := context.WithCancel(context.Background()) go func() { - defer fact.Close() + defer cancel() Loop: for { select { @@ -54,7 +56,7 @@ func TestSimple(t *testing.T) { }() // now start the stream - if err := fact.Stream(); err != nil { + if err := fact.Stream(ctx); err != nil { t.Error(err) } } diff --git a/lang/funcs/core/sys/hostname_fact.go b/lang/funcs/core/sys/hostname_fact.go index eed041f2..c088971a 100644 --- a/lang/funcs/core/sys/hostname_fact.go +++ b/lang/funcs/core/sys/hostname_fact.go @@ -18,6 +18,8 @@ package coresys import ( + "context" + "github.com/purpleidea/mgmt/lang/funcs/facts" "github.com/purpleidea/mgmt/lang/types" ) @@ -35,8 +37,7 @@ func init() { // HostnameFact is a function that returns the hostname. // TODO: support hostnames that change in the future. type HostnameFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -61,26 +62,19 @@ func (obj *HostnameFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *HostnameFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the single value that this fact has, and then closes. -func (obj *HostnameFact) Stream() error { +func (obj *HostnameFact) Stream(ctx context.Context) error { select { case obj.init.Output <- &types.StrValue{ V: obj.init.Hostname, }: // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } close(obj.init.Output) // signal that we're done sending return nil } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *HostnameFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/sys/load_fact.go b/lang/funcs/core/sys/load_fact.go index 3599cd0d..f6e1e212 100644 --- a/lang/funcs/core/sys/load_fact.go +++ b/lang/funcs/core/sys/load_fact.go @@ -18,6 +18,7 @@ package coresys import ( + "context" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -39,8 +40,7 @@ func init() { // LoadFact is a fact which returns the current system load. type LoadFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -65,12 +65,11 @@ func (obj *LoadFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *LoadFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. -func (obj *LoadFact) Stream() error { +func (obj *LoadFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // always signal when we're done // it seems the different values only update once every 5 @@ -88,7 +87,7 @@ func (obj *LoadFact) Stream() error { startChan = nil // disable case <-ticker.C: // received the timer event // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -107,14 +106,8 @@ func (obj *LoadFact) Stream() error { select { case obj.init.Output <- st: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *LoadFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/sys/uptime_fact.go b/lang/funcs/core/sys/uptime_fact.go index 0db7cda7..e3fb8387 100644 --- a/lang/funcs/core/sys/uptime_fact.go +++ b/lang/funcs/core/sys/uptime_fact.go @@ -18,6 +18,7 @@ package coresys import ( + "context" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -37,8 +38,7 @@ func init() { // UptimeFact is a fact which returns the current uptime of your system. type UptimeFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -57,12 +57,11 @@ func (obj *UptimeFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *UptimeFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. -func (obj *UptimeFact) Stream() error { +func (obj *UptimeFact) Stream(ctx context.Context) error { defer close(obj.init.Output) ticker := time.NewTicker(time.Duration(1) * time.Second) @@ -75,7 +74,7 @@ func (obj *UptimeFact) Stream() error { startChan = nil case <-ticker.C: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -87,14 +86,8 @@ func (obj *UptimeFact) Stream() error { select { case obj.init.Output <- &types.IntValue{V: uptime}: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *UptimeFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/template_func.go b/lang/funcs/core/template_func.go index fc47e2cc..75b05bef 100644 --- a/lang/funcs/core/template_func.go +++ b/lang/funcs/core/template_func.go @@ -19,6 +19,7 @@ package core // TODO: should this be in its own individual package? import ( "bytes" + "context" "fmt" "reflect" "strings" @@ -73,8 +74,6 @@ type TemplateFunc struct { last types.Value // last value received to use for diff result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -367,7 +366,6 @@ func (obj *TemplateFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *TemplateFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } @@ -491,7 +489,7 @@ Loop: } // Stream returns the changing values that this func has over time. -func (obj *TemplateFunc) Stream() error { +func (obj *TemplateFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -526,7 +524,7 @@ func (obj *TemplateFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -534,18 +532,12 @@ func (obj *TemplateFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *TemplateFunc) Close() error { - close(obj.closeChan) - return nil -} - // safename renames the functions so they're valid inside the template. This is // a limitation of the template library, and it might be worth moving to a new // one. diff --git a/lang/funcs/core/world/exchange_func.go b/lang/funcs/core/world/exchange_func.go index b146e19a..d110dd4e 100644 --- a/lang/funcs/core/world/exchange_func.go +++ b/lang/funcs/core/world/exchange_func.go @@ -52,7 +52,6 @@ type ExchangeFunc struct { result types.Value // last calculated output watchChan chan error - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -93,14 +92,13 @@ func (obj *ExchangeFunc) Info() *interfaces.Info { func (obj *ExchangeFunc) Init(init *interfaces.Init) error { obj.init = init obj.watchChan = make(chan error) // XXX: sender should close this, but did I implement that part yet??? - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *ExchangeFunc) Stream() error { +func (obj *ExchangeFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() for { select { @@ -189,21 +187,15 @@ func (obj *ExchangeFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ExchangeFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/world/kvlookup_func.go b/lang/funcs/core/world/kvlookup_func.go index 7903980d..42385a17 100644 --- a/lang/funcs/core/world/kvlookup_func.go +++ b/lang/funcs/core/world/kvlookup_func.go @@ -51,7 +51,6 @@ type KVLookupFunc struct { result types.Value // last calculated output watchChan chan error - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -90,14 +89,13 @@ func (obj *KVLookupFunc) Info() *interfaces.Info { func (obj *KVLookupFunc) Init(init *interfaces.Init) error { obj.init = init obj.watchChan = make(chan error) // XXX: sender should close this, but did I implement that part yet??? - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *KVLookupFunc) Stream() error { +func (obj *KVLookupFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() for { select { @@ -143,7 +141,7 @@ func (obj *KVLookupFunc) Stream() error { select { case obj.init.Output <- result: // send one! // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -176,25 +174,19 @@ func (obj *KVLookupFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *KVLookupFunc) Close() error { - close(obj.closeChan) - return nil -} - // buildMap builds the result map which we'll need. It uses struct variables. func (obj *KVLookupFunc) buildMap(ctx context.Context) (types.Value, error) { keyMap, err := obj.init.World.StrMapGet(ctx, obj.namespace) diff --git a/lang/funcs/core/world/schedule_func.go b/lang/funcs/core/world/schedule_func.go index b102b913..2476a803 100644 --- a/lang/funcs/core/world/schedule_func.go +++ b/lang/funcs/core/world/schedule_func.go @@ -62,12 +62,12 @@ const ( ) func init() { - funcs.ModuleRegister(ModuleName, ScheduleFuncName, func() interfaces.Func { return &SchedulePolyFunc{} }) + funcs.ModuleRegister(ModuleName, ScheduleFuncName, func() interfaces.Func { return &ScheduleFunc{} }) } -// SchedulePolyFunc is special function which determines where code should run -// in the cluster. -type SchedulePolyFunc struct { +// ScheduleFunc is special function which determines where code should run in +// the cluster. +type ScheduleFunc struct { Type *types.Type // this is the type of opts used if specified built bool // was this function built yet? @@ -81,17 +81,16 @@ type SchedulePolyFunc struct { result types.Value // last calculated output watchChan chan *schedulerResult - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *SchedulePolyFunc) String() string { +func (obj *ScheduleFunc) String() string { return ScheduleFuncName } // validOpts returns the available mapping of valid opts fields to types. -func (obj *SchedulePolyFunc) validOpts() map[string]*types.Type { +func (obj *ScheduleFunc) validOpts() map[string]*types.Type { return map[string]*types.Type{ "strategy": types.TypeStr, "max": types.TypeInt, @@ -101,7 +100,7 @@ func (obj *SchedulePolyFunc) validOpts() map[string]*types.Type { } // ArgGen returns the Nth arg name for this function. -func (obj *SchedulePolyFunc) ArgGen(index int) (string, error) { +func (obj *ScheduleFunc) ArgGen(index int) (string, error) { seq := []string{scheduleArgNameNamespace, scheduleArgNameOpts} // 2nd arg is optional if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) @@ -110,7 +109,7 @@ func (obj *SchedulePolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *SchedulePolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *ScheduleFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -311,7 +310,7 @@ func (obj *SchedulePolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *SchedulePolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *ScheduleFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { // TODO: technically, we could generate all permutations of the struct! //variant := []*types.Type{} //t0 := types.NewType("func(namespace str) []str") @@ -399,7 +398,7 @@ func (obj *SchedulePolyFunc) Polymorphisms(partialType *types.Type, partialValue // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *SchedulePolyFunc) Build(typ *types.Type) error { +func (obj *ScheduleFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if typ.Kind != types.KindFunc { return fmt.Errorf("input type must be of kind func") @@ -482,7 +481,7 @@ func (obj *SchedulePolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *SchedulePolyFunc) Validate() error { +func (obj *ScheduleFunc) Validate() error { if !obj.built { return fmt.Errorf("function wasn't built yet") } @@ -495,7 +494,7 @@ func (obj *SchedulePolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *SchedulePolyFunc) Info() *interfaces.Info { +func (obj *ScheduleFunc) Info() *interfaces.Info { // It's important that you don't return a non-nil sig if this is called // before you're built. Type unification may call it opportunistically. var typ *types.Type @@ -515,16 +514,15 @@ func (obj *SchedulePolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *SchedulePolyFunc) Init(init *interfaces.Init) error { +func (obj *ScheduleFunc) Init(init *interfaces.Init) error { obj.init = init obj.watchChan = make(chan *schedulerResult) - obj.closeChan = make(chan struct{}) //obj.init.Debug = true // use this for local debugging return nil } // Stream returns the changing values that this func has over time. -func (obj *SchedulePolyFunc) Stream() error { +func (obj *ScheduleFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -618,24 +616,28 @@ func (obj *SchedulePolyFunc) Stream() error { // process the stream of scheduling output... go func() { defer close(obj.watchChan) - ctx, cancel := context.WithCancel(context.Background()) + // XXX: maybe we could share the parent + // ctx, but I have to work out the + // ordering logic first. For now this is + // just a port of what it was before. + newCtx, cancel := context.WithCancel(context.Background()) go func() { defer cancel() // unblock Next() defer obj.scheduler.Shutdown() select { - case <-obj.closeChan: + case <-ctx.Done(): return } }() for { - hosts, err := obj.scheduler.Next(ctx) + hosts, err := obj.scheduler.Next(newCtx) select { case obj.watchChan <- &schedulerResult{ hosts: hosts, err: err, }: - case <-obj.closeChan: + case <-ctx.Done(): return } } @@ -688,25 +690,19 @@ func (obj *SchedulePolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *SchedulePolyFunc) Close() error { - close(obj.closeChan) - return nil -} - // schedulerResult combines our internal events into a single message packet. type schedulerResult struct { hosts []string diff --git a/lang/funcs/engine.go b/lang/funcs/engine.go index 1f9a6d3c..c8819e12 100644 --- a/lang/funcs/engine.go +++ b/lang/funcs/engine.go @@ -18,6 +18,7 @@ package funcs import ( + "context" "fmt" "sync" @@ -35,6 +36,9 @@ type State struct { handle interfaces.Func // the function (if not nil, we've found it on init) + ctx context.Context // used for shutting down each Stream function. + cancel context.CancelFunc + init bool // have we run Init on our func? ready bool // has it received all the args it needs at least once? loaded bool // has the func run at least once ? @@ -112,6 +116,9 @@ type Engine struct { streamChan chan error // signals a new graph can be created or problem + ctx context.Context // used for shutting down each Stream function. + cancel context.CancelFunc + closeChan chan struct{} // close signal wg *sync.WaitGroup } @@ -137,6 +144,8 @@ func (obj *Engine) Init() error { } obj.topologicalSort = topologicalSort // cache the result + obj.ctx, obj.cancel = context.WithCancel(context.Background()) // top + for _, vertex := range obj.Graph.Vertices() { // is this an interface we can use? if _, exists := obj.state[vertex]; exists { @@ -152,7 +161,13 @@ func (obj *Engine) Init() error { obj.Logf("Loading func `%s`", vertex) } - obj.state[vertex] = &State{Expr: expr} // store some state! + innerCtx, innerCancel := context.WithCancel(obj.ctx) + obj.state[vertex] = &State{ + Expr: expr, + + ctx: innerCtx, + cancel: innerCancel, + } // store some state! e1 := obj.state[vertex].Init() e2 := errwrap.Wrapf(e1, "error loading func `%s`", vertex) @@ -407,7 +422,7 @@ func (obj *Engine) Run() error { if obj.Debug { obj.SafeLogf("Running func `%s`", node) } - err := node.handle.Stream() + err := node.handle.Stream(node.ctx) if obj.Debug { obj.SafeLogf("Exiting func `%s`", node) } @@ -673,14 +688,10 @@ func (obj *Engine) Close() error { var err error for _, vertex := range obj.topologicalSort { // FIXME: should we do this in reverse? node := obj.state[vertex] - if node.init { // did we Init this func? - if e := node.handle.Close(); e != nil { - e := errwrap.Wrapf(e, "problem closing func `%s`", node) - err = errwrap.Append(err, e) // list of errors - } - } + node.cancel() // ctx } close(obj.closeChan) obj.wg.Wait() // wait so that each func doesn't need to do this in close + obj.cancel() // free return err } diff --git a/lang/funcs/facts/facts.go b/lang/funcs/facts/facts.go index a86d29a2..5d4b8089 100644 --- a/lang/funcs/facts/facts.go +++ b/lang/funcs/facts/facts.go @@ -19,6 +19,7 @@ package facts import ( + "context" "fmt" "github.com/purpleidea/mgmt/engine" @@ -83,6 +84,5 @@ type Fact interface { //Validate() error // currently not needed since no facts are internal Info() *Info Init(*Init) error - Stream() error - Close() error + Stream(context.Context) error } diff --git a/lang/funcs/facts/func.go b/lang/funcs/facts/func.go index 60618f2c..25bc43bc 100644 --- a/lang/funcs/facts/func.go +++ b/lang/funcs/facts/func.go @@ -18,6 +18,7 @@ package facts import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -75,11 +76,6 @@ func (obj *FactFunc) Init(init *interfaces.Init) error { } // Stream returns the changing values that this function has over time. -func (obj *FactFunc) Stream() error { - return obj.Fact.Stream() -} - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *FactFunc) Close() error { - return obj.Fact.Close() +func (obj *FactFunc) Stream(ctx context.Context) error { + return obj.Fact.Stream(ctx) } diff --git a/lang/funcs/funcs.go b/lang/funcs/funcs.go index 4f0ee59c..ec2c0e10 100644 --- a/lang/funcs/funcs.go +++ b/lang/funcs/funcs.go @@ -19,6 +19,7 @@ package funcs import ( + "context" "fmt" "strings" "sync" @@ -150,6 +151,8 @@ func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, erro hostname := "" // XXX: add to interface debug := false // XXX: add to interface logf := func(format string, v ...interface{}) {} // XXX: add to interface + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() info := handle.Info() if !info.Pure { @@ -217,7 +220,7 @@ func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, erro if debug { logf("Running func") } - err := handle.Stream() // sends to output chan + err := handle.Stream(ctx) // sends to output chan if debug { logf("Exiting func") } @@ -296,10 +299,7 @@ Loop: } } - if err := handle.Close(); err != nil { - err = errwrap.Append(err, reterr) - return nil, errwrap.Wrapf(err, "problem closing func") - } + cancel() if result == nil && reterr == nil { // programming error diff --git a/lang/funcs/history_polyfunc.go b/lang/funcs/history_func.go similarity index 97% rename from lang/funcs/history_polyfunc.go rename to lang/funcs/history_func.go index 2d5a7e1b..c5fbb9c8 100644 --- a/lang/funcs/history_polyfunc.go +++ b/lang/funcs/history_func.go @@ -18,6 +18,7 @@ package funcs // TODO: should this be in its own individual package? import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -58,8 +59,6 @@ type HistoryFunc struct { history []types.Value // goes from newest (index->0) to oldest (len()-1) result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -358,12 +357,11 @@ func (obj *HistoryFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *HistoryFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *HistoryFunc) Stream() error { +func (obj *HistoryFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -412,21 +410,15 @@ func (obj *HistoryFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *HistoryFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/maplookup_polyfunc.go b/lang/funcs/maplookup_func.go similarity index 94% rename from lang/funcs/maplookup_polyfunc.go rename to lang/funcs/maplookup_func.go index 878f128b..40c1faac 100644 --- a/lang/funcs/maplookup_polyfunc.go +++ b/lang/funcs/maplookup_func.go @@ -18,6 +18,7 @@ package funcs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -38,29 +39,27 @@ const ( ) func init() { - Register(MapLookupFuncName, func() interfaces.Func { return &MapLookupPolyFunc{} }) // must register the func and name + Register(MapLookupFuncName, func() interfaces.Func { return &MapLookupFunc{} }) // must register the func and name } -// MapLookupPolyFunc is a key map lookup function. -type MapLookupPolyFunc struct { +// MapLookupFunc is a key map lookup function. +type MapLookupFunc struct { Type *types.Type // Kind == Map, that is used as the map we lookup init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *MapLookupPolyFunc) String() string { +func (obj *MapLookupFunc) String() string { return MapLookupFuncName } // ArgGen returns the Nth arg name for this function. -func (obj *MapLookupPolyFunc) ArgGen(index int) (string, error) { +func (obj *MapLookupFunc) ArgGen(index int) (string, error) { seq := []string{mapLookupArgNameMap, mapLookupArgNameKey, mapLookupArgNameDef} if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) @@ -69,7 +68,7 @@ func (obj *MapLookupPolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *MapLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *MapLookupFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -369,7 +368,7 @@ func (obj *MapLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invarian // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *MapLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *MapLookupFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { // TODO: return `variant` as arg for now -- maybe there's a better way? variant := []*types.Type{types.NewType("func(map variant, key variant, default variant) variant")} @@ -468,7 +467,7 @@ func (obj *MapLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValu // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *MapLookupPolyFunc) Build(typ *types.Type) error { +func (obj *MapLookupFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if typ.Kind != types.KindFunc { return fmt.Errorf("input type must be of kind func") @@ -516,7 +515,7 @@ func (obj *MapLookupPolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *MapLookupPolyFunc) Validate() error { +func (obj *MapLookupFunc) Validate() error { if obj.Type == nil { // build must be run first return fmt.Errorf("type is still unspecified") } @@ -528,7 +527,7 @@ func (obj *MapLookupPolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *MapLookupPolyFunc) Info() *interfaces.Info { +func (obj *MapLookupFunc) Info() *interfaces.Info { var typ *types.Type if obj.Type != nil { // don't panic if called speculatively // TODO: can obj.Type.Key or obj.Type.Val be nil (a partial) ? @@ -545,14 +544,13 @@ func (obj *MapLookupPolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *MapLookupPolyFunc) Init(init *interfaces.Init) error { +func (obj *MapLookupFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *MapLookupPolyFunc) Stream() error { +func (obj *MapLookupFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -589,20 +587,14 @@ func (obj *MapLookupPolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *MapLookupPolyFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/operator_polyfunc.go b/lang/funcs/operator_func.go similarity index 95% rename from lang/funcs/operator_polyfunc.go rename to lang/funcs/operator_func.go index 9f6c4067..99b34482 100644 --- a/lang/funcs/operator_polyfunc.go +++ b/lang/funcs/operator_func.go @@ -18,6 +18,7 @@ package funcs // this is here, in case we allow others to register operators... import ( + "context" "fmt" "math" "sort" @@ -332,7 +333,7 @@ func init() { }, }) - Register(OperatorFuncName, func() interfaces.Func { return &OperatorPolyFunc{} }) // must register the func and name + Register(OperatorFuncName, func() interfaces.Func { return &OperatorFunc{} }) // must register the func and name } // OperatorFuncs maps an operator to a list of callable function values. @@ -422,29 +423,26 @@ func LookupOperatorShort(operator string, size int) ([]*types.Type, error) { return results, nil } -// OperatorPolyFunc is an operator function that performs an operation on N -// values. -type OperatorPolyFunc struct { +// OperatorFunc is an operator function that performs an operation on N values. +type OperatorFunc struct { Type *types.Type // Kind == Function, including operator arg init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *OperatorPolyFunc) String() string { +func (obj *OperatorFunc) String() string { // TODO: return the exact operator if we can guarantee it doesn't change return OperatorFuncName } // argNames returns the maximum list of possible argNames. This can be truncated // if needed. The first arg name is the operator. -func (obj *OperatorPolyFunc) argNames() ([]string, error) { +func (obj *OperatorFunc) argNames() ([]string, error) { // we could just do this statically, but i did it dynamically so that I // wouldn't ever have to remember to update this list... max := 0 @@ -474,7 +472,7 @@ func (obj *OperatorPolyFunc) argNames() ([]string, error) { // findFunc tries to find the first available registered operator function that // matches the Operator/Type pattern requested. If none is found it returns nil. -func (obj *OperatorPolyFunc) findFunc(operator string) *types.FuncValue { +func (obj *OperatorFunc) findFunc(operator string) *types.FuncValue { fns, exists := OperatorFuncs[operator] if !exists { return nil @@ -489,7 +487,7 @@ func (obj *OperatorPolyFunc) findFunc(operator string) *types.FuncValue { } // ArgGen returns the Nth arg name for this function. -func (obj *OperatorPolyFunc) ArgGen(index int) (string, error) { +func (obj *OperatorFunc) ArgGen(index int) (string, error) { seq, err := obj.argNames() if err != nil { return "", err @@ -501,7 +499,7 @@ func (obj *OperatorPolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *OperatorPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *OperatorFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -746,7 +744,7 @@ func (obj *OperatorPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *OperatorPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *OperatorFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { var op string var size = -1 @@ -793,7 +791,7 @@ func (obj *OperatorPolyFunc) Polymorphisms(partialType *types.Type, partialValue // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *OperatorPolyFunc) Build(typ *types.Type) error { +func (obj *OperatorFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if len(typ.Ord) < 1 { return fmt.Errorf("the operator function needs at least 1 arg") @@ -807,7 +805,7 @@ func (obj *OperatorPolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *OperatorPolyFunc) Validate() error { +func (obj *OperatorFunc) Validate() error { if obj.Type == nil { // build must be run first return fmt.Errorf("type is still unspecified") } @@ -819,7 +817,7 @@ func (obj *OperatorPolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *OperatorPolyFunc) Info() *interfaces.Info { +func (obj *OperatorFunc) Info() *interfaces.Info { return &interfaces.Info{ Pure: true, Memo: false, @@ -829,14 +827,13 @@ func (obj *OperatorPolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *OperatorPolyFunc) Init(init *interfaces.Init) error { +func (obj *OperatorFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *OperatorPolyFunc) Stream() error { +func (obj *OperatorFunc) Stream(ctx context.Context) error { var op, lastOp string var fn *types.FuncValue defer close(obj.init.Output) // the sender closes @@ -896,24 +893,18 @@ func (obj *OperatorPolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *OperatorPolyFunc) Close() error { - close(obj.closeChan) - return nil -} - // removeOperatorArg returns a copy of the input KindFunc type, without the // operator arg which specifies which operator we're using. It *is* idempotent. func removeOperatorArg(typ *types.Type) *types.Type { diff --git a/lang/funcs/simple/simple.go b/lang/funcs/simple/simple.go index 11d07487..f9d4be05 100644 --- a/lang/funcs/simple/simple.go +++ b/lang/funcs/simple/simple.go @@ -18,6 +18,7 @@ package simple import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -78,8 +79,6 @@ type WrappedFunc struct { last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -128,12 +127,11 @@ func (obj *WrappedFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *WrappedFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *WrappedFunc) Stream() error { +func (obj *WrappedFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -172,7 +170,7 @@ func (obj *WrappedFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -181,14 +179,8 @@ func (obj *WrappedFunc) Stream() error { if len(obj.Fn.Type().Ord) == 0 { return nil // no more values, we're a pure func } - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *WrappedFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/simplepoly/simplepoly.go b/lang/funcs/simplepoly/simplepoly.go index 18e81454..2f9fba79 100644 --- a/lang/funcs/simplepoly/simplepoly.go +++ b/lang/funcs/simplepoly/simplepoly.go @@ -18,6 +18,7 @@ package simplepoly import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -141,8 +142,6 @@ type WrappedFunc struct { last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -546,12 +545,11 @@ func (obj *WrappedFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *WrappedFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *WrappedFunc) Stream() error { +func (obj *WrappedFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -599,7 +597,7 @@ func (obj *WrappedFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -608,14 +606,8 @@ func (obj *WrappedFunc) Stream() error { if len(obj.fn.Type().Ord) == 0 { return nil // no more values, we're a pure func } - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *WrappedFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structlookup_polyfunc.go b/lang/funcs/structlookup_func.go similarity index 93% rename from lang/funcs/structlookup_polyfunc.go rename to lang/funcs/structlookup_func.go index 583b3b78..2b3ae3e6 100644 --- a/lang/funcs/structlookup_polyfunc.go +++ b/lang/funcs/structlookup_func.go @@ -18,6 +18,7 @@ package funcs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -37,11 +38,11 @@ const ( ) func init() { - Register(StructLookupFuncName, func() interfaces.Func { return &StructLookupPolyFunc{} }) // must register the func and name + Register(StructLookupFuncName, func() interfaces.Func { return &StructLookupFunc{} }) // must register the func and name } -// StructLookupPolyFunc is a key map lookup function. -type StructLookupPolyFunc struct { +// StructLookupFunc is a struct field lookup function. +type StructLookupFunc struct { Type *types.Type // Kind == Struct, that is used as the struct we lookup Out *types.Type // type of field we're extracting @@ -50,18 +51,16 @@ type StructLookupPolyFunc struct { field string result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *StructLookupPolyFunc) String() string { +func (obj *StructLookupFunc) String() string { return StructLookupFuncName } // ArgGen returns the Nth arg name for this function. -func (obj *StructLookupPolyFunc) ArgGen(index int) (string, error) { +func (obj *StructLookupFunc) ArgGen(index int) (string, error) { seq := []string{structLookupArgNameStruct, structLookupArgNameField} if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) @@ -70,7 +69,7 @@ func (obj *StructLookupPolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *StructLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *StructLookupFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -309,7 +308,7 @@ func (obj *StructLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invar // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *StructLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *StructLookupFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { // TODO: return `variant` as arg for now -- maybe there's a better way? variant := []*types.Type{types.NewType("func(struct variant, field str) variant")} @@ -395,7 +394,7 @@ func (obj *StructLookupPolyFunc) Polymorphisms(partialType *types.Type, partialV // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *StructLookupPolyFunc) Build(typ *types.Type) error { +func (obj *StructLookupFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if typ.Kind != types.KindFunc { return fmt.Errorf("input type must be of kind func") @@ -434,7 +433,7 @@ func (obj *StructLookupPolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *StructLookupPolyFunc) Validate() error { +func (obj *StructLookupFunc) Validate() error { if obj.Type == nil { // build must be run first return fmt.Errorf("type is still unspecified") } @@ -455,7 +454,7 @@ func (obj *StructLookupPolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *StructLookupPolyFunc) Info() *interfaces.Info { +func (obj *StructLookupFunc) Info() *interfaces.Info { var sig *types.Type if obj.Type != nil { // don't panic if called speculatively // TODO: can obj.Out be nil (a partial) ? @@ -470,14 +469,13 @@ func (obj *StructLookupPolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *StructLookupPolyFunc) Init(init *interfaces.Init) error { +func (obj *StructLookupFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *StructLookupPolyFunc) Stream() error { +func (obj *StructLookupFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -522,20 +520,14 @@ func (obj *StructLookupPolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *StructLookupPolyFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/call.go b/lang/funcs/structs/call.go index a77ab699..80285000 100644 --- a/lang/funcs/structs/call.go +++ b/lang/funcs/structs/call.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -46,8 +47,6 @@ type CallFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -112,14 +111,13 @@ func (obj *CallFunc) Info() *interfaces.Info { // Init runs some startup code for this composite function. func (obj *CallFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *CallFunc) Stream() error { +func (obj *CallFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -171,21 +169,15 @@ func (obj *CallFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *CallFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/composite.go b/lang/funcs/structs/composite.go index 6c967cd0..959f20c6 100644 --- a/lang/funcs/structs/composite.go +++ b/lang/funcs/structs/composite.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -41,8 +42,6 @@ type CompositeFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -117,14 +116,13 @@ func (obj *CompositeFunc) Info() *interfaces.Info { // Init runs some startup code for this composite function. func (obj *CompositeFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *CompositeFunc) Stream() error { +func (obj *CompositeFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -139,7 +137,7 @@ func (obj *CompositeFunc) Stream() error { select { case obj.init.Output <- result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } @@ -207,21 +205,15 @@ func (obj *CompositeFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *CompositeFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/const.go b/lang/funcs/structs/const.go index b8c03f06..8f18c6ab 100644 --- a/lang/funcs/structs/const.go +++ b/lang/funcs/structs/const.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -33,8 +34,7 @@ const ( type ConstFunc struct { Value types.Value - init *interfaces.Init - closeChan chan struct{} + init *interfaces.Init } // String returns a simple name for this function. This is needed so this struct @@ -70,24 +70,17 @@ func (obj *ConstFunc) Info() *interfaces.Info { // Init runs some startup code for this const. func (obj *ConstFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the single value that this const has, and then closes. -func (obj *ConstFunc) Stream() error { +func (obj *ConstFunc) Stream(ctx context.Context) error { select { case obj.init.Output <- obj.Value: // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } close(obj.init.Output) // signal that we're done sending return nil } - -// Close runs some shutdown code for this const and turns off the stream. -func (obj *ConstFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/function.go b/lang/funcs/structs/function.go index 85707f03..330fa296 100644 --- a/lang/funcs/structs/function.go +++ b/lang/funcs/structs/function.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -41,8 +42,6 @@ type FunctionFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -125,14 +124,13 @@ func (obj *FunctionFunc) Info() *interfaces.Info { // Init runs some startup code for this composite function. func (obj *FunctionFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *FunctionFunc) Stream() error { +func (obj *FunctionFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -158,7 +156,7 @@ func (obj *FunctionFunc) Stream() error { select { case obj.init.Output <- result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -194,21 +192,15 @@ func (obj *FunctionFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *FunctionFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/if.go b/lang/funcs/structs/if.go index 80e1b803..846eca4b 100644 --- a/lang/funcs/structs/if.go +++ b/lang/funcs/structs/if.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -37,8 +38,6 @@ type IfFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -82,14 +81,13 @@ func (obj *IfFunc) Info() *interfaces.Info { // Init runs some startup code for this if expression function. func (obj *IfFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *IfFunc) Stream() error { +func (obj *IfFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -119,21 +117,15 @@ func (obj *IfFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *IfFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/var.go b/lang/funcs/structs/var.go index 9dd88aec..8fef1330 100644 --- a/lang/funcs/structs/var.go +++ b/lang/funcs/structs/var.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -40,8 +41,6 @@ type VarFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -84,14 +83,13 @@ func (obj *VarFunc) Info() *interfaces.Info { // Init runs some startup code for this composite function. func (obj *VarFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *VarFunc) Stream() error { +func (obj *VarFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -121,21 +119,15 @@ func (obj *VarFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *VarFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/interfaces/func.go b/lang/interfaces/func.go index a879a41f..6f381ae5 100644 --- a/lang/interfaces/func.go +++ b/lang/interfaces/func.go @@ -18,6 +18,7 @@ package interfaces import ( + "context" "fmt" "strings" @@ -69,9 +70,18 @@ type Func interface { // not known yet. This is because the Info method might be called // speculatively to aid in type unification. Info() *Info + + // Init passes some important values and references to the function. Init(*Init) error - Stream() error - Close() error + + // Stream is the mainloop of the function. It reads and writes from + // channels to return the changing values that this func has over time. + // It should shutdown and cleanup when the input context is cancelled. + // It must not exit before any goroutines it spawned have terminated. + // It must close the Output chan if it's done sending new values out. It + // must send at least one value, or return an error. It may also return + // an error at anytime if it can't continue. + Stream(context.Context) error } // PolyFunc is an interface for functions which are statically polymorphic. In