AI工作流节点后端开发手册
本手册以 MCP 节点为例,详细说明如何在 PigX AI Flow 中新增一个自定义节点类型。AI Flow 节点是系统的核心组件,每个节点负责执行特定的业务逻辑。
执行流程概览
flowchart LR
A[AiFlowController.execute<br/>接收请求] --> B[AiFlowService.executeFlow<br/>验证流程&解析DSL]
B --> C{执行模式}
C -->|流式| D[executeStreamFlow<br/>创建SseEmitter+异步执行]
C -->|非流式| E[executeNormalFlow<br/>同步执行]
D --> F[AiFlowProcessor.execute<br/>构建LiteFlow链路]
E --> F
F --> G[LiteFlow引擎<br/>节点组件调度]
G --> H{节点类型}
H -->|流式节点| I[AbstractStreamingNodeComponent<br/>LlmNodeCmp/RagNodeCmp/McpNodeCmp]
H -->|普通节点| J[AbstractAiFlowNodeComponent<br/>HttpNodeCmp/TextNodeCmp/DbNodeCmp]
I --> K[执行节点业务逻辑<br/>sendStreamContent + sendStreamEnd]
J --> L[执行节点业务逻辑<br/>doProcess一次性返回]
K --> M[返回结果]
L --> M
M --> N{响应方式}
N -->|流式| O[SSE推送到前端]
N -->|非流式| P[R.ok返回JSON]
💡关键入口说明
执行流程从 Controller 层开始,经过 Service 层验证和模式选择,最终由 LiteFlow 引擎调度节点组件执行业务逻辑。
| 层级 | 类/方法 | 职责 |
|---|
| Controller层 | AiFlowController.execute() | 接收HTTP POST请求,接收AiFlowExecuteDTO参数 |
| Service层 | AiFlowService.executeFlow() | 验证流程ID,解析DSL,区分流式/非流式模式 |
| 流式执行 | executeStreamFlow() | 创建SseEmitter,设置callback,异步执行 |
| 非流式执行 | executeNormalFlow() | 同步执行,直接返回R<AiFlowResultVO> |
| 流程处理器 | AiFlowProcessor.execute() | 调用AiFlowChainBuilder,启动LiteFlow引擎 |
| 链路构建 | AiFlowChainBuilder.buildAndExecute() | 解析DSL,构建EL表达式,创建上下文 |
| 节点调度 | AbstractAiFlowNodeComponent.process() | 节点生命周期管理,调用具体实现 |
| 具体实现 | XxxNodeCmp.doStreamProcess() / doProcess() | 节点业务逻辑,流式输出或一次性返回 |
AI Flow 架构设计
PigX AI Flow 基于 LiteFlow 规则引擎构建,采用组件化的流式处理架构。
| 组件 | 说明 |
|---|
| LiteFlow 框架 | 底层规则引擎,提供流程编排和执行能力 |
| AbstractAiFlowNodeComponent | 所有节点的抽象基类,提供上下文访问、变量处理、状态管理等通用能力 |
| AbstractStreamingNodeComponent | 流式节点基类,继承自 AbstractAiFlowNodeComponent,支持流式和非流式双模式 |
| 具体节点组件 (LiteflowComponent) | LlmNodeCmp、McpNodeCmp、RagNodeCmp 等,使用 @LiteflowComponent 注解注册 |
| AiFlowContext | 执行上下文,继承自 FlowContextHolder,管理节点定义、变量、连接关系等 |
| AiFlowChainBuilder | 动态 EL 表达式构建器,将 JSON DSL 转换为 LiteFlow EL |
节点开发快速指南
| 步骤 | 操作 | 说明 |
|---|
| 1 | 定义节点类型常量 | 在 NodeTypeConstants.java 和 AiFlowNodeConstants.java 中添加节点类型 |
| 2 | 创建节点配置类 | 在 model/nodes/ 下创建 AiXxxNode.java 定义配置参数 |
| 3 | 扩展节点定义 | 在 AiNodeDefinition.java 中添加新节点的配置字段 |
| 4 | 选择基类 | 根据节点特性选择继承 AbstractAiFlowNodeComponent 或 AbstractStreamingNodeComponent |
| 5 | 实现节点组件 | 创建 XxxNodeCmp.java,使用 @LiteflowComponent 注解注册 |
| 6 | 实现处理逻辑 | 实现 doProcess() 或 doStreamProcess()/doNonStreamProcess() 方法 |
详细实现步骤
第一步:定义节点类型常量
需要在两个常量类中添加节点类型定义:
1.1 在 NodeTypeConstants 中添加原始节点类型
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/constants/NodeTypeConstants.java
/**
* MCP节点 - 用于调用MCP(Model Context Protocol)服务的节点类型
*/
String MCP = "mcp";
1.2 在 AiFlowNodeConstants 中添加 LiteFlow 组件 ID
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/AiFlowNodeConstants.java
/**
* MCP 工具调用节点
*/
String MCP = "aiflow_mcp";
💡双层常量定义
NodeTypeConstants.MCP = "mcp" 用于前端 DSL 定义和节点类型识别,AiFlowNodeConstants.MCP = "aiflow_mcp" 用于 LiteFlow 组件注册和 EL 表达式构建。两者必须保持关联关系,通过 AiFlowNodeConstants.PREFIX + nodeType 进行映射。
第二步:创建节点配置类
在 model/nodes/ 目录下创建节点配置类,定义节点的参数结构。
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/model/nodes/AiMCPNode.java
/**
* AI MCP(Model Context Protocol)节点配置
*/
@Data
public class AiMCPNode {
/** MCP配置ID */
private String mcpId;
/** MCP配置名称 */
private String mcpName;
/** 输入提示词 */
private String prompt;
}
第三步:扩展节点定义
在 AiNodeDefinition.java 中添加新节点的配置字段,使系统能够识别和处理新的节点类型。
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/model/AiNodeDefinition.java
/**
* MCP节点参数配置 - MCP服务调用配置
*/
private AiMCPNode mcpParams;
第四步:选择合适的基类
根据节点特性选择继承不同的基类:
| 基类 | 适用场景 | 需实现方法 |
|---|
| AbstractAiFlowNodeComponent | 普通节点(HTTP、DB、Text等) | doProcess(node, context) |
| AbstractStreamingNodeComponent | 流式节点(LLM、RAG、MCP、OCR等) | doStreamProcess(node, context) 和 doNonStreamProcess(node, context) |
⚠基类选择原则
流式节点用于需要实时响应的场景(如 AI 对话、长时间处理),必须继承 AbstractStreamingNodeComponent。普通节点用于一次性返回结果的场景,继承 AbstractAiFlowNodeComponent 即可。
第五步:实现节点组件
5.1 流式节点示例(MCP 节点)
// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/component/McpNodeCmp.java
/**
* MCP 节点组件 - 支持与外部工具和服务的标准化交互
*/
@Slf4j
@RequiredArgsConstructor
@LiteflowComponent(AiFlowNodeConstants.MCP)
public class McpNodeCmp extends AbstractStreamingNodeComponent {
private static final TemplateEngine TEMPLATE_ENGINE = TemplateUtil.createEngine(new TemplateConfig());
private final McpChatRule mcpChatRule;
@Override
protected Dict doStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
return processMcp(node, context);
}
@Override
protected Dict doNonStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
return processMcp(node, context);
}
private Dict processMcp(AiNodeDefinition node, AiFlowContext context) {
// 1. 验证配置
AiMCPNode config = validateNodeConfig(node);
// 2. 处理输入参数和模板
Dict variables = getInputVariables(node, context);
String inputMessage = TEMPLATE_ENGINE.getTemplate(config.getPrompt()).render(variables);
// 3. 构建请求
ChatMessageDTO chatMessageDTO = buildChatMessageDTO(config, inputMessage);
// 4. 调用处理
return processMcpCallViaRule(chatMessageDTO, context, node);
}
private AiMCPNode validateNodeConfig(AiNodeDefinition node) {
AiMCPNode config = node.getMcpParams();
if (config == null) {
throw FlowException.invalidParam("MCP节点配置无效");
}
if (StrUtil.isBlank(config.getPrompt())) {
throw FlowException.invalidParam("MCP节点输入消息不能为空");
}
return config;
}
private ChatMessageDTO buildChatMessageDTO(AiMCPNode config, String inputMessage) {
ChatMessageDTO dto = new ChatMessageDTO();
dto.setContent(inputMessage);
if (StrUtil.isNotBlank(config.getMcpId())) {
ChatMessageDTO.ExtDetails extDetails = new ChatMessageDTO.ExtDetails();
extDetails.setMcpId(config.getMcpId());
dto.setExtDetails(extDetails);
}
return dto;
}
private Dict processMcpCallViaRule(ChatMessageDTO dto, AiFlowContext context, AiNodeDefinition node) {
AtomicReference<String> resultContent = new AtomicReference<>("");
AtomicInteger totalTokens = new AtomicInteger(0);
try {
Flux<AiMessageResultDTO> resultFlux = mcpChatRule.process(dto);
resultFlux
.doOnNext(result -> {
String content = result.getMessage();
if (StrUtil.isNotBlank(content)) {
resultContent.updateAndGet(current -> current + content);
}
context.sendStreamContent(content);
})
.doOnComplete(() -> {
context.sendStreamEnd(node, totalTokens.get() > 0 ? totalTokens.get() : null);
})
.doOnError(error -> {
log.error("MCP调用错误", error);
})
.onErrorComplete()
.blockLast();
} catch (Exception e) {
log.error("MCP调用执行失败", e);
throw FlowException.nodeError(context.getFlowId().toString(), "[MCP节点] -> " + e.getMessage());
}
return Dict.create()
.set(FlowConstant.CONTENT, resultContent.get())
.set(FlowConstant.ROLE, "assistant")
.set(FlowConstant.TOKENS, totalTokens.get())
.set(FlowConstant.TIMESTAMP, System.currentTimeMillis());
}
}
5.2 普通节点示例(HTTP 节点)
对于不需要流式处理的节点,只需继承 AbstractAiFlowNodeComponent 并实现 doProcess() 方法:
/**
* HTTP 请求节点组件
*/
@Slf4j
@RequiredArgsConstructor
@LiteflowComponent(AiFlowNodeConstants.HTTP)
public class HttpNodeCmp extends AbstractAiFlowNodeComponent {
@Override
protected Dict doProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
AiHttpNode config = validateNodeConfig(node);
// 执行 HTTP 请求
String response = executeHttpRequest(config);
return Dict.create()
.set(FlowConstant.CONTENT, response)
.set(FlowConstant.TIMESTAMP, System.currentTimeMillis());
}
// ... 其他辅助方法
}
开发规范指南
1. 类结构规范
1.1 流式节点标准模板
/**
* XXX 节点组件 - 功能说明
*/
@Slf4j
@RequiredArgsConstructor
@LiteflowComponent(AiFlowNodeConstants.XXX)
public class XxxNodeCmp extends AbstractStreamingNodeComponent {
private final SomeService someService;
@Override
protected Dict doStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
// 1. 验证配置
// 2. 处理输入
// 3. 执行逻辑
// 4. context.sendStreamContent(content)
// 5. context.sendStreamEnd(node, tokens)
// 6. 返回结果
}
@Override
protected Dict doNonStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
// 1. 验证配置
// 2. 处理输入
// 3. 执行逻辑
// 4. 返回结果
}
// ... 辅助方法
}
1.2 普通节点标准模板
/**
* XXX 节点组件 - 功能说明
*/
@Slf4j
@RequiredArgsConstructor
@LiteflowComponent(AiFlowNodeConstants.XXX)
public class XxxNodeCmp extends AbstractAiFlowNodeComponent {
private final SomeService someService;
@Override
protected Dict doProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
// 1. 验证配置
// 2. 处理输入
// 3. 执行逻辑
// 4. 返回结果
}
// ... 辅助方法
}
2. 配置验证规范
private AiXxxNode validateNodeConfig(AiNodeDefinition node) {
AiXxxNode config = node.getXxxParams();
if (config == null) {
throw FlowException.invalidParam("XXX节点配置无效");
}
if (StrUtil.isBlank(config.getRequiredField())) {
throw FlowException.invalidParam("必填字段不能为空");
}
if (config.getMaxCount() != null && config.getMaxCount() <= 0) {
throw FlowException.invalidParam("最大数量必须大于0");
}
return config;
}
💡验证规范建议
每个节点都应包含严格的配置验证逻辑,确保运行时的稳定性。验证应包括:配置对象非空检查、必填字段检查、字段值范围检查。
3. 返回结果规范
所有节点处理器必须返回统一格式的 Dict 对象,确保系统的一致性。
return Dict.create()
.set(FlowConstant.CONTENT, resultContent)
.set(FlowConstant.ROLE, "assistant")
.set(FlowConstant.TOKENS, tokenCount)
.set(FlowConstant.TIMESTAMP, System.currentTimeMillis());
工具方法说明
AbstractAiFlowNodeComponent 提供的工具方法
| 方法分类 | 方法签名 | 功能说明 | 返回值 |
|---|
| 上下文访问 | getAiFlowContext() | 获取 AI Flow 执行上下文 | AiFlowContext 对象 |
| 上下文访问 | getCurrentNodeDefinition() | 获取当前节点定义 | AiNodeDefinition 对象 |
| 变量处理 | getInputVariables(node, context) | 获取节点输入参数 | Dict 对象,包含输入变量的键值对 |
| 变量处理 | getOutputVariables(node, kv) | 获取节点输出参数(带类型转换) | Dict 对象,包含输出变量的键值对 |
| 变量处理 | saveOutputVariables(node, result, context) | 保存输出变量到上下文 | 无返回值 |
| 状态管理 | updateNodeStatusSuccess(node, duration, result) | 更新节点执行状态为成功 | 无返回值 |
| 状态管理 | updateNodeStatusError(node, duration, error) | 更新节点执行状态为失败 | 无返回值 |
| 工具方法 | addQueryParam(url, name, value) | 添加 URL 查询参数 | 拼接好参数的完整 URL |
AiFlowContext 上下文方法
| 方法类型 | 方法签名 | 说明 |
|---|
| 变量管理 | getVariable(key) | 获取变量值(支持多源查找:variables → parameters → envs) |
| 变量管理 | setVariable(key, value) | 设置变量值 |
| 变量管理 | setVariable(nodeId, key, value) | 设置节点作用域变量 |
| 变量管理 | setVariables(nodeId, kv) | 批量设置节点变量 |
| 参数管理 | getParameter(key) | 获取流程启动参数 |
| 参数管理 | setParameter(key, value) | 设置参数值 |
| 节点访问 | getNodeDefinition(nodeId) | 获取指定节点定义 |
| 节点访问 | setCurrentNodeId(nodeId) | 设置当前执行节点ID |
| 连接关系 | getOutgoingConnections(sourceNodeId) | 获取节点的所有后续连接 |
| 连接关系 | getNextNodeId(sourceNodeId) | 获取普通节点的下一个节点ID |
| 连接关系 | getNextNodeIdForBranch(sourceNodeId, branchIndex) | 获取分支节点的下一个节点ID |
| 分支控制 | recordBranchSelection(nodeId, branchIndex) | 记录分支选择 |
| 分支控制 | getBranchSelection(nodeId) | 获取分支选择 |
| 循环控制 | initLoop(loopNodeId, maxIterations) | 初始化循环节点 |
| 循环控制 | incrementLoopAndContinue(loopNodeId) | 增加循环索引并判断是否继续 |
| 循环控制 | getLoopIndex(loopNodeId) | 获取当前循环索引 |
| 执行统计 | addTokens(tokens) | 累加 Token 使用量 |
| 执行统计 | getDuration() | 获取执行时长 |
| 执行统计 | addExecutedNode(node) | 添加已执行节点到列表 |
流式处理支持
对于需要实时响应的节点(如 LLM、RAG、MCP、OCR 等),系统提供了统一的流式处理机制。
流式处理核心方法
FlowContextHolder(通过 AiFlowContext 继承)提供了两个核心流式方法:
1. sendStreamContent(String content)
功能:发送流式内容片段到前端
使用场景:在流式处理过程中逐步发送生成的内容,实现实时响应
方法签名:
public void sendStreamContent(String content)
实现逻辑:
public void sendStreamContent(String content) {
if (!stream || aiFlowExecuteDTO == null || aiFlowExecuteDTO.getCallback() == null) {
return;
}
aiFlowExecuteDTO.getCallback()
.execute(AiFlowExecuteDTO.FlowCallbackResult.builder()
.data(AiFlowExecuteDTO.FlowCallbackData.builder()
.content(content)
.build())
.build());
}
使用示例:
// 在 Flux 流式响应中使用
resultFlux.doOnNext(result -> {
String content = result.getMessage();
if (StrUtil.isNotBlank(content)) {
context.sendStreamContent(content);
}
});
// 在 TokenStream 中使用
tokenStream.onPartialResponse(partialContent -> {
context.sendStreamContent(partialContent);
});
✓实时响应
前端通过 SSE (Server-Sent Events) 实时接收内容片段,实现打字机效果,提升用户体验。
2. sendStreamEnd(AiNodeDefinition node, Integer tokens)
功能:发送流式结束信号,并自动处理节点状态更新、Token统计、耗时记录
使用场景:流式处理完成时调用,标记流程结束
方法签名:
public void sendStreamEnd(AiNodeDefinition node, Integer tokens)
实现逻辑:
public void sendStreamEnd(AiNodeDefinition node, Integer tokens) {
if (!stream || aiFlowExecuteDTO == null || aiFlowExecuteDTO.getCallback() == null) {
return;
}
AiFlowExecuteDTO.FlowCallbackData.FlowCallbackDataBuilder builder =
AiFlowExecuteDTO.FlowCallbackData.builder()
.content(AiChatConstants.END_MSG);
if (tokens != null && tokens > 0) {
builder.tokens(tokens).duration(getDuration());
}
if (node != null) {
if (!executedNodes.contains(node)) {
executedNodes.add(node);
}
node.setStatus(ExecutionStatusEnums.SUCCESS.getValue());
node.setDuration(getDuration());
if (tokens != null && tokens > 0) {
node.setTokens(tokens);
}
builder.nodes(new ArrayList<>(executedNodes));
}
aiFlowExecuteDTO.getCallback()
.execute(AiFlowExecuteDTO.FlowCallbackResult.builder()
.data(builder.build())
.build());
}
使用示例:
resultFlux
.doOnNext(result -> {
context.sendStreamContent(result.getMessage());
if (result.getTokens() != null) {
totalTokens.addAndGet(result.getTokens());
}
})
.doOnComplete(() -> {
context.sendStreamEnd(node, totalTokens.get() > 0 ? totalTokens.get() : null);
});
💡自动状态管理
sendStreamEnd() 方法会自动处理以下内容:节点状态更新为 SUCCESS、记录节点执行耗时、统计 Token 使用量、添加节点到已执行列表、发送完整节点列表给前端(用于流程可视化)。
流式方法总结对比
| 方法 | 发送内容类型 | 是否结束流 | 自动更新节点状态 | 典型使用时机 |
|---|
sendStreamContent() | 普通内容片段 | 否 | 否 | Flux.doOnNext() |
sendStreamEnd() | 结束信号 + 统计信息 | 是 | 是 | Flux.doOnComplete() |
流式处理标准模式
@Override
protected Dict doStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
AtomicReference<String> accumulated = new AtomicReference<>("");
AtomicInteger totalTokens = new AtomicInteger(0);
try {
Flux<AiMessageResultDTO> resultFlux = someAiService.process(request);
resultFlux
.doOnNext(result -> {
String content = result.getMessage();
if (StrUtil.isNotBlank(content)) {
accumulated.updateAndGet(c -> c + content);
context.sendStreamContent(content);
}
if (result.getTokens() != null) {
totalTokens.addAndGet(result.getTokens());
}
})
.doOnComplete(() -> {
context.sendStreamEnd(node, totalTokens.get() > 0 ? totalTokens.get() : null);
})
.doOnError(error -> {
log.error("流式处理错误", error);
})
.onErrorComplete()
.blockLast();
return Dict.create()
.set(FlowConstant.CONTENT, accumulated.get())
.set(FlowConstant.TOKENS, totalTokens.get())
.set(FlowConstant.TIMESTAMP, System.currentTimeMillis());
} catch (Exception e) {
log.error("节点执行失败", e);
throw FlowException.nodeError(node.getId(), e.getMessage());
}
}
流式处理优势
使用 AbstractStreamingNodeComponent 和统一的流式方法具有以下优势:
- 自动状态管理:
sendStreamEnd() 自动更新节点状态为成功、记录耗时、统计 Token
- 代码简化:相比手动构建 FlowCallbackResult,代码量减少 60-75%
- 一致性:所有流式节点使用相同的模式,易于维护
- 错误处理:统一的错误处理机制,确保错误信息正确传递
- 双模式支持:同一节点自动支持流式和非流式两种模式
常见节点类型参考
基于现有代码,以下是不同类型节点的推荐基类选择:
| 节点类型 | 基类选择 | 原因 |
|---|
| LLM | AbstractStreamingNodeComponent | 需要流式输出 AI 生成内容 |
| RAG | AbstractStreamingNodeComponent | 检索后流式输出增强的回答 |
| MCP | AbstractStreamingNodeComponent | 工具调用可能返回流式结果 |
| OCR | AbstractStreamingNodeComponent | 识别过程可能较长,支持流式反馈 |
| HTTP | AbstractAiFlowNodeComponent | 普通请求,一次性返回 |
| DB | AbstractAiFlowNodeComponent | 数据库查询,一次性返回 |
| Text | AbstractAiFlowNodeComponent | 文本模板渲染,一次性返回 |
| Code | AbstractAiFlowNodeComponent | 代码执行,一次性返回 |
| Structured | AbstractAiFlowNodeComponent | JSON 生成,一次性返回 |