lang: funcs: Add runner pure func execution
This adds a function runner that runs pure functions. It will hopefully be useful for speculative execution of functions for compile time determination of types.
This commit is contained in:
@@ -21,8 +21,14 @@ package funcs
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/purpleidea/mgmt/lang/interfaces"
|
||||
"github.com/purpleidea/mgmt/lang/types"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
|
||||
multierr "github.com/hashicorp/go-multierror"
|
||||
errwrap "github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -128,3 +134,162 @@ func Map() map[string]func() interfaces.Func {
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// PureFuncExec is usually used to provisionally speculate about the result of a
|
||||
// pure function, by running it once, and returning the result. Pure functions
|
||||
// are expected to only produce one value that depends only on the input values.
|
||||
// This won't run any slow functions either.
|
||||
func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, error) {
|
||||
hostname := "" // XXX: add to interface
|
||||
debug := false // XXX: add to interface
|
||||
logf := func(format string, v ...interface{}) {} // XXX: add to interface
|
||||
|
||||
info := handle.Info()
|
||||
if !info.Pure {
|
||||
return nil, fmt.Errorf("func is not pure")
|
||||
}
|
||||
// if function is expensive to run, we won't run it provisionally
|
||||
if info.Slow {
|
||||
return nil, fmt.Errorf("func is slow")
|
||||
}
|
||||
|
||||
if err := handle.Validate(); err != nil {
|
||||
return nil, errwrap.Wrapf(err, "could not validate func")
|
||||
}
|
||||
|
||||
sig := handle.Info().Sig
|
||||
if sig.Kind != types.KindFunc {
|
||||
return nil, fmt.Errorf("must be kind func")
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
defer wg.Wait()
|
||||
|
||||
errch := make(chan error)
|
||||
input := make(chan types.Value) // we close this when we're done
|
||||
output := make(chan types.Value) // we create it, func closes it
|
||||
|
||||
init := &interfaces.Init{
|
||||
Hostname: hostname,
|
||||
Input: input,
|
||||
Output: output,
|
||||
World: nil, // should not be used for pure functions
|
||||
Debug: debug,
|
||||
Logf: func(format string, v ...interface{}) {
|
||||
logf("func: "+format, v...)
|
||||
},
|
||||
}
|
||||
|
||||
if err := handle.Init(init); err != nil {
|
||||
return nil, errwrap.Wrapf(err, "could not init func")
|
||||
}
|
||||
|
||||
close1 := make(chan struct{})
|
||||
close2 := make(chan struct{})
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(errch) // last one turns out the lights
|
||||
select {
|
||||
case <-close1:
|
||||
}
|
||||
select {
|
||||
case <-close2:
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(close1)
|
||||
if debug {
|
||||
logf("Running func")
|
||||
}
|
||||
err := handle.Stream() // sends to output chan
|
||||
if debug {
|
||||
logf("Exiting func")
|
||||
}
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
// we closed with an error...
|
||||
select {
|
||||
case errch <- errwrap.Wrapf(err, "problem streaming func"):
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(close2)
|
||||
defer close(input) // we only send one value
|
||||
if len(args) == 0 {
|
||||
return
|
||||
}
|
||||
si := &types.Type{
|
||||
// input to functions are structs
|
||||
Kind: types.KindStruct,
|
||||
Map: handle.Info().Sig.Map,
|
||||
Ord: handle.Info().Sig.Ord,
|
||||
}
|
||||
st := types.NewStruct(si)
|
||||
|
||||
for i, arg := range args {
|
||||
name := util.NumToAlpha(i) // assume (incorrectly) for now...
|
||||
if err := st.Set(name, arg); err != nil { // populate struct
|
||||
select {
|
||||
case errch <- errwrap.Wrapf(err, "struct set failure"):
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case input <- st: // send to function (must not block)
|
||||
case <-close1: // unblock the input send in case stream closed
|
||||
select {
|
||||
case errch <- fmt.Errorf("stream closed early"):
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
once := false
|
||||
var result types.Value
|
||||
var reterr error
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case value, ok := <-output: // read from channel
|
||||
if !ok {
|
||||
output = nil
|
||||
continue Loop // only exit via errch closing!
|
||||
}
|
||||
if once {
|
||||
reterr = fmt.Errorf("got more than one value")
|
||||
continue // only exit via errch closing!
|
||||
}
|
||||
once = true
|
||||
result = value // save value
|
||||
|
||||
case err, ok := <-errch: // handle possible errors
|
||||
if !ok {
|
||||
break Loop
|
||||
}
|
||||
e := errwrap.Wrapf(err, "problem streaming func")
|
||||
if reterr != nil {
|
||||
reterr = multierr.Append(reterr, e)
|
||||
} else {
|
||||
reterr = e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := handle.Close(); err != nil {
|
||||
if reterr != nil {
|
||||
err = multierr.Append(err, reterr)
|
||||
}
|
||||
return nil, errwrap.Wrapf(err, "problem closing func")
|
||||
}
|
||||
|
||||
return result, reterr
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user