etcd: Small cleanup of the package

Split things into multiple files, and fix up some doc formatting.
This commit is contained in:
James Shubin
2017-06-03 00:34:58 -04:00
parent 9941a97e37
commit d9601471df
4 changed files with 632 additions and 574 deletions

View File

@@ -65,7 +65,6 @@ import (
"github.com/purpleidea/mgmt/converger" "github.com/purpleidea/mgmt/converger"
"github.com/purpleidea/mgmt/event" "github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/resources"
"github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util"
etcd "github.com/coreos/etcd/clientv3" // "clientv3" etcd "github.com/coreos/etcd/clientv3" // "clientv3"
@@ -96,7 +95,7 @@ var (
errApplyDeltaEventsInconsistent = errors.New("inconsistent key in ApplyDeltaEvents") errApplyDeltaEventsInconsistent = errors.New("inconsistent key in ApplyDeltaEvents")
) )
// AW is a struct for the AddWatcher queue // AW is a struct for the AddWatcher queue.
type AW struct { type AW struct {
path string path string
opts []etcd.OpOption opts []etcd.OpOption
@@ -107,8 +106,8 @@ type AW struct {
cancelFunc func() // data cancelFunc func() // data
} }
// RE is a response + error struct since these two values often occur together // RE is a response + error struct since these two values often occur together.
// This is now called an event with the move to the etcd v3 API // This is now called an event with the move to the etcd v3 API.
type RE struct { type RE struct {
response etcd.WatchResponse response etcd.WatchResponse
path string path string
@@ -120,7 +119,7 @@ type RE struct {
retries uint // number of times we've retried on error retries uint // number of times we've retried on error
} }
// KV is a key + value struct to hold the two items together // KV is a key + value struct to hold the two items together.
type KV struct { type KV struct {
key string key string
value string value string
@@ -128,7 +127,7 @@ type KV struct {
resp event.Resp resp event.Resp
} }
// GQ is a struct for the get queue // GQ is a struct for the get queue.
type GQ struct { type GQ struct {
path string path string
skipConv bool skipConv bool
@@ -137,7 +136,7 @@ type GQ struct {
data map[string]string data map[string]string
} }
// DL is a struct for the delete queue // DL is a struct for the delete queue.
type DL struct { type DL struct {
path string path string
opts []etcd.OpOption opts []etcd.OpOption
@@ -145,7 +144,7 @@ type DL struct {
data int64 data int64
} }
// TN is a struct for the txn queue // TN is a struct for the txn queue.
type TN struct { type TN struct {
ifcmps []etcd.Cmp ifcmps []etcd.Cmp
thenops []etcd.Op thenops []etcd.Op
@@ -161,7 +160,7 @@ type Flags struct {
Verbose bool // add extra log message output Verbose bool // add extra log message output
} }
// EmbdEtcd provides the embedded server and client etcd functionality // EmbdEtcd provides the embedded server and client etcd functionality.
type EmbdEtcd struct { // EMBeddeD etcd type EmbdEtcd struct { // EMBeddeD etcd
// etcd client connection related // etcd client connection related
cLock sync.Mutex // client connect lock cLock sync.Mutex // client connect lock
@@ -207,7 +206,7 @@ type EmbdEtcd struct { // EMBeddeD etcd
dataDir string // our data dir, prefix + "etcd" dataDir string // our data dir, prefix + "etcd"
} }
// NewEmbdEtcd creates the top level embedded etcd struct client and server obj // NewEmbdEtcd creates the top level embedded etcd struct client and server obj.
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd { func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd {
endpoints := make(etcdtypes.URLsMap) endpoints := make(etcdtypes.URLsMap)
if hostname == seedSentinel { // safety if hostname == seedSentinel { // safety
@@ -260,7 +259,7 @@ func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs,
return obj return obj
} }
// GetConfig returns the config struct to be used for the etcd client connect // GetConfig returns the config struct to be used for the etcd client connect.
func (obj *EmbdEtcd) GetConfig() etcd.Config { func (obj *EmbdEtcd) GetConfig() etcd.Config {
endpoints := []string{} endpoints := []string{}
// XXX: filter out any urls which wouldn't resolve here ? // XXX: filter out any urls which wouldn't resolve here ?
@@ -342,7 +341,7 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error {
return nil return nil
} }
// Startup is the main entry point to kick off the embedded etcd client & server // Startup is the main entry point to kick off the embedded etcd client & server.
func (obj *EmbdEtcd) Startup() error { func (obj *EmbdEtcd) Startup() error {
bootstrapping := len(obj.endpoints) == 0 // because value changes after start bootstrapping := len(obj.endpoints) == 0 // because value changes after start
@@ -464,7 +463,7 @@ func (obj *EmbdEtcd) Destroy() error {
return nil return nil
} }
// CtxDelayErr requests a retry in Delta duration // CtxDelayErr requests a retry in Delta duration.
type CtxDelayErr struct { type CtxDelayErr struct {
Delta time.Duration Delta time.Duration
Message string Message string
@@ -474,7 +473,7 @@ func (obj *CtxDelayErr) Error() string {
return fmt.Sprintf("CtxDelayErr(%v): %s", obj.Delta, obj.Message) return fmt.Sprintf("CtxDelayErr(%v): %s", obj.Delta, obj.Message)
} }
// CtxRetriesErr lets you retry as long as you have retries available // CtxRetriesErr lets you retry as long as you have retries available.
// TODO: consider combining this with CtxDelayErr // TODO: consider combining this with CtxDelayErr
type CtxRetriesErr struct { type CtxRetriesErr struct {
Retries uint Retries uint
@@ -494,7 +493,7 @@ func (obj *CtxPermanentErr) Error() string {
return fmt.Sprintf("CtxPermanentErr: %s", obj.Message) return fmt.Sprintf("CtxPermanentErr: %s", obj.Message)
} }
// CtxReconnectErr requests a client reconnect to the new endpoint list // CtxReconnectErr requests a client reconnect to the new endpoint list.
type CtxReconnectErr struct { type CtxReconnectErr struct {
Message string Message string
} }
@@ -503,7 +502,7 @@ func (obj *CtxReconnectErr) Error() string {
return fmt.Sprintf("CtxReconnectErr: %s", obj.Message) return fmt.Sprintf("CtxReconnectErr: %s", obj.Message)
} }
// CancelCtx adds a tracked cancel function around an existing context // CancelCtx adds a tracked cancel function around an existing context.
func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func()) { func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func()) {
cancelCtx, cancelFunc := context.WithCancel(ctx) cancelCtx, cancelFunc := context.WithCancel(ctx)
obj.cancelLock.Lock() obj.cancelLock.Lock()
@@ -512,7 +511,7 @@ func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func()) {
return cancelCtx, cancelFunc return cancelCtx, cancelFunc
} }
// TimeoutCtx adds a tracked cancel function with timeout around an existing context // TimeoutCtx adds a tracked cancel function with timeout around an existing context.
func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func()) { func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func()) {
timeoutCtx, cancelFunc := context.WithTimeout(ctx, t) timeoutCtx, cancelFunc := context.WithTimeout(ctx, t)
obj.cancelLock.Lock() obj.cancelLock.Lock()
@@ -699,7 +698,7 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context,
return ctx, obj.ctxErr return ctx, obj.ctxErr
} }
// CbLoop is the loop where callback execution is serialized // CbLoop is the loop where callback execution is serialized.
func (obj *EmbdEtcd) CbLoop() { func (obj *EmbdEtcd) CbLoop() {
cuid := obj.converger.Register() cuid := obj.converger.Register()
cuid.SetName("Etcd: CbLoop") cuid.SetName("Etcd: CbLoop")
@@ -755,7 +754,7 @@ func (obj *EmbdEtcd) CbLoop() {
} }
} }
// Loop is the main loop where everything is serialized // Loop is the main loop where everything is serialized.
func (obj *EmbdEtcd) Loop() { func (obj *EmbdEtcd) Loop() {
cuid := obj.converger.Register() cuid := obj.converger.Register()
cuid.SetName("Etcd: Loop") cuid.SetName("Etcd: Loop")
@@ -933,7 +932,7 @@ func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) {
} }
} }
// Set queues up a set operation to occur using our mainloop // Set queues up a set operation to occur using our mainloop.
func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error { func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error {
resp := event.NewResp() resp := event.NewResp()
obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp} obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp}
@@ -943,7 +942,7 @@ func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error {
return nil return nil
} }
// rawSet actually implements the key set operation // rawSet actually implements the key set operation.
func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error { func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: rawSet()") log.Printf("Trace: Etcd: rawSet()")
@@ -960,7 +959,7 @@ func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
return err return err
} }
// Get performs a get operation and waits for an ACK to continue // Get performs a get operation and waits for an ACK to continue.
func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error) { func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error) {
return obj.ComplexGet(path, false, opts...) return obj.ComplexGet(path, false, opts...)
} }
@@ -1001,7 +1000,7 @@ func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]stri
return return
} }
// Delete performs a delete operation and waits for an ACK to continue // Delete performs a delete operation and waits for an ACK to continue.
func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) { func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) {
resp := event.NewResp() resp := event.NewResp()
dl := &DL{path: path, opts: opts, resp: resp, data: -1} dl := &DL{path: path, opts: opts, resp: resp, data: -1}
@@ -1029,7 +1028,7 @@ func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err er
return return
} }
// Txn performs a transaction and waits for an ACK to continue // Txn performs a transaction and waits for an ACK to continue.
func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error) { func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error) {
resp := event.NewResp() resp := event.NewResp()
tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil} tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil}
@@ -1053,8 +1052,8 @@ func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, err
return response, err return response, err
} }
// AddWatcher queues up an add watcher request and returns a cancel function // AddWatcher queues up an add watcher request and returns a cancel function.
// Remember to add the etcd.WithPrefix() option if you want to watch recursively // Remember to add the etcd.WithPrefix() option if you want to watch recursively.
func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) { func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) {
resp := event.NewResp() resp := event.NewResp()
awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp} awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp}
@@ -1065,7 +1064,7 @@ func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errChe
return awq.cancelFunc, nil return awq.cancelFunc, nil
} }
// rawAddWatcher adds a watcher and returns a cancel function to call to end it // rawAddWatcher adds a watcher and returns a cancel function to call to end it.
func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) { func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) {
cancelCtx, cancelFunc := obj.CancelCtx(ctx) cancelCtx, cancelFunc := obj.CancelCtx(ctx)
go func(ctx context.Context) { go func(ctx context.Context) {
@@ -1142,7 +1141,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
return cancelFunc, nil return cancelFunc, nil
} }
// rawCallback is the companion to AddWatcher which runs the callback processing // rawCallback is the companion to AddWatcher which runs the callback processing.
func rawCallback(ctx context.Context, re *RE) error { func rawCallback(ctx context.Context, re *RE) error {
var err = re.err // the watch event itself might have had an error var err = re.err // the watch event itself might have had an error
if err == nil { if err == nil {
@@ -1161,8 +1160,8 @@ func rawCallback(ctx context.Context, re *RE) error {
return err return err
} }
// volunteerCallback runs to respond to the volunteer list change events // volunteerCallback runs to respond to the volunteer list change events.
// functionally, it controls the adding and removing of members // Functionally, it controls the adding and removing of members.
// FIXME: we might need to respond to member change/disconnect/shutdown events, // FIXME: we might need to respond to member change/disconnect/shutdown events,
// see: https://github.com/coreos/etcd/issues/5277 // see: https://github.com/coreos/etcd/issues/5277
func (obj *EmbdEtcd) volunteerCallback(re *RE) error { func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
@@ -1351,8 +1350,8 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
return nil return nil
} }
// nominateCallback runs to respond to the nomination list change events // nominateCallback runs to respond to the nomination list change events.
// functionally, it controls the starting and stopping of the server process // Functionally, it controls the starting and stopping of the server process.
func (obj *EmbdEtcd) nominateCallback(re *RE) error { func (obj *EmbdEtcd) nominateCallback(re *RE) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: nominateCallback()") log.Printf("Trace: Etcd: nominateCallback()")
@@ -1504,7 +1503,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
return nil return nil
} }
// endpointCallback runs to respond to the endpoint list change events // endpointCallback runs to respond to the endpoint list change events.
func (obj *EmbdEtcd) endpointCallback(re *RE) error { func (obj *EmbdEtcd) endpointCallback(re *RE) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: endpointCallback()") log.Printf("Trace: Etcd: endpointCallback()")
@@ -1570,7 +1569,7 @@ func (obj *EmbdEtcd) endpointCallback(re *RE) error {
return nil return nil
} }
// idealClusterSizeCallback runs to respond to the ideal cluster size changes // idealClusterSizeCallback runs to respond to the ideal cluster size changes.
func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error { func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: idealClusterSizeCallback()") log.Printf("Trace: Etcd: idealClusterSizeCallback()")
@@ -1604,8 +1603,8 @@ func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error {
return nil return nil
} }
// LocalhostClientURLs returns the most localhost like URLs for direct connection // LocalhostClientURLs returns the most localhost like URLs for direct connection.
// this gets clients to talk to the local servers first before searching remotely // This gets clients to talk to the local servers first before searching remotely.
func (obj *EmbdEtcd) LocalhostClientURLs() etcdtypes.URLs { func (obj *EmbdEtcd) LocalhostClientURLs() etcdtypes.URLs {
// look through obj.clientURLs and return the localhost ones // look through obj.clientURLs and return the localhost ones
urls := etcdtypes.URLs{} urls := etcdtypes.URLs{}
@@ -1695,7 +1694,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap)
return nil return nil
} }
// DestroyServer shuts down the embedded etcd server portion // DestroyServer shuts down the embedded etcd server portion.
func (obj *EmbdEtcd) DestroyServer() error { func (obj *EmbdEtcd) DestroyServer() error {
var err error var err error
log.Printf("Etcd: DestroyServer: Destroying...") log.Printf("Etcd: DestroyServer: Destroying...")
@@ -1714,541 +1713,6 @@ func (obj *EmbdEtcd) DestroyServer() error {
return err return err
} }
// TODO: Could all these Etcd*(obj *EmbdEtcd, ...) functions which deal with the
// interface between etcd paths and behaviour be grouped into a single struct ?
// Nominate nominates a particular client to be a server (peer)
func Nominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: Nominate(%v): %v", hostname, urls.String())
defer log.Printf("Trace: Etcd: Nominate(%v): Finished!", hostname)
}
// nominate someone to be a server
nominate := fmt.Sprintf("/%s/nominated/%s", NS, hostname)
ops := []etcd.Op{} // list of ops in this txn
if urls != nil {
ops = append(ops, etcd.OpPut(nominate, urls.String())) // TODO: add a TTL? (etcd.WithLease)
} else { // delete message if set to erase
ops = append(ops, etcd.OpDelete(nominate))
}
if _, err := obj.Txn(nil, ops, nil); err != nil {
return fmt.Errorf("nominate failed") // exit in progress?
}
return nil
}
// Nominated returns a urls map of nominated etcd server volunteers
// NOTE: I know 'nominees' might be more correct, but is less consistent here
func Nominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
path := fmt.Sprintf("/%s/nominated/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix()) // map[string]string, bool
if err != nil {
return nil, fmt.Errorf("nominated isn't available: %v", err)
}
nominated := make(etcdtypes.URLsMap)
for key, val := range keyMap { // loop through directory of nominated
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of nominee
if val == "" { // skip "erased" values
continue
}
urls, err := etcdtypes.NewURLs(strings.Split(val, ","))
if err != nil {
return nil, fmt.Errorf("nominated data format error: %v", err)
}
nominated[name] = urls // add to map
if obj.flags.Debug {
log.Printf("Etcd: Nominated(%v): %v", name, val)
}
}
return nominated, nil
}
// Volunteer offers yourself up to be a server if needed
func Volunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: Volunteer(%v): %v", obj.hostname, urls.String())
defer log.Printf("Trace: Etcd: Volunteer(%v): Finished!", obj.hostname)
}
// volunteer to be a server
volunteer := fmt.Sprintf("/%s/volunteers/%s", NS, obj.hostname)
ops := []etcd.Op{} // list of ops in this txn
if urls != nil {
// XXX: adding a TTL is crucial! (i think)
ops = append(ops, etcd.OpPut(volunteer, urls.String())) // value is usually a peer "serverURL"
} else { // delete message if set to erase
ops = append(ops, etcd.OpDelete(volunteer))
}
if _, err := obj.Txn(nil, ops, nil); err != nil {
return fmt.Errorf("volunteering failed") // exit in progress?
}
return nil
}
// Volunteers returns a urls map of available etcd server volunteers
func Volunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
if obj.flags.Trace {
log.Printf("Trace: Etcd: Volunteers()")
defer log.Printf("Trace: Etcd: Volunteers(): Finished!")
}
path := fmt.Sprintf("/%s/volunteers/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix())
if err != nil {
return nil, fmt.Errorf("volunteers aren't available: %v", err)
}
volunteers := make(etcdtypes.URLsMap)
for key, val := range keyMap { // loop through directory of volunteers
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of volunteer
if val == "" { // skip "erased" values
continue
}
urls, err := etcdtypes.NewURLs(strings.Split(val, ","))
if err != nil {
return nil, fmt.Errorf("volunteers data format error: %v", err)
}
volunteers[name] = urls // add to map
if obj.flags.Debug {
log.Printf("Etcd: Volunteer(%v): %v", name, val)
}
}
return volunteers, nil
}
// AdvertiseEndpoints advertises the list of available client endpoints
func AdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): %v", obj.hostname, urls.String())
defer log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): Finished!", obj.hostname)
}
// advertise endpoints
endpoints := fmt.Sprintf("/%s/endpoints/%s", NS, obj.hostname)
ops := []etcd.Op{} // list of ops in this txn
if urls != nil {
// TODO: add a TTL? (etcd.WithLease)
ops = append(ops, etcd.OpPut(endpoints, urls.String())) // value is usually a "clientURL"
} else { // delete message if set to erase
ops = append(ops, etcd.OpDelete(endpoints))
}
if _, err := obj.Txn(nil, ops, nil); err != nil {
return fmt.Errorf("endpoint advertising failed") // exit in progress?
}
return nil
}
// Endpoints returns a urls map of available etcd server endpoints
func Endpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
if obj.flags.Trace {
log.Printf("Trace: Etcd: Endpoints()")
defer log.Printf("Trace: Etcd: Endpoints(): Finished!")
}
path := fmt.Sprintf("/%s/endpoints/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix())
if err != nil {
return nil, fmt.Errorf("endpoints aren't available: %v", err)
}
endpoints := make(etcdtypes.URLsMap)
for key, val := range keyMap { // loop through directory of endpoints
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of volunteer
if val == "" { // skip "erased" values
continue
}
urls, err := etcdtypes.NewURLs(strings.Split(val, ","))
if err != nil {
return nil, fmt.Errorf("endpoints data format error: %v", err)
}
endpoints[name] = urls // add to map
if obj.flags.Debug {
log.Printf("Etcd: Endpoint(%v): %v", name, val)
}
}
return endpoints, nil
}
// SetHostnameConverged sets whether a specific hostname is converged.
func SetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: SetHostnameConverged(%s): %v", hostname, isConverged)
defer log.Printf("Trace: Etcd: SetHostnameConverged(%v): Finished!", hostname)
}
converged := fmt.Sprintf("/%s/converged/%s", NS, hostname)
op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))}
if _, err := obj.Txn(nil, op, nil); err != nil { // TODO: do we need a skipConv flag here too?
return fmt.Errorf("set converged failed") // exit in progress?
}
return nil
}
// HostnameConverged returns a map of every hostname's converged state.
func HostnameConverged(obj *EmbdEtcd) (map[string]bool, error) {
if obj.flags.Trace {
log.Printf("Trace: Etcd: HostnameConverged()")
defer log.Printf("Trace: Etcd: HostnameConverged(): Finished!")
}
path := fmt.Sprintf("/%s/converged/", NS)
keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge
if err != nil {
return nil, fmt.Errorf("converged values aren't available: %v", err)
}
converged := make(map[string]bool)
for key, val := range keyMap { // loop through directory...
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of key
if val == "" { // skip "erased" values
continue
}
b, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("converged data format error: %v", err)
}
converged[name] = b // add to map
}
return converged, nil
}
// AddHostnameConvergedWatcher adds a watcher with a callback that runs on
// hostname state changes.
func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) {
path := fmt.Sprintf("/%s/converged/", NS)
internalCbFn := func(re *RE) error {
// TODO: get the value from the response, and apply delta...
// for now, just run a get operation which is easier to code!
m, err := HostnameConverged(obj)
if err != nil {
return err
}
return callbackFn(m) // call my function
}
return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset
}
// SetClusterSize sets the ideal target cluster size of etcd peers
func SetClusterSize(obj *EmbdEtcd, value uint16) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: SetClusterSize(): %v", value)
defer log.Printf("Trace: Etcd: SetClusterSize(): Finished!")
}
key := fmt.Sprintf("/%s/idealClusterSize", NS)
if err := obj.Set(key, strconv.FormatUint(uint64(value), 10)); err != nil {
return fmt.Errorf("function SetClusterSize failed: %v", err) // exit in progress?
}
return nil
}
// GetClusterSize gets the ideal target cluster size of etcd peers
func GetClusterSize(obj *EmbdEtcd) (uint16, error) {
key := fmt.Sprintf("/%s/idealClusterSize", NS)
keyMap, err := obj.Get(key)
if err != nil {
return 0, fmt.Errorf("function GetClusterSize failed: %v", err)
}
val, exists := keyMap[key]
if !exists || val == "" {
return 0, fmt.Errorf("function GetClusterSize failed: %v", err)
}
v, err := strconv.ParseUint(val, 10, 16)
if err != nil {
return 0, fmt.Errorf("function GetClusterSize failed: %v", err)
}
return uint16(v), nil
}
// MemberAdd adds a member to the cluster.
func MemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) {
//obj.Connect(false) // TODO: ?
ctx := context.Background()
var response *etcd.MemberAddResponse
var err error
for {
if obj.exiting { // the exit signal has been sent!
return nil, fmt.Errorf("exiting etcd")
}
obj.rLock.RLock()
response, err = obj.client.MemberAdd(ctx, peerURLs.StringSlice())
obj.rLock.RUnlock()
if err == nil {
break
}
if ctx, err = obj.CtxError(ctx, err); err != nil {
return nil, err
}
}
return response, nil
}
// MemberRemove removes a member by mID and returns if it worked, and also
// if there was an error. This is because it might have run without error, but
// the member wasn't found, for example.
func MemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
//obj.Connect(false) // TODO: ?
ctx := context.Background()
for {
if obj.exiting { // the exit signal has been sent!
return false, fmt.Errorf("exiting etcd")
}
obj.rLock.RLock()
_, err := obj.client.MemberRemove(ctx, mID)
obj.rLock.RUnlock()
if err == nil {
break
} else if err == rpctypes.ErrMemberNotFound {
// if we get this, member already shut itself down :)
return false, nil
}
if ctx, err = obj.CtxError(ctx, err); err != nil {
return false, err
}
}
return true, nil
}
// Members returns information on cluster membership.
// The member ID's are the keys, because an empty names means unstarted!
// TODO: consider queueing this through the main loop with CtxError(ctx, err)
func Members(obj *EmbdEtcd) (map[uint64]string, error) {
//obj.Connect(false) // TODO: ?
ctx := context.Background()
var response *etcd.MemberListResponse
var err error
for {
if obj.exiting { // the exit signal has been sent!
return nil, fmt.Errorf("exiting etcd")
}
obj.rLock.RLock()
if obj.flags.Trace {
log.Printf("Trace: Etcd: Members(): Endpoints are: %v", obj.client.Endpoints())
}
response, err = obj.client.MemberList(ctx)
obj.rLock.RUnlock()
if err == nil {
break
}
if ctx, err = obj.CtxError(ctx, err); err != nil {
return nil, err
}
}
members := make(map[uint64]string)
for _, x := range response.Members {
members[x.ID] = x.Name // x.Name will be "" if unstarted!
}
return members, nil
}
// Leader returns the current leader of the etcd server cluster
func Leader(obj *EmbdEtcd) (string, error) {
//obj.Connect(false) // TODO: ?
var err error
membersMap := make(map[uint64]string)
if membersMap, err = Members(obj); err != nil {
return "", err
}
addresses := obj.LocalhostClientURLs() // heuristic, but probably correct
if len(addresses) == 0 {
// probably a programming error...
return "", fmt.Errorf("programming error")
}
endpoint := addresses[0].Host // FIXME: arbitrarily picked the first one
// part two
ctx := context.Background()
var response *etcd.StatusResponse
for {
if obj.exiting { // the exit signal has been sent!
return "", fmt.Errorf("exiting etcd")
}
obj.rLock.RLock()
response, err = obj.client.Maintenance.Status(ctx, endpoint)
obj.rLock.RUnlock()
if err == nil {
break
}
if ctx, err = obj.CtxError(ctx, err); err != nil {
return "", err
}
}
// isLeader: response.Header.MemberId == response.Leader
for id, name := range membersMap {
if id == response.Leader {
return name, nil
}
}
return "", fmt.Errorf("members map is not current") // not found
}
// WatchResources returns a channel that outputs events when exported resources
// change.
// TODO: Filter our watch (on the server side if possible) based on the
// collection prefixes and filters that we care about...
func WatchResources(obj *EmbdEtcd) chan error {
ch := make(chan error, 1) // buffer it so we can measure it
path := fmt.Sprintf("/%s/exported/", NS)
callback := func(re *RE) error {
// TODO: is this even needed? it used to happen on conn errors
log.Printf("Etcd: Watch: Path: %v", path) // event
if re == nil || re.response.Canceled {
return fmt.Errorf("watch is empty") // will cause a CtxError+retry
}
// we normally need to check if anything changed since the last
// event, since a set (export) with no changes still causes the
// watcher to trigger and this would cause an infinite loop. we
// don't need to do this check anymore because we do the export
// transactionally, and only if a change is needed. since it is
// atomic, all the changes arrive together which avoids dupes!!
if len(ch) == 0 { // send event only if one isn't pending
// this check avoids multiple events all queueing up and then
// being released continuously long after the changes stopped
// do not block!
ch <- nil // event
}
return nil
}
_, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors
return ch
}
// SetResources exports all of the resources which we pass in to etcd
func SetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error {
// key structure is /$NS/exported/$hostname/resources/$uid = $data
var kindFilter []string // empty to get from everyone
hostnameFilter := []string{hostname}
// this is not a race because we should only be reading keys which we
// set, and there should not be any contention with other hosts here!
originals, err := GetResources(obj, hostnameFilter, kindFilter)
if err != nil {
return err
}
if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del
return nil
}
ifs := []etcd.Cmp{} // list matching the desired state
ops := []etcd.Op{} // list of ops in this transaction
for _, res := range resourceList {
if res.GetKind() == "" {
log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName())
}
uid := fmt.Sprintf("%s/%s", res.GetKind(), res.GetName())
path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uid)
if data, err := resources.ResToB64(res); err == nil {
ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state
ops = append(ops, etcd.OpPut(path, data))
} else {
return fmt.Errorf("can't convert to B64: %v", err)
}
}
match := func(res resources.Res, resourceList []resources.Res) bool { // helper lambda
for _, x := range resourceList {
if res.GetKind() == x.GetKind() && res.GetName() == x.GetName() {
return true
}
}
return false
}
hasDeletes := false
// delete old, now unused resources here...
for _, res := range originals {
if res.GetKind() == "" {
log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName())
}
uid := fmt.Sprintf("%s/%s", res.GetKind(), res.GetName())
path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uid)
if match(res, resourceList) { // if we match, no need to delete!
continue
}
ops = append(ops, etcd.OpDelete(path))
hasDeletes = true
}
// if everything is already correct, do nothing, otherwise, run the ops!
// it's important to do this in one transaction, and atomically, because
// this way, we only generate one watch event, and only when it's needed
if hasDeletes { // always run, ifs don't matter
_, err = obj.Txn(nil, ops, nil) // TODO: does this run? it should!
} else {
_, err = obj.Txn(ifs, nil, ops) // TODO: do we need to look at response?
}
return err
}
// GetResources collects all of the resources which match a filter from etcd
// If the kindfilter or hostnameFilter is empty, then it assumes no filtering...
// TODO: Expand this with a more powerful filter based on what we eventually
// support in our collect DSL. Ideally a server side filter like WithFilter()
// We could do this if the pattern was /$NS/exported/$kind/$hostname/$uid = $data
func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error) {
// key structure is /$NS/exported/$hostname/resources/$uid = $data
path := fmt.Sprintf("/%s/exported/", NS)
resourceList := []resources.Res{}
keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend))
if err != nil {
return nil, fmt.Errorf("could not get resources: %v", err)
}
for key, val := range keyMap {
if !strings.HasPrefix(key, path) { // sanity check
continue
}
str := strings.Split(key[len(path):], "/")
if len(str) != 4 {
return nil, fmt.Errorf("unexpected chunk count")
}
hostname, r, kind, name := str[0], str[1], str[2], str[3]
if r != "resources" {
return nil, fmt.Errorf("unexpected chunk pattern")
}
if kind == "" {
return nil, fmt.Errorf("unexpected kind chunk")
}
// FIXME: ideally this would be a server side filter instead!
if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) {
continue
}
// FIXME: ideally this would be a server side filter instead!
if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) {
continue
}
if obj, err := resources.B64ToRes(val); err == nil {
obj.SetKind(kind) // cheap init
log.Printf("Etcd: Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name)
resourceList = append(resourceList, obj)
} else {
return nil, fmt.Errorf("can't convert from B64: %v", err)
}
}
return resourceList, nil
}
//func UrlRemoveScheme(urls etcdtypes.URLs) []string { //func UrlRemoveScheme(urls etcdtypes.URLs) []string {
// strs := []string{} // strs := []string{}
// for _, u := range urls { // for _, u := range urls {
@@ -2257,7 +1721,7 @@ func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resourc
// return strs // return strs
//} //}
// ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse // ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse.
func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, error) { func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, error) {
if re == nil { // passthrough if re == nil { // passthrough
return urlsmap, nil return urlsmap, nil

412
etcd/methods.go Normal file
View File

@@ -0,0 +1,412 @@
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package etcd
import (
"fmt"
"log"
"strconv"
"strings"
etcd "github.com/coreos/etcd/clientv3"
rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
etcdtypes "github.com/coreos/etcd/pkg/types"
context "golang.org/x/net/context"
)
// TODO: Could all these Etcd*(obj *EmbdEtcd, ...) functions which deal with the
// interface between etcd paths and behaviour be grouped into a single struct ?
// Nominate nominates a particular client to be a server (peer).
func Nominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: Nominate(%v): %v", hostname, urls.String())
defer log.Printf("Trace: Etcd: Nominate(%v): Finished!", hostname)
}
// nominate someone to be a server
nominate := fmt.Sprintf("/%s/nominated/%s", NS, hostname)
ops := []etcd.Op{} // list of ops in this txn
if urls != nil {
ops = append(ops, etcd.OpPut(nominate, urls.String())) // TODO: add a TTL? (etcd.WithLease)
} else { // delete message if set to erase
ops = append(ops, etcd.OpDelete(nominate))
}
if _, err := obj.Txn(nil, ops, nil); err != nil {
return fmt.Errorf("nominate failed") // exit in progress?
}
return nil
}
// Nominated returns a urls map of nominated etcd server volunteers.
// NOTE: I know 'nominees' might be more correct, but is less consistent here
func Nominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
path := fmt.Sprintf("/%s/nominated/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix()) // map[string]string, bool
if err != nil {
return nil, fmt.Errorf("nominated isn't available: %v", err)
}
nominated := make(etcdtypes.URLsMap)
for key, val := range keyMap { // loop through directory of nominated
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of nominee
if val == "" { // skip "erased" values
continue
}
urls, err := etcdtypes.NewURLs(strings.Split(val, ","))
if err != nil {
return nil, fmt.Errorf("nominated data format error: %v", err)
}
nominated[name] = urls // add to map
if obj.flags.Debug {
log.Printf("Etcd: Nominated(%v): %v", name, val)
}
}
return nominated, nil
}
// Volunteer offers yourself up to be a server if needed.
func Volunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: Volunteer(%v): %v", obj.hostname, urls.String())
defer log.Printf("Trace: Etcd: Volunteer(%v): Finished!", obj.hostname)
}
// volunteer to be a server
volunteer := fmt.Sprintf("/%s/volunteers/%s", NS, obj.hostname)
ops := []etcd.Op{} // list of ops in this txn
if urls != nil {
// XXX: adding a TTL is crucial! (i think)
ops = append(ops, etcd.OpPut(volunteer, urls.String())) // value is usually a peer "serverURL"
} else { // delete message if set to erase
ops = append(ops, etcd.OpDelete(volunteer))
}
if _, err := obj.Txn(nil, ops, nil); err != nil {
return fmt.Errorf("volunteering failed") // exit in progress?
}
return nil
}
// Volunteers returns a urls map of available etcd server volunteers.
func Volunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
if obj.flags.Trace {
log.Printf("Trace: Etcd: Volunteers()")
defer log.Printf("Trace: Etcd: Volunteers(): Finished!")
}
path := fmt.Sprintf("/%s/volunteers/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix())
if err != nil {
return nil, fmt.Errorf("volunteers aren't available: %v", err)
}
volunteers := make(etcdtypes.URLsMap)
for key, val := range keyMap { // loop through directory of volunteers
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of volunteer
if val == "" { // skip "erased" values
continue
}
urls, err := etcdtypes.NewURLs(strings.Split(val, ","))
if err != nil {
return nil, fmt.Errorf("volunteers data format error: %v", err)
}
volunteers[name] = urls // add to map
if obj.flags.Debug {
log.Printf("Etcd: Volunteer(%v): %v", name, val)
}
}
return volunteers, nil
}
// AdvertiseEndpoints advertises the list of available client endpoints.
func AdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): %v", obj.hostname, urls.String())
defer log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): Finished!", obj.hostname)
}
// advertise endpoints
endpoints := fmt.Sprintf("/%s/endpoints/%s", NS, obj.hostname)
ops := []etcd.Op{} // list of ops in this txn
if urls != nil {
// TODO: add a TTL? (etcd.WithLease)
ops = append(ops, etcd.OpPut(endpoints, urls.String())) // value is usually a "clientURL"
} else { // delete message if set to erase
ops = append(ops, etcd.OpDelete(endpoints))
}
if _, err := obj.Txn(nil, ops, nil); err != nil {
return fmt.Errorf("endpoint advertising failed") // exit in progress?
}
return nil
}
// Endpoints returns a urls map of available etcd server endpoints.
func Endpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
if obj.flags.Trace {
log.Printf("Trace: Etcd: Endpoints()")
defer log.Printf("Trace: Etcd: Endpoints(): Finished!")
}
path := fmt.Sprintf("/%s/endpoints/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix())
if err != nil {
return nil, fmt.Errorf("endpoints aren't available: %v", err)
}
endpoints := make(etcdtypes.URLsMap)
for key, val := range keyMap { // loop through directory of endpoints
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of volunteer
if val == "" { // skip "erased" values
continue
}
urls, err := etcdtypes.NewURLs(strings.Split(val, ","))
if err != nil {
return nil, fmt.Errorf("endpoints data format error: %v", err)
}
endpoints[name] = urls // add to map
if obj.flags.Debug {
log.Printf("Etcd: Endpoint(%v): %v", name, val)
}
}
return endpoints, nil
}
// SetHostnameConverged sets whether a specific hostname is converged.
func SetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: SetHostnameConverged(%s): %v", hostname, isConverged)
defer log.Printf("Trace: Etcd: SetHostnameConverged(%v): Finished!", hostname)
}
converged := fmt.Sprintf("/%s/converged/%s", NS, hostname)
op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))}
if _, err := obj.Txn(nil, op, nil); err != nil { // TODO: do we need a skipConv flag here too?
return fmt.Errorf("set converged failed") // exit in progress?
}
return nil
}
// HostnameConverged returns a map of every hostname's converged state.
func HostnameConverged(obj *EmbdEtcd) (map[string]bool, error) {
if obj.flags.Trace {
log.Printf("Trace: Etcd: HostnameConverged()")
defer log.Printf("Trace: Etcd: HostnameConverged(): Finished!")
}
path := fmt.Sprintf("/%s/converged/", NS)
keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge
if err != nil {
return nil, fmt.Errorf("converged values aren't available: %v", err)
}
converged := make(map[string]bool)
for key, val := range keyMap { // loop through directory...
if !strings.HasPrefix(key, path) {
continue
}
name := key[len(path):] // get name of key
if val == "" { // skip "erased" values
continue
}
b, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("converged data format error: %v", err)
}
converged[name] = b // add to map
}
return converged, nil
}
// AddHostnameConvergedWatcher adds a watcher with a callback that runs on
// hostname state changes.
func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) {
path := fmt.Sprintf("/%s/converged/", NS)
internalCbFn := func(re *RE) error {
// TODO: get the value from the response, and apply delta...
// for now, just run a get operation which is easier to code!
m, err := HostnameConverged(obj)
if err != nil {
return err
}
return callbackFn(m) // call my function
}
return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset
}
// SetClusterSize sets the ideal target cluster size of etcd peers.
func SetClusterSize(obj *EmbdEtcd, value uint16) error {
if obj.flags.Trace {
log.Printf("Trace: Etcd: SetClusterSize(): %v", value)
defer log.Printf("Trace: Etcd: SetClusterSize(): Finished!")
}
key := fmt.Sprintf("/%s/idealClusterSize", NS)
if err := obj.Set(key, strconv.FormatUint(uint64(value), 10)); err != nil {
return fmt.Errorf("function SetClusterSize failed: %v", err) // exit in progress?
}
return nil
}
// GetClusterSize gets the ideal target cluster size of etcd peers.
func GetClusterSize(obj *EmbdEtcd) (uint16, error) {
key := fmt.Sprintf("/%s/idealClusterSize", NS)
keyMap, err := obj.Get(key)
if err != nil {
return 0, fmt.Errorf("function GetClusterSize failed: %v", err)
}
val, exists := keyMap[key]
if !exists || val == "" {
return 0, fmt.Errorf("function GetClusterSize failed: %v", err)
}
v, err := strconv.ParseUint(val, 10, 16)
if err != nil {
return 0, fmt.Errorf("function GetClusterSize failed: %v", err)
}
return uint16(v), nil
}
// MemberAdd adds a member to the cluster.
func MemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) {
//obj.Connect(false) // TODO: ?
ctx := context.Background()
var response *etcd.MemberAddResponse
var err error
for {
if obj.exiting { // the exit signal has been sent!
return nil, fmt.Errorf("exiting etcd")
}
obj.rLock.RLock()
response, err = obj.client.MemberAdd(ctx, peerURLs.StringSlice())
obj.rLock.RUnlock()
if err == nil {
break
}
if ctx, err = obj.CtxError(ctx, err); err != nil {
return nil, err
}
}
return response, nil
}
// MemberRemove removes a member by mID and returns if it worked, and also
// if there was an error. This is because it might have run without error, but
// the member wasn't found, for example.
func MemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
//obj.Connect(false) // TODO: ?
ctx := context.Background()
for {
if obj.exiting { // the exit signal has been sent!
return false, fmt.Errorf("exiting etcd")
}
obj.rLock.RLock()
_, err := obj.client.MemberRemove(ctx, mID)
obj.rLock.RUnlock()
if err == nil {
break
} else if err == rpctypes.ErrMemberNotFound {
// if we get this, member already shut itself down :)
return false, nil
}
if ctx, err = obj.CtxError(ctx, err); err != nil {
return false, err
}
}
return true, nil
}
// Members returns information on cluster membership.
// The member ID's are the keys, because an empty names means unstarted!
// TODO: consider queueing this through the main loop with CtxError(ctx, err)
func Members(obj *EmbdEtcd) (map[uint64]string, error) {
//obj.Connect(false) // TODO: ?
ctx := context.Background()
var response *etcd.MemberListResponse
var err error
for {
if obj.exiting { // the exit signal has been sent!
return nil, fmt.Errorf("exiting etcd")
}
obj.rLock.RLock()
if obj.flags.Trace {
log.Printf("Trace: Etcd: Members(): Endpoints are: %v", obj.client.Endpoints())
}
response, err = obj.client.MemberList(ctx)
obj.rLock.RUnlock()
if err == nil {
break
}
if ctx, err = obj.CtxError(ctx, err); err != nil {
return nil, err
}
}
members := make(map[uint64]string)
for _, x := range response.Members {
members[x.ID] = x.Name // x.Name will be "" if unstarted!
}
return members, nil
}
// Leader returns the current leader of the etcd server cluster.
func Leader(obj *EmbdEtcd) (string, error) {
//obj.Connect(false) // TODO: ?
var err error
membersMap := make(map[uint64]string)
if membersMap, err = Members(obj); err != nil {
return "", err
}
addresses := obj.LocalhostClientURLs() // heuristic, but probably correct
if len(addresses) == 0 {
// probably a programming error...
return "", fmt.Errorf("programming error")
}
endpoint := addresses[0].Host // FIXME: arbitrarily picked the first one
// part two
ctx := context.Background()
var response *etcd.StatusResponse
for {
if obj.exiting { // the exit signal has been sent!
return "", fmt.Errorf("exiting etcd")
}
obj.rLock.RLock()
response, err = obj.client.Maintenance.Status(ctx, endpoint)
obj.rLock.RUnlock()
if err == nil {
break
}
if ctx, err = obj.CtxError(ctx, err); err != nil {
return "", err
}
}
// isLeader: response.Header.MemberId == response.Leader
for id, name := range membersMap {
if id == response.Leader {
return name, nil
}
}
return "", fmt.Errorf("members map is not current") // not found
}

