关于redigo中PubSub的一点小坑分析

前言

最近在用 golang 做一些 redis 相关的操作,选用了 redigo 这个第三方库。然后在使用 Pub/Sub 的时候,却发现了一个小坑……

Redis Client

首先,我们来初始化一个带连接池的 Redis Client:

import (
	"github.com/gomodule/redigo/redis"
)

type RedisClient struct {
	pool *redis.Pool
}

func NewRedisClient(addr string, db int, passwd string) *RedisClient {
	pool := &redis.Pool{
		MaxIdle:  10,
		IdleTimeout: 300 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	log.Printf("new redis pool at %s", addr)
	client := &RedisClient{
		pool: pool,
	}
	return client
}

Publish

然后我们可以简单的实现一个 publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {
	c := r.pool.Get()
	defer c.Close()
	n, err := redis.Int(c.Do("PUBLISH", channel, message))
	if err != nil {
		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
	}
	return n, nil
}

Subscribe

接下来就是一个稍微复杂点的带有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}

最后,我们写一个简单地 main 函数来调用 publish & subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}

咋一看之下,好像并没有什么异常?然而,如果我们这时候去看 redis 的 tcp 连接,就可以发现一些猫腻:

$sudo netstat -antp | grep redis
tcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0.
tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一个连接,而 connection pool 似乎没有什么作用。

更进一步地调试,我们发现在 defer psc.Close() 的时候就卡住了,也就是上面的 10 个 goroutine 其实并没有正常退出。

Concurrent

排查许久之后,终于定位到了问题!引用 redigo 的说明

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是说,虽然一个连接可以在不同的 goroutine 并发调用 Receive() 和 Subscribe()(subscribe调用了send和flush) ,但是却不能再有其他并发操作(比如 Close())。

其他相似的问题还可以参考 issue

Fix

知道了上面的原因之后,我们稍微修改一下 defer psc.Close() 的位置即可解决问题:

	// start a new goroutine to receive message
	go func() {
		// IMPORTANT!
		defer psc.Close()
		for {
			switch msg := psc.Receive().(type) {
			case error:

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • go实现redigo的简单操作

    golang操作redis主要有两个库,go-redis和redigo.两者操作都比较简单,区别上redigo更像一个client执行各种操作都是通过Do函数去做的,redis-go对函数的封装更好,相比之下redigo操作redis显得有些繁琐.但是官方更推荐redigo,所以项目中我使用了redigo. 1.连接redis package redisclient import ( "fmt" redigo "github.com/garyburd/redigo/redis

  • 关于redigo中PubSub的一点小坑分析

    前言 最近在用 golang 做一些 redis 相关的操作,选用了 redigo 这个第三方库.然后在使用 Pub/Sub 的时候,却发现了一个小坑-- Redis Client 首先,我们来初始化一个带连接池的 Redis Client: import ( "github.com/gomodule/redigo/redis" ) type RedisClient struct { pool *redis.Pool } func NewRedisClient(addr string,

  • 记录vue项目中遇到的一点小问题

    1.鼠标悬浮显示不同的背景图片,代码如下: 效果如图,页面初始效果: 鼠标悬浮后效果: 2.for 循环发送axios遇到的问题 问题背景: 在声动语商项目中,需求更改后,出现一个:教师发布课程的时候要求一个课程同时发送给多个班级. 现在的接口:每次只能发送一个班级的id,也就是:classesId字段只能传一个班级的id,因此为了满足这个新需求就想着:将select修改为多选,使用for循环循环用户选中的班级数组,使用axios发送创建课程请求.更改后的界面如下图所示: 问题复现: 思路:使用

  • 关于jQuery中fade(),show()起始位置的一点小发现

    最近在鼓弄主页的时候想要添加一个音乐播放的插件,暂时使用网易与音乐外链播放器,效果是在右下角弹出和消失,于是问题来了: show()和fade()函数是用来显示或者隐藏元素的函数,可以为其传入时间参数,使得函数在多少毫秒内完成. 但是出现和消失的起始点在哪里呢? 默认的话,是左上角: 但是有的时候要设置元素出现的位置,比如我想让元素以右下角为起始位置,怎么做呢? 我也是否然发现但不确定是否是隐藏属性,就是我为元素设置了如下样式: aside{ position: fixed; bottom:65

  • Python创建二维数组实例(关于list的一个小坑)

    0.目录 1.遇到的问题 2.创建二维数组的办法 •3.1 直接创建法 •3.2 列表生成式法 •3.3 使用模块numpy创建 1.遇到的问题 今天写Python代码的时候遇到了一个大坑,差点就耽误我交作业了... 问题是这样的,我需要创建一个二维数组,如下: m = n = 3 test = [[0] * m] * n print("test =", test) 输出结果如下: test = [[0, 0, 0], [0, 0, 0], [0, 0, 0]] 是不是看起来没有一点问

  • react路由v6版本NavLink的两个小坑及解决

    目录 react路由v6版本NavLink的两个小坑 react 路由React Router(v6) 安装react-router 一级路由 重定向 NavLink高亮 useRoutes路由表----嵌套路由 路由的params参数 路由的search参数 路由的state参数 编程式路由导航 react路由v6版本NavLink的两个小坑 本人新人,是按照文档进行学习的,今遇到两个小坑,现记录如下: 第一点,当前版本的NavLink的style或者className当中的isActive,

  • 一场由Java中Integer引发的踩坑实战

    看过阿里巴巴开发手册的同学应该都会对Integer临界值127有点印象. 原文中写的是: [强制]所有整型包装类对象之间值的比较,全部使用 equals 方法比较. 说明:对于 Integer var = ? 在-128 至 127 之间的赋值,Integer 对象是在 IntegerCache.cache 产生, 会复用已有对象,这个区间内的 Integer 值可以直接使用==进行判断,但是这个区间之外的所有数据,都 会在堆上产生,并不会复用已有对象,这是一个大坑,推荐使用 equals 方法

  • 学习Angular中作用域需要注意的坑

    Angular作用域 在用angular搭建的网页应用中,作用域(scope)这个概念是贯穿其中的.在angular的视图(view)中的很多指令是会创建一个作用域的,例如 ng-app , ng-controller 等.这个作用域就是我们在写控制器构造函数时注入的 $scope (angular1.2之前的版本),他是视图模型(view model)中的一个概念.我们的数据模型(model)就是定义在作用域中的. Angular作用域的坑 用过angular的人应该都会经过一个过程,就是刚开

  • 浅谈c语言中类型隐性转换的坑

    谨记:在C语言中,当两种不同类型之间运算时,低字节长度类型会向高自己长度类型转换,有符号会向无符号类型转换. 举例子如下: #include <stdio.h> void func(void) { int i = 1; unsigned char c1 = 1; signed char c2 = -1; if (c2 > i){ printf("\r\n -1 > 1"); } else{ printf("\r\n -1 <= 1");

  • 浅谈Android为RecyclerView增加监听以及数据混乱的小坑

    为 RecyclerView增加监听 1.在实现好的MyAdapter中写内部接口: public void setOnItemLongClickListener(OnItemLongClickListener onItemLongClickListener) { this.onItemLongClickListener = onItemLongClickListener; } public void setOnItemClickListener(OnItemClickListener onIt

  • 详解Spring Cloud Feign 熔断配置的一些小坑

    1.在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,已解决. 2.使用feign默认配置,熔断不生效,已解决. 最近在做微服务的学习,发现在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,代码如下: @RequestMapping("/demo/api") public interface HelloApi { @GetMapping("user/

随机推荐