diff --git a/examples/lang/system_func.mcl b/examples/lang/system_func.mcl new file mode 100644 index 00000000..dd580a79 --- /dev/null +++ b/examples/lang/system_func.mcl @@ -0,0 +1,70 @@ +import "datetime" +import "fmt" +import "os" + +# Re-runs the command every second, but the output of the calculation +# ($now / 10) % 10 is the same for 10 seconds in a row. +# +# The value of $n over time looks like this: +# +# [...] +# 8 +# [10 seconds pause] +# 9 +# [10 seconds pause] +# 0 +# [10 seconds pause] +# 1 +# [10 seconds pause] +# 2 +# [...] +$n = os.system(fmt.printf("echo \"(%d / 10) %% 10\" | bc", datetime.now())) + +# Re-runs the command every 10 seconds, when $n changes. Produces a str for +# every line that the command outputs, which happens to be every $n seconds. +# +# The value of $i over time looks like this: +# +# [...] +# 108 +# [8 seconds pause] +# 208 +# [10 - 8 = 2 seconds pause, aborts and restarts] +# 109 +# [9 seconds pause] +# 209 +# [10 - 9 = 1 second pause, aborts and restarts] +# 100 +# 200 +# 300 +# done +# double done +# [10 - 0 - 0 - 0 = 10 seconds pause, restarts] +# 101 +# [1 second pause] +# 201 +# [1 second pause] +# 301 +# [1 second pause] +# done +# double done +# [10 - 1 - 1 - 1 = 7 seconds pause, restarts] +# 102 +# [2 seconds pause] +# 202 +# [2 seconds pause] +# 302 +# [2 seconds pause] +# done +# double done +# [10 - 2 - 2 - 2 = 4 seconds pause, restarts] +# [...] +$i = os.system(fmt.printf("for x in `seq 3`; do echo \"100 * $x + %s\" | bc; sleep %s; done; echo done; echo double done", $n, $n)) + +# The anotherstr field is updated every time $i changes, however when such a +# field changes several times in quick succession, the resource is only +# guaranteed to be ran for the last value. Thus, it is likely that the "done" +# values will not be printed. +test "out" { + anotherstr => $i, +} diff --git a/lang/funcs/core/os/system_func.go b/lang/funcs/core/os/system_func.go new file mode 100644 index 00000000..6e00b956 --- /dev/null +++ b/lang/funcs/core/os/system_func.go @@ -0,0 +1,198 @@ +// Mgmt +// Copyright (C) 2013-2022+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package coreos + +import ( + "bufio" + "context" + "fmt" + "os/exec" + "sync" + + "github.com/purpleidea/mgmt/lang/funcs" + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" +) + +func init() { + funcs.ModuleRegister(ModuleName, "system", func() interfaces.Func { return &SystemFunc{} }) +} + +// SystemFunc runs a string as a shell command, then produces each line from +// stdout. If the input string changes, then the commands are executed one after +// the other and the concatenation of their outputs is produced line by line. +// +// Note that in the likely case in which the process emits several lines one +// after the other, the downstream resources might not run for every line unless +// the "Meta:realize" metaparam is set to true. +type SystemFunc struct { + init *interfaces.Init + + closeChan chan struct{} +} + +// ArgGen returns the Nth arg name for this function. +func (obj *SystemFunc) ArgGen(index int) (string, error) { + seq := []string{"shell_command"} + if l := len(seq); index >= l { + return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) + } + return seq[index], nil +} + +// Validate makes sure we've built our struct properly. It is usually unused for +// normal functions that users can use directly. +func (obj *SystemFunc) Validate() error { + return nil +} + +// Info returns some static info about itself. +func (obj *SystemFunc) Info() *interfaces.Info { + return &interfaces.Info{ + Pure: false, // definitely false + Memo: false, + Sig: types.NewType("func(shell_command str) str"), + Err: obj.Validate(), + } +} + +// Init runs some startup code for this function. +func (obj *SystemFunc) Init(init *interfaces.Init) error { + obj.init = init + obj.closeChan = make(chan struct{}) + return nil +} + +// Stream returns the changing values that this func has over time. +func (obj *SystemFunc) Stream() error { + // Close the output chan to signal that no more values are coming. + defer close(obj.init.Output) + + // A channel which closes when the current process exits, on its own + // or due to cancel(). The channel is only closed once all the pending + // stdout and stderr lines have been processed. + // + // The channel starts closed because no process is running yet. A new + // channel is created each time a new process is started. We never run + // more than one process at a time. + processedChan := make(chan struct{}) + close(processedChan) + + // Wait for the current process to exit, if any. + defer func() { + <-processedChan + }() + + // Kill the current process, if any. A new cancel function is created + // each time a new process is started. + var ctx context.Context + var cancel context.CancelFunc + cancel = func() {} + defer cancel() + + for { + select { + case input, more := <-obj.init.Input: + if !more { + // Wait until the current process exits and all of its + // stdout is sent downstream. + select { + case <-processedChan: + return nil + case <-obj.closeChan: + return nil + } + } + shellCommand := input.Struct()["shell_command"].Str() + + // Kill the previous command, if any. + cancel() + <-processedChan + + // Run the command, connecting it to ctx so we can kill + // it if needed, and to two Readers so we can read its + // stdout and stderr. + ctx, cancel = context.WithCancel(context.Background()) + cmd := exec.CommandContext(ctx, "sh", "-c", shellCommand) + stdoutReader, err := cmd.StdoutPipe() + if err != nil { + return err + } + stderrReader, err := cmd.StderrPipe() + if err != nil { + return err + } + if err = cmd.Start(); err != nil { + return err + } + + // We will now start several goroutines: + // 1. To process stdout + // 2. To process stderr + // 3. To wait for (1) and (2) to terminate and close processedChan + // + // This WaitGroup is used by (3) to wait for (1) and (2). + wg := &sync.WaitGroup{} + + // Emit one value downstream for each line from stdout. + // Terminates when the process exits, on its own or due + // to cancel(). + wg.Add(1) + go func() { + defer wg.Done() + + stdoutScanner := bufio.NewScanner(stdoutReader) + for stdoutScanner.Scan() { + outputValue := &types.StrValue{V: stdoutScanner.Text()} + obj.init.Output <- outputValue + } + }() + + // Log the lines from stderr, to help the user debug. + // Terminates when the process exits, on its own or + // due to cancel(). + wg.Add(1) + go func() { + defer wg.Done() + + stderrScanner := bufio.NewScanner(stderrReader) + for stderrScanner.Scan() { + obj.init.Logf("system: \"%v\": stderr: %v\n", shellCommand, stderrScanner.Text()) + } + }() + + // Closes processedChan after the previous two + // goroutines terminate. Thus, this goroutine also + // terminates when the process exits, on its own or due + // to cancel(). + processedChan = make(chan struct{}) + go func() { + wg.Wait() + close(processedChan) + }() + case <-obj.closeChan: + return nil + } + } +} + +// Close runs some shutdown code for this function and turns off the stream. +func (obj *SystemFunc) Close() error { + close(obj.closeChan) + return nil +}