在现代数据工程中,自动化和监控是确保数据管道高效运行的关键因素。Dagster作为一款强大的数据编排工具,提供了多种方式来实现这些目标。本文将深入探讨如何使用Dagster Pipes修改外部代码,以实现日志记录、结构化元数据报告以及资产检查等功能。

什么是Dagster Pipes?

Dagster Pipes是Dagster提供的一种机制,允许你在Dagster之外运行的代码与Dagster内部的工作流进行交互。通过Dagster Pipes,你可以将现有的脚本或应用程序集成到Dagster的数据管道中,并实现信息的双向流动。这不仅提高了代码的复用性,还增强了管道的可监控性和可维护性。

在这里插入图片描述

修改外部代码的步骤

假设我们有一个独立的Python脚本external_code.py,我们希望将其与Dagster集成,并实现日志记录和结构化元数据的报告。同时,我们还有一个Dagster定义文件dagster_code.py,其中包含了一个Dagster资产和其他相关定义。

步骤1:在外部代码中引入Dagster上下文

首先,我们需要在external_code.py中引入Dagster Pipes的相关模块,并初始化Dagster Pipes上下文。这可以通过调用open_dagster_pipes()函数来实现,该函数会返回一个上下文管理器,用于管理Dagster Pipes连接的生命周期。

from dagster_pipes import PipesContext, open_dagster_pipes
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:print(f"processing total {total_orders} orders")

步骤2:发送日志消息到Dagster

接下来,我们可以使用context.log方法将日志消息发送回Dagster。这比直接打印到标准输出更加灵活,因为日志消息可以在Dagster UI中进行过滤和查看。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")

在Dagster UI的Run details页面中,你可以通过选择日志级别来过滤出info级别的日志消息。
在这里插入图片描述

步骤3:发送结构化元数据到Dagster

除了日志消息,我们还可以发送结构化元数据到Dagster。这对于报告资产的状态、数据质量检查结果等信息非常有用。

报告资产物化

我们可以使用context.report_asset_materialization方法来报告资产物化的元数据。例如,我们可以报告处理的总订单数。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})
报告资产检查

如果我们的资产有定义数据质量检查,我们还可以通过context.report_asset_check方法来报告检查的结果。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

在Dagster UI中,你可以在Asset Details页面的Events和Checks标签页中查看这些事件和检查结果。
在这里插入图片描述

完整代码示例

外部代码 external_code.py

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

Dagster代码 dagster_code.py

import shutil
import dagster as dg
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 获取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 报告数据质量检查结果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)@dg.asset(check_specs=[dg.AssetCheckSpec(name="no_empty_order_check", asset="subprocess_asset")],
)
def subprocess_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py"),]return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()defs = dg.Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

总结

通过上述步骤,我们成功地将一个独立的Python脚本与Dagster集成,并实现了日志记录和结构化元数据的报告。这不仅提高了代码的可维护性,还增强了数据管道的监控能力。你可以进一步探索Dagster Pipes的其他功能,如自定义协议和与其他系统的集成,以满足更复杂的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/80722.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/80722.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/80722.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++类和对象进阶 —— 与数据结构的结合

