diff --git a/configwatch.go b/configwatch.go index 804005ce..a4b8ce3e 100644 --- a/configwatch.go +++ b/configwatch.go @@ -24,9 +24,64 @@ import ( "math" "path" "strings" + "sync" "syscall" ) +// ConfigWatcher returns events on a channel anytime one of its files events. +type ConfigWatcher struct { + ch chan string + wg sync.WaitGroup + closechan chan struct{} +} + +// NewConfigWatcher creates a new ConfigWatcher struct. +func NewConfigWatcher() *ConfigWatcher { + return &ConfigWatcher{ + ch: make(chan string), + closechan: make(chan struct{}), + } +} + +// The Add method adds a new file path to watch for events on. +func (obj *ConfigWatcher) Add(file string) { + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + ch := ConfigWatch(file) + for { + select { + case <-ch: + obj.ch <- file + continue + case <-obj.closechan: + return + } + } + }() +} + +// Events returns a channel to listen on for file events. It closes when it is +// emptied after the Close() method is called. You can test for closure with the +// f, more := <-obj.Events() pattern. +func (obj *ConfigWatcher) Events() chan string { + return obj.ch +} + +// Close shuts down the ConfigWatcher object. It closes the Events channel after +// all the currently pending events have been emptied. +func (obj *ConfigWatcher) Close() { + if obj.ch == nil { + return + } + close(obj.closechan) + obj.wg.Wait() // wait until everyone is done sending on obj.ch + //obj.ch <- "" // send finished message + close(obj.ch) + obj.ch = nil +} + +// ConfigWatch writes on the channel everytime an event is seen for the path. // XXX: it would be great if we could reuse code between this and the file resource // XXX: patch this to submit it as part of go-fsnotify if they're interested... func ConfigWatch(file string) chan bool { @@ -51,8 +106,9 @@ func ConfigWatch(file string) chan bool { if current == "" { // the empty string top is the root dir ("/") current = "/" } - log.Printf("Watching: %v", current) // attempting to watch... - + if DEBUG { + log.Printf("Watching: %v", current) // attempting to watch... + } // initialize in the loop so that we can reset on rm-ed handles err = watcher.Add(current) if err != nil { @@ -97,7 +153,10 @@ func ConfigWatch(file string) chan bool { // if we have what we wanted, awesome, send an event... if event.Name == safename { //log.Println("Event!") - send = true + // TODO: filter out some of the events, is Write a sufficient minimum? + if event.Op&fsnotify.Write == fsnotify.Write { + send = true + } // file removed, move the watch upwards if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { diff --git a/main.go b/main.go index 974e6a86..ff72bee0 100644 --- a/main.go +++ b/main.go @@ -262,12 +262,23 @@ func run(c *cli.Context) error { } }() + configWatcher := NewConfigWatcher() + events := configWatcher.Events() + if !c.Bool("no-watch") { + for _, f := range c.StringSlice("remote") { // add all the files... + configWatcher.Add(f) + } + } else { + events = nil // signal that no-watch is true + } + // build remotes struct for remote ssh remotes := NewRemotes( EmbdEtcd.LocalhostClientURLs().StringSlice(), []string{DefaultClientURL}, noop, c.StringSlice("remote"), // list of files + events, // watch for file changes cConns, c.Bool("allow-interactive"), c.String("ssh-priv-id-rsa"), @@ -288,7 +299,8 @@ func run(c *cli.Context) error { log.Println("Destroy...") - remotes.Exit() // tell all the remote connections to shutdown; waits! + configWatcher.Close() // stop sending file changes to remotes + remotes.Exit() // tell all the remote connections to shutdown; waits! G.Exit() // tell all the children to exit diff --git a/remote.go b/remote.go index 27e153c5..53d9485e 100644 --- a/remote.go +++ b/remote.go @@ -87,6 +87,7 @@ type SSH struct { clientURLs []string // list of urls where the local server is listening remoteURLs []string // list of urls where the remote server connects to noop bool // whether to run the remote process with --noop + noWatch bool // whether to run the remote process with --no-watch caching bool // whether to try and cache the copy of the binary prefix string // location we're allowed to put data on the remote server @@ -241,7 +242,7 @@ func (obj *SSH) Sftp() error { // TODO: should future versions use torrent for this copy and updates? obj.filepath = path.Join(obj.remotewd, path.Base(obj.file)) // same filename log.Println("Remote: Copying graph definition...") - _, err = obj.SftpCopy(obj.file, obj.filepath) + _, err = obj.SftpGraphCopy() if err != nil { // TODO: cleanup return fmt.Errorf("Error copying graph: %s", err) @@ -250,6 +251,14 @@ func (obj *SSH) Sftp() error { return nil } +// SftpGraphCopy is a helper function used for re-copying the graph definition. +func (obj *SSH) SftpGraphCopy() (int64, error) { + if obj.filepath == "" { + return -1, fmt.Errorf("Sftp session isn't ready yet!") + } + return obj.SftpCopy(obj.file, obj.filepath) +} + // SftpCopy is a simple helper function that runs a local -> remote sftp copy. func (obj *SSH) SftpCopy(src, dst string) (int64, error) { if obj.sftp == nil { @@ -473,6 +482,9 @@ func (obj *SSH) Exec() error { if obj.noop { args = append(args, "--noop") } + if obj.noWatch { + args = append(args, "--no-watch") + } // TODO: add --converged-timeout support for group @@ -648,11 +660,12 @@ type Remotes struct { remoteURLs []string // list of urls where the remote server connects to noop bool // whether to run in noop mode remotes []string // list of remote graph definition files to run - cConns uint16 // number of concurrent ssh connections, zero means unlimited - interactive bool // allow interactive prompting - sshPrivIdRsa string // path to ~/.ssh/id_rsa - caching bool // whether to try and cache the copy of the binary - prefix string // folder prefix to use for misc storage + fileWatch chan string + cConns uint16 // number of concurrent ssh connections, zero means unlimited + interactive bool // allow interactive prompting + sshPrivIdRsa string // path to ~/.ssh/id_rsa + caching bool // whether to try and cache the copy of the binary + prefix string // folder prefix to use for misc storage wg sync.WaitGroup // keep track of each running SSH connection lock sync.Mutex // mutex for access to sshmap @@ -662,12 +675,13 @@ type Remotes struct { } // The NewRemotes function builds a Remotes struct. -func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, prefix string) *Remotes { +func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, prefix string) *Remotes { return &Remotes{ clientURLs: clientURLs, remoteURLs: remoteURLs, noop: noop, - remotes: remotes, + remotes: StrRemoveDuplicatesInList(remotes), + fileWatch: fileWatch, cConns: cConns, interactive: interactive, sshPrivIdRsa: sshPrivIdRsa, @@ -748,6 +762,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) { clientURLs: obj.clientURLs, remoteURLs: obj.remoteURLs, noop: obj.noop, + noWatch: obj.fileWatch == nil, caching: obj.caching, prefix: obj.prefix, }, nil @@ -827,6 +842,31 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) { // The Run method of the Remotes struct kicks it all off. It is usually run from // a go routine. func (obj *Remotes) Run() { + // kick off the file change notifications + if obj.fileWatch != nil { + go func() { + for { + f, more := <-obj.fileWatch // read from channel + if !more { + return + } + obj.lock.Lock() + sshobj, exists := obj.sshmap[f] + if !exists || sshobj == nil { + continue // skip, this hasn't happened yet + } + // NOTE: if this errors because the session isn't + // ready yet, it's fine, because we haven't copied + // the file yet, so the update notification isn't + // wasted, in fact, it's premature and redundant. + if _, err := sshobj.SftpGraphCopy(); err == nil { // push new copy + log.Printf("Remote: Copied over new graph definition: %s", f) + } // ignore errors + obj.lock.Unlock() + } + }() + } + // the semaphore provides the max simultaneous connection limit for _, f := range obj.remotes { if obj.cConns != 0 {