engine, lang: Modern exported resources

I've been waiting to write this patch for a long time. I firmly believe
that the idea of "exported resources" was truly a brilliant one, but
which was never even properly understood by its original inventors! This
patch set aims to show how it should have been done.

The main differences are:

* Real-time modelling, since "once per run" makes no sense.
* Filter with code/functions not language syntax.
* Directed exporting to limit the intended recipients.

The next step is to add more "World" reading and filtering functions to
make it easy and expressive to make your selection of resources to
collect!
This commit is contained in:
James Shubin
2025-03-24 18:54:06 -04:00
parent 955112f64f
commit 045b29291e
24 changed files with 2367 additions and 312 deletions

View File

@@ -297,6 +297,49 @@ This meta param is a safety measure to make your life easier. It works for all
resources. If someone comes up with a resource which would routinely start with
a dollar sign, then we can revisit the default for this resource kind.
#### Hidden
Boolean. Hidden means that this resource will not get executed on the resource
graph on which it is defined. This can be used as a simple boolean switch, or,
more commonly in combination with the Export meta param which specifies that the
resource params are exported into the shared database. When this is true, it
does not prevent export. In fact, it is commonly used in combination with
Export. Using this option will still include it in the resource graph, but it
will exist there in a special "mode" where it will not conflict with any other
identically named resources. It can even be used as part of an edge or via a
send/recv receiver. It can NOT be a sending vertex. These properties
differentiate the use of this instead of simply wrapping a resource in an "if"
statement.
#### Export
List of strings. Export is a list of hostnames (and/or the special "*" entry)
which if set, will mark this resource data as intended for export to those
hosts. This does not prevent any users of the shared data storage from reading
these values, so if you want to guarantee secrecy, use the encryption
primitives. This only labels the data accordingly, so that other hosts can know
what data is available for them to collect. The (kind, name, host) export triple
must be unique from any given exporter. In other words, you may not export two
different instances of a kind+name to the same host, the exports must not
conflict. On resource collect, this parameter is not preserved.
```mcl
file "/tmp/foo" {
state => "exists",
content => "i'm exported!\n",
Meta:hidden => true,
Meta:export => ["h1",],
}
file "/tmp/foo" {
state => "exists",
content => "i'm exported AND i'm used here\n",
Meta:export => ["h1",],
}
```
#### Reverse
Boolean. Reverse is a property that some resources can implement that specifies

View File

@@ -523,9 +523,10 @@ graph edges from another resource. These values are consumed during the
any resource that has an appropriate value and that has the `Sendable` trait.
You can read more about this in the Send/Recv section below.
### Collectable
### Exportable
This is currently a stub and will be updated once the DSL is further along.
Exportable allows a resource to tell the exporter what subset of its data it
wishes to export when that occurs. It is rare that you will need to use this.
## Resource Initialization

View File

