pgraph, resources: Clean up the event system around the resources

This cleans up some of the resource events and also reorganizes the
struct for simplicity. This should hopefully kill off at least one race
which would cause unnecessary blocking!

Yes this patch is a bit yucky, but so was the bug I was fighting with!
This commit is contained in:
James Shubin
2017-02-22 17:03:12 -05:00
parent 3bd37a7906
commit 49594b8435
4 changed files with 67 additions and 41 deletions

View File

@@ -677,6 +677,7 @@ func (g *Graph) Exit() {
// XXX: we can do this to quiesce, but it's not necessary now // XXX: we can do this to quiesce, but it's not necessary now
v.SendEvent(event.EventExit, nil) v.SendEvent(event.EventExit, nil)
v.Res.WaitGroup().Wait()
} }
g.wg.Wait() // for now, this doesn't need to be a separate Wait() method g.wg.Wait() // for now, this doesn't need to be a separate Wait() method
} }

View File

@@ -588,6 +588,7 @@ func (g *Graph) GraphSync(oldGraph *Graph) (*Graph, error) {
if !VertexContains(v, vertexKeep) { if !VertexContains(v, vertexKeep) {
// wait for exit before starting new graph! // wait for exit before starting new graph!
v.SendEvent(event.EventExit, nil) // sync v.SendEvent(event.EventExit, nil) // sync
v.Res.WaitGroup().Wait()
oldGraph.DeleteVertex(v) oldGraph.DeleteVertex(v)
} }
} }

View File

