Spring Kafka中如何通过参数配置解决超时问题详解

目录
  • 背景
  • 思路
  • 过程
    • 步骤一,查询版本特性
    • 步骤二,查源码
    • 步骤三,查自身的代码
  • 总结

背景

这是我们团队负责的一个不太核心的服务。之前与外部交互时应外部要求由普通kafka集群改成加密kafka集群。我们是数据生产端。

改的过程中并跑上线,60%的请求耗时增加了2倍,也还是在百毫秒的量级可以接受。但是每次重启的第一个请求要5s以上,会超过;运行过程中,一两个月也会有一次超时。因为我们有三次重试,整体没有影响成功率。

上线的时候我们问过网络组,还专门请教过公司专业负责kafka的团队。结论是:第一,这个慢是外部交互方的问题,不是咱们这边可以处理的;第二,参数上也没有什么可以调优的。

我们团队内部还是不信邪,调了几个参数,加测之后上线了。频繁度降到了现在的一两个月一次超时,但是没有根治。因为本身这个服务不是特别核心,本身外部是允许有一定失败率的,而且现在实际上也没有失败,几年内业务量也是很平稳的:1分钟4笔。

而我上班时间的状态基本上是我站在两个人中间,我目的是想问一个人问题,结果却先要回答另外一个人的问题,这时候还会出现第四个人说别的事。这个优先级排不上。但是心疼开发小哥哥,每一两个月就要处理一下因为这件事引起的告警。虽然实际不影响,告警出来了,我们就要排查核对是否还是这个问题,并且确实通过重试将消息推送出去了。

所以本次利用周末,希望可以根治这个疑难杂症,减少运维成本。

思路

前期已经明确了这个外部的加密集群建立连接和数据传输速度都慢于之前的普通集群。之所以第一次慢和每一两个月会慢一次都是连接断开重连造成的。之前我们进行过参数调优,调优做的就是因为1分钟4笔请求,线上以最小部署单元3台机器部署,每台机器1分钟预计处理一笔请求。根据这个数据调整了空闲自动断开连接的时间间隔,保证连接不会因为空闲自动断开。线上验证有效,也侧面证实了是连接过程慢引起的超时。

因为建立连接过程慢,这个主要是外部提供的集群就是如此。既然目前并不影响实际发送成功率。人家代表的是大佬,我们也不好太强硬的去推他们解决。所以我的思路有两个:

第一,探索将建立连接与发送数据分离的可行性:程序启动后先将连接建立好再提供服务。如果生产端是这样实现的。那也许还可以进行连接自动探测,如果连接断开则自动重连,不要等发送数据时再发现连接已断开。

第二,其实第一种思路的可行性渺茫,只是需要验证一下自己的想法。一般的这种消息中间件,消费端是这样实现的。但是生产端采用了更简单的方式:读写数据的时候再探测连接是否可用,不可用则重新建立连接。这种用在发送本来就是异步的,对发送延迟本身敏感度也不高的场景。生产端本来就是这种场景,并且通过测试实际上也确实是在发送时建立的第一次连接。kafka生产端原本就是这种设计的可能性极大。如果是这种情况,那就在生产端真正使用异步,给调用方返回“受理成功”,保证调用方不超时。自己再通过接受回调保证实际的成功。

这个事情真要做,还有两个隐形需求:

1、因为外部有需求,数据可以偶尔少发,但是不能重复发送。所以不能使用业务级别的数据发送来实现探测功能。重试也要保证上条确实没有收到。

2、改造不能太大,研发成本要小。

过程

因为我在网上搜到的这方面都是入门级,没有什么解决这个问题的相关资料。所以采用的主要方法是读源码和官方文档。当然,本文的方法是有前提知识储备基础的。就是《白话TCP/IP原理》系列的相关内容:https://mp.weixin.qq.com/s/Y2k3AW2ZjWbB1w63gsSRag

步骤一,查询版本特性

我们目前用到的kafka客户端版本是

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.8.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

spring-kafka对应的官网的大版本是2.5,所以先点开了2.5.17.RELEASE对应的参考文档。看到一句有用信息:

The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed.

默认消费者和生产者工厂现在已经可以在生产者和消费者创建和关闭时引发一个回调。耗时的连接建立过程是可以监听的,我们可以通过打日志进行监控。

步骤二,查源码

首先我们看一下类图,看不清楚没有关系。看这里就好:

