go sync Waitgroup数据结构实现基本操作详解

目录
  • WaitGroup 示例
  • WaitGroup 基本原理
  • 背景知识
    • 信号量
    • WaitGroup 中的信号量
  • WaitGroup 数据结构
    • noCopy
    • state
    • sema
  • WaitGroup 的三个基本操作
  • WaitGroup 的实现
    • Add 的实现
    • Done 的实现
    • Wait 的实现
  • 总结

WaitGroup 示例

本文基于 Go 1.19。

go 里面的 WaitGroup 是非常常见的一种并发控制方式,它可以让我们的代码等待一组 goroutine 的结束。 比如在主协程中等待几个子协程去做一些耗时的操作,如发起几个 HTTP 请求,然后等待它们的结果。

下面的代码展示了一个 goroutine 等待另外 2 个 goroutine 结束的例子:

func TestWaitgroup(t *testing.T) {
   var wg sync.WaitGroup
   // 计数器 +2
   wg.Add(2)
   go func() {
      sendHttpRequest("https://baidu.com")
      // 计数器 -1
      wg.Done()
   }()
   go func() {
      sendHttpRequest("https://baidu.com")
      // 计数器 -1
      wg.Done()
   }()
   // 阻塞。计数器为 0 的时候,Wait 返回
   wg.Wait()
}
// 发起 HTTP GET 请求
func sendHttpRequest(url string) (string, error) {
   method := "GET"
   client := &http.Client{}
   req, err := http.NewRequest(method, url, nil)
   if err != nil {
      return "", err
   }
   res, err := client.Do(req)
   if err != nil {
      return "", err
   }
   defer res.Body.Close()
   body, err := io.ReadAll(res.Body)
   if err != nil {
      return "", err
   }
   return string(body), err
}

在这个例子中,我们做了如下事情:

  • 定义了一个 WaitGroup 对象 wg,调用 wg.Add(2) 将其计数器 +2
  • 启动两个新的 goroutine,在这两个 goroutine 中,使用 sendHttpRequest 函数发起了一个 HTTP 请求。
  • 在 HTTP 请求返回之后,调用 wg.Done 将计数器 -1
  • 在函数的最后,我们调用了 wg.Wait,这个方法会阻塞,直到 WaitGroup 的计数器的值为 0 才会解除阻塞状态。

WaitGroup 基本原理

WaitGroup 内部通过一个计数器来统计有多少协程被等待。这个计数器的值在我们启动 goroutine 之前先写入(使用 Add 方法), 然后在 goroutine 结束的时候,将这个计数器减 1(使用 Done 方法)。除此之外,在启动这些 goroutine 的协程中, 会调用 Wait 来进行等待,在 Wait 调用的地方会阻塞,直到 WaitGroup 内部的计数器减到 0。 也就实现了等待一组 goroutine 的目的

背景知识

在操作系统中,有多种实现进程/线程间同步的方式,如:test_and_setcompare_and_swap、互斥锁等。 除此之外,还有一种是信号量,它的功能类似于互斥锁,但是它能提供更为高级的方法,以便进程能够同步活动。

信号量

一个信号量(semaphore)S是一个整型变量,它除了初始化外只能通过两个标准的原子操作:wait()signal() 来访问。 操作 wait() 最初称为 P(荷兰语 proberen,测试);操作 signal() 最初称为 V(荷兰语 verhogen,增加),可按如下来定义 wait()

PV 原语。

wait(S) {
    while (S <= 0)
        ; // 忙等待
    S--;
}

可按如下来定义 signal()

signal(S) {
    S++;
}

wait()signal() 操作中,信号量整数值的修改应不可分割地执行。也就是说,当一个进程修改信号量值时,没有其他进程能够同时修改同一信号量的值。

简单来说,信号量实现的功能是:

  • 当信号量>0 时,表示资源可用,则 wait 会对信号量执行减 1 操作。
  • 当信号量<=0 时,表示资源暂时不可用,获取信号量时,当前的进程/线程会阻塞,直到信号量为正时被唤醒。

WaitGroup 中的信号量

WaitGroup 中,使用了信号量来实现 goroutine 的阻塞以及唤醒:

  • 在调用 Wait 的地方,goroutine 会陷入阻塞,直到信号量大于等于 0 的时候解除阻塞状态,得以继续执行。
  • 在调用 Done 的时候,如果 WaitGroup 内的等待协程的计数器减到 0 的时候,信号量会进行递增,这样那些阻塞的协程会进行执行下去。

WaitGroup 数据结构

