Automatically update remote files on change

This extends the automatic watching of graph definition files across the
remote SSH boundary.
This commit is contained in:
James Shubin
2016-08-09 16:20:57 -04:00
parent 1d0e187838
commit 79ba750dd5
3 changed files with 123 additions and 12 deletions

View File

@@ -24,9 +24,64 @@ import (
"math"
"path"
"strings"
"sync"
"syscall"
)
// ConfigWatcher returns events on a channel anytime one of its files events.
type ConfigWatcher struct {
ch chan string
wg sync.WaitGroup
closechan chan struct{}
}
// NewConfigWatcher creates a new ConfigWatcher struct.
func NewConfigWatcher() *ConfigWatcher {
return &ConfigWatcher{
ch: make(chan string),
closechan: make(chan struct{}),
}
}
// The Add method adds a new file path to watch for events on.
func (obj *ConfigWatcher) Add(file string) {
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
ch := ConfigWatch(file)
for {
select {
case <-ch:
obj.ch <- file
continue
case <-obj.closechan:
return
}
}
}()
}
// Events returns a channel to listen on for file events. It closes when it is
// emptied after the Close() method is called. You can test for closure with the
// f, more := <-obj.Events() pattern.
func (obj *ConfigWatcher) Events() chan string {
return obj.ch
}
// Close shuts down the ConfigWatcher object. It closes the Events channel after
// all the currently pending events have been emptied.
func (obj *ConfigWatcher) Close() {
if obj.ch == nil {
return
}
close(obj.closechan)
obj.wg.Wait() // wait until everyone is done sending on obj.ch
//obj.ch <- "" // send finished message
close(obj.ch)
obj.ch = nil
}
// ConfigWatch writes on the channel everytime an event is seen for the path.
// XXX: it would be great if we could reuse code between this and the file resource
// XXX: patch this to submit it as part of go-fsnotify if they're interested...
func ConfigWatch(file string) chan bool {
@@ -51,8 +106,9 @@ func ConfigWatch(file string) chan bool {
if current == "" { // the empty string top is the root dir ("/")
current = "/"
}
log.Printf("Watching: %v", current) // attempting to watch...
if DEBUG {
log.Printf("Watching: %v", current) // attempting to watch...
}
// initialize in the loop so that we can reset on rm-ed handles
err = watcher.Add(current)
if err != nil {
@@ -97,7 +153,10 @@ func ConfigWatch(file string) chan bool {
// if we have what we wanted, awesome, send an event...
if event.Name == safename {
//log.Println("Event!")
send = true
// TODO: filter out some of the events, is Write a sufficient minimum?
if event.Op&fsnotify.Write == fsnotify.Write {
send = true
}
// file removed, move the watch upwards
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {

14
main.go
View File

@@ -262,12 +262,23 @@ func run(c *cli.Context) error {
}
}()
configWatcher := NewConfigWatcher()
events := configWatcher.Events()
if !c.Bool("no-watch") {
for _, f := range c.StringSlice("remote") { // add all the files...
configWatcher.Add(f)
}
} else {
events = nil // signal that no-watch is true
}
// build remotes struct for remote ssh
remotes := NewRemotes(
EmbdEtcd.LocalhostClientURLs().StringSlice(),
[]string{DefaultClientURL},
noop,
c.StringSlice("remote"), // list of files
events, // watch for file changes
cConns,
c.Bool("allow-interactive"),
c.String("ssh-priv-id-rsa"),
@@ -288,7 +299,8 @@ func run(c *cli.Context) error {
log.Println("Destroy...")
remotes.Exit() // tell all the remote connections to shutdown; waits!
configWatcher.Close() // stop sending file changes to remotes
remotes.Exit() // tell all the remote connections to shutdown; waits!
G.Exit() // tell all the children to exit

View File

@@ -87,6 +87,7 @@ type SSH struct {
clientURLs []string // list of urls where the local server is listening
remoteURLs []string // list of urls where the remote server connects to
noop bool // whether to run the remote process with --noop
noWatch bool // whether to run the remote process with --no-watch
caching bool // whether to try and cache the copy of the binary
prefix string // location we're allowed to put data on the remote server
@@ -241,7 +242,7 @@ func (obj *SSH) Sftp() error {
// TODO: should future versions use torrent for this copy and updates?
obj.filepath = path.Join(obj.remotewd, path.Base(obj.file)) // same filename
log.Println("Remote: Copying graph definition...")
_, err = obj.SftpCopy(obj.file, obj.filepath)
_, err = obj.SftpGraphCopy()
if err != nil {
// TODO: cleanup
return fmt.Errorf("Error copying graph: %s", err)
@@ -250,6 +251,14 @@ func (obj *SSH) Sftp() error {
return nil
}
// SftpGraphCopy is a helper function used for re-copying the graph definition.
func (obj *SSH) SftpGraphCopy() (int64, error) {
if obj.filepath == "" {
return -1, fmt.Errorf("Sftp session isn't ready yet!")
}
return obj.SftpCopy(obj.file, obj.filepath)
}
// SftpCopy is a simple helper function that runs a local -> remote sftp copy.
func (obj *SSH) SftpCopy(src, dst string) (int64, error) {
if obj.sftp == nil {
@@ -473,6 +482,9 @@ func (obj *SSH) Exec() error {
if obj.noop {
args = append(args, "--noop")
}
if obj.noWatch {
args = append(args, "--no-watch")
}
// TODO: add --converged-timeout support for group
@@ -648,11 +660,12 @@ type Remotes struct {
remoteURLs []string // list of urls where the remote server connects to
noop bool // whether to run in noop mode
remotes []string // list of remote graph definition files to run
cConns uint16 // number of concurrent ssh connections, zero means unlimited
interactive bool // allow interactive prompting
sshPrivIdRsa string // path to ~/.ssh/id_rsa
caching bool // whether to try and cache the copy of the binary
prefix string // folder prefix to use for misc storage
fileWatch chan string
cConns uint16 // number of concurrent ssh connections, zero means unlimited
interactive bool // allow interactive prompting
sshPrivIdRsa string // path to ~/.ssh/id_rsa
caching bool // whether to try and cache the copy of the binary
prefix string // folder prefix to use for misc storage
wg sync.WaitGroup // keep track of each running SSH connection
lock sync.Mutex // mutex for access to sshmap
@@ -662,12 +675,13 @@ type Remotes struct {
}
// The NewRemotes function builds a Remotes struct.
func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, prefix string) *Remotes {
func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fileWatch chan string, cConns uint16, interactive bool, sshPrivIdRsa string, caching bool, prefix string) *Remotes {
return &Remotes{
clientURLs: clientURLs,
remoteURLs: remoteURLs,
noop: noop,
remotes: remotes,
remotes: StrRemoveDuplicatesInList(remotes),
fileWatch: fileWatch,
cConns: cConns,
interactive: interactive,
sshPrivIdRsa: sshPrivIdRsa,
@@ -748,6 +762,7 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
clientURLs: obj.clientURLs,
remoteURLs: obj.remoteURLs,
noop: obj.noop,
noWatch: obj.fileWatch == nil,
caching: obj.caching,
prefix: obj.prefix,
}, nil
@@ -827,6 +842,31 @@ func (obj *Remotes) passwordCallback(user, host string) func() (string, error) {
// The Run method of the Remotes struct kicks it all off. It is usually run from
// a go routine.
func (obj *Remotes) Run() {
// kick off the file change notifications
if obj.fileWatch != nil {
go func() {
for {
f, more := <-obj.fileWatch // read from channel
if !more {
return
}
obj.lock.Lock()
sshobj, exists := obj.sshmap[f]
if !exists || sshobj == nil {
continue // skip, this hasn't happened yet
}
// NOTE: if this errors because the session isn't
// ready yet, it's fine, because we haven't copied
// the file yet, so the update notification isn't
// wasted, in fact, it's premature and redundant.
if _, err := sshobj.SftpGraphCopy(); err == nil { // push new copy
log.Printf("Remote: Copied over new graph definition: %s", f)
} // ignore errors
obj.lock.Unlock()
}
}()
}
// the semaphore provides the max simultaneous connection limit
for _, f := range obj.remotes {
if obj.cConns != 0 {