golang基于errgroup实现并发调用的方法
目录
- 串行调用
- 基于sync.WaitGroup实现简单的并发调用
- 基于errgroup.Group实现并发调用
- 总结
串行调用
在用go编写web/rpc服务器的时候,经常会出现需要对下游多 个/组 服务调用rpc(或者其他比较耗时的操作)的情况。
按照自然的写法,比如对下游有ABC三个调用,串行顺着写,就总共要花费TimeA+TimeB+TimeC的时间:
func Handler(ctx context.Context) { var a, b, c respType a = A(ctx) b = B(ctx) c = C(ctx) }
基于sync.WaitGroup实现简单的并发调用
但经常地,几个rpc相互之间没有依赖关系的情况,这时,我们稍加思考就会想到使用并发的方式,同时发出请求,阻塞等到所有请求返回,这样,总体耗时就变成了Max(TimeA, TimeB, TimeC),我们可以通过常用的sync.WaitGroup轻松实现这事:
func Handler(ctx context.Context) { var a, b, c respType wg := sync.WaitGroup{} wg.Add(3) go func() { defer wg.Done() a = A(ctx) }() go func() { defer wg.Done() b = B(ctx) }() go func() { defer wg.Done() c = C(ctx) }() wg.Wait() }
但是现实事件是不完美的,尤其是在加入了网络这一因素后,我们经常会需要处理调用失败的情况,很多情况下,并发的几个操作只要任一失败,整个处理就算失败了,但是由于WaitGroup要等所有调用都done才能返回,因此调用时间是由耗时最长的那个(不一定是失败的)决定的,如果不是失败的那个,其实就产生了资源浪费,如下图,B最先失败了,此时逻辑上已经可以返回,但是实际却等到了最长的调用-A返回了整个函数才返回:
func Handler(ctx context.Context) { var a, b, c respType var errA, errB, errC error wg := sync.WaitGroup{} wg.Add(3) go func() { defer wg.Done() a, errA = A(ctx) }() go func() { defer wg.Done() b, errB = B(ctx) }() go func() { defer wg.Done() c, errC = C(ctx) }() wg.Wait() if errA != nil { // ... } if errB != nil { // ... } if errC != nil { // ... } }
基于errgroup.Group实现并发调用
这对于追求极致的我们来说显然是不能接受的,我们希望达到,如果有任意一个调用报错,立刻让所有调用返回的效果:
好在,我们有现成的工具可以用,通过引入"golang.org/x/sync/errgroup",可以轻松实现上面的目的。
为了使用errgroup,先使用WithContext方法创建一个Group
wg, groupCtx := errgroup.WithContext(ctx)
返回的第一个参数是*errgroup.Group,第二个则是在子调用中应该使用的context。
然后,使用Go方法调用所有的并发方法
wg.Go(func() error { var err error a, err = A(groupCtx) return err })
最后, 使用Wait方法等待并发结束,返回值是所有子调用中第一个非nil的error,全成功的话就是nil。
if err := wg.Wait(); err != nil { // ... }
因此整体,我们的代码差不多就长这个样子
func handler(ctx context.Context) { var a, b, c respType wg, groupCtx := errgroup.WithContext(ctx) wg.Go(func() error { var err error a, err = A(groupCtx) return err }) wg.Go(func() error { var err error b, err = B(groupCtx) return err }) wg.Go(func() error { var err error c, err = C(groupCtx) return err }) if err := wg.Wait(); err != nil { // ... 错误处理 } // 全部成功 }
errgroup内部通过封装了waitGroup和sync.Once实现了这个语法糖。
使用时特别要注意的是,errgroup的提前取消调用rpc是通过cancel那个返回的context(即上面的groupCtx)实现的,因此在所有子调用中都要实现监听groupCtx的Done事件。而在正常的rpc框架中都已经帮我们实现了这件事,因此我们只要保证传进去的是groupCtx即可。
总结
errgroup帮我们封装了并发调用下游时快速失败的逻辑,我们能很方便地使用它进行业务代码的编写。使用的关键是一定要记得在子调用中传递WithContext中返回的Context。
好用的工具千千万,让我们一个个来掌握!