Copy in out of tree patches
These patches are proposed upstream changes and code for and from etcd. Ideally we would revert this patch when/if things are merged upstream! The majority of the work is in: https://github.com/coreos/etcd/pull/5584
This commit is contained in:
404
etcd.go
404
etcd.go
@@ -60,7 +60,6 @@ import (
|
||||
"time"
|
||||
|
||||
etcd "github.com/coreos/etcd/clientv3" // "clientv3"
|
||||
"github.com/coreos/etcd/etcdmain"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
etcdtypes "github.com/coreos/etcd/pkg/types"
|
||||
@@ -69,6 +68,37 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// XXX: these imports are used for the EtcdConn code temporarily sitting here
|
||||
import (
|
||||
"crypto/tls"
|
||||
"github.com/cockroachdb/cmux"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
runtimeutil "github.com/coreos/etcd/pkg/runtime"
|
||||
etcdtransport "github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
defaultLog "log"
|
||||
"net"
|
||||
"net/http"
|
||||
"path"
|
||||
)
|
||||
|
||||
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mgmt") // XXX EtcdConn
|
||||
|
||||
// internal fd usage includes disk usage and transport usage.
|
||||
// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
|
||||
// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
|
||||
// read all logs after some snapshot index, which locates at the end of
|
||||
// the second last and the head of the last. For purging, it needs to read
|
||||
// directory, so it needs 1. For fd monitor, it needs 1.
|
||||
// For transport, rafthttp builds two long-polling connections and at most
|
||||
// four temporary connections with each member. There are at most 9 members
|
||||
// in a cluster, so it should reserve 96.
|
||||
// For the safety, we set the total reserved number to 150.
|
||||
const reservedInternalFDNum = 150 // XXX: used for the EtcdConn code temporarily
|
||||
|
||||
const (
|
||||
NS = "_mgmt" // root namespace for mgmt operations
|
||||
seedSentinel = "_seed" // you must not name your hostname this
|
||||
@@ -176,7 +206,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
|
||||
|
||||
// etcd server related
|
||||
serverwg sync.WaitGroup // wait for server to shutdown
|
||||
etcdConn *etcdmain.EtcdConn
|
||||
etcdConn *EtcdConn
|
||||
server *etcdserver.EtcdServer
|
||||
dataDir string // XXX: incorporate into the "/var" functionality...
|
||||
}
|
||||
@@ -1526,7 +1556,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
|
||||
initialPeerURLsMap[memberName] = peerURLs
|
||||
}
|
||||
|
||||
obj.etcdConn = etcdmain.NewEtcdConn(
|
||||
obj.etcdConn = NewEtcdConn( // XXX: get this upstream!
|
||||
false, false, // TODO: for now
|
||||
nil, nil, // TODO: for now
|
||||
nil, peerURLs, obj.clientURLs,
|
||||
@@ -1535,7 +1565,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
|
||||
|
||||
log.Printf("Etcd: StartServer: Initializing connections...")
|
||||
if err := obj.etcdConn.Init(); err != nil {
|
||||
if fatalErr, ok := err.(*etcdmain.ErrEtcdConnFatal); ok { // match?
|
||||
if fatalErr, ok := err.(*ErrEtcdConnFatal); ok { // match?
|
||||
log.Printf("Etcd: StartServer: Fatal: %+v", fatalErr.Err)
|
||||
return fatalErr.Err
|
||||
}
|
||||
@@ -2116,3 +2146,369 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err
|
||||
}
|
||||
return urlsmap, nil
|
||||
}
|
||||
|
||||
// XXX Everything below here is intended to go upstream into etcd
|
||||
|
||||
type ErrEtcdConnFatal struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e *ErrEtcdConnFatal) Error() string {
|
||||
return fmt.Sprintf("ErrEtcdConnFatal: %v", e.Err)
|
||||
}
|
||||
|
||||
// EtcdConn is a helper struct that contains all the connection starting code
|
||||
type EtcdConn struct {
|
||||
PeerAutoTLS bool
|
||||
ClientAutoTLS bool
|
||||
PeerTLSInfo *etcdtransport.TLSInfo
|
||||
ClientTLSInfo *etcdtransport.TLSInfo
|
||||
CorsInfo *cors.CORSInfo
|
||||
Lpurls []url.URL
|
||||
Lcurls []url.URL
|
||||
Dir string
|
||||
Plog *capnslog.PackageLogger
|
||||
|
||||
listen func(*etcdserver.EtcdServer)
|
||||
defers []func()
|
||||
err error
|
||||
}
|
||||
|
||||
// NewEtcdConn wraps the common net and listener code for embedded etcd reuse!
|
||||
func NewEtcdConn(peerAutoTLS, clientAutoTLS bool, peerTLSInfo, clientTLSInfo *etcdtransport.TLSInfo, corsInfo *cors.CORSInfo, lpurls, lcurls []url.URL, dir string, plog *capnslog.PackageLogger) *EtcdConn {
|
||||
return &EtcdConn{
|
||||
PeerAutoTLS: peerAutoTLS,
|
||||
ClientAutoTLS: clientAutoTLS,
|
||||
PeerTLSInfo: peerTLSInfo,
|
||||
ClientTLSInfo: clientTLSInfo,
|
||||
CorsInfo: corsInfo,
|
||||
Lpurls: lpurls,
|
||||
Lcurls: lcurls,
|
||||
Dir: dir,
|
||||
Plog: plog,
|
||||
|
||||
listen: nil,
|
||||
defers: []func(){},
|
||||
err: fmt.Errorf("not initialized"),
|
||||
}
|
||||
}
|
||||
|
||||
func (ec *EtcdConn) Init() error {
|
||||
var err error
|
||||
if ec.PeerAutoTLS && ec.PeerTLSInfo != nil && ec.PeerTLSInfo.Empty() {
|
||||
var phosts []string
|
||||
for _, u := range ec.Lpurls {
|
||||
phosts = append(phosts, u.Host)
|
||||
}
|
||||
peerTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/peer"), phosts)
|
||||
ec.PeerTLSInfo = &peerTLSInfoPointer
|
||||
if err != nil {
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Fatalf("could not get certs (%v)", err)
|
||||
} else {
|
||||
ec.err = fmt.Errorf("could not get certs (%v)", err)
|
||||
return &ErrEtcdConnFatal{err}
|
||||
}
|
||||
}
|
||||
} else if ec.PeerAutoTLS {
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Warningf("ignoring peer auto TLS since certs given")
|
||||
}
|
||||
}
|
||||
if ec.Plog != nil {
|
||||
if !ec.PeerTLSInfo.Empty() {
|
||||
ec.Plog.Infof("peerTLS: %s", ec.PeerTLSInfo)
|
||||
}
|
||||
}
|
||||
|
||||
var plns []net.Listener
|
||||
for _, u := range ec.Lpurls {
|
||||
if u.Scheme == "http" {
|
||||
if ec.Plog != nil {
|
||||
if !ec.PeerTLSInfo.Empty() {
|
||||
ec.Plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
|
||||
}
|
||||
if ec.PeerTLSInfo.ClientCertAuth {
|
||||
ec.Plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
var (
|
||||
l net.Listener
|
||||
tlscfg *tls.Config
|
||||
)
|
||||
|
||||
if ec.PeerTLSInfo != nil && !ec.PeerTLSInfo.Empty() {
|
||||
tlscfg, err = ec.PeerTLSInfo.ServerConfig()
|
||||
if err != nil {
|
||||
ec.err = err
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
l, err = rafthttp.NewListener(u, tlscfg)
|
||||
if err != nil {
|
||||
ec.err = err
|
||||
return err
|
||||
}
|
||||
|
||||
urlStr := u.String()
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Info("listening for peers on ", urlStr)
|
||||
}
|
||||
ll := l // make a unique copy for the closure
|
||||
d1 := func() {
|
||||
if err != nil { // XXX
|
||||
ll.Close()
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Info("stopping listening for peers on ", urlStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
ec.defers = append(ec.defers, d1)
|
||||
plns = append(plns, ll)
|
||||
}
|
||||
|
||||
if ec.ClientAutoTLS && ec.ClientTLSInfo != nil && ec.ClientTLSInfo.Empty() {
|
||||
var chosts []string
|
||||
for _, u := range ec.Lcurls {
|
||||
chosts = append(chosts, u.Host)
|
||||
}
|
||||
clientTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/client"), chosts)
|
||||
ec.ClientTLSInfo = &clientTLSInfoPointer
|
||||
if err != nil {
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Fatalf("could not get certs (%v)", err)
|
||||
} else {
|
||||
ec.err = fmt.Errorf("could not get certs (%v)", err)
|
||||
return &ErrEtcdConnFatal{err}
|
||||
}
|
||||
}
|
||||
} else if ec.ClientAutoTLS {
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Warningf("ignoring client auto TLS since certs given")
|
||||
}
|
||||
}
|
||||
|
||||
var ctlscfg *tls.Config
|
||||
if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() {
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Infof("clientTLS: %s", ec.ClientTLSInfo)
|
||||
}
|
||||
ctlscfg, err = ec.ClientTLSInfo.ServerConfig()
|
||||
if err != nil {
|
||||
ec.err = err
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sctxs := make(map[string]*serveCtx)
|
||||
for _, u := range ec.Lcurls {
|
||||
if u.Scheme == "http" {
|
||||
if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() {
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
|
||||
}
|
||||
}
|
||||
if ec.ClientTLSInfo != nil && ec.ClientTLSInfo.ClientCertAuth {
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
if u.Scheme == "https" && ctlscfg == nil {
|
||||
err = fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
|
||||
ec.err = err
|
||||
return err
|
||||
}
|
||||
|
||||
ctx := &serveCtx{host: u.Host}
|
||||
|
||||
if u.Scheme == "https" {
|
||||
ctx.secure = true
|
||||
} else {
|
||||
ctx.insecure = true
|
||||
}
|
||||
|
||||
if sctxs[u.Host] != nil {
|
||||
if ctx.secure {
|
||||
sctxs[u.Host].secure = true
|
||||
}
|
||||
if ctx.insecure {
|
||||
sctxs[u.Host].insecure = true
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var l net.Listener
|
||||
|
||||
l, err = net.Listen("tcp", u.Host)
|
||||
if err != nil {
|
||||
ec.err = err
|
||||
return err
|
||||
}
|
||||
|
||||
var fdLimit uint64
|
||||
if fdLimit, err = runtimeutil.FDLimit(); err == nil {
|
||||
if fdLimit <= reservedInternalFDNum {
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
|
||||
} else {
|
||||
err = fmt.Errorf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
|
||||
ec.err = err
|
||||
return &ErrEtcdConnFatal{err}
|
||||
}
|
||||
}
|
||||
l = etcdtransport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
|
||||
}
|
||||
|
||||
l, err = etcdtransport.NewKeepAliveListener(l, "tcp", nil)
|
||||
ctx.l = l
|
||||
if err != nil {
|
||||
ec.err = err
|
||||
return err
|
||||
}
|
||||
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Info("listening for client requests on ", u.Host)
|
||||
}
|
||||
ll := l
|
||||
msg := u.Host
|
||||
d2 := func() {
|
||||
if err != nil { // XXX
|
||||
ll.Close()
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Info("stopping listening for client requests on ", msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
ec.defers = append(ec.defers, d2)
|
||||
sctxs[u.Host] = ctx
|
||||
}
|
||||
|
||||
// plns []net.Listener, ctlscfg *tls.Config, sctxs map[string]*serveCtx
|
||||
listen := func(s *etcdserver.EtcdServer) {
|
||||
ch := http.Handler(&cors.CORSHandler{
|
||||
Handler: v2http.NewClientHandler(s, s.Cfg.ReqTimeout()),
|
||||
Info: ec.CorsInfo,
|
||||
})
|
||||
ph := v2http.NewPeerHandler(s)
|
||||
|
||||
// Start the peer server in a goroutine
|
||||
for _, l := range plns {
|
||||
go func(l net.Listener) {
|
||||
e := servePeerHTTP(l, ph)
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Fatal(e)
|
||||
} else {
|
||||
os.Exit(1) // TODO: write on a ch to notify
|
||||
}
|
||||
}(l)
|
||||
}
|
||||
// Start a client server goroutine for each listen address
|
||||
for _, sctx := range sctxs {
|
||||
go func(sctx *serveCtx) {
|
||||
// read timeout does not work with http close notify
|
||||
// TODO: https://github.com/golang/go/issues/9524
|
||||
e := serve(sctx, s, ctlscfg, ch)
|
||||
if ec.Plog != nil {
|
||||
ec.Plog.Fatal(e)
|
||||
} else {
|
||||
os.Exit(1) // TODO: write on a ch to notify
|
||||
}
|
||||
}(sctx)
|
||||
}
|
||||
}
|
||||
ec.listen = listen
|
||||
ec.err = err // likely nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (ec *EtcdConn) Listen(s *etcdserver.EtcdServer) error {
|
||||
if ec.err != nil { // error on init
|
||||
return ec.err
|
||||
}
|
||||
ec.listen(s)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ec *EtcdConn) Close() {
|
||||
for _, f := range ec.defers {
|
||||
f()
|
||||
}
|
||||
}
|
||||
|
||||
// copied from etcdmain/serve.go
|
||||
type serveCtx struct {
|
||||
l net.Listener
|
||||
host string
|
||||
secure bool
|
||||
insecure bool
|
||||
}
|
||||
|
||||
// serve accepts incoming connections on the listener l,
|
||||
// creating a new service goroutine for each. The service goroutines
|
||||
// read requests and then call handler to reply to them.
|
||||
func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error {
|
||||
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
||||
|
||||
<-s.ReadyNotify()
|
||||
plog.Info("ready to serve client requests")
|
||||
|
||||
m := cmux.New(sctx.l)
|
||||
|
||||
if sctx.insecure {
|
||||
gs := v3rpc.Server(s, nil)
|
||||
grpcl := m.Match(cmux.HTTP2())
|
||||
go func() { plog.Fatal(gs.Serve(grpcl)) }()
|
||||
|
||||
srvhttp := &http.Server{
|
||||
Handler: handler,
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
httpl := m.Match(cmux.HTTP1())
|
||||
go func() { plog.Fatal(srvhttp.Serve(httpl)) }()
|
||||
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.host)
|
||||
}
|
||||
|
||||
if sctx.secure {
|
||||
gs := v3rpc.Server(s, tlscfg)
|
||||
handler = grpcHandlerFunc(gs, handler)
|
||||
|
||||
tlsl := tls.NewListener(m.Match(cmux.Any()), tlscfg)
|
||||
// TODO: add debug flag; enable logging when debug flag is set
|
||||
srv := &http.Server{
|
||||
Handler: handler,
|
||||
TLSConfig: tlscfg,
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
go func() { plog.Fatal(srv.Serve(tlsl)) }()
|
||||
|
||||
plog.Infof("serving client requests on %s", sctx.host)
|
||||
}
|
||||
|
||||
return m.Serve()
|
||||
}
|
||||
|
||||
// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
|
||||
// connections or otherHandler otherwise. Copied from cockroachdb.
|
||||
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
|
||||
grpcServer.ServeHTTP(w, r)
|
||||
} else {
|
||||
otherHandler.ServeHTTP(w, r)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func servePeerHTTP(l net.Listener, handler http.Handler) error {
|
||||
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
||||
// TODO: add debug flag; enable logging when debug flag is set
|
||||
srv := &http.Server{
|
||||
Handler: handler,
|
||||
ReadTimeout: 5 * time.Minute,
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
return srv.Serve(l)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user