gapi: Split out graph generation into a proper graph API

This is a monster patch that splits out the yaml and puppet based graph
generation and pushes them behind a common API. In addition alternate
pluggable GAPI's can be easily added! The important side benefit is that
you can now write a custom GAPI for embedding mgmt!

This also includes some slight clean ups that I didn't find it worth
splitting into separate patches.
This commit is contained in:
James Shubin
2016-11-02 04:57:48 -04:00
parent 75dedf391a
commit 1370f2a76b
24 changed files with 916 additions and 249 deletions

View File

@@ -70,7 +70,7 @@ Older videos and other material [is available](https://github.com/purpleidea/mgm
##Setup ##Setup
During this prototype phase, the tool can be run out of the source directory. During this prototype phase, the tool can be run out of the source directory.
You'll probably want to use ```./run.sh run --file examples/graph1.yaml``` to You'll probably want to use ```./run.sh run --yaml examples/graph1.yaml``` to
get started. Beware that this _can_ cause data loss. Understand what you're get started. Beware that this _can_ cause data loss. Understand what you're
doing first, or perform these actions in a virtual environment such as the one doing first, or perform these actions in a virtual environment such as the one
provided by [Oh-My-Vagrant](https://github.com/purpleidea/oh-my-vagrant). provided by [Oh-My-Vagrant](https://github.com/purpleidea/oh-my-vagrant).
@@ -422,7 +422,7 @@ you can probably figure out most of it, as it's fairly intuitive.
The main interface to the `mgmt` tool is the command line. For the most recent The main interface to the `mgmt` tool is the command line. For the most recent
documentation, please run `mgmt --help`. documentation, please run `mgmt --help`.
####`--file <graph.yaml>` ####`--yaml <graph.yaml>`
Point to a graph file to run. Point to a graph file to run.
####`--converged-timeout <seconds>` ####`--converged-timeout <seconds>`

View File

@@ -41,7 +41,7 @@ cd $GOPATH/src/github.com/purpleidea/mgmt
``` ```
* Get the remaining golang deps with `go get ./...`, or run `make deps` if you're comfortable with how we install them. * Get the remaining golang deps with `go get ./...`, or run `make deps` if you're comfortable with how we install them.
* Run `make build` to get a freshly built `mgmt` binary. * Run `make build` to get a freshly built `mgmt` binary.
* Run `time ./mgmt run --file examples/graph0.yaml --converged-timeout=5 --tmp-prefix` to try out a very simple example! * Run `time ./mgmt run --yaml examples/graph0.yaml --converged-timeout=5 --tmp-prefix` to try out a very simple example!
* To run continuously in the default mode of operation, omit the `--converged-timeout` option. * To run continuously in the default mode of operation, omit the `--converged-timeout` option.
* Have fun hacking on our future technology! * Have fun hacking on our future technology!

View File

@@ -37,11 +37,11 @@
// //
// Smoke testing: // Smoke testing:
// mkdir /tmp/mgmt{A..E} // mkdir /tmp/mgmt{A..E}
// ./mgmt run --file examples/etcd1a.yaml --hostname h1 --tmp-prefix // ./mgmt run --yaml examples/etcd1a.yaml --hostname h1 --tmp-prefix
// ./mgmt run --file examples/etcd1b.yaml --hostname h2 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382 // ./mgmt run --yaml examples/etcd1b.yaml --hostname h2 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382
// ./mgmt run --file examples/etcd1c.yaml --hostname h3 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384 // ./mgmt run --yaml examples/etcd1c.yaml --hostname h3 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3 // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3
// ./mgmt run --file examples/etcd1d.yaml --hostname h4 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386 // ./mgmt run --yaml examples/etcd1d.yaml --hostname h4 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5 // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5
// ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list

View File

@@ -6,24 +6,66 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"time" "time"
"github.com/purpleidea/mgmt/gconfig" "github.com/purpleidea/mgmt/gapi"
mgmt "github.com/purpleidea/mgmt/mgmtmain" mgmt "github.com/purpleidea/mgmt/mgmtmain"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/resources" "github.com/purpleidea/mgmt/resources"
"github.com/purpleidea/mgmt/yamlgraph"
) )
func generateGraphConfig() *gconfig.GraphConfig { // MyGAPI implements the main GAPI interface.
type MyGAPI struct {
Name string // graph name
Interval uint // refresh interval, 0 to never refresh
data gapi.Data
initialized bool
closeChan chan struct{}
wg sync.WaitGroup // sync group for tunnel go routines
}
// NewMyGAPI creates a new MyGAPI struct and calls Init().
func NewMyGAPI(data gapi.Data, name string, interval uint) (*MyGAPI, error) {
obj := &MyGAPI{
Name: name,
Interval: interval,
}
return obj, obj.Init(data)
}
// Init initializes the MyGAPI struct.
func (obj *MyGAPI) Init(data gapi.Data) error {
if obj.initialized {
return fmt.Errorf("Already initialized!")
}
if obj.Name == "" {
return fmt.Errorf("The graph name must be specified!")
}
obj.data = data // store for later
obj.closeChan = make(chan struct{})
obj.initialized = true
return nil
}
// Graph returns a current Graph.
func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
if !obj.initialized {
return nil, fmt.Errorf("libmgmt: MyGAPI is not initialized")
}
n1, err := resources.NewNoopRes("noop1") n1, err := resources.NewNoopRes("noop1")
if err != nil { if err != nil {
return nil // error return nil, fmt.Errorf("Can't create resource: %v", err)
} }
gc := &gconfig.GraphConfig{ // we can still build a graph via the yaml method
Graph: "libmgmt", gc := &yamlgraph.GraphConfig{
Resources: gconfig.Resources{ // must redefine anonymous struct :( Graph: obj.Name,
Resources: yamlgraph.Resources{ // must redefine anonymous struct :(
// in alphabetical order // in alphabetical order
Exec: []*resources.ExecRes{}, Exec: []*resources.ExecRes{},
File: []*resources.FileRes{}, File: []*resources.FileRes{},
@@ -37,38 +79,74 @@ func generateGraphConfig() *gconfig.GraphConfig {
//Collector: []collectorResConfig{}, //Collector: []collectorResConfig{},
//Edges: []Edge{}, //Edges: []Edge{},
Comment: "comment!", Comment: "comment!",
//Hostname: "???",
//Remote: "???",
} }
return gc
g, err := gc.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop)
return g, err
}
// SwitchStream returns nil errors every time there could be a new graph.
func (obj *MyGAPI) SwitchStream() chan error {
if obj.data.NoWatch || obj.Interval <= 0 {
return nil
}
ch := make(chan error)
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(ch) // this will run before the obj.wg.Done()
if !obj.initialized {
ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized")
return
}
// arbitrarily change graph every interval seconds
ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Printf("libmgmt: Generating new graph...")
ch <- nil // trigger a run
case <-obj.closeChan:
return
}
}
}()
return ch
}
// Close shuts down the MyGAPI.
func (obj *MyGAPI) Close() error {
if !obj.initialized {
return fmt.Errorf("libmgmt: MyGAPI is not initialized")
}
close(obj.closeChan)
obj.wg.Wait()
obj.initialized = false // closed = true
return nil
} }
// Run runs an embedded mgmt server. // Run runs an embedded mgmt server.
func Run() error { func Run() error {
obj := &mgmt.Main{} obj := &mgmt.Main{}
obj.Program = "mgmtlib" // TODO: set on compilation obj.Program = "libmgmt" // TODO: set on compilation
obj.Version = "0.0.1" // TODO: set on compilation obj.Version = "0.0.1" // TODO: set on compilation
obj.TmpPrefix = true obj.TmpPrefix = true
obj.IdealClusterSize = -1 obj.IdealClusterSize = -1
obj.ConvergedTimeout = -1 obj.ConvergedTimeout = -1
obj.Noop = true obj.Noop = true
obj.GAPI = generateGraphConfig // graph API function obj.GAPI = &MyGAPI{ // graph API
Name: "libmgmt", // TODO: set on compilation
Interval: 15, // arbitrarily change graph every 15 seconds
}
if err := obj.Init(); err != nil { if err := obj.Init(); err != nil {
return err return err
} }
go func() {
for {
log.Printf("Generating new graph...")
obj.Switch(generateGraphConfig) // pass in function to run...
time.Sleep(15 * time.Second) // XXX: arbitrarily change graph every 30 seconds
}
}()
// install the exit signal handler // install the exit signal handler
exit := make(chan struct{}) exit := make(chan struct{})
defer close(exit) defer close(exit)

181
examples/lib/libmgmt2.go Normal file
View File

@@ -0,0 +1,181 @@
// libmgmt example
package main
import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/purpleidea/mgmt/gapi"
mgmt "github.com/purpleidea/mgmt/mgmtmain"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/resources"
)
// MyGAPI implements the main GAPI interface.
type MyGAPI struct {
Name string // graph name
Count uint // number of resources to create
Interval uint // refresh interval, 0 to never refresh
data gapi.Data
initialized bool
closeChan chan struct{}
wg sync.WaitGroup // sync group for tunnel go routines
}
// NewMyGAPI creates a new MyGAPI struct and calls Init().
func NewMyGAPI(data gapi.Data, name string, interval uint, count uint) (*MyGAPI, error) {
obj := &MyGAPI{
Name: name,
Count: count,
Interval: interval,
}
return obj, obj.Init(data)
}
// Init initializes the MyGAPI struct.
func (obj *MyGAPI) Init(data gapi.Data) error {
if obj.initialized {
return fmt.Errorf("Already initialized!")
}
if obj.Name == "" {
return fmt.Errorf("The graph name must be specified!")
}
obj.data = data // store for later
obj.closeChan = make(chan struct{})
obj.initialized = true
return nil
}
// Graph returns a current Graph.
func (obj *MyGAPI) Graph() (*pgraph.Graph, error) {
if !obj.initialized {
return nil, fmt.Errorf("libmgmt: MyGAPI is not initialized")
}
g := pgraph.NewGraph(obj.Name)
var vertex *pgraph.Vertex
for i := uint(0); i < obj.Count; i++ {
n, err := resources.NewNoopRes(fmt.Sprintf("noop%d", i))
if err != nil {
return nil, fmt.Errorf("Can't create resource: %v", err)
}
v := pgraph.NewVertex(n)
g.AddVertex(v)
if i > 0 {
g.AddEdge(vertex, v, pgraph.NewEdge(fmt.Sprintf("e%d", i)))
}
vertex = v // save
}
//g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop)
return g, nil
}
// SwitchStream returns nil errors every time there could be a new graph.
func (obj *MyGAPI) SwitchStream() chan error {
if obj.data.NoWatch || obj.Interval <= 0 {
return nil
}
ch := make(chan error)
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(ch) // this will run before the obj.wg.Done()
if !obj.initialized {
ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized")
return
}
// arbitrarily change graph every interval seconds
ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Printf("libmgmt: Generating new graph...")
ch <- nil // trigger a run
case <-obj.closeChan:
return
}
}
}()
return ch
}
// Close shuts down the MyGAPI.
func (obj *MyGAPI) Close() error {
if !obj.initialized {
return fmt.Errorf("libmgmt: MyGAPI is not initialized")
}
close(obj.closeChan)
obj.wg.Wait()
obj.initialized = false // closed = true
return nil
}
// Run runs an embedded mgmt server.
func Run() error {
obj := &mgmt.Main{}
obj.Program = "libmgmt" // TODO: set on compilation
obj.Version = "0.0.1" // TODO: set on compilation
obj.TmpPrefix = true
obj.IdealClusterSize = -1
obj.ConvergedTimeout = -1
obj.Noop = true
obj.GAPI = &MyGAPI{ // graph API
Name: "libmgmt", // TODO: set on compilation
Count: 60, // number of vertices to add
Interval: 15, // arbitrarily change graph every 15 seconds
}
if err := obj.Init(); err != nil {
return err
}
// install the exit signal handler
exit := make(chan struct{})
defer close(exit)
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) // catch ^C
//signal.Notify(signals, os.Kill) // catch signals
signal.Notify(signals, syscall.SIGTERM)
select {
case sig := <-signals: // any signal will do
if sig == os.Interrupt {
log.Println("Interrupted by ^C")
obj.Exit(nil)
return
}
log.Println("Interrupted by signal")
obj.Exit(fmt.Errorf("Killed by %v", sig))
return
case <-exit:
return
}
}()
if err := obj.Run(); err != nil {
return err
}
return nil
}
func main() {
log.Printf("Hello!")
if err := Run(); err != nil {
fmt.Println(err)
os.Exit(1)
return
}
log.Printf("Goodbye!")
}

