lang: Move out this legacy execution function
Hasn't been used in a while, but it's fine if we want to use it for tests.
This commit is contained in:
@@ -49,6 +49,181 @@ import (
|
|||||||
"github.com/kylelemons/godebug/pretty"
|
"github.com/kylelemons/godebug/pretty"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// PureFuncExec is only used for tests.
|
||||||
|
func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, error) {
|
||||||
|
hostname := "" // XXX: add to interface
|
||||||
|
debug := false // XXX: add to interface
|
||||||
|
logf := func(format string, v ...interface{}) {} // XXX: add to interface
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
info := handle.Info()
|
||||||
|
if !info.Pure {
|
||||||
|
return nil, fmt.Errorf("func is not pure")
|
||||||
|
}
|
||||||
|
// if function is expensive to run, we won't run it provisionally
|
||||||
|
if info.Slow {
|
||||||
|
return nil, fmt.Errorf("func is slow")
|
||||||
|
}
|
||||||
|
|
||||||
|
sig := handle.Info().Sig
|
||||||
|
if sig.Kind != types.KindFunc {
|
||||||
|
return nil, fmt.Errorf("must be kind func")
|
||||||
|
}
|
||||||
|
if sig.HasUni() {
|
||||||
|
return nil, fmt.Errorf("func contains unification vars")
|
||||||
|
}
|
||||||
|
|
||||||
|
if buildableFunc, ok := handle.(interfaces.BuildableFunc); ok {
|
||||||
|
if _, err := buildableFunc.Build(sig); err != nil {
|
||||||
|
return nil, fmt.Errorf("can't build function: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := handle.Validate(); err != nil {
|
||||||
|
return nil, errwrap.Wrapf(err, "could not validate func")
|
||||||
|
}
|
||||||
|
|
||||||
|
ord := handle.Info().Sig.Ord
|
||||||
|
if i, j := len(ord), len(args); i != j {
|
||||||
|
return nil, fmt.Errorf("expected %d args, got %d", i, j)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
defer wg.Wait()
|
||||||
|
|
||||||
|
errch := make(chan error)
|
||||||
|
input := make(chan types.Value) // we close this when we're done
|
||||||
|
output := make(chan types.Value) // we create it, func closes it
|
||||||
|
|
||||||
|
init := &interfaces.Init{
|
||||||
|
Hostname: hostname,
|
||||||
|
Input: input,
|
||||||
|
Output: output,
|
||||||
|
World: nil, // should not be used for pure functions
|
||||||
|
Debug: debug,
|
||||||
|
Logf: func(format string, v ...interface{}) {
|
||||||
|
logf("func: "+format, v...)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := handle.Init(init); err != nil {
|
||||||
|
return nil, errwrap.Wrapf(err, "could not init func")
|
||||||
|
}
|
||||||
|
|
||||||
|
close1 := make(chan struct{})
|
||||||
|
close2 := make(chan struct{})
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer close(errch) // last one turns out the lights
|
||||||
|
select {
|
||||||
|
case <-close1:
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-close2:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer close(close1)
|
||||||
|
if debug {
|
||||||
|
logf("Running func")
|
||||||
|
}
|
||||||
|
err := handle.Stream(ctx) // sends to output chan
|
||||||
|
if debug {
|
||||||
|
logf("Exiting func")
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// we closed with an error...
|
||||||
|
select {
|
||||||
|
case errch <- errwrap.Wrapf(err, "problem streaming func"):
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer close(close2)
|
||||||
|
defer close(input) // we only send one value
|
||||||
|
if len(args) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
si := &types.Type{
|
||||||
|
// input to functions are structs
|
||||||
|
Kind: types.KindStruct,
|
||||||
|
Map: handle.Info().Sig.Map,
|
||||||
|
Ord: handle.Info().Sig.Ord,
|
||||||
|
}
|
||||||
|
st := types.NewStruct(si)
|
||||||
|
|
||||||
|
for i, arg := range args {
|
||||||
|
name := handle.Info().Sig.Ord[i]
|
||||||
|
if err := st.Set(name, arg); err != nil { // populate struct
|
||||||
|
select {
|
||||||
|
case errch <- errwrap.Wrapf(err, "struct set failure"):
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case input <- st: // send to function (must not block)
|
||||||
|
case <-close1: // unblock the input send in case stream closed
|
||||||
|
select {
|
||||||
|
case errch <- fmt.Errorf("stream closed early"):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
once := false
|
||||||
|
var result types.Value
|
||||||
|
var reterr error
|
||||||
|
Loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case value, ok := <-output: // read from channel
|
||||||
|
if !ok {
|
||||||
|
output = nil
|
||||||
|
continue Loop // only exit via errch closing!
|
||||||
|
}
|
||||||
|
if once {
|
||||||
|
reterr = fmt.Errorf("got more than one value")
|
||||||
|
continue // only exit via errch closing!
|
||||||
|
}
|
||||||
|
once = true
|
||||||
|
result = value // save value
|
||||||
|
|
||||||
|
case err, ok := <-errch: // handle possible errors
|
||||||
|
if !ok {
|
||||||
|
break Loop
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
// programming error
|
||||||
|
err = fmt.Errorf("error was missing")
|
||||||
|
}
|
||||||
|
e := errwrap.Wrapf(err, "problem streaming func")
|
||||||
|
reterr = errwrap.Append(reterr, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if result == nil && reterr == nil {
|
||||||
|
// programming error
|
||||||
|
// XXX: i think this can happen when we exit without error, but
|
||||||
|
// before we send one output message... not sure how this happens
|
||||||
|
// XXX: iow, we never send on output, and errch closes...
|
||||||
|
// XXX: this could happen if we send zero input args, and Stream exits without error
|
||||||
|
return nil, fmt.Errorf("function exited with nil result and nil error")
|
||||||
|
}
|
||||||
|
return result, reterr
|
||||||
|
}
|
||||||
|
|
||||||
func TestPureFuncExec0(t *testing.T) {
|
func TestPureFuncExec0(t *testing.T) {
|
||||||
type test struct { // an individual test
|
type test struct { // an individual test
|
||||||
name string
|
name string
|
||||||
@@ -139,7 +314,7 @@ func TestPureFuncExec0(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := funcs.PureFuncExec(f, args)
|
result, err := PureFuncExec(f, args)
|
||||||
|
|
||||||
if !fail && err != nil {
|
if !fail && err != nil {
|
||||||
t.Errorf("test #%d: func failed with: %+v", index, err)
|
t.Errorf("test #%d: func failed with: %+v", index, err)
|
||||||
|
|||||||
@@ -31,17 +31,13 @@
|
|||||||
package funcs
|
package funcs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
|
|
||||||
docsUtil "github.com/purpleidea/mgmt/docs/util"
|
docsUtil "github.com/purpleidea/mgmt/docs/util"
|
||||||
"github.com/purpleidea/mgmt/lang/interfaces"
|
"github.com/purpleidea/mgmt/lang/interfaces"
|
||||||
"github.com/purpleidea/mgmt/lang/types"
|
|
||||||
"github.com/purpleidea/mgmt/util/errwrap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -292,181 +288,3 @@ func GetFunctionMetadata(fn interface{}) (*docsUtil.Metadata, error) {
|
|||||||
Typename: funcname,
|
Typename: funcname,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PureFuncExec is usually used to provisionally speculate about the result of a
|
|
||||||
// pure function, by running it once, and returning the result. Pure functions
|
|
||||||
// are expected to only produce one value that depends only on the input values.
|
|
||||||
// This won't run any slow functions either.
|
|
||||||
func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, error) {
|
|
||||||
hostname := "" // XXX: add to interface
|
|
||||||
debug := false // XXX: add to interface
|
|
||||||
logf := func(format string, v ...interface{}) {} // XXX: add to interface
|
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
info := handle.Info()
|
|
||||||
if !info.Pure {
|
|
||||||
return nil, fmt.Errorf("func is not pure")
|
|
||||||
}
|
|
||||||
// if function is expensive to run, we won't run it provisionally
|
|
||||||
if info.Slow {
|
|
||||||
return nil, fmt.Errorf("func is slow")
|
|
||||||
}
|
|
||||||
|
|
||||||
sig := handle.Info().Sig
|
|
||||||
if sig.Kind != types.KindFunc {
|
|
||||||
return nil, fmt.Errorf("must be kind func")
|
|
||||||
}
|
|
||||||
if sig.HasUni() {
|
|
||||||
return nil, fmt.Errorf("func contains unification vars")
|
|
||||||
}
|
|
||||||
|
|
||||||
if buildableFunc, ok := handle.(interfaces.BuildableFunc); ok {
|
|
||||||
if _, err := buildableFunc.Build(sig); err != nil {
|
|
||||||
return nil, fmt.Errorf("can't build function: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := handle.Validate(); err != nil {
|
|
||||||
return nil, errwrap.Wrapf(err, "could not validate func")
|
|
||||||
}
|
|
||||||
|
|
||||||
ord := handle.Info().Sig.Ord
|
|
||||||
if i, j := len(ord), len(args); i != j {
|
|
||||||
return nil, fmt.Errorf("expected %d args, got %d", i, j)
|
|
||||||
}
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
defer wg.Wait()
|
|
||||||
|
|
||||||
errch := make(chan error)
|
|
||||||
input := make(chan types.Value) // we close this when we're done
|
|
||||||
output := make(chan types.Value) // we create it, func closes it
|
|
||||||
|
|
||||||
init := &interfaces.Init{
|
|
||||||
Hostname: hostname,
|
|
||||||
Input: input,
|
|
||||||
Output: output,
|
|
||||||
World: nil, // should not be used for pure functions
|
|
||||||
Debug: debug,
|
|
||||||
Logf: func(format string, v ...interface{}) {
|
|
||||||
logf("func: "+format, v...)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := handle.Init(init); err != nil {
|
|
||||||
return nil, errwrap.Wrapf(err, "could not init func")
|
|
||||||
}
|
|
||||||
|
|
||||||
close1 := make(chan struct{})
|
|
||||||
close2 := make(chan struct{})
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer close(errch) // last one turns out the lights
|
|
||||||
select {
|
|
||||||
case <-close1:
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-close2:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer close(close1)
|
|
||||||
if debug {
|
|
||||||
logf("Running func")
|
|
||||||
}
|
|
||||||
err := handle.Stream(ctx) // sends to output chan
|
|
||||||
if debug {
|
|
||||||
logf("Exiting func")
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// we closed with an error...
|
|
||||||
select {
|
|
||||||
case errch <- errwrap.Wrapf(err, "problem streaming func"):
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer close(close2)
|
|
||||||
defer close(input) // we only send one value
|
|
||||||
if len(args) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
si := &types.Type{
|
|
||||||
// input to functions are structs
|
|
||||||
Kind: types.KindStruct,
|
|
||||||
Map: handle.Info().Sig.Map,
|
|
||||||
Ord: handle.Info().Sig.Ord,
|
|
||||||
}
|
|
||||||
st := types.NewStruct(si)
|
|
||||||
|
|
||||||
for i, arg := range args {
|
|
||||||
name := handle.Info().Sig.Ord[i]
|
|
||||||
if err := st.Set(name, arg); err != nil { // populate struct
|
|
||||||
select {
|
|
||||||
case errch <- errwrap.Wrapf(err, "struct set failure"):
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case input <- st: // send to function (must not block)
|
|
||||||
case <-close1: // unblock the input send in case stream closed
|
|
||||||
select {
|
|
||||||
case errch <- fmt.Errorf("stream closed early"):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
once := false
|
|
||||||
var result types.Value
|
|
||||||
var reterr error
|
|
||||||
Loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case value, ok := <-output: // read from channel
|
|
||||||
if !ok {
|
|
||||||
output = nil
|
|
||||||
continue Loop // only exit via errch closing!
|
|
||||||
}
|
|
||||||
if once {
|
|
||||||
reterr = fmt.Errorf("got more than one value")
|
|
||||||
continue // only exit via errch closing!
|
|
||||||
}
|
|
||||||
once = true
|
|
||||||
result = value // save value
|
|
||||||
|
|
||||||
case err, ok := <-errch: // handle possible errors
|
|
||||||
if !ok {
|
|
||||||
break Loop
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
// programming error
|
|
||||||
err = fmt.Errorf("error was missing")
|
|
||||||
}
|
|
||||||
e := errwrap.Wrapf(err, "problem streaming func")
|
|
||||||
reterr = errwrap.Append(reterr, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
if result == nil && reterr == nil {
|
|
||||||
// programming error
|
|
||||||
// XXX: i think this can happen when we exit without error, but
|
|
||||||
// before we send one output message... not sure how this happens
|
|
||||||
// XXX: iow, we never send on output, and errch closes...
|
|
||||||
// XXX: this could happen if we send zero input args, and Stream exits without error
|
|
||||||
return nil, fmt.Errorf("function exited with nil result and nil error")
|
|
||||||
}
|
|
||||||
return result, reterr
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user