Files
mgmt/etcd/world.go
2024-03-05 01:05:50 -05:00

239 lines
8.9 KiB
Go

// Mgmt
// Copyright (C) 2013-2024+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.
package etcd
import (
"context"
"fmt"
"net/url"
"strings"
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/etcd/chooser"
"github.com/purpleidea/mgmt/etcd/client"
"github.com/purpleidea/mgmt/etcd/client/resources"
"github.com/purpleidea/mgmt/etcd/client/str"
"github.com/purpleidea/mgmt/etcd/client/strmap"
etcdfs "github.com/purpleidea/mgmt/etcd/fs"
"github.com/purpleidea/mgmt/etcd/interfaces"
"github.com/purpleidea/mgmt/etcd/scheduler"
"github.com/purpleidea/mgmt/lang/embedded"
"github.com/purpleidea/mgmt/util"
)
// World is an etcd backed implementation of the World interface.
type World struct {
Hostname string // uuid for the consumer of these
Client interfaces.Client
MetadataPrefix string // expected metadata prefix
StoragePrefix string // storage prefix for etcdfs storage
StandaloneFs engine.Fs // store an fs here for local usage
Debug bool
Logf func(format string, v ...interface{})
}
// ResWatch returns a channel which spits out events on possible exported
// resource changes.
func (obj *World) ResWatch(ctx context.Context) (chan error, error) {
return resources.WatchResources(ctx, obj.Client)
}
// ResExport exports a list of resources under our hostname namespace.
// Subsequent calls replace the previously set collection atomically.
func (obj *World) ResExport(ctx context.Context, resourceList []engine.Res) error {
return resources.SetResources(ctx, obj.Client, obj.Hostname, resourceList)
}
// ResCollect gets the collection of exported resources which match the filter.
// It does this atomically so that a call always returns a complete collection.
func (obj *World) ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]engine.Res, error) {
// XXX: should we be restricted to retrieving resources that were
// exported with a tag that allows or restricts our hostname? We could
// enforce that here if the underlying API supported it... Add this?
return resources.GetResources(ctx, obj.Client, hostnameFilter, kindFilter)
}
// IdealClusterSizeWatch returns a stream of errors anytime the cluster-wide
// dynamic cluster size setpoint changes.
func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error) {
c := client.NewClientFromSimple(obj.Client, ChooserPath)
if err := c.Init(); err != nil {
return nil, err
}
wg := util.WgFromCtx(ctx)
if wg != nil {
wg.Add(1)
}
go func() {
if wg != nil {
defer wg.Done()
}
// This must get closed *after* because it will not finish until
// the Watcher returns, because it contains a wg.Wait() in it...
defer c.Close() // ignore error
select {
case <-ctx.Done():
}
}()
return c.Watcher(ctx, chooser.IdealDynamicSizePath)
}
// IdealClusterSizeGet gets the cluster-wide dynamic cluster size setpoint.
func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error) {
c := client.NewClientFromSimple(obj.Client, ChooserPath)
if err := c.Init(); err != nil {
return 0, err
}
defer c.Close() // ignore error
return chooser.DynamicSizeGet(ctx, c) // use client with added namespace
}
// IdealClusterSizeSet sets the cluster-wide dynamic cluster size setpoint.
func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, error) {
c := client.NewClientFromSimple(obj.Client, ChooserPath)
if err := c.Init(); err != nil {
return false, err
}
defer c.Close() // ignore error
return chooser.DynamicSizeSet(ctx, c, size)
}
// StrWatch returns a channel which spits out events on possible string changes.
func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error) {
return str.WatchStr(ctx, obj.Client, namespace)
}
// StrIsNotExist returns whether the error from StrGet is a key missing error.
func (obj *World) StrIsNotExist(err error) bool {
return err == interfaces.ErrNotExist
}
// StrGet returns the value for the the given namespace.
func (obj *World) StrGet(ctx context.Context, namespace string) (string, error) {
return str.GetStr(ctx, obj.Client, namespace)
}
// StrSet sets the namespace value to a particular string.
// XXX: This can overwrite another hosts value that was set with StrMapSet. Add
// possible cryptographic signing or special namespacing to prevent such things.
func (obj *World) StrSet(ctx context.Context, namespace, value string) error {
return str.SetStr(ctx, obj.Client, namespace, &value)
}
// StrDel deletes the value in a particular namespace.
func (obj *World) StrDel(ctx context.Context, namespace string) error {
return str.SetStr(ctx, obj.Client, namespace, nil)
}
// StrMapWatch returns a channel which spits out events on possible string
// changes.
func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error) {
return strmap.WatchStrMap(ctx, obj.Client, namespace)
}
// StrMapGet returns a map of hostnames to values in the given namespace.
func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error) {
return strmap.GetStrMap(ctx, obj.Client, []string{}, namespace)
}
// StrMapSet sets the namespace value to a particular string under the identity
// of its own hostname.
func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error {
return strmap.SetStrMap(ctx, obj.Client, obj.Hostname, namespace, &value)
}
// StrMapDel deletes the value in a particular namespace.
func (obj *World) StrMapDel(ctx context.Context, namespace string) error {
return strmap.SetStrMap(ctx, obj.Client, obj.Hostname, namespace, nil)
}
// Scheduler returns a scheduling result of hosts in a particular namespace.
// XXX: Add a context.Context here
func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error) {
modifiedOpts := []scheduler.Option{}
for _, o := range opts {
modifiedOpts = append(modifiedOpts, o) // copy in
}
modifiedOpts = append(modifiedOpts, scheduler.Debug(obj.Debug))
modifiedOpts = append(modifiedOpts, scheduler.Logf(obj.Logf))
path := fmt.Sprintf(schedulerPathFmt, namespace)
return scheduler.Schedule(obj.Client.GetClient(), path, obj.Hostname, modifiedOpts...)
}
// Fs returns a distributed file system from a unique URI. For single host
// execution that doesn't span more than a single host, this file system might
// actually be a local or memory backed file system, so actually only
// distributed within the boredom that is a single host cluster.
func (obj *World) Fs(uri string) (engine.Fs, error) {
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
// we're in standalone mode
if u.Scheme == "memmapfs" && u.Path == "/" {
return obj.StandaloneFs, nil
}
if u.Scheme == embedded.Scheme {
path := strings.TrimPrefix(u.Path, "/") // expect a leading slash
return embedded.Lookup(path) // does not expect a leading slash
}
if u.Scheme != etcdfs.Scheme {
return nil, fmt.Errorf("unknown scheme: `%s`", u.Scheme)
}
if u.Path == "" {
return nil, fmt.Errorf("empty path: %s", u.Path)
}
if !strings.HasPrefix(u.Path, obj.MetadataPrefix) {
return nil, fmt.Errorf("wrong path prefix: %s", u.Path)
}
etcdFs := &etcdfs.Fs{
Client: obj.Client, // TODO: do we need to add a namespace?
Metadata: u.Path,
DataPrefix: obj.StoragePrefix,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
obj.Logf("fs: "+format, v...)
},
}
return etcdFs, nil
}
// WatchMembers returns a channel of changing members in the cluster.
func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) {
return obj.Client.WatchMembers(ctx)
}