diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 56e057b2..3eaad4b2 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -119,6 +119,7 @@ func (obj *Engine) Process(vertex pgraph.Vertex) error { for _, changed := range updated { if changed { // at least one was updated // invalidate cache, mark as dirty + obj.state[vertex].tuid.StopTimer() obj.state[vertex].isStateOK = false break } @@ -174,6 +175,7 @@ func (obj *Engine) Process(vertex pgraph.Vertex) error { // if CheckApply ran without noop and without error, state should be good if !noop && err == nil { // aka !noop || checkOK + obj.state[vertex].tuid.StartTimer() obj.state[vertex].isStateOK = true // reset if refresh { obj.SetUpstreamRefresh(vertex, false) // refresh happened, clear the request @@ -252,9 +254,11 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { defer close(obj.state[vertex].stopped) // done signal obj.state[vertex].cuid = obj.Converger.Register() + obj.state[vertex].tuid = obj.Converger.Register() // must wait for all users of the cuid to finish *before* we unregister! // as a result, this defer happens *before* the below wait group Wait... defer obj.state[vertex].cuid.Unregister() + defer obj.state[vertex].tuid.Unregister() defer obj.state[vertex].wg.Wait() // this Worker is the last to exit! diff --git a/engine/graph/state.go b/engine/graph/state.go index 8fa1d654..3c2a90ef 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -86,6 +86,7 @@ type State struct { working bool // is the Main() loop running ? cuid converger.UID // primary converger + tuid converger.UID // secondary converger init *engine.Init // a copy of the init struct passed to res Init } @@ -121,6 +122,7 @@ func (obj *State) Init() error { } //obj.cuid = obj.Converger.Register() // gets registered in Worker() + //obj.tuid = obj.Converger.Register() // gets registered in Worker() obj.init = &engine.Init{ Program: obj.Program, @@ -128,6 +130,7 @@ func (obj *State) Init() error { // Watch: Running: func() error { + obj.tuid.StopTimer() close(obj.started) // this is reset in the reset func obj.isStateOK = false // assume we're initially dirty // optimization: skip the initial send if not a starter @@ -141,6 +144,7 @@ func (obj *State) Init() error { Events: obj.eventsChan, Read: obj.read, Dirty: func() { // TODO: should we rename this SetDirty? + obj.tuid.StopTimer() obj.isStateOK = false }, @@ -208,6 +212,9 @@ func (obj *State) Close() error { //if obj.cuid != nil { // obj.cuid.Unregister() // gets unregistered in Worker() //} + //if obj.tuid != nil { + // obj.tuid.Unregister() // gets unregistered in Worker() + //} // redundant safety obj.wg.Wait() // wait until all poke's and events on me have exited