engine: graph: Use an atomic bool instead of a mutex
The isStateOK variable can be accessed concurrently as these are supposed to be "benign" races. As such, they need to be labelled as such so that we don't hit some undefined compiler behaviour. Here are five good references relating to "benign" data races in golang. 1) https://web.archive.org/web/20181022150257/https://software.intel.com/en-us/blogs/2013/01/06/benign-data-races-what-could-possibly-go-wrong 2) https://go.dev/ref/mem - "Informal Overview" section. 3) https://docs.oracle.com/cd/E19205-01/820-0619/gecqt/index.html 4) https://www.usenix.org/legacy/event/hotpar11/tech/final_files/Boehm.pdf 5) https://go.dev/doc/articles/race_detector TL;DR: wrap your benign races with sync/atomic or eliminate them.
This commit is contained in:
@@ -165,7 +165,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
|
|||||||
// Check cached state, to skip CheckApply, but can't skip if refreshing!
|
// Check cached state, to skip CheckApply, but can't skip if refreshing!
|
||||||
// If the resource doesn't implement refresh, skip the refresh test.
|
// If the resource doesn't implement refresh, skip the refresh test.
|
||||||
// FIXME: if desired, check that we pass through refresh notifications!
|
// FIXME: if desired, check that we pass through refresh notifications!
|
||||||
if (!refresh || !isRefreshableRes) && obj.state[vertex].isStateOK {
|
if (!refresh || !isRefreshableRes) && obj.state[vertex].isStateOK.Load() { // mutex RLock/RUnlock
|
||||||
checkOK, err = true, nil
|
checkOK, err = true, nil
|
||||||
|
|
||||||
} else if noop && (refresh && isRefreshableRes) { // had a refresh to do w/ noop!
|
} else if noop && (refresh && isRefreshableRes) { // had a refresh to do w/ noop!
|
||||||
@@ -193,7 +193,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
|
|||||||
// if CheckApply ran without noop and without error, state should be good
|
// if CheckApply ran without noop and without error, state should be good
|
||||||
if !noop && err == nil { // aka !noop || checkOK
|
if !noop && err == nil { // aka !noop || checkOK
|
||||||
obj.state[vertex].tuid.StartTimer()
|
obj.state[vertex].tuid.StartTimer()
|
||||||
obj.state[vertex].isStateOK = true // reset
|
//obj.state[vertex].mutex.Lock()
|
||||||
|
obj.state[vertex].isStateOK.Store(true) // reset
|
||||||
|
//obj.state[vertex].mutex.Unlock()
|
||||||
if refresh {
|
if refresh {
|
||||||
obj.SetUpstreamRefresh(vertex, false) // refresh happened, clear the request
|
obj.SetUpstreamRefresh(vertex, false) // refresh happened, clear the request
|
||||||
if isRefreshableRes {
|
if isRefreshableRes {
|
||||||
|
|||||||
@@ -246,7 +246,7 @@ func (obj *State) ReversalCleanup() error {
|
|||||||
return nil // nothing to erase, we're not a reversal resource
|
return nil // nothing to erase, we're not a reversal resource
|
||||||
}
|
}
|
||||||
|
|
||||||
if !obj.isStateOK { // did we successfully reverse?
|
if !obj.isStateOK.Load() { // did we successfully reverse? (mutex RLock/RUnlock)
|
||||||
obj.Logf("did not complete reversal") // warn
|
obj.Logf("did not complete reversal") // warn
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/converger"
|
"github.com/purpleidea/mgmt/converger"
|
||||||
@@ -60,9 +61,9 @@ type State struct {
|
|||||||
// Logf is the logging function that should be used to display messages.
|
// Logf is the logging function that should be used to display messages.
|
||||||
Logf func(format string, v ...interface{})
|
Logf func(format string, v ...interface{})
|
||||||
|
|
||||||
timestamp int64 // last updated timestamp
|
timestamp int64 // last updated timestamp
|
||||||
isStateOK bool // is state OK or do we need to run CheckApply ?
|
isStateOK *atomic.Bool // is state OK or do we need to run CheckApply ?
|
||||||
workerErr error // did the Worker error?
|
workerErr error // did the Worker error?
|
||||||
|
|
||||||
mutex *sync.RWMutex // used for editing state properties
|
mutex *sync.RWMutex // used for editing state properties
|
||||||
|
|
||||||
@@ -145,6 +146,8 @@ func (obj *State) Init() error {
|
|||||||
return fmt.Errorf("the Logf function is missing")
|
return fmt.Errorf("the Logf function is missing")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
obj.isStateOK = &atomic.Bool{}
|
||||||
|
|
||||||
obj.mutex = &sync.RWMutex{}
|
obj.mutex = &sync.RWMutex{}
|
||||||
obj.doneCtx, obj.doneCtxCancel = context.WithCancel(context.Background())
|
obj.doneCtx, obj.doneCtxCancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
@@ -390,9 +393,9 @@ func (obj *State) event() {
|
|||||||
// CheckApply will have some work to do in order to converge it.
|
// CheckApply will have some work to do in order to converge it.
|
||||||
func (obj *State) setDirty() {
|
func (obj *State) setDirty() {
|
||||||
obj.tuid.StopTimer()
|
obj.tuid.StopTimer()
|
||||||
obj.mutex.Lock()
|
//obj.mutex.Lock()
|
||||||
obj.isStateOK = false // concurrent write
|
obj.isStateOK.Store(false) // concurrent write
|
||||||
obj.mutex.Unlock()
|
//obj.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// poll is a replacement for Watch when the Poll metaparameter is used.
|
// poll is a replacement for Watch when the Poll metaparameter is used.
|
||||||
|
|||||||
Reference in New Issue
Block a user