Jelajahi Sumber

fixed 场景互联

18339543638 4 tahun lalu
induk
melakukan
3ac61d9497

+ 16 - 16
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/RuleSceneTaskExecutorProvider.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.rule.engine.device;
 
+import com.alibaba.fastjson.JSONObject;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
@@ -79,7 +80,7 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
         protected Disposable doStart() {
             rule.validate();
             return doSubscribe(eventBus)
-//                .filter(ignore -> state == Task.State.running)
+                .filter(ignore -> state == Task.State.running)
                 .flatMap(result -> {
                     RuleData data = RuleData.create(result);
                     //输出到下一节点
@@ -160,20 +161,15 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
                     topic,
                     Subscription.Feature.local
                 );
-                ReactorQLContext context = ReactorQLContext
-                    .ofDatasource(ignore ->
-                        eventBus
-                            .subscribe(subscription, DeviceMessage.class)
-                            .map(Jsonable::toJson)
-                            .doOnNext(json -> {
-                                json.put("ruleSceneId", rule.getId());
-                                json.put("ruleSceneName", rule.getName());
-                            })
-                    );
-
-                Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
-                    .start(context)
-                    .map(ReactorQLRecord::asMap);
+                Flux<Map<String, Object>> resultFlux = eventBus
+                    .subscribe(subscription, DeviceMessage.class)
+                    .map(Jsonable::toJson)
+                    .doOnNext(json -> {
+                        json.put("productId", trigger.getProductId());
+                        json.put("ruleSceneId", rule.getId());
+                        json.put("ruleSceneName", rule.getName());
+                    })
+                    .map(JSONObject::getInnerMap);
                 ShakeLimit shakeLimit = trigger.getShakeLimit();
                 if (shakeLimit != null
                     && shakeLimit.isEnabled()
@@ -212,7 +208,11 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
             .doOnNext(System.out::println);
         Flux.empty().concatWith(stringFlux)
 //        Flux<Flux<String>> fluxFlux = Flux.fromStream(Stream.of(stringFlux, stringFlux2));
-        .concatWith(stringFlux2).concatWith(stringFlux3).subscribe();
+        .concatWith(stringFlux2).concatWith(stringFlux3)
+            .doOnNext(s->{
+                System.out.println(s);
+            }).
+                subscribe();
 //        fluxFlux
 //            .publishOn(Schedulers.elastic())
 //            .flatMap(Function.identity())

+ 3 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/enums/RuleSceneState.java

@@ -8,8 +8,9 @@ import org.hswebframework.web.dict.EnumDict;
 @Getter
 public enum RuleSceneState implements EnumDict<String> {
 
-    running("运行中"),
-    stopped("已停止");
+    running("已启用"),
+    stopped("已禁用"),
+    disable("已停止");
 
     private String text;