Add resource auto grouping

Sorry for the size of this patch; I was busy hacking and plumbing away
and it got out of hand! I'm allowing this because there doesn't seem to
be anyone else hacking on the parts of the code this would break, since
the resource code is fairly stable in this change. In particular, it
revisits and refreshes some areas of the code that haven't seen anything
new or innovative since the project first started. I've gotten rid of a
lot of cruft, and in particular cleaned up some things that I didn't
know how to do better before! Here's hoping I'll continue to learn and
have more to improve upon in the future! (Well, let's not hope _too_ hard
though!)

The main goal of this patch was to make logical grouping of resources
possible. For example, it might be more efficient to group three package
installations into a single transaction, instead of having to run three
separate transactions. This is because a package installation typically
has an initial one-time per-run cost which shouldn't need to be
repeated.
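The cost argument above can be sketched with a toy model (the constants and function names here are made up for illustration, not measured):

```go
package main

import "fmt"

// Hypothetical cost model: each transaction pays a fixed setup cost
// (eg: downloading and verifying the package repo metadata) plus a
// small per-package cost. The numbers are illustrative only.
const fixedCost = 30 // per transaction
const perPkgCost = 5 // per package

// separateCost runs one transaction per package: the fixed cost repeats.
func separateCost(pkgs int) int {
	return pkgs * (fixedCost + perPkgCost)
}

// groupedCost runs all packages in a single transaction: the fixed cost
// is paid once and shared.
func groupedCost(pkgs int) int {
	return fixedCost + pkgs*perPkgCost
}

func main() {
	fmt.Println(separateCost(3)) // 105
	fmt.Println(groupedCost(3))  // 45
}
```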

Another future goal would be to group file resources sharing a common
base path under a common recursive fanotify watcher. Since this depends
on fanotify support landing first, it hasn't been implemented yet, but
it could be a useful way to reduce the number of separate watches
needed, since there is a finite limit on them.

It's worth mentioning that grouping resources typically _reduces_ the
parallel execution capability of a particular graph, but depending on
the cost/benefit tradeoff, this might be preferable. I'd submit it's
almost universally beneficial for pkg resources.

This monster patch includes:
* the autogroup feature
* the grouping interface
* a placeholder algorithm
* an extensive test case infrastructure to test grouping algorithms
* a move of some base resource methods into pgraph refactoring
* some config/compile clean ups to remove code duplication
* b64 encoding/decoding improvements
* a rename of the yaml "res" entries to "kind" (more logical)
* some docs
* small fixes
* and more!
This commit is contained in:
James Shubin
2016-03-20 01:17:35 -04:00
parent 9720812a78
commit 1b01f908e3
36 changed files with 1791 additions and 515 deletions


@@ -32,6 +32,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
3. [Setup - Getting started with mgmt](#setup)
4. [Features - All things mgmt can do](#features)
* [Autoedges - Automatic resource relationships](#autoedges)
* [Autogrouping - Automatic resource grouping](#autogrouping)
5. [Usage/FAQ - Notes on usage and frequently asked questions](#usage-and-frequently-asked-questions)
6. [Reference - Detailed reference](#reference)
* [Graph definition file](#graph-definition-file)
@@ -94,8 +95,22 @@ order to handle this situation you can disable autoedges per resource and
explicitly declare that you want `my.cnf` to be written to disk before the
installation of the `mysql-server` package.
You can disable autoedges for a resource by setting the `autoedge` key for
the meta attributes of a resource to `false`.
You can disable autoedges for a resource by setting the `autoedge` key on
the meta attributes of that resource to `false`.
###Autogrouping
Automatic grouping or AutoGroup is the mechanism in mgmt by which it will
automatically group multiple resource vertices into a single one. This is
particularly useful for grouping multiple package resources into a single
resource, since the multiple installations can then happen together in a
single transaction. This saves a lot of time, because package resources
typically have a large fixed cost to run (downloading and verifying the
package repo), and if they are grouped they share this fixed cost. This
grouping feature can be used for other use cases too.
You can disable autogrouping for a resource by setting the `autogroup` key on
the meta attributes of that resource to `false`.
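For example, to opt a single resource out of grouping, something like this in the yaml should work (a sketch; the exact layout of the surrounding file may vary, but it uses the `autogroup` meta attribute described above):

```yaml
resources:
  pkg:
  - name: powertop
    meta:
      autogroup: false
    state: installed
```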
##Usage and frequently asked questions
(Send your questions as a patch to this FAQ! I'll review it, merge it, and

config.go

@@ -23,16 +23,17 @@ import (
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"reflect"
"strings"
)
type collectorResConfig struct {
Res string `yaml:"res"`
Kind string `yaml:"kind"`
Pattern string `yaml:"pattern"` // XXX: Not Implemented
}
type vertexConfig struct {
Res string `yaml:"res"`
Kind string `yaml:"kind"`
Name string `yaml:"name"`
}
@@ -82,105 +83,79 @@ func ParseConfigFromFile(filename string) *GraphConfig {
return &config
}
// XXX: we need to fix this function so that it either fails without modifying
// the graph, passes successfully and modifies it, or basically panics I guess
// this way an invalid compilation can leave the old graph running, and we
// don't modify a partial graph. so we really need to validate, and then perform
// whatever actions are necessary
// finding some way to do this on a copy of the graph, and then do a graph diff
// and merge the new data into the old graph would be more appropriate, in
// particular if we can ensure the graph merge can't fail. As for the putting
// of stuff into etcd, we should probably store the operations to complete in
// the new graph, and keep retrying until it succeeds, thus blocking any new
// etcd operations until that time.
func UpdateGraphFromConfig(config *GraphConfig, hostname string, g *Graph, etcdO *EtcdWObject) bool {
// NewGraphFromConfig returns a new graph from existing input, such as from the
// existing graph, and a GraphConfig struct.
func (g *Graph) NewGraphFromConfig(config *GraphConfig, etcdO *EtcdWObject, hostname string) (*Graph, error) {
var NoopMap = make(map[string]*Vertex)
var PkgMap = make(map[string]*Vertex)
var FileMap = make(map[string]*Vertex)
var SvcMap = make(map[string]*Vertex)
var ExecMap = make(map[string]*Vertex)
var graph *Graph // new graph to return
if g == nil { // FIXME: how can we check for an empty graph?
graph = NewGraph("Graph") // give graph a default name
} else {
graph = g.Copy() // same vertices, since they're pointers!
}
var lookup = make(map[string]map[string]*Vertex)
lookup["noop"] = NoopMap
lookup["pkg"] = PkgMap
lookup["file"] = FileMap
lookup["svc"] = SvcMap
lookup["exec"] = ExecMap
//log.Printf("%+v", config) // debug
g.SetName(config.Graph) // set graph name
// TODO: if defined (somehow)...
graph.SetName(config.Graph) // set graph name
var keep []*Vertex // list of vertex which are the same in new graph
for _, obj := range config.Resources.Noop {
v := g.GetVertexMatch(obj)
if v == nil { // no match found
obj.Init()
v = NewVertex(obj)
g.AddVertex(v) // call standalone in case not part of an edge
// use reflection to avoid duplicating code... better options welcome!
value := reflect.Indirect(reflect.ValueOf(config.Resources))
vtype := value.Type()
for i := 0; i < vtype.NumField(); i++ { // number of fields in struct
name := vtype.Field(i).Name // string of field name
field := value.FieldByName(name)
iface := field.Interface() // interface type of value
slice := reflect.ValueOf(iface)
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := FirstToUpper(name)
if DEBUG {
log.Printf("Config: Processing: %v...", kind)
}
NoopMap[obj.Name] = v // used for constructing edges
keep = append(keep, v) // append
for j := 0; j < slice.Len(); j++ { // loop through resources of same kind
x := slice.Index(j).Interface()
obj, ok := x.(Res) // convert to Res type
if !ok {
return nil, fmt.Errorf("Error: Config: Can't convert: %v of type: %T to Res.", x, x)
}
for _, obj := range config.Resources.Pkg {
v := g.GetVertexMatch(obj)
if v == nil { // no match found
obj.Init()
v = NewVertex(obj)
g.AddVertex(v) // call standalone in case not part of an edge
if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]*Vertex)
}
PkgMap[obj.Name] = v // used for constructing edges
keep = append(keep, v) // append
}
for _, obj := range config.Resources.File {
// XXX: should we export based on a @@ prefix, or a metaparam
// like exported => true || exported => (host pattern)||(other pattern?)
if strings.HasPrefix(obj.Name, "@@") { // exported resource
// add to etcd storage...
obj.Name = obj.Name[2:] //slice off @@
if !etcdO.EtcdPut(hostname, obj.Name, "file", obj) {
log.Printf("Problem exporting file resource %v.", obj.Name)
continue
}
} else {
if !strings.HasPrefix(obj.GetName(), "@@") { // exported resource
// XXX: we don't have a way of knowing if any of the
// metaparams are undefined, and as a result to set the
// defaults that we want! I hate the go yaml parser!!!
v := g.GetVertexMatch(obj)
v := graph.GetVertexMatch(obj)
if v == nil { // no match found
obj.Init()
v = NewVertex(obj)
g.AddVertex(v) // call standalone in case not part of an edge
graph.AddVertex(v) // call standalone in case not part of an edge
}
FileMap[obj.Name] = v // used for constructing edges
lookup[kind][obj.GetName()] = v // used for constructing edges
keep = append(keep, v) // append
}
} else {
// XXX: do this in a different function...
// add to etcd storage...
obj.SetName(obj.GetName()[2:]) //slice off @@
data, err := ResToB64(obj)
if err != nil {
return nil, fmt.Errorf("Config: Could not encode %v resource: %v, error: %v", kind, obj.GetName(), err)
}
for _, obj := range config.Resources.Svc {
v := g.GetVertexMatch(obj)
if v == nil { // no match found
obj.Init()
v = NewVertex(obj)
g.AddVertex(v) // call standalone in case not part of an edge
if !etcdO.EtcdPut(hostname, obj.GetName(), kind, data) {
return nil, fmt.Errorf("Config: Could not export %v resource: %v", kind, obj.GetName())
}
SvcMap[obj.Name] = v // used for constructing edges
keep = append(keep, v) // append
}
for _, obj := range config.Resources.Exec {
v := g.GetVertexMatch(obj)
if v == nil { // no match found
obj.Init()
v = NewVertex(obj)
g.AddVertex(v) // call standalone in case not part of an edge
}
ExecMap[obj.Name] = v // used for constructing edges
keep = append(keep, v) // append
}
// lookup from etcd graph
@@ -189,100 +164,71 @@ func UpdateGraphFromConfig(config *GraphConfig, hostname string, g *Graph, etcdO
nodes, ok := etcdO.EtcdGet()
if ok {
for _, t := range config.Collector {
// XXX: use t.Res and optionally t.Pattern to collect from etcd storage
log.Printf("Collect: %v; Pattern: %v", t.Res, t.Pattern)
// XXX: should we just drop these everywhere and have the kind strings be all lowercase?
kind := FirstToUpper(t.Kind)
for _, x := range etcdO.EtcdGetProcess(nodes, "file") {
var obj *FileRes
if B64ToObj(x, &obj) != true {
log.Printf("Collect: File: %v not collected!", x)
// use t.Kind and optionally t.Pattern to collect from etcd storage
log.Printf("Collect: %v; Pattern: %v", kind, t.Pattern)
for _, str := range etcdO.EtcdGetProcess(nodes, kind) {
obj, err := B64ToRes(str)
if err != nil {
log.Printf("B64ToRes failed to decode: %v", err)
log.Printf("Collect: %v: not collected!", kind)
continue
}
if t.Pattern != "" { // XXX: currently the pattern for files can only override the Dirname variable :P
obj.Dirname = t.Pattern
if t.Pattern != "" { // XXX: simplistic for now
obj.CollectPattern(t.Pattern) // obj.Dirname = t.Pattern
}
log.Printf("Collect: File: %v collected!", obj.GetName())
log.Printf("Collect: %v[%v]: collected!", kind, obj.GetName())
// XXX: similar to file add code:
v := g.GetVertexMatch(obj)
// XXX: similar to other resource add code:
if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]*Vertex)
}
v := graph.GetVertexMatch(obj)
if v == nil { // no match found
obj.Init() // initialize go channels or things won't work!!!
v = NewVertex(obj)
g.AddVertex(v) // call standalone in case not part of an edge
graph.AddVertex(v) // call standalone in case not part of an edge
}
FileMap[obj.GetName()] = v // used for constructing edges
lookup[kind][obj.GetName()] = v // used for constructing edges
keep = append(keep, v) // append
}
}
}
// get rid of any vertices we shouldn't "keep" (that aren't in new graph)
for _, v := range g.GetVertices() {
if !HasVertex(v, keep) {
for _, v := range graph.GetVertices() {
if !VertexContains(v, keep) {
// wait for exit before starting new graph!
v.Res.SendEvent(eventExit, true, false)
g.DeleteVertex(v)
v.SendEvent(eventExit, true, false)
graph.DeleteVertex(v)
}
}
for _, e := range config.Edges {
if _, ok := lookup[e.From.Res]; !ok {
return false
if _, ok := lookup[FirstToUpper(e.From.Kind)]; !ok {
return nil, fmt.Errorf("Can't find 'from' resource!")
}
if _, ok := lookup[e.To.Res]; !ok {
return false
if _, ok := lookup[FirstToUpper(e.To.Kind)]; !ok {
return nil, fmt.Errorf("Can't find 'to' resource!")
}
if _, ok := lookup[e.From.Res][e.From.Name]; !ok {
return false
if _, ok := lookup[FirstToUpper(e.From.Kind)][e.From.Name]; !ok {
return nil, fmt.Errorf("Can't find 'from' name!")
}
if _, ok := lookup[e.To.Res][e.To.Name]; !ok {
return false
if _, ok := lookup[FirstToUpper(e.To.Kind)][e.To.Name]; !ok {
return nil, fmt.Errorf("Can't find 'to' name!")
}
g.AddEdge(lookup[e.From.Res][e.From.Name], lookup[e.To.Res][e.To.Name], NewEdge(e.Name))
graph.AddEdge(lookup[FirstToUpper(e.From.Kind)][e.From.Name], lookup[FirstToUpper(e.To.Kind)][e.To.Name], NewEdge(e.Name))
}
// add auto edges
log.Println("Compile: Adding AutoEdges...")
for _, v := range g.GetVertices() { // for each vertexes autoedges
if !v.GetMeta().AutoEdge { // is the metaparam true?
continue
}
autoEdgeObj := v.AutoEdges()
if autoEdgeObj == nil {
log.Printf("%v[%v]: Config: No auto edges were found!", v.Kind(), v.GetName())
continue // next vertex
}
for { // while the autoEdgeObj has more uuids to add...
uuids := autoEdgeObj.Next() // get some!
if uuids == nil {
log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName())
break // inner loop
}
if DEBUG {
log.Println("Compile: AutoEdge: UUIDS:")
for i, u := range uuids {
log.Printf("Compile: AutoEdge: UUID%d: %v", i, u)
}
}
// match and add edges
result := g.AddEdgesByMatchingUUIDS(v, uuids)
// report back, and find out if we should continue
if !autoEdgeObj.Test(result) {
break
}
}
}
return true
return graph, nil
}
// add edges to the vertex in a graph based on if it matches a uuid list
func (g *Graph) AddEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool {
func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool {
// search for edges and see what matches!
var result []bool
@@ -316,9 +262,251 @@ func (g *Graph) AddEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool {
break
}
}
result = append(result, found)
}
return result
}
// add auto edges to graph
func (g *Graph) AutoEdges() {
log.Println("Compile: Adding AutoEdges...")
for _, v := range g.GetVertices() { // for each vertexes autoedges
if !v.GetMeta().AutoEdge { // is the metaparam true?
continue
}
autoEdgeObj := v.AutoEdges()
if autoEdgeObj == nil {
log.Printf("%v[%v]: Config: No auto edges were found!", v.Kind(), v.GetName())
continue // next vertex
}
for { // while the autoEdgeObj has more uuids to add...
uuids := autoEdgeObj.Next() // get some!
if uuids == nil {
log.Printf("%v[%v]: Config: The auto edge list is empty!", v.Kind(), v.GetName())
break // inner loop
}
if DEBUG {
log.Println("Compile: AutoEdge: UUIDS:")
for i, u := range uuids {
log.Printf("Compile: AutoEdge: UUID%d: %v", i, u)
}
}
// match and add edges
result := g.addEdgesByMatchingUUIDS(v, uuids)
// report back, and find out if we should continue
if !autoEdgeObj.Test(result) {
break
}
}
}
}
// AutoGrouper is the required interface to implement for an autogroup algorithm
type AutoGrouper interface {
// listed in the order these are typically called in...
name() string // friendly identifier
init(*Graph) error // only call once
vertexNext() (*Vertex, *Vertex, error) // mostly algorithmic
vertexCmp(*Vertex, *Vertex) error // can we merge these ?
vertexMerge(*Vertex, *Vertex) (*Vertex, error) // vertex merge fn to use
edgeMerge(*Edge, *Edge) *Edge // edge merge fn to use
vertexTest(bool) (bool, error) // call until false
}
// baseGrouper is the base type for implementing the AutoGrouper interface
type baseGrouper struct {
graph *Graph // store a pointer to the graph
vertices []*Vertex // cached list of vertices
i int
j int
done bool
}
// name provides a friendly name for the logs to see
func (ag *baseGrouper) name() string {
return "baseGrouper"
}
// init is called only once and before using other AutoGrouper interface methods
// the name method is the only exception: call it any time without side effects!
func (ag *baseGrouper) init(g *Graph) error {
if ag.graph != nil {
return fmt.Errorf("The init method has already been called!")
}
ag.graph = g // pointer
ag.vertices = ag.graph.GetVertices() // cache
ag.i = 0
ag.j = 0
if len(ag.vertices) == 0 { // empty graph
ag.done = true
return nil
}
return nil
}
// vertexNext is a simple iterator that loops through vertex (pair) combinations
// an intelligent algorithm would selectively offer only valid pairs of vertices
// these should satisfy logical grouping requirements for the autogroup designs!
// the desired algorithms can override, but keep this method as a base iterator!
func (ag *baseGrouper) vertexNext() (v1, v2 *Vertex, err error) {
// this does a for v... { for w... { return v, w }} but stepwise!
l := len(ag.vertices)
if ag.i < l {
v1 = ag.vertices[ag.i]
}
if ag.j < l {
v2 = ag.vertices[ag.j]
}
// in case the vertex was deleted
if !ag.graph.HasVertex(v1) {
v1 = nil
}
if !ag.graph.HasVertex(v2) {
v2 = nil
}
// two nested loops...
if ag.j < l {
ag.j++
}
if ag.j == l {
ag.j = 0
if ag.i < l {
ag.i++
}
if ag.i == l {
ag.done = true
}
}
return
}
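The stepwise "two nested loops" trick in vertexNext can be shown standalone (a sketch over plain indices, not the real *Vertex type):

```go
package main

import "fmt"

// pairIterator steps through every ordered pair (i, j) of n items, one
// pair per call: the same stepwise nested-loop pattern that
// baseGrouper.vertexNext uses, minus the graph-specific checks.
type pairIterator struct {
	n, i, j int
	done    bool
}

func newPairIterator(n int) *pairIterator {
	return &pairIterator{n: n, done: n == 0} // empty input: nothing to yield
}

func (it *pairIterator) next() (int, int) {
	v1, v2 := it.i, it.j
	// advance the inner index, then carry over into the outer one
	if it.j < it.n {
		it.j++
	}
	if it.j == it.n {
		it.j = 0
		if it.i < it.n {
			it.i++
		}
		if it.i == it.n {
			it.done = true // all pairs have been yielded
		}
	}
	return v1, v2
}

func main() {
	it := newPairIterator(2)
	for !it.done {
		v1, v2 := it.next()
		fmt.Println(v1, v2)
	}
	// prints the four ordered pairs: 0 0, 0 1, 1 0, 1 1
}
```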
func (ag *baseGrouper) vertexCmp(v1, v2 *Vertex) error {
if v1 == nil || v2 == nil {
return fmt.Errorf("Vertex is nil!")
}
if v1 == v2 { // skip yourself
return fmt.Errorf("Vertices are the same!")
}
if v1.Kind() != v2.Kind() { // we must group similar kinds
// TODO: maybe future resources won't need this limitation?
return fmt.Errorf("The two resources aren't the same kind!")
}
// someone doesn't want to group!
if !v1.GetMeta().AutoGroup || !v2.GetMeta().AutoGroup {
return fmt.Errorf("One of the autogroup flags is false!")
}
if v1.Res.IsGrouped() { // already grouped!
return fmt.Errorf("Already grouped!")
}
if len(v2.Res.GetGroup()) > 0 { // already has children grouped!
return fmt.Errorf("Already has groups!")
}
if !v1.Res.GroupCmp(v2.Res) { // resource groupcmp failed!
return fmt.Errorf("The GroupCmp failed!")
}
return nil // success
}
func (ag *baseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) {
// NOTE: it's important to use w.Res instead of w, b/c
// the w by itself is the *Vertex obj, not the *Res obj
// which is contained within it! They both satisfy the
// Res interface, which is why both will compile! :(
err = v1.Res.GroupRes(v2.Res) // GroupRes skips stupid groupings
return // success or fail, and no need to merge the actual vertices!
}
func (ag *baseGrouper) edgeMerge(e1, e2 *Edge) *Edge {
return e1 // noop
}
// vertexTest processes the results of the grouping for the algorithm to know
// return an error if something went horribly wrong, and bool false to stop
func (ag *baseGrouper) vertexTest(b bool) (bool, error) {
// NOTE: this particular baseGrouper version doesn't track what happens
// because since we iterate over every pair, we don't care which merge!
if ag.done {
return false, nil
}
return true, nil
}
type algorithmNameGrouper struct { // XXX rename me!
baseGrouper // "inherit" what we want, and reimplement the rest
}
func (ag *algorithmNameGrouper) name() string {
log.Fatal("Not implemented!") // XXX
return "algorithmNameGrouper"
}
func (ag *algorithmNameGrouper) vertexNext() (v1, v2 *Vertex, err error) {
log.Fatal("Not implemented!") // XXX
// NOTE: you can even build this like this:
//v1, v2, err = ag.baseGrouper.vertexNext() // get all iterable pairs
// ...
//ag.baseGrouper.vertexTest(...)
//return
return nil, nil, fmt.Errorf("Not implemented!")
}
// autoGroup is the mechanical auto group "runner" that runs the interface spec
func (g *Graph) autoGroup(ag AutoGrouper) chan string {
strch := make(chan string) // output log messages here
go func(strch chan string) {
strch <- fmt.Sprintf("Compile: Grouping: Algorithm: %v...", ag.name())
if err := ag.init(g); err != nil {
log.Fatalf("Error running autoGroup(init): %v", err)
}
for {
var v, w *Vertex
v, w, err := ag.vertexNext() // get pair to compare
if err != nil {
log.Fatalf("Error running autoGroup(vertexNext): %v", err)
}
merged := false
// save names since they change during the runs
vStr := fmt.Sprintf("%s", v) // valid even if it is nil
wStr := fmt.Sprintf("%s", w)
if err := ag.vertexCmp(v, w); err != nil { // cmp ?
strch <- fmt.Sprintf("Compile: Grouping: !GroupCmp for: %s into %s", wStr, vStr)
// remove grouped vertex and merge edges (res is safe)
} else if err := g.VertexMerge(v, w, ag.vertexMerge, ag.edgeMerge); err != nil { // merge...
strch <- fmt.Sprintf("Compile: Grouping: !VertexMerge for: %s into %s", wStr, vStr)
} else { // success!
strch <- fmt.Sprintf("Compile: Grouping: Success for: %s into %s", wStr, vStr)
merged = true // woo
}
// did these get used?
if ok, err := ag.vertexTest(merged); err != nil {
log.Fatalf("Error running autoGroup(vertexTest): %v", err)
} else if !ok {
break // done!
}
}
close(strch)
return
}(strch) // call function
return strch
}
// AutoGroup runs the auto grouping on the graph and prints out log messages
func (g *Graph) AutoGroup() {
// receive log messages from channel...
// this allows test cases to avoid printing them when they're unwanted!
for str := range g.autoGroup(&baseGrouper{}) {
log.Println(str)
}
}
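The channel-based logging pattern that autoGroup uses (stream messages to the caller, let the caller decide whether to print) can be sketched standalone; the hypothetical runWork below stands in for the grouping loop:

```go
package main

import "fmt"

// runWork performs some steps and streams its log messages over a
// channel instead of printing them directly, so callers (eg: test
// cases) can consume or discard them as they see fit. Standalone
// sketch of the pattern used by (*Graph).autoGroup.
func runWork() chan string {
	strch := make(chan string)
	go func() {
		defer close(strch) // signal completion to the ranging caller
		for i := 1; i <= 3; i++ {
			strch <- fmt.Sprintf("step %d done", i)
		}
	}()
	return strch
}

func main() {
	// a production caller prints everything:
	for s := range runWork() {
		fmt.Println(s)
	}
	// a test case could instead drain the channel silently:
	for range runWork() {
	}
}
```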


@@ -138,7 +138,7 @@ func ConfigWatch(file string) chan bool {
}
case err := <-watcher.Errors:
log.Println("error:", err)
log.Printf("error: %v", err)
log.Fatal(err)
}

etcd.go

@@ -207,20 +207,14 @@ func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg {
}
// helper function to store our data in etcd
func (etcdO *EtcdWObject) EtcdPut(hostname, key, res string, obj interface{}) bool {
func (etcdO *EtcdWObject) EtcdPut(hostname, key, res string, data string) bool {
kapi := etcdO.GetKAPI()
output, ok := ObjToB64(obj)
if !ok {
log.Printf("Etcd: Could not encode %v key.", key)
return false
}
path := fmt.Sprintf("/exported/%s/resources/%s/res", hostname, key)
_, err := kapi.Set(etcd_context.Background(), path, res, nil)
// XXX validate...
path = fmt.Sprintf("/exported/%s/resources/%s/value", hostname, key)
resp, err := kapi.Set(etcd_context.Background(), path, output, nil)
resp, err := kapi.Set(etcd_context.Background(), path, data, nil)
if err != nil {
if cerr, ok := err.(*etcd.ClusterError); ok {
// not running or disconnected


@@ -13,6 +13,6 @@ resources:
i am f2, exported from host A
state: exists
collect:
- res: file
- kind: file
pattern: "/tmp/mgmtA/"
edges: []


@@ -13,6 +13,6 @@ resources:
i am f2, exported from host B
state: exists
collect:
- res: file
- kind: file
pattern: "/tmp/mgmtB/"
edges: []


@@ -13,6 +13,6 @@ resources:
i am f2, exported from host C
state: exists
collect:
- res: file
- kind: file
pattern: "/tmp/mgmtC/"
edges: []


@@ -45,15 +45,15 @@ resources:
edges:
- name: e1
from:
res: exec
kind: exec
name: exec1
to:
res: exec
kind: exec
name: exec2
- name: e2
from:
res: exec
kind: exec
name: exec2
to:
res: exec
kind: exec
name: exec3


@@ -25,8 +25,8 @@ resources:
edges:
- name: e1
from:
res: exec
kind: exec
name: exec1
to:
res: exec
kind: exec
name: exec2


@@ -25,8 +25,8 @@ resources:
edges:
- name: e1
from:
res: exec
kind: exec
name: exec1
to:
res: exec
kind: exec
name: exec2


@@ -25,8 +25,8 @@ resources:
edges:
- name: e1
from:
res: exec
kind: exec
name: exec1
to:
res: exec
kind: exec
name: exec2


@@ -55,29 +55,29 @@ resources:
edges:
- name: e1
from:
res: exec
kind: exec
name: exec1
to:
res: exec
kind: exec
name: exec2
- name: e2
from:
res: exec
kind: exec
name: exec1
to:
res: exec
kind: exec
name: exec3
- name: e3
from:
res: exec
kind: exec
name: exec2
to:
res: exec
kind: exec
name: exec4
- name: e4
from:
res: exec
kind: exec
name: exec3
to:
res: exec
kind: exec
name: exec4


@@ -27,15 +27,15 @@ resources:
edges:
- name: e1
from:
res: file
kind: file
name: file1
to:
res: file
kind: file
name: file2
- name: e2
from:
res: file
kind: file
name: file2
to:
res: file
kind: file
name: file3


@@ -13,8 +13,8 @@ resources:
edges:
- name: e1
from:
res: noop
kind: noop
name: noop1
to:
res: file
kind: file
name: file1


@@ -86,43 +86,43 @@ resources:
edges:
- name: e1
from:
res: exec
kind: exec
name: exec1
to:
res: exec
kind: exec
name: exec4
- name: e2
from:
res: exec
kind: exec
name: exec2
to:
res: exec
kind: exec
name: exec4
- name: e3
from:
res: exec
kind: exec
name: exec3
to:
res: exec
kind: exec
name: exec4
- name: e4
from:
res: exec
kind: exec
name: exec4
to:
res: exec
kind: exec
name: exec5
- name: e5
from:
res: exec
kind: exec
name: exec4
to:
res: exec
kind: exec
name: exec6
- name: e6
from:
res: exec
kind: exec
name: exec4
to:
res: exec
kind: exec
name: exec7


@@ -15,8 +15,8 @@ resources:
edges:
- name: e1
from:
res: file
kind: file
name: file1
to:
res: file
kind: file
name: file2


@@ -15,8 +15,8 @@ resources:
edges:
- name: e2
from:
res: file
kind: file
name: file2
to:
res: file
kind: file
name: file3


@@ -23,6 +23,6 @@ resources:
i am f4, exported from host A
state: exists
collect:
- res: file
- kind: file
pattern: "/tmp/mgmtA/"
edges: []


@@ -23,6 +23,6 @@ resources:
i am f4, exported from host B
state: exists
collect:
- res: file
- kind: file
pattern: "/tmp/mgmtB/"
edges: []


@@ -23,6 +23,6 @@ resources:
i am f4, exported from host C
state: exists
collect:
- res: file
- kind: file
pattern: "/tmp/mgmtC/"
edges: []


@@ -13,6 +13,6 @@ resources:
i am f3, exported from host A
state: exists
collect:
- res: file
- kind: file
pattern: ''
edges:


@@ -8,6 +8,6 @@ resources:
i am f1
state: exists
collect:
- res: file
- kind: file
pattern: ''
edges:


@@ -56,22 +56,22 @@ resources:
edges:
- name: e1
from:
res: exec
kind: exec
name: exec1
to:
res: exec
kind: exec
name: exec5
- name: e2
from:
res: exec
kind: exec
name: exec2
to:
res: exec
kind: exec
name: exec5
- name: e3
from:
res: exec
kind: exec
name: exec3
to:
res: exec
kind: exec
name: exec5


@@ -16,15 +16,15 @@ resources:
edges:
- name: e1
from:
res: noop
kind: noop
name: noop1
to:
res: file
kind: file
name: file1
- name: e2
from:
res: file
kind: file
name: file1
to:
res: svc
kind: svc
name: purpleidea


@@ -20,12 +20,17 @@ package main
import (
"bufio"
"bytes"
"encoding/gob"
"errors"
"log"
"os/exec"
"strings"
)
func init() {
gob.Register(&ExecRes{})
}
type ExecRes struct {
BaseRes `yaml:",inline"`
State string `yaml:"state"` // state: exists/present?, absent, (undefined?)
@@ -97,7 +102,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan
}
// Exec watcher
func (obj *ExecRes) Watch() {
func (obj *ExecRes) Watch(processChan chan struct{}) {
if obj.IsWatching() {
return
}
@@ -188,7 +193,7 @@ func (obj *ExecRes) Watch() {
send = false
// it is okay to invalidate the clean state on poke too
obj.isStateOK = false // something made state dirty
Process(obj) // XXX: rename this function
processChan <- struct{}{} // trigger process
}
}
}
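The processChan refactor in these hunks replaces direct Process(obj) calls inside Watch with a signal over a channel, decoupling the watcher from whatever runs the process step. A standalone sketch of that pattern (the event source is simulated here):

```go
package main

import "fmt"

// watch simulates a resource watcher: on each incoming event it signals
// the central worker over processChan instead of calling a Process()
// function directly, which is the decoupling this patch introduces.
// (Sketch only; the real Watch blocks on fsnotify/dbus/etc events.)
func watch(events <-chan string, processChan chan<- struct{}) {
	for range events {
		processChan <- struct{}{} // trigger process
	}
	close(processChan) // watcher is done
}

func main() {
	events := make(chan string, 2)
	events <- "modify"
	events <- "delete"
	close(events)

	processChan := make(chan struct{})
	go watch(events, processChan)

	count := 0
	for range processChan {
		count++ // the worker would run its process step here
	}
	fmt.Println("processed", count, "events")
}
```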

file.go

@@ -22,6 +22,7 @@ import (
"encoding/hex"
"gopkg.in/fsnotify.v1"
//"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1"
"encoding/gob"
"io"
"log"
"math"
@@ -31,6 +32,10 @@ import (
"syscall"
)
func init() {
gob.Register(&FileRes{})
}
type FileRes struct {
BaseRes `yaml:",inline"`
Path string `yaml:"path"` // path variable (should default to name)
@@ -97,7 +102,7 @@ func (obj *FileRes) Validate() bool {
// File watcher for files and directories
// Modify with caution, probably important to write some test cases first!
// obj.GetPath(): file or directory
func (obj *FileRes) Watch() {
func (obj *FileRes) Watch(processChan chan struct{}) {
if obj.IsWatching() {
return
}
@@ -230,7 +235,7 @@ func (obj *FileRes) Watch() {
case err := <-watcher.Errors:
obj.SetConvergedState(resConvergedNil) // XXX ?
log.Println("error:", err)
log.Printf("error: %v", err)
log.Fatal(err)
//obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors?
@@ -255,7 +260,7 @@ func (obj *FileRes) Watch() {
dirty = false
obj.isStateOK = false // something made state dirty
}
Process(obj) // XXX: rename this function
processChan <- struct{}{} // trigger process
}
}
}
@@ -488,3 +493,8 @@ func (obj *FileRes) Compare(res Res) bool {
}
return true
}
func (obj *FileRes) CollectPattern(pattern string) {
// XXX: currently the pattern for files can only override the Dirname variable :P
obj.Dirname = pattern // XXX: simplistic for now
}

main.go

@@ -63,7 +63,7 @@ func run(c *cli.Context) {
converged := make(chan bool) // converged signal
log.Printf("This is: %v, version: %v", program, version)
log.Printf("Main: Start: %v", start)
G := NewGraph("Graph") // give graph a default name
var G, fullGraph *Graph
// exit after `max-runtime` seconds for no reason at all...
if i := c.Int("max-runtime"); i > 0 {
@@ -102,10 +102,11 @@ func run(c *cli.Context) {
if !c.Bool("no-watch") {
configchan = ConfigWatch(file)
}
log.Printf("Etcd: Starting...")
log.Println("Etcd: Starting...")
etcdchan := etcdO.EtcdWatch()
first := true // first loop or not
for {
log.Println("Main: Waiting...")
select {
case _ = <-startchan: // kick the loop once at start
// pass
@@ -134,17 +135,29 @@ func run(c *cli.Context) {
}
// run graph vertex LOCK...
if !first { // XXX: we can flatten this check out I think
log.Printf("State: %v -> %v", G.SetState(graphPausing), G.GetState())
if !first { // TODO: we can flatten this check out I think
G.Pause() // sync
log.Printf("State: %v -> %v", G.SetState(graphPaused), G.GetState())
}
// build the graph from a config file
// build the graph on events (eg: from etcd)
if !UpdateGraphFromConfig(config, hostname, G, etcdO) {
log.Fatal("Config: We borked the graph.") // XXX
// build graph from yaml file on events (eg: from etcd)
// we need the vertices to be paused to work on them
if newFullgraph, err := fullGraph.NewGraphFromConfig(config, etcdO, hostname); err == nil { // keep references to all original elements
fullGraph = newFullgraph
} else {
log.Printf("Config: Error making new graph from config: %v", err)
// unpause!
if !first {
G.Start(&wg, first) // sync
}
continue
}
G = fullGraph.Copy() // copy to active graph
// XXX: do etcd transaction out here...
G.AutoEdges() // add autoedges; modifies the graph
//G.AutoGroup() // run autogroup; modifies the graph // TODO
// TODO: do we want to do a transitive reduction?
log.Printf("Graph: %v", G) // show graph
err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz"))
if err != nil {
@@ -159,9 +172,7 @@ func run(c *cli.Context) {
// some are not ready yet and the EtcdWatch
// loops, we'll cause G.Pause(...) before we
// even got going, thus causing nil pointer errors
log.Printf("State: %v -> %v", G.SetState(graphStarting), G.GetState())
G.Start(&wg, first) // sync
log.Printf("State: %v -> %v", G.SetState(graphStarted), G.GetState())
first = false
}
}()

misc.go

@@ -18,9 +18,6 @@
package main
import (
"bytes"
"encoding/base64"
"encoding/gob"
"github.com/godbus/dbus"
"path"
"sort"
@@ -28,6 +25,11 @@ import (
"time"
)
// returns the string with the first character capitalized
func FirstToUpper(str string) string {
return strings.ToUpper(str[0:1]) + str[1:]
}
// return true if a string exists inside a list, otherwise false
func StrInList(needle string, haystack []string) bool {
for _, x := range haystack {
@@ -136,6 +138,9 @@ func Dirname(p string) string {
func Basename(p string) string {
_, b := path.Split(path.Clean(p))
if p == "" {
return ""
}
if p[len(p)-1:] == "/" { // don't lose the trailing slash
b += "/"
}
@@ -265,36 +270,6 @@ func DirifyFileList(fileList []string, removeDirs bool) []string {
return result
}
// encode an object as base 64, serialize and then base64 encode
func ObjToB64(obj interface{}) (string, bool) {
b := bytes.Buffer{}
e := gob.NewEncoder(&b)
err := e.Encode(obj)
if err != nil {
//log.Println("Gob failed to Encode: ", err)
return "", false
}
return base64.StdEncoding.EncodeToString(b.Bytes()), true
}
// TODO: is it possible to somehow generically just return the obj?
// decode an object into the waiting obj which you pass a reference to
func B64ToObj(str string, obj interface{}) bool {
bb, err := base64.StdEncoding.DecodeString(str)
if err != nil {
//log.Println("Base64 failed to Decode: ", err)
return false
}
b := bytes.NewBuffer(bb)
d := gob.NewDecoder(b)
err = d.Decode(obj)
if err != nil {
//log.Println("Gob failed to Decode: ", err)
return false
}
return true
}
// special version of time.After that blocks when given a negative integer
// when used in a case statement, the timer restarts on each select call to it
func TimeAfterOrBlock(t int) <-chan time.Time {


@@ -18,7 +18,6 @@
package main
import (
"fmt"
"reflect"
"sort"
"testing"
@@ -58,6 +57,9 @@ func TestMiscT1(t *testing.T) {
t.Errorf("Result is incorrect.")
}
if Basename("") != "" { // TODO: should this equal something different?
t.Errorf("Result is incorrect.")
}
}
func TestMiscT2(t *testing.T) {
@@ -169,57 +171,6 @@ func TestMiscT5(t *testing.T) {
}
}
func TestMiscT6(t *testing.T) {
type foo struct {
Name string `yaml:"name"`
Res string `yaml:"res"`
Value int `yaml:"value"`
}
obj := foo{"dude", "sweet", 42}
output, ok := ObjToB64(obj)
if ok != true {
t.Errorf("First result should be true.")
}
var data foo
if B64ToObj(output, &data) != true {
t.Errorf("Second result should be true.")
}
// TODO: there is probably a better way to compare these two...
if fmt.Sprintf("%+v\n", obj) != fmt.Sprintf("%+v\n", data) {
t.Errorf("Strings should match.")
}
}
func TestMiscT7(t *testing.T) {
type Foo struct {
Name string `yaml:"name"`
Res string `yaml:"res"`
Value int `yaml:"value"`
}
type bar struct {
Foo `yaml:",inline"` // anonymous struct must be public!
Comment string `yaml:"comment"`
}
obj := bar{Foo{"dude", "sweet", 42}, "hello world"}
output, ok := ObjToB64(obj)
if ok != true {
t.Errorf("First result should be true.")
}
var data bar
if B64ToObj(output, &data) != true {
t.Errorf("Second result should be true.")
}
// TODO: there is probably a better way to compare these two...
if fmt.Sprintf("%+v\n", obj) != fmt.Sprintf("%+v\n", data) {
t.Errorf("Strings should match.")
}
}
func TestMiscT8(t *testing.T) {
r0 := []string{"/"}


@@ -18,9 +18,14 @@
package main
import (
"encoding/gob"
"log"
)
func init() {
gob.Register(&NoopRes{})
}
type NoopRes struct {
BaseRes `yaml:",inline"`
Comment string `yaml:"comment"` // extra field for example purposes
@@ -48,7 +53,7 @@ func (obj *NoopRes) Validate() bool {
return true
}
func (obj *NoopRes) Watch() {
func (obj *NoopRes) Watch(processChan chan struct{}) {
if obj.IsWatching() {
return
}
@@ -79,7 +84,7 @@ func (obj *NoopRes) Watch() {
send = false
// only do this on certain types of events
//obj.isStateOK = false // something made state dirty
Process(obj) // XXX: rename this function
processChan <- struct{}{} // trigger process
}
}
}

pgraph.go

@@ -35,11 +35,11 @@ import (
type graphState int
const (
graphNil graphState = iota
graphStarting
graphStarted
graphPausing
graphPaused
graphStateNil graphState = iota
graphStateStarting
graphStateStarted
graphStatePausing
graphStatePaused
)
// The graph abstract data type (ADT) is defined as follows:
@@ -55,9 +55,8 @@ type Graph struct {
}
type Vertex struct {
graph *Graph // store a pointer to the graph it's on
Res // anonymous field
data map[string]string // XXX: currently unused i think, remove?
timestamp int64 // last updated timestamp ?
}
type Edge struct {
@@ -68,7 +67,7 @@ func NewGraph(name string) *Graph {
return &Graph{
Name: name,
Adjacency: make(map[*Vertex]map[*Vertex]*Edge),
state: graphNil,
state: graphStateNil,
}
}
@@ -84,6 +83,19 @@ func NewEdge(name string) *Edge {
}
}
// Copy makes a copy of the graph struct
func (g *Graph) Copy() *Graph {
newGraph := &Graph{
Name: g.Name,
Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)),
state: g.state,
}
for k, v := range g.Adjacency {
newGraph.Adjacency[k] = v // copy
}
return newGraph
}
// returns the name of the graph
func (g *Graph) GetName() string {
return g.Name
@@ -116,13 +128,12 @@ func (g *Graph) SetVertex() {
}
}
// add a new vertex to the graph
func (g *Graph) AddVertex(v *Vertex) {
// AddVertex uses variadic input to add all listed vertices to the graph
func (g *Graph) AddVertex(xv ...*Vertex) {
for _, v := range xv {
if _, exists := g.Adjacency[v]; !exists {
g.Adjacency[v] = make(map[*Vertex]*Edge)
// store a pointer to the graph it's on for convenience and readability
v.graph = g
}
}
}
@@ -136,9 +147,9 @@ func (g *Graph) DeleteVertex(v *Vertex) {
// adds a directed edge to the graph from v1 to v2
func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) {
// NOTE: this doesn't allow more than one edge between two vertices...
// TODO: is this a problem?
g.AddVertex(v1)
g.AddVertex(v2)
g.AddVertex(v1, v2) // supports adding N vertices now
// TODO: check if an edge exists to avoid overwriting it!
// NOTE: VertexMerge() depends on overwriting it at the moment...
g.Adjacency[v1][v2] = e
}
@@ -198,6 +209,11 @@ func (g *Graph) String() string {
return fmt.Sprintf("Vertices(%d), Edges(%d)", g.NumVertices(), g.NumEdges())
}
// String returns the canonical form for a vertex
func (v *Vertex) String() string {
return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName())
}
// output the graph in graphviz format
// https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29
func (g *Graph) Graphviz() (out string) {
@@ -281,7 +297,7 @@ func (g *Graph) ExecGraphviz(program, filename string) error {
}
// return an array (slice) of all directed vertices to vertex v (??? -> v)
// ostimestamp should use this
// OKTimestamp should use this
func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex {
// TODO: we might be able to implement this differently by reversing
// the Adjacency graph and then looping through it again...
@@ -465,9 +481,105 @@ func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algori
return L, true
}
// return a pointer to the graph a vertex is on
func (v *Vertex) GetGraph() *Graph {
return v.graph
// Reachability finds the shortest path in a DAG from a to b, and returns the
// slice of vertices that matched this particular path including both a and b.
// It returns nil if a or b is nil, and an empty list if no path is found.
// Since there could be more than one possible result for this operation, we
// arbitrarily choose one of the shortest possible. As a result, this should
// actually return a tree if we cared about correctness.
// This operates by a recursive algorithm; a more efficient version is likely.
// If you don't give this function a DAG, you might cause infinite recursion!
func (g *Graph) Reachability(a, b *Vertex) []*Vertex {
if a == nil || b == nil {
return nil
}
vertices := g.OutgoingGraphEdges(a) // what points away from a ?
if len(vertices) == 0 {
return []*Vertex{} // nope
}
if VertexContains(b, vertices) {
return []*Vertex{a, b} // found
}
// TODO: parallelize this with go routines?
var collected = make([][]*Vertex, len(vertices))
pick := -1
for i, v := range vertices {
collected[i] = g.Reachability(v, b) // find b by recursion
if l := len(collected[i]); l > 0 {
// pick shortest path
// TODO: technically i should return a tree
if pick < 0 || l < len(collected[pick]) {
pick = i
}
}
}
if pick < 0 {
return []*Vertex{} // nope
}
result := []*Vertex{a} // tack on a
result = append(result, collected[pick]...)
return result
}
// VertexMerge merges v2 into v1 by reattaching the edges where appropriate,
// and then by deleting v2 from the graph. Since more than one edge between two
// vertices is not allowed, duplicate edges are merged as well. An edge merge
// function can be provided if you'd like to control how you merge the edges!
func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) (*Vertex, error), edgeMergeFn func(*Edge, *Edge) *Edge) error {
// methodology
// 1) edges between v1 and v2 are removed
//Loop:
for k1 := range g.Adjacency {
for k2 := range g.Adjacency[k1] {
// v1 -> v2 || v2 -> v1
if (k1 == v1 && k2 == v2) || (k1 == v2 && k2 == v1) {
delete(g.Adjacency[k1], k2) // delete map & edge
// NOTE: if we assume this is a DAG, then we can
// assume only v1 -> v2 OR v2 -> v1 exists, and
// we can break out of these loops immediately!
//break Loop
break
}
}
}
// 2) edges that point towards v2 from X now point to v1 from X (no dupes)
for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v)
e := g.Adjacency[x][v2] // previous edge
// merge e with ex := g.Adjacency[x][v1] if it exists!
if ex, exists := g.Adjacency[x][v1]; exists && edgeMergeFn != nil {
e = edgeMergeFn(e, ex)
}
g.AddEdge(x, v1, e) // overwrite edge
delete(g.Adjacency[x], v2) // delete old edge
}
// 3) edges that point from v2 to X now point from v1 to X (no dupes)
for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???)
e := g.Adjacency[v2][x] // previous edge
// merge e with ex := g.Adjacency[v1][x] if it exists!
if ex, exists := g.Adjacency[v1][x]; exists && edgeMergeFn != nil {
e = edgeMergeFn(e, ex)
}
g.AddEdge(v1, x, e) // overwrite edge
delete(g.Adjacency[v2], x)
}
// 4) merge and then remove the (now merged/grouped) vertex
if vertexMergeFn != nil { // run vertex merge function
if v, err := vertexMergeFn(v1, v2); err != nil {
return err
} else if v != nil { // replace v1 with the "merged" version...
v1 = v // XXX: will this replace v1 the way we want?
}
}
g.DeleteVertex(v2) // remove grouped vertex
// 5) creation of a cyclic graph should throw an error
if _, dag := g.TopologicalSort(); !dag { // am i a dag or not?
return fmt.Errorf("Graph is not a dag!")
}
return nil // success
}
func HeisenbergCount(ch chan *Vertex) int {
@@ -479,8 +591,134 @@ func HeisenbergCount(ch chan *Vertex) int {
return c
}
// GetTimestamp returns the timestamp of a vertex
func (v *Vertex) GetTimestamp() int64 {
return v.timestamp
}
// UpdateTimestamp updates the timestamp on a vertex and returns the new value
func (v *Vertex) UpdateTimestamp() int64 {
v.timestamp = time.Now().UnixNano() // update
return v.timestamp
}
// OKTimestamp returns whether this element can run right now.
func (g *Graph) OKTimestamp(v *Vertex) bool {
// these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) {
// if the vertex has a greater timestamp than any pre-req (n)
// then we can't run right now...
// if they're equal (eg: on init of 0) then we also can't run
// b/c we should let our pre-req's go first...
x, y := v.GetTimestamp(), n.GetTimestamp()
if DEBUG {
log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y)
}
if x >= y {
return false
}
}
return true
}
// notify nodes after me in the dependency graph that they need refreshing...
// NOTE: this assumes that this can never fail or need to be rescheduled
func (g *Graph) Poke(v *Vertex, activity bool) {
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
for _, n := range g.OutgoingGraphEdges(v) {
// XXX: if we're in state event and haven't been cancelled by
// apply, then we can cancel a poke to a child, right? XXX
// XXX: if n.Res.GetState() != resStateEvent { // is this correct?
if true { // XXX
if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync?
} else {
if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
}
}
}
// poke the pre-requisites that are stale and need to run before I can run...
func (g *Graph) BackPoke(v *Vertex) {
// these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) {
x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState()
// if the parent timestamp needs poking AND it's not in state
// resStateEvent, then poke it. If the parent is in resStateEvent it
// means that an event is pending, so we'll be expecting a poke
// back soon, so we can safely discard the extra parent poke...
// TODO: implement a stateLT (less than) to tell if something
// happens earlier in the state cycle and that doesn't wrap nil
if x >= y && (s != resStateEvent && s != resStateCheckApply) {
if DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync?
} else {
if DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
}
}
}
// XXX: rename this function
func (g *Graph) Process(v *Vertex) {
obj := v.Res
if DEBUG {
log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName())
}
obj.SetState(resStateEvent)
var ok = true
var apply = false // did we run an apply?
// is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke
// us back and we will run if needed then!
if g.OKTimestamp(v) {
if DEBUG {
log.Printf("%v[%v]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp())
}
obj.SetState(resStateCheckApply)
// if this fails, don't UpdateTimestamp()
stateok, err := obj.CheckApply(true)
if stateok && err != nil { // should never return this way
log.Fatalf("%v[%v]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), stateok, err)
}
if DEBUG {
log.Printf("%v[%v]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), stateok, err)
}
if !stateok { // if state *was* not ok, we had to have apply'ed
if err != nil { // error during check or apply
ok = false
} else {
apply = true
}
}
if ok {
// update this timestamp *before* we poke or the poked
// nodes might fail due to having a too old timestamp!
v.UpdateTimestamp() // this was touched...
obj.SetState(resStatePoking) // can't cancel parent poke
g.Poke(v, apply)
}
// poke at our pre-req's instead since they need to refresh/run...
} else {
// only poke at the pre-req's that need to run
go g.BackPoke(v)
}
}
// main kick to start the graph
func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
log.Printf("State: %v -> %v", g.SetState(graphStateStarting), g.GetState())
defer log.Printf("State: %v -> %v", g.SetState(graphStateStarted), g.GetState())
t, _ := g.TopologicalSort()
// TODO: only calculate indegree if `first` is true to save resources
indegree := g.InDegree() // compute all of the indegree's
@@ -492,7 +730,20 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
go func(vv *Vertex) {
defer wg.Done()
vv.Res.Watch()
// listen for chan events from Watch() and run
// the Process() function when they're received
// this avoids us having to pass the data into
// the Watch() function about which graph it is
// running on, which isolates things nicely...
chanProcess := make(chan struct{})
go func() {
for _ = range chanProcess {
// XXX: do we need to ACK so that it's synchronous?
g.Process(vv)
}
}()
vv.Res.Watch(chanProcess) // i block until i end
close(chanProcess)
log.Printf("%v[%v]: Exited", vv.Kind(), vv.GetName())
}(v)
}
@@ -511,7 +762,7 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
// and not just selectively the subset with no indegree.
if (!first) || indegree[v] == 0 {
// ensure state is started before continuing on to next vertex
for !v.Res.SendEvent(eventStart, true, false) {
for !v.SendEvent(eventStart, true, false) {
if DEBUG {
// if SendEvent fails, we aren't up yet
log.Printf("%v[%v]: Retrying SendEvent(Start)", v.Kind(), v.GetName())
@@ -525,13 +776,18 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue
}
func (g *Graph) Pause() {
log.Printf("State: %v -> %v", g.SetState(graphStatePausing), g.GetState())
defer log.Printf("State: %v -> %v", g.SetState(graphStatePaused), g.GetState())
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
v.Res.SendEvent(eventPause, true, false)
v.SendEvent(eventPause, true, false)
}
}
func (g *Graph) Exit() {
if g == nil {
return
} // empty graph that wasn't populated yet
t, _ := g.TopologicalSort()
for _, v := range t { // squeeze out the events...
// turn off the taps...
@@ -539,7 +795,7 @@ func (g *Graph) Exit() {
// when we hit the 'default' in the select statement!
// XXX: we can do this to quiesce, but it's not necessary now
v.Res.SendEvent(eventExit, true, false)
v.SendEvent(eventExit, true, false)
}
}
@@ -549,6 +805,7 @@ func (g *Graph) SetConvergedCallback(ctimeout int, converged chan bool) {
}
}
// in array function to test *Vertex in a slice of *Vertices
func VertexContains(needle *Vertex, haystack []*Vertex) bool {
for _, v := range haystack {
if needle == v {
@@ -558,16 +815,6 @@ func VertexContains(needle *Vertex, haystack []*Vertex) bool {
return false
}
// in array function to test *vertices in a slice of *vertices
func HasVertex(v *Vertex, haystack []*Vertex) bool {
for _, r := range haystack {
if v == r {
return true
}
}
return false
}
// reverse a list of vertices
func Reverse(vs []*Vertex) []*Vertex {
//var out []*Vertex // XXX: golint suggests, but it fails testing


@@ -20,7 +20,10 @@
package main
import (
"fmt"
"reflect"
"sort"
"strings"
"testing"
)
@@ -254,26 +257,26 @@ func TestPgraphT8(t *testing.T) {
v1 := NewVertex(NewNoopRes("v1"))
v2 := NewVertex(NewNoopRes("v2"))
v3 := NewVertex(NewNoopRes("v3"))
if HasVertex(v1, []*Vertex{v1, v2, v3}) != true {
if VertexContains(v1, []*Vertex{v1, v2, v3}) != true {
t.Errorf("Should be true instead of false.")
}
v4 := NewVertex(NewNoopRes("v4"))
v5 := NewVertex(NewNoopRes("v5"))
v6 := NewVertex(NewNoopRes("v6"))
if HasVertex(v4, []*Vertex{v5, v6}) != false {
if VertexContains(v4, []*Vertex{v5, v6}) != false {
t.Errorf("Should be false instead of true.")
}
v7 := NewVertex(NewNoopRes("v7"))
v8 := NewVertex(NewNoopRes("v8"))
v9 := NewVertex(NewNoopRes("v9"))
if HasVertex(v8, []*Vertex{v7, v8, v9}) != true {
if VertexContains(v8, []*Vertex{v7, v8, v9}) != true {
t.Errorf("Should be true instead of false.")
}
v1b := NewVertex(NewNoopRes("v1")) // same value, different objects
if HasVertex(v1b, []*Vertex{v1, v2, v3}) != false {
if VertexContains(v1b, []*Vertex{v1, v2, v3}) != false {
t.Errorf("Should be false instead of true.")
}
}
@@ -381,6 +384,211 @@ func TestPgraphT10(t *testing.T) {
}
}
// empty
func TestPgraphReachability0(t *testing.T) {
{
G := NewGraph("g")
result := G.Reachability(nil, nil)
if result != nil {
t.Logf("Reachability failed!")
str := "Got:"
for _, v := range result {
str += " " + v.Res.GetName()
}
t.Errorf(str)
}
}
{
G := NewGraph("g")
v1 := NewVertex(NewNoopRes("v1"))
v6 := NewVertex(NewNoopRes("v6"))
result := G.Reachability(v1, v6)
expected := []*Vertex{}
if !reflect.DeepEqual(result, expected) {
t.Logf("Reachability failed!")
str := "Got:"
for _, v := range result {
str += " " + v.Res.GetName()
}
t.Errorf(str)
}
}
{
G := NewGraph("g")
v1 := NewVertex(NewNoopRes("v1"))
v2 := NewVertex(NewNoopRes("v2"))
v3 := NewVertex(NewNoopRes("v3"))
v4 := NewVertex(NewNoopRes("v4"))
v5 := NewVertex(NewNoopRes("v5"))
v6 := NewVertex(NewNoopRes("v6"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
e4 := NewEdge("e4")
e5 := NewEdge("e5")
G.AddEdge(v1, v2, e1)
G.AddEdge(v2, v3, e2)
G.AddEdge(v1, v4, e3)
G.AddEdge(v3, v4, e4)
G.AddEdge(v3, v5, e5)
result := G.Reachability(v1, v6)
expected := []*Vertex{}
if !reflect.DeepEqual(result, expected) {
t.Logf("Reachability failed!")
str := "Got:"
for _, v := range result {
str += " " + v.Res.GetName()
}
t.Errorf(str)
}
}
}
// simple linear path
func TestPgraphReachability1(t *testing.T) {
G := NewGraph("g")
v1 := NewVertex(NewNoopRes("v1"))
v2 := NewVertex(NewNoopRes("v2"))
v3 := NewVertex(NewNoopRes("v3"))
v4 := NewVertex(NewNoopRes("v4"))
v5 := NewVertex(NewNoopRes("v5"))
v6 := NewVertex(NewNoopRes("v6"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
e4 := NewEdge("e4")
e5 := NewEdge("e5")
//e6 := NewEdge("e6")
G.AddEdge(v1, v2, e1)
G.AddEdge(v2, v3, e2)
G.AddEdge(v3, v4, e3)
G.AddEdge(v4, v5, e4)
G.AddEdge(v5, v6, e5)
result := G.Reachability(v1, v6)
expected := []*Vertex{v1, v2, v3, v4, v5, v6}
if !reflect.DeepEqual(result, expected) {
t.Logf("Reachability failed!")
str := "Got:"
for _, v := range result {
str += " " + v.Res.GetName()
}
t.Errorf(str)
}
}
// pick one of two correct paths
func TestPgraphReachability2(t *testing.T) {
G := NewGraph("g")
v1 := NewVertex(NewNoopRes("v1"))
v2 := NewVertex(NewNoopRes("v2"))
v3 := NewVertex(NewNoopRes("v3"))
v4 := NewVertex(NewNoopRes("v4"))
v5 := NewVertex(NewNoopRes("v5"))
v6 := NewVertex(NewNoopRes("v6"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
e4 := NewEdge("e4")
e5 := NewEdge("e5")
e6 := NewEdge("e6")
G.AddEdge(v1, v2, e1)
G.AddEdge(v1, v3, e2)
G.AddEdge(v2, v4, e3)
G.AddEdge(v3, v4, e4)
G.AddEdge(v4, v5, e5)
G.AddEdge(v5, v6, e6)
result := G.Reachability(v1, v6)
expected1 := []*Vertex{v1, v2, v4, v5, v6}
expected2 := []*Vertex{v1, v3, v4, v5, v6}
// !xor test
if reflect.DeepEqual(result, expected1) == reflect.DeepEqual(result, expected2) {
t.Logf("Reachability failed!")
str := "Got:"
for _, v := range result {
str += " " + v.Res.GetName()
}
t.Errorf(str)
}
}
// pick shortest path
func TestPgraphReachability3(t *testing.T) {
G := NewGraph("g")
v1 := NewVertex(NewNoopRes("v1"))
v2 := NewVertex(NewNoopRes("v2"))
v3 := NewVertex(NewNoopRes("v3"))
v4 := NewVertex(NewNoopRes("v4"))
v5 := NewVertex(NewNoopRes("v5"))
v6 := NewVertex(NewNoopRes("v6"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
e4 := NewEdge("e4")
e5 := NewEdge("e5")
e6 := NewEdge("e6")
G.AddEdge(v1, v2, e1)
G.AddEdge(v2, v3, e2)
G.AddEdge(v3, v4, e3)
G.AddEdge(v4, v5, e4)
G.AddEdge(v1, v5, e5)
G.AddEdge(v5, v6, e6)
result := G.Reachability(v1, v6)
expected := []*Vertex{v1, v5, v6}
if !reflect.DeepEqual(result, expected) {
t.Logf("Reachability failed!")
str := "Got:"
for _, v := range result {
str += " " + v.Res.GetName()
}
t.Errorf(str)
}
}
// direct path
func TestPgraphReachability4(t *testing.T) {
G := NewGraph("g")
v1 := NewVertex(NewNoopRes("v1"))
v2 := NewVertex(NewNoopRes("v2"))
v3 := NewVertex(NewNoopRes("v3"))
v4 := NewVertex(NewNoopRes("v4"))
v5 := NewVertex(NewNoopRes("v5"))
v6 := NewVertex(NewNoopRes("v6"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
e4 := NewEdge("e4")
e5 := NewEdge("e5")
e6 := NewEdge("e6")
G.AddEdge(v1, v2, e1)
G.AddEdge(v2, v3, e2)
G.AddEdge(v3, v4, e3)
G.AddEdge(v4, v5, e4)
G.AddEdge(v5, v6, e5)
G.AddEdge(v1, v6, e6)
result := G.Reachability(v1, v6)
expected := []*Vertex{v1, v6}
if !reflect.DeepEqual(result, expected) {
t.Logf("Reachability failed!")
str := "Got:"
for _, v := range result {
str += " " + v.Res.GetName()
}
t.Errorf(str)
}
}
func TestPgraphT11(t *testing.T) {
v1 := NewVertex(NewNoopRes("v1"))
v2 := NewVertex(NewNoopRes("v2"))
@@ -404,5 +612,647 @@ func TestPgraphT11(t *testing.T) {
if rev := Reverse([]*Vertex{v6, v5, v4, v3, v2, v1}); !reflect.DeepEqual(rev, []*Vertex{v1, v2, v3, v4, v5, v6}) {
t.Errorf("Reverse of vertex slice failed.")
}
}
type NoopResTest struct {
NoopRes
}
func (obj *NoopResTest) GroupCmp(r Res) bool {
res, ok := r.(*NoopResTest)
if !ok {
return false
}
// TODO: implement this in vertexCmp for *testBaseGrouper instead?
if strings.Contains(res.Name, ",") { // HACK
return false // element to be grouped is already grouped!
}
// group if they start with the same letter! (helpful hack for testing)
return obj.Name[0] == res.Name[0]
}
func NewNoopResTest(name string) *NoopResTest {
obj := &NoopResTest{
NoopRes: NoopRes{
BaseRes: BaseRes{
Name: name,
Meta: MetaParams{
AutoGroup: true, // always autogroup
},
},
},
}
obj.Init() // optional here in this testing scenario (for now)
return obj
}
// ListStrCmp compares two lists of strings
func ListStrCmp(a, b []string) bool {
//fmt.Printf("CMP: %v with %v\n", a, b) // debugging
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
// GraphCmp compares the topology of two graphs and returns nil if they're equal
// It also checks that grouped element groups are identical
func GraphCmp(g1, g2 *Graph) error {
if n1, n2 := g1.NumVertices(), g2.NumVertices(); n1 != n2 {
return fmt.Errorf("Graph g1 has %d vertices, while g2 has %d.", n1, n2)
}
if e1, e2 := g1.NumEdges(), g2.NumEdges(); e1 != e2 {
return fmt.Errorf("Graph g1 has %d edges, while g2 has %d.", e1, e2)
}
var m = make(map[*Vertex]*Vertex) // g1 to g2 vertex correspondence
Loop:
// check vertices
for v1 := range g1.Adjacency { // for each vertex in g1
l1 := strings.Split(v1.GetName(), ",") // make list of everyone's names...
for _, x1 := range v1.GetGroup() {
l1 = append(l1, x1.GetName()) // add my contents
}
l1 = StrRemoveDuplicatesInList(l1) // remove duplicates
sort.Strings(l1)
// inner loop
for v2 := range g2.Adjacency { // does it match in g2 ?
l2 := strings.Split(v2.GetName(), ",")
for _, x2 := range v2.GetGroup() {
l2 = append(l2, x2.GetName())
}
l2 = StrRemoveDuplicatesInList(l2) // remove duplicates
sort.Strings(l2)
// does l1 match l2 ?
if ListStrCmp(l1, l2) { // cmp!
m[v1] = v2
continue Loop
}
}
return fmt.Errorf("Graph g1 has no match in g2 for: %v", v1.GetName())
}
// vertices (and groups) match :)
// check edges
for v1 := range g1.Adjacency { // for each vertex in g1
v2 := m[v1] // lookup in map to get correspondence
// g1.Adjacency[v1] corresponds to g2.Adjacency[v2]
if e1, e2 := len(g1.Adjacency[v1]), len(g2.Adjacency[v2]); e1 != e2 {
return fmt.Errorf("Graph g1, vertex(%v) has %d edges, while g2, vertex(%v) has %d.", v1.GetName(), e1, v2.GetName(), e2)
}
for vv1, ee1 := range g1.Adjacency[v1] {
vv2 := m[vv1]
ee2 := g2.Adjacency[v2][vv2]
// these are edges from v1 -> vv1 via ee1 (graph 1)
// to cmp to edges from v2 -> vv2 via ee2 (graph 2)
// check: (1) vv1 == vv2 ? (we've already checked this!)
l1 := strings.Split(vv1.GetName(), ",") // make list of everyone's names...
for _, x1 := range vv1.GetGroup() {
l1 = append(l1, x1.GetName()) // add my contents
}
l1 = StrRemoveDuplicatesInList(l1) // remove duplicates
sort.Strings(l1)
l2 := strings.Split(vv2.GetName(), ",")
for _, x2 := range vv2.GetGroup() {
l2 = append(l2, x2.GetName())
}
l2 = StrRemoveDuplicatesInList(l2) // remove duplicates
sort.Strings(l2)
// does l1 match l2 ?
if !ListStrCmp(l1, l2) { // cmp!
return fmt.Errorf("Graph g1 and g2 don't agree on: %v and %v", vv1.GetName(), vv2.GetName())
}
// check: (2) ee1 == ee2
if ee1.Name != ee2.Name {
return fmt.Errorf("Graph g1 edge(%v) doesn't match g2 edge(%v)", ee1.Name, ee2.Name)
}
}
}
return nil // success!
}
type testBaseGrouper struct { // FIXME: update me when we've implemented the correct grouping algorithm!
baseGrouper // "inherit" what we want, and reimplement the rest
}
func (ag *testBaseGrouper) name() string {
return "testBaseGrouper"
}
func (ag *testBaseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) {
if err := v1.Res.GroupRes(v2.Res); err != nil { // group them first
return nil, err
}
// HACK: update the name so it matches the full list of self+grouped
obj := v1.Res
names := strings.Split(obj.GetName(), ",") // load in stored names
for _, n := range obj.GetGroup() {
names = append(names, n.GetName()) // add my contents
}
names = StrRemoveDuplicatesInList(names) // remove duplicates
sort.Strings(names)
obj.SetName(strings.Join(names, ","))
return // success or fail, and no need to merge the actual vertices!
}
func (ag *testBaseGrouper) edgeMerge(e1, e2 *Edge) *Edge {
// HACK: update the name so it makes a union of both names
n1 := strings.Split(e1.Name, ",") // load
n2 := strings.Split(e2.Name, ",") // load
names := append(n1, n2...)
names = StrRemoveDuplicatesInList(names) // remove duplicates
sort.Strings(names)
return NewEdge(strings.Join(names, ","))
}
func (g *Graph) fullPrint() (str string) {
str += "\n"
for v := range g.Adjacency {
str += fmt.Sprintf("* v: %v\n", v.GetName())
// TODO: add explicit grouping data?
}
for v1 := range g.Adjacency {
for v2, e := range g.Adjacency[v1] {
str += fmt.Sprintf("* e: %v -> %v # %v\n", v1.GetName(), v2.GetName(), e.Name)
}
}
return
}
// helper function
func runGraphCmp(t *testing.T, g1, g2 *Graph) {
// FIXME: update me when we've implemented the correct grouping algorithm!
ch := g1.autoGroup(&testBaseGrouper{}) // edits the graph
for _ = range ch { // bleed the channel or it won't run :(
// pass
}
err := GraphCmp(g1, g2)
if err != nil {
t.Logf(" actual (g1): %v%v", g1, g1.fullPrint())
t.Logf("expected (g2): %v%v", g2, g2.fullPrint())
t.Logf("Cmp error:")
t.Errorf("%v", err)
}
}
// all of the following test cases are laid out with the following semantics:
// * vertices which start with the same single letter are considered "like"
// * "like" elements should be merged
// * vertices can have any integer after their single letter "family" type
// * grouped vertices should have a name with a comma separated list of names
// * edges follow the same conventions about grouping
// empty graph
func TestPgraphGrouping1(t *testing.T) {
g1 := NewGraph("g1") // original graph
g2 := NewGraph("g2") // expected result
runGraphCmp(t, g1, g2)
}
// single vertex
func TestPgraphGrouping2(t *testing.T) {
g1 := NewGraph("g1") // original graph
{ // grouping to limit variable scope
a1 := NewVertex(NewNoopResTest("a1"))
g1.AddVertex(a1)
}
g2 := NewGraph("g2") // expected result
{
a1 := NewVertex(NewNoopResTest("a1"))
g2.AddVertex(a1)
}
runGraphCmp(t, g1, g2)
}
// two vertices
func TestPgraphGrouping3(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
b1 := NewVertex(NewNoopResTest("b1"))
g1.AddVertex(a1, b1)
}
g2 := NewGraph("g2") // expected result
{
a1 := NewVertex(NewNoopResTest("a1"))
b1 := NewVertex(NewNoopResTest("b1"))
g2.AddVertex(a1, b1)
}
runGraphCmp(t, g1, g2)
}
// two vertices merge
func TestPgraphGrouping4(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
g1.AddVertex(a1, a2)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2"))
g2.AddVertex(a)
}
runGraphCmp(t, g1, g2)
}
// three vertices merge
func TestPgraphGrouping5(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
a3 := NewVertex(NewNoopResTest("a3"))
g1.AddVertex(a1, a2, a3)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2,a3"))
g2.AddVertex(a)
}
runGraphCmp(t, g1, g2)
}
// three vertices, two merge
func TestPgraphGrouping6(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
b1 := NewVertex(NewNoopResTest("b1"))
g1.AddVertex(a1, a2, b1)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2"))
b1 := NewVertex(NewNoopResTest("b1"))
g2.AddVertex(a, b1)
}
runGraphCmp(t, g1, g2)
}
// four vertices, three merge
func TestPgraphGrouping7(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
a3 := NewVertex(NewNoopResTest("a3"))
b1 := NewVertex(NewNoopResTest("b1"))
g1.AddVertex(a1, a2, a3, b1)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2,a3"))
b1 := NewVertex(NewNoopResTest("b1"))
g2.AddVertex(a, b1)
}
runGraphCmp(t, g1, g2)
}
// four vertices, two&two merge
func TestPgraphGrouping8(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
b1 := NewVertex(NewNoopResTest("b1"))
b2 := NewVertex(NewNoopResTest("b2"))
g1.AddVertex(a1, a2, b1, b2)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2"))
b := NewVertex(NewNoopResTest("b1,b2"))
g2.AddVertex(a, b)
}
runGraphCmp(t, g1, g2)
}
// five vertices, two&three merge
func TestPgraphGrouping9(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
b1 := NewVertex(NewNoopResTest("b1"))
b2 := NewVertex(NewNoopResTest("b2"))
b3 := NewVertex(NewNoopResTest("b3"))
g1.AddVertex(a1, a2, b1, b2, b3)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2"))
b := NewVertex(NewNoopResTest("b1,b2,b3"))
g2.AddVertex(a, b)
}
runGraphCmp(t, g1, g2)
}
// three unique vertices
func TestPgraphGrouping10(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
b1 := NewVertex(NewNoopResTest("b1"))
c1 := NewVertex(NewNoopResTest("c1"))
g1.AddVertex(a1, b1, c1)
}
g2 := NewGraph("g2") // expected result
{
a1 := NewVertex(NewNoopResTest("a1"))
b1 := NewVertex(NewNoopResTest("b1"))
c1 := NewVertex(NewNoopResTest("c1"))
g2.AddVertex(a1, b1, c1)
}
runGraphCmp(t, g1, g2)
}
// three unique vertices, two merge
func TestPgraphGrouping11(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
b1 := NewVertex(NewNoopResTest("b1"))
b2 := NewVertex(NewNoopResTest("b2"))
c1 := NewVertex(NewNoopResTest("c1"))
g1.AddVertex(a1, b1, b2, c1)
}
g2 := NewGraph("g2") // expected result
{
a1 := NewVertex(NewNoopResTest("a1"))
b := NewVertex(NewNoopResTest("b1,b2"))
c1 := NewVertex(NewNoopResTest("c1"))
g2.AddVertex(a1, b, c1)
}
runGraphCmp(t, g1, g2)
}
// simple merge 1
// a1   a2          a1,a2
//   \ /     >>>      |      (arrows point downwards)
//    b               b
func TestPgraphGrouping12(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
b1 := NewVertex(NewNoopResTest("b1"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
g1.AddEdge(a1, b1, e1)
g1.AddEdge(a2, b1, e2)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2"))
b1 := NewVertex(NewNoopResTest("b1"))
e := NewEdge("e1,e2")
g2.AddEdge(a, b1, e)
}
runGraphCmp(t, g1, g2)
}
// simple merge 2
//    b               b
//   / \     >>>      |      (arrows point downwards)
// a1   a2          a1,a2
func TestPgraphGrouping13(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
b1 := NewVertex(NewNoopResTest("b1"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
g1.AddEdge(b1, a1, e1)
g1.AddEdge(b1, a2, e2)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2"))
b1 := NewVertex(NewNoopResTest("b1"))
e := NewEdge("e1,e2")
g2.AddEdge(b1, a, e)
}
runGraphCmp(t, g1, g2)
}
// triple merge
// a1  a2  a3        a1,a2,a3
//   \  |  /   >>>       |       (arrows point downwards)
//      b                b
func TestPgraphGrouping14(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
a3 := NewVertex(NewNoopResTest("a3"))
b1 := NewVertex(NewNoopResTest("b1"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
g1.AddEdge(a1, b1, e1)
g1.AddEdge(a2, b1, e2)
g1.AddEdge(a3, b1, e3)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2,a3"))
b1 := NewVertex(NewNoopResTest("b1"))
e := NewEdge("e1,e2,e3")
g2.AddEdge(a, b1, e)
}
runGraphCmp(t, g1, g2)
}
// chain merge
//    a1             a1
//   /  \             |
// b1    b2   >>>   b1,b2   (arrows point downwards)
//   \  /             |
//    c1             c1
func TestPgraphGrouping15(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
b1 := NewVertex(NewNoopResTest("b1"))
b2 := NewVertex(NewNoopResTest("b2"))
c1 := NewVertex(NewNoopResTest("c1"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
e4 := NewEdge("e4")
g1.AddEdge(a1, b1, e1)
g1.AddEdge(a1, b2, e2)
g1.AddEdge(b1, c1, e3)
g1.AddEdge(b2, c1, e4)
}
g2 := NewGraph("g2") // expected result
{
a1 := NewVertex(NewNoopResTest("a1"))
b := NewVertex(NewNoopResTest("b1,b2"))
c1 := NewVertex(NewNoopResTest("c1"))
e1 := NewEdge("e1,e2")
e2 := NewEdge("e3,e4")
g2.AddEdge(a1, b, e1)
g2.AddEdge(b, c1, e2)
}
runGraphCmp(t, g1, g2)
}
/* FIXME: uncomment me when we've implemented the correct grouping algorithm!
// reattach 1 (outer)
// a1    a2         a1,a2
//  |   /             |
// b1  /      >>>    b1     (arrows point downwards)
//  | /               |
// c1                c1
func TestPgraphGrouping16(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
b1 := NewVertex(NewNoopResTest("b1"))
c1 := NewVertex(NewNoopResTest("c1"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
g1.AddEdge(a1, b1, e1)
g1.AddEdge(b1, c1, e2)
g1.AddEdge(a2, c1, e3)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2"))
b1 := NewVertex(NewNoopResTest("b1"))
c1 := NewVertex(NewNoopResTest("c1"))
e1 := NewEdge("e1,e3")
e2 := NewEdge("e2") // TODO: should this be e2,e3 (eg we split e3?)
g2.AddEdge(a, b1, e1)
g2.AddEdge(b1, c1, e2)
}
runGraphCmp(t, g1, g2)
}
// reattach 2 (inner)
// a1    b2          a1
//  |   /             |
// b1  /      >>>   b1,b2   (arrows point downwards)
//  | /               |
// c1                c1
func TestPgraphGrouping17(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
b1 := NewVertex(NewNoopResTest("b1"))
b2 := NewVertex(NewNoopResTest("b2"))
c1 := NewVertex(NewNoopResTest("c1"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
g1.AddEdge(a1, b1, e1)
g1.AddEdge(b1, c1, e2)
g1.AddEdge(b2, c1, e3)
}
g2 := NewGraph("g2") // expected result
{
a1 := NewVertex(NewNoopResTest("a1"))
b := NewVertex(NewNoopResTest("b1,b2"))
c1 := NewVertex(NewNoopResTest("c1"))
e1 := NewEdge("e1")
e2 := NewEdge("e2,e3")
g2.AddEdge(a1, b, e1)
g2.AddEdge(b, c1, e2)
}
runGraphCmp(t, g1, g2)
}
// reattach 3 (double)
// a2   a1   b2         a1,a2
//   \   |   /            |
//    \  b1 /     >>>   b1,b2    (arrows point downwards)
//     \ | /              |
//      c1               c1
func TestPgraphGrouping18(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
b1 := NewVertex(NewNoopResTest("b1"))
b2 := NewVertex(NewNoopResTest("b2"))
c1 := NewVertex(NewNoopResTest("c1"))
e1 := NewEdge("e1")
e2 := NewEdge("e2")
e3 := NewEdge("e3")
e4 := NewEdge("e4")
g1.AddEdge(a1, b1, e1)
g1.AddEdge(b1, c1, e2)
g1.AddEdge(a2, c1, e3)
g1.AddEdge(b2, c1, e4)
}
g2 := NewGraph("g2") // expected result
{
a := NewVertex(NewNoopResTest("a1,a2"))
b := NewVertex(NewNoopResTest("b1,b2"))
c1 := NewVertex(NewNoopResTest("c1"))
e1 := NewEdge("e1,e3")
e2 := NewEdge("e2,e4")
g2.AddEdge(a, b, e1)
g2.AddEdge(b, c1, e2)
}
runGraphCmp(t, g1, g2)
}
// tricky merge (no change, or merge?)
// a1             a1
//   \     >>>      \     (arrows point downwards)
//    a2             a2
func TestPgraphGroupingTricky1(t *testing.T) {
g1 := NewGraph("g1") // original graph
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
e1 := NewEdge("e1")
g1.AddEdge(a1, a2, e1)
}
g2 := NewGraph("g2") // expected result ?
{
a1 := NewVertex(NewNoopResTest("a1"))
a2 := NewVertex(NewNoopResTest("a2"))
e1 := NewEdge("e1")
g2.AddEdge(a1, a2, e1)
}
//g3 := NewGraph("g3") // expected result ?
//{
//	a := NewVertex(NewNoopResTest("a1,a2"))
//}
runGraphCmp(t, g1, g2) // TODO: I'm tempted to think this is correct
//runGraphCmp(t, g1, g3)
}
*/
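The merge semantics the tests above assert — grouped vertices get comma-joined names, one group per kind — can be sketched standalone. This `groupByPrefix` helper is hypothetical (it is not part of pgraph); it treats the leading letter of a name as the "kind", the way the placeholder algorithm groups resources of the same kind:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// groupByPrefix merges names that share a one-letter "kind" prefix into a
// single comma-joined name, mirroring the expected results in the tests
// above (e.g. "a1", "a2", "a3" collapse into "a1,a2,a3").
func groupByPrefix(names []string) []string {
	groups := make(map[string][]string)
	for _, name := range names {
		k := name[:1] // the "kind" is the leading letter in these tests
		groups[k] = append(groups[k], name)
	}
	var out []string
	for _, members := range groups {
		sort.Strings(members)
		out = append(out, strings.Join(members, ","))
	}
	sort.Strings(out) // map iteration order is random; sort for determinism
	return out
}

func main() {
	// the "five vertices, two&three merge" case from TestPgraphGrouping9:
	fmt.Println(groupByPrefix([]string{"a1", "a2", "b1", "b2", "b3"}))
	// prints: [a1,a2 b1,b2,b3]
}
```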

pkg.go

@@ -19,6 +19,7 @@ package main
import (
//"packagekit" // TODO
"encoding/gob"
"errors"
"fmt"
"log"
@@ -26,6 +27,10 @@ import (
"strings"
)
func init() {
gob.Register(&PkgRes{})
}
type PkgRes struct {
BaseRes `yaml:",inline"`
State string `yaml:"state"` // state: installed, uninstalled, newest, <version>
@@ -102,7 +107,7 @@ func (obj *PkgRes) Validate() bool {
// use UpdatesChanged signal to watch for changes
// TODO: https://github.com/hughsie/PackageKit/issues/109
// TODO: https://github.com/hughsie/PackageKit/issues/110
func (obj *PkgRes) Watch() {
func (obj *PkgRes) Watch(processChan chan struct{}) {
if obj.IsWatching() {
return
}
@@ -168,7 +173,7 @@ func (obj *PkgRes) Watch() {
dirty = false
obj.isStateOK = false // something made state dirty
}
Process(obj) // XXX: rename this function
processChan <- struct{}{} // trigger process
}
}
}
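The Watch() signature change above replaces the direct `Process(obj)` call with a send on `processChan`, so the watcher only signals and the channel's owner decides when to run the processing step. A minimal standalone sketch of that trigger pattern (names hypothetical, not the real mgmt types):

```go
package main

import "fmt"

// watch simulates a resource watcher: instead of calling the processing
// step directly, it signals over processChan and lets the channel's owner
// run it, then closes done when the event stream ends.
func watch(events <-chan string, processChan chan<- struct{}, done chan<- struct{}) {
	for ev := range events {
		fmt.Printf("watch: saw event %q\n", ev)
		processChan <- struct{}{} // trigger process, don't call it directly
	}
	close(done)
}

func main() {
	events := make(chan string)
	processChan := make(chan struct{})
	done := make(chan struct{})
	go watch(events, processChan, done)

	go func() { // feed two fake events, then stop
		events <- "modified"
		events <- "deleted"
		close(events)
	}()

	count := 0
	finished := false
	for !finished {
		select {
		case <-processChan:
			count++ // this is where Process(obj) used to run inline
		case <-done:
			finished = true
		}
	}
	fmt.Println("processed", count, "events") // prints: processed 2 events
}
```

Decoupling the trigger from the call is what makes grouping possible: several grouped watchers can feed one processing loop.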


@@ -18,9 +18,11 @@
package main
import (
"bytes"
"encoding/base64"
"encoding/gob"
"fmt"
"log"
"time"
)
//go:generate stringer -type=resState -output=resstate_stringer.go
@@ -73,22 +75,19 @@ type MetaParams struct {
// everything here only needs to be implemented once, in the BaseRes
type Base interface {
GetName() string // can't be named "Name()" because of struct field
SetName(string)
Kind() string
GetMeta() MetaParams
SetVertex(*Vertex)
SetConvergedCallback(ctimeout int, converged chan bool)
SendEvent(eventName, bool, bool) bool
IsWatching() bool
SetWatching(bool)
GetConvergedState() resConvergedState
SetConvergedState(resConvergedState)
GetState() resState
SetState(resState)
GetTimestamp() int64
UpdateTimestamp() int64
OKTimestamp() bool
Poke(bool)
BackPoke()
SendEvent(eventName, bool, bool) bool
ReadEvent(*Event) (bool, bool) // TODO: optional here?
GroupCmp(Res) bool // TODO: is there a better name for this?
GroupRes(Res) error // group resource (arg) into self
IsGrouped() bool // am I grouped?
@@ -103,17 +102,17 @@ type Res interface {
Init()
//Validate() bool // TODO: this might one day be added
GetUUIDs() []ResUUID // most resources only return one
Watch()
Watch(chan struct{}) // send on channel to signal process() events
CheckApply(bool) (bool, error)
AutoEdges() AutoEdge
Compare(Res) bool
CollectPattern(string) // XXX: temporary until Res collection is more advanced
}
type BaseRes struct {
Name string `yaml:"name"`
Meta MetaParams `yaml:"meta"` // struct of all the metaparams
kind string
timestamp int64 // last updated timestamp ?
events chan Event
vertex *Vertex
state resState
@@ -168,11 +167,15 @@ func (obj *BaseRes) Init() {
obj.events = make(chan Event) // unbuffered chan size to avoid stale events
}
// this method gets used by all the resources, if we have one of (obj NoopRes) it would get overridden in that case!
// this method gets used by all the resources
func (obj *BaseRes) GetName() string {
return obj.Name
}
func (obj *BaseRes) SetName(name string) {
obj.Name = name
}
// return the kind of resource this is
func (obj *BaseRes) Kind() string {
return obj.kind
@@ -224,87 +227,6 @@ func (obj *BaseRes) SetState(state resState) {
obj.state = state
}
// GetTimestamp returns the timestamp of a vertex
func (obj *BaseRes) GetTimestamp() int64 {
return obj.timestamp
}
// UpdateTimestamp updates the timestamp on a vertex and returns the new value
func (obj *BaseRes) UpdateTimestamp() int64 {
obj.timestamp = time.Now().UnixNano() // update
return obj.timestamp
}
// can this element run right now?
func (obj *BaseRes) OKTimestamp() bool {
v := obj.GetVertex()
g := v.GetGraph()
// these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) {
// if the vertex has a greater timestamp than any pre-req (n)
// then we can't run right now...
// if they're equal (eg: on init of 0) then we also can't run
// b/c we should let our pre-req's go first...
x, y := obj.GetTimestamp(), n.Res.GetTimestamp()
if DEBUG {
log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", obj.Kind(), obj.GetName(), x, n.Kind(), n.GetName(), y, x >= y)
}
if x >= y {
return false
}
}
return true
}
// notify nodes after me in the dependency graph that they need refreshing...
// NOTE: this assumes that this can never fail or need to be rescheduled
func (obj *BaseRes) Poke(activity bool) {
v := obj.GetVertex()
g := v.GetGraph()
// these are all the vertices pointing AWAY FROM v, eg: v -> ???
for _, n := range g.OutgoingGraphEdges(v) {
// XXX: if we're in state event and haven't been cancelled by
// apply, then we can cancel a poke to a child, right? XXX
// XXX: if n.Res.GetState() != resStateEvent { // is this correct?
if true { // XXX
if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync?
} else {
if DEBUG {
log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
}
}
}
// poke the pre-requisites that are stale and need to run before I can run...
func (obj *BaseRes) BackPoke() {
v := obj.GetVertex()
g := v.GetGraph()
// these are all the vertices pointing TO v, eg: ??? -> v
for _, n := range g.IncomingGraphEdges(v) {
x, y, s := obj.GetTimestamp(), n.Res.GetTimestamp(), n.Res.GetState()
// if the parent timestamp needs poking AND it's not in state
// resStateEvent, then poke it. If the parent is in resStateEvent it
// means that an event is pending, so we'll be expecting a poke
// back soon, so we can safely discard the extra parent poke...
// TODO: implement a stateLT (less than) to tell if something
// happens earlier in the state cycle and that doesn't wrap nil
if x >= y && (s != resStateEvent && s != resStateCheckApply) {
if DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync?
} else {
if DEBUG {
log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName())
}
}
}
}
// push an event into the message queue for a particular vertex
func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool {
// TODO: isn't this race-y ?
@@ -394,50 +316,38 @@ func (obj *BaseRes) SetGroup(g []Res) {
obj.grouped = g
}
// XXX: rename this function
func Process(obj Res) {
if DEBUG {
log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName())
}
obj.SetState(resStateEvent)
var ok = true
var apply = false // did we run an apply?
// is it okay to run dependency wise right now?
// if not, that's okay because when the dependency runs, it will poke
// us back and we will run if needed then!
if obj.OKTimestamp() {
if DEBUG {
log.Printf("%v[%v]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), obj.GetTimestamp())
}
obj.SetState(resStateCheckApply)
// if this fails, don't UpdateTimestamp()
stateok, err := obj.CheckApply(true)
if stateok && err != nil { // should never return this way
log.Fatalf("%v[%v]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), stateok, err)
}
if DEBUG {
log.Printf("%v[%v]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), stateok, err)
}
if !stateok { // if state *was* not ok, we had to have apply'ed
if err != nil { // error during check or apply
ok = false
} else {
apply = true
}
}
if ok {
// update this timestamp *before* we poke or the poked
// nodes might fail due to having a too old timestamp!
obj.UpdateTimestamp() // this was touched...
obj.SetState(resStatePoking) // can't cancel parent poke
obj.Poke(apply)
}
// poke at our pre-req's instead since they need to refresh/run...
} else {
// only poke at the pre-req's that need to run
go obj.BackPoke()
}
}
func (obj *BaseRes) CollectPattern(pattern string) {
// XXX: default method is empty
}
// ResToB64 encodes a resource to a base64 encoded string (after serialization)
func ResToB64(res Res) (string, error) {
b := bytes.Buffer{}
e := gob.NewEncoder(&b)
err := e.Encode(&res) // pass with &
if err != nil {
return "", fmt.Errorf("Gob failed to encode: %v", err)
}
return base64.StdEncoding.EncodeToString(b.Bytes()), nil
}
// B64ToRes decodes a resource from a base64 encoded string (after deserialization)
func B64ToRes(str string) (Res, error) {
var output interface{}
bb, err := base64.StdEncoding.DecodeString(str)
if err != nil {
return nil, fmt.Errorf("Base64 failed to decode: %v", err)
}
b := bytes.NewBuffer(bb)
d := gob.NewDecoder(b)
err = d.Decode(&output) // pass with &
if err != nil {
return nil, fmt.Errorf("Gob failed to decode: %v", err)
}
res, ok := output.(Res)
if !ok {
return nil, fmt.Errorf("Output %v is not a Res", res)
}
return res, nil
}

resources_test.go

@@ -0,0 +1,105 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"bytes"
"encoding/base64"
"encoding/gob"
"testing"
)
func TestMiscEncodeDecode1(t *testing.T) {
var err error
//gob.Register( &NoopRes{} ) // happens in noop.go : init()
//gob.Register( &FileRes{} ) // happens in file.go : init()
// ...
// encode
var input interface{} = &FileRes{}
b1 := bytes.Buffer{}
e := gob.NewEncoder(&b1)
err = e.Encode(&input) // pass with &
if err != nil {
t.Errorf("Gob failed to Encode: %v", err)
}
str := base64.StdEncoding.EncodeToString(b1.Bytes())
// decode
var output interface{}
bb, err := base64.StdEncoding.DecodeString(str)
if err != nil {
t.Errorf("Base64 failed to Decode: %v", err)
}
b2 := bytes.NewBuffer(bb)
d := gob.NewDecoder(b2)
err = d.Decode(&output) // pass with &
if err != nil {
t.Errorf("Gob failed to Decode: %v", err)
}
res1, ok := input.(Res)
if !ok {
t.Errorf("Input %v is not a Res", res1)
return
}
res2, ok := output.(Res)
if !ok {
t.Errorf("Output %v is not a Res", res2)
return
}
if !res1.Compare(res2) {
t.Error("The input and output Res values do not match!")
}
}
func TestMiscEncodeDecode2(t *testing.T) {
var err error
//gob.Register( &NoopRes{} ) // happens in noop.go : init()
//gob.Register( &FileRes{} ) // happens in file.go : init()
// ...
// encode
var input Res = &FileRes{}
b64, err := ResToB64(input)
if err != nil {
t.Errorf("Can't encode: %v", err)
return
}
output, err := B64ToRes(b64)
if err != nil {
t.Errorf("Can't decode: %v", err)
return
}
res1, ok := input.(Res)
if !ok {
t.Errorf("Input %v is not a Res", res1)
return
}
res2, ok := output.(Res)
if !ok {
t.Errorf("Output %v is not a Res", res2)
return
}
if !res1.Compare(res2) {
t.Error("The input and output Res values do not match!")
}
}

svc.go

@@ -20,6 +20,7 @@
package main
import (
"encoding/gob"
"errors"
"fmt"
systemd "github.com/coreos/go-systemd/dbus" // change namespace
@@ -28,6 +29,10 @@ import (
"log"
)
func init() {
gob.Register(&SvcRes{})
}
type SvcRes struct {
BaseRes `yaml:",inline"`
State string `yaml:"state"` // state: running, stopped, undefined
@@ -62,7 +67,7 @@ func (obj *SvcRes) Validate() bool {
}
// Service watcher
func (obj *SvcRes) Watch() {
func (obj *SvcRes) Watch(processChan chan struct{}) {
if obj.IsWatching() {
return
}
@@ -189,7 +194,7 @@ func (obj *SvcRes) Watch() {
case err := <-subErrors:
obj.SetConvergedState(resConvergedNil) // XXX ?
log.Println("error:", err)
log.Printf("error: %v", err)
log.Fatal(err)
//vertex.events <- fmt.Sprintf("svc: %v", "error") // XXX: how should we handle errors?
@@ -210,7 +215,7 @@ func (obj *SvcRes) Watch() {
dirty = false
obj.isStateOK = false // something made state dirty
}
Process(obj) // XXX: rename this function
processChan <- struct{}{} // trigger process
}
}