golang基于errgroup实现并发调用的方法

目录
  • 串行调用
  • 基于sync.WaitGroup实现简单的并发调用
  • 基于errgroup.Group实现并发调用
  • 总结

串行调用

在用go编写web/rpc服务器的时候,经常会出现需要对下游多 个/组 服务调用rpc(或者其他比较耗时的操作)的情况。
按照自然的写法,比如对下游有ABC三个调用,串行顺着写,就总共要花费TimeA+TimeB+TimeC的时间:

func Handler(ctx context.Context) {
    var a, b, c respType
    a = A(ctx)
    b = B(ctx)
    c = C(ctx)
}

基于sync.WaitGroup实现简单的并发调用

但经常地,几个rpc相互之间没有依赖关系的情况,这时,我们稍加思考就会想到使用并发的方式,同时发出请求,阻塞等到所有请求返回,这样,总体耗时就变成了Max(TimeA, TimeB, TimeC),我们可以通过常用的sync.WaitGroup轻松实现这事:

func Handler(ctx context.Context) {
    var a, b, c respType
   	wg := sync.WaitGroup{}
	wg.Add(3)
	go func() {
		defer wg.Done()
		a = A(ctx)
	}()
	go func() {
		defer wg.Done()
		b = B(ctx)
	}()
	go func() {
		defer wg.Done()
		c = C(ctx)
	}()
	wg.Wait()
}

但是现实事件是不完美的,尤其是在加入了网络这一因素后,我们经常会需要处理调用失败的情况,很多情况下,并发的几个操作只要任一失败,整个处理就算失败了,但是由于WaitGroup要等所有调用都done才能返回,因此调用时间是由耗时最长的那个(不一定是失败的)决定的,如果不是失败的那个,其实就产生了资源浪费,如下图,B最先失败了,此时逻辑上已经可以返回,但是实际却等到了最长的调用-A返回了整个函数才返回:

func Handler(ctx context.Context) {
    var a, b, c respType
    var errA, errB, errC error
   	wg := sync.WaitGroup{}
	wg.Add(3)
	go func() {
		defer wg.Done()
		a, errA = A(ctx)
	}()
	go func() {
		defer wg.Done()
		b, errB = B(ctx)
	}()
	go func() {
		defer wg.Done()
		c, errC = C(ctx)
	}()
	wg.Wait()
	if errA != nil {
	// ...
	}
	if errB != nil {
	// ...
	}
	if errC != nil {
	// ...
	}
}

基于errgroup.Group实现并发调用

这对于追求极致的我们来说显然是不能接受的,我们希望达到,如果有任意一个调用报错,立刻让所有调用返回的效果:

好在,我们有现成的工具可以用,通过引入"golang.org/x/sync/errgroup",可以轻松实现上面的目的。

为了使用errgroup,先使用WithContext方法创建一个Group

wg, groupCtx := errgroup.WithContext(ctx)

返回的第一个参数是*errgroup.Group,第二个则是在子调用中应该使用的context。

然后,使用Go方法调用所有的并发方法

	wg.Go(func() error {
		var err error
		a, err = A(groupCtx)
		return err
	})

最后, 使用Wait方法等待并发结束,返回值是所有子调用中第一个非nil的error,全成功的话就是nil。

if err := wg.Wait(); err != nil {
// ...
}

因此整体,我们的代码差不多就长这个样子

func handler(ctx context.Context) {
    var a, b, c respType
    wg, groupCtx := errgroup.WithContext(ctx)
	wg.Go(func() error {
		var err error
		a, err = A(groupCtx)
		return err
	})
	wg.Go(func() error {
		var err error
		b, err = B(groupCtx)
		return err
	})
	wg.Go(func() error {
		var err error
		c, err = C(groupCtx)
		return err
	})
	if err := wg.Wait(); err != nil {
    // ... 错误处理
    }
    // 全部成功
}

errgroup内部通过封装了waitGroup和sync.Once实现了这个语法糖。

使用时特别要注意的是,errgroup的提前取消调用rpc是通过cancel那个返回的context(即上面的groupCtx)实现的,因此在所有子调用中都要实现监听groupCtx的Done事件。而在正常的rpc框架中都已经帮我们实现了这件事,因此我们只要保证传进去的是groupCtx即可。

