目录

摘要

Abstract

1 Schema

2 Prompt

3 KAG-Builder

3.1 reader

3.2 splitter

3.3 extractor

3.4 vectorizer

3.5 writer

3.6 可选组件

4 示例

总结


摘要

本周深入学习了 KAG 项目中的 Schema、Prompt 以及 KAG-Builder 相关代码知识,涵盖了其定义、功能以及在知识图谱构建中的应用。Schema定义了知识图谱的架构,Prompt用于引导模型生成符合预期的输出,而 KAG-Builder则是通过结构化信息获取、知识语义对齐和图存储写入来构建知识图谱,其中包含reader、splitter、extractor、vectorizer、writer等多个组件。

Abstract

This week, we have in-depth knowledge of the Schema, Prompt and KAG-Builder related code knowledge in the KAG project, covering their definitions, functions, and applications in the construction of knowledge graphs. The schema defines the architecture of the knowledge graph, the Prompt is used to guide the model to generate the expected output, and the KAG-Builder is to build the knowledge graph through structured information acquisition, knowledge semantic alignment, and graph storage writing, which includes multiple components such as reader, splitter, extractor, vectorizer, and writer.

1 Schema

Schema是定义知识图谱语义框架和数据组织结构的核心组件,它规定了知识图谱中数据的类型、关系及逻辑约束,是构建和管理知识图谱的基础,其中:

  • 类型:定义一类物体的行为,类型与实例的关系可类比与java语言中类和对象的关系;包括实体类型(EntityType),概念类型(ConceptType),事件类型(EventType),标准类型等;
  • 实例:类比于java语言中的对象,是在图数据库中的node此处要澄清,类型与实例只是SPG逻辑上的概念,在实际的图数据库存储中,都为node;
  • 属性:实体中可定义属性,这与java类中的属性相似,注意,在SPG中某些属性会被处理成关系节点,如User类型下有一个phone的标准类型属性,在User实例的创建中,会生成一个phone标准类型的实例node,并创建一个由User实例指向phone实例的edge,这样做可以很好的解决知识图谱稀疏性的问题,也能提取共性属性;
  • 关系:类比于图数据库中的edge,可以有属性,且可以增加多种限制;
  • 规则:定义属性,关系时可定义相关规则,在规则中创建约束条件,如相反关系等。

概念类型:

  • 实体从具体到一般的抽象,表述的是一组实体实例或事件实例的集合,是一种分类体系;
  • 相对静态,也是常识知识,具有较强复用性,如人群标签、事件分类、行政区划分类等;
  • 概念类型的关系只允许创建在7大类里定义的谓词,概念类型需定义上下谓词。
HospitalDepartment(科室): ConceptTypehypernymPredicate: isA

实体类型(必须存在):

  • 业务相关性比较强的客观对象,通多属性、多关系刻画的多元复合结构类型,如用户、企业、商户等;
  • 所有类型会默认继承Thing类型,默认创建id、name和description属性。
Disease(疾病): EntityTypeproperties:desc(描述): Textindex: TextsemanticType(语义类型): Textindex: Textcomplication(并发症): Diseaseconstraint: MultiValuecommonSymptom(常见症状): Symptomconstraint: MultiValueapplicableMedicine(适用药品): Medicineconstraint: MultiValuehospitalDepartment(就诊科室): HospitalDepartmentconstraint: MultiValuediseaseSite(发病部位): HumanBodyPartconstraint: MultiValuerelations:abnormal(异常指征): Indicator

事件类型:

  • 加入时间、空间等约束的时空多元类型,如通过NLP、CV等抽取出来的行业事件、企业事件、诊疗事件或因购买、核销、注册等行为产生的用户行为事件;
  • 事件类型必须有主题subject,可以有发生事件、地点等基础信息。
Event(医疗风险事件): EventTypeproperties:subject(主体): Doctortime(时间): Date

2 Prompt

Prompt是引导LLM与知识图谱协同工作的核心指令,其设计融合了结构化知识约束和语义对齐能力,旨在提升垂直领域的决策精准性与逻辑严谨性。

在KAG中的Prompt主要要以下几类:

  • 实体prompt:
"instruction": "作为一个图谱知识抽取的专家, 你需要基于定义了实体类型及对应属性的schema,从input字段的文本中抽取出所有的实体及其属性,schema中标记为List的属性返回list,未能提取的属性返回null。以标准json list格式输出,list中每个元素形如{category: properties},你可以参考example字段中给出的示例格式。注意实体属性的SemanticType指的是一个相比实体类型更具体且明确定义的类型,例如Person类型的SemanticType可以是Professor或Actor。","example": [{"input": "周杰伦(Jay Chou),1979年1月18日出生于台湾省新北市,祖籍福建省永春县,华语流行乐男歌手、音乐人、演员、导演、编剧,毕业于淡江中学。2000年,发行个人首张音乐专辑《Jay》 [26]。2023年凭借《最伟大的作品》获得第一届浪潮音乐大赏年度制作、最佳作曲、最佳音乐录影带三项大奖。","output": [{"category": "Person","properties": {"name": "周杰伦","semanticType": "Musician","description": "华语流行乐男歌手、音乐人、演员、导演、编剧",},},{"category": "GeographicLocation","properties": {"name": "台湾省新北市","semanticType": "City","description": "周杰伦的出生地",},},{"category": "GeographicLocation","properties": {"name": "福建省永春县","semanticType": "County","description": "周杰伦的祖籍",},},{"category": "Organization","properties": {"name": "淡江中学","semanticType": "School","description": "周杰伦的毕业学校",},},{"category": "Works","properties": {"name": "Jay","semanticType": "Album","description": "周杰伦的个人首张音乐专辑",},},{"category": "Works","properties": {"name": "最伟大的作品","semanticType": "MusicVideo","description": "周杰伦凭借此作品获得多项音乐大奖",},},],}],
  • 事件prompt:
