go RWMutex的实现示例

目录
  • Overview
  • RWMutex的结构
  • Lock
  • Unlock
  • RLock
  • RUnlock
  • Q1:多个协程并发拿读锁,如何保证这些读锁协程都不会被阻塞?
  • Q2:多个协程并发拿写锁,如何保证只会有一个协程拿到写锁?
  • Q3:在读锁被拿到的情况下,新协程拿写锁,如果保证写锁现成会被阻塞?
  • Q4:在读锁被拿到的情况下,新协程拿写锁被阻塞,当旧有的读锁协程全部释放,如何唤醒等待的写锁协程
  • Q5:在写锁被拿到的情况下,新协程拿读锁,如何让新协程被阻塞?
  • Q6:在写锁被拿到的情况下,新协程拿读锁,写锁协程释放,如何唤醒等待的读锁协程?
  • Q7:在写锁被拿到的情况下,有两个协程分别去抢读锁和写锁,当写锁被释放时,这两个协程谁会胜利?
  • 认为写的比较巧妙的两个点

Overview

go 里面的 rwlock 是 write preferred 的,可以避免写锁饥饿。

读锁和写锁按照先来后到的规则持有锁,一旦有协程持有了写锁,后面的协程只能在写锁被释放后才能得到读锁。

同样,一旦有 >= 1 个协程写到了读锁,只有等这些读锁全部释放后,后面的协程才能拿到写锁。

下面了解一下 Go 的 RWMutex 是如何实现的吧,下面的代码取自 go1.17.2/src/sync/rwmutex.go,并删减了 race 相关的代码。

PS: rwmutex 的代码挺短的,其实读源码也没那么可怕...

RWMutex 的结构

RWMutex 总体上是通过: 普通锁和条件变量来实现的

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}

Lock

