pgraph: Untangle the semaphore code from the pgraph implementation

This re-implements the semaphore code on top of the graph kv store.
This commit is contained in:
James Shubin
2017-05-13 13:00:36 -04:00
parent 70e7ee2d46
commit d74c2115fd
2 changed files with 45 additions and 13 deletions

View File

@@ -26,7 +26,6 @@ import (
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/prometheus"
"github.com/purpleidea/mgmt/resources"
"github.com/purpleidea/mgmt/util/semaphore"
errwrap "github.com/pkg/errors"
)
@@ -59,8 +58,6 @@ type Graph struct {
fastPause bool // used to disable pokes for a fast pause
mutex *sync.Mutex // used when modifying graph State variable
wg *sync.WaitGroup
semas map[string]*semaphore.Semaphore
slock *sync.Mutex // semaphore mutex
prometheus *prometheus.Prometheus // the prometheus instance
}
@@ -93,8 +90,6 @@ func (g *Graph) Init() error {
// ptr b/c: Mutex/WaitGroup must not be copied after first use
g.mutex = &sync.Mutex{}
g.wg = &sync.WaitGroup{}
g.semas = make(map[string]*semaphore.Semaphore)
g.slock = &sync.Mutex{}
return nil
}
@@ -155,8 +150,6 @@ func (g *Graph) Copy() *Graph {
state: g.state,
mutex: g.mutex,
wg: g.wg,
semas: g.semas,
slock: g.slock,
fastPause: g.fastPause,
prometheus: g.prometheus,

View File

@@ -22,6 +22,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/purpleidea/mgmt/util/semaphore"
@@ -35,15 +36,18 @@ const SemaSep = ":"
func (g *Graph) SemaLock(semas []string) error {
var reterr error
sort.Strings(semas) // very important to avoid deadlock in the dag!
slock := SemaLockFromGraph(g)
smap := *SemaMapFromGraph(g) // returns a *map, but can't use directly
for _, id := range semas {
g.slock.Lock() // semaphore creation lock
sema, ok := g.semas[id] // lookup
slock.Lock() // semaphore creation lock
sema, ok := smap[id] // lookup
if !ok {
size := SemaSize(id) // defaults to 1
g.semas[id] = semaphore.NewSemaphore(size)
sema = g.semas[id]
smap[id] = semaphore.NewSemaphore(size)
sema = smap[id]
}
g.slock.Unlock()
slock.Unlock()
if err := sema.P(1); err != nil { // lock!
reterr = multierr.Append(reterr, err) // list of errors
@@ -56,8 +60,10 @@ func (g *Graph) SemaLock(semas []string) error {
func (g *Graph) SemaUnlock(semas []string) error {
var reterr error
sort.Strings(semas) // unlock in the same order to remove partial locks
smap := *SemaMapFromGraph(g)
for _, id := range semas {
sema, ok := g.semas[id] // lookup
sema, ok := smap[id] // lookup
if !ok {
// programming error!
panic(fmt.Sprintf("graph: sema: %s does not exist", id))
@@ -83,3 +89,36 @@ func SemaSize(id string) int {
}
return size
}
// SemaLockFromGraph returns a pointer to the semaphore lock stored with the
// graph, otherwise it panics. If one does not exist, it will create it.
func SemaLockFromGraph(g *Graph) *sync.Mutex {
x, exists := g.Value("slock")
if !exists {
g.SetValue("slock", &sync.Mutex{})
x, _ = g.Value("slock")
}
slock, ok := x.(*sync.Mutex)
if !ok {
panic("not a *sync.Mutex")
}
return slock
}
// SemaMapFromGraph returns a pointer to the map of semaphores stored with the
// graph, otherwise it panics. If one does not exist, it will create it.
func SemaMapFromGraph(g *Graph) *map[string]*semaphore.Semaphore {
x, exists := g.Value("semas")
if !exists {
semas := make(map[string]*semaphore.Semaphore)
g.SetValue("semas", &semas)
x, _ = g.Value("semas")
}
semas, ok := x.(*map[string]*semaphore.Semaphore)
if !ok {
panic("not a *map[string]*semaphore.Semaphore")
}
return semas
}