一、引言

咱两书接上回,上一篇文章主要介绍了DataStream API一些基本的使用,主要是针对单数据流的场景下,但是在实际的流处理场景中,常常需要对多个数据流进行合并、拆分等操作,以满足复杂的业务需求。Flink 的 DataStream API 提供了一系列强大的多流转换算子,如 union、connect 和 split 等,下面我们来详细了解一下它们的功能和用法。

二、多流转换

2.1 union 算子

union 算子的功能非常直接,就是将多个类型相同的 DataStream 合并成一个新的 DataStream 。它适用于需要将多个来源相同相似的数据合并到一起进行统一处理的场景。例如,在电商场景中,我们可能有来自不同地区的订单流,希望将它们合并起来进行整体的销售统计分析。​

下面通过一个简单的代码示例来展示 union 算子的使用:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionExample {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定义两个数据流DataStream<Integer> stream1 = env.fromElements(1, 2, 3);DataStream<Integer> stream2 = env.fromElements(4, 5, 6);// 使用union合并两个数据流DataStream<Integer> unionStream = stream1.union(stream2);// 打印合并后的数据流unionStream.print();// 执行任务env.execute("Union Example");}
}

 在这个示例中,我们首先创建了两个包含整数的数据流 stream1 和 stream2,然后使用 union 算子将它们合并成一个新的数据流 unionStream 。最后,通过 print 方法将合并后的数据流输出到控制台。合并后的数据特点是,新数据流包含了原始两个数据流中的所有元素,元素的顺序按照它们在原始数据流中的顺序依次排列。

2.2 connect 算子

connect 算子与 union 算子不同,它主要用于连接两个类型可以不同的数据流,并将它们合并成一个 ConnectedStreams 对象。这个对象允许我们在后续处理中分别对两个流的数据进行操作,从而保留流之间的差异。这种特性在需要对不同类型的数据进行关联处理,但又要保持数据类型独立性的场景中非常有用。例如,在一个监控系统中,我们可能有一个数据流表示设备的状态信息(如温度、湿度等数值型数据),另一个数据流表示设备的日志信息(字符串类型),我们可以使用 connect 算子将这两个流连接起来,以便在后续处理中综合分析设备状态和日志。​

以下是使用 connect 算子的代码示例:

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class ConnectExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定义两个不同类型的数据流DataStream<Integer> stream1 = env.fromElements(1, 2, 3);DataStream<String> stream2 = env.fromElements("a", "b", "c");// 使用connect连接两个数据流ConnectedStreams<Integer, String> connectedStreams = stream1.connect(stream2);// 对连接后的流进行处理DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Integer, String, Object>() {@Overridepublic Object map1(Integer value) {return "Integer: " + value;}@Overridepublic Object map2(String value) {return "String: " + value;}});resultStream.print();env.execute("Connect Example");}
}

在这个示例中,我们创建了一个整数类型的数据流 stream1 和一个字符串类型的数据流 stream2 。通过 connect 算子将它们连接成 ConnectedStreams 对象,然后使用 CoMapFunction 对连接后的流进行处理。CoMapFunction 包含两个方法 map1 和 map2 ,分别用于处理来自不同流的数据。最终,将处理结果输出到控制台。

2.3 split 算子

split 算子的作用与合并相反,它用于将一个 DataStream 根据某些条件拆分成多个 DataStream 。在实际应用中,我们常常需要根据数据的不同特征对数据流进行分类处理,split 算子就可以帮助我们实现这一需求。比如,在一个电商订单处理系统中,我们可以根据订单金额的大小将订单流拆分成小额订单流和大额订单流,以便对不同金额范围的订单进行不同的处理策略。​

下面是使用 split 算子的代码实现:

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;
import java.util.List;public class SplitExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定义一个包含整数的数据流DataStream<Integer> stream = env.fromElements(1, 20, 3, 40, 5, 60);// 使用split算子拆分数据流SplitStream<Integer> splitStream = stream.split(new OutputSelector<Integer>() {@Overridepublic Iterable<String> select(Integer value) {List<String> output = new ArrayList<>();if (value > 10) {output.add("large");} else {output.add("small");}return output;}});// 获取拆分后的两个数据流DataStream<Integer> largeStream = splitStream.select("large");DataStream<Integer> smallStream = splitStream.select("small");// 打印拆分后的数据流largeStream.print("Large Stream: ");smallStream.print("Small Stream: ");env.execute("Split Example");}
}

