程序结构
├── main.cpp
├── config.json
├── hive_export/
├── parquet_data/
├── sql_scripts/
└── logs/
核心代码实现 (main.cpp)
#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <filesystem>
#include <nlohmann/json.hpp>
#include <unistd.h>namespace fs = std::filesystem;
using json = nlohmann::json;
using namespace std;// 全局锁用于日志和队列同步
mutex log_mutex, queue_mutex;// 配置文件结构
struct Config {string hive_jdbc;string hql_output_dir;string parquet_output_dir;string sql_script_dir;string snowflake_cfg;int export_threads;int import_threads;
};// 日志记录函数
void log_message(const string& message, const string& log_path) {lock_guard<mutex> guard(log_mutex);ofstream log_file(log_path, ios::app);if (log_file) {time_t now = time(nullptr);log_file << "[" << put_time(localtime(&now), "%F %T") << "] " << message << endl;}
}// 解析配置文件
Config load_config(const string& config_path) {ifstream config_file(config_path);if (!config_file) throw runtime_error("Config file not found");json j;config_file >> j;return {j["hive_jdbc"],j["directories"]["hql_output"],j["directories"]["parquet_output"],j["directories"]["sql_scripts"],j["snowflake"]["config_path"],j["threads"]["export"],j["threads"]["import"]};
}// 导出Hive建表语句
void export_hql(const Config& cfg, const string& log_path) {string cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'SHOW DATABASES;' | tail -n +2 > databases.txt";system(cmd.c_str());ifstream db_file("databases.txt");string db;while (getline(db_file, db)) {cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'USE " + db + "; SHOW TABLES;' | tail -n +2 > " + db + "_tables.txt";system(cmd.c_str());ifstream table_file(db + "_tables.txt");string table;while (getline(table_file, table)) {fs::path dir = fs::path(cfg.hql_output_dir) / db;fs::create_directories(dir);string hql_path = (dir / (table + ".hql")).string();cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'USE " + db + "; SHOW CREATE TABLE " + table + ";' | ""awk 'NR>2' | head -n -1 > " + hql_path;if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Exported HQL for " + db + "." + table, log_path);} else {log_message("ERROR: Failed to export HQL for " + db + "." + table, log_path);}}}
}// 导出Parquet数据(线程任务)
void export_worker(queue<string> tasks, const Config& cfg, const string& log_path) {while (true) {string task;{lock_guard<mutex> guard(queue_mutex);if (tasks.empty()) return;task = move(tasks.front());tasks.pop();}size_t pos = task.find('.');string db = task.substr(0, pos);string table = task.substr(pos + 1);fs::path out_dir = fs::path(cfg.parquet_output_dir) / db / table;fs::create_directories(out_dir);string cmd = "hive -e \"SET hive.exec.compress.output=false; ""INSERT OVERWRITE DIRECTORY '" + out_dir.string() + "' ""STORED AS PARQUET SELECT * FROM " + task + ";\"";if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Exported Parquet for " + task, log_path);} else {log_message("ERROR: Failed to export Parquet for " + task, log_path);}}
}// 多线程导出Parquet
void export_parquet(const Config& cfg, const string& log_path) {ifstream db_file("databases.txt");queue<string> tasks;string db;while (getline(db_file, db)) {ifstream table_file(db + "_tables.txt");string table;while (getline(table_file, table)) {tasks.push(db + "." + table);}}vector<thread> threads;for (int i = 0; i < cfg.export_threads; ++i) {threads.emplace_back(export_worker, tasks, ref(cfg), ref(log_path));}for (auto& t : threads) t.join();
}// 执行SnowSQL脚本
void run_snowsql(const Config& cfg, const string& log_path) {for (const auto& entry : fs::directory_iterator(cfg.sql_script_dir)) {if (entry.path().extension() == ".sql") {string cmd = "snowsql -c " + cfg.snowflake_cfg + " -f " + entry.path().string();if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Executed SQL " + entry.path().filename().string(), log_path);} else {log_message("ERROR: Failed to execute SQL " + entry.path().filename().string(), log_path);}}}
}// 导入Parquet到Snowflake(线程任务)
void import_worker(queue<fs::path> tasks, const Config& cfg, const string& log_path) {while (true) {fs::path task;{lock_guard<mutex> guard(queue_mutex);if (tasks.empty()) return;task = move(tasks.front());tasks.pop();}string db = task.parent_path().filename();string table = task.stem();string cmd = "snowsql -c " + cfg.snowflake_cfg + " -q \"""COPY INTO " + db + "." + table + " ""FROM @" + cfg.parquet_output_dir + "/" + db + "/" + table + " ""FILE_FORMAT = (TYPE = PARQUET);\"";if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Imported Parquet to " + db + "." + table, log_path);} else {log_message("ERROR: Failed to import Parquet to " + db + "." + table, log_path);}}
}// 多线程导入Parquet
void import_parquet(const Config& cfg, const string& log_path) {queue<fs::path> tasks;for (const auto& db_entry : fs::directory_iterator(cfg.parquet_output_dir)) {for (const auto& table_entry : fs::directory_iterator(db_entry.path())) {tasks.push(table_entry.path());}}vector<thread> threads;for (int i = 0; i < cfg.import_threads; ++i) {threads.emplace_back(import_worker, tasks, ref(cfg), ref(log_path));}for (auto& t : threads) t.join();
}int main() {try {// 初始化配置和日志Config cfg = load_config("config.json");string log_path = "logs/transfer_" + to_string(time(nullptr)) + ".log";fs::create_directories("logs");// 执行全流程export_hql(cfg, log_path);export_parquet(cfg, log_path);run_snowsql(cfg, log_path);import_parquet(cfg, log_path);log_message("ALL OPERATIONS COMPLETED", log_path);} catch (const exception& e) {cerr << "CRITICAL ERROR: " << e.what() << endl;return 1;}return 0;
}
配置文件示例 (config.json)
{"hive_jdbc": "jdbc:hive2://hive-server:10000","directories": {"hql_output": "hive_export","parquet_output": "parquet_data","sql_scripts": "sql_scripts"},"snowflake": {"config_path": "~/.snowsql/config"},"threads": {"export": 8,"import": 8}
}
关键功能说明
-
HQL导出:
- 使用
beeline
连接Hive获取所有数据库和表 - 按
数据库/表名.hql
格式存储建表语句 - 自动跳过系统表(通过
tail
和awk
过滤)
- 使用
-
Parquet导出:
- 使用Hive的
INSERT OVERWRITE DIRECTORY
导出为Parquet格式 - 多线程处理不同表(线程数由配置控制)
- 输出路径:
parquet_data/数据库/表名/
- 使用Hive的
-
SnowSQL执行:
- 遍历指定目录的所有
.sql
文件 - 使用
snowsql -c
执行配置文件中的连接 - 支持认证文件自动加载(需预先配置)
- 遍历指定目录的所有
-
Parquet导入:
- 使用Snowflake的
COPY INTO
命令 - 多线程并发导入不同表
- 自动匹配目录结构与表名
- 使用Snowflake的
-
日志系统:
- 按天分割日志文件(文件名含时间戳)
- 记录操作类型、状态和时间
- 线程安全的日志写入
-
异常处理:
- 配置文件缺失检测
- 命令执行状态码检查
- 目录创建失败处理
- JSON解析异常捕获
编译与运行
- 安装依赖:
sudo apt-get install libboost-filesystem-dev nlohmann-json3-dev
- 编译程序:
g++ -std=c++17 -o hive2snowflake main.cpp -lboost_filesystem -lpthread
- 运行程序:
./hive2snowflake
注意事项
-
需要预先配置:
- Hive的beeline客户端
- SnowSQL及认证配置
- Hive表访问权限
- Snowflake表结构匹配
-
性能调整:
- 通过
config.json
调整线程数 - 大表建议单独处理
- 可添加重试机制应对网络波动
- 通过
-
安全增强建议:
- 配置文件加密(如使用jq解密)
- 敏感信息使用环境变量
- 添加操作审计日志