1. StreamGraph:

A StreamGraph is the first scheduling graph computed for a Flink streaming job: it is the logical graph translated from the user's DataStream API program. A StreamGraph is made up of StreamNodes and StreamEdges, where a StreamNode represents a data-processing node and a StreamEdge is the edge connecting two StreamNodes.

StreamGraph diagram:
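
To make the DataStream-to-StreamGraph mapping concrete, the following is a minimal sketch (not taken from the article's source) that builds a trivial pipeline and inspects the resulting StreamGraph. It assumes a Flink 1.19 project with flink-streaming-java on the classpath; note that StreamExecutionEnvironment#getStreamGraph and StreamGraph#getStreamNodes are internal/debugging-oriented APIs, so availability and output may differ slightly across versions.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;

public class StreamGraphInspection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A trivial pipeline: source -> map -> sink.
        // Each transformation becomes (at least) one StreamNode in the StreamGraph.
        env.fromElements(1, 2, 3)
                .map((MapFunction<Integer, Integer>) value -> value * 2)
                .print();

        // getStreamGraph() translates the recorded transformations into a StreamGraph
        // without executing the job, so we can inspect nodes and edges directly.
        StreamGraph streamGraph = env.getStreamGraph();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            System.out.println(
                    node.getId() + " -> " + node.getOperatorName()
                            + ", out edges: " + node.getOutEdges());
        }
    }
}

Running such a sketch typically prints one node per transformation (source, map, sink) together with the StreamEdges that connect them, which is the structure the rest of this article examines.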

2. StreamNode:

A StreamNode is the basic building block of a StreamGraph; it represents a single, independent operation or stage in the data-processing logic. Each StreamNode corresponds to one transformation (Transformation) of the DataStream API.

StreamNode diagram:

A StreamNode mainly encapsulates the operator's processing logic (the StreamOperator wrapped in its SimpleOperatorFactory), the operator's lists of incoming and outgoing edges, and configuration such as the slot sharing group (slotSharingGroup) and co-location group (coLocationGroup).

Complete StreamNode source code:

public class StreamNode {

    private final int id;
    // parallelism
    private int parallelism;
    /**
     * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
     * dynamic scaling and the number of key groups used for partitioned state.
     */
    private int maxParallelism;
    private ResourceSpec minResources = ResourceSpec.DEFAULT;
    private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
    private final Map<ManagedMemoryUseCase, Integer> managedMemoryOperatorScopeUseCaseWeights =
            new HashMap<>();
    private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
    private long bufferTimeout;
    private final String operatorName;
    private String operatorDescription;
    // slotSharingGroup and coLocationGroup settings
    private @Nullable String slotSharingGroup;
    private @Nullable String coLocationGroup;
    private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
    private TypeSerializer<?> stateKeySerializer;
    // the StreamOperatorFactory that wraps the operator
    private StreamOperatorFactory<?> operatorFactory;
    private TypeSerializer<?>[] typeSerializersIn = new TypeSerializer[0];
    private TypeSerializer<?> typeSerializerOut;
    // the operator's incoming and outgoing edges
    private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
    private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
    private final Class<? extends TaskInvokable> jobVertexClass;
    private InputFormat<?, ?> inputFormat;
    private OutputFormat<?> outputFormat;
    private String transformationUID;
    private String userHash;
    private final Map<Integer, StreamConfig.InputRequirement> inputRequirements = new HashMap<>();
    private @Nullable IntermediateDataSetID consumeClusterDatasetId;
    private boolean supportsConcurrentExecutionAttempts = true;
    private boolean parallelismConfigured = false;

    @VisibleForTesting
    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperator<?> operator,
            String operatorName,
            Class<? extends TaskInvokable> jobVertexClass) {
        this(
                id,
                slotSharingGroup,
                coLocationGroup,
                SimpleOperatorFactory.of(operator),
                operatorName,
                jobVertexClass);
    }

    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName,
            Class<? extends TaskInvokable> jobVertexClass) {
        this.id = id;
        this.operatorName = operatorName;
        this.operatorDescription = operatorName;
        this.operatorFactory = operatorFactory;
        this.jobVertexClass = jobVertexClass;
        this.slotSharingGroup = slotSharingGroup;
        this.coLocationGroup = coLocationGroup;
    }

    public void addInEdge(StreamEdge inEdge) {
        checkState(
                inEdges.stream().noneMatch(inEdge::equals),
                "Adding not unique edge = %s to existing inEdges = %s",
                inEdge,
                inEdges);
        if (inEdge.getTargetId() != getId()) {
            throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
        } else {
            inEdges.add(inEdge);
        }
    }

    public void addOutEdge(StreamEdge outEdge) {
        checkState(
                outEdges.stream().noneMatch(outEdge::equals),
                "Adding not unique edge = %s to existing outEdges = %s",
                outEdge,
                outEdges);
        if (outEdge.getSourceId() != getId()) {
            throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
        } else {
            outEdges.add(outEdge);
        }
    }

    public List<StreamEdge> getOutEdges() {
        return outEdges;
    }

    public List<StreamEdge> getInEdges() {
        return inEdges;
    }

    public List<Integer> getOutEdgeIndices() {
        List<Integer> outEdgeIndices = new ArrayList<Integer>();
        for (StreamEdge edge : outEdges) {
            outEdgeIndices.add(edge.getTargetId());
        }
        return outEdgeIndices;
    }

    public List<Integer> getInEdgeIndices() {
        List<Integer> inEdgeIndices = new ArrayList<Integer>();
        for (StreamEdge edge : inEdges) {
            inEdgeIndices.add(edge.getSourceId());
        }
        return inEdgeIndices;
    }

    public int getId() {
        return id;
    }

    public int getParallelism() {
        return parallelism;
    }

    public void setParallelism(Integer parallelism) {
        setParallelism(parallelism, true);
    }

    void setParallelism(Integer parallelism, boolean parallelismConfigured) {
        this.parallelism = parallelism;
        this.parallelismConfigured =
                parallelismConfigured && parallelism != ExecutionConfig.PARALLELISM_DEFAULT;
    }

    /**
     * Get the maximum parallelism for this stream node.
     *
     * @return Maximum parallelism
     */
    int getMaxParallelism() {
        return maxParallelism;
    }

    /**
     * Set the maximum parallelism for this stream node.
     *
     * @param maxParallelism Maximum parallelism to be set
     */
    void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    public ResourceSpec getMinResources() {
        return minResources;
    }

    public ResourceSpec getPreferredResources() {
        return preferredResources;
    }

    public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
        this.minResources = minResources;
        this.preferredResources = preferredResources;
    }

    public void setManagedMemoryUseCaseWeights(
            Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights,
            Set<ManagedMemoryUseCase> slotScopeUseCases) {
        managedMemoryOperatorScopeUseCaseWeights.putAll(operatorScopeUseCaseWeights);
        managedMemorySlotScopeUseCases.addAll(slotScopeUseCases);
    }

    public Map<ManagedMemoryUseCase, Integer> getManagedMemoryOperatorScopeUseCaseWeights() {
        return Collections.unmodifiableMap(managedMemoryOperatorScopeUseCaseWeights);
    }

    public Set<ManagedMemoryUseCase> getManagedMemorySlotScopeUseCases() {
        return Collections.unmodifiableSet(managedMemorySlotScopeUseCases);
    }

    public long getBufferTimeout() {
        return bufferTimeout;
    }

    public void setBufferTimeout(Long bufferTimeout) {
        this.bufferTimeout = bufferTimeout;
    }

    @VisibleForTesting
    public StreamOperator<?> getOperator() {
        return (StreamOperator<?>) ((SimpleOperatorFactory) operatorFactory).getOperator();
    }

    public StreamOperatorFactory<?> getOperatorFactory() {
        return operatorFactory;
    }

    public String getOperatorName() {
        return operatorName;
    }

    public String getOperatorDescription() {
        return operatorDescription;
    }

    public void setOperatorDescription(String operatorDescription) {
        this.operatorDescription = operatorDescription;
    }

    public void setSerializersIn(TypeSerializer<?>... typeSerializersIn) {
        checkArgument(typeSerializersIn.length > 0);
        // Unfortunately code above assumes type serializer can be null, while users of for example
        // getTypeSerializersIn would be confused by returning an array size of two with all
        // elements set to null...
        this.typeSerializersIn =
                Arrays.stream(typeSerializersIn)
                        .filter(typeSerializer -> typeSerializer != null)
                        .toArray(TypeSerializer<?>[]::new);
    }

    public TypeSerializer<?>[] getTypeSerializersIn() {
        return typeSerializersIn;
    }

    public TypeSerializer<?> getTypeSerializerOut() {
        return typeSerializerOut;
    }

    public void setSerializerOut(TypeSerializer<?> typeSerializerOut) {
        this.typeSerializerOut = typeSerializerOut;
    }

    public Class<? extends TaskInvokable> getJobVertexClass() {
        return jobVertexClass;
    }

    public InputFormat<?, ?> getInputFormat() {
        return inputFormat;
    }

    public void setInputFormat(InputFormat<?, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    public OutputFormat<?> getOutputFormat() {
        return outputFormat;
    }

    public void setOutputFormat(OutputFormat<?> outputFormat) {
        this.outputFormat = outputFormat;
    }

    public void setSlotSharingGroup(@Nullable String slotSharingGroup) {
        this.slotSharingGroup = slotSharingGroup;
    }

    @Nullable
    public String getSlotSharingGroup() {
        return slotSharingGroup;
    }

    public void setCoLocationGroup(@Nullable String coLocationGroup) {
        this.coLocationGroup = coLocationGroup;
    }

    public @Nullable String getCoLocationGroup() {
        return coLocationGroup;
    }

    public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
        return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null)
                || (slotSharingGroup != null
                        && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));
    }

    @Override
    public String toString() {
        return operatorName + "-" + id;
    }

    public KeySelector<?, ?>[] getStatePartitioners() {
        return statePartitioners;
    }

    public void setStatePartitioners(KeySelector<?, ?>... statePartitioners) {
        checkArgument(statePartitioners.length > 0);
        this.statePartitioners = statePartitioners;
    }

    public TypeSerializer<?> getStateKeySerializer() {
        return stateKeySerializer;
    }

    public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
        this.stateKeySerializer = stateKeySerializer;
    }

    public String getTransformationUID() {
        return transformationUID;
    }

    void setTransformationUID(String transformationId) {
        this.transformationUID = transformationId;
    }

    public String getUserHash() {
        return userHash;
    }

    public void setUserHash(String userHash) {
        this.userHash = userHash;
    }

    public void addInputRequirement(
            int inputIndex, StreamConfig.InputRequirement inputRequirement) {
        inputRequirements.put(inputIndex, inputRequirement);
    }

    public Map<Integer, StreamConfig.InputRequirement> getInputRequirements() {
        return inputRequirements;
    }

    public Optional<OperatorCoordinator.Provider> getCoordinatorProvider(
            String operatorName, OperatorID operatorID) {
        if (operatorFactory instanceof CoordinatedOperatorFactory) {
            return Optional.of(
                    ((CoordinatedOperatorFactory) operatorFactory)
                            .getCoordinatorProvider(operatorName, operatorID));
        } else {
            return Optional.empty();
        }
    }

    boolean isParallelismConfigured() {
        return parallelismConfigured;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        StreamNode that = (StreamNode) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return id;
    }

    @Nullable
    public IntermediateDataSetID getConsumeClusterDatasetId() {
        return consumeClusterDatasetId;
    }

    public void setConsumeClusterDatasetId(
            @Nullable IntermediateDataSetID consumeClusterDatasetId) {
        this.consumeClusterDatasetId = consumeClusterDatasetId;
    }

    public boolean isSupportsConcurrentExecutionAttempts() {
        return supportsConcurrentExecutionAttempts;
    }

    public void setSupportsConcurrentExecutionAttempts(
            boolean supportsConcurrentExecutionAttempts) {
        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
    }
}
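
As a rough illustration of the constructor path above, the sketch below builds a StreamNode by hand around a map operator and shows that the raw operator ends up wrapped in a SimpleOperatorFactory. This is only for study: StreamNode is an internal class normally created by StreamGraph#addOperator, and the StreamMap / OneInputStreamTask classes and the "default" slot sharing group used here are illustrative assumptions, not part of the listing above.

import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;

public class StreamNodeSketch {
    public static void main(String[] args) {
        // A concrete StreamOperator: a map that upper-cases strings.
        StreamMap<String, String> mapOperator = new StreamMap<>((String v) -> v.toUpperCase());

        // The @VisibleForTesting constructor wraps the raw operator into a
        // SimpleOperatorFactory via SimpleOperatorFactory.of(operator).
        StreamNode mapNode =
                new StreamNode(
                        1,                          // node id
                        "default",                  // slotSharingGroup (assumed name)
                        null,                       // coLocationGroup
                        mapOperator,                // StreamOperator
                        "Map",                      // operatorName
                        OneInputStreamTask.class);  // jobVertexClass

        mapNode.setParallelism(4);

        System.out.println(mapNode);                  // "Map-1" (operatorName + "-" + id)
        System.out.println(mapNode.getParallelism()); // 4
        System.out.println(
                mapNode.getOperatorFactory() instanceof SimpleOperatorFactory); // true
    }
}

Note that equals() and hashCode() of StreamNode are based solely on the node id, so two nodes with the same id are considered the same node regardless of operator or configuration.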

3. StreamEdge:

A StreamEdge is the edge that connects StreamNodes within a StreamGraph; it defines how data flows between two StreamNodes and which partitioning strategy is applied.

StreamEdge diagram:

A StreamEdge mainly records the ids of the upstream and downstream StreamNodes it connects, together with the StreamPartitioner that determines how records are distributed to the downstream node.

StreamEdge source code:

public class StreamEdge implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final long ALWAYS_FLUSH_BUFFER_TIMEOUT = 0L;

    private final String edgeId;
    // ids of the upstream and downstream StreamNodes
    private final int sourceId;
    private final int targetId;

    /**
     * Note that this field doesn't have to be unique among all {@link StreamEdge}s. It's enough if
     * this field ensures that all logical instances of {@link StreamEdge} are unique, and {@link
     * #hashCode()} are different and {@link #equals(Object)} returns false, for every possible pair
     * of {@link StreamEdge}. Especially among two different {@link StreamEdge}s that are connecting
     * the same pair of nodes.
     */
    private final int uniqueId;

    /** The type number of the input for co-tasks. */
    private final int typeNumber;

    /** The side-output tag (if any) of this {@link StreamEdge}. */
    private final OutputTag outputTag;

    // partitioner (StreamPartitioner)
    /** The {@link StreamPartitioner} on this {@link StreamEdge}. */
    private StreamPartitioner<?> outputPartitioner;

    /** The name of the operator in the source vertex. */
    private final String sourceOperatorName;

    /** The name of the operator in the target vertex. */
    private final String targetOperatorName;

    private final StreamExchangeMode exchangeMode;

    private long bufferTimeout;

    private boolean supportsUnalignedCheckpoints = true;

    private final IntermediateDataSetID intermediateDatasetIdToProduce;

    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag) {
        this(
                sourceVertex,
                targetVertex,
                typeNumber,
                ALWAYS_FLUSH_BUFFER_TIMEOUT,
                outputPartitioner,
                outputTag,
                StreamExchangeMode.UNDEFINED,
                0,
                null);
    }

    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            int uniqueId,
            IntermediateDataSetID intermediateDatasetId) {
        this(
                sourceVertex,
                targetVertex,
                typeNumber,
                sourceVertex.getBufferTimeout(),
                outputPartitioner,
                outputTag,
                exchangeMode,
                uniqueId,
                intermediateDatasetId);
    }

    public StreamEdge(
            StreamNode sourceVertex,
            StreamNode targetVertex,
            int typeNumber,
            long bufferTimeout,
            StreamPartitioner<?> outputPartitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            int uniqueId,
            IntermediateDataSetID intermediateDatasetId) {
        this.sourceId = sourceVertex.getId();
        this.targetId = targetVertex.getId();
        this.uniqueId = uniqueId;
        this.typeNumber = typeNumber;
        this.bufferTimeout = bufferTimeout;
        this.outputPartitioner = outputPartitioner;
        this.outputTag = outputTag;
        this.sourceOperatorName = sourceVertex.getOperatorName();
        this.targetOperatorName = targetVertex.getOperatorName();
        this.exchangeMode = checkNotNull(exchangeMode);
        this.intermediateDatasetIdToProduce = intermediateDatasetId;
        this.edgeId =
                sourceVertex + "_" + targetVertex + "_" + typeNumber + "_"
                        + outputPartitioner + "_" + uniqueId;
    }

    public int getSourceId() {
        return sourceId;
    }

    public int getTargetId() {
        return targetId;
    }

    public int getTypeNumber() {
        return typeNumber;
    }

    public OutputTag getOutputTag() {
        return this.outputTag;
    }

    public StreamPartitioner<?> getPartitioner() {
        return outputPartitioner;
    }

    public StreamExchangeMode getExchangeMode() {
        return exchangeMode;
    }

    public void setPartitioner(StreamPartitioner<?> partitioner) {
        this.outputPartitioner = partitioner;
    }

    public void setBufferTimeout(long bufferTimeout) {
        checkArgument(bufferTimeout >= -1);
        this.bufferTimeout = bufferTimeout;
    }

    public long getBufferTimeout() {
        return bufferTimeout;
    }

    public void setSupportsUnalignedCheckpoints(boolean supportsUnalignedCheckpoints) {
        this.supportsUnalignedCheckpoints = supportsUnalignedCheckpoints;
    }

    public boolean supportsUnalignedCheckpoints() {
        return supportsUnalignedCheckpoints;
    }

    @Override
    public int hashCode() {
        return Objects.hash(edgeId, outputTag);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        StreamEdge that = (StreamEdge) o;
        return Objects.equals(edgeId, that.edgeId) && Objects.equals(outputTag, that.outputTag);
    }

    @Override
    public String toString() {
        return "("
                + (sourceOperatorName + "-" + sourceId)
                + " -> "
                + (targetOperatorName + "-" + targetId)
                + ", typeNumber=" + typeNumber
                + ", outputPartitioner=" + outputPartitioner
                + ", exchangeMode=" + exchangeMode
                + ", bufferTimeout=" + bufferTimeout
                + ", outputTag=" + outputTag
                + ", uniqueId=" + uniqueId
                + ')';
    }

    public IntermediateDataSetID getIntermediateDatasetIdToProduce() {
        return intermediateDatasetIdToProduce;
    }
}
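
To see the two classes working together, here is a small hedged sketch that wires two hand-built StreamNodes with a StreamEdge using a ForwardPartitioner. It mirrors what StreamGraph#addEdge does internally and is meant only for illustration; the StreamMap, StreamFilter, ForwardPartitioner and OneInputStreamTask classes used here are assumptions about the surrounding Flink runtime, not part of the listings above.

import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;

public class StreamEdgeSketch {
    public static void main(String[] args) {
        // Two nodes built with the @VisibleForTesting convenience constructor shown above.
        StreamNode source =
                new StreamNode(
                        1, null, null,
                        new StreamMap<String, String>(v -> v.trim()),
                        "Map", OneInputStreamTask.class);
        StreamNode target =
                new StreamNode(
                        2, null, null,
                        new StreamFilter<String>(v -> !v.isEmpty()),
                        "Filter", OneInputStreamTask.class);

        // typeNumber = 0 (single-input operator), forward partitioning, no side-output tag.
        StreamEdge edge =
                new StreamEdge(source, target, 0, new ForwardPartitioner<>(), null);

        // StreamGraph#addEdge registers the edge on both endpoints, which keeps
        // getOutEdges()/getInEdges() of the two nodes consistent with each other.
        source.addOutEdge(edge);
        target.addInEdge(edge);

        System.out.println(edge);                       // (Map-1 -> Filter-2, typeNumber=0, ...)
        System.out.println(source.getOutEdgeIndices()); // [2]
        System.out.println(target.getInEdgeIndices());  // [1]
    }
}

The addInEdge/addOutEdge checks in StreamNode reject duplicate edges and edges whose source or target id does not match the node, which is why the edge must be created against the exact nodes it connects.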

This article explains and supplements the StreamGraph itself; the complete source code walkthrough of StreamGraph construction is covered in 《Flink-1.19.0源码详解4-StreamGraph生成》.