"instruction": "作为一个知识图谱图谱事件抽取的专家, 你需要基于定义的事件类型及对应属性的schema,从input字段的文本中抽取出所有的事件及其属性,schema中标记为List的属性返回list,未能提取的属性返回null。以标准json list格式输出,list中每个元素形如{category: properties},你可以参考example字段中给出的示例格式。","example": {"input": "1986年,周星驰被调入无线电视台戏剧组;同年,他在单元情景剧《哥哥的女友》中饰演可爱活泼又略带羞涩的潘家伟,这也是他第一次在情景剧中担任男主角;之后,他还在温兆伦、郭晋安等人主演的电视剧中跑龙套。","output": [{"category": "Event","properties": {"name": "周星驰被调入无线电视台戏剧组","abstract": "1986年,周星驰被调入无线电视台戏剧组。","subject": "周星驰","time": "1986年","location": "无线电视台","participants": [],"semanticType": "调动",},},{"category": "Event","properties": {"name": "周星驰在《哥哥的女友》中饰演潘家伟","abstract": "1986年,周星驰在单元情景剧《哥哥的女友》中饰演可爱活泼又略带羞涩的潘家伟,这也是他第一次在情景剧中担任男主角。","subject": "周星驰","time": "1986年","location": None,"participants": [],"semanticType": "演出",},},{"category": "Event","properties": {"name": "周星驰跑龙套","abstract": "1986年,周星驰在温兆伦、郭晋安等人主演的电视剧中跑龙套。","subject": "周星驰","time": "1986年","location": None,"participants": ["温兆伦", "郭晋安"],"semanticType": "演出",},},],},
  • 实体标准化prompt:
"""
{"instruction": "input字段包含用户提供的上下文。命名实体字段包含从上下文中提取的命名实体,这些可能是含义不明的缩写、别名或俚语。为了消除歧义,请尝试根据上下文和您自己的知识提供这些实体的官方名称。请注意,具有相同含义的实体只能有一个官方名称。请按照提供的示例中的输出字段格式,以单个JSONArray字符串形式回复,无需任何解释。","example": {"input": "烦躁不安、语妄、失眠酌用镇静药,禁用抑制呼吸的镇静药。3.并发症的处理经抗菌药物治疗后,高热常在24小时内消退,或数日内逐渐下降。若体温降而复升或3天后仍不降者,应考虑SP的肺外感染,如腋胸、心包炎或关节炎等。治疗:接胸腔压力调节管+吸引机负压吸引水瓶装置闭式负压吸引宜连续,如经12小时后肺仍未复张,应查找原因。","named_entities": [{"name": "烦躁不安", "category": "Symptom"},{"name": "语妄", "category": "Symptom"},{"name": "失眠", "category": "Symptom"},{"name": "镇静药", "category": "Medicine"},{"name": "肺外感染", "category": "Disease"},{"name": "胸腔压力调节管", "category": "MedicalEquipment"},{"name": "吸引机负压吸引水瓶装置", "category": "MedicalEquipment"},{"name": "闭式负压吸引", "category": "SurgicalOperation"}],"output": [{"name": "烦躁不安", "category": "Symptom", "official_name": "焦虑不安"},{"name": "语妄", "category": "Symptom", "official_name": "谵妄"},{"name": "失眠", "category": "Symptom", "official_name": "失眠症"},{"name": "镇静药", "category": "Medicine", "official_name": "镇静剂"},{"name": "肺外感染", "category": "Disease", "official_name": "肺外感染"},{"name": "胸腔压力调节管", "category": "MedicalEquipment", "official_name": "胸腔引流管"},{"name": "吸引机负压吸引水瓶装置", "category": "MedicalEquipment", "official_name": "负压吸引装置"},{"name": "闭式负压吸引", "category": "SurgicalOperation", "official_name": "闭式负压引流"}]},"input": $input,"named_entities": $named_entities,
}    """

KAG中的Prompt是连接大模型与知识图谱的“语义桥梁”,通过结构化知识注入、符号逻辑引导和动态上下文适配,解决了传统大模型在垂直领域中的幻觉、逻辑混乱等问题。

3 KAG-Builder

构建器链为一个有向无环图,有三种类型的类,用于不同场景的知识图谱构建:

  • 结构化构建器链:处理结构化数据(如数据库表、CSV文件),Mapping(映射器)->Vectorizer(向量器,可选)->Writer(写入器)
    • mapping:将原始数据按预定义Schema转换为中间格式;
    • vectorizer:将结构化数据编码为向量;
    • writer:将处理后的数据持久化到存储系统。
@KAGBuilderChain.register("structured")
@KAGBuilderChain.register("structured_builder_chain")
class DefaultStructuredBuilderChain(KAGBuilderChain):def __init__(self,mapping: MappingABC,writer: SinkWriterABC,vectorizer: VectorizerABC = None,):self.mapping = mappingself.writer = writerself.vectorizer = vectorizerdef build(self, **kwargs):if self.vectorizer:chain = self.mapping >> self.vectorizer >> self.writerelse:chain = self.mapping >> self.writerreturn chain

如下图所示,为KAG-Builder的处理非结构化数据的流程链:

  • 非结构化构建器链:处理非结构化文本(如 PDF、TXT),Reader(读取器)->Spliter(切分器)->Extractor(提取器)->Vectorizer(向量器)->PostProcess(后处理,可选)->Writer(写入器):
    • reader:加载原始文件,生成Chunk或SubGraph;
    • spliter:将大Chunk分割为更小的chunk;
    • extractor:从文本中抽取实体、关系、事件,生成SubGraph;
    • vectorizer:将文本或图谱编码为向量;
    • post_process:优化图谱结构;
    • writer:将构建好的知识图谱数据写入到指定的存储介质中。
@KAGBuilderChain.register("unstructured")
@KAGBuilderChain.register("unstructured_builder_chain")
class DefaultUnstructuredBuilderChain(KAGBuilderChain):def __init__(self,reader: ReaderABC,splitter: SplitterABC = None,extractor: ExtractorABC = None,vectorizer: VectorizerABC = None,writer: SinkWriterABC = None,post_processor: PostProcessorABC = None,):self.reader = readerself.splitter = splitterself.extractor = extractorself.vectorizer = vectorizerself.post_processor = post_processorself.writer = writerdef build(self, **kwargs):chain = self.reader >> self.splitterif self.extractor:chain = chain >> self.extractorif self.vectorizer:chain = chain >> self.vectorizerif self.post_processor:chain = chain >> self.post_processorif self.writer:chain = chain >> self.writerreturn chain
  • 外部知识注入链:将外部知识图谱(如 Wikidata)合并到当前图谱,ExternalGraphLoader(外部图谱加载器)->Vectorizer(向量器,可选)->Writer(写入器):

    • externalgraph:读取外部图谱数据;

    • vectorizer:编码外部图谱;

    • writer:将外部图谱与当前图谱融合。

