目录

  • 源算子(Source)
    • 从集合中读取数据
    • 从文件读取数据
    • 从Socket读取数据
    • 从Kafka读取数据
    • 从数据生成器读取数据
  • Flink支持的数据类型
      • Flink的类型系统
      • Flink支持的数据类型
      • 类型提示(Type Hints)

源算子(Source)

  Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。

在这里插入图片描述

  在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
  从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

从集合中读取数据

  最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

在这里插入图片描述

package env;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 22, 3));// 直接填写元素DataStreamSource<Integer> source = env.fromElements(1, 22, 3);source.print();env.execute();}
}

从文件读取数据

  真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。
读取文件,需要添加文件连接器依赖:

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>

在这里插入图片描述

package source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);FileSource.FileSourceBuilder<String> builder = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt"));FileSource<String> fileSource = builder.build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"filesource").print();env.execute();}
}

说明:

  • 参数可以是目录,也可以是文件;还可以从HDFS目录下读取,使用路径hdfs://…;
  • 路径可以是相对路径,也可以是绝对路径;
  • 相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录;

从Socket读取数据

  不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。
我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStream<String> stream = env.socketTextStream("localhost", 7777);

从Kafka读取数据

  Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。

  所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>

在这里插入图片描述

package source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("master:9092,slave1:9092,slave2:9092").setGroupId("kafkasource").setTopics("topic1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"").print();env.execute();}
}

Kafka生产数据

在这里插入图片描述
Flink消费数据

在这里插入图片描述

从数据生成器读取数据

  Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version>
</dependency>

在这里插入图片描述

package source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 数据生成器Source,四个参数*  第一个:GeneratorFunction接口,需要实现,重写map方法,输入类型固定是Long*  第二个,Long类型,自动生成的数字序列(从0自增)的最大值(小于),达到这个值就停止了*  第三个,限速策略,比如 每秒生成几条数据*  第四个:返回的类型*/DataGeneratorSource<String> source = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) {return "Number:" + value;}}, Long.MAX_VALUE, RateLimiterStrategy.perSecond(100), Types.STRING);env.fromSource(source,WatermarkStrategy.noWatermarks(),"").print();env.execute();}
}

Flink支持的数据类型

Flink的类型系统

Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

Flink支持的数据类型

对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:

(1)基本类型
所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。

(2)数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。

(3)复合数据类型

  • Java元组类型(TUPLE):这是Flink内置的元组类型,是Java
    API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
  • Scala 样例类及Scala元组:不支持空字段。
  • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
  • POJO:Flink自定义的类似于Java bean模式的类。

(4)辅助类型
Option、Either、List、Map等。

(5)泛型类型(GENERIC)

Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:

  1. 类是公有(public)的
  2. 有一个无参的构造方法
  3. 所有属性都是公有(public)的
  4. 所有属性的类型都是可以序列化的

类型提示(Type Hints)

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。
回忆一下之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/97882.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/97882.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/97882.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 安装docker-compose安装方法(安装docker compose安装)

文章目录**方法一&#xff1a;通过 curl 下载二进制文件&#xff08;推荐&#xff09;**1. 安装前准备- **确保已安装 Docker**- **检查 Docker 是否安装成功**2. 下载并安装 Docker Compose- **下载最新版本的 Docker Compose 二进制文件**- **国内加速下载&#xff08;解决 G…

OCR 发票识别与验真接口:助力电子化发票新时代

自 2025 年 10 月 1 日起&#xff0c;纸质火车票彻底告别历史舞台&#xff0c;全面数字化的电子发票取而代之&#xff0c;这一变革标志着票务领域的重大革新&#xff0c;也让电子化发票处理的需求呈井喷式增长。在此背景下&#xff0c;OCR 发票识别和发票验真接口技术挺身而出&…

设计模式:抽象工厂模式(Abstract Factory Pattern)

文章目录一、概念二、实例分析三、完整示例一、概念 抽象工厂模式是一种创建型设计模式。 提供一个接口用于创建一系列相关或相互依赖的对象&#xff0c;而无需指定它们的具体类。 相比于工厂方法模式&#xff0c;抽象工厂模式不仅仅是创建单一产品&#xff0c;而是一族产品&am…

轻量级注意力模型HOTSPOT-YOLO:无人机光伏热异常检测新SOTA,mAP高达90.8%

【导读】 无人机光伏巡检如何更智能、更高效&#xff1f;HOTSPOT-YOLO模型给出了亮眼答案&#xff01;给AI装上“热成像鹰眼”&#xff0c;能精准锁定光伏板上的细微热斑缺陷。它不仅将检测精度&#xff08;mAP&#xff09;提升至90.8%&#xff0c;更在保持实时性的前提下大幅…

CHT共轭传热: 导热系数差异如何影响矩阵系数

文章目录 一、导热系数差异如何影响矩阵系数&#xff1f;二、如何处理系数差异以加速收敛&#xff1f;1. **变量重缩放&#xff08;Scaling of Variables&#xff09;**2. **使用物理型预条件子&#xff08;Physics-based Preconditioning&#xff09;**3. **区域分解法&#x…

Vue Vapor 事件机制深潜:从设计动机到源码解析

基于 vue3.6&#xff08;alpha 阶段&#xff09;及 Vapor 的最新进展撰写&#xff1b;Vapor 仍在演进中&#xff0c;部分实现可能继续优化。TL;DR&#xff08;速览&#xff09; 传统&#xff08;≤3.5&#xff09;&#xff1a;事件以元素为中心绑定&#xff1b;每个元素用 el._…

Day 01(01): Hadoop与大数据基石

