深入理解Golang channel的应用

目录
  • 前言
  • 整体结构
  • 创建
  • 发送
  • 接收
  • 关闭

前言

channel是用于 goroutine 之间的同步、通信的数据结构

channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率

channel的用途包括但不限于以下几点:

  • 协程间通信,同步
  • 定时任务:和timer结合
  • 解耦生产方和消费方,实现阻塞队列
  • 控制并发数

本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑

整体结构

Go channel的数据结构如下所示:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock mutex
}

qcount:已经存储了多少个元素

dataqsie:最多存储多少个元素,即缓冲区容量

buf:指向缓冲区的位置,实际上是一个数组

elemsize:每个元素占多大空间

closed:channel能够关闭,这里记录其关闭状态

elemtype:保存数据的类型信息,用于go运行时使用

sendx,recvx:

  • 记录下一个要发送到的位置,下一次从哪里还是接收
  • 这里用数组模拟队列,这两个变量即表示队列的队头,队尾
  • 因此channel的缓冲也被称为环形缓冲区

recvq,sendq:

当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送

lock:channel支持协程间并发访问,因此需要一把锁来保护

创建

创建channel会被编译器编译为调用makechan函数

// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值

可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // mem:缓冲区大小
   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError( "makechan: size out of range" ))
   }

   var c *hchan
   switch {
   // 缓冲区大小为空,只申请hchanSize大小的内存
   case mem == 0:
       c = (*hchan)(mallocgc(hchanSize, nil, true))
       c.buf = c.raceaddr()
   // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存
   case elem.ptrdata == 0:
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       c.buf = add(unsafe.Pointer(c), hchanSize)
   // 否则就是带缓存,且有指针,分配两次内存
   default:
      // Elements contain pointers.
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
   }

   // 保存元素类型,元素大小,容量
   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   return c
}

发送

执行以下代码时:

ch <- 3

编译器会转化为对chansend的调用

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 如果channel是空
   if c == nil {
      // 非阻塞,直接返回
      if !block {
         return  false
      }
      // 否则阻塞当前协程
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,没有关闭,且容量满了,无法发送,直接返回
   if !block && c.closed == 0 && full(c) {
      return  false
   }

   // 加锁
   lock(&c.lock)

   // 如果已经关闭,无法发送,直接panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "send on closed channel" ))
   }

   // 从接收队列弹出一个协程的包装结构sudog
   if sg := c.recvq.dequeue(); sg != nil {
      // 如果能弹出,即有等到接收的协程,说明:
      // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
      // 将要发送的数据拷贝到该协程的接收指针上
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true
}

   // 缓冲区还有空间
   if c.qcount < c.dataqsiz {
      // qp:计算要发送到的位置的地址
      qp := chanbuf(c, c.sendx)
      // 将数据从ep拷贝到qp
      typedmemmove(c.elemtype, qp, ep)
      // 待发送位置移动
      c.sendx++
      // 由于是数组模拟队列,sendx到顶了需要归零
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // 缓冲区数量++
      c.qcount++
      unlock(&c.lock)
      return  true
}

   // 往下就是缓冲区无数据,也没有等到接收协程的情况了

   // 如果是非阻塞模式,直接返回
   if !block {
      unlock(&c.lock)
      return  false
    }

   // 将当前协程包装成sudog,阻塞到channel上
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }

   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil

   // 当前协程进入发送等待队列
   c.sendq.enqueue(mysg)
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

 // 被唤醒后从这里开始执行

   KeepAlive(ep)

   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   // 被唤醒后发现channel关闭了,panic
   if closed {
      if c.closed == 0 {
         throw( "chansend: spurious wakeup" )
      }
      panic(plainError( "send on closed channel" ))
   }
   return  true
}

整体流程为:

如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回

从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:

  • 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
  • 将要发送的数据拷贝到该协程的接收指针上,返回
  • 这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝

否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回

接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上

将协程阻塞到channel的等待队列时,将其包装成了sudog结构:

type sudog struct {
   // 协程
   g *g
   // 前一个,后一个指针
   next *sudog
   prev *sudog
   // 等到发送的数据在哪,等待从哪个位置接收数据
   elem unsafe.Pointer
   acquiretime int64
   releasetime int64
   ticket      uint32
   isSelect bool
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   // 在哪个channel上等待
   c        *hchan // channel
}

其目的是:

  • g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列
  • elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝

来看看一些子函数:

1.判断channel是否是满的

func full(c *hchan) bool {
   // 无缓冲
   if c.dataqsiz == 0 {
      // 并且没有其他协程在等待
      return c.recvq.first == nil
   }
   // 有缓冲,但容量装满了
   return c.qcount == c.dataqsiz
}

2.send方法:

