engine: resources: The kv resource can set un-mapped values
Previously the resource could only set values in a per-hostname namespace, but for single, user-managed values, we'd like to be able to control things entirely. Now this resource can do that.
This commit is contained in:
@@ -72,6 +72,18 @@ type KVRes struct {
|
|||||||
// undefined, then this will delete that key.
|
// undefined, then this will delete that key.
|
||||||
Value *string `lang:"value" yaml:"value"`
|
Value *string `lang:"value" yaml:"value"`
|
||||||
|
|
||||||
|
// Mapped specifies that we will store the value in a map with each
|
||||||
|
// hostname as part of the key. This is very useful for exchanging
|
||||||
|
// values when running this resource on multiple nodes simultaneously.
|
||||||
|
// To read/write/watch a single, global key, this value should be false.
|
||||||
|
// Note that resources may fight if more than one uses this. The `world`
|
||||||
|
// functions like `exchange`, require this to be true, since they're
|
||||||
|
// pulling values out of a pool that each node sets. The `world`
|
||||||
|
// functions like `getval`, require this to be false, since they're
|
||||||
|
// pulling values directly out of the same namespace that is shared by
|
||||||
|
// all nodes.
|
||||||
|
Mapped bool
|
||||||
|
|
||||||
// SkipLessThan causes the value to be updated as long as it is greater.
|
// SkipLessThan causes the value to be updated as long as it is greater.
|
||||||
SkipLessThan bool `lang:"skiplessthan" yaml:"skiplessthan"`
|
SkipLessThan bool `lang:"skiplessthan" yaml:"skiplessthan"`
|
||||||
|
|
||||||
@@ -93,6 +105,48 @@ func (obj *KVRes) getKey() string {
|
|||||||
return obj.Name()
|
return obj.Name()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (obj *KVRes) kvWatch(ctx context.Context, key string) (chan error, error) {
|
||||||
|
if obj.Mapped {
|
||||||
|
return obj.init.World.StrMapWatch(ctx, key)
|
||||||
|
}
|
||||||
|
return obj.init.World.StrWatch(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (obj *KVRes) kvGet(ctx context.Context, key string) (string, bool, error) {
|
||||||
|
if obj.Mapped {
|
||||||
|
hostname := obj.init.Hostname // me
|
||||||
|
keyMap, err := obj.init.World.StrMapGet(ctx, obj.getKey())
|
||||||
|
if err != nil {
|
||||||
|
return "", false, err
|
||||||
|
}
|
||||||
|
val, exists := keyMap[hostname]
|
||||||
|
return val, exists, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
val, err := obj.init.World.StrGet(ctx, key)
|
||||||
|
if err != nil && obj.init.World.StrIsNotExist(err) {
|
||||||
|
return "", false, nil // val doesn't exist
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", false, err
|
||||||
|
}
|
||||||
|
return val, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (obj *KVRes) kvSet(ctx context.Context, key, val string) error {
|
||||||
|
if obj.Mapped {
|
||||||
|
return obj.init.World.StrMapSet(ctx, key, val)
|
||||||
|
}
|
||||||
|
return obj.init.World.StrSet(ctx, key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (obj *KVRes) kvDel(ctx context.Context, key string) error {
|
||||||
|
if obj.Mapped {
|
||||||
|
return obj.init.World.StrMapDel(ctx, key)
|
||||||
|
}
|
||||||
|
return obj.init.World.StrDel(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
// Default returns some sensible defaults for this resource.
|
// Default returns some sensible defaults for this resource.
|
||||||
func (obj *KVRes) Default() engine.Res {
|
func (obj *KVRes) Default() engine.Res {
|
||||||
return &KVRes{}
|
return &KVRes{}
|
||||||
@@ -136,7 +190,7 @@ func (obj *KVRes) Watch(ctx context.Context) error {
|
|||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
ch, err := obj.init.World.StrMapWatch(ctx, obj.getKey()) // get possible events!
|
ch, err := obj.kvWatch(ctx, obj.getKey()) // get possible events!
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errwrap.Wrapf(err, "error during watch")
|
return errwrap.Wrapf(err, "error during watch")
|
||||||
}
|
}
|
||||||
@@ -233,13 +287,11 @@ func (obj *KVRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
|
|||||||
obj.init.Logf("CheckApply: `Value` was updated!")
|
obj.init.Logf("CheckApply: `Value` was updated!")
|
||||||
}
|
}
|
||||||
|
|
||||||
hostname := obj.init.Hostname // me
|
value, exists, err := obj.kvGet(ctx, obj.getKey())
|
||||||
keyMap, err := obj.init.World.StrMapGet(ctx, obj.getKey())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errwrap.Wrapf(err, "error during get")
|
return false, errwrap.Wrapf(err, "error during get")
|
||||||
}
|
}
|
||||||
|
if exists && obj.Value != nil {
|
||||||
if value, exists := keyMap[hostname]; exists && obj.Value != nil {
|
|
||||||
if value == *obj.Value {
|
if value == *obj.Value {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
@@ -265,7 +317,7 @@ func (obj *KVRes) CheckApply(ctx context.Context, apply bool) (bool, error) {
|
|||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := obj.init.World.StrMapSet(ctx, obj.getKey(), *obj.Value); err != nil {
|
if err := obj.kvSet(ctx, obj.getKey(), *obj.Value); err != nil {
|
||||||
return false, errwrap.Wrapf(err, "error during set")
|
return false, errwrap.Wrapf(err, "error during set")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -291,6 +343,9 @@ func (obj *KVRes) Cmp(r engine.Res) error {
|
|||||||
return fmt.Errorf("the contents of Value differs")
|
return fmt.Errorf("the contents of Value differs")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if obj.Mapped != res.Mapped {
|
||||||
|
return fmt.Errorf("the Mapped param differs")
|
||||||
|
}
|
||||||
if obj.SkipLessThan != res.SkipLessThan {
|
if obj.SkipLessThan != res.SkipLessThan {
|
||||||
return fmt.Errorf("the SkipLessThan param differs")
|
return fmt.Errorf("the SkipLessThan param differs")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -128,6 +128,8 @@ func (obj *World) StrGet(ctx context.Context, namespace string) (string, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StrSet sets the namespace value to a particular string.
|
// StrSet sets the namespace value to a particular string.
|
||||||
|
// XXX: This can overwrite another hosts value that was set with StrMapSet. Add
|
||||||
|
// possible cryptographic signing or special namespacing to prevent such things.
|
||||||
func (obj *World) StrSet(ctx context.Context, namespace, value string) error {
|
func (obj *World) StrSet(ctx context.Context, namespace, value string) error {
|
||||||
return str.SetStr(ctx, obj.Client, namespace, &value)
|
return str.SetStr(ctx, obj.Client, namespace, &value)
|
||||||
}
|
}
|
||||||
|
|||||||
19
examples/lang/getval0.mcl
Normal file
19
examples/lang/getval0.mcl
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
# etcdctl --endpoints=localhost:2379 get --prefix / # to see all the values
|
||||||
|
# etcdctl --endpoints=localhost:2379 put /somekey somevalue # to put a value
|
||||||
|
import "world"
|
||||||
|
|
||||||
|
$key = "somekey"
|
||||||
|
$st = world.getval($key)
|
||||||
|
|
||||||
|
$val = $st->value
|
||||||
|
$exists = $st->exists
|
||||||
|
|
||||||
|
# stores a value in: /_mgmt/strings/${key}
|
||||||
|
kv "kv" {
|
||||||
|
key => $key,
|
||||||
|
value => "three",
|
||||||
|
}
|
||||||
|
file "/tmp/val" {
|
||||||
|
state => $const.res.file.state.exists,
|
||||||
|
content => "val: ${val}\n",
|
||||||
|
}
|
||||||
@@ -13,6 +13,7 @@ exec "exec0" {
|
|||||||
kv "kv0" {
|
kv "kv0" {
|
||||||
key => $ns,
|
key => $ns,
|
||||||
#value => "two",
|
#value => "two",
|
||||||
|
mapped = true,
|
||||||
}
|
}
|
||||||
|
|
||||||
Exec["exec0"].output -> Kv["kv0"].value
|
Exec["exec0"].output -> Kv["kv0"].value
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ if $state == "one" or $state == "default" {
|
|||||||
kv "${ns}" {
|
kv "${ns}" {
|
||||||
key => $ns,
|
key => $ns,
|
||||||
value => "two",
|
value => "two",
|
||||||
|
mapped = true,
|
||||||
}
|
}
|
||||||
Exec["timer"] -> Kv["${ns}"]
|
Exec["timer"] -> Kv["${ns}"]
|
||||||
}
|
}
|
||||||
@@ -33,6 +34,7 @@ if $state == "two" {
|
|||||||
kv "${ns}" {
|
kv "${ns}" {
|
||||||
key => $ns,
|
key => $ns,
|
||||||
value => "three",
|
value => "three",
|
||||||
|
mapped = true,
|
||||||
}
|
}
|
||||||
Exec["timer"] -> Kv["${ns}"]
|
Exec["timer"] -> Kv["${ns}"]
|
||||||
}
|
}
|
||||||
@@ -49,6 +51,7 @@ if $state == "three" {
|
|||||||
kv "${ns}" {
|
kv "${ns}" {
|
||||||
key => $ns,
|
key => $ns,
|
||||||
value => "one",
|
value => "one",
|
||||||
|
mapped = true,
|
||||||
}
|
}
|
||||||
Exec["timer"] -> Kv["${ns}"]
|
Exec["timer"] -> Kv["${ns}"]
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user