diff --git a/lang/funcs/engine.go b/lang/funcs/engine.go index 006d8891..1f9a6d3c 100644 --- a/lang/funcs/engine.go +++ b/lang/funcs/engine.go @@ -450,10 +450,10 @@ func (obj *Engine) Run() error { // XXX: maybe we can get rid of the table... obj.table[vertex] = value // save the latest node.mutex.Lock() - if err := node.Expr.SetValue(value); err != nil { - node.mutex.Unlock() // don't block node.String() - panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err)) - } + //if err := node.Expr.SetValue(value); err != nil { + // node.mutex.Unlock() // don't block node.String() + // panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err)) + //} node.loaded = true // set *after* value is in :) obj.Logf("func `%s` changed", node) node.mutex.Unlock() @@ -485,7 +485,7 @@ func (obj *Engine) Run() error { } } // no more output values are coming... - obj.Logf("func `%s` stopped", node) + obj.SafeLogf("func `%s` stopped", node) // nodes that never loaded will cause the engine to hang if !node.loaded { @@ -603,6 +603,22 @@ func (obj *Engine) agDone(vertex pgraph.Vertex) { } } +// Lock takes a write lock on the data that gets written to the AST, so that +// interpret/SetValue can be run without anything changing part way through. +// XXX: This API is kind of yucky, but is related to us running .String() on the +// nodes. Maybe we can avoid this somehow? +func (obj *Engine) Lock() { + obj.mutex.Lock() +} + +// Unlock takes a write lock on the data that gets written to the AST, so that +// interpret/SetValue can be run without anything changing part way through. +// XXX: This API is kind of yucky, but is related to us running .String() on the +// nodes. Maybe we can avoid this somehow? +func (obj *Engine) Unlock() { + obj.mutex.Unlock() +} + // RLock takes a read lock on the data that gets written to the AST, so that // interpret can be run without anything changing part way through. func (obj *Engine) RLock() { @@ -636,6 +652,22 @@ func (obj *Engine) Stream() chan error { return obj.streamChan } +// Table returns a copy of the populated data table of values. We return a copy +// because since these values are constantly changing, we need an atomic +// snapshot to present to the consumer of this API. +// TODO: is this globally glitch consistent? +// TODO: do we need an API to return a single value? (wrapped in read locks) +func (obj *Engine) Table() map[pgraph.Vertex]types.Value { + obj.mutex.RLock() + defer obj.mutex.RUnlock() + table := make(map[pgraph.Vertex]types.Value) + for k, v := range obj.table { + //table[k] = v.Copy() // XXX: do we need to copy these values? + table[k] = v + } + return table +} + // Close shuts down the function engine. It waits till everything has finished. func (obj *Engine) Close() error { var err error diff --git a/lang/interpret_test.go b/lang/interpret_test.go index 707fab68..c76c3233 100644 --- a/lang/interpret_test.go +++ b/lang/interpret_test.go @@ -1497,6 +1497,40 @@ func TestAstFunc2(t *testing.T) { } } + // XXX: temporary compatibility mapping for now... + // XXX: this could be a helper function eventually... + //// map the graph from interfaces.Expr to interfaces.Func + //mapExprFunc := make(map[interfaces.Expr]interfaces.Func) + //for v1, x := range graph.Adjacency() { + // v1, ok := v1.(interfaces.Expr) + // if !ok { + // panic("programming error") + // } + // if _, exists := mapExprFunc[v1]; !exists { + // var err error + // mapExprFunc[v1], err = v1.Func() + // if err != nil { + // panic("programming error") + // } + // } + // //funcs.AddVertex(v1) + // for v2 := range x { + // v2, ok := v2.(interfaces.Expr) + // if !ok { + // panic("programming error") + // } + // if _, exists := mapExprFunc[v2]; !exists { + // var err error + // mapExprFunc[v2], err = v2.Func() + // if err != nil { + // panic("programming error") + // } + // + // } + // //funcs.AddEdge(v1, v2, edge) + // } + //} + // run the function engine once to get some real output funcs := &funcs.Engine{ Graph: graph, // not the same as the output graph! @@ -1538,30 +1572,86 @@ func TestAstFunc2(t *testing.T) { // wait for some activity logf("stream...") stream := funcs.Stream() - select { - case err, ok := <-stream: - if !ok { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: stream closed", index) - return - } - if err != nil { - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: stream errored: %+v", index, err) - return - } - case <-time.After(60 * time.Second): // blocked functions - t.Errorf("test #%d: FAIL", index) - t.Errorf("test #%d: stream timeout", index) - return + // sometimes the <-stream seems to constantly (or for a + // long time?) win the races against the <-time.After(), + // so add some limit to how many times we need to stream + max := 1 + Loop: + for { + select { + case err, ok := <-stream: + if !ok { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: stream closed", index) + return + } + if err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: stream errored: %+v", index, err) + return + } + t.Logf("test #%d: stream...", index) + max-- + if max == 0 { + break Loop + } + + case <-time.After(3 * time.Second): // blocked functions + break Loop + + case <-time.After(60 * time.Second): // blocked functions + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: stream timeout", index) + return + } } // run interpret! - funcs.RLock() // in case something is actually changing - ograph, err := interpret.Interpret(iast) - funcs.RUnlock() + table := funcs.Table() // map[pgraph.Vertex]types.Value + fn := func(n interfaces.Node) error { + expr, ok := n.(interfaces.Expr) + if !ok { + return nil + } + //f, exists := mapExprFunc[expr] + //if !exists { + // panic("programming error in mapExprFunc lookup") + //} + //val, exists := table[f] + //if !exists { + // fmt.Printf("XXX missing value in table is pointer: %p\n", f) + // return fmt.Errorf("missing value in table for: %s", f) + //} + v, ok := expr.(pgraph.Vertex) + if !ok { + panic("programming error in interfaces.Expr -> pgraph.Vertex lookup") + } + val, exists := table[v] + if !exists { + // XXX: we have values in the AST which aren't need... + // XXX: confirmed with: time go test -race github.com/purpleidea/mgmt/lang/ -v -run TestAstFunc2/test_#42 (func-math1) for example. + fmt.Printf("XXX: missing value in table is pointer: %p\n", v) + return nil // XXX: workaround for now... + //return fmt.Errorf("missing value in table for: %s", v) + } + return expr.SetValue(val) // set the value + } + funcs.Lock() // XXX: apparently there are races between SetValue and reading obj.V values... + if err := iast.Apply(fn); err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: apply failed with: %+v", index, err) + t.Errorf("test #%d: table:", index) + for k, v := range table { + t.Errorf("test #%d: table: key: %+v ; value: %+v", index, k, v) + } + funcs.Unlock() + return + } + funcs.Unlock() + + ograph, err := interpret.Interpret(iast) if (!fail || !failInterpret) && err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: interpret failed with: %+v", index, err) diff --git a/lang/lang.go b/lang/lang.go index 3789edbe..9ae00a9c 100644 --- a/lang/lang.go +++ b/lang/lang.go @@ -331,14 +331,38 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) { } obj.Logf("running interpret...") - if obj.funcs != nil { // no need to rlock if we have a static graph - obj.funcs.RLock() + table := obj.funcs.Table() // map[pgraph.Vertex]types.Value + fn := func(n interfaces.Node) error { + expr, ok := n.(interfaces.Expr) + if !ok { + return nil + } + v, ok := expr.(pgraph.Vertex) + if !ok { + panic("programming error in interfaces.Expr -> pgraph.Vertex lookup") + } + val, exists := table[v] + if !exists { + fmt.Printf("XXX: missing value in table is pointer: %p\n", v) + return nil // XXX: workaround for now... + //return fmt.Errorf("missing value in table for: %s", v) + } + return expr.SetValue(val) // set the value } + obj.funcs.Lock() // XXX: apparently there are races between SetValue and reading obj.V values... + if err := obj.ast.Apply(fn); err != nil { + if obj.Debug { + for k, v := range table { + obj.Logf("table: key: %+v ; value: %+v", k, v) + } + } + obj.funcs.Unlock() + return nil, err + } + obj.funcs.Unlock() + // this call returns the graph graph, err := interpret.Interpret(obj.ast) - if obj.funcs != nil { - obj.funcs.RUnlock() - } if err != nil { return nil, errwrap.Wrapf(err, "could not interpret") }