1635 lines
50 KiB
Go
1635 lines
50 KiB
Go
// Mgmt
|
|
// Copyright (C) 2013-2024+ James Shubin and the project contributors
|
|
// Written by James Shubin <james@shubin.ca> and the project contributors
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
//
|
|
// Additional permission under GNU GPL version 3 section 7
|
|
//
|
|
// If you modify this program, or any covered work, by linking or combining it
|
|
// with embedded mcl code and modules (and that the embedded mcl code and
|
|
// modules which link with this program, contain a copy of their source code in
|
|
// the authoritative form) containing parts covered by the terms of any other
|
|
// license, the licensors of this program grant you additional permission to
|
|
// convey the resulting work. Furthermore, the licensors of this program grant
|
|
// the original author, James Shubin, additional permission to update this
|
|
// additional permission if he deems it necessary to achieve the goals of this
|
|
// additional permission.
|
|
|
|
// Package dage implements a DAG function engine.
|
|
// TODO: can we rename this to something more interesting?
|
|
package dage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/purpleidea/mgmt/engine"
|
|
"github.com/purpleidea/mgmt/engine/local"
|
|
"github.com/purpleidea/mgmt/lang/funcs/ref"
|
|
"github.com/purpleidea/mgmt/lang/funcs/structs"
|
|
"github.com/purpleidea/mgmt/lang/funcs/txn"
|
|
"github.com/purpleidea/mgmt/lang/interfaces"
|
|
"github.com/purpleidea/mgmt/lang/types"
|
|
"github.com/purpleidea/mgmt/pgraph"
|
|
"github.com/purpleidea/mgmt/util/errwrap"
|
|
)
|
|
|
|
// Engine implements a dag engine which lets us "run" a dag of functions, but
// also allows us to modify it while we are running. The exported fields are
// the configuration supplied by the caller; every unexported field is
// allocated by Setup, which must run before any other method is used.
type Engine struct {
	// Name is the name used for the instance of the engine and in the graph
	// that is held within it.
	Name string

	// Hostname, Local and World are passed through unchanged to every
	// function in the Init struct that it receives when its vertex is added.
	Hostname string
	Local    *local.API
	World    engine.World

	// Debug turns on verbose logging and is also passed to each function.
	// Logf is the logger used for all engine output. Some call sites log
	// without checking Debug, so it presumably must be non-nil -- TODO
	// confirm with callers.
	Debug bool
	Logf  func(format string, v ...interface{})

	// Callback can be specified as an alternative to using the Stream
	// method to get events. If the context on it is cancelled, then it must
	// shutdown quickly, because this means we are closing and want to
	// disconnect. Whether you want to respect that is up to you, but the
	// engine will not be able to close until you do. If specified, and an
	// error has occurred, it will set that error property.
	Callback func(context.Context, error)

	// graph is the function graph, table holds the most recent output value
	// of each function, and state holds the per-vertex runtime node. The
	// state map is only written by addVertex and deleteVertex.
	graph *pgraph.Graph                   // guarded by graphMutex
	table map[interfaces.Func]types.Value // guarded by tableMutex
	state map[interfaces.Func]*state

	// graphMutex wraps access to the graph. It is held by Run, released by
	// Lock (pause) and re-taken by Unlock (resume), so that the graph
	// modification methods can acquire it while the engine is paused.
	graphMutex *sync.Mutex // TODO: &sync.RWMutex{} ?

	// tableMutex wraps access to the table map.
	tableMutex *sync.RWMutex

	// refCount keeps track of vertex and edge references across the entire
	// graph. It is shared with every transaction returned by Txn.
	refCount *ref.Count

	// wgTxn blocks shutdown until the initial Txn has Reversed.
	wgTxn *sync.WaitGroup

	// firstTxn checks to make sure wgTxn is only used for the first Txn. It
	// is set to true by the first call to Txn.
	firstTxn bool

	// wg is incremented by Run and waited on by Cleanup, so that Cleanup
	// does not close anything while Run is still active.
	wg *sync.WaitGroup

	// pause/resume state machine signals. Lock sends on pauseChan and then
	// waits on pausedChan; Unlock sends on resumeChan and then waits on
	// resumedChan. All four are closed by Cleanup.
	pauseChan   chan struct{}
	pausedChan  chan struct{}
	resumeChan  chan struct{}
	resumedChan chan struct{}

	// resend tracks which new nodes might need a new notification. AddEdge
	// adds the destination func here when the source vertex already existed.
	resend map[interfaces.Func]struct{}

	// nodeWaitFns is a list of cleanup functions to run after we've begun
	// resume, but before we've resumed completely. These are actions that
	// we would like to do when paused from a deleteVertex operation, but
	// that would deadlock if we did.
	nodeWaitFns []func()

	// nodeWaitMutex wraps access to the nodeWaitFns list.
	nodeWaitMutex *sync.Mutex

	// streamChan is used to send notifications to the outside world.
	streamChan chan error

	loaded     bool          // are all of the funcs loaded?
	loadedChan chan struct{} // funcs loaded signal

	startedChan chan struct{} // closes when Run() starts

	// wakeChan contains a message when someone has asked for us to wake up.
	// It is buffered with room for a single message, see the wake method.
	wakeChan chan struct{}

	// ag is the aggregation channel which cues up outgoing events.
	ag chan error

	// leafSend specifies if we should do an ag send because we have
	// activity at a leaf.
	leafSend bool

	// isClosed tracks nodes that have closed. This list is purged as they
	// are removed from the graph.
	isClosed map[*state]struct{}

	// activity tracks nodes that are ready to send to ag. The main process
	// loop decides if we have the correct set to do so. The map is used as
	// a set: presence of a key is the only information that it carries.
	// NOTE(review): an earlier version of this comment described true/false
	// values, which no longer matches the struct{} value type.
	activity map[*state]struct{}

	// stateMutex wraps access to the isClosed and activity maps.
	stateMutex *sync.Mutex

	// stats holds some statistics and other debugging information.
	stats *stats // guarded by statsMutex

	// statsMutex wraps access to the stats data.
	statsMutex *sync.RWMutex

	// graphvizMutex wraps access to the Graphviz method.
	graphvizMutex *sync.Mutex

	// graphvizCount keeps a running tally of how many graphs we've
	// generated. This is useful for displaying a sequence (timeline) of
	// graphs in a linear order.
	graphvizCount int64

	// graphvizDirectory stores the generated path for outputting graphviz
	// files if one is not specified at runtime.
	graphvizDirectory string
}
|
|
|
|
// Setup sets up the internal datastructures needed for this engine.
|
|
func (obj *Engine) Setup() error {
|
|
var err error
|
|
obj.graph, err = pgraph.NewGraph(obj.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
obj.table = make(map[interfaces.Func]types.Value)
|
|
obj.state = make(map[interfaces.Func]*state)
|
|
obj.graphMutex = &sync.Mutex{} // TODO: &sync.RWMutex{} ?
|
|
obj.tableMutex = &sync.RWMutex{}
|
|
|
|
obj.refCount = (&ref.Count{}).Init()
|
|
|
|
obj.wgTxn = &sync.WaitGroup{}
|
|
|
|
obj.wg = &sync.WaitGroup{}
|
|
|
|
obj.pauseChan = make(chan struct{})
|
|
obj.pausedChan = make(chan struct{})
|
|
obj.resumeChan = make(chan struct{})
|
|
obj.resumedChan = make(chan struct{})
|
|
|
|
obj.resend = make(map[interfaces.Func]struct{})
|
|
|
|
obj.nodeWaitFns = []func(){}
|
|
|
|
obj.nodeWaitMutex = &sync.Mutex{}
|
|
|
|
obj.streamChan = make(chan error)
|
|
obj.loadedChan = make(chan struct{})
|
|
obj.startedChan = make(chan struct{})
|
|
|
|
obj.wakeChan = make(chan struct{}, 1) // hold up to one message
|
|
|
|
obj.ag = make(chan error)
|
|
|
|
obj.isClosed = make(map[*state]struct{})
|
|
|
|
obj.activity = make(map[*state]struct{})
|
|
obj.stateMutex = &sync.Mutex{}
|
|
|
|
obj.stats = &stats{
|
|
runningList: make(map[*state]struct{}),
|
|
loadedList: make(map[*state]bool),
|
|
inputList: make(map[*state]int64),
|
|
}
|
|
obj.statsMutex = &sync.RWMutex{}
|
|
|
|
obj.graphvizMutex = &sync.Mutex{}
|
|
return nil
|
|
}
|
|
|
|
// Cleanup cleans up and frees memory and resources after everything is done.
|
|
func (obj *Engine) Cleanup() error {
|
|
obj.wg.Wait() // don't cleanup these before Run() finished
|
|
close(obj.pauseChan) // free
|
|
close(obj.pausedChan)
|
|
close(obj.resumeChan)
|
|
close(obj.resumedChan)
|
|
return nil
|
|
}
|
|
|
|
// Txn returns a transaction that is suitable for adding and removing from the
|
|
// graph. You must run Setup before this method is called.
|
|
func (obj *Engine) Txn() interfaces.Txn {
|
|
if obj.refCount == nil {
|
|
panic("you must run setup before first use")
|
|
}
|
|
// The very first initial Txn must have a wait group to make sure if we
|
|
// shutdown (in error) that we can Reverse things before the Lock/Unlock
|
|
// loop shutsdown.
|
|
var free func()
|
|
if !obj.firstTxn {
|
|
obj.firstTxn = true
|
|
obj.wgTxn.Add(1)
|
|
free = func() {
|
|
obj.wgTxn.Done()
|
|
}
|
|
}
|
|
return (&txn.GraphTxn{
|
|
Lock: obj.Lock,
|
|
Unlock: obj.Unlock,
|
|
GraphAPI: obj,
|
|
RefCount: obj.refCount, // reference counting
|
|
FreeFunc: free,
|
|
}).Init()
|
|
}
|
|
|
|
// addVertex is the lockless version of the AddVertex function. This is needed
|
|
// so that AddEdge can add two vertices within the same lock.
|
|
func (obj *Engine) addVertex(f interfaces.Func) error {
|
|
if _, exists := obj.state[f]; exists {
|
|
// don't err dupes, because it makes using the AddEdge API yucky
|
|
return nil
|
|
}
|
|
|
|
// add some extra checks for easier debugging
|
|
if f == nil {
|
|
return fmt.Errorf("missing func")
|
|
}
|
|
if f.Info() == nil {
|
|
return fmt.Errorf("missing func info")
|
|
}
|
|
sig := f.Info().Sig
|
|
if sig == nil {
|
|
return fmt.Errorf("missing func sig")
|
|
}
|
|
if sig.Kind != types.KindFunc {
|
|
return fmt.Errorf("must be kind func")
|
|
}
|
|
if err := f.Validate(); err != nil {
|
|
return errwrap.Wrapf(err, "node did not Validate")
|
|
}
|
|
|
|
input := make(chan types.Value)
|
|
output := make(chan types.Value)
|
|
txn := obj.Txn()
|
|
|
|
// This is the one of two places where we modify this map. To avoid
|
|
// concurrent writes, we only do this when we're locked! Anywhere that
|
|
// can read where we are locked must have a mutex around it or do the
|
|
// lookup when we're in an unlocked state.
|
|
node := &state{
|
|
Func: f,
|
|
name: f.String(), // cache a name to avoid locks
|
|
|
|
input: input,
|
|
output: output,
|
|
txn: txn,
|
|
|
|
running: false,
|
|
wg: &sync.WaitGroup{},
|
|
|
|
rwmutex: &sync.RWMutex{},
|
|
}
|
|
|
|
init := &interfaces.Init{
|
|
Hostname: obj.Hostname,
|
|
Input: node.input,
|
|
Output: node.output,
|
|
Txn: node.txn,
|
|
Local: obj.Local,
|
|
World: obj.World,
|
|
Debug: obj.Debug,
|
|
Logf: func(format string, v ...interface{}) {
|
|
// safe Logf in case f.String contains %? chars...
|
|
s := f.String() + ": " + fmt.Sprintf(format, v...)
|
|
obj.Logf("%s", s)
|
|
},
|
|
}
|
|
|
|
if err := f.Init(init); err != nil {
|
|
return err
|
|
}
|
|
// only now, do we modify the graph
|
|
obj.state[f] = node
|
|
obj.graph.AddVertex(f)
|
|
return nil
|
|
}
|
|
|
|
// AddVertex is the thread-safe way to add a vertex. You will need to call the
|
|
// engine Lock method before using this and the Unlock method afterwards.
|
|
func (obj *Engine) AddVertex(f interfaces.Func) error {
|
|
obj.graphMutex.Lock()
|
|
defer obj.graphMutex.Unlock()
|
|
if obj.Debug {
|
|
obj.Logf("Engine:AddVertex: %p %s", f, f)
|
|
}
|
|
|
|
return obj.addVertex(f) // lockless version
|
|
}
|
|
|
|
// AddEdge is the thread-safe way to add an edge. You will need to call the
|
|
// engine Lock method before using this and the Unlock method afterwards. This
|
|
// will automatically run AddVertex on both input vertices if they are not
|
|
// already part of the graph. You should only create DAG's as this function
|
|
// engine cannot handle cycles and this method will error if you cause a cycle.
|
|
func (obj *Engine) AddEdge(f1, f2 interfaces.Func, fe *interfaces.FuncEdge) error {
|
|
obj.graphMutex.Lock()
|
|
defer obj.graphMutex.Unlock()
|
|
if obj.Debug {
|
|
obj.Logf("Engine:AddEdge %p %s: %p %s -> %p %s", fe, fe, f1, f1, f2, f2)
|
|
}
|
|
|
|
// safety check to avoid cycles
|
|
g := obj.graph.Copy()
|
|
//g.AddVertex(f1)
|
|
//g.AddVertex(f2)
|
|
g.AddEdge(f1, f2, fe)
|
|
if _, err := g.TopologicalSort(); err != nil {
|
|
return err // not a dag
|
|
}
|
|
// if we didn't cycle, we can modify the real graph safely...
|
|
|
|
// Does the graph already have these nodes in it?
|
|
hasf1 := obj.graph.HasVertex(f1)
|
|
//hasf2 := obj.graph.HasVertex(f2)
|
|
|
|
if err := obj.addVertex(f1); err != nil { // lockless version
|
|
return err
|
|
}
|
|
if err := obj.addVertex(f2); err != nil {
|
|
// rollback f1 on error of f2
|
|
obj.deleteVertex(f1) // ignore any error
|
|
return err
|
|
}
|
|
|
|
// If f1 doesn't exist, let f1 (or it's incoming nodes) get the notify.
|
|
// If f2 is new, then it should get a new notification unless f1 is new.
|
|
// But there's no guarantee we didn't AddVertex(f2); AddEdge(f1, f2, e),
|
|
// so resend if f1 already exists. Otherwise it's not a new notification.
|
|
// previously: `if hasf1 && !hasf2`
|
|
if hasf1 {
|
|
obj.resend[f2] = struct{}{} // resend notification to me
|
|
}
|
|
|
|
obj.graph.AddEdge(f1, f2, fe) // replaces any existing edge here
|
|
|
|
// This shouldn't error, since the test graph didn't find a cycle.
|
|
if _, err := obj.graph.TopologicalSort(); err != nil {
|
|
// programming error
|
|
panic(err) // not a dag
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// deleteVertex is the lockless version of the DeleteVertex function. This is
|
|
// needed so that AddEdge can add two vertices within the same lock. It needs
|
|
// deleteVertex so it can rollback the first one if the second addVertex fails.
|
|
func (obj *Engine) deleteVertex(f interfaces.Func) error {
|
|
node, exists := obj.state[f]
|
|
if !exists {
|
|
return fmt.Errorf("vertex %p %s doesn't exist", f, f)
|
|
}
|
|
|
|
if node.running {
|
|
// cancel the running vertex
|
|
node.cancel() // cancel inner ctx
|
|
|
|
// We store this work to be performed later on in the main loop
|
|
// because this Wait() might be blocked by a defer Commit, which
|
|
// is itself blocked because this deleteVertex operation is part
|
|
// of a Commit.
|
|
obj.nodeWaitMutex.Lock()
|
|
obj.nodeWaitFns = append(obj.nodeWaitFns, func() {
|
|
node.wg.Wait() // While waiting, the Stream might cause a new Reverse Commit
|
|
node.txn.Free() // Clean up when done!
|
|
obj.stateMutex.Lock()
|
|
delete(obj.isClosed, node) // avoid memory leak
|
|
obj.stateMutex.Unlock()
|
|
})
|
|
obj.nodeWaitMutex.Unlock()
|
|
}
|
|
|
|
// This is the one of two places where we modify this map. To avoid
|
|
// concurrent writes, we only do this when we're locked! Anywhere that
|
|
// can read where we are locked must have a mutex around it or do the
|
|
// lookup when we're in an unlocked state.
|
|
delete(obj.state, f)
|
|
obj.graph.DeleteVertex(f)
|
|
return nil
|
|
}
|
|
|
|
// DeleteVertex is the thread-safe way to delete a vertex. You will need to call
|
|
// the engine Lock method before using this and the Unlock method afterwards.
|
|
func (obj *Engine) DeleteVertex(f interfaces.Func) error {
|
|
obj.graphMutex.Lock()
|
|
defer obj.graphMutex.Unlock()
|
|
if obj.Debug {
|
|
obj.Logf("Engine:DeleteVertex: %p %s", f, f)
|
|
}
|
|
|
|
return obj.deleteVertex(f) // lockless version
|
|
}
|
|
|
|
// DeleteEdge is the thread-safe way to delete an edge. You will need to call
|
|
// the engine Lock method before using this and the Unlock method afterwards.
|
|
func (obj *Engine) DeleteEdge(fe *interfaces.FuncEdge) error {
|
|
obj.graphMutex.Lock()
|
|
defer obj.graphMutex.Unlock()
|
|
if obj.Debug {
|
|
f1, f2, found := obj.graph.LookupEdge(fe)
|
|
if found {
|
|
obj.Logf("Engine:DeleteEdge: %p %s -> %p %s", f1, f1, f2, f2)
|
|
} else {
|
|
obj.Logf("Engine:DeleteEdge: not found %p %s", fe, fe)
|
|
}
|
|
}
|
|
|
|
// Don't bother checking if edge exists first and don't error if it
|
|
// doesn't because it might have gotten deleted when a vertex did, and
|
|
// so there's no need to complain for nothing.
|
|
obj.graph.DeleteEdge(fe)
|
|
|
|
return nil
|
|
}
|
|
|
|
// HasVertex is the thread-safe way to check if a vertex exists in the graph.
|
|
// You will need to call the engine Lock method before using this and the Unlock
|
|
// method afterwards.
|
|
func (obj *Engine) HasVertex(f interfaces.Func) bool {
|
|
obj.graphMutex.Lock() // XXX: should this be a RLock?
|
|
defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock?
|
|
|
|
return obj.graph.HasVertex(f)
|
|
}
|
|
|
|
// LookupEdge is the thread-safe way to check which vertices (if any) exist
|
|
// between an edge in the graph. You will need to call the engine Lock method
|
|
// before using this and the Unlock method afterwards.
|
|
func (obj *Engine) LookupEdge(fe *interfaces.FuncEdge) (interfaces.Func, interfaces.Func, bool) {
|
|
obj.graphMutex.Lock() // XXX: should this be a RLock?
|
|
defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock?
|
|
|
|
v1, v2, found := obj.graph.LookupEdge(fe)
|
|
if !found {
|
|
return nil, nil, found
|
|
}
|
|
f1, ok := v1.(interfaces.Func)
|
|
if !ok {
|
|
panic("not a Func")
|
|
}
|
|
f2, ok := v2.(interfaces.Func)
|
|
if !ok {
|
|
panic("not a Func")
|
|
}
|
|
return f1, f2, found
|
|
}
|
|
|
|
// FindEdge is the thread-safe way to check which edge (if any) exists between
|
|
// two vertices in the graph. This is an important method in edge removal,
|
|
// because it's what you really need to know for DeleteEdge to work. Requesting
|
|
// a specific deletion isn't very sensical in this library when specified as the
|
|
// edge pointer, since we might replace it with a new edge that has new arg
|
|
// names. Instead, use this to look up what relationship you want, and then
|
|
// DeleteEdge to remove it. You will need to call the engine Lock method before
|
|
// using this and the Unlock method afterwards.
|
|
func (obj *Engine) FindEdge(f1, f2 interfaces.Func) *interfaces.FuncEdge {
|
|
obj.graphMutex.Lock() // XXX: should this be a RLock?
|
|
defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock?
|
|
|
|
edge := obj.graph.FindEdge(f1, f2)
|
|
if edge == nil {
|
|
return nil
|
|
}
|
|
fe, ok := edge.(*interfaces.FuncEdge)
|
|
if !ok {
|
|
panic("edge is not a FuncEdge")
|
|
}
|
|
|
|
return fe
|
|
}
|
|
|
|
// Graph returns a copy of the contained graph.
|
|
func (obj *Engine) Graph() *pgraph.Graph {
|
|
//obj.graphMutex.Lock() // XXX: should this be a RLock?
|
|
//defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock?
|
|
|
|
return obj.graph.Copy()
|
|
}
|
|
|
|
// Lock must be used before modifying the running graph. Make sure to Unlock
|
|
// when done.
|
|
// XXX: should Lock take a context if we want to bail mid-way?
|
|
// TODO: could we replace pauseChan with SubscribedSignal ?
|
|
func (obj *Engine) Lock() { // pause
|
|
select {
|
|
case obj.pauseChan <- struct{}{}:
|
|
}
|
|
//obj.rwmutex.Lock() // TODO: or should it go right before pauseChan?
|
|
|
|
// waiting for the pause to move to paused...
|
|
select {
|
|
case <-obj.pausedChan:
|
|
}
|
|
// this mutex locks at start of Run() and unlocks at finish of Run()
|
|
obj.graphMutex.Unlock() // safe to make changes now
|
|
}
|
|
|
|
// Unlock must be used after modifying the running graph. Make sure to Lock
|
|
// beforehand.
|
|
// XXX: should Unlock take a context if we want to bail mid-way?
|
|
func (obj *Engine) Unlock() { // resume
|
|
// this mutex locks at start of Run() and unlocks at finish of Run()
|
|
obj.graphMutex.Lock() // no more changes are allowed
|
|
select {
|
|
case obj.resumeChan <- struct{}{}:
|
|
}
|
|
//obj.rwmutex.Unlock() // TODO: or should it go right after resumedChan?
|
|
|
|
// waiting for the resume to move to resumed...
|
|
select {
|
|
case <-obj.resumedChan:
|
|
}
|
|
}
|
|
|
|
// wake sends a message to the wake queue to wake up the main process function
|
|
// which would otherwise spin unnecessarily. This can be called anytime, and
|
|
// doesn't hurt, it only wastes cpu if there's nothing to do. This does NOT ever
|
|
// block, and that's important so it can be called from anywhere.
|
|
func (obj *Engine) wake(name string) {
|
|
// The mutex guards the len check to avoid this function sending two
|
|
// messages down the channel, because the second would block if the
|
|
// consumer isn't fast enough. This mutex makes this method effectively
|
|
// asynchronous.
|
|
//obj.wakeMutex.Lock()
|
|
//defer obj.wakeMutex.Unlock()
|
|
//if len(obj.wakeChan) > 0 { // collapse duplicate, pending wake signals
|
|
// return
|
|
//}
|
|
select {
|
|
case obj.wakeChan <- struct{}{}: // send to chan of length 1
|
|
if obj.Debug {
|
|
obj.Logf("wake sent from: %s", name)
|
|
}
|
|
default: // this is a cheap alternative to avoid the mutex altogether!
|
|
if obj.Debug {
|
|
obj.Logf("wake skip from: %s", name)
|
|
}
|
|
// skip sending, we already have a message pending!
|
|
}
|
|
}
|
|
|
|
// runNodeWaitFns is a helper to run the cleanup nodeWaitFns list. It clears the
|
|
// list after it runs.
|
|
func (obj *Engine) runNodeWaitFns() {
|
|
// The lock is probably not needed here, but it won't hurt either.
|
|
obj.nodeWaitMutex.Lock()
|
|
defer obj.nodeWaitMutex.Unlock()
|
|
for _, fn := range obj.nodeWaitFns {
|
|
fn()
|
|
}
|
|
obj.nodeWaitFns = []func(){} // clear
|
|
}
|
|
|
|
// process is the inner loop that runs through the entire graph. It can be
|
|
// called successively safely, as it is roughly idempotent, and is used to push
|
|
// values through the graph. If it is interrupted, it can pick up where it left
|
|
// off on the next run. This does however require it to re-check some things,
|
|
// but that is the price we pay for being always available to unblock.
|
|
// Importantly, re-running this resumes work in progress even if there was
|
|
// caching, and that if interrupted, it'll be queued again so as to not drop a
|
|
// wakeChan notification! We know we've read all the pending incoming values,
|
|
// because the Stream reader call wake().
|
|
func (obj *Engine) process(ctx context.Context) (reterr error) {
|
|
defer func() {
|
|
// catch programming errors
|
|
if r := recover(); r != nil {
|
|
obj.Logf("Panic in process: %+v", r)
|
|
reterr = fmt.Errorf("panic in process: %+v", r)
|
|
}
|
|
}()
|
|
|
|
// Toposort in dependency order.
|
|
topoSort, err := obj.graph.TopologicalSort()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
loaded := true // assume we emitted at least one value for now...
|
|
|
|
outDegree := obj.graph.OutDegree() // map[Vertex]int
|
|
|
|
for _, v := range topoSort {
|
|
f, ok := v.(interfaces.Func)
|
|
if !ok {
|
|
panic("not a Func")
|
|
}
|
|
node, exists := obj.state[f]
|
|
if !exists {
|
|
panic(fmt.Sprintf("missing node in iterate: %s", f))
|
|
}
|
|
|
|
out, exists := outDegree[f]
|
|
if !exists {
|
|
panic(fmt.Sprintf("missing out degree in iterate: %s", f))
|
|
}
|
|
//outgoing := obj.graph.OutgoingGraphVertices(f) // []pgraph.Vertex
|
|
//node.isLeaf = len(outgoing) == 0
|
|
node.isLeaf = out == 0 // store
|
|
|
|
// TODO: the obj.loaded stuff isn't really consumed currently
|
|
node.rwmutex.RLock()
|
|
if !node.loaded {
|
|
loaded = false // we were wrong
|
|
}
|
|
node.rwmutex.RUnlock()
|
|
|
|
// TODO: memoize since graph shape doesn't change in this loop!
|
|
incoming := obj.graph.IncomingGraphVertices(f) // []pgraph.Vertex
|
|
|
|
// no incoming edges, so no incoming data
|
|
if len(incoming) == 0 || node.inputClosed { // we do this below
|
|
if !node.inputClosed {
|
|
node.inputClosed = true
|
|
close(node.input)
|
|
}
|
|
continue
|
|
} // else, process input data below...
|
|
|
|
ready := true // assume all input values are ready for now...
|
|
inputClosed := true // assume all inputs have closed for now...
|
|
si := &types.Type{
|
|
// input to functions are structs
|
|
Kind: types.KindStruct,
|
|
Map: node.Func.Info().Sig.Map,
|
|
Ord: node.Func.Info().Sig.Ord,
|
|
}
|
|
st := types.NewStruct(si)
|
|
// The above builds a struct with fields
|
|
// populated for each key (empty values)
|
|
// so we need to very carefully check if
|
|
// every field is received before we can
|
|
// safely send it downstream to an edge.
|
|
need := make(map[string]struct{}) // keys we need
|
|
for _, k := range node.Func.Info().Sig.Ord {
|
|
need[k] = struct{}{}
|
|
}
|
|
|
|
for _, vv := range incoming {
|
|
ff, ok := vv.(interfaces.Func)
|
|
if !ok {
|
|
panic("not a Func")
|
|
}
|
|
obj.tableMutex.RLock()
|
|
value, exists := obj.table[ff]
|
|
obj.tableMutex.RUnlock()
|
|
if !exists {
|
|
ready = false // nope!
|
|
inputClosed = false // can't be, it's not even ready yet
|
|
break
|
|
}
|
|
// XXX: do we need a lock around reading obj.state?
|
|
fromNode, exists := obj.state[ff]
|
|
if !exists {
|
|
panic(fmt.Sprintf("missing node in notify: %s", ff))
|
|
}
|
|
if !fromNode.outputClosed {
|
|
inputClosed = false // if any still open, then we are
|
|
}
|
|
|
|
// set each arg, since one value
|
|
// could get used for multiple
|
|
// function inputs (shared edge)
|
|
args := obj.graph.Adjacency()[ff][f].(*interfaces.FuncEdge).Args
|
|
for _, arg := range args {
|
|
// populate struct
|
|
if err := st.Set(arg, value); err != nil {
|
|
//panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v", node, fromNode, err))
|
|
keys := []string{}
|
|
for k := range st.Struct() {
|
|
keys = append(keys, k)
|
|
}
|
|
panic(fmt.Sprintf("struct set failure on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys))
|
|
}
|
|
if _, exists := need[arg]; !exists {
|
|
keys := []string{}
|
|
for k := range st.Struct() {
|
|
keys = append(keys, k)
|
|
}
|
|
// could be either a duplicate or an unwanted field (edge name)
|
|
panic(fmt.Sprintf("unexpected struct key on `%s` from `%s`: %v, has: %v", node, fromNode, err, keys))
|
|
}
|
|
delete(need, arg)
|
|
}
|
|
}
|
|
|
|
if !ready || len(need) != 0 {
|
|
continue // definitely continue, don't break here
|
|
}
|
|
|
|
// previously it was closed, skip sending
|
|
if node.inputClosed {
|
|
continue
|
|
}
|
|
|
|
// XXX: respect the info.Pure and info.Memo fields somewhere...
|
|
|
|
// XXX: keep track of some state about who i sent to last before
|
|
// being interrupted so that I can avoid resending to some nodes
|
|
// if it's not necessary...
|
|
|
|
// It's critical to avoid deadlock with this sending select that
|
|
// any events that could happen during this send can be
|
|
// preempted and that future executions of this function can be
|
|
// resumed. We must return with an error to let folks know that
|
|
// we were interrupted.
|
|
if obj.Debug {
|
|
obj.Logf("send to func `%s`", node)
|
|
}
|
|
select {
|
|
case node.input <- st: // send to function
|
|
obj.statsMutex.Lock()
|
|
val, _ := obj.stats.inputList[node] // val is # or zero
|
|
obj.stats.inputList[node] = val + 1 // increment
|
|
obj.statsMutex.Unlock()
|
|
// pass
|
|
|
|
case <-node.ctx.Done(): // node died
|
|
obj.wake("node.ctx.Done()") // interrupted, so queue again
|
|
// This scenario *can* happen, although it is rare. It
|
|
// triggered the old `chFn && errFn == context.Canceled`
|
|
// case which we've now removed.
|
|
//return node.ctx.Err() // old behaviour which was wrong
|
|
continue // probably best to return and come finish later
|
|
|
|
case <-ctx.Done():
|
|
obj.wake("node ctx.Done()") // interrupted, so queue again
|
|
return ctx.Err()
|
|
}
|
|
|
|
// It's okay if this section gets preempted and we re-run this
|
|
// function. The worst that happens is we end up sending the
|
|
// same input data a second time. This means that we could in
|
|
// theory be causing unnecessary graph changes (and locks which
|
|
// cause preemption here) if nodes that cause locks aren't
|
|
// skipping duplicate/identical input values!
|
|
if inputClosed && !node.inputClosed {
|
|
node.inputClosed = true
|
|
close(node.input)
|
|
}
|
|
|
|
// XXX: Do we need to somehow wait to make sure that node has
|
|
// the time to send at least one output?
|
|
// XXX: We could add a counter to each input that gets passed
|
|
// through the function... Eg: if we pass in 4, we should wait
|
|
// until a 4 comes out the output side. But we'd need to change
|
|
// the signature of func for this...
|
|
|
|
} // end topoSort loop
|
|
|
|
// It's okay if this section gets preempted and we re-run this bit here.
|
|
obj.loaded = loaded // this gets reset when graph adds new nodes
|
|
|
|
if !loaded {
|
|
return nil
|
|
}
|
|
|
|
// Check each leaf and make sure they're all ready to send, for us to
|
|
// send anything to ag channel. In addition, we need at least one send
|
|
// message from any of the valid isLeaf nodes. Since this only runs if
|
|
// everyone is loaded, we just need to check for activty leaf nodes.
|
|
obj.stateMutex.Lock()
|
|
for node := range obj.activity {
|
|
if obj.leafSend {
|
|
break // early
|
|
}
|
|
|
|
// down here we need `true` activity!
|
|
if node.isLeaf { // calculated above in the previous loop
|
|
obj.leafSend = true
|
|
break
|
|
}
|
|
}
|
|
obj.activity = make(map[*state]struct{}) // clear
|
|
//clear(obj.activity) // new clear
|
|
|
|
// This check happens here after the send loop to make sure one value
|
|
// got in and we didn't close it off too early.
|
|
for node := range obj.isClosed { // these are closed
|
|
node.outputClosed = true
|
|
}
|
|
obj.stateMutex.Unlock()
|
|
|
|
if !obj.leafSend {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case obj.ag <- nil: // send to aggregate channel if we have events
|
|
obj.Logf("aggregated send")
|
|
obj.leafSend = false // reset
|
|
|
|
case <-ctx.Done():
|
|
obj.leafSend = true // since we skipped the ag send!
|
|
obj.wake("process ctx.Done()") // interrupted, so queue again
|
|
return ctx.Err()
|
|
|
|
// XXX: should we even allow this default case?
|
|
//default:
|
|
// // exit if we're not ready to send to ag
|
|
// obj.leafSend = true // since we skipped the ag send!
|
|
// obj.wake("process default") // interrupted, so queue again
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Run kicks off the main engine. This takes a mutex. When we're "paused" the
|
|
// mutex is temporarily released until we "resume". Those operations transition
|
|
// with the engine Lock and Unlock methods. It is recommended to only add
|
|
// vertices to the engine after it's running. If you add them before Run, then
|
|
// Run will cause a Lock/Unlock to occur to cycle them in. Lock and Unlock race
|
|
// with the cancellation of this Run main loop. Make sure to only call one at a
|
|
// time.
// The returned error is the one recorded by processBreakFn (if any), an error
// from the topological sort that runs on resume, or a wrapped panic that was
// recovered from this function. Otherwise nil is returned.
|
|
func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|
obj.graphMutex.Lock()
|
|
defer obj.graphMutex.Unlock()
|
|
|
|
// XXX: can the above defer get called while we are already unlocked?
|
|
// XXX: is it a possibility if we use <-Started() ?
|
|
|
|
wg := &sync.WaitGroup{}
|
|
// Deferred calls run in LIFO order, so this wait only happens after the
// deferred cancel() and mainCancel() registered below have already run.
defer wg.Wait()
|
|
|
|
defer func() {
|
|
// catch programming errors
|
|
if r := recover(); r != nil {
|
|
obj.Logf("Panic in Run: %+v", r)
|
|
reterr = fmt.Errorf("panic in Run: %+v", r)
|
|
}
|
|
}()
|
|
|
|
ctx, cancel := context.WithCancel(ctx) // wrap parent
|
|
defer cancel()
|
|
|
|
// Add a wait before the "started" signal runs so that Cleanup waits.
|
|
obj.wg.Add(1)
|
|
defer obj.wg.Done()
|
|
|
|
// Send the start signal.
|
|
close(obj.startedChan)
|
|
|
|
if n := obj.graph.NumVertices(); n > 0 { // hack to make the api easier
|
|
obj.Logf("graph contained %d vertices before Run", n)
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
// kick the engine once to pull in any vertices from
|
|
// before we started running!
|
|
defer obj.Unlock()
|
|
obj.Lock()
|
|
}()
|
|
}
|
|
|
|
once := &sync.Once{}
|
|
loadedSignal := func() { close(obj.loadedChan) } // only run once!
|
|
|
|
// aggregate events channel
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
defer close(obj.streamChan)
|
|
drain := false
|
|
for {
|
|
var err error
|
|
var ok bool
|
|
select {
|
|
case err, ok = <-obj.ag: // aggregated channel
|
|
if !ok {
|
|
return // channel shutdown
|
|
}
|
|
}
|
|
|
|
if drain {
|
|
continue // no need to send more errors
|
|
}
|
|
|
|
// TODO: check obj.loaded first?
|
|
once.Do(loadedSignal)
|
|
|
|
// now send event...
|
|
if obj.Callback != nil {
|
|
// send stream signal (callback variant)
|
|
obj.Callback(ctx, err)
|
|
} else {
|
|
// send stream signal
|
|
select {
|
|
// send events or errors on streamChan
|
|
case obj.streamChan <- err: // send
|
|
case <-ctx.Done(): // when asked to exit
|
|
return
|
|
}
|
|
}
|
|
if err != nil {
|
|
cancel() // cancel the context!
|
|
//return // let the obj.ag channel drain
|
|
drain = true
|
|
}
|
|
}
|
|
}()
|
|
|
|
// wgAg is a wait group that waits for all senders to the ag chan.
|
|
// Exceptionally, we don't close the ag channel until wgFor has also
|
|
// closed, because the main loop can still send to ag via processBreakFn.
|
|
wgAg := &sync.WaitGroup{}
|
|
wgFor := &sync.WaitGroup{}
|
|
|
|
// We need to keep the main loop running until everyone else has shut
|
|
// down. When the top context closes, we wait for everyone to finish,
|
|
// and then we shut down this main context.
|
|
//mainCtx, mainCancel := context.WithCancel(ctx) // wrap parent
|
|
mainCtx, mainCancel := context.WithCancel(context.Background()) // DON'T wrap parent, close on your own terms
|
|
defer mainCancel()
|
|
|
|
// close the aggregate channel when everyone is done with it...
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
select {
|
|
case <-ctx.Done():
|
|
}
|
|
|
|
// don't wait and close ag before we're really done with Run()
|
|
wgAg.Wait() // wait for last ag user to close
|
|
obj.wgTxn.Wait() // wait for first txn as well
|
|
mainCancel() // only cancel after wgAg goroutines are done
|
|
wgFor.Wait() // wait for process loop to close before closing
|
|
close(obj.ag) // last one closes the ag channel
|
|
}()
|
|
|
|
wgFn := &sync.WaitGroup{} // wg for process function runner
|
|
defer wgFn.Wait() // extra safety
|
|
|
|
defer obj.runNodeWaitFns() // just in case
|
|
|
|
wgFor.Add(1) // make sure we wait for the below process loop to exit...
|
|
defer wgFor.Done()
|
|
|
|
// errProcess and processBreakFn are used to help exit following an err.
|
|
// This approach is needed because if we simply exited, we'd block the
|
|
// main loop below because various Stream functions are waiting on the
|
|
// Lock/Unlock cycle to be able to finish cleanly, shutdown, and unblock
|
|
// all the waitgroups so we can exit.
|
|
var errProcess error
|
|
var pausedProcess bool
|
|
processBreakFn := func(err error /*, paused bool*/) {
|
|
if err == nil { // a nil error won't cause ag to shutdown below
|
|
panic("expected error, not nil")
|
|
}
|
|
if obj.Debug {
|
|
obj.Logf("process break")
|
|
}
|
|
select {
|
|
case obj.ag <- err: // send error to aggregate channel
|
|
case <-ctx.Done():
|
|
}
|
|
cancel() // to unblock
|
|
//mainCancel() // NO!
|
|
errProcess = err // set above error
|
|
//pausedProcess = paused // set this inline directly
|
|
}
|
|
if obj.Debug {
|
|
defer obj.Logf("exited main loop")
|
|
}
|
|
// we start off "running", but we'll have an empty graph initially...
|
|
for {
|
|
|
|
// After we've resumed, we can try to exit. (shortcut)
|
|
// NOTE: If someone calls Lock(), which would send to
|
|
// obj.pauseChan, it *won't* deadlock here because mainCtx is
|
|
// only closed when all the worker waitgroups close first!
|
|
select {
|
|
case <-mainCtx.Done(): // when asked to exit
|
|
return errProcess // we exit happily
|
|
default:
|
|
}
|
|
|
|
// run through our graph, check for pause request occasionally
|
|
for {
|
|
pausedProcess = false // reset
|
|
// if we're in errProcess, we skip the process loop!
|
|
if errProcess != nil {
|
|
break // skip this process loop
|
|
}
|
|
|
|
// Start the process run for this iteration of the loop.
|
|
ctxFn, cancelFn := context.WithCancel(context.Background())
|
|
// we run cancelFn() below to cleanup!
|
|
var errFn error
|
|
chanFn := make(chan struct{}) // normal exit signal
|
|
wgFn.Add(1)
|
|
go func() {
|
|
defer wgFn.Done()
|
|
defer close(chanFn) // signal that I exited
|
|
for {
|
|
if obj.Debug {
|
|
obj.Logf("process...")
|
|
}
|
|
if errFn = obj.process(ctxFn); errFn != nil { // store
|
|
if errFn != context.Canceled {
|
|
obj.Logf("process end err: %+v...", errFn)
|
|
}
|
|
return
|
|
}
|
|
if obj.Debug {
|
|
obj.Logf("process end...")
|
|
}
|
|
// If process finishes without error, we
|
|
// should sit here and wait until we get
|
|
// run again from a wake-up, or we exit.
|
|
select {
|
|
case <-obj.wakeChan: // wait until something has actually woken up...
|
|
if obj.Debug {
|
|
obj.Logf("process wakeup...")
|
|
}
|
|
// loop!
|
|
case <-ctxFn.Done():
|
|
errFn = context.Canceled
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
chFn := false
|
|
chPause := false
|
|
ctxExit := false
|
|
select {
|
|
//case <-obj.wakeChan:
|
|
// this happens entirely in the process inner, inner loop now.
|
|
|
|
case <-chanFn: // process exited on its own in error!
|
|
chFn = true
|
|
|
|
case <-obj.pauseChan:
|
|
if obj.Debug {
|
|
obj.Logf("pausing...")
|
|
}
|
|
chPause = true
|
|
|
|
case <-mainCtx.Done(): // when asked to exit
|
|
//return nil // we exit happily
|
|
ctxExit = true
|
|
}
|
|
|
|
//fmt.Printf("chPause: %+v\n", chPause) // debug
|
|
//fmt.Printf("ctxExit: %+v\n", ctxExit) // debug
|
|
|
|
cancelFn() // cancel the process function
|
|
wgFn.Wait() // wait for the process function to return
|
|
|
|
pausedProcess = chPause // tell the below select
|
|
// The goroutine above only returns after storing a non-nil
// errFn, so a nil value here is treated as a programming error.
if errFn == nil {
|
|
// break on errors (needs to know if paused)
|
|
processBreakFn(fmt.Errorf("unexpected nil error in process"))
|
|
break
|
|
}
|
|
if errFn != nil && errFn != context.Canceled {
|
|
// break on errors (needs to know if paused)
|
|
processBreakFn(errwrap.Wrapf(errFn, "process error"))
|
|
break
|
|
}
|
|
//if errFn == context.Canceled {
|
|
// // ignore, we asked for it
|
|
//}
|
|
|
|
if ctxExit {
|
|
return nil // we exit happily
|
|
}
|
|
if chPause {
|
|
break
|
|
}
|
|
|
|
// This used to happen if a node (in the list we are
|
|
// sending to) dies, and we returned with:
|
|
// `case <-node.ctx.Done():` // node died
|
|
// return node.ctx.Err()
|
|
// which caused this scenario.
|
|
if chFn && errFn == context.Canceled { // very rare case
|
|
// programming error
|
|
processBreakFn(fmt.Errorf("legacy unhandled process state"))
|
|
break
|
|
}
|
|
|
|
// programming error
|
|
//return fmt.Errorf("unhandled process state")
|
|
processBreakFn(fmt.Errorf("unhandled process state"))
|
|
break
|
|
}
|
|
// if we're in errProcess, we need to add back in the pauseChan!
|
|
if errProcess != nil && !pausedProcess {
|
|
select {
|
|
case <-obj.pauseChan:
|
|
if obj.Debug {
|
|
obj.Logf("lower pausing...")
|
|
}
|
|
|
|
// do we want this exit case? YES
|
|
case <-mainCtx.Done(): // when asked to exit
|
|
return errProcess
|
|
}
|
|
}
|
|
|
|
// Toposort for paused workers. We run this before the actual
|
|
// pause completes, because the second we are paused, the graph
|
|
// could then immediately change. We don't need a lock in here
|
|
// because the mutex only unlocks when pause is complete below.
|
|
//topoSort1, err := obj.graph.TopologicalSort()
|
|
//if err != nil {
|
|
// return err
|
|
//}
|
|
//for _, v := range topoSort1 {}
|
|
|
|
// pause is complete
|
|
// no exit case from here, must be fully running or paused...
|
|
select {
|
|
case obj.pausedChan <- struct{}{}:
|
|
if obj.Debug {
|
|
obj.Logf("paused!")
|
|
}
|
|
}
|
|
|
|
//
|
|
// the graph changes shape right here... we are locked right now
|
|
//
|
|
|
|
// wait until resumed/unlocked
|
|
select {
|
|
case <-obj.resumeChan:
|
|
if obj.Debug {
|
|
obj.Logf("resuming...")
|
|
}
|
|
}
|
|
|
|
// Do any cleanup needed from delete vertex. Or do we?
|
|
// We've ascertained that while we want this stuff to shutdown,
|
|
// and while we also know that a Stream() function running is a
|
|
// part of what we're waiting for to exit, it doesn't matter
|
|
// that it exits now! This is actually causing a deadlock
|
|
// because the pending Stream exit, might be calling a new
|
|
// Reverse commit, which means we're deadlocked. It's safe for
|
|
// the Stream to keep running, all it might do is needlessly add
|
|
// a new value to obj.table which won't bother us since we won't
|
|
// even use it in process. We _do_ want to wait for all of these
|
|
// before the final exit, but we already have that in a defer.
|
|
//obj.runNodeWaitFns()
|
|
|
|
// Toposort to run/resume workers. (Bottom of toposort first!)
|
|
topoSort2, err := obj.graph.TopologicalSort()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reversed := pgraph.Reverse(topoSort2)
|
|
for _, v := range reversed {
|
|
f, ok := v.(interfaces.Func)
|
|
if !ok {
|
|
panic("not a Func")
|
|
}
|
|
node, exists := obj.state[f]
|
|
if !exists {
|
|
panic(fmt.Sprintf("missing node in iterate: %s", f))
|
|
}
|
|
|
|
if node.running { // it's not a new vertex
|
|
continue
|
|
}
|
|
obj.loaded = false // reset this
|
|
node.running = true
|
|
|
|
obj.statsMutex.Lock()
|
|
val, _ := obj.stats.inputList[node] // val is # or zero
|
|
obj.stats.inputList[node] = val // initialize to zero
|
|
obj.statsMutex.Unlock()
|
|
|
|
innerCtx, innerCancel := context.WithCancel(ctx) // wrap parent (not mainCtx)
|
|
// we defer innerCancel() in the goroutine to cleanup!
|
|
node.ctx = innerCtx
|
|
node.cancel = innerCancel
|
|
|
|
// run mainloop
|
|
wgAg.Add(1)
|
|
node.wg.Add(1)
|
|
go func(f interfaces.Func, node *state) {
|
|
defer node.wg.Done()
|
|
defer wgAg.Done()
|
|
defer node.cancel() // if we close, clean up and send the signal to anyone watching
|
|
if obj.Debug {
|
|
obj.Logf("Running func `%s`", node)
|
|
obj.statsMutex.Lock()
|
|
obj.stats.runningList[node] = struct{}{}
|
|
obj.stats.loadedList[node] = false
|
|
obj.statsMutex.Unlock()
|
|
}
|
|
|
|
fn := func(nodeCtx context.Context) (reterr error) {
|
|
defer func() {
|
|
// catch programming errors
|
|
if r := recover(); r != nil {
|
|
obj.Logf("Panic in Stream of func `%s`: %+v", node, r)
|
|
reterr = fmt.Errorf("panic in Stream of func `%s`: %+v", node, r)
|
|
}
|
|
}()
|
|
return f.Stream(nodeCtx)
|
|
}
|
|
runErr := fn(node.ctx) // wrap with recover()
|
|
if obj.Debug {
|
|
obj.Logf("Exiting func `%s`", node)
|
|
obj.statsMutex.Lock()
|
|
delete(obj.stats.runningList, node)
|
|
obj.statsMutex.Unlock()
|
|
}
|
|
if runErr != nil {
|
|
obj.Logf("Erroring func `%s`: %+v", node, runErr)
|
|
// send to an aggregate channel
|
|
// the first to error will cause ag to
|
|
// shutdown, so make sure we can exit...
|
|
select {
|
|
case obj.ag <- runErr: // send to aggregate channel
|
|
case <-node.ctx.Done():
|
|
}
|
|
}
|
|
// if node never loaded, then we error in the node.output loop!
|
|
}(f, node)
|
|
|
|
// consume output
|
|
wgAg.Add(1)
|
|
node.wg.Add(1)
|
|
go func(f interfaces.Func, node *state) {
|
|
defer node.wg.Done()
|
|
defer wgAg.Done()
|
|
defer func() {
|
|
// We record the fact that output
|
|
// closed, so we can eventually close
|
|
// the downstream node's input.
|
|
obj.stateMutex.Lock()
|
|
obj.isClosed[node] = struct{}{} // closed!
|
|
obj.stateMutex.Unlock()
|
|
// TODO: is this wake necessary?
|
|
obj.wake("closed") // closed, so wake up
|
|
}()
|
|
|
|
for value := range node.output { // read from channel
|
|
if value == nil {
|
|
// bug!
|
|
obj.Logf("func `%s` got nil value", node)
|
|
panic("got nil value")
|
|
}
|
|
|
|
obj.tableMutex.RLock()
|
|
cached, exists := obj.table[f]
|
|
obj.tableMutex.RUnlock()
|
|
if !exists { // first value received
|
|
// RACE: do this AFTER value is present!
|
|
//node.loaded = true // not yet please
|
|
if obj.Debug {
|
|
obj.Logf("func `%s` started", node)
|
|
}
|
|
} else if value.Cmp(cached) == nil {
|
|
// skip if new value is same as previous
|
|
// if this happens often, it *might* be
|
|
// a bug in the function implementation
|
|
// FIXME: do we need to disable engine
|
|
// caching when using hysteresis?
|
|
if obj.Debug {
|
|
obj.Logf("func `%s` skipped", node)
|
|
}
|
|
continue
|
|
}
|
|
obj.tableMutex.Lock()
|
|
obj.table[f] = value // save the latest
|
|
obj.tableMutex.Unlock()
|
|
node.rwmutex.Lock()
|
|
node.loaded = true // set *after* value is in :)
|
|
//obj.Logf("func `%s` changed", node)
|
|
node.rwmutex.Unlock()
|
|
|
|
obj.statsMutex.Lock()
|
|
obj.stats.loadedList[node] = true
|
|
obj.statsMutex.Unlock()
|
|
|
|
// Send a message to tell our ag channel
|
|
// that we might have sent an aggregated
|
|
// message here. They should check if we
|
|
// are a leaf and if we glitch or not...
|
|
// Make sure we do this before the wake.
|
|
obj.stateMutex.Lock()
|
|
obj.activity[node] = struct{}{} // activity!
|
|
obj.stateMutex.Unlock()
|
|
|
|
obj.wake("new value") // new value, so send wake up
|
|
|
|
} // end for
|
|
|
|
// no more output values are coming...
|
|
//obj.Logf("func `%s` stopped", node)
|
|
|
|
// nodes that never loaded will cause the engine to hang
|
|
if !node.loaded {
|
|
select {
|
|
case obj.ag <- fmt.Errorf("func `%s` stopped before it was loaded", node):
|
|
case <-node.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
}(f, node)
|
|
|
|
} // end for
|
|
|
|
// Send new notifications in case any new edges are sending away
|
|
// to these... They might have already missed the notifications!
|
|
for k := range obj.resend { // resend TO these!
|
|
node, exists := obj.state[k]
|
|
if !exists {
|
|
continue
|
|
}
|
|
// Run as a goroutine to avoid erroring in parent thread.
|
|
wg.Add(1)
|
|
go func(node *state) {
|
|
defer wg.Done()
|
|
if obj.Debug {
|
|
obj.Logf("resend to func `%s`", node)
|
|
}
|
|
obj.wake("resend") // new value, so send wake up
|
|
}(node)
|
|
}
|
|
obj.resend = make(map[interfaces.Func]struct{}) // reset
|
|
|
|
// now check their states...
|
|
//for _, v := range reversed {
|
|
// v, ok := v.(interfaces.Func)
|
|
// if !ok {
|
|
// panic("not a Func")
|
|
// }
|
|
// // wait for startup?
|
|
// close(obj.state[v].startup) XXX: once?
|
|
//}
|
|
|
|
// resume is complete
|
|
// no exit case from here, must be fully running or paused...
|
|
select {
|
|
case obj.resumedChan <- struct{}{}:
|
|
if obj.Debug {
|
|
obj.Logf("resumed!")
|
|
}
|
|
}
|
|
|
|
} // end for
|
|
}
|
|
|
|
// Stream returns a channel that you can follow to get aggregated graph events.
|
|
// Do not block reading from this channel as you can hold up the entire engine.
|
|
func (obj *Engine) Stream() <-chan error {
|
|
return obj.streamChan
|
|
}
|
|
|
|
// Loaded returns a channel that closes when the function engine loads.
|
|
func (obj *Engine) Loaded() <-chan struct{} {
|
|
return obj.loadedChan
|
|
}
|
|
|
|
// Table returns a copy of the populated data table of values. We return a copy
|
|
// because since these values are constantly changing, we need an atomic
|
|
// snapshot to present to the consumer of this API.
|
|
// TODO: is this globally glitch consistent?
|
|
// TODO: do we need an API to return a single value? (wrapped in read locks)
|
|
func (obj *Engine) Table() map[interfaces.Func]types.Value {
|
|
obj.tableMutex.RLock()
|
|
defer obj.tableMutex.RUnlock()
|
|
table := make(map[interfaces.Func]types.Value)
|
|
for k, v := range obj.table {
|
|
//table[k] = v.Copy() // TODO: do we need to copy these values?
|
|
table[k] = v
|
|
}
|
|
return table
|
|
}
|
|
|
|
// Apply is similar to Table in that it gives you access to the internal output
|
|
// table of data, the difference being that it instead passes this information
|
|
// to a function of your choosing and holds a read/write lock during the entire
|
|
// time that your function is synchronously executing. If you use this function
|
|
// to spawn any goroutines that read or write data, then you're asking for a
|
|
// panic.
|
|
// XXX: does this need to be a Lock? Can it be an RLock? Check callers!
|
|
func (obj *Engine) Apply(fn func(map[interfaces.Func]types.Value) error) error {
|
|
// XXX: does this need to be a Lock? Can it be an RLock? Check callers!
|
|
obj.tableMutex.Lock() // differs from above RLock around obj.table
|
|
defer obj.tableMutex.Unlock()
|
|
table := make(map[interfaces.Func]types.Value)
|
|
for k, v := range obj.table {
|
|
//table[k] = v.Copy() // TODO: do we need to copy these values?
|
|
table[k] = v
|
|
}
|
|
|
|
return fn(table)
|
|
}
|
|
|
|
// Started returns a channel that closes when the Run function finishes starting
|
|
// up. This is useful so that we can wait before calling any of the mutex things
|
|
// that would normally panic if Run wasn't started up first.
|
|
func (obj *Engine) Started() <-chan struct{} {
|
|
return obj.startedChan
|
|
}
|
|
|
|
// NumVertices returns the number of vertices in the current graph.
|
|
func (obj *Engine) NumVertices() int {
|
|
// XXX: would this deadlock if we added this?
|
|
//obj.graphMutex.Lock() // XXX: should this be a RLock?
|
|
//defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock?
|
|
return obj.graph.NumVertices()
|
|
}
|
|
|
|
// Stats returns some statistics in a human-readable form.
|
|
func (obj *Engine) Stats() string {
|
|
defer obj.statsMutex.RUnlock()
|
|
obj.statsMutex.RLock()
|
|
|
|
return obj.stats.String()
|
|
}
|
|
|
|
// ExecGraphviz writes out the diagram of a graph to be used for visualization
|
|
// and debugging. You must not modify the graph (eg: during Lock) when calling
|
|
// this method.
|
|
func (obj *Engine) ExecGraphviz(dir string) error {
|
|
// XXX: would this deadlock if we added this?
|
|
//obj.graphMutex.Lock() // XXX: should this be a RLock?
|
|
//defer obj.graphMutex.Unlock() // XXX: should this be an RUnlock?
|
|
|
|
obj.graphvizMutex.Lock()
|
|
defer obj.graphvizMutex.Unlock()
|
|
|
|
obj.graphvizCount++ // increment
|
|
|
|
if dir == "" {
|
|
dir = obj.graphvizDirectory
|
|
}
|
|
if dir == "" { // XXX: hack for ergonomics
|
|
d := time.Now().UnixMilli()
|
|
dir = fmt.Sprintf("/tmp/dage-graphviz-%s-%d/", obj.Name, d)
|
|
obj.graphvizDirectory = dir
|
|
}
|
|
if !strings.HasSuffix(dir, "/") {
|
|
return fmt.Errorf("dir must end with a slash")
|
|
}
|
|
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return err
|
|
}
|
|
|
|
dashedEdges, err := pgraph.NewGraph("dashedEdges")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, v1 := range obj.graph.Vertices() {
|
|
// if it's a ChannelBasedSinkFunc...
|
|
if cb, ok := v1.(*structs.ChannelBasedSinkFunc); ok {
|
|
// ...then add a dashed edge to its output
|
|
dashedEdges.AddEdge(v1, cb.Target, &pgraph.SimpleEdge{
|
|
Name: "channel", // secret channel
|
|
})
|
|
}
|
|
// if it's a ChannelBasedSourceFunc...
|
|
if cb, ok := v1.(*structs.ChannelBasedSourceFunc); ok {
|
|
// ...then add a dashed edge from its input
|
|
dashedEdges.AddEdge(cb.Source, v1, &pgraph.SimpleEdge{
|
|
Name: "channel", // secret channel
|
|
})
|
|
}
|
|
}
|
|
|
|
gv := &pgraph.Graphviz{
|
|
Name: obj.graph.GetName(),
|
|
Filename: fmt.Sprintf("%s/%d.dot", dir, obj.graphvizCount),
|
|
Graphs: map[*pgraph.Graph]*pgraph.GraphvizOpts{
|
|
obj.graph: nil,
|
|
dashedEdges: {
|
|
Style: "dashed",
|
|
},
|
|
},
|
|
}
|
|
|
|
if err := gv.Exec(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// state tracks some internal vertex-specific state information.
|
|
type state struct {
|
|
Func interfaces.Func
|
|
name string // cache a name here for safer concurrency
|
|
|
|
input chan types.Value // the top level type must be a struct
|
|
output chan types.Value
|
|
txn interfaces.Txn // API of GraphTxn struct to pass to each function
|
|
|
|
//init bool // have we run Init on our func?
|
|
//ready bool // has it received all the args it needs at least once?
|
|
loaded bool // has the func run at least once ?
|
|
inputClosed bool // is our input closed?
|
|
outputClosed bool // is our output closed?
|
|
|
|
isLeaf bool // is my out degree zero?
|
|
|
|
running bool
|
|
wg *sync.WaitGroup
|
|
ctx context.Context // per state ctx (inner ctx)
|
|
cancel func() // cancel above inner ctx
|
|
|
|
rwmutex *sync.RWMutex // concurrency guard for reading/modifying this state
|
|
}
|
|
|
|
// String implements the fmt.Stringer interface for pretty printing!
|
|
func (obj *state) String() string {
|
|
if obj.name != "" {
|
|
return obj.name
|
|
}
|
|
|
|
return obj.Func.String()
|
|
}
|
|
|
|
// stats holds some statistics and other debugging information.
|
|
type stats struct {
|
|
|
|
// runningList keeps track of which nodes are still running.
|
|
runningList map[*state]struct{}
|
|
|
|
// loadedList keeps track of which nodes have loaded.
|
|
loadedList map[*state]bool
|
|
|
|
// inputList keeps track of the number of inputs each node received.
|
|
inputList map[*state]int64
|
|
}
|
|
|
|
// String implements the fmt.Stringer interface for printing out our collected
|
|
// statistics!
|
|
func (obj *stats) String() string {
|
|
// XXX: just build the lock into *stats instead of into our dage obj
|
|
s := "stats:\n"
|
|
{
|
|
s += "\trunning:\n"
|
|
names := []string{}
|
|
for k := range obj.runningList {
|
|
names = append(names, k.String())
|
|
}
|
|
sort.Strings(names)
|
|
for _, name := range names {
|
|
s += fmt.Sprintf("\t * %s\n", name)
|
|
}
|
|
}
|
|
{
|
|
nodes := []*state{}
|
|
for k := range obj.loadedList {
|
|
nodes = append(nodes, k)
|
|
}
|
|
sort.Slice(nodes, func(i, j int) bool { return nodes[i].String() < nodes[j].String() })
|
|
|
|
s += "\tloaded:\n"
|
|
for _, node := range nodes {
|
|
if !obj.loadedList[node] {
|
|
continue
|
|
}
|
|
s += fmt.Sprintf("\t * %s\n", node)
|
|
}
|
|
|
|
s += "\tnot loaded:\n"
|
|
for _, node := range nodes {
|
|
if obj.loadedList[node] {
|
|
continue
|
|
}
|
|
s += fmt.Sprintf("\t * %s\n", node)
|
|
}
|
|
}
|
|
{
|
|
s += "\tinput count:\n"
|
|
nodes := []*state{}
|
|
for k := range obj.inputList {
|
|
nodes = append(nodes, k)
|
|
}
|
|
//sort.Slice(nodes, func(i, j int) bool { return nodes[i].String() < nodes[j].String() })
|
|
sort.Slice(nodes, func(i, j int) bool { return obj.inputList[nodes[i]] < obj.inputList[nodes[j]] })
|
|
for _, node := range nodes {
|
|
s += fmt.Sprintf("\t * (%d) %s\n", obj.inputList[node], node)
|
|
}
|
|
}
|
|
return s
|
|
}
|