16 Commits

Julien Pivotto
33d20ac6d8 prometheus: Add detailed metrics
Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
2017-03-16 14:18:46 +01:00
James Shubin
660554cc45 todo: Update the TODO file to be more current
We should really try to remember to patch it with fixes when we do them :)
2017-03-15 14:57:29 -04:00
James Shubin
a455324e8c examples: Add missing file example
I was using this for testing graph changes but forgot to commit it.
2017-03-13 08:05:49 -04:00
James Shubin
cd5e2e1148 pgraph: Add fast pausing and exiting of graphs
This causes a graph to actually stop processing partway through, even
if there are pokes that want to continue on. This is so that the user
experience of pressing ^C actually causes a shutdown without finishing
the graph execution. It might be preferred to have this be a
user-defined setting at some point in the future, such as if the user
presses ^C twice. As well, we might want to implement an interrupt API
so that individual resource execution can be asked to bail out early if
requested. This could happen on a third ^C press.
2017-03-13 07:54:03 -04:00
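
A minimal sketch of the fast-pause idea, using a hypothetical graph type
rather than mgmt's actual pgraph API: once the flag is set, further pokes
become no-ops, so in-flight propagation waves die out instead of running
the graph to completion.

package main

import (
	"fmt"
	"sync"
)

// graph is a hypothetical stand-in for mgmt's pgraph type.
type graph struct {
	mu        sync.Mutex
	fastPause bool // when set, suppress further pokes
}

// poke notifies vertices downstream of v, unless a fast pause is under way.
func (g *graph) poke(v string) {
	g.mu.Lock()
	fast := g.fastPause
	g.mu.Unlock()
	if fast {
		return // swallow the poke so this propagation wave stops here
	}
	fmt.Printf("poking downstream of %s\n", v)
}

func main() {
	g := &graph{}
	g.poke("v1") // propagates normally
	g.mu.Lock()
	g.fastPause = true // e.g. the user pressed ^C
	g.mu.Unlock()
	g.poke("v2") // suppressed: the graph can wind down quickly
}
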
James Shubin
074da4da19 pgraph, resources: Run the resource Setup in parallel
This is a reasonable thing to do at this time.
2017-03-13 07:54:03 -04:00
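
The pattern in question, sketched with placeholder resources (the res type
and setup method here are hypothetical stand-ins for Res.Setup()): fan the
Setup calls out into goroutines, then block on a WaitGroup before the engine
continues.

package main

import (
	"fmt"
	"sync"
	"time"
)

// res is a placeholder resource; setup stands in for Res.Setup().
type res struct{ name string }

func (r *res) setup() {
	time.Sleep(10 * time.Millisecond) // pretend to initialize some vars
	fmt.Printf("%s: setup done\n", r.name)
}

func main() {
	resources := []*res{{"file1"}, {"exec1"}, {"svc1"}}
	wg := &sync.WaitGroup{}
	for _, r := range resources {
		wg.Add(1)
		go func(rr *res) { // run each setup in parallel...
			defer wg.Done()
			rr.setup()
		}(r)
	}
	wg.Wait() // ...but wait for all of them before continuing
	fmt.Println("all resources initialized")
}
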
James Shubin
e4e39d820c pgraph: semaphore: Refactor semaphore size function and test 2017-03-13 07:49:29 -04:00
James Shubin
e5dbb214a2 pgraph: Move the BackPoke to before the semaphores
I can't think of a reason we should grab a semaphore before backpoking.
The semaphore is intended to block around the actual work in CheckApply,
not the dependency resolution of the correct vertex.
2017-03-13 07:49:29 -04:00
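
A sketch of the resulting ordering, with a channel standing in for mgmt's
semaphore type (all names here are hypothetical): the dependency question is
resolved before any acquire, so the semaphore only wraps the actual work.

package main

import "fmt"

// vertex is a hypothetical graph vertex; ready plays the role of OKTimestamp.
type vertex struct {
	name  string
	ready bool
}

var sema = make(chan struct{}, 1) // a size-1 semaphore for the sketch

// process resolves the dependency question first; the semaphore is only
// held around the actual work, never around the BackPoke.
func process(v *vertex) {
	if !v.ready {
		fmt.Printf("%s: back-poking dependencies (no semaphore held)\n", v.name)
		return
	}
	sema <- struct{}{}        // P(): acquire just before the work
	defer func() { <-sema }() // V(): release when done
	fmt.Printf("%s: running CheckApply under the semaphore\n", v.name)
}

func main() {
	process(&vertex{name: "file1", ready: false})
	process(&vertex{name: "file2", ready: true})
}
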
James Shubin
91af528ff8 pgraph: Move the quiesce done indicator to avoid deadlock
This avoids a deadlock on resource failure when retry==0. Without this
we would never exit. A test is included too!
2017-03-12 13:52:35 -04:00
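
The deadlock shape, reduced to a sketch (sendExit is a hypothetical stand-in
for SendEvent, which waits on the quiesce group internally): the Done must
come before the call that Waits, or the goroutine waits on its own count
forever.

package main

import (
	"fmt"
	"sync"
)

func main() {
	qg := &sync.WaitGroup{}
	qg.Add(1) // this worker is counted in the quiesce group

	sendExit := func() {
		qg.Wait() // stand-in for the Wait that happens inside SendEvent
		fmt.Println("exit event delivered")
	}

	// Calling sendExit() while our own count is still held would block
	// forever: Wait() waits for us, and we wait for Wait(). So:
	qg.Done()  // quiesce *before* the Wait inside SendEvent...
	sendExit() // ...and the exit proceeds immediately
}
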
James Shubin
18c4e39ea3 resources: exit: Misc cleanups
Some of this code hadn't been touched much since the early days of
mgmt. Here's a quick cleanup of some cruft.
2017-03-12 13:21:22 -04:00
James Shubin
bda455ce78 resources: exec: Ignore signals sent to main process
When we send a ^C to the main process, our children see it too! This
puts them in their own process group so that they're not affected.
There's still the matter of properly hooking up the internal exit signal
to a proper shutdown, but that's separate.

This might mean there should be an interrupt aspect to the resource
API, which would allow a second ^C to let the engine request a forceful
termination from any resource that supports it.
2017-03-12 13:11:54 -04:00
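
A self-contained sketch of the same trick (the command here is arbitrary):
start a child in its own process group so a SIGINT delivered to the parent's
group, e.g. ^C in a terminal, is not also delivered to the child.

package main

import (
	"log"
	"os/exec"
	"syscall"
)

func main() {
	cmd := exec.Command("sleep", "3")
	// Put the child in its own process group, so a SIGINT sent to the
	// parent's group (e.g. ^C in a terminal) is not delivered to it too.
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
		Pgid:    0, // 0 means: the child's own pid becomes its pgid
	}
	if err := cmd.Start(); err != nil {
		log.Fatalf("error starting cmd: %v", err)
	}
	log.Printf("child pid %d runs in its own process group", cmd.Process.Pid)
	cmd.Wait() // reap the child before exiting
}
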
James Shubin
a07aea1ad3 resources: exec: Clean up command error processing
Show the exit status on error and general cleanups.
2017-03-12 12:44:03 -04:00
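
A sketch of the unwrapping this relies on (the same idiom the patch adds to
CheckApply): exec returns an *exec.ExitError for non-zero exits, and its
embedded os.ProcessState exposes the numeric status via syscall.WaitStatus.

package main

import (
	"fmt"
	"os/exec"
	"syscall"
)

func main() {
	err := exec.Command("sh", "-c", "exit 13").Run()
	if exitErr, ok := err.(*exec.ExitError); ok { // embeds an os.ProcessState
		if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok {
			fmt.Printf("cmd error, exit status: %d\n", ws.ExitStatus())
			return
		}
	}
	if err != nil {
		fmt.Printf("general cmd error: %v\n", err)
		return
	}
	fmt.Println("cmd ran cleanly")
}
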
James Shubin
18e2dbf144 resources: exec: Remove state checks that are done in the engine
These state checks are now done automatically in the engine, and so they
should be removed to make the code easier to read.
2017-03-12 12:35:03 -04:00
James Shubin
564a07e62e resources: exec: Don't invalidate state on poke
This was an incorrect legacy decision from earlier versions of mgmt.
2017-03-12 12:35:02 -04:00
James Shubin
a358135e41 resources: exec: Remove the pollint parameter
Since we now have a poll metaparameter, we don't need the
resource-specific code.
2017-03-12 10:49:26 -04:00
James Shubin
6d9be15035 pgraph: semaphore: Add lock around semaphore map
I forgot about the `concurrent map write` race, but now it's fixed. I
suppose we could probably pre-create all semaphores in the graph at once
before Start, and remove this lock, but that's an optimization for a
later day.
2017-03-11 09:06:18 -05:00
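
The lookup-or-create pattern behind the fix, sketched with a trivial
channel-based semaphore (names hypothetical): the mutex guards the map so
concurrent Workers can't trigger the `concurrent map write` race, and every
caller gets the same instance per id.

package main

import (
	"fmt"
	"sync"
)

// semaphore is a trivial channel-based counting semaphore.
type semaphore struct{ ch chan struct{} }

func newSemaphore(size int) *semaphore {
	return &semaphore{ch: make(chan struct{}, size)}
}

var (
	slock sync.Mutex // guards semas: map writes are not goroutine-safe
	semas = make(map[string]*semaphore)
)

// getSema looks up or lazily creates the named semaphore under the lock,
// which avoids the `concurrent map write` race between Workers.
func getSema(id string, size int) *semaphore {
	slock.Lock()
	defer slock.Unlock()
	sema, ok := semas[id]
	if !ok {
		sema = newSemaphore(size)
		semas[id] = sema
	}
	return sema
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() { // many goroutines race to create the same semaphore
			defer wg.Done()
			getSema("some_id:4", 4)
		}()
	}
	wg.Wait()
	fmt.Printf("created %d semaphore(s)\n", len(semas)) // always 1
}
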
James Shubin
b740e0b78a git: Add more features to tag.sh script
This helps me make releases and probably won't help you, but why not be
transparent about things and tools!
2017-03-11 08:41:56 -05:00
20 changed files with 580 additions and 127 deletions

TODO.md

@@ -1,16 +1,16 @@
# TODO
If you're looking for something to do, look here!
Let us know if you're working on one of the items.
If you'd like something to work on, ping @purpleidea and I'll create an issue
tailored especially for you! Just let me know your approximate golang skill
level and how many hours you'd like to spend on the patch.
## Package resource
- [ ] getfiles support on debian [bug](https://github.com/hughsie/PackageKit/issues/118)
- [ ] directory info on fedora [bug](https://github.com/hughsie/PackageKit/issues/117)
- [ ] dnf blocker [bug](https://github.com/hughsie/PackageKit/issues/110)
- [ ] install signal blocker [bug](https://github.com/hughsie/PackageKit/issues/109)
## File resource [bug](https://github.com/purpleidea/mgmt/issues/64) [:heart:](https://github.com/purpleidea/mgmt/labels/mgmtlove)
- [ ] chown/chmod support [:heart:](https://github.com/purpleidea/mgmt/labels/mgmtlove)
- [ ] user/group support [:heart:](https://github.com/purpleidea/mgmt/labels/mgmtlove)
- [ ] recurse limit support [:heart:](https://github.com/purpleidea/mgmt/labels/mgmtlove)
- [ ] fanotify support [bug](https://github.com/go-fsnotify/fsnotify/issues/114)
@@ -29,7 +29,6 @@ Let us know if you're working on one of the items.
## Virt (libvirt) resource
- [ ] base resource improvements [:heart:](https://github.com/purpleidea/mgmt/labels/mgmtlove)
- [ ] port to upstream https://github.com/libvirt/libvirt-go [:heart:](https://github.com/purpleidea/mgmt/labels/mgmtlove)
## Net (systemd-networkd) resource
- [ ] base resource
@@ -52,6 +51,9 @@ Let us know if you're working on one of the items.
## Torrent/dht file transfer
- [ ] base plumbing
## GPG/Auth improvements
- [ ] base plumbing
## Language improvements
- [ ] language design
- [ ] lexer/parser

examples/file4.yaml Normal file

@@ -0,0 +1,10 @@
---
graph: mygraph
resources:
  file:
  - name: file1
    path: "/tmp/mgmt/hello"
    content: |
      i am a file
    state: exists
edges: []


@@ -421,7 +421,7 @@ func (obj *Main) Run() error {
// run graph vertex LOCK...
if !first { // TODO: we can flatten this check out I think
converger.Pause() // FIXME: add sync wait?
G.Pause() // sync
G.Pause(false) // sync
//G.UnGroup() // FIXME: implement me if needed!
}


@@ -26,6 +26,7 @@ import (
"time"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/prometheus"
"github.com/purpleidea/mgmt/resources"
multierr "github.com/hashicorp/go-multierror"
@@ -65,6 +66,14 @@ func (g *Graph) OKTimestamp(v *Vertex) bool {
// Poke tells nodes after me in the dependency graph that they need to refresh.
func (g *Graph) Poke(v *Vertex) error {
// if we're pausing (or exiting) then we should suspend poke's so that
// the graph doesn't go on running forever until it's completely done!
// this is an optional feature which we can do by default on user exit
if g.fastPause {
return nil // TODO: should this be an error instead?
}
var wg sync.WaitGroup
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
for _, n := range g.OutgoingGraphVertices(v) {
@@ -169,6 +178,18 @@ func (g *Graph) Process(v *Vertex) error {
defer obj.SetState(resources.ResStateNil) // reset state when finished
obj.SetState(resources.ResStateProcess)
// is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke
// us back and we will run if needed then!
if !g.OKTimestamp(v) {
go g.BackPoke(v)
return nil
}
// timestamp must be okay...
if g.Flags.Debug {
log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp())
}
// semaphores!
// These shouldn't ever block an exit, since the graph should eventually
// converge causing them to unlock. More interestingly, since they
@@ -191,18 +212,6 @@ func (g *Graph) Process(v *Vertex) error {
var ok = true
var applied = false // did we run an apply?
// is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke
// us back and we will run if needed then!
if !g.OKTimestamp(v) {
go g.BackPoke(v)
return nil
}
// timestamp must be okay...
if g.Flags.Debug {
log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp())
}
// connect any senders to receivers and detect if values changed
if updated, err := obj.SendRecv(obj); err != nil {
@@ -385,9 +394,9 @@ Loop:
// catch invalid rates
if v.Meta().Burst == 0 && !(v.Meta().Limit == rate.Inf) { // blocked
e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName())
v.SendEvent(event.EventExit, &SentinelErr{e})
ev.ACK() // ready for next message
v.Res.QuiesceGroup().Done()
v.SendEvent(event.EventExit, &SentinelErr{e})
continue
}
@@ -422,14 +431,23 @@ Loop:
playback = true
log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e)
if retry == 0 {
if err := obj.Prometheus().UpdateState(fmt.Sprintf("%v[%v]", v.Kind(), v.GetName()), v.Kind(), prometheus.ResStateHardFail); err != nil {
// TODO: how to error this?
log.Printf("%s[%s]: Prometheus.UpdateState() errored: %v", v.Kind(), v.GetName(), err)
}
// wrap the error in the sentinel
v.Res.QuiesceGroup().Done() // before the Wait that happens in SendEvent!
v.SendEvent(event.EventExit, &SentinelErr{e})
v.Res.QuiesceGroup().Done()
return
}
if retry > 0 { // don't decrement the -1
retry--
}
if err := obj.Prometheus().UpdateState(fmt.Sprintf("%v[%v]", v.Kind(), v.GetName()), v.Kind(), prometheus.ResStateSoftFail); err != nil {
// TODO: how to error this?
log.Printf("%s[%s]: Prometheus.UpdateState() errored: %v", v.Kind(), v.GetName(), err)
}
log.Printf("%s[%s]: CheckApply: Retrying after %.4f seconds (%d left)", v.Kind(), v.GetName(), delay.Seconds(), retry)
// start the timer...
timer.Reset(delay)
@@ -654,11 +672,18 @@ func (g *Graph) Start(first bool) { // start or continue
t, _ := g.TopologicalSort()
indegree := g.InDegree() // compute all of the indegree's
reversed := Reverse(t)
wg := &sync.WaitGroup{}
for _, v := range reversed { // run the Setup() for everyone first
if !v.Res.IsWorking() { // if Worker() is not running...
v.Res.Setup() // initialize some vars in the resource
// run these in parallel, as long as we wait before continuing
wg.Add(1)
go func(vv *Vertex) {
defer wg.Done()
if !vv.Res.IsWorking() { // if Worker() is not running...
vv.Res.Setup() // initialize some vars in the resource
}
}(v)
}
wg.Wait()
// run through the topological reverse, and start or unpause each vertex
for _, v := range reversed {
@@ -718,21 +743,31 @@ func (g *Graph) Start(first bool) { // start or continue
// we wait for everyone to start before exiting!
}
// Pause sends pause events to the graph in a topological sort order.
func (g *Graph) Pause() {
// Pause sends pause events to the graph in a topological sort order. If you set
// the fastPause argument to true, then it will ask future propagation waves to
// not run through the graph before exiting, and instead will exit much quicker.
func (g *Graph) Pause(fastPause bool) {
log.Printf("State: %v -> %v", g.setState(graphStatePausing), g.getState())
defer log.Printf("State: %v -> %v", g.setState(graphStatePaused), g.getState())
if fastPause {
g.fastPause = true // set flag
}
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
v.SendEvent(event.EventPause, nil) // sync
}
g.fastPause = false // reset flag
}
// Exit sends exit events to the graph in a topological sort order.
func (g *Graph) Exit() {
if g == nil {
if g == nil { // empty graph that wasn't populated yet
return
} // empty graph that wasn't populated yet
}
// FIXME: a second ^C could put this into fast pause, but do it for now!
g.Pause(true) // implement this with pause to avoid duplicating the code
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
// turn off the taps...


@@ -58,9 +58,11 @@ type Graph struct {
Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge)
Flags Flags
state graphState
fastPause bool // used to disable pokes for a fast pause
mutex *sync.Mutex // used when modifying graph State variable
wg *sync.WaitGroup
semas map[string]*semaphore.Semaphore
slock *sync.Mutex // semaphore mutex
prometheus *prometheus.Prometheus // the prometheus instance
}
@@ -89,6 +91,7 @@ func NewGraph(name string) *Graph {
mutex: &sync.Mutex{},
wg: &sync.WaitGroup{},
semas: make(map[string]*semaphore.Semaphore),
slock: &sync.Mutex{},
}
}
@@ -126,6 +129,8 @@ func (g *Graph) Copy() *Graph {
mutex: g.mutex,
wg: g.wg,
semas: g.semas,
slock: g.slock,
fastPause: g.fastPause,
prometheus: g.prometheus,
}


@@ -36,21 +36,14 @@ func (g *Graph) SemaLock(semas []string) error {
var reterr error
sort.Strings(semas) // very important to avoid deadlock in the dag!
for _, id := range semas {
size := 1 // default semaphore size
// valid id's include "some_id", "hello:42" and ":13"
if index := strings.LastIndex(id, SemaSep); index > -1 && (len(id)-index+len(SemaSep)) >= 1 {
// NOTE: we only allow size > 0 here!
if i, err := strconv.Atoi(id[index+len(SemaSep):]); err == nil && i > 0 {
size = i
}
}
g.slock.Lock() // semaphore creation lock
sema, ok := g.semas[id] // lookup
if !ok {
size := SemaSize(id) // defaults to 1
g.semas[id] = semaphore.NewSemaphore(size)
sema = g.semas[id]
}
g.slock.Unlock()
if err := sema.P(1); err != nil { // lock!
reterr = multierr.Append(reterr, err) // list of errors
@@ -76,3 +69,17 @@ func (g *Graph) SemaUnlock(semas []string) error {
}
return reterr
}
// SemaSize returns the size integer associated with the semaphore id. It
// defaults to 1 if not found.
func SemaSize(id string) int {
size := 1 // default semaphore size
// valid id's include "some_id", "hello:42" and ":13"
if index := strings.LastIndex(id, SemaSep); index > -1 && (len(id)-index+len(SemaSep)) >= 1 {
// NOTE: we only allow size > 0 here!
if i, err := strconv.Atoi(id[index+len(SemaSep):]); err == nil && i > 0 {
size = i
}
}
return size
}


@@ -23,6 +23,19 @@ import (
"github.com/purpleidea/mgmt/resources"
)
func TestSemaSize(t *testing.T) {
pairs := map[string]int{
"id:42": 42,
":13": 13,
"some_id": 1,
}
for id, size := range pairs {
if i := SemaSize(id); i != size {
t.Errorf("sema id `%s`, expected: `%d`, got: `%d`", id, size, i)
}
}
}
func NewNoopResTestSema(name string, semas []string) *NoopResTest {
obj := &NoopResTest{
NoopRes: resources.NoopRes{


@@ -20,17 +20,33 @@
package prometheus
import (
"errors"
"net/http"
"strconv"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
errwrap "github.com/pkg/errors"
)
// DefaultPrometheusListen is registered in
// https://github.com/prometheus/prometheus/wiki/Default-port-allocations
const DefaultPrometheusListen = "127.0.0.1:9233"
// ResState represents the status of a resource.
type ResState int
const (
// ResStateOK represents a working resource
ResStateOK ResState = iota
// ResStateSoftFail represents a resource in soft fail (will be retried)
ResStateSoftFail
// ResStateHardFail represents a resource in hard fail (will NOT be retried)
ResStateHardFail
)
// Prometheus is the struct that contains information about the
// prometheus instance. Run Init() on it.
type Prometheus struct {
@@ -38,7 +54,18 @@ type Prometheus struct {
checkApplyTotal *prometheus.CounterVec // total of CheckApplies that have been triggered
pgraphStartTimeSeconds prometheus.Gauge // process start time in seconds since unix epoch
managedResources *prometheus.GaugeVec // Resources we manage now
failedResourcesTotal *prometheus.CounterVec // Total of failures since mgmt has started
failedResources *prometheus.GaugeVec // Number of currently failing resources
resourcesState map[string]resStateWithKind // Maps the resources with their current kind/state
mutex *sync.Mutex // Mutex used to update resourcesState
}
// resStateWithKind is used to count the failures by kind
type resStateWithKind struct {
state ResState
kind string
}
// Init some parameters - currently the Listen address.
@@ -46,6 +73,10 @@ func (obj *Prometheus) Init() error {
if len(obj.Listen) == 0 {
obj.Listen = DefaultPrometheusListen
}
obj.mutex = &sync.Mutex{}
obj.resourcesState = make(map[string]resStateWithKind)
obj.checkApplyTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "mgmt_checkapply_total",
@@ -68,6 +99,38 @@ func (obj *Prometheus) Init() error {
)
prometheus.MustRegister(obj.pgraphStartTimeSeconds)
obj.managedResources = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "mgmt_resources",
Help: "Number of managed resources.",
},
// kind: resource type: Svc, File, ...
[]string{"kind"},
)
prometheus.MustRegister(obj.managedResources)
obj.failedResourcesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "mgmt_failures_total",
Help: "Total of failed resources.",
},
// kind: resource type: Svc, File, ...
// failure: soft or hard
[]string{"kind", "failure"},
)
prometheus.MustRegister(obj.failedResourcesTotal)
obj.failedResources = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "mgmt_failures",
Help: "Number of failing resources.",
},
// kind: resource type: Svc, File, ...
// failure: soft or hard
[]string{"kind", "failure"},
)
prometheus.MustRegister(obj.failedResources)
return nil
}
@@ -107,3 +170,94 @@ func (obj *Prometheus) UpdatePgraphStartTime() error {
obj.pgraphStartTimeSeconds.SetToCurrentTime()
return nil
}
// AddManagedResource increments the Managed Resource counter and updates the resource status.
func (obj *Prometheus) AddManagedResource(resUUID string, rtype string) error {
if obj == nil {
return nil // happens when mgmt is launched without --prometheus
}
obj.managedResources.With(prometheus.Labels{"kind": rtype}).Inc()
if err := obj.UpdateState(resUUID, rtype, ResStateOK); err != nil {
return errwrap.Wrapf(err, "can't update the resource status in the map")
}
return nil
}
// RemoveManagedResource decrements the Managed Resource counter and updates the resource status.
func (obj *Prometheus) RemoveManagedResource(resUUID string, rtype string) error {
if obj == nil {
return nil // happens when mgmt is launched without --prometheus
}
obj.managedResources.With(prometheus.Labels{"kind": rtype}).Dec()
if err := obj.deleteState(resUUID); err != nil {
return errwrap.Wrapf(err, "can't remove the resource status from the map")
}
return nil
}
// deleteState removes the resource from the state map and re-populates the failing gauge.
func (obj *Prometheus) deleteState(resUUID string) error {
if obj == nil {
return nil // happens when mgmt is launched without --prometheus
}
obj.mutex.Lock()
delete(obj.resourcesState, resUUID)
obj.mutex.Unlock()
if err := obj.updateFailingGauge(); err != nil {
return errwrap.Wrapf(err, "can't update the failing gauge")
}
return nil
}
// UpdateState updates the state of the resources in our internal state map
// then triggers a refresh of the failing gauge.
func (obj *Prometheus) UpdateState(resUUID string, rtype string, newState ResState) error {
defer obj.updateFailingGauge()
if obj == nil {
return nil // happens when mgmt is launched without --prometheus
}
obj.mutex.Lock()
obj.resourcesState[resUUID] = resStateWithKind{state: newState, kind: rtype}
obj.mutex.Unlock()
if newState != ResStateOK {
var strState string
if newState == ResStateSoftFail {
strState = "soft"
} else if newState == ResStateHardFail {
strState = "hard"
} else {
return errors.New("state should be soft or hard failure")
}
obj.failedResourcesTotal.With(prometheus.Labels{"kind": rtype, "failure": strState}).Inc()
}
return nil
}
// updateFailingGauge refreshes the failing gauge by parsing the internal
// state map.
func (obj *Prometheus) updateFailingGauge() error {
if obj == nil {
return nil // happens when mgmt is launched without --prometheus
}
var softFails, hardFails map[string]float64
softFails = make(map[string]float64)
hardFails = make(map[string]float64)
for _, v := range obj.resourcesState {
if v.state == ResStateSoftFail {
softFails[v.kind]++
} else if v.state == ResStateHardFail {
hardFails[v.kind]++
}
}
// TODO: we might want to Zero the metrics we are not using
// because in prometheus design the metrics keep living for some time
// even after they are removed.
obj.failedResources.Reset()
for k, v := range softFails {
obj.failedResources.With(prometheus.Labels{"kind": k, "failure": "soft"}).Set(v)
}
for k, v := range hardFails {
obj.failedResources.With(prometheus.Labels{"kind": k, "failure": "hard"}).Set(v)
}
return nil
}


@@ -25,6 +25,7 @@ import (
"log"
"os/exec"
"strings"
"syscall"
"github.com/purpleidea/mgmt/util"
@@ -38,7 +39,6 @@ func init() {
// ExecRes is an exec resource for running commands.
type ExecRes struct {
BaseRes `yaml:",inline"`
State string `yaml:"state"` // state: exists/present?, absent, (undefined?)
Cmd string `yaml:"cmd"` // the command to run
Shell string `yaml:"shell"` // the (optional) shell to use to run the cmd
Timeout int `yaml:"timeout"` // the cmd timeout in seconds
@@ -46,7 +46,6 @@ type ExecRes struct {
WatchShell string `yaml:"watchshell"` // the (optional) shell to use to run the watch cmd
IfCmd string `yaml:"ifcmd"` // the if command to run
IfShell string `yaml:"ifshell"` // the (optional) shell to use to run the if cmd
PollInt int `yaml:"pollint"` // the poll interval for the ifcmd
}
// Default returns some sensible defaults for this resource.
@@ -64,11 +63,6 @@ func (obj *ExecRes) Validate() error {
return fmt.Errorf("command can't be empty")
}
// if we have a watch command, then we don't poll with the if command!
if obj.WatchCmd != "" && obj.PollInt > 0 {
return fmt.Errorf("don't poll when we have a watch command")
}
return obj.BaseRes.Validate()
}
@@ -119,6 +113,11 @@ func (obj *ExecRes) Watch() error {
}
cmd := exec.Command(cmdName, cmdArgs...)
//cmd.Dir = "" // look for program in pwd ?
// ignore signals sent to parent process (we're in our own group)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
}
cmdReader, err := cmd.StdoutPipe()
if err != nil {
@@ -126,11 +125,11 @@ func (obj *ExecRes) Watch() error {
}
scanner := bufio.NewScanner(cmdReader)
defer cmd.Wait() // XXX: is this necessary?
defer cmd.Wait() // wait for the command to exit before return!
defer func() {
// FIXME: without wrapping this in this func it panic's
// when running examples/graph8d.yaml
cmd.Process.Kill() // TODO: is this necessary?
// when running certain graphs... why?
cmd.Process.Kill() // shutdown the Watch command on exit
}()
if err := cmd.Start(); err != nil {
return errwrap.Wrapf(err, "error starting Cmd")
@@ -151,6 +150,7 @@ func (obj *ExecRes) Watch() error {
log.Printf("%s[%s]: Watch output: %s", obj.Kind(), obj.GetName(), text)
if text != "" {
send = true
obj.StateOK(false) // something made state dirty
}
case err := <-errch:
@@ -171,8 +171,6 @@ func (obj *ExecRes) Watch() error {
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
// it is okay to invalidate the clean state on poke too
obj.StateOK(false) // something made state dirty
obj.Event()
}
}
@@ -181,24 +179,12 @@ func (obj *ExecRes) Watch() error {
// CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not.
// TODO: expand the IfCmd to be a list of commands
func (obj *ExecRes) CheckApply(apply bool) (checkOK bool, err error) {
func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
// If we receive a refresh signal, then the engine skips the IsStateOK()
// check and this will run. It is still guarded by the IfCmd, but it can
// have a chance to execute, and all without the check of obj.Refresh()!
// if there is a watch command, but no if command, run based on state
if obj.WatchCmd != "" && obj.IfCmd == "" {
if obj.IsStateOK() { // FIXME: this is done by engine now...
return true, nil
}
// if there is no watcher, but there is an onlyif check, run it to see
} else if obj.IfCmd != "" { // && obj.WatchCmd == ""
// there is a watcher, but there is also an if command
//} else if obj.IfCmd != "" && obj.WatchCmd != "" {
if obj.PollInt > 0 { // && obj.WatchCmd == ""
// XXX: have the Watch() command output onlyif poll events...
// XXX: we can optimize by saving those results for returning here
// return XXX
}
if obj.IfCmd != "" { // if there is no onlyif check, we should just run
var cmdName string
var cmdArgs []string
@@ -214,18 +200,17 @@ func (obj *ExecRes) CheckApply(apply bool) (checkOK bool, err error) {
cmdName = obj.IfShell // usually bash, or sh
cmdArgs = []string{"-c", obj.IfCmd}
}
err = exec.Command(cmdName, cmdArgs...).Run()
if err != nil {
cmd := exec.Command(cmdName, cmdArgs...)
// ignore signals sent to parent process (we're in our own group)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
}
if err := cmd.Run(); err != nil {
// TODO: check exit value
return true, nil // don't run
}
// if there is no watcher and no onlyif check, assume we should run
} else { // if obj.WatchCmd == "" && obj.IfCmd == "" {
// just run if state is dirty
if obj.IsStateOK() { // FIXME: this is done by engine now...
return true, nil
}
}
// state is not okay, no work done, exit, but without error
@@ -252,11 +237,17 @@ func (obj *ExecRes) CheckApply(apply bool) (checkOK bool, err error) {
}
cmd := exec.Command(cmdName, cmdArgs...)
//cmd.Dir = "" // look for program in pwd ?
// ignore signals sent to parent process (we're in our own group)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
}
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Start(); err != nil {
return false, errwrap.Wrapf(err, "error starting Cmd")
return false, errwrap.Wrapf(err, "error starting cmd")
}
timeout := obj.Timeout
@@ -266,16 +257,30 @@ func (obj *ExecRes) CheckApply(apply bool) (checkOK bool, err error) {
done := make(chan error)
go func() { done <- cmd.Wait() }()
var err error // error returned by cmd
select {
case err := <-done:
if err != nil {
e := errwrap.Wrapf(err, "error waiting for Cmd")
return false, e
}
case e := <-done:
err = e // store
case <-util.TimeAfterOrBlock(timeout):
//cmd.Process.Kill() // TODO: is this necessary?
return false, fmt.Errorf("timeout waiting for Cmd")
cmd.Process.Kill() // TODO: check error?
return false, fmt.Errorf("timeout for cmd")
}
// process the err result from cmd, we process non-zero exits here too!
exitErr, ok := err.(*exec.ExitError) // embeds an os.ProcessState
if err != nil && ok {
pStateSys := exitErr.Sys() // (*os.ProcessState) Sys
wStatus, ok := pStateSys.(syscall.WaitStatus)
if !ok {
e := errwrap.Wrapf(err, "error running cmd")
return false, e
}
return false, fmt.Errorf("cmd error, exit status: %d", wStatus.ExitStatus())
} else if err != nil {
e := errwrap.Wrapf(err, "general cmd error")
return false, e
}
// TODO: if we printed the stdout while the command is running, this
@@ -288,7 +293,6 @@ func (obj *ExecRes) CheckApply(apply bool) (checkOK bool, err error) {
log.Printf("%s[%s]: Command output is:", obj.Kind(), obj.GetName())
log.Printf(out.String())
}
// XXX: return based on exit value!!
// The state tracking is for exec resources that can't "detect" their
// state, and assume it's invalid when the Watch() function triggers.
@@ -306,40 +310,6 @@ type ExecUID struct {
// TODO: add more elements here
}
// IFF aka if and only if they are equivalent, return true. If not, false.
func (obj *ExecUID) IFF(uid ResUID) bool {
res, ok := uid.(*ExecUID)
if !ok {
return false
}
if obj.Cmd != res.Cmd {
return false
}
// TODO: add more checks here
//if obj.Shell != res.Shell {
// return false
//}
//if obj.Timeout != res.Timeout {
// return false
//}
//if obj.WatchCmd != res.WatchCmd {
// return false
//}
//if obj.WatchShell != res.WatchShell {
// return false
//}
if obj.IfCmd != res.IfCmd {
return false
}
//if obj.PollInt != res.PollInt {
// return false
//}
//if obj.State != res.State {
// return false
//}
return true
}
// AutoEdges returns the AutoEdge interface. In this case no autoedges are used.
func (obj *ExecRes) AutoEdges() AutoEdge {
// TODO: parse as many exec params to look for auto edges, for example
@@ -398,10 +368,7 @@ func (obj *ExecRes) Compare(res Res) bool {
if obj.IfCmd != res.IfCmd {
return false
}
if obj.PollInt != res.PollInt {
return false
}
if obj.State != res.State {
if obj.IfShell != res.IfShell {
return false
}
default:


@@ -249,8 +249,6 @@ type BaseRes struct {
refresh bool // does this resource have a refresh to run?
//refreshState StatefulBool // TODO: future stateful bool
prometheus *prometheus.Prometheus
}
// UnmarshalYAML is the custom unmarshal handler for the BaseRes struct. It is
@@ -366,6 +364,10 @@ func (obj *BaseRes) Init() error {
// TODO: this StatefulBool implementation could be eventually swappable
//obj.refreshState = &DiskBool{Path: path.Join(dir, refreshPathToken)}
if err := obj.Prometheus().AddManagedResource(fmt.Sprintf("%v[%v]", obj.Kind(), obj.GetName()), obj.Kind()); err != nil {
return errwrap.Wrapf(err, "could not increase prometheus counter!")
}
return nil
}
@@ -383,6 +385,10 @@ func (obj *BaseRes) Close() error {
close(obj.stopped)
obj.waitGroup.Done()
if err := obj.Prometheus().RemoveManagedResource(fmt.Sprintf("%v[%v]", obj.Kind(), obj.GetName()), obj.kind); err != nil {
return errwrap.Wrapf(err, "could not decrease prometheus counter!")
}
return nil
}
@@ -440,7 +446,8 @@ func (obj *BaseRes) QuiesceGroup() *sync.WaitGroup { return obj.quiesceGroup }
func (obj *BaseRes) WaitGroup() *sync.WaitGroup { return obj.waitGroup }
// Setup does some work which must happen before the Worker starts. It happens
// once per Worker startup.
// once per Worker startup. It can happen in parallel with other Setup calls, so
// add locks around any operation that's not thread-safe.
func (obj *BaseRes) Setup() {
obj.started = make(chan struct{}) // closes when started
obj.stopped = make(chan struct{}) // closes when stopped
@@ -450,7 +457,7 @@ func (obj *BaseRes) Setup() {
obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events
}
// Reset from Setup.
// Reset from Setup. These can get called for different vertices in parallel.
func (obj *BaseRes) Reset() {
return
}
@@ -683,5 +690,5 @@ func (obj *BaseRes) Poll() error {
// Prometheus returns the prometheus instance.
func (obj *BaseRes) Prometheus() *prometheus.Prometheus {
return obj.prometheus
return obj.Data().Prometheus
}

tag.sh

@@ -9,3 +9,5 @@ echo "Press ^C within 3s to abort."
sleep 3s
echo "release: tag $t" | git tag --file=- --sign $t
git push origin $t
git diff --stat "$v" "$t"
if which contrib.sh 2>/dev/null; then contrib.sh "$v"; fi

test/shell/exec-fail.sh Executable file

@@ -0,0 +1,9 @@
#!/bin/bash -e
# should take a few seconds plus converged timeout, and test we don't hang!
# TODO: should we return a different exit code if the resources fail?
# TODO: should we be converged if one of the resources has permanently failed?
$timeout --kill-after=20s 15s ./mgmt run --yaml exec-fail.yaml --converged-timeout=5 --no-watch --no-pgp --tmp-prefix &
pid=$!
wait $pid # get exit status
exit $?

test/shell/exec-fail.yaml Normal file

@@ -0,0 +1,15 @@
---
graph: mygraph
resources:
  exec:
  - name: exec1
    cmd: false this should fail but not hang
    shell: ''
    timeout: 0
    watchcmd: sleep 5s
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    pollint: 0
    state: present
edges: []

test/shell/graph-exit1.sh Executable file

@@ -0,0 +1,10 @@
#!/bin/bash -e
# should take 15 seconds for the longest resources, plus startup time, to shut down
# we don't want the ^C to allow the rest of the graph to continue executing!
$timeout --kill-after=35s 25s ./mgmt run --yaml graph-exit.yaml --no-watch --no-pgp --tmp-prefix &
pid=$!
sleep 5s # let the initial resources start to run...
killall -SIGINT mgmt # send ^C to exit mgmt
wait $pid # get exit status
exit $?


@@ -0,0 +1,71 @@
---
graph: parallel
resources:
  exec:
  - name: exec1
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
  - name: exec2
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
  - name: exec3
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
  - name: exec4
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
  - name: exec0
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
edges:
- name: e1
  from:
    kind: exec
    name: exec1
  to:
    kind: exec
    name: exec2
- name: e2
  from:
    kind: exec
    name: exec2
  to:
    kind: exec
    name: exec3
- name: e3
  from:
    kind: exec
    name: exec3
  to:
    kind: exec
    name: exec4

test/shell/graph-exit2.sh Executable file

@@ -0,0 +1,10 @@
#!/bin/bash -e
# should take 15 seconds for the longest resources, plus startup time, to shut down
# we don't want the ^C to allow the rest of the graph to continue executing!
$timeout --kill-after=45s 35s ./mgmt run --yaml graph-exit.yaml --no-watch --no-pgp --tmp-prefix &
pid=$!
sleep 10s # let the initial resources start to run...
killall -SIGINT mgmt # send ^C to exit mgmt
wait $pid # get exit status
exit $?


@@ -0,0 +1,74 @@
---
graph: parallel
resources:
  exec:
  - name: exec1
    meta:
      retry: 10
      delay: 1000
    cmd: 'sleep 5s && exit 13'
    shell: '/bin/bash'
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
  - name: exec2
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
  - name: exec3
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
  - name: exec4
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
  - name: exec0
    cmd: sleep 15s
    shell: ''
    timeout: 0
    watchcmd: ''
    watchshell: ''
    ifcmd: ''
    ifshell: ''
    state: present
edges:
- name: e1
  from:
    kind: exec
    name: exec1
  to:
    kind: exec
    name: exec2
- name: e2
  from:
    kind: exec
    name: exec2
  to:
    kind: exec
    name: exec3
- name: e3
  from:
    kind: exec
    name: exec3
  to:
    kind: exec
    name: exec4


@@ -1,7 +1,5 @@
#!/bin/bash -e
exit 0 # FIXME: disabled until intermittent failures can be resolved
# run a graph, with prometheus support
timeout --kill-after=30s 25s ./mgmt run --tmp-prefix --no-pgp --prometheus --yaml prometheus-3.yaml &
pid=$!

test/shell/prometheus-4.sh Executable file

@@ -0,0 +1,35 @@
#!/bin/bash -xe
# run a graph, with prometheus support
timeout --kill-after=30s 25s ./mgmt run --tmp-prefix --no-pgp --prometheus --yaml prometheus-4.yaml &
pid=$!
sleep 10s # let it converge
# For test debugging purposes
curl 127.0.0.1:9233/metrics
# Check for mgmt_resources
curl 127.0.0.1:9233/metrics | grep '^mgmt_resources{kind="file"} 4$'
# One CheckApply for a File; in noop mode.
curl 127.0.0.1:9233/metrics | grep 'mgmt_checkapply_total{apply="false",errorful="false",eventful="true",kind="file"} 1$'
# Two CheckApplies for a File; without errors, with events.
curl 127.0.0.1:9233/metrics | grep 'mgmt_checkapply_total{apply="true",errorful="false",eventful="true",kind="file"} 2$'
# Multiple CheckApplies with errors.
curl 127.0.0.1:9233/metrics | grep 'mgmt_checkapply_total{apply="true",errorful="true",eventful="true",kind="file"} [0-9]\+'
# One soft failure ATM.
curl 127.0.0.1:9233/metrics | grep 'mgmt_failures{failure="soft",kind="file"} 1$'
# Multiple soft failures since startup (exactly one would be a test failure).
if curl 127.0.0.1:9233/metrics | grep 'mgmt_failures_total{failure="soft",kind="file"} 1$'
then
	false
fi
curl 127.0.0.1:9233/metrics | grep 'mgmt_failures_total{failure="soft",kind="file"} [0-9]\+'
killall -SIGINT mgmt # send ^C to exit mgmt
wait $pid # get exit status
exit $?


@@ -0,0 +1,29 @@
---
graph: mygraph
resources:
  file:
  - name: file1
    path: "/tmp/mgmt/NONEXIST/f1"
    content: |
      i am f1
    state: exists
    meta:
      retry: -1
      delay: 1000
  - name: file2
    path: "/tmp/mgmt/f2"
    content: |
      i am f2
    state: exists
  - name: file3
    path: "/tmp/mgmt/f3"
    content: |
      i am f3
    state: exists
  - name: file4
    path: "/tmp/mgmt/f4"
    content: |
      i am f4
    state: exists
    meta:
      noop: true