diff --git a/main.go b/main.go index 4a466454..3a8864ea 100644 --- a/main.go +++ b/main.go @@ -19,575 +19,21 @@ package main import ( "fmt" - "io/ioutil" - "log" "os" - "os/signal" - "sync" - "syscall" - "time" - "github.com/purpleidea/mgmt/converger" - "github.com/purpleidea/mgmt/etcd" - "github.com/purpleidea/mgmt/gconfig" - "github.com/purpleidea/mgmt/global" - "github.com/purpleidea/mgmt/pgraph" - "github.com/purpleidea/mgmt/puppet" - "github.com/purpleidea/mgmt/recwatch" - "github.com/purpleidea/mgmt/remote" - "github.com/purpleidea/mgmt/util" - - etcdtypes "github.com/coreos/etcd/pkg/types" - "github.com/coreos/pkg/capnslog" - "github.com/urfave/cli" + "github.com/purpleidea/mgmt/mgmtmain" ) // set at compile time var ( program string version string - prefix = fmt.Sprintf("/var/lib/%s/", program) ) -// signal handler -func waitForSignal(exit chan error) error { - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) // catch ^C - //signal.Notify(signals, os.Kill) // catch signals - signal.Notify(signals, syscall.SIGTERM) - - select { - case sig := <-signals: // any signal will do - if sig == os.Interrupt { - log.Println("Interrupted by ^C") - return nil - } - log.Println("Interrupted by signal") - return fmt.Errorf("Killed by %v", sig) - case err := <-exit: // or a manual signal - log.Println("Interrupted by exit signal") - return err - } -} - -// run is the main run target. -func run(c *cli.Context) error { - var start = time.Now().UnixNano() - log.Printf("This is: %v, version: %v", program, version) - log.Printf("Main: Start: %v", start) - - hostname, _ := os.Hostname() - // allow passing in the hostname, instead of using --hostname - if c.IsSet("file") { - if config := gconfig.ParseConfigFromFile(c.String("file")); config != nil { - if h := config.Hostname; h != "" { - hostname = h - } - } - } - if c.IsSet("hostname") { // override by cli - if h := c.String("hostname"); h != "" { - hostname = h - } - } - noop := c.Bool("noop") - - seeds, err := etcdtypes.NewURLs( - util.FlattenListWithSplit(c.StringSlice("seeds"), []string{",", ";", " "}), - ) - if err != nil && len(c.StringSlice("seeds")) > 0 { - log.Printf("Main: Error: seeds didn't parse correctly!") - return cli.NewExitError("", 1) - } - clientURLs, err := etcdtypes.NewURLs( - util.FlattenListWithSplit(c.StringSlice("client-urls"), []string{",", ";", " "}), - ) - if err != nil && len(c.StringSlice("client-urls")) > 0 { - log.Printf("Main: Error: clientURLs didn't parse correctly!") - return cli.NewExitError("", 1) - } - serverURLs, err := etcdtypes.NewURLs( - util.FlattenListWithSplit(c.StringSlice("server-urls"), []string{",", ";", " "}), - ) - if err != nil && len(c.StringSlice("server-urls")) > 0 { - log.Printf("Main: Error: serverURLs didn't parse correctly!") - return cli.NewExitError("", 1) - } - - idealClusterSize := uint16(c.Int("ideal-cluster-size")) - if idealClusterSize < 1 { - log.Printf("Main: Error: idealClusterSize should be at least one!") - return cli.NewExitError("", 1) - } - - if c.IsSet("file") && c.IsSet("puppet") { - log.Println("Main: Error: the --file and --puppet parameters cannot be used together!") - return cli.NewExitError("", 1) - } - - if c.Bool("no-server") && len(c.StringSlice("remote")) > 0 { - // TODO: in this case, we won't be able to tunnel stuff back to - // here, so if we're okay with every remote graph running in an - // isolated mode, then this is okay. Improve on this if there's - // someone who really wants to be able to do this. - log.Println("Main: Error: the --no-server and --remote parameters cannot be used together!") - return cli.NewExitError("", 1) - } - - cConns := uint16(c.Int("cconns")) - if cConns < 0 { - log.Printf("Main: Error: --cconns should be at least zero!") - return cli.NewExitError("", 1) - } - - if c.IsSet("converged-timeout") && cConns > 0 && len(c.StringSlice("remote")) > c.Int("cconns") { - log.Printf("Main: Error: combining --converged-timeout with more remotes than available connections will never converge!") - return cli.NewExitError("", 1) - } - - depth := uint16(c.Int("depth")) - if depth < 0 { // user should not be using this argument manually - log.Printf("Main: Error: negative values for --depth are not permitted!") - return cli.NewExitError("", 1) - } - - if c.IsSet("prefix") && c.Bool("tmp-prefix") { - log.Println("Main: Error: combining --prefix and the request for a tmp prefix is illogical!") - return cli.NewExitError("", 1) - } - if s := c.String("prefix"); c.IsSet("prefix") && s != "" { - prefix = s - } - - // make sure the working directory prefix exists - if c.Bool("tmp-prefix") || os.MkdirAll(prefix, 0770) != nil { - if c.Bool("tmp-prefix") || c.Bool("allow-tmp-prefix") { - if prefix, err = ioutil.TempDir("", program+"-"); err != nil { - log.Printf("Main: Error: Can't create temporary prefix!") - return cli.NewExitError("", 1) - } - log.Println("Main: Warning: Working prefix directory is temporary!") - - } else { - log.Printf("Main: Error: Can't create prefix!") - return cli.NewExitError("", 1) - } - } - log.Printf("Main: Working prefix is: %s", prefix) - - var wg sync.WaitGroup - exit := make(chan error) // exit signal - var G, fullGraph *pgraph.Graph - - // exit after `max-runtime` seconds for no reason at all... - if i := c.Int("max-runtime"); i > 0 { - go func() { - time.Sleep(time.Duration(i) * time.Second) - exit <- nil - }() - } - - // setup converger - converger := converger.NewConverger( - c.Int("converged-timeout"), - nil, // stateFn gets added in by EmbdEtcd - ) - go converger.Loop(true) // main loop for converger, true to start paused - - // embedded etcd - if len(seeds) == 0 { - log.Printf("Main: Seeds: No seeds specified!") - } else { - log.Printf("Main: Seeds(%v): %v", len(seeds), seeds) - } - EmbdEtcd := etcd.NewEmbdEtcd( - hostname, - seeds, - clientURLs, - serverURLs, - c.Bool("no-server"), - idealClusterSize, - prefix, - converger, - ) - if EmbdEtcd == nil { - // TODO: verify EmbdEtcd is not nil below... - exit <- fmt.Errorf("Main: Etcd: Creation failed!") - } else if err := EmbdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running) - exit <- fmt.Errorf("Main: Etcd: Startup failed: %v", err) - } - convergerStateFn := func(b bool) error { - // exit if we are using the converged-timeout and we are the - // root node. otherwise, if we are a child node in a remote - // execution hierarchy, we should only notify our converged - // state and wait for the parent to trigger the exit. - if depth == 0 && c.Int("converged-timeout") >= 0 { - if b { - log.Printf("Converged for %d seconds, exiting!", c.Int("converged-timeout")) - exit <- nil // trigger an exit! - } - return nil - } - // send our individual state into etcd for others to see - return etcd.EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error? - } - if EmbdEtcd != nil { - converger.SetStateFn(convergerStateFn) - } - - exitchan := make(chan struct{}) // exit on close - go func() { - startchan := make(chan struct{}) // start signal - go func() { startchan <- struct{}{} }() - file := c.String("file") - var configchan chan error - var puppetchan <-chan time.Time - if !c.Bool("no-watch") && c.IsSet("file") { - configchan = recwatch.ConfigWatch(file) - } else if c.IsSet("puppet") { - interval := puppet.PuppetInterval(c.String("puppet-conf")) - puppetchan = time.Tick(time.Duration(interval) * time.Second) - } - log.Println("Etcd: Starting...") - etcdchan := etcd.EtcdWatch(EmbdEtcd) - first := true // first loop or not - for { - log.Println("Main: Waiting...") - select { - case <-startchan: // kick the loop once at start - // pass - - case b := <-etcdchan: - if !b { // ignore the message - continue - } - // everything else passes through to cause a compile! - - case <-puppetchan: - // nothing, just go on - - case e := <-configchan: - if c.Bool("no-watch") { - continue // not ready to read config - } - if e != nil { - exit <- e // trigger exit - continue - //return // TODO: return or wait for exitchan? - } - // XXX: case compile_event: ... - // ... - case <-exitchan: - return - } - - var config *gconfig.GraphConfig - if c.IsSet("file") { - config = gconfig.ParseConfigFromFile(file) - } else if c.IsSet("puppet") { - config = puppet.ParseConfigFromPuppet(c.String("puppet"), c.String("puppet-conf")) - } - if config == nil { - log.Printf("Config: Parse failure") - continue - } - - if config.Hostname != "" && config.Hostname != hostname { - log.Printf("Config: Hostname changed, ignoring config!") - continue - } - config.Hostname = hostname // set it in case it was "" - - // run graph vertex LOCK... - if !first { // TODO: we can flatten this check out I think - converger.Pause() // FIXME: add sync wait? - G.Pause() // sync - } - - // build graph from yaml file on events (eg: from etcd) - // we need the vertices to be paused to work on them - if newFullgraph, err := config.NewGraphFromConfig(fullGraph, EmbdEtcd, noop); err == nil { // keep references to all original elements - fullGraph = newFullgraph - } else { - log.Printf("Config: Error making new graph from config: %v", err) - // unpause! - if !first { - G.Start(&wg, first) // sync - converger.Start() // after G.Start() - } - continue - } - - G = fullGraph.Copy() // copy to active graph - // XXX: do etcd transaction out here... - G.AutoEdges() // add autoedges; modifies the graph - G.AutoGroup() // run autogroup; modifies the graph - // TODO: do we want to do a transitive reduction? - - log.Printf("Graph: %v", G) // show graph - err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz")) - if err != nil { - log.Printf("Graphviz: %v", err) - } else { - log.Printf("Graphviz: Successfully generated graph!") - } - G.AssociateData(converger) - // G.Start(...) needs to be synchronous or wait, - // because if half of the nodes are started and - // some are not ready yet and the EtcdWatch - // loops, we'll cause G.Pause(...) before we - // even got going, thus causing nil pointer errors - G.Start(&wg, first) // sync - converger.Start() // after G.Start() - first = false - } - }() - - configWatcher := recwatch.NewConfigWatcher() - events := configWatcher.Events() - if !c.Bool("no-watch") { - configWatcher.Add(c.StringSlice("remote")...) // add all the files... - } else { - events = nil // signal that no-watch is true - } - go func() { - select { - case err := <-configWatcher.Error(): - exit <- err // trigger an exit! - - case <-exitchan: - return - } - }() - - // initialize the add watcher, which calls the f callback on map changes - convergerCb := func(f func(map[string]bool) error) (func(), error) { - return etcd.EtcdAddHostnameConvergedWatcher(EmbdEtcd, f) - } - - // build remotes struct for remote ssh - remotes := remote.NewRemotes( - EmbdEtcd.LocalhostClientURLs().StringSlice(), - []string{etcd.DefaultClientURL}, - noop, - c.StringSlice("remote"), // list of files - events, // watch for file changes - cConns, - c.Bool("allow-interactive"), - c.String("ssh-priv-id-rsa"), - !c.Bool("no-caching"), - depth, - prefix, - converger, - convergerCb, - program, - ) - - // TODO: is there any benefit to running the remotes above in the loop? - // wait for etcd to be running before we remote in, which we do above! - go remotes.Run() - - if !c.IsSet("file") && !c.IsSet("puppet") { - converger.Start() // better start this for empty graphs - } - log.Println("Main: Running...") - - err = waitForSignal(exit) // pass in exit channel to watch - - log.Println("Destroy...") - - configWatcher.Close() // stop sending file changes to remotes - remotes.Exit() // tell all the remote connections to shutdown; waits! - - G.Exit() // tell all the children to exit - - // tell inner main loop to exit - close(exitchan) - - // cleanup etcd main loop last so it can process everything first - if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd - log.Printf("Etcd exited poorly with: %v", err) - } - - if global.DEBUG { - log.Printf("Graph: %v", G) - } - - wg.Wait() // wait for primary go routines to exit - - // TODO: wait for each vertex to exit... - log.Println("Goodbye!") - return err -} - func main() { - var flags int - if global.DEBUG || true { // TODO: remove || true - flags = log.LstdFlags | log.Lshortfile + if err := mgmtmain.CLI(program, version); err != nil { + fmt.Println(err) + os.Exit(1) + return } - flags = (flags - log.Ldate) // remove the date for now - log.SetFlags(flags) - - // un-hijack from capnslog... - log.SetOutput(os.Stderr) - if global.VERBOSE { - capnslog.SetFormatter(capnslog.NewLogFormatter(os.Stderr, "(etcd) ", flags)) - } else { - capnslog.SetFormatter(capnslog.NewNilFormatter()) - } - - // test for sanity - if program == "" || version == "" { - log.Fatal("Program was not compiled correctly. Please see Makefile.") - } - app := cli.NewApp() - app.Name = program - app.Usage = "next generation config management" - app.Version = version - //app.Action = ... // without a default action, help runs - - app.Commands = []cli.Command{ - { - Name: "run", - Aliases: []string{"r"}, - Usage: "run", - Action: run, - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "file, f", - Value: "", - Usage: "graph definition to run", - EnvVar: "MGMT_FILE", - }, - cli.BoolFlag{ - Name: "no-watch", - Usage: "do not update graph on watched graph definition file changes", - }, - cli.StringFlag{ - Name: "code, c", - Value: "", - Usage: "code definition to run", - }, - cli.StringFlag{ - Name: "graphviz, g", - Value: "", - Usage: "output file for graphviz data", - }, - cli.StringFlag{ - Name: "graphviz-filter, gf", - Value: "dot", // directed graph default - Usage: "graphviz filter to use", - }, - // useful for testing multiple instances on same machine - cli.StringFlag{ - Name: "hostname", - Value: "", - Usage: "hostname to use", - }, - // if empty, it will startup a new server - cli.StringSliceFlag{ - Name: "seeds, s", - Value: &cli.StringSlice{}, // empty slice - Usage: "default etc client endpoint", - EnvVar: "MGMT_SEEDS", - }, - // port 2379 and 4001 are common - cli.StringSliceFlag{ - Name: "client-urls", - Value: &cli.StringSlice{}, - Usage: "list of URLs to listen on for client traffic", - EnvVar: "MGMT_CLIENT_URLS", - }, - // port 2380 and 7001 are common - cli.StringSliceFlag{ - Name: "server-urls, peer-urls", - Value: &cli.StringSlice{}, - Usage: "list of URLs to listen on for server (peer) traffic", - EnvVar: "MGMT_SERVER_URLS", - }, - cli.BoolFlag{ - Name: "no-server", - Usage: "do not let other servers peer with me", - }, - cli.IntFlag{ - Name: "ideal-cluster-size", - Value: etcd.DefaultIdealClusterSize, - Usage: "ideal number of server peers in cluster, only read by initial server", - EnvVar: "MGMT_IDEAL_CLUSTER_SIZE", - }, - cli.IntFlag{ - Name: "converged-timeout, t", - Value: -1, - Usage: "exit after approximately this many seconds in a converged state", - EnvVar: "MGMT_CONVERGED_TIMEOUT", - }, - cli.IntFlag{ - Name: "max-runtime", - Value: 0, - Usage: "exit after a maximum of approximately this many seconds", - EnvVar: "MGMT_MAX_RUNTIME", - }, - cli.BoolFlag{ - Name: "noop", - Usage: "globally force all resources into no-op mode", - }, - cli.StringFlag{ - Name: "puppet, p", - Value: "", - Usage: "load graph from puppet, optionally takes a manifest or path to manifest file", - }, - cli.StringFlag{ - Name: "puppet-conf", - Value: "", - Usage: "supply the path to an alternate puppet.conf file to use", - }, - cli.StringSliceFlag{ - Name: "remote", - Value: &cli.StringSlice{}, - Usage: "list of remote graph definitions to run", - }, - cli.BoolFlag{ - Name: "allow-interactive", - Usage: "allow interactive prompting, such as for remote passwords", - }, - cli.StringFlag{ - Name: "ssh-priv-id-rsa", - Value: "~/.ssh/id_rsa", - Usage: "default path to ssh key file, set empty to never touch", - EnvVar: "MGMT_SSH_PRIV_ID_RSA", - }, - cli.IntFlag{ - Name: "cconns", - Value: 0, - Usage: "number of maximum concurrent remote ssh connections to run, 0 for unlimited", - EnvVar: "MGMT_CCONNS", - }, - cli.BoolFlag{ - Name: "no-caching", - Usage: "don't allow remote caching of remote execution binary", - }, - cli.IntFlag{ - Name: "depth", - Hidden: true, // internal use only - Value: 0, - Usage: "specify depth in remote hierarchy", - }, - cli.StringFlag{ - Name: "prefix", - Usage: "specify a path to the working prefix directory", - EnvVar: "MGMT_PREFIX", - }, - cli.BoolFlag{ - Name: "tmp-prefix", - Usage: "request a pseudo-random, temporary prefix to be used", - }, - cli.BoolFlag{ - Name: "allow-tmp-prefix", - Usage: "allow creation of a new temporary prefix if main prefix is unavailable", - }, - }, - }, - } - app.EnableBashCompletion = true - app.Run(os.Args) } diff --git a/main_test.go b/main_test.go deleted file mode 100644 index 415d68c5..00000000 --- a/main_test.go +++ /dev/null @@ -1,26 +0,0 @@ -// Mgmt -// Copyright (C) 2013-2016+ James Shubin and the project contributors -// Written by James Shubin and the project contributors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package main - -import ( -//"testing" -) - -//func TestT1(t *testing.T) { - -//} diff --git a/mgmtmain/cli.go b/mgmtmain/cli.go new file mode 100644 index 00000000..8c43a93e --- /dev/null +++ b/mgmtmain/cli.go @@ -0,0 +1,271 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mgmtmain + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/urfave/cli" +) + +// run is the main run target. +func run(c *cli.Context) error { + + obj := &Main{} + + if s := c.String("prefix"); c.IsSet("prefix") && s != "" { + obj.Prefix = &s + } + obj.TmpPrefix = c.Bool("tmp-prefix") + obj.AllowTmpPrefix = c.Bool("allow-tmp-prefix") + + if h := c.String("hostname"); c.IsSet("hostname") && h != "" { + obj.Hostname = &h + } + + if f := c.String("file"); c.IsSet("file") { + obj.File = &f + } + if p := c.String("puppet"); c.IsSet("puppet") { + obj.Puppet = &p + } + obj.PuppetConf = c.String("puppet-conf") + obj.Remotes = c.StringSlice("remote") + + obj.NoWatch = c.Bool("no-watch") + obj.Noop = c.Bool("noop") + obj.Graphviz = c.String("graphviz") + obj.GraphvizFilter = c.String("graphviz-filter") + obj.ConvergedTimeout = c.Int("converged-timeout") + obj.MaxRuntime = uint(c.Int("max-runtime")) + + obj.Seeds = c.StringSlice("seeds") + obj.ClientURLs = c.StringSlice("client-urls") + obj.ServerURLs = c.StringSlice("server-urls") + obj.IdealClusterSize = c.Int("ideal-cluster-size") + obj.NoServer = c.Bool("no-server") + + obj.CConns = uint16(c.Int("cconns")) + obj.AllowInteractive = c.Bool("allow-interactive") + obj.SSHPrivIDRsa = c.String("ssh-priv-id-rsa") + obj.NoCaching = c.Bool("no-caching") + obj.Depth = uint16(c.Int("depth")) + + if err := obj.Init(); err != nil { + return err + } + + // install the exit signal handler + exit := make(chan struct{}) + defer close(exit) + go func() { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) // catch ^C + //signal.Notify(signals, os.Kill) // catch signals + signal.Notify(signals, syscall.SIGTERM) + + select { + case sig := <-signals: // any signal will do + if sig == os.Interrupt { + log.Println("Interrupted by ^C") + obj.Exit(nil) + return + } + log.Println("Interrupted by signal") + obj.Exit(fmt.Errorf("Killed by %v", sig)) + return + case <-exit: + return + } + }() + + if err := obj.Run(); err != nil { + return err + //return cli.NewExitError(err.Error(), 1) // TODO: ? + //return cli.NewExitError("", 1) // TODO: ? + } + return nil +} + +// CLI is the entry point for using mgmt normally from the CLI. +func CLI(program, version string) error { + + // test for sanity + if program == "" || version == "" { + return fmt.Errorf("Program was not compiled correctly. Please see Makefile.") + } + app := cli.NewApp() + app.Name = program + app.Usage = "next generation config management" + app.Version = version + //app.Action = ... // without a default action, help runs + + app.Commands = []cli.Command{ + { + Name: "run", + Aliases: []string{"r"}, + Usage: "run", + Action: run, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "prefix", + Usage: "specify a path to the working prefix directory", + EnvVar: "MGMT_PREFIX", + }, + cli.BoolFlag{ + Name: "tmp-prefix", + Usage: "request a pseudo-random, temporary prefix to be used", + }, + cli.BoolFlag{ + Name: "allow-tmp-prefix", + Usage: "allow creation of a new temporary prefix if main prefix is unavailable", + }, + + // useful for testing multiple instances on same machine + cli.StringFlag{ + Name: "hostname", + Value: "", + Usage: "hostname to use", + }, + + cli.StringFlag{ + Name: "code, c", + Value: "", + Usage: "code definition to run", + }, + cli.StringFlag{ + Name: "file, f", + Value: "", + Usage: "graph definition to run", + EnvVar: "MGMT_FILE", + }, + cli.StringFlag{ + Name: "puppet, p", + Value: "", + Usage: "load graph from puppet, optionally takes a manifest or path to manifest file", + }, + cli.StringFlag{ + Name: "puppet-conf", + Value: "", + Usage: "the path to an alternate puppet.conf file", + }, + cli.StringSliceFlag{ + Name: "remote", + Value: &cli.StringSlice{}, + Usage: "list of remote graph definitions to run", + }, + + cli.BoolFlag{ + Name: "no-watch", + Usage: "do not update graph on watched graph definition file changes", + }, + cli.BoolFlag{ + Name: "noop", + Usage: "globally force all resources into no-op mode", + }, + cli.StringFlag{ + Name: "graphviz, g", + Value: "", + Usage: "output file for graphviz data", + }, + cli.StringFlag{ + Name: "graphviz-filter, gf", + Value: "dot", // directed graph default + Usage: "graphviz filter to use", + }, + cli.IntFlag{ + Name: "converged-timeout, t", + Value: -1, + Usage: "exit after approximately this many seconds in a converged state", + EnvVar: "MGMT_CONVERGED_TIMEOUT", + }, + cli.IntFlag{ + Name: "max-runtime", + Value: 0, + Usage: "exit after a maximum of approximately this many seconds", + EnvVar: "MGMT_MAX_RUNTIME", + }, + + // if empty, it will startup a new server + cli.StringSliceFlag{ + Name: "seeds, s", + Value: &cli.StringSlice{}, // empty slice + Usage: "default etc client endpoint", + EnvVar: "MGMT_SEEDS", + }, + // port 2379 and 4001 are common + cli.StringSliceFlag{ + Name: "client-urls", + Value: &cli.StringSlice{}, + Usage: "list of URLs to listen on for client traffic", + EnvVar: "MGMT_CLIENT_URLS", + }, + // port 2380 and 7001 are common + cli.StringSliceFlag{ + Name: "server-urls, peer-urls", + Value: &cli.StringSlice{}, + Usage: "list of URLs to listen on for server (peer) traffic", + EnvVar: "MGMT_SERVER_URLS", + }, + cli.IntFlag{ + Name: "ideal-cluster-size", + Value: -1, + Usage: "ideal number of server peers in cluster; only read by initial server", + EnvVar: "MGMT_IDEAL_CLUSTER_SIZE", + }, + cli.BoolFlag{ + Name: "no-server", + Usage: "do not let other servers peer with me", + }, + + cli.IntFlag{ + Name: "cconns", + Value: 0, + Usage: "number of maximum concurrent remote ssh connections to run; 0 for unlimited", + EnvVar: "MGMT_CCONNS", + }, + cli.BoolFlag{ + Name: "allow-interactive", + Usage: "allow interactive prompting, such as for remote passwords", + }, + cli.StringFlag{ + Name: "ssh-priv-id-rsa", + Value: "~/.ssh/id_rsa", + Usage: "default path to ssh key file, set empty to never touch", + EnvVar: "MGMT_SSH_PRIV_ID_RSA", + }, + cli.BoolFlag{ + Name: "no-caching", + Usage: "don't allow remote caching of remote execution binary", + }, + cli.IntFlag{ + Name: "depth", + Hidden: true, // internal use only + Value: 0, + Usage: "specify depth in remote hierarchy", + }, + }, + }, + } + app.EnableBashCompletion = true + return app.Run(os.Args) +} diff --git a/mgmtmain/main.go b/mgmtmain/main.go new file mode 100644 index 00000000..ac1a83fa --- /dev/null +++ b/mgmtmain/main.go @@ -0,0 +1,454 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mgmtmain + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "sync" + "time" + + "github.com/purpleidea/mgmt/converger" + "github.com/purpleidea/mgmt/etcd" + "github.com/purpleidea/mgmt/gconfig" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/puppet" + "github.com/purpleidea/mgmt/recwatch" + "github.com/purpleidea/mgmt/remote" + "github.com/purpleidea/mgmt/util" + + etcdtypes "github.com/coreos/etcd/pkg/types" + "github.com/coreos/pkg/capnslog" +) + +// Main is the main struct for running the mgmt logic. +type Main struct { + Program string // the name of this program, usually set at compile time + Version string // the version of this program, usually set at compile time + + Prefix *string // prefix passed in; nil if undefined + TmpPrefix bool // request a pseudo-random, temporary prefix to be used + AllowTmpPrefix bool // allow creation of a new temporary prefix if main prefix is unavailable + + Hostname *string // hostname to use; nil if undefined + + File *string // graph file to run; nil if undefined + Puppet *string // puppet mode to run; nil if undefined + PuppetConf string // the path to an alternate puppet.conf file + Remotes []string // list of remote graph definitions to run + + NoWatch bool // do not update graph on watched graph definition file changes + Noop bool // globally force all resources into no-op mode + Graphviz string // output file for graphviz data + GraphvizFilter string // graphviz filter to use + ConvergedTimeout int // exit after approximately this many seconds in a converged state; -1 to disable + MaxRuntime uint // exit after a maximum of approximately this many seconds + + Seeds []string // default etc client endpoint + ClientURLs []string // list of URLs to listen on for client traffic + ServerURLs []string // list of URLs to listen on for server (peer) traffic + IdealClusterSize int // ideal number of server peers in cluster; only read by initial server + NoServer bool // do not let other servers peer with me + + CConns uint16 // number of maximum concurrent remote ssh connections to run, 0 for unlimited + AllowInteractive bool // allow interactive prompting, such as for remote passwords + SSHPrivIDRsa string // default path to ssh key file, set empty to never touch + NoCaching bool // don't allow remote caching of remote execution binary + Depth uint16 // depth in remote hierarchy; for internal use only + + DEBUG bool + VERBOSE bool + + seeds etcdtypes.URLs // processed seeds value + clientURLs etcdtypes.URLs // processed client urls value + serverURLs etcdtypes.URLs // processed server urls value + idealClusterSize uint16 // processed ideal cluster size value + exit chan error // exit signal +} + +// Init initializes the main struct after it performs some validation. +func (obj *Main) Init() error { + + if obj.Prefix != nil && obj.TmpPrefix { + return fmt.Errorf("Choosing a prefix and the request for a tmp prefix is illogical!") + } + + if obj.File != nil && obj.Puppet != nil { + return fmt.Errorf("The File and Puppet parameters cannot be used together!") + } + + obj.idealClusterSize = uint16(obj.IdealClusterSize) + if obj.IdealClusterSize < 0 { // value is undefined, set to the default + obj.idealClusterSize = etcd.DefaultIdealClusterSize + } + + if obj.idealClusterSize < 1 { + return fmt.Errorf("IdealClusterSize should be at least one!") + } + + if obj.NoServer && len(obj.Remotes) > 0 { + // TODO: in this case, we won't be able to tunnel stuff back to + // here, so if we're okay with every remote graph running in an + // isolated mode, then this is okay. Improve on this if there's + // someone who really wants to be able to do this. + return fmt.Errorf("The Server is required when using Remotes!") + } + + if obj.CConns < 0 { + return fmt.Errorf("The CConns value should be at least zero!") + } + + if obj.ConvergedTimeout >= 0 && obj.CConns > 0 && len(obj.Remotes) > int(obj.CConns) { + return fmt.Errorf("You can't converge if you have more remotes than available connections!") + } + + if obj.Depth < 0 { // user should not be using this argument manually + return fmt.Errorf("Negative values for Depth are not permitted!") + } + + // transform the url list inputs into etcd typed lists + var err error + obj.seeds, err = etcdtypes.NewURLs( + util.FlattenListWithSplit(obj.Seeds, []string{",", ";", " "}), + ) + if err != nil && len(obj.Seeds) > 0 { + return fmt.Errorf("Seeds didn't parse correctly!") + } + obj.clientURLs, err = etcdtypes.NewURLs( + util.FlattenListWithSplit(obj.ClientURLs, []string{",", ";", " "}), + ) + if err != nil && len(obj.ClientURLs) > 0 { + return fmt.Errorf("ClientURLs didn't parse correctly!") + } + obj.serverURLs, err = etcdtypes.NewURLs( + util.FlattenListWithSplit(obj.ServerURLs, []string{",", ";", " "}), + ) + if err != nil && len(obj.ServerURLs) > 0 { + return fmt.Errorf("ServerURLs didn't parse correctly!") + } + + obj.exit = make(chan error) + return nil +} + +// Exit causes a safe shutdown. This is often attached to the ^C signal handler. +func (obj *Main) Exit(err error) { + obj.exit <- err // trigger an exit! +} + +// Run is the main execution entrypoint to run mgmt. +func (obj *Main) Run() error { + + var start = time.Now().UnixNano() + + var flags int + if obj.DEBUG || true { // TODO: remove || true + flags = log.LstdFlags | log.Lshortfile + } + flags = (flags - log.Ldate) // remove the date for now + log.SetFlags(flags) + + // un-hijack from capnslog... + log.SetOutput(os.Stderr) + if obj.VERBOSE { + capnslog.SetFormatter(capnslog.NewLogFormatter(os.Stderr, "(etcd) ", flags)) + } else { + capnslog.SetFormatter(capnslog.NewNilFormatter()) + } + + log.Printf("This is: %s, version: %s", obj.Program, obj.Version) + log.Printf("Main: Start: %v", start) + + var hostname, _ = os.Hostname() + // allow passing in the hostname, instead of using --hostname + if obj.File != nil { + if config := gconfig.ParseConfigFromFile(*obj.File); config != nil { + if h := config.Hostname; h != "" { + hostname = h + } + } + } + if obj.Hostname != nil { // override by cli + if h := obj.Hostname; *h != "" { + hostname = *h + } + } + + var prefix = fmt.Sprintf("/var/lib/%s/", obj.Program) // default prefix + if p := obj.Prefix; p != nil { + prefix = *p + } + // make sure the working directory prefix exists + if obj.TmpPrefix || os.MkdirAll(prefix, 0770) != nil { + if obj.TmpPrefix || obj.AllowTmpPrefix { + var err error + if prefix, err = ioutil.TempDir("", obj.Program+"-"); err != nil { + return fmt.Errorf("Main: Error: Can't create temporary prefix!") + } + log.Println("Main: Warning: Working prefix directory is temporary!") + + } else { + return fmt.Errorf("Main: Error: Can't create prefix!") + } + } + log.Printf("Main: Working prefix is: %s", prefix) + + var wg sync.WaitGroup + var G, fullGraph *pgraph.Graph + + // exit after `max-runtime` seconds for no reason at all... + if i := obj.MaxRuntime; i > 0 { + go func() { + time.Sleep(time.Duration(i) * time.Second) + obj.Exit(nil) + }() + } + + // setup converger + converger := converger.NewConverger( + obj.ConvergedTimeout, + nil, // stateFn gets added in by EmbdEtcd + ) + go converger.Loop(true) // main loop for converger, true to start paused + + // embedded etcd + if len(obj.seeds) == 0 { + log.Printf("Main: Seeds: No seeds specified!") + } else { + log.Printf("Main: Seeds(%d): %v", len(obj.seeds), obj.seeds) + } + EmbdEtcd := etcd.NewEmbdEtcd( + hostname, + obj.seeds, + obj.clientURLs, + obj.serverURLs, + obj.NoServer, + obj.idealClusterSize, + prefix, + converger, + ) + if EmbdEtcd == nil { + // TODO: verify EmbdEtcd is not nil below... + obj.Exit(fmt.Errorf("Main: Etcd: Creation failed!")) + } else if err := EmbdEtcd.Startup(); err != nil { // startup (returns when etcd main loop is running) + obj.Exit(fmt.Errorf("Main: Etcd: Startup failed: %v", err)) + } + convergerStateFn := func(b bool) error { + // exit if we are using the converged timeout and we are the + // root node. otherwise, if we are a child node in a remote + // execution hierarchy, we should only notify our converged + // state and wait for the parent to trigger the exit. + if t := obj.ConvergedTimeout; obj.Depth == 0 && t >= 0 { + if b { + log.Printf("Converged for %d seconds, exiting!", t) + obj.Exit(nil) // trigger an exit! + } + return nil + } + // send our individual state into etcd for others to see + return etcd.EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error? + } + if EmbdEtcd != nil { + converger.SetStateFn(convergerStateFn) + } + + exitchan := make(chan struct{}) // exit on close + go func() { + startchan := make(chan struct{}) // start signal + go func() { startchan <- struct{}{} }() + var configchan chan error + var puppetchan <-chan time.Time + if !obj.NoWatch && obj.File != nil { + configchan = recwatch.ConfigWatch(*obj.File) + } else if obj.Puppet != nil { + interval := puppet.PuppetInterval(obj.PuppetConf) + puppetchan = time.Tick(time.Duration(interval) * time.Second) + } + log.Println("Etcd: Starting...") + etcdchan := etcd.EtcdWatch(EmbdEtcd) + first := true // first loop or not + for { + log.Println("Main: Waiting...") + select { + case <-startchan: // kick the loop once at start + // pass + + case b := <-etcdchan: + if !b { // ignore the message + continue + } + // everything else passes through to cause a compile! + + case <-puppetchan: + // nothing, just go on + + case e := <-configchan: + if obj.NoWatch { + continue // not ready to read config + } + if e != nil { + obj.Exit(e) // trigger exit + continue + //return // TODO: return or wait for exitchan? + } + // XXX: case compile_event: ... + // ... + case <-exitchan: + return + } + + var config *gconfig.GraphConfig + if obj.File != nil { + config = gconfig.ParseConfigFromFile(*obj.File) + } else if obj.Puppet != nil { + config = puppet.ParseConfigFromPuppet(*obj.Puppet, obj.PuppetConf) + } + if config == nil { + log.Printf("Config: Parse failure") + continue + } + + if config.Hostname != "" && config.Hostname != hostname { + log.Printf("Config: Hostname changed, ignoring config!") + continue + } + config.Hostname = hostname // set it in case it was "" + + // run graph vertex LOCK... + if !first { // TODO: we can flatten this check out I think + converger.Pause() // FIXME: add sync wait? + G.Pause() // sync + } + + // build graph from yaml file on events (eg: from etcd) + // we need the vertices to be paused to work on them + if newFullgraph, err := config.NewGraphFromConfig(fullGraph, EmbdEtcd, obj.Noop); err == nil { // keep references to all original elements + fullGraph = newFullgraph + } else { + log.Printf("Config: Error making new graph from config: %v", err) + // unpause! + if !first { + G.Start(&wg, first) // sync + converger.Start() // after G.Start() + } + continue + } + + G = fullGraph.Copy() // copy to active graph + // XXX: do etcd transaction out here... + G.AutoEdges() // add autoedges; modifies the graph + G.AutoGroup() // run autogroup; modifies the graph + // TODO: do we want to do a transitive reduction? + + log.Printf("Graph: %v", G) // show graph + err := G.ExecGraphviz(obj.GraphvizFilter, obj.Graphviz) + if err != nil { + log.Printf("Graphviz: %v", err) + } else { + log.Printf("Graphviz: Successfully generated graph!") + } + G.AssociateData(converger) + // G.Start(...) needs to be synchronous or wait, + // because if half of the nodes are started and + // some are not ready yet and the EtcdWatch + // loops, we'll cause G.Pause(...) before we + // even got going, thus causing nil pointer errors + G.Start(&wg, first) // sync + converger.Start() // after G.Start() + first = false + } + }() + + configWatcher := recwatch.NewConfigWatcher() + events := configWatcher.Events() + if !obj.NoWatch { + configWatcher.Add(obj.Remotes...) // add all the files... + } else { + events = nil // signal that no-watch is true + } + go func() { + select { + case err := <-configWatcher.Error(): + obj.Exit(err) // trigger an exit! + + case <-exitchan: + return + } + }() + + // initialize the add watcher, which calls the f callback on map changes + convergerCb := func(f func(map[string]bool) error) (func(), error) { + return etcd.EtcdAddHostnameConvergedWatcher(EmbdEtcd, f) + } + + // build remotes struct for remote ssh + remotes := remote.NewRemotes( + EmbdEtcd.LocalhostClientURLs().StringSlice(), + []string{etcd.DefaultClientURL}, + obj.Noop, + obj.Remotes, // list of files + events, // watch for file changes + obj.CConns, + obj.AllowInteractive, + obj.SSHPrivIDRsa, + !obj.NoCaching, + obj.Depth, + prefix, + converger, + convergerCb, + obj.Program, + ) + + // TODO: is there any benefit to running the remotes above in the loop? + // wait for etcd to be running before we remote in, which we do above! + go remotes.Run() + + if obj.File == nil && obj.Puppet == nil { + converger.Start() // better start this for empty graphs + } + log.Println("Main: Running...") + + err := <-obj.exit // wait for exit signal + + log.Println("Destroy...") + + configWatcher.Close() // stop sending file changes to remotes + remotes.Exit() // tell all the remote connections to shutdown; waits! + + G.Exit() // tell all the children to exit + + // tell inner main loop to exit + close(exitchan) + + // cleanup etcd main loop last so it can process everything first + if err := EmbdEtcd.Destroy(); err != nil { // shutdown and cleanup etcd + log.Printf("Etcd exited poorly with: %v", err) + } + + if obj.DEBUG { + log.Printf("Graph: %v", G) + } + + wg.Wait() // wait for primary go routines to exit + + // TODO: wait for each vertex to exit... + log.Println("Goodbye!") + return err +}