lang: Initial implementation of the mgmt language
This is an initial implementation of the mgmt language. It is a declarative (immutable) functional, reactive, domain specific programming language. It is intended to be a language that is: * safe * powerful * easy to reason about With these properties, we hope this language, and the mgmt engine will allow you to model the real-time systems that you'd like to automate. This also includes a number of other associated changes. Sorry for the large size of this patch.
This commit is contained in:
270
lib/main.go
270
lib/main.go
@@ -23,6 +23,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/converger"
|
||||
@@ -37,7 +38,6 @@ import (
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
|
||||
etcdtypes "github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
multierr "github.com/hashicorp/go-multierror"
|
||||
errwrap "github.com/pkg/errors"
|
||||
)
|
||||
@@ -62,8 +62,9 @@ type Main struct {
|
||||
TmpPrefix bool // request a pseudo-random, temporary prefix to be used
|
||||
AllowTmpPrefix bool // allow creation of a new temporary prefix if main prefix is unavailable
|
||||
|
||||
GAPI gapi.GAPI // graph API interface struct
|
||||
Remotes []string // list of remote graph definitions to run
|
||||
Deploy *gapi.Deploy // deploy object including GAPI for static deploys
|
||||
DeployFs resources.Fs // used for static deploys
|
||||
Remotes []string // list of remote graph definitions to run
|
||||
|
||||
NoWatch bool // do not change graph under any circumstances
|
||||
NoConfigWatch bool // do not update graph due to config changes
|
||||
@@ -202,25 +203,7 @@ func (obj *Main) Exit(err error) {
|
||||
// Run is the main execution entrypoint to run mgmt.
|
||||
func (obj *Main) Run() error {
|
||||
|
||||
var start = time.Now().UnixNano()
|
||||
|
||||
var flags int
|
||||
if obj.Flags.Debug || true { // TODO: remove || true
|
||||
flags = log.LstdFlags | log.Lshortfile
|
||||
}
|
||||
flags = (flags - log.Ldate) // remove the date for now
|
||||
log.SetFlags(flags)
|
||||
|
||||
// un-hijack from capnslog...
|
||||
log.SetOutput(os.Stderr)
|
||||
if obj.Flags.Verbose {
|
||||
capnslog.SetFormatter(capnslog.NewLogFormatter(os.Stderr, "(etcd) ", flags))
|
||||
} else {
|
||||
capnslog.SetFormatter(capnslog.NewNilFormatter())
|
||||
}
|
||||
|
||||
log.Printf("This is: %s, version: %s", obj.Program, obj.Version)
|
||||
log.Printf("Main: Start: %v", start)
|
||||
hello(obj.Program, obj.Version, obj.Flags) // say hello!
|
||||
|
||||
hostname, err := os.Hostname() // a sensible default
|
||||
// allow passing in the hostname, instead of using the system setting
|
||||
@@ -338,6 +321,8 @@ func (obj *Main) Run() error {
|
||||
nil, // stateFn gets added in by EmbdEtcd
|
||||
)
|
||||
go converger.Loop(true) // main loop for converger, true to start paused
|
||||
// TODO: will this shut things down prematurely?
|
||||
converger.Start() // better start this anyways...
|
||||
|
||||
// embedded etcd
|
||||
if len(obj.seeds) == 0 {
|
||||
@@ -370,13 +355,16 @@ func (obj *Main) Run() error {
|
||||
}
|
||||
|
||||
// wait for etcd server to be ready before continuing...
|
||||
select {
|
||||
case <-EmbdEtcd.ServerReady():
|
||||
log.Printf("Main: Etcd: Server: Ready!")
|
||||
// pass
|
||||
case <-time.After(((etcd.MaxStartServerTimeout * etcd.MaxStartServerRetries) + 1) * time.Second):
|
||||
obj.Exit(fmt.Errorf("Main: Etcd: Startup timeout"))
|
||||
}
|
||||
// XXX: this is wrong if we're not going to be a server! we'll block!!!
|
||||
// select {
|
||||
// case <-EmbdEtcd.ServerReady():
|
||||
// log.Printf("Main: Etcd: Server: Ready!")
|
||||
// // pass
|
||||
// case <-time.After(((etcd.MaxStartServerTimeout * etcd.MaxStartServerRetries) + 1) * time.Second):
|
||||
// obj.Exit(fmt.Errorf("Main: Etcd: Startup timeout"))
|
||||
// }
|
||||
|
||||
time.Sleep(1 * time.Second) // XXX: temporary workaround
|
||||
|
||||
convergerStateFn := func(b bool) error {
|
||||
// exit if we are using the converged timeout and we are the
|
||||
@@ -399,8 +387,15 @@ func (obj *Main) Run() error {
|
||||
|
||||
// implementation of the World API (alternates can be substituted in)
|
||||
world := &etcd.World{
|
||||
Hostname: hostname,
|
||||
EmbdEtcd: EmbdEtcd,
|
||||
Hostname: hostname,
|
||||
EmbdEtcd: EmbdEtcd,
|
||||
MetadataPrefix: MetadataPrefix,
|
||||
StoragePrefix: StoragePrefix,
|
||||
StandaloneFs: obj.DeployFs, // used for static deploys
|
||||
Debug: obj.Flags.Debug,
|
||||
Logf: func(format string, v ...interface{}) {
|
||||
log.Printf("world: etcd: "+format, v...)
|
||||
},
|
||||
}
|
||||
|
||||
graph.Data = &resources.ResData{
|
||||
@@ -410,35 +405,96 @@ func (obj *Main) Run() error {
|
||||
World: world,
|
||||
Prefix: pgraphPrefix,
|
||||
Debug: obj.Flags.Debug,
|
||||
}
|
||||
|
||||
var gapiChan chan gapi.Next // stream events contain some instructions!
|
||||
if obj.GAPI != nil {
|
||||
data := gapi.Data{
|
||||
Hostname: hostname,
|
||||
World: world,
|
||||
Noop: obj.Noop,
|
||||
//NoWatch: obj.NoWatch,
|
||||
NoConfigWatch: obj.NoConfigWatch,
|
||||
NoStreamWatch: obj.NoStreamWatch,
|
||||
}
|
||||
if err := obj.GAPI.Init(data); err != nil {
|
||||
obj.Exit(fmt.Errorf("Main: GAPI: Init failed: %v", err))
|
||||
} else {
|
||||
// this must generate at least one event for it to work
|
||||
gapiChan = obj.GAPI.Next() // stream of graph switch events!
|
||||
}
|
||||
Logf: func(format string, v ...interface{}) {
|
||||
log.Printf("resources: "+format, v...)
|
||||
},
|
||||
}
|
||||
|
||||
exitchan := make(chan struct{}) // exit on close
|
||||
wg := &sync.WaitGroup{} // waitgroup for inner loop (go routine)
|
||||
|
||||
deployChan := make(chan *gapi.Deploy)
|
||||
var gapiImpl gapi.GAPI // active GAPI implementation
|
||||
gapiImpl = nil // starts off missing
|
||||
|
||||
var gapiChan chan gapi.Next // stream events contain some instructions!
|
||||
gapiChan = nil // starts off blocked
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
first := true // first loop or not
|
||||
var mainDeploy *gapi.Deploy
|
||||
for {
|
||||
log.Println("Main: Waiting...")
|
||||
// The GAPI should always kick off an event on Next() at
|
||||
// startup when (and if) it indeed has a graph to share!
|
||||
fastPause := false
|
||||
select {
|
||||
case deploy, ok := <-deployChan:
|
||||
if !ok { // channel closed
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Main: Deploy: exited")
|
||||
}
|
||||
deployChan = nil // disable it
|
||||
|
||||
if gapiImpl != nil { // currently running...
|
||||
gapiChan = nil
|
||||
if err := gapiImpl.Close(); err != nil {
|
||||
err = errwrap.Wrapf(err, "the GAPI closed poorly")
|
||||
log.Printf("Main: Deploy: GAPI: Final close failed: %+v", err)
|
||||
}
|
||||
}
|
||||
return // this is the only place we exit
|
||||
}
|
||||
if deploy == nil {
|
||||
log.Printf("Main: Deploy: Received empty deploy")
|
||||
continue
|
||||
}
|
||||
mainDeploy = deploy // save this one
|
||||
gapiObj := mainDeploy.GAPI
|
||||
if gapiObj == nil {
|
||||
log.Printf("Main: Deploy: Received empty GAPI")
|
||||
continue
|
||||
}
|
||||
|
||||
if gapiImpl != nil { // currently running...
|
||||
gapiChan = nil
|
||||
if err := gapiImpl.Close(); err != nil {
|
||||
err = errwrap.Wrapf(err, "the GAPI closed poorly")
|
||||
log.Printf("Main: Deploy: GAPI: Close failed: %+v", err)
|
||||
}
|
||||
}
|
||||
gapiImpl = gapiObj // copy it to active
|
||||
|
||||
data := gapi.Data{
|
||||
Program: obj.Program,
|
||||
Hostname: hostname,
|
||||
World: world,
|
||||
Noop: mainDeploy.Noop,
|
||||
// FIXME: should the below flags come from the deploy struct?
|
||||
//NoWatch: obj.NoWatch,
|
||||
NoConfigWatch: obj.NoConfigWatch,
|
||||
NoStreamWatch: obj.NoStreamWatch,
|
||||
Debug: obj.Flags.Debug,
|
||||
Logf: func(format string, v ...interface{}) {
|
||||
log.Printf("gapi: "+format, v...)
|
||||
},
|
||||
}
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Main: GAPI: Init...")
|
||||
}
|
||||
if err := gapiImpl.Init(data); err != nil {
|
||||
log.Printf("Main: GAPI: Init failed: %+v", err)
|
||||
// TODO: consider running previous GAPI?
|
||||
} else {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Main: GAPI: Next...")
|
||||
}
|
||||
// this must generate at least one event for it to work
|
||||
gapiChan = gapiImpl.Next() // stream of graph switch events!
|
||||
}
|
||||
continue
|
||||
|
||||
case next, ok := <-gapiChan:
|
||||
if !ok { // channel closed
|
||||
if obj.Flags.Debug {
|
||||
@@ -449,6 +505,8 @@ func (obj *Main) Run() error {
|
||||
}
|
||||
|
||||
// if we've been asked to exit...
|
||||
// TODO: do we want to block exits and wait?
|
||||
// TODO: we might want to wait for the next GAPI
|
||||
if next.Exit {
|
||||
obj.Exit(next.Err) // trigger exit
|
||||
continue // wait for exitchan
|
||||
@@ -464,15 +522,18 @@ func (obj *Main) Run() error {
|
||||
|
||||
fastPause = next.Fast // should we pause fast?
|
||||
|
||||
case <-exitchan:
|
||||
return
|
||||
//case <-exitchan: // we only exit on deployChan close!
|
||||
// return
|
||||
}
|
||||
|
||||
if obj.GAPI == nil {
|
||||
if gapiImpl == nil { // TODO: can this ever happen anymore?
|
||||
log.Printf("Main: GAPI is empty!")
|
||||
continue
|
||||
}
|
||||
|
||||
if first {
|
||||
converger.Pause() // it's already started!
|
||||
}
|
||||
// we need the vertices to be paused to work on them, so
|
||||
// run graph vertex LOCK...
|
||||
if !first { // TODO: we can flatten this check out I think
|
||||
@@ -483,7 +544,7 @@ func (obj *Main) Run() error {
|
||||
}
|
||||
|
||||
// make the graph from yaml, lib, puppet->yaml, or dsl!
|
||||
newGraph, err := obj.GAPI.Graph() // generate graph!
|
||||
newGraph, err := gapiImpl.Graph() // generate graph!
|
||||
if err != nil {
|
||||
log.Printf("Main: Error creating new graph: %v", err)
|
||||
// unpause!
|
||||
@@ -503,14 +564,14 @@ func (obj *Main) Run() error {
|
||||
for _, v := range newGraph.Vertices() {
|
||||
m := resources.VtoR(v).Meta()
|
||||
// apply the global noop parameter if requested
|
||||
if obj.Noop {
|
||||
m.Noop = obj.Noop
|
||||
if mainDeploy.Noop {
|
||||
m.Noop = mainDeploy.Noop
|
||||
}
|
||||
|
||||
// append the semaphore to each resource
|
||||
if obj.Sema > 0 { // NOTE: size == 0 would block
|
||||
if mainDeploy.Sema > 0 { // NOTE: size == 0 would block
|
||||
// a semaphore with an empty id is valid
|
||||
m.Sema = append(m.Sema, fmt.Sprintf(":%d", obj.Sema))
|
||||
m.Sema = append(m.Sema, fmt.Sprintf(":%d", mainDeploy.Sema))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -660,22 +721,93 @@ func (obj *Main) Run() error {
|
||||
// obj.Exit(fmt.Errorf("Main: Remotes: Run timeout"))
|
||||
}
|
||||
|
||||
if obj.GAPI == nil {
|
||||
converger.Start() // better start this for empty graphs
|
||||
if obj.Deploy != nil {
|
||||
deploy := obj.Deploy
|
||||
// redundant
|
||||
deploy.Noop = obj.Noop
|
||||
deploy.Sema = obj.Sema
|
||||
|
||||
select {
|
||||
case deployChan <- deploy:
|
||||
// send
|
||||
case <-exitchan:
|
||||
// pass
|
||||
}
|
||||
|
||||
// don't inline this, because when we close the deployChan it's
|
||||
// the signal to tell the engine to actually shutdown...
|
||||
go func() {
|
||||
defer close(deployChan) // no more are coming ever!
|
||||
select { // wait until we're ready to shutdown
|
||||
case <-exitchan:
|
||||
return
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
// etcd based deploy
|
||||
go func() {
|
||||
defer close(deployChan)
|
||||
startChan := make(chan struct{}) // start signal
|
||||
close(startChan) // kick it off!
|
||||
for {
|
||||
select {
|
||||
case <-startChan: // kick the loop once at start
|
||||
startChan = nil // disable
|
||||
|
||||
case err, ok := <-etcd.WatchDeploy(EmbdEtcd):
|
||||
if !ok {
|
||||
obj.Exit(nil) // regular shutdown
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
// TODO: it broke, can we restart?
|
||||
obj.Exit(fmt.Errorf("Main: Deploy: Watch error"))
|
||||
return
|
||||
}
|
||||
|
||||
case <-exitchan:
|
||||
return
|
||||
}
|
||||
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Main: Deploy: Got activity")
|
||||
}
|
||||
str, err := etcd.GetDeploy(EmbdEtcd, 0) // 0 means get the latest one
|
||||
if err != nil {
|
||||
log.Printf("Main: Deploy: Error getting deploy %+v", err)
|
||||
continue
|
||||
}
|
||||
if str == "" { // no available deploys exist yet
|
||||
continue
|
||||
}
|
||||
|
||||
// decode the deploy (incl. GAPI) and send it!
|
||||
deploy, err := gapi.NewDeployFromB64(str)
|
||||
if err != nil {
|
||||
log.Printf("Main: Deploy: Error decoding deploy %+v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case deployChan <- deploy:
|
||||
// send
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Main: Deploy: Sending new GAPI")
|
||||
}
|
||||
|
||||
case <-exitchan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
log.Println("Main: Running...")
|
||||
|
||||
reterr := <-obj.exit // wait for exit signal
|
||||
|
||||
log.Println("Main: Destroy...")
|
||||
|
||||
if obj.GAPI != nil {
|
||||
if err := obj.GAPI.Close(); err != nil {
|
||||
err = errwrap.Wrapf(err, "the GAPI closed poorly")
|
||||
reterr = multierr.Append(reterr, err) // list of errors
|
||||
}
|
||||
}
|
||||
|
||||
configWatcher.Close() // stop sending file changes to remotes
|
||||
if err := remotes.Exit(); err != nil { // tell all the remote connections to shutdown; waits!
|
||||
err = errwrap.Wrapf(err, "the Remote exited poorly")
|
||||
@@ -684,6 +816,7 @@ func (obj *Main) Run() error {
|
||||
|
||||
// tell inner main loop to exit
|
||||
close(exitchan)
|
||||
wg.Wait()
|
||||
|
||||
graph.Exit() // tells all the children to exit, and waits for them to do so
|
||||
|
||||
@@ -704,8 +837,9 @@ func (obj *Main) Run() error {
|
||||
if obj.Flags.Debug {
|
||||
log.Printf("Main: Graph: %v", graph)
|
||||
}
|
||||
|
||||
// TODO: wait for each vertex to exit...
|
||||
if reterr != nil {
|
||||
log.Printf("Main: Error: %v", reterr)
|
||||
}
|
||||
log.Println("Goodbye!")
|
||||
return reterr
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user