在 PySpark 中打包 Python 环境并调度到集群是处理依赖一致性的关键步骤。以下是完整的解决方案,包含环境打包、分发和配置方法:

一、环境打包方法

  1. 使用 Conda 打包环境
# 创建 Conda 环境
conda create -n pyspark_env python=3.8
conda activate pyspark_env
conda install numpy pandas pyarrow  # 安装所需依赖# 导出环境为 YAML 文件
conda env export > environment.yml# 打包环境(Linux/macOS)
conda-pack -n pyspark_env -o pyspark_env.tar.gz
  1. 使用 Virtualenv 打包环境
# 创建虚拟环境
python3 -m venv pyspark_env
source pyspark_env/bin/activate  # Linux/macOS
pyspark_env\Scripts\activate  # Windows# 安装依赖
pip install numpy pandas pyarrow# 打包环境(需使用第三方工具)
pip install virtualenv-pack
virtualenv-pack -f -o pyspark_env.tar.gz

二、分发环境到集群

方法 1:通过 --archives 参数上传
在提交作业时,使用 --archives 参数将打包的环境分发到所有节点:

# 将环境包上传到 HDFS,避免每次提交都重新传输:
hdfs dfs -put pyspark_env.tar.gz /path/in/hdfs/spark-submit \--master yarn \--deploy-mode cluster \--py-files helper.py\ # python依赖文件,比如第三方代码等--archives hdfs:///path/in/hdfs/pyspark_env.tar.gz#environment \your_script.py

三、配置 PySpark 使用打包环境

  1. 设置 Python 解释器路径
    在代码中指定 Executor 使用打包环境中的 Python:
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"  # 对应 --archives 指定的目录名
os.environ["PYSPARK_DRIVER_PYTHON"] = "./environment/bin/python"  # Cluster 模式需要,如果是client模式,driver_python配置本地python路径,比如/opt/conda/bin/python, 需注意本地python和集群打包python的版本一致from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PackagedEnvApp").getOrCreate()
  1. 编写pyspark脚本
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"from pyspark.sql import SparkSession
import pandas as pd  # 使用打包环境中的 pandasspark = SparkSession.builder.appName("PackagedEnvExample").getOrCreate()# 使用 pandas 处理数据
pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
df = spark.createDataFrame(pdf)
df.show()spark.stop()
  1. 提交作业
spark-submit \--master yarn \--deploy-mode cluster \--archives pyspark_env.tar.gz#environment \example.py
  1. 配置优先级与运行模式详解
  • 配置优先级规则
    Spark 配置的优先级从高到低如下:

    SparkSession.build.config
    spark-submit --conf参数
    spark-defaults.conf
    系统默认值
  • 关键结论:

    • SparkSession.builder.config() 优先级最高,会覆盖其他配置
    • spark-submit 参数优先级次之
    • 特殊参数例外:–master 和 --deploy-mode 在 spark-submit 中具有最高优先级
  • deploy-mode 配置规则

    设置方式是否生效说明
    spark-submit✅ 总是生效命令行参数具有最终决定权
    SparkSession 代码❌ 当 spark-submit 指定时无效spark-submit 未指定,则代码配置生效
    spark-defaults⚠️ 最低优先级仅当其他方式未配置时生效

四、替代方案

  1. Docker 容器
    使用 Docker 打包完整环境,通过 Kubernetes 调度:
# Dockerfile 示例
FROM apache/spark-py:v3.3.2
RUN pip install pandas numpy# 构建并推送镜像
docker build -t my-spark-image:v1 .
docker push myregistry/my-spark-image:v1# 提交作业
spark-submit \--master k8s://https://kubernetes-host:port \--conf spark.kubernetes.container.image=myregistry/my-spark-image:v1 \...
  1. PySpark 内置依赖管理
    通过 --py-files 参数上传 Python 文件 / 包:
spark-submit \--py-files my_module.zip,another_dep.py \your_script.py

五、Pyspark调用Xgboost或者LightGBM

5.1 调用 XGBoost 模型

  1. 准备依赖
    下载 XGBoost 的 Spark 扩展 jar 包:可以从 XGBoost 的官方 GitHub 发布页面 或者 Maven 仓库下载与你使用的 XGBoost 和 Spark 版本兼容的xgboost4j-spark和xgboost4j的 jar 包。例如,如果你使用的是 Spark 3.3.0 和 XGBoost 1.6.2,可以下载对应的版本。
    下载其他依赖:确保scala-library等相关依赖也在合适的版本,因为xgboost4j-spark会依赖它们。
  2. 配置 Spark 提交参数
    在使用spark-submit提交作业时,通过–jars参数指定上述下载的 jar 包路径。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/xgboost4j-spark-1.6.2.jar,/path/to/xgboost4j-1.6.2.jar,/path/to/scala-library-2.12.10.jar \your_script.py

也可以将这些 jar 包上传到 HDFS,然后使用 HDFS 路径:

