知识库文档处理解析

本文档解析 PIGX 知识库文档处理的完整流程,基于 LiteFlow 规则引擎 实现。

架构概览

文档处理采用规则引擎编排,将复杂的文档处理流程拆分为独立的组件节点,通过配置文件(document-rule.xml)定义流程编排。

总体流程

flowchart LR
    A[文档上传] --> B[Ingest 接收]
    B --> C[Parse 解析]
    C --> D{异步解析?}
    D -->|是| E[提交异步任务]
    D -->|否| F[Chunk 切片]
    E --> G[等待回调]
    G --> F
    F --> H[Summary 摘要]
    F --> I[Tag 标签]
    H --> J[Embed 向量化]
    I --> J
    J --> K[完成]

核心组件

组件位置职责
DocumentContextcontext/DocumentContext.java流程上下文,传递数据和状态
IngestDocumentCmpcomponent/IngestDocumentCmp.java文档接收(Ingest阶段)
ParseDocumentCmpcomponent/ParseDocumentCmp.java文档解析(Parse阶段)
ChunkDocumentCmpcomponent/ChunkDocumentCmp.java文档切片(Chunk阶段)
EmbedSlicesCmpcomponent/EmbedSlicesCmp.java向量化(Embed阶段)
StatusMarkerCmpcomponent/StatusMarkerCmp.java状态标记

流程配置

规则定义位于 pigx-knowledge/src/main/resources/rules/document-rule.xml

<chain name="uploadDocumentChain">   <!-- 文件上传 -->
<chain name="textInputChain">        <!-- 文本录入 -->
<chain name="excelQaChain">          <!-- Excel QA -->
<chain name="manualQaChain">         <!-- 手动问答 -->
<chain name="retryDocumentChain">    <!-- 重试处理 -->

文档解析器架构

解析器选择流程

flowchart LR
    A[文档] --> B[遍历解析器]
    B --> C{supports?}
    C -->|否| B
    C -->|是| D[加入候选列表]
    D --> E[按优先级排序]
    E --> F[选择最高优先级]
    F --> G[执行解析]

接口定义

位置: support/handler/parse/UploadFileParseHandler.java

public interface UploadFileParseHandler extends Ordered {
    // 判断是否支持该文档类型
    boolean supports(AiDatasetEntity dataset, AiDocumentEntity document);

    // 文件转文本,返回 (状态, 结果/任务ID/错误信息)
    Pair<FileParserStatusEnums, String> file2String(...);

    // 优先级(数值越大优先级越高)
    int getOrder();
}

解析器类型

解析器优先级支持格式解析方式
MineruCloudUploadFileParseHandler200DOCX, PDF, PPTX, PNG, JPGMineru 云服务(异步)
PdfAIUploadFileParseHandler100PDFPDFBox → AI视觉OCR
Office2MdUploadFileParseHandler100DOCX, PDF, PPTX, XLSXMarkitdown 转换服务
PdfOcrUploadFileParseHandler0PDFPDFBox → UmiOCR
OfficeUploadFileParseHandler0DOC, DOCX, PPT, PPTX, XLS, XLSXApache POI
ImageOcrUploadFileParseHandler0PNG, JPG, JPEGUmiOCR
TextUploadFileParseHandler0TXT, MDTextDocumentParser

解析状态

状态说明后续流程
PARSE_SUCCESS解析成功进入 Chunk 阶段
PARSE_FAIL解析失败标记失败
ASYNC_PARSING异步解析中等待回调

流程阶段详解

1. Ingest 阶段 - 文档接收

组件: IngestDocumentCmp.java

flowchart LR
    A[请求入口] --> B{文档类型?}
    B -->|文件上传| C[ingestDocument]
    B -->|文本录入| D[ingestText]
    B -->|Excel QA| E[ingestExcelQa]
    B -->|手动问答| F[ingestManualQa]
    C --> G[加载文档实体]
    D --> G
    E --> G
    F --> G
    G --> H[加载知识库配置]
    H --> I[初始化上下文]

2. Parse 阶段 - 文档解析

组件: ParseDocumentCmp.java

flowchart LR
    A[选择解析器] --> B[执行解析]
    B --> C{解析状态?}
    C -->|SUCCESS| D[数据清洗]
    C -->|ASYNC| E[保存任务ID]
    C -->|FAIL| F[记录错误]
    D --> G[设置 rawContent]
    E --> H[提交异步任务]
    H --> I[标记解析中]
    F --> J[标记失败]
    G --> K[进入 Chunk]

Excel QA 解析

flowchart LR
    A[Excel 文件] --> B[POI 读取]
    B --> C[遍历行数据]
    C --> D[提取 Q/A 列]
    D --> E[构建 QaItem]
    E --> F[设置 qaList]
    F --> G[创建切片]

3. Chunk 阶段 - 文档切片

组件: ChunkDocumentCmp.java

flowchart LR
    A[rawContent] --> B{切片策略?}
    B -->|PARAGRAPH| C[按段落分割]
    B -->|CHARACTER| D[按字符分割]
    B -->|SENTENCE| E[按句子分割]
    B -->|默认| F[递归分割]
    C --> G[生成切片列表]
    D --> G
    E --> G
    F --> G
    G --> H[批量保存]
    H --> I[设置 slices]

切片策略:

策略实现类说明
PARAGRAPHDocumentByParagraphSplitter按段落分割
CHARACTERDocumentByCharacterSplitter按字符数分割
SENTENCEDocumentBySentenceSplitter按句子分割
默认DocumentSplitters.recursive()递归分割

4. Embed 阶段 - 向量化

组件: EmbedSlicesCmp.java

