pgraph, resources: Wait for innerWorker to exit cleanly

Don't run the Close() method until the innerWorker has exited cleanly.
This is a guarantee which we make to the resources.
This commit is contained in:
James Shubin
2017-02-22 22:59:40 -05:00
parent 8be09eadd4
commit 2462ea0892
2 changed files with 25 additions and 9 deletions

View File

@@ -465,6 +465,7 @@ func (g *Graph) Worker(v *Vertex) error {
} }
// run the init (should match 1-1 with Close function) // run the init (should match 1-1 with Close function)
if err := obj.Init(); err != nil { if err := obj.Init(); err != nil {
obj.ProcessExit()
// always exit the worker function by finishing with Close() // always exit the worker function by finishing with Close()
if e := obj.Close(); e != nil { if e := obj.Close(); e != nil {
err = multierr.Append(err, e) // list of errors err = multierr.Append(err, e) // list of errors
@@ -481,7 +482,12 @@ func (g *Graph) Worker(v *Vertex) error {
wcuid.SetConverged(true) // starts off false, and waits for loop timeout wcuid.SetConverged(true) // starts off false, and waits for loop timeout
pcuid.SetConverged(true) // starts off true, because it's not running... pcuid.SetConverged(true) // starts off true, because it's not running...
go g.innerWorker(v) wg := obj.ProcessSync()
wg.Add(1)
go func() {
defer wg.Done()
g.innerWorker(v)
}()
var err error // propagate the error up (this is a permanent BAD error!) var err error // propagate the error up (this is a permanent BAD error!)
// the watch delay runs inside of the Watch resource loop, so that it // the watch delay runs inside of the Watch resource loop, so that it
@@ -511,6 +517,7 @@ func (g *Graph) Worker(v *Vertex) error {
// NOTE: this code should match the similar Res code! // NOTE: this code should match the similar Res code!
//cuid.SetConverged(false) // TODO: ? //cuid.SetConverged(false) // TODO: ?
if exit, send := obj.ReadEvent(event); exit != nil { if exit, send := obj.ReadEvent(event); exit != nil {
obj.ProcessExit()
err := *exit // exit err err := *exit // exit err
if e := obj.Close(); err == nil { if e := obj.Close(); err == nil {
err = e err = e
@@ -586,6 +593,7 @@ func (g *Graph) Worker(v *Vertex) error {
//v.SendEvent(eventPoke, false, false) //v.SendEvent(eventPoke, false, false)
} }
obj.ProcessExit()
// close resource and return possible errors if any // close resource and return possible errors if any
if e := obj.Close(); err == nil { if e := obj.Close(); err == nil {
err = e err = e

View File

@@ -168,6 +168,8 @@ type Base interface {
Starter(bool) Starter(bool)
Poll() error // poll alternative to watching :( Poll() error // poll alternative to watching :(
ProcessChan() chan *event.Event ProcessChan() chan *event.Event
ProcessSync() *sync.WaitGroup
ProcessExit()
Prometheus() *prometheus.Prometheus Prometheus() *prometheus.Prometheus
} }
@@ -204,6 +206,7 @@ type BaseRes struct {
processLock *sync.Mutex processLock *sync.Mutex
processDone bool processDone bool
processChan chan *event.Event processChan chan *event.Event
processSync *sync.WaitGroup
converger converger.Converger // converged tracking converger converger.Converger // converged tracking
cuid converger.UID cuid converger.UID
@@ -322,6 +325,7 @@ func (obj *BaseRes) Init() error {
obj.processLock = &sync.Mutex{} // lock around processChan closing and sending obj.processLock = &sync.Mutex{} // lock around processChan closing and sending
obj.processDone = false // did we close processChan ? obj.processDone = false // did we close processChan ?
obj.processChan = make(chan *event.Event) obj.processChan = make(chan *event.Event)
obj.processSync = &sync.WaitGroup{}
obj.waitGroup = &sync.WaitGroup{} // Init and Close must be 1-1 matched! obj.waitGroup = &sync.WaitGroup{} // Init and Close must be 1-1 matched!
obj.waitGroup.Add(1) obj.waitGroup.Add(1)
@@ -351,12 +355,6 @@ func (obj *BaseRes) Close() error {
log.Printf("%s[%s]: Close()", obj.Kind(), obj.GetName()) log.Printf("%s[%s]: Close()", obj.Kind(), obj.GetName())
} }
obj.processLock.Lock() // lock to avoid a send when closed!
obj.processDone = true
close(obj.processChan)
obj.processLock.Unlock()
// a Wait() for processChan to close is unnecessary I think...
obj.pcuid.Unregister() obj.pcuid.Unregister()
obj.wcuid.Unregister() obj.wcuid.Unregister()
obj.cuid.Unregister() obj.cuid.Unregister()
@@ -466,8 +464,18 @@ func (obj *BaseRes) StateOK(b bool) {
} }
// ProcessChan returns the chan that resources send events to. Internal API! // ProcessChan returns the chan that resources send events to. Internal API!
func (obj *BaseRes) ProcessChan() chan *event.Event { func (obj *BaseRes) ProcessChan() chan *event.Event { return obj.processChan }
return obj.processChan
// ProcessSync returns the WaitGroup that blocks until the innerWorker closes.
func (obj *BaseRes) ProcessSync() *sync.WaitGroup { return obj.processSync }
// ProcessExit causes the innerWorker to close and waits until it does so.
func (obj *BaseRes) ProcessExit() {
obj.processLock.Lock() // lock to avoid a send when closed!
obj.processDone = true
close(obj.processChan)
obj.processLock.Unlock()
obj.processSync.Wait()
} }
// GroupCmp compares two resources and decides if they're suitable for grouping // GroupCmp compares two resources and decides if they're suitable for grouping