resources: graph: Add partial implementation of a graph resource
This is something I've wanted to do for a while, but for the reasons mentioned in the comments, I've been unable to complete yet. I figured I'd at least merge what does exist so far in case someone else would like to pick this up. It's a bit of a brain hurdle / monster, because the tricky part is refactoring the core engine so that this fits in nicely. Perhaps someone will have more time and/or less tunnel vision than I to either merge something or sketch out some ideas on the path forwards. I think it's a useful goal because if recursive resources are possible, it could force the core engine into a more elegant design. Happy hacking!
This commit is contained in:
239
examples/lib/libmgmt-subgraph1.go
Normal file
239
examples/lib/libmgmt-subgraph1.go
Normal file
@@ -0,0 +1,239 @@
|
||||
// libmgmt example of graph resource
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/gapi"
|
||||
mgmt "github.com/purpleidea/mgmt/lib"
|
||||
"github.com/purpleidea/mgmt/pgraph"
|
||||
"github.com/purpleidea/mgmt/resources"
|
||||
)
|
||||
|
||||
// MyGAPI implements the main GAPI interface.
|
||||
type MyGAPI struct {
|
||||
Name string // graph name
|
||||
Interval uint // refresh interval, 0 to never refresh
|
||||
|
||||
data gapi.Data
|
||||
initialized bool
|
||||
closeChan chan struct{}
|
||||
wg sync.WaitGroup // sync group for tunnel go routines
|
||||
}
|
||||
|
||||
// NewMyGAPI creates a new MyGAPI struct and calls Init().
|
||||
func NewMyGAPI(data gapi.Data, name string, interval uint) (*MyGAPI, error) {
|
||||
obj := &MyGAPI{
|
||||
Name: name,
|
||||
Interval: interval,
|
||||
}
|
||||
return obj, obj.Init(data)
|
||||
}
|
||||
|
||||
// Init initializes the MyGAPI struct.
|
||||
func (obj *MyGAPI) Init(data gapi.Data) error {
|
||||
if obj.initialized {
|
||||
return fmt.Errorf("already initialized")
|
||||
}
|
||||
if obj.Name == "" {
|
||||
return fmt.Errorf("the graph name must be specified")
|
||||
}
|
||||
obj.data = data // store for later
|
||||
obj.closeChan = make(chan struct{})
|
||||
obj.initialized = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Graph returns a current Graph.
|
||||
func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
|
||||
if !obj.initialized {
|
||||
return nil, fmt.Errorf("libmgmt: MyGAPI is not initialized")
|
||||
}
|
||||
|
||||
g, err := pgraph.NewGraph(obj.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// FIXME: these are being specified temporarily until it's the default!
|
||||
metaparams := resources.DefaultMetaParams
|
||||
|
||||
content := "I created a subgraph!\n"
|
||||
f0 := &resources.FileRes{
|
||||
BaseRes: resources.BaseRes{
|
||||
Name: "README",
|
||||
MetaParams: metaparams,
|
||||
},
|
||||
Path: "/tmp/mgmt/README",
|
||||
Content: &content,
|
||||
State: "present",
|
||||
}
|
||||
g.AddVertex(f0)
|
||||
|
||||
// create a subgraph to add *into* a graph resource
|
||||
subGraph, err := pgraph.NewGraph(fmt.Sprintf("%s->subgraph", obj.Name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// add elements into the sub graph
|
||||
f1 := &resources.FileRes{
|
||||
BaseRes: resources.BaseRes{
|
||||
Name: "file1",
|
||||
MetaParams: metaparams,
|
||||
},
|
||||
Path: "/tmp/mgmt/sub1",
|
||||
|
||||
State: "present",
|
||||
}
|
||||
subGraph.AddVertex(f1)
|
||||
|
||||
n1 := &resources.NoopRes{
|
||||
BaseRes: resources.BaseRes{
|
||||
Name: "noop1",
|
||||
MetaParams: metaparams,
|
||||
},
|
||||
}
|
||||
subGraph.AddVertex(n1)
|
||||
|
||||
e0 := &resources.Edge{Name: "e0"}
|
||||
e0.Notify = true // send a notification from v0 to v1
|
||||
subGraph.AddEdge(f1, n1, e0)
|
||||
|
||||
// create the actual resource to hold the sub graph
|
||||
subGraphRes0 := &resources.GraphRes{ // TODO: should we name this SubGraphRes ?
|
||||
BaseRes: resources.BaseRes{
|
||||
Name: "subgraph1",
|
||||
MetaParams: metaparams,
|
||||
},
|
||||
Graph: subGraph,
|
||||
}
|
||||
g.AddVertex(subGraphRes0) // add it to the main graph
|
||||
|
||||
//g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.World, obj.data.Noop)
|
||||
return g, nil
|
||||
}
|
||||
|
||||
// Next returns nil errors every time there could be a new graph.
|
||||
func (obj *MyGAPI) Next() chan error {
|
||||
ch := make(chan error)
|
||||
obj.wg.Add(1)
|
||||
go func() {
|
||||
defer obj.wg.Done()
|
||||
defer close(ch) // this will run before the obj.wg.Done()
|
||||
if !obj.initialized {
|
||||
ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized")
|
||||
return
|
||||
}
|
||||
startChan := make(chan struct{}) // start signal
|
||||
close(startChan) // kick it off!
|
||||
|
||||
ticker := make(<-chan time.Time)
|
||||
if obj.data.NoStreamWatch || obj.Interval <= 0 {
|
||||
ticker = nil
|
||||
} else {
|
||||
// arbitrarily change graph every interval seconds
|
||||
t := time.NewTicker(time.Duration(obj.Interval) * time.Second)
|
||||
defer t.Stop()
|
||||
ticker = t.C
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-startChan: // kick the loop once at start
|
||||
startChan = nil // disable
|
||||
// pass
|
||||
case <-ticker:
|
||||
// pass
|
||||
case <-obj.closeChan:
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("libmgmt: Generating new graph...")
|
||||
select {
|
||||
case ch <- nil: // trigger a run
|
||||
case <-obj.closeChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
// Close shuts down the MyGAPI.
|
||||
func (obj *MyGAPI) Close() error {
|
||||
if !obj.initialized {
|
||||
return fmt.Errorf("libmgmt: MyGAPI is not initialized")
|
||||
}
|
||||
close(obj.closeChan)
|
||||
obj.wg.Wait()
|
||||
obj.initialized = false // closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run runs an embedded mgmt server.
|
||||
func Run() error {
|
||||
|
||||
obj := &mgmt.Main{}
|
||||
obj.Program = "libmgmt" // TODO: set on compilation
|
||||
obj.Version = "0.0.1" // TODO: set on compilation
|
||||
obj.TmpPrefix = true // disable for easy debugging
|
||||
//prefix := "/tmp/testprefix/"
|
||||
//obj.Prefix = &p // enable for easy debugging
|
||||
obj.IdealClusterSize = -1
|
||||
obj.ConvergedTimeout = -1
|
||||
obj.Noop = false // FIXME: careful!
|
||||
|
||||
obj.GAPI = &MyGAPI{ // graph API
|
||||
Name: "libmgmt", // TODO: set on compilation
|
||||
Interval: 60 * 10, // arbitrarily change graph every 15 seconds
|
||||
}
|
||||
|
||||
if err := obj.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// install the exit signal handler
|
||||
exit := make(chan struct{})
|
||||
defer close(exit)
|
||||
go func() {
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt) // catch ^C
|
||||
//signal.Notify(signals, os.Kill) // catch signals
|
||||
signal.Notify(signals, syscall.SIGTERM)
|
||||
|
||||
select {
|
||||
case sig := <-signals: // any signal will do
|
||||
if sig == os.Interrupt {
|
||||
log.Println("Interrupted by ^C")
|
||||
obj.Exit(nil)
|
||||
return
|
||||
}
|
||||
log.Println("Interrupted by signal")
|
||||
obj.Exit(fmt.Errorf("killed by %v", sig))
|
||||
return
|
||||
case <-exit:
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
if err := obj.Run(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
log.Printf("Hello!")
|
||||
if err := Run(); err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
return
|
||||
}
|
||||
log.Printf("Goodbye!")
|
||||
}
|
||||
242
resources/graph.go
Normal file
242
resources/graph.go
Normal file
@@ -0,0 +1,242 @@
|
||||
// Mgmt
|
||||
// Copyright (C) 2013-2017+ James Shubin and the project contributors
|
||||
// Written by James Shubin <james@shubin.ca> and the project contributors
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package resources
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
|
||||
"github.com/purpleidea/mgmt/pgraph"
|
||||
|
||||
multierr "github.com/hashicorp/go-multierror"
|
||||
errwrap "github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterResource("graph", func() Res { return &GraphRes{} })
|
||||
gob.Register(&GraphRes{})
|
||||
}
|
||||
|
||||
// GraphRes is a resource that recursively runs a sub graph of resources.
|
||||
// TODO: should we name this SubGraphRes instead?
|
||||
// TODO: we could also flatten "sub graphs" into the main graph to avoid this,
|
||||
// and this could even be done with a graph transformation called flatten,
|
||||
// similar to where autogroup and autoedges run.
|
||||
// XXX: this resource is not complete, and hasn't even been tested
|
||||
type GraphRes struct {
|
||||
BaseRes `yaml:",inline"`
|
||||
Graph *pgraph.Graph `yaml:"graph"` // TODO: how do we suck in a graph via yaml?
|
||||
|
||||
initCount int // number of successfully initialized resources
|
||||
}
|
||||
|
||||
// GraphUID is a unique representation for a GraphRes object.
|
||||
type GraphUID struct {
|
||||
BaseUID
|
||||
//foo string // XXX: not implemented
|
||||
}
|
||||
|
||||
// Default returns some sensible defaults for this resource.
|
||||
func (obj *GraphRes) Default() Res {
|
||||
return &GraphRes{
|
||||
BaseRes: BaseRes{
|
||||
MetaParams: DefaultMetaParams, // force a default
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Validate the params and sub resources that are passed to GraphRes.
|
||||
func (obj *GraphRes) Validate() error {
|
||||
var err error
|
||||
for _, v := range obj.Graph.VerticesSorted() { // validate everyone
|
||||
if e := VtoR(v).Validate(); err != nil {
|
||||
err = multierr.Append(err, e) // list of errors
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return errwrap.Wrapf(err, "could not Validate() graph")
|
||||
}
|
||||
|
||||
return obj.BaseRes.Validate()
|
||||
}
|
||||
|
||||
// Init runs some startup code for this resource.
|
||||
func (obj *GraphRes) Init() error {
|
||||
// Loop through each vertex and initialize it, but keep track of how far
|
||||
// we've succeeded, because on failure we'll stop and prepare to reverse
|
||||
// through from there running the Close operation on each vertex that we
|
||||
// previously did an Init on. The engine always ensures that we run this
|
||||
// with a 1-1 relationship between Init and Close, so we must do so too.
|
||||
for i, v := range obj.Graph.VerticesSorted() { // deterministic order!
|
||||
obj.initCount = i + 1 // store the number that we tried to init
|
||||
if err := VtoR(v).Init(); err != nil {
|
||||
return errwrap.Wrapf(err, "could not Init() graph")
|
||||
}
|
||||
}
|
||||
|
||||
obj.BaseRes.Kind = "graph"
|
||||
return obj.BaseRes.Init() // call base init, b/c we're overrriding
|
||||
}
|
||||
|
||||
// Close runs some cleanup code for this resource.
|
||||
func (obj *GraphRes) Close() error {
|
||||
// The idea is to Close anything we did an Init on including the BaseRes
|
||||
// methods which are not guaranteed to be safe if called multiple times!
|
||||
var err error
|
||||
vertices := obj.Graph.VerticesSorted() // deterministic order!
|
||||
last := obj.initCount - 1 // index of last vertex we did init on
|
||||
for i := range vertices {
|
||||
v := vertices[last-i] // go through in reverse
|
||||
|
||||
// if we hit this condition, we haven't been able to get through
|
||||
// the entire list of vertices that we'd have liked to, on init!
|
||||
if obj.initCount == 0 {
|
||||
// if we get here, we exit without calling BaseRes.Close
|
||||
// because the matching BaseRes.Init did not get called!
|
||||
return errwrap.Wrapf(err, "could not Close() partial graph")
|
||||
//break
|
||||
}
|
||||
|
||||
obj.initCount-- // count to avoid closing one that didn't init!
|
||||
// try to close everyone that got an init, don't stop suddenly!
|
||||
if e := VtoR(v).Close(); e != nil {
|
||||
err = multierr.Append(err, e) // list of errors
|
||||
}
|
||||
}
|
||||
|
||||
// call base close, b/c we're overriding
|
||||
if e := obj.BaseRes.Close(); err == nil {
|
||||
err = e
|
||||
} else if e != nil {
|
||||
err = multierr.Append(err, e) // list of errors
|
||||
}
|
||||
// this returns nil if err is nil
|
||||
return errwrap.Wrapf(err, "could not Close() graph")
|
||||
}
|
||||
|
||||
// Watch is the primary listener for this resource and it outputs events.
|
||||
// XXX: should this use mgraph.Start/Pause? if so then what does CheckApply do?
|
||||
// XXX: we should probably refactor the core engine to make this work, which
|
||||
// will hopefully lead us to a more elegant core that is easier to understand
|
||||
func (obj *GraphRes) Watch() error {
|
||||
|
||||
return fmt.Errorf("Not implemented")
|
||||
}
|
||||
|
||||
// CheckApply method for Graph resource.
|
||||
// XXX: not implemented
|
||||
func (obj *GraphRes) CheckApply(apply bool) (bool, error) {
|
||||
|
||||
return false, fmt.Errorf("Not implemented")
|
||||
|
||||
}
|
||||
|
||||
// UIDs includes all params to make a unique identification of this object.
|
||||
// Most resources only return one, although some resources can return multiple.
|
||||
func (obj *GraphRes) UIDs() []ResUID {
|
||||
x := &GraphUID{
|
||||
BaseUID: BaseUID{
|
||||
Name: obj.GetName(),
|
||||
Kind: obj.GetKind(),
|
||||
},
|
||||
//foo: obj.foo, // XXX: not implemented
|
||||
}
|
||||
uids := []ResUID{}
|
||||
for _, v := range obj.Graph.VerticesSorted() {
|
||||
uids = append(uids, VtoR(v).UIDs()...)
|
||||
}
|
||||
return append([]ResUID{x}, uids...)
|
||||
}
|
||||
|
||||
// XXX: hook up the autogrouping magic!
|
||||
|
||||
// AutoEdges returns the AutoEdges. In this case none are used.
|
||||
func (obj *GraphRes) AutoEdges() AutoEdge {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compare two resources and return if they are equivalent.
|
||||
func (obj *GraphRes) Compare(r Res) bool {
|
||||
// we can only compare GraphRes to others of the same resource kind
|
||||
res, ok := r.(*GraphRes)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if !obj.BaseRes.Compare(res) {
|
||||
return false
|
||||
}
|
||||
if obj.Name != res.Name {
|
||||
return false
|
||||
}
|
||||
|
||||
//if obj.Foo != res.Foo { // XXX: not implemented
|
||||
// return false
|
||||
//}
|
||||
// compare the structure of the two graphs...
|
||||
vertexCmpFn := func(v1, v2 pgraph.Vertex) (bool, error) {
|
||||
if v1.String() == "" || v2.String() == "" {
|
||||
return false, fmt.Errorf("oops, empty vertex")
|
||||
}
|
||||
return VtoR(v1).Compare(VtoR(v2)), nil
|
||||
}
|
||||
|
||||
edgeCmpFn := func(e1, e2 pgraph.Edge) (bool, error) {
|
||||
if e1.String() == "" || e2.String() == "" {
|
||||
return false, fmt.Errorf("oops, empty edge")
|
||||
}
|
||||
edge1 := e1.(*Edge) // panic if wrong
|
||||
edge2 := e2.(*Edge) // panic if wrong
|
||||
return edge1.Compare(edge2), nil
|
||||
}
|
||||
if err := obj.Graph.GraphCmp(res.Graph, vertexCmpFn, edgeCmpFn); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// compare individual elements in structurally equivalent graphs
|
||||
// TODO: is this redundant with the GraphCmp?
|
||||
g1 := obj.Graph.VerticesSorted()
|
||||
g2 := res.Graph.VerticesSorted()
|
||||
for i, v1 := range g1 {
|
||||
v2 := g2[i]
|
||||
if !VtoR(v1).Compare(VtoR(v2)) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// UnmarshalYAML is the custom unmarshal handler for this struct.
|
||||
// It is primarily useful for setting the defaults.
|
||||
func (obj *GraphRes) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
type rawRes GraphRes // indirection to avoid infinite recursion
|
||||
|
||||
def := obj.Default() // get the default
|
||||
res, ok := def.(*GraphRes) // put in the right format
|
||||
if !ok {
|
||||
return fmt.Errorf("could not convert to GraphRes")
|
||||
}
|
||||
raw := rawRes(*res) // convert; the defaults go here
|
||||
|
||||
if err := unmarshal(&raw); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*obj = GraphRes(raw) // restore from indirection with type conversion!
|
||||
return nil
|
||||
}
|
||||
@@ -58,6 +58,7 @@ type Resources struct {
|
||||
Augeas []*resources.AugeasRes `yaml:"augeas"`
|
||||
Exec []*resources.ExecRes `yaml:"exec"`
|
||||
File []*resources.FileRes `yaml:"file"`
|
||||
Graph []*resources.GraphRes `yaml:"graph"`
|
||||
Hostname []*resources.HostnameRes `yaml:"hostname"`
|
||||
KV []*resources.KVRes `yaml:"kv"`
|
||||
Msg []*resources.MsgRes `yaml:"msg"`
|
||||
|
||||
Reference in New Issue
Block a user