From 2da21f90f43de8128cc6ee63f368f671a8231484 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Thu, 2 Feb 2017 19:48:42 -0500 Subject: [PATCH] pgraph, resources: Improve Init/Close and Worker status This should do some rough cleanups around the Init/Close of resources, and tracking of Worker function status. --- lib/main.go | 2 +- pgraph/actions.go | 8 +++++-- pgraph/pgraph.go | 50 +++++++++++++++++++++++++++++------------- resources/resources.go | 22 +++++++++---------- yamlgraph/gconfig.go | 6 ++--- 5 files changed, 54 insertions(+), 34 deletions(-) diff --git a/lib/main.go b/lib/main.go index 4f9f8147..4ffa017d 100644 --- a/lib/main.go +++ b/lib/main.go @@ -564,7 +564,7 @@ func (obj *Main) Run() error { // tell inner main loop to exit close(exitchan) - G.Exit() // tell all the children to exit, and waits for them to do so + G.Exit() // tells all the children to exit, and waits for them to do so // cleanup etcd main loop last so it can process everything first if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd diff --git a/pgraph/actions.go b/pgraph/actions.go index dc77aba9..4c0e53f6 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -316,7 +316,11 @@ func (g *Graph) Worker(v *Vertex) error { // the Watch() function about which graph it is // running on, which isolates things nicely... obj := v.Res - obj.SetWorking(true) // gets set to false in Res.Close() method at end... + + // run the init (should match 1-1 with Close function if this succeeds) + if err := obj.Init(); err != nil { + return errwrap.Wrapf(err, "could not Init() resource") + } lock := &sync.Mutex{} // lock around processChan closing and sending finished := false // did we close processChan ? @@ -629,7 +633,7 @@ func (g *Graph) Start(first bool) { // start or continue go func(vv *Vertex) { defer g.wg.Done() // TODO: if a sufficient number of workers error, - // should something be done? Will these restart + // should something be done? Should these restart // after perma-failure if we have a graph change? if err := g.Worker(vv); err != nil { // contains the Watch and CheckApply loops log.Printf("%s[%s]: Exited with failure: %v", vv.Kind(), vv.GetName(), err) diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 706289d0..3c0c8b2b 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -192,17 +192,28 @@ func (g *Graph) DeleteEdge(e *Edge) { } } -// GetVertexMatch searches for an equivalent resource in the graph and returns -// the vertex it is found in, or nil if not found. -func (g *Graph) GetVertexMatch(obj resources.Res) *Vertex { - for k := range g.Adjacency { - if k.Res.Compare(obj) { - return k +// CompareMatch searches for an equivalent resource in the graph and returns the +// vertex it is found in, or nil if not found. +func (g *Graph) CompareMatch(obj resources.Res) *Vertex { + for v := range g.Adjacency { + if v.Res.Compare(obj) { + return v } } return nil } +// TODO: consider adding a mutate API. +//func (g *Graph) MutateMatch(obj resources.Res) *Vertex { +// for v := range g.Adjacency { +// if err := v.Res.Mutate(obj); err == nil { +// // transmogrified! +// return v +// } +// } +// return nil +//} + // HasVertex returns if the input vertex exists in the graph. func (g *Graph) HasVertex(v *Vertex) bool { if _, exists := g.Adjacency[v]; exists { @@ -532,7 +543,8 @@ func (g *Graph) Reachability(a, b *Vertex) []*Vertex { } // GraphSync updates the oldGraph so that it matches the newGraph receiver. It -// leaves identical elements alone so that they don't need to be refreshed. +// leaves identical elements alone so that they don't need to be refreshed. It +// tries to mutate existing elements into new ones, if they support this. // FIXME: add test cases func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) { @@ -547,16 +559,24 @@ func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) { for v := range g.Adjacency { // loop through the vertices (resources) res := v.Res // resource + var vertex *Vertex - vertex := oldGraph.GetVertexMatch(res) - if vertex == nil { // no match found + // step one, direct compare with res.Compare + if vertex == nil { // redundant guard for consistency + vertex = oldGraph.CompareMatch(res) + } + + // TODO: consider adding a mutate API. + // step two, try and mutate with res.Mutate + //if vertex == nil { // not found yet... + // vertex = oldGraph.MutateMatch(res) + //} + + if vertex == nil { // no match found yet if err := res.Validate(); err != nil { return nil, errwrap.Wrapf(err, "could not Validate() resource") } - if err := res.Init(); err != nil { - return nil, errwrap.Wrapf(err, "could not Init() resource") - } - vertex = NewVertex(res) + vertex = v oldGraph.AddVertex(vertex) // call standalone in case not part of an edge } lookup[v] = vertex // used for constructing edges @@ -580,8 +600,8 @@ func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) { // lookup vertices (these should exist now) //res1 := v1.Res // resource //res2 := v2.Res - //vertex1 := oldGraph.GetVertexMatch(res1) - //vertex2 := oldGraph.GetVertexMatch(res2) + //vertex1 := oldGraph.CompareMatch(res1) + //vertex2 := oldGraph.CompareMatch(res2) vertex1, exists1 := lookup[v1] vertex2, exists2 := lookup[v2] if !exists1 || !exists2 { // no match found, bug? diff --git a/resources/resources.go b/resources/resources.go index b1c42662..ec1feca9 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -140,7 +140,6 @@ type Base interface { Events() chan *event.Event AssociateData(*Data) IsWorking() bool - SetWorking(bool) Converger() converger.Converger RegisterConverger() UnregisterConverger() @@ -286,6 +285,9 @@ func (obj *BaseRes) Validate() error { // Init initializes structures like channels if created without New constructor. func (obj *BaseRes) Init() error { + if obj.debug { + log.Printf("%s[%s]: Init()", obj.Kind(), obj.GetName()) + } if obj.kind == "" { return fmt.Errorf("Resource did not set kind!") } @@ -307,13 +309,18 @@ func (obj *BaseRes) Init() error { //} // TODO: this StatefulBool implementation could be eventually swappable //obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)} + + obj.working = true // Worker method should now be running... return nil } // Close shuts down and performs any cleanup. func (obj *BaseRes) Close() error { + if obj.debug { + log.Printf("%s[%s]: Close()", obj.Kind(), obj.GetName()) + } obj.mutex.Lock() - obj.working = false // obj.SetWorking(false) + obj.working = false // Worker method should now be closing... close(obj.events) // this is where we properly close this channel! obj.mutex.Unlock() return nil @@ -357,20 +364,11 @@ func (obj *BaseRes) AssociateData(data *Data) { obj.debug = data.Debug } -// IsWorking tells us if the Worker() function is running. +// IsWorking tells us if the Worker() function is running. Not thread safe. func (obj *BaseRes) IsWorking() bool { - obj.mutex.Lock() - defer obj.mutex.Unlock() return obj.working } -// SetWorking tracks the state of if Worker() function is running. -func (obj *BaseRes) SetWorking(b bool) { - obj.mutex.Lock() - defer obj.mutex.Unlock() - obj.working = b -} - // Converger returns the converger object used by the system. It can be used to // register new convergers if needed. func (obj *BaseRes) Converger() converger.Converger { diff --git a/yamlgraph/gconfig.go b/yamlgraph/gconfig.go index d901e835..b584f29f 100644 --- a/yamlgraph/gconfig.go +++ b/yamlgraph/gconfig.go @@ -133,9 +133,8 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world gapi.World, noop // XXX: should we export based on a @@ prefix, or a metaparam // like exported => true || exported => (host pattern)||(other pattern?) if !strings.HasPrefix(res.GetName(), "@@") { // not exported resource - v := graph.GetVertexMatch(res) + v := graph.CompareMatch(res) if v == nil { // no match found - res.Init() v = pgraph.NewVertex(res) graph.AddVertex(v) // call standalone in case not part of an edge } @@ -207,9 +206,8 @@ func (c *GraphConfig) NewGraphFromConfig(hostname string, world gapi.World, noop if _, exists := lookup[kind]; !exists { lookup[kind] = make(map[string]*pgraph.Vertex) } - v := graph.GetVertexMatch(res) + v := graph.CompareMatch(res) if v == nil { // no match found - res.Init() // initialize go channels or things won't work!!! v = pgraph.NewVertex(res) graph.AddVertex(v) // call standalone in case not part of an edge }