diff --git a/util/sync.go b/util/sync.go index 9d5a66ee..0ceeebe0 100644 --- a/util/sync.go +++ b/util/sync.go @@ -229,3 +229,82 @@ func (obj *SubscribedSignal) Send() { // release (re-use the above mutex) } + +// BoundedReadSemaphore is a mutex that allows multiple Lock operations to occur +// concurrently, as if they were read locks. The distinction is that for the +// first Lock operation to complete, the Start() channel state must be read. At +// this point subsequent Lock operations will succeed. The End state completes +// once the last paired Unlock operation is run. The cycle can be repeated +// without needing to re-initialize the struct. Each Lock or Unlock operation +// itself contains a call to Lock an internal mutex for accounting and +// implementation purposes. +// +// This was previously named SharedMutex. We welcome alternate naming +// suggestions. +type BoundedReadSemaphore struct { + // mutex locks individual operations on our struct. + mutex *sync.Mutex + + // start is the Start signal channel. + start chan struct{} + + // end is the End signal channel. + end chan struct{} + + // count keeps track of the number of active lockers. + count int64 +} + +// NewBoundedReadSemaphore creates an initialized object. This must be done +// before first use, as the empty struct is not a valid BoundedReadSemaphore. +func NewBoundedReadSemaphore() *BoundedReadSemaphore { + return &BoundedReadSemaphore{ + mutex: &sync.Mutex{}, + start: make(chan struct{}), + end: make(chan struct{}), + } +} + +// Lock asks for a lock on this mutex. After the first Lock call synchronizes, +// subsequent calls will succeed quickly. The first "synchronization" waits for +// the lock start signal, which is a receive on the Start channel. +func (obj *BoundedReadSemaphore) Lock() { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + obj.count++ + + if obj.count == 1 { // we're the first lock (the mutex guarantees this) + obj.start <- struct{}{} // lock start + } +} + +// Unlock unlocks from a previous Lock operation. If this call is the last +// paired unlock operation, it blocks until the End() signal is synchronized. If +// you unlock more times than you lock, then you will cause a panic. +func (obj *BoundedReadSemaphore) Unlock() { + obj.mutex.Lock() + defer obj.mutex.Unlock() + + if obj.count == 0 { + panic("negative BoundedReadSemaphore counter") + } + + obj.count-- + + if obj.count == 0 { // we're the last unlock (the mutex guarantees this) + obj.end <- struct{}{} // lock end + } +} + +// Start will return a single signal on this output channel to advise that we've +// started the "locked" state. +func (obj *BoundedReadSemaphore) Start() <-chan struct{} { + return obj.start +} + +// End will return a single signal on this output channel to advise that we've +// returned to the "unlocked" state. +func (obj *BoundedReadSemaphore) End() <-chan struct{} { + return obj.end +} diff --git a/util/sync_test.go b/util/sync_test.go index e00022bf..d594ae90 100644 --- a/util/sync_test.go +++ b/util/sync_test.go @@ -167,3 +167,107 @@ func ExampleSubscribedSignal() { // done sending signal // exiting... } + +func ExampleBoundedReadSemaphore() { + fmt.Printf("hello\n") + defer fmt.Printf("goodbye\n") + + wg := &sync.WaitGroup{} + defer wg.Wait() + + ch := make(chan struct{}) // close signal + + brs := NewBoundedReadSemaphore() + + wg.Add(1) + go func() { + defer wg.Done() + brs.Lock() + defer brs.Unlock() + time.Sleep(100 * time.Millisecond) // delay for consistent print + + fmt.Printf("#1 is in the locked zone\n") + time.Sleep(1 * time.Second) + }() + + wg.Add(1) + go func() { + defer wg.Done() + brs.Lock() + defer brs.Unlock() + time.Sleep(200 * time.Millisecond) // delay for consistent print + + fmt.Printf("#2 is in the locked zone\n") + time.Sleep(2 * time.Second) + }() + + wg.Add(1) + go func() { + defer wg.Done() + brs.Lock() + defer brs.Unlock() + time.Sleep(300 * time.Millisecond) // delay for consistent print + + fmt.Printf("#3 is in the locked zone\n") + time.Sleep(3 * time.Second) + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer close(ch) // exit signal + max := 2 // configure me + for { + if max == 0 { + break + } + max-- + time.Sleep(4 * time.Second) + brs.Lock() + time.Sleep(100 * time.Millisecond) // delay for consistent print + fmt.Printf("#4 is in the locked zone\n") + + brs.Unlock() + time.Sleep(100 * time.Millisecond) // delay for consistent print + fmt.Printf("#4 is in the unlocked zone\n") + } + }() + +Loop: + for { + select { + case <-ch: // exit signal + break Loop + + case <-brs.Start(): // An empty value is received to start the locking. + fmt.Printf("shared mutex start\n") + } + + // subsequent Lock's that happen when at least one Lock is + // already held are permitted... + time.Sleep(1 * time.Second) + + // something happens here + + select { + case <-brs.End(): // An empty values is received when the last Unlock happens. + fmt.Printf("shared mutex end\n") + } + } + + // Output: hello + // shared mutex start + // #1 is in the locked zone + // #2 is in the locked zone + // #3 is in the locked zone + // shared mutex end + // shared mutex start + // #4 is in the locked zone + // shared mutex end + // #4 is in the unlocked zone + // shared mutex start + // #4 is in the locked zone + // shared mutex end + // #4 is in the unlocked zone + // goodbye +}