etcd: ssh: Reconnect on SSH failures
If the SSH connection dies, the dialer can now reconnect that portion.
This commit is contained in:
@@ -254,40 +254,47 @@ func (obj *World) Connect(ctx context.Context, init *engine.WorldInit) error {
|
|||||||
return e
|
return e
|
||||||
})
|
})
|
||||||
|
|
||||||
tunnels := make(map[string]net.Conn)
|
// This runs repeatedly when etcd tries to reconnect.
|
||||||
for _, seed := range obj.Seeds {
|
grpcWithContextDialerFunc := func(ctx context.Context, addr string) (net.Conn, error) {
|
||||||
addr := seedSSH[seed]
|
var reterr error
|
||||||
obj.init.Logf("tunnel: %s", addr)
|
for _, seed := range obj.Seeds { // first successful connect wins
|
||||||
tunnel, err := obj.sshClient.Dial("tcp", addr)
|
if addr != seedSSH[seed] {
|
||||||
if err != nil {
|
continue // not what we're expecting
|
||||||
return errwrap.Append(obj.cleanup(), err)
|
}
|
||||||
|
obj.init.Logf("tunnel: %s", addr)
|
||||||
|
|
||||||
|
tunnel, err := obj.sshClient.Dial("tcp", addr)
|
||||||
|
if err != nil {
|
||||||
|
reterr = err
|
||||||
|
obj.init.Logf("ssh dial error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: do we need a mutex around adding these?
|
||||||
|
obj.cleanups = append(obj.cleanups, func() error {
|
||||||
|
e := tunnel.Close()
|
||||||
|
if e == io.EOF { // XXX: why does this happen?
|
||||||
|
return nil // ignore
|
||||||
|
}
|
||||||
|
if obj.init.Debug && e != nil {
|
||||||
|
obj.init.Logf("ssh client close error: %+v", e)
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
})
|
||||||
|
|
||||||
|
return tunnel, nil // connected successfully
|
||||||
}
|
}
|
||||||
obj.cleanups = append(obj.cleanups, func() error {
|
|
||||||
e := tunnel.Close()
|
if reterr != nil {
|
||||||
if e == io.EOF { // XXX: why does this happen?
|
return nil, reterr
|
||||||
return nil // ignore
|
}
|
||||||
}
|
return nil, fmt.Errorf("no ssh tunnels available") // TODO: better error message?
|
||||||
if obj.init.Debug && e != nil {
|
|
||||||
obj.init.Logf("ssh client close error: %+v", e)
|
|
||||||
}
|
|
||||||
return e
|
|
||||||
})
|
|
||||||
tunnels[addr] = tunnel
|
|
||||||
}
|
}
|
||||||
|
|
||||||
etcdClient, err := clientv3.New(clientv3.Config{
|
etcdClient, err := clientv3.New(clientv3.Config{
|
||||||
Endpoints: obj.Seeds,
|
Endpoints: obj.Seeds,
|
||||||
DialOptions: []grpc.DialOption{
|
DialOptions: []grpc.DialOption{
|
||||||
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
|
grpc.WithContextDialer(grpcWithContextDialerFunc),
|
||||||
tunnel, exists := tunnels[addr]
|
|
||||||
if !exists {
|
|
||||||
obj.init.Logf("can't find tunnel: %s", addr) // tell user early...
|
|
||||||
return nil, fmt.Errorf("can't find tunnel: %s", addr)
|
|
||||||
}
|
|
||||||
// TODO: print the scheme here on this log msg
|
|
||||||
obj.init.Logf("etcd dial: %s", addr)
|
|
||||||
return tunnel, nil
|
|
||||||
}),
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user