engine: graph: Avoid race on fast pause variable
This code is basically unused, but let's keep it in for now in case we eventually replace it with some contextual ctx code instead.
This commit is contained in:
@@ -376,7 +376,7 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
|
|||||||
// so that the graph doesn't go on running forever until
|
// so that the graph doesn't go on running forever until
|
||||||
// it's completely done. This is an optional feature and
|
// it's completely done. This is an optional feature and
|
||||||
// we can select it via ^C on user exit or via the GAPI.
|
// we can select it via ^C on user exit or via the GAPI.
|
||||||
if obj.fastPause {
|
if obj.fastPause.Load() {
|
||||||
obj.Logf("%s: fast pausing, poke skipped", res)
|
obj.Logf("%s: fast pausing, poke skipped", res)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/converger"
|
"github.com/purpleidea/mgmt/converger"
|
||||||
"github.com/purpleidea/mgmt/engine"
|
"github.com/purpleidea/mgmt/engine"
|
||||||
@@ -88,7 +89,7 @@ type Engine struct {
|
|||||||
wg *sync.WaitGroup // wg for the whole engine (only used for close)
|
wg *sync.WaitGroup // wg for the whole engine (only used for close)
|
||||||
|
|
||||||
paused bool // are we paused?
|
paused bool // are we paused?
|
||||||
fastPause bool
|
fastPause *atomic.Bool
|
||||||
isClosing bool // are we shutting down?
|
isClosing bool // are we shutting down?
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,6 +131,7 @@ func (obj *Engine) Init() error {
|
|||||||
obj.wg = &sync.WaitGroup{}
|
obj.wg = &sync.WaitGroup{}
|
||||||
|
|
||||||
obj.paused = true // start off true, so we can Resume after first Commit
|
obj.paused = true // start off true, so we can Resume after first Commit
|
||||||
|
obj.fastPause = &atomic.Bool{}
|
||||||
|
|
||||||
obj.Exporter = &Exporter{
|
obj.Exporter = &Exporter{
|
||||||
World: obj.World,
|
World: obj.World,
|
||||||
@@ -502,7 +504,7 @@ func (obj *Engine) Resume() error {
|
|||||||
// poke. In general this is only called when you're trying to hurry up the exit.
|
// poke. In general this is only called when you're trying to hurry up the exit.
|
||||||
// XXX: Not implemented
|
// XXX: Not implemented
|
||||||
func (obj *Engine) SetFastPause() {
|
func (obj *Engine) SetFastPause() {
|
||||||
obj.fastPause = true
|
obj.fastPause.Store(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pause the active, running graph.
|
// Pause the active, running graph.
|
||||||
@@ -515,7 +517,7 @@ func (obj *Engine) Pause(fastPause bool) error {
|
|||||||
return fmt.Errorf("already paused")
|
return fmt.Errorf("already paused")
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.fastPause = fastPause
|
obj.fastPause.Store(fastPause)
|
||||||
topoSort, _ := obj.graph.TopologicalSort()
|
topoSort, _ := obj.graph.TopologicalSort()
|
||||||
for _, vertex := range topoSort { // squeeze out the events...
|
for _, vertex := range topoSort { // squeeze out the events...
|
||||||
// The Event is sent to an unbuffered channel, so this event is
|
// The Event is sent to an unbuffered channel, so this event is
|
||||||
@@ -528,7 +530,7 @@ func (obj *Engine) Pause(fastPause bool) error {
|
|||||||
obj.paused = true
|
obj.paused = true
|
||||||
|
|
||||||
// we are now completely paused...
|
// we are now completely paused...
|
||||||
obj.fastPause = false // reset
|
obj.fastPause.Store(false) // reset
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user