背景:一张上百亿行的hive表,只有id和app两列,其中app的去重量是8w多个(原app有上百万枚举值,此处已经用id数量进行过筛选,只留下有一定规模的app),id的去重量大概有八九亿,最终希望生成pid和对应app的稀疏向量。

我们使用pyspark来实现:

# 处理app特征,生成id,app和app对应的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorUDT  # 新增导入
from pyspark.ml.functions import vector_to_array  # 新增关键导入
import sys
import os# 配置环境变量,否则报错python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()# 1. 从Hive读取数据
df = spark.sql("SELECT id,app FROM database.table")# 2. & 3. 定义特征转换管道(Pipeline)
# 步骤1:将app字符串转换为数值索引
indexer = StringIndexer(inputCol="app", outputCol="app_index")
# 步骤2:将索引进行One-Hot编码,输出为稀疏向量
encoder = OneHotEncoder(inputCol="app_index", outputCol="app_ohe_vector")# 将两个步骤组合成一个管道
pipeline = Pipeline(stages=[indexer, encoder])# 拟合数据并转换
model = pipeline.fit(df)
result = model.transform(df)# 查看结果(可选)
# result.select("id", "app", "app_ohe_vector").show(truncate=False)# 4. 将结果保存回HDFS(例如Parquet格式)
# result.select("id", "app_ohe_vector").write \
#     .mode("overwrite") \
#     .parquet("/path/to/your/output/onehot_result.parquet")# 需要跑6小时,表非常大 338亿数据
result.createOrReplaceTempView("temp_view")
spark.sql("CREATE TABLE database.app_vec AS SELECT * FROM temp_view")# 停止SparkSession
spark.stop()

此方案的优点:

高效:​​ Spark是专为大规模数据处理设计的,性能远超Hive UDF。

节省空间:​​ 输出是稀疏向量,8万个类别中每个用户只有少量app,向量中大部分是0,稀疏表示非常紧凑。

标准化:​​ 这是ML领域处理类别特征的标准流程,与后续的Spark MLlib机器学习库无缝集成。

此方案生成的结果数据示例如下:

id

app

app_index

app_ohe_vector

1001

微信

0

(0,80000, [0], [1.0])

1001

王者荣耀

79999

(0,80000, [79999], [1.0])

1002

淘宝

1

(0,80000, [1], [1.0])

在hive表中,app_ohe_vector的格式为row("type" tinyint, "size" integer, "indices" array(integer), "values" array(double))。

app_ohe_vector的结构是Spark ML的标准格式:

  • 0: 向量类型(0=稀疏向量,1=密集向量)
  • 80000: 向量总长度(即app总数)
  • [0, 1, 2, 3, ...]: 非零元素的索引位置
  • [1.0, 1.0, 1.0, ...]: 对应索引位置的值

接下来我们对id进行聚合,同样使用pyspark来实现:

# 处理app特征,按id聚合app对应的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import expr, col, collect_list, udf, first, size,struct
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StructType, StructField
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors,VectorUDT  # 新增导入
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array  # 新增关键导入
from pyspark import StorageLevel
import json
import sys
import os# 配置环境变量,否则报错python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.sql.shuffle.partitions", "5000") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()for i in ['0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f']:# 1. 从Hive读取数据print(i)sql_idapp = '''select id,app_ohe_vector from database.app_vec_par where flag = '{fflag}\''''.format(fflag=i)df = spark.sql(sql_idapp)# 打印数据概览total_count = df.count()print(f"数据总量: {total_count:,}")    # 高效UDAF聚合函数(针对单元素向量优化)def merge_sparse_vectors(vectors):"""高效合并稀疏向量,针对单元素向量优化"""if not vectors:return {"type": 0, "size": 0, "indices": [], "values": []}# 获取向量尺寸(假设所有向量尺寸相同)size_val = vectors[0]["size"]# 使用字典高效聚合value_dict = {}for vec in vectors:# 直接访问第一个(也是唯一一个)索引和值idx = vec["indices"][0]val = vec["values"][0]# 使用get方法避免两次字典查找value_dict[idx] = value_dict.get(idx, 0.0) + val# 提取并排序索引sorted_indices = sorted(value_dict.keys())sorted_values = [value_dict[i] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}# 注册UDAFmerge_sparse_vectors_udf = udf(merge_sparse_vectors,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 数据预处理:过滤无效记录并重新分区print("开始数据预处理...")cleaned_df = df.filter((col("app_ohe_vector").isNotNull()) & (size(col("app_ohe_vector.indices")) > 0)).repartition(5000, "id")  # 增加分区数处理数据倾斜# 释放原始DF内存df.unpersist()# 两阶段聚合策略(处理数据倾斜)print("开始第一阶段聚合(按id和索引分组)...")# 步骤1: 提取每个向量的索引和值expanded_df = cleaned_df.select("id",col("app_ohe_vector.indices")[0].alias("index"),col("app_ohe_vector.values")[0].alias("value"),col("app_ohe_vector.size").alias("size"))# 步骤2: 按(id, index)分组求和intermediate_df = expanded_df.groupBy("id", "index").agg(expr("sum(value)").alias("sum_value"),first("size").alias("size"))# 步骤3: 按id分组,收集所有(index, sum_value)对print("开始第二阶段聚合(按id分组)...")grouped_df = intermediate_df.groupBy("id").agg(collect_list(struct("index", "sum_value")).alias("index_value_pairs"),first("size").alias("size"))# 步骤4: 转换为稀疏向量格式def pairs_to_sparse_vector(pairs, size_val):"""将(index, value)对列表转换为稀疏向量"""if not pairs:return {"type": 0, "size": size_val, "indices": [], "values": []}# 提取索引和值indices = [p["index"] for p in pairs]values = [p["sum_value"] for p in pairs]# 排序(如果需要)sorted_indices = sorted(indices)sorted_values = [values[indices.index(i)] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}pairs_to_sparse_vector_udf = udf(pairs_to_sparse_vector,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 生成最终结果result = grouped_df.withColumn("merged_vector",pairs_to_sparse_vector_udf("index_value_pairs", "size")).select("id", "merged_vector")print("开始第三阶段数据插入...")# 创建临时视图result.createOrReplaceTempView("sparse_matrix_result")res_sql='''INSERT into TABLE database.app_vecagg_res PARTITION(flag='{fflag}')SELECT id,merged_vector from sparse_matrix_result'''.format(fflag=i)spark.sql(res_sql)print("数据插入完成")# 停止SparkSession
spark.stop()