目标&#xff1a;建立对大数据生态的整体认知&#xff0c;理解HDFS和MapReduce的核心思想。 8:00-9:30&#xff1a;【视频学习】在B站搜索“Hadoop入门”或“三小时入门大数据”&#xff0c;观看1-2个高播放量的简介视频&#xff0c;了解大数据面临的问题和Hadoop的解决方案。 …

开源 + 免费!谷歌推出 Gemini CLI,Claude Code 的强劲对手

在如今飞速发展的 AI 工具生态中&#xff0c;命令行界面&#xff08;CLI&#xff09;这一开发者与计算机交互的传统方式&#xff0c;正悄然发生着一场颠覆性的变革。2025 年 6 月 25 日&#xff0c;谷歌正式发布开源的 Gemini CLI&#xff0c;这一举措标志着谷歌 Gemini AI 能力…

MacOS - 记录MacOS发烫的好几天 - 幕后黑手竟然是

MacOS - 记录MacOS发烫的好几天 - 幕后黑手竟然是 Mac是不可能出bug的&#xff0c;一定是世界出bug了。 前言 几天前Mac突然开始烫烫的&#xff0c;就这么一烫烫了好几天。这可不行&#xff0c;所以看了下“活动监视器”&#xff0c;发现了一个Code Helper(Plugin)占据200%上下…

Vue基础知识-Vue中:class与:style动态绑定样式

完整源码<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><script src&quo…

终于赶在考试券过期前把Oracle OCP证书考下来了!

&#x1f6a9; 今天终于能松口气了——Oracle OCP证书到手&#xff01; 差点白白浪费一次考试机会&#xff08;1700&#xff09;&#xff01;3月底报名了Oracle OCP&#xff0c;摆烂了大半年&#xff0c;终于是逼着自己在考试券过期前考完了082和083科目&#xff0c;目前已经顺…

Power BI学习笔记-周报销售数据分析

Power BI学习笔记-周报销售数据分析 简介 来自B站的Power BI学习视频的学习笔记。 记录来自B站的Power BI教学视频&#xff0c;由“高级财务BP-Ni”发布&#xff0c;视频发布者主要发布财务类相关的PBI视频&#xff0c;视频长度30分钟左右。 视频链接&#xff1a; 【powerbi周报…

Oracle 数据库与操作系统兼容性指南

前言 作为一个在 Oracle 坑里摸爬滚打多年的老 DBA&#xff0c;最怕听到的就是"这个版本能不能装在这个系统上&#xff1f;"这种问题。昨天又有朋友来问我 Oracle 数据库和操作系统的兼容性&#xff0c;索性把这些年积累的官方兼容性列表整理出来&#xff0c;省得大家…

pytorch初级

本文章是本人通过读《Pytorch实用教程》第二版做的学习笔记&#xff0c;深度学习的核心部分&#xff1a;数据准备 ➡️ 模型构建 ➡️ 模型训练 ➡️ 模型评估与应用。根据上面的思路&#xff0c;我们分为几个部分&#xff1a; 第一部分&#xff1a;PyTorch 基础 - 涵盖了从基本…

UniApp 混合开发:Plus API 从基础到7大核心场景实战的完整指南

在 UniApp 混合开发中&#xff0c;plus API 是连接前端代码与原生设备能力的核心桥梁。基于 5 Runtime&#xff0c;它封装了设备硬件、系统交互、网络通信等近百种原生能力&#xff0c;解决了 UniApp 跨端 API 覆盖不足的问题。但直接使用 plus API 常面临兼容性复杂、回调嵌套…

本周难点问题详细总结

&#x1f4cb; 本周技术问题总结 &#x1f534; 1. 表单校验与用户体验 1.1 表单错误提示不规范 问题&#xff1a;校验失败时缺少页面标识位置&#xff1a;SupplierForm.vue:375代码示例&#xff1a;message.error([基本信息] 表单校验失败&#xff0c;请检查必填字段)影响&…

下一代自动驾驶汽车系统XIL验证方法

摘要自动驾驶汽车测试仍是一个新兴且尚未成熟的过程&#xff0c;全球统一的测试流程尚需时日。实车测试对资源要求极高&#xff0c;因此开发并提升基于虚拟环境的测试方法的效率至关重要。有鉴于此&#xff0c;本文提出一种新颖的 X-in-the-Loop&#xff08;XIL&#xff0c;X 代…

视频数据如何联网共享?

视频数据如何联网共享&#xff1f; 视频联网共享系统&#xff0c;实现前端设备的接入管理以及接入数据的获取。前端设备包括视频设备、卡口设备、Wifi数据采集设备、移动采集设备以及GPS/北斗数据采集设备等。系统实现海量视频数据的快速检索&#xff0c;并为上层数据应用提供视…

Django项目开发全链路:数据库操作、多环境配置、windows/linux项目部署一站式指南

Django项目开发全链路:数据库操作、多环境配置、windows/linux项目部署一站式指南 一、项目初始化 二、创建第一个应用 三、数据库与数据模型的应用 四、创建管理后台用户 五、数据模型与数据库交互之添加 六、数据模型与数据库交互之修改 七、数据模型与数据库交互之查询 八、…

GLib多线程编程实践:从数据结构到线程池的完整指南

引言 GLib是一个功能丰富、跨平台的C程序库,提供了大量高效且经过充分测试的数据结构与算法接口。本文将通过一个完整的实践案例,介绍如何使用GLib实现动态数组、链表、平衡二叉树和线程池,并分享在实际开发中遇到的常见问题及解决方案。 一、GLib核心数据结构实践 1.1 动…