diff --git a/engine/graph/actions.go b/engine/graph/actions.go
index 85b23a05..58c8313c 100644
--- a/engine/graph/actions.go
+++ b/engine/graph/actions.go
@@ -41,14 +41,18 @@ func (obj *Engine) OKTimestamp(vertex pgraph.Vertex) bool {
 // be bad.
 func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex {
 	vs := []pgraph.Vertex{}
-	ts := obj.state[vertex].timestamp
+	obj.state[vertex].mutex.RLock() // concurrent read start
+	ts := obj.state[vertex].timestamp // race
+	obj.state[vertex].mutex.RUnlock() // concurrent read end
 	// these are all the vertices pointing TO vertex, eg: ??? -> vertex
 	for _, v := range obj.graph.IncomingGraphVertices(vertex) {
 		// If the vertex has a greater timestamp than any prerequisite,
 		// then we can't run right now. If they're equal (eg: initially
 		// with a value of 0) then we also can't run because we should
 		// let our pre-requisites go first.
-		t := obj.state[v].timestamp
+		obj.state[v].mutex.RLock() // concurrent read start
+		t := obj.state[v].timestamp // race
+		obj.state[v].mutex.RUnlock() // concurrent read end
 		if obj.Debug {
 			obj.Logf("OKTimestamp: %d >= %d (%s): !%t", ts, t, v.String(), ts >= t)
 		}
@@ -132,8 +136,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 			}
 			// if changed == true, at least one was updated
 			// invalidate cache, mark as dirty
-			obj.state[v].tuid.StopTimer()
-			obj.state[v].isStateOK = false
+			obj.state[v].setDirty()
 
 			//break // we might have more vertices now
 		}
@@ -227,7 +230,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 	wg := &sync.WaitGroup{}
 	// update this timestamp *before* we poke or the poked
 	// nodes might fail due to having a too old timestamp!
-	obj.state[vertex].timestamp = time.Now().UnixNano() // update timestamp
+	obj.state[vertex].mutex.Lock() // concurrent write start
+	obj.state[vertex].timestamp = time.Now().UnixNano() // update timestamp (race)
+	obj.state[vertex].mutex.Unlock() // concurrent write end
 	for _, v := range obj.graph.OutgoingGraphVertices(vertex) {
 		if !obj.OKTimestamp(v) {
 			// there is at least another one that will poke this...
diff --git a/engine/graph/state.go b/engine/graph/state.go
index 33cbff10..324e0b9e 100644
--- a/engine/graph/state.go
+++ b/engine/graph/state.go
@@ -64,6 +64,8 @@ type State struct {
 	isStateOK bool  // is state OK or do we need to run CheckApply ?
 	workerErr error // did the Worker error?
 
+	mutex *sync.RWMutex // used for editing state properties
+
 	// doneCtx is cancelled when Watch should shut down. When any of the
 	// following channels close, it causes this to close.
 	doneCtx context.Context
@@ -143,6 +145,7 @@ func (obj *State) Init() error {
 		return fmt.Errorf("the Logf function is missing")
 	}
 
+	obj.mutex = &sync.RWMutex{}
 	obj.doneCtx, obj.doneCtxCancel = context.WithCancel(context.Background())
 
 	obj.processDone = make(chan struct{})
@@ -387,7 +390,9 @@ func (obj *State) event() {
 // CheckApply will have some work to do in order to converge it.
 func (obj *State) setDirty() {
 	obj.tuid.StopTimer()
-	obj.isStateOK = false
+	obj.mutex.Lock()
+	obj.isStateOK = false // concurrent write
+	obj.mutex.Unlock()
 }
 
 // poll is a replacement for Watch when the Poll metaparameter is used.
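The following is a minimal, self-contained sketch of the locking pattern the patch above introduces; the vertexState type and the getTimestamp/bumpTimestamp helper names are invented for illustration and are not the engine's actual API. It shows the same idea: every read of the shared timestamp or isStateOK field takes the RLock, and every write takes the full Lock, so a concurrent Process-style writer and OKTimestamp-style reader no longer trip Go's race detector (go run -race).

// race_sketch.go: illustrative only, not code from this repo.
package main

import (
	"fmt"
	"sync"
	"time"
)

// vertexState stands in for the engine's per-vertex State struct.
type vertexState struct {
	mutex     *sync.RWMutex // guards the fields below
	timestamp int64         // last time this vertex ran
	isStateOK bool          // false means CheckApply has work to do
}

// getTimestamp reads the timestamp under the read lock.
func (obj *vertexState) getTimestamp() int64 {
	obj.mutex.RLock()
	defer obj.mutex.RUnlock()
	return obj.timestamp
}

// bumpTimestamp updates the timestamp under the write lock, mirroring the
// guarded write in Process before it pokes downstream vertices.
func (obj *vertexState) bumpTimestamp() {
	obj.mutex.Lock()
	defer obj.mutex.Unlock()
	obj.timestamp = time.Now().UnixNano()
}

// setDirty invalidates the cached state under the write lock.
func (obj *vertexState) setDirty() {
	obj.mutex.Lock()
	defer obj.mutex.Unlock()
	obj.isStateOK = false
}

func main() {
	st := &vertexState{mutex: &sync.RWMutex{}}
	wg := &sync.WaitGroup{}

	for i := 0; i < 4; i++ { // concurrent writers, like parallel Process calls
		wg.Add(1)
		go func() {
			defer wg.Done()
			st.bumpTimestamp()
			st.setDirty()
		}()
	}
	for i := 0; i < 4; i++ { // concurrent readers, like OKTimestamp checks
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = st.getTimestamp()
		}()
	}
	wg.Wait() // all goroutines done; plain reads below are safe
	fmt.Printf("timestamp: %d, isStateOK: %t\n", st.getTimestamp(), st.isStateOK)
}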
diff --git a/lang/interpret_test/TestAstFunc3/race.txtar b/lang/interpret_test/TestAstFunc3/race.txtar
new file mode 100644
index 00000000..be2549d7
--- /dev/null
+++ b/lang/interpret_test/TestAstFunc3/race.txtar
@@ -0,0 +1,21 @@
+-- main.mcl --
+# check for timestamp or other state races in the resource engine
+print "print1" {
+	msg => "i am print1",
+
+	Meta:autogroup => false,
+}
+
+print "print2" {
+	msg => "i am print2",
+
+	Meta:autogroup => false,
+
+	Depend => Print["print1"],
+}
+-- OUTPUT --
+Edge: print[print1] -> print[print2] # print[print1] -> print[print2]
+Field: print[print1].Msg = "i am print1"
+Field: print[print2].Msg = "i am print2"
+Vertex: print[print1]
+Vertex: print[print2]
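Usage note (an assumption about the test layout, not part of the diff): since the txtar case lives under lang/interpret_test/TestAstFunc3/, running that test function under Go's race detector, for example with go test -race -run TestAstFunc3 from the lang package, should have reported the timestamp/isStateOK races before the engine changes above and run cleanly after them.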