流式图表拒绝增删改查之kafka核心消费逻辑下篇

目录
  • 前篇回顾
  • kafka消费者线程
  • 任务提交

前篇回顾

  • 流式图表框架搭建
  • kafka核心消费逻辑线程池搭建

kafka消费者线程

突击检查八股文,实现线程的方法有哪些?嗯?没复习是吧,行没关系,那感谢参加本次面试哈。

常用的几种方式分别是:

  • 继承Thread类,重写run方法
  • 实现Runbale接口,重写run方法
  • 实现Callable接口,重写call方法

这里我们直接创捷出一个任务类实现Runable方法,重写run方法,一个线程当作一个kafka client,所以要在任务类中声明一个KafkaConsumer的成员变量,另外创建任务需要指定当前任务的名称也就是线程名,还有要监听的topic主题。

private KafkaConsumer<String, String> consumer;
private String topic;
private String threadName;

name和topic通过构造方法传进来,同时在构造方法里完成对client的初始化操作。

/**
    * 封装必要信息
    * @param bootServer 生产者ip
    * @param groupId 分组信息
    * @param topic  订阅主题
    */
   public KafkaConsumerRunnable(String bootServer, String groupId, String topic) {
       this.topic = topic;
       Properties props = new Properties();
       props.put("bootstrap.servers", bootServer);
       props.put("group.id", groupId);
       props.put("enable.auto.commit", "false");
       props.put("auto.offset.reset", "latest");
       props.put("max.poll.records", 5);
       props.put("session.timeout.ms", "60000");
       props.put("max.poll.interval.ms", 300000);
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //键反序列化方式
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       this.consumer = new KafkaConsumer&lt;&gt;(props);
   }

这里封装kafka client的必要信息,入参bootServer为kafka集群ip,groupId为threadName,我们规定一个线程为一个kafka消费链接,消费一个topic。

上一篇线程池保证了任务不会轻易挂掉,就算挂掉了也会重新提交,所以为了节省资源不做所谓的同groupId的负载操作。session.timeout.ms和max.poll.interval.ms可以根据当前的kafka资源灵活配置,不然可能会引发一些reblance。

enable.auto.commit设置为false,手动提交offset,auto.offset.reset这块由于业务特殊,本来就是流式图表瞬时的展示,如果真的出现了数据丢失那就丢了吧,从最新的数据读取。

接下来只需要处理下消费逻辑,consumer.subscribe(Collections.singletonList(this.topic))开始订阅监听kafka数据,搞一个while true不断的消费数据,try catch只需要对WakeupException做处理,kafka客户端会在关闭的时候抛出WakeupException异常。

finally里提交offset,无论这条offset对应的数据消费成功还是失败都是消费过了,失败了就过去了。

   @Override
   public void run() {
   consumer.subscribe(Collections.singletonList(this.topic));
   String key = "stream_chart:" + this.name;
   Thread.currentThread().setName(key);
   try {
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
         // 如果队列中没有消息 等待KAFKA_TIME_OUT后调用poll,如果有消息立即消费
         for (ConsumerRecord<String, String> record : records) {
            String value = record.value();
            log.info("线程 {} 消费kafka数据 -> {} \n", Thread.currentThread().getName(), value);
            RedisConfig.getRedisTemplate().opsForZSet().add(key, value, Instant.now().getEpochSecond() * 1000);
         }
      }
   } catch (WakeupException e) {
      log.info("ignore for shutdown", e);
   } finally {
      consumer.commitAsync();
   }
}

我们消费到数据直接放到redis的zset结构里,当前的时间戳作为score,最后留一个关闭客户端的后门

// 退出后关掉客户端
public void shutDown() {
   consumer.wakeup();
}

任务提交

任务提交这块只需要在业务service中注入线程池,创建对应的KafkaRunable任务封装对应的信息,执行execute即可。

这里有个坑需要注意下,第二次突击检查八股文,线程池提交方法submitexecute的区别说一下。不知道的立刻去熟读并背诵。