@KAGBuilderChain.register("domain_kg_inject_chain")
class DomainKnowledgeInjectChain(KAGBuilderChain):def __init__(self,external_graph: ExternalGraphLoaderABC,writer: SinkWriterABC,vectorizer: VectorizerABC = None,):self.external_graph = external_graphself.writer = writerself.vectorizer = vectorizerdef build(self, **kwargs):if self.vectorizer:chain = self.external_graph >> self.vectorizer >> self.writerelse:chain = self.external_graph >> self.writerreturn chain

3.1 reader

reader基于抽象基类ReaderABC,用于将文件(pdf、txt、csv等类型)内容解析为Chunk对象,以下是Reader文件中TXTReader的代码:

@ReaderABC.register("txt")
@ReaderABC.register("txt_reader")
class TXTReader(ReaderABC):def _invoke(self, input: Input, **kwargs) -> List[Output]:if not input:raise ValueError("Input cannot be empty")try:if os.path.exists(input):with open(input, "r", encoding="utf-8") as f:content = f.read()else:content = inputexcept OSError as e:raise IOError(f"Failed to read file: {input}") from ebasename, _ = os.path.splitext(os.path.basename(input))chunk = Chunk(id=generate_hash_id(input),name=basename,content=content,)return [chunk]

TXTReader为处理文本读取的主要方法,此方法读取输入的内容(可以是文件路径或文本内容)并将其转换为块,其中:

  • 参数:
    • input (输入):输入字符串,可以是文本文件或直接文本内容的路径。
    •  **kwargs:其他关键字参数,当前未使用,但保留以备将来扩展。
  • 返回:
    •  List[Output]:包含 Chunk 对象的列表,每个对象代表读取的一段文本。
  • 异常处理:
    • ValueError:如果输入为空。
    • IOError:如果读取输入指定的文件时出现问题。

3.2 splitter

splitter能够根据指定的长度和窗口大小将文本拆分为多个小块。它支持按句子边界拆分和严格按长度拆分两种模式,并且能够处理表格数据。

LengthSplitter类继承自 BaseTableSplitter,来实现根据文本长度和窗口大小将文本拆分为小块的功能。分为以下几个步骤:

  • __init__:构造函数,初始化拆分长度、窗口长度和是否严格按长度拆分:
    • split_length (int):每个 chunk 的最大长度;
    • window_length (int):块之间的重叠长度;
    • strict_length (bool):是否严格按长度拆分而不保留句子,默认为 False。
@SplitterABC.register("length")
@SplitterABC.register("length_splitter")
class LengthSplitter(BaseTableSplitter):def __init__(self,split_length: int = 500,window_length: int = 100,strict_length: bool = False,):super().__init__()self.split_length = split_lengthself.window_length = window_lengthself.strict_length = strict_length
  • input_types output_types:定义输入和输出类型均为Chunk:
    @propertydef input_types(self) -> Type[Input]:return Chunk@propertydef output_types(self) -> Type[Output]:return Chunk
  • chunk_breakdown:递归地将文本块拆分为更小的块:
    def chunk_breakdown(self, chunk):chunks = self.logic_break(chunk)if chunks:res_chunks = []for c in chunks:res_chunks.extend(self.chunk_breakdown(c))else:res_chunks = self.slide_window_chunk(chunk, self.split_length, self.window_length)return res_chunks
  • split_sentence:将文本按句子分隔符拆分为句子列表:
    • 参数:content (str),需要拆分为句子的内容;
    • 返回值:List[str],句子列表。
    def split_sentence(self, content):sentence_delimiters = ".。??!!" if KAG_PROJECT_CONF.language == "en" else "。?!"output = []start = 0for idx, char in enumerate(content):if char in sentence_delimiters:end = idxtmp = content[start : end + 1].strip()if len(tmp) > 0:output.append(tmp.strip())start = idx + 1res = content[start:].strip()if len(res) > 0:output.append(res)return output
  • slide_window_chunk:使用滑动窗口方法将内容拆分为多个块:
    • 参数:
      • org_chunk (Chunk):要拆分的原始Chunk;
      • chunk_size (int, optional):每个块的最大大小,默认值为2000;
      •  window_length (int,可选):块之间的重叠长度,默认值为300;
      •  sep (str,可选):用于连接句子的分隔符,默认为 “n”;
    • 返回: List[Chunk],Chunk对象的列表。
    def slide_window_chunk(self,org_chunk: Chunk,chunk_size: int = 2000,window_length: int = 300,sep: str = "\n",) -> List[Chunk]:if org_chunk.type == ChunkTypeEnum.Table:table_chunks = self.split_table(org_chunk=org_chunk, chunk_size=chunk_size, sep=sep)if table_chunks is not None:return table_chunks# 如果启用严格长度切分,不按句子切分if getattr(self, "strict_length", False):return self.strict_length_chunk(org_chunk)content = self.split_sentence(org_chunk.content)splitted = []cur = []cur_len = 0for sentence in content:if cur_len + len(sentence) > chunk_size:if cur:splitted.append(cur)tmp = []cur_len = 0for item in cur[::-1]:if cur_len >= window_length:breaktmp.append(item)cur_len += len(item)cur = tmp[::-1]cur.append(sentence)cur_len += len(sentence)if len(cur) > 0:splitted.append(cur)output = []for idx, sentences in enumerate(splitted):chunk = Chunk(id=generate_hash_id(f"{org_chunk.id}#{idx}"),name=f"{org_chunk.name}_split_{idx}",content=sep.join(sentences),type=org_chunk.type,chunk_size=chunk_size,window_length=window_length,**org_chunk.kwargs,)output.append(chunk)return output
  • strict_length_chunk:严格按长度将内容拆分为块,而不保留句子边界:
