engine: resources: Add new consul resource
This commit adds a new consul:kv resource which allows us to set and watch keys inside a consul kv datastore. This was started by roidelapluie, and was finished during pair programming with purpleidea. Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu> Signed-off-by: James Shubin <james@shubin.ca>
This commit is contained in:
committed by
James Shubin
parent
ae1860e859
commit
3e16d1da46
@@ -17,6 +17,7 @@ You might want to look at the [generated documentation](https://godoc.org/github
|
||||
for more up-to-date information about these resources.
|
||||
|
||||
* [Augeas](#Augeas): Manipulate files using augeas.
|
||||
* [Consul:KV](#ConsulKV): Set keys in a Consul datastore.
|
||||
* [Docker](#Docker):[Container](#Container) Manage docker containers.
|
||||
* [Exec](#Exec): Execute shell commands on the system.
|
||||
* [File](#File): Manage files and directories.
|
||||
|
||||
283
engine/resources/consul_kv.go
Normal file
283
engine/resources/consul_kv.go
Normal file
@@ -0,0 +1,283 @@
|
||||
// Mgmt
|
||||
// Copyright (C) 2013-2019+ James Shubin and the project contributors
|
||||
// Written by James Shubin <james@shubin.ca> and the project contributors
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package resources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sync"
|
||||
|
||||
"github.com/purpleidea/mgmt/engine"
|
||||
"github.com/purpleidea/mgmt/engine/traits"
|
||||
"github.com/purpleidea/mgmt/util"
|
||||
"github.com/purpleidea/mgmt/util/errwrap"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
func init() {
|
||||
engine.RegisterResource("consul:kv", func() engine.Res { return &ConsulKVRes{} })
|
||||
}
|
||||
|
||||
// ConsulKVRes is a resource that writes a value into a Consul datastore. The
|
||||
// name of the resource can either be the key name, or the concatenation of the
|
||||
// server address and the key name: http://127.0.0.1:8500/my-key. If the param
|
||||
// keys are specified, then those are used. If the Name cannot be properly
|
||||
// parsed by url.Parse, then it will be considered as the Key's value. If the
|
||||
// Key is specified explicitly, then we won't use anything from the Name.
|
||||
type ConsulKVRes struct {
|
||||
traits.Base
|
||||
init *engine.Init
|
||||
|
||||
// Key is the name of the key. Defaults to the name of the resource.
|
||||
Key string `lang:"key" yaml:"key"`
|
||||
|
||||
// Value is the value for the key.
|
||||
Value string `lang:"value" yaml:"value"`
|
||||
|
||||
// Scheme is the URI scheme for the Consul server. Default: http.
|
||||
Scheme string `lang:"scheme" yaml:"scheme"`
|
||||
|
||||
// Address is the address of the Consul server. Default: 127.0.0.1:8500.
|
||||
Address string `lang:"address" yaml:"address"`
|
||||
|
||||
// Token is used to provide an ACL token to use for this resource.
|
||||
Token string `lang:"token" yaml:"token"`
|
||||
|
||||
client *api.Client
|
||||
config *api.Config // needed to close the idle connections
|
||||
once bool // safety token
|
||||
key string // cache the key name to avoid re-running the parser
|
||||
}
|
||||
|
||||
// Default returns some sensible defaults for this resource.
|
||||
func (obj *ConsulKVRes) Default() engine.Res {
|
||||
return &ConsulKVRes{}
|
||||
}
|
||||
|
||||
// Validate if the params passed in are valid data.
|
||||
func (obj *ConsulKVRes) Validate() error {
|
||||
s, _, k := obj.inputParser()
|
||||
if k == "" {
|
||||
return fmt.Errorf("the Key is empty")
|
||||
}
|
||||
if s != "" && s != "http" && s != "https" {
|
||||
return fmt.Errorf("unknown Scheme")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init runs some startup code for this resource.
|
||||
func (obj *ConsulKVRes) Init(init *engine.Init) error {
|
||||
obj.init = init // save for later
|
||||
|
||||
s, a, k := obj.inputParser()
|
||||
|
||||
obj.config = api.DefaultConfig()
|
||||
if s != "" {
|
||||
obj.config.Scheme = s
|
||||
}
|
||||
if a != "" {
|
||||
obj.config.Address = obj.Address
|
||||
}
|
||||
obj.key = k // store the key
|
||||
obj.init.Logf("using consul key: %s", obj.key)
|
||||
|
||||
if obj.Token != "" {
|
||||
obj.config.Token = obj.Token
|
||||
}
|
||||
|
||||
var err error
|
||||
obj.client, err = api.NewClient(obj.config)
|
||||
return errwrap.Wrapf(err, "could not create Consul client")
|
||||
}
|
||||
|
||||
// Close is run by the engine to clean up after the resource is done.
|
||||
func (obj *ConsulKVRes) Close() error {
|
||||
if obj.config != nil && obj.config.Transport != nil {
|
||||
obj.config.Transport.CloseIdleConnections()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Watch is the listener and main loop for this resource and it outputs events.
|
||||
func (obj *ConsulKVRes) Watch() error {
|
||||
wg := &sync.WaitGroup{}
|
||||
defer wg.Wait()
|
||||
|
||||
ch := make(chan error)
|
||||
exit := make(chan struct{})
|
||||
|
||||
kv := obj.client.KV()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
defer wg.Done()
|
||||
|
||||
opts := &api.QueryOptions{RequireConsistent: true}
|
||||
ctx, cancel := util.ContextWithCloser(context.Background(), exit)
|
||||
defer cancel()
|
||||
opts = opts.WithContext(ctx)
|
||||
|
||||
for {
|
||||
_, meta, err := kv.Get(obj.key, opts)
|
||||
select {
|
||||
case ch <- err: // send
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// WaitIndex = 0, which means that it is the
|
||||
// first time we run the query, as we are about
|
||||
// to change the WaitIndex to make a blocking
|
||||
// query, we can consider the watch started.
|
||||
opts.WaitIndex = meta.LastIndex
|
||||
if opts.WaitIndex != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if !obj.once {
|
||||
obj.init.Running()
|
||||
obj.once = true
|
||||
continue
|
||||
}
|
||||
|
||||
// Unexpected situation, bug in consul API...
|
||||
select {
|
||||
case ch <- fmt.Errorf("unexpected behaviour in Consul API"):
|
||||
case <-obj.init.Done: // signal for shutdown request
|
||||
}
|
||||
|
||||
case <-obj.init.Done: // signal for shutdown request
|
||||
}
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
defer close(exit)
|
||||
for {
|
||||
select {
|
||||
case err, ok := <-ch:
|
||||
if !ok { // channel shutdown
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return errwrap.Wrapf(err, "unknown %s watcher error", obj)
|
||||
}
|
||||
if obj.init.Debug {
|
||||
obj.init.Logf("event!")
|
||||
}
|
||||
obj.init.Event()
|
||||
|
||||
case <-obj.init.Done: // signal for shutdown request
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CheckApply is run to check the state and, if apply is true, to apply the
|
||||
// necessary changes to reach the desired state. This is run before Watch and
|
||||
// again if Watch finds a change occurring to the state.
|
||||
func (obj *ConsulKVRes) CheckApply(apply bool) (bool, error) {
|
||||
if obj.init.Debug {
|
||||
obj.init.Logf("consul key: %s", obj.key)
|
||||
}
|
||||
kv := obj.client.KV()
|
||||
pair, _, err := kv.Get(obj.key, nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if pair != nil && string(pair.Value) == obj.Value {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if !apply {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
p := &api.KVPair{Key: obj.key, Value: []byte(obj.Value)}
|
||||
_, err = kv.Put(p, nil)
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Cmp compares two resources and return if they are equivalent.
|
||||
func (obj *ConsulKVRes) Cmp(r engine.Res) error {
|
||||
res, ok := r.(*ConsulKVRes)
|
||||
if !ok {
|
||||
return fmt.Errorf("not a %s", obj.Kind())
|
||||
}
|
||||
|
||||
if obj.Key != res.Key {
|
||||
return fmt.Errorf("the Key param differs")
|
||||
}
|
||||
if obj.Value != res.Value {
|
||||
return fmt.Errorf("the Value param differs")
|
||||
}
|
||||
if obj.Scheme != res.Scheme {
|
||||
return fmt.Errorf("the Scheme param differs")
|
||||
}
|
||||
if obj.Address != res.Address {
|
||||
return fmt.Errorf("the Address param differs")
|
||||
}
|
||||
if obj.Token != res.Token {
|
||||
return fmt.Errorf("the Token param differs")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// inputParser parses the Name() of a resource and extracts the scheme, address,
|
||||
// and key name of a consul key. We don't have an error, because if we have one,
|
||||
// then it means the input must be a raw key. Output of this function is scheme,
|
||||
// address (includes hostname and port), and key. This also takes our parameters
|
||||
// in to account, and applies the correct overrides if they are specified there.
|
||||
func (obj *ConsulKVRes) inputParser() (string, string, string) {
|
||||
// If the key is specified explicitly, then we're not going to parse the
|
||||
// resource name for a pattern, and we use our given params as they are.
|
||||
if obj.Key != "" {
|
||||
return obj.Scheme, obj.Address, obj.Key
|
||||
}
|
||||
|
||||
// Now we parse...
|
||||
u, err := url.Parse(obj.Name())
|
||||
if err != nil {
|
||||
// If this didn't work, then we know it's explicitly a raw key.
|
||||
return obj.Scheme, obj.Address, obj.Name()
|
||||
}
|
||||
|
||||
// Otherwise, we use the parse result, and we overwrite any of the
|
||||
// fields if we have an explicit param that was specified.
|
||||
k := u.Path
|
||||
s := u.Scheme
|
||||
a := u.Host
|
||||
|
||||
//if obj.Key != "" { // this is now guaranteed to never happen
|
||||
// k = obj.Key
|
||||
//}
|
||||
if obj.Scheme != "" {
|
||||
s = obj.Scheme
|
||||
}
|
||||
if obj.Address != "" {
|
||||
a = obj.Address
|
||||
}
|
||||
|
||||
return s, a, k
|
||||
}
|
||||
71
engine/resources/consul_kv_test.go
Normal file
71
engine/resources/consul_kv_test.go
Normal file
@@ -0,0 +1,71 @@
|
||||
// Mgmt
|
||||
// Copyright (C) 2013-2019+ James Shubin and the project contributors
|
||||
// Written by James Shubin <james@shubin.ca> and the project contributors
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package resources
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/purpleidea/mgmt/engine"
|
||||
)
|
||||
|
||||
func createConsulRes(name string) *ConsulKVRes {
|
||||
r, err := engine.NewNamedResource("consul:kv", name)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("could not create resource: %+v", err))
|
||||
}
|
||||
|
||||
res := r.(*ConsulKVRes) // if this panics, the test will panic
|
||||
return res
|
||||
}
|
||||
|
||||
func TestParseConsulName(t *testing.T) {
|
||||
n1 := "test"
|
||||
r1 := createConsulRes(n1)
|
||||
if s, a, k := r1.inputParser(); s != "" || a != "" || k != "test" {
|
||||
t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n1, s, a, k)
|
||||
}
|
||||
|
||||
n2 := "http://127.0.0.1:8500/test"
|
||||
r2 := createConsulRes(n2)
|
||||
if s, a, k := r2.inputParser(); s != "http" || a != "127.0.0.1:8500" || k != "/test" {
|
||||
t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n2, s, a, k)
|
||||
}
|
||||
|
||||
n3 := "http://127.0.0.1:8500/test"
|
||||
r3 := createConsulRes(n3)
|
||||
r3.Scheme = "https"
|
||||
r3.Address = "example.com"
|
||||
if s, a, k := r3.inputParser(); s != "https" || a != "example.com" || k != "/test" {
|
||||
t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n3, s, a, k)
|
||||
}
|
||||
|
||||
n4 := "http:://127.0.0.1..5:8500/test" // wtf, url.Parse is on drugs...
|
||||
r4 := createConsulRes(n4)
|
||||
//if s, a, k := r4.inputParser(); s != "" || a != "" || k != n4 { // what i really expect
|
||||
if s, a, k := r4.inputParser(); s != "http" || a != "" || k != "" { // what i get
|
||||
t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n4, s, a, k)
|
||||
}
|
||||
|
||||
n5 := "http://127.0.0.1:8500/test" // whatever, it's ignored
|
||||
r5 := createConsulRes(n3)
|
||||
r5.Key = "some key"
|
||||
if s, a, k := r5.inputParser(); s != "" || a != "" || k != "some key" {
|
||||
t.Errorf("unexpected output while parsing `%s`: %s, %s, %s", n5, s, a, k)
|
||||
}
|
||||
}
|
||||
4
examples/lang/consul1.mcl
Normal file
4
examples/lang/consul1.mcl
Normal file
@@ -0,0 +1,4 @@
|
||||
consul:kv "love" {
|
||||
key => "mgmt/love",
|
||||
value => "YES",
|
||||
}
|
||||
7
examples/lang/consul2.mcl
Normal file
7
examples/lang/consul2.mcl
Normal file
@@ -0,0 +1,7 @@
|
||||
import "datetime"
|
||||
|
||||
$d = datetime.now()
|
||||
consul:kv "love" {
|
||||
key => "mgmt/time",
|
||||
value => template("hello! it is now: {{ datetime_print . }}\n", $d),
|
||||
}
|
||||
Reference in New Issue
Block a user