From d74c2115fd52b4fdf64b4f852ae3df3930b234d4 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sat, 13 May 2017 13:00:36 -0400 Subject: [PATCH] pgraph: Untangle the semaphore code from the pgraph implementation This re-implements the semaphore code on top of the graph kv store. --- pgraph/pgraph.go | 7 ------- pgraph/semaphore.go | 51 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 3a7d948b..1c3ff759 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -26,7 +26,6 @@ import ( "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/prometheus" "github.com/purpleidea/mgmt/resources" - "github.com/purpleidea/mgmt/util/semaphore" errwrap "github.com/pkg/errors" ) @@ -59,8 +58,6 @@ type Graph struct { fastPause bool // used to disable pokes for a fast pause mutex *sync.Mutex // used when modifying graph State variable wg *sync.WaitGroup - semas map[string]*semaphore.Semaphore - slock *sync.Mutex // semaphore mutex prometheus *prometheus.Prometheus // the prometheus instance } @@ -93,8 +90,6 @@ func (g *Graph) Init() error { // ptr b/c: Mutex/WaitGroup must not be copied after first use g.mutex = &sync.Mutex{} g.wg = &sync.WaitGroup{} - g.semas = make(map[string]*semaphore.Semaphore) - g.slock = &sync.Mutex{} return nil } @@ -155,8 +150,6 @@ func (g *Graph) Copy() *Graph { state: g.state, mutex: g.mutex, wg: g.wg, - semas: g.semas, - slock: g.slock, fastPause: g.fastPause, prometheus: g.prometheus, diff --git a/pgraph/semaphore.go b/pgraph/semaphore.go index e2b6bc08..b6ac7680 100644 --- a/pgraph/semaphore.go +++ b/pgraph/semaphore.go @@ -22,6 +22,7 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/purpleidea/mgmt/util/semaphore" @@ -35,15 +36,18 @@ const SemaSep = ":" func (g *Graph) SemaLock(semas []string) error { var reterr error sort.Strings(semas) // very important to avoid deadlock in the dag! + slock := SemaLockFromGraph(g) + smap := *SemaMapFromGraph(g) // returns a *map, but can't use directly + for _, id := range semas { - g.slock.Lock() // semaphore creation lock - sema, ok := g.semas[id] // lookup + slock.Lock() // semaphore creation lock + sema, ok := smap[id] // lookup if !ok { size := SemaSize(id) // defaults to 1 - g.semas[id] = semaphore.NewSemaphore(size) - sema = g.semas[id] + smap[id] = semaphore.NewSemaphore(size) + sema = smap[id] } - g.slock.Unlock() + slock.Unlock() if err := sema.P(1); err != nil { // lock! reterr = multierr.Append(reterr, err) // list of errors @@ -56,8 +60,10 @@ func (g *Graph) SemaLock(semas []string) error { func (g *Graph) SemaUnlock(semas []string) error { var reterr error sort.Strings(semas) // unlock in the same order to remove partial locks + smap := *SemaMapFromGraph(g) + for _, id := range semas { - sema, ok := g.semas[id] // lookup + sema, ok := smap[id] // lookup if !ok { // programming error! panic(fmt.Sprintf("graph: sema: %s does not exist", id)) @@ -83,3 +89,36 @@ func SemaSize(id string) int { } return size } + +// SemaLockFromGraph returns a pointer to the semaphore lock stored with the +// graph, otherwise it panics. If one does not exist, it will create it. +func SemaLockFromGraph(g *Graph) *sync.Mutex { + x, exists := g.Value("slock") + if !exists { + g.SetValue("slock", &sync.Mutex{}) + x, _ = g.Value("slock") + } + + slock, ok := x.(*sync.Mutex) + if !ok { + panic("not a *sync.Mutex") + } + return slock +} + +// SemaMapFromGraph returns a pointer to the map of semaphores stored with the +// graph, otherwise it panics. If one does not exist, it will create it. +func SemaMapFromGraph(g *Graph) *map[string]*semaphore.Semaphore { + x, exists := g.Value("semas") + if !exists { + semas := make(map[string]*semaphore.Semaphore) + g.SetValue("semas", &semas) + x, _ = g.Value("semas") + } + + semas, ok := x.(*map[string]*semaphore.Semaphore) + if !ok { + panic("not a *map[string]*semaphore.Semaphore") + } + return semas +}