engine: Fix up some send/recv corner cases

Initially I wasn't 100% clear or decided on the send/recv semantics.
After some experimenting, I think this is much closer to what we want.
Nothing should break or regress here, this only enables more
possibilities.
This commit is contained in:
James Shubin
2025-05-05 23:53:37 -04:00
parent ae1d9b94d4
commit 774d408e13
17 changed files with 412 additions and 32 deletions

View File

@@ -159,7 +159,6 @@ var AwsRegions = []string{
// http://docs.aws.amazon.com/cli/latest/userguide/cli-config-files.html
type AwsEc2Res struct {
traits.Base // add the base methods without re-implementation
traits.Sendable
init *engine.Init

View File

@@ -38,6 +38,7 @@ import (
"os"
"os/exec"
"os/user"
"path"
"sort"
"strings"
"sync"
@@ -161,10 +162,28 @@ type ExecRes struct {
// used for any command being run.
Group string `lang:"group" yaml:"group"`
// SendOutput is a value which can be sent for the Send/Recv Output
// field if no value is available in the cache. This is used in very
// specialized scenarios (particularly prototyping and unclean
// environments) and should not be used routinely. It should be used
// only in situations where we didn't produce our own sending values,
// and there are none in the cache, and instead are relying on a runtime
// mechanism to help us out. This can commonly occur if you wish to make
// incremental progress when locally testing some code using Send/Recv,
// but you are combining it with --tmp-prefix for other reasons.
SendOutput *string `lang:"send_output" yaml:"send_output"`
// SendStdout is like SendOutput but for stdout alone. See those docs.
SendStdout *string `lang:"send_stdout" yaml:"send_stdout"`
// SendStderr is like SendOutput but for stderr alone. See those docs.
SendStderr *string `lang:"send_stderr" yaml:"send_stderr"`
output *string // all cmd output, read only, do not set!
stdout *string // the cmd stdout, read only, do not set!
stderr *string // the cmd stderr, read only, do not set!
dir string // the path to local storage
interruptChan chan struct{}
wg *sync.WaitGroup
}
@@ -225,6 +244,12 @@ func (obj *ExecRes) Validate() error {
func (obj *ExecRes) Init(init *engine.Init) error {
obj.init = init // save for later
dir, err := obj.init.VarDir("")
if err != nil {
return errwrap.Wrapf(err, "could not get VarDir in Init()")
}
obj.dir = dir
obj.interruptChan = make(chan struct{})
obj.wg = &sync.WaitGroup{}
@@ -364,6 +389,10 @@ func (obj *ExecRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// check and this will run. It is still guarded by the IfCmd, but it can
// have a chance to execute, and all without the check of obj.Refresh()!
if err := obj.checkApplyReadCache(); err != nil {
return false, err
}
if obj.IfCmd != "" { // if there is no onlyif check, we should just run
var cmdName string
var cmdArgs []string
@@ -423,6 +452,13 @@ func (obj *ExecRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
obj.init.Logf("ifcmd out:")
obj.init.Logf("%s", s)
}
//if err := obj.checkApplyWriteCache(); err != nil {
// return false, err
//}
obj.safety()
if err := obj.send(); err != nil {
return false, err
}
return true, nil // don't run
}
if s := out.String(); s == "" {
@@ -436,12 +472,26 @@ func (obj *ExecRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
if obj.Creates != "" { // gate the extra syscall
if _, err := os.Stat(obj.Creates); err == nil {
obj.init.Logf("creates file exists, skipping cmd")
//if err := obj.checkApplyWriteCache(); err != nil {
// return false, err
//}
obj.safety()
if err := obj.send(); err != nil {
return false, err
}
return true, nil // don't run
}
}
// state is not okay, no work done, exit, but without error
if !apply {
//if err := obj.checkApplyWriteCache(); err != nil {
// return false, err
//}
//obj.safety()
if err := obj.send(); err != nil {
return false, err
}
return false, nil
}
@@ -654,11 +704,10 @@ func (obj *ExecRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
}
}
if err := obj.init.Send(&ExecSends{
Output: obj.output,
Stdout: obj.stdout,
Stderr: obj.stderr,
}); err != nil {
if err := obj.checkApplyWriteCache(); err != nil {
return false, err
}
if err := obj.send(); err != nil {
return false, err
}
@@ -670,6 +719,77 @@ func (obj *ExecRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
return false, nil // success
}
// send is a helper to avoid duplication of the same send operation.
func (obj *ExecRes) send() error {
return obj.init.Send(&ExecSends{
Output: obj.output,
Stdout: obj.stdout,
Stderr: obj.stderr,
})
}
// safety is a helper function that populates the cached "send" values if they
// are empty. It must only be called right before actually sending any values,
// and right before CheckApply returns. It should be used only in situations
// where we didn't produce our own sending values, and there are none in the
// cache, and instead are relying on a runtime mechanism to help us out. This
// mechanism is useful as a backstop for when we're running in unclean
// scenarios.
func (obj *ExecRes) safety() {
if x := obj.SendOutput; x != nil && obj.output == nil {
s := *x // copy
obj.output = &s
}
if x := obj.SendStdout; x != nil && obj.stdout == nil {
s := *x // copy
obj.stdout = &s
}
if x := obj.SendStderr; x != nil && obj.stderr == nil {
s := *x // copy
obj.stderr = &s
}
}
// checkApplyReadCache is a helper to do all our reading from the cache.
func (obj *ExecRes) checkApplyReadCache() error {
output, err := engineUtil.ReadData(path.Join(obj.dir, "output"))
if err != nil {
return err
}
obj.output = output
stdout, err := engineUtil.ReadData(path.Join(obj.dir, "stdout"))
if err != nil {
return err
}
obj.stdout = stdout
stderr, err := engineUtil.ReadData(path.Join(obj.dir, "stderr"))
if err != nil {
return err
}
obj.stderr = stderr
return nil
}
// checkApplyWriteCache is a helper to do all our writing into the cache.
func (obj *ExecRes) checkApplyWriteCache() error {
if _, err := engineUtil.WriteData(path.Join(obj.dir, "output"), obj.output); err != nil {
return err
}
if _, err := engineUtil.WriteData(path.Join(obj.dir, "stdout"), obj.stdout); err != nil {
return err
}
if _, err := engineUtil.WriteData(path.Join(obj.dir, "stderr"), obj.stderr); err != nil {
return err
}
return nil
}
// Cmp compares two resources and returns an error if they are not equivalent.
func (obj *ExecRes) Cmp(r engine.Res) error {
// we can only compare ExecRes to others of the same resource kind
@@ -740,6 +860,16 @@ func (obj *ExecRes) Cmp(r engine.Res) error {
return fmt.Errorf("the Group differs")
}
if err := engineUtil.StrPtrCmp(obj.SendOutput, res.SendOutput); err != nil {
return errwrap.Wrapf(err, "the SendOutput differs")
}
if err := engineUtil.StrPtrCmp(obj.SendStdout, res.SendStdout); err != nil {
return errwrap.Wrapf(err, "the SendStdout differs")
}
if err := engineUtil.StrPtrCmp(obj.SendStderr, res.SendStderr); err != nil {
return errwrap.Wrapf(err, "the SendStderr differs")
}
return nil
}

View File

@@ -35,6 +35,7 @@ import (
"context"
"fmt"
"os/exec"
"path"
"strings"
"syscall"
"testing"
@@ -46,7 +47,8 @@ import (
)
func fakeExecInit(t *testing.T) (*engine.Init, *ExecSends) {
debug := testing.Verbose() // set via the -test.v flag to `go test`
tmpdir := fmt.Sprintf("%s/", t.TempDir()) // gets cleaned up at end, new dir for each call
debug := testing.Verbose() // set via the -test.v flag to `go test`
logf := func(format string, v ...interface{}) {
t.Logf("test: "+format, v...)
}
@@ -60,6 +62,9 @@ func fakeExecInit(t *testing.T) (*engine.Init, *ExecSends) {
*execSends = *x // set
return nil
},
VarDir: func(p string) (string, error) {
return path.Join(tmpdir, p), nil
},
Debug: debug,
Logf: logf,
}, execSends

View File

@@ -1373,6 +1373,7 @@ func (obj *FileRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// might not have a new value to copy, and therefore we won't see this
// notification of change. Therefore, it is important to process these
// promptly, if they must not be lost, such as for cache invalidation.
// NOTE: Modern send/recv doesn't really have this limitation anymore.
if val, exists := obj.init.Recv()["content"]; exists && val.Changed {
// if we received on Content, and it changed, invalidate the cache!
obj.init.Logf("contentCheckApply: invalidating sha256sum of `content`")

View File

@@ -89,7 +89,6 @@ type HTTPProxyRes struct {
traits.Base // add the base methods without re-implementation
traits.Edgeable // XXX: add autoedge support
traits.Groupable // can be grouped into HTTPServerRes
traits.Sendable
init *engine.Init
@@ -520,20 +519,6 @@ func (obj *HTTPProxyRes) Cmp(r engine.Res) error {
return nil
}
// HTTPProxySends is the struct of data which is sent after a successful Apply.
type HTTPProxySends struct {
// Data is the received value being sent.
// TODO: should this be []byte or *[]byte instead?
Data *string `lang:"data"`
}
// Sends represents the default struct of values we can send using Send/Recv.
func (obj *HTTPProxyRes) Sends() interface{} {
return &HTTPProxySends{
Data: nil,
}
}
// UnmarshalYAML is the custom unmarshal handler for this struct. It is
// primarily useful for setting the defaults.
func (obj *HTTPProxyRes) UnmarshalYAML(unmarshal func(interface{}) error) error {

View File

@@ -465,10 +465,20 @@ func (obj *HTTPUIInputRes) valueCheckApply(ctx context.Context, apply bool) (boo
obj.mutex.Unlock()
if obj.last != nil && *obj.last == value {
if err := obj.init.Send(&HTTPUIInputSends{
Value: &value,
}); err != nil {
return false, err
}
return true, nil // expected value has already been sent
}
if !apply { // XXX: does this break send/recv if we end early?
if !apply {
if err := obj.init.Send(&HTTPUIInputSends{
Value: &value, // XXX: arbitrary since we're in noop mode
}); err != nil {
return false, err
}
return false, nil
}
@@ -509,10 +519,20 @@ func (obj *HTTPUIInputRes) storeCheckApply(ctx context.Context, apply bool) (boo
obj.mutex.Unlock()
if exists && v1 == v2 { // both sides are happy
if err := obj.init.Send(&HTTPUIInputSends{
Value: &v2,
}); err != nil {
return false, err
}
return true, nil
}
if !apply { // XXX: does this break send/recv if we end early?
if !apply {
if err := obj.init.Send(&HTTPUIInputSends{
Value: &v2, // XXX: arbitrary since we're in noop mode
}); err != nil {
return false, err
}
return false, nil
}

View File

@@ -41,6 +41,7 @@ import (
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/traits"
engineUtil "github.com/purpleidea/mgmt/engine/util"
"github.com/purpleidea/mgmt/util/errwrap"
"github.com/purpleidea/mgmt/util/recwatch"
)
@@ -115,6 +116,8 @@ func (obj *PasswordRes) Cleanup() error {
return nil
}
// read is a helper to read the data from disk. This is similar to an engineUtil
// function named ReadData but is kept separate for safety anyways.
func (obj *PasswordRes) read() (string, error) {
file, err := os.Open(obj.path) // open a handle to read the file
if err != nil {
@@ -128,14 +131,28 @@ func (obj *PasswordRes) read() (string, error) {
return strings.TrimSpace(string(data)), nil
}
// write is a helper to store the data on disk. This is similar to an engineUtil
// function named WriteData but is kept separate for safety anyways.
func (obj *PasswordRes) write(password string) (int, error) {
file, err := os.Create(obj.path) // open a handle to create the file
uid, gid, err := engineUtil.GetUIDGID()
if err != nil {
return -1, err
}
// Chmod it before we write the secret data.
file, err := os.OpenFile(obj.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return -1, errwrap.Wrapf(err, "can't create file")
}
defer file.Close()
var c int
if c, err = file.Write([]byte(password + newline)); err != nil {
// Chown it before we write the secret data.
if err := file.Chown(uid, gid); err != nil {
return -1, err
}
c, err := file.Write([]byte(password + newline))
if err != nil {
return c, errwrap.Wrapf(err, "can't write file")
}
return c, file.Sync()
@@ -269,11 +286,21 @@ func (obj *PasswordRes) CheckApply(ctx context.Context, apply bool) (bool, error
//}
if !refresh && exists && !generate && !write { // nothing to do, done!
if err := obj.init.Send(&PasswordSends{
Password: &password,
}); err != nil {
return false, err
}
return true, nil
}
// a refresh was requested, the token doesn't exist, or the check failed
if !apply {
if err := obj.init.Send(&PasswordSends{
Password: &password, // XXX: arbitrary since we're in noop mode
}); err != nil {
return false, err
}
return false, nil
}

View File

@@ -501,7 +501,8 @@ func TestResources1(t *testing.T) {
doneCtx, doneCtxCancel := context.WithCancel(context.Background())
defer doneCtxCancel()
debug := testing.Verbose() // set via the -test.v flag to `go test`
tmpdir := fmt.Sprintf("%s/", t.TempDir()) // gets cleaned up at end, new dir for each call
debug := testing.Verbose() // set via the -test.v flag to `go test`
logf := func(format string, v ...interface{}) {
t.Logf(fmt.Sprintf("test #%d: ", index)+format, v...)
}
@@ -521,6 +522,10 @@ func TestResources1(t *testing.T) {
}
},
VarDir: func(p string) (string, error) {
return path.Join(tmpdir, p), nil
},
// Watch listens on this for close/pause events.
Debug: debug,
Logf: logf,

View File

@@ -134,6 +134,7 @@ func (obj *ValueRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
// might not have a new value to copy, and therefore we won't see this
// notification of change. Therefore, it is important to process these
// promptly, if they must not be lost, such as for cache invalidation.
// NOTE: Modern send/recv doesn't really have this limitation anymore.
if !obj.isSet {
obj.cachedAny = obj.Any // store anything we have if any
}
@@ -173,7 +174,12 @@ func (obj *ValueRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
checkOK = true
}
if !apply { // XXX: does this break send/recv if we end early?
if !apply {
if err := obj.init.Send(&ValueSends{
Any: obj.cachedAny,
}); err != nil {
return false, err
}
return checkOK, nil
}
@@ -191,7 +197,7 @@ func (obj *ValueRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
}
// send
//if obj.cachedAny != nil { // TODO: okay to send if value got removed too?
//if obj.cachedAny != nil { // XXX: okay to send if value got removed too?
if err := obj.init.Send(&ValueSends{
Any: obj.cachedAny,
}); err != nil {