golang 并发编程之生产者消费者详解

golang 最吸引人的地方可能就是并发了,无论代码的编写上,还是性能上面,golang 都有绝对的优势

学习一个语言的并发特性,我喜欢实现一个生产者消费者模型,这个模型非常经典,适用于很多的并发场景,下面我通过这个模型,来简单介绍一下 golang 的并发编程

go 并发语法

协程 go

协程是 golang 并发的最小单元,类似于其他语言的线程,只不过线程的实现借助了操作系统的实现,每次线程的调度都是一次系统调用,需要从用户态切换到内核态,这是一项非常耗时的操作,因此一般的程序里面线程太多会导致大量的性能耗费在线程切换上。而在 golang 内部实现了这种调度,协程在这种调度下面的切换非常的轻量级,成百上千的协程跑在一个 golang 程序里面是很正常的事情

golang 为并发而生,启动一个协程的语法非常简单,使用 go 关键字即可

go func () {
    // do something
}

同步信号 sync.WaitGroup

多个协程之间可以通过 sync.WaitGroup 同步,这个类似于 Linux 里面的信号量

var wg sync.WaitGroup  // 申明一个信号量
wg.Add(1)   // 信号量加一
wg.Done()   // 信号量减一
wg.Wait()   // 信号量为正时阻塞,直到信号量为0时被唤醒

通道 chan

通道可以理解为一个消息队列,生产者往队列里面放,消费者从队列里面取。通道可以使用 close 关闭

ic := make(chan int, 10)  // 申明一个通道
ic <- 10        // 往通道里面放
i := <- ic      // 从通道里面取
close(ic)       // 关闭通道

生产者消费者实现

定义产品类

这个产品类根据具体的业务需求定义

type Product struct {
    name  int
    value int
}

生产者

如果 stop 标志不为 false,不断地往通道里面放 product,完成之后信号量完成

func producer(wg *sync.WaitGroup, products chan<- Product, name int, stop *bool) {
    for !*stop {
        product := Product{name: name, value: rand.Int()}
        products <- product
        fmt.Printf("producer %v produce a product: %#v\n", name, product)
        time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond)
    }
    wg.Done()
}

消费者

不断地从通道里面取 product,然后作对应的处理,直到通道被关闭,并且 products 里面为空, for 循环才会终止,而这正是我们期望的

func consumer(wg *sync.WaitGroup, products <-chan Product, name int) {
    for product := range products {
        fmt.Printf("consumer %v consume a product: %#v\n", name, product)
        time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond)
    }
    wg.Done()
}

主线程

var wgp sync.WaitGroup
var wgc sync.WaitGroup
stop := false
products := make(chan Product, 10)
// 创建 5 个生产者和 5 个消费者
for i := 0; i < 5; i++ {
    go producer(&wgp, products, i, &stop)
    go consumer(&wgc, products, i)
    wgp.Add(1)
    wgc.Add(1)
}
time.Sleep(time.Duration(1) * time.Second)
stop = true     // 设置生产者终止信号
wgp.Wait()      // 等待生产者退出
close(products) // 关闭通道
wgc.Wait()      // 等待消费者退出

补充:Go并发编程--通过channel实现生产者消费者模型

概述

生产者消费者模型是多线程设计的经典模型,该模型被广泛的应用到各个系统的多线程/进程模型设计中。

本文介绍了Go语言中channel的特性,并通过Go语言实现了两个生产者消费者模型。

channel的一些特性

在Go中channel是非常重要的协程通信的手段,channel是双向的通道,通过channel可以实现协程间数据的传递,通过channel也可以实现协程间的同步(后面会有介绍)。

本文介绍的生产者消费者模型主要用到了channel的以下特性:任意时刻只能有一个协程能够对channel中某一个item进行访问。

单生产者单消费者模型

把生产者和消费者都放到一个无线循环中,这个和我们的服务器端的任务处理非常相似。生产者不断的向channel中放入数据,而消费者不断的从channel中取出数据,并对数据进行处理(打印)。

