From 63af50bf980b5ae12b9c1159a6dfffe417ab0f8e Mon Sep 17 00:00:00 2001 From: Felix Frank Date: Sun, 24 Feb 2019 19:50:46 +0100 Subject: [PATCH] engine: resources: pippet: Initial implementation for new resource type The pippet resource implements faster integration of Puppet resources in mgmt at runtime, by piping synchronization commands to a Puppet process that keeps running alongside mgmt. This avoids huge overhead through launching a Puppet process for each operation on a resource that is delegated to Puppet. --- engine/resources/pippet.go | 334 ++++++++++++++++++++++++++++++++ engine/resources/pippet_test.go | 136 +++++++++++++ examples/yaml/pippet1.yaml | 8 + examples/yaml/pippet2.yaml | 88 +++++++++ 4 files changed, 566 insertions(+) create mode 100644 engine/resources/pippet.go create mode 100644 engine/resources/pippet_test.go create mode 100644 examples/yaml/pippet1.yaml create mode 100644 examples/yaml/pippet2.yaml diff --git a/engine/resources/pippet.go b/engine/resources/pippet.go new file mode 100644 index 00000000..44a35916 --- /dev/null +++ b/engine/resources/pippet.go @@ -0,0 +1,334 @@ +// Mgmt +// Copyright (C) 2013-2019+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package resources + +import ( + "encoding/json" + "fmt" + "io" + "os/exec" + "sync" + + "github.com/purpleidea/mgmt/engine" + "github.com/purpleidea/mgmt/engine/traits" + "github.com/purpleidea/mgmt/util/errwrap" +) + +var pippetReceiverInstance *pippetReceiver +var pippetReceiverOnce sync.Once + +func init() { + engine.RegisterResource("pippet", func() engine.Res { return &PippetRes{} }) +} + +// PippetRes is a wrapper resource for puppet. It implements the functional +// equivalent of an exec resource that calls "puppet resource +// <params>", but offers superior performance through a long-running Puppet +// process that receives resources through a pipe (hence the name). +type PippetRes struct { + traits.Base // add the base methods without re-implementation + traits.Refreshable + + init *engine.Init + + // Type is the exact name of the wrapped Puppet resource type, e.g. + // "file", "mount". This needs not be a core type. It can be a type + // from a module. The Puppet installation local to the mgmt agent + // machine must be able recognize it. It has to be a native type + // though, as opposed to defined types from your Puppet manifest code. + Type string `yaml:"type" json:"type"` + // Title is used by Puppet as the resource title. Puppet will often + // assign special meaning to the title, e.g. use it as the path for a + // file resource, or the name of a package. + Title string `yaml:"title" json:"title"` + // Params is expected to be a hash in YAML format, pairing resource + // parameter names with their respective values, e.g. { ensure: present + // } + Params string `yaml:"params" json:"params"` + + runner *pippetReceiver +} + +// Default returns an example Pippet resource. +func (obj *PippetRes) Default() engine.Res { + return &PippetRes{ + Params: "{}", // use an empty params hash per default + } +} + +// Validate never errors out. We don't know the set of potential types that +// Puppet supports. Resource names are arbitrary. We cannot really validate the +// parameter YAML, because we cannot assume that it can be unmarshalled into a +// map[string]string; Puppet supports complex parameter values. +func (obj *PippetRes) Validate() error { + return nil +} + +// Init makes sure that the PippetReceiver object is initialized. +func (obj *PippetRes) Init(init *engine.Init) error { + obj.init = init // save for later + obj.runner = getPippetReceiverInstance() + if err := obj.runner.Register(); err != nil { + return err + } + return nil +} + +// Close is run by the engine to clean up after the resource is done. +func (obj *PippetRes) Close() error { + return obj.runner.Unregister() +} + +// Watch is the primary listener for this resource and it outputs events. +func (obj *PippetRes) Watch() error { + obj.init.Running() // when started, notify engine that we're running + + select { + case <-obj.init.Done: // closed by the engine to signal shutdown + } + + //obj.init.Event() // notify engine of an event (this can block) + + return nil +} + +// CheckApply synchronizes the resource if required. +func (obj *PippetRes) CheckApply(apply bool) (bool, error) { + var changed bool + var err error + if changed, err = applyPippetRes(obj.runner, obj); err != nil { + return false, fmt.Errorf("pippet: %s[%s]: ERROR - %v", obj.Type, obj.Title, err) + } + return !changed, nil +} + +// Cmp compares two resources and returns an error if they are not equivalent. +func (obj *PippetRes) Cmp(r engine.Res) error { + res, ok := r.(*PippetRes) + if !ok { + return fmt.Errorf("not a %s", obj.Kind()) + } + + if obj.Type != res.Type { + return fmt.Errorf("the type params differ") + } + + if obj.Title != res.Title { + return fmt.Errorf("the resource titles differ") + } + + // FIXME: This is a lie. Parameter lists can be equivalent + // but not lexically identical (e.g. whitespace differences, + // parameter order). + // This is difficult to handle because we cannot casually + // unmarshall the YAML content. + if obj.Params != res.Params { + return fmt.Errorf("the parameters differ") + } + + return nil +} + +// PippetUID is the UID struct for PippetRes. +type PippetUID struct { + engine.BaseUID + resourceType string + resourceTitle string +} + +// UIDs includes all params to make a unique identification of this object. +// Most resources only return one, although some resources can return multiple. +func (obj *PippetRes) UIDs() []engine.ResUID { + x := &PippetUID{ + BaseUID: engine.BaseUID{Name: obj.Name(), Kind: obj.Kind()}, + resourceType: obj.Type, + resourceTitle: obj.Title, + } + return []engine.ResUID{x} +} + +// UnmarshalYAML is the custom unmarshal handler for this struct. It is +// primarily useful for setting the defaults. +func (obj *PippetRes) UnmarshalYAML(unmarshal func(interface{}) error) error { + type rawRes PippetRes // indirection to avoid infinite recursion + + def := obj.Default() // get the default + res, ok := def.(*PippetRes) // put in the right format + if !ok { + return fmt.Errorf("could not convert to PippetRes") + } + raw := rawRes(*res) // convert; the defaults go here + + if err := unmarshal(&raw); err != nil { + return err + } + + *obj = PippetRes(raw) // restore from indirection with type conversion! + return nil +} + +// PippetRunner is the interface used to communicate with the PippetReceiver +// object. Its main purpose is dependency injection. +type PippetRunner interface { + LockApply() + UnlockApply() + InputStream() io.WriteCloser + OutputStream() io.ReadCloser +} + +// PippetResult is the structured return value type for the PippetReceiver's +// Apply function. +type PippetResult struct { + Error bool + Failed bool + Changed bool + Exception string +} + +// GetPippetReceiverInstance returns a pointer to the PippetReceiver object. +// The PippetReceiver is supposed to be a singleton object. The pippet resource +// code should always use the PippetReceiverInstance function to gain access to +// the pippetReceiver object. Other objects of type pippetReceiver should not +// be created. +func getPippetReceiverInstance() *pippetReceiver { + for pippetReceiverInstance == nil { + pippetReceiverOnce.Do(func() { pippetReceiverInstance = &pippetReceiver{} }) + } + return pippetReceiverInstance +} + +type pippetReceiver struct { + stdin io.WriteCloser + stdout io.ReadCloser + registerMutex sync.Mutex + applyMutex sync.Mutex + registered int +} + +// Init runs the Puppet process that will perform the work of synchronizing +// resources that are sent to its stdin. The process will keep running until +// Close is called. Init should not be called directly. It is implicitly called +// by the Register function. +func (obj *pippetReceiver) Init() error { + cmd := exec.Command("puppet", "yamlresource", "receive", "--color=no") + var err error + obj.stdin, err = cmd.StdinPipe() + if err != nil { + return err + } + obj.stdout, err = cmd.StdoutPipe() + if err != nil { + return errwrap.Append(err, obj.stdin.Close()) + } + if err = cmd.Start(); err != nil { + return errwrap.Append(err, obj.stdin.Close()) + } + buf := make([]byte, 80) + if _, err = obj.stdout.Read(buf); err != nil { + return errwrap.Append(err, obj.stdin.Close()) + } + return nil +} + +// Register should be called by any user (i.e., any pippet resource) before +// using the PippetRunner functions on this receiver object. Register +// implicitly takes care of calling Init if required. +func (obj *pippetReceiver) Register() error { + obj.registerMutex.Lock() + defer obj.registerMutex.Unlock() + obj.registered = obj.registered + 1 + if obj.registered > 1 { + return nil + } + // count was increased from 0 to 1, we need to (re-)init + var err error + if err = obj.Init(); err != nil { + obj.registered = 0 + } + return err +} + +// Unregister should be called by any object that registered itself using the +// Register function, and which no longer needs the receiver. This should +// typically happen at closing time of the pippet resource that registered +// itself. Unregister implicitly calls Close in case all registered resources +// have unregistered. +func (obj *pippetReceiver) Unregister() error { + obj.registerMutex.Lock() + defer obj.registerMutex.Unlock() + obj.registered = obj.registered - 1 + if obj.registered == 0 { + return obj.Close() + } + if obj.registered < 0 { + return fmt.Errorf("pippet runner: ERROR: unregistered more resources than were registered") + } + return nil +} + +// LockApply locks the pippetReceiver's mutex for an "Apply"transaction. +func (obj *pippetReceiver) LockApply() { + obj.applyMutex.Lock() +} + +// UnlockApply unlocks the pippetReceiver's mutex for an "Apply"transaction. +func (obj *pippetReceiver) UnlockApply() { + obj.applyMutex.Unlock() +} + +// InputStream returns the pippetReceiver's pipe writer. +func (obj *pippetReceiver) InputStream() io.WriteCloser { + return obj.stdin +} + +// OutputStream returns the pippetReceiver's pipe reader. +func (obj *pippetReceiver) OutputStream() io.ReadCloser { + return obj.stdout +} + +// Close stops the backend puppet process by closing its stdin handle. It should +// not be called directly. It is implicitly called by the Unregister function if +// appropriate. +func (obj *pippetReceiver) Close() error { + return obj.stdin.Close() +} + +// applyPippetRes does the actual work of making Puppet synchronize a resource. +func applyPippetRes(runner PippetRunner, resource *PippetRes) (bool, error) { + runner.LockApply() + defer runner.UnlockApply() + if err := json.NewEncoder(runner.InputStream()).Encode(resource); err != nil { + return false, errwrap.Wrapf(err, "failed to send resource to puppet") + } + + result := PippetResult{ + Error: true, + Exception: "missing output fields", + } + if err := json.NewDecoder(runner.OutputStream()).Decode(&result); err != nil { + return false, errwrap.Wrapf(err, "failed to read response from puppet") + } + + if result.Error { + return false, fmt.Errorf("puppet did not sync: %s", result.Exception) + } + if result.Failed { + return false, fmt.Errorf("puppet failed to sync") + } + return result.Changed, nil +} diff --git a/engine/resources/pippet_test.go b/engine/resources/pippet_test.go new file mode 100644 index 00000000..0f060b7f --- /dev/null +++ b/engine/resources/pippet_test.go @@ -0,0 +1,136 @@ +// Mgmt +// Copyright (C) 2013-2019+ James Shubin and the project contributors +// Written by James Shubin <james@shubin.ca> and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +// +build !root + +package resources + +import ( + "io" + "testing" +) + +type nullWriteCloser struct { +} + +type fakePippetReceiver struct { + stdin nullWriteCloser + stdout *io.PipeReader + Locked bool +} + +func (obj nullWriteCloser) Write(data []byte) (int, error) { + return len(data), nil +} + +func (obj nullWriteCloser) Close() error { + return nil +} + +func (obj *fakePippetReceiver) LockApply() { + obj.Locked = true +} + +func (obj *fakePippetReceiver) UnlockApply() { + obj.Locked = false +} + +func (obj *fakePippetReceiver) InputStream() io.WriteCloser { + return obj.stdin +} + +func (obj *fakePippetReceiver) OutputStream() io.ReadCloser { + return obj.stdout +} + +func newFakePippetReceiver(jsonTestOutput string) *fakePippetReceiver { + output, input := io.Pipe() + + result := &fakePippetReceiver{ + stdout: output, + } + + go func() { + // this will appear on the fake stdout + input.Write([]byte(jsonTestOutput)) + }() + + return result +} + +var pippetTestRes = &PippetRes{ + Type: "notify", + Title: "testmessage", + Params: `{msg: "This is a test"}`, +} + +func TestNormalPuppetOutput(t *testing.T) { + r := newFakePippetReceiver(`{"resource":"Notify[test]","failed":false,"changed":true,"noop":false,"error":false,"exception":null}`) + changed, err := applyPippetRes(r, pippetTestRes) + + if err != nil { + t.Errorf("normal Puppet output led to an apply error: %v", err) + } + + if !changed { + t.Errorf("return values of applyPippetRes did not reflect the changed state") + } +} + +func TestUnchangedPuppetOutput(t *testing.T) { + r := newFakePippetReceiver(`{"resource":"Notify[test]","failed":false,"changed":false,"noop":false,"error":false,"exception":null}`) + changed, err := applyPippetRes(r, pippetTestRes) + + if err != nil { + t.Errorf("normal Puppet output led to an apply error: %v", err) + } + + if changed { + t.Errorf("return values of applyPippetRes did not reflect the changed state") + } +} + +func TestFailingPuppetOutput(t *testing.T) { + r := newFakePippetReceiver(`{"resource":"Notify[test]","failed":false,"changed":false,"noop":false,"error":true,"exception":"I failed!"}`) + _, err := applyPippetRes(r, pippetTestRes) + + if err == nil { + t.Errorf("failing Puppet output led to an apply error: %v", err) + } +} + +func TestEmptyPuppetOutput(t *testing.T) { + t.Skip("empty output will currently make the application (and the test) hang") +} + +func TestPartialPuppetOutput(t *testing.T) { + r := newFakePippetReceiver(`{"resource":"Notify[test]","failed":false,"changed":true}`) + _, err := applyPippetRes(r, pippetTestRes) + + if err == nil { + t.Errorf("partial Puppet output did not lead to an apply error") + } +} + +func TestMalformedPuppetOutput(t *testing.T) { + r := newFakePippetReceiver(`oops something went wrong!!1!eleven`) + _, err := applyPippetRes(r, pippetTestRes) + + if err == nil { + t.Errorf("malformed Puppet output did not lead to an apply error") + } +} diff --git a/examples/yaml/pippet1.yaml b/examples/yaml/pippet1.yaml new file mode 100644 index 00000000..c9c99b8e --- /dev/null +++ b/examples/yaml/pippet1.yaml @@ -0,0 +1,8 @@ +--- +graph: mygraph +resources: + pippet: + - name: File[mylink] + type: file + title: mylink + params: "{ path: /tmp/pippet1_link, ensure: link, target: /tmp/pippet1_target }" diff --git a/examples/yaml/pippet2.yaml b/examples/yaml/pippet2.yaml new file mode 100644 index 00000000..e947d29e --- /dev/null +++ b/examples/yaml/pippet2.yaml @@ -0,0 +1,88 @@ +--- +graph: mygraph +resources: + pippet: + - name: File[mylink1] + type: file + title: mylink1 + params: "{ path: /tmp/pippet1_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink2] + type: file + title: mylink2 + params: "{ path: /tmp/pippet2_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink3] + type: file + title: mylink + params: "{ path: /tmp/pippet3_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink4] + type: file + title: mylink + params: "{ path: /tmp/pippet4_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink5] + type: file + title: mylink + params: "{ path: /tmp/pippet5_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink6] + type: file + title: mylink + params: "{ path: /tmp/pippet6_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink7] + type: file + title: mylink + params: "{ path: /tmp/pippet7_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink8] + type: file + title: mylink + params: "{ path: /tmp/pippet8_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink9] + type: file + title: mylink + params: "{ path: /tmp/pippet9_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink10] + type: file + title: mylink + params: "{ path: /tmp/pippet10_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink11] + type: file + title: mylink + params: "{ path: /tmp/pippet11_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink12] + type: file + title: mylink + params: "{ path: /tmp/pippet12_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink13] + type: file + title: mylink + params: "{ path: /tmp/pippet13_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink14] + type: file + title: mylink + params: "{ path: /tmp/pippet14_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink15] + type: file + title: mylink + params: "{ path: /tmp/pippet15_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink16] + type: file + title: mylink + params: "{ path: /tmp/pippet16_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink17] + type: file + title: mylink + params: "{ path: /tmp/pippet17_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink18] + type: file + title: mylink + params: "{ path: /tmp/pippet18_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink19] + type: file + title: mylink + params: "{ path: /tmp/pippet19_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink20] + type: file + title: mylink + params: "{ path: /tmp/pippet20_link, ensure: link, target: /tmp/pippet1_target }" + - name: File[mylink21] + type: file + title: mylink + params: "{ path: /tmp/pippet21_link, ensure: link, target: /tmp/pippet1_target }"