RWMutex

使用场景

读写锁适用于读多写少的场景。基础特性包括:

  • RLock被reader占用时,Lock(/WLock)会被block。反之依然
  • RLock被占用时,其他读线程也可以获取到RLock。即允许多个reader同时访问共享资源
  • WLock同一时间只能被一个routine/thread获取

根据不同的优先级策略,可以有不同的实现方案:

  • 读优先(Read-preferring RW locks)
    读优先允许最大量的并发,但是竞争激烈时可能导致write-starvation。因为读多写少的场景下,如果读请求的优先级更高,写锁获取成功的概率就更低了。

  • 写优先(Write-preferring RW locks)
    为了避免write-starvation的情况出现,给WLock的优先级更高,如果RLock已经被占用,那么WLock也将会在block中, 此时后续的新到的RLock调用也将被block,直到RLock被释放,WLock被writer获取成功后释放。
    对比读优先,所允许的并发量相对要小一些。且实现更为复杂,性能也相对也要差一些。

  • 总结:A good way to think about it is RWMutex is a Mutex with a reader counter. RLock increments the counter while RUnlock decrements it. A call to Lock will block as long as that counter is > 0.

实现原理

一些大致的实现思路

思路一

一个写锁(mutex)保证writer互斥
一个读锁和计数器实现读锁。读锁用户维护计数器的准确性。计数器用于多个reader计数

读优先的实现:

//读锁
RLock()
r.lock()
r_cnt++
if r_cnt==1: // 读写互斥
w.lock()
r.unlock()

RUnlock()
r.lock()
r_cnt--
if r_cnt==0:
w.unlock()
r.unlock()

//写锁
Lock()
w.lock()

Unlock()
w.unlock()

思路二

使用锁和条件变量

RLock()
m.lock()
while w: // wLock使用中,block到被唤醒
c.wait(m)
r++
m.unlock()

RUnlock()
m.lock()
r--
if r==0:
c.signal_all() // 唤醒writer
m.unlock()

Lock()
m.lock()
while w || r > 0 : // 有reader,block到被唤醒
c.wait(m)
w = true
m.unlock()

Unlock()
m.lock()
w = false
c.signal_all() // 唤醒reader或writer
m.unlock()

golang的实现

为避免writer被饿死,RWMutex中至少需要记录三个信息:

  • 使用中的reader计数器:a
  • writer请求中/使用中的标志:b
  • 挂起在writer之后的reader:c

golang用两个变量包括了以上三部分信息,具体见golang的实现(忽略race部分)

type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers. writer等待reader完成的信号量
readerSem uint32 // semaphore for readers to wait for completing writers. reader等待writer完成的信号量

// 计数器承担两个作用,一个是用来记录reader的个数,另一个作用用作判断是否有writer挂起或者在占有锁
readerCount int32 // number of pending readers. 挂起的reader和使用中的reader,如果有writer操作(挂起/占有),计数器会变为负数以示区分
readerWait int32 // number of departing readers. 使用中的reader
}

func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}

// 没有writer时只需执行+1操作.
// 如果有writer调用,计数器会被置为负数。此时reader被block,等待writer unlock后唤醒
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}

if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}

// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}

// 计数器小于0,表示有writer在等待
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
// 使用中的reader已全部释放,唤醒等待中的writer
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
if race.Enabled {
race.Enable()
}
}

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}

// First, resolve competition with other writers.
rw.w.Lock()

// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) an RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Release(unsafe.Pointer(&rw.writerSem))
race.Disable()
}

// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}

参考