Improve internal etcd error handling

James Shubin
2016-08-29 01:31:03 -04:00
parent db4de12767
commit 5e45c5805b
2 changed files with 120 additions and 43 deletions

DOCUMENTATION.md

@@ -193,6 +193,24 @@ The downside to this approach is that you won't benefit from the automatic
elastic nature of the embedded etcd servers, and that you're responsible if you
accidentally break your etcd cluster, or if you use an unsupported version.
+###What does the error message about an inconsistent dataDir mean?
+
+If you get an error message similar to:
+
+```
+Etcd: Connect: CtxError...
+Etcd: CtxError: Reason: CtxDelayErr(5s): No endpoints available yet!
+Etcd: Connect: Endpoints: []
+Etcd: The dataDir (/var/lib/mgmt/etcd) might be inconsistent or corrupt.
+```
+
+This happens when there is a series of fatal connect errors in a row. This
+can happen when you start `mgmt` using a dataDir that doesn't correspond to
+the current cluster view. As a result, the embedded etcd server never
+finishes starting up, and a default endpoint never gets added. The solution
+is to either reconcile the mistake, or, if there is no important data saved,
+to remove the etcd dataDir. This is typically `/var/lib/mgmt/etcd/member/`.
+
###Did you know that there is a band named `MGMT`?
I didn't realize this when naming the project, and it is accidental. After much
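
As a note on the FAQ entry added above: the manual fix it describes is simply deleting the `member/` directory under the dataDir. Below is a minimal standalone sketch of that cleanup step; it is not part of this commit, and the helper program is hypothetical (only the default path comes from the text above).

```go
package main

import (
	"log"
	"os"
	"path/filepath"
)

func main() {
	dataDir := "/var/lib/mgmt/etcd"               // the default dataDir mentioned above
	memberDir := filepath.Join(dataDir, "member") // this is the part that gets removed
	// Only do this if there is no important data saved in the cluster!
	if err := os.RemoveAll(memberDir); err != nil {
		log.Fatalf("could not remove %s: %v", memberDir, err)
	}
	log.Printf("removed %s; restart mgmt so the embedded etcd can bootstrap again", memberDir)
}
```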

etcd.go

@@ -73,6 +73,7 @@ const (
	NS = "_mgmt" // root namespace for mgmt operations
	seedSentinel = "_seed" // you must not name your hostname this
	maxStartServerRetries = 3 // number of times to retry starting the etcd server
+	maxClientConnectRetries = 5 // number of times to retry consecutive connect failures
	selfRemoveTimeout = 3 // give unnominated members a chance to self exit
	exitDelay = 3 // number of sec of inactivity after exit to clean up
	defaultIdealClusterSize = 5 // default ideal cluster size target for initial seed
@@ -145,6 +146,8 @@ type EmbdEtcd struct { // EMBeddeD etcd
	cLock sync.Mutex // client connect lock
	rLock sync.RWMutex // client reconnect lock
	client *etcd.Client
+	cError error // permanent client error
+	ctxErr error // permanent ctx error

	// exit and cleanup related
	cancelLock sync.Mutex // lock for the cancels list
@@ -261,6 +264,9 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
	}
	obj.cLock.Lock()
	defer obj.cLock.Unlock()
+	if obj.cError != nil { // stop on permanent error
+		return obj.cError
+	}
	if obj.client != nil { // memoize
		if reconnect {
			// i think this requires the rLock when using it concurrently
@@ -273,6 +279,7 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
			return nil
		}
	}
+	var emax uint16 = 0
	for { // loop until connect
		var err error
		cfg := obj.GetConfig()
@@ -282,10 +289,21 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
			log.Printf("Etcd: Connect: Endpoints: []")
		}
		obj.client, err = etcd.New(cfg) // connect!
+		if err == etcd.ErrNoAvailableEndpoints {
+			emax++
+			if emax > maxClientConnectRetries {
+				log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir)
+				log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean")
+				obj.cError = fmt.Errorf("Can't find an available endpoint.")
+				return obj.cError
+			}
+			err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff...
+		}
		if err != nil {
			log.Printf("Etcd: Connect: CtxError...")
			if _, e := obj.CtxError(context.TODO(), err); e != nil {
				log.Printf("Etcd: Connect: CtxError: Fatal: %v", e)
+				obj.cError = e
				return e // fatal error
			}
			continue
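
The hunk above caps consecutive `etcd.ErrNoAvailableEndpoints` failures at `maxClientConnectRetries`, turns earlier failures into a `CtxDelayErr` with a growing delay, and memoizes a permanent error in `obj.cError` so later calls fail fast. A condensed, self-contained sketch of that bounded-retry-then-give-up shape (hypothetical names, not the mgmt API):

```go
package main

import (
	"fmt"
	"log"
	"time"

	"errors"
)

// errNoEndpoints stands in for etcd.ErrNoAvailableEndpoints in this sketch.
var errNoEndpoints = errors.New("no available endpoints")

const maxConnectRetries = 5 // give up after this many consecutive failures

// connectOnce is a placeholder for the real client connect call; here it
// always fails so the retry path is exercised.
func connectOnce() error { return errNoEndpoints }

// connect retries with a linearly growing delay, then returns a permanent error.
func connect() error {
	var attempts uint16
	for {
		err := connectOnce()
		if err == nil {
			return nil // connected
		}
		if err != errNoEndpoints {
			return err // some other fatal error
		}
		attempts++
		if attempts > maxConnectRetries {
			return fmt.Errorf("giving up: %v", err) // permanent failure
		}
		delay := time.Duration(attempts) * time.Second
		log.Printf("no endpoints yet, retrying in %v...", delay)
		time.Sleep(delay)
	}
}

func main() {
	if err := connect(); err != nil {
		log.Printf("connect failed permanently: %v", err)
	}
}
```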
@@ -306,7 +324,12 @@ func (obj *EmbdEtcd) Startup() error {
	bootstrapping := len(obj.endpoints) == 0 // because value changes after start
	// connect but don't block here, because servers might not be up yet...
-	go obj.Connect(false)
+	go func() {
+		if err := obj.Connect(false); err != nil {
+			log.Printf("Etcd: Startup: Error: %v", err)
+			// XXX: Now cause Startup() to exit with error somehow!
+		}
+	}()

	go obj.CallbackLoop() // start callback loop
	go obj.Loop() // start main loop
@@ -352,7 +375,9 @@ func (obj *EmbdEtcd) Startup() error {
		// give an initial value to the obj.nominate map we keep in sync
		// this emulates EtcdNominate(obj, obj.hostname, obj.serverURLs)
		obj.nominated[obj.hostname] = obj.serverURLs // initial value
+		// NOTE: when we are stuck waiting for the server to start up,
+		// it is probably happening on this call right here...
		obj.nominateCallback(nil) // kick this off once
	}

	// self volunteer
@@ -372,8 +397,8 @@ func (obj *EmbdEtcd) Startup() error {
	go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, etcd.WithPrefix())
-	if e := obj.Connect(false); e != nil { // don't exit from this Startup function until connected!
-		return e
+	if err := obj.Connect(false); err != nil { // don't exit from this Startup function until connected!
+		return err
	}
	return nil
}
@@ -437,6 +462,15 @@ func (obj *CtxRetriesErr) Error() string {
	return fmt.Sprintf("CtxRetriesErr(%v): %s", obj.Retries, obj.Message)
}

+// CtxPermanentErr is a permanent failure error to notify about borkage.
+type CtxPermanentErr struct {
+	Message string
+}
+
+func (obj *CtxPermanentErr) Error() string {
+	return fmt.Sprintf("CtxPermanentErr: %s", obj.Message)
+}
+
// CtxReconnectErr requests a client reconnect to the new endpoint list
type CtxReconnectErr struct {
	Message string
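
`CtxPermanentErr` follows the same shape as the other `Ctx*` error types: a message-carrying struct with an `Error()` method, so `CtxError()` can pick it out with a type assertion and stop retrying. A minimal sketch of that detection step (simplified; `classify` is a made-up stand-in for the real `CtxError()` logic):

```go
package main

import "fmt"

// CtxPermanentErr mirrors the error type added in this commit: it marks a
// failure that should not be retried.
type CtxPermanentErr struct {
	Message string
}

func (obj *CtxPermanentErr) Error() string {
	return fmt.Sprintf("CtxPermanentErr: %s", obj.Message)
}

// classify reports whether the caller may retry, and returns the fatal error
// if not; this is only the permanent-error branch, nothing more.
func classify(err error) (retry bool, fatal error) {
	if permanentErr, ok := err.(*CtxPermanentErr); ok {
		return false, permanentErr // quit, don't retry
	}
	return true, nil // everything else is retryable in this sketch
}

func main() {
	retry, fatal := classify(&CtxPermanentErr{Message: "connect failed for good"})
	fmt.Printf("retry=%v fatal=%v\n", retry, fatal)
}
```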
@@ -468,6 +502,9 @@ func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.C
// that needs to be resolved before we can continue, eg: connection disconnected,
// change of server to connect to, etc... It modifies the context if needed.
func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, error) {
+	if obj.ctxErr != nil { // stop on permanent error
+		return ctx, obj.ctxErr
+	}
	const ctxErr = "ctxErr"
	const ctxIter = "ctxIter"
	expBackoff := func(tmin, texp, iter, tmax int) time.Duration {
@@ -519,7 +556,8 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
		log.Fatal("Etcd: CtxError: Error: Unexpected lack of error!")
	}
	if obj.exiting {
-		return ctx, fmt.Errorf("Etcd: CtxError: Exit in progress!")
+		obj.ctxErr = fmt.Errorf("Etcd: CtxError: Exit in progress!")
+		return ctx, obj.ctxErr
	}

	// happens when we trigger the cancels during reconnect
@@ -539,11 +577,17 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
	if retriesErr, ok := err.(*CtxRetriesErr); ok { // custom retry error
		log.Printf("Etcd: CtxError: Reason: %s", retriesErr.Error())
		if retriesErr.Retries == 0 {
-			return ctx, fmt.Errorf("Etcd: CtxError: CtxRetriesErr: No more retries!")
+			obj.ctxErr = fmt.Errorf("Etcd: CtxError: CtxRetriesErr: No more retries!")
+			return ctx, obj.ctxErr
		}
		return ctx, nil
	}
+	if permanentErr, ok := err.(*CtxPermanentErr); ok { // custom permanent error
+		obj.ctxErr = fmt.Errorf("Etcd: CtxError: Reason: %s", permanentErr.Error())
+		return ctx, obj.ctxErr // quit
+	}
+
	if err == etcd.ErrNoAvailableEndpoints { // etcd server is probably starting up
		// TODO: tmin, texp, tmax := 500, 2, 16000 // ms, exp base, ms
		// TODO: return ctxHelper(tmin, texp, tmax), nil
@@ -600,7 +644,11 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
		obj.cancelLock.Unlock()
		log.Printf("Etcd: CtxError: Reconnecting...")
-		obj.Connect(true) // FIXME: check returned error
+		if err := obj.Connect(true); err != nil {
+			defer obj.rLock.Unlock()
+			obj.ctxErr = fmt.Errorf("Etcd: Permanent connect error: %v", err)
+			return ctx, obj.ctxErr
+		}
		if DEBUG {
			log.Printf("Etcd: CtxError: Unlocking...")
		}
@@ -622,7 +670,8 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
	// if you hit this code path here, please report the unmatched error!
	log.Printf("Etcd: CtxError: Unknown error(%T): %+v", err, err)
	time.Sleep(1 * time.Second)
-	return ctx, fmt.Errorf("Etcd: CtxError: Unknown error!")
+	obj.ctxErr = fmt.Errorf("Etcd: CtxError: Unknown error!")
+	return ctx, obj.ctxErr
}

// CallbackLoop is the loop where callback execution is serialized
@@ -658,8 +707,9 @@ func (obj *EmbdEtcd) CallbackLoop() {
					break
				}
				re.retries++ // increment error retry count
-				// FIXME: look at return value of CtxError()
-				ctx, _ = obj.CtxError(ctx, err)
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					break // TODO: it's bad, break or return?
+				}
			}
			if TRACE {
				log.Printf("Trace: Etcd: Loop: Event: FinishLoop")
@@ -730,13 +780,9 @@ func (obj *EmbdEtcd) Loop() {
					kv.resp.ACK() // success
					break
				}
-				ctx, _ = obj.CtxError(ctx, err) // try to reconnect, etc...
-				// FIXME: look at return value of CtxError()
-				// XXX: future idea...
-				//ctx, err = obj.CtxError(ctx, err) // try to reconnect, etc...
-				//if err != nil { // XXX check error?
-				//	break // it's bad, break!
-				//}
+				if ctx, err = obj.CtxError(ctx, err); err != nil { // try to reconnect, etc...
+					break // TODO: it's bad, break or return?
+				}
			}
			if TRACE {
				log.Printf("Trace: Etcd: Loop: Set: FinishLoop")
@@ -759,8 +805,9 @@ func (obj *EmbdEtcd) Loop() {
					gq.resp.ACK() // success
					break
				}
-				// FIXME: look at return value of CtxError()
-				ctx, _ = obj.CtxError(ctx, err)
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					break // TODO: it's bad, break or return?
+				}
			}
			if TRACE {
				log.Printf("Trace: Etcd: Loop: Get: FinishLoop")
@@ -783,8 +830,9 @@ func (obj *EmbdEtcd) Loop() {
					dl.resp.ACK() // success
					break
				}
-				// FIXME: look at return value of CtxError()
-				ctx, _ = obj.CtxError(ctx, err)
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					break // TODO: it's bad, break or return?
+				}
			}
			if TRACE {
				log.Printf("Trace: Etcd: Loop: Delete: FinishLoop")
@@ -807,8 +855,9 @@ func (obj *EmbdEtcd) Loop() {
					tn.resp.ACK() // success
					break
				}
-				// FIXME: look at return value of CtxError()
-				ctx, _ = obj.CtxError(ctx, err)
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					break // TODO: it's bad, break or return?
+				}
			}
			if TRACE {
				log.Printf("Trace: Etcd: Loop: Txn: FinishLoop")
@@ -849,8 +898,9 @@ func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) {
			aw.resp.ACK() // success
			return
		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return // TODO: do something else ?
+		}
	}
}
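
Each request loop above now follows the same pattern: attempt the operation, and on failure pass the error through `CtxError()`, which either hands back a context to retry with or a non-nil error that ends the loop. A condensed sketch of that loop shape (the `op` and `ctxError` functions are made-up stand-ins, not the mgmt API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// op is a stand-in for a single etcd request (Get, Set, Txn, ...); it fails
// twice and then succeeds so the retry path is visible.
func op(ctx context.Context, attempt int) error {
	if attempt < 2 {
		return errors.New("transient failure")
	}
	return nil
}

// ctxError is a stand-in for EmbdEtcd.CtxError(): it returns a context to
// retry with, or a non-nil error when the failure should end the loop.
func ctxError(ctx context.Context, err error, attempt int) (context.Context, error) {
	if attempt >= 5 {
		return ctx, fmt.Errorf("fatal: %v", err)
	}
	return ctx, nil // safe to retry
}

func main() {
	ctx := context.Background()
	for attempt := 0; ; attempt++ {
		err := op(ctx, attempt)
		if err == nil {
			fmt.Println("success")
			break
		}
		if ctx, err = ctxError(ctx, err, attempt); err != nil {
			fmt.Println("giving up:", err)
			break // it's bad, stop retrying
		}
	}
}
```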
@@ -1022,14 +1072,11 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
				// this new context is the fix for a tricky set
				// of bugs which were encountered when re-using
				// the existing canceled context! it has state!
				ctx = context.Background() // this is critical!
-				ctx, _ = obj.CtxError(ctx, err) // try to reconnect, etc...
-				// FIXME: look at return value of CtxError()
-				// XXX: future idea...
-				//ctx, err = obj.CtxError(ctx, err) // try to reconnect, etc...
-				//if err != nil { // XXX check error?
-				//	return // it's bad, break!
-				//}
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					return // TODO: it's bad, break or return?
+				}

				// remake it, but add old Rev when valid
				opts := []etcd.OpOption{}
@@ -1090,7 +1137,11 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
		log.Printf("Trace: Etcd: volunteerCallback()")
		defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!")
	}
-	obj.Connect(false) // FIXME: check return error
+	if err := obj.Connect(false); err != nil {
+		log.Printf("Etcd: volunteerCallback(): Connect failed permanently: %v", err)
+		// permanently fail...
+		return &CtxPermanentErr{fmt.Sprintf("Etcd: volunteerCallback(): Connect error: %s", err)}
+	}
	var err error

	// FIXME: if this is running in response to our OWN volunteering offer,
@@ -1577,7 +1628,8 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
	//cfg.ForceNewCluster = newCluster // TODO ?
	log.Printf("Etcd: StartServer: Starting server...")
-	obj.server, err = embed.StartEtcd(cfg)
+	obj.server, err = embed.StartEtcd(cfg) // we hang here if things are bad
+	log.Printf("Etcd: StartServer: Done starting server!") // it didn't hang!
	if err != nil {
		return err
	}
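
On the `embed.StartEtcd()` hang noted above: the call returns once the server object exists, but the server itself may still be coming up (or stuck), which is why the extra log line is useful for telling whether the call returned at all. The upstream etcd embed examples wait on `Server.ReadyNotify()` with a timeout; a minimal sketch along those lines (import path from the coreos/etcd era of this commit; the directory and timeout are made up):

```go
package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "/tmp/embed-etcd-sketch" // throwaway data dir for this example
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify(): // the server is up and serving
		log.Printf("embedded etcd is ready")
	case <-time.After(60 * time.Second):
		e.Server.Stop() // trigger a shutdown rather than hanging forever
		log.Printf("embedded etcd took too long to start")
	case err := <-e.Err():
		log.Fatal(err)
	}
}
```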
@@ -1821,12 +1873,16 @@ func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddRespo
		if err == nil {
			break
		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return nil, err
+		}
	}
	return response, nil
}

+// EtcdMemberRemove removes a member by mID and returns whether it worked, as
+// well as any error. Both are returned because the call might run without
+// error while the member wasn't found, for example.
func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
	//obj.Connect(false) // TODO ?
	ctx := context.Background()
@@ -1843,8 +1899,9 @@ func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
			// if we get this, member already shut itself down :)
			return false, nil
		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return false, err
+		}
	}
	return true, nil
}
@@ -1870,8 +1927,9 @@ func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) {
		if err == nil {
			break
		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return nil, err
+		}
	}

	members := make(map[uint64]string)
@@ -1910,8 +1968,9 @@ func EtcdLeader(obj *EmbdEtcd) (string, error) {
		if err == nil {
			break
		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return "", err
+		}
	}

	// isLeader: response.Header.MemberId == response.Leader