再次探讨go实现无限 buffer 的 channel方法

前言

总所周知,go 里面只有两种 channel,一种是 unbuffered channel, 其声明方式为

ch := make(chan interface{})

另一种是 buffered channel,其声明方式为

bufferSize := 5
ch := make(chan interface{},bufferSize)

对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的。这个极限就是该 channel 最初被 make 时,所指定的 bufferSize 。

jojo,buffer channel 的大小是有极限的,我不做 channel 了。

一旦 channel 满了的话,再往里面添加元素的话,将会阻塞。

so how can we make a infinite buffer channel?

本文参考了 medinum 上面的一篇文章,有兴趣的同学可以直接阅读原文。

实现

接口的设计

首先当然是建一个 struct,在百度翻译的帮助下,我们将这个 struct 取名为 InfiniteChannel

type InfiniteChannel struct {
}

思考一下 channel 的核心行为,实际上就两个,一个流入(Fan in),一个流出(Fan out),因此我们添加如下几个 method。

func (c *InfiniteChannel) In(val interface{}) {
	// todo
}

func (c *InfiniteChannel) Out() interface{} {
	// todo
}

内部实现

通过 In() 接收的数据,总得需要一个地方来存放。我们可以用一个 slice 来存放,就算用 In() 往里面添加了很多元素,也可以通过 append() 来拓展 sliceslice 的容量可以无限拓展下去(内存足够的话),所以 channel 也是 infiniteInfiniteChannel 的第一个成员就这么敲定下来的。

type InfiniteChannel struct {
	data    []interface{}
}

用户调用 In()Out() 时,可能是并发的环境,在 go 中如何进行并发编程,最容易想到的肯定是 channel 了,因此我们在内部准备两个 channel,一个 inChan,一个 outChan,用 inChan 来接收数据,用 outChan 来流出数据。

type InfiniteChannel struct {
	inChan  chan interface{}
	outChan chan interface{}
	data    []interface{}
}

func (c *InfiniteChannel) In(val interface{}) {
	c.inChan <- val
}

func (c *InfiniteChannel) Out() interface{} {
	return <-c.outChan
}

其中, inChanoutChan 都是 unbuffered channel。

此外,也肯定是需要一个 select 来处理来自 inChanoutChan 身上的事件。因此我们另起一个协程,在里面做 select 操作。

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal := <-c.inChan:
			c.data = append(c.data, newVal)
        case c.outChan <- c.pop():		// pop() 将取出队列的首个元素
		}
	}
}
func NewInfiniteChannel() *InfiniteChannel {
	c := &InfiniteChannel{
		inChan:  make(chan interface{}),
		outChan: make(chan interface{}),
	}
	go c.background()	// 注意这里另起了一个协程
	return c
}

ps:感觉这也算是 go 并发编程的一个套路了。即

  1. 在 new struct 的时候,顺手 go 一个 select 协程,select 协程内执行一个 for 循环,不停的 select,监听一个或者多个 channel 的事件。
  2. struct 对外提供的 method,只会操作 struct 内的 channel(在本例中就是 inChan 和 outChan),不会操作 struct 内的其他数据(在本例中,In() 和 Out() 都没有直接操作 data)。
  3. 触发 channel 的事件后,由 select 协程进行数据的更新(在本例中就是 data )。因为只有 select 协程对除 channel 外的数据成员进行读写操作,且 go 保证了对于 channel 的并发读写是安全的,所以代码是并发安全的。
  4. 如果 struct 是 exported ,用户或许会越过 new ,直接手动 make 一个 struct,可以考虑将 struct 设置为 unexported,把它的首字母小写即可。

pop() 的实现也非常简单。

// 取出队列的首个元素,如果队列为空,将会返回一个 nil
func (c *InfiniteChannel) pop() interface{} {
	if len(c.data) == 0 {
		return nil
	}
	val := c.data[0]
	c.data = c.data[1:]
	return val
}

测试一下

用一个协程每秒钟生产一条数据,另一个协程每半秒消费一条数据,并打印。

func main() {
	c := NewInfiniteChannel()
	go func() {
		for i := 0; i < 20; i++ {
			c.In(i)
			time.Sleep(time.Second)
		}
	}()

	for i := 0; i < 50; i++ {
		val := c.Out()
		fmt.Print(val)
		time.Sleep(time.Millisecond * 500)
	}
}
// out
<nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
Process finished with the exit code 0

可以看到,将 InfiniteChannel 内没有数据可供消费时,调用 Out() 将会返回一个 nil,不过这也在我们的意料之中,原因是 pop() 在队列为空时,将会返回 nil。

