程序结构

├── main.cpp
├── config.json
├── hive_export/
├── parquet_data/
├── sql_scripts/
└── logs/

核心代码实现 (main.cpp)

#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <filesystem>
#include <nlohmann/json.hpp>
#include <unistd.h>namespace fs = std::filesystem;
using json = nlohmann::json;
using namespace std;// 全局锁用于日志和队列同步
mutex log_mutex, queue_mutex;// 配置文件结构
struct Config {string hive_jdbc;string hql_output_dir;string parquet_output_dir;string sql_script_dir;string snowflake_cfg;int export_threads;int import_threads;
};// 日志记录函数
void log_message(const string& message, const string& log_path) {lock_guard<mutex> guard(log_mutex);ofstream log_file(log_path, ios::app);if (log_file) {time_t now = time(nullptr);log_file << "[" << put_time(localtime(&now), "%F %T") << "] " << message << endl;}
}// 解析配置文件
Config load_config(const string& config_path) {ifstream config_file(config_path);if (!config_file) throw runtime_error("Config file not found");json j;config_file >> j;return {j["hive_jdbc"],j["directories"]["hql_output"],j["directories"]["parquet_output"],j["directories"]["sql_scripts"],j["snowflake"]["config_path"],j["threads"]["export"],j["threads"]["import"]};
}// 导出Hive建表语句
void export_hql(const Config& cfg, const string& log_path) {string cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'SHOW DATABASES;' | tail -n +2 > databases.txt";system(cmd.c_str());ifstream db_file("databases.txt");string db;while (getline(db_file, db)) {cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'USE " + db + "; SHOW TABLES;' | tail -n +2 > " + db + "_tables.txt";system(cmd.c_str());ifstream table_file(db + "_tables.txt");string table;while (getline(table_file, table)) {fs::path dir = fs::path(cfg.hql_output_dir) / db;fs::create_directories(dir);string hql_path = (dir / (table + ".hql")).string();cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'USE " + db + "; SHOW CREATE TABLE " + table + ";' | ""awk 'NR>2' | head -n -1 > " + hql_path;if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Exported HQL for " + db + "." + table, log_path);} else {log_message("ERROR: Failed to export HQL for " + db + "." + table, log_path);}}}
}// 导出Parquet数据(线程任务)
void export_worker(queue<string> tasks, const Config& cfg, const string& log_path) {while (true) {string task;{lock_guard<mutex> guard(queue_mutex);if (tasks.empty()) return;task = move(tasks.front());tasks.pop();}size_t pos = task.find('.');string db = task.substr(0, pos);string table = task.substr(pos + 1);fs::path out_dir = fs::path(cfg.parquet_output_dir) / db / table;fs::create_directories(out_dir);string cmd = "hive -e \"SET hive.exec.compress.output=false; ""INSERT OVERWRITE DIRECTORY '" + out_dir.string() + "' ""STORED AS PARQUET SELECT * FROM " + task + ";\"";if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Exported Parquet for " + task, log_path);} else {log_message("ERROR: Failed to export Parquet for " + task, log_path);}}
}// 多线程导出Parquet
void export_parquet(const Config& cfg, const string& log_path) {ifstream db_file("databases.txt");queue<string> tasks;string db;while (getline(db_file, db)) {ifstream table_file(db + "_tables.txt");string table;while (getline(table_file, table)) {tasks.push(db + "." + table);}}vector<thread> threads;for (int i = 0; i < cfg.export_threads; ++i) {threads.emplace_back(export_worker, tasks, ref(cfg), ref(log_path));}for (auto& t : threads) t.join();
}// 执行SnowSQL脚本
void run_snowsql(const Config& cfg, const string& log_path) {for (const auto& entry : fs::directory_iterator(cfg.sql_script_dir)) {if (entry.path().extension() == ".sql") {string cmd = "snowsql -c " + cfg.snowflake_cfg + " -f " + entry.path().string();if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Executed SQL " + entry.path().filename().string(), log_path);} else {log_message("ERROR: Failed to execute SQL " + entry.path().filename().string(), log_path);}}}
}// 导入Parquet到Snowflake(线程任务)
void import_worker(queue<fs::path> tasks, const Config& cfg, const string& log_path) {while (true) {fs::path task;{lock_guard<mutex> guard(queue_mutex);if (tasks.empty()) return;task = move(tasks.front());tasks.pop();}string db = task.parent_path().filename();string table = task.stem();string cmd = "snowsql -c " + cfg.snowflake_cfg + " -q \"""COPY INTO " + db + "." + table + " ""FROM @" + cfg.parquet_output_dir + "/" + db + "/" + table + " ""FILE_FORMAT = (TYPE = PARQUET);\"";if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Imported Parquet to " + db + "." + table, log_path);} else {log_message("ERROR: Failed to import Parquet to " + db + "." + table, log_path);}}
}// 多线程导入Parquet
void import_parquet(const Config& cfg, const string& log_path) {queue<fs::path> tasks;for (const auto& db_entry : fs::directory_iterator(cfg.parquet_output_dir)) {for (const auto& table_entry : fs::directory_iterator(db_entry.path())) {tasks.push(table_entry.path());}}vector<thread> threads;for (int i = 0; i < cfg.import_threads; ++i) {threads.emplace_back(import_worker, tasks, ref(cfg), ref(log_path));}for (auto& t : threads) t.join();
}int main() {try {// 初始化配置和日志Config cfg = load_config("config.json");string log_path = "logs/transfer_" + to_string(time(nullptr)) + ".log";fs::create_directories("logs");// 执行全流程export_hql(cfg, log_path);export_parquet(cfg, log_path);run_snowsql(cfg, log_path);import_parquet(cfg, log_path);log_message("ALL OPERATIONS COMPLETED", log_path);} catch (const exception& e) {cerr << "CRITICAL ERROR: " << e.what() << endl;return 1;}return 0;
}

配置文件示例 (config.json)

{"hive_jdbc": "jdbc:hive2://hive-server:10000","directories": {"hql_output": "hive_export","parquet_output": "parquet_data","sql_scripts": "sql_scripts"},"snowflake": {"config_path": "~/.snowsql/config"},"threads": {"export": 8,"import": 8}
}

关键功能说明

  1. HQL导出

    • 使用beeline连接Hive获取所有数据库和表
    • 数据库/表名.hql格式存储建表语句
    • 自动跳过系统表(通过tailawk过滤)
  2. Parquet导出

    • 使用Hive的INSERT OVERWRITE DIRECTORY导出为Parquet格式
    • 多线程处理不同表(线程数由配置控制)
    • 输出路径:parquet_data/数据库/表名/
  3. SnowSQL执行

    • 遍历指定目录的所有.sql文件
    • 使用snowsql -c执行配置文件中的连接
    • 支持认证文件自动加载(需预先配置)
  4. Parquet导入

    • 使用Snowflake的COPY INTO命令
    • 多线程并发导入不同表
    • 自动匹配目录结构与表名
  5. 日志系统

    • 按天分割日志文件(文件名含时间戳)
    • 记录操作类型、状态和时间
    • 线程安全的日志写入
  6. 异常处理

    • 配置文件缺失检测
    • 命令执行状态码检查
    • 目录创建失败处理
    • JSON解析异常捕获

编译与运行

  1. 安装依赖
sudo apt-get install libboost-filesystem-dev nlohmann-json3-dev
  1. 编译程序
g++ -std=c++17 -o hive2snowflake main.cpp -lboost_filesystem -lpthread
  1. 运行程序
./hive2snowflake

注意事项

  1. 需要预先配置:

    • Hive的beeline客户端
    • SnowSQL及认证配置
    • Hive表访问权限
    • Snowflake表结构匹配
  2. 性能调整:

    • 通过config.json调整线程数
    • 大表建议单独处理
    • 可添加重试机制应对网络波动
  3. 安全增强建议:

    • 配置文件加密(如使用jq解密)
    • 敏感信息使用环境变量
    • 添加操作审计日志

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92682.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92682.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/92682.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

drippingblues靶机教程

一、信息搜集首先将其在VirtualBOX中安装&#xff0c;并将kali与靶机都设置为桥接模式紧接着我们扫描IP&#xff0c;来发现靶机地址&#xff0c;经过搜集&#xff0c;发现IP是192.168.1.9&#xff0c;我们去访问一下紧接着我们扫一下开放了哪些端口。发现开放了21、22以及80端口…

39.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--扩展功能--调整发布脚本

这篇文章&#xff0c;我们要调整发布脚本。之所以要调整发布脚本&#xff0c;是因为现在我们的项目有三个环境&#xff1a;本地&#xff08;Local&#xff09;、开发&#xff08;Development&#xff09;、生产&#xff08;Production&#xff09;。Tip&#xff1a;我们的项目虽…

商品、股指及ETF期权五档盘口Tick级与分钟级历史行情数据多维解析

在金融数据分析领域&#xff0c;本地CSV文件是存储高频与低频数据的常用载体。本文以期权市场数据为例&#xff0c;探讨如何基于CSV格式处理分钟级行情、高频Tick数据、日频数据、逐笔委托记录、五档订单簿及历史行情数据&#xff0c;并提供专业的技术实现方案。以下将从数据预…

云端软件工程智能代理:任务委托与自动化实践全解

云端软件工程智能代理&#xff1a;任务委托与自动化实践全解 背景与未来趋势 随着软件工程复杂度不断提升&#xff0c;开发者对自动化工具的依赖也日益增强。我们正进入一个“人机协作”的新时代&#xff0c;开发者可以专注于核心创新&#xff0c;将重复性、繁琐的任务委托给智…

making stb style lib(1): do color print in console

col.h: see origin repo // origin repo: https://github.com/resyfer/libcol #ifndef _COL_HOL_H_ #define _COL_HOL_H_#include <stdlib.h> #include <stdio.h> #include <stdbool.h> #include <string.h> #include <math.h> // 新增&#xf…

llm本地部署+web访问+交互

要实现基于llm的web访问和交互&#xff0c;需支持对llm的访问和对网络搜索的调用。 这里使用ollama llm兼容openai sdk访问&#xff1b;使用proxyless-llm-websearch模拟网络搜索。 1 ollama本地部署 假设ollama已经部署&#xff0c;具体过程参考 在mac m1基于ollama运行dee…

自动驾驶数据闭环

自动驾驶的数据闭环是支撑算法持续迭代的核心机制&#xff0c;其本质是通过“数据采集-处理-训练-部署-反馈”的循环&#xff0c;不断优化模型对复杂场景的适应性。由于自动驾驶数据量极大&#xff08;单车日均TB级&#xff09;、场景多样&#xff08;从常规道路到极端边缘场景…

二十、MySQL-DQL-条件查询

DQL-条件查询代码&#xff1a; DQL-条件查询 -- 1.查询 姓名 为 杨逍 的员工 select * from tb_emp where name 杨逍; -- 2.查询 id小于等于5 的员工信息 select * from tb_emp where id < 5; -- 3.查询 没有分配职位 的员工信息 select * from tb_emp where job is null; …

Mac下安装Conda虚拟环境管理器

Conda 是一个开源的包、环境管理器&#xff0c;可以用于在同一个机器上创建不同的虚拟环境&#xff0c;安装不同Python 版本的软件包及其依赖&#xff0c;并能够在不同的虚拟环境之间切换 Conda常通过安装Anaconda/Miniconda来进行使用。一般使用Miniconda就够了。Miniconda 是…

Android 中解决 Button 按钮背景色设置无效的问题

1、问题描述 在布局文件中有两个 Button 按钮&#xff0c;为每个按钮设置不同的背景色&#xff0c;但是显示出来的效果都是紫色的&#xff0c;跟设置的颜色不同&#xff0c;布局文件如下所示&#xff1a;<Buttonandroid:id"id/button_cancel"android:layout_width…

云服务器--阿里云OSS(2)【Springboot使用阿里云OSS】

&#x1f4d2; 阿里云 OSS Spring Boot 异步任务&#xff08;直接存 OSS&#xff09; 1. 项目结构 src/main/java/com/example/demo├── controller│ └── UploadController.java // 接收上传请求├── service│ ├── AsyncUploadService.java // 异步上传…

get请求中文字符参数乱码问题

第一种方法 服务器默认的传参编码格式是ISO8859-1,所以前端直接原样字符串请求&#xff0c;到后端解析一下就得到正确字符 String fileName request.getParameter("fileName"); fileName new String(fileName.getBytes("ISO8859-1"),"UTF-8");…

C语言(10)——结构体、联合体、枚举

关于C语言零基础学习知识&#xff0c;小编有话说&#xff0c;各位看官敬请入下面的专栏世界&#xff1a;打怪升级之路——C语言之路_ankleless的博客-CSDN博客 Hi&#xff01;冒险者&#x1f60e;&#xff0c;欢迎闯入 C 语言的奇幻异世界&#x1f30c;&#xff01; 我是 Ankle…

海康威视摄像头实时推流到阿里云公网服务器(Windows + FFmpeg + nginx-rtmp)

海康威视摄像头实时推流到阿里云公网服务器&#xff08;Windows FFmpeg nginx-rtmp1. 步骤总览2. 阿里云 ECS&#xff08;Linux&#xff09;配置2.1 开放端口2.2 安装 nginx-rtmp3. Windows 电脑端配置3.1 安装 FFmpeg3.1.1 官网/镜像下载&#xff1a;3.1.2 解压后将 bin 目录…

基础网络网路层——IPV4地址

在IP网络上&#xff0c;如果用户要将一台计算机连接到Internet上&#xff0c;就需要向因特网服务提供方ISP&#xff08;Internet Service Provider&#xff09;申请一个IP地址。IP地址是在计算机网络中被用来唯一标识一台设备的一组数字。IPv4地址由32位二进制数值组成&#xf…

技术速递|GPT-5 正式上线 Azure AI Foundry

AI 应用正在经历一场深刻变革——对企业来说&#xff0c;仅仅“能聊天”早已不够&#xff0c;生成内容、逻辑推理、落地生产&#xff0c;这些才是新时代对 AI 能力的真正考验。 今天&#xff0c;我们非常激动地宣布&#xff0c;OpenAI 最新旗舰大模型 GPT-5 正式上线 Azure AI …

Logistic Regression|逻辑回归

----------------------------------------------------------------------------------------------- 这是我在我的网站中截取的文章&#xff0c;有更多的文章欢迎来访问我自己的博客网站rn.berlinlian.cn&#xff0c;这里还有很多有关计算机的知识&#xff0c;欢迎进行留言或…

三极管在电路中的应用

1、信号放大&#xff08;电压放大&#xff09; 应用场景 &#xff1a;麦克风声音放大、耳机驱动、广播信号接收等音频设备 原理解析 &#xff1a; 想象三极管如同一个精准的水龙头&#xff1a; 基极&#xff08;B&#xff09;电流如同拧动阀门的微弱力量&#xff08;输入信号&a…

Redis 事务机制

文章目录一、什么是事务&#xff1f;二、事务相关操作总体认识基本操作流程watch 操作演示watch 原理一、什么是事务&#xff1f; Redis 的事务和 MySQL 的事务概念上是类似的. 都是把⼀系列操作绑定成⼀组. 让这⼀组能够批量执⾏. Redis 的事务和 MySQL 事务的区别&#xff1…

Mybatis学习之自定义映射resultMap(七)

这里写目录标题一、准备工作1、新建maven工程2、准备两张表3、建立mapper、pojo、映射文件mapper接口pojoxxxMapper.xml二、resultMap处理字段和属性的映射关系1、用起别名的方式保证字段名与属性名一致2、逐一设置resultMap映射关系3、配置mapUnderscoreToCamelCase三、多对一…