etcd: Rewrite embedded etcd implementation

This is a giant cleanup of the etcd code. The earlier version was
written when I was less experienced with golang.

This is still not perfect, and does contain some races, but at least
it's a decent base to start from. The automatic elastic clustering
should be considered an experimental feature. If you need a more
battle-tested cluster, then you should manage etcd manually and point
mgmt at your existing cluster.
This commit is contained in:
James Shubin
2018-05-05 17:35:08 -04:00
parent fb275d9537
commit a5842a41b2
56 changed files with 5459 additions and 2654 deletions

View File

@@ -18,6 +18,7 @@
package lib
import (
"context"
"fmt"
"io/ioutil"
"log"
@@ -33,6 +34,8 @@ import (
"github.com/purpleidea/mgmt/engine/graph/autogroup"
_ "github.com/purpleidea/mgmt/engine/resources" // let register's run
"github.com/purpleidea/mgmt/etcd"
"github.com/purpleidea/mgmt/etcd/chooser"
"github.com/purpleidea/mgmt/etcd/deployer"
"github.com/purpleidea/mgmt/gapi"
"github.com/purpleidea/mgmt/gapi/empty"
"github.com/purpleidea/mgmt/pgp"
@@ -44,10 +47,14 @@ import (
etcdtypes "github.com/coreos/etcd/pkg/types"
)
// NS is the top-level namespace for all etcd operations; every key must be
// placed underneath it. The value must never carry a trailing slash.
const NS = "/_mgmt"
// Flags holds the constant logging-related switches that are passed around and
// consulted throughout the program.
type Flags struct {
	// Debug adds additional log messages.
	Debug bool
	// Trace adds execution flow log messages.
	Trace bool
	// Verbose adds extra log message output.
	Verbose bool
}
@@ -105,7 +112,8 @@ type Main struct {
Prometheus bool // enable prometheus metrics
PrometheusListen string // prometheus instance bind specification
ge *graph.Engine
embdEtcd *etcd.EmbdEtcd // TODO: can be an interface in the future...
ge *graph.Engine
exit *util.EasyExit // exit signal
cleanup []func() error // list of functions to run on close
@@ -140,7 +148,7 @@ func (obj *Main) Init() error {
obj.idealClusterSize = uint16(obj.IdealClusterSize)
if obj.IdealClusterSize < 0 { // value is undefined, set to the default
obj.idealClusterSize = etcd.DefaultIdealClusterSize
obj.idealClusterSize = chooser.DefaultIdealDynamicSize
}
if obj.idealClusterSize < 1 {
@@ -194,7 +202,8 @@ func (obj *Main) Run() error {
hello(obj.Program, obj.Version, obj.Flags) // say hello!
defer Logf("goodbye!")
defer obj.exit.Done(nil) // ensure this gets called even if Exit doesn't
exitCtx := obj.exit.Context() // local exit signal
defer obj.exit.Done(nil) // ensure this gets called even if Exit doesn't
hostname, err := os.Hostname() // a sensible default
// allow passing in the hostname, instead of using the system setting
@@ -243,13 +252,14 @@ func (obj *Main) Run() error {
if err := prom.InitKindMetrics(engine.RegisteredResourcesNames()); err != nil {
return errwrap.Wrapf(err, "can't initialize kind-specific prometheus metrics")
}
obj.cleanup = append(obj.cleanup, func() error {
defer func() {
Logf("prometheus: stopping instance")
if err := prom.Stop(); err != nil {
return errwrap.Wrapf(err, "the prometheus instance exited poorly")
err := errwrap.Wrapf(prom.Stop(), "the prometheus instance exited poorly")
if err != nil {
// TODO: cause the final exit code to be non-zero
Logf("cleanup error: %+v", err)
}
return nil
})
}()
}
if !obj.NoPgp {
@@ -296,6 +306,8 @@ func (obj *Main) Run() error {
exitchan := make(chan struct{}) // exit on close
wg := &sync.WaitGroup{} // waitgroup for inner loop & goroutines
defer wg.Wait() // wait in case we have an early exit
defer obj.exit.Done(nil) // trigger exit in case something blocks
// exit after `max-runtime` seconds for no reason at all...
if i := obj.MaxRuntime; i > 0 {
@@ -335,63 +347,108 @@ func (obj *Main) Run() error {
// XXX: should this be moved to later in the code?
go converger.Run(true) // main loop for converger, true to start paused
converger.Ready() // block until ready
obj.cleanup = append(obj.cleanup, func() error {
defer func() {
// TODO: shutdown converger, but make sure that using it in a
// still running embdEtcd struct doesn't block waiting on it...
converger.Shutdown()
return nil
})
}()
// embedded etcd
if len(obj.seeds) == 0 {
Logf("etcd: seeds: no seeds specified!")
Logf("no seeds specified!")
} else {
Logf("etcd: seeds(%d): %+v", len(obj.seeds), obj.seeds)
Logf("seeds(%d): %+v", len(obj.seeds), obj.seeds)
}
embdEtcd := etcd.NewEmbdEtcd(
hostname,
obj.seeds,
obj.clientURLs,
obj.serverURLs,
obj.advertiseClientURLs,
obj.advertiseServerURLs,
obj.NoServer,
obj.NoNetwork,
obj.idealClusterSize,
etcd.Flags{
Debug: obj.Flags.Debug,
Trace: obj.Flags.Trace,
Verbose: obj.Flags.Verbose,
},
prefix,
converger,
)
if embdEtcd == nil {
return fmt.Errorf("etcd: creation failed")
} else if err := embdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running)
return errwrap.Wrapf(err, "etcd: startup failed")
}
obj.cleanup = append(obj.cleanup, func() error {
// cleanup etcd main loop last so it can process everything first
err := embdEtcd.Destroy() // shutdown and cleanup etcd
return errwrap.Wrapf(err, "etcd: exited poorly")
})
obj.embdEtcd = &etcd.EmbdEtcd{
Hostname: hostname,
Seeds: obj.seeds,
// wait for etcd server to be ready before continuing...
// XXX: this is wrong if we're not going to be a server! we'll block!!!
// select {
// case <-embdEtcd.ServerReady():
// Logf("etcd: server: ready!")
// // pass
// case <-time.After(((etcd.MaxStartServerTimeout * etcd.MaxStartServerRetries) + 1) * time.Second):
// return fmt.Errorf("etcd: startup timeout")
// }
time.Sleep(1 * time.Second) // XXX: temporary workaround
ClientURLs: obj.clientURLs,
ServerURLs: obj.serverURLs,
AClientURLs: obj.advertiseClientURLs,
AServerURLs: obj.advertiseServerURLs,
NoServer: obj.NoServer,
NoNetwork: obj.NoNetwork,
Chooser: &chooser.DynamicSize{
IdealClusterSize: obj.idealClusterSize,
},
Converger: converger,
NS: NS, // namespace
Prefix: fmt.Sprintf("%s/", path.Join(prefix, "etcd")),
Debug: obj.Flags.Debug,
Logf: func(format string, v ...interface{}) {
log.Printf("etcd: "+format, v...)
},
}
if err := obj.embdEtcd.Init(); err != nil {
return errwrap.Wrapf(err, "etcd init failed")
}
defer func() {
// cleanup etcd main loop last so it can process everything first
err := errwrap.Wrapf(obj.embdEtcd.Close(), "etcd close failed")
if err != nil {
// TODO: cause the final exit code to be non-zero
Logf("cleanup error: %+v", err)
}
}()
var etcdErr error
// don't add a wait group here, this is done in embdEtcd.Destroy()
go func() {
etcdErr = obj.embdEtcd.Run() // returns when it shuts down...
obj.exit.Done(errwrap.Wrapf(etcdErr, "etcd run failed")) // trigger exit
}()
// tell etcd to shutdown, blocks until done!
// TODO: handle/report error?
defer obj.embdEtcd.Destroy()
// wait for etcd to be ready before continuing...
// TODO: do we need to add a timeout here?
select {
case <-obj.embdEtcd.Ready():
Logf("etcd is ready!")
// pass
case <-obj.embdEtcd.Exited():
Logf("etcd was destroyed!")
err := fmt.Errorf("etcd was destroyed on startup")
if etcdErr != nil {
err = etcdErr
}
return err
}
// TODO: should getting a client from EmbdEtcd already come with the NS?
etcdClient, err := obj.embdEtcd.MakeClientFromNamespace(NS)
if err != nil {
return errwrap.Wrapf(err, "make Client failed")
}
simpleDeploy := &deployer.SimpleDeploy{
Client: etcdClient,
Debug: obj.Flags.Debug,
Logf: func(format string, v ...interface{}) {
log.Printf("deploy: "+format, v...)
},
}
if err := simpleDeploy.Init(); err != nil {
return errwrap.Wrapf(err, "deploy Init failed")
}
defer func() {
err := errwrap.Wrapf(simpleDeploy.Close(), "deploy Close failed")
if err != nil {
// TODO: cause the final exit code to be non-zero
Logf("cleanup error: %+v", err)
}
}()
// implementation of the World API (alternatives can be substituted in)
world := &etcd.World{
Hostname: hostname,
EmbdEtcd: embdEtcd,
Client: etcdClient,
MetadataPrefix: MetadataPrefix,
StoragePrefix: StoragePrefix,
StandaloneFs: obj.DeployFs, // used for static deploys
@@ -415,9 +472,16 @@ func (obj *Main) Run() error {
}
if err := obj.ge.Init(); err != nil {
return errwrap.Wrapf(err, "engine: creation failed")
return errwrap.Wrapf(err, "engine Init failed")
}
// After this point, the inner "main loop" must run, so that the engine
defer func() {
err := errwrap.Wrapf(obj.ge.Close(), "engine Close failed")
if err != nil {
// TODO: cause the final exit code to be non-zero
Logf("cleanup error: %+v", err)
}
}()
// After this point, the inner "main loop" will run, so that the engine
// can get closed with the deploy close via the deploy chan shutdown...
// main loop logic starts here
@@ -456,7 +520,7 @@ func (obj *Main) Run() error {
obj.ge.Pause(false)
}
// must be paused before this is run
obj.ge.Close()
//obj.ge.Close() // run in defer instead
return // this is the only place we exit
}
@@ -678,9 +742,10 @@ func (obj *Main) Run() error {
// get max id (from all the previous deploys)
// this is what the existing cluster is already running
// TODO: can this block since we didn't deploy yet?
max, err := etcd.GetMaxDeployID(embdEtcd)
// TODO: add a timeout to context?
max, err := simpleDeploy.GetMaxDeployID(exitCtx)
if err != nil {
close(deployChan) // because we won't close it downstream...
return errwrap.Wrapf(err, "error getting max deploy id")
}
@@ -710,9 +775,24 @@ func (obj *Main) Run() error {
// now we can wait for future deploys, but if we already had an
// initial deploy from run, don't switch to this unless it's new
ctx, cancel := context.WithCancel(context.Background())
watchChan, err := simpleDeploy.WatchDeploy(ctx)
if err != nil {
cancel()
Logf("error starting deploy: %+v", err)
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer cancel() // unblock watch deploy
select { // wait until we're ready to shutdown
case <-exitchan:
}
}()
canceled := false
var last uint64
startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
for {
if obj.NoDeployWatch && (obj.Deploy != nil || last > 0) {
// block here, because when we close the
@@ -725,29 +805,33 @@ func (obj *Main) Run() error {
}
select {
case <-startChan: // kick the loop once at start
startChan = nil // disable
case err, ok := <-etcd.WatchDeploy(embdEtcd):
// WatchDeploy should send an initial event now...
case err, ok := <-watchChan:
if !ok {
obj.exit.Done(nil) // regular shutdown
// TODO: is any of this needed in here?
if !canceled {
obj.exit.Done(nil) // regular shutdown
}
return
}
if err == context.Canceled {
canceled = true
continue // channel close is coming...
}
if err != nil {
// TODO: it broke, can we restart?
obj.exit.Done(fmt.Errorf("deploy: watch error"))
return
obj.exit.Done(errwrap.Wrapf(err, "deploy: watch error"))
continue
}
startChan = nil // disable it early...
if obj.Flags.Debug {
Logf("deploy: got activity")
}
case <-exitchan:
return
//case <-exitchan:
// return // exit via channel close instead
}
latest, err := etcd.GetMaxDeployID(embdEtcd) // or zero
latest, err := simpleDeploy.GetMaxDeployID(ctx) // or zero
if err != nil {
Logf("error getting max deploy id: %+v", err)
continue
@@ -774,7 +858,7 @@ func (obj *Main) Run() error {
// 0 passes through an empty deploy without an error...
// (unless there is some sort of etcd error that occurs)
str, err := etcd.GetDeploy(embdEtcd, latest)
str, err := simpleDeploy.GetDeploy(ctx, latest)
if err != nil {
Logf("deploy: error getting deploy: %+v", err)
continue
@@ -871,6 +955,9 @@ func (obj *Main) FastExit(err error) {
// might leave some of your resources in a partial or unknown state.
func (obj *Main) Interrupt(err error) {
	// XXX: implement and run Interrupt API for supported resources

	// Always request the fast exit first, passing along the given error.
	obj.FastExit(err)

	// Nothing more to do when no embedded etcd has been stored on the
	// struct.
	if obj.embdEtcd == nil {
		return
	}
	obj.embdEtcd.Interrupt() // unblock borked clusters
}