// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

// TODO: Add TTL's (eg: volunteering)
// TODO: Remove race around leader operations
// TODO: Fix server reuse issue (bind: address already in use)
// TODO: Fix unstarted member
// TODO: Fix excessive StartLoop/FinishLoop
// TODO: Add VIP for servers (incorporate with net resource)
// TODO: Auto assign ports/ip's for peers (if possible)
// TODO: Fix godoc

// Package etcd implements the distributed key value store integration.
// This also takes care of managing and clustering the embedded etcd server.
// The elastic etcd algorithm works in the following way:
// * When you start up mgmt, you can pass it a list of seeds.
// * If no seeds are given, then assume you are the first server and startup.
// * If a seed is given, connect as a client, and optionally volunteer to be a server.
// * All volunteering clients should listen for a message from the master for nomination.
// * If a client has been nominated, it should startup a server.
// * All servers should listen for their nomination to be removed and shutdown if so.
// * The elected leader should decide who to nominate/unnominate to keep the right number of servers.
//
// Smoke testing:
// mkdir /tmp/mgmt{A..E}
// ./mgmt run --yaml examples/etcd1a.yaml --hostname h1 --tmp-prefix --no-pgp
// ./mgmt run --yaml examples/etcd1b.yaml --hostname h2 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382
// ./mgmt run --yaml examples/etcd1c.yaml --hostname h3 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3
// ./mgmt run --yaml examples/etcd1d.yaml --hostname h4 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386
// ./mgmt run --yaml examples/etcd1e.yaml --hostname h5 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2387 --server-urls http://127.0.0.1:2388
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
package etcd

import (
	"bytes"
	"errors"
	"fmt"
	"log"
	"math"
	"net/url"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/purpleidea/mgmt/converger"
	"github.com/purpleidea/mgmt/event"
	"github.com/purpleidea/mgmt/util"

	etcd "github.com/coreos/etcd/clientv3" // "clientv3"
	"github.com/coreos/etcd/embed"
	"github.com/coreos/etcd/etcdserver"
	rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	etcdtypes "github.com/coreos/etcd/pkg/types"
	raft "github.com/coreos/etcd/raft"
	context "golang.org/x/net/context"
	"google.golang.org/grpc"
)

// constant parameters which may need to be tweaked or customized
const (
	NS                      = "_mgmt" // root namespace for mgmt operations
	seedSentinel            = "_seed" // you must not name your hostname this
	MaxStartServerTimeout   = 60      // max number of seconds to wait for server to start
	MaxStartServerRetries   = 3       // number of times to retry starting the etcd server
	maxClientConnectRetries = 5       // number of times to retry consecutive connect failures
	selfRemoveTimeout       = 3       // give unnominated members a chance to self exit
	exitDelay               = 3       // number of sec of inactivity after exit to clean up
	DefaultIdealClusterSize = 5       // default ideal cluster size target for initial seed
	DefaultClientURL        = "127.0.0.1:2379"
	DefaultServerURL        = "127.0.0.1:2380"
)

var (
	errApplyDeltaEventsInconsistent = errors.New("inconsistent key in ApplyDeltaEvents")
)

// AW is a struct for the AddWatcher queue.
type AW struct {
	path       string
	opts       []etcd.OpOption
	callback   func(*RE) error
	errCheck   bool
	skipConv   bool // ask event to skip converger updates
	resp       event.Resp
	cancelFunc func() // data
}

// RE is a response + error struct since these two values often occur together.
// This is now called an event with the move to the etcd v3 API.
type RE struct {
	response  etcd.WatchResponse
	path      string
	err       error
	callback  func(*RE) error
	errCheck  bool // should we check the error of the callback?
	skipConv  bool // event skips converger updates
	retryHint bool // set to true for one event after a watcher failure
	retries   uint // number of times we've retried on error
}

// KV is a key + value struct to hold the two items together.
type KV struct {
	key   string
	value string
	opts  []etcd.OpOption
	resp  event.Resp
}

// GQ is a struct for the get queue.
type GQ struct {
	path     string
	skipConv bool
	opts     []etcd.OpOption
	resp     event.Resp
	data     map[string]string
}

// DL is a struct for the delete queue.
type DL struct {
	path string
	opts []etcd.OpOption
	resp event.Resp
	data int64
}

// TN is a struct for the txn queue.
type TN struct {
	ifcmps  []etcd.Cmp
	thenops []etcd.Op
	elseops []etcd.Op
	resp    event.Resp
	data    *etcd.TxnResponse
}

// Flags are some constant flags which are used throughout the program.
type Flags struct {
	Debug   bool // add additional log messages
	Trace   bool // add execution flow log messages
	Verbose bool // add extra log message output
}

// EmbdEtcd provides the embedded server and client etcd functionality.
type EmbdEtcd struct { // EMBeddeD etcd
	// etcd client connection related
	cLock  sync.Mutex   // client connect lock
	rLock  sync.RWMutex // client reconnect lock
	client *etcd.Client
	cError error // permanent client error
	ctxErr error // permanent ctx error

	// exit and cleanup related
	cancelLock  sync.Mutex // lock for the cancels list
	cancels     []func()   // array of every cancel function for watches
	exiting     bool
	exitchan    chan struct{}
	exitTimeout <-chan time.Time

	hostname   string
	memberID   uint64            // cluster membership id of server if running
	endpoints  etcdtypes.URLsMap // map of servers a client could connect to
	clientURLs etcdtypes.URLs    // locations to listen for clients if i am a server
	serverURLs etcdtypes.URLs    // locations to listen for servers if i am a server (peer)
	noServer   bool              // disable all server peering if true

	// local tracked state
	nominated        etcdtypes.URLsMap // copy of who's nominated to locally track state
	lastRevision     int64             // the revision id of message being processed
	idealClusterSize uint16            // ideal cluster size

	// etcd channels
	awq     chan *AW // add watch queue
	wevents chan *RE // response+error
	setq    chan *KV // set queue
	getq    chan *GQ // get queue
	delq    chan *DL // delete queue
	txnq    chan *TN // txn queue

	flags     Flags
	prefix    string              // folder prefix to use for misc storage
	converger converger.Converger // converged tracking

	// etcd server related
	serverwg    sync.WaitGroup // wait for server to shutdown
	server      *embed.Etcd    // technically this contains the server struct
	dataDir     string         // our data dir, prefix + "etcd"
	serverReady chan struct{}  // closes when ready
}

// NewEmbdEtcd creates the top level embedded etcd struct client and server obj.
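//
// A rough usage sketch (added for illustration; the surrounding variables are
// placeholders, not part of the original docs):
//
//	obj := NewEmbdEtcd(hostname, seeds, clientURLs, serverURLs, false, DefaultIdealClusterSize, Flags{}, prefix, converger)
//	if obj == nil {
//		// the hostname was invalid (it matched the seed sentinel)
//	}
//	if err := obj.Startup(); err != nil {
//		// handle the startup error
//	}
//	defer obj.Destroy()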
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd {
	endpoints := make(etcdtypes.URLsMap)
	if hostname == seedSentinel { // safety
		return nil
	}
	if len(seeds) > 0 {
		endpoints[seedSentinel] = seeds
		idealClusterSize = 0 // unset, get from running cluster
	}
	obj := &EmbdEtcd{
		exitchan:    make(chan struct{}), // exit signal for main loop
		exitTimeout: nil,
		awq:         make(chan *AW),
		wevents:     make(chan *RE),
		setq:        make(chan *KV),
		getq:        make(chan *GQ),
		delq:        make(chan *DL),
		txnq:        make(chan *TN),

		nominated: make(etcdtypes.URLsMap),

		hostname:   hostname,
		endpoints:  endpoints,
		clientURLs: clientURLs,
		serverURLs: serverURLs,
		noServer:   noServer,

		idealClusterSize: idealClusterSize,
		converger:        converger,
		flags:            flags,
		prefix:           prefix,
		dataDir:          path.Join(prefix, "etcd"),
		serverReady:      make(chan struct{}),
	}
	// TODO: add some sort of auto assign method for picking these defaults
	// add a default so that our local client can connect locally if needed
	if len(obj.LocalhostClientURLs()) == 0 { // if we don't have any localhost URLs
		u := url.URL{Scheme: "http", Host: DefaultClientURL} // default
		obj.clientURLs = append([]url.URL{u}, obj.clientURLs...) // prepend
	}

	// add a default for local use and testing, harmless and useful!
	if !obj.noServer && len(obj.serverURLs) == 0 {
		if len(obj.endpoints) > 0 {
			obj.noServer = true // we didn't have enough to be a server
		}
		u := url.URL{Scheme: "http", Host: DefaultServerURL} // default
		obj.serverURLs = []url.URL{u}
	}

	return obj
}

// GetConfig returns the config struct to be used for the etcd client connect.
func (obj *EmbdEtcd) GetConfig() etcd.Config {
	endpoints := []string{}
	// XXX: filter out any urls which wouldn't resolve here ?
	for _, eps := range obj.endpoints { // flatten map
		for _, u := range eps {
			endpoints = append(endpoints, u.Host) // remove http:// prefix
		}
	}
	sort.Strings(endpoints) // sort for determinism
	cfg := etcd.Config{
		Endpoints: endpoints,
		// RetryDialer chooses the next endpoint to use
		// it comes with a default dialer if unspecified
		DialTimeout: 5 * time.Second,
	}
	return cfg
}

// Connect connects the client to a server, and then builds the *API structs.
// If reconnect is true, it will force a reconnect with new config endpoints.
func (obj *EmbdEtcd) Connect(reconnect bool) error {
	if obj.flags.Debug {
		log.Println("Etcd: Connect...")
	}
	obj.cLock.Lock()
	defer obj.cLock.Unlock()
	if obj.cError != nil { // stop on permanent error
		return obj.cError
	}
	if obj.client != nil { // memoize
		if reconnect {
			// i think this requires the rLock when using it concurrently
			err := obj.client.Close()
			if err != nil {
				log.Printf("Etcd: (Re)Connect: Close: Error: %+v", err)
			}
			obj.client = nil // for kicks
		} else {
			return nil
		}
	}
	var emax uint16 // = 0
	for { // loop until connect
		var err error
		cfg := obj.GetConfig()
		if eps := obj.endpoints; len(eps) > 0 {
			log.Printf("Etcd: Connect: Endpoints: %v", eps)
		} else {
			log.Printf("Etcd: Connect: Endpoints: []")
		}
		obj.client, err = etcd.New(cfg) // connect!
		if err == etcd.ErrNoAvailableEndpoints {
			emax++
			if emax > maxClientConnectRetries {
				log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir)
				log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean")
				obj.cError = fmt.Errorf("can't find an available endpoint")
				return obj.cError
			}
			err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff...
		}
		if err != nil {
			log.Printf("Etcd: Connect: CtxError...")
			if _, e := obj.CtxError(context.TODO(), err); e != nil {
				log.Printf("Etcd: Connect: CtxError: Fatal: %v", e)
				obj.cError = e
				return e // fatal error
			}
			continue
		}
		// check if we're actually connected here, because this must
		// block if we're not connected
		if obj.client == nil {
			log.Printf("Etcd: Connect: Is nil!")
			continue
		}
		break
	}
	return nil
}

// Startup is the main entry point to kick off the embedded etcd client & server.
func (obj *EmbdEtcd) Startup() error {
	bootstrapping := len(obj.endpoints) == 0 // because value changes after start

	// connect but don't block here, because servers might not be up yet...
	go func() {
		if err := obj.Connect(false); err != nil {
			log.Printf("Etcd: Startup: Error: %v", err)
			// XXX: Now cause Startup() to exit with error somehow!
		}
	}()

	go obj.CbLoop() // start callback loop
	go obj.Loop()   // start main loop

	// TODO: implement native etcd watcher method on member API changes
	path := fmt.Sprintf("/%s/nominated/", NS)
	go obj.AddWatcher(path, obj.nominateCallback, true, false, etcd.WithPrefix()) // no block

	// setup ideal cluster size watcher
	key := fmt.Sprintf("/%s/idealClusterSize", NS)
	go obj.AddWatcher(key, obj.idealClusterSizeCallback, true, false) // no block

	// if we have no endpoints, it means we are bootstrapping...
	if !bootstrapping {
		log.Println("Etcd: Startup: Getting initial values...")
		if nominated, err := Nominated(obj); err == nil {
			obj.nominated = nominated // store a local copy
		} else {
			log.Printf("Etcd: Startup: Nominate lookup error.")
			obj.Destroy()
			return fmt.Errorf("Etcd: Startup: Error: %v", err)
		}

		// get initial ideal cluster size
		if idealClusterSize, err := GetClusterSize(obj); err == nil {
			obj.idealClusterSize = idealClusterSize
			log.Printf("Etcd: Startup: Ideal cluster size is: %d", idealClusterSize)
		} else {
			// perhaps the first server didn't set it yet. it's ok,
			// we can get it from the watcher if it ever gets set!
			log.Printf("Etcd: Startup: Ideal cluster size lookup error.")
		}
	}

	if !obj.noServer {
		path := fmt.Sprintf("/%s/volunteers/", NS)
		go obj.AddWatcher(path, obj.volunteerCallback, true, false, etcd.WithPrefix()) // no block
	}

	// if i am alone and will have to be a server...
	if !obj.noServer && bootstrapping {
		log.Printf("Etcd: Bootstrapping...")
		// give an initial value to the obj.nominated map we keep in sync
		// this emulates Nominate(obj, obj.hostname, obj.serverURLs)
		obj.nominated[obj.hostname] = obj.serverURLs // initial value
		// NOTE: when we are stuck waiting for the server to start up,
		// it is probably happening on this call right here...
		obj.nominateCallback(nil) // kick this off once
	}

	// self volunteer
	if !obj.noServer && len(obj.serverURLs) > 0 {
		// we run this in a go routine because it blocks waiting for server
		log.Printf("Etcd: Startup: Volunteering...")
		go Volunteer(obj, obj.serverURLs)
	}

	if bootstrapping {
		if err := SetClusterSize(obj, obj.idealClusterSize); err != nil {
			log.Printf("Etcd: Startup: Ideal cluster size storage error.")
			obj.Destroy()
			return fmt.Errorf("Etcd: Startup: Error: %v", err)
		}
	}

	go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, false, etcd.WithPrefix())

	if err := obj.Connect(false); err != nil { // don't exit from this Startup function until connected!
		return err
	}
	return nil
}

// Destroy cleans up the entire embedded etcd system. Use DestroyServer if you
// only want to shutdown the embedded server portion.
func (obj *EmbdEtcd) Destroy() error {

	// this should also trigger an unnominate, which should cause a shutdown
	log.Printf("Etcd: Destroy: Unvolunteering...")
	if err := Volunteer(obj, nil); err != nil { // unvolunteer so we can shutdown...
		log.Printf("Etcd: Destroy: Error: %v", err) // we have a problem
	}

	obj.serverwg.Wait() // wait for server shutdown signal

	obj.exiting = true // must happen before we run the cancel functions!

	// clean up any watchers which might want to continue
	obj.cancelLock.Lock() // TODO: do we really need the lock here on exit?
	log.Printf("Etcd: Destroy: Cancelling %d operations...", len(obj.cancels))
	for _, cancelFunc := range obj.cancels {
		cancelFunc()
	}
	obj.cancelLock.Unlock()

	obj.exitchan <- struct{}{} // cause main loop to exit

	obj.rLock.Lock()
	if obj.client != nil {
		obj.client.Close()
	}
	obj.client = nil
	obj.rLock.Unlock()

	// this happens in response to the unnominate callback. not needed here!
	//if obj.server != nil {
	//	return obj.DestroyServer()
	//}
	return nil
}

// CtxDelayErr requests a retry in Delta duration.
type CtxDelayErr struct {
	Delta   time.Duration
	Message string
}
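
// Error satisfies the error interface for CtxDelayErr.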
func (obj *CtxDelayErr) Error() string {
	return fmt.Sprintf("CtxDelayErr(%v): %s", obj.Delta, obj.Message)
}

// CtxRetriesErr lets you retry as long as you have retries available.
// TODO: consider combining this with CtxDelayErr
type CtxRetriesErr struct {
	Retries uint
	Message string
}
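
// Error satisfies the error interface for CtxRetriesErr.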
func (obj *CtxRetriesErr) Error() string {
	return fmt.Sprintf("CtxRetriesErr(%v): %s", obj.Retries, obj.Message)
}

// CtxPermanentErr is a permanent failure error to notify about borkage.
type CtxPermanentErr struct {
	Message string
}
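
// Error satisfies the error interface for CtxPermanentErr.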
func (obj *CtxPermanentErr) Error() string {
	return fmt.Sprintf("CtxPermanentErr: %s", obj.Message)
}

// CtxReconnectErr requests a client reconnect to the new endpoint list.
type CtxReconnectErr struct {
	Message string
}
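
// Error satisfies the error interface for CtxReconnectErr.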
func (obj *CtxReconnectErr) Error() string {
	return fmt.Sprintf("CtxReconnectErr: %s", obj.Message)
}

// CancelCtx adds a tracked cancel function around an existing context.
func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func()) {
	cancelCtx, cancelFunc := context.WithCancel(ctx)
	obj.cancelLock.Lock()
	obj.cancels = append(obj.cancels, cancelFunc) // not thread-safe, needs lock
	obj.cancelLock.Unlock()
	return cancelCtx, cancelFunc
}

// TimeoutCtx adds a tracked cancel function with timeout around an existing context.
func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func()) {
	timeoutCtx, cancelFunc := context.WithTimeout(ctx, t)
	obj.cancelLock.Lock()
	obj.cancels = append(obj.cancels, cancelFunc) // not thread-safe, needs lock
	obj.cancelLock.Unlock()
	return timeoutCtx, cancelFunc
}

// CtxError is called whenever there is a connection or other client problem
// that needs to be resolved before we can continue, eg: connection disconnected,
// change of server to connect to, etc... It modifies the context if needed.
func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, error) {
	if obj.ctxErr != nil { // stop on permanent error
		return ctx, obj.ctxErr
	}
	type ctxKey string // use a non-basic type as ctx key (str can conflict)
	const ctxErr ctxKey = "ctxErr"
	const ctxIter ctxKey = "ctxIter"
	expBackoff := func(tmin, texp, iter, tmax int) time.Duration {
		// https://en.wikipedia.org/wiki/Exponential_backoff
		// tmin <= texp^iter - 1 <= tmax // TODO: check my math
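		// For example, assuming tmin=500, texp=2 and tmax=16000 (the ms
		// values suggested in the TODO near the ErrNoAvailableEndpoints
		// case below), iter=0 gives 500ms, iter=10 gives 1023ms, and
		// any iter >= 14 is capped at 16000ms.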
		return time.Duration(math.Min(math.Max(math.Pow(float64(texp), float64(iter))-1.0, float64(tmin)), float64(tmax))) * time.Millisecond
	}
	var isTimeout = false
	var iter int // = 0
	if ctxerr, ok := ctx.Value(ctxErr).(error); ok {
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr)
		}
		if i, ok := ctx.Value(ctxIter).(int); ok {
			iter = i + 1 // load and increment
			if obj.flags.Debug {
				log.Printf("Etcd: CtxError: Iter: %v", iter)
			}
		}
		isTimeout = err == context.DeadlineExceeded
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: isTimeout: %v", isTimeout)
		}
		if !isTimeout {
			iter = 0 // reset timer
		}
		err = ctxerr // restore error
	} else if obj.flags.Debug {
		log.Printf("Etcd: CtxError: No value found")
	}
	ctxHelper := func(tmin, texp, tmax int) context.Context {
		t := expBackoff(tmin, texp, iter, tmax)
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: Timeout: %v", t)
		}

		ctxT, _ := obj.TimeoutCtx(ctx, t)
		ctxV := context.WithValue(ctxT, ctxIter, iter) // save iter
		ctxF := context.WithValue(ctxV, ctxErr, err)   // save err
		return ctxF
	}
	_ = ctxHelper // TODO

	isGrpc := func(e error) bool { // helper function
		return grpc.ErrorDesc(err) == e.Error()
	}

	if err == nil {
		log.Fatal("Etcd: CtxError: Error: Unexpected lack of error!")
	}
	if obj.exiting {
		obj.ctxErr = fmt.Errorf("exit in progress")
		return ctx, obj.ctxErr
	}

	// happens when we trigger the cancels during reconnect
	if err == context.Canceled {
		// TODO: do we want to create a fresh ctx here for all cancels?
		//ctx = context.Background()
		ctx, _ = obj.CancelCtx(ctx) // add a new one
		return ctx, nil // we should retry, reconnect probably happened
	}

	if delayErr, ok := err.(*CtxDelayErr); ok { // custom delay error
		log.Printf("Etcd: CtxError: Reason: %s", delayErr.Error())
		time.Sleep(delayErr.Delta) // sleep the amount of time requested
		return ctx, nil
	}

	if retriesErr, ok := err.(*CtxRetriesErr); ok { // custom retry error
		log.Printf("Etcd: CtxError: Reason: %s", retriesErr.Error())
		if retriesErr.Retries == 0 {
			obj.ctxErr = fmt.Errorf("no more retries due to CtxRetriesErr")
			return ctx, obj.ctxErr
		}
		return ctx, nil
	}

	if permanentErr, ok := err.(*CtxPermanentErr); ok { // custom permanent error
		obj.ctxErr = fmt.Errorf("error due to CtxPermanentErr: %s", permanentErr.Error())
		return ctx, obj.ctxErr // quit
	}

	if err == etcd.ErrNoAvailableEndpoints { // etcd server is probably starting up
		// TODO: tmin, texp, tmax := 500, 2, 16000 // ms, exp base, ms
		// TODO: return ctxHelper(tmin, texp, tmax), nil
		log.Printf("Etcd: CtxError: No endpoints available yet!")
		time.Sleep(500 * time.Millisecond) // a ctx timeout won't help!
		return ctx, nil // passthrough
	}

	// etcd server is apparently still starting up...
	if err == rpctypes.ErrNotCapable { // isGrpc(rpctypes.ErrNotCapable) also matches
		log.Printf("Etcd: CtxError: Server is starting up...")
		time.Sleep(500 * time.Millisecond) // a ctx timeout won't help!
		return ctx, nil // passthrough
	}

	if err == grpc.ErrClientConnTimeout { // sometimes caused by "too many colons" misconfiguration
		return ctx, fmt.Errorf("misconfiguration: %v", err) // permanent failure?
	}

	// this can happen if my client connection shuts down, but without any
	// available alternatives. in this case, rotate it off to someone else
	reconnectErr, isReconnectErr := err.(*CtxReconnectErr) // custom reconnect error
	switch {
	case isReconnectErr:
		log.Printf("Etcd: CtxError: Reason: %s", reconnectErr.Error())
		fallthrough
	case err == raft.ErrStopped: // TODO: does this ever happen?
		fallthrough
	case err == etcdserver.ErrStopped: // TODO: does this ever happen?
		fallthrough
	case isGrpc(raft.ErrStopped):
		fallthrough
	case isGrpc(etcdserver.ErrStopped):
		fallthrough
	case isGrpc(grpc.ErrClientConnClosing):

		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: Error(%T): %+v", err, err)
			log.Printf("Etcd: Endpoints are: %v", obj.client.Endpoints())
			log.Printf("Etcd: Client endpoints are: %v", obj.endpoints)
		}

		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: Locking...")
		}
		obj.rLock.Lock()
		// TODO: should this really be nested inside the other lock?
		obj.cancelLock.Lock()
		// we need to cancel any WIP connections like Txn()'s and so on
		// we run the cancel()'s that are stored up so they don't block
		log.Printf("Etcd: CtxError: Cancelling %d operations...", len(obj.cancels))
		for _, cancelFunc := range obj.cancels {
			cancelFunc()
		}
		obj.cancels = []func(){} // reset
		obj.cancelLock.Unlock()

		log.Printf("Etcd: CtxError: Reconnecting...")
		if err := obj.Connect(true); err != nil {
			defer obj.rLock.Unlock()
			obj.ctxErr = fmt.Errorf("permanent connect error: %v", err)
			return ctx, obj.ctxErr
		}
		if obj.flags.Debug {
			log.Printf("Etcd: CtxError: Unlocking...")
		}
		obj.rLock.Unlock()
		log.Printf("Etcd: CtxError: Reconnected!")
		return ctx, nil
	}

	// FIXME: we might be one of the members in a two member cluster that
	// had the other member crash.. hmmm bork?!
	if isGrpc(context.DeadlineExceeded) {
		log.Printf("Etcd: CtxError: DeadlineExceeded(%T): %+v", err, err) // TODO
	}

	if err == rpctypes.ErrDuplicateKey {
		log.Fatalf("Etcd: CtxError: Programming error: %+v", err)
	}

	// if you hit this code path here, please report the unmatched error!
	log.Printf("Etcd: CtxError: Unknown error(%T): %+v", err, err)
	time.Sleep(1 * time.Second)
	obj.ctxErr = fmt.Errorf("unknown CtxError")
	return ctx, obj.ctxErr
}

// CbLoop is the loop where callback execution is serialized.
func (obj *EmbdEtcd) CbLoop() {
	cuid := obj.converger.Register()
	cuid.SetName("Etcd: CbLoop")
	defer cuid.Unregister()
	if e := obj.Connect(false); e != nil {
		return // fatal
	}
	// we use this timer because when we ignore un-converge events and loop,
	// we reset the ConvergedTimer case statement, ruining the timeout math!
	cuid.StartTimer()
	for {
		ctx := context.Background() // TODO: inherit as input argument?
		select {
		// etcd watcher event
		case re := <-obj.wevents:
			if !re.skipConv { // if we want to count it...
				cuid.ResetTimer() // activity!
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: CbLoop: Event: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					//re.resp.NACK() // nope!
					break
				}
				if obj.flags.Trace {
					log.Printf("Trace: Etcd: CbLoop: rawCallback()")
				}
				err := rawCallback(ctx, re)
				if obj.flags.Trace {
					log.Printf("Trace: Etcd: CbLoop: rawCallback(): %v", err)
				}
				if err == nil {
					//re.resp.ACK() // success
					break
				}
				re.retries++ // increment error retry count
				if ctx, err = obj.CtxError(ctx, err); err != nil {
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: CbLoop: Event: FinishLoop")
			}

		// exit loop commit
		case <-obj.exitTimeout:
			log.Println("Etcd: Exiting callback loop!")
			cuid.StopTimer() // clean up nicely
			return
		}
	}
}

// Loop is the main loop where everything is serialized.
func (obj *EmbdEtcd) Loop() {
	cuid := obj.converger.Register()
	cuid.SetName("Etcd: Loop")
	defer cuid.Unregister()
	if e := obj.Connect(false); e != nil {
		return // fatal
	}
	cuid.StartTimer()
	for {
		ctx := context.Background() // TODO: inherit as input argument?
		// priority channel...
		select {
		case aw := <-obj.awq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: PriorityAW: StartLoop")
			}
			obj.loopProcessAW(ctx, aw)
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: PriorityAW: FinishLoop")
			}
			continue // loop to drain the priority channel first!
		default:
			// passthrough to normal channel
		}

		select {
		// add watcher
		case aw := <-obj.awq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: AW: StartLoop")
			}
			obj.loopProcessAW(ctx, aw)
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: AW: FinishLoop")
			}

		// set kv pair
		case kv := <-obj.setq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Set: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					kv.resp.NACK() // nope!
					break
				}
				err := obj.rawSet(ctx, kv)
				if err == nil {
					kv.resp.ACK() // success
					break
				}
				if ctx, err = obj.CtxError(ctx, err); err != nil { // try to reconnect, etc...
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Set: FinishLoop")
			}

		// get value
		case gq := <-obj.getq:
			if !gq.skipConv {
				cuid.ResetTimer() // activity!
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Get: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					gq.resp.NACK() // nope!
					break
				}
				data, err := obj.rawGet(ctx, gq)
				if err == nil {
					gq.data = data // update struct
					gq.resp.ACK()  // success
					break
				}
				if ctx, err = obj.CtxError(ctx, err); err != nil {
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Get: FinishLoop")
			}

		// delete value
		case dl := <-obj.delq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Delete: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					dl.resp.NACK() // nope!
					break
				}
				data, err := obj.rawDelete(ctx, dl)
				if err == nil {
					dl.data = data // update struct
					dl.resp.ACK()  // success
					break
				}
				if ctx, err = obj.CtxError(ctx, err); err != nil {
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Delete: FinishLoop")
			}

		// run txn
		case tn := <-obj.txnq:
			cuid.ResetTimer() // activity!
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Txn: StartLoop")
			}
			for {
				if obj.exiting { // the exit signal has been sent!
					tn.resp.NACK() // nope!
					break
				}
				data, err := obj.rawTxn(ctx, tn)
				if err == nil {
					tn.data = data // update struct
					tn.resp.ACK()  // success
					break
				}
				if ctx, err = obj.CtxError(ctx, err); err != nil {
					break // TODO: it's bad, break or return?
				}
			}
			if obj.flags.Trace {
				log.Printf("Trace: Etcd: Loop: Txn: FinishLoop")
			}

		// exit loop signal
		case <-obj.exitchan:
			log.Println("Etcd: Exiting loop shortly...")
			// activate exitTimeout switch which only opens after N
			// seconds of inactivity in this select switch, which
			// lets everything get bled dry to avoid blocking calls
			// which would otherwise block us from exiting cleanly!
			obj.exitTimeout = util.TimeAfterOrBlock(exitDelay)

		// exit loop commit
		case <-obj.exitTimeout:
			log.Println("Etcd: Exiting loop!")
			cuid.StopTimer() // clean up nicely
			return
		}
	}
}

// loopProcessAW is a helper function to facilitate creating priority channels!
func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) {
	for {
		if obj.exiting { // the exit signal has been sent!
			aw.resp.NACK() // nope!
			return
		}
		// cancelFunc is our data payload
		cancelFunc, err := obj.rawAddWatcher(ctx, aw)
		if err == nil {
			aw.cancelFunc = cancelFunc // update struct
			aw.resp.ACK()              // success
			return
		}
		if ctx, err = obj.CtxError(ctx, err); err != nil {
			return // TODO: do something else ?
		}
	}
}

// Set queues up a set operation to occur using our mainloop.
func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error {
	resp := event.NewResp()
	obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp}
	if err := resp.Wait(); err != nil { // wait for ack/nack
		return fmt.Errorf("Etcd: Set: Probably received an exit: %v", err)
	}
	return nil
}

// rawSet actually implements the key set operation.
func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawSet()")
	}
	// key is the full key path
	// TODO: should this be : obj.client.KV.Put or obj.client.Put ?
	obj.rLock.RLock() // these read locks need to wrap any use of obj.client
	response, err := obj.client.KV.Put(ctx, kv.key, kv.value, kv.opts...)
	obj.rLock.RUnlock()
	log.Printf("Etcd: Set(%s): %v", kv.key, response) // w00t... bonus
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawSet(): %v", err)
	}
	return err
}

// Get performs a get operation and waits for an ACK to continue.
func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error) {
	return obj.ComplexGet(path, false, opts...)
}

// ComplexGet performs a get operation and waits for an ACK to continue. It can
// accept more arguments that are useful for the less common operations.
// TODO: perhaps a get should never cause an un-converge ?
func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error) {
	resp := event.NewResp()
	gq := &GQ{path: path, skipConv: skipConv, opts: opts, resp: resp, data: nil}
	obj.getq <- gq // send
	if err := resp.Wait(); err != nil { // wait for ack/nack
		return nil, fmt.Errorf("Etcd: Get: Probably received an exit: %v", err)
	}
	return gq.data, nil
}
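
// rawGet actually implements the get operation.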
func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]string, err error) {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawGet()")
	}
	obj.rLock.RLock()
	response, err := obj.client.KV.Get(ctx, gq.path, gq.opts...)
	obj.rLock.RUnlock()
	if err != nil || response == nil {
		return nil, err
	}

	// TODO: write a response.ToMap() function on https://godoc.org/github.com/coreos/etcd/etcdserver/etcdserverpb#RangeResponse
	result = make(map[string]string)
	for _, x := range response.Kvs {
		result[bytes.NewBuffer(x.Key).String()] = bytes.NewBuffer(x.Value).String()
	}

	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawGet(): %v", result)
	}
	return
}

// Delete performs a delete operation and waits for an ACK to continue.
func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) {
	resp := event.NewResp()
	dl := &DL{path: path, opts: opts, resp: resp, data: -1}
	obj.delq <- dl // send
	if err := resp.Wait(); err != nil { // wait for ack/nack
		return -1, fmt.Errorf("Etcd: Delete: Probably received an exit: %v", err)
	}
	return dl.data, nil
}
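
// rawDelete actually implements the delete operation.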
func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err error) {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawDelete()")
	}
	count = -1
	obj.rLock.RLock()
	response, err := obj.client.KV.Delete(ctx, dl.path, dl.opts...)
	obj.rLock.RUnlock()
	if err == nil {
		count = response.Deleted
	}
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawDelete(): %v", err)
	}
	return
}

// Txn performs a transaction and waits for an ACK to continue.
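// An illustrative call (the key and value here are hypothetical, not from the
// original docs), creating a key only if it does not already exist:
//
//	ifs := []etcd.Cmp{etcd.Compare(etcd.Version("/_mgmt/example"), "=", 0)}
//	ops := []etcd.Op{etcd.OpPut("/_mgmt/example", "hello")}
//	result, err := obj.Txn(ifs, ops, nil)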
func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error) {
	resp := event.NewResp()
	tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil}
	obj.txnq <- tn // send
	if err := resp.Wait(); err != nil { // wait for ack/nack
		return nil, fmt.Errorf("Etcd: Txn: Probably received an exit: %v", err)
	}
	return tn.data, nil
}
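
// rawTxn actually implements the transaction operation.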
func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, error) {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawTxn()")
	}
	obj.rLock.RLock()
	response, err := obj.client.KV.Txn(ctx).If(tn.ifcmps...).Then(tn.thenops...).Else(tn.elseops...).Commit()
	obj.rLock.RUnlock()
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: rawTxn(): %v, %v", response, err)
	}
	return response, err
}

// AddWatcher queues up an add watcher request and returns a cancel function.
// Remember to add the etcd.WithPrefix() option if you want to watch recursively.
func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) {
	resp := event.NewResp()
	awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp}
	obj.awq <- awq // send
	if err := resp.Wait(); err != nil { // wait for ack/nack
		return nil, fmt.Errorf("Etcd: AddWatcher: Got NACK: %v", err)
	}
	return awq.cancelFunc, nil
}

// rawAddWatcher adds a watcher and returns a cancel function to call to end it.
func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) {
	cancelCtx, cancelFunc := obj.CancelCtx(ctx)
	go func(ctx context.Context) {
		defer cancelFunc() // it's safe to cancelFunc() more than once!
		obj.rLock.RLock()
		rch := obj.client.Watcher.Watch(ctx, aw.path, aw.opts...)
		obj.rLock.RUnlock()
		var rev int64
		var useRev = false
		var retry, locked bool = false, false
		for {
			response := <-rch // read
			err := response.Err()
			isCanceled := response.Canceled || err == context.Canceled
			if response.Header.Revision == 0 { // by inspection
				if obj.flags.Debug {
					log.Printf("Etcd: Watch: Received empty message!") // switched client connection
				}
				isCanceled = true
			}

			if isCanceled {
				if obj.exiting { // if not, it could be reconnect
					return
				}
				err = context.Canceled
			}

			if err == nil { // watch from latest good revision
				rev = response.Header.Revision // TODO: +1 ?
				useRev = true
				if !locked {
					retry = false
				}
				locked = false
			} else {
				if obj.flags.Debug {
					log.Printf("Etcd: Watch: Error: %v", err) // probably fixable
				}
				// this new context is the fix for a tricky set
				// of bugs which were encountered when re-using
				// the existing canceled context! it has state!
				ctx = context.Background() // this is critical!

				if ctx, err = obj.CtxError(ctx, err); err != nil {
					return // TODO: it's bad, break or return?
				}

				// remake it, but add old Rev when valid
				opts := []etcd.OpOption{}
				if useRev {
					opts = append(opts, etcd.WithRev(rev))
				}
				opts = append(opts, aw.opts...)
				rch = nil
				obj.rLock.RLock()
				if obj.client == nil {
					defer obj.rLock.RUnlock()
					return // we're exiting
				}
				rch = obj.client.Watcher.Watch(ctx, aw.path, opts...)
				obj.rLock.RUnlock()
				locked = true
				retry = true
				continue
			}

			// the response includes a list of grouped events, each
			// of which includes one Kv struct. Send these all in a
			// batched group so that they are processed together...
			obj.wevents <- &RE{response: response, path: aw.path, err: err, callback: aw.callback, errCheck: aw.errCheck, skipConv: aw.skipConv, retryHint: retry} // send event
		}
	}(cancelCtx)
	return cancelFunc, nil
}

// rawCallback is the companion to AddWatcher which runs the callback processing.
func rawCallback(ctx context.Context, re *RE) error {
	var err = re.err // the watch event itself might have had an error
	if err == nil {
		if callback := re.callback; callback != nil {
			// TODO: we could add an async option if needed
			// NOTE: the callback must *not* block!
			// FIXME: do we need to pass ctx in via *RE, or in the callback signature ?
			err = callback(re) // run the callback
			if !re.errCheck || err == nil {
				return nil
			}
		} else {
			return nil
		}
	}
	return err
}

// volunteerCallback runs to respond to the volunteer list change events.
// Functionally, it controls the adding and removing of members.
// FIXME: we might need to respond to member change/disconnect/shutdown events,
// see: https://github.com/coreos/etcd/issues/5277
func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: volunteerCallback()")
		defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!")
	}
	if err := obj.Connect(false); err != nil {
		log.Printf("Etcd: volunteerCallback(): Connect failed permanently: %v", err)
		// permanently fail...
		return &CtxPermanentErr{fmt.Sprintf("Etcd: volunteerCallback(): Connect error: %s", err)}
	}
	var err error

	// FIXME: if this is running in response to our OWN volunteering offer,
	// skip doing stuff if we're not a server yet because it's pointless,
	// and we might have just lost quorum if we just got nominated! Why the
	// lack of quorum is needed to read data in etcd v3 but not in v2 is a
	// mystery for now, since in v3 this now blocks! Maybe it's that the
	// Maintenance.Status API requires a leader to return? Maybe that's it!
	// FIXME: are there any situations where we don't want to short circuit
	// here, such as if i'm the last node?
	if obj.server == nil {
		return nil // if we're not a server, we're not a leader, return
	}

	membersMap, err := Members(obj) // map[uint64]string
	if err != nil {
		return fmt.Errorf("Etcd: Members: Error: %+v", err)
	}
	members := util.StrMapValuesUint64(membersMap) // get values
	log.Printf("Etcd: Members: List: %+v", members)

	// we only do *one* change operation at a time so that the cluster can
	// advance safely. we ensure this by returning CtxDelayErr any time an
	// operation happens to ensure the function will reschedule itself due
	// to the CtxError processing after this callback "fails". This custom
	// error is caught by CtxError, and lets us specify a retry delay too!

	// check for unstarted members, since we're currently "unhealthy"
	for mID, name := range membersMap {
		if name == "" {
			// reschedule in one second
			// XXX: will the unnominate TTL still happen if we are
			// in an unhealthy state? that's what we're waiting for
			return &CtxDelayErr{2 * time.Second, fmt.Sprintf("unstarted member, mID: %d", mID)}
		}
	}

	leader, err := Leader(obj) // XXX: race!
	if err != nil {
		log.Printf("Etcd: Leader: Error: %+v", err)
		return fmt.Errorf("Etcd: Leader: Error: %+v", err)
	}
	log.Printf("Etcd: Leader: %+v", leader)
	if leader != obj.hostname {
		log.Printf("Etcd: We are not the leader...")
		return nil
	}
	// i am the leader!

	// get the list of available volunteers
	volunteersMap, err := Volunteers(obj)
	if err != nil {
		log.Printf("Etcd: Volunteers: Error: %+v", err)
		return fmt.Errorf("Etcd: Volunteers: Error: %+v", err)
	}

	volunteers := []string{} // get keys
	for k := range volunteersMap {
		volunteers = append(volunteers, k)
	}
	sort.Strings(volunteers) // deterministic order
	log.Printf("Etcd: Volunteers: %v", volunteers)

	// unnominate anyone that unvolunteers, so that they can shutdown cleanly
	quitters := util.StrFilterElementsInList(volunteers, members)
	log.Printf("Etcd: Quitters: %v", quitters)

	// if we're the only member left, just shutdown...
	if len(members) == 1 && members[0] == obj.hostname && len(quitters) == 1 && quitters[0] == obj.hostname {
		log.Printf("Etcd: Quitters: Shutting down self...")
		if err := Nominate(obj, obj.hostname, nil); err != nil { // unnominate myself
			return &CtxDelayErr{1 * time.Second, fmt.Sprintf("error shutting down self: %v", err)}
		}
		return nil
	}

	candidates := util.StrFilterElementsInList(members, volunteers)
	log.Printf("Etcd: Candidates: %v", candidates)

	// TODO: switch to < 0 so that we can shut the whole cluster down with 0
	if obj.idealClusterSize < 1 { // safety in case value is not ready yet
		return &CtxDelayErr{1 * time.Second, "The idealClusterSize is < 1."} // retry in one second
	}

	// do we need more members?
	if len(candidates) > 0 && len(members)-len(quitters) < int(obj.idealClusterSize) {
		chosen := candidates[0]           // XXX: use a better picker algorithm
		peerURLs := volunteersMap[chosen] // comma separated list of urls

		// NOTE: storing peerURLs when they're already in volunteers/ is
		// redundant, but it seems to be necessary for a sane algorithm.
		// nominate before we call the API so that members see it first!
		Nominate(obj, chosen, peerURLs)
		// XXX: add a ttl here, because once we nominate someone, we
		// need to give them up to N seconds to start up after we run
		// the MemberAdd API because if they don't, in some situations
		// such as if we're adding the second node to the cluster, then
		// we've lost quorum until a second member joins! If the TTL
		// expires, we need to MemberRemove! In this special case, we
		// need to forcefully remove the second member if we don't add
		// them, because we'll be in a lack of quorum state and unable
		// to do anything... As a result, we should always only add ONE
		// member at a time!

		log.Printf("Etcd: Member Add: %v", peerURLs)
		mresp, err := MemberAdd(obj, peerURLs)
		if err != nil {
			// on error this function will run again, which is good
			// because we need to make sure to run the below parts!
			return fmt.Errorf("Etcd: Member Add: Error: %+v", err)
		}
		log.Printf("Etcd: Member Add: %+v", mresp.Member.PeerURLs)
		// return and reschedule to check for unstarted members, etc...
		return &CtxDelayErr{1 * time.Second, fmt.Sprintf("Member %s added successfully!", chosen)} // retry asap

	} else if len(quitters) == 0 && len(members) > int(obj.idealClusterSize) { // too many members
		for _, kicked := range members {
			// don't kick ourself unless we are the only one left...
			if kicked != obj.hostname || (obj.idealClusterSize == 0 && len(members) == 1) {
				quitters = []string{kicked} // XXX: use a better picker algorithm
				log.Printf("Etcd: Extras: %v", quitters)
				break
			}
		}
	}

	// we must remove them from the members API or it will look like a crash
	if lq := len(quitters); lq > 0 {
		log.Printf("Etcd: Quitters: Shutting down %d members...", lq)
	}
	for _, quitter := range quitters {
		mID, ok := util.Uint64KeyFromStrInMap(quitter, membersMap)
		if !ok {
			// programming error
			log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID)
		}
		Nominate(obj, quitter, nil) // unnominate
		// once we issue the above unnominate, that peer will
		// shutdown, and this might cause us to lose quorum,
		// therefore, let that member remove itself, and then
		// double check that it did happen in case delinquent
		// TODO: get built-in transactional member Add/Remove
		// functionality to avoid a separate nominate list...
		if quitter == obj.hostname { // remove in unnominate!
			log.Printf("Etcd: Quitters: Removing self...")
			continue // TODO: CtxDelayErr ?
		}

		log.Printf("Etcd: Waiting %d seconds for %s to self remove...", selfRemoveTimeout, quitter)
		time.Sleep(selfRemoveTimeout * time.Second)
		// in case the removed member doesn't remove itself, do it!
		removed, err := MemberRemove(obj, mID)
		if err != nil {
			return fmt.Errorf("Etcd: Member Remove: Error: %+v", err)
		}
		if removed {
			log.Printf("Etcd: Member Removed (forced): %v(%v)", quitter, mID)
		}

		// Remove the endpoint from our list to avoid blocking
		// future MemberList calls which would try and connect
		// to a missing endpoint... The endpoints should get
		// updated from the member exiting safely if it doesn't
		// crash, but if it did and/or since it's a race to see
		// if the update event will get seen before we need the
		// new data, just do it now anyways, then update the
		// endpoint list and trigger a reconnect.
		delete(obj.endpoints, quitter) // proactively delete it
		obj.endpointCallback(nil)      // update!
		log.Printf("Member %s (%d) removed successfully!", quitter, mID)
		return &CtxReconnectErr{"a member was removed"} // retry asap and update endpoint list
	}

	return nil
}

// nominateCallback runs to respond to the nomination list change events.
// Functionally, it controls the starting and stopping of the server process.
func (obj *EmbdEtcd) nominateCallback(re *RE) error {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: nominateCallback()")
		defer log.Printf("Trace: Etcd: nominateCallback(): Finished!")
	}
	bootstrapping := len(obj.endpoints) == 0
	var revision int64 // = 0
	if re != nil {
		revision = re.response.Header.Revision
	}
	if !bootstrapping && (re == nil || revision != obj.lastRevision) {
		// don't reprocess if we've already processed this message
		// this can happen if the callback errors and is re-called
		obj.lastRevision = revision

		// if we tried to lookup the nominated members here (in etcd v3)
		// this would sometimes block because we would lose the cluster
// leader once the current leader calls the MemberAdd API and it
|
|
// steps down trying to form a two host cluster. Instead, we can
|
|
// look at the event response data to read the nominated values!
|
|
//nominated, err = Nominated(obj) // nope, won't always work
|
|
// since we only see what has *changed* in the response data, we
|
|
// have to keep track of the original state and apply the deltas
|
|
// this must be idempotent in case it errors and is called again
|
|
// if we're retrying and we get a data format error, it's normal
|
|
nominated := obj.nominated
|
|
if nominated, err := ApplyDeltaEvents(re, nominated); err == nil {
|
|
obj.nominated = nominated
|
|
} else if !re.retryHint || err != errApplyDeltaEventsInconsistent {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
} else {
|
|
// TODO: should we just use the above delta method for everything?
|
|
//nominated, err := Nominated(obj) // just get it
|
|
//if err != nil {
|
|
// return fmt.Errorf("Etcd: Nominate: Error: %+v", err)
|
|
//}
|
|
//obj.nominated = nominated // update our local copy
|
|
}
|
|
if n := obj.nominated; len(n) > 0 {
|
|
log.Printf("Etcd: Nominated: %+v", n)
|
|
} else {
|
|
log.Printf("Etcd: Nominated: []")
|
|
}
|
|
|
|
// if there are no other peers, we create a new server
|
|
_, exists := obj.nominated[obj.hostname]
|
|
// FIXME: can we get rid of the len(obj.nominated) == 0 ?
|
|
newCluster := len(obj.nominated) == 0 || (len(obj.nominated) == 1 && exists)
|
|
if obj.flags.Debug {
|
|
log.Printf("Etcd: nominateCallback(): newCluster: %v; exists: %v; obj.server == nil: %t", newCluster, exists, obj.server == nil)
|
|
}
|
|
// XXX: check if i have actually volunteered first of all...
|
|
if obj.server == nil && (newCluster || exists) {
|
|
|
|
log.Printf("Etcd: StartServer(newCluster: %t): %+v", newCluster, obj.nominated)
|
|
err := obj.StartServer(
|
|
newCluster, // newCluster
|
|
obj.nominated, // other peer members and urls or empty map
|
|
)
|
|
if err != nil {
|
|
var retries uint
|
|
if re != nil {
|
|
retries = re.retries
|
|
}
|
|
// retry MaxStartServerRetries times, then permanently fail
|
|
return &CtxRetriesErr{MaxStartServerRetries - retries, fmt.Sprintf("Etcd: StartServer: Error: %+v", err)}
|
|
}
|
|
|
|
if len(obj.endpoints) == 0 {
|
|
// add server to obj.endpoints list...
|
|
addresses := obj.LocalhostClientURLs()
|
|
if len(addresses) == 0 {
|
|
// probably a programming error...
|
|
log.Fatal("Etcd: No valid clientUrls exist!")
|
|
}
|
|
obj.endpoints[obj.hostname] = addresses // now we have some!
|
|
// client connects to one of the obj.endpoints servers...
|
|
log.Printf("Etcd: Addresses are: %s", addresses)
|
|
|
|
// XXX: just put this wherever for now so we don't block
|
|
// nominate self so "member" list is correct for peers to see
|
|
Nominate(obj, obj.hostname, obj.serverURLs)
|
|
// XXX: if this fails, where will we retry this part ?
|
|
}
|
|
|
|
// advertise client urls
|
|
if curls := obj.clientURLs; len(curls) > 0 {
|
|
// XXX: don't advertise local addresses! 127.0.0.1:2381 doesn't really help remote hosts
|
|
// XXX: but sometimes this is what we want... hmmm how do we decide? filter on callback?
|
|
AdvertiseEndpoints(obj, curls)
|
|
// XXX: if this fails, where will we retry this part ?
|
|
|
|
// force this to remove sentinel before we reconnect...
|
|
obj.endpointCallback(nil)
|
|
}
|
|
|
|
return &CtxReconnectErr{"local server is running"} // trigger reconnect to self
|
|
|
|
} else if obj.server != nil && !exists {
|
|
// un advertise client urls
|
|
AdvertiseEndpoints(obj, nil)
|
|
|
|
// i have been un-nominated, remove self and shutdown server!
|
|
if len(obj.nominated) != 0 { // don't call if nobody left but me!
|
|
// this works around: https://github.com/coreos/etcd/issues/5482,
|
|
// and it probably makes sense to avoid calling if we're the last
|
|
log.Printf("Etcd: Member Remove: Removing self: %v", obj.memberID)
|
|
removed, err := MemberRemove(obj, obj.memberID)
|
|
if err != nil {
|
|
return fmt.Errorf("Etcd: Member Remove: Error: %+v", err)
|
|
}
|
|
if removed {
|
|
log.Printf("Etcd: Member Removed (self): %v(%v)", obj.hostname, obj.memberID)
|
|
}
|
|
}
|
|
|
|
log.Printf("Etcd: DestroyServer...")
|
|
obj.DestroyServer()
|
|
// TODO: make sure to think about the implications of
|
|
// shutting down and potentially intercepting signals
|
|
// here after i've removed myself from the nominated!
|
|
|
|
// if we are connected to self and other servers exist: trigger
|
|
// if any of the obj.clientURLs are in the endpoints list, then
|
|
// we are stale. it is not likely that the advertised endpoints
|
|
// have been updated because we're still blocking the callback.
|
|
stale := false
|
|
for key, eps := range obj.endpoints {
|
|
if key != obj.hostname && len(eps) > 0 { // other endpoints?
|
|
stale = true // only half true so far
|
|
break
|
|
}
|
|
}
|
|
|
|
for _, curl := range obj.clientURLs { // these just got shutdown
|
|
for _, ep := range obj.client.Endpoints() {
|
|
if (curl.Host == ep || curl.String() == ep) && stale {
|
|
// add back the sentinel to force update
|
|
log.Printf("Etcd: Forcing endpoint callback...")
|
|
obj.endpoints[seedSentinel] = nil //etcdtypes.URLs{}
|
|
obj.endpointCallback(nil) // update!
|
|
return &CtxReconnectErr{"local server has shutdown"} // trigger reconnect
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// endpointCallback runs to respond to the endpoint list change events.
|
|
func (obj *EmbdEtcd) endpointCallback(re *RE) error {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: endpointCallback()")
		defer log.Printf("Trace: Etcd: endpointCallback(): Finished!")
	}

	// if the startup sentinel exists, or delta fails, then get a fresh copy
	endpoints := make(etcdtypes.URLsMap, len(obj.endpoints))
	// this would copy the reference: endpoints := obj.endpoints
	for k, v := range obj.endpoints {
		endpoints[k] = make(etcdtypes.URLs, len(v))
		copy(endpoints[k], v)
	}

	// updating
	_, exists := endpoints[seedSentinel]
	endpoints, err := ApplyDeltaEvents(re, endpoints)
	if err != nil || exists {
		// TODO: we could also lookup endpoints from the maintenance api
		endpoints, err = Endpoints(obj)
		if err != nil {
			return err
		}
	}

	// change detection
	var changed = false // do we need to update?
	if len(obj.endpoints) != len(endpoints) {
		changed = true
	}
	for k, v1 := range obj.endpoints {
		if changed { // catches previous statement and inner loop break
			break
		}
		v2, exists := endpoints[k]
		if !exists {
			changed = true
			break
		}
		if len(v1) != len(v2) {
			changed = true
			break
		}
		for i := range v1 {
			if v1[i] != v2[i] {
				changed = true
				break
			}
		}
	}
	// is the endpoint list different?
	if changed {
		obj.endpoints = endpoints // set
		if eps := endpoints; len(eps) > 0 {
			log.Printf("Etcd: Endpoints: %+v", eps)
		} else {
			log.Printf("Etcd: Endpoints: []")
		}
		// can happen if a server drops out for example
		return &CtxReconnectErr{"endpoint list changed"} // trigger reconnect with new endpoint list
	}

	return nil
}

// idealClusterSizeCallback runs to respond to the ideal cluster size changes.
func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error {
	if obj.flags.Trace {
		log.Printf("Trace: Etcd: idealClusterSizeCallback()")
		defer log.Printf("Trace: Etcd: idealClusterSizeCallback(): Finished!")
	}
	path := fmt.Sprintf("/%s/idealClusterSize", NS)
	for _, event := range re.response.Events {
		if key := bytes.NewBuffer(event.Kv.Key).String(); key != path {
			continue
		}
		if event.Type != etcd.EventTypePut {
			continue
		}
		val := bytes.NewBuffer(event.Kv.Value).String()
		if val == "" {
			continue
		}
		v, err := strconv.ParseUint(val, 10, 16)
		if err != nil {
			continue
		}
		if i := uint16(v); i > 0 {
			log.Printf("Etcd: Ideal cluster size is now: %d", i)
			obj.idealClusterSize = i
			// now, emulate the calling of the volunteerCallback...
			go func() {
				obj.wevents <- &RE{callback: obj.volunteerCallback, errCheck: true} // send event
			}() // don't block
		}
	}
	return nil
}

// LocalhostClientURLs returns the most localhost-like URLs for direct connection.
// This gets clients to talk to the local servers first before searching remotely.
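//
// For example (illustrative addresses only): with clientURLs of
// [http://127.0.0.1:2381 http://192.0.2.1:2381] this would return only the
// http://127.0.0.1:2381 entry, since it matches the 127.0.0.0/8 test below.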
func (obj *EmbdEtcd) LocalhostClientURLs() etcdtypes.URLs {
	// look through obj.clientURLs and return the localhost ones
	urls := etcdtypes.URLs{}
	for _, x := range obj.clientURLs {
		// "localhost" or anything in 127.0.0.0/8 is valid!
		if s := x.Host; strings.HasPrefix(s, "localhost") || strings.HasPrefix(s, "127.") {
			urls = append(urls, x)
		}
	}
	return urls
}

// StartServer kicks off a new embedded etcd server.
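//
// A hedged call sketch (the peer map contents here are made-up values for
// illustration, not something this package mandates):
//
//	urls, _ := etcdtypes.NewURLs([]string{"http://192.0.2.1:2380"})
//	peers := etcdtypes.URLsMap{"h1": urls}
//	err := obj.StartServer(false, peers) // false: join an existing cluster
//
// The newCluster flag selects embed.ClusterStateFlagNew vs
// embed.ClusterStateFlagExisting in the config built below.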
func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) error {
	var err error
	memberName := obj.hostname

	err = os.MkdirAll(obj.dataDir, 0770)
	if err != nil {
		log.Printf("Etcd: StartServer: Couldn't mkdir: %s.", obj.dataDir)
		log.Printf("Etcd: StartServer: Mkdir error: %s.", err)
		obj.DestroyServer()
		return err
	}

	// if no peer URLs exist, then starting a server is mostly only for some
	// testing, but etcd doesn't allow the value to be empty so we use this!
	peerURLs, _ := etcdtypes.NewURLs([]string{"http://localhost:0"})
	if len(obj.serverURLs) > 0 {
		peerURLs = obj.serverURLs
	}
	initialPeerURLsMap := make(etcdtypes.URLsMap)
	for k, v := range peerURLsMap {
		initialPeerURLsMap[k] = v // copy
	}
	if _, exists := peerURLsMap[memberName]; !exists {
		initialPeerURLsMap[memberName] = peerURLs
	}

	// embed etcd
	cfg := embed.NewConfig()
	cfg.Name = memberName // hostname
	cfg.Dir = obj.dataDir
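	// NOTE: in the embed config, AC/AP are the advertised client/peer URLs,
	// and LC/LP are the listen client/peer URLs that the server binds to.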
	cfg.ACUrls = obj.clientURLs
	cfg.APUrls = peerURLs
	cfg.LCUrls = obj.clientURLs
	cfg.LPUrls = peerURLs
	cfg.StrictReconfigCheck = false // XXX: workaround https://github.com/coreos/etcd/issues/6305

	cfg.InitialCluster = initialPeerURLsMap.String() // including myself!
	if newCluster {
		cfg.ClusterState = embed.ClusterStateFlagNew
	} else {
		cfg.ClusterState = embed.ClusterStateFlagExisting
	}
	//cfg.ForceNewCluster = newCluster // TODO: ?

	log.Printf("Etcd: StartServer: Starting server...")
	obj.server, err = embed.StartEtcd(cfg)
	if err != nil {
		return err
	}
	select {
	case <-obj.server.Server.ReadyNotify(): // we hang here if things are bad
		log.Printf("Etcd: StartServer: Done starting server!") // it didn't hang!
	case <-time.After(time.Duration(MaxStartServerTimeout) * time.Second):
		e := fmt.Errorf("timeout of %d seconds reached", MaxStartServerTimeout)
		log.Printf("Etcd: StartServer: %s", e.Error())
		obj.server.Server.Stop() // trigger a shutdown
		obj.serverwg.Add(1) // add for the DestroyServer()
		obj.DestroyServer()
		return e
	// TODO: should we wait for this notification elsewhere?
	case <-obj.server.Server.StopNotify(): // it's going down now...
		e := fmt.Errorf("received stop notification")
		log.Printf("Etcd: StartServer: %s", e.Error())
		obj.server.Server.Stop() // trigger a shutdown
		obj.serverwg.Add(1) // add for the DestroyServer()
		obj.DestroyServer()
		return e
	}
	//log.Fatal(<-obj.server.Err()) XXX
	log.Printf("Etcd: StartServer: Server running...")
	obj.memberID = uint64(obj.server.Server.ID()) // store member id for internal use
	close(obj.serverReady) // send a signal

	obj.serverwg.Add(1)
	return nil
}

// ServerReady returns a channel which closes when the server has started successfully.
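//
// A small usage sketch (caller side, illustrative only):
//
//	select {
//	case <-obj.ServerReady():
//		// the embedded server is up
//	case <-time.After(time.Duration(MaxStartServerTimeout) * time.Second):
//		// give up waiting
//	}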
func (obj *EmbdEtcd) ServerReady() <-chan struct{} { return obj.serverReady }

// DestroyServer shuts down the embedded etcd server portion.
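//
// Note the WaitGroup pairing: a successful StartServer does serverwg.Add(1),
// and DestroyServer only calls the matching Done() when a server was actually
// running, which is why the nil check below skips it.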
func (obj *EmbdEtcd) DestroyServer() error {
	var err error
	log.Printf("Etcd: DestroyServer: Destroying...")
	if obj.server != nil {
		obj.server.Close() // this blocks until server has stopped
	}
	log.Printf("Etcd: DestroyServer: Done closing...")

	obj.memberID = 0
	if obj.server == nil { // skip the .Done() below because we didn't .Add(1) it.
		return err
	}
	obj.server = nil // important because this is used as an isRunning flag
	log.Printf("Etcd: DestroyServer: Unlocking server...")
	obj.serverReady = make(chan struct{}) // reset the signal
	obj.serverwg.Done() // -1
	return err
}

//func UrlRemoveScheme(urls etcdtypes.URLs) []string {
//	strs := []string{}
//	for _, u := range urls {
//		strs = append(strs, u.Host) // remove http:// prefix
//	}
//	return strs
//}

// ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse.
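//
// For example (key and value shown are illustrative only): a put event on key
// "<prefix>/h2" with value "http://192.0.2.2:2379" adds that URL under "h2",
// a delete event on the same key removes it again, and a delete for a key we
// never saw returns errApplyDeltaEventsInconsistent so the caller can refetch
// the full list instead of trusting the delta.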
func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, error) {
	if re == nil { // passthrough
		return urlsmap, nil
	}
	for _, event := range re.response.Events {
		key := bytes.NewBuffer(event.Kv.Key).String()
		key = key[len(re.path):] // remove path prefix
		log.Printf("Etcd: ApplyDeltaEvents: Event(%s): %s", event.Type.String(), key)

		switch event.Type {
		case etcd.EventTypePut:
			val := bytes.NewBuffer(event.Kv.Value).String()
			if val == "" {
				return nil, fmt.Errorf("value in ApplyDeltaEvents is empty")
			}
			urls, err := etcdtypes.NewURLs(strings.Split(val, ","))
			if err != nil {
				return nil, fmt.Errorf("format error in ApplyDeltaEvents: %v", err)
			}
			urlsmap[key] = urls // add to map

		// expiry cases are seen as delete in v3 for now
		//case etcd.EventTypeExpire: // doesn't exist right now
		//	fallthrough
		case etcd.EventTypeDelete:
			if _, exists := urlsmap[key]; !exists {
				// this can happen if we retry an operation across
				// a reconnect, so ignore it if we are reconnecting
				log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key)
				return nil, errApplyDeltaEventsInconsistent
			}
			delete(urlsmap, key)

		default:
			return nil, fmt.Errorf("unknown event in ApplyDeltaEvents: %+v", event.Type)
		}
	}
	return urlsmap, nil
}