pgraph, resources: Run the resource Setup in parallel

This is a reasonable thing to do at this time.
This commit is contained in:
James Shubin
2017-03-13 07:36:41 -04:00
parent e4e39d820c
commit 074da4da19
2 changed files with 15 additions and 7 deletions

View File

@@ -654,11 +654,18 @@ func (g *Graph) Start(first bool) { // start or continue
t, _ := g.TopologicalSort()
indegree := g.InDegree() // compute all of the indegree's
reversed := Reverse(t)
wg := &sync.WaitGroup{}
for _, v := range reversed { // run the Setup() for everyone first
if !v.Res.IsWorking() { // if Worker() is not running...
v.Res.Setup() // initialize some vars in the resource
// run these in parallel, as long as we wait before continuing
wg.Add(1)
go func(vv *Vertex) {
defer wg.Done()
if !vv.Res.IsWorking() { // if Worker() is not running...
vv.Res.Setup() // initialize some vars in the resource
}
}(v)
}
wg.Wait()
// run through the topological reverse, and start or unpause each vertex
for _, v := range reversed {
@@ -730,9 +737,9 @@ func (g *Graph) Pause() {
// Exit sends exit events to the graph in a topological sort order.
func (g *Graph) Exit() {
if g == nil {
if g == nil { // empty graph that wasn't populated yet
return
} // empty graph that wasn't populated yet
}
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
// turn off the taps...

View File

@@ -440,7 +440,8 @@ func (obj *BaseRes) QuiesceGroup() *sync.WaitGroup { return obj.quiesceGroup }
func (obj *BaseRes) WaitGroup() *sync.WaitGroup { return obj.waitGroup }
// Setup does some work which must happen before the Worker starts. It happens
// once per Worker startup.
// once per Worker startup. It can happen in parallel with other Setup calls, so
// add locks around any operation that's not thread-safe.
func (obj *BaseRes) Setup() {
obj.started = make(chan struct{}) // closes when started
obj.stopped = make(chan struct{}) // closes when stopped
@@ -450,7 +451,7 @@ func (obj *BaseRes) Setup() {
obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events
}
// Reset from Setup.
// Reset from Setup. These can get called for different vertices in parallel.
func (obj *BaseRes) Reset() {
return
}