recwatch: Split recursive watching into sub-package

This splits the recursive watching bit of the file file resource into
it's own package. This also de-duplicates the configwatch code and puts
it into the same package. With these bits refactored, it was also easier
to clean up the error code in main.
This commit is contained in:
James Shubin
2016-10-03 14:48:57 -04:00
parent 567dcaf79d
commit 46893e84c3
5 changed files with 514 additions and 437 deletions

View File

@@ -1,228 +0,0 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"log"
"math"
"path"
"strings"
"sync"
"syscall"
"github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/util"
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
)
// ConfigWatcher returns events on a channel anytime one of its files events.
type ConfigWatcher struct {
ch chan string
wg sync.WaitGroup
closechan chan struct{}
}
// NewConfigWatcher creates a new ConfigWatcher struct.
func NewConfigWatcher() *ConfigWatcher {
return &ConfigWatcher{
ch: make(chan string),
closechan: make(chan struct{}),
}
}
// Add new file paths to watch for events on.
func (obj *ConfigWatcher) Add(file ...string) {
if len(file) == 0 {
return
}
if len(file) > 1 {
for _, f := range file { // add all the files...
obj.Add(f) // recurse
}
return
}
// otherwise, add the one file passed in...
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
ch := ConfigWatch(file[0])
for {
select {
case <-ch:
obj.ch <- file[0]
continue
case <-obj.closechan:
return
}
}
}()
}
// Events returns a channel to listen on for file events. It closes when it is
// emptied after the Close() method is called. You can test for closure with the
// f, more := <-obj.Events() pattern.
func (obj *ConfigWatcher) Events() chan string {
return obj.ch
}
// Close shuts down the ConfigWatcher object. It closes the Events channel after
// all the currently pending events have been emptied.
func (obj *ConfigWatcher) Close() {
if obj.ch == nil {
return
}
close(obj.closechan)
obj.wg.Wait() // wait until everyone is done sending on obj.ch
//obj.ch <- "" // send finished message
close(obj.ch)
obj.ch = nil
}
// ConfigWatch writes on the channel everytime an event is seen for the path.
// XXX: it would be great if we could reuse code between this and the file resource
// XXX: patch this to submit it as part of go-fsnotify if they're interested...
func ConfigWatch(file string) chan bool {
ch := make(chan bool)
go func() {
var safename = path.Clean(file) // no trailing slash
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
patharray := util.PathSplit(safename) // tokenize the path
var index = len(patharray) // starting index
var current string // current "watcher" location
var deltaDepth int // depth delta between watcher and event
var send = false // send event?
for {
current = strings.Join(patharray[0:index], "/")
if current == "" { // the empty string top is the root dir ("/")
current = "/"
}
if global.DEBUG {
log.Printf("Watching: %v", current) // attempting to watch...
}
// initialize in the loop so that we can reset on rm-ed handles
err = watcher.Add(current)
if err != nil {
if err == syscall.ENOENT {
index-- // usually not found, move up one dir
} else if err == syscall.ENOSPC {
// XXX: occasionally: no space left on device,
// XXX: probably due to lack of inotify watches
log.Printf("Out of inotify watches for config(%v)", file)
log.Fatal(err)
} else {
log.Printf("Unknown config(%v) error:", file)
log.Fatal(err)
}
index = int(math.Max(1, float64(index)))
continue
}
select {
case event := <-watcher.Events:
// the deeper you go, the bigger the deltaDepth is...
// this is the difference between what we're watching,
// and the event... doesn't mean we can't watch deeper
if current == event.Name {
deltaDepth = 0 // i was watching what i was looking for
} else if util.HasPathPrefix(event.Name, current) {
deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less
} else if util.HasPathPrefix(current, event.Name) {
deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more
} else {
// TODO different watchers get each others events!
// https://github.com/go-fsnotify/fsnotify/issues/95
// this happened with two values such as:
// event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2
continue
}
//log.Printf("The delta depth is: %v", deltaDepth)
// if we have what we wanted, awesome, send an event...
if event.Name == safename {
//log.Println("Event!")
// TODO: filter out some of the events, is Write a sufficient minimum?
if event.Op&fsnotify.Write == fsnotify.Write {
send = true
}
// file removed, move the watch upwards
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
//log.Println("Removal!")
watcher.Remove(current)
index--
}
// we must be a parent watcher, so descend in
if deltaDepth < 0 {
watcher.Remove(current)
index++
}
// if safename starts with event.Name, we're above, and no event should be sent
} else if util.HasPathPrefix(safename, event.Name) {
//log.Println("Above!")
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
log.Println("Removal!")
watcher.Remove(current)
index--
}
if deltaDepth < 0 {
log.Println("Parent!")
if util.PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir
//send = true
}
watcher.Remove(current)
index++
}
// if event.Name startswith safename, send event, we're already deeper
} else if util.HasPathPrefix(event.Name, safename) {
//log.Println("Event2!")
//send = true
}
case err := <-watcher.Errors:
log.Printf("error: %v", err)
log.Fatal(err)
}
// do our event sending all together to avoid duplicate msgs
if send {
send = false
ch <- true
}
}
//close(ch)
}()
return ch
}