目前 InfiniteChannel 的行为与标准的 channel 的行为是有出入的,go 中的 channel,在没有数据却仍要取数据时会被阻塞,如何实现这个效果?

优化

我认为此处是是整篇文章最有技巧的地方,我第一次看到时忍不住拍案叫绝。

首先把原来的 background() 摘出来

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal := <-c.inChan:
			c.data = append(c.data, newVal)
		case c.outChan <- c.pop():
		}
	}
}

outChan 进行一个简单封装

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal := <-c.inChan:
			c.data = append(c.data, newVal)
		case c.outChanWrapper() <- c.pop():
		}
	}
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
	return c.outChan
}

目前为止,一切照旧。

点睛之笔来了:

func (c *InfiniteChannel) outChanWrapper() chan interface{} {
	if len(c.data) == 0 {
		return nil
	}
	return c.outChan
}

c.data 为空的时候,返回一个 nil

background() 中,当执行到 case c.outChan <- c.pop(): 时,实际上将会变成:

case nil <- nil:

go 中,是无法往一个 nilchannel 中发送元素的。例如

func main() {
	var c chan interface{}
	select {
	case c <- 1:
	}
}
// fatal error: all goroutines are asleep - deadlock!
func main() {
	var c chan interface{}
	select {
	case c <- 1:
	default:
		fmt.Println("hello world")
	}
}
// hello world

因此,对于

select {
case newVal := <-c.inChan:
	c.data = append(c.data, newVal)
case c.outChanWrapper() <- c.pop():
}

将会一直阻塞在 select 那里,直到 inChan 来了数据。

再测试一下

012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!

最后,程序 panic 了,因为死锁了。

补充

实际上 channel 除了 In()Out() 外,还有一个行为,即 close(),如果 channel close 后,依旧从其中取元素的话,将会取出该类型的默认值。

func main() {
	c := make(chan interface{})
	close(c)
	for true {
		v := <-c
		fmt.Println(v)
		time.Sleep(time.Second)
	}
}
// output
// <nil>
// <nil>
// <nil>
// <nil>
func main() {
	c := make(chan interface{})
	close(c)
	for true {
		v, isOpen := <-c
		fmt.Println(v, isOpen)
		time.Sleep(time.Second)
	}
}
// output
// <nil> false
// <nil> false
// <nil> false
// <nil> false

我们也需要实现相同的效果。

func (c *InfiniteChannel) Close() {
	close(c.inChan)
}

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal, isOpen := <-c.inChan:
			if isOpen {
				c.data = append(c.data, newVal)
			} else {
				c.isOpen = false
			}
		case c.outChanWrapper() <- c.pop():
		}
	}
}

func NewInfiniteChannel() *InfiniteChannel {
	c := &InfiniteChannel{
		inChan:  make(chan interface{}),
		outChan: make(chan interface{}),
		isOpen:  true,
	}
	go c.background()
	return c
}

func (c *InfiniteChannel) outChanWrapper() chan interface{} {
    // 这里添加了对 c.isOpen 的判断
	if c.isOpen && len(c.data) == 0 {
		return nil
	}
	return c.outChan
}

再测试一下

func main() {
	c := NewInfiniteChannel()
	go func() {
		for i := 0; i < 20; i++ {
			c.In(i)
			time.Sleep(time.Second)
		}
		c.Close()		// 这里调用了 Close
	}()

	for i := 0; i < 50; i++ {
		val := c.Out()
		fmt.Print(val)
		time.Sleep(time.Millisecond * 500)
	}
}
// output
012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
Process finished with the exit code 0

符合预期

遗憾

目前看上去已经很完美了,但是和标准的 channel 相比,仍然有差距。因为标准的 channel 是有这种用法的

v,isOpen := <- ch

可以通过 isOpen 变量来获取 channel 的开闭情况。

因此 InfiniteChannel 也应该提供一个类似的 method

func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) {
	// todo
}

可惜的是,要想得知 InfiniteChannel 是否是 Open 的,就必定要访问 InfiniteChannel 内的 isOpen 成员。

type InfiniteChannel struct {
	inChan  chan interface{}
	outChan chan interface{}
	data    []interface{}
	isOpen  bool
}

isOpen 并非 channel 类型,根据之前的套路,这种非 channel 类型的成员只应该被 select 协程访问。一旦有多个协程访问,就会出现并发问题,除非加锁。

我不能接受!所以干脆不提供这个 method 了,嘿嘿。

完整代码

func main() {
	c := NewInfiniteChannel()
	go func() {
		for i := 0; i < 20; i++ {
			c.In(i)
			time.Sleep(time.Second)
		}
		c.Close()
	}()

	for i := 0; i < 50; i++ {
		val := c.Out()
		fmt.Print(val)
		time.Sleep(time.Millisecond * 500)
	}
}

