zhouzeyu 7de2926acc 修改项目包结构 2 viikkoa sitten
..
src 2b3585f361 添加项目 3 viikkoa sitten
FRONTEND_GUIDE.md 2b3585f361 添加项目 3 viikkoa sitten
README.md 7de2926acc 修改项目包结构 2 viikkoa sitten
pom.xml 2b3585f361 添加项目 3 viikkoa sitten

README.md

TR Spring Boot SSE插件

概述

TR Spring Boot SSE插件是一个基于Spring Boot的Server-Sent Events (SSE)实现,提供了实时推送功能。该插件支持客户端订阅主题、事件广播、心跳检测和连接管理等功能。

功能特性

  • 客户端主题订阅与取消订阅
  • 单播(定向推送)和广播(主题推送)事件
  • 心跳检测与超时处理
  • 连接状态监控
  • 虚拟线程支持(JDK 21+)
  • Spring事件机制集成
  • 完善的异常处理

安装

在您的Spring Boot项目中添加以下依赖:


<dependency>
    <groupId>cn.tr.plugin</groupId>
    <artifactId>cn-spring-boot-starter-plugin-sse</artifactId>
    <version>2.0.0</version>
</dependency>

核心组件

1. SseService接口

核心服务接口,提供以下方法:

  • subscribeTopic: 客户端订阅主题
  • unsubscribe: 客户端取消订阅
  • sendTopicEventToClient: 向指定客户端发送主题事件
  • broadcastTopicEvent: 广播主题事件给所有订阅者
  • sendRequest: 向指定客户端发送请求
  • getActiveClientCount: 获取活跃客户端数量
  • getClientSubscribedTopics: 获取客户端订阅的主题
  • isClientConnected: 检查客户端是否连接

2. DTO对象

  • SseEventDTO: SSE事件数据传输对象
  • SseTopicSubscribeDTO: 主题订阅请求对象
  • SseUnsubscribeDTO: 取消订阅请求对象
  • SseHeartbeatDTO: 心跳请求对象
  • SseRequestResponseDTO: 请求-响应对象

3. 控制器

SseController提供以下REST接口:

  • POST /sse/subscribe: 客户端订阅主题
  • POST /sse/unsubscribe: 客户端取消订阅
  • POST /sse/ping: 客户端发送心跳
  • GET /sse/clients/count: 获取活跃客户端数量
  • GET /sse/clients/{clientId}/topics: 获取客户端订阅的主题

使用示例

1. 客户端订阅

客户端通过发送POST请求到/sse/subscribe来订阅主题:

{
  "clientId": "client-001",
  "topics": ["notification", "system_alert"]
}

2. 服务端发送事件

使用SseEventPublisher发送事件:

@Autowired
private SseEventPublisher sseEventPublisher;

// 向指定客户端发送事件
sseEventPublisher.publishTopicEvent("client-001", "notification", "Hello World");

// 广播事件给所有订阅者
sseEventPublisher.broadcastTopicEvent("system_alert", "System maintenance at 2 AM");

3. 客户端接收事件

客户端通过EventSource API接收事件:

const eventSource = new EventSource('/sse/subscribe');
eventSource.addEventListener('notification', function(event) {
    console.log('Received notification:', event.data);
});

配置

插件支持以下配置项:

# 心跳超时时间(毫秒)
sse:
  heartbeat:
    timeout: 30000
  # 心跳检测间隔(毫秒)
  interval: 10000

错误码

  • D0001: 服务异常
  • D0002: 连接异常
  • D0003: 事件发布异常
  • D0004: 订阅异常
  • D0005: 客户端不存在
  • D0006: 参数校验失败

技术细节

1. 线程安全

插件使用ConcurrentHashMapCopyOnWriteArraySet确保线程安全。

2. 虚拟线程支持

在JDK 21+环境中,心跳检测使用虚拟线程以提高性能。

3. 连接管理

通过SseEmitter的回调机制管理连接生命周期:

  • onCompletion: 连接完成时的处理
  • onTimeout: 连接超时时的处理
  • onError: 连接错误时的处理

注意事项

  1. 客户端需要定期发送心跳以维持连接
  2. 服务端会自动清理超时的连接
  3. 广播事件只会发送给订阅了相应主题的客户端
  4. 插件支持跨域配置,可根据需要进行调整

