This is a prototype that i'm attempting to "release early". Expect a lot of changes! It is intended to be a config management tool that will: * be event based * execute actions in parallel * function as a distributed system There are a bunch more design ideas going into this, please stay tuned!
415 lines
9.6 KiB
Go
415 lines
9.6 KiB
Go
// Mgmt
|
|
// Copyright (C) 2013-2015+ James Shubin and the project contributors
|
|
// Written by James Shubin <james@shubin.ca> and the project contributors
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Pgraph (Pointer Graph)
|
|
package main
|
|
|
|
import (
|
|
"code.google.com/p/go-uuid/uuid"
|
|
//"container/list" // doubly linked list
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// The graph abstract data type (ADT) is defined as follows:
|
|
// NOTE: the directed graph arrows point from left to right ( --> )
|
|
// NOTE: the arrows point towards their dependencies (eg: arrows mean requires)
|
|
type Graph struct {
|
|
uuid string
|
|
Name string
|
|
Adjacency map[*Vertex]map[*Vertex]*Edge
|
|
//Directed bool
|
|
startcount int
|
|
}
|
|
|
|
type Vertex struct {
|
|
uuid string
|
|
graph *Graph // store a pointer to the graph it's on
|
|
Name string
|
|
Type string
|
|
Timestamp int64 // last updated timestamp ?
|
|
Events chan string // FIXME: eventually a struct for the event?
|
|
Typedata Type
|
|
data map[string]string
|
|
}
|
|
|
|
type Edge struct {
|
|
uuid string
|
|
Name string
|
|
}
|
|
|
|
func NewGraph(name string) *Graph {
|
|
return &Graph{
|
|
uuid: uuid.New(),
|
|
Name: name,
|
|
Adjacency: make(map[*Vertex]map[*Vertex]*Edge),
|
|
}
|
|
}
|
|
|
|
func NewVertex(name, t string) *Vertex {
|
|
return &Vertex{
|
|
uuid: uuid.New(),
|
|
Name: name,
|
|
Type: t,
|
|
Timestamp: -1,
|
|
Events: make(chan string, 1), // XXX: chan size?
|
|
data: make(map[string]string),
|
|
}
|
|
}
|
|
|
|
func NewEdge(name string) *Edge {
|
|
return &Edge{
|
|
uuid: uuid.New(),
|
|
Name: name,
|
|
}
|
|
}
|
|
|
|
// Graph() creates a new, empty graph.
|
|
// addVertex(vert) adds an instance of Vertex to the graph.
|
|
func (g *Graph) AddVertex(v *Vertex) {
|
|
if _, exists := g.Adjacency[v]; !exists {
|
|
g.Adjacency[v] = make(map[*Vertex]*Edge)
|
|
|
|
// store a pointer to the graph it's on for convenience and readability
|
|
v.graph = g
|
|
}
|
|
}
|
|
|
|
// addEdge(fromVert, toVert) Adds a new, directed edge to the graph that connects two vertices.
|
|
func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) {
|
|
// NOTE: this doesn't allow more than one edge between two vertexes...
|
|
// TODO: is this a problem?
|
|
g.AddVertex(v1)
|
|
g.AddVertex(v2)
|
|
g.Adjacency[v1][v2] = e
|
|
}
|
|
|
|
// addEdge(fromVert, toVert, weight) Adds a new, weighted, directed edge to the graph that connects two vertices.
|
|
// getVertex(vertKey) finds the vertex in the graph named vertKey.
|
|
func (g *Graph) GetVertex(uuid string) chan *Vertex {
|
|
ch := make(chan *Vertex, 1)
|
|
go func(uuid string) {
|
|
for k := range g.Adjacency {
|
|
v := *k
|
|
if v.uuid == uuid {
|
|
ch <- k
|
|
break
|
|
}
|
|
}
|
|
close(ch)
|
|
}(uuid)
|
|
return ch
|
|
}
|
|
|
|
func (g *Graph) NumVertices() int {
|
|
return len(g.Adjacency)
|
|
}
|
|
|
|
func (g *Graph) NumEdges() int {
|
|
// XXX: not implemented
|
|
return -1
|
|
}
|
|
|
|
// get an array (slice) of all vertices in the graph
|
|
func (g *Graph) GetVertices() []*Vertex {
|
|
vertices := make([]*Vertex, 0)
|
|
for k := range g.Adjacency {
|
|
vertices = append(vertices, k)
|
|
}
|
|
return vertices
|
|
}
|
|
|
|
// returns a channel of all vertices in the graph
|
|
func (g *Graph) GetVerticesChan() chan *Vertex {
|
|
ch := make(chan *Vertex)
|
|
// TODO: do you need to pass this through into the go routine?
|
|
go func(ch chan *Vertex) {
|
|
for k := range g.Adjacency {
|
|
ch <- k
|
|
}
|
|
close(ch)
|
|
}(ch)
|
|
return ch
|
|
}
|
|
|
|
// make the graph pretty print
|
|
func (g *Graph) String() string {
|
|
return fmt.Sprintf("Vertices(%d), Edges(%d)", g.NumVertices(), g.NumEdges())
|
|
}
|
|
|
|
//func (s []*Vertex) contains(element *Vertex) bool {
|
|
// google/golang hackers apparently do not think contains should be a built-in!
|
|
func Contains(s []*Vertex, element *Vertex) bool {
|
|
for _, v := range s {
|
|
if element == v {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// return an array (slice) of all vertices that connect to vertex v
|
|
func (g *Graph) GraphEdges(vertex *Vertex) []*Vertex {
|
|
// TODO: we might be able to implement this differently by reversing
|
|
// the Adjacency graph and then looping through it again...
|
|
s := make([]*Vertex, 0) // stack
|
|
for w, _ := range g.Adjacency[vertex] { // forward paths
|
|
//fmt.Printf("forward: %v -> %v\n", v.Name, w.Name)
|
|
s = append(s, w)
|
|
}
|
|
|
|
for k, x := range g.Adjacency { // reverse paths
|
|
for w, _ := range x {
|
|
if w == vertex {
|
|
//fmt.Printf("reverse: %v -> %v\n", v.Name, k.Name)
|
|
s = append(s, k)
|
|
}
|
|
}
|
|
}
|
|
return s
|
|
}
|
|
|
|
// return an array (slice) of all directed vertices to vertex v
|
|
func (g *Graph) DirectedGraphEdges(vertex *Vertex) []*Vertex {
|
|
// TODO: we might be able to implement this differently by reversing
|
|
// the Adjacency graph and then looping through it again...
|
|
s := make([]*Vertex, 0) // stack
|
|
for w, _ := range g.Adjacency[vertex] { // forward paths
|
|
//fmt.Printf("forward: %v -> %v\n", v.Name, w.Name)
|
|
s = append(s, w)
|
|
}
|
|
return s
|
|
}
|
|
|
|
// get timestamp of a vertex
|
|
func (v *Vertex) GetTimestamp() int64 {
|
|
return v.Timestamp
|
|
}
|
|
|
|
// update timestamp of a vertex
|
|
func (v *Vertex) UpdateTimestamp() int64 {
|
|
v.Timestamp = time.Now().UnixNano() // update
|
|
return v.Timestamp
|
|
}
|
|
|
|
func (g *Graph) DFS(start *Vertex) []*Vertex {
|
|
d := make([]*Vertex, 0) // discovered
|
|
s := make([]*Vertex, 0) // stack
|
|
if _, exists := g.Adjacency[start]; !exists {
|
|
return nil // TODO: error
|
|
}
|
|
v := start
|
|
s = append(s, v)
|
|
for len(s) > 0 {
|
|
|
|
v, s = s[len(s)-1], s[:len(s)-1] // s.pop()
|
|
|
|
if !Contains(d, v) { // if not discovered
|
|
d = append(d, v) // label as discovered
|
|
|
|
for _, w := range g.GraphEdges(v) {
|
|
s = append(s, w)
|
|
}
|
|
}
|
|
}
|
|
return d
|
|
}
|
|
|
|
// build a new graph containing only vertices from the list...
|
|
func (g *Graph) FilterGraph(name string, vertices []*Vertex) *Graph {
|
|
newgraph := NewGraph(name)
|
|
|
|
for k1, x := range g.Adjacency {
|
|
for k2, e := range x {
|
|
|
|
//fmt.Printf("Filter: %v -> %v # %v\n", k1.Name, k2.Name, e.Name)
|
|
if Contains(vertices, k1) || Contains(vertices, k2) {
|
|
newgraph.AddEdge(k1, k2, e)
|
|
}
|
|
}
|
|
}
|
|
|
|
return newgraph
|
|
}
|
|
|
|
// return a channel containing the N disconnected graphs in our main graph
|
|
// we can then process each of these in parallel
|
|
func (g *Graph) GetDisconnectedGraphs() chan *Graph {
|
|
ch := make(chan *Graph)
|
|
go func() {
|
|
var start *Vertex
|
|
d := make([]*Vertex, 0) // discovered
|
|
c := g.NumVertices()
|
|
for len(d) < c {
|
|
|
|
// get an undiscovered vertex to start from
|
|
for _, s := range g.GetVertices() {
|
|
if !Contains(d, s) {
|
|
start = s
|
|
}
|
|
}
|
|
|
|
// dfs through the graph
|
|
dfs := g.DFS(start)
|
|
// filter all the collected elements into a new graph
|
|
newgraph := g.FilterGraph(g.Name, dfs)
|
|
|
|
// add number of elements found to found variable
|
|
d = append(d, dfs...) // extend
|
|
|
|
// return this new graph to the channel
|
|
ch <- newgraph
|
|
|
|
// if we've found all the elements, then we're done
|
|
// otherwise loop through to continue...
|
|
}
|
|
|
|
close(ch)
|
|
}()
|
|
return ch
|
|
}
|
|
|
|
func (v *Vertex) Value(key string) (string, bool) {
|
|
if value, exists := v.data[key]; exists {
|
|
return value, true
|
|
}
|
|
return "", false
|
|
}
|
|
|
|
func (v *Vertex) SetValue(key, value string) bool {
|
|
v.data[key] = value
|
|
return true
|
|
}
|
|
|
|
func (g *Graph) GetVerticesKeyValue(key, value string) chan *Vertex {
|
|
ch := make(chan *Vertex)
|
|
go func() {
|
|
for vertex := range g.GetVerticesChan() {
|
|
if v, exists := vertex.Value(key); exists && v == value {
|
|
ch <- vertex
|
|
}
|
|
}
|
|
close(ch)
|
|
}()
|
|
return ch
|
|
}
|
|
|
|
// return a pointer to the graph a vertex is on
|
|
func (v *Vertex) GetGraph() *Graph {
|
|
return v.graph
|
|
}
|
|
|
|
func HeisenbergCount(ch chan *Vertex) int {
|
|
c := 0
|
|
for x := range ch {
|
|
_ = x
|
|
c++
|
|
}
|
|
return c
|
|
}
|
|
|
|
func (v *Vertex) Associate(t Type) {
|
|
v.Typedata = t
|
|
}
|
|
|
|
func (v *Vertex) OKTimestamp() bool {
|
|
g := v.GetGraph()
|
|
for _, n := range g.DirectedGraphEdges(v) {
|
|
if v.GetTimestamp() > n.GetTimestamp() {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// poke the XXX children?
|
|
func (v *Vertex) Poke() {
|
|
g := v.GetGraph()
|
|
|
|
for _, n := range g.DirectedGraphEdges(v) { // XXX: do we want the reverse order?
|
|
// poke!
|
|
n.Events <- fmt.Sprintf("poke(%v)", v.Name)
|
|
}
|
|
}
|
|
|
|
func (g *Graph) Exit() {
|
|
// tell all the vertices to exit...
|
|
for v := range g.GetVerticesChan() {
|
|
v.Exit()
|
|
}
|
|
}
|
|
|
|
func (v *Vertex) Exit() {
|
|
v.Events <- "exit"
|
|
}
|
|
|
|
// main loop for each vertex
|
|
// warning: this logic might be subtle and tricky.
|
|
// be careful as it might not even be correct now!
|
|
func (v *Vertex) Start() {
|
|
log.Printf("Main->Vertex[%v]->Start()\n", v.Name)
|
|
|
|
//g := v.GetGraph()
|
|
var t = v.Typedata
|
|
|
|
// this whole wg2 wait group is only necessary if we need to wait for
|
|
// the go routine to exit...
|
|
var wg2 sync.WaitGroup
|
|
|
|
wg2.Add(1)
|
|
go func(v *Vertex, t Type) {
|
|
defer wg2.Done()
|
|
//fmt.Printf("About to watch [%v].\n", v.Name)
|
|
t.Watch(v)
|
|
}(v, t)
|
|
|
|
var ok bool
|
|
//XXX make sure dependencies run and become more current first...
|
|
for {
|
|
select {
|
|
case event := <-v.Events:
|
|
|
|
log.Printf("Event[%v]: %v\n", v.Name, event)
|
|
|
|
if event == "exit" {
|
|
t.Exit() // type exit
|
|
wg2.Wait() // wait for worker to exit
|
|
return
|
|
}
|
|
|
|
ok = true
|
|
if v.OKTimestamp() {
|
|
if !t.StateOK() { // TODO: can we rename this to something better?
|
|
// throw an error if apply fails...
|
|
// if this fails, don't UpdateTimestamp()
|
|
if !t.Apply() { // check for error
|
|
ok = false
|
|
}
|
|
}
|
|
|
|
if ok {
|
|
v.UpdateTimestamp() // this was touched...
|
|
v.Poke() // XXX
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|