util: Add subscribed signal primitive
Add a little sync primitive to our utility library. This should hopefully make some of the future code easier to deal with.
This commit is contained in:
66
util/sync.go
66
util/sync.go
@@ -134,3 +134,69 @@ func (obj *EasyExit) Error() error {
|
|||||||
obj.wg.Wait() // wait for cleanup
|
obj.wg.Wait() // wait for cleanup
|
||||||
return obj.err
|
return obj.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubscribedSignal represents a synchronized read signal. It doesn't need to be
|
||||||
|
// instantiated before it can be used. It must not be copied after first use. It
|
||||||
|
// is equivalent to receiving a multicast signal from a closing channel, except
|
||||||
|
// that it must be acknowledged by every reader of the signal, and once this is
|
||||||
|
// done, it is reset and can be re-used. Readers must obtain a handle to the
|
||||||
|
// signal with the Subscribe method, and the signal is sent out with the Done
|
||||||
|
// method.
|
||||||
|
type SubscribedSignal struct {
|
||||||
|
wg sync.WaitGroup
|
||||||
|
exit chan struct{}
|
||||||
|
mutex sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe is used by any reader of the signal. Once this function returns, it
|
||||||
|
// means that you're now ready to watch the signal. The signal can be watched as
|
||||||
|
// is done normally with any other ready channel. Once you have received the
|
||||||
|
// signal or when you are no longer interested in the signal you *must* call the
|
||||||
|
// cancel/ack function which is returned by this function on subscribe. If you
|
||||||
|
// do not, you will block the Send portion of this subscribed signal
|
||||||
|
// indefinitely. This is thread safe and can be called multiple times in
|
||||||
|
// parallel because this call is protected by a mutex. The mutex also prevents
|
||||||
|
// simultaneous calls with the Send method. the returned cancel/ack method must
|
||||||
|
// return before it's safe to call this method a subsequent time for a new
|
||||||
|
// signal. One important note: there is a possible race that *you* can cause if
|
||||||
|
// you race this Subscribe call, with the Send call. Make sure you run Subscribe
|
||||||
|
// and it returns *before* you run Send if you want to be sure to receive the
|
||||||
|
// next signal. This should be common sense but it is mentioned here to be
|
||||||
|
// helpful. They are protected by a lock, so they can't both run simultaneously.
|
||||||
|
func (obj *SubscribedSignal) Subscribe() (<-chan struct{}, func()) {
|
||||||
|
obj.mutex.Lock()
|
||||||
|
defer obj.mutex.Unlock()
|
||||||
|
|
||||||
|
if obj.exit == nil { // initialize on first use (safe b/c we use a lock)
|
||||||
|
obj.exit = make(chan struct{}) // initialize
|
||||||
|
}
|
||||||
|
|
||||||
|
obj.wg.Add(1)
|
||||||
|
return obj.exit, func() { // cancel/ack function
|
||||||
|
obj.wg.Done()
|
||||||
|
|
||||||
|
// wait for the reset signal before proceeding
|
||||||
|
obj.mutex.RLock()
|
||||||
|
defer obj.mutex.RUnlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send is called if you want to multicast the signal to all subscribed parties.
|
||||||
|
// It will require all parties to acknowledge the receipt of the signal before
|
||||||
|
// it will unblock. Just before returning, it will reset the signal so that it
|
||||||
|
// can be called a subsequent time. This is thread safe and can be called
|
||||||
|
// multiple times in parallel because this call is protected by a mutex. The
|
||||||
|
// mutex also prevents simultaneous calls with the Subscribe method.
|
||||||
|
func (obj *SubscribedSignal) Send() {
|
||||||
|
obj.mutex.Lock()
|
||||||
|
defer obj.mutex.Unlock()
|
||||||
|
|
||||||
|
if obj.exit != nil { // in case we Send before anyone runs Subscribe
|
||||||
|
close(obj.exit) // send the close signal
|
||||||
|
}
|
||||||
|
obj.wg.Wait() // wait for everyone to ack
|
||||||
|
|
||||||
|
obj.exit = make(chan struct{}) // reset
|
||||||
|
|
||||||
|
// release (re-use the above mutex)
|
||||||
|
}
|
||||||
|
|||||||
@@ -20,6 +20,8 @@
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -61,3 +63,84 @@ func TestEasyAck3(t *testing.T) {
|
|||||||
t.Errorf("the second Ack did not arrive in time")
|
t.Errorf("the second Ack did not arrive in time")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ExampleSubscribeSync() {
|
||||||
|
fmt.Println("hello")
|
||||||
|
|
||||||
|
x := &SubscribedSignal{}
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
ready := &sync.WaitGroup{}
|
||||||
|
|
||||||
|
// unit1
|
||||||
|
wg.Add(1)
|
||||||
|
ready.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
ch, ack := x.Subscribe()
|
||||||
|
ready.Done()
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
fmt.Println("got signal")
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second) // wait a bit for fun
|
||||||
|
fmt.Println("(1) sending ack...")
|
||||||
|
ack() // must call ack
|
||||||
|
fmt.Println("done sending ack")
|
||||||
|
}()
|
||||||
|
|
||||||
|
// unit2
|
||||||
|
wg.Add(1)
|
||||||
|
ready.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
ch, ack := x.Subscribe()
|
||||||
|
ready.Done()
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
fmt.Println("got signal")
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second) // wait a bit for fun
|
||||||
|
fmt.Println("(2) sending ack...")
|
||||||
|
ack() // must call ack
|
||||||
|
fmt.Println("done sending ack")
|
||||||
|
}()
|
||||||
|
|
||||||
|
// unit3
|
||||||
|
wg.Add(1)
|
||||||
|
ready.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
ch, ack := x.Subscribe()
|
||||||
|
ready.Done()
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
fmt.Println("got signal")
|
||||||
|
}
|
||||||
|
time.Sleep(3 * time.Second) // wait a bit for fun
|
||||||
|
fmt.Println("(3) sending ack...")
|
||||||
|
ack() // must call ack
|
||||||
|
fmt.Println("done sending ack")
|
||||||
|
}()
|
||||||
|
|
||||||
|
ready.Wait() // wait for all subscribes
|
||||||
|
fmt.Println("sending signal...")
|
||||||
|
x.Send() // trigger!
|
||||||
|
fmt.Println("done sending signal")
|
||||||
|
|
||||||
|
wg.Wait() // wait for everyone to exit
|
||||||
|
fmt.Println("exiting...")
|
||||||
|
|
||||||
|
// Output: hello
|
||||||
|
// sending signal...
|
||||||
|
// got signal
|
||||||
|
// got signal
|
||||||
|
// got signal
|
||||||
|
// (1) sending ack...
|
||||||
|
// (2) sending ack...
|
||||||
|
// (3) sending ack...
|
||||||
|
// done sending signal
|
||||||
|
// done sending ack
|
||||||
|
// done sending ack
|
||||||
|
// done sending ack
|
||||||
|
// exiting...
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user