From 354a1c23b0d9b471012707a27f8e188c7627ae80 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Thu, 17 Jan 2019 18:46:00 -0500 Subject: [PATCH] engine: graph: Prevent converged timeout of dirty res Somewhere after the engine re-write we seem to have regressed and converge early even if some resource is dirty. This adds an additional timer so that we don't start the individual resource converged countdown until our state is okay. --- engine/graph/actions.go | 4 ++++ engine/graph/state.go | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/engine/graph/actions.go b/engine/graph/actions.go index 56e057b2..3eaad4b2 100644 --- a/engine/graph/actions.go +++ b/engine/graph/actions.go @@ -119,6 +119,7 @@ func (obj *Engine) Process(vertex pgraph.Vertex) error { for _, changed := range updated { if changed { // at least one was updated // invalidate cache, mark as dirty + obj.state[vertex].tuid.StopTimer() obj.state[vertex].isStateOK = false break } @@ -174,6 +175,7 @@ func (obj *Engine) Process(vertex pgraph.Vertex) error { // if CheckApply ran without noop and without error, state should be good if !noop && err == nil { // aka !noop || checkOK + obj.state[vertex].tuid.StartTimer() obj.state[vertex].isStateOK = true // reset if refresh { obj.SetUpstreamRefresh(vertex, false) // refresh happened, clear the request @@ -252,9 +254,11 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error { defer close(obj.state[vertex].stopped) // done signal obj.state[vertex].cuid = obj.Converger.Register() + obj.state[vertex].tuid = obj.Converger.Register() // must wait for all users of the cuid to finish *before* we unregister! // as a result, this defer happens *before* the below wait group Wait... defer obj.state[vertex].cuid.Unregister() + defer obj.state[vertex].tuid.Unregister() defer obj.state[vertex].wg.Wait() // this Worker is the last to exit! diff --git a/engine/graph/state.go b/engine/graph/state.go index 8fa1d654..3c2a90ef 100644 --- a/engine/graph/state.go +++ b/engine/graph/state.go @@ -86,6 +86,7 @@ type State struct { working bool // is the Main() loop running ? cuid converger.UID // primary converger + tuid converger.UID // secondary converger init *engine.Init // a copy of the init struct passed to res Init } @@ -121,6 +122,7 @@ func (obj *State) Init() error { } //obj.cuid = obj.Converger.Register() // gets registered in Worker() + //obj.tuid = obj.Converger.Register() // gets registered in Worker() obj.init = &engine.Init{ Program: obj.Program, @@ -128,6 +130,7 @@ func (obj *State) Init() error { // Watch: Running: func() error { + obj.tuid.StopTimer() close(obj.started) // this is reset in the reset func obj.isStateOK = false // assume we're initially dirty // optimization: skip the initial send if not a starter @@ -141,6 +144,7 @@ func (obj *State) Init() error { Events: obj.eventsChan, Read: obj.read, Dirty: func() { // TODO: should we rename this SetDirty? + obj.tuid.StopTimer() obj.isStateOK = false }, @@ -208,6 +212,9 @@ func (obj *State) Close() error { //if obj.cuid != nil { // obj.cuid.Unregister() // gets unregistered in Worker() //} + //if obj.tuid != nil { + // obj.tuid.Unregister() // gets unregistered in Worker() + //} // redundant safety obj.wg.Wait() // wait until all poke's and events on me have exited