diff --git a/lang/funcs/engine.go b/lang/funcs/engine.go index d3d79f20..0cef4440 100644 --- a/lang/funcs/engine.go +++ b/lang/funcs/engine.go @@ -47,6 +47,8 @@ type State struct { input chan types.Value // the top level type must be a struct output chan types.Value + + mutex *sync.RWMutex // concurrency guard for modifying Expr with String/SetValue } // Init creates the function state if it can be found in the registered list. @@ -72,11 +74,17 @@ func (obj *State) Init() error { obj.input = make(chan types.Value) // we close this when we're done obj.output = make(chan types.Value) // we create it, func closes it + obj.mutex = &sync.RWMutex{} + return nil } // String satisfies fmt.Stringer so that these print nicely. func (obj *State) String() string { + // TODO: use global mutex since it's harder to add state specific mutex + //obj.mutex.RLock() // prevent race detector issues against SetValue + //defer obj.mutex.RUnlock() + // FIXME: also add read locks on any of the children Expr in obj.Expr return obj.Expr.String() } @@ -323,7 +331,7 @@ func (obj *Engine) Run() error { for _, vertex := range obj.topologicalSort { node := obj.state[vertex] if obj.Debug { - obj.Logf("Startup func `%s`", node) + obj.SafeLogf("Startup func `%s`", node) } incoming := obj.Graph.IncomingGraphVertices(vertex) // []Vertex @@ -406,11 +414,11 @@ func (obj *Engine) Run() error { node := obj.state[vertex] defer obj.wg.Done() if obj.Debug { - obj.Logf("Running func `%s`", node) + obj.SafeLogf("Running func `%s`", node) } err := node.handle.Stream() if obj.Debug { - obj.Logf("Exiting func `%s`", node) + obj.SafeLogf("Exiting func `%s`", node) } if err != nil { // we closed with an error... @@ -450,12 +458,15 @@ func (obj *Engine) Run() error { obj.mutex.Lock() // XXX: maybe we can get rid of the table... obj.table[vertex] = value // save the latest + node.mutex.Lock() if err := node.Expr.SetValue(value); err != nil { + node.mutex.Unlock() // don't block node.String() panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err)) } node.loaded = true // set *after* value is in :) - obj.mutex.Unlock() obj.Logf("func `%s` changed", node) + node.mutex.Unlock() + obj.mutex.Unlock() // FIXME: will this actually prevent glitching? // if we only notify the aggregate channel when @@ -507,7 +518,10 @@ func (obj *Engine) Run() error { var loaded = true // initially assume true for _, vertex := range obj.topologicalSort { node := obj.state[vertex] - if !node.loaded { + node.mutex.RLock() + nodeLoaded := node.loaded + node.mutex.RUnlock() + if !nodeLoaded { loaded = false // we were wrong break } @@ -585,6 +599,32 @@ func (obj *Engine) agDone(vertex pgraph.Vertex) { } } +// RLock takes a read lock on the data that gets written to the AST, so that +// interpret can be run without anything changing part way through. +func (obj *Engine) RLock() { + obj.mutex.RLock() +} + +// RUnlock frees a read lock on the data that gets written to the AST, so that +// interpret can be run without anything changing part way through. +func (obj *Engine) RUnlock() { + obj.mutex.RUnlock() +} + +// SafeLogf logs a message, although it adds a read lock around the logging in +// case a `node` argument is passed in which would set off the race detector. +func (obj *Engine) SafeLogf(format string, v ...interface{}) { + // We're adding a global mutex, because it's harder to only isolate the + // individual node specific mutexes needed since it may contain others! + if len(v) > 0 { + obj.mutex.RLock() + } + obj.Logf(format, v...) + if len(v) > 0 { + obj.mutex.RUnlock() + } +} + // Stream returns a channel of engine events. Wait for nil events to know when // the Table map has changed. An error event means this will shutdown shortly. // Do not run the Table function before we've received one non-error event. diff --git a/lang/lang.go b/lang/lang.go index de897203..a311b40e 100644 --- a/lang/lang.go +++ b/lang/lang.go @@ -247,8 +247,14 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) { } log.Printf("%s: Running interpret...", Name) + if obj.funcs != nil { // no need to rlock if we have a static graph + obj.funcs.RLock() + } // this call returns the graph graph, err := interpret(obj.ast) + if obj.funcs != nil { + obj.funcs.RUnlock() + } if err != nil { return nil, errwrap.Wrapf(err, "could not interpret") } diff --git a/lang/lang_test.go b/lang/lang_test.go index 74869e9f..355a07aa 100644 --- a/lang/lang_test.go +++ b/lang/lang_test.go @@ -93,6 +93,7 @@ func runInterpret(code string) (*pgraph.Graph, error) { return errwrap.Wrapf(lang.Close(), "close failed") } + // we only wait for the first event, instead of the continuous stream select { case err, ok := <-lang.Stream(): if !ok { @@ -103,7 +104,7 @@ func runInterpret(code string) (*pgraph.Graph, error) { } } - // run artificially without the entire engine + // run artificially without the entire GAPI loop graph, err := lang.Interpret() if err != nil { err := errwrap.Wrapf(err, "interpret failed") diff --git a/test.sh b/test.sh index 9d71d73f..e2fe6523 100755 --- a/test.sh +++ b/test.sh @@ -60,14 +60,14 @@ run-testsuite ./test/test-gotest.sh # do these longer tests only when running on ci if env | grep -q -e '^TRAVIS=true$' -e '^JENKINS_URL=' -e '^BUILD_TAG=jenkins'; then run-testsuite ./test/test-shell.sh - skip-testsuite ./test/test-gotest.sh --race # XXX: temporarily disabled... + run-testsuite ./test/test-gotest.sh --race run-testsuite ./test/test-integration.sh - skip-testsuite ./test/test-integration.sh --race # XXX: temporarily disabled... + run-testsuite ./test/test-integration.sh --race else REASON="CI server only test" skip-testsuite ./test/test-shell.sh - REASON="CI server only test" skip-testsuite ./test/test-gotest.sh --race # XXX: temporarily disabled... + REASON="CI server only test" skip-testsuite ./test/test-gotest.sh --race REASON="CI server only test" skip-testsuite ./test/test-integration.sh - REASON="CI server only test" skip-testsuite ./test/test-integration.sh --race # XXX: temporarily disabled... + REASON="CI server only test" skip-testsuite ./test/test-integration.sh --race fi run-testsuite ./test/test-gometalinter.sh