diff --git a/docs/documentation.md b/docs/documentation.md index 744cb36d..f622e065 100644 --- a/docs/documentation.md +++ b/docs/documentation.md @@ -297,6 +297,49 @@ This meta param is a safety measure to make your life easier. It works for all resources. If someone comes up with a resource which would routinely start with a dollar sign, then we can revisit the default for this resource kind. +#### Hidden + +Boolean. Hidden means that this resource will not get executed on the resource +graph on which it is defined. This can be used as a simple boolean switch, or, +more commonly in combination with the Export meta param which specifies that the +resource params are exported into the shared database. When this is true, it +does not prevent export. In fact, it is commonly used in combination with +Export. Using this option will still include it in the resource graph, but it +will exist there in a special "mode" where it will not conflict with any other +identically named resources. It can even be used as part of an edge or via a +send/recv receiver. It can NOT be a sending vertex. These properties +differentiate the use of this instead of simply wrapping a resource in an "if" +statement. + +#### Export + +List of strings. Export is a list of hostnames (and/or the special "*" entry) +which if set, will mark this resource data as intended for export to those +hosts. This does not prevent any users of the shared data storage from reading +these values, so if you want to guarantee secrecy, use the encryption +primitives. This only labels the data accordingly, so that other hosts can know +what data is available for them to collect. The (kind, name, host) export triple +must be unique from any given exporter. In other words, you may not export two +different instances of a kind+name to the same host, the exports must not +conflict. On resource collect, this parameter is not preserved. + +```mcl +file "/tmp/foo" { + state => "exists", + content => "i'm exported!\n", + + Meta:hidden => true, + Meta:export => ["h1",], +} + +file "/tmp/foo" { + state => "exists", + content => "i'm exported AND i'm used here\n", + + Meta:export => ["h1",], +} +``` + #### Reverse Boolean. Reverse is a property that some resources can implement that specifies diff --git a/docs/resource-guide.md b/docs/resource-guide.md index 453d2454..2a432ed4 100644 --- a/docs/resource-guide.md +++ b/docs/resource-guide.md @@ -523,9 +523,10 @@ graph edges from another resource. These values are consumed during the any resource that has an appropriate value and that has the `Sendable` trait. You can read more about this in the Send/Recv section below. -### Collectable +### Exportable -This is currently a stub and will be updated once the DSL is further along. +Exportable allows a resource to tell the exporter what subset of its data it +wishes to export when that occurs. It is rare that you will need to use this. ## Resource Initialization diff --git a/engine/graph/actions.go b/engine/graph/actions.go index fbdc948e..8de265b5 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -181,6 +181,18 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error { refreshableRes.SetRefresh(refresh) // tell the resource } + // Run the exported resource exporter! + var exportOK bool + var exportErr error + wg := &sync.WaitGroup{} + wg.Add(1) + // (Run this concurrently with the CheckApply related stuff below...) + go func() { + defer wg.Done() + // doesn't really need to be in parallel, but we can... + exportOK, exportErr = obj.Exporter.Export(ctx, res) + }() + // Check cached state, to skip CheckApply, but can't skip if refreshing! // If the resource doesn't implement refresh, skip the refresh test. // FIXME: if desired, check that we pass through refresh notifications! @@ -190,6 +202,13 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error { } else if noop && (refresh && isRefreshableRes) { // had a refresh to do w/ noop! checkOK, err = false, nil // therefore the state is wrong + } else if res.MetaParams().Hidden { + // We're not running CheckApply + if obj.Debug { + obj.Logf("%s: Hidden", res) + } + checkOK, err = true, nil // default + } else { // run the CheckApply! if obj.Debug { @@ -201,6 +220,13 @@ func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error { obj.Logf("%s: CheckApply(%t): Return(%t, %s)", res, !noop, checkOK, engineUtil.CleanError(err)) } } + wg.Wait() + checkOK = checkOK && exportOK // always combine + if err == nil { // If CheckApply didn't error, look at exportOK. + // This is because if CheckApply errors we don't need to care or + // tell anyone about an exporting error. + err = exportErr + } if checkOK && err != nil { // should never return this way return fmt.Errorf("%s: resource programming error: CheckApply(%t): %t, %+v", res, !noop, checkOK, err) @@ -304,14 +330,24 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { } // initialize or reinitialize the meta state for this resource uid - obj.mlock.Lock() - if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset { - obj.metas[engine.PtrUID(res)] = &engine.MetaState{ - CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value - } + // if we're using a Hidden resource, we don't support this feature + // TODO: should we consider supporting it? is it really necessary? + // XXX: to support this for Hidden, we'd need to handle dupe names + metas := &engine.MetaState{ + CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value + } + if !res.MetaParams().Hidden { + // Skip this if Hidden since we can have a hidden res that has + // the same kind+name as a regular res, and this would conflict. + obj.mlock.Lock() + if _, exists := obj.metas[engine.PtrUID(res)]; !exists || res.MetaParams().Reset { + obj.metas[engine.PtrUID(res)] = &engine.MetaState{ + CheckApplyRetry: res.MetaParams().Retry, // lookup the retry value + } + } + metas = obj.metas[engine.PtrUID(res)] // handle + obj.mlock.Unlock() } - metas := obj.metas[engine.PtrUID(res)] // handle - obj.mlock.Unlock() //defer close(obj.state[vertex].stopped) // done signal @@ -376,10 +412,21 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { delay = 0 // reset continue } + + } else if res.MetaParams().Hidden { + // We're not running Watch + if obj.Debug { + obj.Logf("%s: Hidden", res) + } + obj.state[vertex].cuid.StartTimer() // TODO: Should we do this? + err = obj.state[vertex].hidden(obj.state[vertex].doneCtx) + obj.state[vertex].cuid.StopTimer() // TODO: Should we do this? + } else if interval := res.MetaParams().Poll; interval > 0 { // poll instead of watching :( obj.state[vertex].cuid.StartTimer() err = obj.state[vertex].poll(obj.state[vertex].doneCtx, interval) obj.state[vertex].cuid.StopTimer() // clean up nicely + } else { obj.state[vertex].cuid.StartTimer() if obj.Debug { diff --git a/engine/graph/engine.go b/engine/graph/engine.go index 5bd83341..637c0b76 100644 --- a/engine/graph/engine.go +++ b/engine/graph/engine.go @@ -59,9 +59,12 @@ type Engine struct { Version string Hostname string + // Break off separate logical pieces into chunks where possible. Converger *converger.Coordinator - Local *local.API - World engine.World + Exporter *Exporter + + Local *local.API + World engine.World // Prefix is a unique directory prefix which can be used. It should be // created if needed. @@ -85,6 +88,7 @@ type Engine struct { paused bool // are we paused? fastPause bool + isClosing bool // are we shutting down? } // Init initializes the internal structures and starts this the graph running. @@ -116,7 +120,7 @@ func (obj *Engine) Init() error { obj.wlock = &sync.Mutex{} obj.mlock = &sync.Mutex{} - obj.metas = make(map[engine.ResPtrUID]*engine.MetaState) + obj.metas = make(map[engine.ResPtrUID]*engine.MetaState) // don't include .Hidden res obj.slock = &sync.Mutex{} obj.semas = make(map[string]*semaphore.Semaphore) @@ -125,6 +129,18 @@ func (obj *Engine) Init() error { obj.paused = true // start off true, so we can Resume after first Commit + obj.Exporter = &Exporter{ + World: obj.World, + Debug: obj.Debug, + Logf: func(format string, v ...interface{}) { + // TODO: is this a sane prefix to use here? + obj.Logf("export: "+format, v...) + }, + } + if err := obj.Exporter.Init(); err != nil { + return err + } + return nil } @@ -188,6 +204,12 @@ func (obj *Engine) Commit() error { if !ok { // should not happen, previously validated return fmt.Errorf("not a Res") } + // Skip this if Hidden since we can have a hidden res that has + // the same kind+name as a regular res, and this would conflict. + if res.MetaParams().Hidden { + continue + } + activeMetas[engine.PtrUID(res)] = struct{}{} // add } @@ -208,7 +230,11 @@ func (obj *Engine) Commit() error { return fmt.Errorf("the Res state already exists") } - activeMetas[engine.PtrUID(res)] = struct{}{} // add + // Skip this if Hidden since we can have a hidden res that has + // the same kind+name as a regular res, and this would conflict. + if !res.MetaParams().Hidden { + activeMetas[engine.PtrUID(res)] = struct{}{} // add + } if obj.Debug { obj.Logf("Validate(%s)", res) @@ -299,7 +325,12 @@ func (obj *Engine) Commit() error { if !ok { // should not happen, previously validated return fmt.Errorf("not a Res") } - delete(activeMetas, engine.PtrUID(res)) + + // Skip this if Hidden since we can have a hidden res that has + // the same kind+name as a regular res, and this would conflict. + if !res.MetaParams().Hidden { + delete(activeMetas, engine.PtrUID(res)) + } // wait for exit before starting new graph! close(obj.state[vertex].removeDone) // causes doneCtx to cancel @@ -501,6 +532,7 @@ func (obj *Engine) Pause(fastPause bool) error { // actually just a Load of an empty graph and a Commit. It waits for all the // resources to exit before returning. func (obj *Engine) Shutdown() error { + obj.isClosing = true emptyGraph, reterr := pgraph.NewGraph("empty") // this is a graph switch (graph sync) that switches to an empty graph! @@ -517,6 +549,15 @@ func (obj *Engine) Shutdown() error { return reterr } +// IsClosing tells the caller if a Shutdown() was run. This is helpful so that +// the graph can behave slightly differently when receiving the final empty +// graph. This is because it's empty because we passed one to unload everything, +// not because the user actually removed all resources. We may want to preserve +// the exported state for example, and not purge it. +func (obj *Engine) IsClosing() bool { + return obj.isClosing +} + // Graph returns the running graph. func (obj *Engine) Graph() *pgraph.Graph { return obj.graph diff --git a/engine/graph/exporter.go b/engine/graph/exporter.go new file mode 100644 index 00000000..863c4b67 --- /dev/null +++ b/engine/graph/exporter.go @@ -0,0 +1,349 @@ +// Mgmt +// Copyright (C) James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// +// Additional permission under GNU GPL version 3 section 7 +// +// If you modify this program, or any covered work, by linking or combining it +// with embedded mcl code and modules (and that the embedded mcl code and +// modules which link with this program, contain a copy of their source code in +// the authoritative form) containing parts covered by the terms of any other +// license, the licensors of this program grant you additional permission to +// convey the resulting work. Furthermore, the licensors of this program grant +// the original author, James Shubin, additional permission to update this +// additional permission if he deems it necessary to achieve the goals of this +// additional permission. + +package graph + +import ( + "context" + "fmt" + "sync" + + "github.com/purpleidea/mgmt/engine" + "github.com/purpleidea/mgmt/pgraph" + + //"github.com/purpleidea/mgmt/pgraph" + engineUtil "github.com/purpleidea/mgmt/engine/util" +) + +// Exporter is the main engine mechanism that sends the exported resource data +// to the World database. The code is relatively succinct, but slightly subtle. +type Exporter struct { + // Watch specifies if we want to enable the additional watch feature. It + // should probably be left off unless we're debugging something or using + // weird environments where we expect someone to mess with our res data. + Watch bool + + World engine.World + + Debug bool + Logf func(format string, v ...interface{}) + + state map[engine.ResDelete]bool // key NOT a pointer for it to be unique + prev map[engine.ResDelete]pgraph.Vertex + mutex *sync.Mutex + + // watch specific variables + workerRunning bool + workerWg *sync.WaitGroup + workerCtx context.Context + workerCancel func() +} + +// Init performs some initialization before first use. This is required. +func (obj *Exporter) Init() error { + obj.state = make(map[engine.ResDelete]bool) + obj.prev = make(map[engine.ResDelete]pgraph.Vertex) + obj.mutex = &sync.Mutex{} + + obj.workerRunning = false + obj.workerWg = &sync.WaitGroup{} + obj.workerCtx, obj.workerCancel = context.WithCancel(context.Background()) + + return nil +} + +// Export performs the worldly export, and then stores the resource unique ID in +// our in-memory data store. Exported resources use this tracking to know when +// to run their cleanups. If this function encounters an error, it returns +// (false, err). If it does nothing it returns (true, nil). If it does work it +// return (false, nil). These return codes match how CheckApply returns. This +// may run concurrently by multiple different resources, so as a result it must +// stay thread safe. +func (obj *Exporter) Export(ctx context.Context, res engine.Res) (bool, error) { + // As a result of running this operation in roughly the same places that + // the usual CheckApply step would run, we end up with a more nuanced + // and mature "exported resources" model than what was ever possible + // with other tools. We can now "wait" (via the resource graph + // dependencies) to run an export until an earlier resource dependency + // step has run. We can also programmatically "un-export" a resource by + // publishing a subsequent resource graph which either removes that + // Export flag or the entire resource. The one downside is that + // exporting to the database happens in multiple transactions rather + // than a batched bolus, but this is more appropriate because we're now + // more accurately modelling real-time systems, and this bandwidth is + // not a significant amount anyways. Lastly, we make sure to not run the + // purge when we ^C, since it should be safe to shutdown without killing + // all the data we left there. + + if res.MetaParams().Noop { + return true, nil // did nothing + } + + exports := res.MetaParams().Export + if len(exports) == 0 { + return true, nil // did nothing + } + + // It's OK to check the cache here instead of re-sending via the World + // API and so on, because the only way the Res data would change in + // World is if (1) someone messed with etcd, which we'd see with Watch, + // or (2) if the Res data changed because we have a new resource graph. + // If we have a new resource graph, then any changed elements will get + // pruned from this state cache via the Prune method, which helps us. + // If send/recv or any other weird resource method changes things, then + // we also want to invalidate the state cache. + state := true + + // TODO: This recv code is untested! + if r, ok := res.(engine.RecvableRes); ok { + for _, v := range r.Recv() { // map[string]*Send + // XXX: After we read the changed value, will it persist? + state = state && !v.Changed + } + } + + obj.mutex.Lock() + for _, ptrUID := range obj.ptrUID(res) { + b := obj.state[*ptrUID] // no need to check if exists + state = state && b // if any are false, it's all false + } + obj.mutex.Unlock() + if state { + return true, nil // state OK! + } + + // XXX: Do we want to change any metaparams when we export? + // XXX: Do we want to change any metaparams when we collect? + b64, err := obj.resToB64(res) + if err != nil { + return false, err + } + + resourceExports := []*engine.ResExport{} + duplicates := make(map[string]struct{}) + for _, export := range exports { + //ptrUID := engine.ResDelete{ + // Kind: res.Kind(), + // Name: res.Name(), + // Host: export, + //} + if export == "*" { + export = "" // XXX: use whatever means "all" + } + if _, exists := duplicates[export]; exists { + continue + } + duplicates[export] = struct{}{} + // skip this check since why race it or split the resource... + //if stateOK := obj.state[ptrUID]; stateOK { + // // rare that we'd have a split of some of these from a + // // single resource updated and others already fine, but + // // might as well do the check since it's cheap... + // continue + //} + resExport := &engine.ResExport{ + Kind: res.Kind(), + Name: res.Name(), + Host: export, + Data: b64, // encoded res data + } + resourceExports = append(resourceExports, resExport) + } + + // The fact that we Watch the write-only-by-us values at all, is a + // luxury that allows us to handle mischievous actors that overwrote an + // exported value. It really isn't necessary. It's the consumers that + // really need to watch. + if err := obj.worker(); err != nil { + return false, err // big error + } + + // TODO: Do we want to log more information about where this exports to? + obj.Logf("%s", res) + // XXX: Add a TTL if requested + b, err := obj.World.ResExport(ctx, resourceExports) // do it! + if err != nil { + return false, err + } + + obj.mutex.Lock() + defer obj.mutex.Unlock() + // NOTE: The Watch() method *must* invalidate this state if it changes. + // This is only pertinent if we're using the luxury Watch add-ons. + for _, ptrUID := range obj.ptrUID(res) { + obj.state[*ptrUID] = true // state OK! + } + + return b, nil +} + +// Prune removes any exports which are no longer actively being presented in the +// resource graph. This cleans things up between graph swaps. This should NOT +// run if we're shutting down cleanly. Keep in mind that this must act on the +// new graph which is available by "Commit", not before we're ready to "Commit". +func (obj *Exporter) Prune(ctx context.Context, graph *pgraph.Graph) error { + // mutex should be optional since this should only run when graph paused + obj.mutex.Lock() + defer obj.mutex.Unlock() + + // make searching faster by initially storing it all in a map + m := make(map[engine.ResDelete]pgraph.Vertex) // key is NOT a pointer + for _, v := range graph.Vertices() { + res, ok := v.(engine.Res) + if !ok { // should not happen + return fmt.Errorf("not a Res") + } + for _, ptrUID := range obj.ptrUID(res) { // skips non-export things + m[*ptrUID] = v + } + } + + resourceDeletes := []*engine.ResDelete{} + for k := range obj.state { + v, exists := m[k] // exists means it's in the graph + prev := obj.prev[k] + obj.prev[k] = v // may be nil + if exists && v != prev { // pointer compare to old vertex + // Here we have a Res that previously existed under the + // same kind/name/host. We need to invalidate the state + // only if it's a different Res than the previous one! + // If we do this erroneously, it causes extra traffic. + obj.state[k] = false // do this only if the Res is NEW + continue // skip it, it's staying + } + delete(obj.state, k) // it's gone! + resourceDeletes = append(resourceDeletes, &k) + } + + if len(resourceDeletes) == 0 { + return nil + } + + obj.Logf("prune: %d exports", len(resourceDeletes)) + for _, x := range resourceDeletes { + obj.Logf("prune: %s to %s", engine.Repr(x.Kind, x.Name), x.Host) + } + // XXX: this function could optimize the grouping since we split the + // list of host entries out from the kind/name since we can't have a + // unique map key with a struct that contains a slice. + if _, err := obj.World.ResDelete(ctx, resourceDeletes); err != nil { + return err + } + + return nil +} + +// resToB64 is a helper to refactor out this method. +func (obj *Exporter) resToB64(res engine.Res) (string, error) { + if r, ok := res.(engine.ExportableRes); ok { + return r.ToB64() + } + + return engineUtil.ResToB64(res) +} + +// ptrUID is a helper for this repetitive code. +func (obj *Exporter) ptrUID(res engine.Res) []*engine.ResDelete { + a := []*engine.ResDelete{} + for _, export := range res.MetaParams().Export { + if export == "*" { + export = "" // XXX: use whatever means "all" + } + + ptrUID := &engine.ResDelete{ + Kind: res.Kind(), + Name: res.Name(), + Host: export, + } + a = append(a, ptrUID) + } + return a +} + +// worker is a helper to kick off the optional Watch workers. +func (obj *Exporter) worker() error { + if !obj.Watch { + return nil // feature is disabled + } + + obj.mutex.Lock() + defer obj.mutex.Unlock() + + if obj.workerRunning { + return nil // already running + } + + kind := "" // watch everything + ch, err := obj.World.ResWatch(obj.workerCtx, kind) // (chan error, error) + if err != nil { + return err // big error + } + obj.workerRunning = true + obj.workerWg.Add(1) + go func() { + defer func() { + obj.mutex.Lock() + obj.workerRunning = false + obj.mutex.Unlock() + }() + defer obj.workerWg.Done() + Loop: + for { + var e error + var ok bool + select { + case e, ok = <-ch: + if !ok { + // chan closed + break Loop + } + + case <-obj.workerCtx.Done(): + break Loop + } + if e != nil { + // something errored... shutdown coming! + } + // event! + obj.mutex.Lock() + for k := range obj.state { + obj.state[k] = false // reset it all + } + obj.mutex.Unlock() + } + }() + + return nil +} + +// Shutdown cancels any running workers and waits for them to finish. +func (obj *Exporter) Shutdown() { + obj.workerCancel() + obj.workerWg.Wait() +} diff --git a/engine/graph/state.go b/engine/graph/state.go index 5349863d..8d3507ae 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -430,3 +430,13 @@ func (obj *State) poll(ctx context.Context, interval uint32) error { obj.init.Event() // notify engine of an event (this can block) } } + +// hidden is a replacement for Watch when the Hidden metaparameter is used. +func (obj *State) hidden(ctx context.Context) error { + obj.init.Running() // when started, notify engine that we're running + + select { + case <-ctx.Done(): // signal for shutdown request + return nil + } +} diff --git a/engine/metaparams.go b/engine/metaparams.go index 5ecbcd6c..a2c4b069 100644 --- a/engine/metaparams.go +++ b/engine/metaparams.go @@ -53,6 +53,8 @@ var DefaultMetaParams = &MetaParams{ Rewatch: false, Realize: false, // true would be more awesome, but unexpected for users Dollar: false, + Hidden: false, + Export: []string{}, } // MetaRes is the interface a resource must implement to support meta params. @@ -140,6 +142,32 @@ type MetaParams struct { // interpolate a variable name. In the rare case when it's needed, you // can disable that check with this meta param. Dollar bool `yaml:"dollar"` + + // Hidden means that this resource will not get executed on the resource + // graph on which it is defined. This can be used as a simple boolean + // switch, or, more commonly in combination with the Export meta param + // which specifies that the resource params are exported into the shared + // database. When this is true, it does not prevent export. In fact, it + // is commonly used in combination with Export. Using this option will + // still include it in the resource graph, but it will exist there in a + // special "mode" where it will not conflict with any other identically + // named resources. It can even be used as part of an edge or via a + // send/recv receiver. It can NOT be a sending vertex. These properties + // differentiate the use of this instead of simply wrapping a resource + // in an "if" statement. + Hidden bool `yaml:"hidden"` + + // Export is a list of hostnames (and/or the special "*" entry) which if + // set, will mark this resource data as intended for export to those + // hosts. This does not prevent any users of the shared data storage + // from reading these values, so if you want to guarantee secrecy, use + // the encryption primitives. This only labels the data accordingly, so + // that other hosts can know what data is available for them to collect. + // The (kind, name, host) export triple must be unique from any given + // exporter. In other words, you may not export two different instances + // of a kind+name to the same host, the exports must not conflict. On + // resource collect, this parameter is not preserved. + Export []string `yaml:"export"` } // Cmp compares two AutoGroupMeta structs and determines if they're equivalent. @@ -189,6 +217,12 @@ func (obj *MetaParams) Cmp(meta *MetaParams) error { if obj.Dollar != meta.Dollar { return fmt.Errorf("values for Dollar are different") } + if obj.Hidden != meta.Hidden { + return fmt.Errorf("values for Hidden are different") + } + if err := util.SortedStrSliceCompare(obj.Export, meta.Export); err != nil { + return errwrap.Wrapf(err, "values for Export are different") + } return nil } @@ -208,6 +242,13 @@ func (obj *MetaParams) Validate() error { } } + for _, s := range obj.Export { + if s == "" { + return fmt.Errorf("export is empty") + } + } + // TODO: Should we validate the export patterns? + return nil } @@ -218,6 +259,11 @@ func (obj *MetaParams) Copy() *MetaParams { sema = make([]string, len(obj.Sema)) copy(sema, obj.Sema) } + export := []string{} + if obj.Export != nil { + export = make([]string, len(obj.Export)) + copy(export, obj.Export) + } return &MetaParams{ Noop: obj.Noop, Retry: obj.Retry, @@ -230,6 +276,8 @@ func (obj *MetaParams) Copy() *MetaParams { Rewatch: obj.Rewatch, Realize: obj.Realize, Dollar: obj.Dollar, + Hidden: obj.Hidden, + Export: export, } } diff --git a/engine/resources.go b/engine/resources.go index 39d08bd0..4a1c2e2d 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -376,6 +376,22 @@ type CompatibleRes interface { Merge(CompatibleRes) (CompatibleRes, error) } +// ExportableRes allows the resource to have its own implementation of resource +// encoding, so that it can send data over the wire differently. It's unlikely +// that you will want to implement this interface for most scenarios. It may be +// useful to limit private data exposure, large data sizes, and to add more info +// to what would normally be shared. +type ExportableRes interface { + Res + + // ToB64 lets the resource provide an alternative implementation of the + // usual ResToB64 method. This lets the resource omit, add, or modify + // the parameter data before it goes out over the wire. + ToB64() (string, error) + + // TODO: Do we want to add a FromB64 method for decoding the Resource? +} + // YAMLRes is a resource that supports creation by unmarshalling. type YAMLRes interface { Res diff --git a/engine/world.go b/engine/world.go index 7ecba741..39280021 100644 --- a/engine/world.go +++ b/engine/world.go @@ -31,6 +31,7 @@ package engine import ( "context" + "fmt" "github.com/purpleidea/mgmt/etcd/interfaces" "github.com/purpleidea/mgmt/etcd/scheduler" @@ -117,11 +118,103 @@ type StrWorld interface { // ResWorld is a world interface that lets us store, pull and watch resources in // a distributed database. // XXX: These API's are likely to change. +// XXX: Add optional TTL's to these API's, maybe use WithTTL(...) type options. type ResWorld interface { - ResWatch(context.Context) (chan error, error) - ResExport(context.Context, []Res) error - // FIXME: should this method take a "filter" data struct instead of many args? - ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]Res, error) + // ResWatch returns a channel which produces a new value once on startup + // as soon as it is successfully connected, and once for every time it + // sees that a resource that has been exported for this hostname is + // added, deleted, or modified. If kind is specified, the watch will + // attempt to only send events relating to that resource kind. We always + // intended to only show events for resources which the watching host is + // allowed to see. + ResWatch(ctx context.Context, kind string) (chan error, error) + + // ResCollect does a lookup for resource entries that have previously + // been stored for us. It returns a subset of these based on the input + // filter. It does not return a Res, since while that would be useful, + // and logical, it turns out we usually want to transport the Res data + // onwards through the function graph, and using a native string is what + // is already supported. (A native res type would just be encoded as a + // string anyways.) While it might be more "correct" to do the work to + // decode the string into a Res, the user of this function would just + // encode it back to a string anyways, and this is not very efficient. + ResCollect(ctx context.Context, filters []*ResFilter) ([]*ResOutput, error) + + // ResExport stores a number of resources in the world storage system. + // The individual records should not be updated if they are identical to + // what is already present. (This is to prevent unnecessary events.) If + // this makes no changes, it returns (true, nil). If it makes a change, + // then it returns (false, nil). On any error we return (false, err). + ResExport(ctx context.Context, resourceExports []*ResExport) (bool, error) + + // ResDelete deletes a number of resources in the world storage system. + // If this doesn't delete, it returns (true, nil). If it makes a delete, + // then it returns (false, nil). On any error we return (false, err). + ResDelete(ctx context.Context, resourceDeletes []*ResDelete) (bool, error) +} + +// ResFilter specifies that we want to match an item with this three tuple. If +// any of these are empty, then it means to match an item with any value for +// that field. +// TODO: Future secure implementations must verify that the exported made a +// value available to that hostname. It's not enough for a host to request it. +// We can enforce this with public key encryption eventually. +type ResFilter struct { + Kind string + Name string + Host string // from this host +} + +// Match returns nil on a successful match. +func (obj *ResFilter) Match(kind, name, host string) error { + if obj.Kind != "" && obj.Kind != kind { + return fmt.Errorf("kind did not match") + } + if obj.Name != "" && obj.Name != name { + return fmt.Errorf("name did not match") + } + if obj.Host != "" && obj.Host != host { + return fmt.Errorf("host did not match") + } + + return nil // match! +} + +// ResOutput represents a record of exported resource data which we have read +// out from the world storage system. The Data field contains an encoded version +// of the resource, and even though decoding it will get you a Kind and Name, we +// still store those values here in duplicate for them to be available before +// decoding. +type ResOutput struct { + Kind string + Name string + Host string // from this host + Data string // encoded res data +} + +// ResExport represents a record of exported resource data which we want to save +// to the world storage system. The Data field contains an encoded version of +// the resource, and even though decoding it will get you a Kind and Name, we +// still store those values here in duplicate for them to be available before +// decoding. If Host is specified, then only the node with that hostname may +// access this resource. If it's empty than it may be collected by anyone. If we +// want to export to only three hosts, then we duplicate this entry three times. +// It's true that this is not an efficient use of storage space, but it maps +// logically to a future data structure where data is encrypted to the public +// key of that specific host where we wouldn't be able to de-duplicate anyways. +type ResExport struct { + Kind string + Name string + Host string // to/for this host + Data string // encoded res data +} + +// ResDelete represents the uniqueness key for stored resources. As a result, +// this triple is a useful map key in various locations. +type ResDelete struct { + Kind string + Name string + Host string // to/for this host } // SchedulerWorld is an interface that has to do with distributed scheduling. diff --git a/etcd/client/resources/resources.go b/etcd/client/resources/resources.go index e8a2de97..dbf8cb04 100644 --- a/etcd/client/resources/resources.go +++ b/etcd/client/resources/resources.go @@ -35,11 +35,12 @@ import ( "strings" "github.com/purpleidea/mgmt/engine" - engineUtil "github.com/purpleidea/mgmt/engine/util" "github.com/purpleidea/mgmt/etcd/interfaces" - "github.com/purpleidea/mgmt/util" + "github.com/purpleidea/mgmt/util/errwrap" etcd "go.etcd.io/etcd/client/v3" + clientv3Util "go.etcd.io/etcd/client/v3/clientv3util" + //pb "go.etcd.io/etcd/api/v3/etcdserverpb" ) const ( @@ -50,94 +51,26 @@ const ( // change. // TODO: Filter our watch (on the server side if possible) based on the // collection prefixes and filters that we care about... -func WatchResources(ctx context.Context, client interfaces.Client) (chan error, error) { - path := fmt.Sprintf("%s/exported/", ns) +// XXX: filter based on kind as well, we don't do that currently... See: +// https://github.com/etcd-io/etcd/issues/19667 +func WatchResources(ctx context.Context, client interfaces.Client, hostname, kind string) (chan error, error) { + // key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data + // TODO: support the star (*) hostname matching catch-all? + path := fmt.Sprintf("%s/exported/%s/", ns, hostname) return client.Watcher(ctx, path, etcd.WithPrefix()) } -// SetResources exports all of the resources which we pass in to etcd. -func SetResources(ctx context.Context, client interfaces.Client, hostname string, resourceList []engine.Res) error { - // key structure is $NS/exported/$hostname/resources/$uid = $data - - var kindFilter []string // empty to get from everyone - hostnameFilter := []string{hostname} - // this is not a race because we should only be reading keys which we - // set, and there should not be any contention with other hosts here! - originals, err := GetResources(ctx, client, hostnameFilter, kindFilter) - if err != nil { - return err - } - - if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del - return nil - } - - ifs := []etcd.Cmp{} // list matching the desired state - ops := []etcd.Op{} // list of ops in this transaction - for _, res := range resourceList { - if res.Kind() == "" { - return fmt.Errorf("empty kind: %s", res.Name()) - } - uid := fmt.Sprintf("%s/%s", res.Kind(), res.Name()) - path := fmt.Sprintf("%s/exported/%s/resources/%s", ns, hostname, uid) - if data, err := engineUtil.ResToB64(res); err == nil { - ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state - ops = append(ops, etcd.OpPut(path, data)) - } else { - return fmt.Errorf("can't convert to B64: %v", err) - } - } - - match := func(res engine.Res, resourceList []engine.Res) bool { // helper lambda - for _, x := range resourceList { - if res.Kind() == x.Kind() && res.Name() == x.Name() { - return true - } - } - return false - } - - hasDeletes := false - // delete old, now unused resources here... - for _, res := range originals { - if res.Kind() == "" { - return fmt.Errorf("empty kind: %s", res.Name()) - } - uid := fmt.Sprintf("%s/%s", res.Kind(), res.Name()) - path := fmt.Sprintf("%s/exported/%s/resources/%s", ns, hostname, uid) - - if match(res, resourceList) { // if we match, no need to delete! - continue - } - - ops = append(ops, etcd.OpDelete(path)) - - hasDeletes = true - } - - // if everything is already correct, do nothing, otherwise, run the ops! - // it's important to do this in one transaction, and atomically, because - // this way, we only generate one watch event, and only when it's needed - if hasDeletes { // always run, ifs don't matter - _, err = client.Txn(ctx, nil, ops, nil) // TODO: does this run? it should! - } else { - _, err = client.Txn(ctx, ifs, nil, ops) // TODO: do we need to look at response? - } - return err -} - -// GetResources collects all of the resources which match a filter from etcd. If -// the kindfilter or hostnameFilter is empty, then it assumes no filtering... -// TODO: Expand this with a more powerful filter based on what we eventually -// support in our collect DSL. Ideally a server side filter like WithFilter() -// could do this if the pattern was $NS/exported/$kind/$hostname/$uid = $data. -func GetResources(ctx context.Context, client interfaces.Client, hostnameFilter, kindFilter []string) ([]engine.Res, error) { - // key structure is $NS/exported/$hostname/resources/$uid = $data +// GetResources reads the resources sent to the input hostname, and also applies +// the filters to ensure we get a limited selection. +// XXX: We'd much rather filter server side if etcd had better filtering API's. +// See: https://github.com/etcd-io/etcd/issues/19667 +func GetResources(ctx context.Context, client interfaces.Client, hostname string, filters []*engine.ResFilter) ([]*engine.ResOutput, error) { + // key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data path := fmt.Sprintf("%s/exported/", ns) - resourceList := []engine.Res{} + output := []*engine.ResOutput{} keyMap, err := client.Get(ctx, path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend)) if err != nil { - return nil, fmt.Errorf("could not get resources: %v", err) + return nil, errwrap.Wrapf(err, "could not get resources") } for key, val := range keyMap { if !strings.HasPrefix(key, path) { // sanity check @@ -145,35 +78,118 @@ func GetResources(ctx context.Context, client interfaces.Client, hostnameFilter, } str := strings.Split(key[len(path):], "/") - if len(str) != 4 { - return nil, fmt.Errorf("unexpected chunk count") + if len(str) < 4 { + return nil, fmt.Errorf("unexpected chunk count of: %d", len(str)) } - hostname, r, kind, name := str[0], str[1], str[2], str[3] - if r != "resources" { - return nil, fmt.Errorf("unexpected chunk pattern") + // The name may contain slashes, so join all those pieces back! + hostnameTo, hostnameFrom, kind, name := str[0], str[1], str[2], strings.Join(str[3:], "/") + if hostnameTo == "" || hostnameFrom == "" { + return nil, fmt.Errorf("unexpected empty hostname") } if kind == "" { - return nil, fmt.Errorf("unexpected kind chunk") + return nil, fmt.Errorf("unexpected empty kind") } - if name == "" { // TODO: should I check this? + if name == "" { return nil, fmt.Errorf("unexpected empty name") } - // FIXME: ideally this would be a server side filter instead! - if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) { + + // XXX: Do we want to include this catch-all match? + if hostnameTo != hostname && hostnameTo != "*" { // star is any continue } - // FIXME: ideally this would be a server side filter instead! - if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) { - continue + // TODO: I'd love to avoid this O(N^2) matching if possible... + for _, filter := range filters { + if err := filter.Match(kind, name, hostnameFrom); err != nil { + continue // did not match + } } - if res, err := engineUtil.B64ToRes(val); err == nil { - //obj.Logf("Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name) - resourceList = append(resourceList, res) - } else { - return nil, fmt.Errorf("can't convert from B64: %v", err) + ro := &engine.ResOutput{ + Kind: kind, + Name: name, + Host: hostnameFrom, // from this host + Data: val, // encoded res data } + output = append(output, ro) } - return resourceList, nil + + return output, nil +} + +// SetResources stores some resource data for export in etcd. It returns an +// error if anything goes wrong. If it didn't need to make a changes because the +// data was already correct in the database, it returns (true, nil). Otherwise +// it returns (false, nil). +func SetResources(ctx context.Context, client interfaces.Client, hostname string, resourceExports []*engine.ResExport) (bool, error) { + // key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data + + // XXX: We run each export one at a time, because there's a bug if we + // group them, See: https://github.com/etcd-io/etcd/issues/19678 + b := true + for _, re := range resourceExports { + ifs := []etcd.Cmp{} // list matching the desired state + thn := []etcd.Op{} // list of ops in this transaction (then) + els := []etcd.Op{} // list of ops in this transaction (else) + + host := re.Host + if host == "" { + host = "*" // XXX: use whatever means "all" + } + + path := fmt.Sprintf("%s/exported/%s/%s/%s/%s", ns, host, hostname, re.Kind, re.Name) + ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", re.Data)) + els = append(els, etcd.OpPut(path, re.Data)) + + // it's important to do this in one transaction, and atomically, because + // this way, we only generate one watch event, and only when it's needed + out, err := client.Txn(ctx, ifs, thn, els) + if err != nil { + return false, err + } + + b = b && out.Succeeded // collect the true/false responses... + } + + // false means something changed + return b, nil +} + +// DelResources deletes some exported resource data from etcd. It returns an +// error if anything goes wrong. If it didn't need to make a changes because the +// data was already correct in the database, it returns (true, nil). Otherwise +// it returns (false, nil). +func DelResources(ctx context.Context, client interfaces.Client, hostname string, resourceDeletes []*engine.ResDelete) (bool, error) { + // key structure is $NS/exported/$hostname:to/$hostname:from/$kind/$name = $data + + // XXX: We run each delete one at a time, because there's a bug if we + // group them, See: https://github.com/etcd-io/etcd/issues/19678 + b := true + for _, rd := range resourceDeletes { + + ifs := []etcd.Cmp{} // list matching the desired state + thn := []etcd.Op{} // list of ops in this transaction (then) + els := []etcd.Op{} // list of ops in this transaction (else) + + host := rd.Host + if host == "" { + host = "*" // XXX: use whatever means "all" + } + + path := fmt.Sprintf("%s/exported/%s/%s/%s/%s", ns, host, hostname, rd.Kind, rd.Name) + ifs = append(ifs, clientv3Util.KeyExists(path)) + thn = append(thn, etcd.OpDelete(path)) + + // it's important to do this in one transaction, and atomically, because + // this way, we only generate one watch event, and only when it's needed + out, err := client.Txn(ctx, ifs, thn, els) + if err != nil { + return false, err + } + + b = b && out.Succeeded // collect the true/false responses... + } + + // false means something changed + return b, nil } diff --git a/etcd/world.go b/etcd/world.go index 5a580794..944fb30e 100644 --- a/etcd/world.go +++ b/etcd/world.go @@ -167,23 +167,32 @@ func (obj *World) AddDeploy(ctx context.Context, id uint64, hash, pHash string, // ResWatch returns a channel which spits out events on possible exported // resource changes. -func (obj *World) ResWatch(ctx context.Context) (chan error, error) { - return resources.WatchResources(ctx, obj.client) +func (obj *World) ResWatch(ctx context.Context, kind string) (chan error, error) { + return resources.WatchResources(ctx, obj.client, obj.init.Hostname, kind) } -// ResExport exports a list of resources under our hostname namespace. -// Subsequent calls replace the previously set collection atomically. -func (obj *World) ResExport(ctx context.Context, resourceList []engine.Res) error { - return resources.SetResources(ctx, obj.client, obj.init.Hostname, resourceList) -} - -// ResCollect gets the collection of exported resources which match the filter. +// ResCollect gets the collection of exported resources which match the filters. // It does this atomically so that a call always returns a complete collection. -func (obj *World) ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]engine.Res, error) { - // XXX: should we be restricted to retrieving resources that were - // exported with a tag that allows or restricts our hostname? We could - // enforce that here if the underlying API supported it... Add this? - return resources.GetResources(ctx, obj.client, hostnameFilter, kindFilter) +func (obj *World) ResCollect(ctx context.Context, filters []*engine.ResFilter) ([]*engine.ResOutput, error) { + return resources.GetResources(ctx, obj.client, obj.init.Hostname, filters) +} + +// ResExport stores a number of resources in the world storage system. The +// individual records should not be updated if they are identical to what is +// already present. (This is to prevent unnecessary events.) If this makes no +// changes, it returns (true, nil). If it makes a change, then it returns +// (false, nil). On any error we return (false, err). It stores the exports +// under our hostname namespace. Subsequent calls do NOT replace the previously +// set collection. +func (obj *World) ResExport(ctx context.Context, resourceExports []*engine.ResExport) (bool, error) { + return resources.SetResources(ctx, obj.client, obj.init.Hostname, resourceExports) +} + +// ResDelete deletes a number of resources in the world storage system. If this +// doesn't delete, it returns (true, nil). If it makes a delete, then it returns +// (false, nil). On any error we return (false, err). +func (obj *World) ResDelete(ctx context.Context, resourceDeletes []*engine.ResDelete) (bool, error) { + return resources.DelResources(ctx, obj.client, obj.init.Hostname, resourceDeletes) } // IdealClusterSizeWatch returns a stream of errors anytime the cluster-wide diff --git a/examples/lang/export0.mcl b/examples/lang/export0.mcl new file mode 100644 index 00000000..5cf5b54c --- /dev/null +++ b/examples/lang/export0.mcl @@ -0,0 +1,27 @@ +file "/tmp/hello" { + content => "hello world from @purpleidea\n", + state => $const.res.file.state.exists, + + Meta:hidden => true, + + #Meta:export => ["*",], # export to all + #Meta:export => ["hostname1",], # export to just this one + Meta:export => [ # export to everyone in this list + "${hostname}", + "hostname2", + "hostname3", + ], +} + +collect file "/tmp/hello" { + #content => "i was collected\n", # override + + Meta:hidden => false, +} + +# collect a more complex way (use helper functions here instead!) +collect file [ + struct{name => "/tmp/hello", host => "${hostname}",}, +] { + Meta:hidden => false, +} diff --git a/lang/ast/structs.go b/lang/ast/structs.go index 03d172ad..a02f8d74 100644 --- a/lang/ast/structs.go +++ b/lang/ast/structs.go @@ -382,6 +382,10 @@ type StmtRes struct { Name interfaces.Expr // unique name for the res of this kind namePtr interfaces.Func // ptr for table lookup Contents []StmtResContents // list of fields/edges in parsed order + + // Collect specifies that we are "collecting" exported resources. The + // names come from other hosts (or from ourselves during a self-export). + Collect bool } // String returns a short representation of this statement. @@ -426,6 +430,7 @@ func (obj *StmtRes) Init(data *interfaces.Data) error { } fieldNames := make(map[string]struct{}) metaNames := make(map[string]struct{}) + foundCollect := false for _, x := range obj.Contents { // Duplicate checking for identical field names. @@ -444,12 +449,27 @@ func (obj *StmtRes) Init(data *interfaces.Data) error { // Ignore the generic MetaField struct field for now. // You're allowed to have more than one Meta field, but // they can't contain the same field twice. + // FIXME: Allow duplicates in certain fields, such as + // ones that are lists... In this case, they merge... if _, exists := metaNames[line.Property]; exists && line.Property != MetaField { return fmt.Errorf("resource has duplicate meta entry of: %s", line.Property) } metaNames[line.Property] = struct{}{} } + // Duplicate checking for more than one StmtResCollect entry. + if stmt, ok := x.(*StmtResCollect); ok && foundCollect { + // programming error + return fmt.Errorf("duplicate collect body in res") + + } else if ok { + if stmt.Kind != obj.Kind { + // programming error + return fmt.Errorf("unexpected kind mismatch") + } + foundCollect = true // found one + } + if err := x.Init(data); err != nil { return err } @@ -481,6 +501,7 @@ func (obj *StmtRes) Interpolate() (interfaces.Stmt, error) { Kind: obj.Kind, Name: name, Contents: contents, + Collect: obj.Collect, }, nil } @@ -522,6 +543,7 @@ func (obj *StmtRes) Copy() (interfaces.Stmt, error) { Kind: obj.Kind, Name: name, Contents: contents, + Collect: obj.Collect, }, nil } @@ -630,6 +652,10 @@ func (obj *StmtRes) TypeCheck() ([]*interfaces.UnificationInvariant, error) { // TODO: Check other cases, like if it's a function call, and we know it // can only return a single string. (Eg: fmt.printf for example.) isString := false + isListString := false + isCollectType := false + typCollectFuncInType := types.NewType(funcs.CollectFuncInType) + if _, ok := obj.Name.(*ExprStr); ok { // It's a string! (A plain string was specified.) isString = true @@ -639,14 +665,41 @@ func (obj *StmtRes) TypeCheck() ([]*interfaces.UnificationInvariant, error) { if typ.Cmp(types.TypeStr) == nil { isString = true } + if typ.Cmp(types.TypeListStr) == nil { + isListString = true + } + if typ.Cmp(typCollectFuncInType) == nil { + isCollectType = true + } } - typExpr := types.TypeListStr // default + var typExpr *types.Type // nil // If we pass here, we only allow []str, no need for exclusives! if isString { typExpr = types.TypeStr } + if isListString { + typExpr = types.TypeListStr // default for regular resources + } + if isCollectType && obj.Collect { + typExpr = typCollectFuncInType + } + + if !obj.Collect && typExpr == nil { + typExpr = types.TypeListStr // default for regular resources + } + if obj.Collect && typExpr == nil { + // TODO: do we want a default for collect ? + typExpr = typCollectFuncInType // default for collect resources + } + + if typExpr == nil { // If we don't know for sure, then we unify it all. + typExpr = &types.Type{ + Kind: types.KindUnification, + Uni: types.NewElem(), // unification variable, eg: ?1 + } + } invar := &interfaces.UnificationInvariant{ Node: obj, @@ -747,16 +800,87 @@ func (obj *StmtRes) Output(table map[interfaces.Func]types.Value) (*interfaces.O return nil, fmt.Errorf("%w: %T", ErrTableNoValue, obj) } - names := []string{} // list of names to build + // the host in this output is who the data is from + mapping, err := obj.collect(table) // gives us (name, host, data) + if err != nil { + return nil, err + } + if mapping == nil { // for when we're not collecting + mapping = make(map[string]map[string]string) + } + + typCollectFuncInType := types.NewType(funcs.CollectFuncInType) + + names := []string{} // list of names to build (TODO: map instead?) switch { case types.TypeStr.Cmp(nameValue.Type()) == nil: name := nameValue.Str() // must not panic names = append(names, name) + for n := range mapping { // delete everything else + if n == name { + continue + } + delete(mapping, n) + } + if !obj.Collect { // mapping is empty, add a stub + mapping[name] = map[string]string{ + "*": "", // empty data + } + } case types.TypeListStr.Cmp(nameValue.Type()) == nil: for _, x := range nameValue.List() { // must not panic name := x.Str() // must not panic names = append(names, name) + if !obj.Collect { // mapping is empty, add a stub + mapping[name] = map[string]string{ + "*": "", // empty data + } + } + } + for n := range mapping { // delete everything else + if util.StrInList(n, names) { + continue + } + delete(mapping, n) + } + + case obj.Collect && typCollectFuncInType.Cmp(nameValue.Type()) == nil: + hosts := make(map[string]string) + for _, x := range nameValue.List() { // must not panic + st, ok := x.(*types.StructValue) + if !ok { + // programming error + return nil, fmt.Errorf("value is not a struct") + } + name, exists := st.Lookup(funcs.CollectFuncInFieldName) + if !exists { + // programming error? + return nil, fmt.Errorf("name field is missing") + } + host, exists := st.Lookup(funcs.CollectFuncInFieldHost) + if !exists { + // programming error? + return nil, fmt.Errorf("host field is missing") + } + + s := name.Str() // must not panic + names = append(names, s) + // host is the input telling us who we want to pull from + hosts[s] = host.Str() // correspondence map + } + for n, m := range mapping { // delete everything else + if !util.StrInList(n, names) { + delete(mapping, n) + continue + } + host := hosts[n] // the matching host for the name + for h := range m { + if h == host { + continue + } + delete(mapping[n], h) + } } default: @@ -766,22 +890,32 @@ func (obj *StmtRes) Output(table map[interfaces.Func]types.Value) (*interfaces.O resources := []engine.Res{} edges := []*interfaces.Edge{} - for _, name := range names { - res, err := obj.resource(table, name) - if err != nil { - return nil, errwrap.Wrapf(err, "error building resource") - } - edgeList, err := obj.edges(table, name) - if err != nil { - return nil, errwrap.Wrapf(err, "error building edges") - } - edges = append(edges, edgeList...) + apply, err := obj.metaparams(table) + if err != nil { + return nil, errwrap.Wrapf(err, "error generating metaparams") + } - if err := obj.metaparams(table, res); err != nil { // set metaparams - return nil, errwrap.Wrapf(err, "error building meta params") + // TODO: sort? + for name, m := range mapping { + for host, data := range m { + // host may be * if not collecting + // data may be empty if not collecting + _ = host // unused atm + res, err := obj.resource(table, name, data) // one at a time + if err != nil { + return nil, errwrap.Wrapf(err, "error building resource") + } + apply(res) // apply metaparams, does not return anything + + resources = append(resources, res) + + edgeList, err := obj.edges(table, name) + if err != nil { + return nil, errwrap.Wrapf(err, "error building edges") + } + edges = append(edges, edgeList...) } - resources = append(resources, res) } return &interfaces.Output{ @@ -790,14 +924,115 @@ func (obj *StmtRes) Output(table map[interfaces.Func]types.Value) (*interfaces.O }, nil } +// collect is a helper function to pull out the collected resource data. +func (obj *StmtRes) collect(table map[interfaces.Func]types.Value) (map[string]map[string]string, error) { + if !obj.Collect { + return nil, nil // nothing to do + } + + var val types.Value // = nil + typCollectFuncOutType := types.NewType(funcs.CollectFuncOutType) + + for _, line := range obj.Contents { + x, ok := line.(*StmtResCollect) + if !ok { + continue + } + if x.Kind != obj.Kind { // should have been caught in Init + // programming error + return nil, fmt.Errorf("unexpected kind mismatch") + } + if x.valuePtr == nil { + return nil, fmt.Errorf("%w: %T", ErrFuncPointerNil, obj) + } + + fv, exists := table[x.valuePtr] + if !exists { + return nil, fmt.Errorf("%w: %T", ErrTableNoValue, obj) + } + + if err := fv.Type().Cmp(typCollectFuncOutType); err != nil { // "[]struct{name str; host str; data str}" + // programming error + return nil, fmt.Errorf("resource collect has invalid type: `%+v`", err) + } + + val = fv // found + break + } + + if val == nil { + // programming error? + return nil, nil // nothing found + } + + m := make(map[string]map[string]string) // name, host, data + + // TODO: Store/cache this in an efficient form to avoid loops... + // TODO: Eventually collect func should, for efficiency, return: + // map{struct{name str; host str}: str} // key => $data + for _, x := range val.List() { // must not panic + st, ok := x.(*types.StructValue) + if !ok { + // programming error + return nil, fmt.Errorf("value is not a struct") + } + name, exists := st.Lookup(funcs.CollectFuncOutFieldName) + if !exists { + // programming error? + return nil, fmt.Errorf("name field is missing") + } + host, exists := st.Lookup(funcs.CollectFuncOutFieldHost) + if !exists { + // programming error? + return nil, fmt.Errorf("host field is missing") + } + data, exists := st.Lookup(funcs.CollectFuncOutFieldData) + if !exists { + // programming error? + return nil, fmt.Errorf("data field is missing") + } + + // found! + n := name.Str() // must not panic + h := host.Str() // must not panic + + if _, exists := m[n]; !exists { + m[n] = make(map[string]string) + } + m[n][h] = data.Str() // must not panic + } + + return m, nil +} + // resource is a helper function to generate the res that comes from this. // TODO: it could memoize some of the work to avoid re-computation when looped -func (obj *StmtRes) resource(table map[interfaces.Func]types.Value, resName string) (engine.Res, error) { +func (obj *StmtRes) resource(table map[interfaces.Func]types.Value, resName, data string) (engine.Res, error) { res, err := engine.NewNamedResource(obj.Kind, resName) if err != nil { return nil, errwrap.Wrapf(err, "cannot create resource kind `%s` with named `%s`", obj.Kind, resName) } + // Here we start off by using the collected resource as the base params. + // Then we overwrite over it below using the normal param setup methods. + if obj.Collect && data != "" { + // TODO: Do we want to have an alternate implementation of this + // to go along with the ExportableRes encoding variant? + if res, err = engineUtil.B64ToRes(data); err != nil { + return nil, fmt.Errorf("can't convert from B64: %v", err) + } + if res.Kind() != obj.Kind { // should have been caught somewhere + // programming error + return nil, fmt.Errorf("unexpected kind mismatch") + } + obj.data.Logf("collect: %s", res) + + // XXX: Do we want to change any metaparams when we collect? + // XXX: Do we want to change any metaparams when we export? + //res.MetaParams().Hidden = false // unlikely, but I considered + res.MetaParams().Export = []string{} // don't re-export + } + sv := reflect.ValueOf(res).Elem() // pointer to struct, then struct if k := sv.Kind(); k != reflect.Struct { panic(fmt.Sprintf("expected struct, got: %s", k)) @@ -1015,23 +1250,10 @@ func (obj *StmtRes) edges(table map[interfaces.Func]types.Value, resName string) return edges, nil } -// metaparams is a helper function to set the metaparams that come from the -// resource on to the individual resource we're working on. -func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine.Res) error { - meta := engine.DefaultMetaParams.Copy() // defaults - - var rm *engine.ReversibleMeta - if r, ok := res.(engine.ReversibleRes); ok { - rm = r.ReversibleMeta() // get a struct with the defaults - } - var aem *engine.AutoEdgeMeta - if r, ok := res.(engine.EdgeableRes); ok { - aem = r.AutoEdgeMeta() // get a struct with the defaults - } - var agm *engine.AutoGroupMeta - if r, ok := res.(engine.GroupableRes); ok { - agm = r.AutoGroupMeta() // get a struct with the defaults - } +// metaparams is a helper function to get the metaparams that come from the +// resource AST so we can eventually set them on the individual resource. +func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value) (func(engine.Res), error) { + apply := []func(engine.Res){} for _, line := range obj.Contents { x, ok := line.(*StmtResMeta) @@ -1041,11 +1263,11 @@ func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine if x.Condition != nil { if x.conditionPtr == nil { - return fmt.Errorf("%w: %T", ErrFuncPointerNil, obj) + return nil, fmt.Errorf("%w: %T", ErrFuncPointerNil, obj) } b, exists := table[x.conditionPtr] if !exists { - return fmt.Errorf("%w: %T", ErrTableNoValue, obj) + return nil, fmt.Errorf("%w: %T", ErrTableNoValue, obj) } if !b.Bool() { // if value exists, and is false, skip it @@ -1054,47 +1276,63 @@ func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine } if x.metaExprPtr == nil { - return fmt.Errorf("%w: %T", ErrFuncPointerNil, obj) + return nil, fmt.Errorf("%w: %T", ErrFuncPointerNil, obj) } v, exists := table[x.metaExprPtr] if !exists { - return fmt.Errorf("%w: %T", ErrTableNoValue, obj) + return nil, fmt.Errorf("%w: %T", ErrTableNoValue, obj) } switch p := strings.ToLower(x.Property); p { // TODO: we could add these fields dynamically if we were fancy! case "noop": - meta.Noop = v.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Noop = v.Bool() // must not panic + }) case "retry": x := v.Int() // must not panic // TODO: check that it doesn't overflow - meta.Retry = int16(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Retry = int16(x) + }) case "retryreset": - meta.RetryReset = v.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().RetryReset = v.Bool() // must not panic + }) case "delay": x := v.Int() // must not panic // TODO: check that it isn't signed - meta.Delay = uint64(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Delay = uint64(x) + }) case "poll": x := v.Int() // must not panic // TODO: check that it doesn't overflow and isn't signed - meta.Poll = uint32(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Poll = uint32(x) + }) case "limit": // rate.Limit x := v.Float() // must not panic - meta.Limit = rate.Limit(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Limit = rate.Limit(x) + }) case "burst": x := v.Int() // must not panic // TODO: check that it doesn't overflow - meta.Burst = int(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Burst = int(x) + }) case "reset": - meta.Reset = v.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Reset = v.Bool() // must not panic + }) case "sema": // []string values := []string{} @@ -1102,65 +1340,126 @@ func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine s := x.Str() // must not panic values = append(values, s) } - meta.Sema = values + apply = append(apply, func(res engine.Res) { + res.MetaParams().Sema = values + }) case "rewatch": - meta.Rewatch = v.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Rewatch = v.Bool() // must not panic + }) case "realize": - meta.Realize = v.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Realize = v.Bool() // must not panic + }) case "dollar": - meta.Dollar = v.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Dollar = v.Bool() // must not panic + }) + + case "hidden": + apply = append(apply, func(res engine.Res) { + res.MetaParams().Hidden = v.Bool() // must not panic + }) + + case "export": // []string + values := []string{} + for _, x := range v.List() { // must not panic + s := x.Str() // must not panic + values = append(values, s) + } + apply = append(apply, func(res engine.Res) { + res.MetaParams().Export = values + }) case "reverse": - if rm != nil { - rm.Disabled = !v.Bool() // must not panic - } + apply = append(apply, func(res engine.Res) { + r, ok := res.(engine.ReversibleRes) + if !ok { + return + } + // *engine.ReversibleMeta + rm := r.ReversibleMeta() // get current values + rm.Disabled = !v.Bool() // must not panic + r.SetReversibleMeta(rm) // set + }) case "autoedge": - if aem != nil { + apply = append(apply, func(res engine.Res) { + r, ok := res.(engine.EdgeableRes) + if !ok { + return + } + // *engine.AutoEdgeMeta + aem := r.AutoEdgeMeta() // get current values aem.Disabled = !v.Bool() // must not panic - } + r.SetAutoEdgeMeta(aem) // set + }) case "autogroup": - if agm != nil { + apply = append(apply, func(res engine.Res) { + r, ok := res.(engine.GroupableRes) + if !ok { + return + } + // *engine.AutoGroupMeta + agm := r.AutoGroupMeta() // get current values agm.Disabled = !v.Bool() // must not panic - } + r.SetAutoGroupMeta(agm) // set + + }) case MetaField: if val, exists := v.Struct()["noop"]; exists { - meta.Noop = val.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Noop = val.Bool() // must not panic + }) } if val, exists := v.Struct()["retry"]; exists { x := val.Int() // must not panic // TODO: check that it doesn't overflow - meta.Retry = int16(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Retry = int16(x) + }) } if val, exists := v.Struct()["retryreset"]; exists { - meta.RetryReset = val.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().RetryReset = val.Bool() // must not panic + }) } if val, exists := v.Struct()["delay"]; exists { x := val.Int() // must not panic // TODO: check that it isn't signed - meta.Delay = uint64(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Delay = uint64(x) + }) } if val, exists := v.Struct()["poll"]; exists { x := val.Int() // must not panic // TODO: check that it doesn't overflow and isn't signed - meta.Poll = uint32(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Poll = uint32(x) + }) } if val, exists := v.Struct()["limit"]; exists { x := val.Float() // must not panic - meta.Limit = rate.Limit(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Limit = rate.Limit(x) + }) } if val, exists := v.Struct()["burst"]; exists { x := val.Int() // must not panic // TODO: check that it doesn't overflow - meta.Burst = int(x) + apply = append(apply, func(res engine.Res) { + res.MetaParams().Burst = int(x) + }) } if val, exists := v.Struct()["reset"]; exists { - meta.Reset = val.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Reset = val.Bool() // must not panic + }) } if val, exists := v.Struct()["sema"]; exists { values := []string{} @@ -1168,44 +1467,90 @@ func (obj *StmtRes) metaparams(table map[interfaces.Func]types.Value, res engine s := x.Str() // must not panic values = append(values, s) } - meta.Sema = values + apply = append(apply, func(res engine.Res) { + res.MetaParams().Sema = values + }) } if val, exists := v.Struct()["rewatch"]; exists { - meta.Rewatch = val.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Rewatch = val.Bool() // must not panic + }) } if val, exists := v.Struct()["realize"]; exists { - meta.Realize = val.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Realize = val.Bool() // must not panic + }) } if val, exists := v.Struct()["dollar"]; exists { - meta.Dollar = val.Bool() // must not panic + apply = append(apply, func(res engine.Res) { + res.MetaParams().Dollar = val.Bool() // must not panic + }) } - if val, exists := v.Struct()["reverse"]; exists && rm != nil { - rm.Disabled = !val.Bool() // must not panic + if val, exists := v.Struct()["hidden"]; exists { + apply = append(apply, func(res engine.Res) { + res.MetaParams().Hidden = val.Bool() // must not panic + }) } - if val, exists := v.Struct()["autoedge"]; exists && aem != nil { - aem.Disabled = !val.Bool() // must not panic + if val, exists := v.Struct()["export"]; exists { + values := []string{} + for _, x := range val.List() { // must not panic + s := x.Str() // must not panic + values = append(values, s) + } + apply = append(apply, func(res engine.Res) { + res.MetaParams().Export = values + }) } - if val, exists := v.Struct()["autogroup"]; exists && agm != nil { - agm.Disabled = !val.Bool() // must not panic + if val, exists := v.Struct()["reverse"]; exists { + apply = append(apply, func(res engine.Res) { + r, ok := res.(engine.ReversibleRes) + if !ok { + return + } + // *engine.ReversibleMeta + rm := r.ReversibleMeta() // get current values + rm.Disabled = !val.Bool() // must not panic + r.SetReversibleMeta(rm) // set + }) + } + if val, exists := v.Struct()["autoedge"]; exists { + apply = append(apply, func(res engine.Res) { + r, ok := res.(engine.EdgeableRes) + if !ok { + return + } + // *engine.AutoEdgeMeta + aem := r.AutoEdgeMeta() // get current values + aem.Disabled = !val.Bool() // must not panic + r.SetAutoEdgeMeta(aem) // set + }) + } + if val, exists := v.Struct()["autogroup"]; exists { + apply = append(apply, func(res engine.Res) { + r, ok := res.(engine.GroupableRes) + if !ok { + return + } + // *engine.AutoGroupMeta + agm := r.AutoGroupMeta() // get current values + agm.Disabled = !val.Bool() // must not panic + r.SetAutoGroupMeta(agm) // set + + }) } default: - return fmt.Errorf("unknown property: %s", p) + return nil, fmt.Errorf("unknown property: %s", p) } } - res.SetMetaParams(meta) // set it! - if r, ok := res.(engine.ReversibleRes); ok { - r.SetReversibleMeta(rm) // set - } - if r, ok := res.(engine.EdgeableRes); ok { - r.SetAutoEdgeMeta(aem) // set - } - if r, ok := res.(engine.GroupableRes); ok { - r.SetAutoGroupMeta(agm) // set + fn := func(res engine.Res) { + for _, f := range apply { + f(res) + } } - return nil + return fn, nil } // StmtResContents is the interface that is met by the resource contents. Look @@ -1816,6 +2161,8 @@ func (obj *StmtResMeta) Init(data *interfaces.Data) error { case "rewatch": case "realize": case "dollar": + case "hidden": + case "export": case "reverse": case "autoedge": case "autogroup": @@ -2033,6 +2380,12 @@ func (obj *StmtResMeta) TypeCheck(kind string) ([]*interfaces.UnificationInvaria case "dollar": typExpr = types.TypeBool + case "hidden": + typExpr = types.TypeBool + + case "export": + typExpr = types.TypeListStr + case "reverse": // TODO: We might want more parameters about how to reverse. typExpr = types.TypeBool @@ -2049,7 +2402,7 @@ func (obj *StmtResMeta) TypeCheck(kind string) ([]*interfaces.UnificationInvaria // FIXME: allow partial subsets of this struct, and in any order // FIXME: we might need an updated unification engine to do this wrap := func(reverse *types.Type) *types.Type { - return types.NewType(fmt.Sprintf("struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse %s; autoedge bool; autogroup bool}", reverse.String())) + return types.NewType(fmt.Sprintf("struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse %s; autoedge bool; autogroup bool}", reverse.String())) } // TODO: We might want more parameters about how to reverse. typExpr = wrap(types.TypeBool) @@ -2104,6 +2457,198 @@ func (obj *StmtResMeta) Graph(env *interfaces.Env) (*pgraph.Graph, error) { return graph, nil } +// StmtResCollect represents hidden resource collection data in the resource. +// This does not satisfy the Stmt interface. +type StmtResCollect struct { + //Textarea + data *interfaces.Data + + Kind string + Value interfaces.Expr + valuePtr interfaces.Func // ptr for table lookup +} + +// String returns a short representation of this statement. +func (obj *StmtResCollect) String() string { + // TODO: add .String() for Condition and Value + return fmt.Sprintf("rescollect(%s)", obj.Kind) +} + +// Apply is a general purpose iterator method that operates on any AST node. It +// is not used as the primary AST traversal function because it is less readable +// and easy to reason about than manually implementing traversal for each node. +// Nevertheless, it is a useful facility for operations that might only apply to +// a select number of node types, since they won't need extra noop iterators... +func (obj *StmtResCollect) Apply(fn func(interfaces.Node) error) error { + if err := obj.Value.Apply(fn); err != nil { + return err + } + return fn(obj) +} + +// Init initializes this branch of the AST, and returns an error if it fails to +// validate. +func (obj *StmtResCollect) Init(data *interfaces.Data) error { + obj.data = data + //obj.Textarea.Setup(data) + + if obj.Kind == "" { + return fmt.Errorf("res kind is empty") + } + + return obj.Value.Init(data) +} + +// Interpolate returns a new node (aka a copy) once it has been expanded. This +// generally increases the size of the AST when it is used. It calls Interpolate +// on any child elements and builds the new node with those new node contents. +// This interpolate is different It is different from the interpolate found in +// the Expr and Stmt interfaces because it returns a different type as output. +func (obj *StmtResCollect) Interpolate() (StmtResContents, error) { + interpolated, err := obj.Value.Interpolate() + if err != nil { + return nil, err + } + return &StmtResCollect{ + //Textarea: obj.Textarea, + data: obj.data, + Kind: obj.Kind, + Value: interpolated, + }, nil +} + +// Copy returns a light copy of this struct. Anything static will not be copied. +func (obj *StmtResCollect) Copy() (StmtResContents, error) { + copied := false + value, err := obj.Value.Copy() + if err != nil { + return nil, err + } + if value != obj.Value { // must have been copied, or pointer would be same + copied = true + } + + if !copied { // it's static + return obj, nil + } + return &StmtResCollect{ + //Textarea: obj.Textarea, + data: obj.data, + Kind: obj.Kind, + Value: value, + }, nil +} + +// Ordering returns a graph of the scope ordering that represents the data flow. +// This can be used in SetScope so that it knows the correct order to run it in. +func (obj *StmtResCollect) Ordering(produces map[string]interfaces.Node) (*pgraph.Graph, map[interfaces.Node]string, error) { + graph, err := pgraph.NewGraph("ordering") + if err != nil { + return nil, nil, err + } + graph.AddVertex(obj) + + // additional constraint... + edge := &pgraph.SimpleEdge{Name: "stmtrescollectvalue"} + graph.AddEdge(obj.Value, obj, edge) // prod -> cons + + cons := make(map[interfaces.Node]string) + nodes := []interfaces.Expr{obj.Value} + + for _, node := range nodes { + g, c, err := node.Ordering(produces) + if err != nil { + return nil, nil, err + } + graph.AddGraph(g) // add in the child graph + + for k, v := range c { // c is consumes + x, exists := cons[k] + if exists && v != x { + return nil, nil, fmt.Errorf("consumed value is different, got `%+v`, expected `%+v`", x, v) + } + cons[k] = v // add to map + + n, exists := produces[v] + if !exists { + continue + } + edge := &pgraph.SimpleEdge{Name: "stmtrescollect"} + graph.AddEdge(n, k, edge) + } + } + + return graph, cons, nil +} + +// SetScope stores the scope for later use in this resource and its children, +// which it propagates this downwards to. +func (obj *StmtResCollect) SetScope(scope *interfaces.Scope) error { + if err := obj.Value.SetScope(scope, map[string]interfaces.Expr{}); err != nil { + return err + } + return nil +} + +// TypeCheck returns the list of invariants that this node produces. It does so +// recursively on any children elements that exist in the AST, and returns the +// collection to the caller. It calls TypeCheck for child statements, and +// Infer/Check for child expressions. It is different from the TypeCheck method +// found in the Stmt interface because it adds an input parameter. +func (obj *StmtResCollect) TypeCheck(kind string) ([]*interfaces.UnificationInvariant, error) { + typ, invariants, err := obj.Value.Infer() + if err != nil { + return nil, err + } + + //invars, err := obj.Value.Check(typ) // don't call this here! + + if !engine.IsKind(kind) { + return nil, fmt.Errorf("invalid resource kind: %s", kind) + } + + typExpr := types.NewType(funcs.CollectFuncOutType) + if typExpr == nil { + return nil, fmt.Errorf("unexpected nil type") + } + + // regular scenario + invar := &interfaces.UnificationInvariant{ + Node: obj, + Expr: obj.Value, + Expect: typExpr, + Actual: typ, + } + invariants = append(invariants, invar) + + return invariants, nil +} + +// Graph returns the reactive function graph which is expressed by this node. It +// includes any vertices produced by this node, and the appropriate edges to any +// vertices that are produced by its children. Nodes which fulfill the Expr +// interface directly produce vertices (and possible children) where as nodes +// that fulfill the Stmt interface do not produces vertices, where as their +// children might. It is interesting to note that nothing directly adds an edge +// to the resources created, but rather, once all the values (expressions) with +// no outgoing edges have produced at least a single value, then the resources +// know they're able to be built. +func (obj *StmtResCollect) Graph(env *interfaces.Env) (*pgraph.Graph, error) { + graph, err := pgraph.NewGraph("rescollect") + if err != nil { + return nil, err + } + + g, f, err := obj.Value.Graph(env) + if err != nil { + return nil, err + } + graph.AddGraph(g) + obj.valuePtr = f + + return graph, nil +} + // StmtEdge is a representation of a dependency. It also supports send/recv. // Edges represents that the first resource (Kind/Name) listed in the // EdgeHalfList should happen in the resource graph *before* the next resource diff --git a/lang/core/collect.go b/lang/core/collect.go new file mode 100644 index 00000000..fb4085b0 --- /dev/null +++ b/lang/core/collect.go @@ -0,0 +1,493 @@ +// Mgmt +// Copyright (C) James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// +// Additional permission under GNU GPL version 3 section 7 +// +// If you modify this program, or any covered work, by linking or combining it +// with embedded mcl code and modules (and that the embedded mcl code and +// modules which link with this program, contain a copy of their source code in +// the authoritative form) containing parts covered by the terms of any other +// license, the licensors of this program grant you additional permission to +// convey the resulting work. Furthermore, the licensors of this program grant +// the original author, James Shubin, additional permission to update this +// additional permission if he deems it necessary to achieve the goals of this +// additional permission. + +package core + +import ( + "context" + "fmt" + + "github.com/purpleidea/mgmt/engine" + "github.com/purpleidea/mgmt/lang/funcs" + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" + "github.com/purpleidea/mgmt/util/errwrap" +) + +const ( + // CollectFuncName is the name this function is registered as. This + // starts with an underscore so that it cannot be used from the lexer. + CollectFuncName = funcs.CollectFuncName + + // arg names... + collectArgNameKind = "kind" + collectArgNameNames = "names" + + //collectFuncInType = "[]struct{kind str; name str; host str}" + //collectFuncInFieldKind = "kind" // must match above struct field + collectFuncInFieldName = funcs.CollectFuncInFieldName + collectFuncInFieldHost = funcs.CollectFuncInFieldHost + + // collectFuncInType is the most complex of the three possible input + // types. The other two possible ones are str or []str. + collectFuncInType = funcs.CollectFuncInType // "[]struct{name str; host str}" + + collectFuncOutFieldName = funcs.CollectFuncOutFieldName + collectFuncOutFieldHost = funcs.CollectFuncOutFieldHost + collectFuncOutFieldData = funcs.CollectFuncOutFieldData + + // collectFuncOutStruct is the struct type that we return a list of. + collectFuncOutStruct = funcs.CollectFuncOutStruct + + // collectFuncOutType is the expected return type, the data field is an + // encoded resource blob. + // XXX: Once structs can be real map keys in mcl, could this instead be: + // map{struct{name str; host str}: str} // key => $data (efficiency!) + collectFuncOutType = funcs.CollectFuncOutType // "[]struct{name str; host str; data str}" +) + +func init() { + funcs.Register(CollectFuncName, func() interfaces.Func { return &CollectFunc{} }) // must register the func and name +} + +var _ interfaces.InferableFunc = &CollectFunc{} // ensure it meets this expectation + +// CollectFunc is a special internal function which gets given information about +// incoming resource collection data. For example, to collect, that "pseudo +// resource" will need to know what resource "kind" it's collecting, the names +// of those resources, and the corresponding hostnames that they are getting the +// data from. With that three-tuple of data, it can pull all of that from etcd +// and pass it into a hidden resource body field so that the collect "pseudo +// resource" can use it to build the exported resource! +// +// The "kind" comes in as the first arg. The second arg (in its complex form) is +// []struct{name str; host str} is what the end user is _asking_ this function +// for. +// TODO: We could have a second version of this collect function which takes a +// single arg which receives []struct{kind str; name str; host str} which would +// let us write a truly dynamic collector. It's unlikely we want to allow this +// in most cases because it lets you play type games since the field name in one +// resource kind might be a different type in another. +type CollectFunc struct { + // Type is the type of the second arg that we receive. (When known.) + Type *types.Type + + init *interfaces.Init + + last types.Value // last value received to use for diff + args []types.Value + kind string + result types.Value // last calculated output + + watchChan chan error +} + +// String returns a simple name for this function. This is needed so this struct +// can satisfy the pgraph.Vertex interface. +func (obj *CollectFunc) String() string { + return CollectFuncName +} + +// ArgGen returns the Nth arg name for this function. +func (obj *CollectFunc) ArgGen(index int) (string, error) { + seq := []string{collectArgNameKind, collectArgNameNames} + if l := len(seq); index >= l { + return "", fmt.Errorf("index %d exceeds arg length of %d", index, l) + } + return seq[index], nil +} + +// helper +func (obj *CollectFunc) sig() *types.Type { + arg := "?1" + if obj.Type != nil { + arg = obj.Type.String() + } + + return types.NewType(fmt.Sprintf( + "func(%s str, %s %s) %s", + collectArgNameKind, + collectArgNameNames, + arg, + collectFuncOutType, + )) +} + +// check determines if our arg type is valid. +func (obj *CollectFunc) check(typ *types.Type) error { + if typ.Cmp(types.TypeStr) == nil { + return nil + } + if typ.Cmp(types.TypeListStr) == nil { + return nil + } + if typ.Cmp(types.NewType(collectFuncInType)) == nil { + return nil + } + + return fmt.Errorf("unexpected type: %s", typ.String()) +} + +// FuncInfer takes partial type and value information from the call site of this +// function so that it can build an appropriate type signature for it. The type +// signature may include unification variables. +func (obj *CollectFunc) FuncInfer(partialType *types.Type, partialValues []types.Value) (*types.Type, []*interfaces.UnificationInvariant, error) { + // There are many variants which we could allow... These variants are + // what the user specifies in the $name field when they collect. They + // will often get the third form from helper functions that filter the + // data from the world graph, so that they can programmatically match + // using our mcl language rather than hard-coding a mini matcher lang. + // + // XXX: Do we want to allow all these variants? + // + // func(str, str) out # matches all hostnames + // OR + // func(str, []str) out # matches all hostnames + // OR + // func(str, []struct{name str; host str} ) out # matches exact tuples or all hostnames if host is "" + // SO + // func(str, ?1) out + // AND + // out = []struct{name str; host str; data str} # it could have kind too, but not needed right now + // + // NOTE: map[str]str (name => host) is NOT a good choice because even + // though we nominally have one host exporting a given name, it's valid + // to have that same name come from more than one host and for them to + // be compatible, almost like an "exported resources redundancy". + // + // NOTE map[str][]str (name => []host) is sensible, BUT it makes it + // harder to express that we want "every host", which we can do with the + // struct variant above by having host be the empty string. It's also + // easier for the mcl programmer to understand that variant. + + if l := 2; len(partialValues) != l { + return nil, nil, fmt.Errorf("function must have %d args", l) + } + if err := partialValues[0].Type().Cmp(types.TypeStr); err != nil { + return nil, nil, errwrap.Wrapf(err, "function arg kind must be a str") + } + kind := partialValues[0].Str() // must not panic + if kind == "" { + return nil, nil, fmt.Errorf("function must not have an empty kind arg") + } + if !engine.IsKind(kind) { + return nil, nil, fmt.Errorf("invalid resource kind: %s", kind) + } + + // If second arg is one of what we're expecting, then we are solved! + if len(partialType.Ord) == 2 && partialType.Map[partialType.Ord[1]] != nil { + typ := partialType.Map[partialType.Ord[1]] + if err := obj.check(typ); err == nil { + obj.Type = typ // success! + } + } + + return obj.sig(), []*interfaces.UnificationInvariant{}, nil +} + +// Build is run to turn the polymorphic, undetermined function, into the +// specific statically typed version. It is usually run after Unify completes, +// and must be run before Info() and any of the other Func interface methods are +// used. This function is idempotent, as long as the arg isn't changed between +// runs. +func (obj *CollectFunc) Build(typ *types.Type) (*types.Type, error) { + // typ is the KindFunc signature we're trying to build... + if typ.Kind != types.KindFunc { + return nil, fmt.Errorf("input type must be of kind func") + } + + if len(typ.Ord) != 2 { + return nil, fmt.Errorf("the collect function needs two args") + } + tStr, exists := typ.Map[typ.Ord[0]] + if !exists || tStr == nil { + return nil, fmt.Errorf("first arg must be specified") + } + if tStr.Cmp(types.TypeStr) != nil { + return nil, fmt.Errorf("first arg must be a str") + } + + tArg, exists := typ.Map[typ.Ord[1]] + if !exists || tArg == nil { + return nil, fmt.Errorf("second arg must be specified") + } + + if err := obj.check(tArg); err != nil { + return nil, err + } + + obj.Type = tArg // store it! + + return obj.sig(), nil +} + +// Copy is implemented so that the obj.Type value is not lost if we copy this +// function. That value is learned during FuncInfer, and previously would have +// been lost by the time we used it in Build. +func (obj *CollectFunc) Copy() interfaces.Func { + return &CollectFunc{ + Type: obj.Type, // don't copy because we use this after unification + + init: obj.init, // likely gets overwritten anyways + } +} + +// Validate tells us if the input struct takes a valid form. +func (obj *CollectFunc) Validate() error { + if obj.Type == nil { + return fmt.Errorf("the Type is unknown") + } + if err := obj.check(obj.Type); err != nil { + return err + } + return nil +} + +// Info returns some static info about itself. Build must be called before this +// will return correct data. +func (obj *CollectFunc) Info() *interfaces.Info { + // Since this function implements FuncInfer we want sig to return nil to + // avoid an accidental return of unification variables when we should be + // getting them from FuncInfer, and not from here. (During unification!) + var sig *types.Type + if obj.Type != nil && obj.check(obj.Type) == nil { + sig = obj.sig() // helper + } + return &interfaces.Info{ + Pure: false, + Memo: false, + Sig: sig, + Err: obj.Validate(), + } +} + +// Init runs some startup code for this function. +func (obj *CollectFunc) Init(init *interfaces.Init) error { + obj.init = init + obj.watchChan = make(chan error) // XXX: sender should close this, but did I implement that part yet??? + return nil +} + +// Stream returns the changing values that this func has over time. +func (obj *CollectFunc) Stream(ctx context.Context) error { + defer close(obj.init.Output) // the sender closes + ctx, cancel := context.WithCancel(ctx) + defer cancel() // important so that we cleanup the watch when exiting + for { + select { + // TODO: should this first chan be run as a priority channel to + // avoid some sort of glitch? is that even possible? can our + // hostname check with reality (below) fix that? + case input, ok := <-obj.init.Input: + if !ok { + obj.init.Input = nil // don't infinite loop back + continue // no more inputs, but don't return! + } + //if err := input.Type().Cmp(obj.Info().Sig.Input); err != nil { + // return errwrap.Wrapf(err, "wrong function input") + //} + + if obj.last != nil && input.Cmp(obj.last) == nil { + continue // value didn't change, skip it + } + obj.last = input // store for next + + args, err := interfaces.StructToCallableArgs(input) // []types.Value, error) + if err != nil { + return err + } + obj.args = args + + kind := args[0].Str() + if kind == "" { + return fmt.Errorf("can't use an empty kind") + } + if obj.init.Debug { + obj.init.Logf("kind: %s", kind) + } + + // TODO: support changing the key over time? + if obj.kind == "" { + obj.kind = kind // store it + var err error + // Don't send a value right away, wait for the + // first Watch startup event to get one! + obj.watchChan, err = obj.init.World.ResWatch(ctx, obj.kind) // watch for var changes + if err != nil { + return err + } + + } else if obj.kind != kind { + return fmt.Errorf("can't change kind, previously: `%s`", obj.kind) + } + + continue // we get values on the watch chan, not here! + + case err, ok := <-obj.watchChan: + if !ok { // closed + // XXX: if we close, perhaps the engine is + // switching etcd hosts and we should retry? + // maybe instead we should get an "etcd + // reconnect" signal, and the lang will restart? + return nil + } + if err != nil { + return errwrap.Wrapf(err, "channel watch failed on `%s`", obj.kind) + } + + result, err := obj.Call(ctx, obj.args) // get the value... + if err != nil { + return err + } + + // if the result is still the same, skip sending an update... + if obj.result != nil && result.Cmp(obj.result) == nil { + continue // result didn't change + } + obj.result = result // store new result + + case <-ctx.Done(): + return nil + } + + select { + case obj.init.Output <- obj.result: // send + // pass + case <-ctx.Done(): + return nil + } + } +} + +// Call this function with the input args and return the value if it is possible +// to do so at this time. This was previously getValue which gets the value +// we're looking for. +func (obj *CollectFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) { + kind := args[0].Str() + if kind == "" { + return nil, fmt.Errorf("resource kind is empty") + } + if !engine.IsKind(kind) { + return nil, fmt.Errorf("invalid resource kind: %s", kind) + } + + filters := []*engine.ResFilter{} + + arg := args[1] + typ := arg.Type() + // Can be one of: str, []str, []struct{name str; host str} for matching. + + if typ.Cmp(types.TypeStr) == nil { // it must be a name only + filter := &engine.ResFilter{ + Kind: kind, + Name: arg.Str(), + Host: "", // any + } + filters = append(filters, filter) + } + if typ.Cmp(types.TypeListStr) == nil { + for _, x := range arg.List() { + filter := &engine.ResFilter{ + Kind: kind, + Name: x.Str(), + Host: "", // any + } + filters = append(filters, filter) + } + } + if typ.Cmp(types.NewType(collectFuncInType)) == nil { + for _, x := range arg.List() { + st, ok := x.(*types.StructValue) + if !ok { + // programming error + return nil, fmt.Errorf("value is not a struct") + } + name, exists := st.Lookup(collectFuncInFieldName) + if !exists { + // programming error? + return nil, fmt.Errorf("name field is missing") + } + host, exists := st.Lookup(collectFuncInFieldHost) + if !exists { + // programming error? + return nil, fmt.Errorf("host field is missing") + } + + filter := &engine.ResFilter{ + Kind: kind, + Name: name.Str(), + Host: host.Str(), + } + filters = append(filters, filter) + } + } + + resOutput, err := obj.init.World.ResCollect(ctx, filters) + if err != nil { + return nil, err + } + + list := types.NewList(obj.Info().Sig.Out) // collectFuncOutType + for _, x := range resOutput { + // programming error if any of these error... + if x.Kind != kind { + return nil, fmt.Errorf("unexpected kind: %s", x.Kind) + } + if x.Name == "" { + return nil, fmt.Errorf("unexpected empty name") + } + if x.Host == "" { + return nil, fmt.Errorf("unexpected empty host") + } + if x.Data == "" { + return nil, fmt.Errorf("unexpected empty data") + } + + name := &types.StrValue{V: x.Name} + host := &types.StrValue{V: x.Host} // from + data := &types.StrValue{V: x.Data} + + st := types.NewStruct(types.NewType(collectFuncOutStruct)) + if err := st.Set(collectFuncOutFieldName, name); err != nil { + return nil, errwrap.Wrapf(err, "struct could not add field `%s`, val: `%s`", collectFuncOutFieldName, name) + } + if err := st.Set(collectFuncOutFieldHost, host); err != nil { + return nil, errwrap.Wrapf(err, "struct could not add field `%s`, val: `%s`", collectFuncOutFieldHost, host) + } + if err := st.Set(collectFuncOutFieldData, data); err != nil { + return nil, errwrap.Wrapf(err, "struct could not add field `%s`, val: `%s`", collectFuncOutFieldData, data) + } + + if err := list.Add(st); err != nil { // XXX: improve perf of Add + return nil, err + } + } + + return list, nil // put struct into interface type +} diff --git a/lang/funcs/funcs.go b/lang/funcs/funcs.go index 172fde7e..e5960a2b 100644 --- a/lang/funcs/funcs.go +++ b/lang/funcs/funcs.go @@ -90,6 +90,33 @@ const ( // as. This starts with an underscore so that it cannot be used from the // lexer. StructLookupOptionalFuncName = "_struct_lookup_optional" + + // CollectFuncName is the name this function is registered as. This + // starts with an underscore so that it cannot be used from the lexer. + CollectFuncName = "_collect" + + // CollectFuncInFieldName is the name of the name field in the struct. + CollectFuncInFieldName = "name" + // CollectFuncInFieldHost is the name of the host field in the struct. + CollectFuncInFieldHost = "host" + + // CollectFuncInType is the most complex of the three possible input + // types. The other two possible ones are str or []str. + CollectFuncInType = "[]struct{" + CollectFuncInFieldName + " str; " + CollectFuncInFieldHost + " str}" + + // CollectFuncOutFieldName is the name of the name field in the struct. + CollectFuncOutFieldName = "name" + // CollectFuncOutFieldHost is the name of the host field in the struct. + CollectFuncOutFieldHost = "host" + // CollectFuncOutFieldData is the name of the data field in the struct. + CollectFuncOutFieldData = "data" + + // CollectFuncOutStruct is the struct type that we return a list of. + CollectFuncOutStruct = "struct{" + CollectFuncOutFieldName + " str; " + CollectFuncOutFieldHost + " str; " + CollectFuncOutFieldData + " str}" + + // CollectFuncOutType is the expected return type, the data field is an + // encoded resource blob. + CollectFuncOutType = "[]" + CollectFuncOutStruct ) // registeredFuncs is a global map of all possible funcs which can be used. You diff --git a/lang/interpret/interpret.go b/lang/interpret/interpret.go index d233f0a1..d65181dd 100644 --- a/lang/interpret/interpret.go +++ b/lang/interpret/interpret.go @@ -42,11 +42,83 @@ import ( "github.com/purpleidea/mgmt/util/errwrap" ) +// Interpreter is a base struct for handling the Interpret operation. There is +// nothing stateful here, you don't need to preserve this between runs. +type Interpreter struct { + // Debug represents if we're running in debug mode or not. + Debug bool + + // Logf is a logger which should be used. + Logf func(format string, v ...interface{}) + + // lookup stores the resources found by kind and name. It doesn't store + // any resources which are hidden since those could have duplicates. + // format: map[kind]map[name]Res + lookup map[engine.ResPtrUID]engine.Res + + // lookupHidden stores the hidden resources found by kind and name. It + // doesn't store any normal resources which are not hidden. + // format formerly: map[kind]map[name]Res + lookupHidden map[engine.ResPtrUID][]engine.Res + + // receive doesn't need a special extension for hidden resources since + // they can't send, only recv, and senders can't have incompatible dupes + // format formerly: map[kind]map[name]map[field]*Send + receive map[engine.ResPtrUID]map[string]*engine.Send + + // export tracks the unique combinations we export. (kind, name, host) + export map[engine.ResDelete]struct{} +} + // Interpret runs the program and outputs a generated resource graph. It // requires an AST, and the table of values required to populate that AST. Type // unification, and earlier steps should obviously be run first so that you can // actually get a useful resource graph out of this instead of an error! -func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgraph.Graph, error) { +func (obj *Interpreter) Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgraph.Graph, error) { + + // build the kind,name -> res mapping + obj.lookup = make(map[engine.ResPtrUID]engine.Res) + obj.lookupHidden = make(map[engine.ResPtrUID][]engine.Res) + // build the send/recv mapping + obj.receive = make(map[engine.ResPtrUID]map[string]*engine.Send) + // build the exports + obj.export = make(map[engine.ResDelete]struct{}) + + // Remember that if a resource is "Hidden", then make sure it is NOT + // sending to anyone, since it would never produce a value. It can + // receive values, since those might be used during export. + // + // Remember that if a resource is "Hidden", then it may exist alongside + // another resource with the same kind+name without triggering the + // "inequivalent duplicate resource" style of errors. Of course multiple + // hidden resources with the same kind+name may also exist + // simultaneously, just keep in mind that it means that an edge pointing + // to a particular kind+name now actually may point to more than one! + // + // This is needed because of two reasons: (1) because a regular resource + // will likely never be compatible with a "Hidden" and "Exported" + // resource because one resource might have the Meta:hidden and + // Meta:export params and one might not; (2) because you may wish to + // have two different hidden resources of different params which export + // to different hosts, which means they would likely not be compatible. + // + // Since we can have more than one "Hidden" and "Exported" resource with + // the same name and kind, it's important that we don't export that data + // to the same (kind, name, host) location since we'd have multiple + // writers to the same key in our World store. We could consider + // checking for compatibility, but that's more difficult to achieve. The + // "any" host is treated as a special key, which punts this duplicate + // problem to being a collection problem. (Which could happen with two + // different hosts each exporting a different value to a single host.) + // + // Remember that the resource graph that this function returns, may now + // contain two or more identically named kind+name resources, if at + // least one of them is "Hidden". If they are entirely identical, then + // it's acceptable to merge them. They may _not_ be merged with the + // CompatibleRes API, since on resource "collection" a param may be + // changed which could conceivably be incompatible with how we ran the + // AdaptCmp API when we merged them. + output, err := ast.Output(table) // contains resList, edgeList, etc... if err != nil { return nil, err @@ -57,22 +129,53 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr return nil, errwrap.Wrapf(err, "could not create new graph") } - var lookup = make(map[string]map[string]engine.Res) // map[kind]map[name]Res - // build the send/recv mapping; format: map[kind]map[name]map[field]*Send - var receive = make(map[string]map[string]map[string]*engine.Send) - for _, res := range output.Resources { kind := res.Kind() name := res.Name() - if _, exists := lookup[kind]; !exists { - lookup[kind] = make(map[string]engine.Res) - receive[kind] = make(map[string]map[string]*engine.Send) - } - if _, exists := receive[kind][name]; !exists { - receive[kind][name] = make(map[string]*engine.Send) + meta := res.MetaParams() + ruid := engine.ResPtrUID{ + Kind: kind, + Name: name, } - if r, exists := lookup[kind][name]; exists { // found same name + for _, host := range meta.Export { + uid := engine.ResDelete{ + Kind: kind, + Name: name, + Host: host, + } + if _, exists := obj.export[uid]; exists { + return nil, fmt.Errorf("duplicate export: %s to %s", res, host) + } + obj.export[uid] = struct{}{} + } + + if meta.Hidden { + rs := obj.lookupHidden[ruid] + if len(rs) > 0 { + // We only need to check against the last added + // resource since this should be commutative, + // and as we add more they check themselves in. + r := rs[len(rs)-1] + + // XXX: If we want to check against the regular + // resources in obj.lookup, then do it here. + + // If they're different, then we deduplicate. + if err := engine.ResCmp(r, res); err == nil { + continue + } + } + + // add to temporary lookup table + obj.lookupHidden[ruid] = append(obj.lookupHidden[ruid], res) + continue + } + + if r, exists := obj.lookup[ruid]; exists { // found same name + // XXX: If we want to check against the special hidden + // resources in obj.lookupHidden, then do it here. + // if the resources support the compatibility API, then // we can attempt to merge them intelligently... r1, ok1 := r.(engine.CompatibleRes) @@ -87,7 +190,7 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr return nil, errwrap.Wrapf(err, "could not merge duplicate resources") } - lookup[kind][name] = merged + obj.lookup[ruid] = merged // they match here, we don't need to test below! continue } @@ -104,79 +207,54 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr // currently we add the first one that was found... continue } - lookup[kind][name] = res // add to temporary lookup table + obj.lookup[ruid] = res // add to temporary lookup table //graph.AddVertex(res) // do this below once this table is final } // ensure all the vertices exist... - for _, m := range lookup { - for _, res := range m { + for _, res := range obj.lookup { + graph.AddVertex(res) + } + for _, rs := range obj.lookupHidden { + for _, res := range rs { graph.AddVertex(res) } } - for _, e := range output.Edges { - var v1, v2 engine.Res - var exists bool - var m map[string]engine.Res - var notify = e.Notify - - if m, exists = lookup[e.Kind1]; exists { - v1, exists = m[e.Name1] + for _, edge := range output.Edges { + v1s := obj.lookupAll(edge.Kind1, edge.Name1) + if len(v1s) == 0 { + return nil, fmt.Errorf("edge cannot find resource kind: %s named: `%s`", edge.Kind1, edge.Name1) } - if !exists { - return nil, fmt.Errorf("edge cannot find resource kind: %s named: `%s`", e.Kind1, e.Name1) - } - if m, exists = lookup[e.Kind2]; exists { - v2, exists = m[e.Name2] - } - if !exists { - return nil, fmt.Errorf("edge cannot find resource kind: %s named: `%s`", e.Kind2, e.Name2) + v2s := obj.lookupAll(edge.Kind2, edge.Name2) + if len(v2s) == 0 { + return nil, fmt.Errorf("edge cannot find resource kind: %s named: `%s`", edge.Kind2, edge.Name2) } - if existingEdge := graph.FindEdge(v1, v2); existingEdge != nil { - // collate previous Notify signals to this edge with OR - notify = notify || (existingEdge.(*engine.Edge)).Notify - } - - edge := &engine.Edge{ - Name: fmt.Sprintf("%s -> %s", v1, v2), - Notify: notify, - } - graph.AddEdge(v1, v2, edge) // identical duplicates are ignored - - // send recv - if (e.Send == "") != (e.Recv == "") { // xor - return nil, fmt.Errorf("you must specify both send/recv fields or neither") - } - if e.Send == "" || e.Recv == "" { // is there send/recv to do or not? - continue - } - - // check for pre-existing send/recv at this key - if existingSend, exists := receive[e.Kind2][e.Name2][e.Recv]; exists { - // ignore identical duplicates - // TODO: does this safe ignore work with duplicate compatible resources? - if existingSend.Res != v1 || existingSend.Key != e.Send { - return nil, fmt.Errorf("resource: `%s` has duplicate receive on: `%s` param", engine.Repr(e.Kind2, e.Name2), e.Recv) + // Make edges pair wise between each two. Normally these loops + // only have one iteration each unless we have Hidden resources. + for _, v1 := range v1s { + for _, v2 := range v2s { + e := obj.makeEdge(graph, v1, v2, edge) + graph.AddEdge(v1, v2, e) // identical duplicates are ignored } } - res1, ok := v1.(engine.SendableRes) - if !ok { - return nil, fmt.Errorf("cannot send from resource: %s", engine.Stringer(v1)) + // send recv + if (edge.Send == "") != (edge.Recv == "") { // xor + return nil, fmt.Errorf("you must specify both send/recv fields or neither") } - res2, ok := v2.(engine.RecvableRes) - if !ok { - return nil, fmt.Errorf("cannot recv to resource: %s", engine.Stringer(v2)) + if edge.Send == "" || edge.Recv == "" { // is there send/recv to do or not? + continue } - if err := engineUtil.StructFieldCompat(res1.Sends(), e.Send, res2, e.Recv); err != nil { - return nil, errwrap.Wrapf(err, "cannot send/recv from %s.%s to %s.%s", engine.Stringer(v1), e.Send, engine.Stringer(v2), e.Recv) + for _, v1 := range v1s { + for _, v2 := range v2s { + if err := obj.makeSendRecv(v1, v2, edge); err != nil { + return nil, err + } + } } - - // store mapping for later - receive[e.Kind2][e.Name2][e.Recv] = &engine.Send{Res: res1, Key: e.Send} } // we need to first build up a map of all the resources handles, because @@ -186,12 +264,23 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr // pre-existing mappings, so we can now set them all at once at the end! // TODO: do this in a deterministic order - for kind, x := range receive { - for name, recv := range x { - if len(recv) == 0 { // skip empty maps from allocation! - continue + for st, recv := range obj.receive { + kind := st.Kind + name := st.Name + + if len(recv) == 0 { // skip empty maps from allocation! + continue + } + if r := obj.lookupRes(kind, name); r != nil { + res, ok := r.(engine.RecvableRes) + if !ok { + return nil, fmt.Errorf("cannot recv to resource: %s", engine.Repr(kind, name)) } - r := lookup[kind][name] + res.SetRecv(recv) // set it! + } + + // hidden + for _, r := range obj.lookupHiddenRes(kind, name) { res, ok := r.(engine.RecvableRes) if !ok { return nil, fmt.Errorf("cannot recv to resource: %s", engine.Repr(kind, name)) @@ -208,3 +297,104 @@ func Interpret(ast interfaces.Stmt, table map[interfaces.Func]types.Value) (*pgr return graph, nil } + +// lookupRes is a simple helper function. Returns nil if not found. +func (obj *Interpreter) lookupRes(kind, name string) engine.Res { + ruid := engine.ResPtrUID{ + Kind: kind, + Name: name, + } + res, exists := obj.lookup[ruid] + if !exists { + return nil + } + + return res +} + +// lookupHiddenRes is a simple helper function. Returns any found. +func (obj *Interpreter) lookupHiddenRes(kind, name string) []engine.Res { + ruid := engine.ResPtrUID{ + Kind: kind, + Name: name, + } + res, exists := obj.lookupHidden[ruid] + if !exists { + return nil + } + + return res +} + +// lookupAll is a simple helper function. Returns any found. +func (obj *Interpreter) lookupAll(kind, name string) []pgraph.Vertex { + vs := []pgraph.Vertex{} + + if r := obj.lookupRes(kind, name); r != nil { + vs = append(vs, r) + } + + for _, r := range obj.lookupHiddenRes(kind, name) { + vs = append(vs, r) + } + + return vs +} + +// makeEdge is a simple helper function. +func (obj *Interpreter) makeEdge(graph *pgraph.Graph, v1, v2 pgraph.Vertex, edge *interfaces.Edge) *engine.Edge { + var notify = edge.Notify + + if existingEdge := graph.FindEdge(v1, v2); existingEdge != nil { + // collate previous Notify signals to this edge with OR + notify = notify || (existingEdge.(*engine.Edge)).Notify + } + + return &engine.Edge{ + Name: fmt.Sprintf("%s -> %s", v1, v2), + Notify: notify, + } +} + +// makeSendRecv is a simple helper function. +func (obj *Interpreter) makeSendRecv(v1, v2 pgraph.Vertex, edge *interfaces.Edge) error { + ruid := engine.ResPtrUID{ + Kind: edge.Kind2, + Name: edge.Name2, + } + + if _, exists := obj.receive[ruid]; !exists { + obj.receive[ruid] = make(map[string]*engine.Send) + } + + // check for pre-existing send/recv at this key + if existingSend, exists := obj.receive[ruid][edge.Recv]; exists { + // ignore identical duplicates + // TODO: does this safe ignore work with duplicate compatible resources? + if existingSend.Res != v1 || existingSend.Key != edge.Send { + return fmt.Errorf("resource: `%s` has duplicate receive on: `%s` param", engine.Repr(edge.Kind2, edge.Name2), edge.Recv) + } + } + + if res, ok := v1.(engine.Res); ok && res.MetaParams().Hidden && edge.Send != "" { + return fmt.Errorf("cannot send from hidden resource: %s", engine.Stringer(res)) + } + + res1, ok := v1.(engine.SendableRes) + if !ok { + return fmt.Errorf("cannot send from resource: %s", engine.Stringer(res1)) + } + res2, ok := v2.(engine.RecvableRes) + if !ok { + return fmt.Errorf("cannot recv to resource: %s", engine.Stringer(res2)) + } + + if err := engineUtil.StructFieldCompat(res1.Sends(), edge.Send, res2, edge.Recv); err != nil { + return errwrap.Wrapf(err, "cannot send/recv from %s.%s to %s.%s", engine.Stringer(res1), edge.Send, engine.Stringer(res2), edge.Recv) + } + + // store mapping for later + obj.receive[ruid][edge.Recv] = &engine.Send{Res: res1, Key: edge.Send} + + return nil +} diff --git a/lang/interpret_test.go b/lang/interpret_test.go index aead36f4..fee80a38 100644 --- a/lang/interpret_test.go +++ b/lang/interpret_test.go @@ -1415,7 +1415,14 @@ func TestAstFunc2(t *testing.T) { // run interpret! table := funcs.Table() // map[interfaces.Func]types.Value - ograph, err := interpret.Interpret(iast, table) + interpreter := &interpret.Interpreter{ + Debug: testing.Verbose(), // set via the -test.v flag to `go test` + Logf: func(format string, v ...interface{}) { + logf("interpret: "+format, v...) + }, + } + + ograph, err := interpreter.Interpret(iast, table) if (!fail || !failInterpret) && err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: interpret failed with: %+v", index, err) @@ -2267,7 +2274,14 @@ func TestAstFunc3(t *testing.T) { // run interpret! table := funcs.Table() // map[interfaces.Func]types.Value - ograph, err := interpret.Interpret(iast, table) + interpreter := &interpret.Interpreter{ + Debug: testing.Verbose(), // set via the -test.v flag to `go test` + Logf: func(format string, v ...interface{}) { + logf("interpret: "+format, v...) + }, + } + + ograph, err := interpreter.Interpret(iast, table) if (!fail || !failInterpret) && err != nil { t.Errorf("test #%d: FAIL", index) t.Errorf("test #%d: interpret failed with: %+v", index, err) diff --git a/lang/interpret_test/TestAstFunc1/resdupefields0.txtar b/lang/interpret_test/TestAstFunc1/resdupefields0.txtar index f6d136ed..8a86f01f 100644 --- a/lang/interpret_test/TestAstFunc1/resdupefields0.txtar +++ b/lang/interpret_test/TestAstFunc1/resdupefields0.txtar @@ -15,6 +15,8 @@ test "test" { rewatch => false, realize => true, dollar => false, + hidden => false, + export => ["hostname",], reverse => true, autoedge => true, autogroup => true, diff --git a/lang/interpret_test/TestAstFunc1/resdupefields5.txtar b/lang/interpret_test/TestAstFunc1/resdupefields5.txtar index 274ccec8..6a8f980c 100644 --- a/lang/interpret_test/TestAstFunc1/resdupefields5.txtar +++ b/lang/interpret_test/TestAstFunc1/resdupefields5.txtar @@ -15,6 +15,8 @@ test "test" { rewatch => false, realize => true, dollar => false, + hidden => false, + export => ["hostname",], reverse => true, autoedge => true, autogroup => true, @@ -25,25 +27,30 @@ test "test" { #}, } -- OUTPUT -- -Edge: composite: []str -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # sema -Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # dollar -Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # noop -Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # reset -Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # retryreset -Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # rewatch -Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # autoedge -Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # autogroup -Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # realize -Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # reverse -Edge: const: float(4.2) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # limit -Edge: const: int(-1) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # retry -Edge: const: int(0) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # delay -Edge: const: int(3) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # burst -Edge: const: int(5) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} # poll +Edge: composite: []str -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # export +Edge: composite: []str -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # sema +Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # dollar +Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # hidden +Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # noop +Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # reset +Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # retryreset +Edge: const: bool(false) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # rewatch +Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # autoedge +Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # autogroup +Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # realize +Edge: const: bool(true) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # reverse +Edge: const: float(4.2) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # limit +Edge: const: int(-1) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # retry +Edge: const: int(0) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # delay +Edge: const: int(3) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # burst +Edge: const: int(5) -> composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} # poll Edge: const: str("bar:3") -> composite: []str # 1 Edge: const: str("foo:1") -> composite: []str # 0 +Edge: const: str("hostname") -> composite: []str # 0 Vertex: composite: []str -Vertex: composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; reverse bool; autoedge bool; autogroup bool} +Vertex: composite: []str +Vertex: composite: struct{noop bool; retry int; retryreset bool; delay int; poll int; limit float; burst int; reset bool; sema []str; rewatch bool; realize bool; dollar bool; hidden bool; export []str; reverse bool; autoedge bool; autogroup bool} +Vertex: const: bool(false) Vertex: const: bool(false) Vertex: const: bool(false) Vertex: const: bool(false) @@ -61,5 +68,6 @@ Vertex: const: int(3) Vertex: const: int(5) Vertex: const: str("bar:3") Vertex: const: str("foo:1") +Vertex: const: str("hostname") Vertex: const: str("test") Vertex: const: str("test") diff --git a/lang/lang.go b/lang/lang.go index 38c595de..0baa9caa 100644 --- a/lang/lang.go +++ b/lang/lang.go @@ -438,8 +438,16 @@ func (obj *Lang) Interpret() (*pgraph.Graph, error) { obj.Logf("running interpret...") table := obj.funcs.Table() // map[pgraph.Vertex]types.Value + interpreter := &interpret.Interpreter{ + Debug: obj.Debug, + Logf: func(format string, v ...interface{}) { + // TODO: is this a sane prefix to use here? + obj.Logf("interpret: "+format, v...) + }, + } + // this call returns the graph - graph, err := interpret.Interpret(obj.ast, table) + graph, err := interpreter.Interpret(obj.ast, table) if err != nil { return nil, errwrap.Wrapf(err, "could not interpret") } diff --git a/lang/parser/lexer.nex b/lang/parser/lexer.nex index 015e5a16..6dfa4651 100644 --- a/lang/parser/lexer.nex +++ b/lang/parser/lexer.nex @@ -261,6 +261,11 @@ lval.str = yylex.Text() return PANIC_IDENTIFIER } +/collect/ { + yylex.pos(lval) // our pos + lval.str = yylex.Text() + return COLLECT_IDENTIFIER + } /"(\\.|[^"])*"/ { // This matches any number of the bracketed patterns // that are surrounded by the two quotes on each side. diff --git a/lang/parser/parser.y b/lang/parser/parser.y index d3fb6b01..2b34bc36 100644 --- a/lang/parser/parser.y +++ b/lang/parser/parser.y @@ -104,6 +104,7 @@ func init() { %token CLASS_IDENTIFIER INCLUDE_IDENTIFIER %token IMPORT_IDENTIFIER AS_IDENTIFIER %token COMMENT ERROR +%token COLLECT_IDENTIFIER %token PANIC_IDENTIFIER // precedence table @@ -188,6 +189,11 @@ stmt: $$.stmt = $1.stmt locate(yylex, $1, yyDollar[len(yyDollar)-1], $$.stmt) } +| collect + { + $$.stmt = $1.stmt + locate(yylex, $1, yyDollar[len(yyDollar)-1], $$.stmt) + } | resource { $$.stmt = $1.stmt @@ -1026,6 +1032,46 @@ panic: locate(yylex, $1, yyDollar[len(yyDollar)-1], $$.stmt) } ; +collect: + // `collect file "/tmp/hello" { ... }` + // `collect file ["/tmp/hello", ...,] { ... }` + // `collect file [struct{name => "/tmp/hello", host => "foo",}, ...,] { ... }` + COLLECT_IDENTIFIER resource + { + // A "collect" stmt is exactly a regular "res" statement, except + // it has the boolean "Collect" field set to true, and it also + // has a special "resource body" entry which accepts the special + // collected data from the function graph. + $$.stmt = $2.stmt // it's us now + kind := $2.stmt.(*ast.StmtRes).Kind + res := $$.stmt.(*ast.StmtRes) + res.Collect = true + // We are secretly adding a special field to the res contents, + // which receives all of the exported data so that we have it + // arrive in our function graph in the standard way. We'd need + // to have this data to be able to build the resources we want! + call := &ast.ExprCall{ + // function name to lookup special values from that kind + Name: funcs.CollectFuncName, + Args: []interfaces.Expr{ + &ast.ExprStr{ // magic operator first + V: kind, // tell it what we're reading + }, + // names to collect + // XXX: Can we copy the same AST nodes to here? + // XXX: Do I need to run .Copy() on them ? + // str, []str, or []struct{name str; host str} + res.Name, // expr (hopefully one of those types) + }, + } + collect := &ast.StmtResCollect{ // special field + Kind: kind, // might as well tell it directly + Value: call, + } + res.Contents = append(res.Contents, collect) + locate(yylex, $1, yyDollar[len(yyDollar)-1], $$.stmt) + } +; /* TODO: do we want to include this? // resource bind rbind: diff --git a/lib/main.go b/lib/main.go index db4d3907..a0da2358 100644 --- a/lib/main.go +++ b/lib/main.go @@ -1014,6 +1014,22 @@ func (obj *Main) Run() error { continue // stay paused } + // XXX: Should we do this right before Commit? + // Don't obj.ge.Apply(...), that works on the old graph! + timing = time.Now() + //if !obj.ge.IsClosing() { // XXX: do we need to do this? + // skip prune when we're closing + //} + // FIXME: is this the right ctx? + if err := obj.ge.Exporter.Prune(exitCtx, obj.ge.Graph()); err != nil { + // XXX: This should just cause a permanent error + // here which turns into a shutdown. Refactor! + obj.ge.Abort() // delete graph + Logf("error running the exporter Prune: %+v", err) + continue + } + Logf("export cleanup took: %s", time.Since(timing)) + // Start needs to be synchronous because we don't want // to loop around and cause a pause before we unpaused. // Commit already starts things, but we still need to diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 32317f16..2285ab97 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -244,6 +244,7 @@ func (g *Graph) AddEdge(v1, v2 Vertex, e Edge) { g.AddVertex(v1, v2) // supports adding N vertices now // TODO: check if an edge exists to avoid overwriting it! // NOTE: VertexMerge() depends on overwriting it at the moment... + // NOTE: Interpret() depends on overwriting it at the moment... g.adjacency[v1][v2] = e }