Spring框架集成Kakfa的方式

springboot集成kafka的方式

添加maven依赖

<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.3.0</version>
</dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

配置application.yml

spring:kafka:producer:bootstrap-servers: ip:porttopics: topicsretries: 0batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:topics: topicsbootstrap-servers: ip:portgroup-id: group_idauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:security.protocol: SASL_SSLsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";ssl.truststore.location: client.truststore.jksssl.truststore.password: trus_passwordssl.endpoint.identification.algorithm:

创建kafka生产者和消费者

在Spring Boot应用中,正确配置application.propertiesapplication.yml后,Spring Boot的Kafka自动配置(KafkaAutoConfiguration)会自动创建和装配KafkaTemplateKafkaConsumer等相关的Bean。

  • KafkaTemplate:用于发送消息到Kafka

  • ConsumerFactory:创建Kafka消费者的工厂

  • KafkaListenerContainerFactory:为@KafkaListener方法创建消息监听容器。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class KafkaMessageService {@Value("${spring.kafka.producer.topics}")private String outputTopic;@Autowiredprivate final KafkaTemplate<String, String> kafkaTemplate;/*** 监听输入主题的消息* @param message 接收到的消息*/@KafkaListener(topics = "${spring.kafka.consumer.topics}")public void listen(String message) {log.info("Received message:  message = {}", topic, message);// todo 处理消息// 发送到输出主题kafkaTemplate.send(outputTopic, processedMessage);log.info("Sent Processed Message: {}", processedMessage);}
}

手动配置kafka生产者和消费者

如果需要更复杂的配置,也可以自定义kafka的配置类。

kafka消费者配置类:

@Configuration
@Slf4j
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put("security.protocol", securityProtocol);props.put("sasl.mechanism", saslMechanism);props.put("sasl.jaas.config", saslJaasConfig);props.put("ssl.truststore.location", truststoreLocation);props.put("ssl.truststore.password", truststorePassword);props.put("ssl.endpoint.identification.algorithm", endpointIdentificationAlgorithm);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);// 设置并发消费者数量,模拟多个独立的消费者并发处理消息factory.setConcurrency(3);// 设置手动提交factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}

kafka生产者配置类:

@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.key-serializer}")private String keyDeserializer;@Value("${spring.kafka.producer.value-serializer}")private String valueDeserializer;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>(4);configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyDeserializer);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueDeserializer);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {return new KafkaTemplate<>(producerFactory);}
}

监听消息并处理:

@Component
@Slf4j
public class KafkaMessageProcess {@Value("${spring.kafka.producer.topics}")private String outTopic;@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}")public void listen(@Payload String message,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Acknowledgment acknowledgment) {log.info("Received message: topic = {}, message = {}", topic, message);// 手动确认消息,提交当前消息的偏移量(offset)到Kafka。Kafka会记录这个偏移量,表示该消息(及之前的所有消息)已被成功消费。acknowledgment.acknowledge();}private void process(String message) {// todo process msg}}

KafkaListener 源码分析

@KafkaListener 的注册

  1. 扫描注解:在bean初始化阶段,KafkaListenerAnnotationBeanPostProcessor 由于实现了BeanPostProcessor,会扫描所有 Bean,查找 @KafkaListener 注解
KafkaListenerAnnotationBeanPostProcessor// 注:省略了部分代码
// BeanPostProcessor接口提供的方法,是 Spring 框架的核心扩展机制之一,允许在 Bean 初始化后进行自定义处理。
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 扫描标注了KafkaListener的类Collection<KafkaListener> classLevelListeners = this.findListenerAnnotations(targetClass);// 扫描标注了KafkaListener的方法Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (methodx) -> {Set<KafkaListener> listenerMethods = this.findListenerAnnotations(methodx);return !listenerMethods.isEmpty() ? listenerMethods : null;});// 遍历扫描到的方法,解析签名Iterator var13 = annotatedMethods.entrySet().iterator();Map.Entry<Method, Set<KafkaListener>> entry = (Map.Entry)var13.next();Method method = (Method)entry.getKey();Iterator var11 = ((Set)entry.getValue()).iterator();while(var11.hasNext()) {KafkaListener listener = (KafkaListener)var11.next();// 扫描到后,后续的解析注册逻辑this.processKafkaListener(listener, method, bean, beanName);}return bean;
}
  1. 解析注解:提取 topicsgroupIdcontainerFactory 等信息。
