下载 1.15.1

https://flink.apache.org/downloads.html#apache-flink-1151

部署模式分类

  • 会话模式
  • 应用模式
  • 单作业模式
1、会话模式

先启动一个集群,保持一个会话,然后通过客户端提交作业,所有作业都在一个会话执行;

会话模式适合规模小、执行时间短的大量作业;

2、应用模式

前两种模式应用代码都是在客户端运行,然后由客户端提交给jobmanager的,这种方式的弊端是:需要占用大量网络带宽,去下载依赖和把二进制数据发送给jobmanager,将会加重客户端资源消耗。
所以Application Mode的解决办法是:不需要客户端,直接把应用提交到jobmanager上运行,这意味着要为每个提交的应用单独启动一个jobmanager,也就是创建一个集群,

jobmanager执行完自己的应用将会关闭

应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的,即使应用包含了多个作业,也只创建一个集群。此模式用的比较少,

3、单作业模式

为每个作业启动一个集群,只要客户端提交了一个作业,就为这个作业启动一个单独的集群,这个集群只为这个作业提供服务;其

一、独立会话模式(Standalone)-部署

flink只支持linux部署

1、解压

tar -zvxf flink-1.15.1-bin-scala_2.12.tgz

2、修改配置文件

vim conf/flink-conf.yaml 
# 修改以下内容
jobmanager.rpc.address: 192.168.31.250 # 选择当前主机的ip地址,如果是云服务器,使用外网ip
# JobManager将绑定到的主机接口,默认值为 localhost 禁止外部访问,设为0.0.0.0表示允许外部访问,设置错误的话 Available Task Slots 会显示0
jobmanager.bind-host: 0.0.0.0
# 任务插槽数量,相当于使用多少个线程来执行流
taskmanager.numberOfTaskSlots: 2 
parallelism.default: 1web.submit.enable: true
# 指定TaskManager主机的地址,单机部署的话,用localhost即可
taskmanager.host: 192.168.31.250
# web前端展示的端口,自己设置
rest.port: 8081   
# 客户端应该用来连接到服务器的地址。注意:仅当高可用性配置为 NONE 时才考虑此选项
rest.address: 192.168.31.250
# 允许外部ip访问的地址,默认情况下是localhost,只能内部访问,改为0.0.0.0允许所有外部ip访问
rest.bind-address: 0.0.0.0

3、修改master文件,

vim conf/masters # 填写主节点的ip地址,如果是云服务器,使用外网ip
192.168.31.250:8081

4、修改 workers 文件

vim  conf/workers# 添加 taskManager 节点的ip地址列表,如果是单节点,只填写主节点ip地址即可
192.168.31.250
192.168.31.251
192.168.31.252

5、、启动

bin/start-cluster.sh

启动成功后,命令行会显示如下信息

[root@dev-server bin]# ./start-cluster.sh 
Starting cluster.  # 启动集群
Starting standalonesession daemon on host dev-server.  # 启动会话模式的 作业调度器 jobmanager
Starting taskexecutor daemon on host dev-server. # 启动任务管理器

通过jps命令可以看到已经启动的flink

[root@dev-server bin]# jps
3010991 TaskManagerRunner   # 任务调度器 taskManager
3010438 StandaloneSessionClusterEntrypoint   # 会话模式的节点
3023395 Jps

说明:

  1. JobManager 的启动代码:standalonesession,实现类是:StandaloneSessionClusterEntrypoint
  2. TaskManager 的启动代码:taskexecutor,实现类是:TaskManagerRunner

6、、访问ui界面

http://192.168.31.250:8081

7、、停止flink

bin/stop-cluster.sh

二、提交作业

1、编写作业代码
新建maven项目,pom.xml 加入flink的依赖

<properties><java.version>1.8</java.version><scala-binary-version>2.12</scala-binary-version><flink-version>1.13.0</flink-version><slf4j-version>1.7.30</slf4j-version></properties><dependencies>
<!--        flink 依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala-binary-version}</artifactId><version>${flink-version}</version></dependency>
<!--        flink 客户端,主要做一些管理相关的工作,如果不需要,就不需要导入此依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala-binary-version}</artifactId><version>${flink-version}</version></dependency><!--        日志相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j-version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j-version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency></dependencies>

