AI工作流节点后端开发手册

本手册以 MCP 节点为例,详细说明如何在 PigX AI Flow 中新增一个自定义节点类型。AI Flow 节点是系统的核心组件,每个节点负责执行特定的业务逻辑。

执行流程概览

flowchart LR
    A[AiFlowController.execute<br/>接收请求] --> B[AiFlowService.executeFlow<br/>验证流程&解析DSL]
    B --> C{执行模式}
    C -->|流式| D[executeStreamFlow<br/>创建SseEmitter+异步执行]
    C -->|非流式| E[executeNormalFlow<br/>同步执行]
    D --> F[AiFlowProcessor.execute<br/>构建LiteFlow链路]
    E --> F
    F --> G[LiteFlow引擎<br/>节点组件调度]
    G --> H{节点类型}
    H -->|流式节点| I[AbstractStreamingNodeComponent<br/>LlmNodeCmp/RagNodeCmp/McpNodeCmp]
    H -->|普通节点| J[AbstractAiFlowNodeComponent<br/>HttpNodeCmp/TextNodeCmp/DbNodeCmp]
    I --> K[执行节点业务逻辑<br/>sendStreamContent + sendStreamEnd]
    J --> L[执行节点业务逻辑<br/>doProcess一次性返回]
    K --> M[返回结果]
    L --> M
    M --> N{响应方式}
    N -->|流式| O[SSE推送到前端]
    N -->|非流式| P[R.ok返回JSON]
关键入口说明

执行流程从 Controller 层开始,经过 Service 层验证和模式选择,最终由 LiteFlow 引擎调度节点组件执行业务逻辑。

层级类/方法职责
Controller层AiFlowController.execute()接收HTTP POST请求,接收AiFlowExecuteDTO参数
Service层AiFlowService.executeFlow()验证流程ID,解析DSL,区分流式/非流式模式
流式执行executeStreamFlow()创建SseEmitter,设置callback,异步执行
非流式执行executeNormalFlow()同步执行,直接返回R<AiFlowResultVO>
流程处理器AiFlowProcessor.execute()调用AiFlowChainBuilder,启动LiteFlow引擎
链路构建AiFlowChainBuilder.buildAndExecute()解析DSL,构建EL表达式,创建上下文
节点调度AbstractAiFlowNodeComponent.process()节点生命周期管理,调用具体实现
具体实现XxxNodeCmp.doStreamProcess() / doProcess()节点业务逻辑,流式输出或一次性返回

AI Flow 架构设计

PigX AI Flow 基于 LiteFlow 规则引擎构建,采用组件化的流式处理架构。

组件说明
LiteFlow 框架底层规则引擎,提供流程编排和执行能力
AbstractAiFlowNodeComponent所有节点的抽象基类,提供上下文访问、变量处理、状态管理等通用能力
AbstractStreamingNodeComponent流式节点基类,继承自 AbstractAiFlowNodeComponent,支持流式和非流式双模式
具体节点组件 (LiteflowComponent)LlmNodeCmp、McpNodeCmp、RagNodeCmp 等,使用 @LiteflowComponent 注解注册
AiFlowContext执行上下文,继承自 FlowContextHolder,管理节点定义、变量、连接关系等
AiFlowChainBuilder动态 EL 表达式构建器,将 JSON DSL 转换为 LiteFlow EL

节点开发快速指南

步骤操作说明
1定义节点类型常量NodeTypeConstants.javaAiFlowNodeConstants.java 中添加节点类型
2创建节点配置类model/nodes/ 下创建 AiXxxNode.java 定义配置参数
3扩展节点定义AiNodeDefinition.java 中添加新节点的配置字段
4选择基类根据节点特性选择继承 AbstractAiFlowNodeComponentAbstractStreamingNodeComponent
5实现节点组件创建 XxxNodeCmp.java,使用 @LiteflowComponent 注解注册
6实现处理逻辑实现 doProcess()doStreamProcess()/doNonStreamProcess() 方法

详细实现步骤

第一步:定义节点类型常量

需要在两个常量类中添加节点类型定义:

1.1 在 NodeTypeConstants 中添加原始节点类型

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/constants/NodeTypeConstants.java

/**
 * MCP节点 - 用于调用MCP(Model Context Protocol)服务的节点类型
 */
String MCP = "mcp";