首先发现Producer、Consumer和Sender都是通过KafkaClient(接口),也就是NetworkClient(实现类)进行网络活动的。其次发现NetworkClient是在传输层和应用层之间起了一个缓冲的作用,解耦了各个部件。

Producer、Consumer和AdminClient主要管理requests;NetworkClient主要管理connection;Selector主要管理sockets channel。这些被管理对象我在之前的网络系列里都讲过。

如果不看代码,我站在设计者角度结合类图猜想:生产端实际使用的是KafkaTemplate的send方法,具体的参数都是由DefaultKafkaProducerFactory接收。实际上连接的建立是Producer类进行。而在Producer类依赖于NetworkClient。而实际上进行连接应该在Sender类。Sender是一个Runnable异步线程来做,那实际建立连接的是run方法中。

我跟踪源码验证了猜想。NetworkClient里有个initiateConnect的私有方法,是建立连接用的,跟踪它就可以知道调用的地方。跟踪下来,主要入口在NetworkClient的poll方法,注释如下:

 /**
     * Do actual reads and writes to sockets.
     *
     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
     *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
     *                metadata timeout
     * @param now The current time in milliseconds
     * @return The list of responses received
     */
    @Override
    public List<ClientResponse> poll(long timeout, long now) {

人家明确说了是读写时才会调用。证实了思路一不可行。

步骤三,查自身的代码

按照思路二,进行异步化。本身生产端就应该是异步的,为什么异步没有生效呢?结合KafkaTemplate的send方法源代码和项目中自己写的代码。异步部分大体是这样:

 SettableListenableFuture future = new SettableListenableFuture();
        future.set("OK");
        future.get();
        future.addCallback((sendResult) -> {
            try {
                System.out.println("成功");
            } catch (Exception e) {
            }
        }, r -> {
            System.out.println("失败");
        });
        System.out.println("============end==============");

就是说KafkaTemplate的异步是靠使用SettableListenableFuture实现的,实际上它的set方法会马上触发callback,是同步的。代码是先同步调用set,并且还手动调用了get(这个方法会等待直到返回结果)。所以整体是同步的。或者直接这么看,future实现异步要有一个Callable或者Runnable的线程方法,人家SettableListenableFuture第一行源码就禁用了Callable。这个我看了2.5.17.RELEASE这个更高版本的spring-kafka,实现没有做更改。

也就是说spring-kafka自身起码在2.5.X版本里异步没有起到作用。

问题清楚了修改也很简单,比如可以加个异步注解将整个发送方法做异步,重试等逻辑也放到这个方法中。给调用方只返回受理成功。具体怎么解决交给开发小哥哥。

总结

幸亏我上周已经提前规划好周一要休假。否则现在都2点半了明天上班也没精神。主要时间花在异步不生效的问题上。其实排查异步不生效的思路是很简单清晰的。耗时长是因为:第一,不敢相信spring官方实现的,竟然使用异步的代码实际效果没有异步;第二,关于异步我在网上搜索了一下,都是按照项目中配置的那样。官方这样说,大家这样说,我总得考虑是不是自己搞错了。

所以我反复的验证、反复的debug之后也不敢下结论。仔细研究了源码仍然不敢下结论。直到终于搜索到一篇文章说要实现异步除了要使用addCallback之外还要加异步标签。人间清醒的我,马上意识到文章实际用了两种不同方法实现异步。作者之所以认为这是一个方法的两个部分大概也是发现其实spring-kafka的异步没好使吧。

到此这篇关于Spring Kafka中如何通过参数配置解决超时问题的文章就介绍到这了,更多相关Spring Kafka参数配置解决超时内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解Spring Kafka中关于Kafka的配置参数

    SpringKafka文档地址:https://docs.spring.io/spring-kafka/reference/htmlsingle kafka文档地址:http://kafka.apache.org/documentation SpringKafka中配置的Java配置实现类:https://github.com/spring-projects/spring-boot/blob/v1.5.4.RELEASE/spring-boot-autoconfigure/src/main/ja

  • Spring Kafka中如何通过参数配置解决超时问题详解

    目录 背景 思路 过程 步骤一,查询版本特性 步骤二,查源码 步骤三,查自身的代码 总结 背景 这是我们团队负责的一个不太核心的服务.之前与外部交互时应外部要求由普通kafka集群改成加密kafka集群.我们是数据生产端. 改的过程中并跑上线,60%的请求耗时增加了2倍,也还是在百毫秒的量级可以接受.但是每次重启的第一个请求要5s以上,会超过:运行过程中,一两个月也会有一次超时.因为我们有三次重试,整体没有影响成功率. 上线的时候我们问过网络组,还专门请教过公司专业负责kafka的团队.结论是:

  • spring boot中的properties参数配置详解

    application.properties application.properties是spring boot默认的配置文件,spring boot默认会在以下两个路径搜索并加载这个文件 src\main\resources src\main\resources\config 配置系统参数 在application.properties中可配置一些系统参数,spring boot会自动加载这个参数到相应的功能,如下 #端口,默认为8080 server.port=80 #访问路径,默认为/

  • Java Spring MVC 上传下载文件配置及controller方法详解

    下载: 1.在spring-mvc中配置(用于100M以下的文件下载) <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter"> <property name="messageConverters"> <list> <!--配置下载返回类型--> <bean class="or

  • Spring Cloud中使用jib进行docker部署的步骤详解

    Jib介绍 Jib 是 Google 开发的可以直接构建 Java 应用的 Docker 和 OCI 镜像的类库,以 Maven 和 Gradle 插件形式提供. 通过 Jib,Java 开发者可以使用他们熟悉的 Java 工具来构建容器.Jib 是一个快速而简单的容器镜像构建工具,它负责处理将应用程序打包到容器镜像中所需的所有步骤.它不需要你编写 Dockerfile 或安装 Docker,而且可以直接集成到 Maven 和 Gradle中 -- 只需要将插件添加到构建中,就可以立即将 Jav

  • IntelliJ IDEA 的 Spring 项目如何查看 @Value 的配置和值(方法详解)

    当你打开项目或者项目中的文件的时候,如果你有 Spring 的 Value 的配置,Intellij 将会自动将参数替换为值. 如果你单击上面的值,那么这个配置参数将会显示为配置的参数名. 如果你还想显示值的话,你需要重新打开这个文件或者项目. 有没有什么快捷键可以快速进行切换. 快捷键 这个配置是在 Intellij 的 Code > Folding 中进行配置的. 快捷键是是 Ctrl + NumberPad + 快捷键是是 Ctrl + NumberPad - NumberPad +,这个

  • Spring Boot中扩展XML请求与响应的支持详解

    前言 在之前的所有Spring Boot教程中,我们都只提到和用到了针对HTML和JSON格式的请求与响应处理.那么对于XML格式的请求要如何快速的在Controller中包装成对象,以及如何以XML的格式返回一个对象呢? 什么是xml文件格式 我们要给对方传输一段数据,数据内容是"too young,too simple,sometimes naive",要将这段话按照属性拆分为三个数据的话,就是,年龄too young,阅历too simple,结果sometimes naive.

  • Spring BPP中如何优雅的创建动态代理Bean详解

    v一.前言 本文章所讲并没有基于Aspectj,而是直接通过Cglib以及ProxyFactoryBean去创建代理Bean.通过下面的例子,可以看出Cglib方式创建的代理Bean和ProxyFactoryBean创建的代理Bean的区别. v二.基本测试代码 测试实体类,在BPP中创建BppTestDepBean类型的代理Bean. @Component public static class BppTestBean { @Autowired private BppTestDepBean d

  • 在Spring Boot中加载XML配置的完整步骤

    开篇 在SpringBoot中我们通常都是基于注解来开发的,实话说其实这个功能比较鸡肋,但是,SpringBoot中还是能做到的.所以用不用是一回事,会不会又是另外一回事. 涛锅锅在个人能力能掌握的范围之内,一般是会得越多越好,都是细小的积累,发生质的改变,所以今天和小伙伴们一起分享一下. 实践 1.首先我们新建一个SpringBoot Project ,工程名为 xml 2.添加web依赖,点击Finish完成构建 3.我们新建一个类 SayHello 不做任何配置 package org.t

  • 详解Spring Boot中如何自定义SpringMVC配置

    目录 前言 一.SpringBoot 中 SpringMVC 配置概述 二.WebMvcConfigurerAdapter 抽象类 三.WebMvcConfigurer 接口 四.WebMvcConfigurationSupport 类-自定义配置 五.WebMvcAutoConfiguration 配置类 – 自动化配置 六.@EnableWebMvc 注解 七.总结 前言 在 Spring Boot 框架中只需要在项目中引入 spring-boot-starter-web 依赖,Spring

随机推荐