资源

es:7.18  kibana:7.18 flink:1.17.2

目录

mkdir -p /usr/project/flink/{conf,job,logs}
chmod -R 777 /usr/project/flink
#资源情况
mysql8.0 Elasticsearch7.18 自建# 目录结构
/usr/project/flink/
/usr/project/flink/
├── conf/
│   ├── flink-conf.yaml
│   └── log4j2.xml
├── job/
│   ├── flink-connector-elasticsearch7-3.0.1-1.17.jar
│   ├── flink-connector-elasticsearch-base-3.0.1-1.17.jar
│   ├── flink-sql-connector-mysql-cdc-3.1.1.jar
│   └── win_user.sql
├── logs/
└── docker-compose.yml

本地创建es kibana

version: '3.8'services:jobmanager:image: flink:1.17.2container_name: flink-jobmanagerrestart: alwaysports:- "8081:8081"  # Flink Web UIcommand: jobmanagerenvironment:- JOB_MANAGER_RPC_ADDRESS=jobmanagervolumes:- ./conf:/opt/flink/conf- ./job:/opt/flink/job- /usr/project/flink/logs:/opt/flink/lognetworks:- flink-networktaskmanager:image: flink:1.17.2container_name: flink-taskmanagerrestart: alwaysdepends_on:- jobmanagercommand: taskmanagerenvironment:- JOB_MANAGER_RPC_ADDRESS=jobmanagervolumes:- ./conf:/opt/flink/conf- ./job:/opt/flink/job- /usr/project/flink/logs:/opt/flink/lognetworks:- flink-networkelasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:7.17.18container_name: elasticsearchrestart: alwaysenvironment:- discovery.type=single-node- ELASTIC_PASSWORD=123456- xpack.security.enabled=true- network.host=0.0.0.0ports:- "9200:9200"- "9300:9300"volumes:- es_data:/usr/share/elasticsearch/datanetworks:- flink-networkkibana:image: docker.elastic.co/kibana/kibana:7.17.18container_name: kibanarestart: alwaysenvironment:- ELASTICSEARCH_HOSTS=http://elasticsearch:9200- ELASTICSEARCH_USERNAME=elastic- ELASTICSEARCH_PASSWORD=123456ports:- "5601:5601"networks:- flink-networkvolumes:es_data:networks:flink-network:driver: bridge

启动

#目录
cd /usr/project/flink#保存文件后,重新启动容器
docker-compose up -d#关闭 停止并移除容器(不删除数据卷)
docker-compose down#检查 restart 策略是否生效
docker inspect -f '{{.Name}} {{.HostConfig.RestartPolicy.Name}}' $(docker ps -q)

验证

http://127.0.0.1:9200curl -u elastic:123456 http://127.0.0.1:9200#账户密码
elastic
123456

Flink SQL Job 示例

文件 /usr/project/flink/job/win_user.sql

存量增量模式

scan.startup.mode 设置为 'initial',以从表的初始状态开始读取数据,然后再进行增量同步

将其设置为 'latest-offset',以从最新的偏移量开始读取数据,实现增量同步

验证表是否成功创建

/opt/flink/bin/sql-client.sh embeddedSHOW TABLES;
SELECT * FROM v99_source_win_user LIMIT 10;
#验证表是否成功创建 进入flink sql
/opt/flink/bin/sql-client.sh embeddedSHOW TABLES;
SELECT * FROM v99_source_win_user LIMIT 10;#验证表是否成功创建 进入flink sql
/opt/flink/bin/sql-client.sh embeddedSHOW TABLES;
SELECT * FROM v99_source_win_user LIMIT 10;
配置模块

vim /usr/project/flink/job/win_user.sql

