lang: Core language and GAPI changes

These changes help plumb things in more easily for the lambdas work.
This commit is contained in:
James Shubin
2023-09-25 16:10:10 -04:00
parent 47d2a661bc
commit d4b1e8f1be
6 changed files with 261 additions and 1037 deletions

View File

@@ -21,13 +21,14 @@ package lang
import (
"bytes"
"context"
"fmt"
"sync"
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/lang/ast"
"github.com/purpleidea/mgmt/lang/funcs"
_ "github.com/purpleidea/mgmt/lang/funcs/core" // import so the funcs register
"github.com/purpleidea/mgmt/lang/funcs/dage"
"github.com/purpleidea/mgmt/lang/funcs/vars"
"github.com/purpleidea/mgmt/lang/inputs"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -62,30 +63,20 @@ type Lang struct {
Logf func(format string, v ...interface{})
ast interfaces.Stmt // store main prog AST here
funcs *funcs.Engine // function event engine
funcs *dage.Engine // function event engine
graph *pgraph.Graph // function graph
loadedChan chan struct{} // loaded signal
streamChan chan error // signals a new graph can be created or problem
streamChan <-chan error // signals a new graph can be created or problem
//streamBurst bool // should we try and be bursty with the stream events?
closeChan chan struct{} // close signal
wg *sync.WaitGroup
wg *sync.WaitGroup
}
// Init initializes the lang struct, and starts up the initial data sources.
// Init initializes the lang struct, and starts up the initial input parsing.
// NOTE: The trick is that we need to get the list of funcs to watch AND start
// watching them, *before* we pull their values, that way we'll know if they
// changed from the values we wanted.
func (obj *Lang) Init() error {
obj.loadedChan = make(chan struct{})
obj.streamChan = make(chan error)
obj.closeChan = make(chan struct{})
obj.wg = &sync.WaitGroup{}
once := &sync.Once{}
loadedSignal := func() { close(obj.loadedChan) } // only run once!
if obj.Debug {
obj.Logf("input: %s", obj.Input)
tree, err := util.FsTree(obj.Fs, "/") // should look like gapi
@@ -215,114 +206,120 @@ func (obj *Lang) Init() error {
obj.Logf("building function graph...")
// we assume that for some given code, the list of funcs doesn't change
// iow, we don't support variable, variables or absurd things like that
graph, err := obj.ast.Graph() // build the graph of functions
obj.graph = &pgraph.Graph{Name: "functionGraph"}
env := make(map[string]interfaces.Func)
for k, v := range scope.Variables {
g, builtinFunc, err := v.Graph(nil)
if err != nil {
return errwrap.Wrapf(err, "calling Graph on builtins")
}
obj.graph.AddGraph(g)
env[k] = builtinFunc
}
g, err := obj.ast.Graph() // build the graph of functions
if err != nil {
return errwrap.Wrapf(err, "could not generate function graph")
}
obj.graph.AddGraph(g)
if obj.Debug {
obj.Logf("function graph: %+v", graph)
graph.Logf(obj.Logf) // log graph output with this logger...
obj.Logf("function graph: %+v", obj.graph)
obj.graph.Logf(obj.Logf) // log graph output with this logger...
//if err := obj.graph.ExecGraphviz("/tmp/graphviz.dot"); err != nil {
// return errwrap.Wrapf(err, "writing graph failed")
//}
}
if graph.NumVertices() == 0 { // no funcs to load!
// send only one signal since we won't ever send after this!
obj.Logf("static graph found")
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(obj.streamChan) // no more events are coming!
close(obj.loadedChan) // signal
select {
case obj.streamChan <- nil: // send one signal
// pass
case <-obj.closeChan:
return
}
}()
return nil // exit early, no funcs to load!
}
obj.funcs = &funcs.Engine{
Graph: graph, // not the same as the output graph!
obj.funcs = &dage.Engine{
Name: "lang", // TODO: arbitrary name for now
Hostname: obj.Hostname,
World: obj.World,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
obj.Logf("funcs: "+format, v...)
},
Glitch: false, // FIXME: verify this functionality is perfect!
}
obj.Logf("function engine initializing...")
if err := obj.funcs.Init(); err != nil {
if err := obj.funcs.Setup(); err != nil {
return errwrap.Wrapf(err, "init error with func engine")
}
obj.Logf("function engine validating...")
if err := obj.funcs.Validate(); err != nil {
return errwrap.Wrapf(err, "validate error with func engine")
}
obj.streamChan = obj.funcs.Stream() // after obj.funcs.Setup runs
return nil
}
// Run kicks off the function engine. Use the context to shut it down.
func (obj *Lang) Run(ctx context.Context) (reterr error) {
wg := &sync.WaitGroup{}
defer wg.Wait()
runCtx, cancel := context.WithCancel(context.Background()) // Don't inherit from parent
defer cancel()
//obj.Logf("function engine validating...")
//if err := obj.funcs.Validate(); err != nil {
// return errwrap.Wrapf(err, "validate error with func engine")
//}
obj.Logf("function engine starting...")
// On failure, we expect the caller to run Close() to shutdown all of
// the currently initialized (and running) funcs... This is needed if
// we successfully ran `Run` but isn't needed only for Init/Validate.
if err := obj.funcs.Run(); err != nil {
return errwrap.Wrapf(err, "run error with func engine")
wg.Add(1)
go func() {
defer wg.Done()
if err := obj.funcs.Run(runCtx); err == nil {
reterr = errwrap.Append(reterr, err)
}
// Run() should only error if not a dag I think...
}()
<-obj.funcs.Started() // wait for startup (will not block forever)
// Sanity checks for graph size.
if count := obj.funcs.NumVertices(); count != 0 {
return fmt.Errorf("expected empty graph on start, got %d vertices", count)
}
defer func() {
if count := obj.funcs.NumVertices(); count != 0 {
err := fmt.Errorf("expected empty graph on exit, got %d vertices", count)
reterr = errwrap.Append(reterr, err)
}
}()
defer wg.Wait()
defer cancel() // now cancel Run only after Reverse and Free are done!
txn := obj.funcs.Txn()
defer txn.Free() // remember to call Free()
txn.AddGraph(obj.graph)
if err := txn.Commit(); err != nil {
return errwrap.Wrapf(err, "error adding to function graph engine")
}
defer func() {
if err := txn.Reverse(); err != nil { // should remove everything we added
reterr = errwrap.Append(reterr, err)
}
}()
// wait for some activity
obj.Logf("stream...")
stream := obj.funcs.Stream()
obj.wg.Add(1)
go func() {
obj.Logf("loop...")
defer obj.wg.Done()
defer close(obj.streamChan) // no more events are coming!
for {
var err error
var ok bool
select {
case err, ok = <-stream:
if !ok {
obj.Logf("stream closed")
return
}
if err == nil {
// only do this once, on the first event
once.Do(loadedSignal) // signal
}
case <-obj.closeChan:
return
}
select {
case <-ctx.Done():
}
select {
case obj.streamChan <- err: // send
if err != nil {
obj.Logf("stream error: %+v", err)
return
}
case <-obj.closeChan:
return
}
}
}()
return nil
}
// Stream returns a channel of graph change requests or errors. These are
// usually sent when a func output changes.
func (obj *Lang) Stream() chan error {
func (obj *Lang) Stream() <-chan error {
return obj.streamChan
}
// Interpret runs the interpreter and returns a graph and corresponding error.
func (obj *Lang) Interpret() (*pgraph.Graph, error) {
select {
case <-obj.loadedChan: // funcs are now loaded!
case <-obj.funcs.Loaded(): // funcs are now loaded!
// pass
default:
// if this is hit, someone probably called this too early!
@@ -332,37 +329,9 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) {
obj.Logf("running interpret...")
table := obj.funcs.Table() // map[pgraph.Vertex]types.Value
fn := func(n interfaces.Node) error {
expr, ok := n.(interfaces.Expr)
if !ok {
return nil
}
v, ok := expr.(pgraph.Vertex)
if !ok {
panic("programming error in interfaces.Expr -> pgraph.Vertex lookup")
}
val, exists := table[v]
if !exists {
fmt.Printf("XXX: missing value in table is pointer: %p\n", v)
return nil // XXX: workaround for now...
//return fmt.Errorf("missing value in table for: %s", v)
}
return expr.SetValue(val) // set the value
}
obj.funcs.Lock() // XXX: apparently there are races between SetValue and reading obj.V values...
if err := obj.ast.Apply(fn); err != nil {
if obj.Debug {
for k, v := range table {
obj.Logf("table: key: %+v ; value: %+v", k, v)
}
}
obj.funcs.Unlock()
return nil, err
}
obj.funcs.Unlock()
// this call returns the graph
graph, err := interpret.Interpret(obj.ast)
graph, err := interpret.Interpret(obj.ast, table)
if err != nil {
return nil, errwrap.Wrapf(err, "could not interpret")
}
@@ -370,14 +339,7 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) {
return graph, nil // return a graph
}
// Close shuts down the lang struct and causes all the funcs to shutdown. It
// must be called when finished after any successful Init ran.
func (obj *Lang) Close() error {
var err error
if obj.funcs != nil {
err = obj.funcs.Close()
}
close(obj.closeChan)
obj.wg.Wait()
return err
// Cleanup cleans up and frees memory and resources after everything is done.
func (obj *Lang) Cleanup() error {
return obj.funcs.Cleanup()
}