一文带你了解Go语言实现的并发神库conc

目录
  • 前言
  • worker池
    • Stream
  • ForEach和map
    • ForEach
    • map
  • 总结

前言

哈喽,大家好,我是asong;前几天逛github发现了一个有趣的并发库-conc,其目标是:

  • 更难出现goroutine泄漏
  • 处理panic更友好
  • 并发代码可读性高

从简介上看主要封装功能如下:

  • waitGroup进行封装,避免了产生大量重复代码,并且也封装recover,安全性更高
  • 提供panics.Catcher封装recover逻辑,统一捕获panic,打印调用栈一些信息
  • 提供一个并发执行任务的worker池,可以控制并发度、goroutine可以进行复用,支持函数签名,同时提供了stream方法来保证结果有序
  • 提供ForEachmap方法优雅的处理切片

接下来就区分模块来介绍一下这个库;

仓库地址:https://github.com/sourcegraph/conc

Go语言标准库有提供sync.waitGroup控制等待goroutine,我们一般会写出如下代码:

func main(){
    var wg sync.WaitGroup
    for i:=0; i < 10; i++{
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer func() {
                // recover panic
                err := recover()
                if err != nil {
                    fmt.Println(err)
                }
            }
            // do something
            handle()
        }
    }
    wg.Wait()
}

上述代码我们需要些一堆重复代码,并且需要单独在每一个func中处理recover逻辑,所以conc库对其进行了封装,代码简化如下:

func main() {
	wg := conc.NewWaitGroup()
	for i := 0; i < 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}

func doSomething() {
	fmt.Println("test")
}

conc库封装也比较简单,结构如下:

type WaitGroup struct {
	wg sync.WaitGroup
	pc panics.Catcher
}

其自己实现了Catcher类型对recover逻辑进行了封装,封装思路如下:

type Catcher struct {
	recovered atomic.Pointer[RecoveredPanic]
}

recovered是原子指针类型,RecoveredPanic是捕获的recover封装,封装了堆栈等信息:

type RecoveredPanic struct {
	// The original value of the panic.
	Value any
	// The caller list as returned by runtime.Callers when the panic was
	// recovered. Can be used to produce a more detailed stack information with
	// runtime.CallersFrames.
	Callers []uintptr
	// The formatted stacktrace from the goroutine where the panic was recovered.
	// Easier to use than Callers.
	Stack []byte
}

提供了Try方法执行方法,只会记录第一个panic的goroutine信息:

func (p *Catcher) Try(f func()) {
	defer p.tryRecover()
	f()
}

func (p *Catcher) tryRecover() {
	if val := recover(); val != nil {
		rp := NewRecoveredPanic(1, val)
        // 只会记录第一个panic的goroutine信息
		p.recovered.CompareAndSwap(nil, &rp)
	}
}

提供了Repanic()方法用来重放捕获的panic:

func (p *Catcher) Repanic() {
	if val := p.Recovered(); val != nil {
		panic(val)
	}
}

func (p *Catcher) Recovered() *RecoveredPanic {
	return p.recovered.Load()
}

waitGroup对此也分别提供了Wait()WaitAndRecover()方法:

func (h *WaitGroup) Wait() {
	h.wg.Wait()

	// Propagate a panic if we caught one from a child goroutine.
	h.pc.Repanic()
}

func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
	h.wg.Wait()

	// Return a recovered panic if we caught one from a child goroutine.
	return h.pc.Recovered()
}

wait方法只要有一个goroutine发生panic就会向上抛出panic,比较简单粗暴;

waitAndRecover方法只有有一个goroutine发生panic就会返回第一个recover的goroutine信息;

总结:conc库对waitGrouop的封装总体是比较不错的,可以减少重复的代码;

worker池

