From 05d72b339d4c277d67dd668b98074fecd7c57ca5 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Wed, 6 Aug 2025 18:33:26 -0400 Subject: [PATCH] converger: Combine two signal channels into one There's no reason we need to remake these two channels, when we can just use one. We should probably rewrite this code entirely, but at least we get rid of this race for now. --- converger/converger.go | 44 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/converger/converger.go b/converger/converger.go index a3ecc12c..5e059e7c 100644 --- a/converger/converger.go +++ b/converger/converger.go @@ -61,6 +61,8 @@ func New(timeout int) *Coordinator { //resumeSignal: make(chan struct{}), // happens on pause //pausedAck: util.NewEasyAck(), // happens on pause + sendSignal: make(chan bool), + stateFns: make(map[string]func(bool) error), smutex: &sync.RWMutex{}, @@ -103,6 +105,8 @@ type Coordinator struct { // pausedAck is used to send an ack message saying that we've paused. pausedAck *util.EasyAck + sendSignal chan bool // send pause (false) or resume (true) + // stateFns run on converged state changes. stateFns map[string]func(bool) error // smutex is used for controlling access to the stateFns map. @@ -176,11 +180,28 @@ func (obj *Coordinator) Run(startPaused bool) { for { // pause if one was requested... select { - case <-obj.pauseSignal: // channel closes + //case <-obj.pauseSignal: // channel closes + // obj.pausedAck.Ack() // send ack + // // we are paused now, and waiting for resume or exit... + // select { + // case <-obj.resumeSignal: // channel closes # XXX: RACE READ + // // resumed! + // + // case <-obj.closeChan: // we can always escape + // return + // } + case b, _ := <-obj.sendSignal: + if b { // resume + panic("unexpected resume") // TODO: continue instead? + } + // paused obj.pausedAck.Ack() // send ack // we are paused now, and waiting for resume or exit... select { - case <-obj.resumeSignal: // channel closes + case b, _ := <-obj.sendSignal: + if !b { // pause + panic("unexpected pause") // TODO: continue instead? + } // resumed! case <-obj.closeChan: // we can always escape @@ -229,8 +250,13 @@ func (obj *Coordinator) Pause() error { } obj.pausedAck = util.NewEasyAck() - obj.resumeSignal = make(chan struct{}) // build the resume signal - close(obj.pauseSignal) + //obj.resumeSignal = make(chan struct{}) // build the resume signal XXX: RACE WRITE + //close(obj.pauseSignal) + select { + case obj.sendSignal <- false: + case <-obj.closeChan: + return fmt.Errorf("closing") + } // wait for ack (or exit signal) select { @@ -253,8 +279,14 @@ func (obj *Coordinator) Resume() { return } - obj.pauseSignal = make(chan struct{}) // rebuild for next pause - close(obj.resumeSignal) + //obj.pauseSignal = make(chan struct{}) // rebuild for next pause + //close(obj.resumeSignal) + select { + case obj.sendSignal <- true: + case <-obj.closeChan: + return + } + obj.poke() // unblock and notice the resume if necessary obj.paused = false