一、环境版本

环境版本
Flink1.17.0
Kafka2.12
MySQL5.7.33

二、MySQL建表脚本

create table user_log
(id      int auto_increment comment '主键'primary key,uid     int    not null comment '用户id',event   int    not null comment '用户行为',logtime bigint null comment '日志时间'
)comment '用户日志表,作为验证数据源';

三、用户日志类

新建maven项目

用以定义Kafka和MySQL中Schema

/*** 用户日志类*/
@Data
public class UserLog {//用户uidprivate int uid;//用户行为private int event;//日志时间private Date logtime;
}

四、用户数据生成器

/*** 用户数据生成器*/
public class UserLogGenerator {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2.自定义数据生成器SourceDataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(// 指定GeneratorFunction 实现类new GeneratorFunction<Long, UserLog>(){// 定义随机数数据生成器public RandomDataGenerator generator;@Overridepublic void open(SourceReaderContext readerContext) throws Exception {generator = new RandomDataGenerator();}@Overridepublic UserLog map(Long aLong) throws Exception {UserLog userLog = new UserLog();//随机生成用户uiduserLog.setUid(generator.nextInt(1, 100000));//随机生成用户行为userLog.setEvent(generator.nextInt(1, 2));//随机生成用户数据时间userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND, generator.nextInt(-2000, 2000)));return userLog;}},// 指定输出数据的总行数60 * 60 * 10,// 指定每秒发射的记录数RateLimiterStrategy.perSecond(10),// 指定返回值类型, 将Java的StockPrice封装成到TypeInformationTypeInformation.of(UserLog.class));DataStreamSource<UserLog> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");//输出生成数据
//        dataGeneratorSourceStream.print();//kafka数据写入KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder().setBootstrapServers("hadoop01:9092").setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder().setTopic("userLog").setValueSerializationSchema((SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes()).build()).build();dataGeneratorSourceStream.sinkTo(kafkaSink);//MySQL数据写入,用以数据验证SinkFunction<UserLog> jdbcSink = JdbcSink.sink("insert into user_log (uid, event, logtime) values (?, ?, ?)",new JdbcStatementBuilder<UserLog>() {@Overridepublic void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {preparedStatement.setInt(1, userLog.getUid());preparedStatement.setInt(2, userLog.getEvent());preparedStatement.setLong(3, userLog.getLogtime().getTime());}},JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/demo").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("你的用户名").withPassword("你的密码").build());dataGeneratorSourceStream.addSink(jdbcSink);env.execute();}
}

五、TableAPI 10秒钟内用户的访问量

/*** 10秒钟内用户的访问量*/
public class UserLogCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);env.setParallelism(1);//1.定义table的schemafinal Schema schema = Schema.newBuilder().column("uid", DataTypes.INT()).column("event", DataTypes.INT()).column("logtime", DataTypes.BIGINT())//将logtime转换为flink使用的timsstamp格式.columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(logtime, 3)")//定义水位线.watermark("rowtime", "rowtime - INTERVAL '5' SECOND").build();//2.创建Kafka source tabletableEnv.createTable("user_log", TableDescriptor.forConnector("kafka").schema(schema).format("json")
//                .option("json.timestamp-format.standard", "ISO-8601").option("json.ignore-parse-errors", "true").option("topic", "userLog").option("properties.bootstrap.servers", "hadoop01:9092").option("scan.startup.mode", "latest-offset").build());//3.创建一个滚动窗口表Table pvTable = tableEnv.from("user_log")//定义一个10秒钟的滚动窗口.window(Tumble.over(lit(10).seconds()).on($("rowtime")).as("w")).groupBy($("w")).select($("w").start().as("w_start"),$("w").end().as("w_end"),//$("uid").count().distinct().as("uv")),$("uid").count().as("pv"));pvTable.execute().print();}
}

六、数据验证

  1. 启动 UserLogGenerator
  2. 启动 UserLogCount
+----+-------------------------+-------------------------+----------------------+
| op |                 w_start |                   w_end |                   pv |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-08-11 15:11:50.000 | 2025-08-11 15:12:00.000 |                   10 |
| +I | 2025-08-11 15:12:00.000 | 2025-08-11 15:12:10.000 |                   95 |
| +I | 2025-08-11 15:12:10.000 | 2025-08-11 15:12:20.000 |                  104 |
| +I | 2025-08-11 15:12:20.000 | 2025-08-11 15:12:30.000 |                  104 |
| +I | 2025-08-11 15:12:30.000 | 2025-08-11 15:12:40.000 |                   94 |
| +I | 2025-08-11 15:12:40.000 | 2025-08-11 15:12:50.000 |                  104 |
| +I | 2025-08-11 15:12:50.000 | 2025-08-11 15:13:00.000 |                   96 |
| +I | 2025-08-11 15:13:00.000 | 2025-08-11 15:13:10.000 |                  100 |
  1. 在MySQL中验证查询

选取数据