2、编写java代码

package com.demo;/*** @author yexd*/import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @title: 无界流处理* @Author yexd* @Date: 2022/8/7 20:10* @Version 1.0*/
public class UnboundedStreamWord {static String ip = "192.168.31.250";static int port = 9879;/*** 先将文件中的每一行进行分词,然后统计每个单词出现的次数* @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 读取网络流,在linux系统输入命令 : nc -lk  8888  后,就可以进行通讯了,-lk表示保持当前的连接并持续监听8888端口DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip,port);// 将每行数据根据空格切割后进行分词,转换成二元组, FlatMapOperator<输入的数据类型, 输出的数据类型>SingleOutputStreamOperator<Tuple2<String, Long>> operator = stringDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {// 将每行进行切割String[] words = line.split(" ");for (String word : words) {// 将每个单词转换成二元组进行输出,其中第一个 word 表示单词本身, 1L表示每个单词出现的次数,后面会用这个次数来进行统计单词出现的总数out.collect(Tuple2.of(word, 1L));}});// 返回分词后的结果,FlatMapOperator<输入的数据类型, 输出的数据类型>SingleOutputStreamOperator<Tuple2<String, Long>> returns = operator.returns(Types.TUPLE(Types.STRING, Types.LONG));// 按照分词进行分组,keyBy 参数中的 f0 表示根据第几个字段进行分组(从0开始), 很明显,Tuple2的第一个字段是String类型,也就是刚刚分好词后的单词KeyedStream<Tuple2<String, Long>, Object> tuple2UnsortedGrouping = returns.keyBy(data -> data.f0);// 分组内进行聚合统计,sum 中的参数1 表示根据第几个属性进行统计,Tuple2<String, Long> 很明显第二个属性是Long,在上面我们将这个属性都置为1了,所以会进行统计SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);// 打印sum.print();// 启动执行executionEnvironment.execute();/**打印结果:4> (123,1)5> (hello,1)15> (456,1)5> (hello,2)4> (123,2)5> (hello,3)说明: 大于号前面的数字表示 线程的编号,表示使用不同的线程进行处理,也就是并行流*/}
}

3、打包,通过以下命令将项目打成 jar 包

maven clean package

3、添加作业
在页面中选择 Submit New Job -> Add New ,

选择刚刚打好的jar包

上传后点击jar的名称,有些信息需要填写一下

说明:

  • Entry Class : jar包中 main 方法所在类的全类名
  • Parallelism : 并行度,就是用多线程去执行作业,调成多少就用多少个线程执行作业
  • Program Arguments : 传入main 方法的参数,多个参数用空格隔开
  • Savepoint Path :保存点路径,比如你作业执行到一半,但是flink服务器需要重启,就会先暂停作业,然后将执行到一半的作业保存起来,待重启后继续执行,这里配置就是保存的路径;如果不需要保存,为空就行

4、提交之前的改动
因为在java代码里面用的无界流处理,也就是说,数据是通过 socket 网络传输的,如果不先启动监听的话,现在盲目提交就会导致报错,而我的代码里监听了 192.168.31.250 的 9879端口, 所以需要在 192.168.31.250 的服务器上输入以下命令来监听 9879 的端口

# -lk表示保持当前的连接并持续监听9879端口
nc -lk  9879

5、提交

以下是我的配置,然后点击 Submit 就可以提交了

提交后 一次点击左边的菜单栏 Jobs -> Running Jobs ,就可以可以看到刚刚提交的任务了,点进去看看

说明:

  • 绿色的RUNNING 表示正在运行中,如果是红色的字体,就表示有错误
  • RUNNING旁边绿色的 2 表示并行度,表示有2个线程执行这个作业
  • 底部表格展示的是运行的时长、数据流大小、任务数量等信息
  • Cancel Job : 可通过此按钮来停止作业

6、往flink发送消息
刚刚启动了 linux 监听了 9879 端口,发送了2条信息

然后依次点击 TaskManager -> 任务id

最后点击 Stout 就可以看到输入的内容了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/91011.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/91011.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/91011.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis数据量过大的隐患:查询会变慢吗?如何避免?

