题目:

利用flink统计网站浏览量,并写入redis。

利用窗口函数以及算子实现每小时PV(网站的页面浏览量)统计,对统计后结果数据格式进行设计,存储至Redis中(利用sink将处理后结果数据输出到redis数据库中)。

操作步骤:

1.redis在虚拟机上的安装

(1)下载redis安装包

cd /opt/software # 进入一个用于存放安装包的目录,可自行选择

wget http://download.redis.io/releases/redis-6.2.6.tar.gz

# 下载Redis 6.2.6版本,可根据需求更换版本号

(2)解压安装包——使用tar命令解压下载好的安装包

tar -zxvf redis-6.2.6.tar.gz

解压后会生成一个名为redis-6.2.6的目录。

(3)安装gcc编译工具

yum install -y gcc gcc-c++ make

(4)编译和安装 Redis

cd redis-6.2.6

make # 编译Redis,此过程可能需要一些时间,取决于虚拟机性能

make install # 安装Redis,默认会安装到/usr/local/bin目录下

(5)配置redis—— Redis 默认没有生成配置文件,需要手动创建相关目录和文件:

mkdir /etc/redis # 创建存放配置文件的目录

cp /opt/redis-6.2.6/redis.conf /etc/redis/ # 将示例配置文件复制到新目录下,路径根据实际解压位置调整

如图所示,redis安装成功并且进行了正常访问。

此处还需要关闭虚拟机上的防火墙,使redis的6379端口可以被正常访问。

2.代码展示

项目创建

