From 3e16d1da4647743337725662bdd632a46501336d Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Thu, 31 Oct 2019 23:03:55 +0100 Subject: [PATCH] engine: resources: Add new consul resource This commit adds a new consul:kv resource which allows us to set and watch keys inside a consul kv datastore. This was started by roidelapluie, and was finished during pair programming with purpleidea. Signed-off-by: Julien Pivotto Signed-off-by: James Shubin --- docs/resources.md | 1 + engine/resources/consul_kv.go | 283 +++++++++++++++++++++++++++++ engine/resources/consul_kv_test.go | 71 ++++++++ examples/lang/consul1.mcl | 4 + examples/lang/consul2.mcl | 7 + 5 files changed, 366 insertions(+) create mode 100644 engine/resources/consul_kv.go create mode 100644 engine/resources/consul_kv_test.go create mode 100644 examples/lang/consul1.mcl create mode 100644 examples/lang/consul2.mcl diff --git a/docs/resources.md b/docs/resources.md index 0292d4a4..010c8ca2 100644 --- a/docs/resources.md +++ b/docs/resources.md @@ -17,6 +17,7 @@ You might want to look at the [generated documentation](https://godoc.org/github for more up-to-date information about these resources. * [Augeas](#Augeas): Manipulate files using augeas. +* [Consul:KV](#ConsulKV): Set keys in a Consul datastore. * [Docker](#Docker):[Container](#Container) Manage docker containers. * [Exec](#Exec): Execute shell commands on the system. * [File](#File): Manage files and directories. diff --git a/engine/resources/consul_kv.go b/engine/resources/consul_kv.go new file mode 100644 index 00000000..aecb0bf4 --- /dev/null +++ b/engine/resources/consul_kv.go @@ -0,0 +1,283 @@ +// Mgmt +// Copyright (C) 2013-2019+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package resources + +import ( + "context" + "fmt" + "net/url" + "sync" + + "github.com/purpleidea/mgmt/engine" + "github.com/purpleidea/mgmt/engine/traits" + "github.com/purpleidea/mgmt/util" + "github.com/purpleidea/mgmt/util/errwrap" + + "github.com/hashicorp/consul/api" +) + +func init() { + engine.RegisterResource("consul:kv", func() engine.Res { return &ConsulKVRes{} }) +} + +// ConsulKVRes is a resource that writes a value into a Consul datastore. The +// name of the resource can either be the key name, or the concatenation of the +// server address and the key name: http://127.0.0.1:8500/my-key. If the param +// keys are specified, then those are used. If the Name cannot be properly +// parsed by url.Parse, then it will be considered as the Key's value. If the +// Key is specified explicitly, then we won't use anything from the Name. +type ConsulKVRes struct { + traits.Base + init *engine.Init + + // Key is the name of the key. Defaults to the name of the resource. + Key string `lang:"key" yaml:"key"` + + // Value is the value for the key. + Value string `lang:"value" yaml:"value"` + + // Scheme is the URI scheme for the Consul server. Default: http. + Scheme string `lang:"scheme" yaml:"scheme"` + + // Address is the address of the Consul server. Default: 127.0.0.1:8500. + Address string `lang:"address" yaml:"address"` + + // Token is used to provide an ACL token to use for this resource. + Token string `lang:"token" yaml:"token"` + + client *api.Client + config *api.Config // needed to close the idle connections + once bool // safety token + key string // cache the key name to avoid re-running the parser +} + +// Default returns some sensible defaults for this resource. +func (obj *ConsulKVRes) Default() engine.Res { + return &ConsulKVRes{} +} + +// Validate if the params passed in are valid data. +func (obj *ConsulKVRes) Validate() error { + s, _, k := obj.inputParser() + if k == "" { + return fmt.Errorf("the Key is empty") + } + if s != "" && s != "http" && s != "https" { + return fmt.Errorf("unknown Scheme") + } + return nil +} + +// Init runs some startup code for this resource. +func (obj *ConsulKVRes) Init(init *engine.Init) error { + obj.init = init // save for later + + s, a, k := obj.inputParser() + + obj.config = api.DefaultConfig() + if s != "" { + obj.config.Scheme = s + } + if a != "" { + obj.config.Address = obj.Address + } + obj.key = k // store the key + obj.init.Logf("using consul key: %s", obj.key) + + if obj.Token != "" { + obj.config.Token = obj.Token + } + + var err error + obj.client, err = api.NewClient(obj.config) + return errwrap.Wrapf(err, "could not create Consul client") +} + +// Close is run by the engine to clean up after the resource is done. +func (obj *ConsulKVRes) Close() error { + if obj.config != nil && obj.config.Transport != nil { + obj.config.Transport.CloseIdleConnections() + } + return nil +} + +// Watch is the listener and main loop for this resource and it outputs events. +func (obj *ConsulKVRes) Watch() error { + wg := &sync.WaitGroup{} + defer wg.Wait() + + ch := make(chan error) + exit := make(chan struct{}) + + kv := obj.client.KV() + + wg.Add(1) + go func() { + defer close(ch) + defer wg.Done() + + opts := &api.QueryOptions{RequireConsistent: true} + ctx, cancel := util.ContextWithCloser(context.Background(), exit) + defer cancel() + opts = opts.WithContext(ctx) + + for { + _, meta, err := kv.Get(obj.key, opts) + select { + case ch <- err: // send + if err != nil { + return + } + + // WaitIndex = 0, which means that it is the + // first time we run the query, as we are about + // to change the WaitIndex to make a blocking + // query, we can consider the watch started. + opts.WaitIndex = meta.LastIndex + if opts.WaitIndex != 0 { + continue + } + + if !obj.once { + obj.init.Running() + obj.once = true + continue + } + + // Unexpected situation, bug in consul API... + select { + case ch <- fmt.Errorf("unexpected behaviour in Consul API"): + case <-obj.init.Done: // signal for shutdown request + } + + case <-obj.init.Done: // signal for shutdown request + } + return + } + }() + + defer close(exit) + for { + select { + case err, ok := <-ch: + if !ok { // channel shutdown + return nil + } + if err != nil { + return errwrap.Wrapf(err, "unknown %s watcher error", obj) + } + if obj.init.Debug { + obj.init.Logf("event!") + } + obj.init.Event() + + case <-obj.init.Done: // signal for shutdown request + return nil + } + } +} + +// CheckApply is run to check the state and, if apply is true, to apply the +// necessary changes to reach the desired state. This is run before Watch and +// again if Watch finds a change occurring to the state. +func (obj *ConsulKVRes) CheckApply(apply bool) (bool, error) { + if obj.init.Debug { + obj.init.Logf("consul key: %s", obj.key) + } + kv := obj.client.KV() + pair, _, err := kv.Get(obj.key, nil) + if err != nil { + return false, err + } + + if pair != nil && string(pair.Value) == obj.Value { + return true, nil + } + + if !apply { + return false, nil + } + + p := &api.KVPair{Key: obj.key, Value: []byte(obj.Value)} + _, err = kv.Put(p, nil) + return false, err +} + +// Cmp compares two resources and return if they are equivalent. +func (obj *ConsulKVRes) Cmp(r engine.Res) error { + res, ok := r.(*ConsulKVRes) + if !ok { + return fmt.Errorf("not a %s", obj.Kind()) + } + + if obj.Key != res.Key { + return fmt.Errorf("the Key param differs") + } + if obj.Value != res.Value { + return fmt.Errorf("the Value param differs") + } + if obj.Scheme != res.Scheme { + return fmt.Errorf("the Scheme param differs") + } + if obj.Address != res.Address { + return fmt.Errorf("the Address param differs") + } + if obj.Token != res.Token { + return fmt.Errorf("the Token param differs") + } + + return nil +} + +// inputParser parses the Name() of a resource and extracts the scheme, address, +// and key name of a consul key. We don't have an error, because if we have one, +// then it means the input must be a raw key. Output of this function is scheme, +// address (includes hostname and port), and key. This also takes our parameters +// in to account, and applies the correct overrides if they are specified there. +func (obj *ConsulKVRes) inputParser() (string, string, string) { + // If the key is specified explicitly, then we're not going to parse the + // resource name for a pattern, and we use our given params as they are. + if obj.Key != "" { + return obj.Scheme, obj.Address, obj.Key + } + + // Now we parse... + u, err := url.Parse(obj.Name()) + if err != nil { + // If this didn't work, then we know it's explicitly a raw key. + return obj.Scheme, obj.Address, obj.Name() + } + + // Otherwise, we use the parse result, and we overwrite any of the + // fields if we have an explicit param that was specified. + k := u.Path + s := u.Scheme + a := u.Host + + //if obj.Key != "" { // this is now guaranteed to never happen + // k = obj.Key + //} + if obj.Scheme != "" { + s = obj.Scheme + } + if obj.Address != "" { + a = obj.Address + } + + return s, a, k +} diff --git a/engine/resources/consul_kv_test.go b/engine/resources/consul_kv_test.go new file mode 100644 index 00000000..f0d5e976 --- /dev/null +++ b/engine/resources/consul_kv_test.go @@ -0,0 +1,71 @@ +// Mgmt +// Copyright (C) 2013-2019+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package resources + +import ( + "fmt" + "testing" + + "github.com/purpleidea/mgmt/engine" +) + +func createConsulRes(name string) *ConsulKVRes { + r, err := engine.NewNamedResource("consul:kv", name) + if err != nil { + panic(fmt.Sprintf("could not create resource: %+v", err)) + } + + res := r.(*ConsulKVRes) // if this panics, the test will panic + return res +} + +func TestParseConsulName(t *testing.T) { + n1 := "test" + r1 := createConsulRes(n1) + if s, a, k := r1.inputParser(); s != "" || a != "" || k != "test" { + t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n1, s, a, k) + } + + n2 := "http://127.0.0.1:8500/test" + r2 := createConsulRes(n2) + if s, a, k := r2.inputParser(); s != "http" || a != "127.0.0.1:8500" || k != "/test" { + t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n2, s, a, k) + } + + n3 := "http://127.0.0.1:8500/test" + r3 := createConsulRes(n3) + r3.Scheme = "https" + r3.Address = "example.com" + if s, a, k := r3.inputParser(); s != "https" || a != "example.com" || k != "/test" { + t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n3, s, a, k) + } + + n4 := "http:://127.0.0.1..5:8500/test" // wtf, url.Parse is on drugs... + r4 := createConsulRes(n4) + //if s, a, k := r4.inputParser(); s != "" || a != "" || k != n4 { // what i really expect + if s, a, k := r4.inputParser(); s != "http" || a != "" || k != "" { // what i get + t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n4, s, a, k) + } + + n5 := "http://127.0.0.1:8500/test" // whatever, it's ignored + r5 := createConsulRes(n3) + r5.Key = "some key" + if s, a, k := r5.inputParser(); s != "" || a != "" || k != "some key" { + t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n5, s, a, k) + } +} diff --git a/examples/lang/consul1.mcl b/examples/lang/consul1.mcl new file mode 100644 index 00000000..63cf27e0 --- /dev/null +++ b/examples/lang/consul1.mcl @@ -0,0 +1,4 @@ +consul:kv "love" { + key => "mgmt/love", + value => "YES", +} diff --git a/examples/lang/consul2.mcl b/examples/lang/consul2.mcl new file mode 100644 index 00000000..50983230 --- /dev/null +++ b/examples/lang/consul2.mcl @@ -0,0 +1,7 @@ +import "datetime" + +$d = datetime.now() +consul:kv "love" { + key => "mgmt/time", + value => template("hello! it is now: {{ datetime_print . }}\n", $d), +}