etcd: Remove stuttering in package

This is a good first step to cleaning up the package.
This commit is contained in:
James Shubin
2017-02-12 22:51:46 -05:00
parent e96041d76f
commit 35d3328e3e
3 changed files with 82 additions and 82 deletions

View File

@@ -368,7 +368,7 @@ func (obj *EmbdEtcd) Startup() error {
// if we have no endpoints, it means we are bootstrapping... // if we have no endpoints, it means we are bootstrapping...
if !bootstrapping { if !bootstrapping {
log.Println("Etcd: Startup: Getting initial values...") log.Println("Etcd: Startup: Getting initial values...")
if nominated, err := EtcdNominated(obj); err == nil { if nominated, err := Nominated(obj); err == nil {
obj.nominated = nominated // store a local copy obj.nominated = nominated // store a local copy
} else { } else {
log.Printf("Etcd: Startup: Nominate lookup error.") log.Printf("Etcd: Startup: Nominate lookup error.")
@@ -377,7 +377,7 @@ func (obj *EmbdEtcd) Startup() error {
} }
// get initial ideal cluster size // get initial ideal cluster size
if idealClusterSize, err := EtcdGetClusterSize(obj); err == nil { if idealClusterSize, err := GetClusterSize(obj); err == nil {
obj.idealClusterSize = idealClusterSize obj.idealClusterSize = idealClusterSize
log.Printf("Etcd: Startup: Ideal cluster size is: %d", idealClusterSize) log.Printf("Etcd: Startup: Ideal cluster size is: %d", idealClusterSize)
} else { } else {
@@ -396,7 +396,7 @@ func (obj *EmbdEtcd) Startup() error {
if !obj.noServer && bootstrapping { if !obj.noServer && bootstrapping {
log.Printf("Etcd: Bootstrapping...") log.Printf("Etcd: Bootstrapping...")
// give an initial value to the obj.nominate map we keep in sync // give an initial value to the obj.nominate map we keep in sync
// this emulates EtcdNominate(obj, obj.hostname, obj.serverURLs) // this emulates Nominate(obj, obj.hostname, obj.serverURLs)
obj.nominated[obj.hostname] = obj.serverURLs // initial value obj.nominated[obj.hostname] = obj.serverURLs // initial value
// NOTE: when we are stuck waiting for the server to start up, // NOTE: when we are stuck waiting for the server to start up,
// it is probably happening on this call right here... // it is probably happening on this call right here...
@@ -407,11 +407,11 @@ func (obj *EmbdEtcd) Startup() error {
if !obj.noServer && len(obj.serverURLs) > 0 { if !obj.noServer && len(obj.serverURLs) > 0 {
// we run this in a go routine because it blocks waiting for server // we run this in a go routine because it blocks waiting for server
log.Printf("Etcd: Startup: Volunteering...") log.Printf("Etcd: Startup: Volunteering...")
go EtcdVolunteer(obj, obj.serverURLs) go Volunteer(obj, obj.serverURLs)
} }
if bootstrapping { if bootstrapping {
if err := EtcdSetClusterSize(obj, obj.idealClusterSize); err != nil { if err := SetClusterSize(obj, obj.idealClusterSize); err != nil {
log.Printf("Etcd: Startup: Ideal cluster size storage error.") log.Printf("Etcd: Startup: Ideal cluster size storage error.")
obj.Destroy() obj.Destroy()
return fmt.Errorf("Etcd: Startup: Error: %v", err) return fmt.Errorf("Etcd: Startup: Error: %v", err)
@@ -432,7 +432,7 @@ func (obj *EmbdEtcd) Destroy() error {
// this should also trigger an unnominate, which should cause a shutdown // this should also trigger an unnominate, which should cause a shutdown
log.Printf("Etcd: Destroy: Unvolunteering...") log.Printf("Etcd: Destroy: Unvolunteering...")
if err := EtcdVolunteer(obj, nil); err != nil { // unvolunteer so we can shutdown... if err := Volunteer(obj, nil); err != nil { // unvolunteer so we can shutdown...
log.Printf("Etcd: Destroy: Error: %v", err) // we have a problem log.Printf("Etcd: Destroy: Error: %v", err) // we have a problem
} }
@@ -1189,7 +1189,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
return nil // if we're not a server, we're not a leader, return return nil // if we're not a server, we're not a leader, return
} }
membersMap, err := EtcdMembers(obj) // map[uint64]string membersMap, err := Members(obj) // map[uint64]string
if err != nil { if err != nil {
return fmt.Errorf("Etcd: Members: Error: %+v", err) return fmt.Errorf("Etcd: Members: Error: %+v", err)
} }
@@ -1212,7 +1212,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
} }
} }
leader, err := EtcdLeader(obj) // XXX: race! leader, err := Leader(obj) // XXX: race!
if err != nil { if err != nil {
log.Printf("Etcd: Leader: Error: %+v", err) log.Printf("Etcd: Leader: Error: %+v", err)
return fmt.Errorf("Etcd: Leader: Error: %+v", err) return fmt.Errorf("Etcd: Leader: Error: %+v", err)
@@ -1225,7 +1225,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
// i am the leader! // i am the leader!
// get the list of available volunteers // get the list of available volunteers
volunteersMap, err := EtcdVolunteers(obj) volunteersMap, err := Volunteers(obj)
if err != nil { if err != nil {
log.Printf("Etcd: Volunteers: Error: %+v", err) log.Printf("Etcd: Volunteers: Error: %+v", err)
return fmt.Errorf("Etcd: Volunteers: Error: %+v", err) return fmt.Errorf("Etcd: Volunteers: Error: %+v", err)
@@ -1245,7 +1245,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
// if we're the only member left, just shutdown... // if we're the only member left, just shutdown...
if len(members) == 1 && members[0] == obj.hostname && len(quitters) == 1 && quitters[0] == obj.hostname { if len(members) == 1 && members[0] == obj.hostname && len(quitters) == 1 && quitters[0] == obj.hostname {
log.Printf("Etcd: Quitters: Shutting down self...") log.Printf("Etcd: Quitters: Shutting down self...")
if err := EtcdNominate(obj, obj.hostname, nil); err != nil { // unnominate myself if err := Nominate(obj, obj.hostname, nil); err != nil { // unnominate myself
return &CtxDelayErr{1 * time.Second, fmt.Sprintf("error shutting down self: %v", err)} return &CtxDelayErr{1 * time.Second, fmt.Sprintf("error shutting down self: %v", err)}
} }
return nil return nil
@@ -1267,7 +1267,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
// NOTE: storing peerURLs when they're already in volunteers/ is // NOTE: storing peerURLs when they're already in volunteers/ is
// redundant, but it seems to be necessary for a sane algorithm. // redundant, but it seems to be necessary for a sane algorithm.
// nominate before we call the API so that members see it first! // nominate before we call the API so that members see it first!
EtcdNominate(obj, chosen, peerURLs) Nominate(obj, chosen, peerURLs)
// XXX: add a ttl here, because once we nominate someone, we // XXX: add a ttl here, because once we nominate someone, we
// need to give them up to N seconds to start up after we run // need to give them up to N seconds to start up after we run
// the MemberAdd API because if they don't, in some situations // the MemberAdd API because if they don't, in some situations
@@ -1280,7 +1280,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
// member at a time! // member at a time!
log.Printf("Etcd: Member Add: %v", peerURLs) log.Printf("Etcd: Member Add: %v", peerURLs)
mresp, err := EtcdMemberAdd(obj, peerURLs) mresp, err := MemberAdd(obj, peerURLs)
if err != nil { if err != nil {
// on error this function will run again, which is good // on error this function will run again, which is good
// because we need to make sure to run the below parts! // because we need to make sure to run the below parts!
@@ -1311,7 +1311,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
// programming error // programming error
log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID) log.Fatalf("Etcd: Member Remove: Error: %v(%v) not in members list!", quitter, mID)
} }
EtcdNominate(obj, quitter, nil) // unnominate Nominate(obj, quitter, nil) // unnominate
// once we issue the above unnominate, that peer will // once we issue the above unnominate, that peer will
// shutdown, and this might cause us to loose quorum, // shutdown, and this might cause us to loose quorum,
// therefore, let that member remove itself, and then // therefore, let that member remove itself, and then
@@ -1326,7 +1326,7 @@ func (obj *EmbdEtcd) volunteerCallback(re *RE) error {
log.Printf("Etcd: Waiting %d seconds for %s to self remove...", selfRemoveTimeout, quitter) log.Printf("Etcd: Waiting %d seconds for %s to self remove...", selfRemoveTimeout, quitter)
time.Sleep(selfRemoveTimeout * time.Second) time.Sleep(selfRemoveTimeout * time.Second)
// in case the removed member doesn't remove itself, do it! // in case the removed member doesn't remove itself, do it!
removed, err := EtcdMemberRemove(obj, mID) removed, err := MemberRemove(obj, mID)
if err != nil { if err != nil {
return fmt.Errorf("Etcd: Member Remove: Error: %+v", err) return fmt.Errorf("Etcd: Member Remove: Error: %+v", err)
} }
@@ -1373,7 +1373,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
// leader once the current leader calls the MemberAdd API and it // leader once the current leader calls the MemberAdd API and it
// steps down trying to form a two host cluster. Instead, we can // steps down trying to form a two host cluster. Instead, we can
// look at the event response data to read the nominated values! // look at the event response data to read the nominated values!
//nominated, err = EtcdNominated(obj) // nope, won't always work //nominated, err = Nominated(obj) // nope, won't always work
// since we only see what has *changed* in the response data, we // since we only see what has *changed* in the response data, we
// have to keep track of the original state and apply the deltas // have to keep track of the original state and apply the deltas
// this must be idempotent in case it errors and is called again // this must be idempotent in case it errors and is called again
@@ -1387,7 +1387,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
} else { } else {
// TODO: should we just use the above delta method for everything? // TODO: should we just use the above delta method for everything?
//nominated, err := EtcdNominated(obj) // just get it //nominated, err := Nominated(obj) // just get it
//if err != nil { //if err != nil {
// return fmt.Errorf("Etcd: Nominate: Error: %+v", err) // return fmt.Errorf("Etcd: Nominate: Error: %+v", err)
//} //}
@@ -1436,7 +1436,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
// XXX: just put this wherever for now so we don't block // XXX: just put this wherever for now so we don't block
// nominate self so "member" list is correct for peers to see // nominate self so "member" list is correct for peers to see
EtcdNominate(obj, obj.hostname, obj.serverURLs) Nominate(obj, obj.hostname, obj.serverURLs)
// XXX: if this fails, where will we retry this part ? // XXX: if this fails, where will we retry this part ?
} }
@@ -1444,7 +1444,7 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
if curls := obj.clientURLs; len(curls) > 0 { if curls := obj.clientURLs; len(curls) > 0 {
// XXX: don't advertise local addresses! 127.0.0.1:2381 doesn't really help remote hosts // XXX: don't advertise local addresses! 127.0.0.1:2381 doesn't really help remote hosts
// XXX: but sometimes this is what we want... hmmm how do we decide? filter on callback? // XXX: but sometimes this is what we want... hmmm how do we decide? filter on callback?
EtcdAdvertiseEndpoints(obj, curls) AdvertiseEndpoints(obj, curls)
// XXX: if this fails, where will we retry this part ? // XXX: if this fails, where will we retry this part ?
// force this to remove sentinel before we reconnect... // force this to remove sentinel before we reconnect...
@@ -1455,14 +1455,14 @@ func (obj *EmbdEtcd) nominateCallback(re *RE) error {
} else if obj.server != nil && !exists { } else if obj.server != nil && !exists {
// un advertise client urls // un advertise client urls
EtcdAdvertiseEndpoints(obj, nil) AdvertiseEndpoints(obj, nil)
// i have been un-nominated, remove self and shutdown server! // i have been un-nominated, remove self and shutdown server!
if len(obj.nominated) != 0 { // don't call if nobody left but me! if len(obj.nominated) != 0 { // don't call if nobody left but me!
// this works around: https://github.com/coreos/etcd/issues/5482, // this works around: https://github.com/coreos/etcd/issues/5482,
// and it probably makes sense to avoid calling if we're the last // and it probably makes sense to avoid calling if we're the last
log.Printf("Etcd: Member Remove: Removing self: %v", obj.memberID) log.Printf("Etcd: Member Remove: Removing self: %v", obj.memberID)
removed, err := EtcdMemberRemove(obj, obj.memberID) removed, err := MemberRemove(obj, obj.memberID)
if err != nil { if err != nil {
return fmt.Errorf("Etcd: Member Remove: Error: %+v", err) return fmt.Errorf("Etcd: Member Remove: Error: %+v", err)
} }
@@ -1524,7 +1524,7 @@ func (obj *EmbdEtcd) endpointCallback(re *RE) error {
endpoints, err := ApplyDeltaEvents(re, endpoints) endpoints, err := ApplyDeltaEvents(re, endpoints)
if err != nil || exists { if err != nil || exists {
// TODO: we could also lookup endpoints from the maintenance api // TODO: we could also lookup endpoints from the maintenance api
endpoints, err = EtcdEndpoints(obj) endpoints, err = Endpoints(obj)
if err != nil { if err != nil {
return err return err
} }
@@ -1717,11 +1717,11 @@ func (obj *EmbdEtcd) DestroyServer() error {
// TODO: Could all these Etcd*(obj *EmbdEtcd, ...) functions which deal with the // TODO: Could all these Etcd*(obj *EmbdEtcd, ...) functions which deal with the
// interface between etcd paths and behaviour be grouped into a single struct ? // interface between etcd paths and behaviour be grouped into a single struct ?
// EtcdNominate nominates a particular client to be a server (peer) // Nominate nominates a particular client to be a server (peer)
func EtcdNominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error { func Nominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdNominate(%v): %v", hostname, urls.String()) log.Printf("Trace: Etcd: Nominate(%v): %v", hostname, urls.String())
defer log.Printf("Trace: Etcd: EtcdNominate(%v): Finished!", hostname) defer log.Printf("Trace: Etcd: Nominate(%v): Finished!", hostname)
} }
// nominate someone to be a server // nominate someone to be a server
nominate := fmt.Sprintf("/%s/nominated/%s", NS, hostname) nominate := fmt.Sprintf("/%s/nominated/%s", NS, hostname)
@@ -1739,9 +1739,9 @@ func EtcdNominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error {
return nil return nil
} }
// EtcdNominated returns a urls map of nominated etcd server volunteers // Nominated returns a urls map of nominated etcd server volunteers
// NOTE: I know 'nominees' might be more correct, but is less consistent here // NOTE: I know 'nominees' might be more correct, but is less consistent here
func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { func Nominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
path := fmt.Sprintf("/%s/nominated/", NS) path := fmt.Sprintf("/%s/nominated/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix()) // map[string]string, bool keyMap, err := obj.Get(path, etcd.WithPrefix()) // map[string]string, bool
if err != nil { if err != nil {
@@ -1768,11 +1768,11 @@ func EtcdNominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
return nominated, nil return nominated, nil
} }
// EtcdVolunteer offers yourself up to be a server if needed // Volunteer offers yourself up to be a server if needed
func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error { func Volunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdVolunteer(%v): %v", obj.hostname, urls.String()) log.Printf("Trace: Etcd: Volunteer(%v): %v", obj.hostname, urls.String())
defer log.Printf("Trace: Etcd: EtcdVolunteer(%v): Finished!", obj.hostname) defer log.Printf("Trace: Etcd: Volunteer(%v): Finished!", obj.hostname)
} }
// volunteer to be a server // volunteer to be a server
volunteer := fmt.Sprintf("/%s/volunteers/%s", NS, obj.hostname) volunteer := fmt.Sprintf("/%s/volunteers/%s", NS, obj.hostname)
@@ -1791,11 +1791,11 @@ func EtcdVolunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error {
return nil return nil
} }
// EtcdVolunteers returns a urls map of available etcd server volunteers // Volunteers returns a urls map of available etcd server volunteers
func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { func Volunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdVolunteers()") log.Printf("Trace: Etcd: Volunteers()")
defer log.Printf("Trace: Etcd: EtcdVolunteers(): Finished!") defer log.Printf("Trace: Etcd: Volunteers(): Finished!")
} }
path := fmt.Sprintf("/%s/volunteers/", NS) path := fmt.Sprintf("/%s/volunteers/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix()) keyMap, err := obj.Get(path, etcd.WithPrefix())
@@ -1823,11 +1823,11 @@ func EtcdVolunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
return volunteers, nil return volunteers, nil
} }
// EtcdAdvertiseEndpoints advertises the list of available client endpoints // AdvertiseEndpoints advertises the list of available client endpoints
func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error { func AdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): %v", obj.hostname, urls.String()) log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): %v", obj.hostname, urls.String())
defer log.Printf("Trace: Etcd: EtcdAdvertiseEndpoints(%v): Finished!", obj.hostname) defer log.Printf("Trace: Etcd: AdvertiseEndpoints(%v): Finished!", obj.hostname)
} }
// advertise endpoints // advertise endpoints
endpoints := fmt.Sprintf("/%s/endpoints/%s", NS, obj.hostname) endpoints := fmt.Sprintf("/%s/endpoints/%s", NS, obj.hostname)
@@ -1846,11 +1846,11 @@ func EtcdAdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error {
return nil return nil
} }
// EtcdEndpoints returns a urls map of available etcd server endpoints // Endpoints returns a urls map of available etcd server endpoints
func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) { func Endpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdEndpoints()") log.Printf("Trace: Etcd: Endpoints()")
defer log.Printf("Trace: Etcd: EtcdEndpoints(): Finished!") defer log.Printf("Trace: Etcd: Endpoints(): Finished!")
} }
path := fmt.Sprintf("/%s/endpoints/", NS) path := fmt.Sprintf("/%s/endpoints/", NS)
keyMap, err := obj.Get(path, etcd.WithPrefix()) keyMap, err := obj.Get(path, etcd.WithPrefix())
@@ -1878,11 +1878,11 @@ func EtcdEndpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error) {
return endpoints, nil return endpoints, nil
} }
// EtcdSetHostnameConverged sets whether a specific hostname is converged. // SetHostnameConverged sets whether a specific hostname is converged.
func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error { func SetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%s): %v", hostname, isConverged) log.Printf("Trace: Etcd: SetHostnameConverged(%s): %v", hostname, isConverged)
defer log.Printf("Trace: Etcd: EtcdSetHostnameConverged(%v): Finished!", hostname) defer log.Printf("Trace: Etcd: SetHostnameConverged(%v): Finished!", hostname)
} }
converged := fmt.Sprintf("/%s/converged/%s", NS, hostname) converged := fmt.Sprintf("/%s/converged/%s", NS, hostname)
op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))} op := []etcd.Op{etcd.OpPut(converged, fmt.Sprintf("%t", isConverged))}
@@ -1892,11 +1892,11 @@ func EtcdSetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool)
return nil return nil
} }
// EtcdHostnameConverged returns a map of every hostname's converged state. // HostnameConverged returns a map of every hostname's converged state.
func EtcdHostnameConverged(obj *EmbdEtcd) (map[string]bool, error) { func HostnameConverged(obj *EmbdEtcd) (map[string]bool, error) {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdHostnameConverged()") log.Printf("Trace: Etcd: HostnameConverged()")
defer log.Printf("Trace: Etcd: EtcdHostnameConverged(): Finished!") defer log.Printf("Trace: Etcd: HostnameConverged(): Finished!")
} }
path := fmt.Sprintf("/%s/converged/", NS) path := fmt.Sprintf("/%s/converged/", NS)
keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge keyMap, err := obj.ComplexGet(path, true, etcd.WithPrefix()) // don't un-converge
@@ -1921,14 +1921,14 @@ func EtcdHostnameConverged(obj *EmbdEtcd) (map[string]bool, error) {
return converged, nil return converged, nil
} }
// EtcdAddHostnameConvergedWatcher adds a watcher with a callback that runs on // AddHostnameConvergedWatcher adds a watcher with a callback that runs on
// hostname state changes. // hostname state changes.
func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) { func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error) {
path := fmt.Sprintf("/%s/converged/", NS) path := fmt.Sprintf("/%s/converged/", NS)
internalCbFn := func(re *RE) error { internalCbFn := func(re *RE) error {
// TODO: get the value from the response, and apply delta... // TODO: get the value from the response, and apply delta...
// for now, just run a get operation which is easier to code! // for now, just run a get operation which is easier to code!
m, err := EtcdHostnameConverged(obj) m, err := HostnameConverged(obj)
if err != nil { if err != nil {
return err return err
} }
@@ -1937,11 +1937,11 @@ func EtcdAddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]b
return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset return obj.AddWatcher(path, internalCbFn, true, true, etcd.WithPrefix()) // no block and no converger reset
} }
// EtcdSetClusterSize sets the ideal target cluster size of etcd peers // SetClusterSize sets the ideal target cluster size of etcd peers
func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error { func SetClusterSize(obj *EmbdEtcd, value uint16) error {
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdSetClusterSize(): %v", value) log.Printf("Trace: Etcd: SetClusterSize(): %v", value)
defer log.Printf("Trace: Etcd: EtcdSetClusterSize(): Finished!") defer log.Printf("Trace: Etcd: SetClusterSize(): Finished!")
} }
key := fmt.Sprintf("/%s/idealClusterSize", NS) key := fmt.Sprintf("/%s/idealClusterSize", NS)
@@ -1951,8 +1951,8 @@ func EtcdSetClusterSize(obj *EmbdEtcd, value uint16) error {
return nil return nil
} }
// EtcdGetClusterSize gets the ideal target cluster size of etcd peers // GetClusterSize gets the ideal target cluster size of etcd peers
func EtcdGetClusterSize(obj *EmbdEtcd) (uint16, error) { func GetClusterSize(obj *EmbdEtcd) (uint16, error) {
key := fmt.Sprintf("/%s/idealClusterSize", NS) key := fmt.Sprintf("/%s/idealClusterSize", NS)
keyMap, err := obj.Get(key) keyMap, err := obj.Get(key)
if err != nil { if err != nil {
@@ -1971,8 +1971,8 @@ func EtcdGetClusterSize(obj *EmbdEtcd) (uint16, error) {
return uint16(v), nil return uint16(v), nil
} }
// EtcdMemberAdd adds a member to the cluster. // MemberAdd adds a member to the cluster.
func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) { func MemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error) {
//obj.Connect(false) // TODO: ? //obj.Connect(false) // TODO: ?
ctx := context.Background() ctx := context.Background()
var response *etcd.MemberAddResponse var response *etcd.MemberAddResponse
@@ -1994,10 +1994,10 @@ func EtcdMemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddRespo
return response, nil return response, nil
} }
// EtcdMemberRemove removes a member by mID and returns if it worked, and also // MemberRemove removes a member by mID and returns if it worked, and also
// if there was an error. This is because it might have run without error, but // if there was an error. This is because it might have run without error, but
// the member wasn't found, for example. // the member wasn't found, for example.
func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) { func MemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
//obj.Connect(false) // TODO: ? //obj.Connect(false) // TODO: ?
ctx := context.Background() ctx := context.Background()
for { for {
@@ -2020,10 +2020,10 @@ func EtcdMemberRemove(obj *EmbdEtcd, mID uint64) (bool, error) {
return true, nil return true, nil
} }
// EtcdMembers returns information on cluster membership. // Members returns information on cluster membership.
// The member ID's are the keys, because an empty names means unstarted! // The member ID's are the keys, because an empty names means unstarted!
// TODO: consider queueing this through the main loop with CtxError(ctx, err) // TODO: consider queueing this through the main loop with CtxError(ctx, err)
func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) { func Members(obj *EmbdEtcd) (map[uint64]string, error) {
//obj.Connect(false) // TODO: ? //obj.Connect(false) // TODO: ?
ctx := context.Background() ctx := context.Background()
var response *etcd.MemberListResponse var response *etcd.MemberListResponse
@@ -2034,7 +2034,7 @@ func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) {
} }
obj.rLock.RLock() obj.rLock.RLock()
if obj.flags.Trace { if obj.flags.Trace {
log.Printf("Trace: Etcd: EtcdMembers(): Endpoints are: %v", obj.client.Endpoints()) log.Printf("Trace: Etcd: Members(): Endpoints are: %v", obj.client.Endpoints())
} }
response, err = obj.client.MemberList(ctx) response, err = obj.client.MemberList(ctx)
obj.rLock.RUnlock() obj.rLock.RUnlock()
@@ -2053,12 +2053,12 @@ func EtcdMembers(obj *EmbdEtcd) (map[uint64]string, error) {
return members, nil return members, nil
} }
// EtcdLeader returns the current leader of the etcd server cluster // Leader returns the current leader of the etcd server cluster
func EtcdLeader(obj *EmbdEtcd) (string, error) { func Leader(obj *EmbdEtcd) (string, error) {
//obj.Connect(false) // TODO: ? //obj.Connect(false) // TODO: ?
var err error var err error
membersMap := make(map[uint64]string) membersMap := make(map[uint64]string)
if membersMap, err = EtcdMembers(obj); err != nil { if membersMap, err = Members(obj); err != nil {
return "", err return "", err
} }
addresses := obj.LocalhostClientURLs() // heuristic, but probably correct addresses := obj.LocalhostClientURLs() // heuristic, but probably correct
@@ -2096,10 +2096,10 @@ func EtcdLeader(obj *EmbdEtcd) (string, error) {
return "", fmt.Errorf("Etcd: Members map is not current!") // not found return "", fmt.Errorf("Etcd: Members map is not current!") // not found
} }
// EtcdWatch returns a channel that outputs a true bool when activity occurs // WatchAll returns a channel that outputs a true bool when activity occurs
// TODO: Filter our watch (on the server side if possible) based on the // TODO: Filter our watch (on the server side if possible) based on the
// collection prefixes and filters that we care about... // collection prefixes and filters that we care about...
func EtcdWatch(obj *EmbdEtcd) chan bool { func WatchAll(obj *EmbdEtcd) chan bool {
ch := make(chan bool, 1) // buffer it so we can measure it ch := make(chan bool, 1) // buffer it so we can measure it
path := fmt.Sprintf("/%s/exported/", NS) path := fmt.Sprintf("/%s/exported/", NS)
callback := func(re *RE) error { callback := func(re *RE) error {
@@ -2126,15 +2126,15 @@ func EtcdWatch(obj *EmbdEtcd) chan bool {
return ch return ch
} }
// EtcdSetResources exports all of the resources which we pass in to etcd // SetResources exports all of the resources which we pass in to etcd
func EtcdSetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error { func SetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error {
// key structure is /$NS/exported/$hostname/resources/$uid = $data // key structure is /$NS/exported/$hostname/resources/$uid = $data
var kindFilter []string // empty to get from everyone var kindFilter []string // empty to get from everyone
hostnameFilter := []string{hostname} hostnameFilter := []string{hostname}
// this is not a race because we should only be reading keys which we // this is not a race because we should only be reading keys which we
// set, and there should not be any contention with other hosts here! // set, and there should not be any contention with other hosts here!
originals, err := EtcdGetResources(obj, hostnameFilter, kindFilter) originals, err := GetResources(obj, hostnameFilter, kindFilter)
if err != nil { if err != nil {
return err return err
} }
@@ -2197,12 +2197,12 @@ func EtcdSetResources(obj *EmbdEtcd, hostname string, resourceList []resources.R
return err return err
} }
// EtcdGetResources collects all of the resources which match a filter from etcd // GetResources collects all of the resources which match a filter from etcd
// If the kindfilter or hostnameFilter is empty, then it assumes no filtering... // If the kindfilter or hostnameFilter is empty, then it assumes no filtering...
// TODO: Expand this with a more powerful filter based on what we eventually // TODO: Expand this with a more powerful filter based on what we eventually
// support in our collect DSL. Ideally a server side filter like WithFilter() // support in our collect DSL. Ideally a server side filter like WithFilter()
// We could do this if the pattern was /$NS/exported/$kind/$hostname/$uid = $data // We could do this if the pattern was /$NS/exported/$kind/$hostname/$uid = $data
func EtcdGetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error) { func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error) {
// key structure is /$NS/exported/$hostname/resources/$uid = $data // key structure is /$NS/exported/$hostname/resources/$uid = $data
path := fmt.Sprintf("/%s/exported/", NS) path := fmt.Sprintf("/%s/exported/", NS)
resourceList := []resources.Res{} resourceList := []resources.Res{}

View File

@@ -30,7 +30,7 @@ type World struct {
// ResExport exports a list of resources under our hostname namespace. // ResExport exports a list of resources under our hostname namespace.
// Subsequent calls replace the previously set collection atomically. // Subsequent calls replace the previously set collection atomically.
func (obj *World) ResExport(resourceList []resources.Res) error { func (obj *World) ResExport(resourceList []resources.Res) error {
return EtcdSetResources(obj.EmbdEtcd, obj.Hostname, resourceList) return SetResources(obj.EmbdEtcd, obj.Hostname, resourceList)
} }
// ResCollect gets the collection of exported resources which match the filter. // ResCollect gets the collection of exported resources which match the filter.
@@ -39,5 +39,5 @@ func (obj *World) ResCollect(hostnameFilter, kindFilter []string) ([]resources.R
// XXX: should we be restricted to retrieving resources that were // XXX: should we be restricted to retrieving resources that were
// exported with a tag that allows or restricts our hostname? We could // exported with a tag that allows or restricts our hostname? We could
// enforce that here if the underlying API supported it... Add this? // enforce that here if the underlying API supported it... Add this?
return EtcdGetResources(obj.EmbdEtcd, hostnameFilter, kindFilter) return GetResources(obj.EmbdEtcd, hostnameFilter, kindFilter)
} }

View File

@@ -342,7 +342,7 @@ func (obj *Main) Run() error {
return nil return nil
} }
// send our individual state into etcd for others to see // send our individual state into etcd for others to see
return etcd.EtcdSetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error? return etcd.SetHostnameConverged(EmbdEtcd, hostname, b) // TODO: what should happen on error?
} }
if EmbdEtcd != nil { if EmbdEtcd != nil {
converger.SetStateFn(convergerStateFn) converger.SetStateFn(convergerStateFn)
@@ -373,7 +373,7 @@ func (obj *Main) Run() error {
close(startChan) // kick it off! close(startChan) // kick it off!
log.Println("Etcd: Starting...") log.Println("Etcd: Starting...")
etcdChan := etcd.EtcdWatch(EmbdEtcd) etcdChan := etcd.WatchAll(EmbdEtcd)
first := true // first loop or not first := true // first loop or not
for { for {
log.Println("Main: Waiting...") log.Println("Main: Waiting...")
@@ -511,7 +511,7 @@ func (obj *Main) Run() error {
// initialize the add watcher, which calls the f callback on map changes // initialize the add watcher, which calls the f callback on map changes
convergerCb := func(f func(map[string]bool) error) (func(), error) { convergerCb := func(f func(map[string]bool) error) (func(), error) {
return etcd.EtcdAddHostnameConvergedWatcher(EmbdEtcd, f) return etcd.AddHostnameConvergedWatcher(EmbdEtcd, f)
} }
// build remotes struct for remote ssh // build remotes struct for remote ssh