etcd: util: Move etcd utils into separate package
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/etcd/interfaces"
|
"github.com/purpleidea/mgmt/etcd/interfaces"
|
||||||
|
etcdUtil "github.com/purpleidea/mgmt/etcd/util"
|
||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
"github.com/purpleidea/mgmt/util/errwrap"
|
"github.com/purpleidea/mgmt/util/errwrap"
|
||||||
|
|
||||||
@@ -84,7 +85,7 @@ func (obj *EmbdEtcd) endpointApply(data *interfaces.WatcherData) error {
|
|||||||
// is the endpoint list different?
|
// is the endpoint list different?
|
||||||
// TODO: do we want to use the skipEndpointApply here too?
|
// TODO: do we want to use the skipEndpointApply here too?
|
||||||
skipEndpointApply := obj.NoServer && len(endpoints) == 0 && len(obj.endpoints) > 0
|
skipEndpointApply := obj.NoServer && len(endpoints) == 0 && len(obj.endpoints) > 0
|
||||||
if err := cmpURLsMap(obj.endpoints, endpoints); err != nil && !skipEndpointApply {
|
if err := etcdUtil.CmpURLsMap(obj.endpoints, endpoints); err != nil && !skipEndpointApply {
|
||||||
obj.endpoints = endpoints // set
|
obj.endpoints = endpoints // set
|
||||||
// can happen if a server drops out for example
|
// can happen if a server drops out for example
|
||||||
obj.Logf("endpoint list changed to: %+v", endpoints)
|
obj.Logf("endpoint list changed to: %+v", endpoints)
|
||||||
@@ -153,7 +154,7 @@ func (obj *EmbdEtcd) nominateCb(ctx context.Context) error {
|
|||||||
var sendError = false
|
var sendError = false
|
||||||
var serverErr error
|
var serverErr error
|
||||||
obj.Logf("waiting for server...")
|
obj.Logf("waiting for server...")
|
||||||
nominated, err := copyURLsMap(obj.nominated)
|
nominated, err := etcdUtil.CopyURLsMap(obj.nominated)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -321,11 +322,11 @@ func (obj *EmbdEtcd) volunteerCb(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: do we really need to check these errors?
|
// TODO: do we really need to check these errors?
|
||||||
m, err := copyURLsMap(obj.membermap) // list of members...
|
m, err := etcdUtil.CopyURLsMap(obj.membermap) // list of members...
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
v, err := copyURLsMap(obj.volunteers)
|
v, err := etcdUtil.CopyURLsMap(obj.volunteers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
15
etcd/etcd.go
15
etcd/etcd.go
@@ -119,6 +119,7 @@ import (
|
|||||||
"github.com/purpleidea/mgmt/etcd/chooser"
|
"github.com/purpleidea/mgmt/etcd/chooser"
|
||||||
"github.com/purpleidea/mgmt/etcd/client"
|
"github.com/purpleidea/mgmt/etcd/client"
|
||||||
"github.com/purpleidea/mgmt/etcd/interfaces"
|
"github.com/purpleidea/mgmt/etcd/interfaces"
|
||||||
|
etcdUtil "github.com/purpleidea/mgmt/etcd/util"
|
||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
"github.com/purpleidea/mgmt/util/errwrap"
|
"github.com/purpleidea/mgmt/util/errwrap"
|
||||||
|
|
||||||
@@ -405,7 +406,7 @@ func (obj *EmbdEtcd) Validate() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := copyURLs(obj.Seeds); err != nil { // this will validate
|
if _, err := etcdUtil.CopyURLs(obj.Seeds); err != nil { // this will validate
|
||||||
return errwrap.Wrapf(err, "the Seeds are not valid")
|
return errwrap.Wrapf(err, "the Seeds are not valid")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -475,7 +476,7 @@ func (obj *EmbdEtcd) Init() error {
|
|||||||
|
|
||||||
// TODO: if we don't have any localhost URLs, should we warn so
|
// TODO: if we don't have any localhost URLs, should we warn so
|
||||||
// that our local client can be able to connect more easily?
|
// that our local client can be able to connect more easily?
|
||||||
if len(localhostURLs(obj.ClientURLs)) == 0 {
|
if len(etcdUtil.LocalhostURLs(obj.ClientURLs)) == 0 {
|
||||||
u, err := url.Parse(DefaultClientURL)
|
u, err := url.Parse(DefaultClientURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -592,9 +593,9 @@ func (obj *EmbdEtcd) Close() error {
|
|||||||
func (obj *EmbdEtcd) curls() (etcdtypes.URLs, error) {
|
func (obj *EmbdEtcd) curls() (etcdtypes.URLs, error) {
|
||||||
// TODO: do we need the copy?
|
// TODO: do we need the copy?
|
||||||
if len(obj.AClientURLs) > 0 {
|
if len(obj.AClientURLs) > 0 {
|
||||||
return copyURLs(obj.AClientURLs)
|
return etcdUtil.CopyURLs(obj.AClientURLs)
|
||||||
}
|
}
|
||||||
return copyURLs(obj.ClientURLs)
|
return etcdUtil.CopyURLs(obj.ClientURLs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// surls returns the server (peer) urls that we should use everywhere except for
|
// surls returns the server (peer) urls that we should use everywhere except for
|
||||||
@@ -602,9 +603,9 @@ func (obj *EmbdEtcd) curls() (etcdtypes.URLs, error) {
|
|||||||
func (obj *EmbdEtcd) surls() (etcdtypes.URLs, error) {
|
func (obj *EmbdEtcd) surls() (etcdtypes.URLs, error) {
|
||||||
// TODO: do we need the copy?
|
// TODO: do we need the copy?
|
||||||
if len(obj.AServerURLs) > 0 {
|
if len(obj.AServerURLs) > 0 {
|
||||||
return copyURLs(obj.AServerURLs)
|
return etcdUtil.CopyURLs(obj.AServerURLs)
|
||||||
}
|
}
|
||||||
return copyURLs(obj.ServerURLs)
|
return etcdUtil.CopyURLs(obj.ServerURLs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// err is an error helper that sends to the errChan.
|
// err is an error helper that sends to the errChan.
|
||||||
@@ -1372,7 +1373,7 @@ func (obj *EmbdEtcd) Exited() <-chan struct{} { return obj.exitsSignal }
|
|||||||
// config returns the config struct to be used during the etcd client connect.
|
// config returns the config struct to be used during the etcd client connect.
|
||||||
func (obj *EmbdEtcd) config() etcd.Config {
|
func (obj *EmbdEtcd) config() etcd.Config {
|
||||||
// FIXME: filter out any urls which wouldn't resolve ?
|
// FIXME: filter out any urls which wouldn't resolve ?
|
||||||
endpoints := fromURLsMapToStringList(obj.endpoints) // flatten map
|
endpoints := etcdUtil.FromURLsMapToStringList(obj.endpoints) // flatten map
|
||||||
// We don't need to do any sort of priority sort here, since for initial
|
// We don't need to do any sort of priority sort here, since for initial
|
||||||
// connect we'd be the only one, so it doesn't matter, and subsequent
|
// connect we'd be the only one, so it doesn't matter, and subsequent
|
||||||
// changes are made with SetEndpoints, not here, so we never need to
|
// changes are made with SetEndpoints, not here, so we never need to
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/purpleidea/mgmt/etcd/interfaces"
|
"github.com/purpleidea/mgmt/etcd/interfaces"
|
||||||
|
etcdUtil "github.com/purpleidea/mgmt/etcd/util"
|
||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
"github.com/purpleidea/mgmt/util/errwrap"
|
"github.com/purpleidea/mgmt/util/errwrap"
|
||||||
|
|
||||||
@@ -41,13 +42,13 @@ func (obj *EmbdEtcd) setEndpoints() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
eps := fromURLsMapToStringList(obj.endpoints) // get flat list
|
eps := etcdUtil.FromURLsMapToStringList(obj.endpoints) // get flat list
|
||||||
sort.Strings(eps) // sort for determinism
|
sort.Strings(eps) // sort for determinism
|
||||||
|
|
||||||
curls, _ := obj.curls() // ignore error, was already validated
|
curls, _ := obj.curls() // ignore error, was already validated
|
||||||
|
|
||||||
// prio sort so we connect locally first
|
// prio sort so we connect locally first
|
||||||
urls := fromURLsToStringList(curls)
|
urls := etcdUtil.FromURLsToStringList(curls)
|
||||||
headFn := func(x string) bool {
|
headFn := func(x string) bool {
|
||||||
return !util.StrInList(x, urls)
|
return !util.StrInList(x, urls)
|
||||||
}
|
}
|
||||||
@@ -113,7 +114,7 @@ func applyDeltaEvents(data *interfaces.WatcherData, urlsMap etcdtypes.URLsMap) (
|
|||||||
if err := data.Err; err != nil {
|
if err := data.Err; err != nil {
|
||||||
return nil, errwrap.Wrapf(err, "data contains an error")
|
return nil, errwrap.Wrapf(err, "data contains an error")
|
||||||
}
|
}
|
||||||
out, err := copyURLsMap(urlsMap)
|
out, err := etcdUtil.CopyURLsMap(urlsMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
etcdUtil "github.com/purpleidea/mgmt/etcd/util"
|
||||||
"github.com/purpleidea/mgmt/util/errwrap"
|
"github.com/purpleidea/mgmt/util/errwrap"
|
||||||
|
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
@@ -216,14 +217,14 @@ func (obj *EmbdEtcd) isLeader(ctx context.Context) (bool, error) {
|
|||||||
var ep, backup *url.URL
|
var ep, backup *url.URL
|
||||||
if len(obj.ClientURLs) > 0 {
|
if len(obj.ClientURLs) > 0 {
|
||||||
// heuristic, but probably correct
|
// heuristic, but probably correct
|
||||||
addresses := localhostURLs(obj.ClientURLs)
|
addresses := etcdUtil.LocalhostURLs(obj.ClientURLs)
|
||||||
if len(addresses) > 0 {
|
if len(addresses) > 0 {
|
||||||
ep = &addresses[0] // arbitrarily pick the first one
|
ep = &addresses[0] // arbitrarily pick the first one
|
||||||
}
|
}
|
||||||
backup = &obj.ClientURLs[0] // backup
|
backup = &obj.ClientURLs[0] // backup
|
||||||
}
|
}
|
||||||
if ep == nil && len(obj.AClientURLs) > 0 {
|
if ep == nil && len(obj.AClientURLs) > 0 {
|
||||||
addresses := localhostURLs(obj.AClientURLs)
|
addresses := etcdUtil.LocalhostURLs(obj.AClientURLs)
|
||||||
if len(addresses) > 0 {
|
if len(addresses) > 0 {
|
||||||
ep = &addresses[0]
|
ep = &addresses[0]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
etcdUtil "github.com/purpleidea/mgmt/etcd/util"
|
||||||
"github.com/purpleidea/mgmt/util"
|
"github.com/purpleidea/mgmt/util"
|
||||||
"github.com/purpleidea/mgmt/util/errwrap"
|
"github.com/purpleidea/mgmt/util/errwrap"
|
||||||
|
|
||||||
@@ -116,7 +117,7 @@ func (obj *EmbdEtcd) runServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) (
|
|||||||
if len(obj.ServerURLs) > 0 {
|
if len(obj.ServerURLs) > 0 {
|
||||||
peerURLs = obj.ServerURLs
|
peerURLs = obj.ServerURLs
|
||||||
}
|
}
|
||||||
initialPeerURLsMap, err := copyURLsMap(peerURLsMap)
|
initialPeerURLsMap, err := etcdUtil.CopyURLsMap(peerURLsMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errwrap.Wrapf(err, "error copying URLsMap")
|
return errwrap.Wrapf(err, "error copying URLsMap")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,9 +15,7 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package etcd
|
package util
|
||||||
|
|
||||||
// TODO: move to sub-package if this expands in utility or is used elsewhere...
|
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -38,9 +36,9 @@ func copyURL(u *url.URL) (*url.URL, error) {
|
|||||||
return url.Parse(u.String()) // copy it
|
return url.Parse(u.String()) // copy it
|
||||||
}
|
}
|
||||||
|
|
||||||
// copyURLs copies a URLs.
|
// CopyURLs copies a URLs.
|
||||||
// TODO: submit this upstream to etcd ?
|
// TODO: submit this upstream to etcd ?
|
||||||
func copyURLs(urls etcdtypes.URLs) (etcdtypes.URLs, error) {
|
func CopyURLs(urls etcdtypes.URLs) (etcdtypes.URLs, error) {
|
||||||
out := []url.URL{}
|
out := []url.URL{}
|
||||||
for _, x := range urls {
|
for _, x := range urls {
|
||||||
u, err := copyURL(&x)
|
u, err := copyURL(&x)
|
||||||
@@ -52,12 +50,12 @@ func copyURLs(urls etcdtypes.URLs) (etcdtypes.URLs, error) {
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// copyURLsMap copies a URLsMap.
|
// CopyURLsMap copies a URLsMap.
|
||||||
// TODO: submit this upstream to etcd ?
|
// TODO: submit this upstream to etcd ?
|
||||||
func copyURLsMap(urlsMap etcdtypes.URLsMap) (etcdtypes.URLsMap, error) {
|
func CopyURLsMap(urlsMap etcdtypes.URLsMap) (etcdtypes.URLsMap, error) {
|
||||||
out := make(etcdtypes.URLsMap)
|
out := make(etcdtypes.URLsMap)
|
||||||
for k, v := range urlsMap {
|
for k, v := range urlsMap {
|
||||||
urls, err := copyURLs(v)
|
urls, err := CopyURLs(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -84,8 +82,8 @@ func cmpURLs(u1, u2 etcdtypes.URLs) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// cmpURLsMap compares two URLsMap's, and returns nil if they are the same.
|
// CmpURLsMap compares two URLsMap's, and returns nil if they are the same.
|
||||||
func cmpURLsMap(m1, m2 etcdtypes.URLsMap) error {
|
func CmpURLsMap(m1, m2 etcdtypes.URLsMap) error {
|
||||||
if (m1 == nil) != (m2 == nil) { // xor
|
if (m1 == nil) != (m2 == nil) { // xor
|
||||||
return fmt.Errorf("maps differ")
|
return fmt.Errorf("maps differ")
|
||||||
}
|
}
|
||||||
@@ -112,7 +110,9 @@ func newURLsMap() etcdtypes.URLsMap {
|
|||||||
return make(etcdtypes.URLsMap)
|
return make(etcdtypes.URLsMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
func fromURLsToStringList(urls etcdtypes.URLs) []string {
|
// FromURLsToStringList turns a list of etcd URLs into a list of strings using
|
||||||
|
// the full URL scheme.
|
||||||
|
func FromURLsToStringList(urls etcdtypes.URLs) []string {
|
||||||
result := []string{}
|
result := []string{}
|
||||||
for _, u := range urls { // flatten map
|
for _, u := range urls { // flatten map
|
||||||
result = append(result, u.String()) // use full url including scheme
|
result = append(result, u.String()) // use full url including scheme
|
||||||
@@ -120,9 +120,9 @@ func fromURLsToStringList(urls etcdtypes.URLs) []string {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// fromURLsMapToStringList flattens a map of URLs into a single string list.
|
// FromURLsMapToStringList flattens a map of URLs into a single string list.
|
||||||
// Remember to sort the result if you want it to be deterministic!
|
// Remember to sort the result if you want it to be deterministic!
|
||||||
func fromURLsMapToStringList(m etcdtypes.URLsMap) []string {
|
func FromURLsMapToStringList(m etcdtypes.URLsMap) []string {
|
||||||
result := []string{}
|
result := []string{}
|
||||||
for _, x := range m { // flatten map
|
for _, x := range m { // flatten map
|
||||||
for _, u := range x {
|
for _, u := range x {
|
||||||
@@ -134,14 +134,14 @@ func fromURLsMapToStringList(m etcdtypes.URLsMap) []string {
|
|||||||
|
|
||||||
// validateURLsMap checks if each embedded URL is parseable correctly.
|
// validateURLsMap checks if each embedded URL is parseable correctly.
|
||||||
//func validateURLsMap(urlsMap etcdtypes.URLsMap) error {
|
//func validateURLsMap(urlsMap etcdtypes.URLsMap) error {
|
||||||
// _, err := copyURLsMap(urlsMap) // would fail if anything didn't parse
|
// _, err := CopyURLsMap(urlsMap) // would fail if anything didn't parse
|
||||||
// return err
|
// return err
|
||||||
//}
|
//}
|
||||||
|
|
||||||
// localhostURLs returns the most localhost like URLs for direct connection.
|
// LocalhostURLs returns the most localhost like URLs for direct connection.
|
||||||
// This gets clients to talk to the local servers first before looking remotely.
|
// This gets clients to talk to the local servers first before looking remotely.
|
||||||
// TODO: improve this algorithm as it's currently a bad heuristic
|
// TODO: improve this algorithm as it's currently a bad heuristic
|
||||||
func localhostURLs(urls etcdtypes.URLs) etcdtypes.URLs {
|
func LocalhostURLs(urls etcdtypes.URLs) etcdtypes.URLs {
|
||||||
out := etcdtypes.URLs{}
|
out := etcdtypes.URLs{}
|
||||||
for _, u := range urls {
|
for _, u := range urls {
|
||||||
// "localhost" or anything in 127.0.0.0/8 is valid!
|
// "localhost" or anything in 127.0.0.0/8 is valid!
|
||||||
@@ -17,7 +17,7 @@
|
|||||||
|
|
||||||
//go:build !root
|
//go:build !root
|
||||||
|
|
||||||
package etcd
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/url"
|
"net/url"
|
||||||
@@ -94,7 +94,7 @@ func TestCopyURLs0(t *testing.T) {
|
|||||||
urls1 = append(urls1, *u)
|
urls1 = append(urls1, *u)
|
||||||
}
|
}
|
||||||
|
|
||||||
urls2, err := copyURLs(urls1)
|
urls2, err := CopyURLs(urls1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("urls did not copy: %+v", err)
|
t.Errorf("urls did not copy: %+v", err)
|
||||||
continue
|
continue
|
||||||
@@ -176,13 +176,13 @@ func TestCopyURLsMap0(t *testing.T) {
|
|||||||
urlsMap1[key] = urls
|
urlsMap1[key] = urls
|
||||||
}
|
}
|
||||||
|
|
||||||
urlsMap2, err := copyURLsMap(urlsMap1)
|
urlsMap2, err := CopyURLsMap(urlsMap1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("urlsMap did not copy: %+v", err)
|
t.Errorf("urlsMap did not copy: %+v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cmpURLsMap(urlsMap1, urlsMap2); err != nil {
|
if err := CmpURLsMap(urlsMap1, urlsMap2); err != nil {
|
||||||
t.Errorf("urlsMap did not cmp, err: %+v", err)
|
t.Errorf("urlsMap did not cmp, err: %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user