lang: funcs: Remove old engine code
This gets rid of the old engine implementation which can only run a static graph. We'll need a changing graph for lambdas.
This commit is contained in:
@@ -1,697 +0,0 @@
|
|||||||
// Mgmt
|
|
||||||
// Copyright (C) 2013-2023+ James Shubin and the project contributors
|
|
||||||
// Written by James Shubin <james@shubin.ca> and the project contributors
|
|
||||||
//
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package funcs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/engine"
|
|
||||||
"github.com/purpleidea/mgmt/lang/interfaces"
|
|
||||||
"github.com/purpleidea/mgmt/lang/types"
|
|
||||||
"github.com/purpleidea/mgmt/pgraph"
|
|
||||||
"github.com/purpleidea/mgmt/util/errwrap"
|
|
||||||
)
|
|
||||||
|
|
||||||
// State represents the state of a function vertex. This corresponds to an AST
|
|
||||||
// expr, which is the memory address (pointer) in the graph.
|
|
||||||
type State struct {
|
|
||||||
Expr interfaces.Expr // pointer to the expr vertex
|
|
||||||
|
|
||||||
handle interfaces.Func // the function (if not nil, we've found it on init)
|
|
||||||
|
|
||||||
ctx context.Context // used for shutting down each Stream function.
|
|
||||||
cancel context.CancelFunc
|
|
||||||
|
|
||||||
init bool // have we run Init on our func?
|
|
||||||
ready bool // has it received all the args it needs at least once?
|
|
||||||
loaded bool // has the func run at least once ?
|
|
||||||
closed bool // did we close ourself down?
|
|
||||||
|
|
||||||
notify chan struct{} // ping here when new input values exist
|
|
||||||
|
|
||||||
input chan types.Value // the top level type must be a struct
|
|
||||||
output chan types.Value
|
|
||||||
|
|
||||||
mutex *sync.RWMutex // concurrency guard for modifying Expr with String/SetValue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init creates the function state if it can be found in the registered list.
|
|
||||||
func (obj *State) Init() error {
|
|
||||||
handle, err := obj.Expr.Func() // build one and store it, don't re-gen
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := handle.Validate(); err != nil {
|
|
||||||
return errwrap.Wrapf(err, "could not validate func")
|
|
||||||
}
|
|
||||||
obj.handle = handle
|
|
||||||
|
|
||||||
sig := obj.handle.Info().Sig
|
|
||||||
if sig.Kind != types.KindFunc {
|
|
||||||
return fmt.Errorf("must be kind func")
|
|
||||||
}
|
|
||||||
if len(sig.Ord) > 0 {
|
|
||||||
// since we accept input, better get our notification chan built
|
|
||||||
obj.notify = make(chan struct{})
|
|
||||||
}
|
|
||||||
|
|
||||||
obj.input = make(chan types.Value) // we close this when we're done
|
|
||||||
obj.output = make(chan types.Value) // we create it, func closes it
|
|
||||||
|
|
||||||
obj.mutex = &sync.RWMutex{}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// String satisfies fmt.Stringer so that these print nicely.
|
|
||||||
func (obj *State) String() string {
|
|
||||||
// TODO: use global mutex since it's harder to add state specific mutex
|
|
||||||
//obj.mutex.RLock() // prevent race detector issues against SetValue
|
|
||||||
//defer obj.mutex.RUnlock()
|
|
||||||
// FIXME: also add read locks on any of the children Expr in obj.Expr
|
|
||||||
return obj.Expr.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Engine represents the running time varying directed acyclic function graph.
|
|
||||||
type Engine struct {
|
|
||||||
Graph *pgraph.Graph
|
|
||||||
Hostname string
|
|
||||||
World engine.World
|
|
||||||
Debug bool
|
|
||||||
Logf func(format string, v ...interface{})
|
|
||||||
|
|
||||||
// Glitch: https://en.wikipedia.org/wiki/Reactive_programming#Glitches
|
|
||||||
Glitch bool // allow glitching? (more responsive, but less accurate)
|
|
||||||
|
|
||||||
ag chan error // used to aggregate fact events without reflect
|
|
||||||
agLock *sync.Mutex
|
|
||||||
agCount int // last one turns out the light (closes the ag channel)
|
|
||||||
|
|
||||||
topologicalSort []pgraph.Vertex // cached sorting of the graph for perf
|
|
||||||
|
|
||||||
state map[pgraph.Vertex]*State // state associated with the vertex
|
|
||||||
|
|
||||||
mutex *sync.RWMutex // concurrency guard for the table map
|
|
||||||
table map[pgraph.Vertex]types.Value // live table of output values
|
|
||||||
|
|
||||||
loaded bool // are all of the funcs loaded?
|
|
||||||
loadedChan chan struct{} // funcs loaded signal
|
|
||||||
|
|
||||||
streamChan chan error // signals a new graph can be created or problem
|
|
||||||
|
|
||||||
ctx context.Context // used for shutting down each Stream function.
|
|
||||||
cancel context.CancelFunc
|
|
||||||
|
|
||||||
closeChan chan struct{} // close signal
|
|
||||||
wg *sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init initializes the struct. This is the first call you must make. Do not
|
|
||||||
// proceed with calls to other methods unless this succeeds first. This also
|
|
||||||
// loads all the functions by calling Init on each one in the graph.
|
|
||||||
// TODO: should Init take the graph as an input arg to keep it as a private
|
|
||||||
// field?
|
|
||||||
func (obj *Engine) Init() error {
|
|
||||||
obj.ag = make(chan error)
|
|
||||||
obj.agLock = &sync.Mutex{}
|
|
||||||
obj.state = make(map[pgraph.Vertex]*State)
|
|
||||||
obj.mutex = &sync.RWMutex{}
|
|
||||||
obj.table = make(map[pgraph.Vertex]types.Value)
|
|
||||||
obj.loadedChan = make(chan struct{})
|
|
||||||
obj.streamChan = make(chan error)
|
|
||||||
obj.closeChan = make(chan struct{})
|
|
||||||
obj.wg = &sync.WaitGroup{}
|
|
||||||
topologicalSort, err := obj.Graph.TopologicalSort()
|
|
||||||
if err != nil {
|
|
||||||
return errwrap.Wrapf(err, "topo sort failed")
|
|
||||||
}
|
|
||||||
obj.topologicalSort = topologicalSort // cache the result
|
|
||||||
|
|
||||||
obj.ctx, obj.cancel = context.WithCancel(context.Background()) // top
|
|
||||||
|
|
||||||
for _, vertex := range obj.Graph.Vertices() {
|
|
||||||
// is this an interface we can use?
|
|
||||||
if _, exists := obj.state[vertex]; exists {
|
|
||||||
return fmt.Errorf("vertex (%+v) is not unique in the graph", vertex)
|
|
||||||
}
|
|
||||||
|
|
||||||
expr, ok := vertex.(interfaces.Expr)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("vertex (%+v) was not an expr", vertex)
|
|
||||||
}
|
|
||||||
|
|
||||||
if obj.Debug {
|
|
||||||
obj.Logf("Loading func `%s`", vertex)
|
|
||||||
}
|
|
||||||
|
|
||||||
innerCtx, innerCancel := context.WithCancel(obj.ctx)
|
|
||||||
obj.state[vertex] = &State{
|
|
||||||
Expr: expr,
|
|
||||||
|
|
||||||
ctx: innerCtx,
|
|
||||||
cancel: innerCancel,
|
|
||||||
} // store some state!
|
|
||||||
|
|
||||||
e1 := obj.state[vertex].Init()
|
|
||||||
e2 := errwrap.Wrapf(e1, "error loading func `%s`", vertex)
|
|
||||||
err = errwrap.Append(err, e2) // list of errors
|
|
||||||
}
|
|
||||||
if err != nil { // usually due to `not found` errors
|
|
||||||
return errwrap.Wrapf(err, "could not load requested funcs")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate the graph type checks properly and other tests. Must run Init first.
|
|
||||||
// This should check that: (1) all vertices have the correct number of inputs,
|
|
||||||
// (2) that the *Info signatures all match correctly, (3) that the argument
|
|
||||||
// names match correctly, and that the whole graph is statically correct.
|
|
||||||
func (obj *Engine) Validate() error {
|
|
||||||
inList := func(needle interfaces.Func, haystack []interfaces.Func) bool {
|
|
||||||
if needle == nil {
|
|
||||||
panic("nil value passed to inList") // catch bugs!
|
|
||||||
}
|
|
||||||
for _, x := range haystack {
|
|
||||||
if needle == x {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
var err error
|
|
||||||
ptrs := []interfaces.Func{} // Func is a ptr
|
|
||||||
for _, vertex := range obj.Graph.Vertices() {
|
|
||||||
node := obj.state[vertex]
|
|
||||||
// TODO: this doesn't work for facts because they're in the Func
|
|
||||||
// duplicate pointers would get closed twice, causing a panic...
|
|
||||||
if inList(node.handle, ptrs) { // check for duplicate ptrs!
|
|
||||||
e := fmt.Errorf("vertex `%s` has duplicate ptr", vertex)
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
}
|
|
||||||
ptrs = append(ptrs, node.handle)
|
|
||||||
}
|
|
||||||
for _, edge := range obj.Graph.Edges() {
|
|
||||||
if _, ok := edge.(*interfaces.FuncEdge); !ok {
|
|
||||||
e := fmt.Errorf("edge `%s` was not the correct type", edge)
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err // stage the errors so the user can fix many at once!
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if vertices expecting inputs have them
|
|
||||||
for vertex, count := range obj.Graph.InDegree() {
|
|
||||||
node := obj.state[vertex]
|
|
||||||
if exp := len(node.handle.Info().Sig.Ord); exp != count {
|
|
||||||
e := fmt.Errorf("expected %d inputs to `%s`, got %d", exp, node, count)
|
|
||||||
if obj.Debug {
|
|
||||||
obj.Logf("expected %d inputs to `%s`, got %d", exp, node, count)
|
|
||||||
obj.Logf("expected: %+v for `%s`", node.handle.Info().Sig.Ord, node)
|
|
||||||
}
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// expected vertex -> argName
|
|
||||||
expected := make(map[*State]map[string]int) // expected input fields
|
|
||||||
for vertex1 := range obj.Graph.Adjacency() {
|
|
||||||
// check for outputs that don't go anywhere?
|
|
||||||
//node1 := obj.state[vertex1]
|
|
||||||
//if len(obj.Graph.Adjacency()[vertex1]) == 0 { // no vertex1 -> vertex2
|
|
||||||
// if node1.handle.Info().Sig.Output != nil {
|
|
||||||
// // an output value goes nowhere...
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
for vertex2 := range obj.Graph.Adjacency()[vertex1] { // populate
|
|
||||||
node2 := obj.state[vertex2]
|
|
||||||
expected[node2] = make(map[string]int)
|
|
||||||
for _, key := range node2.handle.Info().Sig.Ord {
|
|
||||||
expected[node2][key] = 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for vertex1 := range obj.Graph.Adjacency() {
|
|
||||||
node1 := obj.state[vertex1]
|
|
||||||
for vertex2, edge := range obj.Graph.Adjacency()[vertex1] {
|
|
||||||
node2 := obj.state[vertex2]
|
|
||||||
edge := edge.(*interfaces.FuncEdge)
|
|
||||||
// check vertex1 -> vertex2 (with e) is valid
|
|
||||||
|
|
||||||
for _, arg := range edge.Args { // loop over each arg
|
|
||||||
sig := node2.handle.Info().Sig
|
|
||||||
if len(sig.Ord) == 0 {
|
|
||||||
e := fmt.Errorf("no input expected from `%s` to `%s` with arg `%s`", node1, node2, arg)
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if count, exists := expected[node2][arg]; !exists {
|
|
||||||
e := fmt.Errorf("wrong input name from `%s` to `%s` with arg `%s`", node1, node2, arg)
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
} else if count == 0 {
|
|
||||||
e := fmt.Errorf("duplicate input from `%s` to `%s` with arg `%s`", node1, node2, arg)
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
}
|
|
||||||
expected[node2][arg]-- // subtract one use
|
|
||||||
|
|
||||||
out := node1.handle.Info().Sig.Out
|
|
||||||
if out == nil {
|
|
||||||
e := fmt.Errorf("no output possible from `%s` to `%s` with arg `%s`", node1, node2, arg)
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
typ, exists := sig.Map[arg] // key in struct
|
|
||||||
if !exists {
|
|
||||||
// second check of this!
|
|
||||||
e := fmt.Errorf("wrong input name from `%s` to `%s` with arg `%s`", node1, node2, arg)
|
|
||||||
err = errwrap.Append(err, errwrap.Wrapf(e, "programming error"))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if typ.Kind == types.KindVariant { // FIXME: hack for now
|
|
||||||
// pass (input arg variants)
|
|
||||||
} else if out.Kind == types.KindVariant { // FIXME: hack for now
|
|
||||||
// pass (output arg variants)
|
|
||||||
} else if typ.Cmp(out) != nil {
|
|
||||||
e := fmt.Errorf("type mismatch from `%s` (%s) to `%s` (%s) with arg `%s`", node1, out, node2, typ, arg)
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check for leftover function inputs which weren't filled up by outputs
|
|
||||||
// (we're trying to call a function with fewer input args than required)
|
|
||||||
for node, m := range expected { // map[*State]map[string]int
|
|
||||||
for arg, count := range m {
|
|
||||||
if count != 0 { // count should be zero if all were used
|
|
||||||
e := fmt.Errorf("missing input to `%s` on arg `%s`", node, arg)
|
|
||||||
err = errwrap.Append(err, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err // stage the errors so the user can fix many at once!
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run starts up this function engine and gets it all running. It errors if the
|
|
||||||
// startup failed for some reason. On success, use the Stream and Table methods
|
|
||||||
// for future interaction with the engine, and the Close method to shut it off.
|
|
||||||
func (obj *Engine) Run() error {
|
|
||||||
if len(obj.topologicalSort) == 0 { // no funcs to load!
|
|
||||||
close(obj.loadedChan)
|
|
||||||
close(obj.streamChan)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: build a timer that runs while we wait for all funcs to startup.
|
|
||||||
// after some delay print a message to tell us which funcs we're waiting
|
|
||||||
// for to startup and that they are slow and blocking everyone, and then
|
|
||||||
// fail permanently after the timeout so that bad code can't block this!
|
|
||||||
|
|
||||||
// loop through all funcs that we might need
|
|
||||||
obj.agAdd(len(obj.topologicalSort))
|
|
||||||
for _, vertex := range obj.topologicalSort {
|
|
||||||
node := obj.state[vertex]
|
|
||||||
if obj.Debug {
|
|
||||||
obj.SafeLogf("Startup func `%s`", node)
|
|
||||||
}
|
|
||||||
|
|
||||||
incoming := obj.Graph.IncomingGraphVertices(vertex) // []Vertex
|
|
||||||
|
|
||||||
init := &interfaces.Init{
|
|
||||||
Hostname: obj.Hostname,
|
|
||||||
Input: node.input,
|
|
||||||
Output: node.output,
|
|
||||||
World: obj.World,
|
|
||||||
Debug: obj.Debug,
|
|
||||||
Logf: func(format string, v ...interface{}) {
|
|
||||||
obj.Logf("func: "+format, v...)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if err := node.handle.Init(init); err != nil {
|
|
||||||
return errwrap.Wrapf(err, "could not init func `%s`", node)
|
|
||||||
}
|
|
||||||
node.init = true // we've successfully initialized
|
|
||||||
|
|
||||||
// no incoming edges, so no incoming data
|
|
||||||
if len(incoming) == 0 { // TODO: do this here or earlier?
|
|
||||||
close(node.input)
|
|
||||||
} else {
|
|
||||||
// process function input data
|
|
||||||
obj.wg.Add(1)
|
|
||||||
go func(vertex pgraph.Vertex) {
|
|
||||||
node := obj.state[vertex]
|
|
||||||
defer obj.wg.Done()
|
|
||||||
defer close(node.input)
|
|
||||||
var ready bool
|
|
||||||
// the final closing output to this, closes this
|
|
||||||
for range node.notify { // new input values
|
|
||||||
// now build the struct if we can...
|
|
||||||
|
|
||||||
ready = true // assume for now...
|
|
||||||
si := &types.Type{
|
|
||||||
// input to functions are structs
|
|
||||||
Kind: types.KindStruct,
|
|
||||||
Map: node.handle.Info().Sig.Map,
|
|
||||||
Ord: node.handle.Info().Sig.Ord,
|
|
||||||
}
|
|
||||||
st := types.NewStruct(si)
|
|
||||||
for _, v := range incoming {
|
|
||||||
args := obj.Graph.Adjacency()[v][vertex].(*interfaces.FuncEdge).Args
|
|
||||||
from := obj.state[v]
|
|
||||||
obj.mutex.RLock()
|
|
||||||
value, exists := obj.table[v]
|
|
||||||
obj.mutex.RUnlock()
|
|
||||||
if !exists {
|
|
||||||
ready = false // nope!
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// set each arg, since one value
|
|
||||||
// could get used for multiple
|
|
||||||
// function inputs (shared edge)
|
|
||||||
for _, arg := range args {
|
|
||||||
err := st.Set(arg, value) // populate struct
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, from, err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !ready {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case node.input <- st: // send to function
|
|
||||||
|
|
||||||
case <-obj.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(vertex)
|
|
||||||
}
|
|
||||||
|
|
||||||
obj.wg.Add(1)
|
|
||||||
go func(vertex pgraph.Vertex) { // run function
|
|
||||||
node := obj.state[vertex]
|
|
||||||
defer obj.wg.Done()
|
|
||||||
if obj.Debug {
|
|
||||||
obj.SafeLogf("Running func `%s`", node)
|
|
||||||
}
|
|
||||||
err := node.handle.Stream(node.ctx)
|
|
||||||
if obj.Debug {
|
|
||||||
obj.SafeLogf("Exiting func `%s`", node)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
// we closed with an error...
|
|
||||||
err := errwrap.Wrapf(err, "problem streaming func `%s`", node)
|
|
||||||
select {
|
|
||||||
case obj.ag <- err: // send to aggregate channel
|
|
||||||
|
|
||||||
case <-obj.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(vertex)
|
|
||||||
|
|
||||||
obj.wg.Add(1)
|
|
||||||
go func(vertex pgraph.Vertex) { // process function output data
|
|
||||||
node := obj.state[vertex]
|
|
||||||
defer obj.wg.Done()
|
|
||||||
defer obj.agDone(vertex)
|
|
||||||
outgoing := obj.Graph.OutgoingGraphVertices(vertex) // []Vertex
|
|
||||||
for value := range node.output { // read from channel
|
|
||||||
obj.mutex.RLock()
|
|
||||||
cached, exists := obj.table[vertex]
|
|
||||||
obj.mutex.RUnlock()
|
|
||||||
if !exists { // first value received
|
|
||||||
// RACE: do this AFTER value is present!
|
|
||||||
//node.loaded = true // not yet please
|
|
||||||
obj.Logf("func `%s` started", node)
|
|
||||||
} else if value.Cmp(cached) == nil {
|
|
||||||
// skip if new value is same as previous
|
|
||||||
// if this happens often, it *might* be
|
|
||||||
// a bug in the function implementation
|
|
||||||
// FIXME: do we need to disable engine
|
|
||||||
// caching when using hysteresis?
|
|
||||||
obj.Logf("func `%s` skipped", node)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
obj.mutex.Lock()
|
|
||||||
// XXX: maybe we can get rid of the table...
|
|
||||||
obj.table[vertex] = value // save the latest
|
|
||||||
node.mutex.Lock()
|
|
||||||
//if err := node.Expr.SetValue(value); err != nil {
|
|
||||||
// node.mutex.Unlock() // don't block node.String()
|
|
||||||
// panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err))
|
|
||||||
//}
|
|
||||||
node.loaded = true // set *after* value is in :)
|
|
||||||
obj.Logf("func `%s` changed", node)
|
|
||||||
node.mutex.Unlock()
|
|
||||||
obj.mutex.Unlock()
|
|
||||||
|
|
||||||
// FIXME: will this actually prevent glitching?
|
|
||||||
// if we only notify the aggregate channel when
|
|
||||||
// we're at the bottom of the topo sort (eg: no
|
|
||||||
// outgoing vertices to notify) then we'll have
|
|
||||||
// a glitch free subtree in the programs ast...
|
|
||||||
if obj.Glitch || len(outgoing) == 0 {
|
|
||||||
select {
|
|
||||||
case obj.ag <- nil: // send to aggregate channel
|
|
||||||
|
|
||||||
case <-obj.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// notify the receiving vertices
|
|
||||||
for _, v := range outgoing {
|
|
||||||
node := obj.state[v]
|
|
||||||
select {
|
|
||||||
case node.notify <- struct{}{}:
|
|
||||||
|
|
||||||
case <-obj.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// no more output values are coming...
|
|
||||||
obj.SafeLogf("func `%s` stopped", node)
|
|
||||||
|
|
||||||
// nodes that never loaded will cause the engine to hang
|
|
||||||
if !node.loaded {
|
|
||||||
select {
|
|
||||||
case obj.ag <- fmt.Errorf("func `%s` stopped before it was loaded", node):
|
|
||||||
case <-obj.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(vertex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// send event on streamChan when any of the (aggregated) facts change
|
|
||||||
obj.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer obj.wg.Done()
|
|
||||||
defer close(obj.streamChan)
|
|
||||||
Loop:
|
|
||||||
for {
|
|
||||||
var err error
|
|
||||||
var ok bool
|
|
||||||
select {
|
|
||||||
case err, ok = <-obj.ag: // aggregated channel
|
|
||||||
if !ok {
|
|
||||||
break Loop // channel shutdown
|
|
||||||
}
|
|
||||||
|
|
||||||
if !obj.loaded {
|
|
||||||
// now check if we're ready
|
|
||||||
var loaded = true // initially assume true
|
|
||||||
for _, vertex := range obj.topologicalSort {
|
|
||||||
node := obj.state[vertex]
|
|
||||||
node.mutex.RLock()
|
|
||||||
nodeLoaded := node.loaded
|
|
||||||
node.mutex.RUnlock()
|
|
||||||
if !nodeLoaded {
|
|
||||||
loaded = false // we were wrong
|
|
||||||
// TODO: add better "not loaded" reporting
|
|
||||||
if obj.Debug {
|
|
||||||
obj.Logf("not yet loaded: %s", node)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
obj.loaded = loaded
|
|
||||||
|
|
||||||
if obj.loaded {
|
|
||||||
// this causes an initial start
|
|
||||||
// signal to be sent out below,
|
|
||||||
// since the stream sender runs
|
|
||||||
if obj.Debug {
|
|
||||||
obj.Logf("loaded")
|
|
||||||
}
|
|
||||||
close(obj.loadedChan) // signal
|
|
||||||
} else {
|
|
||||||
if err == nil {
|
|
||||||
continue // not ready to send signal
|
|
||||||
} // pass errors through...
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-obj.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// send stream signal
|
|
||||||
select {
|
|
||||||
// send events or errors on streamChan, eg: func failure
|
|
||||||
case obj.streamChan <- err: // send
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-obj.closeChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// agAdd registers a user on the ag channel.
|
|
||||||
func (obj *Engine) agAdd(i int) {
|
|
||||||
defer obj.agLock.Unlock()
|
|
||||||
obj.agLock.Lock()
|
|
||||||
obj.agCount += i
|
|
||||||
}
|
|
||||||
|
|
||||||
// agDone closes the channel if we're the last one using it.
|
|
||||||
func (obj *Engine) agDone(vertex pgraph.Vertex) {
|
|
||||||
defer obj.agLock.Unlock()
|
|
||||||
obj.agLock.Lock()
|
|
||||||
node := obj.state[vertex]
|
|
||||||
node.closed = true
|
|
||||||
|
|
||||||
// FIXME: (perf) cache this into a table which we narrow down with each
|
|
||||||
// successive call. look at the outgoing vertices that I would affect...
|
|
||||||
for _, v := range obj.Graph.OutgoingGraphVertices(vertex) { // close for each one
|
|
||||||
// now determine who provides inputs to that vertex...
|
|
||||||
var closed = true
|
|
||||||
for _, vv := range obj.Graph.IncomingGraphVertices(v) {
|
|
||||||
// are they all closed?
|
|
||||||
if !obj.state[vv].closed {
|
|
||||||
closed = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if closed { // if they're all closed, we can close the input
|
|
||||||
close(obj.state[v].notify)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if obj.agCount == 0 {
|
|
||||||
close(obj.ag)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock takes a write lock on the data that gets written to the AST, so that
|
|
||||||
// interpret/SetValue can be run without anything changing part way through.
|
|
||||||
// XXX: This API is kind of yucky, but is related to us running .String() on the
|
|
||||||
// nodes. Maybe we can avoid this somehow?
|
|
||||||
func (obj *Engine) Lock() {
|
|
||||||
obj.mutex.Lock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock takes a write lock on the data that gets written to the AST, so that
|
|
||||||
// interpret/SetValue can be run without anything changing part way through.
|
|
||||||
// XXX: This API is kind of yucky, but is related to us running .String() on the
|
|
||||||
// nodes. Maybe we can avoid this somehow?
|
|
||||||
func (obj *Engine) Unlock() {
|
|
||||||
obj.mutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RLock takes a read lock on the data that gets written to the AST, so that
|
|
||||||
// interpret can be run without anything changing part way through.
|
|
||||||
func (obj *Engine) RLock() {
|
|
||||||
obj.mutex.RLock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RUnlock frees a read lock on the data that gets written to the AST, so that
|
|
||||||
// interpret can be run without anything changing part way through.
|
|
||||||
func (obj *Engine) RUnlock() {
|
|
||||||
obj.mutex.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SafeLogf logs a message, although it adds a read lock around the logging in
|
|
||||||
// case a `node` argument is passed in which would set off the race detector.
|
|
||||||
func (obj *Engine) SafeLogf(format string, v ...interface{}) {
|
|
||||||
// We're adding a global mutex, because it's harder to only isolate the
|
|
||||||
// individual node specific mutexes needed since it may contain others!
|
|
||||||
if len(v) > 0 {
|
|
||||||
obj.mutex.RLock()
|
|
||||||
}
|
|
||||||
obj.Logf(format, v...)
|
|
||||||
if len(v) > 0 {
|
|
||||||
obj.mutex.RUnlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stream returns a channel of engine events. Wait for nil events to know when
|
|
||||||
// the Table map has changed. An error event means this will shutdown shortly.
|
|
||||||
// Do not run the Table function before we've received one non-error event.
|
|
||||||
func (obj *Engine) Stream() chan error {
|
|
||||||
return obj.streamChan
|
|
||||||
}
|
|
||||||
|
|
||||||
// Table returns a copy of the populated data table of values. We return a copy
|
|
||||||
// because since these values are constantly changing, we need an atomic
|
|
||||||
// snapshot to present to the consumer of this API.
|
|
||||||
// TODO: is this globally glitch consistent?
|
|
||||||
// TODO: do we need an API to return a single value? (wrapped in read locks)
|
|
||||||
func (obj *Engine) Table() map[pgraph.Vertex]types.Value {
|
|
||||||
obj.mutex.RLock()
|
|
||||||
defer obj.mutex.RUnlock()
|
|
||||||
table := make(map[pgraph.Vertex]types.Value)
|
|
||||||
for k, v := range obj.table {
|
|
||||||
//table[k] = v.Copy() // XXX: do we need to copy these values?
|
|
||||||
table[k] = v
|
|
||||||
}
|
|
||||||
return table
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close shuts down the function engine. It waits till everything has finished.
|
|
||||||
func (obj *Engine) Close() error {
|
|
||||||
var err error
|
|
||||||
for _, vertex := range obj.topologicalSort { // FIXME: should we do this in reverse?
|
|
||||||
node := obj.state[vertex]
|
|
||||||
node.cancel() // ctx
|
|
||||||
}
|
|
||||||
close(obj.closeChan)
|
|
||||||
obj.wg.Wait() // wait so that each func doesn't need to do this in close
|
|
||||||
obj.cancel() // free
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
@@ -1,86 +0,0 @@
|
|||||||
// Mgmt
|
|
||||||
// Copyright (C) 2013-2023+ James Shubin and the project contributors
|
|
||||||
// Written by James Shubin <james@shubin.ca> and the project contributors
|
|
||||||
//
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
//go:build !root
|
|
||||||
|
|
||||||
package facts
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/lang/funcs"
|
|
||||||
"github.com/purpleidea/mgmt/pgraph"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestFuncGraph0(t *testing.T) {
|
|
||||||
t.Logf("Hello!")
|
|
||||||
g, _ := pgraph.NewGraph("empty") // empty graph
|
|
||||||
|
|
||||||
obj := &funcs.Engine{
|
|
||||||
Graph: g,
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("Init...")
|
|
||||||
if err := obj.Init(); err != nil {
|
|
||||||
t.Errorf("could not init: %+v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("Validate...")
|
|
||||||
if err := obj.Validate(); err != nil {
|
|
||||||
t.Errorf("could not validate: %+v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("Run...")
|
|
||||||
if err := obj.Run(); err != nil {
|
|
||||||
t.Errorf("could not run: %+v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for some activity
|
|
||||||
t.Logf("Stream...")
|
|
||||||
stream := obj.Stream()
|
|
||||||
t.Logf("Loop...")
|
|
||||||
br := time.After(time.Duration(5) * time.Second)
|
|
||||||
Loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case err, ok := <-stream:
|
|
||||||
if !ok {
|
|
||||||
t.Logf("Stream break...")
|
|
||||||
break Loop
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Logf("Error: %+v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-br:
|
|
||||||
t.Logf("Break...")
|
|
||||||
t.Errorf("empty graph should have closed stream")
|
|
||||||
break Loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Logf("Closing...")
|
|
||||||
if err := obj.Close(); err != nil {
|
|
||||||
t.Errorf("could not close: %+v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user