提供了两个MessageListenerContainer实现:

KafkaMessageListenerContainer

ConcurrentMessageListener容器

KafkaMessageListenerContainer在单个线程上接收来自所有主题或分区的所有消息。ConcurrentMessageListenerContainer委托给一个或多个KafkaMessageListenerCcontainer实例,以提供多线程消费。

从2.2.7版本开始,您可以将RecordInterceptor添加到侦听器容器中;它将在调用侦听器之前被调用,允许检查或修改记录。如果拦截器返回null,则不调用侦听器。从2.7版本开始,它具有在侦听器退出后调用的其他方法(通常,或通过抛出异常)。此外,从2.7版本开始,现在有一个BatchInterceptor,为Batch Listeners提供类似的功能。此外,ConsumerwareRecordInterceptor(和BatchInterceptor)还提供对Consumer的访问?, ?>.例如,这可用于访问拦截器中的消费者指标。

您不应该执行任何影响消费者在这些拦截器中的位置和/或已提交偏移量的方法;容器需要管理这些信息。

如果拦截器更改了记录(通过创建新记录),则主题、分区和偏移量必须保持不变,以避免记录丢失等意外副作用。

CompositeRecordInterceptor和CompositeBatchInterceptor可用于调用多个拦截器。

默认情况下,从2.8版本开始,当使用事务时,拦截器会在事务开始之前被调用。您可以将侦听器容器的interceptBeforeTx属性设置为false,以便在事务开始后调用侦听器。从2.9版本开始,这将适用于任何事务管理器,而不仅仅是KafkaAwareTransactionManagers。例如,这允许拦截器参与由容器启动的JDBC事务。

从2.3.8、2.4.6版本开始,当并发性大于1时,ConcurrentMessageListenerContainer现在支持静态成员资格。group.instance.id的后缀是-n,n从1开始。这与增加的session.timeout.ms一起可用于减少重新平衡事件,例如在重新启动应用程序实例时。

Using KafkaMessageListenerContainer

以下构造函数可用:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)

它在ContainerProperties对象中接收ConsumerFactory以及有关主题和分区以及其他配置的信息。ContainerProperties具有以下构造函数:

public ContainerProperties(TopicPartitionOffset... topicPartitions)public ContainerProperties(String... topics)public ContainerProperties(Pattern topicPattern)

第一个构造函数接受TopicPartitionOffset参数数组,以显式指示容器使用哪些分区(使用消费者assign()方法),并具有可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值相对于分区内当前最后一个偏移量。提供了一个接受额外布尔参数的TopicPartitionOffset构造函数。如果这是真的,则初始偏移(正或负)相对于该消费者的当前位置。容器启动时应用偏移量。第二个是一个主题数组,Kafka根据group.id属性分配分区 — 在整个组中分布分区。第三种使用正则表达式模式来选择主题。

要将MessageListener分配给容器,可以在创建容器时使用ContainerProps.setMessageListener方法。以下示例显示了如何执行此操作:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {...
});
DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,在创建DefaultKafkaConsumerFactory时,使用仅接受上述属性的构造函数意味着从配置中获取键和值反序列化器类。或者,反序列化器实例可以传递给DefaultKafkaConsumerFactory构造函数的键和/或值,在这种情况下,所有消费者共享相同的实例。另一种选择是提供Supplier<Deserializer>(从2.3版本开始),用于为每个Consumer获取单独的Deserializer实例:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关可以设置的各种属性的更多信息,请参阅ContainerProperties的Javadoc。

自2.1.1版本起,一个名为logContainerConfig的新属性可用。当启用true和INFO日志记录时,每个侦听器容器都会写入一条日志消息,总结其配置属性。

默认情况下,在DEBUG日志级别执行主题偏移提交的日志记录。从2.1.2版本开始,ContainerProperties中名为commitLogLevel的属性允许您指定这些消息的日志级别。例如,要将日志级别更改为INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO);。

从2.2版本开始,添加了一个名为missingTopicsFatal的新容器属性(默认值:自2.3.4以来为false)。如果代理上不存在任何配置的主题,这将阻止容器启动。如果容器配置为侦听主题模式(正则表达式),则不适用。以前,容器线程在consumer.poll()方法中循环,在记录许多消息时等待主题出现。除了日志,没有迹象表明存在问题。

