Files
mgmt/etcd/client/simple.go
James Shubin bed7e6be79 etcd: Pass through the namespace
This is a bit tricky, and we should nuke and redo some of this API. The
sneaky bit has to do with whether we've already added the namespace
magic into our etcd client or not.
2025-09-09 00:04:22 -04:00

591 lines
16 KiB
Go

// Mgmt
// Copyright (C) James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.
package client
import (
"context"
"fmt"
"sync"
"time"
"github.com/purpleidea/mgmt/etcd/interfaces"
etcdUtil "github.com/purpleidea/mgmt/etcd/util"
"github.com/purpleidea/mgmt/util/errwrap"
etcd "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/namespace"
)
// method identifies which constructor was used to build a simple client. Init
// and Close switch on this value to decide how to set up and tear down.
type method uint8

const (
	// methodError means the constructor failed. It is also the zero value.
	methodError = method(iota)

	// methodSeeds means the client connects by itself from a list of seeds.
	methodSeeds

	// methodClient means the client was built around an existing etcd
	// client handle.
	methodClient

	// methodNamespace means the client was built from existing KV and
	// Watcher interfaces.
	methodNamespace
)
// NewClientFromSeeds returns a simple client that will connect to the given
// list of seed endpoints when Init is called. The connection it opens is closed
// again by Close.
func NewClientFromSeeds(seeds []string) *Simple {
	obj := &Simple{}
	obj.method = methodSeeds
	obj.wg = &sync.WaitGroup{}
	obj.seeds = seeds
	return obj
}
// NewClientFromSeedsNamespace returns a simple client that will connect to the
// given list of seed endpoints when Init is called. If the namespace is not
// empty, all key access through this client is prefixed with it.
func NewClientFromSeedsNamespace(seeds []string, ns string) *Simple {
	obj := &Simple{}
	obj.method = methodSeeds
	obj.wg = &sync.WaitGroup{}
	obj.seeds = seeds
	obj.namespace = ns
	return obj
}
// NewClientFromClient returns a simple client that wraps an existing etcd
// client handle. The caller remains the owner of that handle: Close on the
// returned struct does not disconnect it.
func NewClientFromClient(client *etcd.Client) *Simple {
	obj := &Simple{}
	obj.method = methodClient
	obj.wg = &sync.WaitGroup{}
	obj.client = client
	return obj
}
// NewClientFromNamespaceStr builds a new simple client by taking an existing
// client and a string namespace. If the namespace is not empty, then the KV and
// Watcher of the client are wrapped so that all key access is prefixed with it.
// If the client is nil, the returned struct carries an error that is reported
// by Init. Warning, this doesn't properly nest the namespaces.
func NewClientFromNamespaceStr(client *etcd.Client, ns string) *Simple {
	if client == nil {
		return &Simple{
			method: methodError,
			wg:     &sync.WaitGroup{}, // so that Close can safely wait on it
			err:    fmt.Errorf("client is nil"),
		}
	}

	kv := client.KV
	w := client.Watcher
	if ns != "" { // only layer if not empty
		kv = namespace.NewKV(client.KV, ns)
		w = namespace.NewWatcher(client.Watcher, ns)
	}

	return &Simple{
		method: methodClient, // similar enough to this one to share it!
		wg:     &sync.WaitGroup{},

		namespace: ns,     // XXX: should we add this here?
		client:    client, // store for GetClient()
		kv:        kv,
		w:         w,
	}
}
// NewClientFromSimple builds a simple client from an existing client interface
// which must be a simple client. This awkward method is required so that
// namespace nesting works properly, because the *etcd.Client doesn't directly
// pass through the namespace. I'd love to nuke this function, but it's good
// enough for now.
//
// The new namespace is layered on top of the KV and Watcher of the parent, so
// the parent must already have them available (eg: it has been through Init).
// If the input is nil, is not a simple client, or is not initialized, then the
// returned struct carries an error that is reported by Init.
func NewClientFromSimple(client interfaces.Client, ns string) *Simple {
	// failed builds a struct which only reports the error. It still gets a
	// wait group so that Close can safely wait on it.
	failed := func(err error) *Simple {
		return &Simple{
			method: methodError,
			wg:     &sync.WaitGroup{},
			err:    err,
		}
	}

	if client == nil {
		return failed(fmt.Errorf("client is nil"))
	}
	simple, ok := client.(*Simple)
	if !ok {
		return failed(fmt.Errorf("client is not simple"))
	}
	if simple == nil { // a typed nil pointer hiding inside the interface
		return failed(fmt.Errorf("client is nil"))
	}
	if simple.kv == nil || simple.w == nil {
		// Wrapping a nil interface in a namespace would only panic
		// later on first use, so catch it here instead.
		return failed(fmt.Errorf("client is not initialized"))
	}

	kv := simple.kv
	w := simple.w
	if ns != "" { // only layer if not empty
		kv = namespace.NewKV(simple.kv, ns)
		w = namespace.NewWatcher(simple.w, ns)
	}

	// NOTE(review): the stored namespace is the one from the parent, it
	// does not include ns. Confirm this is what GetNamespace callers want.
	return &Simple{
		method: methodNamespace,
		wg:     &sync.WaitGroup{},

		namespace: client.GetNamespace(), // XXX: should we add this here?
		client:    client.GetClient(),    // store for GetClient()
		kv:        kv,
		w:         w,
	}
}
// NewClientFromNamespace returns a simple client built from an existing set of
// interface APIs that we might use. The kv and w values are used as is, and the
// namespace string is only stored so that it can be read back later.
func NewClientFromNamespace(client *etcd.Client, kv etcd.KV, w etcd.Watcher, ns string) *Simple {
	obj := &Simple{}
	obj.method = methodNamespace
	obj.wg = &sync.WaitGroup{}
	obj.namespace = ns  // in case someone wants to read what our ns is.
	obj.client = client // store for GetClient()
	obj.kv = kv
	obj.w = w
	return obj
}
// Simple provides a simple etcd client for deploy and status operations. You
// can set Debug and Logf after you've built this with one of the NewClient*
// methods.
type Simple struct {
// Debug turns on the extra log messages that some of the operations emit.
Debug bool
// Logf is the optional logging function. It is only called through the
// logf wrapper, which does nothing when this is nil.
Logf func(format string, v ...interface{})
// method records which constructor built this struct. Init and Close
// switch on it.
method method
// wg tracks the goroutines started by the watcher methods. Close waits
// on it.
wg *sync.WaitGroup
// err is the error we set when using methodError. Init returns it.
err error
// seeds is the list of endpoints to try to connect to.
seeds []string
// namespace is the value returned by GetNamespace. When we connect from
// seeds, a non-empty value is also used to prefix kv and w.
namespace string
// client is the etcd client connection.
client *etcd.Client
// kv and w are the key-value and watcher interfaces that the operations
// use. They are either passed in by a constructor, or taken from client
// in connect or Init.
kv etcd.KV
w etcd.Watcher
}
// logf forwards to the Logf function that the user gave us. If none was set,
// then the message is silently dropped instead of causing a panic.
func (obj *Simple) logf(format string, v ...interface{}) {
	if fn := obj.Logf; fn != nil {
		fn(format, v...)
	}
}
// config builds the etcd client config that connect uses. The endpoints are our
// list of seeds, and dialing gives up after five seconds.
func (obj *Simple) config() etcd.Config {
	return etcd.Config{
		Endpoints: obj.seeds,
		// RetryDialer chooses the next endpoint to use
		// it comes with a default dialer if unspecified
		DialTimeout: 5 * time.Second,
	}
}
// connect opens the etcd client connection from our config, and then stores the
// KV and Watcher handles that the operations use. If a namespace was set, those
// handles are wrapped so that all keys are prefixed with it. When a client is
// already stored, this does nothing.
func (obj *Simple) connect() error {
	if obj.client != nil { // memoize
		return nil
	}

	var err error
	obj.client, err = etcd.New(obj.config()) // connect!
	if err != nil {
		return errwrap.Wrapf(err, "client connect error")
	}

	kv, w := obj.client.KV, obj.client.Watcher
	if ns := obj.namespace; ns != "" { // bonus feature of seeds method
		kv = namespace.NewKV(obj.client.KV, ns)
		w = namespace.NewWatcher(obj.client.Watcher, ns)
	}
	obj.kv, obj.w = kv, w

	return nil
}
// Init starts up the struct. What that involves depends on which constructor
// built it: a failed constructor reports its stored error, a seeds client
// connects, and the other kinds validate the handles they were given. After a
// successful setup, obj.kv and obj.w are available for use.
func (obj *Simple) Init() error {
	switch m := obj.method; m {
	case methodError:
		return obj.err // use the error we set

	case methodSeeds:
		if len(obj.seeds) == 0 {
			return fmt.Errorf("zero seeds")
		}
		return obj.connect()

	case methodClient:
		if obj.client == nil {
			return fmt.Errorf("no client")
		}
		// fall back to the plain client handles if not specified!
		if obj.kv == nil {
			obj.kv = obj.client.KV
		}
		if obj.w == nil {
			obj.w = obj.client.Watcher
		}
		return nil

	case methodNamespace:
		if obj.kv != nil && obj.w != nil {
			return nil
		}
		return fmt.Errorf("empty namespace")

	default:
		return fmt.Errorf("unknown method: %+v", m)
	}
}
// Close cleans up the struct after we're finished. It waits for any goroutines
// that the watcher methods started before it returns. Only a client which
// opened its own connection (the seeds method) disconnects it here. It is safe
// to call this on a struct that failed to build or that never connected, in
// which case an error is returned instead of panicking.
func (obj *Simple) Close() error {
	// The wait group can be missing on a struct that wasn't built by one of
	// the constructors, and waiting on a nil pointer would panic.
	if obj.wg != nil {
		defer obj.wg.Wait()
	}

	switch obj.method {
	case methodError: // for consistency
		return fmt.Errorf("did not Init")

	case methodSeeds:
		if obj.client == nil { // Init was not run, or connect failed
			return fmt.Errorf("did not Init")
		}
		return obj.client.Close()

	case methodClient:
		// we were given a client, so we don't own it or close it
		return nil

	case methodNamespace:
		return nil
	}

	return fmt.Errorf("unknown method: %+v", obj.method)
}
// GetClient returns the etcd client handle that we have stored. This is needed
// for certain upstream APIs that don't support passing in KV and Watcher
// instead.
func (obj *Simple) GetClient() *etcd.Client {
	client := obj.client
	return client
}
// GetNamespace returns the namespace string that we have stored. This is needed
// whenever we use the client manually.
func (obj *Simple) GetNamespace() string {
	ns := obj.namespace
	return ns
}
// Set stores the value at the given key, which is the full key path. The
// response is only logged when Debug is on. If you'd like more information
// about whether a value changed or not, use Txn instead.
func (obj *Simple) Set(ctx context.Context, key, value string, opts ...etcd.OpOption) error {
	putResp, putErr := obj.kv.Put(ctx, key, value, opts...)
	if !obj.Debug {
		return putErr
	}
	obj.logf("set(%s): %v", key, putResp) // bonus
	return putErr
}
// Get runs a get operation on the path, and returns the key-value pairs from
// the response as a map of key to value.
func (obj *Simple) Get(ctx context.Context, path string, opts ...etcd.OpOption) (map[string]string, error) {
	resp, err := obj.kv.Get(ctx, path, opts...)
	switch {
	case err != nil:
		return nil, err
	case resp == nil:
		return nil, fmt.Errorf("empty response")
	}

	// TODO: write a resp.ToMap() function on https://godoc.org/github.com/etcd-io/etcd/api/etcdserverpb#RangeResponse
	out := make(map[string]string, len(resp.Kvs))
	for i := range resp.Kvs {
		pair := resp.Kvs[i]
		out[string(pair.Key)] = string(pair.Value)
	}
	return out, nil
}
// Del runs a delete operation on the path, and returns the number of keys that
// were deleted. On error, the returned count is -1.
func (obj *Simple) Del(ctx context.Context, path string, opts ...etcd.OpOption) (int64, error) {
	resp, err := obj.kv.Delete(ctx, path, opts...)
	if err != nil {
		return -1, err
	}
	if resp == nil { // same guard as in Get, instead of a nil dereference
		return -1, fmt.Errorf("empty response")
	}
	return resp.Deleted, nil
}
// Txn builds a transaction from the given comparisons, then operations and else
// operations, and commits it. The response is only logged when Debug is on.
func (obj *Simple) Txn(ctx context.Context, ifCmps []etcd.Cmp, thenOps, elseOps []etcd.Op) (*etcd.TxnResponse, error) {
	txn := obj.kv.Txn(ctx)
	txn = txn.If(ifCmps...)
	txn = txn.Then(thenOps...)
	txn = txn.Else(elseOps...)

	txnResp, txnErr := txn.Commit()
	if obj.Debug {
		obj.logf("txn: %v", txnResp) // bonus
	}
	return txnResp, txnErr
}
// Watcher is a watcher that returns a chan of errors instead of a chan with all
// sorts of watcher data. This is useful when we only want an event signal, but
// we don't care about the specifics. Every event from the underlying
// ComplexWatcher is forwarded as its error value, which might be nil. The
// returned channel is closed once the underlying events channel has closed.
// After ctx is cancelled, remaining events are discarded rather than forwarded.
func (obj *Simple) Watcher(ctx context.Context, path string, opts ...etcd.OpOption) (chan error, error) {
	cancelCtx, cancel := context.WithCancel(ctx)
	info, err := obj.ComplexWatcher(cancelCtx, path, opts...)
	if err != nil {
		cancel()
		return nil, err
	}

	ch := make(chan error)
	obj.wg.Add(1) // hook in to global wait group
	go func() {
		defer obj.wg.Done()
		defer close(ch)
		defer cancel()

		// drain discards whatever is left, and blocks until the events
		// channel is closed. We use this after cancellation, because
		// looping on the already closed Done channel would spin the CPU
		// until the closure happens.
		drain := func() {
			for range info.Events {
			}
		}

		for {
			var data *interfaces.WatcherData
			var ok bool
			select {
			case data, ok = <-info.Events: // read
				if !ok {
					return
				}
			case <-cancelCtx.Done():
				drain() // wait for the events closure
				return
			}

			select {
			case ch <- data.Err: // send (might be nil!)
			case <-cancelCtx.Done():
				drain() // wait for the events closure
				return
			}
		}
	}()
	return ch, nil
}
// ComplexWatcher is a more capable watcher that also returns data information.
// This starts a watch request. It writes on a channel that you can follow to
// know when an event or an error occurs. It always sends one startup event. It
// will not return until the watch has been started. If it cannot start, then it
// will return an error. Remember to add the WithPrefix() option if you want to
// watch recursively.
// The returned WatcherInfo holds the Cancel function and the Events channel.
// The Events channel is closed when the internal watch goroutine exits. Note
// that a non-nil WatcherInfo is returned even when the error is not nil.
// TODO: do we need to support retry and changed client connections?
// XXX: do we need to track last successful revision and retry from there? If so
// use: lastRev := response.Header.Revision // TODO: +1 ? and: etcd.WithRev(rev)
func (obj *Simple) ComplexWatcher(ctx context.Context, path string, opts ...etcd.OpOption) (*interfaces.WatcherInfo, error) {
if obj.client == nil { // catch bugs, this often means programming error
return nil, fmt.Errorf("client is nil") // extra safety!
}
// cancelCtx is what the etcd watch runs under. Its cancel func is handed
// back to the caller inside of the WatcherInfo.
cancelCtx, cancel := context.WithCancel(ctx)
eventsChan := make(chan *interfaces.WatcherData) // channel of runtime errors
// count is the number of startup notifications we are still waiting for.
// It is only touched from inside the goroutine below.
var count uint8
// wg is a local wait group that holds back our return until the startup
// has either succeeded or failed.
wg := &sync.WaitGroup{}
// TODO: if we can detect the use of WithCreatedNotify, we don't need to
// hard-code it down below... https://github.com/etcd-io/etcd/issues/9689
// XXX: proof of concept patch: https://github.com/etcd-io/etcd/pull/9705
//for _, op := range opts {
// //if op.Cmp(etcd.WithCreatedNotify()) == nil { // would be best
// if etcd.OpOptionCmp(op, etcd.WithCreatedNotify()) == nil {
// count++
// wg.Add(1)
// break
// }
//}
count++
wg.Add(1)
// Our created notify option goes first, the options from the caller are
// added after it.
wOpts := []etcd.OpOption{
etcd.WithCreatedNotify(),
}
wOpts = append(wOpts, opts...)
// err is written by the goroutine before it calls wg.Done(), and it is
// read down below only after wg.Wait() has returned.
var err error
obj.wg.Add(1) // hook in to global wait group
go func() {
defer obj.wg.Done()
defer close(eventsChan)
defer cancel() // it's safe to cancel() more than once!
ch := obj.w.Watch(cancelCtx, path, wOpts...)
for {
var resp etcd.WatchResponse
var ok bool
// created is only true for the message that carries the startup
// notification.
var created bool
select {
case resp, ok = <-ch:
if !ok {
if count > 0 { // closed before startup
// set err in parent scope!
err = fmt.Errorf("watch closed")
count--
wg.Done()
}
return
}
// the watch is now running!
if count > 0 && resp.Created {
created = true
count--
wg.Done()
}
isCanceled := resp.Canceled || resp.Err() == context.Canceled
// TODO: this might not be needed
if resp.Header.Revision == 0 { // by inspection
if obj.Debug {
obj.logf("watch: received empty message") // switched client connection
}
isCanceled = true
}
if isCanceled {
// NOTE(review): if this happens before the created message was
// seen, then the parent is still blocked in wg.Wait() and nobody
// is reading eventsChan yet, so this send looks like it can only
// finish once ctx is done. Please confirm.
data := &interfaces.WatcherData{
Err: context.Canceled,
}
select { // send the error
case eventsChan <- data:
// NOTE(review): this selects on the ctx of the caller, not on
// cancelCtx, so calling the returned Cancel func does not unblock
// this send by itself. Confirm that this is intended.
case <-ctx.Done():
if count > 0 { // XXX: hack
wg.Done()
}
return
}
continue // channel should close shortly
}
}
// TODO: consider processing the response data into a
// more useful form for the callback...
data := &interfaces.WatcherData{
Created: created,
Path: path,
Header: resp.Header,
Events: resp.Events,
Err: resp.Err(),
}
select { // send the event
case eventsChan <- data:
case <-ctx.Done():
if count > 0 { // XXX: hack
wg.Done()
}
return
}
}
}()
wg.Wait() // wait for created event before we return
// The struct is returned along with err, even when err is not nil.
return &interfaces.WatcherInfo{
Cancel: cancel,
Events: eventsChan,
}, err
}
// WatchMembers returns a channel which produces the list of cluster members
// over and over again. This is about the server peers, not the connected
// clients. The list is polled: after each result is sent, we sleep for the
// polling interval before asking again. A result that carries an error is the
// last one sent. The channel is closed when that happens or when ctx is done.
func (obj *Simple) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) {
	if obj.client == nil { // etcd
		return nil, fmt.Errorf("client is nil") // extra safety!
	}

	// poll asks for the member list once, and packs either the members or
	// the error into a result.
	poll := func() *interfaces.MembersResult {
		out := &interfaces.MembersResult{}

		resp, err := obj.client.MemberList(ctx)
		if err != nil {
			out.Err = err
			return out
		}
		if resp == nil {
			out.Err = fmt.Errorf("empty response")
			return out
		}

		list := []*interfaces.Member{}
		for _, m := range resp.Members {
			if m == nil { // skip nil members
				continue
			}

			// member: https://godocs.io/github.com/coreos/etcd/etcdserver/etcdserverpb#Member
			peerURLs, err := etcdUtil.FromStringListToURLs(m.PeerURLs)
			if err != nil {
				out.Err = errwrap.Wrapf(err, "invalid member peer URLs")
				return out
			}
			clientURLs, err := etcdUtil.FromStringListToURLs(m.ClientURLs)
			if err != nil {
				out.Err = errwrap.Wrapf(err, "invalid member client URLs")
				return out
			}

			list = append(list, &interfaces.Member{
				ID:   m.ID,
				Name: m.Name,
				//IsLeader: m.IsLeader, // XXX: add when new version of etcd supports this
				PeerURLs:   peerURLs,
				ClientURLs: clientURLs,
			})
		}
		out.Members = list
		return out
	}

	results := make(chan *interfaces.MembersResult)
	obj.wg.Add(1) // hook in to global wait group
	go func() {
		defer obj.wg.Done()
		defer close(results)

		for {
			res := poll()

			select {
			case results <- res: // send data
			case <-ctx.Done():
				return
			}
			if res.Err != nil { // an error is the last thing we send
				return
			}

			// XXX: poll https://github.com/etcd-io/etcd/issues/5277
			select {
			case <-time.After(interfaces.MemberChangePollingInterval): // sleep before retry
				// pass
			case <-ctx.Done():
				return
			}
		}
	}()
	return results, nil
}