def strict_length_chunk(self,org_chunk: Chunk,) -> List[Chunk]:content = org_chunk.contenttotal_length = len(content)output = []position = 0chunk_index = 0while position < total_length:# 计算当前chunk的内容chunk_content = content[position : position + self.split_length]# 创建新的Chunk对象chunk = Chunk(id=generate_hash_id(f"{org_chunk.id}#{chunk_index}"),name=f"{org_chunk.name}_split_{chunk_index}",content=chunk_content,type=org_chunk.type,chunk_size=self.split_length,window_length=self.window_length,**org_chunk.kwargs,)output.append(chunk)# 更新位置和索引position = position + self.split_length - self.window_lengthchunk_index += 1# 确保不会出现负索引if position < 0:position = 0return output
  • _invoke:根据指定的长度和窗口大小调用输入块的拆分:
    • 参数:
      • input (Chunk):要拆分的chunk;
      •  **kwargs:其他关键字参数,当前未使用,但保留以备将来扩展。
    • 返回: List[Output],拆分作生成的Chunk对象列表。
    def _invoke(self, input: Chunk, **kwargs) -> List[Output]:cutted = []if isinstance(input, list):for item in input:cutted.extend(self.slide_window_chunk(item, self.split_length, self.window_length))else:cutted.extend(self.slide_window_chunk(input, self.split_length, self.window_length))return cutted

3.3 extractor

extractor用于从文本中提取实体、关系和事件,并构建知识图谱的子图。

下面是基于架构约束extractor的相关代码,其中:

  • 架构约束extractor的功能:执行知识提取以实施架构约束,包括实体、事件及其边缘,实体和事件的类型及其各自的属性会自动从项目的架构继承;
  • __init__:构造函数,初始化实例;
  • input_types:输入类型为splitter划分的文本块;
  • output_types:输出类型为子图;
