From 074da4da190266b037e46ae6b31cc0c7a2b128a7 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 13 Mar 2017 07:36:41 -0400 Subject: [PATCH] pgraph, resources: Run the resource Setup in parallel This is a reasonable thing to do at this time. --- pgraph/actions.go | 17 ++++++++++++----- resources/resources.go | 5 +++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pgraph/actions.go b/pgraph/actions.go index 37903436..47bde1d5 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -654,11 +654,18 @@ func (g *Graph) Start(first bool) { // start or continue t, _ := g.TopologicalSort() indegree := g.InDegree() // compute all of the indegree's reversed := Reverse(t) + wg := &sync.WaitGroup{} for _, v := range reversed { // run the Setup() for everyone first - if !v.Res.IsWorking() { // if Worker() is not running... - v.Res.Setup() // initialize some vars in the resource - } + // run these in parallel, as long as we wait before continuing + wg.Add(1) + go func(vv *Vertex) { + defer wg.Done() + if !vv.Res.IsWorking() { // if Worker() is not running... + vv.Res.Setup() // initialize some vars in the resource + } + }(v) } + wg.Wait() // run through the topological reverse, and start or unpause each vertex for _, v := range reversed { @@ -730,9 +737,9 @@ func (g *Graph) Pause() { // Exit sends exit events to the graph in a topological sort order. func (g *Graph) Exit() { - if g == nil { + if g == nil { // empty graph that wasn't populated yet return - } // empty graph that wasn't populated yet + } t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... // turn off the taps... diff --git a/resources/resources.go b/resources/resources.go index e28d9746..97b5e452 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -440,7 +440,8 @@ func (obj *BaseRes) QuiesceGroup() *sync.WaitGroup { return obj.quiesceGroup } func (obj *BaseRes) WaitGroup() *sync.WaitGroup { return obj.waitGroup } // Setup does some work which must happen before the Worker starts. It happens -// once per Worker startup. +// once per Worker startup. It can happen in parallel with other Setup calls, so +// add locks around any operation that's not thread-safe. func (obj *BaseRes) Setup() { obj.started = make(chan struct{}) // closes when started obj.stopped = make(chan struct{}) // closes when stopped @@ -450,7 +451,7 @@ func (obj *BaseRes) Setup() { obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events } -// Reset from Setup. +// Reset from Setup. These can get called for different vertices in parallel. func (obj *BaseRes) Reset() { return }