|
|
@@ -0,0 +1,181 @@
|
|
|
+package com.nb.core.ai.Utils;
|
|
|
+
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.function.Consumer;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Reactive Stream 转换工具类
|
|
|
+ * 将 Reactor Flux 转换为 Spring MVC 的 SseEmitter
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public class ReactiveStreamConverter {
|
|
|
+
|
|
|
+ // 线程池,用于异步处理 SSE 事件发送(使用缓存线程池,根据需要创建新线程,空闲线程会被回收)
|
|
|
+ private static final ExecutorService executor = Executors.newCachedThreadPool();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将 Flux 转换为 SseEmitter
|
|
|
+ *
|
|
|
+ * @param flux 数据流
|
|
|
+ * @param dataConsumer 数据处理回调(可选)
|
|
|
+ * @return SseEmitter
|
|
|
+ */
|
|
|
+ public static SseEmitter toSseEmitter(Flux<String> flux, Consumer<String> dataConsumer, String requestId) {
|
|
|
+ // 创建 SSE 发射器,设置 60 秒超时时间
|
|
|
+ // 超时时间需要根据实际业务场景调整,对于长文本生成可以设置更长
|
|
|
+ SseEmitter emitter = new SseEmitter(60_000L);
|
|
|
+
|
|
|
+ // 在线程池中异步执行流处理
|
|
|
+ executor.execute(() -> {
|
|
|
+ try {
|
|
|
+ log.info("开始处理SSE流,请求ID: {}", requestId);
|
|
|
+
|
|
|
+ // 订阅 Flux 流,处理三种事件:数据、错误、完成
|
|
|
+ flux.subscribe(
|
|
|
+ // 数据到达时的处理逻辑
|
|
|
+ data -> {
|
|
|
+ try {
|
|
|
+ // 如果提供了数据处理回调,则调用它
|
|
|
+ // 可以用于数据清洗、统计、日志记录等
|
|
|
+ if (dataConsumer != null) {
|
|
|
+ dataConsumer.accept(data);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 构建 SSE 事件并发送到客户端
|
|
|
+ // 每个事件包含数据、ID、事件类型和时间戳注释
|
|
|
+ SseEmitter.SseEventBuilder event = SseEmitter.event()
|
|
|
+ .data(data) // 事件数据内容
|
|
|
+ .id(String.valueOf(System.currentTimeMillis())) // 事件唯一ID
|
|
|
+ .name("message") // 事件类型名称
|
|
|
+ .comment("timestamp: " + LocalDateTime.now()); // 时间戳注释
|
|
|
+ emitter.send(event);
|
|
|
+
|
|
|
+ // 记录调试日志,便于问题排查
|
|
|
+ log.debug("发送SSE数据,请求ID: {}, 数据长度: {}", requestId, data.length());
|
|
|
+ } catch (IOException e) {
|
|
|
+ // 发送失败时的错误处理
|
|
|
+ log.error("发送SSE数据失败,请求ID: {}", requestId, e);
|
|
|
+ emitter.completeWithError(e);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ // 流发生错误时的处理逻辑
|
|
|
+ error -> {
|
|
|
+ log.error("SSE流处理错误,请求ID: {}", requestId, error);
|
|
|
+ try {
|
|
|
+ // 构建错误事件通知客户端
|
|
|
+ SseEmitter.SseEventBuilder errorEvent = SseEmitter.event()
|
|
|
+ .data("{\"error\": \"" + error.getMessage() + "\"}") // 错误信息JSON格式
|
|
|
+ .name("error") // 错误事件类型
|
|
|
+ .comment("timestamp: " + LocalDateTime.now()); // 错误发生时间
|
|
|
+ emitter.send(errorEvent);
|
|
|
+ emitter.completeWithError(error);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ // 发送错误事件失败时的处理
|
|
|
+ log.error("发送SSE错误事件失败,请求ID: {}", requestId, ex);
|
|
|
+ emitter.completeWithError(ex);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ // 流正常完成时的处理逻辑
|
|
|
+ () -> {
|
|
|
+ log.info("SSE流处理完成,请求ID: {}", requestId);
|
|
|
+ try {
|
|
|
+ // 构建完成事件通知客户端
|
|
|
+ SseEmitter.SseEventBuilder completeEvent = SseEmitter.event()
|
|
|
+ .data("{\"status\": \"completed\"}") // 完成状态信息
|
|
|
+ .name("complete") // 完成事件类型
|
|
|
+ .comment("timestamp: " + LocalDateTime.now()); // 完成时间
|
|
|
+ emitter.send(completeEvent);
|
|
|
+ emitter.complete();
|
|
|
+ } catch (IOException e) {
|
|
|
+ // 发送完成事件失败时的处理
|
|
|
+ log.error("发送SSE完成事件失败,请求ID: {}", requestId, e);
|
|
|
+ emitter.completeWithError(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ );
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 流执行过程中的异常处理
|
|
|
+ log.error("SSE流执行异常,请求ID: {}", requestId, e);
|
|
|
+ emitter.completeWithError(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // 流完成时的回调(无论是正常完成还是异常完成)
|
|
|
+ emitter.onCompletion(() ->
|
|
|
+ log.info("SSE流完成回调,请求ID: {}", requestId));
|
|
|
+
|
|
|
+ // 流超时时的回调
|
|
|
+ emitter.onTimeout(() -> {
|
|
|
+ log.warn("SSE流超时,请求ID: {}", requestId);
|
|
|
+ try {
|
|
|
+ // 发送超时事件通知客户端
|
|
|
+ SseEmitter.SseEventBuilder timeoutEvent = SseEmitter.event()
|
|
|
+ .data("{\"error\": \"请求超时\"}") // 超时错误信息
|
|
|
+ .name("timeout"); // 超时事件类型
|
|
|
+ emitter.send(timeoutEvent);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("发送SSE超时事件失败,请求ID: {}", requestId, e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // 流发生错误时的回调
|
|
|
+ emitter.onError(throwable ->
|
|
|
+ log.error("SSE流错误回调,请求ID: {}", requestId, throwable));
|
|
|
+
|
|
|
+ return emitter;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将 Flux 数据流转换为 SseEmitter - 简化版本
|
|
|
+ * 包含数据处理回调,自动生成请求ID
|
|
|
+ *
|
|
|
+ * @param flux 数据流
|
|
|
+ * @param dataConsumer 数据处理回调函数
|
|
|
+ * @return SseEmitter Spring MVC 的服务器发送事件发射器
|
|
|
+ *
|
|
|
+ * @example
|
|
|
+ * // 使用示例:
|
|
|
+ * Flux<String> flux = streamAIService.generateStream(message);
|
|
|
+ * SseEmitter emitter = ReactiveStreamConverter.toSseEmitter(
|
|
|
+ * flux,
|
|
|
+ * data -> System.out.println("收到数据: " + data)
|
|
|
+ * );
|
|
|
+ */
|
|
|
+ public static SseEmitter toSseEmitter(Flux<String> flux, Consumer<String> dataConsumer) {
|
|
|
+ // 自动生成请求ID:req_ + 当前时间戳
|
|
|
+ return toSseEmitter(flux, dataConsumer, "req_" + System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将 Flux 数据流转换为 SseEmitter - 最简版本
|
|
|
+ * 不包含数据处理回调,自动生成请求ID
|
|
|
+ * 适用于简单的流转换场景
|
|
|
+ * @param flux 数据流
|
|
|
+ * @return SseEmitter Spring MVC 的服务器发送事件发射器
|
|
|
+ */
|
|
|
+ public static SseEmitter toSseEmitter(Flux<String> flux) {
|
|
|
+ // 调用完整版本,dataConsumer 参数为 null
|
|
|
+ return toSseEmitter(flux, null, "req_" + System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭工具类的线程池
|
|
|
+ * 注意:在应用程序关闭时调用,释放线程资源
|
|
|
+ *
|
|
|
+ * @example
|
|
|
+ * // 使用示例(在Spring Bean的销毁方法中):
|
|
|
+ * @PreDestroy
|
|
|
+ * public void destroy() {
|
|
|
+ * ReactiveStreamConverter.shutdown();
|
|
|
+ * }
|
|
|
+ */
|
|
|
+ public static void shutdown() {
|
|
|
+ log.info("关闭ReactiveStreamConverter线程池");
|
|
|
+ executor.shutdown();
|
|
|
+ }
|
|
|
+}
|