Remote "agent-less" mode

This is a new mode to be used for bootstrapping mgmt clusters or in
situations with tight operational restrictions.

This includes the basics, additional functionality will follow!
This commit is contained in:
James Shubin
2016-08-03 05:00:40 -04:00
parent bdb970203c
commit 7032eea045
7 changed files with 977 additions and 3 deletions

View File

@@ -34,6 +34,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
* [Autoedges - Automatic resource relationships](#autoedges) * [Autoedges - Automatic resource relationships](#autoedges)
* [Autogrouping - Automatic resource grouping](#autogrouping) * [Autogrouping - Automatic resource grouping](#autogrouping)
* [Automatic clustering - Automatic cluster management](#automatic-clustering) * [Automatic clustering - Automatic cluster management](#automatic-clustering)
* [Remote mode - Remote "agent-less" execution](#remote-agent-less-mode)
5. [Usage/FAQ - Notes on usage and frequently asked questions](#usage-and-frequently-asked-questions) 5. [Usage/FAQ - Notes on usage and frequently asked questions](#usage-and-frequently-asked-questions)
6. [Reference - Detailed reference](#reference) 6. [Reference - Detailed reference](#reference)
* [Graph definition file](#graph-definition-file) * [Graph definition file](#graph-definition-file)
@@ -142,6 +143,27 @@ with the `--seeds` variable.
You can read the introductory blog post about this topic here: You can read the introductory blog post about this topic here:
[https://ttboj.wordpress.com/2016/06/20/automatic-clustering-in-mgmt/](https://ttboj.wordpress.com/2016/06/20/automatic-clustering-in-mgmt/) [https://ttboj.wordpress.com/2016/06/20/automatic-clustering-in-mgmt/](https://ttboj.wordpress.com/2016/06/20/automatic-clustering-in-mgmt/)
###Remote ("agent-less") mode
Remote mode is a special mode that lets you kick off mgmt runs on one or more
remote machines which are only accessible via SSH. In this mode the initiating
host connects over SSH, copies over the `mgmt` binary, opens an SSH tunnel, and
runs the remote program while simultaneously passing the etcd traffic back
through the tunnel so that the initiators etcd cluster can be used to exchange
resource data.
The interesting benefit of this architecture is that multiple hosts which can't
connect directly use the initiator to pass the important traffic through to each
other. Once the cluster has converged all the remote programs can shutdown
leaving no residual agent.
This mode can also be useful for bootstrapping a new host where you'd like to
have the service run continuously and as part of an mgmt cluster normally.
####Blog post
An introductory blog post about this topic will follow soon.
##Usage and frequently asked questions ##Usage and frequently asked questions
(Send your questions as a patch to this FAQ! I'll review it, merge it, and (Send your questions as a patch to this FAQ! I'll review it, merge it, and
respond by commit with the answer.) respond by commit with the answer.)
@@ -222,6 +244,11 @@ Globally force all resources into no-op mode. This also disables the export to
etcd functionality, but does not disable resource collection, however all etcd functionality, but does not disable resource collection, however all
resources that are collected will have their individual noop settings set. resources that are collected will have their individual noop settings set.
####`--remote <graph.yaml>`
Point to a graph file to run on the remote host specified within. This parameter
can be used multiple times if you'd like to remotely run on multiple hosts in
parallel.
##Examples ##Examples
For example configurations, please consult the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples) directory in the git For example configurations, please consult the [examples/](https://github.com/purpleidea/mgmt/tree/master/examples) directory in the git
source repository. It is available from: source repository. It is available from:

View File

@@ -56,6 +56,7 @@ type GraphConfig struct {
Collector []collectorResConfig `yaml:"collect"` Collector []collectorResConfig `yaml:"collect"`
Edges []edgeConfig `yaml:"edges"` Edges []edgeConfig `yaml:"edges"`
Comment string `yaml:"comment"` Comment string `yaml:"comment"`
Remote string `yaml:"remote"`
} }
func (c *GraphConfig) Parse(data []byte) error { func (c *GraphConfig) Parse(data []byte) error {

View File

@@ -77,6 +77,8 @@ const (
exitDelay = 3 // number of sec of inactivity after exit to clean up exitDelay = 3 // number of sec of inactivity after exit to clean up
defaultIdealClusterSize = 5 // default ideal cluster size target for initial seed defaultIdealClusterSize = 5 // default ideal cluster size target for initial seed
tempPrefix = "tmp-mgmt-etcd-" // XXX use some special mgmt tmp dir tempPrefix = "tmp-mgmt-etcd-" // XXX use some special mgmt tmp dir
DefaultClientURL = "127.0.0.1:2379"
DefaultServerURL = "127.0.0.1:2380"
) )
var ( var (
@@ -214,7 +216,7 @@ func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs,
// TODO: add some sort of auto assign method for picking these defaults // TODO: add some sort of auto assign method for picking these defaults
// add a default so that our local client can connect locally if needed // add a default so that our local client can connect locally if needed
if len(obj.LocalhostClientURLs()) == 0 { // if we don't have any localhost URLs if len(obj.LocalhostClientURLs()) == 0 { // if we don't have any localhost URLs
u := url.URL{Scheme: "http", Host: "127.0.0.1:2379"} // default u := url.URL{Scheme: "http", Host: DefaultClientURL} // default
obj.clientURLs = append([]url.URL{u}, obj.clientURLs...) // prepend obj.clientURLs = append([]url.URL{u}, obj.clientURLs...) // prepend
} }
@@ -223,7 +225,7 @@ func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs,
if len(obj.endpoints) > 0 { if len(obj.endpoints) > 0 {
obj.noServer = true // we didn't have enough to be a server obj.noServer = true // we didn't have enough to be a server
} }
u := url.URL{Scheme: "http", Host: "127.0.0.1:2380"} // default u := url.URL{Scheme: "http", Host: DefaultServerURL} // default
obj.serverURLs = []url.URL{u} obj.serverURLs = []url.URL{u}
} }

23
examples/remote1.yaml Normal file
View File

@@ -0,0 +1,23 @@
---
graph: mygraph
comment: remote noop example
resources:
noop:
- name: noop1
meta:
noop: true
file:
- name: file1
path: "/tmp/mgmt-remote-hello"
content: |
hello world from @purpleidea
state: exists
edges:
- name: e1
from:
kind: noop
name: noop1
to:
kind: file
name: file1
remote: "ssh://root:password@hostname:22"

53
main.go
View File

@@ -104,6 +104,21 @@ func run(c *cli.Context) error {
return cli.NewExitError("", 1) return cli.NewExitError("", 1)
} }
if c.Bool("no-server") && len(c.StringSlice("remote")) > 0 {
// TODO: in this case, we won't be able to tunnel stuff back to
// here, so if we're okay with every remote graph running in an
// isolated mode, then this is okay. Improve on this if there's
// someone who really wants to be able to do this.
log.Println("Main: Error: the --no-server and --remote parameters cannot be used together!")
return cli.NewExitError("", 1)
}
cConns := uint16(c.Int("cconns"))
if cConns < 0 {
log.Printf("Main: Error: --cconns should be at least zero!")
return cli.NewExitError("", 1)
}
var wg sync.WaitGroup var wg sync.WaitGroup
exit := make(chan bool) // exit signal exit := make(chan bool) // exit signal
var G, fullGraph *Graph var G, fullGraph *Graph
@@ -244,6 +259,21 @@ func run(c *cli.Context) error {
} }
}() }()
// build remotes struct for remote ssh
remotes := NewRemotes(
EmbdEtcd.LocalhostClientURLs().StringSlice(),
[]string{DefaultClientURL},
noop,
c.StringSlice("remote"), // list of files
cConns,
c.Bool("allow-interactive"),
c.String("ssh-priv-id-rsa"),
)
// TODO: is there any benefit to running the remotes above in the loop?
// wait for etcd to be running before we remote in, which we do above!
go remotes.Run()
if !c.IsSet("file") && !c.IsSet("puppet") { if !c.IsSet("file") && !c.IsSet("puppet") {
converger.Start() // better start this for empty graphs converger.Start() // better start this for empty graphs
} }
@@ -253,6 +283,8 @@ func run(c *cli.Context) error {
log.Println("Destroy...") log.Println("Destroy...")
remotes.Exit() // tell all the remote connections to shutdown; waits!
G.Exit() // tell all the children to exit G.Exit() // tell all the children to exit
// tell inner main loop to exit // tell inner main loop to exit
@@ -398,6 +430,27 @@ func main() {
Value: "", Value: "",
Usage: "supply the path to an alternate puppet.conf file to use", Usage: "supply the path to an alternate puppet.conf file to use",
}, },
cli.StringSliceFlag{
Name: "remote",
Value: &cli.StringSlice{},
Usage: "list of remote graph definitions to run",
},
cli.BoolFlag{
Name: "allow-interactive",
Usage: "allow interactive prompting, such as for remote passwords",
},
cli.StringFlag{
Name: "ssh-priv-id-rsa",
Value: "~/.ssh/id_rsa",
Usage: "default path to ssh key file, set empty to never touch",
EnvVar: "MGMT_SSH_PRIV_ID_RSA",
},
cli.IntFlag{
Name: "cconns",
Value: 0,
Usage: "number of maximum concurrent remote ssh connections to run, 0 for unlimited",
EnvVar: "MGMT_CCONNS",
},
}, },
}, },
} }