版本历史

2.0.0

  • 初始版本
  • 支持主题订阅和事件推送
  • 集成心跳检测机制
  • 提供完整的REST API

SSE Plugin 使用说明

SSE (Server-Sent Events) 插件是一个基于Spring Boot的实时消息推送组件,支持服务器向客户端推送实时事件。

功能特性

  • 支持客户端订阅特定主题
  • 支持向指定客户端发送主题事件
  • 支持广播主题事件给所有订阅者
  • 支持在订阅时传递额外参数
  • 订阅成功后自动发布订阅事件
  • 支持流式响应,不阻塞等待
  • 支持客户端管理与监控
  • 异步事件处理机制
  • 线程安全的客户端连接管理

安装依赖

在需要使用SSE功能的模块中添加以下依赖:


<dependency>
    <groupId>cn.tr.plugin</groupId>
    <artifactId>cn-spring-boot-starter-plugin-sse</artifactId>
    <version>{latest-version}</version>
</dependency>

基本用法

1. 客户端订阅主题

客户端订阅主题后,系统会不断产生相关主题事件并推送给客户端:

POST /sse/subscribe
Content-Type: application/json

{
  "clientId": "client-001",
  "topics": ["system_notification", "task_completed"],
  "extraParams": {
    "userId": "user123",
    "department": "IT"
  }
}

如果不指定主题列表,客户端将接收所有主题事件。

2. 客户端取消订阅

客户端可以通过以下方式取消订阅:

POST /sse/unsubscribe
Content-Type: application/json

{
  "clientId": "client-001",
  "eventType": "system_notification"
}

3. 查看活跃客户端数量

GET /sse/clients/count

服务端发送事件

1. 发布主题事件给指定客户端

注入SseEventPublisher并调用publishTopicEvent方法:

@Autowired
private SseEventPublisher sseEventPublisher;

// 发布主题事件给指定客户端
sseEventPublisher.publishTopicEvent("client-001", "system_notification", "Hello Client!");

2. 广播主题事件给所有订阅者

// 广播主题事件给所有订阅了该主题的客户端
sseEventPublisher.broadcastTopicEvent("system_notification", "Broadcast Message!");

主题订阅模式

在主题订阅模式下,客户端订阅特定主题后,服务端会持续产生相关主题事件并推送给客户端。

客户端订阅主题

// 订阅主题事件
fetch('/sse/subscribe', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json'
    },
    body: JSON.stringify({
        clientId: 'client-001',
        topics: ['system_notification', 'task_completed'],
        extraParams: {
            userId: 'user123',
            department: 'IT'
        }
    })
}).then(response => {
    // 处理SSE流
    const reader = response.body.getReader();
    // 处理数据流...
});

服务端发布主题事件

@RestController
@RequiredArgsConstructor
public class TopicController {
    
    private final SseEventPublisher sseEventPublisher;
    
    @PostMapping("/send-notification")
    public ResponseEntity<Void> sendNotification(@RequestParam String clientId, 
                                                 @RequestParam String message) {
        sseEventPublisher.publishTopicEvent(clientId, "system_notification", message);
        return ResponseEntity.ok().build();
    }
    
    @PostMapping("/broadcast-notification")
    public ResponseEntity<Void> broadcastNotification(@RequestParam String topic,
                                                      @RequestParam String message) {
        sseEventPublisher.broadcastTopicEvent(topic, message);
        return ResponseEntity.ok().build();
    }
}

订阅事件监听

当客户端成功订阅主题后,插件会自动发布订阅事件,您可以通过监听SseApplicationEvent来处理订阅事件:

@Component
@RequiredArgsConstructor
public class SubscribeEventListener {
    
    @EventListener
    public void handleSubscribeEvent(SseApplicationEvent event) {
        SseEventDTO sseEvent = event.getSseEvent();
        if ("subscribe".equals(sseEvent.getEventType())) {
            // 处理订阅事件
            SseTopicSubscribeDTO subscribeDTO = (SseTopicSubscribeDTO) sseEvent.getData();
            String clientId = subscribeDTO.getClientId();
            Set<String> topics = subscribeDTO.getTopics();
            Map<String, Object> extraParams = subscribeDTO.getExtraParams();
            
            // 根据订阅信息执行相应逻辑
            handleSubscription(clientId, topics, extraParams);
        }
    }
    
