From 738485a655bec698f59182f14cbaacb400633bc6 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Thu, 12 Apr 2018 03:27:05 -0400 Subject: [PATCH] etcd: Add world API changes for watching member status This adds some useful functionality so that anyone with access to the world API, can learn information about the changing etcd cluster it's using. --- engine/world.go | 4 ++ etcd/client/simple.go | 78 ++++++++++++++++++++++++++++++++++++++ etcd/interfaces/client.go | 3 ++ etcd/interfaces/const.go | 30 +++++++++++++++ etcd/interfaces/structs.go | 57 ++++++++++++++++++++++++++++ etcd/util/util.go | 15 ++++++++ etcd/world.go | 5 +++ 7 files changed, 192 insertions(+) create mode 100644 etcd/interfaces/const.go create mode 100644 etcd/interfaces/structs.go diff --git a/engine/world.go b/engine/world.go index ad3e3e11..47f862df 100644 --- a/engine/world.go +++ b/engine/world.go @@ -20,6 +20,7 @@ package engine import ( "context" + "github.com/purpleidea/mgmt/etcd/interfaces" "github.com/purpleidea/mgmt/etcd/scheduler" ) @@ -51,4 +52,7 @@ type World interface { // TODO: is there a better name for this interface? Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error) Fs(uri string) (Fs, error) + + // WatchMembers returns a channel of changing members in the cluster. + WatchMembers(context.Context) (<-chan *interfaces.MembersResult, error) } diff --git a/etcd/client/simple.go b/etcd/client/simple.go index 9569e190..5fc51e2e 100644 --- a/etcd/client/simple.go +++ b/etcd/client/simple.go @@ -24,6 +24,7 @@ import ( "time" "github.com/purpleidea/mgmt/etcd/interfaces" + etcdUtil "github.com/purpleidea/mgmt/etcd/util" "github.com/purpleidea/mgmt/util/errwrap" etcd "go.etcd.io/etcd/client/v3" @@ -480,3 +481,80 @@ func (obj *Simple) ComplexWatcher(ctx context.Context, path string, opts ...etcd Events: eventsChan, }, err } + +// WatchMembers returns a changing list of cluster membership. This is about the +// server peers, not the connected clients. +func (obj *Simple) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) { + if obj.client == nil { // etcd + return nil, fmt.Errorf("client is nil") // extra safety! + } + + ch := make(chan *interfaces.MembersResult) + obj.wg.Add(1) // hook in to global wait group + go func() { + defer obj.wg.Done() + defer close(ch) + for { + + members := []*interfaces.Member{} + result := &interfaces.MembersResult{} + resp, err := obj.client.MemberList(ctx) + if err != nil { + result.Err = err + goto Send + } + if resp == nil { + result.Err = fmt.Errorf("empty response") + goto Send + } + + for _, m := range resp.Members { + if m == nil { // skip nil members + continue + } + // member: https://godocs.io/github.com/coreos/etcd/etcdserver/etcdserverpb#Member + + purls, err := etcdUtil.FromStringListToURLs(m.PeerURLs) + if err != nil { + result.Err = errwrap.Wrapf(err, "invalid member peer URLs") + goto Send + } + curls, err := etcdUtil.FromStringListToURLs(m.ClientURLs) + if err != nil { + result.Err = errwrap.Wrapf(err, "invalid member client URLs") + goto Send + } + + member := &interfaces.Member{ + ID: m.ID, + Name: m.Name, + //IsLeader: m.IsLeader, // XXX: add when new version of etcd supports this + PeerURLs: purls, + ClientURLs: curls, + } + members = append(members, member) + } + result.Members = members + + Send: + select { + case ch <- result: // send data + case <-ctx.Done(): + return + } + if result.Err != nil { + return + } + + // XXX: poll https://github.com/etcd-io/etcd/issues/5277 + select { + case <-time.After(interfaces.MemberChangePollingInterval): // sleep before retry + // pass + case <-ctx.Done(): + return + } + } + }() + + return ch, nil +} diff --git a/etcd/interfaces/client.go b/etcd/interfaces/client.go index b2087554..2996e5b6 100644 --- a/etcd/interfaces/client.go +++ b/etcd/interfaces/client.go @@ -60,4 +60,7 @@ type Client interface { Txn(ctx context.Context, ifCmps []etcd.Cmp, thenOps, elseOps []etcd.Op) (*etcd.TxnResponse, error) Watcher(ctx context.Context, path string, opts ...etcd.OpOption) (chan error, error) ComplexWatcher(ctx context.Context, path string, opts ...etcd.OpOption) (*WatcherInfo, error) + + // WatchMembers returns a channel of changing members in the cluster. + WatchMembers(context.Context) (<-chan *MembersResult, error) } diff --git a/etcd/interfaces/const.go b/etcd/interfaces/const.go new file mode 100644 index 00000000..396c162b --- /dev/null +++ b/etcd/interfaces/const.go @@ -0,0 +1,30 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package interfaces + +import ( + "time" +) + +const ( + // MemberChangePollingInterval is the polling interval to use when + // watching for member changes for client operations. This is basically + // the same idea in the core package named MemberChangeInterval, but + // held separately for now. + MemberChangePollingInterval = 1000 * time.Millisecond +) diff --git a/etcd/interfaces/structs.go b/etcd/interfaces/structs.go new file mode 100644 index 00000000..e668b51d --- /dev/null +++ b/etcd/interfaces/structs.go @@ -0,0 +1,57 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package interfaces + +import ( + etcdtypes "go.etcd.io/etcd/client/pkg/v3/types" +) + +// MembersResult returns the expect result (including possibly an error) from a +// WatchMembers operation. +type MembersResult struct { + + // Members is the list of members found in this result. + Members []*Member + + // Err represents an error. If this is not nil, don't touch the other + // data in this struct. + Err error +} + +// Member is our internal copy of the etcd member struct as found here: +// https://godocs.io/github.com/coreos/etcd/etcdserver/etcdserverpb#Member but +// which uses native types where possible. +type Member struct { + // ID is unique in the cluster for each member. + ID uint64 + + // Name for the member which if not not started will be an empty string. + Name string + + // IsLeader tells which member is leading the cluster. Expect this to + // change as time goes on. + // XXX: add when new version of etcd supports this + //IsLeader bool + + // PeerURLs is the list of addresses peers servers can connect to. + PeerURLs etcdtypes.URLs + + // ClientURLs is the list of addresses that clients can connect to. If + // the member is not started, then this will be a zero length list. + ClientURLs etcdtypes.URLs +} diff --git a/etcd/util/util.go b/etcd/util/util.go index 75a10553..158e3b54 100644 --- a/etcd/util/util.go +++ b/etcd/util/util.go @@ -82,6 +82,21 @@ func cmpURLs(u1, u2 etcdtypes.URLs) error { return nil } +// FromStringListToURLs takes a list of string urls and converts them into the +// native type. +func FromStringListToURLs(surls []string) (etcdtypes.URLs, error) { + result := []url.URL{} + for _, s := range surls { // flatten map + u, err := url.Parse(s) + if err != nil { + return nil, err + } + result = append(result, *u) + } + + return result, nil +} + // CmpURLsMap compares two URLsMap's, and returns nil if they are the same. func CmpURLsMap(m1, m2 etcdtypes.URLsMap) error { if (m1 == nil) != (m2 == nil) { // xor diff --git a/etcd/world.go b/etcd/world.go index 6c844b81..17746159 100644 --- a/etcd/world.go +++ b/etcd/world.go @@ -211,3 +211,8 @@ func (obj *World) Fs(uri string) (engine.Fs, error) { } return etcdFs, nil } + +// WatchMembers returns a channel of changing members in the cluster. +func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) { + return obj.Client.WatchMembers(ctx) +}