engine: graph: Use an rwmutex around read/write of vertex state

This fixes two small races we had while simultaneously reading from and
writing to the vertex timestamp, and simultaneously writing to the
vertex isStateOK (dirty) flag.

They were actually "safe" races, in that it didn't matter whether the
read/write race saw the old or the new value, or that the double write
happened. The time sequencing was (I believe) correct in both cases,
but these accesses trigger the race detector now that we have tests
for it.
commit c2f508e261 (parent a07dc0a511)
Author: James Shubin
Date:   2024-01-02 18:01:45 -05:00

3 changed files with 37 additions and 6 deletions
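As a rough, self-contained sketch of the locking pattern this commit applies (simplified stand-in types, not the engine's actual State struct), the per-vertex timestamp and dirty flag are only touched while holding an RWMutex: readers take RLock, writers take Lock.

package main

import (
	"fmt"
	"sync"
	"time"
)

// vertexState is a simplified stand-in for the engine's per-vertex state.
type vertexState struct {
	mutex     sync.RWMutex // guards timestamp and isStateOK below
	timestamp int64        // when the vertex last ran, in unix nanoseconds
	isStateOK bool         // false means "dirty": CheckApply needs to run
}

// getTimestamp reads the timestamp under a shared (read) lock, as
// OKTimestamp/BadTimestamps now do.
func (obj *vertexState) getTimestamp() int64 {
	obj.mutex.RLock()
	defer obj.mutex.RUnlock()
	return obj.timestamp
}

// updateTimestamp records "now" under the exclusive (write) lock, as
// Process now does before poking downstream vertices.
func (obj *vertexState) updateTimestamp() {
	obj.mutex.Lock()
	obj.timestamp = time.Now().UnixNano()
	obj.mutex.Unlock()
}

// setDirty invalidates the cached state under the write lock.
func (obj *vertexState) setDirty() {
	obj.mutex.Lock()
	obj.isStateOK = false
	obj.mutex.Unlock()
}

func main() {
	st := &vertexState{}
	st.updateTimestamp()
	st.setDirty()
	fmt.Println(st.getTimestamp())
}

An RWMutex (rather than a plain Mutex) suits this access pattern: the timestamp is read by OKTimestamp/BadTimestamps for many vertices, but written only once per Process pass.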

@@ -41,14 +41,18 @@ func (obj *Engine) OKTimestamp(vertex pgraph.Vertex) bool {
 // be bad.
 func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex {
 	vs := []pgraph.Vertex{}
-	ts := obj.state[vertex].timestamp
+	obj.state[vertex].mutex.RLock()   // concurrent read start
+	ts := obj.state[vertex].timestamp // race
+	obj.state[vertex].mutex.RUnlock() // concurrent read end
 	// these are all the vertices pointing TO vertex, eg: ??? -> vertex
 	for _, v := range obj.graph.IncomingGraphVertices(vertex) {
 		// If the vertex has a greater timestamp than any prerequisite,
 		// then we can't run right now. If they're equal (eg: initially
 		// with a value of 0) then we also can't run because we should
 		// let our pre-requisites go first.
-		t := obj.state[v].timestamp
+		obj.state[v].mutex.RLock()  // concurrent read start
+		t := obj.state[v].timestamp // race
+		obj.state[v].mutex.RUnlock() // concurrent read end
 		if obj.Debug {
 			obj.Logf("OKTimestamp: %d >= %d (%s): !%t", ts, t, v.String(), ts >= t)
 		}
@@ -132,8 +136,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 		}
 		// if changed == true, at least one was updated
 		// invalidate cache, mark as dirty
-		obj.state[v].tuid.StopTimer()
-		obj.state[v].isStateOK = false
+		obj.state[v].setDirty()
 
 		//break // we might have more vertices now
 	}
@@ -227,7 +230,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 	wg := &sync.WaitGroup{}
 	// update this timestamp *before* we poke or the poked
 	// nodes might fail due to having a too old timestamp!
-	obj.state[vertex].timestamp = time.Now().UnixNano() // update timestamp
+	obj.state[vertex].mutex.Lock()                       // concurrent write start
+	obj.state[vertex].timestamp = time.Now().UnixNano()  // update timestamp (race)
+	obj.state[vertex].mutex.Unlock()                     // concurrent write end
 	for _, v := range obj.graph.OutgoingGraphVertices(vertex) {
 		if !obj.OKTimestamp(v) {
 			// there is at least another one that will poke this...

@@ -64,6 +64,8 @@ type State struct {
 	isStateOK bool // is state OK or do we need to run CheckApply ?
 	workerErr error // did the Worker error?
 
+	mutex *sync.RWMutex // used for editing state properties
+
 	// doneCtx is cancelled when Watch should shut down. When any of the
 	// following channels close, it causes this to close.
 	doneCtx context.Context
@@ -143,6 +145,7 @@ func (obj *State) Init() error {
 		return fmt.Errorf("the Logf function is missing")
 	}
 
+	obj.mutex = &sync.RWMutex{}
 	obj.doneCtx, obj.doneCtxCancel = context.WithCancel(context.Background())
 
 	obj.processDone = make(chan struct{})
@@ -387,7 +390,9 @@ func (obj *State) event() {
 // CheckApply will have some work to do in order to converge it.
 func (obj *State) setDirty() {
 	obj.tuid.StopTimer()
-	obj.isStateOK = false
+	obj.mutex.Lock()
+	obj.isStateOK = false // concurrent write
+	obj.mutex.Unlock()
 }
 
 // poll is a replacement for Watch when the Poll metaparameter is used.
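For reference, the interleaving the race detector flags can be reproduced with a small hypothetical test along these lines (a sketch, not part of this commit). Run with go test -race: it passes with the RLock/Lock pairs in place and fails if they are removed.

package graph

import (
	"sync"
	"testing"
	"time"
)

// raceState mirrors just the fields involved in the race; it is a sketch,
// not the engine's State struct.
type raceState struct {
	mutex     sync.RWMutex
	timestamp int64
}

func TestConcurrentTimestampAccess(t *testing.T) {
	st := &raceState{}
	wg := &sync.WaitGroup{}
	wg.Add(2)
	go func() { // writer: what Process does after updating a vertex
		defer wg.Done()
		st.mutex.Lock()
		st.timestamp = time.Now().UnixNano()
		st.mutex.Unlock()
	}()
	go func() { // reader: what OKTimestamp/BadTimestamps do
		defer wg.Done()
		st.mutex.RLock()
		_ = st.timestamp
		st.mutex.RUnlock()
	}()
	wg.Wait()
}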