1.2 在 AiFlowNodeConstants 中添加 LiteFlow 组件 ID

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/AiFlowNodeConstants.java

/**
 * MCP 工具调用节点
 */
String MCP = "aiflow_mcp";
双层常量定义

NodeTypeConstants.MCP = "mcp" 用于前端 DSL 定义和节点类型识别,AiFlowNodeConstants.MCP = "aiflow_mcp" 用于 LiteFlow 组件注册和 EL 表达式构建。两者必须保持关联关系,通过 AiFlowNodeConstants.PREFIX + nodeType 进行映射。

第二步:创建节点配置类

model/nodes/ 目录下创建节点配置类,定义节点的参数结构。

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/model/nodes/AiMCPNode.java

/**
 * AI MCP(Model Context Protocol)节点配置
 */
@Data
public class AiMCPNode {
    /** MCP配置ID */
    private String mcpId;

    /** MCP配置名称 */
    private String mcpName;

    /** 输入提示词 */
    private String prompt;
}

第三步:扩展节点定义

AiNodeDefinition.java 中添加新节点的配置字段,使系统能够识别和处理新的节点类型。

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/model/AiNodeDefinition.java

/**
 * MCP节点参数配置 - MCP服务调用配置
 */
private AiMCPNode mcpParams;

第四步:选择合适的基类

根据节点特性选择继承不同的基类:

基类适用场景需实现方法
AbstractAiFlowNodeComponent普通节点(HTTP、DB、Text等)doProcess(node, context)
AbstractStreamingNodeComponent流式节点(LLM、RAG、MCP、OCR等)doStreamProcess(node, context)doNonStreamProcess(node, context)
基类选择原则

流式节点用于需要实时响应的场景(如 AI 对话、长时间处理),必须继承 AbstractStreamingNodeComponent。普通节点用于一次性返回结果的场景,继承 AbstractAiFlowNodeComponent 即可。

第五步:实现节点组件

5.1 流式节点示例(MCP 节点)

// 文件位置: pigx-knowledge/src/main/java/com/pig4cloud/pigx/knowledge/support/liteflow/aiflow/component/McpNodeCmp.java

/**
 * MCP 节点组件 - 支持与外部工具和服务的标准化交互
 */
@Slf4j
@RequiredArgsConstructor
@LiteflowComponent(AiFlowNodeConstants.MCP)
public class McpNodeCmp extends AbstractStreamingNodeComponent {

    private static final TemplateEngine TEMPLATE_ENGINE = TemplateUtil.createEngine(new TemplateConfig());
    private final McpChatRule mcpChatRule;

    @Override
    protected Dict doStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
        return processMcp(node, context);
    }

    @Override
    protected Dict doNonStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
        return processMcp(node, context);
    }

    private Dict processMcp(AiNodeDefinition node, AiFlowContext context) {
        // 1. 验证配置
        AiMCPNode config = validateNodeConfig(node);

        // 2. 处理输入参数和模板
        Dict variables = getInputVariables(node, context);
        String inputMessage = TEMPLATE_ENGINE.getTemplate(config.getPrompt()).render(variables);

        // 3. 构建请求
        ChatMessageDTO chatMessageDTO = buildChatMessageDTO(config, inputMessage);

        // 4. 调用处理
        return processMcpCallViaRule(chatMessageDTO, context, node);
    }

    private AiMCPNode validateNodeConfig(AiNodeDefinition node) {
        AiMCPNode config = node.getMcpParams();
        if (config == null) {
            throw FlowException.invalidParam("MCP节点配置无效");
        }
        if (StrUtil.isBlank(config.getPrompt())) {
            throw FlowException.invalidParam("MCP节点输入消息不能为空");
        }
        return config;
    }

    private ChatMessageDTO buildChatMessageDTO(AiMCPNode config, String inputMessage) {
        ChatMessageDTO dto = new ChatMessageDTO();
        dto.setContent(inputMessage);
        if (StrUtil.isNotBlank(config.getMcpId())) {
            ChatMessageDTO.ExtDetails extDetails = new ChatMessageDTO.ExtDetails();
            extDetails.setMcpId(config.getMcpId());
            dto.setExtDetails(extDetails);
        }
        return dto;
    }

    private Dict processMcpCallViaRule(ChatMessageDTO dto, AiFlowContext context, AiNodeDefinition node) {
        AtomicReference<String> resultContent = new AtomicReference<>("");
        AtomicInteger totalTokens = new AtomicInteger(0);

        try {
            Flux<AiMessageResultDTO> resultFlux = mcpChatRule.process(dto);

            resultFlux
                .doOnNext(result -> {
                    String content = result.getMessage();
                    if (StrUtil.isNotBlank(content)) {
                        resultContent.updateAndGet(current -> current + content);
                    }
                    context.sendStreamContent(content);
                })
                .doOnComplete(() -> {
                    context.sendStreamEnd(node, totalTokens.get() > 0 ? totalTokens.get() : null);
                })
                .doOnError(error -> {
                    log.error("MCP调用错误", error);
                })
                .onErrorComplete()
                .blockLast();
        } catch (Exception e) {
            log.error("MCP调用执行失败", e);
            throw FlowException.nodeError(context.getFlowId().toString(), "[MCP节点] -> " + e.getMessage());
        }

        return Dict.create()
            .set(FlowConstant.CONTENT, resultContent.get())
            .set(FlowConstant.ROLE, "assistant")
            .set(FlowConstant.TOKENS, totalTokens.get())
            .set(FlowConstant.TIMESTAMP, System.currentTimeMillis());
    }
}

