pgraph: Add logic functions for adding subgraphs

These are helper functions to merge in existing graphs into a main graph
with or without adding an edge relationship between a vertex and the new
graph. These are particularly useful if using mgmt as a lib to break
apart units of work into functions that create sub graphs, which are
then added to the main graph when they're returned.
This commit is contained in:
James Shubin
2017-05-31 14:49:54 -04:00
parent 51369adad1
commit 458e115490
3 changed files with 567 additions and 0 deletions

View File

@@ -0,0 +1,251 @@
// libmgmt example of flattened subgraph
package main
import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/purpleidea/mgmt/gapi"
mgmt "github.com/purpleidea/mgmt/lib"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/resources"
errwrap "github.com/pkg/errors"
)
// MyGAPI implements the main GAPI interface.
type MyGAPI struct {
Name string // graph name
Interval uint // refresh interval, 0 to never refresh
data gapi.Data
initialized bool
closeChan chan struct{}
wg sync.WaitGroup // sync group for tunnel go routines
}
// NewMyGAPI creates a new MyGAPI struct and calls Init().
func NewMyGAPI(data gapi.Data, name string, interval uint) (*MyGAPI, error) {
obj := &MyGAPI{
Name: name,
Interval: interval,
}
return obj, obj.Init(data)
}
// Init initializes the MyGAPI struct.
func (obj *MyGAPI) Init(data gapi.Data) error {
if obj.initialized {
return fmt.Errorf("already initialized")
}
if obj.Name == "" {
return fmt.Errorf("the graph name must be specified")
}
obj.data = data // store for later
obj.closeChan = make(chan struct{})
obj.initialized = true
return nil
}
func (obj *MyGAPI) subGraph() (*pgraph.Graph, error) {
g, err := pgraph.NewGraph(obj.Name)
if err != nil {
return nil, err
}
metaparams := resources.DefaultMetaParams
f1 := &resources.FileRes{
BaseRes: resources.BaseRes{
Name: "file1",
MetaParams: metaparams,
},
Path: "/tmp/mgmt/sub1",
State: "present",
}
g.AddVertex(f1)
n1 := &resources.NoopRes{
BaseRes: resources.BaseRes{
Name: "noop1",
MetaParams: metaparams,
},
}
g.AddVertex(n1)
return g, nil
}
// Graph returns a current Graph.
func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
if !obj.initialized {
return nil, fmt.Errorf("libmgmt: MyGAPI is not initialized")
}
g, err := pgraph.NewGraph(obj.Name)
if err != nil {
return nil, err
}
// FIXME: these are being specified temporarily until it's the default!
metaparams := resources.DefaultMetaParams
content := "I created a subgraph!\n"
f0 := &resources.FileRes{
BaseRes: resources.BaseRes{
Name: "README",
MetaParams: metaparams,
},
Path: "/tmp/mgmt/README",
Content: &content,
State: "present",
}
g.AddVertex(f0)
subGraph, err := obj.subGraph()
if err != nil {
return nil, errwrap.Wrapf(err, "running subGraph() failed")
}
edgeGenFn := func(v1, v2 pgraph.Vertex) pgraph.Edge {
edge := &resources.Edge{
Name: fmt.Sprintf("edge: %s->%s", v1, v2),
}
// if we want to do something specific based on input
_, v2IsFile := v2.(*resources.FileRes)
if v1 == f0 && v2IsFile {
edge.Notify = true
}
return edge
}
g.AddEdgeVertexGraph(f0, subGraph, edgeGenFn)
//g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.World, obj.data.Noop)
return g, nil
}
// Next returns nil errors every time there could be a new graph.
func (obj *MyGAPI) Next() chan error {
ch := make(chan error)
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(ch) // this will run before the obj.wg.Done()
if !obj.initialized {
ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized")
return
}
startChan := make(chan struct{}) // start signal
close(startChan) // kick it off!
ticker := make(<-chan time.Time)
if obj.data.NoStreamWatch || obj.Interval <= 0 {
ticker = nil
} else {
// arbitrarily change graph every interval seconds
t := time.NewTicker(time.Duration(obj.Interval) * time.Second)
defer t.Stop()
ticker = t.C
}
for {
select {
case <-startChan: // kick the loop once at start
startChan = nil // disable
// pass
case <-ticker:
// pass
case <-obj.closeChan:
return
}
log.Printf("libmgmt: Generating new graph...")
select {
case ch <- nil: // trigger a run
case <-obj.closeChan:
return
}
}
}()
return ch
}
// Close shuts down the MyGAPI.
func (obj *MyGAPI) Close() error {
if !obj.initialized {
return fmt.Errorf("libmgmt: MyGAPI is not initialized")
}
close(obj.closeChan)
obj.wg.Wait()
obj.initialized = false // closed = true
return nil
}
// Run runs an embedded mgmt server.
func Run() error {
obj := &mgmt.Main{}
obj.Program = "libmgmt" // TODO: set on compilation
obj.Version = "0.0.1" // TODO: set on compilation
obj.TmpPrefix = true // disable for easy debugging
//prefix := "/tmp/testprefix/"
//obj.Prefix = &p // enable for easy debugging
obj.IdealClusterSize = -1
obj.ConvergedTimeout = -1
obj.Noop = false // FIXME: careful!
obj.GAPI = &MyGAPI{ // graph API
Name: "libmgmt", // TODO: set on compilation
Interval: 60 * 10, // arbitrarily change graph every 15 seconds
}
if err := obj.Init(); err != nil {
return err
}
// install the exit signal handler
exit := make(chan struct{})
defer close(exit)
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) // catch ^C
//signal.Notify(signals, os.Kill) // catch signals
signal.Notify(signals, syscall.SIGTERM)
select {
case sig := <-signals: // any signal will do
if sig == os.Interrupt {
log.Println("Interrupted by ^C")
obj.Exit(nil)
return
}
log.Println("Interrupted by signal")
obj.Exit(fmt.Errorf("killed by %v", sig))
return
case <-exit:
return
}
}()
if err := obj.Run(); err != nil {
return err
}
return nil
}
func main() {
log.Printf("Hello!")
if err := Run(); err != nil {
fmt.Println(err)
os.Exit(1)
return
}
log.Printf("Goodbye!")
}

