diff --git a/lang/ast/structs.go b/lang/ast/structs.go index 21ef72f3..cc300929 100644 --- a/lang/ast/structs.go +++ b/lang/ast/structs.go @@ -11000,20 +11000,36 @@ func (obj *ExprCall) Graph(env *interfaces.Env) (*pgraph.Graph, interfaces.Func, // Add a vertex for the call itself. edgeName := structs.CallFuncArgNameFunction + edgeNameDummy := structs.OutputFuncDummyArgName + + callSubgraphOutput := &structs.OutputFunc{ // the new graph shape thing! + Textarea: obj.Textarea, + Name: "callSubgraphOutput", + Type: obj.typ, + EdgeName: structs.OutputFuncArgName, + } + graph.AddVertex(callSubgraphOutput) + callFunc := &structs.CallFunc{ Textarea: obj.Textarea, - Type: obj.typ, - FuncType: ftyp, - EdgeName: edgeName, - ArgVertices: argFuncs, + Type: obj.typ, + FuncType: ftyp, + EdgeName: edgeName, + ArgVertices: argFuncs, + OutputVertex: callSubgraphOutput, } graph.AddVertex(callFunc) + graph.AddEdge(funcValueFunc, callFunc, &interfaces.FuncEdge{ Args: []string{edgeName}, }) - return graph, callFunc, nil + graph.AddEdge(callFunc, callSubgraphOutput, &interfaces.FuncEdge{ + Args: []string{edgeNameDummy}, + }) + + return graph, callSubgraphOutput, nil } // funcValueFunc is a helper function to make the code more readable. This was diff --git a/lang/funcs/structs/call.go b/lang/funcs/structs/call.go index a2cbde69..97ee43cf 100644 --- a/lang/funcs/structs/call.go +++ b/lang/funcs/structs/call.go @@ -32,6 +32,7 @@ package structs import ( "context" "fmt" + "sync" "github.com/purpleidea/mgmt/lang/interfaces" "github.com/purpleidea/mgmt/lang/types" @@ -61,14 +62,11 @@ type CallFunc struct { ArgVertices []interfaces.Func + OutputVertex interfaces.Func + init *interfaces.Init lastFuncValue *full.FuncValue // remember the last function value - - // outputChan is an initially-nil channel from which we receive output - // lists from the subgraph. This channel is reset when the subgraph is - // recreated. - outputChan chan types.Value } // String returns a simple name for this function. This is needed so this struct @@ -98,6 +96,10 @@ func (obj *CallFunc) Validate() error { return fmt.Errorf("number of arg Funcs must match number of func args in the type") } + if obj.OutputVertex == nil { + return fmt.Errorf("the output vertex is missing") + } + return nil } @@ -127,29 +129,46 @@ func (obj *CallFunc) Init(init *interfaces.Init) error { // methods of the Expr, and returns the actual expected value as a stream based // on the changing inputs to that value. func (obj *CallFunc) Stream(ctx context.Context) error { - defer close(obj.init.Output) // the sender closes + // XXX: is there a sync.Once sort of solution that would be more elegant here? + mutex := &sync.Mutex{} + done := false + send := func(ctx context.Context, b bool) error { + mutex.Lock() + defer mutex.Unlock() + if done { + return nil + } + done = true + defer close(obj.init.Output) // the sender closes - obj.outputChan = nil + if !b { + return nil + } + + // send dummy value to the output + select { + case obj.init.Output <- types.NewFloat(): // XXX: dummy value + case <-ctx.Done(): + return ctx.Err() + } + + return nil + } + defer send(ctx, false) // just close defer func() { obj.init.Txn.Reverse() }() - canReceiveMoreFuncValues := true - canReceiveMoreOutputValues := true for { - - if !canReceiveMoreFuncValues && !canReceiveMoreOutputValues { - // break - return nil - } - select { case input, ok := <-obj.init.Input: if !ok { obj.init.Input = nil // block looping back here - canReceiveMoreFuncValues = false - continue + if !done { + return fmt.Errorf("input closed without ever sending anything") + } + return nil } value, exists := input.Struct()[obj.EdgeName] @@ -179,24 +198,11 @@ func (obj *CallFunc) Stream(ctx context.Context) error { if err := obj.replaceSubGraph(newFuncValue); err != nil { return errwrap.Wrapf(err, "could not replace subgraph") } - canReceiveMoreOutputValues = true + + send(ctx, true) // send dummy and then close + continue - case outputValue, ok := <-obj.outputChan: - // send the new output value downstream - if !ok { - obj.outputChan = nil - canReceiveMoreOutputValues = false - continue - } - - // send to the output - select { - case obj.init.Output <- outputValue: - case <-ctx.Done(): - return nil - } - case <-ctx.Done(): return nil } @@ -214,13 +220,13 @@ func (obj *CallFunc) replaceSubGraph(newFuncValue *full.FuncValue) error { // // outputFunc -> "callSubgraphOutput" // } + // XXX // delete the old subgraph if err := obj.init.Txn.Reverse(); err != nil { return errwrap.Wrapf(err, "could not Reverse") } - // create the new subgraph // This passed in Txn has AddVertex, AddEdge, and possibly AddGraph // methods called on it. Nothing else. It will _not_ call Commit or // Reverse. It adds to the graph, and our Commit and Reverse operations @@ -230,17 +236,11 @@ func (obj *CallFunc) replaceSubGraph(newFuncValue *full.FuncValue) error { return errwrap.Wrapf(err, "could not call newFuncValue.Call()") } - obj.outputChan = make(chan types.Value) - edgeName := ChannelBasedSinkFuncArgName - subgraphOutput := &ChannelBasedSinkFunc{ - Name: "callSubgraphOutput", - Target: obj, - EdgeName: edgeName, - Chan: obj.outputChan, - Type: obj.Type, - } + // create the new subgraph + edgeName := OutputFuncArgName edge := &interfaces.FuncEdge{Args: []string{edgeName}} - obj.init.Txn.AddVertex(subgraphOutput) - obj.init.Txn.AddEdge(outputFunc, subgraphOutput, edge) + obj.init.Txn.AddVertex(outputFunc) + obj.init.Txn.AddEdge(outputFunc, obj.OutputVertex, edge) + return obj.init.Txn.Commit() } diff --git a/lang/funcs/structs/output.go b/lang/funcs/structs/output.go new file mode 100644 index 00000000..4809b4ca --- /dev/null +++ b/lang/funcs/structs/output.go @@ -0,0 +1,146 @@ +// Mgmt +// Copyright (C) James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// +// Additional permission under GNU GPL version 3 section 7 +// +// If you modify this program, or any covered work, by linking or combining it +// with embedded mcl code and modules (and that the embedded mcl code and +// modules which link with this program, contain a copy of their source code in +// the authoritative form) containing parts covered by the terms of any other +// license, the licensors of this program grant you additional permission to +// convey the resulting work. Furthermore, the licensors of this program grant +// the original author, James Shubin, additional permission to update this +// additional permission if he deems it necessary to achieve the goals of this +// additional permission. + +package structs + +import ( + "context" + "fmt" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" +) + +const ( + // OutputFuncArgName is the name for the edge which connects + // the input value to OutputFunc. + OutputFuncArgName = "out" + + OutputFuncDummyArgName = "dummy" +) + +// OutputFunc is a Func which receives values from upstream nodes and emits them +// downstream. It accepts (and ignores) a "dummy" arg as well. +type OutputFunc struct { + interfaces.Textarea // XXX: Do we want this here for this func as well ? + + Name string + EdgeName string + Type *types.Type + + init *interfaces.Init + last types.Value // last value received to use for diff +} + +// String returns a simple name for this function. This is needed so this struct +// can satisfy the pgraph.Vertex interface. +func (obj *OutputFunc) String() string { + return obj.Name +} + +// ArgGen returns the Nth arg name for this function. +func (obj *OutputFunc) ArgGen(index int) (string, error) { + seq := []string{obj.EdgeName, OutputFuncDummyArgName} + if l := len(seq); index >= l { + return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) + } + return seq[index], nil +} + +// Validate makes sure we've built our struct properly. It is usually unused for +// normal functions that users can use directly. +func (obj *OutputFunc) Validate() error { + return nil +} + +// Info returns some static info about itself. +func (obj *OutputFunc) Info() *interfaces.Info { + // XXX: contains "dummy" return type + s := fmt.Sprintf("func(%s %s, %s float) %s", obj.EdgeName, obj.Type, OutputFuncDummyArgName, obj.Type) + return &interfaces.Info{ + Pure: false, + Memo: false, + Sig: types.NewType(s), + Err: obj.Validate(), + } +} + +// Init runs some startup code for this function. +func (obj *OutputFunc) Init(init *interfaces.Init) error { + obj.init = init + return nil +} + +// Stream returns the changing values that this func has over time. +func (obj *OutputFunc) Stream(ctx context.Context) error { + defer close(obj.init.Output) // the sender closes + + for { + select { + case input, ok := <-obj.init.Input: + if !ok { + return nil // can't output any more + } + + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + + result, err := obj.Call(ctx, args) // get the value... + if err != nil { + return err + } + + if obj.last != nil && result.Cmp(obj.last) == nil { + continue // value didn't change, skip it + } + obj.last = result // store so we can send after this select + + case <-ctx.Done(): + return nil + } + + select { + case obj.init.Output <- obj.last: // send + case <-ctx.Done(): + return nil + } + } +} + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *OutputFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + if len(args) != 2 { + return nil, fmt.Errorf("programming error, can't find edge") + } + // Send the useful input arg, not the dummy arg. + return args[0], nil +} diff --git a/lang/interpret_test/TestAstFunc1/changing-func.txtar b/lang/interpret_test/TestAstFunc1/changing-func.txtar index 859c76b8..6e333598 100644 --- a/lang/interpret_test/TestAstFunc1/changing-func.txtar +++ b/lang/interpret_test/TestAstFunc1/changing-func.txtar @@ -25,6 +25,8 @@ Edge: FuncValue -> if # a Edge: FuncValue -> if # a Edge: FuncValue -> if # b Edge: FuncValue -> if # b +Edge: call -> callSubgraphOutput # dummy +Edge: call -> callSubgraphOutput # dummy Edge: const: bool(false) -> if # c Edge: const: bool(true) -> if # c Edge: if -> call # fn @@ -35,6 +37,8 @@ Vertex: FuncValue Vertex: FuncValue Vertex: call Vertex: call +Vertex: callSubgraphOutput +Vertex: callSubgraphOutput Vertex: const: bool(false) Vertex: const: bool(true) Vertex: if diff --git a/lang/interpret_test/TestAstFunc1/returned-func.txtar b/lang/interpret_test/TestAstFunc1/returned-func.txtar index 5b882b19..012da900 100644 --- a/lang/interpret_test/TestAstFunc1/returned-func.txtar +++ b/lang/interpret_test/TestAstFunc1/returned-func.txtar @@ -12,5 +12,7 @@ $out = $fn() test "${out}" {} -- OUTPUT -- Edge: FuncValue -> call # fn +Edge: call -> callSubgraphOutput # dummy Vertex: FuncValue Vertex: call +Vertex: callSubgraphOutput diff --git a/lang/interpret_test/TestAstFunc1/returned-lambda.txtar b/lang/interpret_test/TestAstFunc1/returned-lambda.txtar index 8e7ead2a..406f5247 100644 --- a/lang/interpret_test/TestAstFunc1/returned-lambda.txtar +++ b/lang/interpret_test/TestAstFunc1/returned-lambda.txtar @@ -11,5 +11,7 @@ $out = $fn() test "${out}" {} -- OUTPUT -- Edge: FuncValue -> call # fn +Edge: call -> callSubgraphOutput # dummy Vertex: FuncValue Vertex: call +Vertex: callSubgraphOutput diff --git a/lang/interpret_test/TestAstFunc1/shape0.txtar b/lang/interpret_test/TestAstFunc1/shape0.txtar index cd8987df..b94e1ac2 100644 --- a/lang/interpret_test/TestAstFunc1/shape0.txtar +++ b/lang/interpret_test/TestAstFunc1/shape0.txtar @@ -13,11 +13,13 @@ $z = apply($add1, 1) test [fmt.printf("%d", $z),] {} -- OUTPUT -- Edge: FuncValue -> call # fn -Edge: call -> printf: func(format str, a int) str # a +Edge: call -> callSubgraphOutput # dummy +Edge: callSubgraphOutput -> printf: func(format str, a int) str # a Edge: const: str("%d") -> printf: func(format str, a int) str # format Edge: printf: func(format str, a int) str -> composite: []str # 0 Vertex: FuncValue Vertex: call +Vertex: callSubgraphOutput Vertex: composite: []str Vertex: const: int(1) Vertex: const: str("%d") diff --git a/lang/interpret_test/TestAstFunc1/shape8.txtar b/lang/interpret_test/TestAstFunc1/shape8.txtar index b2febe09..4a576f12 100644 --- a/lang/interpret_test/TestAstFunc1/shape8.txtar +++ b/lang/interpret_test/TestAstFunc1/shape8.txtar @@ -21,7 +21,8 @@ test [$lambda("hello"),] {} Edge: FuncValue -> if # a Edge: FuncValue -> if # b Edge: _operator -> if # c -Edge: call -> composite: []str # 0 +Edge: call -> callSubgraphOutput # dummy +Edge: callSubgraphOutput -> composite: []str # 0 Edge: const: int(0) -> _operator # b Edge: const: int(10) -> _operator # a Edge: const: str(">") -> _operator # op @@ -30,6 +31,7 @@ Vertex: FuncValue Vertex: FuncValue Vertex: _operator Vertex: call +Vertex: callSubgraphOutput Vertex: composite: []str Vertex: const: int(0) Vertex: const: int(10)