Spring AI Advisors API 提供了一种灵活且强大的方式来拦截、修改和增强 Spring 应用程序中的 AI 驱动交互。其核心思想类似于 Spring AOP(面向切面编程)中的“通知”(Advice),允许开发者在不修改核心业务逻辑的情况下,在 AI 调用前后或过程中插入自定义逻辑。
Advisors 的作用
Advisors 的主要作用包括:
- 封装重复的生成式 AI 模式:将常见的 AI 交互模式(如聊天记忆、RAG 检索增强生成)封装成可重用的组件。
- 数据转换与增强:在发送到大型语言模型(LLMs)的数据和从 LLMs 返回的数据之间进行转换和增强。
- 跨模型和用例的可移植性:提供一种机制,使得 AI 逻辑可以在不同的模型和用例之间轻松移植。
- 可观测性:Advisors 参与到 Spring AI 的可观测性栈中,可以查看与其执行相关的指标和跟踪。
核心接口
Spring AI Advisors API 主要围绕以下几个核心接口构建:
Advisor
:所有 Advisor 的父接口,继承自 Spring 的Ordered
接口,用于定义 Advisor 的执行顺序和名称。CallAdvisor
:用于非流式(non-streaming)场景的 Advisor 接口,其adviseCall
方法用于拦截和处理同步的 AI 调用。StreamAdvisor
:用于流式(streaming)场景的 Advisor 接口,其adviseStream
方法用于拦截和处理异步的 AI 流式 AI 调用。AdvisorChain
:定义了执行 Advisor 链的上下文。CallAdvisorChain
:CallAdvisor
的链,用于编排同步 Advisor 的执行。StreamAdvisorChain
:StreamAdvisor
的链,用于编排流式 Advisor 的执行。BaseAdvisor
:一个抽象基类,实现了CallAdvisor
和StreamAdvisor
的通用方面,减少了实现 Advisor 所需的样板代码,并提供了before
和after
方法来简化逻辑的插入。
这些接口共同构成了 Spring AI Advisors 的核心骨架,允许开发者以统一的方式处理不同类型的 AI 交互。
架构设计与执行流程
Spring AI Advisors 的设计借鉴了 Spring 框架中经典的责任链模式和 AOP 思想,使得 AI 请求的处理流程高度可扩展和可定制。理解其内部架构和执行流程对于有效利用 Advisors 至关重要。
类图概览
首先,我们通过一个简化的类图来概览 Spring AI Advisors 的核心接口和它们之间的关系:
从类图中可以看出:
Advisor
是所有顾问的基石,它继承了Ordered
接口,这意味着每个顾问都有一个排序值,用于确定其在链中的执行顺序。CallAdvisor
和StreamAdvisor
分别处理同步和异步(流式)的 AI 调用。CallAdvisorChain
和StreamAdvisorChain
是顾问链的抽象,它们负责按顺序调用链中的下一个顾问。BaseAdvisor
提供了一个方便的抽象,它实现了CallAdvisor
和StreamAdvisor
的默认行为,并引入了before
和after
方法,使得开发者可以更容易地在请求处理前后插入逻辑。
请求处理流程
Spring AI Advisors 的请求处理流程是一个典型的责任链模式。当一个 AI 请求(ChatClientRequest
)被发起时,它会依次通过配置好的 Advisor 链,每个 Advisor 都有机会在请求发送到 LLM 之前对其进行修改或增强,并在 LLM 返回响应之后对其进行处理。
以下是请求处理的详细流程图:
- 请求初始化:Spring AI 框架从用户的
Prompt
创建一个AdvisedRequest
对象,并附带一个空的AdvisorContext
对象。AdvisorContext
用于在整个 Advisor 链中共享状态和数据。 - Advisor 链处理(请求阶段):
AdvisedRequest
沿着 Advisor 链依次传递。每个 Advisor 都会执行其adviseCall
(同步)或adviseStream
(流式)方法。在这个阶段,Advisor 可以:
-
- 检查未密封的 Prompt 数据。
- 自定义和增强 Prompt 数据(例如,添加系统消息、上下文信息、函数定义等)。
- 调用链中的下一个实体(
chain.nextCall()
或chain.nextStream()
)。 - 选择性地阻塞请求,即不调用
nextCall/nextStream
,而是直接生成响应。在这种情况下,该 Advisor 负责填充响应。
- 发送到 LLM:链中的最后一个 Advisor(通常由框架自动添加)负责将处理后的请求发送到实际的
Chat Model
(大型语言模型)。 - LLM 响应:
Chat Model
处理请求并返回一个响应。 - Advisor 链处理(响应阶段):LLM 的响应被传递回 Advisor 链,并转换为
AdvisedResponse
。AdvisedResponse
同样包含共享的AdvisorContext
实例。每个 Advisor 都有机会处理或修改响应,例如,进行后处理、日志记录、数据提取等。 - 返回客户端:最终的
AdvisedResponse
返回给客户端,并从中提取ChatCompletion
。
时序图直观地展示了请求如何从 Client
经过 ChatClient
,然后逐级深入到 Advisor
链,最终到达 ChatModel
。响应则以相反的顺序回溯,每个 Advisor
再次有机会处理响应。这种“洋葱式”的结构确保了请求和响应在到达核心 AI 模型之前和之后都能被灵活地处理。
Advisor 的执行顺序
Advisor 在链中的执行顺序由其 getOrder()
方法返回的值决定。Spring 框架中的 Ordered
接口定义了以下语义:
- 值越小,优先级越高:
Ordered.HIGHEST_PRECEDENCE
(Integer.MIN_VALUE) 表示最高优先级,Ordered.LOWEST_PRECEDENCE
(Integer.MAX_VALUE) 表示最低优先级。 - 相同值不保证顺序:如果多个 Advisor 具有相同的
order
值,它们的执行顺序是不确定的。
Advisor 链的操作类似于一个栈:
- 请求处理阶段:
order
值最低(优先级最高)的 Advisor 最先处理请求。它位于栈的顶部,因此在请求向下传递时,它首先被调用。 - 响应处理阶段:
order
值最低(优先级最高)的 Advisor 最后处理响应。当响应从 LLM 返回并沿着链回溯时,它会最后被调用。
这种设计使得开发者可以精确控制 Advisor 的执行时机。例如,如果你希望一个 Advisor 在所有其他 Advisor 之前处理请求并在所有其他 Advisor 之后处理响应(例如,用于全局日志记录或异常处理),你应该给它设置一个非常低的 order
值(接近 Ordered.HIGHEST_PRECEDENCE
)。反之,如果你希望它最后处理请求并最先处理响应(例如,用于最终的数据格式化),则设置一个较高的 order
值(接近 Ordered.LOWEST_PRECEDENCE
)。
对于需要同时在请求和响应阶段都处于链条最前端的用例,可以考虑使用两个独立的 Advisor,并为它们配置不同的 order
值,并通过 AdvisorContext
共享状态。
代码分析:深入理解 BaseAdvisor
在 Spring AI Advisors API 中,BaseAdvisor
接口扮演着至关重要的角色,它极大地简化了自定义 Advisor 的实现。通过提供 CallAdvisor
和 StreamAdvisor
的默认实现,BaseAdvisor
将复杂的拦截逻辑抽象为两个核心方法:before
和 after
。
让我们再次回顾 BaseAdvisor
的定义:
public interface BaseAdvisor extends CallAdvisor, StreamAdvisor {Scheduler DEFAULT_SCHEDULER = Schedulers.boundedElastic();@Overridedefault ChatClientResponse adviseCall(ChatClientRequest chatClientRequest, CallAdvisorChain callAdvisorChain) {Assert.notNull(chatClientRequest, "chatClientRequest cannot be null");Assert.notNull(callAdvisorChain, "callAdvisorChain cannot be null");ChatClientRequest processedChatClientRequest = before(chatClientRequest, callAdvisorChain);ChatClientResponse chatClientResponse = callAdvisorChain.nextCall(processedChatClientRequest);return after(chatClientResponse, callAdvisorChain);}@Overridedefault Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest,StreamAdvisorChain streamAdvisorChain) {Assert.notNull(chatClientRequest, "chatClientRequest cannot be null");Assert.notNull(streamAdvisorChain, "streamAdvisorChain cannot be null");Assert.notNull(getScheduler(), "scheduler cannot be null");Flux<ChatClientResponse> chatClientResponseFlux = Mono.just(chatClientRequest).publishOn(getScheduler()).map(request -> this.before(request, streamAdvisorChain)).flatMapMany(streamAdvisorChain::nextStream);return chatClientResponseFlux.map(response -> {if (AdvisorUtils.onFinishReason().test(response)) {response = after(response, streamAdvisorChain);}return response;}).onErrorResume(error -> Flux.error(new IllegalStateException("Stream processing failed", error)));}@Overridedefault String getName() {return this.getClass().getSimpleName();}/*** Logic to be executed before the rest of the advisor chain is called.* 在调用顾问链的其余部分之前执行的逻辑*/ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain);/*** Logic to be executed after the rest of the advisor chain is called.*/ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain);/*** Scheduler used for processing the advisor logic when streaming.* 流式传输时用于处理顾问逻辑的调度器。*/default Scheduler getScheduler() {return DEFAULT_SCHEDULER;}}
关键点分析:
- 统一的拦截点:
BaseAdvisor
通过adviseCall
和adviseStream
方法,为同步和流式调用提供了统一的拦截逻辑。这两个方法内部都调用了before
方法来处理请求,然后调用chain.nextCall()
或chain.nextStream()
将请求传递给链中的下一个 Advisor 或最终的 AI 模型,最后调用after
方法来处理响应。 before
方法:
-
ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain)
- 作用:在请求发送到链中的下一个 Advisor 或 AI 模型之前执行的逻辑。你可以在这里修改
chatClientRequest
,例如添加或修改消息、参数等。返回的ChatClientRequest
将被传递给链的后续部分。
after
方法:
-
ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain)
- 作用:在链中的下一个 Advisor 或 AI 模型返回响应之后执行的逻辑。你可以在这里对
chatClientResponse
进行后处理,例如解析响应、提取信息、日志记录等。返回的ChatClientResponse
将被传递回链的上游。
- 流式处理的复杂性:对于流式处理 (
adviseStream
),BaseAdvisor
使用 Reactor 的Flux
和Mono
来处理异步流。before
方法在Mono.just(chatClientRequest).map(...)
中被调用,确保在流开始之前对请求进行预处理。after
方法则在Flux.map(...)
中被调用,并且通过AdvisorUtils.onFinishReason().test(response)
判断是否是流的最终响应,以确保after
逻辑在整个流完成时才执行,而不是对流中的每个分块都执行。 - 调度器 (
Scheduler
):BaseAdvisor
提供了getScheduler()
方法,默认使用Schedulers.boundedElastic()
。这允许在处理流式 Advisor 逻辑时指定一个调度器,以避免阻塞主线程,这对于响应式编程至关重要。
通过实现 BaseAdvisor
,开发者只需要关注 before
和 after
这两个业务逻辑相关的核心方法,而无需关心 Advisor 链的内部调用机制和流式处理的复杂性,大大降低了开发难度。
自定义 Advisor 示例
理解了 Spring AI Advisors 的核心概念和架构后,我们来看几个实际的自定义 Advisor 示例,它们将帮助你更好地掌握如何在自己的应用中利用这一强大机制。
1. 日志记录 Advisor (SimpleLoggerAdvisor
)
一个常见的需求是在 AI 请求处理的各个阶段进行日志记录,以便于调试和监控。我们可以实现一个简单的日志记录 Advisor,它在请求发送前和响应返回后打印相关信息。这个 Advisor 只观察请求和响应,不进行修改。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;import org.springframework.ai.chat.client.ChatClientRequest;
import org.springframework.ai.chat.client.ChatClientResponse;
import org.springframework.ai.chat.client.advisor.api.BaseAdvisor;
import org.springframework.ai.chat.client.advisor.api.AdvisorChain;
import org.springframework.ai.chat.client.advisor.api.CallAdvisorChain;
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;
import org.springframework.ai.chat.client.advisor.AdvisorUtils;public class SimpleLoggerAdvisor implements BaseAdvisor {private static final Logger logger = LoggerFactory.getLogger(SimpleLoggerAdvisor.class);@Overridepublic String getName() {return this.getClass().getSimpleName();}@Overridepublic int getOrder() {return 0; // 默认顺序,可以根据需要调整}@Overridepublic ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain) {logger.debug("BEFORE: {}", chatClientRequest);return chatClientRequest; // 不修改请求}@Overridepublic ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain) {logger.debug("AFTER: {}", chatClientResponse);return chatClientResponse; // 不修改响应}
}
代码分析:
SimpleLoggerAdvisor
实现了BaseAdvisor
接口,因此它同时支持同步和流式调用。getName()
方法返回 Advisor 的名称,通常是类名。getOrder()
方法返回 Advisor 的执行顺序。这里设置为0
,表示一个中等的优先级。你可以根据需要在Ordered.HIGHEST_PRECEDENCE
和Ordered.LOWEST_PRECEDENCE
之间调整。before()
方法在请求处理前被调用,我们在这里记录了ChatClientRequest
的内容。由于这个 Advisor 只是观察者,所以直接返回了原始的chatClientRequest
。after()
方法在响应返回后被调用,我们在这里记录了ChatClientResponse
的内容。同样,直接返回了原始的chatClientResponse
。
如何使用:
你可以通过 ChatClient.builder().defaultAdvisors()
方法将 SimpleLoggerAdvisor
配置到 ChatClient
中:
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;// 假设 chatModel 已经注入
ChatModel chatModel = ...;ChatClient chatClient = ChatClient.builder(chatModel)
.defaultAdvisors(new SimpleLoggerAdvisor()
)
.build();String response = chatClient.prompt()
.user("你好,Spring AI!")
.call()
.content();System.out.println(response);
当上述代码执行时,你将在日志中看到 BEFORE
和 AFTER
的输出,展示了请求和响应的详细信息。
2. 重读 Advisor (ReReadingAdvisor
)
“重读”(Re-Reading)是一种提高大型语言模型推理能力的技术,其核心思想是将用户的输入查询重复一次,例如:
{Input_Query}
Read the question again: {Input_Query}
我们可以实现一个 Advisor 来自动将用户的输入查询转换为这种格式:
import java.util.HashMap;
import java.util.Map;import reactor.core.publisher.Flux;import org.springframework.ai.chat.client.ChatClientRequest;
import org.springframework.ai.chat.client.ChatClientResponse;
import org.springframework.ai.chat.client.advisor.api.BaseAdvisor;
import org.springframework.ai.chat.client.advisor.api.AdvisorChain;
import org.springframework.ai.chat.client.advisor.api.CallAdvisorChain;
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;public class ReReadingAdvisor implements BaseAdvisor {@Overridepublic String getName() {return this.getClass().getSimpleName();}@Overridepublic int getOrder() {return 0; // 默认顺序,可以根据需要调整}@Overridepublic ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain) {Map<String, Object> advisedUserParams = new HashMap<>(chatClientRequest.userParams());advisedUserParams.put("re2_input_query", chatClientRequest.userText());return ChatClientRequest.from(chatClientRequest).userText("""{re2_input_query}Read the question again: {re2_input_query}""").userParams(advisedUserParams).build();}@Override public ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain) {return chatClientResponse; // 不修改响应}
}
代码分析:
ReReadingAdvisor
也实现了BaseAdvisor
。- 关键逻辑在于
before()
方法。它首先获取原始的userText
,并将其作为参数re2_input_query
放入advisedUserParams
中。 - 然后,它使用
ChatClientRequest.from(chatClientRequest).userText(...)
来构建一个新的ChatClientRequest
,其中userText
被修改为包含重读指令的模板字符串。这里利用了 Spring AI 的模板功能,{re2_input_query}
会被实际的用户输入替换。 after()
方法同样不修改响应。
如何使用:
将 ReReadingAdvisor
配置到 ChatClient
中:
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;// 假设 chatModel 已经注入
ChatModel chatModel = ...;ChatClient chatClient = ChatClient.builder(chatModel).defaultAdvisors(new ReReadingAdvisor()).build();String response = chatClient.prompt().user("请解释一下量子力学").call().content();System.out.println(response);
当用户输入“请解释一下量子力学”时,实际发送给 LLM 的 Prompt 将会是:
请解释一下量子力学
Read the question again: 请解释一下量子力学
这有助于 LLM 更好地理解和处理复杂的查询,从而提高响应质量。
3. 结合使用:聊天记忆 Advisor (MessageChatMemoryAdvisor) 和 RAG Advisor (QuestionAnswerAdvisor)
Spring AI 提供了开箱即用的 Advisors,例如 MessageChatMemoryAdvisor
用于管理聊天记忆,QuestionAnswerAdvisor
用于实现 RAG(检索增强生成)。这些 Advisor 同样遵循上述设计模式,可以方便地集成到你的应用中。
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.client.advisor.MessageChatMemoryAdvisor;
import org.springframework.ai.chat.client.advisor.QuestionAnswerAdvisor;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.memory.InMemoryChatMemory;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.vectorstore.VectorStore;// 假设 chatModel 和 vectorStore 已经注入
ChatModel chatModel = ...;
VectorStore vectorStore = ...;
ChatMemory chatMemory = new InMemoryChatMemory();ChatClient chatClient = ChatClient.builder(chatModel)
.defaultAdvisors(MessageChatMemoryAdvisor.builder(chatMemory).build(), // 聊天记忆 AdvisorQuestionAnswerAdvisor.builder(vectorStore).build() // RAG Advisor
)
.build();String conversationId = "user-123"; // 假设的用户会话ID// 第一次对话
String response1 = chatClient.prompt()
.advisors(advisor -> advisor.param(ChatMemory.CONVERSATION_ID, conversationId))
.user("你好,我叫小明。")
.call()
.content();
System.out.println("Response 1: " + response1);// 第二次对话,会利用聊天记忆
String response2 = chatClient.prompt()
.advisors(advisor -> advisor.param(ChatMemory.CONVERSATION_ID, conversationId))
.user("我刚才说了什么?")
.call()
.content();
System.out.println("Response 2: " + response2);
代码分析:
MessageChatMemoryAdvisor
会根据ChatMemory.CONVERSATION_ID
参数管理会话历史,将其添加到发送给 LLM 的 Prompt 中,从而实现多轮对话的上下文感知。QuestionAnswerAdvisor
会利用VectorStore
进行信息检索,并将检索到的相关文档片段添加到 Prompt 中,以增强 LLM 的回答能力,实现 RAG 模式。- 通过
advisors(advisor -> advisor.param(ChatMemory.CONVERSATION_ID, conversationId))
,我们可以在运行时为 Advisor 提供参数,这使得 Advisor 更加灵活和可配置。
这些示例展示了 Spring AI Advisors 的强大功能和灵活性,无论是简单的日志记录,还是复杂的 Prompt 增强,甚至集成高级的 AI 模式,Advisors 都提供了一个优雅的解决方案。
最佳实践
在使用 Spring AI Advisors 时,遵循一些最佳实践可以帮助你构建更健壮、高效和可维护的 AI 应用。
- 明确 Advisor 的职责:每个 Advisor 应该只负责一个单一的、明确的功能。例如,一个 Advisor 负责日志记录,另一个负责 Prompt 增强,而不是将所有逻辑都塞到一个 Advisor 中。这有助于提高代码的可读性、可测试性和可重用性。
- 合理设置
order
值:仔细考虑每个 Advisor 的执行顺序。例如,如果一个 Advisor 需要依赖另一个 Advisor 修改后的 Prompt,那么它应该在依赖的 Advisor 之后执行。对于全局性的操作(如日志、异常处理),通常设置较高的优先级(较低的order
值),以便它们在请求处理的最早和最晚阶段介入。 - 利用
BaseAdvisor
简化实现:对于大多数自定义 Advisor,优先考虑实现BaseAdvisor
接口。它提供了before
和after
两个清晰的扩展点,大大减少了样板代码,让你能够专注于核心业务逻辑。 - 善用
AdvisorContext
共享状态:如果多个 Advisor 之间需要共享数据或状态,请使用AdvisorContext
。这比在 Advisor 之间传递复杂对象或使用全局变量更优雅和安全。 - 区分同步和流式处理:如果你的 Advisor 需要同时支持同步和流式 AI 调用,确保正确实现
CallAdvisor
和StreamAdvisor
的逻辑。BaseAdvisor
已经为你处理了大部分复杂性,但仍需注意流式处理中after
方法的触发时机(通常是流结束时)。 - 错误处理:在 Advisor 中实现适当的错误处理机制。如果一个 Advisor 抛出异常,它可能会中断整个链的执行。考虑如何优雅地处理这些异常,例如通过日志记录、回退机制或将错误信息传递给
AdvisorContext
。
结论
Spring AI Advisors 提供了一个强大而灵活的机制,用于在 Spring AI 应用程序中拦截、修改和增强 AI 驱动的交互。通过深入理解其核心概念、架构设计、执行流程以及 BaseAdvisor
的实现细节,开发者可以有效地利用这一特性来构建更具可扩展性、可维护性和智能化的 AI 应用。
无论是简单的日志记录、复杂的 Prompt 增强,还是集成如聊天记忆和 RAG 等高级 AI 模式,Advisors 都提供了一个优雅且符合 Spring 哲学的设计模式。掌握 Advisors 的使用,将使你能够更好地控制 AI 模型的行为,为用户提供更智能、更个性化的体验。