util: Add a new sync primitive named brs

This adds the BoundedReadSemaphore mutex that I invented. It's not that
is necessarily particularly revolutionary, but it is useful. I wish I
had a better name for it, but I couldn't think of anything. It's fairly
obvious what it does, so if you have a suggestion of how to name it,
please do so!
This commit is contained in:
James Shubin
2023-07-27 17:42:48 -04:00
parent 514927c0b3
commit 5e0922395c
2 changed files with 183 additions and 0 deletions

View File

@@ -229,3 +229,82 @@ func (obj *SubscribedSignal) Send() {
// release (re-use the above mutex)
}
// BoundedReadSemaphore is a mutex that allows multiple Lock operations to occur
// concurrently, as if they were read locks. The distinction is that for the
// first Lock operation to complete, the Start() channel state must be read. At
// this point subsequent Lock operations will succeed. The End state completes
// once the last paired Unlock operation is run. The cycle can be repeated
// without needing to re-initialize the struct. Each Lock or Unlock operation
// itself contains a call to Lock an internal mutex for accounting and
// implementation purposes.
//
// This was previously named SharedMutex. We welcome alternate naming
// suggestions.
type BoundedReadSemaphore struct {
// mutex locks individual operations on our struct.
mutex *sync.Mutex
// start is the Start signal channel.
start chan struct{}
// end is the End signal channel.
end chan struct{}
// count keeps track of the number of active lockers.
count int64
}
// NewBoundedReadSemaphore creates an initialized object. This must be done
// before first use, as the empty struct is not a valid BoundedReadSemaphore.
func NewBoundedReadSemaphore() *BoundedReadSemaphore {
return &BoundedReadSemaphore{
mutex: &sync.Mutex{},
start: make(chan struct{}),
end: make(chan struct{}),
}
}
// Lock asks for a lock on this mutex. After the first Lock call synchronizes,
// subsequent calls will succeed quickly. The first "synchronization" waits for
// the lock start signal, which is a receive on the Start channel.
func (obj *BoundedReadSemaphore) Lock() {
obj.mutex.Lock()
defer obj.mutex.Unlock()
obj.count++
if obj.count == 1 { // we're the first lock (the mutex guarantees this)
obj.start <- struct{}{} // lock start
}
}
// Unlock unlocks from a previous Lock operation. If this call is the last
// paired unlock operation, it blocks until the End() signal is synchronized. If
// you unlock more times than you lock, then you will cause a panic.
func (obj *BoundedReadSemaphore) Unlock() {
obj.mutex.Lock()
defer obj.mutex.Unlock()
if obj.count == 0 {
panic("negative BoundedReadSemaphore counter")
}
obj.count--
if obj.count == 0 { // we're the last unlock (the mutex guarantees this)
obj.end <- struct{}{} // lock end
}
}
// Start will return a single signal on this output channel to advise that we've
// started the "locked" state.
func (obj *BoundedReadSemaphore) Start() <-chan struct{} {
return obj.start
}
// End will return a single signal on this output channel to advise that we've
// returned to the "unlocked" state.
func (obj *BoundedReadSemaphore) End() <-chan struct{} {
return obj.end
}

View File

@@ -167,3 +167,107 @@ func ExampleSubscribedSignal() {
// done sending signal
// exiting...
}
func ExampleBoundedReadSemaphore() {
fmt.Printf("hello\n")
defer fmt.Printf("goodbye\n")
wg := &sync.WaitGroup{}
defer wg.Wait()
ch := make(chan struct{}) // close signal
brs := NewBoundedReadSemaphore()
wg.Add(1)
go func() {
defer wg.Done()
brs.Lock()
defer brs.Unlock()
time.Sleep(100 * time.Millisecond) // delay for consistent print
fmt.Printf("#1 is in the locked zone\n")
time.Sleep(1 * time.Second)
}()
wg.Add(1)
go func() {
defer wg.Done()
brs.Lock()
defer brs.Unlock()
time.Sleep(200 * time.Millisecond) // delay for consistent print
fmt.Printf("#2 is in the locked zone\n")
time.Sleep(2 * time.Second)
}()
wg.Add(1)
go func() {
defer wg.Done()
brs.Lock()
defer brs.Unlock()
time.Sleep(300 * time.Millisecond) // delay for consistent print
fmt.Printf("#3 is in the locked zone\n")
time.Sleep(3 * time.Second)
}()
wg.Add(1)
go func() {
defer wg.Done()
defer close(ch) // exit signal
max := 2 // configure me
for {
if max == 0 {
break
}
max--
time.Sleep(4 * time.Second)
brs.Lock()
time.Sleep(100 * time.Millisecond) // delay for consistent print
fmt.Printf("#4 is in the locked zone\n")
brs.Unlock()
time.Sleep(100 * time.Millisecond) // delay for consistent print
fmt.Printf("#4 is in the unlocked zone\n")
}
}()
Loop:
for {
select {
case <-ch: // exit signal
break Loop
case <-brs.Start(): // An empty value is received to start the locking.
fmt.Printf("shared mutex start\n")
}
// subsequent Lock's that happen when at least one Lock is
// already held are permitted...
time.Sleep(1 * time.Second)
// something happens here
select {
case <-brs.End(): // An empty values is received when the last Unlock happens.
fmt.Printf("shared mutex end\n")
}
}
// Output: hello
// shared mutex start
// #1 is in the locked zone
// #2 is in the locked zone
// #3 is in the locked zone
// shared mutex end
// shared mutex start
// #4 is in the locked zone
// shared mutex end
// #4 is in the unlocked zone
// shared mutex start
// #4 is in the locked zone
// shared mutex end
// #4 is in the unlocked zone
// goodbye
}