common-websocket 模块使用

什么是 WebSocket

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,使得客户端和服务器之间可以实时双向传输数据。

核心特性

  1. 心跳保活机制: 前端每 30 秒发送 ping 心跳,后端响应 pong,确保连接活性
  2. 自动重连: 连接断开后自动重连(默认最多 6 次,间隔 5 秒)
  3. 分布式支持: 通过 Redis Pub/Sub 实现多实例消息分发
  4. 用户级推送: 基于用户 ID 精准推送消息
  5. 广播功能: 支持全员广播消息

前端配置开启

开启 WebSocket 转发

在网关配置中开启 WebSocket 转发功能:

引入 WebSocket 组件

在主页面(如 App.vueLayout.vue)引入 WebSocket 组件:

<template>
  <!-- 其他页面内容 -->
  <global-websocket
    uri="/admin/ws/info"
    v-if="websocketEnable"
    @rollback="rollback"
  />
</template>

<script setup lang="ts">
import { defineAsyncComponent } from 'vue';

// 异步加载 WebSocket 组件
const GlobalWebsocket = defineAsyncComponent(
  () => import("/@/components/Websocket/index.vue")
);

// 控制是否启用 WebSocket (可从配置中读取)
const websocketEnable = ref(true);

// 接收到消息的回调函数
const rollback = (msg: string) => {
  console.log('收到 WebSocket 消息:', msg);

  // 方式1: 使用消息存储 (如示例)
  useMsg().setMsg({
    label: "websocket消息",
    value: msg,
    time: formatAxis(new Date()),
  });

  // 方式2: 自定义业务处理
  // handleBusinessMessage(msg);
};
</script>

组件参数说明

参数类型必填说明
uriStringWebSocket 连接地址(相对路径)
@rollbackFunction接收到消息时的回调函数

连接机制详解

连接 URL 格式:

ws://[域名][baseURL][uri]?access_token=[token]&TENANT-ID=[tenantId] 或 wss://[域名][baseURL][uri]?access_token=[token]&TENANT-ID=[tenantId]
  • 协议自动选择: HTTPS 页面使用 wss://,HTTP 页面使用 ws://
  • 认证参数: 自动携带用户 token 和租户 ID
  • 完整示例: ws://localhost:8888/api/admin/ws/info?access_token=xxx&TENANT-ID=1

心跳与重连配置

前端组件内置了以下配置(位于 pigx-ui-pro/src/components/Websocket/index.vue):

const state = reactive({
  lockReconnect: false,     // 重连锁,避免多次重连
  maxReconnect: 6,          // 最大重连次数 (-1 表示无限重连)
  reconnectTime: 0,         // 当前重连尝试次数
  heartbeat: {
    interval: 30 * 1000,    // 心跳间隔: 30秒
    timeout: 10 * 1000,     // 心跳超时: 10秒
    pingMessage: JSON.stringify({ type: 'ping' }), // 心跳消息格式
  },
});

心跳流程:

  1. 连接建立后,每隔 30 秒发送 {"type":"ping"} 消息
  2. 后端收到后立即返回 {"type":"pong"} 响应
  3. 如果 10 秒内未收到响应,则认为连接断开,触发重连
  4. 收到任何消息(包括 pong)都会重置心跳定时器

重连流程:

  1. 检测到连接断开(onclose/onerror)
  2. 等待 5 秒后尝试重连
  3. 重连成功后重置 reconnectTime 计数器
  4. 达到最大重连次数后停止尝试

后端配置与使用

引入依赖

在需要使用 WebSocket 的微服务模块的 pom.xml 中添加依赖:

<!--websocket 支持-->
<dependency>
  <groupId>com.pig4cloud</groupId>
  <artifactId>pigx-common-websocket</artifactId>
</dependency>

配置参数

application.yml 或 Nacos 配置中心添加以下配置(均为可选,有默认值):

pigx:
  websocket:
    path: /ws/info                    # WebSocket 端点路径 (默认: /ws/info)
    allow-origins: "*"                # 允许的跨域源 (默认: *)
    heartbeat: true                   # 是否启用心跳 (默认: true)
    map-session: true                 # 是否启用 Session 映射 (默认: true)
    message-distributor: redis        # 消息分发器类型: local | redis (默认: local)
    send-time-limit: 10000            # 发送时间限制,单位毫秒 (默认: 10000)
    send-buffer-size-limit: 64000     # 发送缓冲区大小限制 (默认: 64000)

配置项说明:

配置项类型默认值说明
pathString/ws/infoWebSocket 连接路径
allow-originsString*CORS 允许的源
heartbeatBooleantrue是否启用心跳检测
map-sessionBooleantrue是否记录用户 Session 映射关系
message-distributorStringlocal消息分发模式:• local: 单机模式,直接发送• redis: 分布式模式,通过 Redis Pub/Sub 分发
send-time-limitInteger10000WebSocket 消息发送超时时间(毫秒)
send-buffer-size-limitInteger64000WebSocket 发送缓冲区大小限制

服务端向客户端发送消息

发送消息给指定用户

import com.pig4cloud.pigx.common.core.util.SpringContextHolder;
import com.pig4cloud.pigx.common.websocket.distribute.MessageDO;
import com.pig4cloud.pigx.common.websocket.distribute.MessageDistributor;
import cn.hutool.core.collection.CollUtil;

// 获取消息分发器 (自动根据配置注入 LocalMessageDistributor 或 RedisMessageDistributor)
MessageDistributor messageDistributor = SpringContextHolder.getBean(MessageDistributor.class);

