pgraph, resources: Improve Init/Close and Worker status

This should do some rough cleanups around the Init/Close of resources,
and tracking of Worker function status.
This commit is contained in:
James Shubin
2017-02-02 19:48:42 -05:00
parent bec7f1726f
commit 2da21f90f4
5 changed files with 54 additions and 34 deletions

View File

@@ -564,7 +564,7 @@ func (obj *Main) Run() error {
// tell inner main loop to exit
close(exitchan)
G.Exit() // tell all the children to exit, and waits for them to do so
G.Exit() // tells all the children to exit, and waits for them to do so
// cleanup etcd main loop last so it can process everything first
if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd

View File

@@ -316,7 +316,11 @@ func (g *Graph) Worker(v *Vertex) error {
// the Watch() function about which graph it is
// running on, which isolates things nicely...
obj := v.Res
obj.SetWorking(true) // gets set to false in Res.Close() method at end...
// run the init (should match 1-1 with Close function if this succeeds)
if err := obj.Init(); err != nil {
return errwrap.Wrapf(err, "could not Init() resource")
}
lock := &sync.Mutex{} // lock around processChan closing and sending
finished := false // did we close processChan ?
@@ -629,7 +633,7 @@ func (g *Graph) Start(first bool) { // start or continue
go func(vv *Vertex) {
defer g.wg.Done()
// TODO: if a sufficient number of workers error,
// should something be done? Will these restart
// should something be done? Should these restart
// after perma-failure if we have a graph change?
if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops
log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err)

View File

@@ -192,17 +192,28 @@ func (g *Graph) DeleteEdge(e *Edge) {
}
}
// GetVertexMatch searches for an equivalent resource in the graph and returns
// the vertex it is found in, or nil if not found.
func (g *Graph) GetVertexMatch(obj resources.Res) *Vertex {
for k := range g.Adjacency {
if k.Res.Compare(obj) {
return k
// CompareMatch searches for an equivalent resource in the graph and returns the
// vertex it is found in, or nil if not found.
func (g *Graph) CompareMatch(obj resources.Res) *Vertex {
for v := range g.Adjacency {
if v.Res.Compare(obj) {
return v
}
}
return nil
}
// TODO: consider adding a mutate API.
//func (g *Graph) MutateMatch(obj resources.Res) *Vertex {
// for v := range g.Adjacency {
// if err := v.Res.Mutate(obj); err == nil {
// // transmogrified!
// return v
// }
// }
// return nil
//}
// HasVertex returns if the input vertex exists in the graph.
func (g *Graph) HasVertex(v *Vertex) bool {
if _, exists := g.Adjacency[v]; exists {
@@ -532,7 +543,8 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex {
}
// GraphSync updates the oldGraph so that it matches the newGraph receiver. It
// leaves identical elements alone so that they don't need to be refreshed.
// leaves identical elements alone so that they don't need to be refreshed. It
// tries to mutate existing elements into new ones, if they support this.
// FIXME: add test cases
func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) {
@@ -547,16 +559,24 @@ func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) {
for v := range g.Adjacency { // loop through the vertices (resources)
res := v.Res // resource
var vertex *Vertex
vertex := oldGraph.GetVertexMatch(res)
if vertex == nil { // no match found
// step one, direct compare with res.Compare
if vertex == nil { // redundant guard for consistency
vertex = oldGraph.CompareMatch(res)
}
// TODO: consider adding a mutate API.
// step two, try and mutate with res.Mutate
//if vertex == nil { // not found yet...
// vertex = oldGraph.MutateMatch(res)
//}
if vertex == nil { // no match found yet
if err := res.Validate(); err != nil {
return nil, errwrap.Wrapf(err, "could not Validate() resource")
}
if err := res.Init(); err != nil {
return nil, errwrap.Wrapf(err, "could not Init() resource")
}
vertex = NewVertex(res)
vertex = v
oldGraph.AddVertex(vertex) // call standalone in case not part of an edge
}
lookup[v] = vertex // used for constructing edges
@@ -580,8 +600,8 @@ func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) {
// lookup vertices (these should exist now)
//res1 := v1.Res // resource
//res2 := v2.Res
//vertex1 := oldGraph.GetVertexMatch(res1)
//vertex2 := oldGraph.GetVertexMatch(res2)
//vertex1 := oldGraph.CompareMatch(res1)
//vertex2 := oldGraph.CompareMatch(res2)
vertex1, exists1 := lookup[v1]
vertex2, exists2 := lookup[v2]
if !exists1 || !exists2 { // no match found, bug?

View File

@@ -140,7 +140,6 @@ type Base interface {
Events() chan *event.Event
AssociateData(*Data)
IsWorking() bool
SetWorking(bool)
Converger() converger.Converger
RegisterConverger()
UnregisterConverger()
@@ -286,6 +285,9 @@ func (obj *BaseRes) Validate() error {
// Init initializes structures like channels if created without New constructor.
func (obj *BaseRes) Init() error {
if obj.debug {
log.Printf("%s[%s]: Init()", obj.Kind(), obj.GetName())
}
if obj.kind == "" {
return fmt.Errorf("Resource did not set kind!")
}
@@ -307,13 +309,18 @@ func (obj *BaseRes) Init() error {
//}
// TODO: this StatefulBool implementation could be eventually swappable
//obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)}
obj.working = true // Worker method should now be running...
return nil
}
// Close shuts down and performs any cleanup.
func (obj *BaseRes) Close() error {
if obj.debug {
log.Printf("%s[%s]: Close()", obj.Kind(), obj.GetName())
}
obj.mutex.Lock()
obj.working = false // obj.SetWorking(false)
obj.working = false // Worker method should now be closing...
close(obj.events) // this is where we properly close this channel!
obj.mutex.Unlock()
return nil
@@ -357,20 +364,11 @@ func (obj *BaseRes) AssociateData(data *Data) {
obj.debug = data.Debug
}
// IsWorking tells us if the Worker() function is running.
// IsWorking tells us if the Worker() function is running. Not thread safe.
func (obj *BaseRes) IsWorking() bool {
obj.mutex.Lock()
defer obj.mutex.Unlock()
return obj.working
}
// SetWorking tracks the state of if Worker() function is running.
func (obj *BaseRes) SetWorking(b bool) {
obj.mutex.Lock()
defer obj.mutex.Unlock()
obj.working = b
}
// Converger returns the converger object used by the system. It can be used to
// register new convergers if needed.
func (obj *BaseRes) Converger() converger.Converger {

View File

@@ -133,9 +133,8 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world gapi.World, noop
// XXX: should we export based on a @@ prefix, or a metaparam
// like exported => true || exported => (host pattern)||(other pattern?)
if !strings.HasPrefix(res.GetName(), "@@") { // not exported resource
v := graph.GetVertexMatch(res)
v := graph.CompareMatch(res)
if v == nil { // no match found
res.Init()
v = pgraph.NewVertex(res)
graph.AddVertex(v) // call standalone in case not part of an edge
}
@@ -207,9 +206,8 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world gapi.World, noop
if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]*pgraph.Vertex)
}
v := graph.GetVertexMatch(res)
v := graph.CompareMatch(res)
if v == nil { // no match found
res.Init() // initialize go channels or things won't work!!!
v = pgraph.NewVertex(res)
graph.AddVertex(v) // call standalone in case not part of an edge
}