pgraph: Build the sync group into the graph structure
This hides the sync/wait logic inside the graph itself.
This commit is contained in:
16
lib/main.go
16
lib/main.go
@@ -23,7 +23,6 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/converger"
|
"github.com/purpleidea/mgmt/converger"
|
||||||
@@ -267,7 +266,6 @@ func (obj *Main) Run() error {
|
|||||||
// TODO: Import admin key
|
// TODO: Import admin key
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
var G, oldGraph *pgraph.Graph
|
var G, oldGraph *pgraph.Graph
|
||||||
|
|
||||||
// exit after `max-runtime` seconds for no reason at all...
|
// exit after `max-runtime` seconds for no reason at all...
|
||||||
@@ -412,8 +410,8 @@ func (obj *Main) Run() error {
|
|||||||
log.Printf("Config: Error creating new graph: %v", err)
|
log.Printf("Config: Error creating new graph: %v", err)
|
||||||
// unpause!
|
// unpause!
|
||||||
if !first {
|
if !first {
|
||||||
G.Start(&wg, first) // sync
|
G.Start(first) // sync
|
||||||
converger.Start() // after G.Start()
|
converger.Start() // after G.Start()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -440,8 +438,8 @@ func (obj *Main) Run() error {
|
|||||||
log.Printf("Config: Error running graph sync: %v", err)
|
log.Printf("Config: Error running graph sync: %v", err)
|
||||||
// unpause!
|
// unpause!
|
||||||
if !first {
|
if !first {
|
||||||
G.Start(&wg, first) // sync
|
G.Start(first) // sync
|
||||||
converger.Start() // after G.Start()
|
converger.Start() // after G.Start()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -466,8 +464,8 @@ func (obj *Main) Run() error {
|
|||||||
// some are not ready yet and the EtcdWatch
|
// some are not ready yet and the EtcdWatch
|
||||||
// loops, we'll cause G.Pause(...) before we
|
// loops, we'll cause G.Pause(...) before we
|
||||||
// even got going, thus causing nil pointer errors
|
// even got going, thus causing nil pointer errors
|
||||||
G.Start(&wg, first) // sync
|
G.Start(first) // sync
|
||||||
converger.Start() // after G.Start()
|
converger.Start() // after G.Start()
|
||||||
first = false
|
first = false
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -557,7 +555,7 @@ func (obj *Main) Run() error {
|
|||||||
log.Printf("Main: Graph: %v", G)
|
log.Printf("Main: Graph: %v", G)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait() // wait for primary go routines to exit
|
G.Wait() // wait for the graph vertex worker goroutines to exit
|
||||||
|
|
||||||
// TODO: wait for each vertex to exit...
|
// TODO: wait for each vertex to exit...
|
||||||
log.Println("Goodbye!")
|
log.Println("Goodbye!")
|
||||||
|
|||||||
@@ -450,7 +450,7 @@ func (g *Graph) Worker(v *Vertex) error {
|
|||||||
|
|
||||||
// Start is a main kick to start the graph. It goes through in reverse topological
|
// Start is a main kick to start the graph. It goes through in reverse topological
|
||||||
// sort order so that events can't hit un-started vertices.
|
// sort order so that events can't hit un-started vertices.
|
||||||
func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
|
func (g *Graph) Start(first bool) { // start or continue
|
||||||
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
|
log.Printf("State: %v -> %v", g.setState(graphStateStarting), g.getState())
|
||||||
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
|
defer log.Printf("State: %v -> %v", g.setState(graphStateStarted), g.getState())
|
||||||
t, _ := g.TopologicalSort()
|
t, _ := g.TopologicalSort()
|
||||||
@@ -459,11 +459,11 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
|
|||||||
for _, v := range Reverse(t) {
|
for _, v := range Reverse(t) {
|
||||||
|
|
||||||
if !v.Res.IsWatching() { // if Watch() is not running...
|
if !v.Res.IsWatching() { // if Watch() is not running...
|
||||||
wg.Add(1)
|
g.wg.Add(1)
|
||||||
// must pass in value to avoid races...
|
// must pass in value to avoid races...
|
||||||
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
|
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
|
||||||
go func(vv *Vertex) {
|
go func(vv *Vertex) {
|
||||||
defer wg.Done()
|
defer g.wg.Done()
|
||||||
// TODO: if a sufficient number of workers error,
|
// TODO: if a sufficient number of workers error,
|
||||||
// should something be done? Will these restart
|
// should something be done? Will these restart
|
||||||
// after perma-failure if we have a graph change?
|
// after perma-failure if we have a graph change?
|
||||||
@@ -502,6 +502,11 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait waits for all the graph vertex workers to exit.
|
||||||
|
func (g *Graph) Wait() {
|
||||||
|
g.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// Pause sends pause events to the graph in a topological sort order.
|
// Pause sends pause events to the graph in a topological sort order.
|
||||||
func (g *Graph) Pause() {
|
func (g *Graph) Pause() {
|
||||||
log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState())
|
log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState())
|
||||||
|
|||||||
@@ -55,7 +55,8 @@ type Graph struct {
|
|||||||
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
|
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
|
||||||
Flags Flags
|
Flags Flags
|
||||||
state graphState
|
state graphState
|
||||||
mutex sync.Mutex // used when modifying graph State variable
|
mutex *sync.Mutex // used when modifying graph State variable
|
||||||
|
wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// Vertex is the primary vertex struct in this library.
|
// Vertex is the primary vertex struct in this library.
|
||||||
@@ -78,6 +79,8 @@ func NewGraph(name string) *Graph {
|
|||||||
Name: name,
|
Name: name,
|
||||||
Adjacency: make(map[*Vertex]map[*Vertex]*Edge),
|
Adjacency: make(map[*Vertex]map[*Vertex]*Edge),
|
||||||
state: graphStateNil,
|
state: graphStateNil,
|
||||||
|
// ptr b/c: "A WaitGroup must not be copied after first use."
|
||||||
|
wg: &sync.WaitGroup{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,6 +115,8 @@ func (g *Graph) Copy() *Graph {
|
|||||||
Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)),
|
Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)),
|
||||||
Flags: g.Flags,
|
Flags: g.Flags,
|
||||||
state: g.state,
|
state: g.state,
|
||||||
|
mutex: g.mutex,
|
||||||
|
wg: g.wg,
|
||||||
}
|
}
|
||||||
for k, v := range g.Adjacency {
|
for k, v := range g.Adjacency {
|
||||||
newGraph.Adjacency[k] = v // copy
|
newGraph.Adjacency[k] = v // copy
|
||||||
|
|||||||
Reference in New Issue
Block a user