正如ETL这个概念本身所指示的,数据库读写访问是ETL的最常用甚至是最主要的操作。现代信息系统的设计与运行基本都是围绕数据库展开的,很多应用的核心功能都是对数据库的CRUD(创建、检索、更新、删除)操作。

SmartETL框架设计之初就考虑到了这个情况,在早期就根据团队的技术栈,实现了对MongoDBMySQLElasticSearchClickHouse等数据库的Extract操作(即Loader组件)和Load操作(即Processor组件)。具体来说,是在wikidata_filter.loader模块和wikidata_filter.iterator模块下分别创建了名为database的子模块,分别实现了相应的数据库组件。

ElasticSearch全文索引数据库操作为例,为了实现将数据写入ES,即建立ES全文索引,框架提供了wikidata_filter.iterator.database.elasticsearch.ESWriter组件,提供基于批量模式将一组JSON对象写入ES中。代码如下:

class ESWriter(BufferedWriter):"""数据写入ES索引中"""def __init__(self, host="localhost",port=9200,username=None,password=None,index=None,buffer_size=1000, **kwargs):super().__init__(buffer_size=buffer_size)self.url = f"http://{host}:{port}"if password:self.auth = (username, password)else:self.auth = Noneself.index_name = indexdef write_batch(self, rows: list):header = {"Content-Type":  "application/json"}lines = []for row in rows:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)break# row_meta = json.dumps({"index": action_row})row_meta = json.dumps({"index": action_row})try:row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)except:passbody = '\n'.join(lines)body += '\n'print(f"{self.url}/{self.index_name} bulk")res = requests.post(f'{self.url}/{self.index_name}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

需要注意,为了提高写ES的效率,ESWriter并不是直接实现JsonIterator,而是继承自BufferedWriter,通过重写write_batch方法,基于ES的bulk接口,实现了批量写入ES。

类似的,为了实现从ES读取数据,可以基于ES检索接口(POST /{index}/_search)或Scroll接口(POST /{index}/_search/scroll)实现检索Loader组件或Scroll模式的Loader组件;为了实现ES数据删除,基于ES删除接口(DELETE /{index}/_doc/{id})实现删除Processor组件;为了判断数据是否存在,基于ES详情接口(GET /{index}/_doc/{id})实现是否存在的Processor组件;……

这种方式比较简单直观,也很容易实现,但是随着应用中需要集成的数据库种类越来越多、数据操作越来越多样化,我们就会发现,为了实现对数据库的访问操作,需要针对每一类操作开发Loader组件或Processor组件,最后数据库相关操作代码就会分散在多个模块、函数中,不便于组件使用、维护和扩展。

有没有可能设计一套专门的机制,实现数据库操作与流程节点分离,同时根据需要进行绑定?当然可以!

出于这样的目的,本文设计了独立的数据库接口体系,并基于函数式组件机制实现数据库独立接口与流程的松耦合绑定。核心过程包括3步:

首先,定义一套数据库操作接口Database,包括数据库级别的表格列表list_tables、获取表格元数据desc_table等和表(集合、索引)级别的写入upsert、扫描scroll、检索search、获取详情get、是否存在exists、删除delete等。这类操作可以根据业务需要就行扩展。类图如下所示:
数据库接口类图
注意,Database接口函数通过使用命名参数,这些参数可以通过SmartETLYAML流程进行配置,方便传递特定数据库的配置。

第二,根据实际需要的数据库类型,实现对应的Database类。事实上,这就是“桥接模式”的应用,将数据库SDK提供的接口转换为本项目的Database接口。以下是一个ElasticSearch实现类(基于ES-HTTP接口)的示例代码:

