diff --git a/engine/resources/kv.go b/engine/resources/kv.go index 1b213b42..340c1bfe 100644 --- a/engine/resources/kv.go +++ b/engine/resources/kv.go @@ -72,6 +72,18 @@ type KVRes struct { // undefined, then this will delete that key. Value *string `lang:"value" yaml:"value"` + // Mapped specifies that we will store the value in a map with each + // hostname as part of the key. This is very useful for exchanging + // values when running this resource on multiple nodes simultaneously. + // To read/write/watch a single, global key, this value should be false. + // Note that resources may fight if more than one uses this. The `world` + // functions like `exchange`, require this to be true, since they're + // pulling values out of a pool that each node sets. The `world` + // functions like `getval`, require this to be false, since they're + // pulling values directly out of the same namespace that is shared by + // all nodes. + Mapped bool + // SkipLessThan causes the value to be updated as long as it is greater. SkipLessThan bool `lang:"skiplessthan" yaml:"skiplessthan"` @@ -93,6 +105,48 @@ func (obj *KVRes) getKey() string { return obj.Name() } +func (obj *KVRes) kvWatch(ctx context.Context, key string) (chan error, error) { + if obj.Mapped { + return obj.init.World.StrMapWatch(ctx, key) + } + return obj.init.World.StrWatch(ctx, key) +} + +func (obj *KVRes) kvGet(ctx context.Context, key string) (string, bool, error) { + if obj.Mapped { + hostname := obj.init.Hostname // me + keyMap, err := obj.init.World.StrMapGet(ctx, obj.getKey()) + if err != nil { + return "", false, err + } + val, exists := keyMap[hostname] + return val, exists, nil + } + + val, err := obj.init.World.StrGet(ctx, key) + if err != nil && obj.init.World.StrIsNotExist(err) { + return "", false, nil // val doesn't exist + } + if err != nil { + return "", false, err + } + return val, true, nil +} + +func (obj *KVRes) kvSet(ctx context.Context, key, val string) error { + if obj.Mapped { + return obj.init.World.StrMapSet(ctx, key, val) + } + return obj.init.World.StrSet(ctx, key, val) +} + +func (obj *KVRes) kvDel(ctx context.Context, key string) error { + if obj.Mapped { + return obj.init.World.StrMapDel(ctx, key) + } + return obj.init.World.StrDel(ctx, key) +} + // Default returns some sensible defaults for this resource. func (obj *KVRes) Default() engine.Res { return &KVRes{} @@ -136,7 +190,7 @@ func (obj *KVRes) Watch(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - ch, err := obj.init.World.StrMapWatch(ctx, obj.getKey()) // get possible events! + ch, err := obj.kvWatch(ctx, obj.getKey()) // get possible events! if err != nil { return errwrap.Wrapf(err, "error during watch") } @@ -233,13 +287,11 @@ func (obj *KVRes) CheckApply(ctx context.Context, apply bool) (bool, error) { obj.init.Logf("CheckApply: `Value` was updated!") } - hostname := obj.init.Hostname // me - keyMap, err := obj.init.World.StrMapGet(ctx, obj.getKey()) + value, exists, err := obj.kvGet(ctx, obj.getKey()) if err != nil { return false, errwrap.Wrapf(err, "error during get") } - - if value, exists := keyMap[hostname]; exists && obj.Value != nil { + if exists && obj.Value != nil { if value == *obj.Value { return true, nil } @@ -265,7 +317,7 @@ func (obj *KVRes) CheckApply(ctx context.Context, apply bool) (bool, error) { return false, nil } - if err := obj.init.World.StrMapSet(ctx, obj.getKey(), *obj.Value); err != nil { + if err := obj.kvSet(ctx, obj.getKey(), *obj.Value); err != nil { return false, errwrap.Wrapf(err, "error during set") } @@ -291,6 +343,9 @@ func (obj *KVRes) Cmp(r engine.Res) error { return fmt.Errorf("the contents of Value differs") } } + if obj.Mapped != res.Mapped { + return fmt.Errorf("the Mapped param differs") + } if obj.SkipLessThan != res.SkipLessThan { return fmt.Errorf("the SkipLessThan param differs") } diff --git a/etcd/world.go b/etcd/world.go index 6a9d71a4..aa467174 100644 --- a/etcd/world.go +++ b/etcd/world.go @@ -128,6 +128,8 @@ func (obj *World) StrGet(ctx context.Context, namespace string) (string, error) } // StrSet sets the namespace value to a particular string. +// XXX: This can overwrite another hosts value that was set with StrMapSet. Add +// possible cryptographic signing or special namespacing to prevent such things. func (obj *World) StrSet(ctx context.Context, namespace, value string) error { return str.SetStr(ctx, obj.Client, namespace, &value) } diff --git a/examples/lang/getval0.mcl b/examples/lang/getval0.mcl new file mode 100644 index 00000000..d8bf3e67 --- /dev/null +++ b/examples/lang/getval0.mcl @@ -0,0 +1,19 @@ +# etcdctl --endpoints=localhost:2379 get --prefix / # to see all the values +# etcdctl --endpoints=localhost:2379 put /somekey somevalue # to put a value +import "world" + +$key = "somekey" +$st = world.getval($key) + +$val = $st->value +$exists = $st->exists + +# stores a value in: /_mgmt/strings/${key} +kv "kv" { + key => $key, + value => "three", +} +file "/tmp/val" { + state => $const.res.file.state.exists, + content => "val: ${val}\n", +} diff --git a/examples/lang/sendrecv1.mcl b/examples/lang/sendrecv1.mcl index dea59fc8..9dee1b4c 100644 --- a/examples/lang/sendrecv1.mcl +++ b/examples/lang/sendrecv1.mcl @@ -13,6 +13,7 @@ exec "exec0" { kv "kv0" { key => $ns, #value => "two", + mapped = true, } Exec["exec0"].output -> Kv["kv0"].value diff --git a/examples/lang/states0.mcl b/examples/lang/states0.mcl index 0b5a0ac3..35dddd6d 100644 --- a/examples/lang/states0.mcl +++ b/examples/lang/states0.mcl @@ -17,6 +17,7 @@ if $state == "one" or $state == "default" { kv "${ns}" { key => $ns, value => "two", + mapped = true, } Exec["timer"] -> Kv["${ns}"] } @@ -33,6 +34,7 @@ if $state == "two" { kv "${ns}" { key => $ns, value => "three", + mapped = true, } Exec["timer"] -> Kv["${ns}"] } @@ -49,6 +51,7 @@ if $state == "three" { kv "${ns}" { key => $ns, value => "one", + mapped = true, } Exec["timer"] -> Kv["${ns}"] }