Go并发编程中使用channel的方法

目录
  • 一.设计原理
  • 二.数据结构
  • 三.创建管道
  • 四. 发送数据
    • 4.1 直接发送
    • 4.2 缓冲区
    • 4.3 阻塞发送
    • 4.4 小结
  • 五. 接收数据
    • 5.1 直接接收
    • 5.2 缓冲区
    • 5.3 阻塞接收
  • 六. 关闭channel
  • 七. 使用场景
    • 7.1 使用channel控制子协程
    • 7.2 通过关闭 channel 实现一对多的通知
    • 7.3 使用 channel 做异步编程
    • 7.4 超时控制
    • 7.5 协程池
  • 八. 参考

一.设计原理

Go 语言中最常见的、也是经常被人提及的设计模式就是:

"不要通过共享内存来通信,我们应该使用通信来共享内存"

通过共享内存来通信是直接读取内存的数据,而通过通信来共享内存,是通过发送消息的方式来进行同步。

而通过发送消息来同步的这种方式常见的就是 Go 采用的通信顺序进程 CSP(Communication Sequential Process) 模型以及 Erlang 采用的 Actor 模型,这两种方式都是通过通信来共享内存。

如下图所示

大部分的语言采用的都是第一种方式直接去操作内存,然后通过互斥锁,CAS 等操作来保证并发安全。Go 引入了 Channel 和 Goroutine 实现 CSP 模型来解耦这个操作。

优点:

在 Goroutine 当中我们就不用手动去做资源的锁定与释放,同时将生产者和消费者进行了解耦,Channel 其实和消息队列很相似。

缺点:

由于 Channel 底层也是通过这些低级的同步原语实现的,所以性能上会差一些,如果有极高的性能要求时也可以用 sync 包中提供的低级同步原语

先入先出

目前的 Channel 收发操作均遵循了先进先出的设计,具体规则如下:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

无锁管道

锁(Lock) 是一种常见的并发控制技术,我们一般会将锁分成乐观锁 和 悲观锁,即乐观并发控制和悲观并发控制,无锁(lock-free)队列更准确的描述是使用乐观并发控制的队列。乐观并发控制也叫乐观锁,很多人都会误以为乐观锁是与悲观锁差不多,然而它并不是真正的锁,只是一种并发控制的思想.

乐观并发控制本质上是基于验证的协议,我们使用原子指令 CAS(compare-and-swap 或者 compare-and-set)在多线程中同步数据,无锁队列的实现也依赖这一原子指令。

从某种程度上说,Channel 是一个用于同步和通信的有锁队列,使用互斥锁解决程序中可能存在的线程竞争问题

Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,该方案将 Channel 分成了以下三种类型:

同步 Channel — 无缓冲区,发送方会直接将数据交给(Handoff)接收方

异步channel: 基于环形缓存的传统生产者消费者模型;

chan struct{} 类型的异步 Channel — struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;

二.数据结构

Go 语言的 Channel 在运行时使用 runtime.hchan 结构体表示。我们在 Go 语言中创建新的 Channel 时,实际上创建的都是如下所示的结构:

type hchan struct {
	qcount   uint           // 队列中元素总数量
	dataqsiz uint           // 循环队列的长度
	buf      unsafe.Pointer // 指向长度为 dataqsiz 的底层数组,只有在有缓冲时这个才有意义
	elemsize uint16         // 能够发送和接受的元素大小
	closed   uint32         // 是否关闭
	elemtype *_type         // 元素的类型
	sendx    uint           // 当前已发送的元素在队列当中的索引位置
	recvx    uint           // 当前已接收的元素在队列当中的索引位置
	recvq    waitq          // 接收 Goroutine 链表
	sendq    waitq          // 发送 Goroutine 链表

	lock mutex              // 互斥锁
}

// waitq 是一个双向链表,里面保存了 goroutine
type waitq struct {
	first *sudog
	last  *sudog
}

