Redis+Hbase+RocketMQ 实际使用问题案例讲解

目录
  • 需求
  • 分析及确定方案
  • 实现
  • 部分代码
  • 踩坑
  • 总结

需求

  • 将Hbase数据,解析后推送到RocketMQ。
  • redis使用list数据类型,存储了需要推送的数据的RowKey及表名。

简单画个流程图就是:

分析及确定方案

Redis

  • 明确list中元素结构{"rowkey":rowkey,"table":table}解析出rowkey;
  • 一次取多个元素加快效率;取了之后放入重试队列,并删除原来的元素;
  • 处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数并重新放回;
  • 明确从list中取值所使用的redis命令;
  • 范围获取LRANGE
  • 范围删除(留下指定范围的数据)LTRIM
  • 判断list长度LLEN
  • 加入listRPUSH;删除LREM等等;
  • 从Hbase获取数据失败和发送到mq失败都令重试次数加一;
  • 每次碰到重试次数不为0的数据都休眠1s;
  • 设置最大重试次数,达到限制后丢弃;
  • 考虑客户redis部署方式,单机、主从、集群、哨兵等;
  • 选择合适的客户端,Jedis、Redisson、Lettuce等;
  • 编写不同的操作代码,也可以利用配置文件、环境变量、工厂模式等适配各种部署模式;

Hbase

  • 基本理论知识学习(原来没接触过),rowkey是没条数据的主键,限定符是字段名,列族是多个限定名的集合等;
  • 当时看这个觉得不错https://juejin.cn/post/6844903797655863309因为是不停读取数据、链接、Table不用close,可以缓存起来,没必要每次都创建;
  • 确定批量获取数据方式为批量Get,没用scan
  • 了解解析方式,一些网上的解析试了之后会乱码,这边用的是它自带的CellUtil.clone相关方法;
  • 考虑所有都没数据时休眠10s;

RocketMQ

  • 有现成的发送代码,公司封装好的;
  • 调整发送的速度、太快了服务端会吃不消(获取Hbase数据速度太快了,最开始没限制一会儿就入了百万数据),设置超时时间(默认3s);
  • 调整服务端的内存、线程数等参数;

实现

配置

#server configuration
server.port=8896
#log config
logging.file.path=./logs
#redis-standalone
redis.standalone.host=
redis.standalone.port=6379
redis.standalone.password=
redis.standalone.enable=true
#redis-cluster
redis.cluster.nodes=
redis.cluster.password=
redis.cluster.timeout=30000
redis.cluster.enable=false
# Zookeeper 集群地址,逗号分隔
hbase.zookeeper.quorum=
# Zookeeper 端口
hbase.zookeeper.property.clientPort=2181
# 消息目的rocketmq地址
rocketmq.server.host=
# 发送消息间隔时间,防止发送过快mq受不了
rocketmq.send.interval.millisec=10
# 每次从redis读取数据量限制。
data.access.redisDataSize=100
# 失败数据重试次数,超过的直接丢弃
data.access.retryNum=10
# 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey
data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back

部分代码

获取配置,其余的直接@Value("${}")

@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {

    /**
     * key:topic; value:redis的key
     */
    private Map<String, String> topicKeyMap = new HashMap<>();

    /**
     * 一次从redis中读取数据量限制
     */
    private long redisDataSize = 50;

    /**
     * 失败数据重试次数
     */
    private int retryNum = 10;

}

开启接入:

@Component
public class AdapterRunner implements ApplicationRunner {

    @Resource
    private DataAccessService dataAccessService;

    @Override
    public void run(ApplicationArguments args) {
        System.out.println("项目已启动,开始接入数据到RocketMQ……");
        dataAccessService.accessData2Mq();
    }
}

其他代码其实也在分析里了。

踩坑

mq发送问题

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
	at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Wo

上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。

总结

程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等。

(0)

