前言

官方示例的代码中,mcp一般是配置到yml中或者json文件中,使用自动装配的方式注入服务,这种方式不方便在程序启动后添加新的服务,这里参考cherry studio的方式动态添加mcp服务

1.确定方案

  • mcp服务的维护放到mysql业务数据库维护,前端通过json来添加mcp服务,为什么是json?因为开源的mcp服务都提供json示例,拿来即用

  • 后端轮询mcp服务确保服务可用状态

  • 前端动态切换模型时,根据模型是否支持工具调和是否启用mcp来控制是否使用工具

  • 一个mcp服务下可能有一系列的工具,提供一个查看mcp服务工具的页面

2. pom依赖

 <!-- Spring AI MCP 核心包 (包含ToolCallbackProvider) --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-mcp-client</artifactId></dependency><!-- Spring AI Model (ToolCallback接口等) --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-model</artifactId><version>${spring-ai.version}</version></dependency>

其他的依赖见前面几篇文章

3. 新增mcp表

CREATE TABLE `ai_mcp` (`id` bigint NOT NULL AUTO_INCREMENT,`content_type` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci NOT NULL COMMENT '连接类型,sse,stdio',`name` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci NOT NULL COMMENT 'mcp名称',`description` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_german2_ci DEFAULT NULL COMMENT '功能描述',`config_json` json NOT NULL COMMENT '配置参数,json类型',`status` tinyint(1) DEFAULT NULL COMMENT '可用状态,1可用,0不可用',`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_german2_ci;

4.补充增删改查

使用的mybatis-plus,省略mapper层


import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.lanyu.springainovel.entity.AiMcp;
import org.lanyu.springainovel.entity.ServerConfig;
import org.lanyu.springainovel.mapper.AiMcpMapper;
import org.lanyu.springainovel.util.ConversionUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.sql.Timestamp;
import java.util.List;@Service
public class AiMcpService {public AiMcpService() {super();}@Autowiredprivate AiMcpMapper aiMcpMapper;public List<AiMcp> getAllEnabledMcp() {QueryWrapper<AiMcp> query = new QueryWrapper<>();query.eq("status", 1);return aiMcpMapper.selectList(query);}public void addMcp(AiMcp mcp, DynamicMcpClientManager dynamicMcpClientManager) {mcp.setUpdateTime(new Timestamp(System.currentTimeMillis()));// 从configJson中提取mcpServers作为mcp名称和类型try {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject() && serversNode.size() > 0) {// 获取第一个服务器名称作为mcp名称String serverName = serversNode.fieldNames().next();mcp.setName(serverName);// 获取服务器类型JsonNode serverNode = serversNode.path(serverName);String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {mcp.setContentType(type);}}} catch (Exception e) {// 日志输出}aiMcpMapper.insert(mcp);if (mcp.getStatus() != null && mcp.getStatus() == 1) {// 解析 configJson,提取所有 servertry {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject()) {serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();JsonNode serverNode = entry.getValue();String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {ServerConfig config = ConversionUtil.parseServerConfigFromJson(serverName, serverNode, type);if (config != null) {dynamicMcpClientManager.addOrUpdateServerConfig(serverName, config);}}});}} catch (Exception e) {// 日志输出}}}public void updateMcp(AiMcp mcp) {mcp.setUpdateTime(new Timestamp(System.currentTimeMillis()));// 从configJson中提取mcpServers作为mcp名称try {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject() && serversNode.size() > 0) {// 获取第一个服务器名称作为mcp名称String serverName = serversNode.fieldNames().next();mcp.setName(serverName);// 获取服务器类型JsonNode serverNode = serversNode.path(serverName);String type = serverNode.path("type").asText("");if ("sse".equalsIgnoreCase(type) || "stdio".equalsIgnoreCase(type)) {mcp.setContentType(type);}}} catch (Exception e) {// 日志输出}aiMcpMapper.updateById(mcp);}public void deleteMcp(Long id, DynamicMcpClientManager dynamicMcpClientManager) {AiMcp mcp = aiMcpMapper.selectById(id);if (mcp != null) {// 解析 configJson,提取所有 servertry {ObjectMapper objectMapper = new ObjectMapper();JsonNode root = objectMapper.readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isObject()) {serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();dynamicMcpClientManager.removeServerConfig(serverName);});}} catch (Exception e) {// 日志输出}aiMcpMapper.deleteById(id);}}public AiMcp getById(Long id) {return aiMcpMapper.selectById(id);}public AiMcp getByName(String name) {QueryWrapper<AiMcp> query = new QueryWrapper<>();query.eq("name", name);return aiMcpMapper.selectOne(query);}public Page<AiMcp> pageQuery(String name, int page, int size) {QueryWrapper<AiMcp> query = new QueryWrapper<>();if (name != null && !name.isEmpty()) {query.like("name", name);}query.orderByDesc("update_time");return aiMcpMapper.selectPage(new Page<>(page, size), query);}
}

5.创建mcp管理类(核心)

/*** 动态MCP客户端管理器,支持热加载和配置变更。** <p>此服务提供以下功能:* <ul>* <li>动态读取MCP服务器配置</li>* <li>自动连接和重连MCP服务器</li>* <li>实时发现新工具</li>* <li>配置变更时自动更新连接</li>* <li>健康检查和故障恢复</li>* </ul>** @author Spring AI Team* @since 1.1.0*/
@Service
public class DynamicMcpClientManager implements DisposableBean {private static final Logger logger = LoggerFactory.getLogger(DynamicMcpClientManager.class);/*** 活跃的MCP客户端,key为服务器名,value为McpSyncClient实例*/private final Map<String, McpSyncClient> activeClients = new ConcurrentHashMap<>();/*** 当前连接状态的配置,key为服务器名,value为ServerConfig*/private final Map<String, ServerConfig> currentConfigs = new ConcurrentHashMap<>();/*** 定时任务调度器*/private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);/*** AiMcpService用于数据库操作*/private final AiMcpService aiMcpService;/*** 缓存的工具回调列表,定期刷新*/private volatile List<SyncMcpToolCallback> cachedToolCallbacks = new ArrayList<>();/*** 上次工具回调刷新的时间戳*/private volatile long lastToolRefresh = 0;/*** 工具回调缓存的有效期(毫秒)*/private static final long TOOL_CACHE_TTL = 30000; // 30秒缓存/*** 最大重连次数,所有重连逻辑统一引用该常量*/private static final int MAX_RECONNECT_ATTEMPTS = 3;public DynamicMcpClientManager(AiMcpService aiMcpService) {this.aiMcpService = aiMcpService;}@PostConstructpublic void init() {loadSpringAiMcpConfiguration();
//        scheduler.scheduleWithFixedDelay(this::loadSpringAiMcpConfiguration, 30, 30, TimeUnit.SECONDS);scheduler.scheduleWithFixedDelay(this::performHealthCheck, 30, 30, TimeUnit.SECONDS);logger.info("动态MCP客户端管理器已启动");}/*** 从数据库加载MCP配置,只有配置实际变更时才调用updateServerConfigs*/public void loadSpringAiMcpConfiguration() {logger.info("从数据库加载MCP配置");Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>();try {for (AiMcp mcp : aiMcpService.getAllEnabledMcp()) {if (mcp.getConfigJson() == null || mcp.getConfigJson().isEmpty()) {logger.warn("MCP配置[{}]缺少configJson,跳过", mcp.getName());continue;}try {JsonNode root = new ObjectMapper().readTree(mcp.getConfigJson());JsonNode serversNode = root.path("mcpServers");if (serversNode.isMissingNode() || !serversNode.isObject()) {logger.warn("MCP配置[{}]的mcpServers字段缺失或格式错误,跳过", mcp.getName());continue;}serversNode.fields().forEachRemaining(entry -> {String serverName = entry.getKey();JsonNode serverNode = entry.getValue();String type = serverNode.path("type").asText("");ServerConfig config = ConversionUtil.parseServerConfigFromJson(serverName, serverNode, type);if (config != null) {newConfigs.put(serverName, config);}});} catch (Exception e) {logger.warn("MCP配置[{}]解析configJson失败: {},跳过", mcp.getName(), e.getMessage());}}} catch (Exception e) {logger.error("从数据库加载MCP配置失败", e);}// 只有配置实际变更时才更新if (!newConfigs.equals(currentConfigs)) {updateServerConfigs(newConfigs);} else {logger.debug("MCP配置无变更,无需更新");}}/*** 更新服务器配置并重新连接。重连次数由MAX_RECONNECT_ATTEMPTS控制。*/public synchronized void updateServerConfigs(Map<String, ServerConfig> newConfigs) {logger.info("更新MCP服务器配置,共 {} 个服务器", newConfigs.size());// 移除不再存在的服务器currentConfigs.keySet().removeIf(serverName -> {if (!newConfigs.containsKey(serverName)) {disconnectServer(serverName);return true;}return false;});// 添加或更新服务器配置for (Map.Entry<String, ServerConfig> entry : newConfigs.entrySet()) {String serverName = entry.getKey();ServerConfig newConfig = entry.getValue();ServerConfig oldConfig = currentConfigs.get(serverName);// 如果配置发生变化或新加,尝试连接if (oldConfig == null || !configEquals(oldConfig, newConfig)) {currentConfigs.put(serverName, newConfig);if (newConfig.isEnabled()) {boolean connected = false;int attempts = 0;while (!connected && attempts < MAX_RECONNECT_ATTEMPTS) {attempts++;logger.info("[重连策略] 连接/重连MCP服务器[{}],第{}次尝试...", serverName, attempts);connectServer(serverName, newConfig);if (activeClients.containsKey(serverName)) {logger.info("[重连策略] 连接/重连MCP服务器[{}]成功", serverName);connected = true;} else {logger.warn("[重连策略] 连接/重连MCP服务器[{}]失败,等待2分钟后重试...", serverName);try {Thread.sleep(120_000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}if (!connected) {logger.error("[重连策略] MCP服务器[{}]重连已达最大次数({}),将自动禁用该服务", serverName, MAX_RECONNECT_ATTEMPTS);disableMcpInDatabase(serverName);}} else {disconnectServer(serverName);}}}// 清除工具缓存,强制重新加载invalidateToolCache();}/*** 连接到MCP服务器(重连和首次连接参数完全一致,重连前彻底释放资源)*/private void connectServer(String serverName, ServerConfig config) {logger.info("连接到MCP服务器: {} ({})", serverName, config.getUrl());// 1. 彻底释放旧资源disconnectServer(serverName);try {// 2. 构建全新 McpSyncClient,参数与首次一致io.modelcontextprotocol.client.McpSyncClient client;if (config.getUrl().startsWith("stdio://")) {logger.info("使用STDIO传输连接服务器: {}", serverName);// TODO: STDIO传输实现logger.warn("STDIO传输暂未实现,服务器: {}", serverName);return;} else {logger.info("使用SSE传输连接服务器: {} -> {} (endpoint: {})", serverName, config.getUrl(), config.getSseEndpoint());HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(config.getUrl()).clientBuilder(HttpClient.newBuilder()).sseEndpoint(config.getSseEndpoint()).build();client = McpClient.sync(transport).requestTimeout(config.getTimeout()).build();}// 3. 初始化客户端client.initialize();activeClients.put(serverName, client);logger.info("成功连接到MCP服务器: {}", serverName);} catch (Exception e) {logger.error("连接MCP服务器失败: {} - {} (url: {}, sseEndpoint: {})", serverName, e.getMessage(), config.getUrl(), config.getSseEndpoint(), e);}}/*** 彻底断开MCP服务器连接,释放所有资源*/private void disconnectServer(String serverName) {io.modelcontextprotocol.client.McpSyncClient client = activeClients.remove(serverName);if (client != null) {try {client.close();logger.info("已断开MCP服务器连接: {}", serverName);} catch (Exception e) {logger.warn("断开MCP服务器连接时出错: {} - {}", serverName, e.getMessage());}}// 清理相关缓存// cachedToolCallbacks = new ArrayList<>(); // 若有必要可清理}/*** 获取所有活跃的客户端*/public List<McpSyncClient> getActiveClients() {return new ArrayList<>(activeClients.values());}/*** 获取所有可用的工具回调(带缓存)*/public List<ToolCallback> getAvailableToolCallbacks() {long now = System.currentTimeMillis();if (now - lastToolRefresh > TOOL_CACHE_TTL || cachedToolCallbacks.isEmpty()) {refreshToolCallbacks();lastToolRefresh = now;}return new ArrayList<>(cachedToolCallbacks);}/*** 强制刷新工具回调,重新从所有活跃客户端获取工具*/public synchronized void refreshToolCallbacks() {logger.info("刷新MCP工具回调");List<McpSyncClient> clients = getActiveClients();if (clients.isEmpty()) {cachedToolCallbacks = new ArrayList<>();return;}try {logger.info("准备调用createToolCallbacks");List<SyncMcpToolCallback> newCallbacks = SimpleMcpToolCallbackProvider.createToolCallbacks(clients);logger.info("createToolCallbacks调用工具结果:{}", newCallbacks.size());cachedToolCallbacks = newCallbacks;logger.info("刷新{}个工具", newCallbacks.size());} catch (Exception e) {logger.error("刷新工具回调失败", e);}}/*** 获取指定Mcp服务的工具列表*/public List<SyncMcpToolCallback> getToolByMcpName(String mcpName) {McpSyncClient client = activeClients.get(mcpName);if (client == null) {return new ArrayList<>();}try {return SimpleMcpToolCallbackProvider.getToolCallbacksByClientName(client);} catch (Exception e) {return new ArrayList<>();}}/*** 清除工具缓存*/public void invalidateToolCache() {lastToolRefresh = 0;logger.debug("工具缓存已清除");}/*** 执行健康检查*/private void performHealthCheck() {logger.info("执行MCP客户端健康检查");List<String> unhealthyServers = new ArrayList<>();for (Map.Entry<String, McpSyncClient> entry : activeClients.entrySet()) {String serverName = entry.getKey();McpSyncClient client = entry.getValue();try {List<McpSchema.Tool> tools = client.listTools().tools();//logger.info("检查工具:{}", tools.toString());} catch (Exception e) {logger.warn("MCP服务器 {} 健康检查失败: {},将尝试重连", serverName, e.getMessage());unhealthyServers.add(serverName);}}// 只负责发现断开,重连交给 reconnectWithLimitfor (String serverName : unhealthyServers) {ServerConfig config = currentConfigs.get(serverName);if (config != null && config.isEnabled()) {logger.info("MCP服务器 {} 已断开,将尝试重连", serverName);reconnectWithLimit(serverName, config);}}}/*** 比较两个配置是否相等*/private boolean configEquals(ServerConfig config1, ServerConfig config2) {if (config1 == config2) return true;if (config1 == null || config2 == null) return false;return java.util.Objects.equals(config1.getUrl(), config2.getUrl()) &&java.util.Objects.equals(config1.getSseEndpoint(), config2.getSseEndpoint()) &&java.util.Objects.equals(config1.getHeaders(), config2.getHeaders()) &&java.util.Objects.equals(config1.getTimeout(), config2.getTimeout()) &&config1.isEnabled() == config2.isEnabled();}private void reconnectWithLimit(String serverName, ServerConfig config) {int attempts = 0;boolean connected = false;while (attempts < MAX_RECONNECT_ATTEMPTS && !connected) {attempts++;logger.warn("重连MCP服务器[{}],第{}次尝试...,最大{}次", serverName, attempts, MAX_RECONNECT_ATTEMPTS);try {connectServer(serverName, config);if (activeClients.containsKey(serverName)) {logger.info("重连MCP服务器[{}]成功", serverName);connected = true;}} catch (Exception e) {logger.error("重连MCP服务器[{}]失败: {}", serverName, e.getMessage());}}if (!connected) {logger.error("MCP服务器[{}]重连已达最大次数({}),将自动禁用该服务", serverName, MAX_RECONNECT_ATTEMPTS);disableMcpInDatabase(serverName);}}private void disableMcpInDatabase(String serverName) {try {org.lanyu.springainovel.entity.AiMcp mcp = aiMcpService.getByName(serverName);if (mcp != null && mcp.getStatus() != null && mcp.getStatus() != 0) {mcp.setStatus(0);aiMcpService.updateMcp(mcp);logger.warn("已将MCP服务[{}]在数据库中禁用(status=0)", serverName);}// 同步禁用缓存ServerConfigServerConfig config = currentConfigs.get(serverName);if (config != null && config.isEnabled()) {config.setEnabled(false);logger.warn("已将MCP服务[{}]在缓存中禁用(enable=false)", serverName);}// 同步禁用McpConfigurationService的lastKnownConfigstry {// 反射获取AiMcpService中的McpConfigurationServicejava.lang.reflect.Field field = aiMcpService.getClass().getDeclaredField("mcpConfigurationService");field.setAccessible(true);Object mcpConfigServiceObj = field.get(aiMcpService);if (mcpConfigServiceObj != null) {java.lang.reflect.Field lastKnownConfigsField = mcpConfigServiceObj.getClass().getDeclaredField("lastKnownConfigs");lastKnownConfigsField.setAccessible(true);@SuppressWarnings("unchecked")Map<String, ServerConfig> lastKnownConfigs = (Map<String, ServerConfig>) lastKnownConfigsField.get(mcpConfigServiceObj);ServerConfig lastConfig = lastKnownConfigs.get(serverName);if (lastConfig != null && lastConfig.isEnabled()) {lastConfig.setEnabled(false);logger.warn("已将MCP服务[{}]在lastKnownConfigs中禁用(enable=false)", serverName);}}} catch (Exception e) {logger.error("同步禁用lastKnownConfigs缓存失败: {}", e.getMessage());}} catch (Exception e) {logger.error("禁用MCP服务[{}]数据库操作失败: {}", serverName, e.getMessage());}}/*** 新增或更新单个MCP服务器配置*/public synchronized void addOrUpdateServerConfig(String serverName, ServerConfig config) {Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>(currentConfigs);newConfigs.put(serverName, config);updateServerConfigs(newConfigs);}/*** 移除单个MCP服务器配置*/public synchronized void removeServerConfig(String serverName) {Map<String, ServerConfig> newConfigs = new ConcurrentHashMap<>(currentConfigs);newConfigs.remove(serverName);updateServerConfigs(newConfigs);}@Overridepublic void destroy() throws Exception {logger.info("关闭动态MCP客户端管理器");// 关闭调度器scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}// 关闭所有客户端连接for (String serverName : new ArrayList<>(activeClients.keySet())) {disconnectServer(serverName);}}
}
/*** 简化的MCP工具回调提供者,用于演示目的。** <p>此类提供了从MCP客户端创建工具回调的基本功能。** @author Spring AI Team* @since 1.1.0*/
public class SimpleMcpToolCallbackProvider {private static final Logger logger = LoggerFactory.getLogger(SimpleMcpToolCallbackProvider.class);private static final ObjectMapper objectMapper = new ObjectMapper();/*** 从MCP客户端列表创建工具回调列表*/public static List<SyncMcpToolCallback> createToolCallbacks(List<McpSyncClient> clients) {List<SyncMcpToolCallback> callbacks = new ArrayList<>();for (McpSyncClient client : clients) {try {// 获取工具列表McpSchema.ListToolsResult toolsResult = client.listTools();if (toolsResult != null && toolsResult.tools() != null) {for (McpSchema.Tool tool : toolsResult.tools()) {callbacks.add(new SyncMcpToolCallback(client, tool));}}} catch (Exception e) {logger.error("获取客户端工具失败", e);}}return callbacks;}/*** 从MCP客户端列表创建工具回调列表*/public static List<SyncMcpToolCallback> getToolCallbacksByClientName(McpSyncClient client) {List<SyncMcpToolCallback> callbacks = new ArrayList<>();try {// 获取工具列表McpSchema.ListToolsResult toolsResult = client.listTools();if (toolsResult != null && toolsResult.tools() != null) {for (McpSchema.Tool tool : toolsResult.tools()) {callbacks.add(new SyncMcpToolCallback(client, tool));}}} catch (Exception e) {logger.error("获取客户端工具失败", e);}return callbacks;}/*** 简化的MCP工具回调实现*/public static class SimpleMcpToolCallback implements ToolCallback {private final McpSyncClient client;private final McpSchema.Tool tool;private final ToolDefinition toolDefinition;public SimpleMcpToolCallback(McpSyncClient client, McpSchema.Tool tool) {this.client = client;this.tool = tool;this.toolDefinition = ToolDefinition.builder().name(tool.name()).description(tool.description()).inputSchema(tool.inputSchema() != null ? tool.inputSchema().toString() : "{}").build();}@Overridepublic ToolDefinition getToolDefinition() {return toolDefinition;}@Overridepublic String call(String arguments) {try {// 解析参数@SuppressWarnings("unchecked")Map<String, Object> args = objectMapper.readValue(arguments, Map.class);// 调用MCP工具McpSchema.CallToolRequest request = new McpSchema.CallToolRequest(tool.name(), args);McpSchema.CallToolResult result = client.callTool(request);if (result != null && result.content() != null && !result.content().isEmpty()) {// 提取文本内容StringBuilder response = new StringBuilder();for (Object content : result.content()) {if (content instanceof McpSchema.TextContent) {McpSchema.TextContent textContent = (McpSchema.TextContent) content;response.append(textContent.text());} else {response.append(content.toString());}}return response.toString();} else {return "工具执行完成,无返回内容";}} catch (Exception e) {logger.error("调用MCP工具失败: {}", tool.name(), e);return "错误:" + e.getMessage();}}}
}
/*** 服务器配置类*/
public class ServerConfig {public ServerConfig() { super(); }private String name;private String url;private String sseEndpoint; // SSE端点,默认为/sseprivate Map<String, String> headers;private Duration timeout = Duration.ofSeconds(30);private boolean enabled = true;// Getters and Setterspublic String getName() { return name; }public void setName(String name) { this.name = name; }public String getUrl() { return url; }public void setUrl(String url) { this.url = url; }public String getSseEndpoint() { return sseEndpoint; }public void setSseEndpoint(String sseEndpoint) { this.sseEndpoint = sseEndpoint; }public Map<String, String> getHeaders() { return headers; }public void setHeaders(Map<String, String> headers) { this.headers = headers; }public Duration getTimeout() { return timeout; }public void setTimeout(Duration timeout) { this.timeout = timeout; }public boolean isEnabled() { return enabled; }public void setEnabled(boolean enabled) { this.enabled = enabled; }/*** 获取完整的SSE URL(基础URL + SSE端点)*/public String getFullSseUrl() {if (url == null) return null;if (url.startsWith("stdio://")) return url;String baseUrl = url.endsWith("/") ? url.substring(0, url.length() - 1) : url;String endpoint = sseEndpoint.startsWith("/") ? sseEndpoint : "/" + sseEndpoint;return baseUrl + endpoint;}
}

6.Mcp的相关接口


/*** MCP工具控制器*/
@Tag(name = "mcp工具", description = "供统一的REST API来调用MCP工具")
@RestController
@RequestMapping("/mcp")
public class McpToolController {private static final Logger logger = LoggerFactory.getLogger(McpToolController.class);/*** 动态MCP客户端管理器,负责动态连接和管理MCP服务器*/@Autowired(required = false)private DynamicMcpClientManager dynamicClientManager;/*** AiMcpService,负责MCP工具的数据库操作*/@Autowiredprivate AiMcpService aiMcpService;public McpToolController() {super(); // 默认无参构造,调用父类构造函数}/*** 获取当前有效的工具回调数组,优先使用动态管理器** @return ToolCallback[] 当前可用的工具回调*/private ToolCallback[] getCurrentToolCallbacks() {List<ToolCallback> dynamicTools = dynamicClientManager.getAvailableToolCallbacks();if (!dynamicTools.isEmpty()) {return dynamicTools.toArray(new ToolCallback[0]);}return new ToolCallback[0];}/*** 获取MCP配置状态** @return 配置状态信息*/@Operation(summary = "获取MCP配置状态", description = "返回当前MCP工具的配置模式、工具数量、可用工具列表等状态信息。")@GetMapping("/status")public Map<String, Object> getStatus() {Map<String, Object> status = new HashMap<>();ToolCallback[] currentCallbacks = getCurrentToolCallbacks();status.put("toolCount", currentCallbacks.length);status.put("hasDynamicManager", dynamicClientManager != null);// 添加工具列表List<Map<String, String>> toolList = new ArrayList<>();for (ToolCallback callback : currentCallbacks) {Map<String, String> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());toolList.add(tool);}status.put("tools", toolList);logger.debug("📊 状态查询 - 工具数: {}", currentCallbacks.length);return status;}/*** 获取可用工具列表** @return 工具列表*/@Operation(summary = "获取可用MCP工具列表", description = "返回当前可用的MCP工具及其描述、输入参数等信息。")@GetMapping("/tools")public Map<String, Object> getTools() {Map<String, Object> result = new HashMap<>();try {ToolCallback[] currentCallbacks = getCurrentToolCallbacks();List<Map<String, Object>> toolList = new ArrayList<>();for (ToolCallback callback : currentCallbacks) {Map<String, Object> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());tool.put("inputSchema", callback.getToolDefinition().inputSchema());toolList.add(tool);}result.put("success", true);result.put("toolCount", currentCallbacks.length);result.put("tools", toolList);logger.debug("🔧 工具列表查询 - 返回 {} 个工具", currentCallbacks.length);} catch (Exception e) {logger.error("❌ 获取工具列表失败", e);result.put("success", false);result.put("message", "获取工具列表失败: " + e.getMessage());result.put("toolCount", 0);result.put("tools", new ArrayList<>());}return result;}/*** 获取可用工具列表** @return 工具列表*/@Operation(summary = "获取指定MCP工具列表", description = "返回当前MCP服务的工具及其描述、输入参数等信息。")@GetMapping("/toolsByName")public Map<String, Object> toolsByName(@RequestParam String mcpName) {Map<String, Object> result = new HashMap<>();List<SyncMcpToolCallback> tools = dynamicClientManager.getToolByMcpName(mcpName);List<Map<String, Object>> toolList = new ArrayList<>();for (ToolCallback callback : tools) {Map<String, Object> tool = new HashMap<>();tool.put("name", callback.getToolDefinition().name());tool.put("description", callback.getToolDefinition().description());tool.put("inputSchema", callback.getToolDefinition().inputSchema());toolList.add(tool);}result.put("success", true);result.put("toolCount", tools.size());result.put("tools", toolList);logger.debug("🔧 工具列表查询 - 返回 {} 个工具", tools.size());return result;}/*** 调用MCP工具** @param toolName  工具名称* @param arguments 工具参数(JSON格式)* @return 调用结果*/@Operation(summary = "调用指定MCP工具", description = "根据工具名称和参数调用MCP工具,返回调用结果。参数为JSON字符串。")@PostMapping("/call/{toolName}")public Map<String, Object> callTool(@PathVariable String toolName, @RequestBody String arguments) {Map<String, Object> result = new HashMap<>();try {ToolCallback[] currentCallbacks = getCurrentToolCallbacks();if (currentCallbacks.length == 0) {result.put("success", false);result.put("message", "当前没有可用的MCP工具");return result;}// 查找指定的工具ToolCallback targetTool = null;for (ToolCallback callback : currentCallbacks) {if (callback.getToolDefinition().name().equals(toolName)) {targetTool = callback;break;}}if (targetTool == null) {result.put("success", false);result.put("message", "工具不存在: " + toolName);result.put("availableTools", Arrays.stream(currentCallbacks).map(cb -> cb.getToolDefinition().name()).toArray());return result;}logger.debug("🔧 调用工具: {}, 参数: {}", toolName, arguments);// 调用工具String toolResult = targetTool.call(arguments);result.put("success", true);result.put("toolName", toolName);result.put("result", toolResult);logger.info("✅ 工具调用成功: {} -> {}", toolName, toolResult);} catch (Exception e) {logger.error("❌ 工具调用失败: {}", toolName, e);result.put("success", false);result.put("message", "调用失败: " + e.getMessage());result.put("toolName", toolName);}return result;}/*** 刷新工具列表(重新从MCP服务器获取)** @return 刷新结果*/@Operation(summary = "刷新MCP工具列表", description = "重新从MCP服务器获取工具列表并刷新缓存。")@PostMapping("/refresh")public Map<String, Object> refreshTools() {Map<String, Object> result = new HashMap<>();try {if (dynamicClientManager != null) {logger.info("🔄 刷新MCP工具列表");dynamicClientManager.refreshToolCallbacks();List<ToolCallback> tools = dynamicClientManager.getAvailableToolCallbacks();result.put("success", true);result.put("message", "工具列表已刷新");result.put("toolCount", tools.size());logger.info("✅ 工具列表刷新完成,发现 {} 个工具", tools.size());} else {result.put("success", false);result.put("message", "动态管理器不可用");}} catch (Exception e) {logger.error("❌ 刷新工具列表失败", e);result.put("success", false);result.put("message", "刷新失败: " + e.getMessage());}return result;}/*** 新增MCP工具*/@Operation(summary = "新增MCP工具", description = "新增一条MCP工具记录")@PostMapping("/entity")public RestVO<String> addMcp(@RequestBody AiMcp mcp) {try {aiMcpService.addMcp(mcp, dynamicClientManager);return RestVO.success("新增成功");} catch (Exception e) {return RestVO.fail("新增失败: " + e.getMessage());}}/*** 修改MCP工具*/@Operation(summary = "修改MCP工具", description = "根据ID修改MCP工具记录")@PutMapping("/entity/{id}")public RestVO<String> updateMcp(@PathVariable Long id, @RequestBody AiMcp mcp) {try {mcp.setId(id);aiMcpService.updateMcp(mcp);return RestVO.success("修改成功");} catch (Exception e) {return RestVO.fail("修改失败: " + e.getMessage());}}/*** 删除MCP工具*/@Operation(summary = "删除MCP工具", description = "根据ID删除MCP工具记录")@DeleteMapping("/entity/{id}")public RestVO<String> deleteMcp(@PathVariable Long id) {try {aiMcpService.deleteMcp(id, dynamicClientManager);return RestVO.success("删除成功");} catch (Exception e) {return RestVO.fail("删除失败: " + e.getMessage());}}/*** 分页查询MCP工具,支持名称模糊搜索,按添加日期倒序*/@Operation(summary = "分页查询MCP工具", description = "分页查询MCP工具,支持名称模糊搜索,按添加日期倒序排列")@GetMapping("/entity/page")public RestVO<Map<String, Object>> pageMcp(@RequestParam(defaultValue = "") String name,@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {Page<AiMcp> resultPage = aiMcpService.pageQuery(name, page, size);Map<String, Object> result = new HashMap<>();result.put("total", resultPage.getTotal());result.put("pages", resultPage.getPages());result.put("current", resultPage.getCurrent());result.put("size", resultPage.getSize());result.put("records", resultPage.getRecords());return RestVO.success(result);} catch (Exception e) {return RestVO.fail("分页查询失败: " + e.getMessage());}}
}

7.测试mcp工具

添加一个高德地图的mcp ,key从高德开放平台获取

{"mcpServers": {"amap-amap-sse": {"url": "https://mcp.amap.com","type": "sse","sseEndpoint": "/sse?key=xxxxxxxxxxxxxxxxxx"}}
}

查询可用工具
在这里插入图片描述

8.ChatClient接入toolCallbacks

private Flux<FluxVO> getFluxVOFlux(List<Message> messageList, AiModel myModel, QuestionVO body) {Prompt prompt = new Prompt(messageList);AtomicBoolean inThinking = new AtomicBoolean(false);StringBuffer outputText = body.getMemory() ? new StringBuffer() : null;ChatClient chatModel = myModel.getChatClient();// 1. 先构造 Publisher<ChatResponse>Flux<ChatResponse> publisher;//判断是否需要启用mcp工具if (body.getUseTools()) {List<ToolCallback> toolCallbacks = dynamicMcpClientManager.getAvailableToolCallbacks();publisher = chatModel.prompt(prompt).toolCallbacks(toolCallbacks).stream().chatResponse();} else {publisher = chatModel.prompt(prompt).stream().chatResponse();}// 主动推送一条“处理中”消息Flux<FluxVO> proactiveMsg = Flux.just(FluxVO.builder().text("").status("before").build());Flux<FluxVO> resp = Flux.from(publisher).doFirst(() -> {System.out.println("-------------开始输出");if (body.getMemory()) {chatMemoryService.saveMessage(body);}}).doFinally(signalType -> {System.out.println("-------------流式处理结束");if (body.getMemory() && outputText != null) {chatMemoryService.saveMessage(body.getSessionId(), "ASSISTANT", outputText.toString(), body.getModel());}});return Flux.concat(proactiveMsg, resp);

getFluxVOFlux 之前的代码可以参考同系列前几章节

9.测试最终效果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/91748.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/91748.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/91748.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【PDF + ZIP 合并器:把ZIP文件打包至PDF文件中】

B站链接 PDF ZIP 合并器&#xff1a;把ZIP文件打包至PDF文件中_哔哩哔哩_bilibiliz 加强作者的工具 https://wwgw.lanzn.com/i8h1C32k9bef 密码:30cv 新增c框架&#xff0c;加快运行速度

阿里云部署微调chatglm3

git Ifs install Git lfs 主要用于管理大型文件。在传统的Git仓库中&#xff0c;所有文件内容都会被完整记录在每一次提交中&#xff0c;这会导致仓库体积增大&#xff0c;克隆、拉取和推送操作变慢&#xff0c;甚至可能超出存储限额。Git LFS通过将大文件替换成文本指针&#…

Linux网络编程 ---五种IO模型

五种IO模型一、IO慢的原因二、五种IO模型三、如何设置非阻塞式IO&#xff1f;一、IO慢的原因 二、五种IO模型 阻塞式IO 非阻塞式IO 信号驱动IO 多路转接 异步IO 三、如何设置非阻塞式IO&#xff1f; &#xff08;一&#xff09;用法说明 &#xff08;二&#xff0…

Obsidian结合CI/CD实现自动发布

CI/CDQuickAddJS脚本bat脚本sh脚本实现自动发版Hugo文章 需求来源 每次手动执行Hugo的命令&#xff0c;手动把public文件夹上传到自己的服务器可以完成发版需求。 但是&#xff0c;作为一个内容创作者&#xff0c;我更希望的关注于自己的内容&#xff0c;而不是关注整个发版…

[硬件电路-141]:模拟电路 - 源电路,信号源与电源,能自己产生确定性波形的电路。

源电路&#xff08;Source Circuit&#xff09;是电子系统中为其他电路或负载提供特定信号或能量的基础电路模块&#xff0c;其核心功能是生成、调节或转换所需的物理量&#xff08;如电压、电流、波形、频率等&#xff09;。以下是源电路的详细解析&#xff1a;一、源电路的核…

Unity_数据持久化_PlayerPrefs基础

Unity数据持久化 一、数据持久化基础概念 1.1 什么是数据持久化 定义&#xff1a; 数据持久化就是将内存中的数据模型转换为存储模型&#xff0c;以及将存储模型转换为内存中的数据模型的统称。 通俗解释&#xff1a; 将游戏数据存储到硬盘&#xff0c;硬盘中数据读取到游戏中&…

什么是列存储(Columnar Storage)?深度解析其原理与应用场景

列存储的基本概念&#xff1a;颠覆传统的数据组织方式列存储&#xff08;Column Storage&#xff09;是一种革命性的数据库存储技术&#xff0c;它通过按列而非按行组织数据&#xff0c;从根本上改变了数据的物理存储结构。与传统行存储数据库不同&#xff0c;列式数据库将每一…

机器人抓取流程介绍与实现——机器人抓取系统基础系列(七)

机器人抓取系统基础系列文章目录 1. UR机械臂的ROS驱动安装官方教程详解——机器人抓取系统基础系列&#xff08;一&#xff09; 2. MoveIt控制机械臂的运动实现——机器人抓取系统基础系列&#xff08;二&#xff09; 3. 机器人&#xff08;机械臂&#xff09;的相机选型与安装…

【Qt】QObject::startTimer: Timers cannot be started from another thread

QTimer对象的 start 函数调用必须和创建QTimer对象是同一个线程。 #include "QtTimerTest.h" #include <QDebug>QtTimerTest::QtTimerTest(QWidget *parent): QMainWindow(parent),m_timer(nullptr),m_timerThread(nullptr), m_workingThread(nullptr) {ui.set…

社会治安满意度调查:为城市安全治理提供精准参考(满意度调查公司)

在社会治理不断深化的背景下&#xff0c;公众对社会治安的感知与评价已成为衡量城市治理水平的重要维度&#xff08;社会治安满意度调查&#xff09;&#xff08;公众满意度调查&#xff09;&#xff08;满意度调查&#xff09;。为全面掌握市民对治安状况的真实反馈&#xff0…

Python篇--- Python 的加载、缓存、覆盖机制

要理解 import 与 if __name__ "__main__": 的关系&#xff0c;以及 Python 的加载、缓存、覆盖机制&#xff0c;我们可以从 “模块的两种身份” 和 “导入的全过程” 入手&#xff0c;用通俗的例子一步步拆解。一、核心&#xff1a;模块的 “双重身份” 与 __name_…

Java设计模式之行为型模式(访问者模式)应用场景分析

访问者模式&#xff08;Visitor Pattern&#xff09;作为Java设计模式中的“隐形冠军”&#xff0c;常被开发者低估其价值。这一模式通过“双分派”机制巧妙解耦数据结构与操作&#xff0c;为复杂系统的扩展提供了强大武器。在大厂项目中&#xff0c;访问者模式往往出现在业务逻…

【IDEA】JavaWeb自定义servlet模板

方法一&#xff1a;&#xff08;推荐去使用方法二&#xff0c;还能创建其它代码模板&#xff09;使用servlet模板创建Servlet类如果创建时找不到servlet模板&#xff1a;File -> Project Structure然后应用 -> OK&#xff0c;如果还是找不到Servlet模板&#xff0c;看看项…

Linux选择

在内存中运行着的进程称为&#xff08; 服务 &#xff09;。负责控制systemd系统和服务管理器的工具为&#xff08; systemctl &#xff09;命令。systemd管理系统服务的基本单位是&#xff08; unit &#xff09;。分配和管理资源的基本单位是&#xff08; 进程 &#xf…

【Redis学习路|第一篇】初步认识Redis

概要: 深入探讨NoSQL数据库的核心特性&#xff0c;对比传统关系型数据库的差异&#xff0c;重点介绍Redis作为内存数据库的优势与应用场景。 文章目录认识 NoSQLNoSQL vs SQL 对比1️⃣ 结构化 vs 非结构化2️⃣ 关联 vs 非关联3️⃣ 查询方式对比4️⃣ 事务特性5️⃣ 存储方式…

java局域网聊天室小项目架构思路

java局域网聊天室小项目架构思路 项目需求 创建一个局域网聊天系统&#xff0c;要求&#xff1a;用户在登录界面登录后进入聊天窗口界面&#xff0c;能实现多用户同时在线聊天&#xff0c;并且用户之间可以进行私聊 项目用到的技术栈 java网络编程java多线程java面向对象编…

vulhub-corrosion2靶机

1.安装靶机 https://download.vulnhub.com/corrosion/Corrosion2.ovahttps://download.vulnhub.com/corrosion/Corrosion2.ova 2.扫描IP 3.扫描端口 4.访问端口 首先访问一下80端口 访问一个8080端口发现是一个apache的页面 5.扫描目录与漏洞探测 那么我们扫描一下目录 80…

Mysql深入学习:慢sql执行

目录 慢查询日志 慢查询主要步骤 11种慢查询的场景分析 场景一&#xff1a;SQL 没有建立索引 场景二&#xff1a;索引未生效的典型原因 场景三&#xff1a;LIMIT 深分页导致性能下降 场景四&#xff1a;单表数据量过大导致 SQL 性能下降 场景五&#xff1a;ORDER BY 出现…

李宏毅深度学习教程 第8-9章 生成模型+扩散模型

【2025版】12 生成式对抗网络GAN 一 – 基本概念介紹_哔哩哔哩_bilibili 目录 1. GAN生成式对抗网络 2. GAN的训练 散度差异 3.WGAN 4.训练GAN 5. 如何客观评估GAN 6. 条件型生成&#xff08;按照要求&#xff09; 7. Cycle GAN&#xff08;互转配对&#xff09; 8. d…

1.8 axios详解

Axios的定义与核心特性Axios是一个基于Promise的现代化HTTP客户端库&#xff0c;主要用于在浏览器和Node.js 环境中发送HTTP请求&#xff0c;旨在简化异步数据交互流程。其核心特性如下&#xff1a;跨平台支持&#xff1a;在浏览器中通过XMLHttpRequest对象发送请求&#xff0c…