由于生产者的协程不会退出,所以channel的写入会永久存在,这样当channel中没有放入数据时,消费者端将会阻塞,等待生产者端放入数据。

代码的实现如下:

package main
import (
    "fmt"
    "time"
)
var ch1 chan int = make(chan int)
var bufChan chan int = make(chan int, 1000)
var msgChan chan int = make(chan int)
func sum(a int, b int) {
    ch1 <- a + b
}
// write data to channel
func writer(max int) {
    for {
        for i := 0; i < max; i++ {  // 简单的向channel中放入一个整数
            bufChan <- i
            time.Sleep(1 * time.Millisecond)  //控制放入的频率
        }
    }
}
// read data fro m channel
func reader(max int) {
    for {
        r := <-bufChan
        fmt.Printf("read value: %d\n", r)
    }
    // 通知主线程,工作结束了,这一步可以省略
    msgChan <- 1
}
func testWriterAndReader(max int) {
    go writer(max)
    go reader(max)
    // writer 和reader的任务结束了,主线程会得到通知
    res := <-msgChan
    fmt.Printf("task is done: value=%d\n", res)
}
func main() {
    testWriterAndReader(100)
}

多生产者消费者模型

我们可以利用channel在某个时间点只能有一个协程能够访问其中的某一个数据,的特性来实现生产者消费者模型。由于channel具有这样的特性,我们在放数据和消费数据时可以不需要加锁。

package main
import (
    "time"
    "fmt"
    "os"
)
var ch1 chan int = make(chan int)
var bufChan chan int = make(chan int, 1000)
var msgChan chan string = make(chan string)
func sum(a int, b int) {
    ch1 <- a + b
}
// write data to channel
func writer(max int) {
    for {
        for i := 0; i < max; i++ {
            bufChan <- i
            fmt.Fprintf(os.Stderr, "%v write: %d\n", os.Getpid(), i)
            time.Sleep(10 * time.Millisecond)
        }
    }
}
// read data fro m channel
func reader(name string) {
    for {
        r := <-bufChan
        fmt.Printf("%s read value: %d\n", name, r)
    }
    msgChan <- name
}
func testWriterAndReader(max int) {
    // 开启多个writer的goroutine,不断地向channel中写入数据
    go writer(max)
    go writer(max)
    // 开启多个reader的goroutine,不断的从channel中读取数据,并处理数据
    go reader("read1")
    go reader("read2")
    go reader("read3")
    // 获取三个reader的任务完成状态
    name1 := <-msgChan
    name2 := <-msgChan
    name3 := <-msgChan
    fmt.Println("%s,%s,%s: All is done!!", name1, name2, name3)
}
func main() {
    testWriterAndReader(100)
}

输出如下:

read3 read value: 0

80731 write: 0

80731 write: 0

read1 read value: 0

80731 write: 1

read2 read value: 1

80731 write: 1

read3 read value: 1

80731 write: 2

read2 read value: 2

80731 write: 2

... ...

总结

本文通过channel实现了经典的生产者和消费者模型,利用了channel的特性。但要注意,当消费者的速度小于生产者时,channel就有可能产生拥塞,导致占用内存增加,所以,在实际场景中需要考虑channel的缓冲区的大小。

设置了channel的大小,当生产的数据大于channel的容量时,生产者将会阻塞,这些问题都是要在实际场景中需要考虑的。

一个解决办法就是使用一个固定的数组或切片作为环形缓冲区,而非channel,通过Sync包的机制来进行同步,实现生产者消费者模型,这样可以避免由于channel满而导致消费者端阻塞。

但,对于环形缓冲区而言,可能会覆盖老的数据,同样需要考虑具体的使用场景。关于环形缓冲区的原理和实现,在分析Sync包的使用时再进一步分析。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。如有错误或未考虑完全的地方,望不吝赐教。

(0)

