diff --git a/engine/world.go b/engine/world.go index 39280021..6963d4cf 100644 --- a/engine/world.go +++ b/engine/world.go @@ -119,6 +119,7 @@ type StrWorld interface { // a distributed database. // XXX: These API's are likely to change. // XXX: Add optional TTL's to these API's, maybe use WithTTL(...) type options. +// XXX: Add a WithStar(true) option to add in the * hostname matching. type ResWorld interface { // ResWatch returns a channel which produces a new value once on startup // as soon as it is successfully connected, and once for every time it diff --git a/etcd/client/resources/resources.go b/etcd/client/resources/resources.go index 0316efab..64dd8e59 100644 --- a/etcd/client/resources/resources.go +++ b/etcd/client/resources/resources.go @@ -53,11 +53,58 @@ const ( // collection prefixes and filters that we care about... // XXX: filter based on kind as well, we don't do that currently... See: // https://github.com/etcd-io/etcd/issues/19667 +// TODO: do the star (*) hostname matching catch-all if we have WithStar option. func WatchResources(ctx context.Context, client interfaces.Client, hostname, kind string) (chan error, error) { // key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data - // TODO: support the star (*) hostname matching catch-all? + ctx, cancel := context.WithCancel(ctx) // wrap + path := fmt.Sprintf("%s/exported/%s/", ns, hostname) - return client.Watcher(ctx, path, etcd.WithPrefix()) + ch1, err := client.Watcher(ctx, path, etcd.WithPrefix()) + if err != nil { + cancel() + return nil, err + } + + star := fmt.Sprintf("%s/exported/%s/", ns, "*") + ch2, err := client.Watcher(ctx, star, etcd.WithPrefix()) + if err != nil { + cancel() + return nil, err + } + + // multiplex the two together + ch := make(chan error) + go func() { + defer cancel() + var e error + var ok bool + for { + select { + case e, ok = <-ch1: + if !ok { + ch1 = nil + if ch2 == nil { + return + } + } + case e, ok = <-ch2: + if !ok { + ch2 = nil + if ch1 == nil { + return + } + } + } + + select { + case ch <- e: + case <-ctx.Done(): + return + } + } + }() + + return ch, nil } // GetResources reads the resources sent to the input hostname, and also applies