在 IntelliJ IDEA 中,选择File -> New -> Project,然后选择Maven,按照向导创建一个新的 Maven 项目。在pom.xml文件中添加以下依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flink-pv-redis</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.13.6</flink.version><redis.clients.version>3.8.0</redis.clients.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- Flink 核心依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><!-- Flink Redis 连接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><!-- Redis 客户端依赖 --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>${redis.clients.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency></dependencies><!-- 阿里云镜像 --><repositories><repository><id>aliyunmaven</id><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></repository></repositories><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target></configuration></plugin></plugins></build>
</project>

定义数据类型

创建一个 Java 类来表示UserBehavior,UserBehavior.csv文件的每一行包含userId、behavior等五个字段,以逗号分隔。

package Bean;public class UserBehavior {private Long userId;private Long itemId;private Integer categoryId;private String behavior;  // 行为类型:"pv"为页面浏览private Long timestamp;   // 时间戳(毫秒)public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {this.userId = userId;this.itemId = itemId;this.categoryId = categoryId;this.behavior = behavior;this.timestamp = timestamp;}public Long getUserId() {return userId;}public void setUserId(Long userId) {this.userId = userId;}public Long getItemId() {return itemId;}public void setItemId(Long itemId) {this.itemId = itemId;}public Integer getCategoryId() {return categoryId;}public void setCategoryId(Integer categoryId) {this.categoryId = categoryId;}public String getBehavior() {return behavior;}public void setBehavior(String behavior) {this.behavior = behavior;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}}

编写Flink程序

创建一个主类,比如PvStatisticsToRedis.java,编写 Flink 程序来统计每小时 PV 并写入 Redis。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.connector.redis.sink.RedisSink;
import org.apache.flink.connector.redis.sink.RedisSinkFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;public class PvStatisticsToRedis {public static void main(String[] args) throws Exception {// 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从 CSV 文件读取数据,假设第一行是表头,这里简单跳过DataStream<UserBehavior> userBehaviorDataStream = env.readTextFile("/mnt/UserBehavior(1).csv").skip(1).map(new MapFunction<String, UserBehavior>() {@Overridepublic UserBehavior map(String line) throws Exception {String[] parts = line.split(",");long userId = Long.parseLong(parts[0]);String behavior = parts[1];return new UserBehavior(userId, behavior);}});// 过滤出 pv 行为的数据SingleOutputStreamOperator<Tuple2<String, Long>> pvStream = userBehaviorDataStream.filter(behavior -> "pv".equals(behavior.getBehavior())).map(behavior -> Tuple2.of("pv", behavior.getUserId())).returns(Types.TUPLE(Types.STRING, Types.LONG));// 按照 "pv" 进行分组,并使用滑动窗口统计每小时的 PV 数量KeyedStream<Tuple2<String, Long>, String> keyedStream = pvStream.keyBy(t -> t.f0);SingleOutputStreamOperator<Map<String, Object>> pvCountStream = (SingleOutputStreamOperator<Map<String, Object>>) keyedStream.window(TumblingProcessingTimeWindows.of(Time.hours(1))).process(new KeyedProcessFunction<String, Tuple2<String, Long>, Map<String, Object>>() {private transient HashSet<Long> userIds;@Overridepublic void open(Configuration parameters) throws Exception {userIds = new HashSet<>();}@Overridepublic void processElement(Tuple2<String, Long> value, Context ctx, Collector<Map<String, Object>> out) throws Exception {userIds.add(value.f1);Map<String, Object> result = new HashMap<>();result.put("window_end_time", ctx.timerService().currentProcessingTime());result.put("pv_count", userIds.size());out.collect(result);}@Overridepublic void close() throws Exception {userIds.clear();}});// 将统计结果转换为字符串格式SingleOutputStreamOperator<String> resultStream = pvCountStream.map(new MapFunction<Map<String, Object>, String>() {@Overridepublic String map(Map<String, Object> value) throws Exception {return "window_end_time: " + value.get("window_end_time") + ", pv_count: " + value.get("pv_count");}});// 配置 Redis 连接FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();// 创建 Redis SinkRedisSinkFunction<String> redisSinkFunction = new RedisSinkFunction<>(conf, new RedisMapper<String>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH, "pv_statistics");}@Overridepublic String getKeyFromData(String data) {return null;}@Overridepublic String getValueFromData(String data) {return data;}});RedisSink<String> redisSink = new RedisSink<>(redisSinkFunction);// 将结果写入 RedisresultStream.addSink(redisSink);// 打印结果到控制台(可选)resultStream.addSink(new PrintSinkFunction<>());// 执行 Flink 任务env.execute("PV Statistics to Redis");}
}

资源文件中加载日志log4j.poverities,代码如下:

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

运行效果:

3.sink输出到redis数据库

核心原理

Flink 的RedisSink通过自定义RedisMapper实现数据写入 Redis,主要需指定:

Redis 命令:如SET(存储键值对)、HSET(存储哈希)、LPUSH(存储列表)等。

键(Key):用于标识数据的唯一性(如按小时的 PV 统计可将window_end_time作为 Key)。

值(Value):需要存储的具体统计结果(如 PV 数量)。

具体实现步骤

假设统计结果为每小时的 PV 数,设计存储格式如下:

Redis 键(Key):pv_statistics:{window_end_time}(其中window_end_time为窗口结束时间戳,精确到小时)。

Redis 值(Value):该小时的 PV 总数(如12345)。

数据结构:采用String类型(通过SET命令存储),便于后续查询和聚合。

RedisMapper是 Flink 与 Redis 交互的核心接口,需实现 3 个方法:

getCommandDescription():指定 Redis 命令(如SET)。

getKeyFromData():从统计结果中提取 Redis 的 Key。

getValueFromData():从统计结果中提取 Redis 的 Value。

代码实现

RedisMapper<PvResult> redisMapper = new RedisMapper<PvResult>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "pv:hour");}@Overridepublic String getKeyFromData(PvResult data) {SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");String key = sdf.format(new Date(data.getWindowStart()));System.out.println("Redis Key: " + key);return key;}@Overridepublic String getValueFromData(PvResult data) {return String.valueOf(data.getCount());}
};
package Bean;
public class PvResult {private long windowStart; // 窗口开始时间(毫秒)private long windowEnd;   // 窗口结束时间(毫秒)private long count;       // 该窗口的 PV 数public PvResult() {}public PvResult(long windowStart, long windowEnd, long count) {this.windowStart = windowStart;this.windowEnd = windowEnd;this.count = count;}// Getters & Setterspublic long getWindowStart() { return windowStart; }public void setWindowStart(long windowStart) { this.windowStart = windowStart; }public long getWindowEnd() { return windowEnd; }public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }public long getCount() { return count; }public void setCount(long count) { this.count = count; }
}

配置 Redis 连接

通过FlinkJedisPoolConfig配置 Redis 连接信息(如主机、端口、密码等):

FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder().setHost("192.168.100.20") // 虚拟机Redis IP.setPort(6379).build();

注意,这里连接的是虚拟机上的redis,需要更改为虚拟机上的IP地址。

效果展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/90222.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/90222.shtml
英文地址,请注明出处:http://en.pswp.cn/web/90222.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用Imgui和SDL2做的一个弹球小游戏-Bounze

使用Imgui和SDL2做的一个弹球小游戏-Bounze 油管上面TheCherno博主分享的一个视频FIRST GAME in C! Did He Do a Good Job? // Code Review (C/SDL2)里面分享了一个Github项目&#xff1a; https://github.com/staticaron/Bounze 使用了Imgui和SDL2&#xff0c;并且可以设置音…

SQL 中 CASE WHEN 及 SELECT CASE WHEN 的用法

SQL 中 CASE WHEN 及 SELECT CASE WHEN 的用法 CASE WHEN 是 SQL 中非常实用的条件表达式&#xff0c;它允许你在查询中实现条件逻辑。以下是详细的用法说明&#xff1a; 1. 基本语法结构 CASE WHEN condition1 THEN result1WHEN condition2 THEN result2...ELSE default_resul…

CentOS 7 Linux 基础知识点汇总

&#x1f427; CentOS 7 Linux 基础知识点汇总为方便初学者快速掌握 CentOS 7 系统的核心操作&#xff0c;本文档整理了常用系统命令、快捷键、目录结构及文件后缀名等基础内容&#xff0c;适合入门参考。 一、常见系统命令 &#x1f50d; 命令行提示符说明 终端中的提示符包含…

突发限制下的破局之路:国产之光 Lynx 重构 AI 开发安全壁垒

继 Pro 套餐 “明升暗降” 争议后&#xff0c;Cursor 本周再掀波澜 —— 包括 Claude 系列、GPT-4 在内的主流模型一夜之间对中国用户全面封禁。开发者社群瞬间沸腾&#xff0c;“付费却用不了”“项目数据导不出” 的焦虑刷屏&#xff0c;境外工具的政策波动再次给行业敲响警钟…

渗透测试实战 | docker复杂环境下的内网打点

本文作者&#xff1a;Track-syst1m一.前言本文涉及的相关漏洞均已修复、本文中技术和方法仅用于教育目的&#xff1b;文中讨论的所有案例和技术均旨在帮助读者更好地理解相关安全问题&#xff0c;并采取适当的防护措施来保护自身系统免受攻击。二.大概流程1. 外网打点漏洞利用•…

阿里云服务器 CentOS 7 安装 MySQL 8.4 超详细指南

阿里云服务器 CentOS 7 安装 MySQL 8.4 超详细指南 一、准备工作 系统要求&#xff1a; CentOS 7.9 64位2 核&#xff08;vCPU&#xff09;2 GiBroot 用户权限 服务器连接工具&#xff1a; FinalShell 下载安装包&#xff1a; 访问 MySQL 官网选择版本&#xff1a;MySQL 8.4.0…

解决 Electron 中 window.open 打开新窗口的各种“坑”

嘿&#xff0c;各位开发者们&#xff01;今天我们要聊聊在使用 Electron 时遇到的一个经典问题&#xff1a;如何正确地使用 window.open 来打开新窗口&#xff1f; 这听起来似乎很简单&#xff0c;但实际上却充满了各种“惊喜”&#xff08;或者说“惊吓”&#xff09;。别担心…

朝歌智慧盘古信息:以IMS MOM V6重构国产化智能终端新生态

随着5G、云计算、AI、大数据等技术深度渗透&#xff0c;智能终端行业正迎来场景化创新的爆发期。面对市场需求升级与技术迭代压力&#xff0c;国产化智能终端领域领军企业——广东朝歌智慧互联科技有限公司&#xff08;以下简称“朝歌智慧”&#xff09;&#xff0c;基于集团“…

docker 离线安装postgres+postgis实践

文章目录前言一、离线安装docker二、导出导入PG镜像1.导出2.导入三、启动容器四、验证与测试前言 在企业内网环境中部署地理信息系统&#xff08;GIS&#xff09;时&#xff0c;常常面临网络隔离导致无法在线拉取 Docker 镜像的问题。 本文将详细介绍如何通过离线方式完成 Pos…

视频、音频录制

1&#xff0c;项目介绍。 实现全屏录屏、选择区域录屏、摄像头录像、麦克风录音、主板音频录音、截屏画板的自由组合。并通过FFmpeg完成音频与视频的合并。 功能界面 画板画笔 参考的项目 https://github.com/yangjinming1062/RecordWin 本项目是在此项目的基础上修复了部…

Linux文件系统理解1

目录一、初步理解系统层面的文件1. 文件操作的本质2. 进程管理文件核心思想二、系统调用层1. 打开关闭文件函数2. 读写文件函数三、操作系统文件管理1. 文件管理机制2. 硬件管理机制四、理解重定向1. 文件描述符分配规则2. 重定向系统调用3. 重定向命令行调用五、理解缓冲区1. …

科技向善,银发向暖:智慧养老与经济共筑适老未来

人口老龄化是当今中国社会面临的重大课题&#xff0c;也是推动社会变革与经济转型的重要引擎。随着数字技术的飞速发展&#xff0c;“智慧养老”正以科技向善的温度&#xff0c;为老年群体构建更舒适、更安全、更有尊严的晚年生活&#xff0c;同时为银发经济注入蓬勃活力&#…

numpy库 降维,矩阵创建与元素的选取,修改

目录 1.降维函数ravel()和flatten ravel(): flatten(): 2.矩阵存储与内存结构 3.修改矩阵形状的方法 4.特殊矩阵创建 全零矩阵: 如np.zeros(5) 创建含5个零的一维数组&#xff0c;输出中零后的点&#xff08;如 0.&#xff09;表示浮点数类型。 全一矩阵&#xff1a;如n…

SpringCloud seata全局事务

项目https://github.com/apache/incubator-seata docker拉取启动server $ docker run --name seata-server -p 8091:8091 apache/seata-server:2.1.0 seata注册到nacos <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-…

OpenLayers 快速入门(八)事件系统

看过的知识不等于学会。唯有用心总结、系统记录&#xff0c;并通过温故知新反复实践&#xff0c;才能真正掌握一二 作为一名摸爬滚打三年的前端开发&#xff0c;开源社区给了我饭碗&#xff0c;我也将所学的知识体系回馈给大家&#xff0c;助你少走弯路&#xff01; OpenLayers…

【Linux | 网络】应用层(HTTPS)

目录一、HTTPS的概念二、准备概念2.1 什么是加密和解密2.2 为什么要加密2.3 常见的加密方式2.3.1 对称加密2.3.1 非对称加密2.4 数据摘要&&数据指纹三、HTTPS理解过程3.1 只使用对称加密3.2 只使用非对称加密3.3 双方都使用非对称加密3.4 对称加密 非对称加密3.5 中间…

GRE协议

一、实验拓扑二、实验配置1、静态路由实现GRERT1配置&#xff1a;RT1(config)# int fa1/0RT1(config-if)# ip add 192.168.20.1 255.255.255.0RT1(config-if)# no shutdownRT1(config)# int fa0/0RT1(config-if)# ip add 172.1.1.2 255.255.255.0RT1(config-if)# no shutdownRT…

JDialong弹窗

public class DialogDemo extends JFrame {public DialogDemo(){this.setVisible(true);this.setSize(700,500);this.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);//JFrame 放东西&#xff0c;容器Container contentPane this.getContentPane();//绝对布局conte…

tlias智能学习辅助系统--违纪处理(实战)

目录 1.StudentController.java 2.interface StudentService 3.StudentServiceImpl.java 4.interface StudentMapper 1.StudentController.java // 违纪处理PutMapping("/violation/{id}/{score}")Operation(summary "违纪处理")public Result violat…

传统RNN模型笔记:输入数据长度变化的结构解析

一、案例背景 本案例通过PyTorch的nn.RNN构建单隐藏层RNN模型&#xff0c;重点展示RNN对变长序列数据的处理能力&#xff08;序列长度从1变为20&#xff09;&#xff0c;帮助理解RNN的输入输出逻辑。 二、核心代码与结构拆解 def dm_rnn_for_sequencelen():# 1. 定义RNN模型rnn…