lang: Add mutex around Expr String/Value/SetValue calls
The golang race detector complains about some unimportant races, and as a result, this patch adds some mutexes to prevent these test failures. We actually lock more than necessary, because a more accurate version would be more time consuming to implement. Secondarily, it's likely that in the future we replace this function graph algorithm with something that is guaranteed to be glitch-free and supports back pressure.
This commit is contained in:
@@ -47,6 +47,8 @@ type State struct {
|
||||
|
||||
input chan types.Value // the top level type must be a struct
|
||||
output chan types.Value
|
||||
|
||||
mutex *sync.RWMutex // concurrency guard for modifying Expr with String/SetValue
|
||||
}
|
||||
|
||||
// Init creates the function state if it can be found in the registered list.
|
||||
@@ -72,11 +74,17 @@ func (obj *State) Init() error {
|
||||
obj.input = make(chan types.Value) // we close this when we're done
|
||||
obj.output = make(chan types.Value) // we create it, func closes it
|
||||
|
||||
obj.mutex = &sync.RWMutex{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// String satisfies fmt.Stringer so that these print nicely.
|
||||
func (obj *State) String() string {
|
||||
// TODO: use global mutex since it's harder to add state specific mutex
|
||||
//obj.mutex.RLock() // prevent race detector issues against SetValue
|
||||
//defer obj.mutex.RUnlock()
|
||||
// FIXME: also add read locks on any of the children Expr in obj.Expr
|
||||
return obj.Expr.String()
|
||||
}
|
||||
|
||||
@@ -323,7 +331,7 @@ func (obj *Engine) Run() error {
|
||||
for _, vertex := range obj.topologicalSort {
|
||||
node := obj.state[vertex]
|
||||
if obj.Debug {
|
||||
obj.Logf("Startup func `%s`", node)
|
||||
obj.SafeLogf("Startup func `%s`", node)
|
||||
}
|
||||
|
||||
incoming := obj.Graph.IncomingGraphVertices(vertex) // []Vertex
|
||||
@@ -406,11 +414,11 @@ func (obj *Engine) Run() error {
|
||||
node := obj.state[vertex]
|
||||
defer obj.wg.Done()
|
||||
if obj.Debug {
|
||||
obj.Logf("Running func `%s`", node)
|
||||
obj.SafeLogf("Running func `%s`", node)
|
||||
}
|
||||
err := node.handle.Stream()
|
||||
if obj.Debug {
|
||||
obj.Logf("Exiting func `%s`", node)
|
||||
obj.SafeLogf("Exiting func `%s`", node)
|
||||
}
|
||||
if err != nil {
|
||||
// we closed with an error...
|
||||
@@ -450,12 +458,15 @@ func (obj *Engine) Run() error {
|
||||
obj.mutex.Lock()
|
||||
// XXX: maybe we can get rid of the table...
|
||||
obj.table[vertex] = value // save the latest
|
||||
node.mutex.Lock()
|
||||
if err := node.Expr.SetValue(value); err != nil {
|
||||
node.mutex.Unlock() // don't block node.String()
|
||||
panic(fmt.Sprintf("could not set value for `%s`: %+v", node, err))
|
||||
}
|
||||
node.loaded = true // set *after* value is in :)
|
||||
obj.mutex.Unlock()
|
||||
obj.Logf("func `%s` changed", node)
|
||||
node.mutex.Unlock()
|
||||
obj.mutex.Unlock()
|
||||
|
||||
// FIXME: will this actually prevent glitching?
|
||||
// if we only notify the aggregate channel when
|
||||
@@ -507,7 +518,10 @@ func (obj *Engine) Run() error {
|
||||
var loaded = true // initially assume true
|
||||
for _, vertex := range obj.topologicalSort {
|
||||
node := obj.state[vertex]
|
||||
if !node.loaded {
|
||||
node.mutex.RLock()
|
||||
nodeLoaded := node.loaded
|
||||
node.mutex.RUnlock()
|
||||
if !nodeLoaded {
|
||||
loaded = false // we were wrong
|
||||
break
|
||||
}
|
||||
@@ -585,6 +599,32 @@ func (obj *Engine) agDone(vertex pgraph.Vertex) {
|
||||
}
|
||||
}
|
||||
|
||||
// RLock takes a read lock on the data that gets written to the AST, so that
|
||||
// interpret can be run without anything changing part way through.
|
||||
func (obj *Engine) RLock() {
|
||||
obj.mutex.RLock()
|
||||
}
|
||||
|
||||
// RUnlock frees a read lock on the data that gets written to the AST, so that
|
||||
// interpret can be run without anything changing part way through.
|
||||
func (obj *Engine) RUnlock() {
|
||||
obj.mutex.RUnlock()
|
||||
}
|
||||
|
||||
// SafeLogf logs a message, although it adds a read lock around the logging in
|
||||
// case a `node` argument is passed in which would set off the race detector.
|
||||
func (obj *Engine) SafeLogf(format string, v ...interface{}) {
|
||||
// We're adding a global mutex, because it's harder to only isolate the
|
||||
// individual node specific mutexes needed since it may contain others!
|
||||
if len(v) > 0 {
|
||||
obj.mutex.RLock()
|
||||
}
|
||||
obj.Logf(format, v...)
|
||||
if len(v) > 0 {
|
||||
obj.mutex.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Stream returns a channel of engine events. Wait for nil events to know when
|
||||
// the Table map has changed. An error event means this will shutdown shortly.
|
||||
// Do not run the Table function before we've received one non-error event.
|
||||
|
||||
@@ -247,8 +247,14 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) {
|
||||
}
|
||||
|
||||
log.Printf("%s: Running interpret...", Name)
|
||||
if obj.funcs != nil { // no need to rlock if we have a static graph
|
||||
obj.funcs.RLock()
|
||||
}
|
||||
// this call returns the graph
|
||||
graph, err := interpret(obj.ast)
|
||||
if obj.funcs != nil {
|
||||
obj.funcs.RUnlock()
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errwrap.Wrapf(err, "could not interpret")
|
||||
}
|
||||
|
||||
@@ -93,6 +93,7 @@ func runInterpret(code string) (*pgraph.Graph, error) {
|
||||
return errwrap.Wrapf(lang.Close(), "close failed")
|
||||
}
|
||||
|
||||
// we only wait for the first event, instead of the continuous stream
|
||||
select {
|
||||
case err, ok := <-lang.Stream():
|
||||
if !ok {
|
||||
@@ -103,7 +104,7 @@ func runInterpret(code string) (*pgraph.Graph, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// run artificially without the entire engine
|
||||
// run artificially without the entire GAPI loop
|
||||
graph, err := lang.Interpret()
|
||||
if err != nil {
|
||||
err := errwrap.Wrapf(err, "interpret failed")
|
||||
|
||||
8
test.sh
8
test.sh
@@ -60,14 +60,14 @@ run-testsuite ./test/test-gotest.sh
|
||||
# do these longer tests only when running on ci
|
||||
if env | grep -q -e '^TRAVIS=true$' -e '^JENKINS_URL=' -e '^BUILD_TAG=jenkins'; then
|
||||
run-testsuite ./test/test-shell.sh
|
||||
skip-testsuite ./test/test-gotest.sh --race # XXX: temporarily disabled...
|
||||
run-testsuite ./test/test-gotest.sh --race
|
||||
run-testsuite ./test/test-integration.sh
|
||||
skip-testsuite ./test/test-integration.sh --race # XXX: temporarily disabled...
|
||||
run-testsuite ./test/test-integration.sh --race
|
||||
else
|
||||
REASON="CI server only test" skip-testsuite ./test/test-shell.sh
|
||||
REASON="CI server only test" skip-testsuite ./test/test-gotest.sh --race # XXX: temporarily disabled...
|
||||
REASON="CI server only test" skip-testsuite ./test/test-gotest.sh --race
|
||||
REASON="CI server only test" skip-testsuite ./test/test-integration.sh
|
||||
REASON="CI server only test" skip-testsuite ./test/test-integration.sh --race # XXX: temporarily disabled...
|
||||
REASON="CI server only test" skip-testsuite ./test/test-integration.sh --race
|
||||
fi
|
||||
|
||||
run-testsuite ./test/test-gometalinter.sh
|
||||
|
||||
Reference in New Issue
Block a user