engine: graph: Lock around frequent read races

These are all "safe" in the sense that they never actually conflict, but the
quirks of the Go memory model still require an actual lock to avoid race
detector errors.
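
As a minimal, self-contained sketch of the pattern this commit applies (hypothetical engine/state types, not the mgmt ones): guard the shared map with an RWMutex, copy the pointer out under the lock, and then work only with the local copy. That keeps the race detector happy even when the accesses never logically overlap:

// racepattern.go: a minimal sketch, NOT mgmt code; all names are hypothetical.
package main

import "sync"

type state struct {
	timestamp int64
}

type engine struct {
	tlock  sync.RWMutex      // lock around the states map
	states map[string]*state // shared between goroutines
}

// lookup copies the pointer out under the read lock; callers then use the
// local copy instead of re-indexing the shared map on every access.
func (e *engine) lookup(name string) *state {
	e.tlock.RLock()
	s := e.states[name]
	e.tlock.RUnlock()
	return s
}

// remove is the writer that the readers above must not race with.
func (e *engine) remove(name string) {
	e.tlock.Lock()
	delete(e.states, name)
	e.tlock.Unlock()
}

func main() {
	e := &engine{states: map[string]*state{"a": {timestamp: 1}}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = e.lookup("a") // concurrent readers
		}()
	}
	e.remove("b") // concurrent map write; clean under -race thanks to the lock
	wg.Wait()
}

Without the tlock pair, the race detector flags the map read in lookup against the delete in remove, which is exactly the class of report this commit silences.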
James Shubin
2025-08-06 16:40:31 -04:00
parent 5692837175
commit 2f860be5fe
2 changed files with 91 additions and 71 deletions


@@ -52,19 +52,27 @@ func (obj *Engine) OKTimestamp(vertex pgraph.Vertex) bool {
 // BadTimestamps returns the list of vertices that are causing our timestamp to
 // be bad.
 func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex {
+obj.tlock.RLock()
+state := obj.state[vertex]
+obj.tlock.RUnlock()
 vs := []pgraph.Vertex{}
-obj.state[vertex].mutex.RLock() // concurrent read start
-ts := obj.state[vertex].timestamp // race
-obj.state[vertex].mutex.RUnlock() // concurrent read end
+state.mutex.RLock() // concurrent read start
+ts := state.timestamp // race
+state.mutex.RUnlock() // concurrent read end
 // these are all the vertices pointing TO vertex, eg: ??? -> vertex
 for _, v := range obj.graph.IncomingGraphVertices(vertex) {
+obj.tlock.RLock()
+state := obj.state[v]
+obj.tlock.RUnlock()
 // If the vertex has a greater timestamp than any prerequisite,
 // then we can't run right now. If they're equal (eg: initially
 // with a value of 0) then we also can't run because we should
 // let our pre-requisites go first.
-obj.state[v].mutex.RLock() // concurrent read start
-t := obj.state[v].timestamp // race
-obj.state[v].mutex.RUnlock() // concurrent read end
+state.mutex.RLock() // concurrent read start
+t := state.timestamp // race
+state.mutex.RUnlock() // concurrent read end
 if obj.Debug {
 obj.Logf("OKTimestamp: %d >= %d (%s): !%t", ts, t, v.String(), ts >= t)
 }
@@ -83,6 +91,10 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 return fmt.Errorf("vertex is not a Res")
 }
+obj.tlock.RLock()
+state := obj.state[vertex]
+obj.tlock.RUnlock()
 // backpoke! (can be async)
 if vs := obj.BadTimestamps(vertex); len(vs) > 0 {
 // back poke in parallel (sync b/c of waitgroup)
@@ -266,7 +278,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 // Check cached state, to skip CheckApply, but can't skip if refreshing!
 // If the resource doesn't implement refresh, skip the refresh test.
 // FIXME: if desired, check that we pass through refresh notifications!
-if (!refresh || !isRefreshableRes) && obj.state[vertex].isStateOK.Load() { // mutex RLock/RUnlock
+if (!refresh || !isRefreshableRes) && state.isStateOK.Load() { // mutex RLock/RUnlock
 checkOK, err = true, nil
 } else if noop && (refresh && isRefreshableRes) { // had a refresh to do w/ noop!
@@ -303,7 +315,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 }
 if !checkOK { // something changed, restart timer
-obj.state[vertex].cuid.ResetTimer() // activity!
+state.cuid.ResetTimer() // activity!
 if obj.Debug {
 obj.Logf("%s: converger: reset timer", res)
 }
@@ -311,10 +323,10 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 // if CheckApply ran without noop and without error, state should be good
 if !noop && err == nil { // aka !noop || checkOK
-obj.state[vertex].tuid.StartTimer()
-//obj.state[vertex].mutex.Lock()
-obj.state[vertex].isStateOK.Store(true) // reset
-//obj.state[vertex].mutex.Unlock()
+state.tuid.StartTimer()
+//state.mutex.Lock()
+state.isStateOK.Store(true) // reset
+//state.mutex.Unlock()
 if refresh {
 obj.SetUpstreamRefresh(vertex, false) // refresh happened, clear the request
 if isRefreshableRes {
@@ -351,9 +363,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
 wg := &sync.WaitGroup{}
 // update this timestamp *before* we poke or the poked
 // nodes might fail due to having a too old timestamp!
-obj.state[vertex].mutex.Lock() // concurrent write start
-obj.state[vertex].timestamp = time.Now().UnixNano() // update timestamp (race)
-obj.state[vertex].mutex.Unlock() // concurrent write end
+state.mutex.Lock() // concurrent write start
+state.timestamp = time.Now().UnixNano() // update timestamp (race)
+state.mutex.Unlock() // concurrent write end
 for _, v := range obj.graph.OutgoingGraphVertices(vertex) {
 if !obj.OKTimestamp(v) {
 // there is at least another one that will poke this...
@@ -394,6 +406,10 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 return fmt.Errorf("vertex is not a resource")
 }
+obj.tlock.RLock()
+state := obj.state[vertex]
+obj.tlock.RUnlock()
 // bonus safety check
 if res.MetaParams().Burst == 0 && !(res.MetaParams().Limit == rate.Inf) { // blocked
 return fmt.Errorf("permanently limited (rate != Inf, burst = 0)")
@@ -419,42 +435,42 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 obj.mlock.Unlock()
 }
-//defer close(obj.state[vertex].stopped) // done signal
-obj.state[vertex].cuid = obj.Converger.Register()
-obj.state[vertex].tuid = obj.Converger.Register()
+//defer close(state.stopped) // done signal
+state.cuid = obj.Converger.Register() // XXX RACE READ
+state.tuid = obj.Converger.Register()
 // must wait for all users of the cuid to finish *before* we unregister!
 // as a result, this defer happens *before* the below wait group Wait...
-defer obj.state[vertex].cuid.Unregister()
-defer obj.state[vertex].tuid.Unregister()
-defer obj.state[vertex].wg.Wait() // this Worker is the last to exit!
-obj.state[vertex].wg.Add(1)
+defer state.cuid.Unregister()
+defer state.tuid.Unregister()
+defer state.wg.Wait() // this Worker is the last to exit!
+state.wg.Add(1)
 go func() {
-defer obj.state[vertex].wg.Done()
-defer close(obj.state[vertex].eventsChan) // we close this on behalf of res
+defer state.wg.Done()
+defer close(state.eventsChan) // we close this on behalf of res
 // This is a close reverse-multiplexer. If any of the channels
 // close, then it will cause the doneCtx to cancel. That way,
 // multiple different folks can send a close signal, without
 // every worrying about duplicate channel close panics.
-obj.state[vertex].wg.Add(1)
+state.wg.Add(1)
 go func() {
-defer obj.state[vertex].wg.Done()
+defer state.wg.Done()
 // reverse-multiplexer: any close, causes *the* close!
 select {
-case <-obj.state[vertex].processDone:
-case <-obj.state[vertex].watchDone:
-case <-obj.state[vertex].limitDone:
-case <-obj.state[vertex].retryDone:
-case <-obj.state[vertex].removeDone:
-case <-obj.state[vertex].eventsDone:
+case <-state.processDone:
+case <-state.watchDone:
+case <-state.limitDone:
+case <-state.retryDone:
+case <-state.removeDone:
+case <-state.eventsDone:
 }
 // the main "done" signal gets activated here!
-obj.state[vertex].doneCtxCancel() // cancels doneCtx
+state.doneCtxCancel() // cancels doneCtx
 }()
 var err error
@@ -466,14 +482,14 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 errDelayExpired := engine.Error("delay exit")
 err = func() error { // slim watch main loop
 timer := time.NewTimer(time.Duration(delay) * time.Millisecond)
-defer obj.state[vertex].init.Logf("the Watch delay expired!")
+defer state.init.Logf("the Watch delay expired!")
 defer timer.Stop() // it's nice to cleanup
 for {
 select {
 case <-timer.C: // the wait is over
 return errDelayExpired // special
-case <-obj.state[vertex].doneCtx.Done():
+case <-state.doneCtx.Done():
 return nil
 }
 }
@@ -488,21 +504,21 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 if obj.Debug {
 obj.Logf("%s: Hidden", res)
 }
-obj.state[vertex].cuid.StartTimer() // TODO: Should we do this?
-err = obj.state[vertex].hidden(obj.state[vertex].doneCtx)
-obj.state[vertex].cuid.StopTimer() // TODO: Should we do this?
+state.cuid.StartTimer() // TODO: Should we do this?
+err = state.hidden(state.doneCtx)
+state.cuid.StopTimer() // TODO: Should we do this?
 } else if interval := res.MetaParams().Poll; interval > 0 { // poll instead of watching :(
-obj.state[vertex].cuid.StartTimer()
-err = obj.state[vertex].poll(obj.state[vertex].doneCtx, interval)
-obj.state[vertex].cuid.StopTimer() // clean up nicely
+state.cuid.StartTimer()
+err = state.poll(state.doneCtx, interval)
+state.cuid.StopTimer() // clean up nicely
 } else {
-obj.state[vertex].cuid.StartTimer()
+state.cuid.StartTimer()
 if obj.Debug {
 obj.Logf("%s: Watch...", vertex)
 }
-err = res.Watch(obj.state[vertex].doneCtx) // run the watch normally
+err = res.Watch(state.doneCtx) // run the watch normally
 if obj.Debug {
 if s := engineUtil.CleanError(err); err != nil {
 obj.Logf("%s: Watch Error: %s", vertex, s)
@@ -510,7 +526,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 obj.Logf("%s: Watch Exited...", vertex)
 }
 }
-obj.state[vertex].cuid.StopTimer() // clean up nicely
+state.cuid.StopTimer() // clean up nicely
 }
 if err == nil { // || err == engine.ErrClosed
 return // exited cleanly, we're done
@@ -523,7 +539,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 }
 if retry > 0 { // don't decrement past 0
 retry--
-obj.state[vertex].init.Logf("retrying Watch after %.4f seconds (%d left)", float64(delay)/1000, retry)
+state.init.Logf("retrying Watch after %.4f seconds (%d left)", float64(delay)/1000, retry)
 continue
 }
 //if retry == 0 { // optional
@@ -536,14 +552,14 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
 // If the CheckApply loop exits and THEN the Watch fails with an
 // error, then we'd be stuck here if exit signal didn't unblock!
 select {
-case obj.state[vertex].eventsChan <- errwrap.Wrapf(err, "watch failed"):
+case state.eventsChan <- errwrap.Wrapf(err, "watch failed"):
 // send
 }
 }()
 // If this exits cleanly, we must unblock the reverse-multiplexer.
 // I think this additional close is unnecessary, but it's not harmful.
-defer close(obj.state[vertex].eventsDone) // causes doneCtx to cancel
+defer close(state.eventsDone) // causes doneCtx to cancel
 limiter := rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst)
 var reserv *rate.Reservation
 var reterr error
@@ -557,7 +573,7 @@ Loop:
 // This select is also the main event receiver and is also the
 // only place where we read from the poke channel.
 select {
-case err, ok := <-obj.state[vertex].eventsChan: // read from watch channel
+case err, ok := <-state.eventsChan: // read from watch channel
 if !ok {
 return reterr // we only return when chan closes
 }
@@ -566,7 +582,7 @@ Loop:
 // we then save so we can return it to the caller of us.
 if err != nil {
 failed = true
-close(obj.state[vertex].watchDone) // causes doneCtx to cancel
+close(state.watchDone) // causes doneCtx to cancel
 reterr = errwrap.Append(reterr, err) // permanent failure
 continue
 }
@@ -576,7 +592,7 @@ Loop:
 reserv = limiter.ReserveN(time.Now(), 1) // one event
 // reserv.OK() seems to always be true here!
-case _, ok := <-obj.state[vertex].pokeChan: // read from buffered poke channel
+case _, ok := <-state.pokeChan: // read from buffered poke channel
 if !ok { // we never close it
 panic("unexpected close of poke channel")
 }
@@ -585,9 +601,9 @@ Loop:
 }
 reserv = nil // we didn't receive a real event here...
-case _, ok := <-obj.state[vertex].pauseSignal: // one message
+case _, ok := <-state.pauseSignal: // one message
 if !ok {
-obj.state[vertex].pauseSignal = nil
+state.pauseSignal = nil
 continue // this is not a new pause message
 }
 // NOTE: If we allowed a doneCtx below to let us out
@@ -599,7 +615,7 @@ Loop:
 // we are paused now, and waiting for resume or exit...
 select {
-case _, ok := <-obj.state[vertex].resumeSignal: // channel closes
+case _, ok := <-state.resumeSignal: // channel closes
 if !ok {
 closed = true
 }
@@ -614,9 +630,9 @@ Loop:
 }
 // drop redundant pokes
-for len(obj.state[vertex].pokeChan) > 0 {
+for len(state.pokeChan) > 0 {
 select {
-case <-obj.state[vertex].pokeChan:
+case <-state.pokeChan:
 default:
 // race, someone else read one!
 }
@@ -633,7 +649,7 @@ Loop:
 d = reserv.DelayFrom(time.Now())
 }
 if reserv != nil && d > 0 { // delay
-obj.state[vertex].init.Logf("limited (rate: %v/sec, burst: %d, next: %dms)", res.MetaParams().Limit, res.MetaParams().Burst, d/time.Millisecond)
+state.init.Logf("limited (rate: %v/sec, burst: %d, next: %dms)", res.MetaParams().Limit, res.MetaParams().Burst, d/time.Millisecond)
 timer := time.NewTimer(time.Duration(d) * time.Millisecond)
 LimitWait:
 for {
@@ -645,13 +661,13 @@ Loop:
 break LimitWait
 // consume other events while we're waiting...
-case e, ok := <-obj.state[vertex].eventsChan: // read from watch channel
+case e, ok := <-state.eventsChan: // read from watch channel
 if !ok {
 return reterr // we only return when chan closes
 }
 if e != nil {
 failed = true
-close(obj.state[vertex].limitDone) // causes doneCtx to cancel
+close(state.limitDone) // causes doneCtx to cancel
 reterr = errwrap.Append(reterr, e) // permanent failure
 break LimitWait
 }
@@ -662,13 +678,13 @@ Loop:
 limiter.ReserveN(time.Now(), 1) // one event
 // this pause/resume block is the same as the upper main one
-case _, ok := <-obj.state[vertex].pauseSignal:
+case _, ok := <-state.pauseSignal:
 if !ok {
-obj.state[vertex].pauseSignal = nil
+state.pauseSignal = nil
 break LimitWait
 }
 select {
-case _, ok := <-obj.state[vertex].resumeSignal: // channel closes
+case _, ok := <-state.resumeSignal: // channel closes
 if !ok {
 closed = true
 }
@@ -677,7 +693,7 @@ Loop:
 }
 }
 timer.Stop() // it's nice to cleanup
-obj.state[vertex].init.Logf("rate limiting expired!")
+state.init.Logf("rate limiting expired!")
 }
 // don't Process anymore if we've already failed or shutdown...
 if failed || closed {
@@ -704,13 +720,13 @@ Loop:
 break RetryWait
 // consume other events while we're waiting...
-case e, ok := <-obj.state[vertex].eventsChan: // read from watch channel
+case e, ok := <-state.eventsChan: // read from watch channel
 if !ok {
 return reterr // we only return when chan closes
 }
 if e != nil {
 failed = true
-close(obj.state[vertex].retryDone) // causes doneCtx to cancel
+close(state.retryDone) // causes doneCtx to cancel
 reterr = errwrap.Append(reterr, e) // permanent failure
 break RetryWait
 }
@@ -721,13 +737,13 @@ Loop:
 limiter.ReserveN(time.Now(), 1) // one event
 // this pause/resume block is the same as the upper main one
-case _, ok := <-obj.state[vertex].pauseSignal:
+case _, ok := <-state.pauseSignal:
 if !ok {
-obj.state[vertex].pauseSignal = nil
+state.pauseSignal = nil
 break RetryWait
 }
 select {
-case _, ok := <-obj.state[vertex].resumeSignal: // channel closes
+case _, ok := <-state.resumeSignal: // channel closes
 if !ok {
 closed = true
 }
@@ -737,7 +753,7 @@ Loop:
 }
 timer.Stop() // it's nice to cleanup
 delay = 0 // reset
-obj.state[vertex].init.Logf("the CheckApply delay expired!")
+state.init.Logf("the CheckApply delay expired!")
 }
 // don't Process anymore if we've already failed or shutdown...
 if failed || closed {
@@ -748,7 +764,7 @@ Loop:
 obj.Logf("Process(%s)", vertex)
 }
 backPoke := false
-err = obj.Process(obj.state[vertex].doneCtx, vertex)
+err = obj.Process(state.doneCtx, vertex)
 if err == engine.ErrBackPoke {
 backPoke = true
 err = nil // for future code safety
@@ -773,7 +789,7 @@ Loop:
 }
 if metas.CheckApplyRetry > 0 { // don't decrement past 0
 metas.CheckApplyRetry--
-obj.state[vertex].init.Logf(
+state.init.Logf(
 "retrying CheckApply after %.4f seconds (%d left)",
 float64(delay)/1000,
 metas.CheckApplyRetry,
@@ -788,7 +804,7 @@ Loop:
 // this dies. If Process fails permanently, we ask it
 // to exit right here... (It happens when we loop...)
 failed = true
-close(obj.state[vertex].processDone) // causes doneCtx to cancel
+close(state.processDone) // causes doneCtx to cancel
 reterr = errwrap.Append(reterr, err) // permanent failure
 continue
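
As an aside, the "close reverse-multiplexer" described in the Worker hunk above can be sketched on its own (hypothetical channel names, not the mgmt ones): several independent done channels are each closed once by whoever owns them, and whichever closes first cancels one shared context, so no goroutine ever risks a duplicate close on a common channel.

// reversemux.go: a standalone sketch of the close reverse-multiplexer, NOT mgmt code.
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Each of these is closed by exactly one owner, so there is no risk
	// of a duplicate-close panic on a shared channel.
	processDone := make(chan struct{})
	watchDone := make(chan struct{})
	eventsDone := make(chan struct{})

	go func() {
		// reverse-multiplexer: any close causes *the* cancel!
		select {
		case <-processDone:
		case <-watchDone:
		case <-eventsDone:
		}
		cancel() // the single shared "done" signal
	}()

	close(watchDone) // simulate one of the owners shutting down first

	<-ctx.Done() // everyone else just waits on the shared context
	fmt.Println("shared context cancelled:", ctx.Err())
}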


@@ -75,6 +75,7 @@ type Engine struct {
 graph *pgraph.Graph
 nextGraph *pgraph.Graph
 state map[pgraph.Vertex]*State
+tlock *sync.RWMutex // lock around state map
 waits map[pgraph.Vertex]*sync.WaitGroup // wg for the Worker func
 wlock *sync.Mutex // lock around waits map
@@ -116,6 +117,7 @@ func (obj *Engine) Init() error {
 }
 obj.state = make(map[pgraph.Vertex]*State)
+obj.tlock = &sync.RWMutex{}
 obj.waits = make(map[pgraph.Vertex]*sync.WaitGroup)
 obj.wlock = &sync.Mutex{}
@@ -345,7 +347,9 @@ func (obj *Engine) Commit() error {
 // delete to free up memory from old graphs
 fn := func() error {
+obj.tlock.Lock()
 delete(obj.state, vertex)
+obj.tlock.Unlock()
 delete(obj.waits, vertex)
 return nil
 }