@ExtractorABC.register("schema_constraint")
@ExtractorABC.register("schema_constraint_extractor")
class SchemaConstraintExtractor(ExtractorABC):def __init__(self,llm: LLMClient,ner_prompt: PromptABC = None,std_prompt: PromptABC = None,relation_prompt: PromptABC = None,event_prompt: PromptABC = None,external_graph: ExternalGraphLoaderABC = None,):super().__init__()self.llm = llmself.schema = SchemaClient(host_addr=KAG_PROJECT_CONF.host_addr, project_id=KAG_PROJECT_CONF.project_id).load()self.ner_prompt = ner_promptself.std_prompt = std_promptself.relation_prompt = relation_promptself.event_prompt = event_promptbiz_scene = KAG_PROJECT_CONF.biz_sceneif self.ner_prompt is None:self.ner_prompt = init_prompt_with_fallback("ner", biz_scene)if self.std_prompt is None:self.std_prompt = init_prompt_with_fallback("std", biz_scene)self.external_graph = external_graph@propertydef input_types(self) -> Type[Input]:return Chunk@propertydef output_types(self) -> Type[Output]:return SubGraph
  • _named_entity_recognition_llm:调用语言模型,执行命名实体识别:
    • 参数:passage,要处理的文本内容;
    • 返回值:ner_result:命名实体识别的结果。
    def _named_entity_recognition_llm(self, passage: str):ner_result = self.llm.invoke({"input": passage}, self.ner_prompt, with_except=False)return ner_result
  • _named_entity_recognition_process:对命名实体识别的结果进行去重和整合:
    • 参数:
      • passage:原始文本内容;
      • ner_resulet:命名实体识别的结果。
    • 返回值:output,处理后的实体列表,包含唯一的实体信息。
    def _named_entity_recognition_process(self, passage, ner_result):if self.external_graph:extra_ner_result = self.external_graph.ner(passage)else:extra_ner_result = []output = []dedup = set()for item in extra_ner_result:name = item.nameif name not in dedup:dedup.add(name)output.append({"name": name,"category": item.label,"properties": item.properties,})for item in ner_result:name = item.get("name", None)category = item.get("category", None)if name is None or category is None:continueif not isinstance(name, str):continueif name not in dedup:dedup.add(name)output.append(item)return output
  • named_entity_recognition:对给定的文本段落执行命名实体识别:
    • 参数: passage (str),要对其执行命名实体识别的文本;
    • 返回值: 命名实体识别作的结果。
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=10, max=60),reraise=True,)def named_entity_recognition(self, passage: str):ner_result = self._named_entity_recognition_llm(passage)return self._named_entity_recognition_process(passage, ner_result)
  • named_entity_standardization:对给定的文本段落和实体执行命名实体标准化:
    • 参数:
      • passage (str):文本段落;
      • entities (List[Dict]):要标准化的实体列表。
    • 返回值:命名实体标准化作的结果。
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=10, max=60),reraise=True,)def named_entity_standardization(self, passage: str, entities: List[Dict]):return self.llm.invoke({"input": passage, "named_entities": entities},self.std_prompt,with_except=False,)
  • relations_extraction:对给定的文本段落和实体执行关系提取:
    • 参数:
      • passage (str):文本段落;
      • entities (List[Dict]):实体列表。
    • 返回值: 关系提取作的结果。
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=10, max=60),reraise=True,)def relations_extraction(self, passage: str, entities: List[Dict]):if self.relation_prompt is None:logger.debug("Relation extraction prompt not configured, skip.")return []return self.llm.invoke({"input": passage, "entity_list": entities},self.relation_prompt,with_except=False,)
  • event_extraction: 对给定的文本段落执行事件提取:
    • 参数:passage (str),文本段落;
    • 返回:事件提取作的结果。
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=10, max=60),reraise=True,)def event_extraction(self, passage: str):if self.event_prompt is None:logger.debug("Event extraction prompt not configured, skip.")return []return self.llm.invoke({"input": passage}, self.event_prompt, with_except=False)
  • parse_nodes_and_edges:从实体列表中解析节点和边:
    • 参数:entities (List[Dict]),实体列表;
    • 返回:Tuple[List[Node], List[Edge]],解析的节点和边。
    def parse_nodes_and_edges(self, entities: List[Dict], category: str = None):graph = SubGraph([], [])entities = copy.deepcopy(entities)root_nodes = []for record in entities:if record is None:continueif isinstance(record, str):record = {"name": record}s_name = record.get("name", "")s_label = record.get("category", category)properties = record.get("properties", {})# 有时,名称和/或标签会放置在属性中。if not s_name:s_name = properties.pop("name", "")if not s_label:s_label = properties.pop("category", "")if not s_name or not s_label:continues_name = processing_phrases(s_name)root_nodes.append((s_name, s_label))tmp_properties = copy.deepcopy(properties)spg_type = self.schema.get(s_label)for prop_name, prop_value in properties.items():if prop_value is None:tmp_properties.pop(prop_name)continueif prop_name in spg_type.properties:prop_schema = spg_type.properties.get(prop_name)o_label = prop_schema.object_type_name_enif o_label not in BASIC_TYPES:# 弹出并将属性转换为节点和边if not isinstance(prop_value, list):prop_value = [prop_value](new_root_nodes,new_nodes,new_edges,) = self.parse_nodes_and_edges(prop_value, o_label)graph.nodes.extend(new_nodes)graph.edges.extend(new_edges)# 将当前节点连接到属性生成的节点for node in new_root_nodes:graph.add_edge(s_id=s_name,s_label=s_label,p=prop_name,o_id=node[0],o_label=node[1],)tmp_properties.pop(prop_name)record["properties"] = tmp_properties# 注意:对于转换为节点/边的属性,我们会保留原始属性值的副本。graph.add_node(id=s_name, name=s_name, label=s_label, properties=properties)if "official_name" in record:official_name = processing_phrases(record["official_name"])if official_name != s_name:graph.add_node(id=official_name,name=official_name,label=s_label,properties=dict(properties),)graph.add_edge(s_id=s_name,s_label=s_label,p="OfficialName",o_id=official_name,o_label=s_label,)return root_nodes, graph.nodes, graph.edges
  • add_relations_to_graph:根据关系和实体列表向子图添加边:
    • 参数:
      • sub_graph (SubGraph):要向其添加边的子图;
      • entities (List[Dict]): 实体列表,用于查找类别信息;
      • relations (List[list]): 一个关系列表,每个关系代表要添加到子图中的一个关系。
    •  返回:构造的子图。
    @staticmethoddef add_relations_to_graph(sub_graph: SubGraph, entities: List[Dict], relations: List[list]):for rel in relations:if len(rel) != 5:continues_name, s_category, predicate, o_name, o_category = rels_name = processing_phrases(s_name)sub_graph.add_node(s_name, s_name, s_category)o_name = processing_phrases(o_name)sub_graph.add_node(o_name, o_name, o_category)edge_type = to_camel_case(predicate)if edge_type:sub_graph.add_edge(s_name, s_category, edge_type, o_name, o_category)return sub_graph
  • add_chunk_to_graph:将 Chunk 对象与子图关联,将其添加为节点并将其与现有节点连接:
    • 参数:
      • sub_graph (SubGraph):要向其添加数据块信息的子图;
      • chunk (Chunk):包含文本和元数据的 chunk 对象。
    • 返回:构造的子图。
    @staticmethoddef add_chunk_to_graph(sub_graph: SubGraph, chunk: Chunk):for node in sub_graph.nodes:sub_graph.add_edge(node.id, node.label, "source", chunk.id, CHUNK_TYPE)sub_graph.add_node(id=chunk.id,name=chunk.name,label=CHUNK_TYPE,properties={"id": chunk.id,"name": chunk.name,"content": f"{chunk.name}\n{chunk.content}",**chunk.kwargs,},)sub_graph.id = chunk.idreturn sub_graph
  • assemble_subgraph:从给定的 chunk、entities、events 和 relations 中组装一个 subgraph:
    • 参数:
      • chunk (Chunk):块对象;
      • entities (List[Dict]):实体列表;
      • events (List[Dict]): 事件列表。
    • 返回:构造的子图。
    def assemble_subgraph(self,chunk: Chunk,entities: List[Dict],relations: List[list],events: List[Dict],):graph = SubGraph([], [])_, entity_nodes, entity_edges = self.parse_nodes_and_edges(entities)graph.nodes.extend(entity_nodes)graph.edges.extend(entity_edges)_, event_nodes, event_edges = self.parse_nodes_and_edges(events)graph.nodes.extend(event_nodes)graph.edges.extend(event_edges)self.add_relations_to_graph(graph, entities, relations)self.add_chunk_to_graph(graph, chunk)return graph
  • append_official_name:将正式名称附加到实体:
    • 参数:
      • source_entities (List[Dict]):源实体的列表;
      • entities_with_official_name (List[Dict]):具有正式名称的实体列表。
    def append_official_name(self, source_entities: List[Dict], entities_with_official_name: List[Dict]):tmp_dict = {}for tmp_entity in entities_with_official_name:name = tmp_entity["name"]category = tmp_entity["category"]official_name = tmp_entity["official_name"]key = f"{category}{name}"tmp_dict[key] = official_namefor tmp_entity in source_entities:name = tmp_entity["name"]category = tmp_entity["category"]key = f"{category}{name}"if key in tmp_dict:official_name = tmp_dict[key]tmp_entity["official_name"] = official_name
  • postprocess_graph:通过合并具有相同名称和标签的节点来对图形进行后处理:
    • 参数:graph (SubGraph),要后处理的图形;
    • 返回:后处理图。
    def postprocess_graph(self, graph):try:all_node_properties = {}for node in graph.nodes:id_ = node.idname = node.namelabel = node.labelkey = (id_, name, label)if key not in all_node_properties:all_node_properties[key] = node.propertieselse:all_node_properties[key].update(node.properties)new_graph = SubGraph([], [])for key, node_properties in all_node_properties.items():id_, name, label = keynew_graph.add_node(id=id_, name=name, label=label, properties=node_properties)new_graph.edges = graph.edgesreturn new_graphexcept:return graph
  • _invoke:在给定的输入上调用提取器:
    • 参数:
      • input (Input):输入数据;
      • **kwargs:其他关键字参数。
    • 返回: List[Output],输出结果的列表。
    def _invoke(self, input: Input, **kwargs) -> List[Output]:title = input.namepassage = title + "\n" + input.contentout = []entities = self.named_entity_recognition(passage)events = self.event_extraction(passage)named_entities = []for entity in entities:named_entities.append({"name": entity["name"], "category": entity["category"]})relations = self.relations_extraction(passage, named_entities)std_entities = self.named_entity_standardization(passage, named_entities)self.append_official_name(entities, std_entities)subgraph = self.assemble_subgraph(input, entities, relations, events)out.append(self.postprocess_graph(subgraph))logger.debug(f"input passage:\n{passage}")logger.debug(f"output graphs:\n{out}")return out

