Port embedded etcd to embed API

This also updates the vendored version of etcd to current git master,
which is the only place this is supported at the moment.
This commit is contained in:
James Shubin
2016-07-24 01:57:01 -04:00
parent 8ca65f9fda
commit 7aeb55de70
2 changed files with 43 additions and 52 deletions

91
etcd.go
View File

@@ -60,7 +60,7 @@ import (
"time" "time"
etcd "github.com/coreos/etcd/clientv3" // "clientv3" etcd "github.com/coreos/etcd/clientv3" // "clientv3"
"github.com/coreos/etcd/etcdmain" "github.com/coreos/etcd/embed"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
etcdtypes "github.com/coreos/etcd/pkg/types" etcdtypes "github.com/coreos/etcd/pkg/types"
@@ -176,8 +176,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
// etcd server related // etcd server related
serverwg sync.WaitGroup // wait for server to shutdown serverwg sync.WaitGroup // wait for server to shutdown
etcdConn *etcdmain.EtcdConn server *embed.Etcd // technically this contains the server struct
server *etcdserver.EtcdServer
dataDir string // XXX: incorporate into the "/var" functionality... dataDir string // XXX: incorporate into the "/var" functionality...
} }
@@ -280,11 +279,19 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
} }
obj.client, err = etcd.New(cfg) // connect! obj.client, err = etcd.New(cfg) // connect!
if err != nil { if err != nil {
log.Printf("Etcd: Connect: CtxError...")
if _, e := obj.CtxError(context.TODO(), err); e != nil { if _, e := obj.CtxError(context.TODO(), err); e != nil {
log.Printf("Etcd: Connect: CtxError: Fatal: %v", e)
return e // fatal error return e // fatal error
} }
continue continue
} }
// check if we're actually connected here, because this must
// block if we're not connected
if obj.client == nil {
log.Printf("Etcd: Connect: Is nil!")
continue
}
break break
} }
return nil return nil
@@ -361,7 +368,9 @@ func (obj *EmbdEtcd) Startup() error {
go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, etcd.WithPrefix()) go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, etcd.WithPrefix())
obj.Connect(false) // don't exit from this Startup function until connected! if e := obj.Connect(false); e != nil { // don't exit from this Startup function until connected!
return e
}
return nil return nil
} }
@@ -585,7 +594,7 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
obj.cancelLock.Unlock() obj.cancelLock.Unlock()
log.Printf("Etcd: CtxError: Reconnecting...") log.Printf("Etcd: CtxError: Reconnecting...")
obj.Connect(true) obj.Connect(true) // FIXME: check returned error
if DEBUG { if DEBUG {
log.Printf("Etcd: CtxError: Unlocking...") log.Printf("Etcd: CtxError: Unlocking...")
} }
@@ -614,7 +623,9 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
func (obj *EmbdEtcd) CallbackLoop() { func (obj *EmbdEtcd) CallbackLoop() {
cuuid := obj.converger.Register() cuuid := obj.converger.Register()
defer cuuid.Unregister() defer cuuid.Unregister()
obj.Connect(false) if e := obj.Connect(false); e != nil {
return // fatal
}
for { for {
ctx := context.Background() // TODO: inherit as input argument? ctx := context.Background() // TODO: inherit as input argument?
select { select {
@@ -664,7 +675,9 @@ func (obj *EmbdEtcd) CallbackLoop() {
func (obj *EmbdEtcd) Loop() { func (obj *EmbdEtcd) Loop() {
cuuid := obj.converger.Register() cuuid := obj.converger.Register()
defer cuuid.Unregister() defer cuuid.Unregister()
obj.Connect(false) if e := obj.Connect(false); e != nil {
return // fatal
}
for { for {
ctx := context.Background() // TODO: inherit as input argument? ctx := context.Background() // TODO: inherit as input argument?
// priority channel... // priority channel...
@@ -1071,7 +1084,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
log.Printf("Trace: Etcd: volunteerCallback()") log.Printf("Trace: Etcd: volunteerCallback()")
defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!") defer log.Printf("Trace: Etcd: volunteerCallback(): Finished!")
} }
obj.Connect(false) obj.Connect(false) // FIXME: check return error
var err error var err error
// FIXME: if this is running in response to our OWN volunteering offer, // FIXME: if this is running in response to our OWN volunteering offer,
@@ -1526,47 +1539,31 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
initialPeerURLsMap[memberName] = peerURLs initialPeerURLsMap[memberName] = peerURLs
} }
obj.etcdConn = etcdmain.NewEtcdConn( // embed etcd
false, false, // TODO: for now cfg := embed.NewConfig()
nil, nil, // TODO: for now cfg.Name = memberName // hostname
nil, peerURLs, obj.clientURLs, cfg.Dir = obj.dataDir
obj.dataDir, nil, // XXX: is this the same datadir or not? cfg.ACUrls = obj.clientURLs
) cfg.APUrls = peerURLs
cfg.LCUrls = obj.clientURLs
cfg.LPUrls = peerURLs
log.Printf("Etcd: StartServer: Initializing connections...") cfg.InitialCluster = initialPeerURLsMap.String() // including myself!
if err := obj.etcdConn.Init(); err != nil { if newCluster {
if fatalErr, ok := err.(*etcdmain.ErrEtcdConnFatal); ok { // match? cfg.ClusterState = embed.ClusterStateFlagNew
log.Printf("Etcd: StartServer: Fatal: %+v", fatalErr.Err) } else {
return fatalErr.Err cfg.ClusterState = embed.ClusterStateFlagExisting
}
return err
} }
//cfg.ForceNewCluster = newCluster // TODO ?
cfg := &etcdserver.ServerConfig{ log.Printf("Etcd: StartServer: Starting server...")
Name: memberName, // hostname obj.server, err = embed.StartEtcd(cfg)
ClientURLs: obj.clientURLs,
PeerURLs: peerURLs,
DataDir: obj.dataDir,
InitialPeerURLsMap: initialPeerURLsMap, // including myself!
NewCluster: newCluster,
//ForceNewCluster: newCluster, // TODO ?
TickMs: 100,
ElectionTicks: 10,
}
obj.server, err = etcdserver.NewServer(cfg)
if err != nil { if err != nil {
return err return err
} }
//log.Fatal(<-obj.server.Err()) XXX
log.Printf("Etcd: StartServer: Starting server...") log.Printf("Etcd: StartServer: Server running...")
obj.server.Start() obj.memberId = uint64(obj.server.Server.ID()) // store member id for internal use
obj.memberId = uint64(obj.server.ID()) // store member id for internal use
log.Printf("Etcd: StartServer: Listening...")
if err := obj.etcdConn.Listen(obj.server); err != nil {
return err
}
obj.serverwg.Add(1) obj.serverwg.Add(1)
return nil return nil
@@ -1576,14 +1573,8 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
func (obj *EmbdEtcd) DestroyServer() error { func (obj *EmbdEtcd) DestroyServer() error {
var err error var err error
log.Printf("Etcd: DestroyServer: Destroying...") log.Printf("Etcd: DestroyServer: Destroying...")
if obj.server != nil { if obj.server != nil {
obj.server.Stop() // this blocks until server has stopped obj.server.Close() // this blocks until server has stopped
}
log.Printf("Etcd: DestroyServer: Done stopping...")
if obj.etcdConn != nil {
obj.etcdConn.Close() // run the etcdStart defer operations
} }
log.Printf("Etcd: DestroyServer: Done closing...") log.Printf("Etcd: DestroyServer: Done closing...")