spring-Kafka中的@KafkaListener深入源码解读

目录
  • 前言
  • 一、总体流程
  • 二、源码解读
    • 1、postProcessAfterInitialization
      • 1.1、processKafkaListener
      • 1.2、processListener
      • 1.3、registerEndpoint
      • 1.4、startIfNecessary
    • 2、afterSingletonsInstantiated
      • 2.1、afterPropertiesSet
      • 2.2、registerAllEndpoints
  • 总结

前言

本文主要通过深入了解源码,梳理从spring启动到真正监听kafka消息的这套流程

一、总体流程

从spring启动开始处理@KafkaListener,到start消息监听整体流程图

二、源码解读

1、postProcessAfterInitialization

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
		    Class<?> targetClass = AopUtils.getTargetClass(bean);

		    // 扫描@KafkaListener注解
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);

			......

			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					// 遍历扫描到的所有@KafkaListener注解并开始处理
					for (KafkaListener listener : entry.getValue()) {
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
			}
			// 处理在类上的@KafkaListener注解
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

1.1、processKafkaListener

KafkaListenerAnnotationBeanPostProcessor#processKafkaListener

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

1.2、processListener

KafkaListenerAnnotationBeanPostProcessor#processListener

将每个kafkaListener转变成MethodKafkaListenerEndpoint并注册到KafkaListenerEndpointRegistrar容器,方便后续统一启动监听

	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
			Object bean, Object adminTarget, String beanName) {

		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.addListener(beanRef, bean);
		}
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		endpoint.setTopics(resolveTopics(kafkaListener));
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		String group = kafkaListener.containerGroup();

        ......

        // 注册已经封装好的消费端-endpoint
		this.registrar.registerEndpoint(endpoint, factory);

		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.removeListener(beanRef);
		}
	}

1.3、registerEndpoint

KafkaListenerEndpointRegistrar#registerEndpoint

	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {

	    ......

		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
		synchronized (this.endpointDescriptors) {
		    // 如果到了需要立即启动监听的阶段就直接注册并监听(也就是创建消息监听容器并启动)
			if (this.startImmediately) { // Register and start immediately
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
						resolveContainerFactory(descriptor), true);
			}
			else {
			    // 一般情况都先走这一步,添加至此列表,待bean后续的生命周期 统一注册并启动
				this.endpointDescriptors.add(descriptor);
			}
		}
	}

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {

        ......

		synchronized (this.listenerContainers) {

			......

			// 1.创建消息监听容器
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
				List<MessageListenerContainer> containerGroup;
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				}
				else {
					containerGroup = new ArrayList<MessageListenerContainer>();
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}

            // 2.是否立即启动消息监听
			if (startImmediately) {
				startIfNecessary(container);
			}
		}
	}

1.4、startIfNecessary

KafkaListenerEndpointRegistry#startIfNecessary
启动消息监听

	private void startIfNecessary(MessageListenerContainer listenerContainer) {
		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
		    // 启动消息监听
		    // 到这一步之后,消息监听以及处理都是KafkaMessageListenerContainer的逻辑
		    // 到此也就打通了@KafkaListener到MessageListenerContainer消息监听容器的逻辑
			listenerContainer.start();
		}
	}

2、afterSingletonsInstantiated

