redis 集群批量操作实现

Redis集群是没法执行批量操作命令的,如mget,pipeline等。这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中。但是,Jedis客户端提供了计算key的slot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据。具体代码如下:

初始化    JedisCluster类

@Configuration
public class JedisClusterConfig {

    @Value("${spring.redis.cluster.nodes}")
    private String clusterNodes;

    @Value("${spring.redis.cache.commandTimeout}")
    private Integer commandTimeout;

    @Bean
    public JedisCluster getJedisCluster() {

        String[] serverArray = clusterNodes.split(",");
        Set<HostAndPort> nodes = new HashSet<>();
        for (String ipPort : serverArray) {
            String[] ipPortPair = ipPort.split(":");
            nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
        }
        return new JedisCluster(nodes, commandTimeout);
    }
}

工具类 JedisClusterUtil

@Component
public class JedisClusterUtil {

    @Autowired
    private JedisCluster jedisCluster;

    @Resource(name = "redisTemplate4Json")
    protected RedisTemplate<String, Object> redisTemplate;

    /**
     * ZSet批量查询
     * @param keys
     * @return
     */
    public List<Object> batchZRange(List<String> keys) {

        List<Object> resList = new ArrayList<>();
        if (keys == null || keys.size() == 0) {
            return resList;
        }

        if (keys.size() == 1) {
            BoundZSetOperations<String, Object> operations = redisTemplate.boundZSetOps(keys.get(0));
            Set<Object> set = operations.reverseRange(0, 0);
            resList.add(set.iterator().next());
            return resList;
        }

        Map<JedisPool, List<String>> jedisPoolMap = getJedisPool(keys);

        List<String> keyList;
        JedisPool currentJedisPool = null;
        Pipeline currentPipeline;
        List<Object> res = new ArrayList<>();
        Map<String, Object> resultMap = new HashMap<>();

        //执行
        for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) {
            Jedis jedis = null;
            try {
                currentJedisPool = entry.getKey();
                keyList = entry.getValue();
                //获取pipeline
                jedis = currentJedisPool.getResource();
                currentPipeline = jedis.pipelined();
                for (String key : keyList) {
                    currentPipeline.zrevrange(key, 0, 0);
                }
                //从pipeline中获取结果
                res = currentPipeline.syncAndReturnAll();
                currentPipeline.close();
                for (int i = 0; i < keyList.size(); i++) {
                    if (null == res.get(i)) {
                        resultMap.put(keyList.get(i), null);
                    } else {
                        Set<Object> set = (Set<Object>) res.get(i);
                        if (null == set || set.isEmpty()) {
                            resultMap.put(keyList.get(i), null);
                        } else {
                            byte[] byteStr = set.iterator().next().toString().getBytes();
                            Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr);
                            resultMap.put(keyList.get(i), obj);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                returnResource(jedis, currentJedisPool);
            }
        }
        resList = sortList(keys, resultMap);
        return resList;
    }

    /**
     * Value批量查询
     * @param keys
     * @return
     */
    public List<Object> batchGet(List<String> keys){
        List<Object> resList = new ArrayList<>();
        if (keys == null || keys.size() == 0) {
            return resList;
        }

        if (keys.size() == 1) {
            BoundValueOperations<String, Object> operations = redisTemplate.boundValueOps(keys.get(0));
            resList.add(operations.get());
            return resList;
        }

        Map<JedisPool, List<String>> jedisPoolMap = getJedisPool(keys);

        List<String> keyList;
        JedisPool currentJedisPool = null;
        Pipeline currentPipeline;
        List<Object> res = new ArrayList<>();
        Map<String, Object> resultMap = new HashMap<>();

        for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) {
            Jedis jedis = null;
            try {
                currentJedisPool = entry.getKey();
                keyList = entry.getValue();
                //获取pipeline
                jedis = currentJedisPool.getResource();
                currentPipeline = jedis.pipelined();
                for (String key : keyList) {
                    currentPipeline.get(key);
                }
                //从pipeline中获取结果
                res = currentPipeline.syncAndReturnAll();
                currentPipeline.close();
                for (int i = 0; i < keyList.size(); i++) {
                    if (null == res.get(i)) {
                        resultMap.put(keyList.get(i), null);
                    } else {
                        byte[] byteStr = keyList.get(i).toString().getBytes();
                        Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr);
                        resultMap.put(keyList.get(i), obj);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                returnResource(jedis, currentJedisPool);
            }
        }
        resList = sortList(keys, resultMap);
        return resList;
    }

    private Map<JedisPool, List<String>> getJedisPool(List<String> keys){
        //JedisCluster继承了BinaryJedisCluster
        //BinaryJedisCluster的JedisClusterConnectionHandler属性
        //里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache
        //从而获取slot和JedisPool直接的映射
        MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);
        JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");
        //保存地址+端口和命令的映射
        Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();

        JedisPool currentJedisPool = null;
        List<String> keyList;
        for (String key : keys) {
            //计算哈希槽
            int crc = JedisClusterCRC16.getSlot(key);
            //通过哈希槽获取节点的连接
            currentJedisPool = cache.getSlotPool(crc);

            //由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的
            //JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样
            //的,可以作为map的key
            if (jedisPoolMap.containsKey(currentJedisPool)) {
                jedisPoolMap.get(currentJedisPool).add(key);
            } else {
                keyList = new ArrayList<>();
                keyList.add(key);
                jedisPoolMap.put(currentJedisPool, keyList);
            }
        }
        return jedisPoolMap;
    }

