Golang通道channel的源码分析

目录
  • 前言
  • channel基础结构
  • channel初始化
  • channel发送
  • channel接收
  • 小结

前言

channel是golang中标志性的概念之一,很好很强大!

channel(通道),顾名思义,是一种通道,一种用于并发环境中数据传递的通道。通常结合golang中另一重要概念goroutine(go协程)使用,使得在golang中的并发编程变得清晰简洁同时又高效强大。

今天尝试着读读golang对channel的实现源码,本文主要是自己个人对于Channel源码的学习笔记,需要的朋友可以参考以下内容,希望对大家有帮助。

channel基础结构

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx:    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

hchan结构就是channel的底层数据结构,看源码定义,可以说是非常清晰了。

  • qcount:channel缓存队列中已有的元素数量
  • dataqsiz:channel的缓存队列大小(定义channel时指定的缓存大小,这里channel用的是一个环形队列)
  • buf:指向channel缓存队列的指针
  • elemsize:通过channel传递的元素大小
  • closed:channel是否关闭的标志
  • elemtype:通过channel传递的元素类型
  • sendx:channel中发送元素在队列中的索引
  • recvx:channel中接受元素在队列中的索引
  • recvq:等待从channel中接收元素的协程列表
  • sendq:等待向channel中发送元素的协程列表
  • lock:channel上的锁

其中关于recvq和sendq的两个列表所用的结构waitq简单看下。

type waitq struct {
    first *sudog
    last  *sudog
}
type sudog struct {
    g          *g
    selectdone *uint32 // CAS to 1 to win select race (may point to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)
...
    c           *hchan // channel
}

可以看出waiq是一个双向链表结构,链上的节点是sudog。从sudog的结构定义可以粗略看出,sudog是对g(即协程)的一个封装。用于记录一个等待在某个channel上的协程g、等待的元素elem等信息。

channel初始化

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem
    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // Allocate memory in one call.
        // Hchan does not contain pointers interesting for GC in this case:
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            c.buf = unsafe.Pointer(c)
        }
    } else {
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

第一部分的3个if是对初始化参数的合法性检查。

if elem.size >= 1<<16:

检查channel元素大小,小于2字节

if hchanSize%maxAlign != 0 || elem.align > maxAlign

没看懂(对齐?)

if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size)

第一个判断缓存大小需要大于等于0

int64(uintptr(size)) != size这一句实际是用于判断size是否为负数。由于uintptr实际是一个无符号整形,负数经过转换后会变成一个与原数完全不同的很大的正整数,而正数经过转换后并没有变化。

最后一句判断channel的缓存大小要小于heap中能分配的大小。_MaxMem是可分配的堆大小。

第二部分是具体的内存分配。

元素类型为kindNoPointers的时候,既非指针类型,则直接分配(hchanSize+uintptr(size)*elem.size)大小的连续空间。c.buf指向hchan后面的elem队列首地址。

如果channel缓存大小为0,则c.buf实际上是没有给他分配空间的

如果类型为非kindNoPointers,则channel的空间和buf的空间是分别分配的。

channel发送

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}

channel发送,即协程向channel中发送数据,与此操作对应的go代码如c <- x。

channel发送的实现源码中,通过chansend1(),调用chansend(),其中block参数为true。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }
...
}

chansend()首先对c进行判断, if c == nil:即channel没有被初始化,这个时候会直接调用gopark使得当前协程进入等待状态。而且用于唤醒的参数unlockf传的nil,即没有人来唤醒它,这样系统进入死锁。所以channel必须被初始化之后才能使用,否则死锁。

接下来是正式的发送处理,且后续操作会加锁。

lock(&c.lock)

close判断

if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

如果channel已经是closed状态,解锁然后直接panic。也就是说我们不可以向已经关闭的通道内在发送数据。

将数据发给接收协程

if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

尝试从接收等待协程队列中取出一个协程,如果有则直接数据发给它。也就是说发送到channel的数据会优先检查接收等待队列,如果有协程等待取数,就直接给它。发完解锁,操作完成。

这里send()方法会将数据写到从队列里取出来的sg中,通过goready()唤醒sg.g(即等待的协程),进行后续处理。

数据放到缓存

if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

如果没有接收协程在等待,则去检查channel的缓存队列是否还有空位。如果有空位,则将数据放到缓存队列中。

通过c.sendx游标找到队列中的空余位置,然后将数据存进去。移动游标,更新数据,然后解锁,操作完成。

if c.sendx == c.dataqsiz {
        c.sendx = 0
    }

通过这一段游标的处理可以看出,缓存队列是一个环形。

阻塞发送协程

gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

如果缓存也慢了,这时候就只能阻塞住发送协程了, 等有合适的机会了,再将数据发送出去。

getg()获取当前协程对象g的指针,acquireSudog()生成一个sudog,然后将当前协程及相关数据封装好链接到sendq列表中。然年通过goparkunlock()将其转为等待状态,并解锁。操作完成。

channel接收

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

channel接收,即协程从channel中接收数据,与此操作对应的go代码如<- c。

