resources: Replace stored pgraph with mgraph and clean up hacks
Now that we're using our meta wrapper graph struct instead of the pgraph, we can re-implement our SetValue hacks in terms of struct fields and the implementation is now cleaner.
This commit is contained in:
@@ -69,7 +69,7 @@ func (obj *BaseRes) Poke() error {
|
|||||||
// if we're pausing (or exiting) then we should suspend poke's so that
|
// if we're pausing (or exiting) then we should suspend poke's so that
|
||||||
// the graph doesn't go on running forever until it's completely done!
|
// the graph doesn't go on running forever until it's completely done!
|
||||||
// this is an optional feature which we can do by default on user exit
|
// this is an optional feature which we can do by default on user exit
|
||||||
if b, ok := obj.Graph.Value("fastpause"); ok && util.Bool(b) {
|
if obj.Graph.FastPause {
|
||||||
return nil // TODO: should this be an error instead?
|
return nil // TODO: should this be an error instead?
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -199,15 +199,15 @@ func (obj *BaseRes) Process() error {
|
|||||||
// The exception is that semaphores with a zero count will always block!
|
// The exception is that semaphores with a zero count will always block!
|
||||||
// TODO: Add a close mechanism to close/unblock zero count semaphores...
|
// TODO: Add a close mechanism to close/unblock zero count semaphores...
|
||||||
semas := obj.Meta().Sema
|
semas := obj.Meta().Sema
|
||||||
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) && len(semas) > 0 {
|
if obj.debug && len(semas) > 0 {
|
||||||
log.Printf("%s: Sema: P(%s)", obj, strings.Join(semas, ", "))
|
log.Printf("%s: Sema: P(%s)", obj, strings.Join(semas, ", "))
|
||||||
}
|
}
|
||||||
if err := SemaLock(obj.Graph, semas); err != nil { // lock
|
if err := obj.Graph.SemaLock(semas); err != nil { // lock
|
||||||
// NOTE: in practice, this might not ever be truly necessary...
|
// NOTE: in practice, this might not ever be truly necessary...
|
||||||
return fmt.Errorf("shutdown of semaphores")
|
return fmt.Errorf("shutdown of semaphores")
|
||||||
}
|
}
|
||||||
defer SemaUnlock(obj.Graph, semas) // unlock
|
defer obj.Graph.SemaUnlock(semas) // unlock
|
||||||
if b, ok := obj.Graph.Value("debug"); ok && util.Bool(b) && len(semas) > 0 {
|
if obj.debug && len(semas) > 0 {
|
||||||
defer log.Printf("%s: Sema: V(%s)", obj, strings.Join(semas, ", "))
|
defer log.Printf("%s: Sema: V(%s)", obj, strings.Join(semas, ", "))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import (
|
|||||||
|
|
||||||
"github.com/purpleidea/mgmt/event"
|
"github.com/purpleidea/mgmt/event"
|
||||||
"github.com/purpleidea/mgmt/pgraph"
|
"github.com/purpleidea/mgmt/pgraph"
|
||||||
|
"github.com/purpleidea/mgmt/util/semaphore"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:generate stringer -type=graphState -output=graphstate_stringer.go
|
//go:generate stringer -type=graphState -output=graphstate_stringer.go
|
||||||
@@ -42,19 +43,24 @@ type MGraph struct {
|
|||||||
//Graph *pgraph.Graph
|
//Graph *pgraph.Graph
|
||||||
*pgraph.Graph // wrap a graph, and use its methods directly
|
*pgraph.Graph // wrap a graph, and use its methods directly
|
||||||
|
|
||||||
Data *ResData
|
Data *ResData
|
||||||
Debug bool
|
FastPause bool
|
||||||
|
Debug bool
|
||||||
|
|
||||||
state graphState
|
state graphState
|
||||||
// ptr b/c: Mutex/WaitGroup must not be copied after first use
|
// ptr b/c: Mutex/WaitGroup must not be copied after first use
|
||||||
mutex *sync.Mutex
|
mutex *sync.Mutex
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
|
slock *sync.Mutex
|
||||||
|
semas map[string]*semaphore.Semaphore
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init initializes the internal structures.
|
// Init initializes the internal structures.
|
||||||
func (obj *MGraph) Init() {
|
func (obj *MGraph) Init() {
|
||||||
obj.mutex = &sync.Mutex{}
|
obj.mutex = &sync.Mutex{}
|
||||||
obj.wg = &sync.WaitGroup{}
|
obj.wg = &sync.WaitGroup{}
|
||||||
|
obj.slock = &sync.Mutex{} // semaphore lock
|
||||||
|
obj.semas = make(map[string]*semaphore.Semaphore)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getState returns the state of the graph. This state is used for optimizing
|
// getState returns the state of the graph. This state is used for optimizing
|
||||||
@@ -84,7 +90,6 @@ func (obj *MGraph) Update(newGraph *pgraph.Graph) {
|
|||||||
for _, v := range obj.Graph.Vertices() {
|
for _, v := range obj.Graph.Vertices() {
|
||||||
res := VtoR(v) // resource
|
res := VtoR(v) // resource
|
||||||
*res.Data() = *obj.Data // push the data around
|
*res.Data() = *obj.Data // push the data around
|
||||||
res.Update(obj.Graph) // update graph pointer
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -107,7 +112,7 @@ func (obj *MGraph) Start(first bool) { // start or continue
|
|||||||
// NOTE: vertex == res here, but pass in both in
|
// NOTE: vertex == res here, but pass in both in
|
||||||
// case we ever wrap the res in something before
|
// case we ever wrap the res in something before
|
||||||
// we store it as the vertex in the graph struct
|
// we store it as the vertex in the graph struct
|
||||||
res.Setup(obj.Graph, vertex, res) // initialize some vars in the resource
|
res.Setup(obj, vertex, res) // initialize some vars in the resource
|
||||||
}
|
}
|
||||||
}(v, VtoR(v))
|
}(v, VtoR(v))
|
||||||
}
|
}
|
||||||
@@ -183,13 +188,13 @@ func (obj *MGraph) Pause(fastPause bool) {
|
|||||||
log.Printf("State: %v -> %v", obj.setState(graphStatePausing), obj.getState())
|
log.Printf("State: %v -> %v", obj.setState(graphStatePausing), obj.getState())
|
||||||
defer log.Printf("State: %v -> %v", obj.setState(graphStatePaused), obj.getState())
|
defer log.Printf("State: %v -> %v", obj.setState(graphStatePaused), obj.getState())
|
||||||
if fastPause {
|
if fastPause {
|
||||||
obj.Graph.SetValue("fastpause", true) // set flag
|
obj.FastPause = true // set flag
|
||||||
}
|
}
|
||||||
t, _ := obj.Graph.TopologicalSort()
|
t, _ := obj.Graph.TopologicalSort()
|
||||||
for _, v := range t { // squeeze out the events...
|
for _, v := range t { // squeeze out the events...
|
||||||
VtoR(v).SendEvent(event.EventPause, nil) // sync
|
VtoR(v).SendEvent(event.EventPause, nil) // sync
|
||||||
}
|
}
|
||||||
obj.Graph.SetValue("fastpause", false) // reset flag
|
obj.FastPause = false // reset flag
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exit sends exit events to the graph in a topological sort order.
|
// Exit sends exit events to the graph in a topological sort order.
|
||||||
|
|||||||
@@ -117,8 +117,7 @@ type Base interface {
|
|||||||
Events() chan *event.Event
|
Events() chan *event.Event
|
||||||
Data() *ResData
|
Data() *ResData
|
||||||
Working() *bool
|
Working() *bool
|
||||||
Setup(*pgraph.Graph, pgraph.Vertex, Res)
|
Setup(*MGraph, pgraph.Vertex, Res)
|
||||||
Update(*pgraph.Graph)
|
|
||||||
Reset()
|
Reset()
|
||||||
Exit()
|
Exit()
|
||||||
GetState() ResState
|
GetState() ResState
|
||||||
@@ -167,7 +166,7 @@ type Res interface {
|
|||||||
// BaseRes is the base struct that gets used in every resource.
|
// BaseRes is the base struct that gets used in every resource.
|
||||||
type BaseRes struct {
|
type BaseRes struct {
|
||||||
Res Res // pointer to full res
|
Res Res // pointer to full res
|
||||||
Graph *pgraph.Graph // pointer to graph I'm currently in
|
Graph *MGraph // pointer to graph I'm currently in
|
||||||
Vertex pgraph.Vertex // pointer to vertex I currently am
|
Vertex pgraph.Vertex // pointer to vertex I currently am
|
||||||
|
|
||||||
Recv map[string]*Send // mapping of key to receive on from value
|
Recv map[string]*Send // mapping of key to receive on from value
|
||||||
@@ -344,7 +343,7 @@ func (obj *BaseRes) Working() *bool {
|
|||||||
// Setup does some work which must happen before the Worker starts. It happens
|
// Setup does some work which must happen before the Worker starts. It happens
|
||||||
// once per Worker startup. It can happen in parallel with other Setup calls, so
|
// once per Worker startup. It can happen in parallel with other Setup calls, so
|
||||||
// add locks around any operation that's not thread-safe.
|
// add locks around any operation that's not thread-safe.
|
||||||
func (obj *BaseRes) Setup(graph *pgraph.Graph, vertex pgraph.Vertex, res Res) {
|
func (obj *BaseRes) Setup(mgraph *MGraph, vertex pgraph.Vertex, res Res) {
|
||||||
obj.started = make(chan struct{}) // closes when started
|
obj.started = make(chan struct{}) // closes when started
|
||||||
obj.stopped = make(chan struct{}) // closes when stopped
|
obj.stopped = make(chan struct{}) // closes when stopped
|
||||||
|
|
||||||
@@ -354,12 +353,7 @@ func (obj *BaseRes) Setup(graph *pgraph.Graph, vertex pgraph.Vertex, res Res) {
|
|||||||
|
|
||||||
obj.Res = res // store a pointer to the full object
|
obj.Res = res // store a pointer to the full object
|
||||||
obj.Vertex = vertex // store a pointer to the vertex i'm
|
obj.Vertex = vertex // store a pointer to the vertex i'm
|
||||||
obj.Graph = graph // store a pointer to the graph we're in
|
obj.Graph = mgraph // store a pointer to the graph we're in
|
||||||
}
|
|
||||||
|
|
||||||
// Update refreshes the internal graph pointer that we're primarily used in.
|
|
||||||
func (obj *BaseRes) Update(graph *pgraph.Graph) {
|
|
||||||
obj.Graph = graph // store a pointer to the graph i'm in
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset from Setup. These can get called for different vertices in parallel.
|
// Reset from Setup. These can get called for different vertices in parallel.
|
||||||
|
|||||||
@@ -22,9 +22,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/pgraph"
|
|
||||||
"github.com/purpleidea/mgmt/util/semaphore"
|
"github.com/purpleidea/mgmt/util/semaphore"
|
||||||
|
|
||||||
multierr "github.com/hashicorp/go-multierror"
|
multierr "github.com/hashicorp/go-multierror"
|
||||||
@@ -34,21 +32,19 @@ import (
|
|||||||
const SemaSep = ":"
|
const SemaSep = ":"
|
||||||
|
|
||||||
// SemaLock acquires the list of semaphores in the graph.
|
// SemaLock acquires the list of semaphores in the graph.
|
||||||
func SemaLock(g *pgraph.Graph, semas []string) error {
|
func (obj *MGraph) SemaLock(semas []string) error {
|
||||||
var reterr error
|
var reterr error
|
||||||
sort.Strings(semas) // very important to avoid deadlock in the dag!
|
sort.Strings(semas) // very important to avoid deadlock in the dag!
|
||||||
slock := SemaLockFromGraph(g)
|
|
||||||
smap := SemaMapFromGraph(g) // returns a map, which can be modified by ref
|
|
||||||
|
|
||||||
for _, id := range semas {
|
for _, id := range semas {
|
||||||
slock.Lock() // semaphore creation lock
|
obj.slock.Lock() // semaphore creation lock
|
||||||
sema, ok := smap[id] // lookup
|
sema, ok := obj.semas[id] // lookup
|
||||||
if !ok {
|
if !ok {
|
||||||
size := SemaSize(id) // defaults to 1
|
size := SemaSize(id) // defaults to 1
|
||||||
smap[id] = semaphore.NewSemaphore(size)
|
obj.semas[id] = semaphore.NewSemaphore(size)
|
||||||
sema = smap[id]
|
sema = obj.semas[id]
|
||||||
}
|
}
|
||||||
slock.Unlock()
|
obj.slock.Unlock()
|
||||||
|
|
||||||
if err := sema.P(1); err != nil { // lock!
|
if err := sema.P(1); err != nil { // lock!
|
||||||
reterr = multierr.Append(reterr, err) // list of errors
|
reterr = multierr.Append(reterr, err) // list of errors
|
||||||
@@ -58,13 +54,12 @@ func SemaLock(g *pgraph.Graph, semas []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SemaUnlock releases the list of semaphores in the graph.
|
// SemaUnlock releases the list of semaphores in the graph.
|
||||||
func SemaUnlock(g *pgraph.Graph, semas []string) error {
|
func (obj *MGraph) SemaUnlock(semas []string) error {
|
||||||
var reterr error
|
var reterr error
|
||||||
sort.Strings(semas) // unlock in the same order to remove partial locks
|
sort.Strings(semas) // unlock in the same order to remove partial locks
|
||||||
smap := SemaMapFromGraph(g)
|
|
||||||
|
|
||||||
for _, id := range semas {
|
for _, id := range semas {
|
||||||
sema, ok := smap[id] // lookup
|
sema, ok := obj.semas[id] // lookup
|
||||||
if !ok {
|
if !ok {
|
||||||
// programming error!
|
// programming error!
|
||||||
panic(fmt.Sprintf("graph: sema: %s does not exist", id))
|
panic(fmt.Sprintf("graph: sema: %s does not exist", id))
|
||||||
@@ -90,36 +85,3 @@ func SemaSize(id string) int {
|
|||||||
}
|
}
|
||||||
return size
|
return size
|
||||||
}
|
}
|
||||||
|
|
||||||
// SemaLockFromGraph returns a pointer to the semaphore lock stored with the
|
|
||||||
// graph, otherwise it panics. If one does not exist, it will create it.
|
|
||||||
func SemaLockFromGraph(g *pgraph.Graph) *sync.Mutex {
|
|
||||||
x, exists := g.Value("slock")
|
|
||||||
if !exists {
|
|
||||||
g.SetValue("slock", &sync.Mutex{})
|
|
||||||
x, _ = g.Value("slock")
|
|
||||||
}
|
|
||||||
|
|
||||||
slock, ok := x.(*sync.Mutex)
|
|
||||||
if !ok {
|
|
||||||
panic("not a *sync.Mutex")
|
|
||||||
}
|
|
||||||
return slock
|
|
||||||
}
|
|
||||||
|
|
||||||
// SemaMapFromGraph returns a pointer to the map of semaphores stored with the
|
|
||||||
// graph, otherwise it panics. If one does not exist, it will create it.
|
|
||||||
func SemaMapFromGraph(g *pgraph.Graph) map[string]*semaphore.Semaphore {
|
|
||||||
x, exists := g.Value("semas")
|
|
||||||
if !exists {
|
|
||||||
semas := make(map[string]*semaphore.Semaphore)
|
|
||||||
g.SetValue("semas", semas)
|
|
||||||
x, _ = g.Value("semas")
|
|
||||||
}
|
|
||||||
|
|
||||||
semas, ok := x.(map[string]*semaphore.Semaphore)
|
|
||||||
if !ok {
|
|
||||||
panic("not a map[string]*semaphore.Semaphore")
|
|
||||||
}
|
|
||||||
return semas
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user