engine: Add a ctx to the CheckApply API

This is just a rough port, there are lots of optimizations to be done
and lots of timeout values that should be replaced by a new timeout meta
param!
This commit is contained in:
James Shubin
2023-09-01 22:56:32 -04:00
parent 567de2e115
commit 7ccda7e99b
37 changed files with 175 additions and 120 deletions

View File

@@ -206,7 +206,7 @@ on an error if something went wrong.
### CheckApply ### CheckApply
```golang ```golang
CheckApply(apply bool) (checkOK bool, err error) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error)
``` ```
`CheckApply` is where the real _work_ is done. Under normal circumstances, this `CheckApply` is where the real _work_ is done. Under normal circumstances, this
@@ -215,7 +215,8 @@ should return: `(true, nil)`. If the `apply` variable is set to `true`, then
this means that we should then proceed to run the changes required to bring the this means that we should then proceed to run the changes required to bring the
resource into the correct state. If the `apply` variable is set to `false`, then resource into the correct state. If the `apply` variable is set to `false`, then
the resource is operating in _noop_ mode and _no operational changes_ should be the resource is operating in _noop_ mode and _no operational changes_ should be
made! made! The ctx should be monitored in case a shutdown has been requested. This
may be used if a timeout occurred, or if the user shutdown the engine.
After having executed the necessary operations to bring the resource back into After having executed the necessary operations to bring the resource back into
the desired state, or after having detected that the state was incorrect, but the desired state, or after having detected that the state was incorrect, but
@@ -234,7 +235,7 @@ to `CheckApply`.
```golang ```golang
// CheckApply does the idempotent work of checking and applying resource state. // CheckApply does the idempotent work of checking and applying resource state.
func (obj *FooRes) CheckApply(apply bool) (bool, error) { func (obj *FooRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// check the state // check the state
if state_is_okay { return true, nil } // done early! :) if state_is_okay { return true, nil } // done early! :)
@@ -731,9 +732,9 @@ to be separate. It turns out that the current `CheckApply` can wrap this easily.
It would look approximately like this: It would look approximately like this:
```golang ```golang
func (obj *FooRes) CheckApply(apply bool) (bool, error) { func (obj *FooRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// my private split implementation of check and apply // my private split implementation of check and apply
if c, err := obj.check(); err != nil { if c, err := obj.check(ctx); err != nil {
return false, err // we errored return false, err // we errored
} else if c { } else if c {
return true, nil // state was good! return true, nil // state was good!
@@ -743,7 +744,7 @@ func (obj *FooRes) CheckApply(apply bool) (bool, error) {
return false, nil // state needs fixing, but apply is false return false, nil // state needs fixing, but apply is false
} }
err := obj.apply() // errors if failure or unable to apply err := obj.apply(ctx) // errors if failure or unable to apply
return false, err // always return false, with an optional error return false, err // always return false, with an optional error
} }

View File

@@ -18,6 +18,7 @@
package graph package graph
import ( import (
"context"
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
@@ -60,7 +61,7 @@ func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex {
} }
// Process is the primary function to execute a particular vertex in the graph. // Process is the primary function to execute a particular vertex in the graph.
func (obj *Engine) Process(vertex pgraph.Vertex) error { func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
res, isRes := vertex.(engine.Res) res, isRes := vertex.(engine.Res)
if !isRes { if !isRes {
return fmt.Errorf("vertex is not a Res") return fmt.Errorf("vertex is not a Res")
@@ -155,7 +156,7 @@ func (obj *Engine) Process(vertex pgraph.Vertex) error {
// run the CheckApply! // run the CheckApply!
obj.Logf("%s: CheckApply(%t)", res, !noop) obj.Logf("%s: CheckApply(%t)", res, !noop)
// if this fails, don't UpdateTimestamp() // if this fails, don't UpdateTimestamp()
checkOK, err = res.CheckApply(!noop) checkOK, err = res.CheckApply(ctx, !noop)
obj.Logf("%s: CheckApply(%t): Return(%t, %s)", res, !noop, checkOK, engineUtil.CleanError(err)) obj.Logf("%s: CheckApply(%t): Return(%t, %s)", res, !noop, checkOK, engineUtil.CleanError(err))
} }
@@ -563,7 +564,7 @@ Loop:
if obj.Debug { if obj.Debug {
obj.Logf("Process(%s)", vertex) obj.Logf("Process(%s)", vertex)
} }
err = obj.Process(vertex) err = obj.Process(obj.state[vertex].doneCtx, vertex)
if obj.Debug { if obj.Debug {
obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err)) obj.Logf("Process(%s): Return(%s)", vertex, engineUtil.CleanError(err))
} }

View File

@@ -71,7 +71,7 @@ func (obj *NoopResTest) Watch(context.Context) error {
return nil // not needed return nil // not needed
} }
func (obj *NoopResTest) CheckApply(apply bool) (checkOK bool, err error) { func (obj *NoopResTest) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) {
return true, nil // state is always okay return true, nil // state is always okay
} }

View File

@@ -199,8 +199,12 @@ type Res interface {
Watch(context.Context) error Watch(context.Context) error
// CheckApply determines if the state of the resource is correct and if // CheckApply determines if the state of the resource is correct and if
// asked to with the `apply` variable, applies the requested state. // asked to with the `apply` variable, applies the requested state. If
CheckApply(apply bool) (checkOK bool, err error) // the input context cancels, we must return as quickly as possible. We
// should never exit immediately if this would cause permanent
// corruption of some sort. However it doesn't mean that a resource was
// taken to the desired state.
CheckApply(ctx context.Context, apply bool) (checkOK bool, err error)
// Cmp compares itself to another resource and returns an error if they // Cmp compares itself to another resource and returns an error if they
// are not equivalent. This is more strict than the Adapts method of the // are not equivalent. This is more strict than the Adapts method of the

View File

@@ -168,7 +168,7 @@ func (obj *AugeasRes) Watch(ctx context.Context) error {
} }
// checkApplySet runs CheckApply for one element of the AugeasRes.Set // checkApplySet runs CheckApply for one element of the AugeasRes.Set
func (obj *AugeasRes) checkApplySet(apply bool, ag *augeas.Augeas, set *AugeasSet) (bool, error) { func (obj *AugeasRes) checkApplySet(ctx context.Context, apply bool, ag *augeas.Augeas, set *AugeasSet) (bool, error) {
fullpath := fmt.Sprintf("/files/%v/%v", obj.File, set.Path) fullpath := fmt.Sprintf("/files/%v/%v", obj.File, set.Path)
// We do not check for errors because errors are also thrown when // We do not check for errors because errors are also thrown when
@@ -193,7 +193,7 @@ func (obj *AugeasRes) checkApplySet(apply bool, ag *augeas.Augeas, set *AugeasSe
} }
// CheckApply method for Augeas resource. // CheckApply method for Augeas resource.
func (obj *AugeasRes) CheckApply(apply bool) (bool, error) { func (obj *AugeasRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("CheckApply: %s", obj.File) obj.init.Logf("CheckApply: %s", obj.File)
// By default we do not set any option to augeas, we use the defaults. // By default we do not set any option to augeas, we use the defaults.
opts := augeas.None opts := augeas.None
@@ -230,7 +230,7 @@ func (obj *AugeasRes) CheckApply(apply bool) (bool, error) {
checkOK := true checkOK := true
for _, set := range obj.Sets { for _, set := range obj.Sets {
if setCheckOK, err := obj.checkApplySet(apply, &ag, set); err != nil { if setCheckOK, err := obj.checkApplySet(ctx, apply, &ag, set); err != nil {
return false, errwrap.Wrapf(err, "augeas: error during CheckApply of one Set") return false, errwrap.Wrapf(err, "augeas: error during CheckApply of one Set")
} else if !setCheckOK { } else if !setCheckOK {
checkOK = false checkOK = false

View File

@@ -625,7 +625,7 @@ func (obj *AwsEc2Res) snsWatch(ctx context.Context) error {
} }
// CheckApply method for AwsEc2 resource. // CheckApply method for AwsEc2 resource.
func (obj *AwsEc2Res) CheckApply(apply bool) (bool, error) { func (obj *AwsEc2Res) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("CheckApply(%t)", apply) obj.init.Logf("CheckApply(%t)", apply)
// find the instance we need to check // find the instance we need to check
@@ -735,7 +735,7 @@ func (obj *AwsEc2Res) CheckApply(apply bool) (bool, error) {
} }
// context to cancel the waiter if it takes too long // context to cancel the waiter if it takes too long
innerCtx, cancel := context.WithTimeout(context.TODO(), waitTimeout*time.Second) innerCtx, cancel := context.WithTimeout(ctx, waitTimeout*time.Second)
defer cancel() defer cancel()
// wait until the state converges // wait until the state converges

View File

@@ -130,10 +130,10 @@ Loop:
// to zero, then it *won't* try and change it away from zero, because it assumes // to zero, then it *won't* try and change it away from zero, because it assumes
// that someone has requested a shutdown. If the value is seen on first startup, // that someone has requested a shutdown. If the value is seen on first startup,
// then it will change it, because it might be a zero from the previous cluster. // then it will change it, because it might be a zero from the previous cluster.
func (obj *ConfigEtcdRes) sizeCheckApply(apply bool) (bool, error) { func (obj *ConfigEtcdRes) sizeCheckApply(ctx context.Context, apply bool) (bool, error) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
defer wg.Wait() // this must be above the defer cancel() call defer wg.Wait() // this must be above the defer cancel() call
ctx, cancel := context.WithTimeout(context.Background(), sizeCheckApplyTimeout) ctx, cancel := context.WithTimeout(ctx, sizeCheckApplyTimeout)
defer cancel() defer cancel()
wg.Add(1) wg.Add(1)
go func() { go func() {
@@ -182,17 +182,17 @@ func (obj *ConfigEtcdRes) sizeCheckApply(apply bool) (bool, error) {
} }
// CheckApply method for Noop resource. Does nothing, returns happy! // CheckApply method for Noop resource. Does nothing, returns happy!
func (obj *ConfigEtcdRes) CheckApply(apply bool) (bool, error) { func (obj *ConfigEtcdRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
checkOK := true checkOK := true
if c, err := obj.sizeCheckApply(apply); err != nil { if c, err := obj.sizeCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
// TODO: add more config settings management here... // TODO: add more config settings management here...
//if c, err := obj.TODOCheckApply(apply); err != nil { //if c, err := obj.TODOCheckApply(ctx, apply); err != nil {
// return false, err // return false, err
//} else if !c { //} else if !c {
// checkOK = false // checkOK = false

View File

@@ -195,7 +195,8 @@ func (obj *ConsulKVRes) Watch(ctx context.Context) error {
// CheckApply is run to check the state and, if apply is true, to apply the // CheckApply is run to check the state and, if apply is true, to apply the
// necessary changes to reach the desired state. This is run before Watch and // necessary changes to reach the desired state. This is run before Watch and
// again if Watch finds a change occurring to the state. // again if Watch finds a change occurring to the state.
func (obj *ConsulKVRes) CheckApply(apply bool) (bool, error) { func (obj *ConsulKVRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// XXX: use ctx for get and put
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("consul key: %s", obj.key) obj.init.Logf("consul key: %s", obj.key)
} }

View File

@@ -317,16 +317,16 @@ func (obj *CronRes) Watch(ctx context.Context) error {
// CheckApply is run to check the state and, if apply is true, to apply the // CheckApply is run to check the state and, if apply is true, to apply the
// necessary changes to reach the desired state. This is run before Watch and // necessary changes to reach the desired state. This is run before Watch and
// again if Watch finds a change occurring to the state. // again if Watch finds a change occurring to the state.
func (obj *CronRes) CheckApply(apply bool) (bool, error) { func (obj *CronRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
checkOK := true checkOK := true
// use the embedded file resource to apply the correct state // use the embedded file resource to apply the correct state
if c, err := obj.file.CheckApply(apply); err != nil { if c, err := obj.file.CheckApply(ctx, apply); err != nil {
return false, errwrap.Wrapf(err, "nested file failed") return false, errwrap.Wrapf(err, "nested file failed")
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
// check timer state and apply the defined state if needed // check timer state and apply the defined state if needed
if c, err := obj.unitCheckApply(apply); err != nil { if c, err := obj.unitCheckApply(ctx, apply); err != nil {
return false, errwrap.Wrapf(err, "unitCheckApply error") return false, errwrap.Wrapf(err, "unitCheckApply error")
} else if !c { } else if !c {
checkOK = false checkOK = false
@@ -336,7 +336,7 @@ func (obj *CronRes) CheckApply(apply bool) (bool, error) {
// unitCheckApply checks the state of the systemd-timer unit and, if apply is // unitCheckApply checks the state of the systemd-timer unit and, if apply is
// true, applies the defined state. // true, applies the defined state.
func (obj *CronRes) unitCheckApply(apply bool) (bool, error) { func (obj *CronRes) unitCheckApply(ctx context.Context, apply bool) (bool, error) {
var conn *sdbus.Conn var conn *sdbus.Conn
var godbusConn *dbus.Conn var godbusConn *dbus.Conn
var err error var err error
@@ -383,7 +383,7 @@ func (obj *CronRes) unitCheckApply(apply bool) (bool, error) {
} }
// context for stopping/restarting the unit // context for stopping/restarting the unit
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout*time.Second) ctx, cancel := context.WithTimeout(ctx, ctxTimeout*time.Second)
defer cancel() defer cancel()
// godbus connection for stopping/restarting the unit // godbus connection for stopping/restarting the unit

View File

@@ -476,7 +476,7 @@ func (obj *DHCPServerRes) Watch(ctx context.Context) error {
// sidCheckApply runs the server ID cache operation in CheckApply, which can // sidCheckApply runs the server ID cache operation in CheckApply, which can
// help CheckApply fail before the handler runs, so at least we see an error. // help CheckApply fail before the handler runs, so at least we see an error.
func (obj *DHCPServerRes) sidCheckApply(apply bool) (bool, error) { func (obj *DHCPServerRes) sidCheckApply(ctx context.Context, apply bool) (bool, error) {
// Mutex guards the cached obj.serverID value. // Mutex guards the cached obj.serverID value.
defer obj.sidMutex.Unlock() defer obj.sidMutex.Unlock()
obj.sidMutex.Lock() obj.sidMutex.Lock()
@@ -500,7 +500,7 @@ func (obj *DHCPServerRes) sidCheckApply(apply bool) (bool, error) {
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
// It does however check that certain runtime requirements (such as the Root dir // It does however check that certain runtime requirements (such as the Root dir
// existing if one was specified) are fulfilled. // existing if one was specified) are fulfilled.
func (obj *DHCPServerRes) CheckApply(apply bool) (bool, error) { func (obj *DHCPServerRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("CheckApply") obj.init.Logf("CheckApply")
} }
@@ -524,7 +524,7 @@ func (obj *DHCPServerRes) CheckApply(apply bool) (bool, error) {
// Cheap runtime validation! // Cheap runtime validation!
checkOK := true checkOK := true
if c, err := obj.sidCheckApply(apply); err != nil { if c, err := obj.sidCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
@@ -1066,7 +1066,7 @@ func (obj *DHCPHostRes) Watch(ctx context.Context) error {
} }
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
func (obj *DHCPHostRes) CheckApply(apply bool) (bool, error) { func (obj *DHCPHostRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("CheckApply") obj.init.Logf("CheckApply")
} }

View File

@@ -214,11 +214,11 @@ func (obj *DockerContainerRes) Watch(ctx context.Context) error {
} }
// CheckApply method for Docker resource. // CheckApply method for Docker resource.
func (obj *DockerContainerRes) CheckApply(apply bool) (bool, error) { func (obj *DockerContainerRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
var id string var id string
var destroy bool var destroy bool
ctx, cancel := context.WithTimeout(context.Background(), checkApplyCtxTimeout*time.Second) ctx, cancel := context.WithTimeout(ctx, checkApplyCtxTimeout*time.Second)
defer cancel() defer cancel()
// List any container whose name matches this resource. // List any container whose name matches this resource.

View File

@@ -172,8 +172,8 @@ func (obj *DockerImageRes) Watch(ctx context.Context) error {
} }
// CheckApply method for Docker resource. // CheckApply method for Docker resource.
func (obj *DockerImageRes) CheckApply(apply bool) (checkOK bool, err error) { func (obj *DockerImageRes) CheckApply(ctx context.Context, apply bool) (checkOK bool, err error) {
ctx, cancel := context.WithTimeout(context.Background(), dockerImageCheckApplyCtxTimeout*time.Second) ctx, cancel := context.WithTimeout(ctx, dockerImageCheckApplyCtxTimeout*time.Second)
defer cancel() defer cancel()
s, err := obj.client.ImageList(ctx, types.ImageListOptions{ s, err := obj.client.ImageList(ctx, types.ImageListOptions{

View File

@@ -277,7 +277,7 @@ func (obj *ExecRes) Watch(ctx context.Context) error {
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
// TODO: expand the IfCmd to be a list of commands // TODO: expand the IfCmd to be a list of commands
func (obj *ExecRes) CheckApply(apply bool) (bool, error) { func (obj *ExecRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// If we receive a refresh signal, then the engine skips the IsStateOK() // If we receive a refresh signal, then the engine skips the IsStateOK()
// check and this will run. It is still guarded by the IfCmd, but it can // check and this will run. It is still guarded by the IfCmd, but it can
// have a chance to execute, and all without the check of obj.Refresh()! // have a chance to execute, and all without the check of obj.Refresh()!
@@ -381,15 +381,15 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
defer wg.Wait() // this must be above the defer cancel() call defer wg.Wait() // this must be above the defer cancel() call
var ctx context.Context var innerCtx context.Context
var cancel context.CancelFunc var cancel context.CancelFunc
if obj.Timeout > 0 { // cmd.Process.Kill() is called on timeout if obj.Timeout > 0 { // cmd.Process.Kill() is called on timeout
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(obj.Timeout)*time.Second) innerCtx, cancel = context.WithTimeout(ctx, time.Duration(obj.Timeout)*time.Second)
} else { // zero timeout means no timer } else { // zero timeout means no timer
ctx, cancel = context.WithCancel(context.Background()) innerCtx, cancel = context.WithCancel(ctx)
} }
defer cancel() defer cancel()
cmd := exec.CommandContext(ctx, cmdName, cmdArgs...) cmd := exec.CommandContext(innerCtx, cmdName, cmdArgs...)
cmd.Dir = obj.Cwd // run program in pwd if "" cmd.Dir = obj.Cwd // run program in pwd if ""
envKeys := []string{} envKeys := []string{}
@@ -432,7 +432,7 @@ func (obj *ExecRes) CheckApply(apply bool) (bool, error) {
select { select {
case <-obj.interruptChan: case <-obj.interruptChan:
cancel() cancel()
case <-ctx.Done(): case <-innerCtx.Done():
// let this exit // let this exit
} }
}() }()

View File

@@ -53,6 +53,18 @@ func fakeExecInit(t *testing.T) (*engine.Init, *ExecSends) {
} }
func TestExecSendRecv1(t *testing.T) { func TestExecSendRecv1(t *testing.T) {
now := time.Now()
min := time.Second * 3 // approx min time needed for the test
ctx := context.Background()
if deadline, ok := t.Deadline(); ok {
d := deadline.Add(-min)
t.Logf(" now: %+v", now)
t.Logf(" d: %+v", d)
newCtx, cancel := context.WithDeadline(ctx, d)
ctx = newCtx
defer cancel()
}
r1 := &ExecRes{ r1 := &ExecRes{
Cmd: "echo hello world", Cmd: "echo hello world",
Shell: "/bin/bash", Shell: "/bin/bash",
@@ -71,7 +83,7 @@ func TestExecSendRecv1(t *testing.T) {
t.Errorf("init failed with: %v", err) t.Errorf("init failed with: %v", err)
} }
// run artificially without the entire engine // run artificially without the entire engine
if _, err := r1.CheckApply(true); err != nil { if _, err := r1.CheckApply(ctx, true); err != nil {
t.Errorf("checkapply failed with: %v", err) t.Errorf("checkapply failed with: %v", err)
} }
@@ -98,6 +110,18 @@ func TestExecSendRecv1(t *testing.T) {
} }
func TestExecSendRecv2(t *testing.T) { func TestExecSendRecv2(t *testing.T) {
now := time.Now()
min := time.Second * 3 // approx min time needed for the test
ctx := context.Background()
if deadline, ok := t.Deadline(); ok {
d := deadline.Add(-min)
t.Logf(" now: %+v", now)
t.Logf(" d: %+v", d)
newCtx, cancel := context.WithDeadline(ctx, d)
ctx = newCtx
defer cancel()
}
r1 := &ExecRes{ r1 := &ExecRes{
Cmd: "echo hello world 1>&2", // to stderr Cmd: "echo hello world 1>&2", // to stderr
Shell: "/bin/bash", Shell: "/bin/bash",
@@ -116,7 +140,7 @@ func TestExecSendRecv2(t *testing.T) {
t.Errorf("init failed with: %v", err) t.Errorf("init failed with: %v", err)
} }
// run artificially without the entire engine // run artificially without the entire engine
if _, err := r1.CheckApply(true); err != nil { if _, err := r1.CheckApply(ctx, true); err != nil {
t.Errorf("checkapply failed with: %v", err) t.Errorf("checkapply failed with: %v", err)
} }
@@ -143,6 +167,18 @@ func TestExecSendRecv2(t *testing.T) {
} }
func TestExecSendRecv3(t *testing.T) { func TestExecSendRecv3(t *testing.T) {
now := time.Now()
min := time.Second * 3 // approx min time needed for the test
ctx := context.Background()
if deadline, ok := t.Deadline(); ok {
d := deadline.Add(-min)
t.Logf(" now: %+v", now)
t.Logf(" d: %+v", d)
newCtx, cancel := context.WithDeadline(ctx, d)
ctx = newCtx
defer cancel()
}
r1 := &ExecRes{ r1 := &ExecRes{
Cmd: "echo hello world && echo goodbye world 1>&2", // to stdout && stderr Cmd: "echo hello world && echo goodbye world 1>&2", // to stdout && stderr
Shell: "/bin/bash", Shell: "/bin/bash",
@@ -161,7 +197,7 @@ func TestExecSendRecv3(t *testing.T) {
t.Errorf("init failed with: %v", err) t.Errorf("init failed with: %v", err)
} }
// run artificially without the entire engine // run artificially without the entire engine
if _, err := r1.CheckApply(true); err != nil { if _, err := r1.CheckApply(ctx, true); err != nil {
t.Errorf("checkapply failed with: %v", err) t.Errorf("checkapply failed with: %v", err)
} }
@@ -206,8 +242,20 @@ func TestExecSendRecv3(t *testing.T) {
} }
func TestExecTimeoutBehaviour(t *testing.T) { func TestExecTimeoutBehaviour(t *testing.T) {
now := time.Now()
min := time.Second * 3 // approx min time needed for the test
ctx := context.Background()
if deadline, ok := t.Deadline(); ok {
d := deadline.Add(-min)
t.Logf(" now: %+v", now)
t.Logf(" d: %+v", d)
newCtx, cancel := context.WithDeadline(ctx, d)
ctx = newCtx
defer cancel()
}
// cmd.Process.Kill() is called on timeout // cmd.Process.Kill() is called on timeout
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel() defer cancel()
cmdName := "/bin/sleep" // it's /usr/bin/sleep on modern distros cmdName := "/bin/sleep" // it's /usr/bin/sleep on modern distros
cmdArgs := []string{"300"} // 5 min in seconds cmdArgs := []string{"300"} // 5 min in seconds

View File

@@ -534,7 +534,7 @@ func (obj *FileRes) Watch(ctx context.Context) error {
// of computing the source data hash, and it returns the computed value if this // of computing the source data hash, and it returns the computed value if this
// function reaches that stage. As usual, it respects the apply action variable, // function reaches that stage. As usual, it respects the apply action variable,
// and has some symmetry with the main CheckApply function. // and has some symmetry with the main CheckApply function.
func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sha256sum string) (string, bool, error) { func (obj *FileRes) fileCheckApply(ctx context.Context, apply bool, src io.ReadSeeker, dst string, sha256sum string) (string, bool, error) {
// TODO: does it make sense to switch dst to an io.Writer ? // TODO: does it make sense to switch dst to an io.Writer ?
// TODO: use obj.Force when dealing with symlinks and other file types! // TODO: use obj.Force when dealing with symlinks and other file types!
if obj.init.Debug { if obj.init.Debug {
@@ -674,7 +674,7 @@ func (obj *FileRes) fileCheckApply(apply bool, src io.ReadSeeker, dst string, sh
} }
// dirCheckApply is the CheckApply operation for an empty directory. // dirCheckApply is the CheckApply operation for an empty directory.
func (obj *FileRes) dirCheckApply(apply bool) (bool, error) { func (obj *FileRes) dirCheckApply(ctx context.Context, apply bool) (bool, error) {
// check if the path exists and is a directory // check if the path exists and is a directory
fileInfo, err := os.Stat(obj.getPath()) fileInfo, err := os.Stat(obj.getPath())
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
@@ -726,7 +726,7 @@ func (obj *FileRes) dirCheckApply(apply bool) (bool, error) {
// fileCheckApply method. It returns checkOK and error as is normally expected. // fileCheckApply method. It returns checkOK and error as is normally expected.
// If excludes is specified, none of those files there will be deleted by this, // If excludes is specified, none of those files there will be deleted by this,
// with the exception that a sync *can* convert a file to a dir, or vice-versa. // with the exception that a sync *can* convert a file to a dir, or vice-versa.
func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []string) (bool, error) { func (obj *FileRes) syncCheckApply(ctx context.Context, apply bool, src, dst string, excludes []string) (bool, error) {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("syncCheckApply: %s -> %s", src, dst) obj.init.Logf("syncCheckApply: %s -> %s", src, dst)
} }
@@ -760,7 +760,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []strin
return false, err return false, err
} }
_, checkOK, err := obj.fileCheckApply(apply, fin, dst, "") _, checkOK, err := obj.fileCheckApply(ctx, apply, fin, dst, "")
if err != nil { if err != nil {
fin.Close() fin.Close()
return false, err return false, err
@@ -831,7 +831,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []strin
obj.init.Logf("syncCheckApply: recurse: %s -> %s", absSrc, absDst) obj.init.Logf("syncCheckApply: recurse: %s -> %s", absSrc, absDst)
} }
if obj.Recurse { if obj.Recurse {
if c, err := obj.syncCheckApply(apply, absSrc, absDst, excludes); err != nil { // recurse if c, err := obj.syncCheckApply(ctx, apply, absSrc, absDst, excludes); err != nil { // recurse
return false, errwrap.Wrapf(err, "syncCheckApply: recurse failed") return false, errwrap.Wrapf(err, "syncCheckApply: recurse failed")
} else if !c { // don't let subsequent passes make this true } else if !c { // don't let subsequent passes make this true
checkOK = false checkOK = false
@@ -888,7 +888,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []strin
} }
_ = absSrc _ = absSrc
//obj.init.Logf("syncCheckApply: recurse rm: %s -> %s", absSrc, absDst) //obj.init.Logf("syncCheckApply: recurse rm: %s -> %s", absSrc, absDst)
//if c, err := obj.syncCheckApply(apply, absSrc, absDst, excludes); err != nil { //if c, err := obj.syncCheckApply(ctx, apply, absSrc, absDst, excludes); err != nil {
// return false, errwrap.Wrapf(err, "syncCheckApply: recurse rm failed") // return false, errwrap.Wrapf(err, "syncCheckApply: recurse rm failed")
//} else if !c { // don't let subsequent passes make this true //} else if !c { // don't let subsequent passes make this true
// checkOK = false // checkOK = false
@@ -910,7 +910,7 @@ func (obj *FileRes) syncCheckApply(apply bool, src, dst string, excludes []strin
// stateCheckApply performs a CheckApply of the file state to create or remove // stateCheckApply performs a CheckApply of the file state to create or remove
// an empty file or directory. // an empty file or directory.
func (obj *FileRes) stateCheckApply(apply bool) (bool, error) { func (obj *FileRes) stateCheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.State == FileStateUndefined { // state is not specified if obj.State == FileStateUndefined { // state is not specified
return true, nil return true, nil
} }
@@ -953,7 +953,7 @@ func (obj *FileRes) stateCheckApply(apply bool) (bool, error) {
// we need to make a file or a directory now // we need to make a file or a directory now
if obj.isDir() { if obj.isDir() {
return obj.dirCheckApply(apply) return obj.dirCheckApply(ctx, apply)
} }
// Optimization: we shouldn't even look at obj.Content here, but we can // Optimization: we shouldn't even look at obj.Content here, but we can
@@ -979,7 +979,7 @@ func (obj *FileRes) stateCheckApply(apply bool) (bool, error) {
} }
// contentCheckApply performs a CheckApply for the file content. // contentCheckApply performs a CheckApply for the file content.
func (obj *FileRes) contentCheckApply(apply bool) (bool, error) { func (obj *FileRes) contentCheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("contentCheckApply(%t)", apply) obj.init.Logf("contentCheckApply(%t)", apply)
// content is not defined, leave it alone... // content is not defined, leave it alone...
@@ -989,7 +989,7 @@ func (obj *FileRes) contentCheckApply(apply bool) (bool, error) {
// Actually write the file. This is similar to fragmentsCheckApply. // Actually write the file. This is similar to fragmentsCheckApply.
bufferSrc := bytes.NewReader([]byte(*obj.Content)) bufferSrc := bytes.NewReader([]byte(*obj.Content))
sha256sum, checkOK, err := obj.fileCheckApply(apply, bufferSrc, obj.getPath(), obj.sha256sum) sha256sum, checkOK, err := obj.fileCheckApply(ctx, apply, bufferSrc, obj.getPath(), obj.sha256sum)
if sha256sum != "" { // empty values mean errored or didn't hash if sha256sum != "" { // empty values mean errored or didn't hash
// this can be valid even when the whole function errors // this can be valid even when the whole function errors
obj.sha256sum = sha256sum // cache value obj.sha256sum = sha256sum // cache value
@@ -1002,7 +1002,7 @@ func (obj *FileRes) contentCheckApply(apply bool) (bool, error) {
} }
// sourceCheckApply performs a CheckApply for the file source. // sourceCheckApply performs a CheckApply for the file source.
func (obj *FileRes) sourceCheckApply(apply bool) (bool, error) { func (obj *FileRes) sourceCheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("sourceCheckApply(%t)", apply) obj.init.Logf("sourceCheckApply(%t)", apply)
// source is not defined, leave it alone... // source is not defined, leave it alone...
@@ -1047,7 +1047,7 @@ func (obj *FileRes) sourceCheckApply(apply bool) (bool, error) {
} }
// XXX: should this work with obj.Purge && obj.Source != "" or not? // XXX: should this work with obj.Purge && obj.Source != "" or not?
checkOK, err := obj.syncCheckApply(apply, obj.Source, obj.getPath(), excludes) checkOK, err := obj.syncCheckApply(ctx, apply, obj.Source, obj.getPath(), excludes)
if err != nil { if err != nil {
obj.init.Logf("syncCheckApply: error: %v", err) obj.init.Logf("syncCheckApply: error: %v", err)
return false, err return false, err
@@ -1057,7 +1057,7 @@ func (obj *FileRes) sourceCheckApply(apply bool) (bool, error) {
} }
// fragmentsCheckApply performs a CheckApply for the file fragments. // fragmentsCheckApply performs a CheckApply for the file fragments.
func (obj *FileRes) fragmentsCheckApply(apply bool) (bool, error) { func (obj *FileRes) fragmentsCheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("fragmentsCheckApply(%t)", apply) obj.init.Logf("fragmentsCheckApply(%t)", apply)
// fragments is not defined, leave it alone... // fragments is not defined, leave it alone...
@@ -1104,7 +1104,7 @@ func (obj *FileRes) fragmentsCheckApply(apply bool) (bool, error) {
// NOTE: We pass in an invalidated sha256sum cache since we don't cache // NOTE: We pass in an invalidated sha256sum cache since we don't cache
// all the individual files, and it could all change without us knowing. // all the individual files, and it could all change without us knowing.
// TODO: Is the sha256sum caching even having an effect at all here ??? // TODO: Is the sha256sum caching even having an effect at all here ???
sha256sum, checkOK, err := obj.fileCheckApply(apply, bufferSrc, obj.getPath(), "") sha256sum, checkOK, err := obj.fileCheckApply(ctx, apply, bufferSrc, obj.getPath(), "")
if sha256sum != "" { // empty values mean errored or didn't hash if sha256sum != "" { // empty values mean errored or didn't hash
// this can be valid even when the whole function errors // this can be valid even when the whole function errors
obj.sha256sum = sha256sum // cache value obj.sha256sum = sha256sum // cache value
@@ -1117,7 +1117,7 @@ func (obj *FileRes) fragmentsCheckApply(apply bool) (bool, error) {
} }
// chownCheckApply performs a CheckApply for the file ownership. // chownCheckApply performs a CheckApply for the file ownership.
func (obj *FileRes) chownCheckApply(apply bool) (bool, error) { func (obj *FileRes) chownCheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("chownCheckApply(%t)", apply) obj.init.Logf("chownCheckApply(%t)", apply)
if obj.Owner == "" && obj.Group == "" { if obj.Owner == "" && obj.Group == "" {
@@ -1177,7 +1177,7 @@ func (obj *FileRes) chownCheckApply(apply bool) (bool, error) {
} }
// chmodCheckApply performs a CheckApply for the file permissions. // chmodCheckApply performs a CheckApply for the file permissions.
func (obj *FileRes) chmodCheckApply(apply bool) (bool, error) { func (obj *FileRes) chmodCheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("chmodCheckApply(%t)", apply) obj.init.Logf("chmodCheckApply(%t)", apply)
if obj.Mode == "" { if obj.Mode == "" {
@@ -1210,7 +1210,7 @@ func (obj *FileRes) chmodCheckApply(apply bool) (bool, error) {
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
func (obj *FileRes) CheckApply(apply bool) (bool, error) { func (obj *FileRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// NOTE: all send/recv change notifications *must* be processed before // NOTE: all send/recv change notifications *must* be processed before
// there is a possibility of failure in CheckApply. This is because if // there is a possibility of failure in CheckApply. This is because if
// we fail (and possibly run again) the subsequent send->recv transfer // we fail (and possibly run again) the subsequent send->recv transfer
@@ -1227,33 +1227,33 @@ func (obj *FileRes) CheckApply(apply bool) (bool, error) {
// Run stateCheckApply before contentCheckApply, sourceCheckApply, and // Run stateCheckApply before contentCheckApply, sourceCheckApply, and
// fragmentsCheckApply. // fragmentsCheckApply.
if c, err := obj.stateCheckApply(apply); err != nil { if c, err := obj.stateCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
if c, err := obj.contentCheckApply(apply); err != nil { if c, err := obj.contentCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
if c, err := obj.sourceCheckApply(apply); err != nil { if c, err := obj.sourceCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
if c, err := obj.fragmentsCheckApply(apply); err != nil { if c, err := obj.fragmentsCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
if c, err := obj.chownCheckApply(apply); err != nil { if c, err := obj.chownCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
if c, err := obj.chmodCheckApply(apply); err != nil { if c, err := obj.chmodCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false

View File

@@ -122,7 +122,7 @@ func (obj *GroupRes) Watch(ctx context.Context) error {
} }
// CheckApply method for Group resource. // CheckApply method for Group resource.
func (obj *GroupRes) CheckApply(apply bool) (bool, error) { func (obj *GroupRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("CheckApply(%t)", apply) obj.init.Logf("CheckApply(%t)", apply)
// check if the group exists // check if the group exists

View File

@@ -388,9 +388,8 @@ func (obj *HetznerVMRes) Watch(context.Context) error {
// NOTE: this last assumption might still fail in case the same resource // NOTE: this last assumption might still fail in case the same resource
// instance is managed by multiple running mgmt instances! // instance is managed by multiple running mgmt instances!
// TODO: possible to ensure safe concurrency? // TODO: possible to ensure safe concurrency?
func (obj *HetznerVMRes) CheckApply(apply bool) (bool, error) { func (obj *HetznerVMRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
checkOK := true checkOK := true
ctx := context.TODO()
// Request up-to-date server info from the API. // Request up-to-date server info from the API.
if err := obj.getServerUpdate(ctx); err != nil { if err := obj.getServerUpdate(ctx); err != nil {
return false, errwrap.Wrapf(err, "getServerUpdate failed") return false, errwrap.Wrapf(err, "getServerUpdate failed")

View File

@@ -187,7 +187,7 @@ func (obj *HostnameRes) updateHostnameProperty(object dbus.BusObject, expectedVa
} }
// CheckApply method for Hostname resource. // CheckApply method for Hostname resource.
func (obj *HostnameRes) CheckApply(apply bool) (bool, error) { func (obj *HostnameRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
conn, err := util.SystemBusPrivateUsable() conn, err := util.SystemBusPrivateUsable()
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "failed to connect to the private system bus") return false, errwrap.Wrapf(err, "failed to connect to the private system bus")

View File

@@ -350,7 +350,7 @@ func (obj *HTTPServerRes) Watch(ctx context.Context) error {
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
// It does however check that certain runtime requirements (such as the Root dir // It does however check that certain runtime requirements (such as the Root dir
// existing if one was specified) are fulfilled. // existing if one was specified) are fulfilled.
func (obj *HTTPServerRes) CheckApply(apply bool) (bool, error) { func (obj *HTTPServerRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("CheckApply") obj.init.Logf("CheckApply")
} }
@@ -734,7 +734,7 @@ func (obj *HTTPFileRes) Watch(ctx context.Context) error {
} }
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
func (obj *HTTPFileRes) CheckApply(apply bool) (bool, error) { func (obj *HTTPFileRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("CheckApply") obj.init.Logf("CheckApply")
} }

View File

@@ -207,12 +207,12 @@ func (obj *KVRes) lessThanCheck(value string) (bool, error) {
} }
// CheckApply method for Password resource. Does nothing, returns happy! // CheckApply method for Password resource. Does nothing, returns happy!
func (obj *KVRes) CheckApply(apply bool) (bool, error) { func (obj *KVRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("CheckApply(%t)", apply) obj.init.Logf("CheckApply(%t)", apply)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
defer wg.Wait() // this must be above the defer cancel() call defer wg.Wait() // this must be above the defer cancel() call
ctx, cancel := context.WithTimeout(context.Background(), kvCheckApplyTimeout) ctx, cancel := context.WithTimeout(ctx, kvCheckApplyTimeout)
defer cancel() defer cancel()
wg.Add(1) wg.Add(1)
go func() { go func() {

View File

@@ -291,7 +291,7 @@ func (obj *MountRes) Watch(ctx context.Context) error {
// fstabCheckApply checks /etc/fstab for entries corresponding to the resource // fstabCheckApply checks /etc/fstab for entries corresponding to the resource
// definition, and adds or deletes the entry as needed. // definition, and adds or deletes the entry as needed.
func (obj *MountRes) fstabCheckApply(apply bool) (bool, error) { func (obj *MountRes) fstabCheckApply(ctx context.Context, apply bool) (bool, error) {
exists, err := fstabEntryExists(fstabPath, obj.mount) exists, err := fstabEntryExists(fstabPath, obj.mount)
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "error checking if fstab entry exists") return false, errwrap.Wrapf(err, "error checking if fstab entry exists")
@@ -321,7 +321,7 @@ func (obj *MountRes) fstabCheckApply(apply bool) (bool, error) {
// mountCheckApply checks if the defined resource is mounted, and mounts or // mountCheckApply checks if the defined resource is mounted, and mounts or
// unmounts it according to the defined state. // unmounts it according to the defined state.
func (obj *MountRes) mountCheckApply(apply bool) (bool, error) { func (obj *MountRes) mountCheckApply(ctx context.Context, apply bool) (bool, error) {
exists, err := mountExists(procPath, obj.mount) exists, err := mountExists(procPath, obj.mount)
if err != nil { if err != nil {
return false, errwrap.Wrapf(err, "error checking if mount exists") return false, errwrap.Wrapf(err, "error checking if mount exists")
@@ -340,7 +340,7 @@ func (obj *MountRes) mountCheckApply(apply bool) (bool, error) {
if obj.State == "exists" { if obj.State == "exists" {
// Reload mounts from /etc/fstab by performing a `daemon-reload` and // Reload mounts from /etc/fstab by performing a `daemon-reload` and
// restarting `local-fs.target` and `remote-fs.target` units. // restarting `local-fs.target` and `remote-fs.target` units.
if err := mountReload(); err != nil { if err := mountReload(ctx); err != nil {
return false, errwrap.Wrapf(err, "error reloading /etc/fstab") return false, errwrap.Wrapf(err, "error reloading /etc/fstab")
} }
return false, nil // we're done return false, nil // we're done
@@ -355,16 +355,16 @@ func (obj *MountRes) mountCheckApply(apply bool) (bool, error) {
// CheckApply is run to check the state and, if apply is true, to apply the // CheckApply is run to check the state and, if apply is true, to apply the
// necessary changes to reach the desired state. This is run before Watch and // necessary changes to reach the desired state. This is run before Watch and
// again if Watch finds a change occurring to the state. // again if Watch finds a change occurring to the state.
func (obj *MountRes) CheckApply(apply bool) (bool, error) { func (obj *MountRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
checkOK := true checkOK := true
if c, err := obj.fstabCheckApply(apply); err != nil { if c, err := obj.fstabCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
if c, err := obj.mountCheckApply(apply); err != nil { if c, err := obj.mountCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
@@ -584,7 +584,7 @@ func mountCompare(def, proc *fstab.Mount) (bool, error) {
// mountReload performs a daemon-reload and restarts fs-local.target and // mountReload performs a daemon-reload and restarts fs-local.target and
// fs-remote.target, to let systemd mount any new entries in /etc/fstab. // fs-remote.target, to let systemd mount any new entries in /etc/fstab.
func mountReload() error { func mountReload(ctx context.Context) error {
// establish a godbus connection // establish a godbus connection
conn, err := util.SystemBusPrivateUsable() conn, err := util.SystemBusPrivateUsable()
if err != nil { if err != nil {
@@ -598,12 +598,12 @@ func mountReload() error {
} }
// systemctl restart local-fs.target // systemctl restart local-fs.target
if err := restartUnit(conn, "local-fs.target"); err != nil { if err := restartUnit(ctx, conn, "local-fs.target"); err != nil {
return errwrap.Wrapf(err, "error restarting unit") return errwrap.Wrapf(err, "error restarting unit")
} }
// systemctl restart remote-fs.target // systemctl restart remote-fs.target
if err := restartUnit(conn, "remote-fs.target"); err != nil { if err := restartUnit(ctx, conn, "remote-fs.target"); err != nil {
return errwrap.Wrapf(err, "error restarting unit") return errwrap.Wrapf(err, "error restarting unit")
} }
@@ -612,9 +612,9 @@ func mountReload() error {
// restartUnit restarts the given dbus unit and waits for it to finish starting // restartUnit restarts the given dbus unit and waits for it to finish starting
// up. If restartTimeout is exceeded, it will return an error. // up. If restartTimeout is exceeded, it will return an error.
func restartUnit(conn *dbus.Conn, unit string) error { func restartUnit(ctx context.Context, conn *dbus.Conn, unit string) error {
// timeout if we don't get the JobRemoved event // timeout if we don't get the JobRemoved event
ctx, cancel := context.WithTimeout(context.TODO(), dbusRestartCtxTimeout*time.Second) ctx, cancel := context.WithTimeout(ctx, dbusRestartCtxTimeout*time.Second)
defer cancel() defer cancel()
// Add a dbus rule to watch the systemd1 JobRemoved signal used to wait // Add a dbus rule to watch the systemd1 JobRemoved signal used to wait

View File

@@ -163,7 +163,7 @@ func (obj *MsgRes) journalPriority() journal.Priority {
// CheckApply method for Msg resource. Every check leads to an apply, meaning // CheckApply method for Msg resource. Every check leads to an apply, meaning
// that the message is flushed to the journal. // that the message is flushed to the journal.
func (obj *MsgRes) CheckApply(apply bool) (bool, error) { func (obj *MsgRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// isStateOK() done by engine, so we updateStateOK() to pass in value // isStateOK() done by engine, so we updateStateOK() to pass in value
//if obj.isAllStateOK() { //if obj.isAllStateOK() {
// return true, nil // return true, nil

View File

@@ -313,7 +313,7 @@ func (obj *NetRes) Watch(ctx context.Context) error {
// ifaceCheckApply checks the state of the network device and brings it up or // ifaceCheckApply checks the state of the network device and brings it up or
// down as necessary. // down as necessary.
func (obj *NetRes) ifaceCheckApply(apply bool) (bool, error) { func (obj *NetRes) ifaceCheckApply(ctx context.Context, apply bool) (bool, error) {
// check the interface state // check the interface state
state, err := obj.iface.state() state, err := obj.iface.state()
if err != nil { if err != nil {
@@ -340,7 +340,7 @@ func (obj *NetRes) ifaceCheckApply(apply bool) (bool, error) {
// addrCheckApply checks if the interface has the correct addresses and then // addrCheckApply checks if the interface has the correct addresses and then
// adds/deletes addresses as necessary. // adds/deletes addresses as necessary.
func (obj *NetRes) addrCheckApply(apply bool) (bool, error) { func (obj *NetRes) addrCheckApply(ctx context.Context, apply bool) (bool, error) {
// get the link's addresses // get the link's addresses
ifaceAddrs, err := obj.iface.getAddrs() ifaceAddrs, err := obj.iface.getAddrs()
if err != nil { if err != nil {
@@ -388,7 +388,7 @@ func (obj *NetRes) addrCheckApply(apply bool) (bool, error) {
// gatewayCheckApply checks if the interface has the correct default gateway and // gatewayCheckApply checks if the interface has the correct default gateway and
// adds/deletes routes as necessary. // adds/deletes routes as necessary.
func (obj *NetRes) gatewayCheckApply(apply bool) (bool, error) { func (obj *NetRes) gatewayCheckApply(ctx context.Context, apply bool) (bool, error) {
// get all routes from the interface // get all routes from the interface
routes, err := netlink.RouteList(obj.iface.link, netlink.FAMILY_V4) routes, err := netlink.RouteList(obj.iface.link, netlink.FAMILY_V4)
if err != nil { if err != nil {
@@ -440,7 +440,7 @@ func (obj *NetRes) gatewayCheckApply(apply bool) (bool, error) {
} }
// fileCheckApply checks and maintains the systemd-networkd unit file contents. // fileCheckApply checks and maintains the systemd-networkd unit file contents.
func (obj *NetRes) fileCheckApply(apply bool) (bool, error) { func (obj *NetRes) fileCheckApply(ctx context.Context, apply bool) (bool, error) {
// check if the unit file exists // check if the unit file exists
_, err := os.Stat(obj.unitFilePath) _, err := os.Stat(obj.unitFilePath)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
@@ -475,11 +475,11 @@ func (obj *NetRes) fileCheckApply(apply bool) (bool, error) {
// CheckApply is run to check the state and, if apply is true, to apply the // CheckApply is run to check the state and, if apply is true, to apply the
// necessary changes to reach the desired state. This is run before Watch and // necessary changes to reach the desired state. This is run before Watch and
// again if Watch finds a change occurring to the state. // again if Watch finds a change occurring to the state.
func (obj *NetRes) CheckApply(apply bool) (bool, error) { func (obj *NetRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
checkOK := true checkOK := true
// check the network device // check the network device
if c, err := obj.ifaceCheckApply(apply); err != nil { if c, err := obj.ifaceCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
@@ -491,14 +491,14 @@ func (obj *NetRes) CheckApply(apply bool) (bool, error) {
} }
// check the addresses // check the addresses
if c, err := obj.addrCheckApply(apply); err != nil { if c, err := obj.addrCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
} }
// check the gateway // check the gateway
if c, err := obj.gatewayCheckApply(apply); err != nil { if c, err := obj.gatewayCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false
@@ -510,7 +510,7 @@ func (obj *NetRes) CheckApply(apply bool) (bool, error) {
} }
// check the networkd unit file // check the networkd unit file
if c, err := obj.fileCheckApply(apply); err != nil { if c, err := obj.fileCheckApply(ctx, apply); err != nil {
return false, err return false, err
} else if !c { } else if !c {
checkOK = false checkOK = false

View File

@@ -76,7 +76,7 @@ func (obj *NoopRes) Watch(ctx context.Context) error {
} }
// CheckApply method for Noop resource. Does nothing, returns happy! // CheckApply method for Noop resource. Does nothing, returns happy!
func (obj *NoopRes) CheckApply(apply bool) (bool, error) { func (obj *NoopRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.init.Refresh() { if obj.init.Refresh() {
obj.init.Logf("received a notification!") obj.init.Logf("received a notification!")
} }

View File

@@ -203,7 +203,7 @@ func (obj *NspawnRes) Watch(ctx context.Context) error {
// CheckApply is run to check the state and, if apply is true, to apply the // CheckApply is run to check the state and, if apply is true, to apply the
// necessary changes to reach the desired state. This is run before Watch and // necessary changes to reach the desired state. This is run before Watch and
// again if Watch finds a change occurring to the state. // again if Watch finds a change occurring to the state.
func (obj *NspawnRes) CheckApply(apply bool) (bool, error) { func (obj *NspawnRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// this resource depends on systemd to ensure that it's running // this resource depends on systemd to ensure that it's running
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
return false, errors.New("systemd is not running") return false, errors.New("systemd is not running")
@@ -253,7 +253,7 @@ func (obj *NspawnRes) CheckApply(apply bool) (bool, error) {
obj.init.Logf("CheckApply() applying '%s' state", obj.State) obj.init.Logf("CheckApply() applying '%s' state", obj.State)
// use the embedded svc to apply the correct state // use the embedded svc to apply the correct state
if _, err := obj.svc.CheckApply(apply); err != nil { if _, err := obj.svc.CheckApply(ctx, apply); err != nil {
return false, errwrap.Wrapf(err, "nested svc failed") return false, errwrap.Wrapf(err, "nested svc failed")
} }

View File

@@ -219,7 +219,7 @@ func (obj *PasswordRes) Watch(ctx context.Context) error {
} }
// CheckApply method for Password resource. Does nothing, returns happy! // CheckApply method for Password resource. Does nothing, returns happy!
func (obj *PasswordRes) CheckApply(apply bool) (bool, error) { func (obj *PasswordRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
var refresh = obj.init.Refresh() // do we have a pending reload to apply? var refresh = obj.init.Refresh() // do we have a pending reload to apply?
var exists = true // does the file (aka the token) exist? var exists = true // does the file (aka the token) exist?
var generate bool // do we need to generate a new password? var generate bool // do we need to generate a new password?

View File

@@ -108,7 +108,7 @@ func (obj *PippetRes) Watch(ctx context.Context) error {
} }
// CheckApply synchronizes the resource if required. // CheckApply synchronizes the resource if required.
func (obj *PippetRes) CheckApply(apply bool) (bool, error) { func (obj *PippetRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if !apply { if !apply {
return false, nil return false, nil
} }

View File

@@ -278,7 +278,7 @@ func (obj *PkgRes) populateFileList() error {
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
func (obj *PkgRes) CheckApply(apply bool) (bool, error) { func (obj *PkgRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("Check: %s", obj.fmtNames(obj.getNames())) obj.init.Logf("Check: %s", obj.fmtNames(obj.getNames()))
bus := packagekit.NewBus() bus := packagekit.NewBus()

View File

@@ -83,7 +83,7 @@ func (obj *PrintRes) Watch(ctx context.Context) error {
} }
// CheckApply method for Print resource. Does nothing, returns happy! // CheckApply method for Print resource. Does nothing, returns happy!
func (obj *PrintRes) CheckApply(apply bool) (bool, error) { func (obj *PrintRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("CheckApply: %t", apply) obj.init.Logf("CheckApply: %t", apply)
if val, exists := obj.init.Recv()["Msg"]; exists && val.Changed { if val, exists := obj.init.Recv()["Msg"]; exists && val.Changed {
// if we received on Msg, and it changed, log message // if we received on Msg, and it changed, log message

View File

@@ -644,7 +644,7 @@ func TestResources1(t *testing.T) {
} }
t.Logf("test #%d: running CheckApply", index) t.Logf("test #%d: running CheckApply", index)
checkOK, err := res.CheckApply(true) // no noop! checkOK, err := res.CheckApply(doneCtx, true) // no noop!
if err != nil { if err != nil {
t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: CheckApply failed: %s", index, err.Error()) t.Errorf("test #%d: CheckApply failed: %s", index, err.Error())
@@ -771,7 +771,7 @@ func TestResources2(t *testing.T) {
// the returned error. // the returned error.
resCheckApplyError := func(res engine.Res, expCheckOK bool, errOK func(e error) error) func() error { resCheckApplyError := func(res engine.Res, expCheckOK bool, errOK func(e error) error) func() error {
return func() error { return func() error {
checkOK, err := res.CheckApply(true) // no noop! checkOK, err := res.CheckApply(context.TODO(), true) // no noop!
if e := errOK(err); e != nil { if e := errOK(err); e != nil {
return errwrap.Wrapf(e, "error from CheckApply did not match expected") return errwrap.Wrapf(e, "error from CheckApply did not match expected")
} }

View File

@@ -238,7 +238,7 @@ func (obj *SvcRes) Watch(ctx context.Context) error {
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
func (obj *SvcRes) CheckApply(apply bool) (bool, error) { func (obj *SvcRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if !systemdUtil.IsRunningSystemd() { if !systemdUtil.IsRunningSystemd() {
return false, fmt.Errorf("systemd is not running") return false, fmt.Errorf("systemd is not running")
} }
@@ -338,6 +338,7 @@ func (obj *SvcRes) CheckApply(apply bool) (bool, error) {
refresh = false // we did a stop, so a reload is not needed refresh = false // we did a stop, so a reload is not needed
} }
// XXX: use ctx here
status := <-result status := <-result
if &status == nil { if &status == nil {
return false, fmt.Errorf("systemd service action result is nil") return false, fmt.Errorf("systemd service action result is nil")

View File

@@ -139,7 +139,7 @@ func (obj *TestRes) Watch(ctx context.Context) error {
} }
// CheckApply method for Test resource. Does nothing, returns happy! // CheckApply method for Test resource. Does nothing, returns happy!
func (obj *TestRes) CheckApply(apply bool) (bool, error) { func (obj *TestRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
for key, val := range obj.init.Recv() { for key, val := range obj.init.Recv() {
obj.init.Logf("CheckApply: Received `%s`, changed: %t", key, val.Changed) obj.init.Logf("CheckApply: Received `%s`, changed: %t", key, val.Changed)
} }

View File

@@ -216,7 +216,7 @@ func (obj *TFTPServerRes) Watch(ctx context.Context) error {
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
// It does however check that certain runtime requirements (such as the Root dir // It does however check that certain runtime requirements (such as the Root dir
// existing if one was specified) are fulfilled. // existing if one was specified) are fulfilled.
func (obj *TFTPServerRes) CheckApply(apply bool) (bool, error) { func (obj *TFTPServerRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("CheckApply") obj.init.Logf("CheckApply")
} }
@@ -561,7 +561,7 @@ func (obj *TFTPFileRes) Watch(ctx context.Context) error {
} }
// CheckApply never has anything to do for this resource, so it always succeeds. // CheckApply never has anything to do for this resource, so it always succeeds.
func (obj *TFTPFileRes) CheckApply(apply bool) (bool, error) { func (obj *TFTPFileRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.init.Debug { if obj.init.Debug {
obj.init.Logf("CheckApply") obj.init.Logf("CheckApply")
} }

View File

@@ -98,7 +98,7 @@ func (obj *TimerRes) Watch(ctx context.Context) error {
} }
// CheckApply method for Timer resource. Triggers a timer reset on notify. // CheckApply method for Timer resource. Triggers a timer reset on notify.
func (obj *TimerRes) CheckApply(apply bool) (bool, error) { func (obj *TimerRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// because there are no checks to run, this resource has a less // because there are no checks to run, this resource has a less
// traditional pattern than what is seen in most resources... // traditional pattern than what is seen in most resources...
if !obj.init.Refresh() { // this works for apply || !apply if !obj.init.Refresh() { // this works for apply || !apply

View File

@@ -169,7 +169,7 @@ func (obj *UserRes) Watch(ctx context.Context) error {
} }
// CheckApply method for User resource. // CheckApply method for User resource.
func (obj *UserRes) CheckApply(apply bool) (bool, error) { func (obj *UserRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("CheckApply(%t)", apply) obj.init.Logf("CheckApply(%t)", apply)
var exists = true var exists = true

View File

@@ -505,7 +505,7 @@ func (obj *VirtRes) domainCreate() (*libvirt.Domain, bool, error) {
} }
// stateCheckApply starts, stops, or pauses/unpauses the domain as needed. // stateCheckApply starts, stops, or pauses/unpauses the domain as needed.
func (obj *VirtRes) stateCheckApply(apply bool, dom *libvirt.Domain) (bool, error) { func (obj *VirtRes) stateCheckApply(ctx context.Context, apply bool, dom *libvirt.Domain) (bool, error) {
var checkOK = true var checkOK = true
domInfo, err := dom.GetInfo() domInfo, err := dom.GetInfo()
if err != nil { if err != nil {
@@ -586,7 +586,7 @@ func (obj *VirtRes) stateCheckApply(apply bool, dom *libvirt.Domain) (bool, erro
// attrCheckApply performs the CheckApply functions for CPU, Memory and others. // attrCheckApply performs the CheckApply functions for CPU, Memory and others.
// This shouldn't be called when the machine is absent; it won't be found! // This shouldn't be called when the machine is absent; it won't be found!
func (obj *VirtRes) attrCheckApply(apply bool, dom *libvirt.Domain) (bool, error) { func (obj *VirtRes) attrCheckApply(ctx context.Context, apply bool, dom *libvirt.Domain) (bool, error) {
var checkOK = true var checkOK = true
domInfo, err := dom.GetInfo() domInfo, err := dom.GetInfo()
if err != nil { if err != nil {
@@ -752,7 +752,7 @@ func (obj *VirtRes) domainShutdownSync(apply bool, dom *libvirt.Domain) (bool, e
// CheckApply checks the resource state and applies the resource if the bool // CheckApply checks the resource state and applies the resource if the bool
// input is true. It returns error info and if the state check passed or not. // input is true. It returns error info and if the state check passed or not.
func (obj *VirtRes) CheckApply(apply bool) (bool, error) { func (obj *VirtRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.conn == nil { // programming error? if obj.conn == nil { // programming error?
return false, fmt.Errorf("got called with nil connection") return false, fmt.Errorf("got called with nil connection")
} }
@@ -843,7 +843,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
// FIXME: is doing this early check (therefore twice total) a good idea? // FIXME: is doing this early check (therefore twice total) a good idea?
// run additional pre-emptive attr change checks here for hotplug stuff! // run additional pre-emptive attr change checks here for hotplug stuff!
if !obj.absent { if !obj.absent {
if c, err := obj.attrCheckApply(apply, dom); err != nil { if c, err := obj.attrCheckApply(ctx, apply, dom); err != nil {
return false, errwrap.Wrapf(err, "early attrCheckApply failed") return false, errwrap.Wrapf(err, "early attrCheckApply failed")
} else if !c { } else if !c {
checkOK = false checkOK = false
@@ -852,7 +852,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
// TODO: do we need to run again below after we've booted up the domain? // TODO: do we need to run again below after we've booted up the domain?
// apply correct machine state, eg: startup/shutoff/pause as needed // apply correct machine state, eg: startup/shutoff/pause as needed
if c, err := obj.stateCheckApply(apply, dom); err != nil { if c, err := obj.stateCheckApply(ctx, apply, dom); err != nil {
return false, errwrap.Wrapf(err, "stateCheckApply failed") return false, errwrap.Wrapf(err, "stateCheckApply failed")
} else if !c { } else if !c {
checkOK = false checkOK = false
@@ -863,7 +863,7 @@ func (obj *VirtRes) CheckApply(apply bool) (bool, error) {
// mem & cpu checks... // mem & cpu checks...
if !obj.absent { if !obj.absent {
if c, err := obj.attrCheckApply(apply, dom); err != nil { if c, err := obj.attrCheckApply(ctx, apply, dom); err != nil {
return false, errwrap.Wrapf(err, "attrCheckApply failed") return false, errwrap.Wrapf(err, "attrCheckApply failed")
} else if !c { } else if !c {
checkOK = false checkOK = false

View File

@@ -176,7 +176,7 @@ type testEngineRes struct {
privateProp2 []int privateProp2 []int
} }
func (t *testEngineRes) CheckApply(bool) (bool, error) { return false, nil } func (t *testEngineRes) CheckApply(context.Context, bool) (bool, error) { return false, nil }
func (t *testEngineRes) Cleanup() error { return nil } func (t *testEngineRes) Cleanup() error { return nil }