其中,_named_entity_recognition_llm、named_entity_recognition、named_entity_standardization、relations_extraction、event_extraction、_invoke等方法除了上述的同步方法外,还有对应的异步方法。

3.4 vectorizer

vectorizer用于为知识图谱中的节点属性生成嵌入向量,可以增强知识图谱的语义表示,为后续的查询和推理提供支持。

其中,BatchVectorizer用于为SubGraph中的节点属性批量生成嵌入向量的类,通过批量处理提高了效率,并支持同步和异步操作模式。

BatchVectorizer属性:

  • project_id (int):与SubGraph关联的工程的ID;
  • vec_meta (defaultdict):SubGraph中向量字段的元数据;
  • vectorize_model (VectorizeModelABC):用于生成嵌入向量的模型;
  • batch_size (int):处理节点的批处理的大小,默认值为 32。
@VectorizerABC.register("batch")
@VectorizerABC.register("batch_vectorizer")
class BatchVectorizer(VectorizerABC):def __init__(self,vectorize_model: VectorizeModelABC,batch_size: int = 32,disable_generation: Optional[List[str]] = None,):super().__init__()self.project_id = KAG_PROJECT_CONF.project_id# self._init_graph_store()self.vec_meta = self._init_vec_meta()self.vectorize_model = vectorize_modelself.batch_size = batch_sizeself.disable_generation = disable_generationdef _init_vec_meta(self):vec_meta = defaultdict(list)schema_client = SchemaClient(host_addr=KAG_PROJECT_CONF.host_addr, project_id=self.project_id)spg_types = schema_client.load()for type_name, spg_type in spg_types.items():for prop_name, prop in spg_type.properties.items():if prop_name == "name" or prop.index_type in [# if prop.index_type in [IndexTypeEnum.Vector,IndexTypeEnum.TextAndVector,]:vec_meta[type_name].append(get_vector_field_name(prop_name))return vec_meta

_generate_embedding_vectors:为输入 SubGraph 中的节点生成嵌入向量:

  • 参数:input_subgraph (SubGraph),要为其生成嵌入向量的 SubGraph。
  • 返回:SubGraph,带有生成的嵌入向量的修改后的 SubGraph。
@retry(stop=stop_after_attempt(3), reraise=True)def _generate_embedding_vectors(self, input_subgraph: SubGraph) -> SubGraph:node_list = []node_batch = []for node in input_subgraph.nodes:if not node.id or not node.name:continueproperties = {"id": node.id, "name": node.name}properties.update(node.properties)node_list.append((node, properties))node_batch.append((node.label, properties.copy()))generator = EmbeddingVectorGenerator(self.vectorize_model, self.vec_meta, self.disable_generation)generator.batch_generate(node_batch, self.batch_size)for (node, properties), (_node_label, new_properties) in zip(node_list, node_batch):for key, value in properties.items():if key in new_properties and new_properties[key] == value:del new_properties[key]node.properties.update(new_properties)return input_subgraph

_invoke:调用输入 SubGraph 的嵌入向量的生成:

  • 参数:
    • input_subgraph (Input):要为其生成嵌入向量的 SubGraph;
    • **kwargs:其他关键字参数,当前未使用,但保留以备将来扩展。
  • 返回:List[Output],包含修改后的 SubGraph 和生成的嵌入向量的列表。
    def _invoke(self, input_subgraph: Input, **kwargs) -> List[Output]:modified_input = self._generate_embedding_vectors(input_subgraph)return [modified_input]

3.5 writer

writer用于将子图写入知识图谱存储,在知识图谱构建流程中负责最终的数据持久化。

KGWriter是用于将 SubGraph 写入知识图 (KG) 存储的类。此类继承自 SinkWriterABC,并提供将 SubGraph 写入知识图存储系统的功能,它支持upsert和delete等操作:

class AlterOperationEnum(str, Enum):Upsert = "UPSERT"Delete = "DELETE"@SinkWriterABC.register("kg", as_default=True)
@SinkWriterABC.register("kg_writer", as_default=True)
class KGWriter(SinkWriterABC):def __init__(self, project_id: int = None, delete: bool = False, **kwargs):super().__init__(**kwargs)if project_id is None:self.project_id = KAG_PROJECT_CONF.project_idelse:self.project_id = project_idself.client = GraphClient(host_addr=KAG_PROJECT_CONF.host_addr, project_id=project_id)self.delete = delete@propertydef input_types(self) -> Type[Input]:return SubGraph@propertydef output_types(self) -> Type[Output]:return None

format_label:通过添加项目命名空间(如果尚不存在)来设置标签的格式:

  • 参数: label (str),需要格式化的标签;
  • 返回: str,格式化的标签。
    def format_label(self, label: str):namespace = KAG_PROJECT_CONF.namespaceif label.split(".")[0] == namespace:return labelreturn f"{namespace}.{label}"
  • standarlize_graph:标准化子图,格式化标签并将非字符串属性转换为JSON字符串:

    def standarlize_graph(self, graph):for node in graph.nodes:node.label = self.format_label(node.label)for edge in graph.edges:edge.from_type = self.format_label(edge.from_type)edge.to_type = self.format_label(edge.to_type)for node in graph.nodes:for k, v in node.properties.items():if k.startswith("_"):continueif not isinstance(v, str):node.properties[k] = json.dumps(v, ensure_ascii=False)for edge in graph.edges:for k, v in edge.properties.items():if k.startswith("_"):continueif not isinstance(v, str):edge.properties[k] = json.dumps(v, ensure_ascii=False)return graph

_invoke:在图形存储上调用指定的作(upsert 或 delete)。

  • 参数:
    • input (Input):表示要作的子图的 input 对象;
    • alter_operation (str):要执行的作类型(Upsert 或 Delete),默认为 Upsert;
    • lead_to_builder (bool):启用潜在客户到事件推断生成器,默认为 False。
  • 返回:List[Output],输出对象列表(当前始终为 [None])。
    def _invoke(self,input: Input,alter_operation: str = AlterOperationEnum.Upsert,lead_to_builder: bool = False,**kwargs,) -> List[Output]:if self.delete:self.client.write_graph(sub_graph=input.to_dict(),operation=AlterOperationEnum.Delete,lead_to_builder=lead_to_builder,)else:input = self.standarlize_graph(input)logger.debug(f"final graph to write: {input}")self.client.write_graph(sub_graph=input.to_dict(),operation=alter_operation,lead_to_builder=lead_to_builder,)return [input]

