消息队列-kafka消费异常问题

目录
  • 概述
  • 重试一定次数(消息丢失)
  • 加入到死讯队列(消息不丢失)
  • 总结

概述

在kafka中,或者是说在任何消息队列中都有个消费顺序的问题。为了保证一个队列顺序消费,当当中一个消息消费异常时,必将影响后续队列消息的消费,这样业务岂不是卡住了。比如笔者举个最简单的例子:我发送1-100的消息,在我的处理逻辑当中 msg%5==0我就进行 int i=1/0操作,这必将抛异常,一直阻塞在msg=5上,后面6-100无法消费。下面笔者给出解决方案。

重试一定次数(消息丢失)

@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test6(String msg){
              businessProcess(msg);
            }
           private void businessProcess(String msg){
        System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
       if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }

说明:如果读者使用的是java客户端,也就是spring进行实现,那么在不做任何处理的情况下,会自动重试10次,然后消息会被直接处理掉。也就是说如果你的业务允许消息丢失,那么你不需要额外的编码处理

加入到死讯队列(消息不丢失)

消费端代码:

//1.启用手动提交offset
//2.配置errorHandler,用来加入到死讯队列
//3.不管业务处理是否处理异常还是正常都提交offset
@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2",
            errorHandler ="kafkaListenerErrorHandler", concurrency = "1")
    public void test6(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
        }finally {
            //手动提交
            ack.acknowledge();
        }
    }
//1.专门处理死讯队列消息,都是topicName+.DLT的主题
//2.死讯队列里,只有消费成功的才提交offset,否则等待bug修复完上线,继续处理
    @KafkaHandler
    @KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test7(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
//业务代码
    private void businessProcess(String msg){
        System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
        if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }

异常处理器

//1.向容器注册一个KafkaListenerErrorHandler类型的bean
//2.该bean就是当处理消息异常的时候,将消息加入到.DLT主题中
@Component("kafkaListenerErrorHandler")
public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {
   @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    private static final String TOPIC_DLT=".DLT";
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        System.out.println("消费失败消息:"+message.toString());
        //获取消息处理异常主题
        MessageHeaders headers = message.getHeaders();
        String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
        //放入死讯队列
        kafkaTemplate.send(topic,message.getPayload());
        return message;
    }
}

效果图

说明:以上基本上就是使用死讯队列的方案,也许读者会觉得这样编码复杂度很高,但其实不用担心,其实上面这些代码基本上是使用死讯队列的模板代码,在成熟一点的公司,一般会使用上述代码进行简单封装,这里笔者给个思路,有兴趣同学可以实现一下。我们其实可以使用aop思想,进行自定义一个@EnableDLT这样的注解去实现,这样上面这个方案使用起来是不是就简单优雅了。之前笔者在开发过程中使用过亚马逊的消息队列服务,也不过是这样实现罢了。

总结

本篇文章就到这里了,希望可以给你带来一些帮助,也希望您能够多多关注我们的更多内容!

(0)

