From 7ccda7e99be8dedf69b8e29d0971c5a72dcf6662 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Fri, 1 Sep 2023 22:56:32 -0400 Subject: [PATCH] engine: Add a ctx to the CheckApply API This is just a rough port, there are lots of optimizations to be done and lots of timeout values that should be replaced by a new timeout meta param! --- docs/resource-guide.md | 13 +++--- engine/graph/actions.go | 7 +-- engine/graph/autogroup/autogroup_test.go | 2 +- engine/resources.go | 8 +++- engine/resources/augeas.go | 6 +-- engine/resources/aws_ec2.go | 4 +- engine/resources/config_etcd.go | 10 ++--- engine/resources/consul_kv.go | 3 +- engine/resources/cron.go | 10 ++--- engine/resources/dhcp.go | 8 ++-- engine/resources/docker_container.go | 4 +- engine/resources/docker_image.go | 4 +- engine/resources/exec.go | 12 ++--- engine/resources/exec_test.go | 56 ++++++++++++++++++++++-- engine/resources/file.go | 46 +++++++++---------- engine/resources/group.go | 2 +- engine/resources/hetzner_vm.go | 3 +- engine/resources/hostname.go | 2 +- engine/resources/http.go | 4 +- engine/resources/kv.go | 4 +- engine/resources/mount.go | 22 +++++----- engine/resources/msg.go | 2 +- engine/resources/net.go | 18 ++++---- engine/resources/noop.go | 2 +- engine/resources/nspawn.go | 4 +- engine/resources/password.go | 2 +- engine/resources/pippet.go | 2 +- engine/resources/pkg.go | 2 +- engine/resources/print.go | 2 +- engine/resources/resources_test.go | 4 +- engine/resources/svc.go | 3 +- engine/resources/test.go | 2 +- engine/resources/tftp.go | 4 +- engine/resources/timer.go | 2 +- engine/resources/user.go | 2 +- engine/resources/virt.go | 12 ++--- engine/util/util_test.go | 2 +- 37 files changed, 175 insertions(+), 120 deletions(-) diff --git a/docs/resource-guide.md b/docs/resource-guide.md index 23b184e4..e67a6526 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -206,7 +206,7 @@ on an error if something went wrong. ### CheckApply ```golang -CheckApply(apply bool) (checkOK bool, err error) +CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) ``` `CheckApply` is where the real _work_ is done. Under normal circumstances, this @@ -215,7 +215,8 @@ should return: `(true, nil)`. If the `apply` variable is set to `true`, then this means that we should then proceed to run the changes required to bring the resource into the correct state. If the `apply` variable is set to `false`, then the resource is operating in _noop_ mode and _no operational changes_ should be -made! +made! The ctx should be monitored in case a shutdown has been requested. This +may be used if a timeout occurred, or if the user shutdown the engine. After having executed the necessary operations to bring the resource back into the desired state, or after having detected that the state was incorrect, but @@ -234,7 +235,7 @@ to `CheckApply`. ```golang // CheckApply does the idempotent work of checking and applying resource state. -func (obj *FooRes) CheckApply(apply bool) (bool, error) { +func (obj *FooRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // check the state if state_is_okay { return true, nil } // done early! :) @@ -731,9 +732,9 @@ to be separate. It turns out that the current `CheckApply` can wrap this easily. It would look approximately like this: ```golang -func (obj *FooRes) CheckApply(apply bool) (bool, error) { +func (obj *FooRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // my private split implementation of check and apply - if c, err := obj.check(); err != nil { + if c, err := obj.check(ctx); err != nil { return false, err // we errored } else if c { return true, nil // state was good! @@ -743,7 +744,7 @@ func (obj *FooRes) CheckApply(apply bool) (bool, error) { return false, nil // state needs fixing, but apply is false } - err := obj.apply() // errors if failure or unable to apply + err := obj.apply(ctx) // errors if failure or unable to apply return false, err // always return false, with an optional error } diff --git a/engine/graph/actions.go b/engine/graph/actions.go index ee898c66..35e8a13c 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -18,6 +18,7 @@ package graph import ( + "context" "fmt" "strings" "sync" @@ -60,7 +61,7 @@ func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex { } // Process is the primary function to execute a particular vertex in the graph. -func (obj *Engine) Process(vertex pgraph.Vertex) error { +func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error { res, isRes := vertex.(engine.Res) if !isRes { return fmt.Errorf("vertex is not a Res") @@ -155,7 +156,7 @@ func (obj *Engine) Process(vertex pgraph.Vertex) error { // run the CheckApply! obj.Logf("%s: CheckApply(%t)", res, !noop) // if this fails, don't UpdateTimestamp() - checkOK, err = res.CheckApply(!noop) + checkOK, err = res.CheckApply(ctx, !noop) obj.Logf("%s: CheckApply(%t): Return(%t, %s)", res, !noop, checkOK, engineUtil.CleanError(err)) } @@ -563,7 +564,7 @@ Loop: if obj.Debug { obj.Logf("Process(%s)", vertex) } - err = obj.Process(vertex) + err = obj.Process(obj.state[vertex].doneCtx, vertex) if obj.Debug { obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err)) } diff --git a/engine/graph/autogroup/autogroup_test.go b/engine/graph/autogroup/autogroup_test.go index 4b2a15d7..bb9d5ca9 100644 --- a/engine/graph/autogroup/autogroup_test.go +++ b/engine/graph/autogroup/autogroup_test.go @@ -71,7 +71,7 @@ func (obj *NoopResTest) Watch(context.Context) error { return nil // not needed } -func (obj *NoopResTest) CheckApply(apply bool) (checkOK bool, err error) { +func (obj *NoopResTest) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) { return true, nil // state is always okay } diff --git a/engine/resources.go b/engine/resources.go index 3b4bcc30..2e41f188 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -199,8 +199,12 @@ type Res interface { Watch(context.Context) error // CheckApply determines if the state of the resource is correct and if - // asked to with the `apply` variable, applies the requested state. - CheckApply(apply bool) (checkOK bool, err error) + // asked to with the `apply` variable, applies the requested state. If + // the input context cancels, we must return as quickly as possible. We + // should never exit immediately if this would cause permanent + // corruption of some sort. However it doesn't mean that a resource was + // taken to the desired state. + CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) // Cmp compares itself to another resource and returns an error if they // are not equivalent. This is more strict than the Adapts method of the diff --git a/engine/resources/augeas.go b/engine/resources/augeas.go index 0b3fe236..684bced0 100644 --- a/engine/resources/augeas.go +++ b/engine/resources/augeas.go @@ -168,7 +168,7 @@ func (obj *AugeasRes) Watch(ctx context.Context) error { } // checkApplySet runs CheckApply for one element of the AugeasRes.Set -func (obj *AugeasRes) checkApplySet(apply bool, ag *augeas.Augeas, set *AugeasSet) (bool, error) { +func (obj *AugeasRes) checkApplySet(ctx context.Context, apply bool, ag *augeas.Augeas, set *AugeasSet) (bool, error) { fullpath := fmt.Sprintf("/files/%v/%v", obj.File, set.Path) // We do not check for errors because errors are also thrown when @@ -193,7 +193,7 @@ func (obj *AugeasRes) checkApplySet(apply bool, ag *augeas.Augeas, set *AugeasSe } // CheckApply method for Augeas resource. -func (obj *AugeasRes) CheckApply(apply bool) (bool, error) { +func (obj *AugeasRes) CheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("CheckApply: %s", obj.File) // By default we do not set any option to augeas, we use the defaults. opts := augeas.None @@ -230,7 +230,7 @@ func (obj *AugeasRes) CheckApply(apply bool) (bool, error) { checkOK := true for _, set := range obj.Sets { - if setCheckOK, err := obj.checkApplySet(apply, &ag, set); err != nil { + if setCheckOK, err := obj.checkApplySet(ctx, apply, &ag, set); err != nil { return false, errwrap.Wrapf(err, "augeas: error during CheckApply of one Set") } else if !setCheckOK { checkOK = false diff --git a/engine/resources/aws_ec2.go b/engine/resources/aws_ec2.go index 6655679d..6b584cc6 100644 --- a/engine/resources/aws_ec2.go +++ b/engine/resources/aws_ec2.go @@ -625,7 +625,7 @@ func (obj *AwsEc2Res) snsWatch(ctx context.Context) error { } // CheckApply method for AwsEc2 resource. -func (obj *AwsEc2Res) CheckApply(apply bool) (bool, error) { +func (obj *AwsEc2Res) CheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("CheckApply(%t)", apply) // find the instance we need to check @@ -735,7 +735,7 @@ func (obj *AwsEc2Res) CheckApply(apply bool) (bool, error) { } // context to cancel the waiter if it takes too long - innerCtx, cancel := context.WithTimeout(context.TODO(), waitTimeout*time.Second) + innerCtx, cancel := context.WithTimeout(ctx, waitTimeout*time.Second) defer cancel() // wait until the state converges diff --git a/engine/resources/config_etcd.go b/engine/resources/config_etcd.go index 2b6d89a1..4a139a60 100644 --- a/engine/resources/config_etcd.go +++ b/engine/resources/config_etcd.go @@ -130,10 +130,10 @@ Loop: // to zero, then it *won't* try and change it away from zero, because it assumes // that someone has requested a shutdown. If the value is seen on first startup, // then it will change it, because it might be a zero from the previous cluster. -func (obj *ConfigEtcdRes) sizeCheckApply(apply bool) (bool, error) { +func (obj *ConfigEtcdRes) sizeCheckApply(ctx context.Context, apply bool) (bool, error) { wg := &sync.WaitGroup{} defer wg.Wait() // this must be above the defer cancel() call - ctx, cancel := context.WithTimeout(context.Background(), sizeCheckApplyTimeout) + ctx, cancel := context.WithTimeout(ctx, sizeCheckApplyTimeout) defer cancel() wg.Add(1) go func() { @@ -182,17 +182,17 @@ func (obj *ConfigEtcdRes) sizeCheckApply(apply bool) (bool, error) { } // CheckApply method for Noop resource. Does nothing, returns happy! -func (obj *ConfigEtcdRes) CheckApply(apply bool) (bool, error) { +func (obj *ConfigEtcdRes) CheckApply(ctx context.Context, apply bool) (bool, error) { checkOK := true - if c, err := obj.sizeCheckApply(apply); err != nil { + if c, err := obj.sizeCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false } // TODO: add more config settings management here... - //if c, err := obj.TODOCheckApply(apply); err != nil { + //if c, err := obj.TODOCheckApply(ctx, apply); err != nil { // return false, err //} else if !c { // checkOK = false diff --git a/engine/resources/consul_kv.go b/engine/resources/consul_kv.go index b3ddb1bc..ba16823a 100644 --- a/engine/resources/consul_kv.go +++ b/engine/resources/consul_kv.go @@ -195,7 +195,8 @@ func (obj *ConsulKVRes) Watch(ctx context.Context) error { // CheckApply is run to check the state and, if apply is true, to apply the // necessary changes to reach the desired state. This is run before Watch and // again if Watch finds a change occurring to the state. -func (obj *ConsulKVRes) CheckApply(apply bool) (bool, error) { +func (obj *ConsulKVRes) CheckApply(ctx context.Context, apply bool) (bool, error) { + // XXX: use ctx for get and put if obj.init.Debug { obj.init.Logf("consul key: %s", obj.key) } diff --git a/engine/resources/cron.go b/engine/resources/cron.go index 2202b5c0..ea6e5109 100644 --- a/engine/resources/cron.go +++ b/engine/resources/cron.go @@ -317,16 +317,16 @@ func (obj *CronRes) Watch(ctx context.Context) error { // CheckApply is run to check the state and, if apply is true, to apply the // necessary changes to reach the desired state. This is run before Watch and // again if Watch finds a change occurring to the state. -func (obj *CronRes) CheckApply(apply bool) (bool, error) { +func (obj *CronRes) CheckApply(ctx context.Context, apply bool) (bool, error) { checkOK := true // use the embedded file resource to apply the correct state - if c, err := obj.file.CheckApply(apply); err != nil { + if c, err := obj.file.CheckApply(ctx, apply); err != nil { return false, errwrap.Wrapf(err, "nested file failed") } else if !c { checkOK = false } // check timer state and apply the defined state if needed - if c, err := obj.unitCheckApply(apply); err != nil { + if c, err := obj.unitCheckApply(ctx, apply); err != nil { return false, errwrap.Wrapf(err, "unitCheckApply error") } else if !c { checkOK = false @@ -336,7 +336,7 @@ func (obj *CronRes) CheckApply(apply bool) (bool, error) { // unitCheckApply checks the state of the systemd-timer unit and, if apply is // true, applies the defined state. -func (obj *CronRes) unitCheckApply(apply bool) (bool, error) { +func (obj *CronRes) unitCheckApply(ctx context.Context, apply bool) (bool, error) { var conn *sdbus.Conn var godbusConn *dbus.Conn var err error @@ -383,7 +383,7 @@ func (obj *CronRes) unitCheckApply(apply bool) (bool, error) { } // context for stopping/restarting the unit - ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout*time.Second) + ctx, cancel := context.WithTimeout(ctx, ctxTimeout*time.Second) defer cancel() // godbus connection for stopping/restarting the unit diff --git a/engine/resources/dhcp.go b/engine/resources/dhcp.go index 2cf39fb8..8416e4e1 100644 --- a/engine/resources/dhcp.go +++ b/engine/resources/dhcp.go @@ -476,7 +476,7 @@ func (obj *DHCPServerRes) Watch(ctx context.Context) error { // sidCheckApply runs the server ID cache operation in CheckApply, which can // help CheckApply fail before the handler runs, so at least we see an error. -func (obj *DHCPServerRes) sidCheckApply(apply bool) (bool, error) { +func (obj *DHCPServerRes) sidCheckApply(ctx context.Context, apply bool) (bool, error) { // Mutex guards the cached obj.serverID value. defer obj.sidMutex.Unlock() obj.sidMutex.Lock() @@ -500,7 +500,7 @@ func (obj *DHCPServerRes) sidCheckApply(apply bool) (bool, error) { // CheckApply never has anything to do for this resource, so it always succeeds. // It does however check that certain runtime requirements (such as the Root dir // existing if one was specified) are fulfilled. -func (obj *DHCPServerRes) CheckApply(apply bool) (bool, error) { +func (obj *DHCPServerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if obj.init.Debug { obj.init.Logf("CheckApply") } @@ -524,7 +524,7 @@ func (obj *DHCPServerRes) CheckApply(apply bool) (bool, error) { // Cheap runtime validation! checkOK := true - if c, err := obj.sidCheckApply(apply); err != nil { + if c, err := obj.sidCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false @@ -1066,7 +1066,7 @@ func (obj *DHCPHostRes) Watch(ctx context.Context) error { } // CheckApply never has anything to do for this resource, so it always succeeds. -func (obj *DHCPHostRes) CheckApply(apply bool) (bool, error) { +func (obj *DHCPHostRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if obj.init.Debug { obj.init.Logf("CheckApply") } diff --git a/engine/resources/docker_container.go b/engine/resources/docker_container.go index 6c2b5fb0..1fc4d70b 100644 --- a/engine/resources/docker_container.go +++ b/engine/resources/docker_container.go @@ -214,11 +214,11 @@ func (obj *DockerContainerRes) Watch(ctx context.Context) error { } // CheckApply method for Docker resource. -func (obj *DockerContainerRes) CheckApply(apply bool) (bool, error) { +func (obj *DockerContainerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { var id string var destroy bool - ctx, cancel := context.WithTimeout(context.Background(), checkApplyCtxTimeout*time.Second) + ctx, cancel := context.WithTimeout(ctx, checkApplyCtxTimeout*time.Second) defer cancel() // List any container whose name matches this resource. diff --git a/engine/resources/docker_image.go b/engine/resources/docker_image.go index 638b9ecd..94e99d15 100644 --- a/engine/resources/docker_image.go +++ b/engine/resources/docker_image.go @@ -172,8 +172,8 @@ func (obj *DockerImageRes) Watch(ctx context.Context) error { } // CheckApply method for Docker resource. -func (obj *DockerImageRes) CheckApply(apply bool) (checkOK bool, err error) { - ctx, cancel := context.WithTimeout(context.Background(), dockerImageCheckApplyCtxTimeout*time.Second) +func (obj *DockerImageRes) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) { + ctx, cancel := context.WithTimeout(ctx, dockerImageCheckApplyCtxTimeout*time.Second) defer cancel() s, err := obj.client.ImageList(ctx, types.ImageListOptions{ diff --git a/engine/resources/exec.go b/engine/resources/exec.go index f7fe547a..52603279 100644 --- a/engine/resources/exec.go +++ b/engine/resources/exec.go @@ -277,7 +277,7 @@ func (obj *ExecRes) Watch(ctx context.Context) error { // CheckApply checks the resource state and applies the resource if the bool // input is true. It returns error info and if the state check passed or not. // TODO: expand the IfCmd to be a list of commands -func (obj *ExecRes) CheckApply(apply bool) (bool, error) { +func (obj *ExecRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // If we receive a refresh signal, then the engine skips the IsStateOK() // check and this will run. It is still guarded by the IfCmd, but it can // have a chance to execute, and all without the check of obj.Refresh()! @@ -381,15 +381,15 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) { wg := &sync.WaitGroup{} defer wg.Wait() // this must be above the defer cancel() call - var ctx context.Context + var innerCtx context.Context var cancel context.CancelFunc if obj.Timeout > 0 { // cmd.Process.Kill() is called on timeout - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(obj.Timeout)*time.Second) + innerCtx, cancel = context.WithTimeout(ctx, time.Duration(obj.Timeout)*time.Second) } else { // zero timeout means no timer - ctx, cancel = context.WithCancel(context.Background()) + innerCtx, cancel = context.WithCancel(ctx) } defer cancel() - cmd := exec.CommandContext(ctx, cmdName, cmdArgs...) + cmd := exec.CommandContext(innerCtx, cmdName, cmdArgs...) cmd.Dir = obj.Cwd // run program in pwd if "" envKeys := []string{} @@ -432,7 +432,7 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) { select { case <-obj.interruptChan: cancel() - case <-ctx.Done(): + case <-innerCtx.Done(): // let this exit } }() diff --git a/engine/resources/exec_test.go b/engine/resources/exec_test.go index 6e41df74..f6d2849d 100644 --- a/engine/resources/exec_test.go +++ b/engine/resources/exec_test.go @@ -53,6 +53,18 @@ func fakeExecInit(t *testing.T) (*engine.Init, *ExecSends) { } func TestExecSendRecv1(t *testing.T) { + now := time.Now() + min := time.Second * 3 // approx min time needed for the test + ctx := context.Background() + if deadline, ok := t.Deadline(); ok { + d := deadline.Add(-min) + t.Logf(" now: %+v", now) + t.Logf(" d: %+v", d) + newCtx, cancel := context.WithDeadline(ctx, d) + ctx = newCtx + defer cancel() + } + r1 := &ExecRes{ Cmd: "echo hello world", Shell: "/bin/bash", @@ -71,7 +83,7 @@ func TestExecSendRecv1(t *testing.T) { t.Errorf("init failed with: %v", err) } // run artificially without the entire engine - if _, err := r1.CheckApply(true); err != nil { + if _, err := r1.CheckApply(ctx, true); err != nil { t.Errorf("checkapply failed with: %v", err) } @@ -98,6 +110,18 @@ func TestExecSendRecv1(t *testing.T) { } func TestExecSendRecv2(t *testing.T) { + now := time.Now() + min := time.Second * 3 // approx min time needed for the test + ctx := context.Background() + if deadline, ok := t.Deadline(); ok { + d := deadline.Add(-min) + t.Logf(" now: %+v", now) + t.Logf(" d: %+v", d) + newCtx, cancel := context.WithDeadline(ctx, d) + ctx = newCtx + defer cancel() + } + r1 := &ExecRes{ Cmd: "echo hello world 1>&2", // to stderr Shell: "/bin/bash", @@ -116,7 +140,7 @@ func TestExecSendRecv2(t *testing.T) { t.Errorf("init failed with: %v", err) } // run artificially without the entire engine - if _, err := r1.CheckApply(true); err != nil { + if _, err := r1.CheckApply(ctx, true); err != nil { t.Errorf("checkapply failed with: %v", err) } @@ -143,6 +167,18 @@ func TestExecSendRecv2(t *testing.T) { } func TestExecSendRecv3(t *testing.T) { + now := time.Now() + min := time.Second * 3 // approx min time needed for the test + ctx := context.Background() + if deadline, ok := t.Deadline(); ok { + d := deadline.Add(-min) + t.Logf(" now: %+v", now) + t.Logf(" d: %+v", d) + newCtx, cancel := context.WithDeadline(ctx, d) + ctx = newCtx + defer cancel() + } + r1 := &ExecRes{ Cmd: "echo hello world && echo goodbye world 1>&2", // to stdout && stderr Shell: "/bin/bash", @@ -161,7 +197,7 @@ func TestExecSendRecv3(t *testing.T) { t.Errorf("init failed with: %v", err) } // run artificially without the entire engine - if _, err := r1.CheckApply(true); err != nil { + if _, err := r1.CheckApply(ctx, true); err != nil { t.Errorf("checkapply failed with: %v", err) } @@ -206,8 +242,20 @@ func TestExecSendRecv3(t *testing.T) { } func TestExecTimeoutBehaviour(t *testing.T) { + now := time.Now() + min := time.Second * 3 // approx min time needed for the test + ctx := context.Background() + if deadline, ok := t.Deadline(); ok { + d := deadline.Add(-min) + t.Logf(" now: %+v", now) + t.Logf(" d: %+v", d) + newCtx, cancel := context.WithDeadline(ctx, d) + ctx = newCtx + defer cancel() + } + // cmd.Process.Kill() is called on timeout - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() cmdName := "/bin/sleep" // it's /usr/bin/sleep on modern distros cmdArgs := []string{"300"} // 5 min in seconds diff --git a/engine/resources/file.go b/engine/resources/file.go index 196f354d..224acccc 100644 --- a/engine/resources/file.go +++ b/engine/resources/file.go @@ -534,7 +534,7 @@ func (obj *FileRes) Watch(ctx context.Context) error { // of computing the source data hash, and it returns the computed value if this // function reaches that stage. As usual, it respects the apply action variable, // and has some symmetry with the main CheckApply function. -func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sha256sum string) (string, bool, error) { +func (obj *FileRes) fileCheckApply(ctx context.Context, apply bool, src io.ReadSeeker, dst string, sha256sum string) (string, bool, error) { // TODO: does it make sense to switch dst to an io.Writer ? // TODO: use obj.Force when dealing with symlinks and other file types! if obj.init.Debug { @@ -674,7 +674,7 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh } // dirCheckApply is the CheckApply operation for an empty directory. -func (obj *FileRes) dirCheckApply(apply bool) (bool, error) { +func (obj *FileRes) dirCheckApply(ctx context.Context, apply bool) (bool, error) { // check if the path exists and is a directory fileInfo, err := os.Stat(obj.getPath()) if err != nil && !os.IsNotExist(err) { @@ -726,7 +726,7 @@ func (obj *FileRes) dirCheckApply(apply bool) (bool, error) { // fileCheckApply method. It returns checkOK and error as is normally expected. // If excludes is specified, none of those files there will be deleted by this, // with the exception that a sync *can* convert a file to a dir, or vice-versa. -func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []string) (bool, error) { +func (obj *FileRes) syncCheckApply(ctx context.Context, apply bool, src, dst string, excludes []string) (bool, error) { if obj.init.Debug { obj.init.Logf("syncCheckApply: %s -> %s", src, dst) } @@ -760,7 +760,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []strin return false, err } - _, checkOK, err := obj.fileCheckApply(apply, fin, dst, "") + _, checkOK, err := obj.fileCheckApply(ctx, apply, fin, dst, "") if err != nil { fin.Close() return false, err @@ -831,7 +831,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []strin obj.init.Logf("syncCheckApply: recurse: %s -> %s", absSrc, absDst) } if obj.Recurse { - if c, err := obj.syncCheckApply(apply, absSrc, absDst, excludes); err != nil { // recurse + if c, err := obj.syncCheckApply(ctx, apply, absSrc, absDst, excludes); err != nil { // recurse return false, errwrap.Wrapf(err, "syncCheckApply: recurse failed") } else if !c { // don't let subsequent passes make this true checkOK = false @@ -888,7 +888,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []strin } _ = absSrc //obj.init.Logf("syncCheckApply: recurse rm: %s -> %s", absSrc, absDst) - //if c, err := obj.syncCheckApply(apply, absSrc, absDst, excludes); err != nil { + //if c, err := obj.syncCheckApply(ctx, apply, absSrc, absDst, excludes); err != nil { // return false, errwrap.Wrapf(err, "syncCheckApply: recurse rm failed") //} else if !c { // don't let subsequent passes make this true // checkOK = false @@ -910,7 +910,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []strin // stateCheckApply performs a CheckApply of the file state to create or remove // an empty file or directory. -func (obj *FileRes) stateCheckApply(apply bool) (bool, error) { +func (obj *FileRes) stateCheckApply(ctx context.Context, apply bool) (bool, error) { if obj.State == FileStateUndefined { // state is not specified return true, nil } @@ -953,7 +953,7 @@ func (obj *FileRes) stateCheckApply(apply bool) (bool, error) { // we need to make a file or a directory now if obj.isDir() { - return obj.dirCheckApply(apply) + return obj.dirCheckApply(ctx, apply) } // Optimization: we shouldn't even look at obj.Content here, but we can @@ -979,7 +979,7 @@ func (obj *FileRes) stateCheckApply(apply bool) (bool, error) { } // contentCheckApply performs a CheckApply for the file content. -func (obj *FileRes) contentCheckApply(apply bool) (bool, error) { +func (obj *FileRes) contentCheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("contentCheckApply(%t)", apply) // content is not defined, leave it alone... @@ -989,7 +989,7 @@ func (obj *FileRes) contentCheckApply(apply bool) (bool, error) { // Actually write the file. This is similar to fragmentsCheckApply. bufferSrc := bytes.NewReader([]byte(*obj.Content)) - sha256sum, checkOK, err := obj.fileCheckApply(apply, bufferSrc, obj.getPath(), obj.sha256sum) + sha256sum, checkOK, err := obj.fileCheckApply(ctx, apply, bufferSrc, obj.getPath(), obj.sha256sum) if sha256sum != "" { // empty values mean errored or didn't hash // this can be valid even when the whole function errors obj.sha256sum = sha256sum // cache value @@ -1002,7 +1002,7 @@ func (obj *FileRes) contentCheckApply(apply bool) (bool, error) { } // sourceCheckApply performs a CheckApply for the file source. -func (obj *FileRes) sourceCheckApply(apply bool) (bool, error) { +func (obj *FileRes) sourceCheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("sourceCheckApply(%t)", apply) // source is not defined, leave it alone... @@ -1047,7 +1047,7 @@ func (obj *FileRes) sourceCheckApply(apply bool) (bool, error) { } // XXX: should this work with obj.Purge && obj.Source != "" or not? - checkOK, err := obj.syncCheckApply(apply, obj.Source, obj.getPath(), excludes) + checkOK, err := obj.syncCheckApply(ctx, apply, obj.Source, obj.getPath(), excludes) if err != nil { obj.init.Logf("syncCheckApply: error: %v", err) return false, err @@ -1057,7 +1057,7 @@ func (obj *FileRes) sourceCheckApply(apply bool) (bool, error) { } // fragmentsCheckApply performs a CheckApply for the file fragments. -func (obj *FileRes) fragmentsCheckApply(apply bool) (bool, error) { +func (obj *FileRes) fragmentsCheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("fragmentsCheckApply(%t)", apply) // fragments is not defined, leave it alone... @@ -1104,7 +1104,7 @@ func (obj *FileRes) fragmentsCheckApply(apply bool) (bool, error) { // NOTE: We pass in an invalidated sha256sum cache since we don't cache // all the individual files, and it could all change without us knowing. // TODO: Is the sha256sum caching even having an effect at all here ??? - sha256sum, checkOK, err := obj.fileCheckApply(apply, bufferSrc, obj.getPath(), "") + sha256sum, checkOK, err := obj.fileCheckApply(ctx, apply, bufferSrc, obj.getPath(), "") if sha256sum != "" { // empty values mean errored or didn't hash // this can be valid even when the whole function errors obj.sha256sum = sha256sum // cache value @@ -1117,7 +1117,7 @@ func (obj *FileRes) fragmentsCheckApply(apply bool) (bool, error) { } // chownCheckApply performs a CheckApply for the file ownership. -func (obj *FileRes) chownCheckApply(apply bool) (bool, error) { +func (obj *FileRes) chownCheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("chownCheckApply(%t)", apply) if obj.Owner == "" && obj.Group == "" { @@ -1177,7 +1177,7 @@ func (obj *FileRes) chownCheckApply(apply bool) (bool, error) { } // chmodCheckApply performs a CheckApply for the file permissions. -func (obj *FileRes) chmodCheckApply(apply bool) (bool, error) { +func (obj *FileRes) chmodCheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("chmodCheckApply(%t)", apply) if obj.Mode == "" { @@ -1210,7 +1210,7 @@ func (obj *FileRes) chmodCheckApply(apply bool) (bool, error) { // CheckApply checks the resource state and applies the resource if the bool // input is true. It returns error info and if the state check passed or not. -func (obj *FileRes) CheckApply(apply bool) (bool, error) { +func (obj *FileRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // NOTE: all send/recv change notifications *must* be processed before // there is a possibility of failure in CheckApply. This is because if // we fail (and possibly run again) the subsequent send->recv transfer @@ -1227,33 +1227,33 @@ func (obj *FileRes) CheckApply(apply bool) (bool, error) { // Run stateCheckApply before contentCheckApply, sourceCheckApply, and // fragmentsCheckApply. - if c, err := obj.stateCheckApply(apply); err != nil { + if c, err := obj.stateCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false } - if c, err := obj.contentCheckApply(apply); err != nil { + if c, err := obj.contentCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false } - if c, err := obj.sourceCheckApply(apply); err != nil { + if c, err := obj.sourceCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false } - if c, err := obj.fragmentsCheckApply(apply); err != nil { + if c, err := obj.fragmentsCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false } - if c, err := obj.chownCheckApply(apply); err != nil { + if c, err := obj.chownCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false } - if c, err := obj.chmodCheckApply(apply); err != nil { + if c, err := obj.chmodCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false diff --git a/engine/resources/group.go b/engine/resources/group.go index 1748f65d..8f319bfb 100644 --- a/engine/resources/group.go +++ b/engine/resources/group.go @@ -122,7 +122,7 @@ func (obj *GroupRes) Watch(ctx context.Context) error { } // CheckApply method for Group resource. -func (obj *GroupRes) CheckApply(apply bool) (bool, error) { +func (obj *GroupRes) CheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("CheckApply(%t)", apply) // check if the group exists diff --git a/engine/resources/hetzner_vm.go b/engine/resources/hetzner_vm.go index 53527267..24c99d7e 100644 --- a/engine/resources/hetzner_vm.go +++ b/engine/resources/hetzner_vm.go @@ -388,9 +388,8 @@ func (obj *HetznerVMRes) Watch(context.Context) error { // NOTE: this last assumption might still fail in case the same resource // instance is managed by multiple running mgmt instances! // TODO: possible to ensure safe concurrency? -func (obj *HetznerVMRes) CheckApply(apply bool) (bool, error) { +func (obj *HetznerVMRes) CheckApply(ctx context.Context, apply bool) (bool, error) { checkOK := true - ctx := context.TODO() // Request up-to-date server info from the API. if err := obj.getServerUpdate(ctx); err != nil { return false, errwrap.Wrapf(err, "getServerUpdate failed") diff --git a/engine/resources/hostname.go b/engine/resources/hostname.go index 06329670..0f081bcb 100644 --- a/engine/resources/hostname.go +++ b/engine/resources/hostname.go @@ -187,7 +187,7 @@ func (obj *HostnameRes) updateHostnameProperty(object dbus.BusObject, expectedVa } // CheckApply method for Hostname resource. -func (obj *HostnameRes) CheckApply(apply bool) (bool, error) { +func (obj *HostnameRes) CheckApply(ctx context.Context, apply bool) (bool, error) { conn, err := util.SystemBusPrivateUsable() if err != nil { return false, errwrap.Wrapf(err, "failed to connect to the private system bus") diff --git a/engine/resources/http.go b/engine/resources/http.go index f625239e..f6cb076a 100644 --- a/engine/resources/http.go +++ b/engine/resources/http.go @@ -350,7 +350,7 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error { // CheckApply never has anything to do for this resource, so it always succeeds. // It does however check that certain runtime requirements (such as the Root dir // existing if one was specified) are fulfilled. -func (obj *HTTPServerRes) CheckApply(apply bool) (bool, error) { +func (obj *HTTPServerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if obj.init.Debug { obj.init.Logf("CheckApply") } @@ -734,7 +734,7 @@ func (obj *HTTPFileRes) Watch(ctx context.Context) error { } // CheckApply never has anything to do for this resource, so it always succeeds. -func (obj *HTTPFileRes) CheckApply(apply bool) (bool, error) { +func (obj *HTTPFileRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if obj.init.Debug { obj.init.Logf("CheckApply") } diff --git a/engine/resources/kv.go b/engine/resources/kv.go index cb9900fb..ab0f8626 100644 --- a/engine/resources/kv.go +++ b/engine/resources/kv.go @@ -207,12 +207,12 @@ func (obj *KVRes) lessThanCheck(value string) (bool, error) { } // CheckApply method for Password resource. Does nothing, returns happy! -func (obj *KVRes) CheckApply(apply bool) (bool, error) { +func (obj *KVRes) CheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("CheckApply(%t)", apply) wg := &sync.WaitGroup{} defer wg.Wait() // this must be above the defer cancel() call - ctx, cancel := context.WithTimeout(context.Background(), kvCheckApplyTimeout) + ctx, cancel := context.WithTimeout(ctx, kvCheckApplyTimeout) defer cancel() wg.Add(1) go func() { diff --git a/engine/resources/mount.go b/engine/resources/mount.go index 9f1d33ef..e591679f 100644 --- a/engine/resources/mount.go +++ b/engine/resources/mount.go @@ -291,7 +291,7 @@ func (obj *MountRes) Watch(ctx context.Context) error { // fstabCheckApply checks /etc/fstab for entries corresponding to the resource // definition, and adds or deletes the entry as needed. -func (obj *MountRes) fstabCheckApply(apply bool) (bool, error) { +func (obj *MountRes) fstabCheckApply(ctx context.Context, apply bool) (bool, error) { exists, err := fstabEntryExists(fstabPath, obj.mount) if err != nil { return false, errwrap.Wrapf(err, "error checking if fstab entry exists") @@ -321,7 +321,7 @@ func (obj *MountRes) fstabCheckApply(apply bool) (bool, error) { // mountCheckApply checks if the defined resource is mounted, and mounts or // unmounts it according to the defined state. -func (obj *MountRes) mountCheckApply(apply bool) (bool, error) { +func (obj *MountRes) mountCheckApply(ctx context.Context, apply bool) (bool, error) { exists, err := mountExists(procPath, obj.mount) if err != nil { return false, errwrap.Wrapf(err, "error checking if mount exists") @@ -340,7 +340,7 @@ func (obj *MountRes) mountCheckApply(apply bool) (bool, error) { if obj.State == "exists" { // Reload mounts from /etc/fstab by performing a `daemon-reload` and // restarting `local-fs.target` and `remote-fs.target` units. - if err := mountReload(); err != nil { + if err := mountReload(ctx); err != nil { return false, errwrap.Wrapf(err, "error reloading /etc/fstab") } return false, nil // we're done @@ -355,16 +355,16 @@ func (obj *MountRes) mountCheckApply(apply bool) (bool, error) { // CheckApply is run to check the state and, if apply is true, to apply the // necessary changes to reach the desired state. This is run before Watch and // again if Watch finds a change occurring to the state. -func (obj *MountRes) CheckApply(apply bool) (bool, error) { +func (obj *MountRes) CheckApply(ctx context.Context, apply bool) (bool, error) { checkOK := true - if c, err := obj.fstabCheckApply(apply); err != nil { + if c, err := obj.fstabCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false } - if c, err := obj.mountCheckApply(apply); err != nil { + if c, err := obj.mountCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false @@ -584,7 +584,7 @@ func mountCompare(def, proc *fstab.Mount) (bool, error) { // mountReload performs a daemon-reload and restarts fs-local.target and // fs-remote.target, to let systemd mount any new entries in /etc/fstab. -func mountReload() error { +func mountReload(ctx context.Context) error { // establish a godbus connection conn, err := util.SystemBusPrivateUsable() if err != nil { @@ -598,12 +598,12 @@ func mountReload() error { } // systemctl restart local-fs.target - if err := restartUnit(conn, "local-fs.target"); err != nil { + if err := restartUnit(ctx, conn, "local-fs.target"); err != nil { return errwrap.Wrapf(err, "error restarting unit") } // systemctl restart remote-fs.target - if err := restartUnit(conn, "remote-fs.target"); err != nil { + if err := restartUnit(ctx, conn, "remote-fs.target"); err != nil { return errwrap.Wrapf(err, "error restarting unit") } @@ -612,9 +612,9 @@ func mountReload() error { // restartUnit restarts the given dbus unit and waits for it to finish starting // up. If restartTimeout is exceeded, it will return an error. -func restartUnit(conn *dbus.Conn, unit string) error { +func restartUnit(ctx context.Context, conn *dbus.Conn, unit string) error { // timeout if we don't get the JobRemoved event - ctx, cancel := context.WithTimeout(context.TODO(), dbusRestartCtxTimeout*time.Second) + ctx, cancel := context.WithTimeout(ctx, dbusRestartCtxTimeout*time.Second) defer cancel() // Add a dbus rule to watch the systemd1 JobRemoved signal used to wait diff --git a/engine/resources/msg.go b/engine/resources/msg.go index 0f361678..a3531d07 100644 --- a/engine/resources/msg.go +++ b/engine/resources/msg.go @@ -163,7 +163,7 @@ func (obj *MsgRes) journalPriority() journal.Priority { // CheckApply method for Msg resource. Every check leads to an apply, meaning // that the message is flushed to the journal. -func (obj *MsgRes) CheckApply(apply bool) (bool, error) { +func (obj *MsgRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // isStateOK() done by engine, so we updateStateOK() to pass in value //if obj.isAllStateOK() { // return true, nil diff --git a/engine/resources/net.go b/engine/resources/net.go index 943d3d69..1d76ef2d 100644 --- a/engine/resources/net.go +++ b/engine/resources/net.go @@ -313,7 +313,7 @@ func (obj *NetRes) Watch(ctx context.Context) error { // ifaceCheckApply checks the state of the network device and brings it up or // down as necessary. -func (obj *NetRes) ifaceCheckApply(apply bool) (bool, error) { +func (obj *NetRes) ifaceCheckApply(ctx context.Context, apply bool) (bool, error) { // check the interface state state, err := obj.iface.state() if err != nil { @@ -340,7 +340,7 @@ func (obj *NetRes) ifaceCheckApply(apply bool) (bool, error) { // addrCheckApply checks if the interface has the correct addresses and then // adds/deletes addresses as necessary. -func (obj *NetRes) addrCheckApply(apply bool) (bool, error) { +func (obj *NetRes) addrCheckApply(ctx context.Context, apply bool) (bool, error) { // get the link's addresses ifaceAddrs, err := obj.iface.getAddrs() if err != nil { @@ -388,7 +388,7 @@ func (obj *NetRes) addrCheckApply(apply bool) (bool, error) { // gatewayCheckApply checks if the interface has the correct default gateway and // adds/deletes routes as necessary. -func (obj *NetRes) gatewayCheckApply(apply bool) (bool, error) { +func (obj *NetRes) gatewayCheckApply(ctx context.Context, apply bool) (bool, error) { // get all routes from the interface routes, err := netlink.RouteList(obj.iface.link, netlink.FAMILY_V4) if err != nil { @@ -440,7 +440,7 @@ func (obj *NetRes) gatewayCheckApply(apply bool) (bool, error) { } // fileCheckApply checks and maintains the systemd-networkd unit file contents. -func (obj *NetRes) fileCheckApply(apply bool) (bool, error) { +func (obj *NetRes) fileCheckApply(ctx context.Context, apply bool) (bool, error) { // check if the unit file exists _, err := os.Stat(obj.unitFilePath) if err != nil && !os.IsNotExist(err) { @@ -475,11 +475,11 @@ func (obj *NetRes) fileCheckApply(apply bool) (bool, error) { // CheckApply is run to check the state and, if apply is true, to apply the // necessary changes to reach the desired state. This is run before Watch and // again if Watch finds a change occurring to the state. -func (obj *NetRes) CheckApply(apply bool) (bool, error) { +func (obj *NetRes) CheckApply(ctx context.Context, apply bool) (bool, error) { checkOK := true // check the network device - if c, err := obj.ifaceCheckApply(apply); err != nil { + if c, err := obj.ifaceCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false @@ -491,14 +491,14 @@ func (obj *NetRes) CheckApply(apply bool) (bool, error) { } // check the addresses - if c, err := obj.addrCheckApply(apply); err != nil { + if c, err := obj.addrCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false } // check the gateway - if c, err := obj.gatewayCheckApply(apply); err != nil { + if c, err := obj.gatewayCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false @@ -510,7 +510,7 @@ func (obj *NetRes) CheckApply(apply bool) (bool, error) { } // check the networkd unit file - if c, err := obj.fileCheckApply(apply); err != nil { + if c, err := obj.fileCheckApply(ctx, apply); err != nil { return false, err } else if !c { checkOK = false diff --git a/engine/resources/noop.go b/engine/resources/noop.go index b6b186a9..d6c70341 100644 --- a/engine/resources/noop.go +++ b/engine/resources/noop.go @@ -76,7 +76,7 @@ func (obj *NoopRes) Watch(ctx context.Context) error { } // CheckApply method for Noop resource. Does nothing, returns happy! -func (obj *NoopRes) CheckApply(apply bool) (bool, error) { +func (obj *NoopRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if obj.init.Refresh() { obj.init.Logf("received a notification!") } diff --git a/engine/resources/nspawn.go b/engine/resources/nspawn.go index 594ab604..20ed3182 100644 --- a/engine/resources/nspawn.go +++ b/engine/resources/nspawn.go @@ -203,7 +203,7 @@ func (obj *NspawnRes) Watch(ctx context.Context) error { // CheckApply is run to check the state and, if apply is true, to apply the // necessary changes to reach the desired state. This is run before Watch and // again if Watch finds a change occurring to the state. -func (obj *NspawnRes) CheckApply(apply bool) (bool, error) { +func (obj *NspawnRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // this resource depends on systemd to ensure that it's running if !systemdUtil.IsRunningSystemd() { return false, errors.New("systemd is not running") @@ -253,7 +253,7 @@ func (obj *NspawnRes) CheckApply(apply bool) (bool, error) { obj.init.Logf("CheckApply() applying '%s' state", obj.State) // use the embedded svc to apply the correct state - if _, err := obj.svc.CheckApply(apply); err != nil { + if _, err := obj.svc.CheckApply(ctx, apply); err != nil { return false, errwrap.Wrapf(err, "nested svc failed") } diff --git a/engine/resources/password.go b/engine/resources/password.go index 18b9716b..ffb11729 100644 --- a/engine/resources/password.go +++ b/engine/resources/password.go @@ -219,7 +219,7 @@ func (obj *PasswordRes) Watch(ctx context.Context) error { } // CheckApply method for Password resource. Does nothing, returns happy! -func (obj *PasswordRes) CheckApply(apply bool) (bool, error) { +func (obj *PasswordRes) CheckApply(ctx context.Context, apply bool) (bool, error) { var refresh = obj.init.Refresh() // do we have a pending reload to apply? var exists = true // does the file (aka the token) exist? var generate bool // do we need to generate a new password? diff --git a/engine/resources/pippet.go b/engine/resources/pippet.go index 84c3321a..cc42f4f0 100644 --- a/engine/resources/pippet.go +++ b/engine/resources/pippet.go @@ -108,7 +108,7 @@ func (obj *PippetRes) Watch(ctx context.Context) error { } // CheckApply synchronizes the resource if required. -func (obj *PippetRes) CheckApply(apply bool) (bool, error) { +func (obj *PippetRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if !apply { return false, nil } diff --git a/engine/resources/pkg.go b/engine/resources/pkg.go index 61f442a0..a51517a4 100644 --- a/engine/resources/pkg.go +++ b/engine/resources/pkg.go @@ -278,7 +278,7 @@ func (obj *PkgRes) populateFileList() error { // CheckApply checks the resource state and applies the resource if the bool // input is true. It returns error info and if the state check passed or not. -func (obj *PkgRes) CheckApply(apply bool) (bool, error) { +func (obj *PkgRes) CheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("Check: %s", obj.fmtNames(obj.getNames())) bus := packagekit.NewBus() diff --git a/engine/resources/print.go b/engine/resources/print.go index 92f578e9..caf67177 100644 --- a/engine/resources/print.go +++ b/engine/resources/print.go @@ -83,7 +83,7 @@ func (obj *PrintRes) Watch(ctx context.Context) error { } // CheckApply method for Print resource. Does nothing, returns happy! -func (obj *PrintRes) CheckApply(apply bool) (bool, error) { +func (obj *PrintRes) CheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("CheckApply: %t", apply) if val, exists := obj.init.Recv()["Msg"]; exists && val.Changed { // if we received on Msg, and it changed, log message diff --git a/engine/resources/resources_test.go b/engine/resources/resources_test.go index 036e2f5a..2f863214 100644 --- a/engine/resources/resources_test.go +++ b/engine/resources/resources_test.go @@ -644,7 +644,7 @@ func TestResources1(t *testing.T) { } t.Logf("test #%d: running CheckApply", index) - checkOK, err := res.CheckApply(true) // no noop! + checkOK, err := res.CheckApply(doneCtx, true) // no noop! if err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: CheckApply failed: %s", index, err.Error()) @@ -771,7 +771,7 @@ func TestResources2(t *testing.T) { // the returned error. resCheckApplyError := func(res engine.Res, expCheckOK bool, errOK func(e error) error) func() error { return func() error { - checkOK, err := res.CheckApply(true) // no noop! + checkOK, err := res.CheckApply(context.TODO(), true) // no noop! if e := errOK(err); e != nil { return errwrap.Wrapf(e, "error from CheckApply did not match expected") } diff --git a/engine/resources/svc.go b/engine/resources/svc.go index 19281d21..da60decb 100644 --- a/engine/resources/svc.go +++ b/engine/resources/svc.go @@ -238,7 +238,7 @@ func (obj *SvcRes) Watch(ctx context.Context) error { // CheckApply checks the resource state and applies the resource if the bool // input is true. It returns error info and if the state check passed or not. -func (obj *SvcRes) CheckApply(apply bool) (bool, error) { +func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if !systemdUtil.IsRunningSystemd() { return false, fmt.Errorf("systemd is not running") } @@ -338,6 +338,7 @@ func (obj *SvcRes) CheckApply(apply bool) (bool, error) { refresh = false // we did a stop, so a reload is not needed } + // XXX: use ctx here status := <-result if &status == nil { return false, fmt.Errorf("systemd service action result is nil") diff --git a/engine/resources/test.go b/engine/resources/test.go index ff82d4a2..f986abeb 100644 --- a/engine/resources/test.go +++ b/engine/resources/test.go @@ -139,7 +139,7 @@ func (obj *TestRes) Watch(ctx context.Context) error { } // CheckApply method for Test resource. Does nothing, returns happy! -func (obj *TestRes) CheckApply(apply bool) (bool, error) { +func (obj *TestRes) CheckApply(ctx context.Context, apply bool) (bool, error) { for key, val := range obj.init.Recv() { obj.init.Logf("CheckApply: Received `%s`, changed: %t", key, val.Changed) } diff --git a/engine/resources/tftp.go b/engine/resources/tftp.go index 289d95b4..5b06f8da 100644 --- a/engine/resources/tftp.go +++ b/engine/resources/tftp.go @@ -216,7 +216,7 @@ func (obj *TFTPServerRes) Watch(ctx context.Context) error { // CheckApply never has anything to do for this resource, so it always succeeds. // It does however check that certain runtime requirements (such as the Root dir // existing if one was specified) are fulfilled. -func (obj *TFTPServerRes) CheckApply(apply bool) (bool, error) { +func (obj *TFTPServerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if obj.init.Debug { obj.init.Logf("CheckApply") } @@ -561,7 +561,7 @@ func (obj *TFTPFileRes) Watch(ctx context.Context) error { } // CheckApply never has anything to do for this resource, so it always succeeds. -func (obj *TFTPFileRes) CheckApply(apply bool) (bool, error) { +func (obj *TFTPFileRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if obj.init.Debug { obj.init.Logf("CheckApply") } diff --git a/engine/resources/timer.go b/engine/resources/timer.go index 510e53b3..66e19bcf 100644 --- a/engine/resources/timer.go +++ b/engine/resources/timer.go @@ -98,7 +98,7 @@ func (obj *TimerRes) Watch(ctx context.Context) error { } // CheckApply method for Timer resource. Triggers a timer reset on notify. -func (obj *TimerRes) CheckApply(apply bool) (bool, error) { +func (obj *TimerRes) CheckApply(ctx context.Context, apply bool) (bool, error) { // because there are no checks to run, this resource has a less // traditional pattern than what is seen in most resources... if !obj.init.Refresh() { // this works for apply || !apply diff --git a/engine/resources/user.go b/engine/resources/user.go index 30f05b0b..479153ce 100644 --- a/engine/resources/user.go +++ b/engine/resources/user.go @@ -169,7 +169,7 @@ func (obj *UserRes) Watch(ctx context.Context) error { } // CheckApply method for User resource. -func (obj *UserRes) CheckApply(apply bool) (bool, error) { +func (obj *UserRes) CheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("CheckApply(%t)", apply) var exists = true diff --git a/engine/resources/virt.go b/engine/resources/virt.go index 1c3bdc7e..43205f1b 100644 --- a/engine/resources/virt.go +++ b/engine/resources/virt.go @@ -505,7 +505,7 @@ func (obj *VirtRes) domainCreate() (*libvirt.Domain, bool, error) { } // stateCheckApply starts, stops, or pauses/unpauses the domain as needed. -func (obj *VirtRes) stateCheckApply(apply bool, dom *libvirt.Domain) (bool, error) { +func (obj *VirtRes) stateCheckApply(ctx context.Context, apply bool, dom *libvirt.Domain) (bool, error) { var checkOK = true domInfo, err := dom.GetInfo() if err != nil { @@ -586,7 +586,7 @@ func (obj *VirtRes) stateCheckApply(apply bool, dom *libvirt.Domain) (bool, erro // attrCheckApply performs the CheckApply functions for CPU, Memory and others. // This shouldn't be called when the machine is absent; it won't be found! -func (obj *VirtRes) attrCheckApply(apply bool, dom *libvirt.Domain) (bool, error) { +func (obj *VirtRes) attrCheckApply(ctx context.Context, apply bool, dom *libvirt.Domain) (bool, error) { var checkOK = true domInfo, err := dom.GetInfo() if err != nil { @@ -752,7 +752,7 @@ func (obj *VirtRes) domainShutdownSync(apply bool, dom *libvirt.Domain) (bool, e // CheckApply checks the resource state and applies the resource if the bool // input is true. It returns error info and if the state check passed or not. -func (obj *VirtRes) CheckApply(apply bool) (bool, error) { +func (obj *VirtRes) CheckApply(ctx context.Context, apply bool) (bool, error) { if obj.conn == nil { // programming error? return false, fmt.Errorf("got called with nil connection") } @@ -843,7 +843,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { // FIXME: is doing this early check (therefore twice total) a good idea? // run additional pre-emptive attr change checks here for hotplug stuff! if !obj.absent { - if c, err := obj.attrCheckApply(apply, dom); err != nil { + if c, err := obj.attrCheckApply(ctx, apply, dom); err != nil { return false, errwrap.Wrapf(err, "early attrCheckApply failed") } else if !c { checkOK = false @@ -852,7 +852,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { // TODO: do we need to run again below after we've booted up the domain? // apply correct machine state, eg: startup/shutoff/pause as needed - if c, err := obj.stateCheckApply(apply, dom); err != nil { + if c, err := obj.stateCheckApply(ctx, apply, dom); err != nil { return false, errwrap.Wrapf(err, "stateCheckApply failed") } else if !c { checkOK = false @@ -863,7 +863,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) { // mem & cpu checks... if !obj.absent { - if c, err := obj.attrCheckApply(apply, dom); err != nil { + if c, err := obj.attrCheckApply(ctx, apply, dom); err != nil { return false, errwrap.Wrapf(err, "attrCheckApply failed") } else if !c { checkOK = false diff --git a/engine/util/util_test.go b/engine/util/util_test.go index bdffc1a0..ddb4fefb 100644 --- a/engine/util/util_test.go +++ b/engine/util/util_test.go @@ -176,7 +176,7 @@ type testEngineRes struct { privateProp2 []int } -func (t *testEngineRes) CheckApply(bool) (bool, error) { return false, nil } +func (t *testEngineRes) CheckApply(context.Context, bool) (bool, error) { return false, nil } func (t *testEngineRes) Cleanup() error { return nil }