/**
c:要操作的channel
sg:弹出的接收者协程
ep:要发送的数据在的位置
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 唤醒该接收者协程
   goready(gp, skip+1)
}

接收

从channel中接收数据有几种写法:

  • 带不带ok
  • 接不接收返回值

根据带不带ok,决定用下面哪个方法

func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
}

根据接不接收返回值,决定elem是不是nil

最终都会调用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞
   if c == nil {
      if !block {
         return
   }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,并且channel为空
   if !block && empty(c) {
      // 如果还没关闭,直接返回
   if atomic.Load(&c.closed) == 0 {
      return
   }
      // 否则已经关闭,
      // 如果为空,返回该类型的零值
   if empty(c) {
     if ep != nil {
        typedmemclr(c.elemtype, ep)
     }
     return  true, false
       }
   }

   lock(&c.lock)

   // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return  true, false
}

   // 如果有发送者正在阻塞,说明:
   // 1.无缓冲
   // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
   if sg := c.sendq.dequeue(); sg != nil {
      // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true, true
}

   // 如果缓存区有数据,
   if c.qcount > 0 {
      // qp为缓冲区中下一次接收的位置
      qp := chanbuf(c, c.recvx)
      // 将数据从qp拷贝到ep
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return  true, true
}

   // 接下来就是既没有发送者在等待,也缓冲区也没数据
   if !block {
      unlock(&c.lock)
      return  false, false
}

   // 将当前协程包装成sudog,阻塞到channel中
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // 记录接收地址
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)

   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)

   // 从这里唤醒
   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   success := mysg.success
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return  true, success
}

接收流程如为:

如果channel为nil,根据参数中是否阻塞来决定是否阻塞

如果channel已经关闭,且缓冲区没有元素,返回该类型零值

如果有发送者正在阻塞,说明:

  • 要么是无缓冲
  • 有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
  • 将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者

如果缓存区有数据, 则从缓冲区将数据复制到ep,返回

接下来就是既没有发送者在等待,也缓冲区也没数据的情况:

将当前协程包装成sudog,阻塞到channel中

来看其中的子函数recv():

/**
c:操作的channel
sg:阻塞的发送协程
ep:接收者接收数据的地址
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep
   if c.dataqsiz == 0 {
      if ep != nil {
         recvDirect(c.elemtype, sg, ep)
      }
   // 接下来是有缓冲,且缓冲区满的情况
   } else {
      // qp为channel缓冲区中,接收者下一次接收的地址
   qp := chanbuf(c, c.recvx)
      // 将数据从qp拷贝到ep
   if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
    }
    // 将发送者的数据从sg.elem拷贝到qp
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
       c.recvx = 0
    }
    // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx
    c.sendx = c.recvx
}
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 唤醒发送者
   goready(gp, skip+1)
}

关闭

func closechan(c *hchan) {
   // 不能关闭空channel
   if c == nil {
      panic(plainError( "close of nil channel" ))
   }

   lock(&c.lock)
   // 不能重复关闭
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "close of closed channel" ))
   }

   // 修改关闭状态
   c.closed = 1

   var glist gList

   // 释放所有的接收者协程,并为它们赋予零值
 for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }

   // 释放所有的发送者协程
 for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
     }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }
   unlock(&c.lock)

   // 执行唤醒操作
 for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

关闭的流程比较简单,可以看出:

不能关闭空channel,不能重复关闭channel

先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:

接收者:会收到该类型的零值

这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑

发送者:会被唤醒,然后panic

因此不能在有多个sender的时候贸然关闭channel

以上就是深入理解Golang channel的应用的详细内容,更多关于Golang channel的资料请关注我们其它相关文章!

(0)

相关推荐

  • Go语言中的通道channel详情

    目录 一.Go语言通道基础概念 1.channel产生背景 2.channel工作方式 二.通道使用语法 1.通道的声明与初始化 2.将数据放入通道内 3.从通道内取出数据 4.关闭通道close 三.单项通道及通道的状态分析 1.单项输出通道 2.单项输入通道 3.通道的状态 四.通道死锁原因分析 一.Go语言通道基础概念 1.channel产生背景 线程之间进行通信的时候,会因为资源的争夺而产生竟态问题,为了保证数据交换的正确性,必须使用互斥量给内存进行加锁,go语言并发的模型是CSP,提倡

  • GoLang channel关闭状态相关操作详解

    关于 channel 的使用,有几点不方便的地方: 1.在不改变 channel 自身状态的情况下,无法获知一个 channel 是否关闭. 2.关闭一个 closed channel 会导致 panic.所以,如果关闭 channel 的一方在不知道 channel 是否处于关闭状态时就去贸然关闭 channel 是很危险的事情. 3.向一个 closed channel 发送数据会导致 panic.所以,如果向 channel 发送数据的一方不知道 channel 是否处于关闭状态时就去贸然

  • GoLang channel使用介绍

    目录 停止信号 任务定时 解耦生产方和消费方 控制并发数 停止信号 channel 用于停止信号的场景还是挺多的,经常是关闭某个 channel 或者向 channel 发送一个元素,使得接收 channel 的那一方获知道此信息,进而做一些其他的操作. 任务定时 与 timer 结合,一般有两种玩法:实现超时控制,实现定期执行某个任务. 有时候,需要执行某项操作,但又不想它耗费太长时间,上一个定时器就可以搞定: select { case <-time.After(100 * time.Mil

  • 深入理解Golang Channel 的底层结构

    目录 make chan 发送和接收 Goroutine Pause/Resume wait empty channel Golang 使用 Groutine 和 channels 实现了 CSP(Communicating Sequential Processes) 模型,channles在 goroutine 的通信和同步中承担着重要的角色. 在GopherCon 2017 中,Golang 专家 Kavya 深入介绍了 Go Channels 的内部机制,以及运行时调度器和内存管理系统是如

  • Go底层channel实现原理及示例详解

    目录 概念: 使用场景: 底层数据结构: 操作: 创建 发送 接收 关闭 案例分析: 概念: Go中的channel 是一个队列,遵循先进先出的原则,负责协程之间的通信(Go 语言提倡不要通过共享内存来通信,而要通过通信来实现内存共享,CSP(Communicating Sequential Process)并发模型,就是通过 goroutine 和 channel 来实现的) 使用场景: 停止信号监听 定时任务 生产方和消费方解耦 控制并发数 底层数据结构: 通过var声明或者make函数创建

  • Go语言入门学习之Channel通道详解

    目录 前言 通道的声明 通道的初始化 发送和接收数据 通道的关闭 通道的容量与长度 缓冲通道与无缓冲通道 双向通道和单向通道 遍历通道 fibonacci 数列 参考文章: 总结 前言 不同于传统的多线程并发模型使用共享内存来实现线程间通信的方式,go 是通过 channel 进行协程 (goroutine) 之间的通信来实现数据共享. channel,就是一个管道,可以想像成 Go 协程之间通信的管道.它是一种队列式的数据结构,遵循先入先出的规则. 通道的声明 每个通道都只能传递一种数据类型的

  • 深入理解Golang Channel 的底层结构

    目录 makechan 发送和接收 GoroutinePause/Resume waitemptychannel Golang 使用 Groutine 和 channels 实现了 CSP(Communicating Sequential Processes) 模型,channles在 goroutine 的通信和同步中承担着重要的角色. 在GopherCon 2017 中,Golang 专家 Kavya 深入介绍了 Go Channels 的内部机制,以及运行时调度器和内存管理系统是如何支持

  • 深入理解Golang channel的应用

    目录 前言 整体结构 创建 发送 接收 关闭 前言 channel是用于 goroutine 之间的同步.通信的数据结构 channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率 channel的用途包括但不限于以下几点: 协程间通信,同步 定时任务:和timer结合 解耦生产方和消费方,实现阻塞队列 控制并发数 本文将介绍channel的底层原理,包括数据结构,c

  • 深入理解golang chan的使用

    目录 前言 见真身 结构体 发送数据 接收数据 上手 定义 发送与接收 前言 之前在看golang多线程通信的时候, 看到了go 的管道. 当时就觉得这玩意很神奇, 因为之前接触过的不管是php, java, Python, js, c等等, 都没有这玩意, 第一次见面, 难免勾起我的好奇心. 所以就想着看一看它具体是什么东西. 很明显, 管道是go实现在语言层面的功能, 所以我以为需要去翻他的源码了. 虽然最终没有翻到C的层次, 不过还是受益匪浅. 见真身 结构体 要想知道他是什么东西, 没什

  • Golang channel为什么不会阻塞的原因详解

    正文 最近在学通道channel,发现一个简单的demo: package main import "fmt" func main() { chanInt := make(chan int) go func() { chanInt <- 100 }() res := <-chanInt fmt.Println(res) } 输出结果是100,这个没有问题.但是之前在学goroutine的时候有看到过一个例子: package main import "fmt&qu

  • 深入理解Golang make和new的区别及实现原理

    目录 前言 new的使用 底层实现 make的使用 底层实现 总结 前言 在Go语言中,有两个比较雷同的内置函数,分别是new和make方法,二者都可以用来分配内存,那他们有什么区别呢?对于初学者可能会觉得有点迷惑,尤其是在掌握不牢固的时候经常遇到panic,下面我们就从底层来分析一下二者的不同.感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助. new的使用 new可以对类型进行内存创建和初始化,其返回值是所创建类型的指针引用,这是与make函数的区别之一.我们通过一个示例代码看下: fun

  • 深入理解Golang的反射reflect示例

    目录 编程语言中反射的概念 interface 和 反射 Golang的反射reflect reflect的基本功能TypeOf和ValueOf 说明 从relfect.Value中获取接口interface的信息 已知原有类型[进行“强制转换”] 说明 未知原有类型[遍历探测其Filed] 说明 通过reflect.Value设置实际变量的值 说明 通过reflect.ValueOf来进行方法的调用 说明 Golang的反射reflect性能 小结 总结 参考链接 编程语言中反射的概念 在计算

  • golang channel管道使用示例解析

    目录 定义channel管道 channel管道塞值和取值 通过channel管道实现同步,和数据交互 无缓冲的channel 有缓冲的channel管道 关闭channel管道 单向channel管道,读写分离 管道消费者生产者模型 定义channel管道 定义一个channel时,也需要定义发送到管道的值类型.channel可以使用内置的make()函数来创建: var ch = make(chan int) //等价于:make(chan Type,0) var ch = make(cha

随机推荐