diff --git a/util/sync.go b/util/sync.go index 4e667896..f01ddfe7 100644 --- a/util/sync.go +++ b/util/sync.go @@ -134,3 +134,69 @@ func (obj *EasyExit) Error() error { obj.wg.Wait() // wait for cleanup return obj.err } + +// SubscribedSignal represents a synchronized read signal. It doesn't need to be +// instantiated before it can be used. It must not be copied after first use. It +// is equivalent to receiving a multicast signal from a closing channel, except +// that it must be acknowledged by every reader of the signal, and once this is +// done, it is reset and can be re-used. Readers must obtain a handle to the +// signal with the Subscribe method, and the signal is sent out with the Done +// method. +type SubscribedSignal struct { + wg sync.WaitGroup + exit chan struct{} + mutex sync.RWMutex +} + +// Subscribe is used by any reader of the signal. Once this function returns, it +// means that you're now ready to watch the signal. The signal can be watched as +// is done normally with any other ready channel. Once you have received the +// signal or when you are no longer interested in the signal you *must* call the +// cancel/ack function which is returned by this function on subscribe. If you +// do not, you will block the Send portion of this subscribed signal +// indefinitely. This is thread safe and can be called multiple times in +// parallel because this call is protected by a mutex. The mutex also prevents +// simultaneous calls with the Send method. the returned cancel/ack method must +// return before it's safe to call this method a subsequent time for a new +// signal. One important note: there is a possible race that *you* can cause if +// you race this Subscribe call, with the Send call. Make sure you run Subscribe +// and it returns *before* you run Send if you want to be sure to receive the +// next signal. This should be common sense but it is mentioned here to be +// helpful. They are protected by a lock, so they can't both run simultaneously. +func (obj *SubscribedSignal) Subscribe() (<-chan struct{}, func()) { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + if obj.exit == nil { // initialize on first use (safe b/c we use a lock) + obj.exit = make(chan struct{}) // initialize + } + + obj.wg.Add(1) + return obj.exit, func() { // cancel/ack function + obj.wg.Done() + + // wait for the reset signal before proceeding + obj.mutex.RLock() + defer obj.mutex.RUnlock() + } +} + +// Send is called if you want to multicast the signal to all subscribed parties. +// It will require all parties to acknowledge the receipt of the signal before +// it will unblock. Just before returning, it will reset the signal so that it +// can be called a subsequent time. This is thread safe and can be called +// multiple times in parallel because this call is protected by a mutex. The +// mutex also prevents simultaneous calls with the Subscribe method. +func (obj *SubscribedSignal) Send() { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + if obj.exit != nil { // in case we Send before anyone runs Subscribe + close(obj.exit) // send the close signal + } + obj.wg.Wait() // wait for everyone to ack + + obj.exit = make(chan struct{}) // reset + + // release (re-use the above mutex) +} diff --git a/util/sync_test.go b/util/sync_test.go index aa3e2c8b..7aa43ad8 100644 --- a/util/sync_test.go +++ b/util/sync_test.go @@ -20,6 +20,8 @@ package util import ( + "fmt" + "sync" "testing" "time" ) @@ -61,3 +63,84 @@ func TestEasyAck3(t *testing.T) { t.Errorf("the second Ack did not arrive in time") } } + +func ExampleSubscribeSync() { + fmt.Println("hello") + + x := &SubscribedSignal{} + wg := &sync.WaitGroup{} + ready := &sync.WaitGroup{} + + // unit1 + wg.Add(1) + ready.Add(1) + go func() { + defer wg.Done() + ch, ack := x.Subscribe() + ready.Done() + select { + case <-ch: + fmt.Println("got signal") + } + time.Sleep(1 * time.Second) // wait a bit for fun + fmt.Println("(1) sending ack...") + ack() // must call ack + fmt.Println("done sending ack") + }() + + // unit2 + wg.Add(1) + ready.Add(1) + go func() { + defer wg.Done() + ch, ack := x.Subscribe() + ready.Done() + select { + case <-ch: + fmt.Println("got signal") + } + time.Sleep(2 * time.Second) // wait a bit for fun + fmt.Println("(2) sending ack...") + ack() // must call ack + fmt.Println("done sending ack") + }() + + // unit3 + wg.Add(1) + ready.Add(1) + go func() { + defer wg.Done() + ch, ack := x.Subscribe() + ready.Done() + select { + case <-ch: + fmt.Println("got signal") + } + time.Sleep(3 * time.Second) // wait a bit for fun + fmt.Println("(3) sending ack...") + ack() // must call ack + fmt.Println("done sending ack") + }() + + ready.Wait() // wait for all subscribes + fmt.Println("sending signal...") + x.Send() // trigger! + fmt.Println("done sending signal") + + wg.Wait() // wait for everyone to exit + fmt.Println("exiting...") + + // Output: hello + // sending signal... + // got signal + // got signal + // got signal + // (1) sending ack... + // (2) sending ack... + // (3) sending ack... + // done sending signal + // done sending ack + // done sending ack + // done sending ack + // exiting... +}