converger: Combine two signal channels into one

There's no reason we need to remake these two channels, when we can just
use one. We should probably rewrite this code entirely, but at least we
get rid of this race for now.
This commit is contained in:
James Shubin
2025-08-06 18:33:26 -04:00
parent d2cda4ca78
commit 05d72b339d

View File

@@ -61,6 +61,8 @@ func New(timeout int) *Coordinator {
//resumeSignal: make(chan struct{}), // happens on pause //resumeSignal: make(chan struct{}), // happens on pause
//pausedAck: util.NewEasyAck(), // happens on pause //pausedAck: util.NewEasyAck(), // happens on pause
sendSignal: make(chan bool),
stateFns: make(map[string]func(bool) error), stateFns: make(map[string]func(bool) error),
smutex: &sync.RWMutex{}, smutex: &sync.RWMutex{},
@@ -103,6 +105,8 @@ type Coordinator struct {
// pausedAck is used to send an ack message saying that we've paused. // pausedAck is used to send an ack message saying that we've paused.
pausedAck *util.EasyAck pausedAck *util.EasyAck
sendSignal chan bool // send pause (false) or resume (true)
// stateFns run on converged state changes. // stateFns run on converged state changes.
stateFns map[string]func(bool) error stateFns map[string]func(bool) error
// smutex is used for controlling access to the stateFns map. // smutex is used for controlling access to the stateFns map.
@@ -176,11 +180,28 @@ func (obj *Coordinator) Run(startPaused bool) {
for { for {
// pause if one was requested... // pause if one was requested...
select { select {
case <-obj.pauseSignal: // channel closes //case <-obj.pauseSignal: // channel closes
// obj.pausedAck.Ack() // send ack
// // we are paused now, and waiting for resume or exit...
// select {
// case <-obj.resumeSignal: // channel closes # XXX: RACE READ
// // resumed!
//
// case <-obj.closeChan: // we can always escape
// return
// }
case b, _ := <-obj.sendSignal:
if b { // resume
panic("unexpected resume") // TODO: continue instead?
}
// paused
obj.pausedAck.Ack() // send ack obj.pausedAck.Ack() // send ack
// we are paused now, and waiting for resume or exit... // we are paused now, and waiting for resume or exit...
select { select {
case <-obj.resumeSignal: // channel closes case b, _ := <-obj.sendSignal:
if !b { // pause
panic("unexpected pause") // TODO: continue instead?
}
// resumed! // resumed!
case <-obj.closeChan: // we can always escape case <-obj.closeChan: // we can always escape
@@ -229,8 +250,13 @@ func (obj *Coordinator) Pause() error {
} }
obj.pausedAck = util.NewEasyAck() obj.pausedAck = util.NewEasyAck()
obj.resumeSignal = make(chan struct{}) // build the resume signal //obj.resumeSignal = make(chan struct{}) // build the resume signal XXX: RACE WRITE
close(obj.pauseSignal) //close(obj.pauseSignal)
select {
case obj.sendSignal <- false:
case <-obj.closeChan:
return fmt.Errorf("closing")
}
// wait for ack (or exit signal) // wait for ack (or exit signal)
select { select {
@@ -253,8 +279,14 @@ func (obj *Coordinator) Resume() {
return return
} }
obj.pauseSignal = make(chan struct{}) // rebuild for next pause //obj.pauseSignal = make(chan struct{}) // rebuild for next pause
close(obj.resumeSignal) //close(obj.resumeSignal)
select {
case obj.sendSignal <- true:
case <-obj.closeChan:
return
}
obj.poke() // unblock and notice the resume if necessary obj.poke() // unblock and notice the resume if necessary
obj.paused = false obj.paused = false