resources: exec: Add send/recv for exec output, stdout and stderr
This adds send/recv output parameters from exec for stdout, stderr, and output which is a combination of those two. This also includes a few tests, and a working example too! Gone are the `some_command > some_file` days of puppet.
This commit is contained in:
@@ -25,6 +25,7 @@ import (
|
||||
"log"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
@@ -40,13 +41,16 @@ func init() {
|
||||
// ExecRes is an exec resource for running commands.
|
||||
type ExecRes struct {
|
||||
BaseRes `yaml:",inline"`
|
||||
Cmd string `yaml:"cmd"` // the command to run
|
||||
Shell string `yaml:"shell"` // the (optional) shell to use to run the cmd
|
||||
Timeout int `yaml:"timeout"` // the cmd timeout in seconds
|
||||
WatchCmd string `yaml:"watchcmd"` // the watch command to run
|
||||
WatchShell string `yaml:"watchshell"` // the (optional) shell to use to run the watch cmd
|
||||
IfCmd string `yaml:"ifcmd"` // the if command to run
|
||||
IfShell string `yaml:"ifshell"` // the (optional) shell to use to run the if cmd
|
||||
Cmd string `yaml:"cmd"` // the command to run
|
||||
Shell string `yaml:"shell"` // the (optional) shell to use to run the cmd
|
||||
Timeout int `yaml:"timeout"` // the cmd timeout in seconds
|
||||
WatchCmd string `yaml:"watchcmd"` // the watch command to run
|
||||
WatchShell string `yaml:"watchshell"` // the (optional) shell to use to run the watch cmd
|
||||
IfCmd string `yaml:"ifcmd"` // the if command to run
|
||||
IfShell string `yaml:"ifshell"` // the (optional) shell to use to run the if cmd
|
||||
Output *string // all cmd output, read only, do not set!
|
||||
Stdout *string // the cmd stdout, read only, do not set!
|
||||
Stderr *string // the cmd stderr, read only, do not set!
|
||||
}
|
||||
|
||||
// Default returns some sensible defaults for this resource.
|
||||
@@ -244,8 +248,12 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
|
||||
Pgid: 0,
|
||||
}
|
||||
|
||||
var out bytes.Buffer
|
||||
cmd.Stdout = &out
|
||||
var out splitWriter
|
||||
out.Init()
|
||||
// from the docs: "If Stdout and Stderr are the same writer, at most one
|
||||
// goroutine at a time will call Write." so we trick it here!
|
||||
cmd.Stdout = out.Stdout
|
||||
cmd.Stderr = out.Stderr
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
return false, errwrap.Wrapf(err, "error starting cmd")
|
||||
@@ -268,6 +276,21 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
|
||||
return false, fmt.Errorf("timeout for cmd")
|
||||
}
|
||||
|
||||
// save in memory for send/recv
|
||||
// we use pointers to strings to indicate if used or not
|
||||
if out.Stdout.Activity || out.Stderr.Activity {
|
||||
str := out.String()
|
||||
obj.Output = &str
|
||||
}
|
||||
if out.Stdout.Activity {
|
||||
str := out.Stdout.String()
|
||||
obj.Stdout = &str
|
||||
}
|
||||
if out.Stderr.Activity {
|
||||
str := out.Stderr.String()
|
||||
obj.Stderr = &str
|
||||
}
|
||||
|
||||
// process the err result from cmd, we process non-zero exits here too!
|
||||
exitErr, ok := err.(*exec.ExitError) // embeds an os.ProcessState
|
||||
if err != nil && ok {
|
||||
@@ -397,3 +420,71 @@ func (obj *ExecRes) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
*obj = ExecRes(raw) // restore from indirection with type conversion!
|
||||
return nil
|
||||
}
|
||||
|
||||
// splitWriter mimics what the ssh.CombinedOutput command does, but stores the
|
||||
// the stdout and stderr separately. This is slightly tricky because we don't
|
||||
// want the combined output to be interleaved incorrectly. It creates sub writer
|
||||
// structs which share the same lock and a shared output buffer.
|
||||
type splitWriter struct {
|
||||
Stdout *wrapWriter
|
||||
Stderr *wrapWriter
|
||||
|
||||
stdout bytes.Buffer // just the stdout
|
||||
stderr bytes.Buffer // just the stderr
|
||||
output bytes.Buffer // combined output
|
||||
mutex *sync.Mutex
|
||||
initialized bool // is this initialized?
|
||||
}
|
||||
|
||||
// Init initializes the splitWriter.
|
||||
func (sw *splitWriter) Init() {
|
||||
if sw.initialized {
|
||||
panic("splitWriter is already initialized")
|
||||
}
|
||||
sw.mutex = &sync.Mutex{}
|
||||
sw.Stdout = &wrapWriter{
|
||||
Mutex: sw.mutex,
|
||||
Buffer: &sw.stdout,
|
||||
Output: &sw.output,
|
||||
}
|
||||
sw.Stderr = &wrapWriter{
|
||||
Mutex: sw.mutex,
|
||||
Buffer: &sw.stderr,
|
||||
Output: &sw.output,
|
||||
}
|
||||
sw.initialized = true
|
||||
}
|
||||
|
||||
// String returns the contents of the combined output buffer.
|
||||
func (sw *splitWriter) String() string {
|
||||
if !sw.initialized {
|
||||
panic("splitWriter is not initialized")
|
||||
}
|
||||
return sw.output.String()
|
||||
}
|
||||
|
||||
// wrapWriter is a simple writer which is used internally by splitWriter.
|
||||
type wrapWriter struct {
|
||||
Mutex *sync.Mutex
|
||||
Buffer *bytes.Buffer // stdout or stderr
|
||||
Output *bytes.Buffer // combined output
|
||||
Activity bool // did we get any writes?
|
||||
}
|
||||
|
||||
// Write writes to both bytes buffers with a parent lock to mix output safely.
|
||||
func (w *wrapWriter) Write(p []byte) (int, error) {
|
||||
// TODO: can we move the lock to only guard around the Output.Write ?
|
||||
w.Mutex.Lock()
|
||||
defer w.Mutex.Unlock()
|
||||
w.Activity = true
|
||||
i, err := w.Buffer.Write(p) // first write
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
return w.Output.Write(p) // shared write
|
||||
}
|
||||
|
||||
// String returns the contents of the unshared buffer.
|
||||
func (w *wrapWriter) String() string {
|
||||
return w.Buffer.String()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user