etcd: Use new converged timers and allow skipping them

This implements the new extensions to the converged UUID API so that we
can keep a consistent timer running and reset it when needed. This is
useful because we now allow certain Watcher callbacks and other
operations to explicitly _not_ cause a convergerUUID to un-converge.
This is a necessary dependency for the distributed remote
converged-timeout work.
This commit is contained in:
James Shubin
2016-08-30 04:18:53 -04:00
parent eee652cefe
commit 636f2a36b1

60
etcd.go
View File

@@ -92,6 +92,7 @@ type AW struct {
opts []etcd.OpOption
callback func(*RE) error
errCheck bool
skipConv bool // ask event to skip converger updates
resp Resp
cancelFunc func() // data
}
@@ -104,6 +105,7 @@ type RE struct {
err error
callback func(*RE) error
errCheck bool // should we check the error of the callback?
skipConv bool // event skips converger updates
retryHint bool // set to true for one event after a watcher failure
retries uint // number of times we've retried on error
}
@@ -119,6 +121,7 @@ type KV struct {
// GQ is a struct for the get queue
type GQ struct {
path string
skipConv bool
opts []etcd.OpOption
resp Resp
data map[string]string
@@ -337,11 +340,11 @@ func (obj *EmbdEtcd) Startup() error {
// TODO: implement native etcd watcher method on member API changes
path := fmt.Sprintf("/%s/nominated/", NS)
go obj.AddWatcher(path, obj.nominateCallback, true, etcd.WithPrefix()) // no block
go obj.AddWatcher(path, obj.nominateCallback, true, false, etcd.WithPrefix()) // no block
// setup ideal cluster size watcher
key := fmt.Sprintf("/%s/idealClusterSize", NS)
go obj.AddWatcher(key, obj.idealClusterSizeCallback, true) // no block
go obj.AddWatcher(key, obj.idealClusterSizeCallback, true, false) // no block
// if we have no endpoints, it means we are bootstrapping...
if !bootstrapping {
@@ -367,7 +370,7 @@ func (obj *EmbdEtcd) Startup() error {
if !obj.noServer {
path := fmt.Sprintf("/%s/volunteers/", NS)
go obj.AddWatcher(path, obj.volunteerCallback, true, etcd.WithPrefix()) // no block
go obj.AddWatcher(path, obj.volunteerCallback, true, false, etcd.WithPrefix()) // no block
}
// if i am alone and will have to be a server...
@@ -396,7 +399,7 @@ func (obj *EmbdEtcd) Startup() error {
}
}
go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, etcd.WithPrefix())
go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, false, etcd.WithPrefix())
if err := obj.Connect(false); err != nil { // don't exit from this Startup function until connected!
return err
@@ -683,12 +686,17 @@ func (obj *EmbdEtcd) CbLoop() {
if e := obj.Connect(false); e != nil {
return // fatal
}
// we use this timer because when we ignore un-converge events and loop,
// we reset the ConvergedTimer case statement, ruining the timeout math!
cuuid.StartTimer()
for {
ctx := context.Background() // TODO: inherit as input argument?
select {
// etcd watcher event
case re := <-obj.wevents:
cuuid.SetConverged(false) // activity!
if !re.skipConv { // if we want to count it...
cuuid.ResetTimer() // activity!
}
if TRACE {
log.Printf("Trace: Etcd: CbLoop: Event: StartLoop")
}
@@ -717,13 +725,10 @@ func (obj *EmbdEtcd) CbLoop() {
log.Printf("Trace: Etcd: CbLoop: Event: FinishLoop")
}
// converged timeout
case _ = <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged!
// exit loop commit
case <-obj.exitTimeout:
log.Println("Etcd: Exiting callback loop!")
cuuid.StopTimer() // clean up nicely
return
}
}
@@ -737,12 +742,13 @@ func (obj *EmbdEtcd) Loop() {
if e := obj.Connect(false); e != nil {
return // fatal
}
cuuid.StartTimer()
for {
ctx := context.Background() // TODO: inherit as input argument?
// priority channel...
select {
case aw := <-obj.awq:
cuuid.SetConverged(false) // activity!
cuuid.ResetTimer() // activity!
if TRACE {
log.Printf("Trace: Etcd: Loop: PriorityAW: StartLoop")
}
@@ -758,7 +764,7 @@ func (obj *EmbdEtcd) Loop() {
select {
// add watcher
case aw := <-obj.awq:
cuuid.SetConverged(false) // activity!
cuuid.ResetTimer() // activity!
if TRACE {
log.Printf("Trace: Etcd: Loop: AW: StartLoop")
}
@@ -769,7 +775,7 @@ func (obj *EmbdEtcd) Loop() {
// set kv pair
case kv := <-obj.setq:
cuuid.SetConverged(false) // activity!
cuuid.ResetTimer() // activity!
if TRACE {
log.Printf("Trace: Etcd: Loop: Set: StartLoop")
}
@@ -793,7 +799,9 @@ func (obj *EmbdEtcd) Loop() {
// get value
case gq := <-obj.getq:
cuuid.SetConverged(false) // activity!
if !gq.skipConv {
cuuid.ResetTimer() // activity!
}
if TRACE {
log.Printf("Trace: Etcd: Loop: Get: StartLoop")
}
@@ -818,7 +826,7 @@ func (obj *EmbdEtcd) Loop() {
// delete value
case dl := <-obj.delq:
cuuid.SetConverged(false) // activity!
cuuid.ResetTimer() // activity!
if TRACE {
log.Printf("Trace: Etcd: Loop: Delete: StartLoop")
}
@@ -843,7 +851,7 @@ func (obj *EmbdEtcd) Loop() {
// run txn
case tn := <-obj.txnq:
cuuid.SetConverged(false) // activity!
cuuid.ResetTimer() // activity!
if TRACE {
log.Printf("Trace: Etcd: Loop: Txn: StartLoop")
}
@@ -866,10 +874,6 @@ func (obj *EmbdEtcd) Loop() {
log.Printf("Trace: Etcd: Loop: Txn: FinishLoop")
}
// converged timeout
case _ = <-cuuid.ConvergedTimer():
cuuid.SetConverged(true) // converged!
// exit loop signal
case <-obj.exitchan:
log.Println("Etcd: Exiting loop shortly...")
@@ -882,6 +886,7 @@ func (obj *EmbdEtcd) Loop() {
// exit loop commit
case <-obj.exitTimeout:
log.Println("Etcd: Exiting loop!")
cuuid.StopTimer() // clean up nicely
return
}
}
@@ -936,8 +941,15 @@ func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error {
// Get performs a get operation and waits for an ACK to continue
func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error) {
return obj.ComplexGet(path, false, opts...)
}
// ComplexGet performs a get operation and waits for an ACK to continue. It can
// accept more arguments that are useful for the less common operations.
// TODO: perhaps a get should never cause an un-converge ?
func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error) {
resp := NewResp()
gq := &GQ{path: path, opts: opts, resp: resp, data: nil}
gq := &GQ{path: path, skipConv: skipConv, opts: opts, resp: resp, data: nil}
obj.getq <- gq // send
if !resp.Wait() { // wait for ack/nack
return nil, fmt.Errorf("Etcd: Get: Probably received an exit...")
@@ -1022,9 +1034,9 @@ func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, err
// AddWatcher queues up an add watcher request and returns a cancel function
// Remember to add the etcd.WithPrefix() option if you want to watch recursively
func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, opts ...etcd.OpOption) (func(), error) {
func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) {
resp := NewResp()
awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, cancelFunc: nil, resp: resp}
awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp}
obj.awq <- awq // send
if !resp.Wait() { // wait for ack/nack
return nil, fmt.Errorf("Etcd: AddWatcher: Got NACK!")
@@ -1103,7 +1115,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error)
// the response includes a list of grouped events, each
// of which includes one Kv struct. Send these all in a
// batched group so that they are processed together...
obj.wevents <- &RE{response: response, path: aw.path, err: err, callback: aw.callback, errCheck: aw.errCheck, retryHint: retry} // send event
obj.wevents <- &RE{response: response, path: aw.path, err: err, callback: aw.callback, errCheck: aw.errCheck, skipConv: aw.skipConv, retryHint: retry} // send event
}
}(cancelCtx)
return cancelFunc, nil
@@ -2022,7 +2034,7 @@ func EtcdWatch(obj *EmbdEtcd) chan bool {
}
return nil
}
_, _ = obj.AddWatcher(path, callback, true, etcd.WithPrefix()) // no need to check errors
_, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors
return ch
}