|
@@ -51,26 +51,13 @@ public class TimeSeriesMessageWriterConnector {
|
|
|
UPGRADE_FIRMWARE_PROGRESS,UPDATE_TAG);
|
|
UPGRADE_FIRMWARE_PROGRESS,UPDATE_TAG);
|
|
|
}
|
|
}
|
|
|
/**
|
|
/**
|
|
|
- * 上行消息
|
|
|
|
|
|
|
+ * 订阅设备消息 入库
|
|
|
*
|
|
*
|
|
|
* @param message 设备消息
|
|
* @param message 设备消息
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
@Subscribe(topics = "/device/**", id = "device-message-ts-writer")
|
|
@Subscribe(topics = "/device/**", id = "device-message-ts-writer")
|
|
|
public Mono<Void> writeDeviceMessageToTs(DeviceMessage message) {
|
|
public Mono<Void> writeDeviceMessageToTs(DeviceMessage message) {
|
|
|
- String deviceId = message.getDeviceId();
|
|
|
|
|
- String productId = String.valueOf(message.getHeader("productId").get());
|
|
|
|
|
- if(upMessages.contains(message.getMessageType())){
|
|
|
|
|
- try {
|
|
|
|
|
- String deviceMessageTopic = DeviceMessageConnector.createDeviceMessageTopic(productId, deviceId, message);
|
|
|
|
|
- Boolean first =Boolean.valueOf( String.valueOf(message.getHeader(FIRST).isPresent()?message.getHeader(FIRST):true));
|
|
|
|
|
- if(!Boolean.FALSE.equals(first)){
|
|
|
|
|
- clusterManager.getTopic(deviceMessageTopic).publish(Flux.just(message)).subscribe();
|
|
|
|
|
- }
|
|
|
|
|
- }catch (Exception e){
|
|
|
|
|
- log.error("message : {} ,cluster share failed",message);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
return dataService.saveDeviceMessage(message);
|
|
return dataService.saveDeviceMessage(message);
|
|
|
}
|
|
}
|
|
|
|
|
|