diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 6501d8a8..870ee679 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -70,7 +70,7 @@ Older videos and other material [is available](https://github.com/purpleidea/mgm ##Setup During this prototype phase, the tool can be run out of the source directory. -You'll probably want to use ```./run.sh run --file examples/graph1.yaml``` to +You'll probably want to use ```./run.sh run --yaml examples/graph1.yaml``` to get started. Beware that this _can_ cause data loss. Understand what you're doing first, or perform these actions in a virtual environment such as the one provided by [Oh-My-Vagrant](https://github.com/purpleidea/oh-my-vagrant). @@ -422,7 +422,7 @@ you can probably figure out most of it, as it's fairly intuitive. The main interface to the `mgmt` tool is the command line. For the most recent documentation, please run `mgmt --help`. -####`--file ` +####`--yaml ` Point to a graph file to run. ####`--converged-timeout ` diff --git a/README.md b/README.md index cda7dab6..1a7846a8 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ cd $GOPATH/src/github.com/purpleidea/mgmt ``` * Get the remaining golang deps with `go get ./...`, or run `make deps` if you're comfortable with how we install them. * Run `make build` to get a freshly built `mgmt` binary. -* Run `time ./mgmt run --file examples/graph0.yaml --converged-timeout=5 --tmp-prefix` to try out a very simple example! +* Run `time ./mgmt run --yaml examples/graph0.yaml --converged-timeout=5 --tmp-prefix` to try out a very simple example! * To run continuously in the default mode of operation, omit the `--converged-timeout` option. * Have fun hacking on our future technology! diff --git a/etcd/etcd.go b/etcd/etcd.go index 725919a1..9feab727 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -37,11 +37,11 @@ // // Smoke testing: // mkdir /tmp/mgmt{A..E} -// ./mgmt run --file examples/etcd1a.yaml --hostname h1 --tmp-prefix -// ./mgmt run --file examples/etcd1b.yaml --hostname h2 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382 -// ./mgmt run --file examples/etcd1c.yaml --hostname h3 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384 +// ./mgmt run --yaml examples/etcd1a.yaml --hostname h1 --tmp-prefix +// ./mgmt run --yaml examples/etcd1b.yaml --hostname h2 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382 +// ./mgmt run --yaml examples/etcd1c.yaml --hostname h3 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384 // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3 -// ./mgmt run --file examples/etcd1d.yaml --hostname h4 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386 +// ./mgmt run --yaml examples/etcd1d.yaml --hostname h4 --tmp-prefix --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386 // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5 // ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list diff --git a/examples/lib/libmgmt1.go b/examples/lib/libmgmt1.go index b5ac3244..0865720c 100644 --- a/examples/lib/libmgmt1.go +++ b/examples/lib/libmgmt1.go @@ -6,24 +6,66 @@ import ( "log" "os" "os/signal" + "sync" "syscall" "time" - "github.com/purpleidea/mgmt/gconfig" + "github.com/purpleidea/mgmt/gapi" mgmt "github.com/purpleidea/mgmt/mgmtmain" + "github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/resources" + "github.com/purpleidea/mgmt/yamlgraph" ) -func generateGraphConfig() *gconfig.GraphConfig { +// MyGAPI implements the main GAPI interface. +type MyGAPI struct { + Name string // graph name + Interval uint // refresh interval, 0 to never refresh + + data gapi.Data + initialized bool + closeChan chan struct{} + wg sync.WaitGroup // sync group for tunnel go routines +} + +// NewMyGAPI creates a new MyGAPI struct and calls Init(). +func NewMyGAPI(data gapi.Data, name string, interval uint) (*MyGAPI, error) { + obj := &MyGAPI{ + Name: name, + Interval: interval, + } + return obj, obj.Init(data) +} + +// Init initializes the MyGAPI struct. +func (obj *MyGAPI) Init(data gapi.Data) error { + if obj.initialized { + return fmt.Errorf("Already initialized!") + } + if obj.Name == "" { + return fmt.Errorf("The graph name must be specified!") + } + obj.data = data // store for later + obj.closeChan = make(chan struct{}) + obj.initialized = true + return nil +} + +// Graph returns a current Graph. +func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { + if !obj.initialized { + return nil, fmt.Errorf("libmgmt: MyGAPI is not initialized") + } n1, err := resources.NewNoopRes("noop1") if err != nil { - return nil // error + return nil, fmt.Errorf("Can't create resource: %v", err) } - gc := &gconfig.GraphConfig{ - Graph: "libmgmt", - Resources: gconfig.Resources{ // must redefine anonymous struct :( + // we can still build a graph via the yaml method + gc := &yamlgraph.GraphConfig{ + Graph: obj.Name, + Resources: yamlgraph.Resources{ // must redefine anonymous struct :( // in alphabetical order Exec: []*resources.ExecRes{}, File: []*resources.FileRes{}, @@ -37,38 +79,74 @@ func generateGraphConfig() *gconfig.GraphConfig { //Collector: []collectorResConfig{}, //Edges: []Edge{}, Comment: "comment!", - //Hostname: "???", - //Remote: "???", } - return gc + + g, err := gc.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop) + return g, err +} + +// SwitchStream returns nil errors every time there could be a new graph. +func (obj *MyGAPI) SwitchStream() chan error { + if obj.data.NoWatch || obj.Interval <= 0 { + return nil + } + ch := make(chan error) + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + defer close(ch) // this will run before the obj.wg.Done() + if !obj.initialized { + ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + return + } + + // arbitrarily change graph every interval seconds + ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + log.Printf("libmgmt: Generating new graph...") + ch <- nil // trigger a run + case <-obj.closeChan: + return + } + } + }() + return ch +} + +// Close shuts down the MyGAPI. +func (obj *MyGAPI) Close() error { + if !obj.initialized { + return fmt.Errorf("libmgmt: MyGAPI is not initialized") + } + close(obj.closeChan) + obj.wg.Wait() + obj.initialized = false // closed = true + return nil } // Run runs an embedded mgmt server. func Run() error { obj := &mgmt.Main{} - obj.Program = "mgmtlib" // TODO: set on compilation + obj.Program = "libmgmt" // TODO: set on compilation obj.Version = "0.0.1" // TODO: set on compilation obj.TmpPrefix = true obj.IdealClusterSize = -1 obj.ConvergedTimeout = -1 obj.Noop = true - obj.GAPI = generateGraphConfig // graph API function + obj.GAPI = &MyGAPI{ // graph API + Name: "libmgmt", // TODO: set on compilation + Interval: 15, // arbitrarily change graph every 15 seconds + } if err := obj.Init(); err != nil { return err } - go func() { - for { - log.Printf("Generating new graph...") - obj.Switch(generateGraphConfig) // pass in function to run... - - time.Sleep(15 * time.Second) // XXX: arbitrarily change graph every 30 seconds - } - }() - // install the exit signal handler exit := make(chan struct{}) defer close(exit) diff --git a/examples/lib/libmgmt2.go b/examples/lib/libmgmt2.go new file mode 100644 index 00000000..6a993dba --- /dev/null +++ b/examples/lib/libmgmt2.go @@ -0,0 +1,181 @@ +// libmgmt example +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/purpleidea/mgmt/gapi" + mgmt "github.com/purpleidea/mgmt/mgmtmain" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/resources" +) + +// MyGAPI implements the main GAPI interface. +type MyGAPI struct { + Name string // graph name + Count uint // number of resources to create + Interval uint // refresh interval, 0 to never refresh + + data gapi.Data + initialized bool + closeChan chan struct{} + wg sync.WaitGroup // sync group for tunnel go routines +} + +// NewMyGAPI creates a new MyGAPI struct and calls Init(). +func NewMyGAPI(data gapi.Data, name string, interval uint, count uint) (*MyGAPI, error) { + obj := &MyGAPI{ + Name: name, + Count: count, + Interval: interval, + } + return obj, obj.Init(data) +} + +// Init initializes the MyGAPI struct. +func (obj *MyGAPI) Init(data gapi.Data) error { + if obj.initialized { + return fmt.Errorf("Already initialized!") + } + if obj.Name == "" { + return fmt.Errorf("The graph name must be specified!") + } + obj.data = data // store for later + obj.closeChan = make(chan struct{}) + obj.initialized = true + return nil +} + +// Graph returns a current Graph. +func (obj *MyGAPI) Graph() (*pgraph.Graph, error) { + if !obj.initialized { + return nil, fmt.Errorf("libmgmt: MyGAPI is not initialized") + } + + g := pgraph.NewGraph(obj.Name) + var vertex *pgraph.Vertex + for i := uint(0); i < obj.Count; i++ { + n, err := resources.NewNoopRes(fmt.Sprintf("noop%d", i)) + if err != nil { + return nil, fmt.Errorf("Can't create resource: %v", err) + } + v := pgraph.NewVertex(n) + g.AddVertex(v) + if i > 0 { + g.AddEdge(vertex, v, pgraph.NewEdge(fmt.Sprintf("e%d", i))) + } + vertex = v // save + } + + //g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop) + return g, nil +} + +// SwitchStream returns nil errors every time there could be a new graph. +func (obj *MyGAPI) SwitchStream() chan error { + if obj.data.NoWatch || obj.Interval <= 0 { + return nil + } + ch := make(chan error) + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + defer close(ch) // this will run before the obj.wg.Done() + if !obj.initialized { + ch <- fmt.Errorf("libmgmt: MyGAPI is not initialized") + return + } + + // arbitrarily change graph every interval seconds + ticker := time.NewTicker(time.Duration(obj.Interval) * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + log.Printf("libmgmt: Generating new graph...") + ch <- nil // trigger a run + case <-obj.closeChan: + return + } + } + }() + return ch +} + +// Close shuts down the MyGAPI. +func (obj *MyGAPI) Close() error { + if !obj.initialized { + return fmt.Errorf("libmgmt: MyGAPI is not initialized") + } + close(obj.closeChan) + obj.wg.Wait() + obj.initialized = false // closed = true + return nil +} + +// Run runs an embedded mgmt server. +func Run() error { + + obj := &mgmt.Main{} + obj.Program = "libmgmt" // TODO: set on compilation + obj.Version = "0.0.1" // TODO: set on compilation + obj.TmpPrefix = true + obj.IdealClusterSize = -1 + obj.ConvergedTimeout = -1 + obj.Noop = true + + obj.GAPI = &MyGAPI{ // graph API + Name: "libmgmt", // TODO: set on compilation + Count: 60, // number of vertices to add + Interval: 15, // arbitrarily change graph every 15 seconds + } + + if err := obj.Init(); err != nil { + return err + } + + // install the exit signal handler + exit := make(chan struct{}) + defer close(exit) + go func() { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) // catch ^C + //signal.Notify(signals, os.Kill) // catch signals + signal.Notify(signals, syscall.SIGTERM) + + select { + case sig := <-signals: // any signal will do + if sig == os.Interrupt { + log.Println("Interrupted by ^C") + obj.Exit(nil) + return + } + log.Println("Interrupted by signal") + obj.Exit(fmt.Errorf("Killed by %v", sig)) + return + case <-exit: + return + } + }() + + if err := obj.Run(); err != nil { + return err + } + return nil +} + +func main() { + log.Printf("Hello!") + if err := Run(); err != nil { + fmt.Println(err) + os.Exit(1) + return + } + log.Printf("Goodbye!") +} diff --git a/gapi/gapi.go b/gapi/gapi.go new file mode 100644 index 00000000..67d48247 --- /dev/null +++ b/gapi/gapi.go @@ -0,0 +1,41 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Package gapi defines the interface that graph API generators must meet. +package gapi + +import ( + "github.com/purpleidea/mgmt/etcd" + "github.com/purpleidea/mgmt/pgraph" +) + +// Data is the set of input values passed into the GAPI structs via Init. +type Data struct { + Hostname string // uuid for the host, required for GAPI + EmbdEtcd *etcd.EmbdEtcd + Noop bool + NoWatch bool + // NOTE: we can add more fields here if needed by GAPI endpoints +} + +// GAPI is a Graph API that represents incoming graphs and change streams. +type GAPI interface { + Init(Data) error // initializes the GAPI and passes in useful data + Graph() (*pgraph.Graph, error) // returns the most recent pgraph + SwitchStream() chan error // returns a stream of switch events + Close() error // shutdown the GAPI +} diff --git a/mgmtmain/cli.go b/mgmtmain/cli.go index 08687aa0..eb90b462 100644 --- a/mgmtmain/cli.go +++ b/mgmtmain/cli.go @@ -24,6 +24,9 @@ import ( "os/signal" "syscall" + "github.com/purpleidea/mgmt/puppet" + "github.com/purpleidea/mgmt/yamlgraph" + "github.com/urfave/cli" ) @@ -35,24 +38,44 @@ func run(c *cli.Context) error { obj.Program = c.App.Name obj.Version = c.App.Version + if h := c.String("hostname"); c.IsSet("hostname") && h != "" { + obj.Hostname = &h + } + if s := c.String("prefix"); c.IsSet("prefix") && s != "" { obj.Prefix = &s } obj.TmpPrefix = c.Bool("tmp-prefix") obj.AllowTmpPrefix = c.Bool("allow-tmp-prefix") - if h := c.String("hostname"); c.IsSet("hostname") && h != "" { - obj.Hostname = &h + if _ = c.String("code"); c.IsSet("code") { + if obj.GAPI != nil { + return fmt.Errorf("Can't combine code GAPI with existing GAPI.") + } + // TODO: implement DSL GAPI + //obj.GAPI = &dsl.GAPI{ + // Code: &s, + //} + return fmt.Errorf("The Code GAPI is not implemented yet!") // TODO: DSL } - - if f := c.String("file"); c.IsSet("file") { - obj.File = &f + if y := c.String("yaml"); c.IsSet("yaml") { + if obj.GAPI != nil { + return fmt.Errorf("Can't combine YAML GAPI with existing GAPI.") + } + obj.GAPI = &yamlgraph.GAPI{ + File: &y, + } } if p := c.String("puppet"); c.IsSet("puppet") { - obj.Puppet = &p + if obj.GAPI != nil { + return fmt.Errorf("Can't combine puppet GAPI with existing GAPI.") + } + obj.GAPI = &puppet.GAPI{ + PuppetParam: &p, + PuppetConf: c.String("puppet-conf"), + } } - obj.PuppetConf = c.String("puppet-conf") - obj.Remotes = c.StringSlice("remote") + obj.Remotes = c.StringSlice("remote") // FIXME: GAPI-ify somehow? obj.NoWatch = c.Bool("no-watch") obj.Noop = c.Bool("noop") @@ -129,6 +152,13 @@ func CLI(program, version string) error { Usage: "run", Action: run, Flags: []cli.Flag{ + // useful for testing multiple instances on same machine + cli.StringFlag{ + Name: "hostname", + Value: "", + Usage: "hostname to use", + }, + cli.StringFlag{ Name: "prefix", Usage: "specify a path to the working prefix directory", @@ -143,23 +173,15 @@ func CLI(program, version string) error { Usage: "allow creation of a new temporary prefix if main prefix is unavailable", }, - // useful for testing multiple instances on same machine - cli.StringFlag{ - Name: "hostname", - Value: "", - Usage: "hostname to use", - }, - cli.StringFlag{ Name: "code, c", Value: "", Usage: "code definition to run", }, cli.StringFlag{ - Name: "file, f", - Value: "", - Usage: "graph definition to run", - EnvVar: "MGMT_FILE", + Name: "yaml", + Value: "", + Usage: "yaml graph definition to run", }, cli.StringFlag{ Name: "puppet, p", @@ -179,7 +201,7 @@ func CLI(program, version string) error { cli.BoolFlag{ Name: "no-watch", - Usage: "do not update graph on watched graph definition file changes", + Usage: "do not update graph on stream switch events", }, cli.BoolFlag{ Name: "noop", diff --git a/mgmtmain/main.go b/mgmtmain/main.go index 538916dd..3d253a4a 100644 --- a/mgmtmain/main.go +++ b/mgmtmain/main.go @@ -27,15 +27,16 @@ import ( "github.com/purpleidea/mgmt/converger" "github.com/purpleidea/mgmt/etcd" - "github.com/purpleidea/mgmt/gconfig" + "github.com/purpleidea/mgmt/gapi" "github.com/purpleidea/mgmt/pgraph" - "github.com/purpleidea/mgmt/puppet" "github.com/purpleidea/mgmt/recwatch" "github.com/purpleidea/mgmt/remote" "github.com/purpleidea/mgmt/util" etcdtypes "github.com/coreos/etcd/pkg/types" "github.com/coreos/pkg/capnslog" + multierr "github.com/hashicorp/go-multierror" + errwrap "github.com/pkg/errors" ) // Main is the main struct for running the mgmt logic. @@ -43,17 +44,14 @@ type Main struct { Program string // the name of this program, usually set at compile time Version string // the version of this program, usually set at compile time + Hostname *string // hostname to use; nil if undefined + Prefix *string // prefix passed in; nil if undefined TmpPrefix bool // request a pseudo-random, temporary prefix to be used AllowTmpPrefix bool // allow creation of a new temporary prefix if main prefix is unavailable - Hostname *string // hostname to use; nil if undefined - - File *string // graph file to run; nil if undefined - Puppet *string // puppet mode to run; nil if undefined - PuppetConf string // the path to an alternate puppet.conf file - GAPI func() *gconfig.GraphConfig // graph API; nil if undefined - Remotes []string // list of remote graph definitions to run + GAPI gapi.GAPI // graph API interface struct + Remotes []string // list of remote graph definitions to run NoWatch bool // do not update graph on watched graph definition file changes Noop bool // globally force all resources into no-op mode @@ -82,8 +80,7 @@ type Main struct { serverURLs etcdtypes.URLs // processed server urls value idealClusterSize uint16 // processed ideal cluster size value - exit chan error // exit signal - switchChan chan func() *gconfig.GraphConfig // graph switches + exit chan error // exit signal } // Init initializes the main struct after it performs some validation. @@ -97,10 +94,6 @@ func (obj *Main) Init() error { return fmt.Errorf("Choosing a prefix and the request for a tmp prefix is illogical!") } - if obj.File != nil && obj.Puppet != nil { - return fmt.Errorf("The File and Puppet parameters cannot be used together!") - } - obj.idealClusterSize = uint16(obj.IdealClusterSize) if obj.IdealClusterSize < 0 { // value is undefined, set to the default obj.idealClusterSize = etcd.DefaultIdealClusterSize @@ -152,7 +145,6 @@ func (obj *Main) Init() error { } obj.exit = make(chan error) - obj.switchChan = make(chan func() *gconfig.GraphConfig) return nil } @@ -161,14 +153,6 @@ func (obj *Main) Exit(err error) { obj.exit <- err // trigger an exit! } -// Switch causes mgmt try to switch the currently running graph to a new one. -// The function passed in will usually be called immediately, but it can also -// happen after a delay, and more often than this Switch function is called! -func (obj *Main) Switch(f func() *gconfig.GraphConfig) { - obj.switchChan <- f - // TODO: should we get an ACK() and pass back a return value ? -} - // Run is the main execution entrypoint to run mgmt. func (obj *Main) Run() error { @@ -192,19 +176,15 @@ func (obj *Main) Run() error { log.Printf("This is: %s, version: %s", obj.Program, obj.Version) log.Printf("Main: Start: %v", start) - var hostname, _ = os.Hostname() - // allow passing in the hostname, instead of using --hostname - if obj.File != nil { - if config := gconfig.ParseConfigFromFile(*obj.File); config != nil { - if h := config.Hostname; h != "" { - hostname = h - } - } + hostname, err := os.Hostname() // a sensible default + // allow passing in the hostname, instead of using the system setting + if h := obj.Hostname; h != nil && *h != "" { // override by cli + hostname = *h + } else if err != nil { + return errwrap.Wrapf(err, "Can't get default hostname!") } - if obj.Hostname != nil { // override by cli - if h := obj.Hostname; *h != "" { - hostname = *h - } + if hostname == "" { // safety check + return fmt.Errorf("Hostname cannot be empty!") } var prefix = fmt.Sprintf("/var/lib/%s/", obj.Program) // default prefix @@ -215,7 +195,7 @@ func (obj *Main) Run() error { if obj.TmpPrefix || os.MkdirAll(prefix, 0770) != nil { if obj.TmpPrefix || obj.AllowTmpPrefix { var err error - if prefix, err = ioutil.TempDir("", obj.Program+"-"); err != nil { + if prefix, err = ioutil.TempDir("", obj.Program+"-"+hostname+"-"); err != nil { return fmt.Errorf("Main: Error: Can't create temporary prefix!") } log.Println("Main: Warning: Working prefix directory is temporary!") @@ -227,7 +207,7 @@ func (obj *Main) Run() error { log.Printf("Main: Working prefix is: %s", prefix) var wg sync.WaitGroup - var G, fullGraph *pgraph.Graph + var G, oldGraph *pgraph.Graph // exit after `max-runtime` seconds for no reason at all... if i := obj.MaxRuntime; i > 0 { @@ -285,19 +265,26 @@ func (obj *Main) Run() error { converger.SetStateFn(convergerStateFn) } + var gapiChan chan error // stream events are nil errors + if obj.GAPI != nil { + data := gapi.Data{ + Hostname: hostname, + EmbdEtcd: EmbdEtcd, + Noop: obj.Noop, + NoWatch: obj.NoWatch, + } + if err := obj.GAPI.Init(data); err != nil { + obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err)) + } else if !obj.NoWatch { + gapiChan = obj.GAPI.SwitchStream() // stream of graph switch events! + } + } + exitchan := make(chan struct{}) // exit on close go func() { startchan := make(chan struct{}) // start signal go func() { startchan <- struct{}{} }() - var configChan chan error - var puppetChan <-chan time.Time - var customFunc = obj.GAPI // default - if !obj.NoWatch && obj.File != nil { - configChan = recwatch.ConfigWatch(*obj.File) - } else if obj.Puppet != nil { - interval := puppet.PuppetInterval(obj.PuppetConf) - puppetChan = time.Tick(time.Duration(interval) * time.Second) - } + log.Println("Etcd: Starting...") etcdchan := etcd.EtcdWatch(EmbdEtcd) first := true // first loop or not @@ -313,60 +300,42 @@ func (obj *Main) Run() error { } // everything else passes through to cause a compile! - case customFunc = <-obj.switchChan: - // handle a graph switch with a new custom function - obj.GAPI = customFunc - - case <-puppetChan: - // nothing, just go on - - case e := <-configChan: - if obj.NoWatch { - continue // not ready to read config + case err, ok := <-gapiChan: + if !ok { // channel closed + continue } - if e != nil { - obj.Exit(e) // trigger exit + if err != nil { + obj.Exit(err) // trigger exit continue //return // TODO: return or wait for exitchan? } - // XXX: case compile_event: ... - // ... + if obj.NoWatch { // extra safety for bad GAPI's + log.Printf("Main: GAPI stream should be quiet with NoWatch!") // fix the GAPI! + continue // no stream events should be sent + } + case <-exitchan: return } - var config *gconfig.GraphConfig - if obj.File != nil { - config = gconfig.ParseConfigFromFile(*obj.File) - } else if obj.Puppet != nil { - config = puppet.ParseConfigFromPuppet(*obj.Puppet, obj.PuppetConf) - } else if obj.GAPI != nil { - config = obj.GAPI() - } - - if config == nil { - log.Printf("Config: Parse failure") + if obj.GAPI == nil { + log.Printf("Config: GAPI is empty!") continue } - if config.Hostname != "" && config.Hostname != hostname { - log.Printf("Config: Hostname changed, ignoring config!") - continue - } - config.Hostname = hostname // set it in case it was "" - + // we need the vertices to be paused to work on them, so // run graph vertex LOCK... if !first { // TODO: we can flatten this check out I think converger.Pause() // FIXME: add sync wait? G.Pause() // sync + + //G.UnGroup() // FIXME: implement me if needed! } - // build graph from config struct on events, eg: etcd... - // we need the vertices to be paused to work on them - if newFullgraph, err := config.NewGraphFromConfig(fullGraph, EmbdEtcd, obj.Noop); err == nil { // keep references to all original elements - fullGraph = newFullgraph - } else { - log.Printf("Config: Error making new graph from config: %v", err) + // make the graph from yaml, lib, puppet->yaml, or dsl! + newGraph, err := obj.GAPI.Graph() // generate graph! + if err != nil { + log.Printf("Config: Error creating new graph: %v", err) // unpause! if !first { G.Start(&wg, first) // sync @@ -375,18 +344,40 @@ func (obj *Main) Run() error { continue } - G = fullGraph.Copy() // copy to active graph - // XXX: do etcd transaction out here... + // apply the global noop parameter if requested + if obj.Noop { + for _, m := range newGraph.GraphMetas() { + m.Noop = obj.Noop + } + } + + // FIXME: make sure we "UnGroup()" any semi-destructive + // changes to the resources so our efficient GraphSync + // will be able to re-use and cmp to the old graph. + newFullGraph, err := newGraph.GraphSync(oldGraph) + if err != nil { + log.Printf("Config: Error running graph sync: %v", err) + // unpause! + if !first { + G.Start(&wg, first) // sync + converger.Start() // after G.Start() + } + continue + } + oldGraph = newFullGraph // save old graph + G = oldGraph.Copy() // copy to active graph + G.AutoEdges() // add autoedges; modifies the graph G.AutoGroup() // run autogroup; modifies the graph // TODO: do we want to do a transitive reduction? log.Printf("Graph: %v", G) // show graph - err := G.ExecGraphviz(obj.GraphvizFilter, obj.Graphviz) - if err != nil { - log.Printf("Graphviz: %v", err) - } else { - log.Printf("Graphviz: Successfully generated graph!") + if obj.GraphvizFilter != "" { + if err := G.ExecGraphviz(obj.GraphvizFilter, obj.Graphviz); err != nil { + log.Printf("Graphviz: %v", err) + } else { + log.Printf("Graphviz: Successfully generated graph!") + } } G.AssociateData(converger) // G.Start(...) needs to be synchronous or wait, @@ -444,17 +435,27 @@ func (obj *Main) Run() error { // wait for etcd to be running before we remote in, which we do above! go remotes.Run() - if obj.File == nil && obj.Puppet == nil && obj.GAPI == nil { + if obj.GAPI == nil { converger.Start() // better start this for empty graphs } log.Println("Main: Running...") - err := <-obj.exit // wait for exit signal + reterr := <-obj.exit // wait for exit signal log.Println("Destroy...") - configWatcher.Close() // stop sending file changes to remotes - remotes.Exit() // tell all the remote connections to shutdown; waits! + if obj.GAPI != nil { + if err := obj.GAPI.Close(); err != nil { + err = errwrap.Wrapf(err, "GAPI closed poorly!") + reterr = multierr.Append(reterr, err) // list of errors + } + } + + configWatcher.Close() // stop sending file changes to remotes + if err := remotes.Exit(); err != nil { // tell all the remote connections to shutdown; waits! + err = errwrap.Wrapf(err, "Remote exited poorly!") + reterr = multierr.Append(reterr, err) // list of errors + } G.Exit() // tell all the children to exit @@ -463,7 +464,8 @@ func (obj *Main) Run() error { // cleanup etcd main loop last so it can process everything first if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd - log.Printf("Etcd exited poorly with: %v", err) + err = errwrap.Wrapf(err, "Etcd exited poorly!") + reterr = multierr.Append(reterr, err) // list of errors } if obj.DEBUG { @@ -474,5 +476,5 @@ func (obj *Main) Run() error { // TODO: wait for each vertex to exit... log.Println("Goodbye!") - return err + return reterr } diff --git a/pgraph/autogroup.go b/pgraph/autogroup.go index 6ae8f3a7..637a494d 100644 --- a/pgraph/autogroup.go +++ b/pgraph/autogroup.go @@ -22,6 +22,8 @@ import ( "log" "github.com/purpleidea/mgmt/global" + + errwrap "github.com/pkg/errors" ) // AutoGrouper is the required interface to implement for an autogroup algorithm @@ -283,8 +285,8 @@ func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) g.DeleteVertex(v2) // remove grouped vertex // 5) creation of a cyclic graph should throw an error - if _, dag := g.TopologicalSort(); !dag { // am i a dag or not? - return fmt.Errorf("Graph is not a dag!") + if _, err := g.TopologicalSort(); err != nil { // am i a dag or not? + return errwrap.Wrapf(err, "TopologicalSort failed") // not a dag } return nil // success } diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index ef888bb0..5c0a1066 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -19,7 +19,6 @@ package pgraph import ( - "errors" "fmt" "io/ioutil" "log" @@ -36,6 +35,8 @@ import ( "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/resources" + + errwrap "github.com/pkg/errors" ) //go:generate stringer -type=graphState -output=graphstate_stringer.go @@ -163,6 +164,18 @@ func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { g.Adjacency[v1][v2] = e } +// DeleteEdge deletes a particular edge from the graph. +// FIXME: add test cases +func (g *Graph) DeleteEdge(e *Edge) { + for v1 := range g.Adjacency { + for v2, edge := range g.Adjacency[v1] { + if e == edge { + delete(g.Adjacency[v1], v2) + } + } + } +} + // GetVertexMatch searches for an equivalent resource in the graph and returns // the vertex it is found in, or nil if not found. func (g *Graph) GetVertexMatch(obj resources.Res) *Vertex { @@ -285,11 +298,11 @@ func (g *Graph) ExecGraphviz(program, filename string) error { switch program { case "dot", "neato", "twopi", "circo", "fdp": default: - return errors.New("Invalid graphviz program selected!") + return fmt.Errorf("Invalid graphviz program selected!") } if filename == "" { - return errors.New("No filename given!") + return fmt.Errorf("No filename given!") } // run as a normal user if possible when run with sudo @@ -298,18 +311,18 @@ func (g *Graph) ExecGraphviz(program, filename string) error { err := ioutil.WriteFile(filename, []byte(g.Graphviz()), 0644) if err != nil { - return errors.New("Error writing to filename!") + return fmt.Errorf("Error writing to filename!") } if err1 == nil && err2 == nil { if err := os.Chown(filename, uid, gid); err != nil { - return errors.New("Error changing file owner!") + return fmt.Errorf("Error changing file owner!") } } path, err := exec.LookPath(program) if err != nil { - return errors.New("Graphviz is missing!") + return fmt.Errorf("Graphviz is missing!") } out := fmt.Sprintf("%v.png", filename) @@ -324,7 +337,7 @@ func (g *Graph) ExecGraphviz(program, filename string) error { } _, err = cmd.Output() if err != nil { - return errors.New("Error writing to image!") + return fmt.Errorf("Error writing to image!") } return nil } @@ -468,7 +481,7 @@ func (g *Graph) OutDegree() map[*Vertex]int { // TopologicalSort returns the sort of graph vertices in that order. // based on descriptions and code from wikipedia and rosetta code // TODO: add memoization, and cache invalidation to speed this up :) -func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algorithm +func (g *Graph) TopologicalSort() ([]*Vertex, error) { // kahn's algorithm var L []*Vertex // empty list that will contain the sorted elements var S []*Vertex // set of all nodes with no incoming edges remaining := make(map[*Vertex]int) // amount of edges remaining @@ -505,13 +518,13 @@ func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algori if in > 0 { for n := range g.Adjacency[c] { if remaining[n] > 0 { - return nil, false // not a dag! + return nil, fmt.Errorf("Not a dag!") } } } } - return L, true + return L, nil } // Reachability finds the shortest path in a DAG from a to b, and returns the @@ -939,6 +952,94 @@ func (g *Graph) Exit() { } } +// GraphSync updates the oldGraph so that it matches the newGraph receiver. It +// leaves identical elements alone so that they don't need to be refreshed. +// FIXME: add test cases +func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) { + + if oldGraph == nil { + oldGraph = NewGraph(g.GetName()) // copy over the name + } + oldGraph.SetName(g.GetName()) // overwrite the name + + var lookup = make(map[*Vertex]*Vertex) + var vertexKeep []*Vertex // list of vertices which are the same in new graph + var edgeKeep []*Edge // list of vertices which are the same in new graph + + for v := range g.Adjacency { // loop through the vertices (resources) + res := v.Res // resource + + vertex := oldGraph.GetVertexMatch(res) + if vertex == nil { // no match found + if err := res.Init(); err != nil { + return nil, errwrap.Wrapf(err, "could not Init() resource") + } + vertex = NewVertex(res) + oldGraph.AddVertex(vertex) // call standalone in case not part of an edge + } + lookup[v] = vertex // used for constructing edges + vertexKeep = append(vertexKeep, vertex) // append + } + + // get rid of any vertices we shouldn't keep (that aren't in new graph) + for v := range oldGraph.Adjacency { + if !VertexContains(v, vertexKeep) { + // wait for exit before starting new graph! + v.SendEvent(event.EventExit, true, false) + oldGraph.DeleteVertex(v) + } + } + + // compare edges + for v1 := range g.Adjacency { // loop through the vertices (resources) + for v2, e := range g.Adjacency[v1] { + // we have an edge! + + // lookup vertices (these should exist now) + //res1 := v1.Res // resource + //res2 := v2.Res + //vertex1 := oldGraph.GetVertexMatch(res1) + //vertex2 := oldGraph.GetVertexMatch(res2) + vertex1, exists1 := lookup[v1] + vertex2, exists2 := lookup[v2] + if !exists1 || !exists2 { // no match found, bug? + //if vertex1 == nil || vertex2 == nil { // no match found + return nil, fmt.Errorf("New vertices weren't found!") // programming error + } + + edge, exists := oldGraph.Adjacency[vertex1][vertex2] + if !exists || edge.Name != e.Name { // TODO: edgeCmp + edge = e // use or overwrite edge + } + oldGraph.Adjacency[vertex1][vertex2] = edge // store it (AddEdge) + edgeKeep = append(edgeKeep, edge) // mark as saved + } + } + + // delete unused edges + for v1 := range oldGraph.Adjacency { + for _, e := range oldGraph.Adjacency[v1] { + // we have an edge! + if !EdgeContains(e, edgeKeep) { + oldGraph.DeleteEdge(e) + } + } + } + + return oldGraph, nil +} + +// GraphMetas returns a list of pointers to each of the resource MetaParams. +func (g *Graph) GraphMetas() []*resources.MetaParams { + metas := []*resources.MetaParams{} + for v := range g.Adjacency { // loop through the vertices (resources)) + res := v.Res // resource + meta := res.Meta() + metas = append(metas, meta) + } + return metas +} + // AssociateData associates some data with the object in the graph in question func (g *Graph) AssociateData(converger converger.Converger) { for v := range g.GetVerticesChan() { @@ -956,6 +1057,16 @@ func VertexContains(needle *Vertex, haystack []*Vertex) bool { return false } +// EdgeContains is an "in array" function to test for an edge in a slice of edges. +func EdgeContains(needle *Edge, haystack []*Edge) bool { + for _, v := range haystack { + if needle == v { + return true + } + } + return false +} + // Reverse reverses a list of vertices. func Reverse(vs []*Vertex) []*Vertex { //var out []*Vertex // XXX: golint suggests, but it fails testing diff --git a/pgraph/pgraph_test.go b/pgraph/pgraph_test.go index cad42aed..4515c33e 100644 --- a/pgraph/pgraph_test.go +++ b/pgraph/pgraph_test.go @@ -352,11 +352,11 @@ func TestPgraphT9(t *testing.T) { t.Errorf("Outdegree of v6 should be 0 instead of: %d.", i) } - s, ok := G.TopologicalSort() + s, err := G.TopologicalSort() // either possibility is a valid toposort match := reflect.DeepEqual(s, []*Vertex{v1, v2, v3, v4, v5, v6}) || reflect.DeepEqual(s, []*Vertex{v1, v3, v2, v4, v5, v6}) - if !ok || !match { - t.Errorf("Topological sort failed, status: %v.", ok) + if err != nil || !match { + t.Errorf("Topological sort failed, error: %v.", err) str := "Found:" for _, v := range s { str += " " + v.Res.GetName() @@ -387,8 +387,8 @@ func TestPgraphT10(t *testing.T) { G.AddEdge(v5, v6, e5) G.AddEdge(v4, v2, e6) // cycle - if _, ok := G.TopologicalSort(); ok { - t.Errorf("Topological sort passed, but graph is cyclic.") + if _, err := G.TopologicalSort(); err == nil { + t.Errorf("Topological sort passed, but graph is cyclic!") } } diff --git a/puppet/gapi.go b/puppet/gapi.go new file mode 100644 index 00000000..4faeb0a6 --- /dev/null +++ b/puppet/gapi.go @@ -0,0 +1,121 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package puppet + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/purpleidea/mgmt/gapi" + "github.com/purpleidea/mgmt/pgraph" +) + +// GAPI implements the main puppet GAPI interface. +type GAPI struct { + PuppetParam *string // puppet mode to run; nil if undefined + PuppetConf string // the path to an alternate puppet.conf file + + data gapi.Data + initialized bool + closeChan chan struct{} + wg sync.WaitGroup // sync group for tunnel go routines +} + +// NewGAPI creates a new puppet GAPI struct and calls Init(). +func NewGAPI(data gapi.Data, puppetParam *string, puppetConf string) (*GAPI, error) { + obj := &GAPI{ + PuppetParam: puppetParam, + PuppetConf: puppetConf, + } + return obj, obj.Init(data) +} + +// Init initializes the puppet GAPI struct. +func (obj *GAPI) Init(data gapi.Data) error { + if obj.initialized { + return fmt.Errorf("Already initialized!") + } + if obj.PuppetParam == nil { + return fmt.Errorf("The PuppetParam param must be specified!") + } + obj.data = data // store for later + obj.closeChan = make(chan struct{}) + obj.initialized = true + return nil +} + +// Graph returns a current Graph. +func (obj *GAPI) Graph() (*pgraph.Graph, error) { + if !obj.initialized { + return nil, fmt.Errorf("Puppet: GAPI is not initialized!") + } + config := ParseConfigFromPuppet(*obj.PuppetParam, obj.PuppetConf) + if config == nil { + return nil, fmt.Errorf("Puppet: ParseConfigFromPuppet returned nil!") + } + g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop) + return g, err +} + +// SwitchStream returns nil errors every time there could be a new graph. +func (obj *GAPI) SwitchStream() chan error { + if obj.data.NoWatch { + return nil + } + puppetChan := func() <-chan time.Time { // helper function + return time.Tick(time.Duration(PuppetInterval(obj.PuppetConf)) * time.Second) + } + ch := make(chan error) + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + defer close(ch) // this will run before the obj.wg.Done() + if !obj.initialized { + ch <- fmt.Errorf("Puppet: GAPI is not initialized!") + return + } + pChan := puppetChan() + for { + select { + case _, ok := <-pChan: + if !ok { // the channel closed! + return + } + log.Printf("Puppet: Generating new graph...") + pChan = puppetChan() // TODO: okay to update interval in case it changed? + ch <- nil // trigger a run + case <-obj.closeChan: + return + } + } + }() + return ch +} + +// Close shuts down the Puppet GAPI. +func (obj *GAPI) Close() error { + if !obj.initialized { + return fmt.Errorf("Puppet: GAPI is not initialized!") + } + close(obj.closeChan) + obj.wg.Wait() + obj.initialized = false // closed = true + return nil +} diff --git a/puppet/puppet.go b/puppet/puppet.go index 168e4e85..4e9974ed 100644 --- a/puppet/puppet.go +++ b/puppet/puppet.go @@ -26,8 +26,8 @@ import ( "strconv" "strings" - "github.com/purpleidea/mgmt/gconfig" "github.com/purpleidea/mgmt/global" + "github.com/purpleidea/mgmt/yamlgraph" ) const ( @@ -87,7 +87,7 @@ func runPuppetCommand(cmd *exec.Cmd) ([]byte, error) { // ParseConfigFromPuppet takes a special puppet param string and config and // returns the graph configuration structure. -func ParseConfigFromPuppet(puppetParam, puppetConf string) *gconfig.GraphConfig { +func ParseConfigFromPuppet(puppetParam, puppetConf string) *yamlgraph.GraphConfig { var puppetConfArg string if puppetConf != "" { puppetConfArg = "--config=" + puppetConf @@ -104,7 +104,7 @@ func ParseConfigFromPuppet(puppetParam, puppetConf string) *gconfig.GraphConfig log.Println("Puppet: launching translator") - var config gconfig.GraphConfig + var config yamlgraph.GraphConfig if data, err := runPuppetCommand(cmd); err != nil { return nil } else if err := config.Parse(data); err != nil { diff --git a/remote/remote.go b/remote/remote.go index 4994006a..779d567a 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -62,12 +62,14 @@ import ( "time" cv "github.com/purpleidea/mgmt/converger" - "github.com/purpleidea/mgmt/gconfig" "github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/util" + "github.com/purpleidea/mgmt/yamlgraph" + multierr "github.com/hashicorp/go-multierror" "github.com/howeyc/gopass" "github.com/kardianos/osext" + errwrap "github.com/pkg/errors" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" ) @@ -491,7 +493,7 @@ func (obj *SSH) Exec() error { // TODO: do something less arbitrary about which one we pick? url := cleanURL(obj.remoteURLs[0]) // arbitrarily pick the first one seeds := fmt.Sprintf("--no-server --seeds 'http://%s'", url) // XXX: escape untrusted input? (or check if url is valid) - file := fmt.Sprintf("--file '%s'", obj.filepath) // XXX: escape untrusted input! (or check if file path exists) + file := fmt.Sprintf("--yaml '%s'", obj.filepath) // XXX: escape untrusted input! (or check if file path exists) depth := fmt.Sprintf("--depth %d", obj.depth+1) // child is +1 distance args := []string{hostname, seeds, file, depth} if obj.noop { @@ -734,7 +736,7 @@ func NewRemotes(clientURLs, remoteURLs []string, noop bool, remotes []string, fi // It takes as input the path to a graph definition file. func (obj *Remotes) NewSSH(file string) (*SSH, error) { // first do the parsing... - config := gconfig.ParseConfigFromFile(file) + config := yamlgraph.ParseConfigFromFile(file) // FIXME: GAPI-ify somehow? if config == nil { return nil, fmt.Errorf("Remote: Error parsing remote graph: %s", file) } @@ -791,7 +793,8 @@ func (obj *Remotes) NewSSH(file string) (*SSH, error) { return nil, fmt.Errorf("No authentication methods available!") } - hostname := config.Hostname + //hostname := config.Hostname // TODO: optionally specify local hostname somehow + hostname := "" if hostname == "" { hostname = host // default to above } @@ -1017,11 +1020,12 @@ func (obj *Remotes) Run() { // Exit causes as much of the Remotes struct to shutdown as quickly and as // cleanly as possible. It only returns once everything is shutdown. -func (obj *Remotes) Exit() { +func (obj *Remotes) Exit() error { obj.lock.Lock() obj.exiting = true // don't spawn new ones once this flag is set! obj.lock.Unlock() close(obj.exitChan) + var reterr error for _, f := range obj.remotes { sshobj, exists := obj.sshmap[f] if !exists || sshobj == nil { @@ -1030,7 +1034,8 @@ func (obj *Remotes) Exit() { // TODO: should we run these as go routines? if err := sshobj.Stop(); err != nil { - log.Printf("Remote: Error stopping: %s", err) + err = errwrap.Wrapf(err, "Remote: Error stopping!") + reterr = multierr.Append(reterr, err) // list of errors } } @@ -1040,6 +1045,7 @@ func (obj *Remotes) Exit() { defer obj.cuid.Unregister() obj.wg.Wait() // wait for everyone to exit + return reterr } // fmtUID makes a random string of length n, it is not cryptographically safe. diff --git a/resources/virt.go b/resources/virt.go index d8330a4f..73464c65 100644 --- a/resources/virt.go +++ b/resources/virt.go @@ -27,7 +27,7 @@ import ( "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/global" - "github.com/pkg/errors" + errwrap "github.com/pkg/errors" "github.com/rgbkrk/libvirt-go" ) @@ -78,7 +78,7 @@ func NewVirtRes(name string, uri, state string, transient bool, cpus uint16, mem func (obj *VirtRes) Init() error { if !libvirtInitialized { if err := libvirt.EventRegisterDefaultImpl(); err != nil { - return errors.Wrapf(err, "EventRegisterDefaultImpl failed") + return errwrap.Wrapf(err, "EventRegisterDefaultImpl failed") } libvirtInitialized = true } @@ -139,7 +139,7 @@ func (obj *VirtRes) Watch(processChan chan event.Event) error { } //log.Printf("EventRunDefaultImpl started!") if err := libvirt.EventRunDefaultImpl(); err != nil { - errorChan <- errors.Wrapf(err, "EventRunDefaultImpl failed") + errorChan <- errwrap.Wrapf(err, "EventRunDefaultImpl failed") return } //log.Printf("EventRunDefaultImpl looped!") @@ -254,13 +254,13 @@ func (obj *VirtRes) attrCheckApply(apply bool) (bool, error) { dom, err := obj.conn.LookupDomainByName(obj.GetName()) if err != nil { - return false, errors.Wrapf(err, "conn.LookupDomainByName failed") + return false, errwrap.Wrapf(err, "conn.LookupDomainByName failed") } domInfo, err := dom.GetInfo() if err != nil { // we don't know if the state is ok - return false, errors.Wrapf(err, "domain.GetInfo failed") + return false, errwrap.Wrapf(err, "domain.GetInfo failed") } // check memory @@ -270,7 +270,7 @@ func (obj *VirtRes) attrCheckApply(apply bool) (bool, error) { return false, nil } if err := dom.SetMemory(obj.Memory); err != nil { - return false, errors.Wrapf(err, "domain.SetMemory failed") + return false, errwrap.Wrapf(err, "domain.SetMemory failed") } log.Printf("%s[%s]: Memory changed", obj.Kind(), obj.GetName()) } @@ -282,7 +282,7 @@ func (obj *VirtRes) attrCheckApply(apply bool) (bool, error) { return false, nil } if err := dom.SetVcpus(obj.CPUs); err != nil { - return false, errors.Wrapf(err, "domain.SetVcpus failed") + return false, errwrap.Wrapf(err, "domain.SetVcpus failed") } log.Printf("%s[%s]: CPUs changed", obj.Kind(), obj.GetName()) } @@ -373,13 +373,13 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { var c = true dom, c, err = obj.domainCreate() // create the domain if err != nil { - return false, errors.Wrapf(err, "domainCreate failed") + return false, errwrap.Wrapf(err, "domainCreate failed") } else if !c { checkOK = false } } else { - return false, errors.Wrapf(err, "LookupDomainByName failed") + return false, errwrap.Wrapf(err, "LookupDomainByName failed") } defer dom.Free() // domain exists @@ -387,17 +387,17 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { domInfo, err := dom.GetInfo() if err != nil { // we don't know if the state is ok - return false, errors.Wrapf(err, "domain.GetInfo failed") + return false, errwrap.Wrapf(err, "domain.GetInfo failed") } isPersistent, err := dom.IsPersistent() if err != nil { // we don't know if the state is ok - return false, errors.Wrapf(err, "domain.IsPersistent failed") + return false, errwrap.Wrapf(err, "domain.IsPersistent failed") } isActive, err := dom.IsActive() if err != nil { // we don't know if the state is ok - return false, errors.Wrapf(err, "domain.IsActive failed") + return false, errwrap.Wrapf(err, "domain.IsActive failed") } // check for persistence @@ -407,16 +407,16 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { } if isPersistent { if err := dom.Undefine(); err != nil { - return false, errors.Wrapf(err, "domain.Undefine failed") + return false, errwrap.Wrapf(err, "domain.Undefine failed") } log.Printf("%s[%s]: Domain undefined", obj.Kind(), obj.GetName()) } else { domXML, err := dom.GetXMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE) if err != nil { - return false, errors.Wrapf(err, "domain.GetXMLDesc failed") + return false, errwrap.Wrapf(err, "domain.GetXMLDesc failed") } if _, err = obj.conn.DomainDefineXML(domXML); err != nil { - return false, errors.Wrapf(err, "conn.DomainDefineXML failed") + return false, errwrap.Wrapf(err, "conn.DomainDefineXML failed") } log.Printf("%s[%s]: Domain defined", obj.Kind(), obj.GetName()) } @@ -439,14 +439,14 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { } if isActive { // domain must be paused ? if err := dom.Resume(); err != nil { - return false, errors.Wrapf(err, "domain.Resume failed") + return false, errwrap.Wrapf(err, "domain.Resume failed") } checkOK = false log.Printf("%s[%s]: Domain resumed", obj.Kind(), obj.GetName()) break } if err := dom.Create(); err != nil { - return false, errors.Wrapf(err, "domain.Create failed") + return false, errwrap.Wrapf(err, "domain.Create failed") } checkOK = false log.Printf("%s[%s]: Domain created", obj.Kind(), obj.GetName()) @@ -460,14 +460,14 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { } if isActive { // domain must be running ? if err := dom.Suspend(); err != nil { - return false, errors.Wrapf(err, "domain.Suspend failed") + return false, errwrap.Wrapf(err, "domain.Suspend failed") } checkOK = false log.Printf("%s[%s]: Domain paused", obj.Kind(), obj.GetName()) break } if err := dom.CreateWithFlags(libvirt.VIR_DOMAIN_START_PAUSED); err != nil { - return false, errors.Wrapf(err, "domain.CreateWithFlags failed") + return false, errwrap.Wrapf(err, "domain.CreateWithFlags failed") } checkOK = false log.Printf("%s[%s]: Domain created paused", obj.Kind(), obj.GetName()) @@ -481,7 +481,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { } if err := dom.Destroy(); err != nil { - return false, errors.Wrapf(err, "domain.Destroy failed") + return false, errwrap.Wrapf(err, "domain.Destroy failed") } checkOK = false log.Printf("%s[%s]: Domain destroyed", obj.Kind(), obj.GetName()) @@ -495,7 +495,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { // mem & cpu checks... if !obj.absent { if c, err := obj.attrCheckApply(apply); err != nil { - return false, errors.Wrapf(err, "attrCheckApply failed") + return false, errwrap.Wrapf(err, "attrCheckApply failed") } else if !c { checkOK = false } diff --git a/test/omv/pkg1a.yaml b/test/omv/pkg1a.yaml index 30445ad7..f19995cf 100644 --- a/test/omv/pkg1a.yaml +++ b/test/omv/pkg1a.yaml @@ -32,7 +32,7 @@ - iptables -F - cd /vagrant/mgmt/ && make path - cd /vagrant/mgmt/ && make deps && make build && cp mgmt ~/bin/ - - cd && mgmt run --file /vagrant/mgmt/examples/pkg1.yaml --converged-timeout=5 + - cd && mgmt run --yaml /vagrant/mgmt/examples/pkg1.yaml --converged-timeout=5 :namespace: omv :count: 0 :username: '' diff --git a/test/omv/pkg1b.yaml b/test/omv/pkg1b.yaml index 6fe595d7..62518bdf 100644 --- a/test/omv/pkg1b.yaml +++ b/test/omv/pkg1b.yaml @@ -33,7 +33,7 @@ - iptables -F - cd /vagrant/mgmt/ && make path - cd /vagrant/mgmt/ && make deps && make build && cp mgmt ~/bin/ - - cd && mgmt run --file /vagrant/mgmt/examples/pkg1.yaml --converged-timeout=5 + - cd && mgmt run --yaml /vagrant/mgmt/examples/pkg1.yaml --converged-timeout=5 :namespace: omv :count: 0 :username: '' diff --git a/test/shell/t2.sh b/test/shell/t2.sh index c1a16be1..1923e3c8 100755 --- a/test/shell/t2.sh +++ b/test/shell/t2.sh @@ -7,7 +7,7 @@ if env | grep -q -e '^TRAVIS=true$'; then fi # run till completion -timeout --kill-after=15s 10s ./mgmt run --file t2.yaml --converged-timeout=5 --no-watch --tmp-prefix & +timeout --kill-after=15s 10s ./mgmt run --yaml t2.yaml --converged-timeout=5 --no-watch --tmp-prefix & pid=$! wait $pid # get exit status e=$? diff --git a/test/shell/t3.sh b/test/shell/t3.sh index b15e7394..e285d7b3 100755 --- a/test/shell/t3.sh +++ b/test/shell/t3.sh @@ -10,11 +10,11 @@ fi mkdir -p "${MGMT_TMPDIR}"mgmt{A..C} # run till completion -timeout --kill-after=15s 10s ./mgmt run --file t3-a.yaml --converged-timeout=5 --no-watch --tmp-prefix & +timeout --kill-after=15s 10s ./mgmt run --yaml t3-a.yaml --converged-timeout=5 --no-watch --tmp-prefix & pid1=$! -timeout --kill-after=15s 10s ./mgmt run --file t3-b.yaml --converged-timeout=5 --no-watch --tmp-prefix & +timeout --kill-after=15s 10s ./mgmt run --yaml t3-b.yaml --converged-timeout=5 --no-watch --tmp-prefix & pid2=$! -timeout --kill-after=15s 10s ./mgmt run --file t3-c.yaml --converged-timeout=5 --no-watch --tmp-prefix & +timeout --kill-after=15s 10s ./mgmt run --yaml t3-c.yaml --converged-timeout=5 --no-watch --tmp-prefix & pid3=$! wait $pid1 # get exit status diff --git a/test/shell/t4.sh b/test/shell/t4.sh index d0f4cc6d..139bffa2 100755 --- a/test/shell/t4.sh +++ b/test/shell/t4.sh @@ -1,7 +1,7 @@ #!/bin/bash -e # should take slightly more than 25s, but fail if we take 35s) -timeout --kill-after=35s 30s ./mgmt run --file t4.yaml --converged-timeout=5 --no-watch --tmp-prefix & +timeout --kill-after=35s 30s ./mgmt run --yaml t4.yaml --converged-timeout=5 --no-watch --tmp-prefix & pid=$! wait $pid # get exit status exit $? diff --git a/test/shell/t5.sh b/test/shell/t5.sh index f7d9506a..48b6ee37 100755 --- a/test/shell/t5.sh +++ b/test/shell/t5.sh @@ -1,7 +1,7 @@ #!/bin/bash -e # should take slightly more than 35s, but fail if we take 45s) -timeout --kill-after=45s 40s ./mgmt run --file t5.yaml --converged-timeout=5 --no-watch --tmp-prefix & +timeout --kill-after=45s 40s ./mgmt run --yaml t5.yaml --converged-timeout=5 --no-watch --tmp-prefix & pid=$! wait $pid # get exit status exit $? diff --git a/test/shell/t6.sh b/test/shell/t6.sh index 2ee21024..f01420c2 100755 --- a/test/shell/t6.sh +++ b/test/shell/t6.sh @@ -7,7 +7,7 @@ if env | grep -q -e '^TRAVIS=true$'; then fi # run till completion -timeout --kill-after=20s 15s ./mgmt run --file t6.yaml --no-watch --tmp-prefix & +timeout --kill-after=20s 15s ./mgmt run --yaml t6.yaml --no-watch --tmp-prefix & pid=$! sleep 1s # let it converge test -e /tmp/mgmt/f1 diff --git a/yamlgraph/gapi.go b/yamlgraph/gapi.go new file mode 100644 index 00000000..044f94b1 --- /dev/null +++ b/yamlgraph/gapi.go @@ -0,0 +1,120 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package yamlgraph + +import ( + "fmt" + "log" + "sync" + + "github.com/purpleidea/mgmt/gapi" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/recwatch" +) + +// GAPI implements the main yamlgraph GAPI interface. +type GAPI struct { + File *string // yaml graph definition to use; nil if undefined + + data gapi.Data + initialized bool + closeChan chan struct{} + wg sync.WaitGroup // sync group for tunnel go routines +} + +// NewGAPI creates a new yamlgraph GAPI struct and calls Init(). +func NewGAPI(data gapi.Data, file *string) (*GAPI, error) { + obj := &GAPI{ + File: file, + } + return obj, obj.Init(data) +} + +// Init initializes the yamlgraph GAPI struct. +func (obj *GAPI) Init(data gapi.Data) error { + if obj.initialized { + return fmt.Errorf("Already initialized!") + } + if obj.File == nil { + return fmt.Errorf("The File param must be specified!") + } + obj.data = data // store for later + obj.closeChan = make(chan struct{}) + obj.initialized = true + return nil +} + +// Graph returns a current Graph. +func (obj *GAPI) Graph() (*pgraph.Graph, error) { + if !obj.initialized { + return nil, fmt.Errorf("yamlgraph: GAPI is not initialized") + } + + config := ParseConfigFromFile(*obj.File) + if config == nil { + return nil, fmt.Errorf("yamlgraph: ParseConfigFromFile returned nil") + } + + g, err := config.NewGraphFromConfig(obj.data.Hostname, obj.data.EmbdEtcd, obj.data.Noop) + return g, err +} + +// SwitchStream returns nil errors every time there could be a new graph. +func (obj *GAPI) SwitchStream() chan error { + if obj.data.NoWatch { + return nil + } + ch := make(chan error) + obj.wg.Add(1) + go func() { + defer obj.wg.Done() + defer close(ch) // this will run before the obj.wg.Done() + if !obj.initialized { + ch <- fmt.Errorf("yamlgraph: GAPI is not initialized") + return + } + configChan := recwatch.ConfigWatch(*obj.File) + for { + select { + case err, ok := <-configChan: // returns nil events on ok! + if !ok { // the channel closed! + return + } + log.Printf("yamlgraph: Generating new graph...") + ch <- err // trigger a run + if err != nil { + return + } + case <-obj.closeChan: + return + } + } + }() + return ch +} + +// Close shuts down the yamlgraph GAPI. +func (obj *GAPI) Close() error { + if !obj.initialized { + return fmt.Errorf("yamlgraph: GAPI is not initialized") + } + close(obj.closeChan) + obj.wg.Wait() + obj.initialized = false // closed = true + return nil +} diff --git a/gconfig/gconfig.go b/yamlgraph/gconfig.go similarity index 87% rename from gconfig/gconfig.go rename to yamlgraph/gconfig.go index 876d0ee9..93fcd7a6 100644 --- a/gconfig/gconfig.go +++ b/yamlgraph/gconfig.go @@ -15,8 +15,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// Package gconfig provides the facilities for loading a graph from a yaml file. -package gconfig +// Package yamlgraph provides the facilities for loading a graph from a yaml file. +package yamlgraph import ( "errors" @@ -27,7 +27,6 @@ import ( "strings" "github.com/purpleidea/mgmt/etcd" - "github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/global" "github.com/purpleidea/mgmt/pgraph" "github.com/purpleidea/mgmt/resources" @@ -74,7 +73,6 @@ type GraphConfig struct { Collector []collectorResConfig `yaml:"collect"` Edges []Edge `yaml:"edges"` Comment string `yaml:"comment"` - Hostname string `yaml:"hostname"` // uuid for the host Remote string `yaml:"remote"` } @@ -89,36 +87,13 @@ func (c *GraphConfig) Parse(data []byte) error { return nil } -// ParseConfigFromFile takes a filename and returns the graph config structure. -func ParseConfigFromFile(filename string) *GraphConfig { - data, err := ioutil.ReadFile(filename) - if err != nil { - log.Printf("Config: Error: ParseConfigFromFile: File: %v", err) - return nil - } +// NewGraphFromConfig transforms a GraphConfig struct into a new graph. +// FIXME: remove any possibly left over, now obsolete graph diff code from here! +func (c *GraphConfig) NewGraphFromConfig(hostname string, embdEtcd *etcd.EmbdEtcd, noop bool) (*pgraph.Graph, error) { + // hostname is the uuid for the host - var config GraphConfig - if err := config.Parse(data); err != nil { - log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err) - return nil - } - - return &config -} - -// NewGraphFromConfig returns a new graph from existing input, such as from the -// existing graph, and a GraphConfig struct. -func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtcd, noop bool) (*pgraph.Graph, error) { - if c.Hostname == "" { - return nil, fmt.Errorf("Config: Error: Hostname can't be empty!") - } - - var graph *pgraph.Graph // new graph to return - if g == nil { // FIXME: how can we check for an empty graph? - graph = pgraph.NewGraph("Graph") // give graph a default name - } else { - graph = g.Copy() // same vertices, since they're pointers! - } + var graph *pgraph.Graph // new graph to return + graph = pgraph.NewGraph("Graph") // give graph a default name var lookup = make(map[string]map[string]*pgraph.Vertex) @@ -148,9 +123,9 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc if !ok { return nil, fmt.Errorf("Config: Error: Can't convert: %v of type: %T to Res.", x, x) } - if noop { - res.Meta().Noop = noop - } + //if noop { // now done in mgmtmain + // res.Meta().Noop = noop + //} if _, exists := lookup[kind]; !exists { lookup[kind] = make(map[string]*pgraph.Vertex) } @@ -175,7 +150,7 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc } } // store in etcd - if err := etcd.EtcdSetResources(embdEtcd, c.Hostname, resourceList); err != nil { + if err := etcd.EtcdSetResources(embdEtcd, hostname, resourceList); err != nil { return nil, fmt.Errorf("Config: Could not export resources: %v", err) } @@ -217,9 +192,9 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc matched = true // collect resources but add the noop metaparam - if noop { - res.Meta().Noop = noop - } + //if noop { // now done in mgmtmain + // res.Meta().Noop = noop + //} if t.Pattern != "" { // XXX: simplistic for now res.CollectPattern(t.Pattern) // res.Dirname = t.Pattern @@ -244,15 +219,6 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc } } - // get rid of any vertices we shouldn't "keep" (that aren't in new graph) - for _, v := range graph.GetVertices() { - if !pgraph.VertexContains(v, keep) { - // wait for exit before starting new graph! - v.SendEvent(event.EventExit, true, false) - graph.DeleteVertex(v) - } - } - for _, e := range c.Edges { if _, ok := lookup[util.FirstToUpper(e.From.Kind)]; !ok { return nil, fmt.Errorf("Can't find 'from' resource!") @@ -271,3 +237,20 @@ func (c *GraphConfig) NewGraphFromConfig(g *pgraph.Graph, embdEtcd *etcd.EmbdEtc return graph, nil } + +// ParseConfigFromFile takes a filename and returns the graph config structure. +func ParseConfigFromFile(filename string) *GraphConfig { + data, err := ioutil.ReadFile(filename) + if err != nil { + log.Printf("Config: Error: ParseConfigFromFile: File: %v", err) + return nil + } + + var config GraphConfig + if err := config.Parse(data); err != nil { + log.Printf("Config: Error: ParseConfigFromFile: Parse: %v", err) + return nil + } + + return &config +}