// Mgmt
// Copyright (C) James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.

package etcd
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"strings"

	"github.com/purpleidea/mgmt/engine"
	"github.com/purpleidea/mgmt/etcd/chooser"
	"github.com/purpleidea/mgmt/etcd/client"
	"github.com/purpleidea/mgmt/etcd/client/resources"
	"github.com/purpleidea/mgmt/etcd/client/str"
	"github.com/purpleidea/mgmt/etcd/client/strmap"
	"github.com/purpleidea/mgmt/etcd/deployer"
	etcdfs "github.com/purpleidea/mgmt/etcd/fs"
	"github.com/purpleidea/mgmt/etcd/interfaces"
	"github.com/purpleidea/mgmt/etcd/scheduler"
	"github.com/purpleidea/mgmt/lang/embedded"
	"github.com/purpleidea/mgmt/util"
	"github.com/purpleidea/mgmt/util/errwrap"
)
|
|
|
|
// World is an etcd backed implementation of the World interface.
|
|
type World struct {
|
|
// NOTE: Update the etcd/ssh/ World struct if this one changes.
|
|
|
|
// Client is the etcd client to use. This should not be specified, one
|
|
// will be created automatically. This exists for legacy reasons and for
|
|
// the SSH etcd world implementation. Maybe it can be removed in the
|
|
// future.
|
|
Client interfaces.Client
|
|
|
|
// Seeds are the list of etcd endpoints to connect to.
|
|
Seeds []string
|
|
|
|
// NS is the etcd namespace to use.
|
|
NS string
|
|
|
|
MetadataPrefix string // expected metadata prefix
|
|
StoragePrefix string // storage prefix for etcdfs storage
|
|
StandaloneFs engine.Fs // store an fs here for local usage
|
|
GetURI func() string
|
|
|
|
init *engine.WorldInit
|
|
client interfaces.Client
|
|
simpleDeploy *deployer.SimpleDeploy
|
|
|
|
cleanups []func() error
|
|
}
|
|
|
|
// Connect runs first.
|
|
func (obj *World) Connect(ctx context.Context, init *engine.WorldInit) error {
|
|
obj.init = init
|
|
|
|
obj.client = obj.Client // legacy default
|
|
if obj.Client == nil {
|
|
c := client.NewClientFromSeedsNamespace(
|
|
obj.Seeds, // endpoints
|
|
obj.NS,
|
|
)
|
|
if err := c.Init(); err != nil {
|
|
return errwrap.Wrapf(err, "client Init failed")
|
|
}
|
|
obj.cleanups = append(obj.cleanups, func() error {
|
|
e := c.Close()
|
|
if obj.init.Debug && e != nil {
|
|
obj.init.Logf("etcd client close error: %+v", e)
|
|
}
|
|
return e
|
|
})
|
|
obj.client = c
|
|
}
|
|
|
|
obj.simpleDeploy = &deployer.SimpleDeploy{
|
|
Client: obj.client,
|
|
Debug: obj.init.Debug,
|
|
Logf: func(format string, v ...interface{}) {
|
|
obj.init.Logf("deploy: "+format, v...)
|
|
},
|
|
}
|
|
if err := obj.simpleDeploy.Init(); err != nil {
|
|
return errwrap.Wrapf(err, "deploy Init failed")
|
|
}
|
|
obj.cleanups = append(obj.cleanups, func() error {
|
|
e := obj.simpleDeploy.Close()
|
|
if obj.init.Debug && e != nil {
|
|
obj.init.Logf("deploy close error: %+v", e)
|
|
}
|
|
return e
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// cleanup performs all the "close" actions either at the very end or as we go.
|
|
func (obj *World) cleanup() error {
|
|
var errs error
|
|
for i := len(obj.cleanups) - 1; i >= 0; i-- { // reverse
|
|
f := obj.cleanups[i]
|
|
if err := f(); err != nil {
|
|
errs = errwrap.Append(errs, err)
|
|
}
|
|
}
|
|
obj.cleanups = nil // clean
|
|
return errs
|
|
}
|
|
|
|
// Cleanup runs last.
|
|
func (obj *World) Cleanup() error {
|
|
return obj.cleanup()
|
|
}
|
|
|
|
// WatchDeploy returns a channel which spits out events on new deploy activity.
|
|
func (obj *World) WatchDeploy(ctx context.Context) (chan error, error) {
|
|
return obj.simpleDeploy.WatchDeploy(ctx)
|
|
}
|
|
|
|
// GetDeploys gets all the available deploys.
|
|
func (obj *World) GetDeploys(ctx context.Context) (map[uint64]string, error) {
|
|
return obj.simpleDeploy.GetDeploys(ctx)
|
|
}
|
|
|
|
// GetDeploy returns the deploy with the specified id if it exists.
|
|
func (obj *World) GetDeploy(ctx context.Context, id uint64) (string, error) {
|
|
return obj.simpleDeploy.GetDeploy(ctx, id)
|
|
}
|
|
|
|
// GetMaxDeployID returns the maximum deploy id.
|
|
func (obj *World) GetMaxDeployID(ctx context.Context) (uint64, error) {
|
|
return obj.simpleDeploy.GetMaxDeployID(ctx)
|
|
}
|
|
|
|
// AddDeploy adds a new deploy.
|
|
func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string, data *string) error {
|
|
return obj.simpleDeploy.AddDeploy(ctx, id, hash, pHash, data)
|
|
}
|
|
|
|
// ResWatch returns a channel which spits out events on possible exported
|
|
// resource changes.
|
|
func (obj *World) ResWatch(ctx context.Context, kind string) (chan error, error) {
|
|
return resources.WatchResources(ctx, obj.client, obj.init.Hostname, kind)
|
|
}
|
|
|
|
// ResCollect gets the collection of exported resources which match the filters.
|
|
// It does this atomically so that a call always returns a complete collection.
|
|
func (obj *World) ResCollect(ctx context.Context, filters []*engine.ResFilter) ([]*engine.ResOutput, error) {
|
|
return resources.GetResources(ctx, obj.client, obj.init.Hostname, filters)
|
|
}
|
|
|
|
// ResExport stores a number of resources in the world storage system. The
|
|
// individual records should not be updated if they are identical to what is
|
|
// already present. (This is to prevent unnecessary events.) If this makes no
|
|
// changes, it returns (true, nil). If it makes a change, then it returns
|
|
// (false, nil). On any error we return (false, err). It stores the exports
|
|
// under our hostname namespace. Subsequent calls do NOT replace the previously
|
|
// set collection.
|
|
func (obj *World) ResExport(ctx context.Context, resourceExports []*engine.ResExport) (bool, error) {
|
|
return resources.SetResources(ctx, obj.client, obj.init.Hostname, resourceExports)
|
|
}
|
|
|
|
// ResDelete deletes a number of resources in the world storage system. If this
|
|
// doesn't delete, it returns (true, nil). If it makes a delete, then it returns
|
|
// (false, nil). On any error we return (false, err).
|
|
func (obj *World) ResDelete(ctx context.Context, resourceDeletes []*engine.ResDelete) (bool, error) {
|
|
return resources.DelResources(ctx, obj.client, obj.init.Hostname, resourceDeletes)
|
|
}
|
|
|
|
// IdealClusterSizeWatch returns a stream of errors anytime the cluster-wide
|
|
// dynamic cluster size setpoint changes.
|
|
func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error) {
|
|
c := client.NewClientFromSimple(obj.client, ChooserPath)
|
|
if err := c.Init(); err != nil {
|
|
return nil, err
|
|
}
|
|
wg := util.WgFromCtx(ctx)
|
|
if wg != nil {
|
|
wg.Add(1)
|
|
}
|
|
go func() {
|
|
if wg != nil {
|
|
defer wg.Done()
|
|
}
|
|
// This must get closed *after* because it will not finish until
|
|
// the Watcher returns, because it contains a wg.Wait() in it...
|
|
defer c.Close() // ignore error
|
|
select {
|
|
case <-ctx.Done():
|
|
}
|
|
}()
|
|
return c.Watcher(ctx, chooser.IdealDynamicSizePath)
|
|
}
|
|
|
|
// IdealClusterSizeGet gets the cluster-wide dynamic cluster size setpoint.
|
|
func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error) {
|
|
c := client.NewClientFromSimple(obj.client, ChooserPath)
|
|
if err := c.Init(); err != nil {
|
|
return 0, err
|
|
}
|
|
defer c.Close() // ignore error
|
|
return chooser.DynamicSizeGet(ctx, c) // use client with added namespace
|
|
}
|
|
|
|
// IdealClusterSizeSet sets the cluster-wide dynamic cluster size setpoint.
|
|
func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, error) {
|
|
c := client.NewClientFromSimple(obj.client, ChooserPath)
|
|
if err := c.Init(); err != nil {
|
|
return false, err
|
|
}
|
|
defer c.Close() // ignore error
|
|
return chooser.DynamicSizeSet(ctx, c, size)
|
|
}
|
|
|
|
// StrWatch returns a channel which spits out events on possible string changes.
|
|
func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error) {
|
|
return str.WatchStr(ctx, obj.client, namespace)
|
|
}
|
|
|
|
// StrIsNotExist returns whether the error from StrGet is a key missing error.
|
|
func (obj *World) StrIsNotExist(err error) bool {
|
|
return err == interfaces.ErrNotExist
|
|
}
|
|
|
|
// StrGet returns the value for the the given namespace.
|
|
func (obj *World) StrGet(ctx context.Context, namespace string) (string, error) {
|
|
return str.GetStr(ctx, obj.client, namespace)
|
|
}
|
|
|
|
// StrSet sets the namespace value to a particular string.
|
|
// XXX: This can overwrite another hosts value that was set with StrMapSet. Add
|
|
// possible cryptographic signing or special namespacing to prevent such things.
|
|
func (obj *World) StrSet(ctx context.Context, namespace, value string) error {
|
|
return str.SetStr(ctx, obj.client, namespace, &value)
|
|
}
|
|
|
|
// StrDel deletes the value in a particular namespace.
|
|
func (obj *World) StrDel(ctx context.Context, namespace string) error {
|
|
return str.SetStr(ctx, obj.client, namespace, nil)
|
|
}
|
|
|
|
// StrMapWatch returns a channel which spits out events on possible string
|
|
// changes.
|
|
func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error) {
|
|
return strmap.WatchStrMap(ctx, obj.client, namespace)
|
|
}
|
|
|
|
// StrMapGet returns a map of hostnames to values in the given namespace.
|
|
func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error) {
|
|
return strmap.GetStrMap(ctx, obj.client, []string{}, namespace)
|
|
}
|
|
|
|
// StrMapSet sets the namespace value to a particular string under the identity
|
|
// of its own hostname.
|
|
func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error {
|
|
return strmap.SetStrMap(ctx, obj.client, obj.init.Hostname, namespace, &value)
|
|
}
|
|
|
|
// StrMapDel deletes the value in a particular namespace.
|
|
func (obj *World) StrMapDel(ctx context.Context, namespace string) error {
|
|
return strmap.SetStrMap(ctx, obj.client, obj.init.Hostname, namespace, nil)
|
|
}
|
|
|
|
// Scheduler returns a scheduling result of hosts in a particular namespace.
|
|
// XXX: Add a context.Context here
|
|
func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error) {
|
|
modifiedOpts := []scheduler.Option{}
|
|
for _, o := range opts {
|
|
modifiedOpts = append(modifiedOpts, o) // copy in
|
|
}
|
|
|
|
modifiedOpts = append(modifiedOpts, scheduler.Debug(obj.init.Debug))
|
|
modifiedOpts = append(modifiedOpts, scheduler.Logf(obj.init.Logf))
|
|
|
|
path := fmt.Sprintf(schedulerPathFmt, namespace)
|
|
return scheduler.Schedule(obj.client.GetClient(), path, obj.init.Hostname, modifiedOpts...)
|
|
}
|
|
|
|
// URI returns the current FS URI.
|
|
// TODO: Can we improve this API or deprecate it entirely?
|
|
func (obj *World) URI() string {
|
|
return obj.GetURI()
|
|
}
|
|
|
|
// Fs returns a distributed file system from a unique URI. For single host
|
|
// execution that doesn't span more than a single host, this file system might
|
|
// actually be a local or memory backed file system, so actually only
|
|
// distributed within the boredom that is a single host cluster.
|
|
func (obj *World) Fs(uri string) (engine.Fs, error) {
|
|
u, err := url.Parse(uri)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// we're in standalone mode
|
|
if u.Scheme == "memmapfs" && u.Path == "/" {
|
|
return obj.StandaloneFs, nil
|
|
}
|
|
|
|
if u.Scheme == embedded.Scheme {
|
|
path := strings.TrimPrefix(u.Path, "/") // expect a leading slash
|
|
return embedded.Lookup(path) // does not expect a leading slash
|
|
}
|
|
|
|
if u.Scheme != etcdfs.Scheme {
|
|
return nil, fmt.Errorf("unknown scheme: `%s`", u.Scheme)
|
|
}
|
|
if u.Path == "" {
|
|
return nil, fmt.Errorf("empty path: %s", u.Path)
|
|
}
|
|
if !strings.HasPrefix(u.Path, obj.MetadataPrefix) {
|
|
return nil, fmt.Errorf("wrong path prefix: %s", u.Path)
|
|
}
|
|
|
|
etcdFs := &etcdfs.Fs{
|
|
Client: obj.client, // TODO: do we need to add a namespace?
|
|
Metadata: u.Path,
|
|
DataPrefix: obj.StoragePrefix,
|
|
|
|
Debug: obj.init.Debug,
|
|
Logf: func(format string, v ...interface{}) {
|
|
obj.init.Logf("fs: "+format, v...)
|
|
},
|
|
}
|
|
return etcdFs, nil
|
|
}
|
|
|
|
// WatchMembers returns a channel of changing members in the cluster.
|
|
func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) {
|
|
return obj.client.WatchMembers(ctx)
|
|
}
|