106
pgraph/subgraph.go Normal file
View File

@@ -0,0 +1,106 @@
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgraph
// AddGraph adds the set of edges and vertices of a graph to the existing graph.
func (g *Graph) AddGraph(graph *Graph) {
g.addEdgeVertexGraphHelper(nil, graph, nil, false, false)
}
// AddEdgeVertexGraph adds a directed edge to the graph from a vertex.
// This is useful for flattening the relationship between a subgraph and an
// existing graph, without having to run the subgraph recursively. It adds the
// maximum number of edges, creating a relationship to every vertex.
func (g *Graph) AddEdgeVertexGraph(vertex Vertex, graph *Graph, edgeGenFn func(v1, v2 Vertex) Edge) {
g.addEdgeVertexGraphHelper(vertex, graph, edgeGenFn, false, false)
}
// AddEdgeVertexGraphLight adds a directed edge to the graph from a vertex.
// This is useful for flattening the relationship between a subgraph and an
// existing graph, without having to run the subgraph recursively. It adds the
// minimum number of edges, creating a relationship to the vertices with
// indegree equal to zero.
func (g *Graph) AddEdgeVertexGraphLight(vertex Vertex, graph *Graph, edgeGenFn func(v1, v2 Vertex) Edge) {
g.addEdgeVertexGraphHelper(vertex, graph, edgeGenFn, false, true)
}
// AddEdgeGraphVertex adds a directed edge to the vertex from a graph.
// This is useful for flattening the relationship between a subgraph and an
// existing graph, without having to run the subgraph recursively. It adds the
// maximum number of edges, creating a relationship from every vertex.
func (g *Graph) AddEdgeGraphVertex(graph *Graph, vertex Vertex, edgeGenFn func(v1, v2 Vertex) Edge) {
g.addEdgeVertexGraphHelper(vertex, graph, edgeGenFn, true, false)
}
// AddEdgeGraphVertexLight adds a directed edge to the vertex from a graph.
// This is useful for flattening the relationship between a subgraph and an
// existing graph, without having to run the subgraph recursively. It adds the
// minimum number of edges, creating a relationship from the vertices with
// outdegree equal to zero.
func (g *Graph) AddEdgeGraphVertexLight(graph *Graph, vertex Vertex, edgeGenFn func(v1, v2 Vertex) Edge) {
g.addEdgeVertexGraphHelper(vertex, graph, edgeGenFn, true, true)
}
// addEdgeVertexGraphHelper is a helper function to add a directed edges to the
// graph from a vertex, or vice-versa. It operates in this reverse direction by
// specifying the reverse argument as true. It is useful for flattening the
// relationship between a subgraph and an existing graph, without having to run
// the subgraph recursively. It adds the maximum number of edges, creating a
// relationship to or from every vertex if the light argument is false, and if
// it is true, it adds the minimum number of edges, creating a relationship to
// or from the vertices with an indegree or outdegree equal to zero depending on
// if we specified reverse or not.
func (g *Graph) addEdgeVertexGraphHelper(vertex Vertex, graph *Graph, edgeGenFn func(v1, v2 Vertex) Edge, reverse, light bool) {
var degree map[Vertex]int // compute all of the in/outdegree's if needed
if light && reverse {
degree = graph.OutDegree()
} else if light { // && !reverse
degree = graph.InDegree()
}
for _, v := range graph.VerticesSorted() { // sort to help out edgeGenFn
// forward:
// we only want to add edges to indegree == 0, because every
// other vertex is a dependency of at least one of those
// reverse:
// we only want to add edges to outdegree == 0, because every
// other vertex is a pre-requisite to at least one of these
if light && degree[v] != 0 {
continue
}
g.AddVertex(v) // ensure vertex is part of the graph
if vertex != nil && reverse {
edge := edgeGenFn(v, vertex) // generate a new unique edge
g.AddEdge(v, vertex, edge)
} else if vertex != nil { // && !reverse
edge := edgeGenFn(vertex, v)
g.AddEdge(vertex, v, edge)
}
}
// also remember to suck in all of the graph's edges too!
for v1 := range graph.Adjacency() {
for v2, e := range graph.Adjacency()[v1] {
g.AddEdge(v1, v2, e)
}
}
}

