From 53a878bf61661501d1d0d90ba9b3f6234ac1c90e Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 7 Aug 2023 19:44:41 -0400 Subject: [PATCH] engine: resources, graph: Change the done channel into a ctx This is part one of porting Watch to context. --- engine/graph/actions.go | 18 +++++++++--------- engine/graph/engine.go | 2 +- engine/graph/state.go | 16 ++++++++++------ engine/resources.go | 8 +++++--- engine/resources/augeas.go | 2 +- engine/resources/aws_ec2.go | 4 ++-- engine/resources/config_etcd.go | 5 ++--- engine/resources/consul_kv.go | 6 +++--- engine/resources/cron.go | 2 +- engine/resources/dhcp.go | 6 +++--- engine/resources/docker_container.go | 2 +- engine/resources/docker_image.go | 2 +- engine/resources/exec.go | 2 +- engine/resources/file.go | 2 +- engine/resources/group.go | 2 +- engine/resources/hostname.go | 2 +- engine/resources/http.go | 4 ++-- engine/resources/kv.go | 7 +++---- engine/resources/mount.go | 2 +- engine/resources/msg.go | 2 +- engine/resources/net.go | 2 +- engine/resources/noop.go | 2 +- engine/resources/nspawn.go | 2 +- engine/resources/password.go | 2 +- engine/resources/pippet.go | 2 +- engine/resources/pkg.go | 2 +- engine/resources/print.go | 2 +- engine/resources/resources_test.go | 13 ++++++++----- engine/resources/svc.go | 4 ++-- engine/resources/test.go | 2 +- engine/resources/tftp.go | 4 ++-- engine/resources/timer.go | 2 +- engine/resources/user.go | 2 +- engine/resources/virt.go | 2 +- 34 files changed, 73 insertions(+), 66 deletions(-) diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 53788ed2..62dc6cfb 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -272,7 +272,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { defer close(obj.state[vertex].eventsChan) // we close this on behalf of res // This is a close reverse-multiplexer. If any of the channels - // close, then it will cause the doneChan to close. That way, + // close, then it will cause the doneCtx to cancel. That way, // multiple different folks can send a close signal, without // every worrying about duplicate channel close panics. obj.state[vertex].wg.Add(1) @@ -289,7 +289,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { } // the main "done" signal gets activated here! - close(obj.state[vertex].doneChan) + obj.state[vertex].doneCtxCancel() // cancels doneCtx }() var err error @@ -308,7 +308,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { case <-timer.C: // the wait is over return errDelayExpired // special - case <-obj.state[vertex].init.Done: + case <-obj.state[vertex].init.DoneCtx.Done(): return nil } } @@ -359,7 +359,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { // If this exits cleanly, we must unblock the reverse-multiplexer. // I think this additional close is unnecessary, but it's not harmful. - defer close(obj.state[vertex].eventsDone) // causes doneChan to close + defer close(obj.state[vertex].eventsDone) // causes doneCtx to cancel limiter := rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst) var reserv *rate.Reservation var reterr error @@ -376,7 +376,7 @@ Loop: // we then save so we can return it to the caller of us. if err != nil { failed = true - close(obj.state[vertex].watchDone) // causes doneChan to close + close(obj.state[vertex].watchDone) // causes doneCtx to cancel reterr = errwrap.Append(reterr, err) // permanent failure continue } @@ -411,7 +411,7 @@ Loop: // pause if one was requested... select { case <-obj.state[vertex].pauseSignal: // channel closes - // NOTE: If we allowed a doneChan below to let us out + // NOTE: If we allowed a doneCtx below to let us out // of the resumeSignal wait, then we could loop around // and run this again, causing a panic. Instead of this // being made safe with a sync.Once, we instead run a @@ -457,7 +457,7 @@ Loop: } if e != nil { failed = true - close(obj.state[vertex].limitDone) // causes doneChan to close + close(obj.state[vertex].limitDone) // causes doneCtx to cancel reterr = errwrap.Append(reterr, e) // permanent failure break LimitWait } @@ -497,7 +497,7 @@ Loop: } if e != nil { failed = true - close(obj.state[vertex].limitDone) // causes doneChan to close + close(obj.state[vertex].limitDone) // causes doneCtx to cancel reterr = errwrap.Append(reterr, e) // permanent failure break RetryWait } @@ -545,7 +545,7 @@ Loop: // this dies. If Process fails permanently, we ask it // to exit right here... (It happens when we loop...) failed = true - close(obj.state[vertex].processDone) // causes doneChan to close + close(obj.state[vertex].processDone) // causes doneCtx to cancel reterr = errwrap.Append(reterr, err) // permanent failure continue diff --git a/engine/graph/engine.go b/engine/graph/engine.go index 66f2fd5f..6f288de9 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -251,7 +251,7 @@ func (obj *Engine) Commit() error { free := []func() error{} // functions to run after graphsync to reset... vertexRemoveFn := func(vertex pgraph.Vertex) error { // wait for exit before starting new graph! - close(obj.state[vertex].removeDone) // causes doneChan to close + close(obj.state[vertex].removeDone) // causes doneCtx to cancel obj.state[vertex].Resume() // unblock from resume obj.waits[vertex].Wait() // sync diff --git a/engine/graph/state.go b/engine/graph/state.go index fd9f4b6c..6369456c 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -18,6 +18,7 @@ package graph import ( + "context" "fmt" "sync" "time" @@ -60,9 +61,12 @@ type State struct { isStateOK bool // is state OK or do we need to run CheckApply ? workerErr error // did the Worker error? - // doneChan closes when Watch should shut down. When any of the + // doneCtx is cancelled when Watch should shut down. When any of the // following channels close, it causes this to close. - doneChan chan struct{} + doneCtx context.Context + + // doneCtxCancel is the cancel function for doneCtx. + doneCtxCancel func() // processDone is closed when the Process/CheckApply function fails // permanently, and wants to cause Watch to exit. @@ -131,7 +135,7 @@ func (obj *State) Init() error { return fmt.Errorf("the Logf function is missing") } - obj.doneChan = make(chan struct{}) + obj.doneCtx, obj.doneCtxCancel = context.WithCancel(context.Background()) obj.processDone = make(chan struct{}) obj.watchDone = make(chan struct{}) @@ -161,7 +165,7 @@ func (obj *State) Init() error { // Watch: Running: obj.event, Event: obj.event, - Done: obj.doneChan, + DoneCtx: obj.doneCtx, // CheckApply: Refresh: func() bool { @@ -338,7 +342,7 @@ func (obj *State) Pause() error { select { case <-obj.pausedAck.Wait(): // we got it! // we're paused - case <-obj.doneChan: + case <-obj.doneCtx.Done(): return engine.ErrClosed } obj.paused = true @@ -401,7 +405,7 @@ func (obj *State) poll(interval uint32) error { case <-ticker.C: // received the timer event obj.init.Logf("polling...") - case <-obj.init.Done: // signal for shutdown request + case <-obj.init.DoneCtx.Done(): // signal for shutdown request return nil } diff --git a/engine/resources.go b/engine/resources.go index c76a16e7..137fa4af 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -18,6 +18,7 @@ package engine import ( + "context" "encoding/gob" "fmt" @@ -101,9 +102,10 @@ type Init struct { // Event sends an event notifying the engine of a possible state change. Event func() - // Done returns a channel that will close to signal to us that it's time - // for us to shutdown. - Done chan struct{} + // DoneCtx returns a context that will cancel to signal to us that it's + // time for us to shutdown. + // TODO: this is temporary until Watch supports context directly. + DoneCtx context.Context // Called from within CheckApply: diff --git a/engine/resources/augeas.go b/engine/resources/augeas.go index 26f24a3d..2d2a3430 100644 --- a/engine/resources/augeas.go +++ b/engine/resources/augeas.go @@ -154,7 +154,7 @@ func (obj *AugeasRes) Watch() error { } send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/aws_ec2.go b/engine/resources/aws_ec2.go index bba0bd3e..813243dd 100644 --- a/engine/resources/aws_ec2.go +++ b/engine/resources/aws_ec2.go @@ -502,7 +502,7 @@ func (obj *AwsEc2Res) longpollWatch() error { send = true } - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } @@ -596,7 +596,7 @@ func (obj *AwsEc2Res) snsWatch() error { obj.init.Logf("State: %v", msg.event) send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/config_etcd.go b/engine/resources/config_etcd.go index 9f2c77c8..7ace399b 100644 --- a/engine/resources/config_etcd.go +++ b/engine/resources/config_etcd.go @@ -99,8 +99,7 @@ func (obj *ConfigEtcdRes) Watch() error { obj.wg.Add(1) defer obj.wg.Done() // FIXME: add timeout to context - // The obj.init.Done channel is closed by the engine to signal shutdown. - ctx, cancel := util.ContextWithCloser(context.Background(), obj.init.Done) + ctx, cancel := context.WithCancel(obj.init.DoneCtx) defer cancel() ch, err := obj.init.World.IdealClusterSizeWatch(util.CtxWithWg(ctx, obj.wg)) if err != nil { @@ -121,7 +120,7 @@ Loop: } // pass through and send an event - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown } obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/consul_kv.go b/engine/resources/consul_kv.go index b299b2be..19d2decb 100644 --- a/engine/resources/consul_kv.go +++ b/engine/resources/consul_kv.go @@ -162,10 +162,10 @@ func (obj *ConsulKVRes) Watch() error { // Unexpected situation, bug in consul API... select { case ch <- fmt.Errorf("unexpected behaviour in Consul API"): - case <-obj.init.Done: // signal for shutdown request + case <-obj.init.DoneCtx.Done(): // signal for shutdown request } - case <-obj.init.Done: // signal for shutdown request + case <-obj.init.DoneCtx.Done(): // signal for shutdown request } return } @@ -186,7 +186,7 @@ func (obj *ConsulKVRes) Watch() error { } obj.init.Event() - case <-obj.init.Done: // signal for shutdown request + case <-obj.init.DoneCtx.Done(): // signal for shutdown request return nil } } diff --git a/engine/resources/cron.go b/engine/resources/cron.go index 93cd092e..f189f845 100644 --- a/engine/resources/cron.go +++ b/engine/resources/cron.go @@ -296,7 +296,7 @@ func (obj *CronRes) Watch() error { } send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } // do all our event sending all together to avoid duplicate msgs diff --git a/engine/resources/dhcp.go b/engine/resources/dhcp.go index 6ef2a2e2..d87a3b07 100644 --- a/engine/resources/dhcp.go +++ b/engine/resources/dhcp.go @@ -461,7 +461,7 @@ func (obj *DHCPServerRes) Watch() error { case <-closeSignal: // something shut us down early return closeError - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } @@ -518,7 +518,7 @@ func (obj *DHCPServerRes) CheckApply(apply bool) (bool, error) { //select { //case <-ch: ////case <-obj.interruptChan: // TODO: if we ever support InterruptableRes - //case <-obj.init.Done: // closed by the engine to signal shutdown + //case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown //} // Cheap runtime validation! @@ -1056,7 +1056,7 @@ func (obj *DHCPHostRes) Watch() error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/docker_container.go b/engine/resources/docker_container.go index 56cda452..a0b65154 100644 --- a/engine/resources/docker_container.go +++ b/engine/resources/docker_container.go @@ -196,7 +196,7 @@ func (obj *DockerContainerRes) Watch() error { } return err - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/docker_image.go b/engine/resources/docker_image.go index c1b8d82d..a6e00dfd 100644 --- a/engine/resources/docker_image.go +++ b/engine/resources/docker_image.go @@ -158,7 +158,7 @@ func (obj *DockerImageRes) Watch() error { } return err - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/exec.go b/engine/resources/exec.go index b2c6fc43..560fa436 100644 --- a/engine/resources/exec.go +++ b/engine/resources/exec.go @@ -252,7 +252,7 @@ func (obj *ExecRes) Watch() error { send = true } - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/file.go b/engine/resources/file.go index 021a23a3..5f97272e 100644 --- a/engine/resources/file.go +++ b/engine/resources/file.go @@ -497,7 +497,7 @@ func (obj *FileRes) Watch() error { } send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/group.go b/engine/resources/group.go index fc5fc196..5332d9a0 100644 --- a/engine/resources/group.go +++ b/engine/resources/group.go @@ -105,7 +105,7 @@ func (obj *GroupRes) Watch() error { } send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/hostname.go b/engine/resources/hostname.go index 89794aa0..8176132a 100644 --- a/engine/resources/hostname.go +++ b/engine/resources/hostname.go @@ -135,7 +135,7 @@ func (obj *HostnameRes) Watch() error { case <-signals: send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/http.go b/engine/resources/http.go index 91d0938a..668e18b2 100644 --- a/engine/resources/http.go +++ b/engine/resources/http.go @@ -335,7 +335,7 @@ func (obj *HTTPServerRes) Watch() error { case <-closeSignal: // something shut us down early return closeError - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } @@ -725,7 +725,7 @@ func (obj *HTTPFileRes) Watch() error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/kv.go b/engine/resources/kv.go index 92429863..8810ee3b 100644 --- a/engine/resources/kv.go +++ b/engine/resources/kv.go @@ -26,7 +26,6 @@ import ( "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/engine/traits" - "github.com/purpleidea/mgmt/util" "github.com/purpleidea/mgmt/util/errwrap" ) @@ -132,8 +131,8 @@ func (obj *KVRes) Close() error { // Watch is the primary listener for this resource and it outputs events. func (obj *KVRes) Watch() error { // FIXME: add timeout to context - // The obj.init.Done channel is closed by the engine to signal shutdown. - ctx, cancel := util.ContextWithCloser(context.Background(), obj.init.Done) + // The obj.init.DoneCtx context is closed by the engine to signal shutdown. + ctx, cancel := context.WithCancel(obj.init.DoneCtx) defer cancel() ch, err := obj.init.World.StrMapWatch(ctx, obj.getKey()) // get possible events! @@ -159,7 +158,7 @@ func (obj *KVRes) Watch() error { } send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/mount.go b/engine/resources/mount.go index e923f8e5..a37bf397 100644 --- a/engine/resources/mount.go +++ b/engine/resources/mount.go @@ -266,7 +266,7 @@ func (obj *MountRes) Watch() error { send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/msg.go b/engine/resources/msg.go index 2bc4f17b..895a6b7e 100644 --- a/engine/resources/msg.go +++ b/engine/resources/msg.go @@ -99,7 +99,7 @@ func (obj *MsgRes) Watch() error { //var send = false // send event? for { select { - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/net.go b/engine/resources/net.go index 3d02f203..60d7996c 100644 --- a/engine/resources/net.go +++ b/engine/resources/net.go @@ -299,7 +299,7 @@ func (obj *NetRes) Watch() error { send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/noop.go b/engine/resources/noop.go index 9b4ff993..1c834eef 100644 --- a/engine/resources/noop.go +++ b/engine/resources/noop.go @@ -66,7 +66,7 @@ func (obj *NoopRes) Watch() error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/nspawn.go b/engine/resources/nspawn.go index 6b6722ea..d7cc600c 100644 --- a/engine/resources/nspawn.go +++ b/engine/resources/nspawn.go @@ -184,7 +184,7 @@ func (obj *NspawnRes) Watch() error { send = true } - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/password.go b/engine/resources/password.go index 1ff31a7d..146f203b 100644 --- a/engine/resources/password.go +++ b/engine/resources/password.go @@ -196,7 +196,7 @@ func (obj *PasswordRes) Watch() error { } send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/pippet.go b/engine/resources/pippet.go index 15cc1ffc..79bcc193 100644 --- a/engine/resources/pippet.go +++ b/engine/resources/pippet.go @@ -96,7 +96,7 @@ func (obj *PippetRes) Watch() error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/pkg.go b/engine/resources/pkg.go index 8349fda1..c55f5176 100644 --- a/engine/resources/pkg.go +++ b/engine/resources/pkg.go @@ -143,7 +143,7 @@ func (obj *PkgRes) Watch() error { send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/print.go b/engine/resources/print.go index 04dd9745..306da24f 100644 --- a/engine/resources/print.go +++ b/engine/resources/print.go @@ -73,7 +73,7 @@ func (obj *PrintRes) Watch() error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/resources_test.go b/engine/resources/resources_test.go index 2557f093..944a3431 100644 --- a/engine/resources/resources_test.go +++ b/engine/resources/resources_test.go @@ -20,6 +20,7 @@ package resources import ( + "context" "fmt" "io/ioutil" "os" @@ -485,7 +486,9 @@ func TestResources1(t *testing.T) { changedChan := make(chan bool, 1) // buffered! readyChan := make(chan struct{}) eventChan := make(chan struct{}) - doneChan := make(chan struct{}) + doneCtx, doneCtxCancel := context.WithCancel(context.Background()) + defer doneCtxCancel() + debug := testing.Verbose() // set via the -test.v flag to `go test` logf := func(format string, v ...interface{}) { t.Logf(fmt.Sprintf("test #%d: ", index)+format, v...) @@ -507,9 +510,9 @@ func TestResources1(t *testing.T) { }, // Watch listens on this for close/pause events. - Done: doneChan, - Debug: debug, - Logf: logf, + DoneCtx: doneCtx, + Debug: debug, + Logf: logf, // unused Send: func(st interface{}) error { @@ -629,7 +632,7 @@ func TestResources1(t *testing.T) { } } t.Logf("test #%d: shutting down Watch", index) - close(doneChan) // send Watch shutdown command + doneCtxCancel() // send Watch shutdown command }() Loop: for { diff --git a/engine/resources/svc.go b/engine/resources/svc.go index f1fe9d19..7c47995b 100644 --- a/engine/resources/svc.go +++ b/engine/resources/svc.go @@ -172,7 +172,7 @@ func (obj *SvcRes) Watch() error { // loop so that we can see the changed invalid signal obj.init.Logf("daemon reload") - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } } else { @@ -215,7 +215,7 @@ func (obj *SvcRes) Watch() error { case err := <-subErrors: return errwrap.Wrapf(err, "unknown %s error", obj) - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } } diff --git a/engine/resources/test.go b/engine/resources/test.go index 93857eed..109767cc 100644 --- a/engine/resources/test.go +++ b/engine/resources/test.go @@ -128,7 +128,7 @@ func (obj *TestRes) Watch() error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/tftp.go b/engine/resources/tftp.go index edd692f8..39772240 100644 --- a/engine/resources/tftp.go +++ b/engine/resources/tftp.go @@ -200,7 +200,7 @@ func (obj *TFTPServerRes) Watch() error { case <-closeSignal: // something shut us down early return closeError - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } @@ -551,7 +551,7 @@ func (obj *TFTPFileRes) Watch() error { obj.init.Running() // when started, notify engine that we're running select { - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown } //obj.init.Event() // notify engine of an event (this can block) diff --git a/engine/resources/timer.go b/engine/resources/timer.go index c1cfec75..e8dfb26c 100644 --- a/engine/resources/timer.go +++ b/engine/resources/timer.go @@ -84,7 +84,7 @@ func (obj *TimerRes) Watch() error { send = true obj.init.Logf("received tick") - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/user.go b/engine/resources/user.go index 0245cbe3..e3077fc4 100644 --- a/engine/resources/user.go +++ b/engine/resources/user.go @@ -139,7 +139,7 @@ func (obj *UserRes) Watch() error { } send = true - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil } diff --git a/engine/resources/virt.go b/engine/resources/virt.go index 2e09f00c..876f42ef 100644 --- a/engine/resources/virt.go +++ b/engine/resources/virt.go @@ -441,7 +441,7 @@ func (obj *VirtRes) Watch() error { case err := <-errorChan: return errwrap.Wrapf(err, "unknown libvirt error") - case <-obj.init.Done: // closed by the engine to signal shutdown + case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown return nil }