go并发编程sync.Cond使用场景及实现原理

目录
  • 使用场景
  • 实现原理
    • copyChecker
    • Wait
    • Signal
    • Broadcast

使用场景

sync.Cond是go标准库提供的一个条件变量,用于控制一组goroutine在满足特定条件下被唤醒。

sync.Cond常用于一组goroutine等待,一个goroutine通知(事件发生)的场景。如果只有一个goroutine等待,一个goroutine通知(事件发生),使用Mutex或者Channel就可以实现。

可以用一个全局变量标志特定条件condition,每个sync.Cond都必须要关联一个互斥锁(Mutex或者RWMutex),当condition发生变更或者调用Wait时,都必须加锁,保证多个goroutine安全地访问condition。

下面是go标准库http中关于pipe的部分实现,我们可以看到,pipe使用sync.Cond来控制管道中字节流的写入和读取,在pipe中数据可用并且字节流复制到pipe的缓冲区之前,所有的需要读取该管道数据的goroutine都必须等待,直到数据准备完成。

type pipe struct {
   mu       sync.Mutex
   c        sync.Cond     // c.L lazily initialized to &p.mu
   b        pipeBuffer    // nil when done reading
   ...
}
// Read waits until data is available and copies bytes
// from the buffer into p.
func (p *pipe) Read(d []byte) (n int, err error) {
   p.mu.Lock()
   defer p.mu.Unlock()
   if p.c.L == nil {
      p.c.L = &p.mu
   }
   for {
      ...
      if p.b != nil && p.b.Len() > 0 {
         return p.b.Read(d)
      }
      ...
      p.c.Wait() // write未完成前调用Wait进入等待
   }
}
// Write copies bytes from p into the buffer and wakes a reader.
// It is an error to write more data than the buffer can hold.
func (p *pipe) Write(d []byte) (n int, err error) {
   p.mu.Lock()
   defer p.mu.Unlock()
   if p.c.L == nil {
      p.c.L = &p.mu
   }
   defer p.c.Signal() // 唤醒所有等待的goroutine
   if p.err != nil {
      return 0, errClosedPipeWrite
   }
   if p.breakErr != nil {
      p.unread += len(d)
      return len(d), nil // discard when there is no reader
   }
   return p.b.Write(d)
}

实现原理

type Cond struct {
   noCopy noCopy       // 用来保证结构体无法在编译期间拷贝
   // L is held while observing or changing the condition
   L Locker             // 用来保证condition变更安全
   notify  notifyList   // 待通知的goutine列表
   checker copyChecker  // 用于禁止运行期间发生的拷贝
}
type notifyList struct {
   wait   uint32      // 正在等待的goroutine的ticket
   notify uint32      // 已经通知到的goroutine的ticket
   lock   uintptr // key field of the mutex
   head   unsafe.Pointer     // 链表头部
   tail   unsafe.Pointer     // 链表尾部
}

copyChecker

copyChecker是一个指针类型,在创建时,它的值指向自身地址,用于检测该对象是否发生了拷贝。如果发生了拷贝,则直接panic。

// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr
func (c *copyChecker) check() {
   if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
      !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
      uintptr(*c) != uintptr(unsafe.Pointer(c)) {
      panic("sync.Cond is copied")
   }
}

Wait

调用 Wait 会自动释放锁 c.L,并挂起调用者所在的 goroutine,因此当前协程会阻塞在 Wait 方法调用的地方。如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。

对条件的检查,使用了 for !condition() 而非 if,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起见,使用 for 能够确保条件符合要求后,再执行后续的代码。

func (c *Cond) Wait() {
   c.checker.check()
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}
  • 检查Cond是否被复制,如果被复制,直接panic;
  • 调用runtime_notifyListAdd调用者添加到通知列表并解锁,以便可以接收到通知,然后将返回的ticket传入到runtime_notifyListWait来等待通知。
  • 当前goroutine会阻塞在wait调用的地方,直到其他goroutine调用Signal或Broadcast唤醒该协程。
func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait会将当前goroutine追加到链表的尾端,同时调用goparkunlock让当前goroutine陷入休眠,该方法会直接让出当前处理器的使用权并等待调度器的唤醒。