import json
import requests
from requests.auth import HTTPBasicAuth
from .base import Databaseid_keys = ["_id", "id", "mongo_id"]headers = {'Content-Type': 'application/json','Accept': 'application/json'
}
class ES(Database):"""读取ES指定索引全部数据,支持提供查询条件"""def __init__(self, host: str = "localhost",port: int = 9200,username: str = None,password: str = None,index: str = None,secure: bool = False,**kwargs):self.url = f"{'https' if secure else 'http'}://{host}:{port}"if password:self.auth = HTTPBasicAuth(username, password)else:self.auth = Noneself.index = indexdef search(self, query: dict = None,query_body: dict = None,fetch_size: int = 10,index: str = None,**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = fetch_sizeprint("ES search query_body:", query_body)res = requests.post(f'{self.url}/{index}/_search', auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)returnres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)returnhits = res['hits']['hits']for hit in hits:# print(hit)doc = hit.get('_source') or {}doc['_id'] = hit['_id']doc['_score'] = hit['_score']if 'fields' in hit:doc.update(hit['fields'])yield docdef scroll(self, query: dict = None,query_body: dict = None,batch_size: int = 10,fetch_size: int = 10000,index: str = None,_scroll: str = "1m",**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = batch_sizeprint("ES scroll query_body:", query_body)scroll_id = Nonetotal = 0while True:if scroll_id:# 后续请求url = f'{self.url}/_search/scroll'res = requests.post(url, auth=self.auth, json={'scroll': _scroll, 'scroll_id': scroll_id}, **kwargs)else:# 第一次请求 scrollurl = f'{self.url}/{index}/_search?scroll={_scroll}'res = requests.post(url, auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)breakres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)continueif '_scroll_id' in res:scroll_id = res['_scroll_id']hits = res['hits']['hits']for hit in hits:doc = hit.get('_source') or {}doc['_id'] = hit['_id']yield doctotal += len(hits)if len(hits) < batch_size or 0 < fetch_size <= total:breakif scroll_id:# clear scrollurl = f'{self.url}/_search/scroll'requests.delete(url, auth=self.auth, json={'scroll_id': scroll_id})def exists(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}?_source=_id'res = requests.get(url, auth=self.auth)if res.status_code == 200:return res.json().get("found") is Truereturn Falsedef delete(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}'res = requests.delete(url, auth=self.auth)return res.status_code == 200def upsert(self, items: dict or list, index: str = None, **kwargs):index = index or self.indexheader = {"Content-Type": "application/json"}if not isinstance(items, list):items = [items]lines = []for row in items:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)breakrow_meta = json.dumps({"index": action_row})row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)body = '\n'.join(lines)body += '\n'print(f"{self.url}/{index} bulk")res = requests.post(f'{self.url}/{index}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

SmartETL项目中根据目前业务流程需求,初步实现了ElasticSearchClickHouseMongoDBMySQLPostgreSQLQdrantSQLiteMinIO等8类数据库,类结构图如下所示:
数据库类图结构
第三,定义一组数据库操作的代理函数,将流程的数据库调用需求转发给数据库组件。目前定义为gestata.dbops模块,代码很简单,如下所示:

from wikidata_filter.util.database.base import Databasedef tables(db: Database, *database_list, columns: bool = False):if database_list:for database in database_list:for table in db.list_tables(database):yield {"name": table,"columns": db.desc_table(table, database) if columns else [],"database": database}else:for table in db.list_tables():yield {"name": table,"columns": db.desc_table(table) if columns else []}def search(db: Database, **kwargs):return db.search(**kwargs)def scroll(db: Database, **kwargs):return db.scroll(**kwargs)def upsert(row: dict or list, db: Database, **kwargs):
db.upsert(row, **kwargs)
return rowdef delete(_id, db: Database, **kwargs):return db.delete(_id, **kwargs)def exists(_id, db: Database, **kwargs) -> bool:return db.exists(_id, **kwargs)def get(_id, db: Database, **kwargs):return db.get(_id, **kwargs)

这里需要注意各个函数的返回值,大多数直接返回数据库组件对应方法的执行结果,但upsert返回的是输入参数。这是为什么呢?这样就能够将数据库写入操作作为流程的中间节点,也就是说数据经过入库流程,但没有终止,而是继续流入后续节点中,入下图所示:
入库流程示意
至此,我们完成了将数据库组件与流程节点组件进行解耦的设计。下面来看一个应用案例,通过读取arXiv数据集(来自kaggle,可参考这篇文章),写入ElasticSearch(建立全文索引),流程定义如下:

