浏览代码

add 下发指令

18339543638 4 年之前
父节点
当前提交
312f9427a0

+ 5 - 0
jetlinks-components/common-component/pom.xml

@@ -18,6 +18,11 @@
             <version>${jetlinks.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-json</artifactId>
+        </dependency>
+        
         <dependency>
             <groupId>org.hswebframework.web</groupId>
             <artifactId>hsweb-authorization-api</artifactId>

+ 18 - 0
jetlinks-components/common-component/src/main/java/org/jetlinks/community/annotation/MessageCodec.java

@@ -0,0 +1,18 @@
+package org.jetlinks.community.annotation;
+
+import java.lang.annotation.*;
+import org.hswebframework.ezorm.rdb.mapping.annotation.*;
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MessageCodec.java
+ * @Description TODO
+ * @createTime 2021年11月24日 14:25:00
+ */
+@Target({ElementType.FIELD, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+@Documented
+@Codec
+public @interface MessageCodec {
+}

+ 36 - 0
jetlinks-components/common-component/src/main/java/org/jetlinks/community/annotation/MessageValueCodec.java

@@ -0,0 +1,36 @@
+package org.jetlinks.community.annotation;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+import org.hswebframework.ezorm.core.ValueCodec;
+import org.jetlinks.core.message.CommonDeviceMessage;
+import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.MessageType;
+import org.springframework.beans.BeanUtils;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName MessageValueCodec.java
+ * @Description TODO
+ * @createTime 2021年11月24日 14:29:00
+ */
+@Data
+public class MessageValueCodec implements ValueCodec<Object, Object> {
+    @Override
+    public Object encode(Object value) {
+        return JSON.toJSONString(value);
+    }
+
+    @Override
+    public Object decode(Object data) {
+        JSONObject jsonObject = JSON.parseObject(String.valueOf(data));
+        Object msgType = jsonObject.get("messageType");
+        MessageType messageType = MessageType.of(String.valueOf(msgType)).orElse(MessageType.UNKNOWN);
+        Message result = messageType.getNewInstance().get();
+        CommonDeviceMessage commonDeviceMessage = jsonObject.toJavaObject(CommonDeviceMessage.class);
+        BeanUtils.copyProperties(commonDeviceMessage,result);
+        return result;
+    }
+}

+ 20 - 0
jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/CodecConfiguration.java

@@ -0,0 +1,20 @@
+package org.jetlinks.community.configuration;
+
+import org.hswebframework.ezorm.rdb.mapping.parser.DefaultValueCodecResolver;
+import org.jetlinks.community.annotation.MessageCodec;
+import org.jetlinks.community.annotation.MessageValueCodec;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName CodecConfiguration.java
+ * @Description TODO
+ * @createTime 2021年11月24日 14:31:00
+ */
+@Configuration
+public class CodecConfiguration {
+    static {
+        DefaultValueCodecResolver.COMMONS.register(MessageCodec.class,(field, jsonCodec) -> new MessageValueCodec());
+    }
+}

+ 0 - 1
jetlinks-components/network-component/network-core/pom.xml

@@ -50,7 +50,6 @@
         <dependency>
             <groupId>cn.hutool</groupId>
             <artifactId>hutool-json</artifactId>
-            <version>5.5.0</version>
         </dependency>
 
     </dependencies>

+ 4 - 3
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetlinksExtendTopicMessageCodec.java

@@ -10,6 +10,7 @@ import org.jetlinks.core.message.firmware.*;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.*;
+import org.jetlinks.core.utils.TopicUtils;
 import org.jetlinks.supports.utils.MqttTopicUtils;
 import org.springframework.util.Assert;
 
@@ -54,14 +55,14 @@ public class JetlinksExtendTopicMessageCodec {
         private boolean log;
         public DecodeResult(String topic) {
             this.topic = topic;
-            args = MqttTopicUtils.getPathVariables("/{productId}/{deviceId}/**", topic);
+            args = TopicUtils.getPathVariables("/{productId}/{deviceId}/**", topic);
             if (topic.contains("child")) {
                 child = true;
-                args.putAll(MqttTopicUtils.getPathVariables("/**/child/{childDeviceId}/**", topic));
+                args.putAll(TopicUtils.getPathVariables("/**/child/{childDeviceId}/**", topic));
             }
             if (topic.contains("event")) {
                 event = true;
-                args.putAll(MqttTopicUtils.getPathVariables("/**/event/{eventId}", topic));
+                args.putAll(TopicUtils.getPathVariables("/**/event/{eventId}", topic));
             }
             derivedMetadata = topic.endsWith("metadata/derived");
             if (event) {

+ 0 - 1
jetlinks-components/notify-component/notify-sms/pom.xml

@@ -20,7 +20,6 @@
         <dependency>
             <groupId>com.aliyun</groupId>
             <artifactId>aliyun-java-sdk-core</artifactId>
-            <version>4.5.2</version>
         </dependency>
 
         <dependency>

+ 0 - 1
jetlinks-components/notify-component/notify-voice/pom.xml

@@ -16,7 +16,6 @@
         <dependency>
             <groupId>com.aliyun</groupId>
             <artifactId>aliyun-java-sdk-core</artifactId>
-            <version>4.1.0</version>
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>

+ 1 - 1
jetlinks-core/src/main/java/org/jetlinks/core/device/StandaloneDeviceMessageBroker.java

@@ -117,7 +117,7 @@ public class StandaloneDeviceMessageBroker implements DeviceOperationBroker, Mes
     public Flux<DeviceMessageReply> handleReply(String deviceId,String messageId, Duration timeout) {
 
         return replyProcessor
-            .computeIfAbsent(messageId, ignore -> UnicastProcessor.create())
+            .computeIfAbsent(messageId, ignore -> EmitterProcessor.create())
             .timeout(timeout, Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT)))
             .doFinally(signal -> replyProcessor.remove(messageId));
     }

+ 2 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/MessageType.java

@@ -2,6 +2,7 @@ package org.jetlinks.core.message;
 
 import com.alibaba.fastjson.JSONObject;
 import lombok.AllArgsConstructor;
+import lombok.Getter;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.core.message.event.EventMessage;
 import org.jetlinks.core.message.firmware.*;
@@ -18,6 +19,7 @@ import java.util.Optional;
 import java.util.function.Supplier;
 
 @AllArgsConstructor
+@Getter
 public enum MessageType {
 
     //上报设备属性

+ 13 - 12
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceDirectivesEntity.java

@@ -1,8 +1,5 @@
 package org.jetlinks.community.device.entity;
 
-import cn.hutool.json.JSONUtil;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -12,8 +9,11 @@ import org.hswebframework.ezorm.rdb.mapping.annotation.*;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.dict.Dict;
 import org.hswebframework.web.dict.EnumDict;
+import org.jetlinks.community.annotation.MessageCodec;
 import org.jetlinks.community.device.enums.DirectiveState;
 import org.jetlinks.core.message.CommonDeviceMessage;
+import org.jetlinks.core.message.DeviceMessage;
+
 import javax.persistence.Column;
 import javax.persistence.Index;
 import javax.persistence.Table;
@@ -70,18 +70,18 @@ public class DeviceDirectivesEntity extends GenericEntity<String> {
     private String lastError;
 
     @Comment("回复内容")
-    @Column(name = "reply_message")
+    @Column(name = "upstream")
     @ColumnType(jdbcType = JDBCType.CLOB)
     @Schema(description = "回复内容")
-    @JsonCodec
-    private CommonDeviceMessage replyMessage;
+    @MessageCodec
+    private DeviceMessage upstream;
 
     @Comment("下发指令")
-    @Column(name = "send_message")
+    @Column(name = "downstream")
     @ColumnType(jdbcType = JDBCType.CLOB)
     @Schema(description = "下发指令")
-    @JsonCodec
-    private CommonDeviceMessage sendMessage;
+    @MessageCodec
+    private DeviceMessage downstream;
 
     @Column(name = "state",length = 16)
     @EnumCodec
@@ -94,16 +94,17 @@ public class DeviceDirectivesEntity extends GenericEntity<String> {
     )
     private DirectiveState state;
 
-    public DeviceDirectivesEntity(String productId, String deviceId, String messageId, Long sendTimestamp, MessageType messageType, String lastError, CommonDeviceMessage replyMessage, CommonDeviceMessage sendMessage, DirectiveState state) {
+    public DeviceDirectivesEntity(String productId, String deviceId, String messageId, Long sendTimestamp, MessageType messageType, String lastError, DeviceMessage upstream, DeviceMessage downstream, DirectiveState state) {
         this.productId = productId;
         this.deviceId = deviceId;
         this.messageId = messageId;
         this.sendTimestamp = sendTimestamp;
         this.messageType = messageType;
         this.lastError = lastError;
-        this.replyMessage = replyMessage;
-        this.sendMessage = sendMessage;
+        this.upstream = upstream;
+        this.downstream = downstream;
         this.state = state;
+
     }
 
     public static DeviceDirectivesEntity of(String productId,

+ 4 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/interceptor/DeviceDirectiveSendInterceptor.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.device.interceptor;
 
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
 import org.jetlinks.community.device.entity.DeviceDirectivesEntity;
 import org.jetlinks.community.device.enums.DirectiveState;
 import org.jetlinks.community.device.service.DeviceDirectivesService;
@@ -35,7 +36,7 @@ public class DeviceDirectiveSendInterceptor implements DeviceMessageSenderInterc
     public Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message) {
         DeviceDirectivesEntity directivesEntity = new DeviceDirectivesEntity();
         directivesEntity.setState(DirectiveState.wait);
-        directivesEntity.setSendMessage((CommonDeviceMessage) message);
+        directivesEntity.setDownstream((CommonDeviceMessage) message);
         directivesEntity.setSendTimestamp(message.getTimestamp());
         directivesEntity.setMessageId(message.getMessageId());
         DeviceDirectivesEntity.MessageType directive = isDirective(message);
@@ -46,6 +47,7 @@ public class DeviceDirectiveSendInterceptor implements DeviceMessageSenderInterc
                 .flatMap(product->{
                     directivesEntity.setProductId(product.getId());
                     return deviceDirectivesService.save(directivesEntity)
+                        .onErrorReturn(SaveResult.of(0,0))
                         .thenReturn(message);
                 });
         }
@@ -61,7 +63,7 @@ public class DeviceDirectiveSendInterceptor implements DeviceMessageSenderInterc
                     deviceDirectivesService.createUpdate()
                         .where(DeviceDirectivesEntity::getDeviceId,msg.getDeviceId())
                         .where(DeviceDirectivesEntity::getMessageId,msg.getMessageId())
-                        .set(DeviceDirectivesEntity::getReplyMessage,msg)
+                        .set(DeviceDirectivesEntity::getUpstream,msg)
                         .set(DeviceDirectivesEntity::getState,DirectiveState.success)
                         .execute())
                 .onErrorResume(DeviceOperationException.class,e->deviceDirectivesService.createUpdate()

+ 19 - 9
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceDirectiveController.java

@@ -1,5 +1,7 @@
 package org.jetlinks.community.device.web;
 
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.ObjectUtil;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.AllArgsConstructor;
@@ -16,12 +18,20 @@ import org.jetlinks.community.device.entity.DeviceDirectivesEntity;
 import org.jetlinks.community.device.service.DeviceDirectivesService;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.Headers;
+import org.jetlinks.core.message.MessageType;
+import org.jetlinks.core.message.function.FunctionInvokeMessage;
+import org.jetlinks.core.message.property.ReadPropertyMessage;
+import org.jetlinks.core.message.property.WritePropertyMessage;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * @author lifang
@@ -40,7 +50,7 @@ import reactor.core.publisher.Mono;
 public class DeviceDirectiveController implements
     ReactiveServiceCrudController<DeviceDirectivesEntity, String> {
     private final DeviceDirectivesService directivesService;
-    private final DeviceRegistry deviceRegistry;
+    private final DeviceRegistry registry;
     @Override
     public ReactiveCrudService<DeviceDirectivesEntity, String> getService() {
         return directivesService;
@@ -49,15 +59,15 @@ public class DeviceDirectiveController implements
     @PostMapping("/resend")
     @CreateAction
     @Operation(summary = "批量发送指令到设备")
-    public Mono<Boolean> sendDirectives(@RequestBody Flux<DeviceDirectivesEntity> directives){
-         return directives
+    public Mono<Void> sendDirectives(@RequestBody Flux<String> directiveIds){
+        return directivesService.findById(directiveIds)
             .flatMap(directive->
-                deviceRegistry
-                    .getDevice(directive.getDeviceId())
-                    .map(DeviceOperator::messageSender)
-                    .map(send->send.send(directive.getSendMessage()))
+                registry.getDevice(directive.getDeviceId())
+                    .publishOn(Schedulers.parallel())
+                    .flatMap(operator -> Mono.just(operator.messageSender()))
+                    .flatMap(sender->sender.send(directive.getDownstream()).then())
             )
-             .doOnError(e->Mono.error(new BusinessException("参数错误")))
-         .then(Mono.just(true));
+            .then()
+            .doOnError(e->Mono.error(new BusinessException("参数错误")));
     }
 }

+ 1 - 1
jetlinks-standalone/src/main/resources/application.yml

@@ -28,7 +28,7 @@ spring:
   #    database: 3
   #        max-wait: 10s
   r2dbc:
-    url: r2dbc:postgresql://192.168.104.114:5432/jetlinks
+    url: r2dbc:postgresql://192.168.104.114:5432/jetlinks?stringtype=unspecified
 #    url: r2dbc:mysql://192.168.100.32:3306/jetlinks
 #    username: root
 #    password: 123456

+ 9 - 7
pom.xml

@@ -27,14 +27,12 @@
         <vertx.version>3.8.5</vertx.version>
         <netty.version>4.1.51.Final</netty.version>
         <elasticsearch.version>7.11.2</elasticsearch.version>
-        <!--<reactor.excel.version>1.0.0</reactor.excel.version>-->
-        <!--<reactor.ql.version>1.0.11</reactor.ql.version>-->
-        <!--<fastjson.version>1.2.70</fastjson.version>-->
         <reactor.excel.version>1.0.1</reactor.excel.version>
         <reactor.ql.version>1.0.13</reactor.ql.version>
-        <fastjson.version>1.2.70</fastjson.version>
-        <hutool.version>5.5.8</hutool.version>
+        <fastjson.version>1.2.75</fastjson.version>
+        <hutool.version>5.7.16</hutool.version>
         <jetlinks.version>1.1.7-SNAPSHOT</jetlinks.version>
+        <aliyun.sdk.version>4.5.2</aliyun.sdk.version>
     </properties>
 
     <build>
@@ -176,6 +174,12 @@
 
         <dependencies>
 
+            <dependency>
+                <groupId>com.aliyun</groupId>
+                <artifactId>aliyun-java-sdk-core</artifactId>
+                <version>${aliyun.sdk.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>cn.hutool</groupId>
                 <artifactId>hutool-all</artifactId>
@@ -403,7 +407,6 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <version>1.7.25</version>
         </dependency>
 
         <dependency>
@@ -426,7 +429,6 @@
         <dependency>
             <groupId>org.hswebframework</groupId>
             <artifactId>hsweb-utils</artifactId>
-            <version>3.0.3</version>
         </dependency>
 
     </dependencies>