|
@@ -0,0 +1,143 @@
|
|
|
|
|
+package cn.tr.module.phototherapy.common.service.impl;
|
|
|
|
|
+
|
|
|
|
|
+import cn.tr.module.phototherapy.common.dto.DeviceStatusPushDTO;
|
|
|
|
|
+import cn.tr.module.phototherapy.common.dto.TherapyDurationPushDTO;
|
|
|
|
|
+import cn.tr.module.phototherapy.common.service.DeviceSseService;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
|
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
|
|
|
+
|
|
|
|
|
+import java.io.IOException;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ClassName DeviceSseServiceImpl
|
|
|
|
|
+ * @Description 设备SSE服务实现类
|
|
|
|
|
+ * @Date 2026/1/28 9:33
|
|
|
|
|
+ * @Version 1.0.0
|
|
|
|
|
+ */
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+@Service
|
|
|
|
|
+public class DeviceSseServiceImpl implements DeviceSseService {
|
|
|
|
|
+ private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
|
|
|
|
|
+ // 心跳线程池
|
|
|
|
|
+ private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1);
|
|
|
|
|
+ // 超时时间:30分钟(避免Long.MAX_VALUE导致内存泄漏)
|
|
|
|
|
+ private static final long SSE_TIMEOUT = 30 * 60 * 1000L;
|
|
|
|
|
+ // 心跳间隔:40秒(维持连接不被断开)
|
|
|
|
|
+ private static final long HEARTBEAT_INTERVAL = 40 * 1000L;
|
|
|
|
|
+
|
|
|
|
|
+ // 全局连接标识
|
|
|
|
|
+ private static final String GLOBAL_ADMIN_ID = "hospital_admin";
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public SseEmitter createConnection(String uniqueId) {
|
|
|
|
|
+ // 先移除旧连接(避免重复连接)
|
|
|
|
|
+ if (emitters.containsKey(uniqueId)) {
|
|
|
|
|
+ SseEmitter oldEmitter = emitters.remove(uniqueId);
|
|
|
|
|
+ oldEmitter.complete(); // 关闭旧连接
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 创建新的Emitter,设置30分钟超时
|
|
|
|
|
+ SseEmitter emitter = new SseEmitter(SSE_TIMEOUT);
|
|
|
|
|
+ log.info("创建SSE连接,标识:{}", uniqueId);
|
|
|
|
|
+
|
|
|
|
|
+ // 连接回调:清理资源
|
|
|
|
|
+ emitter.onCompletion(() -> {
|
|
|
|
|
+ emitters.remove(uniqueId);
|
|
|
|
|
+ log.info("SSE连接完成,清理标识{}连接", uniqueId);
|
|
|
|
|
+ });
|
|
|
|
|
+ emitter.onTimeout(() -> {
|
|
|
|
|
+ emitters.remove(uniqueId);
|
|
|
|
|
+ log.warn("SSE连接超时,清理标识{}连接", uniqueId);
|
|
|
|
|
+ });
|
|
|
|
|
+ emitter.onError((ex) -> {
|
|
|
|
|
+ emitters.remove(uniqueId);
|
|
|
|
|
+ log.error("SSE连接异常,清理标识{}连接", uniqueId, ex);
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 启动心跳任务,维持连接
|
|
|
|
|
+ startHeartbeat(uniqueId, emitter);
|
|
|
|
|
+
|
|
|
|
|
+ // 存入连接池
|
|
|
|
|
+ emitters.put(uniqueId, emitter);
|
|
|
|
|
+
|
|
|
|
|
+ return emitter;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void pushDeviceStatus(String patientUniqueId, DeviceStatusPushDTO data) {
|
|
|
|
|
+ sendEvent(patientUniqueId, "DEVICE_STATUS_CHANGE", data);
|
|
|
|
|
+
|
|
|
|
|
+ // 同步推送给全局连接
|
|
|
|
|
+ sendGlobalEvent("DEVICE_STATUS_CHANGE", data);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void pushTherapyDuration(String patientUniqueId, TherapyDurationPushDTO data) {
|
|
|
|
|
+ sendEvent(patientUniqueId, "THERAPY_DURATION_UPDATE", data);
|
|
|
|
|
+
|
|
|
|
|
+ // 同步推送给全局连接
|
|
|
|
|
+ sendGlobalEvent("THERAPY_DURATION_UPDATE", data);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 通用发送方法(抽离重复逻辑)
|
|
|
|
|
+ * @param uniqueId 患者ID/全局标识
|
|
|
|
|
+ * @param eventName 事件名称
|
|
|
|
|
+ * @param data 推送数据
|
|
|
|
|
+ * @param <T> 数据类型
|
|
|
|
|
+ */
|
|
|
|
|
+ private <T> void sendEvent(String uniqueId, String eventName, T data) {
|
|
|
|
|
+ SseEmitter emitter = emitters.get(uniqueId);
|
|
|
|
|
+ if (emitter == null) {
|
|
|
|
|
+ log.warn("标识{}无活跃SSE连接,推送{}失败", uniqueId, eventName);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ emitter.send(SseEmitter.event()
|
|
|
|
|
+ .name(eventName)
|
|
|
|
|
+ .data(data));
|
|
|
|
|
+ log.info("标识{}推送{}成功,数据:{}", uniqueId, eventName, data);
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ log.error("标识{}推送{}失败", uniqueId, eventName, e);
|
|
|
|
|
+ emitters.remove(uniqueId); // 移除失效连接
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 新增:推送给全局管理员连接的通用方法
|
|
|
|
|
+ * @param eventName 事件名称
|
|
|
|
|
+ * @param data 推送数据
|
|
|
|
|
+ * @param <T> 数据类型
|
|
|
|
|
+ */
|
|
|
|
|
+ private <T> void sendGlobalEvent(String eventName, T data) {
|
|
|
|
|
+ // 直接调用通用发送方法,目标标识为全局管理员ID
|
|
|
|
|
+ sendEvent(GLOBAL_ADMIN_ID, eventName, data);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 心跳任务:每30秒发送空事件维持连接
|
|
|
|
|
+ * @param uniqueId 唯一标识(患者ID/全局标识)
|
|
|
|
|
+ * @param emitter SSE连接实例
|
|
|
|
|
+ */
|
|
|
|
|
+ private void startHeartbeat(String uniqueId, SseEmitter emitter) {
|
|
|
|
|
+ heartbeatExecutor.scheduleAtFixedRate(() -> {
|
|
|
|
|
+ if (!emitters.containsKey(uniqueId)) {
|
|
|
|
|
+ return; // 连接已清理,停止心跳
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 发送注释类型的心跳(前端不解析,仅维持连接)
|
|
|
|
|
+ emitter.send(SseEmitter.event().comment("heartbeat"));
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ log.error("标识{}心跳发送失败,清理连接", uniqueId, e);
|
|
|
|
|
+ emitters.remove(uniqueId);
|
|
|
|
|
+ }
|
|
|
|
|
+ }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|