public class TestTheadPool {
    public static void main(String[] args) {
        ExecutorService executorService= Executors.newFixedThreadPool(1);
        executorService.submit(new task("submit"));
        executorService.execute(new task("execute"));
    }
}
class task implements  Runnable{
    private String name;
    public task(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println(this.name + " start task");
        int i=1/0;
    }
}

熟悉的同学通过示例代码可以看出来,submit提交的线程不会抛出异常代码,只有获取Future返回值并执行get方法才会捕获到异常。这块涉及到异步的东西不再赘述

try {
    Future<?> submit = executorService.submit(new task("submit"));
    submit.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

所以我们要使用execute执行,不然kafka消费线程里消费失败了拦截不到就不会被重新提交,导致线程挂掉。

以上就是流式图表拒绝增删改查之kafka核心消费逻辑下篇的详细内容,更多关于kafka消费流式图表的资料请关注我们其它相关文章!

(0)

相关推荐

  • Spring Boot整合Kafka教程详解

    目录 正文 步骤一:添加依赖项 步骤二:配置 Kafka 步骤三:创建一个生产者 步骤四:创建一个消费者 正文 本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka.Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量. 在本教程中,我们将使用 Spring Boot 2.5.4 和 Kafka 2.8.0. 步骤一:添加依赖项 在 pom.xml 中添加以下依赖项: <dependency> <groupId>org.springfra

  • 一文详解kafka序列化器和拦截器

    目录 介绍 序列化器 设置序列化和反序列化 自定义序列化 自定义反序列化 思考 拦截器 总结 介绍 本篇主要介绍kafka的拦截器和序列化器,序列化器是和数据在网络中的传输有关,数据在网络中的传输为字节流,所以生产者在发送时需要将其序列化为字节流,消费者收到消息时,需要将字节流反序列化为我们能够识别的对象,我们不难看出,这就是RPC通信,kafka中实现了很多自定义协议,我们知道,在RPC通信中,只有生产者和消费者的协议一样,才能相互传输和解析数据,在使用HTTP时,我们就不用去关注协议本身,因

  • 流式图表拒绝增删改查之框架搭建过程

    目录 前言 技术方案 数据库设计 基础模块 整体流程 最终效果 前言 作为一名练习时长两年半的夹娃工程师,常年浸泡在增删改查的业务代码里,每当金三银四来临该迭代自己简历的时候,面对自己的项目经历都十分窘迫.突然有天学弟问我在实习公司一直做缝缝补补的工作或者是一些基于封装好的RBAC(基于角色的权限管理系统)做业务的增删改查,眼下又要找工作了不知道咋写项目经验. 无论是功能.技术栈.设计都平淡无奇,问我咋整,我打开尘封已久的简历一看,操,我不也一样吗. 当时快毕业那会也是有点焦虑,难的项目看不懂简

  • 流式图表拒绝增删改查之kafka核心消费逻辑下篇

    目录 前篇回顾 kafka消费者线程 任务提交 前篇回顾 流式图表框架搭建 kafka核心消费逻辑线程池搭建 kafka消费者线程 突击检查八股文,实现线程的方法有哪些?嗯?没复习是吧,行没关系,那感谢参加本次面试哈. 常用的几种方式分别是: 继承Thread类,重写run方法 实现Runbale接口,重写run方法 实现Callable接口,重写call方法 这里我们直接创捷出一个任务类实现Runable方法,重写run方法,一个线程当作一个kafka client,所以要在任务类中声明一个K

  • MongoDB.NET 2.2.4驱动版本对Mongodb3.3数据库中GridFS增删改查

    本文实例为大家分享了针对Mongodb3.3数据库中GridFS增删改查,供大家参考,具体内容如下 Program.cs代码如下: internal class Program { private static void Main(string[] args) { GridFSHelper helper = new GridFSHelper("mongodb://localhost", "GridFSDemo", "Pictures"); #re

  • PHP操作MongoDB实现增删改查功能【附php7操作MongoDB方法】

    本文实例讲述了PHP操作MongoDB实现增删改查功能.分享给大家供大家参考,具体如下: MongoDB的PHP驱动提供了一些核心类来操作MongoDB,总的来说MongoDB命令行中有的功能,它都可以实现,而且参数的格式基本相似.PHP7以前的版本和PHP7之后的版本对MongoDB的操作有所不同,本文主要以PHP7以前版本为例讲解PHP对MongoDB的各种操作,最后再简单说明一下PHP7以后版本对MongoDB的操作. 一.数据插入 //insert() //参数1:一个数组或对象 //参

  • Java描述数据结构学习之链表的增删改查详解

    前言 链表是一种常见的基础数据结构,它是一种线性表,但在内存中它并不是顺序存储的,它是以链式进行存储的,每一个节点里存放的是下一个节点的"指针".在Java中的数据分为引用数据类型和基础数据类型,在Java中不存在指针的概念,但是对于链表而言的指针,指的就是引用数据类型的地址. 链表和数组都是线性的数据结构,对于数组而言其长度是固定的,由于在内存中其是连续的,因此更适合做查找与遍历,而链表在内存中是并不是顺序存储的,但是由于其是通过"指针"构成的,因此在插入.删除时

  • Android利用SAX对XML进行增删改查操作详解

    前言 解析XML的方式有很多种,大家比较熟悉的可能就是DOM解析. DOM(文件对象模型)解析:解析器读入整个文档,然后构建一个驻留内存的树结构,然后代码就可以根据DOM接口来操作这个树结构了. 优点:整个文档读入内存,方便操作:支持修改.删除和重现排列等多种功能. 缺点:将整个文档读入内存中,保留了过多的不需要的节点,浪费内存和空间. 使用场合:一旦读入文档,还需要多次对文档进行操作,并且在硬件资源充足的情况下(内存,CPU). 为了解决DOM解析存在的问题,就出现了SAX解析.其特点为: 优

  • JdbcTemplate方法介绍与增删改查操作实现

    JdbcTemplate介绍 为了使 JDBC 更加易于使用,Spring 在 JDBCAPI 上定义了一个抽象层, 以此建立一个JDBC存取框架,Spring Boot Spring Data-JPA. 作为 SpringJDBC 框架的核心, JDBC 模板的设计目的是为不同类型的JDBC操作提供模板方法. 每个模板方法都能控制整个过程,并允许覆盖过程中的特定任务. 通过这种方式,可以在尽可能保留灵活性的情况下,将数据库存取的工作量降到最低. JdbcTemplate方法介绍 JdbcTem

  • node.js中 mysql 增删改查操作及async,await处理实例分析

    本文实例讲述了node.js中 mysql 增删改查操作及async,await处理.分享给大家供大家参考,具体如下: 要对mysql进行操作,我们需要安装一个mysql的库. 一.安装mysql库 npm install mysql --save 二.对mysql进行简单查询操作 const mysql = require('mysql'); //创建数据库连接 let conn = mysql.createConnection({ //主机地址 host: '127.0.0.1', //用户

  • MySQL 详细单表增删改查crud语句

    MySQL 增删改查语句 1.创建练习表 这里练习表没有满足三范式 第一范式(又称 1NF):保证每列的原子性 数据表中的每一列(字段),必须是不可拆分的最小单元,也就是确保每一列的原子性.满足第一范式是关系模式规范化的最低要求,否则,将有很多基本操作在这样的关系模式中实现不了. 第二范式(又称 2NF):保证一张表只描述一件事情 满足1NF后要求表中的所有列,每一行的数据只能与其中一列相关,即一行数据只做一件事.只要数据列中出现数据重复,就要把表拆分开来. 第三范式(又称 3NF):保证每列都

  • mybatis3使用@Select等注解实现增删改查操作

    1.需要的jar包 2.目录树 3.具体代码 一.需要的jar包 第一个:mybatis的jar包 第二个:mysql数据的驱动 二.目录树 三.具体代码 使用框架,配置文件先行! conf.xml:(配置 登录数据库,映射文件) <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN

随机推荐