Golang 实现 Redis系列(六)如何实现 pipeline 模式的 redis 客户端

本文的完整代码在github.com/hdt3213/godis/redis/client

通常 TCP 客户端的通信模式都是阻塞式的: 客户端发送请求 -> 等待服务端响应 -> 发送下一个请求。因为需要等待网络传输数据,完成一次请求循环需要等待较多时间。

我们能否不等待服务端响应直接发送下一条请求呢?答案是肯定的。

TCP 作为全双工协议可以同时进行上行和下行通信,不必担心客户端和服务端同时发包会导致冲突。

p.s. 打电话的时候两个人同时讲话就会冲突听不清,只能轮流讲。这种通信方式称为半双工。广播只能由电台发送到收音机不能反向传输,这种方式称为单工。

我们为每一个 tcp 连接分配了一个 goroutine 可以保证先收到的请求先先回复。另一个方面,tcp 协议会保证数据流的有序性,同一个 tcp 连接上先发送的请求服务端先接收,先回复的响应客户端先收到。因此我们不必担心混淆响应所对应的请求。

这种在服务端未响应时客户端继续向服务端发送请求的模式称为 Pipeline 模式。因为减少等待网络传输的时间,Pipeline 模式可以极大的提高吞吐量,减少所需使用的 tcp 链接数。

pipeline 模式的 redis 客户端需要有两个后台协程程负责 tcp 通信,调用方通过 channel 向后台协程发送指令,并阻塞等待直到收到响应,这是一个典型的异步编程模式。

我们先来定义 client 的结构:

type Client struct {
    conn        net.Conn // 与服务端的 tcp 连接
    pendingReqs chan *Request // 等待发送的请求
    waitingReqs chan *Request // 等待服务器响应的请求
    ticker      *time.Ticker // 用于触发心跳包的计时器
    addr        string

    ctx        context.Context
    cancelFunc context.CancelFunc
    writing    *sync.WaitGroup // 有请求正在处理不能立即停止,用于实现 graceful shutdown
}

type Request struct {
    id        uint64 // 请求id
    args      [][]byte // 上行参数
    reply     redis.Reply // 收到的返回值
    heartbeat bool // 标记是否是心跳请求
    waiting   *wait.Wait // 调用协程发送请求后通过 waitgroup 等待请求异步处理完成
    err       error
}

调用者将请求发送给后台协程,并通过 wait group 等待异步处理完成:

func (client *Client) Send(args [][]byte) redis.Reply {
	request := &request{
		args:      args,
		heartbeat: false,
		waiting:   &wait.Wait{},
	}
	request.waiting.Add(1)
	client.working.Add(1)
	defer client.working.Done()
	client.pendingReqs <- request // 请求入队
	timeout := request.waiting.WaitWithTimeout(maxWait) // 等待响应或者超时
	if timeout {
		return reply.MakeErrReply("server time out")
	}
	if request.err != nil {
		return reply.MakeErrReply("request failed")
	}
	return request.reply
}

client 的核心部分是后台的读写协程。先从写协程开始:

// 写协程入口
func (client *Client) handleWrite() {
	for req := range client.pendingReqs {
		client.doRequest(req)
	}
}

// 发送请求
func (client *Client) doRequest(req *request) {
	if req == nil || len(req.args) == 0 {
		return
	}
    // 序列化请求
	re := reply.MakeMultiBulkReply(req.args)
	bytes := re.ToBytes()
	_, err := client.conn.Write(bytes)
	i := 0
    // 失败重试
	for err != nil && i < 3 {
		err = client.handleConnectionError(err)
		if err == nil {
			_, err = client.conn.Write(bytes)
		}
		i++
	}
	if err == nil {
        // 发送成功等待服务器响应
		client.waitingReqs <- req
	} else {
		req.err = err
		req.waiting.Done()
	}
}

读协程是我们熟悉的协议解析器模板, 不熟悉的朋友可以到解析Redis Cluster原理了解更多。

// 收到服务端的响应
func (client *Client) finishRequest(reply redis.Reply) {
	defer func() {
		if err := recover(); err != nil {
			debug.PrintStack()
			logger.Error(err)
		}
	}()
	request := <-client.waitingReqs
	if request == nil {
		return
	}
	request.reply = reply
	if request.waiting != nil {
		request.waiting.Done()
	}
}

// 读协程是个 RESP 协议解析器
func (client *Client) handleRead() error {
	ch := parser.ParseStream(client.conn)
	for payload := range ch {
		if payload.Err != nil {
			client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
			continue
		}
		client.finishRequest(payload.Data)
	}
	return nil
}

最后编写 client 的构造器和启动异步协程的代码:

