Ver Fonte

fixed 集群设备消息上报 消息重复存储

18339543638 há 4 anos atrás
pai
commit
7b9d8e5150

+ 3 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java

@@ -26,6 +26,7 @@ import java.util.Optional;
 @AllArgsConstructor
 public class TimeSeriesMessageWriterConnector {
     private static final String FIRST="first";
+    public static final String ONLY_READ="onlyRead";
     private final DeviceDataService dataService;
     private final RedisClusterManager clusterManager;
     private final EventBus eventBus;
@@ -37,6 +38,7 @@ public class TimeSeriesMessageWriterConnector {
      */
     @Subscribe(topics = "/device/**", id = "device-message-ts-writer")
     public Mono<Void> writeDeviceMessageToTs(DeviceMessage message) {
+
         try {
             Boolean first =Boolean.valueOf( String.valueOf(message.getHeader(FIRST).isPresent()?message.getHeader(FIRST):true));
             String deviceId = message.getDeviceId();
@@ -60,6 +62,7 @@ public class TimeSeriesMessageWriterConnector {
             String topic = message.getTopic();
             DeviceMessage deviceMessage= (DeviceMessage) message.getMessage();
             deviceMessage.addHeader(FIRST,false);
+            deviceMessage.addHeader(ONLY_READ,false);
             eventBus.publish(topic,deviceMessage).subscribe();
         }).subscribe();
     }

+ 6 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java

@@ -13,6 +13,7 @@ import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
 import org.jetlinks.community.device.entity.DeviceProperty;
 import org.jetlinks.community.device.enums.DeviceLogType;
 import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
+import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConnector;
 import org.jetlinks.community.device.service.LocalDeviceInstanceService;
 import org.jetlinks.community.gateway.DeviceMessageUtils;
 import org.jetlinks.community.timeseries.TimeSeriesData;
@@ -117,7 +118,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     @Nonnull
     @Override
     public Mono<Void> saveDeviceMessage(@Nonnull DeviceMessage message) {
-        return this
+        return message.getHeader(TimeSeriesMessageWriterConnector.ONLY_READ).isPresent() ?Mono.never(): this
             .convertMessageToTimeSeriesData(message)
             .flatMap(tp2 -> doSaveData(tp2.getT1(), tp2.getT2()))
             .then();
@@ -127,6 +128,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     @Override
     public Mono<Void> saveDeviceMessage(@Nonnull Publisher<DeviceMessage> message) {
         return Flux.from(message)
+            .filter(msg->msg.getHeader(TimeSeriesMessageWriterConnector.ONLY_READ).isPresent())
             .flatMap(this::convertMessageToTimeSeriesData)
             .groupBy(Tuple2::getT1, Integer.MAX_VALUE)
             .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
@@ -141,6 +143,9 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
     protected Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String productId,
                                                                           DeviceMessage message,
                                                                           BiConsumer<DeviceMessage, DeviceOperationLogEntity> logEntityConsumer) {
+        if(message.getHeader(TimeSeriesMessageWriterConnector.ONLY_READ).isPresent()){
+            return Mono.never();
+        }
         DeviceOperationLogEntity operationLog = new DeviceOperationLogEntity();
         operationLog.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
         operationLog.setDeviceId(message.getDeviceId());

+ 2 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java

@@ -5,6 +5,8 @@ import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.jetlinks.community.device.entity.DeviceEvent;
 import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
 import org.jetlinks.community.device.entity.DeviceProperty;
+import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConnector;
+import org.jetlinks.community.gateway.external.socket.WebSocketMessagingHandler;
 import org.jetlinks.community.timeseries.query.AggregationData;
 import org.jetlinks.core.Value;
 import org.jetlinks.core.device.DeviceOperator;