Browse Source

change 指令下发改为异步进行

18339543638 4 years ago
parent
commit
04de948043

+ 6 - 7
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceDirectiveController.java

@@ -65,7 +65,7 @@ public class DeviceDirectiveController implements
     @CreateAction
     @Operation(summary = "批量重新发送指令到设备")
     public Mono<Void> resendDirectives(@RequestBody Mono<List<String>> idList){
-        return  Mono.fromRunnable(()->idList.flatMapMany(directivesService::findById)
+        return idList.flatMapMany(directivesService::findById)
             .parallel(Runtime.getRuntime().availableProcessors()*2)
             .runOn(Schedulers.parallel())
             .flatMap(directive->
@@ -76,19 +76,18 @@ public class DeviceDirectiveController implements
                             directive.getDownstream().addHeader(Headers.async,false);
                             ( (CommonDeviceMessage)directive.getDownstream()).setMessageId(IdUtils.newUUID());
                             ( (CommonDeviceMessage)directive.getDownstream()).setTimestamp(System.currentTimeMillis());
-                            return sender.send(directive.getDownstream())
-                                .onErrorContinue((e,obj)->log.warn("指令重新发送失败{}",directive))
-                                .then();
+                            return sender.send(directive.getDownstream()).then();
                         }
                     )
-            ).subscribe());
+            )
+            .then();
     }
 
     @PostMapping("/send")
     @CreateAction
     @Operation(summary = "批量重新发送指令到设备")
     public Mono<Void> sendDirective(@RequestBody DirectiveParam directiveParam){
-        return Mono.fromRunnable(()->Mono.zip(this.parseParamMsg(directiveParam),this.parseDeviceIds(directiveParam))
+        return Mono.zip(this.parseParamMsg(directiveParam),this.parseDeviceIds(directiveParam))
             .flatMap(tp2->{
                 List<DeviceOperator> t2 = tp2.getT2();
                 DeviceMessage message = tp2.getT1();
@@ -104,7 +103,7 @@ public class DeviceDirectiveController implements
                         }
                     )
                     .then();
-            }).subscribe());
+            });
     }