在这里插入图片描述


1.简介

Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。

2.指定消费位置

2.1.从特定偏移量开始消费

使用seek(TopicPartition partition, long offset)指定具体偏移量。

源码分析:

  • seek()方法更新消费者内部的subscriptions对象的position字段,记录目标偏移量。
  • 后续poll()时,Fetcher类根据此位置向Broker发送拉取请求。

代码示例:

consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 确保分配到分区
while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();
}
// 设置所有分区从offset=100开始消费
assignment.forEach(tp -> consumer.seek(tp, 100));

2.2.从时间戳开始消费

使用offsetsForTimes()获取时间戳对应的偏移量,再调用seek()

源码分析:

offsetsForTimes()向Broker发送ListOffsetRequest,查询满足时间戳条件的最早或最新偏移量。

代码实例:

Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 获取24小时前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});

2.3.从分区首尾消费

使用seekToBeginning()seekToEnd(),或通过beginningOffsets()/endOffsets()获取首尾偏移量后手动设置。

代码实例:

// 从分区末尾开始消费(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));

2.4.注意事项

  1. 分区分配与poll()的依赖
    seek()必须在分区分配完成后调用,否则会抛出IllegalStateException。需通过循环poll()确保分配到分区。

  2. 数据过期问题
    若指定偏移量对应的消息已被删除(如日志清理导致),seek()将失效。此时需使用beginningOffsets()获取当前最小有效偏移量。

  3. 异步提交与位移覆盖风险
    异步提交(commitAsync())失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync())保证原子性

  4. seek()方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。

3.完整代码实例

public class SeekToTimestampDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "seek-demo");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton("test-topic"));// 等待分区分配Set<TopicPartition> assignment = new HashSet<>();while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}// 获取24小时前的时间戳对应偏移量Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);// 指定位移offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) {consumer.seek(tp, offsetAndTs.offset());} else {// 处理无有效偏移量的情况(如从头开始)consumer.seekToBeginning(Collections.singleton(tp));}});while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/907369.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/907369.shtml
英文地址,请注明出处:http://en.pswp.cn/news/907369.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JS进阶】JavaScript 中 this 值的确定规则

JavaScript 中 this 值的确定规则 1. 默认绑定&#xff08;独立函数调用&#xff09; 当函数作为普通函数调用时&#xff0c;this 指向全局对象&#xff08;浏览器中是 window&#xff0c;Node.js 中是 global&#xff09;&#xff0c;严格模式下是 undefined。 function sh…

【凌智视觉模块】rv1106 部署 pp-humseg 模型

人像分割简介 ❀ 凌智视觉模块 是一款基于rv1106芯片开发的视觉模块&#xff0c;专注于视觉模型部署与开发。 人像分割是一种基于计算机视觉的技术&#xff0c;通过深度学习算法精准识别图像或视频中的人物主体&#xff0c;将其与背景进行像素级分离。该技术可实时运行于移动端…

wangeditor富文本编辑器+vue3粘贴内容样式处理

又是一个风格和日立的上午&#xff0c;某只菜鸟高高兴兴的骑着小电驴去上班&#xff0c;本着上班只要不迟到的理念飞速前行&#xff08;迟到扣钱啊~&#xff09;&#xff0c;高高兴兴的行走在路上。来到工位刚拴上我的绳子组长就开始滴滴俺&#xff0c;顿时我心中大感不妙&…

实测,大模型谁更懂数据可视化?

大家好&#xff0c;我是 Ai 学习的老章 看论文时&#xff0c;经常看到漂亮的图表&#xff0c;很多不知道是用什么工具绘制的&#xff0c;或者很想复刻类似图表。 实测&#xff0c;大模型 LaTeX 公式识别&#xff0c;出乎预料 前文&#xff0c;我用 Kimi、Qwen-3-235B-A22B、…

深度学习-梯度消失和梯度爆炸

梯度消失 在某些神经网络中&#xff0c;随着网络深度的增加&#xff0c;梯度在隐藏层反向传播时倾向于变小&#xff0c;这就意味着&#xff0c;前面隐藏层中的神经元要比后面的学习起来更慢&#xff0c;这种现象就叫做“梯度消失”&#xff1b; 梯度爆炸 如果我们进行一些特殊…

Go 语言基础 2 Func,流程控制

更多个人笔记见&#xff1a; github个人笔记仓库 gitee 个人笔记仓库 个人学习&#xff0c;学习过程中还会不断补充&#xff5e; &#xff08;后续会更新在github上&#xff09; 文章目录 Func 函数函数栈概念 函数表示类型 Anonymous func 匿名函数closure 闭包基础示例http利…

【Linux 学习计划】-- 倒计时、进度条小程序

目录 \r 、\n、fflush 倒计时 进度条 进度条进阶版 结语 \r 、\n、fflush 首先我们先来认识这三个东西&#xff0c;这将会是我们接下来两个小程序的重点之一 首先是我们的老演员\n&#xff0c;也就是回车加换行 这里面其实包含了两个操作&#xff0c;一个叫做回车&…

