一文详解go同步协程的必备工具WaitGroup

目录
  • 1. 简介
  • 2. 基本使用
    • 2.1 定义
    • 2.2 使用方式
    • 2.3 使用例子
  • 3.实现原理
    • 3.1 设计初衷
    • 3.2 基本原理
    • 3.3 代码实现
      • 3.3.1 Add方法
      • 3.3.2 Done方法实现
      • 3.3.3 Wait方法实现
    • 3.4 实现补充
  • 4.使用注意事项
    • 4.1 Add方法和Done方法需要成对出现
    • 4.2 在所有任务都已经添加之后,才调用Wait方法进行等待
  • 5. WaitGroup常见使用场景
  • 总结

1. 简介

本文将介绍 Go 语言中的 WaitGroup 并发原语,包括 WaitGroup 的基本使用方法、实现原理、使用注意事项以及常见的使用方式。能够更好地理解和应用 WaitGroup 来协调多个 Goroutine 的执行,提高 Go 并发编程的效率和稳定性。

2. 基本使用

2.1 定义

WaitGroup是Go语言标准库中的一个结构体,它提供了一种简单的机制,用于同步多个协程的执行。适用于需要并发执行多个任务并等待它们全部完成后才能继续执行后续操作的场景。

2.2 使用方式

首先主协程创建WaitGroup实例,然后在每个协程的开始处,调用Add(1)方法,表示需要等待一个任务执行完成,然后协程在任务执行完成之后,调用Done方法,表示任务已经执行完成了。

主协程中,需要调用Wait()方法,等待所有协程完成任务,示例如下:

func main(){
    //首先主协程创建WaitGroup实例
    var wg sync.WaitGroup
    // 开始时调用Add方法表示有个任务开始执行
    wg.Add(1)
    go func() {
        // 开始执行...
        //完成之后,调用Done方法
        wg.Done()
    }()
    // 调用Wait()方法,等待所有协程完成任务
    wg.Wait()
    // 执行后续逻辑
}

2.3 使用例子

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          fmt.Printf("任务%d开始执行\n", i)
          // 模拟协程任务执行一段时间
          time.Sleep(time.Duration(rand.Int() % 100))
          // 线程任务执行完成
          fmt.Printf("任务%d执行完毕\n", i)
       }(i)
    }
    fmt.Println("主协程开始等待所有任务执行完成...")
    wg.Wait()
    fmt.Println("所有协程已经执行完毕...")
}

在这个例子中,我们使用了sync.WaitGroup来等待5个协程执行完毕。在循环中,每创建一个任务,我们调用一次wg.Add(1)方法,然后启动一个协程去执行任务,当协程完成任务后,调用wg.Done方法,告知主协程任务已经执行完毕。然后主协程会在5个协程任务全部执行完毕之后,才会继续向下执行。

3.实现原理

3.1 设计初衷

WaitGroup的设计初衷就是为了等待一组操作完成后再执行下一步操作,通常会在一组协程中使用。

3.2 基本原理

sync.WaitGroup 结构体中的 state1state2 字段是用于实现 WaitGroup 功能的重要变量。

type WaitGroup struct {
   noCopy noCopy
   state1 uint64
   state2 uint32
}

由于 WaitGroup 需要等待一组操作完成之后再执行,因此需要等待所有操作完成之后才能继续执行。为了实现这个功能,WaitGroup 使用了一个计数器 counter 来记录还有多少个操作没有完成,如果 counter 的值为 0,则表示所有操作已经完成。

同时,WaitGroup 在所有任务都完成之后,需要唤醒所有处于等待的协程,此时需要知道有多少个协程处于等待状态。为了实现这个功能,WaitGroup 使用了一个等待计数器 waiter 来记录当前有多少个协程正在等待操作完成。

这里WaitGroup对于计数器和等待计数器的实现,是通过一个64位无符号整数来实现的,也就是WaitGroup结构体中的state1,其中高32位保存了任务计数器counter的值,低32位保存了等待计数器waiter的值。当我们创建一个 WaitGroup 实例时,该实例的任务计数器等待计数器都被初始化为 0。

而且,等待协程需要等待所有任务完成之后才能继续执行,所以等待协程在任务未完成时会被阻塞,当任务全部完成后,自动被唤醒。WaitGroup使用 state2 用于实现信号量机制。通过调用 runtime_Semacquire()runtime_Semrelease() 函数,可以在不阻塞线程的情况下进行等待和通知操作。

3.3 代码实现

3.3.1 Add方法

