lang: funcs: dage: Move much logging behind debug flag
Most of this logging isn't useful for ordinary usage. Hide it for now. Eventually when we have a fancy logging system (curses-like) we can bring back more information on the state of everything.
This commit is contained in:
@@ -727,7 +727,9 @@ func (obj *Engine) process(ctx context.Context) (reterr error) {
|
|||||||
// preempted and that future executions of this function can be
|
// preempted and that future executions of this function can be
|
||||||
// resumed. We must return with an error to let folks know that
|
// resumed. We must return with an error to let folks know that
|
||||||
// we were interrupted.
|
// we were interrupted.
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("send to func `%s`", node)
|
obj.Logf("send to func `%s`", node)
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case node.input <- st: // send to function
|
case node.input <- st: // send to function
|
||||||
obj.statsMutex.Lock()
|
obj.statsMutex.Lock()
|
||||||
@@ -968,7 +970,9 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
if err == nil { // a nil error won't cause ag to shutdown below
|
if err == nil { // a nil error won't cause ag to shutdown below
|
||||||
panic("expected error, not nil")
|
panic("expected error, not nil")
|
||||||
}
|
}
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("process break")
|
obj.Logf("process break")
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case obj.ag <- err: // send error to aggregate channel
|
case obj.ag <- err: // send error to aggregate channel
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -1029,7 +1033,9 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
// run again from a wake-up, or we exit.
|
// run again from a wake-up, or we exit.
|
||||||
select {
|
select {
|
||||||
case <-obj.wakeChan: // wait until something has actually woken up...
|
case <-obj.wakeChan: // wait until something has actually woken up...
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("process wakeup...")
|
obj.Logf("process wakeup...")
|
||||||
|
}
|
||||||
// loop!
|
// loop!
|
||||||
case <-ctxFn.Done():
|
case <-ctxFn.Done():
|
||||||
errFn = context.Canceled
|
errFn = context.Canceled
|
||||||
@@ -1049,7 +1055,9 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
chFn = true
|
chFn = true
|
||||||
|
|
||||||
case <-obj.pauseChan:
|
case <-obj.pauseChan:
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("pausing...")
|
obj.Logf("pausing...")
|
||||||
|
}
|
||||||
chPause = true
|
chPause = true
|
||||||
|
|
||||||
case <-mainCtx.Done(): // when asked to exit
|
case <-mainCtx.Done(): // when asked to exit
|
||||||
@@ -1105,7 +1113,9 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
if errProcess != nil && !pausedProcess {
|
if errProcess != nil && !pausedProcess {
|
||||||
select {
|
select {
|
||||||
case <-obj.pauseChan:
|
case <-obj.pauseChan:
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("lower pausing...")
|
obj.Logf("lower pausing...")
|
||||||
|
}
|
||||||
|
|
||||||
// do we want this exit case? YES
|
// do we want this exit case? YES
|
||||||
case <-mainCtx.Done(): // when asked to exit
|
case <-mainCtx.Done(): // when asked to exit
|
||||||
@@ -1127,8 +1137,10 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
// no exit case from here, must be fully running or paused...
|
// no exit case from here, must be fully running or paused...
|
||||||
select {
|
select {
|
||||||
case obj.pausedChan <- struct{}{}:
|
case obj.pausedChan <- struct{}{}:
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("paused!")
|
obj.Logf("paused!")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// the graph changes shape right here... we are locked right now
|
// the graph changes shape right here... we are locked right now
|
||||||
@@ -1137,8 +1149,10 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
// wait until resumed/unlocked
|
// wait until resumed/unlocked
|
||||||
select {
|
select {
|
||||||
case <-obj.resumeChan:
|
case <-obj.resumeChan:
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("resuming...")
|
obj.Logf("resuming...")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Do any cleanup needed from delete vertex. Or do we?
|
// Do any cleanup needed from delete vertex. Or do we?
|
||||||
// We've ascertained that while we want this stuff to shutdown,
|
// We've ascertained that while we want this stuff to shutdown,
|
||||||
@@ -1260,14 +1274,18 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
if !exists { // first value received
|
if !exists { // first value received
|
||||||
// RACE: do this AFTER value is present!
|
// RACE: do this AFTER value is present!
|
||||||
//node.loaded = true // not yet please
|
//node.loaded = true // not yet please
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("func `%s` started", node)
|
obj.Logf("func `%s` started", node)
|
||||||
|
}
|
||||||
} else if value.Cmp(cached) == nil {
|
} else if value.Cmp(cached) == nil {
|
||||||
// skip if new value is same as previous
|
// skip if new value is same as previous
|
||||||
// if this happens often, it *might* be
|
// if this happens often, it *might* be
|
||||||
// a bug in the function implementation
|
// a bug in the function implementation
|
||||||
// FIXME: do we need to disable engine
|
// FIXME: do we need to disable engine
|
||||||
// caching when using hysteresis?
|
// caching when using hysteresis?
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("func `%s` skipped", node)
|
obj.Logf("func `%s` skipped", node)
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
obj.tableMutex.Lock()
|
obj.tableMutex.Lock()
|
||||||
@@ -1322,7 +1340,9 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(node *state) {
|
go func(node *state) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("resend to func `%s`", node)
|
obj.Logf("resend to func `%s`", node)
|
||||||
|
}
|
||||||
obj.wake("resend") // new value, so send wake up
|
obj.wake("resend") // new value, so send wake up
|
||||||
}(node)
|
}(node)
|
||||||
}
|
}
|
||||||
@@ -1342,8 +1362,10 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
|
|||||||
// no exit case from here, must be fully running or paused...
|
// no exit case from here, must be fully running or paused...
|
||||||
select {
|
select {
|
||||||
case obj.resumedChan <- struct{}{}:
|
case obj.resumedChan <- struct{}{}:
|
||||||
|
if obj.Debug {
|
||||||
obj.Logf("resumed!")
|
obj.Logf("resumed!")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // end for
|
} // end for
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user