@KafkaListener 是 Spring Kafka 提供的一个核心注解,用于标记一个方法作为 Kafka 消息的消费者。下面是对该注解的详细解析:

基本用法

@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {System.out.println("Received Message: " + message);
}

主要属性

1. 必需属性

  • topics / topicPattern:指定监听的 topic
    • topics:逗号分隔的 topic 列表
    • `topicPattern**:使用正则表达式匹配 topic
@KafkaListener(topics = "topic1,topic2")
// 或
@KafkaListener(topicPattern = "test.*")

2. 消费者配置

  • groupId:指定消费者组 ID
  • containerFactory:指定使用的 KafkaListenerContainerFactory
@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "myFactory")

3. 消息处理

  • id:为监听器指定唯一 ID
  • concurrency:设置并发消费者数量
@KafkaListener(id = "myListener", topics = "myTopic", concurrency = "3")

4. 高级配置

  • containerGroup:指定容器组(Spring Kafka 2.5+)
  • errorHandler:指定错误处理器
  • idIsGroup:是否使用监听器 ID 作为组 ID(默认 false)

消息处理方法签名

监听器方法可以接受多种形式的参数:

  1. 简单消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(String message) { ... }
    
  2. 带元数据的消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> record) { ... }
    
  3. 批量消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(List<String> messages) { ... }
    
  4. 带确认的消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(String message, Acknowledgment ack) {// 处理消息后手动确认ack.acknowledge();
    }
    

配置选项

可以通过 @KafkaListenercontainerFactory 属性引用自定义配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;
}@KafkaListener(topics = "myTopic", containerFactory = "myFactory")
public void listen(String message) { ... }

错误处理

可以通过以下方式处理错误:

  1. 配置错误处理器

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setErrorHandler(new SeekToCurrentErrorHandler());return factory;
    }
    
  2. 使用 @SendTo 发送到死信队列

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    @SendTo("myDltTopic")
    public String listen(String message) {// 处理失败时返回错误消息return "error";
    }
    

注意事项

  1. 监听器方法应该是 public 的
  2. 避免在监听器方法中执行长时间运行的操作
  3. 考虑消息处理的幂等性
  4. 对于批量处理,确保方法参数是 List 类型
  5. 在 Spring Boot 中,许多配置可以通过 application.properties/yml 设置

完整示例

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}
}@Service
public class KafkaMessageListener {@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "kafkaListenerContainerFactory")public void listen(String message, Acknowledgment ack) {try {System.out.println("Received Message: " + message);// 业务处理逻辑ack.acknowledge();} catch (Exception e) {// 错误处理}}
}

@KafkaListener 注解提供了灵活的方式来消费 Kafka 消息,开发者可以根据具体需求进行配置和扩展。

ConcurrentKafkaListenerContainerFactory详解

在Spring Kafka中,ConcurrentKafkaListenerContainerFactory是一个核心配置类,用于创建并发消息监听容器,支持多线程消费Kafka消息,以下是其详细介绍:

1、核心作用

  1. 并发消费支持:通过创建多个KafkaMessageListenerContainer实例(每个对应一个线程),实现多线程并发消费消息。例如设置concurrency=3会创建3个消费者线程,每个线程处理分配到的分区。
  2. 线程安全保障:生成的ConcurrentMessageListenerContainer内部委托给多个单线程的KafkaMessageListenerContainer实例,保证线程安全性(Kafka Consumer本身非线程安全)。

2、关键特性

  1. 并发度配置

    • 通过setConcurrency()方法设置并发消费者数量,可提高消息处理速度和吞吐量。
    • 配置规则为concurrency<=分区数/应用实例数,设置过多会导致线程闲置。
  2. 批量处理支持

    • 通过setBatchListener(true)启用批量消费
    • 配合MAX_POLL_RECORDS_CONFIG参数控制单次poll最大返回记录数
  3. 错误处理机制

    • 可配置自定义错误处理器(如SeekToCurrentErrorHandler
    • 支持重试策略集成
  4. 分区分配控制

    • 可自定义分区分配逻辑
    • 配合group.id实现消费者组协调

3、配置示例

@Configuration
@EnableKafka
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setBatchListener(true); // 启用批量消费factory.getContainerProperties().setPollTimeout(3000); // 设置轮询超时factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 设置错误处理器return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量消费配置return new DefaultKafkaConsumerFactory<>(props);}
}

4、使用场景

  1. 高吞吐量需求:通过增加并发消费者数量提升处理能力
  2. 批量数据处理:需要批量处理消息的场景
  3. 复杂错误处理:需要自定义错误处理逻辑的场景
  4. 多主题监听:需要同时监听多个主题的场景

5、注意事项

  1. 顺序性问题:并发消费可能导致消息顺序混乱,需业务保证
  2. 重复处理问题:需实现幂等性处理机制
  3. 数据库访问:需注意并发访问控制
  4. 资源限制:并发度设置需考虑系统资源限制

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/908728.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/908728.shtml
英文地址,请注明出处:http://en.pswp.cn/news/908728.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多区域协同的异地多活AI推理服务架构

&#x1f310;多区域协同的异地多活AI推理服务架构 #mermaid-svg-TTnpRKKC7k3twxhE {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-TTnpRKKC7k3twxhE .error-icon{fill:#552222;}#mermaid-svg-TTnpRKKC7k3twxhE .er…

极客时间:在 Google Colab 上尝试 Prefix Tuning

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Android设备推送traceroute命令进行网络诊断

文章目录 工作原理下载traceroute for android推送到安卓设备执行traceroutetraceroute www.baidu.com Traceroute&#xff08;追踪路由&#xff09; 是一个用于网络诊断的工具&#xff0c;主要用于追踪数据包从源主机到目标主机所经过的路由路径&#xff0c;以及每一跳&#x…

【Linux应用】Linux系统日志上报服务,以及thttpd的配置、发送函数

【Linux应用】Linux系统日志上报服务&#xff0c;以及thttpd的配置、发送函数 文章目录 thttpd服务安装thttpd配置thttpd服务thttpd函数日志效果和文件附录&#xff1a;开发板快速上手&#xff1a;镜像烧录、串口shell、外设挂载、WiFi配置、SSH连接、文件交互&#xff08;RADX…

Linux 内核内存管理子系统全面解析与体系构建

一、前言: 为什么内存管理是核心知识 内存管理是 Linux 内核最核心也最复杂的子系统之一&#xff0c;其作用包括&#xff1a; 为软件提供独立的虚拟内存空间&#xff0c;实现安全隔离分配/回收物理内存资源&#xff0c;维持系统稳定支持不同类型的内存分配器&#xff0c;最优…

鼠标的拖动效果

1、变量的设置 let isDragging false; let startX; let startY&#xff1b; let endX; let endY; let box null;isDragging : 表示是否推拽startX、startY&#xff1a;表示起始坐标&#xff0c;相对于元素endX、endY&#xff1a;表示结束坐标&#xff0c;相对于元素box&…

SwaggerFuzzer:一款自动化 OpenAPI/Swagger 接口未授权访问测试工具

SwaggerFuzzer &#x1f310; 一款自动化 OpenAPI/Swagger 接口未授权访问测试工具&#x1f680; 工具介绍&#xff1a;SwaggerFuzzer✨ 核心功能亮点&#x1f680; 快速使用&#x1f9f0; 支持参数 &#x1f4cc; 项目结构&#x1f4e5; 获取与下载 &#x1f310; 一款自动化 …

文献阅读:Exploring Autoencoder-based Error-bounded Compression for Scientific Data

目录 论文简介动机&#xff1a;为什么作者想要解决这个问题&#xff1f;贡献&#xff1a;作者在这篇论文中完成了什么工作(创新点)&#xff1f;规划&#xff1a;他们如何完成工作&#xff1f;离线训练阶段&#xff1a;在线压缩阶段 理由&#xff1a;通过什么实验验证它们的工作…

【业务框架】3C-相机-Cinemachine

概述 插件&#xff0c;做相机需求&#xff0c;等于相机老师傅多年经验总结的工具 Feature Transform&#xff1a;略Control Camera&#xff1a;控制相机参数Noise&#xff1a;增加随机性Blend&#xff1a;CameraBrain的混合列表指定一个虚拟相机到另一个相机的过渡&#xff…

设计一个算法:删除非空单链表L中结点值为x的第一个结点的前驱结点

目录 单链表的存储结构定义如下 快慢指针法 三指针法版本① 三指针法版本② 单链表的存储结构定义如下 typedef struct{Elemtype data;struct Node* next; }LNode,*LinkList; 快慢指针法 void deleteprex(LinkList L, Elemtype e) {if (L NULL || L->next NULL ||…

【Qt】:设置新建类模板

完整的头文件模板 #ifndef %FILENAME%_H #define %FILENAME%_H/*** brief The %CLASSNAME% class* author %USER%* date %DATE%*/ class %CLASSNAME% { public:%CLASSNAME%();~%CLASSNAME%();// 禁止拷贝构造和赋值%CLASSNAME%(const %CLASSNAME%&) delete;%CLASSNAME%&a…

​**​CID字体​**​ 和 ​**​Simple字体​**​

在PDF中&#xff0c;字体类型主要分为 ​​CID字体​​ 和 ​​Simple字体​​ 两大类&#xff0c;它们的主要区别在于编码方式和适用场景。以下是它们的详细对比&#xff1a; ​​1. CID字体&#xff08;CID-keyed Fonts&#xff09;​​ CID&#xff08;Character Identifie…

计组_导学

2025.05.31:老汤讲408计组学习笔记 导学 第1章计算机系统概述:对计算机系统有全局的认识第2章总线系统:简单且独立,不会依赖其他内容,它是被依赖的第3章主存储器:只有了解主存储器的内部结构,才能理解在主存中是如何存储二进制的第4章数据的表示与运算:各种编码以及计算…

【GPT模型训练】第二课:张量与秩:从数学本质到深度学习的基础概念解析

这里写自定义目录标题 张量&#xff08;Tensor&#xff09;的定义关键特点&#xff1a;示例&#xff1a; 张量的秩&#xff08;Rank&#xff09;示例&#xff1a;“秩”的拼音常见混淆点 总结 张量&#xff08;Tensor&#xff09;的定义 在数学和物理学中&#xff0c;张量是一…

RabbitMQ work模型

Work 模型是 RabbitMQ 最基础的消息处理模式&#xff0c;核心思想是 ​​多个消费者竞争消费同一个队列中的消息​​&#xff0c;适用于任务分发和负载均衡场景。同一个消息只会被一个消费者处理。 当一个消息队列绑定了多个消费者&#xff0c;每个消息消费的个数都是平摊的&a…

【Linux操作系统】基础开发工具(yum、vim、gcc/g++)

文章目录 Linux软件包管理器 - yumLinux下的三种安装方式什么是软件包认识Yum与RPMyum常用指令更新软件安装与卸载查找与搜索清理缓存与重建元数据 yum源更新1. 备份现有的 yum 源配置2. 下载新的 repo 文件3. 清理并重建缓存 Linux编辑器 - vim启动vimVim 的三种主要模式常用操…

73常用控件_QFormLayout的使用

目录 代码⽰例: 使⽤ QFormLayout 创建表单. 除了上述的布局管理器之外, Qt 还提供了 QFormLayout , 属于是 QGridLayout 的特殊情况, 专 ⻔⽤于实现两列表单的布局. 这种表单布局多⽤于让⽤⼾填写信息的场景. 左侧列为提⽰, 右侧列为输⼊框 代码⽰例: 使⽤ QFormLayout 创…

兰亭妙微 | 医疗软件的界面设计能有多专业?

从医疗影像系统到手术机器人控制界面&#xff0c;从便携式病原体检测设备到多平台协同操作系统&#xff0c;兰亭妙微为众多医疗设备研发企业&#xff0c;打造了兼具专业性与可用性的交互界面方案。 我们不仅做设计&#xff0c;更深入理解医疗场景的实际需求&#xff1a; 对精…

鸿蒙开发修改版本几个步骤

鸿蒙开发修改版本几个步骤 比如&#xff1a;5.0.4&#xff08;16&#xff09;版本改为5.0.2&#xff08;14&#xff09;版本 一、项目下的build-profile.json5 "products": [{"name": "default","signingConfig": "default&qu…