diff --git a/docs/documentation.md b/docs/documentation.md index dd440364..935acb70 100644 --- a/docs/documentation.md +++ b/docs/documentation.md @@ -457,6 +457,17 @@ the rate limiter as designated by the `Limit` value. If the `Limit` is not set to `+Infinity`, this must be a non-zero value. Please see the [rate](https://godoc.org/golang.org/x/time/rate) package for more information. +#### Sema +List of string ids. Sema is a P/V style counting semaphore which can be used to +limit parallelism during the CheckApply phase of resource execution. Each +resource can have `N` different semaphores which share a graph global namespace. +Each semaphore has a maximum count associated with it. The default value of the +size is 1 (one) if size is unspecified. Each string id is the unique id of the +semaphore. If the id contains a trailing colon (:) followed by a positive +integer, then that value is the max size for that semaphore. Valid semaphore +ids include: `some_id`, `hello:42`, `not:smart:4` and `:13`. It is expected +that the last bare example be only used by the engine to add a global semaphore. + ### Graph definition file graph.yaml is the compiled graph definition file. The format is currently undocumented, but by looking through the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples) @@ -481,6 +492,15 @@ Globally force all resources into no-op mode. This also disables the export to etcd functionality, but does not disable resource collection, however all resources that are collected will have their individual noop settings set. +#### `--sema <size>` +Globally add a counting semaphore of this size to each resource in the graph. +The semaphore will be given an id of `:size`. In other words if you specify a +size of 42, you can expect a semaphore named `:42`. It is expected that +consumers of the semaphore metaparameter always include a prefix to avoid a +collision with this globally defined semaphore. 
The size value must be greater +than zero at this time. The traditional non-parallel execution found in config +management tools such as `Puppet` can be obtained with `--sema 1`. + #### `--remote ` Point to a graph file to run on the remote host specified within. This parameter can be used multiple times if you'd like to remotely run on multiple hosts in diff --git a/examples/exec3-sema.yaml b/examples/exec3-sema.yaml new file mode 100644 index 00000000..d9c3abdd --- /dev/null +++ b/examples/exec3-sema.yaml @@ -0,0 +1,67 @@ +--- +graph: parallel +resources: + exec: + - name: pkg10 + meta: + sema: ['mylock:1', 'otherlock:42'] + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: svc10 + meta: + sema: ['mylock:1'] + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec10 + meta: + sema: ['mylock:1'] + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: pkg15 + meta: + sema: ['mylock:1', 'otherlock:42'] + cmd: sleep 15s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present +edges: +- name: e1 + from: + kind: exec + name: pkg10 + to: + kind: exec + name: svc10 +- name: e2 + from: + kind: exec + name: svc10 + to: + kind: exec + name: exec10 diff --git a/lib/cli.go b/lib/cli.go index 4c5bfccd..76a9dbfe 100644 --- a/lib/cli.go +++ b/lib/cli.go @@ -84,6 +84,7 @@ func run(c *cli.Context) error { obj.NoWatch = c.Bool("no-watch") obj.Noop = c.Bool("noop") + obj.Sema = c.Int("sema") obj.Graphviz = c.String("graphviz") obj.GraphvizFilter = c.String("graphviz-filter") obj.ConvergedTimeout = c.Int("converged-timeout") @@ -228,6 +229,11 @@ func CLI(program, version string, flags Flags) error { Name: "noop", Usage: "globally force all resources into no-op mode", }, + 
cli.IntFlag{ + Name: "sema", + Value: -1, + Usage: "globally add a semaphore to all resources with this lock count", + }, cli.StringFlag{ Name: "graphviz, g", Value: "", diff --git a/lib/main.go b/lib/main.go index f216534a..705de701 100644 --- a/lib/main.go +++ b/lib/main.go @@ -67,6 +67,7 @@ type Main struct { NoWatch bool // do not update graph on watched graph definition file changes Noop bool // globally force all resources into no-op mode + Sema int // add a semaphore with this lock count to each resource Graphviz string // output file for graphviz data GraphvizFilter string // graphviz filter to use ConvergedTimeout int // exit after approximately this many seconds in a converged state; -1 to disable @@ -443,11 +444,17 @@ func (obj *Main) Run() error { Debug: obj.Flags.Debug, }) - // apply the global noop parameter if requested - if obj.Noop { - for _, m := range newGraph.GraphMetas() { + for _, m := range newGraph.GraphMetas() { + // apply the global noop parameter if requested + if obj.Noop { m.Noop = obj.Noop } + + // append the semaphore to each resource + if obj.Sema > 0 { // NOTE: size == 0 would block + // a semaphore with an empty id is valid + m.Sema = append(m.Sema, fmt.Sprintf(":%d", obj.Sema)) + } } // FIXME: make sure we "UnGroup()" any semi-destructive diff --git a/pgraph/actions.go b/pgraph/actions.go index cedfe3d1..55968b06 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -21,6 +21,7 @@ import ( "fmt" "log" "math" + "strings" "sync" "time" @@ -164,8 +165,30 @@ func (g *Graph) Process(v *Vertex) error { if g.Flags.Debug { log.Printf("%s[%s]: Process()", obj.Kind(), obj.GetName()) } + // FIXME: should these SetState methods be here or after the sema code? defer obj.SetState(resources.ResStateNil) // reset state when finished obj.SetState(resources.ResStateProcess) + + // semaphores! + // These shouldn't ever block an exit, since the graph should eventually + // converge causing their them to unlock. 
More interestingly, since they + // run in a DAG alphabetically, there is no way to permanently deadlock, + // assuming that resources individually don't ever block from finishing! + // The exception is that semaphores with a zero count will always block! + // TODO: Add a close mechanism to close/unblock zero count semaphores... + semas := obj.Meta().Sema + if g.Flags.Debug && len(semas) > 0 { + log.Printf("%s[%s]: Sema: P(%s)", obj.Kind(), obj.GetName(), strings.Join(semas, ", ")) + } + if err := g.SemaLock(semas); err != nil { // lock + // NOTE: in practice, this might not ever be truly necessary... + return fmt.Errorf("shutdown of semaphores") + } + defer g.SemaUnlock(semas) // unlock + if g.Flags.Debug && len(semas) > 0 { + defer log.Printf("%s[%s]: Sema: V(%s)", obj.Kind(), obj.GetName(), strings.Join(semas, ", ")) + } + var ok = true var applied = false // did we run an apply? // is it okay to run dependency wise right now? diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index da8d4230..c5852abe 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -26,6 +26,7 @@ import ( "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/prometheus" "github.com/purpleidea/mgmt/resources" + "github.com/purpleidea/mgmt/util/semaphore" errwrap "github.com/pkg/errors" ) @@ -59,6 +60,7 @@ type Graph struct { state graphState mutex *sync.Mutex // used when modifying graph State variable wg *sync.WaitGroup + semas map[string]*semaphore.Semaphore prometheus *prometheus.Prometheus // the prometheus instance } @@ -86,6 +88,7 @@ func NewGraph(name string) *Graph { // ptr b/c: Mutex/WaitGroup must not be copied after first use mutex: &sync.Mutex{}, wg: &sync.WaitGroup{}, + semas: make(map[string]*semaphore.Semaphore), } } @@ -122,6 +125,7 @@ func (g *Graph) Copy() *Graph { state: g.state, mutex: g.mutex, wg: g.wg, + semas: g.semas, prometheus: g.prometheus, } diff --git a/pgraph/semaphore.go b/pgraph/semaphore.go new file mode 100644 index 00000000..9a32697b --- /dev/null 
+++ b/pgraph/semaphore.go @@ -0,0 +1,78 @@ +// Mgmt +// Copyright (C) 2013-2017+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgraph + +import ( + "fmt" + "sort" + "strconv" + "strings" + + "github.com/purpleidea/mgmt/util/semaphore" + + multierr "github.com/hashicorp/go-multierror" +) + +// SemaSep is the trailing separator to split the semaphore id from the size. +const SemaSep = ":" + +// SemaLock acquires the list of semaphores in the graph. +func (g *Graph) SemaLock(semas []string) error { + var reterr error + sort.Strings(semas) // very important to avoid deadlock in the dag! + for _, id := range semas { + + size := 1 // default semaphore size + // valid id's include "some_id", "hello:42" and ":13" + if index := strings.LastIndex(id, SemaSep); index > -1 && (len(id)-index+len(SemaSep)) >= 1 { + // NOTE: we only allow size > 0 here! + if i, err := strconv.Atoi(id[index+len(SemaSep):]); err == nil && i > 0 { + size = i + } + } + + sema, ok := g.semas[id] // lookup + if !ok { + g.semas[id] = semaphore.NewSemaphore(size) + sema = g.semas[id] + } + + if err := sema.P(1); err != nil { // lock! + reterr = multierr.Append(reterr, err) // list of errors + } + } + return reterr +} + +// SemaUnlock releases the list of semaphores in the graph. 
+func (g *Graph) SemaUnlock(semas []string) error { + var reterr error + sort.Strings(semas) // unlock in the same order to remove partial locks + for _, id := range semas { + sema, ok := g.semas[id] // lookup + if !ok { + // programming error! + panic(fmt.Sprintf("graph: sema: %s does not exist", id)) + } + + if err := sema.V(1); err != nil { // unlock! + reterr = multierr.Append(reterr, err) // list of errors + } + } + return reterr +} diff --git a/resources/resources.go b/resources/resources.go index f997182d..aaf5e460 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -27,6 +27,7 @@ import ( "math" "os" "path" + "sort" "sync" "time" @@ -101,6 +102,7 @@ type MetaParams struct { Poll uint32 `yaml:"poll"` // metaparam, number of seconds between poll intervals, 0 to watch Limit rate.Limit `yaml:"limit"` // metaparam, number of events per second to allow through Burst int `yaml:"burst"` // metaparam, number of events to allow in a burst + Sema []string `yaml:"sema"` // metaparam, list of semaphore ids (id | id:count) } // UnmarshalYAML is the custom unmarshal handler for the MetaParams struct. It @@ -127,6 +129,7 @@ var DefaultMetaParams = MetaParams{ Poll: 0, // defaults to watching for events Limit: rate.Inf, // defaults to no limit Burst: 0, // no burst needed on an infinite rate // TODO: is this a good default? + //Sema: []string{}, } // The Base interface is everything that is common to all resources. @@ -550,6 +553,24 @@ func (obj *BaseRes) Compare(res Res) bool { if obj.Meta().Burst != res.Meta().Burst { return false } + + // are the two slices the same? 
+ cmpSlices := func(a, b []string) bool { + if len(a) != len(b) { + return false + } + sort.Strings(a) + sort.Strings(b) + for i := range a { + if a[i] != b[i] { + return false + } + } + return true + } + if !cmpSlices(obj.Meta().Sema, res.Meta().Sema) { + return false + } return true } diff --git a/test/shell/sema-1.sh b/test/shell/sema-1.sh new file mode 100755 index 00000000..8c9cb212 --- /dev/null +++ b/test/shell/sema-1.sh @@ -0,0 +1,8 @@ +#!/bin/bash -e + +# should take at least 55s, but fail if we block this +# TODO: it would be nice to make sure this test doesn't exit too early! +$timeout --kill-after=120s 110s ./mgmt run --yaml sema-1.yaml --sema 2 --converged-timeout=5 --no-watch --no-pgp --tmp-prefix & +pid=$! +wait $pid # get exit status +exit $? diff --git a/test/shell/sema-1.yaml b/test/shell/sema-1.yaml new file mode 100644 index 00000000..f07e70d2 --- /dev/null +++ b/test/shell/sema-1.yaml @@ -0,0 +1,128 @@ +--- +graph: mygraph +comment: simple exec fan in to fan out example with semaphore +resources: + exec: + - name: exec1 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec2 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec3 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec4 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec5 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec6 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec7 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + 
watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present + - name: exec8 + cmd: sleep 10s + shell: '' + timeout: 0 + watchcmd: '' + watchshell: '' + ifcmd: '' + ifshell: '' + pollint: 0 + state: present +edges: +- name: e1 + from: + kind: exec + name: exec1 + to: + kind: exec + name: exec4 +- name: e2 + from: + kind: exec + name: exec2 + to: + kind: exec + name: exec4 +- name: e3 + from: + kind: exec + name: exec3 + to: + kind: exec + name: exec4 +- name: e4 + from: + kind: exec + name: exec4 + to: + kind: exec + name: exec5 +- name: e5 + from: + kind: exec + name: exec4 + to: + kind: exec + name: exec6 +- name: e6 + from: + kind: exec + name: exec4 + to: + kind: exec + name: exec7 diff --git a/util/semaphore/semaphore.go b/util/semaphore/semaphore.go new file mode 100644 index 00000000..0105067e --- /dev/null +++ b/util/semaphore/semaphore.go @@ -0,0 +1,77 @@ +// Mgmt +// Copyright (C) 2013-2017+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Package semaphore contains an implementation of a counting semaphore. +package semaphore + +import ( + "fmt" +) + +// Semaphore is a counting semaphore. It must be initialized before use. +type Semaphore struct { + C chan struct{} + closed chan struct{} +} + +// NewSemaphore creates a new semaphore. 
+func NewSemaphore(size int) *Semaphore { + obj := &Semaphore{} + obj.Init(size) + return obj +} + +// Init initializes the semaphore. +func (obj *Semaphore) Init(size int) { + obj.C = make(chan struct{}, size) + obj.closed = make(chan struct{}) +} + +// Close shuts down the semaphore and releases all the locks. +func (obj *Semaphore) Close() { + // TODO: we could return an error if any semaphores were killed, but + // it's not particularly useful to know that for this application... + close(obj.closed) +} + +// P acquires n resources. +func (obj *Semaphore) P(n int) error { + for i := 0; i < n; i++ { + select { + case obj.C <- struct{}{}: // acquire one + case <-obj.closed: // exit signal + return fmt.Errorf("closed") + } + } + return nil +} + +// V releases n resources. +func (obj *Semaphore) V(n int) error { + for i := 0; i < n; i++ { + select { + case <-obj.C: // release one + // TODO: is the closed signal needed if unlocks should always pass? + case <-obj.closed: // exit signal + return fmt.Errorf("closed") + // TODO: is it true you shouldn't call a release before a lock? + default: // trying to release something that isn't locked + panic("semaphore: V > P") + } + } + return nil +}