diff --git a/pgraph/actions.go b/pgraph/actions.go index 55968b06..576daaa0 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -328,6 +328,7 @@ func (obj *SentinelErr) Error() string { } // innerWorker is the CheckApply runner that reads from processChan. +// TODO: would it be better if this was a method on BaseRes that took in *Graph? func (g *Graph) innerWorker(v *Vertex) { obj := v.Res running := false @@ -365,6 +366,7 @@ Loop: log.Printf("%s[%s]: Skipped event!", v.Kind(), v.GetName()) } ev.ACK() // ready for next message + v.Res.QuiesceGroup().Done() continue } @@ -376,6 +378,7 @@ Loop: } playback = true ev.ACK() // ready for next message + v.Res.QuiesceGroup().Done() continue } @@ -384,6 +387,7 @@ Loop: e := fmt.Errorf("%s[%s]: Permanently limited (rate != Inf, burst: 0)", v.Kind(), v.GetName()) v.SendEvent(event.EventExit, &SentinelErr{e}) ev.ACK() // ready for next message + v.Res.QuiesceGroup().Done() continue } @@ -403,6 +407,7 @@ Loop: timer.Reset(d) waiting = true // waiting for retry timer ev.ACK() + v.Res.QuiesceGroup().Done() continue } // otherwise, we run directly! } @@ -419,6 +424,7 @@ Loop: if retry == 0 { // wrap the error in the sentinel v.SendEvent(event.EventExit, &SentinelErr{e}) + v.Res.QuiesceGroup().Done() return } if retry > 0 { // don't decrement the -1 @@ -428,6 +434,8 @@ Loop: // start the timer... timer.Reset(delay) waiting = true // waiting for retry timer + // don't v.Res.QuiesceGroup().Done() b/c + // the timer is running and it can exit! return } retry = v.Meta().Retry // reset on success @@ -457,15 +465,23 @@ Loop: done = make(chan struct{}) // reset // re-send this event, to trigger a CheckApply() if playback { - playback = false // this lock avoids us sending to // channel after we've closed it! // TODO: can this experience indefinite postponement ? // see: https://github.com/golang/go/issues/11506 - go obj.Event() // replay a new event + // pause or exit is in process if not quiescing! 
+ if !v.Res.IsQuiescing() { + playback = false + v.Res.QuiesceGroup().Add(1) // lock around it, b/c still running... + go func() { + obj.Event() // replay a new event + v.Res.QuiesceGroup().Done() + }() + } } running = false pcuid.SetConverged(true) // "unblock" Process + v.Res.QuiesceGroup().Done() case <-wcuid.ConvergedTimer(): wcuid.SetConverged(true) // converged! diff --git a/resources/resources.go b/resources/resources.go index 3475b8b7..82e64a63 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -156,6 +156,8 @@ type Base interface { Events() chan *event.Event Data() *Data IsWorking() bool + IsQuiescing() bool + QuiesceGroup() *sync.WaitGroup WaitGroup() *sync.WaitGroup Setup() Reset() @@ -235,10 +237,12 @@ type BaseRes struct { isStarted bool // did the started chan already close? starter bool // does this have indegree == 0 ? XXX: usually? - waitGroup *sync.WaitGroup - working bool // is the Worker() loop running ? - debug bool - isStateOK bool // whether the state is okay based on events or not + quiescing bool // are we quiescing (pause or exit) + quiesceGroup *sync.WaitGroup + waitGroup *sync.WaitGroup + working bool // is the Worker() loop running ? + debug bool + isStateOK bool // whether the state is okay based on events or not isGrouped bool // am i contained within a group? grouped []Res // list of any grouped resources @@ -344,6 +348,9 @@ func (obj *BaseRes) Init() error { obj.processChan = make(chan *event.Event) obj.processSync = &sync.WaitGroup{} + obj.quiescing = false // no quiesce operation is happening at the moment + obj.quiesceGroup = &sync.WaitGroup{} + obj.waitGroup = &sync.WaitGroup{} // Init and Close must be 1-1 matched! obj.waitGroup.Add(1) obj.working = true // Worker method should now be running... @@ -423,6 +430,14 @@ func (obj *BaseRes) IsWorking() bool { return obj.working } +// IsQuiescing returns if there is a quiesce operation in progress. 
Pause and +// exit both meet these criteria, and this tells some systems to wind down, such +// as the event replay mechanism. +func (obj *BaseRes) IsQuiescing() bool { return obj.quiescing } + +// QuiesceGroup returns the sync group associated with the quiesce operations. +func (obj *BaseRes) QuiesceGroup() *sync.WaitGroup { return obj.quiesceGroup } + // WaitGroup returns a sync.WaitGroup which is open when the resource is done. // This is more useful than a closed channel signal, since it can be re-used // safely without having to recreate it and worry about stale channel handles. diff --git a/resources/sendrecv.go b/resources/sendrecv.go index 94ae6c36..219df27c 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -36,6 +36,7 @@ func (obj *BaseRes) Event() error { obj.processLock.Unlock() return fmt.Errorf("processChan is already closed") } + obj.quiesceGroup.Add(1) // add to processChan queue count obj.processChan <- &event.Event{Kind: event.EventNil, Resp: resp} // trigger process obj.processLock.Unlock() return resp.Wait() @@ -69,24 +70,36 @@ func (obj *BaseRes) SendEvent(ev event.Kind, err error) error { // ReadEvent processes events when a select gets one, and handles the pause // code too! The return values specify if we should exit and poke respectively. func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { - ev.ACK() + //ev.ACK() err := ev.Error() switch ev.Kind { case event.EventStart: + ev.ACK() return nil, true case event.EventPoke: + ev.ACK() return nil, true case event.EventBackPoke: + ev.ACK() return nil, true // forward poking in response to a back poke! case event.EventExit: + obj.quiescing = true + obj.quiesceGroup.Wait() + obj.quiescing = false // for symmetry + ev.ACK() // FIXME: what do we do if we have a pending refresh (poke) and an exit? 
return &err, false case event.EventPause: + obj.quiescing = true // set the quiesce flag to avoid event replays + obj.quiesceGroup.Wait() + obj.quiescing = false // reset + ev.ACK() + // wait for next event to continue select { case e, ok := <-obj.Events(): @@ -94,6 +107,9 @@ func (obj *BaseRes) ReadEvent(ev *event.Event) (exit *error, send bool) { err := error(nil) return &err, false } + //obj.quiescing = true + //obj.quiesceGroup.Wait() // unnecessary, but symmetrically correct + //obj.quiescing = false e.ACK() err := e.Error() if e.Kind == event.EventExit {