Go并发编程sync.Cond的具体使用

目录
  • 简介
  • 详细介绍
  • 案例:Redis连接池
  • 注意点

简介

Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行。

Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。

这个条件可以是我们自定义的 true/false 逻辑表达式。

但是 Cond 使用的比较少,因为在大部分场景下是可以被 ChannelWaitGroup 来替换的。

详细介绍

下面就是 Cond 的数据结构和对外提供的方法,Cond 内部维护了一个等待队列和锁实例。

type Cond struct {
   noCopy noCopy

   // 锁
   L Locker

   // 等待队列
   notify  notifyList
   checker copyChecker
}

func NeWCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
  • NeWCondNeWCond 方法需要调用者传入一个 Locker 接口,这个接口就 Lock/UnLock 方法,所以我们可以传入一个 sync.Metex 对象
  • Signal:允许调用者唤醒一个等待当前 Condgoroutine。如果 Cond 等待队列中有一个或者多个等待的 goroutine ,则从等待队列中移除第一个 goroutine 并把它唤醒
  • Broadcast:允许调用者唤醒所有等待当前 Condgoroutine。如果 Cond 等待队列中有一个或者多个等待的 goroutine,则清空所有等待的 goroutine,并全部唤醒
  • Wait:会把调用者放入 Cond 的等待队列中并阻塞,直到被 Signal 或者 Broadcast 的方法从等待队列中移除并唤醒

案例:Redis连接池

可以看一下下面的代码,使用了 Cond 实现一个 Redis 的连接池,最关键的代码就是在链表为空的时候需要调用 CondWait 方法,将 gorutine 进行阻塞。然后 goruntine 在使用完连接后,将连接返回池子后,需要通知其他阻塞的 goruntine 来获取连接。

package main

import (
   "container/list"
   "fmt"
   "math/rand"
   "sync"
   "time"
)

// 连接池
type Pool struct {
   lock    sync.Mutex // 锁
   clients list.List  // 连接
   cond    *sync.Cond // cond实例
   close   bool       // 是否关闭
}

// Redis Client
type Client struct {
   id int32
}

// 创建Redis Client
func NewClient() *Client {
   return &Client{
      id: rand.Int31n(100000),
   }
}

// 关闭Redis Client
func (this *Client) Close() {
   fmt.Printf("Client:%d 正在关闭", this.id)
}

// 创建连接池
func NewPool(maxConnNum int) *Pool {
   pool := new(Pool)
   pool.cond = sync.NewCond(&pool.lock)

   // 创建连接
   for i := 0; i < maxConnNum; i++ {
      client := NewClient()
      pool.clients.PushBack(client)
   }

   return pool
}

// 从池子中获取连接
func (this *Pool) Pull() *Client {
   this.lock.Lock()
   defer this.lock.Unlock()

   // 已关闭
   if this.close {
      fmt.Println("Pool is closed")
      return nil
   }

   // 如果连接池没有连接 需要阻塞
   for this.clients.Len() <= 0 {
      this.cond.Wait()
   }

   // 从链表中取出头节点,删除并返回
   ele := this.clients.Remove(this.clients.Front())
   return ele.(*Client)
}

// 将连接放回池子
func (this *Pool) Push(client *Client) {
   this.lock.Lock()
   defer this.lock.Unlock()

   if this.close {
      fmt.Println("Pool is closed")
      return
   }

   // 向链表尾部插入一个连接
   this.clients.PushBack(client)

   // 唤醒一个正在等待的goruntine
   this.cond.Signal()
}

// 关闭池子
func (this *Pool) Close() {
   this.lock.Lock()
   defer this.lock.Unlock()

   // 关闭连接
   for e := this.clients.Front(); e != nil; e = e.Next() {
      client := e.Value.(*Client)
      client.Close()
   }

   // 重置数据
   this.close = true
   this.clients.Init()
}

