diff --git a/engine/graph/engine.go b/engine/graph/engine.go index 3c449b1b..a3558660 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -91,6 +91,8 @@ type Engine struct { paused bool // are we paused? fastPause *atomic.Bool isClosing bool // are we shutting down? + + errMutex *sync.Mutex // wraps the *state workerErr (one mutex for all) } // Init initializes the internal structures and starts this the graph running. @@ -133,6 +135,8 @@ func (obj *Engine) Init() error { obj.paused = true // start off true, so we can Resume after first Commit obj.fastPause = &atomic.Bool{} + obj.errMutex = &sync.Mutex{} + obj.Exporter = &Exporter{ World: obj.World, Debug: obj.Debug, @@ -311,7 +315,9 @@ func (obj *Engine) Commit() error { obj.Logf("%s: Exited...", v) } } + obj.errMutex.Lock() obj.state[v].workerErr = err // store the error + obj.errMutex.Unlock() // If the Rewatch metaparam is true, then this will get // restarted if we do a graph cmp swap. This is why the // graph cmp function runs the removes before the adds. @@ -379,12 +385,15 @@ func (obj *Engine) Commit() error { s1, ok1 := obj.state[v1] s2, ok2 := obj.state[v2] x1, x2 := false, false + // no need to have different mutexes for each state atm + obj.errMutex.Lock() if ok1 { x1 = s1.workerErr != nil && swap1 } if ok2 { x2 = s2.workerErr != nil && swap2 } + obj.errMutex.Unlock() if x1 || x2 { // We swap, even if they're the same, so that we reload!