调用 Add() 方法增加/减小counter的值,delta的值可以是正数,也可以是负数,下面是Add方法的源码实现:

func (wg *WaitGroup) Add(delta int) {
   // delta 的值可以为负数,Done方法便是通过Add(-1)来实现的
   // statep: 为state1的地址  semap: 为state2的地址
   statep, semap := wg.state()
   // 高32位的值 加上 delta,增加任务计数器的值
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   // v: 取高32位数据,获取到待完成任务数
   v := int32(state >> 32)
   // 取低32位数据,获取到等待线程的值
   w := uint32(state)
   // v > 0: 说明还有待完成的任务数,此时不应该唤醒等待协程
   // w = 0: 说明没有协程在等待,此时可以直接退出
   if v > 0 || w == 0 {
      return
   }
   // 此时v = 0,所有任务都完成了,唤醒等待协程
   *statep = 0
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}

3.3.2 Done方法实现

调用 Done() 方法表示完成了一个任务,通过调用Add方法,delta值为-1,减少任务计数器counter的值,当其归为0时,便自动唤醒所有处于等待的协程。

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
   wg.Add(-1)
}

3.3.3 Wait方法实现

调用Wait方法,等待任务执行完成,增加等待计数器Waiter的值:

func (wg *WaitGroup) Wait() {
   // statep: 为state1的地址  semap: 为state2的地址
   statep, semap := wg.state()
   for {
      // 加载state1的值
      state := atomic.LoadUint64(statep)
      // v: 取高32位数据,获取到待完成任务数
      v := int32(state >> 32)
      // 没有任务待执行,全部都完成了
      if v == 0 {
         return
      }
      // 增加waiter计数器的值
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
          // 等待被唤醒
         runtime_Semacquire(semap)
         return
      }
   }
}

3.4 实现补充

Add方法,Done方法以及Wait方法实现中,有一些异常场景的验证逻辑被我删除掉了。当出现异常场景时,说明用户使用方式和WaitGroup的设计初衷相违背了,此时WaitGroup就会直接panic。

下面通过说明使用的注意事项,来间接介绍WaitGroup的异常验证逻辑。

4.使用注意事项

4.1 Add方法和Done方法需要成对出现

下面是一个Add方法和Done方法没有成对出现的例子,此时Add方法调多了,此时计数器永远大于0,Wait 方法会一直阻塞等待。

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 1")
    }()
    go func() {
        fmt.Println("Goroutine 2")
    }()
    wg.Wait()
    fmt.Println("All goroutines finished")
}

在上述代码中,我们调用了wg.Add(2),但只调用了一次wg.Done()。这会导致counter的值大于0,因此调用wg.Wait()会被永久阻塞,不会继续向下继续执行。

还有另外一种情况时Done方法调用多了,此时任务计数器counter的值为负数,从WaitGroup设计的语意来看,就是需要等待完成的任务数为负数,这个不符合预期,此时将会直接panic

package main
import (
    "fmt"
    "sync"
)
func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        fmt.Println("Goroutine 1 started")
        wg.Done() // 第一次调用Done方法
        wg.Done() // 第二次调用Done方法
        fmt.Println("Goroutine 1 completed")
    }()
    wg.Wait()
    fmt.Println("All goroutines completed")
}

在上面的例子中,我们启动了一个goroutine,第一次调用Add方法,counter的值变为1,在第14行调用Done,此时计数器的值变为0,此时等待中的goroutine将会被唤醒。在第15行又调用了一次Done方法,当counter减小为0时,再次调用Done方法会导致panic,因为此时waitGroup的计数器已经为0,再次减少将导致负数计数,这是不被允许的。

所以在调用Done方法时,需要保证每次调用都与Add方法的调用一一对应,否则会导致程序出现错误。

4.2 在所有任务都已经添加之后,才调用Wait方法进行等待

WaitGroup的设计初衷就是为了等待一组操作完成后再执行下一步操作。所以,如果在所有任务添加之前,便调用Wait方法进行等待,此时有可能会导致等待协程提前被唤醒,执行下一步操作,而尚未添加的任务则不会被等待,这违反了WaitGroup的设计初衷,也不符合预期。下面是一个简单的例子:

