前沿重器
栏目主要给大家分享各种大厂、顶会的论文和分享,从中抽取关键精华的部分和大家分享,和大家一起把握前沿技术。具体介绍:仓颉专项:飞机大炮我都会,利器心法我还有。(算起来,专项启动已经是20年的事了!)
2024年文章合集最新发布!在这里:再添近20万字-CS的陋室2024年文章合集更新
往期回顾
前沿重器[64] | 阿里妈妈URM大模型:基于LLM的通用推荐新方案
前沿重器[65] | 大模型评判能力综述
前沿重器[66] | 美团搜索广告召回迭代实践启示
前沿重器[67] | 北邮腾讯MemoryOS:分层记忆模式解决大模型长记忆问题(上)
前沿重器[68] | 北邮腾讯MemoryOS:分层记忆模式解决大模型长记忆问题(下)
今天和大家一起阅读deepsearcher的源码,我们也知道这个项目还是比较有名的,我们来看看它内部的那些操作让他变得更好。
开源项目:https://github.com/zilliztech/deep-searcher
上一篇的源码阅读(前沿重器[68] | 北邮腾讯MemoryOS:分层记忆模式解决大模型长记忆问题(下)),写的比较啰嗦,有读者反馈代码粘贴到过多了,所以今天这篇会简化,把重点拿出来。
项目概述
服务简述
文档加载
本地文件
Crawler
推理过程
路由
RAG
NaiveRAG
DeepSearch
ChainOfRAG
总结
这里的RAG应该是本文的重点,即检索推理生成的过程,但因为其他部分在整个项目里也都很重要,所以我都进行了讲解,大家可以根据自己的需求跳转到对应的章节。
PS:详细地,之前写过另一个RAG项目的源码解读,这个内容会更完整,大家有兴趣也可以看看。
前沿重器[45] RAG开源项目Qanything源码阅读1-概述+服务
前沿重器[46] RAG开源项目Qanything源码阅读2-离线文件处理
前沿重器[47] | RAG开源项目Qanything源码阅读3-在线推理
项目概述

