1.Assistant API 的主要优点:
减少编码工作量、自动管理上下文窗口、安全的访问控制、工具和文档的轻松集成
本节讲应用设计和性能

流式输出:借助流式输出,可以让应用程序实时处理和响应用户输入。具体来说,这种技术允许数据在生成的同时即刻传输和处理,而不必等待整个数据处理过程完成。这样,用户无需等待全部数据加载完成即可开始接收到部分结果,从而显著提高了应用的反应速度和交互流畅性。
eg:对话时回答是逐渐展现的。

在流式输出的实现方式中,我们需要在调用 client.chat.completions.create() 时添加 stream=True 参数,用以指定启用流式输出,从而允许 API 逐步发送生成的文本片段,然后使用 for 循环来迭代 completion 对象
code如下:
from openai import OpenAI
client = OpenAI()

completion = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一位乐于助人的人工智能小助理。"},
{"role": "user", "content": "你好,请你介绍一下你自己。"}
],
stream=True
)

for chunk in completion:
print(chunk.choices[0].delta)


OpenAI 提供了两种主要的流实现方法:第一种基于 WebSocket 的直接实时通信方法,第二种是适用于本身不支持此类交互的平台的模拟流。两种流实现方法各有其特点和应用场景:

基于 WebSocket 的直接实时通信方法:
WebSocket 是一种网络通信协议,它允许在用户的浏览器和服务器之间建立一个不断开的连接,通过这个连接可以实现双向的数据传输。

模拟流
是为了在不支持 WebSocket 或其他实时协议的环境中提供类似的功能。这种方法通过定期发送HTTP请求来“模拟”一个持续的数据流。常见的实现方式包括长轮询和服务器发送事件(Server-Sent Events, SSE)。
在这两种方法中,模拟流更加常用,因为它依赖于标准的HTTP请求和响应,易于理解和实现。

二.具体实操:
1.Assistant API 如何开启流式传输
要在一个完整的生命周期内启用流式传输,可以分别在Create Run、 Create Thread and Run和 Submit Tool Outputs 这三个 API 端点中添加 "stream": True 参数。
这样设置后,API 返回的响应将是一个事件流。

首先,我们按照 Assistant API创建对话或代理的标准流程来构建应用实例。如下代码所示:
from openai import OpenAI

# 构建 OpenAI 客户端对象的实例
client = OpenAI()

# Step 1. 创建 Assistant 对象
assistant = client.beta.assistants.create(
model="gpt-4o-mini-2024-07-18",
name="Good writer",  # 优秀的作家
instructions="You are an expert at writing excellent literature"  # 你是一位善于写优秀文学作品的专家
)

# Step 2. 创建 Thread 对象
thread = client.beta.threads.create()

# Step 3. 向Thread中添加Message
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="写一篇关于一个小女孩在森林里遇到一只怪兽的故事。详细介绍她的所见所闻,并描述她的心里活动"
)
在Create Run的过程中,添加stream=True参数,开启流媒体传输。代码如下:
run = client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
stream=True      # 开启流式传输
)

print(run)

for event in run:
print(event)
其返回的结果是一个 openai.Stream 对象。这个对象表示的是一个流式数据通道,可以用来接收连续传输的数据。


2.Assistant API 流式传输中的事件流
整个事件流的核心流程包括:当新的运行被创建时,发出 thread.run.created 事件;当运行完成时,发出 thread.run.completed 事件。在运行期间选择创建消息时,将发出一个 thread.message.created 事件、一个 thread.message.in_progress 事件、多个 thread.message.delta 事件,以及最终的 thread.message.completed 事件。

在处理过程中,任何需要的信息都可以通过访问数据结构的方式来获取。
run = client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
stream=True      # 开启流媒体传输
)

for event in run:
#print(event.event)
if event.event == 'thread.message.delta':
# 提取 text delta 的 value
value = event.data.delta.content[0].text.value
print(value)

