springboot中如何实现kafa指定offset消费

这篇文章主要介绍了springboot中如何实现kafa指定offset消费,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

kafka消费过程难免会遇到需要重新消费的场景,例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者的offset到之前某一时间的数值,然后重新进行消费。

首先创建kafka消费服务

@Service
@Slf4j
//实现CommandLineRunner接口,在springboot启动时自动运行其run方法。
public class TspLogbookAnalysisService implements CommandLineRunner {
 @Override
 public void run(String... args) {
  //do something
 }
}

kafka消费模型建立

kafka server中每个主题存在多个分区(partition),每个分区自己维护一个偏移量(offset),我们的目标是实现kafka consumer指定offset消费。

在这里使用consumer-->partition一对一的消费模型,每个consumer各自管理自己的partition。

@Service
@Slf4j
public class TspLogbookAnalysisService implements CommandLineRunner {
 //声明kafka分区数相等的消费线程数,一个分区对应一个消费线程
 private static final int consumeThreadNum = 9;
 //特殊指定每个分区开始消费的offset
 private List<Long> partitionOffsets = Lists.newArrayList(1111,1112,1113,1114,1115,1116,1117,1118,1119);

 private ExecutorService executorService = Executors.newFixedThreadPool(consumeThreadNum);

 @Override
 public void run(String... args) {
  //循环遍历创建消费线程
  IntStream.range(0, consumeThreadNum)
    .forEach(partitionIndex -> executorService.submit(() -> startConsume(partitionIndex)));
 }
}

kafka consumer对offset的处理

声明kafka consumer的配置类

private Properties buildKafkaConfig() {
 Properties kafkaConfiguration = new Properties();
 kafkaConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
 kafkaConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "");
 kafkaConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
 kafkaConfiguration.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");
 kafkaConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");
 kafkaConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");
 kafkaConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"");
 kafkaConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");
 ...更多配置项

 return kafkaConfiguration;
}

创建kafka consumer,处理offset,开始消费数据任务#

private void startConsume(int partitionIndex) {
 //创建kafka consumer
 KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(buildKafkaConfig());

 try {
  //指定该consumer对应的消费分区
  TopicPartition partition = new TopicPartition(kafkaProperties.getKafkaTopic(), partitionIndex);
  consumer.assign(Lists.newArrayList(partition));

  //consumer的offset处理
  if (collectionUtils.isNotEmpty(partitionOffsets) && partitionOffsets.size() == consumeThreadNum) {
   Long seekOffset = partitionOffsets.get(partitionIndex);
   log.info("partition:{} , offset seek from {}", partition, seekOffset);
   consumer.seek(partition, seekOffset);
  }

  //开始消费数据任务
  kafkaRecordConsume(consumer, partition);
 } catch (Exception e) {
  log.error("kafka consume error:{}", ExceptionUtils.getFullStackTrace(e));
 } finally {
  try {
   consumer.commitSync();
  } finally {
   consumer.close();
  }
 }
}

消费数据逻辑,offset操作

