From 0af9af44e5bf92c57e2e968859588e16ad8382bb Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 17 Apr 2017 07:03:29 -0400 Subject: [PATCH] etcd, resources, world: Add World API for shared keys It's up to the end user to decide who is writing and/or overwriting them. It could also be useful to reimplement (refactor) some of the existing World API's to be implemented in terms of these primitives. --- etcd/str.go | 70 +++++++++++-------------- etcd/strmap.go | 115 +++++++++++++++++++++++++++++++++++++++++ etcd/world.go | 41 ++++++++++++--- resources/kv.go | 8 +-- resources/resources.go | 8 ++- 5 files changed, 189 insertions(+), 53 deletions(-) create mode 100644 etcd/strmap.go diff --git a/etcd/str.go b/etcd/str.go index ff33748d..575c5cb5 100644 --- a/etcd/str.go +++ b/etcd/str.go @@ -18,20 +18,22 @@ package etcd import ( + "errors" "fmt" - "strings" - - "github.com/purpleidea/mgmt/util" etcd "github.com/coreos/etcd/clientv3" errwrap "github.com/pkg/errors" ) +// ErrNotExist is returned when GetStr can not find the requested key. +// TODO: https://dave.cheney.net/2016/04/07/constant-errors +var ErrNotExist = errors.New("errNotExist") + // WatchStr returns a channel which spits out events on key activity. // FIXME: It should close the channel when it's done, and spit out errors when // something goes wrong. func WatchStr(obj *EmbdEtcd, key string) chan error { - // new key structure is /$NS/strings/$key/$hostname = $data + // new key structure is /$NS/strings/$key = $data path := fmt.Sprintf("/%s/strings/%s", NS, key) ch := make(chan error, 1) // FIXME: fix our API so that we get a close event on shutdown. @@ -50,50 +52,38 @@ func WatchStr(obj *EmbdEtcd, key string) chan error { return ch } -// GetStr collects all of the strings which match a namespace in etcd. -func GetStr(obj *EmbdEtcd, hostnameFilter []string, key string) (map[string]string, error) { - // old key structure is /$NS/strings/$hostname/$key = $data - // new key structure is /$NS/strings/$key/$hostname = $data - // FIXME: if we have the $key as the last token (old key structure), we - // can allow the key to contain the slash char, otherwise we need to - // verify that one isn't present in the input string. +// GetStr collects the string which matches a gloabl namespace in etcd. +func GetStr(obj *EmbdEtcd, key string) (string, error) { + // new key structure is /$NS/strings/$key = $data path := fmt.Sprintf("/%s/strings/%s", NS, key) - keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend)) + keyMap, err := obj.Get(path, etcd.WithPrefix()) if err != nil { - return nil, errwrap.Wrapf(err, "could not get strings in: %s", key) + return "", errwrap.Wrapf(err, "could not get strings in: %s", key) } - result := make(map[string]string) - for key, val := range keyMap { - if !strings.HasPrefix(key, path) { // sanity check - continue - } - str := strings.Split(key[len(path):], "/") - if len(str) != 2 { - return nil, fmt.Errorf("unexpected chunk count of %d", len(str)) - } - _, hostname := str[0], str[1] - - if hostname == "" { - return nil, fmt.Errorf("unexpected chunk length of %d", len(hostname)) - } - - // FIXME: ideally this would be a server side filter instead! - if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) { - continue - } - //log.Printf("Etcd: GetStr(%s): (Hostname, Data): (%s, %s)", key, hostname, val) - result[hostname] = val + if len(keyMap) == 0 { + return "", ErrNotExist } - return result, nil + + if count := len(keyMap); count != 1 { + return "", fmt.Errorf("returned %d entries", count) + } + + val, exists := keyMap[path] + if !exists { + return "", fmt.Errorf("path `%s` is missing", path) + } + + //log.Printf("Etcd: GetStr(%s): %s", key, val) + return val, nil } -// SetStr sets a key and hostname pair to a certain value. If the value is nil, -// then it deletes the key. Otherwise the value should point to a string. +// SetStr sets a key and hostname pair to a certain value. If the value is +// nil, then it deletes the key. Otherwise the value should point to a string. // TODO: TTL or delete disconnect? -func SetStr(obj *EmbdEtcd, hostname, key string, data *string) error { - // key structure is /$NS/strings/$key/$hostname = $data - path := fmt.Sprintf("/%s/strings/%s/%s", NS, key, hostname) +func SetStr(obj *EmbdEtcd, key string, data *string) error { + // key structure is /$NS/strings/$key = $data + path := fmt.Sprintf("/%s/strings/%s", NS, key) ifs := []etcd.Cmp{} // list matching the desired state ops := []etcd.Op{} // list of ops in this transaction (then) els := []etcd.Op{} // list of ops in this transaction (else) diff --git a/etcd/strmap.go b/etcd/strmap.go new file mode 100644 index 00000000..261c3710 --- /dev/null +++ b/etcd/strmap.go @@ -0,0 +1,115 @@ +// Mgmt +// Copyright (C) 2013-2017+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package etcd + +import ( + "fmt" + "strings" + + "github.com/purpleidea/mgmt/util" + + etcd "github.com/coreos/etcd/clientv3" + errwrap "github.com/pkg/errors" +) + +// WatchStrMap returns a channel which spits out events on key activity. +// FIXME: It should close the channel when it's done, and spit out errors when +// something goes wrong. +func WatchStrMap(obj *EmbdEtcd, key string) chan error { + // new key structure is /$NS/strings/$key/$hostname = $data + path := fmt.Sprintf("/%s/strings/%s", NS, key) + ch := make(chan error, 1) + // FIXME: fix our API so that we get a close event on shutdown. + callback := func(re *RE) error { + // TODO: is this even needed? it used to happen on conn errors + //log.Printf("Etcd: Watch: Path: %v", path) // event + if re == nil || re.response.Canceled { + return fmt.Errorf("watch is empty") // will cause a CtxError+retry + } + if len(ch) == 0 { // send event only if one isn't pending + ch <- nil // event + } + return nil + } + _, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors + return ch +} + +// GetStrMap collects all of the strings which match a namespace in etcd. +func GetStrMap(obj *EmbdEtcd, hostnameFilter []string, key string) (map[string]string, error) { + // old key structure is /$NS/strings/$hostname/$key = $data + // new key structure is /$NS/strings/$key/$hostname = $data + // FIXME: if we have the $key as the last token (old key structure), we + // can allow the key to contain the slash char, otherwise we need to + // verify that one isn't present in the input string. + path := fmt.Sprintf("/%s/strings/%s", NS, key) + keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend)) + if err != nil { + return nil, errwrap.Wrapf(err, "could not get strings in: %s", key) + } + result := make(map[string]string) + for key, val := range keyMap { + if !strings.HasPrefix(key, path) { // sanity check + continue + } + + str := strings.Split(key[len(path):], "/") + if len(str) != 2 { + return nil, fmt.Errorf("unexpected chunk count of %d", len(str)) + } + _, hostname := str[0], str[1] + + if hostname == "" { + return nil, fmt.Errorf("unexpected chunk length of %d", len(hostname)) + } + + // FIXME: ideally this would be a server side filter instead! + if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) { + continue + } + //log.Printf("Etcd: GetStr(%s): (Hostname, Data): (%s, %s)", key, hostname, val) + result[hostname] = val + } + return result, nil +} + +// SetStrMap sets a key and hostname pair to a certain value. If the value is +// nil, then it deletes the key. Otherwise the value should point to a string. +// TODO: TTL or delete disconnect? +func SetStrMap(obj *EmbdEtcd, hostname, key string, data *string) error { + // key structure is /$NS/strings/$key/$hostname = $data + path := fmt.Sprintf("/%s/strings/%s/%s", NS, key, hostname) + ifs := []etcd.Cmp{} // list matching the desired state + ops := []etcd.Op{} // list of ops in this transaction (then) + els := []etcd.Op{} // list of ops in this transaction (else) + if data == nil { // perform a delete + // TODO: use https://github.com/coreos/etcd/pull/7417 if merged + //ifs = append(ifs, etcd.KeyExists(path)) + ifs = append(ifs, etcd.Compare(etcd.Version(path), ">", 0)) + ops = append(ops, etcd.OpDelete(path)) + } else { + data := *data // get the real value + ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state + els = append(els, etcd.OpPut(path, data)) + } + + // it's important to do this in one transaction, and atomically, because + // this way, we only generate one watch event, and only when it's needed + _, err := obj.Txn(ifs, ops, els) // TODO: do we need to look at response? + return errwrap.Wrapf(err, "could not set strings in: %s", key) +} diff --git a/etcd/world.go b/etcd/world.go index b28a0302..9d6f885a 100644 --- a/etcd/world.go +++ b/etcd/world.go @@ -48,23 +48,48 @@ func (obj *World) ResCollect(hostnameFilter, kindFilter []string) ([]resources.R return GetResources(obj.EmbdEtcd, hostnameFilter, kindFilter) } -// SetWatch returns a channel which spits out events on possible string changes. +// StrWatch returns a channel which spits out events on possible string changes. func (obj *World) StrWatch(namespace string) chan error { return WatchStr(obj.EmbdEtcd, namespace) } -// StrGet returns a map of hostnames to values in the given namespace. -func (obj *World) StrGet(namespace string) (map[string]string, error) { - return GetStr(obj.EmbdEtcd, []string{}, namespace) +// StrIsNotExist returns whether the error from StrGet is a key missing error. +func (obj *World) StrIsNotExist(err error) bool { + return err == ErrNotExist } -// StrSet sets the namespace value to a particular string under the identity of -// its own hostname. +// StrGet returns the value for the the given namespace. +func (obj *World) StrGet(namespace string) (string, error) { + return GetStr(obj.EmbdEtcd, namespace) +} + +// StrSet sets the namespace value to a particular string. func (obj *World) StrSet(namespace, value string) error { - return SetStr(obj.EmbdEtcd, obj.Hostname, namespace, &value) + return SetStr(obj.EmbdEtcd, namespace, &value) } // StrDel deletes the value in a particular namespace. func (obj *World) StrDel(namespace string) error { - return SetStr(obj.EmbdEtcd, obj.Hostname, namespace, nil) + return SetStr(obj.EmbdEtcd, namespace, nil) +} + +// StrMapWatch returns a channel which spits out events on possible string changes. +func (obj *World) StrMapWatch(namespace string) chan error { + return WatchStrMap(obj.EmbdEtcd, namespace) +} + +// StrMapGet returns a map of hostnames to values in the given namespace. +func (obj *World) StrMapGet(namespace string) (map[string]string, error) { + return GetStrMap(obj.EmbdEtcd, []string{}, namespace) +} + +// StrMapSet sets the namespace value to a particular string under the identity +// of its own hostname. +func (obj *World) StrMapSet(namespace, value string) error { + return SetStrMap(obj.EmbdEtcd, obj.Hostname, namespace, &value) +} + +// StrMapDel deletes the value in a particular namespace. +func (obj *World) StrMapDel(namespace string) error { + return SetStrMap(obj.EmbdEtcd, obj.Hostname, namespace, nil) } diff --git a/resources/kv.go b/resources/kv.go index 6683fa31..ccd0cd6c 100644 --- a/resources/kv.go +++ b/resources/kv.go @@ -101,7 +101,7 @@ func (obj *KVRes) Watch() error { return err // bubble up a NACK... } - ch := obj.Data().World.StrWatch(obj.Key) // get possible events! + ch := obj.Data().World.StrMapWatch(obj.Key) // get possible events! var send = false // send event? var exit *error @@ -185,7 +185,7 @@ func (obj *KVRes) CheckApply(apply bool) (checkOK bool, err error) { } hostname := obj.Data().Hostname // me - keyMap, err := obj.Data().World.StrGet(obj.Key) + keyMap, err := obj.Data().World.StrMapGet(obj.Key) if err != nil { return false, errwrap.Wrapf(err, "check error during StrGet") } @@ -205,7 +205,7 @@ func (obj *KVRes) CheckApply(apply bool) (checkOK bool, err error) { return true, nil // nothing to delete, we're good! } else if ok && obj.Value == nil { // delete - err := obj.Data().World.StrDel(obj.Key) + err := obj.Data().World.StrMapDel(obj.Key) return false, errwrap.Wrapf(err, "apply error during StrDel") } @@ -213,7 +213,7 @@ func (obj *KVRes) CheckApply(apply bool) (checkOK bool, err error) { return false, nil } - if err := obj.Data().World.StrSet(obj.Key, *obj.Value); err != nil { + if err := obj.Data().World.StrMapSet(obj.Key, *obj.Value); err != nil { return false, errwrap.Wrapf(err, "apply error during StrSet") } diff --git a/resources/resources.go b/resources/resources.go index 1e3fa43b..60797d50 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -81,9 +81,15 @@ type World interface { // TODO: is there a better name for this interface? ResCollect(hostnameFilter, kindFilter []string) ([]Res, error) StrWatch(namespace string) chan error - StrGet(namespace string) (map[string]string, error) + StrIsNotExist(error) bool + StrGet(namespace string) (string, error) StrSet(namespace, value string) error StrDel(namespace string) error + + StrMapWatch(namespace string) chan error + StrMapGet(namespace string) (map[string]string, error) + StrMapSet(namespace, value string) error + StrMapDel(namespace string) error } // Data is the set of input values passed into the pgraph for the resources.