channel接收的实现源码中,通过chanrecv1(),调用chanrecv(),其中block参数为true。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }
...
}

同发送一样,接收也会首先检查c是否为nil,如果为nil,会调用gopark()休眠当前协程,从而最终造成死锁。

接收操作同样先进行加锁,然后开始正式操作。

close处理

if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

接收和发送略有不同,当channel关闭并且channel的缓存队列里没有数据了,那么接收动作会直接结束,但不会报错。

也就是说,允许从已关闭的channel中接收数据。

从发送等待协程中接收

if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

尝试从发送等待协程列表中取出一个等待协程,如果存在,则调用recv()方法接收数据。

这里的recv()方法比send()方法稍微复杂一点,我们简单分析下。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        ...
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        qp := chanbuf(c, c.recvx)
        ...
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

recv()的接收动作分为两种情况:

  • c.dataqsiz == 0:即当channel为无缓存channel时,直接将发送协程中的数据,拷贝给接收者。
  • c.dataqsiz != 0:如果channel有缓存,则:根据缓存的接收游标,从缓存队列中取出一个,拷贝给接受者

小结

channel必须初始化后才能使用;

channel关闭后,不允许在发送数据,但是还可以继续从中接收未处理完的数据。所以尽量从发送端关闭channel;

无缓存的channel需要注意在一个协程中的操作不会造成死锁;