hdfs dfs -put /path/to/xgboost4j-spark-1.6.2.jar /lib/
hdfs dfs -put /path/to/xgboost4j-1.6.2.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/xgboost4j-spark-1.6.2.jar,hdfs:///lib/xgboost4j-1.6.2.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
  1. Python 代码示例
    在 Python 代码中,导入相关模块并使用 XGBoost 的 Spark 接口:
from pyspark.sql import SparkSession
from xgboost.spark import XGBoostClassifierspark = SparkSession.builder \.appName("XGBoostOnSpark") \.getOrCreate()# 假设data是一个包含特征和标签的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 创建XGBoost分类器
model = XGBoostClassifier(num_round=10, objective="binary:logistic")
# 拟合模型
model.fit(data, label_col=label_col, features_col=feature_cols)

5.2 调用 LightGBM 模型

  1. 准备依赖
    下载 LightGBM 的 Spark 扩展 jar 包:从 LightGBM 的官方 GitHub 发布页面或者 Maven 仓库获取lightgbm4j-spark相关的 jar 包,以及lightgbm4j的 jar 包。注意选择与你的 Spark 和 LightGBM 版本适配的版本。
    处理其他依赖:同样要保证scala-library等依赖的兼容性。
  2. 配置 Spark 提交参数
    和 XGBoost 类似,使用spark-submit时通过–jars参数指定 jar 包路径。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/lightgbm4j-spark-3.3.1.jar,/path/to/lightgbm4j-3.3.1.jar,/path/to/scala-library-2.12.10.jar \your_script.py

或者上传到 HDFS 后使用 HDFS 路径:

hdfs dfs -put /path/to/lightgbm4j-spark-3.3.1.jar /lib/
hdfs dfs -put /path/to/lightgbm4j-3.3.1.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/lightgbm4j-spark-3.3.1.jar,hdfs:///lib/lightgbm4j-3.3.1.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
  1. Python 代码示例
    在 Python 代码中,导入模块并使用 LightGBM 的 Spark 接口:
from pyspark.sql import SparkSession
from lightgbm4j.spark import LightGBMClassifierspark = SparkSession.builder \.appName("LightGBMOnSpark") \.getOrCreate()# 假设data是一个包含特征和标签的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 创建LightGBM分类器
params = {"objective": "binary","num_leaves": 31,"learning_rate": 0.05,"feature_fraction": 0.9
}
model = LightGBMClassifier(params=params, num_round=10)
# 拟合模型
model.fit(data, label_col=label_col, features_col=feature_cols)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/88432.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/88432.shtml
英文地址,请注明出处:http://en.pswp.cn/web/88432.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

和鲸社区深度学习基础训练营2025年关卡2(1)纯numpy

拟分3种实现方法:1.纯numpy2.sklearn中的MLPClassifier3.pytorch题目: 在 MNIST 数据集上训练 MLP 模型并比较不同的激活函数和优化算法任务描述:使用 MNIST 数据集中的前 20,000 个样本训练一个多层感知机 (MLP) 模型。你需要比较三种不同的…

Sequential Thinking:AI深度思考的新范式及其与CoT、ReAct的对比分析

引言:AI深度思考的演进与Sequential Thinking的崛起在人工智能技术快速发展的今天,AI模型的思考能力正经历着从简单应答到深度推理的革命性转变。这一演进过程不仅反映了技术本身的进步,更体现了人类对机器智能认知边界的持续探索。早期的大语…

云原生详解:构建现代化应用的未来

引言 在数字化转型的浪潮中,"云原生"已成为技术领域最热门的话题之一。从初创公司到全球500强企业,都在积极探索云原生技术以提升业务敏捷性和创新能力。本文将全面解析云原生的概念、核心技术、优势以及实践路径,帮助您深入理解这一改变IT格局的技术范式。 什么…

SSE事件流简单示例

文章目录1、推送-SseEmitter2、接收-EventSourceListenerSSE(Server-Sent Events,服务器推送事件)是一种基于HTTP的服务器向客户端实时推送数据的技术标准。1、推送-SseEmitter SseEmitter用于实现服务器向客户端单向、长连接的实时数据推送…

Elasticsearch RESTful API入门:基础搜索与查询DSL

Elasticsearch RESTful API入门:基础搜索与查询DSL 本文为Elasticsearch初学者详细解析RESTful API的核心操作与查询DSL语法,包含大量实战示例及最佳实践。 一、Elasticsearch与RESTful API简介 Elasticsearch(ES)作为分布式搜索…

(六)复习(OutBox Message)

文章目录 项目地址一、OutBox Message1.1 OutBox表配置1. OutBoxMessage类2. OutboxMessage表配置3. 给每个模块生成outboxmessage表1.2 发布OutBox Message1. 修改Intercepotor2. 配置Quartz3. 创建Quatz方法发布领域事件4. 创建Quatz定时任务5. 注册Quatz服务和配置6. 流程梳…

STM32-ADC内部温度

