18339543638 il y a 4 ans
Parent
commit
a0628919fb

+ 6 - 0
jetlinks-components/logging-component/pom.xml

@@ -20,6 +20,12 @@
             <version>${hsweb.framework.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-core</artifactId>
+            <version>${hsweb.framework.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.jetlinks.community</groupId>
             <artifactId>elasticsearch-component</artifactId>

+ 2 - 2
jetlinks-components/logging-component/src/main/java/org/jetlinks/community/logging/access/SerializableAccessLog.java

@@ -125,8 +125,8 @@ public class SerializableAccessLog implements Serializable {
         accessLog.getHttpHeaders().remove("X_Access_Token");
         accessLog.getHttpHeaders().remove(HttpHeaders.AUTHORIZATION);
 
-        accessLog.setException(info.getException() == null ? ""
-            : StringUtils.throwable2String(info.getException()));
+        accessLog.setException(info.getException() == null ? "":info.getException().getMessage());
+//            : StringUtils.throwable2String(info.getException()));
         Map<String, Object> newParameter = info.getParameters()
             .entrySet()
             .stream()

+ 4 - 3
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -21,6 +21,7 @@ import org.jetlinks.core.device.AuthenticationResponse;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.MqttAuthenticationRequest;
+import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.CommonDeviceMessage;
 import org.jetlinks.core.message.CommonDeviceMessageReply;
 import org.jetlinks.core.message.DeviceMessage;
@@ -80,12 +81,15 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
 
     private final DeviceGatewayHelper helper;
 
+    private final EventBus eventBus;
     public MqttServerDeviceGateway(String id,
+                                   EventBus eventBus,
                                    DeviceRegistry registry,
                                    DeviceSessionManager sessionManager,
                                    MqttServer mqttServer,
                                    DecodedClientMessageHandler messageHandler,
                                    Mono<ProtocolSupport> customProtocol) {
+        this.eventBus=eventBus;
         this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
         this.id = id;
         this.registry = registry;
@@ -159,9 +163,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
             }));
     }
 
-    public static void main(String[] args) {
-
-    }
     //处理认证结果
     private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleAuthResponse(DeviceOperator device,
                                                                                                    AuthenticationResponse resp,

+ 6 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGatewayProvider.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.network.mqtt.gateway.device;
 
+import lombok.AllArgsConstructor;
 import org.jetlinks.community.gateway.DeviceGateway;
 import org.jetlinks.community.gateway.supports.DeviceGatewayProperties;
 import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
@@ -9,8 +10,10 @@ import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.mqtt.server.MqttServer;
 import org.jetlinks.core.ProtocolSupports;
 import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
@@ -26,6 +29,8 @@ public class MqttServerDeviceGatewayProvider implements DeviceGatewayProvider {
     private final DecodedClientMessageHandler messageHandler;
 
     private final ProtocolSupports protocolSupports;
+    @Autowired
+    private EventBus eventBus;
 
     public MqttServerDeviceGatewayProvider(NetworkManager networkManager,
                                            DeviceRegistry registry,
@@ -61,6 +66,7 @@ public class MqttServerDeviceGatewayProvider implements DeviceGatewayProvider {
             .<MqttServer>getNetwork(getNetworkType(), properties.getNetworkId())
             .map(mqttServer -> new MqttServerDeviceGateway(
                 properties.getId(),
+                eventBus,
                 registry,
                 sessionManager,
                 mqttServer,

+ 4 - 10
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -136,10 +136,6 @@ class VertxMqttConnection implements MqttConnection {
             }
             //心跳检测 todo 改为配置文件配置
             heartIdleHandler(endpoint,100,TimeUnit.SECONDS);
-            //放置遗言
-            this.willMessage = this.getWillMessage();
-            //获取设备id
-//            String deviceId=endpoint.clientIdentifier();
         } catch (Exception e) {
             close().subscribe();
             log.warn(e.getMessage(), e);
@@ -222,16 +218,14 @@ class VertxMqttConnection implements MqttConnection {
             })
             .publishHandler(msg -> {
                 ping();
-                VertxMqttPublishing publishing = new VertxMqttPublishing(msg,false);
+                VertxMqttPublishing publishing = new VertxMqttPublishing(msg, false);
                 boolean hasDownstream = this.messageProcessor.hasDownstreams();
                 if (autoAckMsg || !hasDownstream) {
                     publishing.acknowledge();
-//                    this.publishingFluxSink.next(publishing);
                 }
-                this.publishingFluxSink.next(publishing);
-//                if (hasDownstream) {
-//                    this.publishingFluxSink.next(publishing);
-//                }
+                if (hasDownstream) {
+                    this.publishingFluxSink.next(publishing);
+                }
             })
             //QoS 1 PUBACK
             .publishAcknowledgeHandler(messageId -> {

+ 6 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java

@@ -5,6 +5,7 @@ import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
 import org.jetlinks.community.device.measurements.DeviceDashboardDefinition;
 import org.jetlinks.community.device.measurements.DeviceObjectDefinition;
 import org.jetlinks.community.device.service.LocalDeviceInstanceService;
+import org.jetlinks.community.device.service.data.DeviceDataService;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.micrometer.MeterRegistryManager;
@@ -17,15 +18,16 @@ import reactor.core.publisher.Mono;
 @Component
 public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
 
-
     private final MeterRegistry registry;
-
+    private final DeviceDataService dataService;
     public DeviceStatusMeasurementProvider(MeterRegistryManager registryManager,
                                            LocalDeviceInstanceService instanceService,
                                            TimeSeriesManager timeSeriesManager,
+                                           DeviceDataService deviceDataService,
                                            EventBus eventBus) {
         super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.status);
 
+        this.dataService=deviceDataService;
         addMeasurement(new DeviceStatusChangeMeasurement(timeSeriesManager, eventBus));
 
         addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager));
@@ -42,6 +44,7 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
                 .counter("online", "productId", productId)
                 .increment();
         });
+
     }
 
     @Subscribe("/device/*/*/offline")
@@ -51,6 +54,7 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
             registry
                 .counter("offline", "productId", productId)
                 .increment();
+
         });
     }
 