在这个示例中,我们定义了一个包含整数的数据流 stream 。通过 split 算子和自定义的 OutputSelector,根据数值大小将数据流拆分成两个子流:largeStream 包含大于 10 的整数,smallStream 包含小于等于 10 的整数。最后,分别将这两个子流输出到控制台,并加上相应的标识以便区分。

三、数据下沉(Sink)

在流处理应用中,将处理后的结果数据输出到各种存储介质是非常重要的一环。Flink 提供了丰富的数据下沉(Sink)操作,支持将数据写入文件、Kafka、数据库等多种存储系统,以满足不同场景下的数据持久化和后续处理需求。

3.1 写入文件​

Flink 提供了多种将数据写入文件的方法,其中常用的是 writeAsText 方法,它将数据流中的元素以文本形式写入文件。例如,我们可以将处理后的订单数据写入文件,以便后续进行数据分析或存档。​

以下是一个完整的代码示例,展示如何从流数据到文件写入的全流程:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定义一个包含字符串的数据流DataStream<String> stream = env.fromElements("apple", "banana", "cherry");// 将数据流写入文件,路径为output.txtstream.writeAsText("output.txt");// 执行任务env.execute("File Sink Example");}
}

在上述代码中,我们首先创建了一个包含水果名称的数据流 stream 。然后,使用 writeAsText 方法将数据流中的元素写入名为 output.txt 的文件中。在实际应用中,需要注意文件写入模式和配置,writeAsText 默认使用追加模式写入文件,如果文件已存在,新的数据会追加到文件末尾。还可以通过配置参数来指定写入模式(如覆盖模式)、缓冲区大小等。例如,通过设置 env.setBufferTimeout (1000) 可以调整缓冲区的超时时间,当缓冲区数据达到一定时间(这里是 1 秒)或大小限制时,会被写入文件。

3.2 写入 Kafka​

Flink 与 Kafka 的集成非常紧密,通过 FlinkKafkaProducer 可以方便地将处理结果写入 Kafka 主题。这种方式在构建实时数据管道时非常常见,处理后的数据可以被其他系统从 Kafka 中消费,实现数据的共享和进一步处理。其原理是通过配置 Kafka 集群的地址、目标主题等参数,FlinkKafkaProducer 将 Flink 处理后的数据流转换为 Kafka 可接受的消息格式,并发送到指定的 Kafka 主题中 。​

以下是连接 Kafka、配置参数以及数据写入 Kafka 的代码示例:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;public class KafkaSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定义一个包含字符串的数据流DataStream<String> stream = env.fromElements("message1", "message2", "message3");// 配置Kafka参数Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("acks", "all");// 创建FlinkKafkaProducer,将数据写入Kafka的test-topic主题FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("test-topic",new SimpleStringSchema(),properties);// 将数据流写入Kafkastream.addSink(kafkaProducer);// 执行任务env.execute("Kafka Sink Example");}
}

在这个示例中,我们首先创建了一个包含消息的数据流 stream 。然后,配置了 Kafka 的连接参数,包括 Kafka 集群地址(bootstrap.servers)和 acks 参数(这里设置为 "all",表示等待所有副本确认写入成功,以确保数据的可靠性)。接着,创建了 FlinkKafkaProducer 对象,指定了要写入的 Kafka 主题(test-topic)和序列化器(SimpleStringSchema,用于将字符串数据转换为 Kafka 消息格式)。最后,通过 addSink 方法将数据流写入 Kafka。

3.3 写入数据库(以 Redis 为例)​

以 Redis 为例,将 Flink 处理后的数据写入 Redis 可以实现数据的快速存储和查询,适用于对数据读写性能要求较高的场景。连接 Redis 数据库的步骤主要包括引入相关依赖、创建 Jedis 连接池配置以及在 Flink 中自定义 Sink 函数来实现数据写入。​

以下是自定义 RichSinkFunction 实现数据写入 Redis 的代码:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;public class RedisSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 定义一个包含字符串的数据流DataStream<String> stream = env.fromElements("key1:value1", "key2:value2", "key3:value3");// 将数据流写入Redisstream.addSink(new RedisSink());// 执行任务env.execute("Redis Sink Example");}public static class RedisSink extends RichSinkFunction<String> {private transient Jedis jedis;@Overridepublic void open(org.apache.flink.configuration.Configuration parameters) throws Exception {super.open(parameters);// 创建Jedis连接池配置JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();// 初始化Jedis连接,连接到本地Redis服务jedis = new Jedis("localhost", 6379);}@Overridepublic void invoke(String value, Context context) throws Exception {// 按冒号分割字符串,得到键值对String[] parts = value.split(":", 2);String key = parts[0];String data = parts[1];// 将数据写入Redisjedis.set(key, data);}@Overridepublic void close() throws Exception {super.close();// 关闭Jedis连接if (jedis != null) {jedis.close();}}}
}

 在这段代码中,我们定义了一个自定义的 RichSinkFunction,即 RedisSink 。在 open 方法中,创建了 Jedis 连接池配置,并初始化了 Jedis 连接,连接到本地的 Redis 服务(地址为localhost,端口为 6379)。在 invoke 方法中,对输入的字符串进行处理,将其按冒号分割为键值对,然后使用 jedis.set 方法将数据写入 Redis。在 close 方法中,关闭 Jedis 连接,释放资源。通过这种方式,Flink 处理后的数据流中的数据就可以成功写入 Redis 数据库。

