# TR Spring Boot SSE插件
## 概述
TR Spring Boot SSE插件是一个基于Spring Boot的Server-Sent Events (SSE)实现,提供了实时推送功能。该插件支持客户端订阅主题、事件广播、心跳检测和连接管理等功能。
## 功能特性
- 客户端主题订阅与取消订阅
- 单播(定向推送)和广播(主题推送)事件
- 心跳检测与超时处理
- 连接状态监控
- 虚拟线程支持(JDK 21+)
- Spring事件机制集成
- 完善的异常处理
## 安装
在您的Spring Boot项目中添加以下依赖:
```xml
cn.tr.plugin
cn-spring-boot-starter-plugin-sse
2.0.0
```
## 核心组件
### 1. SseService接口
核心服务接口,提供以下方法:
- `subscribeTopic`: 客户端订阅主题
- `unsubscribe`: 客户端取消订阅
- `sendTopicEventToClient`: 向指定客户端发送主题事件
- `broadcastTopicEvent`: 广播主题事件给所有订阅者
- `sendRequest`: 向指定客户端发送请求
- `getActiveClientCount`: 获取活跃客户端数量
- `getClientSubscribedTopics`: 获取客户端订阅的主题
- `isClientConnected`: 检查客户端是否连接
### 2. DTO对象
- `SseEventDTO`: SSE事件数据传输对象
- `SseTopicSubscribeDTO`: 主题订阅请求对象
- `SseUnsubscribeDTO`: 取消订阅请求对象
- `SseHeartbeatDTO`: 心跳请求对象
- `SseRequestResponseDTO`: 请求-响应对象
### 3. 控制器
`SseController`提供以下REST接口:
- `POST /sse/subscribe`: 客户端订阅主题
- `POST /sse/unsubscribe`: 客户端取消订阅
- `POST /sse/ping`: 客户端发送心跳
- `GET /sse/clients/count`: 获取活跃客户端数量
- `GET /sse/clients/{clientId}/topics`: 获取客户端订阅的主题
## 使用示例
### 1. 客户端订阅
客户端通过发送POST请求到`/sse/subscribe`来订阅主题:
```json
{
"clientId": "client-001",
"topics": ["notification", "system_alert"]
}
```
### 2. 服务端发送事件
使用`SseEventPublisher`发送事件:
```java
@Autowired
private SseEventPublisher sseEventPublisher;
// 向指定客户端发送事件
sseEventPublisher.publishTopicEvent("client-001", "notification", "Hello World");
// 广播事件给所有订阅者
sseEventPublisher.broadcastTopicEvent("system_alert", "System maintenance at 2 AM");
```
### 3. 客户端接收事件
客户端通过EventSource API接收事件:
```javascript
const eventSource = new EventSource('/sse/subscribe');
eventSource.addEventListener('notification', function(event) {
console.log('Received notification:', event.data);
});
```
## 配置
插件支持以下配置项:
```yaml
# 心跳超时时间(毫秒)
sse:
heartbeat:
timeout: 30000
# 心跳检测间隔(毫秒)
interval: 10000
```
## 错误码
- `D0001`: 服务异常
- `D0002`: 连接异常
- `D0003`: 事件发布异常
- `D0004`: 订阅异常
- `D0005`: 客户端不存在
- `D0006`: 参数校验失败
## 技术细节
### 1. 线程安全
插件使用`ConcurrentHashMap`和`CopyOnWriteArraySet`确保线程安全。
### 2. 虚拟线程支持
在JDK 21+环境中,心跳检测使用虚拟线程以提高性能。
### 3. 连接管理
通过SseEmitter的回调机制管理连接生命周期:
- onCompletion: 连接完成时的处理
- onTimeout: 连接超时时的处理
- onError: 连接错误时的处理
## 注意事项
1. 客户端需要定期发送心跳以维持连接
2. 服务端会自动清理超时的连接
3. 广播事件只会发送给订阅了相应主题的客户端
4. 插件支持跨域配置,可根据需要进行调整
## 版本历史
### 2.0.0
- 初始版本
- 支持主题订阅和事件推送
- 集成心跳检测机制
- 提供完整的REST API
# SSE Plugin 使用说明
SSE (Server-Sent Events) 插件是一个基于Spring Boot的实时消息推送组件,支持服务器向客户端推送实时事件。
## 功能特性
- 支持客户端订阅特定主题
- 支持向指定客户端发送主题事件
- 支持广播主题事件给所有订阅者
- 支持在订阅时传递额外参数
- 订阅成功后自动发布订阅事件
- 支持流式响应,不阻塞等待
- 支持客户端管理与监控
- 异步事件处理机制
- 线程安全的客户端连接管理
## 安装依赖
在需要使用SSE功能的模块中添加以下依赖:
```xml
cn.tr.plugin
cn-spring-boot-starter-plugin-sse
{latest-version}
```
## 基本用法
### 1. 客户端订阅主题
客户端订阅主题后,系统会不断产生相关主题事件并推送给客户端:
```bash
POST /sse/subscribe
Content-Type: application/json
{
"clientId": "client-001",
"topics": ["system_notification", "task_completed"],
"extraParams": {
"userId": "user123",
"department": "IT"
}
}
```
如果不指定主题列表,客户端将接收所有主题事件。
### 2. 客户端取消订阅
客户端可以通过以下方式取消订阅:
```bash
POST /sse/unsubscribe
Content-Type: application/json
{
"clientId": "client-001",
"eventType": "system_notification"
}
```
### 3. 查看活跃客户端数量
```bash
GET /sse/clients/count
```
## 服务端发送事件
### 1. 发布主题事件给指定客户端
注入[SseEventPublisher](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/SseEventPublisher.java#L16-L50)并调用[publishTopicEvent](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/SseEventPublisher.java#L32-L44)方法:
```java
@Autowired
private SseEventPublisher sseEventPublisher;
// 发布主题事件给指定客户端
sseEventPublisher.publishTopicEvent("client-001", "system_notification", "Hello Client!");
```
### 2. 广播主题事件给所有订阅者
```java
// 广播主题事件给所有订阅了该主题的客户端
sseEventPublisher.broadcastTopicEvent("system_notification", "Broadcast Message!");
```
## 主题订阅模式
在主题订阅模式下,客户端订阅特定主题后,服务端会持续产生相关主题事件并推送给客户端。
### 客户端订阅主题
```
// 订阅主题事件
fetch('/sse/subscribe', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
clientId: 'client-001',
topics: ['system_notification', 'task_completed'],
extraParams: {
userId: 'user123',
department: 'IT'
}
})
}).then(response => {
// 处理SSE流
const reader = response.body.getReader();
// 处理数据流...
});
```
### 服务端发布主题事件
```
@RestController
@RequiredArgsConstructor
public class TopicController {
private final SseEventPublisher sseEventPublisher;
@PostMapping("/send-notification")
public ResponseEntity sendNotification(@RequestParam String clientId,
@RequestParam String message) {
sseEventPublisher.publishTopicEvent(clientId, "system_notification", message);
return ResponseEntity.ok().build();
}
@PostMapping("/broadcast-notification")
public ResponseEntity broadcastNotification(@RequestParam String topic,
@RequestParam String message) {
sseEventPublisher.broadcastTopicEvent(topic, message);
return ResponseEntity.ok().build();
}
}
```
## 订阅事件监听
当客户端成功订阅主题后,插件会自动发布订阅事件,您可以通过监听[SseApplicationEvent](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/event/SseApplicationEvent.java#L12-L28)来处理订阅事件:
```
@Component
@RequiredArgsConstructor
public class SubscribeEventListener {
@EventListener
public void handleSubscribeEvent(SseApplicationEvent event) {
SseEventDTO sseEvent = event.getSseEvent();
if ("subscribe".equals(sseEvent.getEventType())) {
// 处理订阅事件
SseTopicSubscribeDTO subscribeDTO = (SseTopicSubscribeDTO) sseEvent.getData();
String clientId = subscribeDTO.getClientId();
Set topics = subscribeDTO.getTopics();
Map extraParams = subscribeDTO.getExtraParams();
// 根据订阅信息执行相应逻辑
handleSubscription(clientId, topics, extraParams);
}
}
private void handleSubscription(String clientId, Set topics, Map extraParams) {
// 处理订阅逻辑
log.info("客户端 {} 订阅了主题 {},额外参数: {}", clientId, topics, extraParams);
// 可以根据额外参数执行初始化操作
if (extraParams != null && !extraParams.isEmpty()) {
// 执行基于额外参数的初始化逻辑
}
}
}
```
## API参考
### 订阅相关接口
| 接口 | 方法 | 路径 | 说明 |
|------|------|------|------|
| 订阅主题 | POST | `/sse/subscribe` | 客户端订阅指定主题 |
| 取消订阅 | POST | `/sse/unsubscribe` | 客户端取消订阅 |
| 查询活跃客户端数 | GET | `/sse/clients/count` | 获取当前活跃客户端数量 |
### 服务端事件发布API
| 方法 | 参数 | 说明 |
|------|------|------|
| [publishTopicEvent](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/SseEventPublisher.java#L32-L44) | clientId, topic, data | 发布主题事件给指定客户端 |
| [broadcastTopicEvent](file:///F:/javaProject/tr-footstone/tr-plugins/tr-spring-boot-starter-plugin-sse/src/main/java/cn/tr/plugin/sse/SseEventPublisher.java#L49-L50) | topic, data | 广播主题事件给所有订阅者 |
### 服务端API
| 方法 | 参数 | 说明 |
|------|------|------|
| subscribeTopic | subscribeDTO | 客户端订阅主题 |
| sendTopicEventToClient | clientId, event | 发送主题事件给指定客户端 |
| broadcastTopicEvent | event | 广播主题事件给所有订阅者 |
## 示例代码
### 前端JavaScript完整示例
```
// 订阅主题事件
fetch('/sse/subscribe', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
clientId: 'client-001',
topics: ['system_notification', 'task_completed'],
extraParams: {
userId: 'user123',
department: 'IT',
priority: 'high'
}
})
}).then(response => {
// 处理SSE流
const reader = response.body.getReader();
// 处理数据流...
});
```
### 后端Java完整示例
```
@RestController
@RequiredArgsConstructor
public class SseExampleController {
private final SseEventPublisher sseEventPublisher;
// 发布主题事件
@PostMapping("/publish-topic")
public ResponseEntity publishTopic(@RequestParam String clientId,
@RequestParam String topic,
@RequestBody Object data) {
sseEventPublisher.publishTopicEvent(clientId, topic, data);
return ResponseEntity.ok().build();
}
// 广播主题事件
@PostMapping("/broadcast-topic")
public ResponseEntity broadcastTopic(@RequestParam String topic,
@RequestBody Object data) {
sseEventPublisher.broadcastTopicEvent(topic, data);
return ResponseEntity.ok().build();
}
}
@Component
@RequiredArgsConstructor
public class SubscribeEventListener {
@EventListener
public void handleSubscribeEvent(SseApplicationEvent event) {
SseEventDTO sseEvent = event.getSseEvent();
if ("subscribe".equals(sseEvent.getEventType())) {
// 处理订阅事件
SseTopicSubscribeDTO subscribeDTO = (SseTopicSubscribeDTO) sseEvent.getData();
String clientId = subscribeDTO.getClientId();
Set topics = subscribeDTO.getTopics();
Map extraParams = subscribeDTO.getExtraParams();
// 根据订阅信息执行相应逻辑
// 例如:初始化用户数据、建立用户会话等
initializeUserSession(clientId, topics, extraParams);
}
}
private void initializeUserSession(String clientId, Set topics, Map extraParams) {
// 根据额外参数初始化用户会话
if (extraParams != null) {
Object userId = extraParams.get("userId");
Object department = extraParams.get("department");
// 执行用户会话初始化逻辑
log.info("为客户端 {} 初始化用户会话,用户ID: {}, 部门: {}", clientId, userId, department);
}
}
}
```
## 注意事项
1. 客户端需要保持长连接以接收实时事件
2. 服务端应处理好连接断开和超时情况
3. 对于大量并发连接,需要合理配置线程池参数
4. 事件数据会被序列化为JSON格式传输
5. 使用流式响应避免阻塞等待,提高系统性能
6. 主题订阅适用于持续推送事件的场景
7. 订阅时的额外参数可用于初始化客户端状态或个性化设置