KafkaListenerAnnotationBeanPostProcessorprotected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {// 解析注解,将注解元数据、方法、bean等静态配置封装到endpointthis.processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);String containerFactory = this.resolve(kafkaListener.containerFactory());KafkaListenerContainerFactory<?> listenerContainerFactory = this.resolveContainerFactory(kafkaListener, containerFactory, beanName);// 将上一步扫描到的listener、method等封装成endpoint,进行注册this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}
  1. 注册监听端点:调用 KafkaListenerEndpointRegistrar.registerEndpoint() 注册监听器。
KafkaListenerEndpointRegistrarpublic void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);synchronized(this.endpointDescriptors) {// 是否立即启动,// true:立即创建并启动对应的 MessageListenerContainer(Kafka 消费者容器)// false: 仅将端点信息保存到 endpointDescriptors 集合中,后续统一创建并启动if (this.startImmediately) {this.endpointRegistry.registerListenerContainer(descriptor.endpoint, this.resolveContainerFactory(descriptor), true);} else {this.endpointDescriptors.add(descriptor);}}
}// 统一创建KafkaMessageListenerContainer并启动
protected void registerAllEndpoints() {synchronized (this.endpointDescriptors) {for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));}this.startImmediately = true;  // trigger immediate startup}
}public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,boolean startImmediately) {synchronized (this.listenerContainers) {// 创建MessageListenerContainer,这个方法是创建容器的环节,源码看下一节分析MessageListenerContainer container = createListenerContainer(endpoint, factory);// 将创建好的容器放到一个线程安全的map中this.listenerContainers.put(id, container);if (startImmediately) {// 启动startIfNecessary(container);}}
}

KafkaListenerContainerFactory 创建监听容器

KafkaMessageListenerContainer 是 Spring Kafka 的核心组件之一,负责 管理和执行 Kafka 消费者的消息监听逻辑,封装了原生 KafkaConsumer,提供了线程管理、消息拉取、监听器调用、错误处理等功能。

暂时回到我们开头的配置,这里我们配置的容器创建工厂是ConcurrentKafkaListenerContainerFactoryconcurrency=3表示启动三个线程并发处理消息,这个时候,则会由ConcurrentKafkaListenerContainerFactory创建ConcurrentMessageListenerContainer

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);// 设置并发消费者数量factory.setConcurrency(3);// 设置手动提交factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;
}

在ConcurrentMessageListenerContainer中有一个集合,到时候会根据concurrency创建对应数量的KafkaMessageListenerContainer 子容器。

private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

容器创建代码