func main() {

   var wg sync.WaitGroup

   pool := NewPool(3)
   for i := 1; i <= 10; i++ {
      wg.Add(1)
      go func(index int) {

         defer wg.Done()

         // 获取一个连接
         client := pool.Pull()

         fmt.Printf("Time:%s | 【goruntine#%d】获取到client[%d]\n", time.Now().Format("15:04:05"), index, client.id)
         time.Sleep(time.Second * 5)
         fmt.Printf("Time:%s | 【goruntine#%d】使用完毕,将client[%d]放回池子\n", time.Now().Format("15:04:05"), index, client.id)

         // 将连接放回池子
         pool.Push(client)
      }(i)
   }

   wg.Wait()
}

运行结果:

Time:15:10:25 | 【goruntine#7】获取到client[31847]
Time:15:10:25 | 【goruntine#5】获取到client[27887]
Time:15:10:25 | 【goruntine#10】获取到client[98081]
Time:15:10:30 | 【goruntine#5】使用完毕,将client[27887]放回池子
Time:15:10:30 | 【goruntine#6】获取到client[27887]               
Time:15:10:30 | 【goruntine#10】使用完毕,将client[98081]放回池子
Time:15:10:30 | 【goruntine#7】使用完毕,将client[31847]放回池子 
Time:15:10:30 | 【goruntine#1】获取到client[31847]               
Time:15:10:30 | 【goruntine#9】获取到client[98081]               
Time:15:10:35 | 【goruntine#6】使用完毕,将client[27887]放回池子
Time:15:10:35 | 【goruntine#3】获取到client[27887]              
Time:15:10:35 | 【goruntine#1】使用完毕,将client[31847]放回池子
Time:15:10:35 | 【goruntine#4】获取到client[31847]              
Time:15:10:35 | 【goruntine#9】使用完毕,将client[98081]放回池子
Time:15:10:35 | 【goruntine#2】获取到client[98081]              
Time:15:10:40 | 【goruntine#3】使用完毕,将client[27887]放回池子
Time:15:10:40 | 【goruntine#8】获取到client[27887]              
Time:15:10:40 | 【goruntine#2】使用完毕,将client[98081]放回池子
Time:15:10:40 | 【goruntine#4】使用完毕,将client[31847]放回池子
Time:15:10:45 | 【goruntine#8】使用完毕,将client[27887]放回池子

注意点

  • 在调用 Wait 方法前,需要先加锁,就像我上面例子中 Pull 方法也是先加锁

看一下源码就知道了,因为 Wait 方法的执行逻辑是先将 goruntine 添加到等待队列中,然后释放锁,然后阻塞,等唤醒后,会继续加锁。如果在调用 Wait 前不加锁,但是里面会解锁,执行的时候就会报错。

//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() {
   c.checker.check()

   // 添加到等待队列
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()

   // 阻塞
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}
  • 还是 Wait 方法,在唤醒后需要继续检查 Cond 条件

就拿上面的 redis 连接案例来进行说明吧,我这里是使用了 for 循环来进行检测。如果将 for 循环改成使用 if,也就是只判断一次,会有什么问题?可以停下来先想想

上面说了调用者也可以使用 Broadcast 方法来唤醒 goruntine ,如果使用的是 Broadcast 方法,所有的 goruntine 都会被唤醒,然后大家都去链表中去获取 redis 连接了,就会出现部分 goruntine拿不到连接,实际上没有那么多连接可以获取,因为每次只会放回一个连接到池子中。

// 如果连接池没有连接 需要阻塞
for this.clients.Len() <= 0 {
  this.cond.Wait()
}

// 获取连接
ele := this.clients.Remove(this.clients.Front())
return ele.(*Client)

