From 2f860be5fe2ba07e90274942847f16be1ba51a97 Mon Sep 17 00:00:00 2001
From: James Shubin
Date: Wed, 6 Aug 2025 16:40:31 -0400
Subject: [PATCH] engine: graph: Lock around frequent read races

These are all "safe" in the sense that they never actually conflict, but
the Go memory model still requires a real lock to avoid race detector
errors.
---
 engine/graph/actions.go | 158 ++++++++++++++++++++++------------------
 engine/graph/engine.go  |   4 +
 2 files changed, 91 insertions(+), 71 deletions(-)

diff --git a/engine/graph/actions.go b/engine/graph/actions.go
index dd103cc9..4e80f03a 100644
--- a/engine/graph/actions.go
+++ b/engine/graph/actions.go
@@ -52,19 +52,27 @@ func (obj *Engine) OKTimestamp(vertex pgraph.Vertex) bool {
 // BadTimestamps returns the list of vertices that are causing our timestamp to
 // be bad.
 func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex {
+	obj.tlock.RLock()
+	state := obj.state[vertex]
+	obj.tlock.RUnlock()
+
 	vs := []pgraph.Vertex{}
-	obj.state[vertex].mutex.RLock() // concurrent read start
-	ts := obj.state[vertex].timestamp // race
-	obj.state[vertex].mutex.RUnlock() // concurrent read end
+	state.mutex.RLock() // concurrent read start
+	ts := state.timestamp // race
+	state.mutex.RUnlock() // concurrent read end
 	// these are all the vertices pointing TO vertex, eg: ??? -> vertex
 	for _, v := range obj.graph.IncomingGraphVertices(vertex) {
+		obj.tlock.RLock()
+		state := obj.state[v]
+		obj.tlock.RUnlock()
+
 		// If the vertex has a greater timestamp than any prerequisite,
 		// then we can't run right now. If they're equal (eg: initially
 		// with a value of 0) then we also can't run because we should
 		// let our pre-requisites go first.
-		obj.state[v].mutex.RLock() // concurrent read start
-		t := obj.state[v].timestamp // race
-		obj.state[v].mutex.RUnlock() // concurrent read end
+		state.mutex.RLock() // concurrent read start
+		t := state.timestamp // race
+		state.mutex.RUnlock() // concurrent read end
 		if obj.Debug {
 			obj.Logf("OKTimestamp: %d >= %d (%s): !%t", ts, t, v.String(), ts >= t)
 		}
@@ -83,6 +91,10 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 		return fmt.Errorf("vertex is not a Res")
 	}
 
+	obj.tlock.RLock()
+	state := obj.state[vertex]
+	obj.tlock.RUnlock()
+
 	// backpoke! (can be async)
 	if vs := obj.BadTimestamps(vertex); len(vs) > 0 {
 		// back poke in parallel (sync b/c of waitgroup)
@@ -266,7 +278,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 	// Check cached state, to skip CheckApply, but can't skip if refreshing!
 	// If the resource doesn't implement refresh, skip the refresh test.
 	// FIXME: if desired, check that we pass through refresh notifications!
-	if (!refresh || !isRefreshableRes) && obj.state[vertex].isStateOK.Load() { // mutex RLock/RUnlock
+	if (!refresh || !isRefreshableRes) && state.isStateOK.Load() { // mutex RLock/RUnlock
 		checkOK, err = true, nil
 
 	} else if noop && (refresh && isRefreshableRes) { // had a refresh to do w/ noop!
@@ -303,7 +315,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 	}
 
 	if !checkOK { // something changed, restart timer
-		obj.state[vertex].cuid.ResetTimer() // activity!
+		state.cuid.ResetTimer() // activity!
 		if obj.Debug {
 			obj.Logf("%s: converger: reset timer", res)
 		}
@@ -311,10 +323,10 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 
 	// if CheckApply ran without noop and without error, state should be good
 	if !noop && err == nil { // aka !noop || checkOK
-		obj.state[vertex].tuid.StartTimer()
-		//obj.state[vertex].mutex.Lock()
-		obj.state[vertex].isStateOK.Store(true) // reset
-		//obj.state[vertex].mutex.Unlock()
+		state.tuid.StartTimer()
+		//state.mutex.Lock()
+		state.isStateOK.Store(true) // reset
+		//state.mutex.Unlock()
 		if refresh {
 			obj.SetUpstreamRefresh(vertex, false) // refresh happened, clear the request
 			if isRefreshableRes {
@@ -351,9 +363,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 	wg := &sync.WaitGroup{}
 	// update this timestamp *before* we poke or the poked
 	// nodes might fail due to having a too old timestamp!
-	obj.state[vertex].mutex.Lock() // concurrent write start
-	obj.state[vertex].timestamp = time.Now().UnixNano() // update timestamp (race)
-	obj.state[vertex].mutex.Unlock() // concurrent write end
+	state.mutex.Lock() // concurrent write start
+	state.timestamp = time.Now().UnixNano() // update timestamp (race)
+	state.mutex.Unlock() // concurrent write end
 	for _, v := range obj.graph.OutgoingGraphVertices(vertex) {
 		if !obj.OKTimestamp(v) {
 			// there is at least another one that will poke this...
@@ -394,6 +406,10 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 		return fmt.Errorf("vertex is not a resource")
 	}
 
+	obj.tlock.RLock()
+	state := obj.state[vertex]
+	obj.tlock.RUnlock()
+
 	// bonus safety check
 	if res.MetaParams().Burst == 0 && !(res.MetaParams().Limit == rate.Inf) { // blocked
 		return fmt.Errorf("permanently limited (rate != Inf, burst = 0)")
@@ -419,42 +435,42 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 		obj.mlock.Unlock()
 	}
 
-	//defer close(obj.state[vertex].stopped) // done signal
+	//defer close(state.stopped) // done signal
 
-	obj.state[vertex].cuid = obj.Converger.Register()
-	obj.state[vertex].tuid = obj.Converger.Register()
+	state.cuid = obj.Converger.Register() // XXX RACE READ
+	state.tuid = obj.Converger.Register()
 	// must wait for all users of the cuid to finish *before* we unregister!
 	// as a result, this defer happens *before* the below wait group Wait...
-	defer obj.state[vertex].cuid.Unregister()
-	defer obj.state[vertex].tuid.Unregister()
+	defer state.cuid.Unregister()
+	defer state.tuid.Unregister()
 
-	defer obj.state[vertex].wg.Wait() // this Worker is the last to exit!
+	defer state.wg.Wait() // this Worker is the last to exit!
 
-	obj.state[vertex].wg.Add(1)
+	state.wg.Add(1)
 	go func() {
-		defer obj.state[vertex].wg.Done()
-		defer close(obj.state[vertex].eventsChan) // we close this on behalf of res
+		defer state.wg.Done()
+		defer close(state.eventsChan) // we close this on behalf of res
 
 		// This is a close reverse-multiplexer. If any of the channels
 		// close, then it will cause the doneCtx to cancel. That way,
 		// multiple different folks can send a close signal, without
 		// every worrying about duplicate channel close panics.
-		obj.state[vertex].wg.Add(1)
+		state.wg.Add(1)
 		go func() {
-			defer obj.state[vertex].wg.Done()
+			defer state.wg.Done()
 
 			// reverse-multiplexer: any close, causes *the* close!
 			select {
-			case <-obj.state[vertex].processDone:
-			case <-obj.state[vertex].watchDone:
-			case <-obj.state[vertex].limitDone:
-			case <-obj.state[vertex].retryDone:
-			case <-obj.state[vertex].removeDone:
-			case <-obj.state[vertex].eventsDone:
+			case <-state.processDone:
+			case <-state.watchDone:
+			case <-state.limitDone:
+			case <-state.retryDone:
+			case <-state.removeDone:
+			case <-state.eventsDone:
 			}
 
 			// the main "done" signal gets activated here!
-			obj.state[vertex].doneCtxCancel() // cancels doneCtx
+			state.doneCtxCancel() // cancels doneCtx
 		}()
 
 		var err error
@@ -466,14 +482,14 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 			errDelayExpired := engine.Error("delay exit")
 			err = func() error { // slim watch main loop
 				timer := time.NewTimer(time.Duration(delay) * time.Millisecond)
-				defer obj.state[vertex].init.Logf("the Watch delay expired!")
+				defer state.init.Logf("the Watch delay expired!")
 				defer timer.Stop() // it's nice to cleanup
 				for {
 					select {
 					case <-timer.C: // the wait is over
 						return errDelayExpired // special
 
-					case <-obj.state[vertex].doneCtx.Done():
+					case <-state.doneCtx.Done():
 						return nil
 					}
 				}
@@ -488,21 +504,21 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 			if obj.Debug {
 				obj.Logf("%s: Hidden", res)
 			}
-			obj.state[vertex].cuid.StartTimer() // TODO: Should we do this?
-			err = obj.state[vertex].hidden(obj.state[vertex].doneCtx)
-			obj.state[vertex].cuid.StopTimer() // TODO: Should we do this?
+			state.cuid.StartTimer() // TODO: Should we do this?
+			err = state.hidden(state.doneCtx)
+			state.cuid.StopTimer() // TODO: Should we do this?
 
 		} else if interval := res.MetaParams().Poll; interval > 0 { // poll instead of watching :(
-			obj.state[vertex].cuid.StartTimer()
-			err = obj.state[vertex].poll(obj.state[vertex].doneCtx, interval)
-			obj.state[vertex].cuid.StopTimer() // clean up nicely
+			state.cuid.StartTimer()
+			err = state.poll(state.doneCtx, interval)
+			state.cuid.StopTimer() // clean up nicely
 		} else {
-			obj.state[vertex].cuid.StartTimer()
+			state.cuid.StartTimer()
 			if obj.Debug {
 				obj.Logf("%s: Watch...", vertex)
 			}
-			err = res.Watch(obj.state[vertex].doneCtx) // run the watch normally
+			err = res.Watch(state.doneCtx) // run the watch normally
 			if obj.Debug {
 				if s := engineUtil.CleanError(err); err != nil {
 					obj.Logf("%s: Watch Error: %s", vertex, s)
@@ -510,7 +526,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 					obj.Logf("%s: Watch Exited...", vertex)
 				}
 			}
-			obj.state[vertex].cuid.StopTimer() // clean up nicely
+			state.cuid.StopTimer() // clean up nicely
 		}
 		if err == nil { // || err == engine.ErrClosed
 			return // exited cleanly, we're done
@@ -523,7 +539,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 		}
 		if retry > 0 { // don't decrement past 0
 			retry--
-			obj.state[vertex].init.Logf("retrying Watch after %.4f seconds (%d left)", float64(delay)/1000, retry)
+			state.init.Logf("retrying Watch after %.4f seconds (%d left)", float64(delay)/1000, retry)
 			continue
 		}
 		//if retry == 0 { // optional
@@ -536,14 +552,14 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 		// If the CheckApply loop exits and THEN the Watch fails with an
 		// error, then we'd be stuck here if exit signal didn't unblock!
 		select {
-		case obj.state[vertex].eventsChan <- errwrap.Wrapf(err, "watch failed"):
+		case state.eventsChan <- errwrap.Wrapf(err, "watch failed"):
 			// send
 		}
 	}()
 
 	// If this exits cleanly, we must unblock the reverse-multiplexer.
 	// I think this additional close is unnecessary, but it's not harmful.
-	defer close(obj.state[vertex].eventsDone) // causes doneCtx to cancel
+	defer close(state.eventsDone) // causes doneCtx to cancel
 	limiter := rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst)
 	var reserv *rate.Reservation
 	var reterr error
@@ -557,7 +573,7 @@ Loop:
 		// This select is also the main event receiver and is also the
 		// only place where we read from the poke channel.
 		select {
-		case err, ok := <-obj.state[vertex].eventsChan: // read from watch channel
+		case err, ok := <-state.eventsChan: // read from watch channel
 			if !ok {
 				return reterr // we only return when chan closes
 			}
@@ -566,7 +582,7 @@ Loop:
 			// we then save so we can return it to the caller of us.
 			if err != nil {
 				failed = true
-				close(obj.state[vertex].watchDone) // causes doneCtx to cancel
+				close(state.watchDone) // causes doneCtx to cancel
 				reterr = errwrap.Append(reterr, err) // permanent failure
 				continue
 			}
@@ -576,7 +592,7 @@ Loop:
 			reserv = limiter.ReserveN(time.Now(), 1) // one event
 			// reserv.OK() seems to always be true here!
 
-		case _, ok := <-obj.state[vertex].pokeChan: // read from buffered poke channel
+		case _, ok := <-state.pokeChan: // read from buffered poke channel
 			if !ok { // we never close it
 				panic("unexpected close of poke channel")
 			}
@@ -585,9 +601,9 @@ Loop:
 			}
 			reserv = nil // we didn't receive a real event here...
 
-		case _, ok := <-obj.state[vertex].pauseSignal: // one message
+		case _, ok := <-state.pauseSignal: // one message
 			if !ok {
-				obj.state[vertex].pauseSignal = nil
+				state.pauseSignal = nil
 				continue // this is not a new pause message
 			}
 			// NOTE: If we allowed a doneCtx below to let us out
@@ -599,7 +615,7 @@ Loop:
 
 			// we are paused now, and waiting for resume or exit...
 			select {
-			case _, ok := <-obj.state[vertex].resumeSignal: // channel closes
+			case _, ok := <-state.resumeSignal: // channel closes
 				if !ok {
 					closed = true
 				}
@@ -614,9 +630,9 @@ Loop:
 		}
 
 		// drop redundant pokes
-		for len(obj.state[vertex].pokeChan) > 0 {
+		for len(state.pokeChan) > 0 {
 			select {
-			case <-obj.state[vertex].pokeChan:
+			case <-state.pokeChan:
 			default: // race, someone else read one!
 			}
@@ -633,7 +649,7 @@ Loop:
 			d = reserv.DelayFrom(time.Now())
 		}
 		if reserv != nil && d > 0 { // delay
-			obj.state[vertex].init.Logf("limited (rate: %v/sec, burst: %d, next: %dms)", res.MetaParams().Limit, res.MetaParams().Burst, d/time.Millisecond)
+			state.init.Logf("limited (rate: %v/sec, burst: %d, next: %dms)", res.MetaParams().Limit, res.MetaParams().Burst, d/time.Millisecond)
 			timer := time.NewTimer(time.Duration(d) * time.Millisecond)
 		LimitWait:
 			for {
@@ -645,13 +661,13 @@ Loop:
 					break LimitWait
 
 				// consume other events while we're waiting...
-				case e, ok := <-obj.state[vertex].eventsChan: // read from watch channel
+				case e, ok := <-state.eventsChan: // read from watch channel
 					if !ok {
 						return reterr // we only return when chan closes
 					}
 					if e != nil {
 						failed = true
-						close(obj.state[vertex].limitDone) // causes doneCtx to cancel
+						close(state.limitDone) // causes doneCtx to cancel
 						reterr = errwrap.Append(reterr, e) // permanent failure
 						break LimitWait
 					}
@@ -662,13 +678,13 @@ Loop:
 					limiter.ReserveN(time.Now(), 1) // one event
 
 				// this pause/resume block is the same as the upper main one
-				case _, ok := <-obj.state[vertex].pauseSignal:
+				case _, ok := <-state.pauseSignal:
 					if !ok {
-						obj.state[vertex].pauseSignal = nil
+						state.pauseSignal = nil
 						break LimitWait
 					}
 					select {
-					case _, ok := <-obj.state[vertex].resumeSignal: // channel closes
+					case _, ok := <-state.resumeSignal: // channel closes
 						if !ok {
 							closed = true
 						}
@@ -677,7 +693,7 @@ Loop:
 				}
 			}
 			timer.Stop() // it's nice to cleanup
-			obj.state[vertex].init.Logf("rate limiting expired!")
+			state.init.Logf("rate limiting expired!")
 		}
 		// don't Process anymore if we've already failed or shutdown...
 		if failed || closed {
@@ -704,13 +720,13 @@ Loop:
 					break RetryWait
 
 				// consume other events while we're waiting...
-				case e, ok := <-obj.state[vertex].eventsChan: // read from watch channel
+				case e, ok := <-state.eventsChan: // read from watch channel
 					if !ok {
 						return reterr // we only return when chan closes
 					}
 					if e != nil {
 						failed = true
-						close(obj.state[vertex].retryDone) // causes doneCtx to cancel
+						close(state.retryDone) // causes doneCtx to cancel
 						reterr = errwrap.Append(reterr, e) // permanent failure
 						break RetryWait
 					}
@@ -721,13 +737,13 @@ Loop:
 					limiter.ReserveN(time.Now(), 1) // one event
 
 				// this pause/resume block is the same as the upper main one
-				case _, ok := <-obj.state[vertex].pauseSignal:
+				case _, ok := <-state.pauseSignal:
 					if !ok {
-						obj.state[vertex].pauseSignal = nil
+						state.pauseSignal = nil
 						break RetryWait
 					}
 					select {
-					case _, ok := <-obj.state[vertex].resumeSignal: // channel closes
+					case _, ok := <-state.resumeSignal: // channel closes
 						if !ok {
 							closed = true
 						}
@@ -737,7 +753,7 @@ Loop:
 				}
 			}
 			timer.Stop() // it's nice to cleanup
 			delay = 0 // reset
-			obj.state[vertex].init.Logf("the CheckApply delay expired!")
+			state.init.Logf("the CheckApply delay expired!")
 		}
 		// don't Process anymore if we've already failed or shutdown...
 		if failed || closed {
@@ -748,7 +764,7 @@ Loop:
 			obj.Logf("Process(%s)", vertex)
 		}
 		backPoke := false
-		err = obj.Process(obj.state[vertex].doneCtx, vertex)
+		err = obj.Process(state.doneCtx, vertex)
 		if err == engine.ErrBackPoke {
 			backPoke = true
 			err = nil // for future code safety
@@ -773,7 +789,7 @@ Loop:
 		}
 		if metas.CheckApplyRetry > 0 { // don't decrement past 0
 			metas.CheckApplyRetry--
-			obj.state[vertex].init.Logf(
+			state.init.Logf(
 				"retrying CheckApply after %.4f seconds (%d left)",
 				float64(delay)/1000,
 				metas.CheckApplyRetry,
@@ -788,7 +804,7 @@ Loop:
 			// this dies. If Process fails permanently, we ask it
 			// to exit right here... (It happens when we loop...)
 			failed = true
-			close(obj.state[vertex].processDone) // causes doneCtx to cancel
+			close(state.processDone) // causes doneCtx to cancel
 			reterr = errwrap.Append(reterr, err) // permanent failure
 			continue
diff --git a/engine/graph/engine.go b/engine/graph/engine.go
index 637c0b76..e8381648 100644
--- a/engine/graph/engine.go
+++ b/engine/graph/engine.go
@@ -75,6 +75,7 @@ type Engine struct {
 	graph     *pgraph.Graph
 	nextGraph *pgraph.Graph
 	state     map[pgraph.Vertex]*State
+	tlock     *sync.RWMutex // lock around state map
 	waits     map[pgraph.Vertex]*sync.WaitGroup // wg for the Worker func
 	wlock     *sync.Mutex                       // lock around waits map
 
@@ -116,6 +117,7 @@ func (obj *Engine) Init() error {
 	}
 
 	obj.state = make(map[pgraph.Vertex]*State)
+	obj.tlock = &sync.RWMutex{}
 	obj.waits = make(map[pgraph.Vertex]*sync.WaitGroup)
 	obj.wlock = &sync.Mutex{}
 
@@ -345,7 +347,9 @@ func (obj *Engine) Commit() error {
 			// delete to free up memory from old graphs
 			fn := func() error {
+				obj.tlock.Lock()
 				delete(obj.state, vertex)
+				obj.tlock.Unlock()
 				delete(obj.waits, vertex)
 				return nil
 			}
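
For context, the pattern this patch applies throughout is: guard every lookup in the shared obj.state map with the new tlock RWMutex, and cache the looked-up *State pointer in a local state variable so later field accesses never touch the map while Commit() may be deleting from it. A minimal, self-contained sketch of the same pattern follows; the tlock/state/Commit names mirror the patch, everything else (the toy engine type, timestampOf, remove) is illustrative only and not part of the mgmt codebase.

// locking_sketch.go: RWMutex-guarded map reads, clean under `go run -race`.
package main

import (
	"fmt"
	"sync"
)

type state struct {
	mutex     sync.RWMutex // guards the fields of this one state
	timestamp int64
}

type engine struct {
	tlock sync.RWMutex      // guards the state map itself
	state map[string]*state // read by many workers, deleted from elsewhere
}

// timestampOf shows the read pattern: lock only around the map lookup,
// cache the pointer, then use the per-state mutex for field access.
func (e *engine) timestampOf(name string) int64 {
	e.tlock.RLock()
	st := e.state[name] // map read happens under the lock
	e.tlock.RUnlock()

	st.mutex.RLock() // assumes the key exists, purely for illustration
	defer st.mutex.RUnlock()
	return st.timestamp
}

// remove shows the matching write pattern used when deleting an entry.
func (e *engine) remove(name string) {
	e.tlock.Lock()
	delete(e.state, name)
	e.tlock.Unlock()
}

func main() {
	e := &engine{state: map[string]*state{"x": {timestamp: 42}}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // concurrent readers of the same map entry
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = e.timestampOf("x")
		}()
	}
	wg.Wait()
	e.remove("x")
	fmt.Println("done")
}

An RWMutex fits this shape because map reads (every Process and Worker call) vastly outnumber the single writer, the delete in Commit(), so concurrent readers never serialize against each other while still giving the race detector a visible happens-before edge.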