diff --git a/docs/resource-guide.md b/docs/resource-guide.md index ab6e6beb..1da00e2a 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -277,7 +277,7 @@ will likely find the state to now be correct. ### Watch ```golang -Watch() error +Watch(ctx context.Context) error ``` `Watch` is a main loop that runs and sends messages when it detects that the @@ -304,23 +304,25 @@ If the resource is activated in `polling` mode, the `Watch` method will not get executed. As a result, the resource must still work even if the main loop is not running. +You must make sure to cleanup any running code or goroutines before Watch exits. + #### Select The lifetime of most resources `Watch` method should be spent in an infinite loop that is bounded by a `select` call. The `select` call is the point where our method hands back control to the engine (and the kernel) so that we can sleep until something of interest wakes us up. In this loop we must wait until -we get a shutdown event from the engine via the `<-obj.init.Done` channel, which +we get a shutdown event from the engine via the `<-ctx.Done()` channel, which closes when we'd like to shut everything down. At this point you should cleanup, and let `Watch` close. #### Events -If the `<-obj.init.Done` channel closes, we should shutdown our resource. When -When we want to send an event, we use the `Event` helper function. This -automatically marks the resource state as `dirty`. If you're unsure, it's not -harmful to send the event. This will ultimately cause `CheckApply` to run. This -method can block if the resource is being paused. +If the `<-ctx.Done()` channel closes, we should shutdown our resource. When we +want to send an event, we use the `Event` helper function. This automatically +marks the resource state as `dirty`. If you're unsure, it's not harmful to send +the event. This will ultimately cause `CheckApply` to run. This method can block +if the resource is being paused. #### Startup @@ -347,7 +349,7 @@ sending out erroneous `Event` messages to keep things alive until it finishes. ```golang // Watch is the listener and main loop for this resource. -func (obj *FooRes) Watch() error { +func (obj *FooRes) Watch(ctx context.Context) error { // setup the Foo resource var err error if err, obj.foo = OpenFoo(); err != nil { @@ -371,7 +373,7 @@ func (obj *FooRes) Watch() error { case err := <-obj.foo.Errors: return err // will cause a retry or permanent failure - case <-obj.init.Done: // signal for shutdown request + case <-ctx.Done(): // signal for shutdown request return nil } @@ -553,11 +555,6 @@ ready to detect changes. Event sends an event notifying the engine of a possible state change. It is only called from within `Watch`. -### Done - -Done is a channel that closes when the engine wants us to shutdown. It is only -called from within `Watch`. - ### Refresh Refresh returns whether the resource received a notification. This flag can be diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 62dc6cfb..54e944f7 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -308,7 +308,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { case <-timer.C: // the wait is over return errDelayExpired // special - case <-obj.state[vertex].init.DoneCtx.Done(): + case <-obj.state[vertex].doneCtx.Done(): return nil } } @@ -319,12 +319,12 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { } } else if interval := res.MetaParams().Poll; interval > 0 { // poll instead of watching :( obj.state[vertex].cuid.StartTimer() - err = obj.state[vertex].poll(interval) + err = obj.state[vertex].poll(obj.state[vertex].doneCtx, interval) obj.state[vertex].cuid.StopTimer() // clean up nicely } else { obj.state[vertex].cuid.StartTimer() obj.Logf("Watch(%s)", vertex) - err = res.Watch() // run the watch normally + err = res.Watch(obj.state[vertex].doneCtx) // run the watch normally obj.Logf("Watch(%s): Exited(%+v)", vertex, err) obj.state[vertex].cuid.StopTimer() // clean up nicely } diff --git a/engine/graph/autogroup/autogroup_test.go b/engine/graph/autogroup/autogroup_test.go index 290e1fd0..96d1514f 100644 --- a/engine/graph/autogroup/autogroup_test.go +++ b/engine/graph/autogroup/autogroup_test.go @@ -20,6 +20,7 @@ package autogroup import ( + "context" "fmt" "reflect" "sort" @@ -66,7 +67,7 @@ func (obj *NoopResTest) Close() error { return nil } -func (obj *NoopResTest) Watch() error { +func (obj *NoopResTest) Watch(context.Context) error { return nil // not needed } diff --git a/engine/graph/state.go b/engine/graph/state.go index 6369456c..b4f0f1cd 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -165,7 +165,6 @@ func (obj *State) Init() error { // Watch: Running: obj.event, Event: obj.event, - DoneCtx: obj.doneCtx, // CheckApply: Refresh: func() bool { @@ -393,7 +392,7 @@ func (obj *State) setDirty() { } // poll is a replacement for Watch when the Poll metaparameter is used. -func (obj *State) poll(interval uint32) error { +func (obj *State) poll(ctx context.Context, interval uint32) error { // create a time.Ticker for the given interval ticker := time.NewTicker(time.Duration(interval) * time.Second) defer ticker.Stop() @@ -405,7 +404,7 @@ func (obj *State) poll(interval uint32) error { case <-ticker.C: // received the timer event obj.init.Logf("polling...") - case <-obj.init.DoneCtx.Done(): // signal for shutdown request + case <-ctx.Done(): // signal for shutdown request return nil } diff --git a/engine/resources.go b/engine/resources.go index 137fa4af..5026103f 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -102,11 +102,6 @@ type Init struct { // Event sends an event notifying the engine of a possible state change. Event func() - // DoneCtx returns a context that will cancel to signal to us that it's - // time for us to shutdown. - // TODO: this is temporary until Watch supports context directly. - DoneCtx context.Context - // Called from within CheckApply: // Refresh returns whether the resource received a notification. This @@ -200,8 +195,8 @@ type Res interface { // Watch is run by the engine to monitor for state changes. If it // detects any, it notifies the engine which will usually run CheckApply - // in response. - Watch() error + // in response. If the input context cancels, we must shutdown. + Watch(context.Context) error // CheckApply determines if the state of the resource is correct and if // asked to with the `apply` variable, applies the requested state. diff --git a/engine/resources/augeas.go b/engine/resources/augeas.go index 2d2a3430..10156378 100644 --- a/engine/resources/augeas.go +++ b/engine/resources/augeas.go @@ -20,6 +20,7 @@ package resources import ( + "context" "fmt" "os" "strings" @@ -125,7 +126,7 @@ func (obj *AugeasRes) Close() error { // Watch is the primary listener for this resource and it outputs events. This // was taken from the File resource. // FIXME: DRY - This is taken from the file resource -func (obj *AugeasRes) Watch() error { +func (obj *AugeasRes) Watch(ctx context.Context) error { var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.File, false) if err != nil { @@ -154,7 +155,7 @@ func (obj *AugeasRes) Watch() error { } send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/aws_ec2.go b/engine/resources/aws_ec2.go index 813243dd..1bccde86 100644 --- a/engine/resources/aws_ec2.go +++ b/engine/resources/aws_ec2.go @@ -409,16 +409,16 @@ func (obj *AwsEc2Res) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *AwsEc2Res) Watch() error { +func (obj *AwsEc2Res) Watch(ctx context.Context) error { if obj.WatchListenAddr != "" { - return obj.snsWatch() + return obj.snsWatch(ctx) } - return obj.longpollWatch() + return obj.longpollWatch(ctx) } // longpollWatch uses the ec2 api's built in methods to watch ec2 resource // state. -func (obj *AwsEc2Res) longpollWatch() error { +func (obj *AwsEc2Res) longpollWatch(ctx context.Context) error { send := false // We tell the engine that we're running right away. This is not correct, @@ -426,7 +426,7 @@ func (obj *AwsEc2Res) longpollWatch() error { obj.init.Running() // when started, notify engine that we're running // cancellable context used for exiting cleanly - ctx, cancel := context.WithCancel(context.TODO()) + innerCtx, cancel := context.WithCancel(context.TODO()) // clean up when we're done defer obj.wg.Wait() @@ -461,7 +461,7 @@ func (obj *AwsEc2Res) longpollWatch() error { } // wait for the instance state to change - state, err := stateWaiter(ctx, instance, obj.client) + state, err := stateWaiter(innerCtx, instance, obj.client) if err != nil { select { case obj.awsChan <- &chanStruct{ @@ -502,7 +502,7 @@ func (obj *AwsEc2Res) longpollWatch() error { send = true } - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } @@ -518,7 +518,7 @@ func (obj *AwsEc2Res) longpollWatch() error { // Init() a CloudWatch rule is created along with a corresponding SNS topic that // it can publish to. snsWatch creates an http server which listens for messages // published to the topic and processes them accordingly. -func (obj *AwsEc2Res) snsWatch() error { +func (obj *AwsEc2Res) snsWatch(ctx context.Context) error { send := false defer obj.wg.Wait() // create the sns listener @@ -533,9 +533,9 @@ func (obj *AwsEc2Res) snsWatch() error { } // close the listener and shutdown the sns server when we're done defer func() { - ctx, cancel := context.WithTimeout(context.TODO(), SnsServerShutdownTimeout*time.Second) + innerCtx, cancel := context.WithTimeout(context.TODO(), SnsServerShutdownTimeout*time.Second) defer cancel() - if err := snsServer.Shutdown(ctx); err != nil { + if err := snsServer.Shutdown(innerCtx); err != nil { if err != context.Canceled { obj.init.Logf("error stopping sns endpoint: %s", err) return @@ -596,7 +596,7 @@ func (obj *AwsEc2Res) snsWatch() error { obj.init.Logf("State: %v", msg.event) send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } @@ -718,7 +718,7 @@ func (obj *AwsEc2Res) CheckApply(apply bool) (bool, error) { } // context to cancel the waiter if it takes too long - ctx, cancel := context.WithTimeout(context.TODO(), waitTimeout*time.Second) + innerCtx, cancel := context.WithTimeout(context.TODO(), waitTimeout*time.Second) defer cancel() // wait until the state converges @@ -727,11 +727,11 @@ func (obj *AwsEc2Res) CheckApply(apply bool) (bool, error) { } switch obj.State { case ec2.InstanceStateNameRunning: - err = obj.client.WaitUntilInstanceRunningWithContext(ctx, waitInput) + err = obj.client.WaitUntilInstanceRunningWithContext(innerCtx, waitInput) case ec2.InstanceStateNameStopped: - err = obj.client.WaitUntilInstanceStoppedWithContext(ctx, waitInput) + err = obj.client.WaitUntilInstanceStoppedWithContext(innerCtx, waitInput) case ec2.InstanceStateNameTerminated: - err = obj.client.WaitUntilInstanceTerminatedWithContext(ctx, waitInput) + err = obj.client.WaitUntilInstanceTerminatedWithContext(innerCtx, waitInput) default: return false, errwrap.Wrapf(err, "unrecognized instance state") } diff --git a/engine/resources/config_etcd.go b/engine/resources/config_etcd.go index 7ace399b..4c06bd01 100644 --- a/engine/resources/config_etcd.go +++ b/engine/resources/config_etcd.go @@ -95,11 +95,11 @@ func (obj *ConfigEtcdRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *ConfigEtcdRes) Watch() error { +func (obj *ConfigEtcdRes) Watch(ctx context.Context) error { obj.wg.Add(1) defer obj.wg.Done() // FIXME: add timeout to context - ctx, cancel := context.WithCancel(obj.init.DoneCtx) + ctx, cancel := context.WithCancel(ctx) defer cancel() ch, err := obj.init.World.IdealClusterSizeWatch(util.CtxWithWg(ctx, obj.wg)) if err != nil { @@ -120,7 +120,7 @@ Loop: } // pass through and send an event - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown } obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/consul_kv.go b/engine/resources/consul_kv.go index 19d2decb..ed6011db 100644 --- a/engine/resources/consul_kv.go +++ b/engine/resources/consul_kv.go @@ -117,7 +117,7 @@ func (obj *ConsulKVRes) Close() error { } // Watch is the listener and main loop for this resource and it outputs events. -func (obj *ConsulKVRes) Watch() error { +func (obj *ConsulKVRes) Watch(ctx context.Context) error { wg := &sync.WaitGroup{} defer wg.Wait() @@ -132,9 +132,9 @@ func (obj *ConsulKVRes) Watch() error { defer wg.Done() opts := &api.QueryOptions{RequireConsistent: true} - ctx, cancel := util.ContextWithCloser(context.Background(), exit) + innerCtx, cancel := util.ContextWithCloser(context.Background(), exit) defer cancel() - opts = opts.WithContext(ctx) + opts = opts.WithContext(innerCtx) for { _, meta, err := kv.Get(obj.key, opts) @@ -162,10 +162,10 @@ func (obj *ConsulKVRes) Watch() error { // Unexpected situation, bug in consul API... select { case ch <- fmt.Errorf("unexpected behaviour in Consul API"): - case <-obj.init.DoneCtx.Done(): // signal for shutdown request + case <-ctx.Done(): // signal for shutdown request } - case <-obj.init.DoneCtx.Done(): // signal for shutdown request + case <-ctx.Done(): // signal for shutdown request } return } @@ -186,7 +186,7 @@ func (obj *ConsulKVRes) Watch() error { } obj.init.Event() - case <-obj.init.DoneCtx.Done(): // signal for shutdown request + case <-ctx.Done(): // signal for shutdown request return nil } } diff --git a/engine/resources/cron.go b/engine/resources/cron.go index f189f845..6bc49798 100644 --- a/engine/resources/cron.go +++ b/engine/resources/cron.go @@ -221,7 +221,7 @@ func (obj *CronRes) Close() error { } // Watch for state changes and sends a message to the bus if there is a change. -func (obj *CronRes) Watch() error { +func (obj *CronRes) Watch(ctx context.Context) error { var bus *dbus.Conn var err error @@ -296,7 +296,7 @@ func (obj *CronRes) Watch() error { } send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } // do all our event sending all together to avoid duplicate msgs diff --git a/engine/resources/dhcp.go b/engine/resources/dhcp.go index d87a3b07..e0971bcd 100644 --- a/engine/resources/dhcp.go +++ b/engine/resources/dhcp.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "net" "net/url" @@ -385,7 +386,7 @@ func (obj *DHCPServerRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *DHCPServerRes) Watch() error { +func (obj *DHCPServerRes) Watch(ctx context.Context) error { addr, err := net.ResolveUDPAddr("udp", obj.getAddress()) // *net.UDPAddr if err != nil { return errwrap.Wrapf(err, "could not resolve address") @@ -461,7 +462,7 @@ func (obj *DHCPServerRes) Watch() error { case <-closeSignal: // something shut us down early return closeError - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } @@ -1052,11 +1053,11 @@ func (obj *DHCPHostRes) Close() error { // Watch is the primary listener for this resource and it outputs events. This // particular one does absolutely nothing but block until we've received a done // signal. -func (obj *DHCPHostRes) Watch() error { +func (obj *DHCPHostRes) Watch(ctx context.Context) error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/docker_container.go b/engine/resources/docker_container.go index a0b65154..3d20ccd1 100644 --- a/engine/resources/docker_container.go +++ b/engine/resources/docker_container.go @@ -170,11 +170,11 @@ func (obj *DockerContainerRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *DockerContainerRes) Watch() error { - ctx, cancel := context.WithCancel(context.Background()) +func (obj *DockerContainerRes) Watch(ctx context.Context) error { + innerCtx, cancel := context.WithCancel(context.Background()) defer cancel() - eventChan, errChan := obj.client.Events(ctx, types.EventsOptions{}) + eventChan, errChan := obj.client.Events(innerCtx, types.EventsOptions{}) obj.init.Running() // when started, notify engine that we're running @@ -196,7 +196,7 @@ func (obj *DockerContainerRes) Watch() error { } return err - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/docker_image.go b/engine/resources/docker_image.go index a6e00dfd..3e001163 100644 --- a/engine/resources/docker_image.go +++ b/engine/resources/docker_image.go @@ -131,11 +131,11 @@ func (obj *DockerImageRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *DockerImageRes) Watch() error { - ctx, cancel := context.WithCancel(context.Background()) +func (obj *DockerImageRes) Watch(ctx context.Context) error { + innerCtx, cancel := context.WithCancel(context.Background()) defer cancel() - eventChan, errChan := obj.client.Events(ctx, types.EventsOptions{}) + eventChan, errChan := obj.client.Events(innerCtx, types.EventsOptions{}) // notify engine that we're running obj.init.Running() @@ -158,7 +158,7 @@ func (obj *DockerImageRes) Watch() error { } return err - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/exec.go b/engine/resources/exec.go index 560fa436..152ff3ae 100644 --- a/engine/resources/exec.go +++ b/engine/resources/exec.go @@ -167,7 +167,7 @@ func (obj *ExecRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *ExecRes) Watch() error { +func (obj *ExecRes) Watch(ctx context.Context) error { ioChan := make(chan *cmdOutput) defer obj.wg.Wait() @@ -187,9 +187,9 @@ func (obj *ExecRes) Watch() error { cmdArgs = []string{"-c", obj.WatchCmd} } - ctx, cancel := context.WithCancel(context.Background()) + innerCtx, cancel := context.WithCancel(context.Background()) defer cancel() - cmd := exec.CommandContext(ctx, cmdName, cmdArgs...) + cmd := exec.CommandContext(innerCtx, cmdName, cmdArgs...) cmd.Dir = obj.WatchCwd // run program in pwd if "" // ignore signals sent to parent process (we're in our own group) cmd.SysProcAttr = &syscall.SysProcAttr{ @@ -203,7 +203,7 @@ func (obj *ExecRes) Watch() error { return errwrap.Wrapf(err, "error while setting credential") } - if ioChan, err = obj.cmdOutputRunner(ctx, cmd); err != nil { + if ioChan, err = obj.cmdOutputRunner(innerCtx, cmd); err != nil { return errwrap.Wrapf(err, "error starting WatchCmd") } } @@ -252,7 +252,7 @@ func (obj *ExecRes) Watch() error { send = true } - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/file.go b/engine/resources/file.go index 5f97272e..ef434a99 100644 --- a/engine/resources/file.go +++ b/engine/resources/file.go @@ -19,6 +19,7 @@ package resources import ( "bytes" + "context" "crypto/sha256" "encoding/hex" "fmt" @@ -359,7 +360,7 @@ func (obj *FileRes) Close() error { // probably important to write some test cases first! If the Watch returns an // error, it means that something has gone wrong, and it must be restarted. On a // clean exit it returns nil. -func (obj *FileRes) Watch() error { +func (obj *FileRes) Watch(ctx context.Context) error { // TODO: chan *recwatch.Event instead? inputEvents := make(chan recwatch.Event) defer close(inputEvents) @@ -497,7 +498,7 @@ func (obj *FileRes) Watch() error { } send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/group.go b/engine/resources/group.go index 5332d9a0..0c5fae40 100644 --- a/engine/resources/group.go +++ b/engine/resources/group.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "io/ioutil" "os/exec" @@ -76,7 +77,7 @@ func (obj *GroupRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *GroupRes) Watch() error { +func (obj *GroupRes) Watch(ctx context.Context) error { var err error obj.recWatcher, err = recwatch.NewRecWatcher(groupFile, false) if err != nil { @@ -105,7 +106,7 @@ func (obj *GroupRes) Watch() error { } send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/hetzner_vm.go b/engine/resources/hetzner_vm.go index fd6f4acd..f121da45 100644 --- a/engine/resources/hetzner_vm.go +++ b/engine/resources/hetzner_vm.go @@ -371,7 +371,7 @@ func (obj *HetznerVMRes) Close() error { // Watch is not implemented for this resource, since the Hetzner API does not // provide any event streams. Instead, always use polling. // NOTE: HetznerPollLimit sets an explicit minimum on the polling interval. -func (obj *HetznerVMRes) Watch() error { +func (obj *HetznerVMRes) Watch(context.Context) error { return fmt.Errorf("invalid Watch call: requires poll metaparam") } diff --git a/engine/resources/hostname.go b/engine/resources/hostname.go index 8176132a..05d72750 100644 --- a/engine/resources/hostname.go +++ b/engine/resources/hostname.go @@ -18,6 +18,7 @@ package resources import ( + "context" "errors" "fmt" @@ -106,7 +107,7 @@ func (obj *HostnameRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *HostnameRes) Watch() error { +func (obj *HostnameRes) Watch(ctx context.Context) error { // if we share the bus with others, we will get each others messages!! bus, err := util.SystemBusPrivateUsable() // don't share the bus connection! if err != nil { @@ -135,7 +136,7 @@ func (obj *HostnameRes) Watch() error { case <-signals: send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/http.go b/engine/resources/http.go index 668e18b2..07f845cc 100644 --- a/engine/resources/http.go +++ b/engine/resources/http.go @@ -230,7 +230,7 @@ func (obj *HTTPServerRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *HTTPServerRes) Watch() error { +func (obj *HTTPServerRes) Watch(ctx context.Context) error { // TODO: I think we could replace all this with: //obj.conn, err := net.Listen("tcp", obj.getAddress()) // ...but what is the advantage? @@ -304,13 +304,13 @@ func (obj *HTTPServerRes) Watch() error { // exit and waits instead for Shutdown to return. defer func() { defer close(shutdownChan) // signal that shutdown is finished - ctx := context.Background() + innerCtx := context.Background() if i := obj.getShutdownTimeout(); i != nil && *i > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(*i)*time.Second) + innerCtx, cancel = context.WithTimeout(innerCtx, time.Duration(*i)*time.Second) defer cancel() } - err := obj.server.Shutdown(ctx) // shutdown gracefully + err := obj.server.Shutdown(innerCtx) // shutdown gracefully if err == context.DeadlineExceeded { // TODO: should we bubble up the error from Close? // TODO: do we need a mutex around this Close? @@ -335,7 +335,7 @@ func (obj *HTTPServerRes) Watch() error { case <-closeSignal: // something shut us down early return closeError - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } @@ -721,11 +721,11 @@ func (obj *HTTPFileRes) Close() error { // Watch is the primary listener for this resource and it outputs events. This // particular one does absolutely nothing but block until we've received a done // signal. -func (obj *HTTPFileRes) Watch() error { +func (obj *HTTPFileRes) Watch(ctx context.Context) error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/kv.go b/engine/resources/kv.go index 8810ee3b..51696909 100644 --- a/engine/resources/kv.go +++ b/engine/resources/kv.go @@ -129,10 +129,8 @@ func (obj *KVRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *KVRes) Watch() error { - // FIXME: add timeout to context - // The obj.init.DoneCtx context is closed by the engine to signal shutdown. - ctx, cancel := context.WithCancel(obj.init.DoneCtx) +func (obj *KVRes) Watch(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) defer cancel() ch, err := obj.init.World.StrMapWatch(ctx, obj.getKey()) // get possible events! @@ -158,7 +156,7 @@ func (obj *KVRes) Watch() error { } send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/mount.go b/engine/resources/mount.go index a37bf397..2625868a 100644 --- a/engine/resources/mount.go +++ b/engine/resources/mount.go @@ -192,7 +192,7 @@ func (obj *MountRes) Close() error { // Watch listens for signals from the mount unit associated with the resource. // It also watch for changes to /etc/fstab, where mounts are defined. -func (obj *MountRes) Watch() error { +func (obj *MountRes) Watch(ctx context.Context) error { // make sure systemd is running if !systemdUtil.IsRunningSystemd() { return fmt.Errorf("systemd is not running") @@ -266,7 +266,7 @@ func (obj *MountRes) Watch() error { send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/msg.go b/engine/resources/msg.go index 895a6b7e..7488723b 100644 --- a/engine/resources/msg.go +++ b/engine/resources/msg.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "regexp" "strings" @@ -93,13 +94,13 @@ func (obj *MsgRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *MsgRes) Watch() error { +func (obj *MsgRes) Watch(ctx context.Context) error { obj.init.Running() // when started, notify engine that we're running //var send = false // send event? for { select { - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/net.go b/engine/resources/net.go index 60d7996c..a9e730b2 100644 --- a/engine/resources/net.go +++ b/engine/resources/net.go @@ -21,6 +21,7 @@ package resources import ( "bytes" + "context" "fmt" "io/ioutil" "net" @@ -202,7 +203,7 @@ func (obj *NetRes) Close() error { // Watch listens for events from the specified interface via a netlink socket. // TODO: currently gets events from ALL interfaces, would be nice to reject // events from other interfaces. -func (obj *NetRes) Watch() error { +func (obj *NetRes) Watch(ctx context.Context) error { // create a netlink socket for receiving network interface events conn, err := socketset.NewSocketSet(rtmGrps, obj.socketFile, unix.NETLINK_ROUTE) if err != nil { @@ -299,7 +300,7 @@ func (obj *NetRes) Watch() error { send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/noop.go b/engine/resources/noop.go index 1c834eef..3531b33d 100644 --- a/engine/resources/noop.go +++ b/engine/resources/noop.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "github.com/purpleidea/mgmt/engine" @@ -62,11 +63,11 @@ func (obj *NoopRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *NoopRes) Watch() error { +func (obj *NoopRes) Watch(ctx context.Context) error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/nspawn.go b/engine/resources/nspawn.go index d7cc600c..80e54846 100644 --- a/engine/resources/nspawn.go +++ b/engine/resources/nspawn.go @@ -18,6 +18,7 @@ package resources import ( + "context" "errors" "fmt" "strconv" @@ -140,7 +141,7 @@ func (obj *NspawnRes) Close() error { } // Watch for state changes and sends a message to the bus if there is a change. -func (obj *NspawnRes) Watch() error { +func (obj *NspawnRes) Watch(ctx context.Context) error { // this resource depends on systemd to ensure that it's running if !systemdUtil.IsRunningSystemd() { return fmt.Errorf("systemd is not running") @@ -184,7 +185,7 @@ func (obj *NspawnRes) Watch() error { send = true } - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/password.go b/engine/resources/password.go index 146f203b..94891e65 100644 --- a/engine/resources/password.go +++ b/engine/resources/password.go @@ -18,6 +18,7 @@ package resources import ( + "context" "crypto/rand" "fmt" "io/ioutil" @@ -173,7 +174,7 @@ Loop: } // Watch is the primary listener for this resource and it outputs events. -func (obj *PasswordRes) Watch() error { +func (obj *PasswordRes) Watch(ctx context.Context) error { var err error obj.recWatcher, err = recwatch.NewRecWatcher(obj.path, false) if err != nil { @@ -196,7 +197,7 @@ func (obj *PasswordRes) Watch() error { } send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/pippet.go b/engine/resources/pippet.go index 79bcc193..205b1870 100644 --- a/engine/resources/pippet.go +++ b/engine/resources/pippet.go @@ -18,6 +18,7 @@ package resources import ( + "context" "encoding/json" "fmt" "io" @@ -92,11 +93,11 @@ func (obj *PippetRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *PippetRes) Watch() error { +func (obj *PippetRes) Watch(ctx context.Context) error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/pkg.go b/engine/resources/pkg.go index c55f5176..b43e81ad 100644 --- a/engine/resources/pkg.go +++ b/engine/resources/pkg.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "path" "strings" @@ -104,7 +105,7 @@ func (obj *PkgRes) Close() error { // uses the PackageKit UpdatesChanged signal to watch for changes. // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch() error { +func (obj *PkgRes) Watch(ctx context.Context) error { bus := packagekit.NewBus() if bus == nil { return fmt.Errorf("can't connect to PackageKit bus") @@ -143,7 +144,7 @@ func (obj *PkgRes) Watch() error { send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/print.go b/engine/resources/print.go index 306da24f..fb9260b6 100644 --- a/engine/resources/print.go +++ b/engine/resources/print.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "github.com/purpleidea/mgmt/engine" @@ -69,11 +70,11 @@ func (obj *PrintRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *PrintRes) Watch() error { +func (obj *PrintRes) Watch(ctx context.Context) error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/resources_test.go b/engine/resources/resources_test.go index 944a3431..5c112c36 100644 --- a/engine/resources/resources_test.go +++ b/engine/resources/resources_test.go @@ -510,9 +510,8 @@ func TestResources1(t *testing.T) { }, // Watch listens on this for close/pause events. - DoneCtx: doneCtx, - Debug: debug, - Logf: logf, + Debug: debug, + Logf: logf, // unused Send: func(st interface{}) error { @@ -591,7 +590,7 @@ func TestResources1(t *testing.T) { go func() { defer wg.Done() t.Logf("test #%d: running Watch", index) - if err := res.Watch(); err != nil { + if err := res.Watch(doneCtx); err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: Watch failed: %s", index, err.Error()) } diff --git a/engine/resources/svc.go b/engine/resources/svc.go index 7c47995b..d925c1c1 100644 --- a/engine/resources/svc.go +++ b/engine/resources/svc.go @@ -20,6 +20,7 @@ package resources import ( + "context" "fmt" "os/user" "path" @@ -81,7 +82,7 @@ func (obj *SvcRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *SvcRes) Watch() error { +func (obj *SvcRes) Watch(ctx context.Context) error { // obj.Name: svc name if !systemdUtil.IsRunningSystemd() { return fmt.Errorf("systemd is not running") @@ -172,7 +173,7 @@ func (obj *SvcRes) Watch() error { // loop so that we can see the changed invalid signal obj.init.Logf("daemon reload") - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } } else { @@ -215,7 +216,7 @@ func (obj *SvcRes) Watch() error { case err := <-subErrors: return errwrap.Wrapf(err, "unknown %s error", obj) - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } } diff --git a/engine/resources/test.go b/engine/resources/test.go index 109767cc..493f4ef9 100644 --- a/engine/resources/test.go +++ b/engine/resources/test.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "reflect" @@ -124,11 +125,11 @@ func (obj *TestRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *TestRes) Watch() error { +func (obj *TestRes) Watch(ctx context.Context) error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/tftp.go b/engine/resources/tftp.go index 39772240..3a4721bd 100644 --- a/engine/resources/tftp.go +++ b/engine/resources/tftp.go @@ -19,6 +19,7 @@ package resources import ( "bytes" + "context" "fmt" "io" "net" @@ -139,7 +140,7 @@ func (obj *TFTPServerRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *TFTPServerRes) Watch() error { +func (obj *TFTPServerRes) Watch(ctx context.Context) error { addr, err := net.ResolveUDPAddr("udp", obj.getAddress()) if err != nil { return errwrap.Wrapf(err, "could not resolve address") @@ -200,7 +201,7 @@ func (obj *TFTPServerRes) Watch() error { case <-closeSignal: // something shut us down early return closeError - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } @@ -547,11 +548,11 @@ func (obj *TFTPFileRes) Close() error { // Watch is the primary listener for this resource and it outputs events. This // particular one does absolutely nothing but block until we've received a done // signal. -func (obj *TFTPFileRes) Watch() error { +func (obj *TFTPFileRes) Watch(ctx context.Context) error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/timer.go b/engine/resources/timer.go index e8dfb26c..69435806 100644 --- a/engine/resources/timer.go +++ b/engine/resources/timer.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "time" @@ -70,7 +71,7 @@ func (obj *TimerRes) newTicker() *time.Ticker { } // Watch is the primary listener for this resource and it outputs events. -func (obj *TimerRes) Watch() error { +func (obj *TimerRes) Watch(ctx context.Context) error { // create a time.Ticker for the given interval obj.ticker = obj.newTicker() defer obj.ticker.Stop() @@ -84,7 +85,7 @@ func (obj *TimerRes) Watch() error { send = true obj.init.Logf("received tick") - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/user.go b/engine/resources/user.go index e3077fc4..d99052cd 100644 --- a/engine/resources/user.go +++ b/engine/resources/user.go @@ -18,6 +18,7 @@ package resources import ( + "context" "fmt" "io/ioutil" "os/exec" @@ -110,7 +111,7 @@ func (obj *UserRes) Close() error { } // Watch is the primary listener for this resource and it outputs events. -func (obj *UserRes) Watch() error { +func (obj *UserRes) Watch(ctx context.Context) error { var err error obj.recWatcher, err = recwatch.NewRecWatcher(passwdFile, false) if err != nil { @@ -139,7 +140,7 @@ func (obj *UserRes) Watch() error { } send = true - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/virt.go b/engine/resources/virt.go index 876f42ef..9f3a8274 100644 --- a/engine/resources/virt.go +++ b/engine/resources/virt.go @@ -20,6 +20,7 @@ package resources import ( + "context" "fmt" "math/rand" "net/url" @@ -294,7 +295,7 @@ func (obj *VirtRes) connect() (conn *libvirt.Connect, err error) { } // Watch is the primary listener for this resource and it outputs events. -func (obj *VirtRes) Watch() error { +func (obj *VirtRes) Watch(ctx context.Context) error { // FIXME: how will this work if we're polling? wg := &sync.WaitGroup{} defer wg.Wait() // wait until everyone has exited before we exit! @@ -441,7 +442,7 @@ func (obj *VirtRes) Watch() error { case err := <-errorChan: return errwrap.Wrapf(err, "unknown libvirt error") - case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown + case <-ctx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/util/util_test.go b/engine/util/util_test.go index 8dd6ae41..e39553e5 100644 --- a/engine/util/util_test.go +++ b/engine/util/util_test.go @@ -20,6 +20,7 @@ package util import ( + "context" "os/user" "reflect" "strconv" @@ -201,7 +202,7 @@ func (t *testEngineRes) String() string { return "test-string" } func (t *testEngineRes) Validate() error { return nil } -func (t *testEngineRes) Watch() error { return nil } +func (t *testEngineRes) Watch(context.Context) error { return nil } func TestStructKindToFieldNameTypeMap(t *testing.T) { k := "test-kind"