From 9c9f2f558aabb17d34ddcfbd70e3ff8465174abc Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sat, 19 Apr 2025 23:10:28 -0400 Subject: [PATCH] lang: Move out this legacy execution function Hasn't been used in a while, but it's fine if we want to use it for tests. --- lang/core/core_test.go | 177 ++++++++++++++++++++++++++++++++++++++- lang/funcs/funcs.go | 182 ----------------------------------------- 2 files changed, 176 insertions(+), 183 deletions(-) diff --git a/lang/core/core_test.go b/lang/core/core_test.go index 6181c59f..e79f3211 100644 --- a/lang/core/core_test.go +++ b/lang/core/core_test.go @@ -49,6 +49,181 @@ import ( "github.com/kylelemons/godebug/pretty" ) +// PureFuncExec is only used for tests. +func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, error) { + hostname := "" // XXX: add to interface + debug := false // XXX: add to interface + logf := func(format string, v ...interface{}) {} // XXX: add to interface + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + info := handle.Info() + if !info.Pure { + return nil, fmt.Errorf("func is not pure") + } + // if function is expensive to run, we won't run it provisionally + if info.Slow { + return nil, fmt.Errorf("func is slow") + } + + sig := handle.Info().Sig + if sig.Kind != types.KindFunc { + return nil, fmt.Errorf("must be kind func") + } + if sig.HasUni() { + return nil, fmt.Errorf("func contains unification vars") + } + + if buildableFunc, ok := handle.(interfaces.BuildableFunc); ok { + if _, err := buildableFunc.Build(sig); err != nil { + return nil, fmt.Errorf("can't build function: %v", err) + } + } + + if err := handle.Validate(); err != nil { + return nil, errwrap.Wrapf(err, "could not validate func") + } + + ord := handle.Info().Sig.Ord + if i, j := len(ord), len(args); i != j { + return nil, fmt.Errorf("expected %d args, got %d", i, j) + } + + wg := &sync.WaitGroup{} + defer wg.Wait() + + errch := make(chan error) + input := make(chan types.Value) // we close this when we're done + output := make(chan types.Value) // we create it, func closes it + + init := &interfaces.Init{ + Hostname: hostname, + Input: input, + Output: output, + World: nil, // should not be used for pure functions + Debug: debug, + Logf: func(format string, v ...interface{}) { + logf("func: "+format, v...) + }, + } + + if err := handle.Init(init); err != nil { + return nil, errwrap.Wrapf(err, "could not init func") + } + + close1 := make(chan struct{}) + close2 := make(chan struct{}) + wg.Add(1) + go func() { + defer wg.Done() + defer close(errch) // last one turns out the lights + select { + case <-close1: + } + select { + case <-close2: + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer close(close1) + if debug { + logf("Running func") + } + err := handle.Stream(ctx) // sends to output chan + if debug { + logf("Exiting func") + } + if err == nil { + return + } + // we closed with an error... + select { + case errch <- errwrap.Wrapf(err, "problem streaming func"): + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer close(close2) + defer close(input) // we only send one value + if len(args) == 0 { + return + } + si := &types.Type{ + // input to functions are structs + Kind: types.KindStruct, + Map: handle.Info().Sig.Map, + Ord: handle.Info().Sig.Ord, + } + st := types.NewStruct(si) + + for i, arg := range args { + name := handle.Info().Sig.Ord[i] + if err := st.Set(name, arg); err != nil { // populate struct + select { + case errch <- errwrap.Wrapf(err, "struct set failure"): + } + return + } + } + + select { + case input <- st: // send to function (must not block) + case <-close1: // unblock the input send in case stream closed + select { + case errch <- fmt.Errorf("stream closed early"): + } + } + }() + + once := false + var result types.Value + var reterr error +Loop: + for { + select { + case value, ok := <-output: // read from channel + if !ok { + output = nil + continue Loop // only exit via errch closing! + } + if once { + reterr = fmt.Errorf("got more than one value") + continue // only exit via errch closing! + } + once = true + result = value // save value + + case err, ok := <-errch: // handle possible errors + if !ok { + break Loop + } + if err == nil { + // programming error + err = fmt.Errorf("error was missing") + } + e := errwrap.Wrapf(err, "problem streaming func") + reterr = errwrap.Append(reterr, e) + } + } + + cancel() + + if result == nil && reterr == nil { + // programming error + // XXX: i think this can happen when we exit without error, but + // before we send one output message... not sure how this happens + // XXX: iow, we never send on output, and errch closes... + // XXX: this could happen if we send zero input args, and Stream exits without error + return nil, fmt.Errorf("function exited with nil result and nil error") + } + return result, reterr +} + func TestPureFuncExec0(t *testing.T) { type test struct { // an individual test name string @@ -139,7 +314,7 @@ func TestPureFuncExec0(t *testing.T) { return } - result, err := funcs.PureFuncExec(f, args) + result, err := PureFuncExec(f, args) if !fail && err != nil { t.Errorf("test #%d: func failed with: %+v", index, err) diff --git a/lang/funcs/funcs.go b/lang/funcs/funcs.go index e5960a2b..183bf504 100644 --- a/lang/funcs/funcs.go +++ b/lang/funcs/funcs.go @@ -31,17 +31,13 @@ package funcs import ( - "context" "fmt" "reflect" "runtime" "strings" - "sync" docsUtil "github.com/purpleidea/mgmt/docs/util" "github.com/purpleidea/mgmt/lang/interfaces" - "github.com/purpleidea/mgmt/lang/types" - "github.com/purpleidea/mgmt/util/errwrap" ) const ( @@ -292,181 +288,3 @@ func GetFunctionMetadata(fn interface{}) (*docsUtil.Metadata, error) { Typename: funcname, }, nil } - -// PureFuncExec is usually used to provisionally speculate about the result of a -// pure function, by running it once, and returning the result. Pure functions -// are expected to only produce one value that depends only on the input values. -// This won't run any slow functions either. -func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, error) { - hostname := "" // XXX: add to interface - debug := false // XXX: add to interface - logf := func(format string, v ...interface{}) {} // XXX: add to interface - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - info := handle.Info() - if !info.Pure { - return nil, fmt.Errorf("func is not pure") - } - // if function is expensive to run, we won't run it provisionally - if info.Slow { - return nil, fmt.Errorf("func is slow") - } - - sig := handle.Info().Sig - if sig.Kind != types.KindFunc { - return nil, fmt.Errorf("must be kind func") - } - if sig.HasUni() { - return nil, fmt.Errorf("func contains unification vars") - } - - if buildableFunc, ok := handle.(interfaces.BuildableFunc); ok { - if _, err := buildableFunc.Build(sig); err != nil { - return nil, fmt.Errorf("can't build function: %v", err) - } - } - - if err := handle.Validate(); err != nil { - return nil, errwrap.Wrapf(err, "could not validate func") - } - - ord := handle.Info().Sig.Ord - if i, j := len(ord), len(args); i != j { - return nil, fmt.Errorf("expected %d args, got %d", i, j) - } - - wg := &sync.WaitGroup{} - defer wg.Wait() - - errch := make(chan error) - input := make(chan types.Value) // we close this when we're done - output := make(chan types.Value) // we create it, func closes it - - init := &interfaces.Init{ - Hostname: hostname, - Input: input, - Output: output, - World: nil, // should not be used for pure functions - Debug: debug, - Logf: func(format string, v ...interface{}) { - logf("func: "+format, v...) - }, - } - - if err := handle.Init(init); err != nil { - return nil, errwrap.Wrapf(err, "could not init func") - } - - close1 := make(chan struct{}) - close2 := make(chan struct{}) - wg.Add(1) - go func() { - defer wg.Done() - defer close(errch) // last one turns out the lights - select { - case <-close1: - } - select { - case <-close2: - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - defer close(close1) - if debug { - logf("Running func") - } - err := handle.Stream(ctx) // sends to output chan - if debug { - logf("Exiting func") - } - if err == nil { - return - } - // we closed with an error... - select { - case errch <- errwrap.Wrapf(err, "problem streaming func"): - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - defer close(close2) - defer close(input) // we only send one value - if len(args) == 0 { - return - } - si := &types.Type{ - // input to functions are structs - Kind: types.KindStruct, - Map: handle.Info().Sig.Map, - Ord: handle.Info().Sig.Ord, - } - st := types.NewStruct(si) - - for i, arg := range args { - name := handle.Info().Sig.Ord[i] - if err := st.Set(name, arg); err != nil { // populate struct - select { - case errch <- errwrap.Wrapf(err, "struct set failure"): - } - return - } - } - - select { - case input <- st: // send to function (must not block) - case <-close1: // unblock the input send in case stream closed - select { - case errch <- fmt.Errorf("stream closed early"): - } - } - }() - - once := false - var result types.Value - var reterr error -Loop: - for { - select { - case value, ok := <-output: // read from channel - if !ok { - output = nil - continue Loop // only exit via errch closing! - } - if once { - reterr = fmt.Errorf("got more than one value") - continue // only exit via errch closing! - } - once = true - result = value // save value - - case err, ok := <-errch: // handle possible errors - if !ok { - break Loop - } - if err == nil { - // programming error - err = fmt.Errorf("error was missing") - } - e := errwrap.Wrapf(err, "problem streaming func") - reterr = errwrap.Append(reterr, e) - } - } - - cancel() - - if result == nil && reterr == nil { - // programming error - // XXX: i think this can happen when we exit without error, but - // before we send one output message... not sure how this happens - // XXX: iow, we never send on output, and errch closes... - // XXX: this could happen if we send zero input args, and Stream exits without error - return nil, fmt.Errorf("function exited with nil result and nil error") - } - return result, reterr -}