自2.8版本起,引入了新的容器属性authExceptionRetryInterval。这会导致容器在从KafkaConsumer获取任何AuthenticationException或AuthorizationException后重试获取消息。例如,当配置的用户被拒绝读取某个主题或凭据不正确时,就会发生这种情况。定义authExceptionRetryInterval允许容器在授予适当权限时恢复。

默认情况下,不配置间隔-身份验证和授权错误被认为是致命的,这会导致容器停止。

从2.8版本开始,在创建消费者工厂时,如果您将反序列化器作为对象提供(在构造函数中或通过setter),工厂将调用configure()方法,使用配置属性对其进行配置。

Using ConcurrentMessageListenerContainer

单个构造函数类似于KafkaListenerContainer构造函数。以下列表显示了构造函数的签名:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)

它还具有并发属性。例如,container.setCourrency(3)创建了三个KafkaMessageListenerContainer实例。

如果容器属性是为主题(或主题模式)配置的,Kafka将使用其组管理功能在消费者之间分配分区。

在听多个主题时,默认的分区分布可能不是您所期望的。例如,如果你有三个主题,每个主题有五个分区,并且你想使用并发=15,你只会看到五个活动的消费者,每个消费者从每个主题分配一个分区,其他10个消费者处于空闲状态。这是因为默认的Kafka ConsumerPartitionAssignor是RangeAssignor(请参阅其Javadoc)。对于这种情况,您可能需要考虑使用RoundRobinAssignor,它将分区分配给所有消费者。然后,为每个消费者分配一个主题或分区。要更改ConsumerPartitionAssignor,您可以在提供给DefaultKafkaConsumerFactory的属性中设置partition.assignment.strategy消费者属性(ConsumerConfig.partition_assignment_STRATY_CONFIG)。

使用Spring Boot时,您可以按如下方式分配和设置策略:

spring.kafka.consumer.properties.分区.分配.策略=\

org.apache.kafka.clients.consumer。RoundRobin分配器

当容器属性配置了TopicPartitionOffset时,ConcurrentMessageListenerContainer会将TopicPartitionOffset实例分发到代理KafkaMessageListenerCcontainer实例中。

假设提供了六个TopicPartitionOffset实例,并发性为3;每个容器有两个分区。对于五个TopicPartitionOffset实例,两个容器获得两个分区,第三个容器获得一个分区。如果并发性大于TopicPartitions的数量,则会降低并发性,使每个容器都有一个分区。

client.id属性(如果设置)附加了-n,其中n是与并发性对应的消费者实例。这是在启用JMX时为MBean提供唯一名称所必需的。

从1.3版本开始,MessageListenerContainer提供了对底层KafkaConsumer指标的访问。在ConcurrentMessageListenerContainer的情况下,metrics()方法返回所有目标KafkaMessageListenerCcontainer实例的度量。这些指标被分组到Map<MetricName?根据为底层KafkaConsumer提供的客户端id扩展Metric>。

从2.3版本开始,ContainerProperties提供了一个idleBetweenPolls选项,让侦听器容器中的主循环在KafkaConsumer.poll()调用之间休眠。从提供的选项和max.poll.interval.ms消费者配置与当前记录批处理时间之间的差值中选择实际睡眠间隔作为最小值。

Committing Offsets

提供了几种用于提交抵消的选项。如果enable.auto.commit消费者属性为true,Kafka会根据其配置自动提交偏移量。如果为false,则容器支持多个AckMode设置(如下表所述)。默认的确认模式为批处理。从2.3版本开始,除非在配置中明确设置,否则框架将enable.auto.commit设置为false。以前,如果未设置属性,则使用Kafka默认值(true)。

Consumerpoll()方法返回一个或多个ConsumerRecords。每条记录都会调用MessageListener。以下列表描述了容器对每个AckMode(未使用事务时)采取的操作:

RECORD:在监听器处理记录后返回时提交偏移量。

批处理:当poll()返回的所有记录都已处理完毕时,提交偏移量。

