作者:来自 Elastic Jeffrey Rengifo

学习如何使用 Pydantic 模式和 FastAPI 后台任务,通过实际示例构建一个 Elasticsearch API。

想获得 Elastic 认证吗?查看下一期 Elasticsearch Engineer 培训的时间!

Elasticsearch 拥有丰富的新功能,能帮助你为你的使用场景构建最佳的搜索解决方案。深入了解我们的示例笔记本,开始免费云试用,或立即在本地机器上体验 Elastic。


从《Elasticsearch in JavaScript》这篇文章中我们了解到,你不应该将 Elasticsearch 实例暴露在互联网上,而是应该构建一个 API 层。暴露集群的 URL、索引名称或 API 密钥会让攻击者更容易瞄准你的数据,并不必要地扩大你的攻击面。即使你对请求进行了清洗,攻击者仍然可以发送复杂或高负载的查询使集群超载。因此,最好不仅隐藏集群本身,还要隐藏查询逻辑 —— 只让用户控制必要的部分,比如搜索关键字,而不是整个查询。

对于 JavaScript 开发者来说,可以选择使用 Node.js,而对于 Python 开发者来说,FastAPI 是一个不错的替代方案。FastAPI 因其简洁性和开箱即用的高性能而受到广泛欢迎。

在本文中,我们将使用 FastAPI 构建 Elasticsearch 与客户端应用程序(通常是网页浏览器)之间的 API 层。然后,我们将探索一些可以通过 FastAPI 原生功能实现的常见用例。

你可以在这里找到该应用程序的笔记本。

准备数据

本文将使用一个包含兽医就诊记录的数据集。下面是一个示例文档:

{"owner_name": "Marco Rivera","pet_name": "Milo","species": "Cat","breed": "Siamese","vaccination_history": ["Rabies","Feline Leukemia"],"visit_details": "Slight eye irritation, prescribed eye drops."
}

首先,我们需要安装 Elasticsearch 客户端,以便能够查询我们的集群:

pip install elasticsearch

现在,我们导入 Elasticsearch 客户端、helpers 和 getpass,用于从终端捕获环境变量。

from elasticsearch import Elasticsearch, helpers
from getpass import getpassos.environ["ELASTICSEARCH_ENDPOINT"] = getpass("Elasticsearch endpoint: ")
os.environ["ELASTICSEARCH_API_KEY"] = getpass("Elasticsearch api-key: ")

我们定义索引名称,并使用 Elasticsearch 端点和 API 密钥初始化 Elasticsearch 客户端。

ES_INDEX = "vet-visits"es_client = Elasticsearch(hosts=[os.environ["ELASTICSEARCH_ENDPOINT"]],api_key=os.environ["ELASTICSEARCH_API_KEY"],
)

然后我们来创建映射:

es_client.indices.create(index=ES_INDEX,body={"mappings": {"properties": {"breed": {"type": "keyword"},"owner_name": {"type": "text","fields": {"keyword": {"type": "keyword"}},},"pet_name": {"type": "text","fields": {"keyword": {"type": "keyword"}},},"species": {"type": "keyword"},"vaccination_history": {"type": "keyword"},"visit_details": {"type": "text"},}}},)

最后,下载数据集并将其放在脚本所在的同一文件夹。然后,我们可以使用 Bulk API 将数据导入 Elasticsearch:

def build_data(ndjson_file, index_name):with open(ndjson_file, "r") as f:for line in f:doc = json.loads(line)yield {"_index": index_name, "_source": doc}try:success, errors = helpers.bulk(es_client, build_data("vet-visits.ndjson", ES_INDEX))print(f"{success} documents indexed successfully")if errors:print("Errors during indexing:", errors)
except Exception as e:print(f"Error: {str(e)}")

如果一切顺利,你应该会看到以下信息:

10 documents indexed successfully

现在,数据已导入 Elasticsearch 并准备好使用。接下来,我们将构建 API 来展示 FastAPI 的功能。

Hello, world!

