mgmt/lang/funcs/dage/dage.go
James Shubin 790b7199ca lang: New function engine
This mega patch primarily introduces a new function engine. The main
reasons for this new engine are:

1) Massively improved performance with lock-contended graphs.

Certain large function graphs could have very high lock contention, which
turned out to be much slower than I would have liked. The new algorithm
happens to be basically lock-free, which is another helpful improvement.

2) Glitch-free function graphs.

The function graphs could "glitch" (an FRP term), which is undesirable in
theory. In practice this was never really an issue, and while I haven't
explicitly guaranteed that the new graphs are provably glitch-free, things
are a lot more consistent now.

3) Simpler graph shape.

The new graphs don't require the private channels. This makes
understanding the graphs a lot easier.

4) Branched graphs only run half.

Previously we would run both pure sides of an if statement. While this was
mostly meant as an early experiment, it stayed in for far too long, and now
was the right time to remove it. This also means our graphs are much smaller
and more efficient.

Note that this changed the function API slightly. Everything has been
ported. It's possible that we'll introduce a new API in the future, but that
is not expected to cause the removal of the two current APIs.

In addition, we finally split out the "schedule" aspect from
world.schedule(). The "pick me" aspects now happen in a separate
resource, rather than as a yucky side-effect in the function. This also
lets us more precisely choose when we're scheduled, and we can observe
without being chosen too.

As usual, many thanks to Sam for helping work through some of the
algorithmic graph shape issues!
2025-09-11 23:19:45 -04:00


// Mgmt
// Copyright (C) James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.

// Package dage implements a DAG function engine.
// TODO: can we rename this to something more interesting?
package dage

import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/local"
"github.com/purpleidea/mgmt/lang/funcs/ref"
"github.com/purpleidea/mgmt/lang/funcs/txn"
"github.com/purpleidea/mgmt/lang/interfaces"
"github.com/purpleidea/mgmt/lang/types"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/util"
"github.com/purpleidea/mgmt/util/errwrap"
)

// Engine implements a dag engine which lets us "run" a dag of functions, but
// also allows us to modify it while we are running. The functions we support
// can take one of two forms (with a third possible future form).
//
// 1) A function that supports the normal interfaces.Func API. It has Call() in
// particular. Most functions are done with only this API.
//
// 2) A function which adds Stream() to support the StreamableFunc API. Some
// functions use this along with Event() to notify of a new value.
//
// 3) A third *possible* (but not currently implemented) API would be one that
// has Stream() but takes an Input and Output channel of values instead. This is
// similar to what we previously had. Of note, every input must generate one
// output value. (And more spontaneous output values are allowed as well.)
//
// Of note, functions that support Call() can also cause an interrupt to
// happen. It's not clear if this third option would be allowed to have a
// Call() method or not, and if it would send interrupts on the Output channel.
//
// Of additional note, some functions also require the "ShapelyFunc" API to work
// correctly. Use of this is rare.
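//
// For example, here is a minimal sketch (an illustrative assumption, not code
// from this project) of what a StreamableFunc's Stream() loop might look
// like, using a hypothetical ticker field as the value source:
//
//	func (obj *tickerFunc) Stream(ctx context.Context) error {
//		for {
//			select {
//			case <-obj.ticker.C: // a new value is ready
//				// Notify the engine. It will Call() us on its
//				// next descent through the graph.
//				if err := obj.init.Event(ctx); err != nil {
//					return err // engine is shutting down
//				}
//			case <-ctx.Done():
//				return nil
//			}
//		}
//	}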
//
// XXX: If this engine continuously receives function events at a higher speed
// than it can process, then it will bog down and consume memory infinitely. We
// should consider adding some sort of warning or error if we get to a certain
// size.
//
// XXX: It's likely that this engine could be made even more efficient by more
// cleverly traversing through the graph. Instead of a topological sort, we
// could have some fancy map that determines what's remaining to go through, so
// that when we "interrupt" we don't needlessly revisit nodes repeatedly and
// trigger the "epoch skip" situations. We could also do an incremental
// toposort so that we don't re-run the whole algorithm from scratch over and
// over when we're really just computing changes.
//
// XXX: We could consider grouping multiple incoming events into a single
// descent into the DAG. It's not clear if this kind of de-duplication would
// break some "glitch-free" aspects or not. It would probably improve
// performance but we'd have to be careful about how we did it.
//
// XXX: Respect the info().Pure and info().Memo fields somewhere...
type Engine struct {
// Name is the name used for the instance of the engine and in the graph
// that is held within it.
Name string
Hostname string
Local *local.API
World engine.World
Debug bool
Logf func(format string, v ...interface{})
// graph is the internal graph. It is only changed during interrupt.
graph *pgraph.Graph
// refCount keeps track of vertex and edge references across the entire
// graph.
refCount *ref.Count
// state stores some per-vertex (function) state
state map[interfaces.Func]*state
// wg counts every concurrent process here.
wg *sync.WaitGroup
// ag is the aggregation channel, which receives events from any of the
// StreamableFunc's that are running.
// XXX: add a mechanism to detect if it gets too full
ag *util.InfiniteChan[*state]
//ag chan *state
// cancel can be called to shut down Run() after it has started, of course.
cancel func()
// streamChan is used to send the stream of tables to the outside world.
streamChan chan interfaces.Table
// interrupt specifies that a txn "commit" just happened.
interrupt bool
// topoSort is the last topological sort we ran.
topoSort []pgraph.Vertex
// ops is a list of operations to run during interrupt. This is usually
// a delete vertex, but others are possible.
ops []ops
// err contains the last error after a shutdown occurs.
err error
errMutex *sync.Mutex // guards err
// graphvizCount keeps a running tally of how many graphs we've
// generated. This is useful for displaying a sequence (timeline) of
// graphs in a linear order.
graphvizCount int64
// graphvizDirectory stores the generated path for outputting graphviz
// files if one is not specified at runtime.
graphvizDirectory string
}

// Setup sets up the internal data structures needed for this engine. We use
// this earlier step before Run() because it's usually not called concurrently,
// which makes it easier to catch the obvious errors before Run() runs in a
// goroutine.
func (obj *Engine) Setup() error {
var err error
obj.graph, err = pgraph.NewGraph(obj.Name)
if err != nil {
return err
}
obj.state = make(map[interfaces.Func]*state)
obj.refCount = (&ref.Count{}).Init()
obj.wg = &sync.WaitGroup{}
obj.errMutex = &sync.Mutex{}
//obj.ag = make(chan *state, 1) // for group events
//obj.ag = make(chan *state) // normal no buffer, we can't drop any
obj.ag = util.NewInfiniteChan[*state]() // lock-free but unbounded
obj.streamChan = make(chan interfaces.Table)
obj.ops = []ops{}
return nil
}

// Run kicks off the function engine. You must add the initial graph via Txn
// *before* you run this function.
// XXX: try and fix the engine so you can run either Txn or Run first.
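//
// A minimal usage sketch (illustrative assumptions; the exact interfaces.Txn
// methods are not shown in this file):
//
//	engine := &Engine{Name: "example", Logf: log.Printf}
//	if err := engine.Setup(); err != nil {
//		return err
//	}
//	txn := engine.Txn()
//	// ... add the initial function graph here, then commit it ...
//	if err := txn.Commit(); err != nil {
//		return err
//	}
//	go func() {
//		for table := range engine.Stream() {
//			_ = table // consume promptly so we don't stall the engine
//		}
//	}()
//	return engine.Run(ctx) // blocks until shutdown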
func (obj *Engine) Run(ctx context.Context) error {
if obj.refCount == nil { // any arbitrary flag would be fine here
return fmt.Errorf("you must run Setup before first use")
}
//obj.wg = &sync.WaitGroup{} // in Setup
defer obj.wg.Wait()
// cancel to allow someone to shut everything down...
ctx, cancel := context.WithCancel(ctx)
defer cancel()
obj.cancel = cancel
//obj.streamChan = make(chan Table) // in Setup
defer close(obj.streamChan)
err := obj.process(ctx, 1) // start like this for now
obj.errAppend(err)
return err
}

// process could be combined with Run, but it is left separate in case we try to
// build a recursive process operation that runs on a subgraph. It would need an
// incoming graph argument as well, I would expect.
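//
// In short, each pass through the big loop below: (1) services any pending
// interrupt by running the queued add/delete ops and re-running the
// topological sort, (2) waits for an event from some function, and then (3)
// walks the topological sort from that function onwards, Call()ing every
// function whose inputs have all reached the current epoch, and finally (4)
// sends a completed table downstream and increments the epoch.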
func (obj *Engine) process(ctx context.Context, epoch int64) error {
mapping := make(map[pgraph.Vertex]int)
start := 0
table := make(interfaces.Table) // map[interfaces.Func]types.Value
Start:
for {
// If it's our first time, we want to interrupt, because we may
// not ever get any events otherwise, and we'd block at select
// waiting on obj.ag forever. Remember that the effect() of Txn
// causes an interrupt when we add the first graph in. This
// means we need to do the initial Txn before we startup here!
if obj.interrupt {
if obj.Debug {
obj.Logf("interrupt!")
}
// Handle delete (and other ops) first. We keep checking
// until this is empty, because a Cleanup operation
// running inside this loop could cause more vertices
// to be added, and so on.
for len(obj.ops) > 0 {
op := obj.ops[0] // run in same order added
obj.ops = obj.ops[1:] // queue
// adds are new vertices which join the graph
if add, ok := op.(*addVertex); ok {
table[add.f] = nil // for symmetry
if err := add.fn(ctx); err != nil { // Init!
return err
}
continue
}
// deletes are the list of Func's (vertices)
// that were deleted in a txn.
if del, ok := op.(*deleteVertex); ok {
delete(table, del.f) // cleanup the table
if err := del.fn(ctx); err != nil { // Cleanup!
return err
}
continue
}
}
// Interrupt should only happen if we changed the graph
// shape, so recompute the topological sort right here.
// XXX: Can we efficiently edit the old topoSort by
// knowing the add/del? If the graph is shrinking, just
// remove those vertices from our current toposort. If
// the graph is growing, can we topo sort the subset and
// put them at the beginning of our old toposort? Is it
// guaranteed that "spawned nodes" will have earlier
// precedence than existing stuff? Can we be clever? Can
// we "float" anything upwards that's needed for the
// "toposort" by seeing what we're connected to, and
// sort all of that?
var err error
obj.topoSort, err = obj.graph.TopologicalSort()
if err != nil {
// programming error
return err
}
// This interrupt must be set *after* the above deletes
// happen, because those can cause transactions to run,
// and those transactions run obj.effect() which resets
// this interrupt value back to true!
obj.interrupt = false // reset
start = 0 // restart the loop
for i, v := range obj.topoSort { // TODO: Do it once here, or repeatedly below?
mapping[v] = i
}
goto PreIterate // skip waiting for a new event
}
if n := obj.graph.NumVertices(); n == 0 {
// If we're here, then the engine is done, because we'd
// block forever.
return nil
}
if obj.Debug {
obj.Logf("waiting for event...")
}
select {
case node, ok := <-obj.ag.Out:
if obj.Debug {
obj.Logf("got event: %v", node)
}
if !ok {
// TODO: If we don't have events, maybe shutdown?
panic("unexpected event channel shutdown")
}
// i is 0 if missing
i, _ := mapping[node.Func] // get the node to start from...
start = i
// XXX: Should we ACK() here so that Stream can "make"
// the new value available to its Call() starting now?
case <-ctx.Done():
return ctx.Err()
}
// We have at least one event now!
PreIterate:
valid := true // assume table is valid for this iteration
Iterate:
// Iterate down through the graph...
for i := start; i < len(obj.topoSort); i++ { // formerly: for _, v := range obj.topoSort
start = i // set for subsequent runs
v := obj.topoSort[i]
f, ok := v.(interfaces.Func)
if !ok {
panic("not a Func")
}
if obj.Debug {
obj.Logf("topo(%d): %p %+v", i, f, f)
}
mapping[v] = i // store for subsequent loops
node, exists := obj.state[f]
if !exists {
panic(fmt.Sprintf("node state missing: %s", f))
}
streamableFunc, isStreamable := f.(interfaces.StreamableFunc)
if isStreamable && !node.started { // don't start twice
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
// XXX: I think the design should be that
// if this ever shuts down, then the
// function engine should shut down, but
// that the individual Call() can error...
// This is in line with our os.Readfilewait
// function which models the logic we want...
// If the call errors AND we have the Except
// feature, then we want that Except to run,
// but if we get a new event, then we should
// try again; basically, revive itself after
// an errored Call function. Of course if
// Stream shuts down, we're nuked, so maybe
// we might want to retry... So maybe a resource
// could tweak the retry params for such a
// function??? Or maybe a #pragma kind of
// directive thing above each function???
err := streamableFunc.Stream(ctx)
if err == nil {
return
}
obj.errAppend(err)
obj.cancel() // error
}()
node.started = true
}
if node.epoch >= epoch { // we already did this one
if obj.Debug {
obj.Logf("epoch skip: %p %v", f, f)
}
continue
}
// XXX: memoize until graph shape changes?
incoming := obj.graph.IncomingGraphVertices(f) // []pgraph.Vertex
// Not all of the incoming edges have been added yet.
// We start by doing the "easy" count, and if it fails,
// we fall back on the slightly more expensive, and
// accurate count. This is because logical edges can be
// combined into a single physical edge. This happens if
// we have the same arg (a, b) passed to the same func.
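// For example (illustrative): if the same upstream value feeds both
// args of a call like f(a, b), there is one physical edge whose
// Args is []string{"a", "b"}, so len(incoming) is one, but
// realEdgeCount correctly reports two logical edges.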
if n := len(node.Func.Info().Sig.Ord); n != len(incoming) && n != realEdgeCount(obj.graph.IncomingGraphEdges(f)) {
if obj.Debug {
obj.Logf("edge skip: %p %v", f, f)
}
valid = false
// If we skip here, we also want to skip any of
// the vertices that depend on this one. This is
// because the toposort might offer our children
// before a non-dependent node which might be
// the node that causes the interrupt which adds
// the edge which is currently not added yet.
continue
}
// if there are no incoming edges, there's no incoming data, so this no-ops
si := &types.Type{
// input to functions are structs
Kind: types.KindStruct,
Map: node.Func.Info().Sig.Map,
Ord: node.Func.Info().Sig.Ord,
}
st := types.NewStruct(si)
// The above builds a struct with fields
// populated for each key (empty values)
// so we need to very carefully check if
// every field is received before we can
// safely send it downstream to an edge.
need := make(map[string]struct{}) // keys we need
for _, k := range node.Func.Info().Sig.Ord {
need[k] = struct{}{}
}
for _, vv := range incoming {
ff, ok := vv.(interfaces.Func)
if !ok {
panic("not a Func")
}
// XXX: do we need a lock around reading obj.state?
fromNode, exists := obj.state[ff]
if !exists {
panic(fmt.Sprintf("missing node state: %s", ff))
}
// The node we pull from should have a newer epoch than us!
if node.epoch >= fromNode.epoch {
if obj.Debug {
obj.Logf("inner epoch skip: %p %v", f, f)
//obj.Logf("inner epoch skip: NODE(%p is %d): %v FROM(%p is %d) %v", f, node.epoch, f, ff, fromNode.epoch, ff)
}
// Don't set non-valid here because if
// we have *two* FuncValue's that both
// interrupt, the first one will happen,
// and then the rest of the graph can
// be updated to the current epoch, but
// when the full graph is ready here, we
// would skip because of this bool!
//valid = false // don't do this!
continue Iterate
}
value := fromNode.result
if value == nil {
//if valid { // must be a programming err!
panic(fmt.Sprintf("unexpected nil node result from: %s", ff))
//}
// We're reading from a node which got
// skipped because it didn't have all of
// its edges yet. (or a programming bug)
//continue Iterate
// The fromNode epoch check above should
// make this additional check redundant.
}
// set each arg, since one value
// could get used for multiple
// function inputs (shared edge)
// XXX: refactor this edge look up for efficiency since we just did IncomingGraphVertices?
edge := obj.graph.Adjacency()[ff][f]
if edge == nil {
panic(fmt.Sprintf("edge is nil from `%s` to `%s`", ff, f))
}
args := edge.(*interfaces.FuncEdge).Args
for _, arg := range args {
// Skip edge is unused at this time.
//if arg == "" { // XXX: special skip edge!
// // XXX: we could maybe detect this at the incoming loop above instead
// continue
//}
// populate struct
if err := st.Set(arg, value); err != nil {
//panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, fromNode, err))
keys := []string{}
for k := range st.Struct() {
keys = append(keys, k)
}
panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys))
}
if _, exists := need[arg]; !exists {
keys := []string{}
for k := range st.Struct() {
keys = append(keys, k)
}
// could be either a duplicate or an unwanted field (edge name)
panic(fmt.Sprintf("unexpected struct key `%s` on `%s` from `%s`, has(%d): %v", arg, node, fromNode, len(keys), keys))
}
delete(need, arg)
}
}
// We just looped through all the incoming edges.
// XXX: Can we do the above bits -> struct, and then the
// struct -> list here, all in one faster step for perf?
args, err := interfaces.StructToCallableArgs(st) // []types.Value, error)
if err != nil {
panic(fmt.Sprintf("struct to callable failure on `%s`: %v, has: %v", node, err, st))
}
// Call the function.
if obj.Debug {
obj.Logf("call: %v", f)
}
//node.result, err = f.Call(ctx, args)
node.result, err = obj.call(f, ctx, args) // recovers!
// XXX: On error lookup the fallback value if it exists.
// XXX: This might cause an interrupt + graph addition.
if err == interfaces.ErrInterrupt {
// re-run topological sort... at the top!
obj.interrupt = true // should be set in obj.effect
continue Start
}
if obj.interrupt {
// We have a function which caused an interrupt,
// but which didn't return ErrInterrupt. This is
// a programming error by the function.
return fmt.Errorf("function didn't interrupt correctly: %s", node)
}
if err != nil {
return err
}
if node.result == nil && len(obj.graph.OutgoingGraphVertices(f)) > 0 {
// XXX: this check may not work if we have our
// "empty" named edges added on here...
return fmt.Errorf("unexpected nil value from node: %s", node)
}
old := node.epoch
node.epoch = epoch // store it after a successful call
if obj.Debug {
obj.Logf("set epoch(%d) to %d: %p %v", old, epoch, f, f)
}
// XXX: Should we check here to see if we can shutdown?
// For a given node, if Stream is not running, and no
// incoming nodes are still open, and if we're Pure, and
// we can memoize, then why not shutdown this node and
// remove it from the graph? Run a graph interrupt to
// delete this vertex. This will run Cleanup. Is it safe
// to also delete the table entry? Is it needed or used?
if node.result == nil {
// got an end of line vertex that would normally
// send a dummy value... don't store in table...
continue
}
table[f] = node.result // build up our table of values
} // end of single graph traversal
if !valid { // don't send table yet, it's not complete
continue
}
// Send a table of the complete set of values, which should all
// have the same epoch, and send it as an event to the outside.
// We need a copy of the map since we'll keep modifying it now.
// The table must get cleaned up over time to be consistent. It
// currently happens in interrupt as a result of a node delete.
cp := table.Copy()
if obj.Debug {
obj.Logf("table:")
for k, v := range cp {
obj.Logf("table[%p %v]: %p %+v", k, k, v, v)
}
}
select {
case obj.streamChan <- cp:
case <-ctx.Done():
return ctx.Err()
}
// XXX: implement epoch rollover by relabelling all nodes
epoch++ // increment it after a successful traversal
if obj.Debug {
obj.Logf("epoch(%d) increment to %d", epoch-1, epoch)
}
} // end big for loop
}

// event is ultimately called from a function to trigger an event in the engine.
// We'd like for this to never block, because that makes it much easier to
// prevent deadlocks in some tricky functions. On the other hand, we don't want
// to necessarily merge events if we want to ensure each sent event gets seen in
// order. A buffered channel would accomplish this, but then it would need a
// fixed size, and if it reached that capacity we'd be in the deadlock situation
// again. Instead, we use a buffered channel of size one, and a queue of data
// which stores the event information.
func (obj *Engine) event(ctx context.Context, state *state) error {
//f := state.Func // for reference, how to get the Vertex/Func pointer!
select {
case obj.ag.In <- state: // buffered to avoid blocking issues
// tell function engine who had an event... deal with it before
// we get to handle subsequent ones...
case <-ctx.Done():
return ctx.Err()
}
return nil
}

// effect runs at the end of the transaction, but before it returns.
// XXX: we don't need delta ops if we can just plug into our implementations of
// the addVertex and deleteVertex ...
func (obj *Engine) effect( /*delta *DeltaOps*/ ) error {
obj.interrupt = true
// The toposort runs in interrupt. We could save `delta` if it's needed.
//var err error
//obj.topoSort, err = obj.graph.TopologicalSort()
//return err
return nil
}

// call is a helper to handle recovering, if needed, from a function call.
func (obj *Engine) call(f interfaces.Func, ctx context.Context, args []types.Value) (result types.Value, reterr error) {
defer func() {
// catch programming errors
if r := recover(); r != nil {
obj.Logf("panic in process: %+v", r)
reterr = fmt.Errorf("panic in process: %+v", r)
}
}()
return f.Call(ctx, args)
}

// Stream returns a channel that you can follow to get aggregated graph events.
// Do not block reading from this channel as you can hold up the entire engine.
func (obj *Engine) Stream() <-chan interfaces.Table {
return obj.streamChan
}

// Err will contain the last error when Stream shuts down. It waits for all the
// running processes to exit before it returns.
func (obj *Engine) Err() error {
obj.wg.Wait()
return obj.err
}

// Txn returns a transaction that is suitable for adding and removing from the
// graph. You must run Setup before this method is called.
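//
// Note that when a returned Txn commits, its Post hook runs obj.effect, which
// sets obj.interrupt, so the next pass of process() re-runs the topological
// sort and applies any queued vertex add/delete ops.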
func (obj *Engine) Txn() interfaces.Txn {
if obj.refCount == nil {
panic("you must run Setup before first use")
}
// The very first initial Txn must have a wait group to make sure that if we
// shutdown (in error) we can Reverse things before the Lock/Unlock
// loop shuts down.
//var free func()
//if !obj.firstTxn {
// obj.firstTxn = true
// obj.wgTxn.Add(1)
// free = func() {
// obj.wgTxn.Done()
// }
//}
return (&txn.GraphTxn{
Post: obj.effect,
Lock: func() {}, // noop for now
Unlock: func() {}, // noop for now
GraphAPI: obj,
RefCount: obj.refCount, // reference counting
//FreeFunc: free,
}).Init()
}

// addVertex is the lockless version of the AddVertex function. This is needed
// so that AddEdge can add two vertices within the same lock.
func (obj *Engine) addVertex(f interfaces.Func) error {
if _, exists := obj.state[f]; exists {
// don't err dupes, because it makes using the AddEdge API yucky
return nil
}
// add some extra checks for easier debugging
if f == nil {
return fmt.Errorf("missing func")
}
if f.Info() == nil {
return fmt.Errorf("missing func info for node: %s", f)
}
sig := f.Info().Sig
if sig == nil {
return fmt.Errorf("missing func sig for node: %s", f)
}
if sig.Kind != types.KindFunc {
return fmt.Errorf("kind is not func for node: %s", f)
}
if err := f.Validate(); err != nil {
return errwrap.Wrapf(err, "did not Validate node: %s", f)
}
txn := obj.Txn()
// This is one of the two places where we modify this map. To avoid
// concurrent writes, we only do this when we're locked! Anywhere that
// reads this map while we are locked must have a mutex around it, or do
// the lookup when we're in an unlocked state.
node := &state{
Func: f,
name: f.String(), // cache a name to avoid locks
txn: txn,
//running: false,
//epoch: 0,
}
init := &interfaces.Init{
Hostname: obj.Hostname,
Event: func(ctx context.Context) error {
return obj.event(ctx, node) // pass state to avoid search
},
Txn: node.txn,
Local: obj.Local,
World: obj.World,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
// safe Logf in case f.String contains %? chars...
s := f.String() + ": " + fmt.Sprintf(format, v...)
obj.Logf("%s", s)
},
}
op := &addVertex{
f: f,
fn: func(ctx context.Context) error {
return f.Init(init) // TODO: should this take a ctx?
},
}
obj.ops = append(obj.ops, op) // mark for cleanup during interrupt
obj.state[f] = node // do this here b/c we rely on knowing it in real-time
obj.graph.AddVertex(f) // Txn relies on this happening now while it runs.
return nil
}

// AddVertex is the thread-safe way to add a vertex. You will need to call the
// engine Lock method before using this and the Unlock method afterwards.
func (obj *Engine) AddVertex(f interfaces.Func) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
if obj.Debug {
obj.Logf("Engine:AddVertex: %p %s", f, f)
}
return obj.addVertex(f) // lockless version
}

// AddEdge is the thread-safe way to add an edge. You will need to call the
// engine Lock method before using this and the Unlock method afterwards. This
// will automatically run AddVertex on both input vertices if they are not
// already part of the graph. You should only create DAGs, as this function
// engine cannot handle cycles, and this method will error if you cause a cycle.
func (obj *Engine) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
if obj.Debug {
obj.Logf("Engine:AddEdge %p %s: %p %s -> %p %s", fe, fe, f1, f1, f2, f2)
}
if obj.Debug { // not needed unless we have buggy graph building code
// safety check to avoid cycles
g := obj.graph.Copy()
//g.AddVertex(f1)
//g.AddVertex(f2)
g.AddEdge(f1, f2, fe)
if _, err := g.TopologicalSort(); err != nil {
return err // not a dag
}
// if we didn't cycle, we can modify the real graph safely...
}
// Does the graph already have these nodes in it?
//hasf1 := obj.graph.HasVertex(f1)
//hasf2 := obj.graph.HasVertex(f2)
if err := obj.addVertex(f1); err != nil { // lockless version
return err
}
if err := obj.addVertex(f2); err != nil {
// rollback f1 on error of f2
obj.deleteVertex(f1) // ignore any error
return err
}
// If f1 doesn't exist, let f1 (or its incoming nodes) get the notify.
// If f2 is new, then it should get a new notification unless f1 is new.
// But there's no guarantee we didn't AddVertex(f2); AddEdge(f1, f2, e),
// so resend if f1 already exists. Otherwise it's not a new notification.
// previously: `if hasf1 && !hasf2`
//if hasf1 {
// //obj.resend[f2] = struct{}{} // resend notification to me
//}
obj.graph.AddEdge(f1, f2, fe) // replaces any existing edge here
// This shouldn't error, since the test graph didn't find a cycle. But
// we don't really need to do it, since the interrupt will run it too.
if obj.Debug { // not needed unless we have buggy graph building code
if _, err := obj.graph.TopologicalSort(); err != nil {
// programming error
panic(err) // not a dag
}
}
return nil
}

// deleteVertex is the lockless version of the DeleteVertex function. This is
// needed so that AddEdge can add two vertices within the same lock. It needs
// deleteVertex so it can rollback the first one if the second addVertex fails.
func (obj *Engine) deleteVertex(f interfaces.Func) error {
node, exists := obj.state[f]
if !exists {
return fmt.Errorf("vertex %p %s doesn't exist", f, f)
}
_ = node
// This is one of the two places where we modify this map. To avoid
// concurrent writes, we only do this when we're locked! Anywhere that
// reads this map while we are locked must have a mutex around it, or do
// the lookup when we're in an unlocked state.
op := &deleteVertex{
f: f,
fn: func(ctx context.Context) error {
// XXX: do we run f.Done() first ? Did it run elsewhere?
cleanableFunc, ok := f.(interfaces.CleanableFunc)
if !ok {
return nil
}
return cleanableFunc.Cleanup(ctx)
},
}
obj.ops = append(obj.ops, op) // mark for cleanup during interrupt
delete(obj.state, f) // do this here b/c we rely on knowing it in real-time
obj.graph.DeleteVertex(f) // Txn relies on this happening now while it runs.
return nil
}

// DeleteVertex is the thread-safe way to delete a vertex. You will need to call
// the engine Lock method before using this and the Unlock method afterwards.
func (obj *Engine) DeleteVertex(f interfaces.Func) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
if obj.Debug {
obj.Logf("Engine:DeleteVertex: %p %s", f, f)
}
return obj.deleteVertex(f) // lockless version
}

// DeleteEdge is the thread-safe way to delete an edge. You will need to call
// the engine Lock method before using this and the Unlock method afterwards.
func (obj *Engine) DeleteEdge(fe *interfaces.FuncEdge) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
if obj.Debug {
f1, f2, found := obj.graph.LookupEdge(fe)
if found {
obj.Logf("Engine:DeleteEdge: %p %s -> %p %s", f1, f1, f2, f2)
} else {
obj.Logf("Engine:DeleteEdge: not found %p %s", fe, fe)
}
}
// Don't bother checking if edge exists first and don't error if it
// doesn't because it might have gotten deleted when a vertex did, and
// so there's no need to complain for nothing.
obj.graph.DeleteEdge(fe)
return nil
}

// HasVertex is the thread-safe way to check if a vertex exists in the graph.
// You will need to call the engine Lock method before using this and the Unlock
// method afterwards.
func (obj *Engine) HasVertex(f interfaces.Func) bool {
// No mutex needed here since this func runs in a non-concurrent Txn.
return obj.graph.HasVertex(f)
}

// LookupEdge is the thread-safe way to check which vertices (if any) are
// connected by a given edge in the graph. You will need to call the engine
// Lock method before using this and the Unlock method afterwards.
func (obj *Engine) LookupEdge(fe *interfaces.FuncEdge) (interfaces.Func, interfaces.Func, bool) {
// No mutex needed here since this func runs in a non-concurrent Txn.
v1, v2, found := obj.graph.LookupEdge(fe)
if !found {
return nil, nil, found
}
f1, ok := v1.(interfaces.Func)
if !ok {
panic("not a Func")
}
f2, ok := v2.(interfaces.Func)
if !ok {
panic("not a Func")
}
return f1, f2, found
}

// FindEdge is the thread-safe way to check which edge (if any) exists between
// two vertices in the graph. This is an important method in edge removal,
// because it's what you really need to know for DeleteEdge to work. Requesting
// a specific deletion isn't very sensible in this library when specified as
// the edge pointer, since we might replace it with a new edge that has new arg
// names. Instead, use this to look up what relationship you want, and then use
// DeleteEdge to remove it. You will need to call the engine Lock method before
// using this and the Unlock method afterwards.
func (obj *Engine) FindEdge(f1, f2 interfaces.Func) *interfaces.FuncEdge {
// No mutex needed here since this func runs in a non-concurrent Txn.
edge := obj.graph.FindEdge(f1, f2)
if edge == nil {
return nil
}
fe, ok := edge.(*interfaces.FuncEdge)
if !ok {
panic("edge is not a FuncEdge")
}
return fe
}

// Graph returns a copy of the contained graph.
func (obj *Engine) Graph() *pgraph.Graph {
// No mutex needed here since this func runs in a non-concurrent Txn.
return obj.graph.Copy()
}

// ExecGraphviz writes out the diagram of a graph to be used for visualization
// and debugging. You must not modify the graph (eg: during Lock) when calling
// this method.
func (obj *Engine) ExecGraphviz(dir string) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
// No mutex is needed at this time because we only run this in txn's and
// it should only be run with debugging enabled. Bring your own mutex.
//obj.graphvizMutex.Lock()
//defer obj.graphvizMutex.Unlock()
obj.graphvizCount++ // increment
if dir == "" {
dir = obj.graphvizDirectory
}
if dir == "" { // XXX: hack for ergonomics
d := time.Now().UnixMilli()
dir = fmt.Sprintf("/tmp/dage-graphviz-%s-%d/", obj.Name, d)
obj.graphvizDirectory = dir
}
if !strings.HasSuffix(dir, "/") {
return fmt.Errorf("dir must end with a slash")
}
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
//dashedEdges, err := pgraph.NewGraph("dashedEdges")
//if err != nil {
// return err
//}
//for _, v1 := range obj.graph.Vertices() {
// // if it's a ChannelBasedSinkFunc...
// if cb, ok := v1.(*structs.ChannelBasedSinkFunc); ok {
// // ...then add a dashed edge to its output
// dashedEdges.AddEdge(v1, cb.Target, &pgraph.SimpleEdge{
// Name: "channel", // secret channel
// })
// }
// // if it's a ChannelBasedSourceFunc...
// if cb, ok := v1.(*structs.ChannelBasedSourceFunc); ok {
// // ...then add a dashed edge from its input
// dashedEdges.AddEdge(cb.Source, v1, &pgraph.SimpleEdge{
// Name: "channel", // secret channel
// })
// }
//}
gv := &pgraph.Graphviz{
Name: obj.graph.GetName(),
Filename: fmt.Sprintf("%s/%d.dot", dir, obj.graphvizCount),
Graphs: map[*pgraph.Graph]*pgraph.GraphvizOpts{
obj.graph: nil,
//dashedEdges: {
// Style: "dashed",
//},
},
}
if err := gv.Exec(); err != nil {
return err
}
return nil
}

// errAppend is a simple helper function.
func (obj *Engine) errAppend(err error) {
obj.errMutex.Lock()
obj.err = errwrap.Append(obj.err, err)
obj.errMutex.Unlock()
}

// state tracks some internal vertex-specific state information.
type state struct {
Func interfaces.Func
name string // cache a name here for safer concurrency
txn interfaces.Txn // API of GraphTxn struct to pass to each function
// started is true if this is a StreamableFunc, and Stream was started.
started bool
// epoch represents the "iteration count" through the graph. All values
// in a returned table should be part of the same epoch. This guarantees
// that they're all consistent with respect to each other.
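// For example: while processing epoch N, a node whose stored epoch is
// already >= N gets skipped, since its result is current for this pass.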
epoch int64 // if this rolls over, we've been running for too many years
// result is the latest output from calling this function.
result types.Value
}

// String implements the fmt.Stringer interface for pretty printing!
func (obj *state) String() string {
if obj.name != "" {
return obj.name
}
return obj.Func.String()
}

// ops is either an addVertex or deleteVertex operation.
type ops interface {
}

// addVertex is one of the "ops" that are possible.
type addVertex struct {
f interfaces.Func
fn func(context.Context) error
}

// deleteVertex is one of the "ops" that are possible.
type deleteVertex struct {
f interfaces.Func
fn func(context.Context) error
}

// realEdgeCount tells us how many "logical" edges there are. We have shared
// edges which represent more than one value, when the same value is passed more
// than once. This takes those into account correctly.
func realEdgeCount(edges []pgraph.Edge) int {
total := 0
for _, edge := range edges {
fe, ok := edge.(*interfaces.FuncEdge)
if !ok {
total++
continue
}
total += len(fe.Args) // these can represent more than one edge!
}
return total
}