5.2 普通节点示例(HTTP 节点)

对于不需要流式处理的节点,只需继承 AbstractAiFlowNodeComponent 并实现 doProcess() 方法:

/**
 * HTTP 请求节点组件
 */
@Slf4j
@RequiredArgsConstructor
@LiteflowComponent(AiFlowNodeConstants.HTTP)
public class HttpNodeCmp extends AbstractAiFlowNodeComponent {

    @Override
    protected Dict doProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
        AiHttpNode config = validateNodeConfig(node);

        // 执行 HTTP 请求
        String response = executeHttpRequest(config);

        return Dict.create()
            .set(FlowConstant.CONTENT, response)
            .set(FlowConstant.TIMESTAMP, System.currentTimeMillis());
    }

    // ... 其他辅助方法
}

开发规范指南

1. 类结构规范

1.1 流式节点标准模板

/**
 * XXX 节点组件 - 功能说明
 */
@Slf4j
@RequiredArgsConstructor
@LiteflowComponent(AiFlowNodeConstants.XXX)
public class XxxNodeCmp extends AbstractStreamingNodeComponent {

    private final SomeService someService;

    @Override
    protected Dict doStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
        // 1. 验证配置
        // 2. 处理输入
        // 3. 执行逻辑
        // 4. context.sendStreamContent(content)
        // 5. context.sendStreamEnd(node, tokens)
        // 6. 返回结果
    }

    @Override
    protected Dict doNonStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
        // 1. 验证配置
        // 2. 处理输入
        // 3. 执行逻辑
        // 4. 返回结果
    }

    // ... 辅助方法
}

1.2 普通节点标准模板

/**
 * XXX 节点组件 - 功能说明
 */
@Slf4j
@RequiredArgsConstructor
@LiteflowComponent(AiFlowNodeConstants.XXX)
public class XxxNodeCmp extends AbstractAiFlowNodeComponent {

    private final SomeService someService;

    @Override
    protected Dict doProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
        // 1. 验证配置
        // 2. 处理输入
        // 3. 执行逻辑
        // 4. 返回结果
    }

    // ... 辅助方法
}

2. 配置验证规范

private AiXxxNode validateNodeConfig(AiNodeDefinition node) {
    AiXxxNode config = node.getXxxParams();
    if (config == null) {
        throw FlowException.invalidParam("XXX节点配置无效");
    }
    if (StrUtil.isBlank(config.getRequiredField())) {
        throw FlowException.invalidParam("必填字段不能为空");
    }
    if (config.getMaxCount() != null && config.getMaxCount() <= 0) {
        throw FlowException.invalidParam("最大数量必须大于0");
    }
    return config;
}
验证规范建议

每个节点都应包含严格的配置验证逻辑,确保运行时的稳定性。验证应包括:配置对象非空检查、必填字段检查、字段值范围检查。

3. 返回结果规范

所有节点处理器必须返回统一格式的 Dict 对象,确保系统的一致性。

return Dict.create()
    .set(FlowConstant.CONTENT, resultContent)
    .set(FlowConstant.ROLE, "assistant")
    .set(FlowConstant.TOKENS, tokenCount)
    .set(FlowConstant.TIMESTAMP, System.currentTimeMillis());

