A Source-Code Walkthrough of @KafkaListener in Spring Kafka

2026-05-21 01:04

Contents
  • Preface
  • I. Overall flow
  • II. Reading the source
    • 1. postProcessAfterInitialization
      • 1.1 processKafkaListener
      • 1.2 processListener
      • 1.3 registerEndpoint
      • 1.4 startIfNecessary
    • 2. afterSingletonsInstantiated
      • 2.1 afterPropertiesSet
      • 2.2 registerAllEndpoints
  • Summary

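Preface

This article reads through the source code to trace the full path from Spring startup to the point where Kafka messages are actually being consumed by a listener.

I. Overall flow

[Flow chart: from Spring processing @KafkaListener during startup to the message listener being started]

II. Reading the source

1. postProcessAfterInitialization

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization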

    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            // Scan for @KafkaListener annotations
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            ......
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    // Process every @KafkaListener annotation found on this method
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
            // Handle @KafkaListener annotations declared at class level
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
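The scan-and-cache idea in this method can be tried out without Spring. The following is a minimal sketch in plain Java, not the library's implementation: `Listen` is a hypothetical stand-in for `@KafkaListener`, `ListenerScanSketch` is an invented class, and the "processing" step only records a string where the real post-processor would build an endpoint.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ListenerScanSketch {

    // Hypothetical stand-in for @KafkaListener; not the real annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listen {
        String topic();
    }

    // Classes already known to have no annotated methods, so they are not scanned twice.
    private final Set<Class<?>> nonAnnotatedClasses = ConcurrentHashMap.newKeySet();

    // "method -> topic" entries recorded in place of real endpoint registration.
    final List<String> processed = new ArrayList<>();

    Object postProcess(Object bean) {
        Class<?> type = bean.getClass();
        if (nonAnnotatedClasses.contains(type)) {
            return bean; // cached negative result, skip reflection
        }
        boolean found = false;
        for (Method method : type.getDeclaredMethods()) {
            Listen listen = method.getAnnotation(Listen.class);
            if (listen != null) {
                found = true;
                processed.add(method.getName() + " -> " + listen.topic());
            }
        }
        if (!found) {
            nonAnnotatedClasses.add(type);
        }
        return bean;
    }

    boolean isCachedAsNonAnnotated(Class<?> type) {
        return nonAnnotatedClasses.contains(type);
    }

    static class OrderHandler {
        @Listen(topic = "orders")
        void onOrder(String payload) { }
    }

    static class PlainBean { }

    public static void main(String[] args) {
        ListenerScanSketch scanner = new ListenerScanSketch();
        scanner.postProcess(new OrderHandler());
        scanner.postProcess(new PlainBean());
        System.out.println(scanner.processed);
        System.out.println(scanner.isCachedAsNonAnnotated(PlainBean.class));
    }
}
```

The negative cache matters because a post-processor sees every bean in the context, and most of them carry no listener annotations at all.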

1.1 processKafkaListener

KafkaListenerAnnotationBeanPostProcessor#processKafkaListener

    protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setMethod(methodToUse);
        processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
    }

1.2 processListener

KafkaListenerAnnotationBeanPostProcessor#processListener

Each @KafkaListener is converted into a MethodKafkaListenerEndpoint and registered with the KafkaListenerEndpointRegistrar, so that all listeners can be started together later.

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {
        String beanRef = kafkaListener.beanRef();
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.addListener(beanRef, bean);
        }
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPattern(resolvePattern(kafkaListener));
        endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
        String group = kafkaListener.containerGroup();
        ......
        // Register the fully populated consumer endpoint
        this.registrar.registerEndpoint(endpoint, factory);
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.removeListener(beanRef);
        }
    }
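The `setId` and `setGroupId` calls above rely on helpers whose bodies are not shown here. The sketch below models the defaulting rules as I understand them for spring-kafka 2.5.x, and they should be read as assumptions rather than a copy of the source: an explicit id wins, otherwise an id is generated from a counter; an explicit groupId wins, otherwise the explicit id doubles as the group when `idIsGroup` is true. The class name, method names, and prefix constant are illustrative, and expression resolution is left out entirely.

```java
import java.util.concurrent.atomic.AtomicInteger;

class EndpointIdSketch {

    // Assumed prefix for generated ids, modelled on what spring-kafka 2.5.x appears to use.
    static final String GENERATED_ID_PREFIX =
            "org.springframework.kafka.KafkaListenerEndpointContainer#";

    private final AtomicInteger counter = new AtomicInteger();

    static boolean hasText(String s) {
        return s != null && !s.trim().isEmpty();
    }

    // An explicit id wins; otherwise generate a unique id from the counter.
    String endpointId(String declaredId) {
        return hasText(declaredId) ? declaredId : GENERATED_ID_PREFIX + counter.getAndIncrement();
    }

    // An explicit groupId wins; otherwise fall back to the explicit id when idIsGroup is set.
    // Returning null here means "no group from the annotation" in this sketch.
    String endpointGroupId(String declaredGroupId, String declaredId, boolean idIsGroup) {
        if (hasText(declaredGroupId)) {
            return declaredGroupId;
        }
        if (idIsGroup && hasText(declaredId)) {
            return declaredId;
        }
        return null;
    }

    public static void main(String[] args) {
        EndpointIdSketch sketch = new EndpointIdSketch();
        System.out.println(sketch.endpointId("orders-listener"));
        System.out.println(sketch.endpointId(""));
        System.out.println(sketch.endpointGroupId("", "orders-listener", true));
    }
}
```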

1.3 registerEndpoint

KafkaListenerEndpointRegistrar#registerEndpoint

    public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        ......
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            // Once immediate startup is enabled, register and start right away
            // (that is, create the message listener container and start it)
            if (this.startImmediately) { // Register and start immediately
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                // The usual path during startup: add the descriptor to this list so that
                // everything is registered and started together later in the bean lifecycle
                this.endpointDescriptors.add(descriptor);
            }
        }
    }

KafkaListenerEndpointRegistry#registerListenerContainer

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        ......
        synchronized (this.listenerContainers) {
            ......
            // 1. Create the message listener container
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            // 2. Start listening immediately if requested
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }
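The `startImmediately` switch in `registerEndpoint` is a small deferred-registration pattern: queue work until a known point in startup, flush the queue once, and from then on handle new arrivals directly. Here is a minimal plain-Java model of that pattern. `DeferredRegistrarSketch` and its string-based "registry" are hypothetical; the real classes work with endpoint descriptors and listener containers.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredRegistrarSketch {

    // Stand-in for the endpoint registry: records each registration and whether
    // an immediate start was requested for it.
    final List<String> registered = new ArrayList<>();

    // Endpoints collected during startup, waiting for the bulk registration.
    private final List<String> pending = new ArrayList<>();

    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        synchronized (pending) {
            if (startImmediately) {
                // Late arrival: register and ask for an immediate start.
                registered.add(endpointId + ":start-now");
            } else {
                // Normal startup path: just remember the endpoint.
                pending.add(endpointId);
            }
        }
    }

    void registerAllEndpoints() {
        synchronized (pending) {
            for (String endpointId : pending) {
                registered.add(endpointId + ":registered");
            }
            // From here on, new endpoints skip the queue.
            startImmediately = true;
        }
    }

    public static void main(String[] args) {
        DeferredRegistrarSketch registrar = new DeferredRegistrarSketch();
        registrar.registerEndpoint("a");
        registrar.registerEndpoint("b");
        System.out.println(registrar.registered); // nothing registered yet
        registrar.registerAllEndpoints();
        registrar.registerEndpoint("c");
        System.out.println(registrar.registered);
    }
}
```

Both methods synchronize on the same list, so an endpoint registered concurrently with the flush is either included in it or takes the immediate path, never lost between the two.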

1.4 startIfNecessary

KafkaListenerEndpointRegistry#startIfNecessary
Starts the message listener container.

    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            // Start listening for messages.
            // From this point on, polling and message handling are the job of KafkaMessageListenerContainer.
            // This is where the path from @KafkaListener to the MessageListenerContainer is complete.
            listenerContainer.start();
        }
    }
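The condition in `startIfNecessary` has one consequence that is easy to miss: a container with auto-startup disabled is left stopped while the context is still starting, but is started if it reaches this method after the context has been refreshed. The sketch below reproduces only that boolean logic with hypothetical `Container` and `StartIfNecessarySketch` types; it does not touch any Kafka or Spring class.

```java
class StartIfNecessarySketch {

    // Hypothetical stand-in for a message listener container.
    static class Container {
        final boolean autoStartup;
        boolean running;

        Container(boolean autoStartup) {
            this.autoStartup = autoStartup;
        }

        void start() {
            running = true;
        }
    }

    // Mirrors the registry flag that is set once the application context has refreshed.
    boolean contextRefreshed;

    void startIfNecessary(Container container) {
        if (contextRefreshed || container.autoStartup) {
            container.start();
        }
    }

    public static void main(String[] args) {
        StartIfNecessarySketch registry = new StartIfNecessarySketch();

        Container manual = new Container(false);
        registry.startIfNecessary(manual);
        System.out.println("before refresh, autoStartup=false: " + manual.running);

        registry.contextRefreshed = true;
        Container late = new Container(false);
        registry.startIfNecessary(late);
        System.out.println("after refresh, autoStartup=false: " + late.running);
    }
}
```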

2. afterSingletonsInstantiated

This callback comes from Spring's SmartInitializingSingleton interface and runs right after all non-lazy singleton beans have been instantiated. "Instantiated" here means the objects have been created and have also completed initialization.

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

    public void afterSingletonsInstantiated() {
        this.registrar.setBeanFactory(this.beanFactory);
        // Complete the registrar's configuration
        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, KafkaListenerConfigurer> instances =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
            for (KafkaListenerConfigurer configurer : instances.values()) {
                configurer.configureKafkaListeners(this.registrar);
            }
        }
        if (this.registrar.getEndpointRegistry() == null) {
            if (this.endpointRegistry == null) {
                Assert.state(this.beanFactory != null,
                        "BeanFactory must be set to find endpoint registry by bean name");
                this.endpointRegistry = this.beanFactory.getBean(
                        KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                        KafkaListenerEndpointRegistry.class);
            }
            this.registrar.setEndpointRegistry(this.endpointRegistry);
        }
        ......
        // Actually register all listeners
        // This is the key step of the whole method:
        // create the MessageListenerContainer instances and register them
        this.registrar.afterPropertiesSet();
    }
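The order of operations in this method is what gives user code a chance to influence registration: configurers see the registrar first, a default registry is filled in only if none was set, and only then are the collected endpoints turned into containers. The following plain-Java sketch models just that ordering. `Registrar`, `registryName`, and the use of `Consumer<Registrar>` in place of `KafkaListenerConfigurer` are all simplifications invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SingletonsInstantiatedSketch {

    // Hypothetical, heavily simplified registrar: endpoints and the registry are plain strings.
    static class Registrar {
        final List<String> descriptors = new ArrayList<>();
        final List<String> containers = new ArrayList<>();
        String registryName;

        // Bulk registration: one "container" per collected descriptor.
        void afterPropertiesSet() {
            for (String descriptor : descriptors) {
                containers.add(registryName + "/" + descriptor);
            }
        }
    }

    static void afterSingletonsInstantiated(Registrar registrar,
            List<Consumer<Registrar>> configurers, String defaultRegistryName) {
        // 1. Let every configurer customize the registrar (add endpoints, pick a registry).
        for (Consumer<Registrar> configurer : configurers) {
            configurer.accept(registrar);
        }
        // 2. Fall back to the default registry only if no configurer chose one.
        if (registrar.registryName == null) {
            registrar.registryName = defaultRegistryName;
        }
        // 3. Only now create and register the containers.
        registrar.afterPropertiesSet();
    }

    public static void main(String[] args) {
        Registrar registrar = new Registrar();
        registrar.descriptors.add("annotated-endpoint");

        List<Consumer<Registrar>> configurers = new ArrayList<>();
        configurers.add(r -> r.descriptors.add("programmatic-endpoint"));

        afterSingletonsInstantiated(registrar, configurers, "defaultRegistry");
        System.out.println(registrar.containers);
    }
}
```

Because the configurers run before step 3, an endpoint added programmatically goes through the same bulk registration as the annotated ones.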

2.1 afterPropertiesSet

KafkaListenerEndpointRegistrar#afterPropertiesSet

    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

2.2 registerAllEndpoints

KafkaListenerEndpointRegistrar#registerAllEndpoints

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                // This is where each listener container is actually created and registered
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            // Once every listener collected during startup has been registered, flip the flag to true
            this.startImmediately = true; // trigger immediate startup
        }
    }

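Summary

That is the whole flow. Kafka message listening is woven into the Spring bean lifecycle: endpoints are collected while each bean is post-processed, and the listener containers are created and registered once all singletons have been instantiated.

Versions used for debugging and source reading:

org.springframework.boot: 2.3.3.RELEASE
spring-kafka: 2.5.4.RELEASE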