@@ -181,6 +181,18 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
refreshableRes.SetRefresh(refresh) // tell the resource
}
// Run the exported resource exporter!
var exportOK bool
var exportErr error
wg := &sync.WaitGroup{}
wg.Add(1)
// (Run this concurrently with the CheckApply related stuff below...)
go func() {
defer wg.Done()
// doesn't really need to be in parallel, but we can...
exportOK, exportErr = obj.Exporter.Export(ctx, res)
}()
// Check cached state, to skip CheckApply, but can't skip if refreshing!
// If the resource doesn't implement refresh, skip the refresh test.
// FIXME: if desired, check that we pass through refresh notifications!
@@ -190,6 +202,13 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
} else if noop && (refresh && isRefreshableRes) { // had a refresh to do w/ noop!
checkOK, err = false, nil // therefore the state is wrong
} else if res.MetaParams().Hidden {
// We're not running CheckApply
if obj.Debug {
obj.Logf("%s: Hidden", res)
}
checkOK, err = true, nil // default
} else {
// run the CheckApply!
if obj.Debug {
@@ -201,6 +220,13 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error {
obj.Logf("%s: CheckApply(%t): Return(%t, %s)", res, !noop, checkOK, engineUtil.CleanError(err))
}
}
wg.Wait()
checkOK = checkOK && exportOK // always combine
if err == nil { // If CheckApply didn't error, look at exportOK.
// This is because if CheckApply errors we don't need to care or
// tell anyone about an exporting error.
err = exportErr
}
if checkOK && err != nil { // should never return this way
return fmt.Errorf("%s: resource programming error: CheckApply(%t): %t, %+v", res, !noop, checkOK, err)
@@ -304,14 +330,24 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
}
// initialize or reinitialize the meta state for this resource uid
obj.mlock.Lock()
if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset {
obj.metas[engine.PtrUID(res)] = &engine.MetaState{
CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value
}
// if we're using a Hidden resource, we don't support this feature
// TODO: should we consider supporting it? is it really necessary?
// XXX: to support this for Hidden, we'd need to handle dupe names
metas := &engine.MetaState{
CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value
}
if !res.MetaParams().Hidden {
// Skip this if Hidden since we can have a hidden res that has
// the same kind+name as a regular res, and this would conflict.
obj.mlock.Lock()
if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset {
obj.metas[engine.PtrUID(res)] = &engine.MetaState{
CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value
}
}
metas = obj.metas[engine.PtrUID(res)] // handle
obj.mlock.Unlock()
}
metas := obj.metas[engine.PtrUID(res)] // handle
obj.mlock.Unlock()
//defer close(obj.state[vertex].stopped) // done signal
@@ -376,10 +412,21 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
delay = 0 // reset
continue
}
} else if res.MetaParams().Hidden {
// We're not running Watch
if obj.Debug {
obj.Logf("%s: Hidden", res)
}
obj.state[vertex].cuid.StartTimer() // TODO: Should we do this?
err = obj.state[vertex].hidden(obj.state[vertex].doneCtx)
obj.state[vertex].cuid.StopTimer() // TODO: Should we do this?
} else if interval := res.MetaParams().Poll; interval > 0 { // poll instead of watching :(
obj.state[vertex].cuid.StartTimer()
err = obj.state[vertex].poll(obj.state[vertex].doneCtx, interval)
obj.state[vertex].cuid.StopTimer() // clean up nicely
} else {
obj.state[vertex].cuid.StartTimer()
if obj.Debug {

View File

@@ -59,9 +59,12 @@ type Engine struct {
Version string
Hostname string
// Break off separate logical pieces into chunks where possible.
Converger *converger.Coordinator
Local *local.API
World engine.World
Exporter *Exporter
Local *local.API
World engine.World
// Prefix is a unique directory prefix which can be used. It should be
// created if needed.
@@ -85,6 +88,7 @@ type Engine struct {
paused bool // are we paused?
fastPause bool
isClosing bool // are we shutting down?
}
// Init initializes the internal structures and starts this the graph running.
@@ -116,7 +120,7 @@ func (obj *Engine) Init() error {
obj.wlock = &sync.Mutex{}
obj.mlock = &sync.Mutex{}
obj.metas = make(map[engine.ResPtrUID]*engine.MetaState)
obj.metas = make(map[engine.ResPtrUID]*engine.MetaState) // don't include .Hidden res
obj.slock = &sync.Mutex{}
obj.semas = make(map[string]*semaphore.Semaphore)
@@ -125,6 +129,18 @@ func (obj *Engine) Init() error {
obj.paused = true // start off true, so we can Resume after first Commit
obj.Exporter = &Exporter{
World: obj.World,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
// TODO: is this a sane prefix to use here?
obj.Logf("export: "+format, v...)
},
}
if err := obj.Exporter.Init(); err != nil {
return err
}
return nil
}
@@ -188,6 +204,12 @@ func (obj *Engine) Commit() error {
if !ok { // should not happen, previously validated
return fmt.Errorf("not a Res")
}
// Skip this if Hidden since we can have a hidden res that has
// the same kind+name as a regular res, and this would conflict.
if res.MetaParams().Hidden {
continue
}
activeMetas[engine.PtrUID(res)] = struct{}{} // add
}
@@ -208,7 +230,11 @@ func (obj *Engine) Commit() error {
return fmt.Errorf("the Res state already exists")
}
activeMetas[engine.PtrUID(res)] = struct{}{} // add
// Skip this if Hidden since we can have a hidden res that has
// the same kind+name as a regular res, and this would conflict.
if !res.MetaParams().Hidden {
activeMetas[engine.PtrUID(res)] = struct{}{} // add
}
if obj.Debug {
obj.Logf("Validate(%s)", res)
@@ -299,7 +325,12 @@ func (obj *Engine) Commit() error {
if !ok { // should not happen, previously validated
return fmt.Errorf("not a Res")
}
delete(activeMetas, engine.PtrUID(res))
// Skip this if Hidden since we can have a hidden res that has
// the same kind+name as a regular res, and this would conflict.
if !res.MetaParams().Hidden {
delete(activeMetas, engine.PtrUID(res))
}
// wait for exit before starting new graph!
close(obj.state[vertex].removeDone) // causes doneCtx to cancel
@@ -501,6 +532,7 @@ func (obj *Engine) Pause(fastPause bool) error {
// actually just a Load of an empty graph and a Commit. It waits for all the
// resources to exit before returning.
func (obj *Engine) Shutdown() error {
obj.isClosing = true
emptyGraph, reterr := pgraph.NewGraph("empty")
// this is a graph switch (graph sync) that switches to an empty graph!
@@ -517,6 +549,15 @@ func (obj *Engine) Shutdown() error {
return reterr
}
// IsClosing tells the caller if a Shutdown() was run. This is helpful so that
// the graph can behave slightly differently when receiving the final empty
// graph. This is because it's empty because we passed one to unload everything,
// not because the user actually removed all resources. We may want to preserve
// the exported state for example, and not purge it.
func (obj *Engine) IsClosing() bool {
return obj.isClosing
}
// Graph returns the running graph.
func (obj *Engine) Graph() *pgraph.Graph {
return obj.graph

349
engine/graph/exporter.go Normal file
View File

@@ -0,0 +1,349 @@
// Mgmt
// Copyright (C) James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.
package graph
import (
"context"
"fmt"
"sync"
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/pgraph"
//"github.com/purpleidea/mgmt/pgraph"
engineUtil "github.com/purpleidea/mgmt/engine/util"
)
// Exporter is the main engine mechanism that sends the exported resource data
// to the World database. The code is relatively succinct, but slightly subtle.
type Exporter struct {
// Watch specifies if we want to enable the additional watch feature. It
// should probably be left off unless we're debugging something or using
// weird environments where we expect someone to mess with our res data.
Watch bool
World engine.World
Debug bool
Logf func(format string, v ...interface{})
state map[engine.ResDelete]bool // key NOT a pointer for it to be unique
prev map[engine.ResDelete]pgraph.Vertex
mutex *sync.Mutex
// watch specific variables
workerRunning bool
workerWg *sync.WaitGroup
workerCtx context.Context
workerCancel func()
}
// Init performs some initialization before first use. This is required.
func (obj *Exporter) Init() error {
obj.state = make(map[engine.ResDelete]bool)
obj.prev = make(map[engine.ResDelete]pgraph.Vertex)
obj.mutex = &sync.Mutex{}
obj.workerRunning = false
obj.workerWg = &sync.WaitGroup{}
obj.workerCtx, obj.workerCancel = context.WithCancel(context.Background())
return nil
}
// Export performs the worldly export, and then stores the resource unique ID in
// our in-memory data store. Exported resources use this tracking to know when
// to run their cleanups. If this function encounters an error, it returns
// (false, err). If it does nothing it returns (true, nil). If it does work it
// return (false, nil). These return codes match how CheckApply returns. This
// may run concurrently by multiple different resources, so as a result it must
// stay thread safe.
func (obj *Exporter) Export(ctx context.Context, res engine.Res) (bool, error) {
// As a result of running this operation in roughly the same places that
// the usual CheckApply step would run, we end up with a more nuanced
// and mature "exported resources" model than what was ever possible
// with other tools. We can now "wait" (via the resource graph
// dependencies) to run an export until an earlier resource dependency
// step has run. We can also programmatically "un-export" a resource by
// publishing a subsequent resource graph which either removes that
// Export flag or the entire resource. The one downside is that
// exporting to the database happens in multiple transactions rather
// than a batched bolus, but this is more appropriate because we're now
// more accurately modelling real-time systems, and this bandwidth is
// not a significant amount anyways. Lastly, we make sure to not run the
// purge when we ^C, since it should be safe to shutdown without killing
// all the data we left there.
if res.MetaParams().Noop {
return true, nil // did nothing
}
exports := res.MetaParams().Export
if len(exports) == 0 {
return true, nil // did nothing
}
// It's OK to check the cache here instead of re-sending via the World
// API and so on, because the only way the Res data would change in
// World is if (1) someone messed with etcd, which we'd see with Watch,
// or (2) if the Res data changed because we have a new resource graph.
// If we have a new resource graph, then any changed elements will get
// pruned from this state cache via the Prune method, which helps us.
// If send/recv or any other weird resource method changes things, then
// we also want to invalidate the state cache.
state := true
// TODO: This recv code is untested!
if r, ok := res.(engine.RecvableRes); ok {
for _, v := range r.Recv() { // map[string]*Send
// XXX: After we read the changed value, will it persist?
state = state && !v.Changed
}
}
obj.mutex.Lock()
for _, ptrUID := range obj.ptrUID(res) {
b := obj.state[*ptrUID] // no need to check if exists
state = state && b // if any are false, it's all false
}
obj.mutex.Unlock()
if state {
return true, nil // state OK!
}
// XXX: Do we want to change any metaparams when we export?
// XXX: Do we want to change any metaparams when we collect?
b64, err := obj.resToB64(res)
if err != nil {
return false, err
}
resourceExports := []*engine.ResExport{}
duplicates := make(map[string]struct{})
for _, export := range exports {
//ptrUID := engine.ResDelete{
// Kind: res.Kind(),
// Name: res.Name(),
// Host: export,
//}
if export == "*" {
export = "" // XXX: use whatever means "all"
}
if _, exists := duplicates[export]; exists {
continue
}
duplicates[export] = struct{}{}
// skip this check since why race it or split the resource...
//if stateOK := obj.state[ptrUID]; stateOK {
// // rare that we'd have a split of some of these from a
// // single resource updated and others already fine, but
// // might as well do the check since it's cheap...
// continue
//}
resExport := &engine.ResExport{
Kind: res.Kind(),
Name: res.Name(),
Host: export,
Data: b64, // encoded res data
}
resourceExports = append(resourceExports, resExport)
}
// The fact that we Watch the write-only-by-us values at all, is a
// luxury that allows us to handle mischievous actors that overwrote an
// exported value. It really isn't necessary. It's the consumers that
// really need to watch.
if err := obj.worker(); err != nil {
return false, err // big error
}
// TODO: Do we want to log more information about where this exports to?
obj.Logf("%s", res)
// XXX: Add a TTL if requested
b, err := obj.World.ResExport(ctx, resourceExports) // do it!
if err != nil {
return false, err
}
obj.mutex.Lock()
defer obj.mutex.Unlock()
// NOTE: The Watch() method *must* invalidate this state if it changes.
// This is only pertinent if we're using the luxury Watch add-ons.
for _, ptrUID := range obj.ptrUID(res) {
obj.state[*ptrUID] = true // state OK!
}
return b, nil
}
// Prune removes any exports which are no longer actively being presented in the
// resource graph. This cleans things up between graph swaps. This should NOT
// run if we're shutting down cleanly. Keep in mind that this must act on the
// new graph which is available by "Commit", not before we're ready to "Commit".
func (obj *Exporter) Prune(ctx context.Context, graph *pgraph.Graph) error {
// mutex should be optional since this should only run when graph paused
obj.mutex.Lock()
defer obj.mutex.Unlock()
// make searching faster by initially storing it all in a map
m := make(map[engine.ResDelete]pgraph.Vertex) // key is NOT a pointer
for _, v := range graph.Vertices() {
res, ok := v.(engine.Res)
if !ok { // should not happen
return fmt.Errorf("not a Res")
}
for _, ptrUID := range obj.ptrUID(res) { // skips non-export things
m[*ptrUID] = v
}
}
resourceDeletes := []*engine.ResDelete{}
for k := range obj.state {
v, exists := m[k] // exists means it's in the graph
prev := obj.prev[k]
obj.prev[k] = v // may be nil
if exists && v != prev { // pointer compare to old vertex
// Here we have a Res that previously existed under the
// same kind/name/host. We need to invalidate the state
// only if it's a different Res than the previous one!
// If we do this erroneously, it causes extra traffic.
obj.state[k] = false // do this only if the Res is NEW
continue // skip it, it's staying
}
delete(obj.state, k) // it's gone!
resourceDeletes = append(resourceDeletes, &k)
}
if len(resourceDeletes) == 0 {
return nil
}
obj.Logf("prune: %d exports", len(resourceDeletes))
for _, x := range resourceDeletes {
obj.Logf("prune: %s to %s", engine.Repr(x.Kind, x.Name), x.Host)
}
// XXX: this function could optimize the grouping since we split the
// list of host entries out from the kind/name since we can't have a
// unique map key with a struct that contains a slice.
if _, err := obj.World.ResDelete(ctx, resourceDeletes); err != nil {
return err
}
return nil
}
// resToB64 is a helper to refactor out this method.
func (obj *Exporter) resToB64(res engine.Res) (string, error) {
if r, ok := res.(engine.ExportableRes); ok {
return r.ToB64()
}
return engineUtil.ResToB64(res)
}
// ptrUID is a helper for this repetitive code.
func (obj *Exporter) ptrUID(res engine.Res) []*engine.ResDelete {
a := []*engine.ResDelete{}
for _, export := range res.MetaParams().Export {
if export == "*" {
export = "" // XXX: use whatever means "all"
}
ptrUID := &engine.ResDelete{
Kind: res.Kind(),
Name: res.Name(),
Host: export,
}
a = append(a, ptrUID)
}
return a
}
// worker is a helper to kick off the optional Watch workers.
func (obj *Exporter) worker() error {
if !obj.Watch {
return nil // feature is disabled
}
obj.mutex.Lock()
defer obj.mutex.Unlock()
if obj.workerRunning {
return nil // already running
}
kind := "" // watch everything
ch, err := obj.World.ResWatch(obj.workerCtx, kind) // (chan error, error)
if err != nil {
return err // big error
}
obj.workerRunning = true
obj.workerWg.Add(1)
go func() {
defer func() {
obj.mutex.Lock()
obj.workerRunning = false
obj.mutex.Unlock()
}()
defer obj.workerWg.Done()
Loop:
for {
var e error
var ok bool
select {
case e, ok = <-ch:
if !ok {
// chan closed
break Loop
}
case <-obj.workerCtx.Done():
break Loop
}
if e != nil {
// something errored... shutdown coming!
}
// event!
obj.mutex.Lock()
for k := range obj.state {
obj.state[k] = false // reset it all
}
obj.mutex.Unlock()
}
}()
return nil
}
// Shutdown cancels any running workers and waits for them to finish.
func (obj *Exporter) Shutdown() {
obj.workerCancel()
obj.workerWg.Wait()
}

View File

@@ -430,3 +430,13 @@ func (obj *State) poll(ctx context.Context, interval uint32) error {
obj.init.Event() // notify engine of an event (this can block)
}
}
// hidden is a replacement for Watch when the Hidden metaparameter is used.
func (obj *State) hidden(ctx context.Context) error {
obj.init.Running() // when started, notify engine that we're running
select {
case <-ctx.Done(): // signal for shutdown request
return nil
}
}

View File

@@ -53,6 +53,8 @@ var DefaultMetaParams = &MetaParams{
Rewatch: false,
Realize: false, // true would be more awesome, but unexpected for users
Dollar: false,
Hidden: false,
Export: []string{},
}
// MetaRes is the interface a resource must implement to support meta params.
@@ -140,6 +142,32 @@ type MetaParams struct {
// interpolate a variable name. In the rare case when it's needed, you
// can disable that check with this meta param.
Dollar bool `yaml:"dollar"`
// Hidden means that this resource will not get executed on the resource
// graph on which it is defined. This can be used as a simple boolean
// switch, or, more commonly in combination with the Export meta param
// which specifies that the resource params are exported into the shared
// database. When this is true, it does not prevent export. In fact, it
// is commonly used in combination with Export. Using this option will
// still include it in the resource graph, but it will exist there in a
// special "mode" where it will not conflict with any other identically
// named resources. It can even be used as part of an edge or via a
// send/recv receiver. It can NOT be a sending vertex. These properties
// differentiate the use of this instead of simply wrapping a resource
// in an "if" statement.
Hidden bool `yaml:"hidden"`
// Export is a list of hostnames (and/or the special "*" entry) which if
// set, will mark this resource data as intended for export to those
// hosts. This does not prevent any users of the shared data storage
// from reading these values, so if you want to guarantee secrecy, use
// the encryption primitives. This only labels the data accordingly, so
// that other hosts can know what data is available for them to collect.
// The (kind, name, host) export triple must be unique from any given
// exporter. In other words, you may not export two different instances
// of a kind+name to the same host, the exports must not conflict. On
// resource collect, this parameter is not preserved.
Export []string `yaml:"export"`
}
// Cmp compares two AutoGroupMeta structs and determines if they're equivalent.
@@ -189,6 +217,12 @@ func (obj *MetaParams) Cmp(meta *MetaParams) error {
if obj.Dollar != meta.Dollar {
return fmt.Errorf("values for Dollar are different")
}
if obj.Hidden != meta.Hidden {
return fmt.Errorf("values for Hidden are different")
}
if err := util.SortedStrSliceCompare(obj.Export, meta.Export); err != nil {
return errwrap.Wrapf(err, "values for Export are different")
}
return nil
}
@@ -208,6 +242,13 @@ func (obj *MetaParams) Validate() error {
}
}
for _, s := range obj.Export {
if s == "" {
return fmt.Errorf("export is empty")
}
}
// TODO: Should we validate the export patterns?
return nil
}
@@ -218,6 +259,11 @@ func (obj *MetaParams) Copy() *MetaParams {
sema = make([]string, len(obj.Sema))
copy(sema, obj.Sema)
}
export := []string{}
if obj.Export != nil {
export = make([]string, len(obj.Export))
copy(export, obj.Export)
}
return &MetaParams{
Noop: obj.Noop,
Retry: obj.Retry,
@@ -230,6 +276,8 @@ func (obj *MetaParams) Copy() *MetaParams {
Rewatch: obj.Rewatch,
Realize: obj.Realize,
Dollar: obj.Dollar,
Hidden: obj.Hidden,
Export: export,
}
}

View File

@@ -376,6 +376,22 @@ type CompatibleRes interface {
Merge(CompatibleRes) (CompatibleRes, error)
}
// ExportableRes allows the resource to have its own implementation of resource
// encoding, so that it can send data over the wire differently. It's unlikely
// that you will want to implement this interface for most scenarios. It may be
// useful to limit private data exposure, large data sizes, and to add more info
// to what would normally be shared.
type ExportableRes interface {
Res
// ToB64 lets the resource provide an alternative implementation of the
// usual ResToB64 method. This lets the resource omit, add, or modify
// the parameter data before it goes out over the wire.
ToB64() (string, error)
// TODO: Do we want to add a FromB64 method for decoding the Resource?
}
// YAMLRes is a resource that supports creation by unmarshalling.
type YAMLRes interface {
Res

View File

@@ -31,6 +31,7 @@ package engine
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/etcd/interfaces"
"github.com/purpleidea/mgmt/etcd/scheduler"
@@ -117,11 +118,103 @@ type StrWorld interface {
// ResWorld is a world interface that lets us store, pull and watch resources in
// a distributed database.
// XXX: These API's are likely to change.
// XXX: Add optional TTL's to these API's, maybe use WithTTL(...) type options.
type ResWorld interface {
ResWatch(context.Context) (chan error, error)
ResExport(context.Context, []Res) error
// FIXME: should this method take a "filter" data struct instead of many args?
ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]Res, error)
// ResWatch returns a channel which produces a new value once on startup
// as soon as it is successfully connected, and once for every time it
// sees that a resource that has been exported for this hostname is
// added, deleted, or modified. If kind is specified, the watch will
// attempt to only send events relating to that resource kind. We always
// intended to only show events for resources which the watching host is
// allowed to see.
ResWatch(ctx context.Context, kind string) (chan error, error)
// ResCollect does a lookup for resource entries that have previously
// been stored for us. It returns a subset of these based on the input
// filter. It does not return a Res, since while that would be useful,
// and logical, it turns out we usually want to transport the Res data
// onwards through the function graph, and using a native string is what
// is already supported. (A native res type would just be encoded as a
// string anyways.) While it might be more "correct" to do the work to
// decode the string into a Res, the user of this function would just
// encode it back to a string anyways, and this is not very efficient.
ResCollect(ctx context.Context, filters []*ResFilter) ([]*ResOutput, error)
// ResExport stores a number of resources in the world storage system.
// The individual records should not be updated if they are identical to
// what is already present. (This is to prevent unnecessary events.) If
// this makes no changes, it returns (true, nil). If it makes a change,
// then it returns (false, nil). On any error we return (false, err).
ResExport(ctx context.Context, resourceExports []*ResExport) (bool, error)
// ResDelete deletes a number of resources in the world storage system.
// If this doesn't delete, it returns (true, nil). If it makes a delete,
// then it returns (false, nil). On any error we return (false, err).
ResDelete(ctx context.Context, resourceDeletes []*ResDelete) (bool, error)
}
// ResFilter specifies that we want to match an item with this three tuple. If
// any of these are empty, then it means to match an item with any value for
// that field.
// TODO: Future secure implementations must verify that the exported made a
// value available to that hostname. It's not enough for a host to request it.
// We can enforce this with public key encryption eventually.
type ResFilter struct {
Kind string
Name string
Host string // from this host
}
// Match returns nil on a successful match.
func (obj *ResFilter) Match(kind, name, host string) error {
if obj.Kind != "" && obj.Kind != kind {
return fmt.Errorf("kind did not match")
}
if obj.Name != "" && obj.Name != name {
return fmt.Errorf("name did not match")
}
if obj.Host != "" && obj.Host != host {
return fmt.Errorf("host did not match")
}
return nil // match!
}
// ResOutput represents a record of exported resource data which we have read
// out from the world storage system. The Data field contains an encoded version
// of the resource, and even though decoding it will get you a Kind and Name, we
// still store those values here in duplicate for them to be available before
// decoding.
type ResOutput struct {
Kind string
Name string
Host string // from this host
Data string // encoded res data
}
// ResExport represents a record of exported resource data which we want to save
// to the world storage system. The Data field contains an encoded version of
// the resource, and even though decoding it will get you a Kind and Name, we
// still store those values here in duplicate for them to be available before
// decoding. If Host is specified, then only the node with that hostname may
// access this resource. If it's empty than it may be collected by anyone. If we
// want to export to only three hosts, then we duplicate this entry three times.
// It's true that this is not an efficient use of storage space, but it maps
// logically to a future data structure where data is encrypted to the public
// key of that specific host where we wouldn't be able to de-duplicate anyways.
type ResExport struct {
Kind string
Name string
Host string // to/for this host
Data string // encoded res data
}
// ResDelete represents the uniqueness key for stored resources. As a result,
// this triple is a useful map key in various locations.
type ResDelete struct {
Kind string
Name string
Host string // to/for this host
}
// SchedulerWorld is an interface that has to do with distributed scheduling.

View File

@@ -35,11 +35,12 @@ import (
"strings"
"github.com/purpleidea/mgmt/engine"
engineUtil "github.com/purpleidea/mgmt/engine/util"
"github.com/purpleidea/mgmt/etcd/interfaces"
"github.com/purpleidea/mgmt/util"
"github.com/purpleidea/mgmt/util/errwrap"
etcd "go.etcd.io/etcd/client/v3"
clientv3Util "go.etcd.io/etcd/client/v3/clientv3util"
//pb "go.etcd.io/etcd/api/v3/etcdserverpb"
)
const (
@@ -50,94 +51,26 @@ const (
// change.
// TODO: Filter our watch (on the server side if possible) based on the
// collection prefixes and filters that we care about...
func WatchResources(ctx context.Context, client interfaces.Client) (chan error, error) {
path := fmt.Sprintf("%s/exported/", ns)
// XXX: filter based on kind as well, we don't do that currently... See:
// https://github.com/etcd-io/etcd/issues/19667
func WatchResources(ctx context.Context, client interfaces.Client, hostname, kind string) (chan error, error) {
// key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data
// TODO: support the star (*) hostname matching catch-all?
path := fmt.Sprintf("%s/exported/%s/", ns, hostname)
return client.Watcher(ctx, path, etcd.WithPrefix())
}
// SetResources exports all of the resources which we pass in to etcd.
func SetResources(ctx context.Context, client interfaces.Client, hostname string, resourceList []engine.Res) error {
// key structure is $NS/exported/$hostname/resources/$uid = $data
var kindFilter []string // empty to get from everyone
hostnameFilter := []string{hostname}
// this is not a race because we should only be reading keys which we
// set, and there should not be any contention with other hosts here!
originals, err := GetResources(ctx, client, hostnameFilter, kindFilter)
if err != nil {
return err
}
if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del
return nil
}
ifs := []etcd.Cmp{} // list matching the desired state
ops := []etcd.Op{} // list of ops in this transaction
for _, res := range resourceList {
if res.Kind() == "" {
return fmt.Errorf("empty kind: %s", res.Name())
}
uid := fmt.Sprintf("%s/%s", res.Kind(), res.Name())
path := fmt.Sprintf("%s/exported/%s/resources/%s", ns, hostname, uid)
if data, err := engineUtil.ResToB64(res); err == nil {
ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state
ops = append(ops, etcd.OpPut(path, data))
} else {
return fmt.Errorf("can't convert to B64: %v", err)
}
}
match := func(res engine.Res, resourceList []engine.Res) bool { // helper lambda
for _, x := range resourceList {
if res.Kind() == x.Kind() && res.Name() == x.Name() {
return true
}
}
return false
}
hasDeletes := false
// delete old, now unused resources here...
for _, res := range originals {
if res.Kind() == "" {
return fmt.Errorf("empty kind: %s", res.Name())
}
uid := fmt.Sprintf("%s/%s", res.Kind(), res.Name())
path := fmt.Sprintf("%s/exported/%s/resources/%s", ns, hostname, uid)
if match(res, resourceList) { // if we match, no need to delete!
continue
}
ops = append(ops, etcd.OpDelete(path))
hasDeletes = true
}
// if everything is already correct, do nothing, otherwise, run the ops!
// it's important to do this in one transaction, and atomically, because
// this way, we only generate one watch event, and only when it's needed
if hasDeletes { // always run, ifs don't matter
_, err = client.Txn(ctx, nil, ops, nil) // TODO: does this run? it should!
} else {
_, err = client.Txn(ctx, ifs, nil, ops) // TODO: do we need to look at response?
}
return err
}
// GetResources collects all of the resources which match a filter from etcd. If
// the kindfilter or hostnameFilter is empty, then it assumes no filtering...
// TODO: Expand this with a more powerful filter based on what we eventually
// support in our collect DSL. Ideally a server side filter like WithFilter()
// could do this if the pattern was $NS/exported/$kind/$hostname/$uid = $data.
func GetResources(ctx context.Context, client interfaces.Client, hostnameFilter, kindFilter []string) ([]engine.Res, error) {
// key structure is $NS/exported/$hostname/resources/$uid = $data
// GetResources reads the resources sent to the input hostname, and also applies
// the filters to ensure we get a limited selection.
// XXX: We'd much rather filter server side if etcd had better filtering API's.
// See: https://github.com/etcd-io/etcd/issues/19667
func GetResources(ctx context.Context, client interfaces.Client, hostname string, filters []*engine.ResFilter) ([]*engine.ResOutput, error) {
// key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data
path := fmt.Sprintf("%s/exported/", ns)
resourceList := []engine.Res{}
output := []*engine.ResOutput{}
keyMap, err := client.Get(ctx, path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend))
if err != nil {
return nil, fmt.Errorf("could not get resources: %v", err)
return nil, errwrap.Wrapf(err, "could not get resources")
}
for key, val := range keyMap {
if !strings.HasPrefix(key, path) { // sanity check
@@ -145,35 +78,118 @@ func GetResources(ctx context.Context, client interfaces.Client, hostnameFilter,
}
str := strings.Split(key[len(path):], "/")
if len(str) != 4 {
return nil, fmt.Errorf("unexpected chunk count")
if len(str) < 4 {
return nil, fmt.Errorf("unexpected chunk count of: %d", len(str))
}
hostname, r, kind, name := str[0], str[1], str[2], str[3]
if r != "resources" {
return nil, fmt.Errorf("unexpected chunk pattern")
// The name may contain slashes, so join all those pieces back!
hostnameTo, hostnameFrom, kind, name := str[0], str[1], str[2], strings.Join(str[3:], "/")
if hostnameTo == "" || hostnameFrom == "" {
return nil, fmt.Errorf("unexpected empty hostname")
}
if kind == "" {
return nil, fmt.Errorf("unexpected kind chunk")
return nil, fmt.Errorf("unexpected empty kind")
}
if name == "" { // TODO: should I check this?
if name == "" {
return nil, fmt.Errorf("unexpected empty name")
}
// FIXME: ideally this would be a server side filter instead!
if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) {
// XXX: Do we want to include this catch-all match?
if hostnameTo != hostname && hostnameTo != "*" { // star is any
continue
}
// FIXME: ideally this would be a server side filter instead!
if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) {
continue
// TODO: I'd love to avoid this O(N^2) matching if possible...
for _, filter := range filters {
if err := filter.Match(kind, name, hostnameFrom); err != nil {
continue // did not match
}
}
if res, err := engineUtil.B64ToRes(val); err == nil {
//obj.Logf("Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name)
resourceList = append(resourceList, res)
} else {
return nil, fmt.Errorf("can't convert from B64: %v", err)
ro := &engine.ResOutput{
Kind: kind,
Name: name,
Host: hostnameFrom, // from this host
Data: val, // encoded res data
}
output = append(output, ro)
}
return resourceList, nil
return output, nil
}
// SetResources stores some resource data for export in etcd. It returns an
// error if anything goes wrong. If it didn't need to make a changes because the
// data was already correct in the database, it returns (true, nil). Otherwise
// it returns (false, nil).
func SetResources(ctx context.Context, client interfaces.Client, hostname string, resourceExports []*engine.ResExport) (bool, error) {
// key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data
// XXX: We run each export one at a time, because there's a bug if we
// group them, See: https://github.com/etcd-io/etcd/issues/19678
b := true
for _, re := range resourceExports {
ifs := []etcd.Cmp{} // list matching the desired state
thn := []etcd.Op{} // list of ops in this transaction (then)
els := []etcd.Op{} // list of ops in this transaction (else)
host := re.Host
if host == "" {
host = "*" // XXX: use whatever means "all"
}
path := fmt.Sprintf("%s/exported/%s/%s/%s/%s", ns, host, hostname, re.Kind, re.Name)
ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", re.Data))
els = append(els, etcd.OpPut(path, re.Data))
// it's important to do this in one transaction, and atomically, because
// this way, we only generate one watch event, and only when it's needed
out, err := client.Txn(ctx, ifs, thn, els)
if err != nil {
return false, err
}
b = b && out.Succeeded // collect the true/false responses...
}
// false means something changed
return b, nil
}
// DelResources deletes some exported resource data from etcd. It returns an
// error if anything goes wrong. If it didn't need to make a changes because the
// data was already correct in the database, it returns (true, nil). Otherwise
// it returns (false, nil).
func DelResources(ctx context.Context, client interfaces.Client, hostname string, resourceDeletes []*engine.ResDelete) (bool, error) {
// key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data
// XXX: We run each delete one at a time, because there's a bug if we
// group them, See: https://github.com/etcd-io/etcd/issues/19678
b := true
for _, rd := range resourceDeletes {
ifs := []etcd.Cmp{} // list matching the desired state
thn := []etcd.Op{} // list of ops in this transaction (then)
els := []etcd.Op{} // list of ops in this transaction (else)
host := rd.Host
if host == "" {
host = "*" // XXX: use whatever means "all"
}
path := fmt.Sprintf("%s/exported/%s/%s/%s/%s", ns, host, hostname, rd.Kind, rd.Name)
ifs = append(ifs, clientv3Util.KeyExists(path))
thn = append(thn, etcd.OpDelete(path))
// it's important to do this in one transaction, and atomically, because
// this way, we only generate one watch event, and only when it's needed
out, err := client.Txn(ctx, ifs, thn, els)
if err != nil {
return false, err
}
b = b && out.Succeeded // collect the true/false responses...
}
// false means something changed
return b, nil
}

View File

@@ -167,23 +167,32 @@ func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string,
// ResWatch returns a channel which spits out events on possible exported
// resource changes.
func (obj *World) ResWatch(ctx context.Context) (chan error, error) {
return resources.WatchResources(ctx, obj.client)
func (obj *World) ResWatch(ctx context.Context, kind string) (chan error, error) {
return resources.WatchResources(ctx, obj.client, obj.init.Hostname, kind)
}
// ResExport exports a list of resources under our hostname namespace.
// Subsequent calls replace the previously set collection atomically.
func (obj *World) ResExport(ctx context.Context, resourceList []engine.Res) error {
return resources.SetResources(ctx, obj.client, obj.init.Hostname, resourceList)
}
// ResCollect gets the collection of exported resources which match the filter.
// ResCollect gets the collection of exported resources which match the filters.
// It does this atomically so that a call always returns a complete collection.
func (obj *World) ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]engine.Res, error) {
// XXX: should we be restricted to retrieving resources that were
// exported with a tag that allows or restricts our hostname? We could
// enforce that here if the underlying API supported it... Add this?
return resources.GetResources(ctx, obj.client, hostnameFilter, kindFilter)
func (obj *World) ResCollect(ctx context.Context, filters []*engine.ResFilter) ([]*engine.ResOutput, error) {
return resources.GetResources(ctx, obj.client, obj.init.Hostname, filters)
}
// ResExport stores a number of resources in the world storage system. The
// individual records should not be updated if they are identical to what is
// already present. (This is to prevent unnecessary events.) If this makes no
// changes, it returns (true, nil). If it makes a change, then it returns
// (false, nil). On any error we return (false, err). It stores the exports
// under our hostname namespace. Subsequent calls do NOT replace the previously
// set collection.
func (obj *World) ResExport(ctx context.Context, resourceExports []*engine.ResExport) (bool, error) {
return resources.SetResources(ctx, obj.client, obj.init.Hostname, resourceExports)
}
// ResDelete deletes a number of resources in the world storage system. If this
// doesn't delete, it returns (true, nil). If it makes a delete, then it returns
// (false, nil). On any error we return (false, err).
func (obj *World) ResDelete(ctx context.Context, resourceDeletes []*engine.ResDelete) (bool, error) {
return resources.DelResources(ctx, obj.client, obj.init.Hostname, resourceDeletes)
}
// IdealClusterSizeWatch returns a stream of errors anytime the cluster-wide

27
examples/lang/export0.mcl Normal file
View File

@@ -0,0 +1,27 @@
file "/tmp/hello" {
content => "hello world from @purpleidea\n",
state => $const.res.file.state.exists,
Meta:hidden => true,
#Meta:export => ["*",], # export to all
#Meta:export => ["hostname1",], # export to just this one
Meta:export => [ # export to everyone in this list
"${hostname}",
"hostname2",
"hostname3",
],
}
collect file "/tmp/hello" {
#content => "i was collected\n", # override
Meta:hidden => false,
}
# collect a more complex way (use helper functions here instead!)
collect file [
struct{name => "/tmp/hello", host => "${hostname}",},
] {
Meta:hidden => false,
}

View File

@@ -382,6 +382,10 @@ type StmtRes struct {
Name interfaces.Expr // unique name for the res of this kind
namePtr interfaces.Func // ptr for table lookup
Contents []StmtResContents // list of fields/edges in parsed order
// Collect specifies that we are "collecting" exported resources. The
// names come from other hosts (or from ourselves during a self-export).
Collect bool
}
// String returns a short representation of this statement.
@@ -426,6 +430,7 @@ func (obj *StmtRes) Init(data *interfaces.Data) error {
}
fieldNames := make(map[string]struct{})
metaNames := make(map[string]struct{})
foundCollect := false
for _, x := range obj.Contents {
// Duplicate checking for identical field names.
@@ -444,12 +449,27 @@ func (obj *StmtRes) Init(data *interfaces.Data) error {
// Ignore the generic MetaField struct field for now.
// You're allowed to have more than one Meta field, but
// they can't contain the same field twice.
// FIXME: Allow duplicates in certain fields, such as
// ones that are lists... In this case, they merge...
if _, exists := metaNames[line.Property]; exists && line.Property != MetaField {
return fmt.Errorf("resource has duplicate meta entry of: %s", line.Property)
}
metaNames[line.Property] = struct{}{}
}
// Duplicate checking for more than one StmtResCollect entry.
if stmt, ok := x.(*StmtResCollect); ok && foundCollect {
// programming error
return fmt.Errorf("duplicate collect body in res")
} else if ok {
if stmt.Kind != obj.Kind {
// programming error
return fmt.Errorf("unexpected kind mismatch")
}
foundCollect = true // found one
}
if err := x.Init(data); err != nil {
return err
}
@@ -481,6 +501,7 @@ func (obj *StmtRes) Interpolate() (interfaces.Stmt, error) {
Kind: obj.Kind,
Name: name,
Contents: contents,
Collect: obj.Collect,
}, nil
}
@@ -522,6 +543,7 @@ func (obj *StmtRes) Copy() (interfaces.Stmt, error) {
Kind: obj.Kind,
Name: name,
Contents: contents,
Collect: obj.Collect,
}, nil
}
@@ -630,6 +652,10 @@ func (obj *StmtRes) TypeCheck() ([]*interfaces.UnificationInvariant, error) {
// TODO: Check other cases, like if it's a function call, and we know it
// can only return a single string. (Eg: fmt.printf for example.)
isString := false
isListString := false
isCollectType := false
typCollectFuncInType := types.NewType(funcs.CollectFuncInType)
if _, ok := obj.Name.(*ExprStr); ok {
// It's a string! (A plain string was specified.)
isString = true
@@ -639,14 +665,41 @@ func (obj *StmtRes) TypeCheck() ([]*interfaces.UnificationInvariant, error) {
if typ.Cmp(types.TypeStr) == nil {
isString = true
}
if typ.Cmp(types.TypeListStr) == nil {
isListString = true
}
if typ.Cmp(typCollectFuncInType) == nil {
isCollectType = true
}
}
typExpr := types.TypeListStr // default
var typExpr *types.Type // nil
// If we pass here, we only allow []str, no need for exclusives!
if isString {
typExpr = types.TypeStr
}
if isListString {
typExpr = types.TypeListStr // default for regular resources
}
if isCollectType && obj.Collect {
typExpr = typCollectFuncInType
}
if !obj.Collect && typExpr == nil {
typExpr = types.TypeListStr // default for regular resources
}
if obj.Collect && typExpr == nil {
// TODO: do we want a default for collect ?
typExpr = typCollectFuncInType // default for collect resources
}
if typExpr == nil { // If we don't know for sure, then we unify it all.
typExpr = &types.Type{
Kind: types.KindUnification,
Uni: types.NewElem(), // unification variable, eg: ?1
}
}
invar := &interfaces.UnificationInvariant{
Node: obj,
@@ -747,16 +800,87 @@ func (obj *StmtRes) Output(table map[interfaces.Func]types.Value) (*interfaces.O
return nil, fmt.Errorf("%w: %T", ErrTableNoValue, obj)
}
names := []string{} // list of names to build
// the host in this output is who the data is from
mapping, err := obj.collect(table) // gives us (name, host, data)
if err != nil {
return nil, err
}
if mapping == nil { // for when we're not collecting
mapping = make(map[string]map[string]string)
}
typCollectFuncInType := types.NewType(funcs.CollectFuncInType)
names := []string{} // list of names to build (TODO: map instead?)
switch {
case types.TypeStr.Cmp(nameValue.Type()) == nil:
name := nameValue.Str() // must not panic
names = append(names, name)
for n := range mapping { // delete everything else
if n == name {
continue
}
delete(mapping, n)
}
if !obj.Collect { // mapping is empty, add a stub
mapping[name] = map[string]string{
"*": "", // empty data
}
}
case types.TypeListStr.Cmp(nameValue.Type()) == nil:
for _, x := range nameValue.List() { // must not panic
name := x.Str() // must not panic
names = append(names, name)
if !obj.Collect { // mapping is empty, add a stub
mapping[name] = map[string]string{
"*": "", // empty data
}
}
}
for n := range mapping { // delete everything else
if util.StrInList(n, names) {
continue
}
delete(mapping, n)
}
case obj.Collect && typCollectFuncInType.Cmp(nameValue.Type()) == nil:
hosts := make(map[string]string)
for _, x := range nameValue.List() { // must not panic
st, ok := x.(*types.StructValue)
if !ok {
// programming error
return nil, fmt.Errorf("value is not a struct")
}
name, exists := st.Lookup(funcs.CollectFuncInFieldName)
if !exists {
// programming error?
return nil, fmt.Errorf("name field is missing")
}
host, exists := st.Lookup(funcs.CollectFuncInFieldHost)
if !exists {
// programming error?
return nil, fmt.Errorf("host field is missing")
}
s := name.Str() // must not panic
names = append(names, s)
// host is the input telling us who we want to pull from
hosts[s] = host.Str() // correspondence map
}
for n, m := range mapping { // delete everything else
if !util.StrInList(n, names) {
delete(mapping, n)
continue
}
host := hosts[n] // the matching host for the name
for h := range m {
if h == host {
continue
}
delete(mapping[n], h)
}
}
default:
@@ -766,22 +890,32 @@ func (obj *StmtRes) Output(table map[interfaces.Func]types.Value) (*interfaces.O
resources := []engine.Res{}
edges := []*interfaces.Edge{}
for _, name := range names {
res, err := obj.resource(table, name)
if err != nil {
return nil, errwrap.Wrapf(err, "error building resource")
}
edgeList, err := obj.edges(table, name)
if err != nil {
return nil, errwrap.Wrapf(err, "error building edges")
}
edges = append(edges, edgeList...)
apply, err := obj.metaparams(table)
if err != nil {
return nil, errwrap.Wrapf(err, "error generating metaparams")
}
if err := obj.metaparams(table, res); err != nil { // set metaparams
return nil, errwrap.Wrapf(err, "error building meta params")
// TODO: sort?
for name, m := range mapping {
for host, data := range m {
// host may be * if not collecting
// data may be empty if not collecting
_ = host // unused atm
res, err := obj.resource(table, name, data) // one at a time
if err != nil {
return nil, errwrap.Wrapf(err, "error building resource")
}
apply(res) // apply metaparams, does not return anything
resources = append(resources, res)
edgeList, err := obj.edges(table, name)
if err != nil {
return nil, errwrap.Wrapf(err, "error building edges")
}
edges = append(edges, edgeList...)
}
resources = append(resources, res)
}
return &interfaces.Output{
@@ -790,14 +924,115 @@ func (obj *StmtRes) Output(table map[interfaces.Func]types.Value) (*interfaces.O
}, nil
}
// collect is a helper function to pull out the collected resource data.
func (obj *StmtRes) collect(table map[interfaces.Func]types.Value) (map[string]map[string]string, error) {
if !obj.Collect {
return nil, nil // nothing to do
}
var val types.Value // = nil
typCollectFuncOutType := types.NewType(funcs.CollectFuncOutType)
for _, line := range obj.Contents {
x, ok := line.(*StmtResCollect)
if !ok {
continue
}
if x.Kind != obj.Kind { // should have been caught in Init
// programming error
return nil, fmt.Errorf("unexpected kind mismatch")
}
if x.valuePtr == nil {
return nil, fmt.Errorf("%w: %T", ErrFuncPointerNil, obj)
}
fv, exists := table[x.valuePtr]
if !exists {
return nil, fmt.Errorf("%w: %T", ErrTableNoValue, obj)
}
if err := fv.Type().Cmp(typCollectFuncOutType); err != nil { // "[]struct{name str; host str; data str}"
// programming error
return nil, fmt.Errorf("resource collect has invalid type: `%+v`", err)
}
val = fv // found
break
}
if val == nil {
// programming error?
return nil, nil // nothing found
}
m := make(map[string]map[string]string) // name, host, data
// TODO: Store/cache this in an efficient form to avoid loops...
// TODO: Eventually collect func should, for efficiency, return:
// map{struct{name str; host str}: str} // key => $data
for _, x := range val.List() { // must not panic
st, ok := x.(*types.StructValue)
if !ok {
// programming error
return nil, fmt.Errorf("value is not a struct")
}
name, exists := st.Lookup(funcs.CollectFuncOutFieldName)
if !exists {
// programming error?
return nil, fmt.Errorf("name field is missing")
}
host, exists := st.Lookup(funcs.CollectFuncOutFieldHost)
if !exists {
// programming error?
return nil, fmt.Errorf("host field is missing")
}
data, exists := st.Lookup(funcs.CollectFuncOutFieldData)
if !exists {
// programming error?
return nil, fmt.Errorf("data field is missing")
}
// found!
n := name.Str() // must not panic
h := host.Str() // must not panic
if _, exists := m[n]; !exists {
m[n] = make(map[string]string)
}
m[n][h] = data.Str() // must not panic
}
return m, nil
}
// resource is a helper function to generate the res that comes from this.
// TODO: it could memoize some of the work to avoid re-computation when looped
func (obj *StmtRes) resource(table map[interfaces.Func]types.Value, resName string) (engine.Res, error) {
func (obj *StmtRes) resource(table map[interfaces.Func]types.Value, resName, data string) (engine.Res, error) {
res, err := engine.NewNamedResource(obj.Kind, resName)
if err != nil {
return nil, errwrap.Wrapf(err, "cannot create resource kind `%s` with named `%s`", obj.Kind, resName)
}
// Here we start off by using the collected resource as the base params.
// Then we overwrite over it below using the normal param setup methods.
if obj.Collect && data != "" {
// TODO: Do we want to have an alternate implementation of this
// to go along with the ExportableRes encoding variant?
if res, err = engineUtil.B64ToRes(data); err != nil {
return nil, fmt.Errorf("can't convert from B64: %v", err)
}
if res.Kind() != obj.Kind { // should have been caught somewhere
// programming error
return nil, fmt.Errorf("unexpected kind mismatch")
}
obj.data.Logf("collect: %s", res)
// XXX: Do we want to change any metaparams when we collect?
// XXX: Do we want to change any metaparams when we export?
//res.MetaParams().Hidden = false // unlikely, but I considered
res.MetaParams().Export = []string{} // don't re-export
}
sv := reflect.ValueOf(res).Elem() // pointer to struct, then struct
if k := sv.Kind(); k != reflect.Struct {
panic(fmt.Sprintf("expected struct, got: %s", k))
@@ -1015,23 +1250,10 @@ func (obj *StmtRes) edges(table map[interfaces.Func]types.Value, resName string)
return edges, nil
}
// metaparams is a helper function to set the metaparams that come from the
// resource on to the individual resource we're working on.
func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine.Res) error {
meta := engine.DefaultMetaParams.Copy() // defaults
var rm *engine.ReversibleMeta
if r, ok := res.(engine.ReversibleRes); ok {
rm = r.ReversibleMeta() // get a struct with the defaults
}
var aem *engine.AutoEdgeMeta
if r, ok := res.(engine.EdgeableRes); ok {
aem = r.AutoEdgeMeta() // get a struct with the defaults
}
var agm *engine.AutoGroupMeta
if r, ok := res.(engine.GroupableRes); ok {
agm = r.AutoGroupMeta() // get a struct with the defaults
}
// metaparams is a helper function to get the metaparams that come from the
// resource AST so we can eventually set them on the individual resource.
func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value) (func(engine.Res), error) {
apply := []func(engine.Res){}
for _, line := range obj.Contents {
x, ok := line.(*StmtResMeta)
@@ -1041,11 +1263,11 @@ func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine
if x.Condition != nil {
if x.conditionPtr == nil {
return fmt.Errorf("%w: %T", ErrFuncPointerNil, obj)
return nil, fmt.Errorf("%w: %T", ErrFuncPointerNil, obj)
}
b, exists := table[x.conditionPtr]
if !exists {
return fmt.Errorf("%w: %T", ErrTableNoValue, obj)
return nil, fmt.Errorf("%w: %T", ErrTableNoValue, obj)
}
if !b.Bool() { // if value exists, and is false, skip it
@@ -1054,47 +1276,63 @@ func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine
}
if x.metaExprPtr == nil {
return fmt.Errorf("%w: %T", ErrFuncPointerNil, obj)
return nil, fmt.Errorf("%w: %T", ErrFuncPointerNil, obj)
}
v, exists := table[x.metaExprPtr]
if !exists {
return fmt.Errorf("%w: %T", ErrTableNoValue, obj)
return nil, fmt.Errorf("%w: %T", ErrTableNoValue, obj)
}
switch p := strings.ToLower(x.Property); p {
// TODO: we could add these fields dynamically if we were fancy!
case "noop":
meta.Noop = v.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Noop = v.Bool() // must not panic
})
case "retry":
x := v.Int() // must not panic
// TODO: check that it doesn't overflow
meta.Retry = int16(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Retry = int16(x)
})
case "retryreset":
meta.RetryReset = v.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().RetryReset = v.Bool() // must not panic
})
case "delay":
x := v.Int() // must not panic
// TODO: check that it isn't signed
meta.Delay = uint64(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Delay = uint64(x)
})
case "poll":
x := v.Int() // must not panic
// TODO: check that it doesn't overflow and isn't signed
meta.Poll = uint32(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Poll = uint32(x)
})
case "limit": // rate.Limit
x := v.Float() // must not panic
meta.Limit = rate.Limit(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Limit = rate.Limit(x)
})
case "burst":
x := v.Int() // must not panic
// TODO: check that it doesn't overflow
meta.Burst = int(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Burst = int(x)
})
case "reset":
meta.Reset = v.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Reset = v.Bool() // must not panic
})
case "sema": // []string
values := []string{}
@@ -1102,65 +1340,126 @@ func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine
s := x.Str() // must not panic
values = append(values, s)
}
meta.Sema = values
apply = append(apply, func(res engine.Res) {
res.MetaParams().Sema = values
})
case "rewatch":
meta.Rewatch = v.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Rewatch = v.Bool() // must not panic
})
case "realize":
meta.Realize = v.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Realize = v.Bool() // must not panic
})
case "dollar":
meta.Dollar = v.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Dollar = v.Bool() // must not panic
})
case "hidden":
apply = append(apply, func(res engine.Res) {
res.MetaParams().Hidden = v.Bool() // must not panic
})
case "export": // []string
values := []string{}
for _, x := range v.List() { // must not panic
s := x.Str() // must not panic
values = append(values, s)
}
apply = append(apply, func(res engine.Res) {
res.MetaParams().Export = values
})
case "reverse":
if rm != nil {
rm.Disabled = !v.Bool() // must not panic
}
apply = append(apply, func(res engine.Res) {
r, ok := res.(engine.ReversibleRes)
if !ok {
return
}
// *engine.ReversibleMeta
rm := r.ReversibleMeta() // get current values
rm.Disabled = !v.Bool() // must not panic
r.SetReversibleMeta(rm) // set
})
case "autoedge":
if aem != nil {
apply = append(apply, func(res engine.Res) {
r, ok := res.(engine.EdgeableRes)
if !ok {
return
}
// *engine.AutoEdgeMeta
aem := r.AutoEdgeMeta() // get current values
aem.Disabled = !v.Bool() // must not panic
}
r.SetAutoEdgeMeta(aem) // set
})
case "autogroup":
if agm != nil {
apply = append(apply, func(res engine.Res) {
r, ok := res.(engine.GroupableRes)
if !ok {
return
}
// *engine.AutoGroupMeta
agm := r.AutoGroupMeta() // get current values
agm.Disabled = !v.Bool() // must not panic
}
r.SetAutoGroupMeta(agm) // set
})
case MetaField:
if val, exists := v.Struct()["noop"]; exists {
meta.Noop = val.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Noop = val.Bool() // must not panic
})
}
if val, exists := v.Struct()["retry"]; exists {
x := val.Int() // must not panic
// TODO: check that it doesn't overflow
meta.Retry = int16(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Retry = int16(x)
})
}
if val, exists := v.Struct()["retryreset"]; exists {
meta.RetryReset = val.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().RetryReset = val.Bool() // must not panic
})
}
if val, exists := v.Struct()["delay"]; exists {
x := val.Int() // must not panic
// TODO: check that it isn't signed
meta.Delay = uint64(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Delay = uint64(x)
})
}
if val, exists := v.Struct()["poll"]; exists {
x := val.Int() // must not panic
// TODO: check that it doesn't overflow and isn't signed
meta.Poll = uint32(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Poll = uint32(x)
})
}
if val, exists := v.Struct()["limit"]; exists {
x := val.Float() // must not panic
meta.Limit = rate.Limit(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Limit = rate.Limit(x)
})
}
if val, exists := v.Struct()["burst"]; exists {
x := val.Int() // must not panic
// TODO: check that it doesn't overflow
meta.Burst = int(x)
apply = append(apply, func(res engine.Res) {
res.MetaParams().Burst = int(x)
})
}
if val, exists := v.Struct()["reset"]; exists {
meta.Reset = val.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Reset = val.Bool() // must not panic
})
}
if val, exists := v.Struct()["sema"]; exists {
values := []string{}
@@ -1168,44 +1467,90 @@ func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine
s := x.Str() // must not panic
values = append(values, s)
}
meta.Sema = values
apply = append(apply, func(res engine.Res) {
res.MetaParams().Sema = values
})
}
if val, exists := v.Struct()["rewatch"]; exists {
meta.Rewatch = val.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Rewatch = val.Bool() // must not panic
})
}
if val, exists := v.Struct()["realize"]; exists {
meta.Realize = val.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Realize = val.Bool() // must not panic
})
}
if val, exists := v.Struct()["dollar"]; exists {
meta.Dollar = val.Bool() // must not panic
apply = append(apply, func(res engine.Res) {
res.MetaParams().Dollar = val.Bool() // must not panic
})
}
if val, exists := v.Struct()["reverse"]; exists && rm != nil {
rm.Disabled = !val.Bool() // must not panic
if val, exists := v.Struct()["hidden"]; exists {
apply = append(apply, func(res engine.Res) {
res.MetaParams().Hidden = val.Bool() // must not panic
})
}
if val, exists := v.Struct()["autoedge"]; exists && aem != nil {
aem.Disabled = !val.Bool() // must not panic
if val, exists := v.Struct()["export"]; exists {
values := []string{}
for _, x := range val.List() { // must not panic
s := x.Str() // must not panic
values = append(values, s)
}
apply = append(apply, func(res engine.Res) {
res.MetaParams().Export = values
})
}
if val, exists := v.Struct()["autogroup"]; exists && agm != nil {
agm.Disabled = !val.Bool() // must not panic
if val, exists := v.Struct()["reverse"]; exists {
apply = append(apply, func(res engine.Res) {
r, ok := res.(engine.ReversibleRes)
if !ok {
return
}
// *engine.ReversibleMeta
rm := r.ReversibleMeta() // get current values
rm.Disabled = !val.Bool() // must not panic
r.SetReversibleMeta(rm) // set
})
}
if val, exists := v.Struct()["autoedge"]; exists {
apply = append(apply, func(res engine.Res) {
r, ok := res.(engine.EdgeableRes)
if !ok {
return
}
// *engine.AutoEdgeMeta
aem := r.AutoEdgeMeta() // get current values
aem.Disabled = !val.Bool() // must not panic
r.SetAutoEdgeMeta(aem) // set
})
}
if val, exists := v.Struct()["autogroup"]; exists {
apply = append(apply, func(res engine.Res) {
r, ok := res.(engine.GroupableRes)
if !ok {
return
}
// *engine.AutoGroupMeta
agm := r.AutoGroupMeta() // get current values
agm.Disabled = !val.Bool() // must not panic
r.SetAutoGroupMeta(agm) // set
})
}
default:
return fmt.Errorf("unknown property: %s", p)
return nil, fmt.Errorf("unknown property: %s", p)
}
}
res.SetMetaParams(meta) // set it!
if r, ok := res.(engine.ReversibleRes); ok {
r.SetReversibleMeta(rm) // set
}
if r, ok := res.(engine.EdgeableRes); ok {
r.SetAutoEdgeMeta(aem) // set
}
if r, ok := res.(engine.GroupableRes); ok {
r.SetAutoGroupMeta(agm) // set
fn := func(res engine.Res) {
for _, f := range apply {
f(res)
}
}
return nil
return fn, nil
}
// StmtResContents is the interface that is met by the resource contents. Look
@@ -1816,6 +2161,8 @@ func (obj *StmtResMeta) Init(data *interfaces.Data) error {
case "rewatch":
case "realize":
case "dollar":
case "hidden":
case "export":
case "reverse":
case "autoedge":
case "autogroup":
@@ -2033,6 +2380,12 @@ func (obj *StmtResMeta) TypeCheck(kind string) ([]*interfaces.UnificationInvaria
case "dollar":
typExpr = types.TypeBool
case "hidden":
typExpr = types.TypeBool
case "export":
typExpr = types.TypeListStr
case "reverse":
// TODO: We might want more parameters about how to reverse.
typExpr = types.TypeBool
@@ -2049,7 +2402,7 @@ func (obj *StmtResMeta) TypeCheck(kind string) ([]*interfaces.UnificationInvaria
// FIXME: allow partial subsets of this struct, and in any order
// FIXME: we might need an updated unification engine to do this
wrap := func(reverse *types.Type) *types.Type {
return types.NewType(fmt.Sprintf("struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse %s; autoedge bool; autogroup bool}", reverse.String()))
return types.NewType(fmt.Sprintf("struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse %s; autoedge bool; autogroup bool}", reverse.String()))
}
// TODO: We might want more parameters about how to reverse.
typExpr = wrap(types.TypeBool)
@@ -2104,6 +2457,198 @@ func (obj *StmtResMeta) Graph(env *interfaces.Env) (*pgraph.Graph, error) {
return graph, nil
}
// StmtResCollect represents hidden resource collection data in the resource.
// This does not satisfy the Stmt interface.
type StmtResCollect struct {
//Textarea
data *interfaces.Data
Kind string
Value interfaces.Expr
valuePtr interfaces.Func // ptr for table lookup
}
// String returns a short representation of this statement.
func (obj *StmtResCollect) String() string {
// TODO: add .String() for Condition and Value
return fmt.Sprintf("rescollect(%s)", obj.Kind)
}
// Apply is a general purpose iterator method that operates on any AST node. It
// is not used as the primary AST traversal function because it is less readable
// and easy to reason about than manually implementing traversal for each node.
// Nevertheless, it is a useful facility for operations that might only apply to
// a select number of node types, since they won't need extra noop iterators...
func (obj *StmtResCollect) Apply(fn func(interfaces.Node) error) error {
if err := obj.Value.Apply(fn); err != nil {
return err
}
return fn(obj)
}
// Init initializes this branch of the AST, and returns an error if it fails to
// validate.
func (obj *StmtResCollect) Init(data *interfaces.Data) error {
obj.data = data
//obj.Textarea.Setup(data)
if obj.Kind == "" {
return fmt.Errorf("res kind is empty")
}
return obj.Value.Init(data)
}
// Interpolate returns a new node (aka a copy) once it has been expanded. This
// generally increases the size of the AST when it is used. It calls Interpolate
// on any child elements and builds the new node with those new node contents.
// This interpolate is different It is different from the interpolate found in
// the Expr and Stmt interfaces because it returns a different type as output.
func (obj *StmtResCollect) Interpolate() (StmtResContents, error) {
interpolated, err := obj.Value.Interpolate()
if err != nil {
return nil, err
}
return &StmtResCollect{
//Textarea: obj.Textarea,
data: obj.data,
Kind: obj.Kind,
Value: interpolated,
}, nil
}
// Copy returns a light copy of this struct. Anything static will not be copied.
func (obj *StmtResCollect) Copy() (StmtResContents, error) {
copied := false
value, err := obj.Value.Copy()
if err != nil {
return nil, err
}
if value != obj.Value { // must have been copied, or pointer would be same
copied = true
}
if !copied { // it's static
return obj, nil
}
return &StmtResCollect{
//Textarea: obj.Textarea,
data: obj.data,
Kind: obj.Kind,
Value: value,
}, nil
}
// Ordering returns a graph of the scope ordering that represents the data flow.
// This can be used in SetScope so that it knows the correct order to run it in.
func (obj *StmtResCollect) Ordering(produces map[string]interfaces.Node) (*pgraph.Graph, map[interfaces.Node]string, error) {
graph, err := pgraph.NewGraph("ordering")
if err != nil {
return nil, nil, err
}
graph.AddVertex(obj)
// additional constraint...
edge := &pgraph.SimpleEdge{Name: "stmtrescollectvalue"}
graph.AddEdge(obj.Value, obj, edge) // prod -> cons
cons := make(map[interfaces.Node]string)
nodes := []interfaces.Expr{obj.Value}
for _, node := range nodes {
g, c, err := node.Ordering(produces)
if err != nil {
return nil, nil, err
}
graph.AddGraph(g) // add in the child graph
for k, v := range c { // c is consumes
x, exists := cons[k]
if exists && v != x {
return nil, nil, fmt.Errorf("consumed value is different, got `%+v`, expected `%+v`", x, v)
}
cons[k] = v // add to map
n, exists := produces[v]
if !exists {
continue
}
edge := &pgraph.SimpleEdge{Name: "stmtrescollect"}
graph.AddEdge(n, k, edge)
}
}
return graph, cons, nil
}
// SetScope stores the scope for later use in this resource and its children,
// which it propagates this downwards to.
func (obj *StmtResCollect) SetScope(scope *interfaces.Scope) error {
if err := obj.Value.SetScope(scope, map[string]interfaces.Expr{}); err != nil {
return err
}
return nil
}
// TypeCheck returns the list of invariants that this node produces. It does so
// recursively on any children elements that exist in the AST, and returns the
// collection to the caller. It calls TypeCheck for child statements, and
// Infer/Check for child expressions. It is different from the TypeCheck method
// found in the Stmt interface because it adds an input parameter.
func (obj *StmtResCollect) TypeCheck(kind string) ([]*interfaces.UnificationInvariant, error) {
typ, invariants, err := obj.Value.Infer()
if err != nil {
return nil, err
}
//invars, err := obj.Value.Check(typ) // don't call this here!
if !engine.IsKind(kind) {
return nil, fmt.Errorf("invalid resource kind: %s", kind)
}
typExpr := types.NewType(funcs.CollectFuncOutType)
if typExpr == nil {
return nil, fmt.Errorf("unexpected nil type")
}
// regular scenario
invar := &interfaces.UnificationInvariant{
Node: obj,
Expr: obj.Value,
Expect: typExpr,
Actual: typ,
}
invariants = append(invariants, invar)
return invariants, nil
}
// Graph returns the reactive function graph which is expressed by this node. It
// includes any vertices produced by this node, and the appropriate edges to any
// vertices that are produced by its children. Nodes which fulfill the Expr
// interface directly produce vertices (and possible children) where as nodes
// that fulfill the Stmt interface do not produces vertices, where as their
// children might. It is interesting to note that nothing directly adds an edge
// to the resources created, but rather, once all the values (expressions) with
// no outgoing edges have produced at least a single value, then the resources
// know they're able to be built.
func (obj *StmtResCollect) Graph(env *interfaces.Env) (*pgraph.Graph, error) {
graph, err := pgraph.NewGraph("rescollect")
if err != nil {
return nil, err
}
g, f, err := obj.Value.Graph(env)
if err != nil {
return nil, err
}
graph.AddGraph(g)
obj.valuePtr = f
return graph, nil
}
// StmtEdge is a representation of a dependency. It also supports send/recv.
// Edges represents that the first resource (Kind/Name) listed in the
// EdgeHalfList should happen in the resource graph *before* the next resource

493
lang/core/collect.go Normal file
View File

@@ -0,0 +1,493 @@
// Mgmt
// Copyright (C) James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.
package core
import (
"context"
"fmt"
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/lang/funcs"
"github.com/purpleidea/mgmt/lang/interfaces"
"github.com/purpleidea/mgmt/lang/types"
"github.com/purpleidea/mgmt/util/errwrap"
)
const (
// CollectFuncName is the name this function is registered as. This
// starts with an underscore so that it cannot be used from the lexer.
CollectFuncName = funcs.CollectFuncName
// arg names...
collectArgNameKind = "kind"
collectArgNameNames = "names"
//collectFuncInType = "[]struct{kind str; name str; host str}"
//collectFuncInFieldKind = "kind" // must match above struct field
collectFuncInFieldName = funcs.CollectFuncInFieldName
collectFuncInFieldHost = funcs.CollectFuncInFieldHost
// collectFuncInType is the most complex of the three possible input
// types. The other two possible ones are str or []str.
collectFuncInType = funcs.CollectFuncInType // "[]struct{name str; host str}"
collectFuncOutFieldName = funcs.CollectFuncOutFieldName
collectFuncOutFieldHost = funcs.CollectFuncOutFieldHost
collectFuncOutFieldData = funcs.CollectFuncOutFieldData
// collectFuncOutStruct is the struct type that we return a list of.
collectFuncOutStruct = funcs.CollectFuncOutStruct
// collectFuncOutType is the expected return type, the data field is an
// encoded resource blob.
// XXX: Once structs can be real map keys in mcl, could this instead be:
// map{struct{name str; host str}: str} // key => $data (efficiency!)
collectFuncOutType = funcs.CollectFuncOutType // "[]struct{name str; host str; data str}"
)
func init() {
funcs.Register(CollectFuncName, func() interfaces.Func { return &CollectFunc{} }) // must register the func and name
}
var _ interfaces.InferableFunc = &CollectFunc{} // ensure it meets this expectation
// CollectFunc is a special internal function which gets given information about
// incoming resource collection data. For example, to collect, that "pseudo
// resource" will need to know what resource "kind" it's collecting, the names
// of those resources, and the corresponding hostnames that they are getting the
// data from. With that three-tuple of data, it can pull all of that from etcd
// and pass it into a hidden resource body field so that the collect "pseudo
// resource" can use it to build the exported resource!
//
// The "kind" comes in as the first arg. The second arg (in its complex form) is
// []struct{name str; host str} is what the end user is _asking_ this function
// for.
// TODO: We could have a second version of this collect function which takes a
// single arg which receives []struct{kind str; name str; host str} which would
// let us write a truly dynamic collector. It's unlikely we want to allow this
// in most cases because it lets you play type games since the field name in one
// resource kind might be a different type in another.
type CollectFunc struct {
// Type is the type of the second arg that we receive. (When known.)
Type *types.Type
init *interfaces.Init
last types.Value // last value received to use for diff
args []types.Value
kind string
result types.Value // last calculated output
watchChan chan error
}
// String returns a simple name for this function. This is needed so this struct
// can satisfy the pgraph.Vertex interface.
func (obj *CollectFunc) String() string {
return CollectFuncName
}
// ArgGen returns the Nth arg name for this function.
func (obj *CollectFunc) ArgGen(index int) (string, error) {
seq := []string{collectArgNameKind, collectArgNameNames}
if l := len(seq); index >= l {
return "", fmt.Errorf("index %d exceeds arg length of %d", index, l)
}
return seq[index], nil
}
// helper
func (obj *CollectFunc) sig() *types.Type {
arg := "?1"
if obj.Type != nil {
arg = obj.Type.String()
}
return types.NewType(fmt.Sprintf(
"func(%s str, %s %s) %s",
collectArgNameKind,
collectArgNameNames,
arg,
collectFuncOutType,
))
}
// check determines if our arg type is valid.
func (obj *CollectFunc) check(typ *types.Type) error {
if typ.Cmp(types.TypeStr) == nil {
return nil
}
if typ.Cmp(types.TypeListStr) == nil {
return nil
}
if typ.Cmp(types.NewType(collectFuncInType)) == nil {
return nil
}
return fmt.Errorf("unexpected type: %s", typ.String())
}
// FuncInfer takes partial type and value information from the call site of this
// function so that it can build an appropriate type signature for it. The type
// signature may include unification variables.
func (obj *CollectFunc) FuncInfer(partialType *types.Type, partialValues []types.Value) (*types.Type, []*interfaces.UnificationInvariant, error) {
// There are many variants which we could allow... These variants are
// what the user specifies in the $name field when they collect. They
// will often get the third form from helper functions that filter the
// data from the world graph, so that they can programmatically match
// using our mcl language rather than hard-coding a mini matcher lang.
//
// XXX: Do we want to allow all these variants?
//
// func(str, str) out # matches all hostnames
// OR
// func(str, []str) out # matches all hostnames
// OR
// func(str, []struct{name str; host str} ) out # matches exact tuples or all hostnames if host is ""
// SO
// func(str, ?1) out
// AND
// out = []struct{name str; host str; data str} # it could have kind too, but not needed right now
//
// NOTE: map[str]str (name => host) is NOT a good choice because even
// though we nominally have one host exporting a given name, it's valid
// to have that same name come from more than one host and for them to
// be compatible, almost like an "exported resources redundancy".
//
// NOTE map[str][]str (name => []host) is sensible, BUT it makes it
// harder to express that we want "every host", which we can do with the
// struct variant above by having host be the empty string. It's also
// easier for the mcl programmer to understand that variant.
if l := 2; len(partialValues) != l {
return nil, nil, fmt.Errorf("function must have %d args", l)
}
if err := partialValues[0].Type().Cmp(types.TypeStr); err != nil {
return nil, nil, errwrap.Wrapf(err, "function arg kind must be a str")
}
kind := partialValues[0].Str() // must not panic
if kind == "" {
return nil, nil, fmt.Errorf("function must not have an empty kind arg")
}
if !engine.IsKind(kind) {
return nil, nil, fmt.Errorf("invalid resource kind: %s", kind)
}
// If second arg is one of what we're expecting, then we are solved!
if len(partialType.Ord) == 2 && partialType.Map[partialType.Ord[1]] != nil {
typ := partialType.Map[partialType.Ord[1]]
if err := obj.check(typ); err == nil {
obj.Type = typ // success!
}
}
return obj.sig(), []*interfaces.UnificationInvariant{}, nil
}
// Build is run to turn the polymorphic, undetermined function, into the
// specific statically typed version. It is usually run after Unify completes,
// and must be run before Info() and any of the other Func interface methods are
// used. This function is idempotent, as long as the arg isn't changed between
// runs.
func (obj *CollectFunc) Build(typ *types.Type) (*types.Type, error) {
// typ is the KindFunc signature we're trying to build...
if typ.Kind != types.KindFunc {
return nil, fmt.Errorf("input type must be of kind func")
}
if len(typ.Ord) != 2 {
return nil, fmt.Errorf("the collect function needs two args")
}
tStr, exists := typ.Map[typ.Ord[0]]
if !exists || tStr == nil {
return nil, fmt.Errorf("first arg must be specified")
}
if tStr.Cmp(types.TypeStr) != nil {
return nil, fmt.Errorf("first arg must be a str")
}
tArg, exists := typ.Map[typ.Ord[1]]
if !exists || tArg == nil {
return nil, fmt.Errorf("second arg must be specified")
}
if err := obj.check(tArg); err != nil {
return nil, err
}
obj.Type = tArg // store it!
return obj.sig(), nil
}
// Copy is implemented so that the obj.Type value is not lost if we copy this
// function. That value is learned during FuncInfer, and previously would have
// been lost by the time we used it in Build.
func (obj *CollectFunc) Copy() interfaces.Func {
return &CollectFunc{
Type: obj.Type, // don't copy because we use this after unification
init: obj.init, // likely gets overwritten anyways
}
}
// Validate tells us if the input struct takes a valid form.
func (obj *CollectFunc) Validate() error {
if obj.Type == nil {
return fmt.Errorf("the Type is unknown")
}
if err := obj.check(obj.Type); err != nil {
return err
}
return nil
}
// Info returns some static info about itself. Build must be called before this
// will return correct data.
func (obj *CollectFunc) Info() *interfaces.Info {
// Since this function implements FuncInfer we want sig to return nil to
// avoid an accidental return of unification variables when we should be
// getting them from FuncInfer, and not from here. (During unification!)
var sig *types.Type
if obj.Type != nil && obj.check(obj.Type) == nil {
sig = obj.sig() // helper
}
return &interfaces.Info{
Pure: false,
Memo: false,
Sig: sig,
Err: obj.Validate(),
}
}
// Init runs some startup code for this function.
func (obj *CollectFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.watchChan = make(chan error) // XXX: sender should close this, but did I implement that part yet???
return nil
}
// Stream returns the changing values that this func has over time.
func (obj *CollectFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
ctx, cancel := context.WithCancel(ctx)
defer cancel() // important so that we cleanup the watch when exiting
for {
select {
// TODO: should this first chan be run as a priority channel to
// avoid some sort of glitch? is that even possible? can our
// hostname check with reality (below) fix that?
case input, ok := <-obj.init.Input:
if !ok {
obj.init.Input = nil // don't infinite loop back
continue // no more inputs, but don't return!
}
//if err := input.Type().Cmp(obj.Info().Sig.Input); err != nil {
// return errwrap.Wrapf(err, "wrong function input")
//}
if obj.last != nil && input.Cmp(obj.last) == nil {
continue // value didn't change, skip it
}
obj.last = input // store for next
args, err := interfaces.StructToCallableArgs(input) // []types.Value, error)
if err != nil {
return err
}
obj.args = args
kind := args[0].Str()
if kind == "" {
return fmt.Errorf("can't use an empty kind")
}
if obj.init.Debug {
obj.init.Logf("kind: %s", kind)
}
// TODO: support changing the key over time?
if obj.kind == "" {
obj.kind = kind // store it
var err error
// Don't send a value right away, wait for the
// first Watch startup event to get one!
obj.watchChan, err = obj.init.World.ResWatch(ctx, obj.kind) // watch for var changes
if err != nil {
return err
}
} else if obj.kind != kind {
return fmt.Errorf("can't change kind, previously: `%s`", obj.kind)
}
continue // we get values on the watch chan, not here!
case err, ok := <-obj.watchChan:
if !ok { // closed
// XXX: if we close, perhaps the engine is
// switching etcd hosts and we should retry?
// maybe instead we should get an "etcd
// reconnect" signal, and the lang will restart?
return nil
}
if err != nil {
return errwrap.Wrapf(err, "channel watch failed on `%s`", obj.kind)
}
result, err := obj.Call(ctx, obj.args) // get the value...
if err != nil {
return err
}
// if the result is still the same, skip sending an update...
if obj.result != nil && result.Cmp(obj.result) == nil {
continue // result didn't change
}
obj.result = result // store new result
case <-ctx.Done():
return nil
}
select {
case obj.init.Output <- obj.result: // send
// pass
case <-ctx.Done():
return nil
}
}
}
// Call this function with the input args and return the value if it is possible
// to do so at this time. This was previously getValue which gets the value
// we're looking for.
func (obj *CollectFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) {
kind := args[0].Str()
if kind == "" {
return nil, fmt.Errorf("resource kind is empty")
}
if !engine.IsKind(kind) {
return nil, fmt.Errorf("invalid resource kind: %s", kind)
}
filters := []*engine.ResFilter{}
arg := args[1]
typ := arg.Type()
// Can be one of: str, []str, []struct{name str; host str} for matching.
if typ.Cmp(types.TypeStr) == nil { // it must be a name only
filter := &engine.ResFilter{
Kind: kind,
Name: arg.Str(),
Host: "", // any
}
filters = append(filters, filter)
}
if typ.Cmp(types.TypeListStr) == nil {
for _, x := range arg.List() {
filter := &engine.ResFilter{
Kind: kind,
Name: x.Str(),
Host: "", // any
}
filters = append(filters, filter)
}
}
if typ.Cmp(types.NewType(collectFuncInType)) == nil {
for _, x := range arg.List() {
st, ok := x.(*types.StructValue)
if !ok {
// programming error
return nil, fmt.Errorf("value is not a struct")
}
name, exists := st.Lookup(collectFuncInFieldName)
if !exists {
// programming error?
return nil, fmt.Errorf("name field is missing")
}
host, exists := st.Lookup(collectFuncInFieldHost)
if !exists {
// programming error?
return nil, fmt.Errorf("host field is missing")
}
filter := &engine.ResFilter{
Kind: kind,
Name: name.Str(),
Host: host.Str(),
}
filters = append(filters, filter)
}
}
resOutput, err := obj.init.World.ResCollect(ctx, filters)
if err != nil {
return nil, err
}
list := types.NewList(obj.Info().Sig.Out) // collectFuncOutType
for _, x := range resOutput {
// programming error if any of these error...
if x.Kind != kind {
return nil, fmt.Errorf("unexpected kind: %s", x.Kind)
}
if x.Name == "" {
return nil, fmt.Errorf("unexpected empty name")
}
if x.Host == "" {
return nil, fmt.Errorf("unexpected empty host")
}
if x.Data == "" {
return nil, fmt.Errorf("unexpected empty data")
}
name := &types.StrValue{V: x.Name}
host := &types.StrValue{V: x.Host} // from
data := &types.StrValue{V: x.Data}
st := types.NewStruct(types.NewType(collectFuncOutStruct))
if err := st.Set(collectFuncOutFieldName, name); err != nil {
return nil, errwrap.Wrapf(err, "struct could not add field `%s`, val: `%s`", collectFuncOutFieldName, name)
}
if err := st.Set(collectFuncOutFieldHost, host); err != nil {
return nil, errwrap.Wrapf(err, "struct could not add field `%s`, val: `%s`", collectFuncOutFieldHost, host)
}
if err := st.Set(collectFuncOutFieldData, data); err != nil {
return nil, errwrap.Wrapf(err, "struct could not add field `%s`, val: `%s`", collectFuncOutFieldData, data)
}
if err := list.Add(st); err != nil { // XXX: improve perf of Add
return nil, err
}
}
return list, nil // put struct into interface type
}

View File

@@ -90,6 +90,33 @@ const (
// as. This starts with an underscore so that it cannot be used from the
// lexer.
StructLookupOptionalFuncName = "_struct_lookup_optional"
// CollectFuncName is the name this function is registered as. This
// starts with an underscore so that it cannot be used from the lexer.
CollectFuncName = "_collect"
// CollectFuncInFieldName is the name of the name field in the struct.
CollectFuncInFieldName = "name"
// CollectFuncInFieldHost is the name of the host field in the struct.
CollectFuncInFieldHost = "host"
// CollectFuncInType is the most complex of the three possible input
// types. The other two possible ones are str or []str.
CollectFuncInType = "[]struct{" + CollectFuncInFieldName + " str; " + CollectFuncInFieldHost + " str}"
// CollectFuncOutFieldName is the name of the name field in the struct.
CollectFuncOutFieldName = "name"
// CollectFuncOutFieldHost is the name of the host field in the struct.
CollectFuncOutFieldHost = "host"
// CollectFuncOutFieldData is the name of the data field in the struct.
CollectFuncOutFieldData = "data"
// CollectFuncOutStruct is the struct type that we return a list of.
CollectFuncOutStruct = "struct{" + CollectFuncOutFieldName + " str; " + CollectFuncOutFieldHost + " str; " + CollectFuncOutFieldData + " str}"
// CollectFuncOutType is the expected return type, the data field is an
// encoded resource blob.
CollectFuncOutType = "[]" + CollectFuncOutStruct
)
// registeredFuncs is a global map of all possible funcs which can be used. You

View File

@@ -42,11 +42,83 @@ import (
"github.com/purpleidea/mgmt/util/errwrap"
)
// Interpreter is a base struct for handling the Interpret operation. There is
// nothing stateful here, you don't need to preserve this between runs.
type Interpreter struct {
// Debug represents if we're running in debug mode or not.
Debug bool
// Logf is a logger which should be used.
Logf func(format string, v ...interface{})
// lookup stores the resources found by kind and name. It doesn't store
// any resources which are hidden since those could have duplicates.
// format: map[kind]map[name]Res
lookup map[engine.ResPtrUID]engine.Res
// lookupHidden stores the hidden resources found by kind and name. It
// doesn't store any normal resources which are not hidden.
// format formerly: map[kind]map[name]Res
lookupHidden map[engine.ResPtrUID][]engine.Res
// receive doesn't need a special extension for hidden resources since
// they can't send, only recv, and senders can't have incompatible dupes
// format formerly: map[kind]map[name]map[field]*Send
receive map[engine.ResPtrUID]map[string]*engine.Send
// export tracks the unique combinations we export. (kind, name, host)
export map[engine.ResDelete]struct{}
}
// Interpret runs the program and outputs a generated resource graph. It
// requires an AST, and the table of values required to populate that AST. Type
// unification, and earlier steps should obviously be run first so that you can
// actually get a useful resource graph out of this instead of an error!
func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgraph.Graph, error) {
func (obj *Interpreter) Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgraph.Graph, error) {
// build the kind,name -> res mapping
obj.lookup = make(map[engine.ResPtrUID]engine.Res)
obj.lookupHidden = make(map[engine.ResPtrUID][]engine.Res)
// build the send/recv mapping
obj.receive = make(map[engine.ResPtrUID]map[string]*engine.Send)
// build the exports
obj.export = make(map[engine.ResDelete]struct{})
// Remember that if a resource is "Hidden", then make sure it is NOT
// sending to anyone, since it would never produce a value. It can
// receive values, since those might be used during export.
//
// Remember that if a resource is "Hidden", then it may exist alongside
// another resource with the same kind+name without triggering the
// "inequivalent duplicate resource" style of errors. Of course multiple
// hidden resources with the same kind+name may also exist
// simultaneously, just keep in mind that it means that an edge pointing
// to a particular kind+name now actually may point to more than one!
//
// This is needed because of two reasons: (1) because a regular resource
// will likely never be compatible with a "Hidden" and "Exported"
// resource because one resource might have the Meta:hidden and
// Meta:export params and one might not; (2) because you may wish to
// have two different hidden resources of different params which export
// to different hosts, which means they would likely not be compatible.
//
// Since we can have more than one "Hidden" and "Exported" resource with
// the same name and kind, it's important that we don't export that data
// to the same (kind, name, host) location since we'd have multiple
// writers to the same key in our World store. We could consider
// checking for compatibility, but that's more difficult to achieve. The
// "any" host is treated as a special key, which punts this duplicate
// problem to being a collection problem. (Which could happen with two
// different hosts each exporting a different value to a single host.)
//
// Remember that the resource graph that this function returns, may now
// contain two or more identically named kind+name resources, if at
// least one of them is "Hidden". If they are entirely identical, then
// it's acceptable to merge them. They may _not_ be merged with the
// CompatibleRes API, since on resource "collection" a param may be
// changed which could conceivably be incompatible with how we ran the
// AdaptCmp API when we merged them.
output, err := ast.Output(table) // contains resList, edgeList, etc...
if err != nil {
return nil, err
@@ -57,22 +129,53 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr
return nil, errwrap.Wrapf(err, "could not create new graph")
}
var lookup = make(map[string]map[string]engine.Res) // map[kind]map[name]Res
// build the send/recv mapping; format: map[kind]map[name]map[field]*Send
var receive = make(map[string]map[string]map[string]*engine.Send)
for _, res := range output.Resources {
kind := res.Kind()
name := res.Name()
if _, exists := lookup[kind]; !exists {
lookup[kind] = make(map[string]engine.Res)
receive[kind] = make(map[string]map[string]*engine.Send)
}
if _, exists := receive[kind][name]; !exists {
receive[kind][name] = make(map[string]*engine.Send)
meta := res.MetaParams()
ruid := engine.ResPtrUID{
Kind: kind,
Name: name,
}
if r, exists := lookup[kind][name]; exists { // found same name
for _, host := range meta.Export {
uid := engine.ResDelete{
Kind: kind,
Name: name,
Host: host,
}
if _, exists := obj.export[uid]; exists {
return nil, fmt.Errorf("duplicate export: %s to %s", res, host)
}
obj.export[uid] = struct{}{}
}
if meta.Hidden {
rs := obj.lookupHidden[ruid]
if len(rs) > 0 {
// We only need to check against the last added
// resource since this should be commutative,
// and as we add more they check themselves in.
r := rs[len(rs)-1]
// XXX: If we want to check against the regular
// resources in obj.lookup, then do it here.
// If they're different, then we deduplicate.
if err := engine.ResCmp(r, res); err == nil {
continue
}
}
// add to temporary lookup table
obj.lookupHidden[ruid] = append(obj.lookupHidden[ruid], res)
continue
}
if r, exists := obj.lookup[ruid]; exists { // found same name
// XXX: If we want to check against the special hidden
// resources in obj.lookupHidden, then do it here.
// if the resources support the compatibility API, then
// we can attempt to merge them intelligently...
r1, ok1 := r.(engine.CompatibleRes)
@@ -87,7 +190,7 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr
return nil, errwrap.Wrapf(err, "could not merge duplicate resources")
}
lookup[kind][name] = merged
obj.lookup[ruid] = merged
// they match here, we don't need to test below!
continue
}
@@ -104,79 +207,54 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr
// currently we add the first one that was found...
continue
}
lookup[kind][name] = res // add to temporary lookup table
obj.lookup[ruid] = res // add to temporary lookup table
//graph.AddVertex(res) // do this below once this table is final
}
// ensure all the vertices exist...
for _, m := range lookup {
for _, res := range m {
for _, res := range obj.lookup {
graph.AddVertex(res)
}
for _, rs := range obj.lookupHidden {
for _, res := range rs {
graph.AddVertex(res)
}
}
for _, e := range output.Edges {
var v1, v2 engine.Res
var exists bool
var m map[string]engine.Res
var notify = e.Notify
if m, exists = lookup[e.Kind1]; exists {
v1, exists = m[e.Name1]
for _, edge := range output.Edges {
v1s := obj.lookupAll(edge.Kind1, edge.Name1)
if len(v1s) == 0 {
return nil, fmt.Errorf("edge cannot find resource kind: %s named: `%s`", edge.Kind1, edge.Name1)
}
if !exists {
return nil, fmt.Errorf("edge cannot find resource kind: %s named: `%s`", e.Kind1, e.Name1)
}
if m, exists = lookup[e.Kind2]; exists {
v2, exists = m[e.Name2]
}
if !exists {
return nil, fmt.Errorf("edge cannot find resource kind: %s named: `%s`", e.Kind2, e.Name2)
v2s := obj.lookupAll(edge.Kind2, edge.Name2)
if len(v2s) == 0 {
return nil, fmt.Errorf("edge cannot find resource kind: %s named: `%s`", edge.Kind2, edge.Name2)
}
if existingEdge := graph.FindEdge(v1, v2); existingEdge != nil {
// collate previous Notify signals to this edge with OR
notify = notify || (existingEdge.(*engine.Edge)).Notify
}
edge := &engine.Edge{
Name: fmt.Sprintf("%s -> %s", v1, v2),
Notify: notify,
}
graph.AddEdge(v1, v2, edge) // identical duplicates are ignored
// send recv
if (e.Send == "") != (e.Recv == "") { // xor
return nil, fmt.Errorf("you must specify both send/recv fields or neither")
}
if e.Send == "" || e.Recv == "" { // is there send/recv to do or not?
continue
}
// check for pre-existing send/recv at this key
if existingSend, exists := receive[e.Kind2][e.Name2][e.Recv]; exists {
// ignore identical duplicates
// TODO: does this safe ignore work with duplicate compatible resources?
if existingSend.Res != v1 || existingSend.Key != e.Send {
return nil, fmt.Errorf("resource: `%s` has duplicate receive on: `%s` param", engine.Repr(e.Kind2, e.Name2), e.Recv)
// Make edges pair wise between each two. Normally these loops
// only have one iteration each unless we have Hidden resources.
for _, v1 := range v1s {
for _, v2 := range v2s {
e := obj.makeEdge(graph, v1, v2, edge)
graph.AddEdge(v1, v2, e) // identical duplicates are ignored
}
}
res1, ok := v1.(engine.SendableRes)
if !ok {
return nil, fmt.Errorf("cannot send from resource: %s", engine.Stringer(v1))
// send recv
if (edge.Send == "") != (edge.Recv == "") { // xor
return nil, fmt.Errorf("you must specify both send/recv fields or neither")
}
res2, ok := v2.(engine.RecvableRes)
if !ok {
return nil, fmt.Errorf("cannot recv to resource: %s", engine.Stringer(v2))
if edge.Send == "" || edge.Recv == "" { // is there send/recv to do or not?
continue
}
if err := engineUtil.StructFieldCompat(res1.Sends(), e.Send, res2, e.Recv); err != nil {
return nil, errwrap.Wrapf(err, "cannot send/recv from %s.%s to %s.%s", engine.Stringer(v1), e.Send, engine.Stringer(v2), e.Recv)
for _, v1 := range v1s {
for _, v2 := range v2s {
if err := obj.makeSendRecv(v1, v2, edge); err != nil {
return nil, err
}
}
}
// store mapping for later
receive[e.Kind2][e.Name2][e.Recv] = &engine.Send{Res: res1, Key: e.Send}
}
// we need to first build up a map of all the resources handles, because
@@ -186,12 +264,23 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr
// pre-existing mappings, so we can now set them all at once at the end!
// TODO: do this in a deterministic order
for kind, x := range receive {
for name, recv := range x {
if len(recv) == 0 { // skip empty maps from allocation!
continue
for st, recv := range obj.receive {
kind := st.Kind
name := st.Name
if len(recv) == 0 { // skip empty maps from allocation!
continue
}
if r := obj.lookupRes(kind, name); r != nil {
res, ok := r.(engine.RecvableRes)
if !ok {
return nil, fmt.Errorf("cannot recv to resource: %s", engine.Repr(kind, name))
}
r := lookup[kind][name]
res.SetRecv(recv) // set it!
}
// hidden
for _, r := range obj.lookupHiddenRes(kind, name) {
res, ok := r.(engine.RecvableRes)
if !ok {
return nil, fmt.Errorf("cannot recv to resource: %s", engine.Repr(kind, name))
@@ -208,3 +297,104 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr
return graph, nil
}
// lookupRes is a simple helper function. Returns nil if not found.
func (obj *Interpreter) lookupRes(kind, name string) engine.Res {
ruid := engine.ResPtrUID{
Kind: kind,
Name: name,
}
res, exists := obj.lookup[ruid]
if !exists {
return nil
}
return res
}
// lookupHiddenRes is a simple helper function. Returns any found.
func (obj *Interpreter) lookupHiddenRes(kind, name string) []engine.Res {
ruid := engine.ResPtrUID{
Kind: kind,
Name: name,
}
res, exists := obj.lookupHidden[ruid]
if !exists {
return nil
}
return res
}
// lookupAll is a simple helper function. Returns any found.
func (obj *Interpreter) lookupAll(kind, name string) []pgraph.Vertex {
vs := []pgraph.Vertex{}
if r := obj.lookupRes(kind, name); r != nil {
vs = append(vs, r)
}
for _, r := range obj.lookupHiddenRes(kind, name) {
vs = append(vs, r)
}
return vs
}
// makeEdge is a simple helper function.
func (obj *Interpreter) makeEdge(graph *pgraph.Graph, v1, v2 pgraph.Vertex, edge *interfaces.Edge) *engine.Edge {
var notify = edge.Notify
if existingEdge := graph.FindEdge(v1, v2); existingEdge != nil {
// collate previous Notify signals to this edge with OR
notify = notify || (existingEdge.(*engine.Edge)).Notify
}
return &engine.Edge{
Name: fmt.Sprintf("%s -> %s", v1, v2),
Notify: notify,
}
}
// makeSendRecv is a simple helper function.
func (obj *Interpreter) makeSendRecv(v1, v2 pgraph.Vertex, edge *interfaces.Edge) error {
ruid := engine.ResPtrUID{
Kind: edge.Kind2,
Name: edge.Name2,
}
if _, exists := obj.receive[ruid]; !exists {
obj.receive[ruid] = make(map[string]*engine.Send)
}
// check for pre-existing send/recv at this key
if existingSend, exists := obj.receive[ruid][edge.Recv]; exists {
// ignore identical duplicates
// TODO: does this safe ignore work with duplicate compatible resources?
if existingSend.Res != v1 || existingSend.Key != edge.Send {
return fmt.Errorf("resource: `%s` has duplicate receive on: `%s` param", engine.Repr(edge.Kind2, edge.Name2), edge.Recv)
}
}
if res, ok := v1.(engine.Res); ok && res.MetaParams().Hidden && edge.Send != "" {
return fmt.Errorf("cannot send from hidden resource: %s", engine.Stringer(res))
}
res1, ok := v1.(engine.SendableRes)
if !ok {
return fmt.Errorf("cannot send from resource: %s", engine.Stringer(res1))
}
res2, ok := v2.(engine.RecvableRes)
if !ok {
return fmt.Errorf("cannot recv to resource: %s", engine.Stringer(res2))
}
if err := engineUtil.StructFieldCompat(res1.Sends(), edge.Send, res2, edge.Recv); err != nil {
return errwrap.Wrapf(err, "cannot send/recv from %s.%s to %s.%s", engine.Stringer(res1), edge.Send, engine.Stringer(res2), edge.Recv)
}
// store mapping for later
obj.receive[ruid][edge.Recv] = &engine.Send{Res: res1, Key: edge.Send}
return nil
}

View File

@@ -1415,7 +1415,14 @@ func TestAstFunc2(t *testing.T) {
// run interpret!
table := funcs.Table() // map[interfaces.Func]types.Value
ograph, err := interpret.Interpret(iast, table)
interpreter := &interpret.Interpreter{
Debug: testing.Verbose(), // set via the -test.v flag to `go test`
Logf: func(format string, v ...interface{}) {
logf("interpret: "+format, v...)
},
}
ograph, err := interpreter.Interpret(iast, table)
if (!fail || !failInterpret) && err != nil {
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: interpret failed with: %+v", index, err)
@@ -2267,7 +2274,14 @@ func TestAstFunc3(t *testing.T) {
// run interpret!
table := funcs.Table() // map[interfaces.Func]types.Value
ograph, err := interpret.Interpret(iast, table)
interpreter := &interpret.Interpreter{
Debug: testing.Verbose(), // set via the -test.v flag to `go test`
Logf: func(format string, v ...interface{}) {
logf("interpret: "+format, v...)
},
}
ograph, err := interpreter.Interpret(iast, table)
if (!fail || !failInterpret) && err != nil {
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: interpret failed with: %+v", index, err)

View File

@@ -15,6 +15,8 @@ test "test" {
rewatch => false,
realize => true,
dollar => false,
hidden => false,
export => ["hostname",],
reverse => true,
autoedge => true,
autogroup => true,

View File

@@ -15,6 +15,8 @@ test "test" {
rewatch => false,
realize => true,
dollar => false,
hidden => false,
export => ["hostname",],
reverse => true,
autoedge => true,
autogroup => true,
@@ -25,25 +27,30 @@ test "test" {
#},
}
-- OUTPUT --
Edge: composite: []str -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # sema
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # dollar
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # noop
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # reset
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # retryreset
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # rewatch
Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # autoedge
Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # autogroup
Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # realize
Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # reverse
Edge: const: float(4.2) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # limit
Edge: const: int(-1) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # retry
Edge: const: int(0) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # delay
Edge: const: int(3) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # burst
Edge: const: int(5) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # poll
Edge: composite: []str -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # export
Edge: composite: []str -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # sema
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # dollar
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # hidden
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # noop
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # reset
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # retryreset
Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # rewatch
Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # autoedge
Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # autogroup
Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # realize
Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # reverse
Edge: const: float(4.2) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # limit
Edge: const: int(-1) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # retry
Edge: const: int(0) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # delay
Edge: const: int(3) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # burst
Edge: const: int(5) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # poll
Edge: const: str("bar:3") -> composite: []str # 1
Edge: const: str("foo:1") -> composite: []str # 0
Edge: const: str("hostname") -> composite: []str # 0
Vertex: composite: []str
Vertex: composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool}
Vertex: composite: []str
Vertex: composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool}
Vertex: const: bool(false)
Vertex: const: bool(false)
Vertex: const: bool(false)
Vertex: const: bool(false)
@@ -61,5 +68,6 @@ Vertex: const: int(3)
Vertex: const: int(5)
Vertex: const: str("bar:3")
Vertex: const: str("foo:1")
Vertex: const: str("hostname")
Vertex: const: str("test")
Vertex: const: str("test")

View File

@@ -438,8 +438,16 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) {
obj.Logf("running interpret...")
table := obj.funcs.Table() // map[pgraph.Vertex]types.Value
interpreter := &interpret.Interpreter{
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
// TODO: is this a sane prefix to use here?
obj.Logf("interpret: "+format, v...)
},
}
// this call returns the graph
graph, err := interpret.Interpret(obj.ast, table)
graph, err := interpreter.Interpret(obj.ast, table)
if err != nil {
return nil, errwrap.Wrapf(err, "could not interpret")
}

View File

@@ -261,6 +261,11 @@
lval.str = yylex.Text()
return PANIC_IDENTIFIER
}
/collect/ {
yylex.pos(lval) // our pos
lval.str = yylex.Text()
return COLLECT_IDENTIFIER
}
/"(\\.|[^"])*"/
{ // This matches any number of the bracketed patterns
// that are surrounded by the two quotes on each side.

View File

@@ -104,6 +104,7 @@ func init() {
%token CLASS_IDENTIFIER INCLUDE_IDENTIFIER
%token IMPORT_IDENTIFIER AS_IDENTIFIER
%token COMMENT ERROR
%token COLLECT_IDENTIFIER
%token PANIC_IDENTIFIER
// precedence table
@@ -188,6 +189,11 @@ stmt:
$$.stmt = $1.stmt
locate(yylex, $1, yyDollar[len(yyDollar)-1], $$.stmt)
}
| collect
{
$$.stmt = $1.stmt
locate(yylex, $1, yyDollar[len(yyDollar)-1], $$.stmt)
}
| resource
{
$$.stmt = $1.stmt
@@ -1026,6 +1032,46 @@ panic:
locate(yylex, $1, yyDollar[len(yyDollar)-1], $$.stmt)
}
;
collect:
// `collect file "/tmp/hello" { ... }`
// `collect file ["/tmp/hello", ...,] { ... }`
// `collect file [struct{name => "/tmp/hello", host => "foo",}, ...,] { ... }`
COLLECT_IDENTIFIER resource
{
// A "collect" stmt is exactly a regular "res" statement, except
// it has the boolean "Collect" field set to true, and it also
// has a special "resource body" entry which accepts the special
// collected data from the function graph.
$$.stmt = $2.stmt // it's us now
kind := $2.stmt.(*ast.StmtRes).Kind
res := $$.stmt.(*ast.StmtRes)
res.Collect = true
// We are secretly adding a special field to the res contents,
// which receives all of the exported data so that we have it
// arrive in our function graph in the standard way. We'd need
// to have this data to be able to build the resources we want!
call := &ast.ExprCall{
// function name to lookup special values from that kind
Name: funcs.CollectFuncName,
Args: []interfaces.Expr{
&ast.ExprStr{ // magic operator first
V: kind, // tell it what we're reading
},
// names to collect
// XXX: Can we copy the same AST nodes to here?
// XXX: Do I need to run .Copy() on them ?
// str, []str, or []struct{name str; host str}
res.Name, // expr (hopefully one of those types)
},
}
collect := &ast.StmtResCollect{ // special field
Kind: kind, // might as well tell it directly
Value: call,
}
res.Contents = append(res.Contents, collect)
locate(yylex, $1, yyDollar[len(yyDollar)-1], $$.stmt)
}
;
/* TODO: do we want to include this?
// resource bind
rbind:

View File

@@ -1014,6 +1014,22 @@ func (obj *Main) Run() error {
continue // stay paused
}
// XXX: Should we do this right before Commit?
// Don't obj.ge.Apply(...), that works on the old graph!
timing = time.Now()
//if !obj.ge.IsClosing() { // XXX: do we need to do this?
// skip prune when we're closing
//}
// FIXME: is this the right ctx?
if err := obj.ge.Exporter.Prune(exitCtx, obj.ge.Graph()); err != nil {
// XXX: This should just cause a permanent error
// here which turns into a shutdown. Refactor!
obj.ge.Abort() // delete graph
Logf("error running the exporter Prune: %+v", err)
continue
}
Logf("export cleanup took: %s", time.Since(timing))
// Start needs to be synchronous because we don't want
// to loop around and cause a pause before we unpaused.
// Commit already starts things, but we still need to

View File

@@ -244,6 +244,7 @@ func (g *Graph) AddEdge(v1, v2 Vertex, e Edge) {
g.AddVertex(v1, v2) // supports adding N vertices now
// TODO: check if an edge exists to avoid overwriting it!
// NOTE: VertexMerge() depends on overwriting it at the moment...
// NOTE: Interpret() depends on overwriting it at the moment...
g.adjacency[v1][v2] = e
}