semaphore: Create a semaphore metaparam

This adds a P/V style semaphore mechanism to the resource graph. This
enables the user to specify a number of "id:count" tags associated with
each resource which will reduce the parallelism of the CheckApply
operation to that maximum count.

This is particularly interesting because (assuming I'm not mistaken) the
implementation is dead-lock free assuming that no individual resource
permanently ever blocks during execution! I don't have a formal proof of
this, but I was able to convince myself on paper that it was the case.

An actual proof that N P/V counting semaphores in a DAG won't ever
dead-lock would be particularly welcome! Hint: the trick is to acquire
them in alphabetical order while respecting the DAG flow. Disclaimer,
this assumes that the lock count is always > 0 of course.
This commit is contained in:
James Shubin
2017-02-26 20:02:36 -05:00
parent 757cb0cf23
commit d8e19cd79a
11 changed files with 442 additions and 3 deletions

View File

@@ -457,6 +457,17 @@ the rate limiter as designated by the `Limit` value. If the `Limit` is not set
to `+Infinity`, this must be a non-zero value. Please see the
[rate](https://godoc.org/golang.org/x/time/rate) package for more information.
#### Sema
List of string ids. Sema is a P/V style counting semaphore which can be used to
limit parallelism during the CheckApply phase of resource execution. Each
resource can have `N` different semaphores which share a graph global namespace.
Each semaphore has a maximum count associated with it. The default value of the
size is 1 (one) if size is unspecified. Each string id is the unique id of the
semaphore. If the id contains a trailing colon (:) followed by a positive
integer, then that value is the max size for that semaphore. Valid semaphore
id's include: `some_id`, `hello:42`, `not:smart:4` and `:13`. It is expected
that the last bare example be only used by the engine to add a global semaphore.
### Graph definition file
graph.yaml is the compiled graph definition file. The format is currently
undocumented, but by looking through the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples)
@@ -481,6 +492,15 @@ Globally force all resources into no-op mode. This also disables the export to
etcd functionality, but does not disable resource collection, however all
resources that are collected will have their individual noop settings set.
#### `--sema <size>`
Globally add a counting semaphore of this size to each resource in the graph.
The semaphore will get given an id of `:size`. In other words if you specify a
size of 42, you can expect a semaphore if named: `:42`. It is expected that
consumers of the semaphore metaparameter always include a prefix to avoid a
collision with this globally defined semaphore. The size value must be greater
than zero at this time. The traditional non-parallel execution found in config
management tools such as `Puppet` can be obtained with `--sema 1`.
#### `--remote <graph.yaml>`
Point to a graph file to run on the remote host specified within. This parameter
can be used multiple times if you'd like to remotely run on multiple hosts in

67
examples/exec3-sema.yaml Normal file
View File

@@ -0,0 +1,67 @@
---
graph: parallel
resources:
exec:
- name: pkg10
meta:
sema: ['mylock:1', 'otherlock:42']
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: svc10
meta:
sema: ['mylock:1']
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec10
meta:
sema: ['mylock:1']
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: pkg15
meta:
sema: ['mylock:1', 'otherlock:42']
cmd: sleep 15s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
edges:
- name: e1
from:
kind: exec
name: pkg10
to:
kind: exec
name: svc10
- name: e2
from:
kind: exec
name: svc10
to:
kind: exec
name: exec10

View File

@@ -84,6 +84,7 @@ func run(c *cli.Context) error {
obj.NoWatch = c.Bool("no-watch")
obj.Noop = c.Bool("noop")
obj.Sema = c.Int("sema")
obj.Graphviz = c.String("graphviz")
obj.GraphvizFilter = c.String("graphviz-filter")
obj.ConvergedTimeout = c.Int("converged-timeout")
@@ -228,6 +229,11 @@ func CLI(program, version string, flags Flags) error {
Name: "noop",
Usage: "globally force all resources into no-op mode",
},
cli.IntFlag{
Name: "sema",
Value: -1,
Usage: "globally add a semaphore to all resources with this lock count",
},
cli.StringFlag{
Name: "graphviz, g",
Value: "",

View File

@@ -67,6 +67,7 @@ type Main struct {
NoWatch bool // do not update graph on watched graph definition file changes
Noop bool // globally force all resources into no-op mode
Sema int // add a semaphore with this lock count to each resource
Graphviz string // output file for graphviz data
GraphvizFilter string // graphviz filter to use
ConvergedTimeout int // exit after approximately this many seconds in a converged state; -1 to disable
@@ -443,11 +444,17 @@ func (obj *Main) Run() error {
Debug: obj.Flags.Debug,
})
// apply the global noop parameter if requested
if obj.Noop {
for _, m := range newGraph.GraphMetas() {
for _, m := range newGraph.GraphMetas() {
// apply the global noop parameter if requested
if obj.Noop {
m.Noop = obj.Noop
}
// append the semaphore to each resource
if obj.Sema > 0 { // NOTE: size == 0 would block
// a semaphore with an empty id is valid
m.Sema = append(m.Sema, fmt.Sprintf(":%d", obj.Sema))
}
}
// FIXME: make sure we "UnGroup()" any semi-destructive

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"log"
"math"
"strings"
"sync"
"time"
@@ -164,8 +165,30 @@ func (g *Graph) Process(v *Vertex) error {
if g.Flags.Debug {
log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName())
}
// FIXME: should these SetState methods be here or after the sema code?
defer obj.SetState(resources.ResStateNil) // reset state when finished
obj.SetState(resources.ResStateProcess)
// semaphores!
// These shouldn't ever block an exit, since the graph should eventually
// converge causing their them to unlock. More interestingly, since they
// run in a DAG alphabetically, there is no way to permanently deadlock,
// assuming that resources individually don't ever block from finishing!
// The exception is that semaphores with a zero count will always block!
// TODO: Add a close mechanism to close/unblock zero count semaphores...
semas := obj.Meta().Sema
if g.Flags.Debug && len(semas) > 0 {
log.Printf("%s[%s]: Sema: P(%s)", obj.Kind(), obj.GetName(), strings.Join(semas, ", "))
}
if err := g.SemaLock(semas); err != nil { // lock
// NOTE: in practice, this might not ever be truly necessary...
return fmt.Errorf("shutdown of semaphores")
}
defer g.SemaUnlock(semas) // unlock
if g.Flags.Debug && len(semas) > 0 {
defer log.Printf("%s[%s]: Sema: V(%s)", obj.Kind(), obj.GetName(), strings.Join(semas, ", "))
}
var ok = true
var applied = false // did we run an apply?
// is it okay to run dependency wise right now?

View File

@@ -26,6 +26,7 @@ import (
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/prometheus"
"github.com/purpleidea/mgmt/resources"
"github.com/purpleidea/mgmt/util/semaphore"
errwrap "github.com/pkg/errors"
)
@@ -59,6 +60,7 @@ type Graph struct {
state graphState
mutex *sync.Mutex // used when modifying graph State variable
wg *sync.WaitGroup
semas map[string]*semaphore.Semaphore
prometheus *prometheus.Prometheus // the prometheus instance
}
@@ -86,6 +88,7 @@ func NewGraph(name string) *Graph {
// ptr b/c: Mutex/WaitGroup must not be copied after first use
mutex: &sync.Mutex{},
wg: &sync.WaitGroup{},
semas: make(map[string]*semaphore.Semaphore),
}
}
@@ -122,6 +125,7 @@ func (g *Graph) Copy() *Graph {
state: g.state,
mutex: g.mutex,
wg: g.wg,
semas: g.semas,
prometheus: g.prometheus,
}

78
pgraph/semaphore.go Normal file
View File

@@ -0,0 +1,78 @@
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgraph
import (
"fmt"
"sort"
"strconv"
"strings"
"github.com/purpleidea/mgmt/util/semaphore"
multierr "github.com/hashicorp/go-multierror"
)
// SemaSep is the trailing separator to split the semaphore id from the size.
const SemaSep = ":"
// SemaLock acquires the list of semaphores in the graph.
func (g *Graph) SemaLock(semas []string) error {
var reterr error
sort.Strings(semas) // very important to avoid deadlock in the dag!
for _, id := range semas {
size := 1 // default semaphore size
// valid id's include "some_id", "hello:42" and ":13"
if index := strings.LastIndex(id, SemaSep); index > -1 && (len(id)-index+len(SemaSep)) >= 1 {
// NOTE: we only allow size > 0 here!
if i, err := strconv.Atoi(id[index+len(SemaSep):]); err == nil && i > 0 {
size = i
}
}
sema, ok := g.semas[id] // lookup
if !ok {
g.semas[id] = semaphore.NewSemaphore(size)
sema = g.semas[id]
}
if err := sema.P(1); err != nil { // lock!
reterr = multierr.Append(reterr, err) // list of errors
}
}
return reterr
}
// SemaUnlock releases the list of semaphores in the graph.
func (g *Graph) SemaUnlock(semas []string) error {
var reterr error
sort.Strings(semas) // unlock in the same order to remove partial locks
for _, id := range semas {
sema, ok := g.semas[id] // lookup
if !ok {
// programming error!
panic(fmt.Sprintf("graph: sema: %s does not exist", id))
}
if err := sema.V(1); err != nil { // unlock!
reterr = multierr.Append(reterr, err) // list of errors
}
}
return reterr
}

View File

@@ -27,6 +27,7 @@ import (
"math"
"os"
"path"
"sort"
"sync"
"time"
@@ -101,6 +102,7 @@ type MetaParams struct {
Poll uint32 `yaml:"poll"` // metaparam, number of seconds between poll intervals, 0 to watch
Limit rate.Limit `yaml:"limit"` // metaparam, number of events per second to allow through
Burst int `yaml:"burst"` // metaparam, number of events to allow in a burst
Sema []string `yaml:"sema"` // metaparam, list of semaphore ids (id | id:count)
}
// UnmarshalYAML is the custom unmarshal handler for the MetaParams struct. It
@@ -127,6 +129,7 @@ var DefaultMetaParams = MetaParams{
Poll: 0, // defaults to watching for events
Limit: rate.Inf, // defaults to no limit
Burst: 0, // no burst needed on an infinite rate // TODO: is this a good default?
//Sema: []string{},
}
// The Base interface is everything that is common to all resources.
@@ -550,6 +553,24 @@ func (obj *BaseRes) Compare(res Res) bool {
if obj.Meta().Burst != res.Meta().Burst {
return false
}
// are the two slices the same?
cmpSlices := func(a, b []string) bool {
if len(a) != len(b) {
return false
}
sort.Strings(a)
sort.Strings(b)
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
if !cmpSlices(obj.Meta().Sema, res.Meta().Sema) {
return false
}
return true
}

8
test/shell/sema-1.sh Executable file
View File

@@ -0,0 +1,8 @@
#!/bin/bash -e
# should take at least 55s, but fail if we block this
# TODO: it would be nice to make sure this test doesn't exit too early!
$timeout --kill-after=120s 110s ./mgmt run --yaml sema-1.yaml --sema 2 --converged-timeout=5 --no-watch --no-pgp --tmp-prefix &
pid=$!
wait $pid # get exit status
exit $?

128
test/shell/sema-1.yaml Normal file
View File

@@ -0,0 +1,128 @@
---
graph: mygraph
comment: simple exec fan in to fan out example with semaphore
resources:
exec:
- name: exec1
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec2
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec3
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec4
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec5
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec6
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec7
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
- name: exec8
cmd: sleep 10s
shell: ''
timeout: 0
watchcmd: ''
watchshell: ''
ifcmd: ''
ifshell: ''
pollint: 0
state: present
edges:
- name: e1
from:
kind: exec
name: exec1
to:
kind: exec
name: exec4
- name: e2
from:
kind: exec
name: exec2
to:
kind: exec
name: exec4
- name: e3
from:
kind: exec
name: exec3
to:
kind: exec
name: exec4
- name: e4
from:
kind: exec
name: exec4
to:
kind: exec
name: exec5
- name: e5
from:
kind: exec
name: exec4
to:
kind: exec
name: exec6
- name: e6
from:
kind: exec
name: exec4
to:
kind: exec
name: exec7

View File

@@ -0,0 +1,77 @@
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package semaphore contains an implementation of a counting semaphore.
package semaphore
import (
"fmt"
)
// Semaphore is a counting semaphore. It must be initialized before use.
type Semaphore struct {
C chan struct{}
closed chan struct{}
}
// NewSemaphore creates a new semaphore.
func NewSemaphore(size int) *Semaphore {
obj := &Semaphore{}
obj.Init(size)
return obj
}
// Init initializes the semaphore.
func (obj *Semaphore) Init(size int) {
obj.C = make(chan struct{}, size)
obj.closed = make(chan struct{})
}
// Close shuts down the semaphore and releases all the locks.
func (obj *Semaphore) Close() {
// TODO: we could return an error if any semaphores were killed, but
// it's not particularly useful to know that for this application...
close(obj.closed)
}
// P acquires n resources.
func (obj *Semaphore) P(n int) error {
for i := 0; i < n; i++ {
select {
case obj.C <- struct{}{}: // acquire one
case <-obj.closed: // exit signal
return fmt.Errorf("closed")
}
}
return nil
}
// V releases n resources.
func (obj *Semaphore) V(n int) error {
for i := 0; i < n; i++ {
select {
case <-obj.C: // release one
// TODO: is the closed signal needed if unlocks should always pass?
case <-obj.closed: // exit signal
return fmt.Errorf("closed")
// TODO: is it true you shouldn't call a release before a lock?
default: // trying to release something that isn't locked
panic("semaphore: V > P")
}
}
return nil
}