From 0b1a4a0f3087cc49aa56dd98eb500f616b1e3c04 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Mon, 6 Mar 2017 04:46:07 -0500 Subject: [PATCH] pgraph, resources: Quiesce when pausing or exiting the resource This prevents a nasty race that can happen in a graph with more than one resource. If a resource has someone that it can BackPoke, and then suppose an event comes in. It runs the obj.Event() method (from inside its Watch loop) and then *before* the resulting Process method can run it receives a pause event and pauses. Then the parent resource pauses as well. Finally (it's a race) the Process gets around to running, and decides it needs to BackPoke. At this point since the parent resource is paused, it receives the BackPoke at a time when it can't handle receiving one, and it panics! As a result, we now track the number of running Process possibilities via a WaitGroup which gets incremented from the obj.Event() and we don't finish our pause or exit operations until it has quiesced and our WaitGroup lets us know via Wait(). Lastly in order to prevent repeated replays, we detect when we're quiescing and suspend replaying until post pause. We don't need to save the replay (playback variable) explicitly because its state remains during pause, and on exit it would get re-checked anyways. --- pgraph/actions.go | 20 ++++++++++++++++++-- resources/resources.go | 23 +++++++++++++++++++---- resources/sendrecv.go | 18 +++++++++++++++++- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/pgraph/actions.go b/pgraph/actions.go index 55968b06..576daaa0 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -328,6 +328,7 @@ func (obj *SentinelErr) Error() string { } // innerWorker is the CheckApply runner that reads from processChan. +// TODO: would it be better if this was a method on BaseRes that took in *Graph? func (g *Graph) innerWorker(v *Vertex) { obj := v.Res running := false @@ -365,6 +366,7 @@ Loop: log.Printf("%s[%s]: Skipped event!", v.Kind(), v.GetName()) } ev.ACK() // ready for next message + v.Res.QuiesceGroup().Done() continue } @@ -376,6 +378,7 @@ Loop: } playback = true ev.ACK() // ready for next message + v.Res.QuiesceGroup().Done() continue } @@ -384,6 +387,7 @@ Loop: e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName()) v.SendEvent(event.EventExit, &SentinelErr{e}) ev.ACK() // ready for next message + v.Res.QuiesceGroup().Done() continue } @@ -403,6 +407,7 @@ Loop: timer.Reset(d) waiting = true // waiting for retry timer ev.ACK() + v.Res.QuiesceGroup().Done() continue } // otherwise, we run directly! } @@ -419,6 +424,7 @@ Loop: if retry == 0 { // wrap the error in the sentinel v.SendEvent(event.EventExit, &SentinelErr{e}) + v.Res.QuiesceGroup().Done() return } if retry > 0 { // don't decrement the -1 @@ -428,6 +434,8 @@ Loop: // start the timer... timer.Reset(delay) waiting = true // waiting for retry timer + // don't v.Res.QuiesceGroup().Done() b/c + // the timer is running and it can exit! return } retry = v.Meta().Retry // reset on success @@ -457,15 +465,23 @@ Loop: done = make(chan struct{}) // reset // re-send this event, to trigger a CheckApply() if playback { - playback = false // this lock avoids us sending to // channel after we've closed it! // TODO: can this experience indefinite postponement ? // see: https://github.com/golang/go/issues/11506 - go obj.Event() // replay a new event + // pause or exit is in process if not quiescing! + if !v.Res.IsQuiescing() { + playback = false + v.Res.QuiesceGroup().Add(1) // lock around it, b/c still running... + go func() { + obj.Event() // replay a new event + v.Res.QuiesceGroup().Done() + }() + } } running = false pcuid.SetConverged(true) // "unblock" Process + v.Res.QuiesceGroup().Done() case <-wcuid.ConvergedTimer(): wcuid.SetConverged(true) // converged! diff --git a/resources/resources.go b/resources/resources.go index 3475b8b7..82e64a63 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -156,6 +156,8 @@ type Base interface { Events() chan *event.Event Data() *Data IsWorking() bool + IsQuiescing() bool + QuiesceGroup() *sync.WaitGroup WaitGroup() *sync.WaitGroup Setup() Reset() @@ -235,10 +237,12 @@ type BaseRes struct { isStarted bool // did the started chan already close? starter bool // does this have indegree == 0 ? XXX: usually? - waitGroup *sync.WaitGroup - working bool // is the Worker() loop running ? - debug bool - isStateOK bool // whether the state is okay based on events or not + quiescing bool // are we quiescing (pause or exit) + quiesceGroup *sync.WaitGroup + waitGroup *sync.WaitGroup + working bool // is the Worker() loop running ? + debug bool + isStateOK bool // whether the state is okay based on events or not isGrouped bool // am i contained within a group? grouped []Res // list of any grouped resources @@ -344,6 +348,9 @@ func (obj *BaseRes) Init() error { obj.processChan = make(chan *event.Event) obj.processSync = &sync.WaitGroup{} + obj.quiescing = false // no quiesce operation is happening at the moment + obj.quiesceGroup = &sync.WaitGroup{} + obj.waitGroup = &sync.WaitGroup{} // Init and Close must be 1-1 matched! obj.waitGroup.Add(1) obj.working = true // Worker method should now be running... @@ -423,6 +430,14 @@ func (obj *BaseRes) IsWorking() bool { return obj.working } +// IsQuiescing returns if there is a quiesce operation in progress. Pause and +// exit both meet this criteria, and this tells some systems to wind down, such +// as the event replay mechanism. +func (obj *BaseRes) IsQuiescing() bool { return obj.quiescing } + +// QuiesceGroup returns the sync group associated with the quiesce operations. +func (obj *BaseRes) QuiesceGroup() *sync.WaitGroup { return obj.quiesceGroup } + // WaitGroup returns a sync.WaitGroup which is open when the resource is done. // This is more useful than a closed channel signal, since it can be re-used // safely without having to recreate it and worry about stale channel handles. diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 94ae6c36..219df27c 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -36,6 +36,7 @@ func (obj *BaseRes) Event() error { obj.processLock.Unlock() return fmt.Errorf("processChan is already closed") } + obj.quiesceGroup.Add(1) // add to processChan queue count obj.processChan <- &event.Event{Kind: event.EventNil, Resp: resp} // trigger process obj.processLock.Unlock() return resp.Wait() @@ -69,24 +70,36 @@ func (obj *BaseRes) SendEvent(ev event.Kind, err error) error { // ReadEvent processes events when a select gets one, and handles the pause // code too! The return values specify if we should exit and poke respectively. func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { - ev.ACK() + //ev.ACK() err := ev.Error() switch ev.Kind { case event.EventStart: + ev.ACK() return nil, true case event.EventPoke: + ev.ACK() return nil, true case event.EventBackPoke: + ev.ACK() return nil, true // forward poking in response to a back poke! case event.EventExit: + obj.quiescing = true + obj.quiesceGroup.Wait() + obj.quiescing = false // for symmetry + ev.ACK() // FIXME: what do we do if we have a pending refresh (poke) and an exit? return &err, false case event.EventPause: + obj.quiescing = true // set the quiesce flag to avoid event replays + obj.quiesceGroup.Wait() + obj.quiescing = false // reset + ev.ACK() + // wait for next event to continue select { case e, ok := <-obj.Events(): @@ -94,6 +107,9 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { err := error(nil) return &err, false } + //obj.quiescing = true + //obj.quiesceGroup.Wait() // unnecessary, but symmetrically correct + //obj.quiescing = false e.ACK() err := e.Error() if e.Kind == event.EventExit {