按照相同的思路,我们还可以尝试测试Create Thread and Run和 Submit Tool Outputs两个端点在启用流媒体传输时的实现方法。

  首先来看Create Thread and Run方法。这个方法很容易理解,它所做的事情就是把创建Thread对象实例、向Thread对象实例中追加Message信息以及构建Run状态这三个单独的步骤合并在了一个.beta.threads.create_and_run方法中,所以调用方法也发生了一定的变化,代码如下:
run = client.beta.threads.create_and_run(
assistant_id=assistant.id,
thread={
"messages": [
{"role": "user", "content": "写一篇歌颂中国的文章"}
]
},
stream=True,
)

for event in run:
print(event)

3.如何在函数调用中启用流式传输
这里我们定义一个外部函数get_current_weather,用来获取指定地点的当前天气状况。
import json

def get_current_weather(location, unit="celsius"):
"""Get the current weather in a given location in China"""
if "beijing" in location.lower():
return json.dumps({"location": "Beijing", "temperature": "15", "unit": unit})
elif "shanghai" in location.lower():
return json.dumps({"location": "Shanghai", "temperature": "20", "unit": unit})
elif "guangzhou" in location.lower():
return json.dumps({"location": "Guangzhou", "temperature": "25", "unit": unit})
else:
return json.dumps({"location": location, "temperature": "unknown"})

接下来,需要为 get_current_weather 函数编写一个 JSON Schema的表示。这个 Json Schema的描述将用于指导大模型何时以及如何调用这个外部函数。通过定义必要的参数、预期的数据格式和响应类型,可以确保大模型能够正确并有效地利用这个功用这个功能。定义如下:
get_weather_desc = {
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g.beijing",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
},
}

然后,定义一个available_functions 字典,用来映射函数名到具体的函数对象,因为在大模型识别到需要调用外部函数的时候,我们需要动态地调用函数并获取到函数的执行结果。
available_functions = {
"get_current_weather": get_current_weather,
}

在准备好外部函数后,我们在定义 Assistant 对象时需要使用 tools 参数。此参数是一个列表的数据类型,我们要将可用的外部工具的 JSON Schema 描述添加进去,从而让 Assistant 可以正确识别这些工具,代码如下:
from openai import OpenAI
client = OpenAI()


# Step 1. 创建一个新的 assistant 对象实例
assistant = client.beta.assistants.create(
name="你是一个实时天气小助理",   
instructions="你可以调用工具,获取到当前的实时天气,再给出最终的回复", 
model="gpt-4o-mini-2024-07-18",
tools=[get_weather_desc],

# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()
然后,还是按照Assistant API的标准流程,将Messages追加到Thread中,并执行Run运行状态。代码如下:

# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京现在的天气怎么样?"
)


# Step 4. 执行 运行
run = client.beta.threads.runs.create(
thread_id=thread.id, 
assistant_id=assistant.id,
stream=True
)

for event in run:
print(event)


从上述的输出中可以分析出:**与直接生成回复的流式过程相比,明显的区别体现在 `thread.run.step.delta` 事件中。**在这一事件中,增量更新的是大模型输出的参数`arguments`,这个参数中的内容是用于执行外部函数的,而非对“北京现在的天气怎么样?”这个提问的回答。此外,流程会暂停在 `thread.run.requires_action` 事件处,整个过程不会自动生成最终的回复,而是等待外部操作的完成。
因此要在操作中识别这两点
这里我们仍然可以通过直接提取数据结构的方式来获取到想要的信息。
# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()

# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京现在的天气怎么样?"
)


# Step 4. 执行 运行
run = client.beta.threads.runs.create(
thread_id=thread.id, 
assistant_id=assistant.id,
stream=True
)


for event in run:
if event.event == 'thread.run.step.delta':
# 打印大模型输出的参数
print("Delta Event Arguments:")
print(event.data.delta.step_details.tool_calls[0].function.arguments)
print("--------------------------------------------------")  # 添加分隔符

    if event.event == 'thread.run.requires_action':
# 打印需要采取行动的详细信息
print("Requires Action Data:")
print(event.data)
print("--------------------------------------------------")  # 添加分隔符