AbstractKafkaListenerContainerFactory// 
public C createListenerContainer(KafkaListenerEndpoint endpoint) {C instance = createContainerInstance(endpoint);JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);if (endpoint instanceof AbstractKafkaListenerEndpoint) {configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);}endpoint.setupListenerContainer(instance, this.messageConverter);// 初始化容器的配置,endpoint中有静态的配置,比如topic信息、KafkaListener标记的方法、bane等,这里会将这些信息复制到容器中,还有initializeContainer(instance, endpoint);customizeContainer(instance);return instance;
}protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint);ConcurrentKafkaListenerContainerFactory
// 调用子类的方法,这里是通过模板方法的设计模式,在抽象类中定义好整个流程,具体部分的实现由子类完成
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();if (topicPartitions != null && topicPartitions.length > 0) {ContainerProperties properties = new ContainerProperties(topicPartitions);return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);}else {Collection<String> topics = endpoint.getTopics();if (!topics.isEmpty()) { // NOSONARContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);}else {ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); // NOSONARreturn new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);}}
}

启动容器,消费消息

前面我们提到容器创建好后有一个启动的过程,也就是这一行代码startIfNecessary(container);,会真正启动容器,进一步触发消费者线程(ListenerConsumer)的初始化并开始消息消费流程。

KafkaListenerEndpointRegistrarprivate void startIfNecessary(MessageListenerContainer listenerContainer) {if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {listenerContainer.start();}
}// 调用到AbstractMessageListenerContainer的start方法,
public final void start() {checkGroupId();synchronized (this.lifecycleMonitor) {if (!isRunning()) {doStart();}}
}// 调用到ConcurrentMessageListenerContainer的doStart()方法,执行真正的启动逻辑
protected void doStart() {if (!isRunning()) {// 根据concurrency创建对应数量的子容器for (int i = 0; i < this.concurrency; i++) {KafkaMessageListenerContainer<K, V> container =constructContainer(containerProperties, topicPartitions, i);configureChildContainer(i, container);if (isPaused()) {container.pause();}// 启动子容器container.start();// 保存到子容器列表this.containers.add(container);}}
}// 调用到KafkaMessageListenerContainer的doStart,启动子容器
protected void doStart() {// 创建消费者线程this.listenerConsumer = new ListenerConsumer(listener, listenerType);setRunning(true);// 阻塞等待消费者线程真正启动完成。this.startLatch = new CountDownLatch(1);// 提交到线程池,异步启动消费者线程。this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);
}

消费消息的逻辑在ListenerConsumer中,该类实现了Runnable接口的run()方法,在run()方法中实现了拉取消息,并通过反射调用我们自定义的业务方法,进行消息处理等自定义逻辑。

ListenerConsumerpublic void run() {while (isRunning()) {try {// 从kafka拉取消息并通过反射调用业务方法pollAndInvoke();}catch (Exception e) {handleConsumerException(e);}finally {clearThreadState();}}
}protected void pollAndInvoke() {// 拉取消息ConsumerRecords<K, V> records = doPoll();// 通过反射调用到我们自定义的方法进行消息处理invokeIfHaveRecords(records);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/95093.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/95093.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/95093.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【蓝桥杯 2024 省 Python B】缴纳过路费

【蓝桥杯 2024 省 Python B】缴纳过路费 蓝桥杯专栏&#xff1a;2024 省 Python B 算法竞赛&#xff1a;图论&#xff0c;生成树&#xff0c;并查集&#xff0c;组合计数&#xff0c;kruskal 最小生成树&#xff0c;乘法原理 题目链接&#xff1a;洛谷 【蓝桥杯 2024 省 Python…

个性化导航新体验:cpolar让Dashy支持语音控制

文章目录简介1. 安装Dashy2. 安装cpolar3.配置公网访问地址4. 固定域名访问用 cpolar 让 Dashy 管理个人导航站就是这么简单&#xff01;三步轻松搞定&#xff1a;在电脑上安装 Dashy&#xff0c;拖拽添加常用网站&#xff0c;运行 cpolar 生成远程访问链接。这个方法不仅免费&…

SQL学习记录

基本的&#xff0c;增、删&#xff0c;改insert into table_name (列1, 列2,...) VALUES (值1, 值2,....)Delete from 表 where keyvalueupdate 表 set keyvalue,keyvalue where keyvalue查用的最多whereSELECT prod_name, prod_price FROM Products WHERE vend idDLLO1OR ve…

零基础学C++,函数篇~

C基础学习&#xff08;DAY_06&#xff09;函数1. 函数的定义与使用2. 函数参数传递3. 变量的声明周期4. 函数的其他特性5. 函数的嵌套与递归函数 1. 函数的定义与使用 ​ 在设计程序时&#xff0c;如果一段代码重复进行某种操作或者完成一个特定的功能&#xff0c;就应该将这…

react+vite+ts 组件模板

1.创建项目npm create vitelatest my-app --template react-ts2.配置项目 tsconfig.json{"compilerOptions": {"target": "ES2020","useDefineForClassFields": true,"lib": ["ES2020", "DOM", "D…

C语言 - 输出参数详解:从简单示例到 alloc_chrdev_region

C语言中的输出参数详解&#xff1a;以 alloc_chrdev_region 为例 在学习 C 语言函数调用时&#xff0c;我们常常接触到“输入参数”&#xff0c;比如把一个数字传给函数&#xff0c;让函数帮我们算出结果。但有时候可能会发现&#xff0c;有些函数除了返回值之外&#xff0c;还…

机器视觉学习-day09-图像矫正

1 仿射变换与透视变换1.1 仿射变换之前在图像旋转实验中已经接触过仿射变换&#xff0c;仿射变换是一个二维坐标系到另一个二维坐标系的过程&#xff0c;在仿射变换中符合直线的平直性和平行性。1.2 透视变换透视变换是把一个图像投影到一个新的视平面的过程。在现实世界中&…

杰理ac791获取之前版本sdk

很惭愧&#xff0c;一个如此简单的问题卡了这么久&#xff0c;运动战的本质就是多找线索&#xff0c;多尝试

基于轴重转移补偿和多轴协调的粘着控制方法研究

基于轴重转移补偿和多轴协调的粘着控制方法研究 1. 论文标题 基于轴重转移补偿和多轴协调的粘着控制方法研究 2. 内容概括 该论文针对重载电力机车在复杂轨面条件下易发生空转的问题,提出了一种新型粘着控制方法。传统方法仅考虑单轴粘着利用而忽略轴间关系,本文设计了包…

台达 PLC 软件导入 EDS 文件后不能通过 PDO 控制的解决方法

一、功能及注意事项 1.功能说明&#xff1a;通过修改 EDS 文件处理台达 PLC 软件导入 EDS 文件后不能通过 PDO 控制的解决方法 2.注意事项&#xff1a;1).此文档只针对立迈胜 CANopen 通讯一体化电机&#xff1b; 2).EDS 文件可以用记事本打开&#xff1b; 二、EDS 文件修改 IS…

Python库2——Matplotlib2

上一篇文章主要介绍了Matplotlib库中的Pyplot模块中几大常见图像的绘制&#xff0c;包括自行修改图像的属性&#xff0c;在绘制图像时会自动创建一个图形窗口来展现这些图像。本节内容继续讲讲这个&#xff08;Figure&#xff09;图像窗口即其一些常见用法。 其他python库链接…

AI生成思维导图和AI生成Excel公式

AI生成思维导图和AI生成Excel公式 AI 生成思维导图和 AI 生成 Excel 公式是一个完全免费的 AI 办公合集网站。 它完全免费&#xff0c;一个网站支持多个实用 AI 办公功能&#xff0c;包括&#xff1a;免费 AI Excel 公式生成器、输入 Excel 公式解释含义、AI Excel 助手、Exc…

java中的VO、DAO、BO、PO、DO、DTO

VO、DAO、BO 等对象在了解这里 po、vo、dao、之前先介绍下 MVC 开发模式M层负责与数据库打交道&#xff1b;C层负责业务逻辑的编写&#xff1b;V层负责给用户展示&#xff08;针对于前后端不分离的项目&#xff0c;不分离项目那种编写模版的方式&#xff0c;理解V的概念更直观&…

More Effective C++ 条款16:牢记80-20准则(Remember the 80-20 Rule)

More Effective C 条款16&#xff1a;牢记80-20准则&#xff08;Remember the 80-20 Rule&#xff09;核心思想&#xff1a;软件性能优化遵循帕累托原则&#xff08;Pareto Principle&#xff09;&#xff0c;即大约80%的性能提升来自于优化20%的关键代码。识别并专注于这些关键…

Java中对泛型的理解

一、泛型是什么&#xff1f;1. 定义&#xff1a; 泛型允许你在定义类、接口或方法时使用类型参数&#xff08;Type Parameter&#xff09;。在使用时&#xff08;如声明变量、创建实例时&#xff09;&#xff0c;再用具体的类型实参&#xff08;Type Argument&#xff09; 替换…

Redis开发06:使用stackexchange.redis库结合WebAPI对redis进行增删改查

一、接口写法namespace WebApplication1.Controllers.Redis {[ApiController][Route("/api/[controller]")]public class RedisService : IRedisService{private readonly IConnectionMultiplexer _redis;//StackExchange.Redis库自带接口public RedisService(IConne…

【前端教程】从零开始学JavaScript交互:7个经典事件处理案例解析

在网页开发中&#xff0c;交互性是提升用户体验的关键。JavaScript作为网页交互的核心语言&#xff0c;通过事件处理机制让静态页面"动"了起来。本文将通过7个经典案例&#xff0c;从简单到复杂&#xff0c;逐步讲解JavaScript事件处理的实现方法和应用场景。 案例1&…

内存模型(Memory Model)是什么?

内存模型(Memory Model)是什么? 内存模型是一个非常深刻且核心的计算机科学概念。 核心摘要 内存模型是一个契约或协议,它精确定义了: 一个线程对共享内存的写操作,如何以及何时对其他线程可见。 内存操作(读/写)可以被重新排序的程度。 它连接了硬件(CPU如何执行指令…

在 MyBatis 中oracle基本数值类型的 JDBC 类型映射

基本数值类型的 JDBC 类型Java 类型JDBC 类型 (jdbcType)说明int / IntegerINTEGER标准整数类型long / LongBIGINT大整数类型short / ShortSMALLINT小整数类型float / FloatFLOAT单精度浮点数double / DoubleDOUBLE双精度浮点数java.math.BigDecimalDECIMAL高精度小数&#xff…

Spring注解演进与自动装配原理深度解析:从历史发展到自定义Starter实践

目录 Spring注解发展史 Spring 1.X Spring 2.X Spring 2.5之前 Required Repository Aspect Spring2.5 之后 Spring 3.x ComponentScan Import 静态导入 ImportSelector ImportBeanDefinitionRegistrar EnableXXX Spring 4.x Spring 5.x 什么是SPI 自动装配…