diff --git a/config.go b/config.go index 4ed35c72..37f97633 100644 --- a/config.go +++ b/config.go @@ -66,7 +66,23 @@ func (c *graphConfig) Parse(data []byte) error { return nil } -func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAPI) bool { +func ParseConfigFromFile(filename string) *graphConfig { + data, err := ioutil.ReadFile(filename) + if err != nil { + log.Printf("Error: Config: ParseConfigFromFile: File: %v", err) + return nil + } + + var config graphConfig + if err := config.Parse(data); err != nil { + log.Printf("Error: Config: ParseConfigFromFile: Parse: %v", err) + return nil + } + + return &config +} + +func UpdateGraphFromConfig(config *graphConfig, hostname string, g *Graph, kapi etcd.KeysAPI) { var NoopMap map[string]*Vertex = make(map[string]*Vertex) var FileMap map[string]*Vertex = make(map[string]*Vertex) @@ -77,17 +93,6 @@ func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAP lookup["file"] = FileMap lookup["service"] = ServiceMap - data, err := ioutil.ReadFile(filename) - if err != nil { - log.Fatal(err) - return false - } - - var config graphConfig - if err := config.Parse(data); err != nil { - log.Fatal(err) - return false - } //fmt.Printf("%+v\n", config) // debug g.SetName(config.Graph) // set graph name @@ -116,7 +121,7 @@ func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAP continue } } else { - obj := NewFileType(t.Name, t.Path, t.Content, t.State) + obj := NewFileType(t.Name, t.Path, t.Dirname, t.Basename, t.Content, t.State) v := g.GetVertexMatch(obj) if v == nil { // no match found v = NewVertex(obj) @@ -145,15 +150,19 @@ func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAP if ok { for _, t := range config.Collector { // XXX: use t.Type and optionally t.Pattern to collect from etcd storage - log.Printf("Collect: %v(%v)", t.Type, t.Pattern) + log.Printf("Collect: %v; Pattern: %v", t.Type, t.Pattern) for _, x := range EtcdGetProcess(nodes, "file") { var obj *FileType if B64ToObj(x, &obj) != true { - log.Printf("File: %v error!", x) + log.Printf("Collect: File: %v not collected!", x) continue } - log.Printf("File: %v found!", obj.GetName()) + if t.Pattern != "" { // XXX: currently the pattern for files can only override the Dirname variable :P + obj.Dirname = t.Pattern + } + + log.Printf("Collect: File: %v collected!", obj.GetName()) // XXX: similar to file add code: v := g.GetVertexMatch(obj) @@ -182,6 +191,4 @@ func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAP for _, e := range config.Edges { g.AddEdge(lookup[e.From.Type][e.From.Name], lookup[e.To.Type][e.To.Name], NewEdge(e.Name)) } - - return true } diff --git a/configwatch.go b/configwatch.go new file mode 100644 index 00000000..a10b1669 --- /dev/null +++ b/configwatch.go @@ -0,0 +1,155 @@ +// Mgmt +// Copyright (C) 2013-2015+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "gopkg.in/fsnotify.v1" + //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" + "log" + "math" + "path" + "strings" + "syscall" +) + +// XXX: it would be great if we could reuse code between this and the file type +// XXX: patch this to submit it as part of go-fsnotify if they're interested... +func ConfigWatch(file string) chan bool { + ch := make(chan bool) + go func() { + var safename = path.Clean(file) // no trailing slash + + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatal(err) + } + defer watcher.Close() + + patharray := PathSplit(safename) // tokenize the path + var index = len(patharray) // starting index + var current string // current "watcher" location + var delta_depth int // depth delta between watcher and event + var send = false // send event? + + for { + current = strings.Join(patharray[0:index], "/") + if current == "" { // the empty string top is the root dir ("/") + current = "/" + } + log.Printf("Watching: %v\n", current) // attempting to watch... + + // initialize in the loop so that we can reset on rm-ed handles + err = watcher.Add(current) + if err != nil { + if err == syscall.ENOENT { + index-- // usually not found, move up one dir + } else if err == syscall.ENOSPC { + // XXX: occasionally: no space left on device, + // XXX: probably due to lack of inotify watches + log.Printf("Lack of watches for config(%v) error: %+v\n", file, err.Error) // 0x408da0 + log.Fatal(err) + } else { + log.Printf("Unknown config(%v) error:\n", file) + log.Fatal(err) + } + index = int(math.Max(1, float64(index))) + continue + } + + select { + case event := <-watcher.Events: + // the deeper you go, the bigger the delta_depth is... + // this is the difference between what we're watching, + // and the event... doesn't mean we can't watch deeper + if current == event.Name { + delta_depth = 0 // i was watching what i was looking for + + } else if HasPathPrefix(event.Name, current) { + delta_depth = len(PathSplit(current)) - len(PathSplit(event.Name)) // -1 or less + + } else if HasPathPrefix(current, event.Name) { + delta_depth = len(PathSplit(event.Name)) - len(PathSplit(current)) // +1 or more + + } else { + // TODO different watchers get each others events! + // https://github.com/go-fsnotify/fsnotify/issues/95 + // this happened with two values such as: + // event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2 + continue + } + //log.Printf("The delta depth is: %v\n", delta_depth) + + // if we have what we wanted, awesome, send an event... + if event.Name == safename { + //log.Println("Event!") + send = true + + // file removed, move the watch upwards + if delta_depth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { + //log.Println("Removal!") + watcher.Remove(current) + index-- + } + + // we must be a parent watcher, so descend in + if delta_depth < 0 { + watcher.Remove(current) + index++ + } + + // if safename starts with event.Name, we're above, and no event should be sent + } else if HasPathPrefix(safename, event.Name) { + //log.Println("Above!") + + if delta_depth >= 0 && (event.Op&fsnotify.Remove == fsnotify.Remove) { + log.Println("Removal!") + watcher.Remove(current) + index-- + } + + if delta_depth < 0 { + log.Println("Parent!") + if PathPrefixDelta(safename, event.Name) == 1 { // we're the parent dir + //send = true + } + watcher.Remove(current) + index++ + } + + // if event.Name startswith safename, send event, we're already deeper + } else if HasPathPrefix(event.Name, safename) { + //log.Println("Event2!") + //send = true + } + + case err := <-watcher.Errors: + log.Println("error:", err) + log.Fatal(err) + + } + + // do our event sending all together to avoid duplicate msgs + if send { + send = false + ch <- true + } + } + close(ch) + }() + return ch +} diff --git a/etcd.go b/etcd.go index 4983d573..3eb96c74 100644 --- a/etcd.go +++ b/etcd.go @@ -27,10 +27,19 @@ import ( "time" ) -func EtcdGetKAPI() etcd.KeysAPI { +//go:generate stringer -type=etcdMsg -output=etcdmsg_stringer.go +type etcdMsg int +const ( + etcdStart etcdMsg = iota + etcdEvent + etcdFoo + etcdBar +) + +func EtcdGetKAPI(seed string) etcd.KeysAPI { cfg := etcd.Config{ - Endpoints: []string{"http://127.0.0.1:2379"}, + Endpoints: []string{seed}, Transport: etcd.DefaultTransport, // set timeout per request to fail fast when the target endpoint is unavailable HeaderTimeoutPerRequest: time.Second, @@ -56,24 +65,21 @@ func EtcdGetKAPI() etcd.KeysAPI { return etcd.NewKeysAPI(c) } -func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string { +func EtcdWatch(kapi etcd.KeysAPI) chan etcdMsg { // XXX: i think we need this buffered so that when we're hanging on the // channel, which is inside the EtcdWatch main loop, we still want the // calls to Get/Set on etcd to succeed, so blocking them here would // kill the whole thing - ch := make(chan string, 1) // XXX: buffer of at least 1 is required - if kick { - ch <- "hello" - } - go func(ch chan string) { + ch := make(chan etcdMsg, 1) // XXX: buffer of at least 1 is required + go func(ch chan etcdMsg) { tmin := 500 // initial (min) delay in ms t := tmin // current time tmult := 2 // multiplier for exponential delay tmax := 16000 // max delay watcher := kapi.Watcher("/exported/", &etcd.WatcherOptions{Recursive: true}) for { - log.Printf("Watching etcd...") - resp, err := watcher.Next(etcd_context.Background()) + log.Printf("Etcd: Watching...") + resp, err := watcher.Next(etcd_context.Background()) // blocks here if err != nil { if err == etcd_context.Canceled { // ctx is canceled by another routine @@ -87,8 +93,18 @@ func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string { for _, e := range cerr.Errors { if strings.HasSuffix(e.Error(), "getsockopt: connection refused") { t = int(math.Min(float64(t*tmult), float64(tmax))) - log.Printf("Waiting %d ms for etcd...", t) + log.Printf("Etcd: Waiting %d ms for connection...", t) time.Sleep(time.Duration(t) * time.Millisecond) // sleep for t ms + + } else if e.Error() == "unexpected EOF" { + log.Printf("Etcd: Disconnected...") + + } else if strings.HasPrefix(e.Error(), "unsupported protocol scheme") { + // usually a bad peer endpoint value + log.Fatal("Bad peer endpoint value?") + + } else { + log.Fatal("Woops: ", e.Error()) } } } else { @@ -114,7 +130,8 @@ func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string { // IOW, ignore everything except for the value or some // field which gets set last... this could be the max count field thing... - ch <- resp.Node.Value // event + log.Printf("Etcd: Value: %v", resp.Node.Value) // event + ch <- etcdEvent // event } } // end for loop @@ -127,7 +144,7 @@ func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string { func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool { output, ok := ObjToB64(obj) if !ok { - log.Printf("Could not encode %v for etcd.", key) + log.Printf("Etcd: Could not encode %v key.", key) return false } @@ -146,7 +163,7 @@ func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool //if e == etcd.ErrClusterUnavailable } } - log.Printf("Could not store %v in etcd.", key) + log.Printf("Etcd: Could not store %v key.", key) return false } log.Print("Etcd: ", resp) // w00t... bonus @@ -155,7 +172,6 @@ func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool // lookup /exported/ node hierarchy func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) { - // key structure is /exported//types/... resp, err := kapi.Get(etcd_context.Background(), "/exported/", &etcd.GetOptions{Recursive: true}) if err != nil { @@ -165,7 +181,6 @@ func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) { } func EtcdGetProcess(nodes etcd.Nodes, typ string) []string { - //path := fmt.Sprintf("/exported/%s/types/", h) top := "/exported/" log.Printf("Etcd: Get: %+v", nodes) // Get().Nodes.Nodes diff --git a/event.go b/event.go index 0aa34e0b..10ae1f14 100644 --- a/event.go +++ b/event.go @@ -24,11 +24,8 @@ const ( eventExit eventName = iota eventStart eventPause - eventContinue eventPoke eventChanged - //eventPaused - eventStarted ) type Event struct { diff --git a/examples/graph1.yaml b/examples/graph1.yaml index 1ccff29c..74f08a2a 100644 --- a/examples/graph1.yaml +++ b/examples/graph1.yaml @@ -5,22 +5,22 @@ types: - name: noop1 file: - name: file1 - path: /tmp/mgmt/f1 + path: "/tmp/mgmt/f1" content: | i am f1 state: exists - name: file2 - path: /tmp/mgmt/f2 + path: "/tmp/mgmt/f2" content: | i am f2 state: exists - name: file3 - path: /tmp/mgmt/f3 + path: "/tmp/mgmt/f3" content: | i am f3 state: exists - name: file4 - path: /tmp/mgmt/f4 + path: "/tmp/mgmt/f4" content: | i am f4 and i should not be here state: absent diff --git a/examples/graph2.yaml b/examples/graph2.yaml index cf316909..9edc2990 100644 --- a/examples/graph2.yaml +++ b/examples/graph2.yaml @@ -5,7 +5,7 @@ types: - name: noop1 file: - name: file1 - path: /tmp/mgmt/f1 + path: "/tmp/mgmt/f1" content: | i am f1 state: exists diff --git a/examples/graph3a.yaml b/examples/graph3a.yaml index 05068129..22f585b9 100644 --- a/examples/graph3a.yaml +++ b/examples/graph3a.yaml @@ -2,43 +2,43 @@ graph: mygraph types: noop: - - name: noop1 + - name: noop1a file: - - name: file1 - path: /tmp/mgmt/f1 + - name: file1a + path: "/tmp/mgmt1/f1a" content: | i am f1 state: exists - - name: file2 - path: /tmp/mgmt/f2 + - name: file2a + path: "/tmp/mgmt1/f2a" content: | i am f2 state: exists - - name: '@@file3' - path: /tmp/mgmt/f3 + - name: "@@file3a" + path: "/tmp/mgmt1/f3a" content: | i am f3, exported from host A state: exists - - name: '@@file4' - path: /tmp/mgmt/f4 + - name: "@@file4a" + path: "/tmp/mgmt1/f4a" content: | i am f4, exported from host A state: exists collect: - type: file - pattern: '' + pattern: "/tmp/mgmt1/" edges: - name: e1 from: type: noop - name: noop1 + name: noop1a to: type: file - name: file1 + name: file1a - name: e2 from: type: file - name: file1 + name: file1a to: type: file - name: file2 + name: file2a diff --git a/examples/graph3b.yaml b/examples/graph3b.yaml index 04a51f8a..6d02bcc7 100644 --- a/examples/graph3b.yaml +++ b/examples/graph3b.yaml @@ -2,43 +2,43 @@ graph: mygraph types: noop: - - name: noop1 + - name: noop1b file: - - name: file1 - path: /tmp/mgmt/f1 + - name: file1b + path: "/tmp/mgmt2/f1b" content: | i am f1 state: exists - - name: file2 - path: /tmp/mgmt/f2 + - name: file2b + path: "/tmp/mgmt2/f2b" content: | i am f2 state: exists - - name: '@@file3' - path: /tmp/mgmt/f3 + - name: "@@file3b" + path: "/tmp/mgmt2/f3b" content: | i am f3, exported from host B state: exists - - name: '@@file4' - path: /tmp/mgmt/f4 + - name: "@@file4b" + path: "/tmp/mgmt2/f4b" content: | i am f4, exported from host B state: exists collect: - type: file - pattern: '' + pattern: "/tmp/mgmt2/" edges: - name: e1 from: type: noop - name: noop1 + name: noop1b to: type: file - name: file1 + name: file1b - name: e2 from: type: file - name: file1 + name: file1b to: type: file - name: file2 + name: file2b diff --git a/examples/graph3c.yaml b/examples/graph3c.yaml new file mode 100644 index 00000000..4c54195c --- /dev/null +++ b/examples/graph3c.yaml @@ -0,0 +1,44 @@ +--- +graph: mygraph +types: + noop: + - name: noop1c + file: + - name: file1c + path: "/tmp/mgmt3/f1c" + content: | + i am f1 + state: exists + - name: file2c + path: "/tmp/mgmt3/f2c" + content: | + i am f2 + state: exists + - name: "@@file3c" + path: "/tmp/mgmt3/f3c" + content: | + i am f3, exported from host C + state: exists + - name: "@@file4c" + path: "/tmp/mgmt3/f4c" + content: | + i am f4, exported from host C + state: exists +collect: +- type: file + pattern: "/tmp/mgmt3/" +edges: +- name: e1 + from: + type: noop + name: noop1c + to: + type: file + name: file1c +- name: e2 + from: + type: file + name: file1c + to: + type: file + name: file2c diff --git a/examples/graph4.yaml b/examples/graph4.yaml index 6cd26139..d3cdbe3e 100644 --- a/examples/graph4.yaml +++ b/examples/graph4.yaml @@ -3,12 +3,12 @@ graph: mygraph types: file: - name: file1 - path: /tmp/mgmt/f1 + path: "/tmp/mgmt/f1" content: | i am f1 state: exists - - name: '@@file3' - path: /tmp/mgmt/f3 + - name: "@@file3" + path: "/tmp/mgmt/f3" content: | i am f3, exported from host A state: exists diff --git a/examples/graph5.yaml b/examples/graph5.yaml index 66f90dd2..af5c929d 100644 --- a/examples/graph5.yaml +++ b/examples/graph5.yaml @@ -3,7 +3,7 @@ graph: mygraph types: file: - name: file1 - path: /tmp/mgmt/f1 + path: "/tmp/mgmt/f1" content: | i am f1 state: exists diff --git a/file.go b/file.go index 0cdc5497..674bf362 100644 --- a/file.go +++ b/file.go @@ -35,12 +35,14 @@ import ( type FileType struct { BaseType `yaml:",inline"` Path string `yaml:"path"` // path variable (should default to name) + Dirname string `yaml:"dirname"` + Basename string `yaml:"basename"` Content string `yaml:"content"` State string `yaml:"state"` // state: exists/present?, absent, (undefined?) sha256sum string } -func NewFileType(name, path, content, state string) *FileType { +func NewFileType(name, path, dirname, basename, content, state string) *FileType { // FIXME if path = nil, path = name ... return &FileType{ BaseType: BaseType{ @@ -49,6 +51,8 @@ func NewFileType(name, path, content, state string) *FileType { vertex: nil, }, Path: path, + Dirname: dirname, + Basename: basename, Content: content, State: state, sha256sum: "", @@ -59,15 +63,52 @@ func (obj *FileType) GetType() string { return "File" } +// validate if the params passed in are valid data +func (obj *FileType) Validate() bool { + if obj.Dirname != "" { + // must end with / + if obj.Dirname[len(obj.Dirname)-1:] != "/" { + return false + } + } + if obj.Basename != "" { + // must not start with / + if obj.Basename[0:1] == "/" { + return false + } + } + return true +} + +func (obj *FileType) GetPath() string { + d := Dirname(obj.Path) + b := Basename(obj.Path) + if !obj.Validate() || (obj.Dirname == "" && obj.Basename == "") { + return obj.Path + } else if obj.Dirname == "" { + return d + obj.Basename + } else if obj.Basename == "" { + return obj.Dirname + b + } else { // if obj.dirname != "" && obj.basename != "" { + return obj.Dirname + obj.Basename + } +} + // File watcher for files and directories // Modify with caution, probably important to write some test cases first! -// obj.Path: file or directory +// obj.GetPath(): file or directory func (obj *FileType) Watch() { + if obj.IsWatching() { + return + } + obj.SetWatching(true) + defer obj.SetWatching(false) + //var recursive bool = false - //var isdir = (obj.Path[len(obj.Path)-1:] == "/") // dirs have trailing slashes + //var isdir = (obj.GetPath()[len(obj.GetPath())-1:] == "/") // dirs have trailing slashes //fmt.Printf("IsDirectory: %v\n", isdir) //vertex := obj.GetVertex() // stored with SetVertex - var safename = path.Clean(obj.Path) // no trailing slash + var safename = path.Clean(obj.GetPath()) // no trailing slash watcher, err := fsnotify.NewWatcher() if err != nil { @@ -204,7 +245,7 @@ func (obj *FileType) HashSHA256fromContent() string { } func (obj *FileType) StateOK() bool { - if _, err := os.Stat(obj.Path); os.IsNotExist(err) { + if _, err := os.Stat(obj.GetPath()); os.IsNotExist(err) { // no such file or directory if obj.State == "absent" { return true // missing file should be missing, phew :) @@ -216,7 +257,7 @@ func (obj *FileType) StateOK() bool { // TODO: add file mode check here... - if PathIsDir(obj.Path) { + if PathIsDir(obj.GetPath()) { return obj.StateOKDir() } else { return obj.StateOKFile() @@ -224,7 +265,7 @@ func (obj *FileType) StateOK() bool { } func (obj *FileType) StateOKFile() bool { - if PathIsDir(obj.Path) { + if PathIsDir(obj.GetPath()) { log.Fatal("This should only be called on a File type.") } @@ -232,7 +273,7 @@ func (obj *FileType) StateOKFile() bool { hash := sha256.New() - f, err := os.Open(obj.Path) + f, err := os.Open(obj.GetPath()) if err != nil { //log.Fatal(err) return false @@ -255,7 +296,7 @@ func (obj *FileType) StateOKFile() bool { } func (obj *FileType) StateOKDir() bool { - if !PathIsDir(obj.Path) { + if !PathIsDir(obj.GetPath()) { log.Fatal("This should only be called on a Dir type.") } @@ -267,7 +308,7 @@ func (obj *FileType) StateOKDir() bool { func (obj *FileType) Apply() bool { fmt.Printf("Apply->File[%v]\n", obj.Name) - if PathIsDir(obj.Path) { + if PathIsDir(obj.GetPath()) { return obj.ApplyDir() } else { return obj.ApplyFile() @@ -276,13 +317,13 @@ func (obj *FileType) Apply() bool { func (obj *FileType) ApplyFile() bool { - if PathIsDir(obj.Path) { + if PathIsDir(obj.GetPath()) { log.Fatal("This should only be called on a File type.") } if obj.State == "absent" { - log.Printf("About to remove: %v\n", obj.Path) - err := os.Remove(obj.Path) + log.Printf("About to remove: %v\n", obj.GetPath()) + err := os.Remove(obj.GetPath()) if err != nil { return false } @@ -290,7 +331,7 @@ func (obj *FileType) ApplyFile() bool { } //fmt.Println("writing: " + filename) - f, err := os.Create(obj.Path) + f, err := os.Create(obj.GetPath()) if err != nil { log.Println("error:", err) return false @@ -307,7 +348,7 @@ func (obj *FileType) ApplyFile() bool { } func (obj *FileType) ApplyDir() bool { - if !PathIsDir(obj.Path) { + if !PathIsDir(obj.GetPath()) { log.Fatal("This should only be called on a Dir type.") } @@ -329,7 +370,7 @@ func (obj *FileType) compare(typ *FileType) bool { if obj.Name != typ.Name { return false } - if obj.Path != typ.Path { + if obj.GetPath() != typ.Path { return false } if obj.Content != typ.Content { diff --git a/main.go b/main.go index 88ad2cee..1eb8dc86 100644 --- a/main.go +++ b/main.go @@ -74,21 +74,61 @@ func run(c *cli.Context) { }() } + // initial etcd peer endpoint + seed := c.String("seed") + if seed == "" { + // XXX: start up etcd server, others will join me! + seed = "http://127.0.0.1:2379" // thus we use the local server! + } + // then, connect to `seed` as a client + + // FIXME: validate seed, or wait for it to fail in etcd init? + // etcd hostname := c.String("hostname") if hostname == "" { hostname, _ = os.Hostname() // etcd watch key // XXX: this is not the correct key name this is the set key name... WOOPS } - go func(hostname string) { + go func() { + startchan := make(chan struct{}) // start signal + go func() { startchan <- struct{}{} }() + file := c.String("file") + configchan := ConfigWatch(file) log.Printf("Starting etcd...\n") - kapi := EtcdGetKAPI() + kapi := EtcdGetKAPI(seed) + etcdchan := EtcdWatch(kapi) first := true // first loop or not - for x := range EtcdWatch(kapi, true) { + for { + select { + case _ = <-startchan: // kick the loop once at start + // pass + case msg := <-etcdchan: + switch msg { + // some types of messages we ignore... + case etcdFoo, etcdBar: + continue + // while others passthrough and cause a compile! + case etcdStart, etcdEvent: + // pass + default: + log.Fatal("Etcd: Unhandled message: %v", msg) + } + case msg := <-configchan: + if c.Bool("no-watch") || !msg { + continue // not ready to read config + } + + //case compile_event: XXX + } + + config := ParseConfigFromFile(file) + if config == nil { + log.Printf("Config parse failure") + continue + } // run graph vertex LOCK... - if !first { - log.Printf("Watcher().Node.Value(%v): %+v", hostname, x) - + if !first { // XXX: we can flatten this check out I think G.SetState(graphPausing) log.Printf("State: %v", G.State()) G.Pause() // sync @@ -97,10 +137,8 @@ func run(c *cli.Context) { } // build the graph from a config file - // build the graph on events (eg: from etcd) but kick it once... - if !UpdateGraphFromConfig(c.String("file"), hostname, G, kapi) { - log.Fatal("Graph failure") - } + // build the graph on events (eg: from etcd) + UpdateGraphFromConfig(config, hostname, G, kapi) log.Printf("Graph: %v\n", G) // show graph err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz")) if err != nil { @@ -109,29 +147,20 @@ func run(c *cli.Context) { log.Printf("Graphviz: Successfully generated graph!") } G.SetVertex() - if first { - // G.Start(...) needs to be synchronous or wait, - // because if half of the nodes are started and - // some are not ready yet and the EtcdWatch - // loops, we'll cause G.Pause(...) before we - // even got going, thus causing nil pointer errors - G.SetState(graphStarting) - log.Printf("State: %v", G.State()) - G.Start(&wg) - G.SetState(graphStarted) - log.Printf("State: %v", G.State()) + // G.Start(...) needs to be synchronous or wait, + // because if half of the nodes are started and + // some are not ready yet and the EtcdWatch + // loops, we'll cause G.Pause(...) before we + // even got going, thus causing nil pointer errors + G.SetState(graphStarting) + log.Printf("State: %v", G.State()) + G.Start(&wg) // sync + G.SetState(graphStarted) + log.Printf("State: %v", G.State()) - } else { - G.SetState(graphContinuing) - log.Printf("State: %v", G.State()) - - G.Continue() // sync - G.SetState(graphStarted) - log.Printf("State: %v", G.State()) - } first = false } - }(hostname) + }() log.Println("Running...") @@ -175,6 +204,10 @@ func main() { Value: "", Usage: "graph definition to run", }, + cli.BoolFlag{ + Name: "no-watch", + Usage: "do not update graph on watched graph definition file changes", + }, cli.StringFlag{ Name: "code, c", Value: "", @@ -196,6 +229,12 @@ func main() { Value: "", Usage: "hostname to use", }, + // if empty, it will startup a new server + cli.StringFlag{ + Name: "seed, s", + Value: "", + Usage: "default etc peer endpoint", + }, cli.IntFlag{ Name: "exittime", Value: 0, diff --git a/misc.go b/misc.go index ebeaf772..6229ccd4 100644 --- a/misc.go +++ b/misc.go @@ -27,10 +27,21 @@ import ( // Similar to the GNU dirname command func Dirname(p string) string { + if p == "/" { + return "" + } d, _ := path.Split(path.Clean(p)) return d } +func Basename(p string) string { + _, b := path.Split(path.Clean(p)) + if p[len(p)-1:] == "/" { // don't loose the tail slash + b += "/" + } + return b +} + // Split a path into an array of tokens excluding any trailing empty tokens func PathSplit(p string) []string { return strings.Split(path.Clean(p), "/") diff --git a/misc_test.go b/misc_test.go index f4851cb0..4d5dea7a 100644 --- a/misc_test.go +++ b/misc_test.go @@ -32,9 +32,22 @@ func TestMiscT1(t *testing.T) { t.Errorf("Result is incorrect.") } - if Dirname("/") != "/" { + if Dirname("/") != "" { // TODO: should this equal "/" or "" ? t.Errorf("Result is incorrect.") } + + if Basename("/foo/bar/baz") != "baz" { + t.Errorf("Result is incorrect.") + } + + if Basename("/foo/bar/baz/") != "baz/" { + t.Errorf("Result is incorrect.") + } + + if Basename("/") != "/" { // TODO: should this equal "" or "/" ? + t.Errorf("Result is incorrect.") + } + } func TestMiscT2(t *testing.T) { diff --git a/omv.yaml b/omv.yaml index 10dfc287..5da7f75d 100644 --- a/omv.yaml +++ b/omv.yaml @@ -1,7 +1,7 @@ --- :domain: example.com :network: 192.168.123.0/24 -:image: centos-7.1 +:image: fedora-23 :cpus: '' :memory: '' :disks: 0 @@ -34,5 +34,6 @@ :reboot: false :unsafe: false :nested: false +:tests: [] :comment: '' :reallyrm: false diff --git a/pgraph.go b/pgraph.go index 31c5217e..a2ffccde 100644 --- a/pgraph.go +++ b/pgraph.go @@ -39,7 +39,6 @@ const ( graphStarted graphPausing graphPaused - graphContinuing ) // The graph abstract data type (ADT) is defined as follows: @@ -538,18 +537,20 @@ func HeisenbergCount(ch chan *Vertex) int { } // main kick to start the graph -func (g *Graph) Start(wg *sync.WaitGroup) { +func (g *Graph) Start(wg *sync.WaitGroup) { // start or continue t, _ := g.TopologicalSort() for _, v := range Reverse(t) { - wg.Add(1) - // must pass in value to avoid races... - // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ - go func(vv *Vertex) { - defer wg.Done() - vv.Type.Watch() - log.Printf("Finish: %v", vv.GetName()) - }(v) + if !v.Type.IsWatching() { // if Watch() is not running... + wg.Add(1) + // must pass in value to avoid races... + // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ + go func(vv *Vertex) { + defer wg.Done() + vv.Type.Watch() + log.Printf("Finish: %v", vv.GetName()) + }(v) + } // ensure state is started before continuing on to next vertex v.Type.SendEvent(eventStart, true) @@ -557,13 +558,6 @@ func (g *Graph) Start(wg *sync.WaitGroup) { } } -func (g *Graph) Continue() { - t, _ := g.TopologicalSort() - for _, v := range Reverse(t) { - v.Type.SendEvent(eventContinue, true) - } -} - func (g *Graph) Pause() { t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... diff --git a/service.go b/service.go index 1576143a..87a1f015 100644 --- a/service.go +++ b/service.go @@ -51,6 +51,12 @@ func (obj *ServiceType) GetType() string { // Service watcher func (obj *ServiceType) Watch() { + if obj.IsWatching() { + return + } + obj.SetWatching(true) + defer obj.SetWatching(false) + // obj.Name: service name //vertex := obj.GetVertex() // stored with SetVertex if !util.IsRunningSystemd() { diff --git a/types.go b/types.go index 5cf33529..05ef1f48 100644 --- a/types.go +++ b/types.go @@ -33,6 +33,8 @@ type Type interface { SetVertex(*Vertex) Compare(Type) bool SendEvent(eventName, bool) + IsWatching() bool + SetWatching(bool) GetTimestamp() int64 UpdateTimestamp() int64 //Process() @@ -43,6 +45,7 @@ type BaseType struct { timestamp int64 // last updated timestamp ? events chan Event vertex *Vertex + watching bool // is Watch() loop running ? } type NoopType struct { @@ -84,6 +87,16 @@ func (obj *BaseType) SetVertex(v *Vertex) { obj.vertex = v } +// is the Watch() function running? +func (obj *BaseType) IsWatching() bool { + return obj.watching +} + +// store status of if the Watch() function is running +func (obj *BaseType) SetWatching(b bool) { + obj.watching = b +} + // get timestamp of a vertex func (obj *BaseType) GetTimestamp() int64 { return obj.timestamp @@ -160,7 +173,7 @@ func (obj *BaseType) ReadEvent(event *Event) bool { e.ACK() if e.Name == eventExit { return false - } else if e.Name == eventContinue { + } else if e.Name == eventStart { // eventContinue return true } else { log.Fatal("Unknown event: ", e) @@ -208,6 +221,12 @@ func (obj *NoopType) GetType() string { } func (obj *NoopType) Watch() { + if obj.IsWatching() { + return + } + obj.SetWatching(true) + defer obj.SetWatching(false) + //vertex := obj.vertex // stored with SetVertex var send = false // send event? for {