cli, etcd, lib: Remove the etcd client from main
We are slowly getting rid of more cruft and abstracting it nicely. More to go!
This commit is contained in:
@@ -199,7 +199,9 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error
|
|||||||
|
|
||||||
var world engine.World
|
var world engine.World
|
||||||
world = &etcd.World{ // XXX: What should some of these fields be?
|
world = &etcd.World{ // XXX: What should some of these fields be?
|
||||||
Client: etcdClient,
|
Client: etcdClient, // XXX: remove me when etcdfs below is done
|
||||||
|
Seeds: obj.Seeds,
|
||||||
|
NS: lib.NS,
|
||||||
//MetadataPrefix: lib.MetadataPrefix,
|
//MetadataPrefix: lib.MetadataPrefix,
|
||||||
//StoragePrefix: lib.StoragePrefix,
|
//StoragePrefix: lib.StoragePrefix,
|
||||||
//StandaloneFs: ???.DeployFs, // used for static deploys
|
//StandaloneFs: ???.DeployFs, // used for static deploys
|
||||||
@@ -209,7 +211,7 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error
|
|||||||
if obj.SshUrl != "" { // alternate world implementation over SSH
|
if obj.SshUrl != "" { // alternate world implementation over SSH
|
||||||
world = &etcdSSH.World{
|
world = &etcdSSH.World{
|
||||||
URL: obj.SshUrl,
|
URL: obj.SshUrl,
|
||||||
//Client: client,
|
Seeds: obj.Seeds,
|
||||||
NS: lib.NS,
|
NS: lib.NS,
|
||||||
//MetadataPrefix: lib.MetadataPrefix,
|
//MetadataPrefix: lib.MetadataPrefix,
|
||||||
//StoragePrefix: lib.StoragePrefix,
|
//StoragePrefix: lib.StoragePrefix,
|
||||||
|
|||||||
@@ -53,23 +53,56 @@ import (
|
|||||||
// World is an etcd backed implementation of the World interface.
|
// World is an etcd backed implementation of the World interface.
|
||||||
type World struct {
|
type World struct {
|
||||||
// NOTE: Update the etcd/ssh/ World struct if this one changes.
|
// NOTE: Update the etcd/ssh/ World struct if this one changes.
|
||||||
// XXX: build your own etcd client...
|
|
||||||
|
// Client is the etcd client to use. This should not be specified, one
|
||||||
|
// will be created automatically. This exists for legacy reasons and for
|
||||||
|
// the SSH etcd world implementation. Maybe it can be removed in the
|
||||||
|
// future.
|
||||||
Client interfaces.Client
|
Client interfaces.Client
|
||||||
|
|
||||||
|
// Seeds are the list of etcd endpoints to connect to.
|
||||||
|
Seeds []string
|
||||||
|
|
||||||
|
// NS is the etcd namespace to use.
|
||||||
|
NS string
|
||||||
|
|
||||||
MetadataPrefix string // expected metadata prefix
|
MetadataPrefix string // expected metadata prefix
|
||||||
StoragePrefix string // storage prefix for etcdfs storage
|
StoragePrefix string // storage prefix for etcdfs storage
|
||||||
StandaloneFs engine.Fs // store an fs here for local usage
|
StandaloneFs engine.Fs // store an fs here for local usage
|
||||||
GetURI func() string
|
GetURI func() string
|
||||||
|
|
||||||
init *engine.WorldInit
|
init *engine.WorldInit
|
||||||
|
client interfaces.Client
|
||||||
simpleDeploy *deployer.SimpleDeploy
|
simpleDeploy *deployer.SimpleDeploy
|
||||||
|
|
||||||
|
cleanups []func() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init runs first.
|
// Init runs first.
|
||||||
func (obj *World) Init(init *engine.WorldInit) error {
|
func (obj *World) Init(init *engine.WorldInit) error {
|
||||||
obj.init = init
|
obj.init = init
|
||||||
|
|
||||||
|
obj.client = obj.Client // legacy default
|
||||||
|
if obj.Client == nil {
|
||||||
|
c := client.NewClientFromSeedsNamespace(
|
||||||
|
obj.Seeds, // endpoints
|
||||||
|
obj.NS,
|
||||||
|
)
|
||||||
|
if err := c.Init(); err != nil {
|
||||||
|
return errwrap.Wrapf(err, "client Init failed")
|
||||||
|
}
|
||||||
|
obj.cleanups = append(obj.cleanups, func() error {
|
||||||
|
e := c.Close()
|
||||||
|
if obj.init.Debug && e != nil {
|
||||||
|
obj.init.Logf("etcd client close error: %+v", e)
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
})
|
||||||
|
obj.client = c
|
||||||
|
}
|
||||||
|
|
||||||
obj.simpleDeploy = &deployer.SimpleDeploy{
|
obj.simpleDeploy = &deployer.SimpleDeploy{
|
||||||
Client: obj.Client,
|
Client: obj.client,
|
||||||
Debug: obj.init.Debug,
|
Debug: obj.init.Debug,
|
||||||
Logf: func(format string, v ...interface{}) {
|
Logf: func(format string, v ...interface{}) {
|
||||||
obj.init.Logf("deploy: "+format, v...)
|
obj.init.Logf("deploy: "+format, v...)
|
||||||
@@ -78,21 +111,35 @@ func (obj *World) Init(init *engine.WorldInit) error {
|
|||||||
if err := obj.simpleDeploy.Init(); err != nil {
|
if err := obj.simpleDeploy.Init(); err != nil {
|
||||||
return errwrap.Wrapf(err, "deploy Init failed")
|
return errwrap.Wrapf(err, "deploy Init failed")
|
||||||
}
|
}
|
||||||
|
obj.cleanups = append(obj.cleanups, func() error {
|
||||||
|
e := obj.simpleDeploy.Close()
|
||||||
|
if obj.init.Debug && e != nil {
|
||||||
|
obj.init.Logf("deploy close error: %+v", e)
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close runs last.
|
// cleanup performs all the "close" actions either at the very end or as we go.
|
||||||
func (obj *World) Close() error {
|
func (obj *World) cleanup() error {
|
||||||
var errs error
|
var errs error
|
||||||
if obj.simpleDeploy != nil {
|
for i := len(obj.cleanups) - 1; i >= 0; i-- { // reverse
|
||||||
err := obj.simpleDeploy.Close()
|
f := obj.cleanups[i]
|
||||||
err = errwrap.Wrapf(err, "deploy Close failed")
|
if err := f(); err != nil {
|
||||||
errs = errwrap.Append(errs, err)
|
errs = errwrap.Append(errs, err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
obj.cleanups = nil // clean
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close runs last.
|
||||||
|
func (obj *World) Close() error {
|
||||||
|
return obj.cleanup()
|
||||||
|
}
|
||||||
|
|
||||||
// WatchDeploy returns a channel which spits out events on new deploy activity.
|
// WatchDeploy returns a channel which spits out events on new deploy activity.
|
||||||
func (obj *World) WatchDeploy(ctx context.Context) (chan error, error) {
|
func (obj *World) WatchDeploy(ctx context.Context) (chan error, error) {
|
||||||
return obj.simpleDeploy.WatchDeploy(ctx)
|
return obj.simpleDeploy.WatchDeploy(ctx)
|
||||||
@@ -121,13 +168,13 @@ func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string,
|
|||||||
// ResWatch returns a channel which spits out events on possible exported
|
// ResWatch returns a channel which spits out events on possible exported
|
||||||
// resource changes.
|
// resource changes.
|
||||||
func (obj *World) ResWatch(ctx context.Context) (chan error, error) {
|
func (obj *World) ResWatch(ctx context.Context) (chan error, error) {
|
||||||
return resources.WatchResources(ctx, obj.Client)
|
return resources.WatchResources(ctx, obj.client)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResExport exports a list of resources under our hostname namespace.
|
// ResExport exports a list of resources under our hostname namespace.
|
||||||
// Subsequent calls replace the previously set collection atomically.
|
// Subsequent calls replace the previously set collection atomically.
|
||||||
func (obj *World) ResExport(ctx context.Context, resourceList []engine.Res) error {
|
func (obj *World) ResExport(ctx context.Context, resourceList []engine.Res) error {
|
||||||
return resources.SetResources(ctx, obj.Client, obj.init.Hostname, resourceList)
|
return resources.SetResources(ctx, obj.client, obj.init.Hostname, resourceList)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResCollect gets the collection of exported resources which match the filter.
|
// ResCollect gets the collection of exported resources which match the filter.
|
||||||
@@ -136,13 +183,13 @@ func (obj *World) ResCollect(ctx context.Context, hostnameFilter, kindFilter []s
|
|||||||
// XXX: should we be restricted to retrieving resources that were
|
// XXX: should we be restricted to retrieving resources that were
|
||||||
// exported with a tag that allows or restricts our hostname? We could
|
// exported with a tag that allows or restricts our hostname? We could
|
||||||
// enforce that here if the underlying API supported it... Add this?
|
// enforce that here if the underlying API supported it... Add this?
|
||||||
return resources.GetResources(ctx, obj.Client, hostnameFilter, kindFilter)
|
return resources.GetResources(ctx, obj.client, hostnameFilter, kindFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IdealClusterSizeWatch returns a stream of errors anytime the cluster-wide
|
// IdealClusterSizeWatch returns a stream of errors anytime the cluster-wide
|
||||||
// dynamic cluster size setpoint changes.
|
// dynamic cluster size setpoint changes.
|
||||||
func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error) {
|
func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error) {
|
||||||
c := client.NewClientFromSimple(obj.Client, ChooserPath)
|
c := client.NewClientFromSimple(obj.client, ChooserPath)
|
||||||
if err := c.Init(); err != nil {
|
if err := c.Init(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -166,7 +213,7 @@ func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error)
|
|||||||
|
|
||||||
// IdealClusterSizeGet gets the cluster-wide dynamic cluster size setpoint.
|
// IdealClusterSizeGet gets the cluster-wide dynamic cluster size setpoint.
|
||||||
func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error) {
|
func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error) {
|
||||||
c := client.NewClientFromSimple(obj.Client, ChooserPath)
|
c := client.NewClientFromSimple(obj.client, ChooserPath)
|
||||||
if err := c.Init(); err != nil {
|
if err := c.Init(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@@ -176,7 +223,7 @@ func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error) {
|
|||||||
|
|
||||||
// IdealClusterSizeSet sets the cluster-wide dynamic cluster size setpoint.
|
// IdealClusterSizeSet sets the cluster-wide dynamic cluster size setpoint.
|
||||||
func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, error) {
|
func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, error) {
|
||||||
c := client.NewClientFromSimple(obj.Client, ChooserPath)
|
c := client.NewClientFromSimple(obj.client, ChooserPath)
|
||||||
if err := c.Init(); err != nil {
|
if err := c.Init(); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@@ -186,7 +233,7 @@ func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, e
|
|||||||
|
|
||||||
// StrWatch returns a channel which spits out events on possible string changes.
|
// StrWatch returns a channel which spits out events on possible string changes.
|
||||||
func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error) {
|
func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error) {
|
||||||
return str.WatchStr(ctx, obj.Client, namespace)
|
return str.WatchStr(ctx, obj.client, namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StrIsNotExist returns whether the error from StrGet is a key missing error.
|
// StrIsNotExist returns whether the error from StrGet is a key missing error.
|
||||||
@@ -196,41 +243,41 @@ func (obj *World) StrIsNotExist(err error) bool {
|
|||||||
|
|
||||||
// StrGet returns the value for the the given namespace.
|
// StrGet returns the value for the the given namespace.
|
||||||
func (obj *World) StrGet(ctx context.Context, namespace string) (string, error) {
|
func (obj *World) StrGet(ctx context.Context, namespace string) (string, error) {
|
||||||
return str.GetStr(ctx, obj.Client, namespace)
|
return str.GetStr(ctx, obj.client, namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StrSet sets the namespace value to a particular string.
|
// StrSet sets the namespace value to a particular string.
|
||||||
// XXX: This can overwrite another hosts value that was set with StrMapSet. Add
|
// XXX: This can overwrite another hosts value that was set with StrMapSet. Add
|
||||||
// possible cryptographic signing or special namespacing to prevent such things.
|
// possible cryptographic signing or special namespacing to prevent such things.
|
||||||
func (obj *World) StrSet(ctx context.Context, namespace, value string) error {
|
func (obj *World) StrSet(ctx context.Context, namespace, value string) error {
|
||||||
return str.SetStr(ctx, obj.Client, namespace, &value)
|
return str.SetStr(ctx, obj.client, namespace, &value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StrDel deletes the value in a particular namespace.
|
// StrDel deletes the value in a particular namespace.
|
||||||
func (obj *World) StrDel(ctx context.Context, namespace string) error {
|
func (obj *World) StrDel(ctx context.Context, namespace string) error {
|
||||||
return str.SetStr(ctx, obj.Client, namespace, nil)
|
return str.SetStr(ctx, obj.client, namespace, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StrMapWatch returns a channel which spits out events on possible string
|
// StrMapWatch returns a channel which spits out events on possible string
|
||||||
// changes.
|
// changes.
|
||||||
func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error) {
|
func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error) {
|
||||||
return strmap.WatchStrMap(ctx, obj.Client, namespace)
|
return strmap.WatchStrMap(ctx, obj.client, namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StrMapGet returns a map of hostnames to values in the given namespace.
|
// StrMapGet returns a map of hostnames to values in the given namespace.
|
||||||
func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error) {
|
func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error) {
|
||||||
return strmap.GetStrMap(ctx, obj.Client, []string{}, namespace)
|
return strmap.GetStrMap(ctx, obj.client, []string{}, namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StrMapSet sets the namespace value to a particular string under the identity
|
// StrMapSet sets the namespace value to a particular string under the identity
|
||||||
// of its own hostname.
|
// of its own hostname.
|
||||||
func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error {
|
func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error {
|
||||||
return strmap.SetStrMap(ctx, obj.Client, obj.init.Hostname, namespace, &value)
|
return strmap.SetStrMap(ctx, obj.client, obj.init.Hostname, namespace, &value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StrMapDel deletes the value in a particular namespace.
|
// StrMapDel deletes the value in a particular namespace.
|
||||||
func (obj *World) StrMapDel(ctx context.Context, namespace string) error {
|
func (obj *World) StrMapDel(ctx context.Context, namespace string) error {
|
||||||
return strmap.SetStrMap(ctx, obj.Client, obj.init.Hostname, namespace, nil)
|
return strmap.SetStrMap(ctx, obj.client, obj.init.Hostname, namespace, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scheduler returns a scheduling result of hosts in a particular namespace.
|
// Scheduler returns a scheduling result of hosts in a particular namespace.
|
||||||
@@ -245,7 +292,7 @@ func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*schedu
|
|||||||
modifiedOpts = append(modifiedOpts, scheduler.Logf(obj.init.Logf))
|
modifiedOpts = append(modifiedOpts, scheduler.Logf(obj.init.Logf))
|
||||||
|
|
||||||
path := fmt.Sprintf(schedulerPathFmt, namespace)
|
path := fmt.Sprintf(schedulerPathFmt, namespace)
|
||||||
return scheduler.Schedule(obj.Client.GetClient(), path, obj.init.Hostname, modifiedOpts...)
|
return scheduler.Schedule(obj.client.GetClient(), path, obj.init.Hostname, modifiedOpts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// URI returns the current FS URI.
|
// URI returns the current FS URI.
|
||||||
@@ -285,7 +332,7 @@ func (obj *World) Fs(uri string) (engine.Fs, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
etcdFs := &etcdfs.Fs{
|
etcdFs := &etcdfs.Fs{
|
||||||
Client: obj.Client, // TODO: do we need to add a namespace?
|
Client: obj.client, // TODO: do we need to add a namespace?
|
||||||
Metadata: u.Path,
|
Metadata: u.Path,
|
||||||
DataPrefix: obj.StoragePrefix,
|
DataPrefix: obj.StoragePrefix,
|
||||||
|
|
||||||
@@ -299,5 +346,5 @@ func (obj *World) Fs(uri string) (engine.Fs, error) {
|
|||||||
|
|
||||||
// WatchMembers returns a channel of changing members in the cluster.
|
// WatchMembers returns a channel of changing members in the cluster.
|
||||||
func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) {
|
func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) {
|
||||||
return obj.Client.WatchMembers(ctx)
|
return obj.client.WatchMembers(ctx)
|
||||||
}
|
}
|
||||||
|
|||||||
22
lib/main.go
22
lib/main.go
@@ -50,7 +50,6 @@ import (
|
|||||||
_ "github.com/purpleidea/mgmt/engine/resources" // let register's run
|
_ "github.com/purpleidea/mgmt/engine/resources" // let register's run
|
||||||
"github.com/purpleidea/mgmt/etcd"
|
"github.com/purpleidea/mgmt/etcd"
|
||||||
"github.com/purpleidea/mgmt/etcd/chooser"
|
"github.com/purpleidea/mgmt/etcd/chooser"
|
||||||
etcdClient "github.com/purpleidea/mgmt/etcd/client"
|
|
||||||
etcdInterfaces "github.com/purpleidea/mgmt/etcd/interfaces"
|
etcdInterfaces "github.com/purpleidea/mgmt/etcd/interfaces"
|
||||||
etcdSSH "github.com/purpleidea/mgmt/etcd/ssh"
|
etcdSSH "github.com/purpleidea/mgmt/etcd/ssh"
|
||||||
"github.com/purpleidea/mgmt/gapi"
|
"github.com/purpleidea/mgmt/gapi"
|
||||||
@@ -586,22 +585,6 @@ func (obj *Main) Run() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errwrap.Wrapf(err, "make Client failed")
|
return errwrap.Wrapf(err, "make Client failed")
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
c := etcdClient.NewClientFromSeedsNamespace(
|
|
||||||
obj.Seeds, // endpoints
|
|
||||||
NS,
|
|
||||||
)
|
|
||||||
if err := c.Init(); err != nil {
|
|
||||||
return errwrap.Wrapf(err, "client Init failed")
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
err := errwrap.Wrapf(c.Close(), "client Close failed")
|
|
||||||
if err != nil {
|
|
||||||
// TODO: cause the final exit code to be non-zero
|
|
||||||
Logf("client cleanup error: %+v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
client = c
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// implementation of the Local API (we only expect just this single one)
|
// implementation of the Local API (we only expect just this single one)
|
||||||
@@ -621,8 +604,9 @@ func (obj *Main) Run() error {
|
|||||||
// an etcd component from the etcd package added in.
|
// an etcd component from the etcd package added in.
|
||||||
var world engine.World
|
var world engine.World
|
||||||
world = &etcd.World{
|
world = &etcd.World{
|
||||||
Client: client,
|
Client: client, // XXX: remove me when embdEtcd is inside world
|
||||||
//NS: NS,
|
Seeds: obj.Seeds,
|
||||||
|
NS: NS,
|
||||||
MetadataPrefix: MetadataPrefix,
|
MetadataPrefix: MetadataPrefix,
|
||||||
StoragePrefix: StoragePrefix,
|
StoragePrefix: StoragePrefix,
|
||||||
StandaloneFs: obj.DeployFs, // used for static deploys
|
StandaloneFs: obj.DeployFs, // used for static deploys
|
||||||
|
|||||||
Reference in New Issue
Block a user