diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 58c8313c..f7f70417 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -165,7 +165,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error { // Check cached state, to skip CheckApply, but can't skip if refreshing! // If the resource doesn't implement refresh, skip the refresh test. // FIXME: if desired, check that we pass through refresh notifications! - if (!refresh || !isRefreshableRes) && obj.state[vertex].isStateOK { + if (!refresh || !isRefreshableRes) && obj.state[vertex].isStateOK.Load() { // mutex RLock/RUnlock checkOK, err = true, nil } else if noop && (refresh && isRefreshableRes) { // had a refresh to do w/ noop! @@ -193,7 +193,9 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error { // if CheckApply ran without noop and without error, state should be good if !noop && err == nil { // aka !noop || checkOK obj.state[vertex].tuid.StartTimer() - obj.state[vertex].isStateOK = true // reset + //obj.state[vertex].mutex.Lock() + obj.state[vertex].isStateOK.Store(true) // reset + //obj.state[vertex].mutex.Unlock() if refresh { obj.SetUpstreamRefresh(vertex, false) // refresh happened, clear the request if isRefreshableRes { diff --git a/engine/graph/reverse.go b/engine/graph/reverse.go index d5a832cc..2dde9d55 100644 --- a/engine/graph/reverse.go +++ b/engine/graph/reverse.go @@ -246,7 +246,7 @@ func (obj *State) ReversalCleanup() error { return nil // nothing to erase, we're not a reversal resource } - if !obj.isStateOK { // did we successfully reverse? + if !obj.isStateOK.Load() { // did we successfully reverse? (mutex RLock/RUnlock) obj.Logf("did not complete reversal") // warn return nil } diff --git a/engine/graph/state.go b/engine/graph/state.go index 324e0b9e..2a5275e6 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/purpleidea/mgmt/converger" @@ -60,9 +61,9 @@ type State struct { // Logf is the logging function that should be used to display messages. Logf func(format string, v ...interface{}) - timestamp int64 // last updated timestamp - isStateOK bool // is state OK or do we need to run CheckApply ? - workerErr error // did the Worker error? + timestamp int64 // last updated timestamp + isStateOK *atomic.Bool // is state OK or do we need to run CheckApply ? + workerErr error // did the Worker error? mutex *sync.RWMutex // used for editing state properties @@ -145,6 +146,8 @@ func (obj *State) Init() error { return fmt.Errorf("the Logf function is missing") } + obj.isStateOK = &atomic.Bool{} + obj.mutex = &sync.RWMutex{} obj.doneCtx, obj.doneCtxCancel = context.WithCancel(context.Background()) @@ -390,9 +393,9 @@ func (obj *State) event() { // CheckApply will have some work to do in order to converge it. func (obj *State) setDirty() { obj.tuid.StopTimer() - obj.mutex.Lock() - obj.isStateOK = false // concurrent write - obj.mutex.Unlock() + //obj.mutex.Lock() + obj.isStateOK.Store(false) // concurrent write + //obj.mutex.Unlock() } // poll is a replacement for Watch when the Poll metaparameter is used.