diff --git a/configwatch.go b/configwatch.go deleted file mode 100644 index 1477ab3a..00000000 --- a/configwatch.go +++ /dev/null @@ -1,228 +0,0 @@ -// Mgmt -// Copyright (C) 2013-2016+ James Shubin and the project contributors -// Written by James Shubin and the project contributors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package main - -import ( - "log" - "math" - "path" - "strings" - "sync" - "syscall" - - "github.com/purpleidea/mgmt/global" - "github.com/purpleidea/mgmt/util" - - "gopkg.in/fsnotify.v1" - //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" -) - -// ConfigWatcher returns events on a channel anytime one of its files events. -type ConfigWatcher struct { - ch chan string - wg sync.WaitGroup - closechan chan struct{} -} - -// NewConfigWatcher creates a new ConfigWatcher struct. -func NewConfigWatcher() *ConfigWatcher { - return &ConfigWatcher{ - ch: make(chan string), - closechan: make(chan struct{}), - } -} - -// Add new file paths to watch for events on. -func (obj *ConfigWatcher) Add(file ...string) { - if len(file) == 0 { - return - } - if len(file) > 1 { - for _, f := range file { // add all the files... - obj.Add(f) // recurse - } - return - } - // otherwise, add the one file passed in... - obj.wg.Add(1) - go func() { - defer obj.wg.Done() - ch := ConfigWatch(file[0]) - for { - select { - case <-ch: - obj.ch <- file[0] - continue - case <-obj.closechan: - return - } - } - }() -} - -// Events returns a channel to listen on for file events. It closes when it is -// emptied after the Close() method is called. You can test for closure with the -// f, more := <-obj.Events() pattern. -func (obj *ConfigWatcher) Events() chan string { - return obj.ch -} - -// Close shuts down the ConfigWatcher object. It closes the Events channel after -// all the currently pending events have been emptied. -func (obj *ConfigWatcher) Close() { - if obj.ch == nil { - return - } - close(obj.closechan) - obj.wg.Wait() // wait until everyone is done sending on obj.ch - //obj.ch <- "" // send finished message - close(obj.ch) - obj.ch = nil -} - -// ConfigWatch writes on the channel everytime an event is seen for the path. -// XXX: it would be great if we could reuse code between this and the file resource -// XXX: patch this to submit it as part of go-fsnotify if they're interested... -func ConfigWatch(file string) chan bool { - ch := make(chan bool) - go func() { - var safename = path.Clean(file) // no trailing slash - - watcher, err := fsnotify.NewWatcher() - if err != nil { - log.Fatal(err) - } - defer watcher.Close() - - patharray := util.PathSplit(safename) // tokenize the path - var index = len(patharray) // starting index - var current string // current "watcher" location - var deltaDepth int // depth delta between watcher and event - var send = false // send event? - - for { - current = strings.Join(patharray[0:index], "/") - if current == "" { // the empty string top is the root dir ("/") - current = "/" - } - if global.DEBUG { - log.Printf("Watching: %v", current) // attempting to watch... - } - // initialize in the loop so that we can reset on rm-ed handles - err = watcher.Add(current) - if err != nil { - if err == syscall.ENOENT { - index-- // usually not found, move up one dir - } else if err == syscall.ENOSPC { - // XXX: occasionally: no space left on device, - // XXX: probably due to lack of inotify watches - log.Printf("Out of inotify watches for config(%v)", file) - log.Fatal(err) - } else { - log.Printf("Unknown config(%v) error:", file) - log.Fatal(err) - } - index = int(math.Max(1, float64(index))) - continue - } - - select { - case event := <-watcher.Events: - // the deeper you go, the bigger the deltaDepth is... - // this is the difference between what we're watching, - // and the event... doesn't mean we can't watch deeper - if current == event.Name { - deltaDepth = 0 // i was watching what i was looking for - - } else if util.HasPathPrefix(event.Name, current) { - deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less - - } else if util.HasPathPrefix(current, event.Name) { - deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more - - } else { - // TODO different watchers get each others events! - // https://github.com/go-fsnotify/fsnotify/issues/95 - // this happened with two values such as: - // event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2 - continue - } - //log.Printf("The delta depth is: %v", deltaDepth) - - // if we have what we wanted, awesome, send an event... - if event.Name == safename { - //log.Println("Event!") - // TODO: filter out some of the events, is Write a sufficient minimum? - if event.Op&fsnotify.Write == fsnotify.Write { - send = true - } - - // file removed, move the watch upwards - if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { - //log.Println("Removal!") - watcher.Remove(current) - index-- - } - - // we must be a parent watcher, so descend in - if deltaDepth < 0 { - watcher.Remove(current) - index++ - } - - // if safename starts with event.Name, we're above, and no event should be sent - } else if util.HasPathPrefix(safename, event.Name) { - //log.Println("Above!") - - if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { - log.Println("Removal!") - watcher.Remove(current) - index-- - } - - if deltaDepth < 0 { - log.Println("Parent!") - if util.PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir - //send = true - } - watcher.Remove(current) - index++ - } - - // if event.Name startswith safename, send event, we're already deeper - } else if util.HasPathPrefix(event.Name, safename) { - //log.Println("Event2!") - //send = true - } - - case err := <-watcher.Errors: - log.Printf("error: %v", err) - log.Fatal(err) - - } - - // do our event sending all together to avoid duplicate msgs - if send { - send = false - ch <- true - } - } - //close(ch) - }() - return ch -} diff --git a/main.go b/main.go index 760f60f9..2d4bf20d 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ import ( "github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/puppet" + "github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/remote" "github.com/purpleidea/mgmt/util" @@ -49,21 +50,24 @@ var ( ) // signal handler -func waitForSignal(exit chan bool) { +func waitForSignal(exit chan error) error { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) // catch ^C //signal.Notify(signals, os.Kill) // catch signals signal.Notify(signals, syscall.SIGTERM) select { - case e := <-signals: // any signal will do - if e == os.Interrupt { + case sig := <-signals: // any signal will do + if sig == os.Interrupt { log.Println("Interrupted by ^C") + return nil } else { log.Println("Interrupted by signal") + return fmt.Errorf("Killed by %v", sig) } - case <-exit: // or a manual signal + case err := <-exit: // or a manual signal log.Println("Interrupted by exit signal") + return err } } @@ -173,14 +177,14 @@ func run(c *cli.Context) error { log.Printf("Main: Working prefix is: %s", prefix) var wg sync.WaitGroup - exit := make(chan bool) // exit signal + exit := make(chan error) // exit signal var G, fullGraph *pgraph.Graph // exit after `max-runtime` seconds for no reason at all... if i := c.Int("max-runtime"); i > 0 { go func() { time.Sleep(time.Duration(i) * time.Second) - exit <- true + exit <- nil }() } @@ -209,11 +213,9 @@ func run(c *cli.Context) error { ) if EmbdEtcd == nil { // TODO: verify EmbdEtcd is not nil below... - log.Printf("Main: Etcd: Creation failed!") - exit <- true + exit <- fmt.Errorf("Main: Etcd: Creation failed!") } else if err := EmbdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running) - log.Printf("Main: Etcd: Startup failed: %v", err) - exit <- true + exit <- fmt.Errorf("Main: Etcd: Startup failed: %v", err) } convergerStateFn := func(b bool) error { // exit if we are using the converged-timeout and we are the @@ -223,7 +225,7 @@ func run(c *cli.Context) error { if depth == 0 && c.Int("converged-timeout") >= 0 { if b { log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout")) - exit <- true // trigger an exit! + exit <- nil // trigger an exit! } return nil } @@ -239,10 +241,10 @@ func run(c *cli.Context) error { startchan := make(chan struct{}) // start signal go func() { startchan <- struct{}{} }() file := c.String("file") - var configchan chan bool + var configchan chan error var puppetchan <-chan time.Time if !c.Bool("no-watch") && c.IsSet("file") { - configchan = ConfigWatch(file) + configchan = recwatch.ConfigWatch(file) } else if c.IsSet("puppet") { interval := puppet.PuppetInterval(c.String("puppet-conf")) puppetchan = time.Tick(time.Duration(interval) * time.Second) @@ -265,10 +267,15 @@ func run(c *cli.Context) error { case <-puppetchan: // nothing, just go on - case msg := <-configchan: - if c.Bool("no-watch") || !msg { + case e := <-configchan: + if c.Bool("no-watch") { continue // not ready to read config } + if e != nil { + exit <- e // trigger exit + continue + //return // TODO: return or wait for exitchan? + } // XXX: case compile_event: ... // ... case <-exitchan: @@ -337,13 +344,22 @@ func run(c *cli.Context) error { } }() - configWatcher := NewConfigWatcher() + configWatcher := recwatch.NewConfigWatcher() events := configWatcher.Events() if !c.Bool("no-watch") { configWatcher.Add(c.StringSlice("remote")...) // add all the files... } else { events = nil // signal that no-watch is true } + go func() { + select { + case err := <-configWatcher.Error(): + exit <- err // trigger an exit! + + case <-exitchan: + return + } + }() // initialize the add watcher, which calls the f callback on map changes convergerCb := func(f func(map[string]bool) error) (func(), error) { @@ -377,7 +393,7 @@ func run(c *cli.Context) error { } log.Println("Main: Running...") - waitForSignal(exit) // pass in exit channel to watch + err = waitForSignal(exit) // pass in exit channel to watch log.Println("Destroy...") @@ -402,7 +418,7 @@ func run(c *cli.Context) error { // TODO: wait for each vertex to exit... log.Println("Goodbye!") - return nil + return err } func main() { diff --git a/recwatch/configwatch.go b/recwatch/configwatch.go new file mode 100644 index 00000000..60477424 --- /dev/null +++ b/recwatch/configwatch.go @@ -0,0 +1,134 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package recwatch + +import ( + "log" + "sync" + + "github.com/purpleidea/mgmt/global" +) + +// ConfigWatcher returns events on a channel anytime one of its files events. +type ConfigWatcher struct { + ch chan string + wg sync.WaitGroup + closechan chan struct{} + errorchan chan error +} + +// NewConfigWatcher creates a new ConfigWatcher struct. +func NewConfigWatcher() *ConfigWatcher { + return &ConfigWatcher{ + ch: make(chan string), + closechan: make(chan struct{}), + errorchan: make(chan error), + } +} + +// Add new file paths to watch for events on. +func (obj *ConfigWatcher) Add(file ...string) { + if len(file) == 0 { + return + } + if len(file) > 1 { + for _, f := range file { // add all the files... + obj.Add(f) // recurse + } + return + } + // otherwise, add the one file passed in... + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + ch := ConfigWatch(file[0]) + for { + select { + case e := <-ch: + if e != nil { + obj.errorchan <- e + return + } + obj.ch <- file[0] + continue + case <-obj.closechan: + return + } + } + }() +} + +// Error returns a channel of errors that notifies us of permanent issues. +func (obj *ConfigWatcher) Error() <-chan error { + return obj.errorchan +} + +// Events returns a channel to listen on for file events. It closes when it is +// emptied after the Close() method is called. You can test for closure with the +// f, more := <-obj.Events() pattern. +func (obj *ConfigWatcher) Events() chan string { + return obj.ch +} + +// Close shuts down the ConfigWatcher object. It closes the Events channel after +// all the currently pending events have been emptied. +func (obj *ConfigWatcher) Close() { + if obj.ch == nil { + return + } + close(obj.closechan) + obj.wg.Wait() // wait until everyone is done sending on obj.ch + //obj.ch <- "" // send finished message + close(obj.ch) + obj.ch = nil + close(obj.errorchan) +} + +// ConfigWatch writes on the channel every time an event is seen for the path. +func ConfigWatch(file string) chan error { + ch := make(chan error) + go func() { + recWatcher, err := NewRecWatcher(file, false) + if err != nil { + ch <- err + close(ch) + return + } + defer recWatcher.Close() + for { + if global.DEBUG { + log.Printf("Watching: %v", file) + } + select { + case event, ok := <-recWatcher.Events(): + if !ok { // channel is closed + close(ch) + return + } + if err := event.Error; err != nil { + ch <- err + close(ch) + return + } + ch <- nil // send event! + } + } + //close(ch) + }() + return ch +} diff --git a/recwatch/recwatch.go b/recwatch/recwatch.go new file mode 100644 index 00000000..ebf4e4f8 --- /dev/null +++ b/recwatch/recwatch.go @@ -0,0 +1,317 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Package recwatch provides recursive file watching events via fsnotify. +package recwatch + +import ( + "fmt" + "log" + "math" + "os" + "path" + "path/filepath" + "strings" + "sync" + "syscall" + + "github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead? + "github.com/purpleidea/mgmt/util" + + "gopkg.in/fsnotify.v1" + //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" +) + +// Event represents a watcher event. These can include errors. +type Event struct { + Error error + Body *fsnotify.Event +} + +// RecWatcher is the struct for the recursive watcher. Run Init() on it. +type RecWatcher struct { + Path string // computed path + Recurse bool // should we watch recursively? + isDir bool // computed isDir + safename string // safe path + watcher *fsnotify.Watcher + watches map[string]struct{} + events chan Event // one channel for events and err... + once sync.Once + wg sync.WaitGroup + exit chan struct{} + closeErr error +} + +// NewRecWatcher creates an initializes a new recursive watcher. +func NewRecWatcher(path string, recurse bool) (*RecWatcher, error) { + obj := &RecWatcher{ + Path: path, + Recurse: recurse, + } + return obj, obj.Init() +} + +// Init starts the recursive file watcher. +func (obj *RecWatcher) Init() error { + obj.watcher = nil + obj.watches = make(map[string]struct{}) + obj.events = make(chan Event) + obj.exit = make(chan struct{}) + obj.isDir = strings.HasSuffix(obj.Path, "/") // dirs have trailing slashes + obj.safename = path.Clean(obj.Path) // no trailing slash + + var err error + obj.watcher, err = fsnotify.NewWatcher() + if err != nil { + return err + } + + if obj.isDir { + if err := obj.addSubFolders(obj.safename); err != nil { + return err + } + } + + go func() { + if err := obj.Watch(); err != nil { + obj.events <- Event{Error: err} + } + obj.Close() + }() + return nil +} + +//func (obj *RecWatcher) Add(path string) error { // XXX implement me or not? +// +//} +// +//func (obj *RecWatcher) Remove(path string) error { // XXX implement me or not? +// +//} + +// Close shuts down the watcher. +func (obj *RecWatcher) Close() error { + obj.once.Do(obj.close) // don't cause the channel to close twice + return obj.closeErr +} + +// This close function is the function that actually does the close work. Don't +// call it more than once! +func (obj *RecWatcher) close() { + var err error + close(obj.exit) // send exit signal + obj.wg.Wait() + if obj.watcher != nil { + err = obj.watcher.Close() + obj.watcher = nil + // TODO: should we send the close error? + //if err != nil { + // obj.events <- Event{Error: err} + //} + } + close(obj.events) + obj.closeErr = err // set the error +} + +// Events returns a channel of events. These include events for errors. +func (obj *RecWatcher) Events() chan Event { return obj.events } + +// Watch is the primary listener for this resource and it outputs events. +func (obj *RecWatcher) Watch() error { + if obj.watcher == nil { + return fmt.Errorf("Watcher is not initialized!") + } + obj.wg.Add(1) + defer obj.wg.Done() + + patharray := util.PathSplit(obj.safename) // tokenize the path + var index = len(patharray) // starting index + var current string // current "watcher" location + var deltaDepth int // depth delta between watcher and event + var send = false // send event? + + for { + current = strings.Join(patharray[0:index], "/") + if current == "" { // the empty string top is the root dir ("/") + current = "/" + } + if global.DEBUG { + log.Printf("Watching: %s", current) // attempting to watch... + } + // initialize in the loop so that we can reset on rm-ed handles + if err := obj.watcher.Add(current); err != nil { + if global.DEBUG { + log.Printf("watcher.Add(%s): Error: %v", current, err) + } + if err == syscall.ENOENT { + index-- // usually not found, move up one dir + } else if err == syscall.ENOSPC { + // no space left on device, out of inotify watches + // TODO: consider letting the user fall back to + // polling if they hit this error very often... + return fmt.Errorf("Out of inotify watches: %v", err) + } else if os.IsPermission(err) { + return fmt.Errorf("Permission denied adding a watch: %v", err) + } else { + return fmt.Errorf("Unknown error: %v", err) + } + index = int(math.Max(1, float64(index))) + continue + } + + select { + case event := <-obj.watcher.Events: + if global.DEBUG { + log.Printf("Watch(%s), Event(%s): %v", current, event.Name, event.Op) + } + // the deeper you go, the bigger the deltaDepth is... + // this is the difference between what we're watching, + // and the event... doesn't mean we can't watch deeper + if current == event.Name { + deltaDepth = 0 // i was watching what i was looking for + + } else if util.HasPathPrefix(event.Name, current) { + deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less + + } else if util.HasPathPrefix(current, event.Name) { + deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more + // if below me... + if _, exists := obj.watches[event.Name]; exists { + send = true + if event.Op&fsnotify.Remove == fsnotify.Remove { + obj.watcher.Remove(event.Name) + delete(obj.watches, event.Name) + } + if (event.Op&fsnotify.Create == fsnotify.Create) && isDir(event.Name) { + obj.watcher.Add(event.Name) + obj.watches[event.Name] = struct{}{} + if err := obj.addSubFolders(event.Name); err != nil { + return err + } + } + } + + } else { + // TODO different watchers get each others events! + // https://github.com/go-fsnotify/fsnotify/issues/95 + // this happened with two values such as: + // event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2 + continue + } + //log.Printf("The delta depth is: %v", deltaDepth) + + // if we have what we wanted, awesome, send an event... + if event.Name == obj.safename { + //log.Println("Event!") + // FIXME: should all these below cases trigger? + send = true + + if obj.isDir { + if err := obj.addSubFolders(obj.safename); err != nil { + return err + } + } + + // file removed, move the watch upwards + if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { + //log.Println("Removal!") + obj.watcher.Remove(current) + index-- + } + + // we must be a parent watcher, so descend in + if deltaDepth < 0 { + // XXX: we can block here due to: https://github.com/fsnotify/fsnotify/issues/123 + obj.watcher.Remove(current) + index++ + } + + // if safename starts with event.Name, we're above, and no event should be sent + } else if util.HasPathPrefix(obj.safename, event.Name) { + //log.Println("Above!") + + if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { + log.Println("Removal!") + obj.watcher.Remove(current) + index-- + } + + if deltaDepth < 0 { + log.Println("Parent!") + if util.PathPrefixDelta(obj.safename, event.Name) == 1 { // we're the parent dir + send = true + } + obj.watcher.Remove(current) + index++ + } + + // if event.Name startswith safename, send event, we're already deeper + } else if util.HasPathPrefix(event.Name, obj.safename) { + //log.Println("Event2!") + send = true + } + + // do all our event sending all together to avoid duplicate msgs + if send { + send = false + // only invalid state on certain types of events + obj.events <- Event{Error: nil, Body: &event} + } + + case err := <-obj.watcher.Errors: + return fmt.Errorf("Unknown watcher error: %v", err) + + case <-obj.exit: + return nil + } + } +} + +// addSubFolders is a helper that is used to add recursive dirs to the watches. +func (obj *RecWatcher) addSubFolders(p string) error { + if !obj.Recurse { + return nil // if we're not watching recursively, just exit early + } + // look at all subfolders... + walkFn := func(path string, info os.FileInfo, err error) error { + if global.DEBUG { + log.Printf("Walk: %s (%v): %v", path, info, err) + } + if err != nil { + return nil + } + if info.IsDir() { + obj.watches[path] = struct{}{} // add key + err := obj.watcher.Add(path) + if err != nil { + return err // TODO: will this bubble up? + } + } + return nil + } + err := filepath.Walk(p, walkFn) + return err +} + +func isDir(path string) bool { + finfo, err := os.Stat(path) + if err != nil { + return false + } + return finfo.IsDir() +} diff --git a/resources/file.go b/resources/file.go index 78f2a2de..765ed2ce 100644 --- a/resources/file.go +++ b/resources/file.go @@ -26,20 +26,16 @@ import ( "io" "io/ioutil" "log" - "math" "os" "path" "path/filepath" "strings" - "syscall" "time" "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead? + "github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/util" - - "gopkg.in/fsnotify.v1" - //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" ) func init() { @@ -48,20 +44,19 @@ func init() { // FileRes is a file and directory resource. type FileRes struct { - BaseRes `yaml:",inline"` - Path string `yaml:"path"` // path variable (should default to name) - Dirname string `yaml:"dirname"` - Basename string `yaml:"basename"` - Content string `yaml:"content"` // FIXME: how do you describe: "leave content alone" - state = "create" ? - Source string `yaml:"source"` // file path for source content - State string `yaml:"state"` // state: exists/present?, absent, (undefined?) - Recurse bool `yaml:"recurse"` - Force bool `yaml:"force"` - path string // computed path - isDir bool // computed isDir - sha256sum string - watcher *fsnotify.Watcher - watches map[string]struct{} + BaseRes `yaml:",inline"` + Path string `yaml:"path"` // path variable (should default to name) + Dirname string `yaml:"dirname"` + Basename string `yaml:"basename"` + Content string `yaml:"content"` // FIXME: how do you describe: "leave content alone" - state = "create" ? + Source string `yaml:"source"` // file path for source content + State string `yaml:"state"` // state: exists/present?, absent, (undefined?) + Recurse bool `yaml:"recurse"` + Force bool `yaml:"force"` + path string // computed path + isDir bool // computed isDir + sha256sum string + recWatcher *recwatch.RecWatcher } // NewFileRes is a constructor for this resource. It also calls Init() for you. @@ -86,7 +81,6 @@ func NewFileRes(name, path, dirname, basename, content, source, state string, re // Init runs some startup code for this resource. func (obj *FileRes) Init() error { obj.sha256sum = "" - obj.watches = make(map[string]struct{}) if obj.Path == "" { // use the name as the path default if missing obj.Path = obj.BaseRes.Name } @@ -141,32 +135,6 @@ func (obj *FileRes) Validate() error { return nil } -// addSubFolders is a helper that is used to add recursive dirs to the watches. -func (obj *FileRes) addSubFolders(p string) error { - if !obj.Recurse { - return nil // if we're not watching recursively, just exit early - } - // look at all subfolders... - walkFn := func(path string, info os.FileInfo, err error) error { - if global.DEBUG { - log.Printf("%s[%s]: Walk: %s (%v): %v", obj.Kind(), obj.GetName(), path, info, err) - } - if err != nil { - return nil - } - if info.IsDir() { - obj.watches[path] = struct{}{} // add key - err := obj.watcher.Add(path) - if err != nil { - return err // TODO: will this bubble up? - } - } - return nil - } - err := filepath.Walk(p, walkFn) - return err -} - // Watch is the primary listener for this resource and it outputs events. // This one is a file watcher for files and directories. // Modify with caution, it is probably important to write some test cases first! @@ -191,167 +159,37 @@ func (obj *FileRes) Watch(processChan chan event.Event) error { return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout } - var safename = path.Clean(obj.path) // no trailing slash - var err error - obj.watcher, err = fsnotify.NewWatcher() + obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse) if err != nil { return err } - defer obj.watcher.Close() + defer obj.recWatcher.Close() - patharray := util.PathSplit(safename) // tokenize the path - var index = len(patharray) // starting index - var current string // current "watcher" location - var deltaDepth int // depth delta between watcher and event - var send = false // send event? + var send = false // send event? var exit = false var dirty = false - isDir := func(p string) bool { - finfo, err := os.Stat(p) - if err != nil { - return false - } - return finfo.IsDir() - } - - if obj.isDir { - if err := obj.addSubFolders(safename); err != nil { - return err - } - } for { - current = strings.Join(patharray[0:index], "/") - if current == "" { // the empty string top is the root dir ("/") - current = "/" - } if global.DEBUG { - log.Printf("%s[%s]: Watching: %v", obj.Kind(), obj.GetName(), current) // attempting to watch... - } - // initialize in the loop so that we can reset on rm-ed handles - err = obj.watcher.Add(current) - if err != nil { - if global.DEBUG { - log.Printf("%s[%s]: watcher.Add(%v): Error: %v", obj.Kind(), obj.GetName(), current, err) - } - if err == syscall.ENOENT { - index-- // usually not found, move up one dir - } else if err == syscall.ENOSPC { - // no space left on device, out of inotify watches - // TODO: consider letting the user fall back to - // polling if they hit this error very often... - return fmt.Errorf("%s[%s]: Out of inotify watches: %v", obj.Kind(), obj.GetName(), err) - } else if os.IsPermission(err) { - return fmt.Errorf("%s[%s]: Permission denied to add a watch: %v", obj.Kind(), obj.GetName(), err) - } else { - return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err) - } - index = int(math.Max(1, float64(index))) - continue + log.Printf("%s[%s]: Watching: %s", obj.Kind(), obj.GetName(), obj.Path) // attempting to watch... } obj.SetState(ResStateWatching) // reset select { - case event := <-obj.watcher.Events: - if global.DEBUG { - log.Printf("%s[%s]: Watch(%s), Event(%s): %v", obj.Kind(), obj.GetName(), current, event.Name, event.Op) + case event, ok := <-obj.recWatcher.Events(): + if !ok { // channel shutdown + return nil } - cuuid.SetConverged(false) // XXX: technically i can detect if the event is erroneous or not first - // the deeper you go, the bigger the deltaDepth is... - // this is the difference between what we're watching, - // and the event... doesn't mean we can't watch deeper - if current == event.Name { - deltaDepth = 0 // i was watching what i was looking for - - } else if util.HasPathPrefix(event.Name, current) { - deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less - - } else if util.HasPathPrefix(current, event.Name) { - deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more - // if below me... - if _, exists := obj.watches[event.Name]; exists { - send = true - dirty = true - if event.Op&fsnotify.Remove == fsnotify.Remove { - obj.watcher.Remove(event.Name) - delete(obj.watches, event.Name) - } - if (event.Op&fsnotify.Create == fsnotify.Create) && isDir(event.Name) { - obj.watcher.Add(event.Name) - obj.watches[event.Name] = struct{}{} - if err := obj.addSubFolders(event.Name); err != nil { - return err - } - } - } - - } else { - // TODO different watchers get each others events! - // https://github.com/go-fsnotify/fsnotify/issues/95 - // this happened with two values such as: - // event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2 - continue - } - //log.Printf("The delta depth is: %v", deltaDepth) - - // if we have what we wanted, awesome, send an event... - if event.Name == safename { - //log.Println("Event!") - // FIXME: should all these below cases trigger? - send = true - dirty = true - - if obj.isDir { - if err := obj.addSubFolders(safename); err != nil { - return err - } - } - - // file removed, move the watch upwards - if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { - //log.Println("Removal!") - obj.watcher.Remove(current) - index-- - } - - // we must be a parent watcher, so descend in - if deltaDepth < 0 { - // XXX: we can block here due to: https://github.com/fsnotify/fsnotify/issues/123 - obj.watcher.Remove(current) - index++ - } - - // if safename starts with event.Name, we're above, and no event should be sent - } else if util.HasPathPrefix(safename, event.Name) { - //log.Println("Above!") - - if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { - log.Println("Removal!") - obj.watcher.Remove(current) - index-- - } - - if deltaDepth < 0 { - log.Println("Parent!") - if util.PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir - send = true - dirty = true - } - obj.watcher.Remove(current) - index++ - } - - // if event.Name startswith safename, send event, we're already deeper - } else if util.HasPathPrefix(event.Name, safename) { - //log.Println("Event2!") - send = true - dirty = true - } - - case err := <-obj.watcher.Errors: cuuid.SetConverged(false) - return fmt.Errorf("Unknown %s[%s] watcher error: %v", obj.Kind(), obj.GetName(), err) + if err := event.Error; err != nil { + return fmt.Errorf("Unknown %s[%s] watcher error: %v", obj.Kind(), obj.GetName(), err) + } + if global.DEBUG { // don't access event.Body if event.Error isn't nil + log.Printf("%s[%s]: Event(%s): %v", obj.Kind(), obj.GetName(), event.Body.Name, event.Body.Op) + } + send = true + dirty = true case event := <-obj.events: cuuid.SetConverged(false)