此处因为原表有300亿+数据,集群性能有限无法一次性处理,所以我将id进行了分区,然后循环分区进行的聚合。

聚合后的结果数据示例如下:

id

merged_vector

1001

(0,80000, [0,79999], [1.0,1.0])

merged_vector的结构是Spark ML的标准格式:

  • 0: 向量类型(0=稀疏向量,1=密集向量)
  • 80000: 向量总长度(即app总数)
  • [0, 1, 2, 3, ...]: 非零元素的索引位置
  • [1.0, 1.0, 1.0, ...]: 对应索引位置的值

Spark ML的算法设计时就已经考虑了这种向量格式,所有内置算法都能正确处理这种结构:

  1. 算法兼容性​:Spark ML的所有分类、回归、聚类算法都接受这种格式的向量
  2. 性能优化​:稀疏向量格式在内存使用和计算效率上都有优化
  3. 内置支持​:Spark ML的VectorAssembler、特征变换器等都能处理这种格式

至此我们就可以将此向量作为特征用于后续的建模操作了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/95961.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/95961.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/95961.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【设计模式】关于学习《重学Java设计模式》的一些成长笔记

【设计模式】关于学习《重学Java设计模式》的一些成长笔记 没有几个人是一说就会的,掌握一些技能,不仅要用心,而且还需要从温故中知新。 为此,好记性不如烂笔头,我干脆一步一脚印地系统学习一遍设计模式! (关注不迷路哈!!!) 文章目录 【设计模式】关于学习《重学Jav…

【基础-判断】@Entry装饰的自定义组件将作为页面的入口。在单个页面中可以使用多个@Entry装饰不同自定义组件。

@Entry装饰的自定义组件将作为页面的入口。在单个页面中可以使用多个@Entry装饰不同自定义组件。 解释: @Entry 的核心作用与唯一性:@Entry 装饰器用于明确声明该组件是一个页面的入口组件,即整个页面的“根”和“起点”。当UIAbility实例加载并显示页面时,系统需要明确知道…

医学影像AI应用-实践:使用MONAI实现肺部CT图像分割的原理与实践

🧑 博主简介:CSDN博客专家、CSDN平台优质创作者,高级开发工程师,数学专业,10年以上C/C++, C#,Java等多种编程语言开发经验,拥有高级工程师证书;擅长C/C++、C#等开发语言,熟悉Java常用开发技术,能熟练应用常用数据库SQL server,Oracle,mysql,postgresql等进行开发应用…

如何训练一个简单的Transformer模型(附源码)李宏毅2025大模型-作业4

摘要:一、作业目标:使用只有2层transformer的GPT-2,生成完整宝可梦图像。二、源码&解析:使用提供的Transformer模型(GPT-2)进行训练,FID Score: 96.3425一、作业目标1)目标使用T…

leetcode211.添加与搜索单词-数据结构设计

与208.前缀树的设计是一样的,关键点在于word中存在通配符“.",所以针对该特殊情况,在search时针对这里进行全子节点的深度搜索class WordDictionary {TrieNode root;private class TrieNode {char val;// 当前节点的值,冗余了…

项目中的一些比较实用的自定义控件

