From 47d2a661bce5879c942c69bfbf26a53d70071d67 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 4 Sep 2023 18:17:05 -0400 Subject: [PATCH] lang: interfaces, funcs: Add a new graph engine called dage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds a new implementation of the function engine that runs the DAG function graph. This version is notable in that it can run a graph that changes shape over time. To make changes to the shape of the graph, you must use the new transaction (Txn) system. This system implements a simple garbage collector (GC) for scheduled removal of nodes that the transaction system "reverses" out of the graph. Special thanks to Samuel Gélineau for his help hacking on and debugging so much of this concurrency work with me. --- lang/funcs/dage/dage.go | 1587 ++++++++++++++++++++++++++++++++++ lang/funcs/dage/dage_test.go | 793 +++++++++++++++++ lang/funcs/dage/ref.go | 282 ++++++ lang/funcs/dage/txn.go | 626 ++++++++++++++ lang/funcs/dage/txn_test.go | 503 +++++++++++ lang/funcs/dage/util_test.go | 30 + lang/interfaces/func.go | 107 ++- 7 files changed, 3926 insertions(+), 2 deletions(-) create mode 100644 lang/funcs/dage/dage.go create mode 100644 lang/funcs/dage/dage_test.go create mode 100644 lang/funcs/dage/ref.go create mode 100644 lang/funcs/dage/txn.go create mode 100644 lang/funcs/dage/txn_test.go create mode 100644 lang/funcs/dage/util_test.go diff --git a/lang/funcs/dage/dage.go b/lang/funcs/dage/dage.go new file mode 100644 index 00000000..6caed7ad --- /dev/null +++ b/lang/funcs/dage/dage.go @@ -0,0 +1,1587 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) 
any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +// Package dage implements a DAG function engine. +// TODO: can we rename this to something more interesting? +package dage + +import ( + "context" + "fmt" + "os" + "sort" + "strings" + "sync" + "time" + + "github.com/purpleidea/mgmt/engine" + "github.com/purpleidea/mgmt/lang/funcs/structs" + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/util/errwrap" +) + +// Engine implements a dag engine which lets us "run" a dag of functions, but +// also allows us to modify it while we are running. +type Engine struct { + // Name is the name used for the instance of the engine and in the graph + // that is held within it. + Name string + + Hostname string + World engine.World + + Debug bool + Logf func(format string, v ...interface{}) + + // Callback can be specified as an alternative to using the Stream + // method to get events. If the context on it is cancelled, then it must + // shutdown quickly, because this means we are closing and want to + // disconnect. Whether you want to respect that is up to you, but the + // engine will not be able to close until you do. If specified, and an + // error has occurred, it will set that error property. + Callback func(context.Context, error) + + graph *pgraph.Graph // guarded by graphMutex + table map[interfaces.Func]types.Value // guarded by tableMutex + state map[interfaces.Func]*state + + // graphMutex wraps access to the graph. + graphMutex *sync.Mutex // TODO: &sync.RWMutex{} ? + + // tableMutex wraps access to the table map. 
+ tableMutex *sync.RWMutex + + // refCount keeps track of vertex and edge references across the entire + // graph. + refCount *RefCount + + // wgTxn blocks shutdown until the initial Txn has Reversed. + wgTxn *sync.WaitGroup + + // firstTxn checks to make sure wgTxn is only used for the first Txn. + firstTxn bool + + wg *sync.WaitGroup + + // pause/resume state machine signals + pauseChan chan struct{} + pausedChan chan struct{} + resumeChan chan struct{} + resumedChan chan struct{} + + // resend tracks which new nodes might need a new notification + resend map[interfaces.Func]struct{} + + // nodeWaitFns is a list of cleanup functions to run after we've begun + // resume, but before we've resumed completely. These are actions that + // we would like to do when paused from a deleteVertex operation, but + // that would deadlock if we did. + nodeWaitFns []func() + + // nodeWaitMutex wraps access to the nodeWaitFns list. + nodeWaitMutex *sync.Mutex + + // streamChan is used to send notifications to the outside world. + streamChan chan error + + loaded bool // are all of the funcs loaded? + loadedChan chan struct{} // funcs loaded signal + + startedChan chan struct{} // closes when Run() starts + + // wakeChan contains a message when someone has asked for us to wake up. + wakeChan chan struct{} + + // ag is the aggregation channel which cues up outgoing events. + ag chan error + + // leafSend specifies if we should do an ag send because we have + // activity at a leaf. + leafSend bool + + // isClosed tracks nodes that have closed. This list is purged as they + // are removed from the graph. + isClosed map[*state]struct{} + + // activity tracks nodes that are ready to send to ag. The main process + // loop decides if we have the correct set to do so. A corresponding + // value of true means we have regular activity, and a value of false + // means the node closed. + activity map[*state]struct{} + + // stateMutex wraps access to the isClosed and activity maps. 
+ stateMutex *sync.Mutex + + // stats holds some statistics and other debugging information. + stats *stats // guarded by statsMutex + + // statsMutex wraps access to the stats data. + statsMutex *sync.RWMutex + + // graphvizMutex wraps access to the Graphviz method. + graphvizMutex *sync.Mutex + + // graphvizCount keeps a running tally of how many graphs we've + // generated. This is useful for displaying a sequence (timeline) of + // graphs in a linear order. + graphvizCount int64 + + // graphvizDirectory stores the generated path for outputting graphviz + // files if one is not specified at runtime. + graphvizDirectory string +} + +// Setup sets up the internal datastructures needed for this engine. +func (obj *Engine) Setup() error { + var err error + obj.graph, err = pgraph.NewGraph(obj.Name) + if err != nil { + return err + } + obj.table = make(map[interfaces.Func]types.Value) + obj.state = make(map[interfaces.Func]*state) + obj.graphMutex = &sync.Mutex{} // TODO: &sync.RWMutex{} ? 
+ obj.tableMutex = &sync.RWMutex{} + + obj.refCount = (&RefCount{}).Init() + + obj.wgTxn = &sync.WaitGroup{} + + obj.wg = &sync.WaitGroup{} + + obj.pauseChan = make(chan struct{}) + obj.pausedChan = make(chan struct{}) + obj.resumeChan = make(chan struct{}) + obj.resumedChan = make(chan struct{}) + + obj.resend = make(map[interfaces.Func]struct{}) + + obj.nodeWaitFns = []func(){} + + obj.nodeWaitMutex = &sync.Mutex{} + + obj.streamChan = make(chan error) + obj.loadedChan = make(chan struct{}) + obj.startedChan = make(chan struct{}) + + obj.wakeChan = make(chan struct{}, 1) // hold up to one message + + obj.ag = make(chan error) + + obj.isClosed = make(map[*state]struct{}) + + obj.activity = make(map[*state]struct{}) + obj.stateMutex = &sync.Mutex{} + + obj.stats = &stats{ + runningList: make(map[*state]struct{}), + loadedList: make(map[*state]bool), + inputList: make(map[*state]int64), + } + obj.statsMutex = &sync.RWMutex{} + + obj.graphvizMutex = &sync.Mutex{} + return nil +} + +// Cleanup cleans up and frees memory and resources after everything is done. +func (obj *Engine) Cleanup() error { + obj.wg.Wait() // don't cleanup these before Run() finished + close(obj.pauseChan) // free + close(obj.pausedChan) + close(obj.resumeChan) + close(obj.resumedChan) + return nil +} + +// Txn returns a transaction that is suitable for adding and removing from the +// graph. You must run Setup before this method is called. +func (obj *Engine) Txn() interfaces.Txn { + if obj.refCount == nil { + panic("you must run setup before first use") + } + // The very first initial Txn must have a wait group to make sure if we + // shutdown (in error) that we can Reverse things before the Lock/Unlock + // loop shutsdown. 
+ var free func() + if !obj.firstTxn { + obj.firstTxn = true + obj.wgTxn.Add(1) + free = func() { + obj.wgTxn.Done() + } + } + return (&graphTxn{ + Lock: obj.Lock, + Unlock: obj.Unlock, + GraphAPI: obj, + RefCount: obj.refCount, // reference counting + FreeFunc: free, + }).init() +} + +// addVertex is the lockless version of the AddVertex function. This is needed +// so that AddEdge can add two vertices within the same lock. +func (obj *Engine) addVertex(f interfaces.Func) error { + if _, exists := obj.state[f]; exists { + // don't err dupes, because it makes using the AddEdge API yucky + return nil + } + + // add some extra checks for easier debugging + if f == nil { + return fmt.Errorf("missing func") + } + if f.Info() == nil { + return fmt.Errorf("missing func info") + } + sig := f.Info().Sig + if sig == nil { + return fmt.Errorf("missing func sig") + } + if sig.Kind != types.KindFunc { + return fmt.Errorf("must be kind func") + } + if err := f.Validate(); err != nil { + return errwrap.Wrapf(err, "node did not Validate") + } + + input := make(chan types.Value) + output := make(chan types.Value) + txn := obj.Txn() + + // This is the one of two places where we modify this map. To avoid + // concurrent writes, we only do this when we're locked! Anywhere that + // can read where we are locked must have a mutex around it or do the + // lookup when we're in an unlocked state. + node := &state{ + Func: f, + name: f.String(), // cache a name to avoid locks + + input: input, + output: output, + txn: txn, + + running: false, + wg: &sync.WaitGroup{}, + + rwmutex: &sync.RWMutex{}, + } + + init := &interfaces.Init{ + Hostname: obj.Hostname, + Input: node.input, + Output: node.output, + Txn: node.txn, + World: obj.World, + Debug: obj.Debug, + Logf: func(format string, v ...interface{}) { + // safe Logf in case f.String contains %? chars... + s := f.String() + ": " + fmt.Sprintf(format, v...) 
+ obj.Logf("%s", s) + }, + } + + if err := f.Init(init); err != nil { + return err + } + // only now, do we modify the graph + obj.state[f] = node + obj.graph.AddVertex(f) + return nil +} + +// AddVertex is the thread-safe way to add a vertex. You will need to call the +// engine Lock method before using this and the Unlock method afterwards. +func (obj *Engine) AddVertex(f interfaces.Func) error { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + if obj.Debug { + obj.Logf("Engine:AddVertex: %p %s", f, f) + } + + return obj.addVertex(f) // lockless version +} + +// AddEdge is the thread-safe way to add an edge. You will need to call the +// engine Lock method before using this and the Unlock method afterwards. This +// will automatically run AddVertex on both input vertices if they are not +// already part of the graph. You should only create DAG's as this function +// engine cannot handle cycles and this method will error if you cause a cycle. +func (obj *Engine) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) error { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + if obj.Debug { + obj.Logf("Engine:AddEdge %p %s: %p %s -> %p %s", fe, fe, f1, f1, f2, f2) + } + + // safety check to avoid cycles + g := obj.graph.Copy() + //g.AddVertex(f1) + //g.AddVertex(f2) + g.AddEdge(f1, f2, fe) + if _, err := g.TopologicalSort(); err != nil { + return err // not a dag + } + // if we didn't cycle, we can modify the real graph safely... + + // Does the graph already have these nodes in it? + hasf1 := obj.graph.HasVertex(f1) + //hasf2 := obj.graph.HasVertex(f2) + + if err := obj.addVertex(f1); err != nil { // lockless version + return err + } + if err := obj.addVertex(f2); err != nil { + // rollback f1 on error of f2 + obj.deleteVertex(f1) // ignore any error + return err + } + + // If f1 doesn't exist, let f1 (or it's incoming nodes) get the notify. + // If f2 is new, then it should get a new notification unless f1 is new. 
+ // But there's no guarantee we didn't AddVertex(f2); AddEdge(f1, f2, e), + // so resend if f1 already exists. Otherwise it's not a new notification. + // previously: `if hasf1 && !hasf2` + if hasf1 { + obj.resend[f2] = struct{}{} // resend notification to me + } + + obj.graph.AddEdge(f1, f2, fe) // replaces any existing edge here + + // This shouldn't error, since the test graph didn't find a cycle. + if _, err := obj.graph.TopologicalSort(); err != nil { + // programming error + panic(err) // not a dag + } + + return nil +} + +// deleteVertex is the lockless version of the DeleteVertex function. This is +// needed so that AddEdge can add two vertices within the same lock. It needs +// deleteVertex so it can rollback the first one if the second addVertex fails. +func (obj *Engine) deleteVertex(f interfaces.Func) error { + node, exists := obj.state[f] + if !exists { + return fmt.Errorf("vertex %p %s doesn't exist", f, f) + } + + if node.running { + // cancel the running vertex + node.cancel() // cancel inner ctx + + // We store this work to be performed later on in the main loop + // because this Wait() might be blocked by a defer Commit, which + // is itself blocked because this deleteVertex operation is part + // of a Commit. + obj.nodeWaitMutex.Lock() + obj.nodeWaitFns = append(obj.nodeWaitFns, func() { + node.wg.Wait() // While waiting, the Stream might cause a new Reverse Commit + node.txn.Free() // Clean up when done! + obj.stateMutex.Lock() + delete(obj.isClosed, node) // avoid memory leak + obj.stateMutex.Unlock() + }) + obj.nodeWaitMutex.Unlock() + } + + // This is the one of two places where we modify this map. To avoid + // concurrent writes, we only do this when we're locked! Anywhere that + // can read where we are locked must have a mutex around it or do the + // lookup when we're in an unlocked state. + delete(obj.state, f) + obj.graph.DeleteVertex(f) + return nil +} + +// DeleteVertex is the thread-safe way to delete a vertex. 
You will need to call +// the engine Lock method before using this and the Unlock method afterwards. +func (obj *Engine) DeleteVertex(f interfaces.Func) error { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + if obj.Debug { + obj.Logf("Engine:DeleteVertex: %p %s", f, f) + } + + return obj.deleteVertex(f) // lockless version +} + +// DeleteEdge is the thread-safe way to delete an edge. You will need to call +// the engine Lock method before using this and the Unlock method afterwards. +func (obj *Engine) DeleteEdge(fe *interfaces.FuncEdge) error { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + if obj.Debug { + f1, f2, found := obj.graph.LookupEdge(fe) + if found { + obj.Logf("Engine:DeleteEdge: %p %s -> %p %s", f1, f1, f2, f2) + } else { + obj.Logf("Engine:DeleteEdge: not found %p %s", fe, fe) + } + } + + // Don't bother checking if edge exists first and don't error if it + // doesn't because it might have gotten deleted when a vertex did, and + // so there's no need to complain for nothing. + obj.graph.DeleteEdge(fe) + + return nil +} + +// HasVertex is the thread-safe way to check if a vertex exists in the graph. +// You will need to call the engine Lock method before using this and the Unlock +// method afterwards. +func (obj *Engine) HasVertex(f interfaces.Func) bool { + obj.graphMutex.Lock() // XXX: should this be a RLock? + defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? + + return obj.graph.HasVertex(f) +} + +// LookupEdge is the thread-safe way to check which vertices (if any) exist +// between an edge in the graph. You will need to call the engine Lock method +// before using this and the Unlock method afterwards. +func (obj *Engine) LookupEdge(fe *interfaces.FuncEdge) (interfaces.Func, interfaces.Func, bool) { + obj.graphMutex.Lock() // XXX: should this be a RLock? + defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? 
+ + v1, v2, found := obj.graph.LookupEdge(fe) + if !found { + return nil, nil, found + } + f1, ok := v1.(interfaces.Func) + if !ok { + panic("not a Func") + } + f2, ok := v2.(interfaces.Func) + if !ok { + panic("not a Func") + } + return f1, f2, found +} + +// FindEdge is the thread-safe way to check which edge (if any) exists between +// two vertices in the graph. This is an important method in edge removal, +// because it's what you really need to know for DeleteEdge to work. Requesting +// a specific deletion isn't very sensical in this library when specified as the +// edge pointer, since we might replace it with a new edge that has new arg +// names. Instead, use this to look up what relationship you want, and then +// DeleteEdge to remove it. You will need to call the engine Lock method before +// using this and the Unlock method afterwards. +func (obj *Engine) FindEdge(f1, f2 interfaces.Func) *interfaces.FuncEdge { + obj.graphMutex.Lock() // XXX: should this be a RLock? + defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? + + edge := obj.graph.FindEdge(f1, f2) + if edge == nil { + return nil + } + fe, ok := edge.(*interfaces.FuncEdge) + if !ok { + panic("edge is not a FuncEdge") + } + + return fe +} + +// Lock must be used before modifying the running graph. Make sure to Unlock +// when done. +// XXX: should Lock take a context if we want to bail mid-way? +// TODO: could we replace pauseChan with SubscribedSignal ? +func (obj *Engine) Lock() { // pause + select { + case obj.pauseChan <- struct{}{}: + } + //obj.rwmutex.Lock() // TODO: or should it go right before pauseChan? + + // waiting for the pause to move to paused... + select { + case <-obj.pausedChan: + } + // this mutex locks at start of Run() and unlocks at finish of Run() + obj.graphMutex.Unlock() // safe to make changes now +} + +// Unlock must be used after modifying the running graph. Make sure to Lock +// beforehand. +// XXX: should Unlock take a context if we want to bail mid-way? 
+func (obj *Engine) Unlock() { // resume + // this mutex locks at start of Run() and unlocks at finish of Run() + obj.graphMutex.Lock() // no more changes are allowed + select { + case obj.resumeChan <- struct{}{}: + } + //obj.rwmutex.Unlock() // TODO: or should it go right after resumedChan? + + // waiting for the resume to move to resumed... + select { + case <-obj.resumedChan: + } +} + +// wake sends a message to the wake queue to wake up the main process function +// which would otherwise spin unnecessarily. This can be called anytime, and +// doesn't hurt, it only wastes cpu if there's nothing to do. This does NOT ever +// block, and that's important so it can be called from anywhere. +func (obj *Engine) wake(name string) { + // The mutex guards the len check to avoid this function sending two + // messages down the channel, because the second would block if the + // consumer isn't fast enough. This mutex makes this method effectively + // asynchronous. + //obj.wakeMutex.Lock() + //defer obj.wakeMutex.Unlock() + //if len(obj.wakeChan) > 0 { // collapse duplicate, pending wake signals + // return + //} + select { + case obj.wakeChan <- struct{}{}: // send to chan of length 1 + if obj.Debug { + obj.Logf("wake sent from: %s", name) + } + default: // this is a cheap alternative to avoid the mutex altogether! + if obj.Debug { + obj.Logf("wake skip from: %s", name) + } + // skip sending, we already have a message pending! + } +} + +// runNodeWaitFns is a helper to run the cleanup nodeWaitFns list. It clears the +// list after it runs. +func (obj *Engine) runNodeWaitFns() { + // The lock is probably not needed here, but it won't hurt either. + obj.nodeWaitMutex.Lock() + defer obj.nodeWaitMutex.Unlock() + for _, fn := range obj.nodeWaitFns { + fn() + } + obj.nodeWaitFns = []func(){} // clear +} + +// process is the inner loop that runs through the entire graph. 
It can be +// called successively safely, as it is roughly idempotent, and is used to push +// values through the graph. If it is interrupted, it can pick up where it left +// off on the next run. This does however require it to re-check some things, +// but that is the price we pay for being always available to unblock. +// Importantly, re-running this resumes work in progress even if there was +// caching, and that if interrupted, it'll be queued again so as to not drop a +// wakeChan notification! We know we've read all the pending incoming values, +// because the Stream reader call wake(). +func (obj *Engine) process(ctx context.Context) (reterr error) { + defer func() { + // catch programming errors + if r := recover(); r != nil { + obj.Logf("Panic in process: %+v", r) + reterr = fmt.Errorf("panic in process: %+v", r) + } + }() + + // Toposort in dependency order. + topoSort, err := obj.graph.TopologicalSort() + if err != nil { + return err + } + + loaded := true // assume we emitted at least one value for now... + + outDegree := obj.graph.OutDegree() // map[Vertex]int + + for _, v := range topoSort { + f, ok := v.(interfaces.Func) + if !ok { + panic("not a Func") + } + node, exists := obj.state[f] + if !exists { + panic(fmt.Sprintf("missing node in iterate: %s", f)) + } + + out, exists := outDegree[f] + if !exists { + panic(fmt.Sprintf("missing out degree in iterate: %s", f)) + } + //outgoing := obj.graph.OutgoingGraphVertices(f) // []pgraph.Vertex + //node.isLeaf = len(outgoing) == 0 + node.isLeaf = out == 0 // store + + // TODO: the obj.loaded stuff isn't really consumed currently + node.rwmutex.RLock() + if !node.loaded { + loaded = false // we were wrong + } + node.rwmutex.RUnlock() + + // TODO: memoize since graph shape doesn't change in this loop! 
+ incoming := obj.graph.IncomingGraphVertices(f) // []pgraph.Vertex + + // no incoming edges, so no incoming data + if len(incoming) == 0 || node.inputClosed { // we do this below + if !node.inputClosed { + node.inputClosed = true + close(node.input) + } + continue + } // else, process input data below... + + ready := true // assume all input values are ready for now... + inputClosed := true // assume all inputs have closed for now... + si := &types.Type{ + // input to functions are structs + Kind: types.KindStruct, + Map: node.Func.Info().Sig.Map, + Ord: node.Func.Info().Sig.Ord, + } + st := types.NewStruct(si) + // The above builds a struct with fields + // populated for each key (empty values) + // so we need to very carefully check if + // every field is received before we can + // safely send it downstream to an edge. + need := make(map[string]struct{}) // keys we need + for _, k := range node.Func.Info().Sig.Ord { + need[k] = struct{}{} + } + + for _, vv := range incoming { + ff, ok := vv.(interfaces.Func) + if !ok { + panic("not a Func") + } + obj.tableMutex.RLock() + value, exists := obj.table[ff] + obj.tableMutex.RUnlock() + if !exists { + ready = false // nope! + inputClosed = false // can't be, it's not even ready yet + break + } + // XXX: do we need a lock around reading obj.state? 
+ fromNode, exists := obj.state[ff] + if !exists { + panic(fmt.Sprintf("missing node in notify: %s", ff)) + } + if !fromNode.outputClosed { + inputClosed = false // if any still open, then we are + } + + // set each arg, since one value + // could get used for multiple + // function inputs (shared edge) + args := obj.graph.Adjacency()[ff][f].(*interfaces.FuncEdge).Args + for _, arg := range args { + // populate struct + if err := st.Set(arg, value); err != nil { + //panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, fromNode, err)) + keys := []string{} + for k := range st.Struct() { + keys = append(keys, k) + } + panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys)) + } + if _, exists := need[arg]; !exists { + keys := []string{} + for k := range st.Struct() { + keys = append(keys, k) + } + // could be either a duplicate or an unwanted field (edge name) + panic(fmt.Sprintf("unexpected struct key on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys)) + } + delete(need, arg) + } + } + + if !ready || len(need) != 0 { + continue // definitely continue, don't break here + } + + // previously it was closed, skip sending + if node.inputClosed { + continue + } + + // XXX: respect the info.Pure and info.Memo fields somewhere... + + // XXX: keep track of some state about who i sent to last before + // being interrupted so that I can avoid resending to some nodes + // if it's not necessary... + + // It's critical to avoid deadlock with this sending select that + // any events that could happen during this send can be + // preempted and that future executions of this function can be + // resumed. We must return with an error to let folks know that + // we were interrupted. 
+ obj.Logf("send to func `%s`", node) + select { + case node.input <- st: // send to function + obj.statsMutex.Lock() + val, _ := obj.stats.inputList[node] // val is # or zero + obj.stats.inputList[node] = val + 1 // increment + obj.statsMutex.Unlock() + // pass + + case <-node.ctx.Done(): // node died + obj.wake("node.ctx.Done()") // interrupted, so queue again + // This scenario *can* happen, although it is rare. It + // triggered the old `chFn && errFn == context.Canceled` + // case which we've now removed. + //return node.ctx.Err() // old behaviour which was wrong + continue // probably best to return and come finish later + + case <-ctx.Done(): + obj.wake("node ctx.Done()") // interrupted, so queue again + return ctx.Err() + } + + // It's okay if this section gets preempted and we re-run this + // function. The worst that happens is we end up sending the + // same input data a second time. This means that we could in + // theory be causing unnecessary graph changes (and locks which + // cause preemption here) if nodes that cause locks aren't + // skipping duplicate/identical input values! + if inputClosed && !node.inputClosed { + node.inputClosed = true + close(node.input) + } + + // XXX: Do we need to somehow wait to make sure that node has + // the time to send at least one output? + // XXX: We could add a counter to each input that gets passed + // through the function... Eg: if we pass in 4, we should wait + // until a 4 comes out the output side. But we'd need to change + // the signature of func for this... + + } // end topoSort loop + + // It's okay if this section gets preempted and we re-run this bit here. + obj.loaded = loaded // this gets reset when graph adds new nodes + + if !loaded { + return nil + } + + // Check each leaf and make sure they're all ready to send, for us to + // send anything to ag channel. In addition, we need at least one send + // message from any of the valid isLeaf nodes. 
Since this only runs if + // everyone is loaded, we just need to check for activty leaf nodes. + obj.stateMutex.Lock() + for node := range obj.activity { + if obj.leafSend { + break // early + } + + // down here we need `true` activity! + if node.isLeaf { // calculated above in the previous loop + obj.leafSend = true + break + } + } + obj.activity = make(map[*state]struct{}) // clear + //clear(obj.activity) // new clear + + // This check happens here after the send loop to make sure one value + // got in and we didn't close it off too early. + for node := range obj.isClosed { // these are closed + node.outputClosed = true + } + obj.stateMutex.Unlock() + + if !obj.leafSend { + return nil + } + + select { + case obj.ag <- nil: // send to aggregate channel if we have events + obj.Logf("aggregated send") + obj.leafSend = false // reset + + case <-ctx.Done(): + obj.leafSend = true // since we skipped the ag send! + obj.wake("process ctx.Done()") // interrupted, so queue again + return ctx.Err() + + // XXX: should we even allow this default case? + //default: + // // exit if we're not ready to send to ag + // obj.leafSend = true // since we skipped the ag send! + // obj.wake("process default") // interrupted, so queue again + } + + return nil +} + +// Run kicks off the main engine. This takes a mutex. When we're "paused" the +// mutex is temporarily released until we "resume". Those operations transition +// with the engine Lock and Unlock methods. It is recommended to only add +// vertices to the engine after it's running. If you add them before Run, then +// Run will cause a Lock/Unlock to occur to cycle them in. Lock and Unlock race +// with the cancellation of this Run main loop. Make sure to only call one at a +// time. +func (obj *Engine) Run(ctx context.Context) (reterr error) { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + + // XXX: can the above defer get called while we are already unlocked? + // XXX: is it a possibility if we use <-Started() ? 
+ + wg := &sync.WaitGroup{} + defer wg.Wait() + + defer func() { + // catch programming errors + if r := recover(); r != nil { + obj.Logf("Panic in Run: %+v", r) + reterr = fmt.Errorf("panic in Run: %+v", r) + } + }() + + ctx, cancel := context.WithCancel(ctx) // wrap parent + defer cancel() + + // Add a wait before the "started" signal runs so that Cleanup waits. + obj.wg.Add(1) + defer obj.wg.Done() + + // Send the start signal. + close(obj.startedChan) + + if n := obj.graph.NumVertices(); n > 0 { // hack to make the api easier + obj.Logf("graph contained %d vertices before Run", n) + wg.Add(1) + go func() { + defer wg.Done() + // kick the engine once to pull in any vertices from + // before we started running! + defer obj.Unlock() + obj.Lock() + }() + } + + once := &sync.Once{} + loadedSignal := func() { close(obj.loadedChan) } // only run once! + + // aggregate events channel + wg.Add(1) + go func() { + defer wg.Done() + defer close(obj.streamChan) + drain := false + for { + var err error + var ok bool + select { + case err, ok = <-obj.ag: // aggregated channel + if !ok { + return // channel shutdown + } + } + + if drain { + continue // no need to send more errors + } + + // TODO: check obj.loaded first? + once.Do(loadedSignal) + + // now send event... + if obj.Callback != nil { + // send stream signal (callback variant) + obj.Callback(ctx, err) + } else { + // send stream signal + select { + // send events or errors on streamChan + case obj.streamChan <- err: // send + case <-ctx.Done(): // when asked to exit + return + } + } + if err != nil { + cancel() // cancel the context! + //return // let the obj.ag channel drain + drain = true + } + } + }() + + // wgAg is a wait group that waits for all senders to the ag chan. + // Exceptionally, we don't close the ag channel until wgFor has also + // closed, because it can send to wg in process(). 
+ wgAg := &sync.WaitGroup{} + wgFor := &sync.WaitGroup{} + + // We need to keep the main loop running until everyone else has shut + // down. When the top context closes, we wait for everyone to finish, + // and then we shut down this main context. + //mainCtx, mainCancel := context.WithCancel(ctx) // wrap parent + mainCtx, mainCancel := context.WithCancel(context.Background()) // DON'T wrap parent, close on your own terms + defer mainCancel() + + // close the aggregate channel when everyone is done with it... + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-ctx.Done(): + } + + // don't wait and close ag before we're really done with Run() + wgAg.Wait() // wait for last ag user to close + obj.wgTxn.Wait() // wait for first txn as well + mainCancel() // only cancel after wgAg goroutines are done + wgFor.Wait() // wait for process loop to close before closing + close(obj.ag) // last one closes the ag channel + }() + + wgFn := &sync.WaitGroup{} // wg for process function runner + defer wgFn.Wait() // extra safety + + defer obj.runNodeWaitFns() // just in case + + wgFor.Add(1) // make sure we wait for the below process loop to exit... + defer wgFor.Done() + + // errProcess and processBreakFn are used to help exit following an err. + // This approach is needed because if we simply exited, we'd block the + // main loop below because various Stream functions are waiting on the + // Lock/Unlock cycle to be able to finish cleanly, shutdown, and unblock + // all the waitgroups so we can exit. + var errProcess error + var pausedProcess bool + processBreakFn := func(err error /*, paused bool*/) { + if err == nil { // a nil error won't cause ag to shutdown below + panic("expected error, not nil") + } + obj.Logf("process break") + select { + case obj.ag <- err: // send error to aggregate channel + case <-ctx.Done(): + } + cancel() // to unblock + //mainCancel() // NO! 
+ errProcess = err // set above error + //pausedProcess = paused // set this inline directly + } + if obj.Debug { + defer obj.Logf("exited main loop") + } + // we start off "running", but we'll have an empty graph initially... + for { + + // After we've resumed, we can try to exit. (shortcut) + // NOTE: If someone calls Lock(), which would send to + // obj.pauseChan, it *won't* deadlock here because mainCtx is + // only closed when all the worker waitgroups close first! + select { + case <-mainCtx.Done(): // when asked to exit + return errProcess // we exit happily + default: + } + + // run through our graph, check for pause request occasionally + for { + pausedProcess = false // reset + // if we're in errProcess, we skip the process loop! + if errProcess != nil { + break // skip this process loop + } + + // Start the process run for this iteration of the loop. + ctxFn, cancelFn := context.WithCancel(context.Background()) + // we run cancelFn() below to cleanup! + var errFn error + chanFn := make(chan struct{}) // normal exit signal + wgFn.Add(1) + go func() { + defer wgFn.Done() + defer close(chanFn) // signal that I exited + for { + if obj.Debug { + obj.Logf("process...") + } + if errFn = obj.process(ctxFn); errFn != nil { // store + if errFn != context.Canceled { + obj.Logf("process end err: %+v...", errFn) + } + return + } + if obj.Debug { + obj.Logf("process end...") + } + // If process finishes without error, we + // should sit here and wait until we get + // run again from a wake-up, or we exit. + select { + case <-obj.wakeChan: // wait until something has actually woken up... + obj.Logf("process wakeup...") + // loop! + case <-ctxFn.Done(): + errFn = context.Canceled + return + } + } + }() + + chFn := false + chPause := false + ctxExit := false + select { + //case <-obj.wakeChan: + // this happens entirely in the process inner, inner loop now. + + case <-chanFn: // process exited on it's own in error! 
+ chFn = true + + case <-obj.pauseChan: + obj.Logf("pausing...") + chPause = true + + case <-mainCtx.Done(): // when asked to exit + //return nil // we exit happily + ctxExit = true + } + + //fmt.Printf("chPause: %+v\n", chPause) // debug + //fmt.Printf("ctxExit: %+v\n", ctxExit) // debug + + cancelFn() // cancel the process function + wgFn.Wait() // wait for the process function to return + + pausedProcess = chPause // tell the below select + if errFn == nil { + // break on errors (needs to know if paused) + processBreakFn(fmt.Errorf("unexpected nil error in process")) + break + } + if errFn != nil && errFn != context.Canceled { + // break on errors (needs to know if paused) + processBreakFn(errwrap.Wrapf(errFn, "process error")) + break + } + //if errFn == context.Canceled { + // // ignore, we asked for it + //} + + if ctxExit { + return nil // we exit happily + } + if chPause { + break + } + + // This used to happen if a node (in the list we are + // sending to) dies, and we returned with: + // `case <-node.ctx.Done():` // node died + // return node.ctx.Err() + // which caused this scenario. + if chFn && errFn == context.Canceled { // very rare case + // programming error + processBreakFn(fmt.Errorf("legacy unhandled process state")) + break + } + + // programming error + //return fmt.Errorf("unhandled process state") + processBreakFn(fmt.Errorf("unhandled process state")) + break + } + // if we're in errProcess, we need to add back in the pauseChan! + if errProcess != nil && !pausedProcess { + select { + case <-obj.pauseChan: + obj.Logf("lower pausing...") + + // do we want this exit case? YES + case <-mainCtx.Done(): // when asked to exit + return errProcess + } + } + + // Toposort for paused workers. We run this before the actual + // pause completes, because the second we are paused, the graph + // could then immediately change. We don't need a lock in here + // because the mutex only unlocks when pause is complete below. 
+ //topoSort1, err := obj.graph.TopologicalSort() + //if err != nil { + // return err + //} + //for _, v := range topoSort1 {} + + // pause is complete + // no exit case from here, must be fully running or paused... + select { + case obj.pausedChan <- struct{}{}: + obj.Logf("paused!") + } + + // + // the graph changes shape right here... we are locked right now + // + + // wait until resumed/unlocked + select { + case <-obj.resumeChan: + obj.Logf("resuming...") + } + + // Do any cleanup needed from delete vertex. Or do we? + // We've ascertained that while we want this stuff to shutdown, + // and while we also know that a Stream() function running is a + // part of what we're waiting for to exit, it doesn't matter + // that it exits now! This is actually causing a deadlock + // because the pending Stream exit, might be calling a new + // Reverse commit, which means we're deadlocked. It's safe for + // the Stream to keep running, all it might do is needlessly add + // a new value to obj.table which won't bother us since we won't + // even use it in process. We _do_ want to wait for all of these + // before the final exit, but we already have that in a defer. + //obj.runNodeWaitFns() + + // Toposort to run/resume workers. (Bottom of toposort first!) 
+ topoSort2, err := obj.graph.TopologicalSort() + if err != nil { + return err + } + reversed := pgraph.Reverse(topoSort2) + for _, v := range reversed { + f, ok := v.(interfaces.Func) + if !ok { + panic("not a Func") + } + node, exists := obj.state[f] + if !exists { + panic(fmt.Sprintf("missing node in iterate: %s", f)) + } + + if node.running { // it's not a new vertex + continue + } + obj.loaded = false // reset this + node.running = true + + obj.statsMutex.Lock() + val, _ := obj.stats.inputList[node] // val is # or zero + obj.stats.inputList[node] = val // initialize to zero + obj.statsMutex.Unlock() + + innerCtx, innerCancel := context.WithCancel(ctx) // wrap parent (not mainCtx) + // we defer innerCancel() in the goroutine to cleanup! + node.ctx = innerCtx + node.cancel = innerCancel + + // run mainloop + wgAg.Add(1) + node.wg.Add(1) + go func(f interfaces.Func, node *state) { + defer node.wg.Done() + defer wgAg.Done() + defer node.cancel() // if we close, clean up and send the signal to anyone watching + if obj.Debug { + obj.Logf("Running func `%s`", node) + obj.statsMutex.Lock() + obj.stats.runningList[node] = struct{}{} + obj.stats.loadedList[node] = false + obj.statsMutex.Unlock() + } + + fn := func(nodeCtx context.Context) (reterr error) { + defer func() { + // catch programming errors + if r := recover(); r != nil { + obj.Logf("Panic in Stream of func `%s`: %+v", node, r) + reterr = fmt.Errorf("panic in Stream of func `%s`: %+v", node, r) + } + }() + return f.Stream(nodeCtx) + } + runErr := fn(node.ctx) // wrap with recover() + if obj.Debug { + obj.Logf("Exiting func `%s`", node) + obj.statsMutex.Lock() + delete(obj.stats.runningList, node) + obj.statsMutex.Unlock() + } + if runErr != nil { + obj.Logf("Erroring func `%s`: %+v", node, runErr) + // send to a aggregate channel + // the first to error will cause ag to + // shutdown, so make sure we can exit... 
+ select { + case obj.ag <- runErr: // send to aggregate channel + case <-node.ctx.Done(): + } + } + // if node never loaded, then we error in the node.output loop! + }(f, node) + + // consume output + wgAg.Add(1) + node.wg.Add(1) + go func(f interfaces.Func, node *state) { + defer node.wg.Done() + defer wgAg.Done() + defer func() { + // We record the fact that output + // closed, so we can eventually close + // the downstream node's input. + obj.stateMutex.Lock() + obj.isClosed[node] = struct{}{} // closed! + obj.stateMutex.Unlock() + // TODO: is this wake necessary? + obj.wake("closed") // closed, so wake up + }() + + for value := range node.output { // read from channel + if value == nil { + // bug! + obj.Logf("func `%s` got nil value", node) + panic("got nil value") + } + + obj.tableMutex.RLock() + cached, exists := obj.table[f] + obj.tableMutex.RUnlock() + if !exists { // first value received + // RACE: do this AFTER value is present! + //node.loaded = true // not yet please + obj.Logf("func `%s` started", node) + } else if value.Cmp(cached) == nil { + // skip if new value is same as previous + // if this happens often, it *might* be + // a bug in the function implementation + // FIXME: do we need to disable engine + // caching when using hysteresis? + obj.Logf("func `%s` skipped", node) + continue + } + obj.tableMutex.Lock() + obj.table[f] = value // save the latest + obj.tableMutex.Unlock() + node.rwmutex.Lock() + node.loaded = true // set *after* value is in :) + //obj.Logf("func `%s` changed", node) + node.rwmutex.Unlock() + + obj.statsMutex.Lock() + obj.stats.loadedList[node] = true + obj.statsMutex.Unlock() + + // Send a message to tell our ag channel + // that we might have sent an aggregated + // message here. They should check if we + // are a leaf and if we glitch or not... + // Make sure we do this before the wake. + obj.stateMutex.Lock() + obj.activity[node] = struct{}{} // activity! 
+ obj.stateMutex.Unlock() + + obj.wake("new value") // new value, so send wake up + + } // end for + + // no more output values are coming... + //obj.Logf("func `%s` stopped", node) + + // nodes that never loaded will cause the engine to hang + if !node.loaded { + select { + case obj.ag <- fmt.Errorf("func `%s` stopped before it was loaded", node): + case <-node.ctx.Done(): + return + } + } + + }(f, node) + + } // end for + + // Send new notifications in case any new edges are sending away + // to these... They might have already missed the notifications! + for k := range obj.resend { // resend TO these! + node, exists := obj.state[k] + if !exists { + continue + } + // Run as a goroutine to avoid erroring in parent thread. + wg.Add(1) + go func(node *state) { + defer wg.Done() + obj.Logf("resend to func `%s`", node) + obj.wake("resend") // new value, so send wake up + }(node) + } + obj.resend = make(map[interfaces.Func]struct{}) // reset + + // now check their states... + //for _, v := range reversed { + // v, ok := v.(interfaces.Func) + // if !ok { + // panic("not a Func") + // } + // // wait for startup? + // close(obj.state[v].startup) XXX: once? + //} + + // resume is complete + // no exit case from here, must be fully running or paused... + select { + case obj.resumedChan <- struct{}{}: + obj.Logf("resumed!") + } + + } // end for +} + +// Stream returns a channel that you can follow to get aggregated graph events. +// Do not block reading from this channel as you can hold up the entire engine. +func (obj *Engine) Stream() <-chan error { + return obj.streamChan +} + +// Loaded returns a channel that closes when the function engine loads. +func (obj *Engine) Loaded() <-chan struct{} { + return obj.loadedChan +} + +// Table returns a copy of the populated data table of values. We return a copy +// because since these values are constantly changing, we need an atomic +// snapshot to present to the consumer of this API. +// TODO: is this globally glitch consistent? 
+// TODO: do we need an API to return a single value? (wrapped in read locks) +func (obj *Engine) Table() map[interfaces.Func]types.Value { + obj.tableMutex.RLock() + defer obj.tableMutex.RUnlock() + table := make(map[interfaces.Func]types.Value) + for k, v := range obj.table { + //table[k] = v.Copy() // TODO: do we need to copy these values? + table[k] = v + } + return table +} + +// Apply is similar to Table in that it gives you access to the internal output +// table of data, the difference being that it instead passes this information +// to a function of your choosing and holds a read/write lock during the entire +// time that your function is synchronously executing. If you use this function +// to spawn any goroutines that read or write data, then you're asking for a +// panic. +// XXX: does this need to be a Lock? Can it be an RLock? Check callers! +func (obj *Engine) Apply(fn func(map[interfaces.Func]types.Value) error) error { + // XXX: does this need to be a Lock? Can it be an RLock? Check callers! + obj.tableMutex.Lock() // differs from above RLock around obj.table + defer obj.tableMutex.Unlock() + table := make(map[interfaces.Func]types.Value) + for k, v := range obj.table { + //table[k] = v.Copy() // TODO: do we need to copy these values? + table[k] = v + } + + return fn(table) +} + +// Started returns a channel that closes when the Run function finishes starting +// up. This is useful so that we can wait before calling any of the mutex things +// that would normally panic if Run wasn't started up first. +func (obj *Engine) Started() <-chan struct{} { + return obj.startedChan +} + +// NumVertices returns the number of vertices in the current graph. +func (obj *Engine) NumVertices() int { + // XXX: would this deadlock if we added this? + //obj.graphMutex.Lock() // XXX: should this be a RLock? + //defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? 
+ return obj.graph.NumVertices() +} + +// Stats returns some statistics in a human-readable form. +func (obj *Engine) Stats() string { + defer obj.statsMutex.RUnlock() + obj.statsMutex.RLock() + + return obj.stats.String() +} + +// Graphviz writes out the diagram of a graph to be used for visualization and +// debugging. You must not modify the graph (eg: during Lock) when calling this +// method. +func (obj *Engine) Graphviz(dir string) error { + // XXX: would this deadlock if we added this? + //obj.graphMutex.Lock() // XXX: should this be a RLock? + //defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? + + obj.graphvizMutex.Lock() + defer obj.graphvizMutex.Unlock() + + obj.graphvizCount++ // increment + + if dir == "" { + dir = obj.graphvizDirectory + } + if dir == "" { // XXX: hack for ergonomics + d := time.Now().UnixMilli() + dir = fmt.Sprintf("/tmp/dage-graphviz-%s-%d/", obj.Name, d) + obj.graphvizDirectory = dir + } + if !strings.HasSuffix(dir, "/") { + return fmt.Errorf("dir must end with a slash") + } + + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + + dashedEdges, err := pgraph.NewGraph("dashedEdges") + if err != nil { + return err + } + for _, v1 := range obj.graph.Vertices() { + // if it's a ChannelBasedSinkFunc... + if cb, ok := v1.(*structs.ChannelBasedSinkFunc); ok { + // ...then add a dashed edge to its output + dashedEdges.AddEdge(v1, cb.Target, &pgraph.SimpleEdge{ + Name: "channel", // secret channel + }) + } + // if it's a ChannelBasedSourceFunc... 
+ if cb, ok := v1.(*structs.ChannelBasedSourceFunc); ok { + // ...then add a dashed edge from its input + dashedEdges.AddEdge(cb.Source, v1, &pgraph.SimpleEdge{ + Name: "channel", // secret channel + }) + } + } + + gv := &pgraph.Graphviz{ + Name: obj.graph.GetName(), + Filename: fmt.Sprintf("%s/%d.dot", dir, obj.graphvizCount), + Graphs: map[*pgraph.Graph]*pgraph.GraphvizOpts{ + obj.graph: nil, + dashedEdges: { + Style: "dashed", + }, + }, + } + + if err := gv.Exec(); err != nil { + return err + } + return nil +} + +// state tracks some internal vertex-specific state information. +type state struct { + Func interfaces.Func + name string // cache a name here for safer concurrency + + input chan types.Value // the top level type must be a struct + output chan types.Value + txn interfaces.Txn // API of graphTxn struct to pass to each function + + //init bool // have we run Init on our func? + //ready bool // has it received all the args it needs at least once? + loaded bool // has the func run at least once ? + inputClosed bool // is our input closed? + outputClosed bool // is our output closed? + + isLeaf bool // is my out degree zero? + + running bool + wg *sync.WaitGroup + ctx context.Context // per state ctx (inner ctx) + cancel func() // cancel above inner ctx + + rwmutex *sync.RWMutex // concurrency guard for reading/modifying this state +} + +// String implements the fmt.Stringer interface for pretty printing! +func (obj *state) String() string { + if obj.name != "" { + return obj.name + } + + return obj.Func.String() +} + +// stats holds some statistics and other debugging information. +type stats struct { + + // runningList keeps track of which nodes are still running. + runningList map[*state]struct{} + + // loadedList keeps track of which nodes have loaded. + loadedList map[*state]bool + + // inputList keeps track of the number of inputs each node received. 
+ inputList map[*state]int64 +} + +// String implements the fmt.Stringer interface for printing out our collected +// statistics! +func (obj *stats) String() string { + // XXX: just build the lock into *stats instead of into our dage obj + s := "stats:\n" + { + s += "\trunning:\n" + names := []string{} + for k := range obj.runningList { + names = append(names, k.String()) + } + sort.Strings(names) + for _, name := range names { + s += fmt.Sprintf("\t * %s\n", name) + } + } + { + nodes := []*state{} + for k := range obj.loadedList { + nodes = append(nodes, k) + } + sort.Slice(nodes, func(i, j int) bool { return nodes[i].String() < nodes[j].String() }) + + s += "\tloaded:\n" + for _, node := range nodes { + if !obj.loadedList[node] { + continue + } + s += fmt.Sprintf("\t * %s\n", node) + } + + s += "\tnot loaded:\n" + for _, node := range nodes { + if obj.loadedList[node] { + continue + } + s += fmt.Sprintf("\t * %s\n", node) + } + } + { + s += "\tinput count:\n" + nodes := []*state{} + for k := range obj.inputList { + nodes = append(nodes, k) + } + //sort.Slice(nodes, func(i, j int) bool { return nodes[i].String() < nodes[j].String() }) + sort.Slice(nodes, func(i, j int) bool { return obj.inputList[nodes[i]] < obj.inputList[nodes[j]] }) + for _, node := range nodes { + s += fmt.Sprintf("\t * (%d) %s\n", obj.inputList[node], node) + } + } + return s +} diff --git a/lang/funcs/dage/dage_test.go b/lang/funcs/dage/dage_test.go new file mode 100644 index 00000000..9fcf44dc --- /dev/null +++ b/lang/funcs/dage/dage_test.go @@ -0,0 +1,793 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package dage + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" + "github.com/purpleidea/mgmt/util" +) + +type testFunc struct { + Name string + Type *types.Type + Func func(types.Value) (types.Value, error) + Meta *meta + + value types.Value + init *interfaces.Init +} + +func (obj *testFunc) String() string { return obj.Name } + +func (obj *testFunc) Info() *interfaces.Info { + return &interfaces.Info{ + Pure: true, + Memo: false, // TODO: should this be something we specify here? + Sig: obj.Type, + Err: obj.Validate(), + } +} + +func (obj *testFunc) Validate() error { + if obj.Meta == nil { + return fmt.Errorf("test case error: did you add the vertex to the vertices list?") + } + return nil +} + +func (obj *testFunc) Init(init *interfaces.Init) error { + obj.init = init + return nil +} + +func (obj *testFunc) Stream(ctx context.Context) error { + defer close(obj.init.Output) // the sender closes + defer obj.init.Logf("stream closed") + obj.init.Logf("stream startup") + + // make some placeholder value because obj.value is nil + constValue, err := types.ValueOfGolang("hello") + if err != nil { + return err // unlikely + } + + for { + select { + case input, ok := <-obj.init.Input: + if !ok { + obj.init.Logf("stream input closed") + obj.init.Input = nil // don't get two closes + // already sent one value, so we can shutdown + if obj.value != nil { + return nil // can't output any more + } + + obj.value = constValue + } else { + obj.init.Logf("stream got input type(%T) value: (%+v)", input, input) + 
if obj.Func == nil { + obj.value = constValue + } + + if obj.Func != nil { + //obj.init.Logf("running internal function...") + v, err := obj.Func(input) // run me! + if err != nil { + return err + } + obj.value = v + } + } + + case <-ctx.Done(): + return nil + } + + select { + case obj.init.Output <- obj.value: // send anything + // add some monitoring... + obj.Meta.wg.Add(1) + go func() { + // no need for lock here + defer obj.Meta.wg.Done() + if obj.Meta.debug { + obj.Meta.logf("sending an internal event!") + } + + select { + case obj.Meta.Events[obj.Name] <- struct{}{}: + case <-obj.Meta.ctx.Done(): + } + }() + + case <-ctx.Done(): + return nil + } + } +} + +type meta struct { + EventCount int + Event chan struct{} + Events map[string]chan struct{} + + ctx context.Context + wg *sync.WaitGroup + mutex *sync.Mutex + + debug bool + logf func(format string, v ...interface{}) +} + +func (obj *meta) Lock() { obj.mutex.Lock() } +func (obj *meta) Unlock() { obj.mutex.Unlock() } + +type dageTestOp func(*Engine, interfaces.Txn, *meta) error + +func TestDageTable(t *testing.T) { + + type test struct { // an individual test + name string + vertices []interfaces.Func + actions []dageTestOp + } + testCases := []test{} + { + testCases = append(testCases, test{ + name: "empty graph", + vertices: []interfaces.Func{}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + time.Sleep(1 * time.Second) // XXX: unfortunate + defer engine.Unlock() + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + // We don't expect an empty graph to send events. 
+ if meta.EventCount != 0 { + return fmt.Errorf("got too many stream events") + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + + testCases = append(testCases, test{ + name: "simple add vertex", + vertices: []interfaces.Func{f1}, // so the test engine can pass in debug/observability handles + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get any stream events") + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + // e1 arg name must match incoming edge to it + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple add edge", + vertices: []interfaces.Func{f1, f2}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + engine.Lock() + defer engine.Unlock() + // This newly added node should get a notification after it starts. 
+ return engine.AddEdge(f1, f2, e1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 2 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + }, + }) + } + { + // diamond + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + f3 := &testFunc{Name: "f3", Type: types.NewType("func(e2 str) str")} + f4 := &testFunc{Name: "f4", Type: types.NewType("func(e3 str, e4 str) str")} + e1 := testEdge("e1") + e2 := testEdge("e2") + e3 := testEdge("e3") + e4 := testEdge("e4") + + testCases = append(testCases, test{ + name: "simple add multiple edges", + vertices: []interfaces.Func{f1, f2, f3, f4}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + if err := engine.AddEdge(f1, f2, e1); err != nil { + return err + } + if err := engine.AddEdge(f1, f3, e2); err != nil { + return err + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + if err := engine.AddEdge(f2, f4, e3); err != nil { + return err + } + if err := engine.AddEdge(f3, f4, e4); err != nil { + return err + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + num := 1 + for { + if num == 0 { + break + } + select { + case _, ok := <-meta.Event: + if !ok { + return fmt.Errorf("unexpectedly channel close") + } + num-- + if meta.debug { + meta.logf("got an event!") + } + case <-meta.ctx.Done(): + return meta.ctx.Err() + } + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + meta.Lock() + defer 
meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + num := 1 + for { + if num == 0 { + break + } + bt := util.BlockedTimer{Seconds: 2} + defer bt.Cancel() + bt.Printf("waiting for f4...\n") + select { + case _, ok := <-meta.Events["f4"]: + bt.Cancel() + if !ok { + return fmt.Errorf("unexpectedly channel close") + } + num-- + if meta.debug { + meta.logf("got an event from f4!") + } + case <-meta.ctx.Done(): + return meta.ctx.Err() + } + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + + testCases = append(testCases, test{ + name: "simple add/delete vertex", + vertices: []interfaces.Func{f1}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + + //meta.Lock() + //defer meta.Unlock() + if meta.debug { + meta.logf("about to delete vertex f1!") + defer meta.logf("done deleting vertex f1!") + } + + return engine.DeleteVertex(f1) + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + // e1 arg name must match incoming edge to it + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple add/delete edge", + vertices: []interfaces.Func{f1, f2}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer 
engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + engine.Lock() + defer engine.Unlock() + // This newly added node should get a notification after it starts. + return engine.AddEdge(f1, f2, e1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 2 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.DeleteEdge(e1) + }, + }, + }) + } + + // the following tests use the txn instead of direct locks + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + + testCases = append(testCases, test{ + name: "txn simple add vertex", + vertices: []interfaces.Func{f1}, // so the test engine can pass in debug/observability handles + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddVertex(f1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get any stream events") + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + // e1 arg name must match incoming edge to it + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "txn simple add edge", + vertices: []interfaces.Func{f1, f2}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddVertex(f1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + // This newly 
added node should get a notification after it starts. + return txn.AddEdge(f1, f2, e1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 2 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + }, + }) + } + { + // diamond + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + f3 := &testFunc{Name: "f3", Type: types.NewType("func(e2 str) str")} + f4 := &testFunc{Name: "f4", Type: types.NewType("func(e3 str, e4 str) str")} + e1 := testEdge("e1") + e2 := testEdge("e2") + e3 := testEdge("e3") + e4 := testEdge("e4") + + testCases = append(testCases, test{ + name: "txn simple add multiple edges", + vertices: []interfaces.Func{f1, f2, f3, f4}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddVertex(f1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddEdge(f1, f2, e1).AddEdge(f1, f3, e2).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddEdge(f2, f4, e3).AddEdge(f3, f4, e4).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + num := 1 + for { + if num == 0 { + break + } + select { + case _, ok := <-meta.Event: + if !ok { + return fmt.Errorf("unexpectedly channel close") + } + num-- + if meta.debug { + meta.logf("got an event!") + } + case <-meta.ctx.Done(): + return meta.ctx.Err() + } + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + num := 1 + for { + 
if num == 0 { + break + } + bt := util.BlockedTimer{Seconds: 2} + defer bt.Cancel() + bt.Printf("waiting for f4...\n") + select { + case _, ok := <-meta.Events["f4"]: + bt.Cancel() + if !ok { + return fmt.Errorf("unexpectedly channel close") + } + num-- + if meta.debug { + meta.logf("got an event from f4!") + } + case <-meta.ctx.Done(): + return meta.ctx.Err() + } + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + + testCases = append(testCases, test{ + name: "txn simple add/delete vertex", + vertices: []interfaces.Func{f1}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddVertex(f1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + if meta.debug { + meta.logf("about to delete vertex f1!") + defer meta.logf("done deleting vertex f1!") + } + + return txn.DeleteVertex(f1).Commit() + }, + }, + }) + } + //{ + // f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + // // e1 arg name must match incoming edge to it + // f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + // e1 := testEdge("e1") + // + // testCases = append(testCases, test{ + // name: "txn simple add/delete edge", + // vertices: []interfaces.Func{f1, f2}, + // actions: []dageTestOp{ + // func(engine *Engine, txn interfaces.Txn, meta *meta) error { + // return txn.AddVertex(f1).Commit() + // }, + // func(engine *Engine, txn interfaces.Txn, meta *meta) error { + // time.Sleep(1 * time.Second) // XXX: unfortunate + // // This newly added node should get a notification after it starts. 
+ // return txn.AddEdge(f1, f2, e1).Commit() + // }, + // func(engine *Engine, txn interfaces.Txn, meta *meta) error { + // time.Sleep(1 * time.Second) // XXX: unfortunate + // meta.Lock() + // defer meta.Unlock() + // if meta.EventCount < 2 { + // return fmt.Errorf("didn't get enough stream events") + // } + // return nil + // }, + // func(engine *Engine, txn interfaces.Txn, meta *meta) error { + // return txn.DeleteEdge(e1).Commit() // XXX: not implemented + // }, + // }, + // }) + //} + + if testing.Short() { + t.Logf("available tests:") + } + names := []string{} + for index, tc := range testCases { // run all the tests + if tc.name == "" { + t.Errorf("test #%d: not named", index) + continue + } + if util.StrInList(tc.name, names) { + t.Errorf("test #%d: duplicate sub test name of: %s", index, tc.name) + continue + } + names = append(names, tc.name) + + //if index != 3 { // hack to run a subset (useful for debugging) + //if tc.name != "simple txn" { + // continue + //} + + testName := fmt.Sprintf("test #%d (%s)", index, tc.name) + if testing.Short() { // make listing tests easier + t.Logf("%s", testName) + continue + } + t.Run(testName, func(t *testing.T) { + name, vertices, actions := tc.name, tc.vertices, tc.actions + + t.Logf("\n\ntest #%d (%s) ----------------\n\n", index, name) + + //logf := func(format string, v ...interface{}) { + // t.Logf(fmt.Sprintf("test #%d", index)+": "+format, v...) + //} + + //now := time.Now() + + wg := &sync.WaitGroup{} + defer wg.Wait() // defer is correct b/c we're in a func! 
+ + min := 5 * time.Second // approx min time needed for the test + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if deadline, ok := t.Deadline(); ok { + d := deadline.Add(-min) + //t.Logf(" now: %+v", now) + //t.Logf(" d: %+v", d) + newCtx, cancel := context.WithDeadline(ctx, d) + ctx = newCtx + defer cancel() + } + + debug := testing.Verbose() // set via the -test.v flag to `go test` + + meta := &meta{ + Event: make(chan struct{}), + Events: make(map[string]chan struct{}), + + ctx: ctx, + wg: &sync.WaitGroup{}, + mutex: &sync.Mutex{}, + + debug: debug, + logf: func(format string, v ...interface{}) { + // safe Logf in case f.String contains %? chars... + s := fmt.Sprintf(format, v...) + t.Logf("%s", s) + }, + } + defer meta.wg.Wait() + + for _, f := range vertices { + testFunc, ok := f.(*testFunc) + if !ok { + t.Errorf("bad test function: %+v", f) + return + } + meta.Events[testFunc.Name] = make(chan struct{}) + testFunc.Meta = meta // add the handle + } + + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-ctx.Done(): + t.Logf("cancelling test...") + } + }() + + engine := &Engine{ + Name: "dage", + + Debug: debug, + Logf: t.Logf, + } + + if err := engine.Setup(); err != nil { + t.Errorf("could not setup engine: %+v", err) + return + } + defer engine.Cleanup() + + wg.Add(1) + go func() { + defer wg.Done() + if err := engine.Run(ctx); err != nil { + t.Errorf("error while running engine: %+v", err) + return + } + t.Logf("engine shutdown cleanly...") + }() + + <-engine.Started() // wait for startup (will not block forever) + + txn := engine.Txn() + defer txn.Free() // remember to call Free() + + wg.Add(1) + go func() { + defer wg.Done() + ch := engine.Stream() + for { + select { + case err, ok := <-ch: // channel must close to shutdown + if !ok { + return + } + meta.Lock() + meta.EventCount++ + meta.Unlock() + meta.wg.Add(1) + go func() { + // no need for lock here + defer meta.wg.Done() + if meta.debug { + meta.logf("sending 
an event!") + } + select { + case meta.Event <- struct{}{}: + case <-meta.ctx.Done(): + } + }() + if err != nil { + t.Errorf("graph error event: %v", err) + continue + } + t.Logf("graph stream event!") + } + } + }() + + // Run a list of actions. Any error kills it all. + t.Logf("starting actions...") + for i, action := range actions { + t.Logf("running action %d...", i+1) + if err := action(engine, txn, meta); err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: action #%d failed with: %+v", index, i, err) + break // so that cancel runs + } + } + + t.Logf("test done...") + cancel() + }) + } + + if testing.Short() { + t.Skip("skipping all tests...") + } +} diff --git a/lang/funcs/dage/ref.go b/lang/funcs/dage/ref.go new file mode 100644 index 00000000..74cd80d2 --- /dev/null +++ b/lang/funcs/dage/ref.go @@ -0,0 +1,282 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +// Package dage implements a DAG function engine. +// TODO: can we rename this to something more interesting? +package dage + +import ( + "fmt" + "sync" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/util/errwrap" +) + +// RefCount keeps track of vertex and edge references across the entire graph. 
+// Make sure to lock access somehow, ideally with the provided Locker interface. +type RefCount struct { + // mutex locks this database for read or write. + mutex *sync.Mutex + + // vertices is a reference count of the number of vertices used. + vertices map[interfaces.Func]int64 + + // edges is a reference count of the number of edges used. + edges map[*RefCountEdge]int64 // TODO: hash *RefCountEdge as a key instead +} + +// RefCountEdge is a virtual "hash" entry for the RefCount edges map key. +type RefCountEdge struct { + f1 interfaces.Func + f2 interfaces.Func + arg string +} + +// String prints a representation of the references held. +func (obj *RefCount) String() string { + s := "" + s += fmt.Sprintf("vertices (%d):\n", len(obj.vertices)) + for vertex, count := range obj.vertices { + s += fmt.Sprintf("\tvertex (%d): %p %s\n", count, vertex, vertex) + } + s += fmt.Sprintf("edges (%d):\n", len(obj.edges)) + for edge, count := range obj.edges { + s += fmt.Sprintf("\tedge (%d): %p %s -> %p %s # %s\n", count, edge.f1, edge.f1, edge.f2, edge.f2, edge.arg) + } + return s +} + +// Init must be called to initialized the struct before first use. +func (obj *RefCount) Init() *RefCount { + obj.mutex = &sync.Mutex{} + obj.vertices = make(map[interfaces.Func]int64) + obj.edges = make(map[*RefCountEdge]int64) + return obj // return self so it can be called in a chain +} + +// Lock the mutex that should be used when reading or writing from this. +func (obj *RefCount) Lock() { obj.mutex.Lock() } + +// Unlock the mutex that should be used when reading or writing from this. +func (obj *RefCount) Unlock() { obj.mutex.Unlock() } + +// VertexInc increments the reference count for the input vertex. It returns +// true if the reference count for this vertex was previously undefined or zero. +// True usually means we'd want to actually add this vertex now. If you attempt +// to increment a vertex which already has a less than zero count, then this +// will panic. 
This situation is likely impossible unless someone modified the +// reference counting struct directly. +func (obj *RefCount) VertexInc(f interfaces.Func) bool { + count, _ := obj.vertices[f] + obj.vertices[f] = count + 1 + if count == -1 { // unlikely, but catch any bugs + panic("negative reference count") + } + return count == 0 +} + +// VertexDec decrements the reference count for the input vertex. It returns +// true if the reference count for this vertex is now zero. True usually means +// we'd want to actually remove this vertex now. If you attempt to decrement a +// vertex which already has a zero count, then this will panic. +func (obj *RefCount) VertexDec(f interfaces.Func) bool { + count, _ := obj.vertices[f] + obj.vertices[f] = count - 1 + if count == 0 { + panic("negative reference count") + } + return count == 1 // now it's zero +} + +// EdgeInc increments the reference count for the input edge. It adds a +// reference for each arg name in the edge. Since this also increments the +// references for the two input vertices, it returns the corresponding two +// boolean values for these calls. (This function makes two calls to VertexInc.) +func (obj *RefCount) EdgeInc(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) (bool, bool) { + for _, arg := range fe.Args { // ref count each arg + r := obj.makeEdge(f1, f2, arg) + count := obj.edges[r] + obj.edges[r] = count + 1 + if count == -1 { // unlikely, but catch any bugs + panic("negative reference count") + } + } + + return obj.VertexInc(f1), obj.VertexInc(f2) +} + +// EdgeDec decrements the reference count for the input edge. It removes a +// reference for each arg name in the edge. Since this also decrements the +// references for the two input vertices, it returns the corresponding two +// boolean values for these calls. (This function makes two calls to VertexDec.) 
+func (obj *RefCount) EdgeDec(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) (bool, bool) { + for _, arg := range fe.Args { // ref count each arg + r := obj.makeEdge(f1, f2, arg) + count := obj.edges[r] + obj.edges[r] = count - 1 + if count == 0 { + panic("negative reference count") + } + } + + return obj.VertexDec(f1), obj.VertexDec(f2) +} + +// FreeVertex removes exactly one entry from the Vertices list or it errors. +func (obj *RefCount) FreeVertex(f interfaces.Func) error { + if count, exists := obj.vertices[f]; !exists || count != 0 { + return fmt.Errorf("no vertex of count zero found") + } + delete(obj.vertices, f) + return nil +} + +// FreeEdge removes exactly one entry from the Edges list or it errors. +func (obj *RefCount) FreeEdge(f1, f2 interfaces.Func, arg string) error { + found := []*RefCountEdge{} + for k, count := range obj.edges { + //if k == nil { // programming error + // continue + //} + if k.f1 == f1 && k.f2 == f2 && k.arg == arg && count == 0 { + found = append(found, k) + } + } + if len(found) > 1 { + return fmt.Errorf("inconsistent ref count for edge") + } + if len(found) == 0 { + return fmt.Errorf("no edge of count zero found") + } + delete(obj.edges, found[0]) // delete from map + return nil +} + +// GC runs the garbage collector on any zeroed references. Note the distinction +// between count == 0 (please delete now) and absent from the map. 
+func (obj *RefCount) GC(graphAPI interfaces.GraphAPI) error { + // debug + //fmt.Printf("start refs\n%s", obj.String()) + //defer func() { fmt.Printf("end refs\n%s", obj.String()) }() + free := make(map[interfaces.Func]map[interfaces.Func][]string) // f1 -> f2 + for x, count := range obj.edges { + if count != 0 { // we only care about freed things + continue + } + if _, exists := free[x.f1]; !exists { + free[x.f1] = make(map[interfaces.Func][]string) + } + if _, exists := free[x.f1][x.f2]; !exists { + free[x.f1][x.f2] = []string{} + } + free[x.f1][x.f2] = append(free[x.f1][x.f2], x.arg) // exists as refcount zero + } + + // These edges have a refcount of zero. + for f1, x := range free { + for f2, args := range x { + for _, arg := range args { + edge := graphAPI.FindEdge(f1, f2) + // any errors here are programming errors + if edge == nil { + return fmt.Errorf("missing edge from %p %s -> %p %s", f1, f1, f2, f2) + } + + once := false // sanity check + newArgs := []string{} + for _, a := range edge.Args { + if arg == a { + if once { + // programming error, duplicate arg + return fmt.Errorf("duplicate arg (%s) in edge", arg) + } + once = true + continue + } + newArgs = append(newArgs, a) + } + + if len(edge.Args) == 1 { // edge gets deleted + if a := edge.Args[0]; a != arg { // one arg + return fmt.Errorf("inconsistent arg: %s != %s", a, arg) + } + + if err := graphAPI.DeleteEdge(edge); err != nil { + return errwrap.Wrapf(err, "edge deletion error") + } + } else { + // just remove the one arg for now + edge.Args = newArgs + } + + // always free the database entry + if err := obj.FreeEdge(f1, f2, arg); err != nil { + return err + } + } + } + } + + // Now check the vertices... 
+ vs := []interfaces.Func{} + for vertex, count := range obj.vertices { + if count != 0 { + continue + } + + // safety check, vertex is still in use by an edge + for x := range obj.edges { + if x.f1 == vertex || x.f2 == vertex { + // programming error + return fmt.Errorf("vertex unexpectedly still in use: %p %s", vertex, vertex) + } + } + + vs = append(vs, vertex) + } + + for _, vertex := range vs { + if err := graphAPI.DeleteVertex(vertex); err != nil { + return errwrap.Wrapf(err, "vertex deletion error") + } + // free the database entry + if err := obj.FreeVertex(vertex); err != nil { + return err + } + } + + return nil +} + +// makeEdge looks up an edge with the "hash" input we are seeking. If it doesn't +// find a match, it returns a new one with those fields. +func (obj *RefCount) makeEdge(f1, f2 interfaces.Func, arg string) *RefCountEdge { + for k := range obj.edges { + //if k == nil { // programming error + // continue + //} + if k.f1 == f1 && k.f2 == f2 && k.arg == arg { + return k + } + } + return &RefCountEdge{ // not found, so make a new one! + f1: f1, + f2: f2, + arg: arg, + } +} diff --git a/lang/funcs/dage/txn.go b/lang/funcs/dage/txn.go new file mode 100644 index 00000000..41a669cc --- /dev/null +++ b/lang/funcs/dage/txn.go @@ -0,0 +1,626 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +// Package dage implements a DAG function engine. +// TODO: can we rename this to something more interesting? +package dage + +import ( + "fmt" + "sort" + "sync" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/pgraph" +) + +// PostReverseCommit specifies that if we run Reverse, and we had previous items +// pending for Commit, that we should Commit them after our Reverse runs. +// Otherwise they remain on the pending queue and wait for you to run Commit. +const PostReverseCommit = false + +// GraphvizDebug enables writing graphviz graphs on each commit. This is very +// slow. +const GraphvizDebug = false + +// opapi is the input for any op. This allows us to keeps things compact and it +// also allows us to change API slightly without re-writing code. +type opapi struct { + GraphAPI interfaces.GraphAPI + RefCount *RefCount +} + +// opfn is an interface that holds the normal op, and the reverse op if we need +// to rollback from the forward fn. Implementations of each op can decide to +// store some internal state when running the forward op which might be needed +// for the possible future reverse op. +type opfn interface { + fmt.Stringer + + Fn(*opapi) error + Rev(*opapi) error +} + +type opfnSkipRev interface { + opfn + + // Skip tells us if this op should be skipped from reversing. + Skip() bool + + // SetSkip specifies that this op should be skipped from reversing. + SetSkip(bool) +} + +type opfnFlag interface { + opfn + + // Flag reads some misc data. + Flag() interface{} + + // SetFlag sets some misc data. + SetFlag(interface{}) +} + +// revOp returns the reversed op from an op by packing or unpacking it. +func revOp(op opfn) opfn { + if skipOp, ok := op.(opfnSkipRev); ok && skipOp.Skip() { + return nil // skip + } + + // XXX: is the reverse of a reverse just undoing it? 
maybe not but might not matter for us + if newOp, ok := op.(*opRev); ok { + + if newFlagOp, ok := op.(opfnFlag); ok { + newFlagOp.SetFlag("does this rev of rev even happen?") + } + + return newOp.Op // unpack it + } + + return &opRev{ + Op: op, + + opFlag: &opFlag{}, + } // pack it +} + +// opRev switches the Fn and Rev methods by wrapping the contained op in each +// other. +type opRev struct { + Op opfn + + *opFlag +} + +func (obj *opRev) Fn(opapi *opapi) error { + return obj.Op.Rev(opapi) +} + +func (obj *opRev) Rev(opapi *opapi) error { + return obj.Op.Fn(opapi) +} + +func (obj *opRev) String() string { + return "rev(" + obj.Op.String() + ")" // TODO: is this correct? +} + +type opSkip struct { + skip bool +} + +func (obj *opSkip) Skip() bool { + return obj.skip +} + +func (obj *opSkip) SetSkip(skip bool) { + obj.skip = skip +} + +type opFlag struct { + flag interface{} +} + +func (obj *opFlag) Flag() interface{} { + return obj.flag +} + +func (obj *opFlag) SetFlag(flag interface{}) { + obj.flag = flag +} + +type opAddVertex struct { + F interfaces.Func + + *opSkip + *opFlag +} + +func (obj *opAddVertex) Fn(opapi *opapi) error { + if opapi.RefCount.VertexInc(obj.F) { + // add if we're the first reference + return opapi.GraphAPI.AddVertex(obj.F) + } + + return nil +} + +func (obj *opAddVertex) Rev(opapi *opapi) error { + opapi.RefCount.VertexDec(obj.F) + // any removal happens in gc + return nil +} + +func (obj *opAddVertex) String() string { + return fmt.Sprintf("AddVertex: %+v", obj.F) +} + +type opAddEdge struct { + F1 interfaces.Func + F2 interfaces.Func + FE *interfaces.FuncEdge + + *opSkip + *opFlag +} + +func (obj *opAddEdge) Fn(opapi *opapi) error { + if obj.F1 == obj.F2 { // simplify below code/logic with this easy check + return fmt.Errorf("duplicate vertex cycle") + } + + opapi.RefCount.EdgeInc(obj.F1, obj.F2, obj.FE) + + fe := obj.FE // squish multiple edges together if one already exists + if edge := opapi.GraphAPI.FindEdge(obj.F1, obj.F2); edge != 
nil { + args := make(map[string]struct{}) + for _, x := range obj.FE.Args { + args[x] = struct{}{} + } + for _, x := range edge.Args { + args[x] = struct{}{} + } + if len(args) != len(obj.FE.Args)+len(edge.Args) { + // programming error + return fmt.Errorf("duplicate arg found") + } + newArgs := []string{} + for x := range args { + newArgs = append(newArgs, x) + } + sort.Strings(newArgs) // for consistency? + fe = &interfaces.FuncEdge{ + Args: newArgs, + } + } + + // The dage API currently smooshes together any existing edge args with + // our new edge arg names. It also adds the vertices if needed. + if err := opapi.GraphAPI.AddEdge(obj.F1, obj.F2, fe); err != nil { + return err + } + + return nil +} + +func (obj *opAddEdge) Rev(opapi *opapi) error { + opapi.RefCount.EdgeDec(obj.F1, obj.F2, obj.FE) + return nil +} + +func (obj *opAddEdge) String() string { + return fmt.Sprintf("AddEdge: %+v -> %+v (%+v)", obj.F1, obj.F2, obj.FE) +} + +type opDeleteVertex struct { + F interfaces.Func + + *opSkip + *opFlag +} + +func (obj *opDeleteVertex) Fn(opapi *opapi) error { + if opapi.RefCount.VertexDec(obj.F) { + //delete(opapi.RefCount.Vertices, obj.F) // don't GC this one + if err := opapi.RefCount.FreeVertex(obj.F); err != nil { + panic("could not free vertex") + } + return opapi.GraphAPI.DeleteVertex(obj.F) // do it here instead + } + return nil +} + +func (obj *opDeleteVertex) Rev(opapi *opapi) error { + if opapi.RefCount.VertexInc(obj.F) { + return opapi.GraphAPI.AddVertex(obj.F) + } + return nil +} + +func (obj *opDeleteVertex) String() string { + return fmt.Sprintf("DeleteVertex: %+v", obj.F) +} + +// graphTxn holds the state of a transaction and runs it when needed. When this +// has been setup and initialized, it implements the Txn API that can be used by +// functions in their Stream method to modify the function graph while it is +// "running". +type graphTxn struct { + // Lock is a handle to the lock function to call before the operation. 
+ Lock func() + + // Unlock is a handle to the unlock function to call before the + // operation. + Unlock func() + + // GraphAPI is a handle pointing to the graph API implementation we're + // using for any txn operations. + GraphAPI interfaces.GraphAPI + + // RefCount keeps track of vertex and edge references across the entire + // graph. + RefCount *RefCount + + // FreeFunc is a function that will get called by a well-behaved user + // when we're done with this Txn. + FreeFunc func() + + // ops is a list of operations to run on a graph + ops []opfn + + // rev is a list of reverse operations to run on a graph + rev []opfn + + // mutex guards changes to the ops list + mutex *sync.Mutex +} + +// init must be called to initialized the struct before first use. This is +// private because the creator, not the user should run it. +func (obj *graphTxn) init() interfaces.Txn { + obj.ops = []opfn{} + obj.rev = []opfn{} + obj.mutex = &sync.Mutex{} + + return obj // return self so it can be called in a chain +} + +// Copy returns a new child Txn that has the same handles, but a separate state. +// This allows you to do an Add*/Commit/Reverse that isn't affected by a +// different user of this transaction. +// TODO: FreeFunc isn't well supported here. Replace or remove this entirely? +func (obj *graphTxn) Copy() interfaces.Txn { + txn := &graphTxn{ + Lock: obj.Lock, + Unlock: obj.Unlock, + GraphAPI: obj.GraphAPI, + RefCount: obj.RefCount, // this is shared across all txn's + // FreeFunc is shared with the parent. + } + return txn.init() +} + +// AddVertex adds a vertex to the running graph. The operation will get +// completed when Commit is run. +// XXX: should this be pgraph.Vertex instead of interfaces.Func ? 
+func (obj *graphTxn) AddVertex(f interfaces.Func) interfaces.Txn { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + opfn := &opAddVertex{ + F: f, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + + return obj // return self so it can be called in a chain +} + +// AddEdge adds an edge to the running graph. The operation will get completed +// when Commit is run. +// XXX: should this be pgraph.Vertex instead of interfaces.Func ? +// XXX: should this be pgraph.Edge instead of *interfaces.FuncEdge ? +func (obj *graphTxn) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) interfaces.Txn { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + opfn := &opAddEdge{ + F1: f1, + F2: f2, + FE: fe, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + + // NOTE: we can't build obj.rev yet because in this case, we'd need to + // know if the runtime graph contained one of the two pre-existing + // vertices or not, or if it would get added implicitly by this op! + + return obj // return self so it can be called in a chain +} + +// DeleteVertex adds a vertex to the running graph. The operation will get +// completed when Commit is run. +// XXX: should this be pgraph.Vertex instead of interfaces.Func ? +func (obj *graphTxn) DeleteVertex(f interfaces.Func) interfaces.Txn { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + opfn := &opDeleteVertex{ + F: f, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + + return obj // return self so it can be called in a chain +} + +// AddGraph adds a graph to the running graph. The operation will get completed +// when Commit is run. This function panics if your graph contains vertices that +// are not of type interfaces.Func or if your edges are not of type +// *interfaces.FuncEdge. 
+func (obj *graphTxn) AddGraph(g *pgraph.Graph) interfaces.Txn { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + for _, v := range g.Vertices() { + f, ok := v.(interfaces.Func) + if !ok { + panic("not a Func") + } + //obj.AddVertex(f) // easy + opfn := &opAddVertex{ // replicate AddVertex + F: f, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + } + + for v1, m := range g.Adjacency() { + f1, ok := v1.(interfaces.Func) + if !ok { + panic("not a Func") + } + for v2, e := range m { + f2, ok := v2.(interfaces.Func) + if !ok { + panic("not a Func") + } + fe, ok := e.(*interfaces.FuncEdge) + if !ok { + panic("not a *FuncEdge") + } + + //obj.AddEdge(f1, f2, fe) // easy + opfn := &opAddEdge{ // replicate AddEdge + F1: f1, + F2: f2, + FE: fe, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + } + } + + return obj // return self so it can be called in a chain +} + +// commit runs the pending transaction. This is the lockless version that is +// only used internally. +func (obj *graphTxn) commit() error { + if len(obj.ops) == 0 { // nothing to do + return nil + } + + // TODO: Instead of requesting the below locks, it's conceivable that we + // could either write an engine that doesn't require pausing the graph + // with a lock, or one that doesn't in the specific case being changed + // here need locks. And then in theory we'd have improved performance + // from the function engine. For our function consumers, the Txn API + // would never need to change, so we don't break API! A simple example + // is the len(ops) == 0 one right above. A simplification, but shows we + // aren't forced to call the locks even when we get Commit called here. + + // Now request the lock from the actual graph engine. + obj.Lock() + defer obj.Unlock() + + // Now request the ref count mutex. This may seem redundant, but it's + // not. 
The above graph engine Lock might allow more than one commit + // through simultaneously depending on implementation. The actual count + // mathematics must not, and so it has a separate lock. We could lock it + // per-operation, but that would probably be a lot slower. + obj.RefCount.Lock() + defer obj.RefCount.Unlock() + + // TODO: we don't need to do this anymore, because the engine does it! + // Copy the graph structure, perform the ops, check we didn't add a + // cycle, and if it's safe, do the real thing. Otherwise error here. + //g := obj.Graph.Copy() // copy the graph structure + //for _, x := range obj.ops { + // x(g) // call it + //} + //if _, err := g.TopologicalSort(); err != nil { + // return errwrap.Wrapf(err, "topo sort failed in txn commit") + //} + // FIXME: is there anything else we should check? Should we type-check? + + // Now do it for real... + obj.rev = []opfn{} // clear it for safety + opapi := &opapi{ + GraphAPI: obj.GraphAPI, + RefCount: obj.RefCount, + } + for _, op := range obj.ops { + if err := op.Fn(opapi); err != nil { // call it + // something went wrong (we made a cycle?) + obj.rev = []opfn{} // clear it, we didn't succeed + return err + } + + op = revOp(op) // reverse the op! + if op != nil { + obj.rev = append(obj.rev, op) // add the reverse op + //obj.rev = append([]opfn{op}, obj.rev...) // add to front + } + } + obj.ops = []opfn{} // clear it + + // garbage collect anything that hit zero! + // XXX: add gc function to this struct and pass in opapi instead? + if err := obj.RefCount.GC(obj.GraphAPI); err != nil { + // programming error or ghosts + return err + } + + // XXX: running this on each commit has a huge performance hit. 
+ // XXX: we could write out the .dot files and run graphviz afterwards + if engine, ok := obj.GraphAPI.(*Engine); ok && GraphvizDebug { + //d := time.Now().Unix() + //if err := engine.graph.ExecGraphviz(fmt.Sprintf("/tmp/txn-graphviz-%d.dot", d)); err != nil { + // panic("no graphviz") + //} + if err := engine.Graphviz(""); err != nil { + panic(err) // XXX: improve me + } + + //gv := &pgraph.Graphviz{ + // Filename: fmt.Sprintf("/tmp/txn-graphviz-%d.dot", d), + // Graphs: map[*pgraph.Graph]*pgraph.GraphvizOpts{ + // engine.graph: nil, + // }, + //} + //if err := gv.Exec(); err != nil { + // panic("no graphviz") + //} + } + return nil +} + +// Commit runs the pending transaction. If there was a pending reverse +// transaction that could have run (it would have been available following a +// Commit success) then this will erase that transaction. Usually you run cycles +// of Commit, followed by Reverse, or only Commit. (You obviously have to +// populate operations before the Commit is run.) +func (obj *graphTxn) Commit() error { + // Lock our internal state mutex first... this prevents other AddVertex + // or similar calls from interferring with our work here. + obj.mutex.Lock() + defer obj.mutex.Unlock() + + return obj.commit() +} + +// Clear erases any pending transactions that weren't committed yet. +func (obj *graphTxn) Clear() { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + obj.ops = []opfn{} // clear it +} + +// Reverse is like Commit, but it commits the reverse transaction to the one +// that previously ran with Commit. If the PostReverseCommit global has been set +// then if there were pending commit operations when this was run, then they are +// run at the end of a successful Reverse. It is generally recommended to not +// queue any operations for Commit if you plan on doing a Reverse, or to run a +// Clear before running Reverse if you want to discard the pending commits. 
+func (obj *graphTxn) Reverse() error { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + // first commit all the rev stuff... and then run the pending ops... + + ops := []opfn{} // save a copy + for _, op := range obj.ops { // copy + ops = append(ops, op) + } + obj.ops = []opfn{} // clear + + //for _, op := range obj.rev + for i := len(obj.rev) - 1; i >= 0; i-- { // copy in the rev stuff to commit! + op := obj.rev[i] + // mark these as being not reversible (so skip them on reverse!) + if skipOp, ok := op.(opfnSkipRev); ok { + skipOp.SetSkip(true) + } + obj.ops = append(obj.ops, op) + } + + //rev := []func(interfaces.GraphAPI){} // for the copy + //for _, op := range obj.rev { // copy + // rev = append(rev, op) + //} + obj.rev = []opfn{} // clear + + //rollback := func() { + // //for _, op := range rev { // from our safer copy + // //for _, op := range obj.ops { // copy back out the rev stuff + // for i := len(obj.ops) - 1; i >= 0; i-- { // copy in the rev stuff to commit! + // op := obj.rev[i] + // obj.rev = append(obj.rev, op) + // } + // obj.ops = []opfn{} // clear + // for _, op := range ops { // copy the original ops back in + // obj.ops = append(obj.ops, op) + // } + //} + // first commit the reverse stuff + if err := obj.commit(); err != nil { // lockless version + // restore obj.rev and obj.ops + //rollback() // probably not needed + return err + } + + // then if we had normal ops queued up, run those or at least restore... + for _, op := range ops { // copy + obj.ops = append(obj.ops, op) + } + + if PostReverseCommit { + return obj.commit() // lockless version + } + + return nil +} + +// Erase removes the historical information that Reverse would run after Commit. +func (obj *graphTxn) Erase() { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + obj.rev = []opfn{} // clear it +} + +// Free releases the wait group that was used to lock around this Txn if needed. +// It should get called when we're done with any Txn. 
+// TODO: this is only used for the initial Txn. Consider expanding it's use. We +// might need to allow Clear to call it as part of the clearing. +func (obj *graphTxn) Free() { + if obj.FreeFunc != nil { + obj.FreeFunc() + } +} diff --git a/lang/funcs/dage/txn_test.go b/lang/funcs/dage/txn_test.go new file mode 100644 index 00000000..d615eead --- /dev/null +++ b/lang/funcs/dage/txn_test.go @@ -0,0 +1,503 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . 
+ +//go:build !root + +package dage + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/util" +) + +type testGraphAPI struct { + graph *pgraph.Graph +} + +func (obj *testGraphAPI) AddVertex(f interfaces.Func) error { + v, ok := f.(pgraph.Vertex) + if !ok { + return fmt.Errorf("can't use func as vertex") + } + obj.graph.AddVertex(v) + return nil +} +func (obj *testGraphAPI) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) error { + v1, ok := f1.(pgraph.Vertex) + if !ok { + return fmt.Errorf("can't use func as vertex") + } + v2, ok := f2.(pgraph.Vertex) + if !ok { + return fmt.Errorf("can't use func as vertex") + } + obj.graph.AddEdge(v1, v2, fe) + return nil +} + +func (obj *testGraphAPI) DeleteVertex(f interfaces.Func) error { + v, ok := f.(pgraph.Vertex) + if !ok { + return fmt.Errorf("can't use func as vertex") + } + obj.graph.DeleteVertex(v) + return nil +} + +func (obj *testGraphAPI) DeleteEdge(fe *interfaces.FuncEdge) error { + obj.graph.DeleteEdge(fe) + return nil +} + +//func (obj *testGraphAPI) AddGraph(*pgraph.Graph) error { +// return fmt.Errorf("not implemented") +//} + +//func (obj *testGraphAPI) Adjacency() map[interfaces.Func]map[interfaces.Func]*interfaces.FuncEdge { +// panic("not implemented") +//} + +func (obj *testGraphAPI) HasVertex(f interfaces.Func) bool { + v, ok := f.(pgraph.Vertex) + if !ok { + panic("can't use func as vertex") + } + return obj.graph.HasVertex(v) +} + +func (obj *testGraphAPI) LookupEdge(fe *interfaces.FuncEdge) (interfaces.Func, interfaces.Func, bool) { + v1, v2, b := obj.graph.LookupEdge(fe) + if !b { + return nil, nil, b + } + + f1, ok := v1.(interfaces.Func) + if !ok { + panic("can't use vertex as func") + } + f2, ok := v2.(interfaces.Func) + if !ok { + panic("can't use vertex as func") + } + return f1, f2, true +} + +func (obj *testGraphAPI) FindEdge(f1, f2 interfaces.Func) *interfaces.FuncEdge 
{ + edge := obj.graph.FindEdge(f1, f2) + if edge == nil { + return nil + } + fe, ok := edge.(*interfaces.FuncEdge) + if !ok { + panic("edge is not a FuncEdge") + } + + return fe +} + +type testNullFunc struct { + name string +} + +func (obj *testNullFunc) String() string { return obj.name } +func (obj *testNullFunc) Info() *interfaces.Info { return nil } +func (obj *testNullFunc) Validate() error { return nil } +func (obj *testNullFunc) Init(*interfaces.Init) error { return nil } +func (obj *testNullFunc) Stream(context.Context) error { return nil } + +func TestTxn1(t *testing.T) { + graph, err := pgraph.NewGraph("test") + if err != nil { + t.Errorf("err: %+v", err) + return + } + testGraphAPI := &testGraphAPI{graph: graph} + mutex := &sync.Mutex{} + + graphTxn := &graphTxn{ + GraphAPI: testGraphAPI, + Lock: mutex.Lock, + Unlock: mutex.Unlock, + RefCount: (&RefCount{}).Init(), + } + txn := graphTxn.init() + + f1 := &testNullFunc{"f1"} + + if err := txn.AddVertex(f1).Commit(); err != nil { + t.Errorf("commit err: %+v", err) + return + } + + if l, i := len(graph.Adjacency()), 1; l != i { + t.Errorf("got len of: %d", l) + t.Errorf("exp len of: %d", i) + return + } + + if err := txn.Reverse(); err != nil { + t.Errorf("reverse err: %+v", err) + return + } + + if l, i := len(graph.Adjacency()), 0; l != i { + t.Errorf("got len of: %d", l) + t.Errorf("exp len of: %d", i) + return + } +} + +type txnTestOp func(*pgraph.Graph, interfaces.Txn) error + +func TestTxnTable(t *testing.T) { + + type test struct { // an individual test + name string + actions []txnTestOp + } + testCases := []test{} + { + f1 := &testNullFunc{"f1"} + + testCases = append(testCases, test{ + name: "simple add vertex", + actions: []txnTestOp{ + //func(g *pgraph.Graph, txn interfaces.Txn) error { + // txn.AddVertex(f1) + // return nil + //}, + //func(g *pgraph.Graph, txn interfaces.Txn) error { + // return txn.Commit() + //}, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return 
txn.AddVertex(f1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + { + f1 := &testNullFunc{"f1"} + f2 := &testNullFunc{"f2"} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple add edge", + actions: []txnTestOp{ + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddEdge(f1, f2, e1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + { + f1 := &testNullFunc{"f1"} + f2 := &testNullFunc{"f2"} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple add edge two-step", + actions: []txnTestOp{ + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddVertex(f1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddEdge(f1, f2, e1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := 
len(g.Adjacency()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + // Reverse only undoes what happened since the + // previous commit, so only one of the nodes is + // left at the end. + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + { + f1 := &testNullFunc{"f1"} + f2 := &testNullFunc{"f2"} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple two add edge, reverse", + actions: []txnTestOp{ + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddVertex(f1).AddVertex(f2).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddEdge(f1, f2, e1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + // Reverse only undoes what happened since the + // previous commit, so only the edge is removed + // and both of the nodes are left at the end. 
+ func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + { + f1 := &testNullFunc{"f1"} + f2 := &testNullFunc{"f2"} + f3 := &testNullFunc{"f3"} + f4 := &testNullFunc{"f4"} + e1 := testEdge("e1") + e2 := testEdge("e2") + e3 := testEdge("e3") + e4 := testEdge("e4") + + testCases = append(testCases, test{ + name: "simple add/delete", + actions: []txnTestOp{ + func(g *pgraph.Graph, txn interfaces.Txn) error { + txn.AddVertex(f1).AddEdge(f1, f2, e1) + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + txn.AddVertex(f1).AddEdge(f1, f3, e2) + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 3; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + txn.AddEdge(f2, f4, e3) + txn.AddEdge(f3, f4, e4) + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 4; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 4; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + // debug + //func(g *pgraph.Graph, txn 
interfaces.Txn) error { + // fileName := "/tmp/graphviz-txn1.dot" + // if err := g.ExecGraphviz(fileName); err != nil { + // return fmt.Errorf("writing graph failed at: %s, err: %+v", fileName, err) + // } + // return nil + //}, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + // debug + //func(g *pgraph.Graph, txn interfaces.Txn) error { + // fileName := "/tmp/graphviz-txn2.dot" + // if err := g.ExecGraphviz(fileName); err != nil { + // return fmt.Errorf("writing graph failed at: %s, err: %+v", fileName, err) + // } + // return nil + //}, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 3; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + if testing.Short() { + t.Logf("available tests:") + } + names := []string{} + for index, tc := range testCases { // run all the tests + if tc.name == "" { + t.Errorf("test #%d: not named", index) + continue + } + if util.StrInList(tc.name, names) { + t.Errorf("test #%d: duplicate sub test name of: %s", index, tc.name) + continue + } + names = append(names, tc.name) + + //if index != 3 { // hack to run a subset (useful for debugging) + //if tc.name != "simple txn" { + // continue + //} + + testName := fmt.Sprintf("test #%d (%s)", index, tc.name) + if testing.Short() { // make listing tests easier + t.Logf("%s", testName) + continue + } + t.Run(testName, func(t *testing.T) { + name, actions := tc.name, tc.actions + + t.Logf("\n\ntest #%d (%s) ----------------\n\n", index, name) + + //logf := func(format string, v ...interface{}) { + // t.Logf(fmt.Sprintf("test #%d", index)+": "+format, v...) 
+ //} + + graph, err := pgraph.NewGraph("test") + if err != nil { + t.Errorf("err: %+v", err) + return + } + testGraphAPI := &testGraphAPI{graph: graph} + mutex := &sync.Mutex{} + + graphTxn := &graphTxn{ + GraphAPI: testGraphAPI, + Lock: mutex.Lock, + Unlock: mutex.Unlock, + RefCount: (&RefCount{}).Init(), + } + txn := graphTxn.init() + + // Run a list of actions, passing the returned txn (if + // any) to the next action. Any error kills it all. + for i, action := range actions { + if err := action(graph, txn); err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: action #%d failed with: %+v", index, i, err) + return + } + } + }) + } + + if testing.Short() { + t.Skip("skipping all tests...") + } +} diff --git a/lang/funcs/dage/util_test.go b/lang/funcs/dage/util_test.go new file mode 100644 index 00000000..b241f9b8 --- /dev/null +++ b/lang/funcs/dage/util_test.go @@ -0,0 +1,30 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . 
+ +//go:build !root + +package dage + +import ( + "github.com/purpleidea/mgmt/lang/interfaces" +) + +func testEdge(name string) *interfaces.FuncEdge { + return &interfaces.FuncEdge{ + Args: []string{name}, + } +} diff --git a/lang/interfaces/func.go b/lang/interfaces/func.go index 80a2d4b9..62acda0a 100644 --- a/lang/interfaces/func.go +++ b/lang/interfaces/func.go @@ -24,6 +24,7 @@ import ( "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/lang/types" + "github.com/purpleidea/mgmt/pgraph" ) // FuncSig is the simple signature that is used throughout our implementations. @@ -45,8 +46,20 @@ type Info struct { type Init struct { Hostname string // uuid for the host //Noop bool - Input chan types.Value // Engine will close `input` chan - Output chan types.Value // Stream must close `output` chan + + // Input is where a chan (stream) of values will get sent to this node. + // The engine will close this `input` chan. + Input chan types.Value + + // Output is the chan (stream) of values to get sent out from this node. + // The Stream function must close this `output` chan. + Output chan types.Value + + // Txn provides a transaction API that can be used to modify the + // function graph while it is "running". This should not be used by most + // nodes, and when it is used, it should be used carefully. + Txn Txn + // TODO: should we pass in a *Scope here for functions like template() ? World engine.World Debug bool @@ -230,3 +243,93 @@ type FuncEdge struct { func (obj *FuncEdge) String() string { return strings.Join(obj.Args, ", ") } + +// GraphAPI is a subset of the available graph operations that are possible on a +// pgraph that is used for storing functions. The minimum subset are those which +// are needed for implementing the Txn interface. 
+type GraphAPI interface { + AddVertex(Func) error + AddEdge(Func, Func, *FuncEdge) error + DeleteVertex(Func) error + DeleteEdge(*FuncEdge) error + //AddGraph(*pgraph.Graph) error + + //Adjacency() map[Func]map[Func]*FuncEdge + HasVertex(Func) bool + FindEdge(Func, Func) *FuncEdge + LookupEdge(*FuncEdge) (Func, Func, bool) +} + +// Txn is the interface that the engine graph API makes available so that +// functions can modify the function graph dynamically while it is "running". +// This could be implemented in one of two methods. +// +// Method 1: Have a pair of graph Lock and Unlock methods. Queue up the work to +// do and when we "commit" the transaction, we're just queuing up the work to do +// and then we run it all surrounded by the lock. +// +// Method 2: It's possible that we might eventually be able to actually modify +// the running graph without even causing it to pause at all. In this scenario, +// the "commit" would just directly perform those operations without even using +// the Lock and Unlock mutex operations. This is why we don't expose those in +// the API. It's also safer because someone can't forget to run Unlock which +// would block the whole code base. +type Txn interface { + // AddVertex adds a vertex to the running graph. The operation will get + // completed when Commit is run. + AddVertex(Func) Txn + + // AddEdge adds an edge to the running graph. The operation will get + // completed when Commit is run. + AddEdge(Func, Func, *FuncEdge) Txn + + // DeleteVertex removes a vertex from the running graph. The operation + // will get completed when Commit is run. + DeleteVertex(Func) Txn + + // DeleteEdge removes an edge from the running graph. It removes the + // edge that is found between the two input vertices. The operation will + // get completed when Commit is run. The edge is part of the signature + // so that it is both symmetrical with AddEdge, and also easier to + // reverse in theory. 
+ // NOTE: This is not supported since there's no sane Reverse with GC. + // XXX: Add this in but just don't let it be reversible? + //DeleteEdge(Func, Func, *FuncEdge) Txn + + // AddGraph adds a graph to the running graph. The operation will get + // completed when Commit is run. This function panics if your graph + // contains vertices that are not of type interfaces.Func or if your + // edges are not of type *interfaces.FuncEdge. + AddGraph(*pgraph.Graph) Txn + + // Commit runs the pending transaction. + Commit() error + + // Clear erases any pending transactions that weren't committed yet. + Clear() + + // Reverse runs the reverse commit of the last successful operation to + // Commit. AddVertex is reversed by DeleteVertex, and vice-versa, and + // the same for AddEdge and DeleteEdge. Keep in mind that if AddEdge is + // called with either vertex not already part of the graph, it will + // implicitly add them, but the Reverse operation will not necessarily + // know that. As a result, it's recommended to not perform operations + // that have implicit Adds or Deletes. Notwithstanding the above, the + // initial Txn implementation can and does try to track these changes + // so that it can correctly reverse them, but this is not guaranteed by + // API, and it could contain bugs. + Reverse() error + + // Erase removes the historical information that Reverse would run after + // Commit. + Erase() + + // Free releases the wait group that was used to lock around this Txn if + // needed. It should get called when we're done with any Txn. + Free() + + // Copy returns a new child Txn that has the same handles, but a + // separate state. This allows you to do an Add*/Commit/Reverse that + // isn't affected by a different user of this transaction. + Copy() Txn +}