

本文主要给大家介绍了关于golang 如何将多路复异步io转变成阻塞io的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍:

package main

import (

func handleConnection(c net.Conn) {
 buffer := make([]byte, 1024)
 c.Write([]byte("Hello from server"))

func main() {
 l, err := net.Listen("tcp", "host:port")
 if err != nil {
 defer l.Close()
 for {
 c, err := l.Accept()
 if err!= nil {
 go handleConnection(c)


// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn interface {
 Read(b []byte) (n int, err error)
 Write(b []byte) (n int, err error)
 Close() error
 LocalAddr() Addr
 RemoteAddr() Addr
 SetDeadline(t time.Time) error
 SetReadDeadline(t time.Time) error
 SetWriteDeadline(t time.Time) error

type conn struct {
 fd *netFD


// Network file descriptor.
type netFD struct {
 // locking/lifetime of sysfd + serialize access to Read and Write methods
 fdmu fdMutex

 // immutable until Close
 sysfd  int
 family  int
 sotype  int
 isConnected bool
 net   string
 laddr  Addr
 raddr  Addr

 // wait server
 pd pollDesc

func (fd *netFD) accept() (netfd *netFD, err error) {
 for {
 s, rsa, err = accept(fd.sysfd)
 if err != nil {
 nerr, ok := err.(*os.SyscallError)
 if !ok {
 return nil, err
 switch nerr.Err {
 /* 如果错误是EAGAIN说明Socket的缓冲区为空,未读取到任何数据
 case syscall.EAGAIN:
 if err = fd.pd.waitRead(); err == nil {
 case syscall.ECONNABORTED:
 return nil, err
 //代码过长不再列出,感兴趣看go的源码,runtime 下的fd_unix.go
 return netfd, nil



const (
 pdReady uintptr = 1
 pdWait uintptr = 2

// Network poller descriptor.
type pollDesc struct {
 link *pollDesc // in pollcache, protected by pollcache.lock
 lock mutex // protects the following fields
 fd  uintptr
 closing bool
 seq  uintptr // protects from stale timers and ready notifications
 rg  uintptr // pdReady, pdWait, G waiting for read or nil
 rt  timer // read deadline timer (set if rt.f != nil)
 rd  int64 // read deadline
 wg  uintptr // pdReady, pdWait, G waiting for write or nil
 wt  timer // write deadline timer
 wd  int64 // write deadline
 user uint32 // user settable cookie

type pollCache struct {
 lock mutex
 first *pollDesc

pollDesc网络轮询器是Golang中针对每个socket文件描述符建立的轮询机制。 此处的轮询并不是一般意义上的轮询,而是Golang的runtime在调度goroutine或者GC完成之后或者指定时间之内,调用epoll_wait获取所有产生IO事件的socket文件描述符。当然在runtime轮询之前,需要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当IO就绪后,通过epoll返回的文件描述符和其中附带的goroutine的信息,重新恢复当前goroutine的执行。这里我们可以看到pollDesc中有两个变量wg和rg,其实我们可以把它们看作信号量,这两个变量有几种不同的状态:

  • pdReady:io就绪
  • pdWait:当前的goroutine正在准备挂起在信号量上,但是还没有挂起。
  • G pointer:当我们把它改为指向当前goroutine的指针时,当前goroutine挂起


func net_runtime_pollWait(pd *pollDesc, mode int) int {
 err := netpollcheckerr(pd, int32(mode))
 if err != 0 {
 return err
 // As for now only Solaris uses level-triggered IO.
 if GOOS == "solaris" {
 netpollarm(pd, mode)
 for !netpollblock(pd, int32(mode), false) {
 err = netpollcheckerr(pd, int32(mode))
 if err != 0 {
 return err
 // Can happen if timeout has fired and unblocked us,
 // but before we had a chance to run, timeout has been reset.
 // Pretend it has not happened and retry.
 return 0

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 gpp := &pd.rg
 if mode == 'w' {
 gpp = &pd.wg

 for {
 old := *gpp
 if old == pdReady {
 *gpp = 0
 return true
 if old != 0 {
 throw("netpollblock: double wait")
  //设置gpp pdWait
 if atomic.Casuintptr(gpp, 0, pdWait) {

 if waitio || netpollcheckerr(pd, mode) == 0 {
 gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)

 old := atomic.Xchguintptr(gpp, 0)
 if old > pdWait {
 throw("netpollblock: corrupted state")
 return old == pdReady


func netpoll(block bool) *g {
 if epfd == -1 {
 return nil
 waitms := int32(-1)
 if !block {
 waitms = 0
 var events [128]epollevent
 n := epollwait(epfd, &events[0], int32(len(events)), waitms)
 if n < 0 {
 if n != -_EINTR {
 println("runtime: epollwait on fd", epfd, "failed with", -n)
 throw("epollwait failed")
 goto retry
 var gp guintptr
 for i := int32(0); i < n; i++ {
 ev := &events[i]
 if ev.events == 0 {
 var mode int32
 mode += 'r'
 if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
 mode += 'w'
 if mode != 0 {
 pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
 netpollready(&gp, pd, mode)
 if block && gp == 0 {
 goto retry
 return gp.ptr()

这里就是熟悉的代码了,epoll的使用,看起来亲民多了。pd:=*(**pollDesc)(unsafe.Pointer(&ev.data))这是最关键的一句,我们在这里拿到当前可读时间的pollDesc,上面我们已经说了,当pollDesc的读写信号量保存为G pointer时当前goroutine就会挂起。而在这里我们调用了netpollready函数,函数中把相应的读写信号量G指针擦出,置为pdReady,G-pointer状态被抹去,当前goroutine的G指针就放到可运行队列中,这样goroutine就被唤醒了。

可以看到虽然我们在写tcp server看似一个阻塞的网络模型,在其底层实际上是基于异步多路复用的机制来实现的,只是把它封装成了跟阻塞io相似的开发模式,这样是使得我们不用去关注异步io,多路复用等这些复杂的概念以及混乱的回调函数。