conc提供了几种类型的worker池:

  • ContextPool:可以传递context的pool,若有goroutine发生错误可以cancel其他goroutine
  • ErrorPool:通过参数可以控制只收集第一个error还是所有error
  • ResultContextPool:若有goroutine发生错误会cancel其他goroutine并且收集错误
  • RestultPool:收集work池中每个任务的执行结果,并不能保证顺序,保证顺序需要使用stream或者iter.map;

我们来看一个简单的例子:

import "github.com/sourcegraph/conc/pool"

func ExampleContextPool_WithCancelOnError() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 2 {
				return errors.New("I will cancel all other tasks!")
			}
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
	// Output:
	// I will cancel all other tasks!
}

在创建pool时有如下方法可以调用:

  • p.WithMaxGoroutines()配置pool中goroutine的最大数量
  • p.WithErrors:配置pool中的task是否返回error
  • p.WithContext(ctx):配置pool中运行的task当遇到第一个error要取消
  • p.WithFirstError:配置pool中的task只返回第一个error
  • p.WithCollectErrored:配置pool的task收集所有error

pool的基础结构如下:

type Pool struct {
	handle   conc.WaitGroup
	limiter  limiter
	tasks    chan func()
	initOnce sync.Once
}

limiter是控制器,用chan来控制goroutine的数量:

type limiter chan struct{}

func (l limiter) limit() int {
	return cap(l)
}

func (l limiter) release() {
	if l != nil {
		<-l
	}
}

pool的核心逻辑也比较简单,如果没有设置limiter,那么就看有没有空闲的worker,否则就创建一个新的worker,然后投递任务进去;

如果设置了limiter,达到了limiter worker数量上限,就把任务投递给空闲的worker,没有空闲就阻塞等着;

func (p *Pool) Go(f func()) {
	p.init()

	if p.limiter == nil {
		// 没有限制
		select {
		case p.tasks <- f:
			// A goroutine was available to handle the task.
		default:
			// No goroutine was available to handle the task.
			// Spawn a new one and send it the task.
			p.handle.Go(p.worker)
			p.tasks <- f
		}
	} else {
		select {
		case p.limiter <- struct{}{}:
			// If we are below our limit, spawn a new worker rather
			// than waiting for one to become available.
			p.handle.Go(p.worker)

			// We know there is at least one worker running, so wait
			// for it to become available. This ensures we never spawn
			// more workers than the number of tasks.
			p.tasks <- f
		case p.tasks <- f:
			// A worker is available and has accepted the task.
			return
		}
	}

}

这里work使用的是一个无缓冲的channel,这种复用方式很巧妙,如果goroutine执行很快避免创建过多的goroutine;

使用pool处理任务不能保证有序性,conc库又提供了Stream方法,返回结果可以保持顺序;

Stream

Steam的实现也是依赖于pool,在此基础上做了封装保证结果的顺序性,先看一个例子:

func ExampleStream() {
	times := []int{20, 52, 16, 45, 4, 80}

	stream := stream2.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() stream2.Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()

	// Output:
	// 20ms
	// 52ms
	// 16ms
	// 45ms
	// 4ms
	// 80ms
}

stream的结构如下:

type Stream struct {
	pool             pool.Pool
	callbackerHandle conc.WaitGroup
	queue            chan callbackCh

	initOnce sync.Once
}

queue是一个channel类型,callbackCh也是channel类型 - chan func():

type callbackCh chan func()

在提交goroutine时按照顺序生成callbackCh传递结果:

func (s *Stream) Go(f Task) {
	s.init()

	// Get a channel from the cache.
	ch := getCh()

	// Queue the channel for the callbacker.
	s.queue <- ch

	// Submit the task for execution.
	s.pool.Go(func() {
		defer func() {
			// In the case of a panic from f, we don't want the callbacker to
			// starve waiting for a callback from this channel, so give it an
			// empty callback.
			if r := recover(); r != nil {
				ch <- func() {}
				panic(r)
			}
		}()

		// Run the task, sending its callback down this task's channel.
		callback := f()
		ch <- callback
	})
}

