redis通过pipeline提升吞吐量的方法

案例目标

简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用。

案例背景

应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元;

然而在某些服务中,数据操作对redis 是强依赖的,在最近的一次分析中发现:

一次数据推送会对 redis 产生近30次读写操作!

在数据推送业务中的性能压测中,以数据上报 -> 下发应答为一次事务;而对于这样的读写模型,redis 的操作过于频繁,很快便导致系统延时过高,吞吐量低下,无法满足目标;

优化过程 主要针对业务代码做的优化,其中redis 操作经过大量合并,最终降低到原来的1/5,而系统吞吐量也提升明显。

其中,redis pipeline(管道机制) 的应用是一个关键手段。

pipeline的解释

Pipeline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。

管道技术使用广泛,例如许多POP3协议已经实现支持这个功能,大大加快了从服务器下载新邮件的过程。 Redis很早就支持管道(pipeline)技术。(因此无论你运行的是什么版本,你都可以使用管道(pipelining)操作Redis)

普通请求模型

[图-pipeline1]

Pipeline请求模型

[图-pipeline2]

从两个图的对比中可看出,普通的请求模型是同步的,每次请求对应一次IO操作等待;

而Pipeline 化之后所有的请求合并为一次IO,除了时延可以降低之外,还能大幅度提升系统吞吐量。

代码实例

说明

本地开启50个线程,每个线程完成1000个key的写入,对比pipeline开启及不开启两种场景下的性能表现。

相关常量

// 并发任务
private static final int taskCount = 50;
// pipeline大小
private static final int batchSize = 10;
// 每个任务处理命令数
private static final int cmdCount = 1000;

private static final boolean usePipeline = true;

初始化连接

JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxActive(200);
poolConfig.setMaxIdle(100);
poolConfig.setMaxWait(2000);
poolConfig.setTestOnBorrow(false);
poolConfig.setTestOnReturn(false);

jedisPool = new JedisPool(poolConfig, host, port);

并发启动任务,统计执行时间

public static void main(String[] args) throws InterruptedException {
  init();

  flushDB();

  long t1 = System.currentTimeMillis();
  ExecutorService executor = Executors.newCachedThreadPool();

  CountDownLatch latch = new CountDownLatch(taskCount);
  for (int i = 0; i < taskCount; i++) {
   executor.submit(new DemoTask(i, latch));
  }

  latch.await();
  executor.shutdownNow();

  long t2 = System.currentTimeMillis();

  System.out.println("execution finish time(s):" + (t2 - t1) / 1000.0);

 }

DemoTask 封装了执行key写入的细节,区分不同场景

 public void run() {
   logger.info("Task[{}] start.", id);
   try {
    if (usePipeline) {
     runWithPipeline();
    } else {
     runWithNonPipeline();
    }
   } finally {
    latch.countDown();
   }

   logger.info("Task[{}] end.", id);
  }

不使用Pipeline的场景比较简单,循环执行set操作

for (int i = 0; i < cmdCount; i++) {
    Jedis jedis = get();
    try {
     jedis.set(key(i), UUID.randomUUID().toString());
    } finally {
     if (jedis != null) {
      jedisPool.returnResource(jedis);
     }
    }
    if (i % batchSize == 0) {
     logger.info("Task[{}] process -- {}", id, i);
    }
   }

使用Pipeline,需要处理分段,如10个作为一批命令执行

for (int i = 0; i < cmdCount;) {
    Jedis jedis = get();

    try {
     Pipeline pipeline = jedis.pipelined();
     int j;
     for (j = 0; j < batchSize; j++) {
      if (i + j < cmdCount) {
       pipeline.set(key(i + j), UUID.randomUUID().toString());
      } else {
       break;
      }
     }
     pipeline.sync();
     logger.info("Task[{}] pipeline -- {}", id, i + j);

     i += j;

    } finally {
     if (jedis != null) {
      jedisPool.returnResource(jedis);
     }
    }

   }

运行结果

不使用Pipeline,整体执行26s;而使用Pipeline优化后的代码,执行时间仅需要3s!

NoPipeline-stat

