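1. Base environment:
1.1 Install the JDK
This guide uses jdk-11.0.26_linux-x64_bin.tar.gz.
Extract the archive (the target directory /usr/local/java/ must already exist):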
tar -zxvf jdk-11.0.26_linux-x64_bin.tar.gz -C /usr/local/java/
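Configure the environment variables by opening /etc/profile and appending the lines below:
vi /etc/profile
JAVA_HOME=/usr/local/java/jdk-11.0.26
CLASSPATH=.:${JAVA_HOME}/lib:$CLASSPATH
PATH=$PATH:${JAVA_HOME}/bin
export JAVA_HOME CLASSPATH PATH
Apply the environment variables:
source /etc/profile
If they still do not take effect, reboot the server.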
1.2 Passwordless SSH login
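2. Set up a distributed Flink cluster
1. Download
Version: flink-2.0.0-bin-scala_2.12.tgz
URL: https://www.apache.org/dyn/closer.lua/flink/flink-2.0.0/flink-2.0.0-bin-scala_2.12.tgz
2. Install
Use the virtual machine's shared-folder setting to copy the installation package into the Linux VM localhost1. The shared folder is mounted at /mnt/hgfs/. Copy the package from there into the existing target directory /opt/software/.
Extract the archive:
cd /opt/software/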
tar -zxvf flink-2.0.0-bin-scala_2.12.tgz -C /usr/local/applications/
3. Modify the Flink configuration
Edit conf/config.yaml in the Flink installation directory. Only taskmanager.host differs between the nodes.
On localhost1:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost1
On localhost2:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost2
On localhost3:
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: localhost1
    port: 6123
taskmanager:
  bind-host: 0.0.0.0
  host: localhost3
Edit the conf/masters file:
localhost1:8081
Edit the conf/workers file:
localhost1
localhost2
localhost3
Edit the conf/zoo.cfg file (optional):
server.1=localhost1:2888:3888
server.2=localhost2:2888:3888
server.3=localhost3:2888:3888
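4. Distribute Flink to the cluster
First, stop and disable the firewall:
systemctl stop firewalld
systemctl disable firewalld
Copy Flink to localhost2 and localhost3 (run from /usr/local/applications/ on localhost1):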
scp -r flink-2.0.0 root@localhost2:/usr/local/applications/flink-2.0.0
scp -r flink-2.0.0 root@localhost3:/usr/local/applications/flink-2.0.0
5. Start the cluster
[root@localhost1 flink-2.0.0]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost1.
Starting taskexecutor daemon on host localhost1.
Starting taskexecutor daemon on host localhost2.
Starting taskexecutor daemon on host localhost3.
6. Open the web UI
http://localhost1:8081/#/overview
3. Flink development
3.1 Word count example
Create a Java project and add the Flink dependencies to pom.xml:
<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>2.0.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Create the WordCount class:
package com.neilparker;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read newline-delimited text from the socket server (nc) on localhost1:7777.
        DataStreamSource<String> source = executionEnvironment.socketTextStream("localhost1", 7777, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> dataStream = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String string, Collector<Tuple2<String, Long>> collector) {
                        // Emit (word, 1) for every whitespace-separated token.
                        String[] splits = string.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // Group by the word (field f0) and keep a running sum of field 1.
                .keyBy(value -> value.f0)
                .sum(1);
        dataStream.print();
        executionEnvironment.execute("wordcount batch process");
    }
}
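One detail of the flatMap above: split("\\s") splits on every single whitespace character, so a line containing two consecutive spaces yields an empty string that is then counted as a word. The following is a minimal plain-Java sketch of a stricter tokenizer, with no Flink dependency. The class name WordTokenizer and the method tokenize are hypothetical names used only for illustration and are not part of the original project.

```java
import java.util.Arrays;
import java.util.List;

public class WordTokenizer {
    /**
     * Splits a line into words on runs of whitespace and never returns
     * empty tokens. Hypothetical helper, not part of the Flink API.
     */
    public static List<String> tokenize(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            // A blank line contains no words at all.
            return List.of();
        }
        // "\\s+" treats consecutive whitespace characters as one separator.
        return Arrays.asList(trimmed.split("\\s+"));
    }

    public static void main(String[] args) {
        // Two spaces between the words and one trailing space.
        System.out.println(tokenize("hello  neil "));
        // The original expression keeps the empty token in the middle.
        System.out.println(Arrays.asList("hello  neil ".split("\\s")));
    }
}
```

If this behavior is wanted in the job, the loop inside flatMap could iterate over tokenize(string) instead of string.split("\\s"). For the sample input used in this guide both variants produce the same words.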
Start nc to simulate a socket server,
then run the Java code,
then type some input into the nc session.
The nc session looks like this:
[root@localhost1 ~]# nc -lp 7777
hello neil hello jack
hello mike hello walker
hello sun
The Java console shows the word count results:
5> (hello,1)
15> (neil,1)
14> (jack,1)
5> (hello,2)
4> (mike,1)
9> (walker,1)
5> (hello,3)
5> (hello,4)
15> (sun,1)
5> (hello,5)
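In this output, the number before ">" is the index of the parallel subtask that printed the record. All records with the same key are routed to the same subtask, which is why every hello line carries the prefix 5>. The sum operator emits an updated total for each incoming record, so hello appears five times with counts 1 to 5. The sketch below imitates that running-count behavior in plain Java without Flink. The class name RunningWordCount and the method runningCounts are hypothetical. Because it runs sequentially, the order across different words can differ from the parallel output above, while the order per word is the same.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningWordCount {
    /**
     * Returns one "(word,count)" entry per input word, where count is the
     * running total for that word so far. This mimics what
     * keyBy(...).sum(1) emits, but in a single thread.
     */
    public static List<String> runningCounts(List<String> lines) {
        Map<String, Long> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {
                // merge adds 1 to the stored total and returns the new value.
                long updated = totals.merge(word, 1L, Long::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "hello neil hello jack",
                "hello mike hello walker",
                "hello sun");
        runningCounts(lines).forEach(System.out::println);
    }
}
```

Feeding it the three lines typed into nc yields ten entries, one per word, ending with a total of 5 for hello and 1 for every other word.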
3.2 Submit the job to the Flink cluster
Add the shade plugin to pom.xml so that the build produces the jar:
<build>
    <plugins>
        <!-- Plugin that builds the jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Then run mvn package.

Start nc first:
[root@localhost1 flink-2.0.0]# nc -lp 7777
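Then upload and submit the jar on the Flink web UI.
The job should then show as running.
Next, type some words into the nc session.
The word counts appear on the TaskManager page.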