From 81063ae6dfa5b058dc2309c51a77d736f1487a31 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 2 Jun 2025 15:08:37 -0400 Subject: [PATCH] etcd: ssh: Reconnect on SSH failures If the SSH connection dies, the dialer can now reconnect that portion. --- etcd/ssh/ssh.go | 63 +++++++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/etcd/ssh/ssh.go b/etcd/ssh/ssh.go index d61117cf..9e4b9a9f 100644 --- a/etcd/ssh/ssh.go +++ b/etcd/ssh/ssh.go @@ -254,40 +254,47 @@ func (obj *World) Connect(ctx context.Context, init *engine.WorldInit) error { return e }) - tunnels := make(map[string]net.Conn) - for _, seed := range obj.Seeds { - addr := seedSSH[seed] - obj.init.Logf("tunnel: %s", addr) - tunnel, err := obj.sshClient.Dial("tcp", addr) - if err != nil { - return errwrap.Append(obj.cleanup(), err) + // This runs repeatedly when etcd tries to reconnect. + grpcWithContextDialerFunc := func(ctx context.Context, addr string) (net.Conn, error) { + var reterr error + for _, seed := range obj.Seeds { // first successful connect wins + if addr != seedSSH[seed] { + continue // not what we're expecting + } + obj.init.Logf("tunnel: %s", addr) + + tunnel, err := obj.sshClient.Dial("tcp", addr) + if err != nil { + reterr = err + obj.init.Logf("ssh dial error: %v", err) + continue + } + + // TODO: do we need a mutex around adding these? + obj.cleanups = append(obj.cleanups, func() error { + e := tunnel.Close() + if e == io.EOF { // XXX: why does this happen? + return nil // ignore + } + if obj.init.Debug && e != nil { + obj.init.Logf("ssh client close error: %+v", e) + } + return e + }) + + return tunnel, nil // connected successfully } - obj.cleanups = append(obj.cleanups, func() error { - e := tunnel.Close() - if e == io.EOF { // XXX: why does this happen? - return nil // ignore - } - if obj.init.Debug && e != nil { - obj.init.Logf("ssh client close error: %+v", e) - } - return e - }) - tunnels[addr] = tunnel + + if reterr != nil { + return nil, reterr + } + return nil, fmt.Errorf("no ssh tunnels available") // TODO: better error message? } etcdClient, err := clientv3.New(clientv3.Config{ Endpoints: obj.Seeds, DialOptions: []grpc.DialOption{ - grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { - tunnel, exists := tunnels[addr] - if !exists { - obj.init.Logf("can't find tunnel: %s", addr) // tell user early... - return nil, fmt.Errorf("can't find tunnel: %s", addr) - } - // TODO: print the scheme here on this log msg - obj.init.Logf("etcd dial: %s", addr) - return tunnel, nil - }), + grpc.WithContextDialer(grpcWithContextDialerFunc), }, }) if err != nil {