+----+-------------------------+-------------------------+----------------------+
| op |                 w_start |                   w_end |                   pv |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-08-11 15:12:50.000 | 2025-08-11 15:13:00.000 |                   96 

转换时间戳

时间戳转换前转换后
w_start2025-08-11 15:12:50.0001754896370000
w_end2025-08-11 15:13:00.0001754896380000

MySQL中查询

# 输出96与Flink结果一致
select count(*) 
from user_log 
where logtime>= 1754896370000 and logtime < 1754896380000;

七、POM文件

<project><groupId>dblab</groupId><artifactId>demo</artifactId><modelVersion>4.0.0</modelVersion><name> </name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>central-repos</id><name>Central Repository</name><url>http://repo.maven.apache.org/maven2</url></repository><repository><id>alimaven</id><name>aliyun maven</name><url>https://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories><properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.flink</groupId>-->
<!--      <artifactId>flink-connector-files</artifactId>-->
<!--      <version>${flink.version}</version>-->
<!--    </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.flink</groupId>-->
<!--      <artifactId>flink-connector-files</artifactId>-->
<!--      <version>${flink.version}</version>-->
<!--    </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.flink</groupId>-->
<!--      <artifactId>flink-csv</artifactId>-->
<!--      <version>${flink.version}</version>-->
<!--    </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version><scope>provided</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.39</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

八、常见问题

8.1 未定义水位线

Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:327)at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:307)at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:300)at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:265)at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:262)at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:641)at UserLogCount.main(UserLogCount.java:42)

当TableAPI中未定义水位线时,会导致Flink无法识别窗口的时间戳

//定义水位线
.watermark("rowtime", "rowtime - INTERVAL '5' SECOND")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/93295.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/93295.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/93295.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

18.13 《3倍效率提升!Hugging Face datasets.map高级技巧实战指南》

3倍效率提升!Hugging Face datasets.map高级技巧实战指南 实战项目:使用 datasets.map 进行高级数据处理 在大模型训练过程中,数据预处理的质量直接决定了模型最终的表现。Hugging Face Datasets 库提供的 datasets.map 方法是处理复杂数据场景的瑞士军刀,本章将深入解析…

实体店获客新引擎:数据大集网如何破解传统门店引流难题

在商业竞争日益激烈的当下&#xff0c;实体店的生存与发展正面临前所未有的挑战。无论是街边的小型便利店&#xff0c;还是大型购物中心的连锁品牌&#xff0c;都在为"如何吸引顾客进店"而绞尽脑汁。传统广告投放效果不佳、线下流量持续萎缩、客户转化率难以提升………

LeetCode 分类刷题:2302. 统计得分小于 K 的子数组数目

题目一个数组的 分数 定义为数组之和 乘以 数组的长度。比方说&#xff0c;[1, 2, 3, 4, 5] 的分数为 (1 2 3 4 5) * 5 75 。给你一个正整数数组 nums 和一个整数 k &#xff0c;请你返回 nums 中分数 严格小于 k 的 非空整数子数组数目。子数组 是数组中的一个连续元素序…

TDengine IDMP 基本功能(1.界面布局和操作)

UI 布局和操作说明 TDengine IDMP 的用户界面&#xff08;UI&#xff09;设计旨在提供直观、易用的操作体验。下面介绍 UI 的主要区域和典型操作&#xff1a; 主要区域 IDMP 的用户界面是完全基于浏览器的。登录后的典型 UI 界面具有几个区域&#xff1a; 主菜单&#xff1a;AI…

QT(概述、基础函数、界面类、信号和槽)

一、概述1、QTQT是一个c的第三方库&#xff0c;是专门用来进行界面编程的一个库 1. QT本身实现了多种软件&#xff1a; 2. ubuntu系统中所有界面都是QT做的 3. 最新版本的QQ也是QT做的 4. 嵌入式编程中&#xff0c;几乎所有的上位机&#xff0c;都可以使用QT来做 QT本身除了实现…

【从零开始java学习|第六篇】运算符的使用与注意事项

目录 一、算术运算符 1. 基本算术运算符&#xff08;二元&#xff09; 2. 自增 / 自减运算符&#xff08;一元&#xff09; 二、类型转换&#xff08;隐式与强制&#xff09; 1. 隐式转换&#xff08;自动类型转换&#xff09; ​编辑 2. 强制转换&#xff08;显式类型转…

shellgpt

一、介绍 官网&#xff1a;https://github.com/TheR1D/shell_gpt ShellGPT&#xff08;shell_gpt&#xff09; 是一款把 GPT 系列大模型能力直接搬到终端 的开源命令行生产力工具。用日常英语或中文描述需求&#xff0c;就能帮你 生成、解释甚至自动执行 Shell 命令&#xff…

geoserver sql视图调用Postgis自定义函数问题记录