type InfiniteChannel struct {
	inChan  chan interface{}
	outChan chan interface{}
	data    []interface{}
	isOpen  bool
}

func (c *InfiniteChannel) In(val interface{}) {
	c.inChan <- val
}

func (c *InfiniteChannel) Out() interface{} {
	return <-c.outChan
}

func (c *InfiniteChannel) Close() {
	close(c.inChan)
}

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal, isOpen := <-c.inChan:
			if isOpen {
				c.data = append(c.data, newVal)
			} else {
				c.isOpen = false
			}
		case c.outChanWrapper() <- c.pop():
		}
	}
}

func NewInfiniteChannel() *InfiniteChannel {
	c := &InfiniteChannel{
		inChan:  make(chan interface{}),
		outChan: make(chan interface{}),
		isOpen:  true,
	}
	go c.background()
	return c
}

// 取出队列的首个元素,如果队列为空,将会返回一个 nil
func (c *InfiniteChannel) pop() interface{} {
	if len(c.data) == 0 {
		return nil
	}
	val := c.data[0]
	c.data = c.data[1:]
	return val
}

func (c *InfiniteChannel) outChanWrapper() chan interface{} {
	if c.isOpen && len(c.data) == 0 {
		return nil
	}
	return c.outChan
}

参考

https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd

以上就是再次探讨go实现无限 buffer 的 channel方法的详细内容,更多关于go无限 buffer 的 channel的资料请关注我们其它相关文章!

(0)