一、Redis数据过多引发的五大隐患&#xff08;附系统交互图&#xff09; #mermaid-svg-X83bpHUu830QXKUt {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-X83bpHUu830QXKUt .error-icon{fill:#552222;}#mermaid-svg-…

网络与信息安全有哪些岗位:(3)安全运维工程师

安全运维工程师是企业安全防线的 “日常守护者”&#xff0c;既要确保安全设备与系统的稳定运行&#xff0c;又要实时监控潜在威胁&#xff0c;快速响应并处置安全事件&#xff0c;是连接安全技术与业务运营的关键角色。其核心价值在于通过常态化运维&#xff0c;将安全风险控制…

鱼皮项目简易版 RPC 框架开发(三)

本文为笔者阅读鱼皮的项目 《简易版 RPC 框架开发》的笔记&#xff0c;如果有时间可以直接去看原文&#xff0c; 1. 简易版 RPC 框架开发 前面的内容可以笔者的前面两个篇笔记 鱼皮项目简易版 RPC 框架开发&#xff08;一&#xff09; 鱼皮项目简易版 RPC 框架开发&#xff08;…

嵌入式Linux:注册线程清理处理函数

在 Linux 多线程编程中&#xff0c;线程终止时可以执行特定的清理操作&#xff0c;通过注册线程清理函数&#xff08;thread cleanup handler&#xff09;来实现。这类似于使用 atexit() 注册进程终止处理函数。线程清理函数用于在线程退出时执行一些资源释放或清理工作&#x…

【Git】Linux-ubuntu 22.04 初步认识 -> 安装 -> 基础操作

文章目录Git 初识Git 安装Linux-centosLinux-ubuntuWindowsGit 基本操作配置 Git认识工作区、暂存区、版本库添加文件 -- 场景一查看 .git 文件添加文件 -- 场景二修改文件版本回退撤销修改情况一&#xff1a;对于工作区的代码&#xff0c;还没有 add情况二&#xff1a;已经 ad…

轻量级音乐元数据编辑器Metadata Remote

简介 什么是 Metadata Remote (mdrm) &#xff1f; Metadata Remote 是一个基于 Web 的音频元数据编辑工具&#xff0c;旨在简化在无头服务器&#xff08;即没有图形用户界面的服务器&#xff09;上编辑音频文件的元数据。用户只需使用 Docker 和浏览器&#xff0c;无需复杂的…

免费使用|共享服务器上线RTX3080(20GB显存)

共享服务器也上架GPU啦 生物信息学中有很多用到GPU的场景&#xff0c;例如我们分享过的&#xff1a;利用GPU加速TensorFlow、部署本地DeepSeek&#xff0c;空间转录组学习手册合辑加速。因此多种GPU供大家选择&#xff1a;RTX5090、4080S、5070显卡上机。为了让此前的CPU服务器…

搭建DM数据守护集群

1环境与规划准备3个kylin 10操作系统的虚拟机&#xff0c;规划IP、端口、安装目录等。说明搭建REALTIME归档模式、事务一致性的数据守护名称项初始主库机器dm1初始备库机器dm2监视器机器dmmon外部业务IP192.168.23.129192.168.23.130192.168.23.131内部心跳IP192.168.23.129192…

AUTOSAR进阶图解==>AUTOSAR_SRS_OCUDriver

AUTOSAR OCU驱动程序详解 AUTOSAR标准输出比较单元驱动程序架构与实现分析目录 1. 概述 1.1 OCU驱动程序简介1.2 功能概述 2. OCU驱动程序架构 2.1 架构图2.2 层次结构 3. OCU驱动程序组件设计 3.1 组件图3.2 接口定义 4. OCU驱动程序状态管理 4.1 状态图4.2 状态转换 5. OCU驱…

InfluxDB 与 HTTP 协议交互进阶(一)

引言 在当今数字化时代&#xff0c;数据处理的高效性和准确性成为了众多领域关注的焦点。InfluxDB 作为一款开源的时序数据库&#xff0c;凭借其高性能、易扩展等特性&#xff0c;在时间序列数据处理中占据了重要地位。而 HTTP 协议作为互联网应用层的核心协议之一&#xff0c…

