test, integration: Add cluster primitives to integration framework

This further extends the integration framework to add some simple
primitives for building clusters. More complex primitives and patterns
can be added in the future, but this should serve the general cases.
This commit is contained in:
James Shubin
2018-03-09 19:28:55 -05:00
parent f3b99b3940
commit 62d1fc7ed3
6 changed files with 527 additions and 36 deletions

View File

@@ -74,7 +74,9 @@ type Instance struct {
// Debug enables more verbosity.
Debug bool
dir string
// dir is the directory where all files will be written under.
dir string
tmpPrefixDirectory string
testRootDirectory string
convergedStatusFile string
@@ -83,6 +85,7 @@ type Instance struct {
cmd *exec.Cmd
clientURL string // set when launched with run
serverURL string
}
// Init runs some initialization for this instance. It errors if the struct was
@@ -93,10 +96,12 @@ func (obj *Instance) Init() error {
}
// create temporary directory to use during testing
var err error
obj.dir, err = ioutil.TempDir("", fmt.Sprintf("mgmt-integration-%s-", obj.Hostname))
if err != nil {
return errwrap.Wrapf(err, "can't create temporary directory")
if obj.dir == "" {
var err error
obj.dir, err = ioutil.TempDir("", fmt.Sprintf("mgmt-integration-%s-", obj.Hostname))
if err != nil {
return errwrap.Wrapf(err, "can't create temporary directory")
}
}
tmpPrefix := path.Join(obj.dir, PrefixDirectory)
@@ -137,7 +142,36 @@ func (obj *Instance) Run(seeds []*Instance) error {
}
if len(seeds) == 0 {
// set so that Deploy can know where to connect
// also set so that we can allow others to find us and connect
obj.clientURL = "http://127.0.0.1:2379"
obj.serverURL = "http://127.0.0.1:2380"
} else {
// pick next available pair of ports
var maxClientPort, maxServerPort int
for _, instance := range seeds {
clientPort, err := ParsePort(instance.clientURL)
if err != nil {
return errwrap.Wrapf(err, "could not parse client URL from `%s`", instance.Hostname)
}
if clientPort > maxClientPort {
maxClientPort = clientPort
}
serverPort, err := ParsePort(instance.serverURL)
if err != nil {
return errwrap.Wrapf(err, "could not parse server URL from `%s`", instance.Hostname)
}
if serverPort > maxServerPort {
maxServerPort = serverPort
}
}
if maxClientPort+2 == maxServerPort || maxClientPort == maxServerPort+2 {
return fmt.Errorf("port conflict found")
}
obj.clientURL = fmt.Sprintf("http://127.0.0.1:%d", maxClientPort+2) // odd
obj.serverURL = fmt.Sprintf("http://127.0.0.1:%d", maxServerPort+2) // even
}
cmdName, err := BinaryPath()
@@ -147,6 +181,8 @@ func (obj *Instance) Run(seeds []*Instance) error {
cmdArgs := []string{
"run", // mode
fmt.Sprintf("--hostname=%s", obj.Hostname),
fmt.Sprintf("--client-urls=%s", obj.clientURL),
fmt.Sprintf("--server-urls=%s", obj.serverURL),
fmt.Sprintf("--prefix=%s", obj.tmpPrefixDirectory),
fmt.Sprintf("--converged-timeout=%d", convergedTimeout),
"--converged-timeout-no-exit",
@@ -160,9 +196,9 @@ func (obj *Instance) Run(seeds []*Instance) error {
}
urls = append(urls, instance.clientURL)
}
// TODO: we could just pick the first one instead...
//s := fmt.Sprintf("--seeds=%s", urls[0])
s := fmt.Sprintf("--seeds=%s", strings.Join(urls, ","))
s := fmt.Sprintf("--seeds=%s", urls[0])
// TODO: we could just add all the seeds instead...
//s := fmt.Sprintf("--seeds=%s", strings.Join(urls, ","))
cmdArgs = append(cmdArgs, s)
}
obj.cmd = exec.Command(cmdName, cmdArgs...)
@@ -182,6 +218,9 @@ func (obj *Instance) Kill() error {
if obj.cmd == nil {
return nil // already dead
}
if obj.cmd.Process == nil {
return nil // nothing running
}
// cause a stack dump first if we can
_ = obj.cmd.Process.Signal(syscall.SIGQUIT)
@@ -240,8 +279,16 @@ func (obj *Instance) Wait(ctx context.Context) error {
return errwrap.Wrapf(err, "could not watch file")
}
defer recWatcher.Close()
startup := make(chan struct{})
close(startup)
for {
select {
// FIXME: instead of sending one event here, the recwatch
// library should sent one initial event at startup...
case <-startup:
startup = nil
// send an initial event
case event, ok := <-recWatcher.Events():
if !ok {
return fmt.Errorf("file watcher shut down")
@@ -250,38 +297,40 @@ func (obj *Instance) Wait(ctx context.Context) error {
return errwrap.Wrapf(err, "error event received")
}
contents, err := ioutil.ReadFile(obj.convergedStatusFile)
if err != nil {
return errwrap.Wrapf(err, "error reading converged status file")
}
raw := strings.Split(string(contents), "\n")
lines := []string{}
for _, x := range raw {
if x == "" { // drop blank lines!
continue
}
lines = append(lines, x)
}
if c := len(lines); c < obj.convergedStatusIndex {
return fmt.Errorf("file is missing lines or was truncated, got: %d", c)
}
var converged bool
for i := obj.convergedStatusIndex; i < len(lines); i++ {
obj.convergedStatusIndex = i + 1 // new max
line := lines[i]
if line == "true" { // converged!
converged = true
}
}
if converged {
return nil
}
// send event...
case <-ctx.Done():
return ctx.Err()
}
contents, err := ioutil.ReadFile(obj.convergedStatusFile)
if err != nil {
continue // file might not exist yet, wait for an event
}
raw := strings.Split(string(contents), "\n")
lines := []string{}
for _, x := range raw {
if x == "" { // drop blank lines!
continue
}
lines = append(lines, x)
}
if c := len(lines); c < obj.convergedStatusIndex {
return fmt.Errorf("file is missing lines or was truncated, got: %d", c)
}
var converged bool
for i := obj.convergedStatusIndex; i < len(lines); i++ {
obj.convergedStatusIndex = i + 1 // new max
line := lines[i]
if line == "true" { // converged!
converged = true
}
}
if converged {
return nil
}
}
}