type WaitGroup struct {
   noCopy noCopy
   // 高 32 位为计数器,低 32 位为等待者数量
   state atomic.Uint64
   sema  uint32
}

noCopy

我们发现,WaitGroup 中有一个字段 noCopy,顾名思义,它的目的是防止复制。 这个字段在运行时是没有什么影响的,但是我们通过 go vet 可以发现我们对 WaitGroup 的复制。 为什么不能复制呢?因为一旦复制,WaitGroup 内的计数器就不再准确了,比如下面这个例子:

func test(wg sync.WaitGroup) {
   wg.Done()
}
func TestWaitGroup(t *testing.T) {
   var wg sync.WaitGroup
   wg.Add(1)
   test(wg)
   wg.Wait()
}

go 里面的函数参数传递是值传递。调用 test(wg) 的时候将 WaitGroup 复制了一份。

在这个例子中,程序会永远阻塞下去,因为 test 中调用 wg.Done() 的时候,只是将 WaitGroup 副本的计数器减去了 1, 而 TestWaitGroup 里面的 WaitGroup 的计数器并没有发生改变,因此 Wait 会永远阻塞。

我们如果需要将 WaitGroup 作为参数,请传递指针:

func test(wg *sync.WaitGroup) {
   wg.Done()
}

传递指针之后,我们在 test 中调用 wg.Done() 修改的就是 TestWaitGroup 里面同一个 WaitGroup。 从而,Wait 方法可以正常返回。

state

WaitGroup 里面的 state 是一个 64 位的 atomic.Uint64 类型,它的高 32 位用来保存 counter(也就是上面说的计数器),低 32 位用来保存 waiter(也就是阻塞在 Wait 上的 goroutine 数量。)

sema

WaitGroup 通过 sema 来记录信号量:

  • runtime_Semrelease 表示将信号量递增(对应信号量中的 signal 操作)
  • runtime_Semacquire 表示将信号量递减(对应信号量中的 wait 操作)

简单来说,在调用 runtime_Semacquire 的时候 goroutine 会阻塞,而调用 runtime_Semrelease 会唤醒阻塞在同一个信号量上的 goroutine。