    private List<Object> sortList(List<String> keys, Map<String, Object> params) {
        List<Object> resultList = new ArrayList<>();
        Iterator<String> it = keys.iterator();
        while (it.hasNext()) {
            String key = it.next();
            resultList.add(params.get(key));
        }
        return resultList;
    }

    /**
     * 释放jedis资源
     *
     * @param jedis
     */
    public void returnResource(Jedis jedis, JedisPool jedisPool) {
        if (jedis != null && jedisPool != null) {
            jedisPool.returnResource(jedis);
        }
    }

注意:一定要完成后释放 jedis 资源  不然会造成卡死现象

到此这篇关于redis 集群批量操作实现的文章就介绍到这了,更多相关redis 集群批量操作内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作

    前段时间在做用户画像的时候,遇到了这样的一个问题,记录某一个商品的用户购买群,刚好这种需求就可以用到Redis中的Set,key作为productID,value就是具体的customerid集合,后续的话,我就可以通过productid来查看该customerid是否买了此商品,如果购买了,就可以有相关的关联推荐,当然这只是系统中的一个小业务条件,这时候我就可以用到SADD操作方法,代码如下: static void Main(string[] args) { ConnectionMultip

  • 用python 批量操作redis数据库

    方法一:使用 pipeline 使用pipelining 发送命令时,redis server必须部分请求放到队列中(使用内存)执行完毕后一次性发送结果,在 pipeline 使用期间,将"独占"链接,无法进行非"管道"类型的其他操作,直至 pipeline 关闭:如果 pipeline 的指令集很多很庞大,为了不影响其他操作(redis 最大时间lua-time-limit默认是5s),可以使用其他新建新链接操作.批量操作如下: import redis r =

  • redis 集群批量操作实现

    Redis集群是没法执行批量操作命令的,如mget,pipeline等.这是因为redis将集群划分为16383个哈希槽,不同的key会划分到不同的槽中.但是,Jedis客户端提供了计算key的slot方法,已经slot和节点之间的映射关系,通过这两个数据,就可以计算出每个key所在的节点,然后使用pipeline获取数据.具体代码如下: 初始化    JedisCluster类 @Configuration public class JedisClusterConfig { @Value("$

  • 详解三分钟快速搭建分布式高可用的Redis集群

    这里的Redis集群指的是Redis Cluster,它是Redis在3.0版本正式推出的专用集群方案,有效地解决了Redis分布式方面的需求.当单机内存.并发.流量等遇到瓶颈的时候,可以采用这种Redis Cluster方案进行解决. 分区规则 Redis Cluster采用虚拟槽(slot)进行数据分区,即使用分散度良好的哈希函数把所有键映射到一个固定范围的整数集合里,这里的整数就是槽(slot).Redis Cluster槽的范围是0~16383,计算公式:slot=CRC16(key)

  • 比较几种Redis集群方案

    目录 一.概述 二.Redis高可用集群搭建 三.Redis集群节点间的通信机制 3.1.集中式 3.2.gossip 四.网络抖动 五.Redis集群选举原理分析 5.1.集群是否完整才能对外提供服务 5.2.Redis集群为什么至少需要三个master节点,并且推荐节点数为奇数? 5.3.哨兵leader选举流程 六.新增/删除节点 一.概述 在Redis3.0以前的集群一般是借助哨兵sentinel工具来监控主节点的状态,如果主节点异常,则会做主从切换,将某一台slave作为master.

  • Redis集群搭建全记录

    Redis集群是一个提供在多个Redis节点间共享数据的程序集. Redis集群中不支持处理多个keys的命令. Redis集群通过分区来提供一定程度的可用性.在某个节点宕机或者不可用的时候可以继续处理命令. Redis集群数据分片 在Redis集群中,使用数据分片(sharding)而不是一致性hash(consistency hashing)来实现,一个Redis集群包含16384个哈希槽(hash slot),数据库中的每个键都存在这些哈希槽中的某一个,通过CRC16校验后对16384取模

  • redis集群搭建教程及遇到的问题处理

    这里,在一个Linux虚拟机上搭建6个节点的redis伪集群,思路很简单,一台虚拟机上开启6个redis实例,每个redis实例有自己的端口.这样的话,相当于模拟出了6台机器了,然后在以这6个实例组建redis集群就可以了. 前提:redis已经安装,目录为/usr/local/redis-4.0.1 如不会,可以参考一下文章  windows下安装redis    Linux下安装redis redis集群是用的ruby脚本,所以要想执行该脚本,需要ruby环境..对应redis的源码src目

  • redis集群规范详解

    本文档翻译自 http://redis.io/topics/cluster-spec . 引言 这个文档是正在开发中的 Redis 集群功能的规范(specification)文档, 文档分为两个部分: 第一部分介绍目前已经在 unstable 分支中实现了的那些功能. 第二部分介绍目前仍未实现的那些功能. 文档各个部分的内容可能会随着集群功能的设计修改而发生改变, 其中, 未实现功能发生修改的几率比已实现功能发生修改的几率要高. 这个规范包含了编写客户端库(client library)所需的

  • 详解docker搭建redis集群的环境搭建

    本文介绍了docker搭建redis集群的环境搭建,分享给大家,废话不多说,具体如下: 下载镜像 docker pull redis 准备配置文件 mkdir /home/docker/redis/ wget https://raw.githubusercontent.com/antirez/redis/3.0/redis.conf -O /home/docker/redis/redis.conf cd /home/docker/redis/ sed -i 's/# slaveof <maste

  • Laravel框架实现redis集群的方法分析

    本文实例讲述了Laravel框架实现redis集群的方法.分享给大家供大家参考,具体如下: 在app/config/database.php中配置如下: 'redis' => array( 'cluster' => true, 'default' => array( 'host' => '172.21.107.247', 'port' => 6379, ), 'redis1' => array( 'host' => '172.21.107.248', 'port'

  • 详细分析Redis集群故障

    故障表象: 业务层面显示提示查询redis失败 集群组成: 3主3从,每个节点的数据有8GB 机器分布: 在同一个机架中, xx.x.xxx.199 xx.x.xxx.200 xx.x.xxx.201 redis-server进程状态: 通过命令ps -eo pid,lstart | grep $pid, 发现进程已经持续运行了3个月 发生故障前集群的节点状态: xx.x.xxx.200:8371(bedab2c537fe94f8c0363ac4ae97d56832316e65) master

  • redis集群搭建_动力节点Java学院整理

    现在项目上用redis的话,很少说不用集群的情况,毕竟如果生产上只有一台redis会有极大的风险,比如机器挂掉,或者内存爆掉,就比如我们生产环境曾今也遭遇到这种情况,导致redis内存不够挂掉的情况,当然这些都是我们及其不能容忍的,第一个必须要做到高可靠,其次才是高性能,好了,下面我来逐一搭建一下. 一:Redis集群搭建 1. 下载 首先去官网下载较新的3.2.0版本,下载方式还是非常简单的,比如官网介绍的这样. $ wget http://download.redis.io/releases

随机推荐