一个图基本可以描述出这个项目的核心逻辑,可以看到逻辑还是比较简单的,是很经典的RAG过程,但又两个关键区别。
生成多个sub-query,用多个query去搜。
循环生成。核验结果相关性,只有合适才会出来,否则继续搜索。
当然这俩区别,其实在很多论文内,也应该有见过。另外其他的类似多信息来源检索、语义检索,在现在的研究下也已经是很常规的了。
服务简述
这个项目的服务用的是fastapi,内部核心是这几个接口。
@app.post("/set-provider-config/") # Set configuration for a specific provider.
@app.post("/load-files/") # Load files into the vector database.
@app.post("/load-website/") # Load website content into the vector database.
@app.get("/query/") # Perform a query against the loaded data.
4个接口分别是修改配置、加载文件、加载网页和检索。
修改配置这个,大家可以参考,后续很多需要在线修改配置的,这个接口可以说是一个后门。
文档加载
本地文件
也就是接口load-files
的部分,这部分大家比较关注的点基本都是文本分割了。这里有几个关键点可以借鉴。
首先说一下正则切割,就是基于某个或者某几个分隔符,来进行切割。
def _split_text_with_regex(text: str, separator: str, keep_separator: bool
) -> List[str]:# Now that we have the separator, split the textif separator:if keep_separator:# The parentheses in the pattern keep the delimiters in the result._splits = re.split(f"({separator})", text)splits = [_splits[i] + _splits[i + 1] for i in range(1, len(_splits), 2)]if len(_splits) % 2 == 0:splits += _splits[-1:]splits = [_splits[0]] + splitselse:splits = re.split(separator, text)else:splits = list(text)return [s for s in splits if s != ""]
然后对必要的内容进行合并,这里的_merge_splits
用的是from langchain_text_splitters.base import Language, TextSplitter
下的原生合并,它的核心是判断片段是否过短,如果内容过短,就合并,毕竟正则对长度不敏感,但是向量表征还是比较敏感的。
后续就是转向量和入库了,这个也比较常规,此处的数据库比较多,支持milvus、oracle、qdrant等,这个组件有需要是可以直接搬的,在这个路径。
deepsearcher/vector_db/milvus.py
deepsearcher/vector_db/oracle.py
deepsearcher/vector_db/qdrant.py
Crawler
在项目内,提供了3种爬虫。
crawl4ai_crawler
:使用Crawl4AI
工具作为爬虫,这是一款专为大型语言模型(LLM)和人工智能应用设计的开源异步网页爬虫框架,内部的亮点是,默认生成Markdown、清洁HTML和结构化JSON,还有一些清洗的工具。docling_crawler
:使用docling
工具作为爬虫,这是IBM开源的一款专为生成式AI设计的文档解析工具,核心定位是将PDF、DOCX等复杂文档转换为AI友好的结构化数据。firecrawl
:使用docling
工具作为爬虫,能解决传统爬虫在动态内容处理、反爬机制和数据清洗上的痛点,也支持结构化输出,辅助RAG做内容梳理。
大家还是根据自己的需要学习吧。
推理过程
内部本质还是一个RAG,这块的设计是Agent化,有一个明确的Router,剩下的多个检索组件各自完成。
路由
路由在deepsearcher/agent/rag_router.py
。核心的路由流程在这里。
def _route(self, query: str) -> Tuple[RAGAgent, int]:description_str = "\n".join([f"[{i + 1}]: {description}"for i, description in enumerate(self.agent_descriptions)])prompt = RAG_ROUTER_PROMPT.format(query=query, description_str=description_str)chat_response = self.llm.chat(messages=[{"role": "user", "content": prompt}])try:selected_agent_index = int(self.llm.remove_think(chat_response.content)) - 1except ValueError:# In some reasoning LLM, the output is not a number, but a explaination string with a number in the end.log.warning("Parse int failed in RAGRouter, but will try to find the last digit as fallback.")selected_agent_index = (int(self.find_last_digit(self.llm.remove_think(chat_response.content))) - 1)selected_agent = self.rag_agents[selected_agent_index]log.color_print(f"<think> Select agent [{selected_agent.__class__.__name__}] to answer the query [{query}] </think>\n")return self.rag_agents[selected_agent_index], chat_response.total_tokens
这里的操作主要是构造好prompt然后请求大模型进行决策。这个prompt的模板是这样的,内容相对比较常规,就是给好指令、query、路由的描述即可,这点在这个类型的项目内,即路由个数比较少的场景下,还是可以的,但是次数比较多,会因为prompt过场而导致决策质量出现差错,目前我看比较常见的策略是向量召回(以搜代分)+大模型的模式(PS:某种程度,这也算一种RAG?)(我写了个简单的开源项目:心法利器[114] | 通用大模型文本分类实践(含代码))
RAG_ROUTER_PROMPT = """Given a list of agent indexes and corresponding descriptions, each agent has a specific function.
Given a query, select only one agent that best matches the agent handling the query, and return the index without any other information.## Question
{query}## Agent Indexes and Descriptions
{description_str}Only return one agent index number that best matches the agent handling the query:
"""
这里比较建议用思考模型,在源码里,作者也是加了remove_think的,说明作者在实验过程应该也是这么干的。
路由负责分发任务,下面,就开始真正的任务执行了。
RAG
搜索是推理过程的重要一环,但因为这才是本文的重点,内容也比较多,所以我单独列了一章。
NaiveRAG
这个最经典的RAG模式,检索、生成。这个比较常规,我就不细说了。
DeepSearch
这个检索流程就是本文最开头那张结构图提到的搜索过程,会生成subquery进行分别检索,然后整合分析决策,支持多次循环检索,核心推理流程在这个函数。
def query(self, query: str, **kwargs) -> Tuple[str, List[RetrievalResult], int]:"""Query the agent and generate an answer based on retrieved documents.This method retrieves relevant documents and uses the language modelto generate a comprehensive answer to the query.Args:query (str): The query to answer.**kwargs: Additional keyword arguments for customizing the query process.Returns:Tuple[str, List[RetrievalResult], int]: A tuple containing:- The generated answer- A list of retrieved document results- The total token usage"""all_retrieved_results, n_token_retrieval, additional_info = self.retrieve(query, **kwargs)ifnot all_retrieved_results or len(all_retrieved_results) == 0:returnf"No relevant information found for query '{query}'.", [], n_token_retrievalall_sub_queries = additional_info["all_sub_queries"]chunk_texts = []for chunk in all_retrieved_results:if self.text_window_splitter and"wider_text"in chunk.metadata:chunk_texts.append(chunk.metadata["wider_text"])else:chunk_texts.append(chunk.text)log.color_print(f"<think> Summarize answer from all {len(all_retrieved_results)} retrieved chunks... </think>\n")summary_prompt = SUMMARY_PROMPT.format(question=query,mini_questions=all_sub_queries,mini_chunk_str=self._format_chunk_texts(chunk_texts),)chat_response = self.llm.chat([{"role": "user", "content": summary_prompt}])log.color_print("\n==== FINAL ANSWER====\n")log.color_print(self.llm.remove_think(chat_response.content))return (self.llm.remove_think(chat_response.content),all_retrieved_results,n_token_retrieval + chat_response.total_tokens,)
生成subquery并且搜索的工作,包括这个循环搜索的流程,都在
self.retrieve
,这个一会继续说。字符串拼接完就开始构造总结prompt和请求大模型了。
总结的prompt在这里,注意,这里把生成的子query也带上了。
SUMMARY_PROMPT = """You are a AI content analysis expert, good at summarizing content. Please summarize a specific and detailed answer or report based on the previous queries and the retrieved document chunks.Original Query: {question}Previous Sub Queries: {mini_questions}Related Chunks:
{mini_chunk_str}"""
回到self.retrieve
,这一块应该是最能体现子query生成和循环搜索的事了。
async def async_retrieve(self, original_query: str, **kwargs
) -> Tuple[List[RetrievalResult], int, dict]:max_iter = kwargs.pop("max_iter", self.max_iter)### SUB QUERIES ###log.color_print(f"<query> {original_query} </query>\n")all_search_res = []all_sub_queries = []total_tokens = 0sub_queries, used_token = self._generate_sub_queries(original_query)total_tokens += used_tokenifnot sub_queries:log.color_print("No sub queries were generated by the LLM. Exiting.")return [], total_tokens, {}else:log.color_print(f"<think> Break down the original query into new sub queries: {sub_queries}</think>\n")all_sub_queries.extend(sub_queries)sub_gap_queries = sub_queriesfor iter in range(max_iter):log.color_print(f">> Iteration: {iter + 1}\n")search_res_from_vectordb = []search_res_from_internet = [] # TODO# Create all search taskssearch_tasks = [self._search_chunks_from_vectordb(query, sub_gap_queries)for query in sub_gap_queries]# Execute all tasks in parallel and wait for resultssearch_results = await asyncio.gather(*search_tasks)# Merge all resultsfor result in search_results:search_res, consumed_token = resulttotal_tokens += consumed_tokensearch_res_from_vectordb.extend(search_res)search_res_from_vectordb = deduplicate_results(search_res_from_vectordb)# search_res_from_internet = deduplicate_results(search_res_from_internet)all_search_res.extend(search_res_from_vectordb + search_res_from_internet)if iter == max_iter - 1:log.color_print("<think> Exceeded maximum iterations. Exiting. </think>\n")break### REFLECTION & GET GAP QUERIES ###log.color_print("<think> Reflecting on the search results... </think>\n")sub_gap_queries, consumed_token = self._generate_gap_queries(original_query, all_sub_queries, all_search_res)total_tokens += consumed_tokenifnot sub_gap_queries or len(sub_gap_queries) == 0:log.color_print("<think> No new search queries were generated. Exiting. </think>\n")breakelse:log.color_print(f"<think> New search queries for next iteration: {sub_gap_queries} </think>\n")all_sub_queries.extend(sub_gap_queries)all_search_res = deduplicate_results(all_search_res)additional_info = {"all_sub_queries": all_sub_queries}return all_search_res, total_tokens, additional_info
这里很多核心的工作,都是用大模型来做的,包括这个生成sub_query,而且prompt也很简单,这里和前面不一样的是,给了例子,剩下的就让大模型发挥了。
SUB_QUERY_PROMPT = """To answer this question more comprehensively, please break down the original question into up to four sub-questions. Return as list of str.
If this is a very simple question and no decomposition is necessary, then keep the only one original question in the python code list.Original Question: {original_query}<EXAMPLE>
Example input:
"Explain deep learning"Example output:
["What is deep learning?","What is the difference between deep learning and machine learning?","What is the history of deep learning?"
]
</EXAMPLE>Provide your response in a python code list of str format:
"""
在生成多个prompt后,便可以开始检索,检索的这一步,又有一次决策——该搜哪个库,前面有提到,有文档、有网络,而且网络还可以有多个,文档也是,这里要决策该搜哪些资源,已经资源多了都去搜不现实,这里构造了一个CollectionRouter
,来进行决策。此处也是通过大模型来做的。
COLLECTION_ROUTE_PROMPT = """
I provide you with collection_name(s) and corresponding collection_description(s). Please select the collection names that may be related to the question and return a python list of str. If there is no collection related to the question, you can return an empty list."QUESTION": {question}
"COLLECTION_INFO": {collection_info}When you return, you can ONLY return a python list of str, WITHOUT any other additional content. Your selected collection name list is:
"""
决策完便可以从中选择进行搜索了。这里的搜索统一都是用的向量检索,作者提供了很多种类型。(base是基类)
base
bedrock_embedding
fastembed_embdding
gemini_embedding
glm_embedding
milvus_embedding
novita_embedding
ollama_embedding
openai_embedding
ppio_embedding
sentence_transformer_embedding
siliconflow_embedding
volcengine_embedding
voyage_embedding
watsonx_embedding
搜完,还会进行合并以及rerank,这里与其说rerank,我自己觉得更像是0-1的判断,是否有用,对没用的结果是直接删掉的。
RERANK_PROMPT = """Based on the query questions and the retrieved chunk, to determine whether the chunk is helpful in answering any of the query question, you can only return "YES" or "NO", without any other information.Query Questions: {query}
Retrieved Chunk: {retrieved_chunk}Is the chunk helpful in answering the any of the questions?
"""
流程到这里,就会判断循环是否结束,停止的条件有两个。
到达最大循环次数,就会停。
根据搜索情况判断是否需要在进入下一步搜索。
这个决策用的是这个,也是非常简单的描述,直接看看prompt就知道了。
REFLECT_PROMPT = """Determine whether additional search queries are needed based on the original query, previous sub queries, and all retrieved document chunks. If further research is required, provide a Python list of up to 3 search queries. If no further research is required, return an empty list.If the original query is to write a report, then you prefer to generate some further queries, instead return an empty list.Original Query: {question}Previous Sub Queries: {mini_questions}Related Chunks:
{mini_chunk_str}Respond exclusively in valid List of str format without any other text."""
如果还有别的搜索空间,则会生成query并继续搜索,否则就跳出,跳到前面提到的summary的部分。
这便是项目内提到的deepsearch的整个流程。
ChainOfRAG
RAG并没有结束,还有一个ChainOfRAG
。
@describe_class("This agent can decompose complex queries and gradually find the fact information of sub-queries. ""It is very suitable for handling concrete factual queries and multi-hop questions."
)
class ChainOfRAG(RAGAgent):"""Chain of Retrieval-Augmented Generation (RAG) agent implementation.This agent implements a multi-step RAG process where each step can refinethe query and retrieval process based on previous results, creating a chainof increasingly focused and relevant information retrieval and generation.Inspired by: https://arxiv.org/pdf/2501.14342"""pass
看这串描述大概可以明白,这是一个多步的RAG流程,通过动态生成子查询(sub-queries)和子答案(sub-answers)逐步推理复杂查询,从而达成对复杂问题的推理探索。
来都来了,一起看看。
首先是先推理这一步要做的查询任务,推理出需要的query后便可以去进行搜索,这个“推理这一步要做的查询任务”是大模型完成的,具体prompt如下。
FOLLOWUP_QUERY_PROMPT = """You are using a search tool to answer the main query by iteratively searching the database. Given the following intermediate queries and answers, generate a new simple follow-up question that can help answer the main query. You may rephrase or decompose the main query when previous answers are not helpful. Ask simple follow-up questions only as the search tool may not understand complex questions.## Previous intermediate queries and answers
{intermediate_context}## Main query to answer
{query}Respond with a simple follow-up question that will help answer the main query, do not explain yourself or output anything else.
"""
完成搜索后,再利用大模型把回答本次任务的文档给拿出来。
GET_SUPPORTED_DOCS_PROMPT = """Given the following documents, select the ones that are support the Q-A pair.## Documents
{retrieved_documents}## Q-A Pair
### Question
{query}
### Answer
{answer}Respond with a python list of indices of the selected documents.
"""
此时可以开始判断,是否该停止循环了。
REFLECTION_PROMPT = """Given the following intermediate queries and answers, judge whether you have enough information to answer the main query. If you believe you have enough information, respond with "Yes", otherwise respond with "No".## Intermediate queries and answers
{intermediate_context}## Main query
{query}Respond with "Yes" or "No" only, do not explain yourself or output anything else.
"""
如果可停止则跳出,否则继续循环。
结束循环就开始进行总体的总结,这里会把历史所有总结的结果都给汇总起来(这里没放拓展的query)。
FINAL_ANSWER_PROMPT = """Given the following intermediate queries and answers, generate a final answer for the main query by combining relevant information. Note that intermediate answers are generated by an LLM and may not always be accurate.## Documents
{retrieved_documents}## Intermediate queries and answers
{intermediate_context}## Main query
{query}Respond with an appropriate answer only, do not explain yourself or output anything else.
"""
这种模式相比前面的deepsearch,深度要更深。
总结
看完这个项目,感觉是有些疑问,当然也有可能是很多业务定制或者是很机密的内容,没直接放出来。
全文比较核心的功能基本都是通过prompt解决的,而且还是比较简单的prompt,这在比较通用开放的领域来说,无疑是高效的,但是在比较专的场景中,这么简单的prompt得到的答案很可能并不能和这个领域对齐。高情商地说,就是对“大模型有很高的信任”,实际上相信有经验的大家应该都有这个感受,这个信任度不能太高。
类似query拓展之类的任务,实际场景下多少要考虑一下准确性、有效性和多样性。之前曾经也做过类似的任务,拓展出来的内容很多都无效,类似“换个同义词或者说法”、“拓展query和原问题有关但方向差距大”都很常见。
文档处理和搜索都挺基础的,但是这里的搜索效果和质量,并不太能保证。
当然,带着学习的目的,我们还是可以从中找到很多有用的思路。
横向的query拓展和纵向的循环搜索,对最终效果还是有收益的,虽说展示代码上的内容很简陋,但是这个思路还是可以借鉴。
最近好几个项目都可以考虑用Agent的模式来设计代码了,上次的memoryOS(前沿重器[68] | 北邮腾讯MemoryOS:分层记忆模式解决大模型长记忆问题(下))用的是MCP的模式,这次的deepsearcher也有路由、Agent的风格,说不定会成为新的趋势。
这篇文章的工业风还是挺明显的,这里有表标准的基类和继承,这个模式标准下,后续横向拓展确实会比较方便。
为了应对多次搜索的要求,整个项目内是采用了python的协程工具,看代码时,一般的搜索都是这么写的,这个就是python的协程,有兴趣的大家可以了解了解,对性能提升,CPU使用率都有好处。
async def async_retrieve