From 6b4fa2107419229c2ce58b34171c80c3bbaf2d9f Mon Sep 17 00:00:00 2001 From: James Shubin Date: Thu, 10 Dec 2015 03:34:51 -0500 Subject: [PATCH] Mega patch This is still a dirty prototype, so please excuse the mess. Please excuse the fact that this is a mega patch. Once things settle down this won't happen any more. Some of the changes squashed into here include: * Merge vertex loop with type loop (The file watcher seems to cache events anyways) * Improve pgraph library * Add indegree, outdegree, and topological sort with tests * Add reverse function for vertex list * Tons of additional cleanup! Amazingly, on my first successful compile, this seemed to run! A special thanks to Ira Cooper who helped me talk through some of the algorithmic decisions and for his help in finding better ones! --- .gitignore | 2 + Makefile | 3 +- config.go | 133 +++++++++---- etcd.go | 217 ++++++++++++++++++++ event.go | 36 +++- examples/graph3a.yaml | 44 +++++ examples/graph3b.yaml | 44 +++++ examples/graph4.yaml | 18 ++ examples/graph5.yaml | 13 ++ examples/graph6.yaml | 6 + examples/purpleidea.service | 8 + file.go | 115 ++++++----- main.go | 90 ++++++--- misc.go | 33 ++++ misc_test.go | 52 +++++ pgraph.go | 382 ++++++++++++++++++++++-------------- pgraph_test.go | 273 ++++++++++++++++++++++---- service.go | 80 ++++---- test/test-yamlfmt.sh | 2 +- types.go | 223 ++++++++++++++++++--- 20 files changed, 1411 insertions(+), 363 deletions(-) create mode 100644 etcd.go create mode 100644 examples/graph3a.yaml create mode 100644 examples/graph3b.yaml create mode 100644 examples/graph4.yaml create mode 100644 examples/graph5.yaml create mode 100644 examples/graph6.yaml create mode 100644 examples/purpleidea.service diff --git a/.gitignore b/.gitignore index d20706f8..210f4188 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ mgmt-documentation.pdf old/ tmp/ +*_stringer.go +mgmt diff --git a/Makefile b/Makefile index 59acea4d..b0450201 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,7 @@ race: build: mgmt mgmt: main.go + go generate go build -ldflags "-X main.version=$(VERSION) -X main.program=$(PROGRAM)" clean: @@ -34,7 +35,7 @@ test: format: find -type f -name '*.go' -not -path './old/*' -not -path './tmp/*' -exec gofmt -w {} \; - find -type f -name '*.yaml' -not -path './old/*' -not -path './tmp/*' -not -path './omv.yaml' -exec ruby -e "require 'yaml'; x=YAML.load_file('{}').to_yaml; File.open('{}', 'w').write x" \; + find -type f -name '*.yaml' -not -path './old/*' -not -path './tmp/*' -not -path './omv.yaml' -exec ruby -e "require 'yaml'; x=YAML.load_file('{}').to_yaml.each_line.map(&:rstrip).join(10.chr)+10.chr; File.open('{}', 'w').write x" \; docs: mgmt-documentation.pdf diff --git a/config.go b/config.go index 8a3f6606..4ed35c72 100644 --- a/config.go +++ b/config.go @@ -18,27 +18,19 @@ package main import ( + //etcd_context "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + etcd "github.com/coreos/etcd/client" + "errors" "gopkg.in/yaml.v2" "io/ioutil" "log" + "strings" ) -type noopTypeConfig struct { - Name string `yaml:"name"` -} - -type fileTypeConfig struct { - Name string `yaml:"name"` - Path string `yaml:"path"` - Content string `yaml:"content"` - State string `yaml:"state"` -} - -type serviceTypeConfig struct { - Name string `yaml:"name"` - State string `yaml:"state"` - Startup string `yaml:"startup"` +type collectorTypeConfig struct { + Type string `yaml:"type"` + Pattern string `yaml:"pattern"` // XXX: Not Implemented } type vertexConfig struct { @@ -55,12 +47,13 @@ type edgeConfig struct { type graphConfig struct { Graph string `yaml:"graph"` Types struct { - Noop []noopTypeConfig `yaml:"noop"` - File []fileTypeConfig `yaml:"file"` - Service []serviceTypeConfig `yaml:"service"` + Noop []NoopType `yaml:"noop"` + File []FileType `yaml:"file"` + Service []ServiceType `yaml:"service"` } `yaml:"types"` - Edges []edgeConfig `yaml:"edges"` - Comment string `yaml:"comment"` + Collector []collectorTypeConfig `yaml:"collect"` + Edges []edgeConfig `yaml:"edges"` + Comment string `yaml:"comment"` } func (c *graphConfig) Parse(data []byte) error { @@ -73,7 +66,7 @@ func (c *graphConfig) Parse(data []byte) error { return nil } -func GraphFromConfig(filename string) *Graph { +func UpdateGraphFromConfig(filename, hostname string, g *Graph, kapi etcd.KeysAPI) bool { var NoopMap map[string]*Vertex = make(map[string]*Vertex) var FileMap map[string]*Vertex = make(map[string]*Vertex) @@ -87,40 +80,108 @@ func GraphFromConfig(filename string) *Graph { data, err := ioutil.ReadFile(filename) if err != nil { log.Fatal(err) + return false } var config graphConfig if err := config.Parse(data); err != nil { log.Fatal(err) + return false } - //fmt.Printf("%+v\n", config) // debug + //fmt.Printf("%+v\n", config) // debug - g := NewGraph(config.Graph) + g.SetName(config.Graph) // set graph name + + var keep []*Vertex // list of vertex which are the same in new graph for _, t := range config.Types.Noop { - NoopMap[t.Name] = NewVertex(t.Name, "noop") - // FIXME: duplicate of name stored twice... where should it go? - NoopMap[t.Name].Associate(NewNoopType(t.Name)) - g.AddVertex(NoopMap[t.Name]) // call standalone in case not part of an edge + obj := NewNoopType(t.Name) + v := g.GetVertexMatch(obj) + if v == nil { // no match found + v = NewVertex(obj) + g.AddVertex(v) // call standalone in case not part of an edge + } + NoopMap[obj.Name] = v // used for constructing edges + keep = append(keep, v) // append } for _, t := range config.Types.File { - FileMap[t.Name] = NewVertex(t.Name, "file") - // FIXME: duplicate of name stored twice... where should it go? - FileMap[t.Name].Associate(NewFileType(t.Name, t.Path, t.Content, t.State)) - g.AddVertex(FileMap[t.Name]) // call standalone in case not part of an edge + // XXX: should we export based on a @@ prefix, or a metaparam + // like exported => true || exported => (host pattern)||(other pattern?) + if strings.HasPrefix(t.Name, "@@") { // exported resource + // add to etcd storage... + t.Name = t.Name[2:] //slice off @@ + if !EtcdPut(kapi, hostname, t.Name, "file", t) { + log.Printf("Problem exporting file resource %v.", t.Name) + continue + } + } else { + obj := NewFileType(t.Name, t.Path, t.Content, t.State) + v := g.GetVertexMatch(obj) + if v == nil { // no match found + v = NewVertex(obj) + g.AddVertex(v) // call standalone in case not part of an edge + } + FileMap[obj.Name] = v // used for constructing edges + keep = append(keep, v) // append + } } for _, t := range config.Types.Service { - ServiceMap[t.Name] = NewVertex(t.Name, "service") - // FIXME: duplicate of name stored twice... where should it go? - ServiceMap[t.Name].Associate(NewServiceType(t.Name, t.State, t.Startup)) - g.AddVertex(ServiceMap[t.Name]) // call standalone in case not part of an edge + obj := NewServiceType(t.Name, t.State, t.Startup) + v := g.GetVertexMatch(obj) + if v == nil { // no match found + v = NewVertex(obj) + g.AddVertex(v) // call standalone in case not part of an edge + } + ServiceMap[obj.Name] = v // used for constructing edges + keep = append(keep, v) // append + } + + // lookup from etcd graph + // do all the graph look ups in one single step, so that if the etcd + // database changes, we don't have a partial state of affairs... + nodes, ok := EtcdGet(kapi) + if ok { + for _, t := range config.Collector { + // XXX: use t.Type and optionally t.Pattern to collect from etcd storage + log.Printf("Collect: %v(%v)", t.Type, t.Pattern) + + for _, x := range EtcdGetProcess(nodes, "file") { + var obj *FileType + if B64ToObj(x, &obj) != true { + log.Printf("File: %v error!", x) + continue + } + log.Printf("File: %v found!", obj.GetName()) + + // XXX: similar to file add code: + v := g.GetVertexMatch(obj) + if v == nil { // no match found + obj.Init() // initialize go channels or things won't work!!! + v = NewVertex(obj) + g.AddVertex(v) // call standalone in case not part of an edge + } + FileMap[obj.GetName()] = v // used for constructing edges + keep = append(keep, v) // append + + } + + } + } + + // get rid of any vertices we shouldn't "keep" (that aren't in new graph) + for _, v := range g.GetVertices() { + if !HasVertex(v, keep) { + // wait for exit before starting new graph! + v.Type.SendEvent(eventExit, true) + g.DeleteVertex(v) + } } for _, e := range config.Edges { g.AddEdge(lookup[e.From.Type][e.From.Name], lookup[e.To.Type][e.To.Name], NewEdge(e.Name)) } - return g + return true } diff --git a/etcd.go b/etcd.go new file mode 100644 index 00000000..4983d573 --- /dev/null +++ b/etcd.go @@ -0,0 +1,217 @@ +// Mgmt +// Copyright (C) 2013-2015+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "fmt" + etcd_context "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + etcd "github.com/coreos/etcd/client" + "log" + "math" + "strings" + "time" +) + +func EtcdGetKAPI() etcd.KeysAPI { + + cfg := etcd.Config{ + Endpoints: []string{"http://127.0.0.1:2379"}, + Transport: etcd.DefaultTransport, + // set timeout per request to fail fast when the target endpoint is unavailable + HeaderTimeoutPerRequest: time.Second, + } + + var c etcd.Client + var err error + + c, err = etcd.New(cfg) + if err != nil { + // XXX: not sure if this ever errors + if cerr, ok := err.(*etcd.ClusterError); ok { + // XXX: not sure if this part ever matches + // not running or disconnected + if cerr == etcd.ErrClusterUnavailable { + log.Fatal("XXX: etcd: ErrClusterUnavailable") + } else { + log.Fatal("XXX: etcd: Unknown") + } + } + log.Fatal(err) // some unhandled error + } + return etcd.NewKeysAPI(c) +} + +func EtcdWatch(kapi etcd.KeysAPI, kick bool) chan string { + // XXX: i think we need this buffered so that when we're hanging on the + // channel, which is inside the EtcdWatch main loop, we still want the + // calls to Get/Set on etcd to succeed, so blocking them here would + // kill the whole thing + ch := make(chan string, 1) // XXX: buffer of at least 1 is required + if kick { + ch <- "hello" + } + go func(ch chan string) { + tmin := 500 // initial (min) delay in ms + t := tmin // current time + tmult := 2 // multiplier for exponential delay + tmax := 16000 // max delay + watcher := kapi.Watcher("/exported/", &etcd.WatcherOptions{Recursive: true}) + for { + log.Printf("Watching etcd...") + resp, err := watcher.Next(etcd_context.Background()) + if err != nil { + if err == etcd_context.Canceled { + // ctx is canceled by another routine + log.Fatal("Canceled") + } else if err == etcd_context.DeadlineExceeded { + // ctx is attached with a deadline and it exceeded + log.Fatal("Deadline") + } else if cerr, ok := err.(*etcd.ClusterError); ok { + // not running or disconnected + // TODO: is there a better way to parse errors? + for _, e := range cerr.Errors { + if strings.HasSuffix(e.Error(), "getsockopt: connection refused") { + t = int(math.Min(float64(t*tmult), float64(tmax))) + log.Printf("Waiting %d ms for etcd...", t) + time.Sleep(time.Duration(t) * time.Millisecond) // sleep for t ms + } + } + } else { + // bad cluster endpoints, which are not etcd servers + log.Fatal("Woops: ", err) + } + } else { + //log.Print(resp) + //log.Printf("Watcher().Node.Value(%v): %+v", key, resp.Node.Value) + // FIXME: we should actually reset when the server comes back, not here on msg! + //XXX: can we fix this with one of these patterns?: https://blog.golang.org/go-concurrency-patterns-timing-out-and + t = tmin // reset timer + + // don't trigger event if nothing changed + if n, p := resp.Node, resp.PrevNode; resp.Action == "set" && p != nil { + if n.Key == p.Key && n.Value == p.Value { + continue + } + } + + // FIXME: we get events on key/type/value changes for + // each type directory... ignore the non final ones... + // IOW, ignore everything except for the value or some + // field which gets set last... this could be the max count field thing... + + ch <- resp.Node.Value // event + } + + } // end for loop + close(ch) + }(ch) // call go routine + return ch +} + +// helper function to store our data in etcd +func EtcdPut(kapi etcd.KeysAPI, hostname, key, typ string, obj interface{}) bool { + output, ok := ObjToB64(obj) + if !ok { + log.Printf("Could not encode %v for etcd.", key) + return false + } + + path := fmt.Sprintf("/exported/%s/types/%s/type", hostname, key) + _, err := kapi.Set(etcd_context.Background(), path, typ, nil) + // XXX validate... + + path = fmt.Sprintf("/exported/%s/types/%s/value", hostname, key) + resp, err := kapi.Set(etcd_context.Background(), path, output, nil) + if err != nil { + if cerr, ok := err.(*etcd.ClusterError); ok { + // not running or disconnected + for _, e := range cerr.Errors { + if strings.HasSuffix(e.Error(), "getsockopt: connection refused") { + } + //if e == etcd.ErrClusterUnavailable + } + } + log.Printf("Could not store %v in etcd.", key) + return false + } + log.Print("Etcd: ", resp) // w00t... bonus + return true +} + +// lookup /exported/ node hierarchy +func EtcdGet(kapi etcd.KeysAPI) (etcd.Nodes, bool) { + + // key structure is /exported//types/... + resp, err := kapi.Get(etcd_context.Background(), "/exported/", &etcd.GetOptions{Recursive: true}) + if err != nil { + return nil, false // not found + } + return resp.Node.Nodes, true +} + +func EtcdGetProcess(nodes etcd.Nodes, typ string) []string { + + //path := fmt.Sprintf("/exported/%s/types/", h) + top := "/exported/" + log.Printf("Etcd: Get: %+v", nodes) // Get().Nodes.Nodes + output := make([]string, 0) + + for _, x := range nodes { // loop through hosts + if !strings.HasPrefix(x.Key, top) { + log.Fatal("Error!") + } + host := x.Key[len(top):] + //log.Printf("Get().Nodes[%v]: %+v ==> %+v", -1, host, x.Nodes) + //log.Printf("Get().Nodes[%v]: %+v ==> %+v", i, x.Key, x.Nodes) + types, ok := EtcdGetChildNodeByKey(x, "types") + if !ok { + continue + } + for _, y := range types.Nodes { // loop through types + //key := y.Key # UUID? + //log.Printf("Get(%v): TYPE[%v]", host, y.Key) + t, ok := EtcdGetChildNodeByKey(y, "type") + if !ok { + continue + } + if typ != "" && typ != t.Value { + continue + } // filter based on type + + v, ok := EtcdGetChildNodeByKey(y, "value") // B64ToObj this + if !ok { + continue + } + log.Printf("Etcd: Hostname: %v; Get: %v", host, t.Value) + + output = append(output, v.Value) + } + } + return output +} + +// TODO: wrap this somehow so it's a method of *etcd.Node +// helper function that returns the node for a particular key under a node +func EtcdGetChildNodeByKey(node *etcd.Node, key string) (*etcd.Node, bool) { + for _, x := range node.Nodes { + if x.Key == fmt.Sprintf("%s/%s", node.Key, key) { + return x, true + } + } + return nil, false // not found +} diff --git a/event.go b/event.go index 9f026314..0aa34e0b 100644 --- a/event.go +++ b/event.go @@ -17,20 +17,36 @@ package main -import ( - "code.google.com/p/go-uuid/uuid" +//go:generate stringer -type=eventName -output=eventname_stringer.go +type eventName int + +const ( + eventExit eventName = iota + eventStart + eventPause + eventContinue + eventPoke + eventChanged + //eventPaused + eventStarted ) type Event struct { - uuid string - Name string - Type string + Name eventName + Resp chan bool // channel to send an ack response on, nil to skip + //Wg *sync.WaitGroup // receiver barrier to Wait() for everyone else on + Msg string // some words for fun } -func NewEvent(name, t string) *Event { - return &Event{ - uuid: uuid.New(), - Name: name, - Type: t, +// send a single acknowledgement on the channel if one was requested +func (event *Event) ACK() { + if event.Resp != nil { // if they've requested an ACK + event.Resp <- true // send ACK + } +} + +func (event *Event) NACK() { + if event.Resp != nil { // if they've requested an ACK + event.Resp <- false // send NACK } } diff --git a/examples/graph3a.yaml b/examples/graph3a.yaml new file mode 100644 index 00000000..05068129 --- /dev/null +++ b/examples/graph3a.yaml @@ -0,0 +1,44 @@ +--- +graph: mygraph +types: + noop: + - name: noop1 + file: + - name: file1 + path: /tmp/mgmt/f1 + content: | + i am f1 + state: exists + - name: file2 + path: /tmp/mgmt/f2 + content: | + i am f2 + state: exists + - name: '@@file3' + path: /tmp/mgmt/f3 + content: | + i am f3, exported from host A + state: exists + - name: '@@file4' + path: /tmp/mgmt/f4 + content: | + i am f4, exported from host A + state: exists +collect: +- type: file + pattern: '' +edges: +- name: e1 + from: + type: noop + name: noop1 + to: + type: file + name: file1 +- name: e2 + from: + type: file + name: file1 + to: + type: file + name: file2 diff --git a/examples/graph3b.yaml b/examples/graph3b.yaml new file mode 100644 index 00000000..04a51f8a --- /dev/null +++ b/examples/graph3b.yaml @@ -0,0 +1,44 @@ +--- +graph: mygraph +types: + noop: + - name: noop1 + file: + - name: file1 + path: /tmp/mgmt/f1 + content: | + i am f1 + state: exists + - name: file2 + path: /tmp/mgmt/f2 + content: | + i am f2 + state: exists + - name: '@@file3' + path: /tmp/mgmt/f3 + content: | + i am f3, exported from host B + state: exists + - name: '@@file4' + path: /tmp/mgmt/f4 + content: | + i am f4, exported from host B + state: exists +collect: +- type: file + pattern: '' +edges: +- name: e1 + from: + type: noop + name: noop1 + to: + type: file + name: file1 +- name: e2 + from: + type: file + name: file1 + to: + type: file + name: file2 diff --git a/examples/graph4.yaml b/examples/graph4.yaml new file mode 100644 index 00000000..6cd26139 --- /dev/null +++ b/examples/graph4.yaml @@ -0,0 +1,18 @@ +--- +graph: mygraph +types: + file: + - name: file1 + path: /tmp/mgmt/f1 + content: | + i am f1 + state: exists + - name: '@@file3' + path: /tmp/mgmt/f3 + content: | + i am f3, exported from host A + state: exists +collect: +- type: file + pattern: '' +edges: diff --git a/examples/graph5.yaml b/examples/graph5.yaml new file mode 100644 index 00000000..66f90dd2 --- /dev/null +++ b/examples/graph5.yaml @@ -0,0 +1,13 @@ +--- +graph: mygraph +types: + file: + - name: file1 + path: /tmp/mgmt/f1 + content: | + i am f1 + state: exists +collect: +- type: file + pattern: '' +edges: diff --git a/examples/graph6.yaml b/examples/graph6.yaml new file mode 100644 index 00000000..82ff26e8 --- /dev/null +++ b/examples/graph6.yaml @@ -0,0 +1,6 @@ +--- +graph: mygraph +types: + noop: + - name: noop1 +edges: diff --git a/examples/purpleidea.service b/examples/purpleidea.service new file mode 100644 index 00000000..ca6d6005 --- /dev/null +++ b/examples/purpleidea.service @@ -0,0 +1,8 @@ +[Unit] +Description=Fake service for testing + +[Service] +ExecStart=/usr/bin/sleep 8h + +[Install] +WantedBy=multi-user.target diff --git a/file.go b/file.go index 3c8fc9fc..ce7423ae 100644 --- a/file.go +++ b/file.go @@ -18,7 +18,6 @@ package main import ( - "code.google.com/p/go-uuid/uuid" "crypto/sha256" "encoding/hex" "fmt" @@ -34,23 +33,21 @@ import ( ) type FileType struct { - uuid string - Type string // always "file" - Name string // name variable - Events chan string // FIXME: eventually a struct for the event? - Path string // path variable (should default to name) - Content string - State string // state: exists/present?, absent, (undefined?) + BaseType `yaml:",inline"` + Path string `yaml:"path"` // path variable (should default to name) + Content string `yaml:"content"` + State string `yaml:"state"` // state: exists/present?, absent, (undefined?) sha256sum string } func NewFileType(name, path, content, state string) *FileType { // FIXME if path = nil, path = name ... return &FileType{ - uuid: uuid.New(), - Type: "file", - Name: name, - Events: make(chan string, 1), // XXX: chan size? + BaseType: BaseType{ + Name: name, + events: make(chan Event), + vertex: nil, + }, Path: path, Content: content, State: state, @@ -60,12 +57,12 @@ func NewFileType(name, path, content, state string) *FileType { // File watcher for files and directories // Modify with caution, probably important to write some test cases first! -func (obj FileType) Watch(v *Vertex) { - // obj.Path: file or directory +// obj.Path: file or directory +func (obj *FileType) Watch() { //var recursive bool = false //var isdir = (obj.Path[len(obj.Path)-1:] == "/") // dirs have trailing slashes //fmt.Printf("IsDirectory: %v\n", isdir) - + //vertex := obj.GetVertex() // stored with SetVertex var safename = path.Clean(obj.Path) // no trailing slash watcher, err := fsnotify.NewWatcher() @@ -79,7 +76,6 @@ func (obj FileType) Watch(v *Vertex) { var current string // current "watcher" location var delta_depth int // depth delta between watcher and event var send = false // send event? - var extraCheck = false for { current = strings.Join(patharray[0:index], "/") @@ -94,9 +90,9 @@ func (obj FileType) Watch(v *Vertex) { if err == syscall.ENOENT { index-- // usually not found, move up one dir } else if err == syscall.ENOSPC { - // XXX: i sometimes see: no space left on device - // XXX: why causes this to happen ? - log.Printf("Strange file[%v] error: %+v\n", obj.Name, err.Error) // 0x408da0 + // XXX: occasionally: no space left on device, + // XXX: probably due to lack of inotify watches + log.Printf("Lack of watches for file[%v] error: %+v\n", obj.Name, err.Error) // 0x408da0 log.Fatal(err) } else { log.Printf("Unknown file[%v] error:\n", obj.Name) @@ -106,19 +102,6 @@ func (obj FileType) Watch(v *Vertex) { continue } - // XXX: check state after inotify started - // SMALL RACE: after we terminate watch, till when it's started - // something could have gotten created/changed/etc... right? - if extraCheck { - extraCheck = false - // XXX - //if exists ... { - // send signal - // continue - // change index? i don't think so. be thorough and check - //} - } - select { case event := <-watcher.Events: // the deeper you go, the bigger the delta_depth is... @@ -134,15 +117,10 @@ func (obj FileType) Watch(v *Vertex) { delta_depth = len(PathSplit(event.Name)) - len(PathSplit(current)) // +1 or more } else { - // XXX multiple watchers receive each others events + // TODO different watchers get each others events! // https://github.com/go-fsnotify/fsnotify/issues/95 // this happened with two values such as: // event.Name: /tmp/mgmt/f3 and current: /tmp/mgmt/f2 - // are the different watchers getting each others events?? - //log.Printf("The delta depth is NaN...\n") - //log.Printf("Value of event.Name is: %v\n", event.Name) - //log.Printf("........ current is: %v\n", current) - //log.Fatal("The delta depth is NaN!") continue } //log.Printf("The delta depth is: %v\n", delta_depth) @@ -193,32 +171,24 @@ func (obj FileType) Watch(v *Vertex) { case err := <-watcher.Errors: log.Println("error:", err) log.Fatal(err) - v.Events <- fmt.Sprintf("file: %v", "error") + //obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors? - case exit := <-obj.Events: - if exit == "exit" { - return - } else { - log.Fatal("Unknown event: %v\n", exit) + case event := <-obj.events: + if ok := obj.ReadEvent(&event); !ok { + return // exit } + send = true } // do all our event sending all together to avoid duplicate msgs if send { send = false - //log.Println("Sending event!") - //v.Events <- fmt.Sprintf("file(%v): %v", obj.Path, event.Op) - v.Events <- fmt.Sprintf("file(%v): %v", obj.Path, "event!") // FIXME: use struct + obj.Process(obj) // XXX: rename this function } } } -func (obj FileType) Exit() bool { - obj.Events <- "exit" - return true -} - -func (obj FileType) HashSHA256fromContent() string { +func (obj *FileType) HashSHA256fromContent() string { if obj.sha256sum != "" { // return if already computed return obj.sha256sum } @@ -229,7 +199,7 @@ func (obj FileType) HashSHA256fromContent() string { return obj.sha256sum } -func (obj FileType) StateOK() bool { +func (obj *FileType) StateOK() bool { if _, err := os.Stat(obj.Path); os.IsNotExist(err) { // no such file or directory if obj.State == "absent" { @@ -249,7 +219,7 @@ func (obj FileType) StateOK() bool { } } -func (obj FileType) StateOKFile() bool { +func (obj *FileType) StateOKFile() bool { if PathIsDir(obj.Path) { log.Fatal("This should only be called on a File type.") } @@ -280,7 +250,7 @@ func (obj FileType) StateOKFile() bool { return false } -func (obj FileType) StateOKDir() bool { +func (obj *FileType) StateOKDir() bool { if !PathIsDir(obj.Path) { log.Fatal("This should only be called on a Dir type.") } @@ -290,8 +260,8 @@ func (obj FileType) StateOKDir() bool { return false } -func (obj FileType) Apply() bool { - fmt.Printf("Apply->%v[%v]\n", obj.Type, obj.Name) +func (obj *FileType) Apply() bool { + fmt.Printf("Apply->File[%v]\n", obj.Name) if PathIsDir(obj.Path) { return obj.ApplyDir() @@ -300,7 +270,7 @@ func (obj FileType) Apply() bool { } } -func (obj FileType) ApplyFile() bool { +func (obj *FileType) ApplyFile() bool { if PathIsDir(obj.Path) { log.Fatal("This should only be called on a File type.") @@ -332,7 +302,7 @@ func (obj FileType) ApplyFile() bool { return true } -func (obj FileType) ApplyDir() bool { +func (obj *FileType) ApplyDir() bool { if !PathIsDir(obj.Path) { log.Fatal("This should only be called on a Dir type.") } @@ -341,3 +311,28 @@ func (obj FileType) ApplyDir() bool { log.Fatal("Not implemented!") return true } + +func (obj *FileType) Compare(typ Type) bool { + switch typ.(type) { + case *FileType: + return obj.compare(typ.(*FileType)) + default: + return false + } +} + +func (obj *FileType) compare(typ *FileType) bool { + if obj.Name != typ.Name { + return false + } + if obj.Path != typ.Path { + return false + } + if obj.Content != typ.Content { + return false + } + if obj.State != typ.State { + return false + } + return true +} diff --git a/main.go b/main.go index 3b92fcfc..7046f690 100644 --- a/main.go +++ b/main.go @@ -63,6 +63,8 @@ func run(c *cli.Context) { var wg sync.WaitGroup exit := make(chan bool) // exit signal log.Printf("This is: %v, version: %v\n", program, version) + log.Printf("Start: %v\n", start) + G := NewGraph("Graph") // give graph a default name // exit after `exittime` seconds for no reason at all... if i := c.Int("exittime"); i > 0 { @@ -72,29 +74,58 @@ func run(c *cli.Context) { }() } - // build the graph from a config file - G := GraphFromConfig(c.String("file")) - log.Printf("Graph: %v\n", G) // show graph - - log.Printf("Start: %v\n", start) - - for x := range G.GetVerticesChan() { // XXX ? - log.Printf("Main->Starting[%v]\n", x.Name) - - wg.Add(1) - // must pass in value to avoid races... - // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ - go func(v *Vertex) { - defer wg.Done() - v.Start() - log.Printf("Main->Finish[%v]\n", v.Name) - }(x) - - // generate a startup "poke" so that an initial check happens - go func(v *Vertex) { - v.Events <- fmt.Sprintf("Startup(%v)", v.Name) - }(x) + // etcd + hostname := c.String("hostname") + if hostname == "" { + hostname, _ = os.Hostname() // etcd watch key // XXX: this is not the correct key name this is the set key name... WOOPS } + go func(hostname string) { + log.Printf("Starting etcd...\n") + kapi := EtcdGetKAPI() + first := true // first loop or not + for x := range EtcdWatch(kapi, true) { + + // run graph vertex LOCK... + if !first { + log.Printf("Watcher().Node.Value(%v): %+v", hostname, x) + + G.SetState(graphPausing) + log.Printf("State: %v", G.State()) + G.Pause() // sync + G.SetState(graphPaused) + log.Printf("State: %v", G.State()) + } + + // build the graph from a config file + // build the graph on events (eg: from etcd) but kick it once... + if !UpdateGraphFromConfig(c.String("file"), hostname, G, kapi) { + log.Fatal("Graph failure") + } + log.Printf("Graph: %v\n", G) // show graph + G.SetVertex() + if first { + // G.Start(...) needs to be synchronous or wait, + // because if half of the nodes are started and + // some are not ready yet and the EtcdWatch + // loops, we'll cause G.Pause(...) before we + // even got going, thus causing nil pointer errors + G.SetState(graphStarting) + log.Printf("State: %v", G.State()) + G.Start(&wg) + G.SetState(graphStarted) + log.Printf("State: %v", G.State()) + + } else { + G.SetState(graphContinuing) + log.Printf("State: %v", G.State()) + + G.Continue() // sync + G.SetState(graphStarted) + log.Printf("State: %v", G.State()) + } + first = false + } + }(hostname) log.Println("Running...") @@ -116,6 +147,10 @@ func run(c *cli.Context) { } func main() { + //if DEBUG { + log.SetFlags(log.LstdFlags | log.Lshortfile) + //} + log.SetFlags(log.Flags() - log.Ldate) // remove the date for now app := cli.NewApp() app.Name = program app.Usage = "next generation config management" @@ -134,6 +169,17 @@ func main() { Value: "", Usage: "graph definition to run", }, + cli.StringFlag{ + Name: "code, c", + Value: "", + Usage: "code definition to run", + }, + // useful for testing multiple instances on same machine + cli.StringFlag{ + Name: "hostname", + Value: "", + Usage: "hostname to use", + }, cli.IntFlag{ Name: "exittime", Value: 0, diff --git a/misc.go b/misc.go index eff0355b..ebeaf772 100644 --- a/misc.go +++ b/misc.go @@ -18,6 +18,9 @@ package main import ( + "bytes" + "encoding/base64" + "encoding/gob" "path" "strings" ) @@ -66,3 +69,33 @@ func PathPrefixDelta(p, prefix string) int { func PathIsDir(p string) bool { return p[len(p)-1:] == "/" // a dir has a trailing slash in this context } + +// encode an object as base 64, serialize and then base64 encode +func ObjToB64(obj interface{}) (string, bool) { + b := bytes.Buffer{} + e := gob.NewEncoder(&b) + err := e.Encode(obj) + if err != nil { + //log.Println("Gob failed to Encode: ", err) + return "", false + } + return base64.StdEncoding.EncodeToString(b.Bytes()), true +} + +// TODO: is it possible to somehow generically just return the obj? +// decode an object into the waiting obj which you pass a reference to +func B64ToObj(str string, obj interface{}) bool { + bb, err := base64.StdEncoding.DecodeString(str) + if err != nil { + //log.Println("Base64 failed to Decode: ", err) + return false + } + b := bytes.NewBuffer(bb) + d := gob.NewDecoder(b) + err = d.Decode(obj) + if err != nil { + //log.Println("Gob failed to Decode: ", err) + return false + } + return true +} diff --git a/misc_test.go b/misc_test.go index 4cac564d..f4851cb0 100644 --- a/misc_test.go +++ b/misc_test.go @@ -18,6 +18,7 @@ package main import ( + "fmt" "testing" ) @@ -133,3 +134,54 @@ func TestMiscT5(t *testing.T) { t.Errorf("Result should be true.") } } + +func TestMiscT6(t *testing.T) { + + type foo struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + Value int `yaml:"value"` + } + + obj := foo{"dude", "sweet", 42} + output, ok := ObjToB64(obj) + if ok != true { + t.Errorf("First result should be true.") + } + var data foo + if B64ToObj(output, &data) != true { + t.Errorf("Second result should be true.") + } + // TODO: there is probably a better way to compare these two... + if fmt.Sprintf("%+v\n", obj) != fmt.Sprintf("%+v\n", data) { + t.Errorf("Strings should match.") + } +} + +func TestMiscT7(t *testing.T) { + + type Foo struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + Value int `yaml:"value"` + } + + type bar struct { + Foo `yaml:",inline"` // anonymous struct must be public! + Comment string `yaml:"comment"` + } + + obj := bar{Foo{"dude", "sweet", 42}, "hello world"} + output, ok := ObjToB64(obj) + if ok != true { + t.Errorf("First result should be true.") + } + var data bar + if B64ToObj(output, &data) != true { + t.Errorf("Second result should be true.") + } + // TODO: there is probably a better way to compare these two... + if fmt.Sprintf("%+v\n", obj) != fmt.Sprintf("%+v\n", data) { + t.Errorf("Strings should match.") + } +} diff --git a/pgraph.go b/pgraph.go index 91bc44c8..7862671f 100644 --- a/pgraph.go +++ b/pgraph.go @@ -19,69 +19,93 @@ package main import ( - "code.google.com/p/go-uuid/uuid" //"container/list" // doubly linked list "fmt" "log" "sync" - "time" +) + +//go:generate stringer -type=graphState -output=graphstate_stringer.go +type graphState int + +const ( + graphNil graphState = iota + graphStarting + graphStarted + graphPausing + graphPaused + graphContinuing ) // The graph abstract data type (ADT) is defined as follows: -// NOTE: the directed graph arrows point from left to right ( --> ) -// NOTE: the arrows point towards their dependencies (eg: arrows mean requires) +// * the directed graph arrows point from left to right ( -> ) +// * the arrows point away from their dependencies (eg: arrows mean "before") +// * IOW, you might see package -> file -> service (where package runs first) +// * This is also the direction that the notify should happen in... type Graph struct { - uuid string Name string - Adjacency map[*Vertex]map[*Vertex]*Edge + Adjacency map[*Vertex]map[*Vertex]*Edge // *Vertex -> *Vertex (edge) + state graphState + mutex sync.Mutex // used when modifying graph State variable //Directed bool - startcount int } type Vertex struct { - uuid string - graph *Graph // store a pointer to the graph it's on - Name string - Type string - Timestamp int64 // last updated timestamp ? - Events chan string // FIXME: eventually a struct for the event? - Typedata Type - data map[string]string + graph *Graph // store a pointer to the graph it's on + Type // anonymous field + data map[string]string // XXX: currently unused i think, remove? } type Edge struct { - uuid string Name string } func NewGraph(name string) *Graph { return &Graph{ - uuid: uuid.New(), Name: name, Adjacency: make(map[*Vertex]map[*Vertex]*Edge), + state: graphNil, } } -func NewVertex(name, t string) *Vertex { +func NewVertex(t Type) *Vertex { return &Vertex{ - uuid: uuid.New(), - Name: name, - Type: t, - Timestamp: -1, - Events: make(chan string, 1), // XXX: chan size? - data: make(map[string]string), + Type: t, + data: make(map[string]string), } } func NewEdge(name string) *Edge { return &Edge{ - uuid: uuid.New(), Name: name, } } -// Graph() creates a new, empty graph. -// addVertex(vert) adds an instance of Vertex to the graph. +// set name of the graph +func (g *Graph) SetName(name string) { + g.Name = name +} + +func (g *Graph) State() graphState { + g.mutex.Lock() + defer g.mutex.Unlock() + return g.state +} + +func (g *Graph) SetState(state graphState) { + g.mutex.Lock() + defer g.mutex.Unlock() + g.state = state +} + +// store a pointer in the type to it's parent vertex +func (g *Graph) SetVertex() { + for v := range g.GetVerticesChan() { + v.Type.SetVertex(v) + } +} + +// add a new vertex to the graph func (g *Graph) AddVertex(v *Vertex) { if _, exists := g.Adjacency[v]; !exists { g.Adjacency[v] = make(map[*Vertex]*Edge) @@ -91,7 +115,14 @@ func (g *Graph) AddVertex(v *Vertex) { } } -// addEdge(fromVert, toVert) Adds a new, directed edge to the graph that connects two vertices. +func (g *Graph) DeleteVertex(v *Vertex) { + delete(g.Adjacency, v) + for k := range g.Adjacency { + delete(g.Adjacency[k], v) + } +} + +// adds a directed edge to the graph from v1 to v2 func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { // NOTE: this doesn't allow more than one edge between two vertexes... // TODO: is this a problem? @@ -100,30 +131,55 @@ func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { g.Adjacency[v1][v2] = e } -// addEdge(fromVert, toVert, weight) Adds a new, weighted, directed edge to the graph that connects two vertices. -// getVertex(vertKey) finds the vertex in the graph named vertKey. -func (g *Graph) GetVertex(uuid string) chan *Vertex { +// XXX: does it make sense to return a channel here? +// GetVertex finds the vertex in the graph with a particular search name +func (g *Graph) GetVertex(name string) chan *Vertex { ch := make(chan *Vertex, 1) - go func(uuid string) { + go func(name string) { for k := range g.Adjacency { - v := *k - if v.uuid == uuid { + if k.GetName() == name { ch <- k break } } close(ch) - }(uuid) + }(name) return ch } +func (g *Graph) GetVertexMatch(obj Type) *Vertex { + for k := range g.Adjacency { + if k.Compare(obj) { // XXX test + return k + } + } + return nil +} + +func (g *Graph) HasVertex(v *Vertex) bool { + if _, exists := g.Adjacency[v]; exists { + return true + } + //for k := range g.Adjacency { + // if k == v { + // return true + // } + //} + return false +} + +// number of vertices in the graph func (g *Graph) NumVertices() int { return len(g.Adjacency) } +// number of edges in the graph func (g *Graph) NumEdges() int { - // XXX: not implemented - return -1 + count := 0 + for k := range g.Adjacency { + count += len(g.Adjacency[k]) + } + return count } // get an array (slice) of all vertices in the graph @@ -138,7 +194,6 @@ func (g *Graph) GetVertices() []*Vertex { // returns a channel of all vertices in the graph func (g *Graph) GetVerticesChan() chan *Vertex { ch := make(chan *Vertex) - // TODO: do you need to pass this through into the go routine? go func(ch chan *Vertex) { for k := range g.Adjacency { ch <- k @@ -153,7 +208,6 @@ func (g *Graph) String() string { return fmt.Sprintf("Vertices(%d), Edges(%d)", g.NumVertices(), g.NumEdges()) } -//func (s []*Vertex) contains(element *Vertex) bool { // google/golang hackers apparently do not think contains should be a built-in! func Contains(s []*Vertex, element *Vertex) bool { for _, v := range s { @@ -164,20 +218,15 @@ func Contains(s []*Vertex, element *Vertex) bool { return false } -// return an array (slice) of all vertices that connect to vertex v -func (g *Graph) GraphEdges(vertex *Vertex) []*Vertex { +// return an array (slice) of all directed vertices to vertex v (??? -> v) +// ostimestamp should use this +func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { // TODO: we might be able to implement this differently by reversing // the Adjacency graph and then looping through it again... - s := make([]*Vertex, 0) // stack - for w, _ := range g.Adjacency[vertex] { // forward paths - //fmt.Printf("forward: %v -> %v\n", v.Name, w.Name) - s = append(s, w) - } - - for k, x := range g.Adjacency { // reverse paths - for w, _ := range x { - if w == vertex { - //fmt.Printf("reverse: %v -> %v\n", v.Name, k.Name) + s := make([]*Vertex, 0) + for k, _ := range g.Adjacency { // reverse paths + for w, _ := range g.Adjacency[k] { + if w == v { s = append(s, k) } } @@ -185,27 +234,22 @@ func (g *Graph) GraphEdges(vertex *Vertex) []*Vertex { return s } -// return an array (slice) of all directed vertices to vertex v -func (g *Graph) DirectedGraphEdges(vertex *Vertex) []*Vertex { - // TODO: we might be able to implement this differently by reversing - // the Adjacency graph and then looping through it again... - s := make([]*Vertex, 0) // stack - for w, _ := range g.Adjacency[vertex] { // forward paths - //fmt.Printf("forward: %v -> %v\n", v.Name, w.Name) - s = append(s, w) +// return an array (slice) of all vertices that vertex v points to (v -> ???) +// poke should use this +func (g *Graph) OutgoingGraphEdges(v *Vertex) []*Vertex { + s := make([]*Vertex, 0) + for k, _ := range g.Adjacency[v] { // forward paths + s = append(s, k) } return s } -// get timestamp of a vertex -func (v *Vertex) GetTimestamp() int64 { - return v.Timestamp -} - -// update timestamp of a vertex -func (v *Vertex) UpdateTimestamp() int64 { - v.Timestamp = time.Now().UnixNano() // update - return v.Timestamp +// return an array (slice) of all vertices that connect to vertex v +func (g *Graph) GraphEdges(v *Vertex) []*Vertex { + s := make([]*Vertex, 0) + s = append(s, g.IncomingGraphEdges(v)...) + s = append(s, g.OutgoingGraphEdges(v)...) + return s } func (g *Graph) DFS(start *Vertex) []*Vertex { @@ -279,12 +323,89 @@ func (g *Graph) GetDisconnectedGraphs() chan *Graph { // if we've found all the elements, then we're done // otherwise loop through to continue... } - close(ch) }() return ch } +// return the indegree for the graph +func (g *Graph) InDegree() map[*Vertex]int { + result := make(map[*Vertex]int) + for k := range g.Adjacency { + result[k] = 0 // initialize + } + + for k := range g.Adjacency { + for z := range g.Adjacency[k] { + result[z] += 1 + } + } + return result +} + +// return the outdegree for the graph +func (g *Graph) OutDegree() map[*Vertex]int { + result := make(map[*Vertex]int) + + for k := range g.Adjacency { + result[k] = 0 // initialize + for _ = range g.Adjacency[k] { + result[k] += 1 + } + } + return result +} + +// returns a topological sort for the graph +// based on descriptions and code from wikipedia and rosetta code +// TODO: add memoization, and cache invalidation to speed this up :) +func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algorithm + + L := make([]*Vertex, 0) // empty list that will contain the sorted elements + S := make([]*Vertex, 0) // set of all nodes with no incoming edges + remaining := make(map[*Vertex]int) // amount of edges remaining + + for v, d := range g.InDegree() { + if d == 0 { + // accumulate set of all nodes with no incoming edges + S = append(S, v) + } else { + // initialize remaining edge count from indegree + remaining[v] = d + } + } + + for len(S) > 0 { + last := len(S) - 1 // remove a node v from S + v := S[last] + S = S[:last] + L = append(L, v) // add v to tail of L + for n, _ := range g.Adjacency[v] { + // for each node n remaining in the graph, consume from + // remaining, so for remaining[n] > 0 + if remaining[n] > 0 { + remaining[n]-- // remove edge from the graph + if remaining[n] == 0 { // if n has no other incoming edges + S = append(S, n) // insert n into S + } + } + } + } + + // if graph has edges, eg if any value in rem is > 0 + for c, in := range remaining { + if in > 0 { + for n, _ := range g.Adjacency[c] { + if remaining[n] > 0 { + return nil, false // not a dag! + } + } + } + } + + return L, true +} + func (v *Vertex) Value(key string) (string, bool) { if value, exists := v.data[key]; exists { return value, true @@ -324,91 +445,68 @@ func HeisenbergCount(ch chan *Vertex) int { return c } -func (v *Vertex) Associate(t Type) { - v.Typedata = t -} +// main kick to start the graph +func (g *Graph) Start(wg *sync.WaitGroup) { + t, _ := g.TopologicalSort() + for _, v := range Reverse(t) { + + wg.Add(1) + // must pass in value to avoid races... + // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ + go func(vv *Vertex) { + defer wg.Done() + vv.Type.Watch() + log.Printf("Finish: %v", vv.GetName()) + }(v) + + // ensure state is started before continuing on to next vertex + v.Type.SendEvent(eventStart, true) -func (v *Vertex) OKTimestamp() bool { - g := v.GetGraph() - for _, n := range g.DirectedGraphEdges(v) { - if v.GetTimestamp() > n.GetTimestamp() { - return false - } } - - return true } -// poke the XXX children? -func (v *Vertex) Poke() { - g := v.GetGraph() +func (g *Graph) Continue() { + t, _ := g.TopologicalSort() + for _, v := range Reverse(t) { + v.Type.SendEvent(eventContinue, true) + } +} - for _, n := range g.DirectedGraphEdges(v) { // XXX: do we want the reverse order? - // poke! - n.Events <- fmt.Sprintf("poke(%v)", v.Name) +func (g *Graph) Pause() { + t, _ := g.TopologicalSort() + for _, v := range t { // squeeze out the events... + v.Type.SendEvent(eventPause, true) } } func (g *Graph) Exit() { - // tell all the vertices to exit... - for v := range g.GetVerticesChan() { - v.Exit() + t, _ := g.TopologicalSort() + for _, v := range t { // squeeze out the events... + // turn off the taps... + // XXX: do this by sending an exit signal, and then returning + // when we hit the 'default' in the select statement! + // XXX: we can do this to quiesce, but it's not necessary now + + v.Type.SendEvent(eventExit, true) } } -func (v *Vertex) Exit() { - v.Events <- "exit" -} - -// main loop for each vertex -// warning: this logic might be subtle and tricky. -// be careful as it might not even be correct now! -func (v *Vertex) Start() { - log.Printf("Main->Vertex[%v]->Start()\n", v.Name) - - //g := v.GetGraph() - var t = v.Typedata - - // this whole wg2 wait group is only necessary if we need to wait for - // the go routine to exit... - var wg2 sync.WaitGroup - - wg2.Add(1) - go func(v *Vertex, t Type) { - defer wg2.Done() - //fmt.Printf("About to watch [%v].\n", v.Name) - t.Watch(v) - }(v, t) - - var ok bool - //XXX make sure dependencies run and become more current first... - for { - select { - case event := <-v.Events: - - log.Printf("Event[%v]: %v\n", v.Name, event) - - if event == "exit" { - t.Exit() // type exit - wg2.Wait() // wait for worker to exit - return - } - - ok = true - if v.OKTimestamp() { - if !t.StateOK() { // TODO: can we rename this to something better? - // throw an error if apply fails... - // if this fails, don't UpdateTimestamp() - if !t.Apply() { // check for error - ok = false - } - } - - if ok { - v.UpdateTimestamp() // this was touched... - v.Poke() // XXX - } - } +// in array function to test *vertices in a slice of *vertices +func HasVertex(v *Vertex, haystack []*Vertex) bool { + for _, r := range haystack { + if v == r { + return true } } + return false +} + +// reverse a list of vertices +func Reverse(vs []*Vertex) []*Vertex { + out := make([]*Vertex, 0) // empty list + l := len(vs) + for i := range vs { + out = append(out, vs[l-i-1]) + } + return out } diff --git a/pgraph_test.go b/pgraph_test.go index cf54e7f3..ef232571 100644 --- a/pgraph_test.go +++ b/pgraph_test.go @@ -20,6 +20,7 @@ package main import ( + "reflect" "testing" ) @@ -31,25 +32,33 @@ func TestPgraphT1(t *testing.T) { t.Errorf("Should have 0 vertices instead of: %d.", i) } - v1 := NewVertex("v1", "type") - v2 := NewVertex("v2", "type") + if i := G.NumEdges(); i != 0 { + t.Errorf("Should have 0 edges instead of: %d.", i) + } + + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) e1 := NewEdge("e1") G.AddEdge(v1, v2, e1) if i := G.NumVertices(); i != 2 { t.Errorf("Should have 2 vertices instead of: %d.", i) } + + if i := G.NumEdges(); i != 1 { + t.Errorf("Should have 1 edges instead of: %d.", i) + } } func TestPgraphT2(t *testing.T) { G := NewGraph("g2") - v1 := NewVertex("v1", "type") - v2 := NewVertex("v2", "type") - v3 := NewVertex("v3", "type") - v4 := NewVertex("v4", "type") - v5 := NewVertex("v5", "type") - v6 := NewVertex("v6", "type") + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + v4 := NewVertex(NewNoopType("v4")) + v5 := NewVertex(NewNoopType("v5")) + v6 := NewVertex(NewNoopType("v6")) e1 := NewEdge("e1") e2 := NewEdge("e2") e3 := NewEdge("e3") @@ -71,12 +80,12 @@ func TestPgraphT2(t *testing.T) { func TestPgraphT3(t *testing.T) { G := NewGraph("g3") - v1 := NewVertex("v1", "type") - v2 := NewVertex("v2", "type") - v3 := NewVertex("v3", "type") - v4 := NewVertex("v4", "type") - v5 := NewVertex("v5", "type") - v6 := NewVertex("v6", "type") + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + v4 := NewVertex(NewNoopType("v4")) + v5 := NewVertex(NewNoopType("v5")) + v6 := NewVertex(NewNoopType("v6")) e1 := NewEdge("e1") e2 := NewEdge("e2") e3 := NewEdge("e3") @@ -95,7 +104,7 @@ func TestPgraphT3(t *testing.T) { t.Errorf("Should have 3 vertices instead of: %d.", i) t.Errorf("Found: %v", out1) for _, v := range out1 { - t.Errorf("Value: %v", v.Name) + t.Errorf("Value: %v", v.GetName()) } } @@ -104,7 +113,7 @@ func TestPgraphT3(t *testing.T) { t.Errorf("Should have 3 vertices instead of: %d.", i) t.Errorf("Found: %v", out1) for _, v := range out1 { - t.Errorf("Value: %v", v.Name) + t.Errorf("Value: %v", v.GetName()) } } } @@ -112,9 +121,9 @@ func TestPgraphT3(t *testing.T) { func TestPgraphT4(t *testing.T) { G := NewGraph("g4") - v1 := NewVertex("v1", "type") - v2 := NewVertex("v2", "type") - v3 := NewVertex("v3", "type") + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) e1 := NewEdge("e1") e2 := NewEdge("e2") e3 := NewEdge("e3") @@ -127,19 +136,19 @@ func TestPgraphT4(t *testing.T) { t.Errorf("Should have 3 vertices instead of: %d.", i) t.Errorf("Found: %v", out) for _, v := range out { - t.Errorf("Value: %v", v.Name) + t.Errorf("Value: %v", v.GetName()) } } } func TestPgraphT5(t *testing.T) { G := NewGraph("g5") - v1 := NewVertex("v1", "type") - v2 := NewVertex("v2", "type") - v3 := NewVertex("v3", "type") - v4 := NewVertex("v4", "type") - v5 := NewVertex("v5", "type") - v6 := NewVertex("v6", "type") + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + v4 := NewVertex(NewNoopType("v4")) + v5 := NewVertex(NewNoopType("v5")) + v6 := NewVertex(NewNoopType("v6")) e1 := NewEdge("e1") e2 := NewEdge("e2") e3 := NewEdge("e3") @@ -159,17 +168,16 @@ func TestPgraphT5(t *testing.T) { if i := out.NumVertices(); i != 3 { t.Errorf("Should have 3 vertices instead of: %d.", i) } - } func TestPgraphT6(t *testing.T) { G := NewGraph("g6") - v1 := NewVertex("v1", "type") - v2 := NewVertex("v2", "type") - v3 := NewVertex("v3", "type") - v4 := NewVertex("v4", "type") - v5 := NewVertex("v5", "type") - v6 := NewVertex("v6", "type") + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + v4 := NewVertex(NewNoopType("v4")) + v5 := NewVertex(NewNoopType("v5")) + v6 := NewVertex(NewNoopType("v6")) e1 := NewEdge("e1") e2 := NewEdge("e2") e3 := NewEdge("e3") @@ -197,5 +205,204 @@ func TestPgraphT6(t *testing.T) { if i := HeisenbergGraphCount(graphs); i != 2 { t.Errorf("Should have 2 graphs instead of: %d.", i) } +} + +func TestPgraphT7(t *testing.T) { + + G := NewGraph("g7") + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v3, v1, e3) + + if i := G.NumVertices(); i != 3 { + t.Errorf("Should have 3 vertices instead of: %d.", i) + } + + G.DeleteVertex(v2) + + if i := G.NumVertices(); i != 2 { + t.Errorf("Should have 2 vertices instead of: %d.", i) + } + + G.DeleteVertex(v1) + + if i := G.NumVertices(); i != 1 { + t.Errorf("Should have 1 vertices instead of: %d.", i) + } + + G.DeleteVertex(v3) + + if i := G.NumVertices(); i != 0 { + t.Errorf("Should have 0 vertices instead of: %d.", i) + } + + G.DeleteVertex(v2) // duplicate deletes don't error... + + if i := G.NumVertices(); i != 0 { + t.Errorf("Should have 0 vertices instead of: %d.", i) + } +} + +func TestPgraphT8(t *testing.T) { + + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + if HasVertex(v1, []*Vertex{v1, v2, v3}) != true { + t.Errorf("Should be true instead of false.") + } + + v4 := NewVertex(NewNoopType("v4")) + v5 := NewVertex(NewNoopType("v5")) + v6 := NewVertex(NewNoopType("v6")) + if HasVertex(v4, []*Vertex{v5, v6}) != false { + t.Errorf("Should be false instead of true.") + } + + v7 := NewVertex(NewNoopType("v7")) + v8 := NewVertex(NewNoopType("v8")) + v9 := NewVertex(NewNoopType("v9")) + if HasVertex(v8, []*Vertex{v7, v8, v9}) != true { + t.Errorf("Should be true instead of false.") + } + + v_1 := NewVertex(NewNoopType("v1")) // same value, different objects + if HasVertex(v_1, []*Vertex{v1, v2, v3}) != false { + t.Errorf("Should be false instead of true.") + } +} + +func TestPgraphT9(t *testing.T) { + + G := NewGraph("g9") + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + v4 := NewVertex(NewNoopType("v4")) + v5 := NewVertex(NewNoopType("v5")) + v6 := NewVertex(NewNoopType("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v1, v3, e2) + G.AddEdge(v2, v4, e3) + G.AddEdge(v3, v4, e4) + + G.AddEdge(v4, v5, e5) + G.AddEdge(v5, v6, e6) + + indegree := G.InDegree() // map[*Vertex]int + if i := indegree[v1]; i != 0 { + t.Errorf("Indegree of v1 should be 0 instead of: %d.", i) + } + if i := indegree[v2]; i != 1 { + t.Errorf("Indegree of v2 should be 1 instead of: %d.", i) + } + if i := indegree[v3]; i != 1 { + t.Errorf("Indegree of v3 should be 1 instead of: %d.", i) + } + if i := indegree[v4]; i != 2 { + t.Errorf("Indegree of v4 should be 2 instead of: %d.", i) + } + if i := indegree[v5]; i != 1 { + t.Errorf("Indegree of v5 should be 1 instead of: %d.", i) + } + if i := indegree[v6]; i != 1 { + t.Errorf("Indegree of v6 should be 1 instead of: %d.", i) + } + + outdegree := G.OutDegree() // map[*Vertex]int + if i := outdegree[v1]; i != 2 { + t.Errorf("Outdegree of v1 should be 2 instead of: %d.", i) + } + if i := outdegree[v2]; i != 1 { + t.Errorf("Outdegree of v2 should be 1 instead of: %d.", i) + } + if i := outdegree[v3]; i != 1 { + t.Errorf("Outdegree of v3 should be 1 instead of: %d.", i) + } + if i := outdegree[v4]; i != 1 { + t.Errorf("Outdegree of v4 should be 1 instead of: %d.", i) + } + if i := outdegree[v5]; i != 1 { + t.Errorf("Outdegree of v5 should be 1 instead of: %d.", i) + } + if i := outdegree[v6]; i != 0 { + t.Errorf("Outdegree of v6 should be 0 instead of: %d.", i) + } + + s, ok := G.TopologicalSort() + // either possibility is a valid toposort + match := reflect.DeepEqual(s, []*Vertex{v1, v2, v3, v4, v5, v6}) || reflect.DeepEqual(s, []*Vertex{v1, v3, v2, v4, v5, v6}) + if !ok || !match { + t.Errorf("Topological sort failed, status: %v.", ok) + str := "Found:" + for _, v := range s { + str += " " + v.Type.GetName() + } + t.Errorf(str) + } +} + +func TestPgraphT10(t *testing.T) { + + G := NewGraph("g10") + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + v4 := NewVertex(NewNoopType("v4")) + v5 := NewVertex(NewNoopType("v5")) + v6 := NewVertex(NewNoopType("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v3, v4, e3) + G.AddEdge(v4, v5, e4) + G.AddEdge(v5, v6, e5) + G.AddEdge(v4, v2, e6) // cycle + + if _, ok := G.TopologicalSort(); ok { + t.Errorf("Topological sort passed, but graph is cyclic.") + } +} + +func TestPgraphT11(t *testing.T) { + v1 := NewVertex(NewNoopType("v1")) + v2 := NewVertex(NewNoopType("v2")) + v3 := NewVertex(NewNoopType("v3")) + v4 := NewVertex(NewNoopType("v4")) + v5 := NewVertex(NewNoopType("v5")) + v6 := NewVertex(NewNoopType("v6")) + + if rev := Reverse([]*Vertex{}); !reflect.DeepEqual(rev, []*Vertex{}) { + t.Errorf("Reverse of vertex slice failed.") + } + + if rev := Reverse([]*Vertex{v1}); !reflect.DeepEqual(rev, []*Vertex{v1}) { + t.Errorf("Reverse of vertex slice failed.") + } + + if rev := Reverse([]*Vertex{v1, v2, v3, v4, v5, v6}); !reflect.DeepEqual(rev, []*Vertex{v6, v5, v4, v3, v2, v1}) { + t.Errorf("Reverse of vertex slice failed.") + } + + if rev := Reverse([]*Vertex{v6, v5, v4, v3, v2, v1}); !reflect.DeepEqual(rev, []*Vertex{v1, v2, v3, v4, v5, v6}) { + t.Errorf("Reverse of vertex slice failed.") + } } diff --git a/service.go b/service.go index 440f266d..76319b5b 100644 --- a/service.go +++ b/service.go @@ -20,7 +20,6 @@ package main import ( - "code.google.com/p/go-uuid/uuid" "fmt" systemd "github.com/coreos/go-systemd/dbus" // change namespace "github.com/coreos/go-systemd/util" @@ -29,29 +28,27 @@ import ( ) type ServiceType struct { - uuid string - Type string // always "service" - Name string // name variable - Events chan string // FIXME: eventually a struct for the event? - State string // state: running, stopped - Startup string // enabled, disabled, undefined + BaseType `yaml:",inline"` + State string `yaml:"state"` // state: running, stopped + Startup string `yaml:"startup"` // enabled, disabled, undefined } func NewServiceType(name, state, startup string) *ServiceType { return &ServiceType{ - uuid: uuid.New(), - Type: "service", - Name: name, - Events: make(chan string, 1), // XXX: chan size? + BaseType: BaseType{ + Name: name, + events: make(chan Event), + vertex: nil, + }, State: state, Startup: startup, } } // Service watcher -func (obj ServiceType) Watch(v *Vertex) { +func (obj *ServiceType) Watch() { // obj.Name: service name - + //vertex := obj.GetVertex() // stored with SetVertex if !util.IsRunningSystemd() { log.Fatal("Systemd is not running.") } @@ -119,12 +116,11 @@ func (obj ServiceType) Watch(v *Vertex) { // loop so that we can see the changed invalid signal log.Printf("Service[%v]->DaemonReload()\n", service) - case exit := <-obj.Events: - if exit == "exit" { - return - } else { - log.Fatal("Unknown event: %v\n", exit) + case event := <-obj.events: + if ok := obj.ReadEvent(&event); !ok { + return // exit } + send = true } } else { if !activeSet { @@ -156,31 +152,25 @@ func (obj ServiceType) Watch(v *Vertex) { case err := <-subErrors: log.Println("error:", err) log.Fatal(err) - v.Events <- fmt.Sprintf("service: %v", "error") + //vertex.events <- fmt.Sprintf("service: %v", "error") // XXX: how should we handle errors? - case exit := <-obj.Events: - if exit == "exit" { - return - } else { - log.Fatal("Unknown event: %v\n", exit) + case event := <-obj.events: + if ok := obj.ReadEvent(&event); !ok { + return // exit } + send = true } } if send { send = false - //log.Println("Sending event!") - v.Events <- fmt.Sprintf("service(%v): %v", obj.Name, "event!") // FIXME: use struct + obj.Process(obj) // XXX: rename this function } + } } -func (obj ServiceType) Exit() bool { - obj.Events <- "exit" - return true -} - -func (obj ServiceType) StateOK() bool { +func (obj *ServiceType) StateOK() bool { if !util.IsRunningSystemd() { log.Fatal("Systemd is not running.") @@ -232,8 +222,8 @@ func (obj ServiceType) StateOK() bool { return true // all is good, no state change needed } -func (obj ServiceType) Apply() bool { - fmt.Printf("Apply->%v[%v]\n", obj.Type, obj.Name) +func (obj *ServiceType) Apply() bool { + fmt.Printf("Apply->Service[%v]\n", obj.Name) if !util.IsRunningSystemd() { log.Fatal("Systemd is not running.") @@ -292,3 +282,25 @@ func (obj ServiceType) Apply() bool { return true } + +func (obj *ServiceType) Compare(typ Type) bool { + switch typ.(type) { + case *ServiceType: + return obj.compare(typ.(*ServiceType)) + default: + return false + } +} + +func (obj *ServiceType) compare(typ *ServiceType) bool { + if obj.Name != typ.Name { + return false + } + if obj.State != typ.State { + return false + } + if obj.Startup != typ.Startup { + return false + } + return true +} diff --git a/test/test-yamlfmt.sh b/test/test-yamlfmt.sh index 3a914dd1..07b7f1c9 100755 --- a/test/test-yamlfmt.sh +++ b/test/test-yamlfmt.sh @@ -37,7 +37,7 @@ find_files() { bad_files=$( for i in $(find_files); do - if ! diff -q <( ruby -e "require 'yaml'; puts YAML.load_file('$i').to_yaml" 2>/dev/null ) <( cat "$i" ) &>/dev/null; then + if ! diff -q <( ruby -e "require 'yaml'; puts YAML.load_file('$i').to_yaml.each_line.map(&:rstrip).join(10.chr)+10.chr" 2>/dev/null ) <( cat "$i" ) &>/dev/null; then echo "$i" fi done diff --git a/types.go b/types.go index f01eabed..addac288 100644 --- a/types.go +++ b/types.go @@ -18,56 +18,231 @@ package main import ( - "code.google.com/p/go-uuid/uuid" "fmt" "log" + "time" ) type Type interface { - //Name() string - Watch(*Vertex) + Init() + GetName() string // can't be named "Name()" because of struct field + Watch() StateOK() bool // TODO: can we rename this to something better? Apply() bool - Exit() bool + SetVertex(*Vertex) + Compare(Type) bool + SendEvent(eventName, bool) + GetTimestamp() int64 + UpdateTimestamp() int64 + //Process() +} + +type BaseType struct { + Name string `yaml:"name"` + timestamp int64 // last updated timestamp ? + events chan Event + vertex *Vertex } type NoopType struct { - uuid string - Type string // always "noop" - Name string // name variable - Events chan string // FIXME: eventually a struct for the event? + BaseType `yaml:",inline"` + Comment string `yaml:"comment"` // extra field for example purposes } func NewNoopType(name string) *NoopType { + // FIXME: we could get rid of this New constructor and use raw object creation with a required Init() return &NoopType{ - uuid: uuid.New(), - Type: "noop", - Name: name, - Events: make(chan string, 1), // XXX: chan size? + BaseType: BaseType{ + Name: name, + events: make(chan Event), // unbuffered chan size to avoid stale events + vertex: nil, + }, + Comment: "", } } -func (obj NoopType) Watch(v *Vertex) { - select { - case exit := <-obj.Events: - if exit == "exit" { +// initialize structures like channels if created without New constructor +func (obj *BaseType) Init() { + obj.events = make(chan Event) +} + +// this method gets used by all the types, if we have one of (obj NoopType) it would get overridden in that case! +func (obj *BaseType) GetName() string { + return obj.Name +} + +func (obj *BaseType) GetVertex() *Vertex { + return obj.vertex +} + +func (obj *BaseType) SetVertex(v *Vertex) { + obj.vertex = v +} + +// get timestamp of a vertex +func (obj *BaseType) GetTimestamp() int64 { + return obj.timestamp +} + +// update timestamp of a vertex +func (obj *BaseType) UpdateTimestamp() int64 { + obj.timestamp = time.Now().UnixNano() // update + return obj.timestamp +} + +// can this element run right now? +func (obj *BaseType) OKTimestamp() bool { + v := obj.GetVertex() + g := v.GetGraph() + // these are all the vertices pointing TO v, eg: ??? -> v + for _, n := range g.IncomingGraphEdges(v) { + // if the vertex has a greater timestamp than any pre-req (n) + // then we can't run right now... + if obj.GetTimestamp() > n.Type.GetTimestamp() { + return false + } + } + return true +} + +func (obj *BaseType) Poke() bool { // XXX: how can this ever fail and return false? eg: when is a poke not possible and should be rescheduled? + v := obj.GetVertex() + g := v.GetGraph() + // these are all the vertices pointing AWAY FROM v, eg: v -> ??? + for _, n := range g.OutgoingGraphEdges(v) { + n.SendEvent(eventPoke, false) // XXX: should this be sync or not? XXX: try it as async for now, but switch to sync and see if we deadlock -- maybe it's possible, i don't know for sure yet + } + return true +} + +// push an event into the message queue for a particular type vertex +func (obj *BaseType) SendEvent(event eventName, sync bool) { + if !sync { + obj.events <- Event{event, nil, ""} + return + } + + resp := make(chan bool) + obj.events <- Event{event, resp, ""} + for { + value := <-resp + // wait until true value + if value { return - } else { - log.Fatal("Unknown event: %v\n", exit) } } } -func (obj NoopType) Exit() bool { - obj.Events <- "exit" - return true +// process events when a select gets one +// this handles the pause code too! +func (obj *BaseType) ReadEvent(event *Event) bool { + + event.ACK() + switch event.Name { + case eventStart: + return true + + case eventPoke: + return true + + case eventExit: + return false + + case eventPause: + // wait for next event to continue + select { + case e := <-obj.events: + e.ACK() + if e.Name == eventExit { + return false + } else if e.Name == eventContinue { + return true + } else { + log.Fatal("Unknown event: ", e) + } + } + + default: + log.Fatal("Unknown event: ", event) + } + return false // required to keep the stupid go compiler happy } -func (obj NoopType) StateOK() bool { +// XXX: rename this function +func (obj *BaseType) Process(typ Type) { + var ok bool + + ok = true + // is it okay to run dependency wise right now? + // if not, that's okay because when the dependency runs, it will poke + // us back and we will run if needed then! + if obj.OKTimestamp() { + // XXX XXX: why does this have to be typ instead of just obj! "obj.StateOK undefined (type *BaseType has no field or method StateOK)" + + if !typ.StateOK() { // TODO: can we rename this to something better? + // throw an error if apply fails... + // if this fails, don't UpdateTimestamp() + if !typ.Apply() { // check for error + ok = false + } + } + + if ok { + // if poke fails, don't update timestamp + // since we didn't propagate the pokes! + if obj.Poke() { + obj.UpdateTimestamp() // this was touched... + } + } + } + +} + +func (obj *NoopType) Watch() { + //vertex := obj.vertex // stored with SetVertex + var send = false // send event? + for { + + select { + case event := <-obj.events: + + if ok := obj.ReadEvent(&event); !ok { + return // exit + } + send = true + } + + // do all our event sending all together to avoid duplicate msgs + if send { + send = false + + obj.Process(obj) // XXX: rename this function + } + } +} + +func (obj *NoopType) StateOK() bool { return true // never needs updating } -func (obj NoopType) Apply() bool { - fmt.Printf("Apply->%v[%v]\n", obj.Type, obj.Name) +func (obj *NoopType) Apply() bool { + fmt.Printf("Apply->Noop[%v]\n", obj.Name) + return true +} + +func (obj *NoopType) Compare(typ Type) bool { + switch typ.(type) { + // we can only compare NoopType to others of the same type + case *NoopType: + return obj.compare(typ.(*NoopType)) + default: + return false + } +} + +func (obj *NoopType) compare(typ *NoopType) bool { + if obj.Name != typ.Name { + return false + } return true }