# TR Spring Boot SSE插件 ## 概述 TR Spring Boot SSE插件是一个基于Spring Boot的Server-Sent Events (SSE)实现,提供了实时推送功能。该插件支持客户端订阅主题、事件广播、心跳检测和连接管理等功能。 ## 功能特性 - 客户端主题订阅与取消订阅 - 单播(定向推送)和广播(主题推送)事件 - 心跳检测与超时处理 - 连接状态监控 - 虚拟线程支持(JDK 21+) - Spring事件机制集成 - 完善的异常处理 ## 安装 在您的Spring Boot项目中添加以下依赖: ```xml cn.tr.plugin cn-spring-boot-starter-plugin-sse 2.0.0 ``` ## 核心组件 ### 1. SseService接口 核心服务接口,提供以下方法: - `subscribeTopic`: 客户端订阅主题 - `unsubscribe`: 客户端取消订阅 - `sendTopicEventToClient`: 向指定客户端发送主题事件 - `broadcastTopicEvent`: 广播主题事件给所有订阅者 - `sendRequest`: 向指定客户端发送请求 - `getActiveClientCount`: 获取活跃客户端数量 - `getClientSubscribedTopics`: 获取客户端订阅的主题 - `isClientConnected`: 检查客户端是否连接 ### 2. DTO对象 - `SseEventDTO`: SSE事件数据传输对象 - `SseTopicSubscribeDTO`: 主题订阅请求对象 - `SseUnsubscribeDTO`: 取消订阅请求对象 - `SseHeartbeatDTO`: 心跳请求对象 - `SseRequestResponseDTO`: 请求-响应对象 ### 3. 控制器 `SseController`提供以下REST接口: - `POST /sse/subscribe`: 客户端订阅主题 - `POST /sse/unsubscribe`: 客户端取消订阅 - `POST /sse/ping`: 客户端发送心跳 - `GET /sse/clients/count`: 获取活跃客户端数量 - `GET /sse/clients/{clientId}/topics`: 获取客户端订阅的主题 ## 使用示例 ### 1. 客户端订阅 客户端通过发送POST请求到`/sse/subscribe`来订阅主题: ```json { "clientId": "client-001", "topics": ["notification", "system_alert"] } ``` ### 2. 服务端发送事件 使用`SseEventPublisher`发送事件: ```java @Autowired private SseEventPublisher sseEventPublisher; // 向指定客户端发送事件 sseEventPublisher.publishTopicEvent("client-001", "notification", "Hello World"); // 广播事件给所有订阅者 sseEventPublisher.broadcastTopicEvent("system_alert", "System maintenance at 2 AM"); ``` ### 3. 客户端接收事件 客户端通过EventSource API接收事件: ```javascript const eventSource = new EventSource('/sse/subscribe'); eventSource.addEventListener('notification', function(event) { console.log('Received notification:', event.data); }); ``` ## 配置 插件支持以下配置项: ```yaml # 心跳超时时间(毫秒) sse: heartbeat: timeout: 30000 # 心跳检测间隔(毫秒) interval: 10000 ``` ## 错误码 - `D0001`: 服务异常 - `D0002`: 连接异常 - `D0003`: 事件发布异常 - `D0004`: 订阅异常 - `D0005`: 客户端不存在 - `D0006`: 参数校验失败 ## 技术细节 ### 1. 线程安全 插件使用`ConcurrentHashMap`和`CopyOnWriteArraySet`确保线程安全。 ### 2. 虚拟线程支持 在JDK 21+环境中,心跳检测使用虚拟线程以提高性能。 ### 3. 连接管理 通过SseEmitter的回调机制管理连接生命周期: - onCompletion: 连接完成时的处理 - onTimeout: 连接超时时的处理 - onError: 连接错误时的处理 ## 注意事项 1. 客户端需要定期发送心跳以维持连接 2. 服务端会自动清理超时的连接 3. 广播事件只会发送给订阅了相应主题的客户端 4. 插件支持跨域配置,可根据需要进行调整 ## 版本历史 ### 2.0.0 - 初始版本 - 支持主题订阅和事件推送 - 集成心跳检测机制 - 提供完整的REST API # SSE Plugin 使用说明 SSE (Server-Sent Events) 插件是一个基于Spring Boot的实时消息推送组件,支持服务器向客户端推送实时事件。 ## 功能特性 - 支持客户端订阅特定主题 - 支持向指定客户端发送主题事件 - 支持广播主题事件给所有订阅者 - 支持在订阅时传递额外参数 - 订阅成功后自动发布订阅事件 - 支持流式响应,不阻塞等待 - 支持客户端管理与监控 - 异步事件处理机制 - 线程安全的客户端连接管理 ## 安装依赖 在需要使用SSE功能的模块中添加以下依赖: ```xml cn.tr.plugin cn-spring-boot-starter-plugin-sse {latest-version} ``` ## 基本用法 ### 1. 客户端订阅主题 客户端订阅主题后,系统会不断产生相关主题事件并推送给客户端: ```bash POST /sse/subscribe Content-Type: application/json { "clientId": "client-001", "topics": ["system_notification", "task_completed"], "extraParams": { "userId": "user123", "department": "IT" } } ``` 如果不指定主题列表,客户端将接收所有主题事件。 ### 2. 客户端取消订阅 客户端可以通过以下方式取消订阅: ```bash POST /sse/unsubscribe Content-Type: application/json { "clientId": "client-001", "eventType": "system_notification" } ``` ### 3. 查看活跃客户端数量 ```bash GET /sse/clients/count ``` ## 服务端发送事件 ### 1. 发布主题事件给指定客户端 注入[SseEventPublisher](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/SseEventPublisher.java#L16-L50)并调用[publishTopicEvent](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/SseEventPublisher.java#L32-L44)方法: ```java @Autowired private SseEventPublisher sseEventPublisher; // 发布主题事件给指定客户端 sseEventPublisher.publishTopicEvent("client-001", "system_notification", "Hello Client!"); ``` ### 2. 广播主题事件给所有订阅者 ```java // 广播主题事件给所有订阅了该主题的客户端 sseEventPublisher.broadcastTopicEvent("system_notification", "Broadcast Message!"); ``` ## 主题订阅模式 在主题订阅模式下,客户端订阅特定主题后,服务端会持续产生相关主题事件并推送给客户端。 ### 客户端订阅主题 ``` // 订阅主题事件 fetch('/sse/subscribe', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ clientId: 'client-001', topics: ['system_notification', 'task_completed'], extraParams: { userId: 'user123', department: 'IT' } }) }).then(response => { // 处理SSE流 const reader = response.body.getReader(); // 处理数据流... }); ``` ### 服务端发布主题事件 ``` @RestController @RequiredArgsConstructor public class TopicController { private final SseEventPublisher sseEventPublisher; @PostMapping("/send-notification") public ResponseEntity sendNotification(@RequestParam String clientId, @RequestParam String message) { sseEventPublisher.publishTopicEvent(clientId, "system_notification", message); return ResponseEntity.ok().build(); } @PostMapping("/broadcast-notification") public ResponseEntity broadcastNotification(@RequestParam String topic, @RequestParam String message) { sseEventPublisher.broadcastTopicEvent(topic, message); return ResponseEntity.ok().build(); } } ``` ## 订阅事件监听 当客户端成功订阅主题后,插件会自动发布订阅事件,您可以通过监听[SseApplicationEvent](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/event/SseApplicationEvent.java#L12-L28)来处理订阅事件: ``` @Component @RequiredArgsConstructor public class SubscribeEventListener { @EventListener public void handleSubscribeEvent(SseApplicationEvent event) { SseEventDTO sseEvent = event.getSseEvent(); if ("subscribe".equals(sseEvent.getEventType())) { // 处理订阅事件 SseTopicSubscribeDTO subscribeDTO = (SseTopicSubscribeDTO) sseEvent.getData(); String clientId = subscribeDTO.getClientId(); Set topics = subscribeDTO.getTopics(); Map extraParams = subscribeDTO.getExtraParams(); // 根据订阅信息执行相应逻辑 handleSubscription(clientId, topics, extraParams); } } private void handleSubscription(String clientId, Set topics, Map extraParams) { // 处理订阅逻辑 log.info("客户端 {} 订阅了主题 {},额外参数: {}", clientId, topics, extraParams); // 可以根据额外参数执行初始化操作 if (extraParams != null && !extraParams.isEmpty()) { // 执行基于额外参数的初始化逻辑 } } } ``` ## API参考 ### 订阅相关接口 | 接口 | 方法 | 路径 | 说明 | |------|------|------|------| | 订阅主题 | POST | `/sse/subscribe` | 客户端订阅指定主题 | | 取消订阅 | POST | `/sse/unsubscribe` | 客户端取消订阅 | | 查询活跃客户端数 | GET | `/sse/clients/count` | 获取当前活跃客户端数量 | ### 服务端事件发布API | 方法 | 参数 | 说明 | |------|------|------| | [publishTopicEvent](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/SseEventPublisher.java#L32-L44) | clientId, topic, data | 发布主题事件给指定客户端 | | [broadcastTopicEvent](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/SseEventPublisher.java#L49-L50) | topic, data | 广播主题事件给所有订阅者 | ### 服务端API | 方法 | 参数 | 说明 | |------|------|------| | subscribeTopic | subscribeDTO | 客户端订阅主题 | | sendTopicEventToClient | clientId, event | 发送主题事件给指定客户端 | | broadcastTopicEvent | event | 广播主题事件给所有订阅者 | ## 示例代码 ### 前端JavaScript完整示例 ``` // 订阅主题事件 fetch('/sse/subscribe', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ clientId: 'client-001', topics: ['system_notification', 'task_completed'], extraParams: { userId: 'user123', department: 'IT', priority: 'high' } }) }).then(response => { // 处理SSE流 const reader = response.body.getReader(); // 处理数据流... }); ``` ### 后端Java完整示例 ``` @RestController @RequiredArgsConstructor public class SseExampleController { private final SseEventPublisher sseEventPublisher; // 发布主题事件 @PostMapping("/publish-topic") public ResponseEntity publishTopic(@RequestParam String clientId, @RequestParam String topic, @RequestBody Object data) { sseEventPublisher.publishTopicEvent(clientId, topic, data); return ResponseEntity.ok().build(); } // 广播主题事件 @PostMapping("/broadcast-topic") public ResponseEntity broadcastTopic(@RequestParam String topic, @RequestBody Object data) { sseEventPublisher.broadcastTopicEvent(topic, data); return ResponseEntity.ok().build(); } } @Component @RequiredArgsConstructor public class SubscribeEventListener { @EventListener public void handleSubscribeEvent(SseApplicationEvent event) { SseEventDTO sseEvent = event.getSseEvent(); if ("subscribe".equals(sseEvent.getEventType())) { // 处理订阅事件 SseTopicSubscribeDTO subscribeDTO = (SseTopicSubscribeDTO) sseEvent.getData(); String clientId = subscribeDTO.getClientId(); Set topics = subscribeDTO.getTopics(); Map extraParams = subscribeDTO.getExtraParams(); // 根据订阅信息执行相应逻辑 // 例如:初始化用户数据、建立用户会话等 initializeUserSession(clientId, topics, extraParams); } } private void initializeUserSession(String clientId, Set topics, Map extraParams) { // 根据额外参数初始化用户会话 if (extraParams != null) { Object userId = extraParams.get("userId"); Object department = extraParams.get("department"); // 执行用户会话初始化逻辑 log.info("为客户端 {} 初始化用户会话,用户ID: {}, 部门: {}", clientId, userId, department); } } } ``` ## 注意事项 1. 客户端需要保持长连接以接收实时事件 2. 服务端应处理好连接断开和超时情况 3. 对于大量并发连接,需要合理配置线程池参数 4. 事件数据会被序列化为JSON格式传输 5. 使用流式响应避免阻塞等待,提高系统性能 6. 主题订阅适用于持续推送事件的场景 7. 订阅时的额外参数可用于初始化客户端状态或个性化设置