CREATE TABLE v99_source_win_user (id INT,username STRING,merchant_id INT,avatar STRING,fcoin DECIMAL(15,4),coin_commission DECIMAL(15,4),level_id TINYINT,role TINYINT,is_promoter TINYINT,flag INT,real_name STRING,signature STRING,birthday STRING,area_code STRING,mobile STRING,email STRING,sex TINYINT,bind_bank TINYINT,address STRING,score INT,promo_code STRING,id_path STRING,sup_uid_1 INT,sup_username_1 STRING,sup_uid_2 INT,sup_uid_3 INT,sup_uid_4 INT,sup_uid_5 INT,sup_uid_6 INT,sup_uid_top INT,sup_username_top STRING,sup_level_top INT,password_hash STRING,password_coin STRING,ip STRING,third_login_type STRING,ip_region STRING,status TINYINT,last_login_ip STRING,last_login_ip_region STRING,last_login_time INT,last_login_device_id STRING,created_at INT,updated_at INT,freeze_cause STRING,freeze_at INT,operator_name STRING,fb_pid STRING,fb_cid STRING,created_name STRING,memberType TINYINT,google_sub_id STRING,facebook_sub_id STRING,secret STRING,code_url STRING,code_status TINYINT,user_type TINYINT,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = '123456','database-name' = 'db_name','table-name' = 'win_user','scan.startup.mode' = 'initial',          -- 读取存量数据'debezium.snapshot.mode' = 'never',       -- 使用快照模式initial  增量模式never 增量模式'scan.incremental.snapshot.enabled' = 'true'  -- 启用增量同步
);CREATE TABLE es_sink_table_win_user (id INT,username STRING,merchant_id INT,avatar STRING,fcoin DECIMAL(15,4),coin_commission DECIMAL(15,4),level_id TINYINT,role TINYINT,is_promoter TINYINT,flag INT,real_name STRING,signature STRING,birthday STRING,area_code STRING,mobile STRING,email STRING,sex TINYINT,bind_bank TINYINT,address STRING,score INT,promo_code STRING,id_path STRING,sup_uid_1 INT,sup_username_1 STRING,sup_uid_2 INT,sup_uid_3 INT,sup_uid_4 INT,sup_uid_5 INT,sup_uid_6 INT,sup_uid_top INT,sup_username_top STRING,sup_level_top INT,password_hash STRING,password_coin STRING,ip STRING,third_login_type STRING,ip_region STRING,status TINYINT,last_login_ip STRING,last_login_ip_region STRING,last_login_time INT,last_login_device_id STRING,created_at INT,updated_at INT,freeze_cause STRING,freeze_at INT,operator_name STRING,fb_pid STRING,fb_cid STRING,created_name STRING,memberType TINYINT,google_sub_id STRING,facebook_sub_id STRING,secret STRING,code_url STRING,code_status TINYINT,user_type TINYINT,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://elasticsearch:9200','username' = 'elastic','password' = '123456','index' = 'win_user',  -- 确保索引名称与 Elasticsearch 中的匹配'sink.bulk-flush.interval' = '1s','sink.bulk-flush.backoff.max-retries' = '3',   -- 设置最大重试次数'sink.bulk-flush.max-actions' = '100', -- 一条数据也会同步不等待'sink.bulk-flush.max-size' = '1mb', -- 达到 1MB 或 200 条数据时批量 flush'sink.bulk-flush.backoff.delay' = '100ms',       -- 设置重试的延迟'sink.bulk-flush.backoff.strategy' = 'constant'  -- 重试策略
);-- 3. 执行数据插入任务
INSERT INTO es_sink_table_win_user
SELECT * FROM v99_source_win_user;

验证

/opt/flink/bin/sql-client.sh embedded
#验证
SHOW TABLES;
desc es_sink_table_win_user;
DROP TABLE IF EXISTS es_sink_table_win_user;
DROP TABLE IF EXISTS v99_source_win_user;# Flink 1.17 中,您可以使用以下命令查看已注册的连接器
SHOW TABLES;
#作业状态
SHOW JOBS;
#详情
EXPLAIN SELECT * FROM v99_source_win_user;SELECT * FROM v99_source_win_user LIMIT 10;

优化配置 必须要配置

/opt/flink/bin/sql-client.sh embedded
#增加的 Session 全局配置(SET)
SET execution.checkpointing.interval = '1s';
SET restart-strategy = 'fixed-delay';
SET restart-strategy.fixed-delay.attempts = '3';
SET restart-strategy.fixed-delay.delay = '5s';
SET parallelism.default = 4;
SET state.backend = 'rocksdb';
SET state.backend.rocksdb.memory.managed = 'true';
SET execution.parallelism = 8;#-- 提交作业时设置  Sink 的并行度提升
SET parallelism.default = 2; 
#最高作业任务
SET execution.parallelism = 8;
#查看验证配置
SET;

