engine: resources: Add a WatchFiles field to exec
This adds a field that takes a list of files for exec to watch for events on.
This commit is contained in:
committed by
James Shubin
parent
4903995052
commit
f2a6a6769f
@@ -120,6 +120,9 @@ type ExecRes struct {
|
|||||||
// WatchCwd is the Cwd for the WatchCmd. See the docs for Cwd.
|
// WatchCwd is the Cwd for the WatchCmd. See the docs for Cwd.
|
||||||
WatchCwd string `lang:"watchcwd" yaml:"watchcwd"`
|
WatchCwd string `lang:"watchcwd" yaml:"watchcwd"`
|
||||||
|
|
||||||
|
// WatchFiles is a list of files that will be kept track of.
|
||||||
|
WatchFiles []string `lang:"watchfiles" yaml:"watchfiles"`
|
||||||
|
|
||||||
// WatchShell is the Shell for the WatchCmd. See the docs for Shell.
|
// WatchShell is the Shell for the WatchCmd. See the docs for Shell.
|
||||||
WatchShell string `lang:"watchshell" yaml:"watchshell"`
|
WatchShell string `lang:"watchshell" yaml:"watchshell"`
|
||||||
|
|
||||||
@@ -216,6 +219,12 @@ func (obj *ExecRes) Validate() error {
|
|||||||
return fmt.Errorf("the Args param can't be used when Cmd has args")
|
return fmt.Errorf("the Args param can't be used when Cmd has args")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, file := range obj.WatchFiles {
|
||||||
|
if !strings.HasPrefix(file, "/") {
|
||||||
|
return fmt.Errorf("the path (`%s`) in WatchFiles must be absolute", file)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if obj.Creates != "" && !strings.HasPrefix(obj.Creates, "/") {
|
if obj.Creates != "" && !strings.HasPrefix(obj.Creates, "/") {
|
||||||
return fmt.Errorf("the Creates param must be an absolute path")
|
return fmt.Errorf("the Creates param must be an absolute path")
|
||||||
}
|
}
|
||||||
@@ -263,10 +272,13 @@ func (obj *ExecRes) Cleanup() error {
|
|||||||
|
|
||||||
// Watch is the primary listener for this resource and it outputs events.
|
// Watch is the primary listener for this resource and it outputs events.
|
||||||
func (obj *ExecRes) Watch(ctx context.Context) error {
|
func (obj *ExecRes) Watch(ctx context.Context) error {
|
||||||
defer obj.wg.Wait()
|
wg := &sync.WaitGroup{}
|
||||||
|
defer wg.Wait()
|
||||||
|
|
||||||
ioChan := make(chan *cmdOutput)
|
ioChan := make(chan *cmdOutput)
|
||||||
rwChan := make(chan recwatch.Event)
|
rwChan := make(chan recwatch.Event)
|
||||||
|
filesChan := make(chan recwatch.Event)
|
||||||
|
|
||||||
var watchCmd *exec.Cmd
|
var watchCmd *exec.Cmd
|
||||||
if obj.WatchCmd != "" {
|
if obj.WatchCmd != "" {
|
||||||
var cmdName string
|
var cmdName string
|
||||||
@@ -306,6 +318,46 @@ func (obj *ExecRes) Watch(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, file := range obj.WatchFiles {
|
||||||
|
recurse := strings.HasSuffix(file, "/") // check if it's a file or dir
|
||||||
|
recWatcher, err := recwatch.NewRecWatcher(file, recurse)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer recWatcher.Close()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
var files recwatch.Event
|
||||||
|
var ok bool
|
||||||
|
var shutdown bool
|
||||||
|
|
||||||
|
select {
|
||||||
|
case files, ok = <-recWatcher.Events(): // receiving events
|
||||||
|
case <-ctx.Done(): // unblock
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
err := fmt.Errorf("channel shutdown")
|
||||||
|
files = recwatch.Event{Error: err}
|
||||||
|
shutdown = true
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case filesChan <- files: // send events
|
||||||
|
if shutdown { // optimization to free early
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
if obj.Creates != "" {
|
if obj.Creates != "" {
|
||||||
recWatcher, err := recwatch.NewRecWatcher(obj.Creates, false)
|
recWatcher, err := recwatch.NewRecWatcher(obj.Creates, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -369,6 +421,15 @@ func (obj *ExecRes) Watch(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
send = true
|
send = true
|
||||||
|
|
||||||
|
case files, ok := <-filesChan:
|
||||||
|
if !ok { // channel shutdown
|
||||||
|
return fmt.Errorf("unexpected recwatch shutdown")
|
||||||
|
}
|
||||||
|
if err := files.Error; err != nil {
|
||||||
|
return errwrap.Wrapf(err, "unknown %s watcher error", obj)
|
||||||
|
}
|
||||||
|
send = true
|
||||||
|
|
||||||
case <-ctx.Done(): // closed by the engine to signal shutdown
|
case <-ctx.Done(): // closed by the engine to signal shutdown
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -828,6 +889,9 @@ func (obj *ExecRes) Cmp(r engine.Res) error {
|
|||||||
if obj.WatchShell != res.WatchShell {
|
if obj.WatchShell != res.WatchShell {
|
||||||
return fmt.Errorf("the WatchShell differs")
|
return fmt.Errorf("the WatchShell differs")
|
||||||
}
|
}
|
||||||
|
if err := engineUtil.StrListCmp(obj.WatchFiles, res.WatchFiles); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "the WatchFiles differ")
|
||||||
|
}
|
||||||
|
|
||||||
if obj.IfCmd != res.IfCmd {
|
if obj.IfCmd != res.IfCmd {
|
||||||
return fmt.Errorf("the IfCmd differs")
|
return fmt.Errorf("the IfCmd differs")
|
||||||
|
|||||||
6
examples/lang/exec1.mcl
Normal file
6
examples/lang/exec1.mcl
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
exec "exec1" {
|
||||||
|
cmd => "echo hello world > /tmp/whatever",
|
||||||
|
shell => "/bin/bash",
|
||||||
|
creates => "/tmp/whatever",
|
||||||
|
watchfiles => ["/tmp/whatever", "/tmp/adir/",],
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user