remote: Add distributed converged timeout

This patch extends the --converged-timeout argument so that when used
with --remote it waits for the entire set of remote mgmt agents to
converge simultaneously before exiting.

purpleidea says: This particular part of the patch probably took as much
work as all of the work required for the initial remote patches alone!
This commit is contained in:
James Shubin
2016-08-30 05:18:55 -04:00
parent 6794aff77c
commit ff01e4a5e7
4 changed files with 245 additions and 26 deletions

View File

@@ -56,6 +56,7 @@ type GraphConfig struct {
Collector []collectorResConfig `yaml:"collect"` Collector []collectorResConfig `yaml:"collect"`
Edges []edgeConfig `yaml:"edges"` Edges []edgeConfig `yaml:"edges"`
Comment string `yaml:"comment"` Comment string `yaml:"comment"`
Hostname string `yaml:"hostname"` // uuid for the host
Remote string `yaml:"remote"` Remote string `yaml:"remote"`
} }
@@ -87,7 +88,10 @@ func ParseConfigFromFile(filename string) *GraphConfig {
// NewGraphFromConfig returns a new graph from existing input, such as from the // NewGraphFromConfig returns a new graph from existing input, such as from the
// existing graph, and a GraphConfig struct. // existing graph, and a GraphConfig struct.
func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, hostname string, noop bool) (*Graph, error) { func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, noop bool) (*Graph, error) {
if config.Hostname == "" {
return nil, fmt.Errorf("Config: Error: Hostname can't be empty!")
}
var graph *Graph // new graph to return var graph *Graph // new graph to return
if g == nil { // FIXME: how can we check for an empty graph? if g == nil { // FIXME: how can we check for an empty graph?
@@ -154,7 +158,7 @@ func (g *Graph) NewGraphFromConfig(config *GraphConfig, embdEtcd *EmbdEtcd, host
} }
} }
// store in etcd // store in etcd
if err := EtcdSetResources(embdEtcd, hostname, resources); err != nil { if err := EtcdSetResources(embdEtcd, config.Hostname, resources); err != nil {
return nil, fmt.Errorf("Config: Could not export resources: %v", err) return nil, fmt.Errorf("Config: Could not export resources: %v", err)
} }

59
etcd.go
View File

