converger: Block converging in the engine

This more appropriately blocks converging in the engine, since we are
now 1-1 decoupled from the Watch resource. This simplifies resource
writing, and should be more accurate around small converged timeouts.

We don't block in the Worker routine when we are polling, because we
expect to get constant poll events, and we can instead be more careful
about these by looking at CheckApply results.

If we can do this for all resources in the future, it would be
excellent!
This commit is contained in:
James Shubin
2017-01-25 10:17:43 -05:00
parent 7e15a9e181
commit 357102fdb5

View File

@@ -221,18 +221,9 @@ func (g *Graph) Process(v *Vertex) error {
// run the CheckApply!
} else {
// if the CheckApply run takes longer than the converged
// timeout, we could inappropriately converge mid-apply!
// avoid this by blocking convergence with a fake report
block := obj.Converger().Register() // get an extra cuid
block.SetConverged(false) // block while CheckApply runs!
// if this fails, don't UpdateTimestamp()
checkOK, err = obj.CheckApply(!noop)
block.SetConverged(true) // unblock
block.Unregister()
// TODO: Can the `Poll` converged timeout tracking be a
// more general method for all converged timeouts? this
// would simplify the resources by removing boilerplate
@@ -324,6 +315,18 @@ func (g *Graph) Worker(v *Vertex) error {
lock := &sync.Mutex{} // lock around processChan closing and sending
finished := false // did we close processChan ?
processChan := make(chan *event.Event)
// if the CheckApply run takes longer than the converged
// timeout, we could inappropriately converge mid-apply!
// avoid this by blocking convergence with a fake report
// we also add a similar blocker around the worker loop!
wcuid := obj.Converger().Register() // get an extra cuid for the worker!
defer wcuid.Unregister()
wcuid.SetConverged(true) // starts off false, and waits for loop timeout
pcuid := obj.Converger().Register() // get an extra cuid for the process
defer pcuid.Unregister()
pcuid.SetConverged(true) // starts off true, because it's not running...
go func() {
running := false
done := make(chan struct{})
@@ -342,14 +345,14 @@ func (g *Graph) Worker(v *Vertex) error {
Loop:
for {
// this has to be synchronous, because otherwise the Res
// event loop will keep running and change state,
// causing the converged timeout to fire!
select {
case ev, ok := <-processChan: // must use like this
if !ok { // processChan closed, let's exit
break Loop // no event, so no ack!
}
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
// if process started, but no action yet, skip!
if v.Res.GetState() == resources.ResStateProcess {
@@ -402,6 +405,7 @@ func (g *Graph) Worker(v *Vertex) error {
running = true
go func(ev *event.Event) {
pcuid.SetConverged(false) // "block" Process
if e := g.Process(v); e != nil {
playback = true
log.Printf("%s[%s]: CheckApply errored: %v", v.Kind(), v.GetName(), e)
@@ -425,6 +429,9 @@ func (g *Graph) Worker(v *Vertex) error {
ev.ACK() // sync (now mostly useless)
case <-timer.C:
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
waiting = false
if !timer.Stop() {
//<-timer.C // blocks, docs are wrong!
@@ -434,6 +441,9 @@ func (g *Graph) Worker(v *Vertex) error {
// a CheckApply run (with possibly retry pause) finished
case <-done:
if v.Res.Meta().Poll == 0 { // skip for polling
wcuid.SetConverged(false)
}
if g.Flags.Debug {
log.Printf("%s[%s]: CheckApply finished!", v.Kind(), v.GetName())
}
@@ -454,6 +464,11 @@ func (g *Graph) Worker(v *Vertex) error {
}()
}
running = false
pcuid.SetConverged(true) // "unblock" Process
case <-wcuid.ConvergedTimer():
wcuid.SetConverged(true) // converged!
continue
}
}
}()