目录

  • 基本转换算子
    • 映射(map)
    • 过滤(filter)
    • 扁平映射
  • 聚合算子
    • 按键分区(keyBy)
    • 简单聚合(sum/min/max/minBy/maxBy)
    • 规约聚合(reduce)

基本转换算子

有如下POJO类用来做测试使用

package transform;import java.util.Objects;public class WaterSensor {public String id;public Long ts;public Integer vc;public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc = vc;}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WaterSensor that = (WaterSensor) o;return Objects.equals(id, that.id) &&Objects.equals(ts, that.ts) &&Objects.equals(vc, that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}

映射(map)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。

我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。
下面的代码用不同的方式,实现了提取WaterSensor中的id字段的功能。

在这里插入图片描述

package transform;
import env.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 2L, 2),new WaterSensor("s3", 3L, 3));SingleOutputStreamOperator<String> map = source.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor value) {return value.getId();}});map.print();env.execute();}
}

在这里插入图片描述

过滤(filter)

  filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。

进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。

在这里插入图片描述

package transform;import env.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FilterDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 11L, 11),new WaterSensor("s2", 2L, 2),new WaterSensor("s3", 3L, 3));//        SingleOutputStreamOperator<WaterSensor> filter = source.filter(new FilterFunction<WaterSensor>() {
//            @Override
//            public boolean filter(WaterSensor value) throws Exception {
//                return "s1".equals(value.getId());
//            }
//        });SingleOutputStreamOperator<WaterSensor> filter = source.filter(value -> "s1".equals(value.getId()));filter.print();env.execute();}
}

扁平映射

flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

在这里插入图片描述

package transform;import env.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<env.WaterSensor> source = env.fromElements(new env.WaterSensor("s1", 1L, 1),new env.WaterSensor("s1", 11L, 11),new env.WaterSensor("s2", 2L, 2),new env.WaterSensor("s3", 3L, 3));source.flatMap(new FlatMapFunction<WaterSensor, String>() {@Overridepublic void flatMap(WaterSensor value, Collector<String> out) throws Exception {String id = value.getId();if("s1".equals(id)){out.collect(value.getTs().toString());}else if("s2".equals(id)){out.collect(value.getTs().toString());out.collect(value.getVc().toString());}}}).print();env.execute();}
}

在这里插入图片描述

聚合算子

按键分区(keyBy)

  对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。

  keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法

keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。

我们可以以id作为key做一个分区操作,代码实现如下:

在这里插入图片描述

package aggregate;import env.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 11L, 11),new WaterSensor("s2", 2L, 2),new WaterSensor("s3", 3L, 3));source.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor value) throws Exception {return value.getId();}});}
}

需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。

KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。

简单聚合(sum/min/max/minBy/maxBy)

有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了,Flink为我们内置实现了一些最基本,最简单的聚合API,主要有以下几种:

  • sum():在输入流上,对指定的字段做叠加求和的操作
  • min():在输入流上,对指定的字段求最小值
  • max():在输入流上,对指定的字段求最大值
  • minBy():与min()类似,在输入流上针对指定字段求最小值,不同的是,min()只计算指定字段最小值,其他字段会保留最初第一个数据的值,而minBy()则会返回包含字段最小值的整条数据

max和maxby区别:
max:只取比较字段最大值,非比较字段保留第一次的值
maxby:取比较字段的最大值,同时非比较字段取最大值这条数据的值

简单聚合算子:
keyby之后才能调用
分组内的聚合:对同一个key的数据进行聚合

规约聚合(reduce)

reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {T reduce(T value1, T value2) throws Exception;
}

ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

在这里插入图片描述

在这里插入图片描述

keyby之后调用
输入类型 = 输出类型,类型不能变
每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出
reduce方法中的两个参数:
value1:之前的计算结果,存状态
value2:现在来的数据

package aggregate;import env.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> source = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 11L, 11),new WaterSensor("s1", 21L, 21),new WaterSensor("s2", 2L, 2),new WaterSensor("s3", 3L, 3));KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = source.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor value) {return value.id;}});SingleOutputStreamOperator<WaterSensor> reduce = waterSensorStringKeyedStream.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("value1: " + value1);System.out.println("value2: " + value2);return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);}});reduce.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/97964.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/97964.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/97964.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从淘宝推荐到微信搜索:查找算法如何支撑亿级用户——动画可视化