182
etcd/resources.go Normal file
View File

@@ -0,0 +1,182 @@
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package etcd
import (
"fmt"
"log"
"strings"
"github.com/purpleidea/mgmt/resources"
"github.com/purpleidea/mgmt/util"
etcd "github.com/coreos/etcd/clientv3"
)
// WatchResources returns a channel that outputs events when exported resources
// change.
// TODO: Filter our watch (on the server side if possible) based on the
// collection prefixes and filters that we care about...
func WatchResources(obj *EmbdEtcd) chan error {
ch := make(chan error, 1) // buffer it so we can measure it
path := fmt.Sprintf("/%s/exported/", NS)
callback := func(re *RE) error {
// TODO: is this even needed? it used to happen on conn errors
log.Printf("Etcd: Watch: Path: %v", path) // event
if re == nil || re.response.Canceled {
return fmt.Errorf("watch is empty") // will cause a CtxError+retry
}
// we normally need to check if anything changed since the last
// event, since a set (export) with no changes still causes the
// watcher to trigger and this would cause an infinite loop. we
// don't need to do this check anymore because we do the export
// transactionally, and only if a change is needed. since it is
// atomic, all the changes arrive together which avoids dupes!!
if len(ch) == 0 { // send event only if one isn't pending
// this check avoids multiple events all queueing up and then
// being released continuously long after the changes stopped
// do not block!
ch <- nil // event
}
return nil
}
_, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors
return ch
}
// SetResources exports all of the resources which we pass in to etcd.
func SetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error {
// key structure is /$NS/exported/$hostname/resources/$uid = $data
var kindFilter []string // empty to get from everyone
hostnameFilter := []string{hostname}
// this is not a race because we should only be reading keys which we
// set, and there should not be any contention with other hosts here!
originals, err := GetResources(obj, hostnameFilter, kindFilter)
if err != nil {
return err
}
if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del
return nil
}
ifs := []etcd.Cmp{} // list matching the desired state
ops := []etcd.Op{} // list of ops in this transaction
for _, res := range resourceList {
if res.GetKind() == "" {
log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName())
}
uid := fmt.Sprintf("%s/%s", res.GetKind(), res.GetName())
path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uid)
if data, err := resources.ResToB64(res); err == nil {
ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state
ops = append(ops, etcd.OpPut(path, data))
} else {
return fmt.Errorf("can't convert to B64: %v", err)
}
}
match := func(res resources.Res, resourceList []resources.Res) bool { // helper lambda
for _, x := range resourceList {
if res.GetKind() == x.GetKind() && res.GetName() == x.GetName() {
return true
}
}
return false
}
hasDeletes := false
// delete old, now unused resources here...
for _, res := range originals {
if res.GetKind() == "" {
log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName())
}
uid := fmt.Sprintf("%s/%s", res.GetKind(), res.GetName())
path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uid)
if match(res, resourceList) { // if we match, no need to delete!
continue
}
ops = append(ops, etcd.OpDelete(path))
hasDeletes = true
}
// if everything is already correct, do nothing, otherwise, run the ops!
// it's important to do this in one transaction, and atomically, because
// this way, we only generate one watch event, and only when it's needed
if hasDeletes { // always run, ifs don't matter
_, err = obj.Txn(nil, ops, nil) // TODO: does this run? it should!
} else {
_, err = obj.Txn(ifs, nil, ops) // TODO: do we need to look at response?
}
return err
}
// GetResources collects all of the resources which match a filter from etcd.
// If the kindfilter or hostnameFilter is empty, then it assumes no filtering...
// TODO: Expand this with a more powerful filter based on what we eventually
// support in our collect DSL. Ideally a server side filter like WithFilter()
// We could do this if the pattern was /$NS/exported/$kind/$hostname/$uid = $data.
func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error) {
// key structure is /$NS/exported/$hostname/resources/$uid = $data
path := fmt.Sprintf("/%s/exported/", NS)
resourceList := []resources.Res{}
keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend))
if err != nil {
return nil, fmt.Errorf("could not get resources: %v", err)
}
for key, val := range keyMap {
if !strings.HasPrefix(key, path) { // sanity check
continue
}
str := strings.Split(key[len(path):], "/")
if len(str) != 4 {
return nil, fmt.Errorf("unexpected chunk count")
}
hostname, r, kind, name := str[0], str[1], str[2], str[3]
if r != "resources" {
return nil, fmt.Errorf("unexpected chunk pattern")
}
if kind == "" {
return nil, fmt.Errorf("unexpected kind chunk")
}
// FIXME: ideally this would be a server side filter instead!
if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) {
continue
}
// FIXME: ideally this would be a server side filter instead!
if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) {
continue
}
if obj, err := resources.B64ToRes(val); err == nil {
obj.SetKind(kind) // cheap init
log.Printf("Etcd: Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name)
resourceList = append(resourceList, obj)
} else {
return nil, fmt.Errorf("can't convert from B64: %v", err)
}
}
return resourceList, nil
}

View File

@@ -52,7 +52,7 @@ func WatchStr(obj *EmbdEtcd, key string) chan error {
return ch return ch
} }
// GetStr collects the string which matches a gloabl namespace in etcd. // GetStr collects the string which matches a global namespace in etcd.
func GetStr(obj *EmbdEtcd, key string) (string, error) { func GetStr(obj *EmbdEtcd, key string) (string, error) {
// new key structure is /$NS/strings/$key = $data // new key structure is /$NS/strings/$key = $data
path := fmt.Sprintf("/%s/strings/%s", NS, key) path := fmt.Sprintf("/%s/strings/%s", NS, key)