var callbackChPool = sync.Pool{
	New: func() any {
		return make(callbackCh, 1)
	},
}

func getCh() callbackCh {
	return callbackChPool.Get().(callbackCh)
}

func putCh(ch callbackCh) {
	callbackChPool.Put(ch)
}

ForEach和map

ForEach

conc库提供了ForEach方法可以优雅的并发处理切片,看一下官方的例子:

conc库使用泛型进行了封装,我们只需要关注handle代码即可,避免冗余代码,我们自己动手写一个例子:

func main() {
	input := []int{1, 2, 3, 4}
	iterator := iter.Iterator[int]{
		MaxGoroutines: len(input) / 2,
	}

	iterator.ForEach(input, func(v *int) {
		if *v%2 != 0 {
			*v = -1
		}
	})

	fmt.Println(input)
}

ForEach内部实现为Iterator结构及核心逻辑如下:

type Iterator[T any] struct {
	MaxGoroutines int
}
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
	if iter.MaxGoroutines == 0 {
		// iter is a value receiver and is hence safe to mutate
		iter.MaxGoroutines = defaultMaxGoroutines()
	}

	numInput := len(input)
	if iter.MaxGoroutines > numInput {
		// No more concurrent tasks than the number of input items.
		iter.MaxGoroutines = numInput
	}

	var idx atomic.Int64
	// 通过atomic控制仅创建一个闭包
	task := func() {
		i := int(idx.Add(1) - 1)
		for ; i < numInput; i = int(idx.Add(1) - 1) {
			f(i, &input[i])
		}
	}

	var wg conc.WaitGroup
	for i := 0; i < iter.MaxGoroutines; i++ {
		wg.Go(task)
	}
	wg.Wait()
}

可以设置并发的goroutine数量,默认取的是GOMAXPROCS ,也可以自定义传参;

并发执行这块设计的很巧妙,仅创建了一个闭包,通过atomic控制idx,避免频繁触发GC;

map

conc库提供的map方法可以得到对切片中元素结果,官方例子:

使用map可以提高代码的可读性,并且减少了冗余代码,自己写个例子:

func main() {
	input := []int{1, 2, 3, 4}
	mapper := iter.Mapper[int, bool]{
		MaxGoroutines: len(input) / 2,
	}

	results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
	fmt.Println(results)
	// Output:
	// [false true false true]
}

map的实现也依赖于Iterator,也是调用的ForEachIdx方法,区别于ForEach是记录处理结果;

总结

花了小半天时间看了一下这个库,很多设计点值得我们学习,总结一下我学习到的知识点:

  • conc.WatiGroup对Sync.WaitGroup进行了封装,对Add、Done、Recover进行了封装,提高了可读性,避免了冗余代码
  • ForEach、Map方法可以更优雅的并发处理切片,代码简洁易读,在实现上Iterator中的并发处理使用atomic来控制只创建一个闭包,避免了GC性能问题
  • pool是一个并发的协程队列,可以控制协程的数量,实现上也很巧妙,使用一个无缓冲的channel作为worker,如果goroutine执行速度快,避免了创建多个goroutine
  • stream是一个保证顺序的并发协程队列,实现上也很巧妙,使用sync.Pool在提交goroutine时控制顺序,值得我们学习;

