engine, etcd: Watch on star pattern for all hostnames
We forgot to watch on star hostname matches.
This commit is contained in:
@@ -119,6 +119,7 @@ type StrWorld interface {
|
|||||||
// a distributed database.
|
// a distributed database.
|
||||||
// XXX: These API's are likely to change.
|
// XXX: These API's are likely to change.
|
||||||
// XXX: Add optional TTL's to these API's, maybe use WithTTL(...) type options.
|
// XXX: Add optional TTL's to these API's, maybe use WithTTL(...) type options.
|
||||||
|
// XXX: Add a WithStar(true) option to add in the * hostname matching.
|
||||||
type ResWorld interface {
|
type ResWorld interface {
|
||||||
// ResWatch returns a channel which produces a new value once on startup
|
// ResWatch returns a channel which produces a new value once on startup
|
||||||
// as soon as it is successfully connected, and once for every time it
|
// as soon as it is successfully connected, and once for every time it
|
||||||
|
|||||||
@@ -53,11 +53,58 @@ const (
|
|||||||
// collection prefixes and filters that we care about...
|
// collection prefixes and filters that we care about...
|
||||||
// XXX: filter based on kind as well, we don't do that currently... See:
|
// XXX: filter based on kind as well, we don't do that currently... See:
|
||||||
// https://github.com/etcd-io/etcd/issues/19667
|
// https://github.com/etcd-io/etcd/issues/19667
|
||||||
|
// TODO: do the star (*) hostname matching catch-all if we have WithStar option.
|
||||||
func WatchResources(ctx context.Context, client interfaces.Client, hostname, kind string) (chan error, error) {
|
func WatchResources(ctx context.Context, client interfaces.Client, hostname, kind string) (chan error, error) {
|
||||||
// key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data
|
// key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data
|
||||||
// TODO: support the star (*) hostname matching catch-all?
|
ctx, cancel := context.WithCancel(ctx) // wrap
|
||||||
|
|
||||||
path := fmt.Sprintf("%s/exported/%s/", ns, hostname)
|
path := fmt.Sprintf("%s/exported/%s/", ns, hostname)
|
||||||
return client.Watcher(ctx, path, etcd.WithPrefix())
|
ch1, err := client.Watcher(ctx, path, etcd.WithPrefix())
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
star := fmt.Sprintf("%s/exported/%s/", ns, "*")
|
||||||
|
ch2, err := client.Watcher(ctx, star, etcd.WithPrefix())
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// multiplex the two together
|
||||||
|
ch := make(chan error)
|
||||||
|
go func() {
|
||||||
|
defer cancel()
|
||||||
|
var e error
|
||||||
|
var ok bool
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e, ok = <-ch1:
|
||||||
|
if !ok {
|
||||||
|
ch1 = nil
|
||||||
|
if ch2 == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case e, ok = <-ch2:
|
||||||
|
if !ok {
|
||||||
|
ch2 = nil
|
||||||
|
if ch1 == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case ch <- e:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetResources reads the resources sent to the input hostname, and also applies
|
// GetResources reads the resources sent to the input hostname, and also applies
|
||||||
|
|||||||
Reference in New Issue
Block a user