diff --git a/gapi/gapi.go b/gapi/gapi.go index fbadc9c4..c10d6ff7 100644 --- a/gapi/gapi.go +++ b/gapi/gapi.go @@ -119,11 +119,13 @@ type GAPI interface { // Next returns a stream of switch events. The engine will run Graph() // to build a new graph after every Next event. + // TODO: add context for shutting down to the input and change Close to Cleanup Next() chan Next // Close shuts down the GAPI. It asks the GAPI to close, and must cause // Next() to unblock even if is currently blocked and waiting to send a // new event. + // TODO: change Close to Cleanup Close() error } diff --git a/lang/gapi/gapi.go b/lang/gapi/gapi.go index 1f840c2c..1b975158 100644 --- a/lang/gapi/gapi.go +++ b/lang/gapi/gapi.go @@ -20,6 +20,7 @@ package gapi import ( "bytes" + "context" "fmt" "strings" "sync" @@ -61,7 +62,11 @@ func init() { type GAPI struct { InputURI string // input URI of code file system to run - lang *lang.Lang // lang struct + lang *lang.Lang // lang struct + wgRun *sync.WaitGroup + ctx context.Context + cancel func() + reterr error // this data struct is only available *after* Init, so as a result, it // can not be used inside the Cli(...) method. @@ -472,13 +477,27 @@ func (obj *GAPI) LangInit() error { if err := obj.lang.Init(); err != nil { return errwrap.Wrapf(err, "can't init the lang") } + + // XXX: I'm certain I've probably got a deadlock or race somewhere here + // or in lib/main.go so we'll fix it with an API fixup and rewrite soon + obj.wgRun = &sync.WaitGroup{} + obj.ctx, obj.cancel = context.WithCancel(context.Background()) + obj.wgRun.Add(1) + go func() { + defer obj.wgRun.Done() + obj.reterr = obj.lang.Run(obj.ctx) + }() + return nil } // LangClose is a wrapper around the lang Close method. func (obj *GAPI) LangClose() error { if obj.lang != nil { - err := obj.lang.Close() + obj.cancel() + obj.wgRun.Wait() + err := obj.lang.Cleanup() + err = errwrap.Append(err, obj.reterr) // from obj.lang.Run obj.lang = nil // clear it to avoid double closing return errwrap.Wrapf(err, "can't close the lang") // nil passthrough } @@ -520,7 +539,7 @@ func (obj *GAPI) Next() chan gapi.Next { startChan := make(chan struct{}) // start signal close(startChan) // kick it off! - streamChan := make(chan error) + streamChan := make(<-chan error) //defer obj.LangClose() // close any old lang var ok bool @@ -545,6 +564,7 @@ func (obj *GAPI) Next() chan gapi.Next { obj.data.Logf("generating new graph...") // skip this to pass through the err if present + // XXX: redo this old garbage code if langSwap && err == nil { obj.data.Logf("swap!") // run up to these three but fail on err diff --git a/lang/interpret_test.go b/lang/interpret_test.go index 2cdc3c3f..26189bda 100644 --- a/lang/interpret_test.go +++ b/lang/interpret_test.go @@ -21,21 +21,21 @@ package lang import ( "bytes" + "context" "fmt" "io/ioutil" "os" "path/filepath" "sort" "strings" + "sync" "testing" "time" - "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine/graph/autoedge" - "github.com/purpleidea/mgmt/engine/resources" "github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/lang/ast" - "github.com/purpleidea/mgmt/lang/funcs" + "github.com/purpleidea/mgmt/lang/funcs/dage" "github.com/purpleidea/mgmt/lang/funcs/vars" "github.com/purpleidea/mgmt/lang/inputs" "github.com/purpleidea/mgmt/lang/interfaces" @@ -55,503 +55,6 @@ const ( runGraphviz = false // run graphviz in tests? ) -func vertexAstCmpFn(v1, v2 pgraph.Vertex) (bool, error) { - //fmt.Printf("V1: %T %+v\n", v1, v1) - //node := v1.(*funcs.Node) - //fmt.Printf("node: %T %+v\n", node, node) - //fmt.Printf("V2: %T %+v\n", v2, v2) - if v1.String() == "" || v2.String() == "" { - return false, fmt.Errorf("oops, empty vertex") - } - return v1.String() == v2.String(), nil -} - -func edgeAstCmpFn(e1, e2 pgraph.Edge) (bool, error) { - if e1.String() == "" || e2.String() == "" { - return false, fmt.Errorf("oops, empty edge") - } - return e1.String() == e2.String(), nil -} - -type vtex string - -func (obj *vtex) String() string { - return string(*obj) -} - -type edge string - -func (obj *edge) String() string { - return string(*obj) -} - -func TestAstFunc0(t *testing.T) { - scope := &interfaces.Scope{ // global scope - Variables: map[string]interfaces.Expr{ - "hello": &ast.ExprStr{V: "world"}, - "answer": &ast.ExprInt{V: 42}, - }, - // all the built-in top-level, core functions enter here... - Functions: ast.FuncPrefixToFunctionsScope(""), // runs funcs.LookupPrefix - } - - type test struct { // an individual test - name string - code string - fail bool - scope *interfaces.Scope - graph *pgraph.Graph - } - testCases := []test{} - - { - graph, _ := pgraph.NewGraph("g") - testCases = append(testCases, test{ // 0 - "nil", - ``, - false, - nil, - graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - testCases = append(testCases, test{ - name: "scope only", - code: ``, - fail: false, - scope: scope, // use the scope defined above - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - // empty graph at the moment, because they're all unused! - //v1, v2 := vtex("int(42)"), vtex("var(x)") - //e1 := edge("var:x") - //graph.AddVertex(&v1, &v2) - //graph.AddEdge(&v1, &v2, &e1) - testCases = append(testCases, test{ - name: "two vars", - code: ` - $x = 42 - $y = $x - `, - // TODO: this should fail with an unused variable error! - fail: false, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - testCases = append(testCases, test{ - name: "self-referential vars", - code: ` - $x = $y - $y = $x - `, - fail: true, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - v1, v2, v3, v4, v5 := vtex("int(42)"), vtex("var(a)"), vtex("var(b)"), vtex("var(c)"), vtex(`str("t")`) - e1, e2, e3 := edge("var:a"), edge("var:b"), edge("var:c") - graph.AddVertex(&v1, &v2, &v3, &v4, &v5) - graph.AddEdge(&v1, &v2, &e1) - graph.AddEdge(&v2, &v3, &e2) - graph.AddEdge(&v3, &v4, &e3) - testCases = append(testCases, test{ - name: "chained vars", - code: ` - test "t" { - int64ptr => $c, - } - $c = $b - $b = $a - $a = 42 - `, - fail: false, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - v1, v2 := vtex("bool(true)"), vtex("var(b)") - graph.AddVertex(&v1, &v2) - e1 := edge("var:b") - graph.AddEdge(&v1, &v2, &e1) - testCases = append(testCases, test{ - name: "simple bool", - code: ` - if $b { - } - $b = true - `, - fail: false, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - v1, v2, v3, v4, v5 := vtex(`str("t")`), vtex(`str("+")`), vtex("int(42)"), vtex("int(13)"), vtex(fmt.Sprintf(`call:%s(str("+"), int(42), int(13))`, funcs.OperatorFuncName)) - graph.AddVertex(&v1, &v2, &v3, &v4, &v5) - e1, e2, e3 := edge("op"), edge("a"), edge("b") - graph.AddEdge(&v2, &v5, &e1) - graph.AddEdge(&v3, &v5, &e2) - graph.AddEdge(&v4, &v5, &e3) - testCases = append(testCases, test{ - name: "simple operator", - code: ` - test "t" { - int64ptr => 42 + 13, - } - `, - fail: false, - scope: scope, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - v1, v2, v3 := vtex(`str("t")`), vtex(`str("-")`), vtex(`str("+")`) - v4, v5, v6 := vtex("int(42)"), vtex("int(13)"), vtex("int(99)") - v7 := vtex(fmt.Sprintf(`call:%s(str("+"), int(42), int(13))`, funcs.OperatorFuncName)) - v8 := vtex(fmt.Sprintf(`call:%s(str("-"), call:%s(str("+"), int(42), int(13)), int(99))`, funcs.OperatorFuncName, funcs.OperatorFuncName)) - - graph.AddVertex(&v1, &v2, &v3, &v4, &v5, &v6, &v7, &v8) - e1, e2, e3 := edge("op"), edge("a"), edge("b") - graph.AddEdge(&v3, &v7, &e1) - graph.AddEdge(&v4, &v7, &e2) - graph.AddEdge(&v5, &v7, &e3) - - e4, e5, e6 := edge("op"), edge("a"), edge("b") - graph.AddEdge(&v2, &v8, &e4) - graph.AddEdge(&v7, &v8, &e5) - graph.AddEdge(&v6, &v8, &e6) - testCases = append(testCases, test{ - name: "simple operators", - code: ` - test "t" { - int64ptr => 42 + 13 - 99, - } - `, - fail: false, - scope: scope, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - v1, v2 := vtex("bool(true)"), vtex(`str("t")`) - v3, v4 := vtex("int(13)"), vtex("int(42)") - v5, v6 := vtex("var(i)"), vtex("var(x)") - v7, v8 := vtex(`str("+")`), vtex(fmt.Sprintf(`call:%s(str("+"), int(42), var(i))`, funcs.OperatorFuncName)) - - e1, e2, e3, e4, e5 := edge("op"), edge("a"), edge("b"), edge("var:i"), edge("var:x") - graph.AddVertex(&v1, &v2, &v3, &v4, &v5, &v6, &v7, &v8) - graph.AddEdge(&v3, &v5, &e4) - - graph.AddEdge(&v7, &v8, &e1) - graph.AddEdge(&v4, &v8, &e2) - graph.AddEdge(&v5, &v8, &e3) - - graph.AddEdge(&v8, &v6, &e5) - testCases = append(testCases, test{ - name: "nested resource and scoped var", - code: ` - if true { - test "t" { - int64ptr => $x, - } - $x = 42 + $i - } - $i = 13 - `, - fail: false, - scope: scope, - graph: graph, - }) - } - { - testCases = append(testCases, test{ - name: "out of scope error", - code: ` - # should be out of scope, and a compile error! - if $b { - } - if true { - $b = true - } - `, - fail: true, - }) - } - { - testCases = append(testCases, test{ - name: "variable re-declaration error", - code: ` - # this should fail b/c of variable re-declaration - $x = "hello" - $x = "world" # woops - `, - fail: true, - }) - } - { - graph, _ := pgraph.NewGraph("g") - v1, v2, v3 := vtex(`str("hello")`), vtex(`str("world")`), vtex("bool(true)") - v4, v5 := vtex("var(x)"), vtex(`str("t")`) - - graph.AddVertex(&v1, &v3, &v4, &v5) - _ = v2 // v2 is not used because it's shadowed! - e1 := edge("var:x") - // only one edge! (cool) - graph.AddEdge(&v1, &v4, &e1) - - testCases = append(testCases, test{ - name: "variable shadowing", - code: ` - # this should be okay, because var is shadowed - $x = "hello" - if true { - $x = "world" # shadowed - } - test "t" { - stringptr => $x, - } - `, - fail: false, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - v1, v2, v3 := vtex(`str("hello")`), vtex(`str("world")`), vtex("bool(true)") - v4, v5 := vtex("var(x)"), vtex(`str("t")`) - - graph.AddVertex(&v2, &v3, &v4, &v5) - _ = v1 // v1 is not used because it's shadowed! - e1 := edge("var:x") - // only one edge! (cool) - graph.AddEdge(&v2, &v4, &e1) - - testCases = append(testCases, test{ - name: "variable shadowing inner", - code: ` - # this should be okay, because var is shadowed - $x = "hello" - if true { - $x = "world" # shadowed - test "t" { - stringptr => $x, - } - } - `, - fail: false, - graph: graph, - }) - } - // // FIXME: blocked by: https://github.com/purpleidea/mgmt/issues/199 - //{ - // graph, _ := pgraph.NewGraph("g") - // v0 := vtex("bool(true)") - // v1, v2 := vtex(`str("hello")`), vtex(`str("world")`) - // v3, v4 := vtex("var(x)"), vtex("var(x)") // different vertices! - // v5, v6 := vtex(`str("t1")`), vtex(`str("t2")`) - // - // graph.AddVertex(&v0, &v1, &v2, &v3, &v4, &v5, &v6) - // e1, e2 := edge("var:x"), edge("var:x") - // graph.AddEdge(&v1, &v3, &e1) - // graph.AddEdge(&v2, &v4, &e2) - // - // testCases = append(testCases, test{ - // name: "variable shadowing both", - // code: ` - // # this should be okay, because var is shadowed - // $x = "hello" - // if true { - // $x = "world" # shadowed - // test "t2" { - // stringptr => $x, - // } - // } - // test "t1" { - // stringptr => $x, - // } - // `, - // fail: false, - // graph: graph, - // }) - //} - // // FIXME: blocked by: https://github.com/purpleidea/mgmt/issues/199 - //{ - // graph, _ := pgraph.NewGraph("g") - // v1, v2 := vtex(`str("cowsay")`), vtex(`str("cowsay")`) - // v3, v4 := vtex(`str("installed)`), vtex(`str("newest")`) - // - // graph.AddVertex(&v1, &v2, &v3, &v4) - // - // testCases = append(testCases, test{ - // name: "duplicate resource", - // code: ` - // # these two are allowed because they are compatible - // pkg "cowsay" { - // state => "installed", - // } - // pkg "cowsay" { - // state => "newest", - // } - // `, - // fail: false, - // graph: graph, - // }) - //} - { - testCases = append(testCases, test{ - name: "variable re-declaration and type change error", - code: ` - # this should fail b/c of variable re-declaration - $x = "wow" - $x = 99 # woops, but also a change of type :P - `, - fail: true, - }) - } - - names := []string{} - for index, tc := range testCases { // run all the tests - if tc.name == "" { - t.Errorf("test #%d: not named", index) - continue - } - if util.StrInList(tc.name, names) { - t.Errorf("test #%d: duplicate sub test name of: %s", index, tc.name) - continue - } - names = append(names, tc.name) - - //if index != 3 { // hack to run a subset (useful for debugging) - //if tc.name != "simple operators" { - // continue - //} - - t.Run(fmt.Sprintf("test #%d (%s)", index, tc.name), func(t *testing.T) { - name, code, fail, scope, exp := tc.name, tc.code, tc.fail, tc.scope, tc.graph - - t.Logf("\n\ntest #%d (%s) ----------------\n\n", index, name) - str := strings.NewReader(code) - xast, err := parser.LexParse(str) - if err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: lex/parse failed with: %+v", index, err) - return - } - t.Logf("test #%d: AST: %+v", index, xast) - - data := &interfaces.Data{ - // TODO: add missing fields here if/when needed - StrInterpolater: interpolate.StrInterpolate, - - Debug: testing.Verbose(), // set via the -test.v flag to `go test` - Logf: func(format string, v ...interface{}) { - t.Logf("ast: "+format, v...) - }, - } - // some of this might happen *after* interpolate in SetScope or Unify... - if err := xast.Init(data); err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: could not init and validate AST: %+v", index, err) - return - } - - iast, err := xast.Interpolate() - if err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: interpolate failed with: %+v", index, err) - return - } - - // propagate the scope down through the AST... - err = iast.SetScope(scope) - if !fail && err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: could not set scope: %+v", index, err) - return - } - if fail && err != nil { - return // fail happened during set scope, don't run unification! - } - - // apply type unification - logf := func(format string, v ...interface{}) { - t.Logf(fmt.Sprintf("test #%d", index)+": unification: "+format, v...) - } - unifier := &unification.Unifier{ - AST: iast, - Solver: unification.SimpleInvariantSolverLogger(logf), - Debug: testing.Verbose(), - Logf: logf, - } - err = unifier.Unify() - if !fail && err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: could not unify types: %+v", index, err) - return - } - // maybe it will fail during graph below instead? - //if fail && err == nil { - // t.Errorf("test #%d: FAIL", index) - // t.Errorf("test #%d: unification passed, expected fail", index) - // continue - //} - if fail && err != nil { - return // fail happened during unification, don't run Graph! - } - - // build the function graph - graph, err := iast.Graph() - - if !fail && err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: functions failed with: %+v", index, err) - return - } - if fail && err == nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: functions passed, expected fail", index) - return - } - - if fail { // can't process graph if it's nil - // TODO: match against expected error - t.Logf("test #%d: error: %+v", index, err) - return - } - - t.Logf("test #%d: graph: %+v", index, graph) - // TODO: improve: https://github.com/purpleidea/mgmt/issues/199 - if err := graph.GraphCmp(exp, vertexAstCmpFn, edgeAstCmpFn); err != nil { - t.Errorf("test #%d: FAIL\n\n", index) - t.Logf("test #%d: actual (g1): %v%s\n\n", index, graph, fullPrint(graph)) - t.Logf("test #%d: expected (g2): %v%s\n\n", index, exp, fullPrint(exp)) - t.Errorf("test #%d: cmp error:\n%v", index, err) - return - } - - for i, v := range graph.Vertices() { - t.Logf("test #%d: vertex(%d): %+v", index, i, v) - } - for v1 := range graph.Adjacency() { - for v2, e := range graph.Adjacency()[v1] { - t.Logf("test #%d: edge(%+v): %+v -> %+v", index, e, v1, v2) - } - } - }) - } -} - // TestAstFunc1 is a more advanced version which pulls code from physical dirs. func TestAstFunc1(t *testing.T) { const magicError = "# err: " @@ -918,7 +421,6 @@ func TestAstFunc1(t *testing.T) { // build the function graph graph, err := iast.Graph() - if (!fail || !failGraph) && err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: functions failed with: %+v", index, err) @@ -1384,6 +886,31 @@ func TestAstFunc2(t *testing.T) { return } + if runGraphviz { + t.Logf("test #%d: Running graphviz after setScope...", index) + + // build a graph of the AST, to make sure everything is connected properly + graph, err := pgraph.NewGraph("setScope") + if err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: could not create setScope graph: %+v", index, err) + return + } + ast, ok := iast.(interfaces.ScopeGrapher) + if !ok { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: can't graph scope", index) + return + } + ast.ScopeGraph(graph) + + if err := graph.ExecGraphviz("/tmp/set-scope.dot"); err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: writing graph failed: %+v", index, err) + return + } + } + // apply type unification xlogf := func(format string, v ...interface{}) { logf("unification: "+format, v...) @@ -1418,7 +945,6 @@ func TestAstFunc2(t *testing.T) { // build the function graph graph, err := iast.Graph() - if (!fail || !failGraph) && err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: functions failed with: %+v", index, err) @@ -1441,9 +967,9 @@ func TestAstFunc2(t *testing.T) { } if graph.NumVertices() == 0 { // no funcs to load! - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: function graph is empty", index) - return + //t.Errorf("test #%d: FAIL", index) + t.Logf("test #%d: function graph is empty", index) + //return // let's test the engine on empty } t.Logf("test #%d: graph: %s", index, graph) @@ -1465,81 +991,115 @@ func TestAstFunc2(t *testing.T) { } } - // XXX: temporary compatibility mapping for now... - // XXX: this could be a helper function eventually... - //// map the graph from interfaces.Expr to interfaces.Func - //mapExprFunc := make(map[interfaces.Expr]interfaces.Func) - //for v1, x := range graph.Adjacency() { - // v1, ok := v1.(interfaces.Expr) - // if !ok { - // panic("programming error") - // } - // if _, exists := mapExprFunc[v1]; !exists { - // var err error - // mapExprFunc[v1], err = v1.Func() - // if err != nil { - // panic("programming error") - // } - // } - // //funcs.AddVertex(v1) - // for v2 := range x { - // v2, ok := v2.(interfaces.Expr) - // if !ok { - // panic("programming error") - // } - // if _, exists := mapExprFunc[v2]; !exists { - // var err error - // mapExprFunc[v2], err = v2.Func() - // if err != nil { - // panic("programming error") - // } - // - // } - // //funcs.AddEdge(v1, v2, edge) - // } - //} - // run the function engine once to get some real output - funcs := &funcs.Engine{ - Graph: graph, // not the same as the output graph! + funcs := &dage.Engine{ + Name: "test", Hostname: "", // NOTE: empty b/c not used World: world, // used partially in some tests Debug: testing.Verbose(), // set via the -test.v flag to `go test` Logf: func(format string, v ...interface{}) { logf("funcs: "+format, v...) }, - Glitch: false, // FIXME: verify this functionality is perfect! } logf("function engine initializing...") - if err := funcs.Init(); err != nil { + if err := funcs.Setup(); err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: init error with func engine: %+v", index, err) return } + defer funcs.Cleanup() - logf("function engine validating...") - if err := funcs.Validate(); err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: validate error with func engine: %+v", index, err) - return - } + // XXX: can we type check things somehow? + //logf("function engine validating...") + //if err := funcs.Validate(); err != nil { + // t.Errorf("test #%d: FAIL", index) + // t.Errorf("test #%d: validate error with func engine: %+v", index, err) + // return + //} logf("function engine starting...") - // On failure, we expect the caller to run Close() to shutdown all of - // the currently initialized (and running) funcs... This is needed if - // we successfully ran `Run` but isn't needed only for Init/Validate. - if err := funcs.Run(); err != nil { + wg := &sync.WaitGroup{} + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg.Add(1) + go func() { + defer wg.Done() + if err := funcs.Run(ctx); err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: run error with func engine: %+v", index, err) + return + } + }() + + //wg.Add(1) + //go func() { // XXX: debugging + // defer wg.Done() + // for { + // select { + // case <-time.After(100 * time.Millisecond): // blocked functions + // t.Logf("test #%d: graphviz...", index) + // funcs.Graphviz("") // log to /tmp/... + // + // case <-ctx.Done(): + // return + // } + // } + //}() + + <-funcs.Started() // wait for startup (will not block forever) + + // Sanity checks for graph size. + if count := funcs.NumVertices(); count != 0 { t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: run error with func engine: %+v", index, err) + t.Errorf("test #%d: expected empty graph on start, got %d vertices", index, count) + } + defer func() { + if count := funcs.NumVertices(); count != 0 { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: expected empty graph on exit, got %d vertices", index, count) + } + }() + defer wg.Wait() + defer cancel() + + txn := funcs.Txn() + defer txn.Free() // remember to call Free() + txn.AddGraph(graph) + if err := txn.Commit(); err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: run error with initial commit: %+v", index, err) return } - // TODO: cleanup before we print any test failures... - defer funcs.Close() // cleanup + defer txn.Reverse() // should remove everything we added + + isEmpty := make(chan struct{}) + if graph.NumVertices() == 0 { // no funcs to load! + close(isEmpty) + } // wait for some activity logf("stream...") stream := funcs.Stream() + //select { + //case err, ok := <-stream: + // if !ok { + // t.Errorf("test #%d: FAIL", index) + // t.Errorf("test #%d: stream closed", index) + // return + // } + // if err != nil { + // t.Errorf("test #%d: FAIL", index) + // t.Errorf("test #%d: stream errored: %+v", index, err) + // return + // } + // + //case <-time.After(60 * time.Second): // blocked functions + // t.Errorf("test #%d: FAIL", index) + // t.Errorf("test #%d: stream timeout", index) + // return + //} // sometimes the <-stream seems to constantly (or for a // long time?) win the races against the <-time.After(), @@ -1559,13 +1119,17 @@ func TestAstFunc2(t *testing.T) { t.Errorf("test #%d: stream errored: %+v", index, err) return } - t.Logf("test #%d: stream...", index) + t.Logf("test #%d: got stream event!", index) max-- if max == 0 { break Loop } - case <-time.After(3 * time.Second): // blocked functions + case <-isEmpty: + break Loop + + case <-time.After(10 * time.Second): // blocked functions + t.Errorf("test #%d: unblocking because no event was sent by the function engine for a while", index) break Loop case <-time.After(60 * time.Second): // blocked functions @@ -1575,51 +1139,12 @@ func TestAstFunc2(t *testing.T) { } } + t.Logf("test #%d: %s", index, funcs.Stats()) + // run interpret! - table := funcs.Table() // map[pgraph.Vertex]types.Value - fn := func(n interfaces.Node) error { - expr, ok := n.(interfaces.Expr) - if !ok { - return nil - } - //f, exists := mapExprFunc[expr] - //if !exists { - // panic("programming error in mapExprFunc lookup") - //} - //val, exists := table[f] - //if !exists { - // fmt.Printf("XXX missing value in table is pointer: %p\n", f) - // return fmt.Errorf("missing value in table for: %s", f) - //} + table := funcs.Table() // map[interfaces.Func]types.Value - v, ok := expr.(pgraph.Vertex) - if !ok { - panic("programming error in interfaces.Expr -> pgraph.Vertex lookup") - } - val, exists := table[v] - if !exists { - // XXX: we have values in the AST which aren't need... - // XXX: confirmed with: time go test -race github.com/purpleidea/mgmt/lang/ -v -run TestAstFunc2/test_#42 (func-math1) for example. - fmt.Printf("XXX: missing value in table is pointer: %p\n", v) - return nil // XXX: workaround for now... - //return fmt.Errorf("missing value in table for: %s", v) - } - return expr.SetValue(val) // set the value - } - funcs.Lock() // XXX: apparently there are races between SetValue and reading obj.V values... - if err := iast.Apply(fn); err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: apply failed with: %+v", index, err) - t.Errorf("test #%d: table:", index) - for k, v := range table { - t.Errorf("test #%d: table: key: %+v ; value: %+v", index, k, v) - } - funcs.Unlock() - return - } - funcs.Unlock() - - ograph, err := interpret.Interpret(iast) + ograph, err := interpret.Interpret(iast, table) if (!fail || !failInterpret) && err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: interpret failed with: %+v", index, err) @@ -1709,246 +1234,3 @@ func TestAstFunc2(t *testing.T) { t.Skip("skipping all tests...") } } - -// TestAstInterpret0 should only be run in limited circumstances. Read the code -// comments below to see how it is run. -func TestAstInterpret0(t *testing.T) { - type test struct { // an individual test - name string - code string - fail bool - graph *pgraph.Graph - } - testCases := []test{} - - { - graph, _ := pgraph.NewGraph("g") - testCases = append(testCases, test{ // 0 - "nil", - ``, - false, - graph, - }) - } - { - testCases = append(testCases, test{ - name: "wrong res field type", - code: ` - test "t1" { - stringptr => 42, # int, not str - } - `, - fail: true, - }) - } - { - graph, _ := pgraph.NewGraph("g") - t1, _ := engine.NewNamedResource("test", "t1") - x := t1.(*resources.TestRes) - int64ptr := int64(42) - x.Int64Ptr = &int64ptr - str := "okay cool" - x.StringPtr = &str - int8ptr := int8(127) - int8ptrptr := &int8ptr - int8ptrptrptr := &int8ptrptr - x.Int8PtrPtrPtr = &int8ptrptrptr - graph.AddVertex(t1) - testCases = append(testCases, test{ - name: "resource with three pointer fields", - code: ` - test "t1" { - int64ptr => 42, - stringptr => "okay cool", - int8ptrptrptr => 127, # super nested - } - `, - fail: false, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - t1, _ := engine.NewNamedResource("test", "t1") - x := t1.(*resources.TestRes) - stringptr := "wow" - x.StringPtr = &stringptr - graph.AddVertex(t1) - testCases = append(testCases, test{ - name: "resource with simple string pointer field", - code: ` - test "t1" { - stringptr => "wow", - } - `, - graph: graph, - }) - } - { - // FIXME: add a better vertexCmpFn so we can compare send/recv! - graph, _ := pgraph.NewGraph("g") - t1, _ := engine.NewNamedResource("test", "t1") - { - x := t1.(*resources.TestRes) - int64Ptr := int64(42) - x.Int64Ptr = &int64Ptr - graph.AddVertex(t1) - } - t2, _ := engine.NewNamedResource("test", "t2") - { - x := t2.(*resources.TestRes) - int64Ptr := int64(13) - x.Int64Ptr = &int64Ptr - graph.AddVertex(t2) - } - edge := &engine.Edge{ - Name: fmt.Sprintf("%s -> %s", t1, t2), - Notify: false, - } - graph.AddEdge(t1, t2, edge) - testCases = append(testCases, test{ - name: "two resources and send/recv edge", - code: ` - test "t1" { - int64ptr => 42, - } - test "t2" { - int64ptr => 13, - } - - Test["t1"].hello -> Test["t2"].stringptr # send/recv - `, - graph: graph, - }) - } - { - graph, _ := pgraph.NewGraph("g") - t1, _ := engine.NewNamedResource("test", "t1") - x := t1.(*resources.TestRes) - stringptr := "this is meta" - x.StringPtr = &stringptr - m := &engine.MetaParams{ - Noop: true, // overwritten - Retry: -1, - Delay: 0, - Poll: 5, - Limit: 4.2, - Burst: 3, - Sema: []string{"foo:1", "bar:3"}, - Rewatch: false, - Realize: true, - } - x.SetMetaParams(m) - graph.AddVertex(t1) - testCases = append(testCases, test{ - name: "resource with meta params", - code: ` - test "t1" { - stringptr => "this is meta", - - Meta => struct{ - noop => false, - retry => -1, - delay => 0, - poll => 5, - limit => 4.2, - burst => 3, - sema => ["foo:1", "bar:3",], - rewatch => false, - realize => true, - reverse => true, - autoedge => true, - autogroup => true, - }, - Meta:noop => true, - Meta:reverse => true, - Meta:autoedge => true, - Meta:autogroup => true, - } - `, - graph: graph, - }) - } - - names := []string{} - for index, tc := range testCases { // run all the tests - name, code, fail, exp := tc.name, tc.code, tc.fail, tc.graph - - if name == "" { - name = "" - } - if util.StrInList(name, names) { - t.Errorf("test #%d: duplicate sub test name of: %s", index, name) - continue - } - names = append(names, name) - - //if index != 3 { // hack to run a subset (useful for debugging) - //if tc.name != "nil" { - // continue - //} - - t.Logf("\n\ntest #%d (%s) ----------------\n\n", index, name) - - str := strings.NewReader(code) - xast, err := parser.LexParse(str) - if err != nil { - t.Errorf("test #%d: lex/parse failed with: %+v", index, err) - continue - } - t.Logf("test #%d: AST: %+v", index, xast) - - data := &interfaces.Data{ - // TODO: add missing fields here if/when needed - Debug: testing.Verbose(), // set via the -test.v flag to `go test` - Logf: func(format string, v ...interface{}) { - t.Logf("ast: "+format, v...) - }, - } - // some of this might happen *after* interpolate in SetScope or Unify... - if err := xast.Init(data); err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: could not init and validate AST: %+v", index, err) - return - } - - // these tests only work in certain cases, since this does not - // perform type unification, run the function graph engine, and - // only gives you limited results... don't expect normal code to - // run and produce meaningful things in this test... - graph, err := interpret.Interpret(xast) - - if !fail && err != nil { - t.Errorf("test #%d: interpret failed with: %+v", index, err) - continue - } - if fail && err == nil { - t.Errorf("test #%d: interpret passed, expected fail", index) - continue - } - - if fail { // can't process graph if it's nil - // TODO: match against expected error - t.Logf("test #%d: expected fail, error: %+v", index, err) - continue - } - - t.Logf("test #%d: graph: %+v", index, graph) - // TODO: improve: https://github.com/purpleidea/mgmt/issues/199 - if err := graph.GraphCmp(exp, vertexCmpFn, edgeCmpFn); err != nil { - t.Logf("test #%d: actual (g1): %v%s", index, graph, fullPrint(graph)) - t.Logf("test #%d: expected (g2): %v%s", index, exp, fullPrint(exp)) - t.Errorf("test #%d: cmp error:\n%v", index, err) - continue - } - - for i, v := range graph.Vertices() { - t.Logf("test #%d: vertex(%d): %+v", index, i, v) - } - for v1 := range graph.Adjacency() { - for v2, e := range graph.Adjacency()[v1] { - t.Logf("test #%d: edge(%+v): %+v -> %+v", index, e, v1, v2) - } - } - } -} diff --git a/lang/lang.go b/lang/lang.go index e8d4a9b9..c9468c11 100644 --- a/lang/lang.go +++ b/lang/lang.go @@ -21,13 +21,14 @@ package lang import ( "bytes" + "context" "fmt" "sync" "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/lang/ast" - "github.com/purpleidea/mgmt/lang/funcs" _ "github.com/purpleidea/mgmt/lang/funcs/core" // import so the funcs register + "github.com/purpleidea/mgmt/lang/funcs/dage" "github.com/purpleidea/mgmt/lang/funcs/vars" "github.com/purpleidea/mgmt/lang/inputs" "github.com/purpleidea/mgmt/lang/interfaces" @@ -62,30 +63,20 @@ type Lang struct { Logf func(format string, v ...interface{}) ast interfaces.Stmt // store main prog AST here - funcs *funcs.Engine // function event engine + funcs *dage.Engine // function event engine + graph *pgraph.Graph // function graph - loadedChan chan struct{} // loaded signal - - streamChan chan error // signals a new graph can be created or problem + streamChan <-chan error // signals a new graph can be created or problem //streamBurst bool // should we try and be bursty with the stream events? - closeChan chan struct{} // close signal - wg *sync.WaitGroup + wg *sync.WaitGroup } -// Init initializes the lang struct, and starts up the initial data sources. +// Init initializes the lang struct, and starts up the initial input parsing. // NOTE: The trick is that we need to get the list of funcs to watch AND start // watching them, *before* we pull their values, that way we'll know if they // changed from the values we wanted. func (obj *Lang) Init() error { - obj.loadedChan = make(chan struct{}) - obj.streamChan = make(chan error) - obj.closeChan = make(chan struct{}) - obj.wg = &sync.WaitGroup{} - - once := &sync.Once{} - loadedSignal := func() { close(obj.loadedChan) } // only run once! - if obj.Debug { obj.Logf("input: %s", obj.Input) tree, err := util.FsTree(obj.Fs, "/") // should look like gapi @@ -215,114 +206,120 @@ func (obj *Lang) Init() error { obj.Logf("building function graph...") // we assume that for some given code, the list of funcs doesn't change // iow, we don't support variable, variables or absurd things like that - graph, err := obj.ast.Graph() // build the graph of functions + obj.graph = &pgraph.Graph{Name: "functionGraph"} + env := make(map[string]interfaces.Func) + for k, v := range scope.Variables { + g, builtinFunc, err := v.Graph(nil) + if err != nil { + return errwrap.Wrapf(err, "calling Graph on builtins") + } + obj.graph.AddGraph(g) + env[k] = builtinFunc + } + g, err := obj.ast.Graph() // build the graph of functions if err != nil { return errwrap.Wrapf(err, "could not generate function graph") } + obj.graph.AddGraph(g) if obj.Debug { - obj.Logf("function graph: %+v", graph) - graph.Logf(obj.Logf) // log graph output with this logger... + obj.Logf("function graph: %+v", obj.graph) + obj.graph.Logf(obj.Logf) // log graph output with this logger... + //if err := obj.graph.ExecGraphviz("/tmp/graphviz.dot"); err != nil { + // return errwrap.Wrapf(err, "writing graph failed") + //} } - if graph.NumVertices() == 0 { // no funcs to load! - // send only one signal since we won't ever send after this! - obj.Logf("static graph found") - obj.wg.Add(1) - go func() { - defer obj.wg.Done() - defer close(obj.streamChan) // no more events are coming! - close(obj.loadedChan) // signal - select { - case obj.streamChan <- nil: // send one signal - // pass - case <-obj.closeChan: - return - } - }() - return nil // exit early, no funcs to load! - } - - obj.funcs = &funcs.Engine{ - Graph: graph, // not the same as the output graph! + obj.funcs = &dage.Engine{ + Name: "lang", // TODO: arbitrary name for now Hostname: obj.Hostname, World: obj.World, Debug: obj.Debug, Logf: func(format string, v ...interface{}) { obj.Logf("funcs: "+format, v...) }, - Glitch: false, // FIXME: verify this functionality is perfect! } obj.Logf("function engine initializing...") - if err := obj.funcs.Init(); err != nil { + if err := obj.funcs.Setup(); err != nil { return errwrap.Wrapf(err, "init error with func engine") } - obj.Logf("function engine validating...") - if err := obj.funcs.Validate(); err != nil { - return errwrap.Wrapf(err, "validate error with func engine") - } + obj.streamChan = obj.funcs.Stream() // after obj.funcs.Setup runs + + return nil +} + +// Run kicks off the function engine. Use the context to shut it down. +func (obj *Lang) Run(ctx context.Context) (reterr error) { + wg := &sync.WaitGroup{} + defer wg.Wait() + + runCtx, cancel := context.WithCancel(context.Background()) // Don't inherit from parent + defer cancel() + + //obj.Logf("function engine validating...") + //if err := obj.funcs.Validate(); err != nil { + // return errwrap.Wrapf(err, "validate error with func engine") + //} obj.Logf("function engine starting...") - // On failure, we expect the caller to run Close() to shutdown all of - // the currently initialized (and running) funcs... This is needed if - // we successfully ran `Run` but isn't needed only for Init/Validate. - if err := obj.funcs.Run(); err != nil { - return errwrap.Wrapf(err, "run error with func engine") + wg.Add(1) + go func() { + defer wg.Done() + if err := obj.funcs.Run(runCtx); err == nil { + reterr = errwrap.Append(reterr, err) + } + // Run() should only error if not a dag I think... + }() + + <-obj.funcs.Started() // wait for startup (will not block forever) + + // Sanity checks for graph size. + if count := obj.funcs.NumVertices(); count != 0 { + return fmt.Errorf("expected empty graph on start, got %d vertices", count) } + defer func() { + if count := obj.funcs.NumVertices(); count != 0 { + err := fmt.Errorf("expected empty graph on exit, got %d vertices", count) + reterr = errwrap.Append(reterr, err) + } + }() + defer wg.Wait() + defer cancel() // now cancel Run only after Reverse and Free are done! + + txn := obj.funcs.Txn() + defer txn.Free() // remember to call Free() + txn.AddGraph(obj.graph) + if err := txn.Commit(); err != nil { + return errwrap.Wrapf(err, "error adding to function graph engine") + } + defer func() { + if err := txn.Reverse(); err != nil { // should remove everything we added + reterr = errwrap.Append(reterr, err) + } + }() // wait for some activity obj.Logf("stream...") - stream := obj.funcs.Stream() - obj.wg.Add(1) - go func() { - obj.Logf("loop...") - defer obj.wg.Done() - defer close(obj.streamChan) // no more events are coming! - for { - var err error - var ok bool - select { - case err, ok = <-stream: - if !ok { - obj.Logf("stream closed") - return - } - if err == nil { - // only do this once, on the first event - once.Do(loadedSignal) // signal - } - case <-obj.closeChan: - return - } + select { + case <-ctx.Done(): + } - select { - case obj.streamChan <- err: // send - if err != nil { - obj.Logf("stream error: %+v", err) - return - } - - case <-obj.closeChan: - return - } - } - }() return nil } // Stream returns a channel of graph change requests or errors. These are // usually sent when a func output changes. -func (obj *Lang) Stream() chan error { +func (obj *Lang) Stream() <-chan error { return obj.streamChan } // Interpret runs the interpreter and returns a graph and corresponding error. func (obj *Lang) Interpret() (*pgraph.Graph, error) { select { - case <-obj.loadedChan: // funcs are now loaded! + case <-obj.funcs.Loaded(): // funcs are now loaded! // pass default: // if this is hit, someone probably called this too early! @@ -332,37 +329,9 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) { obj.Logf("running interpret...") table := obj.funcs.Table() // map[pgraph.Vertex]types.Value - fn := func(n interfaces.Node) error { - expr, ok := n.(interfaces.Expr) - if !ok { - return nil - } - v, ok := expr.(pgraph.Vertex) - if !ok { - panic("programming error in interfaces.Expr -> pgraph.Vertex lookup") - } - val, exists := table[v] - if !exists { - fmt.Printf("XXX: missing value in table is pointer: %p\n", v) - return nil // XXX: workaround for now... - //return fmt.Errorf("missing value in table for: %s", v) - } - return expr.SetValue(val) // set the value - } - obj.funcs.Lock() // XXX: apparently there are races between SetValue and reading obj.V values... - if err := obj.ast.Apply(fn); err != nil { - if obj.Debug { - for k, v := range table { - obj.Logf("table: key: %+v ; value: %+v", k, v) - } - } - obj.funcs.Unlock() - return nil, err - } - obj.funcs.Unlock() // this call returns the graph - graph, err := interpret.Interpret(obj.ast) + graph, err := interpret.Interpret(obj.ast, table) if err != nil { return nil, errwrap.Wrapf(err, "could not interpret") } @@ -370,14 +339,7 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) { return graph, nil // return a graph } -// Close shuts down the lang struct and causes all the funcs to shutdown. It -// must be called when finished after any successful Init ran. -func (obj *Lang) Close() error { - var err error - if obj.funcs != nil { - err = obj.funcs.Close() - } - close(obj.closeChan) - obj.wg.Wait() - return err +// Cleanup cleans up and frees memory and resources after everything is done. +func (obj *Lang) Cleanup() error { + return obj.funcs.Cleanup() } diff --git a/lang/lang_test.go b/lang/lang_test.go index d2b51f14..026b5f79 100644 --- a/lang/lang_test.go +++ b/lang/lang_test.go @@ -20,7 +20,9 @@ package lang import ( + "context" "fmt" + "sync" "testing" "github.com/purpleidea/mgmt/engine" @@ -90,7 +92,7 @@ func edgeCmpFn(e1, e2 pgraph.Edge) (bool, error) { return e1.String() == e2.String(), nil } -func runInterpret(t *testing.T, code string) (*pgraph.Graph, error) { +func runInterpret(t *testing.T, code string) (_ *pgraph.Graph, reterr error) { logf := func(format string, v ...interface{}) { t.Logf("test: lang: "+format, v...) } @@ -123,31 +125,41 @@ func runInterpret(t *testing.T, code string) (*pgraph.Graph, error) { if err := lang.Init(); err != nil { return nil, errwrap.Wrapf(err, "init failed") } - closeFn := func() error { - return errwrap.Wrapf(lang.Close(), "close failed") - } + defer lang.Cleanup() + + wg := &sync.WaitGroup{} + defer wg.Wait() + + ctx, cancel := context.WithCancel(context.Background()) // TODO: get it from parent + defer cancel() + + wg.Add(1) + go func() { + defer wg.Done() + if err := lang.Run(ctx); err != nil { + reterr = errwrap.Append(reterr, err) + } + }() + defer cancel() // shutdown the Run // we only wait for the first event, instead of the continuous stream select { case err, ok := <-lang.Stream(): if !ok { - return nil, errwrap.Wrapf(closeFn(), "stream closed without event") + return nil, fmt.Errorf("stream closed without event") } if err != nil { - return nil, errwrap.Wrapf(err, "stream failed, close: %+v", closeFn()) + return nil, errwrap.Wrapf(err, "stream failed") } } // run artificially without the entire GAPI loop graph, err := lang.Interpret() if err != nil { - err := errwrap.Wrapf(err, "interpret failed") - e := closeFn() - err = errwrap.Append(err, e) // list of errors - return nil, err + return nil, errwrap.Wrapf(err, "interpret failed") } - return graph, closeFn() + return graph, nil } // TODO: empty code is not currently allowed, should we allow it? diff --git a/lang/unification_test.go b/lang/unification_test.go index f8c9bff1..f147448c 100644 --- a/lang/unification_test.go +++ b/lang/unification_test.go @@ -451,60 +451,6 @@ func TestUnification1(t *testing.T) { }, }) } - { - //$v = 42 - //$x = template("hello", $v) # redirect var for harder unification - //test "t1" { - // anotherstr => $x, - //} - innerFunc := &ast.ExprCall{ - Name: "template", - Args: []interfaces.Expr{ - &ast.ExprStr{ - V: "hello", // whatever... - }, - &ast.ExprVar{ - Name: "v", - }, - }, - } - stmt := &ast.StmtProg{ - Body: []interfaces.Stmt{ - &ast.StmtBind{ - Ident: "v", - Value: &ast.ExprInt{ - V: 42, - }, - }, - &ast.StmtBind{ - Ident: "x", - Value: innerFunc, - }, - &ast.StmtRes{ - Kind: "test", - Name: &ast.ExprStr{ - V: "t1", - }, - Contents: []ast.StmtResContents{ - &ast.StmtResField{ - Field: "anotherstr", - Value: &ast.ExprVar{ - Name: "x", - }, - }, - }, - }, - }, - } - testCases = append(testCases, test{ - name: "complex template", - ast: stmt, - fail: false, - expect: map[interfaces.Expr]*types.Type{ - innerFunc: types.NewType("str"), - }, - }) - } { // import "datetime" //test "t1" {