engine: graph: Allow send/recv to work with autogrouped resources

We've previously not received a value from within an autogrouped
resource. It turns out this would be quite useful, and so this patch
implements the additional plumbing and testing so that this works!

Testing that an autogrouped resource can still send values has not been
done at this time.
This commit is contained in:
James Shubin
2023-12-08 18:18:17 -05:00
parent bf5cc63bc5
commit 18e1f08156
5 changed files with 219 additions and 36 deletions

View File

@@ -112,21 +112,35 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
// sendrecv! // sendrecv!
// connect any senders to receivers and detect if values changed // connect any senders to receivers and detect if values changed
// this actually checks and sends into resource trees recursively...
if res, ok := vertex.(engine.RecvableRes); ok { if res, ok := vertex.(engine.RecvableRes); ok {
if updated, err := obj.SendRecv(res); err != nil { if updated, err := obj.SendRecv(res); err != nil {
return errwrap.Wrapf(err, "could not SendRecv") return errwrap.Wrapf(err, "could not SendRecv")
} else if len(updated) > 0 { } else if len(updated) > 0 {
for _, changed := range updated { for r, m := range updated { // map[engine.RecvableRes]map[string]bool
if changed { // at least one was updated v, ok := r.(pgraph.Vertex)
// invalidate cache, mark as dirty if !ok {
obj.state[vertex].tuid.StopTimer() continue
obj.state[vertex].isStateOK = false }
break _, stateExists := obj.state[v] // autogrouped children probably don't have a state
if !stateExists {
continue
}
for _, changed := range m {
if !changed {
continue
}
// if changed == true, at least one was updated
// invalidate cache, mark as dirty
obj.state[v].tuid.StopTimer()
obj.state[v].isStateOK = false
//break // we might have more vertices now
}
// re-validate after we change any values
if err := engine.Validate(r); err != nil {
return errwrap.Wrapf(err, "failed Validate after SendRecv")
} }
}
// re-validate after we change any values
if err := engine.Validate(res); err != nil {
return errwrap.Wrapf(err, "failed Validate after SendRecv")
} }
} }
} }

View File

