|
|
@@ -170,27 +170,26 @@ public class MediaDeviceController implements ReactiveServiceCrudController<Medi
|
|
|
log.debug("设备通道信息同步API调用,deviceId:" + id);
|
|
|
}
|
|
|
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + id;
|
|
|
- MediaDevice device = redisCacheStorage.getDevice(id);
|
|
|
- if(device==null){
|
|
|
- return Mono.error(new BusinessException("设备已离线,无法更新通道最新信息"));
|
|
|
- }
|
|
|
-
|
|
|
- return messageBroker.handleReply(id,key,Duration.ofSeconds(10))
|
|
|
- .onErrorResume(DeviceOperationException.class,error-> {
|
|
|
- //设备下线
|
|
|
- mediaDeviceService.deviceOffline(device);
|
|
|
- return Mono.error(new BusinessException("设备响应超时"));
|
|
|
- })
|
|
|
- .mergeWith(cmder.catalogQuery(device, event -> {
|
|
|
- MediaMessageReply<String> reply = MediaMessageReply.of(String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg), null);
|
|
|
- reply.setSuccess(false);
|
|
|
- reply.setMessageId(key);
|
|
|
- reply.setDeviceId(id);
|
|
|
- messageBroker.reply(reply).subscribe();
|
|
|
- })
|
|
|
- .then(Mono.empty()))
|
|
|
- .flatMap(this::convertReply)
|
|
|
- .singleOrEmpty();
|
|
|
+ return mediaDeviceService.findById(id)
|
|
|
+ .switchIfEmpty(Mono.error(new BusinessException("设备已离线,无法更新通道最新信息")))
|
|
|
+ .flatMap(device->
|
|
|
+ messageBroker.handleReply(id,key,Duration.ofSeconds(10))
|
|
|
+ .onErrorResume(DeviceOperationException.class,error-> {
|
|
|
+ //设备下线
|
|
|
+ mediaDeviceService.deviceOffline(device);
|
|
|
+ return Mono.error(new BusinessException("设备响应超时"));
|
|
|
+ })
|
|
|
+ .mergeWith(cmder.catalogQuery(device, event -> {
|
|
|
+ MediaMessageReply<String> reply = MediaMessageReply.of(String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg), null);
|
|
|
+ reply.setSuccess(false);
|
|
|
+ reply.setMessageId(key);
|
|
|
+ reply.setDeviceId(id);
|
|
|
+ messageBroker.reply(reply).subscribe();
|
|
|
+ })
|
|
|
+ .then(Mono.empty()))
|
|
|
+ .flatMap(this::convertReply)
|
|
|
+ .singleOrEmpty()
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
private Flux<Object> convertReply(DeviceMessageReply reply){
|