pgraph: Add fast pausing and exiting of graphs

This causes a graph to actually stop processing part way through, even
if there are poke's that want to continue on. This is so that the user
experience of pressing ^C actually causes a shutdown without finishing
the graph execution. It might be preferred to have this be a user
defined setting at some point in the future, such as if the user presses
^C twice. As well, we might want to implement an interrupt API so that
individual resource execution can be asked to bail out early if
requested. This could happen on a third ^C press.
This commit is contained in:
James Shubin
2017-03-13 07:44:01 -04:00
parent 074da4da19
commit cd5e2e1148
7 changed files with 188 additions and 3 deletions

View File

@@ -65,6 +65,14 @@ func (g *Graph) OKTimestamp(v *Vertex) bool {
// Poke tells nodes after me in the dependency graph that they need to refresh.
func (g *Graph) Poke(v *Vertex) error {
// if we're pausing (or exiting) then we should suspend poke's so that
// the graph doesn't go on running forever until it's completely done!
// this is an optional feature which we can do by default on user exit
if g.fastPause {
return nil // TODO: should this be an error instead?
}
var wg sync.WaitGroup
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
for _, n := range g.OutgoingGraphVertices(v) {
@@ -725,14 +733,20 @@ func (g *Graph) Start(first bool) { // start or continue
// we wait for everyone to start before exiting!
}
// Pause sends pause events to the graph in a topological sort order.
func (g *Graph) Pause() {
// Pause sends pause events to the graph in a topological sort order. If you set
// the fastPause argument to true, then it will ask future propagation waves to
// not run through the graph before exiting, and instead will exit much quicker.
func (g *Graph) Pause(fastPause bool) {
log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState())
defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState())
if fastPause {
g.fastPause = true // set flag
}
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
v.SendEvent(event.EventPause, nil) // sync
}
g.fastPause = false // reset flag
}
// Exit sends exit events to the graph in a topological sort order.
@@ -740,6 +754,10 @@ func (g *Graph) Exit() {
if g == nil { // empty graph that wasn't populated yet
return
}
// FIXME: a second ^C could put this into fast pause, but do it for now!
g.Pause(true) // implement this with pause to avoid duplicating the code
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
// turn off the taps...

View File

@@ -58,6 +58,7 @@ type Graph struct {
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
Flags Flags
state graphState
fastPause bool // used to disable pokes for a fast pause
mutex *sync.Mutex // used when modifying graph State variable
wg *sync.WaitGroup
semas map[string]*semaphore.Semaphore
@@ -129,6 +130,7 @@ func (g *Graph) Copy() *Graph {
wg: g.wg,
semas: g.semas,
slock: g.slock,
fastPause: g.fastPause,
prometheus: g.prometheus,
}