开始之前,我们只需要安装 FastAPI 和用于服务器创建的 Uvicorn,Pydantic 用于模式处理,以及 Elasticsearch 用于存储和搜索数据。

pip install fastapi uvicorn elasticsearch pydantic -q

我们先导入库并创建 FastAPI 服务器实例。

import asyncio
import json
import os
from typing import Listimport uvicorn
from fastapi import BackgroundTasks, Body, FastAPI, HTTPException, Response
from pydantic import BaseModelapp = FastAPI()

然后,我们可以创建一个 ping 端点来检查服务器状态。

@app.get("/ping")
async def ping():try:health = await es_client.cluster.health()return {"status": "success","message": "Connected to Elasticsearch","cluster_status": health["status"],"number_of_nodes": health["number_of_nodes"],"active_shards": health["active_shards"],}except Exception as e:status_code = getattr(e, "status_code", 500)raise HTTPException(status_code=status_code,detail=f"Error connecting to Elasticsearch: {str(e)}",)

请求:

curl -XGET "http://localhost:8000/ping"

响应:

{"status":"success","message":"Connected to Elasticsearch","cluster_status":"green","number_of_nodes":4,"active_shards":172}
@app.post("/search")
async def search(query: dict = Body(...)):try:result = await es_client.search(index=ES_INDEX, body=query)return resultexcept Exception as e:status_code = getattr(e, "status_code", 500)raise HTTPException(status_code=status_code, detail=str(e))

我们将尝试使用 match_phrase 搜索牙齿清洁的就诊记录:

请求:

curl -X POST "http://localhost:8000/search" \-H "Content-Type: application/json" \-d '{"query": {"match_phrase": {"visit_details": "dental cleaning"}},"size": 10}'

响应:

{"took": 1,"timed_out": false,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0},"hits": {"total": {"value": 1,"relation": "eq"},"max_score": 3.5869093,"hits": [{"_index": "vet-visits","_id": "VUjqWZYB8Z9CzAyMLmyB","_score": 3.5869093,"_source": {"owner_name": "Leo Martínez","pet_name": "Simba","species": "Cat","breed": "Maine Coon","vaccination_history": ["Rabies","Feline Panleukopenia"],"visit_details": "Dental cleaning. Minor tartar buildup removed."}}]}
}

输入和响应的类型定义

FastAPI 的一个关键特性是与 Pydantic 的集成,用于数据模式处理,允许你使用类型注解定义类,并自动进行数据验证。在大多数情况下,给数据加类型对构建健壮稳定的应用至关重要。它还能实现模型复用,使代码更易读、维护和文档化。

不过,严格的类型和模式限制在需要处理高度动态或不可预测的数据结构时可能不太适用。

接下来,我们把搜索端点的请求和响应改成类型定义。

我们可以创建类来验证字段类型(field types)、设置默认值(default values)、定义枚举(enums)和列表(lists)等。这里我们将为用户请求和 Elasticsearch 响应创建类,并应用部分这些概念。

现在,用户只需发送搜索词,和可选的结果数量限制,就会返回包含 owner_name 和 visit_details 的命中列表。

# Pydantic classes# Pydantic class for the request
class SearchRequest(BaseModel):term: strsize: int = 10# Pydantic class for the response
class SearchResponse(BaseModel):hits: List[SearchHit]total: int# Class to format for hits
class SearchHit(BaseModel):owner_name: str = ""visit_details: str = ""

现在这个端点对用户来说更简单,对我们来说更安全,因为用户只能控制查询的一部分,而不是全部。

@app.post("/search", response_model=SearchResponse)
async def search_v3(request: SearchRequest):try:query = {"query": {"match_phrase": {"visit_details": request.term}},"size": request.size,}result = await es_client.search(index=ES_INDEX, body=query)hits = result["hits"]["hits"]results = []for hit in hits:source = hit.get("_source", {})results.append(SearchHit(owner_name=source["owner_name"],visit_details=source["visit_details"],))return SearchResponse(hits=results, total=len(results))except Exception as e:status_code = getattr(e, "status_code", 500)raise HTTPException(status_code=status_code, detail=str(e))

