go语言限制协程并发数的方案详情

目录
  • 前言
  • 一、使用带缓冲的通道限制并发数
    • 1.1方案详情
    • 1.2评估总结
    • 2.2评估总结
  • 其它

前言

在使用协程并发处理某些任务时, 其并发数量往往因为各种因素的限制不能无限的增大. 例如网络请求、数据库查询等等。从运行效率角度考虑,在相关服务可以负载的前提下(限制最大并发数),尽可能高的并发。本文就这个问题探寻一下解决方案和实现。共两种思路,一是使用带缓冲的通道实现,二是使用锁实现。

一、使用带缓冲的通道限制并发数

1.1方案详情

先上代码如下, 逻辑很简单.

package golimit

type GoLimit struct {
    ch chan int
}
func NewGoLimit(max int) *GoLimit {
    return &GoLimit{ch: make(chan int, max)}
func (g *GoLimit) Add() {
    g.ch <- 1
func (g *GoLimit) Done() {
    <-g.ch

按允许最大并发数创建一个带缓冲的通道, 创建协程之前调用Add()往通道里写一个数据, 协程完成是调用Done()方法读取一个数据. 若无法往通道里写数据时, 表示通道已经写满, 也就是目前的协程并发数为允许的最大数量. Add()方法将被阻塞, 也就无法创建新的协程. 直到有协程运行完成, 调用Done()方法读取了通道了一个数据.

以下是使用示例

package main

import (
    "golimit"
    "log"
    "time"
)
func main() {
    log.Println("开始测试...")
    g := golimit.NewGoLimit(2) //max_num(最大允许并发数)设置为2
    for i := 0; i < 10; i++ {
        //尝试增加一个协程, 若已达到最大并发数,将阻塞
        g.Add()
        go func(g *golimit.GoLimit, i int) {
            defer g.Done() //一个并发协程已经完成
            time.Sleep(time.Second * 2)
            log.Println(i, "done")
        }(g, i)
    }
    log.Println("循环结束")
    time.Sleep(time.Second * 3)//等待执行完成
    log.Println("测试结束")
}

1.2评估总结

优点:此方案的实现逻辑简单明了,易理解、易维护。若能满足需求,在一般的场景下,此方案为首选。

隐忧:使用通道的缓冲区的大小来表示最大可并发数,在允许并发数较大,如几千几万甚至更大的情况下,通道的性能和内存的负载是否会有问题,我不太清楚,若哪位朋友知道请告知一下。

不足:运行中难以调整最大可并发数。而在某些场景下是有这种需求的,如A服务依赖的B服务有扩容或缩减,但A服务不能停止,需要调整请求B服务接口的最大可并发数。二、使用锁实现协程并发数量限制2.1方案详情

同样先上代码(注:此代码我已经在github上开源https://github.com/zh-five/golimit

// 协程并发数限制库
package golimit
import (
    "sync"
)
type GoLimit struct {
    max       uint             //并发最大数量
    count     uint             //当前已有并发数
    isAddLock bool             //是否已锁定增加
    zeroChan  chan interface{} //为0时广播
    addLock   sync.Mutex       //(增加并发数的)锁
    dataLock  sync.Mutex       //(修改数据的)锁
}
func NewGoLimit(max uint) *GoLimit {
    return &GoLimit{max: max, count: 0, isAddLock: false, zeroChan: nil}
}
//并发计数加1.若 计数>=max_num, 则阻塞,直到 计数<max_num
func (g *GoLimit) Add() {
    g.addLock.Lock()
    g.dataLock.Lock()
    g.count += 1
    if g.count < g.max { //未超并发时解锁,后续可以继续增加
        g.addLock.Unlock()
    } else { //已到最大并发数, 不解锁并标记. 等数量减少后解锁
        g.isAddLock = true
    }
    g.dataLock.Unlock()
}
//并发计数减1
//若计数<max_num, 可以使原阻塞的Add()快速解除阻塞
func (g *GoLimit) Done() {
    g.dataLock.Lock()
    g.count -= 1
    //解锁
    if g.isAddLock == true && g.count < g.max {
        g.isAddLock = false
        g.addLock.Unlock()
    }
    //0广播
    if g.count == 0 && g.zeroChan != nil {
        close(g.zeroChan)
        g.zeroChan = nil
    }
    g.dataLock.Unlock()
}
//更新最大并发计数为, 若是调大, 可以使原阻塞的Add()快速解除阻塞
func (g *GoLimit) SetMax(n uint) {
    g.dataLock.Lock()
    g.max = n
    //解锁
    if g.isAddLock == true && g.count < g.max {
        g.isAddLock = false
        g.addLock.Unlock()
    }
    //加锁
    if g.isAddLock == false && g.count >= g.max {
        g.isAddLock = true
        g.addLock.Lock()
    }
    g.dataLock.Unlock()
}
//若当前并发计数为0, 则快速返回; 否则阻塞等待,直到并发计数为0
func (g *GoLimit) WaitZero() {
    g.dataLock.Lock()
    //无需等待
    if g.count == 0 {
        g.dataLock.Unlock()
        return
    }
    //无广播通道, 创建一个
    if g.zeroChan == nil {
        g.zeroChan = make(chan interface{})
    }
    //复制通道后解锁, 避免从nil读数据
    c := g.zeroChan
    g.dataLock.Unlock()
    <-c
}
//获取并发计数
func (g *GoLimit) Count() uint {
    return g.count
}
//获取最大并发计数
func (g *GoLimit) Max() uint {
    return g.max
}

总共使用了两把锁,一把是数据锁(dataLock),用来锁定数据,保证数据修改安全,加锁解锁是在修改数据前后进行的;另一把是增加能否增加协程的锁(addLock),增加协程时必须先加锁,加锁成功后修改并发数,若并发数小于最大可并发数,则解锁,否则不解锁,促使后续增加协程的加锁操作阻塞,从而限制协程的并发数。使用示例如下:

package main
import (
    "github.com/zh-five/golimit"
    "log"
    "time"
)
func main() {
    log.Println("开始测试...")
    g := golimit.NewGoLimit(2) //max_num(最大允许并发数)设置为2
    for i := 0; i < 10; i++ {
        //并发计数加1.若 计数>=max_num, 则阻塞,直到 计数<max_num
        g.Add()
        //运行过程中可以随时修改最大可并发数据
        //g.SetMax(3)
        go func(g *golimit.GoLimit, i int) {
            defer g.Done() //并发计数减1
            time.Sleep(time.Second * 2)
            log.Println(i, "done")
        }(g, i)
    }
    log.Println("循环结束")
    g.WaitZero() //阻塞, 直到所有并发都完成
    log.Println("测试结束")
}

方案2的GoLimit除了增加了SetMax()方法用于修改最大可并发数。出于好玩和偷懒增加了一个WaitZero()方法(其实外部使用sync.WaitGroup也可以快速实现此功能),用于阻塞等待所有并发协程都执行完成。大约可以用于如下场景:有一大批url需要有限制的并发采集数据,主程序里只需要简单的调用一下WaitZero()方法,就可以阻塞等等所有采集的协程完成。

2.2评估总结

  • 优点: 从实现逻辑上说,可以确定性能和消耗不会随着最大可并发数增加而线性增加。另外还有很多可扩展的想象。
  • 缺点:实现逻辑比较复杂

其它

其实我很想对比测试一下两种方案的性能,特别是最大可并发比较大时。但我一直没有找到一种好的测试方法,若哪个朋友有方法或思路,欢迎交流。

到此这篇关于go语言限制协程并发数的方案详情的文章就介绍到这了,更多相关go限制协程并发数内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang实现并发数控制的方法

    golang并发 谈到golang这门语言,很自然的想起了他的的并发goroutine.这也是这门语言引以为豪的功能点.并发处理,在某种程度上,可以提高我们对机器的使用率,提升系统业务处理能力.但是并不是并发量越大越好,太大了,硬件环境就会吃不消,反而会影响到系统整体性能,甚至奔溃.所以,在使用golang提供便捷的goroutine时,既要能够实现开启并发,也要学会如果控制并发量. 开启golang并发 golang开启并发处理非常简单,只需要在调用函数时,在函数前边添加上go关键字即可.如下

  • golang实现多协程下载文件(支持断点续传)

    引言 写这篇文章主要是周末休息太无聊,看了看别人代码,发现基本上要么是多协程下载文件要么就只有单协程的断点续传,所以就试了试有进度条的多协程下载文件(支持断点续传) package main import ( "fmt" "io" "os" "regexp" "strconv" "sync" "github.com/qianlnk/pgbar" ) /** * 需求:

  • Go 协程超时控制的实现

    目录 Go 协程超时控制 Select 超时控制 go timer 计时器 go context Go 协程超时控制 Select 阻塞方式 Context 方式 先说个场景: 假设业务中 A 服务需要调用 服务B,要求设置 5s 超时,那么如何优雅实现? Select 超时控制 考虑是否可以用 select + time.After 方式进行实现 这里主要利用的是通道在携程之间通信的特点,当程序调用成功后,会向通道中发送信号.没调用成功前,通道会阻塞. select { case res :=

  • GO实现协程池管理的方法

    使用channel实现协程池 通过 Channel 实现 Goroutine Pool,缺点是会造成协程的频繁开辟和注销,但好在简单灵活通用. package main import ( "fmt" "io/ioutil" "net/http" "sync" ) // Pool goroutine Pool type Pool struct { queue chan int wg *sync.WaitGroup } // Ne

  • go语言限制协程并发数的方案详情

    目录 前言 一.使用带缓冲的通道限制并发数 1.1方案详情 1.2评估总结 2.2评估总结 其它 前言 在使用协程并发处理某些任务时, 其并发数量往往因为各种因素的限制不能无限的增大. 例如网络请求.数据库查询等等.从运行效率角度考虑,在相关服务可以负载的前提下(限制最大并发数),尽可能高的并发.本文就这个问题探寻一下解决方案和实现.共两种思路,一是使用带缓冲的通道实现,二是使用锁实现. 一.使用带缓冲的通道限制并发数 1.1方案详情 先上代码如下, 逻辑很简单. package golimit

  • python 协程并发数控制

    目录 多线程之信号量 协程中使用信号量控制并发 aiohttp 中 TCPConnector 连接池 前言: 本篇博客要采集的站点:[看历史,通天下-历史剧网] 目标数据是该站点下的热门历史事件,列表页分页规则如下所示: http://www.lishiju.net/hotevents/p0 http://www.lishiju.net/hotevents/p1 http://www.lishiju.net/hotevents/p2 首先我们通过普通的多线程,对该数据进行采集,由于本文主要目的是

  • python编程使用协程并发的优缺点

    协程 协程是一种用户态的轻量级线程,又称微线程. 协程拥有自己的寄存器上下文和栈,调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置. 优点: 1.无需线程上下文切换的开销 2.无需原子操作锁定及同步的开销 3.方便切换控制流,简化编程模型 4.高并发+高扩展性+低成本:一个CPU支持上万的协程都不

  • 详解Go多协程并发环境下的错误处理

    引言 在Go语言中,我们通常会用到panic和recover来抛出错误和捕获错误,这一对操作在单协程环境下我们正常用就好了,并不会踩到什么坑.但是在多协程并发环境下,我们常常会碰到以下两个问题.假设我们现在有2个协程,我们叫它们协程A和B好了: 如果协程A发生了panic,协程B是否会因为协程A的panic而挂掉? 如果协程A发生了panic,协程B是否能用recover捕获到协程A的panic? 答案分别是:会.不能. 那么下面我们来一一验证,并给出在具体的业务场景下的最佳实践. 问题一 如果

  • python3通过gevent.pool限制协程并发数量的实现方法

    协程虽然是轻量级的线程,但到达一定数量后,仍然会造成服务器崩溃出错.最好的方法通过限制协程并发数量来解决此类问题. server代码: #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Cain # @Email : 771535427@qq.com # @Filename : gevnt_sockserver.py # @Last modified : 2017-11-24 16:31 # @Description : impo

  • C语言中实现协程案例

    协程是一种用户空间的非抢占式线程,主要用来解决等待大量的IO操作的问题. 协程vs线程 对比使用多线程来解决IO阻塞任务,使用协程的好处是不用加锁,访问共享的数据不用进行同步操作.这里需要说明的一点是,使用协程之所以不需要加锁不是因为所有的协程只在一个线程中运行,而是因为协程的非抢占式的特点.也就是说,使用协程的话,在没主动交出CPU之前都是不会被突然切换到其它协程上的.而线程是抢占式的,使用多线程你是不能确定你的线程什么时候被操作系统调度,什么时候被切换,因此需要用锁到实现一种"原子操作&qu

  • go语言中的协程详解

    协程的特点 1.该任务的业务代码主动要求切换,即主动让出执行权限 2.发生了IO,导致执行阻塞(使用channel让协程阻塞) 与线程本质的不同 C#.java中我们执行多个线程,是通过时间片切换来进行的,要知道进行切换,程序需要保存上下文等信息,是比较消耗性能的 GO语言中的协程,没有上面这种切换,一定是通过协程主动放出权限,不是被动的. 例如: C# 中创建两个线程 可以看到1和2是交替执行的 Go语言中用协程实现一下 runtime.GOMAXPROCS(1) 这个结果就是 执行了1 在执

  • Kotlin协程与并发深入全面讲解

    目录 协程与并发 1.协程并发问题 2.协程处理并发的手段 协程与并发 Kotlin协程是基于线程执行的.经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同. 在java的世界里,并发往往是多个线程一起工作,存在共享的变量.需要处理好同步问题.要避免把协程与线程的概念混淆. runBlocking { var i = 0 launch(Dispatchers.Default) { repeat(1000) { i++ } } delay(1000L) println(i) } L

  • 深入理解 Java、Kotlin、Go 的线程和协程

    前言 Go 语言比 Java 语言性能优越的一个原因,就是轻量级线程Goroutines(协程Coroutine).本篇文章深入分析下 Java 的线程和 Go 的协程. 协程是什么 协程并不是 Go 提出来的新概念,其他的一些编程语言,例如:Go.Python 等都可以在语言层面上实现协程,甚至是 Java,也可以通过使用扩展库来间接地支持协程. 当在网上搜索协程时,我们会看到: Kotlin 官方文档说「本质上,协程是轻量级的线程」. 很多博客提到「不需要从用户态切换到内核态」.「是协作式的

  • 浅谈golang for 循环中使用协程的问题

    两个例子 package main import ( "fmt" "time" ) func Process1(tasks []string) { for _, task := range tasks { // 启动协程并发处理任务 go func() { fmt.Printf("Worker start process task: %s\n", task) }() } } func main() { tasks := []string{&quo

随机推荐