From f269096eb9094a4cc257c4d54fdc54c85f97fd25 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Wed, 19 Mar 2025 06:01:42 -0400 Subject: [PATCH] cli, etcd, lib: Remove the etcd client from main We are slowly getting rid of more cruft and abstracting it nicely. More to go! --- cli/deploy.go | 10 +++-- etcd/world.go | 101 ++++++++++++++++++++++++++++++++++++-------------- lib/main.go | 22 ++--------- 3 files changed, 83 insertions(+), 50 deletions(-) diff --git a/cli/deploy.go b/cli/deploy.go index 3ceaa79f..3e6f0a76 100644 --- a/cli/deploy.go +++ b/cli/deploy.go @@ -199,7 +199,9 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error var world engine.World world = &etcd.World{ // XXX: What should some of these fields be? - Client: etcdClient, + Client: etcdClient, // XXX: remove me when etcdfs below is done + Seeds: obj.Seeds, + NS: lib.NS, //MetadataPrefix: lib.MetadataPrefix, //StoragePrefix: lib.StoragePrefix, //StandaloneFs: ???.DeployFs, // used for static deploys @@ -208,9 +210,9 @@ func (obj *DeployArgs) Run(ctx context.Context, data *cliUtil.Data) (bool, error } if obj.SshUrl != "" { // alternate world implementation over SSH world = &etcdSSH.World{ - URL: obj.SshUrl, - //Client: client, - NS: lib.NS, + URL: obj.SshUrl, + Seeds: obj.Seeds, + NS: lib.NS, //MetadataPrefix: lib.MetadataPrefix, //StoragePrefix: lib.StoragePrefix, //StandaloneFs: ???.DeployFs, // used for static deploys diff --git a/etcd/world.go b/etcd/world.go index aa4e219f..5a580794 100644 --- a/etcd/world.go +++ b/etcd/world.go @@ -53,23 +53,56 @@ import ( // World is an etcd backed implementation of the World interface. type World struct { // NOTE: Update the etcd/ssh/ World struct if this one changes. - // XXX: build your own etcd client... - Client interfaces.Client + + // Client is the etcd client to use. This should not be specified, one + // will be created automatically. This exists for legacy reasons and for + // the SSH etcd world implementation. Maybe it can be removed in the + // future. + Client interfaces.Client + + // Seeds are the list of etcd endpoints to connect to. + Seeds []string + + // NS is the etcd namespace to use. + NS string + MetadataPrefix string // expected metadata prefix StoragePrefix string // storage prefix for etcdfs storage StandaloneFs engine.Fs // store an fs here for local usage GetURI func() string init *engine.WorldInit + client interfaces.Client simpleDeploy *deployer.SimpleDeploy + + cleanups []func() error } // Init runs first. func (obj *World) Init(init *engine.WorldInit) error { obj.init = init + obj.client = obj.Client // legacy default + if obj.Client == nil { + c := client.NewClientFromSeedsNamespace( + obj.Seeds, // endpoints + obj.NS, + ) + if err := c.Init(); err != nil { + return errwrap.Wrapf(err, "client Init failed") + } + obj.cleanups = append(obj.cleanups, func() error { + e := c.Close() + if obj.init.Debug && e != nil { + obj.init.Logf("etcd client close error: %+v", e) + } + return e + }) + obj.client = c + } + obj.simpleDeploy = &deployer.SimpleDeploy{ - Client: obj.Client, + Client: obj.client, Debug: obj.init.Debug, Logf: func(format string, v ...interface{}) { obj.init.Logf("deploy: "+format, v...) @@ -78,19 +111,33 @@ func (obj *World) Init(init *engine.WorldInit) error { if err := obj.simpleDeploy.Init(); err != nil { return errwrap.Wrapf(err, "deploy Init failed") } + obj.cleanups = append(obj.cleanups, func() error { + e := obj.simpleDeploy.Close() + if obj.init.Debug && e != nil { + obj.init.Logf("deploy close error: %+v", e) + } + return e + }) return nil } +// cleanup performs all the "close" actions either at the very end or as we go. +func (obj *World) cleanup() error { + var errs error + for i := len(obj.cleanups) - 1; i >= 0; i-- { // reverse + f := obj.cleanups[i] + if err := f(); err != nil { + errs = errwrap.Append(errs, err) + } + } + obj.cleanups = nil // clean + return errs +} + // Close runs last. func (obj *World) Close() error { - var errs error - if obj.simpleDeploy != nil { - err := obj.simpleDeploy.Close() - err = errwrap.Wrapf(err, "deploy Close failed") - errs = errwrap.Append(errs, err) - } - return errs + return obj.cleanup() } // WatchDeploy returns a channel which spits out events on new deploy activity. @@ -121,13 +168,13 @@ func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string, // ResWatch returns a channel which spits out events on possible exported // resource changes. func (obj *World) ResWatch(ctx context.Context) (chan error, error) { - return resources.WatchResources(ctx, obj.Client) + return resources.WatchResources(ctx, obj.client) } // ResExport exports a list of resources under our hostname namespace. // Subsequent calls replace the previously set collection atomically. func (obj *World) ResExport(ctx context.Context, resourceList []engine.Res) error { - return resources.SetResources(ctx, obj.Client, obj.init.Hostname, resourceList) + return resources.SetResources(ctx, obj.client, obj.init.Hostname, resourceList) } // ResCollect gets the collection of exported resources which match the filter. @@ -136,13 +183,13 @@ func (obj *World) ResCollect(ctx context.Context, hostnameFilter, kindFilter []s // XXX: should we be restricted to retrieving resources that were // exported with a tag that allows or restricts our hostname? We could // enforce that here if the underlying API supported it... Add this? - return resources.GetResources(ctx, obj.Client, hostnameFilter, kindFilter) + return resources.GetResources(ctx, obj.client, hostnameFilter, kindFilter) } // IdealClusterSizeWatch returns a stream of errors anytime the cluster-wide // dynamic cluster size setpoint changes. func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error) { - c := client.NewClientFromSimple(obj.Client, ChooserPath) + c := client.NewClientFromSimple(obj.client, ChooserPath) if err := c.Init(); err != nil { return nil, err } @@ -166,7 +213,7 @@ func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error) // IdealClusterSizeGet gets the cluster-wide dynamic cluster size setpoint. func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error) { - c := client.NewClientFromSimple(obj.Client, ChooserPath) + c := client.NewClientFromSimple(obj.client, ChooserPath) if err := c.Init(); err != nil { return 0, err } @@ -176,7 +223,7 @@ func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error) { // IdealClusterSizeSet sets the cluster-wide dynamic cluster size setpoint. func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, error) { - c := client.NewClientFromSimple(obj.Client, ChooserPath) + c := client.NewClientFromSimple(obj.client, ChooserPath) if err := c.Init(); err != nil { return false, err } @@ -186,7 +233,7 @@ func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, e // StrWatch returns a channel which spits out events on possible string changes. func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error) { - return str.WatchStr(ctx, obj.Client, namespace) + return str.WatchStr(ctx, obj.client, namespace) } // StrIsNotExist returns whether the error from StrGet is a key missing error. @@ -196,41 +243,41 @@ func (obj *World) StrIsNotExist(err error) bool { // StrGet returns the value for the the given namespace. func (obj *World) StrGet(ctx context.Context, namespace string) (string, error) { - return str.GetStr(ctx, obj.Client, namespace) + return str.GetStr(ctx, obj.client, namespace) } // StrSet sets the namespace value to a particular string. // XXX: This can overwrite another hosts value that was set with StrMapSet. Add // possible cryptographic signing or special namespacing to prevent such things. func (obj *World) StrSet(ctx context.Context, namespace, value string) error { - return str.SetStr(ctx, obj.Client, namespace, &value) + return str.SetStr(ctx, obj.client, namespace, &value) } // StrDel deletes the value in a particular namespace. func (obj *World) StrDel(ctx context.Context, namespace string) error { - return str.SetStr(ctx, obj.Client, namespace, nil) + return str.SetStr(ctx, obj.client, namespace, nil) } // StrMapWatch returns a channel which spits out events on possible string // changes. func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error) { - return strmap.WatchStrMap(ctx, obj.Client, namespace) + return strmap.WatchStrMap(ctx, obj.client, namespace) } // StrMapGet returns a map of hostnames to values in the given namespace. func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error) { - return strmap.GetStrMap(ctx, obj.Client, []string{}, namespace) + return strmap.GetStrMap(ctx, obj.client, []string{}, namespace) } // StrMapSet sets the namespace value to a particular string under the identity // of its own hostname. func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error { - return strmap.SetStrMap(ctx, obj.Client, obj.init.Hostname, namespace, &value) + return strmap.SetStrMap(ctx, obj.client, obj.init.Hostname, namespace, &value) } // StrMapDel deletes the value in a particular namespace. func (obj *World) StrMapDel(ctx context.Context, namespace string) error { - return strmap.SetStrMap(ctx, obj.Client, obj.init.Hostname, namespace, nil) + return strmap.SetStrMap(ctx, obj.client, obj.init.Hostname, namespace, nil) } // Scheduler returns a scheduling result of hosts in a particular namespace. @@ -245,7 +292,7 @@ func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*schedu modifiedOpts = append(modifiedOpts, scheduler.Logf(obj.init.Logf)) path := fmt.Sprintf(schedulerPathFmt, namespace) - return scheduler.Schedule(obj.Client.GetClient(), path, obj.init.Hostname, modifiedOpts...) + return scheduler.Schedule(obj.client.GetClient(), path, obj.init.Hostname, modifiedOpts...) } // URI returns the current FS URI. @@ -285,7 +332,7 @@ func (obj *World) Fs(uri string) (engine.Fs, error) { } etcdFs := &etcdfs.Fs{ - Client: obj.Client, // TODO: do we need to add a namespace? + Client: obj.client, // TODO: do we need to add a namespace? Metadata: u.Path, DataPrefix: obj.StoragePrefix, @@ -299,5 +346,5 @@ func (obj *World) Fs(uri string) (engine.Fs, error) { // WatchMembers returns a channel of changing members in the cluster. func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) { - return obj.Client.WatchMembers(ctx) + return obj.client.WatchMembers(ctx) } diff --git a/lib/main.go b/lib/main.go index 1706448a..1e9ae609 100644 --- a/lib/main.go +++ b/lib/main.go @@ -50,7 +50,6 @@ import ( _ "github.com/purpleidea/mgmt/engine/resources" // let register's run "github.com/purpleidea/mgmt/etcd" "github.com/purpleidea/mgmt/etcd/chooser" - etcdClient "github.com/purpleidea/mgmt/etcd/client" etcdInterfaces "github.com/purpleidea/mgmt/etcd/interfaces" etcdSSH "github.com/purpleidea/mgmt/etcd/ssh" "github.com/purpleidea/mgmt/gapi" @@ -586,22 +585,6 @@ func (obj *Main) Run() error { if err != nil { return errwrap.Wrapf(err, "make Client failed") } - } else { - c := etcdClient.NewClientFromSeedsNamespace( - obj.Seeds, // endpoints - NS, - ) - if err := c.Init(); err != nil { - return errwrap.Wrapf(err, "client Init failed") - } - defer func() { - err := errwrap.Wrapf(c.Close(), "client Close failed") - if err != nil { - // TODO: cause the final exit code to be non-zero - Logf("client cleanup error: %+v", err) - } - }() - client = c } // implementation of the Local API (we only expect just this single one) @@ -621,8 +604,9 @@ func (obj *Main) Run() error { // an etcd component from the etcd package added in. var world engine.World world = &etcd.World{ - Client: client, - //NS: NS, + Client: client, // XXX: remove me when embdEtcd is inside world + Seeds: obj.Seeds, + NS: NS, MetadataPrefix: MetadataPrefix, StoragePrefix: StoragePrefix, StandaloneFs: obj.DeployFs, // used for static deploys