lang: core, funcs: Port some functions to CallableFunc API

Some modern features of our function engine and language might require
this new API, so port what we can and figure out the rest later.
This commit is contained in:
James Shubin
2025-03-16 23:23:57 -04:00
parent f313380480
commit 642c6b952f
29 changed files with 702 additions and 291 deletions

View File

@@ -106,4 +106,7 @@ type Fact interface {
Info() *Info
Init(*Init) error
Stream(context.Context) error
// TODO: should we require this here? What about a CallableFact instead?
Call(context.Context) (types.Value, error)
}

View File

@@ -94,3 +94,8 @@ func (obj *FactFunc) Init(init *interfaces.Init) error {
func (obj *FactFunc) Stream(ctx context.Context) error {
return obj.Fact.Stream(ctx)
}
// Call this fact and return the value if it is possible to do so at this time.
func (obj *FactFunc) Call(ctx context.Context, _ []types.Value) (types.Value, error) {
return obj.Fact.Call(ctx)
}

View File

@@ -64,6 +64,9 @@ func init() {
"func(float, float) float", // floating-point addition
}),
F: func(ctx context.Context, input []types.Value) (types.Value, error) {
if l := len(input); l != 2 { // catch programming bugs
return nil, fmt.Errorf("invalid len %d", l)
}
switch k := input[0].Type().Kind; k {
case types.KindStr:
return &types.StrValue{
@@ -476,6 +479,9 @@ type OperatorFunc struct {
init *interfaces.Init
last types.Value // last value received to use for diff
lastOp string
fn interfaces.FuncSig
result types.Value // last calculated output
}
@@ -654,8 +660,6 @@ func (obj *OperatorFunc) Init(init *interfaces.Init) error {
// Stream returns the changing values that this func has over time.
func (obj *OperatorFunc) Stream(ctx context.Context) error {
var op, lastOp string
var fn interfaces.FuncSig
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -685,43 +689,12 @@ func (obj *OperatorFunc) Stream(ctx context.Context) error {
return fmt.Errorf("bad args, got: %v, want: %v", keys, obj.Type.Ord)
}
// build up arg list
args := []types.Value{}
for _, name := range obj.Type.Ord {
v, exists := input.Struct()[name]
if !exists {
// programming error
return fmt.Errorf("function engine was early, missing arg: %s", name)
}
if name == operatorArgName {
op = v.Str()
continue // skip over the operator arg
}
args = append(args, v)
args, err := interfaces.StructToCallableArgs(input) // []types.Value, error)
if err != nil {
return err
}
if op == "" {
// programming error
return fmt.Errorf("operator cannot be empty, args: %v", keys)
}
// operator selection is dynamic now, although mostly it
// should not change... to do so is probably uncommon...
if fn == nil {
fn = obj.findFunc(op)
} else if op != lastOp {
// TODO: check sig is compatible instead?
return fmt.Errorf("op changed from %s to %s", lastOp, op)
}
if fn == nil {
return fmt.Errorf("func not found for operator `%s` with sig: `%+v`", op, obj.Type)
}
lastOp = op
var result types.Value
result, err := fn(ctx, args) // (Value, error)
result, err := obj.Call(ctx, args) // (Value, error)
if err != nil {
return errwrap.Wrapf(err, "problem running function")
}
@@ -749,6 +722,42 @@ func (obj *OperatorFunc) Stream(ctx context.Context) error {
}
}
// Call this function with the input args and return the value if it is possible
// to do so at this time.
func (obj *OperatorFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) {
op := args[0].Str()
if op == "" {
// programming error
return nil, fmt.Errorf("operator cannot be empty, args: %v", args)
}
// operator selection is dynamic now, although mostly it
// should not change... to do so is probably uncommon...
if obj.fn == nil {
obj.fn = obj.findFunc(op)
} else if op != obj.lastOp {
// TODO: check sig is compatible instead?
return nil, fmt.Errorf("op changed from %s to %s", obj.lastOp, op)
}
if obj.fn == nil {
return nil, fmt.Errorf("func not found for operator `%s` with sig: `%+v`", op, obj.Type)
}
obj.lastOp = op
newArgs := []types.Value{}
for i, x := range args {
if i == 0 {
continue // skip over the operator
}
newArgs = append(newArgs, x)
}
return obj.fn(ctx, newArgs) // (Value, error)
}
// removeOperatorArg returns a copy of the input KindFunc type, without the
// operator arg which specifies which operator we're using. It *is* idempotent.
func removeOperatorArg(typ *types.Type) *types.Type {

View File

@@ -108,15 +108,20 @@ func (obj *ChannelBasedSinkFunc) Stream(ctx context.Context) error {
return nil // can't output any more
}
value, exists := input.Struct()[obj.EdgeName]
if !exists {
return fmt.Errorf("programming error, can't find edge")
args, err := interfaces.StructToCallableArgs(input) // []types.Value, error)
if err != nil {
return err
}
if obj.last != nil && value.Cmp(obj.last) == nil {
result, err := obj.Call(ctx, args) // get the value...
if err != nil {
return err
}
if obj.last != nil && result.Cmp(obj.last) == nil {
continue // value didn't change, skip it
}
obj.last = value // store so we can send after this select
obj.last = result // store so we can send after this select
case <-ctx.Done():
return nil
@@ -139,3 +144,13 @@ func (obj *ChannelBasedSinkFunc) Stream(ctx context.Context) error {
}
}
}
// Call this function with the input args and return the value if it is possible
// to do so at this time.
// XXX: Is is correct to implement this here for this particular function?
func (obj *ChannelBasedSinkFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) {
if len(args) != 1 {
return nil, fmt.Errorf("programming error, can't find edge")
}
return args[0], nil
}

View File

@@ -113,3 +113,13 @@ func (obj *ChannelBasedSourceFunc) Stream(ctx context.Context) error {
}
}
}
// XXX: Is is correct to implement this here for this particular function?
// XXX: tricky since this really receives input from a secret channel...
// XXX: ADD A MUTEX AROUND READING obj.last ???
//func (obj *ChannelBasedSourceFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) {
// if obj.last == nil {
// return nil, fmt.Errorf("programming error")
// }
// return obj.last, nil
//}

View File

@@ -115,12 +115,14 @@ func (obj *IfFunc) Stream(ctx context.Context) error {
}
obj.last = input // store for next
var result types.Value
args, err := interfaces.StructToCallableArgs(input) // []types.Value, error)
if err != nil {
return err
}
if input.Struct()["c"].Bool() {
result = input.Struct()["a"] // true branch
} else {
result = input.Struct()["b"] // false branch
result, err := obj.Call(ctx, args) // get the value...
if err != nil {
return err
}
// skip sending an update...
@@ -141,3 +143,13 @@ func (obj *IfFunc) Stream(ctx context.Context) error {
}
}
}
// Call this function with the input args and return the value if it is possible
// to do so at this time.
// XXX: Is is correct to implement this here for this particular function?
func (obj *IfFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) {
if c := args[0].Bool(); c {
return args[1], nil // true branch
}
return args[2], nil
}

View File

@@ -150,7 +150,7 @@ func (obj *Func) Stream(ctx context.Context) error {
if obj.init.Debug {
obj.init.Logf("Calling function with: %+v", values)
}
result, err := obj.Fn.Call(ctx, values) // (Value, error)
result, err := obj.Call(ctx, values) // (Value, error)
if err != nil {
if obj.init.Debug {
obj.init.Logf("Function returned error: %+v", err)
@@ -181,3 +181,9 @@ func (obj *Func) Stream(ctx context.Context) error {
}
}
}
// Call this function with the input args and return the value if it is possible
// to do so at this time.
func (obj *Func) Call(ctx context.Context, args []types.Value) (types.Value, error) {
return obj.Fn.Call(ctx, args) // (Value, error)
}