invoke:处理输入数据并调用_invoke执行写入操作,支持检查点功能:

    def invoke(self, input: Input, **kwargs) -> List[Union[Output, BuilderComponentData]]:if isinstance(input, BuilderComponentData):input_data = input.datainput_key = input.hash_keyelse:input_data = inputinput_key = Noneif self.inherit_input_key:output_key = input_keyelse:output_key = Nonewrite_ckpt = kwargs.get("write_ckpt", True)if write_ckpt and self.checkpointer:# found existing data in checkpointerif input_key and self.checkpointer.exists(input_key):return []# not foundoutput = self._invoke(input_data, **kwargs)# We only record the data key to avoid embeddings from taking up too much disk space.if input_key:self.checkpointer.write_to_ckpt(input_key, input_key)return [BuilderComponentData(x, output_key) for x in output]else:output = self._invoke(input_data, **kwargs)return [BuilderComponentData(x, output_key) for x in output]

_handle:为 SPGServer 提供的调用接口:

  • 参数:
    • input (Dict):表示要作的子图的输入字典;
    • alter_operation (str):要执行的作类型(Upsert 或 Delete);
    • **kwargs:其他关键字参数。
  • 返回:None,此方法当前返回 None。
    def _handle(self, input: Dict, alter_operation: str, **kwargs):_input = self.input_types.from_dict(input)_output = self.invoke(_input, alter_operation)  # noqareturn None

3.6 可选组件

除了上述的常用组件外,KAG-Builder还提供一些可选组件,例如:

  • post_processor:对构建好的知识图谱进行后处理,如数据清洗、格式转换、质量评估等;
  • mapping:实现不同数据源、不同格式之间的映射和转换;
  • scanner:对数据进行快速扫描和预处理,识别数据中的关键信息和结构;
  • external_graph:用于与外部知识图谱进行交互和融合,可以导入或导出知识图谱数据。

4 示例

如下图所示: 

当给KAG一个txt的文本文件,

  • 首先,reader将这个文件读入chunk中,例如:

亚伯拉罕·林肯采取了温和的立场,旨在尽快使南方回归联邦,而国会中的激进共和党人则寻求更强有力的措施来提升非裔美国人的权利。共和党重新夺回了他们在2006年中期选举中失去的众议院控制权,净增了63个席位。

  • splitter再将这个大chunk,分为更小的chunk:

亚伯拉罕·林肯采取了温和的立场,旨在尽快使南方回归联邦,而国会中的激进共和党人则寻求更强有力的措施来提升非裔美国人的权利。

共和党重新夺回了他们在2006年中期选举中失去的众议院控制权,净增了63个席位。

  • 然后,由extractor进行信息提取,从切分后的文本中提取实体、关系等知识图谱的核心元素:

  • 再由aligner进行知识对齐,将提取器结果与语义架构标准化对齐,包括文档语义对齐和概念语义图对齐:

  • 最后,vectorizer将结果向量化,再由writer进行图谱写入,将子图写入图存储数据库:

总结

通过对 Schema、Prompt 以及 KAG-Builder 的学习,我对知识图谱构建过程中的数据定义、语言模型引导和构建流程管理有了更深入的理解:

  1. Schema是定义知识图谱语义框架和数据组织结构的核心组件,Schema 是知识图谱的架构定义,它描述了实体和关系的类型及其属性。
  2. Prompt 是自然语言处理任务中用于引导模型生成符合预期输出的模板。在 KAG项目中,Prompt 被广泛应用于各种任务,如命名实体识别(NER)、关系抽取、事件抽取等。通过设计合适的 Prompt,可以有效地指导语言模型理解任务要求并生成准确的结构化输出。
  3. KAG-Builder 是 KAG 项目中用于构建知识图谱的核心组件,它由多个功能模块组成,包括 Reader(读取器)、Splitter(切分器)、Extractor(提取器)、Vectorizer(向量化器)、PostProcessor(后处理器)和 Writer(写入器)。这些模块通过构建链(Builder Chain)连接成一个完整的工作流程,负责从数据读取到知识图谱构建的全过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/81686.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/81686.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/81686.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Gitea windows服务注册,服务启动、停止、重启脚本

修改配置文件 查看COMPUTERNAME echo %COMPUTERNAME%进入配置文件D:\gitea\custom\conf\app.ini&#xff0c;将 Gitea 设置为以本地系统用户运行 如果结果是 USER-PC&#xff0c;那么 RUN_USER USER-PC$ RUN_USER COMPUTERNAME$SQLite3 PATH配置&#xff0c;更改为包含完整…

矿泉水瓶的绘制

1.制作中心矩形&#xff0c;大小为60&#xff0c;注意设置矩形的两条边相等 2.点击拉伸&#xff0c;高度为150mm 3.使用圆角命令&#xff0c;点击连接到开始面&#xff0c;同时选中4条边&#xff0c;进行圆角转化&#xff0c;圆角大小为10mm&#xff0c;点击多半径圆角&#xf…

【程序+论文】大规模新能源并网下的火电机组深度调峰经济调度

目录 1 主要内容 讲解重点 2 讲解视频及代码 1 主要内容 该视频为《大规模新能源并网下的火电机组深度调峰经济调度》代码讲解内容&#xff0c;该程序有完全对照的论文&#xff0c;以改进IEEE30节点作为研究对象&#xff0c;系统包括5个火电机组和2个新能源机组&#xff0c;…

​​工业机器人智能编程:从示教器到AI自主决策​​

工业机器人智能编程:从示教器到AI自主决策 引言 工业机器人作为智能制造的核心装备,其编程方式正经历革命性变革。传统示教器编程效率低下,平均每个路径点需要30秒人工示教,而复杂轨迹编程可能耗时数周。随着AI技术的发展,工业机器人编程正朝着"所见即所得"的…

n8n 构建一个 ReAct AI Agent 示例

n8n 构建一个 ReAct AI Agent 示例 0. 引言1. 详细步骤创建一个 "When Executed by Another Workflow"创建一个 "Edit Fields (Set)"再创建一个 "Edit Fields (Set)"创建一个 HTTP Request创建一个 If 节点在 true 分支创建一个 "Edit Fiel…

