lang: Test the resource engine briefly

We run the resource engine once and look at its values. This is useful
for testing send/recv in particular.

The converger code is probably not working properly. We'll look into
that subsequently if this gets used a lot.
This commit is contained in:
James Shubin
2023-11-15 18:28:42 -05:00
parent 7105e38544
commit 47c441ba40

View File

@@ -32,7 +32,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/graph"
"github.com/purpleidea/mgmt/engine/graph/autoedge" "github.com/purpleidea/mgmt/engine/graph/autoedge"
engineUtil "github.com/purpleidea/mgmt/engine/util" engineUtil "github.com/purpleidea/mgmt/engine/util"
"github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd"
@@ -1244,7 +1246,7 @@ func TestAstFunc2(t *testing.T) {
// TestAstFunc3 is an even more advanced version which also examines parameter // TestAstFunc3 is an even more advanced version which also examines parameter
// values. It briefly runs the function engine and captures output. Only use // values. It briefly runs the function engine and captures output. Only use
// with stable, static output. // with stable, static output. It also briefly runs the resource engine too!
func TestAstFunc3(t *testing.T) { func TestAstFunc3(t *testing.T) {
const magicError = "# err: " const magicError = "# err: "
const magicErrorLexParse = "errLexParse: " const magicErrorLexParse = "errLexParse: "
@@ -1915,6 +1917,7 @@ func TestAstFunc3(t *testing.T) {
} }
// add automatic edges... // add automatic edges...
// TODO: use ge.AutoEdge() instead?
err = autoedge.AutoEdge(ograph, testing.Verbose(), logf) err = autoedge.AutoEdge(ograph, testing.Verbose(), logf)
if (!fail || !failAutoEdge) && err != nil { if (!fail || !failAutoEdge) && err != nil {
t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: FAIL", index)
@@ -1939,10 +1942,105 @@ func TestAstFunc3(t *testing.T) {
// TODO: perform autogrouping? // TODO: perform autogrouping?
t.Logf("test #%d: graph: %+v", index, ograph) // TODO: perform reversals?
str := strings.Trim(ograph.Sprint(), "\n") // text format of output graph
for i, v := range ograph.Vertices() { t.Logf("test #%d: graph: %+v", index, ograph)
// setup converger
convergedTimeout := int64(5)
converger := converger.New(
convergedTimeout,
)
converged := make(chan struct{})
converger.AddStateFn("converged-exit", func(isConverged bool) error {
if isConverged {
logf("converged for %d seconds, exiting!", convergedTimeout)
close(converged) // trigger an exit!
}
return nil
})
// TODO: waitgroup ?
go converger.Run(true) // main loop for converger, true to start paused
converger.Ready() // block until ready
defer func() {
// TODO: shutdown converger, but make sure that using it in a
// still running embdEtcd struct doesn't block waiting on it...
converger.Shutdown()
}()
// run engine a bit so that send/recv happens
ge := &graph.Engine{
Program: "testing", // TODO: name it mgmt?
//Version: obj.Version,
Hostname: "localhost",
World: world,
Prefix: fmt.Sprintf("%s/", filepath.Join(tmpdir, "engine")),
Converger: converger,
Debug: testing.Verbose(),
Logf: func(format string, v ...interface{}) {
logf("engine: "+format, v...)
},
}
if err := ge.Init(); err != nil {
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: engine Init failed with: %+v", index, err)
return
}
defer func() {
if err := ge.Shutdown(); err != nil {
// TODO: cause the final exit code to be non-zero
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: engine Shutdown failed with: %+v", index, err)
return
}
}()
if err := ge.Load(ograph); err != nil { // copy in new graph
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: error copying in new graph: %+v", index, err)
return
}
if err := ge.Validate(); err != nil { // validate the new graph
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: error validating the new graph: %+v", index, err)
return
}
// TODO: apply the global metaparams to the graph
fastPause := false
ge.Pause(fastPause) // sync
if err := ge.Commit(); err != nil {
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: error running commit: %+v", index, err)
return
}
if err := ge.Resume(); err != nil { // sync
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: error resuming graph: %+v", index, err)
return
}
// wait for converger instead...
select {
case <-converged:
case <-time.After(5 * time.Second): // temporary
// XXX: add this when we debug converger
//case <-time.After(60 * time.Second): // blocked or non-converged engine?
// t.Errorf("test #%d: FAIL", index)
// t.Errorf("test #%d: stream timeout", index)
// return
}
ngraph := ge.Graph()
t.Logf("test #%d: graph: %+v", index, ngraph)
str := strings.Trim(ngraph.Sprint(), "\n") // text format of output graph
for i, v := range ngraph.Vertices() {
res, ok := v.(engine.Res) res, ok := v.(engine.Res)
if !ok { if !ok {
t.Errorf("test #%d: FAIL\n\n", index) t.Errorf("test #%d: FAIL\n\n", index)
@@ -1967,7 +2065,7 @@ func TestAstFunc3(t *testing.T) {
v := m[field] v := m[field]
str += fmt.Sprintf("Field: %s[%s].%s = %s\n", res.Kind(), res.Name(), field, v) str += fmt.Sprintf("Field: %s[%s].%s = %s\n", res.Kind(), res.Name(), field, v)
} }
if i < len(ograph.Vertices()) { if i < len(ngraph.Vertices()) {
str += "\n" str += "\n"
} }
} }
@@ -1995,11 +2093,11 @@ func TestAstFunc3(t *testing.T) {
return return
} }
for i, v := range ograph.Vertices() { for i, v := range ngraph.Vertices() {
t.Logf("test #%d: vertex(%d): %+v", index, i, v) t.Logf("test #%d: vertex(%d): %+v", index, i, v)
} }
for v1 := range ograph.Adjacency() { for v1 := range ngraph.Adjacency() {
for v2, e := range ograph.Adjacency()[v1] { for v2, e := range ngraph.Adjacency()[v1] {
t.Logf("test #%d: edge(%+v): %+v -> %+v", index, e, v1, v2) t.Logf("test #%d: edge(%+v): %+v -> %+v", index, e, v1, v2)
} }
} }