相关推荐

  • Django使用Channels实现WebSocket的方法

    WebSocket - 开启通往新世界的大门 WebSocket是什么? WebSocket是一种在单个TCP连接上进行全双工通讯的协议.WebSocket允许服务端主动向客户端推送数据.在WebSocket协议中,客户端浏览器和服务器只需要完成一次握手就可以创建持久性的连接,并在浏览器和服务器之间进行双向的数据传输. WebSocket有什么用? WebSocket区别于HTTP协议的一个最为显著的特点是,WebSocket协议可以由服务端主动发起消息,对于浏览器需要及时接收数据变化的场景非常

  • Go语言中使用 buffered channel 实现线程安全的 pool

    概述 我们已经知道 Go 语言提供了 sync.Pool,但是做的不怎么好,所以有必要自己来实现一个 pool. 给我看代码: 复制代码 代码如下: type Pool struct {   pool chan *Client } // 创建一个新的 pool func NewPool(max int) *Pool {   return &Pool{     pool: make(chan *Client, max),   } } // 从 pool 里借一个 Client func (p *P

  • go原生库的中bytes.Buffer用法

    1 bytes.Buffer定义 bytes.Buffer提供可扩容的字节缓冲区,实质是对切片的封装:结构中包含一个64字节的小切片,避免小内存分配: // A Buffer is a variable-sized buffer of bytes with Read and Write methods. // The zero value for Buffer is an empty buffer ready to use. type Buffer struct { buf []byte //

  • Django Channels 实现点对点实时聊天和消息推送功能

    简介在很多实际的项目开发中,我们需要实现很多实时功能:而在这篇文章中,我们就利用django channels简单地实现了点对点聊天和消息推送功能. 手边有一个项目需要用到后台消息推送和用户之间一对一在线聊天的功能.例如用户A评论了用户B的帖子,这时候用户B就应该收到一条通知,显示自己的帖子被评论了.这个功能可以由最基本的刷新页面后访问数据库来完成,但是这样会增加对后台服务器的压力,同时如果是手机客户端的话,也会造成流量的损失.于是,我们考虑使用websocket建立一个连接来完成这个功能. 但

  • 详解Django-channels 实现WebSocket实例

    引入 先安装三个模块 pip install channels pip install channels_redis pip install pywin32 创建一个Django项目和一个app 项目名随意,app名随意.这里项目名为 django_websocket_demo ,app名 chat 把app文件夹下除了 views.py 和 __init__.py 的文件都删了,最终项目目录结构如下: django_websocket_demo/ manage.py django_websoc

  • C#语言使用gRPC、protobuf(Google Protocol Buffers)实现文件传输功能

    初识gRPC还是一位做JAVA的同事在项目中用到了它,为了C#的客户端程序和java的服务器程序进行通信和数据交换,当时还是对方编译成C#,我直接调用. 后来,自己下来做了C#版本gRPC编写,搜了很多资料,但许多都是从入门开始?调用说"Say Hi!"这种官方标准的入门示例,然后遇到各种问题-- 关于gRPC和Protobuf介绍,就不介绍了,网络上一搜一大把,随便一抓都是标准的官方,所以直接从使用说起. gPRC源代码:https://github.com/grpc/grpc: p

  • 再次探讨go实现无限 buffer 的 channel方法

    前言 总所周知,go 里面只有两种 channel,一种是 unbuffered channel, 其声明方式为 ch := make(chan interface{}) 另一种是 buffered channel,其声明方式为 bufferSize := 5 ch := make(chan interface{},bufferSize) 对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的.这个极限就是该 channel 最初被 make 时,所指定的

  • node.JS二进制操作模块buffer对象使用方法详解

    在ES6引入TypedArray之前,JavaScript语言没有读取或操作二进制数据流的机制.Buffer类被引入作为Nodejs的API的一部分,使其可以在TCP流和文件系统操作等场景中处理二进制数据流.现在TypedArray已经被添加进ES6中,Buffer类以一种更优与更适合Node.js用例的方式实现了Uint8Array. Buffer对象概述 由于应用场景不同,在Node中,应用需要处理网络协议.操作数据库.处理图片.接收上传文件等,在网络流和文件的操作中,还要处理大量二进制数据

  • jstree创建无限分级树的方法【基于ajax动态创建子节点】

    本文实例讲述了jstree创建无限分级树的方法.分享给大家供大家参考,具体如下: 首先来看一下效果 页面加载之初 节点全部展开后 首先数据库的表结构如下 其中Id为主键,PId为关联到自身的外键 两个字段均为GUID形式 层级关系主要靠这两个字段维护 其次需要有一个类型 public class MenuType { public Guid Id { get; set; } public Guid PId { get; set; } public string MenuName { get; s

  • EasyUI Tree树组件无限循环的解决方法

    在学习jquery easyui的tree组件的时候,在url为链接地址的时,发现如果最后一个节点的state为closed时,未节点显示为文件夹,单击会重新加载动态(Url:链接地址)形成无限循环.如: tree.json [{ "id":1, "text":"Folder1", "iconCls":"icon-save", "children":[{ "text"

  • php实现smarty模板无限极分类的方法

    本文实例讲述了php实现smarty模板无限极分类的方法.分享给大家供大家参考,具体如下: <?php $conn = mysql_connect("localhost","admin","admin"); mysql_select_db("people_shop",$conn); mysql_query("SET NAMES 'UTF-8'"); $class_arr=array(); $sql =

  • PHP超牛逼无限极分类生成树方法

    你还在用浪费时间又浪费内存的递归遍历无限极分类吗,看了该篇文章,我觉得你应该换换了. 这是我在OSChina上看到的一段非常精简的PHP无限极分类生成树方法,巧在引用,整理分享了. 复制代码 代码如下: function generateTree($items){     $tree = array();     foreach($items as $item){         if(isset($items[$item['pid']])){             $items[$item[

  • sqlserver实现树形结构递归查询(无限极分类)的方法

    SQL Server 2005开始,我们可以直接通过CTE来支持递归查询,CTE即公用表表达式 百度百科 公用表表达式(CTE),是一个在查询中定义的临时命名结果集将在from子句中使用它.每个CTE仅被定义一次(但在其作用域内可以被引用任意次),并且在该查询生存期间将一直生存.可以使用CTE来执行递归操作.创建的语法是: with <name of you cte>(<column names>) as( <actual query> ) select * from

  • C# TreeView无限目录树实现方法

    本文实例讲述了C# TreeView无限目录树实现方法.分享给大家供大家参考,具体如下: #region 绑定客户树 protected void bindTreeView() { TreeView1.Nodes.Clear(); string userid = Session["UserID"].ToString(); string sqlwr = new SY_ADMINUSER().GetUserIDListByLoginUser(userid, "CUSTOMERSE

  • Android编程实现ListView内容无限循环显示的方法

    本文实例讲述了Android编程实现ListView内容无限循环显示的方法.分享给大家供大家参考,具体如下: 其实要达到无限循环显示,主要就是实现继承Adapter的类. 我这里用到的是BaseAdapter private class MyAdapter extends BaseAdapter{ private Context context; private String[] strs = null; LayoutInflater inflater = null; public MyAdap

  • thinkPHP实现递归循环栏目并按照树形结构无限极输出的方法

    本文实例讲述了thinkPHP实现递归循环栏目并按照树形结构无限极输出的方法.分享给大家供大家参考,具体如下: 这里使用thinkphp递归循环栏目按照树形结构无限极输出,并保存为一个数组,利于模板调用 具体代码如下: private function categoryTree($parentid,$level) //因为是本类中使用所以定于为私有函数 { $Category= D('Category'); $result = $Category->where("`parentid`=&q

随机推荐