+ 69 - 31
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -15,6 +15,7 @@ import org.jetlinks.community.device.enums.DeviceFeature;
 import org.jetlinks.community.device.enums.DeviceState;
 import org.jetlinks.community.device.response.DeviceDeployResult;
 import org.jetlinks.community.device.response.DeviceDetail;
+import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.utils.ErrorUtils;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
@@ -23,17 +24,20 @@ import org.jetlinks.core.enums.ErrorCode;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.exception.DeviceOperationException;
+import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.DeviceMessageReply;
 import org.jetlinks.core.message.FunctionInvokeMessageSender;
 import org.jetlinks.core.message.WritePropertyMessageSender;
 import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.ReadPropertyMessageReply;
+import org.jetlinks.core.message.property.ReportPropertyMessage;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
-import org.jetlinks.core.metadata.ConfigMetadata;
-import org.jetlinks.core.metadata.PropertyMetadata;
+import org.jetlinks.core.metadata.*;
 import org.jetlinks.core.metadata.types.StringType;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -72,8 +76,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     @Override
     public Mono<SaveResult> save(Publisher<DeviceInstanceEntity> entityPublisher) {
         return Flux.from(entityPublisher)
-                   .doOnNext(instance -> instance.setState(null))
-                   .as(super::save);
+            .doOnNext(instance -> instance.setState(null))
+            .as(super::save);
     }
 
 
@@ -96,13 +100,13 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                         if (MapUtils.isNotEmpty(product.getConfiguration())) {
                             if (MapUtils.isNotEmpty(device.getConfiguration())) {
                                 product.getConfiguration()
-                                       .keySet()
-                                       .forEach(device.getConfiguration()::remove);
+                                    .keySet()
+                                    .forEach(device.getConfiguration()::remove);
                             }
                             //重置注册中心里的配置
                             return registry.getDevice(deviceId)
-                                           .flatMap(opts -> opts.removeConfigs(product.getConfiguration().keySet()))
-                                           .then();
+                                .flatMap(opts -> opts.removeConfigs(product.getConfiguration().keySet()))
+                                .then();
                         }
                         return Mono.empty();
                     }).then(
@@ -186,9 +190,9 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .flatMap(product -> registry
                 .unregisterDevice(id)
                 .then(createUpdate()
-                          .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
-                          .where(DeviceInstanceEntity::getId, id)
-                          .execute()));
+                    .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
+                    .where(DeviceInstanceEntity::getId, id)
+                    .execute()));
     }
 
     /**
@@ -199,12 +203,12 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
      */
     public Mono<Integer> unregisterDevice(String id) {
         return this.findById(Mono.just(id))
-                   .flatMap(device -> registry
-                       .unregisterDevice(id)
-                       .then(createUpdate()
-                                 .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
-                                 .where(DeviceInstanceEntity::getId, id)
-                                 .execute()));
+            .flatMap(device -> registry
+                .unregisterDevice(id)
+                .then(createUpdate()
+                    .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
+                    .where(DeviceInstanceEntity::getId, id)
+                    .execute()));
     }
 
     /**
@@ -215,12 +219,12 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
      */
     public Mono<Integer> unregisterDevice(Publisher<String> ids) {
         return Flux.from(ids)
-                   .flatMap(id -> registry.unregisterDevice(id).thenReturn(id))
-                   .collectList()
-                   .flatMap(list -> createUpdate()
-                       .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
-                       .where().in(DeviceInstanceEntity::getId, list)
-                       .execute());
+            .flatMap(id -> registry.unregisterDevice(id).thenReturn(id))
+            .collectList()
+            .flatMap(list -> createUpdate()
+                .set(DeviceInstanceEntity::getState, DeviceState.notActive.getValue())
+                .where().in(DeviceInstanceEntity::getId, list)
+                .execute());
     }
 
     protected Mono<DeviceDetail> createDeviceDetail(DeviceProductEntity product,
@@ -281,11 +285,11 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             .findById(deviceId)
             .zipWhen(device -> deviceProductService.findById(device.getProductId()))//合并型号
             .zipWith(tagRepository
-                         .createQuery()
-                         .where(DeviceTagEntity::getDeviceId, deviceId)
-                         .fetch()
-                         .collectList()
-                         .defaultIfEmpty(Collections.emptyList()) //合并标签
+                    .createQuery()
+                    .where(DeviceTagEntity::getDeviceId, deviceId)
+                    .fetch()
+                    .collectList()
+                    .defaultIfEmpty(Collections.emptyList()) //合并标签
                 , (left, right) -> Tuples.of(left.getT2(), left.getT1(), right))
             .flatMap(tp3 -> createDeviceDetail(tp3.getT1(), tp3.getT2(), tp3.getT3()));
     }