总结

errgroup帮我们封装了并发调用下游时快速失败的逻辑,我们能很方便地使用它进行业务代码的编写。使用的关键是一定要记得在子调用中传递WithContext中返回的Context。

好用的工具千千万,让我们一个个来掌握!

(0)

相关推荐

  • 解析golang中的并发安全和锁问题

    1. 并发安全 package main import ( "fmt" "sync" ) var ( sum int wg sync.WaitGroup ) func test() { for i := 0; i < 5000000; i++ { sum += 1 } wg.Done() } func main() { // 并发和安全锁 wg.Add(2) go test() go test() wg.Wait() fmt.Println(sum) } 上面

  • 详解Golang并发操作中常见的死锁情形

    目录 第一种情形:无缓存能力的管道,自己写完自己读 第二种情形:协程来晚了 第三种情形:管道读写时,相互要求对方先读/写 第四种情形:读写锁相互阻塞,形成隐形死锁 什么是死锁,在Go的协程里面死锁通常就是永久阻塞了,你拿着我的东西,要我先给你然后再给我,我拿着你的东西又让你先给我,不然就不给你.我俩都这么想,这事就解决不了了. 第一种情形:无缓存能力的管道,自己写完自己读 先上代码: func main() { ch := make(chan int, 0) ​ ch <- 666 x := <

  • Golang并发操作中常见的读写锁详析

    互斥锁简单粗暴,谁拿到谁操作.今天给大家介绍一下读写锁,读写锁比互斥锁略微复杂一些,不过我相信我们今天能够把他拿下! golang读写锁,其特征在于 读锁:可以同时进行多个协程读操作,不允许写操作 写锁:只允许同时有一个协程进行写操作,不允许其他写操作和读操作 读写锁有两种模式.没错!一种是读模式,一种是写模式.当他为写模式的话,作用和互斥锁差不多,只允许有一个协程抢到这把锁,其他协程乖乖排队.但是读模式就不一样了,他允许你多个协程读,但是不能写.总结起来就是: 仅读模式: 多协程可读不可写 仅

  • Golang errgroup 设计及实现原理解析

    目录 开篇 errgroup 源码拆解 Group WithContext Wait Go SetLimit TryGo 使用方法 结束语 开篇 继上次学习了信号量 semaphore 扩展库的设计思路和实现之后,今天我们继续来看 golang.org/x/sync 包下的另一个经常被 Golang 开发者使用的大杀器:errgroup. 业务研发中我们经常会遇到需要调用多个下游的场景,比如加载一个商品的详情页,你可能需要访问商品服务,库存服务,券服务,用户服务等,才能从各个数据源获取到所需要的

  • golang基于errgroup实现并发调用的方法

    目录 串行调用 基于sync.WaitGroup实现简单的并发调用 基于errgroup.Group实现并发调用 总结 串行调用 在用go编写web/rpc服务器的时候,经常会出现需要对下游多 个/组 服务调用rpc(或者其他比较耗时的操作)的情况.按照自然的写法,比如对下游有ABC三个调用,串行顺着写,就总共要花费TimeA+TimeB+TimeC的时间: func Handler(ctx context.Context) { var a, b, c respType a = A(ctx) b

  • Golang 语言控制并发 Goroutine的方法

    goroutine 是 Go语言中的轻量级线程实现,由 Go 运行时(runtime)管理.Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU. 01介绍 Golang 语言的优势之一是天生支持并发,我们在 Golang 语言开发中,通常使用的并发控制方式主要有 Channel,WaitGroup 和 Context,本文我们主要介绍一下 Golang 语言中并发控制的这三种方式怎么使用?关于它们各自的详细介绍在之前的文章已经介绍过,感兴趣的读者朋友们可以按需翻阅. 02

  • Golang基于泛化调用与Nacos实现Dubbo代理

    目录 前言 准备 实现 项目结构 go.mod 返回数据格式 获取 nacos 元信息 泛化调用 提供 http 服务 启动 效果 前言 由于工作中使用的 rpc 框架是 dubbo,经常需要调试不同环境的 dubbo 接口,例如本地环境.开发环境和测试环境.而为了统一管理 http 接口和 dubbo 接口,希望使用统一的调试工具,例如 PostMan 或 ApiPost 等,因此萌生了开发一个 dubbo 的 http 代理工具的念头. 准备 由于是通用的 dubbo 代理,因此肯定需要使用

  • Golang基于sync.Once实现单例的操作代码

    目录 基于sync.Once实现单例 单例类型定义Driver 类Field conn once.Do(func() {}) 并发访问once.Do() 对外暴露方法Conn() 重新new(Driver)会发生什么? 在go里实现单例模式有多种方式: 基于lock 基于init函数 基于sync.Once 本文介绍基于sync.Once的方式来实现单例,熟练掌握这种模式,并理解其底层原理,对大部分人来讲已经完全够用了. 基于sync.Once实现单例 // 其他package也可见,在其他地方

  • 基于线程、并发的基本概念(详解)

    什么是线程? 提到"线程"总免不了要和"进程"做比较,而我认为在Java并发编程中混淆的不是"线程"和"进程"的区别,而是"任务(Task)".进程是表示资源分配的基本单位.而线程则是进程中执行运算的最小单位,即执行处理机调度的基本单位.关于"线程"和"进程"的区别耳熟能详,说来说去就一句话:通常来讲一个程序有一个进程,而一个进程可以有多个线程. 但是"任务

  • PHP基于文件存储实现缓存的方法

    本文实例讲述了PHP基于文件存储实现缓存的方法.分享给大家供大家参考.具体如下: 在一些数据库数据记录较大,但是服务器有限的时候,可能一条MySQL查询就会好几百毫秒,一个简单的页面一般也有十几条查询,这个时候也个页面加载下来基本要好几秒了,如果并发量高的话服务器基本就瘫痪了,造成一个页面很久也加载不下来,这个时候我们可以使用文件缓存来缓解下MySQL的压力,下面给个使用例子. <?php //页面业务逻辑处理,获得结果 $objPage = new Page_IndexModel($arrPa

  • 使用Golang的singleflight防止缓存击穿的方法

    在使用缓存时,容易发生缓存击穿. 缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去读dB,这些请求都会击穿到DB,造成瞬时DB请求量大.压力骤增. singleflight 介绍 import "golang.org/x/sync/singleflight" singleflight类的使用方法就新建一个singleflight.Group,使用其方法Do或者DoChan来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个

  • python基于socketserver实现并发,验证客户端的合法性

    一.socketserver实现并发 tcp协议的socket是只能和一个客户端通信的,使用socketserver可以实现和多个客户端通信,他是在socket的基础上进行的封装,底层还是调用的socket. socket是底层模块 socketserver是基于socket完成的 socketserver代码格式: 服务端: import socketserver # 引入模块 import time ​ ​ # 类名随便定义,但是必须继承socketserver.BaseRequestHan

  • 示例剖析golang中的CSP并发模型

    目录 1. 相关概念: 2. CSP (通信顺序进程) 3. channel:同步&传递消息 4. goroutine:实际并发执行的实体 5. golang调度器 1. 相关概念: 用户态:当一个进程在执行用户自己的代码时处于用户运行态(用户态) 内核态:当一个进程因为系统调用陷入内核代码中执行时处于内核运行态(内核态),引入内核态防止用户态的程序随意的操作内核地址空间,具有一定的安全保护作用.这种保护模式是通过内存页表操作等机制,保证进程间的地址空间不会相互冲突,一个进程的操作不会修改另一个

  • golang基于websocket通信tcp keepalive研究记录

    目录 为什么有tcp Keepalive? tcp Keepalive是否默认开启? 如何设置tcp keepalive? 在Linux内核设置 golang websocket 客户端默认怎么处理tcp keepalive? golang websocket 服务器默认怎么处理tcp keepalive? 为什么有tcp Keepalive? 服务器和客户端建立tcp连接以后,客户端/服务器如何知道对方是否挂掉了? 这时候TCP协议提出一个办法,当客户端端等待超过一定时间后自动给服务端发送一个

随机推荐