@@ -1849,6 +1849,65 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
return endpoints, nil return endpoints, nil
} }
// EtcdSetHostnameConverged sets whether a specific hostname is converged.
func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error {
if TRACE {
log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%s): %v", hostname, isConverged)
defer log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%v): Finished!", hostname)
}
converged := fmt.Sprintf("/%s/converged/%s", NS, hostname)
op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))}
if _, err := obj.Txn(nil, op, nil); err != nil { // TODO: do we need a skipConv flag here too?
return fmt.Errorf("Etcd: Set converged failed!") // exit in progress?
}
return nil
}
// EtcdHostnameConverged returns a map of every hostname's converged state.
func EtcdHostnameConverged(obj *EmbdEtcd) (map[string]bool, error) {
if TRACE {
log.Printf("Trace: Etcd: EtcdHostnameConverged()")
defer log.Printf("Trace: Etcd: EtcdHostnameConverged(): Finished!")
}
path := fmt.Sprintf("/%s/converged/", NS)
keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge
if err != nil {
return nil, fmt.Errorf("Etcd: Converged values aren't available: %v", err)
}
converged := make(map[string]bool)
for key, val := range keyMap { // loop through directory...
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of key
if val == "" { // skip "erased" values
continue
}
b, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("Etcd: Converged: Data format error!: %v", err)
}
converged[name] = b // add to map
}
return converged, nil
}
// EtcdAddHostnameConvergedWatcher adds a watcher with a callback that runs on
// hostname state changes.
func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) {
path := fmt.Sprintf("/%s/converged/", NS)
internalCbFn := func(re *RE) error {
// TODO: get the value from the response, and apply delta...
// for now, just run a get operation which is easier to code!
if m, err := EtcdHostnameConverged(obj); err == nil {
return callbackFn(m) // call my function
} else {
return err
}
}
return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset
}
// EtcdSetClusterSize sets the ideal target cluster size of etcd peers // EtcdSetClusterSize sets the ideal target cluster size of etcd peers
func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error { func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error {
if TRACE { if TRACE {

75
main.go
View File

@@ -69,9 +69,19 @@ func run(c *cli.Context) error {
log.Printf("This is: %v, version: %v", program, version) log.Printf("This is: %v, version: %v", program, version)
log.Printf("Main: Start: %v", start) log.Printf("Main: Start: %v", start)
hostname := c.String("hostname") hostname, _ := os.Hostname()
if hostname == "" { // allow passing in the hostname, instead of using --hostname
hostname, _ = os.Hostname() if c.IsSet("file") {
if config := ParseConfigFromFile(c.String("file")); config != nil {
if h := config.Hostname; h != "" {
hostname = h
}
}
}
if c.IsSet("hostname") { // override by cli
if h := c.String("hostname"); h != "" {
hostname = h
}
} }
noop := c.Bool("noop") noop := c.Bool("noop")
@@ -123,6 +133,17 @@ func run(c *cli.Context) error {
return cli.NewExitError("", 1) return cli.NewExitError("", 1)
} }
if c.IsSet("converged-timeout") && cConns > 0 && len(c.StringSlice("remote")) > c.Int("cconns") {
log.Printf("Main: Error: combining --converged-timeout with more remotes than available connections will never converge!")
return cli.NewExitError("", 1)
}
depth := uint16(c.Int("depth"))
if depth < 0 { // user should not be using this argument manually
log.Printf("Main: Error: negative values for --depth are not permitted!")
return cli.NewExitError("", 1)
}
if c.IsSet("prefix") && c.Bool("tmp-prefix") { if c.IsSet("prefix") && c.Bool("tmp-prefix") {
log.Println("Main: Error: combining --prefix and the request for a tmp prefix is illogical!") log.Println("Main: Error: combining --prefix and the request for a tmp prefix is illogical!")
return cli.NewExitError("", 1) return cli.NewExitError("", 1)
@@ -162,13 +183,7 @@ func run(c *cli.Context) error {
// setup converger // setup converger
converger := NewConverger( converger := NewConverger(
c.Int("converged-timeout"), c.Int("converged-timeout"),
func(b bool) error { // lambda to run when converged nil, // stateFn gets added in by EmbdEtcd
if b {
log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout"))
exit <- true // trigger an exit!
}
return nil
},
) )
go converger.Loop(true) // main loop for converger, true to start paused go converger.Loop(true) // main loop for converger, true to start paused
@@ -196,6 +211,24 @@ func run(c *cli.Context) error {
log.Printf("Main: Etcd: Startup failed: %v", err) log.Printf("Main: Etcd: Startup failed: %v", err)
exit <- true exit <- true
} }
convergerStateFn := func(b bool) error {
// exit if we are using the converged-timeout and we are the
// root node. otherwise, if we are a child node in a remote
// execution hierarchy, we should only notify our converged
// state and wait for the parent to trigger the exit.
if depth == 0 && c.Int("converged-timeout") >= 0 {
if b {
log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout"))
exit <- true // trigger an exit!
}
return nil
}
// send our individual state into etcd for others to see
return EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error?
}
if EmbdEtcd != nil {
converger.SetStateFn(convergerStateFn)
}
exitchan := make(chan Event) // exit event exitchan := make(chan Event) // exit event
go func() { go func() {
@@ -250,6 +283,12 @@ func run(c *cli.Context) error {
continue continue
} }
if config.Hostname != "" && config.Hostname != hostname {
log.Printf("Config: Hostname changed, ignoring config!")
continue
}
config.Hostname = hostname // set it in case it was ""
// run graph vertex LOCK... // run graph vertex LOCK...
if !first { // TODO: we can flatten this check out I think if !first { // TODO: we can flatten this check out I think
converger.Pause() // FIXME: add sync wait? converger.Pause() // FIXME: add sync wait?
@@ -258,7 +297,7 @@ func run(c *cli.Context) error {
// build graph from yaml file on events (eg: from etcd) // build graph from yaml file on events (eg: from etcd)
// we need the vertices to be paused to work on them // we need the vertices to be paused to work on them
if newFullgraph, err := fullGraph.NewGraphFromConfig(config, EmbdEtcd, hostname, noop); err == nil { // keep references to all original elements if newFullgraph, err := fullGraph.NewGraphFromConfig(config, EmbdEtcd, noop); err == nil { // keep references to all original elements
fullGraph = newFullgraph fullGraph = newFullgraph
} else { } else {
log.Printf("Config: Error making new graph from config: %v", err) log.Printf("Config: Error making new graph from config: %v", err)
@@ -303,6 +342,11 @@ func run(c *cli.Context) error {
events = nil // signal that no-watch is true events = nil // signal that no-watch is true
} }
// initialize the add watcher, which calls the f callback on map changes
convergerCb := func(f func(map[string]bool) error) (func(), error) {
return EtcdAddHostnameConvergedWatcher(EmbdEtcd, f)
}
// build remotes struct for remote ssh // build remotes struct for remote ssh
remotes := NewRemotes( remotes := NewRemotes(
EmbdEtcd.LocalhostClientURLs().StringSlice(), EmbdEtcd.LocalhostClientURLs().StringSlice(),
@@ -314,7 +358,10 @@ func run(c *cli.Context) error {
c.Bool("allow-interactive"), c.Bool("allow-interactive"),
c.String("ssh-priv-id-rsa"), c.String("ssh-priv-id-rsa"),
!c.Bool("no-caching"), !c.Bool("no-caching"),
depth,
prefix, prefix,
converger,
convergerCb,
) )
// TODO: is there any benefit to running the remotes above in the loop? // TODO: is there any benefit to running the remotes above in the loop?
@@ -503,6 +550,12 @@ func main() {
Name: "no-caching", Name: "no-caching",
Usage: "don't allow remote caching of remote execution binary", Usage: "don't allow remote caching of remote execution binary",
}, },
cli.IntFlag{
Name: "depth",
Hidden: true, // internal use only
Value: 0,
Usage: "specify depth in remote hierarchy",
},
cli.StringFlag{ cli.StringFlag{
Name: "prefix", Name: "prefix",
Usage: "specify a path to the working prefix directory", Usage: "specify a path to the working prefix directory",

113
remote.go
View File

@@ -78,6 +78,8 @@ const (
// The SSH struct is the unit building block for a single remote SSH connection. // The SSH struct is the unit building block for a single remote SSH connection.
type SSH struct { type SSH struct {
hostname string // uuid of the host, as used by the --hostname argument
host string // remote host to connect to host string // remote host to connect to
port uint16 // remote port to connect to (usually 22) port uint16 // remote port to connect to (usually 22)
user string // username to connect with user string // username to connect with
@@ -89,8 +91,10 @@ type SSH struct {
noop bool // whether to run the remote process with --noop noop bool // whether to run the remote process with --noop
noWatch bool // whether to run the remote process with --no-watch noWatch bool // whether to run the remote process with --no-watch
depth uint16 // depth of this node in the remote execution hierarchy
caching bool // whether to try and cache the copy of the binary caching bool // whether to try and cache the copy of the binary
prefix string // location we're allowed to put data on the remote server prefix string // location we're allowed to put data on the remote server
converger Converger
client *ssh.Client // client object client *ssh.Client // client object
sftp *sftp.Client // sftp object sftp *sftp.Client // sftp object
@@ -475,19 +479,24 @@ func (obj *SSH) Exec() error {
obj.session.Stdout = &b obj.session.Stdout = &b
obj.session.Stderr = &b obj.session.Stderr = &b
hostname := fmt.Sprintf("--hostname '%s'", obj.hostname)
// TODO: do something less arbitrary about which one we pick? // TODO: do something less arbitrary about which one we pick?
url := cleanURL(obj.remoteURLs[0]) // arbitrarily pick the first one url := cleanURL(obj.remoteURLs[0]) // arbitrarily pick the first one
seeds := fmt.Sprintf("--no-server --seeds 'http://%s'", url) // XXX: escape dangerous untrusted input? seeds := fmt.Sprintf("--no-server --seeds 'http://%s'", url) // XXX: escape dangerous untrusted input?
file := fmt.Sprintf("--file '%s'", obj.filepath) // XXX: escape dangerous untrusted input! file := fmt.Sprintf("--file '%s'", obj.filepath) // XXX: escape dangerous untrusted input!
args := []string{seeds, file} depth := fmt.Sprintf("--depth %d", obj.depth+1) // child is +1 distance
args := []string{hostname, seeds, file, depth}
if obj.noop { if obj.noop {
args = append(args, "--noop") args = append(args, "--noop")
} }
if obj.noWatch { if obj.noWatch {
args = append(args, "--no-watch") args = append(args, "--no-watch")
} }
if timeout := obj.converger.Timeout(); timeout >= 0 {
// TODO: add --converged-timeout support for group args = append(args, fmt.Sprintf("--converged-timeout=%d", timeout))
}
// TODO: we use a depth parameter instead of a simple bool, in case we
// want to have outwardly expanding trees of remote execution...
cmd := fmt.Sprintf("%s run %s", obj.execpath, strings.Join(args, " ")) cmd := fmt.Sprintf("%s run %s", obj.execpath, strings.Join(args, " "))
log.Printf("Remote: Running: %s", cmd) log.Printf("Remote: Running: %s", cmd)
@@ -669,17 +678,26 @@ type Remotes struct {
interactive bool // allow interactive prompting interactive bool // allow interactive prompting
sshPrivIdRsa string // path to ~/.ssh/id_rsa sshPrivIdRsa string // path to ~/.ssh/id_rsa
caching bool // whether to try and cache the copy of the binary caching bool // whether to try and cache the copy of the binary
depth uint16 // depth of this node in the remote execution hierarchy
prefix string // folder prefix to use for misc storage prefix string // folder prefix to use for misc storage
converger Converger
convergerCb func(func(map[string]bool) error) (func(), error)
wg sync.WaitGroup // keep track of each running SSH connection wg sync.WaitGroup // keep track of each running SSH connection
lock sync.Mutex // mutex for access to sshmap lock sync.Mutex // mutex for access to sshmap
sshmap map[string]*SSH // map to each SSH struct with the remote as the key sshmap map[string]*SSH // map to each SSH struct with the remote as the key
exiting bool // flag to let us know if we're exiting exiting bool // flag to let us know if we're exiting
exitChan chan struct{} // closes when we should exit
semaphore Semaphore // counting semaphore to limit concurrent connections semaphore Semaphore // counting semaphore to limit concurrent connections
hostnames []string // list of hostnames we've seen so far
cuuid ConvergerUUID // convergerUUID for the remote itself
cuuids map[string]ConvergerUUID // map to each SSH struct with the remote as the key
callbackCancelFunc func() // stored callback function cancel function
} }
// The NewRemotes function builds a Remotes struct. // The NewRemotes function builds a Remotes struct.
func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, prefix string) *Remotes { func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, depth uint16, prefix string, converger Converger, convergerCb func(func(map[string]bool) error) (func(), error)) *Remotes {
return &Remotes{ return &Remotes{
clientURLs: clientURLs, clientURLs: clientURLs,
remoteURLs: remoteURLs, remoteURLs: remoteURLs,
@@ -690,9 +708,15 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi
interactive: interactive, interactive: interactive,
sshPrivIdRsa: sshPrivIdRsa, sshPrivIdRsa: sshPrivIdRsa,
caching: caching, caching: caching,
depth: depth,
prefix: prefix, prefix: prefix,
converger: converger,
convergerCb: convergerCb,
sshmap: make(map[string]*SSH), sshmap: make(map[string]*SSH),
exitChan: make(chan struct{}),
semaphore: NewSemaphore(int(cConns)), semaphore: NewSemaphore(int(cConns)),
hostnames: make([]string, len(remotes)),
cuuids: make(map[string]ConvergerUUID),
} }
} }
@@ -757,7 +781,17 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
return nil, fmt.Errorf("No authentication methods available!") return nil, fmt.Errorf("No authentication methods available!")
} }
hostname := config.Hostname
if hostname == "" {
hostname = host // default to above
}
if StrInList(hostname, obj.hostnames) {
return nil, fmt.Errorf("Remote: Hostname `%s` already exists!", hostname)
}
obj.hostnames = append(obj.hostnames, hostname)
return &SSH{ return &SSH{
hostname: hostname,
host: host, host: host,
port: port, port: port,
user: user, user: user,
@@ -767,7 +801,9 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
remoteURLs: obj.remoteURLs, remoteURLs: obj.remoteURLs,
noop: obj.noop, noop: obj.noop,
noWatch: obj.fileWatch == nil, noWatch: obj.fileWatch == nil,
depth: obj.depth,
caching: obj.caching, caching: obj.caching,
converger: obj.converger,
prefix: obj.prefix, prefix: obj.prefix,
}, nil }, nil
} }
@@ -846,14 +882,73 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) {
// The Run method of the Remotes struct kicks it all off. It is usually run from // The Run method of the Remotes struct kicks it all off. It is usually run from
// a go routine. // a go routine.
func (obj *Remotes) Run() { func (obj *Remotes) Run() {
// TODO: we can disable a lot of this if we're not using --converged-timeout
// link in all the converged timeout checking and callbacks...
obj.cuuid = obj.converger.Register() // one for me!
obj.cuuid.SetName("Remote: Run")
for _, f := range obj.remotes { // one for each remote...
obj.cuuids[f] = obj.converger.Register() // save a reference
obj.cuuids[f].SetName(fmt.Sprintf("Remote: %s", f))
//obj.cuuids[f].SetConverged(false) // everyone starts off false
}
// watch for converged state in the group of remotes...
cFunc := func(m map[string]bool) error {
// The hosts state has changed. Here is what it is now. Is this
// now converged, or not? Run SetConverged(b) to update status!
// The keys are hostnames, not filenames as in the sshmap keys.
// update converged status for each remote
for _, f := range obj.remotes {
// TODO: add obj.lock.Lock() ?
sshobj, exists := obj.sshmap[f]
// TODO: add obj.lock.Unlock() ?
if !exists || sshobj == nil {
continue // skip, this hasn't happened yet
}
hostname := sshobj.hostname
b, ok := m[hostname]
if !ok { // no status on hostname means unconverged!
continue
}
if DEBUG {
log.Printf("Remote: Converged: Status: %+v", obj.converger.Status())
}
// if exiting, don't update, it will be unregistered...
if !sshobj.exiting { // this is actually racy, but safe
obj.cuuids[f].SetConverged(b) // ignore errors!
}
}
return nil
}
if cancel, err := obj.convergerCb(cFunc); err != nil { // register the callback to run
obj.callbackCancelFunc = cancel
}
// kick off the file change notifications // kick off the file change notifications
// NOTE: if we ever change a config after a host has converged but has
// been let to exit before the group, then it won't see the changes...
if obj.fileWatch != nil { if obj.fileWatch != nil {
obj.wg.Add(1)
go func() { go func() {
defer obj.wg.Done()
var f string
var more bool
for { for {
f, more := <-obj.fileWatch // read from channel select {
case <-obj.exitChan: // closes when we're done
return
case f, more = <-obj.fileWatch: // read from channel
if !more { if !more {
return return
} }
obj.cuuid.SetConverged(false) // activity!
case <-obj.cuuid.ConvergedTimer():
obj.cuuid.SetConverged(true) // converged!
continue
}
obj.lock.Lock() obj.lock.Lock()
sshobj, exists := obj.sshmap[f] sshobj, exists := obj.sshmap[f]
obj.lock.Unlock() obj.lock.Unlock()
@@ -893,6 +988,8 @@ func (obj *Remotes) Run() {
defer obj.semaphore.V(1) defer obj.semaphore.V(1)
} }
defer obj.wg.Done() defer obj.wg.Done()
defer obj.cuuids[f].Unregister()
if err := sshobj.Go(); err != nil { if err := sshobj.Go(); err != nil {
log.Printf("Remote: Error: %s", err) log.Printf("Remote: Error: %s", err)
} }
@@ -907,6 +1004,7 @@ func (obj *Remotes) Exit() {
obj.lock.Lock() obj.lock.Lock()
obj.exiting = true // don't spawn new ones once this flag is set! obj.exiting = true // don't spawn new ones once this flag is set!
obj.lock.Unlock() obj.lock.Unlock()
close(obj.exitChan)
for _, f := range obj.remotes { for _, f := range obj.remotes {
sshobj, exists := obj.sshmap[f] sshobj, exists := obj.sshmap[f]
if !exists || sshobj == nil { if !exists || sshobj == nil {
@@ -919,6 +1017,11 @@ func (obj *Remotes) Exit() {
} }
} }
if obj.callbackCancelFunc != nil {
obj.callbackCancelFunc() // cancel our callback
}
defer obj.cuuid.Unregister()
obj.wg.Wait() // wait for everyone to exit obj.wg.Wait() // wait for everyone to exit
} }