知识库文档处理解析
本文档解析 PIGX 知识库文档处理的完整流程,基于 LiteFlow 规则引擎 实现。
架构概览
文档处理采用规则引擎编排,将复杂的文档处理流程拆分为独立的组件节点,通过配置文件(document-rule.xml)定义流程编排。
总体流程
flowchart LR
A[文档上传] --> B[Ingest 接收]
B --> C[Parse 解析]
C --> D{异步解析?}
D -->|是| E[提交异步任务]
D -->|否| F[Chunk 切片]
E --> G[等待回调]
G --> F
F --> H[Summary 摘要]
F --> I[Tag 标签]
H --> J[Embed 向量化]
I --> J
J --> K[完成]
核心组件
| 组件 | 位置 | 职责 |
|---|
DocumentContext | context/DocumentContext.java | 流程上下文,传递数据和状态 |
IngestDocumentCmp | component/IngestDocumentCmp.java | 文档接收(Ingest阶段) |
ParseDocumentCmp | component/ParseDocumentCmp.java | 文档解析(Parse阶段) |
ChunkDocumentCmp | component/ChunkDocumentCmp.java | 文档切片(Chunk阶段) |
EmbedSlicesCmp | component/EmbedSlicesCmp.java | 向量化(Embed阶段) |
StatusMarkerCmp | component/StatusMarkerCmp.java | 状态标记 |
流程配置
规则定义位于 pigx-knowledge/src/main/resources/rules/document-rule.xml:
<chain name="uploadDocumentChain"> <!-- 文件上传 -->
<chain name="textInputChain"> <!-- 文本录入 -->
<chain name="excelQaChain"> <!-- Excel QA -->
<chain name="manualQaChain"> <!-- 手动问答 -->
<chain name="retryDocumentChain"> <!-- 重试处理 -->
文档解析器架构
解析器选择流程
flowchart LR
A[文档] --> B[遍历解析器]
B --> C{supports?}
C -->|否| B
C -->|是| D[加入候选列表]
D --> E[按优先级排序]
E --> F[选择最高优先级]
F --> G[执行解析]
接口定义
位置: support/handler/parse/UploadFileParseHandler.java
public interface UploadFileParseHandler extends Ordered {
// 判断是否支持该文档类型
boolean supports(AiDatasetEntity dataset, AiDocumentEntity document);
// 文件转文本,返回 (状态, 结果/任务ID/错误信息)
Pair<FileParserStatusEnums, String> file2String(...);
// 优先级(数值越大优先级越高)
int getOrder();
}
解析器类型
| 解析器 | 优先级 | 支持格式 | 解析方式 |
|---|
MineruCloudUploadFileParseHandler | 200 | DOCX, PDF, PPTX, PNG, JPG | Mineru 云服务(异步) |
PdfAIUploadFileParseHandler | 100 | PDF | PDFBox → AI视觉OCR |
Office2MdUploadFileParseHandler | 100 | DOCX, PDF, PPTX, XLSX | Markitdown 转换服务 |
PdfOcrUploadFileParseHandler | 0 | PDF | PDFBox → UmiOCR |
OfficeUploadFileParseHandler | 0 | DOC, DOCX, PPT, PPTX, XLS, XLSX | Apache POI |
ImageOcrUploadFileParseHandler | 0 | PNG, JPG, JPEG | UmiOCR |
TextUploadFileParseHandler | 0 | TXT, MD | TextDocumentParser |
解析状态
| 状态 | 说明 | 后续流程 |
|---|
PARSE_SUCCESS | 解析成功 | 进入 Chunk 阶段 |
PARSE_FAIL | 解析失败 | 标记失败 |
ASYNC_PARSING | 异步解析中 | 等待回调 |
流程阶段详解
1. Ingest 阶段 - 文档接收
组件: IngestDocumentCmp.java
flowchart LR
A[请求入口] --> B{文档类型?}
B -->|文件上传| C[ingestDocument]
B -->|文本录入| D[ingestText]
B -->|Excel QA| E[ingestExcelQa]
B -->|手动问答| F[ingestManualQa]
C --> G[加载文档实体]
D --> G
E --> G
F --> G
G --> H[加载知识库配置]
H --> I[初始化上下文]
2. Parse 阶段 - 文档解析
组件: ParseDocumentCmp.java
flowchart LR
A[选择解析器] --> B[执行解析]
B --> C{解析状态?}
C -->|SUCCESS| D[数据清洗]
C -->|ASYNC| E[保存任务ID]
C -->|FAIL| F[记录错误]
D --> G[设置 rawContent]
E --> H[提交异步任务]
H --> I[标记解析中]
F --> J[标记失败]
G --> K[进入 Chunk]
Excel QA 解析
flowchart LR
A[Excel 文件] --> B[POI 读取]
B --> C[遍历行数据]
C --> D[提取 Q/A 列]
D --> E[构建 QaItem]
E --> F[设置 qaList]
F --> G[创建切片]
3. Chunk 阶段 - 文档切片
组件: ChunkDocumentCmp.java
flowchart LR
A[rawContent] --> B{切片策略?}
B -->|PARAGRAPH| C[按段落分割]
B -->|CHARACTER| D[按字符分割]
B -->|SENTENCE| E[按句子分割]
B -->|默认| F[递归分割]
C --> G[生成切片列表]
D --> G
E --> G
F --> G
G --> H[批量保存]
H --> I[设置 slices]
切片策略:
| 策略 | 实现类 | 说明 |
|---|
| PARAGRAPH | DocumentByParagraphSplitter | 按段落分割 |
| CHARACTER | DocumentByCharacterSplitter | 按字符数分割 |
| SENTENCE | DocumentBySentenceSplitter | 按句子分割 |
| 默认 | DocumentSplitters.recursive() | 递归分割 |
4. Embed 阶段 - 向量化
组件: EmbedSlicesCmp.java
flowchart LR
A[切片列表] --> B[选择向量化策略]
B --> C{知识库模式?}
C -->|Q2Q 客服| D[纯问题向量化]
C -->|通用| E[文档名+内容+摘要]
D --> F[构建 TextSegment]
E --> F
F --> G[调用 EmbeddingModel]
G --> H[存入向量库]
H --> I[更新切片状态]
向量化内容构建:
| 知识库模式 | 文档来源 | 向量化内容 | 元数据 type |
|---|
| Q2Q(客服) | Q&A | 纯问题 | "1" (问题) |
| Q2Q(客服) | 非Q&A | 文档名+切片内容 | "1" (问题) |
| 通用模式 | 任意 | 文档名+切片内容+摘要 | "0" (答案) |
向量化策略
位置: support/handler/rag/strategy/impl/DefaultEmbeddingStrategy.java
@Override
public void processEmbedding(AiDocumentEntity documentEntity, AiDatasetEntity aiDataset,
List<AiSliceEntity> sliceEntityList, EmbeddingStore<TextSegment> embeddingStore) {
...
for (AiSliceEntity sliceEntity : sliceEntityList) {
// 构建向量化内容
String content = buildEmbeddingContent(documentEntity, aiDataset, sliceEntity);
// 构建元数据
Metadata metadata = buildMetadata(documentEntity, sliceEntity, aiDataset);
// 向量化并存储
TextSegment textSegment = TextSegment.from(content, metadata);
Embedding embedding = embeddingModel.embed(textSegment.text()).content();
embeddingStore.add(sliceEntity.getEmbeddingId(), embedding, textSegment);
}
}
5. 状态标记
组件: StatusMarkerCmp.java
flowchart LR
A[处理结果] --> B{状态?}
B -->|异步解析| C[markParsing]
B -->|处理完成| D[markCompleted]
B -->|处理失败| E[markFailed]
C --> F[ASYNC_PARSING]
D --> G[SLICED]
E --> H[FAILED]
| 节点 | 状态 | 说明 |
|---|
markParsing | ASYNC_PARSING | 等待异步解析 |
markCompleted | SLICED | 处理完成 |
markFailed | FAILED | 处理失败 |
流程上下文
位置: context/DocumentContext.java
public class DocumentContext {
// 输入参数
private AiDocumentDTO inputPayload;
private Long documentId;
private Long datasetId;
// 处理状态
private SliceStatusEnums state;
private String currentStage;
// 中间结果
private AiDocumentEntity documentEntity;
private AiDatasetEntity datasetEntity;
private String rawContent; // 解析后的文本
private List<AiSliceEntity> slices; // 切片列表
private List<QaItem> qaList; // QA列表
// 解析相关
private String parserType;
private String asyncTaskId;
private FileParserStatusEnums parseStatus;
...
}
流程编排示例
文件上传流程
flowchart LR
A[ingestDocument] --> B[selectParser]
B --> C[parseDocument]
C --> D{isAsyncParser?}
D -->|是| E[submitAsyncTask]
E --> F[markParsing]
D -->|否| G{parseSuccess?}
G -->|是| H[chunkDocument]
G -->|否| I[markFailed]
H --> J[summaryDocument]
H --> K[tagDocument]
J --> L[embedSlices]
K --> L
L --> M[markCompleted]
执行路径:
- 同步解析成功: ingest → selectParser → parse → chunk → summary/tag → embed → completed
- 异步解析: ingest → selectParser → parse → submitAsync → markParsing (等待回调)
- 解析失败: ingest → selectParser → parse → markFailed
Excel QA 流程
flowchart LR
A[ingestExcelQa] --> B[parseExcelQa]
B --> C[createQaSlices]
C --> D[embedSlices]
D --> E[markCompleted]
向量存储支持
flowchart LR
A[EmbeddingStoreFactory] --> B{向量库类型?}
B -->|milvus| C[MilvusEmbeddingStoreFactory]
B -->|qdrant| D[QdrantEmbeddingStoreFactory]
B -->|chroma| E[ChromaEmbeddingStoreFactory]
B -->|pgvector| F[PgVectorEmbeddingStoreFactory]
B -->|neo4j| G[Neo4jEmbeddingStoreFactory]
C --> H[DefaultEmbeddingStrategy]
D --> H
E --> H
F --> H
G --> I[Neo4jEmbeddingStrategy]
| 向量存储 | 工厂类 | 策略实现 |
|---|
| Milvus | MilvusEmbeddingStoreFactory | DefaultEmbeddingStrategy |
| Qdrant | QdrantEmbeddingStoreFactory | DefaultEmbeddingStrategy |
| Chroma | ChromaEmbeddingStoreFactory | DefaultEmbeddingStrategy |
| Pgvector | PgVectorEmbeddingStoreFactory | DefaultEmbeddingStrategy |
| Neo4j | Neo4jEmbeddingStoreFactory | Neo4jEmbeddingStrategy |
关键代码位置
| 功能 | 文件路径 |
|---|
| 流程规则定义 | pigx-knowledge/src/main/resources/rules/document-rule.xml |
| 上下文定义 | support/liteflow/document/context/DocumentContext.java |
| Ingest组件 | support/liteflow/document/component/IngestDocumentCmp.java |
| Parse组件 | support/liteflow/document/component/ParseDocumentCmp.java |
| Chunk组件 | support/liteflow/document/component/ChunkDocumentCmp.java |
| Embed组件 | support/liteflow/document/component/EmbedSlicesCmp.java |
| 解析器接口 | support/handler/parse/UploadFileParseHandler.java |
| 向量化策略 | support/handler/rag/strategy/EmbeddingStrategy.java |
| 默认策略实现 | support/handler/rag/strategy/impl/DefaultEmbeddingStrategy.java |