etcd: Switch to buffered channel to remove duplicates
Since we don't return the actual values and instead only tell about events (which leaves the `Get` of the value as a second operation) then we don't have to use a channel with backpressure since all the events are identical.
This commit is contained in:
@@ -33,7 +33,7 @@ import (
|
|||||||
func WatchStr(obj *EmbdEtcd, key string) chan error {
|
func WatchStr(obj *EmbdEtcd, key string) chan error {
|
||||||
// new key structure is /$NS/strings/$key/$hostname = $data
|
// new key structure is /$NS/strings/$key/$hostname = $data
|
||||||
path := fmt.Sprintf("/%s/strings/%s", NS, key)
|
path := fmt.Sprintf("/%s/strings/%s", NS, key)
|
||||||
ch := make(chan error)
|
ch := make(chan error, 1)
|
||||||
// FIXME: fix our API so that we get a close event on shutdown.
|
// FIXME: fix our API so that we get a close event on shutdown.
|
||||||
callback := func(re *RE) error {
|
callback := func(re *RE) error {
|
||||||
// TODO: is this even needed? it used to happen on conn errors
|
// TODO: is this even needed? it used to happen on conn errors
|
||||||
@@ -41,7 +41,9 @@ func WatchStr(obj *EmbdEtcd, key string) chan error {
|
|||||||
if re == nil || re.response.Canceled {
|
if re == nil || re.response.Canceled {
|
||||||
return fmt.Errorf("watch is empty") // will cause a CtxError+retry
|
return fmt.Errorf("watch is empty") // will cause a CtxError+retry
|
||||||
}
|
}
|
||||||
ch <- nil // event
|
if len(ch) == 0 { // send event only if one isn't pending
|
||||||
|
ch <- nil // event
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors
|
_, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors
|
||||||
|
|||||||
Reference in New Issue
Block a user