package main
import (
        "fmt"
        "sync"
        "time"
)
func main() {
        var wg sync.WaitGroup
        for i := 1; i <= 3; i++ {
           go func(id int) {
              wg.Add(1)
              defer wg.Done()
              fmt.Printf("Goroutine %d started\n", id)
              time.Sleep(time.Duration(id) * time.Second)
              fmt.Printf("Goroutine %d finished\n", id)
           }(i)
        }
        // 不等待所有任务添加,就开始等待
        wg.Wait()
        fmt.Println("All goroutines finished")
        time.Sleep(10 * time.Second)
}

代码执行结果如下,等待协程被提前唤醒,执行之后的操作,而子任务在等待协程唤醒后才开始执行:

All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished

在这个例子中,我们创建了三个协程并打印出它们开始和结束的消息。但是,我们没有在任务开始前调用Add方法添加任务,而是在任务开始之后再调用Add方法添加任务。

这可能会导致某些任务未被加入到WaitGroup中,等待协程就调用了wg.Wait方法,这样就会导致一些任务未被加入WaitGrou,从而导致等待协程不会等待这些任务执行完成。如果这种情况发生了,我们会看到"All goroutines finished"被输出,但实际上有一些协程还没有完成。

因此,我们应该在所有任务添加完毕之后再调用Wait方法,以保证等待的正确性。

5. WaitGroup常见使用场景

在函数或方法中使用,如果一个大任务可以拆分为多个独立的子任务,此时会将其进行拆分,并使用多个协程来并发执行这些任务,提高执行效率,同时使用WaitGroup等待所有子任务执行完成,完成协程间的同步。

下面来看go-redis中ClusterClient结构体中ForEachMaster方法中对于WaitGroup的使用。ForEachMaster方法通常用于在 Redis 集群中执行针对所有主节点的某种操作,例如在集群中添加或删除键,或者执行一些全局的诊断操作,具体执行的操作由传入参数fn指定。

这里ForEachMaster方法会对所有主节点执行某种操作,这里的实现是对所有主节点执行某种操作这个大任务,拆分为多个独立的子任务,每个子任务完成对一个Master节点执行指定操作,然后每个子任务启动一个协程去执行,主协程使用WaitGroup等待所有协程完成指定子任务,ForEachMaster也就完成了对所有主节点执行某种操作的任务。具体实现如下:

func (c *ClusterClient) ForEachMaster(
   ctx context.Context,
   fn func(ctx context.Context, client *Client) error,
) error {
   // 重新加载集群状态,以确保状态信息是最新的
   state, err := c.state.ReloadOrGet(ctx)
   if err != nil {
      return err
   }
   var wg sync.WaitGroup
   // 用于协程间通信
   errCh := make(chan error, 1)
    // 获取到redis集群中所有的master节点
   for _, master := range state.Masters {
      // 启动一个协程来执行该任务
      wg.Add(1)
      go func(node *clusterNode) {
         // 任务完成时,调用Done告知WaitGroup任务已完成
         defer wg.Done()
         err := fn(ctx, node.Client)
         if err != nil {
            select {
            case errCh &lt;- err:
            default:
            }
         }
      }(master)
   }
   // 主协程等待所有任务完成
   wg.Wait()
   return nil
 }

总结

本文介绍了 Go 语言中的 WaitGroup 并发原语,它提供了一种简单且强大的机制来协调多个 Goroutine 的执行。我们首先学习了 WaitGroup 的基本使用方法,包括如何创建 WaitGroup、如何向计数器中添加值、如何等待所有 Goroutine 完成以及如何在 Goroutine 中通知 WaitGroup 完成。

接着,我们了解了 WaitGroup 的实现原理,包括计数器和等待计数器的实现。了解了实现原理之后,我们可以更好地理解 WaitGroup 的内部机制以及如何更好地使用它来实现我们的需求。

在接下来的部分中,我们介绍了一些使用 WaitGroup 的注意事项,以及常见的使用方式。基于此,我们完成了对WaitGroup的介绍,更多关于go同步协程WaitGroup的资料请关注我们其它相关文章!

(0)

