From ff01e4a5e782495e45cc82398cc704d20ccca324 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 30 Aug 2016 05:18:55 -0400 Subject: [PATCH] remote: Add distributed converged timeout This patch extends the --converged-timeout argument so that when used with --remote it waits for the entire set of remote mgmt agents to converge simultaneously before exiting. purpleidea says: This particular part of the patch probably took as much work as all of the work required for the initial remote patches alone! --- config.go | 8 +++- etcd.go | 59 +++++++++++++++++++++++++ main.go | 75 ++++++++++++++++++++++++++----- remote.go | 129 ++++++++++++++++++++++++++++++++++++++++++++++++------ 4 files changed, 245 insertions(+), 26 deletions(-) diff --git a/config.go b/config.go index eef3cfc6..1e507efb 100644 --- a/config.go +++ b/config.go @@ -56,6 +56,7 @@ type GraphConfig struct { Collector []collectorResConfig `yaml:"collect"` Edges []edgeConfig `yaml:"edges"` Comment string `yaml:"comment"` + Hostname string `yaml:"hostname"` // uuid for the host Remote string `yaml:"remote"` } @@ -87,7 +88,10 @@ func ParseConfigFromFile(filename string) *GraphConfig { // NewGraphFromConfig returns a new graph from existing input, such as from the // existing graph, and a GraphConfig struct. -func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, hostname string, noop bool) (*Graph, error) { +func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, noop bool) (*Graph, error) { + if config.Hostname == "" { + return nil, fmt.Errorf("Config: Error: Hostname can't be empty!") + } var graph *Graph // new graph to return if g == nil { // FIXME: how can we check for an empty graph? @@ -154,7 +158,7 @@ func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, host } } // store in etcd - if err := EtcdSetResources(embdEtcd, hostname, resources); err != nil { + if err := EtcdSetResources(embdEtcd, config.Hostname, resources); err != nil { return nil, fmt.Errorf("Config: Could not export resources: %v", err) } diff --git a/etcd.go b/etcd.go index 7ace1e32..f6ef0514 100644 --- a/etcd.go +++ b/etcd.go @@ -1849,6 +1849,65 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { return endpoints, nil } +// EtcdSetHostnameConverged sets whether a specific hostname is converged. +func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error { + if TRACE { + log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%s): %v", hostname, isConverged) + defer log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%v): Finished!", hostname) + } + converged := fmt.Sprintf("/%s/converged/%s", NS, hostname) + op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))} + if _, err := obj.Txn(nil, op, nil); err != nil { // TODO: do we need a skipConv flag here too? + return fmt.Errorf("Etcd: Set converged failed!") // exit in progress? + } + return nil +} + +// EtcdHostnameConverged returns a map of every hostname's converged state. +func EtcdHostnameConverged(obj *EmbdEtcd) (map[string]bool, error) { + if TRACE { + log.Printf("Trace: Etcd: EtcdHostnameConverged()") + defer log.Printf("Trace: Etcd: EtcdHostnameConverged(): Finished!") + } + path := fmt.Sprintf("/%s/converged/", NS) + keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge + if err != nil { + return nil, fmt.Errorf("Etcd: Converged values aren't available: %v", err) + } + converged := make(map[string]bool) + for key, val := range keyMap { // loop through directory... + if !strings.HasPrefix(key, path) { + continue + } + name := key[len(path):] // get name of key + if val == "" { // skip "erased" values + continue + } + b, err := strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("Etcd: Converged: Data format error!: %v", err) + } + converged[name] = b // add to map + } + return converged, nil +} + +// EtcdAddHostnameConvergedWatcher adds a watcher with a callback that runs on +// hostname state changes. +func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) { + path := fmt.Sprintf("/%s/converged/", NS) + internalCbFn := func(re *RE) error { + // TODO: get the value from the response, and apply delta... + // for now, just run a get operation which is easier to code! + if m, err := EtcdHostnameConverged(obj); err == nil { + return callbackFn(m) // call my function + } else { + return err + } + } + return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset +} + // EtcdSetClusterSize sets the ideal target cluster size of etcd peers func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error { if TRACE { diff --git a/main.go b/main.go index b6f258ba..cd7b9fe5 100644 --- a/main.go +++ b/main.go @@ -69,9 +69,19 @@ func run(c *cli.Context) error { log.Printf("This is: %v, version: %v", program, version) log.Printf("Main: Start: %v", start) - hostname := c.String("hostname") - if hostname == "" { - hostname, _ = os.Hostname() + hostname, _ := os.Hostname() + // allow passing in the hostname, instead of using --hostname + if c.IsSet("file") { + if config := ParseConfigFromFile(c.String("file")); config != nil { + if h := config.Hostname; h != "" { + hostname = h + } + } + } + if c.IsSet("hostname") { // override by cli + if h := c.String("hostname"); h != "" { + hostname = h + } } noop := c.Bool("noop") @@ -123,6 +133,17 @@ func run(c *cli.Context) error { return cli.NewExitError("", 1) } + if c.IsSet("converged-timeout") && cConns > 0 && len(c.StringSlice("remote")) > c.Int("cconns") { + log.Printf("Main: Error: combining --converged-timeout with more remotes than available connections will never converge!") + return cli.NewExitError("", 1) + } + + depth := uint16(c.Int("depth")) + if depth < 0 { // user should not be using this argument manually + log.Printf("Main: Error: negative values for --depth are not permitted!") + return cli.NewExitError("", 1) + } + if c.IsSet("prefix") && c.Bool("tmp-prefix") { log.Println("Main: Error: combining --prefix and the request for a tmp prefix is illogical!") return cli.NewExitError("", 1) @@ -162,13 +183,7 @@ func run(c *cli.Context) error { // setup converger converger := NewConverger( c.Int("converged-timeout"), - func(b bool) error { // lambda to run when converged - if b { - log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout")) - exit <- true // trigger an exit! - } - return nil - }, + nil, // stateFn gets added in by EmbdEtcd ) go converger.Loop(true) // main loop for converger, true to start paused @@ -196,6 +211,24 @@ func run(c *cli.Context) error { log.Printf("Main: Etcd: Startup failed: %v", err) exit <- true } + convergerStateFn := func(b bool) error { + // exit if we are using the converged-timeout and we are the + // root node. otherwise, if we are a child node in a remote + // execution hierarchy, we should only notify our converged + // state and wait for the parent to trigger the exit. + if depth == 0 && c.Int("converged-timeout") >= 0 { + if b { + log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout")) + exit <- true // trigger an exit! + } + return nil + } + // send our individual state into etcd for others to see + return EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error? + } + if EmbdEtcd != nil { + converger.SetStateFn(convergerStateFn) + } exitchan := make(chan Event) // exit event go func() { @@ -250,6 +283,12 @@ func run(c *cli.Context) error { continue } + if config.Hostname != "" && config.Hostname != hostname { + log.Printf("Config: Hostname changed, ignoring config!") + continue + } + config.Hostname = hostname // set it in case it was "" + // run graph vertex LOCK... if !first { // TODO: we can flatten this check out I think converger.Pause() // FIXME: add sync wait? @@ -258,7 +297,7 @@ func run(c *cli.Context) error { // build graph from yaml file on events (eg: from etcd) // we need the vertices to be paused to work on them - if newFullgraph, err := fullGraph.NewGraphFromConfig(config, EmbdEtcd, hostname, noop); err == nil { // keep references to all original elements + if newFullgraph, err := fullGraph.NewGraphFromConfig(config, EmbdEtcd, noop); err == nil { // keep references to all original elements fullGraph = newFullgraph } else { log.Printf("Config: Error making new graph from config: %v", err) @@ -303,6 +342,11 @@ func run(c *cli.Context) error { events = nil // signal that no-watch is true } + // initialize the add watcher, which calls the f callback on map changes + convergerCb := func(f func(map[string]bool) error) (func(), error) { + return EtcdAddHostnameConvergedWatcher(EmbdEtcd, f) + } + // build remotes struct for remote ssh remotes := NewRemotes( EmbdEtcd.LocalhostClientURLs().StringSlice(), @@ -314,7 +358,10 @@ func run(c *cli.Context) error { c.Bool("allow-interactive"), c.String("ssh-priv-id-rsa"), !c.Bool("no-caching"), + depth, prefix, + converger, + convergerCb, ) // TODO: is there any benefit to running the remotes above in the loop? @@ -503,6 +550,12 @@ func main() { Name: "no-caching", Usage: "don't allow remote caching of remote execution binary", }, + cli.IntFlag{ + Name: "depth", + Hidden: true, // internal use only + Value: 0, + Usage: "specify depth in remote hierarchy", + }, cli.StringFlag{ Name: "prefix", Usage: "specify a path to the working prefix directory", diff --git a/remote.go b/remote.go index 6dbb88ba..c291269c 100644 --- a/remote.go +++ b/remote.go @@ -78,6 +78,8 @@ const ( // The SSH struct is the unit building block for a single remote SSH connection. type SSH struct { + hostname string // uuid of the host, as used by the --hostname argument + host string // remote host to connect to port uint16 // remote port to connect to (usually 22) user string // username to connect with @@ -89,8 +91,10 @@ type SSH struct { noop bool // whether to run the remote process with --noop noWatch bool // whether to run the remote process with --no-watch - caching bool // whether to try and cache the copy of the binary - prefix string // location we're allowed to put data on the remote server + depth uint16 // depth of this node in the remote execution hierarchy + caching bool // whether to try and cache the copy of the binary + prefix string // location we're allowed to put data on the remote server + converger Converger client *ssh.Client // client object sftp *sftp.Client // sftp object @@ -475,19 +479,24 @@ func (obj *SSH) Exec() error { obj.session.Stdout = &b obj.session.Stderr = &b + hostname := fmt.Sprintf("--hostname '%s'", obj.hostname) // TODO: do something less arbitrary about which one we pick? url := cleanURL(obj.remoteURLs[0]) // arbitrarily pick the first one seeds := fmt.Sprintf("--no-server --seeds 'http://%s'", url) // XXX: escape dangerous untrusted input? file := fmt.Sprintf("--file '%s'", obj.filepath) // XXX: escape dangerous untrusted input! - args := []string{seeds, file} + depth := fmt.Sprintf("--depth %d", obj.depth+1) // child is +1 distance + args := []string{hostname, seeds, file, depth} if obj.noop { args = append(args, "--noop") } if obj.noWatch { args = append(args, "--no-watch") } - - // TODO: add --converged-timeout support for group + if timeout := obj.converger.Timeout(); timeout >= 0 { + args = append(args, fmt.Sprintf("--converged-timeout=%d", timeout)) + } + // TODO: we use a depth parameter instead of a simple bool, in case we + // want to have outwardly expanding trees of remote execution... cmd := fmt.Sprintf("%s run %s", obj.execpath, strings.Join(args, " ")) log.Printf("Remote: Running: %s", cmd) @@ -669,17 +678,26 @@ type Remotes struct { interactive bool // allow interactive prompting sshPrivIdRsa string // path to ~/.ssh/id_rsa caching bool // whether to try and cache the copy of the binary + depth uint16 // depth of this node in the remote execution hierarchy prefix string // folder prefix to use for misc storage + converger Converger + convergerCb func(func(map[string]bool) error) (func(), error) + + wg sync.WaitGroup // keep track of each running SSH connection + lock sync.Mutex // mutex for access to sshmap + sshmap map[string]*SSH // map to each SSH struct with the remote as the key + exiting bool // flag to let us know if we're exiting + exitChan chan struct{} // closes when we should exit + semaphore Semaphore // counting semaphore to limit concurrent connections + hostnames []string // list of hostnames we've seen so far + cuuid ConvergerUUID // convergerUUID for the remote itself + cuuids map[string]ConvergerUUID // map to each SSH struct with the remote as the key + callbackCancelFunc func() // stored callback function cancel function - wg sync.WaitGroup // keep track of each running SSH connection - lock sync.Mutex // mutex for access to sshmap - sshmap map[string]*SSH // map to each SSH struct with the remote as the key - exiting bool // flag to let us know if we're exiting - semaphore Semaphore // counting semaphore to limit concurrent connections } // The NewRemotes function builds a Remotes struct. -func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, prefix string) *Remotes { +func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, depth uint16, prefix string, converger Converger, convergerCb func(func(map[string]bool) error) (func(), error)) *Remotes { return &Remotes{ clientURLs: clientURLs, remoteURLs: remoteURLs, @@ -690,9 +708,15 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi interactive: interactive, sshPrivIdRsa: sshPrivIdRsa, caching: caching, + depth: depth, prefix: prefix, + converger: converger, + convergerCb: convergerCb, sshmap: make(map[string]*SSH), + exitChan: make(chan struct{}), semaphore: NewSemaphore(int(cConns)), + hostnames: make([]string, len(remotes)), + cuuids: make(map[string]ConvergerUUID), } } @@ -757,7 +781,17 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) { return nil, fmt.Errorf("No authentication methods available!") } + hostname := config.Hostname + if hostname == "" { + hostname = host // default to above + } + if StrInList(hostname, obj.hostnames) { + return nil, fmt.Errorf("Remote: Hostname `%s` already exists!", hostname) + } + obj.hostnames = append(obj.hostnames, hostname) + return &SSH{ + hostname: hostname, host: host, port: port, user: user, @@ -767,7 +801,9 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) { remoteURLs: obj.remoteURLs, noop: obj.noop, noWatch: obj.fileWatch == nil, + depth: obj.depth, caching: obj.caching, + converger: obj.converger, prefix: obj.prefix, }, nil } @@ -846,13 +882,72 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) { // The Run method of the Remotes struct kicks it all off. It is usually run from // a go routine. func (obj *Remotes) Run() { + // TODO: we can disable a lot of this if we're not using --converged-timeout + // link in all the converged timeout checking and callbacks... + obj.cuuid = obj.converger.Register() // one for me! + obj.cuuid.SetName("Remote: Run") + for _, f := range obj.remotes { // one for each remote... + obj.cuuids[f] = obj.converger.Register() // save a reference + obj.cuuids[f].SetName(fmt.Sprintf("Remote: %s", f)) + //obj.cuuids[f].SetConverged(false) // everyone starts off false + } + + // watch for converged state in the group of remotes... + cFunc := func(m map[string]bool) error { + // The hosts state has changed. Here is what it is now. Is this + // now converged, or not? Run SetConverged(b) to update status! + // The keys are hostnames, not filenames as in the sshmap keys. + + // update converged status for each remote + for _, f := range obj.remotes { + // TODO: add obj.lock.Lock() ? + sshobj, exists := obj.sshmap[f] + // TODO: add obj.lock.Unlock() ? + if !exists || sshobj == nil { + continue // skip, this hasn't happened yet + } + hostname := sshobj.hostname + b, ok := m[hostname] + if !ok { // no status on hostname means unconverged! + continue + } + if DEBUG { + log.Printf("Remote: Converged: Status: %+v", obj.converger.Status()) + } + // if exiting, don't update, it will be unregistered... + if !sshobj.exiting { // this is actually racy, but safe + obj.cuuids[f].SetConverged(b) // ignore errors! + } + } + + return nil + } + if cancel, err := obj.convergerCb(cFunc); err != nil { // register the callback to run + obj.callbackCancelFunc = cancel + } + // kick off the file change notifications + // NOTE: if we ever change a config after a host has converged but has + // been let to exit before the group, then it won't see the changes... if obj.fileWatch != nil { + obj.wg.Add(1) go func() { + defer obj.wg.Done() + var f string + var more bool for { - f, more := <-obj.fileWatch // read from channel - if !more { + select { + case <-obj.exitChan: // closes when we're done return + case f, more = <-obj.fileWatch: // read from channel + if !more { + return + } + obj.cuuid.SetConverged(false) // activity! + + case <-obj.cuuid.ConvergedTimer(): + obj.cuuid.SetConverged(true) // converged! + continue } obj.lock.Lock() sshobj, exists := obj.sshmap[f] @@ -893,6 +988,8 @@ func (obj *Remotes) Run() { defer obj.semaphore.V(1) } defer obj.wg.Done() + defer obj.cuuids[f].Unregister() + if err := sshobj.Go(); err != nil { log.Printf("Remote: Error: %s", err) } @@ -907,6 +1004,7 @@ func (obj *Remotes) Exit() { obj.lock.Lock() obj.exiting = true // don't spawn new ones once this flag is set! obj.lock.Unlock() + close(obj.exitChan) for _, f := range obj.remotes { sshobj, exists := obj.sshmap[f] if !exists || sshobj == nil { @@ -919,6 +1017,11 @@ func (obj *Remotes) Exit() { } } + if obj.callbackCancelFunc != nil { + obj.callbackCancelFunc() // cancel our callback + } + + defer obj.cuuid.Unregister() obj.wg.Wait() // wait for everyone to exit }