private void kafkaRecordConsume(KafkaConsumer<String, byte[]> consumer, TopicPartition partition) {
 while (true) {
  try {
   ConsumerRecords<String, byte[]> records = consumer.poll(TspLogbookConstants.POLL_TIMEOUT);
   //具体的处理流程
   records.forEach((k) -> handleKafkaInput(k.key(), k.value()));

   //🌿很重要:日志记录当前consumer的offset,partition相关信息(之后如需重新指定offset消费就从这里的日志中获取offset,partition信息)
   if (records.count() > 0) {
    String currentOffset = String.valueOf(consumer.position(partition));
    log.info("current records size is:{}, partition is: {}, offset is:{}", records.count(), consumer.assignment(), currentOffset);
   }

   //offset提交
   consumer.commitAsync();
  } catch (Exception e) {
   log.error("handlerKafkaInput error{}", ExceptionUtils.getFullStackTrace(e));
  }
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • div的offsetLeft与style.left区别

    clientX 事件属性返回当事件被触发时鼠标指针向对于浏览器页面(或客户区)的水平坐标. 客户区指的是当前窗口. 如果父div的position定义为relative,子div的position定义为absolute,那么子div的style.left的值是相对于父div的值,这同offsetLeft是相同的,区别在于: 1. style.left 返回的是字符串,如28px,offsetLeft返回的是数值28,如果需要对取得的值进行计算,还用offsetLeft比较方便. 2. style

  • JavaScript中offsetWidth的bug及解决方法

    offsetWidth表示对象的可见宽度. 比如: #div1 { width: 100px; height: 200px; background: red; } 结果:100 #div1 { width: 100px; height: 200px; background: red; border: 2px solid black; } 结果:104 (100 + 2 + 2) #div1 { width: 100px; height: 200px; background: red; borde

  • MySQL查询中LIMIT的大offset导致性能低下浅析

    前言 我们大家都知道,mysql查询使用select命令,配合limit,offset参数可以读取指定范围的记录,但是offset过大影响查询性能的原因及优化方法 我们在业务系统中难免少不了分页的需求.想到分页的时候,大家肯定会想到使用SQL中的LIMIT来实现.但是,如果不正确的使用LIMIT会导致性能问题(SQL执行得很慢.有可能会拖垮服务器),也会被领导批的:所以,我们来看看如何正确地使用LIMIT. 下面话不多说了,来一起看看详细的介绍吧 LIMIT OFFSET, ROW_COUNT

  • window.setInterval()方法的定义和用法及offsetLeft与style.left的区别

    定义和用法 setInterval() 方法可按照指定的周期(以毫秒计)来调用函数或计算表达式. setInterval() 方法会不停地调用函数,直到 clearInterval()被调用或窗口被关闭.由 setInterval() 返回的 ID 值可用作 clearInterval() 方法的参数. 提示: 1000 毫秒= 1 秒. 语法 setInterval(code,millisec,lang) 参数 描述 code 必需.要调用的函数或要执行的代码串. millisec 必须.周期

  • 详解原生js实现offset方法

    在为 jTool 提供 offset (获取当前节点位置)方法时, 先后使用了两种方式进行实现, 现整理出来以作记录. 前后共使用了两种方式实现了该方法, 这里将这两种方法分别列出. 通过递归实现 function offset(element) { var offest = { top: 0, left: 0 }; var _position; getOffset(element, true); return offest; // 递归获取 offset, 可以考虑使用 getBounding

  • js中offset,client , scroll 三大元素知识点总结

    js 元素offset,client , scroll 三大系列总结 1,element.offsetWidth : 包括 padding 和 边框 2,element.clientWidth : 包括 padding ,不包含边框 , 内容超出会溢出盒子的时候,就用scrollWidth 3,element.scrollWidth : 不包含边框 主要用法: 1,offset 系列 经常用于获得元素位置 offsetLeft offsetTop 2,client经常用于获取元素大小, clie

  • JavaScript中style.left与offsetLeft的使用及区别详解

    如果父div的position定义为relative,子div的position定义为absolute,那么子div的style.left的值是相对于父div的值, 这同offsetLeft是相同的,区别在于: 1. style.left 返回的是字符串,如28px,offsetLeft返回的是数值28,如果需要对取得的值进行计算,还用offsetLeft比较方便. 2. style.left是读写的,offsetLeft是只读的,所以要改变div的位置,只能修改style.left. 3. s

  • JS中offset和匀速动画详解

    offset简介 我们知道,三大家族包括:offset/scroll/client.今天来讲一下offset,以及与其相关的匀速动画. offset的中文是:偏移,补偿,位移. js中有一套方便的获取元素尺寸的办法就是offset家族.offset家族包括: offsetWidth offsetHight offsetLeft offsetTop offsetParent 下面分别介绍. 1.offsetWidth 和 offsetHight 用于检测盒子自身的宽高+padding+border

  • js获取元素的偏移量offset简单方法(必看)

    前言:以前一直是看别人写的,然后学习点东西,现在也把自己的学习记录下来,给大家一个学习的机会,欢迎大家多多评论和推荐哈,共同进步.竟然还有六个人关注我了 ,哈哈 开心.我会继续写下去的.. null和undefined都代表没有,但是null是属性存在值不存在,undefined是连这个属性都不存在 //例如 document.parentNode//浏览器天生自带的一个属性:父亲节点的属性 null (因为一个页面中的document已经是最顶级元素了,它没有父亲) document.pare

  • springboot中如何实现kafa指定offset消费

    这篇文章主要介绍了springboot中如何实现kafa指定offset消费,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 kafka消费过程难免会遇到需要重新消费的场景,例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者的offset到之前某一时间的数值,然后重新进行消费. 首先创建kafka消费服务 @S

  • SpringBoot中使用Redis Stream实现消息监听示例

    目录 Demo环境 仓库地址 POM依赖 配置监听消息类 监听俩个stream的实现 [问题补充]确认完消息删除消息 [问题补充]自动初始化stream的key和group问题-最新更新-2021年12月4日 Demo环境 JDK8 Maven3.6.3 springboot2.4.3 redis_version:6.2.1 仓库地址 https://gitee.com/hlovez/redismq.git. POM依赖 <?xml version="1.0" encoding=

  • springboot中的springSession的存储和获取实现

    利用redis进行springSession的存储: 存储: // 在session中保存用户信息 HttpSession session = httpRequest.getSession(true); session.setAttribute(Constants.SESSION_KEY_USER + userToken, user); // 存储sessionId redisService.hmSet(Constants.SESSION_ID_KEY , userToken, session.

  • SpringBoot中使用 RabbitMQ的教程详解

    本章主要建立在已经安装好Erlang以及RabbitMQ的基础上,接下来,简单介绍一下使用 一.Direct直接模式 通过routingKey和exchange决定的那个唯一的queue可以接收消息 1.首先到RabbitMQ的管理界面新建一个队列(Direct模式) 2.测试项目的基础结构如下: 这里为了方便测试,直接在父项目中建立两个子模块(生产者和消费者) 3.pom.xml文件的依赖如下: 父项目: <?xml version="1.0" encoding="U

  • springboot中rabbitmq实现消息可靠性机制详解

    1. 生产者模块通过publisher confirm机制实现消息可靠性 1.1 生产者模块导入rabbitmq相关依赖 <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-

  • SpringBoot中RabbitMQ集群的搭建详解

    目录 1. 两种模式 1.1 普通集群 1.2 镜像集群 1.3 节点类型 2. 搭建普通集群 2.1 预备知识 2.2 开始搭建 2.3 代码测试 2.4 反向测试 3. 搭建镜像集群 3.1 网页配置镜像队列 3.2 命令行配置镜像队列 4. 小结 单个的 RabbitMQ 肯定无法实现高可用,要想高可用,还得上集群. 今天松哥就来和大家聊一聊 RabbitMQ 集群的搭建. 1. 两种模式 说到集群,小伙伴们可能第一个问题是,如果我有一个 RabbitMQ 集群,那么是不是我的消息集群中的

  • jQuery实现将div中滚动条滚动到指定位置的方法

    本文实例讲述了jQuery实现将div中滚动条滚动到指定位置的方法.分享给大家供大家参考,具体如下: 一.Js代码: onload = function () { //初始化 scrollToLocation(); }; function scrollToLocation() { var mainContainer = $('#thisMainPanel'), scrollToContainer = mainContainer.find('.son-panel:last');//滚动到<div

  • SpringBoot中自定义注解实现控制器访问次数限制实例

    今天给大家介绍一下SpringBoot中如何自定义注解实现控制器访问次数限制. 在Web中最经常发生的就是利用恶性URL访问刷爆服务器之类的攻击,今天我就给大家介绍一下如何利用自定义注解实现这类攻击的防御操作. 其实这类问题一般的解决思路就是:在控制器中加入自定义注解实现访问次数限制的功能. 具体的实现过程看下面的例子: 步骤一:先定义一个注解类,下面看代码事例: package example.controller.limit; import org.springframework.core.

  • IOS中UITableView滚动到指定位置

    方法很简单: - (void)scrollToRowAtIndexPath:(NSIndexPath *)indexPath atScrollPosition:(UITableViewScrollPosition)scrollPosition animated:(BOOL)animated 有些需要注意的地方: 如果在reloadData后需要立即获取tableview的cell.高度,或者需要滚动tableview,那么,直接在reloadData后执行代码是有可能出问题的. reloadDa

  • SpringBoot中的Thymeleaf用法

    Thymeleaf Thymeleaf是最近SpringBoot推荐支持的模板框架,官网在thymeleaf.org这里. 我们为什么要用Thymeleaf来作为模板引擎呢?官网给了我们一个非常令人信服的解释: Thymeleaf is a modern server-side Java template engine for both web and standalone environments.> 基本写法就像下面这样: <table> <thead> <tr&g

随机推荐