From 95a1c6e7fb0a094dbc9f61d417f4aee1fd191358 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Thu, 9 Mar 2017 01:14:39 -0500 Subject: [PATCH] pgraph, resources: Discard BackPokes during pause and resume This prevents some nasty races where a BackPoke could arrive on a paused vertex either during a resume or pause operation. Previously we might also have poked an excessive number of resources on resume. The solution was to discard BackPokes during pause or resume. On pause, they can be discarded because we've asked the graph to quiesce, and any further work can be done on resume, and on resume we ignore them because this should only happen during the unrolling (reverse topological resume of the graph) and at the end of this the indegree == 0 vertices will initiate a series of pokes which should deal with any BackPoke that was possibly discarded. One other aspect of this which is important: if an indegree == 0 vertex is poked (Process runs) but it's already in the correct state, it should still transmit the Poke through itself so that subsequent vertices know to run. Currently this is done correctly in Process(). I'm a bit ashamed that this wasn't done properly in the engine earlier, but I suppose that's what comes out of running fancier graphs and really thinking in detail about what's truly correct. Hopefully I got it right this time! --- pgraph/actions.go | 24 +++++++++++++++---- resources/resources.go | 9 ++++---- resources/sendrecv.go | 52 ++++++++++++++++++++++++++---------------- 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/pgraph/actions.go b/pgraph/actions.go index 576daaa0..76837283 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -652,9 +652,16 @@ func (g *Graph) Start(first bool) { // start or continue log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) t, _ := g.TopologicalSort() - // TODO: only calculate indegree if `first` is true to save resources indegree := g.InDegree() // compute all of the indegree's - for _, v := range Reverse(t) { + reversed := Reverse(t) + for _, v := range reversed { // run the Setup() for everyone first + if !v.Res.IsWorking() { // if Worker() is not running... + v.Res.Setup() // initialize some vars in the resource + } + } + + // run through the topological reverse, and start or unpause each vertex + for _, v := range reversed { // selective poke: here we reduce the number of initial pokes // to the minimum required to activate every vertex in the // graph, either by direct action, or by getting poked by a @@ -669,10 +676,17 @@ func (g *Graph) Start(first bool) { // start or continue // and not just selectively the subset with no indegree. // let the startup code know to poke or not - v.Res.Starter((!first) || indegree[v] == 0) + // this triggers a CheckApply AFTER Watch is Running() + // We *don't* need to also do this to new nodes or nodes that + // are about to get unpaused, because they'll get poked by one + // of the indegree == 0 vertices, and an important aspect of the + // Process() function is that even if the state is correct, it + // will pass through the Poke so that it flows through the DAG. + v.Res.Starter(indegree[v] == 0) + var unpause = true if !v.Res.IsWorking() { // if Worker() is not running... - v.Res.Setup() + unpause = false // doesn't need unpausing on first start g.wg.Add(1) // must pass in value to avoid races... // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ @@ -697,7 +711,7 @@ func (g *Graph) Start(first bool) { // start or continue // if the resource Init() fails, we don't hang! } - if !first { // unpause! + if unpause { // unpause (if needed) v.Res.SendEvent(event.EventStart, nil) // sync! } } diff --git a/resources/resources.go b/resources/resources.go index 82e64a63..e28d9746 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -339,10 +339,6 @@ func (obj *BaseRes) Init() error { obj.wcuid = obj.Converger().Register() // get a cuid for the worker! obj.pcuid = obj.Converger().Register() // get a cuid for the process - obj.eventsLock = &sync.Mutex{} - obj.eventsDone = false - obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events - obj.processLock = &sync.Mutex{} // lock around processChan closing and sending obj.processDone = false // did we close processChan ? obj.processChan = make(chan *event.Event) @@ -448,7 +444,10 @@ func (obj *BaseRes) WaitGroup() *sync.WaitGroup { return obj.waitGroup } func (obj *BaseRes) Setup() { obj.started = make(chan struct{}) // closes when started obj.stopped = make(chan struct{}) // closes when stopped - return + + obj.eventsLock = &sync.Mutex{} + obj.eventsDone = false + obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events } // Reset from Setup. diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 219df27c..f482213c 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -100,27 +100,39 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { obj.quiescing = false // reset ev.ACK() - // wait for next event to continue - select { - case e, ok := <-obj.Events(): - if !ok { // shutdown - err := error(nil) - return &err, false + // wait for next event to continue, but discard any backpoking! + for { + // Consider a graph (V2->V3). If while paused, we add a + // new resource (V1->V2), when we unpause, V3 will run, + // and then V2 followed by V1 (reverse topo sort) which + // can cause V2 to BackPoke to V1 (since V1 needs to go + // first) which can panic if V1 is not running yet! The + // solution is to ignore the BackPoke because once that + // V1 vertex gets running, it will then send off a poke + // to V2 that it did without the need for the BackPoke! + select { + case e, ok := <-obj.Events(): + if !ok { // shutdown + err := error(nil) + return &err, false + } + //obj.quiescing = true + //obj.quiesceGroup.Wait() // unnecessary, but symmetrically correct + //obj.quiescing = false + e.ACK() + err := e.Error() + if e.Kind == event.EventExit { + return &err, false + } else if e.Kind == event.EventStart { // eventContinue + return nil, false // don't poke on unpause! + } else if e.Kind == event.EventBackPoke { + continue // silently discard this event while paused + } + // if we get a poke event here, it's a bug! + err = fmt.Errorf("%s[%s]: unknown event: %v, while paused", obj.Kind(), obj.GetName(), e) + panic(err) // TODO: return a special sentinel instead? + //return &err, false } - //obj.quiescing = true - //obj.quiesceGroup.Wait() // unnecessary, but symmetrically correct - //obj.quiescing = false - e.ACK() - err := e.Error() - if e.Kind == event.EventExit { - return &err, false - } else if e.Kind == event.EventStart { // eventContinue - return nil, false // don't poke on unpause! - } - // if we get a poke event here, it's a bug! - err = fmt.Errorf("%s[%s]: unknown event: %v, while paused", obj.Kind(), obj.GetName(), e) - panic(err) // TODO: return a special sentinel instead? - //return &err, false } } err = fmt.Errorf("unknown event: %v", ev)