在Redis集群中使用pipeline批量插入的实现方法

由于项目中需要使用批量插入功能, 所以在网上查找到了Redis 批量插入可以使用pipeline来高效的插入, 示例代码如下:

String key = "key";
Jedis jedis = new Jedis("xx.xx.xx.xx");
Pipeline p = jedis.pipelined();
List<String> myData = .... //要插入的数据列表
for(String data: myData){
  p.hset(key, data);
}
p.sync();
jedis.close();

但实际上遇到的问题是,项目上所用到的Redis是集群,初始化的时候使用的类是JedisCluster而不是Jedis. 去查了JedisCluster的文档, 并没有发现提供有像Jedis一样的获取Pipeline对象的 pipelined()方法.

Google了一下, 发现了解决方案.

Redis集群规范有说: Redis 集群的键空间被分割为 16384 个槽(slot), 集群的最大节点数量也是 16384 个。每个主节点都负责处理 16384 个哈希槽的其中一部分。当我们说一个集群处于“稳定”(stable)状态时, 指的是集群没有在执行重配置(reconfiguration)操作, 每个哈希槽都只由一个节点进行处理。

所以我们可以根据要插入的key知道这个key所对应的槽的号码, 再通过这个槽的号码从集群中找到对应Jedis. 具体实现如下

//初始化得到了jedis cluster, 如何获取HostAndPort集合代码就不写了

Set<HostAndPort> nodes = .....

JedisCluster jedisCluster = new JedisCluster(nodes);

Map<String, JedisPool> nodeMap = jedisCluster.getClusterNodes();

String anyHost = nodeMap.keySet().iterator().next();

//getSlotHostMap方法在下面有

TreeMap<Long, String> slotHostMap = getSlotHostMap(anyHost);
  private static TreeMap<Long, String> getSlotHostMap(String anyHostAndPortStr) {
    TreeMap<Long, String> tree = new TreeMap<Long, String>();
    String parts[] = anyHostAndPortStr.split(":");
    HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1]));
    try{
      Jedis jedis = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort());
      List<Object> list = jedis.clusterSlots();
      for (Object object : list) {
        List<Object> list1 = (List<Object>) object;
        List<Object> master = (List<Object>) list1.get(2);
        String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
        tree.put((Long) list1.get(0), hostAndPort);
        tree.put((Long) list1.get(1), hostAndPort);
      }
      jedis.close();
    }catch(Exception e){

    }
    return tree;
  }

上面这几步可以在初始化的时候就完成. 不需要每次都调用, 把nodeMap和slotHostMap都定义为静态变量.

//获取槽号

int slot = JedisClusterCRC16.getSlot(key); 

//获取到对应的Jedis对象

Map.Entry<Long, String> entry = slotHostMap.lowerEntry(Long.valueOf(slot));

Jedis jedis = nodeMap.get(entry.getValue()).getResource();

建议上面这步操作可以封装成一个静态方法, 比如命名为public static Jedis getJedisByKey(String key) 之类的. 意思就是在集群中, 通过key获取到这个key所对应的Jedis对象.

这样再通过上面的jedis.pipelined();来就可以进行批量插入了.