func (rw *RWMutex) Lock() {
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

Unlock

const rwmutexMaxReaders = 1 << 30

func (rw *RWMutex) Unlock() {
	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
}

RLock

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

RUnlock

func (rw *RWMutex) RUnlock() {
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

Q1: 多个协程并发拿读锁,如何保证这些读锁协程都不会被阻塞?

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

拿读锁时,仅仅会增加 readerCount,因此读锁之间是可以正常并发的

Q2: 多个协程并发拿写锁,如何保证只会有一个协程拿到写锁?

func (rw *RWMutex) Lock() {
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

拿写锁时,会获取 w.Lock,自然能保证同一时间只会有一把写锁

Q3: 在读锁被拿到的情况下,新协程拿写锁,如果保证写锁现成会被阻塞?

func (rw *RWMutex) Lock() {
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

假设此时有 5 个协程拿到读锁,则 readerCount = 5,假设 rwmutexMaxReaders = 100。

此时有一个新的协程 w1 想要拿写锁。

在执行

r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

后, rw.readerCount = -95,r = 5。

在执行

atomic.AddInt32(&rw.readerWait, r)

后,rw.readerWait = 5。

readerWait 记录了在获取写锁的这一瞬间有多少个协程持有读锁。这一瞬间之后,就算有新的协程尝试获取读锁,也只会增加 readerCount ,而不会动到 readerWait。

之后执行 runtime_SemacquireMutex() 睡在了 writerSem 这个信号量上面。

Q4: 在读锁被拿到的情况下,新协程拿写锁被阻塞,当旧有的读锁协程全部释放,如何唤醒等待的写锁协程

func (rw *RWMutex) RUnlock() {
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

继续上一步的场景,每当执行 RUnlock 时,readerCount 都会减去1。当 readerCount 为负数时,意味着有协程正在持有或者正在等待持有写锁。

之前的五个读协程中的四个,每次 RUnlock() 之后,readerCount = -95 - 4 = -99,readerWait = 5 - 4 = 1。

当最后一个读协程调用 RUnlock() 之后,readerCount 变成了 -100,readerWait 变成 0,此时会唤醒在 writerSem 上沉睡的协程 w1。

Q5: 在写锁被拿到的情况下,新协程拿读锁,如何让新协程被阻塞?

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

继续上面的场景,readerCount = -100 + 1 = -99 < 0。

新的读协程 r1 被沉睡在 readerSem 下面。

假设此时再来一个读协程 r2,则 readerCount = -98,依旧沉睡。

Q6: 在写锁被拿到的情况下,新协程拿读锁,写锁协程释放,如何唤醒等待的读锁协程?

继续上面的场景,此时协程 w1 释放写锁

func (rw *RWMutex) Unlock() {
	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&amp;rw.readerCount, rwmutexMaxReaders)
	// Unblock blocked readers, if any.
	for i := 0; i &lt; int(r); i++ {
		runtime_Semrelease(&amp;rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
}

在执行

atomic.AddInt32(&amp;rw.readerCount, rwmutexMaxReaders)

后,r = readerCount = -98 + 100 = 2,代表此时有两个读协程 r1 和 r2 在等待

ps: 如果此时有一些新的协程想要拿读锁,他会因为 readerCount = 2 + 1 = 3 > 0 而顺利执行下去,不会被阻塞

之后 for 循环执行两次,将协程 r1 和 协程 r2 都唤醒了。

Q7: 在写锁被拿到的情况下,有两个协程分别去抢读锁和写锁,当写锁被释放时,这两个协程谁会胜利?

func (rw *RWMutex) Unlock() {
	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&amp;rw.readerCount, rwmutexMaxReaders)
	// Unblock blocked readers, if any.
	for i := 0; i &lt; int(r); i++ {
		runtime_Semrelease(&amp;rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
}

由于是先唤醒读锁,再调用 w.Unlock() ,因此肯定是读协程先胜利!

认为写的比较巧妙的两个点

  • readerCount 与 rwmutexMaxReaders 的纠缠

    通过 readerCount + rwmutexMaxReaders 以及 readerCount - rwmutexMaxReaders 这两个操作可以得知当前是否有协程等待/持有写锁以及当前等待/持有读锁的协程数量

  • readerCount 与 readerWait 的纠缠

    在 Lock() 时直接将 readerCount 的值赋给 readerWait,在 readerWait = 0 而非 readerCount = 0 是唤醒写协程,可以避免在 Lock() 后来达到的读协程先于写协程被执行。

到此这篇关于go RWMutex的实现示例的文章就介绍到这了,更多相关go RWMutex内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解golang RWMutex读写互斥锁源码分析

    针对Golang 1.9的sync.RWMutex进行分析,与Golang 1.10基本一样除了将panic改为了throw之外其他的都一样. RWMutex是读写互斥锁.锁可以由任意数量的读取器或单个写入器来保持. RWMutex的零值是一个解锁的互斥锁. 以下代码均去除race竞态检测代码 源代码位置:sync\rwmutex.go 结构体 type RWMutex struct { w Mutex // 互斥锁 writerSem uint32 // 写锁信号量 readerSem uin

  • Go语言并发编程之互斥锁Mutex和读写锁RWMutex

    目录 一.互斥锁Mutex 1.Mutex介绍 2.Mutex使用实例 二.读写锁RWMutex 1.RWMutex介绍 2.RWMutex使用实例 在并发编程中,多个Goroutine访问同一块内存资源时可能会出现竞态条件,我们需要在临界区中使用适当的同步操作来以避免竞态条件.Go 语言中提供了很多同步工具,本文将介绍互斥锁Mutex和读写锁RWMutex的使用方法. 一.互斥锁Mutex 1.Mutex介绍 Go 语言的同步工具主要由 sync 包提供,互斥锁 (Mutex) 与读写锁 (R

  • go RWMutex的实现示例

    目录 Overview RWMutex的结构 Lock Unlock RLock RUnlock Q1:多个协程并发拿读锁,如何保证这些读锁协程都不会被阻塞? Q2:多个协程并发拿写锁,如何保证只会有一个协程拿到写锁? Q3:在读锁被拿到的情况下,新协程拿写锁,如果保证写锁现成会被阻塞? Q4:在读锁被拿到的情况下,新协程拿写锁被阻塞,当旧有的读锁协程全部释放,如何唤醒等待的写锁协程 Q5:在写锁被拿到的情况下,新协程拿读锁,如何让新协程被阻塞? Q6:在写锁被拿到的情况下,新协程拿读锁,写锁协

  • GO语言协程互斥锁Mutex和读写锁RWMutex用法实例详解

    sync.Mutex Go中使用sync.Mutex类型实现mutex(排他锁.互斥锁).在源代码的sync/mutex.go文件中,有如下定义: // A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 sem

  • golang并发安全及读写互斥锁的示例分析

    目录 并发安全和锁 互斥锁 读写互斥锁 并发安全和锁 有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态).类比现实生活中的例子有十字路口被各个方向的的汽车竞争:还有火车上的卫生间被车厢里的人竞争. 举个例子: var x int64 var wg sync.WaitGroup func add() { for i := 0; i < 5000; i++ { x = x + 1 } wg.Done() } func main() { w

  • Go读写锁操作方法示例详解

    目录 引言 读写锁有很多方法 读操作 写操作 引言 前面讲到,在资源竞争的时候可以使用互斥锁,保证了资源访问的唯一性,但也降低了性能,仔细分析一下场景,如果只是读取数据,无论多少个goroutine都是不会存在逻辑上的互斥操作的. 这里读写锁 RWMutex就应运而生了,RWMutex可以分别针对读操作和写操作进行上锁和解锁. RWMutex同一时刻允许多个读操作进行,但只允许一个写操作进行,同时,在某一个写操作进行的时候,读操作不可进行. 读写锁有很多方法 方法一: RLock 这个方法是读锁

  • Golang信号量设计实现示例详解

    目录 开篇 信号量 semaphore 扩展库实现 Acquire Release TryAcquire 总结 开篇 在我们此前的文章 Golang Mutex 原理解析 中曾提到过,Mutex 的底层结构包含了两个字段,state 和 sema: type Mutex struct { state int32 sema uint32 } state 代表互斥锁的状态,比如是否被锁定: sema 表示信号量,协程阻塞会等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程. 这个 sema

  • GO中sync包自由控制并发示例详解

    目录 资源竞争 sync.Mutex sync.RWMutex sync.WaitGroup sync.Once sync.Cond 资源竞争 channel 常用于并发通信,要保证并发安全,主要使用互斥锁.在并发的过程中,当一个内存被多个 goroutine 同时访问时,就会产生资源竞争的情况.这块内存也可以称为共享资源. 并发时对于共享资源必然会出现抢占资源的情况,如果是对某资源的统计,很可能就会导致结果错误.为保证只有一个协程拿到资源并操作它,可以引入互斥锁 sync.Mutex. syn

  • Golang设计模式工厂模式实战写法示例详解

    目录 拆出主板 工厂模式流程 代码实战 抽象能力,定义接口 实现工厂,支持注册和获取实现 主流程只依赖接口完成 扩展 => 适配器,实现接口 注册适配器到工厂里 小结 拆出主板 今天带大家看一下怎么用 Go 写工厂模式的代码,我们来学习一个实战案例.这个写法笔者日常经常使用,能够很有效地帮助大家实现 Separation of Concerns. 主板就是一个程序的主流程.比如我们要基于一份学习资料来消化,吸收知识.我们可能有下面几步流程: 准备好笔记本: 打开资料: 阅读资料内容,思考并记录关

  • Go语言读写锁RWMutex的源码分析

    目录 前言 RWMutex 总览 深入源码 数据结构 RLock() RUnlock() Lock() Unlock() 常见问题 实战一下 前言 在前面两篇文章中 初见 Go Mutex .Go Mutex 源码详解,我们学习了 Go语言 中的 Mutex,它是一把互斥锁,每次只允许一个 goroutine 进入临界区,可以保证临界区资源的状态正确性.但是有的情况下,并不是所有 goroutine 都会修改临界区状态,可能只是读取临界区的数据,如果此时还是需要每个 goroutine 拿到锁依

  • GO的锁和原子操作的示例详解

    目录 GO的锁和原子操作分享 锁是什么 锁是用来做什么的 互斥锁 互斥锁 - 解决问题 读写锁 我们先来写一个读写锁的DEMO 自旋锁和互斥锁的区别 如何选择锁 啥是原子操作 总结 GO的锁和原子操作分享 上次我们说到协程,我们再来回顾一下: 协程类似线程,是一种更为轻量级的调度单位 线程是系统级实现的,常见的调度方法是时间片轮转法 协程是应用软件级实现,原理与线程类似 协程的调度基于 GPM 模型实现 要是对协程的使用感兴趣的话,可以看看这篇文章简单了解一下瞅一眼就会使用GO的并发编程分享 今

  • go-cache的基本使用场景示例解析

    目录 什么是 go-cache 使用 导入 快速开始 常量与结构体 常量 结构体 Set() Get() 删除 其他 备份恢复数据 什么是 go-cache go-cache 是一个轻量级的基于内存的 K-V 储存组件,内部实现了一个线程安全的 map[string]interface{},适用于单机应用.具备如下功能: 线程安全,多 goroutine 并发安全访问: 每个 item 可以设置过期时间(或无过期时间): 自动定期清理过期的 item: 可以自定义清理回调函数: 这里的 item

随机推荐