lang: Remove SetValue from the engine
This removes the calling of SetValue from the engine, and instead replaces it with the Table() API. The downside is that this is likely slower, and the current API with locking being exposed publicly is kind of ugly. The upside is that this might make building the new engine easier. Future versions might remove locking from the API if we can avoid making any accesses to expressions. Currently this happens within Logf/SafeLogf which is our main (only?) usage at the moment. Logging could become smarter perhaps. Alternatively, we might pass in a "setter" function that gets called safely from within the engine. This could wrap SetValue and the locking functions wouldn't be part of the public API.
This commit is contained in:
@@ -450,10 +450,10 @@ func (obj *Engine) Run() error {
|
||||
// XXX: maybe we can get rid of the table...
|
||||
obj.table[vertex] = value // save the latest
|
||||
node.mutex.Lock()
|
||||
if err := node.Expr.SetValue(value); err != nil {
|
||||
node.mutex.Unlock() // don't block node.String()
|
||||
panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err))
|
||||
}
|
||||
//if err := node.Expr.SetValue(value); err != nil {
|
||||
// node.mutex.Unlock() // don't block node.String()
|
||||
// panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err))
|
||||
//}
|
||||
node.loaded = true // set *after* value is in :)
|
||||
obj.Logf("func `%s` changed", node)
|
||||
node.mutex.Unlock()
|
||||
@@ -485,7 +485,7 @@ func (obj *Engine) Run() error {
|
||||
}
|
||||
}
|
||||
// no more output values are coming...
|
||||
obj.Logf("func `%s` stopped", node)
|
||||
obj.SafeLogf("func `%s` stopped", node)
|
||||
|
||||
// nodes that never loaded will cause the engine to hang
|
||||
if !node.loaded {
|
||||
@@ -603,6 +603,22 @@ func (obj *Engine) agDone(vertex pgraph.Vertex) {
|
||||
}
|
||||
}
|
||||
|
||||
// Lock takes a write lock on the data that gets written to the AST, so that
|
||||
// interpret/SetValue can be run without anything changing part way through.
|
||||
// XXX: This API is kind of yucky, but is related to us running .String() on the
|
||||
// nodes. Maybe we can avoid this somehow?
|
||||
func (obj *Engine) Lock() {
|
||||
obj.mutex.Lock()
|
||||
}
|
||||
|
||||
// Unlock takes a write lock on the data that gets written to the AST, so that
|
||||
// interpret/SetValue can be run without anything changing part way through.
|
||||
// XXX: This API is kind of yucky, but is related to us running .String() on the
|
||||
// nodes. Maybe we can avoid this somehow?
|
||||
func (obj *Engine) Unlock() {
|
||||
obj.mutex.Unlock()
|
||||
}
|
||||
|
||||
// RLock takes a read lock on the data that gets written to the AST, so that
|
||||
// interpret can be run without anything changing part way through.
|
||||
func (obj *Engine) RLock() {
|
||||
@@ -636,6 +652,22 @@ func (obj *Engine) Stream() chan error {
|
||||
return obj.streamChan
|
||||
}
|
||||
|
||||
// Table returns a copy of the populated data table of values. We return a copy
|
||||
// because since these values are constantly changing, we need an atomic
|
||||
// snapshot to present to the consumer of this API.
|
||||
// TODO: is this globally glitch consistent?
|
||||
// TODO: do we need an API to return a single value? (wrapped in read locks)
|
||||
func (obj *Engine) Table() map[pgraph.Vertex]types.Value {
|
||||
obj.mutex.RLock()
|
||||
defer obj.mutex.RUnlock()
|
||||
table := make(map[pgraph.Vertex]types.Value)
|
||||
for k, v := range obj.table {
|
||||
//table[k] = v.Copy() // XXX: do we need to copy these values?
|
||||
table[k] = v
|
||||
}
|
||||
return table
|
||||
}
|
||||
|
||||
// Close shuts down the function engine. It waits till everything has finished.
|
||||
func (obj *Engine) Close() error {
|
||||
var err error
|
||||
|
||||
@@ -1497,6 +1497,40 @@ func TestAstFunc2(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: temporary compatibility mapping for now...
|
||||
// XXX: this could be a helper function eventually...
|
||||
//// map the graph from interfaces.Expr to interfaces.Func
|
||||
//mapExprFunc := make(map[interfaces.Expr]interfaces.Func)
|
||||
//for v1, x := range graph.Adjacency() {
|
||||
// v1, ok := v1.(interfaces.Expr)
|
||||
// if !ok {
|
||||
// panic("programming error")
|
||||
// }
|
||||
// if _, exists := mapExprFunc[v1]; !exists {
|
||||
// var err error
|
||||
// mapExprFunc[v1], err = v1.Func()
|
||||
// if err != nil {
|
||||
// panic("programming error")
|
||||
// }
|
||||
// }
|
||||
// //funcs.AddVertex(v1)
|
||||
// for v2 := range x {
|
||||
// v2, ok := v2.(interfaces.Expr)
|
||||
// if !ok {
|
||||
// panic("programming error")
|
||||
// }
|
||||
// if _, exists := mapExprFunc[v2]; !exists {
|
||||
// var err error
|
||||
// mapExprFunc[v2], err = v2.Func()
|
||||
// if err != nil {
|
||||
// panic("programming error")
|
||||
// }
|
||||
//
|
||||
// }
|
||||
// //funcs.AddEdge(v1, v2, edge)
|
||||
// }
|
||||
//}
|
||||
|
||||
// run the function engine once to get some real output
|
||||
funcs := &funcs.Engine{
|
||||
Graph: graph, // not the same as the output graph!
|
||||
@@ -1538,30 +1572,86 @@ func TestAstFunc2(t *testing.T) {
|
||||
// wait for some activity
|
||||
logf("stream...")
|
||||
stream := funcs.Stream()
|
||||
select {
|
||||
case err, ok := <-stream:
|
||||
if !ok {
|
||||
t.Errorf("test #%d: FAIL", index)
|
||||
t.Errorf("test #%d: stream closed", index)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("test #%d: FAIL", index)
|
||||
t.Errorf("test #%d: stream errored: %+v", index, err)
|
||||
return
|
||||
}
|
||||
|
||||
case <-time.After(60 * time.Second): // blocked functions
|
||||
t.Errorf("test #%d: FAIL", index)
|
||||
t.Errorf("test #%d: stream timeout", index)
|
||||
return
|
||||
// sometimes the <-stream seems to constantly (or for a
|
||||
// long time?) win the races against the <-time.After(),
|
||||
// so add some limit to how many times we need to stream
|
||||
max := 1
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case err, ok := <-stream:
|
||||
if !ok {
|
||||
t.Errorf("test #%d: FAIL", index)
|
||||
t.Errorf("test #%d: stream closed", index)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("test #%d: FAIL", index)
|
||||
t.Errorf("test #%d: stream errored: %+v", index, err)
|
||||
return
|
||||
}
|
||||
t.Logf("test #%d: stream...", index)
|
||||
max--
|
||||
if max == 0 {
|
||||
break Loop
|
||||
}
|
||||
|
||||
case <-time.After(3 * time.Second): // blocked functions
|
||||
break Loop
|
||||
|
||||
case <-time.After(60 * time.Second): // blocked functions
|
||||
t.Errorf("test #%d: FAIL", index)
|
||||
t.Errorf("test #%d: stream timeout", index)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// run interpret!
|
||||
funcs.RLock() // in case something is actually changing
|
||||
ograph, err := interpret.Interpret(iast)
|
||||
funcs.RUnlock()
|
||||
table := funcs.Table() // map[pgraph.Vertex]types.Value
|
||||
fn := func(n interfaces.Node) error {
|
||||
expr, ok := n.(interfaces.Expr)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
//f, exists := mapExprFunc[expr]
|
||||
//if !exists {
|
||||
// panic("programming error in mapExprFunc lookup")
|
||||
//}
|
||||
//val, exists := table[f]
|
||||
//if !exists {
|
||||
// fmt.Printf("XXX missing value in table is pointer: %p\n", f)
|
||||
// return fmt.Errorf("missing value in table for: %s", f)
|
||||
//}
|
||||
|
||||
v, ok := expr.(pgraph.Vertex)
|
||||
if !ok {
|
||||
panic("programming error in interfaces.Expr -> pgraph.Vertex lookup")
|
||||
}
|
||||
val, exists := table[v]
|
||||
if !exists {
|
||||
// XXX: we have values in the AST which aren't need...
|
||||
// XXX: confirmed with: time go test -race github.com/purpleidea/mgmt/lang/ -v -run TestAstFunc2/test_#42 (func-math1) for example.
|
||||
fmt.Printf("XXX: missing value in table is pointer: %p\n", v)
|
||||
return nil // XXX: workaround for now...
|
||||
//return fmt.Errorf("missing value in table for: %s", v)
|
||||
}
|
||||
return expr.SetValue(val) // set the value
|
||||
}
|
||||
funcs.Lock() // XXX: apparently there are races between SetValue and reading obj.V values...
|
||||
if err := iast.Apply(fn); err != nil {
|
||||
t.Errorf("test #%d: FAIL", index)
|
||||
t.Errorf("test #%d: apply failed with: %+v", index, err)
|
||||
t.Errorf("test #%d: table:", index)
|
||||
for k, v := range table {
|
||||
t.Errorf("test #%d: table: key: %+v ; value: %+v", index, k, v)
|
||||
}
|
||||
funcs.Unlock()
|
||||
return
|
||||
}
|
||||
funcs.Unlock()
|
||||
|
||||
ograph, err := interpret.Interpret(iast)
|
||||
if (!fail || !failInterpret) && err != nil {
|
||||
t.Errorf("test #%d: FAIL", index)
|
||||
t.Errorf("test #%d: interpret failed with: %+v", index, err)
|
||||
|
||||
34
lang/lang.go
34
lang/lang.go
@@ -331,14 +331,38 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) {
|
||||
}
|
||||
|
||||
obj.Logf("running interpret...")
|
||||
if obj.funcs != nil { // no need to rlock if we have a static graph
|
||||
obj.funcs.RLock()
|
||||
table := obj.funcs.Table() // map[pgraph.Vertex]types.Value
|
||||
fn := func(n interfaces.Node) error {
|
||||
expr, ok := n.(interfaces.Expr)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
v, ok := expr.(pgraph.Vertex)
|
||||
if !ok {
|
||||
panic("programming error in interfaces.Expr -> pgraph.Vertex lookup")
|
||||
}
|
||||
val, exists := table[v]
|
||||
if !exists {
|
||||
fmt.Printf("XXX: missing value in table is pointer: %p\n", v)
|
||||
return nil // XXX: workaround for now...
|
||||
//return fmt.Errorf("missing value in table for: %s", v)
|
||||
}
|
||||
return expr.SetValue(val) // set the value
|
||||
}
|
||||
obj.funcs.Lock() // XXX: apparently there are races between SetValue and reading obj.V values...
|
||||
if err := obj.ast.Apply(fn); err != nil {
|
||||
if obj.Debug {
|
||||
for k, v := range table {
|
||||
obj.Logf("table: key: %+v ; value: %+v", k, v)
|
||||
}
|
||||
}
|
||||
obj.funcs.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
obj.funcs.Unlock()
|
||||
|
||||
// this call returns the graph
|
||||
graph, err := interpret.Interpret(obj.ast)
|
||||
if obj.funcs != nil {
|
||||
obj.funcs.RUnlock()
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errwrap.Wrapf(err, "could not interpret")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user