如下图所示,channel 底层其实是一个循环队列

三.创建管道

Go 语言中所有 Channel 的创建都会使用 make 关键字。创建的表达式使用 make(chan T, cap) 来创建 channel.

如果不向 make 传递表示缓冲区大小的参数,那么就会设置一个默认值 0,也就是当前的 Channel 不存在缓冲区。

四. 发送数据

当想要向 Channel 发送数据时,就需要使用 ch <- i 语句.

在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止多个线程并发修改数据。

如果 Channel 已经关闭,那么向该 Channel 发送数据时会报 “send on closed channel” 错误并中止程序。

4.1 直接发送

如果 Channel 没有被关闭并且已经有处于读等待的 Goroutine,会取出最先陷入等待的 Goroutine 并直接向它发送数据:

直接发送的过程称为两个部分:

  • 调用 runtime.sendDirect将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上;
  • 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;

需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,程序没有立刻执行该 Goroutine。

4.2 缓冲区

如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,会使用 runtime.chanbuf 计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器。

4.3 阻塞发送

当 Channel 没有接收者能够处理数据时,向 Channel 发送数据会被下游阻塞,当然使用 select 关键字可以向 Channel 非阻塞地发送消息。

4.4 小结

可以简单梳理和总结一下使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:

  • 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  • 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  • 如果不满足上面的两种情况,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;

五. 接收数据

可以使用两种不同的方式去接收 Channel 中的数据:

i <- ch

i, ok <- ch

5.1 直接接收

会根据缓冲区的大小分别处理不同的情况

如果 Channel 不存在缓冲区,直接从发送者那里把数据拷贝给接收变量如果是有缓冲 channel将队列中的数据拷贝到接收方的内存地址;将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;

5.2 缓冲区

当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 的索引位置中取出数据进行处理:

5.3 阻塞接收

当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:

六. 关闭channel

使用 close(ch) 来关闭 channel 最后会调用 runtime 中的 closechan 方法.

关闭一个 nil 的 channel 和已关闭了的 channel 都会导致 panic关闭 channel 后会释放所有因为 channel 而阻塞的 Goroutine

七. 使用场景

channel一般用于协程之间的通信,channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下channel也可轻易实现。

7.1 使用channel控制子协程

package main

import (
    "time"
    "fmt"
)

func Process(ch chan int) {
    //Do some work...
    time.Sleep(time.Second)

    ch <- 1 //管道中写入一个元素表示当前协程已结束
}

func main() {
    channels := make([]chan int, 10) //创建一个10个元素的切片,元素类型为channel

    for i:= 0; i < 10; i++ {
        channels[i] = make(chan int) //切片中放入一个channel
        go Process(channels[i])      //启动协程,传一个管道用于通信
    }

    for i, ch := range channels {  //遍历切片,等待子协程结束
        <-ch
        fmt.Println("Routine ", i, " quit!")
    }
}

输出:

Routine  0  quit!

Routine  1  quit!

Routine  2  quit!

Routine  3  quit!

Routine  4  quit!

Routine  5  quit!

Routine  6  quit!

Routine  7  quit!

Routine  8  quit!

Routine  9  quit!

上面程序通过创建N个channel来管理N个协程,每个协程都有一个channel用于跟父协程通信,父协程创建完所有协程后等待所有协程结束。

这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要定期地探测管道中是否有消息出现。

7.2 通过关闭 channel 实现一对多的通知

关闭 channel 时会释放所有阻塞的 Goroutine,所以我们就可以利用这个特性来做一对多的通知,除了一对多之外我们还用了 done 做了多对一的通知,当然多对一这种情况还是建议直接使用 WaitGroup 即可

package main

import (
	"fmt"
	"time"
)

func run(stop <-chan struct{}, done chan<- struct{}) {

	// 每一秒打印一次
	for {
		select {
		case <-stop:
			fmt.Println("stop...")
			// 接收到停止后,向 done 管道中发送数据,然后退出函数
			done <- struct{}{}
			return
		// 超时1秒将输出hello
		case <-time.After(time.Second):
			fmt.Println("hello...")
		}
	}
}

