diff --git a/lang/funcs/dage/dage.go b/lang/funcs/dage/dage.go new file mode 100644 index 00000000..6caed7ad --- /dev/null +++ b/lang/funcs/dage/dage.go @@ -0,0 +1,1587 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +// Package dage implements a DAG function engine. +// TODO: can we rename this to something more interesting? +package dage + +import ( + "context" + "fmt" + "os" + "sort" + "strings" + "sync" + "time" + + "github.com/purpleidea/mgmt/engine" + "github.com/purpleidea/mgmt/lang/funcs/structs" + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/util/errwrap" +) + +// Engine implements a dag engine which lets us "run" a dag of functions, but +// also allows us to modify it while we are running. +type Engine struct { + // Name is the name used for the instance of the engine and in the graph + // that is held within it. + Name string + + Hostname string + World engine.World + + Debug bool + Logf func(format string, v ...interface{}) + + // Callback can be specified as an alternative to using the Stream + // method to get events. If the context on it is cancelled, then it must + // shutdown quickly, because this means we are closing and want to + // disconnect. Whether you want to respect that is up to you, but the + // engine will not be able to close until you do. If specified, and an + // error has occurred, it will set that error property. + Callback func(context.Context, error) + + graph *pgraph.Graph // guarded by graphMutex + table map[interfaces.Func]types.Value // guarded by tableMutex + state map[interfaces.Func]*state + + // graphMutex wraps access to the table map. + graphMutex *sync.Mutex // TODO: &sync.RWMutex{} ? + + // tableMutex wraps access to the table map. + tableMutex *sync.RWMutex + + // refCount keeps track of vertex and edge references across the entire + // graph. + refCount *RefCount + + // wgTxn blocks shutdown until the initial Txn has Reversed. + wgTxn *sync.WaitGroup + + // firstTxn checks to make sure wgTxn is only used for the first Txn. + firstTxn bool + + wg *sync.WaitGroup + + // pause/resume state machine signals + pauseChan chan struct{} + pausedChan chan struct{} + resumeChan chan struct{} + resumedChan chan struct{} + + // resend tracks which new nodes might need a new notification + resend map[interfaces.Func]struct{} + + // nodeWaitFns is a list of cleanup functions to run after we've begun + // resume, but before we've resumed completely. These are actions that + // we would like to do when paused from a deleteVertex operation, but + // that would deadlock if we did. + nodeWaitFns []func() + + // nodeWaitMutex wraps access to the nodeWaitFns list. + nodeWaitMutex *sync.Mutex + + // streamChan is used to send notifications to the outside world. + streamChan chan error + + loaded bool // are all of the funcs loaded? + loadedChan chan struct{} // funcs loaded signal + + startedChan chan struct{} // closes when Run() starts + + // wakeChan contains a message when someone has asked for us to wake up. + wakeChan chan struct{} + + // ag is the aggregation channel which cues up outgoing events. + ag chan error + + // leafSend specifies if we should do an ag send because we have + // activity at a leaf. + leafSend bool + + // isClosed tracks nodes that have closed. This list is purged as they + // are removed from the graph. + isClosed map[*state]struct{} + + // activity tracks nodes that are ready to send to ag. The main process + // loop decides if we have the correct set to do so. A corresponding + // value of true means we have regular activity, and a value of false + // means the node closed. + activity map[*state]struct{} + + // stateMutex wraps access to the isClosed and activity maps. + stateMutex *sync.Mutex + + // stats holds some statistics and other debugging information. + stats *stats // guarded by statsMutex + + // statsMutex wraps access to the stats data. + statsMutex *sync.RWMutex + + // graphvizMutex wraps access to the Graphviz method. + graphvizMutex *sync.Mutex + + // graphvizCount keeps a running tally of how many graphs we've + // generated. This is useful for displaying a sequence (timeline) of + // graphs in a linear order. + graphvizCount int64 + + // graphvizDirectory stores the generated path for outputting graphviz + // files if one is not specified at runtime. + graphvizDirectory string +} + +// Setup sets up the internal datastructures needed for this engine. +func (obj *Engine) Setup() error { + var err error + obj.graph, err = pgraph.NewGraph(obj.Name) + if err != nil { + return err + } + obj.table = make(map[interfaces.Func]types.Value) + obj.state = make(map[interfaces.Func]*state) + obj.graphMutex = &sync.Mutex{} // TODO: &sync.RWMutex{} ? + obj.tableMutex = &sync.RWMutex{} + + obj.refCount = (&RefCount{}).Init() + + obj.wgTxn = &sync.WaitGroup{} + + obj.wg = &sync.WaitGroup{} + + obj.pauseChan = make(chan struct{}) + obj.pausedChan = make(chan struct{}) + obj.resumeChan = make(chan struct{}) + obj.resumedChan = make(chan struct{}) + + obj.resend = make(map[interfaces.Func]struct{}) + + obj.nodeWaitFns = []func(){} + + obj.nodeWaitMutex = &sync.Mutex{} + + obj.streamChan = make(chan error) + obj.loadedChan = make(chan struct{}) + obj.startedChan = make(chan struct{}) + + obj.wakeChan = make(chan struct{}, 1) // hold up to one message + + obj.ag = make(chan error) + + obj.isClosed = make(map[*state]struct{}) + + obj.activity = make(map[*state]struct{}) + obj.stateMutex = &sync.Mutex{} + + obj.stats = &stats{ + runningList: make(map[*state]struct{}), + loadedList: make(map[*state]bool), + inputList: make(map[*state]int64), + } + obj.statsMutex = &sync.RWMutex{} + + obj.graphvizMutex = &sync.Mutex{} + return nil +} + +// Cleanup cleans up and frees memory and resources after everything is done. +func (obj *Engine) Cleanup() error { + obj.wg.Wait() // don't cleanup these before Run() finished + close(obj.pauseChan) // free + close(obj.pausedChan) + close(obj.resumeChan) + close(obj.resumedChan) + return nil +} + +// Txn returns a transaction that is suitable for adding and removing from the +// graph. You must run Setup before this method is called. +func (obj *Engine) Txn() interfaces.Txn { + if obj.refCount == nil { + panic("you must run setup before first use") + } + // The very first initial Txn must have a wait group to make sure if we + // shutdown (in error) that we can Reverse things before the Lock/Unlock + // loop shutsdown. + var free func() + if !obj.firstTxn { + obj.firstTxn = true + obj.wgTxn.Add(1) + free = func() { + obj.wgTxn.Done() + } + } + return (&graphTxn{ + Lock: obj.Lock, + Unlock: obj.Unlock, + GraphAPI: obj, + RefCount: obj.refCount, // reference counting + FreeFunc: free, + }).init() +} + +// addVertex is the lockless version of the AddVertex function. This is needed +// so that AddEdge can add two vertices within the same lock. +func (obj *Engine) addVertex(f interfaces.Func) error { + if _, exists := obj.state[f]; exists { + // don't err dupes, because it makes using the AddEdge API yucky + return nil + } + + // add some extra checks for easier debugging + if f == nil { + return fmt.Errorf("missing func") + } + if f.Info() == nil { + return fmt.Errorf("missing func info") + } + sig := f.Info().Sig + if sig == nil { + return fmt.Errorf("missing func sig") + } + if sig.Kind != types.KindFunc { + return fmt.Errorf("must be kind func") + } + if err := f.Validate(); err != nil { + return errwrap.Wrapf(err, "node did not Validate") + } + + input := make(chan types.Value) + output := make(chan types.Value) + txn := obj.Txn() + + // This is the one of two places where we modify this map. To avoid + // concurrent writes, we only do this when we're locked! Anywhere that + // can read where we are locked must have a mutex around it or do the + // lookup when we're in an unlocked state. + node := &state{ + Func: f, + name: f.String(), // cache a name to avoid locks + + input: input, + output: output, + txn: txn, + + running: false, + wg: &sync.WaitGroup{}, + + rwmutex: &sync.RWMutex{}, + } + + init := &interfaces.Init{ + Hostname: obj.Hostname, + Input: node.input, + Output: node.output, + Txn: node.txn, + World: obj.World, + Debug: obj.Debug, + Logf: func(format string, v ...interface{}) { + // safe Logf in case f.String contains %? chars... + s := f.String() + ": " + fmt.Sprintf(format, v...) + obj.Logf("%s", s) + }, + } + + if err := f.Init(init); err != nil { + return err + } + // only now, do we modify the graph + obj.state[f] = node + obj.graph.AddVertex(f) + return nil +} + +// AddVertex is the thread-safe way to add a vertex. You will need to call the +// engine Lock method before using this and the Unlock method afterwards. +func (obj *Engine) AddVertex(f interfaces.Func) error { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + if obj.Debug { + obj.Logf("Engine:AddVertex: %p %s", f, f) + } + + return obj.addVertex(f) // lockless version +} + +// AddEdge is the thread-safe way to add an edge. You will need to call the +// engine Lock method before using this and the Unlock method afterwards. This +// will automatically run AddVertex on both input vertices if they are not +// already part of the graph. You should only create DAG's as this function +// engine cannot handle cycles and this method will error if you cause a cycle. +func (obj *Engine) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) error { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + if obj.Debug { + obj.Logf("Engine:AddEdge %p %s: %p %s -> %p %s", fe, fe, f1, f1, f2, f2) + } + + // safety check to avoid cycles + g := obj.graph.Copy() + //g.AddVertex(f1) + //g.AddVertex(f2) + g.AddEdge(f1, f2, fe) + if _, err := g.TopologicalSort(); err != nil { + return err // not a dag + } + // if we didn't cycle, we can modify the real graph safely... + + // Does the graph already have these nodes in it? + hasf1 := obj.graph.HasVertex(f1) + //hasf2 := obj.graph.HasVertex(f2) + + if err := obj.addVertex(f1); err != nil { // lockless version + return err + } + if err := obj.addVertex(f2); err != nil { + // rollback f1 on error of f2 + obj.deleteVertex(f1) // ignore any error + return err + } + + // If f1 doesn't exist, let f1 (or it's incoming nodes) get the notify. + // If f2 is new, then it should get a new notification unless f1 is new. + // But there's no guarantee we didn't AddVertex(f2); AddEdge(f1, f2, e), + // so resend if f1 already exists. Otherwise it's not a new notification. + // previously: `if hasf1 && !hasf2` + if hasf1 { + obj.resend[f2] = struct{}{} // resend notification to me + } + + obj.graph.AddEdge(f1, f2, fe) // replaces any existing edge here + + // This shouldn't error, since the test graph didn't find a cycle. + if _, err := obj.graph.TopologicalSort(); err != nil { + // programming error + panic(err) // not a dag + } + + return nil +} + +// deleteVertex is the lockless version of the DeleteVertex function. This is +// needed so that AddEdge can add two vertices within the same lock. It needs +// deleteVertex so it can rollback the first one if the second addVertex fails. +func (obj *Engine) deleteVertex(f interfaces.Func) error { + node, exists := obj.state[f] + if !exists { + return fmt.Errorf("vertex %p %s doesn't exist", f, f) + } + + if node.running { + // cancel the running vertex + node.cancel() // cancel inner ctx + + // We store this work to be performed later on in the main loop + // because this Wait() might be blocked by a defer Commit, which + // is itself blocked because this deleteVertex operation is part + // of a Commit. + obj.nodeWaitMutex.Lock() + obj.nodeWaitFns = append(obj.nodeWaitFns, func() { + node.wg.Wait() // While waiting, the Stream might cause a new Reverse Commit + node.txn.Free() // Clean up when done! + obj.stateMutex.Lock() + delete(obj.isClosed, node) // avoid memory leak + obj.stateMutex.Unlock() + }) + obj.nodeWaitMutex.Unlock() + } + + // This is the one of two places where we modify this map. To avoid + // concurrent writes, we only do this when we're locked! Anywhere that + // can read where we are locked must have a mutex around it or do the + // lookup when we're in an unlocked state. + delete(obj.state, f) + obj.graph.DeleteVertex(f) + return nil +} + +// DeleteVertex is the thread-safe way to delete a vertex. You will need to call +// the engine Lock method before using this and the Unlock method afterwards. +func (obj *Engine) DeleteVertex(f interfaces.Func) error { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + if obj.Debug { + obj.Logf("Engine:DeleteVertex: %p %s", f, f) + } + + return obj.deleteVertex(f) // lockless version +} + +// DeleteEdge is the thread-safe way to delete an edge. You will need to call +// the engine Lock method before using this and the Unlock method afterwards. +func (obj *Engine) DeleteEdge(fe *interfaces.FuncEdge) error { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + if obj.Debug { + f1, f2, found := obj.graph.LookupEdge(fe) + if found { + obj.Logf("Engine:DeleteEdge: %p %s -> %p %s", f1, f1, f2, f2) + } else { + obj.Logf("Engine:DeleteEdge: not found %p %s", fe, fe) + } + } + + // Don't bother checking if edge exists first and don't error if it + // doesn't because it might have gotten deleted when a vertex did, and + // so there's no need to complain for nothing. + obj.graph.DeleteEdge(fe) + + return nil +} + +// HasVertex is the thread-safe way to check if a vertex exists in the graph. +// You will need to call the engine Lock method before using this and the Unlock +// method afterwards. +func (obj *Engine) HasVertex(f interfaces.Func) bool { + obj.graphMutex.Lock() // XXX: should this be a RLock? + defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? + + return obj.graph.HasVertex(f) +} + +// LookupEdge is the thread-safe way to check which vertices (if any) exist +// between an edge in the graph. You will need to call the engine Lock method +// before using this and the Unlock method afterwards. +func (obj *Engine) LookupEdge(fe *interfaces.FuncEdge) (interfaces.Func, interfaces.Func, bool) { + obj.graphMutex.Lock() // XXX: should this be a RLock? + defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? + + v1, v2, found := obj.graph.LookupEdge(fe) + if !found { + return nil, nil, found + } + f1, ok := v1.(interfaces.Func) + if !ok { + panic("not a Func") + } + f2, ok := v2.(interfaces.Func) + if !ok { + panic("not a Func") + } + return f1, f2, found +} + +// FindEdge is the thread-safe way to check which edge (if any) exists between +// two vertices in the graph. This is an important method in edge removal, +// because it's what you really need to know for DeleteEdge to work. Requesting +// a specific deletion isn't very sensical in this library when specified as the +// edge pointer, since we might replace it with a new edge that has new arg +// names. Instead, use this to look up what relationship you want, and then +// DeleteEdge to remove it. You will need to call the engine Lock method before +// using this and the Unlock method afterwards. +func (obj *Engine) FindEdge(f1, f2 interfaces.Func) *interfaces.FuncEdge { + obj.graphMutex.Lock() // XXX: should this be a RLock? + defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? + + edge := obj.graph.FindEdge(f1, f2) + if edge == nil { + return nil + } + fe, ok := edge.(*interfaces.FuncEdge) + if !ok { + panic("edge is not a FuncEdge") + } + + return fe +} + +// Lock must be used before modifying the running graph. Make sure to Unlock +// when done. +// XXX: should Lock take a context if we want to bail mid-way? +// TODO: could we replace pauseChan with SubscribedSignal ? +func (obj *Engine) Lock() { // pause + select { + case obj.pauseChan <- struct{}{}: + } + //obj.rwmutex.Lock() // TODO: or should it go right before pauseChan? + + // waiting for the pause to move to paused... + select { + case <-obj.pausedChan: + } + // this mutex locks at start of Run() and unlocks at finish of Run() + obj.graphMutex.Unlock() // safe to make changes now +} + +// Unlock must be used after modifying the running graph. Make sure to Lock +// beforehand. +// XXX: should Unlock take a context if we want to bail mid-way? +func (obj *Engine) Unlock() { // resume + // this mutex locks at start of Run() and unlocks at finish of Run() + obj.graphMutex.Lock() // no more changes are allowed + select { + case obj.resumeChan <- struct{}{}: + } + //obj.rwmutex.Unlock() // TODO: or should it go right after resumedChan? + + // waiting for the resume to move to resumed... + select { + case <-obj.resumedChan: + } +} + +// wake sends a message to the wake queue to wake up the main process function +// which would otherwise spin unnecessarily. This can be called anytime, and +// doesn't hurt, it only wastes cpu if there's nothing to do. This does NOT ever +// block, and that's important so it can be called from anywhere. +func (obj *Engine) wake(name string) { + // The mutex guards the len check to avoid this function sending two + // messages down the channel, because the second would block if the + // consumer isn't fast enough. This mutex makes this method effectively + // asynchronous. + //obj.wakeMutex.Lock() + //defer obj.wakeMutex.Unlock() + //if len(obj.wakeChan) > 0 { // collapse duplicate, pending wake signals + // return + //} + select { + case obj.wakeChan <- struct{}{}: // send to chan of length 1 + if obj.Debug { + obj.Logf("wake sent from: %s", name) + } + default: // this is a cheap alternative to avoid the mutex altogether! + if obj.Debug { + obj.Logf("wake skip from: %s", name) + } + // skip sending, we already have a message pending! + } +} + +// runNodeWaitFns is a helper to run the cleanup nodeWaitFns list. It clears the +// list after it runs. +func (obj *Engine) runNodeWaitFns() { + // The lock is probably not needed here, but it won't hurt either. + obj.nodeWaitMutex.Lock() + defer obj.nodeWaitMutex.Unlock() + for _, fn := range obj.nodeWaitFns { + fn() + } + obj.nodeWaitFns = []func(){} // clear +} + +// process is the inner loop that runs through the entire graph. It can be +// called successively safely, as it is roughly idempotent, and is used to push +// values through the graph. If it is interrupted, it can pick up where it left +// off on the next run. This does however require it to re-check some things, +// but that is the price we pay for being always available to unblock. +// Importantly, re-running this resumes work in progress even if there was +// caching, and that if interrupted, it'll be queued again so as to not drop a +// wakeChan notification! We know we've read all the pending incoming values, +// because the Stream reader call wake(). +func (obj *Engine) process(ctx context.Context) (reterr error) { + defer func() { + // catch programming errors + if r := recover(); r != nil { + obj.Logf("Panic in process: %+v", r) + reterr = fmt.Errorf("panic in process: %+v", r) + } + }() + + // Toposort in dependency order. + topoSort, err := obj.graph.TopologicalSort() + if err != nil { + return err + } + + loaded := true // assume we emitted at least one value for now... + + outDegree := obj.graph.OutDegree() // map[Vertex]int + + for _, v := range topoSort { + f, ok := v.(interfaces.Func) + if !ok { + panic("not a Func") + } + node, exists := obj.state[f] + if !exists { + panic(fmt.Sprintf("missing node in iterate: %s", f)) + } + + out, exists := outDegree[f] + if !exists { + panic(fmt.Sprintf("missing out degree in iterate: %s", f)) + } + //outgoing := obj.graph.OutgoingGraphVertices(f) // []pgraph.Vertex + //node.isLeaf = len(outgoing) == 0 + node.isLeaf = out == 0 // store + + // TODO: the obj.loaded stuff isn't really consumed currently + node.rwmutex.RLock() + if !node.loaded { + loaded = false // we were wrong + } + node.rwmutex.RUnlock() + + // TODO: memoize since graph shape doesn't change in this loop! + incoming := obj.graph.IncomingGraphVertices(f) // []pgraph.Vertex + + // no incoming edges, so no incoming data + if len(incoming) == 0 || node.inputClosed { // we do this below + if !node.inputClosed { + node.inputClosed = true + close(node.input) + } + continue + } // else, process input data below... + + ready := true // assume all input values are ready for now... + inputClosed := true // assume all inputs have closed for now... + si := &types.Type{ + // input to functions are structs + Kind: types.KindStruct, + Map: node.Func.Info().Sig.Map, + Ord: node.Func.Info().Sig.Ord, + } + st := types.NewStruct(si) + // The above builds a struct with fields + // populated for each key (empty values) + // so we need to very carefully check if + // every field is received before we can + // safely send it downstream to an edge. + need := make(map[string]struct{}) // keys we need + for _, k := range node.Func.Info().Sig.Ord { + need[k] = struct{}{} + } + + for _, vv := range incoming { + ff, ok := vv.(interfaces.Func) + if !ok { + panic("not a Func") + } + obj.tableMutex.RLock() + value, exists := obj.table[ff] + obj.tableMutex.RUnlock() + if !exists { + ready = false // nope! + inputClosed = false // can't be, it's not even ready yet + break + } + // XXX: do we need a lock around reading obj.state? + fromNode, exists := obj.state[ff] + if !exists { + panic(fmt.Sprintf("missing node in notify: %s", ff)) + } + if !fromNode.outputClosed { + inputClosed = false // if any still open, then we are + } + + // set each arg, since one value + // could get used for multiple + // function inputs (shared edge) + args := obj.graph.Adjacency()[ff][f].(*interfaces.FuncEdge).Args + for _, arg := range args { + // populate struct + if err := st.Set(arg, value); err != nil { + //panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, fromNode, err)) + keys := []string{} + for k := range st.Struct() { + keys = append(keys, k) + } + panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys)) + } + if _, exists := need[arg]; !exists { + keys := []string{} + for k := range st.Struct() { + keys = append(keys, k) + } + // could be either a duplicate or an unwanted field (edge name) + panic(fmt.Sprintf("unexpected struct key on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys)) + } + delete(need, arg) + } + } + + if !ready || len(need) != 0 { + continue // definitely continue, don't break here + } + + // previously it was closed, skip sending + if node.inputClosed { + continue + } + + // XXX: respect the info.Pure and info.Memo fields somewhere... + + // XXX: keep track of some state about who i sent to last before + // being interrupted so that I can avoid resending to some nodes + // if it's not necessary... + + // It's critical to avoid deadlock with this sending select that + // any events that could happen during this send can be + // preempted and that future executions of this function can be + // resumed. We must return with an error to let folks know that + // we were interrupted. + obj.Logf("send to func `%s`", node) + select { + case node.input <- st: // send to function + obj.statsMutex.Lock() + val, _ := obj.stats.inputList[node] // val is # or zero + obj.stats.inputList[node] = val + 1 // increment + obj.statsMutex.Unlock() + // pass + + case <-node.ctx.Done(): // node died + obj.wake("node.ctx.Done()") // interrupted, so queue again + // This scenario *can* happen, although it is rare. It + // triggered the old `chFn && errFn == context.Canceled` + // case which we've now removed. + //return node.ctx.Err() // old behaviour which was wrong + continue // probably best to return and come finish later + + case <-ctx.Done(): + obj.wake("node ctx.Done()") // interrupted, so queue again + return ctx.Err() + } + + // It's okay if this section gets preempted and we re-run this + // function. The worst that happens is we end up sending the + // same input data a second time. This means that we could in + // theory be causing unnecessary graph changes (and locks which + // cause preemption here) if nodes that cause locks aren't + // skipping duplicate/identical input values! + if inputClosed && !node.inputClosed { + node.inputClosed = true + close(node.input) + } + + // XXX: Do we need to somehow wait to make sure that node has + // the time to send at least one output? + // XXX: We could add a counter to each input that gets passed + // through the function... Eg: if we pass in 4, we should wait + // until a 4 comes out the output side. But we'd need to change + // the signature of func for this... + + } // end topoSort loop + + // It's okay if this section gets preempted and we re-run this bit here. + obj.loaded = loaded // this gets reset when graph adds new nodes + + if !loaded { + return nil + } + + // Check each leaf and make sure they're all ready to send, for us to + // send anything to ag channel. In addition, we need at least one send + // message from any of the valid isLeaf nodes. Since this only runs if + // everyone is loaded, we just need to check for activty leaf nodes. + obj.stateMutex.Lock() + for node := range obj.activity { + if obj.leafSend { + break // early + } + + // down here we need `true` activity! + if node.isLeaf { // calculated above in the previous loop + obj.leafSend = true + break + } + } + obj.activity = make(map[*state]struct{}) // clear + //clear(obj.activity) // new clear + + // This check happens here after the send loop to make sure one value + // got in and we didn't close it off too early. + for node := range obj.isClosed { // these are closed + node.outputClosed = true + } + obj.stateMutex.Unlock() + + if !obj.leafSend { + return nil + } + + select { + case obj.ag <- nil: // send to aggregate channel if we have events + obj.Logf("aggregated send") + obj.leafSend = false // reset + + case <-ctx.Done(): + obj.leafSend = true // since we skipped the ag send! + obj.wake("process ctx.Done()") // interrupted, so queue again + return ctx.Err() + + // XXX: should we even allow this default case? + //default: + // // exit if we're not ready to send to ag + // obj.leafSend = true // since we skipped the ag send! + // obj.wake("process default") // interrupted, so queue again + } + + return nil +} + +// Run kicks off the main engine. This takes a mutex. When we're "paused" the +// mutex is temporarily released until we "resume". Those operations transition +// with the engine Lock and Unlock methods. It is recommended to only add +// vertices to the engine after it's running. If you add them before Run, then +// Run will cause a Lock/Unlock to occur to cycle them in. Lock and Unlock race +// with the cancellation of this Run main loop. Make sure to only call one at a +// time. +func (obj *Engine) Run(ctx context.Context) (reterr error) { + obj.graphMutex.Lock() + defer obj.graphMutex.Unlock() + + // XXX: can the above defer get called while we are already unlocked? + // XXX: is it a possibility if we use <-Started() ? + + wg := &sync.WaitGroup{} + defer wg.Wait() + + defer func() { + // catch programming errors + if r := recover(); r != nil { + obj.Logf("Panic in Run: %+v", r) + reterr = fmt.Errorf("panic in Run: %+v", r) + } + }() + + ctx, cancel := context.WithCancel(ctx) // wrap parent + defer cancel() + + // Add a wait before the "started" signal runs so that Cleanup waits. + obj.wg.Add(1) + defer obj.wg.Done() + + // Send the start signal. + close(obj.startedChan) + + if n := obj.graph.NumVertices(); n > 0 { // hack to make the api easier + obj.Logf("graph contained %d vertices before Run", n) + wg.Add(1) + go func() { + defer wg.Done() + // kick the engine once to pull in any vertices from + // before we started running! + defer obj.Unlock() + obj.Lock() + }() + } + + once := &sync.Once{} + loadedSignal := func() { close(obj.loadedChan) } // only run once! + + // aggregate events channel + wg.Add(1) + go func() { + defer wg.Done() + defer close(obj.streamChan) + drain := false + for { + var err error + var ok bool + select { + case err, ok = <-obj.ag: // aggregated channel + if !ok { + return // channel shutdown + } + } + + if drain { + continue // no need to send more errors + } + + // TODO: check obj.loaded first? + once.Do(loadedSignal) + + // now send event... + if obj.Callback != nil { + // send stream signal (callback variant) + obj.Callback(ctx, err) + } else { + // send stream signal + select { + // send events or errors on streamChan + case obj.streamChan <- err: // send + case <-ctx.Done(): // when asked to exit + return + } + } + if err != nil { + cancel() // cancel the context! + //return // let the obj.ag channel drain + drain = true + } + } + }() + + // wgAg is a wait group that waits for all senders to the ag chan. + // Exceptionally, we don't close the ag channel until wgFor has also + // closed, because it can send to wg in process(). + wgAg := &sync.WaitGroup{} + wgFor := &sync.WaitGroup{} + + // We need to keep the main loop running until everyone else has shut + // down. When the top context closes, we wait for everyone to finish, + // and then we shut down this main context. + //mainCtx, mainCancel := context.WithCancel(ctx) // wrap parent + mainCtx, mainCancel := context.WithCancel(context.Background()) // DON'T wrap parent, close on your own terms + defer mainCancel() + + // close the aggregate channel when everyone is done with it... + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-ctx.Done(): + } + + // don't wait and close ag before we're really done with Run() + wgAg.Wait() // wait for last ag user to close + obj.wgTxn.Wait() // wait for first txn as well + mainCancel() // only cancel after wgAg goroutines are done + wgFor.Wait() // wait for process loop to close before closing + close(obj.ag) // last one closes the ag channel + }() + + wgFn := &sync.WaitGroup{} // wg for process function runner + defer wgFn.Wait() // extra safety + + defer obj.runNodeWaitFns() // just in case + + wgFor.Add(1) // make sure we wait for the below process loop to exit... + defer wgFor.Done() + + // errProcess and processBreakFn are used to help exit following an err. + // This approach is needed because if we simply exited, we'd block the + // main loop below because various Stream functions are waiting on the + // Lock/Unlock cycle to be able to finish cleanly, shutdown, and unblock + // all the waitgroups so we can exit. + var errProcess error + var pausedProcess bool + processBreakFn := func(err error /*, paused bool*/) { + if err == nil { // a nil error won't cause ag to shutdown below + panic("expected error, not nil") + } + obj.Logf("process break") + select { + case obj.ag <- err: // send error to aggregate channel + case <-ctx.Done(): + } + cancel() // to unblock + //mainCancel() // NO! + errProcess = err // set above error + //pausedProcess = paused // set this inline directly + } + if obj.Debug { + defer obj.Logf("exited main loop") + } + // we start off "running", but we'll have an empty graph initially... + for { + + // After we've resumed, we can try to exit. (shortcut) + // NOTE: If someone calls Lock(), which would send to + // obj.pauseChan, it *won't* deadlock here because mainCtx is + // only closed when all the worker waitgroups close first! + select { + case <-mainCtx.Done(): // when asked to exit + return errProcess // we exit happily + default: + } + + // run through our graph, check for pause request occasionally + for { + pausedProcess = false // reset + // if we're in errProcess, we skip the process loop! + if errProcess != nil { + break // skip this process loop + } + + // Start the process run for this iteration of the loop. + ctxFn, cancelFn := context.WithCancel(context.Background()) + // we run cancelFn() below to cleanup! + var errFn error + chanFn := make(chan struct{}) // normal exit signal + wgFn.Add(1) + go func() { + defer wgFn.Done() + defer close(chanFn) // signal that I exited + for { + if obj.Debug { + obj.Logf("process...") + } + if errFn = obj.process(ctxFn); errFn != nil { // store + if errFn != context.Canceled { + obj.Logf("process end err: %+v...", errFn) + } + return + } + if obj.Debug { + obj.Logf("process end...") + } + // If process finishes without error, we + // should sit here and wait until we get + // run again from a wake-up, or we exit. + select { + case <-obj.wakeChan: // wait until something has actually woken up... + obj.Logf("process wakeup...") + // loop! + case <-ctxFn.Done(): + errFn = context.Canceled + return + } + } + }() + + chFn := false + chPause := false + ctxExit := false + select { + //case <-obj.wakeChan: + // this happens entirely in the process inner, inner loop now. + + case <-chanFn: // process exited on it's own in error! + chFn = true + + case <-obj.pauseChan: + obj.Logf("pausing...") + chPause = true + + case <-mainCtx.Done(): // when asked to exit + //return nil // we exit happily + ctxExit = true + } + + //fmt.Printf("chPause: %+v\n", chPause) // debug + //fmt.Printf("ctxExit: %+v\n", ctxExit) // debug + + cancelFn() // cancel the process function + wgFn.Wait() // wait for the process function to return + + pausedProcess = chPause // tell the below select + if errFn == nil { + // break on errors (needs to know if paused) + processBreakFn(fmt.Errorf("unexpected nil error in process")) + break + } + if errFn != nil && errFn != context.Canceled { + // break on errors (needs to know if paused) + processBreakFn(errwrap.Wrapf(errFn, "process error")) + break + } + //if errFn == context.Canceled { + // // ignore, we asked for it + //} + + if ctxExit { + return nil // we exit happily + } + if chPause { + break + } + + // This used to happen if a node (in the list we are + // sending to) dies, and we returned with: + // `case <-node.ctx.Done():` // node died + // return node.ctx.Err() + // which caused this scenario. + if chFn && errFn == context.Canceled { // very rare case + // programming error + processBreakFn(fmt.Errorf("legacy unhandled process state")) + break + } + + // programming error + //return fmt.Errorf("unhandled process state") + processBreakFn(fmt.Errorf("unhandled process state")) + break + } + // if we're in errProcess, we need to add back in the pauseChan! + if errProcess != nil && !pausedProcess { + select { + case <-obj.pauseChan: + obj.Logf("lower pausing...") + + // do we want this exit case? YES + case <-mainCtx.Done(): // when asked to exit + return errProcess + } + } + + // Toposort for paused workers. We run this before the actual + // pause completes, because the second we are paused, the graph + // could then immediately change. We don't need a lock in here + // because the mutex only unlocks when pause is complete below. + //topoSort1, err := obj.graph.TopologicalSort() + //if err != nil { + // return err + //} + //for _, v := range topoSort1 {} + + // pause is complete + // no exit case from here, must be fully running or paused... + select { + case obj.pausedChan <- struct{}{}: + obj.Logf("paused!") + } + + // + // the graph changes shape right here... we are locked right now + // + + // wait until resumed/unlocked + select { + case <-obj.resumeChan: + obj.Logf("resuming...") + } + + // Do any cleanup needed from delete vertex. Or do we? + // We've ascertained that while we want this stuff to shutdown, + // and while we also know that a Stream() function running is a + // part of what we're waiting for to exit, it doesn't matter + // that it exits now! This is actually causing a deadlock + // because the pending Stream exit, might be calling a new + // Reverse commit, which means we're deadlocked. It's safe for + // the Stream to keep running, all it might do is needlessly add + // a new value to obj.table which won't bother us since we won't + // even use it in process. We _do_ want to wait for all of these + // before the final exit, but we already have that in a defer. + //obj.runNodeWaitFns() + + // Toposort to run/resume workers. (Bottom of toposort first!) + topoSort2, err := obj.graph.TopologicalSort() + if err != nil { + return err + } + reversed := pgraph.Reverse(topoSort2) + for _, v := range reversed { + f, ok := v.(interfaces.Func) + if !ok { + panic("not a Func") + } + node, exists := obj.state[f] + if !exists { + panic(fmt.Sprintf("missing node in iterate: %s", f)) + } + + if node.running { // it's not a new vertex + continue + } + obj.loaded = false // reset this + node.running = true + + obj.statsMutex.Lock() + val, _ := obj.stats.inputList[node] // val is # or zero + obj.stats.inputList[node] = val // initialize to zero + obj.statsMutex.Unlock() + + innerCtx, innerCancel := context.WithCancel(ctx) // wrap parent (not mainCtx) + // we defer innerCancel() in the goroutine to cleanup! + node.ctx = innerCtx + node.cancel = innerCancel + + // run mainloop + wgAg.Add(1) + node.wg.Add(1) + go func(f interfaces.Func, node *state) { + defer node.wg.Done() + defer wgAg.Done() + defer node.cancel() // if we close, clean up and send the signal to anyone watching + if obj.Debug { + obj.Logf("Running func `%s`", node) + obj.statsMutex.Lock() + obj.stats.runningList[node] = struct{}{} + obj.stats.loadedList[node] = false + obj.statsMutex.Unlock() + } + + fn := func(nodeCtx context.Context) (reterr error) { + defer func() { + // catch programming errors + if r := recover(); r != nil { + obj.Logf("Panic in Stream of func `%s`: %+v", node, r) + reterr = fmt.Errorf("panic in Stream of func `%s`: %+v", node, r) + } + }() + return f.Stream(nodeCtx) + } + runErr := fn(node.ctx) // wrap with recover() + if obj.Debug { + obj.Logf("Exiting func `%s`", node) + obj.statsMutex.Lock() + delete(obj.stats.runningList, node) + obj.statsMutex.Unlock() + } + if runErr != nil { + obj.Logf("Erroring func `%s`: %+v", node, runErr) + // send to a aggregate channel + // the first to error will cause ag to + // shutdown, so make sure we can exit... + select { + case obj.ag <- runErr: // send to aggregate channel + case <-node.ctx.Done(): + } + } + // if node never loaded, then we error in the node.output loop! + }(f, node) + + // consume output + wgAg.Add(1) + node.wg.Add(1) + go func(f interfaces.Func, node *state) { + defer node.wg.Done() + defer wgAg.Done() + defer func() { + // We record the fact that output + // closed, so we can eventually close + // the downstream node's input. + obj.stateMutex.Lock() + obj.isClosed[node] = struct{}{} // closed! + obj.stateMutex.Unlock() + // TODO: is this wake necessary? + obj.wake("closed") // closed, so wake up + }() + + for value := range node.output { // read from channel + if value == nil { + // bug! + obj.Logf("func `%s` got nil value", node) + panic("got nil value") + } + + obj.tableMutex.RLock() + cached, exists := obj.table[f] + obj.tableMutex.RUnlock() + if !exists { // first value received + // RACE: do this AFTER value is present! + //node.loaded = true // not yet please + obj.Logf("func `%s` started", node) + } else if value.Cmp(cached) == nil { + // skip if new value is same as previous + // if this happens often, it *might* be + // a bug in the function implementation + // FIXME: do we need to disable engine + // caching when using hysteresis? + obj.Logf("func `%s` skipped", node) + continue + } + obj.tableMutex.Lock() + obj.table[f] = value // save the latest + obj.tableMutex.Unlock() + node.rwmutex.Lock() + node.loaded = true // set *after* value is in :) + //obj.Logf("func `%s` changed", node) + node.rwmutex.Unlock() + + obj.statsMutex.Lock() + obj.stats.loadedList[node] = true + obj.statsMutex.Unlock() + + // Send a message to tell our ag channel + // that we might have sent an aggregated + // message here. They should check if we + // are a leaf and if we glitch or not... + // Make sure we do this before the wake. + obj.stateMutex.Lock() + obj.activity[node] = struct{}{} // activity! + obj.stateMutex.Unlock() + + obj.wake("new value") // new value, so send wake up + + } // end for + + // no more output values are coming... + //obj.Logf("func `%s` stopped", node) + + // nodes that never loaded will cause the engine to hang + if !node.loaded { + select { + case obj.ag <- fmt.Errorf("func `%s` stopped before it was loaded", node): + case <-node.ctx.Done(): + return + } + } + + }(f, node) + + } // end for + + // Send new notifications in case any new edges are sending away + // to these... They might have already missed the notifications! + for k := range obj.resend { // resend TO these! + node, exists := obj.state[k] + if !exists { + continue + } + // Run as a goroutine to avoid erroring in parent thread. + wg.Add(1) + go func(node *state) { + defer wg.Done() + obj.Logf("resend to func `%s`", node) + obj.wake("resend") // new value, so send wake up + }(node) + } + obj.resend = make(map[interfaces.Func]struct{}) // reset + + // now check their states... + //for _, v := range reversed { + // v, ok := v.(interfaces.Func) + // if !ok { + // panic("not a Func") + // } + // // wait for startup? + // close(obj.state[v].startup) XXX: once? + //} + + // resume is complete + // no exit case from here, must be fully running or paused... + select { + case obj.resumedChan <- struct{}{}: + obj.Logf("resumed!") + } + + } // end for +} + +// Stream returns a channel that you can follow to get aggregated graph events. +// Do not block reading from this channel as you can hold up the entire engine. +func (obj *Engine) Stream() <-chan error { + return obj.streamChan +} + +// Loaded returns a channel that closes when the function engine loads. +func (obj *Engine) Loaded() <-chan struct{} { + return obj.loadedChan +} + +// Table returns a copy of the populated data table of values. We return a copy +// because since these values are constantly changing, we need an atomic +// snapshot to present to the consumer of this API. +// TODO: is this globally glitch consistent? +// TODO: do we need an API to return a single value? (wrapped in read locks) +func (obj *Engine) Table() map[interfaces.Func]types.Value { + obj.tableMutex.RLock() + defer obj.tableMutex.RUnlock() + table := make(map[interfaces.Func]types.Value) + for k, v := range obj.table { + //table[k] = v.Copy() // TODO: do we need to copy these values? + table[k] = v + } + return table +} + +// Apply is similar to Table in that it gives you access to the internal output +// table of data, the difference being that it instead passes this information +// to a function of your choosing and holds a read/write lock during the entire +// time that your function is synchronously executing. If you use this function +// to spawn any goroutines that read or write data, then you're asking for a +// panic. +// XXX: does this need to be a Lock? Can it be an RLock? Check callers! +func (obj *Engine) Apply(fn func(map[interfaces.Func]types.Value) error) error { + // XXX: does this need to be a Lock? Can it be an RLock? Check callers! + obj.tableMutex.Lock() // differs from above RLock around obj.table + defer obj.tableMutex.Unlock() + table := make(map[interfaces.Func]types.Value) + for k, v := range obj.table { + //table[k] = v.Copy() // TODO: do we need to copy these values? + table[k] = v + } + + return fn(table) +} + +// Started returns a channel that closes when the Run function finishes starting +// up. This is useful so that we can wait before calling any of the mutex things +// that would normally panic if Run wasn't started up first. +func (obj *Engine) Started() <-chan struct{} { + return obj.startedChan +} + +// NumVertices returns the number of vertices in the current graph. +func (obj *Engine) NumVertices() int { + // XXX: would this deadlock if we added this? + //obj.graphMutex.Lock() // XXX: should this be a RLock? + //defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? + return obj.graph.NumVertices() +} + +// Stats returns some statistics in a human-readable form. +func (obj *Engine) Stats() string { + defer obj.statsMutex.RUnlock() + obj.statsMutex.RLock() + + return obj.stats.String() +} + +// Graphviz writes out the diagram of a graph to be used for visualization and +// debugging. You must not modify the graph (eg: during Lock) when calling this +// method. +func (obj *Engine) Graphviz(dir string) error { + // XXX: would this deadlock if we added this? + //obj.graphMutex.Lock() // XXX: should this be a RLock? + //defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? + + obj.graphvizMutex.Lock() + defer obj.graphvizMutex.Unlock() + + obj.graphvizCount++ // increment + + if dir == "" { + dir = obj.graphvizDirectory + } + if dir == "" { // XXX: hack for ergonomics + d := time.Now().UnixMilli() + dir = fmt.Sprintf("/tmp/dage-graphviz-%s-%d/", obj.Name, d) + obj.graphvizDirectory = dir + } + if !strings.HasSuffix(dir, "/") { + return fmt.Errorf("dir must end with a slash") + } + + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + + dashedEdges, err := pgraph.NewGraph("dashedEdges") + if err != nil { + return err + } + for _, v1 := range obj.graph.Vertices() { + // if it's a ChannelBasedSinkFunc... + if cb, ok := v1.(*structs.ChannelBasedSinkFunc); ok { + // ...then add a dashed edge to its output + dashedEdges.AddEdge(v1, cb.Target, &pgraph.SimpleEdge{ + Name: "channel", // secret channel + }) + } + // if it's a ChannelBasedSourceFunc... + if cb, ok := v1.(*structs.ChannelBasedSourceFunc); ok { + // ...then add a dashed edge from its input + dashedEdges.AddEdge(cb.Source, v1, &pgraph.SimpleEdge{ + Name: "channel", // secret channel + }) + } + } + + gv := &pgraph.Graphviz{ + Name: obj.graph.GetName(), + Filename: fmt.Sprintf("%s/%d.dot", dir, obj.graphvizCount), + Graphs: map[*pgraph.Graph]*pgraph.GraphvizOpts{ + obj.graph: nil, + dashedEdges: { + Style: "dashed", + }, + }, + } + + if err := gv.Exec(); err != nil { + return err + } + return nil +} + +// state tracks some internal vertex-specific state information. +type state struct { + Func interfaces.Func + name string // cache a name here for safer concurrency + + input chan types.Value // the top level type must be a struct + output chan types.Value + txn interfaces.Txn // API of graphTxn struct to pass to each function + + //init bool // have we run Init on our func? + //ready bool // has it received all the args it needs at least once? + loaded bool // has the func run at least once ? + inputClosed bool // is our input closed? + outputClosed bool // is our output closed? + + isLeaf bool // is my out degree zero? + + running bool + wg *sync.WaitGroup + ctx context.Context // per state ctx (inner ctx) + cancel func() // cancel above inner ctx + + rwmutex *sync.RWMutex // concurrency guard for reading/modifying this state +} + +// String implements the fmt.Stringer interface for pretty printing! +func (obj *state) String() string { + if obj.name != "" { + return obj.name + } + + return obj.Func.String() +} + +// stats holds some statistics and other debugging information. +type stats struct { + + // runningList keeps track of which nodes are still running. + runningList map[*state]struct{} + + // loadedList keeps track of which nodes have loaded. + loadedList map[*state]bool + + // inputList keeps track of the number of inputs each node received. + inputList map[*state]int64 +} + +// String implements the fmt.Stringer interface for printing out our collected +// statistics! +func (obj *stats) String() string { + // XXX: just build the lock into *stats instead of into our dage obj + s := "stats:\n" + { + s += "\trunning:\n" + names := []string{} + for k := range obj.runningList { + names = append(names, k.String()) + } + sort.Strings(names) + for _, name := range names { + s += fmt.Sprintf("\t * %s\n", name) + } + } + { + nodes := []*state{} + for k := range obj.loadedList { + nodes = append(nodes, k) + } + sort.Slice(nodes, func(i, j int) bool { return nodes[i].String() < nodes[j].String() }) + + s += "\tloaded:\n" + for _, node := range nodes { + if !obj.loadedList[node] { + continue + } + s += fmt.Sprintf("\t * %s\n", node) + } + + s += "\tnot loaded:\n" + for _, node := range nodes { + if obj.loadedList[node] { + continue + } + s += fmt.Sprintf("\t * %s\n", node) + } + } + { + s += "\tinput count:\n" + nodes := []*state{} + for k := range obj.inputList { + nodes = append(nodes, k) + } + //sort.Slice(nodes, func(i, j int) bool { return nodes[i].String() < nodes[j].String() }) + sort.Slice(nodes, func(i, j int) bool { return obj.inputList[nodes[i]] < obj.inputList[nodes[j]] }) + for _, node := range nodes { + s += fmt.Sprintf("\t * (%d) %s\n", obj.inputList[node], node) + } + } + return s +} diff --git a/lang/funcs/dage/dage_test.go b/lang/funcs/dage/dage_test.go new file mode 100644 index 00000000..9fcf44dc --- /dev/null +++ b/lang/funcs/dage/dage_test.go @@ -0,0 +1,793 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package dage + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/lang/types" + "github.com/purpleidea/mgmt/util" +) + +type testFunc struct { + Name string + Type *types.Type + Func func(types.Value) (types.Value, error) + Meta *meta + + value types.Value + init *interfaces.Init +} + +func (obj *testFunc) String() string { return obj.Name } + +func (obj *testFunc) Info() *interfaces.Info { + return &interfaces.Info{ + Pure: true, + Memo: false, // TODO: should this be something we specify here? + Sig: obj.Type, + Err: obj.Validate(), + } +} + +func (obj *testFunc) Validate() error { + if obj.Meta == nil { + return fmt.Errorf("test case error: did you add the vertex to the vertices list?") + } + return nil +} + +func (obj *testFunc) Init(init *interfaces.Init) error { + obj.init = init + return nil +} + +func (obj *testFunc) Stream(ctx context.Context) error { + defer close(obj.init.Output) // the sender closes + defer obj.init.Logf("stream closed") + obj.init.Logf("stream startup") + + // make some placeholder value because obj.value is nil + constValue, err := types.ValueOfGolang("hello") + if err != nil { + return err // unlikely + } + + for { + select { + case input, ok := <-obj.init.Input: + if !ok { + obj.init.Logf("stream input closed") + obj.init.Input = nil // don't get two closes + // already sent one value, so we can shutdown + if obj.value != nil { + return nil // can't output any more + } + + obj.value = constValue + } else { + obj.init.Logf("stream got input type(%T) value: (%+v)", input, input) + if obj.Func == nil { + obj.value = constValue + } + + if obj.Func != nil { + //obj.init.Logf("running internal function...") + v, err := obj.Func(input) // run me! + if err != nil { + return err + } + obj.value = v + } + } + + case <-ctx.Done(): + return nil + } + + select { + case obj.init.Output <- obj.value: // send anything + // add some monitoring... + obj.Meta.wg.Add(1) + go func() { + // no need for lock here + defer obj.Meta.wg.Done() + if obj.Meta.debug { + obj.Meta.logf("sending an internal event!") + } + + select { + case obj.Meta.Events[obj.Name] <- struct{}{}: + case <-obj.Meta.ctx.Done(): + } + }() + + case <-ctx.Done(): + return nil + } + } +} + +type meta struct { + EventCount int + Event chan struct{} + Events map[string]chan struct{} + + ctx context.Context + wg *sync.WaitGroup + mutex *sync.Mutex + + debug bool + logf func(format string, v ...interface{}) +} + +func (obj *meta) Lock() { obj.mutex.Lock() } +func (obj *meta) Unlock() { obj.mutex.Unlock() } + +type dageTestOp func(*Engine, interfaces.Txn, *meta) error + +func TestDageTable(t *testing.T) { + + type test struct { // an individual test + name string + vertices []interfaces.Func + actions []dageTestOp + } + testCases := []test{} + { + testCases = append(testCases, test{ + name: "empty graph", + vertices: []interfaces.Func{}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + time.Sleep(1 * time.Second) // XXX: unfortunate + defer engine.Unlock() + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + // We don't expect an empty graph to send events. + if meta.EventCount != 0 { + return fmt.Errorf("got too many stream events") + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + + testCases = append(testCases, test{ + name: "simple add vertex", + vertices: []interfaces.Func{f1}, // so the test engine can pass in debug/observability handles + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get any stream events") + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + // e1 arg name must match incoming edge to it + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple add edge", + vertices: []interfaces.Func{f1, f2}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + engine.Lock() + defer engine.Unlock() + // This newly added node should get a notification after it starts. + return engine.AddEdge(f1, f2, e1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 2 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + }, + }) + } + { + // diamond + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + f3 := &testFunc{Name: "f3", Type: types.NewType("func(e2 str) str")} + f4 := &testFunc{Name: "f4", Type: types.NewType("func(e3 str, e4 str) str")} + e1 := testEdge("e1") + e2 := testEdge("e2") + e3 := testEdge("e3") + e4 := testEdge("e4") + + testCases = append(testCases, test{ + name: "simple add multiple edges", + vertices: []interfaces.Func{f1, f2, f3, f4}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + if err := engine.AddEdge(f1, f2, e1); err != nil { + return err + } + if err := engine.AddEdge(f1, f3, e2); err != nil { + return err + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + if err := engine.AddEdge(f2, f4, e3); err != nil { + return err + } + if err := engine.AddEdge(f3, f4, e4); err != nil { + return err + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + num := 1 + for { + if num == 0 { + break + } + select { + case _, ok := <-meta.Event: + if !ok { + return fmt.Errorf("unexpectedly channel close") + } + num-- + if meta.debug { + meta.logf("got an event!") + } + case <-meta.ctx.Done(): + return meta.ctx.Err() + } + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + num := 1 + for { + if num == 0 { + break + } + bt := util.BlockedTimer{Seconds: 2} + defer bt.Cancel() + bt.Printf("waiting for f4...\n") + select { + case _, ok := <-meta.Events["f4"]: + bt.Cancel() + if !ok { + return fmt.Errorf("unexpectedly channel close") + } + num-- + if meta.debug { + meta.logf("got an event from f4!") + } + case <-meta.ctx.Done(): + return meta.ctx.Err() + } + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + + testCases = append(testCases, test{ + name: "simple add/delete vertex", + vertices: []interfaces.Func{f1}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + + //meta.Lock() + //defer meta.Unlock() + if meta.debug { + meta.logf("about to delete vertex f1!") + defer meta.logf("done deleting vertex f1!") + } + + return engine.DeleteVertex(f1) + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + // e1 arg name must match incoming edge to it + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple add/delete edge", + vertices: []interfaces.Func{f1, f2}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.AddVertex(f1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + engine.Lock() + defer engine.Unlock() + // This newly added node should get a notification after it starts. + return engine.AddEdge(f1, f2, e1) + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 2 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + engine.Lock() + defer engine.Unlock() + return engine.DeleteEdge(e1) + }, + }, + }) + } + + // the following tests use the txn instead of direct locks + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + + testCases = append(testCases, test{ + name: "txn simple add vertex", + vertices: []interfaces.Func{f1}, // so the test engine can pass in debug/observability handles + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddVertex(f1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get any stream events") + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + // e1 arg name must match incoming edge to it + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "txn simple add edge", + vertices: []interfaces.Func{f1, f2}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddVertex(f1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + // This newly added node should get a notification after it starts. + return txn.AddEdge(f1, f2, e1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 2 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + }, + }) + } + { + // diamond + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + f3 := &testFunc{Name: "f3", Type: types.NewType("func(e2 str) str")} + f4 := &testFunc{Name: "f4", Type: types.NewType("func(e3 str, e4 str) str")} + e1 := testEdge("e1") + e2 := testEdge("e2") + e3 := testEdge("e3") + e4 := testEdge("e4") + + testCases = append(testCases, test{ + name: "txn simple add multiple edges", + vertices: []interfaces.Func{f1, f2, f3, f4}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddVertex(f1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddEdge(f1, f2, e1).AddEdge(f1, f3, e2).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddEdge(f2, f4, e3).AddEdge(f3, f4, e4).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + num := 1 + for { + if num == 0 { + break + } + select { + case _, ok := <-meta.Event: + if !ok { + return fmt.Errorf("unexpectedly channel close") + } + num-- + if meta.debug { + meta.logf("got an event!") + } + case <-meta.ctx.Done(): + return meta.ctx.Err() + } + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + num := 1 + for { + if num == 0 { + break + } + bt := util.BlockedTimer{Seconds: 2} + defer bt.Cancel() + bt.Printf("waiting for f4...\n") + select { + case _, ok := <-meta.Events["f4"]: + bt.Cancel() + if !ok { + return fmt.Errorf("unexpectedly channel close") + } + num-- + if meta.debug { + meta.logf("got an event from f4!") + } + case <-meta.ctx.Done(): + return meta.ctx.Err() + } + } + return nil + }, + }, + }) + } + { + f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + + testCases = append(testCases, test{ + name: "txn simple add/delete vertex", + vertices: []interfaces.Func{f1}, + actions: []dageTestOp{ + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + return txn.AddVertex(f1).Commit() + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + time.Sleep(1 * time.Second) // XXX: unfortunate + meta.Lock() + defer meta.Unlock() + if meta.EventCount < 1 { + return fmt.Errorf("didn't get enough stream events") + } + return nil + }, + func(engine *Engine, txn interfaces.Txn, meta *meta) error { + //meta.Lock() + //defer meta.Unlock() + if meta.debug { + meta.logf("about to delete vertex f1!") + defer meta.logf("done deleting vertex f1!") + } + + return txn.DeleteVertex(f1).Commit() + }, + }, + }) + } + //{ + // f1 := &testFunc{Name: "f1", Type: types.NewType("func() str")} + // // e1 arg name must match incoming edge to it + // f2 := &testFunc{Name: "f2", Type: types.NewType("func(e1 str) str")} + // e1 := testEdge("e1") + // + // testCases = append(testCases, test{ + // name: "txn simple add/delete edge", + // vertices: []interfaces.Func{f1, f2}, + // actions: []dageTestOp{ + // func(engine *Engine, txn interfaces.Txn, meta *meta) error { + // return txn.AddVertex(f1).Commit() + // }, + // func(engine *Engine, txn interfaces.Txn, meta *meta) error { + // time.Sleep(1 * time.Second) // XXX: unfortunate + // // This newly added node should get a notification after it starts. + // return txn.AddEdge(f1, f2, e1).Commit() + // }, + // func(engine *Engine, txn interfaces.Txn, meta *meta) error { + // time.Sleep(1 * time.Second) // XXX: unfortunate + // meta.Lock() + // defer meta.Unlock() + // if meta.EventCount < 2 { + // return fmt.Errorf("didn't get enough stream events") + // } + // return nil + // }, + // func(engine *Engine, txn interfaces.Txn, meta *meta) error { + // return txn.DeleteEdge(e1).Commit() // XXX: not implemented + // }, + // }, + // }) + //} + + if testing.Short() { + t.Logf("available tests:") + } + names := []string{} + for index, tc := range testCases { // run all the tests + if tc.name == "" { + t.Errorf("test #%d: not named", index) + continue + } + if util.StrInList(tc.name, names) { + t.Errorf("test #%d: duplicate sub test name of: %s", index, tc.name) + continue + } + names = append(names, tc.name) + + //if index != 3 { // hack to run a subset (useful for debugging) + //if tc.name != "simple txn" { + // continue + //} + + testName := fmt.Sprintf("test #%d (%s)", index, tc.name) + if testing.Short() { // make listing tests easier + t.Logf("%s", testName) + continue + } + t.Run(testName, func(t *testing.T) { + name, vertices, actions := tc.name, tc.vertices, tc.actions + + t.Logf("\n\ntest #%d (%s) ----------------\n\n", index, name) + + //logf := func(format string, v ...interface{}) { + // t.Logf(fmt.Sprintf("test #%d", index)+": "+format, v...) + //} + + //now := time.Now() + + wg := &sync.WaitGroup{} + defer wg.Wait() // defer is correct b/c we're in a func! + + min := 5 * time.Second // approx min time needed for the test + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if deadline, ok := t.Deadline(); ok { + d := deadline.Add(-min) + //t.Logf(" now: %+v", now) + //t.Logf(" d: %+v", d) + newCtx, cancel := context.WithDeadline(ctx, d) + ctx = newCtx + defer cancel() + } + + debug := testing.Verbose() // set via the -test.v flag to `go test` + + meta := &meta{ + Event: make(chan struct{}), + Events: make(map[string]chan struct{}), + + ctx: ctx, + wg: &sync.WaitGroup{}, + mutex: &sync.Mutex{}, + + debug: debug, + logf: func(format string, v ...interface{}) { + // safe Logf in case f.String contains %? chars... + s := fmt.Sprintf(format, v...) + t.Logf("%s", s) + }, + } + defer meta.wg.Wait() + + for _, f := range vertices { + testFunc, ok := f.(*testFunc) + if !ok { + t.Errorf("bad test function: %+v", f) + return + } + meta.Events[testFunc.Name] = make(chan struct{}) + testFunc.Meta = meta // add the handle + } + + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-ctx.Done(): + t.Logf("cancelling test...") + } + }() + + engine := &Engine{ + Name: "dage", + + Debug: debug, + Logf: t.Logf, + } + + if err := engine.Setup(); err != nil { + t.Errorf("could not setup engine: %+v", err) + return + } + defer engine.Cleanup() + + wg.Add(1) + go func() { + defer wg.Done() + if err := engine.Run(ctx); err != nil { + t.Errorf("error while running engine: %+v", err) + return + } + t.Logf("engine shutdown cleanly...") + }() + + <-engine.Started() // wait for startup (will not block forever) + + txn := engine.Txn() + defer txn.Free() // remember to call Free() + + wg.Add(1) + go func() { + defer wg.Done() + ch := engine.Stream() + for { + select { + case err, ok := <-ch: // channel must close to shutdown + if !ok { + return + } + meta.Lock() + meta.EventCount++ + meta.Unlock() + meta.wg.Add(1) + go func() { + // no need for lock here + defer meta.wg.Done() + if meta.debug { + meta.logf("sending an event!") + } + select { + case meta.Event <- struct{}{}: + case <-meta.ctx.Done(): + } + }() + if err != nil { + t.Errorf("graph error event: %v", err) + continue + } + t.Logf("graph stream event!") + } + } + }() + + // Run a list of actions. Any error kills it all. + t.Logf("starting actions...") + for i, action := range actions { + t.Logf("running action %d...", i+1) + if err := action(engine, txn, meta); err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: action #%d failed with: %+v", index, i, err) + break // so that cancel runs + } + } + + t.Logf("test done...") + cancel() + }) + } + + if testing.Short() { + t.Skip("skipping all tests...") + } +} diff --git a/lang/funcs/dage/ref.go b/lang/funcs/dage/ref.go new file mode 100644 index 00000000..74cd80d2 --- /dev/null +++ b/lang/funcs/dage/ref.go @@ -0,0 +1,282 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +// Package dage implements a DAG function engine. +// TODO: can we rename this to something more interesting? +package dage + +import ( + "fmt" + "sync" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/util/errwrap" +) + +// RefCount keeps track of vertex and edge references across the entire graph. +// Make sure to lock access somehow, ideally with the provided Locker interface. +type RefCount struct { + // mutex locks this database for read or write. + mutex *sync.Mutex + + // vertices is a reference count of the number of vertices used. + vertices map[interfaces.Func]int64 + + // edges is a reference count of the number of edges used. + edges map[*RefCountEdge]int64 // TODO: hash *RefCountEdge as a key instead +} + +// RefCountEdge is a virtual "hash" entry for the RefCount edges map key. +type RefCountEdge struct { + f1 interfaces.Func + f2 interfaces.Func + arg string +} + +// String prints a representation of the references held. +func (obj *RefCount) String() string { + s := "" + s += fmt.Sprintf("vertices (%d):\n", len(obj.vertices)) + for vertex, count := range obj.vertices { + s += fmt.Sprintf("\tvertex (%d): %p %s\n", count, vertex, vertex) + } + s += fmt.Sprintf("edges (%d):\n", len(obj.edges)) + for edge, count := range obj.edges { + s += fmt.Sprintf("\tedge (%d): %p %s -> %p %s # %s\n", count, edge.f1, edge.f1, edge.f2, edge.f2, edge.arg) + } + return s +} + +// Init must be called to initialized the struct before first use. +func (obj *RefCount) Init() *RefCount { + obj.mutex = &sync.Mutex{} + obj.vertices = make(map[interfaces.Func]int64) + obj.edges = make(map[*RefCountEdge]int64) + return obj // return self so it can be called in a chain +} + +// Lock the mutex that should be used when reading or writing from this. +func (obj *RefCount) Lock() { obj.mutex.Lock() } + +// Unlock the mutex that should be used when reading or writing from this. +func (obj *RefCount) Unlock() { obj.mutex.Unlock() } + +// VertexInc increments the reference count for the input vertex. It returns +// true if the reference count for this vertex was previously undefined or zero. +// True usually means we'd want to actually add this vertex now. If you attempt +// to increment a vertex which already has a less than zero count, then this +// will panic. This situation is likely impossible unless someone modified the +// reference counting struct directly. +func (obj *RefCount) VertexInc(f interfaces.Func) bool { + count, _ := obj.vertices[f] + obj.vertices[f] = count + 1 + if count == -1 { // unlikely, but catch any bugs + panic("negative reference count") + } + return count == 0 +} + +// VertexDec decrements the reference count for the input vertex. It returns +// true if the reference count for this vertex is now zero. True usually means +// we'd want to actually remove this vertex now. If you attempt to decrement a +// vertex which already has a zero count, then this will panic. +func (obj *RefCount) VertexDec(f interfaces.Func) bool { + count, _ := obj.vertices[f] + obj.vertices[f] = count - 1 + if count == 0 { + panic("negative reference count") + } + return count == 1 // now it's zero +} + +// EdgeInc increments the reference count for the input edge. It adds a +// reference for each arg name in the edge. Since this also increments the +// references for the two input vertices, it returns the corresponding two +// boolean values for these calls. (This function makes two calls to VertexInc.) +func (obj *RefCount) EdgeInc(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) (bool, bool) { + for _, arg := range fe.Args { // ref count each arg + r := obj.makeEdge(f1, f2, arg) + count := obj.edges[r] + obj.edges[r] = count + 1 + if count == -1 { // unlikely, but catch any bugs + panic("negative reference count") + } + } + + return obj.VertexInc(f1), obj.VertexInc(f2) +} + +// EdgeDec decrements the reference count for the input edge. It removes a +// reference for each arg name in the edge. Since this also decrements the +// references for the two input vertices, it returns the corresponding two +// boolean values for these calls. (This function makes two calls to VertexDec.) +func (obj *RefCount) EdgeDec(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) (bool, bool) { + for _, arg := range fe.Args { // ref count each arg + r := obj.makeEdge(f1, f2, arg) + count := obj.edges[r] + obj.edges[r] = count - 1 + if count == 0 { + panic("negative reference count") + } + } + + return obj.VertexDec(f1), obj.VertexDec(f2) +} + +// FreeVertex removes exactly one entry from the Vertices list or it errors. +func (obj *RefCount) FreeVertex(f interfaces.Func) error { + if count, exists := obj.vertices[f]; !exists || count != 0 { + return fmt.Errorf("no vertex of count zero found") + } + delete(obj.vertices, f) + return nil +} + +// FreeEdge removes exactly one entry from the Edges list or it errors. +func (obj *RefCount) FreeEdge(f1, f2 interfaces.Func, arg string) error { + found := []*RefCountEdge{} + for k, count := range obj.edges { + //if k == nil { // programming error + // continue + //} + if k.f1 == f1 && k.f2 == f2 && k.arg == arg && count == 0 { + found = append(found, k) + } + } + if len(found) > 1 { + return fmt.Errorf("inconsistent ref count for edge") + } + if len(found) == 0 { + return fmt.Errorf("no edge of count zero found") + } + delete(obj.edges, found[0]) // delete from map + return nil +} + +// GC runs the garbage collector on any zeroed references. Note the distinction +// between count == 0 (please delete now) and absent from the map. +func (obj *RefCount) GC(graphAPI interfaces.GraphAPI) error { + // debug + //fmt.Printf("start refs\n%s", obj.String()) + //defer func() { fmt.Printf("end refs\n%s", obj.String()) }() + free := make(map[interfaces.Func]map[interfaces.Func][]string) // f1 -> f2 + for x, count := range obj.edges { + if count != 0 { // we only care about freed things + continue + } + if _, exists := free[x.f1]; !exists { + free[x.f1] = make(map[interfaces.Func][]string) + } + if _, exists := free[x.f1][x.f2]; !exists { + free[x.f1][x.f2] = []string{} + } + free[x.f1][x.f2] = append(free[x.f1][x.f2], x.arg) // exists as refcount zero + } + + // These edges have a refcount of zero. + for f1, x := range free { + for f2, args := range x { + for _, arg := range args { + edge := graphAPI.FindEdge(f1, f2) + // any errors here are programming errors + if edge == nil { + return fmt.Errorf("missing edge from %p %s -> %p %s", f1, f1, f2, f2) + } + + once := false // sanity check + newArgs := []string{} + for _, a := range edge.Args { + if arg == a { + if once { + // programming error, duplicate arg + return fmt.Errorf("duplicate arg (%s) in edge", arg) + } + once = true + continue + } + newArgs = append(newArgs, a) + } + + if len(edge.Args) == 1 { // edge gets deleted + if a := edge.Args[0]; a != arg { // one arg + return fmt.Errorf("inconsistent arg: %s != %s", a, arg) + } + + if err := graphAPI.DeleteEdge(edge); err != nil { + return errwrap.Wrapf(err, "edge deletion error") + } + } else { + // just remove the one arg for now + edge.Args = newArgs + } + + // always free the database entry + if err := obj.FreeEdge(f1, f2, arg); err != nil { + return err + } + } + } + } + + // Now check the vertices... + vs := []interfaces.Func{} + for vertex, count := range obj.vertices { + if count != 0 { + continue + } + + // safety check, vertex is still in use by an edge + for x := range obj.edges { + if x.f1 == vertex || x.f2 == vertex { + // programming error + return fmt.Errorf("vertex unexpectedly still in use: %p %s", vertex, vertex) + } + } + + vs = append(vs, vertex) + } + + for _, vertex := range vs { + if err := graphAPI.DeleteVertex(vertex); err != nil { + return errwrap.Wrapf(err, "vertex deletion error") + } + // free the database entry + if err := obj.FreeVertex(vertex); err != nil { + return err + } + } + + return nil +} + +// makeEdge looks up an edge with the "hash" input we are seeking. If it doesn't +// find a match, it returns a new one with those fields. +func (obj *RefCount) makeEdge(f1, f2 interfaces.Func, arg string) *RefCountEdge { + for k := range obj.edges { + //if k == nil { // programming error + // continue + //} + if k.f1 == f1 && k.f2 == f2 && k.arg == arg { + return k + } + } + return &RefCountEdge{ // not found, so make a new one! + f1: f1, + f2: f2, + arg: arg, + } +} diff --git a/lang/funcs/dage/txn.go b/lang/funcs/dage/txn.go new file mode 100644 index 00000000..41a669cc --- /dev/null +++ b/lang/funcs/dage/txn.go @@ -0,0 +1,626 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +// Package dage implements a DAG function engine. +// TODO: can we rename this to something more interesting? +package dage + +import ( + "fmt" + "sort" + "sync" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/pgraph" +) + +// PostReverseCommit specifies that if we run Reverse, and we had previous items +// pending for Commit, that we should Commit them after our Reverse runs. +// Otherwise they remain on the pending queue and wait for you to run Commit. +const PostReverseCommit = false + +// GraphvizDebug enables writing graphviz graphs on each commit. This is very +// slow. +const GraphvizDebug = false + +// opapi is the input for any op. This allows us to keeps things compact and it +// also allows us to change API slightly without re-writing code. +type opapi struct { + GraphAPI interfaces.GraphAPI + RefCount *RefCount +} + +// opfn is an interface that holds the normal op, and the reverse op if we need +// to rollback from the forward fn. Implementations of each op can decide to +// store some internal state when running the forward op which might be needed +// for the possible future reverse op. +type opfn interface { + fmt.Stringer + + Fn(*opapi) error + Rev(*opapi) error +} + +type opfnSkipRev interface { + opfn + + // Skip tells us if this op should be skipped from reversing. + Skip() bool + + // SetSkip specifies that this op should be skipped from reversing. + SetSkip(bool) +} + +type opfnFlag interface { + opfn + + // Flag reads some misc data. + Flag() interface{} + + // SetFlag sets some misc data. + SetFlag(interface{}) +} + +// revOp returns the reversed op from an op by packing or unpacking it. +func revOp(op opfn) opfn { + if skipOp, ok := op.(opfnSkipRev); ok && skipOp.Skip() { + return nil // skip + } + + // XXX: is the reverse of a reverse just undoing it? maybe not but might not matter for us + if newOp, ok := op.(*opRev); ok { + + if newFlagOp, ok := op.(opfnFlag); ok { + newFlagOp.SetFlag("does this rev of rev even happen?") + } + + return newOp.Op // unpack it + } + + return &opRev{ + Op: op, + + opFlag: &opFlag{}, + } // pack it +} + +// opRev switches the Fn and Rev methods by wrapping the contained op in each +// other. +type opRev struct { + Op opfn + + *opFlag +} + +func (obj *opRev) Fn(opapi *opapi) error { + return obj.Op.Rev(opapi) +} + +func (obj *opRev) Rev(opapi *opapi) error { + return obj.Op.Fn(opapi) +} + +func (obj *opRev) String() string { + return "rev(" + obj.Op.String() + ")" // TODO: is this correct? +} + +type opSkip struct { + skip bool +} + +func (obj *opSkip) Skip() bool { + return obj.skip +} + +func (obj *opSkip) SetSkip(skip bool) { + obj.skip = skip +} + +type opFlag struct { + flag interface{} +} + +func (obj *opFlag) Flag() interface{} { + return obj.flag +} + +func (obj *opFlag) SetFlag(flag interface{}) { + obj.flag = flag +} + +type opAddVertex struct { + F interfaces.Func + + *opSkip + *opFlag +} + +func (obj *opAddVertex) Fn(opapi *opapi) error { + if opapi.RefCount.VertexInc(obj.F) { + // add if we're the first reference + return opapi.GraphAPI.AddVertex(obj.F) + } + + return nil +} + +func (obj *opAddVertex) Rev(opapi *opapi) error { + opapi.RefCount.VertexDec(obj.F) + // any removal happens in gc + return nil +} + +func (obj *opAddVertex) String() string { + return fmt.Sprintf("AddVertex: %+v", obj.F) +} + +type opAddEdge struct { + F1 interfaces.Func + F2 interfaces.Func + FE *interfaces.FuncEdge + + *opSkip + *opFlag +} + +func (obj *opAddEdge) Fn(opapi *opapi) error { + if obj.F1 == obj.F2 { // simplify below code/logic with this easy check + return fmt.Errorf("duplicate vertex cycle") + } + + opapi.RefCount.EdgeInc(obj.F1, obj.F2, obj.FE) + + fe := obj.FE // squish multiple edges together if one already exists + if edge := opapi.GraphAPI.FindEdge(obj.F1, obj.F2); edge != nil { + args := make(map[string]struct{}) + for _, x := range obj.FE.Args { + args[x] = struct{}{} + } + for _, x := range edge.Args { + args[x] = struct{}{} + } + if len(args) != len(obj.FE.Args)+len(edge.Args) { + // programming error + return fmt.Errorf("duplicate arg found") + } + newArgs := []string{} + for x := range args { + newArgs = append(newArgs, x) + } + sort.Strings(newArgs) // for consistency? + fe = &interfaces.FuncEdge{ + Args: newArgs, + } + } + + // The dage API currently smooshes together any existing edge args with + // our new edge arg names. It also adds the vertices if needed. + if err := opapi.GraphAPI.AddEdge(obj.F1, obj.F2, fe); err != nil { + return err + } + + return nil +} + +func (obj *opAddEdge) Rev(opapi *opapi) error { + opapi.RefCount.EdgeDec(obj.F1, obj.F2, obj.FE) + return nil +} + +func (obj *opAddEdge) String() string { + return fmt.Sprintf("AddEdge: %+v -> %+v (%+v)", obj.F1, obj.F2, obj.FE) +} + +type opDeleteVertex struct { + F interfaces.Func + + *opSkip + *opFlag +} + +func (obj *opDeleteVertex) Fn(opapi *opapi) error { + if opapi.RefCount.VertexDec(obj.F) { + //delete(opapi.RefCount.Vertices, obj.F) // don't GC this one + if err := opapi.RefCount.FreeVertex(obj.F); err != nil { + panic("could not free vertex") + } + return opapi.GraphAPI.DeleteVertex(obj.F) // do it here instead + } + return nil +} + +func (obj *opDeleteVertex) Rev(opapi *opapi) error { + if opapi.RefCount.VertexInc(obj.F) { + return opapi.GraphAPI.AddVertex(obj.F) + } + return nil +} + +func (obj *opDeleteVertex) String() string { + return fmt.Sprintf("DeleteVertex: %+v", obj.F) +} + +// graphTxn holds the state of a transaction and runs it when needed. When this +// has been setup and initialized, it implements the Txn API that can be used by +// functions in their Stream method to modify the function graph while it is +// "running". +type graphTxn struct { + // Lock is a handle to the lock function to call before the operation. + Lock func() + + // Unlock is a handle to the unlock function to call before the + // operation. + Unlock func() + + // GraphAPI is a handle pointing to the graph API implementation we're + // using for any txn operations. + GraphAPI interfaces.GraphAPI + + // RefCount keeps track of vertex and edge references across the entire + // graph. + RefCount *RefCount + + // FreeFunc is a function that will get called by a well-behaved user + // when we're done with this Txn. + FreeFunc func() + + // ops is a list of operations to run on a graph + ops []opfn + + // rev is a list of reverse operations to run on a graph + rev []opfn + + // mutex guards changes to the ops list + mutex *sync.Mutex +} + +// init must be called to initialized the struct before first use. This is +// private because the creator, not the user should run it. +func (obj *graphTxn) init() interfaces.Txn { + obj.ops = []opfn{} + obj.rev = []opfn{} + obj.mutex = &sync.Mutex{} + + return obj // return self so it can be called in a chain +} + +// Copy returns a new child Txn that has the same handles, but a separate state. +// This allows you to do an Add*/Commit/Reverse that isn't affected by a +// different user of this transaction. +// TODO: FreeFunc isn't well supported here. Replace or remove this entirely? +func (obj *graphTxn) Copy() interfaces.Txn { + txn := &graphTxn{ + Lock: obj.Lock, + Unlock: obj.Unlock, + GraphAPI: obj.GraphAPI, + RefCount: obj.RefCount, // this is shared across all txn's + // FreeFunc is shared with the parent. + } + return txn.init() +} + +// AddVertex adds a vertex to the running graph. The operation will get +// completed when Commit is run. +// XXX: should this be pgraph.Vertex instead of interfaces.Func ? +func (obj *graphTxn) AddVertex(f interfaces.Func) interfaces.Txn { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + opfn := &opAddVertex{ + F: f, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + + return obj // return self so it can be called in a chain +} + +// AddEdge adds an edge to the running graph. The operation will get completed +// when Commit is run. +// XXX: should this be pgraph.Vertex instead of interfaces.Func ? +// XXX: should this be pgraph.Edge instead of *interfaces.FuncEdge ? +func (obj *graphTxn) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) interfaces.Txn { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + opfn := &opAddEdge{ + F1: f1, + F2: f2, + FE: fe, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + + // NOTE: we can't build obj.rev yet because in this case, we'd need to + // know if the runtime graph contained one of the two pre-existing + // vertices or not, or if it would get added implicitly by this op! + + return obj // return self so it can be called in a chain +} + +// DeleteVertex adds a vertex to the running graph. The operation will get +// completed when Commit is run. +// XXX: should this be pgraph.Vertex instead of interfaces.Func ? +func (obj *graphTxn) DeleteVertex(f interfaces.Func) interfaces.Txn { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + opfn := &opDeleteVertex{ + F: f, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + + return obj // return self so it can be called in a chain +} + +// AddGraph adds a graph to the running graph. The operation will get completed +// when Commit is run. This function panics if your graph contains vertices that +// are not of type interfaces.Func or if your edges are not of type +// *interfaces.FuncEdge. +func (obj *graphTxn) AddGraph(g *pgraph.Graph) interfaces.Txn { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + for _, v := range g.Vertices() { + f, ok := v.(interfaces.Func) + if !ok { + panic("not a Func") + } + //obj.AddVertex(f) // easy + opfn := &opAddVertex{ // replicate AddVertex + F: f, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + } + + for v1, m := range g.Adjacency() { + f1, ok := v1.(interfaces.Func) + if !ok { + panic("not a Func") + } + for v2, e := range m { + f2, ok := v2.(interfaces.Func) + if !ok { + panic("not a Func") + } + fe, ok := e.(*interfaces.FuncEdge) + if !ok { + panic("not a *FuncEdge") + } + + //obj.AddEdge(f1, f2, fe) // easy + opfn := &opAddEdge{ // replicate AddEdge + F1: f1, + F2: f2, + FE: fe, + + opSkip: &opSkip{}, + opFlag: &opFlag{}, + } + obj.ops = append(obj.ops, opfn) + } + } + + return obj // return self so it can be called in a chain +} + +// commit runs the pending transaction. This is the lockless version that is +// only used internally. +func (obj *graphTxn) commit() error { + if len(obj.ops) == 0 { // nothing to do + return nil + } + + // TODO: Instead of requesting the below locks, it's conceivable that we + // could either write an engine that doesn't require pausing the graph + // with a lock, or one that doesn't in the specific case being changed + // here need locks. And then in theory we'd have improved performance + // from the function engine. For our function consumers, the Txn API + // would never need to change, so we don't break API! A simple example + // is the len(ops) == 0 one right above. A simplification, but shows we + // aren't forced to call the locks even when we get Commit called here. + + // Now request the lock from the actual graph engine. + obj.Lock() + defer obj.Unlock() + + // Now request the ref count mutex. This may seem redundant, but it's + // not. The above graph engine Lock might allow more than one commit + // through simultaneously depending on implementation. The actual count + // mathematics must not, and so it has a separate lock. We could lock it + // per-operation, but that would probably be a lot slower. + obj.RefCount.Lock() + defer obj.RefCount.Unlock() + + // TODO: we don't need to do this anymore, because the engine does it! + // Copy the graph structure, perform the ops, check we didn't add a + // cycle, and if it's safe, do the real thing. Otherwise error here. + //g := obj.Graph.Copy() // copy the graph structure + //for _, x := range obj.ops { + // x(g) // call it + //} + //if _, err := g.TopologicalSort(); err != nil { + // return errwrap.Wrapf(err, "topo sort failed in txn commit") + //} + // FIXME: is there anything else we should check? Should we type-check? + + // Now do it for real... + obj.rev = []opfn{} // clear it for safety + opapi := &opapi{ + GraphAPI: obj.GraphAPI, + RefCount: obj.RefCount, + } + for _, op := range obj.ops { + if err := op.Fn(opapi); err != nil { // call it + // something went wrong (we made a cycle?) + obj.rev = []opfn{} // clear it, we didn't succeed + return err + } + + op = revOp(op) // reverse the op! + if op != nil { + obj.rev = append(obj.rev, op) // add the reverse op + //obj.rev = append([]opfn{op}, obj.rev...) // add to front + } + } + obj.ops = []opfn{} // clear it + + // garbage collect anything that hit zero! + // XXX: add gc function to this struct and pass in opapi instead? + if err := obj.RefCount.GC(obj.GraphAPI); err != nil { + // programming error or ghosts + return err + } + + // XXX: running this on each commit has a huge performance hit. + // XXX: we could write out the .dot files and run graphviz afterwards + if engine, ok := obj.GraphAPI.(*Engine); ok && GraphvizDebug { + //d := time.Now().Unix() + //if err := engine.graph.ExecGraphviz(fmt.Sprintf("/tmp/txn-graphviz-%d.dot", d)); err != nil { + // panic("no graphviz") + //} + if err := engine.Graphviz(""); err != nil { + panic(err) // XXX: improve me + } + + //gv := &pgraph.Graphviz{ + // Filename: fmt.Sprintf("/tmp/txn-graphviz-%d.dot", d), + // Graphs: map[*pgraph.Graph]*pgraph.GraphvizOpts{ + // engine.graph: nil, + // }, + //} + //if err := gv.Exec(); err != nil { + // panic("no graphviz") + //} + } + return nil +} + +// Commit runs the pending transaction. If there was a pending reverse +// transaction that could have run (it would have been available following a +// Commit success) then this will erase that transaction. Usually you run cycles +// of Commit, followed by Reverse, or only Commit. (You obviously have to +// populate operations before the Commit is run.) +func (obj *graphTxn) Commit() error { + // Lock our internal state mutex first... this prevents other AddVertex + // or similar calls from interferring with our work here. + obj.mutex.Lock() + defer obj.mutex.Unlock() + + return obj.commit() +} + +// Clear erases any pending transactions that weren't committed yet. +func (obj *graphTxn) Clear() { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + obj.ops = []opfn{} // clear it +} + +// Reverse is like Commit, but it commits the reverse transaction to the one +// that previously ran with Commit. If the PostReverseCommit global has been set +// then if there were pending commit operations when this was run, then they are +// run at the end of a successful Reverse. It is generally recommended to not +// queue any operations for Commit if you plan on doing a Reverse, or to run a +// Clear before running Reverse if you want to discard the pending commits. +func (obj *graphTxn) Reverse() error { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + // first commit all the rev stuff... and then run the pending ops... + + ops := []opfn{} // save a copy + for _, op := range obj.ops { // copy + ops = append(ops, op) + } + obj.ops = []opfn{} // clear + + //for _, op := range obj.rev + for i := len(obj.rev) - 1; i >= 0; i-- { // copy in the rev stuff to commit! + op := obj.rev[i] + // mark these as being not reversible (so skip them on reverse!) + if skipOp, ok := op.(opfnSkipRev); ok { + skipOp.SetSkip(true) + } + obj.ops = append(obj.ops, op) + } + + //rev := []func(interfaces.GraphAPI){} // for the copy + //for _, op := range obj.rev { // copy + // rev = append(rev, op) + //} + obj.rev = []opfn{} // clear + + //rollback := func() { + // //for _, op := range rev { // from our safer copy + // //for _, op := range obj.ops { // copy back out the rev stuff + // for i := len(obj.ops) - 1; i >= 0; i-- { // copy in the rev stuff to commit! + // op := obj.rev[i] + // obj.rev = append(obj.rev, op) + // } + // obj.ops = []opfn{} // clear + // for _, op := range ops { // copy the original ops back in + // obj.ops = append(obj.ops, op) + // } + //} + // first commit the reverse stuff + if err := obj.commit(); err != nil { // lockless version + // restore obj.rev and obj.ops + //rollback() // probably not needed + return err + } + + // then if we had normal ops queued up, run those or at least restore... + for _, op := range ops { // copy + obj.ops = append(obj.ops, op) + } + + if PostReverseCommit { + return obj.commit() // lockless version + } + + return nil +} + +// Erase removes the historical information that Reverse would run after Commit. +func (obj *graphTxn) Erase() { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + obj.rev = []opfn{} // clear it +} + +// Free releases the wait group that was used to lock around this Txn if needed. +// It should get called when we're done with any Txn. +// TODO: this is only used for the initial Txn. Consider expanding it's use. We +// might need to allow Clear to call it as part of the clearing. +func (obj *graphTxn) Free() { + if obj.FreeFunc != nil { + obj.FreeFunc() + } +} diff --git a/lang/funcs/dage/txn_test.go b/lang/funcs/dage/txn_test.go new file mode 100644 index 00000000..d615eead --- /dev/null +++ b/lang/funcs/dage/txn_test.go @@ -0,0 +1,503 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//go:build !root + +package dage + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/purpleidea/mgmt/lang/interfaces" + "github.com/purpleidea/mgmt/pgraph" + "github.com/purpleidea/mgmt/util" +) + +type testGraphAPI struct { + graph *pgraph.Graph +} + +func (obj *testGraphAPI) AddVertex(f interfaces.Func) error { + v, ok := f.(pgraph.Vertex) + if !ok { + return fmt.Errorf("can't use func as vertex") + } + obj.graph.AddVertex(v) + return nil +} +func (obj *testGraphAPI) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) error { + v1, ok := f1.(pgraph.Vertex) + if !ok { + return fmt.Errorf("can't use func as vertex") + } + v2, ok := f2.(pgraph.Vertex) + if !ok { + return fmt.Errorf("can't use func as vertex") + } + obj.graph.AddEdge(v1, v2, fe) + return nil +} + +func (obj *testGraphAPI) DeleteVertex(f interfaces.Func) error { + v, ok := f.(pgraph.Vertex) + if !ok { + return fmt.Errorf("can't use func as vertex") + } + obj.graph.DeleteVertex(v) + return nil +} + +func (obj *testGraphAPI) DeleteEdge(fe *interfaces.FuncEdge) error { + obj.graph.DeleteEdge(fe) + return nil +} + +//func (obj *testGraphAPI) AddGraph(*pgraph.Graph) error { +// return fmt.Errorf("not implemented") +//} + +//func (obj *testGraphAPI) Adjacency() map[interfaces.Func]map[interfaces.Func]*interfaces.FuncEdge { +// panic("not implemented") +//} + +func (obj *testGraphAPI) HasVertex(f interfaces.Func) bool { + v, ok := f.(pgraph.Vertex) + if !ok { + panic("can't use func as vertex") + } + return obj.graph.HasVertex(v) +} + +func (obj *testGraphAPI) LookupEdge(fe *interfaces.FuncEdge) (interfaces.Func, interfaces.Func, bool) { + v1, v2, b := obj.graph.LookupEdge(fe) + if !b { + return nil, nil, b + } + + f1, ok := v1.(interfaces.Func) + if !ok { + panic("can't use vertex as func") + } + f2, ok := v2.(interfaces.Func) + if !ok { + panic("can't use vertex as func") + } + return f1, f2, true +} + +func (obj *testGraphAPI) FindEdge(f1, f2 interfaces.Func) *interfaces.FuncEdge { + edge := obj.graph.FindEdge(f1, f2) + if edge == nil { + return nil + } + fe, ok := edge.(*interfaces.FuncEdge) + if !ok { + panic("edge is not a FuncEdge") + } + + return fe +} + +type testNullFunc struct { + name string +} + +func (obj *testNullFunc) String() string { return obj.name } +func (obj *testNullFunc) Info() *interfaces.Info { return nil } +func (obj *testNullFunc) Validate() error { return nil } +func (obj *testNullFunc) Init(*interfaces.Init) error { return nil } +func (obj *testNullFunc) Stream(context.Context) error { return nil } + +func TestTxn1(t *testing.T) { + graph, err := pgraph.NewGraph("test") + if err != nil { + t.Errorf("err: %+v", err) + return + } + testGraphAPI := &testGraphAPI{graph: graph} + mutex := &sync.Mutex{} + + graphTxn := &graphTxn{ + GraphAPI: testGraphAPI, + Lock: mutex.Lock, + Unlock: mutex.Unlock, + RefCount: (&RefCount{}).Init(), + } + txn := graphTxn.init() + + f1 := &testNullFunc{"f1"} + + if err := txn.AddVertex(f1).Commit(); err != nil { + t.Errorf("commit err: %+v", err) + return + } + + if l, i := len(graph.Adjacency()), 1; l != i { + t.Errorf("got len of: %d", l) + t.Errorf("exp len of: %d", i) + return + } + + if err := txn.Reverse(); err != nil { + t.Errorf("reverse err: %+v", err) + return + } + + if l, i := len(graph.Adjacency()), 0; l != i { + t.Errorf("got len of: %d", l) + t.Errorf("exp len of: %d", i) + return + } +} + +type txnTestOp func(*pgraph.Graph, interfaces.Txn) error + +func TestTxnTable(t *testing.T) { + + type test struct { // an individual test + name string + actions []txnTestOp + } + testCases := []test{} + { + f1 := &testNullFunc{"f1"} + + testCases = append(testCases, test{ + name: "simple add vertex", + actions: []txnTestOp{ + //func(g *pgraph.Graph, txn interfaces.Txn) error { + // txn.AddVertex(f1) + // return nil + //}, + //func(g *pgraph.Graph, txn interfaces.Txn) error { + // return txn.Commit() + //}, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddVertex(f1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + { + f1 := &testNullFunc{"f1"} + f2 := &testNullFunc{"f2"} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple add edge", + actions: []txnTestOp{ + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddEdge(f1, f2, e1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + { + f1 := &testNullFunc{"f1"} + f2 := &testNullFunc{"f2"} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple add edge two-step", + actions: []txnTestOp{ + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddVertex(f1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddEdge(f1, f2, e1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + // Reverse only undoes what happened since the + // previous commit, so only one of the nodes is + // left at the end. + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + { + f1 := &testNullFunc{"f1"} + f2 := &testNullFunc{"f2"} + e1 := testEdge("e1") + + testCases = append(testCases, test{ + name: "simple two add edge, reverse", + actions: []txnTestOp{ + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddVertex(f1).AddVertex(f2).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.AddEdge(f1, f2, e1).Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 1; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + // Reverse only undoes what happened since the + // previous commit, so only one of the nodes is + // left at the end. + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 0; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + { + f1 := &testNullFunc{"f1"} + f2 := &testNullFunc{"f2"} + f3 := &testNullFunc{"f3"} + f4 := &testNullFunc{"f4"} + e1 := testEdge("e1") + e2 := testEdge("e2") + e3 := testEdge("e3") + e4 := testEdge("e4") + + testCases = append(testCases, test{ + name: "simple add/delete", + actions: []txnTestOp{ + func(g *pgraph.Graph, txn interfaces.Txn) error { + txn.AddVertex(f1).AddEdge(f1, f2, e1) + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + txn.AddVertex(f1).AddEdge(f1, f3, e2) + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 3; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + txn.AddEdge(f2, f4, e3) + txn.AddEdge(f3, f4, e4) + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Commit() + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 4; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 4; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + // debug + //func(g *pgraph.Graph, txn interfaces.Txn) error { + // fileName := "/tmp/graphviz-txn1.dot" + // if err := g.ExecGraphviz(fileName); err != nil { + // return fmt.Errorf("writing graph failed at: %s, err: %+v", fileName, err) + // } + // return nil + //}, + func(g *pgraph.Graph, txn interfaces.Txn) error { + return txn.Reverse() + }, + // debug + //func(g *pgraph.Graph, txn interfaces.Txn) error { + // fileName := "/tmp/graphviz-txn2.dot" + // if err := g.ExecGraphviz(fileName); err != nil { + // return fmt.Errorf("writing graph failed at: %s, err: %+v", fileName, err) + // } + // return nil + //}, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Adjacency()), 3; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + func(g *pgraph.Graph, txn interfaces.Txn) error { + if l, i := len(g.Edges()), 2; l != i { + return fmt.Errorf("got len of: %d, exp len of: %d", l, i) + } + return nil + }, + }, + }) + } + if testing.Short() { + t.Logf("available tests:") + } + names := []string{} + for index, tc := range testCases { // run all the tests + if tc.name == "" { + t.Errorf("test #%d: not named", index) + continue + } + if util.StrInList(tc.name, names) { + t.Errorf("test #%d: duplicate sub test name of: %s", index, tc.name) + continue + } + names = append(names, tc.name) + + //if index != 3 { // hack to run a subset (useful for debugging) + //if tc.name != "simple txn" { + // continue + //} + + testName := fmt.Sprintf("test #%d (%s)", index, tc.name) + if testing.Short() { // make listing tests easier + t.Logf("%s", testName) + continue + } + t.Run(testName, func(t *testing.T) { + name, actions := tc.name, tc.actions + + t.Logf("\n\ntest #%d (%s) ----------------\n\n", index, name) + + //logf := func(format string, v ...interface{}) { + // t.Logf(fmt.Sprintf("test #%d", index)+": "+format, v...) + //} + + graph, err := pgraph.NewGraph("test") + if err != nil { + t.Errorf("err: %+v", err) + return + } + testGraphAPI := &testGraphAPI{graph: graph} + mutex := &sync.Mutex{} + + graphTxn := &graphTxn{ + GraphAPI: testGraphAPI, + Lock: mutex.Lock, + Unlock: mutex.Unlock, + RefCount: (&RefCount{}).Init(), + } + txn := graphTxn.init() + + // Run a list of actions, passing the returned txn (if + // any) to the next action. Any error kills it all. + for i, action := range actions { + if err := action(graph, txn); err != nil { + t.Errorf("test #%d: FAIL", index) + t.Errorf("test #%d: action #%d failed with: %+v", index, i, err) + return + } + } + }) + } + + if testing.Short() { + t.Skip("skipping all tests...") + } +} diff --git a/lang/funcs/dage/util_test.go b/lang/funcs/dage/util_test.go new file mode 100644 index 00000000..b241f9b8 --- /dev/null +++ b/lang/funcs/dage/util_test.go @@ -0,0 +1,30 @@ +// Mgmt +// Copyright (C) 2013-2023+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//go:build !root + +package dage + +import ( + "github.com/purpleidea/mgmt/lang/interfaces" +) + +func testEdge(name string) *interfaces.FuncEdge { + return &interfaces.FuncEdge{ + Args: []string{name}, + } +} diff --git a/lang/interfaces/func.go b/lang/interfaces/func.go index 80a2d4b9..62acda0a 100644 --- a/lang/interfaces/func.go +++ b/lang/interfaces/func.go @@ -24,6 +24,7 @@ import ( "github.com/purpleidea/mgmt/engine" "github.com/purpleidea/mgmt/lang/types" + "github.com/purpleidea/mgmt/pgraph" ) // FuncSig is the simple signature that is used throughout our implementations. @@ -45,8 +46,20 @@ type Info struct { type Init struct { Hostname string // uuid for the host //Noop bool - Input chan types.Value // Engine will close `input` chan - Output chan types.Value // Stream must close `output` chan + + // Input is where a chan (stream) of values will get sent to this node. + // The engine will close this `input` chan. + Input chan types.Value + + // Output is the chan (stream) of values to get sent out from this node. + // The Stream function must close this `output` chan. + Output chan types.Value + + // Txn provides a transaction API that can be used to modify the + // function graph while it is "running". This should not be used by most + // nodes, and when it is used, it should be used carefully. + Txn Txn + // TODO: should we pass in a *Scope here for functions like template() ? World engine.World Debug bool @@ -230,3 +243,93 @@ type FuncEdge struct { func (obj *FuncEdge) String() string { return strings.Join(obj.Args, ", ") } + +// GraphAPI is a subset of the available graph operations that are possible on a +// pgraph that is used for storing functions. The minimum subset are those which +// are needed for implementing the Txn interface. +type GraphAPI interface { + AddVertex(Func) error + AddEdge(Func, Func, *FuncEdge) error + DeleteVertex(Func) error + DeleteEdge(*FuncEdge) error + //AddGraph(*pgraph.Graph) error + + //Adjacency() map[Func]map[Func]*FuncEdge + HasVertex(Func) bool + FindEdge(Func, Func) *FuncEdge + LookupEdge(*FuncEdge) (Func, Func, bool) +} + +// Txn is the interface that the engine graph API makes available so that +// functions can modify the function graph dynamically while it is "running". +// This could be implemented in one of two methods. +// +// Method 1: Have a pair of graph Lock and Unlock methods. Queue up the work to +// do and when we "commit" the transaction, we're just queuing up the work to do +// and then we run it all surrounded by the lock. +// +// Method 2: It's possible that we might eventually be able to actually modify +// the running graph without even causing it to pause at all. In this scenario, +// the "commit" would just directly perform those operations without even using +// the Lock and Unlock mutex operations. This is why we don't expose those in +// the API. It's also safer because someone can't forget to run Unlock which +// would block the whole code base. +type Txn interface { + // AddVertex adds a vertex to the running graph. The operation will get + // completed when Commit is run. + AddVertex(Func) Txn + + // AddEdge adds an edge to the running graph. The operation will get + // completed when Commit is run. + AddEdge(Func, Func, *FuncEdge) Txn + + // DeleteVertex removes a vertex from the running graph. The operation + // will get completed when Commit is run. + DeleteVertex(Func) Txn + + // DeleteEdge removes an edge from the running graph. It removes the + // edge that is found between the two input vertices. The operation will + // get completed when Commit is run. The edge is part of the signature + // so that it is both symmetrical with AddEdge, and also easier to + // reverse in theory. + // NOTE: This is not supported since there's no sane Reverse with GC. + // XXX: Add this in but just don't let it be reversible? + //DeleteEdge(Func, Func, *FuncEdge) Txn + + // AddGraph adds a graph to the running graph. The operation will get + // completed when Commit is run. This function panics if your graph + // contains vertices that are not of type interfaces.Func or if your + // edges are not of type *interfaces.FuncEdge. + AddGraph(*pgraph.Graph) Txn + + // Commit runs the pending transaction. + Commit() error + + // Clear erases any pending transactions that weren't committed yet. + Clear() + + // Reverse runs the reverse commit of the last successful operation to + // Commit. AddVertex is reversed by DeleteVertex, and vice-versa, and + // the same for AddEdge and DeleteEdge. Keep in mind that if AddEdge is + // called with either vertex not already part of the graph, it will + // implicitly add them, but the Reverse operation will not necessarily + // know that. As a result, it's recommended to not perform operations + // that have implicit Adds or Deletes. Notwithstanding the above, the + // initial Txn implementation can and does try to track these changes + // so that it can correctly reverse them, but this is not guaranteed by + // API, and it could contain bugs. + Reverse() error + + // Erase removes the historical information that Reverse would run after + // Commit. + Erase() + + // Free releases the wait group that was used to lock around this Txn if + // needed. It should get called when we're done with any Txn. + Free() + + // Copy returns a new child Txn that has the same handles, but a + // separate state. This allows you to do an Add*/Commit/Reverse that + // isn't affected by a different user of this transaction. + Copy() Txn +}