// 构建消息对象
MessageDO messageDO = new MessageDO();
messageDO.setNeedBroadcast(Boolean.FALSE);  // 非广播模式
messageDO.setSessionKeys(CollUtil.newArrayList(1, 2, 3));  // 目标用户 ID 列表
messageDO.setMessageText("您有一条新的待办事项,请及时处理");  // 消息内容

// 发送消息
messageDistributor.distribute(messageDO);

关键说明:

  • sessionKeys: 接收消息的用户 ID 列表(对应数据库中的用户 ID)
  • needBroadcast: 设置为 false 表示定向发送
  • messageText: 发送的文本消息内容

广播消息给所有在线用户

// 方式1: 使用 MessageDO 手动构建
MessageDO messageDO = new MessageDO();
messageDO.setNeedBroadcast(Boolean.TRUE);  // 广播模式
messageDO.setMessageText("系统将于 10 分钟后进行维护,请及时保存数据");
messageDistributor.distribute(messageDO);

// 方式2: 使用便捷方法
MessageDO broadcastMsg = MessageDO.broadcastMessage("系统维护通知: 服务将在 10 分钟后重启");
messageDistributor.distribute(broadcastMsg);

实际业务场景示例

场景1: 工作流审批通知

@Service
@RequiredArgsConstructor
public class FlowNotifyService {

    private final MessageDistributor messageDistributor;

    /**
     * 发送审批通知
     */
    public void sendApprovalNotice(Long userId, String taskName) {
        MessageDO messageDO = new MessageDO();
        messageDO.setNeedBroadcast(Boolean.FALSE);
        messageDO.setSessionKeys(CollUtil.newArrayList(userId));
        messageDO.setMessageText(String.format("您有新的审批任务【%s】待处理", taskName));
        messageDistributor.distribute(messageDO);
    }
}

场景2: 批量通知多个用户

/**
 * 通知项目成员
 */
public void notifyProjectMembers(List<Long> memberIds, String message) {
    MessageDO messageDO = new MessageDO();
    messageDO.setNeedBroadcast(Boolean.FALSE);
    messageDO.setSessionKeys(new ArrayList<>(memberIds));  // 批量用户ID
    messageDO.setMessageText(message);
    messageDistributor.distribute(messageDO);
}

场景3: 系统公告

/**
 * 发布系统公告
 */
public void publishSystemAnnouncement(String announcement) {
    // 广播给所有在线用户
    MessageDO messageDO = MessageDO.broadcastMessage(announcement);
    messageDistributor.distribute(messageDO);
}

服务端接收客户端发送的消息

默认行为

common-websocket 模块默认只会将客户端发送的消息输出到日志,不做任何业务处理:

// CustomPlanTextMessageHandler.java (默认实现)
@Override
public void handle(WebSocketSession session, String message) {
    log.info("sessionId {} ,msg {}", session.getId(), message);
}

自定义消息处理

如果需要处理客户端发送的消息,可以实现 PlanTextMessageHandler 接口:

import com.pig4cloud.pigx.common.websocket.handler.PlanTextMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;

@Slf4j
@Component
public class BizPlanTextMessageHandler implements PlanTextMessageHandler {

    @Override
    public void handle(WebSocketSession session, String message) {
        log.info("收到客户端消息 - SessionId: {}, Message: {}", session.getId(), message);

        // 获取用户信息 (从 session attributes 中)
        Object userAttr = session.getAttributes().get("USER_KEY_ATTR_NAME");
        if (userAttr instanceof PigxUser) {
            PigxUser user = (PigxUser) userAttr;
            log.info("发送消息的用户ID: {}, 用户名: {}", user.getId(), user.getUsername());
        }

        // 示例: 解析消息并处理
        try {
            // 假设客户端发送 JSON 格式消息
            JSONObject jsonMessage = JSONUtil.parseObj(message);
            String type = jsonMessage.getStr("type");
            String content = jsonMessage.getStr("content");

            // 根据消息类型处理业务逻辑
            switch (type) {
                case "chat":
                    // 处理聊天消息
                    handleChatMessage(session, content);
                    break;
                case "command":
                    // 处理指令消息
                    handleCommand(session, content);
                    break;
                default:
                    log.warn("未知消息类型: {}", type);
            }
        } catch (Exception e) {
            log.error("处理 WebSocket 消息异常", e);
        }
    }

    private void handleChatMessage(WebSocketSession session, String content) {
        // 实现聊天消息处理逻辑
        log.info("处理聊天消息: {}", content);
    }

    private void handleCommand(WebSocketSession session, String content) {
        // 实现指令处理逻辑
        log.info("处理命令: {}", content);
    }
}

Session Key 生成规则

PigX 默认使用用户 ID 作为 Session 唯一标识(PigxSessionKeyGenerator):

@Override
public Object sessionKey(WebSocketSession webSocketSession) {
    Object obj = webSocketSession.getAttributes().get("USER_KEY_ATTR_NAME");

    if (obj instanceof PigxUser) {
        PigxUser user = (PigxUser) obj;
        // 使用用户 ID 作为唯一标识
        return String.valueOf(user.getId());
    }

    return null;
}

自定义 Session Key 规则

如果需要自定义(如使用 租户ID+用户ID 组合),可以实现 SessionKeyGenerator 接口:

@Component
public class CustomSessionKeyGenerator implements SessionKeyGenerator {

    @Override
    public Object sessionKey(WebSocketSession webSocketSession) {
        Object obj = webSocketSession.getAttributes().get("USER_KEY_ATTR_NAME");

        if (obj instanceof PigxUser) {
            PigxUser user = (PigxUser) obj;
            // 自定义规则: 租户ID + 用户ID
            return user.getTenantId() + "_" + user.getId();
        }

        return null;
    }
}

架构建议