NAS远程访问新解法:OMV与cpolar的技术协同价值

文章目录前言1. OMV安装Cpolar2. 配置FTP公网地址3. OMV FTP 配置4. OMV FTP远程连接前言 当家庭存储需求突破本地边界时&#xff0c;传统NAS方案往往陷入"连接困境"&#xff1a;复杂的端口转发配置、高昂的公网IP成本、以及始终存在的安全顾虑…开源解决方案OMV虽然…

vue 渲染 | 不同类型的元素渲染的方式(vue组件/htmlelement/纯 html)

省流总结&#xff1a;&#xff08;具体实现见下方&#xff09; vue 组件 ——》<component :is组件名> htmlelement 元素 ——》 ref 、★ v-for ref 或是 ★ vue 的 nextTick 纯 html 结构——》v-html 另外&#xff0c;当数据异步加载时&#xff0c;vue3中如何渲…

Charles中文版深度解析,轻松调试API与优化网络请求

在现代软件开发过程中&#xff0c;调试API、捕获HTTP/HTTPS流量以及优化网络性能是开发者不可避免的挑战。特别是在处理复杂的网络请求和验证API接口的数据传输准确性时&#xff0c;开发者需要一款强大且易于使用的工具。Charles抓包工具凭借其功能强大、界面简洁、易于操作的特…

【CF】Codeforces Round 1039 (Div. 2) E1 (二分答案求中位数)

E1. Submedians (Easy Version)题目&#xff1a;思路&#xff1a;经典不过加了点东西对于求中位数&#xff0c;我们必然要想到二分答案&#xff0c;具体的&#xff0c;对于所有大于等于 x 的数我们令其奉献为 1&#xff0c;小于的为 -1&#xff0c;如果存在某段区间的奉献和大于…

ESP32-S3学习笔记<8>:LEDC的应用

ESP32-S3学习笔记&#xff1c;8&#xff1e;&#xff1a;LEDC的应用1. 头文件包含2. LEDC的配置2.1 配置定时器2.1.1 speed_mode/设置速度模式2.1.2 duty_resolution/设置占空比分辨率2.1.3 timer_num/选择定时器2.1.4 freq_hz/设定PWM频率2.1.5 clk_cfg/选择LEDC的外设时钟源2…

网络安全第14集

前言&#xff1a;小迪安全14集&#xff0c;这集重点内容&#xff1a;0、什么是js渗透测试&#xff1f;在javascript中也存在变量和函数&#xff0c;存在可控变量和函数就有可能存在在漏洞&#xff0c;js开发的web应用和php、java开发的区别是&#xff0c;js能看得到的源代码&am…

代码随想录算法训练营第三十三天

LeetCode.62 不同路径 题目链接 不同路径 题解 class Solution {public int uniquePaths(int m, int n) {// dp表示到达ij有多少条路径int[][] dp new int[110][110];dp[1][1] 1;for(int i 0;i<m;i){dp[i][0] 1;}for(int j 0;j<n;j){dp[0][j] 1;}for(int i 1;i…

银行回单OCR识别技术原理

银行回单OCR&#xff08;光学字符识别&#xff09;技术通过结合图像处理、模式识别和自然语言处理&#xff08;NLP&#xff09;技术&#xff0c;将纸质或电子版银行回单中的非结构化文本&#xff08;如账号、金额、日期等&#xff09;转化为结构化数据。以下是其核心原理和关键…

Day22-二叉树的迭代遍历

昨天学习了递归遍历&#xff1a;递归就是一次次的把参数压入栈中&#xff0c;然后返回的时候还是上一次递归保存的参数。今天学习迭代遍历。迭代遍历就是用栈去模拟保存二叉树的节点&#xff0c;然后依次去遍历&#xff0c;只不过要注意栈的后入先出的规则。前序遍历&#xff1…

知识蒸馏 - 通过引入温度参数T调整 Softmax 的输出

知识蒸馏 - 通过引入温度参数T调整 Softmax 的输出 flyfish import torch import torch.nn.functional as F import matplotlib.pyplot as plt import numpy as np# 设置中文字体支持 plt.rcParams["font.family"] [AR PL UMing CN] # Linux plt.rcParams[axes.uni…