diff --git a/lang/core/datetime/now_fact.go b/lang/core/datetime/now_fact.go index c2a76168..2b0e784f 100644 --- a/lang/core/datetime/now_fact.go +++ b/lang/core/datetime/now_fact.go @@ -102,12 +102,22 @@ func (obj *DateTimeFact) Stream(ctx context.Context) error { return nil } + result, err := obj.Call(ctx) + if err != nil { + return err + } + select { - case obj.init.Output <- &types.IntValue{ // seconds since 1970... - V: time.Now().Unix(), // .UTC() not necessary - }: + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *DateTimeFact) Call(ctx context.Context) (types.Value, error) { + return &types.IntValue{ // seconds since 1970... + V: time.Now().Unix(), // .UTC() not necessary + }, nil +} diff --git a/lang/core/deploy/abspath.go b/lang/core/deploy/abspath.go index 0cde7663..c68d751a 100644 --- a/lang/core/deploy/abspath.go +++ b/lang/core/deploy/abspath.go @@ -60,8 +60,9 @@ type AbsPathFunc struct { data *interfaces.FuncData last types.Value // last value received to use for diff - path *string // the active path - result *string // last calculated output + args []types.Value + path *string // the active path + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -128,43 +129,64 @@ func (obj *AbsPathFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - path := input.Struct()[absPathArgNamePath].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + path := args[0].Str() // TODO: add validation for absolute path? if obj.path != nil && *obj.path == path { continue // nothing changed } obj.path = &path - p := strings.TrimSuffix(obj.data.Base, "/") - if p == obj.data.Base { // didn't trim, so we fail - // programming error - return fmt.Errorf("no trailing slash on Base, got: `%s`", p) + result, err := obj.Call(ctx, obj.args) + if err != nil { + return err } - result := p - if *obj.path == "" { - result += "/" // add the above trailing slash back - } else if !strings.HasPrefix(*obj.path, "/") { - return fmt.Errorf("path was not absolute, got: `%s`", *obj.path) - //result += "/" // be forgiving ? - } - result += *obj.path - - if obj.result != nil && *obj.result == result { + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *AbsPathFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + path := args[0].Str() + + p := strings.TrimSuffix(obj.data.Base, "/") + if p == obj.data.Base { // didn't trim, so we fail + // programming error + return nil, fmt.Errorf("no trailing slash on Base, got: `%s`", p) + } + result := p + + if path == "" { + result += "/" // add the above trailing slash back + } else if !strings.HasPrefix(path, "/") { + return nil, fmt.Errorf("path was not absolute, got: `%s`", path) + //result += "/" // be forgiving ? + } + result += path + + return &types.StrValue{ + V: result, + }, nil +} diff --git a/lang/core/deploy/readfile.go b/lang/core/deploy/readfile.go index 0588de4a..3462ee72 100644 --- a/lang/core/deploy/readfile.go +++ b/lang/core/deploy/readfile.go @@ -61,8 +61,9 @@ type ReadFileFunc struct { data *interfaces.FuncData last types.Value // last value received to use for diff - filename *string // the active filename - result *string // last calculated output + args []types.Value + filename *string // the active filename + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -129,7 +130,13 @@ func (obj *ReadFileFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - filename := input.Struct()[readFileArgNameFilename].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + filename := args[0].Str() // TODO: add validation for absolute path? // TODO: add check for empty string if obj.filename != nil && *obj.filename == filename { @@ -137,48 +144,61 @@ func (obj *ReadFileFunc) Stream(ctx context.Context) error { } obj.filename = &filename - p := strings.TrimSuffix(obj.data.Base, "/") - if p == obj.data.Base { // didn't trim, so we fail - // programming error - return fmt.Errorf("no trailing slash on Base, got: `%s`", p) - } - path := p - - if !strings.HasPrefix(*obj.filename, "/") { - return fmt.Errorf("filename was not absolute, got: `%s`", *obj.filename) - //path += "/" // be forgiving ? - } - path += *obj.filename - - fs, err := obj.init.World.Fs(obj.data.FsURI) // open the remote file system + result, err := obj.Call(ctx, obj.args) if err != nil { - return errwrap.Wrapf(err, "can't load code from file system `%s`", obj.data.FsURI) - } - // this is relative to the module dir the func is in! - content, err := fs.ReadFile(path) // open the remote file system - // We could use it directly, but it feels like less correct. - //content, err := obj.data.Fs.ReadFile(path) // open the remote file system - if err != nil { - return errwrap.Wrapf(err, "can't read file `%s` (%s)", *obj.filename, path) + return err } - result := string(content) // convert to string - - if obj.result != nil && *obj.result == result { + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *ReadFileFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + filename := args[0].Str() + + p := strings.TrimSuffix(obj.data.Base, "/") + if p == obj.data.Base { // didn't trim, so we fail + // programming error + return nil, fmt.Errorf("no trailing slash on Base, got: `%s`", p) + } + path := p + + if !strings.HasPrefix(filename, "/") { + return nil, fmt.Errorf("filename was not absolute, got: `%s`", filename) + //path += "/" // be forgiving ? + } + path += filename + + fs, err := obj.init.World.Fs(obj.data.FsURI) // open the remote file system + if err != nil { + return nil, errwrap.Wrapf(err, "can't load code from file system `%s`", obj.data.FsURI) + } + // this is relative to the module dir the func is in! + content, err := fs.ReadFile(path) // open the remote file system + // We could use it directly, but it feels like less correct. + //content, err := obj.data.Fs.ReadFile(path) // open the remote file system + if err != nil { + return nil, errwrap.Wrapf(err, "can't read file `%s` (%s)", filename, path) + } + + return &types.StrValue{ + V: string(content), // convert to string + }, nil +} diff --git a/lang/core/deploy/readfileabs.go b/lang/core/deploy/readfileabs.go index 29cf332a..03d45f22 100644 --- a/lang/core/deploy/readfileabs.go +++ b/lang/core/deploy/readfileabs.go @@ -61,8 +61,9 @@ type ReadFileAbsFunc struct { data *interfaces.FuncData last types.Value // last value received to use for diff - filename *string // the active filename - result *string // last calculated output + args []types.Value + filename *string // the active filename + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -129,7 +130,13 @@ func (obj *ReadFileAbsFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - filename := input.Struct()[readfileArgNameFilename].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + filename := args[0].Str() // TODO: add validation for absolute path? // TODO: add check for empty string if obj.filename != nil && *obj.filename == filename { @@ -137,34 +144,47 @@ func (obj *ReadFileAbsFunc) Stream(ctx context.Context) error { } obj.filename = &filename - fs, err := obj.init.World.Fs(obj.data.FsURI) // open the remote file system + result, err := obj.Call(ctx, obj.args) if err != nil { - return errwrap.Wrapf(err, "can't load code from file system `%s`", obj.data.FsURI) - } - content, err := fs.ReadFile(*obj.filename) // open the remote file system - // We could use it directly, but it feels like less correct. - //content, err := obj.data.Fs.ReadFile(*obj.filename) // open the remote file system - if err != nil { - return errwrap.Wrapf(err, "can't read file `%s`", *obj.filename) + return err } - result := string(content) // convert to string - - if obj.result != nil && *obj.result == result { + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *ReadFileAbsFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + filename := args[0].Str() + + fs, err := obj.init.World.Fs(obj.data.FsURI) // open the remote file system + if err != nil { + return nil, errwrap.Wrapf(err, "can't load code from file system `%s`", obj.data.FsURI) + } + content, err := fs.ReadFile(filename) // open the remote file system + // We could use it directly, but it feels like less correct. + //content, err := obj.data.Fs.ReadFile(filename) // open the remote file system + if err != nil { + return nil, errwrap.Wrapf(err, "can't read file `%s`", filename) + } + + return &types.StrValue{ + V: string(content), // convert to string + }, nil +} diff --git a/lang/core/example/flipflop_fact.go b/lang/core/example/flipflop_fact.go index 8747030b..4d3e6500 100644 --- a/lang/core/example/flipflop_fact.go +++ b/lang/core/example/flipflop_fact.go @@ -31,6 +31,7 @@ package coreexample import ( "context" + "sync" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -52,6 +53,7 @@ func init() { // function which you could specify an interval for. type FlipFlopFact struct { init *facts.Init + mutex *sync.Mutex value bool } @@ -77,6 +79,7 @@ func (obj *FlipFlopFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *FlipFlopFact) Init(init *facts.Init) error { obj.init = init + obj.mutex = &sync.Mutex{} return nil } @@ -100,14 +103,30 @@ func (obj *FlipFlopFact) Stream(ctx context.Context) error { return nil } + result, err := obj.Call(ctx) + if err != nil { + return err + } + + obj.mutex.Lock() + obj.value = !obj.value // flip it + obj.mutex.Unlock() + select { - case obj.init.Output <- &types.BoolValue{ // flip - V: obj.value, - }: + case obj.init.Output <- result: + case <-ctx.Done(): return nil } - - obj.value = !obj.value // flip it } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *FlipFlopFact) Call(ctx context.Context) (types.Value, error) { + obj.mutex.Lock() // TODO: could be a read lock + value := obj.value + obj.mutex.Unlock() + return &types.BoolValue{ + V: value, + }, nil +} diff --git a/lang/core/example/vumeter.go b/lang/core/example/vumeter.go index ec9e0075..dad1b02d 100644 --- a/lang/core/example/vumeter.go +++ b/lang/core/example/vumeter.go @@ -66,11 +66,8 @@ type VUMeterFunc struct { init *interfaces.Init last types.Value // last value received to use for diff - symbol string - multiplier int64 - peak float64 - - result *string // last calculated output + args []types.Value + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -172,9 +169,12 @@ func (obj *VUMeterFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - obj.symbol = input.Struct()[vuMeterArgNameSymbol].Str() - obj.multiplier = input.Struct()[vuMeterArgNameMultiplier].Int() - obj.peak = input.Struct()[vuMeterArgNamePeak].Float() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + once.Do(onceFunc) continue // we must wrap around and go in through goChan @@ -185,66 +185,83 @@ func (obj *VUMeterFunc) Stream(ctx context.Context) error { continue // still waiting for input values } - // record for one second to a shared memory file - // rec /dev/shm/mgmt_rec.wav trim 0 1 2>/dev/null - args1 := []string{"/dev/shm/mgmt_rec.wav", "trim", "0", "1"} - cmd1 := exec.Command("/usr/bin/rec", args1...) - // XXX: arecord stopped working on newer linux... - // arecord -d 1 /dev/shm/mgmt_rec.wav 2>/dev/null - //args1 := []string{"-d", "1", "/dev/shm/mgmt_rec.wav"} - //cmd1 := exec.Command("/usr/bin/arecord", args1...) - cmd1.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - Pgid: 0, - } - // start the command - if _, err := cmd1.Output(); err != nil { - return errwrap.Wrapf(err, "cmd failed to run") - } - - // sox -t .wav /dev/shm/mgmt_rec.wav -n stat 2>&1 | grep "Maximum amplitude" | cut -d ':' -f 2 - args2 := []string{"-t", ".wav", "/dev/shm/mgmt_rec.wav", "-n", "stat"} - cmd2 := exec.Command("/usr/bin/sox", args2...) - cmd2.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - Pgid: 0, - } - - // start the command - out, err := cmd2.CombinedOutput() // data comes on stderr + result, err := obj.Call(ctx, obj.args) if err != nil { - return errwrap.Wrapf(err, "cmd failed to run") + return err } - ratio, err := extract(out) - if err != nil { - return errwrap.Wrapf(err, "failed to extract") - } - - result, err := visual(obj.symbol, int(obj.multiplier), obj.peak, ratio) - if err != nil { - return errwrap.Wrapf(err, "could not generate visual") - } - - if obj.result != nil && *obj.result == result { + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *VUMeterFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + symbol := args[0].Str() + multiplier := args[1].Int() + peak := args[2].Float() + + // record for one second to a shared memory file + // rec /dev/shm/mgmt_rec.wav trim 0 1 2>/dev/null + args1 := []string{"/dev/shm/mgmt_rec.wav", "trim", "0", "1"} + cmd1 := exec.CommandContext(ctx, "/usr/bin/rec", args1...) + // XXX: arecord stopped working on newer linux... + // arecord -d 1 /dev/shm/mgmt_rec.wav 2>/dev/null + //args1 := []string{"-d", "1", "/dev/shm/mgmt_rec.wav"} + //cmd1 := exec.CommandContext(ctx, "/usr/bin/arecord", args1...) + cmd1.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + Pgid: 0, + } + // start the command + if _, err := cmd1.Output(); err != nil { + return nil, errwrap.Wrapf(err, "cmd failed to run") + } + + // sox -t .wav /dev/shm/mgmt_rec.wav -n stat 2>&1 | grep "Maximum amplitude" | cut -d ':' -f 2 + args2 := []string{"-t", ".wav", "/dev/shm/mgmt_rec.wav", "-n", "stat"} + cmd2 := exec.CommandContext(ctx, "/usr/bin/sox", args2...) + cmd2.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + Pgid: 0, + } + + // start the command + out, err := cmd2.CombinedOutput() // data comes on stderr + if err != nil { + return nil, errwrap.Wrapf(err, "cmd failed to run") + } + + ratio, err := extract(out) + if err != nil { + return nil, errwrap.Wrapf(err, "failed to extract") + } + + result, err := visual(symbol, int(multiplier), peak, ratio) + if err != nil { + return nil, errwrap.Wrapf(err, "could not generate visual") + } + + return &types.StrValue{ + V: result, + }, nil +} + func newTicker() *time.Ticker { return time.NewTicker(time.Duration(1) * time.Second) } diff --git a/lang/core/fmt/printf.go b/lang/core/fmt/printf.go index 43e436af..4a8faa42 100644 --- a/lang/core/fmt/printf.go +++ b/lang/core/fmt/printf.go @@ -92,7 +92,7 @@ type PrintfFunc struct { init *interfaces.Init last types.Value // last value received to use for diff - result *string // last calculated output + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -325,40 +325,57 @@ func (obj *PrintfFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - format := input.Struct()[printfArgNameFormat].Str() - values := []types.Value{} - for _, name := range obj.Type.Ord { - if name == printfArgNameFormat { // skip format arg - continue - } - x := input.Struct()[name] - values = append(values, x) - } - - result, err := compileFormatToString(format, values) + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) if err != nil { - return err // no errwrap needed b/c helper func + return err } - if obj.result != nil && *obj.result == result { + result, err := obj.Call(ctx, args) + if err != nil { + return err + } + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *PrintfFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + format := args[0].Str() + + values := []types.Value{} + for i, x := range args { + if i == 0 { // skip format arg + continue + } + values = append(values, x) + } + + result, err := compileFormatToString(format, values) + if err != nil { + return nil, err // no errwrap needed b/c helper func + } + return &types.StrValue{ + V: result, + }, nil +} + // valueToString prints our values how we expect for printf. // FIXME: if this turns out to be useful, add it to the types package. func valueToString(value types.Value) string { diff --git a/lang/core/golang/template.go b/lang/core/golang/template.go index a0ca05ea..41d69246 100644 --- a/lang/core/golang/template.go +++ b/lang/core/golang/template.go @@ -86,7 +86,7 @@ type TemplateFunc struct { init *interfaces.Init last types.Value // last value received to use for diff - result *string // last calculated output + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -364,38 +364,54 @@ func (obj *TemplateFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - st := input.Struct() - - tmpl := st[templateArgNameTemplate].Str() - vars, exists := st[templateArgNameVars] - if !exists { - vars = nil - } - - result, err := obj.run(ctx, tmpl, vars) + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) if err != nil { - return err // no errwrap needed b/c helper func + return err } - if obj.result != nil && *obj.result == result { + result, err := obj.Call(ctx, args) + if err != nil { + return err + } + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *TemplateFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + tmpl := args[0].Str() + + var vars types.Value // nil + if len(args) == 2 { + vars = args[1] + } + + result, err := obj.run(ctx, tmpl, vars) + if err != nil { + return nil, err // no errwrap needed b/c helper func + } + return &types.StrValue{ + V: result, + }, nil +} + // safename renames the functions so they're valid inside the template. This is // a limitation of the template library, and it might be worth moving to a new // one. diff --git a/lang/core/lookup.go b/lang/core/lookup.go index 355ad721..a4947b8b 100644 --- a/lang/core/lookup.go +++ b/lang/core/lookup.go @@ -179,6 +179,12 @@ func (obj *LookupFunc) Build(typ *types.Type) (*types.Type, error) { // programming error return nil, err } + + if _, ok := f.(interfaces.CallableFunc); !ok { + // programming error + return nil, fmt.Errorf("not a CallableFunc") + } + bf, ok := f.(interfaces.BuildableFunc) if !ok { // programming error @@ -230,3 +236,13 @@ func (obj *LookupFunc) Stream(ctx context.Context) error { } return obj.fn.Stream(ctx) } + +// Call returns the result of this function. +func (obj *LookupFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + cf, ok := obj.fn.(interfaces.CallableFunc) + if !ok { + // programming error + return nil, fmt.Errorf("not a CallableFunc") + } + return cf.Call(ctx, args) +} diff --git a/lang/core/lookup_default.go b/lang/core/lookup_default.go index 756868ce..d40c9c1c 100644 --- a/lang/core/lookup_default.go +++ b/lang/core/lookup_default.go @@ -124,6 +124,12 @@ func (obj *LookupDefaultFunc) Build(typ *types.Type) (*types.Type, error) { // programming error return nil, err } + + if _, ok := f.(interfaces.CallableFunc); !ok { + // programming error + return nil, fmt.Errorf("not a CallableFunc") + } + bf, ok := f.(interfaces.BuildableFunc) if !ok { // programming error @@ -175,3 +181,13 @@ func (obj *LookupDefaultFunc) Stream(ctx context.Context) error { } return obj.fn.Stream(ctx) } + +// Call returns the result of this function. +func (obj *LookupDefaultFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + cf, ok := obj.fn.(interfaces.CallableFunc) + if !ok { + // programming error + return nil, fmt.Errorf("not a CallableFunc") + } + return cf.Call(ctx, args) +} diff --git a/lang/core/os/readfile.go b/lang/core/os/readfile.go index 2774a7e1..2351d903 100644 --- a/lang/core/os/readfile.go +++ b/lang/core/os/readfile.go @@ -62,12 +62,13 @@ type ReadFileFunc struct { init *interfaces.Init last types.Value // last value received to use for diff - filename *string // the active filename recWatcher *recwatch.RecWatcher events chan error // internal events wg *sync.WaitGroup - result *string // last calculated output + args []types.Value + filename *string // the active filename + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -214,28 +215,49 @@ func (obj *ReadFileFunc) Stream(ctx context.Context) error { continue // still waiting for input values } - // read file... - content, err := os.ReadFile(*obj.filename) + args, err := interfaces.StructToCallableArgs(obj.last) // []types.Value, error) if err != nil { - return errwrap.Wrapf(err, "error reading file") + return err } - result := string(content) // convert to string + obj.args = args - if obj.result != nil && *obj.result == result { + result, err := obj.Call(ctx, obj.args) + if err != nil { + return err + } + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass + case <-ctx.Done(): return nil } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *ReadFileFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + filename := args[0].Str() + + // read file... + content, err := os.ReadFile(filename) + if err != nil { + return nil, errwrap.Wrapf(err, "error reading file") + } + + return &types.StrValue{ + V: string(content), // convert to string + }, nil +} diff --git a/lang/core/struct_lookup.go b/lang/core/struct_lookup.go index 57e7f099..40875362 100644 --- a/lang/core/struct_lookup.go +++ b/lang/core/struct_lookup.go @@ -336,3 +336,27 @@ func (obj *StructLookupFunc) Stream(ctx context.Context) error { } } } + +// Call returns the result of this function. +func (obj *StructLookupFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + st := args[0].(*types.StructValue) + field := args[1].Str() + + if field == "" { + return nil, fmt.Errorf("received empty field") + } + // TODO: Is it a hack to grab this first value? + if obj.field == "" { + // This can happen at compile time too. Bonus! + obj.field = field // store first field + } + if field != obj.field { + return nil, fmt.Errorf("input field changed from: `%s`, to: `%s`", obj.field, field) + } + result, exists := st.Lookup(obj.field) + if !exists { + return nil, fmt.Errorf("could not lookup field: `%s` in struct", field) + } + + return result, nil +} diff --git a/lang/core/struct_lookup_optional.go b/lang/core/struct_lookup_optional.go index 813143d4..7f46d2fe 100644 --- a/lang/core/struct_lookup_optional.go +++ b/lang/core/struct_lookup_optional.go @@ -342,3 +342,31 @@ func (obj *StructLookupOptionalFunc) Stream(ctx context.Context) error { } } } + +// Call returns the result of this function. +func (obj *StructLookupOptionalFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + st := args[0].(*types.StructValue) + field := args[1].Str() + optional := args[2] + + if field == "" { + return nil, fmt.Errorf("received empty field") + } + // TODO: Is it a hack to grab this first value? + if obj.field == "" { + // This can happen at compile time too. Bonus! + obj.field = field // store first field + } + if field != obj.field { + return nil, fmt.Errorf("input field changed from: `%s`, to: `%s`", obj.field, field) + } + + // We know the result of this lookup statically at compile time, but for + // simplicity we check each time here anyways. Maybe one day there will + // be a fancy reason why this might vary over time. + val, exists := st.Lookup(obj.field) + if !exists { + return optional, nil + } + return val, nil +} diff --git a/lang/core/sys/cpucount_fact.go b/lang/core/sys/cpucount_fact.go index 7bec5fb7..8fc9a2a8 100644 --- a/lang/core/sys/cpucount_fact.go +++ b/lang/core/sys/cpucount_fact.go @@ -63,7 +63,8 @@ func init() { // CPUCountFact is a fact that returns the current CPU count. type CPUCountFact struct { - init *facts.Init + init *facts.Init + result types.Value // last calculated output } // String returns a simple name for this fact. This is needed so this struct can @@ -109,8 +110,6 @@ func (obj CPUCountFact) Stream(ctx context.Context) error { closeChan := make(chan struct{}) // channel to unblock selects in goroutine defer close(closeChan) - var once bool // did we send at least once? - // wait for kernel to poke us about new device changes on the system wg.Add(1) go func() { @@ -134,16 +133,10 @@ func (obj CPUCountFact) Stream(ctx context.Context) error { startChan := make(chan struct{}) close(startChan) // trigger the first event - var cpuCount, newCount int64 = 0, -1 for { select { case <-startChan: startChan = nil // disable - newCount, err = getCPUCount() - if err != nil { - obj.init.Logf("Could not get initial CPU count. Setting to zero.") - } - // TODO: would we rather error instead of sending zero? case event, ok := <-eventChan: if !ok { @@ -155,34 +148,46 @@ func (obj CPUCountFact) Stream(ctx context.Context) error { if obj.init.Debug { obj.init.Logf("received uevent SEQNUM: %s", event.uevent.Data["SEQNUM"]) } - if isCPUEvent(event.uevent) { - newCount, err = getCPUCount() - if err != nil { - obj.init.Logf("could not getCPUCount: %e", err) - continue - } + if !isCPUEvent(event.uevent) { + continue } + case <-ctx.Done(): return nil } - if once && newCount == cpuCount { - continue + result, err := obj.Call(ctx) + if err != nil { + return err } - cpuCount = newCount + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { + continue // result didn't change + } + obj.result = result // store new result select { - case obj.init.Output <- &types.IntValue{ - V: cpuCount, - }: - once = true - // send + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } +// Call this fact and return the value if it is possible to do so at this time. +func (obj *CPUCountFact) Call(ctx context.Context) (types.Value, error) { + count, err := getCPUCount() // TODO: ctx? + if err != nil { + return nil, errwrap.Wrapf(err, "could not get CPU count") + } + + return &types.IntValue{ + V: int64(count), + }, nil + +} + // getCPUCount looks in sysfs to get the number of CPUs that are online. func getCPUCount() (int64, error) { dat, err := os.ReadFile("/sys/devices/system/cpu/online") diff --git a/lang/core/sys/load_fact.go b/lang/core/sys/load_fact.go index 460ee470..9e4c03d1 100644 --- a/lang/core/sys/load_fact.go +++ b/lang/core/sys/load_fact.go @@ -103,23 +103,32 @@ func (obj *LoadFact) Stream(ctx context.Context) error { return nil } - x1, x5, x15, err := load() + result, err := obj.Call(ctx) if err != nil { - return errwrap.Wrapf(err, "could not read load values") - } - - st := types.NewStruct(types.NewType(loadSignature)) - for k, v := range map[string]float64{"x1": x1, "x5": x5, "x15": x15} { - if err := st.Set(k, &types.FloatValue{V: v}); err != nil { - return errwrap.Wrapf(err, "struct could not set key: `%s`", k) - } + return err } select { - case obj.init.Output <- st: - // send + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *LoadFact) Call(ctx context.Context) (types.Value, error) { + x1, x5, x15, err := load() + if err != nil { + return nil, errwrap.Wrapf(err, "could not read load values") + } + + st := types.NewStruct(types.NewType(loadSignature)) + for k, v := range map[string]float64{"x1": x1, "x5": x5, "x15": x15} { + if err := st.Set(k, &types.FloatValue{V: v}); err != nil { + return nil, errwrap.Wrapf(err, "struct could not set key: `%s`", k) + } + } + + return st, nil +} diff --git a/lang/core/sys/uptime_fact.go b/lang/core/sys/uptime_fact.go index eabb54ac..5a8ea21e 100644 --- a/lang/core/sys/uptime_fact.go +++ b/lang/core/sys/uptime_fact.go @@ -90,16 +90,27 @@ func (obj *UptimeFact) Stream(ctx context.Context) error { return nil } - uptime, err := uptime() + result, err := obj.Call(ctx) if err != nil { - return errwrap.Wrapf(err, "could not read uptime value") + return err } select { - case obj.init.Output <- &types.IntValue{V: uptime}: - // send + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *UptimeFact) Call(ctx context.Context) (types.Value, error) { + uptime, err := uptime() // TODO: add ctx? + if err != nil { + return nil, errwrap.Wrapf(err, "could not read uptime value") + } + + return &types.IntValue{ + V: uptime, + }, nil +} diff --git a/lang/core/test/fastcount_fact.go b/lang/core/test/fastcount_fact.go index 65a5c5c2..3de528ca 100644 --- a/lang/core/test/fastcount_fact.go +++ b/lang/core/test/fastcount_fact.go @@ -31,6 +31,7 @@ package coretest import ( "context" + "sync" "github.com/purpleidea/mgmt/lang/funcs/facts" "github.com/purpleidea/mgmt/lang/types" @@ -50,6 +51,9 @@ func init() { // FastCountFact is a fact that counts up as fast as possible from zero forever. type FastCountFact struct { init *facts.Init + + mutex *sync.Mutex + count int } // String returns a simple name for this fact. This is needed so this struct can @@ -74,6 +78,7 @@ func (obj *FastCountFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *FastCountFact) Init(init *facts.Init) error { obj.init = init + obj.mutex = &sync.Mutex{} return nil } @@ -81,16 +86,32 @@ func (obj *FastCountFact) Init(init *facts.Init) error { func (obj *FastCountFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // always signal when we're done - count := int64(0) - // streams must generate an initial event on startup for { + result, err := obj.Call(ctx) + if err != nil { + return err + } + + obj.mutex.Lock() + obj.count++ + obj.mutex.Unlock() + select { - case obj.init.Output <- &types.IntValue{V: count}: - count++ + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *FastCountFact) Call(ctx context.Context) (types.Value, error) { + obj.mutex.Lock() // TODO: could be a read lock + count := obj.count + obj.mutex.Unlock() + return &types.IntValue{ + V: int64(count), + }, nil +} diff --git a/lang/core/test/oneinstance_fact.go b/lang/core/test/oneinstance_fact.go index f8f2d034..827c9ee4 100644 --- a/lang/core/test/oneinstance_fact.go +++ b/lang/core/test/oneinstance_fact.go @@ -243,13 +243,24 @@ func (obj *OneInstanceFact) Init(init *facts.Init) error { func (obj *OneInstanceFact) Stream(ctx context.Context) error { obj.init.Logf("Stream of `%s` @ %p", obj.Name, obj) defer close(obj.init.Output) // always signal when we're done + + result, err := obj.Call(ctx) + if err != nil { + return err + } + select { - case obj.init.Output <- &types.StrValue{ - V: msg, - }: + case obj.init.Output <- result: case <-ctx.Done(): return nil } return nil } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *OneInstanceFact) Call(ctx context.Context) (types.Value, error) { + return &types.StrValue{ + V: msg, + }, nil +} diff --git a/lang/core/value/get.go b/lang/core/value/get.go index 83534e78..1671e4e4 100644 --- a/lang/core/value/get.go +++ b/lang/core/value/get.go @@ -77,6 +77,8 @@ func init() { funcs.ModuleRegister(ModuleName, GetFloatFuncName, func() interfaces.Func { return &GetFunc{Type: types.TypeFloat} }) } +var _ interfaces.CallableFunc = &GetFunc{} + // GetFunc is special function which looks up the stored `Any` field in the // value resource that it gets it from. If it is initialized with a fixed Type // field, then it becomes a statically typed version that can only return keys @@ -88,7 +90,8 @@ type GetFunc struct { init *interfaces.Init - key string + key string + args []types.Value last types.Value result types.Value // last calculated output @@ -229,7 +232,13 @@ func (obj *GetFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - key := input.Struct()[getArgNameKey].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + key := args[0].Str() if key == "" { return fmt.Errorf("can't use an empty key") } @@ -263,7 +272,7 @@ func (obj *GetFunc) Stream(ctx context.Context) error { // return errwrap.Wrapf(err, "channel watch failed on `%s`", obj.key) //} - result, err := obj.getValue(ctx) // get the value... + result, err := obj.Call(ctx, obj.args) // get the value... if err != nil { return err } @@ -287,8 +296,12 @@ func (obj *GetFunc) Stream(ctx context.Context) error { } } -// getValue gets the value we're looking for. -func (obj *GetFunc) getValue(ctx context.Context) (types.Value, error) { +// Call this function with the input args and return the value if it is possible +// to do so at this time. This was previously getValue which gets the value +// we're looking for. +func (obj *GetFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + key := args[0].Str() + typ, exists := obj.Info().Sig.Out.Map[getFieldNameValue] // type of value field if !exists || typ == nil { // programming error @@ -303,9 +316,9 @@ func (obj *GetFunc) getValue(ctx context.Context) (types.Value, error) { // step that might be needed if the value started out empty... // TODO: We could even add a stored: bool field in the returned struct! isReady := true // assume true - val, err := obj.init.Local.ValueGet(ctx, obj.key) + val, err := obj.init.Local.ValueGet(ctx, key) if err != nil { - return nil, errwrap.Wrapf(err, "channel read failed on `%s`", obj.key) + return nil, errwrap.Wrapf(err, "channel read failed on `%s`", key) } if val == nil { // val doesn't exist isReady = false @@ -324,7 +337,7 @@ func (obj *GetFunc) getValue(ctx context.Context) (types.Value, error) { // an str for example, this error happens... Do we want // to: (1) coerce? -- no; (2) error? -- yep for now; (3) // improve type unification? -- if it's possible, yes. - return nil, errwrap.Wrapf(err, "type mismatch, check type in Value[%s]", obj.key) + return nil, errwrap.Wrapf(err, "type mismatch, check type in Value[%s]", key) } } diff --git a/lang/core/world/getval.go b/lang/core/world/getval.go index bcbbc10f..b6b5d940 100644 --- a/lang/core/world/getval.go +++ b/lang/core/world/getval.go @@ -55,12 +55,15 @@ func init() { funcs.ModuleRegister(ModuleName, GetValFuncName, func() interfaces.Func { return &GetValFunc{} }) } +var _ interfaces.CallableFunc = &GetValFunc{} + // GetValFunc is special function which returns the value of a given key in the // exposed world. type GetValFunc struct { init *interfaces.Init - key string + key string + args []types.Value last types.Value result types.Value // last calculated output @@ -132,7 +135,13 @@ func (obj *GetValFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - key := input.Struct()[getValArgNameKey].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + key := args[0].Str() if key == "" { return fmt.Errorf("can't use an empty key") } @@ -169,7 +178,7 @@ func (obj *GetValFunc) Stream(ctx context.Context) error { return errwrap.Wrapf(err, "channel watch failed on `%s`", obj.key) } - result, err := obj.getValue(ctx) // get the value... + result, err := obj.Call(ctx, obj.args) // get the value... if err != nil { return err } @@ -193,14 +202,17 @@ func (obj *GetValFunc) Stream(ctx context.Context) error { } } -// getValue gets the value we're looking for. -func (obj *GetValFunc) getValue(ctx context.Context) (types.Value, error) { +// Call this function with the input args and return the value if it is possible +// to do so at this time. This was previously getValue which gets the value +// we're looking for. +func (obj *GetValFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + key := args[0].Str() exists := true // assume true - val, err := obj.init.World.StrGet(ctx, obj.key) + val, err := obj.init.World.StrGet(ctx, key) if err != nil && obj.init.World.StrIsNotExist(err) { exists = false // val doesn't exist } else if err != nil { - return nil, errwrap.Wrapf(err, "channel read failed on `%s`", obj.key) + return nil, errwrap.Wrapf(err, "channel read failed on `%s`", key) } s := &types.StrValue{V: val} diff --git a/lang/core/world/kvlookup.go b/lang/core/world/kvlookup.go index 39f45ed5..728b0d53 100644 --- a/lang/core/world/kvlookup.go +++ b/lang/core/world/kvlookup.go @@ -51,12 +51,15 @@ func init() { funcs.ModuleRegister(ModuleName, KVLookupFuncName, func() interfaces.Func { return &KVLookupFunc{} }) } +var _ interfaces.CallableFunc = &KVLookupFunc{} + // KVLookupFunc is special function which returns all the values of a given key // in the exposed world. It is similar to exchange, but it does not set a key. type KVLookupFunc struct { init *interfaces.Init namespace string + args []types.Value last types.Value result types.Value // last calculated output @@ -127,7 +130,13 @@ func (obj *KVLookupFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - namespace := input.Struct()[kvLookupArgNameNamespace].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + namespace := args[0].Str() if namespace == "" { return fmt.Errorf("can't use an empty namespace") } @@ -145,7 +154,7 @@ func (obj *KVLookupFunc) Stream(ctx context.Context) error { return err } - result, err := obj.buildMap(ctx) // build the map... + result, err := obj.Call(ctx, obj.args) // build the map... if err != nil { return err } @@ -174,7 +183,7 @@ func (obj *KVLookupFunc) Stream(ctx context.Context) error { return errwrap.Wrapf(err, "channel watch failed on `%s`", obj.namespace) } - result, err := obj.buildMap(ctx) // build the map... + result, err := obj.Call(ctx, obj.args) // build the map... if err != nil { return err } @@ -198,11 +207,14 @@ func (obj *KVLookupFunc) Stream(ctx context.Context) error { } } -// buildMap builds the result map which we'll need. It uses struct variables. -func (obj *KVLookupFunc) buildMap(ctx context.Context) (types.Value, error) { - keyMap, err := obj.init.World.StrMapGet(ctx, obj.namespace) +// Call this function with the input args and return the value if it is possible +// to do so at this time. This was previously buildMap, which builds the result +// map which we'll need. It uses struct variables. +func (obj *KVLookupFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + namespace := args[0].Str() + keyMap, err := obj.init.World.StrMapGet(ctx, namespace) if err != nil { - return nil, errwrap.Wrapf(err, "channel read failed on `%s`", obj.namespace) + return nil, errwrap.Wrapf(err, "channel read failed on `%s`", namespace) } d := types.NewMap(obj.Info().Sig.Out) diff --git a/lang/core/world/schedule.go b/lang/core/world/schedule.go index e9748001..0bb963a5 100644 --- a/lang/core/world/schedule.go +++ b/lang/core/world/schedule.go @@ -88,6 +88,8 @@ type ScheduleFunc struct { init *interfaces.Init + args []types.Value + namespace string scheduler *scheduler.Result @@ -303,7 +305,15 @@ func (obj *ScheduleFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - namespace := input.Struct()[scheduleArgNameNamespace].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + namespace := args[0].Str() + + //namespace := input.Struct()[scheduleArgNameNamespace].Str() if namespace == "" { return fmt.Errorf("can't use an empty namespace") } diff --git a/lang/funcs/facts/facts.go b/lang/funcs/facts/facts.go index 72668516..2b1a33d1 100644 --- a/lang/funcs/facts/facts.go +++ b/lang/funcs/facts/facts.go @@ -106,4 +106,7 @@ type Fact interface { Info() *Info Init(*Init) error Stream(context.Context) error + + // TODO: should we require this here? What about a CallableFact instead? + Call(context.Context) (types.Value, error) } diff --git a/lang/funcs/facts/func.go b/lang/funcs/facts/func.go index dd731fe0..0c98b0be 100644 --- a/lang/funcs/facts/func.go +++ b/lang/funcs/facts/func.go @@ -94,3 +94,8 @@ func (obj *FactFunc) Init(init *interfaces.Init) error { func (obj *FactFunc) Stream(ctx context.Context) error { return obj.Fact.Stream(ctx) } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *FactFunc) Call(ctx context.Context, _ []types.Value) (types.Value, error) { + return obj.Fact.Call(ctx) +} diff --git a/lang/funcs/operators/operators.go b/lang/funcs/operators/operators.go index f37b7fb5..c8b6fb07 100644 --- a/lang/funcs/operators/operators.go +++ b/lang/funcs/operators/operators.go @@ -64,6 +64,9 @@ func init() { "func(float, float) float", // floating-point addition }), F: func(ctx context.Context, input []types.Value) (types.Value, error) { + if l := len(input); l != 2 { // catch programming bugs + return nil, fmt.Errorf("invalid len %d", l) + } switch k := input[0].Type().Kind; k { case types.KindStr: return &types.StrValue{ @@ -476,6 +479,9 @@ type OperatorFunc struct { init *interfaces.Init last types.Value // last value received to use for diff + lastOp string + fn interfaces.FuncSig + result types.Value // last calculated output } @@ -654,8 +660,6 @@ func (obj *OperatorFunc) Init(init *interfaces.Init) error { // Stream returns the changing values that this func has over time. func (obj *OperatorFunc) Stream(ctx context.Context) error { - var op, lastOp string - var fn interfaces.FuncSig defer close(obj.init.Output) // the sender closes for { select { @@ -685,43 +689,12 @@ func (obj *OperatorFunc) Stream(ctx context.Context) error { return fmt.Errorf("bad args, got: %v, want: %v", keys, obj.Type.Ord) } - // build up arg list - args := []types.Value{} - for _, name := range obj.Type.Ord { - v, exists := input.Struct()[name] - if !exists { - // programming error - return fmt.Errorf("function engine was early, missing arg: %s", name) - } - if name == operatorArgName { - op = v.Str() - continue // skip over the operator arg - } - args = append(args, v) + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err } - if op == "" { - // programming error - return fmt.Errorf("operator cannot be empty, args: %v", keys) - } - // operator selection is dynamic now, although mostly it - // should not change... to do so is probably uncommon... - if fn == nil { - fn = obj.findFunc(op) - - } else if op != lastOp { - // TODO: check sig is compatible instead? - return fmt.Errorf("op changed from %s to %s", lastOp, op) - } - - if fn == nil { - return fmt.Errorf("func not found for operator `%s` with sig: `%+v`", op, obj.Type) - } - lastOp = op - - var result types.Value - - result, err := fn(ctx, args) // (Value, error) + result, err := obj.Call(ctx, args) // (Value, error) if err != nil { return errwrap.Wrapf(err, "problem running function") } @@ -749,6 +722,42 @@ func (obj *OperatorFunc) Stream(ctx context.Context) error { } } +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *OperatorFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + op := args[0].Str() + + if op == "" { + // programming error + return nil, fmt.Errorf("operator cannot be empty, args: %v", args) + } + + // operator selection is dynamic now, although mostly it + // should not change... to do so is probably uncommon... + if obj.fn == nil { + obj.fn = obj.findFunc(op) + + } else if op != obj.lastOp { + // TODO: check sig is compatible instead? + return nil, fmt.Errorf("op changed from %s to %s", obj.lastOp, op) + } + + if obj.fn == nil { + return nil, fmt.Errorf("func not found for operator `%s` with sig: `%+v`", op, obj.Type) + } + obj.lastOp = op + + newArgs := []types.Value{} + for i, x := range args { + if i == 0 { + continue // skip over the operator + } + newArgs = append(newArgs, x) + } + + return obj.fn(ctx, newArgs) // (Value, error) +} + // removeOperatorArg returns a copy of the input KindFunc type, without the // operator arg which specifies which operator we're using. It *is* idempotent. func removeOperatorArg(typ *types.Type) *types.Type { diff --git a/lang/funcs/structs/channel_based_sink.go b/lang/funcs/structs/channel_based_sink.go index b05575c2..983fe631 100644 --- a/lang/funcs/structs/channel_based_sink.go +++ b/lang/funcs/structs/channel_based_sink.go @@ -108,15 +108,20 @@ func (obj *ChannelBasedSinkFunc) Stream(ctx context.Context) error { return nil // can't output any more } - value, exists := input.Struct()[obj.EdgeName] - if !exists { - return fmt.Errorf("programming error, can't find edge") + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err } - if obj.last != nil && value.Cmp(obj.last) == nil { + result, err := obj.Call(ctx, args) // get the value... + if err != nil { + return err + } + + if obj.last != nil && result.Cmp(obj.last) == nil { continue // value didn't change, skip it } - obj.last = value // store so we can send after this select + obj.last = result // store so we can send after this select case <-ctx.Done(): return nil @@ -139,3 +144,13 @@ func (obj *ChannelBasedSinkFunc) Stream(ctx context.Context) error { } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +// XXX: Is is correct to implement this here for this particular function? +func (obj *ChannelBasedSinkFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + if len(args) != 1 { + return nil, fmt.Errorf("programming error, can't find edge") + } + return args[0], nil +} diff --git a/lang/funcs/structs/channel_based_source.go b/lang/funcs/structs/channel_based_source.go index e1e63b5d..96d71543 100644 --- a/lang/funcs/structs/channel_based_source.go +++ b/lang/funcs/structs/channel_based_source.go @@ -113,3 +113,13 @@ func (obj *ChannelBasedSourceFunc) Stream(ctx context.Context) error { } } } + +// XXX: Is is correct to implement this here for this particular function? +// XXX: tricky since this really receives input from a secret channel... +// XXX: ADD A MUTEX AROUND READING obj.last ??? +//func (obj *ChannelBasedSourceFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { +// if obj.last == nil { +// return nil, fmt.Errorf("programming error") +// } +// return obj.last, nil +//} diff --git a/lang/funcs/structs/if.go b/lang/funcs/structs/if.go index 14cc72ef..253c3647 100644 --- a/lang/funcs/structs/if.go +++ b/lang/funcs/structs/if.go @@ -115,12 +115,14 @@ func (obj *IfFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - var result types.Value + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } - if input.Struct()["c"].Bool() { - result = input.Struct()["a"] // true branch - } else { - result = input.Struct()["b"] // false branch + result, err := obj.Call(ctx, args) // get the value... + if err != nil { + return err } // skip sending an update... @@ -141,3 +143,13 @@ func (obj *IfFunc) Stream(ctx context.Context) error { } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +// XXX: Is is correct to implement this here for this particular function? +func (obj *IfFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + if c := args[0].Bool(); c { + return args[1], nil // true branch + } + return args[2], nil +} diff --git a/lang/funcs/wrapped/wrapped.go b/lang/funcs/wrapped/wrapped.go index 96d409e8..f6cc6f16 100644 --- a/lang/funcs/wrapped/wrapped.go +++ b/lang/funcs/wrapped/wrapped.go @@ -150,7 +150,7 @@ func (obj *Func) Stream(ctx context.Context) error { if obj.init.Debug { obj.init.Logf("Calling function with: %+v", values) } - result, err := obj.Fn.Call(ctx, values) // (Value, error) + result, err := obj.Call(ctx, values) // (Value, error) if err != nil { if obj.init.Debug { obj.init.Logf("Function returned error: %+v", err) @@ -181,3 +181,9 @@ func (obj *Func) Stream(ctx context.Context) error { } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *Func) Call(ctx context.Context, args []types.Value) (types.Value, error) { + return obj.Fn.Call(ctx, args) // (Value, error) +}