// Mgmt // Copyright (C) James Shubin and the project contributors // Written by James Shubin and the project contributors // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . // // Additional permission under GNU GPL version 3 section 7 // // If you modify this program, or any covered work, by linking or combining it // with embedded mcl code and modules (and that the embedded mcl code and // modules which link with this program, contain a copy of their source code in // the authoritative form) containing parts covered by the terms of any other // license, the licensors of this program grant you additional permission to // convey the resulting work. Furthermore, the licensors of this program grant // the original author, James Shubin, additional permission to update this // additional permission if he deems it necessary to achieve the goals of this // additional permission. package coreos import ( "bufio" "context" "fmt" "os/exec" "sync" "github.com/purpleidea/mgmt/lang/funcs" "github.com/purpleidea/mgmt/lang/interfaces" "github.com/purpleidea/mgmt/lang/types" ) const ( // SystemFuncName is the name this function is registered as. SystemFuncName = "system" // arg names... systemArgNameCmd = "cmd" // SystemFuncBufferLength is the number of lines we can buffer before we // block. If you need a larger value, please let us know your use-case. SystemFuncBufferLength = 1024 ) func init() { funcs.ModuleRegister(ModuleName, SystemFuncName, func() interfaces.Func { return &SystemFunc{} }) } // SystemFunc runs a string as a shell command, then produces each line from // stdout. If the input string changes, then the commands are executed one after // the other and the concatenation of their outputs is produced line by line. // // Note that in the likely case in which the process emits several lines one // after the other, the downstream resources might not run for every line unless // the "Meta:realize" metaparam is set to true. // // Furthermore, there is no guarantee that every intermediate line will be seen, // particularly if there is no delay between them. Only the last line is // guaranteed. As a result, it is not recommend to use this for timing or // coordination. If you are using this for an intermediate value, or a // non-declarative system, then it's likely you are using this wrong. type SystemFunc struct { init *interfaces.Init cancel context.CancelFunc input chan string // stream of inputs last *string // the active command output *string // the last output values chan string count int mutex *sync.Mutex } // String returns a simple name for this function. This is needed so this struct // can satisfy the pgraph.Vertex interface. func (obj *SystemFunc) String() string { return SystemFuncName } // ArgGen returns the Nth arg name for this function. func (obj *SystemFunc) ArgGen(index int) (string, error) { seq := []string{systemArgNameCmd} if l := len(seq); index >= l { return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) } return seq[index], nil } // Validate makes sure we've built our struct properly. It is usually unused for // normal functions that users can use directly. func (obj *SystemFunc) Validate() error { return nil } // Info returns some static info about itself. func (obj *SystemFunc) Info() *interfaces.Info { return &interfaces.Info{ Pure: false, // definitely false Memo: false, Fast: false, Spec: false, Sig: types.NewType(fmt.Sprintf("func(%s str) str", systemArgNameCmd)), Err: obj.Validate(), } } // Init runs some startup code for this function. func (obj *SystemFunc) Init(init *interfaces.Init) error { obj.init = init obj.input = make(chan string) obj.values = make(chan string, SystemFuncBufferLength) obj.mutex = &sync.Mutex{} return nil } // Stream returns the changing values that this func has over time. func (obj *SystemFunc) Stream(ctx context.Context) (reterr error) { // XXX: this implementation is a bit awkward especially with the port to // the Stream(context.Context) signature change. This is a straight port // but we could refactor this eventually. // A channel which closes when the current process exits, on its own // or due to cancel(). The channel is only closed once all the pending // stdout and stderr lines have been processed. // // The channel starts closed because no process is running yet. A new // channel is created each time a new process is started. We never run // more than one process at a time. processedChan := make(chan struct{}) close(processedChan) // Wait for the current process to exit, if any. defer func() { <-processedChan }() // Kill the current process, if any. A new cancel function is created // each time a new process is started. var innerCtx context.Context defer func() { if obj.cancel == nil { return } obj.cancel() }() for { select { case shellCommand, more := <-obj.input: if !more { // Wait until the current process exits and all of its // stdout is sent downstream. select { case <-processedChan: return nil case <-ctx.Done(): return nil } } if obj.last != nil && *obj.last == shellCommand { continue // nothing changed } obj.last = &shellCommand // Kill the previous command, if any. if obj.cancel != nil { obj.cancel() } <-processedChan // Run the command, connecting it to ctx so we can kill // it if needed, and to two Readers so we can read its // stdout and stderr. innerCtx, obj.cancel = context.WithCancel(context.Background()) cmd := exec.CommandContext(innerCtx, "sh", "-c", shellCommand) stdoutReader, err := cmd.StdoutPipe() if err != nil { return err } stderrReader, err := cmd.StderrPipe() if err != nil { return err } if err = cmd.Start(); err != nil { return err } // We will now start several goroutines: // 1. To process stdout // 2. To process stderr // 3. To wait for (1) and (2) to terminate and close processedChan // // This WaitGroup is used by (3) to wait for (1) and (2). wg := &sync.WaitGroup{} // Emit one value downstream for each line from stdout. // Terminates when the process exits, on its own or due // to cancel(). wg.Add(1) go func() { defer wg.Done() stdoutScanner := bufio.NewScanner(stdoutReader) for stdoutScanner.Scan() { s := stdoutScanner.Text() obj.mutex.Lock() obj.count++ obj.mutex.Unlock() select { case obj.values <- s: // buffered case <-ctx.Done(): // don't block here on shutdown reterr = ctx.Err() // return err return } if err := obj.init.Event(ctx); err != nil { // send event reterr = err // return err return } } }() // Log the lines from stderr, to help the user debug. // Terminates when the process exits, on its own or // due to cancel(). wg.Add(1) go func() { defer wg.Done() stderrScanner := bufio.NewScanner(stderrReader) for stderrScanner.Scan() { obj.init.Logf("system: \"%v\": stderr: %v\n", shellCommand, stderrScanner.Text()) } }() // Closes processedChan after the previous two // goroutines terminate. Thus, this goroutine also // terminates when the process exits, on its own or due // to cancel(). processedChan = make(chan struct{}) go func() { wg.Wait() close(processedChan) }() case <-ctx.Done(): return ctx.Err() } } } // Call this function with the input args and return the value if it is possible // to do so at this time. func (obj *SystemFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { if len(args) < 1 { return nil, fmt.Errorf("not enough args") } cmd := args[0].Str() // Check before we send to a chan where we'd need Stream to be running. if obj.init == nil { return nil, funcs.ErrCantSpeculate } // Tell the Stream what we're watching now... This doesn't block because // Stream should always be ready to consume unless it's closing down... // If it dies, then a ctx closure should come soon. select { case obj.input <- cmd: case <-ctx.Done(): return nil, ctx.Err() } obj.mutex.Lock() // If there are no values, and we've previously received a value then... if obj.count == 0 && obj.output != nil { s := *obj.output obj.mutex.Unlock() return &types.StrValue{ V: s, }, nil } obj.count-- // we might be briefly negative obj.mutex.Unlock() // We know a value must be coming (or the command blocks) so we wait... select { case s, ok := <-obj.values: if !ok { return nil, fmt.Errorf("unexpected close") } obj.output = &s // store return &types.StrValue{ V: s, }, nil case <-ctx.Done(): return nil, ctx.Err() } } // Done is a message from the engine to tell us that no more Call's are coming. func (obj *SystemFunc) Done() error { close(obj.input) // At this point we know obj.input won't be used. return nil }