diff --git a/converger/converger.go b/converger/converger.go index a3ecc12c..5e059e7c 100644 --- a/converger/converger.go +++ b/converger/converger.go @@ -61,6 +61,8 @@ func New(timeout int) *Coordinator { //resumeSignal: make(chan struct{}), // happens on pause //pausedAck: util.NewEasyAck(), // happens on pause + sendSignal: make(chan bool), + stateFns: make(map[string]func(bool) error), smutex: &sync.RWMutex{}, @@ -103,6 +105,8 @@ type Coordinator struct { // pausedAck is used to send an ack message saying that we've paused. pausedAck *util.EasyAck + sendSignal chan bool // send pause (false) or resume (true) + // stateFns run on converged state changes. stateFns map[string]func(bool) error // smutex is used for controlling access to the stateFns map. @@ -176,11 +180,28 @@ func (obj *Coordinator) Run(startPaused bool) { for { // pause if one was requested... select { - case <-obj.pauseSignal: // channel closes + //case <-obj.pauseSignal: // channel closes + // obj.pausedAck.Ack() // send ack + // // we are paused now, and waiting for resume or exit... + // select { + // case <-obj.resumeSignal: // channel closes # XXX: RACE READ + // // resumed! + // + // case <-obj.closeChan: // we can always escape + // return + // } + case b, _ := <-obj.sendSignal: + if b { // resume + panic("unexpected resume") // TODO: continue instead? + } + // paused obj.pausedAck.Ack() // send ack // we are paused now, and waiting for resume or exit... select { - case <-obj.resumeSignal: // channel closes + case b, _ := <-obj.sendSignal: + if !b { // pause + panic("unexpected pause") // TODO: continue instead? + } // resumed! case <-obj.closeChan: // we can always escape @@ -229,8 +250,13 @@ func (obj *Coordinator) Pause() error { } obj.pausedAck = util.NewEasyAck() - obj.resumeSignal = make(chan struct{}) // build the resume signal - close(obj.pauseSignal) + //obj.resumeSignal = make(chan struct{}) // build the resume signal XXX: RACE WRITE + //close(obj.pauseSignal) + select { + case obj.sendSignal <- false: + case <-obj.closeChan: + return fmt.Errorf("closing") + } // wait for ack (or exit signal) select { @@ -253,8 +279,14 @@ func (obj *Coordinator) Resume() { return } - obj.pauseSignal = make(chan struct{}) // rebuild for next pause - close(obj.resumeSignal) + //obj.pauseSignal = make(chan struct{}) // rebuild for next pause + //close(obj.resumeSignal) + select { + case obj.sendSignal <- true: + case <-obj.closeChan: + return + } + obj.poke() // unblock and notice the resume if necessary obj.paused = false