etcd: Add world API changes for watching member status
This adds some useful functionality so that anyone with access to the world API, can learn information about the changing etcd cluster it's using.
This commit is contained in:
@@ -20,6 +20,7 @@ package engine
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/purpleidea/mgmt/etcd/interfaces"
|
||||
"github.com/purpleidea/mgmt/etcd/scheduler"
|
||||
)
|
||||
|
||||
@@ -51,4 +52,7 @@ type World interface { // TODO: is there a better name for this interface?
|
||||
Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error)
|
||||
|
||||
Fs(uri string) (Fs, error)
|
||||
|
||||
// WatchMembers returns a channel of changing members in the cluster.
|
||||
WatchMembers(context.Context) (<-chan *interfaces.MembersResult, error)
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/purpleidea/mgmt/etcd/interfaces"
|
||||
etcdUtil "github.com/purpleidea/mgmt/etcd/util"
|
||||
"github.com/purpleidea/mgmt/util/errwrap"
|
||||
|
||||
etcd "go.etcd.io/etcd/client/v3"
|
||||
@@ -480,3 +481,80 @@ func (obj *Simple) ComplexWatcher(ctx context.Context, path string, opts ...etcd
|
||||
Events: eventsChan,
|
||||
}, err
|
||||
}
|
||||
|
||||
// WatchMembers returns a changing list of cluster membership. This is about the
|
||||
// server peers, not the connected clients.
|
||||
func (obj *Simple) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) {
|
||||
if obj.client == nil { // etcd
|
||||
return nil, fmt.Errorf("client is nil") // extra safety!
|
||||
}
|
||||
|
||||
ch := make(chan *interfaces.MembersResult)
|
||||
obj.wg.Add(1) // hook in to global wait group
|
||||
go func() {
|
||||
defer obj.wg.Done()
|
||||
defer close(ch)
|
||||
for {
|
||||
|
||||
members := []*interfaces.Member{}
|
||||
result := &interfaces.MembersResult{}
|
||||
resp, err := obj.client.MemberList(ctx)
|
||||
if err != nil {
|
||||
result.Err = err
|
||||
goto Send
|
||||
}
|
||||
if resp == nil {
|
||||
result.Err = fmt.Errorf("empty response")
|
||||
goto Send
|
||||
}
|
||||
|
||||
for _, m := range resp.Members {
|
||||
if m == nil { // skip nil members
|
||||
continue
|
||||
}
|
||||
// member: https://godocs.io/github.com/coreos/etcd/etcdserver/etcdserverpb#Member
|
||||
|
||||
purls, err := etcdUtil.FromStringListToURLs(m.PeerURLs)
|
||||
if err != nil {
|
||||
result.Err = errwrap.Wrapf(err, "invalid member peer URLs")
|
||||
goto Send
|
||||
}
|
||||
curls, err := etcdUtil.FromStringListToURLs(m.ClientURLs)
|
||||
if err != nil {
|
||||
result.Err = errwrap.Wrapf(err, "invalid member client URLs")
|
||||
goto Send
|
||||
}
|
||||
|
||||
member := &interfaces.Member{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
//IsLeader: m.IsLeader, // XXX: add when new version of etcd supports this
|
||||
PeerURLs: purls,
|
||||
ClientURLs: curls,
|
||||
}
|
||||
members = append(members, member)
|
||||
}
|
||||
result.Members = members
|
||||
|
||||
Send:
|
||||
select {
|
||||
case ch <- result: // send data
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
if result.Err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// XXX: poll https://github.com/etcd-io/etcd/issues/5277
|
||||
select {
|
||||
case <-time.After(interfaces.MemberChangePollingInterval): // sleep before retry
|
||||
// pass
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
@@ -60,4 +60,7 @@ type Client interface {
|
||||
Txn(ctx context.Context, ifCmps []etcd.Cmp, thenOps, elseOps []etcd.Op) (*etcd.TxnResponse, error)
|
||||
Watcher(ctx context.Context, path string, opts ...etcd.OpOption) (chan error, error)
|
||||
ComplexWatcher(ctx context.Context, path string, opts ...etcd.OpOption) (*WatcherInfo, error)
|
||||
|
||||
// WatchMembers returns a channel of changing members in the cluster.
|
||||
WatchMembers(context.Context) (<-chan *MembersResult, error)
|
||||
}
|
||||
|
||||
30
etcd/interfaces/const.go
Normal file
30
etcd/interfaces/const.go
Normal file
@@ -0,0 +1,30 @@
|
||||
// Mgmt
|
||||
// Copyright (C) 2013-2023+ James Shubin and the project contributors
|
||||
// Written by James Shubin <james@shubin.ca> and the project contributors
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package interfaces
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// MemberChangePollingInterval is the polling interval to use when
|
||||
// watching for member changes for client operations. This is basically
|
||||
// the same idea in the core package named MemberChangeInterval, but
|
||||
// held separately for now.
|
||||
MemberChangePollingInterval = 1000 * time.Millisecond
|
||||
)
|
||||
57
etcd/interfaces/structs.go
Normal file
57
etcd/interfaces/structs.go
Normal file
@@ -0,0 +1,57 @@
|
||||
// Mgmt
|
||||
// Copyright (C) 2013-2023+ James Shubin and the project contributors
|
||||
// Written by James Shubin <james@shubin.ca> and the project contributors
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package interfaces
|
||||
|
||||
import (
|
||||
etcdtypes "go.etcd.io/etcd/client/pkg/v3/types"
|
||||
)
|
||||
|
||||
// MembersResult returns the expect result (including possibly an error) from a
|
||||
// WatchMembers operation.
|
||||
type MembersResult struct {
|
||||
|
||||
// Members is the list of members found in this result.
|
||||
Members []*Member
|
||||
|
||||
// Err represents an error. If this is not nil, don't touch the other
|
||||
// data in this struct.
|
||||
Err error
|
||||
}
|
||||
|
||||
// Member is our internal copy of the etcd member struct as found here:
|
||||
// https://godocs.io/github.com/coreos/etcd/etcdserver/etcdserverpb#Member but
|
||||
// which uses native types where possible.
|
||||
type Member struct {
|
||||
// ID is unique in the cluster for each member.
|
||||
ID uint64
|
||||
|
||||
// Name for the member which if not not started will be an empty string.
|
||||
Name string
|
||||
|
||||
// IsLeader tells which member is leading the cluster. Expect this to
|
||||
// change as time goes on.
|
||||
// XXX: add when new version of etcd supports this
|
||||
//IsLeader bool
|
||||
|
||||
// PeerURLs is the list of addresses peers servers can connect to.
|
||||
PeerURLs etcdtypes.URLs
|
||||
|
||||
// ClientURLs is the list of addresses that clients can connect to. If
|
||||
// the member is not started, then this will be a zero length list.
|
||||
ClientURLs etcdtypes.URLs
|
||||
}
|
||||
@@ -82,6 +82,21 @@ func cmpURLs(u1, u2 etcdtypes.URLs) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// FromStringListToURLs takes a list of string urls and converts them into the
|
||||
// native type.
|
||||
func FromStringListToURLs(surls []string) (etcdtypes.URLs, error) {
|
||||
result := []url.URL{}
|
||||
for _, s := range surls { // flatten map
|
||||
u, err := url.Parse(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, *u)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// CmpURLsMap compares two URLsMap's, and returns nil if they are the same.
|
||||
func CmpURLsMap(m1, m2 etcdtypes.URLsMap) error {
|
||||
if (m1 == nil) != (m2 == nil) { // xor
|
||||
|
||||
@@ -211,3 +211,8 @@ func (obj *World) Fs(uri string) (engine.Fs, error) {
|
||||
}
|
||||
return etcdFs, nil
|
||||
}
|
||||
|
||||
// WatchMembers returns a channel of changing members in the cluster.
|
||||
func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error) {
|
||||
return obj.Client.WatchMembers(ctx)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user