    private void handleSubscription(String clientId, Set<String> topics, Map<String, Object> extraParams) {
        // 处理订阅逻辑
        log.info("客户端 {} 订阅了主题 {},额外参数: {}", clientId, topics, extraParams);
        
        // 可以根据额外参数执行初始化操作
        if (extraParams != null && !extraParams.isEmpty()) {
            // 执行基于额外参数的初始化逻辑
        }
    }
}

API参考

订阅相关接口

接口 方法 路径 说明
订阅主题 POST /sse/subscribe 客户端订阅指定主题
取消订阅 POST /sse/unsubscribe 客户端取消订阅
查询活跃客户端数 GET /sse/clients/count 获取当前活跃客户端数量

服务端事件发布API

方法 参数 说明
publishTopicEvent clientId, topic, data 发布主题事件给指定客户端
broadcastTopicEvent topic, data 广播主题事件给所有订阅者

服务端API

方法 参数 说明
subscribeTopic subscribeDTO 客户端订阅主题
sendTopicEventToClient clientId, event 发送主题事件给指定客户端
broadcastTopicEvent event 广播主题事件给所有订阅者

示例代码

前端JavaScript完整示例

// 订阅主题事件
fetch('/sse/subscribe', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json'
    },
    body: JSON.stringify({
        clientId: 'client-001',
        topics: ['system_notification', 'task_completed'],
        extraParams: {
            userId: 'user123',
            department: 'IT',
            priority: 'high'
        }
    })
}).then(response => {
    // 处理SSE流
    const reader = response.body.getReader();
    // 处理数据流...
});

后端Java完整示例

@RestController
@RequiredArgsConstructor
public class SseExampleController {
    
    private final SseEventPublisher sseEventPublisher;
    
    // 发布主题事件
    @PostMapping("/publish-topic")
    public ResponseEntity<Void> publishTopic(@RequestParam String clientId,
                                             @RequestParam String topic,
                                             @RequestBody Object data) {
        sseEventPublisher.publishTopicEvent(clientId, topic, data);
        return ResponseEntity.ok().build();
    }
    
    // 广播主题事件
    @PostMapping("/broadcast-topic")
    public ResponseEntity<Void> broadcastTopic(@RequestParam String topic,
                                               @RequestBody Object data) {
        sseEventPublisher.broadcastTopicEvent(topic, data);
        return ResponseEntity.ok().build();
    }
}

@Component
@RequiredArgsConstructor
public class SubscribeEventListener {
    
    @EventListener
    public void handleSubscribeEvent(SseApplicationEvent event) {
        SseEventDTO sseEvent = event.getSseEvent();
        if ("subscribe".equals(sseEvent.getEventType())) {
            // 处理订阅事件
            SseTopicSubscribeDTO subscribeDTO = (SseTopicSubscribeDTO) sseEvent.getData();
            String clientId = subscribeDTO.getClientId();
            Set<String> topics = subscribeDTO.getTopics();
            Map<String, Object> extraParams = subscribeDTO.getExtraParams();
            
            // 根据订阅信息执行相应逻辑
            // 例如:初始化用户数据、建立用户会话等
            initializeUserSession(clientId, topics, extraParams);
        }
    }
    
    private void initializeUserSession(String clientId, Set<String> topics, Map<String, Object> extraParams) {
        // 根据额外参数初始化用户会话
        if (extraParams != null) {
            Object userId = extraParams.get("userId");
            Object department = extraParams.get("department");
            
            // 执行用户会话初始化逻辑
            log.info("为客户端 {} 初始化用户会话,用户ID: {}, 部门: {}", clientId, userId, department);
        }
    }
}

注意事项

  1. 客户端需要保持长连接以接收实时事件
  2. 服务端应处理好连接断开和超时情况
  3. 对于大量并发连接,需要合理配置线程池参数
  4. 事件数据会被序列化为JSON格式传输
  5. 使用流式响应避免阻塞等待,提高系统性能
  6. 主题订阅适用于持续推送事件的场景
  7. 订阅时的额外参数可用于初始化客户端状态或个性化设置