integration: Port tests to use standalone etcd
This ports the integration tests to use a standalone etcd server instead of depending on the flaky elastic etcd clustering. Hopefully we will polish and/or reimplement that at some point in the future, but at least for now let's make things reliable.
This commit is contained in:
@@ -158,6 +158,7 @@ func TestCluster1(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
code string // mcl code
|
code string // mcl code
|
||||||
fail bool
|
fail bool
|
||||||
|
etcd bool // standalone etcd?
|
||||||
hosts []string
|
hosts []string
|
||||||
expect map[string]map[string]string // hostname, file, contents
|
expect map[string]map[string]string // hostname, file, contents
|
||||||
}
|
}
|
||||||
@@ -178,6 +179,7 @@ func TestCluster1(t *testing.T) {
|
|||||||
name: "simple pair",
|
name: "simple pair",
|
||||||
code: code,
|
code: code,
|
||||||
fail: false,
|
fail: false,
|
||||||
|
etcd: true,
|
||||||
hosts: []string{"h1", "h2"},
|
hosts: []string{"h1", "h2"},
|
||||||
expect: map[string]map[string]string{
|
expect: map[string]map[string]string{
|
||||||
"h1": {
|
"h1": {
|
||||||
@@ -204,6 +206,7 @@ func TestCluster1(t *testing.T) {
|
|||||||
name: "hello world",
|
name: "hello world",
|
||||||
code: code,
|
code: code,
|
||||||
fail: false,
|
fail: false,
|
||||||
|
etcd: true,
|
||||||
hosts: []string{"h1", "h2", "h3"},
|
hosts: []string{"h1", "h2", "h3"},
|
||||||
expect: map[string]map[string]string{
|
expect: map[string]map[string]string{
|
||||||
"h1": {
|
"h1": {
|
||||||
@@ -221,9 +224,10 @@ func TestCluster1(t *testing.T) {
|
|||||||
|
|
||||||
for index, tc := range testCases { // run all the tests
|
for index, tc := range testCases { // run all the tests
|
||||||
t.Run(fmt.Sprintf("test #%d (%s)", index, tc.name), func(t *testing.T) {
|
t.Run(fmt.Sprintf("test #%d (%s)", index, tc.name), func(t *testing.T) {
|
||||||
code, fail, hosts, expect := tc.code, tc.fail, tc.hosts, tc.expect
|
code, fail, etcd, hosts, expect := tc.code, tc.fail, tc.etcd, tc.hosts, tc.expect
|
||||||
|
|
||||||
c := Cluster{
|
c := Cluster{
|
||||||
|
Etcd: etcd,
|
||||||
Hostnames: hosts,
|
Hostnames: hosts,
|
||||||
Preserve: true,
|
Preserve: true,
|
||||||
Debug: false, // TODO: set to true if not too wordy
|
Debug: false, // TODO: set to true if not too wordy
|
||||||
|
|||||||
@@ -22,15 +22,24 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"os/exec"
|
||||||
"path"
|
"path"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/purpleidea/mgmt/util"
|
||||||
"github.com/purpleidea/mgmt/util/errwrap"
|
"github.com/purpleidea/mgmt/util/errwrap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const etcdHostname = "etcd" // a unique hostname to use for a single etcd server
|
||||||
|
|
||||||
// Cluster represents an mgmt cluster. It uses the instance building blocks to
|
// Cluster represents an mgmt cluster. It uses the instance building blocks to
|
||||||
// run clustered tests.
|
// run clustered tests.
|
||||||
type Cluster struct {
|
type Cluster struct {
|
||||||
|
// Etcd specifies if we should run a standalone etcd instance instead of
|
||||||
|
// using the automatic, built-in etcd clustering.
|
||||||
|
Etcd bool
|
||||||
|
|
||||||
// Hostnames is the list of unique identifiers for this cluster.
|
// Hostnames is the list of unique identifiers for this cluster.
|
||||||
Hostnames []string
|
Hostnames []string
|
||||||
|
|
||||||
@@ -47,16 +56,17 @@ type Cluster struct {
|
|||||||
// dir is the directory where all files will be written under.
|
// dir is the directory where all files will be written under.
|
||||||
dir string
|
dir string
|
||||||
|
|
||||||
|
etcdInstance *Instance
|
||||||
|
|
||||||
instances map[string]*Instance
|
instances map[string]*Instance
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init runs some initialization for this Cluster. It errors if the struct was
|
// Init runs some initialization for this Cluster. It errors if the struct was
|
||||||
// populated in an invalid way, or if it can't initialize correctly.
|
// populated in an invalid way, or if it can't initialize correctly.
|
||||||
func (obj *Cluster) Init() error {
|
func (obj *Cluster) Init() error {
|
||||||
obj.instances = make(map[string]*Instance)
|
var err error
|
||||||
|
|
||||||
// create temporary directory to use during testing
|
// create temporary directory to use during testing
|
||||||
var err error
|
|
||||||
if obj.dir == "" {
|
if obj.dir == "" {
|
||||||
obj.dir, err = ioutil.TempDir("", "mgmt-integration-cluster-")
|
obj.dir, err = ioutil.TempDir("", "mgmt-integration-cluster-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -64,6 +74,35 @@ func (obj *Cluster) Init() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if obj.Etcd {
|
||||||
|
if util.StrInList(etcdHostname, obj.Hostnames) {
|
||||||
|
return fmt.Errorf("can't use special `%s` hostname for regular hosts", etcdHostname)
|
||||||
|
}
|
||||||
|
|
||||||
|
h := etcdHostname
|
||||||
|
instancePrefix := path.Join(obj.dir, h)
|
||||||
|
if err := os.MkdirAll(instancePrefix, dirMode); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "can't create instance directory")
|
||||||
|
}
|
||||||
|
|
||||||
|
obj.etcdInstance = &Instance{
|
||||||
|
Etcd: true,
|
||||||
|
Hostname: h,
|
||||||
|
Preserve: obj.Preserve,
|
||||||
|
Logf: func(format string, v ...interface{}) {
|
||||||
|
obj.Logf(fmt.Sprintf("instance <%s>: ", h)+format, v...)
|
||||||
|
},
|
||||||
|
Debug: obj.Debug,
|
||||||
|
|
||||||
|
dir: instancePrefix,
|
||||||
|
}
|
||||||
|
if err := obj.etcdInstance.Init(); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "can't create etcd instance")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
obj.instances = make(map[string]*Instance)
|
||||||
|
|
||||||
for _, hostname := range obj.Hostnames {
|
for _, hostname := range obj.Hostnames {
|
||||||
h := hostname
|
h := hostname
|
||||||
instancePrefix := path.Join(obj.dir, h)
|
instancePrefix := path.Join(obj.dir, h)
|
||||||
@@ -72,6 +111,7 @@ func (obj *Cluster) Init() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
obj.instances[h] = &Instance{
|
obj.instances[h] = &Instance{
|
||||||
|
EtcdServer: obj.Etcd, // is the 0th instance an etcd?
|
||||||
Hostname: h,
|
Hostname: h,
|
||||||
Preserve: obj.Preserve,
|
Preserve: obj.Preserve,
|
||||||
Logf: func(format string, v ...interface{}) {
|
Logf: func(format string, v ...interface{}) {
|
||||||
@@ -99,6 +139,12 @@ func (obj *Cluster) Close() error {
|
|||||||
}
|
}
|
||||||
err = errwrap.Append(err, instance.Close())
|
err = errwrap.Append(err, instance.Close())
|
||||||
}
|
}
|
||||||
|
if obj.Etcd {
|
||||||
|
if err := obj.etcdInstance.Close(); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "can't close etcd instance")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !obj.Preserve {
|
if !obj.Preserve {
|
||||||
if obj.dir == "" || obj.dir == "/" {
|
if obj.dir == "" || obj.dir == "/" {
|
||||||
panic("obj.dir is set to a dangerous path")
|
panic("obj.dir is set to a dangerous path")
|
||||||
@@ -112,6 +158,21 @@ func (obj *Cluster) Close() error {
|
|||||||
|
|
||||||
// RunLinear starts up each instance linearly, one at a time.
|
// RunLinear starts up each instance linearly, one at a time.
|
||||||
func (obj *Cluster) RunLinear() error {
|
func (obj *Cluster) RunLinear() error {
|
||||||
|
if obj.Etcd {
|
||||||
|
// Start etcd standalone via `mgmt etcd` built-in sub command.
|
||||||
|
if err := obj.etcdInstance.Run(nil); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "trouble running etcd")
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: Do we need to wait for etcd to startup?
|
||||||
|
// wait for startup before continuing with the next one
|
||||||
|
//ctx, cancel := context.WithTimeout(context.Background(), longTimeout*time.Second)
|
||||||
|
//defer cancel()
|
||||||
|
//if err := obj.etcdInstance.Wait(ctx); err != nil { // wait to get a converged signal
|
||||||
|
// return errwrap.Wrapf(err, "mgmt wait on etcd failed") // timeout expired
|
||||||
|
//}
|
||||||
|
}
|
||||||
|
|
||||||
for i, h := range obj.Hostnames {
|
for i, h := range obj.Hostnames {
|
||||||
// build a list of earlier instances that have already run
|
// build a list of earlier instances that have already run
|
||||||
seeds := []*Instance{}
|
seeds := []*Instance{}
|
||||||
@@ -125,6 +186,10 @@ func (obj *Cluster) RunLinear() error {
|
|||||||
return fmt.Errorf("instance `%s` not found", h)
|
return fmt.Errorf("instance `%s` not found", h)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if obj.Etcd {
|
||||||
|
seeds = []*Instance{obj.etcdInstance} // use main etcd
|
||||||
|
}
|
||||||
|
|
||||||
if err := instance.Run(seeds); err != nil {
|
if err := instance.Run(seeds); err != nil {
|
||||||
return errwrap.Wrapf(err, "trouble running instance `%s`", h)
|
return errwrap.Wrapf(err, "trouble running instance `%s`", h)
|
||||||
}
|
}
|
||||||
@@ -153,6 +218,9 @@ func (obj *Cluster) Kill() error {
|
|||||||
}
|
}
|
||||||
err = errwrap.Append(err, instance.Kill())
|
err = errwrap.Append(err, instance.Kill())
|
||||||
}
|
}
|
||||||
|
if obj.Etcd {
|
||||||
|
err = errwrap.Append(err, obj.etcdInstance.Kill())
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -172,6 +240,40 @@ func (obj *Cluster) Quit(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
err = errwrap.Append(err, instance.Quit(ctx))
|
err = errwrap.Append(err, instance.Quit(ctx))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
isInterruptSignal := func(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false // not an error ;)
|
||||||
|
}
|
||||||
|
exitErr, ok := err.(*exec.ExitError) // embeds an os.ProcessState
|
||||||
|
if !ok {
|
||||||
|
return false // not an ExitError
|
||||||
|
}
|
||||||
|
|
||||||
|
pStateSys := exitErr.Sys() // (*os.ProcessState) Sys
|
||||||
|
wStatus, ok := pStateSys.(syscall.WaitStatus)
|
||||||
|
if !ok {
|
||||||
|
return false // not what we're looking for
|
||||||
|
}
|
||||||
|
if !wStatus.Signaled() {
|
||||||
|
return false // not a timeout or cancel (no signal)
|
||||||
|
}
|
||||||
|
sig := wStatus.Signal()
|
||||||
|
//exitStatus := wStatus.ExitStatus() // exitStatus == -1
|
||||||
|
|
||||||
|
if sig != os.Interrupt {
|
||||||
|
return false // wrong signal
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj.Etcd {
|
||||||
|
// etcd exits non-zero if you ^C it, so ignore it if it's that!
|
||||||
|
if err := obj.etcdInstance.Quit(ctx); err != nil && !isInterruptSignal(err) {
|
||||||
|
err = errwrap.Append(err, obj.etcdInstance.Quit(ctx))
|
||||||
|
}
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,6 +282,10 @@ func (obj *Cluster) Quit(ctx context.Context) error {
|
|||||||
// to call wait on each member individually.
|
// to call wait on each member individually.
|
||||||
func (obj *Cluster) Wait(ctx context.Context) error {
|
func (obj *Cluster) Wait(ctx context.Context) error {
|
||||||
var err error
|
var err error
|
||||||
|
// TODO: not implemented
|
||||||
|
//if obj.Etcd {
|
||||||
|
// err = errwrap.Append(err, obj.etcdInstance.Wait(ctx))
|
||||||
|
//}
|
||||||
for _, h := range obj.Hostnames {
|
for _, h := range obj.Hostnames {
|
||||||
instance, exists := obj.instances[h]
|
instance, exists := obj.instances[h]
|
||||||
if !exists {
|
if !exists {
|
||||||
@@ -194,8 +300,12 @@ func (obj *Cluster) Wait(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeployLang deploys some code to the cluster. It arbitrarily picks the first
|
// DeployLang deploys some code to the cluster. It arbitrarily picks the first
|
||||||
// host to run the deploy on.
|
// host to run the deploy on unless there is an etcd server running.
|
||||||
func (obj *Cluster) DeployLang(code string) error {
|
func (obj *Cluster) DeployLang(code string) error {
|
||||||
|
if obj.Etcd {
|
||||||
|
return obj.etcdInstance.DeployLang(code)
|
||||||
|
}
|
||||||
|
|
||||||
if len(obj.Hostnames) == 0 {
|
if len(obj.Hostnames) == 0 {
|
||||||
return fmt.Errorf("must have at least one host to deploy")
|
return fmt.Errorf("must have at least one host to deploy")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -66,7 +66,16 @@ const (
|
|||||||
|
|
||||||
// Instance represents a single running mgmt instance. It is a building block
|
// Instance represents a single running mgmt instance. It is a building block
|
||||||
// that can be used to run standalone tests, or combined to run clustered tests.
|
// that can be used to run standalone tests, or combined to run clustered tests.
|
||||||
|
// It can also be used to run a standalone etcd server instance.
|
||||||
type Instance struct {
|
type Instance struct {
|
||||||
|
// Etcd specifies that this is a pure etcd instance instead of an mgmt
|
||||||
|
// one.
|
||||||
|
Etcd bool
|
||||||
|
|
||||||
|
// EtcdServer specifies we're connecting to an etcd instance instead of
|
||||||
|
// a normal mgmt peer.
|
||||||
|
EtcdServer bool
|
||||||
|
|
||||||
// Hostname is a unique identifier for this instance.
|
// Hostname is a unique identifier for this instance.
|
||||||
Hostname string
|
Hostname string
|
||||||
|
|
||||||
@@ -97,6 +106,10 @@ type Instance struct {
|
|||||||
// Init runs some initialization for this instance. It errors if the struct was
|
// Init runs some initialization for this instance. It errors if the struct was
|
||||||
// populated in an invalid way, or if it can't initialize correctly.
|
// populated in an invalid way, or if it can't initialize correctly.
|
||||||
func (obj *Instance) Init() error {
|
func (obj *Instance) Init() error {
|
||||||
|
if obj.Etcd && obj.EtcdServer {
|
||||||
|
return fmt.Errorf("if we're etcd, we're not connecting to one")
|
||||||
|
}
|
||||||
|
|
||||||
if obj.Hostname == "" {
|
if obj.Hostname == "" {
|
||||||
return fmt.Errorf("must specify a hostname")
|
return fmt.Errorf("must specify a hostname")
|
||||||
}
|
}
|
||||||
@@ -184,6 +197,42 @@ func (obj *Instance) Run(seeds []*Instance) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// run `mgmt etcd`
|
||||||
|
if obj.Etcd {
|
||||||
|
cmdArgs := []string{
|
||||||
|
"etcd", // mode
|
||||||
|
//fmt.Sprintf("--name=%s", obj.Hostname),
|
||||||
|
fmt.Sprintf("--listen-client-urls=%s", obj.clientURL),
|
||||||
|
fmt.Sprintf("--listen-peer-urls=%s", obj.serverURL),
|
||||||
|
fmt.Sprintf("--advertise-client-urls=%s", obj.clientURL),
|
||||||
|
//fmt.Sprintf("--advertise-peer-urls=%s", obj.serverURL),
|
||||||
|
fmt.Sprintf("--data-dir=%s", obj.tmpPrefixDirectory),
|
||||||
|
}
|
||||||
|
|
||||||
|
obj.Logf("run: %s %s", cmdName, strings.Join(cmdArgs, " "))
|
||||||
|
obj.cmd = exec.Command(cmdName, cmdArgs...)
|
||||||
|
//obj.cmd.Env = []string{
|
||||||
|
// fmt.Sprintf("MGMT_TEST_ROOT=%s", obj.testRootDirectory),
|
||||||
|
//}
|
||||||
|
obj.cmd.Dir = obj.tmpPrefixDirectory // run program in pwd if ""
|
||||||
|
|
||||||
|
// output file for storing logs
|
||||||
|
file, err := os.Create(path.Join(obj.dir, StdoutStderrFile))
|
||||||
|
if err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error creating log file")
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
obj.cmd.Stdout = file
|
||||||
|
obj.cmd.Stderr = file
|
||||||
|
|
||||||
|
if err := obj.cmd.Start(); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "error starting etcd")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
cmdArgs := []string{
|
cmdArgs := []string{
|
||||||
"run", // mode
|
"run", // mode
|
||||||
fmt.Sprintf("--hostname=%s", obj.Hostname),
|
fmt.Sprintf("--hostname=%s", obj.Hostname),
|
||||||
@@ -207,6 +256,10 @@ func (obj *Instance) Run(seeds []*Instance) error {
|
|||||||
//s := fmt.Sprintf("--seeds=%s", strings.Join(urls, ","))
|
//s := fmt.Sprintf("--seeds=%s", strings.Join(urls, ","))
|
||||||
cmdArgs = append(cmdArgs, s)
|
cmdArgs = append(cmdArgs, s)
|
||||||
}
|
}
|
||||||
|
if obj.EtcdServer {
|
||||||
|
cmdArgs = append(cmdArgs, "--no-server")
|
||||||
|
cmdArgs = append(cmdArgs, "--ideal-cluster-size=1")
|
||||||
|
}
|
||||||
gapi := "empty" // empty GAPI (for now)
|
gapi := "empty" // empty GAPI (for now)
|
||||||
cmdArgs = append(cmdArgs, gapi)
|
cmdArgs = append(cmdArgs, gapi)
|
||||||
obj.Logf("run: %s %s", cmdName, strings.Join(cmdArgs, " "))
|
obj.Logf("run: %s %s", cmdName, strings.Join(cmdArgs, " "))
|
||||||
@@ -291,6 +344,10 @@ func (obj *Instance) Wait(ctx context.Context) error {
|
|||||||
// return fmt.Errorf("no process is running")
|
// return fmt.Errorf("no process is running")
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
if obj.Etcd {
|
||||||
|
return fmt.Errorf("etcd Wait not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
recurse := false
|
recurse := false
|
||||||
recWatcher, err := recwatch.NewRecWatcher(obj.convergedStatusFile, recurse)
|
recWatcher, err := recwatch.NewRecWatcher(obj.convergedStatusFile, recurse)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user