From 58421fd31a15d1ca2f651bbdb50034f50c697bbc Mon Sep 17 00:00:00 2001 From: James Shubin Date: Tue, 22 Oct 2019 12:17:17 -0400 Subject: [PATCH] engine: resources: Add fragments support to file resource This adds a "fragments" mode to the file resource. In addition to "content" and "source", you can now alternatively build a file from the file fragments of other files. --- docs/resources.md | 7 + engine/resources/file.go | 315 ++++++++++++++++-- engine/resources/resources_test.go | 74 ++++ examples/lang/frag1.mcl | 33 ++ .../TestAstFunc2/file-fragments1.output | 11 + .../TestAstFunc2/file-fragments1/main.mcl | 33 ++ recwatch/recwatch.go | 2 + 7 files changed, 445 insertions(+), 30 deletions(-) create mode 100644 examples/lang/frag1.mcl create mode 100644 lang/interpret_test/TestAstFunc2/file-fragments1.output create mode 100644 lang/interpret_test/TestAstFunc2/file-fragments1/main.mcl diff --git a/docs/resources.md b/docs/resources.md index 2a39a87e..6c8b24ee 100644 --- a/docs/resources.md +++ b/docs/resources.md @@ -98,6 +98,13 @@ The content property is a string that specifies the desired file contents. The source property points to a source file or directory path that we wish to copy over and use as the desired contents for our resource. +### Fragments + +The fragments property lets you specify a list of files to concatenate together +to make up the contents of this file. They will be combined in the order that +they are listed. If one of the files specified is a directory, then the +files in that top-level directory will themselves be combined together and used. 
+ ### Recurse The recurse property limits whether file resource operations should recurse into diff --git a/engine/resources/file.go b/engine/resources/file.go index e1d7c17a..25db53ae 100644 --- a/engine/resources/file.go +++ b/engine/resources/file.go @@ -29,6 +29,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "syscall" "github.com/purpleidea/mgmt/engine" @@ -81,11 +82,25 @@ type FileRes struct { State string `lang:"state" yaml:"state"` // Content specifies the file contents to use. If this is nil, they are - // left undefined. It cannot be combined with Source. + // left undefined. It cannot be combined with the Source or Fragments + // parameters. Content *string `lang:"content" yaml:"content"` // Source specifies the source contents for the file resource. It cannot - // be combined with the Content parameter. + // be combined with the Content or Fragments parameters. Source string `lang:"source" yaml:"source"` + // Fragments specifies that the file is built from a list of individual + // files. If one of the files is a directory, then the list of files in + // that directory are the fragments to combine. Multiple of these can be + // used together, although most simple cases will probably only either + // involve a single directory path or a fixed list of individual files. + // All paths are absolute and as a result must start with a slash. The + // directories (if any) must end with a slash as well. This cannot be + // combined with the Content or Source parameters. If a file with this + // param is reversed, the reversed file is one that has `Content` set instead. + // Automatic edges will be added from these fragments. This currently + // isn't recursive in that if a fragment is a directory, this only + // searches one level deep at the moment. + Fragments []string `lang:"fragments" yaml:"fragments"` // Owner specifies the file owner. You can specify either the string // name, or a string representation of the owner integer uid. 
@@ -100,8 +115,7 @@ type FileRes struct { Recurse bool `lang:"recurse" yaml:"recurse"` Force bool `lang:"force" yaml:"force"` - sha256sum string - recWatcher *recwatch.RecWatcher + sha256sum string } // getPath returns the actual path to use for this resource. It computes this @@ -173,16 +187,26 @@ func (obj *FileRes) Validate() error { return fmt.Errorf("the State is invalid") } - if obj.State == FileStateAbsent && obj.Content != nil { - return fmt.Errorf("can't specify Content for an absent file") + isContent := obj.Content != nil + isSrc := obj.Source != "" + isFrag := len(obj.Fragments) > 0 + if (isContent && isSrc) || (isSrc && isFrag) || (isFrag && isContent) { + return fmt.Errorf("can only specify one of Content, Source, and Fragments") } - if obj.Content != nil && obj.Source != "" { - return fmt.Errorf("can't specify both Content and Source") + if obj.State == FileStateAbsent && (isContent || isSrc || isFrag) { + return fmt.Errorf("can't specify file Content, Source, or Fragments when State is %s", FileStateAbsent) } - if obj.isDir() && obj.Content != nil { // makes no sense - return fmt.Errorf("can't specify Content when creating a Dir") + if obj.isDir() && (isContent || isFrag) { // makes no sense + return fmt.Errorf("can't specify Content or Fragments when creating a Dir") + } + + for _, frag := range obj.Fragments { + // absolute paths begin with a slash + if !strings.HasPrefix(frag, "/") { + return fmt.Errorf("the frag (`%s`) isn't an absolute path", frag) + } } // TODO: should we silently ignore these errors or include them? @@ -247,14 +271,107 @@ func (obj *FileRes) Close() error { // Modify with caution, it is probably important to write some test cases first! // If the Watch returns an error, it means that something has gone wrong, and it // must be restarted. On a clean exit it returns nil. -// FIXME: Also watch the source directory when using obj.Source !!! 
func (obj *FileRes) Watch() error { - var err error - obj.recWatcher, err = recwatch.NewRecWatcher(obj.getPath(), obj.Recurse) + // TODO: chan *recwatch.Event instead? + inputEvents := make(chan recwatch.Event) + defer close(inputEvents) + + wg := &sync.WaitGroup{} + defer wg.Wait() + + exit := make(chan struct{}) + // TODO: should this be after (later in the file) than the `defer recWatcher.Close()` ? + // TODO: should this be after (later in the file) the `defer recWatcher.Close()` ? + defer close(exit) + + recWatcher, err := recwatch.NewRecWatcher(obj.getPath(), obj.Recurse) if err != nil { return err } - defer obj.recWatcher.Close() + defer recWatcher.Close() + + // watch the various inputs to this file resource too! + if obj.Source != "" { + // This block is virtually identical to the below one. + recurse := strings.HasSuffix(obj.Source, "/") // isDir + rw, err := recwatch.NewRecWatcher(obj.Source, recurse) + if err != nil { + return err + } + defer rw.Close() + + wg.Add(1) + go func() { + defer wg.Done() + for { + // TODO: *recwatch.Event instead? + var event recwatch.Event + var ok bool + var shutdown bool + select { + case event, ok = <-rw.Events(): // recv + case <-exit: // unblock + return + } + + if !ok { + err := fmt.Errorf("channel shutdown") + event = recwatch.Event{Error: err} + shutdown = true + } + + select { + case inputEvents <- event: // send + if shutdown { // optimization to free early + return + } + case <-exit: // unblock + return + } + } + }() + } + for _, frag := range obj.Fragments { + // This block is virtually identical to the above one. + recurse := false // TODO: is it okay for depth==1 dirs? + //recurse := strings.HasSuffix(frag, "/") // isDir + rw, err := recwatch.NewRecWatcher(frag, recurse) + if err != nil { + return err + } + defer rw.Close() + + wg.Add(1) + go func() { + defer wg.Done() + for { + // TODO: *recwatch.Event instead? 
+ var event recwatch.Event + var ok bool + var shutdown bool + select { + case event, ok = <-rw.Events(): // recv + case <-exit: // unblock + return + } + + if !ok { + err := fmt.Errorf("channel shutdown") + event = recwatch.Event{Error: err} + shutdown = true + } + + select { + case inputEvents <- event: // send + if shutdown { // optimization to free early + return + } + case <-exit: // unblock + return + } + } + }() + } obj.init.Running() // when started, notify engine that we're running @@ -265,9 +382,12 @@ func (obj *FileRes) Watch() error { } select { - case event, ok := <-obj.recWatcher.Events(): + case event, ok := <-recWatcher.Events(): if !ok { // channel shutdown - return nil + // TODO: Should this be an error? Previously it + // was a `return nil`, and i'm not sure why... + //return nil + return fmt.Errorf("unexpected close") } if err := event.Error; err != nil { return errwrap.Wrapf(err, "unknown %s watcher error", obj) @@ -277,6 +397,18 @@ func (obj *FileRes) Watch() error { } send = true + case event, ok := <-inputEvents: + if !ok { + return fmt.Errorf("unexpected close") + } + if err := event.Error; err != nil { + return errwrap.Wrapf(err, "unknown %s input watcher error", obj) + } + if obj.init.Debug { // don't access event.Body if event.Error isn't nil + obj.init.Logf("input event(%s): %v", event.Body.Name, event.Body.Op) + } + send = true + case <-obj.init.Done: // closed by the engine to signal shutdown return nil } @@ -690,7 +822,7 @@ func (obj *FileRes) stateCheckApply(apply bool) (bool, error) { // Optimization: we shouldn't even look at obj.Content here, but we can // skip this empty file creation here since we know we're going to be // making it there anyways. This way we save the extra fopen noise. 
- if obj.Content != nil { + if obj.Content != nil || len(obj.Fragments) > 0 { return false, nil // pretend we actually made it } @@ -718,6 +850,7 @@ func (obj *FileRes) contentCheckApply(apply bool) (bool, error) { return true, nil } + // Actually write the file. This is similar to fragmentsCheckApply. bufferSrc := bytes.NewReader([]byte(*obj.Content)) sha256sum, checkOK, err := obj.fileCheckApply(apply, bufferSrc, obj.getPath(), obj.sha256sum) if sha256sum != "" { // empty values mean errored or didn't hash @@ -749,6 +882,66 @@ func (obj *FileRes) sourceCheckApply(apply bool) (bool, error) { return checkOK, nil } +// fragmentsCheckApply performs a CheckApply for the file fragments. +func (obj *FileRes) fragmentsCheckApply(apply bool) (bool, error) { + obj.init.Logf("fragmentsCheckApply(%t)", apply) + + // fragments is not defined, leave it alone... + if len(obj.Fragments) == 0 { + return true, nil + } + + content := "" + // TODO: In the future we could have a flag that merges and then sorts + // all the individual files in each directory before they are combined. + for _, frag := range obj.Fragments { + // It's a single file. Add it to what we're building... + if isDir := strings.HasSuffix(frag, "/"); !isDir { + out, err := ioutil.ReadFile(frag) + if err != nil { + return false, errwrap.Wrapf(err, "could not read file fragment") + } + content += string(out) + continue + } + + // We're a dir, peer inside... + files, err := ioutil.ReadDir(frag) + if err != nil { + return false, errwrap.Wrapf(err, "could not read fragment directory") + } + // TODO: Add a sort and filter option so that we can choose the + // way we iterate through this directory to build out the file. + for _, file := range files { + if file.IsDir() { // skip recursive solutions for now... 
+ continue + } + f := path.Join(frag, file.Name()) + out, err := ioutil.ReadFile(f) + if err != nil { + return false, errwrap.Wrapf(err, "could not read directory file fragment") + } + content += string(out) + } + } + + // Actually write the file. This is similar to contentCheckApply. + bufferSrc := bytes.NewReader([]byte(content)) + // NOTE: We pass in an invalidated sha256sum cache since we don't cache + // all the individual files, and it could all change without us knowing. + // TODO: Is the sha256sum caching even having an effect at all here ??? + sha256sum, checkOK, err := obj.fileCheckApply(apply, bufferSrc, obj.getPath(), "") + if sha256sum != "" { // empty values mean errored or didn't hash + // this can be valid even when the whole function errors + obj.sha256sum = sha256sum // cache value + } + if err != nil { + return false, err + } + // if no err, but !ok, then... + return checkOK, nil // success +} + // chownCheckApply performs a CheckApply for the file ownership. func (obj *FileRes) chownCheckApply(apply bool) (bool, error) { obj.init.Logf("chownCheckApply(%t)", apply) @@ -858,7 +1051,8 @@ func (obj *FileRes) CheckApply(apply bool) (bool, error) { checkOK := true - // run stateCheckApply before contentCheckApply and sourceCheckApply + // Run stateCheckApply before contentCheckApply, sourceCheckApply, and + // fragmentsCheckApply. 
if c, err := obj.stateCheckApply(apply); err != nil { return false, err } else if !c { @@ -874,6 +1068,11 @@ func (obj *FileRes) CheckApply(apply bool) (bool, error) { } else if !c { checkOK = false } + if c, err := obj.fragmentsCheckApply(apply); err != nil { + return false, err + } else if !c { + checkOK = false + } if c, err := obj.chownCheckApply(apply); err != nil { return false, err @@ -918,6 +1117,14 @@ func (obj *FileRes) Cmp(r engine.Res) error { if obj.Source != res.Source { return fmt.Errorf("the Source differs") } + if len(obj.Fragments) != len(res.Fragments) { + return fmt.Errorf("the number of Fragments differs") + } + for i, x := range obj.Fragments { + if frag := res.Fragments[i]; x != frag { + return fmt.Errorf("the fragment at index %d differs", i) + } + } if obj.Owner != res.Owner { return fmt.Errorf("the Owner differs") @@ -958,6 +1165,11 @@ func (obj *FileUID) IFF(uid engine.ResUID) bool { // FileResAutoEdges holds the state of the auto edge generator. type FileResAutoEdges struct { + // We do all of these first... + frags []engine.ResUID + fdone bool + + // Then this is the second part... data []engine.ResUID pointer int found bool @@ -965,6 +1177,12 @@ type FileResAutoEdges struct { // Next returns the next automatic edge. func (obj *FileResAutoEdges) Next() []engine.ResUID { + // We do all of these first... + if !obj.fdone && len(obj.frags) > 0 { + return obj.frags // return them all at the same time + } + + // Then this is the second part... if obj.found { panic("Shouldn't be called anymore!") } @@ -978,6 +1196,13 @@ func (obj *FileResAutoEdges) Next() []engine.ResUID { // Test gets results of the earlier Next() call, & returns if we should continue! func (obj *FileResAutoEdges) Test(input []bool) bool { + // We do all of these first... + if !obj.fdone && len(obj.frags) > 0 { + obj.fdone = true // mark as done + return true // keep going + } + + // Then this is the second part... 
// if there aren't any more remaining if len(obj.data) <= obj.pointer { return false @@ -1013,7 +1238,24 @@ func (obj *FileRes) AutoEdges() (engine.AutoEdge, error) { path: x, // what matters }) // build list } + + // Ensure any file or dir fragments come first. + frags := []engine.ResUID{} + for _, frag := range obj.Fragments { + var reversed = true // cheat by passing a pointer + frags = append(frags, &FileUID{ + BaseUID: engine.BaseUID{ + Name: obj.Name(), + Kind: obj.Kind(), + Reversed: &reversed, + }, + path: frag, // what matters + }) // build list + + } + return &FileResAutoEdges{ + frags: frags, data: data, pointer: 0, found: false, @@ -1075,18 +1317,23 @@ func (obj *FileRes) Copy() engine.CopyableRes { s := *obj.Content content = &s } + fragments := []string{} + for _, frag := range obj.Fragments { + fragments = append(fragments, frag) + } return &FileRes{ - Path: obj.Path, - Dirname: obj.Dirname, - Basename: obj.Basename, - State: obj.State, // TODO: if this becomes a pointer, copy the string! - Content: content, - Source: obj.Source, - Owner: obj.Owner, - Group: obj.Group, - Mode: obj.Mode, - Recurse: obj.Recurse, - Force: obj.Force, + Path: obj.Path, + Dirname: obj.Dirname, + Basename: obj.Basename, + State: obj.State, // TODO: if this becomes a pointer, copy the string! + Content: content, + Source: obj.Source, + Fragments: fragments, + Owner: obj.Owner, + Group: obj.Group, + Mode: obj.Mode, + Recurse: obj.Recurse, + Force: obj.Force, } } @@ -1133,8 +1380,9 @@ func (obj *FileRes) Reversed() (engine.ReversibleRes, error) { // If we've specified content, we might need to restore the original, OR // if we're removing the file with a `state => "absent"`, save it too... + // We do this whether we specified content with Content or w/ Fragments. // The `res.State != FileStateAbsent` check is an optional optimization. 
- if (obj.Content != nil || obj.State == FileStateAbsent) && res.State != FileStateAbsent { + if ((obj.Content != nil || len(obj.Fragments) > 0) || obj.State == FileStateAbsent) && res.State != FileStateAbsent { content, err := ioutil.ReadFile(obj.getPath()) if err != nil && !os.IsNotExist(err) { return nil, errwrap.Wrapf(err, "could not read file for reversal storage") @@ -1154,6 +1402,13 @@ func (obj *FileRes) Reversed() (engine.ReversibleRes, error) { return nil, fmt.Errorf("can't reverse with Source yet") } + // We suck in the previous file contents above when Fragments is used... + // This is basically the very same code path as when we reverse Content. + // TODO: Do we want to do it this way or is there a better reverse path? + if len(obj.Fragments) > 0 { + res.Fragments = []string{} + } + // There is a race if the operating system is adding/changing/removing // the file between the ioutil.Readfile at the top and here. If there is // a discrepancy between the two, then you might get an unexpected diff --git a/engine/resources/resources_test.go b/engine/resources/resources_test.go index 0a8d9527..35d63ddd 100644 --- a/engine/resources/resources_test.go +++ b/engine/resources/resources_test.go @@ -723,6 +723,15 @@ func TestResources2(t *testing.T) { return nil } } + fileMkdir := func(p string, all bool) func() error { + // mkdir at the path + return func() error { + if all { + return os.MkdirAll(p, 0777) + } + return os.Mkdir(p, 0777) + } + } testCases := []test{} { @@ -1121,6 +1130,71 @@ func TestResources2(t *testing.T) { cleanup: func() error { return nil }, }) } + { + //file "/tmp/somefile" { + // state => "exists", + // fragments => [ + // "/tmp/frag1", + // "/tmp/fragdir1/", + // "/tmp/frag2", + // "/tmp/fragdir2/", + // "/tmp/frag3", + // ], + //} + r1 := makeRes("file", "r1") + res := r1.(*FileRes) // if this panics, the test will panic + p := "/tmp/somefile" + res.Path = p + res.State = "exists" + res.Fragments = []string{ + "/tmp/frag1", + 
"/tmp/fragdir1/", + "/tmp/frag2", + "/tmp/fragdir2/", + "/tmp/frag3", + } + + frag1 := "frag1\n" + f1 := "f1\n" + f2 := "f2\n" + f3 := "f3\n" + frag2 := "frag2\n" + f1d2 := "f1 from fragdir2\n" + f2d2 := "f2 from fragdir2\n" + f3d2 := "f3 from fragdir2\n" + frag3 := "frag3\n" + content := frag1 + f1 + f2 + f3 + frag2 + f1d2 + f2d2 + f3d2 + frag3 + + timeline := []func() error{ + fileWrite("/tmp/frag1", frag1), + fileWrite("/tmp/frag2", frag2), + fileWrite("/tmp/frag3", frag3), + fileMkdir("/tmp/fragdir1/", true), + fileWrite("/tmp/fragdir1/f1", f1), + fileWrite("/tmp/fragdir1/f2", f2), + fileWrite("/tmp/fragdir1/f3", f3), + fileMkdir("/tmp/fragdir2/", true), + fileWrite("/tmp/fragdir2/f1", f1d2), + fileWrite("/tmp/fragdir2/f2", f2d2), + fileWrite("/tmp/fragdir2/f3", f3d2), + fileWrite(p, "whatever"), + resValidate(r1), + resInit(r1), + resCheckApply(r1, false), // changed + fileExpect(p, content), + resCheckApply(r1, true), // it's already good + resClose(r1), + fileExpect(p, content), // ensure it exists + } + + testCases = append(testCases, test{ + name: "file fragments", + timeline: timeline, + expect: func() error { return nil }, + startup: func() error { return nil }, + cleanup: func() error { return nil }, + }) + } names := []string{} for index, tc := range testCases { // run all the tests diff --git a/examples/lang/frag1.mcl b/examples/lang/frag1.mcl new file mode 100644 index 00000000..a8dc481f --- /dev/null +++ b/examples/lang/frag1.mcl @@ -0,0 +1,33 @@ +file "/tmp/frags/" { + state => "exists", +} + +# fragments +file "/tmp/frags/f1" { + state => "exists", + content => "i am f1\n", +} +file "/tmp/frags/f2" { + state => "exists", + content => "i am f2\n", +} +file "/tmp/frags/f3" { + state => "exists", + content => "i am f3\n", +} + +# You can also drop in an unmanaged file into the frags dir for it to get used! +# And of course you can hard-code the list of files to use like this one is... 
+file "/tmp/bonus_file" { + state => "exists", + content => "i am a bonus file\n", +} + +# automatic edges will get added! +file "/tmp/whole1" { + state => "exists", + fragments => [ + "/tmp/frags/", # pull from this dir + "/tmp/bonus_file", # also pull this one file + ], +} diff --git a/lang/interpret_test/TestAstFunc2/file-fragments1.output b/lang/interpret_test/TestAstFunc2/file-fragments1.output new file mode 100644 index 00000000..75b9337f --- /dev/null +++ b/lang/interpret_test/TestAstFunc2/file-fragments1.output @@ -0,0 +1,11 @@ +Edge: file[/tmp/frags/] -> file[/tmp/frags/f1] # file[/tmp/frags/] -> file[/tmp/frags/f1] (autoedge) +Edge: file[/tmp/frags/] -> file[/tmp/frags/f2] # file[/tmp/frags/] -> file[/tmp/frags/f2] (autoedge) +Edge: file[/tmp/frags/] -> file[/tmp/frags/f3] # file[/tmp/frags/] -> file[/tmp/frags/f3] (autoedge) +Edge: file[/tmp/bonus_file] -> file[/tmp/whole1] # file[/tmp/bonus_file] -> file[/tmp/whole1] (autoedge) +Edge: file[/tmp/frags/] -> file[/tmp/whole1] # file[/tmp/frags/] -> file[/tmp/whole1] (autoedge) +Vertex: file[/tmp/bonus_file] +Vertex: file[/tmp/frags/] +Vertex: file[/tmp/frags/f1] +Vertex: file[/tmp/frags/f2] +Vertex: file[/tmp/frags/f3] +Vertex: file[/tmp/whole1] diff --git a/lang/interpret_test/TestAstFunc2/file-fragments1/main.mcl b/lang/interpret_test/TestAstFunc2/file-fragments1/main.mcl new file mode 100644 index 00000000..a8dc481f --- /dev/null +++ b/lang/interpret_test/TestAstFunc2/file-fragments1/main.mcl @@ -0,0 +1,33 @@ +file "/tmp/frags/" { + state => "exists", +} + +# fragments +file "/tmp/frags/f1" { + state => "exists", + content => "i am f1\n", +} +file "/tmp/frags/f2" { + state => "exists", + content => "i am f2\n", +} +file "/tmp/frags/f3" { + state => "exists", + content => "i am f3\n", +} + +# You can also drop in an unmanaged file into the frags dir for it to get used! +# And of course you can hard-code the list of files to use like this one is... 
+file "/tmp/bonus_file" { + state => "exists", + content => "i am a bonus file\n", +} + +# automatic edges will get added! +file "/tmp/whole1" { + state => "exists", + fragments => [ + "/tmp/frags/", # pull from this dir + "/tmp/bonus_file", # also pull this one file + ], +} diff --git a/recwatch/recwatch.go b/recwatch/recwatch.go index e93c7244..380b0541 100644 --- a/recwatch/recwatch.go +++ b/recwatch/recwatch.go @@ -96,6 +96,7 @@ func (obj *RecWatcher) Init() error { obj.mutex.Lock() if !obj.closed { select { + // TODO: &Event instead? case obj.events <- Event{Error: err}: case <-obj.exit: // pass @@ -282,6 +283,7 @@ func (obj *RecWatcher) Watch() error { // only invalid state on certain types of events select { // exit even when we're blocked on event sending + // TODO: &Event instead? case obj.events <- Event{Error: nil, Body: &event}: case <-obj.exit: return fmt.Errorf("pending event not sent")