From 5f6c8545c6a320049c749aaade11d478caeaf87e Mon Sep 17 00:00:00 2001 From: James Shubin Date: Fri, 2 Jun 2017 18:50:23 -0400 Subject: [PATCH] resources: Replace stored pgraph with mgraph and clean up hacks Now that we're using our meta wrapper graph struct instead of the pgraph, we can re-implement our SetValue hacks in terms of struct fields and the implementation is now cleaner. --- resources/actions.go | 10 ++++---- resources/mgraph.go | 17 ++++++++----- resources/resources.go | 14 ++++------- resources/semaphore.go | 54 +++++++----------------------------------- 4 files changed, 28 insertions(+), 67 deletions(-) diff --git a/resources/actions.go b/resources/actions.go index b3856f0e..ce373ed9 100644 --- a/resources/actions.go +++ b/resources/actions.go @@ -69,7 +69,7 @@ func (obj *BaseRes) Poke() error { // if we're pausing (or exiting) then we should suspend poke's so that // the graph doesn't go on running forever until it's completely done! // this is an optional feature which we can do by default on user exit - if b, ok := obj.Graph.Value("fastpause"); ok && util.Bool(b) { + if obj.Graph.FastPause { return nil // TODO: should this be an error instead? } @@ -199,15 +199,15 @@ func (obj *BaseRes) Process() error { // The exception is that semaphores with a zero count will always block! // TODO: Add a close mechanism to close/unblock zero count semaphores... semas := obj.Meta().Sema - if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) && len(semas) > 0 { + if obj.debug && len(semas) > 0 { log.Printf("%s: Sema: P(%s)", obj, strings.Join(semas, ", ")) } - if err := SemaLock(obj.Graph, semas); err != nil { // lock + if err := obj.Graph.SemaLock(semas); err != nil { // lock // NOTE: in practice, this might not ever be truly necessary... return fmt.Errorf("shutdown of semaphores") } - defer SemaUnlock(obj.Graph, semas) // unlock - if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) && len(semas) > 0 { + defer obj.Graph.SemaUnlock(semas) // unlock + if obj.debug && len(semas) > 0 { defer log.Printf("%s: Sema: V(%s)", obj, strings.Join(semas, ", ")) } diff --git a/resources/mgraph.go b/resources/mgraph.go index 8ec2caac..21fc0d32 100644 --- a/resources/mgraph.go +++ b/resources/mgraph.go @@ -23,6 +23,7 @@ import ( "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/util/semaphore" ) //go:generate stringer -type=graphState -output=graphstate_stringer.go @@ -42,19 +43,24 @@ type MGraph struct { //Graph *pgraph.Graph *pgraph.Graph // wrap a graph, and use its methods directly - Data *ResData - Debug bool + Data *ResData + FastPause bool + Debug bool state graphState // ptr b/c: Mutex/WaitGroup must not be copied after first use mutex *sync.Mutex wg *sync.WaitGroup + slock *sync.Mutex + semas map[string]*semaphore.Semaphore } // Init initializes the internal structures. func (obj *MGraph) Init() { obj.mutex = &sync.Mutex{} obj.wg = &sync.WaitGroup{} + obj.slock = &sync.Mutex{} // semaphore lock + obj.semas = make(map[string]*semaphore.Semaphore) } // getState returns the state of the graph. This state is used for optimizing @@ -84,7 +90,6 @@ func (obj *MGraph) Update(newGraph *pgraph.Graph) { for _, v := range obj.Graph.Vertices() { res := VtoR(v) // resource *res.Data() = *obj.Data // push the data around - res.Update(obj.Graph) // update graph pointer } } @@ -107,7 +112,7 @@ func (obj *MGraph) Start(first bool) { // start or continue // NOTE: vertex == res here, but pass in both in // case we ever wrap the res in something before // we store it as the vertex in the graph struct - res.Setup(obj.Graph, vertex, res) // initialize some vars in the resource + res.Setup(obj, vertex, res) // initialize some vars in the resource } }(v, VtoR(v)) } @@ -183,13 +188,13 @@ func (obj *MGraph) Pause(fastPause bool) { log.Printf("State: %v -> %v", obj.setState(graphStatePausing), obj.getState()) defer log.Printf("State: %v -> %v", obj.setState(graphStatePaused), obj.getState()) if fastPause { - obj.Graph.SetValue("fastpause", true) // set flag + obj.FastPause = true // set flag } t, _ := obj.Graph.TopologicalSort() for _, v := range t { // squeeze out the events... VtoR(v).SendEvent(event.EventPause, nil) // sync } - obj.Graph.SetValue("fastpause", false) // reset flag + obj.FastPause = false // reset flag } // Exit sends exit events to the graph in a topological sort order. diff --git a/resources/resources.go b/resources/resources.go index 3c791f38..ab96c407 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -117,8 +117,7 @@ type Base interface { Events() chan *event.Event Data() *ResData Working() *bool - Setup(*pgraph.Graph, pgraph.Vertex, Res) - Update(*pgraph.Graph) + Setup(*MGraph, pgraph.Vertex, Res) Reset() Exit() GetState() ResState @@ -167,7 +166,7 @@ type Res interface { // BaseRes is the base struct that gets used in every resource. type BaseRes struct { Res Res // pointer to full res - Graph *pgraph.Graph // pointer to graph I'm currently in + Graph *MGraph // pointer to graph I'm currently in Vertex pgraph.Vertex // pointer to vertex I currently am Recv map[string]*Send // mapping of key to receive on from value @@ -344,7 +343,7 @@ func (obj *BaseRes) Working() *bool { // Setup does some work which must happen before the Worker starts. It happens // once per Worker startup. It can happen in parallel with other Setup calls, so // add locks around any operation that's not thread-safe. -func (obj *BaseRes) Setup(graph *pgraph.Graph, vertex pgraph.Vertex, res Res) { +func (obj *BaseRes) Setup(mgraph *MGraph, vertex pgraph.Vertex, res Res) { obj.started = make(chan struct{}) // closes when started obj.stopped = make(chan struct{}) // closes when stopped @@ -354,12 +353,7 @@ func (obj *BaseRes) Setup(graph *pgraph.Graph, vertex pgraph.Vertex, res Res) { obj.Res = res // store a pointer to the full object obj.Vertex = vertex // store a pointer to the vertex i'm - obj.Graph = graph // store a pointer to the graph we're in -} - -// Update refreshes the internal graph pointer that we're primarily used in. -func (obj *BaseRes) Update(graph *pgraph.Graph) { - obj.Graph = graph // store a pointer to the graph i'm in + obj.Graph = mgraph // store a pointer to the graph we're in } // Reset from Setup. These can get called for different vertices in parallel. diff --git a/resources/semaphore.go b/resources/semaphore.go index 635306de..c70b550f 100644 --- a/resources/semaphore.go +++ b/resources/semaphore.go @@ -22,9 +22,7 @@ import ( "sort" "strconv" "strings" - "sync" - "github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/util/semaphore" multierr "github.com/hashicorp/go-multierror" @@ -34,21 +32,19 @@ import ( const SemaSep = ":" // SemaLock acquires the list of semaphores in the graph. -func SemaLock(g *pgraph.Graph, semas []string) error { +func (obj *MGraph) SemaLock(semas []string) error { var reterr error sort.Strings(semas) // very important to avoid deadlock in the dag! - slock := SemaLockFromGraph(g) - smap := SemaMapFromGraph(g) // returns a map, which can be modified by ref for _, id := range semas { - slock.Lock() // semaphore creation lock - sema, ok := smap[id] // lookup + obj.slock.Lock() // semaphore creation lock + sema, ok := obj.semas[id] // lookup if !ok { size := SemaSize(id) // defaults to 1 - smap[id] = semaphore.NewSemaphore(size) - sema = smap[id] + obj.semas[id] = semaphore.NewSemaphore(size) + sema = obj.semas[id] } - slock.Unlock() + obj.slock.Unlock() if err := sema.P(1); err != nil { // lock! reterr = multierr.Append(reterr, err) // list of errors @@ -58,13 +54,12 @@ func SemaLock(g *pgraph.Graph, semas []string) error { } // SemaUnlock releases the list of semaphores in the graph. -func SemaUnlock(g *pgraph.Graph, semas []string) error { +func (obj *MGraph) SemaUnlock(semas []string) error { var reterr error sort.Strings(semas) // unlock in the same order to remove partial locks - smap := SemaMapFromGraph(g) for _, id := range semas { - sema, ok := smap[id] // lookup + sema, ok := obj.semas[id] // lookup if !ok { // programming error! panic(fmt.Sprintf("graph: sema: %s does not exist", id)) @@ -90,36 +85,3 @@ func SemaSize(id string) int { } return size } - -// SemaLockFromGraph returns a pointer to the semaphore lock stored with the -// graph, otherwise it panics. If one does not exist, it will create it. -func SemaLockFromGraph(g *pgraph.Graph) *sync.Mutex { - x, exists := g.Value("slock") - if !exists { - g.SetValue("slock", &sync.Mutex{}) - x, _ = g.Value("slock") - } - - slock, ok := x.(*sync.Mutex) - if !ok { - panic("not a *sync.Mutex") - } - return slock -} - -// SemaMapFromGraph returns a pointer to the map of semaphores stored with the -// graph, otherwise it panics. If one does not exist, it will create it. -func SemaMapFromGraph(g *pgraph.Graph) map[string]*semaphore.Semaphore { - x, exists := g.Value("semas") - if !exists { - semas := make(map[string]*semaphore.Semaphore) - g.SetValue("semas", semas) - x, _ = g.Value("semas") - } - - semas, ok := x.(map[string]*semaphore.Semaphore) - if !ok { - panic("not a map[string]*semaphore.Semaphore") - } - return semas -}