// Mgmt
// Copyright (C) James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
// Additional permission under GNU GPL version 3 section 7
//
// If you modify this program, or any covered work, by linking or combining it
// with embedded mcl code and modules (and that the embedded mcl code and
// modules which link with this program, contain a copy of their source code in
// the authoritative form) containing parts covered by the terms of any other
// license, the licensors of this program grant you additional permission to
// convey the resulting work. Furthermore, the licensors of this program grant
// the original author, James Shubin, additional permission to update this
// additional permission if he deems it necessary to achieve the goals of this
// additional permission.

// Package dage implements a DAG function engine.
// TODO: can we rename this to something more interesting?
package dage

import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/local"
"github.com/purpleidea/mgmt/lang/funcs/ref"
"github.com/purpleidea/mgmt/lang/funcs/txn"
"github.com/purpleidea/mgmt/lang/interfaces"
"github.com/purpleidea/mgmt/lang/types"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/util"
"github.com/purpleidea/mgmt/util/errwrap"
)

// Engine implements a dag engine which lets us "run" a dag of functions, but
// also allows us to modify it while we are running. The functions we support
// can take one of the following forms.
//
// 1) A function that supports the normal interfaces.Func API. It has Call() in
// particular. Most functions are done with only this API.
//
// 2) A function which adds Stream() to support the StreamableFunc API. Some
// functions use this along with Event() to notify of a new value.
//
// 3) A third *possible* (but not currently implemented) API would be one that
// has Stream() but takes an Input and Output channel of values instead. This is
// similar to what we previously had. Of note, every input must generate one
// output value. (And more spontaneous output values are allowed as well.)
//
// Of note, functions that support Call() can also cause an interrupt to
// happen. It's not clear if option (3) would be allowed to have a Call()
// method or not, and if it would send interrupts on the Output channel.
//
// Of additional note, some functions also require the "ShapelyFunc" API to work
// correctly. Use of this is rare.
//
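// As a rough sketch of form (1), a hypothetical constant-returning function
// might implement the pieces of the interfaces.Func API that this engine
// exercises below (Validate, Info, Init, Call, String). The exact method set
// is defined in the interfaces package; this is only an illustration:
//
//	type answerFunc struct {
//		init *interfaces.Init
//	}
//
//	func (obj *answerFunc) String() string { return "answer" }
//	func (obj *answerFunc) Validate() error { return nil }
//	func (obj *answerFunc) Info() *interfaces.Info {
//		return &interfaces.Info{
//			Pure: true,
//			Sig:  types.NewType("func() int"), // no inputs
//		}
//	}
//	func (obj *answerFunc) Init(init *interfaces.Init) error {
//		obj.init = init
//		return nil
//	}
//	func (obj *answerFunc) Call(ctx context.Context, args []types.Value) (types.Value, error) {
//		return &types.IntValue{V: 42}, nil
//	}
//
// A form (2) function would additionally implement Stream(ctx), and would use
// the Event callback that it received in Init to tell the engine that a new
// value is ready for its Call().
//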
// XXX: If this engine continuously receives function events at a higher speed
// than it can process, then it will bog down and consume memory infinitely. We
// should consider adding some sort of warning or error if we get to a certain
// size.
//
// XXX: It's likely that this engine could be made even more efficient by more
// cleverly traversing through the graph. Instead of a topological sort, we
// could have some fancy map that determines what's remaining to go through, so
// that when we "interrupt" we don't needlessly repeatedly visit nodes again and
// trigger the "epoch skip" situations. We could also do the incremental
// toposort so that we don't properly re-run the whole algorithm over and over
// if we're always just computing changes.
//
// XXX: We could consider grouping multiple incoming events into a single
// descent into the DAG. It's not clear if this kind of de-duplication would
// break some "glitch-free" aspects or not. It would probably improve
// performance but we'd have to be careful about how we did it.
//
// XXX: Respect the info().Pure and info().Memo fields somewhere...
type Engine struct {
// Name is the name used for the instance of the engine and in the graph
// that is held within it.
Name string
Hostname string
Local *local.API
World engine.World
Debug bool
Logf func(format string, v ...interface{})
// graph is the internal graph. It is only changed during interrupt.
graph *pgraph.Graph
// refCount keeps track of vertex and edge references across the entire
// graph.
refCount *ref.Count
// state stores some per-vertex (function) state
state map[interfaces.Func]*state
// wg counts every concurrent process here.
wg *sync.WaitGroup
// ag is the aggregation channel, which receives events from any of the
// StreamableFunc's that are running.
// XXX: add a mechanism to detect if it gets too full
ag *util.InfiniteChan[*state]
//ag chan *state
// cancel can be called to shut down Run() after it has started, of course.
cancel func()
// streamChan is used to send the stream of tables to the outside world.
streamChan chan interfaces.Table
// interrupt specifies that a txn "commit" just happened.
interrupt bool
// topoSort is the last topological sort we ran.
topoSort []pgraph.Vertex
// ops is a list of operations to run during interrupt. This is usually
// a delete vertex, but others are possible.
ops []ops
// err contains the last error after a shutdown occurs.
err error
errMutex *sync.Mutex // guards err
// graphvizCount keeps a running tally of how many graphs we've
// generated. This is useful for displaying a sequence (timeline) of
// graphs in a linear order.
graphvizCount int64
// graphvizDirectory stores the generated path for outputting graphviz
// files if one is not specified at runtime.
graphvizDirectory string
}

// Setup sets up the internal data structures needed for this engine. We use
// this earlier step before Run() because it's usually not called concurrently,
// which makes it easier to catch the obvious errors before Run() runs in a
// goroutine.
func (obj *Engine) Setup() error {
var err error
obj.graph, err = pgraph.NewGraph(obj.Name)
if err != nil {
return err
}
obj.state = make(map[interfaces.Func]*state)
obj.refCount = (&ref.Count{}).Init()
obj.wg = &sync.WaitGroup{}
obj.errMutex = &sync.Mutex{}
//obj.ag = make(chan *state, 1) // for group events
//obj.ag = make(chan *state) // normal no buffer, we can't drop any
obj.ag = util.NewInfiniteChan[*state]() // lock-free but unbounded
obj.streamChan = make(chan interfaces.Table)
obj.ops = []ops{}
return nil
}

// Run kicks off the function engine. You must add the initial graph via Txn
// *before* you run this function.
// XXX: try and fix the engine so you can run either Txn or Run first.
func (obj *Engine) Run(ctx context.Context) error {
if obj.refCount == nil { // any arbitrary flag would be fine here
return fmt.Errorf("you must run Setup before first use")
}
//obj.wg = &sync.WaitGroup{} // in Setup
defer obj.wg.Wait()
// cancel to allow someone to shut everything down...
ctx, cancel := context.WithCancel(ctx)
defer cancel()
obj.cancel = cancel
//obj.streamChan = make(chan Table) // in Setup
defer close(obj.streamChan)
err := obj.process(ctx, 1) // start like this for now
obj.errAppend(err)
return err
}
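
// A hypothetical usage sketch (error handling elided), under the assumption
// that the interfaces.Txn returned by Txn() exposes the usual AddVertex,
// AddEdge and Commit methods; someFunc is a placeholder, and this is not a
// definitive recipe:
//
//	engine := &Engine{Name: "example", Logf: log.Printf}
//	if err := engine.Setup(); err != nil {
//		return err
//	}
//	txn := engine.Txn()
//	txn.AddVertex(someFunc) // build the initial graph...
//	if err := txn.Commit(); err != nil { // ...*before* Run!
//		return err
//	}
//	go func() {
//		for table := range engine.Stream() {
//			_ = table // consume each consistent table of values
//		}
//	}()
//	return engine.Run(ctx)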

// process could be combined with Run, but it is left separate in case we try to
// build a recursive process operation that runs on a subgraph. It would need an
// incoming graph argument as well, I would expect.
func (obj *Engine) process(ctx context.Context, epoch int64) error {
mapping := make(map[pgraph.Vertex]int)
start := 0
table := make(interfaces.Table) // map[interfaces.Func]types.Value
Start:
for {
// If it's our first time, we want to interrupt, because we may
// not ever get any events otherwise, and we'd block at select
// waiting on obj.ag forever. Remember that the effect() of Txn
// causes an interrupt when we add the first graph in. This
// means we need to do the initial Txn before we startup here!
if obj.interrupt {
if obj.Debug {
obj.Logf("interrupt!")
}
// Handle delete (and other ops) first. We keep checking
// until this is empty, because a Cleanup operation
// running inside this loop could cause more vertices
// to be added, and so on.
for len(obj.ops) > 0 {
op := obj.ops[0] // run in same order added
obj.ops = obj.ops[1:] // queue
// adds are new vertices which join the graph
if add, ok := op.(*addVertex); ok {
table[add.f] = nil // for symmetry
if err := add.fn(ctx); err != nil { // Init!
return err
}
continue
}
// deletes are the list of Func's (vertices)
// that were deleted in a txn.
if del, ok := op.(*deleteVertex); ok {
delete(table, del.f) // cleanup the table
if err := del.fn(ctx); err != nil { // Cleanup!
return err
}
continue
}
}
// Interrupt should only happen if we changed the graph
// shape, so recompute the topological sort right here.
// XXX: Can we efficiently edit the old topoSort by
// knowing the add/del? If the graph is shrinking, just
// remove those vertices from our current toposort. If
// the graph is growing, can we topo sort the subset and
// put them at the beginning of our old toposort? Is it
// guaranteed that "spawned nodes" will have earlier
// precedence than existing stuff? Can we be clever? Can
// we "float" anything upwards that's needed for the
// "toposort" by seeing what we're connected to, and
// sort all of that?
var err error
obj.topoSort, err = obj.graph.TopologicalSort()
if err != nil {
// programming error
return err
}
// This interrupt must be set *after* the above deletes
// happen, because those can cause transactions to run,
// and those transactions run obj.effect() which resets
// this interrupt value back to true!
obj.interrupt = false // reset
start = 0 // restart the loop
for i, v := range obj.topoSort { // TODO: Do it once here, or repeatedly below?
mapping[v] = i
}
goto PreIterate // skip waiting for a new event
}
if n := obj.graph.NumVertices(); n == 0 {
// If we're here, then the engine is done, because we'd
// block forever.
return nil
}
if obj.Debug {
obj.Logf("waiting for event...")
}
select {
case node, ok := <-obj.ag.Out:
if obj.Debug {
obj.Logf("got event: %v", node)
}
if !ok {
// TODO: If we don't have events, maybe shutdown?
panic("unexpected event channel shutdown")
}
// i is 0 if missing
i, _ := mapping[node.Func] // get the node to start from...
start = i
// XXX: Should we ACK() here so that Stream can "make"
// the new value available to its Call() starting now?
case <-ctx.Done():
return ctx.Err()
}
// We have at least one event now!
PreIterate:
valid := true // assume table is valid for this iteration
Iterate:
// Iterate down through the graph...
for i := start; i < len(obj.topoSort); i++ { // formerly: for _, v := range obj.topoSort
start = i // set for subsequent runs
v := obj.topoSort[i]
f, ok := v.(interfaces.Func)
if !ok {
panic("not a Func")
}
if obj.Debug {
obj.Logf("topo(%d): %p %+v", i, f, f)
}
mapping[v] = i // store for subsequent loops
node, exists := obj.state[f]
if !exists {
panic(fmt.Sprintf("node state missing: %s", f))
}
streamableFunc, isStreamable := f.(interfaces.StreamableFunc)
if isStreamable && !node.started { // don't start twice
obj.wg.Add(1)
go func() {
defer obj.wg.Done()
// XXX: I think the design should be that
// if this ever shuts down, then the
// function engine should shut down, but
// that the individual Call() can error...
// This is in line with our os.Readfilewait
// function which models the logic we want...
// If the call errors AND we have the Except
// feature, then we want that Except to run,
// but if we get a new event, then we should
// try again. Basically, revive itself after
// an errored Call function. Of course if
// Stream shuts down, we're nuked, so maybe
// we might want to retry... So maybe a resource
// could tweak the retry params for such a
// function??? Or maybe a #pragma kind of
// directive thing above each function???
err := streamableFunc.Stream(ctx)
if err == nil {
return
}
obj.errAppend(err)
obj.cancel() // error
}()
node.started = true
}
if node.epoch >= epoch { // we already did this one
if obj.Debug {
obj.Logf("epoch skip: %p %v", f, f)
}
continue
}
// XXX: memoize until graph shape changes?
incoming := obj.graph.IncomingGraphVertices(f) // []pgraph.Vertex
// Not all of the incoming edges have been added yet.
// We start by doing the "easy" count, and if it fails,
// we fall back on the slightly more expensive, and
// accurate count. This is because logical edges can be
// combined into a single physical edge. This happens if
// we have the same arg (a, b) passed to the same func.
if n := len(node.Func.Info().Sig.Ord); n != len(incoming) && n != realEdgeCount(obj.graph.IncomingGraphEdges(f)) {
if obj.Debug {
obj.Logf("edge skip: %p %v", f, f)
}
valid = false
// If we skip here, we also want to skip any of
// the vertices that depend on this one. This is
// because the toposort might offer our children
// before a non-dependent node which might be
// the node that causes the interrupt which adds
// the edge which is currently not added yet.
continue
}
// If there are no incoming edges, there's no incoming data, so this is a noop.
si := &types.Type{
// input to functions are structs
Kind: types.KindStruct,
Map: node.Func.Info().Sig.Map,
Ord: node.Func.Info().Sig.Ord,
}
st := types.NewStruct(si)
// The above builds a struct with fields
// populated for each key (empty values)
// so we need to very carefully check if
// every field is received before we can
// safely send it downstream to an edge.
need := make(map[string]struct{}) // keys we need
for _, k := range node.Func.Info().Sig.Ord {
need[k] = struct{}{}
}
for _, vv := range incoming {
ff, ok := vv.(interfaces.Func)
if !ok {
panic("not a Func")
}
// XXX: do we need a lock around reading obj.state?
fromNode, exists := obj.state[ff]
if !exists {
panic(fmt.Sprintf("missing node state: %s", ff))
}
// The node we pull from should have a newer epoch than us!
if node.epoch >= fromNode.epoch {
//if obj.Debug {
// obj.Logf("inner epoch skip: %p %v", f, f)
// //obj.Logf("inner epoch skip: NODE(%p is %d): %v FROM(%p is %d) %v", f, node.epoch, f, ff, fromNode.epoch, ff)
//}
// Don't set non-valid here because if
// we have *two* FuncValue's that both
// interrupt, the first one will happen,
// and then the rest of the graph can
// be updated to the current epoch, but
// when the full graph is ready here, we
// would skip because of this bool!
//valid = false // don't do this!
// The mistake in this check is that if
// *any* of the incoming edges are not
// ready, then we skip it all. But one
// may not even be built yet. So it's a
// mess. So at least for now, use the
// below "is nil" check instead.
//continue Iterate
}
value := fromNode.result
if value == nil {
//if valid { // must be a programming err!
//panic(fmt.Sprintf("unexpected nil node result from: %s", ff))
//}
// We're reading from a node which got
// skipped because it didn't have all of
// its edges yet. (or a programming bug)
continue Iterate
// The fromNode epoch check above should
// make this additional check redundant.
}
// set each arg, since one value
// could get used for multiple
// function inputs (shared edge)
// XXX: refactor this edge look up for efficiency since we just did IncomingGraphVertices?
edge := obj.graph.Adjacency()[ff][f]
if edge == nil {
panic(fmt.Sprintf("edge is nil from `%s` to `%s`", ff, f))
}
args := edge.(*interfaces.FuncEdge).Args
for _, arg := range args {
// The skip edge feature is unused at this time.
//if arg == "" { // XXX: special skip edge!
// // XXX: we could maybe detect this at the incoming loop above instead
// continue
//}
// populate struct
if err := st.Set(arg, value); err != nil {
//panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, fromNode, err))
keys := []string{}
for k := range st.Struct() {
keys = append(keys, k)
}
panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys))
}
if _, exists := need[arg]; !exists {
keys := []string{}
for k := range st.Struct() {
keys = append(keys, k)
}
// could be either a duplicate or an unwanted field (edge name)
panic(fmt.Sprintf("unexpected struct key `%s` on `%s` from `%s`, has(%d): %v", arg, node, fromNode, len(keys), keys))
}
delete(need, arg)
}
}
// We just looped through all the incoming edges.
// XXX: Can we do the above bits -> struct, and then the
// struct -> list here, all in one faster step for perf?
args, err := interfaces.StructToCallableArgs(st) // []types.Value, error)
if err != nil {
panic(fmt.Sprintf("struct to callable failure on `%s`: %v, has: %v", node, err, st))
}
// Call the function.
if obj.Debug {
obj.Logf("call: %v", f)
}
//node.result, err = f.Call(ctx, args)
node.result, err = obj.call(f, ctx, args) // recovers!
// XXX: On error lookup the fallback value if it exists.
// XXX: This might cause an interrupt + graph addition.
if err == interfaces.ErrInterrupt {
// re-run topological sort... at the top!
obj.interrupt = true // should be set in obj.effect
continue Start
}
if obj.interrupt {
// We have a function which caused an interrupt,
// but which didn't return ErrInterrupt. This is
// a programming error by the function.
return fmt.Errorf("function didn't interrupt correctly: %s", node)
}
if err != nil {
return err
}
if node.result == nil && len(obj.graph.OutgoingGraphVertices(f)) > 0 {
// XXX: this check may not work if we have our
// "empty" named edges added on here...
return fmt.Errorf("unexpected nil value from node: %s", node)
}
old := node.epoch
node.epoch = epoch // store it after a successful call
if obj.Debug {
obj.Logf("set epoch(%d) to %d: %p %v", old, epoch, f, f)
}
// XXX: Should we check here to see if we can shutdown?
// For a given node, if Stream is not running, and no
// incoming nodes are still open, and if we're Pure, and
// we can memoize, then why not shutdown this node and
// remove it from the graph? Run a graph interrupt to
// delete this vertex. This will run Cleanup. Is it safe
// to also delete the table entry? Is it needed or used?
if node.result == nil {
// got an end of line vertex that would normally
// send a dummy value... don't store in table...
continue
}
table[f] = node.result // build up our table of values
} // end of single graph traversal
if !valid { // don't send table yet, it's not complete
continue
}
// Send a table of the complete set of values, which should all
// have the same epoch, and send it as an event to the outside.
// We need a copy of the map since we'll keep modifying it now.
// The table must get cleaned up over time to be consistent. It
// currently happens in interrupt as a result of a node delete.
cp := table.Copy()
if obj.Debug {
obj.Logf("table:")
for k, v := range cp {
obj.Logf("table[%p %v]: %p %+v", k, k, v, v)
}
}
select {
case obj.streamChan <- cp:
case <-ctx.Done():
return ctx.Err()
}
// XXX: implement epoch rollover by relabelling all nodes
epoch++ // increment it after a successful traversal
if obj.Debug {
obj.Logf("epoch(%d) increment to %d", epoch-1, epoch)
}
} // end big for loop
}
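
// For intuition, the traversal in process relies on the property that a
// topological sort orders every vertex after all of its dependencies. A toy
// sketch with pgraph (a, b, c and the edges are hypothetical placeholders):
//
//	g, _ := pgraph.NewGraph("toy")
//	g.AddEdge(a, c, e1) // a -> c
//	g.AddEdge(b, c, e2) // b -> c
//	sorted, _ := g.TopologicalSort() // eg: [a, b, c] or [b, a, c]
//	_ = sorted
//
// Either valid order places c after both a and b, so by the time we Call() c,
// the results of its inputs are already cached in their per-node state, and
// the epoch check lets us skip the ones we've already done this traversal.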

// event is ultimately called from a function to trigger an event in the
// engine. We'd like for this to never block, because that makes it much
// easier to prevent deadlocks in some tricky functions. On the other hand, we
// don't want to merge events, since we want to ensure each sent event gets
// seen in order. A plain buffered channel would accomplish this, but it would
// need a fixed size, and if it ever reached capacity we'd be back in the
// deadlock situation. Instead, we use an unbounded channel (util.InfiniteChan)
// which queues the event information internally so that sends never block.
func (obj *Engine) event(ctx context.Context, state *state) error {
//f := state.Func // for reference, how to get the Vertex/Func pointer!
select {
case obj.ag.In <- state: // buffered to avoid blocking issues
// tell function engine who had an event... deal with it before
// we get to handle subsequent ones...
case <-ctx.Done():
return ctx.Err()
}
return nil
}
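
// A minimal sketch of the unbounded channel pattern used above, assuming only
// the In/Out fields of util.InfiniteChan that Setup and process already use:
//
//	ch := util.NewInfiniteChan[int]()
//	ch.In <- 1 // never blocks; values queue internally
//	ch.In <- 2
//	x := <-ch.Out // 1
//	y := <-ch.Out // 2 (values are delivered in the order sent)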

// effect runs at the end of the transaction, but before it returns.
// XXX: we don't need delta ops if we can just plug into our implementations of
// the addVertex and deleteVertex ...
func (obj *Engine) effect( /*delta *DeltaOps*/ ) error {
obj.interrupt = true
// The toposort runs in interrupt. We could save `delta` if it's needed.
//var err error
//obj.topoSort, err = obj.graph.TopologicalSort()
//return err
return nil
}

// call is a helper which recovers, if needed, from a panic during a function
// call, and converts it into an error.
func (obj *Engine) call(f interfaces.Func, ctx context.Context, args []types.Value) (result types.Value, reterr error) {
defer func() {
// catch programming errors
if r := recover(); r != nil {
obj.Logf("panic in process: %+v", r)
reterr = fmt.Errorf("panic in process: %+v", r)
}
}()
return f.Call(ctx, args)
}
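
// For example, a buggy function whose Call panics (index out of range, nil
// map write, and so on) surfaces here as an ordinary error for that call,
// so the engine can fail gracefully instead of crashing the whole program:
//
//	result, err := obj.call(f, ctx, args)
//	// err: "panic in process: runtime error: index out of range ..."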

// Stream returns a channel that you can follow to get aggregated graph events.
// Do not block reading from this channel as you can hold up the entire engine.
func (obj *Engine) Stream() <-chan interfaces.Table {
return obj.streamChan
}

// Err returns the last error after Stream shuts down. It waits for all the
// running processes to exit before it returns.
func (obj *Engine) Err() error {
obj.wg.Wait()
return obj.err
}

// Txn returns a transaction that is suitable for adding and removing from the
// graph. You must run Setup before this method is called.
func (obj *Engine) Txn() interfaces.Txn {
if obj.refCount == nil {
panic("you must run Setup before first use")
}
// The very first initial Txn must have a wait group to make sure, if we
// shutdown (in error), that we can Reverse things before the Lock/Unlock
// loop shuts down.
//var free func()
//if !obj.firstTxn {
// obj.firstTxn = true
// obj.wgTxn.Add(1)
// free = func() {
// obj.wgTxn.Done()
// }
//}
return (&txn.GraphTxn{
Post: obj.effect,
Lock: func() {}, // noop for now
Unlock: func() {}, // noop for now
GraphAPI: obj,
RefCount: obj.refCount, // reference counting
//FreeFunc: free,
}).Init()
}

// addVertex is the lockless version of the AddVertex function. This is needed
// so that AddEdge can add two vertices within the same lock.
func (obj *Engine) addVertex(f interfaces.Func) error {
if _, exists := obj.state[f]; exists {
// don't err dupes, because it makes using the AddEdge API yucky
return nil
}
// add some extra checks for easier debugging
if f == nil {
return fmt.Errorf("missing func")
}
if f.Info() == nil {
return fmt.Errorf("missing func info for node: %s", f)
}
sig := f.Info().Sig
if sig == nil {
return fmt.Errorf("missing func sig for node: %s", f)
}
if sig.Kind != types.KindFunc {
return fmt.Errorf("kind is not func for node: %s", f)
}
if err := f.Validate(); err != nil {
return errwrap.Wrapf(err, "did not Validate node: %s", f)
}
txn := obj.Txn()
// This is one of the two places where we modify this map. To avoid
// concurrent writes, we only do this when we're locked! Anywhere that
// can read while we are locked must have a mutex around it or do the
// lookup when we're in an unlocked state.
node := &state{
Func: f,
name: f.String(), // cache a name to avoid locks
txn: txn,
//running: false,
//epoch: 0,
}
init := &interfaces.Init{
Hostname: obj.Hostname,
Event: func(ctx context.Context) error {
return obj.event(ctx, node) // pass state to avoid search
},
Txn: node.txn,
Local: obj.Local,
World: obj.World,
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
// safe Logf in case f.String contains %? chars...
s := f.String() + ": " + fmt.Sprintf(format, v...)
obj.Logf("%s", s)
},
}
op := &addVertex{
f: f,
fn: func(ctx context.Context) error {
return f.Init(init) // TODO: should this take a ctx?
},
}
obj.ops = append(obj.ops, op) // mark for cleanup during interrupt
obj.state[f] = node // do this here b/c we rely on knowing it in real-time
obj.graph.AddVertex(f) // Txn relies on this happening now while it runs.
return nil
}

// AddVertex is the thread-safe way to add a vertex. You will need to call the
// engine Lock method before using this and the Unlock method afterwards.
func (obj *Engine) AddVertex(f interfaces.Func) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
if obj.Debug {
obj.Logf("Engine:AddVertex: %p %s", f, f)
}
return obj.addVertex(f) // lockless version
}

// AddEdge is the thread-safe way to add an edge. You will need to call the
// engine Lock method before using this and the Unlock method afterwards. This
// will automatically run AddVertex on both input vertices if they are not
// already part of the graph. You should only create DAGs, as this function
// engine cannot handle cycles, and this method will error if you cause one.
func (obj *Engine) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
if obj.Debug {
obj.Logf("Engine:AddEdge %p %s: %p %s -> %p %s", fe, fe, f1, f1, f2, f2)
}
if obj.Debug { // not needed unless we have buggy graph building code
// safety check to avoid cycles
g := obj.graph.Copy()
//g.AddVertex(f1)
//g.AddVertex(f2)
g.AddEdge(f1, f2, fe)
if _, err := g.TopologicalSort(); err != nil {
return err // not a dag
}
// if we didn't cycle, we can modify the real graph safely...
}
// Does the graph already have these nodes in it?
//hasf1 := obj.graph.HasVertex(f1)
//hasf2 := obj.graph.HasVertex(f2)
if err := obj.addVertex(f1); err != nil { // lockless version
return err
}
if err := obj.addVertex(f2); err != nil {
// rollback f1 on error of f2
obj.deleteVertex(f1) // ignore any error
return err
}
// If f1 doesn't exist, let f1 (or its incoming nodes) get the notify.
// If f2 is new, then it should get a new notification unless f1 is new.
// But there's no guarantee we didn't AddVertex(f2); AddEdge(f1, f2, e),
// so resend if f1 already exists. Otherwise it's not a new notification.
// previously: `if hasf1 && !hasf2`
//if hasf1 {
// //obj.resend[f2] = struct{}{} // resend notification to me
//}
obj.graph.AddEdge(f1, f2, fe) // replaces any existing edge here
// This shouldn't error, since the test graph didn't find a cycle. But
// we don't really need to do it, since the interrupt will run it too.
if obj.Debug { // not needed unless we have buggy graph building code
if _, err := obj.graph.TopologicalSort(); err != nil {
// programming error
panic(err) // not a dag
}
}
return nil
}
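
// As an example of the "shared edge" case mentioned in process, passing the
// same upstream value to two parameters of one function uses a single
// physical edge that carries two logical args. A hypothetical sketch (src and
// dst are placeholder funcs, and this would normally run via a Txn commit):
//
//	fe := &interfaces.FuncEdge{Args: []string{"a", "b"}}
//	_ = engine.AddEdge(src, dst, fe)
//	// realEdgeCount counts this as two logical edges, which matches
//	// len(dst.Info().Sig.Ord) == 2 even though there's only one
//	// incoming vertex.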

// deleteVertex is the lockless version of the DeleteVertex function. This is
// needed so that AddEdge can add two vertices within the same lock. It needs
// deleteVertex so it can roll back the first one if the second addVertex
// fails.
func (obj *Engine) deleteVertex(f interfaces.Func) error {
node, exists := obj.state[f]
if !exists {
return fmt.Errorf("vertex %p %s doesn't exist", f, f)
}
_ = node
// This is one of the two places where we modify this map. To avoid
// concurrent writes, we only do this when we're locked! Anywhere that
// can read while we are locked must have a mutex around it or do the
// lookup when we're in an unlocked state.
op := &deleteVertex{
f: f,
fn: func(ctx context.Context) error {
// XXX: do we run f.Done() first ? Did it run elsewhere?
cleanableFunc, ok := f.(interfaces.CleanableFunc)
if !ok {
return nil
}
return cleanableFunc.Cleanup(ctx)
},
}
obj.ops = append(obj.ops, op) // mark for cleanup during interrupt
delete(obj.state, f) // do this here b/c we rely on knowing it in real-time
obj.graph.DeleteVertex(f) // Txn relies on this happening now while it runs.
return nil
}

// DeleteVertex is the thread-safe way to delete a vertex. You will need to call
// the engine Lock method before using this and the Unlock method afterwards.
func (obj *Engine) DeleteVertex(f interfaces.Func) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
if obj.Debug {
obj.Logf("Engine:DeleteVertex: %p %s", f, f)
}
return obj.deleteVertex(f) // lockless version
}

// DeleteEdge is the thread-safe way to delete an edge. You will need to call
// the engine Lock method before using this and the Unlock method afterwards.
func (obj *Engine) DeleteEdge(fe *interfaces.FuncEdge) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
if obj.Debug {
f1, f2, found := obj.graph.LookupEdge(fe)
if found {
obj.Logf("Engine:DeleteEdge: %p %s -> %p %s", f1, f1, f2, f2)
} else {
obj.Logf("Engine:DeleteEdge: not found %p %s", fe, fe)
}
}
// Don't bother checking if edge exists first and don't error if it
// doesn't because it might have gotten deleted when a vertex did, and
// so there's no need to complain for nothing.
obj.graph.DeleteEdge(fe)
return nil
}

// HasVertex is the thread-safe way to check if a vertex exists in the graph.
// You will need to call the engine Lock method before using this and the Unlock
// method afterwards.
func (obj *Engine) HasVertex(f interfaces.Func) bool {
// No mutex needed here since this func runs in a non-concurrent Txn.
return obj.graph.HasVertex(f)
}

// LookupEdge is the thread-safe way to check which two vertices (if any) an
// edge connects in the graph. You will need to call the engine Lock method
// before using this and the Unlock method afterwards.
func (obj *Engine) LookupEdge(fe *interfaces.FuncEdge) (interfaces.Func, interfaces.Func, bool) {
// No mutex needed here since this func runs in a non-concurrent Txn.
v1, v2, found := obj.graph.LookupEdge(fe)
if !found {
return nil, nil, found
}
f1, ok := v1.(interfaces.Func)
if !ok {
panic("not a Func")
}
f2, ok := v2.(interfaces.Func)
if !ok {
panic("not a Func")
}
return f1, f2, found
}

// FindEdge is the thread-safe way to check which edge (if any) exists between
// two vertices in the graph. This is an important method in edge removal,
// because it's what you really need to know for DeleteEdge to work. Requesting
// a specific deletion by edge pointer doesn't make much sense in this library,
// since we might replace that edge with a new one that has new arg names.
// Instead, use this to look up what relationship you want, and then DeleteEdge
// to remove it. You will need to call the engine Lock method before using this
// and the Unlock method afterwards.
func (obj *Engine) FindEdge(f1, f2 interfaces.Func) *interfaces.FuncEdge {
// No mutex needed here since this func runs in a non-concurrent Txn.
edge := obj.graph.FindEdge(f1, f2)
if edge == nil {
return nil
}
fe, ok := edge.(*interfaces.FuncEdge)
if !ok {
panic("edge is not a FuncEdge")
}
return fe
}

// Graph returns a copy of the contained graph.
func (obj *Engine) Graph() *pgraph.Graph {
// No mutex needed here since this func runs in a non-concurrent Txn.
return obj.graph.Copy()
}

// ExecGraphviz writes out the diagram of a graph to be used for visualization
// and debugging. You must not modify the graph (eg: during Lock) when calling
// this method.
func (obj *Engine) ExecGraphviz(dir string) error {
// No mutex needed here since this func runs in a non-concurrent Txn.
// No mutex is needed at this time because we only run this in txn's and
// it should only be run with debugging enabled. Bring your own mutex.
//obj.graphvizMutex.Lock()
//defer obj.graphvizMutex.Unlock()
obj.graphvizCount++ // increment
if dir == "" {
dir = obj.graphvizDirectory
}
if dir == "" { // XXX: hack for ergonomics
d := time.Now().UnixMilli()
dir = fmt.Sprintf("/tmp/dage-graphviz-%s-%d/", obj.Name, d)
obj.graphvizDirectory = dir
}
if !strings.HasSuffix(dir, "/") {
return fmt.Errorf("dir must end with a slash")
}
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
//dashedEdges, err := pgraph.NewGraph("dashedEdges")
//if err != nil {
// return err
//}
//for _, v1 := range obj.graph.Vertices() {
// // if it's a ChannelBasedSinkFunc...
// if cb, ok := v1.(*structs.ChannelBasedSinkFunc); ok {
// // ...then add a dashed edge to its output
// dashedEdges.AddEdge(v1, cb.Target, &pgraph.SimpleEdge{
// Name: "channel", // secret channel
// })
// }
// // if it's a ChannelBasedSourceFunc...
// if cb, ok := v1.(*structs.ChannelBasedSourceFunc); ok {
// // ...then add a dashed edge from its input
// dashedEdges.AddEdge(cb.Source, v1, &pgraph.SimpleEdge{
// Name: "channel", // secret channel
// })
// }
//}
gv := &pgraph.Graphviz{
Name: obj.graph.GetName(),
Filename: fmt.Sprintf("%s/%d.dot", dir, obj.graphvizCount),
Graphs: map[*pgraph.Graph]*pgraph.GraphvizOpts{
obj.graph: nil,
//dashedEdges: {
// Style: "dashed",
//},
},
}
if err := gv.Exec(); err != nil {
return err
}
return nil
}
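
// Hypothetical debugging usage: passing an empty dir picks (and remembers) a
// timestamped directory under /tmp, and writes 1.dot, 2.dot, ... in call
// order, which can then be rendered with the graphviz dot tool:
//
//	if err := engine.ExecGraphviz(""); err != nil {
//		return err
//	}
//	// later, in a shell:
//	// dot -Tpng /tmp/dage-graphviz-<name>-<ms>/1.dot -o 1.png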

// errAppend is a simple helper function.
func (obj *Engine) errAppend(err error) {
obj.errMutex.Lock()
obj.err = errwrap.Append(obj.err, err)
obj.errMutex.Unlock()
}

// state tracks some internal vertex-specific state information.
type state struct {
Func interfaces.Func
name string // cache a name here for safer concurrency
txn interfaces.Txn // API of GraphTxn struct to pass to each function
// started is true if this is a StreamableFunc, and Stream was started.
started bool
// epoch represents the "iteration count" through the graph. All values
// in a returned table should be part of the same epoch. This guarantees
// that they're all consistent with respect to each other.
epoch int64 // if this rolls over, we've been running for too many years
// result is the latest output from calling this function.
result types.Value
}

// String implements the fmt.Stringer interface for pretty printing!
func (obj *state) String() string {
if obj.name != "" {
return obj.name
}
return obj.Func.String()
}

// ops is either an addVertex or deleteVertex operation.
type ops interface {
}

// addVertex is one of the "ops" that are possible.
type addVertex struct {
f interfaces.Func
fn func(context.Context) error
}

// deleteVertex is one of the "ops" that are possible.
type deleteVertex struct {
f interfaces.Func
fn func(context.Context) error
}

// realEdgeCount tells us how many "logical" edges there are. We have shared
// (physical) edges which represent more than one logical edge, when the same
// value is passed in more than once. This takes those into account correctly.
func realEdgeCount(edges []pgraph.Edge) int {
total := 0
for _, edge := range edges {
fe, ok := edge.(*interfaces.FuncEdge)
if !ok {
total++
continue
}
total += len(fe.Args) // these can represent more than one edge!
}
return total
}
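
// For example, three physical edges whose FuncEdge Args are ["a"], ["b", "c"]
// and ["d"] would produce a realEdgeCount of four logical edges:
//
//	edges := []pgraph.Edge{
//		&interfaces.FuncEdge{Args: []string{"a"}},
//		&interfaces.FuncEdge{Args: []string{"b", "c"}},
//		&interfaces.FuncEdge{Args: []string{"d"}},
//	}
//	total := realEdgeCount(edges) // 4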