From 7aeb55de70262ed0a5213565d10bd5678ed06d56 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sun, 24 Jul 2016 01:57:01 -0400 Subject: [PATCH] Port embedded etcd to embed API This also updates the vendored version of etcd to current git master, which is the only place this is supported at the moment. --- etcd.go | 93 ++++++++++++++++------------------- vendor/github.com/coreos/etcd | 2 +- 2 files changed, 43 insertions(+), 52 deletions(-) diff --git a/etcd.go b/etcd.go index 103400e5..90ecb071 100644 --- a/etcd.go +++ b/etcd.go @@ -60,7 +60,7 @@ import ( "time" etcd "github.com/coreos/etcd/clientv3" // "clientv3" - "github.com/coreos/etcd/etcdmain" + "github.com/coreos/etcd/embed" "github.com/coreos/etcd/etcdserver" rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" etcdtypes "github.com/coreos/etcd/pkg/types" @@ -176,9 +176,8 @@ type EmbdEtcd struct { // EMBeddeD etcd // etcd server related serverwg sync.WaitGroup // wait for server to shutdown - etcdConn *etcdmain.EtcdConn - server *etcdserver.EtcdServer - dataDir string // XXX: incorporate into the "/var" functionality... + server *embed.Etcd // technically this contains the server struct + dataDir string // XXX: incorporate into the "/var" functionality... } // NewEmbdEtcd creates the top level embedded etcd struct client and server obj @@ -280,11 +279,19 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error { } obj.client, err = etcd.New(cfg) // connect! if err != nil { + log.Printf("Etcd: Connect: CtxError...") if _, e := obj.CtxError(context.TODO(), err); e != nil { + log.Printf("Etcd: Connect: CtxError: Fatal: %v", e) return e // fatal error } continue } + // check if we're actually connected here, because this must + // block if we're not connected + if obj.client == nil { + log.Printf("Etcd: Connect: Is nil!") + continue + } break } return nil @@ -361,7 +368,9 @@ func (obj *EmbdEtcd) Startup() error { go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, etcd.WithPrefix()) - obj.Connect(false) // don't exit from this Startup function until connected! + if e := obj.Connect(false); e != nil { // don't exit from this Startup function until connected! + return e + } return nil } @@ -585,7 +594,7 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, obj.cancelLock.Unlock() log.Printf("Etcd: CtxError: Reconnecting...") - obj.Connect(true) + obj.Connect(true) // FIXME: check returned error if DEBUG { log.Printf("Etcd: CtxError: Unlocking...") } @@ -614,7 +623,9 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, func (obj *EmbdEtcd) CallbackLoop() { cuuid := obj.converger.Register() defer cuuid.Unregister() - obj.Connect(false) + if e := obj.Connect(false); e != nil { + return // fatal + } for { ctx := context.Background() // TODO: inherit as input argument? select { @@ -664,7 +675,9 @@ func (obj *EmbdEtcd) CallbackLoop() { func (obj *EmbdEtcd) Loop() { cuuid := obj.converger.Register() defer cuuid.Unregister() - obj.Connect(false) + if e := obj.Connect(false); e != nil { + return // fatal + } for { ctx := context.Background() // TODO: inherit as input argument? // priority channel... @@ -1071,7 +1084,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error { log.Printf("Trace: Etcd: volunteerCallback()") defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!") } - obj.Connect(false) + obj.Connect(false) // FIXME: check return error var err error // FIXME: if this is running in response to our OWN volunteering offer, @@ -1526,47 +1539,31 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) initialPeerURLsMap[memberName] = peerURLs } - obj.etcdConn = etcdmain.NewEtcdConn( - false, false, // TODO: for now - nil, nil, // TODO: for now - nil, peerURLs, obj.clientURLs, - obj.dataDir, nil, // XXX: is this the same datadir or not? - ) + // embed etcd + cfg := embed.NewConfig() + cfg.Name = memberName // hostname + cfg.Dir = obj.dataDir + cfg.ACUrls = obj.clientURLs + cfg.APUrls = peerURLs + cfg.LCUrls = obj.clientURLs + cfg.LPUrls = peerURLs - log.Printf("Etcd: StartServer: Initializing connections...") - if err := obj.etcdConn.Init(); err != nil { - if fatalErr, ok := err.(*etcdmain.ErrEtcdConnFatal); ok { // match? - log.Printf("Etcd: StartServer: Fatal: %+v", fatalErr.Err) - return fatalErr.Err - } - return err + cfg.InitialCluster = initialPeerURLsMap.String() // including myself! + if newCluster { + cfg.ClusterState = embed.ClusterStateFlagNew + } else { + cfg.ClusterState = embed.ClusterStateFlagExisting } + //cfg.ForceNewCluster = newCluster // TODO ? - cfg := &etcdserver.ServerConfig{ - Name: memberName, // hostname - ClientURLs: obj.clientURLs, - PeerURLs: peerURLs, - DataDir: obj.dataDir, - InitialPeerURLsMap: initialPeerURLsMap, // including myself! - NewCluster: newCluster, - //ForceNewCluster: newCluster, // TODO ? - TickMs: 100, - ElectionTicks: 10, - } - - obj.server, err = etcdserver.NewServer(cfg) + log.Printf("Etcd: StartServer: Starting server...") + obj.server, err = embed.StartEtcd(cfg) if err != nil { return err } - - log.Printf("Etcd: StartServer: Starting server...") - obj.server.Start() - obj.memberId = uint64(obj.server.ID()) // store member id for internal use - - log.Printf("Etcd: StartServer: Listening...") - if err := obj.etcdConn.Listen(obj.server); err != nil { - return err - } + //log.Fatal(<-obj.server.Err()) XXX + log.Printf("Etcd: StartServer: Server running...") + obj.memberId = uint64(obj.server.Server.ID()) // store member id for internal use obj.serverwg.Add(1) return nil @@ -1576,14 +1573,8 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) func (obj *EmbdEtcd) DestroyServer() error { var err error log.Printf("Etcd: DestroyServer: Destroying...") - if obj.server != nil { - obj.server.Stop() // this blocks until server has stopped - } - log.Printf("Etcd: DestroyServer: Done stopping...") - - if obj.etcdConn != nil { - obj.etcdConn.Close() // run the etcdStart defer operations + obj.server.Close() // this blocks until server has stopped } log.Printf("Etcd: DestroyServer: Done closing...") diff --git a/vendor/github.com/coreos/etcd b/vendor/github.com/coreos/etcd index 6f48bda7..6079be7d 160000 --- a/vendor/github.com/coreos/etcd +++ b/vendor/github.com/coreos/etcd @@ -1 +1 @@ -Subproject commit 6f48bda7ac36c8a53ce4210b06e5498947b08fab +Subproject commit 6079be7dae57eda252a3fc9342dfcee687f481b3