diff --git a/examples/lib/exec-send-recv.go b/examples/lib/exec-send-recv.go new file mode 100644 index 00000000..191fdcea --- /dev/null +++ b/examples/lib/exec-send-recv.go @@ -0,0 +1,242 @@ +// libmgmt example of send->recv +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/purpleidea/mgmt/gapi" + mgmt "github.com/purpleidea/mgmt/lib" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/resources" +) + +// MyGAPI implements the main GAPI interface. +type MyGAPI struct { + Name string // graph name + Interval uint // refresh interval, 0 to never refresh + + data gapi.Data + initialized bool + closeChan chan struct{} + wg sync.WaitGroup // sync group for tunnel go routines +} + +// NewMyGAPI creates a new MyGAPI struct and calls Init(). +func NewMyGAPI(data gapi.Data, name string, interval uint) (*MyGAPI, error) { + obj := &MyGAPI{ + Name: name, + Interval: interval, + } + return obj, obj.Init(data) +} + +// Init initializes the MyGAPI struct. +func (obj *MyGAPI) Init(data gapi.Data) error { + if obj.initialized { + return fmt.Errorf("already initialized") + } + if obj.Name == "" { + return fmt.Errorf("the graph name must be specified") + } + obj.data = data // store for later + obj.closeChan = make(chan struct{}) + obj.initialized = true + return nil +} + +// Graph returns a current Graph. +func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { + if !obj.initialized { + return nil, fmt.Errorf("libmgmt: MyGAPI is not initialized") + } + + g, err := pgraph.NewGraph(obj.Name) + if err != nil { + return nil, err + } + + // FIXME: these are being specified temporarily until it's the default! + metaparams := resources.DefaultMetaParams + + exec1 := &resources.ExecRes{ + BaseRes: resources.BaseRes{ + Name: "exec1", + MetaParams: metaparams, + }, + Cmd: "echo hello world && echo goodbye world 1>&2", // to stdout && stderr + Shell: "/bin/bash", + } + g.AddVertex(exec1) + + output := &resources.FileRes{ + BaseRes: resources.BaseRes{ + Name: "output", + MetaParams: metaparams, + // send->recv! + Recv: map[string]*resources.Send{ + "Content": {Res: exec1, Key: "Output"}, + }, + }, + Path: "/tmp/mgmt/output", + State: "present", + } + g.AddVertex(output) + g.AddEdge(exec1, output, &resources.Edge{Name: "e0"}) + + stdout := &resources.FileRes{ + BaseRes: resources.BaseRes{ + Name: "stdout", + MetaParams: metaparams, + // send->recv! + Recv: map[string]*resources.Send{ + "Content": {Res: exec1, Key: "Stdout"}, + }, + }, + Path: "/tmp/mgmt/stdout", + State: "present", + } + g.AddVertex(stdout) + g.AddEdge(exec1, stdout, &resources.Edge{Name: "e1"}) + + stderr := &resources.FileRes{ + BaseRes: resources.BaseRes{ + Name: "stderr", + MetaParams: metaparams, + // send->recv! + Recv: map[string]*resources.Send{ + "Content": {Res: exec1, Key: "Stderr"}, + }, + }, + Path: "/tmp/mgmt/stderr", + State: "present", + } + g.AddVertex(stderr) + g.AddEdge(exec1, stderr, &resources.Edge{Name: "e2"}) + + //g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.World, obj.data.Noop) + return g, nil +} + +// Next returns nil errors every time there could be a new graph. +func (obj *MyGAPI) Next() chan error { + ch := make(chan error) + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + defer close(ch) // this will run before the obj.wg.Done() + if !obj.initialized { + ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + return + } + startChan := make(chan struct{}) // start signal + close(startChan) // kick it off! + + ticker := make(<-chan time.Time) + if obj.data.NoStreamWatch || obj.Interval <= 0 { + ticker = nil + } else { + // arbitrarily change graph every interval seconds + t := time.NewTicker(time.Duration(obj.Interval) * time.Second) + defer t.Stop() + ticker = t.C + } + for { + select { + case <-startChan: // kick the loop once at start + startChan = nil // disable + // pass + case <-ticker: + // pass + case <-obj.closeChan: + return + } + + log.Printf("libmgmt: Generating new graph...") + select { + case ch <- nil: // trigger a run + case <-obj.closeChan: + return + } + } + }() + return ch +} + +// Close shuts down the MyGAPI. +func (obj *MyGAPI) Close() error { + if !obj.initialized { + return fmt.Errorf("libmgmt: MyGAPI is not initialized") + } + close(obj.closeChan) + obj.wg.Wait() + obj.initialized = false // closed = true + return nil +} + +// Run runs an embedded mgmt server. +func Run() error { + + obj := &mgmt.Main{} + obj.Program = "libmgmt" // TODO: set on compilation + obj.Version = "0.0.1" // TODO: set on compilation + obj.TmpPrefix = true // disable for easy debugging + //prefix := "/tmp/testprefix/" + //obj.Prefix = &p // enable for easy debugging + obj.IdealClusterSize = -1 + obj.ConvergedTimeout = -1 + obj.Noop = false // FIXME: careful! + + obj.GAPI = &MyGAPI{ // graph API + Name: "libmgmt", // TODO: set on compilation + Interval: 60 * 10, // arbitrarily change graph every 15 seconds + } + + if err := obj.Init(); err != nil { + return err + } + + // install the exit signal handler + exit := make(chan struct{}) + defer close(exit) + go func() { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) // catch ^C + //signal.Notify(signals, os.Kill) // catch signals + signal.Notify(signals, syscall.SIGTERM) + + select { + case sig := <-signals: // any signal will do + if sig == os.Interrupt { + log.Println("Interrupted by ^C") + obj.Exit(nil) + return + } + log.Println("Interrupted by signal") + obj.Exit(fmt.Errorf("killed by %v", sig)) + return + case <-exit: + return + } + }() + + if err := obj.Run(); err != nil { + return err + } + return nil +} + +func main() { + log.Printf("Hello!") + if err := Run(); err != nil { + fmt.Println(err) + os.Exit(1) + return + } + log.Printf("Goodbye!") +} diff --git a/resources/exec.go b/resources/exec.go index 99b0698a..fd8fe558 100644 --- a/resources/exec.go +++ b/resources/exec.go @@ -25,6 +25,7 @@ import ( "log" "os/exec" "strings" + "sync" "syscall" "github.com/purpleidea/mgmt/util" @@ -40,13 +41,16 @@ func init() { // ExecRes is an exec resource for running commands. type ExecRes struct { BaseRes `yaml:",inline"` - Cmd string `yaml:"cmd"` // the command to run - Shell string `yaml:"shell"` // the (optional) shell to use to run the cmd - Timeout int `yaml:"timeout"` // the cmd timeout in seconds - WatchCmd string `yaml:"watchcmd"` // the watch command to run - WatchShell string `yaml:"watchshell"` // the (optional) shell to use to run the watch cmd - IfCmd string `yaml:"ifcmd"` // the if command to run - IfShell string `yaml:"ifshell"` // the (optional) shell to use to run the if cmd + Cmd string `yaml:"cmd"` // the command to run + Shell string `yaml:"shell"` // the (optional) shell to use to run the cmd + Timeout int `yaml:"timeout"` // the cmd timeout in seconds + WatchCmd string `yaml:"watchcmd"` // the watch command to run + WatchShell string `yaml:"watchshell"` // the (optional) shell to use to run the watch cmd + IfCmd string `yaml:"ifcmd"` // the if command to run + IfShell string `yaml:"ifshell"` // the (optional) shell to use to run the if cmd + Output *string // all cmd output, read only, do not set! + Stdout *string // the cmd stdout, read only, do not set! + Stderr *string // the cmd stderr, read only, do not set! } // Default returns some sensible defaults for this resource. @@ -244,8 +248,12 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) { Pgid: 0, } - var out bytes.Buffer - cmd.Stdout = &out + var out splitWriter + out.Init() + // from the docs: "If Stdout and Stderr are the same writer, at most one + // goroutine at a time will call Write." so we trick it here! + cmd.Stdout = out.Stdout + cmd.Stderr = out.Stderr if err := cmd.Start(); err != nil { return false, errwrap.Wrapf(err, "error starting cmd") @@ -268,6 +276,21 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) { return false, fmt.Errorf("timeout for cmd") } + // save in memory for send/recv + // we use pointers to strings to indicate if used or not + if out.Stdout.Activity || out.Stderr.Activity { + str := out.String() + obj.Output = &str + } + if out.Stdout.Activity { + str := out.Stdout.String() + obj.Stdout = &str + } + if out.Stderr.Activity { + str := out.Stderr.String() + obj.Stderr = &str + } + // process the err result from cmd, we process non-zero exits here too! exitErr, ok := err.(*exec.ExitError) // embeds an os.ProcessState if err != nil && ok { @@ -397,3 +420,71 @@ func (obj *ExecRes) UnmarshalYAML(unmarshal func(interface{}) error) error { *obj = ExecRes(raw) // restore from indirection with type conversion! return nil } + +// splitWriter mimics what the ssh.CombinedOutput command does, but stores the +// the stdout and stderr separately. This is slightly tricky because we don't +// want the combined output to be interleaved incorrectly. It creates sub writer +// structs which share the same lock and a shared output buffer. +type splitWriter struct { + Stdout *wrapWriter + Stderr *wrapWriter + + stdout bytes.Buffer // just the stdout + stderr bytes.Buffer // just the stderr + output bytes.Buffer // combined output + mutex *sync.Mutex + initialized bool // is this initialized? +} + +// Init initializes the splitWriter. +func (sw *splitWriter) Init() { + if sw.initialized { + panic("splitWriter is already initialized") + } + sw.mutex = &sync.Mutex{} + sw.Stdout = &wrapWriter{ + Mutex: sw.mutex, + Buffer: &sw.stdout, + Output: &sw.output, + } + sw.Stderr = &wrapWriter{ + Mutex: sw.mutex, + Buffer: &sw.stderr, + Output: &sw.output, + } + sw.initialized = true +} + +// String returns the contents of the combined output buffer. +func (sw *splitWriter) String() string { + if !sw.initialized { + panic("splitWriter is not initialized") + } + return sw.output.String() +} + +// wrapWriter is a simple writer which is used internally by splitWriter. +type wrapWriter struct { + Mutex *sync.Mutex + Buffer *bytes.Buffer // stdout or stderr + Output *bytes.Buffer // combined output + Activity bool // did we get any writes? +} + +// Write writes to both bytes buffers with a parent lock to mix output safely. +func (w *wrapWriter) Write(p []byte) (int, error) { + // TODO: can we move the lock to only guard around the Output.Write ? + w.Mutex.Lock() + defer w.Mutex.Unlock() + w.Activity = true + i, err := w.Buffer.Write(p) // first write + if err != nil { + return i, err + } + return w.Output.Write(p) // shared write +} + +// String returns the contents of the unshared buffer. +func (w *wrapWriter) String() string { + return w.Buffer.String() +} diff --git a/resources/exec_test.go b/resources/exec_test.go new file mode 100644 index 00000000..f6894103 --- /dev/null +++ b/resources/exec_test.go @@ -0,0 +1,178 @@ +// Mgmt +// Copyright (C) 2013-2017+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package resources + +import ( + "testing" +) + +func TestExecSendRecv1(t *testing.T) { + r1 := &ExecRes{ + BaseRes: BaseRes{ + Name: "exec1", + //MetaParams: MetaParams, + }, + Cmd: "echo hello world", + Shell: "/bin/bash", + } + + r1.Setup(nil, r1, r1) + defer func() { + if err := r1.Close(); err != nil { + t.Errorf("close failed with: %v", err) + } + }() + if err := r1.Init(); err != nil { + t.Errorf("init failed with: %v", err) + } + // run artificially without the entire engine + if _, err := r1.CheckApply(true); err != nil { + t.Errorf("checkapply failed with: %v", err) + } + + t.Logf("output is: %v", r1.Output) + if r1.Output != nil { + t.Logf("output is: %v", *r1.Output) + } + t.Logf("stdout is: %v", r1.Stdout) + if r1.Stdout != nil { + t.Logf("stdout is: %v", *r1.Stdout) + } + t.Logf("stderr is: %v", r1.Stderr) + if r1.Stderr != nil { + t.Logf("stderr is: %v", *r1.Stderr) + } + + if r1.Stdout == nil { + t.Errorf("stdout is nil") + } else { + if out := *r1.Stdout; out != "hello world\n" { + t.Errorf("got wrong stdout(%d): %s", len(out), out) + } + } +} + +func TestExecSendRecv2(t *testing.T) { + r1 := &ExecRes{ + BaseRes: BaseRes{ + Name: "exec1", + //MetaParams: MetaParams, + }, + Cmd: "echo hello world 1>&2", // to stderr + Shell: "/bin/bash", + } + + r1.Setup(nil, r1, r1) + defer func() { + if err := r1.Close(); err != nil { + t.Errorf("close failed with: %v", err) + } + }() + if err := r1.Init(); err != nil { + t.Errorf("init failed with: %v", err) + } + // run artificially without the entire engine + if _, err := r1.CheckApply(true); err != nil { + t.Errorf("checkapply failed with: %v", err) + } + + t.Logf("output is: %v", r1.Output) + if r1.Output != nil { + t.Logf("output is: %v", *r1.Output) + } + t.Logf("stdout is: %v", r1.Stdout) + if r1.Stdout != nil { + t.Logf("stdout is: %v", *r1.Stdout) + } + t.Logf("stderr is: %v", r1.Stderr) + if r1.Stderr != nil { + t.Logf("stderr is: %v", *r1.Stderr) + } + + if r1.Stderr == nil { + t.Errorf("stderr is nil") + } else { + if out := *r1.Stderr; out != "hello world\n" { + t.Errorf("got wrong stderr(%d): %s", len(out), out) + } + } +} + +func TestExecSendRecv3(t *testing.T) { + r1 := &ExecRes{ + BaseRes: BaseRes{ + Name: "exec1", + //MetaParams: MetaParams, + }, + Cmd: "echo hello world && echo goodbye world 1>&2", // to stdout && stderr + Shell: "/bin/bash", + } + + r1.Setup(nil, r1, r1) + defer func() { + if err := r1.Close(); err != nil { + t.Errorf("close failed with: %v", err) + } + }() + if err := r1.Init(); err != nil { + t.Errorf("init failed with: %v", err) + } + // run artificially without the entire engine + if _, err := r1.CheckApply(true); err != nil { + t.Errorf("checkapply failed with: %v", err) + } + + t.Logf("output is: %v", r1.Output) + if r1.Output != nil { + t.Logf("output is: %v", *r1.Output) + } + t.Logf("stdout is: %v", r1.Stdout) + if r1.Stdout != nil { + t.Logf("stdout is: %v", *r1.Stdout) + } + t.Logf("stderr is: %v", r1.Stderr) + if r1.Stderr != nil { + t.Logf("stderr is: %v", *r1.Stderr) + } + + if r1.Output == nil { + t.Errorf("output is nil") + } else { + // it looks like bash or golang race to the write, so whichever + // order they come out in is ok, as long as they come out whole + if out := *r1.Output; out != "hello world\ngoodbye world\n" && out != "goodbye world\nhello world\n" { + t.Errorf("got wrong output(%d): %s", len(out), out) + } + } + + if r1.Stdout == nil { + t.Errorf("stdout is nil") + } else { + if out := *r1.Stdout; out != "hello world\n" { + t.Errorf("got wrong stdout(%d): %s", len(out), out) + } + } + + if r1.Stderr == nil { + t.Errorf("stderr is nil") + } else { + if out := *r1.Stderr; out != "goodbye world\n" { + t.Errorf("got wrong stderr(%d): %s", len(out), out) + } + } +}