From c2f508e2619fcc319728b69f996f32247508a1aa Mon Sep 17 00:00:00 2001
From: James Shubin
Date: Tue, 2 Jan 2024 18:01:45 -0500
Subject: [PATCH] engine: graph: Use an rwmutex around read/write of vertex
 state

This fixes two small races we had while simultaneously reading from and
writing to the vertex timestamp, and simultaneously writing to the
vertex isStateOK (dirty) flag.

They were actually "safe" races, in that it doesn't matter if the
read/write race got the old or new value, or that the double write
happened. The time sequencing was correct (I believe) in both cases,
but this triggers the race detector now that we have tests for it.
---
 engine/graph/actions.go                     | 15 ++++++++++-----
 engine/graph/state.go                       |  7 ++++++-
 lang/interpret_test/TestAstFunc3/race.txtar | 21 +++++++++++++++++++++
 3 files changed, 37 insertions(+), 6 deletions(-)
 create mode 100644 lang/interpret_test/TestAstFunc3/race.txtar

diff --git a/engine/graph/actions.go b/engine/graph/actions.go
index 85b23a05..58c8313c 100644
--- a/engine/graph/actions.go
+++ b/engine/graph/actions.go
@@ -41,14 +41,18 @@ func (obj *Engine) OKTimestamp(vertex pgraph.Vertex) bool {
 // be bad.
 func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex {
 	vs := []pgraph.Vertex{}
-	ts := obj.state[vertex].timestamp
+	obj.state[vertex].mutex.RLock()   // concurrent read start
+	ts := obj.state[vertex].timestamp // race
+	obj.state[vertex].mutex.RUnlock() // concurrent read end
 	// these are all the vertices pointing TO vertex, eg: ??? -> vertex
 	for _, v := range obj.graph.IncomingGraphVertices(vertex) {
 		// If the vertex has a greater timestamp than any prerequisite,
 		// then we can't run right now. If they're equal (eg: initially
 		// with a value of 0) then we also can't run because we should
 		// let our pre-requisites go first.
-		t := obj.state[v].timestamp
+		obj.state[v].mutex.RLock()   // concurrent read start
+		t := obj.state[v].timestamp  // race
+		obj.state[v].mutex.RUnlock() // concurrent read end
 		if obj.Debug {
 			obj.Logf("OKTimestamp: %d >= %d (%s): !%t", ts, t, v.String(), ts >= t)
 		}
@@ -132,8 +136,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 			}
 			// if changed == true, at least one was updated
 			// invalidate cache, mark as dirty
-			obj.state[v].tuid.StopTimer()
-			obj.state[v].isStateOK = false
+			obj.state[v].setDirty()
 
 			//break // we might have more vertices now
 		}
@@ -227,7 +230,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 		wg := &sync.WaitGroup{}
 		// update this timestamp *before* we poke or the poked
 		// nodes might fail due to having a too old timestamp!
-		obj.state[vertex].timestamp = time.Now().UnixNano() // update timestamp
+		obj.state[vertex].mutex.Lock()                      // concurrent write start
+		obj.state[vertex].timestamp = time.Now().UnixNano() // update timestamp (race)
+		obj.state[vertex].mutex.Unlock()                    // concurrent write end
 		for _, v := range obj.graph.OutgoingGraphVertices(vertex) {
 			if !obj.OKTimestamp(v) {
 				// there is at least another one that will poke this...
diff --git a/engine/graph/state.go b/engine/graph/state.go
index 33cbff10..324e0b9e 100644
--- a/engine/graph/state.go
+++ b/engine/graph/state.go
@@ -64,6 +64,8 @@ type State struct {
 	isStateOK bool  // is state OK or do we need to run CheckApply ?
 	workerErr error // did the Worker error?
 
+	mutex *sync.RWMutex // used for editing state properties
+
 	// doneCtx is cancelled when Watch should shut down. When any of the
 	// following channels close, it causes this to close.
 	doneCtx context.Context
@@ -143,6 +145,7 @@ func (obj *State) Init() error {
 		return fmt.Errorf("the Logf function is missing")
 	}
 
+	obj.mutex = &sync.RWMutex{}
 	obj.doneCtx, obj.doneCtxCancel = context.WithCancel(context.Background())
 
 	obj.processDone = make(chan struct{})
@@ -387,7 +390,9 @@ func (obj *State) event() {
 // CheckApply will have some work to do in order to converge it.
 func (obj *State) setDirty() {
 	obj.tuid.StopTimer()
-	obj.isStateOK = false
+	obj.mutex.Lock()
+	obj.isStateOK = false // concurrent write
+	obj.mutex.Unlock()
 }
 
 // poll is a replacement for Watch when the Poll metaparameter is used.
diff --git a/lang/interpret_test/TestAstFunc3/race.txtar b/lang/interpret_test/TestAstFunc3/race.txtar
new file mode 100644
index 00000000..be2549d7
--- /dev/null
+++ b/lang/interpret_test/TestAstFunc3/race.txtar
@@ -0,0 +1,21 @@
+-- main.mcl --
+# check for timestamp or other state races in the resource engine
+print "print1" {
+	msg => "i am print1",
+
+	Meta:autogroup => false,
+}
+
+print "print2" {
+	msg => "i am print2",
+
+	Meta:autogroup => false,
+
+	Depend => Print["print1"],
+}
+-- OUTPUT --
+Edge: print[print1] -> print[print2] # print[print1] -> print[print2]
+Field: print[print1].Msg = "i am print1"
+Field: print[print2].Msg = "i am print2"
+Vertex: print[print1]
+Vertex: print[print2]
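
For reference (not part of the patch itself), here is a minimal standalone sketch of the locking pattern the commit applies: a sync.RWMutex guarding the vertex timestamp and dirty flag, where readers take RLock and writers take Lock. The vertexState type and its methods below are simplified stand-ins invented for illustration, not the engine's actual State API; note it also embeds the RWMutex by value rather than allocating a pointer in Init as the patch does, since the zero value is ready to use.

package main

import (
	"fmt"
	"sync"
	"time"
)

// vertexState is a simplified stand-in for the engine's State struct.
type vertexState struct {
	mutex     sync.RWMutex // guards timestamp and isStateOK
	timestamp int64        // last run, in unix nanoseconds
	isStateOK bool         // false means CheckApply still has work to do
}

// Timestamp takes a read lock, so concurrent readers never race a writer.
func (obj *vertexState) Timestamp() int64 {
	obj.mutex.RLock()
	defer obj.mutex.RUnlock()
	return obj.timestamp
}

// UpdateTimestamp takes the write lock before storing the new value.
func (obj *vertexState) UpdateTimestamp() {
	obj.mutex.Lock()
	obj.timestamp = time.Now().UnixNano()
	obj.mutex.Unlock()
}

// SetDirty marks the state as needing another CheckApply run.
func (obj *vertexState) SetDirty() {
	obj.mutex.Lock()
	obj.isStateOK = false
	obj.mutex.Unlock()
}

func main() {
	st := &vertexState{}
	wg := &sync.WaitGroup{}
	for i := 0; i < 10; i++ { // hammer the state from both sides
		wg.Add(2)
		go func() {
			defer wg.Done()
			_ = st.Timestamp() // concurrent read
		}()
		go func() {
			defer wg.Done()
			st.UpdateTimestamp() // concurrent write
			st.SetDirty()
		}()
	}
	wg.Wait()
	fmt.Printf("timestamp: %d\n", st.Timestamp())
}

Running this under `go run -race` should report no data races; dropping the lock calls and rerunning reproduces the kind of report the commit message describes. An RWMutex (rather than a plain Mutex) fits this shape of workload because timestamp reads, as in BadTimestamps, can happen concurrently from many goroutines, while the single write per Process pass only briefly excludes them.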