lang: interfaces, funcs: Port Func API to new Stream signature

This removes the `Close() error` and replaces it with a more modern
Stream API that takes a context. This removes boilerplate and makes
integration with concurrent code easier. The only downside is that there
isn't an explicit cleanup step, but only one function was even using
that and it was possible to switch it to a defer in Stream.

This also renames the functions from polyfunc to just func which we
determine by API not naming.
This commit is contained in:
James Shubin
2023-05-28 16:20:42 -04:00
parent 6a06f7b2ea
commit b134c4b778
41 changed files with 276 additions and 540 deletions

View File

@@ -239,27 +239,6 @@ use in the other methods.
// Init runs some startup code for this function.
func (obj *FooFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{}) // shutdown signal
return nil
}
```
### Close
```golang
Close() error
```
This is called to cleanup the function. It usually causes the stream to
shutdown. Even if `Stream()` decided to shutdown early, it might still get
called. It is usually called by the engine to tell the function to shutdown.
#### Example
```golang
// Close runs some shutdown code for this function and turns off the stream.
func (obj *FooFunc) Close() error {
close(obj.closeChan) // send a signal to tell the stream to close
return nil
}
```
@@ -267,23 +246,24 @@ func (obj *FooFunc) Close() error {
### Stream
```golang
Stream() error
Stream(context.Context) error
```
`Stream` is where the real _work_ is done. This method is started by the
language function engine. It will run this function while simultaneously sending
it values on the `input` channel. It will only send a complete set of input
it values on the `Input` channel. It will only send a complete set of input
values. You should send a value to the output channel when you have decided that
one should be produced. Make sure to only use input values of the expected type
as declared in the `Info` struct, and send values of the similarly declared
appropriate return type. Failure to do so will may result in a panic and
sadness.
sadness. You must shutdown if the input context cancels. You must close the
`Output` channel if you are done generating new values and/or when you shutdown.
#### Example
```golang
// Stream returns the single value that was generated and then closes.
func (obj *FooFunc) Stream() error {
func (obj *FooFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
var result string
for {
@@ -300,7 +280,7 @@ func (obj *FooFunc) Stream() error {
result = fmt.Sprintf("the input is: %d", ix)
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -309,7 +289,7 @@ func (obj *FooFunc) Stream() error {
V: result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
@@ -340,8 +320,6 @@ type FooFunc struct {
init *interfaces.Init
// this space can be used if needed
closeChan chan struct{} // shutdown signal
}
```

View File

@@ -683,7 +683,7 @@ Please see the example functions in
### Stream
```golang
Stream() error
Stream(context.Context) error
```
Stream is called by the function engine when it is ready for your function to
@@ -692,23 +692,8 @@ value. Failure to produce at least one value will probably cause the function
engine to hang waiting for your output. This function must close the `Output`
channel when it has no more values to send. The engine will close the `Input`
channel when it has no more values to send. This may or may not influence
whether or not you close the `Output` channel.
#### Example
```golang
Please see the example functions in
[lang/funcs/core/](https://github.com/purpleidea/mgmt/tree/master/lang/funcs/core/).
```
### Close
```golang
Close() error
```
Close asks the particular function to shutdown its `Stream()` function and
return.
whether or not you close the `Output` channel. You must shutdown if the input
context cancels.
#### Example

View File

@@ -18,6 +18,7 @@
package funcs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -37,29 +38,27 @@ const (
)
func init() {
Register(ContainsFuncName, func() interfaces.Func { return &ContainsPolyFunc{} }) // must register the func and name
Register(ContainsFuncName, func() interfaces.Func { return &ContainsFunc{} }) // must register the func and name
}
// ContainsPolyFunc returns true if a value is found in a list. Otherwise false.
type ContainsPolyFunc struct {
// ContainsFunc returns true if a value is found in a list. Otherwise false.
type ContainsFunc struct {
Type *types.Type // this is the type of value stored in our list
init *interfaces.Init
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
// can satisfy the pgraph.Vertex interface.
func (obj *ContainsPolyFunc) String() string {
func (obj *ContainsFunc) String() string {
return ContainsFuncName
}
// ArgGen returns the Nth arg name for this function.
func (obj *ContainsPolyFunc) ArgGen(index int) (string, error) {
func (obj *ContainsFunc) ArgGen(index int) (string, error) {
seq := []string{containsArgNameNeedle, containsArgNameHaystack}
if l := len(seq); index >= l {
return "", fmt.Errorf("index %d exceeds arg length of %d", index, l)
@@ -68,7 +67,7 @@ func (obj *ContainsPolyFunc) ArgGen(index int) (string, error) {
}
// Unify returns the list of invariants that this func produces.
func (obj *ContainsPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
func (obj *ContainsFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
var invariants []interfaces.Invariant
var invar interfaces.Invariant
@@ -139,7 +138,7 @@ func (obj *ContainsPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant
// invariant. This can only happen once, because by then we'll have given all
// the new information we can, and falsely producing redundant information is a
// good way to stall the solver if it thinks it keeps learning more things!
func (obj *ContainsPolyFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHaystack, dummyOut interfaces.Expr) func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) {
func (obj *ContainsFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHaystack, dummyOut interfaces.Expr) func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) {
return func(fnInvariants []interfaces.Invariant, solved map[interfaces.Expr]*types.Type) ([]interfaces.Invariant, error) {
for _, invariant := range fnInvariants {
// search for this special type of invariant
@@ -241,7 +240,7 @@ func (obj *ContainsPolyFunc) fnBuilder(recurse bool, expr, dummyNeedle, dummyHay
// Polymorphisms returns the list of possible function signatures available for
// this static polymorphic function. It relies on type and value hints to limit
// the number of returned possibilities.
func (obj *ContainsPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
func (obj *ContainsFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
// TODO: return `variant` as arg for now -- maybe there's a better way?
variant := []*types.Type{types.NewType("func(needle variant, haystack variant) bool")}
@@ -292,7 +291,7 @@ func (obj *ContainsPolyFunc) Polymorphisms(partialType *types.Type, partialValue
// and must be run before Info() and any of the other Func interface methods are
// used. This function is idempotent, as long as the arg isn't changed between
// runs.
func (obj *ContainsPolyFunc) Build(typ *types.Type) error {
func (obj *ContainsFunc) Build(typ *types.Type) error {
// typ is the KindFunc signature we're trying to build...
if typ.Kind != types.KindFunc {
return fmt.Errorf("input type must be of kind func")
@@ -335,7 +334,7 @@ func (obj *ContainsPolyFunc) Build(typ *types.Type) error {
}
// Validate tells us if the input struct takes a valid form.
func (obj *ContainsPolyFunc) Validate() error {
func (obj *ContainsFunc) Validate() error {
if obj.Type == nil { // build must be run first
return fmt.Errorf("type is still unspecified")
}
@@ -344,7 +343,7 @@ func (obj *ContainsPolyFunc) Validate() error {
// Info returns some static info about itself. Build must be called before this
// will return correct data.
func (obj *ContainsPolyFunc) Info() *interfaces.Info {
func (obj *ContainsFunc) Info() *interfaces.Info {
var sig *types.Type
if obj.Type != nil { // don't panic if called speculatively
s := obj.Type.String()
@@ -359,14 +358,13 @@ func (obj *ContainsPolyFunc) Info() *interfaces.Info {
}
// Init runs some startup code for this function.
func (obj *ContainsPolyFunc) Init(init *interfaces.Init) error {
func (obj *ContainsFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *ContainsPolyFunc) Stream() error {
func (obj *ContainsFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -397,20 +395,14 @@ func (obj *ContainsPolyFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *ContainsPolyFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -583,6 +583,9 @@ func TestLiveFuncExec0(t *testing.T) {
valueptrch := make(chan int) // which Nth value are we at?
killTimeline := make(chan struct{}) // ask timeline to exit
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// wait for close signals
wg.Add(1)
go func() {
@@ -617,7 +620,7 @@ func TestLiveFuncExec0(t *testing.T) {
if debug {
logf("Running func")
}
err := handle.Stream() // sends to output chan
err := handle.Stream(ctx) // sends to output chan
t.Logf("test #%d: stream exited with: %+v", index, err)
if debug {
logf("Exiting func")
@@ -740,12 +743,8 @@ func TestLiveFuncExec0(t *testing.T) {
t.Logf("test #%d: timeline finished", index)
close(argch)
t.Logf("test #%d: running Close", index)
if err := handle.Close(); err != nil {
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: could not close func: %+v", index, err)
return
}
t.Logf("test #%d: running cancel", index)
cancel()
}()
// read everything

View File

@@ -18,6 +18,7 @@
package coredatetime
import (
"context"
"time"
"github.com/purpleidea/mgmt/lang/funcs/facts"
@@ -37,7 +38,6 @@ func init() {
// DateTimeFact is a fact which returns the current date and time.
type DateTimeFact struct {
init *facts.Init
closeChan chan struct{}
}
// String returns a simple name for this fact. This is needed so this struct can
@@ -62,12 +62,11 @@ func (obj *DateTimeFact) Info() *facts.Info {
// Init runs some startup code for this fact.
func (obj *DateTimeFact) Init(init *facts.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this fact has over time.
func (obj *DateTimeFact) Stream() error {
func (obj *DateTimeFact) Stream(ctx context.Context) error {
defer close(obj.init.Output) // always signal when we're done
// XXX: this might be an interesting fact to write because:
// 1) will the sleeps from the ticker be in sync with the second ticker?
@@ -87,7 +86,7 @@ func (obj *DateTimeFact) Stream() error {
startChan = nil // disable
case <-ticker.C: // received the timer event
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -95,14 +94,8 @@ func (obj *DateTimeFact) Stream() error {
case obj.init.Output <- &types.IntValue{ // seconds since 1970...
V: time.Now().Unix(), // .UTC() not necessary
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this fact and turns off the stream.
func (obj *DateTimeFact) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package coredeploy
import (
"context"
"fmt"
"strings"
@@ -49,8 +50,6 @@ type AbsPathFunc struct {
path *string // the active path
result *string // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -91,7 +90,6 @@ func (obj *AbsPathFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *AbsPathFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
if obj.data == nil {
// programming error
return fmt.Errorf("missing function data")
@@ -100,7 +98,7 @@ func (obj *AbsPathFunc) Init(init *interfaces.Init) error {
}
// Stream returns the changing values that this func has over time.
func (obj *AbsPathFunc) Stream() error {
func (obj *AbsPathFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -145,7 +143,7 @@ func (obj *AbsPathFunc) Stream() error {
}
obj.result = &result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -153,14 +151,8 @@ func (obj *AbsPathFunc) Stream() error {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *AbsPathFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package coredeploy
import (
"context"
"fmt"
"strings"
@@ -50,8 +51,6 @@ type ReadFileFunc struct {
filename *string // the active filename
result *string // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -92,7 +91,6 @@ func (obj *ReadFileFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *ReadFileFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
if obj.data == nil {
// programming error
return fmt.Errorf("missing function data")
@@ -101,7 +99,7 @@ func (obj *ReadFileFunc) Init(init *interfaces.Init) error {
}
// Stream returns the changing values that this func has over time.
func (obj *ReadFileFunc) Stream() error {
func (obj *ReadFileFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -159,7 +157,7 @@ func (obj *ReadFileFunc) Stream() error {
}
obj.result = &result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -167,14 +165,8 @@ func (obj *ReadFileFunc) Stream() error {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *ReadFileFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package coredeploy
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/funcs"
@@ -50,8 +51,6 @@ type ReadFileAbsFunc struct {
filename *string // the active filename
result *string // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -92,7 +91,6 @@ func (obj *ReadFileAbsFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *ReadFileAbsFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
if obj.data == nil {
// programming error
return fmt.Errorf("missing function data")
@@ -101,7 +99,7 @@ func (obj *ReadFileAbsFunc) Init(init *interfaces.Init) error {
}
// Stream returns the changing values that this func has over time.
func (obj *ReadFileAbsFunc) Stream() error {
func (obj *ReadFileAbsFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -145,7 +143,7 @@ func (obj *ReadFileAbsFunc) Stream() error {
}
obj.result = &result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -153,14 +151,8 @@ func (obj *ReadFileAbsFunc) Stream() error {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *ReadFileAbsFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package coreexample
import (
"context"
"time"
"github.com/purpleidea/mgmt/lang/funcs/facts"
@@ -40,7 +41,6 @@ func init() {
type FlipFlopFact struct {
init *facts.Init
value bool
closeChan chan struct{}
}
// String returns a simple name for this fact. This is needed so this struct can
@@ -65,12 +65,11 @@ func (obj *FlipFlopFact) Info() *facts.Info {
// Init runs some startup code for this fact.
func (obj *FlipFlopFact) Init(init *facts.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this fact has over time.
func (obj *FlipFlopFact) Stream() error {
func (obj *FlipFlopFact) Stream(ctx context.Context) error {
defer close(obj.init.Output) // always signal when we're done
// TODO: don't hard code 5 sec interval
ticker := time.NewTicker(time.Duration(5) * time.Second)
@@ -85,7 +84,7 @@ func (obj *FlipFlopFact) Stream() error {
startChan = nil // disable
case <-ticker.C: // received the timer event
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -93,16 +92,10 @@ func (obj *FlipFlopFact) Stream() error {
case obj.init.Output <- &types.BoolValue{ // flip
V: obj.value,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
obj.value = !obj.value // flip it
}
}
// Close runs some shutdown code for this fact and turns off the stream.
func (obj *FlipFlopFact) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -59,8 +59,6 @@ type VUMeterFunc struct {
peak float64
result *string // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -133,12 +131,11 @@ func (obj *VUMeterFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *VUMeterFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *VUMeterFunc) Stream() error {
func (obj *VUMeterFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
ticker := newTicker()
defer ticker.Stop()
@@ -222,7 +219,7 @@ func (obj *VUMeterFunc) Stream() error {
}
obj.result = &result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -230,18 +227,12 @@ func (obj *VUMeterFunc) Stream() error {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *VUMeterFunc) Close() error {
close(obj.closeChan)
return nil
}
func newTicker() *time.Ticker {
return time.NewTicker(time.Duration(1) * time.Second)
}

View File

@@ -18,6 +18,7 @@
package corefmt
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/funcs"
@@ -55,8 +56,6 @@ type PrintfFunc struct {
last types.Value // last value received to use for diff
result *string // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -367,12 +366,11 @@ func (obj *PrintfFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *PrintfFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *PrintfFunc) Stream() error {
func (obj *PrintfFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -409,7 +407,7 @@ func (obj *PrintfFunc) Stream() error {
}
obj.result = &result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -417,18 +415,12 @@ func (obj *PrintfFunc) Stream() error {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *PrintfFunc) Close() error {
close(obj.closeChan)
return nil
}
// valueToString prints our values how we expect for printf.
// FIXME: if this turns out to be useful, add it to the types package.
func valueToString(value types.Value) string {

View File

@@ -18,6 +18,7 @@
package coreiter
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/funcs"
@@ -62,8 +63,6 @@ type MapFunc struct {
function func([]types.Value) (types.Value, error)
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -556,12 +555,11 @@ func (obj *MapFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *MapFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *MapFunc) Stream() error {
func (obj *MapFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
rtyp := types.NewType(fmt.Sprintf("[]%s", obj.RType.String()))
for {
@@ -613,21 +611,15 @@ func (obj *MapFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *MapFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package coreos
import (
"context"
"fmt"
"io/ioutil"
"sync"
@@ -55,8 +56,6 @@ type ReadFileFunc struct {
wg *sync.WaitGroup
result *string // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -94,13 +93,13 @@ func (obj *ReadFileFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.events = make(chan error)
obj.wg = &sync.WaitGroup{}
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *ReadFileFunc) Stream() error {
func (obj *ReadFileFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
defer close(obj.events) // clean up for fun
defer obj.wg.Wait()
defer func() {
if obj.recWatcher != nil {
@@ -182,7 +181,7 @@ func (obj *ReadFileFunc) Stream() error {
case obj.events <- err:
// send event...
case <-obj.closeChan:
case <-ctx.Done():
// don't block here on shutdown
return
}
@@ -215,7 +214,7 @@ func (obj *ReadFileFunc) Stream() error {
}
obj.result = &result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -223,16 +222,8 @@ func (obj *ReadFileFunc) Stream() error {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *ReadFileFunc) Close() error {
close(obj.closeChan)
obj.wg.Wait() // block so we don't exit by the closure of obj.events
close(obj.events) // clean up for fun
return nil
}

View File

@@ -50,8 +50,6 @@ func init() {
// the "Meta:realize" metaparam is set to true.
type SystemFunc struct {
init *interfaces.Init
closeChan chan struct{}
cancel context.CancelFunc
}
@@ -89,12 +87,15 @@ func (obj *SystemFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *SystemFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *SystemFunc) Stream() error {
func (obj *SystemFunc) Stream(ctx context.Context) error {
// XXX: this implementation is a bit awkward especially with the port to
// the Stream(context.Context) signature change. This is a straight port
// but we could refactor this eventually.
// Close the output chan to signal that no more values are coming.
defer close(obj.init.Output)
@@ -115,7 +116,7 @@ func (obj *SystemFunc) Stream() error {
// Kill the current process, if any. A new cancel function is created
// each time a new process is started.
var ctx context.Context
var innerCtx context.Context
defer func() {
if obj.cancel == nil {
return
@@ -132,7 +133,7 @@ func (obj *SystemFunc) Stream() error {
select {
case <-processedChan:
return nil
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
@@ -147,8 +148,8 @@ func (obj *SystemFunc) Stream() error {
// Run the command, connecting it to ctx so we can kill
// it if needed, and to two Readers so we can read its
// stdout and stderr.
ctx, obj.cancel = context.WithCancel(context.Background())
cmd := exec.CommandContext(ctx, "sh", "-c", shellCommand)
innerCtx, obj.cancel = context.WithCancel(context.Background())
cmd := exec.CommandContext(innerCtx, "sh", "-c", shellCommand)
stdoutReader, err := cmd.StdoutPipe()
if err != nil {
return err
@@ -205,14 +206,8 @@ func (obj *SystemFunc) Stream() error {
wg.Wait()
close(processedChan)
}()
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *SystemFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package core // TODO: should this be in its own individual package?
import (
"context"
"crypto/rand"
"fmt"
"math/big"
@@ -56,8 +57,6 @@ type Random1Func struct {
init *interfaces.Init
finished bool // did we send the random string?
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -119,12 +118,11 @@ func generate(length uint16) (string, error) {
// Init runs some startup code for this function.
func (obj *Random1Func) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the single value that was generated and then closes.
func (obj *Random1Func) Stream() error {
func (obj *Random1Func) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
var result string
for {
@@ -153,7 +151,7 @@ func (obj *Random1Func) Stream() error {
return err // no errwrap needed b/c helper func
}
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -164,14 +162,8 @@ func (obj *Random1Func) Stream() error {
// we only send one value, then wait for input to close
obj.finished = true
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *Random1Func) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -20,6 +20,7 @@
package coresys
import (
"context"
"io/ioutil"
"regexp"
"strconv"
@@ -51,7 +52,6 @@ func init() {
// CPUCountFact is a fact that returns the current CPU count.
type CPUCountFact struct {
init *facts.Init
closeChan chan struct{}
}
// String returns a simple name for this fact. This is needed so this struct can
@@ -67,18 +67,16 @@ func (obj *CPUCountFact) Info() *facts.Info {
}
}
// Init runs startup code for this fact. Initializes the closeChan and sets the
// facts.Init variable.
// Init runs startup code for this fact and sets the facts.Init variable.
func (obj *CPUCountFact) Init(init *facts.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this fact has over time. It will
// first poll sysfs to get the initial cpu count, and then receives UEvents from
// the kernel as CPUs are added/removed.
func (obj CPUCountFact) Stream() error {
func (obj CPUCountFact) Stream(ctx context.Context) error {
defer close(obj.init.Output) // signal when we're done
ss, err := socketset.NewSocketSet(rtmGrps, socketFile, unix.NETLINK_KOBJECT_UEVENT)
@@ -152,7 +150,7 @@ func (obj CPUCountFact) Stream() error {
continue
}
}
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -167,18 +165,12 @@ func (obj CPUCountFact) Stream() error {
}:
once = true
// send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs cleanup code for the fact and turns off the Stream.
func (obj *CPUCountFact) Close() error {
close(obj.closeChan)
return nil
}
// getCPUCount looks in sysfs to get the number of CPUs that are online.
func getCPUCount() (int64, error) {
dat, err := ioutil.ReadFile("/sys/devices/system/cpu/online")

View File

@@ -20,6 +20,7 @@
package coresys
import (
"context"
"testing"
"github.com/purpleidea/mgmt/lang/funcs/facts"
@@ -41,8 +42,9 @@ func TestSimple(t *testing.T) {
return
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer fact.Close()
defer cancel()
Loop:
for {
select {
@@ -54,7 +56,7 @@ func TestSimple(t *testing.T) {
}()
// now start the stream
if err := fact.Stream(); err != nil {
if err := fact.Stream(ctx); err != nil {
t.Error(err)
}
}

View File

@@ -18,6 +18,8 @@
package coresys
import (
"context"
"github.com/purpleidea/mgmt/lang/funcs/facts"
"github.com/purpleidea/mgmt/lang/types"
)
@@ -36,7 +38,6 @@ func init() {
// TODO: support hostnames that change in the future.
type HostnameFact struct {
init *facts.Init
closeChan chan struct{}
}
// String returns a simple name for this fact. This is needed so this struct can
@@ -61,26 +62,19 @@ func (obj *HostnameFact) Info() *facts.Info {
// Init runs some startup code for this fact.
func (obj *HostnameFact) Init(init *facts.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the single value that this fact has, and then closes.
func (obj *HostnameFact) Stream() error {
func (obj *HostnameFact) Stream(ctx context.Context) error {
select {
case obj.init.Output <- &types.StrValue{
V: obj.init.Hostname,
}:
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
close(obj.init.Output) // signal that we're done sending
return nil
}
// Close runs some shutdown code for this fact and turns off the stream.
func (obj *HostnameFact) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package coresys
import (
"context"
"time"
"github.com/purpleidea/mgmt/lang/funcs/facts"
@@ -40,7 +41,6 @@ func init() {
// LoadFact is a fact which returns the current system load.
type LoadFact struct {
init *facts.Init
closeChan chan struct{}
}
// String returns a simple name for this fact. This is needed so this struct can
@@ -65,12 +65,11 @@ func (obj *LoadFact) Info() *facts.Info {
// Init runs some startup code for this fact.
func (obj *LoadFact) Init(init *facts.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this fact has over time.
func (obj *LoadFact) Stream() error {
func (obj *LoadFact) Stream(ctx context.Context) error {
defer close(obj.init.Output) // always signal when we're done
// it seems the different values only update once every 5
@@ -88,7 +87,7 @@ func (obj *LoadFact) Stream() error {
startChan = nil // disable
case <-ticker.C: // received the timer event
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -107,14 +106,8 @@ func (obj *LoadFact) Stream() error {
select {
case obj.init.Output <- st:
// send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this fact and turns off the stream.
func (obj *LoadFact) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package coresys
import (
"context"
"time"
"github.com/purpleidea/mgmt/lang/funcs/facts"
@@ -38,7 +39,6 @@ func init() {
// UptimeFact is a fact which returns the current uptime of your system.
type UptimeFact struct {
init *facts.Init
closeChan chan struct{}
}
// String returns a simple name for this fact. This is needed so this struct can
@@ -57,12 +57,11 @@ func (obj *UptimeFact) Info() *facts.Info {
// Init runs some startup code for this fact.
func (obj *UptimeFact) Init(init *facts.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this fact has over time.
func (obj *UptimeFact) Stream() error {
func (obj *UptimeFact) Stream(ctx context.Context) error {
defer close(obj.init.Output)
ticker := time.NewTicker(time.Duration(1) * time.Second)
@@ -75,7 +74,7 @@ func (obj *UptimeFact) Stream() error {
startChan = nil
case <-ticker.C:
// send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -87,14 +86,8 @@ func (obj *UptimeFact) Stream() error {
select {
case obj.init.Output <- &types.IntValue{V: uptime}:
// send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this fact and turns off the stream.
func (obj *UptimeFact) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -19,6 +19,7 @@ package core // TODO: should this be in its own individual package?
import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
@@ -73,8 +74,6 @@ type TemplateFunc struct {
last types.Value // last value received to use for diff
result *string // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -367,7 +366,6 @@ func (obj *TemplateFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *TemplateFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
@@ -491,7 +489,7 @@ Loop:
}
// Stream returns the changing values that this func has over time.
func (obj *TemplateFunc) Stream() error {
func (obj *TemplateFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -526,7 +524,7 @@ func (obj *TemplateFunc) Stream() error {
}
obj.result = &result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -534,18 +532,12 @@ func (obj *TemplateFunc) Stream() error {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *TemplateFunc) Close() error {
close(obj.closeChan)
return nil
}
// safename renames the functions so they're valid inside the template. This is
// a limitation of the template library, and it might be worth moving to a new
// one.

View File

@@ -52,7 +52,6 @@ type ExchangeFunc struct {
result types.Value // last calculated output
watchChan chan error
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -93,14 +92,13 @@ func (obj *ExchangeFunc) Info() *interfaces.Info {
func (obj *ExchangeFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.watchChan = make(chan error) // XXX: sender should close this, but did I implement that part yet???
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *ExchangeFunc) Stream() error {
func (obj *ExchangeFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
@@ -189,21 +187,15 @@ func (obj *ExchangeFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *ExchangeFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -51,7 +51,6 @@ type KVLookupFunc struct {
result types.Value // last calculated output
watchChan chan error
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -90,14 +89,13 @@ func (obj *KVLookupFunc) Info() *interfaces.Info {
func (obj *KVLookupFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.watchChan = make(chan error) // XXX: sender should close this, but did I implement that part yet???
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *KVLookupFunc) Stream() error {
func (obj *KVLookupFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
@@ -143,7 +141,7 @@ func (obj *KVLookupFunc) Stream() error {
select {
case obj.init.Output <- result: // send one!
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -176,25 +174,19 @@ func (obj *KVLookupFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *KVLookupFunc) Close() error {
close(obj.closeChan)
return nil
}
// buildMap builds the result map which we'll need. It uses struct variables.
func (obj *KVLookupFunc) buildMap(ctx context.Context) (types.Value, error) {
keyMap, err := obj.init.World.StrMapGet(ctx, obj.namespace)

View File

@@ -62,12 +62,12 @@ const (
)
func init() {
funcs.ModuleRegister(ModuleName, ScheduleFuncName, func() interfaces.Func { return &SchedulePolyFunc{} })
funcs.ModuleRegister(ModuleName, ScheduleFuncName, func() interfaces.Func { return &ScheduleFunc{} })
}
// SchedulePolyFunc is special function which determines where code should run
// in the cluster.
type SchedulePolyFunc struct {
// ScheduleFunc is special function which determines where code should run in
// the cluster.
type ScheduleFunc struct {
Type *types.Type // this is the type of opts used if specified
built bool // was this function built yet?
@@ -81,17 +81,16 @@ type SchedulePolyFunc struct {
result types.Value // last calculated output
watchChan chan *schedulerResult
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
// can satisfy the pgraph.Vertex interface.
func (obj *SchedulePolyFunc) String() string {
func (obj *ScheduleFunc) String() string {
return ScheduleFuncName
}
// validOpts returns the available mapping of valid opts fields to types.
func (obj *SchedulePolyFunc) validOpts() map[string]*types.Type {
func (obj *ScheduleFunc) validOpts() map[string]*types.Type {
return map[string]*types.Type{
"strategy": types.TypeStr,
"max": types.TypeInt,
@@ -101,7 +100,7 @@ func (obj *SchedulePolyFunc) validOpts() map[string]*types.Type {
}
// ArgGen returns the Nth arg name for this function.
func (obj *SchedulePolyFunc) ArgGen(index int) (string, error) {
func (obj *ScheduleFunc) ArgGen(index int) (string, error) {
seq := []string{scheduleArgNameNamespace, scheduleArgNameOpts} // 2nd arg is optional
if l := len(seq); index >= l {
return "", fmt.Errorf("index %d exceeds arg length of %d", index, l)
@@ -110,7 +109,7 @@ func (obj *SchedulePolyFunc) ArgGen(index int) (string, error) {
}
// Unify returns the list of invariants that this func produces.
func (obj *SchedulePolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
func (obj *ScheduleFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
var invariants []interfaces.Invariant
var invar interfaces.Invariant
@@ -311,7 +310,7 @@ func (obj *SchedulePolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant
// Polymorphisms returns the list of possible function signatures available for
// this static polymorphic function. It relies on type and value hints to limit
// the number of returned possibilities.
func (obj *SchedulePolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
func (obj *ScheduleFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
// TODO: technically, we could generate all permutations of the struct!
//variant := []*types.Type{}
//t0 := types.NewType("func(namespace str) []str")
@@ -399,7 +398,7 @@ func (obj *SchedulePolyFunc) Polymorphisms(partialType *types.Type, partialValue
// and must be run before Info() and any of the other Func interface methods are
// used. This function is idempotent, as long as the arg isn't changed between
// runs.
func (obj *SchedulePolyFunc) Build(typ *types.Type) error {
func (obj *ScheduleFunc) Build(typ *types.Type) error {
// typ is the KindFunc signature we're trying to build...
if typ.Kind != types.KindFunc {
return fmt.Errorf("input type must be of kind func")
@@ -482,7 +481,7 @@ func (obj *SchedulePolyFunc) Build(typ *types.Type) error {
}
// Validate tells us if the input struct takes a valid form.
func (obj *SchedulePolyFunc) Validate() error {
func (obj *ScheduleFunc) Validate() error {
if !obj.built {
return fmt.Errorf("function wasn't built yet")
}
@@ -495,7 +494,7 @@ func (obj *SchedulePolyFunc) Validate() error {
// Info returns some static info about itself. Build must be called before this
// will return correct data.
func (obj *SchedulePolyFunc) Info() *interfaces.Info {
func (obj *ScheduleFunc) Info() *interfaces.Info {
// It's important that you don't return a non-nil sig if this is called
// before you're built. Type unification may call it opportunistically.
var typ *types.Type
@@ -515,16 +514,15 @@ func (obj *SchedulePolyFunc) Info() *interfaces.Info {
}
// Init runs some startup code for this function.
func (obj *SchedulePolyFunc) Init(init *interfaces.Init) error {
func (obj *ScheduleFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.watchChan = make(chan *schedulerResult)
obj.closeChan = make(chan struct{})
//obj.init.Debug = true // use this for local debugging
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *SchedulePolyFunc) Stream() error {
func (obj *ScheduleFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -618,24 +616,28 @@ func (obj *SchedulePolyFunc) Stream() error {
// process the stream of scheduling output...
go func() {
defer close(obj.watchChan)
ctx, cancel := context.WithCancel(context.Background())
// XXX: maybe we could share the parent
// ctx, but I have to work out the
// ordering logic first. For now this is
// just a port of what it was before.
newCtx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel() // unblock Next()
defer obj.scheduler.Shutdown()
select {
case <-obj.closeChan:
case <-ctx.Done():
return
}
}()
for {
hosts, err := obj.scheduler.Next(ctx)
hosts, err := obj.scheduler.Next(newCtx)
select {
case obj.watchChan <- &schedulerResult{
hosts: hosts,
err: err,
}:
case <-obj.closeChan:
case <-ctx.Done():
return
}
}
@@ -688,25 +690,19 @@ func (obj *SchedulePolyFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *SchedulePolyFunc) Close() error {
close(obj.closeChan)
return nil
}
// schedulerResult combines our internal events into a single message packet.
type schedulerResult struct {
hosts []string

View File

@@ -18,6 +18,7 @@
package funcs
import (
"context"
"fmt"
"sync"
@@ -35,6 +36,9 @@ type State struct {
handle interfaces.Func // the function (if not nil, we've found it on init)
ctx context.Context // used for shutting down each Stream function.
cancel context.CancelFunc
init bool // have we run Init on our func?
ready bool // has it received all the args it needs at least once?
loaded bool // has the func run at least once ?
@@ -112,6 +116,9 @@ type Engine struct {
streamChan chan error // signals a new graph can be created or problem
ctx context.Context // used for shutting down each Stream function.
cancel context.CancelFunc
closeChan chan struct{} // close signal
wg *sync.WaitGroup
}
@@ -137,6 +144,8 @@ func (obj *Engine) Init() error {
}
obj.topologicalSort = topologicalSort // cache the result
obj.ctx, obj.cancel = context.WithCancel(context.Background()) // top
for _, vertex := range obj.Graph.Vertices() {
// is this an interface we can use?
if _, exists := obj.state[vertex]; exists {
@@ -152,7 +161,13 @@ func (obj *Engine) Init() error {
obj.Logf("Loading func `%s`", vertex)
}
obj.state[vertex] = &State{Expr: expr} // store some state!
innerCtx, innerCancel := context.WithCancel(obj.ctx)
obj.state[vertex] = &State{
Expr: expr,
ctx: innerCtx,
cancel: innerCancel,
} // store some state!
e1 := obj.state[vertex].Init()
e2 := errwrap.Wrapf(e1, "error loading func `%s`", vertex)
@@ -407,7 +422,7 @@ func (obj *Engine) Run() error {
if obj.Debug {
obj.SafeLogf("Running func `%s`", node)
}
err := node.handle.Stream()
err := node.handle.Stream(node.ctx)
if obj.Debug {
obj.SafeLogf("Exiting func `%s`", node)
}
@@ -673,14 +688,10 @@ func (obj *Engine) Close() error {
var err error
for _, vertex := range obj.topologicalSort { // FIXME: should we do this in reverse?
node := obj.state[vertex]
if node.init { // did we Init this func?
if e := node.handle.Close(); e != nil {
e := errwrap.Wrapf(e, "problem closing func `%s`", node)
err = errwrap.Append(err, e) // list of errors
}
}
node.cancel() // ctx
}
close(obj.closeChan)
obj.wg.Wait() // wait so that each func doesn't need to do this in close
obj.cancel() // free
return err
}

View File

@@ -19,6 +19,7 @@
package facts
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/engine"
@@ -83,6 +84,5 @@ type Fact interface {
//Validate() error // currently not needed since no facts are internal
Info() *Info
Init(*Init) error
Stream() error
Close() error
Stream(context.Context) error
}

View File

@@ -18,6 +18,7 @@
package facts
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -75,11 +76,6 @@ func (obj *FactFunc) Init(init *interfaces.Init) error {
}
// Stream returns the changing values that this function has over time.
func (obj *FactFunc) Stream() error {
return obj.Fact.Stream()
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *FactFunc) Close() error {
return obj.Fact.Close()
func (obj *FactFunc) Stream(ctx context.Context) error {
return obj.Fact.Stream(ctx)
}

View File

@@ -19,6 +19,7 @@
package funcs
import (
"context"
"fmt"
"strings"
"sync"
@@ -150,6 +151,8 @@ func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, erro
hostname := "" // XXX: add to interface
debug := false // XXX: add to interface
logf := func(format string, v ...interface{}) {} // XXX: add to interface
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
info := handle.Info()
if !info.Pure {
@@ -217,7 +220,7 @@ func PureFuncExec(handle interfaces.Func, args []types.Value) (types.Value, erro
if debug {
logf("Running func")
}
err := handle.Stream() // sends to output chan
err := handle.Stream(ctx) // sends to output chan
if debug {
logf("Exiting func")
}
@@ -296,10 +299,7 @@ Loop:
}
}
if err := handle.Close(); err != nil {
err = errwrap.Append(err, reterr)
return nil, errwrap.Wrapf(err, "problem closing func")
}
cancel()
if result == nil && reterr == nil {
// programming error

View File

@@ -18,6 +18,7 @@
package funcs // TODO: should this be in its own individual package?
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -58,8 +59,6 @@ type HistoryFunc struct {
history []types.Value // goes from newest (index->0) to oldest (len()-1)
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -358,12 +357,11 @@ func (obj *HistoryFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *HistoryFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *HistoryFunc) Stream() error {
func (obj *HistoryFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -412,21 +410,15 @@ func (obj *HistoryFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *HistoryFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package funcs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -38,29 +39,27 @@ const (
)
func init() {
Register(MapLookupFuncName, func() interfaces.Func { return &MapLookupPolyFunc{} }) // must register the func and name
Register(MapLookupFuncName, func() interfaces.Func { return &MapLookupFunc{} }) // must register the func and name
}
// MapLookupPolyFunc is a key map lookup function.
type MapLookupPolyFunc struct {
// MapLookupFunc is a key map lookup function.
type MapLookupFunc struct {
Type *types.Type // Kind == Map, that is used as the map we lookup
init *interfaces.Init
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
// can satisfy the pgraph.Vertex interface.
func (obj *MapLookupPolyFunc) String() string {
func (obj *MapLookupFunc) String() string {
return MapLookupFuncName
}
// ArgGen returns the Nth arg name for this function.
func (obj *MapLookupPolyFunc) ArgGen(index int) (string, error) {
func (obj *MapLookupFunc) ArgGen(index int) (string, error) {
seq := []string{mapLookupArgNameMap, mapLookupArgNameKey, mapLookupArgNameDef}
if l := len(seq); index >= l {
return "", fmt.Errorf("index %d exceeds arg length of %d", index, l)
@@ -69,7 +68,7 @@ func (obj *MapLookupPolyFunc) ArgGen(index int) (string, error) {
}
// Unify returns the list of invariants that this func produces.
func (obj *MapLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
func (obj *MapLookupFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
var invariants []interfaces.Invariant
var invar interfaces.Invariant
@@ -369,7 +368,7 @@ func (obj *MapLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invarian
// Polymorphisms returns the list of possible function signatures available for
// this static polymorphic function. It relies on type and value hints to limit
// the number of returned possibilities.
func (obj *MapLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
func (obj *MapLookupFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
// TODO: return `variant` as arg for now -- maybe there's a better way?
variant := []*types.Type{types.NewType("func(map variant, key variant, default variant) variant")}
@@ -468,7 +467,7 @@ func (obj *MapLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValu
// and must be run before Info() and any of the other Func interface methods are
// used. This function is idempotent, as long as the arg isn't changed between
// runs.
func (obj *MapLookupPolyFunc) Build(typ *types.Type) error {
func (obj *MapLookupFunc) Build(typ *types.Type) error {
// typ is the KindFunc signature we're trying to build...
if typ.Kind != types.KindFunc {
return fmt.Errorf("input type must be of kind func")
@@ -516,7 +515,7 @@ func (obj *MapLookupPolyFunc) Build(typ *types.Type) error {
}
// Validate tells us if the input struct takes a valid form.
func (obj *MapLookupPolyFunc) Validate() error {
func (obj *MapLookupFunc) Validate() error {
if obj.Type == nil { // build must be run first
return fmt.Errorf("type is still unspecified")
}
@@ -528,7 +527,7 @@ func (obj *MapLookupPolyFunc) Validate() error {
// Info returns some static info about itself. Build must be called before this
// will return correct data.
func (obj *MapLookupPolyFunc) Info() *interfaces.Info {
func (obj *MapLookupFunc) Info() *interfaces.Info {
var typ *types.Type
if obj.Type != nil { // don't panic if called speculatively
// TODO: can obj.Type.Key or obj.Type.Val be nil (a partial) ?
@@ -545,14 +544,13 @@ func (obj *MapLookupPolyFunc) Info() *interfaces.Info {
}
// Init runs some startup code for this function.
func (obj *MapLookupPolyFunc) Init(init *interfaces.Init) error {
func (obj *MapLookupFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *MapLookupPolyFunc) Stream() error {
func (obj *MapLookupFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -589,20 +587,14 @@ func (obj *MapLookupPolyFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *MapLookupPolyFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package funcs // this is here, in case we allow others to register operators...
import (
"context"
"fmt"
"math"
"sort"
@@ -332,7 +333,7 @@ func init() {
},
})
Register(OperatorFuncName, func() interfaces.Func { return &OperatorPolyFunc{} }) // must register the func and name
Register(OperatorFuncName, func() interfaces.Func { return &OperatorFunc{} }) // must register the func and name
}
// OperatorFuncs maps an operator to a list of callable function values.
@@ -422,29 +423,26 @@ func LookupOperatorShort(operator string, size int) ([]*types.Type, error) {
return results, nil
}
// OperatorPolyFunc is an operator function that performs an operation on N
// values.
type OperatorPolyFunc struct {
// OperatorFunc is an operator function that performs an operation on N values.
type OperatorFunc struct {
Type *types.Type // Kind == Function, including operator arg
init *interfaces.Init
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
// can satisfy the pgraph.Vertex interface.
func (obj *OperatorPolyFunc) String() string {
func (obj *OperatorFunc) String() string {
// TODO: return the exact operator if we can guarantee it doesn't change
return OperatorFuncName
}
// argNames returns the maximum list of possible argNames. This can be truncated
// if needed. The first arg name is the operator.
func (obj *OperatorPolyFunc) argNames() ([]string, error) {
func (obj *OperatorFunc) argNames() ([]string, error) {
// we could just do this statically, but i did it dynamically so that I
// wouldn't ever have to remember to update this list...
max := 0
@@ -474,7 +472,7 @@ func (obj *OperatorPolyFunc) argNames() ([]string, error) {
// findFunc tries to find the first available registered operator function that
// matches the Operator/Type pattern requested. If none is found it returns nil.
func (obj *OperatorPolyFunc) findFunc(operator string) *types.FuncValue {
func (obj *OperatorFunc) findFunc(operator string) *types.FuncValue {
fns, exists := OperatorFuncs[operator]
if !exists {
return nil
@@ -489,7 +487,7 @@ func (obj *OperatorPolyFunc) findFunc(operator string) *types.FuncValue {
}
// ArgGen returns the Nth arg name for this function.
func (obj *OperatorPolyFunc) ArgGen(index int) (string, error) {
func (obj *OperatorFunc) ArgGen(index int) (string, error) {
seq, err := obj.argNames()
if err != nil {
return "", err
@@ -501,7 +499,7 @@ func (obj *OperatorPolyFunc) ArgGen(index int) (string, error) {
}
// Unify returns the list of invariants that this func produces.
func (obj *OperatorPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
func (obj *OperatorFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
var invariants []interfaces.Invariant
var invar interfaces.Invariant
@@ -746,7 +744,7 @@ func (obj *OperatorPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant
// Polymorphisms returns the list of possible function signatures available for
// this static polymorphic function. It relies on type and value hints to limit
// the number of returned possibilities.
func (obj *OperatorPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
func (obj *OperatorFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
var op string
var size = -1
@@ -793,7 +791,7 @@ func (obj *OperatorPolyFunc) Polymorphisms(partialType *types.Type, partialValue
// and must be run before Info() and any of the other Func interface methods are
// used. This function is idempotent, as long as the arg isn't changed between
// runs.
func (obj *OperatorPolyFunc) Build(typ *types.Type) error {
func (obj *OperatorFunc) Build(typ *types.Type) error {
// typ is the KindFunc signature we're trying to build...
if len(typ.Ord) < 1 {
return fmt.Errorf("the operator function needs at least 1 arg")
@@ -807,7 +805,7 @@ func (obj *OperatorPolyFunc) Build(typ *types.Type) error {
}
// Validate tells us if the input struct takes a valid form.
func (obj *OperatorPolyFunc) Validate() error {
func (obj *OperatorFunc) Validate() error {
if obj.Type == nil { // build must be run first
return fmt.Errorf("type is still unspecified")
}
@@ -819,7 +817,7 @@ func (obj *OperatorPolyFunc) Validate() error {
// Info returns some static info about itself. Build must be called before this
// will return correct data.
func (obj *OperatorPolyFunc) Info() *interfaces.Info {
func (obj *OperatorFunc) Info() *interfaces.Info {
return &interfaces.Info{
Pure: true,
Memo: false,
@@ -829,14 +827,13 @@ func (obj *OperatorPolyFunc) Info() *interfaces.Info {
}
// Init runs some startup code for this function.
func (obj *OperatorPolyFunc) Init(init *interfaces.Init) error {
func (obj *OperatorFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *OperatorPolyFunc) Stream() error {
func (obj *OperatorFunc) Stream(ctx context.Context) error {
var op, lastOp string
var fn *types.FuncValue
defer close(obj.init.Output) // the sender closes
@@ -896,24 +893,18 @@ func (obj *OperatorPolyFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *OperatorPolyFunc) Close() error {
close(obj.closeChan)
return nil
}
// removeOperatorArg returns a copy of the input KindFunc type, without the
// operator arg which specifies which operator we're using. It *is* idempotent.
func removeOperatorArg(typ *types.Type) *types.Type {

View File

@@ -18,6 +18,7 @@
package simple
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/funcs"
@@ -78,8 +79,6 @@ type WrappedFunc struct {
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -128,12 +127,11 @@ func (obj *WrappedFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *WrappedFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *WrappedFunc) Stream() error {
func (obj *WrappedFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -172,7 +170,7 @@ func (obj *WrappedFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -181,14 +179,8 @@ func (obj *WrappedFunc) Stream() error {
if len(obj.Fn.Type().Ord) == 0 {
return nil // no more values, we're a pure func
}
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *WrappedFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package simplepoly
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/funcs"
@@ -141,8 +142,6 @@ type WrappedFunc struct {
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -546,12 +545,11 @@ func (obj *WrappedFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *WrappedFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *WrappedFunc) Stream() error {
func (obj *WrappedFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -599,7 +597,7 @@ func (obj *WrappedFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -608,14 +606,8 @@ func (obj *WrappedFunc) Stream() error {
if len(obj.fn.Type().Ord) == 0 {
return nil // no more values, we're a pure func
}
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *WrappedFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package funcs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -37,11 +38,11 @@ const (
)
func init() {
Register(StructLookupFuncName, func() interfaces.Func { return &StructLookupPolyFunc{} }) // must register the func and name
Register(StructLookupFuncName, func() interfaces.Func { return &StructLookupFunc{} }) // must register the func and name
}
// StructLookupPolyFunc is a key map lookup function.
type StructLookupPolyFunc struct {
// StructLookupFunc is a struct field lookup function.
type StructLookupFunc struct {
Type *types.Type // Kind == Struct, that is used as the struct we lookup
Out *types.Type // type of field we're extracting
@@ -50,18 +51,16 @@ type StructLookupPolyFunc struct {
field string
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
// can satisfy the pgraph.Vertex interface.
func (obj *StructLookupPolyFunc) String() string {
func (obj *StructLookupFunc) String() string {
return StructLookupFuncName
}
// ArgGen returns the Nth arg name for this function.
func (obj *StructLookupPolyFunc) ArgGen(index int) (string, error) {
func (obj *StructLookupFunc) ArgGen(index int) (string, error) {
seq := []string{structLookupArgNameStruct, structLookupArgNameField}
if l := len(seq); index >= l {
return "", fmt.Errorf("index %d exceeds arg length of %d", index, l)
@@ -70,7 +69,7 @@ func (obj *StructLookupPolyFunc) ArgGen(index int) (string, error) {
}
// Unify returns the list of invariants that this func produces.
func (obj *StructLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
func (obj *StructLookupFunc) Unify(expr interfaces.Expr) ([]interfaces.Invariant, error) {
var invariants []interfaces.Invariant
var invar interfaces.Invariant
@@ -309,7 +308,7 @@ func (obj *StructLookupPolyFunc) Unify(expr interfaces.Expr) ([]interfaces.Invar
// Polymorphisms returns the list of possible function signatures available for
// this static polymorphic function. It relies on type and value hints to limit
// the number of returned possibilities.
func (obj *StructLookupPolyFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
func (obj *StructLookupFunc) Polymorphisms(partialType *types.Type, partialValues []types.Value) ([]*types.Type, error) {
// TODO: return `variant` as arg for now -- maybe there's a better way?
variant := []*types.Type{types.NewType("func(struct variant, field str) variant")}
@@ -395,7 +394,7 @@ func (obj *StructLookupPolyFunc) Polymorphisms(partialType *types.Type, partialV
// and must be run before Info() and any of the other Func interface methods are
// used. This function is idempotent, as long as the arg isn't changed between
// runs.
func (obj *StructLookupPolyFunc) Build(typ *types.Type) error {
func (obj *StructLookupFunc) Build(typ *types.Type) error {
// typ is the KindFunc signature we're trying to build...
if typ.Kind != types.KindFunc {
return fmt.Errorf("input type must be of kind func")
@@ -434,7 +433,7 @@ func (obj *StructLookupPolyFunc) Build(typ *types.Type) error {
}
// Validate tells us if the input struct takes a valid form.
func (obj *StructLookupPolyFunc) Validate() error {
func (obj *StructLookupFunc) Validate() error {
if obj.Type == nil { // build must be run first
return fmt.Errorf("type is still unspecified")
}
@@ -455,7 +454,7 @@ func (obj *StructLookupPolyFunc) Validate() error {
// Info returns some static info about itself. Build must be called before this
// will return correct data.
func (obj *StructLookupPolyFunc) Info() *interfaces.Info {
func (obj *StructLookupFunc) Info() *interfaces.Info {
var sig *types.Type
if obj.Type != nil { // don't panic if called speculatively
// TODO: can obj.Out be nil (a partial) ?
@@ -470,14 +469,13 @@ func (obj *StructLookupPolyFunc) Info() *interfaces.Info {
}
// Init runs some startup code for this function.
func (obj *StructLookupPolyFunc) Init(init *interfaces.Init) error {
func (obj *StructLookupFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *StructLookupPolyFunc) Stream() error {
func (obj *StructLookupFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -522,20 +520,14 @@ func (obj *StructLookupPolyFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *StructLookupPolyFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package structs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -46,8 +47,6 @@ type CallFunc struct {
init *interfaces.Init
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -112,14 +111,13 @@ func (obj *CallFunc) Info() *interfaces.Info {
// Init runs some startup code for this composite function.
func (obj *CallFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream takes an input struct in the format as described in the Func and Graph
// methods of the Expr, and returns the actual expected value as a stream based
// on the changing inputs to that value.
func (obj *CallFunc) Stream() error {
func (obj *CallFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -171,21 +169,15 @@ func (obj *CallFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *CallFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package structs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -41,8 +42,6 @@ type CompositeFunc struct {
init *interfaces.Init
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -117,14 +116,13 @@ func (obj *CompositeFunc) Info() *interfaces.Info {
// Init runs some startup code for this composite function.
func (obj *CompositeFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream takes an input struct in the format as described in the Func and Graph
// methods of the Expr, and returns the actual expected value as a stream based
// on the changing inputs to that value.
func (obj *CompositeFunc) Stream() error {
func (obj *CompositeFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -139,7 +137,7 @@ func (obj *CompositeFunc) Stream() error {
select {
case obj.init.Output <- result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
@@ -207,21 +205,15 @@ func (obj *CompositeFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *CompositeFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package structs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -34,7 +35,6 @@ type ConstFunc struct {
Value types.Value
init *interfaces.Init
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -70,24 +70,17 @@ func (obj *ConstFunc) Info() *interfaces.Info {
// Init runs some startup code for this const.
func (obj *ConstFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream returns the single value that this const has, and then closes.
func (obj *ConstFunc) Stream() error {
func (obj *ConstFunc) Stream(ctx context.Context) error {
select {
case obj.init.Output <- obj.Value:
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
close(obj.init.Output) // signal that we're done sending
return nil
}
// Close runs some shutdown code for this const and turns off the stream.
func (obj *ConstFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package structs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/funcs"
@@ -41,8 +42,6 @@ type FunctionFunc struct {
init *interfaces.Init
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -125,14 +124,13 @@ func (obj *FunctionFunc) Info() *interfaces.Info {
// Init runs some startup code for this composite function.
func (obj *FunctionFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream takes an input struct in the format as described in the Func and Graph
// methods of the Expr, and returns the actual expected value as a stream based
// on the changing inputs to that value.
func (obj *FunctionFunc) Stream() error {
func (obj *FunctionFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -158,7 +156,7 @@ func (obj *FunctionFunc) Stream() error {
select {
case obj.init.Output <- result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
@@ -194,21 +192,15 @@ func (obj *FunctionFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *FunctionFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package structs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -37,8 +38,6 @@ type IfFunc struct {
init *interfaces.Init
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -82,14 +81,13 @@ func (obj *IfFunc) Info() *interfaces.Info {
// Init runs some startup code for this if expression function.
func (obj *IfFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream takes an input struct in the format as described in the Func and Graph
// methods of the Expr, and returns the actual expected value as a stream based
// on the changing inputs to that value.
func (obj *IfFunc) Stream() error {
func (obj *IfFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -119,21 +117,15 @@ func (obj *IfFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *IfFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package structs
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/lang/interfaces"
@@ -40,8 +41,6 @@ type VarFunc struct {
init *interfaces.Init
last types.Value // last value received to use for diff
result types.Value // last calculated output
closeChan chan struct{}
}
// String returns a simple name for this function. This is needed so this struct
@@ -84,14 +83,13 @@ func (obj *VarFunc) Info() *interfaces.Info {
// Init runs some startup code for this composite function.
func (obj *VarFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}
// Stream takes an input struct in the format as described in the Func and Graph
// methods of the Expr, and returns the actual expected value as a stream based
// on the changing inputs to that value.
func (obj *VarFunc) Stream() error {
func (obj *VarFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
@@ -121,21 +119,15 @@ func (obj *VarFunc) Stream() error {
}
obj.result = result // store new result
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}
// Close runs some shutdown code for this function and turns off the stream.
func (obj *VarFunc) Close() error {
close(obj.closeChan)
return nil
}

View File

@@ -18,6 +18,7 @@
package interfaces
import (
"context"
"fmt"
"strings"
@@ -69,9 +70,18 @@ type Func interface {
// not known yet. This is because the Info method might be called
// speculatively to aid in type unification.
Info() *Info
// Init passes some important values and references to the function.
Init(*Init) error
Stream() error
Close() error
// Stream is the mainloop of the function. It reads and writes from
// channels to return the changing values that this func has over time.
// It should shutdown and cleanup when the input context is cancelled.
// It must not exit before any goroutines it spawned have terminated.
// It must close the Output chan if it's done sending new values out. It
// must send at least one value, or return an error. It may also return
// an error at anytime if it can't continue.
Stream(context.Context) error
}
// PolyFunc is an interface for functions which are statically polymorphic. In