到此这篇关于一文带你了解Go语言实现的并发神库conc的文章就介绍到这了,更多相关Go语言并发库conc内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Golang编程并发工具库MapReduce使用实践

    目录 环境 项目需求 mapReduce使用说明 需求实现 业务逻辑 创建任务队列 运行结果 结论 引申阅读 环境 go version go1.16.4 windows/amd64 Intel(R) Core(TM) i7-7820HK CPU @ 2.90GHz 4核心8线程 项目需求 处理数个约5MB的小文件 从源目录读取文件并拷贝到目标目录 计算源文件MD5和目标文件MD5进行对比,如不相同则报错并终止程序执行 mapReduce使用说明 go get -u github.com/tal

  • Go语言实现的可读性更高的并发神库详解

    目录 前言 WaitGroup的封装 worker池 Stream ForEach和map ForEach map 总结 前言 前几天逛github发现了一个有趣的并发库-conc,其目标是: 更难出现goroutine泄漏 处理panic更友好 并发代码可读性高 从简介上看主要封装功能如下: 对waitGroup进行封装,避免了产生大量重复代码,并且也封装recover,安全性更高 提供panics.Catcher封装recover逻辑,统一捕获panic,打印调用栈一些信息 提供一个并发执行

  • gorm golang 并发连接数据库报错的解决方法

    底层报错 error:cannot assign requested address 原因 并发场景下 client 频繁请求端口建立tcp连接导致端口被耗尽 解决方案 root执行即可 sysctl -w net.ipv4.tcp_timestamps=1 开启对于TCP时间戳的支持,若该项设置为0,则下面一项设置不起作用 sysctl -w net.ipv4.tcp_tw_recycle=1 表示开启TCP连接中TIME-WAIT sockets的快速回收 以上这篇gorm golang 并

  • Golang并发读取文件数据并写入数据库的项目实践

    目录 需求 项目结构 获取data目录下的文件 按行读取文本数据 数据类型定义 并发读取文件 将数据写入数据库 完整main.go代码 测试运行 需求 最近接到一个任务,要把一批文件中的十几万条JSON格式数据写入到Oracle数据库中,Oracle是企业级别的数据库向来以高性能著称,所以尽可能地利用这一特性.当时第一时间想到的就是用多线程并发读文件并操作数据库,而Golang是为并发而生的,用Golang进行并发编程非常方便,因此这里选用Golang并发读取文件并用Gorm操作数据库.然而Go

  • 一文带你了解Go语言实现的并发神库conc

    目录 前言 worker池 Stream ForEach和map ForEach map 总结 前言 哈喽,大家好,我是asong:前几天逛github发现了一个有趣的并发库-conc,其目标是: 更难出现goroutine泄漏 处理panic更友好 并发代码可读性高 从简介上看主要封装功能如下: 对waitGroup进行封装,避免了产生大量重复代码,并且也封装recover,安全性更高 提供panics.Catcher封装recover逻辑,统一捕获panic,打印调用栈一些信息 提供一个并发

  • 一文带你掌握Go语言运算符的使用

    目录 算术运算符 关系运算符 逻辑运算符 位运算符 赋值运算符 其他运算符 运算符优先级 运算符用于在程序运行时执行数学或逻辑运算. Go 语言内置的运算符有: 算术运算符 关系运算符 逻辑运算符 位运算符 赋值运算符 其他运算符 接下来让我们来详细看看各个运算符的介绍. 算术运算符 算术运算符,所有的数据类型要相同 下表列出了所有Go语言的算术运算符.假定 A 值为 10,B 值为 20. 运算符 描述 实例 + 相加 A + B 输出结果 30 - 相减 A - B 输出结果 -10 * 相

  • 一文带你了解Go语言中的单元测试

    目录 基本概念 示例一:取整函数基本测试 示例二:Fail()函数 示例三:FailNow函数 实例四:Log和Fetal函数 基本概念 上一节提到,代码完成的标准之一还包含了单元测试,这部分也是很多开发流程中不规范的地方.写过单元测试的开发人员应该理解,单元测试最核心的价值是为了证明:为什么我写的代码是正确的?也就是从逻辑角度帮你检查你的代码.但是另外一方面,如果从单元测试覆盖率角度来看,单元测试也是非常耗时的,几乎是三倍于你代码的开发时间,所以在很多迭代速度非常快的项目中,单元测试就几乎没人

  • 一文带你入门Go语言中定时任务库Cron的使用

    目录 前言 快速开始 安装 导入 Demo Cron表达式格式 标准格式 预定义时间表 常用的方法介绍 new() AddJob() AddFunc() Start() 相关推荐 Go第三方库之cronexpr——解析 crontab 表达式 总结 前言 在平时的开发需求中,我们经常会有一些重复执行的操作需要触发执行,和系统约个时间,在几点几分几秒或者每隔几分钟跑一个任务,说白了就是定时任务,,想必大家第一反应都是linux的Crontab.其实定时任务不止使用系统自带的Crontab,在Go语

  • 一文带你了解Go语言中的类型断言和类型转换

    目录 类型断言 类型判断 为什么需要断言 类型转换 什么时候使用类型转换 类型为什么称为转换 类型结论 在Go中,类型断言和类型转换是一个令人困惑的事情,他们似乎都在做同样的事情. 下面是一个类型断言的例子: var greeting interface{} = "hello world" greetingStr := greeting.(string) 接着看一个类型转换的例子: greeting := []byte("hello world") greeting

  • 一文带你了解Go语言中的指针和结构体

    目录 前言 指针 指针的定义 获取和修改指针所指向变量的值 结构体 结构体定义 结构体的创建方式 小结 前言 前面的两篇文章对 Go 语言的基础语法和基本数据类型以及几个复合数据类型进行介绍,本文将对 Go 里面的指针和结构体进行介绍,也为后续文章做铺垫. 指针 在 Go 语言中,指针可以简单理解是一个地址,指针类型是依托于某一个类型而存在的,例如 Go 里面的基本数据类型 int.float64.string 等,它们所对应的指针类型为 *int.*float64.*string等. 指针的定

  • 一文带你了解Go语言中接口的使用

    目录 接口 接口的实现 接口类型变量 空接口 类型断言 类型断言变种 type switch 小结 接口 在 Go 语言中,接口是一种抽象的类型,是一组方法的集合.接口存在的目的是定义规范,而规范的细节由其他对象去实现.我们来看一个例子: import "fmt" type Person struct { Name string } func main() { person := Person{Name: "cmy"} fmt.Println(person) //

  • 一文带你掌握Go语言中的文件读取操作

    目录 os 包 和 bufio 包 os.Open 与 os.OpenFile 以及 File.Read 读取文件操作 bufio.NewReader 和 Reader.ReadString 读取文件操作 小结 os 包 和 bufio 包 Go 标准库的 os 包,为我们提供很多操作文件的函数,如 Open(name) 打开文件.Create(name) 创建文件等函数,与之对应的是 bufio 包,os 包是直接对磁盘进行操作的,而 bufio 包则是带有缓冲的操作,不用每次都去操作磁盘.

  • 一文带你了解Go语言中方法的调用

    目录 前言 方法 方法的调用 Receiver 参数类型的选择 方法的约束 小结 前言 在前面的 一文熟悉 Go 函数 文章中,介绍了 Go 函数的声明,函数的几种形式如匿名函数.闭包.基于函数的自定义类型和函数参数详解等,而本文将对方法进行介绍,方法的本质就是函数,介绍方法的同时也会顺带对比其与函数的不同之处. 方法 在 Go 中,我们可以为任何的数据类型定义方法(指针或接口除外),现在让我们看一看方法的声明和组成部分以及与函数有什么不同之处. type Person struct { age

  • 一文带你熟悉Go语言中函数的使用

    目录 函数 函数的声明 Go 函数支持变长参数 匿名函数 闭包 init 函数 函数参数详解 形式参数与实际参数 值传递 函数是一种数据类型 小结 函数 函数的英文单词是 Function,这个单词还有着功能的意思.在 Go 语言中,函数是实现某一特定功能的代码块.函数代表着某个功能,可以在同一个地方多次使用,也可以在不同地方使用.因此使用函数,可以提高代码的复用性,减少代码的冗余. 函数的声明 通过案例了解函数的声明有哪几部分: 定义一个函数,实现两个数相加的功能,并将相加之后的结果返回. f

随机推荐