四、总结

Flink DataStream API 的多流转换操作,如 union、connect 和 split 等算子,为我们提供了强大的工具,使他们能够灵活地处理多个数据流之间的复杂关系。通过这些算子,我们可以将不同来源的数据进行合并,对不同类型的数据进行关联处理,以及根据数据特征进行分类拆分,从而满足各种复杂业务场景下的流处理需求。​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/88489.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/88489.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/88489.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity3D游戏线上崩溃排查指南

前言 排查Unity3D线上游戏崩溃是个系统工程&#xff0c;需要结合工具链、日志分析和版本管理。以下是详细的排查指南和关键步骤&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&#xff0c;希望大家可以点击进来一起交流一下开发经验呀&#xff01; 一、崩溃信息收…

DPDK性能优化实践:系统级性能调优的方法论与实战(一套通用的方法论)

性能优化的挑战与现实困境 在高性能网络处理领域&#xff0c;性能优化往往被视为一门“玄学”而非科学。许多开发者在面对性能瓶颈时&#xff0c;要么盲目追求单一指标的极致优化&#xff0c;要么采用"试错法"进行零散的局部调优&#xff0c;结果往往是投入大量精力却…

Docker的/var/lib/docker/目录占用100%的处理方法

文章目录 一、问题描述 二、解决措施 三、可能遇到的问题 问题1、问题描述&#xff1a;执行 sudo systemctl stop docker 命令时&#xff0c;提示 Warning: Stopping docker.service, but it can still be activated by: docker.socket 问题2、问题描述&#xff1a;执行 s…

【UE教程/进阶】Slate链式编辑原理

目录链式编辑操作" . "操作" "操作" [ ] "链式编辑 SNew().&#xfeff;[] 操作" . " SLATE_ARGUMENT(ArgType, ArgName) 宏 调用宏 SLATE_PRIVATE_ARGUMENT_VARIABLE(ArgType, ArgName) &#xff0c;在FArgument结构体中添加了变量…

将手工建模模型(fbx、obj)转换为3dtiles的免费工具!

文章目录1、工具下载2、使用说明3、详细说明命令行格式示例命令参数说明4、源码地址1、工具下载 百度网盘下载链接 选择最新版本下载即可&#xff0c;支持Linux和Windows系统 2、使用说明 1&#xff09;按住键盘winr键&#xff0c;在弹出的窗口中输入cmd 2&#xff09;点击…

FreeRTOS源码学习之内核初始化

目录 前言 一、主函数内容 二、osKernelInitialize ()内核初始化函数内容 三、IS_IRQ()宏定义中断检测函数内容 四、如果这篇文章能帮助到你&#xff0c;请点个赞鼓励一下吧ξ( ✿&#xff1e;◡❛)~ 前言 使用STM32CubeMX添加FreeRTOS进入工程之后&#xff0c;会自动在ma…

Docker—— 镜像构建原因

在现代软件开发和运维中&#xff0c;Docker已成为一种非常流行的工具&#xff0c;它通过容器化应用程序来简化部署过程。然而&#xff0c;默认的官方镜像往往只能满足基础需求&#xff0c;无法涵盖所有特定项目的具体要求。原因说明系统级改动无法通过 volume 实现修改用户、删…

锂电池自动化生产线的现状与发展

锂电池自动化生产线的概述锂电池自动化生产线是指采用自动化设备和控制系统&#xff0c;实现锂电池从原材料到成品的全流程自动化生产过程。随着新能源产业的快速发展&#xff0c;锂电池作为重要的储能元件&#xff0c;其生产制造技术也在不断进步。自动化生产线通过减少人工干…

java底层的native和沙箱安全机制

沙箱安全机制沙箱&#xff08;Sandbox&#xff09;安全机制是一种将程序或代码运行在隔离环境中的安全技术&#xff0c;旨在限制其对系统资源&#xff08;如文件系统、网络、内存、其他进程等&#xff09;的访问权限&#xff0c;从而降低潜在恶意代码带来的风险。其核心思想是“…