41
gapi/gapi.go Normal file
View File

@@ -0,0 +1,41 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package gapi defines the interface that graph API generators must meet.
package gapi
import (
"github.com/purpleidea/mgmt/etcd"
"github.com/purpleidea/mgmt/pgraph"
)
// Data is the set of input values passed into the GAPI structs via Init.
type Data struct {
Hostname string // uuid for the host, required for GAPI
EmbdEtcd *etcd.EmbdEtcd
Noop bool
NoWatch bool
// NOTE: we can add more fields here if needed by GAPI endpoints
}
// GAPI is a Graph API that represents incoming graphs and change streams.
type GAPI interface {
Init(Data) error // initializes the GAPI and passes in useful data
Graph() (*pgraph.Graph, error) // returns the most recent pgraph
SwitchStream() chan error // returns a stream of switch events
Close() error // shutdown the GAPI
}

View File

@@ -24,6 +24,9 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/purpleidea/mgmt/puppet"
"github.com/purpleidea/mgmt/yamlgraph"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
@@ -35,24 +38,44 @@ func run(c *cli.Context) error {
obj.Program = c.App.Name obj.Program = c.App.Name
obj.Version = c.App.Version obj.Version = c.App.Version
if h := c.String("hostname"); c.IsSet("hostname") && h != "" {
obj.Hostname = &h
}
if s := c.String("prefix"); c.IsSet("prefix") && s != "" { if s := c.String("prefix"); c.IsSet("prefix") && s != "" {
obj.Prefix = &s obj.Prefix = &s
} }
obj.TmpPrefix = c.Bool("tmp-prefix") obj.TmpPrefix = c.Bool("tmp-prefix")
obj.AllowTmpPrefix = c.Bool("allow-tmp-prefix") obj.AllowTmpPrefix = c.Bool("allow-tmp-prefix")
if h := c.String("hostname"); c.IsSet("hostname") && h != "" { if _ = c.String("code"); c.IsSet("code") {
obj.Hostname = &h if obj.GAPI != nil {
return fmt.Errorf("Can't combine code GAPI with existing GAPI.")
}
// TODO: implement DSL GAPI
//obj.GAPI = &dsl.GAPI{
// Code: &s,
//}
return fmt.Errorf("The Code GAPI is not implemented yet!") // TODO: DSL
} }
if y := c.String("yaml"); c.IsSet("yaml") {
if f := c.String("file"); c.IsSet("file") { if obj.GAPI != nil {
obj.File = &f return fmt.Errorf("Can't combine YAML GAPI with existing GAPI.")
}
obj.GAPI = &yamlgraph.GAPI{
File: &y,
}
} }
if p := c.String("puppet"); c.IsSet("puppet") { if p := c.String("puppet"); c.IsSet("puppet") {
obj.Puppet = &p if obj.GAPI != nil {
return fmt.Errorf("Can't combine puppet GAPI with existing GAPI.")
}
obj.GAPI = &puppet.GAPI{
PuppetParam: &p,
PuppetConf: c.String("puppet-conf"),
}
} }
obj.PuppetConf = c.String("puppet-conf") obj.Remotes = c.StringSlice("remote") // FIXME: GAPI-ify somehow?
obj.Remotes = c.StringSlice("remote")
obj.NoWatch = c.Bool("no-watch") obj.NoWatch = c.Bool("no-watch")
obj.Noop = c.Bool("noop") obj.Noop = c.Bool("noop")
@@ -129,6 +152,13 @@ func CLI(program, version string) error {
Usage: "run", Usage: "run",
Action: run, Action: run,
Flags: []cli.Flag{ Flags: []cli.Flag{
// useful for testing multiple instances on same machine
cli.StringFlag{
Name: "hostname",
Value: "",
Usage: "hostname to use",
},
cli.StringFlag{ cli.StringFlag{
Name: "prefix", Name: "prefix",
Usage: "specify a path to the working prefix directory", Usage: "specify a path to the working prefix directory",
@@ -143,23 +173,15 @@ func CLI(program, version string) error {
Usage: "allow creation of a new temporary prefix if main prefix is unavailable", Usage: "allow creation of a new temporary prefix if main prefix is unavailable",
}, },
// useful for testing multiple instances on same machine
cli.StringFlag{
Name: "hostname",
Value: "",
Usage: "hostname to use",
},
cli.StringFlag{ cli.StringFlag{
Name: "code, c", Name: "code, c",
Value: "", Value: "",
Usage: "code definition to run", Usage: "code definition to run",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "file, f", Name: "yaml",
Value: "", Value: "",
Usage: "graph definition to run", Usage: "yaml graph definition to run",
EnvVar: "MGMT_FILE",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "puppet, p", Name: "puppet, p",
@@ -179,7 +201,7 @@ func CLI(program, version string) error {
cli.BoolFlag{ cli.BoolFlag{
Name: "no-watch", Name: "no-watch",
Usage: "do not update graph on watched graph definition file changes", Usage: "do not update graph on stream switch events",
}, },
cli.BoolFlag{ cli.BoolFlag{
Name: "noop", Name: "noop",

View File

@@ -27,15 +27,16 @@ import (
"github.com/purpleidea/mgmt/converger" "github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd"
"github.com/purpleidea/mgmt/gconfig" "github.com/purpleidea/mgmt/gapi"
"github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/puppet"
"github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/recwatch"
"github.com/purpleidea/mgmt/remote" "github.com/purpleidea/mgmt/remote"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
etcdtypes "github.com/coreos/etcd/pkg/types" etcdtypes "github.com/coreos/etcd/pkg/types"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
multierr "github.com/hashicorp/go-multierror"
errwrap "github.com/pkg/errors"
) )
// Main is the main struct for running the mgmt logic. // Main is the main struct for running the mgmt logic.
@@ -43,17 +44,14 @@ type Main struct {
Program string // the name of this program, usually set at compile time Program string // the name of this program, usually set at compile time
Version string // the version of this program, usually set at compile time Version string // the version of this program, usually set at compile time
Hostname *string // hostname to use; nil if undefined
Prefix *string // prefix passed in; nil if undefined Prefix *string // prefix passed in; nil if undefined
TmpPrefix bool // request a pseudo-random, temporary prefix to be used TmpPrefix bool // request a pseudo-random, temporary prefix to be used
AllowTmpPrefix bool // allow creation of a new temporary prefix if main prefix is unavailable AllowTmpPrefix bool // allow creation of a new temporary prefix if main prefix is unavailable
Hostname *string // hostname to use; nil if undefined GAPI gapi.GAPI // graph API interface struct
Remotes []string // list of remote graph definitions to run
File *string // graph file to run; nil if undefined
Puppet *string // puppet mode to run; nil if undefined
PuppetConf string // the path to an alternate puppet.conf file
GAPI func() *gconfig.GraphConfig // graph API; nil if undefined
Remotes []string // list of remote graph definitions to run
NoWatch bool // do not update graph on watched graph definition file changes NoWatch bool // do not update graph on watched graph definition file changes
Noop bool // globally force all resources into no-op mode Noop bool // globally force all resources into no-op mode
@@ -82,8 +80,7 @@ type Main struct {
serverURLs etcdtypes.URLs // processed server urls value serverURLs etcdtypes.URLs // processed server urls value
idealClusterSize uint16 // processed ideal cluster size value idealClusterSize uint16 // processed ideal cluster size value
exit chan error // exit signal exit chan error // exit signal
switchChan chan func() *gconfig.GraphConfig // graph switches
} }
// Init initializes the main struct after it performs some validation. // Init initializes the main struct after it performs some validation.
@@ -97,10 +94,6 @@ func (obj *Main) Init() error {
return fmt.Errorf("Choosing a prefix and the request for a tmp prefix is illogical!") return fmt.Errorf("Choosing a prefix and the request for a tmp prefix is illogical!")
} }
if obj.File != nil && obj.Puppet != nil {
return fmt.Errorf("The File and Puppet parameters cannot be used together!")
}
obj.idealClusterSize = uint16(obj.IdealClusterSize) obj.idealClusterSize = uint16(obj.IdealClusterSize)
if obj.IdealClusterSize < 0 { // value is undefined, set to the default if obj.IdealClusterSize < 0 { // value is undefined, set to the default
obj.idealClusterSize = etcd.DefaultIdealClusterSize obj.idealClusterSize = etcd.DefaultIdealClusterSize
@@ -152,7 +145,6 @@ func (obj *Main) Init() error {
} }
obj.exit = make(chan error) obj.exit = make(chan error)
obj.switchChan = make(chan func() *gconfig.GraphConfig)
return nil return nil
} }
@@ -161,14 +153,6 @@ func (obj *Main) Exit(err error) {
obj.exit <- err // trigger an exit! obj.exit <- err // trigger an exit!
} }
// Switch causes mgmt try to switch the currently running graph to a new one.
// The function passed in will usually be called immediately, but it can also
// happen after a delay, and more often than this Switch function is called!
func (obj *Main) Switch(f func() *gconfig.GraphConfig) {
obj.switchChan <- f
// TODO: should we get an ACK() and pass back a return value ?
}
// Run is the main execution entrypoint to run mgmt. // Run is the main execution entrypoint to run mgmt.
func (obj *Main) Run() error { func (obj *Main) Run() error {
@@ -192,19 +176,15 @@ func (obj *Main) Run() error {
log.Printf("This is: %s, version: %s", obj.Program, obj.Version) log.Printf("This is: %s, version: %s", obj.Program, obj.Version)
log.Printf("Main: Start: %v", start) log.Printf("Main: Start: %v", start)
var hostname, _ = os.Hostname() hostname, err := os.Hostname() // a sensible default
// allow passing in the hostname, instead of using --hostname // allow passing in the hostname, instead of using the system setting
if obj.File != nil { if h := obj.Hostname; h != nil && *h != "" { // override by cli
if config := gconfig.ParseConfigFromFile(*obj.File); config != nil { hostname = *h
if h := config.Hostname; h != "" { } else if err != nil {
hostname = h return errwrap.Wrapf(err, "Can't get default hostname!")
}
}
} }
if obj.Hostname != nil { // override by cli if hostname == "" { // safety check
if h := obj.Hostname; *h != "" { return fmt.Errorf("Hostname cannot be empty!")
hostname = *h
}
} }
var prefix = fmt.Sprintf("/var/lib/%s/", obj.Program) // default prefix var prefix = fmt.Sprintf("/var/lib/%s/", obj.Program) // default prefix
@@ -215,7 +195,7 @@ func (obj *Main) Run() error {
if obj.TmpPrefix || os.MkdirAll(prefix, 0770) != nil { if obj.TmpPrefix || os.MkdirAll(prefix, 0770) != nil {
if obj.TmpPrefix || obj.AllowTmpPrefix { if obj.TmpPrefix || obj.AllowTmpPrefix {
var err error var err error
if prefix, err = ioutil.TempDir("", obj.Program+"-"); err != nil { if prefix, err = ioutil.TempDir("", obj.Program+"-"+hostname+"-"); err != nil {
return fmt.Errorf("Main: Error: Can't create temporary prefix!") return fmt.Errorf("Main: Error: Can't create temporary prefix!")
} }
log.Println("Main: Warning: Working prefix directory is temporary!") log.Println("Main: Warning: Working prefix directory is temporary!")
@@ -227,7 +207,7 @@ func (obj *Main) Run() error {
log.Printf("Main: Working prefix is: %s", prefix) log.Printf("Main: Working prefix is: %s", prefix)
var wg sync.WaitGroup var wg sync.WaitGroup
var G, fullGraph *pgraph.Graph var G, oldGraph *pgraph.Graph
// exit after `max-runtime` seconds for no reason at all... // exit after `max-runtime` seconds for no reason at all...
if i := obj.MaxRuntime; i > 0 { if i := obj.MaxRuntime; i > 0 {
@@ -285,19 +265,26 @@ func (obj *Main) Run() error {
converger.SetStateFn(convergerStateFn) converger.SetStateFn(convergerStateFn)
} }
var gapiChan chan error // stream events are nil errors
if obj.GAPI != nil {
data := gapi.Data{
Hostname: hostname,
EmbdEtcd: EmbdEtcd,
Noop: obj.Noop,
NoWatch: obj.NoWatch,
}
if err := obj.GAPI.Init(data); err != nil {
obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err))
} else if !obj.NoWatch {
gapiChan = obj.GAPI.SwitchStream() // stream of graph switch events!
}
}
exitchan := make(chan struct{}) // exit on close exitchan := make(chan struct{}) // exit on close
go func() { go func() {
startchan := make(chan struct{}) // start signal startchan := make(chan struct{}) // start signal
go func() { startchan <- struct{}{} }() go func() { startchan <- struct{}{} }()
var configChan chan error
var puppetChan <-chan time.Time
var customFunc = obj.GAPI // default
if !obj.NoWatch && obj.File != nil {
configChan = recwatch.ConfigWatch(*obj.File)
} else if obj.Puppet != nil {
interval := puppet.PuppetInterval(obj.PuppetConf)
puppetChan = time.Tick(time.Duration(interval) * time.Second)
}
log.Println("Etcd: Starting...") log.Println("Etcd: Starting...")
etcdchan := etcd.EtcdWatch(EmbdEtcd) etcdchan := etcd.EtcdWatch(EmbdEtcd)
first := true // first loop or not first := true // first loop or not
@@ -313,60 +300,42 @@ func (obj *Main) Run() error {
} }
// everything else passes through to cause a compile! // everything else passes through to cause a compile!
case customFunc = <-obj.switchChan: case err, ok := <-gapiChan:
// handle a graph switch with a new custom function if !ok { // channel closed
obj.GAPI = customFunc continue
case <-puppetChan:
// nothing, just go on
case e := <-configChan:
if obj.NoWatch {
continue // not ready to read config
} }
if e != nil { if err != nil {
obj.Exit(e) // trigger exit obj.Exit(err) // trigger exit
continue continue
//return // TODO: return or wait for exitchan? //return // TODO: return or wait for exitchan?
} }
// XXX: case compile_event: ... if obj.NoWatch { // extra safety for bad GAPI's
// ... log.Printf("Main: GAPI stream should be quiet with NoWatch!") // fix the GAPI!
continue // no stream events should be sent
}
case <-exitchan: case <-exitchan:
return return
} }
var config *gconfig.GraphConfig if obj.GAPI == nil {
if obj.File != nil { log.Printf("Config: GAPI is empty!")
config = gconfig.ParseConfigFromFile(*obj.File)
} else if obj.Puppet != nil {
config = puppet.ParseConfigFromPuppet(*obj.Puppet, obj.PuppetConf)
} else if obj.GAPI != nil {
config = obj.GAPI()
}
if config == nil {
log.Printf("Config: Parse failure")
continue continue
} }
if config.Hostname != "" && config.Hostname != hostname { // we need the vertices to be paused to work on them, so
log.Printf("Config: Hostname changed, ignoring config!")
continue
}
config.Hostname = hostname // set it in case it was ""
// run graph vertex LOCK... // run graph vertex LOCK...
if !first { // TODO: we can flatten this check out I think if !first { // TODO: we can flatten this check out I think
converger.Pause() // FIXME: add sync wait? converger.Pause() // FIXME: add sync wait?
G.Pause() // sync G.Pause() // sync
//G.UnGroup() // FIXME: implement me if needed!
} }
// build graph from config struct on events, eg: etcd... // make the graph from yaml, lib, puppet->yaml, or dsl!
// we need the vertices to be paused to work on them newGraph, err := obj.GAPI.Graph() // generate graph!
if newFullgraph, err := config.NewGraphFromConfig(fullGraph, EmbdEtcd, obj.Noop); err == nil { // keep references to all original elements if err != nil {
fullGraph = newFullgraph log.Printf("Config: Error creating new graph: %v", err)
} else {
log.Printf("Config: Error making new graph from config: %v", err)
// unpause! // unpause!
if !first { if !first {
G.Start(&wg, first) // sync G.Start(&wg, first) // sync
@@ -375,18 +344,40 @@ func (obj *Main) Run() error {
continue continue
} }
G = fullGraph.Copy() // copy to active graph // apply the global noop parameter if requested
// XXX: do etcd transaction out here... if obj.Noop {
for _, m := range newGraph.GraphMetas() {
m.Noop = obj.Noop
}
}
// FIXME: make sure we "UnGroup()" any semi-destructive
// changes to the resources so our efficient GraphSync
// will be able to re-use and cmp to the old graph.
newFullGraph, err := newGraph.GraphSync(oldGraph)
if err != nil {
log.Printf("Config: Error running graph sync: %v", err)
// unpause!
if !first {
G.Start(&wg, first) // sync
converger.Start() // after G.Start()
}
continue
}
oldGraph = newFullGraph // save old graph
G = oldGraph.Copy() // copy to active graph
G.AutoEdges() // add autoedges; modifies the graph G.AutoEdges() // add autoedges; modifies the graph
G.AutoGroup() // run autogroup; modifies the graph G.AutoGroup() // run autogroup; modifies the graph
// TODO: do we want to do a transitive reduction? // TODO: do we want to do a transitive reduction?
log.Printf("Graph: %v", G) // show graph log.Printf("Graph: %v", G) // show graph
err := G.ExecGraphviz(obj.GraphvizFilter, obj.Graphviz) if obj.GraphvizFilter != "" {
if err != nil { if err := G.ExecGraphviz(obj.GraphvizFilter, obj.Graphviz); err != nil {
log.Printf("Graphviz: %v", err) log.Printf("Graphviz: %v", err)
} else { } else {
log.Printf("Graphviz: Successfully generated graph!") log.Printf("Graphviz: Successfully generated graph!")
}
} }
G.AssociateData(converger) G.AssociateData(converger)
// G.Start(...) needs to be synchronous or wait, // G.Start(...) needs to be synchronous or wait,
@@ -444,17 +435,27 @@ func (obj *Main) Run() error {
// wait for etcd to be running before we remote in, which we do above! // wait for etcd to be running before we remote in, which we do above!
go remotes.Run() go remotes.Run()
if obj.File == nil && obj.Puppet == nil && obj.GAPI == nil { if obj.GAPI == nil {
converger.Start() // better start this for empty graphs converger.Start() // better start this for empty graphs
} }
log.Println("Main: Running...") log.Println("Main: Running...")
err := <-obj.exit // wait for exit signal reterr := <-obj.exit // wait for exit signal
log.Println("Destroy...") log.Println("Destroy...")
configWatcher.Close() // stop sending file changes to remotes if obj.GAPI != nil {
remotes.Exit() // tell all the remote connections to shutdown; waits! if err := obj.GAPI.Close(); err != nil {
err = errwrap.Wrapf(err, "GAPI closed poorly!")
reterr = multierr.Append(reterr, err) // list of errors
}
}
configWatcher.Close() // stop sending file changes to remotes
if err := remotes.Exit(); err != nil { // tell all the remote connections to shutdown; waits!
err = errwrap.Wrapf(err, "Remote exited poorly!")
reterr = multierr.Append(reterr, err) // list of errors
}
G.Exit() // tell all the children to exit G.Exit() // tell all the children to exit
@@ -463,7 +464,8 @@ func (obj *Main) Run() error {
// cleanup etcd main loop last so it can process everything first // cleanup etcd main loop last so it can process everything first
if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd
log.Printf("Etcd exited poorly with: %v", err) err = errwrap.Wrapf(err, "Etcd exited poorly!")
reterr = multierr.Append(reterr, err) // list of errors
} }
if obj.DEBUG { if obj.DEBUG {
@@ -474,5 +476,5 @@ func (obj *Main) Run() error {
// TODO: wait for each vertex to exit... // TODO: wait for each vertex to exit...
log.Println("Goodbye!") log.Println("Goodbye!")
return err return reterr
} }

View File

@@ -22,6 +22,8 @@ import (
"log" "log"
"github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/global"
errwrap "github.com/pkg/errors"
) )
// AutoGrouper is the required interface to implement for an autogroup algorithm // AutoGrouper is the required interface to implement for an autogroup algorithm
@@ -283,8 +285,8 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex)
g.DeleteVertex(v2) // remove grouped vertex g.DeleteVertex(v2) // remove grouped vertex
// 5) creation of a cyclic graph should throw an error // 5) creation of a cyclic graph should throw an error
if _, dag := g.TopologicalSort(); !dag { // am i a dag or not? if _, err := g.TopologicalSort(); err != nil { // am i a dag or not?
return fmt.Errorf("Graph is not a dag!") return errwrap.Wrapf(err, "TopologicalSort failed") // not a dag
} }
return nil // success return nil // success
} }

View File

@@ -19,7 +19,6 @@
package pgraph package pgraph
import ( import (
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
@@ -36,6 +35,8 @@ import (
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/resources" "github.com/purpleidea/mgmt/resources"
errwrap "github.com/pkg/errors"
) )
//go:generate stringer -type=graphState -output=graphstate_stringer.go //go:generate stringer -type=graphState -output=graphstate_stringer.go
@@ -163,6 +164,18 @@ func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) {
g.Adjacency[v1][v2] = e g.Adjacency[v1][v2] = e
} }
// DeleteEdge deletes a particular edge from the graph.
// FIXME: add test cases
func (g *Graph) DeleteEdge(e *Edge) {
for v1 := range g.Adjacency {
for v2, edge := range g.Adjacency[v1] {
if e == edge {
delete(g.Adjacency[v1], v2)
}
}
}
}
// GetVertexMatch searches for an equivalent resource in the graph and returns // GetVertexMatch searches for an equivalent resource in the graph and returns
// the vertex it is found in, or nil if not found. // the vertex it is found in, or nil if not found.
func (g *Graph) GetVertexMatch(obj resources.Res) *Vertex { func (g *Graph) GetVertexMatch(obj resources.Res) *Vertex {
@@ -285,11 +298,11 @@ func (g *Graph) ExecGraphviz(program, filename string) error {
switch program { switch program {
case "dot", "neato", "twopi", "circo", "fdp": case "dot", "neato", "twopi", "circo", "fdp":
default: default:
return errors.New("Invalid graphviz program selected!") return fmt.Errorf("Invalid graphviz program selected!")
} }
if filename == "" { if filename == "" {
return errors.New("No filename given!") return fmt.Errorf("No filename given!")
} }
// run as a normal user if possible when run with sudo // run as a normal user if possible when run with sudo
@@ -298,18 +311,18 @@ func (g *Graph) ExecGraphviz(program, filename string) error {
err := ioutil.WriteFile(filename, []byte(g.Graphviz()), 0644) err := ioutil.WriteFile(filename, []byte(g.Graphviz()), 0644)
if err != nil { if err != nil {
return errors.New("Error writing to filename!") return fmt.Errorf("Error writing to filename!")
} }
if err1 == nil && err2 == nil { if err1 == nil && err2 == nil {
if err := os.Chown(filename, uid, gid); err != nil { if err := os.Chown(filename, uid, gid); err != nil {
return errors.New("Error changing file owner!") return fmt.Errorf("Error changing file owner!")
} }
} }
path, err := exec.LookPath(program) path, err := exec.LookPath(program)
if err != nil { if err != nil {
return errors.New("Graphviz is missing!") return fmt.Errorf("Graphviz is missing!")
} }
out := fmt.Sprintf("%v.png", filename) out := fmt.Sprintf("%v.png", filename)
@@ -324,7 +337,7 @@ func (g *Graph) ExecGraphviz(program, filename string) error {
} }
_, err = cmd.Output() _, err = cmd.Output()
if err != nil { if err != nil {
return errors.New("Error writing to image!") return fmt.Errorf("Error writing to image!")
} }
return nil return nil
} }
@@ -468,7 +481,7 @@ func (g *Graph) OutDegree() map[*Vertex]int {
// TopologicalSort returns the sort of graph vertices in that order. // TopologicalSort returns the sort of graph vertices in that order.
// based on descriptions and code from wikipedia and rosetta code // based on descriptions and code from wikipedia and rosetta code
// TODO: add memoization, and cache invalidation to speed this up :) // TODO: add memoization, and cache invalidation to speed this up :)
func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algorithm func (g *Graph) TopologicalSort() ([]*Vertex, error) { // kahn's algorithm
var L []*Vertex // empty list that will contain the sorted elements var L []*Vertex // empty list that will contain the sorted elements
var S []*Vertex // set of all nodes with no incoming edges var S []*Vertex // set of all nodes with no incoming edges
remaining := make(map[*Vertex]int) // amount of edges remaining remaining := make(map[*Vertex]int) // amount of edges remaining
@@ -505,13 +518,13 @@ func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algori
if in > 0 { if in > 0 {
for n := range g.Adjacency[c] { for n := range g.Adjacency[c] {
if remaining[n] > 0 { if remaining[n] > 0 {
return nil, false // not a dag! return nil, fmt.Errorf("Not a dag!")
} }
} }
} }
} }
return L, true return L, nil
} }
// Reachability finds the shortest path in a DAG from a to b, and returns the // Reachability finds the shortest path in a DAG from a to b, and returns the
@@ -939,6 +952,94 @@ func (g *Graph) Exit() {
} }
} }
// GraphSync updates the oldGraph so that it matches the newGraph receiver. It
// leaves identical elements alone so that they don't need to be refreshed.
// FIXME: add test cases
func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) {
if oldGraph == nil {
oldGraph = NewGraph(g.GetName()) // copy over the name
}
oldGraph.SetName(g.GetName()) // overwrite the name
var lookup = make(map[*Vertex]*Vertex)
var vertexKeep []*Vertex // list of vertices which are the same in new graph
var edgeKeep []*Edge // list of vertices which are the same in new graph
for v := range g.Adjacency { // loop through the vertices (resources)
res := v.Res // resource
vertex := oldGraph.GetVertexMatch(res)
if vertex == nil { // no match found
if err := res.Init(); err != nil {
return nil, errwrap.Wrapf(err, "could not Init() resource")
}
vertex = NewVertex(res)
oldGraph.AddVertex(vertex) // call standalone in case not part of an edge
}
lookup[v] = vertex // used for constructing edges
vertexKeep = append(vertexKeep, vertex) // append
}
// get rid of any vertices we shouldn't keep (that aren't in new graph)
for v := range oldGraph.Adjacency {
if !VertexContains(v, vertexKeep) {
// wait for exit before starting new graph!
v.SendEvent(event.EventExit, true, false)
oldGraph.DeleteVertex(v)
}
}
// compare edges
for v1 := range g.Adjacency { // loop through the vertices (resources)
for v2, e := range g.Adjacency[v1] {
// we have an edge!
// lookup vertices (these should exist now)
//res1 := v1.Res // resource
//res2 := v2.Res
//vertex1 := oldGraph.GetVertexMatch(res1)
//vertex2 := oldGraph.GetVertexMatch(res2)
vertex1, exists1 := lookup[v1]
vertex2, exists2 := lookup[v2]
if !exists1 || !exists2 { // no match found, bug?
//if vertex1 == nil || vertex2 == nil { // no match found
return nil, fmt.Errorf("New vertices weren't found!") // programming error
}
edge, exists := oldGraph.Adjacency[vertex1][vertex2]
if !exists || edge.Name != e.Name { // TODO: edgeCmp
edge = e // use or overwrite edge
}
oldGraph.Adjacency[vertex1][vertex2] = edge // store it (AddEdge)
edgeKeep = append(edgeKeep, edge) // mark as saved
}
}
// delete unused edges
for v1 := range oldGraph.Adjacency {
for _, e := range oldGraph.Adjacency[v1] {
// we have an edge!
if !EdgeContains(e, edgeKeep) {
oldGraph.DeleteEdge(e)
}
}
}
return oldGraph, nil
}
// GraphMetas returns a list of pointers to each of the resource MetaParams.
func (g *Graph) GraphMetas() []*resources.MetaParams {
metas := []*resources.MetaParams{}
for v := range g.Adjacency { // loop through the vertices (resources))
res := v.Res // resource
meta := res.Meta()
metas = append(metas, meta)
}
return metas
}
// AssociateData associates some data with the object in the graph in question // AssociateData associates some data with the object in the graph in question
func (g *Graph) AssociateData(converger converger.Converger) { func (g *Graph) AssociateData(converger converger.Converger) {
for v := range g.GetVerticesChan() { for v := range g.GetVerticesChan() {
@@ -956,6 +1057,16 @@ func VertexContains(needle *Vertex, haystack []*Vertex) bool {
return false return false
} }
// EdgeContains is an "in array" function to test for an edge in a slice of edges.
func EdgeContains(needle *Edge, haystack []*Edge) bool {
for _, v := range haystack {
if needle == v {
return true
}
}
return false
}
// Reverse reverses a list of vertices. // Reverse reverses a list of vertices.
func Reverse(vs []*Vertex) []*Vertex { func Reverse(vs []*Vertex) []*Vertex {
//var out []*Vertex // XXX: golint suggests, but it fails testing //var out []*Vertex // XXX: golint suggests, but it fails testing

View File

@@ -352,11 +352,11 @@ func TestPgraphT9(t *testing.T) {
t.Errorf("Outdegree of v6 should be 0 instead of: %d.", i) t.Errorf("Outdegree of v6 should be 0 instead of: %d.", i)
} }
s, ok := G.TopologicalSort() s, err := G.TopologicalSort()
// either possibility is a valid toposort // either possibility is a valid toposort
match := reflect.DeepEqual(s, []*Vertex{v1, v2, v3, v4, v5, v6}) || reflect.DeepEqual(s, []*Vertex{v1, v3, v2, v4, v5, v6}) match := reflect.DeepEqual(s, []*Vertex{v1, v2, v3, v4, v5, v6}) || reflect.DeepEqual(s, []*Vertex{v1, v3, v2, v4, v5, v6})
if !ok || !match { if err != nil || !match {
t.Errorf("Topological sort failed, status: %v.", ok) t.Errorf("Topological sort failed, error: %v.", err)
str := "Found:" str := "Found:"
for _, v := range s { for _, v := range s {
str += " " + v.Res.GetName() str += " " + v.Res.GetName()
@@ -387,8 +387,8 @@ func TestPgraphT10(t *testing.T) {
G.AddEdge(v5, v6, e5) G.AddEdge(v5, v6, e5)
G.AddEdge(v4, v2, e6) // cycle G.AddEdge(v4, v2, e6) // cycle
if _, ok := G.TopologicalSort(); ok { if _, err := G.TopologicalSort(); err == nil {
t.Errorf("Topological sort passed, but graph is cyclic.") t.Errorf("Topological sort passed, but graph is cyclic!")
} }
} }

121
puppet/gapi.go Normal file
View File

@@ -0,0 +1,121 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package puppet
import (
"fmt"
"log"
"sync"
"time"
"github.com/purpleidea/mgmt/gapi"
"github.com/purpleidea/mgmt/pgraph"
)
// GAPI implements the main puppet GAPI interface.
type GAPI struct {
PuppetParam *string // puppet mode to run; nil if undefined
PuppetConf string // the path to an alternate puppet.conf file
data gapi.Data
initialized bool
closeChan chan struct{}
wg sync.WaitGroup // sync group for tunnel go routines
}
// NewGAPI creates a new puppet GAPI struct and calls Init().
func NewGAPI(data gapi.Data, puppetParam *string, puppetConf string) (*GAPI, error) {
obj := &GAPI{
PuppetParam: puppetParam,
PuppetConf: puppetConf,
}
return obj, obj.Init(data)
}
// Init initializes the puppet GAPI struct.
func (obj *GAPI) Init(data gapi.Data) error {
if obj.initialized {
return fmt.Errorf("Already initialized!")
}
if obj.PuppetParam == nil {
return fmt.Errorf("The PuppetParam param must be specified!")
}
obj.data = data // store for later
obj.closeChan = make(chan struct{})
obj.initialized = true
return nil
}
// Graph returns a current Graph.
func (obj *GAPI) Graph() (*pgraph.Graph, error) {
if !obj.initialized {
return nil, fmt.Errorf("Puppet: GAPI is not initialized!")
}
config := ParseConfigFromPuppet(*obj.PuppetParam, obj.PuppetConf)
if config == nil {
return nil, fmt.Errorf("Puppet: ParseConfigFromPuppet returned nil!")
}
g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop)
return g, err
}
// SwitchStream returns nil errors every time there could be a new graph.
func (obj *GAPI) SwitchStream() chan error {
if obj.data.NoWatch {
return nil
}
puppetChan := func() <-chan time.Time { // helper function
return time.Tick(time.Duration(PuppetInterval(obj.PuppetConf)) * time.Second)
}
ch := make(chan error)
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(ch) // this will run before the obj.wg.Done()
if !obj.initialized {
ch <- fmt.Errorf("Puppet: GAPI is not initialized!")
return
}
pChan := puppetChan()
for {
select {
case _, ok := <-pChan:
if !ok { // the channel closed!
return
}
log.Printf("Puppet: Generating new graph...")
pChan = puppetChan() // TODO: okay to update interval in case it changed?
ch <- nil // trigger a run
case <-obj.closeChan:
return
}
}
}()
return ch
}
// Close shuts down the Puppet GAPI.
func (obj *GAPI) Close() error {
if !obj.initialized {
return fmt.Errorf("Puppet: GAPI is not initialized!")
}
close(obj.closeChan)
obj.wg.Wait()
obj.initialized = false // closed = true
return nil
}

View File

@@ -26,8 +26,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/purpleidea/mgmt/gconfig"
"github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/yamlgraph"
) )
const ( const (
@@ -87,7 +87,7 @@ func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) {
// ParseConfigFromPuppet takes a special puppet param string and config and // ParseConfigFromPuppet takes a special puppet param string and config and
// returns the graph configuration structure. // returns the graph configuration structure.
func ParseConfigFromPuppet(puppetParam, puppetConf string) *gconfig.GraphConfig { func ParseConfigFromPuppet(puppetParam, puppetConf string) *yamlgraph.GraphConfig {
var puppetConfArg string var puppetConfArg string
if puppetConf != "" { if puppetConf != "" {
puppetConfArg = "--config=" + puppetConf puppetConfArg = "--config=" + puppetConf
@@ -104,7 +104,7 @@ func ParseConfigFromPuppet(puppetParam, puppetConf string) *gconfig.GraphConfig
log.Println("Puppet: launching translator") log.Println("Puppet: launching translator")
var config gconfig.GraphConfig var config yamlgraph.GraphConfig
if data, err := runPuppetCommand(cmd); err != nil { if data, err := runPuppetCommand(cmd); err != nil {
return nil return nil
} else if err := config.Parse(data); err != nil { } else if err := config.Parse(data); err != nil {

View File

@@ -62,12 +62,14 @@ import (
"time" "time"
cv "github.com/purpleidea/mgmt/converger" cv "github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/gconfig"
"github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
"github.com/purpleidea/mgmt/yamlgraph"
multierr "github.com/hashicorp/go-multierror"
"github.com/howeyc/gopass" "github.com/howeyc/gopass"
"github.com/kardianos/osext" "github.com/kardianos/osext"
errwrap "github.com/pkg/errors"
"github.com/pkg/sftp" "github.com/pkg/sftp"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
) )
@@ -491,7 +493,7 @@ func (obj *SSH) Exec() error {
// TODO: do something less arbitrary about which one we pick? // TODO: do something less arbitrary about which one we pick?
url := cleanURL(obj.remoteURLs[0]) // arbitrarily pick the first one url := cleanURL(obj.remoteURLs[0]) // arbitrarily pick the first one
seeds := fmt.Sprintf("--no-server --seeds 'http://%s'", url) // XXX: escape untrusted input? (or check if url is valid) seeds := fmt.Sprintf("--no-server --seeds 'http://%s'", url) // XXX: escape untrusted input? (or check if url is valid)
file := fmt.Sprintf("--file '%s'", obj.filepath) // XXX: escape untrusted input! (or check if file path exists) file := fmt.Sprintf("--yaml '%s'", obj.filepath) // XXX: escape untrusted input! (or check if file path exists)
depth := fmt.Sprintf("--depth %d", obj.depth+1) // child is +1 distance depth := fmt.Sprintf("--depth %d", obj.depth+1) // child is +1 distance
args := []string{hostname, seeds, file, depth} args := []string{hostname, seeds, file, depth}
if obj.noop { if obj.noop {
@@ -734,7 +736,7 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi
// It takes as input the path to a graph definition file. // It takes as input the path to a graph definition file.
func (obj *Remotes) NewSSH(file string) (*SSH, error) { func (obj *Remotes) NewSSH(file string) (*SSH, error) {
// first do the parsing... // first do the parsing...
config := gconfig.ParseConfigFromFile(file) config := yamlgraph.ParseConfigFromFile(file) // FIXME: GAPI-ify somehow?
if config == nil { if config == nil {
return nil, fmt.Errorf("Remote: Error parsing remote graph: %s", file) return nil, fmt.Errorf("Remote: Error parsing remote graph: %s", file)
} }
@@ -791,7 +793,8 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) {
return nil, fmt.Errorf("No authentication methods available!") return nil, fmt.Errorf("No authentication methods available!")
} }
hostname := config.Hostname //hostname := config.Hostname // TODO: optionally specify local hostname somehow
hostname := ""
if hostname == "" { if hostname == "" {
hostname = host // default to above hostname = host // default to above
} }
@@ -1017,11 +1020,12 @@ func (obj *Remotes) Run() {
// Exit causes as much of the Remotes struct to shutdown as quickly and as // Exit causes as much of the Remotes struct to shutdown as quickly and as
// cleanly as possible. It only returns once everything is shutdown. // cleanly as possible. It only returns once everything is shutdown.
func (obj *Remotes) Exit() { func (obj *Remotes) Exit() error {
obj.lock.Lock() obj.lock.Lock()
obj.exiting = true // don't spawn new ones once this flag is set! obj.exiting = true // don't spawn new ones once this flag is set!
obj.lock.Unlock() obj.lock.Unlock()
close(obj.exitChan) close(obj.exitChan)
var reterr error
for _, f := range obj.remotes { for _, f := range obj.remotes {
sshobj, exists := obj.sshmap[f] sshobj, exists := obj.sshmap[f]
if !exists || sshobj == nil { if !exists || sshobj == nil {
@@ -1030,7 +1034,8 @@ func (obj *Remotes) Exit() {
// TODO: should we run these as go routines? // TODO: should we run these as go routines?
if err := sshobj.Stop(); err != nil { if err := sshobj.Stop(); err != nil {
log.Printf("Remote: Error stopping: %s", err) err = errwrap.Wrapf(err, "Remote: Error stopping!")
reterr = multierr.Append(reterr, err) // list of errors
} }
} }
@@ -1040,6 +1045,7 @@ func (obj *Remotes) Exit() {
defer obj.cuid.Unregister() defer obj.cuid.Unregister()
obj.wg.Wait() // wait for everyone to exit obj.wg.Wait() // wait for everyone to exit
return reterr
} }
// fmtUID makes a random string of length n, it is not cryptographically safe. // fmtUID makes a random string of length n, it is not cryptographically safe.

View File

@@ -27,7 +27,7 @@ import (
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/global"
"github.com/pkg/errors" errwrap "github.com/pkg/errors"
"github.com/rgbkrk/libvirt-go" "github.com/rgbkrk/libvirt-go"
) )
@@ -78,7 +78,7 @@ func NewVirtRes(name string, uri, state string, transient bool, cpus uint16, mem
func (obj *VirtRes) Init() error { func (obj *VirtRes) Init() error {
if !libvirtInitialized { if !libvirtInitialized {
if err := libvirt.EventRegisterDefaultImpl(); err != nil { if err := libvirt.EventRegisterDefaultImpl(); err != nil {
return errors.Wrapf(err, "EventRegisterDefaultImpl failed") return errwrap.Wrapf(err, "EventRegisterDefaultImpl failed")
} }
libvirtInitialized = true libvirtInitialized = true
} }
@@ -139,7 +139,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error {
} }
//log.Printf("EventRunDefaultImpl started!") //log.Printf("EventRunDefaultImpl started!")
if err := libvirt.EventRunDefaultImpl(); err != nil { if err := libvirt.EventRunDefaultImpl(); err != nil {
errorChan <- errors.Wrapf(err, "EventRunDefaultImpl failed") errorChan <- errwrap.Wrapf(err, "EventRunDefaultImpl failed")
return return
} }
//log.Printf("EventRunDefaultImpl looped!") //log.Printf("EventRunDefaultImpl looped!")
@@ -254,13 +254,13 @@ func (obj *VirtRes) attrCheckApply(apply bool) (bool, error) {
dom, err := obj.conn.LookupDomainByName(obj.GetName()) dom, err := obj.conn.LookupDomainByName(obj.GetName())
if err != nil { if err != nil {
return false, errors.Wrapf(err, "conn.LookupDomainByName failed") return false, errwrap.Wrapf(err, "conn.LookupDomainByName failed")
} }
domInfo, err := dom.GetInfo() domInfo, err := dom.GetInfo()
if err != nil { if err != nil {
// we don't know if the state is ok // we don't know if the state is ok
return false, errors.Wrapf(err, "domain.GetInfo failed") return false, errwrap.Wrapf(err, "domain.GetInfo failed")
} }
// check memory // check memory
@@ -270,7 +270,7 @@ func (obj *VirtRes) attrCheckApply(apply bool) (bool, error) {
return false, nil return false, nil
} }
if err := dom.SetMemory(obj.Memory); err != nil { if err := dom.SetMemory(obj.Memory); err != nil {
return false, errors.Wrapf(err, "domain.SetMemory failed") return false, errwrap.Wrapf(err, "domain.SetMemory failed")
} }
log.Printf("%s[%s]: Memory changed", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Memory changed", obj.Kind(), obj.GetName())
} }
@@ -282,7 +282,7 @@ func (obj *VirtRes) attrCheckApply(apply bool) (bool, error) {
return false, nil return false, nil
} }
if err := dom.SetVcpus(obj.CPUs); err != nil { if err := dom.SetVcpus(obj.CPUs); err != nil {
return false, errors.Wrapf(err, "domain.SetVcpus failed") return false, errwrap.Wrapf(err, "domain.SetVcpus failed")
} }
log.Printf("%s[%s]: CPUs changed", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: CPUs changed", obj.Kind(), obj.GetName())
} }
@@ -373,13 +373,13 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
var c = true var c = true
dom, c, err = obj.domainCreate() // create the domain dom, c, err = obj.domainCreate() // create the domain
if err != nil { if err != nil {
return false, errors.Wrapf(err, "domainCreate failed") return false, errwrap.Wrapf(err, "domainCreate failed")
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
} else { } else {
return false, errors.Wrapf(err, "LookupDomainByName failed") return false, errwrap.Wrapf(err, "LookupDomainByName failed")
} }
defer dom.Free() defer dom.Free()
// domain exists // domain exists
@@ -387,17 +387,17 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
domInfo, err := dom.GetInfo() domInfo, err := dom.GetInfo()
if err != nil { if err != nil {
// we don't know if the state is ok // we don't know if the state is ok
return false, errors.Wrapf(err, "domain.GetInfo failed") return false, errwrap.Wrapf(err, "domain.GetInfo failed")
} }
isPersistent, err := dom.IsPersistent() isPersistent, err := dom.IsPersistent()
if err != nil { if err != nil {
// we don't know if the state is ok // we don't know if the state is ok
return false, errors.Wrapf(err, "domain.IsPersistent failed") return false, errwrap.Wrapf(err, "domain.IsPersistent failed")
} }
isActive, err := dom.IsActive() isActive, err := dom.IsActive()
if err != nil { if err != nil {
// we don't know if the state is ok // we don't know if the state is ok
return false, errors.Wrapf(err, "domain.IsActive failed") return false, errwrap.Wrapf(err, "domain.IsActive failed")
} }
// check for persistence // check for persistence
@@ -407,16 +407,16 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
} }
if isPersistent { if isPersistent {
if err := dom.Undefine(); err != nil { if err := dom.Undefine(); err != nil {
return false, errors.Wrapf(err, "domain.Undefine failed") return false, errwrap.Wrapf(err, "domain.Undefine failed")
} }
log.Printf("%s[%s]: Domain undefined", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Domain undefined", obj.Kind(), obj.GetName())
} else { } else {
domXML, err := dom.GetXMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE) domXML, err := dom.GetXMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
if err != nil { if err != nil {
return false, errors.Wrapf(err, "domain.GetXMLDesc failed") return false, errwrap.Wrapf(err, "domain.GetXMLDesc failed")
} }
if _, err = obj.conn.DomainDefineXML(domXML); err != nil { if _, err = obj.conn.DomainDefineXML(domXML); err != nil {
return false, errors.Wrapf(err, "conn.DomainDefineXML failed") return false, errwrap.Wrapf(err, "conn.DomainDefineXML failed")
} }
log.Printf("%s[%s]: Domain defined", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Domain defined", obj.Kind(), obj.GetName())
} }
@@ -439,14 +439,14 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
} }
if isActive { // domain must be paused ? if isActive { // domain must be paused ?
if err := dom.Resume(); err != nil { if err := dom.Resume(); err != nil {
return false, errors.Wrapf(err, "domain.Resume failed") return false, errwrap.Wrapf(err, "domain.Resume failed")
} }
checkOK = false checkOK = false
log.Printf("%s[%s]: Domain resumed", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Domain resumed", obj.Kind(), obj.GetName())
break break
} }
if err := dom.Create(); err != nil { if err := dom.Create(); err != nil {
return false, errors.Wrapf(err, "domain.Create failed") return false, errwrap.Wrapf(err, "domain.Create failed")
} }
checkOK = false checkOK = false
log.Printf("%s[%s]: Domain created", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Domain created", obj.Kind(), obj.GetName())
@@ -460,14 +460,14 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
} }
if isActive { // domain must be running ? if isActive { // domain must be running ?
if err := dom.Suspend(); err != nil { if err := dom.Suspend(); err != nil {
return false, errors.Wrapf(err, "domain.Suspend failed") return false, errwrap.Wrapf(err, "domain.Suspend failed")
} }
checkOK = false checkOK = false
log.Printf("%s[%s]: Domain paused", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Domain paused", obj.Kind(), obj.GetName())
break break
} }
if err := dom.CreateWithFlags(libvirt.VIR_DOMAIN_START_PAUSED); err != nil { if err := dom.CreateWithFlags(libvirt.VIR_DOMAIN_START_PAUSED); err != nil {
return false, errors.Wrapf(err, "domain.CreateWithFlags failed") return false, errwrap.Wrapf(err, "domain.CreateWithFlags failed")
} }
checkOK = false checkOK = false
log.Printf("%s[%s]: Domain created paused", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Domain created paused", obj.Kind(), obj.GetName())
@@ -481,7 +481,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
} }
if err := dom.Destroy(); err != nil { if err := dom.Destroy(); err != nil {
return false, errors.Wrapf(err, "domain.Destroy failed") return false, errwrap.Wrapf(err, "domain.Destroy failed")
} }
checkOK = false checkOK = false
log.Printf("%s[%s]: Domain destroyed", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Domain destroyed", obj.Kind(), obj.GetName())
@@ -495,7 +495,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
// mem & cpu checks... // mem & cpu checks...
if !obj.absent { if !obj.absent {
if c, err := obj.attrCheckApply(apply); err != nil { if c, err := obj.attrCheckApply(apply); err != nil {
return false, errors.Wrapf(err, "attrCheckApply failed") return false, errwrap.Wrapf(err, "attrCheckApply failed")
} else if !c { } else if !c {
checkOK = false checkOK = false
} }

View File

@@ -32,7 +32,7 @@
- iptables -F - iptables -F
- cd /vagrant/mgmt/ && make path - cd /vagrant/mgmt/ && make path
- cd /vagrant/mgmt/ && make deps && make build && cp mgmt ~/bin/ - cd /vagrant/mgmt/ && make deps && make build && cp mgmt ~/bin/
- cd && mgmt run --file /vagrant/mgmt/examples/pkg1.yaml --converged-timeout=5 - cd && mgmt run --yaml /vagrant/mgmt/examples/pkg1.yaml --converged-timeout=5
:namespace: omv :namespace: omv
:count: 0 :count: 0
:username: '' :username: ''

View File

@@ -33,7 +33,7 @@
- iptables -F - iptables -F
- cd /vagrant/mgmt/ && make path - cd /vagrant/mgmt/ && make path
- cd /vagrant/mgmt/ && make deps && make build && cp mgmt ~/bin/ - cd /vagrant/mgmt/ && make deps && make build && cp mgmt ~/bin/
- cd && mgmt run --file /vagrant/mgmt/examples/pkg1.yaml --converged-timeout=5 - cd && mgmt run --yaml /vagrant/mgmt/examples/pkg1.yaml --converged-timeout=5
:namespace: omv :namespace: omv
:count: 0 :count: 0
:username: '' :username: ''

View File

@@ -7,7 +7,7 @@ if env | grep -q -e '^TRAVIS=true$'; then
fi fi
# run till completion # run till completion
timeout --kill-after=15s 10s ./mgmt run --file t2.yaml --converged-timeout=5 --no-watch --tmp-prefix & timeout --kill-after=15s 10s ./mgmt run --yaml t2.yaml --converged-timeout=5 --no-watch --tmp-prefix &
pid=$! pid=$!
wait $pid # get exit status wait $pid # get exit status
e=$? e=$?

View File

@@ -10,11 +10,11 @@ fi
mkdir -p "${MGMT_TMPDIR}"mgmt{A..C} mkdir -p "${MGMT_TMPDIR}"mgmt{A..C}
# run till completion # run till completion
timeout --kill-after=15s 10s ./mgmt run --file t3-a.yaml --converged-timeout=5 --no-watch --tmp-prefix & timeout --kill-after=15s 10s ./mgmt run --yaml t3-a.yaml --converged-timeout=5 --no-watch --tmp-prefix &
pid1=$! pid1=$!
timeout --kill-after=15s 10s ./mgmt run --file t3-b.yaml --converged-timeout=5 --no-watch --tmp-prefix & timeout --kill-after=15s 10s ./mgmt run --yaml t3-b.yaml --converged-timeout=5 --no-watch --tmp-prefix &
pid2=$! pid2=$!
timeout --kill-after=15s 10s ./mgmt run --file t3-c.yaml --converged-timeout=5 --no-watch --tmp-prefix & timeout --kill-after=15s 10s ./mgmt run --yaml t3-c.yaml --converged-timeout=5 --no-watch --tmp-prefix &
pid3=$! pid3=$!
wait $pid1 # get exit status wait $pid1 # get exit status

View File

@@ -1,7 +1,7 @@
#!/bin/bash -e #!/bin/bash -e
# should take slightly more than 25s, but fail if we take 35s) # should take slightly more than 25s, but fail if we take 35s)
timeout --kill-after=35s 30s ./mgmt run --file t4.yaml --converged-timeout=5 --no-watch --tmp-prefix & timeout --kill-after=35s 30s ./mgmt run --yaml t4.yaml --converged-timeout=5 --no-watch --tmp-prefix &
pid=$! pid=$!
wait $pid # get exit status wait $pid # get exit status
exit $? exit $?

View File

@@ -1,7 +1,7 @@
#!/bin/bash -e #!/bin/bash -e
# should take slightly more than 35s, but fail if we take 45s) # should take slightly more than 35s, but fail if we take 45s)
timeout --kill-after=45s 40s ./mgmt run --file t5.yaml --converged-timeout=5 --no-watch --tmp-prefix & timeout --kill-after=45s 40s ./mgmt run --yaml t5.yaml --converged-timeout=5 --no-watch --tmp-prefix &
pid=$! pid=$!
wait $pid # get exit status wait $pid # get exit status
exit $? exit $?

View File

@@ -7,7 +7,7 @@ if env | grep -q -e '^TRAVIS=true$'; then
fi fi
# run till completion # run till completion
timeout --kill-after=20s 15s ./mgmt run --file t6.yaml --no-watch --tmp-prefix & timeout --kill-after=20s 15s ./mgmt run --yaml t6.yaml --no-watch --tmp-prefix &
pid=$! pid=$!
sleep 1s # let it converge sleep 1s # let it converge
test -e /tmp/mgmt/f1 test -e /tmp/mgmt/f1

120
yamlgraph/gapi.go Normal file
View File

@@ -0,0 +1,120 @@
// Mgmt
// Copyright (C) 2013-2016+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package yamlgraph
import (
"fmt"
"log"
"sync"
"github.com/purpleidea/mgmt/gapi"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/recwatch"
)
// GAPI implements the main yamlgraph GAPI interface.
type GAPI struct {
File *string // yaml graph definition to use; nil if undefined
data gapi.Data
initialized bool
closeChan chan struct{}
wg sync.WaitGroup // sync group for tunnel go routines
}
// NewGAPI creates a new yamlgraph GAPI struct and calls Init().
func NewGAPI(data gapi.Data, file *string) (*GAPI, error) {
obj := &GAPI{
File: file,
}
return obj, obj.Init(data)
}
// Init initializes the yamlgraph GAPI struct.
func (obj *GAPI) Init(data gapi.Data) error {
if obj.initialized {
return fmt.Errorf("Already initialized!")
}
if obj.File == nil {
return fmt.Errorf("The File param must be specified!")
}
obj.data = data // store for later
obj.closeChan = make(chan struct{})
obj.initialized = true
return nil
}
// Graph returns a current Graph.
func (obj *GAPI) Graph() (*pgraph.Graph, error) {
if !obj.initialized {
return nil, fmt.Errorf("yamlgraph: GAPI is not initialized")
}
config := ParseConfigFromFile(*obj.File)
if config == nil {
return nil, fmt.Errorf("yamlgraph: ParseConfigFromFile returned nil")
}
g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop)
return g, err
}
// SwitchStream returns nil errors every time there could be a new graph.
func (obj *GAPI) SwitchStream() chan error {
if obj.data.NoWatch {
return nil
}
ch := make(chan error)
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
defer close(ch) // this will run before the obj.wg.Done()
if !obj.initialized {
ch <- fmt.Errorf("yamlgraph: GAPI is not initialized")
return
}
configChan := recwatch.ConfigWatch(*obj.File)
for {
select {
case err, ok := <-configChan: // returns nil events on ok!
if !ok { // the channel closed!
return
}
log.Printf("yamlgraph: Generating new graph...")
ch <- err // trigger a run
if err != nil {
return
}
case <-obj.closeChan:
return
}
}
}()
return ch
}
// Close shuts down the yamlgraph GAPI.
func (obj *GAPI) Close() error {
if !obj.initialized {
return fmt.Errorf("yamlgraph: GAPI is not initialized")
}
close(obj.closeChan)
obj.wg.Wait()
obj.initialized = false // closed = true
return nil
}

View File

@@ -15,8 +15,8 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
// Package gconfig provides the facilities for loading a graph from a yaml file. // Package yamlgraph provides the facilities for loading a graph from a yaml file.
package gconfig package yamlgraph
import ( import (
"errors" "errors"
@@ -27,7 +27,6 @@ import (
"strings" "strings"
"github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/global"
"github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/resources" "github.com/purpleidea/mgmt/resources"
@@ -74,7 +73,6 @@ type GraphConfig struct {
Collector []collectorResConfig `yaml:"collect"` Collector []collectorResConfig `yaml:"collect"`
Edges []Edge `yaml:"edges"` Edges []Edge `yaml:"edges"`
Comment string `yaml:"comment"` Comment string `yaml:"comment"`
Hostname string `yaml:"hostname"` // uuid for the host
Remote string `yaml:"remote"` Remote string `yaml:"remote"`
} }
@@ -89,36 +87,13 @@ func (c *GraphConfig) Parse(data []byte) error {
return nil return nil
} }
// ParseConfigFromFile takes a filename and returns the graph config structure. // NewGraphFromConfig transforms a GraphConfig struct into a new graph.
func ParseConfigFromFile(filename string) *GraphConfig { // FIXME: remove any possibly left over, now obsolete graph diff code from here!
data, err := ioutil.ReadFile(filename) func (c *GraphConfig) NewGraphFromConfig(hostname string, embdEtcd *etcd.EmbdEtcd, noop bool) (*pgraph.Graph, error) {
if err != nil { // hostname is the uuid for the host
log.Printf("Config: Error: ParseConfigFromFile: File: %v", err)
return nil
}
var config GraphConfig var graph *pgraph.Graph // new graph to return
if err := config.Parse(data); err != nil { graph = pgraph.NewGraph("Graph") // give graph a default name
log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err)
return nil
}
return &config
}
// NewGraphFromConfig returns a new graph from existing input, such as from the
// existing graph, and a GraphConfig struct.
func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtcd, noop bool) (*pgraph.Graph, error) {
if c.Hostname == "" {
return nil, fmt.Errorf("Config: Error: Hostname can't be empty!")
}
var graph *pgraph.Graph // new graph to return
if g == nil { // FIXME: how can we check for an empty graph?
graph = pgraph.NewGraph("Graph") // give graph a default name
} else {
graph = g.Copy() // same vertices, since they're pointers!
}
var lookup = make(map[string]map[string]*pgraph.Vertex) var lookup = make(map[string]map[string]*pgraph.Vertex)
@@ -148,9 +123,9 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc
if !ok { if !ok {
return nil, fmt.Errorf("Config: Error: Can't convert: %v of type: %T to Res.", x, x) return nil, fmt.Errorf("Config: Error: Can't convert: %v of type: %T to Res.", x, x)
} }
if noop { //if noop { // now done in mgmtmain
res.Meta().Noop = noop // res.Meta().Noop = noop
} //}
if _, exists := lookup[kind]; !exists { if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]*pgraph.Vertex) lookup[kind] = make(map[string]*pgraph.Vertex)
} }
@@ -175,7 +150,7 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc
} }
} }
// store in etcd // store in etcd
if err := etcd.EtcdSetResources(embdEtcd, c.Hostname, resourceList); err != nil { if err := etcd.EtcdSetResources(embdEtcd, hostname, resourceList); err != nil {
return nil, fmt.Errorf("Config: Could not export resources: %v", err) return nil, fmt.Errorf("Config: Could not export resources: %v", err)
} }
@@ -217,9 +192,9 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc
matched = true matched = true
// collect resources but add the noop metaparam // collect resources but add the noop metaparam
if noop { //if noop { // now done in mgmtmain
res.Meta().Noop = noop // res.Meta().Noop = noop
} //}
if t.Pattern != "" { // XXX: simplistic for now if t.Pattern != "" { // XXX: simplistic for now
res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern
@@ -244,15 +219,6 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc
} }
} }
// get rid of any vertices we shouldn't "keep" (that aren't in new graph)
for _, v := range graph.GetVertices() {
if !pgraph.VertexContains(v, keep) {
// wait for exit before starting new graph!
v.SendEvent(event.EventExit, true, false)
graph.DeleteVertex(v)
}
}
for _, e := range c.Edges { for _, e := range c.Edges {
if _, ok := lookup[util.FirstToUpper(e.From.Kind)]; !ok { if _, ok := lookup[util.FirstToUpper(e.From.Kind)]; !ok {
return nil, fmt.Errorf("Can't find 'from' resource!") return nil, fmt.Errorf("Can't find 'from' resource!")
@@ -271,3 +237,20 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc
return graph, nil return graph, nil
} }
// ParseConfigFromFile takes a filename and returns the graph config structure.
func ParseConfigFromFile(filename string) *GraphConfig {
data, err := ioutil.ReadFile(filename)
if err != nil {
log.Printf("Config: Error: ParseConfigFromFile: File: %v", err)
return nil
}
var config GraphConfig
if err := config.Parse(data); err != nil {
log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err)
return nil
}
return &config
}