1、sse、stdio、streamable-http使用
参考:https://gofastmcp.com/deployment/running-server#the-run-method
stdio本地使用;sse、streamable-http远程调用(
Streamable HTTP—New in version: 2.3.0)
调用:
stdio、sse
streamable-http
2、 MCPConfig多服务器使用案例
参考:
https://gofastmcp.com/clients/client
代码
config如果只有一个服务,那call_tool函数不用前缀直接工具函数名,如果多个服务,需要添加前缀
from fastmcp import Client# Standard MCP configuration with multiple servers
config = {"mcpServers": {"trip": {"url": "http://localhost:8000/sse"},"math":{"command": "python","args": ["./api_mcp_server_math.py"],"env": {"DEBUG": "true"}}}
}async def main():async with Client(config) as client:tools = await client.list_tools()print("Available tools:")for tool in tools: ###打印结果系统也自动加了前缀print(f" - {tool.name}: {tool.description}")hotel_response = await client.call_tool("trip_check_hotel", {"cityname": "广州","date": "后天"})math_response = await client.call_tool("math_add", {"a": 1, "b": 2})print("Math response:", math_response)print("Hotel response:", hotel_response)if __name__ == "__main__":import asyncioasyncio.run(main())
for tool in tools: ###打印结果系统也自动加了前缀
print(f" - {tool.name}: {tool.description}")
大模型自动调用工具执行
模型自动调用工具,LLM 根据用户输入自动选择合适的工具;使用百炼qwen3思考模型流式输出
#!/usr/bin/python3
# -*- coding: utf-8 -*-import json
import asyncio
from typing import List, Dict, Optionalfrom openai import OpenAI
from fastmcp import Clientclass IntegratedMCPClient:def __init__(self, config: Dict, model="qwen3-235b-a22b"):"""初始化集成的 MCP 客户端Args:config: MCP 服务器配置字典model: 使用的模型名称"""self.config = configself.model = model# 初始化 OpenAI 客户端self.client = OpenAI(api_key="sk-424971",base_url="https://dashscope.ale-mode/v1")# 初始化 MCP 客户端self.session = Client(config)self.tools = []self.server_tool_mapping = {} # 记录工具来自哪个服务器async def prepare_tools(self):"""准备所有可用的工具"""try:# 获取所有工具tools = await self.session.list_tools()print(f"发现 {len(tools)} 个可用工具:")self.tools = []self.server_tool_mapping = {}for tool in tools:print(f" - {tool.name}: {tool.description}")# 构建工具描述tool_def = {"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema,}}self.tools.append(tool_def)# 记录工具映射(如果需要区分来源)# 这里可以通过工具名前缀或其他方式来识别来源服务器if tool.name.startswith("trip_"):self.server_tool_mapping[tool.name] = "trip"elif tool.name.startswith("math_"):self.server_tool_mapping[tool.name] = "math"else:self.server_tool_mapping[tool.name] = "unknown"except Exception as e:print(f"准备工具时出错: {e}")self.tools = []async def chat(self, messages: List[Dict]) -> Optional[object]:"""处理聊天对话,支持工具调用"""if not self.tools:await self.prepare_tools()try:# 使用流式响应,启用思考过程response = self.client.chat.completions.create(model=self.model,messages=messages,tools=self.tools if self.tools else None,temperature=0,max_tokens=16000,stream=True,logprobs=False,stream_options={"include_usage": True},extra_body={"enable_thinking": True # 启用思考过程})# 收集流式响应collected_content = ""collected_tool_calls = []finish_reason = Noneprint("AI: ", end="", flush=True)for chunk in response:# 安全检查:防止空的 choicesif not chunk.choices:continuedelta = chunk.choices[0].deltafinish_reason = chunk.choices[0].finish_reason# 打印思考过程(Qwen 特有字段)reasoning = getattr(delta, "reasoning_content", None)if reasoning:print(f"{reasoning}", end="", flush=True)# 打印常规内容if delta.content:print(delta.content, end="", flush=True)collected_content += delta.content# 收集工具调用if hasattr(delta, 'tool_calls') and delta.tool_calls:if not collected_tool_calls:collected_tool_calls = [{"id": "", "function": {"name": "", "arguments": ""}} for _ in delta.tool_calls]for i, tool_call_delta in enumerate(delta.tool_calls):if tool_call_delta.id:collected_tool_calls[i]["id"] = tool_call_delta.idif tool_call_delta.function:if tool_call_delta.function.name:collected_tool_calls[i]["function"]["name"] = tool_call_delta.function.nameif tool_call_delta.function.arguments:collected_tool_calls[i]["function"]["arguments"] += tool_call_delta.function.argumentsprint() # 换行# 如果没有工具调用,返回收集的内容if finish_reason != 'tool_calls' or not collected_tool_calls:class SimpleMessage:def __init__(self, content):self.content = contentreturn SimpleMessage(collected_content)# 处理工具调用print("\n[正在调用工具...]")assistant_message = {'role': 'assistant','content': collected_content,'tool_calls': [{"id": tc["id"],"type": "function","function": {"name": tc["function"]["name"],"arguments": tc["function"]["arguments"]}}for tc in collected_tool_calls]}print(f"[助手消息]: 调用了 {len(collected_tool_calls)} 个工具")messages.append(assistant_message)# 执行每个工具调用for tool_call in collected_tool_calls:try:tool_name = tool_call['function']['name']tool_args = json.loads(tool_call['function']['arguments'])server_name = self.server_tool_mapping.get(tool_name, "unknown")print(f"\n[执行工具]: {tool_name} (来自服务器: {server_name})")print(f"[工具参数]: {tool_args}")# 调用工具result = await self.session.call_tool(tool_name, tool_args)# 处理结果result_content = ""if result:if isinstance(result, list) and len(result) > 0:if hasattr(result[0], 'text'):result_content = result[0].textelse:result_content = str(result[0])else:result_content = str(result)else:result_content = "No result"# 添加工具结果到消息中messages.append({'role': 'tool','tool_call_id': tool_call['id'],'content': result_content})print(f"[工具结果]: {result_content}")except Exception as e:print(f"[工具调用错误]: {e}")messages.append({'role': 'tool','tool_call_id': tool_call['id'],'content': f"Error executing tool: {str(e)}"})# 使用工具结果获取最终响应print("\n[生成最终回答...]")return await self.chat(messages)except Exception as e:print(f"聊天过程中出错: {e}")class ErrorMessage:def __init__(self, content):self.content = contentreturn ErrorMessage(f"抱歉,处理您的请求时出现错误: {str(e)}")async def loop(self):"""交互式聊天循环"""print("=== 集成 MCP 客户端启动 ===")print("支持的命令:")print(" - quit/exit/bye: 退出程序")print(" - tools: 显示可用工具")print(" - clear: 清除对话历史")print("=====================================\n")conversation_history = []while True:try:async with self.session:# 如果是第一次进入,准备工具if not self.tools:await self.prepare_tools()question = input("\nUser: ").strip()# 处理特殊命令if question.lower() in ['quit', 'exit', 'bye']:print("再见!")breakelif question.lower() == 'tools':print(f"\n可用工具 ({len(self.tools)} 个):")for tool in self.tools:func = tool['function']server = self.server_tool_mapping.get(func['name'], 'unknown')print(f" - {func['name']} (服务器: {server})")print(f" 描述: {func['description']}")continueelif question.lower() == 'clear':conversation_history = []print("对话历史已清除")continueelif not question:continue# 添加用户消息到历史user_message = {"role": "user", "content": question}current_messages = conversation_history + [user_message]# 获取响应response = await self.chat(current_messages.copy())if response and hasattr(response, 'content'):print(f"\n回答: {response.content}")# 更新对话历史conversation_history.append(user_message)conversation_history.append({"role": "assistant", "content": response.content})# 限制历史长度,避免上下文过长if len(conversation_history) > 20: # 保留最近10轮对话conversation_history = conversation_history[-20:]except KeyboardInterrupt:print("\n\n程序被用户中断")breakexcept Exception as e:print(f"循环过程中出错: {e}")continueasync def single_query(self, query: str) -> str:"""单次查询接口,适用于非交互式使用"""try:async with self.session:if not self.tools:await self.prepare_tools()messages = [{"role": "user", "content": query}]response = await self.chat(messages)return response.content if response and hasattr(response, 'content') else "无响应"except Exception as e:return f"查询失败: {str(e)}"async def main():"""主函数"""# MCP 配置config = {"mcpServers": {"trip": {"url": "http://localhost:8000/sse"},"math": {"command": "python","args": ["./api_mcp_server_math.py"],"env": {"DEBUG": "true"}}}}print("正在初始化集成 MCP 客户端...")mcp_client = IntegratedMCPClient(config)# 启动交互式循环await mcp_client.loop()async def test_single_queries():"""测试单次查询的示例"""config = {"mcpServers": {"trip": {"url": "http://localhost:8000/sse"},"math": {"command": "python", "args": ["./api_mcp_server_math.py"],"env": {"DEBUG": "true"}}}}mcp_client = IntegratedMCPClient(config)# 测试查询test_queries = ["帮我查询广州后天的酒店信息","计算 15 加 27 等于多少","我想知道北京明天有什么好的酒店推荐","计算 100 乘以 50","你好,请介绍一下你的功能"]for query in test_queries:print(f"\n{'='*50}")print(f"测试查询: {query}")print(f"{'='*50}")result = await mcp_client.single_query(query)print(f"结果: {result}")if __name__ == '__main__':# 运行交互式模式asyncio.run(main())# 或者运行测试模式# asyncio.run(test_single_queries())