52
main.go
View File

@@ -33,6 +33,7 @@ import (
"github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/puppet" "github.com/purpleidea/mgmt/puppet"
"github.com/purpleidea/mgmt/recwatch"
"github.com/purpleidea/mgmt/remote" "github.com/purpleidea/mgmt/remote"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
@@ -49,21 +50,24 @@ var (
) )
// signal handler // signal handler
func waitForSignal(exit chan bool) { func waitForSignal(exit chan error) error {
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) // catch ^C signal.Notify(signals, os.Interrupt) // catch ^C
//signal.Notify(signals, os.Kill) // catch signals //signal.Notify(signals, os.Kill) // catch signals
signal.Notify(signals, syscall.SIGTERM) signal.Notify(signals, syscall.SIGTERM)
select { select {
case e := <-signals: // any signal will do case sig := <-signals: // any signal will do
if e == os.Interrupt { if sig == os.Interrupt {
log.Println("Interrupted by ^C") log.Println("Interrupted by ^C")
return nil
} else { } else {
log.Println("Interrupted by signal") log.Println("Interrupted by signal")
return fmt.Errorf("Killed by %v", sig)
} }
case <-exit: // or a manual signal case err := <-exit: // or a manual signal
log.Println("Interrupted by exit signal") log.Println("Interrupted by exit signal")
return err
} }
} }
@@ -173,14 +177,14 @@ func run(c *cli.Context) error {
log.Printf("Main: Working prefix is: %s", prefix) log.Printf("Main: Working prefix is: %s", prefix)
var wg sync.WaitGroup var wg sync.WaitGroup
exit := make(chan bool) // exit signal exit := make(chan error) // exit signal
var G, fullGraph *pgraph.Graph var G, fullGraph *pgraph.Graph
// exit after `max-runtime` seconds for no reason at all... // exit after `max-runtime` seconds for no reason at all...
if i := c.Int("max-runtime"); i > 0 { if i := c.Int("max-runtime"); i > 0 {
go func() { go func() {
time.Sleep(time.Duration(i) * time.Second) time.Sleep(time.Duration(i) * time.Second)
exit <- true exit <- nil
}() }()
} }
@@ -209,11 +213,9 @@ func run(c *cli.Context) error {
) )
if EmbdEtcd == nil { if EmbdEtcd == nil {
// TODO: verify EmbdEtcd is not nil below... // TODO: verify EmbdEtcd is not nil below...
log.Printf("Main: Etcd: Creation failed!") exit <- fmt.Errorf("Main: Etcd: Creation failed!")
exit <- true
} else if err := EmbdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running) } else if err := EmbdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running)
log.Printf("Main: Etcd: Startup failed: %v", err) exit <- fmt.Errorf("Main: Etcd: Startup failed: %v", err)
exit <- true
} }
convergerStateFn := func(b bool) error { convergerStateFn := func(b bool) error {
// exit if we are using the converged-timeout and we are the // exit if we are using the converged-timeout and we are the
@@ -223,7 +225,7 @@ func run(c *cli.Context) error {
if depth == 0 && c.Int("converged-timeout") >= 0 { if depth == 0 && c.Int("converged-timeout") >= 0 {
if b { if b {
log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout")) log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout"))
exit <- true // trigger an exit! exit <- nil // trigger an exit!
} }
return nil return nil
} }
@@ -239,10 +241,10 @@ func run(c *cli.Context) error {
startchan := make(chan struct{}) // start signal startchan := make(chan struct{}) // start signal
go func() { startchan <- struct{}{} }() go func() { startchan <- struct{}{} }()
file := c.String("file") file := c.String("file")
var configchan chan bool var configchan chan error
var puppetchan <-chan time.Time var puppetchan <-chan time.Time
if !c.Bool("no-watch") && c.IsSet("file") { if !c.Bool("no-watch") && c.IsSet("file") {
configchan = ConfigWatch(file) configchan = recwatch.ConfigWatch(file)
} else if c.IsSet("puppet") { } else if c.IsSet("puppet") {
interval := puppet.PuppetInterval(c.String("puppet-conf")) interval := puppet.PuppetInterval(c.String("puppet-conf"))
puppetchan = time.Tick(time.Duration(interval) * time.Second) puppetchan = time.Tick(time.Duration(interval) * time.Second)
@@ -265,10 +267,15 @@ func run(c *cli.Context) error {
case <-puppetchan: case <-puppetchan:
// nothing, just go on // nothing, just go on
case msg := <-configchan: case e := <-configchan:
if c.Bool("no-watch") || !msg { if c.Bool("no-watch") {
continue // not ready to read config continue // not ready to read config
} }
if e != nil {
exit <- e // trigger exit
continue
//return // TODO: return or wait for exitchan?
}
// XXX: case compile_event: ... // XXX: case compile_event: ...
// ... // ...
case <-exitchan: case <-exitchan:
@@ -337,13 +344,22 @@ func run(c *cli.Context) error {
} }
}() }()
configWatcher := NewConfigWatcher() configWatcher := recwatch.NewConfigWatcher()
events := configWatcher.Events() events := configWatcher.Events()
if !c.Bool("no-watch") { if !c.Bool("no-watch") {
configWatcher.Add(c.StringSlice("remote")...) // add all the files... configWatcher.Add(c.StringSlice("remote")...) // add all the files...
} else { } else {
events = nil // signal that no-watch is true events = nil // signal that no-watch is true
} }
go func() {
select {
case err := <-configWatcher.Error():
exit <- err // trigger an exit!
case <-exitchan:
return
}
}()
// initialize the add watcher, which calls the f callback on map changes // initialize the add watcher, which calls the f callback on map changes
convergerCb := func(f func(map[string]bool) error) (func(), error) { convergerCb := func(f func(map[string]bool) error) (func(), error) {
@@ -377,7 +393,7 @@ func run(c *cli.Context) error {
} }
log.Println("Main: Running...") log.Println("Main: Running...")
waitForSignal(exit) // pass in exit channel to watch err = waitForSignal(exit) // pass in exit channel to watch
log.Println("Destroy...") log.Println("Destroy...")
@@ -402,7 +418,7 @@ func run(c *cli.Context) error {
// TODO: wait for each vertex to exit... // TODO: wait for each vertex to exit...
log.Println("Goodbye!") log.Println("Goodbye!")
return nil return err
} }
func main() { func main() {

134
recwatch/configwatch.go Normal file
View File

@@ -0,0 +1,134 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package recwatch
import (
"log"
"sync"
"github.com/purpleidea/mgmt/global"
)
// ConfigWatcher returns events on a channel anytime one of its files events.
type ConfigWatcher struct {
ch chan string
wg sync.WaitGroup
closechan chan struct{}
errorchan chan error
}
// NewConfigWatcher creates a new ConfigWatcher struct.
func NewConfigWatcher() *ConfigWatcher {
return &ConfigWatcher{
ch: make(chan string),
closechan: make(chan struct{}),
errorchan: make(chan error),
}
}
// Add new file paths to watch for events on.
func (obj *ConfigWatcher) Add(file ...string) {
if len(file) == 0 {
return
}
if len(file) > 1 {
for _, f := range file { // add all the files...
obj.Add(f) // recurse
}
return
}
// otherwise, add the one file passed in...
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
ch := ConfigWatch(file[0])
for {
select {
case e := <-ch:
if e != nil {
obj.errorchan <- e
return
}
obj.ch <- file[0]
continue
case <-obj.closechan:
return
}
}
}()
}
// Error returns a channel of errors that notifies us of permanent issues.
func (obj *ConfigWatcher) Error() <-chan error {
return obj.errorchan
}
// Events returns a channel to listen on for file events. It closes when it is
// emptied after the Close() method is called. You can test for closure with the
// f, more := <-obj.Events() pattern.
func (obj *ConfigWatcher) Events() chan string {
return obj.ch
}
// Close shuts down the ConfigWatcher object. It closes the Events channel after
// all the currently pending events have been emptied.
func (obj *ConfigWatcher) Close() {
if obj.ch == nil {
return
}
close(obj.closechan)
obj.wg.Wait() // wait until everyone is done sending on obj.ch
//obj.ch <- "" // send finished message
close(obj.ch)
obj.ch = nil
close(obj.errorchan)
}
// ConfigWatch writes on the channel every time an event is seen for the path.
func ConfigWatch(file string) chan error {
ch := make(chan error)
go func() {
recWatcher, err := NewRecWatcher(file, false)
if err != nil {
ch <- err
close(ch)
return
}
defer recWatcher.Close()
for {
if global.DEBUG {
log.Printf("Watching: %v", file)
}
select {
case event, ok := <-recWatcher.Events():
if !ok { // channel is closed
close(ch)
return
}
if err := event.Error; err != nil {
ch <- err
close(ch)
return
}
ch <- nil // send event!
}
}
//close(ch)
}()
return ch
}

317
recwatch/recwatch.go Normal file
View File

@@ -0,0 +1,317 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package recwatch provides recursive file watching events via fsnotify.
package recwatch
import (
"fmt"
"log"
"math"
"os"
"path"
"path/filepath"
"strings"
"sync"
"syscall"
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
"github.com/purpleidea/mgmt/util"
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
)
// Event represents a watcher event. These can include errors.
type Event struct {
Error error
Body *fsnotify.Event
}
// RecWatcher is the struct for the recursive watcher. Run Init() on it.
type RecWatcher struct {
Path string // computed path
Recurse bool // should we watch recursively?
isDir bool // computed isDir
safename string // safe path
watcher *fsnotify.Watcher
watches map[string]struct{}
events chan Event // one channel for events and err...
once sync.Once
wg sync.WaitGroup
exit chan struct{}
closeErr error
}
// NewRecWatcher creates an initializes a new recursive watcher.
func NewRecWatcher(path string, recurse bool) (*RecWatcher, error) {
obj := &RecWatcher{
Path: path,
Recurse: recurse,
}
return obj, obj.Init()
}
// Init starts the recursive file watcher.
func (obj *RecWatcher) Init() error {
obj.watcher = nil
obj.watches = make(map[string]struct{})
obj.events = make(chan Event)
obj.exit = make(chan struct{})
obj.isDir = strings.HasSuffix(obj.Path, "/") // dirs have trailing slashes
obj.safename = path.Clean(obj.Path) // no trailing slash
var err error
obj.watcher, err = fsnotify.NewWatcher()
if err != nil {
return err
}
if obj.isDir {
if err := obj.addSubFolders(obj.safename); err != nil {
return err
}
}
go func() {
if err := obj.Watch(); err != nil {
obj.events <- Event{Error: err}
}
obj.Close()
}()
return nil
}
//func (obj *RecWatcher) Add(path string) error { // XXX implement me or not?
//
//}
//
//func (obj *RecWatcher) Remove(path string) error { // XXX implement me or not?
//
//}
// Close shuts down the watcher.
func (obj *RecWatcher) Close() error {
obj.once.Do(obj.close) // don't cause the channel to close twice
return obj.closeErr
}
// This close function is the function that actually does the close work. Don't
// call it more than once!
func (obj *RecWatcher) close() {
var err error
close(obj.exit) // send exit signal
obj.wg.Wait()
if obj.watcher != nil {
err = obj.watcher.Close()
obj.watcher = nil
// TODO: should we send the close error?
//if err != nil {
// obj.events <- Event{Error: err}
//}
}
close(obj.events)
obj.closeErr = err // set the error
}
// Events returns a channel of events. These include events for errors.
func (obj *RecWatcher) Events() chan Event { return obj.events }
// Watch is the primary listener for this resource and it outputs events.
func (obj *RecWatcher) Watch() error {
if obj.watcher == nil {
return fmt.Errorf("Watcher is not initialized!")
}
obj.wg.Add(1)
defer obj.wg.Done()
patharray := util.PathSplit(obj.safename) // tokenize the path
var index = len(patharray) // starting index
var current string // current "watcher" location
var deltaDepth int // depth delta between watcher and event
var send = false // send event?
for {
current = strings.Join(patharray[0:index], "/")
if current == "" { // the empty string top is the root dir ("/")
current = "/"
}
if global.DEBUG {
log.Printf("Watching: %s", current) // attempting to watch...
}
// initialize in the loop so that we can reset on rm-ed handles
if err := obj.watcher.Add(current); err != nil {
if global.DEBUG {
log.Printf("watcher.Add(%s): Error: %v", current, err)
}
if err == syscall.ENOENT {
index-- // usually not found, move up one dir
} else if err == syscall.ENOSPC {
// no space left on device, out of inotify watches
// TODO: consider letting the user fall back to
// polling if they hit this error very often...
return fmt.Errorf("Out of inotify watches: %v", err)
} else if os.IsPermission(err) {
return fmt.Errorf("Permission denied adding a watch: %v", err)
} else {
return fmt.Errorf("Unknown error: %v", err)
}
index = int(math.Max(1, float64(index)))
continue
}
select {
case event := <-obj.watcher.Events:
if global.DEBUG {
log.Printf("Watch(%s), Event(%s): %v", current, event.Name, event.Op)
}
// the deeper you go, the bigger the deltaDepth is...
// this is the difference between what we're watching,
// and the event... doesn't mean we can't watch deeper
if current == event.Name {
deltaDepth = 0 // i was watching what i was looking for
} else if util.HasPathPrefix(event.Name, current) {
deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less
} else if util.HasPathPrefix(current, event.Name) {
deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more
// if below me...
if _, exists := obj.watches[event.Name]; exists {
send = true
if event.Op&fsnotify.Remove == fsnotify.Remove {
obj.watcher.Remove(event.Name)
delete(obj.watches, event.Name)
}
if (event.Op&fsnotify.Create == fsnotify.Create) && isDir(event.Name) {
obj.watcher.Add(event.Name)
obj.watches[event.Name] = struct{}{}
if err := obj.addSubFolders(event.Name); err != nil {
return err
}
}
}
} else {
// TODO different watchers get each others events!
// https://github.com/go-fsnotify/fsnotify/issues/95
// this happened with two values such as:
// event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2
continue
}
//log.Printf("The delta depth is: %v", deltaDepth)
// if we have what we wanted, awesome, send an event...
if event.Name == obj.safename {
//log.Println("Event!")
// FIXME: should all these below cases trigger?
send = true
if obj.isDir {
if err := obj.addSubFolders(obj.safename); err != nil {
return err
}
}
// file removed, move the watch upwards
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
//log.Println("Removal!")
obj.watcher.Remove(current)
index--
}
// we must be a parent watcher, so descend in
if deltaDepth < 0 {
// XXX: we can block here due to: https://github.com/fsnotify/fsnotify/issues/123
obj.watcher.Remove(current)
index++
}
// if safename starts with event.Name, we're above, and no event should be sent
} else if util.HasPathPrefix(obj.safename, event.Name) {
//log.Println("Above!")
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
log.Println("Removal!")
obj.watcher.Remove(current)
index--
}
if deltaDepth < 0 {
log.Println("Parent!")
if util.PathPrefixDelta(obj.safename, event.Name) == 1 { // we're the parent dir
send = true
}
obj.watcher.Remove(current)
index++
}
// if event.Name startswith safename, send event, we're already deeper
} else if util.HasPathPrefix(event.Name, obj.safename) {
//log.Println("Event2!")
send = true
}
// do all our event sending all together to avoid duplicate msgs
if send {
send = false
// only invalid state on certain types of events
obj.events <- Event{Error: nil, Body: &event}
}
case err := <-obj.watcher.Errors:
return fmt.Errorf("Unknown watcher error: %v", err)
case <-obj.exit:
return nil
}
}
}
// addSubFolders is a helper that is used to add recursive dirs to the watches.
func (obj *RecWatcher) addSubFolders(p string) error {
if !obj.Recurse {
return nil // if we're not watching recursively, just exit early
}
// look at all subfolders...
walkFn := func(path string, info os.FileInfo, err error) error {
if global.DEBUG {
log.Printf("Walk: %s (%v): %v", path, info, err)
}
if err != nil {
return nil
}
if info.IsDir() {
obj.watches[path] = struct{}{} // add key
err := obj.watcher.Add(path)
if err != nil {
return err // TODO: will this bubble up?
}
}
return nil
}
err := filepath.Walk(p, walkFn)
return err
}
func isDir(path string) bool {
finfo, err := os.Stat(path)
if err != nil {
return false
}
return finfo.IsDir()
}

View File

@@ -26,20 +26,16 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"math"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"syscall"
"time" "time"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead? "github.com/purpleidea/mgmt/global" // XXX: package mgmtmain instead?
"github.com/purpleidea/mgmt/recwatch"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
) )
func init() { func init() {
@@ -60,8 +56,7 @@ type FileRes struct {
path string // computed path path string // computed path
isDir bool // computed isDir isDir bool // computed isDir
sha256sum string sha256sum string
watcher *fsnotify.Watcher recWatcher *recwatch.RecWatcher
watches map[string]struct{}
} }
// NewFileRes is a constructor for this resource. It also calls Init() for you. // NewFileRes is a constructor for this resource. It also calls Init() for you.
@@ -86,7 +81,6 @@ func NewFileRes(name, path, dirname, basename, content, source, state string, re
// Init runs some startup code for this resource. // Init runs some startup code for this resource.
func (obj *FileRes) Init() error { func (obj *FileRes) Init() error {
obj.sha256sum = "" obj.sha256sum = ""
obj.watches = make(map[string]struct{})
if obj.Path == "" { // use the name as the path default if missing if obj.Path == "" { // use the name as the path default if missing
obj.Path = obj.BaseRes.Name obj.Path = obj.BaseRes.Name
} }
@@ -141,32 +135,6 @@ func (obj *FileRes) Validate() error {
return nil return nil
} }
// addSubFolders is a helper that is used to add recursive dirs to the watches.
func (obj *FileRes) addSubFolders(p string) error {
if !obj.Recurse {
return nil // if we're not watching recursively, just exit early
}
// look at all subfolders...
walkFn := func(path string, info os.FileInfo, err error) error {
if global.DEBUG {
log.Printf("%s[%s]: Walk: %s (%v): %v", obj.Kind(), obj.GetName(), path, info, err)
}
if err != nil {
return nil
}
if info.IsDir() {
obj.watches[path] = struct{}{} // add key
err := obj.watcher.Add(path)
if err != nil {
return err // TODO: will this bubble up?
}
}
return nil
}
err := filepath.Walk(p, walkFn)
return err
}
// Watch is the primary listener for this resource and it outputs events. // Watch is the primary listener for this resource and it outputs events.
// This one is a file watcher for files and directories. // This one is a file watcher for files and directories.
// Modify with caution, it is probably important to write some test cases first! // Modify with caution, it is probably important to write some test cases first!
@@ -191,167 +159,37 @@ func (obj *FileRes) Watch(processChan chan event.Event) error {
return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout return time.After(time.Duration(500) * time.Millisecond) // 1/2 the resolution of converged timeout
} }
var safename = path.Clean(obj.path) // no trailing slash
var err error var err error
obj.watcher, err = fsnotify.NewWatcher() obj.recWatcher, err = recwatch.NewRecWatcher(obj.Path, obj.Recurse)
if err != nil { if err != nil {
return err return err
} }
defer obj.watcher.Close() defer obj.recWatcher.Close()
patharray := util.PathSplit(safename) // tokenize the path
var index = len(patharray) // starting index
var current string // current "watcher" location
var deltaDepth int // depth delta between watcher and event
var send = false // send event? var send = false // send event?
var exit = false var exit = false
var dirty = false var dirty = false
isDir := func(p string) bool {
finfo, err := os.Stat(p)
if err != nil {
return false
}
return finfo.IsDir()
}
if obj.isDir {
if err := obj.addSubFolders(safename); err != nil {
return err
}
}
for { for {
current = strings.Join(patharray[0:index], "/")
if current == "" { // the empty string top is the root dir ("/")
current = "/"
}
if global.DEBUG { if global.DEBUG {
log.Printf("%s[%s]: Watching: %v", obj.Kind(), obj.GetName(), current) // attempting to watch... log.Printf("%s[%s]: Watching: %s", obj.Kind(), obj.GetName(), obj.Path) // attempting to watch...
}
// initialize in the loop so that we can reset on rm-ed handles
err = obj.watcher.Add(current)
if err != nil {
if global.DEBUG {
log.Printf("%s[%s]: watcher.Add(%v): Error: %v", obj.Kind(), obj.GetName(), current, err)
}
if err == syscall.ENOENT {
index-- // usually not found, move up one dir
} else if err == syscall.ENOSPC {
// no space left on device, out of inotify watches
// TODO: consider letting the user fall back to
// polling if they hit this error very often...
return fmt.Errorf("%s[%s]: Out of inotify watches: %v", obj.Kind(), obj.GetName(), err)
} else if os.IsPermission(err) {
return fmt.Errorf("%s[%s]: Permission denied to add a watch: %v", obj.Kind(), obj.GetName(), err)
} else {
return fmt.Errorf("Unknown %s[%s] error: %v", obj.Kind(), obj.GetName(), err)
}
index = int(math.Max(1, float64(index)))
continue
} }
obj.SetState(ResStateWatching) // reset obj.SetState(ResStateWatching) // reset
select { select {
case event := <-obj.watcher.Events: case event, ok := <-obj.recWatcher.Events():
if global.DEBUG { if !ok { // channel shutdown
log.Printf("%s[%s]: Watch(%s), Event(%s): %v", obj.Kind(), obj.GetName(), current, event.Name, event.Op) return nil
} }
cuuid.SetConverged(false) // XXX: technically i can detect if the event is erroneous or not first
// the deeper you go, the bigger the deltaDepth is...
// this is the difference between what we're watching,
// and the event... doesn't mean we can't watch deeper
if current == event.Name {
deltaDepth = 0 // i was watching what i was looking for
} else if util.HasPathPrefix(event.Name, current) {
deltaDepth = len(util.PathSplit(current)) - len(util.PathSplit(event.Name)) // -1 or less
} else if util.HasPathPrefix(current, event.Name) {
deltaDepth = len(util.PathSplit(event.Name)) - len(util.PathSplit(current)) // +1 or more
// if below me...
if _, exists := obj.watches[event.Name]; exists {
send = true
dirty = true
if event.Op&fsnotify.Remove == fsnotify.Remove {
obj.watcher.Remove(event.Name)
delete(obj.watches, event.Name)
}
if (event.Op&fsnotify.Create == fsnotify.Create) && isDir(event.Name) {
obj.watcher.Add(event.Name)
obj.watches[event.Name] = struct{}{}
if err := obj.addSubFolders(event.Name); err != nil {
return err
}
}
}
} else {
// TODO different watchers get each others events!
// https://github.com/go-fsnotify/fsnotify/issues/95
// this happened with two values such as:
// event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2
continue
}
//log.Printf("The delta depth is: %v", deltaDepth)
// if we have what we wanted, awesome, send an event...
if event.Name == safename {
//log.Println("Event!")
// FIXME: should all these below cases trigger?
send = true
dirty = true
if obj.isDir {
if err := obj.addSubFolders(safename); err != nil {
return err
}
}
// file removed, move the watch upwards
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
//log.Println("Removal!")
obj.watcher.Remove(current)
index--
}
// we must be a parent watcher, so descend in
if deltaDepth < 0 {
// XXX: we can block here due to: https://github.com/fsnotify/fsnotify/issues/123
obj.watcher.Remove(current)
index++
}
// if safename starts with event.Name, we're above, and no event should be sent
} else if util.HasPathPrefix(safename, event.Name) {
//log.Println("Above!")
if deltaDepth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) {
log.Println("Removal!")
obj.watcher.Remove(current)
index--
}
if deltaDepth < 0 {
log.Println("Parent!")
if util.PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir
send = true
dirty = true
}
obj.watcher.Remove(current)
index++
}
// if event.Name startswith safename, send event, we're already deeper
} else if util.HasPathPrefix(event.Name, safename) {
//log.Println("Event2!")
send = true
dirty = true
}
case err := <-obj.watcher.Errors:
cuuid.SetConverged(false) cuuid.SetConverged(false)
if err := event.Error; err != nil {
return fmt.Errorf("Unknown %s[%s] watcher error: %v", obj.Kind(), obj.GetName(), err) return fmt.Errorf("Unknown %s[%s] watcher error: %v", obj.Kind(), obj.GetName(), err)
}
if global.DEBUG { // don't access event.Body if event.Error isn't nil
log.Printf("%s[%s]: Event(%s): %v", obj.Kind(), obj.GetName(), event.Body.Name, event.Body.Op)
}
send = true
dirty = true
case event := <-obj.events: case event := <-obj.events:
cuuid.SetConverged(false) cuuid.SetConverged(false)