engine: graph: Add err mutex

Here's a race that pops up. This is suboptimal locking, but it's not
important for now.
This commit is contained in:
James Shubin
2025-09-09 01:57:37 -04:00
parent a070722937
commit fb8958f192

View File

@@ -91,6 +91,8 @@ type Engine struct {
paused bool // are we paused?
fastPause *atomic.Bool
isClosing bool // are we shutting down?
errMutex *sync.Mutex // wraps the *state workerErr (one mutex for all)
}
// Init initializes the internal structures and starts this the graph running.
@@ -133,6 +135,8 @@ func (obj *Engine) Init() error {
obj.paused = true // start off true, so we can Resume after first Commit
obj.fastPause = &atomic.Bool{}
obj.errMutex = &sync.Mutex{}
obj.Exporter = &Exporter{
World: obj.World,
Debug: obj.Debug,
@@ -311,7 +315,9 @@ func (obj *Engine) Commit() error {
obj.Logf("%s: Exited...", v)
}
}
obj.errMutex.Lock()
obj.state[v].workerErr = err // store the error
obj.errMutex.Unlock()
// If the Rewatch metaparam is true, then this will get
// restarted if we do a graph cmp swap. This is why the
// graph cmp function runs the removes before the adds.
@@ -379,12 +385,15 @@ func (obj *Engine) Commit() error {
s1, ok1 := obj.state[v1]
s2, ok2 := obj.state[v2]
x1, x2 := false, false
// no need to have different mutexes for each state atm
obj.errMutex.Lock()
if ok1 {
x1 = s1.workerErr != nil && swap1
}
if ok2 {
x2 = s2.workerErr != nil && swap2
}
obj.errMutex.Unlock()
if x1 || x2 {
// We swap, even if they're the same, so that we reload!