diff --git a/etcd/etcd.go b/etcd/etcd.go index 31d333be..4daca922 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -65,7 +65,6 @@ import ( "github.com/purpleidea/mgmt/converger" "github.com/purpleidea/mgmt/event" - "github.com/purpleidea/mgmt/resources" "github.com/purpleidea/mgmt/util" etcd "github.com/coreos/etcd/clientv3" // "clientv3" @@ -96,7 +95,7 @@ var ( errApplyDeltaEventsInconsistent = errors.New("inconsistent key in ApplyDeltaEvents") ) -// AW is a struct for the AddWatcher queue +// AW is a struct for the AddWatcher queue. type AW struct { path string opts []etcd.OpOption @@ -107,8 +106,8 @@ type AW struct { cancelFunc func() // data } -// RE is a response + error struct since these two values often occur together -// This is now called an event with the move to the etcd v3 API +// RE is a response + error struct since these two values often occur together. +// This is now called an event with the move to the etcd v3 API. type RE struct { response etcd.WatchResponse path string @@ -120,7 +119,7 @@ type RE struct { retries uint // number of times we've retried on error } -// KV is a key + value struct to hold the two items together +// KV is a key + value struct to hold the two items together. type KV struct { key string value string @@ -128,7 +127,7 @@ type KV struct { resp event.Resp } -// GQ is a struct for the get queue +// GQ is a struct for the get queue. type GQ struct { path string skipConv bool @@ -137,7 +136,7 @@ type GQ struct { data map[string]string } -// DL is a struct for the delete queue +// DL is a struct for the delete queue. type DL struct { path string opts []etcd.OpOption @@ -145,7 +144,7 @@ type DL struct { data int64 } -// TN is a struct for the txn queue +// TN is a struct for the txn queue. type TN struct { ifcmps []etcd.Cmp thenops []etcd.Op @@ -161,7 +160,7 @@ type Flags struct { Verbose bool // add extra log message output } -// EmbdEtcd provides the embedded server and client etcd functionality +// EmbdEtcd provides the embedded server and client etcd functionality. type EmbdEtcd struct { // EMBeddeD etcd // etcd client connection related cLock sync.Mutex // client connect lock @@ -207,7 +206,7 @@ type EmbdEtcd struct { // EMBeddeD etcd dataDir string // our data dir, prefix + "etcd" } -// NewEmbdEtcd creates the top level embedded etcd struct client and server obj +// NewEmbdEtcd creates the top level embedded etcd struct client and server obj. func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd { endpoints := make(etcdtypes.URLsMap) if hostname == seedSentinel { // safety @@ -260,7 +259,7 @@ func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs etcdtypes.URLs, return obj } -// GetConfig returns the config struct to be used for the etcd client connect +// GetConfig returns the config struct to be used for the etcd client connect. func (obj *EmbdEtcd) GetConfig() etcd.Config { endpoints := []string{} // XXX: filter out any urls which wouldn't resolve here ? @@ -342,7 +341,7 @@ func (obj *EmbdEtcd) Connect(reconnect bool) error { return nil } -// Startup is the main entry point to kick off the embedded etcd client & server +// Startup is the main entry point to kick off the embedded etcd client & server. func (obj *EmbdEtcd) Startup() error { bootstrapping := len(obj.endpoints) == 0 // because value changes after start @@ -464,7 +463,7 @@ func (obj *EmbdEtcd) Destroy() error { return nil } -// CtxDelayErr requests a retry in Delta duration +// CtxDelayErr requests a retry in Delta duration. type CtxDelayErr struct { Delta time.Duration Message string @@ -474,7 +473,7 @@ func (obj *CtxDelayErr) Error() string { return fmt.Sprintf("CtxDelayErr(%v): %s", obj.Delta, obj.Message) } -// CtxRetriesErr lets you retry as long as you have retries available +// CtxRetriesErr lets you retry as long as you have retries available. // TODO: consider combining this with CtxDelayErr type CtxRetriesErr struct { Retries uint @@ -494,7 +493,7 @@ func (obj *CtxPermanentErr) Error() string { return fmt.Sprintf("CtxPermanentErr: %s", obj.Message) } -// CtxReconnectErr requests a client reconnect to the new endpoint list +// CtxReconnectErr requests a client reconnect to the new endpoint list. type CtxReconnectErr struct { Message string } @@ -503,7 +502,7 @@ func (obj *CtxReconnectErr) Error() string { return fmt.Sprintf("CtxReconnectErr: %s", obj.Message) } -// CancelCtx adds a tracked cancel function around an existing context +// CancelCtx adds a tracked cancel function around an existing context. func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func()) { cancelCtx, cancelFunc := context.WithCancel(ctx) obj.cancelLock.Lock() @@ -512,7 +511,7 @@ func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func()) { return cancelCtx, cancelFunc } -// TimeoutCtx adds a tracked cancel function with timeout around an existing context +// TimeoutCtx adds a tracked cancel function with timeout around an existing context. func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func()) { timeoutCtx, cancelFunc := context.WithTimeout(ctx, t) obj.cancelLock.Lock() @@ -699,7 +698,7 @@ func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, return ctx, obj.ctxErr } -// CbLoop is the loop where callback execution is serialized +// CbLoop is the loop where callback execution is serialized. func (obj *EmbdEtcd) CbLoop() { cuid := obj.converger.Register() cuid.SetName("Etcd: CbLoop") @@ -755,7 +754,7 @@ func (obj *EmbdEtcd) CbLoop() { } } -// Loop is the main loop where everything is serialized +// Loop is the main loop where everything is serialized. func (obj *EmbdEtcd) Loop() { cuid := obj.converger.Register() cuid.SetName("Etcd: Loop") @@ -933,7 +932,7 @@ func (obj *EmbdEtcd) loopProcessAW(ctx context.Context, aw *AW) { } } -// Set queues up a set operation to occur using our mainloop +// Set queues up a set operation to occur using our mainloop. func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error { resp := event.NewResp() obj.setq <- &KV{key: key, value: value, opts: opts, resp: resp} @@ -943,7 +942,7 @@ func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error { return nil } -// rawSet actually implements the key set operation +// rawSet actually implements the key set operation. func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error { if obj.flags.Trace { log.Printf("Trace: Etcd: rawSet()") @@ -960,7 +959,7 @@ func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error { return err } -// Get performs a get operation and waits for an ACK to continue +// Get performs a get operation and waits for an ACK to continue. func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error) { return obj.ComplexGet(path, false, opts...) } @@ -1001,7 +1000,7 @@ func (obj *EmbdEtcd) rawGet(ctx context.Context, gq *GQ) (result map[string]stri return } -// Delete performs a delete operation and waits for an ACK to continue +// Delete performs a delete operation and waits for an ACK to continue. func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error) { resp := event.NewResp() dl := &DL{path: path, opts: opts, resp: resp, data: -1} @@ -1029,7 +1028,7 @@ func (obj *EmbdEtcd) rawDelete(ctx context.Context, dl *DL) (count int64, err er return } -// Txn performs a transaction and waits for an ACK to continue +// Txn performs a transaction and waits for an ACK to continue. func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error) { resp := event.NewResp() tn := &TN{ifcmps: ifcmps, thenops: thenops, elseops: elseops, resp: resp, data: nil} @@ -1053,8 +1052,8 @@ func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, err return response, err } -// AddWatcher queues up an add watcher request and returns a cancel function -// Remember to add the etcd.WithPrefix() option if you want to watch recursively +// AddWatcher queues up an add watcher request and returns a cancel function. +// Remember to add the etcd.WithPrefix() option if you want to watch recursively. func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) { resp := event.NewResp() awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp} @@ -1065,7 +1064,7 @@ func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errChe return awq.cancelFunc, nil } -// rawAddWatcher adds a watcher and returns a cancel function to call to end it +// rawAddWatcher adds a watcher and returns a cancel function to call to end it. func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) { cancelCtx, cancelFunc := obj.CancelCtx(ctx) go func(ctx context.Context) { @@ -1142,7 +1141,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) return cancelFunc, nil } -// rawCallback is the companion to AddWatcher which runs the callback processing +// rawCallback is the companion to AddWatcher which runs the callback processing. func rawCallback(ctx context.Context, re *RE) error { var err = re.err // the watch event itself might have had an error if err == nil { @@ -1161,8 +1160,8 @@ func rawCallback(ctx context.Context, re *RE) error { return err } -// volunteerCallback runs to respond to the volunteer list change events -// functionally, it controls the adding and removing of members +// volunteerCallback runs to respond to the volunteer list change events. +// Functionally, it controls the adding and removing of members. // FIXME: we might need to respond to member change/disconnect/shutdown events, // see: https://github.com/coreos/etcd/issues/5277 func (obj *EmbdEtcd) volunteerCallback(re *RE) error { @@ -1351,8 +1350,8 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error { return nil } -// nominateCallback runs to respond to the nomination list change events -// functionally, it controls the starting and stopping of the server process +// nominateCallback runs to respond to the nomination list change events. +// Functionally, it controls the starting and stopping of the server process. func (obj *EmbdEtcd) nominateCallback(re *RE) error { if obj.flags.Trace { log.Printf("Trace: Etcd: nominateCallback()") @@ -1504,7 +1503,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error { return nil } -// endpointCallback runs to respond to the endpoint list change events +// endpointCallback runs to respond to the endpoint list change events. func (obj *EmbdEtcd) endpointCallback(re *RE) error { if obj.flags.Trace { log.Printf("Trace: Etcd: endpointCallback()") @@ -1570,7 +1569,7 @@ func (obj *EmbdEtcd) endpointCallback(re *RE) error { return nil } -// idealClusterSizeCallback runs to respond to the ideal cluster size changes +// idealClusterSizeCallback runs to respond to the ideal cluster size changes. func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error { if obj.flags.Trace { log.Printf("Trace: Etcd: idealClusterSizeCallback()") @@ -1604,8 +1603,8 @@ func (obj *EmbdEtcd) idealClusterSizeCallback(re *RE) error { return nil } -// LocalhostClientURLs returns the most localhost like URLs for direct connection -// this gets clients to talk to the local servers first before searching remotely +// LocalhostClientURLs returns the most localhost like URLs for direct connection. +// This gets clients to talk to the local servers first before searching remotely. func (obj *EmbdEtcd) LocalhostClientURLs() etcdtypes.URLs { // look through obj.clientURLs and return the localhost ones urls := etcdtypes.URLs{} @@ -1695,7 +1694,7 @@ func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) return nil } -// DestroyServer shuts down the embedded etcd server portion +// DestroyServer shuts down the embedded etcd server portion. func (obj *EmbdEtcd) DestroyServer() error { var err error log.Printf("Etcd: DestroyServer: Destroying...") @@ -1714,541 +1713,6 @@ func (obj *EmbdEtcd) DestroyServer() error { return err } -// TODO: Could all these Etcd*(obj *EmbdEtcd, ...) functions which deal with the -// interface between etcd paths and behaviour be grouped into a single struct ? - -// Nominate nominates a particular client to be a server (peer) -func Nominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error { - if obj.flags.Trace { - log.Printf("Trace: Etcd: Nominate(%v): %v", hostname, urls.String()) - defer log.Printf("Trace: Etcd: Nominate(%v): Finished!", hostname) - } - // nominate someone to be a server - nominate := fmt.Sprintf("/%s/nominated/%s", NS, hostname) - ops := []etcd.Op{} // list of ops in this txn - if urls != nil { - ops = append(ops, etcd.OpPut(nominate, urls.String())) // TODO: add a TTL? (etcd.WithLease) - - } else { // delete message if set to erase - ops = append(ops, etcd.OpDelete(nominate)) - } - - if _, err := obj.Txn(nil, ops, nil); err != nil { - return fmt.Errorf("nominate failed") // exit in progress? - } - return nil -} - -// Nominated returns a urls map of nominated etcd server volunteers -// NOTE: I know 'nominees' might be more correct, but is less consistent here -func Nominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { - path := fmt.Sprintf("/%s/nominated/", NS) - keyMap, err := obj.Get(path, etcd.WithPrefix()) // map[string]string, bool - if err != nil { - return nil, fmt.Errorf("nominated isn't available: %v", err) - } - nominated := make(etcdtypes.URLsMap) - for key, val := range keyMap { // loop through directory of nominated - if !strings.HasPrefix(key, path) { - continue - } - name := key[len(path):] // get name of nominee - if val == "" { // skip "erased" values - continue - } - urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) - if err != nil { - return nil, fmt.Errorf("nominated data format error: %v", err) - } - nominated[name] = urls // add to map - if obj.flags.Debug { - log.Printf("Etcd: Nominated(%v): %v", name, val) - } - } - return nominated, nil -} - -// Volunteer offers yourself up to be a server if needed -func Volunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error { - if obj.flags.Trace { - log.Printf("Trace: Etcd: Volunteer(%v): %v", obj.hostname, urls.String()) - defer log.Printf("Trace: Etcd: Volunteer(%v): Finished!", obj.hostname) - } - // volunteer to be a server - volunteer := fmt.Sprintf("/%s/volunteers/%s", NS, obj.hostname) - ops := []etcd.Op{} // list of ops in this txn - if urls != nil { - // XXX: adding a TTL is crucial! (i think) - ops = append(ops, etcd.OpPut(volunteer, urls.String())) // value is usually a peer "serverURL" - - } else { // delete message if set to erase - ops = append(ops, etcd.OpDelete(volunteer)) - } - - if _, err := obj.Txn(nil, ops, nil); err != nil { - return fmt.Errorf("volunteering failed") // exit in progress? - } - return nil -} - -// Volunteers returns a urls map of available etcd server volunteers -func Volunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { - if obj.flags.Trace { - log.Printf("Trace: Etcd: Volunteers()") - defer log.Printf("Trace: Etcd: Volunteers(): Finished!") - } - path := fmt.Sprintf("/%s/volunteers/", NS) - keyMap, err := obj.Get(path, etcd.WithPrefix()) - if err != nil { - return nil, fmt.Errorf("volunteers aren't available: %v", err) - } - volunteers := make(etcdtypes.URLsMap) - for key, val := range keyMap { // loop through directory of volunteers - if !strings.HasPrefix(key, path) { - continue - } - name := key[len(path):] // get name of volunteer - if val == "" { // skip "erased" values - continue - } - urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) - if err != nil { - return nil, fmt.Errorf("volunteers data format error: %v", err) - } - volunteers[name] = urls // add to map - if obj.flags.Debug { - log.Printf("Etcd: Volunteer(%v): %v", name, val) - } - } - return volunteers, nil -} - -// AdvertiseEndpoints advertises the list of available client endpoints -func AdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error { - if obj.flags.Trace { - log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): %v", obj.hostname, urls.String()) - defer log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): Finished!", obj.hostname) - } - // advertise endpoints - endpoints := fmt.Sprintf("/%s/endpoints/%s", NS, obj.hostname) - ops := []etcd.Op{} // list of ops in this txn - if urls != nil { - // TODO: add a TTL? (etcd.WithLease) - ops = append(ops, etcd.OpPut(endpoints, urls.String())) // value is usually a "clientURL" - - } else { // delete message if set to erase - ops = append(ops, etcd.OpDelete(endpoints)) - } - - if _, err := obj.Txn(nil, ops, nil); err != nil { - return fmt.Errorf("endpoint advertising failed") // exit in progress? - } - return nil -} - -// Endpoints returns a urls map of available etcd server endpoints -func Endpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { - if obj.flags.Trace { - log.Printf("Trace: Etcd: Endpoints()") - defer log.Printf("Trace: Etcd: Endpoints(): Finished!") - } - path := fmt.Sprintf("/%s/endpoints/", NS) - keyMap, err := obj.Get(path, etcd.WithPrefix()) - if err != nil { - return nil, fmt.Errorf("endpoints aren't available: %v", err) - } - endpoints := make(etcdtypes.URLsMap) - for key, val := range keyMap { // loop through directory of endpoints - if !strings.HasPrefix(key, path) { - continue - } - name := key[len(path):] // get name of volunteer - if val == "" { // skip "erased" values - continue - } - urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) - if err != nil { - return nil, fmt.Errorf("endpoints data format error: %v", err) - } - endpoints[name] = urls // add to map - if obj.flags.Debug { - log.Printf("Etcd: Endpoint(%v): %v", name, val) - } - } - return endpoints, nil -} - -// SetHostnameConverged sets whether a specific hostname is converged. -func SetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error { - if obj.flags.Trace { - log.Printf("Trace: Etcd: SetHostnameConverged(%s): %v", hostname, isConverged) - defer log.Printf("Trace: Etcd: SetHostnameConverged(%v): Finished!", hostname) - } - converged := fmt.Sprintf("/%s/converged/%s", NS, hostname) - op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))} - if _, err := obj.Txn(nil, op, nil); err != nil { // TODO: do we need a skipConv flag here too? - return fmt.Errorf("set converged failed") // exit in progress? - } - return nil -} - -// HostnameConverged returns a map of every hostname's converged state. -func HostnameConverged(obj *EmbdEtcd) (map[string]bool, error) { - if obj.flags.Trace { - log.Printf("Trace: Etcd: HostnameConverged()") - defer log.Printf("Trace: Etcd: HostnameConverged(): Finished!") - } - path := fmt.Sprintf("/%s/converged/", NS) - keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge - if err != nil { - return nil, fmt.Errorf("converged values aren't available: %v", err) - } - converged := make(map[string]bool) - for key, val := range keyMap { // loop through directory... - if !strings.HasPrefix(key, path) { - continue - } - name := key[len(path):] // get name of key - if val == "" { // skip "erased" values - continue - } - b, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("converged data format error: %v", err) - } - converged[name] = b // add to map - } - return converged, nil -} - -// AddHostnameConvergedWatcher adds a watcher with a callback that runs on -// hostname state changes. -func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) { - path := fmt.Sprintf("/%s/converged/", NS) - internalCbFn := func(re *RE) error { - // TODO: get the value from the response, and apply delta... - // for now, just run a get operation which is easier to code! - m, err := HostnameConverged(obj) - if err != nil { - return err - } - return callbackFn(m) // call my function - } - return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset -} - -// SetClusterSize sets the ideal target cluster size of etcd peers -func SetClusterSize(obj *EmbdEtcd, value uint16) error { - if obj.flags.Trace { - log.Printf("Trace: Etcd: SetClusterSize(): %v", value) - defer log.Printf("Trace: Etcd: SetClusterSize(): Finished!") - } - key := fmt.Sprintf("/%s/idealClusterSize", NS) - - if err := obj.Set(key, strconv.FormatUint(uint64(value), 10)); err != nil { - return fmt.Errorf("function SetClusterSize failed: %v", err) // exit in progress? - } - return nil -} - -// GetClusterSize gets the ideal target cluster size of etcd peers -func GetClusterSize(obj *EmbdEtcd) (uint16, error) { - key := fmt.Sprintf("/%s/idealClusterSize", NS) - keyMap, err := obj.Get(key) - if err != nil { - return 0, fmt.Errorf("function GetClusterSize failed: %v", err) - } - - val, exists := keyMap[key] - if !exists || val == "" { - return 0, fmt.Errorf("function GetClusterSize failed: %v", err) - } - - v, err := strconv.ParseUint(val, 10, 16) - if err != nil { - return 0, fmt.Errorf("function GetClusterSize failed: %v", err) - } - return uint16(v), nil -} - -// MemberAdd adds a member to the cluster. -func MemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) { - //obj.Connect(false) // TODO: ? - ctx := context.Background() - var response *etcd.MemberAddResponse - var err error - for { - if obj.exiting { // the exit signal has been sent! - return nil, fmt.Errorf("exiting etcd") - } - obj.rLock.RLock() - response, err = obj.client.MemberAdd(ctx, peerURLs.StringSlice()) - obj.rLock.RUnlock() - if err == nil { - break - } - if ctx, err = obj.CtxError(ctx, err); err != nil { - return nil, err - } - } - return response, nil -} - -// MemberRemove removes a member by mID and returns if it worked, and also -// if there was an error. This is because it might have run without error, but -// the member wasn't found, for example. -func MemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) { - //obj.Connect(false) // TODO: ? - ctx := context.Background() - for { - if obj.exiting { // the exit signal has been sent! - return false, fmt.Errorf("exiting etcd") - } - obj.rLock.RLock() - _, err := obj.client.MemberRemove(ctx, mID) - obj.rLock.RUnlock() - if err == nil { - break - } else if err == rpctypes.ErrMemberNotFound { - // if we get this, member already shut itself down :) - return false, nil - } - if ctx, err = obj.CtxError(ctx, err); err != nil { - return false, err - } - } - return true, nil -} - -// Members returns information on cluster membership. -// The member ID's are the keys, because an empty names means unstarted! -// TODO: consider queueing this through the main loop with CtxError(ctx, err) -func Members(obj *EmbdEtcd) (map[uint64]string, error) { - //obj.Connect(false) // TODO: ? - ctx := context.Background() - var response *etcd.MemberListResponse - var err error - for { - if obj.exiting { // the exit signal has been sent! - return nil, fmt.Errorf("exiting etcd") - } - obj.rLock.RLock() - if obj.flags.Trace { - log.Printf("Trace: Etcd: Members(): Endpoints are: %v", obj.client.Endpoints()) - } - response, err = obj.client.MemberList(ctx) - obj.rLock.RUnlock() - if err == nil { - break - } - if ctx, err = obj.CtxError(ctx, err); err != nil { - return nil, err - } - } - - members := make(map[uint64]string) - for _, x := range response.Members { - members[x.ID] = x.Name // x.Name will be "" if unstarted! - } - return members, nil -} - -// Leader returns the current leader of the etcd server cluster -func Leader(obj *EmbdEtcd) (string, error) { - //obj.Connect(false) // TODO: ? - var err error - membersMap := make(map[uint64]string) - if membersMap, err = Members(obj); err != nil { - return "", err - } - addresses := obj.LocalhostClientURLs() // heuristic, but probably correct - if len(addresses) == 0 { - // probably a programming error... - return "", fmt.Errorf("programming error") - } - endpoint := addresses[0].Host // FIXME: arbitrarily picked the first one - - // part two - ctx := context.Background() - var response *etcd.StatusResponse - for { - if obj.exiting { // the exit signal has been sent! - return "", fmt.Errorf("exiting etcd") - } - - obj.rLock.RLock() - response, err = obj.client.Maintenance.Status(ctx, endpoint) - obj.rLock.RUnlock() - if err == nil { - break - } - if ctx, err = obj.CtxError(ctx, err); err != nil { - return "", err - } - } - - // isLeader: response.Header.MemberId == response.Leader - for id, name := range membersMap { - if id == response.Leader { - return name, nil - } - } - return "", fmt.Errorf("members map is not current") // not found -} - -// WatchResources returns a channel that outputs events when exported resources -// change. -// TODO: Filter our watch (on the server side if possible) based on the -// collection prefixes and filters that we care about... -func WatchResources(obj *EmbdEtcd) chan error { - ch := make(chan error, 1) // buffer it so we can measure it - path := fmt.Sprintf("/%s/exported/", NS) - callback := func(re *RE) error { - // TODO: is this even needed? it used to happen on conn errors - log.Printf("Etcd: Watch: Path: %v", path) // event - if re == nil || re.response.Canceled { - return fmt.Errorf("watch is empty") // will cause a CtxError+retry - } - // we normally need to check if anything changed since the last - // event, since a set (export) with no changes still causes the - // watcher to trigger and this would cause an infinite loop. we - // don't need to do this check anymore because we do the export - // transactionally, and only if a change is needed. since it is - // atomic, all the changes arrive together which avoids dupes!! - if len(ch) == 0 { // send event only if one isn't pending - // this check avoids multiple events all queueing up and then - // being released continuously long after the changes stopped - // do not block! - ch <- nil // event - } - return nil - } - _, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors - return ch -} - -// SetResources exports all of the resources which we pass in to etcd -func SetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error { - // key structure is /$NS/exported/$hostname/resources/$uid = $data - - var kindFilter []string // empty to get from everyone - hostnameFilter := []string{hostname} - // this is not a race because we should only be reading keys which we - // set, and there should not be any contention with other hosts here! - originals, err := GetResources(obj, hostnameFilter, kindFilter) - if err != nil { - return err - } - - if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del - return nil - } - - ifs := []etcd.Cmp{} // list matching the desired state - ops := []etcd.Op{} // list of ops in this transaction - for _, res := range resourceList { - if res.GetKind() == "" { - log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName()) - } - uid := fmt.Sprintf("%s/%s", res.GetKind(), res.GetName()) - path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uid) - if data, err := resources.ResToB64(res); err == nil { - ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state - ops = append(ops, etcd.OpPut(path, data)) - } else { - return fmt.Errorf("can't convert to B64: %v", err) - } - } - - match := func(res resources.Res, resourceList []resources.Res) bool { // helper lambda - for _, x := range resourceList { - if res.GetKind() == x.GetKind() && res.GetName() == x.GetName() { - return true - } - } - return false - } - - hasDeletes := false - // delete old, now unused resources here... - for _, res := range originals { - if res.GetKind() == "" { - log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName()) - } - uid := fmt.Sprintf("%s/%s", res.GetKind(), res.GetName()) - path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uid) - - if match(res, resourceList) { // if we match, no need to delete! - continue - } - - ops = append(ops, etcd.OpDelete(path)) - - hasDeletes = true - } - - // if everything is already correct, do nothing, otherwise, run the ops! - // it's important to do this in one transaction, and atomically, because - // this way, we only generate one watch event, and only when it's needed - if hasDeletes { // always run, ifs don't matter - _, err = obj.Txn(nil, ops, nil) // TODO: does this run? it should! - } else { - _, err = obj.Txn(ifs, nil, ops) // TODO: do we need to look at response? - } - return err -} - -// GetResources collects all of the resources which match a filter from etcd -// If the kindfilter or hostnameFilter is empty, then it assumes no filtering... -// TODO: Expand this with a more powerful filter based on what we eventually -// support in our collect DSL. Ideally a server side filter like WithFilter() -// We could do this if the pattern was /$NS/exported/$kind/$hostname/$uid = $data -func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error) { - // key structure is /$NS/exported/$hostname/resources/$uid = $data - path := fmt.Sprintf("/%s/exported/", NS) - resourceList := []resources.Res{} - keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend)) - if err != nil { - return nil, fmt.Errorf("could not get resources: %v", err) - } - for key, val := range keyMap { - if !strings.HasPrefix(key, path) { // sanity check - continue - } - - str := strings.Split(key[len(path):], "/") - if len(str) != 4 { - return nil, fmt.Errorf("unexpected chunk count") - } - hostname, r, kind, name := str[0], str[1], str[2], str[3] - if r != "resources" { - return nil, fmt.Errorf("unexpected chunk pattern") - } - if kind == "" { - return nil, fmt.Errorf("unexpected kind chunk") - } - - // FIXME: ideally this would be a server side filter instead! - if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) { - continue - } - - // FIXME: ideally this would be a server side filter instead! - if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) { - continue - } - - if obj, err := resources.B64ToRes(val); err == nil { - obj.SetKind(kind) // cheap init - log.Printf("Etcd: Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name) - resourceList = append(resourceList, obj) - } else { - return nil, fmt.Errorf("can't convert from B64: %v", err) - } - } - return resourceList, nil -} - //func UrlRemoveScheme(urls etcdtypes.URLs) []string { // strs := []string{} // for _, u := range urls { @@ -2257,7 +1721,7 @@ func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resourc // return strs //} -// ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse +// ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse. func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, error) { if re == nil { // passthrough return urlsmap, nil diff --git a/etcd/methods.go b/etcd/methods.go new file mode 100644 index 00000000..c951d376 --- /dev/null +++ b/etcd/methods.go @@ -0,0 +1,412 @@ +// Mgmt +// Copyright (C) 2013-2017+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package etcd + +import ( + "fmt" + "log" + "strconv" + "strings" + + etcd "github.com/coreos/etcd/clientv3" + rpctypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + etcdtypes "github.com/coreos/etcd/pkg/types" + context "golang.org/x/net/context" +) + +// TODO: Could all these Etcd*(obj *EmbdEtcd, ...) functions which deal with the +// interface between etcd paths and behaviour be grouped into a single struct ? + +// Nominate nominates a particular client to be a server (peer). +func Nominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error { + if obj.flags.Trace { + log.Printf("Trace: Etcd: Nominate(%v): %v", hostname, urls.String()) + defer log.Printf("Trace: Etcd: Nominate(%v): Finished!", hostname) + } + // nominate someone to be a server + nominate := fmt.Sprintf("/%s/nominated/%s", NS, hostname) + ops := []etcd.Op{} // list of ops in this txn + if urls != nil { + ops = append(ops, etcd.OpPut(nominate, urls.String())) // TODO: add a TTL? (etcd.WithLease) + + } else { // delete message if set to erase + ops = append(ops, etcd.OpDelete(nominate)) + } + + if _, err := obj.Txn(nil, ops, nil); err != nil { + return fmt.Errorf("nominate failed") // exit in progress? + } + return nil +} + +// Nominated returns a urls map of nominated etcd server volunteers. +// NOTE: I know 'nominees' might be more correct, but is less consistent here +func Nominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { + path := fmt.Sprintf("/%s/nominated/", NS) + keyMap, err := obj.Get(path, etcd.WithPrefix()) // map[string]string, bool + if err != nil { + return nil, fmt.Errorf("nominated isn't available: %v", err) + } + nominated := make(etcdtypes.URLsMap) + for key, val := range keyMap { // loop through directory of nominated + if !strings.HasPrefix(key, path) { + continue + } + name := key[len(path):] // get name of nominee + if val == "" { // skip "erased" values + continue + } + urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) + if err != nil { + return nil, fmt.Errorf("nominated data format error: %v", err) + } + nominated[name] = urls // add to map + if obj.flags.Debug { + log.Printf("Etcd: Nominated(%v): %v", name, val) + } + } + return nominated, nil +} + +// Volunteer offers yourself up to be a server if needed. +func Volunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error { + if obj.flags.Trace { + log.Printf("Trace: Etcd: Volunteer(%v): %v", obj.hostname, urls.String()) + defer log.Printf("Trace: Etcd: Volunteer(%v): Finished!", obj.hostname) + } + // volunteer to be a server + volunteer := fmt.Sprintf("/%s/volunteers/%s", NS, obj.hostname) + ops := []etcd.Op{} // list of ops in this txn + if urls != nil { + // XXX: adding a TTL is crucial! (i think) + ops = append(ops, etcd.OpPut(volunteer, urls.String())) // value is usually a peer "serverURL" + + } else { // delete message if set to erase + ops = append(ops, etcd.OpDelete(volunteer)) + } + + if _, err := obj.Txn(nil, ops, nil); err != nil { + return fmt.Errorf("volunteering failed") // exit in progress? + } + return nil +} + +// Volunteers returns a urls map of available etcd server volunteers. +func Volunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { + if obj.flags.Trace { + log.Printf("Trace: Etcd: Volunteers()") + defer log.Printf("Trace: Etcd: Volunteers(): Finished!") + } + path := fmt.Sprintf("/%s/volunteers/", NS) + keyMap, err := obj.Get(path, etcd.WithPrefix()) + if err != nil { + return nil, fmt.Errorf("volunteers aren't available: %v", err) + } + volunteers := make(etcdtypes.URLsMap) + for key, val := range keyMap { // loop through directory of volunteers + if !strings.HasPrefix(key, path) { + continue + } + name := key[len(path):] // get name of volunteer + if val == "" { // skip "erased" values + continue + } + urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) + if err != nil { + return nil, fmt.Errorf("volunteers data format error: %v", err) + } + volunteers[name] = urls // add to map + if obj.flags.Debug { + log.Printf("Etcd: Volunteer(%v): %v", name, val) + } + } + return volunteers, nil +} + +// AdvertiseEndpoints advertises the list of available client endpoints. +func AdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error { + if obj.flags.Trace { + log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): %v", obj.hostname, urls.String()) + defer log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): Finished!", obj.hostname) + } + // advertise endpoints + endpoints := fmt.Sprintf("/%s/endpoints/%s", NS, obj.hostname) + ops := []etcd.Op{} // list of ops in this txn + if urls != nil { + // TODO: add a TTL? (etcd.WithLease) + ops = append(ops, etcd.OpPut(endpoints, urls.String())) // value is usually a "clientURL" + + } else { // delete message if set to erase + ops = append(ops, etcd.OpDelete(endpoints)) + } + + if _, err := obj.Txn(nil, ops, nil); err != nil { + return fmt.Errorf("endpoint advertising failed") // exit in progress? + } + return nil +} + +// Endpoints returns a urls map of available etcd server endpoints. +func Endpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { + if obj.flags.Trace { + log.Printf("Trace: Etcd: Endpoints()") + defer log.Printf("Trace: Etcd: Endpoints(): Finished!") + } + path := fmt.Sprintf("/%s/endpoints/", NS) + keyMap, err := obj.Get(path, etcd.WithPrefix()) + if err != nil { + return nil, fmt.Errorf("endpoints aren't available: %v", err) + } + endpoints := make(etcdtypes.URLsMap) + for key, val := range keyMap { // loop through directory of endpoints + if !strings.HasPrefix(key, path) { + continue + } + name := key[len(path):] // get name of volunteer + if val == "" { // skip "erased" values + continue + } + urls, err := etcdtypes.NewURLs(strings.Split(val, ",")) + if err != nil { + return nil, fmt.Errorf("endpoints data format error: %v", err) + } + endpoints[name] = urls // add to map + if obj.flags.Debug { + log.Printf("Etcd: Endpoint(%v): %v", name, val) + } + } + return endpoints, nil +} + +// SetHostnameConverged sets whether a specific hostname is converged. +func SetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error { + if obj.flags.Trace { + log.Printf("Trace: Etcd: SetHostnameConverged(%s): %v", hostname, isConverged) + defer log.Printf("Trace: Etcd: SetHostnameConverged(%v): Finished!", hostname) + } + converged := fmt.Sprintf("/%s/converged/%s", NS, hostname) + op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))} + if _, err := obj.Txn(nil, op, nil); err != nil { // TODO: do we need a skipConv flag here too? + return fmt.Errorf("set converged failed") // exit in progress? + } + return nil +} + +// HostnameConverged returns a map of every hostname's converged state. +func HostnameConverged(obj *EmbdEtcd) (map[string]bool, error) { + if obj.flags.Trace { + log.Printf("Trace: Etcd: HostnameConverged()") + defer log.Printf("Trace: Etcd: HostnameConverged(): Finished!") + } + path := fmt.Sprintf("/%s/converged/", NS) + keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge + if err != nil { + return nil, fmt.Errorf("converged values aren't available: %v", err) + } + converged := make(map[string]bool) + for key, val := range keyMap { // loop through directory... + if !strings.HasPrefix(key, path) { + continue + } + name := key[len(path):] // get name of key + if val == "" { // skip "erased" values + continue + } + b, err := strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("converged data format error: %v", err) + } + converged[name] = b // add to map + } + return converged, nil +} + +// AddHostnameConvergedWatcher adds a watcher with a callback that runs on +// hostname state changes. +func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) { + path := fmt.Sprintf("/%s/converged/", NS) + internalCbFn := func(re *RE) error { + // TODO: get the value from the response, and apply delta... + // for now, just run a get operation which is easier to code! + m, err := HostnameConverged(obj) + if err != nil { + return err + } + return callbackFn(m) // call my function + } + return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset +} + +// SetClusterSize sets the ideal target cluster size of etcd peers. +func SetClusterSize(obj *EmbdEtcd, value uint16) error { + if obj.flags.Trace { + log.Printf("Trace: Etcd: SetClusterSize(): %v", value) + defer log.Printf("Trace: Etcd: SetClusterSize(): Finished!") + } + key := fmt.Sprintf("/%s/idealClusterSize", NS) + + if err := obj.Set(key, strconv.FormatUint(uint64(value), 10)); err != nil { + return fmt.Errorf("function SetClusterSize failed: %v", err) // exit in progress? + } + return nil +} + +// GetClusterSize gets the ideal target cluster size of etcd peers. +func GetClusterSize(obj *EmbdEtcd) (uint16, error) { + key := fmt.Sprintf("/%s/idealClusterSize", NS) + keyMap, err := obj.Get(key) + if err != nil { + return 0, fmt.Errorf("function GetClusterSize failed: %v", err) + } + + val, exists := keyMap[key] + if !exists || val == "" { + return 0, fmt.Errorf("function GetClusterSize failed: %v", err) + } + + v, err := strconv.ParseUint(val, 10, 16) + if err != nil { + return 0, fmt.Errorf("function GetClusterSize failed: %v", err) + } + return uint16(v), nil +} + +// MemberAdd adds a member to the cluster. +func MemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) { + //obj.Connect(false) // TODO: ? + ctx := context.Background() + var response *etcd.MemberAddResponse + var err error + for { + if obj.exiting { // the exit signal has been sent! + return nil, fmt.Errorf("exiting etcd") + } + obj.rLock.RLock() + response, err = obj.client.MemberAdd(ctx, peerURLs.StringSlice()) + obj.rLock.RUnlock() + if err == nil { + break + } + if ctx, err = obj.CtxError(ctx, err); err != nil { + return nil, err + } + } + return response, nil +} + +// MemberRemove removes a member by mID and returns if it worked, and also +// if there was an error. This is because it might have run without error, but +// the member wasn't found, for example. +func MemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) { + //obj.Connect(false) // TODO: ? + ctx := context.Background() + for { + if obj.exiting { // the exit signal has been sent! + return false, fmt.Errorf("exiting etcd") + } + obj.rLock.RLock() + _, err := obj.client.MemberRemove(ctx, mID) + obj.rLock.RUnlock() + if err == nil { + break + } else if err == rpctypes.ErrMemberNotFound { + // if we get this, member already shut itself down :) + return false, nil + } + if ctx, err = obj.CtxError(ctx, err); err != nil { + return false, err + } + } + return true, nil +} + +// Members returns information on cluster membership. +// The member ID's are the keys, because an empty names means unstarted! +// TODO: consider queueing this through the main loop with CtxError(ctx, err) +func Members(obj *EmbdEtcd) (map[uint64]string, error) { + //obj.Connect(false) // TODO: ? + ctx := context.Background() + var response *etcd.MemberListResponse + var err error + for { + if obj.exiting { // the exit signal has been sent! + return nil, fmt.Errorf("exiting etcd") + } + obj.rLock.RLock() + if obj.flags.Trace { + log.Printf("Trace: Etcd: Members(): Endpoints are: %v", obj.client.Endpoints()) + } + response, err = obj.client.MemberList(ctx) + obj.rLock.RUnlock() + if err == nil { + break + } + if ctx, err = obj.CtxError(ctx, err); err != nil { + return nil, err + } + } + + members := make(map[uint64]string) + for _, x := range response.Members { + members[x.ID] = x.Name // x.Name will be "" if unstarted! + } + return members, nil +} + +// Leader returns the current leader of the etcd server cluster. +func Leader(obj *EmbdEtcd) (string, error) { + //obj.Connect(false) // TODO: ? + var err error + membersMap := make(map[uint64]string) + if membersMap, err = Members(obj); err != nil { + return "", err + } + addresses := obj.LocalhostClientURLs() // heuristic, but probably correct + if len(addresses) == 0 { + // probably a programming error... + return "", fmt.Errorf("programming error") + } + endpoint := addresses[0].Host // FIXME: arbitrarily picked the first one + + // part two + ctx := context.Background() + var response *etcd.StatusResponse + for { + if obj.exiting { // the exit signal has been sent! + return "", fmt.Errorf("exiting etcd") + } + + obj.rLock.RLock() + response, err = obj.client.Maintenance.Status(ctx, endpoint) + obj.rLock.RUnlock() + if err == nil { + break + } + if ctx, err = obj.CtxError(ctx, err); err != nil { + return "", err + } + } + + // isLeader: response.Header.MemberId == response.Leader + for id, name := range membersMap { + if id == response.Leader { + return name, nil + } + } + return "", fmt.Errorf("members map is not current") // not found +} diff --git a/etcd/resources.go b/etcd/resources.go new file mode 100644 index 00000000..b19f8233 --- /dev/null +++ b/etcd/resources.go @@ -0,0 +1,182 @@ +// Mgmt +// Copyright (C) 2013-2017+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package etcd + +import ( + "fmt" + "log" + "strings" + + "github.com/purpleidea/mgmt/resources" + "github.com/purpleidea/mgmt/util" + + etcd "github.com/coreos/etcd/clientv3" +) + +// WatchResources returns a channel that outputs events when exported resources +// change. +// TODO: Filter our watch (on the server side if possible) based on the +// collection prefixes and filters that we care about... +func WatchResources(obj *EmbdEtcd) chan error { + ch := make(chan error, 1) // buffer it so we can measure it + path := fmt.Sprintf("/%s/exported/", NS) + callback := func(re *RE) error { + // TODO: is this even needed? it used to happen on conn errors + log.Printf("Etcd: Watch: Path: %v", path) // event + if re == nil || re.response.Canceled { + return fmt.Errorf("watch is empty") // will cause a CtxError+retry + } + // we normally need to check if anything changed since the last + // event, since a set (export) with no changes still causes the + // watcher to trigger and this would cause an infinite loop. we + // don't need to do this check anymore because we do the export + // transactionally, and only if a change is needed. since it is + // atomic, all the changes arrive together which avoids dupes!! + if len(ch) == 0 { // send event only if one isn't pending + // this check avoids multiple events all queueing up and then + // being released continuously long after the changes stopped + // do not block! + ch <- nil // event + } + return nil + } + _, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors + return ch +} + +// SetResources exports all of the resources which we pass in to etcd. +func SetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error { + // key structure is /$NS/exported/$hostname/resources/$uid = $data + + var kindFilter []string // empty to get from everyone + hostnameFilter := []string{hostname} + // this is not a race because we should only be reading keys which we + // set, and there should not be any contention with other hosts here! + originals, err := GetResources(obj, hostnameFilter, kindFilter) + if err != nil { + return err + } + + if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del + return nil + } + + ifs := []etcd.Cmp{} // list matching the desired state + ops := []etcd.Op{} // list of ops in this transaction + for _, res := range resourceList { + if res.GetKind() == "" { + log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName()) + } + uid := fmt.Sprintf("%s/%s", res.GetKind(), res.GetName()) + path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uid) + if data, err := resources.ResToB64(res); err == nil { + ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state + ops = append(ops, etcd.OpPut(path, data)) + } else { + return fmt.Errorf("can't convert to B64: %v", err) + } + } + + match := func(res resources.Res, resourceList []resources.Res) bool { // helper lambda + for _, x := range resourceList { + if res.GetKind() == x.GetKind() && res.GetName() == x.GetName() { + return true + } + } + return false + } + + hasDeletes := false + // delete old, now unused resources here... + for _, res := range originals { + if res.GetKind() == "" { + log.Fatalf("Etcd: SetResources: Error: Empty kind: %v", res.GetName()) + } + uid := fmt.Sprintf("%s/%s", res.GetKind(), res.GetName()) + path := fmt.Sprintf("/%s/exported/%s/resources/%s", NS, hostname, uid) + + if match(res, resourceList) { // if we match, no need to delete! + continue + } + + ops = append(ops, etcd.OpDelete(path)) + + hasDeletes = true + } + + // if everything is already correct, do nothing, otherwise, run the ops! + // it's important to do this in one transaction, and atomically, because + // this way, we only generate one watch event, and only when it's needed + if hasDeletes { // always run, ifs don't matter + _, err = obj.Txn(nil, ops, nil) // TODO: does this run? it should! + } else { + _, err = obj.Txn(ifs, nil, ops) // TODO: do we need to look at response? + } + return err +} + +// GetResources collects all of the resources which match a filter from etcd. +// If the kindfilter or hostnameFilter is empty, then it assumes no filtering... +// TODO: Expand this with a more powerful filter based on what we eventually +// support in our collect DSL. Ideally a server side filter like WithFilter() +// We could do this if the pattern was /$NS/exported/$kind/$hostname/$uid = $data. +func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error) { + // key structure is /$NS/exported/$hostname/resources/$uid = $data + path := fmt.Sprintf("/%s/exported/", NS) + resourceList := []resources.Res{} + keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend)) + if err != nil { + return nil, fmt.Errorf("could not get resources: %v", err) + } + for key, val := range keyMap { + if !strings.HasPrefix(key, path) { // sanity check + continue + } + + str := strings.Split(key[len(path):], "/") + if len(str) != 4 { + return nil, fmt.Errorf("unexpected chunk count") + } + hostname, r, kind, name := str[0], str[1], str[2], str[3] + if r != "resources" { + return nil, fmt.Errorf("unexpected chunk pattern") + } + if kind == "" { + return nil, fmt.Errorf("unexpected kind chunk") + } + + // FIXME: ideally this would be a server side filter instead! + if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) { + continue + } + + // FIXME: ideally this would be a server side filter instead! + if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) { + continue + } + + if obj, err := resources.B64ToRes(val); err == nil { + obj.SetKind(kind) // cheap init + log.Printf("Etcd: Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name) + resourceList = append(resourceList, obj) + } else { + return nil, fmt.Errorf("can't convert from B64: %v", err) + } + } + return resourceList, nil +} diff --git a/etcd/str.go b/etcd/str.go index 575c5cb5..0a30eb91 100644 --- a/etcd/str.go +++ b/etcd/str.go @@ -52,7 +52,7 @@ func WatchStr(obj *EmbdEtcd, key string) chan error { return ch } -// GetStr collects the string which matches a gloabl namespace in etcd. +// GetStr collects the string which matches a global namespace in etcd. func GetStr(obj *EmbdEtcd, key string) (string, error) { // new key structure is /$NS/strings/$key = $data path := fmt.Sprintf("/%s/strings/%s", NS, key)