diff --git a/pgraph/actions.go b/pgraph/actions.go index 576daaa0..76837283 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -652,9 +652,16 @@ func (g *Graph) Start(first bool) { // start or continue log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) t, _ := g.TopologicalSort() - // TODO: only calculate indegree if `first` is true to save resources indegree := g.InDegree() // compute all of the indegree's - for _, v := range Reverse(t) { + reversed := Reverse(t) + for _, v := range reversed { // run the Setup() for everyone first + if !v.Res.IsWorking() { // if Worker() is not running... + v.Res.Setup() // initialize some vars in the resource + } + } + + // run through the topological reverse, and start or unpause each vertex + for _, v := range reversed { // selective poke: here we reduce the number of initial pokes // to the minimum required to activate every vertex in the // graph, either by direct action, or by getting poked by a @@ -669,10 +676,17 @@ func (g *Graph) Start(first bool) { // start or continue // and not just selectively the subset with no indegree. // let the startup code know to poke or not - v.Res.Starter((!first) || indegree[v] == 0) + // this triggers a CheckApply AFTER Watch is Running() + // We *don't* need to also do this to new nodes or nodes that + // are about to get unpaused, because they'll get poked by one + // of the indegree == 0 vertices, and an important aspect of the + // Process() function is that even if the state is correct, it + // will pass through the Poke so that it flows through the DAG. + v.Res.Starter(indegree[v] == 0) + var unpause = true if !v.Res.IsWorking() { // if Worker() is not running... - v.Res.Setup() + unpause = false // doesn't need unpausing on first start g.wg.Add(1) // must pass in value to avoid races... // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ @@ -697,7 +711,7 @@ func (g *Graph) Start(first bool) { // start or continue // if the resource Init() fails, we don't hang! } - if !first { // unpause! + if unpause { // unpause (if needed) v.Res.SendEvent(event.EventStart, nil) // sync! } } diff --git a/resources/resources.go b/resources/resources.go index 82e64a63..e28d9746 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -339,10 +339,6 @@ func (obj *BaseRes) Init() error { obj.wcuid = obj.Converger().Register() // get a cuid for the worker! obj.pcuid = obj.Converger().Register() // get a cuid for the process - obj.eventsLock = &sync.Mutex{} - obj.eventsDone = false - obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events - obj.processLock = &sync.Mutex{} // lock around processChan closing and sending obj.processDone = false // did we close processChan ? obj.processChan = make(chan *event.Event) @@ -448,7 +444,10 @@ func (obj *BaseRes) WaitGroup() *sync.WaitGroup { return obj.waitGroup } func (obj *BaseRes) Setup() { obj.started = make(chan struct{}) // closes when started obj.stopped = make(chan struct{}) // closes when stopped - return + + obj.eventsLock = &sync.Mutex{} + obj.eventsDone = false + obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events } // Reset from Setup. diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 219df27c..f482213c 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -100,27 +100,39 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { obj.quiescing = false // reset ev.ACK() - // wait for next event to continue - select { - case e, ok := <-obj.Events(): - if !ok { // shutdown - err := error(nil) - return &err, false + // wait for next event to continue, but discard any backpoking! + for { + // Consider a graph (V2->V3). If while paused, we add a + // new resource (V1->V2), when we unpause, V3 will run, + // and then V2 followed by V1 (reverse topo sort) which + // can cause V2 to BackPoke to V1 (since V1 needs to go + // first) which can panic if V1 is not running yet! The + // solution is to ignore the BackPoke because once that + // V1 vertex gets running, it will then send off a poke + // to V2 that it did without the need for the BackPoke! + select { + case e, ok := <-obj.Events(): + if !ok { // shutdown + err := error(nil) + return &err, false + } + //obj.quiescing = true + //obj.quiesceGroup.Wait() // unnecessary, but symmetrically correct + //obj.quiescing = false + e.ACK() + err := e.Error() + if e.Kind == event.EventExit { + return &err, false + } else if e.Kind == event.EventStart { // eventContinue + return nil, false // don't poke on unpause! + } else if e.Kind == event.EventBackPoke { + continue // silently discard this event while paused + } + // if we get a poke event here, it's a bug! + err = fmt.Errorf("%s[%s]: unknown event: %v, while paused", obj.Kind(), obj.GetName(), e) + panic(err) // TODO: return a special sentinel instead? + //return &err, false } - //obj.quiescing = true - //obj.quiesceGroup.Wait() // unnecessary, but symmetrically correct - //obj.quiescing = false - e.ACK() - err := e.Error() - if e.Kind == event.EventExit { - return &err, false - } else if e.Kind == event.EventStart { // eventContinue - return nil, false // don't poke on unpause! - } - // if we get a poke event here, it's a bug! - err = fmt.Errorf("%s[%s]: unknown event: %v, while paused", obj.Kind(), obj.GetName(), e) - panic(err) // TODO: return a special sentinel instead? - //return &err, false } } err = fmt.Errorf("unknown event: %v", ev)