@@ -366,9 +370,9 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                                 .defaultIfEmpty(0)
                         )
                         .thenReturn(deviceId
-                                        .stream()
-                                        .map(id -> DeviceStateInfo.of(id, state))
-                                        .collect(Collectors.toList()));
+                            .stream()
+                            .map(id -> DeviceStateInfo.of(id, state))
+                            .collect(Collectors.toList()));
                 }));
     }
 
@@ -441,6 +445,40 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
             });
     }
 
+    @Subscribe("/device/*/*/message/property/report")
+    @Transactional(propagation = Propagation.NEVER)
+    public Mono<Void> writeProperties(DeviceMessage deviceMessage){
+        return registry.getDevice(deviceMessage.getDeviceId())
+            .flatMap(DeviceOperator::getMetadata)
+            .switchIfEmpty(Mono.just(new SimpleDeviceMetadata()))
+            .doOnNext(metadataMono -> {
+                ReportPropertyMessage message= (ReportPropertyMessage) deviceMessage;
+                List<PropertyMetadata> properties = metadataMono.getProperties();
+                if(properties==null||properties.size()<1){
+                    return;
+                }
+                Map<String, Object> sendProperties = message.getProperties();
+                Map<String, List<PropertyMetadata>> metadataNameMap = properties.parallelStream()
+                    .collect(Collectors.groupingBy(Metadata::getId));
+                //获取有效的属性值
+                Set<String> existProperties = sendProperties.keySet().stream().filter(name -> metadataNameMap.containsKey(name)).collect(Collectors.toSet());
+                List<String> validateProperties = existProperties.stream().filter(propertyName -> {
+                    Object value = sendProperties.get(propertyName);
+                    PropertyMetadata propertyMetadata = Optional.ofNullable(Optional.ofNullable(metadataNameMap.get(propertyName)).orElse(new ArrayList<>()).get(0)).orElse(new SimplePropertyMetadata());
+                    DataType valueType = propertyMetadata.getValueType();
+                    try {
+                        valueType.validate(value);
+                        return true;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                }).collect(Collectors.toList());
+                Map<String, Object> map = new HashMap<>();
+                validateProperties.forEach(key->map.put(key,sendProperties.get(key)));
+                writeProperties(deviceMessage.getDeviceId(),map);
+            }).then();
+}
+
     //设备功能调用
     @SneakyThrows
     public Flux<?> invokeFunction(String deviceId,

+ 2 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java

@@ -183,7 +183,8 @@ public class DefaultDeviceDataService implements DeviceDataService {
     public Mono<Void> saveDeviceMessage(@Nonnull DeviceMessage message) {
         return this
             .getDeviceStrategy(message.getDeviceId())
-            .flatMap(strategy -> strategy.saveDeviceMessage(message));
+            .flatMap(strategy ->
+                strategy.saveDeviceMessage(message));
     }
 
     @Nonnull

+ 1 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMessageController.java

@@ -82,8 +82,7 @@ public class DeviceMessageController {
                             .productId(property)
                             .build()
                             .withValue(dataType, value));
-                })))
-            ;
+                })));
 
     }