diff --git a/lang/core/os/readfilewait.go b/lang/core/os/readfilewait.go new file mode 100644 index 00000000..01809fc9 --- /dev/null +++ b/lang/core/os/readfilewait.go @@ -0,0 +1,274 @@ +// Mgmt +// Copyright (C) James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// +// Additional permission under GNU GPL version 3 section 7 +// +// If you modify this program, or any covered work, by linking or combining it +// with embedded mcl code and modules (and that the embedded mcl code and +// modules which link with this program, contain a copy of their source code in +// the authoritative form) containing parts covered by the terms of any other +// license, the licensors of this program grant you additional permission to +// convey the resulting work. Furthermore, the licensors of this program grant +// the original author, James Shubin, additional permission to update this +// additional permission if he deems it necessary to achieve the goals of this +// additional permission. + +package coreos + +import ( + "context" + "fmt" + "os" + "sync" + + "github.com/purpleidea/mgmt/lang/funcs" + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" + "github.com/purpleidea/mgmt/util/errwrap" + "github.com/purpleidea/mgmt/util/recwatch" +) + +const ( + // ReadFileWaitFuncName is the name this function is registered as. + ReadFileWaitFuncName = "readfilewait" + + // arg names... + readFileWaitArgNameFilename = "filename" +) + +func init() { + funcs.ModuleRegister(ModuleName, ReadFileWaitFuncName, func() interfaces.Func { return &ReadFileWaitFunc{} }) // must register the func and name +} + +// ReadFileWaitFunc is a function that reads the full contents from a local +// file. If the file contents change or the file path changes, a new string will +// be sent. This is different from the normal readfile in that if will not error +// if the file doesn't exist. If the file is not found, it returns the empty +// string. If the file re-appears, it returns those new contents. This function +// will eventually be deprecated when the function graph error system is stable. +type ReadFileWaitFunc struct { + init *interfaces.Init + last types.Value // last value received to use for diff + + recWatcher *recwatch.RecWatcher + events chan error // internal events + wg *sync.WaitGroup + + args []types.Value + filename *string // the active filename + result types.Value // last calculated output +} + +// String returns a simple name for this function. This is needed so this struct +// can satisfy the pgraph.Vertex interface. +func (obj *ReadFileWaitFunc) String() string { + return ReadFileWaitFuncName +} + +// ArgGen returns the Nth arg name for this function. +func (obj *ReadFileWaitFunc) ArgGen(index int) (string, error) { + seq := []string{readFileWaitArgNameFilename} + if l := len(seq); index >= l { + return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) + } + return seq[index], nil +} + +// Validate makes sure we've built our struct properly. It is usually unused for +// normal functions that users can use directly. +func (obj *ReadFileWaitFunc) Validate() error { + return nil +} + +// Info returns some static info about itself. +func (obj *ReadFileWaitFunc) Info() *interfaces.Info { + return &interfaces.Info{ + Pure: false, // maybe false because the file contents can change + Memo: false, + Fast: false, + Spec: false, + Sig: types.NewType(fmt.Sprintf("func(%s str) str", readFileWaitArgNameFilename)), + } +} + +// Init runs some startup code for this function. +func (obj *ReadFileWaitFunc) Init(init *interfaces.Init) error { + obj.init = init + obj.events = make(chan error) + obj.wg = &sync.WaitGroup{} + return nil +} + +// Stream returns the changing values that this func has over time. +func (obj *ReadFileWaitFunc) Stream(ctx context.Context) error { + defer close(obj.init.Output) // the sender closes + defer close(obj.events) // clean up for fun + defer obj.wg.Wait() + defer func() { + if obj.recWatcher != nil { + obj.recWatcher.Close() // close previous watcher + obj.wg.Wait() + } + }() + for { + select { + case input, ok := <-obj.init.Input: + if !ok { + obj.init.Input = nil // don't infinite loop back + continue // no more inputs, but don't return! + } + //if err := input.Type().Cmp(obj.Info().Sig.Input); err != nil { + // return errwrap.Wrapf(err, "wrong function input") + //} + + if obj.last != nil && input.Cmp(obj.last) == nil { + continue // value didn't change, skip it + } + obj.last = input // store for next + + filename := input.Struct()[readFileWaitArgNameFilename].Str() + // TODO: add validation for absolute path? + // TODO: add check for empty string + if obj.filename != nil && *obj.filename == filename { + continue // nothing changed + } + obj.filename = &filename + + if obj.recWatcher != nil { + obj.recWatcher.Close() // close previous watcher + obj.wg.Wait() + } + // create new watcher + obj.recWatcher = &recwatch.RecWatcher{ + Path: *obj.filename, + Recurse: false, + Opts: []recwatch.Option{ + recwatch.Logf(obj.init.Logf), + recwatch.Debug(obj.init.Debug), + }, + } + if err := obj.recWatcher.Init(); err != nil { + obj.recWatcher = nil + // TODO: should we ignore the error and send ""? + return errwrap.Wrapf(err, "could not watch file") + } + + // FIXME: instead of sending one event here, the recwatch + // library should send one initial event at startup... + startup := make(chan struct{}) + close(startup) + + // watch recwatch events in a proxy goroutine, since + // changing the recwatch object would panic the main + // select when it's nil... + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + for { + var err error + select { + case <-startup: + startup = nil + // send an initial event + + case event, ok := <-obj.recWatcher.Events(): + if !ok { + return // file watcher shut down + } + if err = event.Error; err != nil { + err = errwrap.Wrapf(err, "error event received") + } + } + + select { + case obj.events <- err: + // send event... + + case <-ctx.Done(): + // don't block here on shutdown + return + } + //err = nil // reset + } + }() + continue // wait for an actual event or we'd send empty! + + case err, ok := <-obj.events: + if !ok { + return fmt.Errorf("no more events") + } + if err != nil { + return errwrap.Wrapf(err, "error event received") + } + + if obj.last == nil { + continue // still waiting for input values + } + + args, err := interfaces.StructToCallableArgs(obj.last) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + result, err := obj.Call(ctx, obj.args) + if err != nil { + return err + } + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { + continue // result didn't change + } + obj.result = result // store new result + + case <-ctx.Done(): + return nil + } + + select { + case obj.init.Output <- obj.result: // send + // pass + + case <-ctx.Done(): + return nil + } + } +} + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *ReadFileWaitFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + if len(args) < 1 { + return nil, fmt.Errorf("not enough args") + } + filename := args[0].Str() + + // read file... + content, err := os.ReadFile(filename) + if err != nil && !os.IsNotExist(err) { // ignore file not found errors + return nil, errwrap.Wrapf(err, "error reading file") + } + s := string(content) // convert to string + //if err != nil { // file not exists + // s = "" // if we errored this should already be empty + //} + + return &types.StrValue{ + V: s, + }, nil +}