// Mgmt
// Copyright (C) James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.

// TODO: remove race around leader operations
// TODO: fix unstarted member
// TODO: add VIP for servers (incorporate with net resource)
// TODO: auto assign ports/ip's for peers (if possible)
// TODO: check the shutdown ordering, so everything unrolls to a shutdown
// TODO: add the converger Register/Unregister stuff and timers if needed

// Package etcd implements the distributed key value store and fs integration.
// This also takes care of managing and clustering of the embedded etcd server.
// The automatic clustering is considered experimental. If you require a more
// robust, battle-tested etcd cluster, then manage your own, and point each mgmt
// agent at it with --seeds and --no-server.
//
// # Algorithm
//
// The elastic etcd algorithm works in the following way:
//
// * When you start up mgmt, you can pass it a list of seeds.
//
// * If no seeds are given, then assume you are the first server and startup.
//
// * If a seed is given, connect as a client, and volunteer to be a server.
//
// * All volunteering clients should listen for a message for nomination.
//
// * If a client has been nominated, it should startup a server.
//
// * A server should shutdown if its nomination is removed.
//
// * The elected leader should decide who to nominate/unnominate as needed.
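//
// As a rough illustration only (the /_mgmt namespace prefix and the hostname
// follow the smoke testing examples below), the coordination state lives under
// keys such as:
//
//	/_mgmt/volunteer/h2  ->  server (peer) URLs for h2
//	/_mgmt/nominated/h2  ->  server (peer) URLs for h2
//	/_mgmt/endpoints/h2  ->  client URLs for h2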
//
// # Notes
//
// If you attempt to add a new member to the cluster with a duplicate hostname,
// then the behaviour is undefined, and you could bork your cluster. This is not
// recommended or supported. Please ensure that your hostnames are unique.
//
// A single ^C requests an orderly shutdown; however, a third ^C will ask etcd
// to shutdown forcefully. Using this option is not recommended; it exists as a
// way to make exit easier if something deadlocked the cluster. If this was due
// to user error (eg: duplicate hostnames) then it was your fault, but if the
// member did not shutdown from a single ^C under normal circumstances, then
// please file a bug.
//
// There are currently some races in this implementation. In practice, this
// should not cause any adverse effects unless you simultaneously add or remove
// members at a high rate. Fixing these races will probably require some
// internal changes to etcd. Help is welcome if you're interested in working on
// this.
//
// # Smoke testing
//
// Here is a simple way to test etcd clustering basics...
//
// ./mgmt run --tmp-prefix --no-pgp --hostname h1 empty
// ./mgmt run --tmp-prefix --no-pgp --hostname h2 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2381 --server-urls=http://127.0.0.1:2382 empty
// ./mgmt run --tmp-prefix --no-pgp --hostname h3 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2383 --server-urls=http://127.0.0.1:2384 empty
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/chooser/dynamicsize/idealclustersize 3
// ./mgmt run --tmp-prefix --no-pgp --hostname h4 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2385 --server-urls=http://127.0.0.1:2386 empty
// ./mgmt run --tmp-prefix --no-pgp --hostname h5 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2387 --server-urls=http://127.0.0.1:2388 empty
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/chooser/dynamicsize/idealclustersize 5
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
//
// # Bugs
//
// A member might occasionally think that an endpoint still exists after it has
// already shutdown. This isn't a major issue, since if that endpoint doesn't
// respond, then it will automatically choose the next available one. To see
// this issue, turn on debugging and start: H1, H2, H3, then stop H2, and you
// might see that H3 still knows about H2.
//
// Shutting down a cluster by setting the idealclustersize to zero is currently
// buggy and not supported. Try this at your own risk.
//
// If a member is nominated, and it doesn't respond to the nominate event and
// startup, and we lost quorum to add it, then we could be in a blocked state.
// This can be improved upon if we can call memberRemove after a timeout.
//
// Adding new cluster members very quickly might trigger a:
// `runtime error: error validating peerURLs ... member count is unequal` error.
// See: https://github.com/etcd-io/etcd/issues/10626 for more information.
//
// If you use the dynamic size feature to start and stop the server process,
// once it has already started and then stopped, it can't be re-started because
// of a bug in etcd that doesn't free the port. Instead you'll get a:
// `bind: address already in use` error. See:
// https://github.com/etcd-io/etcd/issues/6042 for more information.
package etcd

import (
	"context"
	"fmt"
	"net/url"
	"os"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/purpleidea/mgmt/converger"
	"github.com/purpleidea/mgmt/etcd/chooser"
	"github.com/purpleidea/mgmt/etcd/client"
	"github.com/purpleidea/mgmt/etcd/interfaces"
	etcdUtil "github.com/purpleidea/mgmt/etcd/util"
	"github.com/purpleidea/mgmt/util"
	"github.com/purpleidea/mgmt/util/errwrap"

	etcdtypes "go.etcd.io/etcd/client/pkg/v3/types"
	etcd "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"go.etcd.io/etcd/client/v3/namespace"
	"go.etcd.io/etcd/server/v3/embed"
)

const (
// TODO: figure out a trailing slash convention...
// NominatedPath is the unprefixed path under which nominated hosts are
// stored. This is public so that other consumers can know to avoid this
// key prefix.
NominatedPath = "/nominated/"
nominatedPathFmt = NominatedPath + "%s" // takes a hostname on the end
// VolunteerPath is the unprefixed path under which volunteering hosts
// are stored. This is public so that other consumers can know to avoid
// this key prefix.
VolunteerPath = "/volunteer/"
volunteerPathFmt = VolunteerPath + "%s" // takes a hostname on the end
// EndpointsPath is the unprefixed path under which the advertised host
// endpoints are stored. This is public so that other consumers can know
// to avoid this key prefix.
EndpointsPath = "/endpoints/"
endpointsPathFmt = EndpointsPath + "%s" // takes a hostname on the end
// ChooserPath is the unprefixed path under which the chooser algorithm
// may store data. This is public so that other consumers can know to
// avoid this key prefix.
ChooserPath = "/chooser" // all hosts share the same namespace
// ConvergedPath is the unprefixed path under which the converger
// may store data. This is public so that other consumers can know to
// avoid this key prefix.
ConvergedPath = "/converged/"
convergedPathFmt = ConvergedPath + "%s" // takes a hostname on the end
// SchedulerPath is the unprefixed path under which the scheduler
// may store data. This is public so that other consumers can know to
// avoid this key prefix.
SchedulerPath = "/scheduler/"
schedulerPathFmt = SchedulerPath + "%s" // takes a namespace on the end
// DefaultClientURL is the default value that is used for client URLs.
// It is pulled from the upstream etcd package.
DefaultClientURL = embed.DefaultListenClientURLs // 127.0.0.1:2379
// DefaultServerURL is the default value that is used for server URLs.
// It is pulled from the upstream etcd package.
DefaultServerURL = embed.DefaultListenPeerURLs // 127.0.0.1:2380
// DefaultMaxTxnOps is the maximum number of operations to run in a
// single etcd transaction. If you exceed this limit, it is possible
// that you have either an extremely large code base, or that you have
// some code which is possibly not as efficient as it could be. Let us
// know so that we can analyze the situation, and increase this if
// necessary.
DefaultMaxTxnOps = 512
// RunStartupTimeout is the amount of time we will wait for regular run
// startup before cancelling it all.
RunStartupTimeout = 30 * time.Second
// ClientDialTimeout is the DialTimeout option in the client config.
ClientDialTimeout = 5 * time.Second
// ClientDialKeepAliveTime is the DialKeepAliveTime config value for the
// etcd client. It is recommended that you use this so that dead
// endpoints don't block any cluster operations.
ClientDialKeepAliveTime = 2 * time.Second // from etcdctl
// ClientDialKeepAliveTimeout is the DialKeepAliveTimeout config value
// for the etcd client. It is recommended that you use this so that dead
// endpoints don't block any cluster operations.
ClientDialKeepAliveTimeout = 6 * time.Second // from etcdctl
// MemberChangeInterval is the polling interval to use when watching for
// member changes during add or remove.
MemberChangeInterval = 500 * time.Millisecond
// SelfRemoveTimeout gives unnominated members a chance to self exit.
SelfRemoveTimeout = 10 * time.Second
// ForceExitTimeout is the amount of time we will wait for a force exit
// to occur before cancelling it all.
ForceExitTimeout = 15 * time.Second
// SessionTTL is the number of seconds to wait before a dead or
// unresponsive host has their volunteer keys removed from the cluster.
// This should be an integer multiple of seconds, since one second is
// the TTL precision used in etcd.
SessionTTL = 10 * time.Second // seconds
// RequireLeaderCtx specifies whether the volunteer loop should use the
// WithRequireLeader ctx wrapper. It is unknown at this time if this
// would cause occasional events to be lost, more extensive testing is
// needed.
RequireLeaderCtx = false
// ConvergerHostnameNamespace is a unique key used in the converger.
ConvergerHostnameNamespace = "etcd-hostname"
)
// EmbdEtcd provides the embedded server and client etcd functionality.
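//
// A rough lifecycle sketch (error handling is elided, and the field values and
// chooser shown are only examples, not defaults):
//
//	obj := &EmbdEtcd{
//		Hostname: "h1",
//		Seeds:    nil,            // no seeds means bootstrap as the first server
//		NS:       "/_mgmt",
//		Prefix:   "/tmp/prefix/", // absolute dir for any etcd related state
//		Chooser:  someChooser,    // hypothetical; required unless NoServer is true
//		Logf:     log.Printf,
//	}
//	obj.Init()    // calls Validate internally
//	go obj.Run()  // blocks until shutdown
//	<-obj.Ready() // wait until our client has connected
//	c, _ := obj.MakeClient()
//	_ = c         // use the client...
//	obj.Destroy() // trigger and wait for an orderly exit
//	obj.Close()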
type EmbdEtcd struct { // EMBeddeD etcd
Hostname string
// Seeds is the list of servers that this client could connect to.
Seeds etcdtypes.URLs
	// ClientURLs are the locations to listen for clients if I am a server.
	ClientURLs etcdtypes.URLs
	// ServerURLs are the locations to listen for servers (peers) if I am a
	// server (peer).
	ServerURLs etcdtypes.URLs
// AClientURLs are the client urls to advertise.
AClientURLs etcdtypes.URLs
	// AServerURLs are the server (peer) urls to advertise.
AServerURLs etcdtypes.URLs
// NoServer disables all server peering for this host.
// TODO: allow changing this at runtime with some function call?
NoServer bool
// NoNetwork causes this to use unix:// sockets instead of TCP for
// connections.
NoNetwork bool
// NoMagic turns off some things which aren't needed when used with a
// simple Seeds and NoServer option.
NoMagic bool
// Chooser is the implementation of the algorithm that decides which
// hosts to add or remove to grow and shrink the cluster.
Chooser chooser.Chooser
chooser chooser.Chooser // the one we use if it's active
// Converger is a converged coordinator object that can be used to
// track the converged state.
Converger *converger.Coordinator
// NS is a string namespace that we prefix to every key operation.
NS string
// Prefix is the directory where any etcd related state is stored. It
// must be an absolute directory path.
Prefix string
Debug bool
Logf func(format string, v ...interface{})
wg *sync.WaitGroup
exit *util.EasyExit // exit signal
closing bool // are we closing ?
hardexit *util.EasyExit // hard exit signal (to unblock borked things)
errChan chan error // global error chan, closes when Run is done
// errExit1 ... errExitN all must get closed for errChan to close.
errExit1 chan struct{} // connect
errExit2 chan struct{} // chooser
errExit3 chan struct{} // nominate
errExit4 chan struct{} // volunteer
errExit5 chan struct{} // endpoints
errExitN chan struct{} // special signal for server closing (starts/stops)
// coordinate an organized exit so we wait for everyone without blocking
activeExit1 bool
activeExit2 bool
activeExit3 bool
activeExit4 bool
activeExit5 bool
activateExit1 *util.EasyAckOnce
activateExit2 *util.EasyAckOnce
activateExit3 *util.EasyAckOnce
activateExit4 *util.EasyAckOnce
activateExit5 *util.EasyAckOnce
readySignal chan struct{} // closes when we're up and running
exitsSignal chan struct{} // closes when run exits
// locally tracked state
// nominated is a local cache of who's been nominated. This contains
// values for where a *server* would connect to. It gets updated
// primarily in the nominateCb watcher loop.
// TODO: maybe this should just be a list?
// TODO: is there a difference here between ServerURLs and AServerURLs ?
nominated etcdtypes.URLsMap // map[hostname]URLs
// volunteers is a local cache of who's volunteered. This contains
// values for where a *server* would connect to. It gets updated
// primarily in the volunteerCb watcher loop.
// TODO: maybe this should just be a list?
// TODO: is there a difference here between ServerURLs and AServerURLs ?
volunteers etcdtypes.URLsMap // map[hostname]URLs
// membermap is a local cache of server endpoints. This contains values
// for where a *server* (peer) would connect to. It gets updated in the
// membership state functions.
membermap etcdtypes.URLsMap // map[hostname]URLs
// endpoints is a local cache of server endpoints. It differs from the
// config value which is a flattened representation of the same. That
// value can be seen via client.Endpoints() and client.SetEndpoints().
// This contains values for where a *client* would connect to. It gets
// updated in the membership state functions.
endpoints etcdtypes.URLsMap // map[hostname]URLs
// memberIDs is a local cache of which cluster servers (peers) are
// associated with each memberID. It gets updated in the membership
// state functions. Note that unstarted members have an ID, but no name
// yet, so they aren't included here, since that key would be the empty
// string.
memberIDs map[string]uint64 // map[hostname]memberID
// behaviour mutexes
stateMutex *sync.RWMutex // lock around all locally tracked state
orderingMutex *sync.Mutex // lock around non-concurrent changes
nominatedMutex *sync.Mutex // lock around nominatedCb
volunteerMutex *sync.Mutex // lock around volunteerCb
// client related
etcd *etcd.Client
connectSignal chan struct{} // TODO: use a SubscribedSignal instead?
client *client.Simple // provides useful helper methods
clients []*client.Simple // list of registered clients
session *concurrency.Session // session that expires on disconnect
leaseID etcd.LeaseID // the leaseID used by this session
// server related
server *embed.Etcd // contains the server struct
serverID uint64 // uint64 because memberRemove uses that
serverwg *sync.WaitGroup // wait for server to shutdown
servermu *sync.Mutex // lock around destroy server
serverExit *util.EasyExit // exit signal
serverReadySignal *util.SubscribedSignal // signals when server is up and running
serverExitsSignal *util.SubscribedSignal // signals when runServer exits
// task queue state
taskQueue []*task
taskQueueWg *sync.WaitGroup
taskQueueLock *sync.Mutex
taskQueueRunning bool
taskQueueID int
}
// sessionTTLSec transforms the time representation into the nearest number of
// seconds, which is needed by the etcd API.
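//
// For example, with the SessionTTL constant above:
//
//	sessionTTLSec(10 * time.Second) // == 10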
func sessionTTLSec(d time.Duration) int {
return int(d.Seconds())
}
// Validate the initial struct. This is called from Init, but can be used if you
// would like to check your configuration is correct.
func (obj *EmbdEtcd) Validate() error {
s := sessionTTLSec(SessionTTL)
if s <= 0 {
return fmt.Errorf("the SessionTTL const of %s (%d sec) must be greater than zero", SessionTTL.String(), s)
}
if s > etcd.MaxLeaseTTL {
return fmt.Errorf("the SessionTTL const of %s (%d sec) must be less than %d sec", SessionTTL.String(), s, etcd.MaxLeaseTTL)
}
if obj.Hostname == "" {
return fmt.Errorf("the Hostname was not specified")
}
if obj.NoServer && len(obj.Seeds) == 0 {
return fmt.Errorf("need at least one seed if NoServer is true")
}
if !obj.NoServer { // you don't need a Chooser if there's no server...
if obj.Chooser == nil {
return fmt.Errorf("need to specify a Chooser implementation")
}
if err := obj.Chooser.Validate(); err != nil {
return errwrap.Wrapf(err, "the Chooser did not validate")
}
}
if obj.NoNetwork {
if len(obj.Seeds) != 0 || len(obj.ClientURLs) != 0 || len(obj.ServerURLs) != 0 {
return fmt.Errorf("option NoNetwork is mutually exclusive with Seeds, ClientURLs and ServerURLs")
}
}
if obj.NoMagic && !obj.NoServer {
return fmt.Errorf("we need Magic if we're a Server")
}
if _, err := etcdUtil.CopyURLs(obj.Seeds); err != nil { // this will validate
return errwrap.Wrapf(err, "the Seeds are not valid")
}
if obj.NS == "/" {
return fmt.Errorf("the namespace should be empty instead of /")
}
if strings.HasSuffix(obj.NS, "/") {
return fmt.Errorf("the namespace should not end in /")
}
if obj.Prefix == "" || obj.Prefix == "/" {
return fmt.Errorf("the prefix of `%s` is invalid", obj.Prefix)
}
if obj.Logf == nil {
return fmt.Errorf("no Logf function was specified")
}
return nil
}
// Init initializes the struct after it has been populated as desired. You must
// not use the struct if this returns an error.
func (obj *EmbdEtcd) Init() error {
if err := obj.Validate(); err != nil {
return errwrap.Wrapf(err, "validate error")
}
if obj.ClientURLs == nil {
obj.ClientURLs = []url.URL{} // initialize
}
if obj.ServerURLs == nil {
obj.ServerURLs = []url.URL{}
}
if obj.AClientURLs == nil {
obj.AClientURLs = []url.URL{}
}
if obj.AServerURLs == nil {
obj.AServerURLs = []url.URL{}
}
curls, err := obj.curls()
if err != nil {
return err
}
surls, err := obj.surls()
if err != nil {
return err
}
if !obj.NoServer {
// add a default
if len(curls) == 0 {
u, err := url.Parse(DefaultClientURL)
if err != nil {
return err
}
obj.ClientURLs = []url.URL{*u}
}
// add a default for local use and testing, harmless and useful!
if len(surls) == 0 {
u, err := url.Parse(DefaultServerURL) // default
if err != nil {
return err
}
obj.ServerURLs = []url.URL{*u}
}
// TODO: if we don't have any localhost URLs, should we warn so
// that our local client can be able to connect more easily?
if len(etcdUtil.LocalhostURLs(obj.ClientURLs)) == 0 {
u, err := url.Parse(DefaultClientURL)
if err != nil {
return err
}
obj.ClientURLs = append([]url.URL{*u}, obj.ClientURLs...) // prepend
}
}
if obj.NoNetwork {
var err error
// FIXME: convince etcd to store these files in our obj.Prefix!
obj.ClientURLs, err = etcdtypes.NewURLs([]string{"unix://clients.sock:0"})
if err != nil {
return err
}
obj.ServerURLs, err = etcdtypes.NewURLs([]string{"unix://servers.sock:0"})
if err != nil {
return err
}
}
if !obj.NoServer { // you don't need a Chooser if there's no server...
data := &chooser.Data{
Hostname: obj.Hostname,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
obj.Logf("chooser: "+format, v...)
},
}
obj.chooser = obj.Chooser // copy
if err := obj.chooser.Init(data); err != nil {
return errwrap.Wrapf(err, "error initializing chooser")
}
}
if err := os.MkdirAll(obj.Prefix, 0770); err != nil {
return errwrap.Wrapf(err, "couldn't mkdir: %s", obj.Prefix)
}
obj.wg = &sync.WaitGroup{}
obj.exit = util.NewEasyExit()
obj.hardexit = util.NewEasyExit()
obj.errChan = make(chan error)
obj.errExit1 = make(chan struct{})
obj.errExit2 = make(chan struct{})
obj.errExit3 = make(chan struct{})
obj.errExit4 = make(chan struct{})
obj.errExit5 = make(chan struct{})
obj.errExitN = make(chan struct{}) // done before call to runServer!
close(obj.errExitN) // starts closed
//obj.activeExit1 = false
//obj.activeExit2 = false
//obj.activeExit3 = false
//obj.activeExit4 = false
//obj.activeExit5 = false
obj.activateExit1 = util.NewEasyAckOnce()
obj.activateExit2 = util.NewEasyAckOnce()
obj.activateExit3 = util.NewEasyAckOnce()
obj.activateExit4 = util.NewEasyAckOnce()
obj.activateExit5 = util.NewEasyAckOnce()
obj.readySignal = make(chan struct{})
obj.exitsSignal = make(chan struct{})
// locally tracked state
obj.nominated = make(etcdtypes.URLsMap)
obj.volunteers = make(etcdtypes.URLsMap)
obj.membermap = make(etcdtypes.URLsMap)
obj.endpoints = make(etcdtypes.URLsMap)
obj.memberIDs = make(map[string]uint64)
// behaviour mutexes
obj.stateMutex = &sync.RWMutex{}
// TODO: I'm not sure if orderingMutex is actually required or not...
obj.orderingMutex = &sync.Mutex{}
obj.nominatedMutex = &sync.Mutex{}
obj.volunteerMutex = &sync.Mutex{}
// client related
obj.connectSignal = make(chan struct{})
obj.clients = []*client.Simple{}
// server related
obj.serverwg = &sync.WaitGroup{}
obj.servermu = &sync.Mutex{}
obj.serverExit = util.NewEasyExit() // is reset after destroyServer exit
obj.serverReadySignal = &util.SubscribedSignal{}
obj.serverExitsSignal = &util.SubscribedSignal{}
// task queue state
obj.taskQueue = []*task{}
obj.taskQueueWg = &sync.WaitGroup{}
obj.taskQueueLock = &sync.Mutex{}
return nil
}
// Close cleans up after you are done using the struct.
func (obj *EmbdEtcd) Close() error {
var reterr error
if obj.chooser != nil {
reterr = errwrap.Append(reterr, obj.chooser.Close())
}
return reterr
}
// curls returns the client urls that we should use everywhere except for
// locally, where we prefer to use the non-advertised perspective.
func (obj *EmbdEtcd) curls() (etcdtypes.URLs, error) {
// TODO: do we need the copy?
if len(obj.AClientURLs) > 0 {
return etcdUtil.CopyURLs(obj.AClientURLs)
}
return etcdUtil.CopyURLs(obj.ClientURLs)
}
// surls returns the server (peer) urls that we should use everywhere except for
// locally, where we prefer to use the non-advertised perspective.
func (obj *EmbdEtcd) surls() (etcdtypes.URLs, error) {
// TODO: do we need the copy?
if len(obj.AServerURLs) > 0 {
return etcdUtil.CopyURLs(obj.AServerURLs)
}
return etcdUtil.CopyURLs(obj.ServerURLs)
}
// err is an error helper that sends to the errChan.
func (obj *EmbdEtcd) err(err error) {
select {
case obj.errChan <- err:
}
}
// Run is the main entry point to kick off the embedded etcd client and server.
// It blocks until we've exited for shutdown. The shutdown can be triggered by
// calling Destroy.
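//
// A minimal sketch of the shutdown flow (someShutdownSignal is hypothetical,
// only here for illustration):
//
//	go func() {
//		<-someShutdownSignal // e.g. from a signal handler
//		obj.Destroy()
//	}()
//	err := obj.Run() // blocks here until the above triggers an exit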
func (obj *EmbdEtcd) Run() error {
curls, err := obj.curls()
if err != nil {
return err
}
surls, err := obj.surls()
if err != nil {
return err
}
exitCtx := obj.exit.Context() // local exit signal
obj.Logf("running...")
defer obj.Logf("exited!")
wg := &sync.WaitGroup{}
defer wg.Wait()
defer close(obj.exitsSignal)
defer obj.wg.Wait()
defer obj.exit.Done(nil) // unblock anything waiting for exit...
startupCtx, cancel := context.WithTimeout(exitCtx, RunStartupTimeout)
defer cancel()
defer obj.Logf("waiting for exit cleanup...") // TODO: is this useful?
// After we trigger a hardexit, wait for the ForceExitTimeout and then
// cancel any remaining stuck context's. This helps prevent angry users.
unblockCtx, runTimeout := context.WithCancel(context.Background())
defer runTimeout()
wg.Add(1)
go func() {
defer wg.Done()
defer runTimeout()
select {
case <-obj.hardexit.Signal(): // bork unblocker
case <-obj.exitsSignal:
}
select {
case <-time.After(ForceExitTimeout):
case <-obj.exitsSignal:
}
}()
// main loop exit signal
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
// when all the senders on errChan have exited, we can exit too
defer close(obj.errChan)
// these signals guard around the errChan close operation
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// We wait here until we're notified to know whether or
// not this particular exit signal will be relevant...
// This is because during some runs, we might not use
// all of the signals, therefore we don't want to wait
// for them!
select {
case <-obj.activateExit1.Wait():
case <-exitCtx.Done():
}
if !obj.activeExit1 {
return
}
select {
case <-obj.errExit1:
if obj.Debug {
obj.Logf("exited connect loop (1)")
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-obj.activateExit2.Wait():
case <-exitCtx.Done():
}
if !obj.activeExit2 {
return
}
select {
case <-obj.errExit2:
if obj.Debug {
obj.Logf("exited chooser loop (2)")
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-obj.activateExit3.Wait():
case <-exitCtx.Done():
}
if !obj.activeExit3 {
return
}
select {
case <-obj.errExit3:
if obj.Debug {
obj.Logf("exited nominate loop (3)")
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-obj.activateExit4.Wait():
case <-exitCtx.Done():
}
if !obj.activeExit4 {
return
}
select {
case <-obj.errExit4:
if obj.Debug {
obj.Logf("exited volunteer loop (4)")
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-obj.activateExit5.Wait():
case <-exitCtx.Done():
}
if !obj.activeExit5 {
return
}
select {
case <-obj.errExit5:
if obj.Debug {
obj.Logf("exited endpoints loop (5)")
}
}
}()
wg.Wait() // wait for all the other exit signals before this one
select {
case <-obj.errExitN: // last one is for server (it can start/stop)
if obj.Debug {
obj.Logf("exited server loop (0)")
}
}
}()
// main loop
var reterr error
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
Loop:
for {
select {
case err, ok := <-obj.errChan:
if !ok { // when this closes, we can shutdown
break Loop
}
if err == nil {
err = fmt.Errorf("unexpected nil error")
}
obj.Logf("runtime error: %+v", err)
if reterr == nil { // happens only once
obj.exit.Done(err) // trigger an exit in Run!
}
reterr = errwrap.Append(reterr, err)
}
}
}()
bootstrapping := len(obj.Seeds) == 0 // we're the first, start a server!
canServer := !obj.NoServer
// Opportunistic "connect events" system, so that we can connect
// promiscuously when it's needed, instead of needing to linearize code.
obj.activeExit1 = true // activate errExit1
obj.activateExit1.Ack()
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(obj.errExit1) // multi-signal for errChan close op
if bootstrapping {
serverReady, ackReady := obj.ServerReady() // must call ack!
serverExited, ackExited := obj.ServerExited() // must call ack!
select {
case <-serverReady:
ackReady() // must be called
ackExited() // must be called
case <-serverExited:
ackExited() // must be called
ackReady() // must be called
// send an error in case server doesn't
// TODO: do we want this error to be sent?
obj.err(fmt.Errorf("server exited early"))
return
case <-obj.exit.Signal(): // exit early on exit signal
ackReady() // must be called
ackExited() // must be called
return
}
}
// Connect here. If we're bootstrapping, the server came up
// right above us. No need to add to our endpoints manually,
// that is done for us in the server start method.
if err := obj.connect(); err != nil {
obj.err(errwrap.Wrapf(err, "error during client connect"))
return
}
obj.client = client.NewClientFromClient(obj.etcd)
obj.client.Debug = obj.Debug
obj.client.Logf = func(format string, v ...interface{}) {
obj.Logf("client: "+format, v...)
}
if err := obj.client.Init(); err != nil {
obj.err(errwrap.Wrapf(err, "error during client init"))
return
}
// Build a session for making leases that expire on disconnect!
options := []concurrency.SessionOption{
concurrency.WithTTL(sessionTTLSec(SessionTTL)),
}
if obj.leaseID > 0 { // in the odd chance we ever do reconnects
options = append(options, concurrency.WithLease(obj.leaseID))
}
obj.session, err = concurrency.NewSession(obj.etcd, options...)
if err != nil {
obj.err(errwrap.Wrapf(err, "could not create session"))
return
}
obj.leaseID = obj.session.Lease()
obj.Logf("connected!")
if !bootstrapping { // new clients need an initial state sync...
if err := obj.memberStateFromList(startupCtx); err != nil {
obj.err(errwrap.Wrapf(err, "error during initial state sync"))
return
}
}
close(obj.connectSignal)
}()
defer func() {
if obj.session != nil {
obj.session.Close() // this revokes the lease...
}
// run cleanup functions in reverse order
for i := len(obj.clients) - 1; i >= 0; i-- {
obj.clients[i].Close() // ignore errs
}
if obj.client != nil { // in case we bailed out early
obj.client.Close() // ignore err, but contains wg.Wait()
}
if obj.etcd == nil { // in case we bailed out early
return
}
obj.disconnect()
obj.Logf("disconnected!")
//close(obj.disconnectSignal)
}()
obj.Logf("watching chooser...")
chooserChan := make(chan error)
obj.activeExit2 = true // activate errExit2
obj.activateExit2.Ack()
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(obj.errExit2) // multi-signal for errChan close op
if obj.chooser == nil {
return
}
// wait till we're connected
select {
case <-obj.connectSignal:
case <-exitCtx.Done():
return // run exited early
}
p := obj.NS + ChooserPath
c, err := obj.MakeClientFromNamespace(p)
if err != nil {
obj.err(errwrap.Wrapf(err, "error during chooser init"))
return
}
if err := obj.chooser.Connect(exitCtx, c); err != nil {
obj.err(errwrap.Wrapf(err, "error during chooser connect"))
return
}
ch, err := obj.chooser.Watch()
if err != nil {
obj.err(errwrap.Wrapf(err, "error running chooser watch"))
return
}
chooserChan = ch // watch it
}()
defer func() {
if obj.chooser == nil {
return
}
obj.chooser.Disconnect() // ignore error if any
}()
// call this once to start the server so we'll be able to connect
if bootstrapping {
obj.Logf("bootstrapping...")
obj.volunteers[obj.Hostname] = surls // bootstrap this!
obj.nominated[obj.Hostname] = surls
// alternatively we can bootstrap like this if we add more stuff...
//data := bootstrapWatcherData(obj.Hostname, surls) // server urls
//if err := obj.nominateApply(data); err != nil { // fake apply
// return err
//}
// server starts inside here if bootstrapping!
if err := obj.nominateCb(startupCtx); err != nil {
// If while bootstrapping a new server, an existing one
// is running on the same port, then we error this here.
return err
}
// wait till we're connected
select {
case <-obj.connectSignal:
case <-exitCtx.Done():
// TODO: should this return an error?
return nil // run exited early
}
// advertise our new endpoint (comes paired after nominateCb)
if err := obj.advertise(startupCtx, obj.Hostname, curls); err != nil { // endpoints
return errwrap.Wrapf(err, "error with endpoints advertise")
}
// run to add entry into our public nominated datastructure
// FIXME: this might be redundant, but probably not harmful in
// our bootstrapping process... it will get done in volunteerCb
if err := obj.nominate(startupCtx, obj.Hostname, surls); err != nil {
return errwrap.Wrapf(err, "error nominating self")
}
}
// If obj.NoServer, then we don't need to start up the nominate watcher,
// unless we're the first server... But we check that both are not true!
if bootstrapping || canServer {
if !bootstrapping && canServer { // wait for client!
select {
case <-obj.connectSignal:
case <-exitCtx.Done():
return nil // just exit
}
}
ctx, cancel := context.WithCancel(unblockCtx)
defer cancel()
info, err := obj.client.ComplexWatcher(ctx, obj.NS+NominatedPath, etcd.WithPrefix())
if err != nil {
obj.activateExit3.Ack()
return errwrap.Wrapf(err, "error adding nominated watcher")
}
obj.Logf("watching nominees...")
obj.activeExit3 = true // activate errExit3
obj.activateExit3.Ack()
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(obj.errExit3) // multi-signal for errChan close op
defer cancel()
for {
var event *interfaces.WatcherData
var ok bool
select {
case event, ok = <-info.Events:
if !ok {
return
}
}
if err := event.Err; err != nil {
obj.err(errwrap.Wrapf(err, "nominated watcher errored"))
continue
}
// on the initial created event, we populate...
if !bootstrapping && event.Created && len(event.Events) == 0 {
obj.Logf("populating nominated list...")
nominated, err := obj.getNominated(ctx)
if err != nil {
obj.err(errwrap.Wrapf(err, "get nominated errored"))
continue
}
obj.nominated = nominated
} else if err := obj.nominateApply(event); err != nil {
obj.err(errwrap.Wrapf(err, "nominate apply errored"))
continue
}
// decide the desired state before we change it
doStop := obj.serverAction(serverActionStop)
doStart := obj.serverAction(serverActionStart)
// server is running, but it should not be
if doStop { // stop?
// first, un advertise client urls
// TODO: should this cause destroy server instead? does it already?
if err := obj.advertise(ctx, obj.Hostname, nil); err != nil { // remove me
obj.err(errwrap.Wrapf(err, "error with endpoints unadvertise"))
continue
}
}
// runServer gets started in a goroutine here...
err := obj.nominateCb(ctx)
if obj.Debug {
obj.Logf("nominateCb: %+v", err)
}
if doStart { // start?
if err := obj.advertise(ctx, obj.Hostname, curls); err != nil { // add one
obj.err(errwrap.Wrapf(err, "error with endpoints advertise"))
continue
}
}
if err == interfaces.ErrShutdown {
if obj.Debug {
obj.Logf("nominated watcher shutdown")
}
return
}
if err == nil {
continue
}
obj.err(errwrap.Wrapf(err, "nominated watcher callback errored"))
continue
}
}()
defer func() {
// wait for unnominate of self to be seen...
select {
case <-obj.errExit3:
case <-obj.hardexit.Signal(): // bork unblocker
obj.Logf("unblocked unnominate signal")
// now unblock the server in case it's running!
if err := obj.destroyServer(); err != nil { // sync until exited
obj.err(errwrap.Wrapf(err, "destroyServer errored"))
return
}
}
}()
defer func() {
// wait for volunteer loop to exit
select {
case <-obj.errExit4:
}
}()
}
obj.activateExit3.Ack()
// volunteering code (volunteer callback and initial volunteering)
if !obj.NoServer && len(obj.ServerURLs) > 0 {
ctx, cancel := context.WithCancel(unblockCtx)
defer cancel() // cleanup on close...
info, err := obj.client.ComplexWatcher(ctx, obj.NS+VolunteerPath, etcd.WithPrefix())
if err != nil {
obj.activateExit4.Ack()
return errwrap.Wrapf(err, "error adding volunteer watcher")
}
unvolunteered := make(chan struct{})
obj.Logf("watching volunteers...")
obj.wg.Add(1)
obj.activeExit4 = true // activate errExit4
obj.activateExit4.Ack()
go func() {
defer obj.wg.Done()
defer close(obj.errExit4) // multi-signal for errChan close op
for {
var event *interfaces.WatcherData
var ok bool
select {
case event, ok = <-info.Events:
if !ok {
return
}
if err := event.Err; err != nil {
obj.err(errwrap.Wrapf(err, "volunteer watcher errored"))
continue
}
case chooserEvent, ok := <-chooserChan:
if !ok {
obj.Logf("got chooser shutdown...")
chooserChan = nil // done here!
continue
}
if chooserEvent != nil {
obj.err(errwrap.Wrapf(err, "chooser watcher errored"))
continue
}
obj.Logf("got chooser event...")
event = nil // pass through the apply...
// chooser events should poke volunteerCb
}
_, exists1 := obj.volunteers[obj.Hostname] // before
// on the initial created event, we populate...
if !bootstrapping && event != nil && event.Created && len(event.Events) == 0 {
obj.Logf("populating volunteers list...")
volunteers, err := obj.getVolunteers(ctx)
if err != nil {
obj.err(errwrap.Wrapf(err, "get volunteers errored"))
continue
}
// TODO: do we need to add ourself?
//_, exists := volunteers[obj.Hostname]
//if !exists {
// volunteers[obj.Hostname] = surls
//}
obj.volunteers = volunteers
} else if err := obj.volunteerApply(event); event != nil && err != nil {
obj.err(errwrap.Wrapf(err, "volunteer apply errored"))
continue
}
_, exists2 := obj.volunteers[obj.Hostname] // after
err := obj.volunteerCb(ctx)
if err == nil {
// it was there, and it got removed
if exists1 && !exists2 {
close(unvolunteered)
}
continue
}
obj.err(errwrap.Wrapf(err, "volunteer watcher callback errored"))
continue
}
}()
defer func() {
// wait for unvolunteer of self to be seen...
select {
case <-unvolunteered:
case <-obj.hardexit.Signal(): // bork unblocker
obj.Logf("unblocked unvolunteer signal")
}
}()
// self volunteer
obj.Logf("volunteering...")
surls, err := obj.surls()
if err != nil {
return err
}
if err := obj.volunteer(ctx, surls); err != nil {
return err
}
defer obj.volunteer(ctx, nil) // unvolunteer
defer obj.Logf("unvolunteering...")
defer func() {
// Move the leader if I'm it, so that the member remove
// chooser operation happens on a different member than
// myself. A leaving member should not decide its fate.
member, err := obj.moveLeaderSomewhere(ctx)
if err != nil {
// TODO: use obj.err ?
obj.Logf("move leader failed with: %+v", err)
return
}
if member != "" {
obj.Logf("moved leader to: %s", member)
}
}()
}
obj.activateExit4.Ack()
// startup endpoints watcher (to learn about other servers)
if !obj.NoMagic { // disable possibly buggy code for now
ctx, cancel := context.WithCancel(unblockCtx)
defer cancel() // cleanup on close...
if err := obj.runEndpoints(ctx); err != nil {
obj.activateExit5.Ack()
return err
}
}
obj.activateExit5.Ack()
// We don't set state, we only watch others, so nothing to defer close!
if obj.Converger != nil {
obj.Converger.AddStateFn(ConvergerHostnameNamespace, func(converged bool) error {
// send our individual state into etcd for others to see
// TODO: what should happen on error?
return obj.setHostnameConverged(exitCtx, obj.Hostname, converged)
})
defer obj.Converger.RemoveStateFn(ConvergerHostnameNamespace)
}
// NOTE: Add anything else we want to start up here...
// If we get all the way down here, *and* we're connected, we're ready!
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
select {
case <-obj.connectSignal:
close(obj.readySignal) // we're ready to be used now...
case <-exitCtx.Done():
}
}()
select {
case <-exitCtx.Done():
}
obj.closing = true // flag to let nominateCb know we're shutting down...
// kick off all the defer()'s....
return reterr
}
// runEndpoints is a helper function to move all of this code into a new block.
func (obj *EmbdEtcd) runEndpoints(ctx context.Context) error {
bootstrapping := len(obj.Seeds) == 0
select {
case <-obj.connectSignal:
case <-ctx.Done():
return nil // TODO: just exit ?
}
info, err := obj.client.ComplexWatcher(ctx, obj.NS+EndpointsPath, etcd.WithPrefix())
if err != nil {
obj.activateExit5.Ack()
return errwrap.Wrapf(err, "error adding endpoints watcher")
}
obj.Logf("watching endpoints...")
obj.wg.Add(1)
obj.activeExit5 = true // activate errExit5
obj.activateExit5.Ack()
go func() {
defer obj.wg.Done()
defer close(obj.errExit5) // multi-signal for errChan close op
for {
var event *interfaces.WatcherData
var ok bool
select {
case event, ok = <-info.Events:
if !ok {
return
}
if err := event.Err; err != nil {
obj.err(errwrap.Wrapf(err, "endpoints watcher errored"))
continue
}
}
// on the initial created event, we populate...
if !bootstrapping && event.Created && len(event.Events) == 0 {
obj.Logf("populating endpoints list...")
endpoints, err := obj.getEndpoints(ctx)
if err != nil {
obj.err(errwrap.Wrapf(err, "get endpoints errored"))
continue
}
// If we're using an external etcd server, then
// we would not initially see any endpoints, and
// we'd erase our knowledge of our single
// endpoint, which would deadlock our etcd
// client. Instead, skip updates that take our
// list down to zero endpoints. We maintain this
// watch in case someone (an mgmt resource that
// is managing the etcd cluster for example?)
// wants to set these values for all of the mgmt
// clients connected to the etcd server pool.
if obj.NoServer && len(endpoints) == 0 && len(obj.endpoints) > 0 {
if obj.Debug {
obj.Logf("had %d endpoints: %+v", len(obj.endpoints), obj.endpoints)
obj.Logf("got %d endpoints: %+v", len(endpoints), endpoints)
}
obj.Logf("skipping endpoints update")
continue
}
obj.endpoints = endpoints
obj.setEndpoints()
} else if err := obj.endpointApply(event); err != nil {
obj.err(errwrap.Wrapf(err, "endpoint apply errored"))
continue
}
// there is no endpoint callback necessary
// TODO: do we need this member state sync?
if err := obj.memberStateFromList(ctx); err != nil {
obj.err(errwrap.Wrapf(err, "error during endpoint state sync"))
continue
}
}
}()
return nil
}
// Destroy cleans up the entire embedded etcd system. Use DestroyServer if you
// only want to shutdown the embedded server portion.
func (obj *EmbdEtcd) Destroy() error {
obj.Logf("destroy...")
obj.exit.Done(nil) // trigger an exit in Run!
reterr := obj.exit.Error() // wait for exit signal (block until arrival)
obj.wg.Wait()
return reterr
}
// Interrupt causes this member to force shutdown. It does not safely wait for
// an ordered shutdown. It is not recommended to use this unless you're borked.
func (obj *EmbdEtcd) Interrupt() error {
obj.Logf("interrupt...")
wg := &sync.WaitGroup{}
var err error
wg.Add(1)
go func() {
defer wg.Done()
err = obj.Destroy() // set return error
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-obj.exit.Signal(): // wait for Destroy to run first
}
obj.hardexit.Done(nil) // trigger a hard exit
}()
wg.Wait()
return err
}
// Ready returns a channel that closes when we're up and running. This process
// happens when calling Run. If Run is never called, this will never happen. Our
// main startup must be running, and our client must be connected to get here.
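//
// For example, a caller might wait on it like this (a sketch, assuming Run was
// already started in another goroutine):
//
//	select {
//	case <-obj.Ready():  // connected; safe to call MakeClient, etc.
//	case <-obj.Exited(): // Run exited early
//	}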
func (obj *EmbdEtcd) Ready() <-chan struct{} { return obj.readySignal }
// Exited returns a channel that closes when we've destroyed. This process
// happens after Run exits. If Run is never called, this will never happen.
func (obj *EmbdEtcd) Exited() <-chan struct{} { return obj.exitsSignal }
// config returns the config struct to be used during the etcd client connect.
func (obj *EmbdEtcd) config() etcd.Config {
// FIXME: filter out any urls which wouldn't resolve ?
endpoints := etcdUtil.FromURLsMapToStringList(obj.endpoints) // flatten map
// We don't need to do any sort of priority sort here, since for initial
// connect we'd be the only one, so it doesn't matter, and subsequent
// changes are made with SetEndpoints, not here, so we never need to
// prioritize our local endpoint.
sort.Strings(endpoints) // sort for determinism
if len(endpoints) == 0 { // initially, we need to use the defaults...
for _, u := range obj.Seeds {
endpoints = append(endpoints, u.String())
}
}
// XXX: connect to our local obj.ClientURLs instead of obj.AClientURLs ?
cfg := etcd.Config{
Endpoints: endpoints, // eg: []string{"http://254.0.0.1:12345"}
// RetryDialer chooses the next endpoint to use, and comes with
// a default dialer if unspecified.
DialTimeout: ClientDialTimeout,
// I think the keepalive stuff is needed for endpoint health.
DialKeepAliveTime: ClientDialKeepAliveTime,
DialKeepAliveTimeout: ClientDialKeepAliveTimeout,
// 0 disables auto-sync. By default auto-sync is disabled.
AutoSyncInterval: 0, // we do something equivalent ourselves
}
return cfg
}
// connect connects the client to a server. If we are the first peer, then that
// server is itself.
func (obj *EmbdEtcd) connect() error {
obj.Logf("connect...")
if obj.etcd != nil {
return fmt.Errorf("already connected")
}
cfg := obj.config() // get config
var err error
obj.etcd, err = etcd.New(cfg) // connect!
return err
}
// disconnect closes the etcd connection.
func (obj *EmbdEtcd) disconnect() error {
obj.Logf("disconnect...")
if obj.etcd == nil {
return fmt.Errorf("already disconnected")
}
return obj.etcd.Close()
}
// MakeClient returns an etcd Client interface that is suitable for basic tasks.
// Don't run this until the Ready method has acknowledged.
func (obj *EmbdEtcd) MakeClient() (interfaces.Client, error) {
c := client.NewClientFromClient(obj.etcd)
if err := c.Init(); err != nil {
return nil, err
}
obj.clients = append(obj.clients, c) // make sure to clean up after...
return c, nil
}
// MakeClientFromNamespace returns an etcd Client interface that is suitable
// for basic tasks and that has a key namespace prefix. Don't run this until
// the Ready method has acknowledged.
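//
// For example, this is how the chooser client gets built in Run:
//
//	c, err := obj.MakeClientFromNamespace(obj.NS + ChooserPath)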
func (obj *EmbdEtcd) MakeClientFromNamespace(ns string) (interfaces.Client, error) {
kv := namespace.NewKV(obj.etcd.KV, ns)
w := namespace.NewWatcher(obj.etcd.Watcher, ns)
c := client.NewClientFromNamespace(obj.etcd, kv, w)
if err := c.Init(); err != nil {
return nil, err
}
obj.clients = append(obj.clients, c) // make sure to clean up after...
return c, nil
}