在 event.event == 'thread.run.requires_action' 事件中,当识别到需要执行的外部函数及其参数后,我们可以根据Function Calling中介绍的方法来执行这些函数。在流媒体的事件流中我们只要适当的进行修改,就可以完成这个过程。代码如下:

# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()

# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京现在的天气怎么样?"
)


# Step 4. 执行 运行
run = client.beta.threads.runs.create(
thread_id=thread.id, 
assistant_id=assistant.id,
stream=True
)

tool_outputs = []
for event in run:
# print(f"event:{event}")
if event.event == 'thread.run.requires_action':
# 先拿到thread.run.requires_action事件的全部信息
function_info = event.data
print("--------------------------------------------------")
print(f"Function Info: {function_info}")

required_action = function_info.required_action
print("--------------------------------------------------")
print(f"Required Action: {required_action}")

tool_calls = required_action.submit_tool_outputs.tool_calls
print("--------------------------------------------------")
print(f"Tool Calls: {tool_calls}")

        # 执行外部函数,使用循环是因为`Assistant API` 可以并行执行多个函数
for tool_call in tool_calls:
tool_id = tool_call.id
function = tool_call.function
function_name = function.name
function_args = json.loads(function.arguments)
function_result = available_functions[function_name](**function_args)
print("--------------------------------------------------")
print(f"Function Result for {function_name}: {function_result}")

            tool_outputs.append({"tool_call_id": tool_id, "output":function_result})

print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")

 当获取到外部函数的执行结果后,需要将这些结果再次追加到 Thread 中,使其再次进入到队列中,以继续回答北京现在的天气怎么样?这个原始的问题。正如我们在上节课中介绍的,使用.beta.threads.runs.submit_tool_outputs方法用于提交外部工具的输出,此方法也支持流媒体输出,如果在这个阶段需要开启流媒体传输,我们就需要使用 stream=True 参数来明确指定。代码如下:

# Step 2. 创建一个新的 thread 对象实例
thread = client.beta.threads.create()

# Step 3. 将消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京现在的天气怎么样?"
)


# Step 4. 执行 运行
run = client.beta.threads.runs.create(
thread_id=thread.id, 
assistant_id=assistant.id,
stream=True
)

tool_outputs = []
for event in run:
# print(f"event:{event}")
if event.event == 'thread.run.requires_action':
# 先拿到thread.run.requires_action事件的全部信息
function_info = event.data
print("--------------------------------------------------")
print(f"Function Info: {function_info}")

required_action = function_info.required_action
print("--------------------------------------------------")
print(f"Required Action: {required_action}")

tool_calls = required_action.submit_tool_outputs.tool_calls
print("--------------------------------------------------")
print(f"Tool Calls: {tool_calls}")

        # 执行外部函数,使用循环是因为`Assistant API` 可以并行执行多个函数
for tool_call in tool_calls:
tool_id = tool_call.id
function = tool_call.function
function_name = function.name
function_args = json.loads(function.arguments)
function_result = available_functions[function_name](**function_args)
print("--------------------------------------------------")
print(f"Function Result for {function_name}: {function_result}")

            tool_outputs.append({"tool_call_id": tool_id, "output":function_result})

print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")


run_tools = client.beta.threads.runs.submit_tool_outputs(
thread_id=thread.id, 
run_id=function_info.id,
tool_outputs=tool_outputs,
stream=True)

for event_tool in run_tools:
# print(f"event_tool:{event_tool}")
if event_tool.event == 'thread.message.delta':
# 提取 text delta 的 value
text = event_tool.data.delta.content[0].text.value
print("Delta Text Output:", text)
print("--------------------------------------------------")  # 添加分隔符

if event_tool.event == 'thread.message.completed':
# 提取 text delta 的 value
full_text = event_tool.data.content[0].text.value
print("Completed Message Text:", full_text)
print("--------------------------------------------------")  # 添加分隔符

由此可见,当流式传输中涉及到函数调用的中间过程时,实际上也是发生了两次`Run`操作。为了使第二个`Run`能够接续第一个`Run`的输出,关键在于两者之间需要共享第一个`Run`的`Run id`。此外,在第二个`Run`的运行状态中需要设置`stream=True`以启用流式传输,确保两个`Run`状态的输出能够有效链接,形成一次连贯的响应。整个过程的事件流非常多,需要大家仔细体会。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/88361.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/88361.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/88361.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React Native安卓刘海屏适配终极方案:仅需修改 AndroidManifest.xml!

