diff --git a/etcd/etcd.go b/etcd/etcd.go index 4daca922..3351b0dc 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -81,8 +81,8 @@ import ( const ( NS = "_mgmt" // root namespace for mgmt operations seedSentinel = "_seed" // you must not name your hostname this - maxStartServerTimeout = 60 // max number of seconds to wait for server to start - maxStartServerRetries = 3 // number of times to retry starting the etcd server + MaxStartServerTimeout = 60 // max number of seconds to wait for server to start + MaxStartServerRetries = 3 // number of times to retry starting the etcd server maxClientConnectRetries = 5 // number of times to retry consecutive connect failures selfRemoveTimeout = 3 // give unnominated members a chance to self exit exitDelay = 3 // number of sec of inactivity after exit to clean up @@ -201,9 +201,10 @@ type EmbdEtcd struct { // EMBeddeD etcd converger converger.Converger // converged tracking // etcd server related - serverwg sync.WaitGroup // wait for server to shutdown - server *embed.Etcd // technically this contains the server struct - dataDir string // our data dir, prefix + "etcd" + serverwg sync.WaitGroup // wait for server to shutdown + server *embed.Etcd // technically this contains the server struct + dataDir string // our data dir, prefix + "etcd" + serverReady chan struct{} // closes when ready } // NewEmbdEtcd creates the top level embedded etcd struct client and server obj. 
@@ -239,6 +240,7 @@ func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, flags: flags, prefix: prefix, dataDir: path.Join(prefix, "etcd"), + serverReady: make(chan struct{}), } // TODO: add some sort of auto assign method for picking these defaults // add a default so that our local client can connect locally if needed @@ -1418,8 +1420,8 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error { if re != nil { retries = re.retries } - // retry maxStartServerRetries times, then permanently fail - return &CtxRetriesErr{maxStartServerRetries - retries, fmt.Sprintf("Etcd: StartServer: Error: %+v", err)} + // retry MaxStartServerRetries times, then permanently fail + return &CtxRetriesErr{MaxStartServerRetries - retries, fmt.Sprintf("Etcd: StartServer: Error: %+v", err)} } if len(obj.endpoints) == 0 { @@ -1670,8 +1672,8 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) select { case <-obj.server.Server.ReadyNotify(): // we hang here if things are bad log.Printf("Etcd: StartServer: Done starting server!") // it didn't hang! - case <-time.After(time.Duration(maxStartServerTimeout) * time.Second): - e := fmt.Errorf("timeout of %d seconds reached", maxStartServerTimeout) + case <-time.After(time.Duration(MaxStartServerTimeout) * time.Second): + e := fmt.Errorf("timeout of %d seconds reached", MaxStartServerTimeout) log.Printf("Etcd: StartServer: %s", e.Error()) obj.server.Server.Stop() // trigger a shutdown obj.serverwg.Add(1) // add for the DestroyServer() @@ -1689,11 +1691,15 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) //log.Fatal(<-obj.server.Err()) XXX log.Printf("Etcd: StartServer: Server running...") obj.memberID = uint64(obj.server.Server.ID()) // store member id for internal use + close(obj.serverReady) // send a signal obj.serverwg.Add(1) return nil } +// ServerReady returns a channel that is closed once the server has started successfully. 
+func (obj *EmbdEtcd) ServerReady() <-chan struct{} { return obj.serverReady } + // DestroyServer shuts down the embedded etcd server portion. func (obj *EmbdEtcd) DestroyServer() error { var err error @@ -1709,7 +1715,8 @@ func (obj *EmbdEtcd) DestroyServer() error { } obj.server = nil // important because this is used as an isRunning flag log.Printf("Etcd: DestroyServer: Unlocking server...") - obj.serverwg.Done() // -1 + obj.serverReady = make(chan struct{}) // reset the signal + obj.serverwg.Done() // -1 return err } diff --git a/lib/main.go b/lib/main.go index a53a1170..92246b93 100644 --- a/lib/main.go +++ b/lib/main.go @@ -346,6 +346,16 @@ func (obj *Main) Run() error { } else if err := EmbdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running) obj.Exit(fmt.Errorf("Main: Etcd: Startup failed: %v", err)) } + + // wait for etcd server to be ready before continuing... + select { + case <-EmbdEtcd.ServerReady(): + log.Printf("Main: Etcd: Server: Ready!") + // pass + case <-time.After(((etcd.MaxStartServerTimeout * etcd.MaxStartServerRetries) + 1) * time.Second): + obj.Exit(fmt.Errorf("Main: Etcd: Startup timeout")) + } + convergerStateFn := func(b bool) error { // exit if we are using the converged timeout and we are the // root node. otherwise, if we are a child node in a remote