868
remote.go Normal file
View File

@@ -0,0 +1,868 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This set of structs and methods are for running mgmt remotely over SSH. This
// gives us the architectural robustness of our current design, combined with
// the ability to run it with an "agent-less" approach for bootstrapping, and
// in environments with more restrictive installation requirements. In general
// the following sequence is run:
//
// 1) connect to remote host
// 2) make temporary directory
// 3) copy over the mgmt binary and graph definition
// 4) tunnel tcp connections for etcd
// 5) run it!
// 6) finish and quit
// 7) close tunnels
// 8) clean up
// 9) disconnect
//
// The main advantage of this agent-less approach, is while multiple of these
// remote mgmt transient agents are running, they can still exchange data and
// converge together without directly connecting, since they all tunnel through
// the etcd server running on the initiator.
package main // TODO: make this a separate ssh package
// TODO: running with two identical remote endpoints over a slow connection, eg:
// --remote file1.yaml --remote file1.yaml
// where we ^C when both file copies are running seems to deadlock the process.
import (
"bytes"
"fmt"
"github.com/howeyc/gopass"
"github.com/kardianos/osext"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/url"
"os"
"os/user"
"path"
"strconv"
"strings"
"sync"
"time"
)
const (
// FIXME: should this dir be in /var/ instead?
formatPattern = "/tmp/mgmt.%s/" // remote format, to match `mktemp`
formatChars = "abcdefghijklmnopqrstuvwxyz0123456789" // chars for fmt string // TODO: what does mktemp use?
maxCollisions = 13 // number of tries to try making a unique remote directory
defaultUser = "mgmt" // default user
defaultPort uint16 = 22 // default port
maxPasswordTries = 3 // max number of interactive password tries
nonInteractivePasswordTimeout = 5 * 2 // five minutes
)
// The SSH struct is the unit building block for a single remote SSH connection.
type SSH struct {
host string // remote host to connect to
port uint16 // remote port to connect to (usually 22)
user string // username to connect with
auth []ssh.AuthMethod // list of auth for ssh
file string // the graph definition file to run
clientURLs []string // list of urls where the local server is listening
remoteURLs []string // list of urls where the remote server connects to
noop bool // whether to run the remote process with --noop
client *ssh.Client // client object
sftp *sftp.Client // sftp object
listener net.Listener // remote listener
session *ssh.Session // session for exec
f1 *os.File // file object for SftpCopy source
f2 *sftp.File // file object for SftpCopy destination
wg sync.WaitGroup // sync group for tunnel go routines
lock sync.Mutex // mutex to avoid exit races
exiting bool // flag to let us know if we're exiting
remotewd string // path to remote working directory
execpath string // path to remote mgmt binary
filepath string // path to remote file config
}
// Connect kicks off the SSH connection.
func (obj *SSH) Connect() error {
config := &ssh.ClientConfig{
User: obj.user,
// you must pass in at least one implementation of AuthMethod
Auth: obj.auth,
}
var err error
obj.client, err = ssh.Dial("tcp", fmt.Sprintf("%s:%d", obj.host, obj.port), config)
if err != nil {
return fmt.Errorf("Can't dial: %s", err.Error()) // Error() returns a string
}
return nil
}
// Close cleans up after the main SSH connection.
func (obj *SSH) Close() error {
if obj.client == nil {
return nil
}
return obj.client.Close()
}
// The Sftp function uses the sftp protocol to create a remote dir and copy over
// the binary to run. On error the string represents the path to the remote dir.
func (obj *SSH) Sftp() error {
var err error
if obj.client == nil {
return fmt.Errorf("Not dialed!")
}
// this check is needed because the golang path.Base function is weird!
if strings.HasSuffix(obj.file, "/") {
return fmt.Errorf("File must not be a directory.")
}
// we run local operations first so that remote clean up is easier...
selfpath := ""
if selfpath, err = osext.Executable(); err != nil {
return fmt.Errorf("Can't get executable path: %v", err)
}
log.Printf("Remote: Self executable is: %s", selfpath)
// this calls NewSession and does everything in its own session :)
obj.sftp, err = sftp.NewClient(obj.client)
if err != nil {
return err
}
// TODO: make the path configurable to deal with /tmp/ mounted noexec?
obj.remotewd = ""
for i := 0; true; {
// NOTE: since fmtUUID is deterministic, if we don't clean up
// previous runs, we may get the same paths generated, and here
// they will conflict.
obj.remotewd = fmt.Sprintf(formatPattern, fmtUUID(10)) // eg: /tmp/mgmt.abcdefghij/
if err := obj.sftp.Mkdir(obj.remotewd); err != nil {
i++ // count number of times we've tried
e := fmt.Errorf("Can't make tmp directory: %s", err)
log.Println(e)
if i >= maxCollisions {
log.Printf("Remote: Please clean up the remote dir: %s", obj.remotewd)
return e
}
continue // try again, unlucky conflict!
}
log.Printf("Remote: Remotely created: %s", obj.remotewd)
break
}
// FIXME: consider running a hashing function to check if the remote file
// is valid before copying it over again... this would need a deterministic
// temp directory location first... this actually happens with fmtUUID!
// future patch!
obj.execpath = path.Join(obj.remotewd, program) // program is a compile time string from main.go
log.Printf("Remote: Remote path is: %s", obj.execpath)
log.Println("Remote: Copying binary, please be patient...")
_, err = obj.SftpCopy(selfpath, obj.execpath)
if err != nil {
// TODO: cleanup
return fmt.Errorf("Error copying binary: %s", err)
}
if obj.exitCheck() {
return nil
}
// make file executable
// TODO: do we want the group or other bits set?
if err := obj.sftp.Chmod(obj.execpath, 0770); err != nil {
return fmt.Errorf("Can't set file mode bits!")
}
// copy graph file
// TODO: should future versions use torrent for this copy and updates?
obj.filepath = path.Join(obj.remotewd, path.Base(obj.file)) // same filename
log.Println("Remote: Copying graph definition...")
_, err = obj.SftpCopy(obj.file, obj.filepath)
if err != nil {
// TODO: cleanup
return fmt.Errorf("Error copying graph: %s", err)
}
return nil
}
// SftpCopy is a simple helper function that runs a local -> remote sftp copy.
func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
if obj.sftp == nil {
return -1, fmt.Errorf("Sftp session is not active!")
}
var err error
// TODO: add a check to make sure we don't run two copies of this
// function at the same time! they both would use obj.f1 and obj.f2
obj.f1, err = os.Open(src) // open a handle to read the file
if err != nil {
return -1, err
}
defer obj.f1.Close()
if obj.exitCheck() {
return -1, nil
}
obj.f2, err = obj.sftp.Create(dst) // open a handle to create the file
if err != nil {
return -1, err
}
defer obj.f2.Close()
if obj.exitCheck() {
return -1, nil
}
// the actual copy, this might take time...
n, err := io.Copy(obj.f2, obj.f1) // dst, src -> n, error
if err != nil {
return n, fmt.Errorf("Can't copy to remote path: %v", err)
}
if n <= 0 {
return n, fmt.Errorf("Zero bytes copied!")
}
return n, nil
}
// SftpClean cleans up the mess and closes the connection from the sftp work.
func (obj *SSH) SftpClean() error {
if obj.sftp == nil {
return nil
}
// TODO: if this runs before we ever use f1 or f2 it could be a panic!
// TODO: fix this possible? panic if we ever end up caring about it...
// close any copy operations that are in progress...
obj.f1.Close() // TODO: we probably only need to shutdown one of them,
obj.f2.Close() // but which one should we shutdown? close both for now
// clean up the graph definition in obj.remotewd
err := obj.sftp.Remove(obj.filepath)
// TODO: add binary caching
if e := obj.sftp.Remove(obj.execpath); e != nil {
err = e
}
if e := obj.sftp.Remove(obj.remotewd); e != nil {
err = e
}
if e := obj.sftp.Close(); e != nil {
err = e
}
// TODO: return all errors when we have a better error struct
return err
}
// Tunnel initiates the reverse SSH tunnel. You can .Wait() on the returned
// sync WaitGroup to know when the tunnels have closed completely.
func (obj *SSH) Tunnel() error {
var err error
if len(obj.clientURLs) < 1 {
return fmt.Errorf("Need at least one client URL to tunnel!")
}
if len(obj.remoteURLs) < 1 {
return fmt.Errorf("Need at least one remote URL to tunnel!")
}
// TODO: do something less arbitrary about which one we pick?
url := cleanURL(obj.remoteURLs[0]) // arbitrarily pick the first one
// reverse `ssh -R` listener to listen on the remote host
obj.listener, err = obj.client.Listen("tcp", url) // remote
if err != nil {
return fmt.Errorf("Can't listen on remote host: %s", err)
}
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
for {
conn, err := obj.listener.Accept()
if err != nil {
// a Close() will trigger an EOF "error" here!
if err == io.EOF {
return
}
log.Printf("Remote: Error accepting on remote host: %s", err)
return // FIXME: return or continue?
}
// XXX: pass in wg to this method and to its children?
if f := obj.forward(conn); f != nil {
// TODO: is this correct?
defer f.Close() // close the remote connection
} else {
// TODO: is this correct?
// close the listener since it is useless now
obj.listener.Close()
}
}
}()
return nil
}
// forward is a helper function to make the tunnelling code more readable.
func (obj *SSH) forward(remoteConn net.Conn) net.Conn {
// TODO: validate URL format?
// TODO: do something less arbitrary about which one we pick?
url := cleanURL(obj.clientURLs[0]) // arbitrarily pick the first one
localConn, err := net.Dial("tcp", url) // local
if err != nil {
log.Printf("Remote: Local dial error: %s", err)
return nil // seen as an error...
}
cp := func(writer, reader net.Conn) {
// Copy copies from src to dst until either EOF is reached on
// src or an error occurs. It returns the number of bytes copied
// and the first error encountered while copying, if any.
// Note: src & dst are backwards in golang as compared to cp, lol!
n, err := io.Copy(writer, reader) // from reader to writer
if err != nil {
log.Printf("Remote: io.Copy error: %s", err)
// FIXME: what should we do here???
}
if DEBUG {
log.Printf("Remote: io.Copy finished: %d", n)
}
}
go cp(remoteConn, localConn)
go cp(localConn, remoteConn)
return localConn // success!
}
// TunnelClose causes any currently connected Tunnel to shutdown.
func (obj *SSH) TunnelClose() error {
if obj.listener != nil {
err := obj.listener.Close()
obj.wg.Wait() // wait for everyone to close
obj.listener = nil
return err
}
return nil
}
// Exec runs the binary on the remote server.
func (obj *SSH) Exec() error {
if obj.execpath == "" {
return fmt.Errorf("Must have a binary path to execute!")
}
if obj.filepath == "" {
return fmt.Errorf("Must have a graph definition to run!")
}
var err error
obj.session, err = obj.client.NewSession()
if err != nil {
return fmt.Errorf("Failed to create session: %s", err.Error())
}
defer obj.session.Close()
var b combinedWriter
obj.session.Stdout = &b
obj.session.Stderr = &b
// TODO: do something less arbitrary about which one we pick?
url := cleanURL(obj.remoteURLs[0]) // arbitrarily pick the first one
seeds := fmt.Sprintf("--no-server --seeds 'http://%s'", url) // XXX: escape dangerous untrusted input?
file := fmt.Sprintf("--file '%s'", obj.filepath) // XXX: escape dangerous untrusted input!
args := []string{seeds, file}
if obj.noop {
args = append(args, "--noop")
}
// TODO: add --converged-timeout support for group
cmd := fmt.Sprintf("%s run %s", obj.execpath, strings.Join(args, " "))
log.Printf("Remote: Running: %s", cmd)
if err := obj.session.Run(cmd); err != nil {
// The returned error is nil if the command runs, has no
// problems copying stdin, stdout, and stderr, and exits with a
// zero exit status. If the remote server does not send an exit
// status, an error of type *ExitMissingError is returned. If
// the command completes unsuccessfully or is interrupted by a
// signal, the error is of type *ExitError. Other error types
// may be returned for I/O problems.
if e, ok := err.(*ssh.ExitError); ok {
if sig := e.Waitmsg.Signal(); sig != "" {
log.Printf("Remote: Exit signal: %s", sig)
}
log.Printf("Remote: Error: Output...\n%s", b.PrefixedString("|\t"))
return fmt.Errorf("Exited (%d) with: %s", e.Waitmsg.ExitStatus(), e.Error())
} else if e, ok := err.(*ssh.ExitMissingError); ok {
return fmt.Errorf("Exit code missing: %s", e.Error())
}
// TODO: catch other types of errors here...
return fmt.Errorf("Failed for unknown reason: %s", err.Error())
}
log.Printf("Remote: Output...\n%s", b.PrefixedString("|\t"))
return nil
}
// simpleRun is a simple helper for running commands in new sessions.
func (obj *SSH) simpleRun(cmd string) error {
session, err := obj.client.NewSession() // not the main session!
if err != nil {
return fmt.Errorf("Failed to create session: %s", err.Error())
}
defer session.Close()
if err := session.Run(cmd); err != nil {
return fmt.Errorf("Error running command: %s", err)
}
return nil
}
// ExecExit sends a SIGINT (^C) signal to the remote process, and waits for the
// process to exit.
func (obj *SSH) ExecExit() error {
if obj.session == nil {
return nil
}
// Signal sends the given signal to the remote process.
// FIXME: this doesn't work, see: https://github.com/golang/go/issues/16597
// FIXME: additionally, a disconnect leaves the remote process running! :(
if err := obj.session.Signal(ssh.SIGINT); err != nil {
log.Printf("Remote: Signal: Error: %s", err)
}
// FIXME: workaround: force a signal!
if err := obj.simpleRun(fmt.Sprintf("killall -SIGINT %s", program)); err != nil { // FIXME: low specificity
log.Printf("Remote: Failed to send SIGINT: %s", err.Error())
}
// emergency timeout...
go func() {
// try killing the process more violently
time.Sleep(10 * time.Second)
//obj.session.Signal(ssh.SIGKILL)
cmd := fmt.Sprintf("killall -SIGKILL %s", program) // FIXME: low specificity
obj.simpleRun(cmd)
}()
// FIXME: workaround: wait (spin lock) until process quits cleanly...
cmd := fmt.Sprintf("while killall -0 %s 2> /dev/null; do sleep 1s; done", program) // FIXME: low specificity
if err := obj.simpleRun(cmd); err != nil {
return fmt.Errorf("Error waiting: %s", err)
}
return nil
}
// Go kicks off the entire sequence of one SSH connection.
func (obj *SSH) Go() error {
if obj.exitCheck() {
return nil
}
// connect
log.Println("Remote: Connect...")
if err := obj.Connect(); err != nil {
return fmt.Errorf("Remote: SSH errored with: %v", err)
}
defer obj.Close()
if obj.exitCheck() {
return nil
}
// sftp
log.Println("Remote: Sftp...")
defer obj.SftpClean()
if err := obj.Sftp(); err != nil {
return fmt.Errorf("Remote: Sftp errored with: %v", err)
}
if obj.exitCheck() {
return nil
}
// tunnel
log.Println("Remote: Tunnelling...")
if err := obj.Tunnel(); err != nil { // non-blocking
log.Printf("Remote: Tunnel errored with: %v", err)
return err
}
defer obj.TunnelClose()
if obj.exitCheck() {
return nil
}
// exec
log.Println("Remote: Exec...")
if err := obj.Exec(); err != nil {
log.Printf("Remote: Exec errored with: %v", err)
return err
}
log.Println("Remote: Done!")
return nil
}
// exitCheck is a helper function which stops additional stages from running if
// we detect that a Stop() action has been called.
func (obj *SSH) exitCheck() bool {
obj.lock.Lock()
defer obj.lock.Unlock()
if obj.exiting {
return true // prevent from continuing to the next stage
}
return false
}
// Stop shuts down any SSH in progress as safely and quickly as possible.
func (obj *SSH) Stop() error {
obj.lock.Lock()
obj.exiting = true // don't spawn new steps once this flag is set!
obj.lock.Unlock()
// TODO: return all errors when we have a better error struct
var e error
// go through each stage in reverse order and request an exit
if err := obj.ExecExit(); e == nil && err != nil { // waits for program to exit
e = err
}
if err := obj.TunnelClose(); e == nil && err != nil {
e = err
}
// TODO: match errors due to stop signal and ignore them!
if err := obj.SftpClean(); e == nil && err != nil {
e = err
}
if err := obj.Close(); e == nil && err != nil {
e = err
}
return e
}
// The Remotes struct manages a set of SSH connections.
// TODO: rename this to something more logical
type Remotes struct {
clientURLs []string // list of urls where the local server is listening
remoteURLs []string // list of urls where the remote server connects to
noop bool // whether to run in noop mode
remotes []string // list of remote graph definition files to run
cConns uint16 // number of concurrent ssh connections, zero means unlimited
interactive bool // allow interactive prompting
sshPrivIdRsa string // path to ~/.ssh/id_rsa
wg sync.WaitGroup // keep track of each running SSH connection
lock sync.Mutex // mutex for access to sshmap
sshmap map[string]*SSH // map to each SSH struct with the remote as the key
exiting bool // flag to let us know if we're exiting
semaphore Semaphore // counting semaphore to limit concurrent connections
}
// The NewRemotes function builds a Remotes struct.
func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, cConns uint16, interactive bool, sshPrivIdRsa string) *Remotes {
return &Remotes{
clientURLs: clientURLs,
remoteURLs: remoteURLs,
noop: noop,
remotes: remotes,
cConns: cConns,
interactive: interactive,
sshPrivIdRsa: sshPrivIdRsa,
sshmap: make(map[string]*SSH),
semaphore: NewSemaphore(int(cConns)),
}
}
// NewSSH is a helper function that does the initial parsing into an SSH obj.
// It takes as input the path to a graph definition file.
func (obj *Remotes) NewSSH(file string) (*SSH, error) {
// first do the parsing...
config := ParseConfigFromFile(file)
if config == nil {
return nil, fmt.Errorf("Remote: Error parsing remote graph: %s", file)
}
if config.Remote == "" {
return nil, fmt.Errorf("Remote: No remote endpoint in the graph: %s", file)
}
// do the url parsing...
u, err := url.Parse(config.Remote)
if err != nil {
return nil, err
}
if u.Scheme != "" && u.Scheme != "ssh" {
return nil, fmt.Errorf("Unknown remote scheme: %s", u.Scheme)
}
host := ""
port := defaultPort // default
x := strings.Split(u.Host, ":")
if c := len(x); c == 0 || c > 2 { // need one or two chunks
return nil, fmt.Errorf("Can't parse host pattern: %s", u.Host)
} else if c == 2 {
v, err := strconv.ParseUint(x[1], 10, 16)
if err != nil {
return nil, fmt.Errorf("Can't parse port: %s", x[1])
}
port = uint16(v)
}
host = x[0]
if host == "" {
return nil, fmt.Errorf("Empty hostname!")
}
user := defaultUser // default
if x := u.User.Username(); x != "" {
user = x
}
auth := []ssh.AuthMethod{}
if secret, b := u.User.Password(); b {
auth = append(auth, ssh.Password(secret))
}
// get ssh key auth if available
if a, err := obj.sshKeyAuth(); err == nil {
auth = append(auth, a)
}
// if there are no auth methods available, add interactive to be helpful
if len(auth) == 0 || obj.interactive {
auth = append(auth, ssh.RetryableAuthMethod(ssh.PasswordCallback(obj.passwordCallback(user, host)), maxPasswordTries))
}
if len(auth) == 0 {
return nil, fmt.Errorf("No authentication methods available!")
}
return &SSH{
host: host,
port: port,
user: user,
auth: auth,
file: file,
clientURLs: obj.clientURLs,
remoteURLs: obj.remoteURLs,
noop: obj.noop,
}, nil
}
// sshKeyAuth is a helper function to get the ssh key auth struct needed
func (obj *Remotes) sshKeyAuth() (ssh.AuthMethod, error) {
if obj.sshPrivIdRsa == "" {
return nil, fmt.Errorf("Empty path specified!")
}
p := ""
// TODO: this doesn't match strings of the form: ~james/.ssh/id_rsa
if strings.HasPrefix(obj.sshPrivIdRsa, "~/") {
usr, err := user.Current()
if err != nil {
log.Printf("Remote: Can't find home directory automatically.")
return nil, err
}
p = path.Join(usr.HomeDir, obj.sshPrivIdRsa[len("~/"):])
}
if p == "" {
return nil, fmt.Errorf("Empty path specified!")
}
// A public key may be used to authenticate against the server by using
// an unencrypted PEM-encoded private key file. If you have an encrypted
// private key, the crypto/x509 package can be used to decrypt it.
key, err := ioutil.ReadFile(p)
if err != nil {
return nil, err
}
// Create the Signer for this private key.
signer, err := ssh.ParsePrivateKey(key)
if err != nil {
return nil, err
}
return ssh.PublicKeys(signer), nil
}
// passwordCallback is a function which returns the appropriate type of callback.
func (obj *Remotes) passwordCallback(user, host string) func() (string, error) {
timeout := nonInteractivePasswordTimeout // default
if obj.interactive { // return after a timeout if not interactive
timeout = -1 // unlimited when we asked for interactive mode!
}
cb := func() (string, error) {
passchan := make(chan string)
failchan := make(chan error)
go func() {
log.Printf("Remote: Prompting for %s@%s password...", user, host)
fmt.Printf("Password: ")
password, err := gopass.GetPasswd()
if err != nil { // on ^C or getch() error
// returning an error will cancel the N retries on this
failchan <- err
return
}
passchan <- string(password)
}()
// wait for password, but include a timeout if we promiscuously
// added the interactive mode
select {
case p := <-passchan:
return p, nil
case e := <-failchan:
return "", e
case <-TimeAfterOrBlock(timeout):
return "", fmt.Errorf("Interactive timeout reached!")
}
}
return cb
}
// The Run method of the Remotes struct kicks it all off. It is usually run from
// a go routine.
func (obj *Remotes) Run() {
// the semaphore provides the max simultaneous connection limit
for _, f := range obj.remotes {
if obj.cConns != 0 {
obj.semaphore.P(1) // take one
}
obj.lock.Lock()
if obj.exiting {
return
}
sshobj, err := obj.NewSSH(f)
if err != nil {
log.Printf("Remote: Error: %s", err)
continue
}
obj.sshmap[f] = sshobj // save a reference
obj.wg.Add(1)
go func() {
if obj.cConns != 0 {
defer obj.semaphore.V(1)
}
defer obj.wg.Done()
if err := sshobj.Go(); err != nil {
log.Printf("Remote: Error: %s", err)
}
}()
obj.lock.Unlock()
}
}
// The Exit method causes as much of the Remotes struct to shutdown as quickly
// and as cleanly as possible. It only returns once everything is shutdown.
func (obj *Remotes) Exit() {
obj.lock.Lock()
obj.exiting = true // don't spawn new ones once this flag is set!
obj.lock.Unlock()
for _, f := range obj.remotes {
sshobj, exists := obj.sshmap[f]
if !exists || sshobj == nil {
continue
}
// TODO: should we run these as go routines?
if err := sshobj.Stop(); err != nil {
log.Printf("Remote: Error stopping: %s", err)
}
}
obj.wg.Wait() // wait for everyone to exit
}
// fmtUUID makes a random string of length n, it is not cryptographically safe.
// This function actually usually generates the same sequence of random strings
// each time the program is run, which makes repeatability of this code easier.
func fmtUUID(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = formatChars[rand.Intn(len(formatChars))]
}
return string(b)
}
// cleanURL removes the scheme and leaves just the host:port combination.
func cleanURL(s string) string {
x := s
if !strings.Contains(s, "://") {
x = "ssh://" + x
}
// the url.Parse returns "" for u.Host if given "hostname:22" as input.
u, err := url.Parse(x)
if err != nil {
return ""
}
return u.Host
}
// Semaphore is a counting semaphore.
type Semaphore chan struct{}
func NewSemaphore(size int) Semaphore {
return make(Semaphore, size)
}
// P acquires n resources.
func (s Semaphore) P(n int) {
e := struct{}{}
for i := 0; i < n; i++ {
s <- e // acquire one
}
}
// V releases n resources.
func (s Semaphore) V(n int) {
for i := 0; i < n; i++ {
<-s // release one
}
}
// combinedWriter mimics what the ssh.CombinedOutput command does.
type combinedWriter struct {
b bytes.Buffer
mu sync.Mutex
}
// The Write method writes to the bytes buffer with a lock to mix output safely.
func (w *combinedWriter) Write(p []byte) (int, error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.b.Write(p)
}
// The String function returns the contents of the buffer.
func (w *combinedWriter) String() string {
return w.b.String()
}
// The PrefixedString returns the contents of the buffer with the prefix
// appended to every line.
func (w *combinedWriter) PrefixedString(prefix string) string {
return prefix + strings.TrimSuffix(strings.Replace(w.String(), "\n", "\n"+prefix, -1), prefix)
}

View File

@@ -5,4 +5,4 @@ ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && cd .. && pwd )" # dir!
cd "${ROOT}" cd "${ROOT}"
go vet && echo PASS || exit 1 # since it doesn't output an ok message on pass go vet && echo PASS || exit 1 # since it doesn't output an ok message on pass
grep 'log.' *.go | grep '\\n' && exit 1 || echo PASS # no \n needed in log.Printf() grep 'log.' *.go | grep '\\n"' && exit 1 || echo PASS # no \n needed in log.Printf()