engine: resources: Improve exec resource

The exec resource was an early addition to the project, and it was due
for some fixes and integration into our automated tests. This patch
fixes a number of issues, and makes it ready for more general use.
This commit is contained in:
James Shubin
2019-03-02 21:14:01 -05:00
parent 829741e2ac
commit 6b803656b2
5 changed files with 470 additions and 162 deletions

View File

@@ -27,12 +27,13 @@ import (
"strings"
"sync"
"syscall"
"time"
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/traits"
engineUtil "github.com/purpleidea/mgmt/engine/util"
"github.com/purpleidea/mgmt/util"
multierr "github.com/hashicorp/go-multierror"
errwrap "github.com/pkg/errors"
)
@@ -44,26 +45,60 @@ func init() {
type ExecRes struct {
traits.Base // add the base methods without re-implementation
traits.Edgeable
traits.Sendable
init *engine.Init
Cmd string `yaml:"cmd"` // the command to run
Cwd string `yaml:"cwd"` // the dir to run the command in (empty means use `pwd` of command)
Shell string `yaml:"shell"` // the (optional) shell to use to run the cmd
Timeout int `yaml:"timeout"` // the cmd timeout in seconds
WatchCmd string `yaml:"watchcmd"` // the watch command to run
WatchCwd string `yaml:"watchcwd"` // the dir to run the watch command in (empty means use `pwd` of command)
WatchShell string `yaml:"watchshell"` // the (optional) shell to use to run the watch cmd
IfCmd string `yaml:"ifcmd"` // the if command to run
IfCwd string `yaml:"ifcwd"` // the dir to run the if command in (empty means use `pwd` of command)
IfShell string `yaml:"ifshell"` // the (optional) shell to use to run the if cmd
User string `yaml:"user"` // the (optional) user to use to execute the command
Group string `yaml:"group"` // the (optional) group to use to execute the command
Output *string // all cmd output, read only, do not set!
Stdout *string // the cmd stdout, read only, do not set!
Stderr *string // the cmd stderr, read only, do not set!
// Cmd is the command to run. If this is not specified, we use the name.
Cmd string `yaml:"cmd"`
// Args is a list of args to pass to Cmd. This can be used *instead* of
// passing the full command and args as a single string to Cmd. It can
// only be used when a Shell is *not* specified. The advantage of this
// is that you don't have to worry about escape characters.
Args []string `yaml:"args"`
// Cmd is the dir to run the command in. If empty, then this will use
// the working directory of the calling process. (This process is mgmt,
// not the process being run here.)
Cwd string `yaml:"cwd"`
// Shell is the (optional) shell to use to run the cmd. If you specify
// this, then you can't use the Args parameter.
Shell string `yaml:"shell"`
// Timeout is the number of seconds to wait before sending a Kill to the
// running command. If the Kill is received before the process exits,
// then this be treated as an error.
Timeout uint64 `yaml:"timeout"`
wg *sync.WaitGroup
// Watch is the command to run to detect event changes. Each line of
// output from this command is treated as an event.
WatchCmd string `yaml:"watchcmd"`
// WatchCwd is the Cwd for the WatchCmd. See the docs for Cwd.
WatchCwd string `yaml:"watchcwd"`
// WatchShell is the Shell for the WatchCmd. See the docs for Shell.
WatchShell string `yaml:"watchshell"`
// IfCmd is the command that runs to guard against running the Cmd. If
// this command succeeds, then Cmd *will* be run. If this command
// returns a non-zero result, then the Cmd will not be run. Any error
// scenario or timeout will cause the resource to error.
IfCmd string `yaml:"ifcmd"`
// IfCwd is the Cwd for the IfCmd. See the docs for Cwd.
IfCwd string `yaml:"ifcwd"`
// IfShell is the Shell for the IfCmd. See the docs for Shell.
IfShell string `yaml:"ifshell"`
// User is the (optional) user to use to execute the command. It is used
// for any command being run.
User string `yaml:"user"`
// Group is the (optional) group to use to execute the command. It is
// used for any command being run.
Group string `yaml:"group"`
output *string // all cmd output, read only, do not set!
stdout *string // the cmd stdout, read only, do not set!
stderr *string // the cmd stderr, read only, do not set!
interruptChan chan struct{}
wg *sync.WaitGroup
}
// Default returns some sensible defaults for this resource.
@@ -71,10 +106,27 @@ func (obj *ExecRes) Default() engine.Res {
return &ExecRes{}
}
// getCmd returns the actual command to run. When Cmd is not specified, we use
// the Name.
func (obj *ExecRes) getCmd() string {
if obj.Cmd != "" {
return obj.Cmd
}
return obj.Name()
}
// Validate if the params passed in are valid data.
func (obj *ExecRes) Validate() error {
if obj.Cmd == "" { // this is the only thing that is really required
return fmt.Errorf("command can't be empty")
if obj.getCmd() == "" { // this is the only thing that is really required
return fmt.Errorf("the Cmd can't be empty")
}
split := strings.Fields(obj.getCmd())
if len(obj.Args) > 0 && obj.Shell != "" {
return fmt.Errorf("the Args param can't be used with a Shell")
}
if len(obj.Args) > 0 && len(split) > 1 {
return fmt.Errorf("the Args param can't be used when Cmd has args")
}
// check that, if an user or a group is set, we're running as root
@@ -95,6 +147,7 @@ func (obj *ExecRes) Validate() error {
func (obj *ExecRes) Init(init *engine.Init) error {
obj.init = init // save for later
obj.interruptChan = make(chan struct{})
obj.wg = &sync.WaitGroup{}
return nil
@@ -107,7 +160,7 @@ func (obj *ExecRes) Close() error {
// Watch is the primary listener for this resource and it outputs events.
func (obj *ExecRes) Watch() error {
ioChan := make(chan *bufioOutput)
ioChan := make(chan *cmdOutput)
defer obj.wg.Wait()
if obj.WatchCmd != "" {
@@ -125,7 +178,10 @@ func (obj *ExecRes) Watch() error {
cmdName = obj.WatchShell // usually bash, or sh
cmdArgs = []string{"-c", obj.WatchCmd}
}
cmd := exec.Command(cmdName, cmdArgs...)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := exec.CommandContext(ctx, cmdName, cmdArgs...)
cmd.Dir = obj.WatchCwd // run program in pwd if ""
// ignore signals sent to parent process (we're in our own group)
cmd.SysProcAttr = &syscall.SysProcAttr{
@@ -139,25 +195,9 @@ func (obj *ExecRes) Watch() error {
return errwrap.Wrapf(err, "error while setting credential")
}
cmdReader, err := cmd.StdoutPipe()
if err != nil {
return errwrap.Wrapf(err, "error creating StdoutPipe for Cmd")
if ioChan, err = obj.cmdOutputRunner(ctx, cmd); err != nil {
return errwrap.Wrapf(err, "error starting WatchCmd")
}
scanner := bufio.NewScanner(cmdReader)
defer cmd.Wait() // wait for the command to exit before return!
defer func() {
// FIXME: without wrapping this in this func it panic's
// when running certain graphs... why?
cmd.Process.Kill() // shutdown the Watch command on exit
}()
if err := cmd.Start(); err != nil {
return errwrap.Wrapf(err, "error starting Cmd")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // unblock and cleanup
ioChan = obj.bufioChanScanner(ctx, scanner)
}
obj.init.Running() // when started, notify engine that we're running
@@ -172,12 +212,32 @@ func (obj *ExecRes) Watch() error {
return fmt.Errorf("reached EOF")
}
if err := data.err; err != nil {
// error reading input?
return errwrap.Wrapf(err, "unknown error")
// error reading input or cmd failure
exitErr, ok := err.(*exec.ExitError) // embeds an os.ProcessState
if !ok {
// command failed in some bad way
return errwrap.Wrapf(err, "unknown error")
}
pStateSys := exitErr.Sys() // (*os.ProcessState) Sys
wStatus, ok := pStateSys.(syscall.WaitStatus)
if !ok {
return errwrap.Wrapf(err, "error running cmd")
}
exitStatus := wStatus.ExitStatus()
obj.init.Logf("watchcmd exited with: %d", exitStatus)
if exitStatus != 0 {
return errwrap.Wrapf(err, "unexpected exit status of zero")
}
return err // i'm not sure if this could happen
}
// each time we get a line of output, we loop!
obj.init.Logf("watch output: %s", data.text)
if s := data.text; s == "" {
obj.init.Logf("watch output is empty!")
} else {
obj.init.Logf("watch output is:")
obj.init.Logf(s)
}
if data.text != "" {
send = true
}
@@ -231,11 +291,42 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
return false, errwrap.Wrapf(err, "error while setting credential")
}
var out splitWriter
out.Init()
cmd.Stdout = out.Stdout
cmd.Stderr = out.Stderr
if err := cmd.Run(); err != nil {
// TODO: check exit value
exitErr, ok := err.(*exec.ExitError) // embeds an os.ProcessState
if !ok {
// command failed in some bad way
return false, err
}
pStateSys := exitErr.Sys() // (*os.ProcessState) Sys
wStatus, ok := pStateSys.(syscall.WaitStatus)
if !ok {
return false, errwrap.Wrapf(err, "error running cmd")
}
exitStatus := wStatus.ExitStatus()
if exitStatus == 0 {
return false, fmt.Errorf("unexpected exit status of zero")
}
obj.init.Logf("ifcmd exited with: %d", exitStatus)
if s := out.String(); s == "" {
obj.init.Logf("ifcmd output is empty!")
} else {
obj.init.Logf("ifcmd output is:")
obj.init.Logf(s)
}
return true, nil // don't run
}
if s := out.String(); s == "" {
obj.init.Logf("ifcmd output is empty!")
} else {
obj.init.Logf("ifcmd output is:")
obj.init.Logf(s)
}
}
// state is not okay, no work done, exit, but without error
@@ -251,16 +342,33 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
// call without a shell
// FIXME: are there still whitespace splitting issues?
// TODO: we could make the split character user selectable...!
split := strings.Fields(obj.Cmd)
split := strings.Fields(obj.getCmd())
cmdName = split[0]
//d, _ := os.Getwd() // TODO: how does this ever error ?
//cmdName = path.Join(d, cmdName)
cmdArgs = split[1:]
if len(obj.Args) > 0 {
if len(split) != 1 { // should not happen
return false, fmt.Errorf("validation error")
}
cmdArgs = obj.Args
}
} else {
cmdName = obj.Shell // usually bash, or sh
cmdArgs = []string{"-c", obj.Cmd}
cmdArgs = []string{"-c", obj.getCmd()}
}
cmd := exec.Command(cmdName, cmdArgs...)
wg := &sync.WaitGroup{}
defer wg.Wait() // this must be above the defer cancel() call
var ctx context.Context
var cancel context.CancelFunc
if obj.Timeout > 0 { // cmd.Process.Kill() is called on timeout
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(obj.Timeout)*time.Second)
} else { // zero timeout means no timer
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()
cmd := exec.CommandContext(ctx, cmdName, cmdArgs...)
cmd.Dir = obj.Cwd // run program in pwd if ""
// ignore signals sent to parent process (we're in our own group)
cmd.SysProcAttr = &syscall.SysProcAttr{
@@ -285,35 +393,32 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
return false, errwrap.Wrapf(err, "error starting cmd")
}
timeout := obj.Timeout
if timeout == 0 { // zero timeout means no timer, so disable it
timeout = -1
}
done := make(chan error)
go func() { done <- cmd.Wait() }()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-obj.interruptChan:
cancel()
case <-ctx.Done():
// let this exit
}
}()
select {
case e := <-done:
err = e // store
case <-util.TimeAfterOrBlock(timeout):
cmd.Process.Kill() // TODO: check error?
return false, fmt.Errorf("timeout for cmd")
}
err = cmd.Wait() // we can unblock this with the timeout
// save in memory for send/recv
// we use pointers to strings to indicate if used or not
if out.Stdout.Activity || out.Stderr.Activity {
str := out.String()
obj.Output = &str
obj.output = &str
}
if out.Stdout.Activity {
str := out.Stdout.String()
obj.Stdout = &str
obj.stdout = &str
}
if out.Stderr.Activity {
str := out.Stderr.String()
obj.Stderr = &str
obj.stderr = &str
}
// process the err result from cmd, we process non-zero exits here too!
@@ -324,7 +429,18 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
if !ok {
return false, errwrap.Wrapf(err, "error running cmd")
}
return false, fmt.Errorf("cmd error, exit status: %d", wStatus.ExitStatus())
exitStatus := wStatus.ExitStatus()
if !wStatus.Signaled() { // not a timeout or cancel (no signal)
return false, errwrap.Wrapf(err, "cmd error, exit status: %d", exitStatus)
}
sig := wStatus.Signal()
// we get this on timeout, because ctx calls cmd.Process.Kill()
if sig == syscall.SIGKILL {
return false, errwrap.Wrapf(err, "cmd timeout, exit status: %d", exitStatus)
}
return false, fmt.Errorf("unknown cmd error, signal: %s, exit status: %d", sig, exitStatus)
} else if err != nil {
return false, errwrap.Wrapf(err, "general cmd error")
@@ -334,11 +450,18 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
// would be nice, but it would require terminal log output that doesn't
// interleave all the parallel parts which would mix it all up...
if s := out.String(); s == "" {
obj.init.Logf("Command output is empty!")
obj.init.Logf("command output is empty!")
} else {
obj.init.Logf("Command output is:")
obj.init.Logf(out.String())
obj.init.Logf("command output is:")
obj.init.Logf(s)
}
if err := obj.init.Send(&ExecSends{
Output: obj.output,
Stdout: obj.stdout,
Stderr: obj.stderr,
}); err != nil {
return false, err
}
// The state tracking is for exec resources that can't "detect" their
@@ -351,58 +474,67 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
// Cmp compares two resources and returns an error if they are not equivalent.
func (obj *ExecRes) Cmp(r engine.Res) error {
if !obj.Compare(r) {
return fmt.Errorf("did not compare")
}
return nil
}
// Compare two resources and return if they are equivalent.
func (obj *ExecRes) Compare(r engine.Res) bool {
// we can only compare ExecRes to others of the same resource kind
res, ok := r.(*ExecRes)
if !ok {
return false
return fmt.Errorf("not a %s", obj.Kind())
}
if obj.Cmd != res.Cmd {
return false
return fmt.Errorf("the Cmd differs")
}
if len(obj.Args) != len(res.Args) {
return fmt.Errorf("the Args differ")
}
for i, a := range obj.Args {
if a != res.Args[i] {
return fmt.Errorf("the Args differ at index: %d", i)
}
}
if obj.Cwd != res.Cwd {
return false
return fmt.Errorf("the Cwd differs")
}
if obj.Shell != res.Shell {
return false
return fmt.Errorf("the Shell differs")
}
if obj.Timeout != res.Timeout {
return false
}
if obj.WatchCmd != res.WatchCmd {
return false
}
if obj.WatchCwd != res.WatchCwd {
return false
}
if obj.WatchShell != res.WatchShell {
return false
}
if obj.IfCmd != res.IfCmd {
return false
}
if obj.IfCwd != res.IfCwd {
return false
}
if obj.IfShell != res.IfShell {
return false
}
if obj.User != res.User {
return false
}
if obj.Group != res.Group {
return false
return fmt.Errorf("the Timeout differs")
}
return true
if obj.WatchCmd != res.WatchCmd {
return fmt.Errorf("the WatchCmd differs")
}
if obj.WatchCwd != res.WatchCwd {
return fmt.Errorf("the WatchCwd differs")
}
if obj.WatchShell != res.WatchShell {
return fmt.Errorf("the WatchShell differs")
}
if obj.IfCmd != res.IfCmd {
return fmt.Errorf("the IfCmd differs")
}
if obj.IfCwd != res.IfCwd {
return fmt.Errorf("the IfCwd differs")
}
if obj.IfShell != res.IfShell {
return fmt.Errorf("the IfShell differs")
}
if obj.User != res.User {
return fmt.Errorf("the User differs")
}
if obj.Group != res.Group {
return fmt.Errorf("the Group differs")
}
return nil
}
// Interrupt is called to ask the execution of this resource to end early.
func (obj *ExecRes) Interrupt() error {
close(obj.interruptChan)
return nil
}
// ExecUID is the UID struct for ExecRes.
@@ -453,13 +585,32 @@ func (obj *ExecRes) AutoEdges() (engine.AutoEdge, error) {
func (obj *ExecRes) UIDs() []engine.ResUID {
x := &ExecUID{
BaseUID: engine.BaseUID{Name: obj.Name(), Kind: obj.Kind()},
Cmd: obj.Cmd,
Cmd: obj.getCmd(),
IfCmd: obj.IfCmd,
// TODO: add more params here
}
return []engine.ResUID{x}
}
// ExecSends is the struct of data which is sent after a successful Apply.
type ExecSends struct {
// Output is the combined stdout and stderr of the command.
Output *string
// Stdout is the stdout of the command.
Stdout *string
// Stderr is the stderr of the command.
Stderr *string
}
// Sends represents the default struct of values we can send using Send/Recv.
func (obj *ExecRes) Sends() interface{} {
return &ExecSends{
Output: nil,
Stdout: nil,
Stderr: nil,
}
}
// UnmarshalYAML is the custom unmarshal handler for this struct.
// It is primarily useful for setting the defaults.
func (obj *ExecRes) UnmarshalYAML(unmarshal func(interface{}) error) error {
@@ -516,7 +667,7 @@ func (obj *ExecRes) cmdFiles() []string {
var paths []string
if obj.Shell != "" {
paths = append(paths, obj.Shell)
} else if cmdSplit := strings.Fields(obj.Cmd); len(cmdSplit) > 0 {
} else if cmdSplit := strings.Fields(obj.getCmd()); len(cmdSplit) > 0 {
paths = append(paths, cmdSplit[0])
}
if obj.WatchShell != "" {
@@ -532,36 +683,62 @@ func (obj *ExecRes) cmdFiles() []string {
return paths
}
// bufioOutput is the output struct of the bufioChanScanner channel output.
type bufioOutput struct {
// cmdOutput is the output struct of the cmdOutputRunner channel output. You
// should always check the error first. If it's nil, then you can assume the
// text data is good to use.
type cmdOutput struct {
text string
err error
}
// bufioChanScanner wraps the scanner output in a channel.
func (obj *ExecRes) bufioChanScanner(ctx context.Context, scanner *bufio.Scanner) chan *bufioOutput {
ch := make(chan *bufioOutput)
// cmdOutputRunner wraps the Cmd in with a StdoutPipe scanner and reads for
// errors. It runs Start and Wait, and errors runtime things in the channel.
// If it can't start up the command, it will fail early. Once it's running, it
// will return the channel which can be used for the duration of the process.
// Cancelling the context merely unblocks the sending on the output channel, it
// does not Kill the cmd process. For that you must do it yourself elsewhere.
func (obj *ExecRes) cmdOutputRunner(ctx context.Context, cmd *exec.Cmd) (chan *cmdOutput, error) {
cmdReader, err := cmd.StdoutPipe()
if err != nil {
return nil, errwrap.Wrapf(err, "error creating StdoutPipe for Cmd")
}
scanner := bufio.NewScanner(cmdReader)
if err := cmd.Start(); err != nil {
return nil, errwrap.Wrapf(err, "error starting Cmd")
}
ch := make(chan *cmdOutput)
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(ch)
for scanner.Scan() {
select {
case ch <- &bufioOutput{text: scanner.Text()}: // blocks here ?
case ch <- &cmdOutput{text: scanner.Text()}: // blocks here ?
case <-ctx.Done():
return
}
}
// on EOF, scanner.Err() will be nil
if err := scanner.Err(); err != nil {
reterr := scanner.Err()
if err := cmd.Wait(); err != nil { // always run Wait()
if reterr != nil {
reterr = multierr.Append(reterr, err)
} else {
reterr = err
}
}
// send any misc errors we encounter on the channel
if reterr != nil {
select {
case ch <- &bufioOutput{err: err}: // send any misc errors we encounter
case ch <- &cmdOutput{err: reterr}:
case <-ctx.Done():
return
}
}
}()
return ch
return ch, nil
}
// splitWriter mimics what the ssh.CombinedOutput command does, but stores the