在通道16无引脚(测量温度不准确)跟ADC代码差不多;不需要使能引脚时钟;将内部温度测量打开/*** brief 启用或禁用温度传感器和内部参考电压功能* param NewState: 新的功能状态,取值为ENABLE或DISABLE* retval 无* no…

「Linux命令基础」文本模式系统关闭与重启

关机重启基本命令 直接拔掉计算机电源可能损坏内部元件;Linux系统通过命令关闭计算机则是安全流程,让所有程序有机会保存数据、释放资源。 关机命令:shutdown Linux系统提供了多种用于关闭或重启系统的命令,其中 shutdown 是最常用的一种,它可以安全地通知用户系统即将…

射频信号(大宽高比)时频图目标检测anchors配置

一、大宽高比目标YOLO检测参数设置 这是yolov7的一个label的txt文件: 1 0.500 0.201 1.000 0.091 2 0.500 0.402 1.000 0.150 3 0.500 0.604 1.000 0.093 0 0.500 0.804 1.000 0.217 对应的样本: 长宽比分别是:1/0.09110.98, 1/0.1506.67…

OpenStack 鉴权服务介绍.md

引言 OpenStack是一个开源的云计算管理平台,其中的Keystone组件承担了身份认证和授权的关键任务。Keystone的主要功能包括管理用户及其权限、维护OpenStack Services的Endpoint,以及实现认证(Authentication)和鉴权(Au…

Linux_3:进程间通信

IPC1.什么是IPC?Inter Process Communication2.进程间通信常用的几种方式1,管道通信:有名管道,无名管道2,信号- 系统开销小3,消息队列-内核的链表4,信号量-计数器5,共享内存6&#x…

【Springboot】Bean解释

在 Spring Boot 中,Bean 就像是你餐厅里的一名员工。比如,你有一名服务员(Service)、一名厨师(Chef)和一名收银员(Cashier)。这些员工都是餐厅正常运转所必需的,他们各自…

axios的post请求,数据为什么要用qs处理?什么时候不用?

为什么使用 qs 处理 POST 数据axios 的 POST 请求默认将 JavaScript 对象序列化为 JSON 格式(Content-Type: application/json)。但某些后端接口(尤其是传统表单提交)要求数据以 application/x-www-form-urlencoded 格式传输&…

【unitrix】 4.21 类型级二进制数基本结构体(types.rs)

一、源码 这段代码定义了一个类型级数值系统的 Rust 实现,主要用于在编译时表示和操作各种数值类型。 use crate::sealed::Sealed; use crate::number::{NonZero, TypedInt, Unsigned, Primitive}; // // 特殊浮点值枚举 ///// 特殊浮点值(NaN/∞&#x…

UI前端与数字孪生结合实践案例:智慧零售的库存管理优化系统

hello宝子们...我们是艾斯视觉擅长ui设计和前端数字孪生、大数据、三维建模、三维动画10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩!一、引言:数字孪生重构零售库存的 “人 - 货 - 场” 协同在零售行业利润率持续承压的背景…

【Freertos实战】零基础制作基于stm32的物联网温湿度检测(教程非常简易)持续更新中.........

本次记录采用Freertos的第二个DIY作品,基于Onenet的物联网温湿度检测系统,此次代码依然是全部开源。通过网盘分享的文件:物联网温湿度检测.rar 链接: https://pan.baidu.com/s/1uj9UURVtGE6ZB6OsL2W8lw?pwdqm2e 提取码: qm2e 大家也可以看看…

Matplotlib-多图布局与网格显示

Matplotlib-多图布局与网格显示一、多图布局的核心组件二、基础布局:plt.subplots()快速创建网格1. 均等分网格2. 不等分网格(指定比例)三、进阶布局:GridSpec实现复杂嵌套1. 跨行列布局2. 嵌套GridSpec四、实用技巧:布…

GitHub上优秀的开源播放器项目介绍及优劣对比

ExoPlayer 项目地址:https://github.com/google/ExoPlayer 特点: 由Google开发,支持广泛的视频格式和流媒体传输协议,如DASH、HLS、SmoothStreaming。 提供灵活的媒体源架构和高级特性,如动态自适应流播放。 开发者可以轻松扩展和定制播放器组件,适应特定需求。 优点: 功…

react打包发到线上报错Minified React error #130

开发过程中遇到一个问题,记录一下 本地打包发布正常,发测试环境正常,可是通过Jenkins打包发布线上报错 报错信息 index-67fbbd81.js:39 Error: Minified React error #130; visit https://reactjs.org/docs/error-decoder.html?invariant130…

微服务项目远程调用时的负载均衡是如何实现的?

负载均衡概述 负载均衡是微服务架构中的核心组件,用于将请求合理分配到多个服务实例上,提高系统的可用性和性能。负载均衡的分类 负载均衡大致可以分为两类 1. 服务端负载均衡 实现位置:独立部署的负载均衡服务器(位于客户端和服务…