注:这个方法是从Google上搜来的, 直到目前我使用起来还没发现什么问题. 如果哪位大神发现有什么不对的地方欢迎提出来.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解Java使用Pipeline对Redis批量读写(hmset&hgetall)

    一般情况下,Redis Client端发出一个请求后,通常会阻塞并等待Redis服务端处理,Redis服务端处理完后请求命令后会将结果通过响应报文返回给Client. 感觉这有点类似于HBase的Scan,通常是Client端获取每一条记录都是一次RPC调用服务端. 在Redis中,有没有类似HBase Scanner Caching的东西呢,一次请求,返回多条记录呢? 有,这就是Pipline.官方介绍 http://redis.io/topics/pipelining 通过pipeline方

  • redis通过pipeline提升吞吐量的方法

    案例目标 简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用. 案例背景 应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元: 然而在某些服务中,数据操作对redis 是强依赖的,在最近的一次分析中发现: 一次数据推送会对 redis 产生近30次读写操作! 在数据推送业务中的性能压测中,以数据上报 -> 下发应答为一次事务:而对于这样的读写模型,redis 的操作过于频繁,很快便导致系统延时过高,吞吐量低下,无法满足目标:

  • .NET客户端实现Redis中的管道(PipeLine)与事物(Transactions)

    序言 Redis中的管道(PipeLine)特性:简述一下就是,Redis如何从客户端一次发送多个命令,服务端到客户端如何一次性响应多个命令. Redis使用的是客户端-服务器模型和请求/响应协议的TCP服务器,这就意味着一个请求要有以下步骤才能完成:1.客户端向服务器发送查询命令,然后通常以阻塞的方式等待服务器相应.2.服务器处理查询命令,并将相应发送回客户端.这样便会通过网络连接,如果是本地回环接口那么就能特别迅速的响应,但是如果走外网,甚至外网再做一系列的层层转发,那就显的格外蛋疼.无论网

  • python使用pipeline批量读写redis的方法

    用了很久的redis了.随着业务的要求越来越高.对redis的读写速度要求也越来越高.正好最近有个需求(需要在秒级取值1000+的数据),如果对于传统的单词取值,循环取值,消耗实在是大,有小伙伴可能考虑到多线程,但这并不是最好的解决方案,这里考虑到了redis特有的功能pipeline管道功能. 下面就更大家演示一下pipeline在python环境下的使用情况. 1.插入数据 >>> import redis >>> conn = redis.Redis(host='

  • 详解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作

    前段时间在做用户画像的时候,遇到了这样的一个问题,记录某一个商品的用户购买群,刚好这种需求就可以用到Redis中的Set,key作为productID,value就是具体的customerid集合,后续的话,我就可以通过productid来查看该customerid是否买了此商品,如果购买了,就可以有相关的关联推荐,当然这只是系统中的一个小业务条件,这时候我就可以用到SADD操作方法,代码如下: static void Main(string[] args) { ConnectionMultip

  • 在Redis集群中使用pipeline批量插入的实现方法

    由于项目中需要使用批量插入功能, 所以在网上查找到了Redis 批量插入可以使用pipeline来高效的插入, 示例代码如下: String key = "key"; Jedis jedis = new Jedis("xx.xx.xx.xx"); Pipeline p = jedis.pipelined(); List<String> myData = .... //要插入的数据列表 for(String data: myData){ p.hset(ke

  • redis 集群批量操作实现

    Redis集群是没法执行批量操作命令的,如mget,pipeline等.这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中.但是,Jedis客户端提供了计算key的slot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据.具体代码如下: 初始化    JedisCluster类 @Configuration public class JedisClusterConfig { @Value("$

  • redis集群实现清理前缀相同的key

    目录 redis集群清理前缀相同的key 原来的定期清理脚本的逻辑 redis集群(jedis)批量删除同一前缀 redis集群清理前缀相同的key 最近经常收到redis集群告警,每天收到50多封邮件,实在不胜其烦,内存不够用,原因是有一些无用的key(约3000万)占用内存(具体不说了).这部分内存不能被释放. 原来的定期清理脚本的逻辑 打开一个redis链接,在内部循环从1000万到7亿之间的数据,然后加上前缀去批量删除,这种方式属于广撒网式的清理,穷举法,不但耗时,效果也不好. 因为有的

  • Redis集群搭建全记录

    Redis集群是一个提供在多个Redis节点间共享数据的程序集. Redis集群中不支持处理多个keys的命令. Redis集群通过分区来提供一定程度的可用性.在某个节点宕机或者不可用的时候可以继续处理命令. Redis集群数据分片 在Redis集群中,使用数据分片(sharding)而不是一致性hash(consistency hashing)来实现,一个Redis集群包含16384个哈希槽(hash slot),数据库中的每个键都存在这些哈希槽中的某一个,通过CRC16校验后对16384取模

  • redis集群搭建教程及遇到的问题处理

    这里,在一个Linux虚拟机上搭建6个节点的redis伪集群,思路很简单,一台虚拟机上开启6个redis实例,每个redis实例有自己的端口.这样的话,相当于模拟出了6台机器了,然后在以这6个实例组建redis集群就可以了. 前提:redis已经安装,目录为/usr/local/redis-4.0.1 如不会,可以参考一下文章  windows下安装redis    Linux下安装redis redis集群是用的ruby脚本,所以要想执行该脚本,需要ruby环境..对应redis的源码src目

  • redis集群规范详解

    本文档翻译自 http://redis.io/topics/cluster-spec . 引言 这个文档是正在开发中的 Redis 集群功能的规范(specification)文档, 文档分为两个部分: 第一部分介绍目前已经在 unstable 分支中实现了的那些功能. 第二部分介绍目前仍未实现的那些功能. 文档各个部分的内容可能会随着集群功能的设计修改而发生改变, 其中, 未实现功能发生修改的几率比已实现功能发生修改的几率要高. 这个规范包含了编写客户端库(client library)所需的

  • Redis集群与SSM整合使用方法

    首先是创建redis-cluster文件夹: 因为redis最少需要6个节点(三主三从),为了更好的理解,我这里创建了两台虚拟机(192.168.0.109 192.168.0.110),分别在两台虚拟机的/opt/redis-4.0.1/redis-cluster下创建三个节点文件夹 192.168.0.109: 192.168.0.110: 以上6个节点全部创建完成,分别再在这六个文件夹下创建redis.conf配置文件,其中配置如图: port 7000 bind 192.168.0.10

  • Redis集群的相关详解

    注意!要求使用的都是redis3.0以上的版本,因为3.0以上增加了redis集群的功能. 1.redis介绍 1.1什么是redis Redis是用C语言开发的一个开源的高性能键值对(key-value)的非关系型数据库.通过多种键值数据类型来适应不同场景下的存储需求,目前支持的键值数据类型有: 字符串,散列,列表,集合,有序集合 2.2应用场景 缓存(数据查询.短连接.新闻内容.商品内容等等).(最多使用) 分布式集群架构中的session分离. 聊天室的在线好友列表. 任务队列.(秒杀.抢

  • 详解如何清理redis集群的所有数据

    1. 背景:生产测试后redis中产生大量数据 生产前需要清理reids集群中的数据.. 你看有很多key呢: 使用工具 使用命令,查看是否有数据: keys * 2. 清理步骤 2.1 任意登录一台redis机器 执行下面脚本: clear_redis_cluster.sh 10.1.33.101:8001 redis 执行日志如下: Clearing 10.1.33.112:8028 ... Background append only file rewriting started READ

  • 在K8s上部署Redis集群的方法步骤

    一.前言 架构原理:每个Master都可以拥有多个Slave.当Master下线后,Redis集群会从多个Slave中选举出一个新的Master作为替代,而旧Master重新上线后变成新Master的Slave. 二.准备操作 本次部署主要基于该项目:https://github.com/zuxqoj/kubernetes-redis-cluster 其包含了两种部署Redis集群的方式: StatefulSet Service&Deployment 两种方式各有优劣,对于像Redis.Mong

随机推荐