Instead of constantly making these updates, let's just remove the year, since everything is stored in git anyway and this is no longer an actual legal risk.
299 lines
8.3 KiB
Go
299 lines
8.3 KiB
Go
// Mgmt
|
|
// Copyright (C) James Shubin and the project contributors
|
|
// Written by James Shubin <james@shubin.ca> and the project contributors
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
//
|
|
// Additional permission under GNU GPL version 3 section 7
|
|
//
|
|
// If you modify this program, or any covered work, by linking or combining it
|
|
// with embedded mcl code and modules (and that the embedded mcl code and
|
|
// modules which link with this program, contain a copy of their source code in
|
|
// the authoritative form) containing parts covered by the terms of any other
|
|
// license, the licensors of this program grant you additional permission to
|
|
// convey the resulting work. Furthermore, the licensors of this program grant
|
|
// the original author, James Shubin, additional permission to update this
|
|
// additional permission if he deems it necessary to achieve the goals of this
|
|
// additional permission.
|
|
|
|
//go:build !noconsul
|
|
|
|
package resources
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"sync"
|
|
|
|
"github.com/purpleidea/mgmt/engine"
|
|
"github.com/purpleidea/mgmt/engine/traits"
|
|
"github.com/purpleidea/mgmt/util"
|
|
"github.com/purpleidea/mgmt/util/errwrap"
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
)
|
|
|
|
func init() {
|
|
engine.RegisterResource("consul:kv", func() engine.Res { return &ConsulKVRes{} })
|
|
}
|
|
|
|
// ConsulKVRes is a resource that writes a value into a Consul datastore. The
|
|
// name of the resource can either be the key name, or the concatenation of the
|
|
// server address and the key name: http://127.0.0.1:8500/my-key. If the param
|
|
// keys are specified, then those are used. If the Name cannot be properly
|
|
// parsed by url.Parse, then it will be considered as the Key's value. If the
|
|
// Key is specified explicitly, then we won't use anything from the Name.
|
|
type ConsulKVRes struct {
|
|
traits.Base
|
|
init *engine.Init
|
|
|
|
// Key is the name of the key. Defaults to the name of the resource.
|
|
Key string `lang:"key" yaml:"key"`
|
|
|
|
// Value is the value for the key.
|
|
Value string `lang:"value" yaml:"value"`
|
|
|
|
// Scheme is the URI scheme for the Consul server. Default: http.
|
|
Scheme string `lang:"scheme" yaml:"scheme"`
|
|
|
|
// Address is the address of the Consul server. Default: 127.0.0.1:8500.
|
|
Address string `lang:"address" yaml:"address"`
|
|
|
|
// Token is used to provide an ACL token to use for this resource.
|
|
Token string `lang:"token" yaml:"token"`
|
|
|
|
client *api.Client
|
|
config *api.Config // needed to close the idle connections
|
|
once bool // safety token
|
|
key string // cache the key name to avoid re-running the parser
|
|
}
|
|
|
|
// Default returns some sensible defaults for this resource.
|
|
func (obj *ConsulKVRes) Default() engine.Res {
|
|
return &ConsulKVRes{}
|
|
}
|
|
|
|
// Validate if the params passed in are valid data.
|
|
func (obj *ConsulKVRes) Validate() error {
|
|
s, _, k := obj.inputParser()
|
|
if k == "" {
|
|
return fmt.Errorf("the Key is empty")
|
|
}
|
|
if s != "" && s != "http" && s != "https" {
|
|
return fmt.Errorf("unknown Scheme")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Init runs some startup code for this resource.
|
|
func (obj *ConsulKVRes) Init(init *engine.Init) error {
|
|
obj.init = init // save for later
|
|
|
|
s, a, k := obj.inputParser()
|
|
|
|
obj.config = api.DefaultConfig()
|
|
if s != "" {
|
|
obj.config.Scheme = s
|
|
}
|
|
if a != "" {
|
|
obj.config.Address = obj.Address
|
|
}
|
|
obj.key = k // store the key
|
|
obj.init.Logf("using consul key: %s", obj.key)
|
|
|
|
if obj.Token != "" {
|
|
obj.config.Token = obj.Token
|
|
}
|
|
|
|
var err error
|
|
obj.client, err = api.NewClient(obj.config)
|
|
return errwrap.Wrapf(err, "could not create Consul client")
|
|
}
|
|
|
|
// Cleanup is run by the engine to clean up after the resource is done.
|
|
func (obj *ConsulKVRes) Cleanup() error {
|
|
if obj.config != nil && obj.config.Transport != nil {
|
|
obj.config.Transport.CloseIdleConnections()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Watch is the listener and main loop for this resource and it outputs events.
|
|
func (obj *ConsulKVRes) Watch(ctx context.Context) error {
|
|
wg := &sync.WaitGroup{}
|
|
defer wg.Wait()
|
|
|
|
ch := make(chan error)
|
|
exit := make(chan struct{})
|
|
|
|
kv := obj.client.KV()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer close(ch)
|
|
defer wg.Done()
|
|
|
|
opts := &api.QueryOptions{RequireConsistent: true}
|
|
innerCtx, cancel := util.ContextWithCloser(context.Background(), exit)
|
|
defer cancel()
|
|
opts = opts.WithContext(innerCtx)
|
|
|
|
for {
|
|
_, meta, err := kv.Get(obj.key, opts)
|
|
select {
|
|
case ch <- err: // send
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// WaitIndex = 0, which means that it is the
|
|
// first time we run the query, as we are about
|
|
// to change the WaitIndex to make a blocking
|
|
// query, we can consider the watch started.
|
|
opts.WaitIndex = meta.LastIndex
|
|
if opts.WaitIndex != 0 {
|
|
continue
|
|
}
|
|
|
|
if !obj.once {
|
|
obj.init.Running()
|
|
obj.once = true
|
|
continue
|
|
}
|
|
|
|
// Unexpected situation, bug in consul API...
|
|
select {
|
|
case ch <- fmt.Errorf("unexpected behaviour in Consul API"):
|
|
case <-ctx.Done(): // signal for shutdown request
|
|
}
|
|
|
|
case <-ctx.Done(): // signal for shutdown request
|
|
}
|
|
return
|
|
}
|
|
}()
|
|
|
|
defer close(exit)
|
|
for {
|
|
select {
|
|
case err, ok := <-ch:
|
|
if !ok { // channel shutdown
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return errwrap.Wrapf(err, "unknown %s watcher error", obj)
|
|
}
|
|
if obj.init.Debug {
|
|
obj.init.Logf("event!")
|
|
}
|
|
obj.init.Event()
|
|
|
|
case <-ctx.Done(): // signal for shutdown request
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// CheckApply is run to check the state and, if apply is true, to apply the
|
|
// necessary changes to reach the desired state. This is run before Watch and
|
|
// again if Watch finds a change occurring to the state.
|
|
func (obj *ConsulKVRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
|
|
// XXX: use ctx for get and put
|
|
if obj.init.Debug {
|
|
obj.init.Logf("consul key: %s", obj.key)
|
|
}
|
|
kv := obj.client.KV()
|
|
pair, _, err := kv.Get(obj.key, nil)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if pair != nil && string(pair.Value) == obj.Value {
|
|
return true, nil
|
|
}
|
|
|
|
if !apply {
|
|
return false, nil
|
|
}
|
|
|
|
p := &api.KVPair{Key: obj.key, Value: []byte(obj.Value)}
|
|
_, err = kv.Put(p, nil)
|
|
return false, err
|
|
}
|
|
|
|
// Cmp compares two resources and return if they are equivalent.
|
|
func (obj *ConsulKVRes) Cmp(r engine.Res) error {
|
|
res, ok := r.(*ConsulKVRes)
|
|
if !ok {
|
|
return fmt.Errorf("not a %s", obj.Kind())
|
|
}
|
|
|
|
if obj.Key != res.Key {
|
|
return fmt.Errorf("the Key param differs")
|
|
}
|
|
if obj.Value != res.Value {
|
|
return fmt.Errorf("the Value param differs")
|
|
}
|
|
if obj.Scheme != res.Scheme {
|
|
return fmt.Errorf("the Scheme param differs")
|
|
}
|
|
if obj.Address != res.Address {
|
|
return fmt.Errorf("the Address param differs")
|
|
}
|
|
if obj.Token != res.Token {
|
|
return fmt.Errorf("the Token param differs")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// inputParser parses the Name() of a resource and extracts the scheme, address,
|
|
// and key name of a consul key. We don't have an error, because if we have one,
|
|
// then it means the input must be a raw key. Output of this function is scheme,
|
|
// address (includes hostname and port), and key. This also takes our parameters
|
|
// in to account, and applies the correct overrides if they are specified there.
|
|
func (obj *ConsulKVRes) inputParser() (string, string, string) {
|
|
// If the key is specified explicitly, then we're not going to parse the
|
|
// resource name for a pattern, and we use our given params as they are.
|
|
if obj.Key != "" {
|
|
return obj.Scheme, obj.Address, obj.Key
|
|
}
|
|
|
|
// Now we parse...
|
|
u, err := url.Parse(obj.Name())
|
|
if err != nil {
|
|
// If this didn't work, then we know it's explicitly a raw key.
|
|
return obj.Scheme, obj.Address, obj.Name()
|
|
}
|
|
|
|
// Otherwise, we use the parse result, and we overwrite any of the
|
|
// fields if we have an explicit param that was specified.
|
|
k := u.Path
|
|
s := u.Scheme
|
|
a := u.Host
|
|
|
|
//if obj.Key != "" { // this is now guaranteed to never happen
|
|
// k = obj.Key
|
|
//}
|
|
if obj.Scheme != "" {
|
|
s = obj.Scheme
|
|
}
|
|
if obj.Address != "" {
|
|
a = obj.Address
|
|
}
|
|
|
|
return s, a, k
|
|
}
|