相关推荐

  • 基于Golang 高并发问题的解决方案

    Golang 高并发问题的解决 Golang在高并发问题上,由于协程的使用,相对于其他编程语言,已经有了很大的优势,即相同的配置上,Golang可以以更低的代价处理更多的线程,同样的线程数,占用更低的资源!及时这样,只是解决了一部分问题而已,因为在每个协程里,处理逻辑还是会有问题. 高并发时,还是要考虑服务器所能承受的最大压力,数据库读取时的io问题,连接数问题,带宽问题等等 研究了一下并发解决方案,在此记录一下 参考文章:Handling 1 Million Requests per Minu

  • 解决golang 关于全局变量的坑

    学习golang不久,在定义全局变量的时候遇见了坑.写个小例子,增强记忆. 错误版本 var p int func main() { p, err := test(4) if err != nil { log.Fatal(err) } } func test(i int) (int, error) { return i + 1, nil } 编译一直不通过,p declared and not used.后来查了查资料,看见这种其实是在main里边又重新定义了p,所以一直提示p定义了但是没有使用

  • golang 对私有函数进行单元测试的实例

    在待测试的私有函数所在的包内,新建一个xx_test.go文件 书写方式如下: import ( "github.com/stretchr/testify/assert" "testing" ) var XXFunc = yourPrivateFunc func TestXXFunc(t *testing.T) { ret, ... := XXFunc(...) assert.Equal(t, ret, ...) } 就可以了~ 补充:golang test使用(简

  • 关于golang高并发的实现与注意事项说明

    一.并发的意义 并发的意义就是让 一个程序同时做多件事情,其目的只是为了能让程序同时做另一件事情而已,而不是为了让程序运行的更快(如果是多核处理器,而且任务可以分成相互独立的部分,那么并发确实可以让事情解决的更快). golang从语言级别上对并发提供了支持,而且在启动并发的方式上直接添加了语言级的关键字,不必非要按照固定的格式来定义线程函数,也不必因为启动线程的时候只能给线程函数传递一个参数而烦恼. 二.并发的启动 go的并发启动非常简单,几乎没有什么额外的准备工作,要并发的函数和一般的函数没

  • 使用golang编写一个并发工作队列

    其实golang用一个函数可以构建一个并发队列,现在编写一个灵活可控的队列程序 先定义一个工作 type Worker struct { ID int RepJobs chan int64 SM *SM quit chan bool } 包含了workid和执行任务的id,上面的SM只是任务具体内容,这个和具体业务相关,大家自己编写自己的SM业务逻辑 然后定义工作池 type workerPool struct { workerChan chan *Worker workerList []*Wo

  • 深入浅析golang zap 日志库使用(含文件切割、分级别存储和全局使用等)

    日志处理经常有以下几个需求: 1.不同级别的日志输出到不同的日志文件中. 2.日志文件按照文件大小或日期进行切割存储,以避免单一日志文件过大. 3.日志使用简单方便,一次定义全局使用. 建议使用使用Uber-go的Zap Logger,大神李文周大博客已经说的非常明确了,请先参考李老师的博客: https://www.liwenzhou.com/posts/Go/zap/ 问题二和问题三需要补充描述: 一.日志按照级别分文件切割存储 1.1 首先实现两个判断日志等级的interface info

  • Golang全局变量加锁的问题解决

    如果全局变量只读取 那自然是不需要加锁的 如果全局变量多进程读,多进程写,那自然是需要加读写锁的 但是如果全局变量只有一个进程写,其他进程读呢? 如果采用COW的方式,写进程只是通过单次赋值的方式来更新变量,是否就可以不加锁了呢? 就第三种情况而言: 当然我们通过 go build -race 或者 go run -race 就会出现 WARNING: DATA RACE. 但是出现 data race 就证明一定有问题么? 其实核心点在于这个赋值是否是原子的.也就是说是否存在 p1 = p2

  • golang 并发编程之生产者消费者详解

    golang 最吸引人的地方可能就是并发了,无论代码的编写上,还是性能上面,golang 都有绝对的优势 学习一个语言的并发特性,我喜欢实现一个生产者消费者模型,这个模型非常经典,适用于很多的并发场景,下面我通过这个模型,来简单介绍一下 golang 的并发编程 go 并发语法 协程 go 协程是 golang 并发的最小单元,类似于其他语言的线程,只不过线程的实现借助了操作系统的实现,每次线程的调度都是一次系统调用,需要从用户态切换到内核态,这是一项非常耗时的操作,因此一般的程序里面线程太多会

  • java并发学习之BlockingQueue实现生产者消费者详解

    1.介绍 阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满:从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空.并发包下很多高级同步类的实现都是基于BlockingQueue实现的. JDK7提供了以下7个阻塞队列: ArrayBlockingQueue :由数组结构组成的有界阻塞队列. LinkedBloc

  • Go语言并发编程基础上下文概念详解

    目录 前言 1 Go 中的 Context 2 Context 接口 3 Context Tree 4 创建上下文 4.1 上下文创建函数 4.2 Context 使用规范 4.3 Context 使用场景 5 总结 前言 相信大家以前在做阅读理解的时候,一定有从老师那里学一个技巧或者从参考答案看个:结合上下文.根据上下文我们能够找到有助于解题的相关信息,也能更加了解段落的思想. 在开发过程中,也有这个上下文(Context)的概念,而且上下文也必不可少,缺少上下文,就不能获取完整的程序信息.那

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • Java并发编程总结——慎用CAS详解

    一.CAS和synchronized适用场景 1.对于资源竞争较少的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源:而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能. 2.对于资源竞争严重的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized.以java.util.concurrent.atomic包中AtomicInteger类为例,其getAn

  • Java并发编程预防死锁过程详解

    这篇文章主要介绍了Java并发编程预防死锁过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在java并发编程领域已经有技术大咖总结出了发生死锁的条件,只有四个条件都发生时才会出现死锁: 1.互斥,共享资源X和Y只能被一个线程占用 2.占有且等待,线程T1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X 3.不可抢占,其他线程不能强行抢占线程T1占有的资源 4.循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有

  • Java多线程之并发编程的核心AQS详解

    目录 一.AQS简介 1.1.AOS概念 1.2.AQS的核心思想 1.3.AQS是自旋锁 1.4.AQS支持两种资源分享的方式 二.AQS原理 2.1.同步状态的管理 2.2.等待队列 2.3.CLH队列中的结点 2.4.队列定义 2.5.AQS底层的CAS机制 2.6.通过ReentrantLock理解AQS 三.AQS方法 3.1.用户需要自己重写的方法 3.2.AQS 提供的一系列模板方法 3.3.acquire(int)方法 3.4.release(int)方法 3.5.acquire

  • Java并发编程之阻塞队列详解

    1.什么是阻塞队列? 队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素.阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略.使用阻塞队列,就会对当前线程产生阻塞,当队列是空时,从队列中获取元素的操作将会被阻塞,当队列是满时,往队列里添加元素的操作也会被阻塞. 2.主要的阻塞队列及其方法 java.util.concurrent包下提供主要的几种阻塞队列,主要有以下几个: 1

  • python 并发编程 多路复用IO模型详解

    多路复用IO(IO multiplexing) 这种IO方式为事件驱动IO(event driven IO). 我们都知道,select/epoll的好处就在于单个进程process就可以同时处理多个网络连接的IO.它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程.它的流程如图: select是多路复用的一种 当用户进程调用了select,那么整个进程会被block,而同时,kernel会"监视&qu

  • 高并发系统的限流详解及实现

    在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.本文结合作者的一些经验介绍限流的相关概念.算法和常规的实现方式. 缓存 缓存比较好理解,在大型高并发系统中,如果没有缓存数据库将分分钟被爆,系统也会瞬间瘫痪.使用缓存不单单能够提升系统访问速度.提高并发访问量,也是保护数据库.保护系统的有效方式.大型网站一般主要是"读",缓存的使用很容易被想到.在大型"写"系统中,缓存也常常扮演者非常重要的角色.比如累积一些数据批量写入,内存里面的缓存队列(生产消费),以及

随机推荐