这一步是实例化(此处的实例化是已经创建对象并完成了初始化操作)之后,紧接着的操作

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

	public void afterSingletonsInstantiated() {
		this.registrar.setBeanFactory(this.beanFactory);

        // 对"注册员"信息的完善
		if (this.beanFactory instanceof ListableBeanFactory) {
			Map<String, KafkaListenerConfigurer> instances =
					((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
			for (KafkaListenerConfigurer configurer : instances.values()) {
				configurer.configureKafkaListeners(this.registrar);
			}
		}

		if (this.registrar.getEndpointRegistry() == null) {
			if (this.endpointRegistry == null) {
				Assert.state(this.beanFactory != null,
						"BeanFactory must be set to find endpoint registry by bean name");
				this.endpointRegistry = this.beanFactory.getBean(
						KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
						KafkaListenerEndpointRegistry.class);
			}
			this.registrar.setEndpointRegistry(this.endpointRegistry);
		}

		......

		// Actually register all listeners
		// 整个方法这里才是关键
		// 创建MessageListenerContainer并注册
		this.registrar.afterPropertiesSet();
	}

2.1、afterPropertiesSet

KafkaListenerEndpointRegistrar#afterPropertiesSet

	public void afterPropertiesSet() {
		registerAllEndpoints();
	}

2.2、registerAllEndpoints

KafkaListenerEndpointRegistrar#registerAllEndpoints

	protected void registerAllEndpoints() {
		synchronized (this.endpointDescriptors) {
			for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
			    // 这里是真正的创建ListenerContainer监听对象并注册
				this.endpointRegistry.registerListenerContainer(
						descriptor.endpoint, resolveContainerFactory(descriptor));
			}
			// 启动时所有消息监听对象都注册之后,便将参数置为true
			this.startImmediately = true;  // trigger immediate startup
		}
	}

总结

以上便是整个流程,总体感觉就是将kafka消息监听融入到spring生命周期中,并完美契合

调试及相关源码版本:

org.springframework.boot::2.3.3.RELEASE
spring-kafka:2.5.4.RELEASE

相关参考:

spring-kafka官方文档
spring容器之refresh方法

到此这篇关于spring-Kafka中的@KafkaListener深入源码解读的文章就介绍到这了,更多相关spring-Kafka @KafkaListener内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot+kafka中@KafkaListener动态指定多个topic问题

    目录 说明 总结一下大家问的最多的一个问题 终极方法 思路 实现 代码 总结 说明 本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener 首先,application.properties中配置用逗号隔开的多个topic. 方法:利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”) 运行程序,console打

  • Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

    kakfa是我们在项目开发中经常使用的消息中间件.由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况.遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic.因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用). 完整的代码在这里,欢迎加星号.fork. 官方文档在https://docs.spring.io/spring-kafka/reference/h

  • spring-Kafka中的@KafkaListener深入源码解读

    目录 前言 一.总体流程 二.源码解读 1.postProcessAfterInitialization 1.1.processKafkaListener 1.2.processListener 1.3.registerEndpoint 1.4.startIfNecessary 2.afterSingletonsInstantiated 2.1.afterPropertiesSet 2.2.registerAllEndpoints 总结 前言 本文主要通过深入了解源码,梳理从spring启动到真

  • SPRING BOOT启动命令参数及源码详析

    前言 使用过Spring Boot,我们都知道通过java -jar可以快速启动Spring Boot项目.同时,也可以通过在执行jar -jar时传递参数来进行配置.本文带大家系统的了解一下Spring Boot命令行参数相关的功能及相关源码分析. 命令行参数使用 启动Spring Boot项目时,我们可以通过如下方式传递参数: java -jar xxx.jar --server.port=8081 默认情况下Spring Boot使用8080端口,通过上述参数将其修改为8081端口,而且通

  • Spring Kafka中如何通过参数配置解决超时问题详解

    目录 背景 思路 过程 步骤一,查询版本特性 步骤二,查源码 步骤三,查自身的代码 总结 背景 这是我们团队负责的一个不太核心的服务.之前与外部交互时应外部要求由普通kafka集群改成加密kafka集群.我们是数据生产端. 改的过程中并跑上线,60%的请求耗时增加了2倍,也还是在百毫秒的量级可以接受.但是每次重启的第一个请求要5s以上,会超过:运行过程中,一两个月也会有一次超时.因为我们有三次重试,整体没有影响成功率. 上线的时候我们问过网络组,还专门请教过公司专业负责kafka的团队.结论是:

  • Spring Cloud集成Nacos Config动态刷新源码剖析

    目录 正文 Nacos Config动态刷新机制 Nacos Config 长轮询源码剖析 ClientWorker构造器初始化线程池 长轮询流程方法 正文 从远端服务器获取变更数据的主要模式有两种:推(push)和拉(pull).Push 模式简单来说就是服务端主动将数据变更信息推送给客户端,这种模式优点是时效性好,服务端数据发生变更可以立马通知到客户端,但这种模式需要服务端维持与客户端的心跳连接,会增加服务端实现的复杂度,服务端也需要占用更多的资源来维持与客户端的连接. 而 Pull 模式则

  • spring事务之事务挂起和事务恢复源码解读

    目录 事务挂起和事务恢复源码解读 事务挂起源码 suspend(transaction) newTransactionStatus() doBegin() 事务恢复 所以 事务挂起和事务恢复源码解读 在学习spring事务的时候,一定会涉及到一个概念,无法避免的,就是事务挂起和事务恢复 对于事务挂起和事务恢复,可以简单的描述一下,是这样的 1.首先我们假设有两个类,A类和B类,两个类中的字段是一模一样的,A类表示当前事务,B类表示备份事务 2.如果我开启一个事务,会把当前事务信息,存入到A类中,

  • Spring MVC策略模式之MethodArgumentResolver源码解析

    目录 正文 例子 源码分析 RequestParamMethodArgumentResolver: PathVariableMethodArgumentResolver: ModelMethodProcessor: 总结 正文 Spring MVC 是一个基于 MVC 设计模式的Web框架,它的核心就是 DispatcherServlet,它相当于请求的中央处理器.在 DispatcherServlet 中,它使用了 MethodArgumentResolver 来解析方法参数. MethodA

  • 详解go中panic源码解读

    panic源码解读 前言 本文是在go version go1.13.15 darwin/amd64上进行的 panic的作用 panic能够改变程序的控制流,调用panic后会立刻停止执行当前函数的剩余代码,并在当前Goroutine中递归执行调用方的defer: recover可以中止panic造成的程序崩溃.它是一个只能在defer中发挥作用的函数,在其他作用域中调用不会发挥作用: 举个栗子 package main import "fmt" func main() { fmt.

  • vue使用引用库中的方法附源码

    monaco-editor-vue的官方源码如下 Index.js import * as monaco from 'monaco-editor/esm/vs/editor/editor.api'; function noop() { } export { monaco }; export default { name: 'MonacoEditor', props: { diffEditor: { type: Boolean, default: false }, //是否使用diff模式 wid

  • Postgres中UPDATE更新语句源码分析

    目录 PG中UPDATE源码分析 整体流程分析 解析部分——生成语法解析树UpdateStmt 解析部分——生成查询树Query 优化器——生成执行计划 执行器 事务 总结 PG中UPDATE源码分析 本文主要描述SQL中UPDATE语句的源码分析,代码为PG13.3版本. 整体流程分析 以update dtea set id = 1;这条最简单的Update语句进行源码分析(dtea不是分区表,不考虑并行等,没有建立任何索引),帮助我们理解update的大致流程. SQL流程如下: parse

随机推荐