本篇技术博文摘要 &#x1f31f; 本文通过动画可视化深入解析数据结构中的核心查找算法&#xff0c;从基础概念到高阶应用&#xff0c;全面覆盖顺序查找、折半查找、分块查找、B树/B树及散列查找的核心原理与实现细节。文章以动态演示为核心工具&#xff0c;直观展现算法执行过…

图像正向扭曲反向扭曲

在图像处理领域&#xff0c;正向扭曲&#xff08;Forward Warping&#xff09;和反向扭曲&#xff08;Backward Warping&#xff09;是两种核心的图像坐标映射与像素重采样技术&#xff0c;核心区别在于“像素映射的方向”——是从“原始图像”到“目标图像”&#xff0c;还是从…

【C语言】 第三课 函数与栈帧机制详解

1 函数的基本概念 在C语言中&#xff0c;函数是程序的基本执行单元。一个函数的定义包括返回类型、函数名、参数列表和函数体。例如&#xff1a; int add(int x, int y) { // 函数定义int z x y;return z; }在使用函数前&#xff0c;通常需要声明&#xff08; declaration&am…

多个大体积PDF文件怎么按数量批量拆分成多个单独文件

在现代社会中&#xff0c;电子文档在我们的身边无所不在&#xff0c;而PDF文件时我们日常接触非常多的文档类型之一。PDF由于格式稳定、兼容性好&#xff0c;因此经常被用于各行各业。但是&#xff0c;我们平时在制作或搜集PDF文件时&#xff0c;文件太大&#xff0c;传输和分享…

ansible-角色

角色 一、利用角色构造ansible playbook 随着开发更多的playbook&#xff0c;会发现有很多机会重复利用以前编写的playbook中的代码。或许&#xff0c;一个用于为某一应用配置MySQL数据库的play可以改变用途。通过利用不同的主机名、密码和用户来为另一个应用配置MySQL数据库。…

git命令行打patch

在 Git 里打 patch&#xff08;补丁&#xff09;其实就是把某些提交的改动导出来&#xff0c;生成一个 .patch 文件&#xff0c;方便别人用 git apply 或 git am 打进代码里。&#x1f539; 常用方式1. 基于提交导出 patch导出最近一次提交&#xff1a;git format-patch -1 HEA…

文华财经多空提示指标公式 变色K线多空明确指标 文华wh6赢顺多空买卖提示指标

XX:240C;YY:MA(C,1);A1:POW(XX,2)/360-POW(YY,2)/260;A5:EMA2(EMA2(A1,20),5),LINETHICK2;A6:A5*0.9999,COLORSTICK;A20:EMA2(EMA2(A5,20),5),LINETHICK2;A60:EMA2(EMA2(A20,20),5),LINETHICK2;支撑:HHV(A5,30),COLORRED;天数:BARSSINCE(A5HHV(A5,0));YL:REF(A5,1)2.79-天数*0.…

记录一个防重Toast

当我们已经对某个按钮做了防暴力点击&#xff0c;但是依然在业务上有些复杂交互的情况&#xff0c;需要我们封装一个防重Toast。针对这类情况&#xff0c;可以直接使用下面的showDebouncedToastdata class ToastInfo(val id: Any? null,val command: MediaCommandDebouncer.M…

在线测评系统---第n天

主要完成了退出登录前后的代码的实现&#xff0c;以及题目列表的查询1.退出登录前端引入了全局前置守卫&#xff0c;如果cookie里面没有token则直接跳转到login页面&#xff1b;有则直接跳转到layout页面&#xff0c;无需重新登录后端接收到退出登录&#xff0c;将token置为无效…

机器学习从入门到精通 - 卷积神经网络(CNN)实战:图像识别模型搭建指南

机器学习从入门到精通 - 卷积神经网络(CNN)实战&#xff1a;图像识别模型搭建指南 各位&#xff0c;是不是觉得那些能认出照片里是猫还是狗、是停车标志还是绿灯的AI酷毙了&#xff1f;今天咱们就撸起袖子&#xff0c;亲手搭建一个这样的图像识别模型&#xff01;别担心不需要你…

