Przeglądaj źródła

fixed 产品修改

18339543638 4 lat temu
rodzic
commit
ac35977ddc

+ 0 - 5
jetlinks-components/common-component/src/main/java/org/jetlinks/community/annotation/MessageValueCodec.java

@@ -4,15 +4,10 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import lombok.Data;
 import org.hswebframework.ezorm.core.ValueCodec;
-import org.jetlinks.core.message.CommonDeviceMessage;
-import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
 import org.jetlinks.core.message.property.ReadPropertyMessage;
 import org.jetlinks.core.message.property.WritePropertyMessage;
-import org.springframework.beans.BeanUtils;
-
-import java.util.Map;
 
 /**
  * @author lifang

+ 1 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java

@@ -110,7 +110,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
             return new HashMap<>();
         }
         return metadata.stream()
-            .collect(Collectors.toMap(PropertyMetadata::getId, prop -> this.createElasticProperty(prop.getValueType())));
+            .collect(Collectors.toMap(PropertyMetadata::getId, prop -> this.createElasticProperty(prop.getValueType()),(a,b)->b));
     }
 
     protected Map<String, Object> createElasticProperty(DataType type) {

+ 61 - 6
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceDirectiveController.java

@@ -16,15 +16,15 @@ import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.hswebframework.web.exception.BusinessException;
 import org.jetlinks.community.device.entity.DeviceDirectivesEntity;
 import org.jetlinks.community.device.service.DeviceDirectivesService;
+import org.jetlinks.community.device.web.request.DirectiveParam;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
-import org.jetlinks.core.message.DeviceMessage;
-import org.jetlinks.core.message.DirectDeviceMessage;
-import org.jetlinks.core.message.Headers;
-import org.jetlinks.core.message.MessageType;
+import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
+import org.jetlinks.core.message.function.FunctionParameter;
 import org.jetlinks.core.message.property.ReadPropertyMessage;
 import org.jetlinks.core.message.property.WritePropertyMessage;
+import org.jetlinks.core.utils.IdUtils;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -35,6 +35,8 @@ import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  * @author lifang
@@ -61,8 +63,8 @@ public class DeviceDirectiveController implements
 
     @PostMapping("/resend")
     @CreateAction
-    @Operation(summary = "批量发送指令到设备")
-    public Mono<Void> sendDirectives(@RequestBody Mono<List<String>> idList){
+    @Operation(summary = "批量重新发送指令到设备")
+    public Mono<Void> resendDirectives(@RequestBody Mono<List<String>> idList){
         return  idList.flatMapMany(directivesService::findById)
             .flatMap(directive->
                 registry.getDevice(directive.getDeviceId())
@@ -78,4 +80,57 @@ public class DeviceDirectiveController implements
             .then()
             .doOnError(e->Mono.error(new BusinessException("参数错误")));
     }
+
+    @PostMapping("/send")
+    @CreateAction
+    @Operation(summary = "批量重新发送指令到设备")
+    public Mono<Void> sendDirective(@RequestBody DirectiveParam directiveParam){
+        return Mono.zip(this.parseParamMsg(directiveParam),this.parseDeviceIds(directiveParam))
+            .flatMap(tp2->{
+                List<DeviceOperator> t2 = tp2.getT2();
+                DeviceMessage message = tp2.getT1();
+                return Flux.fromStream(t2.stream())
+                    .flatMap(operator -> Mono.just(operator.messageSender()))
+                    .flatMap(sender->{
+                            message.addHeader(Headers.async,true);
+                            return sender.send(message)
+                                .onErrorContinue((e,obj)->log.warn("指令下发失败{}",message))
+                                .then();
+                        }
+                    )
+                    .then();
+            });
+    }
+
+
+    private Mono<DeviceMessage> parseParamMsg(DirectiveParam param){
+        Map<String, Object> message = param.getMessage();
+        Object msgType = message.get("messageType");
+        String msgId = IdUtils.newUUID();
+        if(MessageType.WRITE_PROPERTY.name().equals(msgType)){
+            WritePropertyMessage writePropertyMessage = new WritePropertyMessage();
+            writePropertyMessage.setMessageId(msgId);
+            writePropertyMessage.setProperties((Map<String, Object>) message.get("properties"));
+            return Mono.just(writePropertyMessage);
+        }
+        if(MessageType.READ_PROPERTY.name().equals(msgType)){
+            ReadPropertyMessage readPropertyMessage = new ReadPropertyMessage();
+            readPropertyMessage.setMessageId(msgId);
+            readPropertyMessage.setProperties((List<String>) message.get("properties"));
+            return Mono.just(readPropertyMessage);
+        }
+        if(MessageType.INVOKE_FUNCTION.name().equals(msgType)){
+            FunctionInvokeMessage functionInvokeMessage = new FunctionInvokeMessage();
+            functionInvokeMessage.setMessageId(msgId);
+            functionInvokeMessage.setInputs((List<FunctionParameter>) message.get("inputs"));
+            functionInvokeMessage.setFunctionId((String) message.get("functionId"));
+            return Mono.just(functionInvokeMessage);
+        }
+        throw new BusinessException("消息为空");
+    }
+
+    private Mono<List<DeviceOperator>> parseDeviceIds(DirectiveParam param){
+        return  Flux.fromStream(param.getDeviceId().stream())
+            .flatMap(registry::getDevice).collectList();
+    }
 }

+ 16 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/DirectiveParam.java

@@ -1,5 +1,7 @@
 package org.jetlinks.community.device.web.request;
 
+import lombok.Data;
+import java.util.*;
 /**
  * @author lifang
  * @version 1.0.0
@@ -7,6 +9,20 @@ package org.jetlinks.community.device.web.request;
  * @Description 下发指令参数
  * @createTime 2021年11月24日 10:53:00
  */
+@Data
 public class DirectiveParam {
+    /**
+     * 设备id集合
+     */
+    private List<String > deviceId;
 
+    /**
+     * 消息参数
+     */
+    private Map<String,Object> message;
+
+    /**
+     * 产品id
+     */
+    private String productId;
 }