From dd8d17232fddfef3766a9a2f05359aef9f891f1a Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 20 Dec 2016 04:23:51 -0500 Subject: [PATCH] pgraph: Build the sync group into the graph structure This hides the sync/wait logic inside the graph itself. --- lib/main.go | 16 +++++++--------- pgraph/actions.go | 11 ++++++++--- pgraph/pgraph.go | 7 ++++++- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/lib/main.go b/lib/main.go index 9051d856..e22484ca 100644 --- a/lib/main.go +++ b/lib/main.go @@ -23,7 +23,6 @@ import ( "log" "os" "path" - "sync" "time" "github.com/purpleidea/mgmt/converger" @@ -267,7 +266,6 @@ func (obj *Main) Run() error { // TODO: Import admin key } - var wg sync.WaitGroup var G, oldGraph *pgraph.Graph // exit after `max-runtime` seconds for no reason at all... @@ -412,8 +410,8 @@ func (obj *Main) Run() error { log.Printf("Config: Error creating new graph: %v", err) // unpause! if !first { - G.Start(&wg, first) // sync - converger.Start() // after G.Start() + G.Start(first) // sync + converger.Start() // after G.Start() } continue } @@ -440,8 +438,8 @@ func (obj *Main) Run() error { log.Printf("Config: Error running graph sync: %v", err) // unpause! if !first { - G.Start(&wg, first) // sync - converger.Start() // after G.Start() + G.Start(first) // sync + converger.Start() // after G.Start() } continue } @@ -466,8 +464,8 @@ func (obj *Main) Run() error { // some are not ready yet and the EtcdWatch // loops, we'll cause G.Pause(...) before we // even got going, thus causing nil pointer errors - G.Start(&wg, first) // sync - converger.Start() // after G.Start() + G.Start(first) // sync + converger.Start() // after G.Start() first = false } }() @@ -557,7 +555,7 @@ func (obj *Main) Run() error { log.Printf("Main: Graph: %v", G) } - wg.Wait() // wait for primary go routines to exit + G.Wait() // wait for the graph vertex worker goroutines to exit // TODO: wait for each vertex to exit... log.Println("Goodbye!") diff --git a/pgraph/actions.go b/pgraph/actions.go index dcf231f6..1626cdd4 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -450,7 +450,7 @@ func (g *Graph) Worker(v *Vertex) error { // Start is a main kick to start the graph. It goes through in reverse topological // sort order so that events can't hit un-started vertices. -func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue +func (g *Graph) Start(first bool) { // start or continue log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState()) defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState()) t, _ := g.TopologicalSort() @@ -459,11 +459,11 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue for _, v := range Reverse(t) { if !v.Res.IsWatching() { // if Watch() is not running... - wg.Add(1) + g.wg.Add(1) // must pass in value to avoid races... // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ go func(vv *Vertex) { - defer wg.Done() + defer g.wg.Done() // TODO: if a sufficient number of workers error, // should something be done? Will these restart // after perma-failure if we have a graph change? @@ -502,6 +502,11 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue } } +// Wait waits for all the graph vertex workers to exit. +func (g *Graph) Wait() { + g.wg.Wait() +} + // Pause sends pause events to the graph in a topological sort order. func (g *Graph) Pause() { log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState()) diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index a1a68585..3a464f4a 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -55,7 +55,8 @@ type Graph struct { Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge) Flags Flags state graphState - mutex sync.Mutex // used when modifying graph State variable + mutex *sync.Mutex // used when modifying graph State variable + wg *sync.WaitGroup } // Vertex is the primary vertex struct in this library. @@ -78,6 +79,8 @@ func NewGraph(name string) *Graph { Name: name, Adjacency: make(map[*Vertex]map[*Vertex]*Edge), state: graphStateNil, + // ptr b/c: "A WaitGroup must not be copied after first use." + wg: &sync.WaitGroup{}, } } @@ -112,6 +115,8 @@ func (g *Graph) Copy() *Graph { Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)), Flags: g.Flags, state: g.state, + mutex: g.mutex, + wg: g.wg, } for k, v := range g.Adjacency { newGraph.Adjacency[k] = v // copy