diff --git a/pgraph/actions.go b/pgraph/actions.go index 155dcd13..660044f5 100644 --- a/pgraph/actions.go +++ b/pgraph/actions.go @@ -677,6 +677,7 @@ func (g *Graph) Exit() { // XXX: we can do this to quiesce, but it's not necessary now v.SendEvent(event.EventExit, nil) + v.Res.WaitGroup().Wait() } g.wg.Wait() // for now, this doesn't need to be a separate Wait() method } diff --git a/pgraph/pgraph.go b/pgraph/pgraph.go index 3c0c8b2b..3d55f167 100644 --- a/pgraph/pgraph.go +++ b/pgraph/pgraph.go @@ -588,6 +588,7 @@ func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) { if !VertexContains(v, vertexKeep) { // wait for exit before starting new graph! v.SendEvent(event.EventExit, nil) // sync + v.Res.WaitGroup().Wait() oldGraph.DeleteVertex(v) } } diff --git a/resources/resources.go b/resources/resources.go index 0b0167bd..e33dcb14 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -140,6 +140,7 @@ type Base interface { Events() chan *event.Event AssociateData(*Data) IsWorking() bool + WaitGroup() *sync.WaitGroup Setup() Reset() Converger() converger.Converger @@ -192,30 +193,40 @@ type BaseRes struct { MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams Recv map[string]*Send // mapping of key to receive on from value - kind string - mutex *sync.Mutex // locks around sending and closing of events channel - events chan *event.Event - converger converger.Converger // converged tracking - cuid converger.ConvergerUID - wcuid converger.ConvergerUID - pcuid converger.ConvergerUID - prometheus *prometheus.Prometheus - prefix string // base prefix for this resource - debug bool - state ResState - working bool // is the Worker() loop running ? - started chan struct{} // closed when worker is started/running - stopped chan struct{} // closed when worker is stopped/exited - isStarted bool // did the started chan already close? - starter bool // does this have indegree == 0 ? XXX: usually? - isStateOK bool // whether the state is okay based on events or not - isGrouped bool // am i contained within a group? - grouped []Res // list of any grouped resources + kind string + state ResState + prefix string // base prefix for this resource + + eventsLock *sync.Mutex // locks around sending and closing of events channel + eventsDone bool + eventsChan chan *event.Event + processLock *sync.Mutex processDone bool processChan chan *event.Event - refresh bool // does this resource have a refresh to run? + + converger converger.Converger // converged tracking + cuid converger.ConvergerUID + wcuid converger.ConvergerUID + pcuid converger.ConvergerUID + + started chan struct{} // closed when worker is started/running + stopped chan struct{} // closed when worker is stopped/exited + isStarted bool // did the started chan already close? + starter bool // does this have indegree == 0 ? XXX: usually? + + waitGroup *sync.WaitGroup + working bool // is the Worker() loop running ? + debug bool + isStateOK bool // whether the state is okay based on events or not + + isGrouped bool // am i contained within a group? + grouped []Res // list of any grouped resources + + refresh bool // does this resource have a refresh to run? //refreshState StatefulBool // TODO: future stateful bool + + prometheus *prometheus.Prometheus } // UnmarshalYAML is the custom unmarshal handler for the BaseRes struct. It is @@ -304,9 +315,17 @@ func (obj *BaseRes) Init() error { obj.wcuid = obj.converger.Register() // get a cuid for the worker! obj.pcuid = obj.converger.Register() // get a cuid for the process - obj.mutex = &sync.Mutex{} - obj.working = true // Worker method should now be running... - obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events + obj.eventsLock = &sync.Mutex{} + obj.eventsDone = false + obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events + + obj.processLock = &sync.Mutex{} // lock around processChan closing and sending + obj.processDone = false // did we close processChan ? + obj.processChan = make(chan *event.Event) + + obj.waitGroup = &sync.WaitGroup{} // Init and Close must be 1-1 matched! + obj.waitGroup.Add(1) + obj.working = true // Worker method should now be running... // FIXME: force a sane default until UnmarshalYAML on *BaseRes works... if obj.Meta().Burst == 0 && obj.Meta().Limit == 0 { // blocked @@ -316,10 +335,6 @@ func (obj *BaseRes) Init() error { obj.Meta().Limit = rate.Inf } - obj.processLock = &sync.Mutex{} // lock around processChan closing and sending - obj.processDone = false // did we close processChan ? - obj.processChan = make(chan *event.Event) - //dir, err := obj.VarDir("") //if err != nil { // return errwrap.Wrapf(err, "VarDir failed in Init()") @@ -340,17 +355,15 @@ func (obj *BaseRes) Close() error { obj.processDone = true close(obj.processChan) obj.processLock.Unlock() - - obj.mutex.Lock() - obj.working = false // Worker method should now be closing... - close(obj.events) // this is where we properly close this channel! - obj.mutex.Unlock() + // a Wait() for processChan to close is unnecessary I think... obj.pcuid.Unregister() obj.wcuid.Unregister() obj.cuid.Unregister() + obj.working = false // Worker method should now be closing... close(obj.stopped) + obj.waitGroup.Done() return nil } @@ -382,7 +395,7 @@ func (obj *BaseRes) Meta() *MetaParams { // Events returns the channel of events to listen on. func (obj *BaseRes) Events() chan *event.Event { - return obj.events + return obj.eventsChan } // AssociateData associates some data with the object in question. @@ -398,6 +411,11 @@ func (obj *BaseRes) IsWorking() bool { return obj.working } +// WaitGroup returns a sync.WaitGroup which is open when the resource is done. +// This is more useful than a closed channel signal, since it can be re-used +// safely without having to recreate it and worry about stale channel handles. +func (obj *BaseRes) WaitGroup() *sync.WaitGroup { return obj.waitGroup } + // Setup does some work which must happen before the Worker starts. It happens // once per Worker startup. func (obj *BaseRes) Setup() { diff --git a/resources/sendrecv.go b/resources/sendrecv.go index c7dfdc39..2cd5274a 100644 --- a/resources/sendrecv.go +++ b/resources/sendrecv.go @@ -32,9 +32,11 @@ import ( func (obj *BaseRes) Event() error { resp := event.NewResp() obj.processLock.Lock() - if !obj.processDone { - obj.processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process + if obj.processDone { + obj.processLock.Unlock() + return fmt.Errorf("processChan is already closed") } + obj.processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process obj.processLock.Unlock() return resp.Wait() } @@ -49,13 +51,17 @@ func (obj *BaseRes) SendEvent(ev event.EventName, err error) error { } } resp := event.NewResp() - obj.mutex.Lock() - if !obj.working { - obj.mutex.Unlock() - return fmt.Errorf("resource worker is not running") + obj.eventsLock.Lock() + if obj.eventsDone { + obj.eventsLock.Unlock() + return fmt.Errorf("eventsChan is already closed") } - obj.events <- &event.Event{Name: ev, Resp: resp, Err: err} - obj.mutex.Unlock() + obj.eventsChan <- &event.Event{Name: ev, Resp: resp, Err: err} + if ev == event.EventExit { + obj.eventsDone = true + close(obj.eventsChan) // this is where we properly close this channel! + } + obj.eventsLock.Unlock() resp.ACKWait() // waits until true (nil) value return nil }