diff --git a/cli/deploy.go b/cli/deploy.go index 1406aa63..7d7d1a00 100644 --- a/cli/deploy.go +++ b/cli/deploy.go @@ -36,8 +36,9 @@ import ( "os/signal" cliUtil "github.com/purpleidea/mgmt/cli/util" + "github.com/purpleidea/mgmt/engine" + "github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd/client" - "github.com/purpleidea/mgmt/etcd/deployer" etcdfs "github.com/purpleidea/mgmt/etcd/fs" "github.com/purpleidea/mgmt/gapi" "github.com/purpleidea/mgmt/lib" @@ -186,26 +187,33 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error } }() - simpleDeploy := &deployer.SimpleDeploy{ + var world engine.World + world = &etcd.World{ // XXX: What should some of these fields be? + //Hostname: hostname, Client: etcdClient, - Debug: data.Flags.Debug, + //MetadataPrefix: lib.MetadataPrefix, + //StoragePrefix: lib.StoragePrefix, + //StandaloneFs: ???.DeployFs, // used for static deploys + Debug: data.Flags.Debug, Logf: func(format string, v ...interface{}) { - Logf("deploy: "+format, v...) + Logf("world: "+format, v...) }, + //GetURI: func() string { + //}, } - if err := simpleDeploy.Init(); err != nil { - return false, errwrap.Wrapf(err, "deploy Init failed") + if err := world.Init(); err != nil { + return false, errwrap.Wrapf(err, "world Init failed") } defer func() { - err := errwrap.Wrapf(simpleDeploy.Close(), "deploy Close failed") + err := errwrap.Wrapf(world.Close(), "world Close failed") if err != nil { - // TODO: cause the final exit code to be non-zero - Logf("deploy cleanup error: %+v", err) + // TODO: cause the final exit code to be non-zero? + Logf("close error: %+v", err) } }() // get max id (from all the previous deploys) - max, err := simpleDeploy.GetMaxDeployID(ctx) + max, err := world.GetMaxDeployID(ctx) if err != nil { return false, errwrap.Wrapf(err, "error getting max deploy id") } @@ -213,6 +221,7 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error var id = max + 1 // next id Logf("previous max deploy id: %d", max) + // XXX: Get this from the World API? (Which might need improving!) etcdFs := &etcdfs.Fs{ Client: etcdClient, // TODO: using a uuid is meant as a temporary measure, i hate them @@ -262,7 +271,7 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error Logf("pushing...") // this nominally checks the previous git hash matches our expectation - if err := simpleDeploy.AddDeploy(ctx, id, hash, pHash, &str); err != nil { + if err := world.AddDeploy(ctx, id, hash, pHash, &str); err != nil { return false, errwrap.Wrapf(err, "could not create deploy id `%d`", id) } Logf("success, id: %d", id) diff --git a/engine/world.go b/engine/world.go index dcf8b736..e2a99422 100644 --- a/engine/world.go +++ b/engine/world.go @@ -40,8 +40,16 @@ import ( // GAPI to store state and exchange information throughout the cluster. It is // the interface each machine uses to communicate with the rest of the world. type World interface { // TODO: is there a better name for this interface? + // Init sets things up and is called once before any other methods. + Init() error + + // Close does some cleanup and is the last method that is ever called. + Close() error + FsWorld + DeployWorld + StrWorld ResWorld @@ -60,6 +68,21 @@ type FsWorld interface { Fs(uri string) (Fs, error) } +// DeployWorld is a world interface with all of the deploy functions. +type DeployWorld interface { + WatchDeploy(context.Context) (chan error, error) + + // TODO: currently unused, but already implemented + //GetDeploys(ctx context.Context) (map[uint64]string, error) + + GetDeploy(ctx context.Context, id uint64) (string, error) + + GetMaxDeployID(ctx context.Context) (uint64, error) + + // TODO: This could be split out to a sub-interface? + AddDeploy(ctx context.Context, id uint64, hash, pHash string, data *string) error +} + // StrWorld is a world interface which is useful for reading, writing, and // watching strings in a shared, distributed database. It is likely that much of // the functionality is built upon these primitives. diff --git a/etcd/world.go b/etcd/world.go index 965a956c..d0d371a5 100644 --- a/etcd/world.go +++ b/etcd/world.go @@ -41,11 +41,13 @@ import ( "github.com/purpleidea/mgmt/etcd/client/resources" "github.com/purpleidea/mgmt/etcd/client/str" "github.com/purpleidea/mgmt/etcd/client/strmap" + "github.com/purpleidea/mgmt/etcd/deployer" etcdfs "github.com/purpleidea/mgmt/etcd/fs" "github.com/purpleidea/mgmt/etcd/interfaces" "github.com/purpleidea/mgmt/etcd/scheduler" "github.com/purpleidea/mgmt/lang/embedded" "github.com/purpleidea/mgmt/util" + "github.com/purpleidea/mgmt/util/errwrap" ) // World is an etcd backed implementation of the World interface. @@ -58,6 +60,60 @@ type World struct { GetURI func() string Debug bool Logf func(format string, v ...interface{}) + + simpleDeploy *deployer.SimpleDeploy +} + +// Init runs first. +func (obj *World) Init() error { + obj.simpleDeploy = &deployer.SimpleDeploy{ + Client: obj.Client, + Debug: obj.Debug, + Logf: func(format string, v ...interface{}) { + obj.Logf("deploy: "+format, v...) + }, + } + if err := obj.simpleDeploy.Init(); err != nil { + return errwrap.Wrapf(err, "deploy Init failed") + } + + return nil +} + +// Close runs last. +func (obj *World) Close() error { + var errs error + if obj.simpleDeploy != nil { + err := obj.simpleDeploy.Close() + err = errwrap.Wrapf(err, "deploy Close failed") + errs = errwrap.Append(errs, err) + } + return errs +} + +// WatchDeploy returns a channel which spits out events on new deploy activity. +func (obj *World) WatchDeploy(ctx context.Context) (chan error, error) { + return obj.simpleDeploy.WatchDeploy(ctx) +} + +// GetDeploys gets all the available deploys. +func (obj *World) GetDeploys(ctx context.Context) (map[uint64]string, error) { + return obj.simpleDeploy.GetDeploys(ctx) +} + +// GetDeploy returns the deploy with the specified id if it exists. +func (obj *World) GetDeploy(ctx context.Context, id uint64) (string, error) { + return obj.simpleDeploy.GetDeploy(ctx, id) +} + +// GetMaxDeployID returns the maximum deploy id. +func (obj *World) GetMaxDeployID(ctx context.Context) (uint64, error) { + return obj.simpleDeploy.GetMaxDeployID(ctx) +} + +// AddDeploy adds a new deploy. +func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string, data *string) error { + return obj.simpleDeploy.AddDeploy(ctx, id, hash, pHash, data) } // ResWatch returns a channel which spits out events on possible exported diff --git a/lang/interpret_test.go b/lang/interpret_test.go index 37f6dcdf..cfc952ce 100644 --- a/lang/interpret_test.go +++ b/lang/interpret_test.go @@ -52,6 +52,7 @@ import ( "github.com/purpleidea/mgmt/engine/local" engineUtil "github.com/purpleidea/mgmt/engine/util" "github.com/purpleidea/mgmt/etcd" + etcdClient "github.com/purpleidea/mgmt/etcd/client" "github.com/purpleidea/mgmt/lang/ast" "github.com/purpleidea/mgmt/lang/funcs/dage" "github.com/purpleidea/mgmt/lang/funcs/vars" @@ -889,9 +890,10 @@ func TestAstFunc2(t *testing.T) { }).Init() // implementation of the World API (alternatives can be substituted in) - world := &etcd.World{ - //Hostname: hostname, - //Client: etcdClient, + var world engine.World + world = &etcd.World{ + //Hostname: hostname, + Client: etcdClient.NewClientFromClient(nil), // stub //MetadataPrefix: /fs, // MetadataPrefix //StoragePrefix: "/storage", // StoragePrefix // TODO: is this correct? (seems to work for testing) @@ -901,6 +903,16 @@ func TestAstFunc2(t *testing.T) { logf("world: etcd: "+format, v...) }, } + if err := world.Init(); err != nil { + t.Errorf("world Init failed: %+v", err) + return + } + defer func() { + err := errwrap.Wrapf(world.Close(), "world Close failed") + if err != nil { + t.Errorf("close error: %+v", err) + } + }() variables := map[string]interfaces.Expr{ "purpleidea": &ast.ExprStr{V: "hello world!"}, // james says hi @@ -1769,9 +1781,10 @@ func TestAstFunc3(t *testing.T) { }).Init() // implementation of the World API (alternatives can be substituted in) - world := &etcd.World{ - //Hostname: hostname, - //Client: etcdClient, + var world engine.World + world = &etcd.World{ + //Hostname: hostname, + Client: etcdClient.NewClientFromClient(nil), // stub //MetadataPrefix: /fs, // MetadataPrefix //StoragePrefix: "/storage", // StoragePrefix // TODO: is this correct? (seems to work for testing) @@ -1781,6 +1794,16 @@ func TestAstFunc3(t *testing.T) { logf("world: etcd: "+format, v...) }, } + if err := world.Init(); err != nil { + t.Errorf("world Init failed: %+v", err) + return + } + defer func() { + err := errwrap.Wrapf(world.Close(), "world Close failed") + if err != nil { + t.Errorf("close error: %+v", err) + } + }() variables := map[string]interfaces.Expr{ "purpleidea": &ast.ExprStr{V: "hello world!"}, // james says hi diff --git a/lib/main.go b/lib/main.go index fd3a526b..ce2be3e0 100644 --- a/lib/main.go +++ b/lib/main.go @@ -51,7 +51,6 @@ import ( "github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd/chooser" etcdClient "github.com/purpleidea/mgmt/etcd/client" - "github.com/purpleidea/mgmt/etcd/deployer" etcdInterfaces "github.com/purpleidea/mgmt/etcd/interfaces" "github.com/purpleidea/mgmt/gapi" "github.com/purpleidea/mgmt/gapi/empty" @@ -595,24 +594,6 @@ func (obj *Main) Run() error { client = c } - simpleDeploy := &deployer.SimpleDeploy{ - Client: client, - Debug: obj.Debug, - Logf: func(format string, v ...interface{}) { - obj.Logf("deploy: "+format, v...) - }, - } - if err := simpleDeploy.Init(); err != nil { - return errwrap.Wrapf(err, "deploy Init failed") - } - defer func() { - err := errwrap.Wrapf(simpleDeploy.Close(), "deploy Close failed") - if err != nil { - // TODO: cause the final exit code to be non-zero - Logf("cleanup error: %+v", err) - } - }() - // implementation of the Local API (we only expect just this single one) localAPI := (&local.API{ Prefix: fmt.Sprintf("%s/", path.Join(prefix, "local")), @@ -628,7 +609,8 @@ func (obj *Main) Run() error { // XXX: The "implementation of the World API" should have more than just // etcd in it, so this could live elsewhere package wise and just have // an etcd component from the etcd package added in. - world := &etcd.World{ + var world engine.World + world = &etcd.World{ Hostname: hostname, Client: client, MetadataPrefix: MetadataPrefix, @@ -645,6 +627,16 @@ func (obj *Main) Run() error { return gapiInfoResult.URI }, } + if err := world.Init(); err != nil { + return errwrap.Wrapf(err, "world Init failed") + } + defer func() { + err := errwrap.Wrapf(world.Close(), "world Close failed") + if err != nil { + // TODO: cause the final exit code to be non-zero? + Logf("close error: %+v", err) + } + }() obj.ge = &graph.Engine{ Program: obj.Program, @@ -1046,7 +1038,7 @@ func (obj *Main) Run() error { // get max id (from all the previous deploys) // this is what the existing cluster is already running // TODO: add a timeout to context? - max, err := simpleDeploy.GetMaxDeployID(exitCtx) + max, err := world.GetMaxDeployID(exitCtx) if err != nil { close(deployChan) // because we won't close it downstream... return errwrap.Wrapf(err, "error getting max deploy id") @@ -1085,7 +1077,7 @@ func (obj *Main) Run() error { // now we can wait for future deploys, but if we already had an // initial deploy from run, don't switch to this unless it's new ctx, cancel := context.WithCancel(context.Background()) - watchChan, err := simpleDeploy.WatchDeploy(ctx) + watchChan, err := world.WatchDeploy(ctx) if err != nil { cancel() Logf("error starting deploy: %+v", err) @@ -1140,7 +1132,7 @@ func (obj *Main) Run() error { // return // exit via channel close instead } - latest, err := simpleDeploy.GetMaxDeployID(ctx) // or zero + latest, err := world.GetMaxDeployID(ctx) // or zero if err != nil { Logf("error getting max deploy id: %+v", err) continue @@ -1167,7 +1159,7 @@ func (obj *Main) Run() error { // 0 passes through an empty deploy without an error... // (unless there is some sort of etcd error that occurs) - str, err := simpleDeploy.GetDeploy(ctx, latest) + str, err := world.GetDeploy(ctx, latest) if err != nil { Logf("deploy: error getting deploy: %+v", err) continue