Golang协程池gopool设计与实现

目录
  • Goroutine
  • 协程池
  • gopool
  • 核心实现
    • Pool
    • Task
    • Worker
    • 整体来看
    • 三个角色的定位
    • 使用 sync.Pool 进行性能优化

Goroutine

Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。

Goroutine 的优势:

  • 与线程相比,Goroutines 成本很低。

它们的堆栈大小只有几 kb,堆栈可以根据应用程序的需要增长和缩小,context switch 也很快,而在线程的情况下,堆栈大小必须指定并固定。

  • Goroutine 被多路复用到更少数量的 OS 线程。

一个包含数千个 Goroutine 的程序中可能只有一个线程。如果该线程中的任何 Goroutine 阻塞等待用户输入,则创建另一个 OS 线程并将剩余的 Goroutine 移动到新的 OS 线程。所有这些都由运行时处理,作为开发者无需耗费心力关心,这也使得我们有很干净的 API 来支持并发。

  • Goroutines 使用 channel 进行通信。

channel 的设计有效防止了在使用 Goroutine 访问共享内存时发生竞争条件(race conditions) 。channel 可以被认为是 Goroutine 进行通信的管道。

下文中我们会以「协程」来代指 Goroutine。

协程池

在高并发场景下,我们可能会启动大量的协程来处理业务逻辑。协程池是一种利用池化技术,复用对象,减少内存分配的频率以及协程创建开销,从而提高协程执行效率的技术。

最近抽空了解了字节官方开源的 gopkg 库提供的 gopool 协程池实现,感觉还是很高质量的,代码也非常简洁清晰,而且 Kitex 底层也在使用 gopool 来管理协程,这里我们梳理一下设计和实现。

gopool

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to the go keyword.

了解官方 README 就会发现gopool的用法其实非常简单,将曾经我们经常使用的 go func(){...} 替换为 gopool.Go(func(){...}) 即可。

此时 gopool 将会使用默认的配置来管理你启动的协程,你也可以选择针对业务场景配置池子大小,以及扩容上限。

old:

go func() {
	// do your job
}()

new:

import (
    "github.com/bytedance/gopkg/util/gopool"
)

gopool.Go(func(){
	/// do your job
})

核心实现

下面我们来看看gopool是怎样实现协程池管理的。

Pool

Pool 是一个定义了协程池能力的接口。

type Pool interface {
	// 池子的名称
	Name() string

	// 设置池子内Goroutine的容量
	SetCap(cap int32)

	// 执行 f 函数
	Go(f func())

	// 带 ctx,执行 f 函数
	CtxGo(ctx context.Context, f func())

	// 设置发生panic时调用的函数
	SetPanicHandler(f func(context.Context, interface{}))
}

gopool 提供了这个接口的默认实现(即下面即将介绍的pool),当我们直接调用 gopool.CtxGo 时依赖的就是这个。

这样的设计模式在 Kitex 中也经常出现,所有的依赖均设计为接口,便于随后扩展,底层提供一个默认的实现暴露出去,这样对调用方也很友好。

type pool struct {
	// 池子名称
	name string

	// 池子的容量, 即最大并发工作的 goroutine 的数量
	cap int32

	// 池子配置
	config *Config

	// task 链表
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// 记录当前正在运行的 worker 的数量
	workerCount int32

	// 当 worker 出现panic时被调用
	panicHandler func(context.Context, interface{})
}

// NewPool 创建一个新的协程池,初始化名称,容量,配置
func NewPool(name string, cap int32, config *Config) Pool {
	p := &pool{
		name:   name,
		cap:    cap,
		config: config,
	}
	return p
}

调用 NewPool 获取了以 Pool 的形式返回的 pool 结构体。

Task

type task struct {
	ctx context.Context
	f   func()

	next *task
}

task 是一个链表结构,可以把它理解为一个待执行的任务,它包含了当前节点需要执行的函数f, 以及指向下一个task的指针。

综合前一节 pool 的定义,我们可以看到,一个协程池 pool 对应了一组task

pool 维护了指向链表的头尾的两个指针:taskHeadtaskTail,以及链表的长度taskCount 和对应的锁 taskLock

Worker

type worker struct {
	pool *pool
}

一个 worker 就是逻辑上的一个执行器,它唯一对应到一个协程池 pool。当一个worker被唤起,将会开启一个goroutine ,不断地从 pool 中的 task链表获取任务并执行。

