Parcourir la source

add 下发指令

18339543638 il y a 4 ans
Parent
commit
948282f88e

+ 15 - 5
jetlinks-components/common-component/src/main/java/org/jetlinks/community/annotation/MessageValueCodec.java

@@ -7,8 +7,13 @@ import org.hswebframework.ezorm.core.ValueCodec;
 import org.jetlinks.core.message.CommonDeviceMessage;
 import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.MessageType;
+import org.jetlinks.core.message.function.FunctionInvokeMessage;
+import org.jetlinks.core.message.property.ReadPropertyMessage;
+import org.jetlinks.core.message.property.WritePropertyMessage;
 import org.springframework.beans.BeanUtils;
 
+import java.util.Map;
+
 /**
  * @author lifang
  * @version 1.0.0
@@ -27,10 +32,15 @@ public class MessageValueCodec implements ValueCodec<Object, Object> {
     public Object decode(Object data) {
         JSONObject jsonObject = JSON.parseObject(String.valueOf(data));
         Object msgType = jsonObject.get("messageType");
-        MessageType messageType = MessageType.of(String.valueOf(msgType)).orElse(MessageType.UNKNOWN);
-        Message result = messageType.getNewInstance().get();
-        CommonDeviceMessage commonDeviceMessage = jsonObject.toJavaObject(CommonDeviceMessage.class);
-        BeanUtils.copyProperties(commonDeviceMessage,result);
-        return result;
+        if(MessageType.WRITE_PROPERTY.name().equals(msgType)){
+            return jsonObject.toJavaObject(WritePropertyMessage.class);
+        }
+        if(MessageType.READ_PROPERTY.name().equals(msgType)){
+            return jsonObject.toJavaObject(ReadPropertyMessage.class);
+        }
+        if(MessageType.INVOKE_FUNCTION.name().equals(msgType)){
+            return jsonObject.toJavaObject(FunctionInvokeMessage.class);
+        }
+        return null;
     }
 }

+ 4 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/interceptor/DeviceDirectiveSendInterceptor.java

@@ -36,7 +36,7 @@ public class DeviceDirectiveSendInterceptor implements DeviceMessageSenderInterc
     public Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message) {
         DeviceDirectivesEntity directivesEntity = new DeviceDirectivesEntity();
         directivesEntity.setState(DirectiveState.wait);
-        directivesEntity.setDownstream((CommonDeviceMessage) message);
+        directivesEntity.setDownstream(message);
         directivesEntity.setSendTimestamp(message.getTimestamp());
         directivesEntity.setMessageId(message.getMessageId());
         DeviceDirectivesEntity.MessageType directive = isDirective(message);
@@ -59,19 +59,20 @@ public class DeviceDirectiveSendInterceptor implements DeviceMessageSenderInterc
         DeviceDirectivesEntity.MessageType directive = isDirective(message);
         if(directive!=null){
             return reply.cast(DeviceMessage.class)
-                .map(msg->
+                .flatMap(msg->
                     deviceDirectivesService.createUpdate()
                         .where(DeviceDirectivesEntity::getDeviceId,msg.getDeviceId())
                         .where(DeviceDirectivesEntity::getMessageId,msg.getMessageId())
                         .set(DeviceDirectivesEntity::getUpstream,msg)
                         .set(DeviceDirectivesEntity::getState,DirectiveState.success)
+                        .set(DeviceDirectivesEntity::getLastError,null)
                         .execute())
                 .onErrorResume(DeviceOperationException.class,e->deviceDirectivesService.createUpdate()
                     .where(DeviceDirectivesEntity::getDeviceId,message.getDeviceId())
                     .where(DeviceDirectivesEntity::getMessageId,message.getMessageId())
                     .set(DeviceDirectivesEntity::getLastError,e.getMessage())
                     .set(DeviceDirectivesEntity::getState,DirectiveState.sendError)
-                    .execute().thenReturn(Mono.empty()))
+                    .execute())
                 .thenMany(reply);
         }
         return reply;

+ 12 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceDirectiveController.java

@@ -19,6 +19,7 @@ import org.jetlinks.community.device.service.DeviceDirectivesService;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.DirectDeviceMessage;
 import org.jetlinks.core.message.Headers;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
@@ -33,6 +34,8 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
+import java.util.List;
+
 /**
  * @author lifang
  * @version 1.0.0
@@ -59,13 +62,18 @@ public class DeviceDirectiveController implements
     @PostMapping("/resend")
     @CreateAction
     @Operation(summary = "批量发送指令到设备")
-    public Mono<Void> sendDirectives(@RequestBody Flux<String> directiveIds){
-        return directivesService.findById(directiveIds)
+    public Mono<Void> sendDirectives(@RequestBody Mono<List<String>> idList){
+        return  idList.flatMapMany(directivesService::findById)
             .flatMap(directive->
                 registry.getDevice(directive.getDeviceId())
-                    .publishOn(Schedulers.parallel())
                     .flatMap(operator -> Mono.just(operator.messageSender()))
-                    .flatMap(sender->sender.send(directive.getDownstream()).then())
+                    .flatMap(sender->{
+                            directive.getDownstream().addHeader(Headers.async,true);
+                            return sender.send(directive.getDownstream())
+                                .onErrorContinue((e,obj)->log.warn("指令重新发送失败{}",directive))
+                                .then();
+                        }
+                    )
             )
             .then()
             .doOnError(e->Mono.error(new BusinessException("参数错误")));