到此这篇关于Go并发编程sync.Cond的具体使用的文章就介绍到这了,更多相关Go sync.Cond内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Go语言中sync.Cond使用详解

    目录 sync.Cond 可以用来干什么? 与 Sync.Mutex 的区别 sync.Cond 使用场景 sync.Cond sync.Cond 有哪些方法 NewCond 创建实例 Broadcast 广播唤醒所有 Signal 唤醒一个协程 Wait 等待 代码示例 sync.Cond 可以用来干什么? Golang 的 sync 包中的 Cond 实现了一种条件变量,可以使用多个 Reader 等待公共资源. 每个 Cond 都会关联一个 Lock ,当修改条件或者调用 Wait 方法,

  • Go并发编程sync.Cond的具体使用

    目录 简介 详细介绍 案例:Redis连接池 注意点 简介 Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持.Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行. Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutin

  • go并发编程sync.Cond使用场景及实现原理

    目录 使用场景 实现原理 copyChecker Wait Signal Broadcast 使用场景 sync.Cond是go标准库提供的一个条件变量,用于控制一组goroutine在满足特定条件下被唤醒. sync.Cond常用于一组goroutine等待,一个goroutine通知(事件发生)的场景.如果只有一个goroutine等待,一个goroutine通知(事件发生),使用Mutex或者Channel就可以实现. 可以用一个全局变量标志特定条件condition,每个sync.Con

  • Go语言并发编程 sync.Once

    sync.Once用于保证某个动作只被执行一次,可用于单例模式中,比如初始化配置.我们知道init()函数也只会执行一次,不过它是在main()函数之前执行,如果想要在代码执行过程中只运行某个动作一次,可以使用sync.Once,下面来介绍一下它的使用方法. 先来看下面的代码: package main import ( "fmt" "sync" ) func main() { var num = 6 var once sync.Once add_one := fu

  • Go并发编程中sync/errGroup的使用

    目录 一.序 二.errGroup 2.1 函数签名 三.源码 3.1 Group 3.2 WaitContext 3.3 Go 3.4 Wait 四. 案例 五. 参考 一.序 这一篇算是并发编程的一个补充,起因是当前有个项目,大概の 需求是,根据kafka的分区(partition)数,创建同等数量的 消费者( goroutine)从不同的分区中消费者消费数据,但是总有某种原因导致,某一个分区消费者创建失败,但是其他分区消费者创建失败. 最初的逻辑是,忽略分区失败的逻辑,将成功创建的分区消费

  • 一文带你深入理解Go语言中的sync.Cond

    目录 sync.Cond 是什么 适用场景 sync.Cond 的基本用法 NewCond 创建实例 Wait 等待条件满足 Signal 通知一个等待的 goroutine Broadcast 通知所有等待的 goroutine sync.Cond 使用实例 为什么要用 sync.Cond close channel 广播实例 sync.Cond 基本原理 sync.Cond 的设计与实现 sync.Cond 模型 notifyList 结构体 sync.Cond 的方法 Wait 方法 Si

  • 一文掌握Go语言并发编程必备的Mutex互斥锁

    目录 1. Mutex 互斥锁的基本概念 2. Mutex 互斥锁的基本用法 3. Mutex 互斥锁的底层实现 3.1 等待队列 3.2 锁状态 4. Mutex 互斥锁的注意事项 4.1 不要将 Mutex 作为函数或方法的参数传递 4.2 不要在获取 Mutex 的锁时阻塞太久 4.3 不要重复释放 Mutex 的锁 4.4 不要在锁内部执行阻塞或耗时操作 5. 总结 在并发编程中,我们需要处理多个线程同时对共享资源的访问问题.如果不加控制地同时访问共享资源,就会导致竞争条件(Race C

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • Go并发编程实践

    前言 并发编程一直是Golang区别与其他语言的很大优势,也是实际工作场景中经常遇到的.近日笔者在组内分享了我们常见的并发场景,及代码示例,以期望大家能在遇到相同场景下,能快速的想到解决方案,或者是拿这些方案与自己实现的比较,取长补短.现整理出来与大家共享. 简单并发场景 很多时候,我们只想并发的做一件事情,比如测试某个接口的是否支持并发.那么我们就可以这么做: func RunScenario1() { count := 10 var wg sync.WaitGroup for i := 0;

  • Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解

    Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法. 以下是本文目录大纲: 一.CountDownLatch用法 二.CyclicBarrier用法 三.Semaphore用法 若有不正之处请多多谅解,并欢迎批评指正. 一.CountDownLatch

  • Java并发编程总结——慎用CAS详解

    一.CAS和synchronized适用场景 1.对于资源竞争较少的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源:而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能. 2.对于资源竞争严重的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized.以java.util.concurrent.atomic包中AtomicInteger类为例,其getAn

随机推荐