本文是记录项目开发中一些相对复杂但都比较实用的控件,这些控件都是基于自定义的方式去实现,如果有需要的朋友,这个可以作为一个参考,同时也做一个自我总结。 (1)子项大小不一致的RecyclerView(…

[iOS] 折叠 cell

目录 前言 1.原理 2.折叠 cell 的点击选中 3.折叠 cell 高度的变化 4.实现效果 5.总结 前言 折叠 cell 是在 3GShare 中写过的一个小控件,这篇博客是一个小小的总结。 1.原理 在这里的核心就是我们可以通过改变按钮的 tag 值来判断我们是否应该展开还是回收…

MySQL的组复制(MGR)高可用集群搭建

一、MySQL 组复制(MGR)核心概念 MySQL Group Replication(简称 MGR)是 MySQL 官方推出的 高可用(HA) 强一致性 解决方案,基于改进的 Paxos 协议实现,核心能力可概括为 3 点&#xf…

使用Shell脚本实现Linux系统资源监控邮件告警

前言 1. 问题背景与需求 2. Bash 脚本监控资源 3. Bash 脚本判断阈值 4. 配置 msmtp 发送邮件 4.1 安装 msmtp 4.2 创建配置文件 /etc/msmtprc 5. 发送邮件 5.1 给别人发邮件 6. 完整示例脚本 7. 测试方法 8. 常见问题解答 9. 总结 前言 在运维过程中&#xff0c…

设计整体 的 序分(三“释”)、正宗分(双“门”)和流通分(统一的通行表达式) 之3 “自明性”(腾讯元宝 之2)

Q&AQ11、可能还需要补充 魂轴、体轴 和 中心轴 并行 上升 的内容Q11.1、我刚才说“可能还需要补充 魂轴、体轴 和 中心轴 并行 上升 的内容” 是指的 我们今天前面讨论 得出的整体设计 的一个概念整体 的一个双螺旋上升结构中的三个轴。 您刚才是这样 理解的吗?…

使用Ansible自动化部署Hadoop集群(含源码)--环境准备

现在我们有5台虚拟机,已经配置好了主机名和网络我们的目标是通过Ansible实现自动化部署hadoop集群。在此之前,我们先编写一个shell脚本来配置hadoop集群的环境,包括安装软件、安装配置Ansible(一个主节点四个工作节点)…

C#海康车牌识别实战指南带源码

C#海康车牌识别实战指南带源码前言车牌识别技术在智能交通、停车场管理等领域有着广泛的应用。海康威视作为国内领先的安防厂商,其车牌识别相机提供了丰富的SDK接口供开发者使用。本文将详细介绍如何使用C#语言结合海康威视SDK实现车牌识别功能,并解析关…

智慧能源新范式:数字孪生平台如何驱动风电场的精细化管理?

摘要你有没有想过,一座风力发电场背后,藏着一个“看不见的孪生兄弟”?它能提前预知风机故障,实时模拟极端天气的影响,甚至能“训练”运维人员在虚拟场景中演练抢修。这就是数字孪生——一个让风电场从“靠经验管理”转…

STM32-FreeRTOS操作系统-任务管理

引言 随着嵌入式技术的飞速发展,STM32与FreeRTOS的融合愈发紧密。本文聚焦于STM32平台下FreeRTOS操作系统的任务管理,旨在为开发者提供清晰的思路与实用的技巧,助力高效开发。 为什么要进行任务管理? 在嵌入式系统中,…

工业领域 ACP 协议全解析:从入门到实战案例

工业领域 ACP 协议全解析:从入门到实战案例 文章目录工业领域 ACP 协议全解析:从入门到实战案例一、前言二、ACP 协议是什么?1. 基本定义2. 与数据传输协议的区别三、ACP 协议的核心功能1. 身份认证(Authentication)2.…

计算机组成原理:计算机硬件的基本组成

📌目录🖥️ 计算机硬件的基本组成:从经典到现代的架构演进🧩 一、计算机硬件的五大部分:功能与协同📥 (一)输入设备:人机交互的“入口”📤 (二&am…

AI歌手功能终于上线!Suno AI 带你保存歌曲的灵魂

当我们谈论一首歌时,究竟是什么让它“独一无二”?是主唱的声音质感?是旋律里的氛围?还是那种无法复制的风格气息? 如今,Suno AI 给出了答案—— AI歌手功能正式上线! 🌟什么是「AI…

Dubbo3.3 Triple协议处理东西向流量

前言 Apache Dubbo 3.3 对 Triple 协议做了升级,现在 Dubbo 不仅可以处理东西向流量,也可以处理南北向流量。 **东西向流量(East-West Traffic) ** 指数据中心或网络内部同级设备/服务之间的通信。例如,微服务之间的…

操作系统核心特点详解:从并发到分布式,一文搞懂考研必备知识

操作系统核心特点详解:从并发到分布式,一文搞懂考研必备知识 大家好,今天咱们来聊聊操作系统(OS)这个计算机世界的“大管家”。想象一下,你的电脑就像一个忙碌的厨房,操作系统就是那个厨师长&am…

2025精选5款AI视频转文字工具,高效转录秒变文字!

视频转文本的需求早已渗透到生活的方方面面:网课学习需要提取课件台词、会议记录想快速整理要点、追剧时急需生肉转字幕…… 手动记录不仅费时,还容易遗漏关键信息。今天就分享5款实用工具,从免费到专业全覆盖,几步操作就能让视频…