From b134c4b77815861a4ca202021cb8f9f81a8f29b3 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sun, 28 May 2023 16:20:42 -0400 Subject: [PATCH] lang: interfaces, funcs: Port Func API to new Stream signature This removes the `Close() error` and replaces it with a more modern Stream API that takes a context. This removes boilerplate and makes integration with concurrent code easier. The only downside is that there isn't an explicit cleanup step, but only one function was even using that and it was possible to switch it to a defer in Stream. This also renames the functions from polyfunc to just func which we determine by API not naming. --- docs/function-guide.md | 36 +++---------- docs/language-guide.md | 21 ++------ ...{contains_polyfunc.go => contains_func.go} | 40 ++++++-------- lang/funcs/core/core_test.go | 13 +++-- lang/funcs/core/datetime/now_fact.go | 17 ++---- lang/funcs/core/deploy/abspath_func.go | 16 ++---- lang/funcs/core/deploy/readfile_func.go | 16 ++---- lang/funcs/core/deploy/readfileabs_func.go | 16 ++---- lang/funcs/core/example/flipflop_fact.go | 19 +++---- lang/funcs/core/example/vumeter_func.go | 15 ++---- lang/funcs/core/fmt/printf_func.go | 16 ++---- lang/funcs/core/iter/map_func.go | 16 ++---- lang/funcs/core/os/readfile_func.go | 21 +++----- lang/funcs/core/os/system_func.go | 29 +++++------ lang/funcs/core/random1_func.go | 16 ++---- lang/funcs/core/sys/cpucount_fact.go | 20 +++---- lang/funcs/core/sys/cpucount_fact_test.go | 6 ++- lang/funcs/core/sys/hostname_fact.go | 16 ++---- lang/funcs/core/sys/load_fact.go | 17 ++---- lang/funcs/core/sys/uptime_fact.go | 17 ++---- lang/funcs/core/template_func.go | 16 ++---- lang/funcs/core/world/exchange_func.go | 16 ++---- lang/funcs/core/world/kvlookup_func.go | 18 ++----- lang/funcs/core/world/schedule_func.go | 52 +++++++++---------- lang/funcs/engine.go | 27 +++++++--- lang/funcs/facts/facts.go | 4 +- lang/funcs/facts/func.go | 10 ++-- lang/funcs/funcs.go | 10 ++-- .../{history_polyfunc.go => history_func.go} | 16 ++---- ...aplookup_polyfunc.go => maplookup_func.go} | 38 ++++++-------- ...{operator_polyfunc.go => operator_func.go} | 43 ++++++--------- lang/funcs/simple/simple.go | 16 ++---- lang/funcs/simplepoly/simplepoly.go | 16 ++---- ...ookup_polyfunc.go => structlookup_func.go} | 38 ++++++-------- lang/funcs/structs/call.go | 16 ++---- lang/funcs/structs/composite.go | 18 ++----- lang/funcs/structs/const.go | 15 ++---- lang/funcs/structs/function.go | 18 ++----- lang/funcs/structs/if.go | 16 ++---- lang/funcs/structs/var.go | 16 ++---- lang/interfaces/func.go | 14 ++++- 41 files changed, 276 insertions(+), 540 deletions(-) rename lang/funcs/{contains_polyfunc.go => contains_func.go} (90%) rename lang/funcs/{history_polyfunc.go => history_func.go} (97%) rename lang/funcs/{maplookup_polyfunc.go => maplookup_func.go} (94%) rename lang/funcs/{operator_polyfunc.go => operator_func.go} (95%) rename lang/funcs/{structlookup_polyfunc.go => structlookup_func.go} (93%) diff --git a/docs/function-guide.md b/docs/function-guide.md index 354ae2b9..c5496b43 100644 --- a/docs/function-guide.md +++ b/docs/function-guide.md @@ -239,27 +239,6 @@ use in the other methods. // Init runs some startup code for this function. func (obj *FooFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) // shutdown signal - return nil -} -``` - -### Close - -```golang -Close() error -``` - -This is called to cleanup the function. It usually causes the stream to -shutdown. Even if `Stream()` decided to shutdown early, it might still get -called. It is usually called by the engine to tell the function to shutdown. - -#### Example - -```golang -// Close runs some shutdown code for this function and turns off the stream. -func (obj *FooFunc) Close() error { - close(obj.closeChan) // send a signal to tell the stream to close return nil } ``` @@ -267,23 +246,24 @@ func (obj *FooFunc) Close() error { ### Stream ```golang -Stream() error +Stream(context.Context) error ``` `Stream` is where the real _work_ is done. This method is started by the language function engine. It will run this function while simultaneously sending -it values on the `input` channel. It will only send a complete set of input +it values on the `Input` channel. It will only send a complete set of input values. You should send a value to the output channel when you have decided that one should be produced. Make sure to only use input values of the expected type as declared in the `Info` struct, and send values of the similarly declared appropriate return type. Failure to do so will may result in a panic and -sadness. +sadness. You must shutdown if the input context cancels. You must close the +`Output` channel if you are done generating new values and/or when you shutdown. #### Example ```golang // Stream returns the single value that was generated and then closes. -func (obj *FooFunc) Stream() error { +func (obj *FooFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes var result string for { @@ -300,7 +280,7 @@ func (obj *FooFunc) Stream() error { result = fmt.Sprintf("the input is: %d", ix) - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -309,7 +289,7 @@ func (obj *FooFunc) Stream() error { V: result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } @@ -340,8 +320,6 @@ type FooFunc struct { init *interfaces.Init // this space can be used if needed - - closeChan chan struct{} // shutdown signal } ``` diff --git a/docs/language-guide.md b/docs/language-guide.md index 09bd648e..a94d6983 100644 --- a/docs/language-guide.md +++ b/docs/language-guide.md @@ -683,7 +683,7 @@ Please see the example functions in ### Stream ```golang -Stream() error +Stream(context.Context) error ``` Stream is called by the function engine when it is ready for your function to @@ -692,23 +692,8 @@ value. Failure to produce at least one value will probably cause the function engine to hang waiting for your output. This function must close the `Output` channel when it has no more values to send. The engine will close the `Input` channel when it has no more values to send. This may or may not influence -whether or not you close the `Output` channel. - -#### Example - -```golang -Please see the example functions in -[lang/funcs/core/](https://github.com/purpleidea/mgmt/tree/master/lang/funcs/core/). -``` - -### Close - -```golang -Close() error -``` - -Close asks the particular function to shutdown its `Stream()` function and -return. +whether or not you close the `Output` channel. You must shutdown if the input +context cancels. #### Example diff --git a/lang/funcs/contains_polyfunc.go b/lang/funcs/contains_func.go similarity index 90% rename from lang/funcs/contains_polyfunc.go rename to lang/funcs/contains_func.go index 28a51936..e0b21d39 100644 --- a/lang/funcs/contains_polyfunc.go +++ b/lang/funcs/contains_func.go @@ -18,6 +18,7 @@ package funcs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -37,29 +38,27 @@ const ( ) func init() { - Register(ContainsFuncName, func() interfaces.Func { return &ContainsPolyFunc{} }) // must register the func and name + Register(ContainsFuncName, func() interfaces.Func { return &ContainsFunc{} }) // must register the func and name } -// ContainsPolyFunc returns true if a value is found in a list. Otherwise false. -type ContainsPolyFunc struct { +// ContainsFunc returns true if a value is found in a list. Otherwise false. +type ContainsFunc struct { Type *types.Type // this is the type of value stored in our list init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *ContainsPolyFunc) String() string { +func (obj *ContainsFunc) String() string { return ContainsFuncName } // ArgGen returns the Nth arg name for this function. -func (obj *ContainsPolyFunc) ArgGen(index int) (string, error) { +func (obj *ContainsFunc) ArgGen(index int) (string, error) { seq := []string{containsArgNameNeedle, containsArgNameHaystack} if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) @@ -68,7 +67,7 @@ func (obj *ContainsPolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *ContainsPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *ContainsFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -139,7 +138,7 @@ func (obj *ContainsPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant // invariant. This can only happen once, because by then we'll have given all // the new information we can, and falsely producing redundant information is a // good way to stall the solver if it thinks it keeps learning more things! -func (obj *ContainsPolyFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHaystack, dummyOut interfaces.Expr) func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) { +func (obj *ContainsFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHaystack, dummyOut interfaces.Expr) func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) { return func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) { for _, invariant := range fnInvariants { // search for this special type of invariant @@ -241,7 +240,7 @@ func (obj *ContainsPolyFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHay // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *ContainsPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *ContainsFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { // TODO: return `variant` as arg for now -- maybe there's a better way? variant := []*types.Type{types.NewType("func(needle variant, haystack variant) bool")} @@ -292,7 +291,7 @@ func (obj *ContainsPolyFunc) Polymorphisms(partialType *types.Type, partialValue // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *ContainsPolyFunc) Build(typ *types.Type) error { +func (obj *ContainsFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if typ.Kind != types.KindFunc { return fmt.Errorf("input type must be of kind func") @@ -335,7 +334,7 @@ func (obj *ContainsPolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *ContainsPolyFunc) Validate() error { +func (obj *ContainsFunc) Validate() error { if obj.Type == nil { // build must be run first return fmt.Errorf("type is still unspecified") } @@ -344,7 +343,7 @@ func (obj *ContainsPolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *ContainsPolyFunc) Info() *interfaces.Info { +func (obj *ContainsFunc) Info() *interfaces.Info { var sig *types.Type if obj.Type != nil { // don't panic if called speculatively s := obj.Type.String() @@ -359,14 +358,13 @@ func (obj *ContainsPolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *ContainsPolyFunc) Init(init *interfaces.Init) error { +func (obj *ContainsFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *ContainsPolyFunc) Stream() error { +func (obj *ContainsFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -397,20 +395,14 @@ func (obj *ContainsPolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ContainsPolyFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/core_test.go b/lang/funcs/core/core_test.go index d519f11d..33ebd3ca 100644 --- a/lang/funcs/core/core_test.go +++ b/lang/funcs/core/core_test.go @@ -583,6 +583,9 @@ func TestLiveFuncExec0(t *testing.T) { valueptrch := make(chan int) // which Nth value are we at? killTimeline := make(chan struct{}) // ask timeline to exit + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // wait for close signals wg.Add(1) go func() { @@ -617,7 +620,7 @@ func TestLiveFuncExec0(t *testing.T) { if debug { logf("Running func") } - err := handle.Stream() // sends to output chan + err := handle.Stream(ctx) // sends to output chan t.Logf("test #%d: stream exited with: %+v", index, err) if debug { logf("Exiting func") @@ -740,12 +743,8 @@ func TestLiveFuncExec0(t *testing.T) { t.Logf("test #%d: timeline finished", index) close(argch) - t.Logf("test #%d: running Close", index) - if err := handle.Close(); err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: could not close func: %+v", index, err) - return - } + t.Logf("test #%d: running cancel", index) + cancel() }() // read everything diff --git a/lang/funcs/core/datetime/now_fact.go b/lang/funcs/core/datetime/now_fact.go index 3d182a9c..80bae764 100644 --- a/lang/funcs/core/datetime/now_fact.go +++ b/lang/funcs/core/datetime/now_fact.go @@ -18,6 +18,7 @@ package coredatetime import ( + "context" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -36,8 +37,7 @@ func init() { // DateTimeFact is a fact which returns the current date and time. type DateTimeFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -62,12 +62,11 @@ func (obj *DateTimeFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *DateTimeFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. -func (obj *DateTimeFact) Stream() error { +func (obj *DateTimeFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // always signal when we're done // XXX: this might be an interesting fact to write because: // 1) will the sleeps from the ticker be in sync with the second ticker? @@ -87,7 +86,7 @@ func (obj *DateTimeFact) Stream() error { startChan = nil // disable case <-ticker.C: // received the timer event // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -95,14 +94,8 @@ func (obj *DateTimeFact) Stream() error { case obj.init.Output <- &types.IntValue{ // seconds since 1970... V: time.Now().Unix(), // .UTC() not necessary }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *DateTimeFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/deploy/abspath_func.go b/lang/funcs/core/deploy/abspath_func.go index 557d092c..b51a8e84 100644 --- a/lang/funcs/core/deploy/abspath_func.go +++ b/lang/funcs/core/deploy/abspath_func.go @@ -18,6 +18,7 @@ package coredeploy import ( + "context" "fmt" "strings" @@ -49,8 +50,6 @@ type AbsPathFunc struct { path *string // the active path result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -91,7 +90,6 @@ func (obj *AbsPathFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *AbsPathFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) if obj.data == nil { // programming error return fmt.Errorf("missing function data") @@ -100,7 +98,7 @@ func (obj *AbsPathFunc) Init(init *interfaces.Init) error { } // Stream returns the changing values that this func has over time. -func (obj *AbsPathFunc) Stream() error { +func (obj *AbsPathFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -145,7 +143,7 @@ func (obj *AbsPathFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -153,14 +151,8 @@ func (obj *AbsPathFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *AbsPathFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/deploy/readfile_func.go b/lang/funcs/core/deploy/readfile_func.go index 6d0a922c..57d89fd5 100644 --- a/lang/funcs/core/deploy/readfile_func.go +++ b/lang/funcs/core/deploy/readfile_func.go @@ -18,6 +18,7 @@ package coredeploy import ( + "context" "fmt" "strings" @@ -50,8 +51,6 @@ type ReadFileFunc struct { filename *string // the active filename result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -92,7 +91,6 @@ func (obj *ReadFileFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *ReadFileFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) if obj.data == nil { // programming error return fmt.Errorf("missing function data") @@ -101,7 +99,7 @@ func (obj *ReadFileFunc) Init(init *interfaces.Init) error { } // Stream returns the changing values that this func has over time. -func (obj *ReadFileFunc) Stream() error { +func (obj *ReadFileFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -159,7 +157,7 @@ func (obj *ReadFileFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -167,14 +165,8 @@ func (obj *ReadFileFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ReadFileFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/deploy/readfileabs_func.go b/lang/funcs/core/deploy/readfileabs_func.go index 523b8fda..e42e92b7 100644 --- a/lang/funcs/core/deploy/readfileabs_func.go +++ b/lang/funcs/core/deploy/readfileabs_func.go @@ -18,6 +18,7 @@ package coredeploy import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -50,8 +51,6 @@ type ReadFileAbsFunc struct { filename *string // the active filename result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -92,7 +91,6 @@ func (obj *ReadFileAbsFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *ReadFileAbsFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) if obj.data == nil { // programming error return fmt.Errorf("missing function data") @@ -101,7 +99,7 @@ func (obj *ReadFileAbsFunc) Init(init *interfaces.Init) error { } // Stream returns the changing values that this func has over time. -func (obj *ReadFileAbsFunc) Stream() error { +func (obj *ReadFileAbsFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -145,7 +143,7 @@ func (obj *ReadFileAbsFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -153,14 +151,8 @@ func (obj *ReadFileAbsFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ReadFileAbsFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/example/flipflop_fact.go b/lang/funcs/core/example/flipflop_fact.go index efd17c23..aa8a27e0 100644 --- a/lang/funcs/core/example/flipflop_fact.go +++ b/lang/funcs/core/example/flipflop_fact.go @@ -18,6 +18,7 @@ package coreexample import ( + "context" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -38,9 +39,8 @@ func init() { // and is not meant for serious computing. This would be better served by a flip // function which you could specify an interval for. type FlipFlopFact struct { - init *facts.Init - value bool - closeChan chan struct{} + init *facts.Init + value bool } // String returns a simple name for this fact. This is needed so this struct can @@ -65,12 +65,11 @@ func (obj *FlipFlopFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *FlipFlopFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. -func (obj *FlipFlopFact) Stream() error { +func (obj *FlipFlopFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // always signal when we're done // TODO: don't hard code 5 sec interval ticker := time.NewTicker(time.Duration(5) * time.Second) @@ -85,7 +84,7 @@ func (obj *FlipFlopFact) Stream() error { startChan = nil // disable case <-ticker.C: // received the timer event // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -93,16 +92,10 @@ func (obj *FlipFlopFact) Stream() error { case obj.init.Output <- &types.BoolValue{ // flip V: obj.value, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } obj.value = !obj.value // flip it } } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *FlipFlopFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/example/vumeter_func.go b/lang/funcs/core/example/vumeter_func.go index ac11dae7..2e0db7fd 100644 --- a/lang/funcs/core/example/vumeter_func.go +++ b/lang/funcs/core/example/vumeter_func.go @@ -59,8 +59,6 @@ type VUMeterFunc struct { peak float64 result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -133,12 +131,11 @@ func (obj *VUMeterFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *VUMeterFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *VUMeterFunc) Stream() error { +func (obj *VUMeterFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes ticker := newTicker() defer ticker.Stop() @@ -222,7 +219,7 @@ func (obj *VUMeterFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -230,18 +227,12 @@ func (obj *VUMeterFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *VUMeterFunc) Close() error { - close(obj.closeChan) - return nil -} - func newTicker() *time.Ticker { return time.NewTicker(time.Duration(1) * time.Second) } diff --git a/lang/funcs/core/fmt/printf_func.go b/lang/funcs/core/fmt/printf_func.go index fb615280..c998b76c 100644 --- a/lang/funcs/core/fmt/printf_func.go +++ b/lang/funcs/core/fmt/printf_func.go @@ -18,6 +18,7 @@ package corefmt import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -55,8 +56,6 @@ type PrintfFunc struct { last types.Value // last value received to use for diff result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -367,12 +366,11 @@ func (obj *PrintfFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *PrintfFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *PrintfFunc) Stream() error { +func (obj *PrintfFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -409,7 +407,7 @@ func (obj *PrintfFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -417,18 +415,12 @@ func (obj *PrintfFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *PrintfFunc) Close() error { - close(obj.closeChan) - return nil -} - // valueToString prints our values how we expect for printf. // FIXME: if this turns out to be useful, add it to the types package. func valueToString(value types.Value) string { diff --git a/lang/funcs/core/iter/map_func.go b/lang/funcs/core/iter/map_func.go index 1981f20a..c09eb818 100644 --- a/lang/funcs/core/iter/map_func.go +++ b/lang/funcs/core/iter/map_func.go @@ -18,6 +18,7 @@ package coreiter import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -62,8 +63,6 @@ type MapFunc struct { function func([]types.Value) (types.Value, error) result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -556,12 +555,11 @@ func (obj *MapFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *MapFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *MapFunc) Stream() error { +func (obj *MapFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes rtyp := types.NewType(fmt.Sprintf("[]%s", obj.RType.String())) for { @@ -613,21 +611,15 @@ func (obj *MapFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *MapFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/os/readfile_func.go b/lang/funcs/core/os/readfile_func.go index 8e37872e..fc65801d 100644 --- a/lang/funcs/core/os/readfile_func.go +++ b/lang/funcs/core/os/readfile_func.go @@ -18,6 +18,7 @@ package coreos import ( + "context" "fmt" "io/ioutil" "sync" @@ -55,8 +56,6 @@ type ReadFileFunc struct { wg *sync.WaitGroup result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -94,13 +93,13 @@ func (obj *ReadFileFunc) Init(init *interfaces.Init) error { obj.init = init obj.events = make(chan error) obj.wg = &sync.WaitGroup{} - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *ReadFileFunc) Stream() error { +func (obj *ReadFileFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes + defer close(obj.events) // clean up for fun defer obj.wg.Wait() defer func() { if obj.recWatcher != nil { @@ -182,7 +181,7 @@ func (obj *ReadFileFunc) Stream() error { case obj.events <- err: // send event... - case <-obj.closeChan: + case <-ctx.Done(): // don't block here on shutdown return } @@ -215,7 +214,7 @@ func (obj *ReadFileFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -223,16 +222,8 @@ func (obj *ReadFileFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ReadFileFunc) Close() error { - close(obj.closeChan) - obj.wg.Wait() // block so we don't exit by the closure of obj.events - close(obj.events) // clean up for fun - return nil -} diff --git a/lang/funcs/core/os/system_func.go b/lang/funcs/core/os/system_func.go index 442562b5..0ad428dd 100644 --- a/lang/funcs/core/os/system_func.go +++ b/lang/funcs/core/os/system_func.go @@ -49,10 +49,8 @@ func init() { // after the other, the downstream resources might not run for every line unless // the "Meta:realize" metaparam is set to true. type SystemFunc struct { - init *interfaces.Init - - closeChan chan struct{} - cancel context.CancelFunc + init *interfaces.Init + cancel context.CancelFunc } // String returns a simple name for this function. This is needed so this struct @@ -89,12 +87,15 @@ func (obj *SystemFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *SystemFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *SystemFunc) Stream() error { +func (obj *SystemFunc) Stream(ctx context.Context) error { + // XXX: this implementation is a bit awkward especially with the port to + // the Stream(context.Context) signature change. This is a straight port + // but we could refactor this eventually. + // Close the output chan to signal that no more values are coming. defer close(obj.init.Output) @@ -115,7 +116,7 @@ func (obj *SystemFunc) Stream() error { // Kill the current process, if any. A new cancel function is created // each time a new process is started. - var ctx context.Context + var innerCtx context.Context defer func() { if obj.cancel == nil { return @@ -132,7 +133,7 @@ func (obj *SystemFunc) Stream() error { select { case <-processedChan: return nil - case <-obj.closeChan: + case <-ctx.Done(): return nil } } @@ -147,8 +148,8 @@ func (obj *SystemFunc) Stream() error { // Run the command, connecting it to ctx so we can kill // it if needed, and to two Readers so we can read its // stdout and stderr. - ctx, obj.cancel = context.WithCancel(context.Background()) - cmd := exec.CommandContext(ctx, "sh", "-c", shellCommand) + innerCtx, obj.cancel = context.WithCancel(context.Background()) + cmd := exec.CommandContext(innerCtx, "sh", "-c", shellCommand) stdoutReader, err := cmd.StdoutPipe() if err != nil { return err @@ -205,14 +206,8 @@ func (obj *SystemFunc) Stream() error { wg.Wait() close(processedChan) }() - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *SystemFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/random1_func.go b/lang/funcs/core/random1_func.go index 376e5224..982817c9 100644 --- a/lang/funcs/core/random1_func.go +++ b/lang/funcs/core/random1_func.go @@ -18,6 +18,7 @@ package core // TODO: should this be in its own individual package? import ( + "context" "crypto/rand" "fmt" "math/big" @@ -56,8 +57,6 @@ type Random1Func struct { init *interfaces.Init finished bool // did we send the random string? - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -119,12 +118,11 @@ func generate(length uint16) (string, error) { // Init runs some startup code for this function. func (obj *Random1Func) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the single value that was generated and then closes. -func (obj *Random1Func) Stream() error { +func (obj *Random1Func) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes var result string for { @@ -153,7 +151,7 @@ func (obj *Random1Func) Stream() error { return err // no errwrap needed b/c helper func } - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -164,14 +162,8 @@ func (obj *Random1Func) Stream() error { // we only send one value, then wait for input to close obj.finished = true - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *Random1Func) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/sys/cpucount_fact.go b/lang/funcs/core/sys/cpucount_fact.go index d8e0d03e..b8911d5b 100644 --- a/lang/funcs/core/sys/cpucount_fact.go +++ b/lang/funcs/core/sys/cpucount_fact.go @@ -20,6 +20,7 @@ package coresys import ( + "context" "io/ioutil" "regexp" "strconv" @@ -50,8 +51,7 @@ func init() { // CPUCountFact is a fact that returns the current CPU count. type CPUCountFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -67,18 +67,16 @@ func (obj *CPUCountFact) Info() *facts.Info { } } -// Init runs startup code for this fact. Initializes the closeChan and sets the -// facts.Init variable. +// Init runs startup code for this fact and sets the facts.Init variable. func (obj *CPUCountFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. It will // first poll sysfs to get the initial cpu count, and then receives UEvents from // the kernel as CPUs are added/removed. -func (obj CPUCountFact) Stream() error { +func (obj CPUCountFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // signal when we're done ss, err := socketset.NewSocketSet(rtmGrps, socketFile, unix.NETLINK_KOBJECT_UEVENT) @@ -152,7 +150,7 @@ func (obj CPUCountFact) Stream() error { continue } } - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -167,18 +165,12 @@ func (obj CPUCountFact) Stream() error { }: once = true // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs cleanup code for the fact and turns off the Stream. -func (obj *CPUCountFact) Close() error { - close(obj.closeChan) - return nil -} - // getCPUCount looks in sysfs to get the number of CPUs that are online. func getCPUCount() (int64, error) { dat, err := ioutil.ReadFile("/sys/devices/system/cpu/online") diff --git a/lang/funcs/core/sys/cpucount_fact_test.go b/lang/funcs/core/sys/cpucount_fact_test.go index ea4a4424..46f32cc1 100644 --- a/lang/funcs/core/sys/cpucount_fact_test.go +++ b/lang/funcs/core/sys/cpucount_fact_test.go @@ -20,6 +20,7 @@ package coresys import ( + "context" "testing" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -41,8 +42,9 @@ func TestSimple(t *testing.T) { return } + ctx, cancel := context.WithCancel(context.Background()) go func() { - defer fact.Close() + defer cancel() Loop: for { select { @@ -54,7 +56,7 @@ func TestSimple(t *testing.T) { }() // now start the stream - if err := fact.Stream(); err != nil { + if err := fact.Stream(ctx); err != nil { t.Error(err) } } diff --git a/lang/funcs/core/sys/hostname_fact.go b/lang/funcs/core/sys/hostname_fact.go index eed041f2..c088971a 100644 --- a/lang/funcs/core/sys/hostname_fact.go +++ b/lang/funcs/core/sys/hostname_fact.go @@ -18,6 +18,8 @@ package coresys import ( + "context" + "github.com/purpleidea/mgmt/lang/funcs/facts" "github.com/purpleidea/mgmt/lang/types" ) @@ -35,8 +37,7 @@ func init() { // HostnameFact is a function that returns the hostname. // TODO: support hostnames that change in the future. type HostnameFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -61,26 +62,19 @@ func (obj *HostnameFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *HostnameFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the single value that this fact has, and then closes. -func (obj *HostnameFact) Stream() error { +func (obj *HostnameFact) Stream(ctx context.Context) error { select { case obj.init.Output <- &types.StrValue{ V: obj.init.Hostname, }: // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } close(obj.init.Output) // signal that we're done sending return nil } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *HostnameFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/sys/load_fact.go b/lang/funcs/core/sys/load_fact.go index 3599cd0d..f6e1e212 100644 --- a/lang/funcs/core/sys/load_fact.go +++ b/lang/funcs/core/sys/load_fact.go @@ -18,6 +18,7 @@ package coresys import ( + "context" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -39,8 +40,7 @@ func init() { // LoadFact is a fact which returns the current system load. type LoadFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -65,12 +65,11 @@ func (obj *LoadFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *LoadFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. -func (obj *LoadFact) Stream() error { +func (obj *LoadFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // always signal when we're done // it seems the different values only update once every 5 @@ -88,7 +87,7 @@ func (obj *LoadFact) Stream() error { startChan = nil // disable case <-ticker.C: // received the timer event // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -107,14 +106,8 @@ func (obj *LoadFact) Stream() error { select { case obj.init.Output <- st: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *LoadFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/sys/uptime_fact.go b/lang/funcs/core/sys/uptime_fact.go index 0db7cda7..e3fb8387 100644 --- a/lang/funcs/core/sys/uptime_fact.go +++ b/lang/funcs/core/sys/uptime_fact.go @@ -18,6 +18,7 @@ package coresys import ( + "context" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -37,8 +38,7 @@ func init() { // UptimeFact is a fact which returns the current uptime of your system. type UptimeFact struct { - init *facts.Init - closeChan chan struct{} + init *facts.Init } // String returns a simple name for this fact. This is needed so this struct can @@ -57,12 +57,11 @@ func (obj *UptimeFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *UptimeFact) Init(init *facts.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this fact has over time. -func (obj *UptimeFact) Stream() error { +func (obj *UptimeFact) Stream(ctx context.Context) error { defer close(obj.init.Output) ticker := time.NewTicker(time.Duration(1) * time.Second) @@ -75,7 +74,7 @@ func (obj *UptimeFact) Stream() error { startChan = nil case <-ticker.C: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -87,14 +86,8 @@ func (obj *UptimeFact) Stream() error { select { case obj.init.Output <- &types.IntValue{V: uptime}: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this fact and turns off the stream. -func (obj *UptimeFact) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/template_func.go b/lang/funcs/core/template_func.go index fc47e2cc..75b05bef 100644 --- a/lang/funcs/core/template_func.go +++ b/lang/funcs/core/template_func.go @@ -19,6 +19,7 @@ package core // TODO: should this be in its own individual package? import ( "bytes" + "context" "fmt" "reflect" "strings" @@ -73,8 +74,6 @@ type TemplateFunc struct { last types.Value // last value received to use for diff result *string // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -367,7 +366,6 @@ func (obj *TemplateFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *TemplateFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } @@ -491,7 +489,7 @@ Loop: } // Stream returns the changing values that this func has over time. -func (obj *TemplateFunc) Stream() error { +func (obj *TemplateFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -526,7 +524,7 @@ func (obj *TemplateFunc) Stream() error { } obj.result = &result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -534,18 +532,12 @@ func (obj *TemplateFunc) Stream() error { case obj.init.Output <- &types.StrValue{ V: *obj.result, }: - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *TemplateFunc) Close() error { - close(obj.closeChan) - return nil -} - // safename renames the functions so they're valid inside the template. This is // a limitation of the template library, and it might be worth moving to a new // one. diff --git a/lang/funcs/core/world/exchange_func.go b/lang/funcs/core/world/exchange_func.go index b146e19a..d110dd4e 100644 --- a/lang/funcs/core/world/exchange_func.go +++ b/lang/funcs/core/world/exchange_func.go @@ -52,7 +52,6 @@ type ExchangeFunc struct { result types.Value // last calculated output watchChan chan error - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -93,14 +92,13 @@ func (obj *ExchangeFunc) Info() *interfaces.Info { func (obj *ExchangeFunc) Init(init *interfaces.Init) error { obj.init = init obj.watchChan = make(chan error) // XXX: sender should close this, but did I implement that part yet??? - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *ExchangeFunc) Stream() error { +func (obj *ExchangeFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() for { select { @@ -189,21 +187,15 @@ func (obj *ExchangeFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *ExchangeFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/core/world/kvlookup_func.go b/lang/funcs/core/world/kvlookup_func.go index 7903980d..42385a17 100644 --- a/lang/funcs/core/world/kvlookup_func.go +++ b/lang/funcs/core/world/kvlookup_func.go @@ -51,7 +51,6 @@ type KVLookupFunc struct { result types.Value // last calculated output watchChan chan error - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -90,14 +89,13 @@ func (obj *KVLookupFunc) Info() *interfaces.Info { func (obj *KVLookupFunc) Init(init *interfaces.Init) error { obj.init = init obj.watchChan = make(chan error) // XXX: sender should close this, but did I implement that part yet??? - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *KVLookupFunc) Stream() error { +func (obj *KVLookupFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() for { select { @@ -143,7 +141,7 @@ func (obj *KVLookupFunc) Stream() error { select { case obj.init.Output <- result: // send one! // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -176,25 +174,19 @@ func (obj *KVLookupFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *KVLookupFunc) Close() error { - close(obj.closeChan) - return nil -} - // buildMap builds the result map which we'll need. It uses struct variables. func (obj *KVLookupFunc) buildMap(ctx context.Context) (types.Value, error) { keyMap, err := obj.init.World.StrMapGet(ctx, obj.namespace) diff --git a/lang/funcs/core/world/schedule_func.go b/lang/funcs/core/world/schedule_func.go index b102b913..2476a803 100644 --- a/lang/funcs/core/world/schedule_func.go +++ b/lang/funcs/core/world/schedule_func.go @@ -62,12 +62,12 @@ const ( ) func init() { - funcs.ModuleRegister(ModuleName, ScheduleFuncName, func() interfaces.Func { return &SchedulePolyFunc{} }) + funcs.ModuleRegister(ModuleName, ScheduleFuncName, func() interfaces.Func { return &ScheduleFunc{} }) } -// SchedulePolyFunc is special function which determines where code should run -// in the cluster. -type SchedulePolyFunc struct { +// ScheduleFunc is special function which determines where code should run in +// the cluster. +type ScheduleFunc struct { Type *types.Type // this is the type of opts used if specified built bool // was this function built yet? @@ -81,17 +81,16 @@ type SchedulePolyFunc struct { result types.Value // last calculated output watchChan chan *schedulerResult - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *SchedulePolyFunc) String() string { +func (obj *ScheduleFunc) String() string { return ScheduleFuncName } // validOpts returns the available mapping of valid opts fields to types. -func (obj *SchedulePolyFunc) validOpts() map[string]*types.Type { +func (obj *ScheduleFunc) validOpts() map[string]*types.Type { return map[string]*types.Type{ "strategy": types.TypeStr, "max": types.TypeInt, @@ -101,7 +100,7 @@ func (obj *SchedulePolyFunc) validOpts() map[string]*types.Type { } // ArgGen returns the Nth arg name for this function. -func (obj *SchedulePolyFunc) ArgGen(index int) (string, error) { +func (obj *ScheduleFunc) ArgGen(index int) (string, error) { seq := []string{scheduleArgNameNamespace, scheduleArgNameOpts} // 2nd arg is optional if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) @@ -110,7 +109,7 @@ func (obj *SchedulePolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *SchedulePolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *ScheduleFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -311,7 +310,7 @@ func (obj *SchedulePolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *SchedulePolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *ScheduleFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { // TODO: technically, we could generate all permutations of the struct! //variant := []*types.Type{} //t0 := types.NewType("func(namespace str) []str") @@ -399,7 +398,7 @@ func (obj *SchedulePolyFunc) Polymorphisms(partialType *types.Type, partialValue // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *SchedulePolyFunc) Build(typ *types.Type) error { +func (obj *ScheduleFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if typ.Kind != types.KindFunc { return fmt.Errorf("input type must be of kind func") @@ -482,7 +481,7 @@ func (obj *SchedulePolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *SchedulePolyFunc) Validate() error { +func (obj *ScheduleFunc) Validate() error { if !obj.built { return fmt.Errorf("function wasn't built yet") } @@ -495,7 +494,7 @@ func (obj *SchedulePolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *SchedulePolyFunc) Info() *interfaces.Info { +func (obj *ScheduleFunc) Info() *interfaces.Info { // It's important that you don't return a non-nil sig if this is called // before you're built. Type unification may call it opportunistically. var typ *types.Type @@ -515,16 +514,15 @@ func (obj *SchedulePolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *SchedulePolyFunc) Init(init *interfaces.Init) error { +func (obj *ScheduleFunc) Init(init *interfaces.Init) error { obj.init = init obj.watchChan = make(chan *schedulerResult) - obj.closeChan = make(chan struct{}) //obj.init.Debug = true // use this for local debugging return nil } // Stream returns the changing values that this func has over time. -func (obj *SchedulePolyFunc) Stream() error { +func (obj *ScheduleFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -618,24 +616,28 @@ func (obj *SchedulePolyFunc) Stream() error { // process the stream of scheduling output... go func() { defer close(obj.watchChan) - ctx, cancel := context.WithCancel(context.Background()) + // XXX: maybe we could share the parent + // ctx, but I have to work out the + // ordering logic first. For now this is + // just a port of what it was before. + newCtx, cancel := context.WithCancel(context.Background()) go func() { defer cancel() // unblock Next() defer obj.scheduler.Shutdown() select { - case <-obj.closeChan: + case <-ctx.Done(): return } }() for { - hosts, err := obj.scheduler.Next(ctx) + hosts, err := obj.scheduler.Next(newCtx) select { case obj.watchChan <- &schedulerResult{ hosts: hosts, err: err, }: - case <-obj.closeChan: + case <-ctx.Done(): return } } @@ -688,25 +690,19 @@ func (obj *SchedulePolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *SchedulePolyFunc) Close() error { - close(obj.closeChan) - return nil -} - // schedulerResult combines our internal events into a single message packet. type schedulerResult struct { hosts []string diff --git a/lang/funcs/engine.go b/lang/funcs/engine.go index 1f9a6d3c..c8819e12 100644 --- a/lang/funcs/engine.go +++ b/lang/funcs/engine.go @@ -18,6 +18,7 @@ package funcs import ( + "context" "fmt" "sync" @@ -35,6 +36,9 @@ type State struct { handle interfaces.Func // the function (if not nil, we've found it on init) + ctx context.Context // used for shutting down each Stream function. + cancel context.CancelFunc + init bool // have we run Init on our func? ready bool // has it received all the args it needs at least once? loaded bool // has the func run at least once ? @@ -112,6 +116,9 @@ type Engine struct { streamChan chan error // signals a new graph can be created or problem + ctx context.Context // used for shutting down each Stream function. + cancel context.CancelFunc + closeChan chan struct{} // close signal wg *sync.WaitGroup } @@ -137,6 +144,8 @@ func (obj *Engine) Init() error { } obj.topologicalSort = topologicalSort // cache the result + obj.ctx, obj.cancel = context.WithCancel(context.Background()) // top + for _, vertex := range obj.Graph.Vertices() { // is this an interface we can use? if _, exists := obj.state[vertex]; exists { @@ -152,7 +161,13 @@ func (obj *Engine) Init() error { obj.Logf("Loading func `%s`", vertex) } - obj.state[vertex] = &State{Expr: expr} // store some state! + innerCtx, innerCancel := context.WithCancel(obj.ctx) + obj.state[vertex] = &State{ + Expr: expr, + + ctx: innerCtx, + cancel: innerCancel, + } // store some state! e1 := obj.state[vertex].Init() e2 := errwrap.Wrapf(e1, "error loading func `%s`", vertex) @@ -407,7 +422,7 @@ func (obj *Engine) Run() error { if obj.Debug { obj.SafeLogf("Running func `%s`", node) } - err := node.handle.Stream() + err := node.handle.Stream(node.ctx) if obj.Debug { obj.SafeLogf("Exiting func `%s`", node) } @@ -673,14 +688,10 @@ func (obj *Engine) Close() error { var err error for _, vertex := range obj.topologicalSort { // FIXME: should we do this in reverse? node := obj.state[vertex] - if node.init { // did we Init this func? - if e := node.handle.Close(); e != nil { - e := errwrap.Wrapf(e, "problem closing func `%s`", node) - err = errwrap.Append(err, e) // list of errors - } - } + node.cancel() // ctx } close(obj.closeChan) obj.wg.Wait() // wait so that each func doesn't need to do this in close + obj.cancel() // free return err } diff --git a/lang/funcs/facts/facts.go b/lang/funcs/facts/facts.go index a86d29a2..5d4b8089 100644 --- a/lang/funcs/facts/facts.go +++ b/lang/funcs/facts/facts.go @@ -19,6 +19,7 @@ package facts import ( + "context" "fmt" "github.com/purpleidea/mgmt/engine" @@ -83,6 +84,5 @@ type Fact interface { //Validate() error // currently not needed since no facts are internal Info() *Info Init(*Init) error - Stream() error - Close() error + Stream(context.Context) error } diff --git a/lang/funcs/facts/func.go b/lang/funcs/facts/func.go index 60618f2c..25bc43bc 100644 --- a/lang/funcs/facts/func.go +++ b/lang/funcs/facts/func.go @@ -18,6 +18,7 @@ package facts import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -75,11 +76,6 @@ func (obj *FactFunc) Init(init *interfaces.Init) error { } // Stream returns the changing values that this function has over time. -func (obj *FactFunc) Stream() error { - return obj.Fact.Stream() -} - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *FactFunc) Close() error { - return obj.Fact.Close() +func (obj *FactFunc) Stream(ctx context.Context) error { + return obj.Fact.Stream(ctx) } diff --git a/lang/funcs/funcs.go b/lang/funcs/funcs.go index 4f0ee59c..ec2c0e10 100644 --- a/lang/funcs/funcs.go +++ b/lang/funcs/funcs.go @@ -19,6 +19,7 @@ package funcs import ( + "context" "fmt" "strings" "sync" @@ -150,6 +151,8 @@ func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, erro hostname := "" // XXX: add to interface debug := false // XXX: add to interface logf := func(format string, v ...interface{}) {} // XXX: add to interface + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() info := handle.Info() if !info.Pure { @@ -217,7 +220,7 @@ func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, erro if debug { logf("Running func") } - err := handle.Stream() // sends to output chan + err := handle.Stream(ctx) // sends to output chan if debug { logf("Exiting func") } @@ -296,10 +299,7 @@ Loop: } } - if err := handle.Close(); err != nil { - err = errwrap.Append(err, reterr) - return nil, errwrap.Wrapf(err, "problem closing func") - } + cancel() if result == nil && reterr == nil { // programming error diff --git a/lang/funcs/history_polyfunc.go b/lang/funcs/history_func.go similarity index 97% rename from lang/funcs/history_polyfunc.go rename to lang/funcs/history_func.go index 2d5a7e1b..c5fbb9c8 100644 --- a/lang/funcs/history_polyfunc.go +++ b/lang/funcs/history_func.go @@ -18,6 +18,7 @@ package funcs // TODO: should this be in its own individual package? import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -58,8 +59,6 @@ type HistoryFunc struct { history []types.Value // goes from newest (index->0) to oldest (len()-1) result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -358,12 +357,11 @@ func (obj *HistoryFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *HistoryFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *HistoryFunc) Stream() error { +func (obj *HistoryFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -412,21 +410,15 @@ func (obj *HistoryFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *HistoryFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/maplookup_polyfunc.go b/lang/funcs/maplookup_func.go similarity index 94% rename from lang/funcs/maplookup_polyfunc.go rename to lang/funcs/maplookup_func.go index 878f128b..40c1faac 100644 --- a/lang/funcs/maplookup_polyfunc.go +++ b/lang/funcs/maplookup_func.go @@ -18,6 +18,7 @@ package funcs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -38,29 +39,27 @@ const ( ) func init() { - Register(MapLookupFuncName, func() interfaces.Func { return &MapLookupPolyFunc{} }) // must register the func and name + Register(MapLookupFuncName, func() interfaces.Func { return &MapLookupFunc{} }) // must register the func and name } -// MapLookupPolyFunc is a key map lookup function. -type MapLookupPolyFunc struct { +// MapLookupFunc is a key map lookup function. +type MapLookupFunc struct { Type *types.Type // Kind == Map, that is used as the map we lookup init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *MapLookupPolyFunc) String() string { +func (obj *MapLookupFunc) String() string { return MapLookupFuncName } // ArgGen returns the Nth arg name for this function. -func (obj *MapLookupPolyFunc) ArgGen(index int) (string, error) { +func (obj *MapLookupFunc) ArgGen(index int) (string, error) { seq := []string{mapLookupArgNameMap, mapLookupArgNameKey, mapLookupArgNameDef} if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) @@ -69,7 +68,7 @@ func (obj *MapLookupPolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *MapLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *MapLookupFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -369,7 +368,7 @@ func (obj *MapLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invarian // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *MapLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *MapLookupFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { // TODO: return `variant` as arg for now -- maybe there's a better way? variant := []*types.Type{types.NewType("func(map variant, key variant, default variant) variant")} @@ -468,7 +467,7 @@ func (obj *MapLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValu // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *MapLookupPolyFunc) Build(typ *types.Type) error { +func (obj *MapLookupFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if typ.Kind != types.KindFunc { return fmt.Errorf("input type must be of kind func") @@ -516,7 +515,7 @@ func (obj *MapLookupPolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *MapLookupPolyFunc) Validate() error { +func (obj *MapLookupFunc) Validate() error { if obj.Type == nil { // build must be run first return fmt.Errorf("type is still unspecified") } @@ -528,7 +527,7 @@ func (obj *MapLookupPolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *MapLookupPolyFunc) Info() *interfaces.Info { +func (obj *MapLookupFunc) Info() *interfaces.Info { var typ *types.Type if obj.Type != nil { // don't panic if called speculatively // TODO: can obj.Type.Key or obj.Type.Val be nil (a partial) ? @@ -545,14 +544,13 @@ func (obj *MapLookupPolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *MapLookupPolyFunc) Init(init *interfaces.Init) error { +func (obj *MapLookupFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *MapLookupPolyFunc) Stream() error { +func (obj *MapLookupFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -589,20 +587,14 @@ func (obj *MapLookupPolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *MapLookupPolyFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/operator_polyfunc.go b/lang/funcs/operator_func.go similarity index 95% rename from lang/funcs/operator_polyfunc.go rename to lang/funcs/operator_func.go index 9f6c4067..99b34482 100644 --- a/lang/funcs/operator_polyfunc.go +++ b/lang/funcs/operator_func.go @@ -18,6 +18,7 @@ package funcs // this is here, in case we allow others to register operators... import ( + "context" "fmt" "math" "sort" @@ -332,7 +333,7 @@ func init() { }, }) - Register(OperatorFuncName, func() interfaces.Func { return &OperatorPolyFunc{} }) // must register the func and name + Register(OperatorFuncName, func() interfaces.Func { return &OperatorFunc{} }) // must register the func and name } // OperatorFuncs maps an operator to a list of callable function values. @@ -422,29 +423,26 @@ func LookupOperatorShort(operator string, size int) ([]*types.Type, error) { return results, nil } -// OperatorPolyFunc is an operator function that performs an operation on N -// values. -type OperatorPolyFunc struct { +// OperatorFunc is an operator function that performs an operation on N values. +type OperatorFunc struct { Type *types.Type // Kind == Function, including operator arg init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *OperatorPolyFunc) String() string { +func (obj *OperatorFunc) String() string { // TODO: return the exact operator if we can guarantee it doesn't change return OperatorFuncName } // argNames returns the maximum list of possible argNames. This can be truncated // if needed. The first arg name is the operator. -func (obj *OperatorPolyFunc) argNames() ([]string, error) { +func (obj *OperatorFunc) argNames() ([]string, error) { // we could just do this statically, but i did it dynamically so that I // wouldn't ever have to remember to update this list... max := 0 @@ -474,7 +472,7 @@ func (obj *OperatorPolyFunc) argNames() ([]string, error) { // findFunc tries to find the first available registered operator function that // matches the Operator/Type pattern requested. If none is found it returns nil. -func (obj *OperatorPolyFunc) findFunc(operator string) *types.FuncValue { +func (obj *OperatorFunc) findFunc(operator string) *types.FuncValue { fns, exists := OperatorFuncs[operator] if !exists { return nil @@ -489,7 +487,7 @@ func (obj *OperatorPolyFunc) findFunc(operator string) *types.FuncValue { } // ArgGen returns the Nth arg name for this function. -func (obj *OperatorPolyFunc) ArgGen(index int) (string, error) { +func (obj *OperatorFunc) ArgGen(index int) (string, error) { seq, err := obj.argNames() if err != nil { return "", err @@ -501,7 +499,7 @@ func (obj *OperatorPolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *OperatorPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *OperatorFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -746,7 +744,7 @@ func (obj *OperatorPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *OperatorPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *OperatorFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { var op string var size = -1 @@ -793,7 +791,7 @@ func (obj *OperatorPolyFunc) Polymorphisms(partialType *types.Type, partialValue // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *OperatorPolyFunc) Build(typ *types.Type) error { +func (obj *OperatorFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if len(typ.Ord) < 1 { return fmt.Errorf("the operator function needs at least 1 arg") @@ -807,7 +805,7 @@ func (obj *OperatorPolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *OperatorPolyFunc) Validate() error { +func (obj *OperatorFunc) Validate() error { if obj.Type == nil { // build must be run first return fmt.Errorf("type is still unspecified") } @@ -819,7 +817,7 @@ func (obj *OperatorPolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *OperatorPolyFunc) Info() *interfaces.Info { +func (obj *OperatorFunc) Info() *interfaces.Info { return &interfaces.Info{ Pure: true, Memo: false, @@ -829,14 +827,13 @@ func (obj *OperatorPolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *OperatorPolyFunc) Init(init *interfaces.Init) error { +func (obj *OperatorFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *OperatorPolyFunc) Stream() error { +func (obj *OperatorFunc) Stream(ctx context.Context) error { var op, lastOp string var fn *types.FuncValue defer close(obj.init.Output) // the sender closes @@ -896,24 +893,18 @@ func (obj *OperatorPolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } -// Close runs some shutdown code for this function and turns off the stream. -func (obj *OperatorPolyFunc) Close() error { - close(obj.closeChan) - return nil -} - // removeOperatorArg returns a copy of the input KindFunc type, without the // operator arg which specifies which operator we're using. It *is* idempotent. func removeOperatorArg(typ *types.Type) *types.Type { diff --git a/lang/funcs/simple/simple.go b/lang/funcs/simple/simple.go index 11d07487..f9d4be05 100644 --- a/lang/funcs/simple/simple.go +++ b/lang/funcs/simple/simple.go @@ -18,6 +18,7 @@ package simple import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -78,8 +79,6 @@ type WrappedFunc struct { last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -128,12 +127,11 @@ func (obj *WrappedFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *WrappedFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *WrappedFunc) Stream() error { +func (obj *WrappedFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -172,7 +170,7 @@ func (obj *WrappedFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -181,14 +179,8 @@ func (obj *WrappedFunc) Stream() error { if len(obj.Fn.Type().Ord) == 0 { return nil // no more values, we're a pure func } - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *WrappedFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/simplepoly/simplepoly.go b/lang/funcs/simplepoly/simplepoly.go index 18e81454..2f9fba79 100644 --- a/lang/funcs/simplepoly/simplepoly.go +++ b/lang/funcs/simplepoly/simplepoly.go @@ -18,6 +18,7 @@ package simplepoly import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -141,8 +142,6 @@ type WrappedFunc struct { last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -546,12 +545,11 @@ func (obj *WrappedFunc) Info() *interfaces.Info { // Init runs some startup code for this function. func (obj *WrappedFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *WrappedFunc) Stream() error { +func (obj *WrappedFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -599,7 +597,7 @@ func (obj *WrappedFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -608,14 +606,8 @@ func (obj *WrappedFunc) Stream() error { if len(obj.fn.Type().Ord) == 0 { return nil // no more values, we're a pure func } - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *WrappedFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structlookup_polyfunc.go b/lang/funcs/structlookup_func.go similarity index 93% rename from lang/funcs/structlookup_polyfunc.go rename to lang/funcs/structlookup_func.go index 583b3b78..2b3ae3e6 100644 --- a/lang/funcs/structlookup_polyfunc.go +++ b/lang/funcs/structlookup_func.go @@ -18,6 +18,7 @@ package funcs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -37,11 +38,11 @@ const ( ) func init() { - Register(StructLookupFuncName, func() interfaces.Func { return &StructLookupPolyFunc{} }) // must register the func and name + Register(StructLookupFuncName, func() interfaces.Func { return &StructLookupFunc{} }) // must register the func and name } -// StructLookupPolyFunc is a key map lookup function. -type StructLookupPolyFunc struct { +// StructLookupFunc is a struct field lookup function. +type StructLookupFunc struct { Type *types.Type // Kind == Struct, that is used as the struct we lookup Out *types.Type // type of field we're extracting @@ -50,18 +51,16 @@ type StructLookupPolyFunc struct { field string result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. -func (obj *StructLookupPolyFunc) String() string { +func (obj *StructLookupFunc) String() string { return StructLookupFuncName } // ArgGen returns the Nth arg name for this function. -func (obj *StructLookupPolyFunc) ArgGen(index int) (string, error) { +func (obj *StructLookupFunc) ArgGen(index int) (string, error) { seq := []string{structLookupArgNameStruct, structLookupArgNameField} if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) @@ -70,7 +69,7 @@ func (obj *StructLookupPolyFunc) ArgGen(index int) (string, error) { } // Unify returns the list of invariants that this func produces. -func (obj *StructLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { +func (obj *StructLookupFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) { var invariants []interfaces.Invariant var invar interfaces.Invariant @@ -309,7 +308,7 @@ func (obj *StructLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invar // Polymorphisms returns the list of possible function signatures available for // this static polymorphic function. It relies on type and value hints to limit // the number of returned possibilities. -func (obj *StructLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { +func (obj *StructLookupFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) { // TODO: return `variant` as arg for now -- maybe there's a better way? variant := []*types.Type{types.NewType("func(struct variant, field str) variant")} @@ -395,7 +394,7 @@ func (obj *StructLookupPolyFunc) Polymorphisms(partialType *types.Type, partialV // and must be run before Info() and any of the other Func interface methods are // used. This function is idempotent, as long as the arg isn't changed between // runs. -func (obj *StructLookupPolyFunc) Build(typ *types.Type) error { +func (obj *StructLookupFunc) Build(typ *types.Type) error { // typ is the KindFunc signature we're trying to build... if typ.Kind != types.KindFunc { return fmt.Errorf("input type must be of kind func") @@ -434,7 +433,7 @@ func (obj *StructLookupPolyFunc) Build(typ *types.Type) error { } // Validate tells us if the input struct takes a valid form. -func (obj *StructLookupPolyFunc) Validate() error { +func (obj *StructLookupFunc) Validate() error { if obj.Type == nil { // build must be run first return fmt.Errorf("type is still unspecified") } @@ -455,7 +454,7 @@ func (obj *StructLookupPolyFunc) Validate() error { // Info returns some static info about itself. Build must be called before this // will return correct data. -func (obj *StructLookupPolyFunc) Info() *interfaces.Info { +func (obj *StructLookupFunc) Info() *interfaces.Info { var sig *types.Type if obj.Type != nil { // don't panic if called speculatively // TODO: can obj.Out be nil (a partial) ? @@ -470,14 +469,13 @@ func (obj *StructLookupPolyFunc) Info() *interfaces.Info { } // Init runs some startup code for this function. -func (obj *StructLookupPolyFunc) Init(init *interfaces.Init) error { +func (obj *StructLookupFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the changing values that this func has over time. -func (obj *StructLookupPolyFunc) Stream() error { +func (obj *StructLookupFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -522,20 +520,14 @@ func (obj *StructLookupPolyFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *StructLookupPolyFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/call.go b/lang/funcs/structs/call.go index a77ab699..80285000 100644 --- a/lang/funcs/structs/call.go +++ b/lang/funcs/structs/call.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -46,8 +47,6 @@ type CallFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -112,14 +111,13 @@ func (obj *CallFunc) Info() *interfaces.Info { // Init runs some startup code for this composite function. func (obj *CallFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *CallFunc) Stream() error { +func (obj *CallFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -171,21 +169,15 @@ func (obj *CallFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *CallFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/composite.go b/lang/funcs/structs/composite.go index 6c967cd0..959f20c6 100644 --- a/lang/funcs/structs/composite.go +++ b/lang/funcs/structs/composite.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -41,8 +42,6 @@ type CompositeFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -117,14 +116,13 @@ func (obj *CompositeFunc) Info() *interfaces.Info { // Init runs some startup code for this composite function. func (obj *CompositeFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *CompositeFunc) Stream() error { +func (obj *CompositeFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -139,7 +137,7 @@ func (obj *CompositeFunc) Stream() error { select { case obj.init.Output <- result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } @@ -207,21 +205,15 @@ func (obj *CompositeFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *CompositeFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/const.go b/lang/funcs/structs/const.go index b8c03f06..8f18c6ab 100644 --- a/lang/funcs/structs/const.go +++ b/lang/funcs/structs/const.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -33,8 +34,7 @@ const ( type ConstFunc struct { Value types.Value - init *interfaces.Init - closeChan chan struct{} + init *interfaces.Init } // String returns a simple name for this function. This is needed so this struct @@ -70,24 +70,17 @@ func (obj *ConstFunc) Info() *interfaces.Info { // Init runs some startup code for this const. func (obj *ConstFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream returns the single value that this const has, and then closes. -func (obj *ConstFunc) Stream() error { +func (obj *ConstFunc) Stream(ctx context.Context) error { select { case obj.init.Output <- obj.Value: // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } close(obj.init.Output) // signal that we're done sending return nil } - -// Close runs some shutdown code for this const and turns off the stream. -func (obj *ConstFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/function.go b/lang/funcs/structs/function.go index 85707f03..330fa296 100644 --- a/lang/funcs/structs/function.go +++ b/lang/funcs/structs/function.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/funcs" @@ -41,8 +42,6 @@ type FunctionFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -125,14 +124,13 @@ func (obj *FunctionFunc) Info() *interfaces.Info { // Init runs some startup code for this composite function. func (obj *FunctionFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *FunctionFunc) Stream() error { +func (obj *FunctionFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -158,7 +156,7 @@ func (obj *FunctionFunc) Stream() error { select { case obj.init.Output <- result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } @@ -194,21 +192,15 @@ func (obj *FunctionFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *FunctionFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/if.go b/lang/funcs/structs/if.go index 80e1b803..846eca4b 100644 --- a/lang/funcs/structs/if.go +++ b/lang/funcs/structs/if.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -37,8 +38,6 @@ type IfFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -82,14 +81,13 @@ func (obj *IfFunc) Info() *interfaces.Info { // Init runs some startup code for this if expression function. func (obj *IfFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *IfFunc) Stream() error { +func (obj *IfFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -119,21 +117,15 @@ func (obj *IfFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *IfFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/funcs/structs/var.go b/lang/funcs/structs/var.go index 9dd88aec..8fef1330 100644 --- a/lang/funcs/structs/var.go +++ b/lang/funcs/structs/var.go @@ -18,6 +18,7 @@ package structs import ( + "context" "fmt" "github.com/purpleidea/mgmt/lang/interfaces" @@ -40,8 +41,6 @@ type VarFunc struct { init *interfaces.Init last types.Value // last value received to use for diff result types.Value // last calculated output - - closeChan chan struct{} } // String returns a simple name for this function. This is needed so this struct @@ -84,14 +83,13 @@ func (obj *VarFunc) Info() *interfaces.Info { // Init runs some startup code for this composite function. func (obj *VarFunc) Init(init *interfaces.Init) error { obj.init = init - obj.closeChan = make(chan struct{}) return nil } // Stream takes an input struct in the format as described in the Func and Graph // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. -func (obj *VarFunc) Stream() error { +func (obj *VarFunc) Stream(ctx context.Context) error { defer close(obj.init.Output) // the sender closes for { select { @@ -121,21 +119,15 @@ func (obj *VarFunc) Stream() error { } obj.result = result // store new result - case <-obj.closeChan: + case <-ctx.Done(): return nil } select { case obj.init.Output <- obj.result: // send // pass - case <-obj.closeChan: + case <-ctx.Done(): return nil } } } - -// Close runs some shutdown code for this function and turns off the stream. -func (obj *VarFunc) Close() error { - close(obj.closeChan) - return nil -} diff --git a/lang/interfaces/func.go b/lang/interfaces/func.go index a879a41f..6f381ae5 100644 --- a/lang/interfaces/func.go +++ b/lang/interfaces/func.go @@ -18,6 +18,7 @@ package interfaces import ( + "context" "fmt" "strings" @@ -69,9 +70,18 @@ type Func interface { // not known yet. This is because the Info method might be called // speculatively to aid in type unification. Info() *Info + + // Init passes some important values and references to the function. Init(*Init) error - Stream() error - Close() error + + // Stream is the mainloop of the function. It reads and writes from + // channels to return the changing values that this func has over time. + // It should shutdown and cleanup when the input context is cancelled. + // It must not exit before any goroutines it spawned have terminated. + // It must close the Output chan if it's done sending new values out. It + // must send at least one value, or return an error. It may also return + // an error at anytime if it can't continue. + Stream(context.Context) error } // PolyFunc is an interface for functions which are statically polymorphic. In