@@ -20,6 +20,8 @@ package graph
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"sort"
"strings"
"github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine"
engineUtil "github.com/purpleidea/mgmt/engine/util" engineUtil "github.com/purpleidea/mgmt/engine/util"
@@ -29,18 +31,67 @@ import (
// SendRecv pulls in the sent values into the receive slots. It is called by the // SendRecv pulls in the sent values into the receive slots. It is called by the
// receiver and must be given as input the full resource struct to receive on. // receiver and must be given as input the full resource struct to receive on.
// It applies the loaded values to the resource. // It applies the loaded values to the resource. It is called recursively, as it
func (obj *Engine) SendRecv(res engine.RecvableRes) (map[string]bool, error) { // recurses into any grouped resources found within the first receiver. It
recv := res.Recv() // returns a map of resource pointer, to resource field key, to changed boolean.
func (obj *Engine) SendRecv(res engine.RecvableRes) (map[engine.RecvableRes]map[string]bool, error) {
updated := make(map[engine.RecvableRes]map[string]bool) // list of updated keys
if obj.Debug { if obj.Debug {
// NOTE: this could expose private resource data like passwords obj.Logf("SendRecv: %s", res) // receiving here
obj.Logf("%s: SendRecv: %+v", res, recv) }
if groupableRes, ok := res.(engine.GroupableRes); ok {
for _, x := range groupableRes.GetGroup() { // grouped elements
recvableRes, ok := x.(engine.RecvableRes)
if !ok {
continue
}
if obj.Debug {
obj.Logf("SendRecv: %s: grouped: %s", res, x) // receiving here
}
// We need to recurse here so that autogrouped resources
// inside autogrouped resources would work... In case we
// work correctly. We just need to make sure that things
// are grouped in the correct order, but that is not our
// problem! Recurse and merge in the changed results...
innerUpdated, err := obj.SendRecv(recvableRes)
if err != nil {
return nil, errwrap.Wrapf(err, "recursive SendRecv error")
}
for r, m := range innerUpdated { // res ptr, map
if _, exists := updated[r]; !exists {
updated[r] = make(map[string]bool)
}
for s, b := range m {
// don't overwrite in case one exists...
if old, exists := updated[r][s]; exists {
b = b || old // unlikely i think
}
updated[r][s] = b
}
}
}
}
recv := res.Recv()
keys := []string{}
for k := range recv { // map[string]*Send
keys = append(keys, k)
}
sort.Strings(keys)
if obj.Debug && len(keys) > 0 {
// NOTE: this could expose private resource data like passwords
obj.Logf("SendRecv: %s recv: %+v", res, strings.Join(keys, ", "))
} }
var updated = make(map[string]bool) // list of updated keys
var err error var err error
for k, v := range recv { for k, v := range recv { // map[string]*Send
updated[k] = false // default // v.Res // SendableRes // a handle to the resource which is sending a value
v.Changed = false // reset to the default // v.Key // string // the key in the resource that we're sending
if _, exists := updated[res]; !exists {
updated[res] = make(map[string]bool)
}
updated[res][k] = false // default
v.Changed = false // reset to the default
var st interface{} = v.Res // old style direct send/recv var st interface{} = v.Res // old style direct send/recv
if true { // new style send/recv API if true { // new style send/recv API
@@ -167,8 +218,8 @@ func (obj *Engine) SendRecv(res engine.RecvableRes) (map[string]bool, error) {
continue continue
} }
//dest.Set(orig) // do it for all types that match //dest.Set(orig) // do it for all types that match
updated[k] = true // we updated this key! updated[res][k] = true // we updated this key!
v.Changed = true // tag this key as updated! v.Changed = true // tag this key as updated!
obj.Logf("SendRecv: %s.%s -> %s.%s", v.Res, v.Key, res, k) obj.Logf("SendRecv: %s.%s -> %s.%s", v.Res, v.Key, res, k)
} }
return updated, err return updated, err

View File

@@ -0,0 +1,32 @@
# send/recv of value1.any into print1.msg works!
value "value1" {
any => "i am value1",
}
print "print1" {
msg => "i am print1",
Meta:autogroup => false,
}
Value["value1"].any -> Print["print1"].msg
# One of these will be autogrouped into the other! The inner one can receive!
# send/recv from value2.any into print2.msg works
# send/recv from value3.any into (the usually autogrouped) print3 works too!
value "value2" {
any => "i am value2",
}
value "value3" {
any => "i am value3",
}
print "print2" {
msg => "i am print2",
Meta:autogroup => true,
}
print "print3" {
msg => "i am print3",
Meta:autogroup => true,
}
Value["value2"].any -> Print["print2"].msg
Value["value3"].any -> Print["print3"].msg

View File

@@ -36,6 +36,7 @@ import (
"github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/graph" "github.com/purpleidea/mgmt/engine/graph"
"github.com/purpleidea/mgmt/engine/graph/autoedge" "github.com/purpleidea/mgmt/engine/graph/autoedge"
"github.com/purpleidea/mgmt/engine/graph/autogroup"
"github.com/purpleidea/mgmt/engine/local" "github.com/purpleidea/mgmt/engine/local"
engineUtil "github.com/purpleidea/mgmt/engine/util" engineUtil "github.com/purpleidea/mgmt/engine/util"
"github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd"
@@ -50,6 +51,7 @@ import (
"github.com/purpleidea/mgmt/lang/unification" "github.com/purpleidea/mgmt/lang/unification"
"github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
"github.com/purpleidea/mgmt/util/errwrap"
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
"github.com/spf13/afero" "github.com/spf13/afero"
@@ -2061,6 +2063,15 @@ func TestAstFunc3(t *testing.T) {
// TODO: apply the global metaparams to the graph // TODO: apply the global metaparams to the graph
// XXX: can we change this into a ge.Apply operation?
// run autogroup; modifies the graph
if err := ge.AutoGroup(&autogroup.NonReachabilityGrouper{}); err != nil {
//ge.Abort() // delete graph
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: error running autogrouping: %+v", index, err)
return
}
fastPause := false fastPause := false
ge.Pause(fastPause) // sync ge.Pause(fastPause) // sync
if err := ge.Commit(); err != nil { if err := ge.Commit(); err != nil {
@@ -2091,34 +2102,24 @@ func TestAstFunc3(t *testing.T) {
t.Logf("test #%d: graph: %+v", index, ngraph) t.Logf("test #%d: graph: %+v", index, ngraph)
str := strings.Trim(ngraph.Sprint(), "\n") // text format of output graph str := strings.Trim(ngraph.Sprint(), "\n") // text format of output graph
for i, v := range ngraph.Vertices() { for _, v := range ngraph.Vertices() {
res, ok := v.(engine.Res) res, ok := v.(engine.Res)
if !ok { if !ok {
t.Errorf("test #%d: FAIL\n\n", index) t.Errorf("test #%d: FAIL\n\n", index)
t.Logf("test #%d: unexpected non-resource: %+v", index, v) t.Logf("test #%d: unexpected non-resource: %+v", index, v)
return return
} }
m, err := engineUtil.ResToParamValues(res)
s, err := stringResFields(res)
if err != nil { if err != nil {
t.Errorf("test #%d: FAIL\n\n", index) t.Errorf("test #%d: FAIL\n\n", index)
t.Logf("test #%d: can't read resource: %+v", index, err) t.Logf("test #%d: can't read resource: %+v", index, err)
return return
} }
if i == 0 { if str != "" {
str += "\n"
}
keys := []string{}
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys) // sort for determinism
for _, field := range keys {
v := m[field]
str += fmt.Sprintf("Field: %s[%s].%s = %s\n", res.Kind(), res.Name(), field, v)
}
if i < len(ngraph.Vertices()) {
str += "\n" str += "\n"
} }
str += s
} }
if expstr == magicEmpty { if expstr == magicEmpty {
@@ -2162,3 +2163,40 @@ func TestAstFunc3(t *testing.T) {
t.Skip("skipping all tests...") t.Skip("skipping all tests...")
} }
} }
// stringResFields is a helper function to store a resource graph as a text
// format for test comparisons.
func stringResFields(res engine.Res) (string, error) {
m, err := engineUtil.ResToParamValues(res)
if err != nil {
return "", errwrap.Wrapf(err, "can't read resource %s", res)
}
str := ""
keys := []string{}
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys) // sort for determinism
for _, field := range keys {
v := m[field]
str += fmt.Sprintf("Field: %s[%s].%s = %s\n", res.Kind(), res.Name(), field, v)
}
groupableRes, ok := res.(engine.GroupableRes)
if !ok {
return str, nil
}
for _, x := range groupableRes.GetGroup() { // grouped elements
s, err := stringResFields(x) // recurse
if err != nil {
return "", err
}
// add a prefix to each line?
s = strings.Trim(s, "\n") // trim trailing newlines
for _, f := range strings.Split(s, "\n") {
str += fmt.Sprintf("Group: %s: ", res) + f + "\n"
}
//str += s
}
return str, nil
}

View File

@@ -0,0 +1,48 @@
-- main.mcl --
# send/recv of value1.any into print1.msg works!
value "value1" {
any => "i am value1",
}
print "print1" {
msg => "i am print1",
Meta:autogroup => false,
}
Value["value1"].any -> Print["print1"].msg
# One of these will be autogrouped into the other! The inner one can receive!
# send/recv from value2.any into print2.msg works
# send/recv from value3.any into (the usually autogrouped) print3 works too!
value "value2" {
any => "i am value2",
}
value "value3" {
any => "i am value3",
}
print "print2" {
msg => "i am print2",
Meta:autogroup => true,
}
print "print3" {
msg => "i am print3",
Meta:autogroup => true,
}
Value["value2"].any -> Print["print2"].msg
Value["value3"].any -> Print["print3"].msg
-- OUTPUT --
Edge: value[value1] -> print[print1] # value[value1] -> print[print1]
Edge: value[value2] -> print[print2] # value[value2] -> print[print2]
Edge: value[value3] -> print[print2] # value[value3] -> print[print3]
Field: print[print1].Msg = "i am value1"
Field: print[print2].Msg = "i am value2"
Field: value[value1].Any = "i am value1"
Field: value[value2].Any = "i am value2"
Field: value[value3].Any = "i am value3"
Group: print[print2]: Field: print[print3].Msg = "i am value3"
Vertex: print[print1]
Vertex: print[print2]
Vertex: value[value1]
Vertex: value[value2]
Vertex: value[value3]