TIME:在poll()返回的所有记录都已处理完毕时提交偏移量,只要超过了自上次提交以来的ackTime。

COUNT:在poll()返回的所有记录都已处理完毕时提交偏移量,只要自上次提交以来已收到ackCount记录即可。

COUNT_TIME:类似于TIME和COUNT,但如果任一条件为真,则执行提交。

手册:消息监听器负责确认()确认。之后,应用与BATCH相同的语义。

MANUAL_IMMEDIATE:当侦听器调用Acknowledgment.reacknowe()方法时,立即提交偏移量。

使用事务时,偏移量被发送到事务,语义等效于RECORD或BATCH,具体取决于侦听器类型(记录或批处理)。

MANUAL和MANUAL_IMMEDIATE要求侦听器是AcknowledgingMessageListener或BatchAcknowledingMessageListener。请参阅消息侦听器。

根据syncCommits容器属性,使用消费者上的commitSync()或commitSync(()方法。syncCommits默认为true;另请参见setSyncCommitTimeout。查看setCommitCallback以获取异步提交的结果;默认回调是LoggingCommitCallback,它记录错误(以及调试级别的成功)。

因为监听器容器有自己的偏移提交机制,所以它更喜欢Kafka ConsumerConfig。ENABLE_AUTO_COMIT_CONFIG为false。从2.3版本开始,它无条件地将其设置为false,除非在消费者工厂中特别设置或容器的消费者属性重写。

确认书有以下方法:

public interface Acknowledgment {void acknowledge();}

此方法使侦听器可以控制何时提交偏移量。

从2.3版本开始,Acknowledgment接口有两个额外的方法nack(长睡眠)和nack(int index,长睡眠)。第一个用于记录侦听器,第二个用于批处理侦听器。为您的侦听器类型调用错误的方法将引发IllegalStateException。

如果你想使用nack()提交部分批处理,在使用事务时,将AckMode设置为MANUAL;调用nack()将成功处理的记录的偏移量发送给事务。

nack()只能在调用监听器的消费者线程上调用。

使用无序提交时不允许使用nack()。

使用记录侦听器,当调用nack()时,任何挂起的偏移都会被提交,最后一次轮询的剩余记录会被丢弃,并在它们的分区上执行查找,以便在下一次轮询()时重新传递失败的记录和未处理的记录。通过设置sleep参数,可以在重新交付之前暂停消费者。这与在容器配置了DefaultErrorHandler时抛出异常的功能类似。

nack()在指定的睡眠持续时间内暂停整个侦听器,包括所有分配的分区。

使用批侦听器时,可以指定发生故障的批中的索引。当调用nack()时,在对失败和丢弃的记录的分区执行索引和查找之前,将为记录提交偏移量,以便在下一次poll()时重新传递它们。

有关更多信息,请参阅容器错误处理程序。

消费者在睡眠期间暂停,以便我们继续轮询经纪人以保持消费者的活力。实际睡眠时间及其分辨率取决于容器的pollTimeout,默认值为5秒。最小睡眠时间等于pollTimeout,所有睡眠时间都是它的倍数。对于较小的睡眠时间,或者为了提高其准确性,可以考虑减少容器的pollTimetime。

从3.0.10版本开始,批处理监听器可以使用Acknowledgment参数上的confirm(index)来提交批处理部分的偏移量。调用此方法时,将提交索引处记录的偏移量(以及所有以前的记录)。在执行部分批处理提交后调用confirmate()将提交批处理剩余部分的偏移量。以下限制适用:

确认模式。需要立即手动

必须在侦听器线程上调用该方法

侦听器必须使用List而不是原始ConsumerRecords

索引必须在列表元素的范围内

索引必须大于前一次调用中使用的索引

这些限制被强制执行,该方法将根据违规情况抛出IllegalArgumentException或IllegalStateException。

Listener Container Auto Startup

侦听器容器实现SmartLifecycle,默认情况下autoStartup为true。容器在后期阶段(Integer.MAX-VALUE-100)启动。实现SmartLifecycle以处理来自侦听器的数据的其他组件应在早期阶段启动。-100为后续阶段留出了空间,使组件能够在容器后自动启动。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/87882.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/87882.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/87882.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JDBC 注册驱动的常用方法详解

JDBC 注册驱动的常用方法详解 在 JDBC 中&#xff0c;注册驱动是建立数据库连接的第一步。以下是几种常用的驱动注册方式&#xff1a; 1. 显式类加载&#xff08;传统方式&#xff09; // 通过 Class.forName() 加载驱动类 Class.forName("com.mysql.cj.jdbc.Driver&qu…

插入数据优化

目录 一.插入数据优化 1.insert语句优化 ①批量插入 ②手动提交事务 ③主键顺序插入 2.大批量插入数据&#xff08;100万条&#xff09; 举例 第一步&#xff1a;连接数据库时&#xff0c;加上--local-infile属性 第二步&#xff1a;查看全局参数local_infile的值&…

区块链在域名系统安全中的应用进展综述

一、区块链与DNS结合的核心原理1.1 传统DNS的安全缺陷中心化架构&#xff1a;传统DNS依赖中心化服务器&#xff08;如ICANN管理的根服务器&#xff09;&#xff0c;存在单点故障风险&#xff0c;易受DDoS攻击或配置错误影响。协议脆弱性&#xff1a;DNS协议设计之初缺乏加密和认…

GO Web 框架 Gin 完全解析与实践

目录 1. 为什么选择 Gin?解锁 Go Web 开发的超能力 Gin 的核心优势 什么时候用 Gin? 第一个 Hello World 2. 路由的艺术:从简单 GET 到复杂匹配 基础路由 高级路由技巧 性能优化小贴士 3. 中间件的魔法:让请求处理更聪明 内置中间件 自定义中间件 中间件的最佳实…

RabbitMQ使用topic Exchange实现微服务分组订阅

案例场景&#xff1a;用户下单后需要多个微服务&#xff08;如营销、会员&#xff09;分别订阅并处理订单事件&#xff0c;且每个微服务可能有多个集群实例&#xff0c;需要保证同一个微服务的集群中&#xff0c;只有一个实例消费到消息。不同于Kafka和rocketMQ有分组消费的功能…

kotlin 通道trysend方法

trySend 方法是 Kotlin 协程中 Channel 类的一个重要功能。它用于向通道发送元素&#xff0c;但与 send 方法不同的是&#xff0c;trySend 是非阻塞的。这意味着它不会在通道满时挂起当前协程&#xff0c;而是会立即返回。 trySend 方法的效果 非阻塞行为&#xff1a; 当你调用…

winform CheckedListBox单击选中解决方案

在WinForms的CheckedListBox控件中&#xff0c;默认需要双击才能切换选中状态&#xff08;复选框勾选&#xff09;。要实现单击即选中&#xff0c;需要通过代码处理鼠标点击事件并手动切换选中状态。以下是实现步骤&#xff1a; 1.CheckOnClick属性置为true即可。 2.通过事件处…

Docker文件操作、数据卷、挂载

一&#xff1a;容器文件操作 在Docker环境中&#xff0c;管理容器内部的文件是一个常见的需求。 无论是为了配置应用、备份数据还是调试问题&#xff0c;了解如何高效地进行文件操作都是非常重要的。 docker cp命令提供了一种简单的方法来在宿主主机和容器之间复制文件或目录…

接口漏洞怎么抓?Fiddler 中文版 + Postman + Wireshark 实战指南

接口安全是现代应用开发中的高危环节&#xff1a;一旦API存在未授权访问、参数篡改、权限绕过等漏洞&#xff0c;可能直接导致用户信息泄露、资金损失甚至整个平台瘫痪。对于开发和安全人员来说&#xff0c;光依赖后端日志排查远远不够&#xff0c;需要对接口进行主动安全性验证…

iOS 出海 App 安全加固指南:无源码环境下的 IPA 加固与防破解方法

随着越来越多国内开发团队将iOS App推向海外市场&#xff0c;如何在交付和分发环节保护应用安全成为出海过程中的重要议题。尤其是App进入多个海外应用商店或通过第三方渠道发行时&#xff0c;容易被当地黑产或竞争对手进行逆向分析&#xff0c;从而暴露内部API、核心业务流程等…

React Hooks 内部实现原理与函数组件更新机制

React Hooks 内部实现原理与函数组件更新机制 Hooks 的内部实现原理 React Hooks 的实现依赖于以下几个关键机制&#xff1a; 1. 链表结构存储 Hook 状态 React 使用单向链表来管理 Hooks 的状态。每个 Hook 节点包含&#xff1a; type Hook {memoizedState: any, // 存储…

分布式会话的演进和最佳实践,含springBoot 实现(Java版本)

一、分布式会话的背景 在微服务架构或集群部署环境下&#xff0c;请求可能落在不同的服务器节点&#xff0c;无法再依赖本地内存来维护用户 Session。因此&#xff0c;需要一种跨节点共享 Session 的机制&#xff0c;这就是 分布式会话管理的核心目标。二、分布式会话的演进历程…

ch03 部分题目思路

G. 收集 由于稀有度相同的物品需要一起处理&#xff0c;我们先把他们聚集到一起。 类似这样&#xff1a; vector<int> g[maxn]; ... {cin >> x >> c;g[c].push_back(x); }那么我们需要一个贪心的思路&#xff1a; 肯定是按 ccc 从小往大收集的&#xff1b;对…

Django多表查询(ORM)

1、建立表结构 三个表&#xff1a;book、Author、publisher。 书籍和作者是多对多的关系&#xff0c;一本书可以有多个作者&#xff0c;一个作者可以有多本书。 出版社和书籍是一对多的关系&#xff0c;一个出版社可以出版多本书&#xff08;多方&#xff0c;多方定义外键&…

C# 集合表达式和展开运算符 (..) 详解

集合表达式 (Collection Expressions)基本语法支持的集合类型展开运算符 (..)基本用法实际应用示例创建新集合合并集合与现有API结合性能考虑高级用法多维集合自定义集合注意事项与传统方式的比较总结集合表达式 (Collection Expressions) C# 12 引入了集合表达式&#xff0c;…

数学视频动画引擎Python库 -- Manim Voiceover 安装 Installation

文中内容仅限技术学习与代码实践参考&#xff0c;市场存在不确定性&#xff0c;技术分析需谨慎验证&#xff0c;不构成任何投资建议。 Manim Voiceover 是一个为 Manim 打造的专注于语音旁白的插件&#xff1a; 直接在 Python 中添加语音旁白&#xff1a; 无需使用视频编辑器&…

Git安装避坑指南:新手村通关秘籍

Git安装避坑指南&#xff1a;新手村通关秘籍 刚学编程那会儿&#xff0c;Git安装差点让我砸键盘。满心欢喜打开官网下载&#xff0c;结果卡在配置上&#xff0c;命令行死活不认识git命令。看着教程里别人行云流水的操作&#xff0c;自己对着报错信息干瞪眼——这感觉&#xff…

如何修改Siteground max_execution_time值?

这个值在Siteground 上是修改不了的。 以下是来自Siteground 官网的解释&#xff1a; 由于服务器上全局定义的 PHP 限制&#xff0c;某些 PHP 设置无法更改。最常见的无法更改的 PHP 设置包括&#xff1a; memory_limit max_execution_time max_input_time post_max_size up…

【libm】 11 fmin函数 (fmin.rs)

一、源码 这段代码实现了一个符合 IEEE 754-2008 标准的 minNum 函数&#xff08;在 Rust 中命名为 fmin&#xff09;&#xff0c;该功能在 IEEE 754-2019 标准中已被 minimumNumber 取代。 /* SPDX-License-Identifier: MIT OR Apache-2.0 */ //! IEEE 754-2008 minNum. Thi…

React 英语单词消消乐一款专为英语学习设计的互动式记忆游戏

&#x1f4d6; 项目简介 英语单词消消乐 是一款专为英语学习设计的互动式记忆游戏。通过经典的消消乐玩法&#xff0c;让用户在轻松愉快的游戏中掌握英语单词&#xff0c;提高词汇量和记忆效果。 &#x1f3af; 项目目标 让英语学习变得有趣且高效通过游戏化方式增强单词记忆…