让用户完全访问 _search 查询体被认为是安全风险,因为恶意用户可能发送超载集群的查询。采用这种方式,用户只能设置 match_phrase 子句内的内容,使端点更安全。

响应也是如此。我们可以返回更简洁的结果,隐藏 _id、_score、_index 等字段。

请求:

curl -X POST "http://localhost:8000/search" \-H "Content-Type: application/json" \-d '{"term": "dental cleaning"}'

响应:

{"hits":[{"owner_name":"Leo Martínez","visit_details":"Dental cleaning. Minor tartar buildup removed."}],"total":1}

后台任务

FastAPI 的另一个功能是支持后台任务。利用后台任务,你可以立即返回结果给用户,同时在后台继续执行任务。这个功能对长时间运行的任务特别有用。

在 Elasticsearch 中,我们使用 wait_for_completion=false 参数获取任务 ID 并关闭连接,而不是等待任务完成。然后可以用 tasks API 查询任务状态。一些示例有 _reindex、_update_by_query 和 _delete_by_query。

假设你想让用户触发基于字段值删除几百万条文档的操作,并在完成后通知他们。你可以结合 FastAPI 的后台任务和 Elasticsearch 的 wait_for_completion 实现。

我们先创建一个函数,每隔 2 秒查询 tasks API 检查任务状态。

async def check_task(es_client, task_id):try:while True:status = await es_client.tasks.get(task_id=task_id)if status.get("completed", False):print(f"Task {task_id} completed.")# Here you can add the logic to send the notificationbreakawait asyncio.sleep(2)except Exception as e:print(f"Error checking task {task_id}: {e}")

现在,我们可以创建一个端点,接收用作过滤条件的值来删除文档。调用 _delete_by_query API,设置 wait_for_completion=false,并使用返回的任务 ID 创建一个后台任务,调用我们之前写的 check_task 方法。

background_tasks.add_task 的第一个参数是要执行的函数,后面的参数是该函数所需的参数。

# Background task endpoint
@app.post("/delete-by-query")
async def delete_by_query(request: SearchRequest = Body(...), background_tasks: BackgroundTasks = None
):try:body = {"query": {"term": {"pet_name.keyword": request.term}}}response = await es_client.delete_by_query(index=ES_INDEX, body=body, wait_for_completion=False)task_id = response.get("task")if task_id:background_tasks.add_task(check_task, es_async_client, task_id)return Response(status_code=200,content=json.dumps({"message": "Delete by query. The response will be send by email when the task is completed.","task_id": task_id,}),media_type="application/json",)except Exception as e:status_code = getattr(e, "status_code", 500)raise HTTPException(status_code=status_code, detail=str(e))

下面是删除所有 pet_name.keyword 字段中包含 “Buddy” 文档的示例:

curl -X POST "http://localhost:8000/delete-by-query" \-H "Content-Type: application/json" \-d '{"term": "Buddy"}'

响应:

{"message": "Delete by query. The response will be send by email when the task is completed.", "task_id": "191ALShERbucSkcFTGpOCg:34822095"}

后台日志:

INFO:     127.0.0.1:58804 - "POST /delete-by-query HTTP/1.1" 200 OK
Task 191ALShERbucSkcFTGpOCg:34822095 completed.

运行 API

添加以下代码块,将服务器暴露在 8000 端口:

if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)

使用以下命令运行 FastAPI 应用:

uvicorn app:app --reload

总结

FastAPI 让构建安全且简洁的 Elasticsearch API 层变得简单。它内置类型检查、异步支持和后台任务,能轻松处理常见用例,且开销不大。

这些示例只是起点——你可以根据需要添加认证、分页,甚至 websockets。关键是保持集群安全,API 清晰且易于维护。

原文:Building Elasticsearch APIs with FastAPI - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/92580.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/92580.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/92580.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[2025CVPR-目标检测方向]FSHNet:一种用于3D物体检测的全稀疏混合网络。

