lang: ast, funcs: Remove the secret channel from call

This removes the secret channel from the call function. Having it made
it more complicated to write new function engines, and it's not clear
why it was even needed in the first place. It seems that even the
current generation of function engines work just fine without it.

Co-authored-by: Samuel Gélineau <gelisam@gmail.com>
This commit is contained in:
James Shubin
2025-08-02 13:57:27 -04:00
parent 0a76910902
commit 86c6ee8dee
8 changed files with 226 additions and 52 deletions

View File

@@ -32,6 +32,7 @@ package structs
import (
"context"
"fmt"
"sync"
"github.com/purpleidea/mgmt/lang/interfaces"
"github.com/purpleidea/mgmt/lang/types"
@@ -61,14 +62,11 @@ type CallFunc struct {
ArgVertices []interfaces.Func
OutputVertex interfaces.Func
init *interfaces.Init
lastFuncValue *full.FuncValue // remember the last function value
// outputChan is an initially-nil channel from which we receive output
// lists from the subgraph. This channel is reset when the subgraph is
// recreated.
outputChan chan types.Value
}
// String returns a simple name for this function. This is needed so this struct
@@ -98,6 +96,10 @@ func (obj *CallFunc) Validate() error {
return fmt.Errorf("number of arg Funcs must match number of func args in the type")
}
if obj.OutputVertex == nil {
return fmt.Errorf("the output vertex is missing")
}
return nil
}
@@ -127,29 +129,46 @@ func (obj *CallFunc) Init(init *interfaces.Init) error {
// methods of the Expr, and returns the actual expected value as a stream based
// on the changing inputs to that value.
func (obj *CallFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
// XXX: is there a sync.Once sort of solution that would be more elegant here?
mutex := &sync.Mutex{}
done := false
send := func(ctx context.Context, b bool) error {
mutex.Lock()
defer mutex.Unlock()
if done {
return nil
}
done = true
defer close(obj.init.Output) // the sender closes
obj.outputChan = nil
if !b {
return nil
}
// send dummy value to the output
select {
case obj.init.Output <- types.NewFloat(): // XXX: dummy value
case <-ctx.Done():
return ctx.Err()
}
return nil
}
defer send(ctx, false) // just close
defer func() {
obj.init.Txn.Reverse()
}()
canReceiveMoreFuncValues := true
canReceiveMoreOutputValues := true
for {
if !canReceiveMoreFuncValues && !canReceiveMoreOutputValues {
// break
return nil
}
select {
case input, ok := <-obj.init.Input:
if !ok {
obj.init.Input = nil // block looping back here
canReceiveMoreFuncValues = false
continue
if !done {
return fmt.Errorf("input closed without ever sending anything")
}
return nil
}
value, exists := input.Struct()[obj.EdgeName]
@@ -179,24 +198,11 @@ func (obj *CallFunc) Stream(ctx context.Context) error {
if err := obj.replaceSubGraph(newFuncValue); err != nil {
return errwrap.Wrapf(err, "could not replace subgraph")
}
canReceiveMoreOutputValues = true
send(ctx, true) // send dummy and then close
continue
case outputValue, ok := <-obj.outputChan:
// send the new output value downstream
if !ok {
obj.outputChan = nil
canReceiveMoreOutputValues = false
continue
}
// send to the output
select {
case obj.init.Output <- outputValue:
case <-ctx.Done():
return nil
}
case <-ctx.Done():
return nil
}
@@ -214,13 +220,13 @@ func (obj *CallFunc) replaceSubGraph(newFuncValue *full.FuncValue) error {
//
// outputFunc -> "callSubgraphOutput"
// }
// XXX
// delete the old subgraph
if err := obj.init.Txn.Reverse(); err != nil {
return errwrap.Wrapf(err, "could not Reverse")
}
// create the new subgraph
// This passed in Txn has AddVertex, AddEdge, and possibly AddGraph
// methods called on it. Nothing else. It will _not_ call Commit or
// Reverse. It adds to the graph, and our Commit and Reverse operations
@@ -230,17 +236,11 @@ func (obj *CallFunc) replaceSubGraph(newFuncValue *full.FuncValue) error {
return errwrap.Wrapf(err, "could not call newFuncValue.Call()")
}
obj.outputChan = make(chan types.Value)
edgeName := ChannelBasedSinkFuncArgName
subgraphOutput := &ChannelBasedSinkFunc{
Name: "callSubgraphOutput",
Target: obj,
EdgeName: edgeName,
Chan: obj.outputChan,
Type: obj.Type,
}
// create the new subgraph
edgeName := OutputFuncArgName
edge := &interfaces.FuncEdge{Args: []string{edgeName}}
obj.init.Txn.AddVertex(subgraphOutput)
obj.init.Txn.AddEdge(outputFunc, subgraphOutput, edge)
obj.init.Txn.AddVertex(outputFunc)
obj.init.Txn.AddEdge(outputFunc, obj.OutputVertex, edge)
return obj.init.Txn.Commit()
}