python sqlalchemy模型的建立

SQLAlchemy 是一个功能强大的 Python SQL 工具包和对象关系映射&#xff08;ORM&#xff09;库&#xff0c;用于管理和操作关系数据库。它为 Python 开发者提供了一种用 Python 对象来运行和管理 SQL 数据库的方式。 目录 SQLAlchemy 的两个核心组成部分 SQLAlchemy 的主要功…

Rust中使用RocksDB索引进行高效范围查询的实践指南

在当今海量数据处理场景下,高效的范围查询能力成为许多系统的关键需求。RocksDB作为一款高性能的嵌入式键值存储引擎,其独特的LSM树结构和索引设计为范围查询提供了底层支持。本文将深入探讨如何在Rust中利用RocksDB的特性来实现高效范围查询,从键的设计原则到迭代器的工程实…

怎么做到这一点:让 Agent 可以像人类一样 边听边想、边说,而不是“等一句话 → 一次性返回”

要实现“边听边想、边说”&#xff0c;核心是把整条链路做成全双工、分片流式、可中断的流水线&#xff1a; ASR 连续吐字 →&#xff08;短缓冲&#xff09;→ LLM 连续出 token&#xff08;可抢断&#xff09;→ TTS 连续合成并播放&#xff08;可打断/续播&#xff09;。 下…

Ubuntu 22.04 网络服务安装配置

Ubuntu 22.04 网络服务安装配置 一键安装所有服务 # 更新系统 sudo apt update# 安装所有服务 sudo apt install -y openssh-server vsftpd telnetd inetutils-inetd ftp telnet# 启动所有服务 sudo systemctl start ssh vsftpd inetutils-inetd sudo systemctl enable ssh vsf…

【Unity知识分享】Unity实现全局监听键鼠调用

1、实现该功能前&#xff0c;优先学习Unity接入dll调用Window系统接口教程 【Unity知识分享】Unity接入dll调用Window系统接口 2、初始化动态连接库后&#xff0c;进行脚本功能实现 2.1 创建脚本KeyBoardHook.h和KeyBoardHook.cpp&#xff0c;实现功能如下 KeyBoardHook.h …

深度学习篇---MNIST:手写数字数据集

下面我将详细介绍使用 PyTorch 处理 MNIST 手写数字数据集的完整流程&#xff0c;包括数据加载、模型定义、训练和评估&#xff0c;并解释每一行代码的含义和注意事项。整个流程可以分为五个主要步骤&#xff1a;准备工作、数据加载与预处理、模型定义、模型训练和模型评估。# …

k8s集群搭建(二)-------- 集群搭建

安装 containerd 需要在集群内的每个节点上都安装容器运行时&#xff08;containerd runtime&#xff09;&#xff0c;这个软件是负责运行容器的软件。 1. 启动 ipv4 数据包转发 # 设置所需的 sysctl 参数&#xff0c;参数在重新启动后保持不变 cat <<EOF | sudo tee …

【Docker】P1 前言:容器化技术发展之路

目录容器发展之路物理服务器时代&#xff1a;一机一应用的局限虚拟化时代&#xff1a;突破与局限并存容器化时代&#xff1a;轻量级的革新技术演进的价值体现各位&#xff0c;欢迎来到容器化时代。 容器发展之路 现代业务的核心是应用程序&#xff08;Application&#xff09;…

WPF依赖属性和依赖属性的包装器:

依赖属性是WPF&#xff08;Windows Presentation Foundation&#xff09;中的一种特殊类型的属性&#xff0c;特别适用于内存使用优化和属性值继承。依赖属性的定义包括以下几个步骤&#xff1a; 使用 DependencyProperty.Register 方法注册依赖属性。 该方法需要四个参数&…

图生图算法

图生图算法研究细分&#xff1a;技术演进、应用与争议 1. 基于GAN的传统图生图方法 定义&#xff1a;利用生成对抗网络&#xff08;GAN&#xff09;将输入图像转换为目标域图像&#xff08;如语义图→照片、草图→彩图&#xff09;。关键发展与趋势&#xff1a; Pix2Pix&#…