|
|
2 viikkoa sitten | |
|---|---|---|
| .. | ||
| src | 3 viikkoa sitten | |
| FRONTEND_GUIDE.md | 3 viikkoa sitten | |
| README.md | 2 viikkoa sitten | |
| pom.xml | 3 viikkoa sitten | |
TR Spring Boot SSE插件是一个基于Spring Boot的Server-Sent Events (SSE)实现,提供了实时推送功能。该插件支持客户端订阅主题、事件广播、心跳检测和连接管理等功能。
在您的Spring Boot项目中添加以下依赖:
<dependency>
<groupId>cn.tr.plugin</groupId>
<artifactId>cn-spring-boot-starter-plugin-sse</artifactId>
<version>2.0.0</version>
</dependency>
核心服务接口,提供以下方法:
subscribeTopic: 客户端订阅主题unsubscribe: 客户端取消订阅sendTopicEventToClient: 向指定客户端发送主题事件broadcastTopicEvent: 广播主题事件给所有订阅者sendRequest: 向指定客户端发送请求getActiveClientCount: 获取活跃客户端数量getClientSubscribedTopics: 获取客户端订阅的主题isClientConnected: 检查客户端是否连接SseEventDTO: SSE事件数据传输对象SseTopicSubscribeDTO: 主题订阅请求对象SseUnsubscribeDTO: 取消订阅请求对象SseHeartbeatDTO: 心跳请求对象SseRequestResponseDTO: 请求-响应对象SseController提供以下REST接口:
POST /sse/subscribe: 客户端订阅主题POST /sse/unsubscribe: 客户端取消订阅POST /sse/ping: 客户端发送心跳GET /sse/clients/count: 获取活跃客户端数量GET /sse/clients/{clientId}/topics: 获取客户端订阅的主题客户端通过发送POST请求到/sse/subscribe来订阅主题:
{
"clientId": "client-001",
"topics": ["notification", "system_alert"]
}
使用SseEventPublisher发送事件:
@Autowired
private SseEventPublisher sseEventPublisher;
// 向指定客户端发送事件
sseEventPublisher.publishTopicEvent("client-001", "notification", "Hello World");
// 广播事件给所有订阅者
sseEventPublisher.broadcastTopicEvent("system_alert", "System maintenance at 2 AM");
客户端通过EventSource API接收事件:
const eventSource = new EventSource('/sse/subscribe');
eventSource.addEventListener('notification', function(event) {
console.log('Received notification:', event.data);
});
插件支持以下配置项:
# 心跳超时时间(毫秒)
sse:
heartbeat:
timeout: 30000
# 心跳检测间隔(毫秒)
interval: 10000
D0001: 服务异常D0002: 连接异常D0003: 事件发布异常D0004: 订阅异常D0005: 客户端不存在D0006: 参数校验失败插件使用ConcurrentHashMap和CopyOnWriteArraySet确保线程安全。
在JDK 21+环境中,心跳检测使用虚拟线程以提高性能。
通过SseEmitter的回调机制管理连接生命周期:
SSE (Server-Sent Events) 插件是一个基于Spring Boot的实时消息推送组件,支持服务器向客户端推送实时事件。
在需要使用SSE功能的模块中添加以下依赖:
<dependency>
<groupId>cn.tr.plugin</groupId>
<artifactId>cn-spring-boot-starter-plugin-sse</artifactId>
<version>{latest-version}</version>
</dependency>
客户端订阅主题后,系统会不断产生相关主题事件并推送给客户端:
POST /sse/subscribe
Content-Type: application/json
{
"clientId": "client-001",
"topics": ["system_notification", "task_completed"],
"extraParams": {
"userId": "user123",
"department": "IT"
}
}
如果不指定主题列表,客户端将接收所有主题事件。
客户端可以通过以下方式取消订阅:
POST /sse/unsubscribe
Content-Type: application/json
{
"clientId": "client-001",
"eventType": "system_notification"
}
GET /sse/clients/count
注入SseEventPublisher并调用publishTopicEvent方法:
@Autowired
private SseEventPublisher sseEventPublisher;
// 发布主题事件给指定客户端
sseEventPublisher.publishTopicEvent("client-001", "system_notification", "Hello Client!");
// 广播主题事件给所有订阅了该主题的客户端
sseEventPublisher.broadcastTopicEvent("system_notification", "Broadcast Message!");
在主题订阅模式下,客户端订阅特定主题后,服务端会持续产生相关主题事件并推送给客户端。
// 订阅主题事件
fetch('/sse/subscribe', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
clientId: 'client-001',
topics: ['system_notification', 'task_completed'],
extraParams: {
userId: 'user123',
department: 'IT'
}
})
}).then(response => {
// 处理SSE流
const reader = response.body.getReader();
// 处理数据流...
});
@RestController
@RequiredArgsConstructor
public class TopicController {
private final SseEventPublisher sseEventPublisher;
@PostMapping("/send-notification")
public ResponseEntity<Void> sendNotification(@RequestParam String clientId,
@RequestParam String message) {
sseEventPublisher.publishTopicEvent(clientId, "system_notification", message);
return ResponseEntity.ok().build();
}
@PostMapping("/broadcast-notification")
public ResponseEntity<Void> broadcastNotification(@RequestParam String topic,
@RequestParam String message) {
sseEventPublisher.broadcastTopicEvent(topic, message);
return ResponseEntity.ok().build();
}
}
当客户端成功订阅主题后,插件会自动发布订阅事件,您可以通过监听SseApplicationEvent来处理订阅事件:
@Component
@RequiredArgsConstructor
public class SubscribeEventListener {
@EventListener
public void handleSubscribeEvent(SseApplicationEvent event) {
SseEventDTO sseEvent = event.getSseEvent();
if ("subscribe".equals(sseEvent.getEventType())) {
// 处理订阅事件
SseTopicSubscribeDTO subscribeDTO = (SseTopicSubscribeDTO) sseEvent.getData();
String clientId = subscribeDTO.getClientId();
Set<String> topics = subscribeDTO.getTopics();
Map<String, Object> extraParams = subscribeDTO.getExtraParams();
// 根据订阅信息执行相应逻辑
handleSubscription(clientId, topics, extraParams);
}
}
private void handleSubscription(String clientId, Set<String> topics, Map<String, Object> extraParams) {
// 处理订阅逻辑
log.info("客户端 {} 订阅了主题 {},额外参数: {}", clientId, topics, extraParams);
// 可以根据额外参数执行初始化操作
if (extraParams != null && !extraParams.isEmpty()) {
// 执行基于额外参数的初始化逻辑
}
}
}
| 接口 | 方法 | 路径 | 说明 |
|---|---|---|---|
| 订阅主题 | POST | /sse/subscribe |
客户端订阅指定主题 |
| 取消订阅 | POST | /sse/unsubscribe |
客户端取消订阅 |
| 查询活跃客户端数 | GET | /sse/clients/count |
获取当前活跃客户端数量 |
| 方法 | 参数 | 说明 |
|---|---|---|
| publishTopicEvent | clientId, topic, data | 发布主题事件给指定客户端 |
| broadcastTopicEvent | topic, data | 广播主题事件给所有订阅者 |
| 方法 | 参数 | 说明 |
|---|---|---|
| subscribeTopic | subscribeDTO | 客户端订阅主题 |
| sendTopicEventToClient | clientId, event | 发送主题事件给指定客户端 |
| broadcastTopicEvent | event | 广播主题事件给所有订阅者 |
// 订阅主题事件
fetch('/sse/subscribe', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
clientId: 'client-001',
topics: ['system_notification', 'task_completed'],
extraParams: {
userId: 'user123',
department: 'IT',
priority: 'high'
}
})
}).then(response => {
// 处理SSE流
const reader = response.body.getReader();
// 处理数据流...
});
@RestController
@RequiredArgsConstructor
public class SseExampleController {
private final SseEventPublisher sseEventPublisher;
// 发布主题事件
@PostMapping("/publish-topic")
public ResponseEntity<Void> publishTopic(@RequestParam String clientId,
@RequestParam String topic,
@RequestBody Object data) {
sseEventPublisher.publishTopicEvent(clientId, topic, data);
return ResponseEntity.ok().build();
}
// 广播主题事件
@PostMapping("/broadcast-topic")
public ResponseEntity<Void> broadcastTopic(@RequestParam String topic,
@RequestBody Object data) {
sseEventPublisher.broadcastTopicEvent(topic, data);
return ResponseEntity.ok().build();
}
}
@Component
@RequiredArgsConstructor
public class SubscribeEventListener {
@EventListener
public void handleSubscribeEvent(SseApplicationEvent event) {
SseEventDTO sseEvent = event.getSseEvent();
if ("subscribe".equals(sseEvent.getEventType())) {
// 处理订阅事件
SseTopicSubscribeDTO subscribeDTO = (SseTopicSubscribeDTO) sseEvent.getData();
String clientId = subscribeDTO.getClientId();
Set<String> topics = subscribeDTO.getTopics();
Map<String, Object> extraParams = subscribeDTO.getExtraParams();
// 根据订阅信息执行相应逻辑
// 例如:初始化用户数据、建立用户会话等
initializeUserSession(clientId, topics, extraParams);
}
}
private void initializeUserSession(String clientId, Set<String> topics, Map<String, Object> extraParams) {
// 根据额外参数初始化用户会话
if (extraParams != null) {
Object userId = extraParams.get("userId");
Object department = extraParams.get("department");
// 执行用户会话初始化逻辑
log.info("为客户端 {} 初始化用户会话,用户ID: {}, 部门: {}", clientId, userId, department);
}
}
}