flowchart LR
    A[切片列表] --> B[选择向量化策略]
    B --> C{知识库模式?}
    C -->|Q2Q 客服| D[纯问题向量化]
    C -->|通用| E[文档名+内容+摘要]
    D --> F[构建 TextSegment]
    E --> F
    F --> G[调用 EmbeddingModel]
    G --> H[存入向量库]
    H --> I[更新切片状态]

向量化内容构建:

知识库模式文档来源向量化内容元数据 type
Q2Q(客服)Q&A纯问题"1" (问题)
Q2Q(客服)非Q&A文档名+切片内容"1" (问题)
通用模式任意文档名+切片内容+摘要"0" (答案)

向量化策略

位置: support/handler/rag/strategy/impl/DefaultEmbeddingStrategy.java

@Override
public void processEmbedding(AiDocumentEntity documentEntity, AiDatasetEntity aiDataset,
                             List<AiSliceEntity> sliceEntityList, EmbeddingStore<TextSegment> embeddingStore) {
    ...
    for (AiSliceEntity sliceEntity : sliceEntityList) {
        // 构建向量化内容
        String content = buildEmbeddingContent(documentEntity, aiDataset, sliceEntity);

        // 构建元数据
        Metadata metadata = buildMetadata(documentEntity, sliceEntity, aiDataset);

        // 向量化并存储
        TextSegment textSegment = TextSegment.from(content, metadata);
        Embedding embedding = embeddingModel.embed(textSegment.text()).content();
        embeddingStore.add(sliceEntity.getEmbeddingId(), embedding, textSegment);
    }
}

5. 状态标记

组件: StatusMarkerCmp.java

flowchart LR
    A[处理结果] --> B{状态?}
    B -->|异步解析| C[markParsing]
    B -->|处理完成| D[markCompleted]
    B -->|处理失败| E[markFailed]
    C --> F[ASYNC_PARSING]
    D --> G[SLICED]
    E --> H[FAILED]
节点状态说明
markParsingASYNC_PARSING等待异步解析
markCompletedSLICED处理完成
markFailedFAILED处理失败

流程上下文

位置: context/DocumentContext.java

public class DocumentContext {
    // 输入参数
    private AiDocumentDTO inputPayload;
    private Long documentId;
    private Long datasetId;

    // 处理状态
    private SliceStatusEnums state;
    private String currentStage;

    // 中间结果
    private AiDocumentEntity documentEntity;
    private AiDatasetEntity datasetEntity;
    private String rawContent;           // 解析后的文本
    private List<AiSliceEntity> slices;  // 切片列表
    private List<QaItem> qaList;         // QA列表

    // 解析相关
    private String parserType;
    private String asyncTaskId;
    private FileParserStatusEnums parseStatus;
    ...
}

流程编排示例

文件上传流程

flowchart LR
    A[ingestDocument] --> B[selectParser]
    B --> C[parseDocument]
    C --> D{isAsyncParser?}
    D -->|是| E[submitAsyncTask]
    E --> F[markParsing]
    D -->|否| G{parseSuccess?}
    G -->|是| H[chunkDocument]
    G -->|否| I[markFailed]
    H --> J[summaryDocument]
    H --> K[tagDocument]
    J --> L[embedSlices]
    K --> L
    L --> M[markCompleted]

执行路径:

  1. 同步解析成功: ingest → selectParser → parse → chunk → summary/tag → embed → completed
  2. 异步解析: ingest → selectParser → parse → submitAsync → markParsing (等待回调)
  3. 解析失败: ingest → selectParser → parse → markFailed

Excel QA 流程

flowchart LR
    A[ingestExcelQa] --> B[parseExcelQa]
    B --> C[createQaSlices]
    C --> D[embedSlices]
    D --> E[markCompleted]

向量存储支持

flowchart LR
    A[EmbeddingStoreFactory] --> B{向量库类型?}
    B -->|milvus| C[MilvusEmbeddingStoreFactory]
    B -->|qdrant| D[QdrantEmbeddingStoreFactory]
    B -->|chroma| E[ChromaEmbeddingStoreFactory]
    B -->|pgvector| F[PgVectorEmbeddingStoreFactory]
    B -->|neo4j| G[Neo4jEmbeddingStoreFactory]
    C --> H[DefaultEmbeddingStrategy]
    D --> H
    E --> H
    F --> H
    G --> I[Neo4jEmbeddingStrategy]
向量存储工厂类策略实现
MilvusMilvusEmbeddingStoreFactoryDefaultEmbeddingStrategy
QdrantQdrantEmbeddingStoreFactoryDefaultEmbeddingStrategy
ChromaChromaEmbeddingStoreFactoryDefaultEmbeddingStrategy
PgvectorPgVectorEmbeddingStoreFactoryDefaultEmbeddingStrategy
Neo4jNeo4jEmbeddingStoreFactoryNeo4jEmbeddingStrategy

关键代码位置

功能文件路径
流程规则定义pigx-knowledge/src/main/resources/rules/document-rule.xml
上下文定义support/liteflow/document/context/DocumentContext.java
Ingest组件support/liteflow/document/component/IngestDocumentCmp.java
Parse组件support/liteflow/document/component/ParseDocumentCmp.java
Chunk组件support/liteflow/document/component/ChunkDocumentCmp.java
Embed组件support/liteflow/document/component/EmbedSlicesCmp.java
解析器接口support/handler/parse/UploadFileParseHandler.java
向量化策略support/handler/rag/strategy/EmbeddingStrategy.java
默认策略实现support/handler/rag/strategy/impl/DefaultEmbeddingStrategy.java