【分享】文件摆渡系统适配医疗场景:安全与效率兼得

根据国家信息安全相关法规要求&#xff0c;医院为了网络安全&#xff0c;大多会采用网闸等隔离手段&#xff0c;将网络隔离为内网和外网&#xff0c;但网络隔离后&#xff0c;医院的内外网间仍存在较为频繁的文件摆渡需求。文件摆渡系统则是可以解决跨网络或跨安全域文件传输中…

vscode 中的 mermaid

一、安装软件 Mermaid preview Mermaid support 二、运行命令 创建.md 文件右键选择 ​Open Preview​&#xff08;或按 CtrlShiftV&#xff09; 三、流程图 注意&#xff1a; 要md 文件要保留 mermaid 1. #mermaid-svg-nchqbvlWePe5KCwJ {font-family:"trebuchet ms"…

微服务引擎 MSE 及云原生 API 网关 2025 年 6 月产品动态

点击此处&#xff0c;了解微服务引擎 MSE 产品详情。

【TCP/IP】7. IP 路由

7. IP 路由7. IP 路由概述7.1 直接传递与间接传递7.2 IP 路由核心机制7.3 路由表7.3.1 路由表的构成7.3.2 信宿地址采用网络地址的好处7.3.3 下一跳地址的优势7.3.4 特殊路由表项7.3.5 路由算法7.4 静态路由7.4.1 特点7.4.2 自治系统&#xff08;AS&#xff09;7.4.3 配置命令7…

xFile:高性能虚拟分布式加密存储系统——Go

xFile&#xff1a;高性能虚拟分布式加密存储系统 目录xFile&#xff1a;高性能虚拟分布式加密存储系统1 背景介绍2 设计初衷与目标3 项目简介4 系统架构5 核心优势1. 真正的分布式块存储2. 块级加密与压缩&#xff0c;安全高效3. 灵活的索引与元数据管理4. 多用户与权限体系5. …

时序数据库:高效处理时间序列数据的核心技术

时序数据库概述时序数据库&#xff08;Time Series Database&#xff0c;TSDB&#xff09;是一种专门为存储、处理和查询时间序列数据而优化的数据库系统。随着物联网、金融科技、工业互联网等领域的快速发展&#xff0c;时序数据呈现出爆炸式增长&#xff0c;传统的关系型数据…

面试官:你再问TCP三次握手,我就要报警了!

CP三次握手和四次挥手&#xff0c;是面试官最爱问的“开场白”之一 别看它基础&#xff0c;真要讲清楚细节&#xff0c;分分钟让你冷汗直流&#xff01; 这玩意儿就跟程序员相亲一样&#xff1a; 表面上问的是“你老家哪的” 实际上是在试探你有没有房、有没有车、能不能落…

RuoYi+Uniapp(uni-ui)开发商城系统

如果你正在考虑用 RuoYi 和 UniApp&#xff08;uni-ui&#xff09;搭建一套商城系统&#xff0c;那这套组合确实值得好好研究。它整合了 RuoYi 的快速开发能力和 UniApp 的跨平台特性&#xff0c;在高效开发的同时还能兼顾多端适配的需求。下面从技术架构、功能模块、开发实践到…

面试150 二叉树的最大高度

思路 考虑从递归出发&#xff0c;联想递归三部曲&#xff1a;返回什么、传入的参数是什么、遍历的方式是什么。此题现在需要我们整个树&#xff0c;并且需要从根节点出发&#xff0c;因此我们选择先序遍历即可。另一张办法&#xff0c;则是选择通过队列实现层次遍历&#xff0c…

从零实现一个GPT 【React + Express】--- 【2】实现对话流和停止生成

摘要 这是本系列文章的第二篇&#xff0c;开始之前我们先回顾一下上一篇文章的内容&#xff1a; 从零实现一个GPT 【React Express】— 【1】初始化前后端项目&#xff0c;实现模型接入SSE 在这一篇中&#xff0c;我们主要创建了前端工程和后端工程&#xff0c;这里贴一下我…

SEQUENCE在RAC多实例开启CACHE的NEXTVAL数值乱序问题

问题说明 在多实例环境中可能会出现从Sequence所取出来的nextval是乱序的&#xff0c;比如第二次比第一次所取的数要小但这并不是我们所希望的。当程序逻辑Base on sequence.nextval数值所谓填充字段的大小来排序时&#xff0c;就会产生问题。 实际上就是由于多实例这一特性造成…