From b27b809ea994deca49733276a25276f592c5f12b Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 8 Aug 2023 23:02:21 -0400 Subject: [PATCH] lang: funcs: Remove old engine code This gets rid of the old engine implementation which can only run a static graph. We'll need a changing graph for lambdas. --- lang/funcs/engine.go | 697 ---------------------------------- lang/funcs/facts/func_test.go | 86 ----- 2 files changed, 783 deletions(-) delete mode 100644 lang/funcs/engine.go delete mode 100644 lang/funcs/facts/func_test.go diff --git a/lang/funcs/engine.go b/lang/funcs/engine.go deleted file mode 100644 index c8819e12..00000000 --- a/lang/funcs/engine.go +++ /dev/null @@ -1,697 +0,0 @@ -// Mgmt -// Copyright (C) 2013-2023+ James Shubin and the project contributors -// Written by James Shubin and the project contributors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package funcs - -import ( - "context" - "fmt" - "sync" - - "github.com/purpleidea/mgmt/engine" - "github.com/purpleidea/mgmt/lang/interfaces" - "github.com/purpleidea/mgmt/lang/types" - "github.com/purpleidea/mgmt/pgraph" - "github.com/purpleidea/mgmt/util/errwrap" -) - -// State represents the state of a function vertex. This corresponds to an AST -// expr, which is the memory address (pointer) in the graph. -type State struct { - Expr interfaces.Expr // pointer to the expr vertex - - handle interfaces.Func // the function (if not nil, we've found it on init) - - ctx context.Context // used for shutting down each Stream function. - cancel context.CancelFunc - - init bool // have we run Init on our func? - ready bool // has it received all the args it needs at least once? - loaded bool // has the func run at least once ? - closed bool // did we close ourself down? - - notify chan struct{} // ping here when new input values exist - - input chan types.Value // the top level type must be a struct - output chan types.Value - - mutex *sync.RWMutex // concurrency guard for modifying Expr with String/SetValue -} - -// Init creates the function state if it can be found in the registered list. -func (obj *State) Init() error { - handle, err := obj.Expr.Func() // build one and store it, don't re-gen - if err != nil { - return err - } - if err := handle.Validate(); err != nil { - return errwrap.Wrapf(err, "could not validate func") - } - obj.handle = handle - - sig := obj.handle.Info().Sig - if sig.Kind != types.KindFunc { - return fmt.Errorf("must be kind func") - } - if len(sig.Ord) > 0 { - // since we accept input, better get our notification chan built - obj.notify = make(chan struct{}) - } - - obj.input = make(chan types.Value) // we close this when we're done - obj.output = make(chan types.Value) // we create it, func closes it - - obj.mutex = &sync.RWMutex{} - - return nil -} - -// String satisfies fmt.Stringer so that these print nicely. -func (obj *State) String() string { - // TODO: use global mutex since it's harder to add state specific mutex - //obj.mutex.RLock() // prevent race detector issues against SetValue - //defer obj.mutex.RUnlock() - // FIXME: also add read locks on any of the children Expr in obj.Expr - return obj.Expr.String() -} - -// Engine represents the running time varying directed acyclic function graph. -type Engine struct { - Graph *pgraph.Graph - Hostname string - World engine.World - Debug bool - Logf func(format string, v ...interface{}) - - // Glitch: https://en.wikipedia.org/wiki/Reactive_programming#Glitches - Glitch bool // allow glitching? (more responsive, but less accurate) - - ag chan error // used to aggregate fact events without reflect - agLock *sync.Mutex - agCount int // last one turns out the light (closes the ag channel) - - topologicalSort []pgraph.Vertex // cached sorting of the graph for perf - - state map[pgraph.Vertex]*State // state associated with the vertex - - mutex *sync.RWMutex // concurrency guard for the table map - table map[pgraph.Vertex]types.Value // live table of output values - - loaded bool // are all of the funcs loaded? - loadedChan chan struct{} // funcs loaded signal - - streamChan chan error // signals a new graph can be created or problem - - ctx context.Context // used for shutting down each Stream function. - cancel context.CancelFunc - - closeChan chan struct{} // close signal - wg *sync.WaitGroup -} - -// Init initializes the struct. This is the first call you must make. Do not -// proceed with calls to other methods unless this succeeds first. This also -// loads all the functions by calling Init on each one in the graph. -// TODO: should Init take the graph as an input arg to keep it as a private -// field? -func (obj *Engine) Init() error { - obj.ag = make(chan error) - obj.agLock = &sync.Mutex{} - obj.state = make(map[pgraph.Vertex]*State) - obj.mutex = &sync.RWMutex{} - obj.table = make(map[pgraph.Vertex]types.Value) - obj.loadedChan = make(chan struct{}) - obj.streamChan = make(chan error) - obj.closeChan = make(chan struct{}) - obj.wg = &sync.WaitGroup{} - topologicalSort, err := obj.Graph.TopologicalSort() - if err != nil { - return errwrap.Wrapf(err, "topo sort failed") - } - obj.topologicalSort = topologicalSort // cache the result - - obj.ctx, obj.cancel = context.WithCancel(context.Background()) // top - - for _, vertex := range obj.Graph.Vertices() { - // is this an interface we can use? - if _, exists := obj.state[vertex]; exists { - return fmt.Errorf("vertex (%+v) is not unique in the graph", vertex) - } - - expr, ok := vertex.(interfaces.Expr) - if !ok { - return fmt.Errorf("vertex (%+v) was not an expr", vertex) - } - - if obj.Debug { - obj.Logf("Loading func `%s`", vertex) - } - - innerCtx, innerCancel := context.WithCancel(obj.ctx) - obj.state[vertex] = &State{ - Expr: expr, - - ctx: innerCtx, - cancel: innerCancel, - } // store some state! - - e1 := obj.state[vertex].Init() - e2 := errwrap.Wrapf(e1, "error loading func `%s`", vertex) - err = errwrap.Append(err, e2) // list of errors - } - if err != nil { // usually due to `not found` errors - return errwrap.Wrapf(err, "could not load requested funcs") - } - return nil -} - -// Validate the graph type checks properly and other tests. Must run Init first. -// This should check that: (1) all vertices have the correct number of inputs, -// (2) that the *Info signatures all match correctly, (3) that the argument -// names match correctly, and that the whole graph is statically correct. -func (obj *Engine) Validate() error { - inList := func(needle interfaces.Func, haystack []interfaces.Func) bool { - if needle == nil { - panic("nil value passed to inList") // catch bugs! - } - for _, x := range haystack { - if needle == x { - return true - } - } - return false - } - var err error - ptrs := []interfaces.Func{} // Func is a ptr - for _, vertex := range obj.Graph.Vertices() { - node := obj.state[vertex] - // TODO: this doesn't work for facts because they're in the Func - // duplicate pointers would get closed twice, causing a panic... - if inList(node.handle, ptrs) { // check for duplicate ptrs! - e := fmt.Errorf("vertex `%s` has duplicate ptr", vertex) - err = errwrap.Append(err, e) - } - ptrs = append(ptrs, node.handle) - } - for _, edge := range obj.Graph.Edges() { - if _, ok := edge.(*interfaces.FuncEdge); !ok { - e := fmt.Errorf("edge `%s` was not the correct type", edge) - err = errwrap.Append(err, e) - } - } - if err != nil { - return err // stage the errors so the user can fix many at once! - } - - // check if vertices expecting inputs have them - for vertex, count := range obj.Graph.InDegree() { - node := obj.state[vertex] - if exp := len(node.handle.Info().Sig.Ord); exp != count { - e := fmt.Errorf("expected %d inputs to `%s`, got %d", exp, node, count) - if obj.Debug { - obj.Logf("expected %d inputs to `%s`, got %d", exp, node, count) - obj.Logf("expected: %+v for `%s`", node.handle.Info().Sig.Ord, node) - } - err = errwrap.Append(err, e) - } - } - - // expected vertex -> argName - expected := make(map[*State]map[string]int) // expected input fields - for vertex1 := range obj.Graph.Adjacency() { - // check for outputs that don't go anywhere? - //node1 := obj.state[vertex1] - //if len(obj.Graph.Adjacency()[vertex1]) == 0 { // no vertex1 -> vertex2 - // if node1.handle.Info().Sig.Output != nil { - // // an output value goes nowhere... - // } - //} - for vertex2 := range obj.Graph.Adjacency()[vertex1] { // populate - node2 := obj.state[vertex2] - expected[node2] = make(map[string]int) - for _, key := range node2.handle.Info().Sig.Ord { - expected[node2][key] = 1 - } - } - } - - for vertex1 := range obj.Graph.Adjacency() { - node1 := obj.state[vertex1] - for vertex2, edge := range obj.Graph.Adjacency()[vertex1] { - node2 := obj.state[vertex2] - edge := edge.(*interfaces.FuncEdge) - // check vertex1 -> vertex2 (with e) is valid - - for _, arg := range edge.Args { // loop over each arg - sig := node2.handle.Info().Sig - if len(sig.Ord) == 0 { - e := fmt.Errorf("no input expected from `%s` to `%s` with arg `%s`", node1, node2, arg) - err = errwrap.Append(err, e) - continue - } - - if count, exists := expected[node2][arg]; !exists { - e := fmt.Errorf("wrong input name from `%s` to `%s` with arg `%s`", node1, node2, arg) - err = errwrap.Append(err, e) - } else if count == 0 { - e := fmt.Errorf("duplicate input from `%s` to `%s` with arg `%s`", node1, node2, arg) - err = errwrap.Append(err, e) - } - expected[node2][arg]-- // subtract one use - - out := node1.handle.Info().Sig.Out - if out == nil { - e := fmt.Errorf("no output possible from `%s` to `%s` with arg `%s`", node1, node2, arg) - err = errwrap.Append(err, e) - continue - } - typ, exists := sig.Map[arg] // key in struct - if !exists { - // second check of this! - e := fmt.Errorf("wrong input name from `%s` to `%s` with arg `%s`", node1, node2, arg) - err = errwrap.Append(err, errwrap.Wrapf(e, "programming error")) - continue - } - - if typ.Kind == types.KindVariant { // FIXME: hack for now - // pass (input arg variants) - } else if out.Kind == types.KindVariant { // FIXME: hack for now - // pass (output arg variants) - } else if typ.Cmp(out) != nil { - e := fmt.Errorf("type mismatch from `%s` (%s) to `%s` (%s) with arg `%s`", node1, out, node2, typ, arg) - err = errwrap.Append(err, e) - } - } - } - } - - // check for leftover function inputs which weren't filled up by outputs - // (we're trying to call a function with fewer input args than required) - for node, m := range expected { // map[*State]map[string]int - for arg, count := range m { - if count != 0 { // count should be zero if all were used - e := fmt.Errorf("missing input to `%s` on arg `%s`", node, arg) - err = errwrap.Append(err, e) - } - } - } - - if err != nil { - return err // stage the errors so the user can fix many at once! - } - - return nil -} - -// Run starts up this function engine and gets it all running. It errors if the -// startup failed for some reason. On success, use the Stream and Table methods -// for future interaction with the engine, and the Close method to shut it off. -func (obj *Engine) Run() error { - if len(obj.topologicalSort) == 0 { // no funcs to load! - close(obj.loadedChan) - close(obj.streamChan) - return nil - } - - // TODO: build a timer that runs while we wait for all funcs to startup. - // after some delay print a message to tell us which funcs we're waiting - // for to startup and that they are slow and blocking everyone, and then - // fail permanently after the timeout so that bad code can't block this! - - // loop through all funcs that we might need - obj.agAdd(len(obj.topologicalSort)) - for _, vertex := range obj.topologicalSort { - node := obj.state[vertex] - if obj.Debug { - obj.SafeLogf("Startup func `%s`", node) - } - - incoming := obj.Graph.IncomingGraphVertices(vertex) // []Vertex - - init := &interfaces.Init{ - Hostname: obj.Hostname, - Input: node.input, - Output: node.output, - World: obj.World, - Debug: obj.Debug, - Logf: func(format string, v ...interface{}) { - obj.Logf("func: "+format, v...) - }, - } - if err := node.handle.Init(init); err != nil { - return errwrap.Wrapf(err, "could not init func `%s`", node) - } - node.init = true // we've successfully initialized - - // no incoming edges, so no incoming data - if len(incoming) == 0 { // TODO: do this here or earlier? - close(node.input) - } else { - // process function input data - obj.wg.Add(1) - go func(vertex pgraph.Vertex) { - node := obj.state[vertex] - defer obj.wg.Done() - defer close(node.input) - var ready bool - // the final closing output to this, closes this - for range node.notify { // new input values - // now build the struct if we can... - - ready = true // assume for now... - si := &types.Type{ - // input to functions are structs - Kind: types.KindStruct, - Map: node.handle.Info().Sig.Map, - Ord: node.handle.Info().Sig.Ord, - } - st := types.NewStruct(si) - for _, v := range incoming { - args := obj.Graph.Adjacency()[v][vertex].(*interfaces.FuncEdge).Args - from := obj.state[v] - obj.mutex.RLock() - value, exists := obj.table[v] - obj.mutex.RUnlock() - if !exists { - ready = false // nope! - break - } - - // set each arg, since one value - // could get used for multiple - // function inputs (shared edge) - for _, arg := range args { - err := st.Set(arg, value) // populate struct - if err != nil { - panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, from, err)) - } - } - } - if !ready { - continue - } - - select { - case node.input <- st: // send to function - - case <-obj.closeChan: - return - } - } - }(vertex) - } - - obj.wg.Add(1) - go func(vertex pgraph.Vertex) { // run function - node := obj.state[vertex] - defer obj.wg.Done() - if obj.Debug { - obj.SafeLogf("Running func `%s`", node) - } - err := node.handle.Stream(node.ctx) - if obj.Debug { - obj.SafeLogf("Exiting func `%s`", node) - } - if err != nil { - // we closed with an error... - err := errwrap.Wrapf(err, "problem streaming func `%s`", node) - select { - case obj.ag <- err: // send to aggregate channel - - case <-obj.closeChan: - return - } - } - }(vertex) - - obj.wg.Add(1) - go func(vertex pgraph.Vertex) { // process function output data - node := obj.state[vertex] - defer obj.wg.Done() - defer obj.agDone(vertex) - outgoing := obj.Graph.OutgoingGraphVertices(vertex) // []Vertex - for value := range node.output { // read from channel - obj.mutex.RLock() - cached, exists := obj.table[vertex] - obj.mutex.RUnlock() - if !exists { // first value received - // RACE: do this AFTER value is present! - //node.loaded = true // not yet please - obj.Logf("func `%s` started", node) - } else if value.Cmp(cached) == nil { - // skip if new value is same as previous - // if this happens often, it *might* be - // a bug in the function implementation - // FIXME: do we need to disable engine - // caching when using hysteresis? - obj.Logf("func `%s` skipped", node) - continue - } - obj.mutex.Lock() - // XXX: maybe we can get rid of the table... - obj.table[vertex] = value // save the latest - node.mutex.Lock() - //if err := node.Expr.SetValue(value); err != nil { - // node.mutex.Unlock() // don't block node.String() - // panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err)) - //} - node.loaded = true // set *after* value is in :) - obj.Logf("func `%s` changed", node) - node.mutex.Unlock() - obj.mutex.Unlock() - - // FIXME: will this actually prevent glitching? - // if we only notify the aggregate channel when - // we're at the bottom of the topo sort (eg: no - // outgoing vertices to notify) then we'll have - // a glitch free subtree in the programs ast... - if obj.Glitch || len(outgoing) == 0 { - select { - case obj.ag <- nil: // send to aggregate channel - - case <-obj.closeChan: - return - } - } - - // notify the receiving vertices - for _, v := range outgoing { - node := obj.state[v] - select { - case node.notify <- struct{}{}: - - case <-obj.closeChan: - return - } - } - } - // no more output values are coming... - obj.SafeLogf("func `%s` stopped", node) - - // nodes that never loaded will cause the engine to hang - if !node.loaded { - select { - case obj.ag <- fmt.Errorf("func `%s` stopped before it was loaded", node): - case <-obj.closeChan: - return - } - } - }(vertex) - } - - // send event on streamChan when any of the (aggregated) facts change - obj.wg.Add(1) - go func() { - defer obj.wg.Done() - defer close(obj.streamChan) - Loop: - for { - var err error - var ok bool - select { - case err, ok = <-obj.ag: // aggregated channel - if !ok { - break Loop // channel shutdown - } - - if !obj.loaded { - // now check if we're ready - var loaded = true // initially assume true - for _, vertex := range obj.topologicalSort { - node := obj.state[vertex] - node.mutex.RLock() - nodeLoaded := node.loaded - node.mutex.RUnlock() - if !nodeLoaded { - loaded = false // we were wrong - // TODO: add better "not loaded" reporting - if obj.Debug { - obj.Logf("not yet loaded: %s", node) - } - break - } - } - obj.loaded = loaded - - if obj.loaded { - // this causes an initial start - // signal to be sent out below, - // since the stream sender runs - if obj.Debug { - obj.Logf("loaded") - } - close(obj.loadedChan) // signal - } else { - if err == nil { - continue // not ready to send signal - } // pass errors through... - } - } - - case <-obj.closeChan: - return - } - - // send stream signal - select { - // send events or errors on streamChan, eg: func failure - case obj.streamChan <- err: // send - if err != nil { - return - } - case <-obj.closeChan: - return - } - } - }() - - return nil -} - -// agAdd registers a user on the ag channel. -func (obj *Engine) agAdd(i int) { - defer obj.agLock.Unlock() - obj.agLock.Lock() - obj.agCount += i -} - -// agDone closes the channel if we're the last one using it. -func (obj *Engine) agDone(vertex pgraph.Vertex) { - defer obj.agLock.Unlock() - obj.agLock.Lock() - node := obj.state[vertex] - node.closed = true - - // FIXME: (perf) cache this into a table which we narrow down with each - // successive call. look at the outgoing vertices that I would affect... - for _, v := range obj.Graph.OutgoingGraphVertices(vertex) { // close for each one - // now determine who provides inputs to that vertex... - var closed = true - for _, vv := range obj.Graph.IncomingGraphVertices(v) { - // are they all closed? - if !obj.state[vv].closed { - closed = false - break - } - } - if closed { // if they're all closed, we can close the input - close(obj.state[v].notify) - } - } - - if obj.agCount == 0 { - close(obj.ag) - } -} - -// Lock takes a write lock on the data that gets written to the AST, so that -// interpret/SetValue can be run without anything changing part way through. -// XXX: This API is kind of yucky, but is related to us running .String() on the -// nodes. Maybe we can avoid this somehow? -func (obj *Engine) Lock() { - obj.mutex.Lock() -} - -// Unlock takes a write lock on the data that gets written to the AST, so that -// interpret/SetValue can be run without anything changing part way through. -// XXX: This API is kind of yucky, but is related to us running .String() on the -// nodes. Maybe we can avoid this somehow? -func (obj *Engine) Unlock() { - obj.mutex.Unlock() -} - -// RLock takes a read lock on the data that gets written to the AST, so that -// interpret can be run without anything changing part way through. -func (obj *Engine) RLock() { - obj.mutex.RLock() -} - -// RUnlock frees a read lock on the data that gets written to the AST, so that -// interpret can be run without anything changing part way through. -func (obj *Engine) RUnlock() { - obj.mutex.RUnlock() -} - -// SafeLogf logs a message, although it adds a read lock around the logging in -// case a `node` argument is passed in which would set off the race detector. -func (obj *Engine) SafeLogf(format string, v ...interface{}) { - // We're adding a global mutex, because it's harder to only isolate the - // individual node specific mutexes needed since it may contain others! - if len(v) > 0 { - obj.mutex.RLock() - } - obj.Logf(format, v...) - if len(v) > 0 { - obj.mutex.RUnlock() - } -} - -// Stream returns a channel of engine events. Wait for nil events to know when -// the Table map has changed. An error event means this will shutdown shortly. -// Do not run the Table function before we've received one non-error event. -func (obj *Engine) Stream() chan error { - return obj.streamChan -} - -// Table returns a copy of the populated data table of values. We return a copy -// because since these values are constantly changing, we need an atomic -// snapshot to present to the consumer of this API. -// TODO: is this globally glitch consistent? -// TODO: do we need an API to return a single value? (wrapped in read locks) -func (obj *Engine) Table() map[pgraph.Vertex]types.Value { - obj.mutex.RLock() - defer obj.mutex.RUnlock() - table := make(map[pgraph.Vertex]types.Value) - for k, v := range obj.table { - //table[k] = v.Copy() // XXX: do we need to copy these values? - table[k] = v - } - return table -} - -// Close shuts down the function engine. It waits till everything has finished. -func (obj *Engine) Close() error { - var err error - for _, vertex := range obj.topologicalSort { // FIXME: should we do this in reverse? - node := obj.state[vertex] - node.cancel() // ctx - } - close(obj.closeChan) - obj.wg.Wait() // wait so that each func doesn't need to do this in close - obj.cancel() // free - return err -} diff --git a/lang/funcs/facts/func_test.go b/lang/funcs/facts/func_test.go deleted file mode 100644 index ced797e4..00000000 --- a/lang/funcs/facts/func_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// Mgmt -// Copyright (C) 2013-2023+ James Shubin and the project contributors -// Written by James Shubin and the project contributors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//go:build !root - -package facts - -import ( - "testing" - "time" - - "github.com/purpleidea/mgmt/lang/funcs" - "github.com/purpleidea/mgmt/pgraph" -) - -func TestFuncGraph0(t *testing.T) { - t.Logf("Hello!") - g, _ := pgraph.NewGraph("empty") // empty graph - - obj := &funcs.Engine{ - Graph: g, - } - - t.Logf("Init...") - if err := obj.Init(); err != nil { - t.Errorf("could not init: %+v", err) - return - } - - t.Logf("Validate...") - if err := obj.Validate(); err != nil { - t.Errorf("could not validate: %+v", err) - return - } - - t.Logf("Run...") - if err := obj.Run(); err != nil { - t.Errorf("could not run: %+v", err) - return - } - - // wait for some activity - t.Logf("Stream...") - stream := obj.Stream() - t.Logf("Loop...") - br := time.After(time.Duration(5) * time.Second) -Loop: - for { - select { - case err, ok := <-stream: - if !ok { - t.Logf("Stream break...") - break Loop - } - if err != nil { - t.Logf("Error: %+v", err) - continue - } - - case <-br: - t.Logf("Break...") - t.Errorf("empty graph should have closed stream") - break Loop - } - } - - t.Logf("Closing...") - if err := obj.Close(); err != nil { - t.Errorf("could not close: %+v", err) - return - } -}