func (w *worker) run() {
	go func() {
		for {
                        // 声明即将执行的 task
			var t *task

                        // 操作 pool 中的 task 链表,加锁
			w.pool.taskLock.Lock()
			if w.pool.taskHead != nil {
                                // 拿到 taskHead 准备执行
				t = w.pool.taskHead

                                // 更新链表的 head 以及数量
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
                        // 如果前一步拿到的 taskHead 为空,说明无任务需要执行,清理后返回
			if t == nil {
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()

                        // 执行任务,针对 panic 会recover,并调用配置的 handler
			func() {
				defer func() {
					if r := recover(); r != nil {
						msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
						logger.CtxErrorf(t.ctx, msg)
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}

整体来看

看到这里,其实就能把整个流程串起来了。我们来看看对外的接口 CtxGo(context.Context, f func()) 到底做了什么?

func Go(f func()) {
	CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
	defaultPool.CtxGo(ctx, f)
}

func (p *pool) CtxGo(ctx context.Context, f func()) {

        // 创建一个 task 对象,将 ctx 和待执行的函数赋值
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f

        // 将 task 插入 pool 的链表的尾部,更新链表数量
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)

	// 以下两个条件满足时,创建新的 worker 并唤起执行:
	// 1. task的数量超过了配置的限制
	// 2. 当前运行的worker数量小于上限(或无worker运行)
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {

                // worker数量+1
		p.incWorkerCount()

                // 创建一个新的worker,并把当前 pool 赋值
		w := workerPool.Get().(*worker)
		w.pool = p

                // 唤起worker执行
		w.run()
	}
}

相信看了代码注释,大家就能理解发生了什么。

gopool 会自行维护一个 defaultPool,这是一个默认的 pool 结构体,在引入包的时候就进行初始化。当我们直接调用 gopool.CtxGo() 时,本质上是调用了 defaultPool 的同名方法

func init() {
	defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}

const (
	defaultScalaThreshold = 1
)

// Config is used to config pool.
type Config struct {
	// 控制扩容的门槛,一旦待执行的 task 超过此值,且 worker 数量未达到上限,就开始启动新的 worker
	ScaleThreshold int32
}

// NewConfig creates a default Config.
func NewConfig() *Config {
	c := &Config{
		ScaleThreshold: defaultScalaThreshold,
	}
	return c
}

defaultPool 的名称为 gopool.DefaultPool,池子容量一万,扩容下限为 1。

当我们调用 CtxGo时,gopool 就会更新维护的任务链表,并且判断是否需要扩容 worker

  • 若此时已经有很多 worker 启动(底层一个 worker 对应一个 goroutine),不需要扩容,就直接返回。
  • 若判断需要扩容,就创建一个新的worker,并调用 worker.run()方法启动,各个worker会异步地检查 pool 里面的任务链表是否还有待执行的任务,如果有就执行。

三个角色的定位

  • task 是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构;
  • worker 是一个实际执行任务的执行器,它会异步启动一个 goroutine 执行协程池里面未执行的task
  • pool 是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的 worker

使用 sync.Pool 进行性能优化

其实到这个地方,gopool已经是一个代码简洁清晰的协程池库了,但是性能上显然有改进空间,所以gopool的作者应用了多次 sync.Pool 来池化对象的创建,复用woker和task对象。

这里建议大家直接看源码,其实在上面的代码中已经有所涉及。

  • task 池化
var taskPool sync.Pool

func init() {
	taskPool.New = newTask
}

func newTask() interface{} {
	return &task{}
}

func (t *task) Recycle() {
	t.zero()
	taskPool.Put(t)
}
  • worker 池化
var workerPool sync.Pool

func init() {
	workerPool.New = newWorker
}

func newWorker() interface{} {
	return &worker{}
}

func (w *worker) Recycle() {
	w.zero()
	workerPool.Put(w)
}

到此这篇关于Golang协程池gopool设计与实现的文章就介绍到这了,更多相关Golang协程池gopool内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang协程池模拟实现群发邮件功能

    比如批量群发邮件的功能 因为发送邮件是个比较耗时的操作, 如果是传统的一个个执行 , 总体耗时比较长 可以使用golang实现一个协程池 , 并行发送邮件 pool包下的pool.go文件 package pool import "log" //具体任务,可以传参可以自定义操作 type Task struct { Args interface{} Do func(interface{})error } //协程的个数 var Nums int //任务通道 var JobChanne

  • golang协程池设计详解

    Why Pool go自从出生就身带"高并发"的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量 for { // 监听tcp rw, e := l.Accept() if e != nil { ....... } tempDelay = 0

  • golang 40行代码实现通用协程池

    代码仓库 goroutine-pool golang的协程管理 golang协程机制很方便的解决了并发编程的问题,但是协程并不是没有开销的,所以也需要适当限制一下数量. 不使用协程池的代码(示例代码使用chan实现,代码略啰嗦) func (p *converter) upload(bytes [][]byte) ([]string, error) { ch := make(chan struct{}, 4) wg := &sync.WaitGroup{} wg.Add(len(bytes))

  • Golang协程池gopool设计与实现

    目录 Goroutine 协程池 gopool 核心实现 Pool Task Worker 整体来看 三个角色的定位 使用 sync.Pool 进行性能优化 Goroutine Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的.所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景. Goroutine 的优势: 与线程相比,Goroutines 成本很低. 它们的堆栈大小只有几 kb,堆栈可以根据应用程序的需

  • golang协程设计及调度原理

    目录 一.协程设计-GMP模型 1.工作线程M 2.逻辑处理器p 3.协程g 4.全局调度信息schedt 5.GMP详细示图 二.协程调度 1.调度策略 获取本地运行队列 获取全局运行队列 协程窃取 2.调度时机 主动调度 被动调度 抢占调度 一.协程设计-GMP模型 线程是操作系统调度到CPU中执行的基本单位,多线程总是交替式地抢占CPU的时间片,线程在上下文的切换过程中需要经过操作系统用户态与内核态的切换.golang的协程(G)依然运行在工作线程(M)之上,但是借助语言的调度器,协程只需

  • java基于quasar实现协程池的方法示例

    业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍.所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!在csdn中基本都是对它的基本使用,用法和线程差不多.不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~) 一个线程可以多个协程,一个进程也可以单独拥有多个协程. 线程进程都是同步机制,而协程则是异步. 协程能保留上一次调用时的状态,每次过程重入时,就相当于进

  • GO实现协程池管理的方法

    使用channel实现协程池 通过 Channel 实现 Goroutine Pool,缺点是会造成协程的频繁开辟和注销,但好在简单灵活通用. package main import ( "fmt" "io/ioutil" "net/http" "sync" ) // Pool goroutine Pool type Pool struct { queue chan int wg *sync.WaitGroup } // Ne

  • Go简单实现协程池的实现示例

    目录 MPG模型 通道的特性 首先就是进程.线程.协程讲解老三样. 进程: 本质上是一个独立执行的程序,进程是操作系统进行资源分配和调度的基本概念,操作系统进行资源分配和调度的一个独立单位. 线程: 是操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位.一个进程中可以并发多个线程,每条线程执行不同的任务,切换受系统控制. 协程:  又称为微线程,是一种用户态的轻量级线程,协程不像线程和进程需要进行系统内核上的上下文切换,协程的上下文切换是由用户自己决定的,有自己的上下

  • golang协程与线程区别简要介绍

    目录 一.进程与线程 二.并发与并行 三.go协程与线程 1.调度方式 2.调度策略 3.上下文切换速度 4.栈的大小 四.GMP模型 一.进程与线程 进程是操作系统资源分配的基本单位,是程序运行的实例.例如打开一个浏览器就开启了一个进程. 线程是操作系统调度到CPU中执行的基本单位.例如在浏览器里新建一个窗口就需要一个线程来进行处理. 在一般情况下,线程是进程的组成部分,一个进程可以包含多个线程.例如浏览器可以新建多个窗口. 进程中的多个线程并发执行并共享进程的内存等资源.例如多个窗口之间可以

  • 一文详解Golang协程调度器scheduler

    目录 1. 调度器scheduler的作用 2. GMP模型 3. 调度机制 1. 调度器scheduler的作用 我们都知道,在Go语言中,程序运行的最小单元是gorouines. 然而程序的运行最终都是要交给操作系统来执行的,以Java为例,Java中的一个线程对应的就是操作系统中的线程,以此来实现在操作系统中的运行.在Go中,gorouines比线程更轻量级,其与操作系统的线程也不是一一对应的关系,然而,最终我们想要执行程序,还是要借助操作系统的线程来完成,调度器scheduler的工作就

随机推荐