Revert "Copy in out of tree patches"
This reverts commit d26b503dca.
Use new etcd "embed" API.
This commit is contained in:
404
etcd.go
404
etcd.go
@@ -60,6 +60,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
etcd "github.com/coreos/etcd/clientv3" // "clientv3"
|
etcd "github.com/coreos/etcd/clientv3" // "clientv3"
|
||||||
|
"github.com/coreos/etcd/etcdmain"
|
||||||
"github.com/coreos/etcd/etcdserver"
|
"github.com/coreos/etcd/etcdserver"
|
||||||
rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
etcdtypes "github.com/coreos/etcd/pkg/types"
|
etcdtypes "github.com/coreos/etcd/pkg/types"
|
||||||
@@ -68,37 +69,6 @@ import (
|
|||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
// XXX: these imports are used for the EtcdConn code temporarily sitting here
|
|
||||||
import (
|
|
||||||
"crypto/tls"
|
|
||||||
"github.com/cockroachdb/cmux"
|
|
||||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
|
||||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
|
||||||
"github.com/coreos/etcd/pkg/cors"
|
|
||||||
runtimeutil "github.com/coreos/etcd/pkg/runtime"
|
|
||||||
etcdtransport "github.com/coreos/etcd/pkg/transport"
|
|
||||||
"github.com/coreos/etcd/rafthttp"
|
|
||||||
"github.com/coreos/pkg/capnslog"
|
|
||||||
defaultLog "log"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"path"
|
|
||||||
)
|
|
||||||
|
|
||||||
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mgmt") // XXX EtcdConn
|
|
||||||
|
|
||||||
// internal fd usage includes disk usage and transport usage.
|
|
||||||
// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
|
|
||||||
// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
|
|
||||||
// read all logs after some snapshot index, which locates at the end of
|
|
||||||
// the second last and the head of the last. For purging, it needs to read
|
|
||||||
// directory, so it needs 1. For fd monitor, it needs 1.
|
|
||||||
// For transport, rafthttp builds two long-polling connections and at most
|
|
||||||
// four temporary connections with each member. There are at most 9 members
|
|
||||||
// in a cluster, so it should reserve 96.
|
|
||||||
// For the safety, we set the total reserved number to 150.
|
|
||||||
const reservedInternalFDNum = 150 // XXX: used for the EtcdConn code temporarily
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
NS = "_mgmt" // root namespace for mgmt operations
|
NS = "_mgmt" // root namespace for mgmt operations
|
||||||
seedSentinel = "_seed" // you must not name your hostname this
|
seedSentinel = "_seed" // you must not name your hostname this
|
||||||
@@ -206,7 +176,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
|
|||||||
|
|
||||||
// etcd server related
|
// etcd server related
|
||||||
serverwg sync.WaitGroup // wait for server to shutdown
|
serverwg sync.WaitGroup // wait for server to shutdown
|
||||||
etcdConn *EtcdConn
|
etcdConn *etcdmain.EtcdConn
|
||||||
server *etcdserver.EtcdServer
|
server *etcdserver.EtcdServer
|
||||||
dataDir string // XXX: incorporate into the "/var" functionality...
|
dataDir string // XXX: incorporate into the "/var" functionality...
|
||||||
}
|
}
|
||||||
@@ -1556,7 +1526,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
|
|||||||
initialPeerURLsMap[memberName] = peerURLs
|
initialPeerURLsMap[memberName] = peerURLs
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.etcdConn = NewEtcdConn( // XXX: get this upstream!
|
obj.etcdConn = etcdmain.NewEtcdConn(
|
||||||
false, false, // TODO: for now
|
false, false, // TODO: for now
|
||||||
nil, nil, // TODO: for now
|
nil, nil, // TODO: for now
|
||||||
nil, peerURLs, obj.clientURLs,
|
nil, peerURLs, obj.clientURLs,
|
||||||
@@ -1565,7 +1535,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
|
|||||||
|
|
||||||
log.Printf("Etcd: StartServer: Initializing connections...")
|
log.Printf("Etcd: StartServer: Initializing connections...")
|
||||||
if err := obj.etcdConn.Init(); err != nil {
|
if err := obj.etcdConn.Init(); err != nil {
|
||||||
if fatalErr, ok := err.(*ErrEtcdConnFatal); ok { // match?
|
if fatalErr, ok := err.(*etcdmain.ErrEtcdConnFatal); ok { // match?
|
||||||
log.Printf("Etcd: StartServer: Fatal: %+v", fatalErr.Err)
|
log.Printf("Etcd: StartServer: Fatal: %+v", fatalErr.Err)
|
||||||
return fatalErr.Err
|
return fatalErr.Err
|
||||||
}
|
}
|
||||||
@@ -2146,369 +2116,3 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err
|
|||||||
}
|
}
|
||||||
return urlsmap, nil
|
return urlsmap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX Everything below here is intended to go upstream into etcd
|
|
||||||
|
|
||||||
type ErrEtcdConnFatal struct {
|
|
||||||
Err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ErrEtcdConnFatal) Error() string {
|
|
||||||
return fmt.Sprintf("ErrEtcdConnFatal: %v", e.Err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// EtcdConn is a helper struct that contains all the connection starting code
|
|
||||||
type EtcdConn struct {
|
|
||||||
PeerAutoTLS bool
|
|
||||||
ClientAutoTLS bool
|
|
||||||
PeerTLSInfo *etcdtransport.TLSInfo
|
|
||||||
ClientTLSInfo *etcdtransport.TLSInfo
|
|
||||||
CorsInfo *cors.CORSInfo
|
|
||||||
Lpurls []url.URL
|
|
||||||
Lcurls []url.URL
|
|
||||||
Dir string
|
|
||||||
Plog *capnslog.PackageLogger
|
|
||||||
|
|
||||||
listen func(*etcdserver.EtcdServer)
|
|
||||||
defers []func()
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewEtcdConn wraps the common net and listener code for embedded etcd reuse!
|
|
||||||
func NewEtcdConn(peerAutoTLS, clientAutoTLS bool, peerTLSInfo, clientTLSInfo *etcdtransport.TLSInfo, corsInfo *cors.CORSInfo, lpurls, lcurls []url.URL, dir string, plog *capnslog.PackageLogger) *EtcdConn {
|
|
||||||
return &EtcdConn{
|
|
||||||
PeerAutoTLS: peerAutoTLS,
|
|
||||||
ClientAutoTLS: clientAutoTLS,
|
|
||||||
PeerTLSInfo: peerTLSInfo,
|
|
||||||
ClientTLSInfo: clientTLSInfo,
|
|
||||||
CorsInfo: corsInfo,
|
|
||||||
Lpurls: lpurls,
|
|
||||||
Lcurls: lcurls,
|
|
||||||
Dir: dir,
|
|
||||||
Plog: plog,
|
|
||||||
|
|
||||||
listen: nil,
|
|
||||||
defers: []func(){},
|
|
||||||
err: fmt.Errorf("not initialized"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ec *EtcdConn) Init() error {
|
|
||||||
var err error
|
|
||||||
if ec.PeerAutoTLS && ec.PeerTLSInfo != nil && ec.PeerTLSInfo.Empty() {
|
|
||||||
var phosts []string
|
|
||||||
for _, u := range ec.Lpurls {
|
|
||||||
phosts = append(phosts, u.Host)
|
|
||||||
}
|
|
||||||
peerTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/peer"), phosts)
|
|
||||||
ec.PeerTLSInfo = &peerTLSInfoPointer
|
|
||||||
if err != nil {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Fatalf("could not get certs (%v)", err)
|
|
||||||
} else {
|
|
||||||
ec.err = fmt.Errorf("could not get certs (%v)", err)
|
|
||||||
return &ErrEtcdConnFatal{err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if ec.PeerAutoTLS {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Warningf("ignoring peer auto TLS since certs given")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ec.Plog != nil {
|
|
||||||
if !ec.PeerTLSInfo.Empty() {
|
|
||||||
ec.Plog.Infof("peerTLS: %s", ec.PeerTLSInfo)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var plns []net.Listener
|
|
||||||
for _, u := range ec.Lpurls {
|
|
||||||
if u.Scheme == "http" {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
if !ec.PeerTLSInfo.Empty() {
|
|
||||||
ec.Plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
|
|
||||||
}
|
|
||||||
if ec.PeerTLSInfo.ClientCertAuth {
|
|
||||||
ec.Plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var (
|
|
||||||
l net.Listener
|
|
||||||
tlscfg *tls.Config
|
|
||||||
)
|
|
||||||
|
|
||||||
if ec.PeerTLSInfo != nil && !ec.PeerTLSInfo.Empty() {
|
|
||||||
tlscfg, err = ec.PeerTLSInfo.ServerConfig()
|
|
||||||
if err != nil {
|
|
||||||
ec.err = err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
l, err = rafthttp.NewListener(u, tlscfg)
|
|
||||||
if err != nil {
|
|
||||||
ec.err = err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
urlStr := u.String()
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Info("listening for peers on ", urlStr)
|
|
||||||
}
|
|
||||||
ll := l // make a unique copy for the closure
|
|
||||||
d1 := func() {
|
|
||||||
if err != nil { // XXX
|
|
||||||
ll.Close()
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Info("stopping listening for peers on ", urlStr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ec.defers = append(ec.defers, d1)
|
|
||||||
plns = append(plns, ll)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ec.ClientAutoTLS && ec.ClientTLSInfo != nil && ec.ClientTLSInfo.Empty() {
|
|
||||||
var chosts []string
|
|
||||||
for _, u := range ec.Lcurls {
|
|
||||||
chosts = append(chosts, u.Host)
|
|
||||||
}
|
|
||||||
clientTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/client"), chosts)
|
|
||||||
ec.ClientTLSInfo = &clientTLSInfoPointer
|
|
||||||
if err != nil {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Fatalf("could not get certs (%v)", err)
|
|
||||||
} else {
|
|
||||||
ec.err = fmt.Errorf("could not get certs (%v)", err)
|
|
||||||
return &ErrEtcdConnFatal{err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if ec.ClientAutoTLS {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Warningf("ignoring client auto TLS since certs given")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var ctlscfg *tls.Config
|
|
||||||
if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Infof("clientTLS: %s", ec.ClientTLSInfo)
|
|
||||||
}
|
|
||||||
ctlscfg, err = ec.ClientTLSInfo.ServerConfig()
|
|
||||||
if err != nil {
|
|
||||||
ec.err = err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sctxs := make(map[string]*serveCtx)
|
|
||||||
for _, u := range ec.Lcurls {
|
|
||||||
if u.Scheme == "http" {
|
|
||||||
if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ec.ClientTLSInfo != nil && ec.ClientTLSInfo.ClientCertAuth {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if u.Scheme == "https" && ctlscfg == nil {
|
|
||||||
err = fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
|
|
||||||
ec.err = err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := &serveCtx{host: u.Host}
|
|
||||||
|
|
||||||
if u.Scheme == "https" {
|
|
||||||
ctx.secure = true
|
|
||||||
} else {
|
|
||||||
ctx.insecure = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if sctxs[u.Host] != nil {
|
|
||||||
if ctx.secure {
|
|
||||||
sctxs[u.Host].secure = true
|
|
||||||
}
|
|
||||||
if ctx.insecure {
|
|
||||||
sctxs[u.Host].insecure = true
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var l net.Listener
|
|
||||||
|
|
||||||
l, err = net.Listen("tcp", u.Host)
|
|
||||||
if err != nil {
|
|
||||||
ec.err = err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var fdLimit uint64
|
|
||||||
if fdLimit, err = runtimeutil.FDLimit(); err == nil {
|
|
||||||
if fdLimit <= reservedInternalFDNum {
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
|
|
||||||
} else {
|
|
||||||
err = fmt.Errorf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
|
|
||||||
ec.err = err
|
|
||||||
return &ErrEtcdConnFatal{err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
l = etcdtransport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
|
|
||||||
}
|
|
||||||
|
|
||||||
l, err = etcdtransport.NewKeepAliveListener(l, "tcp", nil)
|
|
||||||
ctx.l = l
|
|
||||||
if err != nil {
|
|
||||||
ec.err = err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Info("listening for client requests on ", u.Host)
|
|
||||||
}
|
|
||||||
ll := l
|
|
||||||
msg := u.Host
|
|
||||||
d2 := func() {
|
|
||||||
if err != nil { // XXX
|
|
||||||
ll.Close()
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Info("stopping listening for client requests on ", msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ec.defers = append(ec.defers, d2)
|
|
||||||
sctxs[u.Host] = ctx
|
|
||||||
}
|
|
||||||
|
|
||||||
// plns []net.Listener, ctlscfg *tls.Config, sctxs map[string]*serveCtx
|
|
||||||
listen := func(s *etcdserver.EtcdServer) {
|
|
||||||
ch := http.Handler(&cors.CORSHandler{
|
|
||||||
Handler: v2http.NewClientHandler(s, s.Cfg.ReqTimeout()),
|
|
||||||
Info: ec.CorsInfo,
|
|
||||||
})
|
|
||||||
ph := v2http.NewPeerHandler(s)
|
|
||||||
|
|
||||||
// Start the peer server in a goroutine
|
|
||||||
for _, l := range plns {
|
|
||||||
go func(l net.Listener) {
|
|
||||||
e := servePeerHTTP(l, ph)
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Fatal(e)
|
|
||||||
} else {
|
|
||||||
os.Exit(1) // TODO: write on a ch to notify
|
|
||||||
}
|
|
||||||
}(l)
|
|
||||||
}
|
|
||||||
// Start a client server goroutine for each listen address
|
|
||||||
for _, sctx := range sctxs {
|
|
||||||
go func(sctx *serveCtx) {
|
|
||||||
// read timeout does not work with http close notify
|
|
||||||
// TODO: https://github.com/golang/go/issues/9524
|
|
||||||
e := serve(sctx, s, ctlscfg, ch)
|
|
||||||
if ec.Plog != nil {
|
|
||||||
ec.Plog.Fatal(e)
|
|
||||||
} else {
|
|
||||||
os.Exit(1) // TODO: write on a ch to notify
|
|
||||||
}
|
|
||||||
}(sctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ec.listen = listen
|
|
||||||
ec.err = err // likely nil
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ec *EtcdConn) Listen(s *etcdserver.EtcdServer) error {
|
|
||||||
if ec.err != nil { // error on init
|
|
||||||
return ec.err
|
|
||||||
}
|
|
||||||
ec.listen(s)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ec *EtcdConn) Close() {
|
|
||||||
for _, f := range ec.defers {
|
|
||||||
f()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// copied from etcdmain/serve.go
|
|
||||||
type serveCtx struct {
|
|
||||||
l net.Listener
|
|
||||||
host string
|
|
||||||
secure bool
|
|
||||||
insecure bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// serve accepts incoming connections on the listener l,
|
|
||||||
// creating a new service goroutine for each. The service goroutines
|
|
||||||
// read requests and then call handler to reply to them.
|
|
||||||
func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error {
|
|
||||||
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
|
||||||
|
|
||||||
<-s.ReadyNotify()
|
|
||||||
plog.Info("ready to serve client requests")
|
|
||||||
|
|
||||||
m := cmux.New(sctx.l)
|
|
||||||
|
|
||||||
if sctx.insecure {
|
|
||||||
gs := v3rpc.Server(s, nil)
|
|
||||||
grpcl := m.Match(cmux.HTTP2())
|
|
||||||
go func() { plog.Fatal(gs.Serve(grpcl)) }()
|
|
||||||
|
|
||||||
srvhttp := &http.Server{
|
|
||||||
Handler: handler,
|
|
||||||
ErrorLog: logger, // do not log user error
|
|
||||||
}
|
|
||||||
httpl := m.Match(cmux.HTTP1())
|
|
||||||
go func() { plog.Fatal(srvhttp.Serve(httpl)) }()
|
|
||||||
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.host)
|
|
||||||
}
|
|
||||||
|
|
||||||
if sctx.secure {
|
|
||||||
gs := v3rpc.Server(s, tlscfg)
|
|
||||||
handler = grpcHandlerFunc(gs, handler)
|
|
||||||
|
|
||||||
tlsl := tls.NewListener(m.Match(cmux.Any()), tlscfg)
|
|
||||||
// TODO: add debug flag; enable logging when debug flag is set
|
|
||||||
srv := &http.Server{
|
|
||||||
Handler: handler,
|
|
||||||
TLSConfig: tlscfg,
|
|
||||||
ErrorLog: logger, // do not log user error
|
|
||||||
}
|
|
||||||
go func() { plog.Fatal(srv.Serve(tlsl)) }()
|
|
||||||
|
|
||||||
plog.Infof("serving client requests on %s", sctx.host)
|
|
||||||
}
|
|
||||||
|
|
||||||
return m.Serve()
|
|
||||||
}
|
|
||||||
|
|
||||||
// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
|
|
||||||
// connections or otherHandler otherwise. Copied from cockroachdb.
|
|
||||||
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
|
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
|
|
||||||
grpcServer.ServeHTTP(w, r)
|
|
||||||
} else {
|
|
||||||
otherHandler.ServeHTTP(w, r)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func servePeerHTTP(l net.Listener, handler http.Handler) error {
|
|
||||||
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
|
||||||
// TODO: add debug flag; enable logging when debug flag is set
|
|
||||||
srv := &http.Server{
|
|
||||||
Handler: handler,
|
|
||||||
ReadTimeout: 5 * time.Minute,
|
|
||||||
ErrorLog: logger, // do not log user error
|
|
||||||
}
|
|
||||||
return srv.Serve(l)
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user