engine: resources: Add fragments support to file resource

This adds a "fragments" mode to the file resource. In addition to
"content" and "source", you can now alternatively build a file from the
file fragments of other files.
This commit is contained in:
James Shubin
2019-10-22 12:17:17 -04:00
parent b961c96862
commit 58421fd31a
7 changed files with 445 additions and 30 deletions

View File

@@ -29,6 +29,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"github.com/purpleidea/mgmt/engine"
@@ -81,11 +82,25 @@ type FileRes struct {
State string `lang:"state" yaml:"state"`
// Content specifies the file contents to use. If this is nil, they are
// left undefined. It cannot be combined with Source.
// left undefined. It cannot be combined with the Source or Fragments
// parameters.
Content *string `lang:"content" yaml:"content"`
// Source specifies the source contents for the file resource. It cannot
// be combined with the Content parameter.
// be combined with the Content or Fragments parameters.
Source string `lang:"source" yaml:"source"`
// Fragments specifies that the file is built from a list of individual
// files. If one of the files is a directory, then the list of files in
// that directory are the fragments to combine. Multiple of these can be
// used together, although most simple cases will probably only either
// involve a single directory path or a fixed list of individual files.
// All paths are absolute and as a result must start with a slash. The
// directories (if any) must end with a slash as well. This cannot be
// combined with the Content or Source parameters. If a file with param
// is reversed, the reversed file is one that has `Content` set instead.
// Automatic edges will be added from these fragments. This currently
// isn't recursive in that if a fragment is a directory, this only
// searches one level deep at the moment.
Fragments []string `lang:"fragments" yaml:"fragments"`
// Owner specifies the file owner. You can specify either the string
// name, or a string representation of the owner integer uid.
@@ -100,8 +115,7 @@ type FileRes struct {
Recurse bool `lang:"recurse" yaml:"recurse"`
Force bool `lang:"force" yaml:"force"`
sha256sum string
recWatcher *recwatch.RecWatcher
sha256sum string
}
// getPath returns the actual path to use for this resource. It computes this
@@ -173,16 +187,26 @@ func (obj *FileRes) Validate() error {
return fmt.Errorf("the State is invalid")
}
if obj.State == FileStateAbsent && obj.Content != nil {
return fmt.Errorf("can't specify Content for an absent file")
isContent := obj.Content != nil
isSrc := obj.Source != ""
isFrag := len(obj.Fragments) > 0
if (isContent && isSrc) || (isSrc && isFrag) || (isFrag && isContent) {
return fmt.Errorf("can only specify one of Content, Source, and Fragments")
}
if obj.Content != nil && obj.Source != "" {
return fmt.Errorf("can't specify both Content and Source")
if obj.State == FileStateAbsent && (isContent || isSrc || isFrag) {
return fmt.Errorf("can't specify file Content, Source, or Fragments when State is %s", FileStateAbsent)
}
if obj.isDir() && obj.Content != nil { // makes no sense
return fmt.Errorf("can't specify Content when creating a Dir")
if obj.isDir() && (isContent || isFrag) { // makes no sense
return fmt.Errorf("can't specify Content or Fragments when creating a Dir")
}
for _, frag := range obj.Fragments {
// absolute paths begin with a slash
if !strings.HasPrefix(frag, "/") {
return fmt.Errorf("the frag (`%s`) isn't an absolute path", frag)
}
}
// TODO: should we silently ignore these errors or include them?
@@ -247,14 +271,107 @@ func (obj *FileRes) Close() error {
// Modify with caution, it is probably important to write some test cases first!
// If the Watch returns an error, it means that something has gone wrong, and it
// must be restarted. On a clean exit it returns nil.
// FIXME: Also watch the source directory when using obj.Source !!!
func (obj *FileRes) Watch() error {
var err error
obj.recWatcher, err = recwatch.NewRecWatcher(obj.getPath(), obj.Recurse)
// TODO: chan *recwatch.Event instead?
inputEvents := make(chan recwatch.Event)
defer close(inputEvents)
wg := &sync.WaitGroup{}
defer wg.Wait()
exit := make(chan struct{})
// TODO: should this be after (later in the file) than the `defer recWatcher.Close()` ?
// TODO: should this be after (later in the file) the `defer recWatcher.Close()` ?
defer close(exit)
recWatcher, err := recwatch.NewRecWatcher(obj.getPath(), obj.Recurse)
if err != nil {
return err
}
defer obj.recWatcher.Close()
defer recWatcher.Close()
// watch the various inputs to this file resource too!
if obj.Source != "" {
// This block is virtually identical to the below one.
recurse := strings.HasSuffix(obj.Source, "/") // isDir
rw, err := recwatch.NewRecWatcher(obj.Source, recurse)
if err != nil {
return err
}
defer rw.Close()
wg.Add(1)
go func() {
defer wg.Done()
for {
// TODO: *recwatch.Event instead?
var event recwatch.Event
var ok bool
var shutdown bool
select {
case event, ok = <-rw.Events(): // recv
case <-exit: // unblock
return
}
if !ok {
err := fmt.Errorf("channel shutdown")
event = recwatch.Event{Error: err}
shutdown = true
}
select {
case inputEvents <- event: // send
if shutdown { // optimization to free early
return
}
case <-exit: // unblock
return
}
}
}()
}
for _, frag := range obj.Fragments {
// This block is virtually identical to the above one.
recurse := false // TODO: is it okay for depth==1 dirs?
//recurse := strings.HasSuffix(frag, "/") // isDir
rw, err := recwatch.NewRecWatcher(frag, recurse)
if err != nil {
return err
}
defer rw.Close()
wg.Add(1)
go func() {
defer wg.Done()
for {
// TODO: *recwatch.Event instead?
var event recwatch.Event
var ok bool
var shutdown bool
select {
case event, ok = <-rw.Events(): // recv
case <-exit: // unblock
return
}
if !ok {
err := fmt.Errorf("channel shutdown")
event = recwatch.Event{Error: err}
shutdown = true
}
select {
case inputEvents <- event: // send
if shutdown { // optimization to free early
return
}
case <-exit: // unblock
return
}
}
}()
}
obj.init.Running() // when started, notify engine that we're running
@@ -265,9 +382,12 @@ func (obj *FileRes) Watch() error {
}
select {
case event, ok := <-obj.recWatcher.Events():
case event, ok := <-recWatcher.Events():
if !ok { // channel shutdown
return nil
// TODO: Should this be an error? Previously it
// was a `return nil`, and i'm not sure why...
//return nil
return fmt.Errorf("unexpected close")
}
if err := event.Error; err != nil {
return errwrap.Wrapf(err, "unknown %s watcher error", obj)
@@ -277,6 +397,18 @@ func (obj *FileRes) Watch() error {
}
send = true
case event, ok := <-inputEvents:
if !ok {
return fmt.Errorf("unexpected close")
}
if err := event.Error; err != nil {
return errwrap.Wrapf(err, "unknown %s input watcher error", obj)
}
if obj.init.Debug { // don't access event.Body if event.Error isn't nil
obj.init.Logf("input event(%s): %v", event.Body.Name, event.Body.Op)
}
send = true
case <-obj.init.Done: // closed by the engine to signal shutdown
return nil
}
@@ -690,7 +822,7 @@ func (obj *FileRes) stateCheckApply(apply bool) (bool, error) {
// Optimization: we shouldn't even look at obj.Content here, but we can
// skip this empty file creation here since we know we're going to be
// making it there anyways. This way we save the extra fopen noise.
if obj.Content != nil {
if obj.Content != nil || len(obj.Fragments) > 0 {
return false, nil // pretend we actually made it
}
@@ -718,6 +850,7 @@ func (obj *FileRes) contentCheckApply(apply bool) (bool, error) {
return true, nil
}
// Actually write the file. This is similar to fragmentsCheckApply.
bufferSrc := bytes.NewReader([]byte(*obj.Content))
sha256sum, checkOK, err := obj.fileCheckApply(apply, bufferSrc, obj.getPath(), obj.sha256sum)
if sha256sum != "" { // empty values mean errored or didn't hash
@@ -749,6 +882,66 @@ func (obj *FileRes) sourceCheckApply(apply bool) (bool, error) {
return checkOK, nil
}
// fragmentsCheckApply performs a CheckApply for the file fragments.
func (obj *FileRes) fragmentsCheckApply(apply bool) (bool, error) {
obj.init.Logf("fragmentsCheckApply(%t)", apply)
// fragments is not defined, leave it alone...
if len(obj.Fragments) == 0 {
return true, nil
}
content := ""
// TODO: In the future we could have a flag that merges and then sorts
// all the individual files in each directory before they are combined.
for _, frag := range obj.Fragments {
// It's a single file. Add it to what we're building...
if isDir := strings.HasSuffix(frag, "/"); !isDir {
out, err := ioutil.ReadFile(frag)
if err != nil {
return false, errwrap.Wrapf(err, "could not read file fragment")
}
content += string(out)
continue
}
// We're a dir, peer inside...
files, err := ioutil.ReadDir(frag)
if err != nil {
return false, errwrap.Wrapf(err, "could not read fragment directory")
}
// TODO: Add a sort and filter option so that we can choose the
// way we iterate through this directory to build out the file.
for _, file := range files {
if file.IsDir() { // skip recursive solutions for now...
continue
}
f := path.Join(frag, file.Name())
out, err := ioutil.ReadFile(f)
if err != nil {
return false, errwrap.Wrapf(err, "could not read directory file fragment")
}
content += string(out)
}
}
// Actually write the file. This is similar to contentCheckApply.
bufferSrc := bytes.NewReader([]byte(content))
// NOTE: We pass in an invalidated sha256sum cache since we don't cache
// all the individual files, and it could all change without us knowing.
// TODO: Is the sha256sum caching even having an effect at all here ???
sha256sum, checkOK, err := obj.fileCheckApply(apply, bufferSrc, obj.getPath(), "")
if sha256sum != "" { // empty values mean errored or didn't hash
// this can be valid even when the whole function errors
obj.sha256sum = sha256sum // cache value
}
if err != nil {
return false, err
}
// if no err, but !ok, then...
return checkOK, nil // success
}
// chownCheckApply performs a CheckApply for the file ownership.
func (obj *FileRes) chownCheckApply(apply bool) (bool, error) {
obj.init.Logf("chownCheckApply(%t)", apply)
@@ -858,7 +1051,8 @@ func (obj *FileRes) CheckApply(apply bool) (bool, error) {
checkOK := true
// run stateCheckApply before contentCheckApply and sourceCheckApply
// Run stateCheckApply before contentCheckApply, sourceCheckApply, and
// fragmentsCheckApply.
if c, err := obj.stateCheckApply(apply); err != nil {
return false, err
} else if !c {
@@ -874,6 +1068,11 @@ func (obj *FileRes) CheckApply(apply bool) (bool, error) {
} else if !c {
checkOK = false
}
if c, err := obj.fragmentsCheckApply(apply); err != nil {
return false, err
} else if !c {
checkOK = false
}
if c, err := obj.chownCheckApply(apply); err != nil {
return false, err
@@ -918,6 +1117,14 @@ func (obj *FileRes) Cmp(r engine.Res) error {
if obj.Source != res.Source {
return fmt.Errorf("the Source differs")
}
if len(obj.Fragments) != len(res.Fragments) {
return fmt.Errorf("the number of Fragments differs")
}
for i, x := range obj.Fragments {
if frag := res.Fragments[i]; x != frag {
return fmt.Errorf("the fragment at index %d differs", i)
}
}
if obj.Owner != res.Owner {
return fmt.Errorf("the Owner differs")
@@ -958,6 +1165,11 @@ func (obj *FileUID) IFF(uid engine.ResUID) bool {
// FileResAutoEdges holds the state of the auto edge generator.
type FileResAutoEdges struct {
// We do all of these first...
frags []engine.ResUID
fdone bool
// Then this is the second part...
data []engine.ResUID
pointer int
found bool
@@ -965,6 +1177,12 @@ type FileResAutoEdges struct {
// Next returns the next automatic edge.
func (obj *FileResAutoEdges) Next() []engine.ResUID {
// We do all of these first...
if !obj.fdone && len(obj.frags) > 0 {
return obj.frags // return them all at the same time
}
// Then this is the second part...
if obj.found {
panic("Shouldn't be called anymore!")
}
@@ -978,6 +1196,13 @@ func (obj *FileResAutoEdges) Next() []engine.ResUID {
// Test gets results of the earlier Next() call, & returns if we should continue!
func (obj *FileResAutoEdges) Test(input []bool) bool {
// We do all of these first...
if !obj.fdone && len(obj.frags) > 0 {
obj.fdone = true // mark as done
return true // keep going
}
// Then this is the second part...
// if there aren't any more remaining
if len(obj.data) <= obj.pointer {
return false
@@ -1013,7 +1238,24 @@ func (obj *FileRes) AutoEdges() (engine.AutoEdge, error) {
path: x, // what matters
}) // build list
}
// Ensure any file or dir fragments come first.
frags := []engine.ResUID{}
for _, frag := range obj.Fragments {
var reversed = true // cheat by passing a pointer
frags = append(frags, &FileUID{
BaseUID: engine.BaseUID{
Name: obj.Name(),
Kind: obj.Kind(),
Reversed: &reversed,
},
path: frag, // what matters
}) // build list
}
return &FileResAutoEdges{
frags: frags,
data: data,
pointer: 0,
found: false,
@@ -1075,18 +1317,23 @@ func (obj *FileRes) Copy() engine.CopyableRes {
s := *obj.Content
content = &s
}
fragments := []string{}
for _, frag := range obj.Fragments {
fragments = append(fragments, frag)
}
return &FileRes{
Path: obj.Path,
Dirname: obj.Dirname,
Basename: obj.Basename,
State: obj.State, // TODO: if this becomes a pointer, copy the string!
Content: content,
Source: obj.Source,
Owner: obj.Owner,
Group: obj.Group,
Mode: obj.Mode,
Recurse: obj.Recurse,
Force: obj.Force,
Path: obj.Path,
Dirname: obj.Dirname,
Basename: obj.Basename,
State: obj.State, // TODO: if this becomes a pointer, copy the string!
Content: content,
Source: obj.Source,
Fragments: fragments,
Owner: obj.Owner,
Group: obj.Group,
Mode: obj.Mode,
Recurse: obj.Recurse,
Force: obj.Force,
}
}
@@ -1133,8 +1380,9 @@ func (obj *FileRes) Reversed() (engine.ReversibleRes, error) {
// If we've specified content, we might need to restore the original, OR
// if we're removing the file with a `state => "absent"`, save it too...
// We do this whether we specified content with Content or w/ Fragments.
// The `res.State != FileStateAbsent` check is an optional optimization.
if (obj.Content != nil || obj.State == FileStateAbsent) && res.State != FileStateAbsent {
if ((obj.Content != nil || len(obj.Fragments) > 0) || obj.State == FileStateAbsent) && res.State != FileStateAbsent {
content, err := ioutil.ReadFile(obj.getPath())
if err != nil && !os.IsNotExist(err) {
return nil, errwrap.Wrapf(err, "could not read file for reversal storage")
@@ -1154,6 +1402,13 @@ func (obj *FileRes) Reversed() (engine.ReversibleRes, error) {
return nil, fmt.Errorf("can't reverse with Source yet")
}
// We suck in the previous file contents above when Fragments is used...
// This is basically the very same code path as when we reverse Content.
// TODO: Do we want to do it this way or is there a better reverse path?
if len(obj.Fragments) > 0 {
res.Fragments = []string{}
}
// There is a race if the operating system is adding/changing/removing
// the file between the ioutil.Readfile at the top and here. If there is
// a discrepancy between the two, then you might get an unexpected