cli, engine, etcd, lib: Split out the deployer into world

This should hopefully make the refactor into a clean world API a bit
better. Still more to do though!
This commit is contained in:
James Shubin
2025-03-18 04:24:39 -04:00
parent 1a35ab61ca
commit 7ad54fe3e8
5 changed files with 144 additions and 41 deletions

View File

@@ -36,8 +36,9 @@ import (
"os/signal" "os/signal"
cliUtil "github.com/purpleidea/mgmt/cli/util" cliUtil "github.com/purpleidea/mgmt/cli/util"
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/etcd"
"github.com/purpleidea/mgmt/etcd/client" "github.com/purpleidea/mgmt/etcd/client"
"github.com/purpleidea/mgmt/etcd/deployer"
etcdfs "github.com/purpleidea/mgmt/etcd/fs" etcdfs "github.com/purpleidea/mgmt/etcd/fs"
"github.com/purpleidea/mgmt/gapi" "github.com/purpleidea/mgmt/gapi"
"github.com/purpleidea/mgmt/lib" "github.com/purpleidea/mgmt/lib"
@@ -186,26 +187,33 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error
} }
}() }()
simpleDeploy := &deployer.SimpleDeploy{ var world engine.World
world = &etcd.World{ // XXX: What should some of these fields be?
//Hostname: hostname,
Client: etcdClient, Client: etcdClient,
//MetadataPrefix: lib.MetadataPrefix,
//StoragePrefix: lib.StoragePrefix,
//StandaloneFs: ???.DeployFs, // used for static deploys
Debug: data.Flags.Debug, Debug: data.Flags.Debug,
Logf: func(format string, v ...interface{}) { Logf: func(format string, v ...interface{}) {
Logf("deploy: "+format, v...) Logf("world: "+format, v...)
}, },
//GetURI: func() string {
//},
} }
if err := simpleDeploy.Init(); err != nil { if err := world.Init(); err != nil {
return false, errwrap.Wrapf(err, "deploy Init failed") return false, errwrap.Wrapf(err, "world Init failed")
} }
defer func() { defer func() {
err := errwrap.Wrapf(simpleDeploy.Close(), "deploy Close failed") err := errwrap.Wrapf(world.Close(), "world Close failed")
if err != nil { if err != nil {
// TODO: cause the final exit code to be non-zero // TODO: cause the final exit code to be non-zero?
Logf("deploy cleanup error: %+v", err) Logf("close error: %+v", err)
} }
}() }()
// get max id (from all the previous deploys) // get max id (from all the previous deploys)
max, err := simpleDeploy.GetMaxDeployID(ctx) max, err := world.GetMaxDeployID(ctx)
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "error getting max deploy id") return false, errwrap.Wrapf(err, "error getting max deploy id")
} }
@@ -213,6 +221,7 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error
var id = max + 1 // next id var id = max + 1 // next id
Logf("previous max deploy id: %d", max) Logf("previous max deploy id: %d", max)
// XXX: Get this from the World API? (Which might need improving!)
etcdFs := &etcdfs.Fs{ etcdFs := &etcdfs.Fs{
Client: etcdClient, Client: etcdClient,
// TODO: using a uuid is meant as a temporary measure, i hate them // TODO: using a uuid is meant as a temporary measure, i hate them
@@ -262,7 +271,7 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error
Logf("pushing...") Logf("pushing...")
// this nominally checks the previous git hash matches our expectation // this nominally checks the previous git hash matches our expectation
if err := simpleDeploy.AddDeploy(ctx, id, hash, pHash, &str); err != nil { if err := world.AddDeploy(ctx, id, hash, pHash, &str); err != nil {
return false, errwrap.Wrapf(err, "could not create deploy id `%d`", id) return false, errwrap.Wrapf(err, "could not create deploy id `%d`", id)
} }
Logf("success, id: %d", id) Logf("success, id: %d", id)

View File

@@ -40,8 +40,16 @@ import (
// GAPI to store state and exchange information throughout the cluster. It is // GAPI to store state and exchange information throughout the cluster. It is
// the interface each machine uses to communicate with the rest of the world. // the interface each machine uses to communicate with the rest of the world.
type World interface { // TODO: is there a better name for this interface? type World interface { // TODO: is there a better name for this interface?
// Init sets things up and is called once before any other methods.
Init() error
// Close does some cleanup and is the last method that is ever called.
Close() error
FsWorld FsWorld
DeployWorld
StrWorld StrWorld
ResWorld ResWorld
@@ -60,6 +68,21 @@ type FsWorld interface {
Fs(uri string) (Fs, error) Fs(uri string) (Fs, error)
} }
// DeployWorld is a world interface with all of the deploy functions.
type DeployWorld interface {
WatchDeploy(context.Context) (chan error, error)
// TODO: currently unused, but already implemented
//GetDeploys(ctx context.Context) (map[uint64]string, error)
GetDeploy(ctx context.Context, id uint64) (string, error)
GetMaxDeployID(ctx context.Context) (uint64, error)
// TODO: This could be split out to a sub-interface?
AddDeploy(ctx context.Context, id uint64, hash, pHash string, data *string) error
}
// StrWorld is a world interface which is useful for reading, writing, and // StrWorld is a world interface which is useful for reading, writing, and
// watching strings in a shared, distributed database. It is likely that much of // watching strings in a shared, distributed database. It is likely that much of
// the functionality is built upon these primitives. // the functionality is built upon these primitives.

View File

@@ -41,11 +41,13 @@ import (
"github.com/purpleidea/mgmt/etcd/client/resources" "github.com/purpleidea/mgmt/etcd/client/resources"
"github.com/purpleidea/mgmt/etcd/client/str" "github.com/purpleidea/mgmt/etcd/client/str"
"github.com/purpleidea/mgmt/etcd/client/strmap" "github.com/purpleidea/mgmt/etcd/client/strmap"
"github.com/purpleidea/mgmt/etcd/deployer"
etcdfs "github.com/purpleidea/mgmt/etcd/fs" etcdfs "github.com/purpleidea/mgmt/etcd/fs"
"github.com/purpleidea/mgmt/etcd/interfaces" "github.com/purpleidea/mgmt/etcd/interfaces"
"github.com/purpleidea/mgmt/etcd/scheduler" "github.com/purpleidea/mgmt/etcd/scheduler"
"github.com/purpleidea/mgmt/lang/embedded" "github.com/purpleidea/mgmt/lang/embedded"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
"github.com/purpleidea/mgmt/util/errwrap"
) )
// World is an etcd backed implementation of the World interface. // World is an etcd backed implementation of the World interface.
@@ -58,6 +60,60 @@ type World struct {
GetURI func() string GetURI func() string
Debug bool Debug bool
Logf func(format string, v ...interface{}) Logf func(format string, v ...interface{})
simpleDeploy *deployer.SimpleDeploy
}
// Init runs first.
func (obj *World) Init() error {
obj.simpleDeploy = &deployer.SimpleDeploy{
Client: obj.Client,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
obj.Logf("deploy: "+format, v...)
},
}
if err := obj.simpleDeploy.Init(); err != nil {
return errwrap.Wrapf(err, "deploy Init failed")
}
return nil
}
// Close runs last.
func (obj *World) Close() error {
var errs error
if obj.simpleDeploy != nil {
err := obj.simpleDeploy.Close()
err = errwrap.Wrapf(err, "deploy Close failed")
errs = errwrap.Append(errs, err)
}
return errs
}
// WatchDeploy returns a channel which spits out events on new deploy activity.
func (obj *World) WatchDeploy(ctx context.Context) (chan error, error) {
return obj.simpleDeploy.WatchDeploy(ctx)
}
// GetDeploys gets all the available deploys.
func (obj *World) GetDeploys(ctx context.Context) (map[uint64]string, error) {
return obj.simpleDeploy.GetDeploys(ctx)
}
// GetDeploy returns the deploy with the specified id if it exists.
func (obj *World) GetDeploy(ctx context.Context, id uint64) (string, error) {
return obj.simpleDeploy.GetDeploy(ctx, id)
}
// GetMaxDeployID returns the maximum deploy id.
func (obj *World) GetMaxDeployID(ctx context.Context) (uint64, error) {
return obj.simpleDeploy.GetMaxDeployID(ctx)
}
// AddDeploy adds a new deploy.
func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string, data *string) error {
return obj.simpleDeploy.AddDeploy(ctx, id, hash, pHash, data)
} }
// ResWatch returns a channel which spits out events on possible exported // ResWatch returns a channel which spits out events on possible exported

