kafka并发写大消息异常TimeoutException排查记录

目录
  • 前言
  • 定位异常点
  • 分析抛异常的逻辑
  • 真实原因-解决方案
  • 结语

前言

先简单介绍下我们的使用场景,线上5台Broker节点的kafka承接了所有binlog订阅的数据,用于Flink组件接收数据做数据中台的原始数据。昨儿开发反馈,线上的binlog大量报错,都是kafka的异常,而且都是同一条topic抛的错,特征也很明显,发送的消息体非常大,主观判断肯定是写入大消息导致的超时了,异常详情如下:

thread:  kafka-producer-network-thread | producer-1
throwable:  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for BIN-LOG-DATA-center-dmz2-TABLE-kk-data-center-ods_contract_finance_info-0 due to 56352 ms has passed since last append

定位异常点

应用抛一个不常见的异常,一般操作是先去百度or谷歌搜索一番的,就上面这个timeout超时的异常,搜索引擎的结果都是producer连不上Borker导致的问题,根本不是我们这个场景的,所以其次我们就需要从源码中寻找答案了。博主使用的开发工具是IDEA,借助IDEA很容易定位到异常抛出点。首先定位TimeoutException异常类,然后按住ctrl键,点击这个类,会出现如下图所有抛TimeoutException异常的点,然后根据异常message内容,寻找相匹配的点击进去就是抛异常的地方了,如图,红色箭头所指即代码位置:

分析抛异常的逻辑

程序中的异常,一定是符合某些条件才会抛出的,想要解决异常,只要让运行时的环境不满足抛异常的条件即可,下面就是抛异常的代码:

boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";

        boolean expired = expiryErrorMessage != null;
        if (expired)
            abortRecordAppends();
        return expired;
    }

可以看到,我们的异常是在第一个逻辑判断时候就满足了所以抛异常了。在此处有可能会抛出三个不同的timeout异常,用中文语义翻译条件分别是:

  • 没设置重试,并且发送批次(batch.size)满了,并且配置请求超时时间(request.timeout.ms)小于【当前时间减去最后追加批次的时间】
  • 没设置重试,并且配置请求超时时间(request.timeout.ms)小于【创建批次时间减去配置的等待发送的时间(linger.ms)】
  • 设置重试,并且配置请求超时时间(request.timeout.ms)小于【当前时间-最后重试时间-重试需要等待的时间(retry.backoff.ms)】

上面括号中的参数就是kafka producer中配置的相关的参数,这些参数都没有重新设置过,batch.size默认是10kb大小,而引发报错的消息都是36kb的大小,默认的request.timeout.ms超时设置是30s,所以在这个判断可能过期了的方法中,引发我们异常的主要原因是batch.size和request.timeout.ms的参数设置问题了。

真实原因-解决方案

从上面代码看表面原因是参数设置不够了,实际上呢,博主使用kafka-test启动了五个Borker集群做复现验证测试,测试写入相同的36kb的message,在所有配置也保持默认的情况下,也根本毫无压力。后面查找相关的错误日志,发现所有的TimeoutException集中在几乎同一时刻,经查明,是因为业务批量导入了数据到mysql中,造成binlog消息突然增加,高并发的往kafka写大消息导致Borker处理不过来,造成的TimeoutException超时,所以真正解决问题也可以从两个方面入手:

  • 服务端:增加Borker,并设置多个TopicPartition,平摊写入压力,这个是根本的解决问题
  • 客户端:加大request.timeout.ms、batch.size参数,或者开启消息重试,这种方案治标不治本,但是也能大概率的减少因为此类场景导致的TimeoutException

结语