连接器下载配置

flink-connector-elasticsearch包官方下载地址 https://repo1.maven.org/maven2/org/apache/flink/ 要选对版本 es7.17

flink-1.17.2

cd /usr/project/flink/job

#删除当前目录除win_user.sql其他的文件
find . -maxdepth 1 ! -name 'win_user.sql' ! -name '.' -type f -exec rm -f {} +# MySQL CDC
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.1/flink-sql-connector-mysql-cdc-3.1.1.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
# Elasticsearch
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch-base/3.0.1-1.17/flink-connector-elasticsearch-base-3.0.1-1.17.jar# 补充依赖
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.13/httpcore-4.4.13.jar
wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar
wget https://repo1.maven.org/maven2/commons-codec/commons-codec/1.11/commons-codec-1.11.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/97718.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/97718.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/97718.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI浏览器和钉钉ONE是不是伪需求?

最近两则新闻格外引起了我的注意:一是Claude推出了官方浏览器插件,二是钉钉发布了钉钉ONE。前者说明AI浏览器未必有必要,后者则描绘了一幅“刷刷手机就能完成工作”的未来办公图景。这几天我经常在思考,AI浏览器是不是没有必要&am…

从结构化到多模态:RAG文档解析工具选型全指南

在RAG系统建设中,文档解析质量直接决定最终效果上限,选择合适的解析工具已成为避免"垃圾进,垃圾出"(GIGO)困境的关键决策。一、文档解析:RAG系统的基石与瓶颈 当前企业知识库中超过80%的信息存储…

设计模式:享元模式(Flyweight Pattern)

文章目录一、享元模式的介绍二、实例分析三、示例代码一、享元模式的介绍 享元模式(Flyweight Pattern) 是一种结构型设计模式。通过共享相同对象,减少内存消耗,提高性能。 它摒弃了在每个对象中保存所有数据的方式, 通…

【Go语言入门教程】 Go语言的起源与技术特点:从诞生到现代编程利器(一)

文章目录前言1. Go语言的起源与发展2. Go语言的核心设计团队2.1 Ken Thompson(肯汤普森)2.2 Rob Pike(罗布派克)2.3 Robert Griesemer(罗伯特格瑞泽默)设计动机:解决C的痛点3. Go语言的核心特性…

rocketmq启动与测试

1.更改runserver.sh的内存大小 vi runserver.sh 2.更改 runbroker.sh内存大小 vi runbroker.sh3.设置环境变量 vi ~/.bash_profile 新增 export NAMESRV_ADDRlocalhost:98764.启动 --在bin的上一级目录启动 nohup bin/mqnamesrv & nohup bin/mqbroker &5.查看日志 le…

11.《简单的路由重分布基础知识探秘》

11_路由重分布 文章目录11_路由重分布路由重分布概述路由重分布的核心作用基础实验实验流程实验拓扑配置示例(基本操作省略)实验结论路由重分布概述 路由重分布(又称路由引入)是指在不同路由协议之间交换路由信息的技术。在复杂网络中,可能同…

C++ 左值引用与右值引用介绍

C 左值引用与右值引用详解 在 C 的类型系统中,引用(reference) 是一种为已有对象起别名的机制。在早期(C98/03)中,C 只有 左值引用(lvalue reference),主要用于函数参数…

基于物联网设计的园林灌溉系统(华为云IOT)_274

文章目录 一、前言 1.1 项目介绍 【1】项目开发背景 【2】设计实现的功能 【3】项目硬件模块组成 【4】设计意义 【5】国内外研究现状 【6】摘要 1.2 设计思路 1.3 系统功能总结 1.4 开发工具的选择 【1】设备端开发 【2】上位机开发 1.5 参考文献 1.6 系统框架图 1.7 系统原理…

uni-app iOS 应用版本迭代与上架实践 持续更新的高效流程

很多团队在使用 uni-app 开发 iOS 应用时,往往能顺利完成第一次上架,但一到 版本更新和迭代 环节,就会频繁遇到瓶颈:证书是否能复用?如何快速上传?怎样保持节奏不被打乱? 本文结合实战经验&…