[图-nopipeline]

Pipeline-stat

[图-pipeline]

注意事项

pipeline机制可以优化吞吐量,但无法提供原子性/事务保障,而这个可以通过Redis-Multi等命令实现。

参考这里

部分读写操作存在相关依赖,无法使用pipeline实现,可利用Script机制,但需要在可维护性方面做好取舍。

扩展阅读

官方文档-Redis-Pipelining

官方文档-Redis-Transaction

以上这篇redis通过pipeline提升吞吐量的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 在Redis集群中使用pipeline批量插入的实现方法

    由于项目中需要使用批量插入功能, 所以在网上查找到了Redis 批量插入可以使用pipeline来高效的插入, 示例代码如下: String key = "key"; Jedis jedis = new Jedis("xx.xx.xx.xx"); Pipeline p = jedis.pipelined(); List<String> myData = .... //要插入的数据列表 for(String data: myData){ p.hset(ke

  • .NET客户端实现Redis中的管道(PipeLine)与事物(Transactions)

    序言 Redis中的管道(PipeLine)特性:简述一下就是,Redis如何从客户端一次发送多个命令,服务端到客户端如何一次性响应多个命令. Redis使用的是客户端-服务器模型和请求/响应协议的TCP服务器,这就意味着一个请求要有以下步骤才能完成:1.客户端向服务器发送查询命令,然后通常以阻塞的方式等待服务器相应.2.服务器处理查询命令,并将相应发送回客户端.这样便会通过网络连接,如果是本地回环接口那么就能特别迅速的响应,但是如果走外网,甚至外网再做一系列的层层转发,那就显的格外蛋疼.无论网

  • python使用pipeline批量读写redis的方法

    用了很久的redis了.随着业务的要求越来越高.对redis的读写速度要求也越来越高.正好最近有个需求(需要在秒级取值1000+的数据),如果对于传统的单词取值,循环取值,消耗实在是大,有小伙伴可能考虑到多线程,但这并不是最好的解决方案,这里考虑到了redis特有的功能pipeline管道功能. 下面就更大家演示一下pipeline在python环境下的使用情况. 1.插入数据 >>> import redis >>> conn = redis.Redis(host='

  • 详解Java使用Pipeline对Redis批量读写(hmset&hgetall)

    一般情况下,Redis Client端发出一个请求后,通常会阻塞并等待Redis服务端处理,Redis服务端处理完后请求命令后会将结果通过响应报文返回给Client. 感觉这有点类似于HBase的Scan,通常是Client端获取每一条记录都是一次RPC调用服务端. 在Redis中,有没有类似HBase Scanner Caching的东西呢,一次请求,返回多条记录呢? 有,这就是Pipline.官方介绍 http://redis.io/topics/pipelining 通过pipeline方

  • Redis利用Pipeline加速查询速度的方法

    1. RTT Redis 是一种基于客户端-服务端模型以及请求/响应协议的TCP服务.这意味着通常情况下 Redis 客户端执行一条命令分为如下四个过程: 发送命令 命令排队 命令执行 返回结果 客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式,等待服务端响应.服务端处理命令,并将结果返回给客户端.客户端和服务端通过网络进行连接.这个连接可以很快,也可能很慢.无论网络如何延迟,数据包总是能从客户端到达服务端,服务端返回数据给客户端. 这个时间被称为 RTT (Round

  • 详解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作

    前段时间在做用户画像的时候,遇到了这样的一个问题,记录某一个商品的用户购买群,刚好这种需求就可以用到Redis中的Set,key作为productID,value就是具体的customerid集合,后续的话,我就可以通过productid来查看该customerid是否买了此商品,如果购买了,就可以有相关的关联推荐,当然这只是系统中的一个小业务条件,这时候我就可以用到SADD操作方法,代码如下: static void Main(string[] args) { ConnectionMultip

  • redis通过pipeline提升吞吐量的方法

    案例目标 简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用. 案例背景 应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元: 然而在某些服务中,数据操作对redis 是强依赖的,在最近的一次分析中发现: 一次数据推送会对 redis 产生近30次读写操作! 在数据推送业务中的性能压测中,以数据上报 -> 下发应答为一次事务:而对于这样的读写模型,redis 的操作过于频繁,很快便导致系统延时过高,吞吐量低下,无法满足目标:

  • redis批量操作pipeline管道操作方法

    目录 redis | pipeline(管道) 背景 什么是流水线(pipeline) 适用场景 benchmark压测pipeline 代码测试-python: StrictRedis 代码测试-java:Jedis使用pipeline pipeline注意事项 参考资料 redis | pipeline(管道) 背景 Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务.这意味着通常情况下一个请求会遵循以下步骤: 客户端向服务端发送一个查询请求,并监听 Socket 返回,通常是

  • Python获取Redis所有Key以及内容的方法

    一.获取所有Key # -*- encoding: UTF-8 -*- __author__ = "Sky" import redis pool=redis.ConnectionPool(host='127.0.0.1',port=6379,db=0) r = redis.StrictRedis(connection_pool=pool) keys = r.keys() print type(keys) print keys 运行结果: <type 'list'> ['fa

  • 通过Docker部署Redis 6.x集群的方法

    系统环境: Redis 版本:6.0.8 Docker 版本:19.03.12 系统版本:CoreOS 7.8 内核版本:5.8.5-1.el7.elrepo.x86_64 一.什么是 Redis 集群模式 在 Redis 3.0 版本后正式推出 Redis 集群模式,该模式是 Redis 的分布式的解决方案,是一个提供在多个 Redis 节点间共享数据的程序集,且 Redis 集群是去中心化的,它的每个 Master 节点都可以进行读写数据,每个节点都拥有平等的关系,每个节点都保持各自的数据和

  • golang redis中Pipeline通道的使用详解

    目录 一.pipeline出现的背景 二.pipeline的用法 pipeline命令的使用 goredis库连接客户端 package client import (     "github.com/go-redis/redis"     "github.com/sirupsen/logrus" ) var MainRDS *redis.Client func init() {     ConnectRedis() } func ConnectRedis() {

  • Redis删除策略的三种方法及逐出算法

    目录 一.前言 二.Redis中的数据特征 三.时效性数据储存结构 四.数据删除策略 1.定时删除 2.惰性删除 3.定期删除 五.删除策略对比 六.逐出算法 1.概念引入 2.八种配置 一.前言 在文章开始之前,我先问大家一个问题:当我们使用指令:expire key second给一个key设置过期时间,过期时间一到,这个key对应的过期数据真的被服务器立即删除了吗?答案是并不会立即删除.知道了这个答案,就来看看Redis中如何处理过期的数据. 二.Redis中的数据特征 Redis是一种内

  • redis实现加锁的几种方法示例详解

    前言 本文主要给大家介绍了关于redis实现加锁的几种方法,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 1. redis加锁分类 redis能用的的加锁命令分表是INCR.SETNX.SET 2. 第一种锁命令INCR 这种加锁的思路是, key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作进行加一. 然后其它用户在执行 INCR 操作进行加一时,如果返回的数大于 1 ,说明这个锁正在被使用当中. 1. 客户端A请求服务器获取key的值为1表示

  • PHP使用Redis替代文件存储Session的方法

    本文实例讲述了PHP使用Redis替代文件存储Session的方法.分享给大家供大家参考,具体如下: PHP默认使用文件存储session,如果并发量大,效率非常低.而Redis对高并发的支持非常好,所以,可以使用redis替代文件存储session. 这里,介绍下php的session_set_save_handler 函数的作用和使用方法.该函数定义用户级session保存函数(如打开.关闭.写入等). 原型如下: bool session_set_save_hanler(callback

  • php实现的redis缓存类定义与使用方法示例

    本文实例讲述了php实现的redis缓存类定义与使用方法.分享给大家供大家参考,具体如下: php+redis缓存类 <?php class redisCache { /** * $host : redis服务器ip * $port : redis服务器端口 * $lifetime : 缓存文件有效期,单位为秒 * $cacheid : 缓存文件路径,包含文件名 */ private $host; private $port; private $lifetime; private $cachei

随机推荐