diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index d6260fb3..549c710e 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -32,6 +32,7 @@ along with this program. If not, see . 3. [Setup - Getting started with mgmt](#setup) 4. [Features - All things mgmt can do](#features) * [Autoedges - Automatic resource relationships](#autoedges) + * [Autogrouping - Automatic resource grouping](#autogrouping) 5. [Usage/FAQ - Notes on usage and frequently asked questions](#usage-and-frequently-asked-questions) 6. [Reference - Detailed reference](#reference) * [Graph definition file](#graph-definition-file) @@ -94,8 +95,22 @@ order to handle this situation you can disable autoedges per resource and explicitly declare that you want `my.cnf` to be written to disk before the installation of the `mysql-server` package. -You can disable autoedges for a resource by setting the `autoedge` key for -the meta attributes of a resource to `false`. +You can disable autoedges for a resource by setting the `autoedge` key on +the meta attributes of that resource to `false`. + +###Autogrouping + +Automatic grouping or AutoGroup is the mechanism in mgmt by which it will +automatically group multiple resource vertices into a single one. This is +particularly useful for grouping multiple package resources into a single +resource, since the multiple installations can happen together in a single +transaction, which saves a lot of time because package resources typically have +a large fixed cost to running (downloading and verifying the package repo) and +if they are grouped they share this fixed cost. This grouping feature can be +used for other use cases too. + +You can disable autogrouping for a resource by setting the `autogroup` key on +the meta attributes of that resource to `false`. ##Usage and frequently asked questions (Send your questions as a patch to this FAQ! I'll review it, merge it, and diff --git a/config.go b/config.go index 3465fe7d..fe42b785 100644 --- a/config.go +++ b/config.go @@ -23,16 +23,17 @@ import ( "gopkg.in/yaml.v2" "io/ioutil" "log" + "reflect" "strings" ) type collectorResConfig struct { - Res string `yaml:"res"` + Kind string `yaml:"kind"` Pattern string `yaml:"pattern"` // XXX: Not Implemented } type vertexConfig struct { - Res string `yaml:"res"` + Kind string `yaml:"kind"` Name string `yaml:"name"` } @@ -82,105 +83,79 @@ func ParseConfigFromFile(filename string) *GraphConfig { return &config } -// XXX: we need to fix this function so that it either fails without modifying -// the graph, passes successfully and modifies it, or basically panics i guess -// this way an invalid compilation can leave the old graph running, and we we -// don't modify a partial graph. so we really need to validate, and then perform -// whatever actions are necessary -// finding some way to do this on a copy of the graph, and then do a graph diff -// and merge the new data into the old graph would be more appropriate, in -// particular if we can ensure the graph merge can't fail. As for the putting -// of stuff into etcd, we should probably store the operations to complete in -// the new graph, and keep retrying until it succeeds, thus blocking any new -// etcd operations until that time. -func UpdateGraphFromConfig(config *GraphConfig, hostname string, g *Graph, etcdO *EtcdWObject) bool { +// NewGraphFromConfig returns a new graph from existing input, such as from the +// existing graph, and a GraphConfig struct. +func (g *Graph) NewGraphFromConfig(config *GraphConfig, etcdO *EtcdWObject, hostname string) (*Graph, error) { - var NoopMap = make(map[string]*Vertex) - var PkgMap = make(map[string]*Vertex) - var FileMap = make(map[string]*Vertex) - var SvcMap = make(map[string]*Vertex) - var ExecMap = make(map[string]*Vertex) + var graph *Graph // new graph to return + if g == nil { // FIXME: how can we check for an empty graph? + graph = NewGraph("Graph") // give graph a default name + } else { + graph = g.Copy() // same vertices, since they're pointers! + } var lookup = make(map[string]map[string]*Vertex) - lookup["noop"] = NoopMap - lookup["pkg"] = PkgMap - lookup["file"] = FileMap - lookup["svc"] = SvcMap - lookup["exec"] = ExecMap //log.Printf("%+v", config) // debug - g.SetName(config.Graph) // set graph name + // TODO: if defined (somehow)... + graph.SetName(config.Graph) // set graph name var keep []*Vertex // list of vertex which are the same in new graph - for _, obj := range config.Resources.Noop { - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge + // use reflection to avoid duplicating code... better options welcome! + value := reflect.Indirect(reflect.ValueOf(config.Resources)) + vtype := value.Type() + for i := 0; i < vtype.NumField(); i++ { // number of fields in struct + name := vtype.Field(i).Name // string of field name + field := value.FieldByName(name) + iface := field.Interface() // interface type of value + slice := reflect.ValueOf(iface) + // XXX: should we just drop these everywhere and have the kind strings be all lowercase? + kind := FirstToUpper(name) + if DEBUG { + log.Printf("Config: Processing: %v...", kind) } - NoopMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append - } - - for _, obj := range config.Resources.Pkg { - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge - } - PkgMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append - } - - for _, obj := range config.Resources.File { - // XXX: should we export based on a @@ prefix, or a metaparam - // like exported => true || exported => (host pattern)||(other pattern?) - if strings.HasPrefix(obj.Name, "@@") { // exported resource - // add to etcd storage... - obj.Name = obj.Name[2:] //slice off @@ - if !etcdO.EtcdPut(hostname, obj.Name, "file", obj) { - log.Printf("Problem exporting file resource %v.", obj.Name) - continue + for j := 0; j < slice.Len(); j++ { // loop through resources of same kind + x := slice.Index(j).Interface() + obj, ok := x.(Res) // convert to Res type + if !ok { + return nil, fmt.Errorf("Error: Config: Can't convert: %v of type: %T to Res.", x, x) } - } else { - // XXX: we don't have a way of knowing if any of the - // metaparams are undefined, and as a result to set the - // defaults that we want! I hate the go yaml parser!!! - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge + + if _, exists := lookup[kind]; !exists { + lookup[kind] = make(map[string]*Vertex) } - FileMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append - } - } + // XXX: should we export based on a @@ prefix, or a metaparam + // like exported => true || exported => (host pattern)||(other pattern?) + if !strings.HasPrefix(obj.GetName(), "@@") { // exported resource + // XXX: we don't have a way of knowing if any of the + // metaparams are undefined, and as a result to set the + // defaults that we want! I hate the go yaml parser!!! + v := graph.GetVertexMatch(obj) + if v == nil { // no match found + obj.Init() + v = NewVertex(obj) + graph.AddVertex(v) // call standalone in case not part of an edge + } + lookup[kind][obj.GetName()] = v // used for constructing edges + keep = append(keep, v) // append - for _, obj := range config.Resources.Svc { - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge - } - SvcMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append - } + } else { + // XXX: do this in a different function... + // add to etcd storage... + obj.SetName(obj.GetName()[2:]) //slice off @@ - for _, obj := range config.Resources.Exec { - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge + data, err := ResToB64(obj) + if err != nil { + return nil, fmt.Errorf("Config: Could not encode %v resource: %v, error: %v", kind, obj.GetName(), err) + } + + if !etcdO.EtcdPut(hostname, obj.GetName(), kind, data) { + return nil, fmt.Errorf("Config: Could not export %v resource: %v", kind, obj.GetName()) + } + } } - ExecMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append } // lookup from etcd graph @@ -189,100 +164,71 @@ func UpdateGraphFromConfig(config *GraphConfig, hostname string, g *Graph, etcdO nodes, ok := etcdO.EtcdGet() if ok { for _, t := range config.Collector { - // XXX: use t.Res and optionally t.Pattern to collect from etcd storage - log.Printf("Collect: %v; Pattern: %v", t.Res, t.Pattern) + // XXX: should we just drop these everywhere and have the kind strings be all lowercase? + kind := FirstToUpper(t.Kind) - for _, x := range etcdO.EtcdGetProcess(nodes, "file") { - var obj *FileRes - if B64ToObj(x, &obj) != true { - log.Printf("Collect: File: %v not collected!", x) + // use t.Kind and optionally t.Pattern to collect from etcd storage + log.Printf("Collect: %v; Pattern: %v", kind, t.Pattern) + for _, str := range etcdO.EtcdGetProcess(nodes, kind) { + obj, err := B64ToRes(str) + if err != nil { + log.Printf("B64ToRes failed to decode: %v", err) + log.Printf("Collect: %v: not collected!", kind) continue } - if t.Pattern != "" { // XXX: currently the pattern for files can only override the Dirname variable :P - obj.Dirname = t.Pattern + + if t.Pattern != "" { // XXX: simplistic for now + obj.CollectPattern(t.Pattern) // obj.Dirname = t.Pattern } - log.Printf("Collect: File: %v collected!", obj.GetName()) + log.Printf("Collect: %v[%v]: collected!", kind, obj.GetName()) - // XXX: similar to file add code: - v := g.GetVertexMatch(obj) + // XXX: similar to other resource add code: + if _, exists := lookup[kind]; !exists { + lookup[kind] = make(map[string]*Vertex) + } + v := graph.GetVertexMatch(obj) if v == nil { // no match found obj.Init() // initialize go channels or things won't work!!! v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge + graph.AddVertex(v) // call standalone in case not part of an edge } - FileMap[obj.GetName()] = v // used for constructing edges - keep = append(keep, v) // append - + lookup[kind][obj.GetName()] = v // used for constructing edges + keep = append(keep, v) // append } } } // get rid of any vertices we shouldn't "keep" (that aren't in new graph) - for _, v := range g.GetVertices() { - if !HasVertex(v, keep) { + for _, v := range graph.GetVertices() { + if !VertexContains(v, keep) { // wait for exit before starting new graph! - v.Res.SendEvent(eventExit, true, false) - g.DeleteVertex(v) + v.SendEvent(eventExit, true, false) + graph.DeleteVertex(v) } } for _, e := range config.Edges { - if _, ok := lookup[e.From.Res]; !ok { - return false + if _, ok := lookup[FirstToUpper(e.From.Kind)]; !ok { + return nil, fmt.Errorf("Can't find 'from' resource!") } - if _, ok := lookup[e.To.Res]; !ok { - return false + if _, ok := lookup[FirstToUpper(e.To.Kind)]; !ok { + return nil, fmt.Errorf("Can't find 'to' resource!") } - if _, ok := lookup[e.From.Res][e.From.Name]; !ok { - return false + if _, ok := lookup[FirstToUpper(e.From.Kind)][e.From.Name]; !ok { + return nil, fmt.Errorf("Can't find 'from' name!") } - if _, ok := lookup[e.To.Res][e.To.Name]; !ok { - return false + if _, ok := lookup[FirstToUpper(e.To.Kind)][e.To.Name]; !ok { + return nil, fmt.Errorf("Can't find 'to' name!") } - g.AddEdge(lookup[e.From.Res][e.From.Name], lookup[e.To.Res][e.To.Name], NewEdge(e.Name)) + graph.AddEdge(lookup[FirstToUpper(e.From.Kind)][e.From.Name], lookup[FirstToUpper(e.To.Kind)][e.To.Name], NewEdge(e.Name)) } - // add auto edges - log.Println("Compile: Adding AutoEdges...") - for _, v := range g.GetVertices() { // for each vertexes autoedges - if !v.GetMeta().AutoEdge { // is the metaparam true? - continue - } - autoEdgeObj := v.AutoEdges() - if autoEdgeObj == nil { - log.Printf("%v[%v]: Config: No auto edges were found!", v.Kind(), v.GetName()) - continue // next vertex - } - - for { // while the autoEdgeObj has more uuids to add... - uuids := autoEdgeObj.Next() // get some! - if uuids == nil { - log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName()) - break // inner loop - } - if DEBUG { - log.Println("Compile: AutoEdge: UUIDS:") - for i, u := range uuids { - log.Printf("Compile: AutoEdge: UUID%d: %v", i, u) - } - } - - // match and add edges - result := g.AddEdgesByMatchingUUIDS(v, uuids) - - // report back, and find out if we should continue - if !autoEdgeObj.Test(result) { - break - } - } - } - - return true + return graph, nil } // add edges to the vertex in a graph based on if it matches a uuid list -func (g *Graph) AddEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool { +func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool { // search for edges and see what matches! var result []bool @@ -316,9 +262,251 @@ func (g *Graph) AddEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool { break } } - result = append(result, found) } - return result } + +// add auto edges to graph +func (g *Graph) AutoEdges() { + log.Println("Compile: Adding AutoEdges...") + for _, v := range g.GetVertices() { // for each vertexes autoedges + if !v.GetMeta().AutoEdge { // is the metaparam true? + continue + } + autoEdgeObj := v.AutoEdges() + if autoEdgeObj == nil { + log.Printf("%v[%v]: Config: No auto edges were found!", v.Kind(), v.GetName()) + continue // next vertex + } + + for { // while the autoEdgeObj has more uuids to add... + uuids := autoEdgeObj.Next() // get some! + if uuids == nil { + log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName()) + break // inner loop + } + if DEBUG { + log.Println("Compile: AutoEdge: UUIDS:") + for i, u := range uuids { + log.Printf("Compile: AutoEdge: UUID%d: %v", i, u) + } + } + + // match and add edges + result := g.addEdgesByMatchingUUIDS(v, uuids) + + // report back, and find out if we should continue + if !autoEdgeObj.Test(result) { + break + } + } + } +} + +// AutoGrouper is the required interface to implement for an autogroup algorithm +type AutoGrouper interface { + // listed in the order these are typically called in... + name() string // friendly identifier + init(*Graph) error // only call once + vertexNext() (*Vertex, *Vertex, error) // mostly algorithmic + vertexCmp(*Vertex, *Vertex) error // can we merge these ? + vertexMerge(*Vertex, *Vertex) (*Vertex, error) // vertex merge fn to use + edgeMerge(*Edge, *Edge) *Edge // edge merge fn to use + vertexTest(bool) (bool, error) // call until false +} + +// baseGrouper is the base type for implementing the AutoGrouper interface +type baseGrouper struct { + graph *Graph // store a pointer to the graph + vertices []*Vertex // cached list of vertices + i int + j int + done bool +} + +// name provides a friendly name for the logs to see +func (ag *baseGrouper) name() string { + return "baseGrouper" +} + +// init is called only once and before using other AutoGrouper interface methods +// the name method is the only exception: call it any time without side effects! +func (ag *baseGrouper) init(g *Graph) error { + if ag.graph != nil { + return fmt.Errorf("The init method has already been called!") + } + ag.graph = g // pointer + ag.vertices = ag.graph.GetVertices() // cache + ag.i = 0 + ag.j = 0 + if len(ag.vertices) == 0 { // empty graph + ag.done = true + return nil + } + return nil +} + +// vertexNext is a simple iterator that loops through vertex (pair) combinations +// an intelligent algorithm would selectively offer only valid pairs of vertices +// these should satisfy logical grouping requirements for the autogroup designs! +// the desired algorithms can override, but keep this method as a base iterator! +func (ag *baseGrouper) vertexNext() (v1, v2 *Vertex, err error) { + // this does a for v... { for w... { return v, w }} but stepwise! + l := len(ag.vertices) + if ag.i < l { + v1 = ag.vertices[ag.i] + } + if ag.j < l { + v2 = ag.vertices[ag.j] + } + + // in case the vertex was deleted + if !ag.graph.HasVertex(v1) { + v1 = nil + } + if !ag.graph.HasVertex(v2) { + v2 = nil + } + + // two nested loops... + if ag.j < l { + ag.j++ + } + if ag.j == l { + ag.j = 0 + if ag.i < l { + ag.i++ + } + if ag.i == l { + ag.done = true + } + } + + return +} + +func (ag *baseGrouper) vertexCmp(v1, v2 *Vertex) error { + if v1 == nil || v2 == nil { + return fmt.Errorf("Vertex is nil!") + } + if v1 == v2 { // skip yourself + return fmt.Errorf("Vertices are the same!") + } + if v1.Kind() != v2.Kind() { // we must group similar kinds + // TODO: maybe future resources won't need this limitation? + return fmt.Errorf("The two resources aren't the same kind!") + } + // someone doesn't want to group! + if !v1.GetMeta().AutoGroup || !v2.GetMeta().AutoGroup { + return fmt.Errorf("One of the autogroup flags is false!") + } + if v1.Res.IsGrouped() { // already grouped! + return fmt.Errorf("Already grouped!") + } + if len(v2.Res.GetGroup()) > 0 { // already has children grouped! + return fmt.Errorf("Already has groups!") + } + if !v1.Res.GroupCmp(v2.Res) { // resource groupcmp failed! + return fmt.Errorf("The GroupCmp failed!") + } + return nil // success +} + +func (ag *baseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) { + // NOTE: it's important to use w.Res instead of w, b/c + // the w by itself is the *Vertex obj, not the *Res obj + // which is contained within it! They both satisfy the + // Res interface, which is why both will compile! :( + err = v1.Res.GroupRes(v2.Res) // GroupRes skips stupid groupings + return // success or fail, and no need to merge the actual vertices! +} + +func (ag *baseGrouper) edgeMerge(e1, e2 *Edge) *Edge { + return e1 // noop +} + +// vertexTest processes the results of the grouping for the algorithm to know +// return an error if something went horribly wrong, and bool false to stop +func (ag *baseGrouper) vertexTest(b bool) (bool, error) { + // NOTE: this particular baseGrouper version doesn't track what happens + // because since we iterate over every pair, we don't care which merge! + if ag.done { + return false, nil + } + return true, nil +} + +type algorithmNameGrouper struct { // XXX rename me! + baseGrouper // "inherit" what we want, and reimplement the rest +} + +func (ag *algorithmNameGrouper) name() string { + log.Fatal("Not implemented!") // XXX + return "algorithmNameGrouper" +} + +func (ag *algorithmNameGrouper) vertexNext() (v1, v2 *Vertex, err error) { + log.Fatal("Not implemented!") // XXX + // NOTE: you can even build this like this: + //v1, v2, err = ag.baseGrouper.vertexNext() // get all iterable pairs + // ... + //ag.baseGrouper.vertexTest(...) + //return + return nil, nil, fmt.Errorf("Not implemented!") +} + +// autoGroup is the mechanical auto group "runner" that runs the interface spec +func (g *Graph) autoGroup(ag AutoGrouper) chan string { + strch := make(chan string) // output log messages here + go func(strch chan string) { + strch <- fmt.Sprintf("Compile: Grouping: Algorithm: %v...", ag.name()) + if err := ag.init(g); err != nil { + log.Fatalf("Error running autoGroup(init): %v", err) + } + + for { + var v, w *Vertex + v, w, err := ag.vertexNext() // get pair to compare + if err != nil { + log.Fatalf("Error running autoGroup(vertexNext): %v", err) + } + merged := false + // save names since they change during the runs + vStr := fmt.Sprintf("%s", v) // valid even if it is nil + wStr := fmt.Sprintf("%s", w) + + if err := ag.vertexCmp(v, w); err != nil { // cmp ? + strch <- fmt.Sprintf("Compile: Grouping: !GroupCmp for: %s into %s", wStr, vStr) + + // remove grouped vertex and merge edges (res is safe) + } else if err := g.VertexMerge(v, w, ag.vertexMerge, ag.edgeMerge); err != nil { // merge... + strch <- fmt.Sprintf("Compile: Grouping: !VertexMerge for: %s into %s", wStr, vStr) + + } else { // success! + strch <- fmt.Sprintf("Compile: Grouping: Success for: %s into %s", wStr, vStr) + merged = true // woo + } + + // did these get used? + if ok, err := ag.vertexTest(merged); err != nil { + log.Fatalf("Error running autoGroup(vertexTest): %v", err) + } else if !ok { + break // done! + } + } + + close(strch) + return + }(strch) // call function + return strch +} + +// AutoGroup runs the auto grouping on the graph and prints out log messages +func (g *Graph) AutoGroup() { + // receive log messages from channel... + // this allows test cases to avoid printing them when they're unwanted! + for str := range g.autoGroup(&baseGrouper{}) { + log.Println(str) + } +} diff --git a/configwatch.go b/configwatch.go index 307498ce..804005ce 100644 --- a/configwatch.go +++ b/configwatch.go @@ -138,7 +138,7 @@ func ConfigWatch(file string) chan bool { } case err := <-watcher.Errors: - log.Println("error:", err) + log.Printf("error: %v", err) log.Fatal(err) } diff --git a/etcd.go b/etcd.go index b929fce0..2e7f347d 100644 --- a/etcd.go +++ b/etcd.go @@ -207,20 +207,14 @@ func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg { } // helper function to store our data in etcd -func (etcdO *EtcdWObject) EtcdPut(hostname, key, res string, obj interface{}) bool { +func (etcdO *EtcdWObject) EtcdPut(hostname, key, res string, data string) bool { kapi := etcdO.GetKAPI() - output, ok := ObjToB64(obj) - if !ok { - log.Printf("Etcd: Could not encode %v key.", key) - return false - } - path := fmt.Sprintf("/exported/%s/resources/%s/res", hostname, key) _, err := kapi.Set(etcd_context.Background(), path, res, nil) // XXX validate... path = fmt.Sprintf("/exported/%s/resources/%s/value", hostname, key) - resp, err := kapi.Set(etcd_context.Background(), path, output, nil) + resp, err := kapi.Set(etcd_context.Background(), path, data, nil) if err != nil { if cerr, ok := err.(*etcd.ClusterError); ok { // not running or disconnected diff --git a/examples/etcd1a.yaml b/examples/etcd1a.yaml index 6dd73936..46378d3c 100644 --- a/examples/etcd1a.yaml +++ b/examples/etcd1a.yaml @@ -13,6 +13,6 @@ resources: i am f2, exported from host A state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtA/" edges: [] diff --git a/examples/etcd1b.yaml b/examples/etcd1b.yaml index c232acb4..ae491180 100644 --- a/examples/etcd1b.yaml +++ b/examples/etcd1b.yaml @@ -13,6 +13,6 @@ resources: i am f2, exported from host B state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtB/" edges: [] diff --git a/examples/etcd1c.yaml b/examples/etcd1c.yaml index 80184451..2b9bd8cc 100644 --- a/examples/etcd1c.yaml +++ b/examples/etcd1c.yaml @@ -13,6 +13,6 @@ resources: i am f2, exported from host C state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtC/" edges: [] diff --git a/examples/exec1.yaml b/examples/exec1.yaml index ab2f6227..adb0bfd6 100644 --- a/examples/exec1.yaml +++ b/examples/exec1.yaml @@ -45,15 +45,15 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 - name: e2 from: - res: exec + kind: exec name: exec2 to: - res: exec + kind: exec name: exec3 diff --git a/examples/exec1a.yaml b/examples/exec1a.yaml index f9d3c1f1..4896429e 100644 --- a/examples/exec1a.yaml +++ b/examples/exec1a.yaml @@ -25,8 +25,8 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 diff --git a/examples/exec1b.yaml b/examples/exec1b.yaml index 34a0f943..04f8e002 100644 --- a/examples/exec1b.yaml +++ b/examples/exec1b.yaml @@ -25,8 +25,8 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 diff --git a/examples/exec1c.yaml b/examples/exec1c.yaml index 59992a06..942dcd42 100644 --- a/examples/exec1c.yaml +++ b/examples/exec1c.yaml @@ -25,8 +25,8 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 diff --git a/examples/exec2.yaml b/examples/exec2.yaml index 567988b7..bedec63d 100644 --- a/examples/exec2.yaml +++ b/examples/exec2.yaml @@ -55,29 +55,29 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 - name: e2 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec3 - name: e3 from: - res: exec + kind: exec name: exec2 to: - res: exec + kind: exec name: exec4 - name: e4 from: - res: exec + kind: exec name: exec3 to: - res: exec + kind: exec name: exec4 diff --git a/examples/file1.yaml b/examples/file1.yaml index 1dd94c9a..f69394ee 100644 --- a/examples/file1.yaml +++ b/examples/file1.yaml @@ -27,15 +27,15 @@ resources: edges: - name: e1 from: - res: file + kind: file name: file1 to: - res: file + kind: file name: file2 - name: e2 from: - res: file + kind: file name: file2 to: - res: file + kind: file name: file3 diff --git a/examples/graph0.yaml b/examples/graph0.yaml index 07acecc6..231d0051 100644 --- a/examples/graph0.yaml +++ b/examples/graph0.yaml @@ -13,8 +13,8 @@ resources: edges: - name: e1 from: - res: noop + kind: noop name: noop1 to: - res: file + kind: file name: file1 diff --git a/examples/graph10.yaml b/examples/graph10.yaml index 41a6cac8..fc7c7dfd 100644 --- a/examples/graph10.yaml +++ b/examples/graph10.yaml @@ -86,43 +86,43 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec4 - name: e2 from: - res: exec + kind: exec name: exec2 to: - res: exec + kind: exec name: exec4 - name: e3 from: - res: exec + kind: exec name: exec3 to: - res: exec + kind: exec name: exec4 - name: e4 from: - res: exec + kind: exec name: exec4 to: - res: exec + kind: exec name: exec5 - name: e5 from: - res: exec + kind: exec name: exec4 to: - res: exec + kind: exec name: exec6 - name: e6 from: - res: exec + kind: exec name: exec4 to: - res: exec + kind: exec name: exec7 diff --git a/examples/graph1a.yaml b/examples/graph1a.yaml index b17302f9..20421b35 100644 --- a/examples/graph1a.yaml +++ b/examples/graph1a.yaml @@ -15,8 +15,8 @@ resources: edges: - name: e1 from: - res: file + kind: file name: file1 to: - res: file + kind: file name: file2 diff --git a/examples/graph1b.yaml b/examples/graph1b.yaml index 4dbd872d..efedc800 100644 --- a/examples/graph1b.yaml +++ b/examples/graph1b.yaml @@ -15,8 +15,8 @@ resources: edges: - name: e2 from: - res: file + kind: file name: file2 to: - res: file + kind: file name: file3 diff --git a/examples/graph3a.yaml b/examples/graph3a.yaml index 4e0906ca..4f32f24b 100644 --- a/examples/graph3a.yaml +++ b/examples/graph3a.yaml @@ -23,6 +23,6 @@ resources: i am f4, exported from host A state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtA/" edges: [] diff --git a/examples/graph3b.yaml b/examples/graph3b.yaml index d7a670f9..f3616497 100644 --- a/examples/graph3b.yaml +++ b/examples/graph3b.yaml @@ -23,6 +23,6 @@ resources: i am f4, exported from host B state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtB/" edges: [] diff --git a/examples/graph3c.yaml b/examples/graph3c.yaml index db60ffee..82050ab7 100644 --- a/examples/graph3c.yaml +++ b/examples/graph3c.yaml @@ -23,6 +23,6 @@ resources: i am f4, exported from host C state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtC/" edges: [] diff --git a/examples/graph4.yaml b/examples/graph4.yaml index 941a2c65..d147cc3c 100644 --- a/examples/graph4.yaml +++ b/examples/graph4.yaml @@ -13,6 +13,6 @@ resources: i am f3, exported from host A state: exists collect: -- res: file +- kind: file pattern: '' edges: diff --git a/examples/graph5.yaml b/examples/graph5.yaml index 736e4dde..5cec004d 100644 --- a/examples/graph5.yaml +++ b/examples/graph5.yaml @@ -8,6 +8,6 @@ resources: i am f1 state: exists collect: -- res: file +- kind: file pattern: '' edges: diff --git a/examples/graph9.yaml b/examples/graph9.yaml index 9a4dc6be..0e195493 100644 --- a/examples/graph9.yaml +++ b/examples/graph9.yaml @@ -56,22 +56,22 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec5 - name: e2 from: - res: exec + kind: exec name: exec2 to: - res: exec + kind: exec name: exec5 - name: e3 from: - res: exec + kind: exec name: exec3 to: - res: exec + kind: exec name: exec5 diff --git a/examples/svc1.yaml b/examples/svc1.yaml index 9fdd6ced..11e2a733 100644 --- a/examples/svc1.yaml +++ b/examples/svc1.yaml @@ -16,15 +16,15 @@ resources: edges: - name: e1 from: - res: noop + kind: noop name: noop1 to: - res: file + kind: file name: file1 - name: e2 from: - res: file + kind: file name: file1 to: - res: svc + kind: svc name: purpleidea diff --git a/exec.go b/exec.go index 368edbba..2f7c1528 100644 --- a/exec.go +++ b/exec.go @@ -20,12 +20,17 @@ package main import ( "bufio" "bytes" + "encoding/gob" "errors" "log" "os/exec" "strings" ) +func init() { + gob.Register(&ExecRes{}) +} + type ExecRes struct { BaseRes `yaml:",inline"` State string `yaml:"state"` // state: exists/present?, absent, (undefined?) @@ -97,7 +102,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan } // Exec watcher -func (obj *ExecRes) Watch() { +func (obj *ExecRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -187,8 +192,8 @@ func (obj *ExecRes) Watch() { if send { send = false // it is okay to invalidate the clean state on poke too - obj.isStateOK = false // something made state dirty - Process(obj) // XXX: rename this function + obj.isStateOK = false // something made state dirty + processChan <- struct{}{} // trigger process } } } diff --git a/file.go b/file.go index d56a3e8d..bb8c015b 100644 --- a/file.go +++ b/file.go @@ -22,6 +22,7 @@ import ( "encoding/hex" "gopkg.in/fsnotify.v1" //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" + "encoding/gob" "io" "log" "math" @@ -31,6 +32,10 @@ import ( "syscall" ) +func init() { + gob.Register(&FileRes{}) +} + type FileRes struct { BaseRes `yaml:",inline"` Path string `yaml:"path"` // path variable (should default to name) @@ -97,7 +102,7 @@ func (obj *FileRes) Validate() bool { // File watcher for files and directories // Modify with caution, probably important to write some test cases first! // obj.GetPath(): file or directory -func (obj *FileRes) Watch() { +func (obj *FileRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -230,7 +235,7 @@ func (obj *FileRes) Watch() { case err := <-watcher.Errors: obj.SetConvergedState(resConvergedNil) // XXX ? - log.Println("error:", err) + log.Printf("error: %v", err) log.Fatal(err) //obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors? @@ -255,7 +260,7 @@ func (obj *FileRes) Watch() { dirty = false obj.isStateOK = false // something made state dirty } - Process(obj) // XXX: rename this function + processChan <- struct{}{} // trigger process } } } @@ -488,3 +493,8 @@ func (obj *FileRes) Compare(res Res) bool { } return true } + +func (obj *FileRes) CollectPattern(pattern string) { + // XXX: currently the pattern for files can only override the Dirname variable :P + obj.Dirname = pattern // XXX: simplistic for now +} diff --git a/main.go b/main.go index fc06b05e..ec9eb03d 100644 --- a/main.go +++ b/main.go @@ -63,7 +63,7 @@ func run(c *cli.Context) { converged := make(chan bool) // converged signal log.Printf("This is: %v, version: %v", program, version) log.Printf("Main: Start: %v", start) - G := NewGraph("Graph") // give graph a default name + var G, fullGraph *Graph // exit after `max-runtime` seconds for no reason at all... if i := c.Int("max-runtime"); i > 0 { @@ -102,10 +102,11 @@ func run(c *cli.Context) { if !c.Bool("no-watch") { configchan = ConfigWatch(file) } - log.Printf("Etcd: Starting...") + log.Println("Etcd: Starting...") etcdchan := etcdO.EtcdWatch() first := true // first loop or not for { + log.Println("Main: Waiting...") select { case _ = <-startchan: // kick the loop once at start // pass @@ -134,17 +135,29 @@ func run(c *cli.Context) { } // run graph vertex LOCK... - if !first { // XXX: we can flatten this check out I think - log.Printf("State: %v -> %v", G.SetState(graphPausing), G.GetState()) + if !first { // TODO: we can flatten this check out I think G.Pause() // sync - log.Printf("State: %v -> %v", G.SetState(graphPaused), G.GetState()) } - // build the graph from a config file - // build the graph on events (eg: from etcd) - if !UpdateGraphFromConfig(config, hostname, G, etcdO) { - log.Fatal("Config: We borked the graph.") // XXX + // build graph from yaml file on events (eg: from etcd) + // we need the vertices to be paused to work on them + if newFullgraph, err := fullGraph.NewGraphFromConfig(config, etcdO, hostname); err == nil { // keep references to all original elements + fullGraph = newFullgraph + } else { + log.Printf("Config: Error making new graph from config: %v", err) + // unpause! + if !first { + G.Start(&wg, first) // sync + } + continue } + + G = fullGraph.Copy() // copy to active graph + // XXX: do etcd transaction out here... + G.AutoEdges() // add autoedges; modifies the graph + //G.AutoGroup() // run autogroup; modifies the graph // TODO + // TODO: do we want to do a transitive reduction? + log.Printf("Graph: %v", G) // show graph err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz")) if err != nil { @@ -159,9 +172,7 @@ func run(c *cli.Context) { // some are not ready yet and the EtcdWatch // loops, we'll cause G.Pause(...) before we // even got going, thus causing nil pointer errors - log.Printf("State: %v -> %v", G.SetState(graphStarting), G.GetState()) G.Start(&wg, first) // sync - log.Printf("State: %v -> %v", G.SetState(graphStarted), G.GetState()) first = false } }() diff --git a/misc.go b/misc.go index 8962f7b5..1f94ec6d 100644 --- a/misc.go +++ b/misc.go @@ -18,9 +18,6 @@ package main import ( - "bytes" - "encoding/base64" - "encoding/gob" "github.com/godbus/dbus" "path" "sort" @@ -28,6 +25,11 @@ import ( "time" ) +// returns the string with the first character capitalized +func FirstToUpper(str string) string { + return strings.ToUpper(str[0:1]) + str[1:] +} + // return true if a string exists inside a list, otherwise false func StrInList(needle string, haystack []string) bool { for _, x := range haystack { @@ -136,6 +138,9 @@ func Dirname(p string) string { func Basename(p string) string { _, b := path.Split(path.Clean(p)) + if p == "" { + return "" + } if p[len(p)-1:] == "/" { // don't loose the tail slash b += "/" } @@ -265,36 +270,6 @@ func DirifyFileList(fileList []string, removeDirs bool) []string { return result } -// encode an object as base 64, serialize and then base64 encode -func ObjToB64(obj interface{}) (string, bool) { - b := bytes.Buffer{} - e := gob.NewEncoder(&b) - err := e.Encode(obj) - if err != nil { - //log.Println("Gob failed to Encode: ", err) - return "", false - } - return base64.StdEncoding.EncodeToString(b.Bytes()), true -} - -// TODO: is it possible to somehow generically just return the obj? -// decode an object into the waiting obj which you pass a reference to -func B64ToObj(str string, obj interface{}) bool { - bb, err := base64.StdEncoding.DecodeString(str) - if err != nil { - //log.Println("Base64 failed to Decode: ", err) - return false - } - b := bytes.NewBuffer(bb) - d := gob.NewDecoder(b) - err = d.Decode(obj) - if err != nil { - //log.Println("Gob failed to Decode: ", err) - return false - } - return true -} - // special version of time.After that blocks when given a negative integer // when used in a case statement, the timer restarts on each select call to it func TimeAfterOrBlock(t int) <-chan time.Time { diff --git a/misc_test.go b/misc_test.go index 990e643b..ab484193 100644 --- a/misc_test.go +++ b/misc_test.go @@ -18,7 +18,6 @@ package main import ( - "fmt" "reflect" "sort" "testing" @@ -58,6 +57,9 @@ func TestMiscT1(t *testing.T) { t.Errorf("Result is incorrect.") } + if Basename("") != "" { // TODO: should this equal something different? + t.Errorf("Result is incorrect.") + } } func TestMiscT2(t *testing.T) { @@ -169,57 +171,6 @@ func TestMiscT5(t *testing.T) { } } -func TestMiscT6(t *testing.T) { - - type foo struct { - Name string `yaml:"name"` - Res string `yaml:"res"` - Value int `yaml:"value"` - } - - obj := foo{"dude", "sweet", 42} - output, ok := ObjToB64(obj) - if ok != true { - t.Errorf("First result should be true.") - } - var data foo - if B64ToObj(output, &data) != true { - t.Errorf("Second result should be true.") - } - // TODO: there is probably a better way to compare these two... - if fmt.Sprintf("%+v\n", obj) != fmt.Sprintf("%+v\n", data) { - t.Errorf("Strings should match.") - } -} - -func TestMiscT7(t *testing.T) { - - type Foo struct { - Name string `yaml:"name"` - Res string `yaml:"res"` - Value int `yaml:"value"` - } - - type bar struct { - Foo `yaml:",inline"` // anonymous struct must be public! - Comment string `yaml:"comment"` - } - - obj := bar{Foo{"dude", "sweet", 42}, "hello world"} - output, ok := ObjToB64(obj) - if ok != true { - t.Errorf("First result should be true.") - } - var data bar - if B64ToObj(output, &data) != true { - t.Errorf("Second result should be true.") - } - // TODO: there is probably a better way to compare these two... - if fmt.Sprintf("%+v\n", obj) != fmt.Sprintf("%+v\n", data) { - t.Errorf("Strings should match.") - } -} - func TestMiscT8(t *testing.T) { r0 := []string{"/"} diff --git a/noop.go b/noop.go index 27b452dc..591b36f2 100644 --- a/noop.go +++ b/noop.go @@ -18,9 +18,14 @@ package main import ( + "encoding/gob" "log" ) +func init() { + gob.Register(&NoopRes{}) +} + type NoopRes struct { BaseRes `yaml:",inline"` Comment string `yaml:"comment"` // extra field for example purposes @@ -48,7 +53,7 @@ func (obj *NoopRes) Validate() bool { return true } -func (obj *NoopRes) Watch() { +func (obj *NoopRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -79,7 +84,7 @@ func (obj *NoopRes) Watch() { send = false // only do this on certain types of events //obj.isStateOK = false // something made state dirty - Process(obj) // XXX: rename this function + processChan <- struct{}{} // trigger process } } } diff --git a/pgraph.go b/pgraph.go index 912b614c..bb23ecb9 100644 --- a/pgraph.go +++ b/pgraph.go @@ -35,11 +35,11 @@ import ( type graphState int const ( - graphNil graphState = iota - graphStarting - graphStarted - graphPausing - graphPaused + graphStateNil graphState = iota + graphStateStarting + graphStateStarted + graphStatePausing + graphStatePaused ) // The graph abstract data type (ADT) is defined as follows: @@ -55,9 +55,8 @@ type Graph struct { } type Vertex struct { - graph *Graph // store a pointer to the graph it's on - Res // anonymous field - data map[string]string // XXX: currently unused i think, remove? + Res // anonymous field + timestamp int64 // last updated timestamp ? } type Edge struct { @@ -68,7 +67,7 @@ func NewGraph(name string) *Graph { return &Graph{ Name: name, Adjacency: make(map[*Vertex]map[*Vertex]*Edge), - state: graphNil, + state: graphStateNil, } } @@ -84,6 +83,19 @@ func NewEdge(name string) *Edge { } } +// Copy makes a copy of the graph struct +func (g *Graph) Copy() *Graph { + newGraph := &Graph{ + Name: g.Name, + Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)), + state: g.state, + } + for k, v := range g.Adjacency { + newGraph.Adjacency[k] = v // copy + } + return newGraph +} + // returns the name of the graph func (g *Graph) GetName() string { return g.Name @@ -116,13 +128,12 @@ func (g *Graph) SetVertex() { } } -// add a new vertex to the graph -func (g *Graph) AddVertex(v *Vertex) { - if _, exists := g.Adjacency[v]; !exists { - g.Adjacency[v] = make(map[*Vertex]*Edge) - - // store a pointer to the graph it's on for convenience and readability - v.graph = g +// AddVertex uses variadic input to add all listed vertices to the graph +func (g *Graph) AddVertex(xv ...*Vertex) { + for _, v := range xv { + if _, exists := g.Adjacency[v]; !exists { + g.Adjacency[v] = make(map[*Vertex]*Edge) + } } } @@ -136,9 +147,9 @@ func (g *Graph) DeleteVertex(v *Vertex) { // adds a directed edge to the graph from v1 to v2 func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { // NOTE: this doesn't allow more than one edge between two vertexes... - // TODO: is this a problem? - g.AddVertex(v1) - g.AddVertex(v2) + g.AddVertex(v1, v2) // supports adding N vertices now + // TODO: check if an edge exists to avoid overwriting it! + // NOTE: VertexMerge() depends on overwriting it at the moment... g.Adjacency[v1][v2] = e } @@ -198,6 +209,11 @@ func (g *Graph) String() string { return fmt.Sprintf("Vertices(%d), Edges(%d)", g.NumVertices(), g.NumEdges()) } +// String returns the canonical form for a vertex +func (v *Vertex) String() string { + return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName()) +} + // output the graph in graphviz format // https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29 func (g *Graph) Graphviz() (out string) { @@ -281,7 +297,7 @@ func (g *Graph) ExecGraphviz(program, filename string) error { } // return an array (slice) of all directed vertices to vertex v (??? -> v) -// ostimestamp should use this +// OKTimestamp should use this func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { // TODO: we might be able to implement this differently by reversing // the Adjacency graph and then looping through it again... @@ -465,9 +481,105 @@ func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algori return L, true } -// return a pointer to the graph a vertex is on -func (v *Vertex) GetGraph() *Graph { - return v.graph +// Reachability finds the shortest path in a DAG from a to b, and returns the +// slice of vertices that matched this particular path including both a and b. +// It returns nil if a or b is nil, and returns empty list if no path is found. +// Since there could be more than one possible result for this operation, we +// arbitrarily choose one of the shortest possible. As a result, this should +// actually return a tree if we cared about correctness. +// This operates by a recursive algorithm; a more efficient version is likely. +// If you don't give this function a DAG, you might cause infinite recursion! +func (g *Graph) Reachability(a, b *Vertex) []*Vertex { + if a == nil || b == nil { + return nil + } + vertices := g.OutgoingGraphEdges(a) // what points away from a ? + if len(vertices) == 0 { + return []*Vertex{} // nope + } + if VertexContains(b, vertices) { + return []*Vertex{a, b} // found + } + // TODO: parallelize this with go routines? + var collected = make([][]*Vertex, len(vertices)) + pick := -1 + for i, v := range vertices { + collected[i] = g.Reachability(v, b) // find b by recursion + if l := len(collected[i]); l > 0 { + // pick shortest path + // TODO: technically i should return a tree + if pick < 0 || l < len(collected[pick]) { + pick = i + } + } + } + if pick < 0 { + return []*Vertex{} // nope + } + result := []*Vertex{a} // tack on a + result = append(result, collected[pick]...) + return result +} + +// VertexMerge merges v2 into v1 by reattaching the edges where appropriate, +// and then by deleting v2 from the graph. Since more than one edge between two +// vertices is not allowed, duplicate edges are merged as well. an edge merge +// function can be provided if you'd like to control how you merge the edges! +func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) (*Vertex, error), edgeMergeFn func(*Edge, *Edge) *Edge) error { + // methodology + // 1) edges between v1 and v2 are removed + //Loop: + for k1 := range g.Adjacency { + for k2 := range g.Adjacency[k1] { + // v1 -> v2 || v2 -> v1 + if (k1 == v1 && k2 == v2) || (k1 == v2 && k2 == v1) { + delete(g.Adjacency[k1], k2) // delete map & edge + // NOTE: if we assume this is a DAG, then we can + // assume only v1 -> v2 OR v2 -> v1 exists, and + // we can break out of these loops immediately! + //break Loop + break + } + } + } + + // 2) edges that point towards v2 from X now point to v1 from X (no dupes) + for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v) + e := g.Adjacency[x][v2] // previous edge + // merge e with ex := g.Adjacency[x][v1] if it exists! + if ex, exists := g.Adjacency[x][v1]; exists && edgeMergeFn != nil { + e = edgeMergeFn(e, ex) + } + g.AddEdge(x, v1, e) // overwrite edge + delete(g.Adjacency[x], v2) // delete old edge + } + + // 3) edges that point from v2 to X now point from v1 to X (no dupes) + for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???) + e := g.Adjacency[v2][x] // previous edge + // merge e with ex := g.Adjacency[v1][x] if it exists! + if ex, exists := g.Adjacency[v1][x]; exists && edgeMergeFn != nil { + e = edgeMergeFn(e, ex) + } + g.AddEdge(v1, x, e) // overwrite edge + delete(g.Adjacency[v2], x) + } + + // 4) merge and then remove the (now merged/grouped) vertex + if vertexMergeFn != nil { // run vertex merge function + if v, err := vertexMergeFn(v1, v2); err != nil { + return err + } else if v != nil { // replace v1 with the "merged" version... + v1 = v // XXX: will this replace v1 the way we want? + } + } + g.DeleteVertex(v2) // remove grouped vertex + + // 5) creation of a cyclic graph should throw an error + if _, dag := g.TopologicalSort(); !dag { // am i a dag or not? + return fmt.Errorf("Graph is not a dag!") + } + return nil // success } func HeisenbergCount(ch chan *Vertex) int { @@ -479,8 +591,134 @@ func HeisenbergCount(ch chan *Vertex) int { return c } +// GetTimestamp returns the timestamp of a vertex +func (v *Vertex) GetTimestamp() int64 { + return v.timestamp +} + +// UpdateTimestamp updates the timestamp on a vertex and returns the new value +func (v *Vertex) UpdateTimestamp() int64 { + v.timestamp = time.Now().UnixNano() // update + return v.timestamp +} + +// can this element run right now? +func (g *Graph) OKTimestamp(v *Vertex) bool { + // these are all the vertices pointing TO v, eg: ??? -> v + for _, n := range g.IncomingGraphEdges(v) { + // if the vertex has a greater timestamp than any pre-req (n) + // then we can't run right now... + // if they're equal (eg: on init of 0) then we also can't run + // b/c we should let our pre-req's go first... + x, y := v.GetTimestamp(), n.GetTimestamp() + if DEBUG { + log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y) + } + if x >= y { + return false + } + } + return true +} + +// notify nodes after me in the dependency graph that they need refreshing... +// NOTE: this assumes that this can never fail or need to be rescheduled +func (g *Graph) Poke(v *Vertex, activity bool) { + // these are all the vertices pointing AWAY FROM v, eg: v -> ??? + for _, n := range g.OutgoingGraphEdges(v) { + // XXX: if we're in state event and haven't been cancelled by + // apply, then we can cancel a poke to a child, right? XXX + // XXX: if n.Res.GetState() != resStateEvent { // is this correct? + if true { // XXX + if DEBUG { + log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync? + } else { + if DEBUG { + log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + } + } +} + +// poke the pre-requisites that are stale and need to run before I can run... +func (g *Graph) BackPoke(v *Vertex) { + // these are all the vertices pointing TO v, eg: ??? -> v + for _, n := range g.IncomingGraphEdges(v) { + x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() + // if the parent timestamp needs poking AND it's not in state + // resStateEvent, then poke it. If the parent is in resStateEvent it + // means that an event is pending, so we'll be expecting a poke + // back soon, so we can safely discard the extra parent poke... + // TODO: implement a stateLT (less than) to tell if something + // happens earlier in the state cycle and that doesn't wrap nil + if x >= y && (s != resStateEvent && s != resStateCheckApply) { + if DEBUG { + log.Printf("%v[%v]: BackPoke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync? + } else { + if DEBUG { + log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + } + } +} + +// XXX: rename this function +func (g *Graph) Process(v *Vertex) { + obj := v.Res + if DEBUG { + log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName()) + } + obj.SetState(resStateEvent) + var ok = true + var apply = false // did we run an apply? + // is it okay to run dependency wise right now? + // if not, that's okay because when the dependency runs, it will poke + // us back and we will run if needed then! + if g.OKTimestamp(v) { + if DEBUG { + log.Printf("%v[%v]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp()) + } + + obj.SetState(resStateCheckApply) + // if this fails, don't UpdateTimestamp() + stateok, err := obj.CheckApply(true) + if stateok && err != nil { // should never return this way + log.Fatalf("%v[%v]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), stateok, err) + } + if DEBUG { + log.Printf("%v[%v]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), stateok, err) + } + + if !stateok { // if state *was* not ok, we had to have apply'ed + if err != nil { // error during check or apply + ok = false + } else { + apply = true + } + } + + if ok { + // update this timestamp *before* we poke or the poked + // nodes might fail due to having a too old timestamp! + v.UpdateTimestamp() // this was touched... + obj.SetState(resStatePoking) // can't cancel parent poke + g.Poke(v, apply) + } + // poke at our pre-req's instead since they need to refresh/run... + } else { + // only poke at the pre-req's that need to run + go g.BackPoke(v) + } +} + // main kick to start the graph func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue + log.Printf("State: %v -> %v", g.SetState(graphStateStarting), g.GetState()) + defer log.Printf("State: %v -> %v", g.SetState(graphStateStarted), g.GetState()) t, _ := g.TopologicalSort() // TODO: only calculate indegree if `first` is true to save resources indegree := g.InDegree() // compute all of the indegree's @@ -492,7 +730,20 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ go func(vv *Vertex) { defer wg.Done() - vv.Res.Watch() + // listen for chan events from Watch() and run + // the Process() function when they're received + // this avoids us having to pass the data into + // the Watch() function about which graph it is + // running on, which isolates things nicely... + chanProcess := make(chan struct{}) + go func() { + for _ = range chanProcess { + // XXX: do we need to ACK so that it's synchronous? + g.Process(vv) + } + }() + vv.Res.Watch(chanProcess) // i block until i end + close(chanProcess) log.Printf("%v[%v]: Exited", vv.Kind(), vv.GetName()) }(v) } @@ -511,7 +762,7 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue // and not just selectively the subset with no indegree. if (!first) || indegree[v] == 0 { // ensure state is started before continuing on to next vertex - for !v.Res.SendEvent(eventStart, true, false) { + for !v.SendEvent(eventStart, true, false) { if DEBUG { // if SendEvent fails, we aren't up yet log.Printf("%v[%v]: Retrying SendEvent(Start)", v.Kind(), v.GetName()) @@ -525,13 +776,18 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue } func (g *Graph) Pause() { + log.Printf("State: %v -> %v", g.SetState(graphStatePausing), g.GetState()) + defer log.Printf("State: %v -> %v", g.SetState(graphStatePaused), g.GetState()) t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... - v.Res.SendEvent(eventPause, true, false) + v.SendEvent(eventPause, true, false) } } func (g *Graph) Exit() { + if g == nil { + return + } // empty graph that wasn't populated yet t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... // turn off the taps... @@ -539,7 +795,7 @@ func (g *Graph) Exit() { // when we hit the 'default' in the select statement! // XXX: we can do this to quiesce, but it's not necessary now - v.Res.SendEvent(eventExit, true, false) + v.SendEvent(eventExit, true, false) } } @@ -549,6 +805,7 @@ func (g *Graph) SetConvergedCallback(ctimeout int, converged chan bool) { } } +// in array function to test *Vertex in a slice of *Vertices func VertexContains(needle *Vertex, haystack []*Vertex) bool { for _, v := range haystack { if needle == v { @@ -558,16 +815,6 @@ func VertexContains(needle *Vertex, haystack []*Vertex) bool { return false } -// in array function to test *vertices in a slice of *vertices -func HasVertex(v *Vertex, haystack []*Vertex) bool { - for _, r := range haystack { - if v == r { - return true - } - } - return false -} - // reverse a list of vertices func Reverse(vs []*Vertex) []*Vertex { //var out []*Vertex // XXX: golint suggests, but it fails testing diff --git a/pgraph_test.go b/pgraph_test.go index 7c74bbc7..f9722e28 100644 --- a/pgraph_test.go +++ b/pgraph_test.go @@ -20,7 +20,10 @@ package main import ( + "fmt" "reflect" + "sort" + "strings" "testing" ) @@ -254,26 +257,26 @@ func TestPgraphT8(t *testing.T) { v1 := NewVertex(NewNoopRes("v1")) v2 := NewVertex(NewNoopRes("v2")) v3 := NewVertex(NewNoopRes("v3")) - if HasVertex(v1, []*Vertex{v1, v2, v3}) != true { + if VertexContains(v1, []*Vertex{v1, v2, v3}) != true { t.Errorf("Should be true instead of false.") } v4 := NewVertex(NewNoopRes("v4")) v5 := NewVertex(NewNoopRes("v5")) v6 := NewVertex(NewNoopRes("v6")) - if HasVertex(v4, []*Vertex{v5, v6}) != false { + if VertexContains(v4, []*Vertex{v5, v6}) != false { t.Errorf("Should be false instead of true.") } v7 := NewVertex(NewNoopRes("v7")) v8 := NewVertex(NewNoopRes("v8")) v9 := NewVertex(NewNoopRes("v9")) - if HasVertex(v8, []*Vertex{v7, v8, v9}) != true { + if VertexContains(v8, []*Vertex{v7, v8, v9}) != true { t.Errorf("Should be true instead of false.") } v1b := NewVertex(NewNoopRes("v1")) // same value, different objects - if HasVertex(v1b, []*Vertex{v1, v2, v3}) != false { + if VertexContains(v1b, []*Vertex{v1, v2, v3}) != false { t.Errorf("Should be false instead of true.") } } @@ -381,6 +384,211 @@ func TestPgraphT10(t *testing.T) { } } +// empty +func TestPgraphReachability0(t *testing.T) { + { + G := NewGraph("g") + result := G.Reachability(nil, nil) + if result != nil { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } + } + { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v6 := NewVertex(NewNoopRes("v6")) + + result := G.Reachability(v1, v6) + expected := []*Vertex{} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } + } + { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v1, v4, e3) + G.AddEdge(v3, v4, e4) + G.AddEdge(v3, v5, e5) + + result := G.Reachability(v1, v6) + expected := []*Vertex{} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } + } +} + +// simple linear path +func TestPgraphReachability1(t *testing.T) { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + //e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v3, v4, e3) + G.AddEdge(v4, v5, e4) + G.AddEdge(v5, v6, e5) + + result := G.Reachability(v1, v6) + expected := []*Vertex{v1, v2, v3, v4, v5, v6} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } +} + +// pick one of two correct paths +func TestPgraphReachability2(t *testing.T) { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v1, v3, e2) + G.AddEdge(v2, v4, e3) + G.AddEdge(v3, v4, e4) + G.AddEdge(v4, v5, e5) + G.AddEdge(v5, v6, e6) + + result := G.Reachability(v1, v6) + expected1 := []*Vertex{v1, v2, v4, v5, v6} + expected2 := []*Vertex{v1, v3, v4, v5, v6} + + // !xor test + if reflect.DeepEqual(result, expected1) == reflect.DeepEqual(result, expected2) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } +} + +// pick shortest path +func TestPgraphReachability3(t *testing.T) { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v3, v4, e3) + G.AddEdge(v4, v5, e4) + G.AddEdge(v1, v5, e5) + G.AddEdge(v5, v6, e6) + + result := G.Reachability(v1, v6) + expected := []*Vertex{v1, v5, v6} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } +} + +// direct path +func TestPgraphReachability4(t *testing.T) { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v3, v4, e3) + G.AddEdge(v4, v5, e4) + G.AddEdge(v5, v6, e5) + G.AddEdge(v1, v6, e6) + + result := G.Reachability(v1, v6) + expected := []*Vertex{v1, v6} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } +} + func TestPgraphT11(t *testing.T) { v1 := NewVertex(NewNoopRes("v1")) v2 := NewVertex(NewNoopRes("v2")) @@ -404,5 +612,647 @@ func TestPgraphT11(t *testing.T) { if rev := Reverse([]*Vertex{v6, v5, v4, v3, v2, v1}); !reflect.DeepEqual(rev, []*Vertex{v1, v2, v3, v4, v5, v6}) { t.Errorf("Reverse of vertex slice failed.") } - } + +type NoopResTest struct { + NoopRes +} + +func (obj *NoopResTest) GroupCmp(r Res) bool { + res, ok := r.(*NoopResTest) + if !ok { + return false + } + + // TODO: implement this in vertexCmp for *testBaseGrouper instead? + if strings.Contains(res.Name, ",") { // HACK + return false // element to be grouped is already grouped! + } + + // group if they start with the same letter! (helpful hack for testing) + return obj.Name[0] == res.Name[0] +} + +func NewNoopResTest(name string) *NoopResTest { + obj := &NoopResTest{ + NoopRes: NoopRes{ + BaseRes: BaseRes{ + Name: name, + Meta: MetaParams{ + AutoGroup: true, // always autogroup + }, + }, + }, + } + obj.Init() // optional here in this testing scenario (for now) + return obj +} + +// ListStrCmp compares two lists of strings +func ListStrCmp(a, b []string) bool { + //fmt.Printf("CMP: %v with %v\n", a, b) // debugging + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +// GraphCmp compares the topology of two graphs and returns nil if they're equal +// It also compares if grouped element groups are identical +func GraphCmp(g1, g2 *Graph) error { + if n1, n2 := g1.NumVertices(), g2.NumVertices(); n1 != n2 { + return fmt.Errorf("Graph g1 has %d vertices, while g2 has %d.", n1, n2) + } + if e1, e2 := g1.NumEdges(), g2.NumEdges(); e1 != e2 { + return fmt.Errorf("Graph g1 has %d edges, while g2 has %d.", e1, e2) + } + + var m = make(map[*Vertex]*Vertex) // g1 to g2 vertex correspondence +Loop: + // check vertices + for v1 := range g1.Adjacency { // for each vertex in g1 + + l1 := strings.Split(v1.GetName(), ",") // make list of everyone's names... + for _, x1 := range v1.GetGroup() { + l1 = append(l1, x1.GetName()) // add my contents + } + l1 = StrRemoveDuplicatesInList(l1) // remove duplicates + sort.Strings(l1) + + // inner loop + for v2 := range g2.Adjacency { // does it match in g2 ? + + l2 := strings.Split(v2.GetName(), ",") + for _, x2 := range v2.GetGroup() { + l2 = append(l2, x2.GetName()) + } + l2 = StrRemoveDuplicatesInList(l2) // remove duplicates + sort.Strings(l2) + + // does l1 match l2 ? + if ListStrCmp(l1, l2) { // cmp! + m[v1] = v2 + continue Loop + } + } + return fmt.Errorf("Graph g1, has no match in g2 for: %v", v1.GetName()) + } + // vertices (and groups) match :) + + // check edges + for v1 := range g1.Adjacency { // for each vertex in g1 + v2 := m[v1] // lookup in map to get correspondance + // g1.Adjacency[v1] corresponds to g2.Adjacency[v2] + if e1, e2 := len(g1.Adjacency[v1]), len(g2.Adjacency[v2]); e1 != e2 { + return fmt.Errorf("Graph g1, vertex(%v) has %d edges, while g2, vertex(%v) has %d.", v1.GetName(), e1, v2.GetName(), e2) + } + + for vv1, ee1 := range g1.Adjacency[v1] { + vv2 := m[vv1] + ee2 := g2.Adjacency[v2][vv2] + + // these are edges from v1 -> vv1 via ee1 (graph 1) + // to cmp to edges from v2 -> vv2 via ee2 (graph 2) + + // check: (1) vv1 == vv2 ? (we've already checked this!) + l1 := strings.Split(vv1.GetName(), ",") // make list of everyone's names... + for _, x1 := range vv1.GetGroup() { + l1 = append(l1, x1.GetName()) // add my contents + } + l1 = StrRemoveDuplicatesInList(l1) // remove duplicates + sort.Strings(l1) + + l2 := strings.Split(vv2.GetName(), ",") + for _, x2 := range vv2.GetGroup() { + l2 = append(l2, x2.GetName()) + } + l2 = StrRemoveDuplicatesInList(l2) // remove duplicates + sort.Strings(l2) + + // does l1 match l2 ? + if !ListStrCmp(l1, l2) { // cmp! + return fmt.Errorf("Graph g1 and g2 don't agree on: %v and %v", vv1.GetName(), vv2.GetName()) + } + + // check: (2) ee1 == ee2 + if ee1.Name != ee2.Name { + return fmt.Errorf("Graph g1 edge(%v) doesn't match g2 edge(%v)", ee1.Name, ee2.Name) + } + } + } + + return nil // success! +} + +type testBaseGrouper struct { // FIXME: update me when we've implemented the correct grouping algorithm! + baseGrouper // "inherit" what we want, and reimplement the rest +} + +func (ag *testBaseGrouper) name() string { + return "testBaseGrouper" +} + +func (ag *testBaseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) { + if err := v1.Res.GroupRes(v2.Res); err != nil { // group them first + return nil, err + } + // HACK: update the name so it matches full list of self+grouped + obj := v1.Res + names := strings.Split(obj.GetName(), ",") // load in stored names + for _, n := range obj.GetGroup() { + names = append(names, n.GetName()) // add my contents + } + names = StrRemoveDuplicatesInList(names) // remove duplicates + sort.Strings(names) + obj.SetName(strings.Join(names, ",")) + return // success or fail, and no need to merge the actual vertices! +} + +func (ag *testBaseGrouper) edgeMerge(e1, e2 *Edge) *Edge { + // HACK: update the name so it makes a union of both names + n1 := strings.Split(e1.Name, ",") // load + n2 := strings.Split(e2.Name, ",") // load + names := append(n1, n2...) + names = StrRemoveDuplicatesInList(names) // remove duplicates + sort.Strings(names) + return NewEdge(strings.Join(names, ",")) +} + +func (g *Graph) fullPrint() (str string) { + str += "\n" + for v := range g.Adjacency { + str += fmt.Sprintf("* v: %v\n", v.GetName()) + // TODO: add explicit grouping data? + } + for v1 := range g.Adjacency { + for v2, e := range g.Adjacency[v1] { + str += fmt.Sprintf("* e: %v -> %v # %v\n", v1.GetName(), v2.GetName(), e.Name) + } + } + return +} + +// helper function +func runGraphCmp(t *testing.T, g1, g2 *Graph) { + // FIXME: update me when we've implemented the correct grouping algorithm! + ch := g1.autoGroup(&testBaseGrouper{}) // edits the graph + for _ = range ch { // bleed the channel or it won't run :( + // pass + } + err := GraphCmp(g1, g2) + if err != nil { + t.Logf(" actual (g1): %v%v", g1, g1.fullPrint()) + t.Logf("expected (g2): %v%v", g2, g2.fullPrint()) + t.Logf("Cmp error:") + t.Errorf("%v", err) + } +} + +// all of the following test cases are layed out with the following semantics: +// * vertices which start with the same single letter are considered "like" +// * "like" elements should be merged +// * vertices can have any integer after their single letter "family" type +// * grouped vertices should have a name with a comma separated list of names +// * edges follow the same conventions about grouping + +// empty graph +func TestPgraphGrouping1(t *testing.T) { + g1 := NewGraph("g1") // original graph + g2 := NewGraph("g2") // expected result + runGraphCmp(t, g1, g2) +} + +// single vertex +func TestPgraphGrouping2(t *testing.T) { + g1 := NewGraph("g1") // original graph + { // grouping to limit variable scope + a1 := NewVertex(NewNoopResTest("a1")) + g1.AddVertex(a1) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + g2.AddVertex(a1) + } + runGraphCmp(t, g1, g2) +} + +// two vertices +func TestPgraphGrouping3(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + g1.AddVertex(a1, b1) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + g2.AddVertex(a1, b1) + } + runGraphCmp(t, g1, g2) +} + +// two vertices merge +func TestPgraphGrouping4(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + g1.AddVertex(a1, a2) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + g2.AddVertex(a) + } + runGraphCmp(t, g1, g2) +} + +// three vertices merge +func TestPgraphGrouping5(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + a3 := NewVertex(NewNoopResTest("a3")) + g1.AddVertex(a1, a2, a3) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2,a3")) + g2.AddVertex(a) + } + runGraphCmp(t, g1, g2) +} + +// three vertices, two merge +func TestPgraphGrouping6(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + g1.AddVertex(a1, a2, b1) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b1 := NewVertex(NewNoopResTest("b1")) + g2.AddVertex(a, b1) + } + runGraphCmp(t, g1, g2) +} + +// four vertices, three merge +func TestPgraphGrouping7(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + a3 := NewVertex(NewNoopResTest("a3")) + b1 := NewVertex(NewNoopResTest("b1")) + g1.AddVertex(a1, a2, a3, b1) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2,a3")) + b1 := NewVertex(NewNoopResTest("b1")) + g2.AddVertex(a, b1) + } + runGraphCmp(t, g1, g2) +} + +// four vertices, two&two merge +func TestPgraphGrouping8(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + g1.AddVertex(a1, a2, b1, b2) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b := NewVertex(NewNoopResTest("b1,b2")) + g2.AddVertex(a, b) + } + runGraphCmp(t, g1, g2) +} + +// five vertices, two&three merge +func TestPgraphGrouping9(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + b3 := NewVertex(NewNoopResTest("b3")) + g1.AddVertex(a1, a2, b1, b2, b3) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b := NewVertex(NewNoopResTest("b1,b2,b3")) + g2.AddVertex(a, b) + } + runGraphCmp(t, g1, g2) +} + +// three unique vertices +func TestPgraphGrouping10(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + c1 := NewVertex(NewNoopResTest("c1")) + g1.AddVertex(a1, b1, c1) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + c1 := NewVertex(NewNoopResTest("c1")) + g2.AddVertex(a1, b1, c1) + } + runGraphCmp(t, g1, g2) +} + +// three unique vertices, two merge +func TestPgraphGrouping11(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + c1 := NewVertex(NewNoopResTest("c1")) + g1.AddVertex(a1, b1, b2, c1) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b := NewVertex(NewNoopResTest("b1,b2")) + c1 := NewVertex(NewNoopResTest("c1")) + g2.AddVertex(a1, b, c1) + } + runGraphCmp(t, g1, g2) +} + +// simple merge 1 +// a1 a2 a1,a2 +// \ / >>> | (arrows point downwards) +// b b +func TestPgraphGrouping12(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(a2, b1, e2) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b1 := NewVertex(NewNoopResTest("b1")) + e := NewEdge("e1,e2") + g2.AddEdge(a, b1, e) + } + runGraphCmp(t, g1, g2) +} + +// simple merge 2 +// b b +// / \ >>> | (arrows point downwards) +// a1 a2 a1,a2 +func TestPgraphGrouping13(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + g1.AddEdge(b1, a1, e1) + g1.AddEdge(b1, a2, e2) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b1 := NewVertex(NewNoopResTest("b1")) + e := NewEdge("e1,e2") + g2.AddEdge(b1, a, e) + } + runGraphCmp(t, g1, g2) +} + +// triple merge +// a1 a2 a3 a1,a2,a3 +// \ | / >>> | (arrows point downwards) +// b b +func TestPgraphGrouping14(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + a3 := NewVertex(NewNoopResTest("a3")) + b1 := NewVertex(NewNoopResTest("b1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(a2, b1, e2) + g1.AddEdge(a3, b1, e3) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2,a3")) + b1 := NewVertex(NewNoopResTest("b1")) + e := NewEdge("e1,e2,e3") + g2.AddEdge(a, b1, e) + } + runGraphCmp(t, g1, g2) +} + +// chain merge +// a1 a1 +// / \ | +// b1 b2 >>> b1,b2 (arrows point downwards) +// \ / | +// c1 c1 +func TestPgraphGrouping15(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(a1, b2, e2) + g1.AddEdge(b1, c1, e3) + g1.AddEdge(b2, c1, e4) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b := NewVertex(NewNoopResTest("b1,b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1,e2") + e2 := NewEdge("e3,e4") + g2.AddEdge(a1, b, e1) + g2.AddEdge(b, c1, e2) + } + runGraphCmp(t, g1, g2) +} + +/* FIXME: uncomment me when we've implemented the correct grouping algorithm! +// reattach 1 (outer) +// a1 a2 a1,a2 +// | / | +// b1 / >>> b1 (arrows point downwards) +// | / | +// c1 c1 +func TestPgraphGrouping16(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(b1, c1, e2) + g1.AddEdge(a2, c1, e3) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b1 := NewVertex(NewNoopResTest("b1")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1,e3") + e2 := NewEdge("e2") // TODO: should this be e2,e3 (eg we split e3?) + g2.AddEdge(a, b1, e1) + g2.AddEdge(b1, c1, e2) + } + runGraphCmp(t, g1, g2) +} + +// reattach 2 (inner) +// a1 b2 a1 +// | / | +// b1 / >>> b1,b2 (arrows point downwards) +// | / | +// c1 c1 +func TestPgraphGrouping17(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(b1, c1, e2) + g1.AddEdge(b2, c1, e3) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b := NewVertex(NewNoopResTest("b1,b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2,e3") + g2.AddEdge(a1, b, e1) + g2.AddEdge(b, c1, e2) + } + runGraphCmp(t, g1, g2) +} + +// re-attach 3 (double) +// a2 a1 b2 a1,a2 +// \ | / | +// \ b1 / >>> b1,b2 (arrows point downwards) +// \ | / | +// c1 c1 +func TestPgraphGrouping18(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(b1, c1, e2) + g1.AddEdge(a2, c1, e3) + g1.AddEdge(b2, c1, e4) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b := NewVertex(NewNoopResTest("b1,b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1,e3") + e2 := NewEdge("e2,e4") + g2.AddEdge(a, b, e1) + g2.AddEdge(b, c1, e2) + } + runGraphCmp(t, g1, g2) +} + +// tricky merge, (no change or merge?) +// a1 a1 +// \ >>> \ (arrows point downwards) +// a2 a2 +func TestPgraphGroupingTricky1(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + e1 := NewEdge("e1") + g1.AddEdge(a1, a2, e1) + } + g2 := NewGraph("g2") // expected result ? + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + e1 := NewEdge("e1") + g2.AddEdge(a1, a2, e1) + } + //g3 := NewGraph("g2") // expected result ? + //{ + // a := NewVertex(NewNoopResTest("a1,a2")) + //} + runGraphCmp(t, g1, g2) // TODO: i'm tempted to think this is correct + //runGraphCmp(t, g1, g3) +} +*/ diff --git a/pkg.go b/pkg.go index 4dcefd0c..0ff7eb20 100644 --- a/pkg.go +++ b/pkg.go @@ -19,6 +19,7 @@ package main import ( //"packagekit" // TODO + "encoding/gob" "errors" "fmt" "log" @@ -26,6 +27,10 @@ import ( "strings" ) +func init() { + gob.Register(&PkgRes{}) +} + type PkgRes struct { BaseRes `yaml:",inline"` State string `yaml:"state"` // state: installed, uninstalled, newest, @@ -102,7 +107,7 @@ func (obj *PkgRes) Validate() bool { // use UpdatesChanged signal to watch for changes // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch() { +func (obj *PkgRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -168,7 +173,7 @@ func (obj *PkgRes) Watch() { dirty = false obj.isStateOK = false // something made state dirty } - Process(obj) // XXX: rename this function + processChan <- struct{}{} // trigger process } } } diff --git a/resources.go b/resources.go index c06463eb..2390f9fa 100644 --- a/resources.go +++ b/resources.go @@ -18,9 +18,11 @@ package main import ( + "bytes" + "encoding/base64" + "encoding/gob" "fmt" "log" - "time" ) //go:generate stringer -type=resState -output=resstate_stringer.go @@ -73,27 +75,24 @@ type MetaParams struct { // everything here only needs to be implemented once, in the BaseRes type Base interface { GetName() string // can't be named "Name()" because of struct field + SetName(string) Kind() string GetMeta() MetaParams SetVertex(*Vertex) SetConvergedCallback(ctimeout int, converged chan bool) - SendEvent(eventName, bool, bool) bool IsWatching() bool SetWatching(bool) GetConvergedState() resConvergedState SetConvergedState(resConvergedState) GetState() resState SetState(resState) - GetTimestamp() int64 - UpdateTimestamp() int64 - OKTimestamp() bool - Poke(bool) - BackPoke() - GroupCmp(Res) bool // TODO: is there a better name for this? - GroupRes(Res) error // group resource (arg) into self - IsGrouped() bool // am I grouped? - SetGrouped(bool) // set grouped bool - GetGroup() []Res // return everyone grouped inside me + SendEvent(eventName, bool, bool) bool + ReadEvent(*Event) (bool, bool) // TODO: optional here? + GroupCmp(Res) bool // TODO: is there a better name for this? + GroupRes(Res) error // group resource (arg) into self + IsGrouped() bool // am I grouped? + SetGrouped(bool) // set grouped bool + GetGroup() []Res // return everyone grouped inside me SetGroup([]Res) } @@ -103,17 +102,17 @@ type Res interface { Init() //Validate() bool // TODO: this might one day be added GetUUIDs() []ResUUID // most resources only return one - Watch() + Watch(chan struct{}) // send on channel to signal process() events CheckApply(bool) (bool, error) AutoEdges() AutoEdge Compare(Res) bool + CollectPattern(string) // XXX: temporary until Res collection is more advanced } type BaseRes struct { Name string `yaml:"name"` Meta MetaParams `yaml:"meta"` // struct of all the metaparams kind string - timestamp int64 // last updated timestamp ? events chan Event vertex *Vertex state resState @@ -168,11 +167,15 @@ func (obj *BaseRes) Init() { obj.events = make(chan Event) // unbuffered chan size to avoid stale events } -// this method gets used by all the resources, if we have one of (obj NoopRes) it would get overridden in that case! +// this method gets used by all the resources func (obj *BaseRes) GetName() string { return obj.Name } +func (obj *BaseRes) SetName(name string) { + obj.Name = name +} + // return the kind of resource this is func (obj *BaseRes) Kind() string { return obj.kind @@ -224,87 +227,6 @@ func (obj *BaseRes) SetState(state resState) { obj.state = state } -// GetTimestamp returns the timestamp of a vertex -func (obj *BaseRes) GetTimestamp() int64 { - return obj.timestamp -} - -// UpdateTimestamp updates the timestamp on a vertex and returns the new value -func (obj *BaseRes) UpdateTimestamp() int64 { - obj.timestamp = time.Now().UnixNano() // update - return obj.timestamp -} - -// can this element run right now? -func (obj *BaseRes) OKTimestamp() bool { - v := obj.GetVertex() - g := v.GetGraph() - // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { - // if the vertex has a greater timestamp than any pre-req (n) - // then we can't run right now... - // if they're equal (eg: on init of 0) then we also can't run - // b/c we should let our pre-req's go first... - x, y := obj.GetTimestamp(), n.Res.GetTimestamp() - if DEBUG { - log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", obj.Kind(), obj.GetName(), x, n.Kind(), n.GetName(), y, x >= y) - } - if x >= y { - return false - } - } - return true -} - -// notify nodes after me in the dependency graph that they need refreshing... -// NOTE: this assumes that this can never fail or need to be rescheduled -func (obj *BaseRes) Poke(activity bool) { - v := obj.GetVertex() - g := v.GetGraph() - // these are all the vertices pointing AWAY FROM v, eg: v -> ??? - for _, n := range g.OutgoingGraphEdges(v) { - // XXX: if we're in state event and haven't been cancelled by - // apply, then we can cancel a poke to a child, right? XXX - // XXX: if n.Res.GetState() != resStateEvent { // is this correct? - if true { // XXX - if DEBUG { - log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync? - } else { - if DEBUG { - log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - } - } -} - -// poke the pre-requisites that are stale and need to run before I can run... -func (obj *BaseRes) BackPoke() { - v := obj.GetVertex() - g := v.GetGraph() - // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { - x, y, s := obj.GetTimestamp(), n.Res.GetTimestamp(), n.Res.GetState() - // if the parent timestamp needs poking AND it's not in state - // resStateEvent, then poke it. If the parent is in resStateEvent it - // means that an event is pending, so we'll be expecting a poke - // back soon, so we can safely discard the extra parent poke... - // TODO: implement a stateLT (less than) to tell if something - // happens earlier in the state cycle and that doesn't wrap nil - if x >= y && (s != resStateEvent && s != resStateCheckApply) { - if DEBUG { - log.Printf("%v[%v]: BackPoke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync? - } else { - if DEBUG { - log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - } - } -} - // push an event into the message queue for a particular vertex func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool { // TODO: isn't this race-y ? @@ -394,50 +316,38 @@ func (obj *BaseRes) SetGroup(g []Res) { obj.grouped = g } -// XXX: rename this function -func Process(obj Res) { - if DEBUG { - log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName()) - } - obj.SetState(resStateEvent) - var ok = true - var apply = false // did we run an apply? - // is it okay to run dependency wise right now? - // if not, that's okay because when the dependency runs, it will poke - // us back and we will run if needed then! - if obj.OKTimestamp() { - if DEBUG { - log.Printf("%v[%v]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), obj.GetTimestamp()) - } - - obj.SetState(resStateCheckApply) - // if this fails, don't UpdateTimestamp() - stateok, err := obj.CheckApply(true) - if stateok && err != nil { // should never return this way - log.Fatalf("%v[%v]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), stateok, err) - } - if DEBUG { - log.Printf("%v[%v]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), stateok, err) - } - - if !stateok { // if state *was* not ok, we had to have apply'ed - if err != nil { // error during check or apply - ok = false - } else { - apply = true - } - } - - if ok { - // update this timestamp *before* we poke or the poked - // nodes might fail due to having a too old timestamp! - obj.UpdateTimestamp() // this was touched... - obj.SetState(resStatePoking) // can't cancel parent poke - obj.Poke(apply) - } - // poke at our pre-req's instead since they need to refresh/run... - } else { - // only poke at the pre-req's that need to run - go obj.BackPoke() - } +func (obj *BaseRes) CollectPattern(pattern string) { + // XXX: default method is empty +} + +// ResToB64 encodes a resource to a base64 encoded string (after serialization) +func ResToB64(res Res) (string, error) { + b := bytes.Buffer{} + e := gob.NewEncoder(&b) + err := e.Encode(&res) // pass with & + if err != nil { + return "", fmt.Errorf("Gob failed to encode: %v", err) + } + return base64.StdEncoding.EncodeToString(b.Bytes()), nil +} + +// B64ToRes decodes a resource from a base64 encoded string (after deserialization) +func B64ToRes(str string) (Res, error) { + var output interface{} + bb, err := base64.StdEncoding.DecodeString(str) + if err != nil { + return nil, fmt.Errorf("Base64 failed to decode: %v", err) + } + b := bytes.NewBuffer(bb) + d := gob.NewDecoder(b) + err = d.Decode(&output) // pass with & + if err != nil { + return nil, fmt.Errorf("Gob failed to decode: %v", err) + } + res, ok := output.(Res) + if !ok { + return nil, fmt.Errorf("Output %v is not a Res", res) + + } + return res, nil } diff --git a/resources_test.go b/resources_test.go new file mode 100644 index 00000000..70f13c7c --- /dev/null +++ b/resources_test.go @@ -0,0 +1,105 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "bytes" + "encoding/base64" + "encoding/gob" + "testing" +) + +func TestMiscEncodeDecode1(t *testing.T) { + var err error + //gob.Register( &NoopRes{} ) // happens in noop.go : init() + //gob.Register( &FileRes{} ) // happens in file.go : init() + // ... + + // encode + var input interface{} = &FileRes{} + b1 := bytes.Buffer{} + e := gob.NewEncoder(&b1) + err = e.Encode(&input) // pass with & + if err != nil { + t.Errorf("Gob failed to Encode: %v", err) + } + str := base64.StdEncoding.EncodeToString(b1.Bytes()) + + // decode + var output interface{} + bb, err := base64.StdEncoding.DecodeString(str) + if err != nil { + t.Errorf("Base64 failed to Decode: %v", err) + } + b2 := bytes.NewBuffer(bb) + d := gob.NewDecoder(b2) + err = d.Decode(&output) // pass with & + if err != nil { + t.Errorf("Gob failed to Decode: %v", err) + } + + res1, ok := input.(Res) + if !ok { + t.Errorf("Input %v is not a Res", res1) + return + } + res2, ok := output.(Res) + if !ok { + t.Errorf("Output %v is not a Res", res2) + return + } + if !res1.Compare(res2) { + t.Error("The input and output Res values do not match!") + } +} + +func TestMiscEncodeDecode2(t *testing.T) { + var err error + //gob.Register( &NoopRes{} ) // happens in noop.go : init() + //gob.Register( &FileRes{} ) // happens in file.go : init() + // ... + + // encode + var input Res = &FileRes{} + + b64, err := ResToB64(input) + if err != nil { + t.Errorf("Can't encode: %v", err) + return + } + + output, err := B64ToRes(b64) + if err != nil { + t.Errorf("Can't decode: %v", err) + return + } + + res1, ok := input.(Res) + if !ok { + t.Errorf("Input %v is not a Res", res1) + return + } + res2, ok := output.(Res) + if !ok { + t.Errorf("Output %v is not a Res", res2) + return + } + if !res1.Compare(res2) { + t.Error("The input and output Res values do not match!") + } +} diff --git a/svc.go b/svc.go index 635a66a9..69f0a309 100644 --- a/svc.go +++ b/svc.go @@ -20,6 +20,7 @@ package main import ( + "encoding/gob" "errors" "fmt" systemd "github.com/coreos/go-systemd/dbus" // change namespace @@ -28,6 +29,10 @@ import ( "log" ) +func init() { + gob.Register(&SvcRes{}) +} + type SvcRes struct { BaseRes `yaml:",inline"` State string `yaml:"state"` // state: running, stopped, undefined @@ -62,7 +67,7 @@ func (obj *SvcRes) Validate() bool { } // Service watcher -func (obj *SvcRes) Watch() { +func (obj *SvcRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -189,7 +194,7 @@ func (obj *SvcRes) Watch() { case err := <-subErrors: obj.SetConvergedState(resConvergedNil) // XXX ? - log.Println("error:", err) + log.Printf("error: %v", err) log.Fatal(err) //vertex.events <- fmt.Sprintf("svc: %v", "error") // XXX: how should we handle errors? @@ -210,7 +215,7 @@ func (obj *SvcRes) Watch() { dirty = false obj.isStateOK = false // something made state dirty } - Process(obj) // XXX: rename this function + processChan <- struct{}{} // trigger process } }