到此这篇关于Golang通道channel的源码分析的文章就介绍到这了,更多相关Golang通道channel内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 深入理解Golang Channel 的底层结构

    目录 make chan 发送和接收 Goroutine Pause/Resume wait empty channel Golang 使用 Groutine 和 channels 实现了 CSP(Communicating Sequential Processes) 模型,channles在 goroutine 的通信和同步中承担着重要的角色. 在GopherCon 2017 中,Golang 专家 Kavya 深入介绍了 Go Channels 的内部机制,以及运行时调度器和内存管理系统是如

  • Go语言入门学习之Channel通道详解

    目录 前言 通道的声明 通道的初始化 发送和接收数据 通道的关闭 通道的容量与长度 缓冲通道与无缓冲通道 双向通道和单向通道 遍历通道 fibonacci 数列 参考文章: 总结 前言 不同于传统的多线程并发模型使用共享内存来实现线程间通信的方式,go 是通过 channel 进行协程 (goroutine) 之间的通信来实现数据共享. channel,就是一个管道,可以想像成 Go 协程之间通信的管道.它是一种队列式的数据结构,遵循先入先出的规则. 通道的声明 每个通道都只能传递一种数据类型的

  • Go语言中的通道channel详情

    目录 一.Go语言通道基础概念 1.channel产生背景 2.channel工作方式 二.通道使用语法 1.通道的声明与初始化 2.将数据放入通道内 3.从通道内取出数据 4.关闭通道close 三.单项通道及通道的状态分析 1.单项输出通道 2.单项输入通道 3.通道的状态 四.通道死锁原因分析 一.Go语言通道基础概念 1.channel产生背景 线程之间进行通信的时候,会因为资源的争夺而产生竟态问题,为了保证数据交换的正确性,必须使用互斥量给内存进行加锁,go语言并发的模型是CSP,提倡

  • GoLang channel使用介绍

    目录 停止信号 任务定时 解耦生产方和消费方 控制并发数 停止信号 channel 用于停止信号的场景还是挺多的,经常是关闭某个 channel 或者向 channel 发送一个元素,使得接收 channel 的那一方获知道此信息,进而做一些其他的操作. 任务定时 与 timer 结合,一般有两种玩法:实现超时控制,实现定期执行某个任务. 有时候,需要执行某项操作,但又不想它耗费太长时间,上一个定时器就可以搞定: select { case <-time.After(100 * time.Mil

  • 浅析Go语言中Channel的各种用法

    目录 Go语言基础四 if定义 单层if语法格式 语法警告 多层if语法格式 Switch定义 Type Switch Select定义 Select语句注意事项 Select用法补充 退出 判断Channel状态 Go语言基础四 今天我们要来学习if语句,也就是大家口中的判断语句,我们首先来看一下if语句的定义 if定义 条件语句需要开发者通过指定一个或多个条件,并通过测试条件是否为 true 来决定是否执行指定语句,并在条件为 false 的情况在执行另外的语句.相信读者看到这儿,也是云里雾

  • Golang中channel的原理解读(推荐)

    数据结构 channel的数据结构在$GOROOT/src/runtime/chan.go文件下: type hchan struct { qcount uint // 当前队列中剩余元素个数 dataqsiz uint // 环形队列长度,即可以存放的元素个数 buf unsafe.Pointer // 环形队列指针 elemsize uint16 // 每个元素的大小 closed uint32 // 标记是否关闭 elemtype *_type // 元素类型 sendx uint //

  • Golang通道channel的源码分析

    目录 前言 channel基础结构 channel初始化 channel发送 channel接收 小结 前言 channel是golang中标志性的概念之一,很好很强大! channel(通道),顾名思义,是一种通道,一种用于并发环境中数据传递的通道.通常结合golang中另一重要概念goroutine(go协程)使用,使得在golang中的并发编程变得清晰简洁同时又高效强大. 今天尝试着读读golang对channel的实现源码,本文主要是自己个人对于Channel源码的学习笔记,需要的朋友可

  • Golang Mutex互斥锁源码分析

    目录 前言 Mutex 特性 数据结构 Lock() Unlock() 前言 在上一篇文章中,我们一起学习了如何使用 Go 中的互斥锁 Mutex,那么本篇文章,我们就一起来探究下 Mutex 底层是如何实现的,知其然,更要知其所以然! 说明:本文中的示例,均是基于Go1.17 64位机器 Mutex 特性 Mutex 就是一把互斥锁,可以想象成一个令牌,有且只有这一个令牌,只有持有令牌的 goroutine 才能进入房间(临界区),在房间内执行完任务后,走出房间并把令牌交出来,如果还有其余的 

  • Netty启动流程服务端channel初始化源码分析

    目录 服务端channel初始化 回顾上一小节initAndRegister()方法 init(Channel)方法 前文传送门 Netty分布式server启动流程 服务端channel初始化 回顾上一小节initAndRegister()方法 final ChannelFuture initAndRegister() { Channel channel = null; try { //创建channel channel = channelFactory.newChannel(); //初始化

  • Golang源码分析之golang/sync之singleflight

    目录 1.背景 1.1. 项目介绍 1.2.使用方法 2.源码分析 2.1.项目结构 2.2.数据结构 2.3.API代码流程 3.总结 1.背景 1.1. 项目介绍 golang/sync库拓展了官方自带的sync库,提供了errgroup.semaphore.singleflight及syncmap四个包,本次分析singlefliht的源代码.singlefliht用于解决单机协程并发调用下的重复调用问题,常与缓存一起使用,避免缓存击穿. 1.2.使用方法 go get -u golang

  • 详解Golang中select的使用与源码分析

    目录 背景 select 流程 背景 golang 中主推 channel 通信.单个 channel 的通信可以通过一个goroutine往 channel 发数据,另外一个从channel取数据进行.这是阻塞的,因为要想顺利执行完这个步骤,需要 channel 准备好才行,准备好的条件如下: 1.发送 缓存有空间(如果是有缓存的 channel) 有等待接收的 goroutine 2.接收 缓存有数据(如果是有缓存的 channel) 有等待发送的 goroutine 对channel实际使

  • Netty分布式server启动流程Nio创建源码分析

    目录 NioServerSocketChannel创建 继承关系 绑定端口 端口封装成socket地址对象 跟进initAndRegister()方法 创建channel 父类的构造方法 将jdk的channel设置为非阻塞模式 前文传送门 Netty分布式Server启动流程服务端初始化源码分析 NioServerSocketChannel创建 我们如果熟悉Nio, 则对channel的概念则不会陌生, channel在相当于一个通道, 用于数据的传输 Netty将jdk的channel进行了

  • Java BufferedWriter BufferedReader 源码分析

    一:BufferedWriter 1.类功能简介: BufferedWriter.缓存字符输出流.他的功能是为传入的底层字符输出流提供缓存功能.同样当使用底层字符输出流向目的地中写入字符或者字符数组时.每写入一次就要打开一次到目的地的连接.这样频繁的访问不断效率底下.也有可能会对存储介质造成一定的破坏.比如当我们向磁盘中不断的写入字节时.夸张一点.将一个非常大单位是G的字节数据写入到磁盘的指定文件中的.没写入一个字节就要打开一次到这个磁盘的通道.这个结果无疑是恐怖的.而当我们使用Buffered

  • Python日志打印里logging.getLogger源码分析详解

    实践环境 WIN 10 Python 3.6.5 函数说明 logging.getLogger(name=None) getLogger函数位于logging/__init__.py脚本 源码分析 _loggerClass = Logger # ...略 root = RootLogger(WARNING) Logger.root = root Logger.manager = Manager(Logger.root) # ...略 def getLogger(name=None): "&quo

  • Go语言context test源码分析详情

    目录 1.测试例子分析 2.单元测试 1.测试例子分析 example_test.go,展示了With-系列的4个例子 func ExampleWithCancel() {   gen := func(ctx context.Context) <-chan int {     dst := make(chan int)     n := 1     go func() {       for {         select {         case <-ctx.Done():      

  • 分布式Netty源码分析EventLoopGroup及介绍

    目录 EventLoopGroup介绍 功能1:先来看看注册Channel 功能2:执行一些Runnable任务 EventLoop介绍 NioEventLoop介绍 EpollEventLoop介绍 后续 EventLoopGroup介绍 在前面一篇文章中提到了,EventLoopGroup主要负责2个事情,这里再重复下: 它主要包含2个方面的功能,注册Channel和执行一些Runnable任务. 功能1:先来看看注册Channel 即将Channel注册到Selector上,由Select

随机推荐