WaitGroup 的三个基本操作

  • Add: 这会将 WaitGroup 里面的 counter 加上一个整数(也就是传递给 Add 的函数参数)。
  • Done: 这会将 WaitGroup 里面的 counter 减去 1。
  • Wait: 这会将 WaitGroup 里面的 waiter 加上 1,并且调用 Wait 的地方会阻塞。(有可能会有多个 goroutine 等待一个 WaitGroup

WaitGroup 的实现

Add 的实现

Add 做了下面两件事:

  • delta 加到 state 的高 32 位上
  • 如果 counter0 了,并且 waiter 大于 0,表示所有被等待的 goroutine 都完成了,而还有在等待的 goroutine,这会唤醒那些阻塞在 Wait 上的 goroutine。

源码实现:

func (wg *WaitGroup) Add(delta int) {
   // wg.state 的计数器加上 delta
   //(加到 state 的高 32 上)
   state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta
   v := int32(state >> 32)                    // 高 32 位(counter)
   w := uint32(state)                         // 低 32 位(waiter)
   // 计数器不能为负数(加上 delta 之后不能为负数,最小只能到 0)
   if v < 0 {
      panic("sync: negative WaitGroup counter")
   }
   // 正常使用情况下,是先调用 Add 再调用 Wait 的,这种情况下,w 是 0,v > 0
   if w != 0 && delta > 0 && v == int32(delta) {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   // v > 0,计数器大于 0
   // w == 0,没有在 Wait 的协程
   // 说明还没有到唤醒 waiter 的时候
   if v > 0 || w == 0 {
      return
   }
   // Add 负数的时候,v 会减去对应的数值,减到最后 v 是 0。
   // 计数器是 0,并且有等待的协程,现在要唤醒这些协程。
   // 存在等待的协程时,goroutine 已将计数器设置为0。
   // 现在不可能同时出现状态突变:
   // - Add 不能与 Wait 同时发生,
   // - 如果看到计数器==0,则 Wait 不会增加等待的协程。
   // 仍然要做一个廉价的健康检查,以检测 WaitGroup 的误用。
   if wg.state.Load() != state { // 不能在 Add 的同时调用 Wait
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   // 将等待的协程数量设置为 0。
   wg.state.Store(0)
   for ; w != 0; w-- {
      // signal,调用 Wait 的地方会解除阻塞
      runtime_Semrelease(&wg.sema, false, 0) // goyield
   }
}

Done 的实现

WaitGroup 里的 Done 其实只是对 Add 的调用,但是它的效果是,将计数器的值减去 1。 背后的含义是:一个被等待的协程执行完毕了

Wait 的实现

Wait 主要功能是阻塞当前的协程:

  • Wait 会先判断计数器是否为 0,为 0 说明没有任何需要等待的协程,那么就可以直接返回了。
  • 如果计数器还不是 0,说明有协程还没执行完,那么调用 Wait 的地方就需要被阻塞起来,等待所有的协程完成。

源码实现:

func (wg *WaitGroup) Wait() {
   for {
      // 获取当前计数器
      state := wg.state.Load()
      // 计数器
      v := int32(state >> 32)
      // waiter 数量
      w := uint32(state)
      // v 为 0,不需要等待,直接返回
      if v == 0 {
         // 计数器是 0,不需要等待
         return
      }
      // 增加 waiter 数量。
      // 调用一次 Wait,waiter 数量会加 1。
      if wg.state.CompareAndSwap(state, state+1) {
         // 这会阻塞,直到 sema (信号量)大于 0
         runtime_Semacquire(&wg.sema) // goparkunlock
         // state 不等 0
         // wait 还没有返回又继续使用了 WaitGroup
         if wg.state.Load() != 0 {
            panic("sync: WaitGroup is reused before previous Wait has returned")
         }
         // 解除阻塞状态了,可以返回了
         return
      }
      // 状态没有修改成功(state 没有成功 +1),开始下一次尝试。
   }
}

总结

  • WaitGroup 使用了信号量来实现了并发资源控制,sema 字段表示信号量。
  • 使用 runtime_Semacquire 会使得 goroutine 阻塞直到计数器减少至 0,而使用 runtime_Semrelease 会使得信号量递增,这等于是通知之前阻塞在信号量上的协程,告诉它们可以继续执行了。
  • WaitGroup 作为参数传递的时候,需要传递指针作为参数,否则在被调用函数内对 Add 或者 Done 的调用,在 caller 里面调用的 Wait 会观测不到。
  • WaitGroup 使用一个 64 位的数来保存计数器(高 32 位)和 waiter(低 32 位,正在等待的协程的数量)。
  • WaitGroup 使用 Add 增加计数器,使用 Done 来将计数器减 1,使用 Wait 来等待 goroutine。Wait 会阻塞直到计数器减少到 0

以上就是go sync Waitgroup数据结构实现基本操作详解的详细内容,更多关于go sync Waitgroup数据结构的资料请关注我们其它相关文章!

(0)

相关推荐

  • go数据结构和算法BitMap原理及实现示例

    目录 1. BitMap介绍 如何判断数字在bit数组的位置 设置数据到bit数组 从bit数组中清除数据 数字是否在bit数组中 2. Go语言位运算 左移 右移 使用&^和位移运算来给某一位置0 3. BitMap的Go语言实现 定义 创建BitMap结构 将数据添加到BitMap 从BitMap中删除数据 判断BitMap中是否存在指定的数据 1. BitMap介绍 BitMap可以理解为通过一个bit数组来存储特定数据的一种数据结构.BitMap常用于对大量整形数据做去重和查询.在这类查

  • Go语言数据结构之希尔排序示例详解

    目录 希尔排序 算法思想 图解算法 Go 代码实现: 总结 希尔排序 在插入排序中,在待排序序列的记录个数比较少,而且基本有序,则排序的效率较高. 1959 年,Donald Shell 从“减少记录个数” 和 “基本有序” 两个方面对直接插入排序进行了改进,提出了希尔排序算法. 希尔排序又称为“缩小增量排序”.即将待排序记录按下标的一定增量分组(减少记录个数),对每组记录使用直接插入排序算法排序(达到基本有序): 随着增量逐渐减少,每组包含的关键词越来越多,当增量减至1,整个序列基本有序,再对

  • Go 语言数据结构如何实现抄一个list示例详解

    目录 前言 list是个啥 list结构 Init & New InsertAfter & InsertBefore & PushBack & PushFront Back & Front Remove 总结 前言 闲来无事,自己实现一个 Go提供的list包 list是Go提供的一个内置的包 内部就是实现了一个双向循环链表以及各种API 目标明确:就是实现一个双向循环链表 文章源码 list是个啥 在开始做之前,还是要先了解一下链表这个数据结构 ,长话短说: 线性

  • Golang迭代如何在Go中循环数据结构使用详解

    目录 引言 如何在Go中循环字符串 如何在Go中循环map结构 如何在Go中循环Struct 结论 引言 数组是存储类似类型数据的强大数据结构.您可以通过索引识别和访问其中的元素. 在Golang中,您可以通过在0初始化变量i并增加变量直到它达到数组的长度,使用for循环循环数组. 它们的语法如下所示: for i := 0; i < len(arr); i++ { // perform an operation } 例如,让我们循环一个整数数组: package main import ( &qu

  • Go 数据结构之堆排序示例详解

    目录 堆排序 堆排序过程 动画显示 开始堆排序 代码实现 总结 堆排序 堆排序是一种树形选择排序算法. 简单选择排序算法每次选择一个关键字最小的记录需要 O(n) 的时间,而堆排序选择一个关键字最小的记录需要 O(nlogn)的时间. 堆可以看作一棵完全二叉树的顺序存储结构. 在这棵完全二叉树中,如果每个节点的值都大于等于左边孩子的值,称为大根堆(最大堆.又叫大顶堆).如果每个节点的值都小于等于左边孩子的值,称为小根堆(最小堆,小顶堆). 可以,用数学符号表示如下: 堆排序过程 构建初始堆 在输

  • Go语言数据结构之插入排序示例详解

    目录 插入排序 动画演示 Go 代码实现 总结 插入排序 插入排序,英文名(insertion sort)是一种简单且有效的比较排序算法. 思想: 在每次迭代过程中算法随机地从输入序列中移除一个元素,并将改元素插入待排序序列的正确位置.重复该过程,直到所有输入元素都被选择一次,排序结束. 插入排序有点像小时候我们抓扑克牌的方式,如果抓起一张牌,我们放在手里:抓起第二张的时候,会跟手里的第一张牌进行比较,比手里的第一张牌小放在左边,否则,放在右边. 因此,对所有的牌重复这样的操作,所以每一次都是插

  • Go 语言数据结构之双链表学习教程

    目录 双链表 创建节点 双链表遍历 扩展功能 链表长度 插入 删除 反转双链表 总结 双链表 双链表 (Doubly Linked List),每个节点持有一个指向列表前一个元素的指针,以及指向下一个元素的指针. 双向链表的节点中包含 3 个字段: 数据域 Value 一个 Next 指针指向双链表中的下一个节点 一个 Prev 指针,指向双链表中的前一个节点 结构体如下: type Node struct { Prev *Node Value int Next *Node } 实际应用: 音乐

  • Go语言数据结构之选择排序示例详解

    目录 选择排序 动画演示 Go 代码实现 总结 选择排序 选择排序(selection sort)是一种原地(in-place)排序算法,适用于数据量较少的情况.由于选择操作是基于键值的且交换操作只在需要时才执行,所以选择排序长用于数值较大和键值较小的文件. 思想: 对一个数组进行排序,从未排序的部分反复找到最小的元素,并将其放在开头. 给定长度为 nnn 的序列和位置索引i=0 的数组,选择排序将: 遍历一遍序列,寻找序列中的最小值.在 [i...n−1] 范围内找出最小值 minValue

  • C++ 单链表的基本操作(详解)

    链表一直是面试的高频题,今天先总结一下单链表的使用,下节再总结双向链表的.本文主要有单链表的创建.插入.删除节点等. 1.概念 单链表是一种链式存取的数据结构,用一组地址任意的存储单元存放线性表中的数据元素. 链表中的数据是以结点来表示的,每个结点的构成:元素 + 指针,元素就是存储数据的存储单元,指针就是连接每个结点的地址数据.如下图: 2.链表的基本操作 SingleList.cpp: #include "stdafx.h" #include "SingleList.h&

  • pandas的Series类型与基本操作详解

    1 Series 线性的数据结构, series是一个一维数组 Pandas 会默然用0到n-1来作为series的index, 但也可以自己指定index( 可以把index理解为dict里面的key ) 1.1创造一个serise数据 import pandas as pd import numpy as np ​s = pd.Series([9, 'zheng', 'beijing', 128]) ​print(s) 打印 0 9 1 zheng 2 beijing 3 128 dtype

  • Python数据结构之双向链表详解

    目录 0. 学习目标 1. 双向链表简介 1.1 双向链表介绍 1.2 双向链表结点类 1.3 双向链表优缺点 2. 双向链表实现 2.1 双向链表的初始化 2.2 获取双向链表长度 2.3 读取指定位置元素 2.4 查找指定元素 2.5 在指定位置插入新元素 2.6 删除指定位置元素 2.7 其它一些有用的操作 3. 双向链表应用 3.1 双向链表应用示例 3.2 利用双向链表基本操作实现复杂操作 0. 学习目标 单链表只有一个指向直接后继的指针来表示结点间的逻辑关系,因此可以方便的从任一结点

  • Python数据结构之循环链表详解

    目录 0. 学习目标 1. 循环链表简介 2. 循环单链表实现 2.1 循环单链表的基本操作 2.2 简单的实现方法 2.3 循环单链表应用示例 2.4 利用循环单链表基本操作实现复杂操作 3. 循环双链表实现 3.1 循环双链表的基本操作 3.2 循环双链表应用示例 0. 学习目标 循环链表 (Circular Linked List) 是链式存储结构的另一种形式,它将链表中最后一个结点的指针指向链表的头结点,使整个链表头尾相接形成一个环形,使链表的操作更加方便灵活.我们已经介绍了单链表和双向

  • Python数据结构之栈详解

    目录 0.学习目标 1.栈的基本概念 1.1栈的基本概念 1.2栈抽象数据类型 1.3栈的应用场景 2.栈的实现 2.1顺序栈的实现 2.1.1栈的初始化 2.2链栈的实现 2.3栈的不同实现对比 3.栈应用 3.1顺序栈的应用 3.2链栈的应用 3.3利用栈基本操作实现复杂算法 0. 学习目标 栈和队列是在程序设计中常见的数据类型,从数据结构的角度来讲,栈和队列也是线性表,是操作受限的线性表,它们的基本操作是线性表操作的子集,但从数据类型的角度来讲,它们与线性表又有着巨大的不同.本节将首先介绍

  • Python数据结构之队列详解

    目录 0. 学习目标 1. 队列的基本概念 1.1 队列的基本概念 1.2 队列抽象数据类型 1.3 队列的应用场景 2. 队列的实现 2.1 顺序队列的实现 2.2 链队列的实现 2.3 队列的不同实现对比 3. 队列应用 3.1 顺序队列的应用 3.2 链队列的应用 3.3 利用队列基本操作实现复杂算法 0. 学习目标 栈和队列是在程序设计中常见的数据类型,从数据结构的角度来讲,栈和队列也是线性表,是操作受限的线性表,它们的基本操作是线性表操作的子集,但从数据类型的角度来讲,它们与线性表又有

  • JavaScript数据结构链表知识详解

    最近在看<javascript数据结构和算法>这本书,补一下数据结构和算法部分的知识,觉得自己这块是短板. 链表:存储有序的元素集合,但不同于数组,链表中的元素在内存中不是连续放置的.每个元素由一个存储元素本身的节点和一个指向下一个元素的引用(也称指针或链接)组成. 好处:可以添加或移除任意项,它会按需扩容,且不需要移动其他元素. 与数组的区别: 数组:可以直接访问任何位置的任何元素: 链表:想要访问链表中的一个元素,需要从起点(表头)开始迭代列表直到找到所需的元素. 做点小笔记. funct

  • C++ 双链表的基本操作(详解)

    1.概念 双向链表也叫双链表,是链表的一种,它的每个数据结点中都有两个指针,分别指向直接后继和直接前驱.所以,从双向链表中的任意一个结点开始,都可以很方便地访问它的前驱结点和后继结点.一般我们都构造双向循环链表. 结构图如下所示: 2.基本操作实例 DoubleList.cpp #include "stdafx.h" #include "DoubleList.h" #include <stdio.h> #include <malloc.h>

  • C语言数据结构 快速排序实例详解

    C语言数据结构 快速排序实例详解 一.快速排序简介 快速排序采用分治的思想,第一趟先将一串数字分为两部分,第一部分的数值都比第二部分要小,然后按照这种方法,依次对两边的数据进行排序. 二.代码实现 #include <stdio.h> /* 将两个数据交换 */ void swap(int* Ina , int* Inb) { int temp = *Ina; *Ina = *Inb; *Inb = temp; } /* 进行一趟的快速排序,把一个序列分为两个部分 */ int getPart

  • Java语言实现数据结构栈代码详解

    近来复习数据结构,自己动手实现了栈.栈是一种限制插入和删除只能在一个位置上的表.最基本的操作是进栈和出栈,因此,又被叫作"先进后出"表. 首先了解下栈的概念: 栈是限定仅在表头进行插入和删除操作的线性表.有时又叫LIFO(后进先出表).要搞清楚这个概念,首先要明白"栈"原来的意思,如此才能把握本质. "栈"者,存储货物或供旅客住宿的地方,可引申为仓库.中转站,所以引入到计算机领域里,就是指数据暂时存储的地方,所以才有进栈.出栈的说法. 实现方式是

随机推荐