func main() {
	// 一对多,使用无缓冲通道,当关闭chan后,其他程序中接收到关闭信号后会统一执行操作
	stop := make(chan struct{})

	// 多对一,当关闭后,关闭一个chan, 写入一个数据到管道中
	done := make(chan struct{}, 10)

	for i := 0; i < 10; i++ {
		go run(stop, done)
	}

	// 模拟超时时间
	time.Sleep(5 * time.Second)
	close(stop)

	for i := 0; i < 10; i++ {
		<-done
	}
}

输出:

hello...

hello...

hello...

...

hello..

stop...

stop...

stop...

stop...

stop...

stop...

stop...

stop...

stop...

stop...

7.3 使用 channel 做异步编程

利用无缓冲channel,接收早于发送的特点,只有当数据写入后,接收才能完成实现数据一致性

package main

import (
	"fmt"
)

// 这里只能读
func read(c <-chan int) {
	fmt.Println("read:", <-c)
}

// 这里只能写
func write(c chan<- int) {
	c <- 0
}

func main() {
	c := make(chan int)
	go write(c)
	read(c)
}

7.4 超时控制

超时控制还是建议使用 context

func run(stop <-chan struct{}, done chan<- struct{}) {
	// 每一秒打印一次 hello
	for {
		select {
		case <-stop:
			fmt.Println("stop...")
			done <- struct{}{}
			return
		case <-time.After(time.Second):
			fmt.Println("hello")
		}
	}
}

7.5 协程池

根据控制Channel的缓存大小来控制并发执行的Goroutine的最大数目

var limit = make(chan int, 3)

func main() {
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    select{}
}

最后一句select{}是一个空的管道选择语句,该语句会导致main线程阻塞,从而避免程序过早退出。还有for{}<-make(chan int)等诸多方法可以达到类似的效果。因为main线程被阻塞了,如果需要程序正常退出的话可以通过调用os.Exit(0)实现。

八. 参考

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/

https://www.topgoer.cn/docs/gozhuanjia/chapter055.1-channel

https://lailin.xyz/post/go-training-week3-channel.html

https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-05-mem.html