异常不可怕,所有异常都是人为抛的,都是有既定的触发条件的,只要定位到触发异常的条件对症下药即可解决问题。不过博主五年来的经验发现,日志打印真的是门艺术,在这个方面,Spring框架和Dubbo以及Apollo配置中心框架就是日志打印的典范,不管发生什么异常,日志里都会输出详细的上下文环境,异常的原因,建议的解决方法,如果涉及到相关的配置,也会打印该怎么配置最好。反观kafka client的这条TimeoutException就显的信息量有点过少了,如果能把相关的配置信息和排查的方向写明会更好。最后安利一波kafka test,轻松搭建多Borker的kafka集群,一个注解就ok了。详情参考我的这篇博文深入研究spring boot集成kafka之spring-kafka底层原理

以上就是kafka并发写大消息异常TimeoutException排查记录的详细内容,更多关于kafka并发异常TimeoutException排查的资料请关注我们其它相关文章!

(0)

相关推荐

  • 消息队列-kafka消费异常问题

    目录 概述 重试一定次数(消息丢失) 加入到死讯队列(消息不丢失) 总结 概述 在kafka中,或者是说在任何消息队列中都有个消费顺序的问题.为了保证一个队列顺序消费,当当中一个消息消费异常时,必将影响后续队列消息的消费,这样业务岂不是卡住了.比如笔者举个最简单的例子:我发送1-100的消息,在我的处理逻辑当中 msg%5==0我就进行 int i=1/0操作,这必将抛异常,一直阻塞在msg=5上,后面6-100无法消费.下面笔者给出解决方案. 重试一定次数(消息丢失) @KafkaHandle

  • 解决kafka消息堆积及分区不均匀的问题

    目录 kafka消息堆积及分区不均匀的解决 1.先在kafka消息中创建 2.添加配置文件application.properties 3.创建kafka工厂 4.展示kafka消费者 kafka出现若干分区不消费的现象 定位过程 验证 解决方法 kafka消息堆积及分区不均匀的解决 我在环境中发现代码里面的kafka有所延迟,查看kafka消息发现堆积严重,经过检查发现是kafka消息分区不均匀造成的,消费速度过慢.这里由自己在虚拟机上演示相关问题,给大家提供相应问题的参考思路. 这篇文章有点

  • java应用占用内存过高排查的解决方案

    故障:收到服务器报警,内存使用率超过80% 1.查看 使用dstat和top查看内存使用最高的应用 使用dstat 查到内存占用最高的是java应用,使用2253M内存,但是这台服务器跑了好几个java,具体哪个进程使用top看下资源情况 使用top 可以看到java应用整体内存使用率超过了70%,其中pid为16494的进程 一个应用占了28.7的内存 2.定位线程问题 使用ps查看16494的线程情况 命令:ps p 16494 -L -o pcpu,pmem,pid,tid,time,tn

  • kafka生产者和消费者的javaAPI的示例代码

    写了个kafka的java demo 顺便记录下,仅供参考 1.创建maven项目 目录如下: 2.pom文件: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://mave

  • kafka并发写大消息异常TimeoutException排查记录

    目录 前言 定位异常点 分析抛异常的逻辑 真实原因-解决方案 结语 前言 先简单介绍下我们的使用场景,线上5台Broker节点的kafka承接了所有binlog订阅的数据,用于Flink组件接收数据做数据中台的原始数据.昨儿开发反馈,线上的binlog大量报错,都是kafka的异常,而且都是同一条topic抛的错,特征也很明显,发送的消息体非常大,主观判断肯定是写入大消息导致的超时了,异常详情如下: thread: kafka-producer-network-thread | producer

  • Python并发编程线程消息通信机制详解

    目录 1 Event事件 2 Condition 3 Queue队列 4 总结一下 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()下,实在是太简单了. 可是要知道,在真实的项目中,实际场景可要我们举的例子要复杂的多得多,不同线程的执行可能是有顺序的,或者说他们的执行是有条件的,是要受控制的.如果仅仅依靠前面学的那点浅薄的知识,是远远不够的. 那今天,我们就来探讨一下如何控制线程的触发执行. 要实现对多个线程进行控制,其实

  • 详解Mysql数据库平滑扩容解决高并发和大数据量问题

    目录 1 停机方案 2 停写方案 3 平滑扩容之双写方案(中小型数据) 4 平滑扩容之2N方案大数据量问题解决 4.1 扩容问题 4.2 解决方案 4.3 双主架构思想 4.4 环境部署 5 数据库秒级平滑2N扩容实践 5.1 新增数据库VIP 5.2 应用服务增加动态数据源 5.3 解除原双主同步 5.4 安装MariaDB扩容服务器 5.5 增加KeepAlived服务实现高可用 5.6 清理数据并验证 1 停机方案 发布公告 停止服务 离线数据迁移(拆分,重新分配数据) 数据校验 更改配置

  • Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

    kakfa是我们在项目开发中经常使用的消息中间件.由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况.遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic.因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用). 完整的代码在这里,欢迎加星号.fork. 官方文档在https://docs.spring.io/spring-kafka/reference/h

  • Java NIO写大文件对比(win7和mac)

    测试说明 写2G文件,分批次写入,每批次写入128MB: 分别在Win7系统(3G内存,双核,32位,T系列处理器)和MacOS系统(8G内存,四核,64位,i7系列处理器)下运行测试.理论上跟硬盘类型和配置也有关系,这里不再贴出了. 测试代码 package rwbigfile; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.RandomA

  • Python3并发写文件与Python对比

    这篇文章主要介绍了Python3并发写文件原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 使用python2在进行并发写的时候,发现文件会乱掉,就是某一行中间会插入其他行的内容. 但是在使用python3进行并发写的时候,无论是多进程,还是多线程,都没有出现这个问题,难道是python3的特性吗? import time import os import multiprocessing from multiprocessing.dumm

  • 滴滴二面之Kafka如何读写副本消息的

    目录 前言 appendRecords-副本写入 副本读取:fetchMessages 总结 前言 无论是读取副本还是写入副本,都是通过底层的Partition对象完成的,而这些分区对象全部保存在上节课所学的allPartitions字段中.可以说,理解这些字段的用途,是后续我们探索副本管理器类功能的重要前提. 现在,我们就来学习下副本读写功能.整个Kafka的同步机制,本质上就是副本读取+副本写入,搞懂了这两个功能,你就知道了Follower副本是如何同步Leader副本数据的. append

  • 异常排查记录amqp协议链接陷阱

    目录 前言 问题背景 异常信息 原因分析 异常一分析: 异常二分析: 解决问题 前言 amqp是一种通用的消息队列数据传输协议,典型的MQ应用RabbitMQ就实现了amqp协议,所以,我们在使用amqp-client链接rabbitmq时,可以使用amqp的链接协议连接rabbitmq.但是博主在尝试使用amqp协议链接时,碰到了一个隐藏的连接协议规范问题,故记录在此. amqp协议文档:https://www.rabbitmq.com/uri-spec.html 问题背景 amqp-clie

  • spring boot整合mongo查询converter异常排查记录

    目录 前言 自定义转换器 javabean的方式配置MongoTemplate 后记: 前言 使用过spring boot的人都知道spring boot约定优于配置的理念给我们开发中集成相关技术框架提供了很多的便利,集成mongo也是相当的简单,但是通过约定的配置信息来集成mongo有些问题. 当你的字段包含Timestamp这种类型时,读取数据的时候会抛一个类型转换的异常,如 No converter found capable of converting from type [java.u

  • 记一次tomcat进程cpu占用过高的问题排查记录

    本文主要记录一次tomcat进程,因TCP连接过多导致CPU占用过高的问题排查记录. 问题描述 linux系统下,一个tomcat web服务的cpu占用率非常高,top显示结果超过200%.请求无法响应.反复重启依然同一个现象. 问题排查 1.获取进程信息 通过jdk提供的jps命令可以快速查出jvm进程, jps pid 2.查看jstack信息 jstack pid 发现存在大量log4j线程block,处于waiting lock状态 org.apache.log4j.Category.

随机推荐