Monorepo项目多项目一次性启动工具对比与实践

Monorepo项目多项目一次性启动工具对比与实践 在现代软件开发中&#xff0c;Monorepo&#xff08;单一仓库&#xff09;模式越来越受到开发者的青睐。Monorepo将多个相关的项目或包集中在一个仓库中进行管理&#xff0c;方便依赖共享、代码复用和统一发布。在Monorepo项目开发…

笔记整理六----OSPF协议

OSPF 动态路由的分类&#xff1a; 1.基于网络范围进行划分--将网络本身划分为一个个AS&#xff08;自治系统---方便管理和维护&#xff09; 内部网关协议---负责AS内部用户之间互相访问使用的协议 IGP--RIP EIGRP ISIS OSPF 外部网关协议--负责AS之间&#xff08;整个互联网&…

网络编程,使用select()进行简单服务端与客户端通信

这里在Ubuntu环境下演示 一般流程 服务端常用函数&#xff1a; socket()&#xff1a;创建一个新的套接字。bind()&#xff1a;将套接字与特定的IP地址和端口绑定。listen()&#xff1a;使套接字开始监听传入的连接请求。accept()&#xff1a;接受一个传入的连接请求&#xff…

智能决策支持系统的基本概念与理论体系

决策支持系统是管理科学的一个分支&#xff0c;原本与人工智能属于不同的学科范畴&#xff0c;但自20世纪80年代以来&#xff0c;由于专家系统在许多方面取得了成功&#xff0c;于是人们开始考虑把人工智能技术用于计算机管理中来。在用计算机所进行的各种管理中&#xff0c;如…

驱动开发系列55 - Linux Graphics QXL显卡驱动代码分析(二)显存管理

一:概述 前面介绍了当内核检测到匹配的PCI设备后,会调用 qxl_pci_probe 初始化设备,其中会调用qxl_device_init 来初始化设备,为QXL设备进行内存映射,资源分配,环形缓冲区初始化,IRQ注册等操作,本文展开说说这些细节,以及介绍下QXL的显存管理。 二:QXL设备初始化细节…

洛谷 P1495:【模板】中国剩余定理(CRT)/ 曹冲养猪

【题目来源】 https://www.luogu.com.cn/problem/P1495 https://www.acwing.com/problem/content/225/ 【题目描述】 自从曹冲搞定了大象以后&#xff0c;曹操就开始捉摸让儿子干些事业&#xff0c;于是派他到中原养猪场养猪。可是曹冲满不高兴&#xff0c;于是在工作中马马虎…

配置和使用持久卷

配置和使用持久卷 文章目录 配置和使用持久卷[toc]一、PV与PVC的持久化存储机制二、PV和PVC的生命周期三、创建基于NFS的PV1.准备NFS共享目录2.创建PV 四、基于PVC使用PV1.创建PVC2.使用PVC 五、基于StorageClass实现动态卷制备1.获取NFS服务器的连接信息2.获取nfs-subdir-exte…

FreeRTOS菜鸟入门(十)·消息队列

目录 1. 基本概念 2. 数据存储 3. 运作机制 4. 阻塞机制 4.1 出队阻塞 4.2 入队阻塞 5. 操作示意图 5.1 创建队列 5.2 向队列发送第一个消息 5.3 向队列发送第二个消息 5.4 从队列读取消息 6. 消息队列控制块 7. 消息队列常用函数 7.1 消息队列创建…

java 洛谷题单【算法2-2】常见优化技巧

P1102 A-B 数对 解题思路 输入读取与初始化&#xff1a; 使用 Scanner 读取输入。n 表示数组的长度&#xff0c;c 表示目标差值。使用一个 HashMap 存储数组中每个数字及其出现的次数&#xff0c;方便快速查找。数组 a 用于存储输入的数字。 构建哈希映射&#xff1a; 遍历数…

视频转GIF

视频转GIF 以下是一个使用 Python 将视频转换为 GIF 的脚本&#xff0c;使用了 imageio 和 opencv-python 库&#xff1a; import cv2 import imageio import numpy as np """将视频转换为GIF图参数:video_path -- 输入视频的路径gif_path -- 输出GIF的路径fp…

计算机网络:详解TCP协议(四次握手三次挥手)

目录 1.Tcp协议介绍 1.1 Tcp协议层级 1.2 TCP协议的格式 2. 确认应答机制 2.1 确认应答 2.2 序号字段 2.3 捎带应答 3. 流量控制 4. 三次握手 四次挥手 4.1 认识标志位 4.2 简单认识 4.3 三次挥手 4.4 四次挥手 1.Tcp协议介绍 1.1 Tcp协议层级 计算机网络&#x…

小程序 IView WeappUI组件库(简单增删改查)

IView Weapp 微信小程序UI组件库&#xff1a;https://weapp.iviewui.com/components/card IView Weapp.png 快速上手搭建 快速上手.png iView Weapp 的代码 将源代码下载下来&#xff0c;然后将dict放到自己的项目中去。 iView Weapp 的代码.png 小程序中添加iView Weapp 将di…

用java实现一个简单的sql select 解析器,无需第三方依赖,完全从0开始

以下是一个简单的 SQL SELECT 解析器的 Java 实现&#xff0c;支持单表查询和基本条件过滤。代码包含词法分析和语法分析模块&#xff0c;并支持以下语法&#xff1a; SELECT column1, column2 FROM table WHERE column3 5 完整代码 1. Token 类型定义 (TokenType.java) pu…

阿里云 CentOS YUM 源配置指南

阿里云 CentOS YUM 源配置指南 在使用 CentOS 7 时&#xff0c;由于 CentOS 官方源停止维护等原因&#xff0c;yum install 命令可能会报错 “Cannot find a valid baseurl for repo: centos-sclo-rh/x86_64”。以下是通过更换阿里云源解决该问题的详细步骤。 一、备份原有配…

Learning vtkjs之ThresholdPoints

过滤器 阈值过滤器 介绍 vtkThresholdPoints - 提取满足阈值条件的点 vtkThresholdPoints 是一个过滤器&#xff0c;它从数据集中提取满足阈值条件的点。该条件可以采用三种形式&#xff1a; 1&#xff09;大于特定值&#xff1b; 2) 小于特定值&#xff1b; 3) 在特定值之间…