diff --git a/etcd.go b/etcd.go index 1e593fc2..1dad5d6b 100644 --- a/etcd.go +++ b/etcd.go @@ -92,6 +92,7 @@ type AW struct { opts []etcd.OpOption callback func(*RE) error errCheck bool + skipConv bool // ask event to skip converger updates resp Resp cancelFunc func() // data } @@ -104,6 +105,7 @@ type RE struct { err error callback func(*RE) error errCheck bool // should we check the error of the callback? + skipConv bool // event skips converger updates retryHint bool // set to true for one event after a watcher failure retries uint // number of times we've retried on error } @@ -118,10 +120,11 @@ type KV struct { // GQ is a struct for the get queue type GQ struct { - path string - opts []etcd.OpOption - resp Resp - data map[string]string + path string + skipConv bool + opts []etcd.OpOption + resp Resp + data map[string]string } // DL is a struct for the delete queue @@ -337,11 +340,11 @@ func (obj *EmbdEtcd) Startup() error { // TODO: implement native etcd watcher method on member API changes path := fmt.Sprintf("/%s/nominated/", NS) - go obj.AddWatcher(path, obj.nominateCallback, true, etcd.WithPrefix()) // no block + go obj.AddWatcher(path, obj.nominateCallback, true, false, etcd.WithPrefix()) // no block // setup ideal cluster size watcher key := fmt.Sprintf("/%s/idealClusterSize", NS) - go obj.AddWatcher(key, obj.idealClusterSizeCallback, true) // no block + go obj.AddWatcher(key, obj.idealClusterSizeCallback, true, false) // no block // if we have no endpoints, it means we are bootstrapping... if !bootstrapping { @@ -367,7 +370,7 @@ func (obj *EmbdEtcd) Startup() error { if !obj.noServer { path := fmt.Sprintf("/%s/volunteers/", NS) - go obj.AddWatcher(path, obj.volunteerCallback, true, etcd.WithPrefix()) // no block + go obj.AddWatcher(path, obj.volunteerCallback, true, false, etcd.WithPrefix()) // no block } // if i am alone and will have to be a server... @@ -396,7 +399,7 @@ func (obj *EmbdEtcd) Startup() error { } } - go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, etcd.WithPrefix()) + go obj.AddWatcher(fmt.Sprintf("/%s/endpoints/", NS), obj.endpointCallback, true, false, etcd.WithPrefix()) if err := obj.Connect(false); err != nil { // don't exit from this Startup function until connected! return err @@ -683,12 +686,17 @@ func (obj *EmbdEtcd) CbLoop() { if e := obj.Connect(false); e != nil { return // fatal } + // we use this timer because when we ignore un-converge events and loop, + // we reset the ConvergedTimer case statement, ruining the timeout math! + cuuid.StartTimer() for { ctx := context.Background() // TODO: inherit as input argument? select { // etcd watcher event case re := <-obj.wevents: - cuuid.SetConverged(false) // activity! + if !re.skipConv { // if we want to count it... + cuuid.ResetTimer() // activity! + } if TRACE { log.Printf("Trace: Etcd: CbLoop: Event: StartLoop") } @@ -717,13 +725,10 @@ func (obj *EmbdEtcd) CbLoop() { log.Printf("Trace: Etcd: CbLoop: Event: FinishLoop") } - // converged timeout - case _ = <-cuuid.ConvergedTimer(): - cuuid.SetConverged(true) // converged! - // exit loop commit case <-obj.exitTimeout: log.Println("Etcd: Exiting callback loop!") + cuuid.StopTimer() // clean up nicely return } } @@ -737,12 +742,13 @@ func (obj *EmbdEtcd) Loop() { if e := obj.Connect(false); e != nil { return // fatal } + cuuid.StartTimer() for { ctx := context.Background() // TODO: inherit as input argument? // priority channel... select { case aw := <-obj.awq: - cuuid.SetConverged(false) // activity! + cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: PriorityAW: StartLoop") } @@ -758,7 +764,7 @@ func (obj *EmbdEtcd) Loop() { select { // add watcher case aw := <-obj.awq: - cuuid.SetConverged(false) // activity! + cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: AW: StartLoop") } @@ -769,7 +775,7 @@ func (obj *EmbdEtcd) Loop() { // set kv pair case kv := <-obj.setq: - cuuid.SetConverged(false) // activity! + cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: Set: StartLoop") } @@ -793,7 +799,9 @@ func (obj *EmbdEtcd) Loop() { // get value case gq := <-obj.getq: - cuuid.SetConverged(false) // activity! + if !gq.skipConv { + cuuid.ResetTimer() // activity! + } if TRACE { log.Printf("Trace: Etcd: Loop: Get: StartLoop") } @@ -818,7 +826,7 @@ func (obj *EmbdEtcd) Loop() { // delete value case dl := <-obj.delq: - cuuid.SetConverged(false) // activity! + cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: Delete: StartLoop") } @@ -843,7 +851,7 @@ func (obj *EmbdEtcd) Loop() { // run txn case tn := <-obj.txnq: - cuuid.SetConverged(false) // activity! + cuuid.ResetTimer() // activity! if TRACE { log.Printf("Trace: Etcd: Loop: Txn: StartLoop") } @@ -866,10 +874,6 @@ func (obj *EmbdEtcd) Loop() { log.Printf("Trace: Etcd: Loop: Txn: FinishLoop") } - // converged timeout - case _ = <-cuuid.ConvergedTimer(): - cuuid.SetConverged(true) // converged! - // exit loop signal case <-obj.exitchan: log.Println("Etcd: Exiting loop shortly...") @@ -882,6 +886,7 @@ func (obj *EmbdEtcd) Loop() { // exit loop commit case <-obj.exitTimeout: log.Println("Etcd: Exiting loop!") + cuuid.StopTimer() // clean up nicely return } } @@ -936,8 +941,15 @@ func (obj *EmbdEtcd) rawSet(ctx context.Context, kv *KV) error { // Get performs a get operation and waits for an ACK to continue func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error) { + return obj.ComplexGet(path, false, opts...) +} + +// ComplexGet performs a get operation and waits for an ACK to continue. It can +// accept more arguments that are useful for the less common operations. +// TODO: perhaps a get should never cause an un-converge ? +func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error) { resp := NewResp() - gq := &GQ{path: path, opts: opts, resp: resp, data: nil} + gq := &GQ{path: path, skipConv: skipConv, opts: opts, resp: resp, data: nil} obj.getq <- gq // send if !resp.Wait() { // wait for ack/nack return nil, fmt.Errorf("Etcd: Get: Probably received an exit...") @@ -1022,9 +1034,9 @@ func (obj *EmbdEtcd) rawTxn(ctx context.Context, tn *TN) (*etcd.TxnResponse, err // AddWatcher queues up an add watcher request and returns a cancel function // Remember to add the etcd.WithPrefix() option if you want to watch recursively -func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, opts ...etcd.OpOption) (func(), error) { +func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error) { resp := NewResp() - awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, cancelFunc: nil, resp: resp} + awq := &AW{path: path, opts: opts, callback: callback, errCheck: errCheck, skipConv: skipConv, cancelFunc: nil, resp: resp} obj.awq <- awq // send if !resp.Wait() { // wait for ack/nack return nil, fmt.Errorf("Etcd: AddWatcher: Got NACK!") @@ -1103,7 +1115,7 @@ func (obj *EmbdEtcd) rawAddWatcher(ctx context.Context, aw *AW) (func(), error) // the response includes a list of grouped events, each // of which includes one Kv struct. Send these all in a // batched group so that they are processed together... - obj.wevents <- &RE{response: response, path: aw.path, err: err, callback: aw.callback, errCheck: aw.errCheck, retryHint: retry} // send event + obj.wevents <- &RE{response: response, path: aw.path, err: err, callback: aw.callback, errCheck: aw.errCheck, skipConv: aw.skipConv, retryHint: retry} // send event } }(cancelCtx) return cancelFunc, nil @@ -2022,7 +2034,7 @@ func EtcdWatch(obj *EmbdEtcd) chan bool { } return nil } - _, _ = obj.AddWatcher(path, callback, true, etcd.WithPrefix()) // no need to check errors + _, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors return ch }