工具方法说明

AbstractAiFlowNodeComponent 提供的工具方法

方法分类方法签名功能说明返回值
上下文访问getAiFlowContext()获取 AI Flow 执行上下文AiFlowContext 对象
上下文访问getCurrentNodeDefinition()获取当前节点定义AiNodeDefinition 对象
变量处理getInputVariables(node, context)获取节点输入参数Dict 对象,包含输入变量的键值对
变量处理getOutputVariables(node, kv)获取节点输出参数(带类型转换)Dict 对象,包含输出变量的键值对
变量处理saveOutputVariables(node, result, context)保存输出变量到上下文无返回值
状态管理updateNodeStatusSuccess(node, duration, result)更新节点执行状态为成功无返回值
状态管理updateNodeStatusError(node, duration, error)更新节点执行状态为失败无返回值
工具方法addQueryParam(url, name, value)添加 URL 查询参数拼接好参数的完整 URL

AiFlowContext 上下文方法

方法类型方法签名说明
变量管理getVariable(key)获取变量值(支持多源查找:variables → parameters → envs)
变量管理setVariable(key, value)设置变量值
变量管理setVariable(nodeId, key, value)设置节点作用域变量
变量管理setVariables(nodeId, kv)批量设置节点变量
参数管理getParameter(key)获取流程启动参数
参数管理setParameter(key, value)设置参数值
节点访问getNodeDefinition(nodeId)获取指定节点定义
节点访问setCurrentNodeId(nodeId)设置当前执行节点ID
连接关系getOutgoingConnections(sourceNodeId)获取节点的所有后续连接
连接关系getNextNodeId(sourceNodeId)获取普通节点的下一个节点ID
连接关系getNextNodeIdForBranch(sourceNodeId, branchIndex)获取分支节点的下一个节点ID
分支控制recordBranchSelection(nodeId, branchIndex)记录分支选择
分支控制getBranchSelection(nodeId)获取分支选择
循环控制initLoop(loopNodeId, maxIterations)初始化循环节点
循环控制incrementLoopAndContinue(loopNodeId)增加循环索引并判断是否继续
循环控制getLoopIndex(loopNodeId)获取当前循环索引
执行统计addTokens(tokens)累加 Token 使用量
执行统计getDuration()获取执行时长
执行统计addExecutedNode(node)添加已执行节点到列表

流式处理支持

对于需要实时响应的节点(如 LLM、RAG、MCP、OCR 等),系统提供了统一的流式处理机制。

流式处理核心方法

FlowContextHolder(通过 AiFlowContext 继承)提供了两个核心流式方法:

1. sendStreamContent(String content)

功能:发送流式内容片段到前端

使用场景:在流式处理过程中逐步发送生成的内容,实现实时响应

方法签名

public void sendStreamContent(String content)

实现逻辑

public void sendStreamContent(String content) {
    if (!stream || aiFlowExecuteDTO == null || aiFlowExecuteDTO.getCallback() == null) {
        return;
    }
    aiFlowExecuteDTO.getCallback()
        .execute(AiFlowExecuteDTO.FlowCallbackResult.builder()
            .data(AiFlowExecuteDTO.FlowCallbackData.builder()
                .content(content)
                .build())
            .build());
}

使用示例

// 在 Flux 流式响应中使用
resultFlux.doOnNext(result -> {
    String content = result.getMessage();
    if (StrUtil.isNotBlank(content)) {
        context.sendStreamContent(content);
    }
});

// 在 TokenStream 中使用
tokenStream.onPartialResponse(partialContent -> {
    context.sendStreamContent(partialContent);
});
实时响应

前端通过 SSE (Server-Sent Events) 实时接收内容片段,实现打字机效果,提升用户体验。

2. sendStreamEnd(AiNodeDefinition node, Integer tokens)

功能:发送流式结束信号,并自动处理节点状态更新、Token统计、耗时记录

使用场景:流式处理完成时调用,标记流程结束

方法签名

public void sendStreamEnd(AiNodeDefinition node, Integer tokens)

实现逻辑