from: local/db_envs.yamlname: load arXiv meta flow
description: 读取arXiv-meta数据集,写入ES
arguments: 1consts:type_mapping:all: .jsonlnodes:es: util.database.elasticsearch.ES(**es1, index='arxiv-meta-2505')select: SelectVal('data')rename: RenameFields(id='_id')change_id: "Map(lambda s: s.replace('/', ':'), key='_id')"
# 建立ES全文索引write_es: Map('gestata.dbops.upsert', es)loader: ar.Zip(arg1, 'all', type_mapping=type_mapping)
processor: Chain(select, rename, change_id, Buffer(1000), write_es, Count(label=’total-papers’)

源代码及流程定义详见SmartETL项目。需要注意,由于项目持续演化,目前除了Kafka,其他数据库组件都已经按照新的方式完成重构。

总结:本文阐述了SmartETL项目中的数据库与流程解耦的设计,包括动机、目的、设计思路、应用案例。作为软件设计中的一条基本原则,高内聚、松耦合是我们持续追求的目标,也只有好的设计,才能让我们的代码能够易于维护与扩展,从而快速响应业务需求,降低开发成本。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/912215.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/912215.shtml
英文地址,请注明出处:http://en.pswp.cn/news/912215.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【记录解决问题】activiti--sql 转义符设置

一、背景 %、&#xff01;、_在sql查询时需要转义&#xff0c;转义的语法 like %?2% escape ?#{escapeCharacter()}二、activiti转义配置 String wildcardEscapeClause ""; if (this.databaseWildcardEscapeCharacter ! null && this.databaseWildcard…

Unity AR构建维护系统的以AI驱动增强现实知识检索系统

本博客概述了为维护开发的AI驱动增强现实&#xff08;AR&#xff09;知识检索系统的开发过程&#xff0c;该系统集成了Unity用于AR、Python服务器用于后端处理&#xff0c;以及ChatGPT用于自然语言处理。该系统允许维护工人通过AR设备&#xff08;如HoloLens 2&#xff09;查询…

Java面向对象核心:方法值传递与封装机制精讲

文章目录 Java面向对象编程核心笔记一、方法值传递机制1. 基本数据类型传递2. 引用数据类型传递值传递总结 二、面向对象核心概念1. 类与对象关系2. 类定义规范3. 对象创建与使用 三、封装机制详解1. 封装三大要素2. 封装示例&#xff08;GirlFriend类&#xff09;3. 测试类4. …

【Actix Web】构建高性能 Rust API:Actix Web 最佳实践与进阶指南

目录 一、高性能 API 架构设计1.1 系统架构图1.2 核心组件 二、项目初始化与配置2.1 创建项目2.2 添加依赖 (Cargo.toml)2.3 配置文件 (config/default.toml) 三、核心模块实现3.1 应用状态管理 (src/state.rs)3.2 数据模型定义 (src/models.rs) 四、认证与授权系统4.1 JWT 认证…

vue项目中纯前端实现导出pdf文件,不需要后端处理。

在 Vue 项目中&#xff0c;纯前端实现导出 PDF 文件是完全可行的。通常可以借助一些 JavaScript 库来将 HTML 内容或 DOM 元素转换为 PDF 并下载&#xff0c;无需后端参与。 下面介绍几种常用的方案和实现方法&#xff1a; 推荐方案&#xff1a;使用 html2canvas jsPDF 安装…

c++虚拟内存

常见的内存困惑 当你编写C程序时&#xff0c;是否遇到过&#xff1a; vector申请200MB内存&#xff0c;但系统显示只占用20MB&#xff1f;程序在低配机器上崩溃&#xff0c;报出std::bad_alloc但内存显示充裕&#xff1f;遍历数组时特定位置耗时突然增加&#xff1f;相同代码…

领域驱动设计(DDD)【22】之限定建模技术

文章目录 一 限定初识二 限定识别三 限定实现 一 限定初识 一个 员工 可以拥有多份 工作经验&#xff0c;而各个 工作经验 的 时间段 不能相互重叠。可以得出一个推论&#xff1a;对于一个 员工 而言&#xff0c;每个 时间段 只能有一条 工作经验。 UML中第二种表述方式&…

《P6492 [COCI 2010/2011 #6] STEP》

题目描述 给定一个长度为 n 的字符序列 a&#xff0c;初始时序列中全部都是字符 L。 有 q 次修改&#xff0c;每次给定一个 x&#xff0c;若 ax​ 为 L&#xff0c;则将 ax​ 修改成 R&#xff0c;否则将 ax​ 修改成 L。 对于一个只含字符 L&#xff0c;R 的字符串 s&#…

macOS,切换 space 失效,向右切换space(move right a space) 失效

背景 准确来讲&#xff0c;遇到的问题是向右切换space&#xff08;move right a space) 失效&#xff0c;并向左是成功的。 在键盘-快捷键-调度中心中&#xff0c;所有的快捷键均可用&#xff0c;但是“向右移动一个空间”总是失效。 已经检查过不是快捷键冲突的问题&#x…

网飞猫官网入口 - 免费高清影视平台,Netflix一站观看

网飞猫是一个专注于提供丰富影视资源的在线平台&#xff0c;涵盖国内外热门电影、电视剧、动漫、综艺等多种类型。它不仅整合了Netflix的独家内容&#xff0c;还提供了大量高清、蓝光画质的影视作品&#xff0c;支持多语言字幕&#xff0c;满足不同用户的观影需求。网飞猫的界面…

Hyper-v-中的FnOs--飞牛Nas虚拟磁盘扩容(不清除数据)

在Hyper-v下的飞牛Nas要怎么在不删除原有虚拟磁盘数据的情况下扩容呢 OK下面开始教学&#xff08;适用于Basic模式的虚拟磁盘扩容&#xff0c;Linear没试过&#xff09; 先关闭飞牛Nas系统 找到飞牛Nas虚拟机&#xff0c;在设置下SCSI控制器找到要扩容的虚拟磁盘&#xff0c; 点…

掌握 MySQL 的基石:全面解读数据类型及其影响

前言 上篇文章小编讲述了关于MySQL表的DDL操作&#xff0c;在那里我多次使用了MySQL的数据类型&#xff0c;但是我并没有去讲述MySQL的数据类型&#xff0c;想必各位读者已经很好奇MySQL的数据类型都有什么了&#xff0c;今天这篇文章我将会详细的去讲述MySQL的数据类型&#x…

buildadmin 如何制作自己的插件

官方文档指引 提示&#xff1a;若不计划发布到应用市场&#xff0c;可省略图片等非必要功能 参考文档&#xff1a;https://doc.buildadmin.com/senior/module/basicInfo.html 目录 官方文档指引开发说明模块开发流程模块包结构示例安装开发工具 总结 开发说明 目标&#xff…

【数据标注师】关键点标注

目录 一、 **关键点标注的四大核心原则**二、 **五阶能力培养体系**▶ **阶段1&#xff1a;基础认知筑基&#xff08;1-2周&#xff09;**▶ **阶段2&#xff1a;复杂场景处理技能▶ **阶段3&#xff1a;三维空间标注&#xff08;进阶&#xff09;**▶ **阶段4&#xff1a;效率…

创建网站的基本步骤?如何建设自己的网站?

创建网站是一个系统化的过程&#xff0c;涵盖规划、设计、开发、测试和发布等多个阶段。以下是详细步骤及关键工具推荐&#xff1a; 一、规划阶段&#xff1a;明确目标与内容 定义目标 1、确定网站目的&#xff08;展示信息、销售、博客、服务等&#xff09;。 2、分析目标…

FreeSWITCH配置文件解析(2) dialplan 拨号计划中xml 的action解析

在 FreeSWITCH 的拨号计划&#xff08;Dialplan&#xff09;中&#xff0c;使用 XML 配置。其中&#xff0c;<action> 标签用于指定要执行的操作。这些操作通常是应用程序&#xff08;applications&#xff09;或设置变量等。下面列出常见的 <action> 类型及其含义…

MCPA2APPT:基于 A2A+MCP+ADK 的多智能体流式并发高质量 PPT 智能生成系统

&#x1f680; MCPA2APPT / MultiAgentPPT 集成 A2A MCP ADK 架构的智能化演示文稿生成系统&#xff0c;支持多智能体协作与流式并发&#xff0c;实时生成高质量 PPT 内容。 &#x1f9e0; 项目简介 MultiAgentPPT&#xff08;又名 MCPA2APPT&#xff09;采用 A2A&#xff…

Maven 多模块项目调试与问题排查总结

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

debian国内安装docker

先升级apt和安装依赖包 apt update apt upgrade apt install curl vim wget gnupg dpkg apt-transport-https lsb-release ca-certificates添加存储库的GPG密钥&#xff08;阿里云&#xff09; curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/debian/gpg | sudo gpg…

vue网页中的一个天气组件使用高德api

今天写了一个天气组件效果如下&#xff1a; 实现代码如下&#xff1a; <template><div><span click"getLocation" style"cursor: pointer"><span style"color:white;">{{ weatherInfo.area }}</span></span&g…