func notifyListWait(l *notifyList, t uint32) {
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    if l.tail == nil {
       l.head = s
    } else {
       l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    releaseSudog(s)
}

Signal

Signal会唤醒队列最前面的Goroutine。

func (c *Cond) Signal() {
   c.checker.check()
   runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
   t := l.notify
   atomic.Store(&l.notify, t+1)
   for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
      if s.ticket == t {
         n := s.next
         if p != nil {
            p.next = n
         } else {
            l.head = n
         }
         if n == nil {
            l.tail = p
         }
         s.next = nil
         readyWithTime(s, 4)
         return
      }
   }
}

Broadcast

Broadcast会唤醒队列中全部的goroutine。

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}
func notifyListNotifyAll(l *notifyList) {
   s := l.head
   l.head = nil
   l.tail = nil
   atomic.Store(&l.notify, atomic.Load(&l.wait))
   for s != nil {
      next := s.next
      s.next = nil
      readyWithTime(s, 4)
      s = next
   }
}

以上就是go并发编程sync.Cond使用场景及实现原理的详细内容,更多关于go并发编程sync.Cond的资料请关注我们其它相关文章!

(0)

相关推荐

  • Go并发编程之sync.Once使用实例详解

    目录 一.序 二. 源码分析 2.1结构体 2.2 接口 三. 使用场景案例 3.1 单例模式 3.2 加载配置文件示例 四.总结 五. 参考 一.序 单从库名大概就能猜出其作用.sync.Once使用起来很简单, 下面是一个简单的使用案例 package main import ( "fmt" "sync" ) func main() { var ( once sync.Once wg sync.WaitGroup ) for i := 0; i < 10;

  • Go并发编程实践

    前言 并发编程一直是Golang区别与其他语言的很大优势,也是实际工作场景中经常遇到的.近日笔者在组内分享了我们常见的并发场景,及代码示例,以期望大家能在遇到相同场景下,能快速的想到解决方案,或者是拿这些方案与自己实现的比较,取长补短.现整理出来与大家共享. 简单并发场景 很多时候,我们只想并发的做一件事情,比如测试某个接口的是否支持并发.那么我们就可以这么做: func RunScenario1() { count := 10 var wg sync.WaitGroup for i := 0;

  • GO语言并发编程之互斥锁、读写锁详解

    在本节,我们对Go语言所提供的与锁有关的API进行说明.这包括了互斥锁和读写锁.我们在第6章描述过互斥锁,但却没有提到过读写锁.这两种锁对于传统的并发程序来说都是非常常用和重要的. 一.互斥锁 互斥锁是传统的并发程序对共享资源进行访问控制的主要手段.它由标准库代码包sync中的Mutex结构体类型代表.sync.Mutex类型(确切地说,是*sync.Mutex类型)只有两个公开方法--Lock和Unlock.顾名思义,前者被用于锁定当前的互斥量,而后者则被用来对当前的互斥量进行解锁. 类型sy

  • Go并发编程中sync/errGroup的使用

    目录 一.序 二.errGroup 2.1 函数签名 三.源码 3.1 Group 3.2 WaitContext 3.3 Go 3.4 Wait 四. 案例 五. 参考 一.序 这一篇算是并发编程的一个补充,起因是当前有个项目,大概の 需求是,根据kafka的分区(partition)数,创建同等数量的 消费者( goroutine)从不同的分区中消费者消费数据,但是总有某种原因导致,某一个分区消费者创建失败,但是其他分区消费者创建失败. 最初的逻辑是,忽略分区失败的逻辑,将成功创建的分区消费

  • Go并发编程之goroutine使用正确方法

    目录 1. 对创建的gorouting负载 1.1 不要创建一个你不知道何时退出的 goroutine 1.2 不要帮别人做选择 1.3 不要作为一个旁观者 1.4 不要创建不知道什么时候退出的 goroutine 1.5 不要创建都无法退出的 goroutine 1.6 确保创建出的goroutine工作已经完成 2. 总结 3. 参考 并发(concurrency): 指在同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果,但在微观上并不是同时

  • golang 并发编程之生产者消费者详解

    golang 最吸引人的地方可能就是并发了,无论代码的编写上,还是性能上面,golang 都有绝对的优势 学习一个语言的并发特性,我喜欢实现一个生产者消费者模型,这个模型非常经典,适用于很多的并发场景,下面我通过这个模型,来简单介绍一下 golang 的并发编程 go 并发语法 协程 go 协程是 golang 并发的最小单元,类似于其他语言的线程,只不过线程的实现借助了操作系统的实现,每次线程的调度都是一次系统调用,需要从用户态切换到内核态,这是一项非常耗时的操作,因此一般的程序里面线程太多会

  • go并发编程sync.Cond使用场景及实现原理

    目录 使用场景 实现原理 copyChecker Wait Signal Broadcast 使用场景 sync.Cond是go标准库提供的一个条件变量,用于控制一组goroutine在满足特定条件下被唤醒. sync.Cond常用于一组goroutine等待,一个goroutine通知(事件发生)的场景.如果只有一个goroutine等待,一个goroutine通知(事件发生),使用Mutex或者Channel就可以实现. 可以用一个全局变量标志特定条件condition,每个sync.Con

  • Go并发编程sync.Cond的具体使用

    目录 简介 详细介绍 案例:Redis连接池 注意点 简介 Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持.Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行. Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutin

  • Go语言并发编程 sync.Once

    sync.Once用于保证某个动作只被执行一次,可用于单例模式中,比如初始化配置.我们知道init()函数也只会执行一次,不过它是在main()函数之前执行,如果想要在代码执行过程中只运行某个动作一次,可以使用sync.Once,下面来介绍一下它的使用方法. 先来看下面的代码: package main import ( "fmt" "sync" ) func main() { var num = 6 var once sync.Once add_one := fu

  • Go语言sync.Cond基本使用及原理示例详解

    目录 1. 简介 2. 基本使用 2.1 定义 2.2 方法说明 2.3 使用方式 2.4 使用例子 2.5 为什么Sync.Cond 需要关联一个锁,然后调用Wait方法前需要先获取该锁 3.使用场景 3.1 基本说明 3.2 场景说明 3.2.1 同步和协调多个协程之间共享资源 3.2.2 需要重复唤醒的场景中使用 4. 原理 4.1 基本原理 4.2 实现 4.2.1 Wait方法实现 4.2.2 Singal方法实现 4.2.3 Broadcast方法实现 5.使用注意事项 5.1 调用

  • Go语言中sync.Cond使用详解

    目录 sync.Cond 可以用来干什么? 与 Sync.Mutex 的区别 sync.Cond 使用场景 sync.Cond sync.Cond 有哪些方法 NewCond 创建实例 Broadcast 广播唤醒所有 Signal 唤醒一个协程 Wait 等待 代码示例 sync.Cond 可以用来干什么? Golang 的 sync 包中的 Cond 实现了一种条件变量,可以使用多个 Reader 等待公共资源. 每个 Cond 都会关联一个 Lock ,当修改条件或者调用 Wait 方法,

  • 一文带你深入理解Go语言中的sync.Cond

    目录 sync.Cond 是什么 适用场景 sync.Cond 的基本用法 NewCond 创建实例 Wait 等待条件满足 Signal 通知一个等待的 goroutine Broadcast 通知所有等待的 goroutine sync.Cond 使用实例 为什么要用 sync.Cond close channel 广播实例 sync.Cond 基本原理 sync.Cond 的设计与实现 sync.Cond 模型 notifyList 结构体 sync.Cond 的方法 Wait 方法 Si

  • Java并发编程总结——慎用CAS详解

    一.CAS和synchronized适用场景 1.对于资源竞争较少的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源:而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能. 2.对于资源竞争严重的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized.以java.util.concurrent.atomic包中AtomicInteger类为例,其getAn

  • Java多线程并发编程(互斥锁Reentrant Lock)

    Java 中的锁通常分为两种: 通过关键字 synchronized 获取的锁,我们称为同步锁,上一篇有介绍到:Java 多线程并发编程 Synchronized 关键字. java.util.concurrent(JUC)包里的锁,如通过继承接口 Lock 而实现的 ReentrantLock(互斥锁),继承 ReadWriteLock 实现的 ReentrantReadWriteLock(读写锁). 本篇主要介绍 ReentrantLock(互斥锁). ReentrantLock(互斥锁)

随机推荐