public void sendStreamEnd(AiNodeDefinition node, Integer tokens) {
    if (!stream || aiFlowExecuteDTO == null || aiFlowExecuteDTO.getCallback() == null) {
        return;
    }

    AiFlowExecuteDTO.FlowCallbackData.FlowCallbackDataBuilder builder =
        AiFlowExecuteDTO.FlowCallbackData.builder()
            .content(AiChatConstants.END_MSG);

    if (tokens != null && tokens > 0) {
        builder.tokens(tokens).duration(getDuration());
    }

    if (node != null) {
        if (!executedNodes.contains(node)) {
            executedNodes.add(node);
        }
        node.setStatus(ExecutionStatusEnums.SUCCESS.getValue());
        node.setDuration(getDuration());
        if (tokens != null && tokens > 0) {
            node.setTokens(tokens);
        }
        builder.nodes(new ArrayList<>(executedNodes));
    }

    aiFlowExecuteDTO.getCallback()
        .execute(AiFlowExecuteDTO.FlowCallbackResult.builder()
            .data(builder.build())
            .build());
}

使用示例

resultFlux
    .doOnNext(result -> {
        context.sendStreamContent(result.getMessage());
        if (result.getTokens() != null) {
            totalTokens.addAndGet(result.getTokens());
        }
    })
    .doOnComplete(() -> {
        context.sendStreamEnd(node, totalTokens.get() > 0 ? totalTokens.get() : null);
    });
自动状态管理

sendStreamEnd() 方法会自动处理以下内容:节点状态更新为 SUCCESS、记录节点执行耗时、统计 Token 使用量、添加节点到已执行列表、发送完整节点列表给前端(用于流程可视化)。

流式方法总结对比

方法发送内容类型是否结束流自动更新节点状态典型使用时机
sendStreamContent()普通内容片段Flux.doOnNext()
sendStreamEnd()结束信号 + 统计信息Flux.doOnComplete()

流式处理标准模式

@Override
protected Dict doStreamProcess(AiNodeDefinition node, AiFlowContext context) throws Exception {
    AtomicReference<String> accumulated = new AtomicReference<>("");
    AtomicInteger totalTokens = new AtomicInteger(0);

    try {
        Flux<AiMessageResultDTO> resultFlux = someAiService.process(request);

        resultFlux
            .doOnNext(result -> {
                String content = result.getMessage();
                if (StrUtil.isNotBlank(content)) {
                    accumulated.updateAndGet(c -> c + content);
                    context.sendStreamContent(content);
                }
                if (result.getTokens() != null) {
                    totalTokens.addAndGet(result.getTokens());
                }
            })
            .doOnComplete(() -> {
                context.sendStreamEnd(node, totalTokens.get() > 0 ? totalTokens.get() : null);
            })
            .doOnError(error -> {
                log.error("流式处理错误", error);
            })
            .onErrorComplete()
            .blockLast();

        return Dict.create()
            .set(FlowConstant.CONTENT, accumulated.get())
            .set(FlowConstant.TOKENS, totalTokens.get())
            .set(FlowConstant.TIMESTAMP, System.currentTimeMillis());

    } catch (Exception e) {
        log.error("节点执行失败", e);
        throw FlowException.nodeError(node.getId(), e.getMessage());
    }
}

流式处理优势

使用 AbstractStreamingNodeComponent 和统一的流式方法具有以下优势:

  1. 自动状态管理sendStreamEnd() 自动更新节点状态为成功、记录耗时、统计 Token
  2. 代码简化:相比手动构建 FlowCallbackResult,代码量减少 60-75%
  3. 一致性:所有流式节点使用相同的模式,易于维护
  4. 错误处理:统一的错误处理机制,确保错误信息正确传递
  5. 双模式支持:同一节点自动支持流式和非流式两种模式

常见节点类型参考

基于现有代码,以下是不同类型节点的推荐基类选择:

节点类型基类选择原因
LLMAbstractStreamingNodeComponent需要流式输出 AI 生成内容
RAGAbstractStreamingNodeComponent检索后流式输出增强的回答
MCPAbstractStreamingNodeComponent工具调用可能返回流式结果
OCRAbstractStreamingNodeComponent识别过程可能较长,支持流式反馈
HTTPAbstractAiFlowNodeComponent普通请求,一次性返回
DBAbstractAiFlowNodeComponent数据库查询,一次性返回
TextAbstractAiFlowNodeComponent文本模板渲染,一次性返回
CodeAbstractAiFlowNodeComponent代码执行,一次性返回
StructuredAbstractAiFlowNodeComponentJSON 生成,一次性返回