解决由Tomcat部署前端改成nginx部署,导致大写.JPG结尾文件无法访问问题

前言:因信创替代要求,在麒麟服务器部署新的应用。原先的架构:前端tomcat部署,源码部署java应用(ps:前后端,文件都在同一台服务器上),前端访问后端,再通过后端…

【设计模式】三大原则 单一职责原则、开放-封闭原则、依赖倒转原则

系列文章目录 文章目录系列文章目录一、单一职责原则方块游戏的设计二、开放-封闭原则原则介绍何时应对变化三、依赖倒转原则依赖倒转原则介绍里氏代换原则总结一、单一职责原则 单一职责原则,听字面意思,就是说功能要单一,他的准确解释是&a…

(3dnr)多帧视频图像去噪 (一)

一、多帧视频图像去噪 原理当摄像机每秒捕捉的图像达到60FPS,除了场景切换或者一些快速运动的场 景外,视频信号中相邻的两帧图像内容大部分是相同的。并且视频信号中的噪 声大部分都是均值为零的随机噪声,因此在时间上对视频信号做帧平均&…

从静态到智能:用函数式接口替代传统工具类

在 Java 早期开发中,我们习惯使用**静态实用程序类(Utility Class)**来集中放置一些通用方法,例如验证、字符串处理、数学计算等。这种模式虽然简单直接,但在现代 Java 开发(尤其是 Java 8 引入 Lambda 和函…

免杀伪装 ----> R3进程伪装实战(高阶) ---->培养红队免杀思路

目录 R3进程伪装(免杀技术)高阶技术说明 深入剖析Windows进程规避免杀技术 学习R3进程伪装的必备技能 R3进程伪装的核心知识点与实现步骤 核心知识点 实现步骤 免杀实现步骤 PEB与EPROCESS的深入解析 1. PEB(进程环境块) 2. EPROCESS 3. PEB与…

深度学习——基于卷积神经网络实现食物图像分类(数据增强)

文章目录 引言 一、项目概述 二、环境准备 三、数据预处理 3.1 数据增强与标准化 3.2 数据集准备 四、自定义数据集类 五、构建CNN模型 六、训练与评估 6.1 训练函数 6.2 评估函数 6.3 训练流程 七、关键技术与优化 八、常见问题与解决 九、完整代码 十、总结 引言 本文将详细介…

【开题答辩全过程】以 基于微信小程序的教学辅助系统 为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人,语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

【代码解读】Deepseek_vl2中具体代码调用

【代码解读】Deepseek_vl2中具体代码调用 文章目录【代码解读】Deepseek_vl2中具体代码调用DeepseekVLV2Processor解读DeepseekVLV2ForCausalLM - 多模态模型DeepSeek-VL2 Processor的输入格式单样本格式多样本格式DeepSeek-VL2模型的输出形式总结主要输出类型:Deep…

Git 9 ,.git/index.lock 文件冲突问题( .git/index.lock‘: File exists. )

目录 前言 一、问题背景 1.1 问题出现场景 1.2 典型报错信息 1.3 问题影响 二、问题原因分 2.1 Git 的 index 与锁机制 2.2 主要作用 2.3 根本原因 三、解决方案 3.1 确认进程 3.2 手动删除 3.3 再次执行 四、注意事项 4.1 确保运行 4.2 问题排查 4.3 自动化解…

Proteus8 仿真教学全指南:从入门到实战的电子开发利器

在电子设计、单片机课程设计或创客实践中,你是否常因实物采购贵、新手怕烧板、调试排错难而头疼?Proteus8 作为一款 “全能型” EDA 仿真工具,完美解决这些痛点 —— 它集「原理图绘制 PCB 设计 虚拟仿真」于一体,支持 51、STM3…

系统科学:结构、功能与层级探析

摘要本文旨在系统性地梳理和辨析系统科学中的核心概念——结构、功能与层级。文章首先追溯系统思想的理论源流,确立其作为一种超越还原论的整体性研究范式。在此基础上,深度剖析系统结构的内在构成(组分、框架、动态性)、系统层级…