// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

// TODO: Add TTLs (eg: volunteering)
// TODO: Remove race around leader operations
// TODO: Fix server reuse issue (bind: address already in use)
// TODO: Fix unstarted member
// TODO: Fix excessive StartLoop/FinishLoop
// TODO: Add VIP for servers (incorporate with net resource)
// TODO: Auto assign ports/IPs for peers (if possible)
// TODO: Fix godoc

// Smoke testing:
// ./mgmt run --file examples/etcd1a.yaml --hostname h1
// ./mgmt run --file examples/etcd1b.yaml --hostname h2 --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382
// ./mgmt run --file examples/etcd1c.yaml --hostname h3 --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384
// ./mgmt run --file examples/etcd1d.yaml --hostname h4 --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5

// The elastic etcd algorithm works in the following way:
// * When you start up mgmt, you can pass it a list of seeds.
// * If no seeds are given, assume you are the first server and start up.
// * If a seed is given, connect as a client, and optionally volunteer to be a server.
// * All volunteering clients should listen for a message from the master for nomination.
// * If a client has been nominated, it should start up a server.
// * All servers should listen for their nomination to be removed, and shut down if so.
// * The elected leader should decide who to nominate/unnominate to keep the right number of servers.
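// For extra smoke-test visibility, it should also be possible to watch the
// whole mgmt namespace recursively and observe the algorithm's activity
// (nominations, volunteers, endpoints) live, for example with something like:
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 watch /_mgmt/ --prefix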
package main import ( "bytes" "errors" "fmt" "log" "math" "net/url" "os" "path" "sort" "strconv" "strings" "sync" "time" etcd "github.com/coreos/etcd/clientv3" // "clientv3" "github.com/coreos/etcd/embed" "github.com/coreos/etcd/etcdserver" rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" etcdtypes "github.com/coreos/etcd/pkg/types" raft "github.com/coreos/etcd/raft" context "golang.org/x/net/context" "google.golang.org/grpc" ) // constant parameters which may need to be tweaked or customized const ( NS = "_mgmt" // root namespace for mgmt operations seedSentinel = "_seed" // you must not name your hostname this maxStartServerTimeout = 60 // max number of seconds to wait for server to start maxStartServerRetries = 3 // number of times to retry starting the etcd server maxClientConnectRetries = 5 // number of times to retry consecutive connect failures selfRemoveTimeout = 3 // give unnominated members a chance to self exit exitDelay = 3 // number of sec of inactivity after exit to clean up defaultIdealClusterSize = 5 // default ideal cluster size target for initial seed DefaultClientURL = "127.0.0.1:2379" DefaultServerURL = "127.0.0.1:2380" ) var ( errApplyDeltaEventsInconsistent = errors.New("Etcd: ApplyDeltaEvents: Inconsistent key!") ) // AW is a struct for the AddWatcher queue type AW struct { path string opts []etcd.OpOption callback func(*RE) error errCheck bool skipConv bool // ask event to skip converger updates resp Resp cancelFunc func() // data } // RE is a response + error struct since these two values often occur together // This is now called an event with the move to the etcd v3 API type RE struct { response etcd.WatchResponse path string err error callback func(*RE) error errCheck bool // should we check the error of the callback? 
skipConv bool // event skips converger updates retryHint bool // set to true for one event after a watcher failure retries uint // number of times we've retried on error } // KV is a key + value struct to hold the two items together type KV struct { key string value string opts []etcd.OpOption resp Resp } // GQ is a struct for the get queue type GQ struct { path string skipConv bool opts []etcd.OpOption resp Resp data map[string]string } // DL is a struct for the delete queue type DL struct { path string opts []etcd.OpOption resp Resp data int64 } // TN is a struct for the txn queue type TN struct { ifcmps []etcd.Cmp thenops []etcd.Op elseops []etcd.Op resp Resp data *etcd.TxnResponse } // EmbdEtcd provides the embedded server and client etcd functionality type EmbdEtcd struct { // EMBeddeD etcd // etcd client connection related cLock sync.Mutex // client connect lock rLock sync.RWMutex // client reconnect lock client *etcd.Client cError error // permanent client error ctxErr error // permanent ctx error // exit and cleanup related cancelLock sync.Mutex // lock for the cancels list cancels []func() // array of every cancel function for watches exiting bool exitchan chan struct{} exitTimeout <-chan time.Time hostname string memberID uint64 // cluster membership id of server if running endpoints etcdtypes.URLsMap // map of servers a client could connect to clientURLs etcdtypes.URLs // locations to listen for clients if i am a server serverURLs etcdtypes.URLs // locations to listen for servers if i am a server (peer) noServer bool // disable all server peering if true // local tracked state nominated etcdtypes.URLsMap // copy of who's nominated to locally track state lastRevision int64 // the revision id of message being processed idealClusterSize uint16 // ideal cluster size // etcd channels awq chan *AW // add watch queue wevents chan *RE // response+error setq chan *KV // set queue getq chan *GQ // get queue delq chan *DL // delete queue txnq chan *TN // txn queue prefix string // folder prefix to use for misc storage converger Converger // converged tracking // etcd server related serverwg sync.WaitGroup // wait for server to shutdown server *embed.Etcd // technically this contains the server struct dataDir string // our data dir, prefix + "etcd" } // NewEmbdEtcd creates the top level embedded etcd struct client and server obj func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, prefix string, converger Converger) *EmbdEtcd { endpoints := make(etcdtypes.URLsMap) if hostname == seedSentinel { // safety return nil } if len(seeds) > 0 { endpoints[seedSentinel] = seeds idealClusterSize = 0 // unset, get from running cluster } obj := &EmbdEtcd{ exitchan: make(chan struct{}), // exit signal for main loop exitTimeout: nil, awq: make(chan *AW), wevents: make(chan *RE), setq: make(chan *KV), getq: make(chan *GQ), delq: make(chan *DL), txnq: make(chan *TN), nominated: make(etcdtypes.URLsMap), hostname: hostname, endpoints: endpoints, clientURLs: clientURLs, serverURLs: serverURLs, noServer: noServer, idealClusterSize: idealClusterSize, converger: converger, prefix: prefix, dataDir: path.Join(prefix, "etcd"), } // TODO: add some sort of auto assign method for picking these defaults // add a default so that our local client can connect locally if needed if len(obj.LocalhostClientURLs()) == 0 { // if we don't have any localhost URLs u := url.URL{Scheme: "http", Host: DefaultClientURL} // default obj.clientURLs = append([]url.URL{u}, 
obj.clientURLs...) // prepend } // add a default for local use and testing, harmless and useful! if !obj.noServer && len(obj.serverURLs) == 0 { if len(obj.endpoints) > 0 { obj.noServer = true // we didn't have enough to be a server } u := url.URL{Scheme: "http", Host: DefaultServerURL} // default obj.serverURLs = []url.URL{u} } return obj } // GetConfig returns the config struct to be used for the etcd client connect func (obj *EmbdEtcd) GetConfig() etcd.Config { endpoints := []string{} // XXX: filter out any urls which wouldn't resolve here ? for _, eps := range obj.endpoints { // flatten map for _, u := range eps { endpoints = append(endpoints, u.Host) // remove http:// prefix } } sort.Strings(endpoints) // sort for determinism cfg := etcd.Config{ Endpoints: endpoints, // RetryDialer chooses the next endpoint to use // it comes with a default dialer if unspecified DialTimeout: 5 * time.Second, } return cfg } // Connect connects the client to a server, and then builds the *API structs. // If reconnect is true, it will force a reconnect with new config endpoints. func (obj *EmbdEtcd) Connect(reconnect bool) error { if DEBUG { log.Println("Etcd: Connect...") } obj.cLock.Lock() defer obj.cLock.Unlock() if obj.cError != nil { // stop on permanent error return obj.cError } if obj.client != nil { // memoize if reconnect { // i think this requires the rLock when using it concurrently err := obj.client.Close() if err != nil { log.Printf("Etcd: (Re)Connect: Close: Error: %+v", err) } obj.client = nil // for kicks } else { return nil } } var emax uint16 // = 0 for { // loop until connect var err error cfg := obj.GetConfig() if eps := obj.endpoints; len(eps) > 0 { log.Printf("Etcd: Connect: Endpoints: %v", eps) } else { log.Printf("Etcd: Connect: Endpoints: []") } obj.client, err = etcd.New(cfg) // connect! if err == etcd.ErrNoAvailableEndpoints { emax++ if emax > maxClientConnectRetries { log.Printf("Etcd: The dataDir (%s) might be inconsistent or corrupt.", obj.dataDir) log.Printf("Etcd: Please see: %s", "https://github.com/purpleidea/mgmt/blob/master/DOCUMENTATION.md#what-does-the-error-message-about-an-inconsistent-datadir-mean") obj.cError = fmt.Errorf("Can't find an available endpoint.") return obj.cError } err = &CtxDelayErr{time.Duration(emax) * time.Second, "No endpoints available yet!"} // retry with backoff... } if err != nil { log.Printf("Etcd: Connect: CtxError...") if _, e := obj.CtxError(context.TODO(), err); e != nil { log.Printf("Etcd: Connect: CtxError: Fatal: %v", e) obj.cError = e return e // fatal error } continue } // check if we're actually connected here, because this must // block if we're not connected if obj.client == nil { log.Printf("Etcd: Connect: Is nil!") continue } break } return nil } // Startup is the main entry point to kick off the embedded etcd client & server func (obj *EmbdEtcd) Startup() error { bootstrapping := len(obj.endpoints) == 0 // because value changes after start // connect but don't block here, because servers might not be up yet... go func() { if err := obj.Connect(false); err != nil { log.Printf("Etcd: Startup: Error: %v", err) // XXX: Now cause Startup() to exit with error somehow! 
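// (a sketch of one possible approach: send err on a buffered error
// channel, e.g. a hypothetical obj.errChan field, and have Startup or
// its caller select on it; no such field exists in this struct today)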
} }() go obj.CbLoop() // start callback loop go obj.Loop() // start main loop // TODO: implement native etcd watcher method on member API changes path := fmt.Sprintf("/%s/nominated/", NS) go obj.AddWatcher(path, obj.nominateCallback, true, false, etcd.WithPrefix()) // no block // setup ideal cluster size watcher key := fmt.Sprintf("/%s/idealClusterSize", NS) go obj.AddWatcher(key, obj.idealClusterSizeCallback, true, false) // no block // if we have no endpoints, it means we are bootstrapping... if !bootstrapping { log.Println("Etcd: Startup: Getting initial values...") if nominated, err := EtcdNominated(obj); err == nil { obj.nominated = nominated // store a local copy } else { log.Printf("Etcd: Startup: Nominate lookup error.") obj.Destroy() return fmt.Errorf("Etcd: Startup: Error: %v", err) } // get initial ideal cluster size if idealClusterSize, err := EtcdGetClusterSize(obj); err == nil { obj.idealClusterSize = idealClusterSize log.Printf("Etcd: Startup: Ideal cluster size is: %d", idealClusterSize) } else { // perhaps the first server didn't set it yet. it's ok, // we can get it from the watcher if it ever gets set! log.Printf("Etcd: Startup: Ideal cluster size lookup error.") } } if !obj.noServer { path := fmt.Sprintf("/%s/volunteers/", NS) go obj.AddWatcher(path, obj.volunteerCallback, true, false, etcd.WithPrefix()) // no block } // if i am alone and will have to be a server... if !obj.noServer && bootstrapping { log.Printf("Etcd: Bootstrapping...") // give an initial value to the obj.nominate map we keep in sync // this emulates EtcdNominate(obj, obj.hostname, obj.serverURLs) obj.nominated[obj.hostname] = obj.serverURLs // initial value // NOTE: when we are stuck waiting for the server to start up, // it is probably happening on this call right here... obj.nominateCallback(nil) // kick this off once } // self volunteer if !obj.noServer && len(obj.serverURLs) > 0 { // we run this in a go routine because it blocks waiting for server log.Printf("Etcd: Startup: Volunteering...") go EtcdVolunteer(obj, obj.serverURLs) } if bootstrapping { if err := EtcdSetClusterSize(obj, obj.idealClusterSize); err != nil { log.Printf("Etcd: Startup: Ideal cluster size storage error.") obj.Destroy() return fmt.Errorf("Etcd: Startup: Error: %v", err) } } go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, false, etcd.WithPrefix()) if err := obj.Connect(false); err != nil { // don't exit from this Startup function until connected! return err } return nil } // Destroy cleans up the entire embedded etcd system. Use DestroyServer if you // only want to shutdown the embedded server portion. func (obj *EmbdEtcd) Destroy() error { // this should also trigger an unnominate, which should cause a shutdown log.Printf("Etcd: Destroy: Unvolunteering...") if err := EtcdVolunteer(obj, nil); err != nil { // unvolunteer so we can shutdown... log.Printf("Etcd: Destroy: Error: %v", err) // we have a problem } obj.serverwg.Wait() // wait for server shutdown signal obj.exiting = true // must happen before we run the cancel functions! // clean up any watchers which might want to continue obj.cancelLock.Lock() // TODO: do we really need the lock here on exit? 
log.Printf("Etcd: Destroy: Cancelling %d operations...", len(obj.cancels)) for _, cancelFunc := range obj.cancels { cancelFunc() } obj.cancelLock.Unlock() obj.exitchan <- struct{}{} // cause main loop to exit obj.rLock.Lock() if obj.client != nil { obj.client.Close() } obj.client = nil obj.rLock.Unlock() // this happens in response to the unnominate callback. not needed here! //if obj.server != nil { // return obj.DestroyServer() //} return nil } // CtxDelayErr requests a retry in Delta duration type CtxDelayErr struct { Delta time.Duration Message string } func (obj *CtxDelayErr) Error() string { return fmt.Sprintf("CtxDelayErr(%v): %s", obj.Delta, obj.Message) } // CtxRetriesErr lets you retry as long as you have retries available // TODO: consider combining this with CtxDelayErr type CtxRetriesErr struct { Retries uint Message string } func (obj *CtxRetriesErr) Error() string { return fmt.Sprintf("CtxRetriesErr(%v): %s", obj.Retries, obj.Message) } // CtxPermanentErr is a permanent failure error to notify about borkage. type CtxPermanentErr struct { Message string } func (obj *CtxPermanentErr) Error() string { return fmt.Sprintf("CtxPermanentErr: %s", obj.Message) } // CtxReconnectErr requests a client reconnect to the new endpoint list type CtxReconnectErr struct { Message string } func (obj *CtxReconnectErr) Error() string { return fmt.Sprintf("CtxReconnectErr: %s", obj.Message) } // CancelCtx adds a tracked cancel function around an existing context func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func()) { cancelCtx, cancelFunc := context.WithCancel(ctx) obj.cancelLock.Lock() obj.cancels = append(obj.cancels, cancelFunc) // not thread-safe, needs lock obj.cancelLock.Unlock() return cancelCtx, cancelFunc } // TimeoutCtx adds a tracked cancel function with timeout around an existing context func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func()) { timeoutCtx, cancelFunc := context.WithTimeout(ctx, t) obj.cancelLock.Lock() obj.cancels = append(obj.cancels, cancelFunc) // not thread-safe, needs lock obj.cancelLock.Unlock() return timeoutCtx, cancelFunc } // CtxError is called whenever there is a connection or other client problem // that needs to be resolved before we can continue, eg: connection disconnected, // change of server to connect to, etc... It modifies the context if needed. 
func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, error) { if obj.ctxErr != nil { // stop on permanent error return ctx, obj.ctxErr } const ctxErr = "ctxErr" const ctxIter = "ctxIter" expBackoff := func(tmin, texp, iter, tmax int) time.Duration { // https://en.wikipedia.org/wiki/Exponential_backoff // tmin <= texp^iter - 1 <= tmax // TODO: check my math return time.Duration(math.Min(math.Max(math.Pow(float64(texp), float64(iter))-1.0, float64(tmin)), float64(tmax))) * time.Millisecond } var isTimeout = false var iter int // = 0 if ctxerr, ok := ctx.Value(ctxErr).(error); ok { if DEBUG { log.Printf("Etcd: CtxError: err(%v), ctxerr(%v)", err, ctxerr) } if i, ok := ctx.Value(ctxIter).(int); ok { iter = i + 1 // load and increment if DEBUG { log.Printf("Etcd: CtxError: Iter: %v", iter) } } isTimeout = err == context.DeadlineExceeded if DEBUG { log.Printf("Etcd: CtxError: isTimeout: %v", isTimeout) } if !isTimeout { iter = 0 // reset timer } err = ctxerr // restore error } else if DEBUG { log.Printf("Etcd: CtxError: No value found") } ctxHelper := func(tmin, texp, tmax int) context.Context { t := expBackoff(tmin, texp, iter, tmax) if DEBUG { log.Printf("Etcd: CtxError: Timeout: %v", t) } ctxT, _ := obj.TimeoutCtx(ctx, t) ctxV := context.WithValue(ctxT, ctxIter, iter) // save iter ctxF := context.WithValue(ctxV, ctxErr, err) // save err return ctxF } _ = ctxHelper // TODO isGrpc := func(e error) bool { // helper function return grpc.ErrorDesc(err) == e.Error() } if err == nil { log.Fatal("Etcd: CtxError: Error: Unexpected lack of error!") } if obj.exiting { obj.ctxErr = fmt.Errorf("Etcd: CtxError: Exit in progress!") return ctx, obj.ctxErr } // happens when we trigger the cancels during reconnect if err == context.Canceled { // TODO: do we want to create a fresh ctx here for all cancels? //ctx = context.Background() ctx, _ = obj.CancelCtx(ctx) // add a new one return ctx, nil // we should retry, reconnect probably happened } if delayErr, ok := err.(*CtxDelayErr); ok { // custom delay error log.Printf("Etcd: CtxError: Reason: %s", delayErr.Error()) time.Sleep(delayErr.Delta) // sleep the amount of time requested return ctx, nil } if retriesErr, ok := err.(*CtxRetriesErr); ok { // custom retry error log.Printf("Etcd: CtxError: Reason: %s", retriesErr.Error()) if retriesErr.Retries == 0 { obj.ctxErr = fmt.Errorf("Etcd: CtxError: CtxRetriesErr: No more retries!") return ctx, obj.ctxErr } return ctx, nil } if permanentErr, ok := err.(*CtxPermanentErr); ok { // custom permanent error obj.ctxErr = fmt.Errorf("Etcd: CtxError: Reason: %s", permanentErr.Error()) return ctx, obj.ctxErr // quit } if err == etcd.ErrNoAvailableEndpoints { // etcd server is probably starting up // TODO: tmin, texp, tmax := 500, 2, 16000 // ms, exp base, ms // TODO: return ctxHelper(tmin, texp, tmax), nil log.Printf("Etcd: CtxError: No endpoints available yet!") time.Sleep(500 * time.Millisecond) // a ctx timeout won't help! return ctx, nil // passthrough } // etcd server is apparently still starting up... if err == rpctypes.ErrNotCapable { // isGrpc(rpctypes.ErrNotCapable) also matches log.Printf("Etcd: CtxError: Server is starting up...") time.Sleep(500 * time.Millisecond) // a ctx timeout won't help! return ctx, nil // passthrough } if err == grpc.ErrClientConnTimeout { // sometimes caused by "too many colons" misconfiguration return ctx, fmt.Errorf("Etcd: Error: Misconfiguration: %v", err) // permanent failure? 
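// NOTE: for reference, if the commented-out ctxHelper/expBackoff path
// above were enabled with the suggested (tmin=500, texp=2, tmax=16000),
// then min(max(2^iter-1, 500), 16000) milliseconds works out to roughly:
// 500ms for iter 0..9, then ~1s, ~2s, ~4s, ~8s, and a 16s ceiling from
// iter 14 onwards.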
} // this can happen if my client connection shuts down, but without any // available alternatives. in this case, rotate it off to someone else reconnectErr, isReconnectErr := err.(*CtxReconnectErr) // custom reconnect error switch { case isReconnectErr: log.Printf("Etcd: CtxError: Reason: %s", reconnectErr.Error()) fallthrough case err == raft.ErrStopped: // TODO: does this ever happen? fallthrough case err == etcdserver.ErrStopped: // TODO: does this ever happen? fallthrough case isGrpc(raft.ErrStopped): fallthrough case isGrpc(etcdserver.ErrStopped): fallthrough case isGrpc(grpc.ErrClientConnClosing): if DEBUG { log.Printf("Etcd: CtxError: Error(%T): %+v", err, err) log.Printf("Etcd: Endpoints are: %v", obj.client.Endpoints()) log.Printf("Etcd: Client endpoints are: %v", obj.endpoints) } if DEBUG { log.Printf("Etcd: CtxError: Locking...") } obj.rLock.Lock() // TODO: should this really be nested inside the other lock? obj.cancelLock.Lock() // we need to cancel any WIP connections like Txn()'s and so on // we run the cancel()'s that are stored up so they don't block log.Printf("Etcd: CtxError: Cancelling %d operations...", len(obj.cancels)) for _, cancelFunc := range obj.cancels { cancelFunc() } obj.cancels = []func(){} // reset obj.cancelLock.Unlock() log.Printf("Etcd: CtxError: Reconnecting...") if err := obj.Connect(true); err != nil { defer obj.rLock.Unlock() obj.ctxErr = fmt.Errorf("Etcd: Permanent connect error: %v", err) return ctx, obj.ctxErr } if DEBUG { log.Printf("Etcd: CtxError: Unlocking...") } obj.rLock.Unlock() log.Printf("Etcd: CtxError: Reconnected!") return ctx, nil } // FIXME: we might be one of the members in a two member cluster that // had the other member crash.. hmmm bork?! if isGrpc(context.DeadlineExceeded) { log.Printf("Etcd: CtxError: DeadlineExceeded(%T): %+v", err, err) // TODO } if err == rpctypes.ErrDuplicateKey { log.Fatalf("Etcd: CtxError: Programming error: %+v", err) } // if you hit this code path here, please report the unmatched error! log.Printf("Etcd: CtxError: Unknown error(%T): %+v", err, err) time.Sleep(1 * time.Second) obj.ctxErr = fmt.Errorf("Etcd: CtxError: Unknown error!") return ctx, obj.ctxErr } // CbLoop is the loop where callback execution is serialized func (obj *EmbdEtcd) CbLoop() { cuuid := obj.converger.Register() cuuid.SetName("Etcd: CbLoop") defer cuuid.Unregister() if e := obj.Connect(false); e != nil { return // fatal } // we use this timer because when we ignore un-converge events and loop, // we reset the ConvergedTimer case statement, ruining the timeout math! cuuid.StartTimer() for { ctx := context.Background() // TODO: inherit as input argument? select { // etcd watcher event case re := <-obj.wevents: if !re.skipConv { // if we want to count it... cuuid.ResetTimer() // activity! } if TRACE { log.Printf("Trace: Etcd: CbLoop: Event: StartLoop") } for { if obj.exiting { // the exit signal has been sent! //re.resp.NACK() // nope! break } if TRACE { log.Printf("Trace: Etcd: CbLoop: rawCallback()") } err := rawCallback(ctx, re) if TRACE { log.Printf("Trace: Etcd: CbLoop: rawCallback(): %v", err) } if err == nil { //re.resp.ACK() // success break } re.retries++ // increment error retry count if ctx, err = obj.CtxError(ctx, err); err != nil { break // TODO: it's bad, break or return? 
} } if TRACE { log.Printf("Trace: Etcd: CbLoop: Event: FinishLoop") } // exit loop commit case <-obj.exitTimeout: log.Println("Etcd: Exiting callback loop!") cuuid.StopTimer() // clean up nicely return } } } // Loop is the main loop where everything is serialized func (obj *EmbdEtcd) Loop() { cuuid := obj.converger.Register() cuuid.SetName("Etcd: Loop") defer cuuid.Unregister() if e := obj.Connect(false); e != nil { return // fatal } cuuid.StartTimer() for { ctx := context.Background() // TODO: inherit as input argument? // priority channel... select { case aw := <-obj.awq: cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: PriorityAW: StartLoop") } obj.loopProcessAW(ctx, aw) if TRACE { log.Printf("Trace: Etcd: Loop: PriorityAW: FinishLoop") } continue // loop to drain the priority channel first! default: // passthrough to normal channel } select { // add watcher case aw := <-obj.awq: cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: AW: StartLoop") } obj.loopProcessAW(ctx, aw) if TRACE { log.Printf("Trace: Etcd: Loop: AW: FinishLoop") } // set kv pair case kv := <-obj.setq: cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: Set: StartLoop") } for { if obj.exiting { // the exit signal has been sent! kv.resp.NACK() // nope! break } err := obj.rawSet(ctx, kv) if err == nil { kv.resp.ACK() // success break } if ctx, err = obj.CtxError(ctx, err); err != nil { // try to reconnect, etc... break // TODO: it's bad, break or return? } } if TRACE { log.Printf("Trace: Etcd: Loop: Set: FinishLoop") } // get value case gq := <-obj.getq: if !gq.skipConv { cuuid.ResetTimer() // activity! } if TRACE { log.Printf("Trace: Etcd: Loop: Get: StartLoop") } for { if obj.exiting { // the exit signal has been sent! gq.resp.NACK() // nope! break } data, err := obj.rawGet(ctx, gq) if err == nil { gq.data = data // update struct gq.resp.ACK() // success break } if ctx, err = obj.CtxError(ctx, err); err != nil { break // TODO: it's bad, break or return? } } if TRACE { log.Printf("Trace: Etcd: Loop: Get: FinishLoop") } // delete value case dl := <-obj.delq: cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: Delete: StartLoop") } for { if obj.exiting { // the exit signal has been sent! dl.resp.NACK() // nope! break } data, err := obj.rawDelete(ctx, dl) if err == nil { dl.data = data // update struct dl.resp.ACK() // success break } if ctx, err = obj.CtxError(ctx, err); err != nil { break // TODO: it's bad, break or return? } } if TRACE { log.Printf("Trace: Etcd: Loop: Delete: FinishLoop") } // run txn case tn := <-obj.txnq: cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: Txn: StartLoop") } for { if obj.exiting { // the exit signal has been sent! tn.resp.NACK() // nope! break } data, err := obj.rawTxn(ctx, tn) if err == nil { tn.data = data // update struct tn.resp.ACK() // success break } if ctx, err = obj.CtxError(ctx, err); err != nil { break // TODO: it's bad, break or return? } } if TRACE { log.Printf("Trace: Etcd: Loop: Txn: FinishLoop") } // exit loop signal case <-obj.exitchan: log.Println("Etcd: Exiting loop shortly...") // activate exitTimeout switch which only opens after N // seconds of inactivity in this select switch, which // lets everything get bled dry to avoid blocking calls // which would otherwise block us from exiting cleanly! 
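// (TimeAfterOrBlock is presumably a small helper defined elsewhere in
// this package: time.After for a non-negative number of seconds, and a
// channel that blocks forever otherwise)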
obj.exitTimeout = TimeAfterOrBlock(exitDelay) // exit loop commit case <-obj.exitTimeout: log.Println("Etcd: Exiting loop!") cuuid.StopTimer() // clean up nicely return } } } // loopProcessAW is a helper function to facilitate creating priority channels! func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) { for { if obj.exiting { // the exit signal has been sent! aw.resp.NACK() // nope! return } // cancelFunc is our data payload cancelFunc, err := obj.rawAddWatcher(ctx, aw) if err == nil { aw.cancelFunc = cancelFunc // update struct aw.resp.ACK() // success return } if ctx, err = obj.CtxError(ctx, err); err != nil { return // TODO: do something else ? } } } // Set queues up a set operation to occur using our mainloop func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error { resp := NewResp() obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp} if !resp.Wait() { // wait for ack/nack return fmt.Errorf("Etcd: Set: Probably received an exit...") } return nil } // rawSet actually implements the key set operation func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error { if TRACE { log.Printf("Trace: Etcd: rawSet()") } // key is the full key path // TODO: should this be : obj.client.KV.Put or obj.client.Put ? obj.rLock.RLock() // these read locks need to wrap any use of obj.client response, err := obj.client.KV.Put(ctx, kv.key, kv.value, kv.opts...) obj.rLock.RUnlock() log.Printf("Etcd: Set(%s): %v", kv.key, response) // w00t... bonus if TRACE { log.Printf("Trace: Etcd: rawSet(): %v", err) } return err } // Get performs a get operation and waits for an ACK to continue func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error) { return obj.ComplexGet(path, false, opts...) } // ComplexGet performs a get operation and waits for an ACK to continue. It can // accept more arguments that are useful for the less common operations. // TODO: perhaps a get should never cause an un-converge ? func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error) { resp := NewResp() gq := &GQ{path: path, skipConv: skipConv, opts: opts, resp: resp, data: nil} obj.getq <- gq // send if !resp.Wait() { // wait for ack/nack return nil, fmt.Errorf("Etcd: Get: Probably received an exit...") } return gq.data, nil } func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]string, err error) { if TRACE { log.Printf("Trace: Etcd: rawGet()") } obj.rLock.RLock() response, err := obj.client.KV.Get(ctx, gq.path, gq.opts...) 
obj.rLock.RUnlock() if err != nil || response == nil { return nil, err } // TODO: write a response.ToMap() function on https://godoc.org/github.com/coreos/etcd/etcdserver/etcdserverpb#RangeResponse result = make(map[string]string) for _, x := range response.Kvs { result[bytes.NewBuffer(x.Key).String()] = bytes.NewBuffer(x.Value).String() } if TRACE { log.Printf("Trace: Etcd: rawGet(): %v", result) } return } // Delete performs a delete operation and waits for an ACK to continue func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) { resp := NewResp() dl := &DL{path: path, opts: opts, resp: resp, data: -1} obj.delq <- dl // send if !resp.Wait() { // wait for ack/nack return -1, fmt.Errorf("Etcd: Delete: Probably received an exit...") } return dl.data, nil } func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err error) { if TRACE { log.Printf("Trace: Etcd: rawDelete()") } count = -1 obj.rLock.RLock() response, err := obj.client.KV.Delete(ctx, dl.path, dl.opts...) obj.rLock.RUnlock() if err == nil { count = response.Deleted } if TRACE { log.Printf("Trace: Etcd: rawDelete(): %v", err) } return } // Txn performs a transaction and waits for an ACK to continue func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error) { resp := NewResp() tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil} obj.txnq <- tn // send if !resp.Wait() { // wait for ack/nack return nil, fmt.Errorf("Etcd: Txn: Probably received an exit...") } return tn.data, nil } func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, error) { if TRACE { log.Printf("Trace: Etcd: rawTxn()") } obj.rLock.RLock() response, err := obj.client.KV.Txn(ctx).If(tn.ifcmps...).Then(tn.thenops...).Else(tn.elseops...).Commit() obj.rLock.RUnlock() if TRACE { log.Printf("Trace: Etcd: rawTxn(): %v, %v", response, err) } return response, err } // AddWatcher queues up an add watcher request and returns a cancel function // Remember to add the etcd.WithPrefix() option if you want to watch recursively func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) { resp := NewResp() awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp} obj.awq <- awq // send if !resp.Wait() { // wait for ack/nack return nil, fmt.Errorf("Etcd: AddWatcher: Got NACK!") } return awq.cancelFunc, nil } // rawAddWatcher adds a watcher and returns a cancel function to call to end it func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) { cancelCtx, cancelFunc := obj.CancelCtx(ctx) go func(ctx context.Context) { defer cancelFunc() // it's safe to cancelFunc() more than once! obj.rLock.RLock() rch := obj.client.Watcher.Watch(ctx, aw.path, aw.opts...) obj.rLock.RUnlock() var rev int64 var useRev = false var retry, locked bool = false, false for { response := <-rch // read err := response.Err() isCanceled := response.Canceled || err == context.Canceled if response.Header.Revision == 0 { // by inspection if DEBUG { log.Printf("Etcd: Watch: Received empty message!") // switched client connection } isCanceled = true } if isCanceled { if obj.exiting { // if not, it could be reconnect return } err = context.Canceled } if err == nil { // watch from latest good revision rev = response.Header.Revision // TODO +1 ? 
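// (re the TODO: etcd v3 watches start at the revision passed via
// WithRev inclusively, so resuming from rev may replay the last event
// we saw, while rev+1 would skip it; presumably the callbacks tolerate
// the occasional duplicate, since replay is the safer failure mode)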
useRev = true if !locked { retry = false } locked = false } else { if DEBUG { log.Printf("Etcd: Watch: Error: %v", err) // probably fixable } // this new context is the fix for a tricky set // of bugs which were encountered when re-using // the existing canceled context! it has state! ctx = context.Background() // this is critical! if ctx, err = obj.CtxError(ctx, err); err != nil { return // TODO: it's bad, break or return? } // remake it, but add old Rev when valid opts := []etcd.OpOption{} if useRev { opts = append(opts, etcd.WithRev(rev)) } opts = append(opts, aw.opts...) rch = nil obj.rLock.RLock() if obj.client == nil { defer obj.rLock.RUnlock() return // we're exiting } rch = obj.client.Watcher.Watch(ctx, aw.path, opts...) obj.rLock.RUnlock() locked = true retry = true continue } // the response includes a list of grouped events, each // of which includes one Kv struct. Send these all in a // batched group so that they are processed together... obj.wevents <- &RE{response: response, path: aw.path, err: err, callback: aw.callback, errCheck: aw.errCheck, skipConv: aw.skipConv, retryHint: retry} // send event } }(cancelCtx) return cancelFunc, nil } // rawCallback is the companion to AddWatcher which runs the callback processing func rawCallback(ctx context.Context, re *RE) error { var err = re.err // the watch event itself might have had an error if err == nil { if callback := re.callback; callback != nil { // TODO: we could add an async option if needed // NOTE: the callback must *not* block! // FIXME: do we need to pass ctx in via *RE, or in the callback signature ? err = callback(re) // run the callback if TRACE { log.Printf("Trace: Etcd: rawCallback(): %v", err) } if !re.errCheck || err == nil { return nil } } else { return nil } } return err } // volunteerCallback runs to respond to the volunteer list change events // functionally, it controls the adding and removing of members // FIXME: we might need to respond to member change/disconnect/shutdown events, // see: https://github.com/coreos/etcd/issues/5277 func (obj *EmbdEtcd) volunteerCallback(re *RE) error { if TRACE { log.Printf("Trace: Etcd: volunteerCallback()") defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!") } if err := obj.Connect(false); err != nil { log.Printf("Etcd: volunteerCallback(): Connect failed permanently: %v", err) // permanently fail... return &CtxPermanentErr{fmt.Sprintf("Etcd: volunteerCallback(): Connect error: %s", err)} } var err error // FIXME: if this is running in response to our OWN volunteering offer, // skip doing stuff if we're not a server yet because it's pointless, // and we might have just lost quorum if we just got nominated! Why the // lack of quorum is needed to read data in etcd v3 but not in v2 is a // mystery for now, since in v3 this now blocks! Maybe it's that the // Maintenance.Status API requires a leader to return? Maybe that's it! // FIXME: are there any situations where we don't want to short circuit // here, such as if i'm the last node? if obj.server == nil { return nil // if we're not a server, we're not a leader, return } membersMap, err := EtcdMembers(obj) // map[uint64]string if err != nil { return fmt.Errorf("Etcd: Members: Error: %+v", err) } members := StrMapValuesUint64(membersMap) // get values log.Printf("Etcd: Members: List: %+v", members) // we only do *one* change operation at a time so that the cluster can // advance safely. 
we ensure this by returning a
	// CtxDelayErr any time an operation happens, so that the function will
	// reschedule itself via the CtxError processing after this callback
	// "fails". This custom error is caught by CtxError, and lets us
	// specify a retry delay too!

	// check for unstarted members, since we're currently "unhealthy"
	for mID, name := range membersMap {
		if name == "" {
			// reschedule in two seconds
			// XXX: will the unnominate TTL still happen if we are
			// in an unhealthy state? that's what we're waiting for
			return &CtxDelayErr{2 * time.Second, fmt.Sprintf("unstarted member, mID: %d", mID)}
		}
	}

	leader, err := EtcdLeader(obj) // XXX: race!
	if err != nil {
		log.Printf("Etcd: Leader: Error: %+v", err)
		return fmt.Errorf("Etcd: Leader: Error: %+v", err)
	}
	log.Printf("Etcd: Leader: %+v", leader)
	if leader != obj.hostname {
		log.Printf("Etcd: We are not the leader...")
		return nil
	}
	// i am the leader!

	// get the list of available volunteers
	volunteersMap, err := EtcdVolunteers(obj)
	if err != nil {
		log.Printf("Etcd: Volunteers: Error: %+v", err)
		return fmt.Errorf("Etcd: Volunteers: Error: %+v", err)
	}
	volunteers := []string{} // get keys
	for k := range volunteersMap {
		volunteers = append(volunteers, k)
	}
	sort.Strings(volunteers) // deterministic order
	log.Printf("Etcd: Volunteers: %v", volunteers)

	// unnominate anyone who unvolunteers, so that they can shut down cleanly
	quitters := StrFilterElementsInList(volunteers, members)
	log.Printf("Etcd: Quitters: %v", quitters)

	// if we're the only member left, just shut down...
	if len(members) == 1 && members[0] == obj.hostname &&
		len(quitters) == 1 && quitters[0] == obj.hostname {
		log.Printf("Etcd: Quitters: Shutting down self...")
		if err := EtcdNominate(obj, obj.hostname, nil); err != nil { // unnominate myself
			return &CtxDelayErr{1 * time.Second, fmt.Sprintf("error shutting down self: %v", err)}
		}
		return nil
	}

	candidates := StrFilterElementsInList(members, volunteers)
	log.Printf("Etcd: Candidates: %v", candidates)

	// TODO: switch to < 0 so that we can shut the whole cluster down with 0
	if obj.idealClusterSize < 1 { // safety in case value is not ready yet
		return &CtxDelayErr{1 * time.Second, "The idealClusterSize is < 1."} // retry in one second
	}

	// do we need more members?
	if len(candidates) > 0 && len(members)-len(quitters) < int(obj.idealClusterSize) {
		chosen := candidates[0]           // XXX: use a better picker algorithm
		peerURLs := volunteersMap[chosen] // comma separated list of urls

		// NOTE: storing peerURLs when they're already in volunteers/ is
		// redundant, but it seems to be necessary for a sane algorithm.

		// nominate before we call the API so that members see it first!
		EtcdNominate(obj, chosen, peerURLs)
		// XXX: add a TTL here, because once we nominate someone, we
		// need to give them up to N seconds to start up after we run
		// the MemberAdd API, because if they don't, in some situations
		// such as if we're adding the second node to the cluster, then
		// we've lost quorum until a second member joins! If the TTL
		// expires, we need to MemberRemove! In this special case, we
		// need to forcefully remove the second member if we don't add
		// them, because we'll be in a lack of quorum state and unable
		// to do anything... As a result, we should always only add ONE
		// member at a time!

		log.Printf("Etcd: Member Add: %v", peerURLs)
		mresp, err := EtcdMemberAdd(obj, peerURLs)
		if err != nil {
			// on error this function will run again, which is good,
			// because we need to make sure to run the below parts!
			return fmt.Errorf("Etcd: Member Add: Error: %+v", err)
		}
		log.Printf("Etcd: Member Add: %+v", mresp.Member.PeerURLs)
		// return and reschedule to check for unstarted members, etc...
		return &CtxDelayErr{1 * time.Second, fmt.Sprintf("Member %s added successfully!", chosen)} // retry asap

	} else if len(quitters) == 0 && len(members) > int(obj.idealClusterSize) { // too many members
		for _, kicked := range members {
			// don't kick ourselves unless we are the only one left...
			if kicked != obj.hostname || (obj.idealClusterSize == 0 && len(members) == 1) {
				quitters = []string{kicked} // XXX: use a better picker algorithm
				log.Printf("Etcd: Extras: %v", quitters)
				break
			}
		}
	}

	// we must remove them from the members API or it will look like a crash
	if lq := len(quitters); lq > 0 {
		log.Printf("Etcd: Quitters: Shutting down %d members...", lq)
	}
	for _, quitter := range quitters {
		mID, ok := Uint64KeyFromStrInMap(quitter, membersMap)
		if !ok { // programming error
			log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID)
		}
		EtcdNominate(obj, quitter, nil) // unnominate
		// once we issue the above unnominate, that peer will shut
		// down, and this might cause us to lose quorum. therefore,
		// let that member remove itself, and then double check that
		// it did happen, in case it is delinquent
		// TODO: get built-in transactional member Add/Remove
		// functionality to avoid a separate nominate list...
		if quitter == obj.hostname { // remove in unnominate!
			log.Printf("Etcd: Quitters: Removing self...")
			continue // TODO: CtxDelayErr ?
		}

		log.Printf("Etcd: Waiting %d seconds for %s to self remove...", selfRemoveTimeout, quitter)
		time.Sleep(selfRemoveTimeout * time.Second)
		// in case the removed member doesn't remove itself, do it!
		removed, err := EtcdMemberRemove(obj, mID)
		if err != nil {
			return fmt.Errorf("Etcd: Member Remove: Error: %+v", err)
		}
		if removed {
			log.Printf("Etcd: Member Removed (forced): %v(%v)", quitter, mID)
		}

		// Remove the endpoint from our list to avoid blocking
		// future MemberList calls, which would try to connect
		// to a missing endpoint... The endpoints should get
		// updated from the member exiting safely if it doesn't
		// crash, but if it did, and/or since it's a race to see
		// if the update event will get seen before we need the
		// new data, just do it now anyways, then update the
		// endpoint list and trigger a reconnect.
		delete(obj.endpoints, quitter) // proactively delete it
		obj.endpointCallback(nil)      // update!
		log.Printf("Member %s (%d) removed successfully!", quitter, mID)
		return &CtxReconnectErr{"a member was removed"} // retry asap and update endpoint list
	}

	return nil
}

// nominateCallback runs to respond to the nomination list change events
// functionally, it controls the starting and stopping of the server process
func (obj *EmbdEtcd) nominateCallback(re *RE) error {
	if TRACE {
		log.Printf("Trace: Etcd: nominateCallback()")
		defer log.Printf("Trace: Etcd: nominateCallback(): Finished!")
	}
	bootstrapping := len(obj.endpoints) == 0
	var revision int64 // = 0
	if re != nil {
		revision = re.response.Header.Revision
	}
	if !bootstrapping && (re == nil || revision != obj.lastRevision) {
		// don't reprocess if we've already processed this message
		// this can happen if the callback errors and is re-called
		obj.lastRevision = revision

		// if we tried to look up the nominated members here (in etcd v3),
		// this would sometimes block, because we would lose the cluster
		// leader once the current leader calls the MemberAdd API and it
		// steps down trying to form a two-host cluster.
Instead, we can // look at the event response data to read the nominated values! //nominated, err = EtcdNominated(obj) // nope, won't always work // since we only see what has *changed* in the response data, we // have to keep track of the original state and apply the deltas // this must be idempotent in case it errors and is called again // if we're retrying and we get a data format error, it's normal nominated := obj.nominated if nominated, err := ApplyDeltaEvents(re, nominated); err == nil { obj.nominated = nominated } else if !re.retryHint || err != errApplyDeltaEventsInconsistent { log.Fatal(err) } } else { // TODO: should we just use the above delta method for everything? //nominated, err := EtcdNominated(obj) // just get it //if err != nil { // return fmt.Errorf("Etcd: Nominate: Error: %+v", err) //} //obj.nominated = nominated // update our local copy } if n := obj.nominated; len(n) > 0 { log.Printf("Etcd: Nominated: %+v", n) } else { log.Printf("Etcd: Nominated: []") } // if there are no other peers, we create a new server _, exists := obj.nominated[obj.hostname] // FIXME: can we get rid of the len(obj.nominated) == 0 ? newCluster := len(obj.nominated) == 0 || (len(obj.nominated) == 1 && exists) if DEBUG { log.Printf("Etcd: nominateCallback(): newCluster: %v; exists: %v; obj.server == nil: %t", newCluster, exists, obj.server == nil) } // XXX check if i have actually volunteered first of all... if obj.server == nil && (newCluster || exists) { log.Printf("Etcd: StartServer(newCluster: %t): %+v", newCluster, obj.nominated) err := obj.StartServer( newCluster, // newCluster obj.nominated, // other peer members and urls or empty map ) if err != nil { // retry maxStartServerRetries times, then permanently fail return &CtxRetriesErr{maxStartServerRetries - re.retries, fmt.Sprintf("Etcd: StartServer: Error: %+v", err)} } if len(obj.endpoints) == 0 { // add server to obj.endpoints list... addresses := obj.LocalhostClientURLs() if len(addresses) == 0 { // probably a programming error... log.Fatal("Etcd: No valid clientUrls exist!") } obj.endpoints[obj.hostname] = addresses // now we have some! // client connects to one of the obj.endpoints servers... log.Printf("Etcd: Addresses are: %s", addresses) // XXX: just put this wherever for now so we don't block // nominate self so "member" list is correct for peers to see EtcdNominate(obj, obj.hostname, obj.serverURLs) // XXX if this fails, where will we retry this part ? } // advertise client urls if curls := obj.clientURLs; len(curls) > 0 { // XXX: don't advertise local addresses! 127.0.0.1:2381 doesn't really help remote hosts // XXX: but sometimes this is what we want... hmmm how do we decide? filter on callback? EtcdAdvertiseEndpoints(obj, curls) // XXX if this fails, where will we retry this part ? // force this to remove sentinel before we reconnect... obj.endpointCallback(nil) } return &CtxReconnectErr{"local server is running"} // trigger reconnect to self } else if obj.server != nil && !exists { // un advertise client urls EtcdAdvertiseEndpoints(obj, nil) // i have been un-nominated, remove self and shutdown server! if len(obj.nominated) != 0 { // don't call if nobody left but me! 
// this works around: https://github.com/coreos/etcd/issues/5482, // and it probably makes sense to avoid calling if we're the last log.Printf("Etcd: Member Remove: Removing self: %v", obj.memberID) removed, err := EtcdMemberRemove(obj, obj.memberID) if err != nil { return fmt.Errorf("Etcd: Member Remove: Error: %+v", err) } if removed { log.Printf("Etcd: Member Removed (self): %v(%v)", obj.hostname, obj.memberID) } } log.Printf("Etcd: DestroyServer...") obj.DestroyServer() // TODO: make sure to think about the implications of // shutting down and potentially intercepting signals // here after i've removed myself from the nominated! // if we are connected to self and other servers exist: trigger // if any of the obj.clientURLs are in the endpoints list, then // we are stale. it is not likely that the advertised endpoints // have been updated because we're still blocking the callback. stale := false for key, eps := range obj.endpoints { if key != obj.hostname && len(eps) > 0 { // other endpoints? stale = true // only half true so far break } } for _, curl := range obj.clientURLs { // these just got shutdown for _, ep := range obj.client.Endpoints() { if (curl.Host == ep || curl.String() == ep) && stale { // add back the sentinel to force update log.Printf("Etcd: Forcing endpoint callback...") obj.endpoints[seedSentinel] = nil //etcdtypes.URLs{} obj.endpointCallback(nil) // update! return &CtxReconnectErr{"local server has shutdown"} // trigger reconnect } } } } return nil } // endpointCallback runs to respond to the endpoint list change events func (obj *EmbdEtcd) endpointCallback(re *RE) error { if TRACE { log.Printf("Trace: Etcd: endpointCallback()") defer log.Printf("Trace: Etcd: endpointCallback(): Finished!") } // if the startup sentinel exists, or delta fails, then get a fresh copy endpoints := make(etcdtypes.URLsMap, len(obj.endpoints)) // this would copy the reference: endpoints := obj.endpoints for k, v := range obj.endpoints { endpoints[k] = make(etcdtypes.URLs, len(v)) copy(endpoints[k], v) } // updating _, exists := endpoints[seedSentinel] endpoints, err := ApplyDeltaEvents(re, endpoints) if err != nil || exists { // TODO: we could also lookup endpoints from the maintenance api endpoints, err = EtcdEndpoints(obj) if err != nil { return err } } // change detection var changed = false // do we need to update? if len(obj.endpoints) != len(endpoints) { changed = true } for k, v1 := range obj.endpoints { if changed { // catches previous statement and inner loop break break } v2, exists := endpoints[k] if !exists { changed = true break } if len(v1) != len(v2) { changed = true break } for i := range v1 { if v1[i] != v2[i] { changed = true break } } } // is the endpoint list different? 
if changed {
		obj.endpoints = endpoints // set
		if eps := endpoints; len(eps) > 0 {
			log.Printf("Etcd: Endpoints: %+v", eps)
		} else {
			log.Printf("Etcd: Endpoints: []")
		}
		// can happen if a server drops out for example
		return &CtxReconnectErr{"endpoint list changed"} // trigger reconnect with new endpoint list
	}
	return nil
}

// idealClusterSizeCallback runs to respond to the ideal cluster size changes
func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error {
	if TRACE {
		log.Printf("Trace: Etcd: idealClusterSizeCallback()")
		defer log.Printf("Trace: Etcd: idealClusterSizeCallback(): Finished!")
	}
	path := fmt.Sprintf("/%s/idealClusterSize", NS)
	for _, event := range re.response.Events {
		if key := bytes.NewBuffer(event.Kv.Key).String(); key != path {
			continue
		}
		if event.Type != etcd.EventTypePut {
			continue
		}
		val := bytes.NewBuffer(event.Kv.Value).String()
		if val == "" {
			continue
		}
		v, err := strconv.ParseUint(val, 10, 16)
		if err != nil {
			continue
		}
		if i := uint16(v); i > 0 {
			log.Printf("Etcd: Ideal cluster size is now: %d", i)
			obj.idealClusterSize = i
			// now, emulate the calling of the volunteerCallback...
			go func() {
				obj.wevents <- &RE{callback: obj.volunteerCallback, errCheck: true} // send event
			}() // don't block
		}
	}
	return nil
}

// LocalhostClientURLs returns the most localhost-like URLs for direct connection
// this gets clients to talk to the local servers first before searching remotely
func (obj *EmbdEtcd) LocalhostClientURLs() etcdtypes.URLs {
	// look through obj.clientURLs and return the localhost ones
	urls := etcdtypes.URLs{}
	for _, x := range obj.clientURLs {
		// "localhost" or anything in 127.0.0.0/8 is valid!
		if s := x.Host; strings.HasPrefix(s, "localhost") || strings.HasPrefix(s, "127.") {
			urls = append(urls, x)
		}
	}
	return urls
}

// StartServer kicks off a new embedded etcd server.
func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) error {
	var err error
	memberName := obj.hostname

	err = os.MkdirAll(obj.dataDir, 0770)
	if err != nil {
		log.Printf("Etcd: StartServer: Couldn't mkdir: %s.", obj.dataDir)
		log.Printf("Etcd: StartServer: Mkdir error: %s.", err)
		obj.DestroyServer()
		return err
	}

	// if no peer URLs exist, then starting a server is mostly only for some
	// testing, but etcd doesn't allow the value to be empty so we use this!
	peerURLs, _ := etcdtypes.NewURLs([]string{"http://localhost:0"})
	if len(obj.serverURLs) > 0 {
		peerURLs = obj.serverURLs
	}
	initialPeerURLsMap := make(etcdtypes.URLsMap)
	for k, v := range peerURLsMap {
		initialPeerURLsMap[k] = v // copy
	}
	if _, exists := peerURLsMap[memberName]; !exists {
		initialPeerURLsMap[memberName] = peerURLs
	}

	// embed etcd
	cfg := embed.NewConfig()
	cfg.Name = memberName // hostname
	cfg.Dir = obj.dataDir
	cfg.ACUrls = obj.clientURLs
	cfg.APUrls = peerURLs
	cfg.LCUrls = obj.clientURLs
	cfg.LPUrls = peerURLs
	cfg.StrictReconfigCheck = false // XXX: workaround https://github.com/coreos/etcd/issues/6305

	cfg.InitialCluster = initialPeerURLsMap.String() // including myself!
	if newCluster {
		cfg.ClusterState = embed.ClusterStateFlagNew
	} else {
		cfg.ClusterState = embed.ClusterStateFlagExisting
	}
	//cfg.ForceNewCluster = newCluster // TODO ?

	log.Printf("Etcd: StartServer: Starting server...")
	obj.server, err = embed.StartEtcd(cfg)
	if err != nil {
		return err
	}
	select {
	case <-obj.server.Server.ReadyNotify(): // we hang here if things are bad
		log.Printf("Etcd: StartServer: Done starting server!") // it didn't hang!
case <-time.After(time.Duration(maxStartServerTimeout) * time.Second): e := fmt.Errorf("Etcd: StartServer: Timeout of %d seconds reached!", maxStartServerTimeout) log.Printf(e.Error()) obj.server.Server.Stop() // trigger a shutdown obj.serverwg.Add(1) // add for the DestroyServer() obj.DestroyServer() return e } //log.Fatal(<-obj.server.Err()) XXX log.Printf("Etcd: StartServer: Server running...") obj.memberID = uint64(obj.server.Server.ID()) // store member id for internal use obj.serverwg.Add(1) return nil } // DestroyServer shuts down the embedded etcd server portion func (obj *EmbdEtcd) DestroyServer() error { var err error log.Printf("Etcd: DestroyServer: Destroying...") if obj.server != nil { obj.server.Close() // this blocks until server has stopped } log.Printf("Etcd: DestroyServer: Done closing...") obj.memberID = 0 if obj.server == nil { // skip the .Done() below because we didn't .Add(1) it. return err } obj.server = nil // important because this is used as an isRunning flag log.Printf("Etcd: DestroyServer: Unlocking server...") obj.serverwg.Done() // -1 return err } // TODO: Could all these Etcd*(obj *EmbdEtcd, ...) functions which deal with the // interface between etcd paths and behaviour be grouped into a single struct ? // EtcdNominate nominates a particular client to be a server (peer) func EtcdNominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error { if TRACE { log.Printf("Trace: Etcd: EtcdNominate(%v): %v", hostname, urls.String()) defer log.Printf("Trace: Etcd: EtcdNominate(%v): Finished!", hostname) } // nominate someone to be a server nominate := fmt.Sprintf("/%s/nominated/%s", NS, hostname) ops := []etcd.Op{} // list of ops in this txn if urls != nil { ops = append(ops, etcd.OpPut(nominate, urls.String())) // TODO: add a TTL? (etcd.WithLease) } else { // delete message if set to erase ops = append(ops, etcd.OpDelete(nominate)) } if _, err := obj.Txn(nil, ops, nil); err != nil { return fmt.Errorf("Etcd: Nominate failed!") // exit in progress? } return nil } // EtcdNominated returns a urls map of nominated etcd server volunteers // NOTE: I know 'nominees' might be more correct, but is less consistent here func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { path := fmt.Sprintf("/%s/nominated/", NS) keyMap, err := obj.Get(path, etcd.WithPrefix()) // map[string]string, bool if err != nil { return nil, fmt.Errorf("Etcd: Nominated isn't available: %v", err) } nominated := make(etcdtypes.URLsMap) for key, val := range keyMap { // loop through directory of nominated if !strings.HasPrefix(key, path) { continue } name := key[len(path):] // get name of nominee if val == "" { // skip "erased" values continue } urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) if err != nil { return nil, fmt.Errorf("Etcd: Nominated: Data format error!: %v", err) } nominated[name] = urls // add to map if DEBUG { log.Printf("Etcd: Nominated(%v): %v", name, val) } } return nominated, nil } // EtcdVolunteer offers yourself up to be a server if needed func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error { if TRACE { log.Printf("Trace: Etcd: EtcdVolunteer(%v): %v", obj.hostname, urls.String()) defer log.Printf("Trace: Etcd: EtcdVolunteer(%v): Finished!", obj.hostname) } // volunteer to be a server volunteer := fmt.Sprintf("/%s/volunteers/%s", NS, obj.hostname) ops := []etcd.Op{} // list of ops in this txn if urls != nil { // XXX: adding a TTL is crucial! 
(i think) ops = append(ops, etcd.OpPut(volunteer, urls.String())) // value is usually a peer "serverURL" } else { // delete message if set to erase ops = append(ops, etcd.OpDelete(volunteer)) } if _, err := obj.Txn(nil, ops, nil); err != nil { return fmt.Errorf("Etcd: Volunteering failed!") // exit in progress? } return nil } // EtcdVolunteers returns a urls map of available etcd server volunteers func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { if TRACE { log.Printf("Trace: Etcd: EtcdVolunteers()") defer log.Printf("Trace: Etcd: EtcdVolunteers(): Finished!") } path := fmt.Sprintf("/%s/volunteers/", NS) keyMap, err := obj.Get(path, etcd.WithPrefix()) if err != nil { return nil, fmt.Errorf("Etcd: Volunteers aren't available: %v", err) } volunteers := make(etcdtypes.URLsMap) for key, val := range keyMap { // loop through directory of volunteers if !strings.HasPrefix(key, path) { continue } name := key[len(path):] // get name of volunteer if val == "" { // skip "erased" values continue } urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) if err != nil { return nil, fmt.Errorf("Etcd: Volunteers: Data format error!: %v", err) } volunteers[name] = urls // add to map if DEBUG { log.Printf("Etcd: Volunteer(%v): %v", name, val) } } return volunteers, nil } // EtcdAdvertiseEndpoints advertises the list of available client endpoints func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error { if TRACE { log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): %v", obj.hostname, urls.String()) defer log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): Finished!", obj.hostname) } // advertise endpoints endpoints := fmt.Sprintf("/%s/endpoints/%s", NS, obj.hostname) ops := []etcd.Op{} // list of ops in this txn if urls != nil { // TODO: add a TTL? (etcd.WithLease) ops = append(ops, etcd.OpPut(endpoints, urls.String())) // value is usually a "clientURL" } else { // delete message if set to erase ops = append(ops, etcd.OpDelete(endpoints)) } if _, err := obj.Txn(nil, ops, nil); err != nil { return fmt.Errorf("Etcd: Endpoint advertising failed!") // exit in progress? } return nil } // EtcdEndpoints returns a urls map of available etcd server endpoints func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { if TRACE { log.Printf("Trace: Etcd: EtcdEndpoints()") defer log.Printf("Trace: Etcd: EtcdEndpoints(): Finished!") } path := fmt.Sprintf("/%s/endpoints/", NS) keyMap, err := obj.Get(path, etcd.WithPrefix()) if err != nil { return nil, fmt.Errorf("Etcd: Endpoints aren't available: %v", err) } endpoints := make(etcdtypes.URLsMap) for key, val := range keyMap { // loop through directory of endpoints if !strings.HasPrefix(key, path) { continue } name := key[len(path):] // get name of volunteer if val == "" { // skip "erased" values continue } urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) if err != nil { return nil, fmt.Errorf("Etcd: Endpoints: Data format error!: %v", err) } endpoints[name] = urls // add to map if DEBUG { log.Printf("Etcd: Endpoint(%v): %v", name, val) } } return endpoints, nil } // EtcdSetHostnameConverged sets whether a specific hostname is converged. 
// EtcdSetHostnameConverged sets whether a specific hostname is converged.
func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error {
	if TRACE {
		log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%s): %v", hostname, isConverged)
		defer log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%v): Finished!", hostname)
	}
	converged := fmt.Sprintf("/%s/converged/%s", NS, hostname)
	op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))}
	if _, err := obj.Txn(nil, op, nil); err != nil { // TODO: do we need a skipConv flag here too?
		return fmt.Errorf("Etcd: Set converged failed!") // exit in progress?
	}
	return nil
}

// EtcdHostnameConverged returns a map of every hostname's converged state.
func EtcdHostnameConverged(obj *EmbdEtcd) (map[string]bool, error) {
	if TRACE {
		log.Printf("Trace: Etcd: EtcdHostnameConverged()")
		defer log.Printf("Trace: Etcd: EtcdHostnameConverged(): Finished!")
	}
	path := fmt.Sprintf("/%s/converged/", NS)
	keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge
	if err != nil {
		return nil, fmt.Errorf("Etcd: Converged values aren't available: %v", err)
	}
	converged := make(map[string]bool)
	for key, val := range keyMap { // loop through the directory...
		if !strings.HasPrefix(key, path) {
			continue
		}
		name := key[len(path):] // get the name of the key
		if val == "" {          // skip "erased" values
			continue
		}
		b, err := strconv.ParseBool(val)
		if err != nil {
			return nil, fmt.Errorf("Etcd: Converged: Data format error!: %v", err)
		}
		converged[name] = b // add to map
	}
	return converged, nil
}

// EtcdAddHostnameConvergedWatcher adds a watcher with a callback that runs on
// hostname state changes.
func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) {
	path := fmt.Sprintf("/%s/converged/", NS)
	internalCbFn := func(re *RE) error {
		// TODO: get the value from the response, and apply the delta...
		// for now, just run a get operation, which is easier to code!
		m, err := EtcdHostnameConverged(obj)
		if err != nil {
			return err
		}
		return callbackFn(m) // call my function
	}
	return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset
}

// EtcdSetClusterSize sets the ideal target cluster size of etcd peers
func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error {
	if TRACE {
		log.Printf("Trace: Etcd: EtcdSetClusterSize(): %v", value)
		defer log.Printf("Trace: Etcd: EtcdSetClusterSize(): Finished!")
	}
	key := fmt.Sprintf("/%s/idealClusterSize", NS)
	if err := obj.Set(key, strconv.FormatUint(uint64(value), 10)); err != nil {
		return fmt.Errorf("Etcd: SetClusterSize failed!") // exit in progress?
	}
	return nil
}

// EtcdGetClusterSize gets the ideal target cluster size of etcd peers
func EtcdGetClusterSize(obj *EmbdEtcd) (uint16, error) {
	key := fmt.Sprintf("/%s/idealClusterSize", NS)
	keyMap, err := obj.Get(key)
	if err != nil {
		return 0, fmt.Errorf("Etcd: GetClusterSize failed: %v", err)
	}
	val, exists := keyMap[key]
	if !exists || val == "" {
		// the key is missing or empty; this is not a transport error
		return 0, fmt.Errorf("Etcd: GetClusterSize failed: key is missing or empty!")
	}
	v, err := strconv.ParseUint(val, 10, 16)
	if err != nil {
		return 0, fmt.Errorf("Etcd: GetClusterSize failed: %v", err)
	}
	return uint16(v), nil
}
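
// exampleResizeCluster is a hypothetical usage sketch, and is not part of the
// original API: it shows the idealClusterSize round trip. The elected leader
// watches this key to decide how many nominations to hand out, so changing it
// grows or shrinks the server pool.
func exampleResizeCluster(obj *EmbdEtcd, size uint16) error {
	if err := EtcdSetClusterSize(obj, size); err != nil {
		return err
	}
	got, err := EtcdGetClusterSize(obj) // read back the stored value
	if err != nil {
		return err
	}
	log.Printf("Etcd: Example: idealClusterSize is now: %d", got)
	return nil
}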
return nil, fmt.Errorf("Exiting...") } obj.rLock.RLock() response, err = obj.client.MemberAdd(ctx, peerURLs.StringSlice()) obj.rLock.RUnlock() if err == nil { break } if ctx, err = obj.CtxError(ctx, err); err != nil { return nil, err } } return response, nil } // EtcdMemberRemove removes a member by mID and returns if it worked, and also // if there was an error. This is because it might have run without error, but // the member wasn't found, for example. func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) { //obj.Connect(false) // TODO ? ctx := context.Background() for { if obj.exiting { // the exit signal has been sent! return false, fmt.Errorf("Exiting...") } obj.rLock.RLock() _, err := obj.client.MemberRemove(ctx, mID) obj.rLock.RUnlock() if err == nil { break } else if err == rpctypes.ErrMemberNotFound { // if we get this, member already shut itself down :) return false, nil } if ctx, err = obj.CtxError(ctx, err); err != nil { return false, err } } return true, nil } // EtcdMembers returns information on cluster membership. // The member ID's are the keys, because an empty names means unstarted! // TODO: consider queueing this through the main loop with CtxError(ctx, err) func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) { //obj.Connect(false) // TODO ? ctx := context.Background() var response *etcd.MemberListResponse var err error for { if obj.exiting { // the exit signal has been sent! return nil, fmt.Errorf("Exiting...") } obj.rLock.RLock() if TRACE { log.Printf("Trace: Etcd: EtcdMembers(): Endpoints are: %v", obj.client.Endpoints()) } response, err = obj.client.MemberList(ctx) obj.rLock.RUnlock() if err == nil { break } if ctx, err = obj.CtxError(ctx, err); err != nil { return nil, err } } members := make(map[uint64]string) for _, x := range response.Members { members[x.ID] = x.Name // x.Name will be "" if unstarted! } return members, nil } // EtcdLeader returns the current leader of the etcd server cluster func EtcdLeader(obj *EmbdEtcd) (string, error) { //obj.Connect(false) // TODO ? var err error membersMap := make(map[uint64]string) if membersMap, err = EtcdMembers(obj); err != nil { return "", err } addresses := obj.LocalhostClientURLs() // heuristic, but probably correct if len(addresses) == 0 { // probably a programming error... return "", fmt.Errorf("Etcd: Leader: Programming error!") } endpoint := addresses[0].Host // FIXME: arbitrarily picked the first one // part two ctx := context.Background() var response *etcd.StatusResponse for { if obj.exiting { // the exit signal has been sent! return "", fmt.Errorf("Exiting...") } obj.rLock.RLock() response, err = obj.client.Maintenance.Status(ctx, endpoint) obj.rLock.RUnlock() if err == nil { break } if ctx, err = obj.CtxError(ctx, err); err != nil { return "", err } } // isLeader: response.Header.MemberId == response.Leader for id, name := range membersMap { if id == response.Leader { return name, nil } } return "", fmt.Errorf("Etcd: Members map is not current!") // not found } // EtcdWatch returns a channel that outputs a true bool when activity occurs // TODO: Filter our watch (on the server side if possible) based on the // collection prefixes and filters that we care about... func EtcdWatch(obj *EmbdEtcd) chan bool { ch := make(chan bool, 1) // buffer it so we can measure it path := fmt.Sprintf("/%s/exported/", NS) callback := func(re *RE) error { // TODO: is this even needed? 
// EtcdWatch returns a channel that outputs a true bool when activity occurs
// TODO: Filter our watch (on the server side if possible) based on the
// collection prefixes and filters that we care about...
func EtcdWatch(obj *EmbdEtcd) chan bool {
	ch := make(chan bool, 1) // buffer it so we can measure it
	path := fmt.Sprintf("/%s/exported/", NS)
	callback := func(re *RE) error {
		// TODO: is this even needed? it used to happen on conn errors
		log.Printf("Etcd: Watch: Path: %v", path) // event
		if re == nil || re.response.Canceled {
			return fmt.Errorf("Etcd: Watch is empty!") // will cause a CtxError+retry
		}
		// we normally need to check if anything changed since the last
		// event, since a set (export) with no changes still causes the
		// watcher to trigger, and this would cause an infinite loop. we
		// don't need to do this check anymore because we do the export
		// transactionally, and only if a change is needed. since it is
		// atomic, all the changes arrive together, which avoids dupes!!
		if len(ch) == 0 { // send an event only if one isn't pending
			// this check avoids multiple events all queueing up and
			// then being released continuously, long after the
			// changes stopped. do not block!
			ch <- true // event
		}
		return nil
	}
	_, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors
	return ch
}

// EtcdSetResources exports all of the resources which we pass in to etcd
func EtcdSetResources(obj *EmbdEtcd, hostname string, resources []Res) error {
	// key structure is /$NS/exported/$hostname/resources/$uuid = $data

	var kindFilter []string             // empty to get from everyone
	hostnameFilter := []string{hostname}
	// this is not a race because we should only be reading keys which we
	// set, and there should not be any contention with other hosts here!
	originals, err := EtcdGetResources(obj, hostnameFilter, kindFilter)
	if err != nil {
		return err
	}

	if len(originals) == 0 && len(resources) == 0 { // special case of no add or del
		return nil
	}

	ifs := []etcd.Cmp{} // list matching the desired state
	ops := []etcd.Op{}  // list of ops in this transaction
	for _, res := range resources {
		if res.Kind() == "" {
			log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName())
		}
		uuid := fmt.Sprintf("%s/%s", res.Kind(), res.GetName())
		path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uuid)
		if data, err := ResToB64(res); err == nil {
			ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state
			ops = append(ops, etcd.OpPut(path, data))
		} else {
			return fmt.Errorf("Etcd: SetResources: Error: Can't convert to B64: %v", err)
		}
	}

	match := func(res Res, resources []Res) bool { // helper lambda
		for _, x := range resources {
			if res.Kind() == x.Kind() && res.GetName() == x.GetName() {
				return true
			}
		}
		return false
	}

	hasDeletes := false
	// delete old, now unused resources here...
	for _, res := range originals {
		if res.Kind() == "" {
			log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName())
		}
		uuid := fmt.Sprintf("%s/%s", res.Kind(), res.GetName())
		path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uuid)

		if match(res, resources) { // if we match, no need to delete!
			continue
		}

		ops = append(ops, etcd.OpDelete(path))
		hasDeletes = true
	}

	// if everything is already correct, do nothing, otherwise, run the ops!
	// it's important to do this in one transaction, and atomically, because
	// this way, we only generate one watch event, and only when it's needed
	if hasDeletes { // always run, ifs don't matter
		_, err = obj.Txn(nil, ops, nil) // TODO: does this run? it should!
	} else {
		_, err = obj.Txn(ifs, nil, ops) // TODO: do we need to look at the response?
	}
	return err
}
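
// exampleWatchExported is a hypothetical usage sketch, and is not part of the
// original API: it shows how the EtcdWatch channel might be consumed. Each
// true value coalesces one or more changes under the exported/ prefix, after
// which we simply re-read the resources. It assumes it runs in its own
// goroutine.
func exampleWatchExported(obj *EmbdEtcd) {
	ch := EtcdWatch(obj)
	for range ch { // one event per burst of activity
		resources, err := EtcdGetResources(obj, nil, nil) // no filtering
		if err != nil {
			log.Printf("Etcd: Example: Watch error: %v", err)
			continue
		}
		log.Printf("Etcd: Example: %d exported resource(s)", len(resources))
	}
}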
// EtcdGetResources collects all of the resources which match a filter from
// etcd. If the kindFilter or hostnameFilter is empty, then it assumes no
// filtering...
// TODO: Expand this with a more powerful filter based on what we eventually
// support in our collect DSL. Ideally a server side filter like WithFilter()
// We could do this if the pattern was /$NS/exported/$kind/$hostname/$uuid = $data
func EtcdGetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]Res, error) {
	// key structure is /$NS/exported/$hostname/resources/$uuid = $data
	path := fmt.Sprintf("/%s/exported/", NS)
	resources := []Res{}
	keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend))
	if err != nil {
		return nil, fmt.Errorf("Etcd: GetResources: Error: Could not get resources: %v", err)
	}
	for key, val := range keyMap {
		if !strings.HasPrefix(key, path) { // sanity check
			continue
		}

		str := strings.Split(key[len(path):], "/")
		if len(str) != 4 {
			return nil, fmt.Errorf("Etcd: GetResources: Error: Unexpected chunk count!")
		}
		hostname, r, kind, name := str[0], str[1], str[2], str[3]
		if r != "resources" {
			return nil, fmt.Errorf("Etcd: GetResources: Error: Unexpected chunk pattern!")
		}
		if kind == "" {
			return nil, fmt.Errorf("Etcd: GetResources: Error: Unexpected kind chunk!")
		}

		// FIXME: ideally this would be a server side filter instead!
		if len(hostnameFilter) > 0 && !StrInList(hostname, hostnameFilter) {
			continue
		}

		// FIXME: ideally this would be a server side filter instead!
		if len(kindFilter) > 0 && !StrInList(kind, kindFilter) {
			continue
		}

		if res, err := B64ToRes(val); err == nil { // don't shadow the outer obj
			res.setKind(kind) // cheap init
			log.Printf("Etcd: Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name)
			resources = append(resources, res)
		} else {
			return nil, fmt.Errorf("Etcd: GetResources: Error: Can't convert from B64: %v", err)
		}
	}
	return resources, nil
}

//func UrlRemoveScheme(urls etcdtypes.URLs) []string {
//	strs := []string{}
//	for _, u := range urls {
//		strs = append(strs, u.Host) // remove http:// prefix
//	}
//	return strs
//}

// ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse
func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, error) {
	if re == nil { // passthrough
		return urlsmap, nil
	}
	for _, event := range re.response.Events {
		key := bytes.NewBuffer(event.Kv.Key).String()
		key = key[len(re.path):] // remove the path prefix
		log.Printf("Etcd: ApplyDeltaEvents: Event(%s): %s", event.Type.String(), key)

		switch event.Type {
		case etcd.EventTypePut:
			val := bytes.NewBuffer(event.Kv.Value).String()
			if val == "" {
				return nil, fmt.Errorf("Etcd: ApplyDeltaEvents: Value is empty!")
			}
			urls, err := etcdtypes.NewURLs(strings.Split(val, ","))
			if err != nil {
				return nil, fmt.Errorf("Etcd: ApplyDeltaEvents: Format error: %v", err)
			}
			urlsmap[key] = urls // add to map

		// expiry cases are seen as delete in v3 for now
		//case etcd.EventTypeExpire: // doesn't exist right now
		//	fallthrough
		case etcd.EventTypeDelete:
			if _, exists := urlsmap[key]; !exists {
				// this can happen if we retry an operation across
				// a reconnect, so ignore it if we are reconnecting
				if DEBUG {
					log.Printf("Etcd: ApplyDeltaEvents: Inconsistent key: %v", key)
				}
				return nil, errApplyDeltaEventsInconsistent
			}
			delete(urlsmap, key)

		default:
			return nil, fmt.Errorf("Etcd: ApplyDeltaEvents: Error: Unknown event: %+v", event.Type)
		}
	}
	return urlsmap, nil
}
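
// exampleDeltaWatcher is a hypothetical usage sketch, and is not part of the
// original API: it shows how ApplyDeltaEvents might maintain a local
// nominated map incrementally from watch events, instead of re-reading the
// whole prefix on every change. The AddWatcher flags mirror the ones used by
// EtcdWatch above, and are an assumption here.
func exampleDeltaWatcher(obj *EmbdEtcd) (func(), error) {
	path := fmt.Sprintf("/%s/nominated/", NS)
	nominated := make(etcdtypes.URLsMap) // local cache, owned by the callback
	callback := func(re *RE) error {
		var err error
		if nominated, err = ApplyDeltaEvents(re, nominated); err != nil {
			// an inconsistent key means our cache is stale; a full
			// re-get (EtcdNominated) would be the recovery path...
			return err
		}
		log.Printf("Etcd: Example: %d nominee(s) cached", len(nominated))
		return nil
	}
	return obj.AddWatcher(path, callback, true, false, etcd.WithPrefix())
}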