View File

@@ -52,6 +52,7 @@ import (
"github.com/purpleidea/mgmt/engine/local" "github.com/purpleidea/mgmt/engine/local"
engineUtil "github.com/purpleidea/mgmt/engine/util" engineUtil "github.com/purpleidea/mgmt/engine/util"
"github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd"
etcdClient "github.com/purpleidea/mgmt/etcd/client"
"github.com/purpleidea/mgmt/lang/ast" "github.com/purpleidea/mgmt/lang/ast"
"github.com/purpleidea/mgmt/lang/funcs/dage" "github.com/purpleidea/mgmt/lang/funcs/dage"
"github.com/purpleidea/mgmt/lang/funcs/vars" "github.com/purpleidea/mgmt/lang/funcs/vars"
@@ -889,9 +890,10 @@ func TestAstFunc2(t *testing.T) {
}).Init() }).Init()
// implementation of the World API (alternatives can be substituted in) // implementation of the World API (alternatives can be substituted in)
world := &etcd.World{ var world engine.World
world = &etcd.World{
//Hostname: hostname, //Hostname: hostname,
//Client: etcdClient, Client: etcdClient.NewClientFromClient(nil), // stub
//MetadataPrefix: /fs, // MetadataPrefix //MetadataPrefix: /fs, // MetadataPrefix
//StoragePrefix: "/storage", // StoragePrefix //StoragePrefix: "/storage", // StoragePrefix
// TODO: is this correct? (seems to work for testing) // TODO: is this correct? (seems to work for testing)
@@ -901,6 +903,16 @@ func TestAstFunc2(t *testing.T) {
logf("world: etcd: "+format, v...) logf("world: etcd: "+format, v...)
}, },
} }
if err := world.Init(); err != nil {
t.Errorf("world Init failed: %+v", err)
return
}
defer func() {
err := errwrap.Wrapf(world.Close(), "world Close failed")
if err != nil {
t.Errorf("close error: %+v", err)
}
}()
variables := map[string]interfaces.Expr{ variables := map[string]interfaces.Expr{
"purpleidea": &ast.ExprStr{V: "hello world!"}, // james says hi "purpleidea": &ast.ExprStr{V: "hello world!"}, // james says hi
@@ -1769,9 +1781,10 @@ func TestAstFunc3(t *testing.T) {
}).Init() }).Init()
// implementation of the World API (alternatives can be substituted in) // implementation of the World API (alternatives can be substituted in)
world := &etcd.World{ var world engine.World
world = &etcd.World{
//Hostname: hostname, //Hostname: hostname,
//Client: etcdClient, Client: etcdClient.NewClientFromClient(nil), // stub
//MetadataPrefix: /fs, // MetadataPrefix //MetadataPrefix: /fs, // MetadataPrefix
//StoragePrefix: "/storage", // StoragePrefix //StoragePrefix: "/storage", // StoragePrefix
// TODO: is this correct? (seems to work for testing) // TODO: is this correct? (seems to work for testing)
@@ -1781,6 +1794,16 @@ func TestAstFunc3(t *testing.T) {
logf("world: etcd: "+format, v...) logf("world: etcd: "+format, v...)
}, },
} }
if err := world.Init(); err != nil {
t.Errorf("world Init failed: %+v", err)
return
}
defer func() {
err := errwrap.Wrapf(world.Close(), "world Close failed")
if err != nil {
t.Errorf("close error: %+v", err)
}
}()
variables := map[string]interfaces.Expr{ variables := map[string]interfaces.Expr{
"purpleidea": &ast.ExprStr{V: "hello world!"}, // james says hi "purpleidea": &ast.ExprStr{V: "hello world!"}, // james says hi

View File

@@ -51,7 +51,6 @@ import (
"github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd"
"github.com/purpleidea/mgmt/etcd/chooser" "github.com/purpleidea/mgmt/etcd/chooser"
etcdClient "github.com/purpleidea/mgmt/etcd/client" etcdClient "github.com/purpleidea/mgmt/etcd/client"
"github.com/purpleidea/mgmt/etcd/deployer"
etcdInterfaces "github.com/purpleidea/mgmt/etcd/interfaces" etcdInterfaces "github.com/purpleidea/mgmt/etcd/interfaces"
"github.com/purpleidea/mgmt/gapi" "github.com/purpleidea/mgmt/gapi"
"github.com/purpleidea/mgmt/gapi/empty" "github.com/purpleidea/mgmt/gapi/empty"
@@ -595,24 +594,6 @@ func (obj *Main) Run() error {
client = c client = c
} }
simpleDeploy := &deployer.SimpleDeploy{
Client: client,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
obj.Logf("deploy: "+format, v...)
},
}
if err := simpleDeploy.Init(); err != nil {
return errwrap.Wrapf(err, "deploy Init failed")
}
defer func() {
err := errwrap.Wrapf(simpleDeploy.Close(), "deploy Close failed")
if err != nil {
// TODO: cause the final exit code to be non-zero
Logf("cleanup error: %+v", err)
}
}()
// implementation of the Local API (we only expect just this single one) // implementation of the Local API (we only expect just this single one)
localAPI := (&local.API{ localAPI := (&local.API{
Prefix: fmt.Sprintf("%s/", path.Join(prefix, "local")), Prefix: fmt.Sprintf("%s/", path.Join(prefix, "local")),
@@ -628,7 +609,8 @@ func (obj *Main) Run() error {
// XXX: The "implementation of the World API" should have more than just // XXX: The "implementation of the World API" should have more than just
// etcd in it, so this could live elsewhere package wise and just have // etcd in it, so this could live elsewhere package wise and just have
// an etcd component from the etcd package added in. // an etcd component from the etcd package added in.
world := &etcd.World{ var world engine.World
world = &etcd.World{
Hostname: hostname, Hostname: hostname,
Client: client, Client: client,
MetadataPrefix: MetadataPrefix, MetadataPrefix: MetadataPrefix,
@@ -645,6 +627,16 @@ func (obj *Main) Run() error {
return gapiInfoResult.URI return gapiInfoResult.URI
}, },
} }
if err := world.Init(); err != nil {
return errwrap.Wrapf(err, "world Init failed")
}
defer func() {
err := errwrap.Wrapf(world.Close(), "world Close failed")
if err != nil {
// TODO: cause the final exit code to be non-zero?
Logf("close error: %+v", err)
}
}()
obj.ge = &graph.Engine{ obj.ge = &graph.Engine{
Program: obj.Program, Program: obj.Program,
@@ -1046,7 +1038,7 @@ func (obj *Main) Run() error {
// get max id (from all the previous deploys) // get max id (from all the previous deploys)
// this is what the existing cluster is already running // this is what the existing cluster is already running
// TODO: add a timeout to context? // TODO: add a timeout to context?
max, err := simpleDeploy.GetMaxDeployID(exitCtx) max, err := world.GetMaxDeployID(exitCtx)
if err != nil { if err != nil {
close(deployChan) // because we won't close it downstream... close(deployChan) // because we won't close it downstream...
return errwrap.Wrapf(err, "error getting max deploy id") return errwrap.Wrapf(err, "error getting max deploy id")
@@ -1085,7 +1077,7 @@ func (obj *Main) Run() error {
// now we can wait for future deploys, but if we already had an // now we can wait for future deploys, but if we already had an
// initial deploy from run, don't switch to this unless it's new // initial deploy from run, don't switch to this unless it's new
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
watchChan, err := simpleDeploy.WatchDeploy(ctx) watchChan, err := world.WatchDeploy(ctx)
if err != nil { if err != nil {
cancel() cancel()
Logf("error starting deploy: %+v", err) Logf("error starting deploy: %+v", err)
@@ -1140,7 +1132,7 @@ func (obj *Main) Run() error {
// return // exit via channel close instead // return // exit via channel close instead
} }
latest, err := simpleDeploy.GetMaxDeployID(ctx) // or zero latest, err := world.GetMaxDeployID(ctx) // or zero
if err != nil { if err != nil {
Logf("error getting max deploy id: %+v", err) Logf("error getting max deploy id: %+v", err)
continue continue
@@ -1167,7 +1159,7 @@ func (obj *Main) Run() error {
// 0 passes through an empty deploy without an error... // 0 passes through an empty deploy without an error...
// (unless there is some sort of etcd error that occurs) // (unless there is some sort of etcd error that occurs)
str, err := simpleDeploy.GetDeploy(ctx, latest) str, err := world.GetDeploy(ctx, latest)
if err != nil { if err != nil {
Logf("deploy: error getting deploy: %+v", err) Logf("deploy: error getting deploy: %+v", err)
continue continue