From b60d222c816d292e916054ca69136e11566ced07 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Wed, 30 Aug 2023 02:32:17 -0400 Subject: [PATCH] integration: Port tests to use standalone etcd This ports the integration tests to use a standalone etcd server instead of depending on the flaky elastic etcd clustering. Hopefully we will polish and/or reimplement that at some point in the future, but at least for now let's make things reliable. --- integration/basic_test.go | 6 +- integration/cluster.go | 120 ++++++++++++++++++++++++++++++++++++-- integration/instance.go | 57 ++++++++++++++++++ 3 files changed, 177 insertions(+), 6 deletions(-) diff --git a/integration/basic_test.go b/integration/basic_test.go index 4ac6c3a9..7e25da43 100644 --- a/integration/basic_test.go +++ b/integration/basic_test.go @@ -158,6 +158,7 @@ func TestCluster1(t *testing.T) { name string code string // mcl code fail bool + etcd bool // standalone etcd? hosts []string expect map[string]map[string]string // hostname, file, contents } @@ -178,6 +179,7 @@ func TestCluster1(t *testing.T) { name: "simple pair", code: code, fail: false, + etcd: true, hosts: []string{"h1", "h2"}, expect: map[string]map[string]string{ "h1": { @@ -204,6 +206,7 @@ func TestCluster1(t *testing.T) { name: "hello world", code: code, fail: false, + etcd: true, hosts: []string{"h1", "h2", "h3"}, expect: map[string]map[string]string{ "h1": { @@ -221,9 +224,10 @@ func TestCluster1(t *testing.T) { for index, tc := range testCases { // run all the tests t.Run(fmt.Sprintf("test #%d (%s)", index, tc.name), func(t *testing.T) { - code, fail, hosts, expect := tc.code, tc.fail, tc.hosts, tc.expect + code, fail, etcd, hosts, expect := tc.code, tc.fail, tc.etcd, tc.hosts, tc.expect c := Cluster{ + Etcd: etcd, Hostnames: hosts, Preserve: true, Debug: false, // TODO: set to true if not too wordy diff --git a/integration/cluster.go b/integration/cluster.go index e6dc5bf1..e76f69e9 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -22,15 +22,24 @@ import ( "fmt" "io/ioutil" "os" + "os/exec" "path" + "syscall" "time" + "github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util/errwrap" ) +const etcdHostname = "etcd" // a unique hostname to use for a single etcd server + // Cluster represents an mgmt cluster. It uses the instance building blocks to // run clustered tests. type Cluster struct { + // Etcd specifies if we should run a standalone etcd instance instead of + // using the automatic, built-in etcd clustering. + Etcd bool + // Hostnames is the list of unique identifiers for this cluster. Hostnames []string @@ -47,16 +56,17 @@ type Cluster struct { // dir is the directory where all files will be written under. dir string + etcdInstance *Instance + instances map[string]*Instance } // Init runs some initialization for this Cluster. It errors if the struct was // populated in an invalid way, or if it can't initialize correctly. func (obj *Cluster) Init() error { - obj.instances = make(map[string]*Instance) + var err error // create temporary directory to use during testing - var err error if obj.dir == "" { obj.dir, err = ioutil.TempDir("", "mgmt-integration-cluster-") if err != nil { @@ -64,6 +74,35 @@ func (obj *Cluster) Init() error { } } + if obj.Etcd { + if util.StrInList(etcdHostname, obj.Hostnames) { + return fmt.Errorf("can't use special `%s` hostname for regular hosts", etcdHostname) + } + + h := etcdHostname + instancePrefix := path.Join(obj.dir, h) + if err := os.MkdirAll(instancePrefix, dirMode); err != nil { + return errwrap.Wrapf(err, "can't create instance directory") + } + + obj.etcdInstance = &Instance{ + Etcd: true, + Hostname: h, + Preserve: obj.Preserve, + Logf: func(format string, v ...interface{}) { + obj.Logf(fmt.Sprintf("instance <%s>: ", h)+format, v...) + }, + Debug: obj.Debug, + + dir: instancePrefix, + } + if err := obj.etcdInstance.Init(); err != nil { + return errwrap.Wrapf(err, "can't create etcd instance") + } + } + + obj.instances = make(map[string]*Instance) + for _, hostname := range obj.Hostnames { h := hostname instancePrefix := path.Join(obj.dir, h) @@ -72,8 +111,9 @@ func (obj *Cluster) Init() error { } obj.instances[h] = &Instance{ - Hostname: h, - Preserve: obj.Preserve, + EtcdServer: obj.Etcd, // is the 0th instance an etcd? + Hostname: h, + Preserve: obj.Preserve, Logf: func(format string, v ...interface{}) { obj.Logf(fmt.Sprintf("instance <%s>: ", h)+format, v...) }, @@ -99,6 +139,12 @@ func (obj *Cluster) Close() error { } err = errwrap.Append(err, instance.Close()) } + if obj.Etcd { + if err := obj.etcdInstance.Close(); err != nil { + return errwrap.Wrapf(err, "can't close etcd instance") + } + } + if !obj.Preserve { if obj.dir == "" || obj.dir == "/" { panic("obj.dir is set to a dangerous path") @@ -112,6 +158,21 @@ func (obj *Cluster) Close() error { // RunLinear starts up each instance linearly, one at a time. func (obj *Cluster) RunLinear() error { + if obj.Etcd { + // Start etcd standalone via `mgmt etcd` built-in sub command. + if err := obj.etcdInstance.Run(nil); err != nil { + return errwrap.Wrapf(err, "trouble running etcd") + } + + // FIXME: Do we need to wait for etcd to startup? + // wait for startup before continuing with the next one + //ctx, cancel := context.WithTimeout(context.Background(), longTimeout*time.Second) + //defer cancel() + //if err := obj.etcdInstance.Wait(ctx); err != nil { // wait to get a converged signal + // return errwrap.Wrapf(err, "mgmt wait on etcd failed") // timeout expired + //} + } + for i, h := range obj.Hostnames { // build a list of earlier instances that have already run seeds := []*Instance{} @@ -125,6 +186,10 @@ func (obj *Cluster) RunLinear() error { return fmt.Errorf("instance `%s` not found", h) } + if obj.Etcd { + seeds = []*Instance{obj.etcdInstance} // use main etcd + } + if err := instance.Run(seeds); err != nil { return errwrap.Wrapf(err, "trouble running instance `%s`", h) } @@ -153,6 +218,9 @@ func (obj *Cluster) Kill() error { } err = errwrap.Append(err, instance.Kill()) } + if obj.Etcd { + err = errwrap.Append(err, obj.etcdInstance.Kill()) + } return err } @@ -172,6 +240,40 @@ func (obj *Cluster) Quit(ctx context.Context) error { } err = errwrap.Append(err, instance.Quit(ctx)) } + + isInterruptSignal := func(err error) bool { + if err == nil { + return false // not an error ;) + } + exitErr, ok := err.(*exec.ExitError) // embeds an os.ProcessState + if !ok { + return false // not an ExitError + } + + pStateSys := exitErr.Sys() // (*os.ProcessState) Sys + wStatus, ok := pStateSys.(syscall.WaitStatus) + if !ok { + return false // not what we're looking for + } + if !wStatus.Signaled() { + return false // not a timeout or cancel (no signal) + } + sig := wStatus.Signal() + //exitStatus := wStatus.ExitStatus() // exitStatus == -1 + + if sig != os.Interrupt { + return false // wrong signal + } + + return true + } + + if obj.Etcd { + // etcd exits non-zero if you ^C it, so ignore it if it's that! + if err := obj.etcdInstance.Quit(ctx); err != nil && !isInterruptSignal(err) { + err = errwrap.Append(err, obj.etcdInstance.Quit(ctx)) + } + } return err } @@ -180,6 +282,10 @@ func (obj *Cluster) Quit(ctx context.Context) error { // to call wait on each member individually. func (obj *Cluster) Wait(ctx context.Context) error { var err error + // TODO: not implemented + //if obj.Etcd { + // err = errwrap.Append(err, obj.etcdInstance.Wait(ctx)) + //} for _, h := range obj.Hostnames { instance, exists := obj.instances[h] if !exists { @@ -194,8 +300,12 @@ func (obj *Cluster) Wait(ctx context.Context) error { } // DeployLang deploys some code to the cluster. It arbitrarily picks the first -// host to run the deploy on. +// host to run the deploy on unless there is an etcd server running. func (obj *Cluster) DeployLang(code string) error { + if obj.Etcd { + return obj.etcdInstance.DeployLang(code) + } + if len(obj.Hostnames) == 0 { return fmt.Errorf("must have at least one host to deploy") } diff --git a/integration/instance.go b/integration/instance.go index 57c3188b..ef32ebe3 100644 --- a/integration/instance.go +++ b/integration/instance.go @@ -66,7 +66,16 @@ const ( // Instance represents a single running mgmt instance. It is a building block // that can be used to run standalone tests, or combined to run clustered tests. +// It can also be used to run a standalone etcd server instance. type Instance struct { + // Etcd specifies that this is a pure etcd instance instead of an mgmt + // one. + Etcd bool + + // EtcdServer specifies we're connecting to an etcd instance instead of + // a normal mgmt peer. + EtcdServer bool + // Hostname is a unique identifier for this instance. Hostname string @@ -97,6 +106,10 @@ type Instance struct { // Init runs some initialization for this instance. It errors if the struct was // populated in an invalid way, or if it can't initialize correctly. func (obj *Instance) Init() error { + if obj.Etcd && obj.EtcdServer { + return fmt.Errorf("if we're etcd, we're not connecting to one") + } + if obj.Hostname == "" { return fmt.Errorf("must specify a hostname") } @@ -184,6 +197,42 @@ func (obj *Instance) Run(seeds []*Instance) error { if err != nil { return err } + + // run `mgmt etcd` + if obj.Etcd { + cmdArgs := []string{ + "etcd", // mode + //fmt.Sprintf("--name=%s", obj.Hostname), + fmt.Sprintf("--listen-client-urls=%s", obj.clientURL), + fmt.Sprintf("--listen-peer-urls=%s", obj.serverURL), + fmt.Sprintf("--advertise-client-urls=%s", obj.clientURL), + //fmt.Sprintf("--advertise-peer-urls=%s", obj.serverURL), + fmt.Sprintf("--data-dir=%s", obj.tmpPrefixDirectory), + } + + obj.Logf("run: %s %s", cmdName, strings.Join(cmdArgs, " ")) + obj.cmd = exec.Command(cmdName, cmdArgs...) + //obj.cmd.Env = []string{ + // fmt.Sprintf("MGMT_TEST_ROOT=%s", obj.testRootDirectory), + //} + obj.cmd.Dir = obj.tmpPrefixDirectory // run program in pwd if "" + + // output file for storing logs + file, err := os.Create(path.Join(obj.dir, StdoutStderrFile)) + if err != nil { + return errwrap.Wrapf(err, "error creating log file") + } + defer file.Close() + obj.cmd.Stdout = file + obj.cmd.Stderr = file + + if err := obj.cmd.Start(); err != nil { + return errwrap.Wrapf(err, "error starting etcd") + } + + return nil + } + cmdArgs := []string{ "run", // mode fmt.Sprintf("--hostname=%s", obj.Hostname), @@ -207,6 +256,10 @@ func (obj *Instance) Run(seeds []*Instance) error { //s := fmt.Sprintf("--seeds=%s", strings.Join(urls, ",")) cmdArgs = append(cmdArgs, s) } + if obj.EtcdServer { + cmdArgs = append(cmdArgs, "--no-server") + cmdArgs = append(cmdArgs, "--ideal-cluster-size=1") + } gapi := "empty" // empty GAPI (for now) cmdArgs = append(cmdArgs, gapi) obj.Logf("run: %s %s", cmdName, strings.Join(cmdArgs, " ")) @@ -291,6 +344,10 @@ func (obj *Instance) Wait(ctx context.Context) error { // return fmt.Errorf("no process is running") //} + if obj.Etcd { + return fmt.Errorf("etcd Wait not implemented") + } + recurse := false recWatcher, err := recwatch.NewRecWatcher(obj.convergedStatusFile, recurse) if err != nil {