diff --git a/integration/basic_test.go b/integration/basic_test.go index c284395a..1eff1d52 100644 --- a/integration/basic_test.go +++ b/integration/basic_test.go @@ -128,3 +128,125 @@ func TestInstance1(t *testing.T) { }) } } + +func TestCluster0(t *testing.T) { + // TODO: implement a simple test for documentation purposes +} + +func TestCluster1(t *testing.T) { + type test struct { // an individual test + name string + code string // mcl code + fail bool + hosts []string + expect map[string]map[string]string // hostname, file, contents + } + values := []test{} + + { + code := Code(` + $root = getenv("MGMT_TEST_ROOT") + + file "${root}/mgmt-hostname" { + content => "i am ${hostname()}\n", + state => "exists", + } + `) + values = append(values, test{ + name: "simple pair", + code: code, + fail: false, + hosts: []string{"h1", "h2"}, + expect: map[string]map[string]string{ + "h1": { + "mgmt-hostname": "i am h1\n", + }, + "h2": { + "mgmt-hostname": "i am h2\n", + }, + }, + }) + } + { + code := Code(` + $root = getenv("MGMT_TEST_ROOT") + + file "${root}/mgmt-hostname" { + content => "i am ${hostname()}\n", + state => "exists", + } + `) + values = append(values, test{ + name: "hello world", + code: code, + fail: false, + hosts: []string{"h1", "h2", "h3"}, + expect: map[string]map[string]string{ + "h1": { + "mgmt-hostname": "i am h1\n", + }, + "h2": { + "mgmt-hostname": "i am h2\n", + }, + "h3": { + "mgmt-hostname": "i am h3\n", + }, + }, + }) + } + + for index, test := range values { // run all the tests + t.Run(fmt.Sprintf("test #%d (%s)", index, test.name), func(t *testing.T) { + code, fail, hosts, expect := test.code, test.fail, test.hosts, test.expect + + c := Cluster{ + Hostnames: hosts, + Preserve: true, + } + err := c.SimpleDeployLang(code) + if d := c.Dir(); d != "" { + t.Logf("test ran in:\n%s", d) + } + + if !fail && err != nil { + t.Errorf("failed with: %+v", err) + return + } + if fail && err == nil { + t.Errorf("passed, expected fail") + return + } + + if expect == nil { // test done early + return + } + + instances := c.Instances() + for _, h := range hosts { + instance := instances[h] + d := instance.Dir() + hexpect, exists := expect[h] + if !exists { + continue + } + + files := []string{} + for x := range hexpect { + files = append(files, x) + } + sort.Strings(files) // loop in a deterministic order + for _, f := range files { + filename := path.Join(d, RootDirectory, f) + b, err := ioutil.ReadFile(filename) + if err != nil { + t.Errorf("could not read file: `%s`", filename) + continue + } + if hexpect[f] != string(b) { + t.Errorf("file: `%s` did not match expected", f) + } + } + } + }) + } +} diff --git a/integration/cluster.go b/integration/cluster.go new file mode 100644 index 00000000..e34de44b --- /dev/null +++ b/integration/cluster.go @@ -0,0 +1,224 @@ +// Mgmt +// Copyright (C) 2013-2018+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package integration + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path" + "time" + + multierr "github.com/hashicorp/go-multierror" + errwrap "github.com/pkg/errors" +) + +// Cluster represents an mgmt cluster. It uses the instance building blocks to +// run clustered tests. +type Cluster struct { + // Hostnames is the list of unique identifiers for this cluster. + Hostnames []string + + // Preserve prevents the runtime output from being explicitly deleted. + // This is helpful for running analysis or tests on the output. + Preserve bool + + // Debug enables more verbosity. + Debug bool + + // dir is the directory where all files will be written under. + dir string + + instances map[string]*Instance +} + +// Init runs some initialization for this Cluster. It errors if the struct was +// populated in an invalid way, or if it can't initialize correctly. +func (obj *Cluster) Init() error { + obj.instances = make(map[string]*Instance) + + // create temporary directory to use during testing + var err error + if obj.dir == "" { + obj.dir, err = ioutil.TempDir("", "mgmt-integration-cluster-") + if err != nil { + return errwrap.Wrapf(err, "can't create temporary directory") + } + } + + for _, h := range obj.Hostnames { + instancePrefix := path.Join(obj.dir, h) + if err := os.MkdirAll(instancePrefix, dirMode); err != nil { + return errwrap.Wrapf(err, "can't create instance directory") + } + + obj.instances[h] = &Instance{ + Hostname: h, + Preserve: obj.Preserve, + Debug: obj.Debug, + + dir: instancePrefix, + } + if e := obj.instances[h].Init(); e != nil { + err = multierr.Append(err, e) + } + } + + return err +} + +// Close cleans up after we're done with this Cluster. +func (obj *Cluster) Close() error { + var err error + // do this in reverse for fun + for i := len(obj.Hostnames) - 1; i >= 0; i-- { + h := obj.Hostnames[i] + instance, exists := obj.instances[h] + if !exists { + continue + } + if e := instance.Close(); e != nil { + err = multierr.Append(err, e) + } + } + if !obj.Preserve { + if obj.dir == "" || obj.dir == "/" { + panic("obj.dir is set to a dangerous path") + } + if err := os.RemoveAll(obj.dir); err != nil { // dangerous ;) + return errwrap.Wrapf(err, "can't remove instance dir") + } + } + return err +} + +// RunLinear starts up each instance linearly, one at a time. +func (obj *Cluster) RunLinear() error { + for i, h := range obj.Hostnames { + // build a list of earlier instances that have already run + seeds := []*Instance{} + for j := 0; j < i; j++ { + x := obj.instances[obj.Hostnames[j]] + seeds = append(seeds, x) + } + + instance, exists := obj.instances[h] + if !exists { + return fmt.Errorf("instance `%s` not found", h) + } + + if err := instance.Run(seeds); err != nil { + return errwrap.Wrapf(err, "trouble running instance `%s`", h) + } + + // FIXME: consider removing this wait entirely + // wait for startup before continuing with the next one + ctx, cancel := context.WithTimeout(context.Background(), longTimeout*time.Second) + defer cancel() + if err := instance.Wait(ctx); err != nil { // wait to get a converged signal + return errwrap.Wrapf(err, "mgmt wait on instance `%s` failed", h) // timeout expired + } + } + + return nil +} + +// Kill the cluster immediately. This is a `kill -9` for if things get stuck. +func (obj *Cluster) Kill() error { + var err error + // do this in reverse for fun + for i := len(obj.Hostnames) - 1; i >= 0; i-- { + h := obj.Hostnames[i] + instance, exists := obj.instances[h] + if !exists { + continue + } + if e := instance.Kill(); e != nil { + err = multierr.Append(err, e) + } + } + return err +} + +// Quit sends a friendly shutdown request to the cluster. You can specify a +// context if you'd like to exit earlier. If you trigger an early exit with the +// context, then this will end up running a `kill -9` so it can return. Remember +// to leave a longer timeout when using a context since this will have to call +// quit on each member individually. +func (obj *Cluster) Quit(ctx context.Context) error { + var err error + // do this in reverse for fun + for i := len(obj.Hostnames) - 1; i >= 0; i-- { + h := obj.Hostnames[i] + instance, exists := obj.instances[h] + if !exists { + continue + } + if e := instance.Quit(ctx); e != nil { + err = multierr.Append(err, e) + } + } + return err +} + +// Wait until the first converged state is hit for each member in the cluster. +// Remember to leave a longer timeout when using a context since this will have +// to call wait on each member individually. +func (obj *Cluster) Wait(ctx context.Context) error { + var err error + for _, h := range obj.Hostnames { + instance, exists := obj.instances[h] + if !exists { + continue + } + // TODO: do we want individual waits? + //ctx, cancel := context.WithTimeout(context.Background(), longTimeout*time.Second) + //defer cancel() + if e := instance.Wait(ctx); e != nil { + err = multierr.Append(err, e) + } + } + return err +} + +// DeployLang deploys some code to the cluster. It arbitrarily picks the first +// host to run the deploy on. +func (obj *Cluster) DeployLang(code string) error { + if len(obj.Hostnames) == 0 { + return fmt.Errorf("must have at least one host to deploy") + } + h := obj.Hostnames[0] + instance, exists := obj.instances[h] + if !exists { + return fmt.Errorf("instance `%s` not found", h) + } + return instance.DeployLang(code) +} + +// Instances returns the map of instances attached to this cluster. It is most +// useful after a cluster has started. Before Init, it won't have any entries. +func (obj *Cluster) Instances() map[string]*Instance { + return obj.instances +} + +// Dir returns the dir where the instance can write to. You should only use this +// after Init has been called, or it won't have been created and determined yet. +func (obj *Cluster) Dir() string { + return obj.dir +} diff --git a/integration/instance.go b/integration/instance.go index 32b24a8b..400e2aeb 100644 --- a/integration/instance.go +++ b/integration/instance.go @@ -74,7 +74,9 @@ type Instance struct { // Debug enables more verbosity. Debug bool - dir string + // dir is the directory where all files will be written under. + dir string + tmpPrefixDirectory string testRootDirectory string convergedStatusFile string @@ -83,6 +85,7 @@ type Instance struct { cmd *exec.Cmd clientURL string // set when launched with run + serverURL string } // Init runs some initialization for this instance. It errors if the struct was @@ -93,10 +96,12 @@ func (obj *Instance) Init() error { } // create temporary directory to use during testing - var err error - obj.dir, err = ioutil.TempDir("", fmt.Sprintf("mgmt-integration-%s-", obj.Hostname)) - if err != nil { - return errwrap.Wrapf(err, "can't create temporary directory") + if obj.dir == "" { + var err error + obj.dir, err = ioutil.TempDir("", fmt.Sprintf("mgmt-integration-%s-", obj.Hostname)) + if err != nil { + return errwrap.Wrapf(err, "can't create temporary directory") + } } tmpPrefix := path.Join(obj.dir, PrefixDirectory) @@ -137,7 +142,36 @@ func (obj *Instance) Run(seeds []*Instance) error { } if len(seeds) == 0 { + // set so that Deploy can know where to connect + // also set so that we can allow others to find us and connect obj.clientURL = "http://127.0.0.1:2379" + obj.serverURL = "http://127.0.0.1:2380" + } else { + // pick next available pair of ports + var maxClientPort, maxServerPort int + for _, instance := range seeds { + clientPort, err := ParsePort(instance.clientURL) + if err != nil { + return errwrap.Wrapf(err, "could not parse client URL from `%s`", instance.Hostname) + } + if clientPort > maxClientPort { + maxClientPort = clientPort + } + + serverPort, err := ParsePort(instance.serverURL) + if err != nil { + return errwrap.Wrapf(err, "could not parse server URL from `%s`", instance.Hostname) + } + if serverPort > maxServerPort { + maxServerPort = serverPort + } + } + if maxClientPort+2 == maxServerPort || maxClientPort == maxServerPort+2 { + return fmt.Errorf("port conflict found") + } + + obj.clientURL = fmt.Sprintf("http://127.0.0.1:%d", maxClientPort+2) // odd + obj.serverURL = fmt.Sprintf("http://127.0.0.1:%d", maxServerPort+2) // even } cmdName, err := BinaryPath() @@ -147,6 +181,8 @@ func (obj *Instance) Run(seeds []*Instance) error { cmdArgs := []string{ "run", // mode fmt.Sprintf("--hostname=%s", obj.Hostname), + fmt.Sprintf("--client-urls=%s", obj.clientURL), + fmt.Sprintf("--server-urls=%s", obj.serverURL), fmt.Sprintf("--prefix=%s", obj.tmpPrefixDirectory), fmt.Sprintf("--converged-timeout=%d", convergedTimeout), "--converged-timeout-no-exit", @@ -160,9 +196,9 @@ func (obj *Instance) Run(seeds []*Instance) error { } urls = append(urls, instance.clientURL) } - // TODO: we could just pick the first one instead... - //s := fmt.Sprintf("--seeds=%s", urls[0]) - s := fmt.Sprintf("--seeds=%s", strings.Join(urls, ",")) + s := fmt.Sprintf("--seeds=%s", urls[0]) + // TODO: we could just add all the seeds instead... + //s := fmt.Sprintf("--seeds=%s", strings.Join(urls, ",")) cmdArgs = append(cmdArgs, s) } obj.cmd = exec.Command(cmdName, cmdArgs...) @@ -182,6 +218,9 @@ func (obj *Instance) Kill() error { if obj.cmd == nil { return nil // already dead } + if obj.cmd.Process == nil { + return nil // nothing running + } // cause a stack dump first if we can _ = obj.cmd.Process.Signal(syscall.SIGQUIT) @@ -240,8 +279,16 @@ func (obj *Instance) Wait(ctx context.Context) error { return errwrap.Wrapf(err, "could not watch file") } defer recWatcher.Close() + startup := make(chan struct{}) + close(startup) for { select { + // FIXME: instead of sending one event here, the recwatch + // library should sent one initial event at startup... + case <-startup: + startup = nil + // send an initial event + case event, ok := <-recWatcher.Events(): if !ok { return fmt.Errorf("file watcher shut down") @@ -250,38 +297,40 @@ func (obj *Instance) Wait(ctx context.Context) error { return errwrap.Wrapf(err, "error event received") } - contents, err := ioutil.ReadFile(obj.convergedStatusFile) - if err != nil { - return errwrap.Wrapf(err, "error reading converged status file") - } - raw := strings.Split(string(contents), "\n") - lines := []string{} - for _, x := range raw { - if x == "" { // drop blank lines! - continue - } - lines = append(lines, x) - } - - if c := len(lines); c < obj.convergedStatusIndex { - return fmt.Errorf("file is missing lines or was truncated, got: %d", c) - } - - var converged bool - for i := obj.convergedStatusIndex; i < len(lines); i++ { - obj.convergedStatusIndex = i + 1 // new max - line := lines[i] - if line == "true" { // converged! - converged = true - } - } - if converged { - return nil - } + // send event... case <-ctx.Done(): return ctx.Err() } + + contents, err := ioutil.ReadFile(obj.convergedStatusFile) + if err != nil { + continue // file might not exist yet, wait for an event + } + raw := strings.Split(string(contents), "\n") + lines := []string{} + for _, x := range raw { + if x == "" { // drop blank lines! + continue + } + lines = append(lines, x) + } + + if c := len(lines); c < obj.convergedStatusIndex { + return fmt.Errorf("file is missing lines or was truncated, got: %d", c) + } + + var converged bool + for i := obj.convergedStatusIndex; i < len(lines); i++ { + obj.convergedStatusIndex = i + 1 // new max + line := lines[i] + if line == "true" { // converged! + converged = true + } + } + if converged { + return nil + } } } diff --git a/integration/patterns.go b/integration/patterns.go index 1aa82556..6da6b538 100644 --- a/integration/patterns.go +++ b/integration/patterns.go @@ -77,3 +77,59 @@ func (obj *Instance) SimpleDeployLang(code string) error { return nil } + +// SimpleDeployLang is a helper method that takes a struct representing a +// cluster and runs a sequence of methods on it. This particular helper starts +// up a series of instances linearly, deploys some code, and then shuts down. +// Both after initially starting up, after peering each instance, and after +// deploy, it waits for the instance to converge before running the next step. +func (obj *Cluster) SimpleDeployLang(code string) error { + if err := obj.Init(); err != nil { + return errwrap.Wrapf(err, "could not init instance") + } + defer obj.Close() // clean up working directories + + // start the cluster + if err := obj.RunLinear(); err != nil { + return errwrap.Wrapf(err, "mgmt could not start") + } + defer obj.Kill() // do a kill -9 + + // wait for an internal converge signal as a baseline + // FIXME: add this wait if we remove it from RunLinear + //{ + // ctx, cancel := context.WithTimeout(context.Background(), time.Duration(longTimeout*len(obj.Hostnames))*time.Second) + // defer cancel() + // if err := obj.Wait(ctx); err != nil { // wait to get a converged signal + // return errwrap.Wrapf(err, "mgmt initial wait failed") // timeout expired + // } + //} + + // push a deploy + if err := obj.DeployLang(code); err != nil { + return errwrap.Wrapf(err, "mgmt could not deploy") + } + + // wait for an internal converge signal + { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(longTimeout*len(obj.Hostnames))*time.Second) + defer cancel() + if err := obj.Wait(ctx); err != nil { // wait to get a converged signal + return errwrap.Wrapf(err, "mgmt post-deploy wait failed") // timeout expired + } + } + + // press ^C + { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(longTimeout*len(obj.Hostnames))*time.Second) + defer cancel() + if err := obj.Quit(ctx); err != nil { + if err == context.DeadlineExceeded { + return errwrap.Wrapf(err, "mgmt blocked on exit") + } + return errwrap.Wrapf(err, "mgmt exited with error") + } + } + + return nil +} diff --git a/integration/util.go b/integration/util.go index 998f67dc..98c153ee 100644 --- a/integration/util.go +++ b/integration/util.go @@ -19,10 +19,15 @@ package integration import ( "fmt" + "net" + "net/url" "path" "path/filepath" "runtime" + "strconv" "strings" + + errwrap "github.com/pkg/errors" ) const ( @@ -73,3 +78,20 @@ func Code(code string) string { return strings.Join(output, "\n") } + +// ParsePort parses a URL and returns the port that was found. +func ParsePort(input string) (int, error) { + u, err := url.Parse(input) + if err != nil { + return 0, errwrap.Wrapf(err, "could not parse URL") + } + _, sport, err := net.SplitHostPort(u.Host) + if err != nil { + return 0, errwrap.Wrapf(err, "could not get port") + } + port, err := strconv.Atoi(sport) + if err != nil { + return 0, errwrap.Wrapf(err, "could not parse port") + } + return port, nil +} diff --git a/integration/util_test.go b/integration/util_test.go index 0b13aa66..364b3178 100644 --- a/integration/util_test.go +++ b/integration/util_test.go @@ -63,3 +63,21 @@ file "${root}/mgmt-hello-world" { t.Errorf("code samples differ") } } + +func TestParsePort(t *testing.T) { + if port, err := ParsePort("http://127.0.0.1:2379"); err != nil { + t.Errorf("could not determine port: %+v", err) + } else if port != 2379 { + t.Errorf("unexpected port: %d", port) + } + + if port, err := ParsePort("http://127.0.0.1:2381"); err != nil { + t.Errorf("could not determine port: %+v", err) + } else if port != 2381 { + t.Errorf("unexpected port: %d", port) + } + + if port, err := ParsePort("http://127.0.0.1"); err == nil { + t.Errorf("expected error, got: %d", port) + } +}