210
pgraph/subgraph_test.go Normal file
View File

@@ -0,0 +1,210 @@
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgraph
import (
"fmt"
"testing"
)
// TODO: unify with the other function like this...
// TODO: where should we put our test helpers?
func runGraphCmp(t *testing.T, g1, g2 *Graph) {
err := g1.GraphCmp(g2, vertexCmpFn, edgeCmpFn)
if err != nil {
t.Logf(" actual (g1): %v%v", g1, fullPrint(g1))
t.Logf("expected (g2): %v%v", g2, fullPrint(g2))
t.Logf("Cmp error:")
t.Errorf("%v", err)
}
}
// TODO: unify with the other function like this...
func fullPrint(g *Graph) (str string) {
str += "\n"
for v := range g.Adjacency() {
str += fmt.Sprintf("* v: %s\n", v)
}
for v1 := range g.Adjacency() {
for v2, e := range g.Adjacency()[v1] {
str += fmt.Sprintf("* e: %s -> %s # %s\n", v1, v2, e)
}
}
return
}
// edgeGenFn generates unique edges for each vertex pair, assuming unique
// vertices.
func edgeGenFn(v1, v2 Vertex) Edge {
return NE(fmt.Sprintf("%s,%s", v1, v2))
}
func TestPgraphAddEdgeGraph1(t *testing.T) {
v1 := NV("v1")
v2 := NV("v2")
v3 := NV("v3")
v4 := NV("v4")
v5 := NV("v5")
e1 := NE("e1")
e2 := NE("e2")
e3 := NE("e3")
g := &Graph{}
g.AddEdge(v1, v3, e1)
g.AddEdge(v2, v3, e2)
sub := &Graph{}
sub.AddEdge(v4, v5, e3)
g.AddGraph(sub)
// expected (can re-use the same vertices)
expected := &Graph{}
expected.AddEdge(v1, v3, e1)
expected.AddEdge(v2, v3, e2)
expected.AddEdge(v4, v5, e3)
//expected.AddEdge(v3, v4, NE("v3,v4"))
//expected.AddEdge(v3, v5, NE("v3,v5"))
runGraphCmp(t, g, expected)
}
func TestPgraphAddEdgeVertexGraph1(t *testing.T) {
v1 := NV("v1")
v2 := NV("v2")
v3 := NV("v3")
v4 := NV("v4")
v5 := NV("v5")
e1 := NE("e1")
e2 := NE("e2")
e3 := NE("e3")
g := &Graph{}
g.AddEdge(v1, v3, e1)
g.AddEdge(v2, v3, e2)
sub := &Graph{}
sub.AddEdge(v4, v5, e3)
g.AddEdgeVertexGraph(v3, sub, edgeGenFn)
// expected (can re-use the same vertices)
expected := &Graph{}
expected.AddEdge(v1, v3, e1)
expected.AddEdge(v2, v3, e2)
expected.AddEdge(v4, v5, e3)
expected.AddEdge(v3, v4, NE("v3,v4"))
expected.AddEdge(v3, v5, NE("v3,v5"))
runGraphCmp(t, g, expected)
}
func TestPgraphAddEdgeGraphVertex1(t *testing.T) {
v1 := NV("v1")
v2 := NV("v2")
v3 := NV("v3")
v4 := NV("v4")
v5 := NV("v5")
e1 := NE("e1")
e2 := NE("e2")
e3 := NE("e3")
g := &Graph{}
g.AddEdge(v1, v3, e1)
g.AddEdge(v2, v3, e2)
sub := &Graph{}
sub.AddEdge(v4, v5, e3)
g.AddEdgeGraphVertex(sub, v3, edgeGenFn)
// expected (can re-use the same vertices)
expected := &Graph{}
expected.AddEdge(v1, v3, e1)
expected.AddEdge(v2, v3, e2)
expected.AddEdge(v4, v5, e3)
expected.AddEdge(v4, v3, NE("v4,v3"))
expected.AddEdge(v5, v3, NE("v5,v3"))
runGraphCmp(t, g, expected)
}
func TestPgraphAddEdgeVertexGraphLight1(t *testing.T) {
v1 := NV("v1")
v2 := NV("v2")
v3 := NV("v3")
v4 := NV("v4")
v5 := NV("v5")
e1 := NE("e1")
e2 := NE("e2")
e3 := NE("e3")
g := &Graph{}
g.AddEdge(v1, v3, e1)
g.AddEdge(v2, v3, e2)
sub := &Graph{}
sub.AddEdge(v4, v5, e3)
g.AddEdgeVertexGraphLight(v3, sub, edgeGenFn)
// expected (can re-use the same vertices)
expected := &Graph{}
expected.AddEdge(v1, v3, e1)
expected.AddEdge(v2, v3, e2)
expected.AddEdge(v4, v5, e3)
expected.AddEdge(v3, v4, NE("v3,v4"))
//expected.AddEdge(v3, v5, NE("v3,v5")) // not needed with light
runGraphCmp(t, g, expected)
}
func TestPgraphAddEdgeGraphVertexLight1(t *testing.T) {
v1 := NV("v1")
v2 := NV("v2")
v3 := NV("v3")
v4 := NV("v4")
v5 := NV("v5")
e1 := NE("e1")
e2 := NE("e2")
e3 := NE("e3")
g := &Graph{}
g.AddEdge(v1, v3, e1)
g.AddEdge(v2, v3, e2)
sub := &Graph{}
sub.AddEdge(v4, v5, e3)
g.AddEdgeGraphVertexLight(sub, v3, edgeGenFn)
// expected (can re-use the same vertices)
expected := &Graph{}
expected.AddEdge(v1, v3, e1)
expected.AddEdge(v2, v3, e2)
expected.AddEdge(v4, v5, e3)
//expected.AddEdge(v4, v3, NE("v4,v3")) // not needed with light
expected.AddEdge(v5, v3, NE("v5,v3"))
runGraphCmp(t, g, expected)
}