Spring AI阿里百炼平台实现流式对话:基于 SSE 的实践指南
在大模型应用开发中,流式对话是提升用户体验的关键特性。本文将详细介绍如何利用 Spring AI 结合 Spring Boot,基于 SSE(Server-Sent Events)协议实现高效的流式对话功能,包括动态中断机制和前端交互优化。
技术选型与协议解析
SSE 协议与 WebSocket 的区别
SSE(Server-Sent Events)是一种基于 HTTP 的轻量级服务器向客户端推送信息的协议,与 WebSocket 相比有显著差异:
特性 | SSE | WebSocket |
---|---|---|
通信方向 | 单向(服务端 → 客户端) | 全双工(双向通信) |
连接方式 | 基于 HTTP 长连接 | 独立的 WebSocket 协议 |
复杂度 | 简单(无需复杂握手) | 复杂(需要专门握手过程) |
适用场景 | 消息推送、实时通知、流式输出 | 即时通讯、游戏等双向交互场景 |
数据格式 | 文本/event-stream 格式 | 二进制/文本,需自定义格式 |
对于仅需服务端向客户端推送流式响应的对话场景,SSE 是更简洁高效的选择。
项目环境搭建
核心依赖配置
首先在 pom.xml
中添加必要依赖(注意修正版本号格式错误):
<!-- Spring AI 依赖管理 -->
<dependencyManagement><dependencies><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-bom</artifactId><version>1.0.0-M6</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement><!-- 核心依赖 -->
<dependencies><!-- Spring AI OpenAI 适配 --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-openai-spring-boot-starter</artifactId></dependency><!-- 阿里百炼 SDK(兼容 OpenAI 协议) --><dependency><groupId>com.alibaba</groupId><artifactId>dashscope-sdk-java</artifactId><version>2.16.9</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></exclusion></exclusions></dependency><!-- Spring WebFlux(响应式编程支持) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>
</dependencies>
配置文件设置
在 application.yml
中配置模型服务信息:
spring:ai:openai:base-url: https://dashscope.aliyuncs.com/compatible-mode # 阿里百炼兼容接口api-key: sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx # 替换为实际API密钥chat:options:model: qwen-plus # 可替换为实际使用的模型,如qwen-max、qwen-turbo等
核心功能实现
1. ChatClient 配置
创建 ChatClient
实例,配置对话模型、记忆机制和系统提示:
import org.springframework.ai.chat.ChatClient;
import org.springframework.ai.chat.ChatMemory;
import org.springframework.ai.chat.advisor.MessageChatMemoryAdvisor;
import org.springframework.ai.chat.advisor.SimpleLoggerAdvisor;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class AiConfig {@Beanpublic ChatClient chatClient(ChatModel chatModel, ChatMemory chatMemory) {return ChatClient.builder(chatModel).defaultOptions(options -> options.withModel("qwen-plus") // 模型名称.withTemperature(0.7f)) // 新增温度参数,控制输出随机性.defaultSystem("你是一个友好的智能助手,负责解答用户问题") // 修正引号格式.defaultAdvisors(new SimpleLoggerAdvisor(), // 日志记录new MessageChatMemoryAdvisor(chatMemory) // 对话记忆).build();}
}
2. 流式对话控制器
实现支持 SSE 的控制器,包含流式响应和中断功能:
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicBoolean;import static org.springframework.ai.chat.client.advisor.AbstractChatMemoryAdvisor.CHAT_MEMORY_CONVERSATION_ID_KEY;@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
@Slf4j
public class ChatController {private final ChatClient chatClient;private final AtomicBoolean isStreaming = new AtomicBoolean(true); // 线程安全的状态标识/*** 流式对话接口* @param prompt 用户输入* @param chatId 对话ID(用于记忆上下文)*/@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamChat(@RequestParam String prompt,@RequestParam(required = false, defaultValue = "default") String chatId) {log.info("收到对话请求 [{}]: {}", chatId, prompt);isStreaming.set(true); // 重置流式状态return chatClient.prompt().user(prompt).advisors(advisorSpec -> advisorSpec.param(CHAT_MEMORY_CONVERSATION_ID_KEY, chatId)).stream().content().takeWhile(data -> isStreaming.get()) // 动态中断控制.doOnCancel(() -> log.info("对话流已取消 [{}]", chatId)) // 取消时日志.concatWithValues("\u0003"); // 结束标记(ETX字符)}/*** 中断流式输出接口*/@PostMapping("/cancel")public void cancelStream() {isStreaming.set(false);log.info("已触发流式输出中断");}
}
3. 前端交互实现
完善前端页面,处理流式数据接收、中断控制和用户体验优化:
<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no"><title>Spring AI 流式对话演示</title><style>.container { max-width: 800px; margin: 20px auto; padding: 0 15px; }#output { border: 1px solid #e0e0e0; padding: 15px; min-height: 300px; border-radius: 8px; }.controls { margin: 15px 0; display: flex; gap: 10px; }input[type="text"] { flex: 1; padding: 8px 12px; border: 1px solid #ccc; border-radius: 4px; }button { padding: 8px 16px; cursor: pointer; border: none; border-radius: 4px; }button.send { background: #4285f4; color: white; }button.cancel { background: #ea4335; color: white; }</style>
</head>
<body><div class="container"><div class="controls"><input type="text" id="prompt" placeholder="请输入问题..."><button class="send" onclick="sendMessage()">发送</button><button class="cancel" onclick="cancelStream()">中断</button></div><h3>AI 回复:</h3><div id="output"></div></div><script>const outputDiv = document.getElementById('output');const promptInput = document.getElementById('prompt');let eventSource = null;// 发送消息function sendMessage() {const prompt = promptInput.value.trim();if (!prompt) return;// 清空输入和输出promptInput.value = '';outputDiv.innerHTML = '';// 创建对话ID(可基于时间戳生成)const chatId = 'chat_' + Date.now();// 建立SSE连接eventSource = new EventSource(`/api/chat/stream?prompt=${encodeURIComponent(prompt)}&chatId=${chatId}`);// 处理收到的消息eventSource.onmessage = (event) => {const data = event.data;// 检测结束标记if (data === '\u0003') {eventSource.close();return;}// 追加内容到输出区域outputDiv.textContent += data;};// 处理错误eventSource.onerror = () => {console.error('连接发生错误');eventSource.close();};}// 中断流式输出function cancelStream() {if (eventSource) {eventSource.close();eventSource = null;}// 通知后端停止生成fetch('/api/chat/cancel', { method: 'POST' }).catch(err => console.error('取消请求失败', err));}</script>
</body>
</html>
关键技术点解析
1. 响应式编程与 Flux
Spring AI 的流式响应基于 Reactor 框架的 Flux
实现:
Flux
代表一个异步的序列数据流,适合处理流式输出takeWhile
操作符用于根据条件(isStreaming
状态)控制流的生命周期concatWithValues
用于在流结束时添加终止标记,方便前端处理
2. 动态中断机制
- 使用
AtomicBoolean
保证多线程环境下的状态安全性 - 前端通过
/cancel
接口触发中断,后端通过takeWhile
终止流 - 结合
eventSource.close()
确保前端资源正确释放
3. 对话记忆管理
- 通过
ChatMemory
和MessageChatMemoryAdvisor
实现上下文记忆 chatId
参数用于区分不同对话会话,实现多用户/多会话隔离
常见问题与优化建议
- 依赖版本问题:确保 Spring AI 版本与 Spring Boot 版本兼容(建议使用 Spring Boot 3.2+)
通过以上实现,我们基于 Spring AI 和 SSE 协议构建了一个完整的流式对话系统,支持实时响应、动态中断和上下文记忆,为用户提供流畅的对话体验。在实际应用中,可根据业务需求进一步扩展功能,如添加消息加密、内容过滤或多模型切换等特性。