📌 问题背景在 React Native 开发中,我们经常会遇到安卓设备刘海屏(Notch)适配问题。即使正确使用了 react-native-safe-area-context 和 react-navigation,在一些安卓设备(如小米、华为、OPPO 等&#xff…

Spring Boot整合MyBatis+MySQL实战指南(Java 1.8 + 单元测试)

一、环境准备 开发工具&#xff1a;IntelliJ IDEA 2023.1 JDK 1.8.0_382 Maven3.6.3数据库&#xff1a;MySQL 8.0.21依赖版本&#xff1a;<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifact…

游戏开发日记

如何用数据表来储存&#xff0c;位置坐标&#xff08;XYZ&#xff09;&#xff1a;决定了对象在世界中的摆放资源ID / 图片URL&#xff1a;决定了使用什么模型或贴图事件ID / 特效&#xff1a;是否触发某些事件&#xff08;例如点击、交互&#xff09;逻辑索引&#xff08;Grid…

如何使用xmind编写测试用例

如何使用xmind编写测试用例为什么要使用xmind&#xff1f;使用xmind编写测试用例是为了梳理我们的思路。使用xmind编写测试用例的思路是什么&#xff1f;先进行分析再提取测试用例。 例如下面的注册功能的测试用例的分析&#xff1a; 分析&#xff1a; 先提取出需要测试的功能点…

使用LLaMA-Factory微调Qwen2.5-VL-3B 的目标检测任务-数据集格式转换(voc 转 ShareGPT)

一、LLaMA-Factory Qwen2.5-VL ShareGPT 格式要求ShareGPT 格式就是多轮对话的 list&#xff0c;每条数据如下&#xff1a;[{"conversations": [{"from": "user", "value": "<image>\n请标注图片中的所有目标及其类别和位…

【SkyWalking】服务端部署与微服务无侵入接入实战指南

【SkyWalking】服务端部署与微服务无侵入接入实战指南 &#x1f4a1; SkyWalking 系列总引导 在微服务架构快速演进的今天&#xff0c;如何有效实现服务链路追踪、性能分析、日志采集与自动化告警&#xff0c;成为系统稳定性的关键保障手段。 SkyWalking&#xff0c;作为 Apa…

LVDS系列20:Xilinx 7系ISERDESE2原语(一)

Xilinx 7系FPGA bank的io单元如下&#xff1a;Hr bank比hp bank少odelaye2组件&#xff0c;两者的idelaye2组件后面&#xff0c;都有iserdese2组件&#xff1b; iserdese2组件是一种专用的串并转换器或称解串器&#xff0c;用于高速源同步应用&#xff0c;如大部分LVDS信号解析…

【U-Boot】Shell指令

目录 U-Boot 三个Shell U-Boot Shell Linux Shell shell脚本 总结 U-Boot Shell命令 帮助命令 部分命令分类与功能说明 一、基础操作与信息查询 二、内存操作 三、启动管理 四、文件系统操作 五、设备与分区管理 六、环境变量 七、诊断与调试 八、特殊功能 九…

《Revisiting Generative Replay for Class Incremental Object Detection》阅读笔记

摘要Abstract部分 原文 Generative replay has gained significant attention in class-incremental learning; however, its application to Class Incremental Object Detection (CIOD) remains limited due to the challenges in generating complex images with precise …

Mysql: Bin log原理以及三种格式

目录 一、什么是 Binlog&#xff1f; 二、Binlog 的应用场景与案例 1. 数据恢复 (Point-in-Time Recovery) 2. 主从复制 (Master-Slave Replication) 3. 数据审计 三、Binlog 的三种格式 1. STATEMENT 模式 (Statement-Based Logging - SBL) 2. ROW 模式 (Row-Based Log…

LiteHub之文件下载与视频播放

文件下载 前端请求 箭头函数 //这个箭头函数可以形象理解为&#xff0c;x流入&#xff08;>&#xff09;x*x, //自然而然>前面的就是传入参数,>表示函数体 x > x * x//相当于 function (x) {return x * x; }//如果参数不是一个&#xff0c;就需要用括号()括起来…

QT5使用cmakelists引入Qt5Xlsx库并使用

1、首先需要已经有了Qt5Xlsx的头文件和库&#xff0c;并拷贝到程序exe路径下&#xff08;以xxx.exe/3rdparty/qtxlsx路径为例&#xff0c;Qt5Xlsx版本为0.3.0&#xff09;&#xff1b; 2、cmakelist中&#xff1a; # 设置 QtXlsx 路径 set(QTXLSX_ROOT_DIR ${CMAKE_CURRENT_SOU…

醋酸镨:闪亮的稀土宝藏,掀开科技应用新篇章

一、什么是醋酸镨醋酸镨是一种镨的有机盐&#xff0c;镨是稀土金属元素之一。作为一种重要的稀土化合物&#xff0c;醋酸镨通常以水合物的形式存在&#xff0c;呈现淡黄色或无色结晶。镨元素本身因其独特的物理化学特性&#xff0c;在工业和科技领域有着广泛应用&#xff0c;而…

深入解析JVM内存结构与垃圾回收机制

java是强类型高级语言JVM&#xff08;Java Virtual Machine&#xff0c;Java虚拟机&#xff09;是Java平台的核心组件&#xff0c;它是一个虚拟的计算机&#xff0c;能够执行Java字节码&#xff08;bytecode&#xff09;。1、区域划分JVM对Java内存的管理也是分区分块进行&…

Java 流程控制详解:从顺序执行到跳转语句,掌握程序逻辑设计

作为一名Java开发工程师&#xff0c;你一定知道&#xff0c;流程控制&#xff08;Flow Control&#xff09; 是编写任何程序的核心。它决定了代码的执行路径、分支走向和循环次数。本文将带你系统梳理 Java中的所有常用流程控制结构&#xff0c;包括&#xff1a;顺序结构分支结…

面试150 环形链表

思路 采用双指针法,slow指针每次走一步,fast指针每次走两步&#xff0c;如果相遇的情况下&#xff0c;slow指针回到开始的位置,此时快慢指针各走一步&#xff0c;当相遇的时候也就是说明链表中有环。 # Definition for singly-linked list. # class ListNode: # def __init…

AI技术正在深度重构全球产业格局,其影响已超越工具属性,演变为推动行业变革的核心引擎。

一、AI如何重塑AI的工作与行业&#xff08;AI助手领域&#xff09;能力升级理解与生成&#xff1a;基于LLM&#xff08;大语言模型&#xff09;&#xff0c;AI能处理开放式问题、撰写报告、翻译代码&#xff0c;替代部分人类知识工作。个性化交互&#xff1a;通过用户历史对话分…

Kafka的无消息丢失配置怎么实现

那 Kafka 到底在什么情况下才能保证消息不丢失呢&#xff1f; Kafka 只对“已提交”的消息&#xff08;committed message&#xff09;做有限度的持久化保证。 第一个核心要素是“已提交的消息”。什么是已提交的消息&#xff1f;当 Kafka 的若干个 Broker 成 功地接收到一条…

集成CommitLInt+ESLint+Prettier+StyleLint+LintStaged

代码可读性低代码 代码规范落地难代码格式难统一代码质量低下 配置 ESLint ESLint 是一个用来识别 ECMAScript 并且按照规则给出报告的代码检测工具&#xff0c;使用它可以避免低级错误和统一代码的风格。它拥有以下功能&#xff1a; 查出 JavaScript 代码语法问题。根据配置…

寻找两个正序数组的中位数(C++)

给定两个大小分别为 m 和 n 的正序&#xff08;从小到大&#xff09;数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。算法的时间复杂度应该为 O(log (mn)) 。示例 1&#xff1a;输入&#xff1a;nums1 [1,3], nums2 [2] 输出&#xff1a;2.00000 解释&#x…