@@ -140,6 +140,7 @@ type Base interface {
Events() chan *event.Event Events() chan *event.Event
AssociateData(*Data) AssociateData(*Data)
IsWorking() bool IsWorking() bool
WaitGroup() *sync.WaitGroup
Setup() Setup()
Reset() Reset()
Converger() converger.Converger Converger() converger.Converger
@@ -192,30 +193,40 @@ type BaseRes struct {
MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams MetaParams MetaParams `yaml:"meta"` // struct of all the metaparams
Recv map[string]*Send // mapping of key to receive on from value Recv map[string]*Send // mapping of key to receive on from value
kind string kind string
mutex *sync.Mutex // locks around sending and closing of events channel state ResState
events chan *event.Event prefix string // base prefix for this resource
converger converger.Converger // converged tracking
cuid converger.ConvergerUID eventsLock *sync.Mutex // locks around sending and closing of events channel
wcuid converger.ConvergerUID eventsDone bool
pcuid converger.ConvergerUID eventsChan chan *event.Event
prometheus *prometheus.Prometheus
prefix string // base prefix for this resource
debug bool
state ResState
working bool // is the Worker() loop running ?
started chan struct{} // closed when worker is started/running
stopped chan struct{} // closed when worker is stopped/exited
isStarted bool // did the started chan already close?
starter bool // does this have indegree == 0 ? XXX: usually?
isStateOK bool // whether the state is okay based on events or not
isGrouped bool // am i contained within a group?
grouped []Res // list of any grouped resources
processLock *sync.Mutex processLock *sync.Mutex
processDone bool processDone bool
processChan chan *event.Event processChan chan *event.Event
refresh bool // does this resource have a refresh to run?
converger converger.Converger // converged tracking
cuid converger.ConvergerUID
wcuid converger.ConvergerUID
pcuid converger.ConvergerUID
started chan struct{} // closed when worker is started/running
stopped chan struct{} // closed when worker is stopped/exited
isStarted bool // did the started chan already close?
starter bool // does this have indegree == 0 ? XXX: usually?
waitGroup *sync.WaitGroup
working bool // is the Worker() loop running ?
debug bool
isStateOK bool // whether the state is okay based on events or not
isGrouped bool // am i contained within a group?
grouped []Res // list of any grouped resources
refresh bool // does this resource have a refresh to run?
//refreshState StatefulBool // TODO: future stateful bool //refreshState StatefulBool // TODO: future stateful bool
prometheus *prometheus.Prometheus
} }
// UnmarshalYAML is the custom unmarshal handler for the BaseRes struct. It is // UnmarshalYAML is the custom unmarshal handler for the BaseRes struct. It is
@@ -304,9 +315,17 @@ func (obj *BaseRes) Init() error {
obj.wcuid = obj.converger.Register() // get a cuid for the worker! obj.wcuid = obj.converger.Register() // get a cuid for the worker!
obj.pcuid = obj.converger.Register() // get a cuid for the process obj.pcuid = obj.converger.Register() // get a cuid for the process
obj.mutex = &sync.Mutex{} obj.eventsLock = &sync.Mutex{}
obj.working = true // Worker method should now be running... obj.eventsDone = false
obj.events = make(chan *event.Event) // unbuffered chan to avoid stale events obj.eventsChan = make(chan *event.Event) // unbuffered chan to avoid stale events
obj.processLock = &sync.Mutex{} // lock around processChan closing and sending
obj.processDone = false // did we close processChan ?
obj.processChan = make(chan *event.Event)
obj.waitGroup = &sync.WaitGroup{} // Init and Close must be 1-1 matched!
obj.waitGroup.Add(1)
obj.working = true // Worker method should now be running...
// FIXME: force a sane default until UnmarshalYAML on *BaseRes works... // FIXME: force a sane default until UnmarshalYAML on *BaseRes works...
if obj.Meta().Burst == 0 && obj.Meta().Limit == 0 { // blocked if obj.Meta().Burst == 0 && obj.Meta().Limit == 0 { // blocked
@@ -316,10 +335,6 @@ func (obj *BaseRes) Init() error {
obj.Meta().Limit = rate.Inf obj.Meta().Limit = rate.Inf
} }
obj.processLock = &sync.Mutex{} // lock around processChan closing and sending
obj.processDone = false // did we close processChan ?
obj.processChan = make(chan *event.Event)
//dir, err := obj.VarDir("") //dir, err := obj.VarDir("")
//if err != nil { //if err != nil {
// return errwrap.Wrapf(err, "VarDir failed in Init()") // return errwrap.Wrapf(err, "VarDir failed in Init()")
@@ -340,17 +355,15 @@ func (obj *BaseRes) Close() error {
obj.processDone = true obj.processDone = true
close(obj.processChan) close(obj.processChan)
obj.processLock.Unlock() obj.processLock.Unlock()
// a Wait() for processChan to close is unnecessary I think...
obj.mutex.Lock()
obj.working = false // Worker method should now be closing...
close(obj.events) // this is where we properly close this channel!
obj.mutex.Unlock()
obj.pcuid.Unregister() obj.pcuid.Unregister()
obj.wcuid.Unregister() obj.wcuid.Unregister()
obj.cuid.Unregister() obj.cuid.Unregister()
obj.working = false // Worker method should now be closing...
close(obj.stopped) close(obj.stopped)
obj.waitGroup.Done()
return nil return nil
} }
@@ -382,7 +395,7 @@ func (obj *BaseRes) Meta() *MetaParams {
// Events returns the channel of events to listen on. // Events returns the channel of events to listen on.
func (obj *BaseRes) Events() chan *event.Event { func (obj *BaseRes) Events() chan *event.Event {
return obj.events return obj.eventsChan
} }
// AssociateData associates some data with the object in question. // AssociateData associates some data with the object in question.
@@ -398,6 +411,11 @@ func (obj *BaseRes) IsWorking() bool {
return obj.working return obj.working
} }
// WaitGroup returns a sync.WaitGroup which is open when the resource is done.
// This is more useful than a closed channel signal, since it can be re-used
// safely without having to recreate it and worry about stale channel handles.
func (obj *BaseRes) WaitGroup() *sync.WaitGroup { return obj.waitGroup }
// Setup does some work which must happen before the Worker starts. It happens // Setup does some work which must happen before the Worker starts. It happens
// once per Worker startup. // once per Worker startup.
func (obj *BaseRes) Setup() { func (obj *BaseRes) Setup() {

View File

@@ -32,9 +32,11 @@ import (
func (obj *BaseRes) Event() error { func (obj *BaseRes) Event() error {
resp := event.NewResp() resp := event.NewResp()
obj.processLock.Lock() obj.processLock.Lock()
if !obj.processDone { if obj.processDone {
obj.processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process obj.processLock.Unlock()
return fmt.Errorf("processChan is already closed")
} }
obj.processChan <- &event.Event{Name: event.EventNil, Resp: resp} // trigger process
obj.processLock.Unlock() obj.processLock.Unlock()
return resp.Wait() return resp.Wait()
} }
@@ -49,13 +51,17 @@ func (obj *BaseRes) SendEvent(ev event.EventName, err error) error {
} }
} }
resp := event.NewResp() resp := event.NewResp()
obj.mutex.Lock() obj.eventsLock.Lock()
if !obj.working { if obj.eventsDone {
obj.mutex.Unlock() obj.eventsLock.Unlock()
return fmt.Errorf("resource worker is not running") return fmt.Errorf("eventsChan is already closed")
} }
obj.events <- &event.Event{Name: ev, Resp: resp, Err: err} obj.eventsChan <- &event.Event{Name: ev, Resp: resp, Err: err}
obj.mutex.Unlock() if ev == event.EventExit {
obj.eventsDone = true
close(obj.eventsChan) // this is where we properly close this channel!
}
obj.eventsLock.Unlock()
resp.ACKWait() // waits until true (nil) value resp.ACKWait() // waits until true (nil) value
return nil return nil
} }