func MakeClient(addr string) (*Client, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    return &Client{
        addr:        addr,
        conn:        conn,
        sendingReqs: make(chan *Request, chanSize),
        waitingReqs: make(chan *Request, chanSize),
        ctx:         ctx,
        cancelFunc:  cancel,
        writing:     &sync.WaitGroup{},
    }, nil
}

func (client *Client) Start() {
    client.ticker = time.NewTicker(10 * time.Second)
    go client.handleWrite()
    go func() {
        err := client.handleRead()
        logger.Warn(err)
    }()
    go client.heartbeat()
}

关闭 client 的时候记得等待请求完成:

func (client *Client) Close() {
    // 先阻止新请求进入队列
    close(client.sendingReqs)

    // 等待处理中的请求完成
    client.writing.Wait()

    // 释放资源
    _ = client.conn.Close() // 关闭与服务端的连接,连接关闭后读协程会退出
    client.cancelFunc() // 使用 context 关闭读协程
    close(client.waitingReqs) // 关闭队列
}

测试一下:

func TestClient(t *testing.T) {
    client, err := MakeClient("localhost:6379")
    if err != nil {
        t.Error(err)
    }
    client.Start()

    result = client.Send([][]byte{
        []byte("SET"),
        []byte("a"),
        []byte("a"),
    })
    if statusRet, ok := result.(*reply.StatusReply); ok {
        if statusRet.Status != "OK" {
            t.Error("`set` failed, result: " + statusRet.Status)
        }
    }

    result = client.Send([][]byte{
        []byte("GET"),
        []byte("a"),
    })
    if bulkRet, ok := result.(*reply.BulkReply); ok {
        if string(bulkRet.Arg) != "a" {
            t.Error("`get` failed, result: " + string(bulkRet.Arg))
        }
    }
}

Keep working, we will find a way out.This is Finley, welcome to join us.