🎁个人主页:工藤新一 🔍系列专栏:C面向对象(类和对象篇) 🌟心中的天空之城,终会照亮我前方的路 🎉欢迎大家点赞👍评论📝收藏⭐文章 文章目录 […

Java中进阶并发编程

第一章、并发编程的挑战 并发和并行:指多线程或多进程 线程的本质:操作系统能够进行运算调度的最小单位,是进程(Process)中的实际工作单元 进程的本质:操作系统进行资源分配和调度的基本单位&#xff0c…

《 指针变量类型与内存访问:揭秘背后的奥秘》

🚀个人主页:BabyZZの秘密日记 📖收入专栏:C语言 🌍文章目入 一、指针变量类型的基本概念二、指针类型与内存访问字节数的关系(一)整型指针(二)字符型指针(三&…

mapbox进阶,使用mapbox-plugins插件加载饼状图

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言1.1 ☘️mapboxgl.Map 地图对象1.1 ☘️mapboxgl.Map style属性二、🍀使用mapbox-plugins插件加载饼状图1. ☘…

GraphicLayer与BusineDataLayer层级控制

补充说明: 当参与层级控制的元素是点型元素时,是无法参与ZIndex层级控制的,此时可以换个解决方案 1.给不同的高度值实现,元素间的层级控制覆盖 import * as mars3d from "mars3d"export let map // mars3d.Map三维地…

uniapp 百家云直播插件打包失败

打包错误日志 Android自有证书 打包失败 错误日志: https://app.liuyingyong.cn/build/errorLog/cf41a610-effe-11ef-88db-05262d4c3e5d原因:需要导入插件依赖 依赖地址:https://ext.dcloud.net.cn/plugin?id16289 百家云直播插件地址 直播插…

【C++】”如虎添翼“:模板初阶

泛型编程: C中一种使用模板来实现代码重用和类型安全的编程范式。它允许程序员编写与数据类型无关的代码,从而可以用相同的代码逻辑处理不同的数据类型。模板是泛型编程的基础 模板分为两类: 函数模板:代表了一个函数家族&#x…

十五、多态与虚函数

十五、多态与虚函数 15.1 引言 面向对象编程的基本特征:数据抽象(封装)、继承、多态基于对象:我们创建类和对象,并向这些对象发送消息多态(Polymorphism):指的是相同的接口、不同的…

点云特征提取的两大经典范式:Voxel-based 与 Pillar-based

点云特征提取的两大经典范式:Voxel-based 与 Pillar-based 在点云处理领域,尤其是针对 3D 目标检测任务,特征提取是核心环节之一。目前,Voxel-based(体素化)和 Pillar-based(柱状化&#xff09…

前苹果首席设计官回顾了其在苹果的设计生涯、公司文化、标志性产品的背后故事

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

web 自动化之 selenium 元素四大操作三大切换等待

文章目录 一、元素的四大操作二、三大切换&等待1、切换窗口:当定位的元素不在当前窗口,则需要切换窗口2、切换iframe:当定位的元素在frame/iframe,则需要切换3、切换弹出窗口 一、元素的四大操作 1、输入 2、点击 3、获取文本 4、获取属…

window server 2012安装sql server2008 r2

执行sql server2008 r2安装目录下的setup 选择运行程序而不获取帮助 然后就是让人绝望的 只能先搞这个了,F*微软,自家软件不让正常安装 打开服务器管理器->添加角色和功能->选择Web 服务(IIS)->添加.NET Framework3.5 然…

【K8S学习之生命周期钩子】详细了解 postStart 和 preStop 生命周期钩子

0. 参考 Kubernetes容器生命周期 —— 钩子函数详解(postStart、preStop) - 人艰不拆_zmc - 博客园详解Kubernetes Pod优雅退出 - 人艰不拆_zmc - 博客园 1. Kubernetes 生命周期钩子概述 在 Kubernetes 中,生命周期钩子(Lifec…

测试文章标题01

模型上下文协议(Model Context Protocol, MCP)深度解析 一、MCP的核心概念 模型上下文协议(Model Context Protocol, MCP)是一种用于规范机器学习模型与外部环境交互的标准化框架。其核心目标是通过定义统一的接口和数据格式&am…

kubuntu系统详解

Kubuntu 系统深度解析(从系统架构到用户体验) 一、定位与核心特性 Kubuntu 是 Ubuntu 的官方 KDE 衍生版,基于 Ubuntu 的稳定底层(Debian 技术栈),搭载 KDE Plasma 桌面环境,主打 “功能丰富、…

cURL:通过URL传输数据的命令行工具库介绍

文章目录 1. 什么是 curl?2. 下载与安装 curl3. curl 的常见用法3.1 获取网页内容3.2 下载文件3.3 发送 POST 请求(带表单数据)3.4 发送带 JSON 的 POST 请求 1. 什么是 curl? cURL(CommandLine URL)是非常…

从零搭建AI工作站:Gemma3大模型本地部署+WebUI配置全套方案

文章目录 前言1. 安装Ollama2.Gemma3模型安装与运行3. 安装Open WebUI图形化界面3.1 Open WebUI安装运行3.2 添加模型3.3 多模态测试 4. 安装内网穿透工具5. 配置固定公网地址总结 前言 如今各家的AI大模型厮杀得如火如荼,每天都有新的突破。今天我要给大家安利一款…

Element Plus对话框(ElDialog)全面指南:打造灵活弹窗交互

📌 开篇导语 对话框是Web应用中实现用户交互的核心组件之一,常用于信息确认、表单提交或详情展示。Element Plus的ElDialog组件以高扩展性和优雅动效著称,支持高度定制化开发。本文将从基础配置到进阶技巧,手把手教你掌握对话框组…

解决WSL、Ubuntu的.ico图标不正确显示缩略图

解决WSL、Ubuntu的.ico图标不正确显示缩略图 问题描述 Win10系统中由于更新了某些软件,篡改了默认的图像显示软件,导致WSL等软件未能成功显示图标,表现如下: 解决方法 将ico文件的默认打开方式更改为“画图”,如下…

[数据结构高阶]并查集初识、手撕、可以解决哪类问题?

标题:[数据结构高阶]并查集初识、手撕、可以解决哪类问题? 水墨不写bug 文章目录 一、认识并查集二、模拟实现并查集三、用并查集解决问题1、[省份的数量](https://leetcode.cn/problems/number-of-provinces/)2、[等式方程的可满足性](https://leetcode…