从零实现wss通信示例(WebSocket SSL)

客户端和服务端代码框架跟上一篇一致,仅增加了ssl的证书部分用于加密通信,明文通信(ws协议)见上一篇【https://blog.csdn.net/suoxd123/article/details/148093934】 1. 证书创建 1. 安装openssl 【官网地址】:https://slproweb.com/products/Win32OpenSSL.html 1.2 …

mysql 索引失效有哪些

InnoDB存储引擎根据索引类型不同&#xff0c;分为聚簇索引和二级索引 聚簇索引&#xff1a;叶子节点存放的是实际数据 二级索引&#xff1a;存放的是主键值&#xff0c;不是实际数据 1.对索引使用左或者左右模糊匹配 select * from t_user where name like %林‘&#xff1b…

LabVIEW通用测控平台设计

基于 LabVIEW 图形化编程环境&#xff0c;设计了一套适用于工业自动化、科研测试领域的通用测控平台。通过整合研华、NI等品牌硬件&#xff0c;实现多类型数据采集、实时控制及可视化管理。平台采用模块化架构&#xff0c;支持硬件灵活扩展&#xff0c;解决了传统测控系统开发周…

华为OD机试真题——智能驾驶(2025A卷:200分)Java/python/JavaScript/C/C++/GO最佳实现

2025 A卷 200分 题型 本专栏内全部题目均提供Java、python、JavaScript、C、C++、GO六种语言的最佳实现方式; 并且每种语言均涵盖详细的问题分析、解题思路、代码实现、代码详解、3个测试用例以及综合分析; 本文收录于专栏:《2025华为OD真题目录+全流程解析+备考攻略+经验分…

速卖通,国际站测评补单,如何平衡效率和安全

测评能够帮助卖家让平台更喜欢自己的产品&#xff0c;给予更好排名的同时也让后续进入店铺的买家更容易认可自己的产品。这是进行真实交易后形成的评价&#xff0c;而不是通过机器软件生成&#xff0c;形成虚拟数据后&#xff0c;那种刷评形式产生的评论。它符合任何电商平台的…

学习路之PHP--easyswoole3.3入门及文件热加载

学习路之PHP--easyswoole入门 一、框架说明二、常用命令三、文件热加载 一、框架说明 目录结构 目录结构 project 项目部署目录 ├─App 应用目录(可以有多个) │ ├─HttpController 控制器目录 │ │ └─Index.php …

设计模式26——解释器模式

写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用&#xff0c;主要是下面的UML图可以起到大作用&#xff0c;在你学习过一遍以后可能会遗忘&#xff0c;忘记了不要紧&#xff0c;只要看一眼UML图就能想起来了。同时也请大家多多指教。 解释器模式&#xff08;Interp…

第三届宁波技能大赛网络安全赛项样题

2025 第三届宁波技能大赛网络安全赛项样题 模块A: 网络安全事件响应、数字取证调查和应用安全任务一:应急响应任务二:操作系统取证任务三:网络数据包分析任务四:代码审计 模块B:CTF 夺旗-攻击模块C:CTF 夺旗-防御需要环境培训可以私信博主&#xff01;&#xff01;&#xff01;…

GO语言进阶:掌握进程OS操作与高效编码数据转换

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storms…

IO进程(进程 Process)

什么是进程&#xff1f; 1.概念 程序&#xff1a;编译好的可执行文件&#xff0c;存放在磁盘上的指令和数据的有序集合。 由此可见程序是静态的&#xff0c;没有执行的概念。 进程&#xff1a;是程序的一次执行的过程&#xff0c;是一个可调度的任务&#xff0c;也是执行一…

CSS传统布局与定位详解与TDK三大标签SEO优化

一、传统布局基础 1. 文档流布局 浏览器默认的文档流布局方式遵循以下规则&#xff1a; 块级元素&#xff08;如<div>、<p>、<h1>&#xff09;&#xff1a; 独占一行宽度默认100%可以设置宽高、内外边距 div {width: 500px;height: 200px;margin: 10px …

【GraphQL】深入解析 Apollo Client:从架构到实践的一站式 GraphQL 解决方案

深入解析 Apollo Client&#xff1a;从架构到实践的一站式 GraphQL 解决方案 1. 引言 GraphQL 作为现代 API 开发的核心技术&#xff0c;其灵活性和高效性正在重塑数据交互模式。Apollo Client 作为 GraphQL 生态中最受欢迎的客户端库&#xff0c;凭借强大的缓存机制、框架集…

docker学习基本使用教程

docker是一款用于开发部署和运行容器化平台&#xff0c;能将应用及其依赖打包成轻量级、可移植的容器&#xff0c;实现一次构建&#xff0c;随处运行。docker是cs架构程序&#xff08;客户端和服务端&#xff09;&#xff0c;docker客户端向docker守护进程发送请求&#xff0c;…