diff --git a/pgraph/actions.go b/pgraph/actions.go index 0a4cea60..a68a8354 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -164,130 +164,131 @@ func (g *Graph) Process(v *Vertex) error { // is it okay to run dependency wise right now? // if not, that's okay because when the dependency runs, it will poke // us back and we will run if needed then! - if g.OKTimestamp(v) { - if g.Flags.Debug { - log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp()) - } - - obj.SetState(resources.ResStateCheckApply) - - // connect any senders to receivers and detect if values changed - if updated, err := obj.SendRecv(obj); err != nil { - return errwrap.Wrapf(err, "could not SendRecv in Process") - } else if len(updated) > 0 { - for _, changed := range updated { - if changed { // at least one was updated - obj.StateOK(false) // invalidate cache, mark as dirty - break - } - } - } - - var noop = obj.Meta().Noop // lookup the noop value - var refresh bool - var checkOK bool - var err error - - if g.Flags.Debug { - log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !noop) - } - - // lookup the refresh (notification) variable - refresh = g.RefreshPending(v) // do i need to perform a refresh? - obj.SetRefresh(refresh) // tell the resource - - // check cached state, to skip CheckApply; can't skip if refreshing - if !refresh && obj.IsStateOK() { - checkOK, err = true, nil - - // NOTE: technically this block is wrong because we don't know - // if the resource implements refresh! If it doesn't, we could - // skip this, but it doesn't make a big difference under noop! - } else if noop && refresh { // had a refresh to do w/ noop! - checkOK, err = false, nil // therefore the state is wrong - - // run the CheckApply! - } else { - // if the CheckApply run takes longer than the converged - // timeout, we could inappropriately converge mid-apply! - // avoid this by blocking convergence with a fake report - block := obj.Converger().Register() // get an extra cuid - block.SetConverged(false) // block while CheckApply runs! - - // if this fails, don't UpdateTimestamp() - checkOK, err = obj.CheckApply(!noop) - - block.SetConverged(true) // unblock - block.Unregister() - - // TODO: Can the `Poll` converged timeout tracking be a - // more general method for all converged timeouts? this - // would simplify the resources by removing boilerplate - if v.Meta().Poll > 0 { - if !checkOK { // something changed, restart timer - cuid := v.Res.ConvergerUID() // get the converger uid used to report status - cuid.ResetTimer() // activity! - if g.Flags.Debug { - log.Printf("%s[%s]: Converger: ResetTimer", obj.Kind(), obj.GetName()) - } - } - } - } - - if checkOK && err != nil { // should never return this way - log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkOK, err) - } - if g.Flags.Debug { - log.Printf("%s[%s]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkOK, err) - } - - // if CheckApply ran without noop and without error, state should be good - if !noop && err == nil { // aka !noop || checkOK - obj.StateOK(true) // reset - if refresh { - g.SetUpstreamRefresh(v, false) // refresh happened, clear the request - obj.SetRefresh(false) - } - } - - if !checkOK { // if state *was* not ok, we had to have apply'ed - if err != nil { // error during check or apply - ok = false - } else { - applied = true - } - } - - // when noop is true we always want to update timestamp - if noop && err == nil { - ok = true - } - - if ok { - // did we actually do work? - activity := applied - if noop { - activity = false // no we didn't do work... - } - - if activity { // add refresh flag to downstream edges... - g.SetDownstreamRefresh(v, true) - } - - // update this timestamp *before* we poke or the poked - // nodes might fail due to having a too old timestamp! - v.UpdateTimestamp() // this was touched... - obj.SetState(resources.ResStatePoking) // can't cancel parent poke - if err := g.Poke(v, activity); err != nil { - return errwrap.Wrapf(err, "the Poke() failed") - } - } - // poke at our pre-req's instead since they need to refresh/run... - return errwrap.Wrapf(err, "could not Process() successfully") + if !g.OKTimestamp(v) { + go g.BackPoke(v) + return nil } - // else... only poke at the pre-req's that need to run - go g.BackPoke(v) - return nil + // timestamp must be okay... + + if g.Flags.Debug { + log.Printf("%s[%s]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp()) + } + + obj.SetState(resources.ResStateCheckApply) + + // connect any senders to receivers and detect if values changed + if updated, err := obj.SendRecv(obj); err != nil { + return errwrap.Wrapf(err, "could not SendRecv in Process") + } else if len(updated) > 0 { + for _, changed := range updated { + if changed { // at least one was updated + obj.StateOK(false) // invalidate cache, mark as dirty + break + } + } + } + + var noop = obj.Meta().Noop // lookup the noop value + var refresh bool + var checkOK bool + var err error + + if g.Flags.Debug { + log.Printf("%s[%s]: CheckApply(%t)", obj.Kind(), obj.GetName(), !noop) + } + + // lookup the refresh (notification) variable + refresh = g.RefreshPending(v) // do i need to perform a refresh? + obj.SetRefresh(refresh) // tell the resource + + // check cached state, to skip CheckApply; can't skip if refreshing + if !refresh && obj.IsStateOK() { + checkOK, err = true, nil + + // NOTE: technically this block is wrong because we don't know + // if the resource implements refresh! If it doesn't, we could + // skip this, but it doesn't make a big difference under noop! + } else if noop && refresh { // had a refresh to do w/ noop! + checkOK, err = false, nil // therefore the state is wrong + + // run the CheckApply! + } else { + // if the CheckApply run takes longer than the converged + // timeout, we could inappropriately converge mid-apply! + // avoid this by blocking convergence with a fake report + block := obj.Converger().Register() // get an extra cuid + block.SetConverged(false) // block while CheckApply runs! + + // if this fails, don't UpdateTimestamp() + checkOK, err = obj.CheckApply(!noop) + + block.SetConverged(true) // unblock + block.Unregister() + + // TODO: Can the `Poll` converged timeout tracking be a + // more general method for all converged timeouts? this + // would simplify the resources by removing boilerplate + if v.Meta().Poll > 0 { + if !checkOK { // something changed, restart timer + cuid := v.Res.ConvergerUID() // get the converger uid used to report status + cuid.ResetTimer() // activity! + if g.Flags.Debug { + log.Printf("%s[%s]: Converger: ResetTimer", obj.Kind(), obj.GetName()) + } + } + } + } + + if checkOK && err != nil { // should never return this way + log.Fatalf("%s[%s]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), checkOK, err) + } + if g.Flags.Debug { + log.Printf("%s[%s]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), checkOK, err) + } + + // if CheckApply ran without noop and without error, state should be good + if !noop && err == nil { // aka !noop || checkOK + obj.StateOK(true) // reset + if refresh { + g.SetUpstreamRefresh(v, false) // refresh happened, clear the request + obj.SetRefresh(false) + } + } + + if !checkOK { // if state *was* not ok, we had to have apply'ed + if err != nil { // error during check or apply + ok = false + } else { + applied = true + } + } + + // when noop is true we always want to update timestamp + if noop && err == nil { + ok = true + } + + if ok { + // did we actually do work? + activity := applied + if noop { + activity = false // no we didn't do work... + } + + if activity { // add refresh flag to downstream edges... + g.SetDownstreamRefresh(v, true) + } + + // update this timestamp *before* we poke or the poked + // nodes might fail due to having a too old timestamp! + v.UpdateTimestamp() // this was touched... + obj.SetState(resources.ResStatePoking) // can't cancel parent poke + if err := g.Poke(v, activity); err != nil { + return errwrap.Wrapf(err, "the Poke() failed") + } + } + // poke at our pre-req's instead since they need to refresh/run... + return errwrap.Wrapf(err, "could not Process() successfully") } // SentinelErr is a sentinal as an error type that wraps an arbitrary error.