到此这篇关于Golang 实现 Redis系列(六)如何实现 pipeline 模式的 redis 客户端的文章就介绍到这了,更多相关Golang实现pipeline模式的redis客户端内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang实现redis的延时消息队列功能示例

    前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

  • Golang使用lua脚本实现redis原子操作

    目录 [redis 调用Lua脚本](#redis 调用Lua脚本) [redis+lua 实现评分排行榜实时更新](#redis+lua 实现评分排行榜实时更新) [lua 脚本](#lua 脚本) Golang调用redis+lua示例 byte切片与string的转换优化 redis 调用Lua脚本 EVAL命令 redis调用Lua脚本需要使用EVAL命令. redis EVAL命令格式: redis 127.0.0.1:6379> EVAL script numkeys key [ke

  • Golang连接Redis数据库的方法

    Golang连接Redis数据库 golang连接数据库,这里博主推荐使用go-redis这个库,理由很简单(连接数据库的操作类似在数据库里面输入命令) go-redis的安装方式 go get -v https://github.com/go-redis/redis -v小v,是输出过程.一般情况下不带-v什么反馈都看不到. 连接redis的方式 package ... import ( "github.com/go-redis/redis" ) func main() { clie

  • 在Golang中使用Redis的方法示例

    周五上班的主要任务是在公司老平台上用redis处理一个队列问题,顺便复习了一下redis操作的基础知识,回来后就想着在自己的博客demo里,用redis来优化一些使用场景,学习一下golang开发下redis的使用. Redis简单介绍 简介 关于Redis的讨论,其实在现在的后台开发中已经是个老生常谈的问题,基本上也是后端开发面试的基本考察点.其中 Redis的背景介绍和细节说明在这里就不赘述.不管怎么介绍,核心在于Redis是一个基于内存的key-value的多数据结构存储,并可以提供持久化

  • Golang 实现 Redis系列(六)如何实现 pipeline 模式的 redis 客户端

    本文的完整代码在github.com/hdt3213/godis/redis/client 通常 TCP 客户端的通信模式都是阻塞式的: 客户端发送请求 -> 等待服务端响应 -> 发送下一个请求.因为需要等待网络传输数据,完成一次请求循环需要等待较多时间. 我们能否不等待服务端响应直接发送下一条请求呢?答案是肯定的. TCP 作为全双工协议可以同时进行上行和下行通信,不必担心客户端和服务端同时发包会导致冲突. p.s. 打电话的时候两个人同时讲话就会冲突听不清,只能轮流讲.这种通信方式称为半

  • BootStrap智能表单实战系列(六)表单编辑页面的数据绑定

    什么是 Bootstrap? Bootstrap 是一个用于快速开发 Web 应用程序和网站的前端框架.Bootstrap 是基于 HTML.CSS.JAVASCRIPT 的. 历史 Bootstrap 是由 Twitter 的 Mark Otto 和 Jacob Thornton 开发的.Bootstrap 是 2011 年八月在 GitHub 上发布的开源产品. Bootstrap 包的内容 基本结构:Bootstrap 提供了一个带有网格系统.链接样式.背景的基本结构.这将在 Bootst

  • .NET客户端实现Redis中的管道(PipeLine)与事物(Transactions)

    序言 Redis中的管道(PipeLine)特性:简述一下就是,Redis如何从客户端一次发送多个命令,服务端到客户端如何一次性响应多个命令. Redis使用的是客户端-服务器模型和请求/响应协议的TCP服务器,这就意味着一个请求要有以下步骤才能完成:1.客户端向服务器发送查询命令,然后通常以阻塞的方式等待服务器相应.2.服务器处理查询命令,并将相应发送回客户端.这样便会通过网络连接,如果是本地回环接口那么就能特别迅速的响应,但是如果走外网,甚至外网再做一系列的层层转发,那就显的格外蛋疼.无论网

  • python使用pipeline批量读写redis的方法

    用了很久的redis了.随着业务的要求越来越高.对redis的读写速度要求也越来越高.正好最近有个需求(需要在秒级取值1000+的数据),如果对于传统的单词取值,循环取值,消耗实在是大,有小伙伴可能考虑到多线程,但这并不是最好的解决方案,这里考虑到了redis特有的功能pipeline管道功能. 下面就更大家演示一下pipeline在python环境下的使用情况. 1.插入数据 >>> import redis >>> conn = redis.Redis(host='

  • golang pprof 监控系列 go trace统计原理与使用解析

    目录 引言 go trace 使用 统计原理介绍 Goroutine analysis Execution Network wait Sync block,Blocking syscall,Scheduler wait 各种profile 图 引言 服务监控系列文章 服务监控系列视频 关于go tool trace的使用,网上有相当多的资料,但拿我之前初学golang的经验来讲,很多资料都没有把go tool trace中的相关指标究竟是统计的哪些方法,统计了哪段区间讲解清楚.所以这篇文章不仅仅

  • redis查看连接数及php模拟并发创建redis连接的方法

    max_redis.php <?php set_time_limit (0); for($i=1;$i<=1050;$i++){ exec("nohup php /var/www/html/big/link_redis.php > /dev/null &"); } link_redis.php <?php set_time_limit (0); $redis = new redis(); $redis->pconnect('localhost',

  • 详解简单基于spring的redis配置(单机和集群模式)

    需要的jar包:spring版本:4.3.6.RELEASE,jedis版本:2.9.0,spring-data-redis:1.8.0.RELEASE:如果使用jackson序列化的话还额外需要:jackson-annotations和jackson-databind包 spring集成redis单机版: 1.配置RedisTemplate <bean id="redisTemplate" class="org.springframework.data.redis.c

  • Java通俗易懂系列设计模式之责任链模式

    概述 责任链设计模式是行为设计模式之一. 责任链模式用于在软件设计中实现松散耦合,其中来自客户端的请求被传递到对象链以处理它们.然后链中的对象将自己决定谁将处理请求以及是否需要将请求发送到链中的下一个对象. JDK中的责任链模式示例 让我们看一下JDK中责任链模式的例子,然后我们将继续实现这种模式的真实例子.我们知道在try-catch块代码中我们可以有多个catch块.这里每个catch块都是处理该特定异常的处理器. 因此当try块中发生任何异常时,它会发送到第一个catch块进行处理.如果c

  • Redis 的查询很快的原因解析及Redis 如何保证查询的高效

    目录 Redis如何保证高效的查询效率 为什么Redis比较快 Redis中的数据结构 1.简单动态字符串 2.链表 3.字典 4.跳表 5.整数数组 6.压缩列表 为什么单线程还能很快 基于多路复用的高性能I/O模型 单线程处理IO请求性能瓶颈 总结 参考 Redis 如何保证高效的查询效率 为什么 Redis 比较快 Redis 中的查询速度为什么那么快呢? 1.因为它是内存数据库: 2.归功于它的数据结构: 3.Redis 中是单线程: 4.Redis 中使用了多路复用. Redis 中的

  • 在Redis集群中使用pipeline批量插入的实现方法

    由于项目中需要使用批量插入功能, 所以在网上查找到了Redis 批量插入可以使用pipeline来高效的插入, 示例代码如下: String key = "key"; Jedis jedis = new Jedis("xx.xx.xx.xx"); Pipeline p = jedis.pipelined(); List<String> myData = .... //要插入的数据列表 for(String data: myData){ p.hset(ke

随机推荐