相关推荐

  • KOA+egg.js集成kafka消息队列的示例

    Egg.js : 基于KOA2的企业级框架 Kafka:高吞吐量的分布式发布订阅消息系统 本文章将集成egg + kafka + mysql 的日志系统例子 系统要求:日志记录,通过kafka进行消息队列控制 思路图: 这里消费者和生产者都由日志系统提供 λ.1 环境准备 ①Kafka 官网下载kafka后,解压 启动zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties 启动Kafka server 这里conf

  • 通过pykafka接收Kafka消息队列的方法

    没有Kafka环境,所以也没有进行验证.感觉今后应该能用到,所以借抄在此,备查. pykafka使用示例,自动消费最新消息,不重复消费: # -* coding:utf8 *- from pykafka import KafkaClient host = '192.168.200.38' client = KafkaClient(hosts="%s:9092" % host) print client.topics # 生产者 # topicdocu = client.topics['

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • 消息队列-kafka消费异常问题

    目录 概述 重试一定次数(消息丢失) 加入到死讯队列(消息不丢失) 总结 概述 在kafka中,或者是说在任何消息队列中都有个消费顺序的问题.为了保证一个队列顺序消费,当当中一个消息消费异常时,必将影响后续队列消息的消费,这样业务岂不是卡住了.比如笔者举个最简单的例子:我发送1-100的消息,在我的处理逻辑当中 msg%5==0我就进行 int i=1/0操作,这必将抛异常,一直阻塞在msg=5上,后面6-100无法消费.下面笔者给出解决方案. 重试一定次数(消息丢失) @KafkaHandle

  • 学习spring事务与消息队列

    在开发过程中,遇到一个bug,产生bug的原因是spring事务提交晚于消息队列的生产消息,导致消息队列消费消息时获取到的数据不正确.这篇文章介绍问题的产生和一步步的解决过程. 一.问题的产生: 场景还原:接口中的一个方法,首先修改订单状态,然后向消息队列中生产消息,消息队列的消费者获取到消息检测订单状态,发现订单状态未更改. 代码: @Service(orderApi) public class OrderApiImpl implements OrderApi { @Resource MqSe

  • Java RabbitMQ 中的消息长期不消费会过期吗

    目录 1. 默认情况 2. TTL 2.1 单条消息过期 2.2 队列消息过期 2.3 特殊情况 3. 死信队列 3.1 死信交换机 3.2 死信队列 3.3 实践 4. 小结 RabbitMQ 中的消息长期未被消费会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑问,今天松哥就来和大家捋一捋这个问题. 1. 默认情况 首先我们来看看默认情况. 默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直

  • 大数据Kafka:消息队列和Kafka基本介绍

    目录 一.什么是消息队列 二.消息队列的应用场景 异步处理 应用耦合 限流削峰 消息驱动系统 三.消息队列的两种方式 点对点模式 发布/订阅模式 四.常见的消息队列的产品 1) RabbitMQ 2) activeMQ: 3) RocketMQ 4) kafka 五.Kafka的基本介绍 一.什么是消息队列 消息队列,英文名:Message Queue,经常缩写为MQ.从字面上来理解,消息队列是一种用来存储消息的队列 .来看一下下面的代码 上述代码,创建了一个队列,先往队列中添加了一个消息,然后

  • Kafka中消息队列的两种模式讲解

    目录 Kafka消息队列的两种模式 1.点对点模式 2.发布/订阅模式 Kafka消息队列模型 Kafka消息队列的两种模式 消息队列包括两种模式,点对点模式(point to point, queue)和发布/订阅模式(publish/subscribe,topic) 1.点对点模式 点对点模式下包括三个角色: 消息队列 发送者 (生产者) 接收者(消费者) 消息发送者生产消息发送到queue中,然后消息接收者从queue中取出并且消费消息.消息被消费以后,queue中不再有存储,所以消息接收

  • kafka 消息队列中点对点与发布订阅的区别说明

    目录 背景知识 1.JMS中定义 2.二者分析与区别 2.1 点对点模式 2.2 发布订阅模式 3.流行的消息队列模型比较 3.1 RabbitMQ 3.2 Kafka 背景知识 JMS一个在 Java标准化组织(JCP)内开发的标准(代号JSR 914).2001年6月25日,Java消息服务发布JMS 1.0.2b,2002年3月18日Java消息服务发布 1.1. Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM

  • 关于Kafka消息队列原理的总结

    目录 Kafka消息队列原理 Kafka的逻辑数据模型 Kafka的分发策略 Kafka的物理存储模型和查找数据的设计 Kafka的持久化策略设计 Kafka的节点间的数据一致性策略设计 Kafka的备份和负载均衡 Kafka消息队列内部实现原理 Kafka消息队列原理 最近在测试kafka的读写性能,所以借这个机会了解了kafka的一些设计原理,既然作为分布式系统,我们还是按照分布式的套路进行分析. Kafka的逻辑数据模型 生产者发送数据给服务端时,构造的是ProducerRecord<In

  • Java分布式学习之Kafka消息队列

    目录 介绍 Kafka核心相关名称 kafka集群安装 kafka使用 kafka文件存储 Springboot整合kafka 介绍 Apache Kafka 是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统. 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. 注意:Kafka并没有遵循JMS规范(

随机推荐