diff --git a/pgraph/actions.go b/pgraph/actions.go index 660044f5..2ffa8071 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -465,6 +465,7 @@ func (g *Graph) Worker(v *Vertex) error { } // run the init (should match 1-1 with Close function) if err := obj.Init(); err != nil { + obj.ProcessExit() // always exit the worker function by finishing with Close() if e := obj.Close(); e != nil { err = multierr.Append(err, e) // list of errors @@ -481,7 +482,12 @@ func (g *Graph) Worker(v *Vertex) error { wcuid.SetConverged(true) // starts off false, and waits for loop timeout pcuid.SetConverged(true) // starts off true, because it's not running... - go g.innerWorker(v) + wg := obj.ProcessSync() + wg.Add(1) + go func() { + defer wg.Done() + g.innerWorker(v) + }() var err error // propagate the error up (this is a permanent BAD error!) // the watch delay runs inside of the Watch resource loop, so that it @@ -511,6 +517,7 @@ func (g *Graph) Worker(v *Vertex) error { // NOTE: this code should match the similar Res code! //cuid.SetConverged(false) // TODO: ? if exit, send := obj.ReadEvent(event); exit != nil { + obj.ProcessExit() err := *exit // exit err if e := obj.Close(); err == nil { err = e @@ -586,6 +593,7 @@ func (g *Graph) Worker(v *Vertex) error { //v.SendEvent(eventPoke, false, false) } + obj.ProcessExit() // close resource and return possible errors if any if e := obj.Close(); err == nil { err = e diff --git a/resources/resources.go b/resources/resources.go index f920dcee..f997182d 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -168,6 +168,8 @@ type Base interface { Starter(bool) Poll() error // poll alternative to watching :( ProcessChan() chan *event.Event + ProcessSync() *sync.WaitGroup + ProcessExit() Prometheus() *prometheus.Prometheus } @@ -204,6 +206,7 @@ type BaseRes struct { processLock *sync.Mutex processDone bool processChan chan *event.Event + processSync *sync.WaitGroup converger converger.Converger // converged tracking cuid converger.UID @@ -322,6 +325,7 @@ func (obj *BaseRes) Init() error { obj.processLock = &sync.Mutex{} // lock around processChan closing and sending obj.processDone = false // did we close processChan ? obj.processChan = make(chan *event.Event) + obj.processSync = &sync.WaitGroup{} obj.waitGroup = &sync.WaitGroup{} // Init and Close must be 1-1 matched! obj.waitGroup.Add(1) @@ -351,12 +355,6 @@ func (obj *BaseRes) Close() error { log.Printf("%s[%s]: Close()", obj.Kind(), obj.GetName()) } - obj.processLock.Lock() // lock to avoid a send when closed! - obj.processDone = true - close(obj.processChan) - obj.processLock.Unlock() - // a Wait() for processChan to close is unnecessary I think... - obj.pcuid.Unregister() obj.wcuid.Unregister() obj.cuid.Unregister() @@ -466,8 +464,18 @@ func (obj *BaseRes) StateOK(b bool) { } // ProcessChan returns the chan that resources send events to. Internal API! -func (obj *BaseRes) ProcessChan() chan *event.Event { - return obj.processChan +func (obj *BaseRes) ProcessChan() chan *event.Event { return obj.processChan } + +// ProcessSync returns the WaitGroup that blocks until the innerWorker closes. +func (obj *BaseRes) ProcessSync() *sync.WaitGroup { return obj.processSync } + +// ProcessExit causes the innerWorker to close and waits until it does so. +func (obj *BaseRes) ProcessExit() { + obj.processLock.Lock() // lock to avoid a send when closed! + obj.processDone = true + close(obj.processChan) + obj.processLock.Unlock() + obj.processSync.Wait() } // GroupCmp compares two resources and decides if they're suitable for grouping