diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md
index c99ed0e2..33e9662e 100644
--- a/DOCUMENTATION.md
+++ b/DOCUMENTATION.md
@@ -193,6 +193,24 @@ The downside to this approach is that you won't benefit from the automatic
 elastic nature of the embedded etcd servers, and that you're responsible if you
 accidentally break your etcd cluster, or if you use an unsupported version.
 
+###What does the error message about an inconsistent dataDir mean?
+
+If you get an error message similar to:
+
+```
+Etcd: Connect: CtxError...
+Etcd: CtxError: Reason: CtxDelayErr(5s): No endpoints available yet!
+Etcd: Connect: Endpoints: []
+Etcd: The dataDir (/var/lib/mgmt/etcd) might be inconsistent or corrupt.
+```
+
+This happens when there is a series of fatal connect errors in a row. This can
+happen when you start `mgmt` using a dataDir that doesn't correspond to the
+current cluster view. As a result, the embedded etcd server never finishes
+starting up, so a default endpoint never gets added. The solution is to either
+reconcile the mistake, or if there is no important data saved, to remove the
+etcd dataDir. This is typically `/var/lib/mgmt/etcd/member/`.
+
 ###Did you know that there is a band named `MGMT`?
 
 I didn't realize this when naming the project, and it is accidental. After much
diff --git a/etcd.go b/etcd.go
index c0da55a6..c08de232 100644
--- a/etcd.go
+++ b/etcd.go
@@ -73,6 +73,7 @@ const (
 	NS                      = "_mgmt" // root namespace for mgmt operations
 	seedSentinel            = "_seed" // you must not name your hostname this
 	maxStartServerRetries   = 3       // number of times to retry starting the etcd server
+	maxClientConnectRetries = 5       // number of times to retry consecutive connect failures
 	selfRemoveTimeout       = 3       // give unnominated members a chance to self exit
 	exitDelay               = 3       // number of sec of inactivity after exit to clean up
 	defaultIdealClusterSize = 5       // default ideal cluster size target for initial seed
@@ -145,6 +146,8 @@ type EmbdEtcd struct { // EMBeddeD etcd
 	cLock  sync.Mutex   // client connect lock
 	rLock  sync.RWMutex // client reconnect lock
 	client *etcd.Client
+	cError error // permanent client error
+	ctxErr error // permanent ctx error
 
 	// exit and cleanup related
 	cancelLock sync.Mutex // lock for the cancels list
@@ -261,6 +264,9 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
 	}
 	obj.cLock.Lock()
 	defer obj.cLock.Unlock()
+	if obj.cError != nil { // stop on permanent error
+		return obj.cError
+	}
 	if obj.client != nil { // memoize
 		if reconnect {
 			// i think this requires the rLock when using it concurrently
@@ -273,6 +279,7 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
 			return nil
 		}
 	}
+	var emax uint16 = 0
 	for { // loop until connect
 		var err error
 		cfg := obj.GetConfig()
@@ -282,10 +289,21 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
 			log.Printf("Etcd: Connect: Endpoints: []")
 		}
 		obj.client, err = etcd.New(cfg) // connect!
+		if err == etcd.ErrNoAvailableEndpoints {
+			emax++
+			if emax > maxClientConnectRetries {
+				log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir)
+				log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean")
+				obj.cError = fmt.Errorf("Can't find an available endpoint.")
+				return obj.cError
+			}
+			err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff...
+		}
 		if err != nil {
 			log.Printf("Etcd: Connect: CtxError...")
 			if _, e := obj.CtxError(context.TODO(), err); e != nil {
 				log.Printf("Etcd: Connect: CtxError: Fatal: %v", e)
+				obj.cError = e
 				return e // fatal error
 			}
 			continue
@@ -306,7 +324,12 @@ func (obj *EmbdEtcd) Startup() error {
 	bootstrapping := len(obj.endpoints) == 0 // because value changes after start
 
 	// connect but don't block here, because servers might not be up yet...
-	go obj.Connect(false)
+	go func() {
+		if err := obj.Connect(false); err != nil {
+			log.Printf("Etcd: Startup: Error: %v", err)
+			// XXX: Now cause Startup() to exit with error somehow!
+		}
+	}()
 	go obj.CallbackLoop() // start callback loop
 	go obj.Loop()         // start main loop
 
@@ -352,7 +375,9 @@ func (obj *EmbdEtcd) Startup() error {
 		// give an initial value to the obj.nominate map we keep in sync
 		// this emulates EtcdNominate(obj, obj.hostname, obj.serverURLs)
 		obj.nominated[obj.hostname] = obj.serverURLs // initial value
-		obj.nominateCallback(nil)                    // kick this off once
+		// NOTE: when we are stuck waiting for the server to start up,
+		// it is probably happening on this call right here...
+		obj.nominateCallback(nil) // kick this off once
 	}
 
 	// self volunteer
@@ -372,8 +397,8 @@ func (obj *EmbdEtcd) Startup() error {
 
 	go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, etcd.WithPrefix())
 
-	if e := obj.Connect(false); e != nil { // don't exit from this Startup function until connected!
-		return e
+	if err := obj.Connect(false); err != nil { // don't exit from this Startup function until connected!
+		return err
 	}
 	return nil
 }
@@ -437,6 +462,15 @@ func (obj *CtxRetriesErr) Error() string {
 	return fmt.Sprintf("CtxRetriesErr(%v): %s", obj.Retries, obj.Message)
 }
 
+// CtxPermanentErr is a permanent failure error to notify about borkage.
+type CtxPermanentErr struct {
+	Message string
+}
+
+func (obj *CtxPermanentErr) Error() string {
+	return fmt.Sprintf("CtxPermanentErr: %s", obj.Message)
+}
+
 // CtxReconnectErr requests a client reconnect to the new endpoint list
 type CtxReconnectErr struct {
 	Message string
@@ -468,6 +502,9 @@ func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.C
 // that needs to be resolved before we can continue, eg: connection disconnected,
 // change of server to connect to, etc... It modifies the context if needed.
 func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, error) {
+	if obj.ctxErr != nil { // stop on permanent error
+		return ctx, obj.ctxErr
+	}
 	const ctxErr = "ctxErr"
 	const ctxIter = "ctxIter"
 	expBackoff := func(tmin, texp, iter, tmax int) time.Duration {
@@ -519,7 +556,8 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
 		log.Fatal("Etcd: CtxError: Error: Unexpected lack of error!")
 	}
 	if obj.exiting {
-		return ctx, fmt.Errorf("Etcd: CtxError: Exit in progress!")
+		obj.ctxErr = fmt.Errorf("Etcd: CtxError: Exit in progress!")
+		return ctx, obj.ctxErr
 	}
 
 	// happens when we trigger the cancels during reconnect
@@ -539,11 +577,17 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
 	if retriesErr, ok := err.(*CtxRetriesErr); ok { // custom retry error
 		log.Printf("Etcd: CtxError: Reason: %s", retriesErr.Error())
 		if retriesErr.Retries == 0 {
-			return ctx, fmt.Errorf("Etcd: CtxError: CtxRetriesErr: No more retries!")
+			obj.ctxErr = fmt.Errorf("Etcd: CtxError: CtxRetriesErr: No more retries!")
+			return ctx, obj.ctxErr
 		}
 		return ctx, nil
 	}
 
+	if permanentErr, ok := err.(*CtxPermanentErr); ok { // custom permanent error
+		obj.ctxErr = fmt.Errorf("Etcd: CtxError: Reason: %s", permanentErr.Error())
+		return ctx, obj.ctxErr // quit
+	}
+
 	if err == etcd.ErrNoAvailableEndpoints { // etcd server is probably starting up
 		// TODO: tmin, texp, tmax := 500, 2, 16000 // ms, exp base, ms
 		// TODO: return ctxHelper(tmin, texp, tmax), nil
@@ -600,7 +644,11 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
 		obj.cancelLock.Unlock()
 
 		log.Printf("Etcd: CtxError: Reconnecting...")
-		obj.Connect(true) // FIXME: check returned error
+		if err := obj.Connect(true); err != nil {
+			defer obj.rLock.Unlock()
+			obj.ctxErr = fmt.Errorf("Etcd: Permanent connect error: %v", err)
+			return ctx, obj.ctxErr
+		}
 		if DEBUG {
 			log.Printf("Etcd: CtxError: Unlocking...")
 		}
@@ -622,7 +670,8 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
 	// if you hit this code path here, please report the unmatched error!
 	log.Printf("Etcd: CtxError: Unknown error(%T): %+v", err, err)
 	time.Sleep(1 * time.Second)
-	return ctx, fmt.Errorf("Etcd: CtxError: Unknown error!")
+	obj.ctxErr = fmt.Errorf("Etcd: CtxError: Unknown error!")
+	return ctx, obj.ctxErr
 }
 
 // CallbackLoop is the loop where callback execution is serialized
@@ -658,8 +707,9 @@ func (obj *EmbdEtcd) CallbackLoop() {
 					break
 				}
 				re.retries++ // increment error retry count
-				// FIXME: look at return value of CtxError()
-				ctx, _ = obj.CtxError(ctx, err)
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					break // TODO: it's bad, break or return?
+				}
 			}
 			if TRACE {
 				log.Printf("Trace: Etcd: Loop: Event: FinishLoop")
@@ -730,13 +780,9 @@ func (obj *EmbdEtcd) Loop() {
 					kv.resp.ACK() // success
 					break
 				}
-				ctx, _ = obj.CtxError(ctx, err) // try to reconnect, etc...
-				// FIXME: look at return value of CtxError()
-				// XXX: future idea...
-				//ctx, err = obj.CtxError(ctx, err) // try to reconnect, etc...
-				//if err != nil { // XXX check error?
-				//	break // it's bad, break!
-				//}
+				if ctx, err = obj.CtxError(ctx, err); err != nil { // try to reconnect, etc...
+					break // TODO: it's bad, break or return?
+				}
 			}
 			if TRACE {
 				log.Printf("Trace: Etcd: Loop: Set: FinishLoop")
@@ -759,8 +805,9 @@ func (obj *EmbdEtcd) Loop() {
 					gq.resp.ACK() // success
 					break
 				}
-				// FIXME: look at return value of CtxError()
-				ctx, _ = obj.CtxError(ctx, err)
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					break // TODO: it's bad, break or return?
+				}
 			}
 			if TRACE {
 				log.Printf("Trace: Etcd: Loop: Get: FinishLoop")
@@ -783,8 +830,9 @@ func (obj *EmbdEtcd) Loop() {
 					dl.resp.ACK() // success
 					break
 				}
-				// FIXME: look at return value of CtxError()
-				ctx, _ = obj.CtxError(ctx, err)
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					break // TODO: it's bad, break or return?
+				}
 			}
 			if TRACE {
 				log.Printf("Trace: Etcd: Loop: Delete: FinishLoop")
@@ -807,8 +855,9 @@ func (obj *EmbdEtcd) Loop() {
 					tn.resp.ACK() // success
 					break
 				}
-				// FIXME: look at return value of CtxError()
-				ctx, _ = obj.CtxError(ctx, err)
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					break // TODO: it's bad, break or return?
+				}
 			}
 			if TRACE {
 				log.Printf("Trace: Etcd: Loop: Txn: FinishLoop")
@@ -849,8 +898,9 @@ func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) {
 			aw.resp.ACK() // success
 			return
 		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return // TODO: do something else ?
+		}
 	}
 }
 
@@ -1022,14 +1072,11 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
 				// this new context is the fix for a tricky set
 				// of bugs which were encountered when re-using
 				// the existing canceled context! it has state!
-				ctx = context.Background()      // this is critical!
-				ctx, _ = obj.CtxError(ctx, err) // try to reconnect, etc...
-				// FIXME: look at return value of CtxError()
-				// XXX: future idea...
-				//ctx, err = obj.CtxError(ctx, err) // try to reconnect, etc...
-				//if err != nil { // XXX check error?
-				//	return // it's bad, break!
-				//}
+				ctx = context.Background() // this is critical!
+
+				if ctx, err = obj.CtxError(ctx, err); err != nil {
+					return // TODO: it's bad, break or return?
+				}
 
 				// remake it, but add old Rev when valid
 				opts := []etcd.OpOption{}
@@ -1090,7 +1137,11 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
 		log.Printf("Trace: Etcd: volunteerCallback()")
 		defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!")
 	}
-	obj.Connect(false) // FIXME: check return error
+	if err := obj.Connect(false); err != nil {
+		log.Printf("Etcd: volunteerCallback(): Connect failed permanently: %v", err)
+		// permanently fail...
+		return &CtxPermanentErr{fmt.Sprintf("Etcd: volunteerCallback(): Connect error: %s", err)}
+	}
 	var err error
 
 	// FIXME: if this is running in response to our OWN volunteering offer,
@@ -1577,7 +1628,8 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
 	//cfg.ForceNewCluster = newCluster // TODO ?
 
 	log.Printf("Etcd: StartServer: Starting server...")
-	obj.server, err = embed.StartEtcd(cfg)
+	obj.server, err = embed.StartEtcd(cfg) // we hang here if things are bad
+	log.Printf("Etcd: StartServer: Done starting server!") // it didn't hang!
 	if err != nil {
 		return err
 	}
@@ -1821,12 +1873,16 @@ func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddRespo
 		if err == nil {
 			break
 		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return nil, err
+		}
 	}
 	return response, nil
 }
 
+// EtcdMemberRemove removes a member by mID and returns whether it worked, and
+// whether there was an error. This is because it might have run without error,
+// but the member wasn't found, for example.
 func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
 	//obj.Connect(false) // TODO ?
 	ctx := context.Background()
@@ -1843,8 +1899,9 @@ func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
 			// if we get this, member already shut itself down :)
 			return false, nil
 		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return false, err
+		}
 	}
 	return true, nil
 }
@@ -1870,8 +1927,9 @@ func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) {
 		if err == nil {
 			break
 		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return nil, err
+		}
 	}
 
 	members := make(map[uint64]string)
@@ -1910,8 +1968,9 @@ func EtcdLeader(obj *EmbdEtcd) (string, error) {
 		if err == nil {
 			break
 		}
-		// FIXME: look at return value of CtxError()
-		ctx, _ = obj.CtxError(ctx, err)
+		if ctx, err = obj.CtxError(ctx, err); err != nil {
+			return "", err
+		}
 	}
 
 	// isLeader: response.Header.MemberId == response.Leader
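
The connect loop in this patch bounds consecutive `etcd.ErrNoAvailableEndpoints` failures with the `emax` counter and turns each one into a `CtxDelayErr` whose delay grows linearly (1s, 2s, ...) until `maxClientConnectRetries` is exceeded. Here is a minimal, self-contained sketch of that pattern, assuming a simplified setting: the names `connect`, `dial`, `errNoEndpoints`, and `maxRetries` are hypothetical stand-ins for illustration, not part of the mgmt API.

```go
// A sketch of the bounded retry-with-linear-backoff pattern used by Connect().
package main

import (
	"errors"
	"fmt"
	"log"
	"time"
)

var errNoEndpoints = errors.New("no endpoints available") // hypothetical sentinel

const maxRetries = 5 // plays the role of maxClientConnectRetries

// connect retries a failing dial up to maxRetries consecutive times, sleeping
// one second longer after each failure (1s, 2s, ...), then gives up with a
// permanent error, mirroring the emax counter logic in the patch.
func connect(dial func() error) error {
	var emax uint16
	for {
		err := dial()
		if err == nil {
			return nil // connected
		}
		if err != errNoEndpoints {
			return err // some other, unhandled error
		}
		emax++
		if emax > maxRetries {
			return fmt.Errorf("can't find an available endpoint")
		}
		delay := time.Duration(emax) * time.Second // linear backoff, as in the patch
		log.Printf("no endpoints available yet, retrying in %v...", delay)
		time.Sleep(delay)
	}
}

func main() {
	attempts := 0
	err := connect(func() error {
		attempts++
		if attempts < 3 {
			return errNoEndpoints // fail the first two dials
		}
		return nil
	})
	fmt.Printf("attempts: %d, err: %v\n", attempts, err)
}
```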
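The other half of the patch is the new `cError` and `ctxErr` fields, which make fatal errors sticky: once a permanent error is recorded under the lock, every later `Connect()` or `CtxError()` call short-circuits instead of retrying forever. A rough sketch of that memoization follows, with a hypothetical `Client` type standing in for `EmbdEtcd`.

```go
// A sketch of the "sticky" permanent-error memoization added by the patch.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Client struct {
	mu   sync.Mutex // plays the role of cLock
	cErr error      // permanent client error, nil until a fatal failure
}

// Connect returns immediately once a permanent error has been recorded.
func (c *Client) Connect(dial func() error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cErr != nil { // stop on permanent error
		return c.cErr
	}
	if err := dial(); err != nil {
		c.cErr = err // memoize: all future calls fail fast
		return err
	}
	return nil
}

func main() {
	c := &Client{}
	boom := errors.New("fatal connect error")
	fmt.Println(c.Connect(func() error { return boom })) // records the error
	fmt.Println(c.Connect(func() error { return nil }))  // still fails fast
}
```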