相关推荐

  • Go WaitGroup及Cond底层实现原理

    目录 WaitGroup 概念 底层数据结构 使用方法 Cond 概念 底层数据结构 使用方法 WaitGroup 概念 Go标准库提供了WaitGroup原语, 可以用它来等待一批 Goroutine 结束 底层数据结构 // A WaitGroup must not be copied after first use. type WaitGroup struct { noCopy noCopy state1 [3]uint32 } 其中 noCopy 是 golang 源码中检测禁止拷贝的技

  • Go sync WaitGroup使用深入理解

    目录 基本介绍 使用 源码分析 Add Done Wait 注意事项 基本介绍 WaitGroup是go用来做任务编排的一个并发原语,它要解决的就是并发 - 等待的问题: 当有一个 goroutine A 在检查点(checkpoint)等待一组 goroutine 全部完成,如果这些 goroutine 还没全部完成,goroutine A 就会阻塞在检查点,直到所有 goroutine 都完成后才能继续执行 试想如果没有WaitGroup,想要在协程A等到其他协程执行完成后能立马执行,只能不

  • Go语言学习之WaitGroup用法详解

    目录 前言 小试牛刀 总览 底层实现 结构体 Add Done Wait 易错点 总结 前言 在前面的文章中,我们使用过 WaitGroup 进行任务编排,Go语言中的 WaitGroup 和 Java 中的 CyclicBarrier.CountDownLatch 非常类似.比如我们有一个主任务在执行,执行到某一点时需要并行执行三个子任务,并且需要等到三个子任务都执行完后,再继续执行主任务.那我们就需要设置一个检查点,使主任务一直阻塞在这,等三个子任务执行完后再放行. 说明:本文中的示例,均是

  • Go并发:使用sync.WaitGroup实现协程同步方式

    经常看到有人会问如何等待主协程中创建的协程执行完毕之后再结束主协程,例如如下代码: package main import ( "fmt" ) func main() { go func() { fmt.Println("Goroutine 1") }() go func() { fmt.Println("Goroutine 2") }() } 执行以上代码很可能看不到输出,因为有可能这两个协程还没得到执行主协程已经结束了,而主协程结束时会结束所

  • go sync Waitgroup数据结构实现基本操作详解

    目录 WaitGroup 示例 WaitGroup 基本原理 背景知识 信号量 WaitGroup 中的信号量 WaitGroup 数据结构 noCopy state sema WaitGroup 的三个基本操作 WaitGroup 的实现 Add 的实现 Done 的实现 Wait 的实现 总结 WaitGroup 示例 本文基于 Go 1.19. go 里面的 WaitGroup 是非常常见的一种并发控制方式,它可以让我们的代码等待一组 goroutine 的结束. 比如在主协程中等待几个子

  • Go语言同步等待组sync.WaitGroup结构体对象方法详解

    目录 sync.WaitGroup结构体对象 WaitGroup的结构体 Add()方法 Done()方法 Wait()方法 Add().Done().Wait()三者对比 sync.WaitGroup使用示例 sync.WaitGroup结构体对象 在Go语言中,sync.WaitGroup结构体对象用于等待一组线程的结束:WaitGroup是go并发中最常用的工具,我们可以通过WaitGroup来表达这一组协程的任务是否完成,以决定是否继续往下走,或者取任务结果: WaitGroup的结构体

  • 一文详解go同步协程的必备工具WaitGroup

    目录 1. 简介 2. 基本使用 2.1 定义 2.2 使用方式 2.3 使用例子 3.实现原理 3.1 设计初衷 3.2 基本原理 3.3 代码实现 3.3.1 Add方法 3.3.2 Done方法实现 3.3.3 Wait方法实现 3.4 实现补充 4.使用注意事项 4.1 Add方法和Done方法需要成对出现 4.2 在所有任务都已经添加之后,才调用Wait方法进行等待 5. WaitGroup常见使用场景 总结 1. 简介 本文将介绍 Go 语言中的 WaitGroup 并发原语,包括

  • 详解c++20协程如何使用

    什么是协程 新接触的人看了网上很多人的见解都是一头雾水,本人的理解,协程就是可中断的函数,这个函数在执行到某一时刻可以暂停,保存当前的上下文(比如当前作用域的变量,函数参数等等),在后来某一时刻可以手动恢复这个中断的函数,把保存的上下文恢复并从中断的地方继续执行.简而言之,协程就是可中断的函数,协程如何实现:保存上下文和恢复上下文. 你可能会说协程不会这么简单的吧,我这里来举例一下啊,如python的协程 def test(): print('begin') yield print('hello

  • 详解Go多协程并发环境下的错误处理

    引言 在Go语言中,我们通常会用到panic和recover来抛出错误和捕获错误,这一对操作在单协程环境下我们正常用就好了,并不会踩到什么坑.但是在多协程并发环境下,我们常常会碰到以下两个问题.假设我们现在有2个协程,我们叫它们协程A和B好了: 如果协程A发生了panic,协程B是否会因为协程A的panic而挂掉? 如果协程A发生了panic,协程B是否能用recover捕获到协程A的panic? 答案分别是:会.不能. 那么下面我们来一一验证,并给出在具体的业务场景下的最佳实践. 问题一 如果

  • 详解python之协程gevent模块

    Gevent官网文档地址:http://www.gevent.org/contents.html 进程.线程.协程区分 我们通常所说的协程Coroutine其实是corporate routine的缩写,直接翻译为协同的例程,一般我们都简称为协程. 在linux系统中,线程就是轻量级的进程,而我们通常也把协程称为轻量级的线程即微线程. 进程和协程 下面对比一下进程和协程的相同点和不同点: 相同点: 相同点存在于,当我们挂起一个执行流的时,我们要保存的东西: 栈, 其实在你切换前你的局部变量,以及

  • 一文详解Golang协程调度器scheduler

    目录 1. 调度器scheduler的作用 2. GMP模型 3. 调度机制 1. 调度器scheduler的作用 我们都知道,在Go语言中,程序运行的最小单元是gorouines. 然而程序的运行最终都是要交给操作系统来执行的,以Java为例,Java中的一个线程对应的就是操作系统中的线程,以此来实现在操作系统中的运行.在Go中,gorouines比线程更轻量级,其与操作系统的线程也不是一一对应的关系,然而,最终我们想要执行程序,还是要借助操作系统的线程来完成,调度器scheduler的工作就

  • 一文详解MySQL主从同步原理

    目录 1. MySQL主从同步实现方式 2. MySQL主从同步的作用 一主多从架构 双主多从架构 3. 主动同步的原理 4. 主从同步延迟问题 主从同步延迟的原因有哪些? 主从同步延迟的解决方案? 5. 如何提升主从同步性能 从库开启多线程复制 修改同步模式,改为异步 修改从库Bin Log配置 知识点总结 1. MySQL主从同步实现方式 MySQL主从同步是基于Bin Log实现的,而Bin Log记录的是原始SQL语句. Bin Log共有三种日志格式,可以binlog_format配置

  • 一文详解JS中的事件循环机制

    目录 前言 1.JavaScript是单线程的 2.同步和异步 3.事件循环 前言 我们知道JavaScript 是单线程的编程语言,只能同一时间内做一件事,按顺序来处理事件,但是在遇到异步事件的时候,js线程并没有阻塞,还会继续执行,这又是为什么呢?本文来总结一下js 的事件循环机制. 1.JavaScript是单线程的 JavaScript 是一种单线程的编程语言,只有一个调用栈,决定了它在同一时间只能做一件事.在代码执行的时候,通过将不同函数的执行上下文压入执行栈中来保证代码的有序执行.在

  • 一文详解Java中的类加载机制

    目录 一.前言 二.类加载的时机 2.1 类加载过程 2.2 什么时候类初始化 2.3 被动引用不会初始化 三.类加载的过程 3.1 加载 3.2 验证 3.3 准备 3.4 解析 3.5 初始化 四.父类和子类初始化过程中的执行顺序 五.类加载器 5.1 类与类加载器 5.2 双亲委派模型 5.3 破坏双亲委派模型 六.Java模块化系统 一.前言 Java虚拟机把描述类的数据从Class文件加载到内存,并对数据进行校验.转换解析和初始化,最 终形成可以被虚拟机直接使用的Java类型,这个过程

  • 一文详解Golang中net/http包的实现原理

    目录 前言 http包执行流程 http包源码分析 端口监听 请求解析 路由分配 响应处理 前言 Go语言自带的net/http包提供了HTTP客户端和服务端的实现,实现一个简单的http服务非常容易,其自带了一些列结构和方法来帮助开发者简化HTTP服务开发的相关流程,因此我们不需要依赖任何第三方组件就能构建并启动一个高并发的HTTP服务器,net/http包在编写web应用中有很重要的作用,这篇文章会学习如何用 net/http 自己编写实现一个 HTTP Server 并探究其实现原理,具体

  • 一文详解Golang 定时任务库 gron 设计和原理

    目录 cron 简介 gron 定时参数 源码解析 Cron Entry 按照时间排序 新增定时任务 启动和停止 Schedule 扩展性 经典写法-控制退出 结语 cron 简介 在 Unix-like 操作系统中,有一个大家都很熟悉的 cli 工具,它能够来处理定时任务,周期性任务,这就是: cron. 你只需要简单的语法控制就能实现任意[定时]的语义.用法上可以参考一下这个Crontab Guru Editor,做的非常精巧. 简单说,每一个位都代表了一个时间维度,* 代表全集,所以,上面

随机推荐