From 642c6b952fc16b5ed23c3f6335c8cca9f28ea3f6 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sun, 16 Mar 2025 23:23:57 -0400 Subject: [PATCH] lang: core, funcs: Port some functions to CallableFunc API Some modern features of our function engine and language might require this new API, so port what we can and figure out the rest later. --- lang/core/datetime/now_fact.go | 16 ++- lang/core/deploy/abspath.go | 64 +++++++---- lang/core/deploy/readfile.go | 84 +++++++++------ lang/core/deploy/readfileabs.go | 56 ++++++---- lang/core/example/flipflop_fact.go | 29 ++++- lang/core/example/vumeter.go | 119 ++++++++++++--------- lang/core/fmt/printf.go | 53 +++++---- lang/core/golang/template.go | 48 ++++++--- lang/core/lookup.go | 16 +++ lang/core/lookup_default.go | 16 +++ lang/core/os/readfile.go | 44 ++++++-- lang/core/struct_lookup.go | 24 +++++ lang/core/struct_lookup_optional.go | 28 +++++ lang/core/sys/cpucount_fact.go | 51 +++++---- lang/core/sys/load_fact.go | 31 ++++-- lang/core/sys/uptime_fact.go | 19 +++- lang/core/test/fastcount_fact.go | 29 ++++- lang/core/test/oneinstance_fact.go | 17 ++- lang/core/value/get.go | 29 +++-- lang/core/world/getval.go | 26 +++-- lang/core/world/kvlookup.go | 26 +++-- lang/core/world/schedule.go | 12 ++- lang/funcs/facts/facts.go | 3 + lang/funcs/facts/func.go | 5 + lang/funcs/operators/operators.go | 83 +++++++------- lang/funcs/structs/channel_based_sink.go | 25 ++++- lang/funcs/structs/channel_based_source.go | 10 ++ lang/funcs/structs/if.go | 22 +++- lang/funcs/wrapped/wrapped.go | 8 +- 29 files changed, 702 insertions(+), 291 deletions(-) diff --git a/lang/core/datetime/now_fact.go b/lang/core/datetime/now_fact.go index c2a76168..2b0e784f 100644 --- a/lang/core/datetime/now_fact.go +++ b/lang/core/datetime/now_fact.go @@ -102,12 +102,22 @@ func (obj *DateTimeFact) Stream(ctx context.Context) error { return nil } + result, err := obj.Call(ctx) + if err != nil { + return err + } + select { - case obj.init.Output <- &types.IntValue{ // seconds since 1970... - V: time.Now().Unix(), // .UTC() not necessary - }: + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *DateTimeFact) Call(ctx context.Context) (types.Value, error) { + return &types.IntValue{ // seconds since 1970... + V: time.Now().Unix(), // .UTC() not necessary + }, nil +} diff --git a/lang/core/deploy/abspath.go b/lang/core/deploy/abspath.go index 0cde7663..c68d751a 100644 --- a/lang/core/deploy/abspath.go +++ b/lang/core/deploy/abspath.go @@ -60,8 +60,9 @@ type AbsPathFunc struct { data *interfaces.FuncData last types.Value // last value received to use for diff - path *string // the active path - result *string // last calculated output + args []types.Value + path *string // the active path + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -128,43 +129,64 @@ func (obj *AbsPathFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - path := input.Struct()[absPathArgNamePath].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + path := args[0].Str() // TODO: add validation for absolute path? if obj.path != nil && *obj.path == path { continue // nothing changed } obj.path = &path - p := strings.TrimSuffix(obj.data.Base, "/") - if p == obj.data.Base { // didn't trim, so we fail - // programming error - return fmt.Errorf("no trailing slash on Base, got: `%s`", p) + result, err := obj.Call(ctx, obj.args) + if err != nil { + return err } - result := p - if *obj.path == "" { - result += "/" // add the above trailing slash back - } else if !strings.HasPrefix(*obj.path, "/") { - return fmt.Errorf("path was not absolute, got: `%s`", *obj.path) - //result += "/" // be forgiving ? - } - result += *obj.path - - if obj.result != nil && *obj.result == result { + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *AbsPathFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + path := args[0].Str() + + p := strings.TrimSuffix(obj.data.Base, "/") + if p == obj.data.Base { // didn't trim, so we fail + // programming error + return nil, fmt.Errorf("no trailing slash on Base, got: `%s`", p) + } + result := p + + if path == "" { + result += "/" // add the above trailing slash back + } else if !strings.HasPrefix(path, "/") { + return nil, fmt.Errorf("path was not absolute, got: `%s`", path) + //result += "/" // be forgiving ? + } + result += path + + return &types.StrValue{ + V: result, + }, nil +} diff --git a/lang/core/deploy/readfile.go b/lang/core/deploy/readfile.go index 0588de4a..3462ee72 100644 --- a/lang/core/deploy/readfile.go +++ b/lang/core/deploy/readfile.go @@ -61,8 +61,9 @@ type ReadFileFunc struct { data *interfaces.FuncData last types.Value // last value received to use for diff - filename *string // the active filename - result *string // last calculated output + args []types.Value + filename *string // the active filename + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -129,7 +130,13 @@ func (obj *ReadFileFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - filename := input.Struct()[readFileArgNameFilename].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + filename := args[0].Str() // TODO: add validation for absolute path? // TODO: add check for empty string if obj.filename != nil && *obj.filename == filename { @@ -137,48 +144,61 @@ func (obj *ReadFileFunc) Stream(ctx context.Context) error { } obj.filename = &filename - p := strings.TrimSuffix(obj.data.Base, "/") - if p == obj.data.Base { // didn't trim, so we fail - // programming error - return fmt.Errorf("no trailing slash on Base, got: `%s`", p) - } - path := p - - if !strings.HasPrefix(*obj.filename, "/") { - return fmt.Errorf("filename was not absolute, got: `%s`", *obj.filename) - //path += "/" // be forgiving ? - } - path += *obj.filename - - fs, err := obj.init.World.Fs(obj.data.FsURI) // open the remote file system + result, err := obj.Call(ctx, obj.args) if err != nil { - return errwrap.Wrapf(err, "can't load code from file system `%s`", obj.data.FsURI) - } - // this is relative to the module dir the func is in! - content, err := fs.ReadFile(path) // open the remote file system - // We could use it directly, but it feels like less correct. - //content, err := obj.data.Fs.ReadFile(path) // open the remote file system - if err != nil { - return errwrap.Wrapf(err, "can't read file `%s` (%s)", *obj.filename, path) + return err } - result := string(content) // convert to string - - if obj.result != nil && *obj.result == result { + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *ReadFileFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + filename := args[0].Str() + + p := strings.TrimSuffix(obj.data.Base, "/") + if p == obj.data.Base { // didn't trim, so we fail + // programming error + return nil, fmt.Errorf("no trailing slash on Base, got: `%s`", p) + } + path := p + + if !strings.HasPrefix(filename, "/") { + return nil, fmt.Errorf("filename was not absolute, got: `%s`", filename) + //path += "/" // be forgiving ? + } + path += filename + + fs, err := obj.init.World.Fs(obj.data.FsURI) // open the remote file system + if err != nil { + return nil, errwrap.Wrapf(err, "can't load code from file system `%s`", obj.data.FsURI) + } + // this is relative to the module dir the func is in! + content, err := fs.ReadFile(path) // open the remote file system + // We could use it directly, but it feels like less correct. + //content, err := obj.data.Fs.ReadFile(path) // open the remote file system + if err != nil { + return nil, errwrap.Wrapf(err, "can't read file `%s` (%s)", filename, path) + } + + return &types.StrValue{ + V: string(content), // convert to string + }, nil +} diff --git a/lang/core/deploy/readfileabs.go b/lang/core/deploy/readfileabs.go index 29cf332a..03d45f22 100644 --- a/lang/core/deploy/readfileabs.go +++ b/lang/core/deploy/readfileabs.go @@ -61,8 +61,9 @@ type ReadFileAbsFunc struct { data *interfaces.FuncData last types.Value // last value received to use for diff - filename *string // the active filename - result *string // last calculated output + args []types.Value + filename *string // the active filename + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -129,7 +130,13 @@ func (obj *ReadFileAbsFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - filename := input.Struct()[readfileArgNameFilename].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + filename := args[0].Str() // TODO: add validation for absolute path? // TODO: add check for empty string if obj.filename != nil && *obj.filename == filename { @@ -137,34 +144,47 @@ func (obj *ReadFileAbsFunc) Stream(ctx context.Context) error { } obj.filename = &filename - fs, err := obj.init.World.Fs(obj.data.FsURI) // open the remote file system + result, err := obj.Call(ctx, obj.args) if err != nil { - return errwrap.Wrapf(err, "can't load code from file system `%s`", obj.data.FsURI) - } - content, err := fs.ReadFile(*obj.filename) // open the remote file system - // We could use it directly, but it feels like less correct. - //content, err := obj.data.Fs.ReadFile(*obj.filename) // open the remote file system - if err != nil { - return errwrap.Wrapf(err, "can't read file `%s`", *obj.filename) + return err } - result := string(content) // convert to string - - if obj.result != nil && *obj.result == result { + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *ReadFileAbsFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + filename := args[0].Str() + + fs, err := obj.init.World.Fs(obj.data.FsURI) // open the remote file system + if err != nil { + return nil, errwrap.Wrapf(err, "can't load code from file system `%s`", obj.data.FsURI) + } + content, err := fs.ReadFile(filename) // open the remote file system + // We could use it directly, but it feels like less correct. + //content, err := obj.data.Fs.ReadFile(filename) // open the remote file system + if err != nil { + return nil, errwrap.Wrapf(err, "can't read file `%s`", filename) + } + + return &types.StrValue{ + V: string(content), // convert to string + }, nil +} diff --git a/lang/core/example/flipflop_fact.go b/lang/core/example/flipflop_fact.go index 8747030b..4d3e6500 100644 --- a/lang/core/example/flipflop_fact.go +++ b/lang/core/example/flipflop_fact.go @@ -31,6 +31,7 @@ package coreexample import ( "context" + "sync" "time" "github.com/purpleidea/mgmt/lang/funcs/facts" @@ -52,6 +53,7 @@ func init() { // function which you could specify an interval for. type FlipFlopFact struct { init *facts.Init + mutex *sync.Mutex value bool } @@ -77,6 +79,7 @@ func (obj *FlipFlopFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *FlipFlopFact) Init(init *facts.Init) error { obj.init = init + obj.mutex = &sync.Mutex{} return nil } @@ -100,14 +103,30 @@ func (obj *FlipFlopFact) Stream(ctx context.Context) error { return nil } + result, err := obj.Call(ctx) + if err != nil { + return err + } + + obj.mutex.Lock() + obj.value = !obj.value // flip it + obj.mutex.Unlock() + select { - case obj.init.Output <- &types.BoolValue{ // flip - V: obj.value, - }: + case obj.init.Output <- result: + case <-ctx.Done(): return nil } - - obj.value = !obj.value // flip it } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *FlipFlopFact) Call(ctx context.Context) (types.Value, error) { + obj.mutex.Lock() // TODO: could be a read lock + value := obj.value + obj.mutex.Unlock() + return &types.BoolValue{ + V: value, + }, nil +} diff --git a/lang/core/example/vumeter.go b/lang/core/example/vumeter.go index ec9e0075..dad1b02d 100644 --- a/lang/core/example/vumeter.go +++ b/lang/core/example/vumeter.go @@ -66,11 +66,8 @@ type VUMeterFunc struct { init *interfaces.Init last types.Value // last value received to use for diff - symbol string - multiplier int64 - peak float64 - - result *string // last calculated output + args []types.Value + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -172,9 +169,12 @@ func (obj *VUMeterFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - obj.symbol = input.Struct()[vuMeterArgNameSymbol].Str() - obj.multiplier = input.Struct()[vuMeterArgNameMultiplier].Int() - obj.peak = input.Struct()[vuMeterArgNamePeak].Float() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + once.Do(onceFunc) continue // we must wrap around and go in through goChan @@ -185,66 +185,83 @@ func (obj *VUMeterFunc) Stream(ctx context.Context) error { continue // still waiting for input values } - // record for one second to a shared memory file - // rec /dev/shm/mgmt_rec.wav trim 0 1 2>/dev/null - args1 := []string{"/dev/shm/mgmt_rec.wav", "trim", "0", "1"} - cmd1 := exec.Command("/usr/bin/rec", args1...) - // XXX: arecord stopped working on newer linux... - // arecord -d 1 /dev/shm/mgmt_rec.wav 2>/dev/null - //args1 := []string{"-d", "1", "/dev/shm/mgmt_rec.wav"} - //cmd1 := exec.Command("/usr/bin/arecord", args1...) - cmd1.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - Pgid: 0, - } - // start the command - if _, err := cmd1.Output(); err != nil { - return errwrap.Wrapf(err, "cmd failed to run") - } - - // sox -t .wav /dev/shm/mgmt_rec.wav -n stat 2>&1 | grep "Maximum amplitude" | cut -d ':' -f 2 - args2 := []string{"-t", ".wav", "/dev/shm/mgmt_rec.wav", "-n", "stat"} - cmd2 := exec.Command("/usr/bin/sox", args2...) - cmd2.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - Pgid: 0, - } - - // start the command - out, err := cmd2.CombinedOutput() // data comes on stderr + result, err := obj.Call(ctx, obj.args) if err != nil { - return errwrap.Wrapf(err, "cmd failed to run") + return err } - ratio, err := extract(out) - if err != nil { - return errwrap.Wrapf(err, "failed to extract") - } - - result, err := visual(obj.symbol, int(obj.multiplier), obj.peak, ratio) - if err != nil { - return errwrap.Wrapf(err, "could not generate visual") - } - - if obj.result != nil && *obj.result == result { + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *VUMeterFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + symbol := args[0].Str() + multiplier := args[1].Int() + peak := args[2].Float() + + // record for one second to a shared memory file + // rec /dev/shm/mgmt_rec.wav trim 0 1 2>/dev/null + args1 := []string{"/dev/shm/mgmt_rec.wav", "trim", "0", "1"} + cmd1 := exec.CommandContext(ctx, "/usr/bin/rec", args1...) + // XXX: arecord stopped working on newer linux... + // arecord -d 1 /dev/shm/mgmt_rec.wav 2>/dev/null + //args1 := []string{"-d", "1", "/dev/shm/mgmt_rec.wav"} + //cmd1 := exec.CommandContext(ctx, "/usr/bin/arecord", args1...) + cmd1.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + Pgid: 0, + } + // start the command + if _, err := cmd1.Output(); err != nil { + return nil, errwrap.Wrapf(err, "cmd failed to run") + } + + // sox -t .wav /dev/shm/mgmt_rec.wav -n stat 2>&1 | grep "Maximum amplitude" | cut -d ':' -f 2 + args2 := []string{"-t", ".wav", "/dev/shm/mgmt_rec.wav", "-n", "stat"} + cmd2 := exec.CommandContext(ctx, "/usr/bin/sox", args2...) + cmd2.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + Pgid: 0, + } + + // start the command + out, err := cmd2.CombinedOutput() // data comes on stderr + if err != nil { + return nil, errwrap.Wrapf(err, "cmd failed to run") + } + + ratio, err := extract(out) + if err != nil { + return nil, errwrap.Wrapf(err, "failed to extract") + } + + result, err := visual(symbol, int(multiplier), peak, ratio) + if err != nil { + return nil, errwrap.Wrapf(err, "could not generate visual") + } + + return &types.StrValue{ + V: result, + }, nil +} + func newTicker() *time.Ticker { return time.NewTicker(time.Duration(1) * time.Second) } diff --git a/lang/core/fmt/printf.go b/lang/core/fmt/printf.go index 43e436af..4a8faa42 100644 --- a/lang/core/fmt/printf.go +++ b/lang/core/fmt/printf.go @@ -92,7 +92,7 @@ type PrintfFunc struct { init *interfaces.Init last types.Value // last value received to use for diff - result *string // last calculated output + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -325,40 +325,57 @@ func (obj *PrintfFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - format := input.Struct()[printfArgNameFormat].Str() - values := []types.Value{} - for _, name := range obj.Type.Ord { - if name == printfArgNameFormat { // skip format arg - continue - } - x := input.Struct()[name] - values = append(values, x) - } - - result, err := compileFormatToString(format, values) + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) if err != nil { - return err // no errwrap needed b/c helper func + return err } - if obj.result != nil && *obj.result == result { + result, err := obj.Call(ctx, args) + if err != nil { + return err + } + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *PrintfFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + format := args[0].Str() + + values := []types.Value{} + for i, x := range args { + if i == 0 { // skip format arg + continue + } + values = append(values, x) + } + + result, err := compileFormatToString(format, values) + if err != nil { + return nil, err // no errwrap needed b/c helper func + } + return &types.StrValue{ + V: result, + }, nil +} + // valueToString prints our values how we expect for printf. // FIXME: if this turns out to be useful, add it to the types package. func valueToString(value types.Value) string { diff --git a/lang/core/golang/template.go b/lang/core/golang/template.go index a0ca05ea..41d69246 100644 --- a/lang/core/golang/template.go +++ b/lang/core/golang/template.go @@ -86,7 +86,7 @@ type TemplateFunc struct { init *interfaces.Init last types.Value // last value received to use for diff - result *string // last calculated output + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -364,38 +364,54 @@ func (obj *TemplateFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - st := input.Struct() - - tmpl := st[templateArgNameTemplate].Str() - vars, exists := st[templateArgNameVars] - if !exists { - vars = nil - } - - result, err := obj.run(ctx, tmpl, vars) + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) if err != nil { - return err // no errwrap needed b/c helper func + return err } - if obj.result != nil && *obj.result == result { + result, err := obj.Call(ctx, args) + if err != nil { + return err + } + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass case <-ctx.Done(): return nil } } } +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *TemplateFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + tmpl := args[0].Str() + + var vars types.Value // nil + if len(args) == 2 { + vars = args[1] + } + + result, err := obj.run(ctx, tmpl, vars) + if err != nil { + return nil, err // no errwrap needed b/c helper func + } + return &types.StrValue{ + V: result, + }, nil +} + // safename renames the functions so they're valid inside the template. This is // a limitation of the template library, and it might be worth moving to a new // one. diff --git a/lang/core/lookup.go b/lang/core/lookup.go index 355ad721..a4947b8b 100644 --- a/lang/core/lookup.go +++ b/lang/core/lookup.go @@ -179,6 +179,12 @@ func (obj *LookupFunc) Build(typ *types.Type) (*types.Type, error) { // programming error return nil, err } + + if _, ok := f.(interfaces.CallableFunc); !ok { + // programming error + return nil, fmt.Errorf("not a CallableFunc") + } + bf, ok := f.(interfaces.BuildableFunc) if !ok { // programming error @@ -230,3 +236,13 @@ func (obj *LookupFunc) Stream(ctx context.Context) error { } return obj.fn.Stream(ctx) } + +// Call returns the result of this function. +func (obj *LookupFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + cf, ok := obj.fn.(interfaces.CallableFunc) + if !ok { + // programming error + return nil, fmt.Errorf("not a CallableFunc") + } + return cf.Call(ctx, args) +} diff --git a/lang/core/lookup_default.go b/lang/core/lookup_default.go index 756868ce..d40c9c1c 100644 --- a/lang/core/lookup_default.go +++ b/lang/core/lookup_default.go @@ -124,6 +124,12 @@ func (obj *LookupDefaultFunc) Build(typ *types.Type) (*types.Type, error) { // programming error return nil, err } + + if _, ok := f.(interfaces.CallableFunc); !ok { + // programming error + return nil, fmt.Errorf("not a CallableFunc") + } + bf, ok := f.(interfaces.BuildableFunc) if !ok { // programming error @@ -175,3 +181,13 @@ func (obj *LookupDefaultFunc) Stream(ctx context.Context) error { } return obj.fn.Stream(ctx) } + +// Call returns the result of this function. +func (obj *LookupDefaultFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + cf, ok := obj.fn.(interfaces.CallableFunc) + if !ok { + // programming error + return nil, fmt.Errorf("not a CallableFunc") + } + return cf.Call(ctx, args) +} diff --git a/lang/core/os/readfile.go b/lang/core/os/readfile.go index 2774a7e1..2351d903 100644 --- a/lang/core/os/readfile.go +++ b/lang/core/os/readfile.go @@ -62,12 +62,13 @@ type ReadFileFunc struct { init *interfaces.Init last types.Value // last value received to use for diff - filename *string // the active filename recWatcher *recwatch.RecWatcher events chan error // internal events wg *sync.WaitGroup - result *string // last calculated output + args []types.Value + filename *string // the active filename + result types.Value // last calculated output } // String returns a simple name for this function. This is needed so this struct @@ -214,28 +215,49 @@ func (obj *ReadFileFunc) Stream(ctx context.Context) error { continue // still waiting for input values } - // read file... - content, err := os.ReadFile(*obj.filename) + args, err := interfaces.StructToCallableArgs(obj.last) // []types.Value, error) if err != nil { - return errwrap.Wrapf(err, "error reading file") + return err } - result := string(content) // convert to string + obj.args = args - if obj.result != nil && *obj.result == result { + result, err := obj.Call(ctx, obj.args) + if err != nil { + return err + } + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { continue // result didn't change } - obj.result = &result // store new result + obj.result = result // store new result case <-ctx.Done(): return nil } select { - case obj.init.Output <- &types.StrValue{ - V: *obj.result, - }: + case obj.init.Output <- obj.result: // send + // pass + case <-ctx.Done(): return nil } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *ReadFileFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + filename := args[0].Str() + + // read file... + content, err := os.ReadFile(filename) + if err != nil { + return nil, errwrap.Wrapf(err, "error reading file") + } + + return &types.StrValue{ + V: string(content), // convert to string + }, nil +} diff --git a/lang/core/struct_lookup.go b/lang/core/struct_lookup.go index 57e7f099..40875362 100644 --- a/lang/core/struct_lookup.go +++ b/lang/core/struct_lookup.go @@ -336,3 +336,27 @@ func (obj *StructLookupFunc) Stream(ctx context.Context) error { } } } + +// Call returns the result of this function. +func (obj *StructLookupFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + st := args[0].(*types.StructValue) + field := args[1].Str() + + if field == "" { + return nil, fmt.Errorf("received empty field") + } + // TODO: Is it a hack to grab this first value? + if obj.field == "" { + // This can happen at compile time too. Bonus! + obj.field = field // store first field + } + if field != obj.field { + return nil, fmt.Errorf("input field changed from: `%s`, to: `%s`", obj.field, field) + } + result, exists := st.Lookup(obj.field) + if !exists { + return nil, fmt.Errorf("could not lookup field: `%s` in struct", field) + } + + return result, nil +} diff --git a/lang/core/struct_lookup_optional.go b/lang/core/struct_lookup_optional.go index 813143d4..7f46d2fe 100644 --- a/lang/core/struct_lookup_optional.go +++ b/lang/core/struct_lookup_optional.go @@ -342,3 +342,31 @@ func (obj *StructLookupOptionalFunc) Stream(ctx context.Context) error { } } } + +// Call returns the result of this function. +func (obj *StructLookupOptionalFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + st := args[0].(*types.StructValue) + field := args[1].Str() + optional := args[2] + + if field == "" { + return nil, fmt.Errorf("received empty field") + } + // TODO: Is it a hack to grab this first value? + if obj.field == "" { + // This can happen at compile time too. Bonus! + obj.field = field // store first field + } + if field != obj.field { + return nil, fmt.Errorf("input field changed from: `%s`, to: `%s`", obj.field, field) + } + + // We know the result of this lookup statically at compile time, but for + // simplicity we check each time here anyways. Maybe one day there will + // be a fancy reason why this might vary over time. + val, exists := st.Lookup(obj.field) + if !exists { + return optional, nil + } + return val, nil +} diff --git a/lang/core/sys/cpucount_fact.go b/lang/core/sys/cpucount_fact.go index 7bec5fb7..8fc9a2a8 100644 --- a/lang/core/sys/cpucount_fact.go +++ b/lang/core/sys/cpucount_fact.go @@ -63,7 +63,8 @@ func init() { // CPUCountFact is a fact that returns the current CPU count. type CPUCountFact struct { - init *facts.Init + init *facts.Init + result types.Value // last calculated output } // String returns a simple name for this fact. This is needed so this struct can @@ -109,8 +110,6 @@ func (obj CPUCountFact) Stream(ctx context.Context) error { closeChan := make(chan struct{}) // channel to unblock selects in goroutine defer close(closeChan) - var once bool // did we send at least once? - // wait for kernel to poke us about new device changes on the system wg.Add(1) go func() { @@ -134,16 +133,10 @@ func (obj CPUCountFact) Stream(ctx context.Context) error { startChan := make(chan struct{}) close(startChan) // trigger the first event - var cpuCount, newCount int64 = 0, -1 for { select { case <-startChan: startChan = nil // disable - newCount, err = getCPUCount() - if err != nil { - obj.init.Logf("Could not get initial CPU count. Setting to zero.") - } - // TODO: would we rather error instead of sending zero? case event, ok := <-eventChan: if !ok { @@ -155,34 +148,46 @@ func (obj CPUCountFact) Stream(ctx context.Context) error { if obj.init.Debug { obj.init.Logf("received uevent SEQNUM: %s", event.uevent.Data["SEQNUM"]) } - if isCPUEvent(event.uevent) { - newCount, err = getCPUCount() - if err != nil { - obj.init.Logf("could not getCPUCount: %e", err) - continue - } + if !isCPUEvent(event.uevent) { + continue } + case <-ctx.Done(): return nil } - if once && newCount == cpuCount { - continue + result, err := obj.Call(ctx) + if err != nil { + return err } - cpuCount = newCount + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { + continue // result didn't change + } + obj.result = result // store new result select { - case obj.init.Output <- &types.IntValue{ - V: cpuCount, - }: - once = true - // send + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } +// Call this fact and return the value if it is possible to do so at this time. +func (obj *CPUCountFact) Call(ctx context.Context) (types.Value, error) { + count, err := getCPUCount() // TODO: ctx? + if err != nil { + return nil, errwrap.Wrapf(err, "could not get CPU count") + } + + return &types.IntValue{ + V: int64(count), + }, nil + +} + // getCPUCount looks in sysfs to get the number of CPUs that are online. func getCPUCount() (int64, error) { dat, err := os.ReadFile("/sys/devices/system/cpu/online") diff --git a/lang/core/sys/load_fact.go b/lang/core/sys/load_fact.go index 460ee470..9e4c03d1 100644 --- a/lang/core/sys/load_fact.go +++ b/lang/core/sys/load_fact.go @@ -103,23 +103,32 @@ func (obj *LoadFact) Stream(ctx context.Context) error { return nil } - x1, x5, x15, err := load() + result, err := obj.Call(ctx) if err != nil { - return errwrap.Wrapf(err, "could not read load values") - } - - st := types.NewStruct(types.NewType(loadSignature)) - for k, v := range map[string]float64{"x1": x1, "x5": x5, "x15": x15} { - if err := st.Set(k, &types.FloatValue{V: v}); err != nil { - return errwrap.Wrapf(err, "struct could not set key: `%s`", k) - } + return err } select { - case obj.init.Output <- st: - // send + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *LoadFact) Call(ctx context.Context) (types.Value, error) { + x1, x5, x15, err := load() + if err != nil { + return nil, errwrap.Wrapf(err, "could not read load values") + } + + st := types.NewStruct(types.NewType(loadSignature)) + for k, v := range map[string]float64{"x1": x1, "x5": x5, "x15": x15} { + if err := st.Set(k, &types.FloatValue{V: v}); err != nil { + return nil, errwrap.Wrapf(err, "struct could not set key: `%s`", k) + } + } + + return st, nil +} diff --git a/lang/core/sys/uptime_fact.go b/lang/core/sys/uptime_fact.go index eabb54ac..5a8ea21e 100644 --- a/lang/core/sys/uptime_fact.go +++ b/lang/core/sys/uptime_fact.go @@ -90,16 +90,27 @@ func (obj *UptimeFact) Stream(ctx context.Context) error { return nil } - uptime, err := uptime() + result, err := obj.Call(ctx) if err != nil { - return errwrap.Wrapf(err, "could not read uptime value") + return err } select { - case obj.init.Output <- &types.IntValue{V: uptime}: - // send + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *UptimeFact) Call(ctx context.Context) (types.Value, error) { + uptime, err := uptime() // TODO: add ctx? + if err != nil { + return nil, errwrap.Wrapf(err, "could not read uptime value") + } + + return &types.IntValue{ + V: uptime, + }, nil +} diff --git a/lang/core/test/fastcount_fact.go b/lang/core/test/fastcount_fact.go index 65a5c5c2..3de528ca 100644 --- a/lang/core/test/fastcount_fact.go +++ b/lang/core/test/fastcount_fact.go @@ -31,6 +31,7 @@ package coretest import ( "context" + "sync" "github.com/purpleidea/mgmt/lang/funcs/facts" "github.com/purpleidea/mgmt/lang/types" @@ -50,6 +51,9 @@ func init() { // FastCountFact is a fact that counts up as fast as possible from zero forever. type FastCountFact struct { init *facts.Init + + mutex *sync.Mutex + count int } // String returns a simple name for this fact. This is needed so this struct can @@ -74,6 +78,7 @@ func (obj *FastCountFact) Info() *facts.Info { // Init runs some startup code for this fact. func (obj *FastCountFact) Init(init *facts.Init) error { obj.init = init + obj.mutex = &sync.Mutex{} return nil } @@ -81,16 +86,32 @@ func (obj *FastCountFact) Init(init *facts.Init) error { func (obj *FastCountFact) Stream(ctx context.Context) error { defer close(obj.init.Output) // always signal when we're done - count := int64(0) - // streams must generate an initial event on startup for { + result, err := obj.Call(ctx) + if err != nil { + return err + } + + obj.mutex.Lock() + obj.count++ + obj.mutex.Unlock() + select { - case obj.init.Output <- &types.IntValue{V: count}: - count++ + case obj.init.Output <- result: case <-ctx.Done(): return nil } } } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *FastCountFact) Call(ctx context.Context) (types.Value, error) { + obj.mutex.Lock() // TODO: could be a read lock + count := obj.count + obj.mutex.Unlock() + return &types.IntValue{ + V: int64(count), + }, nil +} diff --git a/lang/core/test/oneinstance_fact.go b/lang/core/test/oneinstance_fact.go index f8f2d034..827c9ee4 100644 --- a/lang/core/test/oneinstance_fact.go +++ b/lang/core/test/oneinstance_fact.go @@ -243,13 +243,24 @@ func (obj *OneInstanceFact) Init(init *facts.Init) error { func (obj *OneInstanceFact) Stream(ctx context.Context) error { obj.init.Logf("Stream of `%s` @ %p", obj.Name, obj) defer close(obj.init.Output) // always signal when we're done + + result, err := obj.Call(ctx) + if err != nil { + return err + } + select { - case obj.init.Output <- &types.StrValue{ - V: msg, - }: + case obj.init.Output <- result: case <-ctx.Done(): return nil } return nil } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *OneInstanceFact) Call(ctx context.Context) (types.Value, error) { + return &types.StrValue{ + V: msg, + }, nil +} diff --git a/lang/core/value/get.go b/lang/core/value/get.go index 83534e78..1671e4e4 100644 --- a/lang/core/value/get.go +++ b/lang/core/value/get.go @@ -77,6 +77,8 @@ func init() { funcs.ModuleRegister(ModuleName, GetFloatFuncName, func() interfaces.Func { return &GetFunc{Type: types.TypeFloat} }) } +var _ interfaces.CallableFunc = &GetFunc{} + // GetFunc is special function which looks up the stored `Any` field in the // value resource that it gets it from. If it is initialized with a fixed Type // field, then it becomes a statically typed version that can only return keys @@ -88,7 +90,8 @@ type GetFunc struct { init *interfaces.Init - key string + key string + args []types.Value last types.Value result types.Value // last calculated output @@ -229,7 +232,13 @@ func (obj *GetFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - key := input.Struct()[getArgNameKey].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + key := args[0].Str() if key == "" { return fmt.Errorf("can't use an empty key") } @@ -263,7 +272,7 @@ func (obj *GetFunc) Stream(ctx context.Context) error { // return errwrap.Wrapf(err, "channel watch failed on `%s`", obj.key) //} - result, err := obj.getValue(ctx) // get the value... + result, err := obj.Call(ctx, obj.args) // get the value... if err != nil { return err } @@ -287,8 +296,12 @@ func (obj *GetFunc) Stream(ctx context.Context) error { } } -// getValue gets the value we're looking for. -func (obj *GetFunc) getValue(ctx context.Context) (types.Value, error) { +// Call this function with the input args and return the value if it is possible +// to do so at this time. This was previously getValue which gets the value +// we're looking for. +func (obj *GetFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + key := args[0].Str() + typ, exists := obj.Info().Sig.Out.Map[getFieldNameValue] // type of value field if !exists || typ == nil { // programming error @@ -303,9 +316,9 @@ func (obj *GetFunc) getValue(ctx context.Context) (types.Value, error) { // step that might be needed if the value started out empty... // TODO: We could even add a stored: bool field in the returned struct! isReady := true // assume true - val, err := obj.init.Local.ValueGet(ctx, obj.key) + val, err := obj.init.Local.ValueGet(ctx, key) if err != nil { - return nil, errwrap.Wrapf(err, "channel read failed on `%s`", obj.key) + return nil, errwrap.Wrapf(err, "channel read failed on `%s`", key) } if val == nil { // val doesn't exist isReady = false @@ -324,7 +337,7 @@ func (obj *GetFunc) getValue(ctx context.Context) (types.Value, error) { // an str for example, this error happens... Do we want // to: (1) coerce? -- no; (2) error? -- yep for now; (3) // improve type unification? -- if it's possible, yes. - return nil, errwrap.Wrapf(err, "type mismatch, check type in Value[%s]", obj.key) + return nil, errwrap.Wrapf(err, "type mismatch, check type in Value[%s]", key) } } diff --git a/lang/core/world/getval.go b/lang/core/world/getval.go index bcbbc10f..b6b5d940 100644 --- a/lang/core/world/getval.go +++ b/lang/core/world/getval.go @@ -55,12 +55,15 @@ func init() { funcs.ModuleRegister(ModuleName, GetValFuncName, func() interfaces.Func { return &GetValFunc{} }) } +var _ interfaces.CallableFunc = &GetValFunc{} + // GetValFunc is special function which returns the value of a given key in the // exposed world. type GetValFunc struct { init *interfaces.Init - key string + key string + args []types.Value last types.Value result types.Value // last calculated output @@ -132,7 +135,13 @@ func (obj *GetValFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - key := input.Struct()[getValArgNameKey].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + key := args[0].Str() if key == "" { return fmt.Errorf("can't use an empty key") } @@ -169,7 +178,7 @@ func (obj *GetValFunc) Stream(ctx context.Context) error { return errwrap.Wrapf(err, "channel watch failed on `%s`", obj.key) } - result, err := obj.getValue(ctx) // get the value... + result, err := obj.Call(ctx, obj.args) // get the value... if err != nil { return err } @@ -193,14 +202,17 @@ func (obj *GetValFunc) Stream(ctx context.Context) error { } } -// getValue gets the value we're looking for. -func (obj *GetValFunc) getValue(ctx context.Context) (types.Value, error) { +// Call this function with the input args and return the value if it is possible +// to do so at this time. This was previously getValue which gets the value +// we're looking for. +func (obj *GetValFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + key := args[0].Str() exists := true // assume true - val, err := obj.init.World.StrGet(ctx, obj.key) + val, err := obj.init.World.StrGet(ctx, key) if err != nil && obj.init.World.StrIsNotExist(err) { exists = false // val doesn't exist } else if err != nil { - return nil, errwrap.Wrapf(err, "channel read failed on `%s`", obj.key) + return nil, errwrap.Wrapf(err, "channel read failed on `%s`", key) } s := &types.StrValue{V: val} diff --git a/lang/core/world/kvlookup.go b/lang/core/world/kvlookup.go index 39f45ed5..728b0d53 100644 --- a/lang/core/world/kvlookup.go +++ b/lang/core/world/kvlookup.go @@ -51,12 +51,15 @@ func init() { funcs.ModuleRegister(ModuleName, KVLookupFuncName, func() interfaces.Func { return &KVLookupFunc{} }) } +var _ interfaces.CallableFunc = &KVLookupFunc{} + // KVLookupFunc is special function which returns all the values of a given key // in the exposed world. It is similar to exchange, but it does not set a key. type KVLookupFunc struct { init *interfaces.Init namespace string + args []types.Value last types.Value result types.Value // last calculated output @@ -127,7 +130,13 @@ func (obj *KVLookupFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - namespace := input.Struct()[kvLookupArgNameNamespace].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + namespace := args[0].Str() if namespace == "" { return fmt.Errorf("can't use an empty namespace") } @@ -145,7 +154,7 @@ func (obj *KVLookupFunc) Stream(ctx context.Context) error { return err } - result, err := obj.buildMap(ctx) // build the map... + result, err := obj.Call(ctx, obj.args) // build the map... if err != nil { return err } @@ -174,7 +183,7 @@ func (obj *KVLookupFunc) Stream(ctx context.Context) error { return errwrap.Wrapf(err, "channel watch failed on `%s`", obj.namespace) } - result, err := obj.buildMap(ctx) // build the map... + result, err := obj.Call(ctx, obj.args) // build the map... if err != nil { return err } @@ -198,11 +207,14 @@ func (obj *KVLookupFunc) Stream(ctx context.Context) error { } } -// buildMap builds the result map which we'll need. It uses struct variables. -func (obj *KVLookupFunc) buildMap(ctx context.Context) (types.Value, error) { - keyMap, err := obj.init.World.StrMapGet(ctx, obj.namespace) +// Call this function with the input args and return the value if it is possible +// to do so at this time. This was previously buildMap, which builds the result +// map which we'll need. It uses struct variables. +func (obj *KVLookupFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + namespace := args[0].Str() + keyMap, err := obj.init.World.StrMapGet(ctx, namespace) if err != nil { - return nil, errwrap.Wrapf(err, "channel read failed on `%s`", obj.namespace) + return nil, errwrap.Wrapf(err, "channel read failed on `%s`", namespace) } d := types.NewMap(obj.Info().Sig.Out) diff --git a/lang/core/world/schedule.go b/lang/core/world/schedule.go index e9748001..0bb963a5 100644 --- a/lang/core/world/schedule.go +++ b/lang/core/world/schedule.go @@ -88,6 +88,8 @@ type ScheduleFunc struct { init *interfaces.Init + args []types.Value + namespace string scheduler *scheduler.Result @@ -303,7 +305,15 @@ func (obj *ScheduleFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - namespace := input.Struct()[scheduleArgNameNamespace].Str() + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + namespace := args[0].Str() + + //namespace := input.Struct()[scheduleArgNameNamespace].Str() if namespace == "" { return fmt.Errorf("can't use an empty namespace") } diff --git a/lang/funcs/facts/facts.go b/lang/funcs/facts/facts.go index 72668516..2b1a33d1 100644 --- a/lang/funcs/facts/facts.go +++ b/lang/funcs/facts/facts.go @@ -106,4 +106,7 @@ type Fact interface { Info() *Info Init(*Init) error Stream(context.Context) error + + // TODO: should we require this here? What about a CallableFact instead? + Call(context.Context) (types.Value, error) } diff --git a/lang/funcs/facts/func.go b/lang/funcs/facts/func.go index dd731fe0..0c98b0be 100644 --- a/lang/funcs/facts/func.go +++ b/lang/funcs/facts/func.go @@ -94,3 +94,8 @@ func (obj *FactFunc) Init(init *interfaces.Init) error { func (obj *FactFunc) Stream(ctx context.Context) error { return obj.Fact.Stream(ctx) } + +// Call this fact and return the value if it is possible to do so at this time. +func (obj *FactFunc) Call(ctx context.Context, _ []types.Value) (types.Value, error) { + return obj.Fact.Call(ctx) +} diff --git a/lang/funcs/operators/operators.go b/lang/funcs/operators/operators.go index f37b7fb5..c8b6fb07 100644 --- a/lang/funcs/operators/operators.go +++ b/lang/funcs/operators/operators.go @@ -64,6 +64,9 @@ func init() { "func(float, float) float", // floating-point addition }), F: func(ctx context.Context, input []types.Value) (types.Value, error) { + if l := len(input); l != 2 { // catch programming bugs + return nil, fmt.Errorf("invalid len %d", l) + } switch k := input[0].Type().Kind; k { case types.KindStr: return &types.StrValue{ @@ -476,6 +479,9 @@ type OperatorFunc struct { init *interfaces.Init last types.Value // last value received to use for diff + lastOp string + fn interfaces.FuncSig + result types.Value // last calculated output } @@ -654,8 +660,6 @@ func (obj *OperatorFunc) Init(init *interfaces.Init) error { // Stream returns the changing values that this func has over time. func (obj *OperatorFunc) Stream(ctx context.Context) error { - var op, lastOp string - var fn interfaces.FuncSig defer close(obj.init.Output) // the sender closes for { select { @@ -685,43 +689,12 @@ func (obj *OperatorFunc) Stream(ctx context.Context) error { return fmt.Errorf("bad args, got: %v, want: %v", keys, obj.Type.Ord) } - // build up arg list - args := []types.Value{} - for _, name := range obj.Type.Ord { - v, exists := input.Struct()[name] - if !exists { - // programming error - return fmt.Errorf("function engine was early, missing arg: %s", name) - } - if name == operatorArgName { - op = v.Str() - continue // skip over the operator arg - } - args = append(args, v) + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err } - if op == "" { - // programming error - return fmt.Errorf("operator cannot be empty, args: %v", keys) - } - // operator selection is dynamic now, although mostly it - // should not change... to do so is probably uncommon... - if fn == nil { - fn = obj.findFunc(op) - - } else if op != lastOp { - // TODO: check sig is compatible instead? - return fmt.Errorf("op changed from %s to %s", lastOp, op) - } - - if fn == nil { - return fmt.Errorf("func not found for operator `%s` with sig: `%+v`", op, obj.Type) - } - lastOp = op - - var result types.Value - - result, err := fn(ctx, args) // (Value, error) + result, err := obj.Call(ctx, args) // (Value, error) if err != nil { return errwrap.Wrapf(err, "problem running function") } @@ -749,6 +722,42 @@ func (obj *OperatorFunc) Stream(ctx context.Context) error { } } +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *OperatorFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + op := args[0].Str() + + if op == "" { + // programming error + return nil, fmt.Errorf("operator cannot be empty, args: %v", args) + } + + // operator selection is dynamic now, although mostly it + // should not change... to do so is probably uncommon... + if obj.fn == nil { + obj.fn = obj.findFunc(op) + + } else if op != obj.lastOp { + // TODO: check sig is compatible instead? + return nil, fmt.Errorf("op changed from %s to %s", obj.lastOp, op) + } + + if obj.fn == nil { + return nil, fmt.Errorf("func not found for operator `%s` with sig: `%+v`", op, obj.Type) + } + obj.lastOp = op + + newArgs := []types.Value{} + for i, x := range args { + if i == 0 { + continue // skip over the operator + } + newArgs = append(newArgs, x) + } + + return obj.fn(ctx, newArgs) // (Value, error) +} + // removeOperatorArg returns a copy of the input KindFunc type, without the // operator arg which specifies which operator we're using. It *is* idempotent. func removeOperatorArg(typ *types.Type) *types.Type { diff --git a/lang/funcs/structs/channel_based_sink.go b/lang/funcs/structs/channel_based_sink.go index b05575c2..983fe631 100644 --- a/lang/funcs/structs/channel_based_sink.go +++ b/lang/funcs/structs/channel_based_sink.go @@ -108,15 +108,20 @@ func (obj *ChannelBasedSinkFunc) Stream(ctx context.Context) error { return nil // can't output any more } - value, exists := input.Struct()[obj.EdgeName] - if !exists { - return fmt.Errorf("programming error, can't find edge") + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err } - if obj.last != nil && value.Cmp(obj.last) == nil { + result, err := obj.Call(ctx, args) // get the value... + if err != nil { + return err + } + + if obj.last != nil && result.Cmp(obj.last) == nil { continue // value didn't change, skip it } - obj.last = value // store so we can send after this select + obj.last = result // store so we can send after this select case <-ctx.Done(): return nil @@ -139,3 +144,13 @@ func (obj *ChannelBasedSinkFunc) Stream(ctx context.Context) error { } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +// XXX: Is is correct to implement this here for this particular function? +func (obj *ChannelBasedSinkFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + if len(args) != 1 { + return nil, fmt.Errorf("programming error, can't find edge") + } + return args[0], nil +} diff --git a/lang/funcs/structs/channel_based_source.go b/lang/funcs/structs/channel_based_source.go index e1e63b5d..96d71543 100644 --- a/lang/funcs/structs/channel_based_source.go +++ b/lang/funcs/structs/channel_based_source.go @@ -113,3 +113,13 @@ func (obj *ChannelBasedSourceFunc) Stream(ctx context.Context) error { } } } + +// XXX: Is is correct to implement this here for this particular function? +// XXX: tricky since this really receives input from a secret channel... +// XXX: ADD A MUTEX AROUND READING obj.last ??? +//func (obj *ChannelBasedSourceFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { +// if obj.last == nil { +// return nil, fmt.Errorf("programming error") +// } +// return obj.last, nil +//} diff --git a/lang/funcs/structs/if.go b/lang/funcs/structs/if.go index 14cc72ef..253c3647 100644 --- a/lang/funcs/structs/if.go +++ b/lang/funcs/structs/if.go @@ -115,12 +115,14 @@ func (obj *IfFunc) Stream(ctx context.Context) error { } obj.last = input // store for next - var result types.Value + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } - if input.Struct()["c"].Bool() { - result = input.Struct()["a"] // true branch - } else { - result = input.Struct()["b"] // false branch + result, err := obj.Call(ctx, args) // get the value... + if err != nil { + return err } // skip sending an update... @@ -141,3 +143,13 @@ func (obj *IfFunc) Stream(ctx context.Context) error { } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +// XXX: Is is correct to implement this here for this particular function? +func (obj *IfFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + if c := args[0].Bool(); c { + return args[1], nil // true branch + } + return args[2], nil +} diff --git a/lang/funcs/wrapped/wrapped.go b/lang/funcs/wrapped/wrapped.go index 96d409e8..f6cc6f16 100644 --- a/lang/funcs/wrapped/wrapped.go +++ b/lang/funcs/wrapped/wrapped.go @@ -150,7 +150,7 @@ func (obj *Func) Stream(ctx context.Context) error { if obj.init.Debug { obj.init.Logf("Calling function with: %+v", values) } - result, err := obj.Fn.Call(ctx, values) // (Value, error) + result, err := obj.Call(ctx, values) // (Value, error) if err != nil { if obj.init.Debug { obj.init.Logf("Function returned error: %+v", err) @@ -181,3 +181,9 @@ func (obj *Func) Stream(ctx context.Context) error { } } } + +// Call this function with the input args and return the value if it is possible +// to do so at this time. +func (obj *Func) Call(ctx context.Context, args []types.Value) (types.Value, error) { + return obj.Fn.Call(ctx, args) // (Value, error) +}