1. ​简介​ 论文提出了FSHNet(Fully Sparse Hybrid Network),一种用于3D物体检测的全稀疏混合网络。FSHNet旨在解决现有稀疏3D检测器的两大核心问题:长距离交互能力弱和网络优化困难。稀疏检测器(如VoxelNeXt和SAFDN…

MySql 8.0.42 zip版安装教程和使用

今天要装个MySQL,就按照自己以前的教程来做,不知道是不是版本更新了的原因,又遇到了一点小阻碍,于是再记录一下吧。 下载MySQL 下载链接:MySQL :: Download MySQL Community Serverhttps://dev.mysql.com/downloads/…

【lucene】实现knn

在 Lucene 中,可以通过 KnnFloatVectorQuery 和 KnnFloatVectorField 来实现 KNN(k-Nearest Neighbors)搜索。以下是具体介绍:1. 功能原理KnnFloatVectorQuery 是 Lucene 用于执行最近邻搜索的查询类,它可以在一个字段…

RabbitMQ实践学习笔记

RabbitMQ实践 以下是关于RabbitMQ实践的整理,涵盖常见场景和示例代码(基于Markdown格式)。内容按模块分类,避免步骤词汇,直接提供可操作的方法: 基础连接与队列声明 使用Python的pika库建立连接并声明队列: import pikaconnection = pika.BlockingConnection(pika.C…

量子生成对抗网络:量子计算与生成模型的融合革命

引言:当生成对抗网络遇上量子计算在人工智能与量子计算双重浪潮的交汇处,量子生成对抗网络(Quantum Generative Adversarial Networks, QGAN)正成为突破经典算力瓶颈的关键技术。传统生成对抗网络(GAN)在图…

VBA 多个选项,将选中的选项录入当前选中的单元格

1、使用LISTBOX插件&#xff0c;选中后回车录入 维护好数据&#xff0c;并新增一个activeX列表框插件 Private Sub Worksheet_SelectionChange(ByVal Target As Range)If Target.Count > 1 Then Exit SubIf Target.Row > 2 And Target.Row < 10 And Target.Column 2…

【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 主页-微博点赞量Top6实现

大家好&#xff0c;我是java1234_小锋老师&#xff0c;最近写了一套【NLP舆情分析】基于python微博舆情分析可视化系统(flaskpandasecharts)视频教程&#xff0c;持续更新中&#xff0c;计划月底更新完&#xff0c;感谢支持。今天讲解主页-微博点赞量Top6实现 视频在线地址&…

SAP调用外部API

SAP需求将中文字符转化为对应的拼音具体思路,由于sap中没有将中文字符转化为拼音的函数或方法类,则以http请求访问外部服务器发布的API服务,然后获取其返回值即可1.调用外部网站上提供的api缺点:免费次数有限,后需要充值这里是用www格式的json报文*&----------------------…

(12)机器学习小白入门YOLOv:YOLOv8-cls 模型微调实操

YOLOv8-cls 模型微调实操 (1)机器学习小白入门YOLOv &#xff1a;从概念到实践 (2)机器学习小白入门 YOLOv&#xff1a;从模块优化到工程部署 (3)机器学习小白入门 YOLOv&#xff1a; 解锁图片分类新技能 (4)机器学习小白入门YOLOv &#xff1a;图片标注实操手册 (5)机器学习小…

基于Matlab传统图像处理技术的车辆车型识别与分类方法研究

随着计算机视觉和图像处理技术的发展&#xff0c;车辆检测与识别已经成为智能交通系统中的一个重要研究方向。传统图像处理方法通过对图像进行预处理、特征提取、分类与识别&#xff0c;提供了一种无需复杂深度学习模型的解决方案。本研究基于MATLAB平台&#xff0c;采用传统图…

未来趋势:LeafletJS 与 Web3/AI 的融合

引言 LeafletJS 作为一个轻量、灵活的 JavaScript 地图库&#xff0c;以其模块化设计和高效渲染能力在 Web 地图开发中占据重要地位。随着 Web3 和人工智能&#xff08;AI&#xff09;的兴起&#xff0c;地图应用的开发范式正在发生变革。Web3 技术&#xff08;如区块链、去中…

Spring AI 系列之二十一 - EmbeddingModel

之前做个几个大模型的应用&#xff0c;都是使用Python语言&#xff0c;后来有一个项目使用了Java&#xff0c;并使用了Spring AI框架。随着Spring AI不断地完善&#xff0c;最近它发布了1.0正式版&#xff0c;意味着它已经能很好的作为企业级生产环境的使用。对于Java开发者来说…

LFU算法及优化

继上一篇的LRU算法的实现和讲解&#xff0c;这一篇来讲述LFU最近使用频率高的数据很大概率将会再次被使用,而最近使用频率低的数据,将来大概率不会再使用。做法&#xff1a;把使用频率最小的数据置换出去。这种算法更多是从使用频率的角度&#xff08;但是当缓存满时&#xff0…

关于原车一键启动升级手机控车的核心信息及注意事项

想知道如何给原车已经有一键启动功能的车辆加装手机远程启动。这是个很实用的汽车改装需求&#xff0c;尤其适合想在冬天提前热车、夏天提前开空调的车主。一、适配方案与核心功能 ‌升级专车专用4G手机控车模块‌&#xff0c;推荐安装「移动管家YD361-3」系统&#xff0c;该方…

数据结构与算法:类C语言有关操作补充

数据结构与算法:类C语言操作补充 作为老师,我将详细讲解类C语言(如C或C++)中的关键操作,包括动态内存分配和参数传递。这些内容在数据结构与算法中至关重要,例如在实现动态数组、链表或高效函数调用时。我会用通俗易懂的语言和代码示例逐步解释,确保你轻松掌握。内容基…

Go 并发(协程,通道,锁,协程控制)

一.协程&#xff08;Goroutine&#xff09;并发&#xff1a;指程序能够同时执行多个任务的能力&#xff0c;多线程程序在一个核的cpu上运行&#xff0c;就是并发。并行&#xff1a;多线程程序在多个核的cpu上运行&#xff0c;就是并行。并发主要由切换时间片来实现"同时&q…

图机器学习(15)——链接预测在社交网络分析中的应用

图机器学习&#xff08;15&#xff09;——链接预测在社交网络分析中的应用0. 链接预测1. 数据处理2. 基于 node2vec 的链路预测3. 基于 GraphSAGE 的链接预测3.1 无特征方法3.2 引入节点特征4. 用于链接预测的手工特征5. 结果对比0. 链接预测 如今&#xff0c;社交媒体已成为…

每日一算:华为-批萨分配问题

题目描述"吃货"和"馋嘴"两人到披萨店点了一份铁盘&#xff08;圆形&#xff09;披萨&#xff0c;并嘱咐店员将披萨按放射状切成大小相同的偶数个小块。但是粗心的服务员将披萨切成了每块大小都完全不同的奇数块&#xff0c;且肉眼能分辨出大小。由于两人都…

Transfusion,Show-o and Show-o2论文解读

目录 一、Transfusion 1、概述 2、方法 二、Show-o 1、概述 2、方法 3、训练 三、Show-o2 1、概述 2、模型架构 3、训练方法 4、实验 一、Transfusion 1、概述 Transfusion模型应该是Show系列&#xff0c;Emu系列的前传&#xff0c;首次将文本和图像生成统一到单…

聊聊 Flutter 在 iOS 真机 Debug 运行出现 Timed out *** to update 的问题

最近刚好有人在问&#xff0c;他的 Flutter 项目在升级之后出现 Error starting debug session in Xcode: Timed out waiting for CONFIGURATION_BUILD_DIR to update 问题&#xff0c;也就是真机 Debug 时始终运行不了的问题&#xff1a; 其实这已经是一个老问题了&#xff0c…