// Mgmt
// Copyright (C) 2013-2023+ James Shubin and the project contributors
// Written by James Shubin and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

// Package dage implements a DAG function engine.
// TODO: can we rename this to something more interesting?
package dage

import (
	"context"
	"fmt"
	"os"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/purpleidea/mgmt/engine"
	"github.com/purpleidea/mgmt/engine/local"
	"github.com/purpleidea/mgmt/lang/funcs/structs"
	"github.com/purpleidea/mgmt/lang/interfaces"
	"github.com/purpleidea/mgmt/lang/types"
	"github.com/purpleidea/mgmt/pgraph"
	"github.com/purpleidea/mgmt/util/errwrap"
)

// Engine implements a DAG engine which lets us "run" a DAG of functions, but
// also allows us to modify it while we are running.
type Engine struct {
	// Name is the name used for the instance of the engine and in the
	// graph that is held within it.
	Name string

	Hostname string
	Local    *local.API
	World    engine.World

	Debug bool
	Logf  func(format string, v ...interface{})

	// Callback can be specified as an alternative to using the Stream
	// method to get events. If the context on it is cancelled, then it
	// must shut down quickly, because this means we are closing and want
	// to disconnect. Whether you want to respect that is up to you, but
	// the engine will not be able to close until you do. If specified,
	// and an error has occurred, it will be passed in as the error
	// argument.
	Callback func(context.Context, error)

	graph *pgraph.Graph                   // guarded by graphMutex
	table map[interfaces.Func]types.Value // guarded by tableMutex
	state map[interfaces.Func]*state

	// graphMutex wraps access to the graph.
	graphMutex *sync.Mutex // TODO: &sync.RWMutex{} ?

	// tableMutex wraps access to the table map.
	tableMutex *sync.RWMutex

	// refCount keeps track of vertex and edge references across the
	// entire graph.
	refCount *RefCount

	// wgTxn blocks shutdown until the initial Txn has Reversed.
	wgTxn *sync.WaitGroup

	// firstTxn checks to make sure wgTxn is only used for the first Txn.
	firstTxn bool

	wg *sync.WaitGroup

	// pause/resume state machine signals
	pauseChan   chan struct{}
	pausedChan  chan struct{}
	resumeChan  chan struct{}
	resumedChan chan struct{}

	// resend tracks which new nodes might need a new notification
	resend map[interfaces.Func]struct{}

	// nodeWaitFns is a list of cleanup functions to run after we've begun
	// resume, but before we've resumed completely. These are actions that
	// we would like to do when paused from a deleteVertex operation, but
	// that would deadlock if we did.
	nodeWaitFns []func()

	// nodeWaitMutex wraps access to the nodeWaitFns list.
	nodeWaitMutex *sync.Mutex

	// streamChan is used to send notifications to the outside world.
	streamChan chan error

	loaded      bool          // are all of the funcs loaded?
	loadedChan  chan struct{} // funcs loaded signal
	startedChan chan struct{} // closes when Run() starts

	// wakeChan contains a message when someone has asked for us to wake
	// up.
	wakeChan chan struct{}

	// ag is the aggregation channel which cues up outgoing events.
	ag chan error

	// leafSend specifies if we should do an ag send because we have
	// activity at a leaf.
	leafSend bool

	// isClosed tracks nodes that have closed. This list is purged as they
	// are removed from the graph.
	isClosed map[*state]struct{}

	// activity tracks nodes that are ready to send to ag. The main
	// process loop decides if we have the correct set to do so. Closed
	// nodes are tracked separately in the isClosed map.
	activity map[*state]struct{}

	// stateMutex wraps access to the isClosed and activity maps.
	stateMutex *sync.Mutex

	// stats holds some statistics and other debugging information.
	stats *stats // guarded by statsMutex

	// statsMutex wraps access to the stats data.
	statsMutex *sync.RWMutex

	// graphvizMutex wraps access to the Graphviz method.
	graphvizMutex *sync.Mutex

	// graphvizCount keeps a running tally of how many graphs we've
	// generated. This is useful for displaying a sequence (timeline) of
	// graphs in a linear order.
	graphvizCount int64

	// graphvizDirectory stores the generated path for outputting graphviz
	// files if one is not specified at runtime.
	graphvizDirectory string
}

// Setup sets up the internal data structures needed for this engine.
func (obj *Engine) Setup() error {
	var err error
	obj.graph, err = pgraph.NewGraph(obj.Name)
	if err != nil {
		return err
	}
	obj.table = make(map[interfaces.Func]types.Value)
	obj.state = make(map[interfaces.Func]*state)
	obj.graphMutex = &sync.Mutex{} // TODO: &sync.RWMutex{} ?
	obj.tableMutex = &sync.RWMutex{}

	obj.refCount = (&RefCount{}).Init()

	obj.wgTxn = &sync.WaitGroup{}

	obj.wg = &sync.WaitGroup{}

	obj.pauseChan = make(chan struct{})
	obj.pausedChan = make(chan struct{})
	obj.resumeChan = make(chan struct{})
	obj.resumedChan = make(chan struct{})

	obj.resend = make(map[interfaces.Func]struct{})

	obj.nodeWaitFns = []func(){}
	obj.nodeWaitMutex = &sync.Mutex{}

	obj.streamChan = make(chan error)
	obj.loadedChan = make(chan struct{})
	obj.startedChan = make(chan struct{})

	obj.wakeChan = make(chan struct{}, 1) // hold up to one message

	obj.ag = make(chan error)

	obj.isClosed = make(map[*state]struct{})

	obj.activity = make(map[*state]struct{})
	obj.stateMutex = &sync.Mutex{}

	obj.stats = &stats{
		runningList: make(map[*state]struct{}),
		loadedList:  make(map[*state]bool),
		inputList:   make(map[*state]int64),
	}
	obj.statsMutex = &sync.RWMutex{}

	obj.graphvizMutex = &sync.Mutex{}
	return nil
}

// Cleanup cleans up and frees memory and resources after everything is done.
func (obj *Engine) Cleanup() error {
	obj.wg.Wait()        // don't clean these up before Run() has finished
	close(obj.pauseChan) // free
	close(obj.pausedChan)
	close(obj.resumeChan)
	close(obj.resumedChan)
	return nil
}

// Txn returns a transaction that is suitable for adding to and removing from
// the graph. You must run Setup before this method is called.
func (obj *Engine) Txn() interfaces.Txn {
	if obj.refCount == nil {
		panic("you must run setup before first use")
	}
	// The very first initial Txn must have a wait group to make sure if we
	// shutdown (in error) that we can Reverse things before the
	// Lock/Unlock loop shuts down.
	var free func()
	if !obj.firstTxn {
		obj.firstTxn = true
		obj.wgTxn.Add(1)
		free = func() {
			obj.wgTxn.Done()
		}
	}
	return (&graphTxn{
		Lock:     obj.Lock,
		Unlock:   obj.Unlock,
		GraphAPI: obj,
		RefCount: obj.refCount, // reference counting
		FreeFunc: free,
	}).init()
}

// addVertex is the lockless version of the AddVertex function.
This is needed // so that AddEdge can add two vertices within the same lock. func (obj *Engine) addVertex(f interfaces.Func) error { if _, exists := obj.state[f]; exists { // don't err dupes, because it makes using the AddEdge API yucky return nil } // add some extra checks for easier debugging if f == nil { return fmt.Errorf("missing func") } if f.Info() == nil { return fmt.Errorf("missing func info") } sig := f.Info().Sig if sig == nil { return fmt.Errorf("missing func sig") } if sig.Kind != types.KindFunc { return fmt.Errorf("must be kind func") } if err := f.Validate(); err != nil { return errwrap.Wrapf(err, "node did not Validate") } input := make(chan types.Value) output := make(chan types.Value) txn := obj.Txn() // This is the one of two places where we modify this map. To avoid // concurrent writes, we only do this when we're locked! Anywhere that // can read where we are locked must have a mutex around it or do the // lookup when we're in an unlocked state. node := &state{ Func: f, name: f.String(), // cache a name to avoid locks input: input, output: output, txn: txn, running: false, wg: &sync.WaitGroup{}, rwmutex: &sync.RWMutex{}, } init := &interfaces.Init{ Hostname: obj.Hostname, Input: node.input, Output: node.output, Txn: node.txn, Local: obj.Local, World: obj.World, Debug: obj.Debug, Logf: func(format string, v ...interface{}) { // safe Logf in case f.String contains %? chars... s := f.String() + ": " + fmt.Sprintf(format, v...) obj.Logf("%s", s) }, } if err := f.Init(init); err != nil { return err } // only now, do we modify the graph obj.state[f] = node obj.graph.AddVertex(f) return nil } // AddVertex is the thread-safe way to add a vertex. You will need to call the // engine Lock method before using this and the Unlock method afterwards. func (obj *Engine) AddVertex(f interfaces.Func) error { obj.graphMutex.Lock() defer obj.graphMutex.Unlock() if obj.Debug { obj.Logf("Engine:AddVertex: %p %s", f, f) } return obj.addVertex(f) // lockless version } // AddEdge is the thread-safe way to add an edge. You will need to call the // engine Lock method before using this and the Unlock method afterwards. This // will automatically run AddVertex on both input vertices if they are not // already part of the graph. You should only create DAG's as this function // engine cannot handle cycles and this method will error if you cause a cycle. func (obj *Engine) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) error { obj.graphMutex.Lock() defer obj.graphMutex.Unlock() if obj.Debug { obj.Logf("Engine:AddEdge %p %s: %p %s -> %p %s", fe, fe, f1, f1, f2, f2) } // safety check to avoid cycles g := obj.graph.Copy() //g.AddVertex(f1) //g.AddVertex(f2) g.AddEdge(f1, f2, fe) if _, err := g.TopologicalSort(); err != nil { return err // not a dag } // if we didn't cycle, we can modify the real graph safely... // Does the graph already have these nodes in it? hasf1 := obj.graph.HasVertex(f1) //hasf2 := obj.graph.HasVertex(f2) if err := obj.addVertex(f1); err != nil { // lockless version return err } if err := obj.addVertex(f2); err != nil { // rollback f1 on error of f2 obj.deleteVertex(f1) // ignore any error return err } // If f1 doesn't exist, let f1 (or it's incoming nodes) get the notify. // If f2 is new, then it should get a new notification unless f1 is new. // But there's no guarantee we didn't AddVertex(f2); AddEdge(f1, f2, e), // so resend if f1 already exists. Otherwise it's not a new notification. 
// previously: `if hasf1 && !hasf2` if hasf1 { obj.resend[f2] = struct{}{} // resend notification to me } obj.graph.AddEdge(f1, f2, fe) // replaces any existing edge here // This shouldn't error, since the test graph didn't find a cycle. if _, err := obj.graph.TopologicalSort(); err != nil { // programming error panic(err) // not a dag } return nil } // deleteVertex is the lockless version of the DeleteVertex function. This is // needed so that AddEdge can add two vertices within the same lock. It needs // deleteVertex so it can rollback the first one if the second addVertex fails. func (obj *Engine) deleteVertex(f interfaces.Func) error { node, exists := obj.state[f] if !exists { return fmt.Errorf("vertex %p %s doesn't exist", f, f) } if node.running { // cancel the running vertex node.cancel() // cancel inner ctx // We store this work to be performed later on in the main loop // because this Wait() might be blocked by a defer Commit, which // is itself blocked because this deleteVertex operation is part // of a Commit. obj.nodeWaitMutex.Lock() obj.nodeWaitFns = append(obj.nodeWaitFns, func() { node.wg.Wait() // While waiting, the Stream might cause a new Reverse Commit node.txn.Free() // Clean up when done! obj.stateMutex.Lock() delete(obj.isClosed, node) // avoid memory leak obj.stateMutex.Unlock() }) obj.nodeWaitMutex.Unlock() } // This is the one of two places where we modify this map. To avoid // concurrent writes, we only do this when we're locked! Anywhere that // can read where we are locked must have a mutex around it or do the // lookup when we're in an unlocked state. delete(obj.state, f) obj.graph.DeleteVertex(f) return nil } // DeleteVertex is the thread-safe way to delete a vertex. You will need to call // the engine Lock method before using this and the Unlock method afterwards. func (obj *Engine) DeleteVertex(f interfaces.Func) error { obj.graphMutex.Lock() defer obj.graphMutex.Unlock() if obj.Debug { obj.Logf("Engine:DeleteVertex: %p %s", f, f) } return obj.deleteVertex(f) // lockless version } // DeleteEdge is the thread-safe way to delete an edge. You will need to call // the engine Lock method before using this and the Unlock method afterwards. func (obj *Engine) DeleteEdge(fe *interfaces.FuncEdge) error { obj.graphMutex.Lock() defer obj.graphMutex.Unlock() if obj.Debug { f1, f2, found := obj.graph.LookupEdge(fe) if found { obj.Logf("Engine:DeleteEdge: %p %s -> %p %s", f1, f1, f2, f2) } else { obj.Logf("Engine:DeleteEdge: not found %p %s", fe, fe) } } // Don't bother checking if edge exists first and don't error if it // doesn't because it might have gotten deleted when a vertex did, and // so there's no need to complain for nothing. obj.graph.DeleteEdge(fe) return nil } // HasVertex is the thread-safe way to check if a vertex exists in the graph. // You will need to call the engine Lock method before using this and the Unlock // method afterwards. func (obj *Engine) HasVertex(f interfaces.Func) bool { obj.graphMutex.Lock() // XXX: should this be a RLock? defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? return obj.graph.HasVertex(f) } // LookupEdge is the thread-safe way to check which vertices (if any) exist // between an edge in the graph. You will need to call the engine Lock method // before using this and the Unlock method afterwards. func (obj *Engine) LookupEdge(fe *interfaces.FuncEdge) (interfaces.Func, interfaces.Func, bool) { obj.graphMutex.Lock() // XXX: should this be a RLock? defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? 
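	// Note: every vertex in obj.graph got there via addVertex, which only
	// accepts an interfaces.Func, so the type assertions below are
	// expected to always hold; a failure is a programming error and
	// panics on purpose.
	//
	// Caller-side sketch (illustrative only; e and fe are assumed to be a
	// set up *Engine and a known *interfaces.FuncEdge):
	//
	//	e.Lock()
	//	f1, f2, found := e.LookupEdge(fe)
	//	e.Unlock()
	//	if found {
	//		e.Logf("edge runs from %s to %s", f1, f2)
	//	}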
v1, v2, found := obj.graph.LookupEdge(fe) if !found { return nil, nil, found } f1, ok := v1.(interfaces.Func) if !ok { panic("not a Func") } f2, ok := v2.(interfaces.Func) if !ok { panic("not a Func") } return f1, f2, found } // FindEdge is the thread-safe way to check which edge (if any) exists between // two vertices in the graph. This is an important method in edge removal, // because it's what you really need to know for DeleteEdge to work. Requesting // a specific deletion isn't very sensical in this library when specified as the // edge pointer, since we might replace it with a new edge that has new arg // names. Instead, use this to look up what relationship you want, and then // DeleteEdge to remove it. You will need to call the engine Lock method before // using this and the Unlock method afterwards. func (obj *Engine) FindEdge(f1, f2 interfaces.Func) *interfaces.FuncEdge { obj.graphMutex.Lock() // XXX: should this be a RLock? defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? edge := obj.graph.FindEdge(f1, f2) if edge == nil { return nil } fe, ok := edge.(*interfaces.FuncEdge) if !ok { panic("edge is not a FuncEdge") } return fe } // Lock must be used before modifying the running graph. Make sure to Unlock // when done. // XXX: should Lock take a context if we want to bail mid-way? // TODO: could we replace pauseChan with SubscribedSignal ? func (obj *Engine) Lock() { // pause select { case obj.pauseChan <- struct{}{}: } //obj.rwmutex.Lock() // TODO: or should it go right before pauseChan? // waiting for the pause to move to paused... select { case <-obj.pausedChan: } // this mutex locks at start of Run() and unlocks at finish of Run() obj.graphMutex.Unlock() // safe to make changes now } // Unlock must be used after modifying the running graph. Make sure to Lock // beforehand. // XXX: should Unlock take a context if we want to bail mid-way? func (obj *Engine) Unlock() { // resume // this mutex locks at start of Run() and unlocks at finish of Run() obj.graphMutex.Lock() // no more changes are allowed select { case obj.resumeChan <- struct{}{}: } //obj.rwmutex.Unlock() // TODO: or should it go right after resumedChan? // waiting for the resume to move to resumed... select { case <-obj.resumedChan: } } // wake sends a message to the wake queue to wake up the main process function // which would otherwise spin unnecessarily. This can be called anytime, and // doesn't hurt, it only wastes cpu if there's nothing to do. This does NOT ever // block, and that's important so it can be called from anywhere. func (obj *Engine) wake(name string) { // The mutex guards the len check to avoid this function sending two // messages down the channel, because the second would block if the // consumer isn't fast enough. This mutex makes this method effectively // asynchronous. //obj.wakeMutex.Lock() //defer obj.wakeMutex.Unlock() //if len(obj.wakeChan) > 0 { // collapse duplicate, pending wake signals // return //} select { case obj.wakeChan <- struct{}{}: // send to chan of length 1 if obj.Debug { obj.Logf("wake sent from: %s", name) } default: // this is a cheap alternative to avoid the mutex altogether! if obj.Debug { obj.Logf("wake skip from: %s", name) } // skip sending, we already have a message pending! } } // runNodeWaitFns is a helper to run the cleanup nodeWaitFns list. It clears the // list after it runs. func (obj *Engine) runNodeWaitFns() { // The lock is probably not needed here, but it won't hurt either. 
obj.nodeWaitMutex.Lock() defer obj.nodeWaitMutex.Unlock() for _, fn := range obj.nodeWaitFns { fn() } obj.nodeWaitFns = []func(){} // clear } // process is the inner loop that runs through the entire graph. It can be // called successively safely, as it is roughly idempotent, and is used to push // values through the graph. If it is interrupted, it can pick up where it left // off on the next run. This does however require it to re-check some things, // but that is the price we pay for being always available to unblock. // Importantly, re-running this resumes work in progress even if there was // caching, and that if interrupted, it'll be queued again so as to not drop a // wakeChan notification! We know we've read all the pending incoming values, // because the Stream reader call wake(). func (obj *Engine) process(ctx context.Context) (reterr error) { defer func() { // catch programming errors if r := recover(); r != nil { obj.Logf("Panic in process: %+v", r) reterr = fmt.Errorf("panic in process: %+v", r) } }() // Toposort in dependency order. topoSort, err := obj.graph.TopologicalSort() if err != nil { return err } loaded := true // assume we emitted at least one value for now... outDegree := obj.graph.OutDegree() // map[Vertex]int for _, v := range topoSort { f, ok := v.(interfaces.Func) if !ok { panic("not a Func") } node, exists := obj.state[f] if !exists { panic(fmt.Sprintf("missing node in iterate: %s", f)) } out, exists := outDegree[f] if !exists { panic(fmt.Sprintf("missing out degree in iterate: %s", f)) } //outgoing := obj.graph.OutgoingGraphVertices(f) // []pgraph.Vertex //node.isLeaf = len(outgoing) == 0 node.isLeaf = out == 0 // store // TODO: the obj.loaded stuff isn't really consumed currently node.rwmutex.RLock() if !node.loaded { loaded = false // we were wrong } node.rwmutex.RUnlock() // TODO: memoize since graph shape doesn't change in this loop! incoming := obj.graph.IncomingGraphVertices(f) // []pgraph.Vertex // no incoming edges, so no incoming data if len(incoming) == 0 || node.inputClosed { // we do this below if !node.inputClosed { node.inputClosed = true close(node.input) } continue } // else, process input data below... ready := true // assume all input values are ready for now... inputClosed := true // assume all inputs have closed for now... si := &types.Type{ // input to functions are structs Kind: types.KindStruct, Map: node.Func.Info().Sig.Map, Ord: node.Func.Info().Sig.Ord, } st := types.NewStruct(si) // The above builds a struct with fields // populated for each key (empty values) // so we need to very carefully check if // every field is received before we can // safely send it downstream to an edge. need := make(map[string]struct{}) // keys we need for _, k := range node.Func.Info().Sig.Ord { need[k] = struct{}{} } for _, vv := range incoming { ff, ok := vv.(interfaces.Func) if !ok { panic("not a Func") } obj.tableMutex.RLock() value, exists := obj.table[ff] obj.tableMutex.RUnlock() if !exists { ready = false // nope! inputClosed = false // can't be, it's not even ready yet break } // XXX: do we need a lock around reading obj.state? 
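			// The lookup below fetches the upstream producer's
			// state; its outputClosed flag (checked next) decides
			// whether this node's own input can eventually be
			// closed. Also note that a single upstream value can
			// satisfy several argument names at once, because one
			// edge may carry multiple Args. Illustrative sketch
			// (hypothetical arg names):
			//
			//	// one value fans out into two struct fields
			//	fe := &interfaces.FuncEdge{Args: []string{"a", "b"}}
			//	// the Args loop below runs st.Set("a", value)
			//	// and st.Set("b", value) with the same value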
fromNode, exists := obj.state[ff] if !exists { panic(fmt.Sprintf("missing node in notify: %s", ff)) } if !fromNode.outputClosed { inputClosed = false // if any still open, then we are } // set each arg, since one value // could get used for multiple // function inputs (shared edge) args := obj.graph.Adjacency()[ff][f].(*interfaces.FuncEdge).Args for _, arg := range args { // populate struct if err := st.Set(arg, value); err != nil { //panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, fromNode, err)) keys := []string{} for k := range st.Struct() { keys = append(keys, k) } panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys)) } if _, exists := need[arg]; !exists { keys := []string{} for k := range st.Struct() { keys = append(keys, k) } // could be either a duplicate or an unwanted field (edge name) panic(fmt.Sprintf("unexpected struct key on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys)) } delete(need, arg) } } if !ready || len(need) != 0 { continue // definitely continue, don't break here } // previously it was closed, skip sending if node.inputClosed { continue } // XXX: respect the info.Pure and info.Memo fields somewhere... // XXX: keep track of some state about who i sent to last before // being interrupted so that I can avoid resending to some nodes // if it's not necessary... // It's critical to avoid deadlock with this sending select that // any events that could happen during this send can be // preempted and that future executions of this function can be // resumed. We must return with an error to let folks know that // we were interrupted. if obj.Debug { obj.Logf("send to func `%s`", node) } select { case node.input <- st: // send to function obj.statsMutex.Lock() val, _ := obj.stats.inputList[node] // val is # or zero obj.stats.inputList[node] = val + 1 // increment obj.statsMutex.Unlock() // pass case <-node.ctx.Done(): // node died obj.wake("node.ctx.Done()") // interrupted, so queue again // This scenario *can* happen, although it is rare. It // triggered the old `chFn && errFn == context.Canceled` // case which we've now removed. //return node.ctx.Err() // old behaviour which was wrong continue // probably best to return and come finish later case <-ctx.Done(): obj.wake("node ctx.Done()") // interrupted, so queue again return ctx.Err() } // It's okay if this section gets preempted and we re-run this // function. The worst that happens is we end up sending the // same input data a second time. This means that we could in // theory be causing unnecessary graph changes (and locks which // cause preemption here) if nodes that cause locks aren't // skipping duplicate/identical input values! if inputClosed && !node.inputClosed { node.inputClosed = true close(node.input) } // XXX: Do we need to somehow wait to make sure that node has // the time to send at least one output? // XXX: We could add a counter to each input that gets passed // through the function... Eg: if we pass in 4, we should wait // until a 4 comes out the output side. But we'd need to change // the signature of func for this... } // end topoSort loop // It's okay if this section gets preempted and we re-run this bit here. obj.loaded = loaded // this gets reset when graph adds new nodes if !loaded { return nil } // Check each leaf and make sure they're all ready to send, for us to // send anything to ag channel. In addition, we need at least one send // message from any of the valid isLeaf nodes. 
	// Since this only runs if everyone is loaded, we just need to check
	// for activity on leaf nodes.
	obj.stateMutex.Lock()
	for node := range obj.activity {

		if obj.leafSend {
			break // early
		}

		// down here we need `true` activity!
		if node.isLeaf { // calculated above in the previous loop
			obj.leafSend = true
			break
		}
	}
	obj.activity = make(map[*state]struct{}) // clear
	//clear(obj.activity) // new clear

	// This check happens here after the send loop to make sure one value
	// got in and we didn't close it off too early.
	for node := range obj.isClosed { // these are closed
		node.outputClosed = true
	}
	obj.stateMutex.Unlock()

	if !obj.leafSend {
		return nil
	}

	select {
	case obj.ag <- nil: // send to aggregate channel if we have events
		obj.Logf("aggregated send")
		obj.leafSend = false // reset

	case <-ctx.Done():
		obj.leafSend = true            // since we skipped the ag send!
		obj.wake("process ctx.Done()") // interrupted, so queue again
		return ctx.Err()

		// XXX: should we even allow this default case?
		//default:
		//	// exit if we're not ready to send to ag
		//	obj.leafSend = true // since we skipped the ag send!
		//	obj.wake("process default") // interrupted, so queue again
	}

	return nil
}

// Run kicks off the main engine. This takes a mutex. When we're "paused" the
// mutex is temporarily released until we "resume". Those operations transition
// with the engine Lock and Unlock methods. It is recommended to only add
// vertices to the engine after it's running. If you add them before Run, then
// Run will cause a Lock/Unlock to occur to cycle them in. Lock and Unlock race
// with the cancellation of this Run main loop. Make sure to only call one at a
// time.
func (obj *Engine) Run(ctx context.Context) (reterr error) {
	obj.graphMutex.Lock()
	defer obj.graphMutex.Unlock()
	// XXX: can the above defer get called while we are already unlocked?
	// XXX: is it a possibility if we use <-Started() ?

	wg := &sync.WaitGroup{}
	defer wg.Wait()

	defer func() {
		// catch programming errors
		if r := recover(); r != nil {
			obj.Logf("Panic in Run: %+v", r)
			reterr = fmt.Errorf("panic in Run: %+v", r)
		}
	}()

	ctx, cancel := context.WithCancel(ctx) // wrap parent
	defer cancel()

	// Add a wait before the "started" signal runs so that Cleanup waits.
	obj.wg.Add(1)
	defer obj.wg.Done()

	// Send the start signal.
	close(obj.startedChan)

	if n := obj.graph.NumVertices(); n > 0 { // hack to make the api easier
		obj.Logf("graph contained %d vertices before Run", n)
		wg.Add(1)
		go func() {
			defer wg.Done()
			// kick the engine once to pull in any vertices from
			// before we started running!
			defer obj.Unlock()
			obj.Lock()
		}()
	}

	once := &sync.Once{}
	loadedSignal := func() { close(obj.loadedChan) } // only run once!

	// aggregate events channel
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(obj.streamChan)
		drain := false
		for {
			var err error
			var ok bool
			select {
			case err, ok = <-obj.ag: // aggregated channel
				if !ok {
					return // channel shutdown
				}
			}

			if drain {
				continue // no need to send more errors
			}

			// TODO: check obj.loaded first?
			once.Do(loadedSignal)

			// now send event...
			if obj.Callback != nil {
				// send stream signal (callback variant)
				obj.Callback(ctx, err)
			} else {
				// send stream signal
				select {
				// send events or errors on streamChan
				case obj.streamChan <- err: // send
				case <-ctx.Done(): // when asked to exit
					return
				}
			}
			if err != nil {
				cancel() // cancel the context!
				//return // let the obj.ag channel drain
				drain = true
			}
		}
	}()

	// wgAg is a wait group that waits for all senders to the ag chan.
// Exceptionally, we don't close the ag channel until wgFor has also // closed, because it can send to wg in process(). wgAg := &sync.WaitGroup{} wgFor := &sync.WaitGroup{} // We need to keep the main loop running until everyone else has shut // down. When the top context closes, we wait for everyone to finish, // and then we shut down this main context. //mainCtx, mainCancel := context.WithCancel(ctx) // wrap parent mainCtx, mainCancel := context.WithCancel(context.Background()) // DON'T wrap parent, close on your own terms defer mainCancel() // close the aggregate channel when everyone is done with it... wg.Add(1) go func() { defer wg.Done() select { case <-ctx.Done(): } // don't wait and close ag before we're really done with Run() wgAg.Wait() // wait for last ag user to close obj.wgTxn.Wait() // wait for first txn as well mainCancel() // only cancel after wgAg goroutines are done wgFor.Wait() // wait for process loop to close before closing close(obj.ag) // last one closes the ag channel }() wgFn := &sync.WaitGroup{} // wg for process function runner defer wgFn.Wait() // extra safety defer obj.runNodeWaitFns() // just in case wgFor.Add(1) // make sure we wait for the below process loop to exit... defer wgFor.Done() // errProcess and processBreakFn are used to help exit following an err. // This approach is needed because if we simply exited, we'd block the // main loop below because various Stream functions are waiting on the // Lock/Unlock cycle to be able to finish cleanly, shutdown, and unblock // all the waitgroups so we can exit. var errProcess error var pausedProcess bool processBreakFn := func(err error /*, paused bool*/) { if err == nil { // a nil error won't cause ag to shutdown below panic("expected error, not nil") } if obj.Debug { obj.Logf("process break") } select { case obj.ag <- err: // send error to aggregate channel case <-ctx.Done(): } cancel() // to unblock //mainCancel() // NO! errProcess = err // set above error //pausedProcess = paused // set this inline directly } if obj.Debug { defer obj.Logf("exited main loop") } // we start off "running", but we'll have an empty graph initially... for { // After we've resumed, we can try to exit. (shortcut) // NOTE: If someone calls Lock(), which would send to // obj.pauseChan, it *won't* deadlock here because mainCtx is // only closed when all the worker waitgroups close first! select { case <-mainCtx.Done(): // when asked to exit return errProcess // we exit happily default: } // run through our graph, check for pause request occasionally for { pausedProcess = false // reset // if we're in errProcess, we skip the process loop! if errProcess != nil { break // skip this process loop } // Start the process run for this iteration of the loop. ctxFn, cancelFn := context.WithCancel(context.Background()) // we run cancelFn() below to cleanup! var errFn error chanFn := make(chan struct{}) // normal exit signal wgFn.Add(1) go func() { defer wgFn.Done() defer close(chanFn) // signal that I exited for { if obj.Debug { obj.Logf("process...") } if errFn = obj.process(ctxFn); errFn != nil { // store if errFn != context.Canceled { obj.Logf("process end err: %+v...", errFn) } return } if obj.Debug { obj.Logf("process end...") } // If process finishes without error, we // should sit here and wait until we get // run again from a wake-up, or we exit. select { case <-obj.wakeChan: // wait until something has actually woken up... if obj.Debug { obj.Logf("process wakeup...") } // loop! 
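					// This case fires when the outer loop
					// calls cancelFn(), which it does on
					// pause, on mainCtx shutdown, or after
					// an error. Returning context.Canceled
					// here is the expected, non-error exit;
					// the outer loop filters it out and
					// only breaks on other errors.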
case <-ctxFn.Done(): errFn = context.Canceled return } } }() chFn := false chPause := false ctxExit := false select { //case <-obj.wakeChan: // this happens entirely in the process inner, inner loop now. case <-chanFn: // process exited on it's own in error! chFn = true case <-obj.pauseChan: if obj.Debug { obj.Logf("pausing...") } chPause = true case <-mainCtx.Done(): // when asked to exit //return nil // we exit happily ctxExit = true } //fmt.Printf("chPause: %+v\n", chPause) // debug //fmt.Printf("ctxExit: %+v\n", ctxExit) // debug cancelFn() // cancel the process function wgFn.Wait() // wait for the process function to return pausedProcess = chPause // tell the below select if errFn == nil { // break on errors (needs to know if paused) processBreakFn(fmt.Errorf("unexpected nil error in process")) break } if errFn != nil && errFn != context.Canceled { // break on errors (needs to know if paused) processBreakFn(errwrap.Wrapf(errFn, "process error")) break } //if errFn == context.Canceled { // // ignore, we asked for it //} if ctxExit { return nil // we exit happily } if chPause { break } // This used to happen if a node (in the list we are // sending to) dies, and we returned with: // `case <-node.ctx.Done():` // node died // return node.ctx.Err() // which caused this scenario. if chFn && errFn == context.Canceled { // very rare case // programming error processBreakFn(fmt.Errorf("legacy unhandled process state")) break } // programming error //return fmt.Errorf("unhandled process state") processBreakFn(fmt.Errorf("unhandled process state")) break } // if we're in errProcess, we need to add back in the pauseChan! if errProcess != nil && !pausedProcess { select { case <-obj.pauseChan: if obj.Debug { obj.Logf("lower pausing...") } // do we want this exit case? YES case <-mainCtx.Done(): // when asked to exit return errProcess } } // Toposort for paused workers. We run this before the actual // pause completes, because the second we are paused, the graph // could then immediately change. We don't need a lock in here // because the mutex only unlocks when pause is complete below. //topoSort1, err := obj.graph.TopologicalSort() //if err != nil { // return err //} //for _, v := range topoSort1 {} // pause is complete // no exit case from here, must be fully running or paused... select { case obj.pausedChan <- struct{}{}: if obj.Debug { obj.Logf("paused!") } } // // the graph changes shape right here... we are locked right now // // wait until resumed/unlocked select { case <-obj.resumeChan: if obj.Debug { obj.Logf("resuming...") } } // Do any cleanup needed from delete vertex. Or do we? // We've ascertained that while we want this stuff to shutdown, // and while we also know that a Stream() function running is a // part of what we're waiting for to exit, it doesn't matter // that it exits now! This is actually causing a deadlock // because the pending Stream exit, might be calling a new // Reverse commit, which means we're deadlocked. It's safe for // the Stream to keep running, all it might do is needlessly add // a new value to obj.table which won't bother us since we won't // even use it in process. We _do_ want to wait for all of these // before the final exit, but we already have that in a defer. //obj.runNodeWaitFns() // Toposort to run/resume workers. (Bottom of toposort first!) 
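		// For example (hypothetical graph): with a -> b -> c the sort
		// yields [a, b, c] and the reversed slice is [c, b, a], so the
		// leaf-most consumers have their goroutines started below
		// before the upstream producers that feed them are started.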
topoSort2, err := obj.graph.TopologicalSort() if err != nil { return err } reversed := pgraph.Reverse(topoSort2) for _, v := range reversed { f, ok := v.(interfaces.Func) if !ok { panic("not a Func") } node, exists := obj.state[f] if !exists { panic(fmt.Sprintf("missing node in iterate: %s", f)) } if node.running { // it's not a new vertex continue } obj.loaded = false // reset this node.running = true obj.statsMutex.Lock() val, _ := obj.stats.inputList[node] // val is # or zero obj.stats.inputList[node] = val // initialize to zero obj.statsMutex.Unlock() innerCtx, innerCancel := context.WithCancel(ctx) // wrap parent (not mainCtx) // we defer innerCancel() in the goroutine to cleanup! node.ctx = innerCtx node.cancel = innerCancel // run mainloop wgAg.Add(1) node.wg.Add(1) go func(f interfaces.Func, node *state) { defer node.wg.Done() defer wgAg.Done() defer node.cancel() // if we close, clean up and send the signal to anyone watching if obj.Debug { obj.Logf("Running func `%s`", node) obj.statsMutex.Lock() obj.stats.runningList[node] = struct{}{} obj.stats.loadedList[node] = false obj.statsMutex.Unlock() } fn := func(nodeCtx context.Context) (reterr error) { defer func() { // catch programming errors if r := recover(); r != nil { obj.Logf("Panic in Stream of func `%s`: %+v", node, r) reterr = fmt.Errorf("panic in Stream of func `%s`: %+v", node, r) } }() return f.Stream(nodeCtx) } runErr := fn(node.ctx) // wrap with recover() if obj.Debug { obj.Logf("Exiting func `%s`", node) obj.statsMutex.Lock() delete(obj.stats.runningList, node) obj.statsMutex.Unlock() } if runErr != nil { obj.Logf("Erroring func `%s`: %+v", node, runErr) // send to a aggregate channel // the first to error will cause ag to // shutdown, so make sure we can exit... select { case obj.ag <- runErr: // send to aggregate channel case <-node.ctx.Done(): } } // if node never loaded, then we error in the node.output loop! }(f, node) // consume output wgAg.Add(1) node.wg.Add(1) go func(f interfaces.Func, node *state) { defer node.wg.Done() defer wgAg.Done() defer func() { // We record the fact that output // closed, so we can eventually close // the downstream node's input. obj.stateMutex.Lock() obj.isClosed[node] = struct{}{} // closed! obj.stateMutex.Unlock() // TODO: is this wake necessary? obj.wake("closed") // closed, so wake up }() for value := range node.output { // read from channel if value == nil { // bug! obj.Logf("func `%s` got nil value", node) panic("got nil value") } obj.tableMutex.RLock() cached, exists := obj.table[f] obj.tableMutex.RUnlock() if !exists { // first value received // RACE: do this AFTER value is present! //node.loaded = true // not yet please if obj.Debug { obj.Logf("func `%s` started", node) } } else if value.Cmp(cached) == nil { // skip if new value is same as previous // if this happens often, it *might* be // a bug in the function implementation // FIXME: do we need to disable engine // caching when using hysteresis? if obj.Debug { obj.Logf("func `%s` skipped", node) } continue } obj.tableMutex.Lock() obj.table[f] = value // save the latest obj.tableMutex.Unlock() node.rwmutex.Lock() node.loaded = true // set *after* value is in :) //obj.Logf("func `%s` changed", node) node.rwmutex.Unlock() obj.statsMutex.Lock() obj.stats.loadedList[node] = true obj.statsMutex.Unlock() // Send a message to tell our ag channel // that we might have sent an aggregated // message here. They should check if we // are a leaf and if we glitch or not... // Make sure we do this before the wake. 
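					// The other half of this handshake is
					// in process(): it drains obj.activity
					// under the same stateMutex and only
					// flips leafSend (and thus sends on the
					// ag channel) when one of the recorded
					// nodes is a leaf.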
obj.stateMutex.Lock() obj.activity[node] = struct{}{} // activity! obj.stateMutex.Unlock() obj.wake("new value") // new value, so send wake up } // end for // no more output values are coming... //obj.Logf("func `%s` stopped", node) // nodes that never loaded will cause the engine to hang if !node.loaded { select { case obj.ag <- fmt.Errorf("func `%s` stopped before it was loaded", node): case <-node.ctx.Done(): return } } }(f, node) } // end for // Send new notifications in case any new edges are sending away // to these... They might have already missed the notifications! for k := range obj.resend { // resend TO these! node, exists := obj.state[k] if !exists { continue } // Run as a goroutine to avoid erroring in parent thread. wg.Add(1) go func(node *state) { defer wg.Done() if obj.Debug { obj.Logf("resend to func `%s`", node) } obj.wake("resend") // new value, so send wake up }(node) } obj.resend = make(map[interfaces.Func]struct{}) // reset // now check their states... //for _, v := range reversed { // v, ok := v.(interfaces.Func) // if !ok { // panic("not a Func") // } // // wait for startup? // close(obj.state[v].startup) XXX: once? //} // resume is complete // no exit case from here, must be fully running or paused... select { case obj.resumedChan <- struct{}{}: if obj.Debug { obj.Logf("resumed!") } } } // end for } // Stream returns a channel that you can follow to get aggregated graph events. // Do not block reading from this channel as you can hold up the entire engine. func (obj *Engine) Stream() <-chan error { return obj.streamChan } // Loaded returns a channel that closes when the function engine loads. func (obj *Engine) Loaded() <-chan struct{} { return obj.loadedChan } // Table returns a copy of the populated data table of values. We return a copy // because since these values are constantly changing, we need an atomic // snapshot to present to the consumer of this API. // TODO: is this globally glitch consistent? // TODO: do we need an API to return a single value? (wrapped in read locks) func (obj *Engine) Table() map[interfaces.Func]types.Value { obj.tableMutex.RLock() defer obj.tableMutex.RUnlock() table := make(map[interfaces.Func]types.Value) for k, v := range obj.table { //table[k] = v.Copy() // TODO: do we need to copy these values? table[k] = v } return table } // Apply is similar to Table in that it gives you access to the internal output // table of data, the difference being that it instead passes this information // to a function of your choosing and holds a read/write lock during the entire // time that your function is synchronously executing. If you use this function // to spawn any goroutines that read or write data, then you're asking for a // panic. // XXX: does this need to be a Lock? Can it be an RLock? Check callers! func (obj *Engine) Apply(fn func(map[interfaces.Func]types.Value) error) error { // XXX: does this need to be a Lock? Can it be an RLock? Check callers! obj.tableMutex.Lock() // differs from above RLock around obj.table defer obj.tableMutex.Unlock() table := make(map[interfaces.Func]types.Value) for k, v := range obj.table { //table[k] = v.Copy() // TODO: do we need to copy these values? table[k] = v } return fn(table) } // Started returns a channel that closes when the Run function finishes starting // up. This is useful so that we can wait before calling any of the mutex things // that would normally panic if Run wasn't started up first. 
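//
// A minimal caller-side sketch (illustrative only; e is assumed to be an
// *Engine that already had Setup run on it, and ctx is a caller context):
//
//	go func() { _ = e.Run(ctx) }()
//	<-e.Started()
//	e.Lock()
//	// ... safe to AddVertex/AddEdge here ...
//	e.Unlock()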
func (obj *Engine) Started() <-chan struct{} { return obj.startedChan } // NumVertices returns the number of vertices in the current graph. func (obj *Engine) NumVertices() int { // XXX: would this deadlock if we added this? //obj.graphMutex.Lock() // XXX: should this be a RLock? //defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? return obj.graph.NumVertices() } // Stats returns some statistics in a human-readable form. func (obj *Engine) Stats() string { defer obj.statsMutex.RUnlock() obj.statsMutex.RLock() return obj.stats.String() } // Graphviz writes out the diagram of a graph to be used for visualization and // debugging. You must not modify the graph (eg: during Lock) when calling this // method. func (obj *Engine) Graphviz(dir string) error { // XXX: would this deadlock if we added this? //obj.graphMutex.Lock() // XXX: should this be a RLock? //defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock? obj.graphvizMutex.Lock() defer obj.graphvizMutex.Unlock() obj.graphvizCount++ // increment if dir == "" { dir = obj.graphvizDirectory } if dir == "" { // XXX: hack for ergonomics d := time.Now().UnixMilli() dir = fmt.Sprintf("/tmp/dage-graphviz-%s-%d/", obj.Name, d) obj.graphvizDirectory = dir } if !strings.HasSuffix(dir, "/") { return fmt.Errorf("dir must end with a slash") } if err := os.MkdirAll(dir, 0755); err != nil { return err } dashedEdges, err := pgraph.NewGraph("dashedEdges") if err != nil { return err } for _, v1 := range obj.graph.Vertices() { // if it's a ChannelBasedSinkFunc... if cb, ok := v1.(*structs.ChannelBasedSinkFunc); ok { // ...then add a dashed edge to its output dashedEdges.AddEdge(v1, cb.Target, &pgraph.SimpleEdge{ Name: "channel", // secret channel }) } // if it's a ChannelBasedSourceFunc... if cb, ok := v1.(*structs.ChannelBasedSourceFunc); ok { // ...then add a dashed edge from its input dashedEdges.AddEdge(cb.Source, v1, &pgraph.SimpleEdge{ Name: "channel", // secret channel }) } } gv := &pgraph.Graphviz{ Name: obj.graph.GetName(), Filename: fmt.Sprintf("%s/%d.dot", dir, obj.graphvizCount), Graphs: map[*pgraph.Graph]*pgraph.GraphvizOpts{ obj.graph: nil, dashedEdges: { Style: "dashed", }, }, } if err := gv.Exec(); err != nil { return err } return nil } // state tracks some internal vertex-specific state information. type state struct { Func interfaces.Func name string // cache a name here for safer concurrency input chan types.Value // the top level type must be a struct output chan types.Value txn interfaces.Txn // API of graphTxn struct to pass to each function //init bool // have we run Init on our func? //ready bool // has it received all the args it needs at least once? loaded bool // has the func run at least once ? inputClosed bool // is our input closed? outputClosed bool // is our output closed? isLeaf bool // is my out degree zero? running bool wg *sync.WaitGroup ctx context.Context // per state ctx (inner ctx) cancel func() // cancel above inner ctx rwmutex *sync.RWMutex // concurrency guard for reading/modifying this state } // String implements the fmt.Stringer interface for pretty printing! func (obj *state) String() string { if obj.name != "" { return obj.name } return obj.Func.String() } // stats holds some statistics and other debugging information. type stats struct { // runningList keeps track of which nodes are still running. runningList map[*state]struct{} // loadedList keeps track of which nodes have loaded. loadedList map[*state]bool // inputList keeps track of the number of inputs each node received. 
inputList map[*state]int64 } // String implements the fmt.Stringer interface for printing out our collected // statistics! func (obj *stats) String() string { // XXX: just build the lock into *stats instead of into our dage obj s := "stats:\n" { s += "\trunning:\n" names := []string{} for k := range obj.runningList { names = append(names, k.String()) } sort.Strings(names) for _, name := range names { s += fmt.Sprintf("\t * %s\n", name) } } { nodes := []*state{} for k := range obj.loadedList { nodes = append(nodes, k) } sort.Slice(nodes, func(i, j int) bool { return nodes[i].String() < nodes[j].String() }) s += "\tloaded:\n" for _, node := range nodes { if !obj.loadedList[node] { continue } s += fmt.Sprintf("\t * %s\n", node) } s += "\tnot loaded:\n" for _, node := range nodes { if obj.loadedList[node] { continue } s += fmt.Sprintf("\t * %s\n", node) } } { s += "\tinput count:\n" nodes := []*state{} for k := range obj.inputList { nodes = append(nodes, k) } //sort.Slice(nodes, func(i, j int) bool { return nodes[i].String() < nodes[j].String() }) sort.Slice(nodes, func(i, j int) bool { return obj.inputList[nodes[i]] < obj.inputList[nodes[j]] }) for _, node := range nodes { s += fmt.Sprintf("\t * (%d) %s\n", obj.inputList[node], node) } } return s }
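// What follows is an illustrative, end-to-end usage sketch of this engine. It
// is a comment-only example, not compiled code: the funcs f1 and f2, and the
// edge argument name "x", are hypothetical stand-ins for real interfaces.Func
// implementations and whatever arg names their signatures expect.
//
//	e := &Engine{
//		Name: "example",
//		Logf: func(format string, v ...interface{}) {
//			log.Printf("engine: "+format, v...)
//		},
//	}
//	if err := e.Setup(); err != nil {
//		return err
//	}
//	defer e.Cleanup()
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go func() { _ = e.Run(ctx) }()
//	<-e.Started() // wait before using Lock/Unlock
//
//	e.Lock()
//	if err := e.AddEdge(f1, f2, &interfaces.FuncEdge{Args: []string{"x"}}); err != nil {
//		e.Unlock()
//		return err
//	}
//	e.Unlock()
//
//	<-e.Loaded() // every func has produced at least one value
//	for err := range e.Stream() {
//		if err != nil {
//			return err // the engine shut down with an error
//		}
//		values := e.Table() // atomic snapshot of current values
//		_ = values[f2]      // latest output of the downstream func
//	}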