一、问题描述&#xff1a;geoserver sql视图调用Postgis自定义函数对点图层增加一条记录时&#xff0c;返回结果主键自增ID加了2&#xff0c;但表中数据只增加一条记录。 但在pgAdmin中直接写SQL调用Postgis自定义函数对点图层增加一条记录时&#xff0c;返回结果主键自增ID只加…

#T1224. 最大子矩阵

题目传送 题目描述 已知矩阵的大小定义为矩阵中所有元素的和。给定一个矩阵&#xff0c;你的任务是找到最大的非空(大小至少是11)子矩阵。 比如&#xff0c;如下44的矩阵 0 -2 -7 09 2 -6 2 -4 1 -4 1-1 8 0 -2的最大子矩阵是 9 2-4 1-1 8这…

2025年大模型安全岗的面试汇总(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 1. Transformer核心机制及其对LLM突破的基石作用 2. LLM能力边界评估框架设计 3. 模型层级安全风险分析 …

《关于省级政务云服务费支出预算标准的规定》豫财预〔2024〕106号解读

《关于省级政务云服务费支出预算标准的规定》豫财预〔2024〕106号文件由河南省财政厅编制经省政府同意后于2024年12月3日印发执行&#xff0c;规定作为省级政务云服务费支出预算编制和审核的依据&#xff0c;旨在加强省级部门预算管理&#xff0c;规范政务云服务费支出预算编制…

使用HalconDotNet实现异步多相机采集与实时处理

文章目录 一、核心功能与原理 功能目标: 工作原理: 关键机制: 二、完整C#实现代码 三、关键实现解析 1. 零拷贝图像传输 2. 动态帧率控制 3. HALCON并行优化 4. 异常隔离机制 四、高级优化策略 1. 硬件加速配置 2. 内存池管理 3. 实时性保障 一、核心功能与原理 功能目标:…

《疯狂Java讲义(第3版)》学习笔记ch4

ch4流程控制与数组1.switch语句后的expression表达式的数据类型只能是byte、short、char、int四种证书类型。2.建议不要在循环体内修改循环变量&#xff08;也叫循环计数器&#xff09;的值&#xff0c;否则会增加程序出错的可能性。3.定义数组推荐语法格式&#xff1a;type[] …

COLMAP进行密集重建,三维重建的步骤

密集重建是在稀疏重建的基础上进行的 稀疏重建见&#xff1a;用 COLMAP GUI 在 Windows 下一步步完成 相机位姿估计&#xff08;SfM&#xff09; 和 稀疏点云重建的详细步骤&#xff1a;_colmap database导入图片位姿-CSDN博客 完成稀疏重建后直接进入以下步骤进行密集重建&am…

基于飞算JavaAI实现Reactor模式服务器的深度实践

一、飞算JavaAI技术概述 1.1 飞算JavaAI平台简介飞算JavaAI是飞算科技推出的智能化Java开发平台&#xff0c;通过AI技术赋能传统软件开发流程&#xff0c;为开发者提供从需求分析到代码实现的全流程智能化解决方案。该平台深度融合了人工智能技术与软件开发实践&#xff0c;具备…

量子人工智能

量子人工智能&#xff08;QAI&#xff09;是量子计算与人工智能的强大融合。这一领域旨在将量子系统独特的计算能力与人工智能的模式识别和学习能力相结合&#xff0c;以更快、更高效地解决问题。 量子人工智能与常规人工智能的区别是什么&#xff1f;常规人工智能在经典计算机…

算法题Day1

1. 练习1&#xff1a;Hello,World!解题步骤:using namespace std; int main() {cout<<"Hello,World!"<<endl;return 0; }2. 练习2&#xff1a;打印飞机解题步骤:#include <iostream> using namespace std; int main() {cout << " …

Cypher注入详解:原理、类型与测试方法

Cypher&#xff0c;全称为 (Open) Cypher Query Language&#xff0c;是一种专为图数据库设计的声明式查询语言。它以直观的模式匹配方式&#xff0c;帮助开发者和数据分析师从复杂的图结构数据中检索、创建和修改信息。如果说 SQL 是关系型数据库的语言&#xff0c;那么 Cyphe…

PG靶机 - Pelican

一、 初步侦察与服务探测 1.1 端口扫描与服务识别 首先&#xff0c;对目标主机 192.168.163.98 进行全面的端口扫描&#xff0c;以识别所有开放的服务。 sudo nmap 192.168.163.98 -p- --min-rate5000 -A图 1: Nmap 扫描结果&#xff0c;显示多个开放端口 扫描结果表明&#xf…

【1】Transformers快速入门:自然语言处理(NLP)是啥?

第一章&#xff1a;自然语言处理&#xff08;NLP&#xff09;是啥&#xff1f;一句话解释&#xff1a; NLP 教电脑听懂人话、说人话的技术 &#xff08;比如让手机听懂你说话、让翻译软件变聪明&#xff09;NLP发展史&#xff1a;电脑学人话的 “翻车史” 第一阶段&#xff08…