From 8ca65f9fdab64afe7e6784fe113ed3c41316131e Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sun, 24 Jul 2016 00:08:58 -0400 Subject: [PATCH] Revert "Copy in out of tree patches" This reverts commit d26b503dcaa59707723212db8c2d86af0c1b0d30. Use new etcd "embed" API. --- etcd.go | 404 +------------------------------------------------------- 1 file changed, 4 insertions(+), 400 deletions(-) diff --git a/etcd.go b/etcd.go index b4c3c4ab..103400e5 100644 --- a/etcd.go +++ b/etcd.go @@ -60,6 +60,7 @@ import ( "time" etcd "github.com/coreos/etcd/clientv3" // "clientv3" + "github.com/coreos/etcd/etcdmain" "github.com/coreos/etcd/etcdserver" rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" etcdtypes "github.com/coreos/etcd/pkg/types" @@ -68,37 +69,6 @@ import ( "google.golang.org/grpc" ) -// XXX: these imports are used for the EtcdConn code temporarily sitting here -import ( - "crypto/tls" - "github.com/cockroachdb/cmux" - "github.com/coreos/etcd/etcdserver/api/v2http" - "github.com/coreos/etcd/etcdserver/api/v3rpc" - "github.com/coreos/etcd/pkg/cors" - runtimeutil "github.com/coreos/etcd/pkg/runtime" - etcdtransport "github.com/coreos/etcd/pkg/transport" - "github.com/coreos/etcd/rafthttp" - "github.com/coreos/pkg/capnslog" - defaultLog "log" - "net" - "net/http" - "path" -) - -var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mgmt") // XXX EtcdConn - -// internal fd usage includes disk usage and transport usage. -// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs -// at most 2 to read/lock/write WALs. One case that it needs to 2 is to -// read all logs after some snapshot index, which locates at the end of -// the second last and the head of the last. For purging, it needs to read -// directory, so it needs 1. For fd monitor, it needs 1. -// For transport, rafthttp builds two long-polling connections and at most -// four temporary connections with each member. There are at most 9 members -// in a cluster, so it should reserve 96. -// For the safety, we set the total reserved number to 150. -const reservedInternalFDNum = 150 // XXX: used for the EtcdConn code temporarily - const ( NS = "_mgmt" // root namespace for mgmt operations seedSentinel = "_seed" // you must not name your hostname this @@ -206,7 +176,7 @@ type EmbdEtcd struct { // EMBeddeD etcd // etcd server related serverwg sync.WaitGroup // wait for server to shutdown - etcdConn *EtcdConn + etcdConn *etcdmain.EtcdConn server *etcdserver.EtcdServer dataDir string // XXX: incorporate into the "/var" functionality... } @@ -1556,7 +1526,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) initialPeerURLsMap[memberName] = peerURLs } - obj.etcdConn = NewEtcdConn( // XXX: get this upstream! + obj.etcdConn = etcdmain.NewEtcdConn( false, false, // TODO: for now nil, nil, // TODO: for now nil, peerURLs, obj.clientURLs, @@ -1565,7 +1535,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) log.Printf("Etcd: StartServer: Initializing connections...") if err := obj.etcdConn.Init(); err != nil { - if fatalErr, ok := err.(*ErrEtcdConnFatal); ok { // match? + if fatalErr, ok := err.(*etcdmain.ErrEtcdConnFatal); ok { // match? log.Printf("Etcd: StartServer: Fatal: %+v", fatalErr.Err) return fatalErr.Err } @@ -2146,369 +2116,3 @@ func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, err } return urlsmap, nil } - -// XXX Everything below here is intended to go upstream into etcd - -type ErrEtcdConnFatal struct { - Err error -} - -func (e *ErrEtcdConnFatal) Error() string { - return fmt.Sprintf("ErrEtcdConnFatal: %v", e.Err) -} - -// EtcdConn is a helper struct that contains all the connection starting code -type EtcdConn struct { - PeerAutoTLS bool - ClientAutoTLS bool - PeerTLSInfo *etcdtransport.TLSInfo - ClientTLSInfo *etcdtransport.TLSInfo - CorsInfo *cors.CORSInfo - Lpurls []url.URL - Lcurls []url.URL - Dir string - Plog *capnslog.PackageLogger - - listen func(*etcdserver.EtcdServer) - defers []func() - err error -} - -// NewEtcdConn wraps the common net and listener code for embedded etcd reuse! -func NewEtcdConn(peerAutoTLS, clientAutoTLS bool, peerTLSInfo, clientTLSInfo *etcdtransport.TLSInfo, corsInfo *cors.CORSInfo, lpurls, lcurls []url.URL, dir string, plog *capnslog.PackageLogger) *EtcdConn { - return &EtcdConn{ - PeerAutoTLS: peerAutoTLS, - ClientAutoTLS: clientAutoTLS, - PeerTLSInfo: peerTLSInfo, - ClientTLSInfo: clientTLSInfo, - CorsInfo: corsInfo, - Lpurls: lpurls, - Lcurls: lcurls, - Dir: dir, - Plog: plog, - - listen: nil, - defers: []func(){}, - err: fmt.Errorf("not initialized"), - } -} - -func (ec *EtcdConn) Init() error { - var err error - if ec.PeerAutoTLS && ec.PeerTLSInfo != nil && ec.PeerTLSInfo.Empty() { - var phosts []string - for _, u := range ec.Lpurls { - phosts = append(phosts, u.Host) - } - peerTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/peer"), phosts) - ec.PeerTLSInfo = &peerTLSInfoPointer - if err != nil { - if ec.Plog != nil { - ec.Plog.Fatalf("could not get certs (%v)", err) - } else { - ec.err = fmt.Errorf("could not get certs (%v)", err) - return &ErrEtcdConnFatal{err} - } - } - } else if ec.PeerAutoTLS { - if ec.Plog != nil { - ec.Plog.Warningf("ignoring peer auto TLS since certs given") - } - } - if ec.Plog != nil { - if !ec.PeerTLSInfo.Empty() { - ec.Plog.Infof("peerTLS: %s", ec.PeerTLSInfo) - } - } - - var plns []net.Listener - for _, u := range ec.Lpurls { - if u.Scheme == "http" { - if ec.Plog != nil { - if !ec.PeerTLSInfo.Empty() { - ec.Plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) - } - if ec.PeerTLSInfo.ClientCertAuth { - ec.Plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) - } - } - } - var ( - l net.Listener - tlscfg *tls.Config - ) - - if ec.PeerTLSInfo != nil && !ec.PeerTLSInfo.Empty() { - tlscfg, err = ec.PeerTLSInfo.ServerConfig() - if err != nil { - ec.err = err - return err - } - } - - l, err = rafthttp.NewListener(u, tlscfg) - if err != nil { - ec.err = err - return err - } - - urlStr := u.String() - if ec.Plog != nil { - ec.Plog.Info("listening for peers on ", urlStr) - } - ll := l // make a unique copy for the closure - d1 := func() { - if err != nil { // XXX - ll.Close() - if ec.Plog != nil { - ec.Plog.Info("stopping listening for peers on ", urlStr) - } - } - } - ec.defers = append(ec.defers, d1) - plns = append(plns, ll) - } - - if ec.ClientAutoTLS && ec.ClientTLSInfo != nil && ec.ClientTLSInfo.Empty() { - var chosts []string - for _, u := range ec.Lcurls { - chosts = append(chosts, u.Host) - } - clientTLSInfoPointer, err := etcdtransport.SelfCert(path.Join(ec.Dir, "fixtures/client"), chosts) - ec.ClientTLSInfo = &clientTLSInfoPointer - if err != nil { - if ec.Plog != nil { - ec.Plog.Fatalf("could not get certs (%v)", err) - } else { - ec.err = fmt.Errorf("could not get certs (%v)", err) - return &ErrEtcdConnFatal{err} - } - } - } else if ec.ClientAutoTLS { - if ec.Plog != nil { - ec.Plog.Warningf("ignoring client auto TLS since certs given") - } - } - - var ctlscfg *tls.Config - if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() { - if ec.Plog != nil { - ec.Plog.Infof("clientTLS: %s", ec.ClientTLSInfo) - } - ctlscfg, err = ec.ClientTLSInfo.ServerConfig() - if err != nil { - ec.err = err - return err - } - } - - sctxs := make(map[string]*serveCtx) - for _, u := range ec.Lcurls { - if u.Scheme == "http" { - if ec.ClientTLSInfo != nil && !ec.ClientTLSInfo.Empty() { - if ec.Plog != nil { - ec.Plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String()) - } - } - if ec.ClientTLSInfo != nil && ec.ClientTLSInfo.ClientCertAuth { - if ec.Plog != nil { - ec.Plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) - } - } - } - if u.Scheme == "https" && ctlscfg == nil { - err = fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String()) - ec.err = err - return err - } - - ctx := &serveCtx{host: u.Host} - - if u.Scheme == "https" { - ctx.secure = true - } else { - ctx.insecure = true - } - - if sctxs[u.Host] != nil { - if ctx.secure { - sctxs[u.Host].secure = true - } - if ctx.insecure { - sctxs[u.Host].insecure = true - } - continue - } - - var l net.Listener - - l, err = net.Listen("tcp", u.Host) - if err != nil { - ec.err = err - return err - } - - var fdLimit uint64 - if fdLimit, err = runtimeutil.FDLimit(); err == nil { - if fdLimit <= reservedInternalFDNum { - if ec.Plog != nil { - ec.Plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) - } else { - err = fmt.Errorf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) - ec.err = err - return &ErrEtcdConnFatal{err} - } - } - l = etcdtransport.LimitListener(l, int(fdLimit-reservedInternalFDNum)) - } - - l, err = etcdtransport.NewKeepAliveListener(l, "tcp", nil) - ctx.l = l - if err != nil { - ec.err = err - return err - } - - if ec.Plog != nil { - ec.Plog.Info("listening for client requests on ", u.Host) - } - ll := l - msg := u.Host - d2 := func() { - if err != nil { // XXX - ll.Close() - if ec.Plog != nil { - ec.Plog.Info("stopping listening for client requests on ", msg) - } - } - } - ec.defers = append(ec.defers, d2) - sctxs[u.Host] = ctx - } - - // plns []net.Listener, ctlscfg *tls.Config, sctxs map[string]*serveCtx - listen := func(s *etcdserver.EtcdServer) { - ch := http.Handler(&cors.CORSHandler{ - Handler: v2http.NewClientHandler(s, s.Cfg.ReqTimeout()), - Info: ec.CorsInfo, - }) - ph := v2http.NewPeerHandler(s) - - // Start the peer server in a goroutine - for _, l := range plns { - go func(l net.Listener) { - e := servePeerHTTP(l, ph) - if ec.Plog != nil { - ec.Plog.Fatal(e) - } else { - os.Exit(1) // TODO: write on a ch to notify - } - }(l) - } - // Start a client server goroutine for each listen address - for _, sctx := range sctxs { - go func(sctx *serveCtx) { - // read timeout does not work with http close notify - // TODO: https://github.com/golang/go/issues/9524 - e := serve(sctx, s, ctlscfg, ch) - if ec.Plog != nil { - ec.Plog.Fatal(e) - } else { - os.Exit(1) // TODO: write on a ch to notify - } - }(sctx) - } - } - ec.listen = listen - ec.err = err // likely nil - return err -} - -func (ec *EtcdConn) Listen(s *etcdserver.EtcdServer) error { - if ec.err != nil { // error on init - return ec.err - } - ec.listen(s) - return nil -} - -func (ec *EtcdConn) Close() { - for _, f := range ec.defers { - f() - } -} - -// copied from etcdmain/serve.go -type serveCtx struct { - l net.Listener - host string - secure bool - insecure bool -} - -// serve accepts incoming connections on the listener l, -// creating a new service goroutine for each. The service goroutines -// read requests and then call handler to reply to them. -func serve(sctx *serveCtx, s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler) error { - logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) - - <-s.ReadyNotify() - plog.Info("ready to serve client requests") - - m := cmux.New(sctx.l) - - if sctx.insecure { - gs := v3rpc.Server(s, nil) - grpcl := m.Match(cmux.HTTP2()) - go func() { plog.Fatal(gs.Serve(grpcl)) }() - - srvhttp := &http.Server{ - Handler: handler, - ErrorLog: logger, // do not log user error - } - httpl := m.Match(cmux.HTTP1()) - go func() { plog.Fatal(srvhttp.Serve(httpl)) }() - plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.host) - } - - if sctx.secure { - gs := v3rpc.Server(s, tlscfg) - handler = grpcHandlerFunc(gs, handler) - - tlsl := tls.NewListener(m.Match(cmux.Any()), tlscfg) - // TODO: add debug flag; enable logging when debug flag is set - srv := &http.Server{ - Handler: handler, - TLSConfig: tlscfg, - ErrorLog: logger, // do not log user error - } - go func() { plog.Fatal(srv.Serve(tlsl)) }() - - plog.Infof("serving client requests on %s", sctx.host) - } - - return m.Serve() -} - -// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC -// connections or otherHandler otherwise. Copied from cockroachdb. -func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { - grpcServer.ServeHTTP(w, r) - } else { - otherHandler.ServeHTTP(w, r) - } - }) -} - -func servePeerHTTP(l net.Listener, handler http.Handler) error { - logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) - // TODO: add debug flag; enable logging when debug flag is set - srv := &http.Server{ - Handler: handler, - ReadTimeout: 5 * time.Minute, - ErrorLog: logger, // do not log user error - } - return srv.Serve(l) -}