总结:A good way to think about it is that an RWMutex is a Mutex with a reader counter: RLock increments the counter and RUnlock decrements it, and a call to Lock blocks for as long as that counter is > 0.
// Read lock.
// RLock registers one more reader. r guards the reader counter r_cnt.
RLock()
    r.lock()
    r_cnt++
    if r_cnt==1: // readers and writers exclude each other: the first reader takes the write lock
        w.lock()
    r.unlock()
// RUnlock removes one reader. r guards the reader counter r_cnt.
RUnlock()
    r.lock()
    r_cnt--
    if r_cnt==0: // the last reader to leave releases the write lock
        w.unlock()
    r.unlock()
// Write lock.
// Lock acquires w, the same lock the first reader takes in RLock, so it
// blocks while any reader or another writer holds it.
Lock()
    w.lock()
// Unlock releases the write lock w.
Unlock()
    w.unlock()
思路二
使用锁和条件变量
// RLock waits until no writer holds the lock, then counts itself in r.
// m guards w and r; c is the condition variable used with m.
RLock()
    m.lock()
    while w: // the write lock is in use: block until woken
        c.wait(m)
    r++
    m.unlock()
// RUnlock removes one reader from r; the last reader to leave wakes all
// waiters on c.
RUnlock()
    m.lock()
    r--
    if r==0:
        c.signal_all() // wake up a waiting writer
    m.unlock()
// Lock waits until there is neither an active writer nor any reader, then
// sets the writer flag w.
Lock()
    m.lock()
    while w || r > 0 : // a writer or some readers are active: block until woken
        c.wait(m)
    w = true
    m.unlock()
// Unlock clears the writer flag w and wakes all waiters on c.
Unlock()
    m.lock()
    w = false
    c.signal_all() // wake up waiting readers or writers
    m.unlock()
golang的实现
为避免writer被饿死,RWMutex中至少需要记录三个信息:
使用中的reader计数器:a
writer请求中/使用中的标志:b
挂起在writer之后的reader:c
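golang用两个变量(readerCount 和 readerWait)包括了以上三部分信息:readerCount 同时承担 b 和 c(writer 请求/占有时被置为负数,并继续累计之后到来的 reader),readerWait 对应 a(writer 仍需等待的使用中的 reader)。具体见golang的实现(忽略race部分)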
// RWMutex is a reader/writer lock built from a writer Mutex, two semaphores
// and two counters that are updated atomically.
type RWMutex struct {
	w         Mutex  // held if there are pending writers
	writerSem uint32 // semaphore for writers to wait for completing readers
	readerSem uint32 // semaphore for readers to wait for completing writers

	// readerCount plays two roles: it records the number of readers, and its
	// sign tells whether a writer is pending or holding the lock. It counts
	// both blocked and active readers; Lock subtracts rwmutexMaxReaders from
	// it, so the value is negative for as long as a writer is involved.
	readerCount int32 // number of pending readers
	// readerWait is the number of active readers a pending writer still has
	// to wait for; the reader that brings it to zero wakes the writer.
	readerWait int32 // number of departing readers
}
// 没有writer时只需执行+1操作. // 如果有writer调用,计数器会被置为负数。此时reader被block,等待writer unlock后唤醒 if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_Semacquire(&rw.readerSem) }
if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
	// A negative counter after the decrement means a writer is waiting.
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// r+1 is the value before the decrement. 0 means no reader held the
		// lock and no writer was pending; -rwmutexMaxReaders means no reader
		// held the lock while a writer was pending. Either way there was
		// nothing to unlock.
		if r+1 == 0 || r+1 == -rwmutexMaxReaders {
			race.Enable()
			throw("sync: RUnlock of unlocked RWMutex")
		}
		// A writer is pending.
		// Once every active reader has released the lock, wake the waiting
		// writer.
		if atomic.AddInt32(&rw.readerWait, -1) == 0 {
			// The last reader unblocks the writer.
			runtime_Semrelease(&rw.writerSem, false)
		}
	}
	if race.Enabled {
		race.Enable()
	}
}
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	// Subtracting rwmutexMaxReaders drives readerCount negative; adding it
	// back to the result gives r, the reader count at the moment of the
	// announcement.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	// RUnlock decrements readerWait once it sees the negative readerCount,
	// possibly before r is added here, so sleep only if the sum is non-zero.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_Semacquire(&rw.writerSem)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) an RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Release(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
	// Announce to readers there is no active writer.
	// Adding rwmutexMaxReaders back undoes the subtraction made in Lock; r is
	// then the number of readers that arrived while the writer was involved.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	// A result of rwmutexMaxReaders or more means readerCount was not
	// negative beforehand, i.e. no writer had announced itself.
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	// readerSem is released once for each of the r readers counted above.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}