相关推荐

  • Redis 中ZSET数据类型命令使用及对应场景总结(案例详解)

    目录 1.zadd添加元素 2.zrem 从有序集合key中删除元素 3.zscore 返回有序集合key中元素member的分值 4.zincrby 为有序集合key中元素增加分值 5.zcard获取有序集合key中元素总个数 6.zrange 正序获取分值范围内的元素 7.zrevrange 倒序获取集合元素 8.zrank获取有序集合中元素key的排名 9.zrangebyscore 获取有序集合中分数区间的元素 10.zcount 获取分值区间的元素数量 1.zadd添加元素 zadd

  • 使用redis的increment()方法实现计数器功能案例

    一直知道redis可以用来实现计数器功能,但是之前没有实际使用过,昨天碰到一个需求:用户扫码当天达到20次即提示:当日扫码次数达到上限! 当时就想到使用redis的递增方法increment()来实现计数器功能,一定要注意redisTemplate和stringRedisTemplate的使用 首先设置key: 该key我使用了用户id和当天日期作为key的一部分,date:xxxx-xx-xx格式,这样一来该用户在第二天扫码的时候又是一个新key,因为日期不同了 设置key的过期时间: 实现计

  • redis redisson 集合的使用案例(RList、Rset、RMap)

    redis redisson 集合操作 相关类及接口 Rlist:链表 public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<List<V>>, RandomAccess { List<V> get(int... var1); //获取指定的节点值 int addAfter(V var1, V var2); //在var1前添加v

  • springBoot整合redis使用案例详解

    一.创建springboot项目(采用骨架方式) 创建完成: 我们分析下pom文件中内容: 所使用到的关键依赖: <!--springBoot集成redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.5.4<

  • Spring Cache使用RedisCache案例解析

    一.RedisCache使用演示 Redis是一个key-value存储系统,在web应用上被广泛应用,这里就不对其过多描述了. 本章节示例是在Spring Boot集成Spring Cache的源码基础上进行改造.源码地址:https://github.com/imyanger/springboot-project/tree/master/p20-springboot-cache 使用RedisCache作为缓存,我们先引入相关依赖. <dependency> <groupId>

  • Redis+Hbase+RocketMQ 实际使用问题案例讲解

    目录 需求 分析及确定方案 实现 部分代码 踩坑 总结 需求 将Hbase数据,解析后推送到RocketMQ. redis使用list数据类型,存储了需要推送的数据的RowKey及表名. 简单画个流程图就是: 分析及确定方案 Redis 明确list中元素结构{"rowkey":rowkey,"table":table}解析出rowkey: 一次取多个元素加快效率:取了之后放入重试队列,并删除原来的元素: 处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数

  • Redis之windows下主从复制案例讲解

    一般的主从复制功能最少是一主二从,我这里就以最低要求进行配置. 1.首先下去官网下载并安装redis 若安装成功点击redis-server  如此是成功 2.点击客户端redis-cli 连接客户端即可使用 3.新建7000.7001两个从redis  4.修改redis-windows.conf   (1)把端口修改成7000 (2)修改cluster-config-file的名字 以免和6379端口的名字重复其他配置默认即可,我个人认为我们都重新建了一个文件夹也不可能出现和6379重复的错

  • Java之操作Redis案例讲解

    首先 下载 jedis.jar包 然后再 工程设置里面找到Libraries,点击+.添加下载好的jedis.jar包.点击OK退出即可 创建Java_Control_Redis类 测试链接 package ccit.redis; import redis.clients.jedis.Jedis; public class Java_Control_Redis { public static void main(String[] args) { //连接本地的 Redis 服务 Jedis je

  • php之redis短线重连案例讲解

    php redis断线重连,pconnect连接失败问题 介绍 在swoole ,workerman等cli长连接模式下,遇到Redis异常断开,后面又开启的情况,一般得重新启动程序才能正常使用, 本文介绍在不重启服务,实现原来的Redis断线重连 原理 Redis 断开的情况下调用 $Redis->ping()会触发Notice错误,Notice: Redis::ping(): send of 14 bytes failed with errno=10054 当获取redis实例时,如果pin

  • Java之理解Redis回收算法LRU案例讲解

    如何通俗易懂的理解LRU算法? 1.LRU是什么? LRU全称Least Recently Used,也就是最近最少使用的意思,是一种内存管理算法,最早应用于Linux操作系统. LRU算法基于一种假设:长期不被使用的数据,在未来被用到的几率也不大.因此,当数据所占内存达到一定阈值时,我们要移除掉最近最少被使用的数据. LRU算法应用:可以在内存不够时,从哈希表移除一部分很少访问的用户. LRU是什么?按照英文的直接原义就是Least Recently Used,最近最久未使用法,它是按照一个非

  • php之使用docker运行workerman案例讲解

    介绍 在docker上部署gatawayWorker项目,项目中使用到mysql,redis 安装 mysql 拉取镜像 docker pull mysql:5.7 运行容器 docker run -d -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root --name m_mysql mysql:5.7 -d 后台运行 -p 端口映射,前面是宿主机端口,后面是需要映射的容器端口 -e 设置环境变量,MYSQL_ROOT_PASSWORD是mysql的root用户的初

  • 案例讲解WEB 漏洞-文件操作之文件下载读取

    目录 原理 漏洞危害 利用方式 系统文件 window Linux 常见脚本敏感文件参考 任意文件读取 任意文件下载 Google search 漏洞利用代码 漏洞挖掘 漏洞验证 漏洞防范 案例 pikuchu靶场-文件下载测试 小米路由器-文件读取真实测试-漏洞 RoarCTF2019-文件读取真题复现 百度杯2017二月-Zone真题复现 原理 产生:任意语言代码下载函数 文件下载(一些网站由于业务需求,往往需要提供文件查看或文件下载功能,但若对用户查看或下载的文件不做限制,则恶意用户就能够

  • redis在java中的使用(实例讲解)

    1.首先下载jar包放到你的工程中 2.练习 package com.jianyuan.redisTest; import java.util.Iterator; import java.util.List; import java.util.Set; import redis.clients.jedis.Jedis; public class RedisTest { public static void main(String[] args) { //连接本地的Redis服务 Jedis je

  • Redis解决库存超卖问题实例讲解

    商品和订单服务间使用MQ 商品服务的库存变化时,通过 MQ 通知订单服务库存变化. 原始的同步流程 查询商品信息 (调用商品服务) 计算总价(生成订单详情) 商品服务扣库存(调用商品服务) 订单入库( 生成订单) // 原始的MySQL同步流程 // 判断此代金券是否加入抢购 SeckillVouchers seckillVouchers = seckillVouchersMapper.selectVoucher(voucherId); AssertUtil.isTrue(seckillVouc

  • C语言异常处理机制案例讲解

    异常处理机制:setjmp()函数与longjmp()函数 C标准库提供两个特殊的函数:setjmp() 及 longjmp(),这两个函数是结构化异常的基础,正是利用这两个函数的特性来实现异常. 所以,异常的处理过程可以描述为这样: 首先设置一个跳转点(setjmp() 函数可以实现这一功能),然后在其后的代码中任意地方调用 longjmp() 跳转回这个跳转点上,以此来实现当发生异常时,转到处理异常的程序上,在其后的介绍中将介绍如何实现. setjmp() 为跳转返回保存现场并为异常提供处理

随机推荐