到此这篇关于Go并发编程中使用channel的方法的文章就介绍到这了,更多相关Go channel使用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Django实现WebSocket在线聊天室功能(channels库)

    1.Django实现WebSocket在线聊天室 1.1 安装 pip install channels==2.3 (saas) F:\Desktop\Python_Study\CHS-Tracer\saas>pip install channels==2.3 Looking in indexes: http://mirrors.aliyun.com/pypi/simple/ Collecting channels==2.3   Downloading   ... Successfully in

  • Go并发编程之sync.Once使用实例详解

    目录 一.序 二. 源码分析 2.1结构体 2.2 接口 三. 使用场景案例 3.1 单例模式 3.2 加载配置文件示例 四.总结 五. 参考 一.序 单从库名大概就能猜出其作用.sync.Once使用起来很简单, 下面是一个简单的使用案例 package main import ( "fmt" "sync" ) func main() { var ( once sync.Once wg sync.WaitGroup ) for i := 0; i < 10;

  • Go并发编程实现数据竞争

    目录 1.前言 2.数据竞争 2.1 示例一 2.2 循环中使用goroutine引用临时变量 2.3 引起变量共享 2.4 不受保护的全局变量 2.5 未受保护的成员变量 2.6 接口中存在的数据竞争 3. 总结 4 参考 1.前言 虽然在 go 中,并发编程十分简单, 只需要使用 go func() 就能启动一个 goroutine 去做一些事情,但是正是由于这种简单我们要十分当心,不然很容易出现一些莫名其妙的 bug 或者是你的服务由于不知名的原因就重启了. 而最常见的bug是关于线程安全

  • Go语言并发编程之互斥锁Mutex和读写锁RWMutex

    目录 一.互斥锁Mutex 1.Mutex介绍 2.Mutex使用实例 二.读写锁RWMutex 1.RWMutex介绍 2.RWMutex使用实例 在并发编程中,多个Goroutine访问同一块内存资源时可能会出现竞态条件,我们需要在临界区中使用适当的同步操作来以避免竞态条件.Go 语言中提供了很多同步工具,本文将介绍互斥锁Mutex和读写锁RWMutex的使用方法. 一.互斥锁Mutex 1.Mutex介绍 Go 语言的同步工具主要由 sync 包提供,互斥锁 (Mutex) 与读写锁 (R

  • Go并发控制Channel使用场景分析

    1. 前言 channel一个类型管道,通过它可以在goroutine之间发送和接收消息.它是Golang在语言层面提供的goroutine间的通信方式. Channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication). 它的操作符是箭头 <- . 我们考虑这么一种场景,协程A执行过程中需要创建子协程A1.A2.A3-An,协程A创建完子协程后就等待子协程退出. 针对这种场景,GO提供了三种解决方案: Channel:

  • Golang中channel的原理解读(推荐)

    数据结构 channel的数据结构在$GOROOT/src/runtime/chan.go文件下: type hchan struct { qcount uint // 当前队列中剩余元素个数 dataqsiz uint // 环形队列长度,即可以存放的元素个数 buf unsafe.Pointer // 环形队列指针 elemsize uint16 // 每个元素的大小 closed uint32 // 标记是否关闭 elemtype *_type // 元素类型 sendx uint //

  • Go语言并发编程 互斥锁详情

    目录 1.互斥锁Mutex 1.1 Mutex介绍 1.2 Mutex使用实例 2.读写锁RWMutex 2.1 RWMutex介绍 2.2 RWMutex使用实例 1.互斥锁Mutex 1.1 Mutex介绍 Go 语言的同步工具主要由 sync 包提供,互斥锁 (Mutex) 与读写锁 (RWMutex) 就是sync 包中的方法. 互斥锁可以用来保护一个临界区,保证同一时刻只有一个 goroutine 处于该临界区内.主要包括锁定(Lock方法)和解锁(Unlock方法)两个操作,首先对进

  • 再次探讨go实现无限 buffer 的 channel方法

    前言 总所周知,go 里面只有两种 channel,一种是 unbuffered channel, 其声明方式为 ch := make(chan interface{}) 另一种是 buffered channel,其声明方式为 bufferSize := 5 ch := make(chan interface{},bufferSize) 对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的.这个极限就是该 channel 最初被 make 时,所指定的

  • Go并发编程中使用channel的方法

    目录 一.设计原理 二.数据结构 三.创建管道 四. 发送数据 4.1 直接发送 4.2 缓冲区 4.3 阻塞发送 4.4 小结 五. 接收数据 5.1 直接接收 5.2 缓冲区 5.3 阻塞接收 六. 关闭channel 七. 使用场景 7.1 使用channel控制子协程 7.2 通过关闭 channel 实现一对多的通知 7.3 使用 channel 做异步编程 7.4 超时控制 7.5 协程池 八. 参考 一.设计原理 Go 语言中最常见的.也是经常被人提及的设计模式就是: "不要通过共

  • Java 并发编程中如何创建线程

    简介 线程是基本的调度单位,它被包含在进程之中,是进程中的实际运作单位,它本身是不会独立存在.一个进程至少有一个线程,进程中的多个线程共享进程的资源. Java中创建线程的方式有多种如继承Thread类.实现Runnable接口.实现Callable接口以及使用线程池的方式,线程池将在后面文章中单独介绍,这里先介绍另外三种方式. 继承Thread类 优点:在run方法里可以用this获取到当前线程. 缺点:由于Java不支持多继承,所以如果继承了Thread类后就不能再继承其他类. public

  • 浅析Java 并发编程中的synchronized

    synchronized关键字,我们一般称之为"同步锁",用它来修饰需要同步的方法和需要同步代码块,默认是当前对象作为锁的对象.在用synchronized修饰类时(或者修饰静态方法),默认是当前类的Class对象作为锁的对象,故存在着方法锁.对象锁.类锁这样的概念. 一.没有设置线程同步的情况 先给出以下代码感受下代码执行的时候为什么需要同步?代码可能比较枯燥,配上业务理解起来就会舒服很多,学生军训,有三列,每列5人,需要报数,每个线程负责每一列报数. class Synchroni

  • Go并发编程中sync/errGroup的使用

    目录 一.序 二.errGroup 2.1 函数签名 三.源码 3.1 Group 3.2 WaitContext 3.3 Go 3.4 Wait 四. 案例 五. 参考 一.序 这一篇算是并发编程的一个补充,起因是当前有个项目,大概の 需求是,根据kafka的分区(partition)数,创建同等数量的 消费者( goroutine)从不同的分区中消费者消费数据,但是总有某种原因导致,某一个分区消费者创建失败,但是其他分区消费者创建失败. 最初的逻辑是,忽略分区失败的逻辑,将成功创建的分区消费

  • java并发编程中ReentrantLock可重入读写锁

    目录 一.ReentrantLock可重入锁 二.ReentrantReadWriteLock读写锁 三.读锁之间不互斥 一.ReentrantLock可重入锁 可重入锁ReentrantLock 是一个互斥锁,即同一时间只有一个线程能够获取锁定资源,执行锁定范围内的代码.这一点与synchronized 关键字十分相似.其基本用法代码如下: Lock lock = new ReentrantLock(); //实例化锁 //lock.lock(); //上锁 boolean locked =

  • java编程中实现调用js方法分析

    本文实例讲述了java编程中实现调用js方法.分享给大家供大家参考,具体如下: /* * 加载脚本引擎,并在java中调用js方法 */ public void test2() { ScriptEngineManager manager = new ScriptEngineManager(); ScriptEngine engine = manager.getEngineByName("javascript"); try { String str="2&1"

  • Android编程中图片特效处理方法小结

    本文实例总结了Android编程中图片特效处理方法.分享给大家供大家参考,具体如下: 这里介绍的Android图片处理方法包括: 转换 -  drawable To  bitmap 缩放 -  Zoom 圆角 -  Round Corner 倒影 -  Reflected bitmapPrcess  code: package com.learn.games; import android.graphics.Bitmap; import android.graphics.Canvas; impo

  • Android编程中黑名单的实现方法

    本文实例讲述了Android编程中黑名单的实现方法.分享给大家供大家参考,具体如下: 说明:由于挂断电话android   api不是对外开放的,所以需要使用反射的方法得到拨打电话的服务. 1.将android源代码中的"aidl"文件拷贝到项目中 这样项目中会生成两个包:android.telephony:此包中文件为:NeighboringCellInfo.aidl com.android.internal.telephony;此包中文件为:ITelephony.aidl 2.通过

  • Android编程中光线传感器的调用方法详解

    本文实例讲述了Android编程中光线传感器的调用方法.分享给大家供大家参考,具体如下: 1.activity如果要使用传感器,就必须实现SensorEventListener接口 2.得到传感器管理对象(sensormanager) 3.使用sensormanager.registerlistener 方法注册指定的传感器 4.在sensoreventlistener 接口中的onsensorchanged和onaccuracychanged方法中完成其他具体工作 public class T

  • 深入分析java并发编程中volatile的实现原理

    引言 在多线程并发编程中synchronized和Volatile都扮演着重要的角色,Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的"可见性".可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值.它在某些情况下比synchronized的开销更小,本文将深入分析在硬件层面上Inter处理器是如何实现Volatile的,通过深入分析能帮助我们正确的使用Volatile变量. 术语定义 术语 英文单词 描述 共享变量 在多个线

随机推荐