|
|
@@ -11,6 +11,7 @@ import org.jetlinks.core.event.EventBus;
|
|
|
import org.jetlinks.core.event.Subscription;
|
|
|
import org.jetlinks.core.message.DeviceMessage;
|
|
|
import org.jetlinks.core.metadata.Jsonable;
|
|
|
+import org.jetlinks.reactor.ql.DefaultReactorQLRecord;
|
|
|
import org.jetlinks.reactor.ql.ReactorQL;
|
|
|
import org.jetlinks.reactor.ql.ReactorQLContext;
|
|
|
import org.jetlinks.reactor.ql.ReactorQLRecord;
|
|
|
@@ -21,6 +22,7 @@ import org.jetlinks.rule.engine.api.task.Task;
|
|
|
import org.jetlinks.rule.engine.api.task.TaskExecutor;
|
|
|
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
|
|
|
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
|
|
|
+import org.jetlinks.supports.event.BrokerEventBus;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.StringUtils;
|
|
|
import reactor.core.Disposable;
|
|
|
@@ -45,6 +47,7 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
|
|
|
private final Scheduler scheduler;
|
|
|
|
|
|
+
|
|
|
@Override
|
|
|
public String getExecutor() {
|
|
|
return "rule_scene";
|
|
|
@@ -57,9 +60,9 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
|
|
|
class RuleSceneTaskExecutor extends AbstractTaskExecutor {
|
|
|
|
|
|
- List<String> default_columns = Arrays.asList(
|
|
|
- "timestamp", "deviceId"
|
|
|
- );
|
|
|
+// List<String> default_columns = Arrays.asList(
|
|
|
+// "timestamp", "deviceId"
|
|
|
+// );
|
|
|
|
|
|
private RuleSceneEntity rule;
|
|
|
|
|
|
@@ -123,7 +126,8 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
}
|
|
|
|
|
|
private ReactorQL createQL(RuleSceneEntity rule) {
|
|
|
- List<String> columns = new ArrayList<>(default_columns);
|
|
|
+// List<String> columns = new ArrayList<>(default_columns);
|
|
|
+ List<String> columns = new ArrayList<>();
|
|
|
List<String> wheres = new ArrayList<>();
|
|
|
|
|
|
List<RuleSceneEntity.Trigger> triggers = rule.getTriggers();
|
|
|
@@ -140,7 +144,6 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
.ifPresent(expr -> wheres.add("(" + expr + ")"));
|
|
|
}
|
|
|
String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
|
|
|
-
|
|
|
if (!wheres.isEmpty()) {
|
|
|
sql += "\n\twhere " + String.join("\n\t\t or ", wheres);
|
|
|
}
|
|
|
@@ -152,71 +155,59 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
|
|
|
public Flux<Map<String, Object>> doSubscribe(EventBus eventBus) {
|
|
|
Flux<Map<String, Object>> result =null;
|
|
|
+ List<Object> binds = new ArrayList<>();
|
|
|
for (RuleSceneEntity.Trigger triggerMix : rule.getTriggers()) {
|
|
|
- RuleSceneEntity.DeviceTrigger trigger = triggerMix.getDevice();
|
|
|
- String topic = trigger.getType().getTopic(trigger.getProductId(), trigger.getDeviceId(), trigger.getModelId());
|
|
|
- //订阅主题
|
|
|
- Subscription subscription = Subscription.of(
|
|
|
- "rule_scene:" + topic,
|
|
|
- topic,
|
|
|
- Subscription.Feature.local
|
|
|
+ RuleSceneEntity.DeviceTrigger trigger = rule.getTriggers().get(0).getDevice();
|
|
|
+ String topic = trigger.getType().getTopic(trigger.getProductId(), trigger.getDeviceId(), trigger.getModelId());
|
|
|
+ binds.addAll(trigger.toFilterBinds());
|
|
|
+ //订阅主题
|
|
|
+ Subscription subscription = Subscription.of(
|
|
|
+ "rule_scene:" + topic,
|
|
|
+ topic,
|
|
|
+ Subscription.Feature.local
|
|
|
+ );
|
|
|
+ ReactorQLContext context = ReactorQLContext
|
|
|
+ .ofDatasource(ignore ->
|
|
|
+ eventBus
|
|
|
+ .subscribe(subscription, DeviceMessage.class)
|
|
|
+ .map(Jsonable::toJson)
|
|
|
+ .doOnNext(json -> {
|
|
|
+ json.put("ruleSceneId", rule.getId());
|
|
|
+ json.put("ruleSceneName", rule.getName());
|
|
|
+ })
|
|
|
);
|
|
|
- Flux<Map<String, Object>> resultFlux = eventBus
|
|
|
- .subscribe(subscription, DeviceMessage.class)
|
|
|
- .map(Jsonable::toJson)
|
|
|
- .doOnNext(json -> {
|
|
|
- json.put("productId", trigger.getProductId());
|
|
|
- json.put("ruleSceneId", rule.getId());
|
|
|
- json.put("ruleSceneName", rule.getName());
|
|
|
- })
|
|
|
- .map(JSONObject::getInnerMap);
|
|
|
- ShakeLimit shakeLimit = trigger.getShakeLimit();
|
|
|
- if (shakeLimit != null
|
|
|
- && shakeLimit.isEnabled()
|
|
|
- && shakeLimit.getTime() > 0) {
|
|
|
- int thresholdNumber = shakeLimit.getThreshold();
|
|
|
- Duration windowTime = Duration.ofSeconds(shakeLimit.getTime());
|
|
|
- resultFlux
|
|
|
- .as(flux ->
|
|
|
- StringUtils.hasText(trigger.getDeviceId())
|
|
|
- ? flux.window(windowTime, scheduler)//规则已经指定了固定的设备,直接开启时间窗口就行
|
|
|
- : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
|
|
|
- .groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
|
|
|
- .flatMap(group -> group.window(windowTime, scheduler), Integer.MAX_VALUE))
|
|
|
- //处理每一组数据
|
|
|
- .flatMap(group -> group
|
|
|
- .index((index, data) -> Tuples.of(index + 1, data)) //给数据打上索引,索引号就是告警次数
|
|
|
- .filter(tp -> tp.getT1() >= thresholdNumber)//超过阈值告警
|
|
|
- .as(flux -> shakeLimit.isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
|
|
|
- .map(tp2 -> {
|
|
|
- tp2.getT2().put("totalScene", tp2.getT1());
|
|
|
- return tp2.getT2();
|
|
|
- }));
|
|
|
- }
|
|
|
+
|
|
|
+ binds.forEach(context::bind);
|
|
|
+ Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
|
|
|
+ .start(context)
|
|
|
+ .map(ReactorQLRecord::asMap);
|
|
|
+
|
|
|
+ ShakeLimit shakeLimit = trigger.getShakeLimit();
|
|
|
+ if (shakeLimit != null
|
|
|
+ && shakeLimit.isEnabled()
|
|
|
+ && shakeLimit.getTime() > 0) {
|
|
|
+ int thresholdNumber = shakeLimit.getThreshold();
|
|
|
+ Duration windowTime = Duration.ofSeconds(shakeLimit.getTime());
|
|
|
+ resultFlux
|
|
|
+ .as(flux ->
|
|
|
+ StringUtils.hasText(trigger.getDeviceId())
|
|
|
+ ? flux.window(windowTime, scheduler)//规则已经指定了固定的设备,直接开启时间窗口就行
|
|
|
+ : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
|
|
|
+ .groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
|
|
|
+ .flatMap(group -> group.window(windowTime, scheduler), Integer.MAX_VALUE))
|
|
|
+ //处理每一组数据
|
|
|
+ .flatMap(group -> group
|
|
|
+ .index((index, data) -> Tuples.of(index + 1, data)) //给数据打上索引,索引号就是告警次数
|
|
|
+ .filter(tp -> tp.getT1() >= thresholdNumber)//超过阈值告警
|
|
|
+ .as(flux -> shakeLimit.isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
|
|
|
+ .map(tp2 -> {
|
|
|
+ tp2.getT2().put("totalScene", tp2.getT1());
|
|
|
+ return tp2.getT2();
|
|
|
+ }));
|
|
|
+ }
|
|
|
result=result==null?resultFlux:result.concatWith(resultFlux);
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- public static void main(String[] args) {
|
|
|
- Flux<String> stringFlux = Flux.just("1")
|
|
|
- .doOnNext(System.out::println);
|
|
|
- Flux<String> stringFlux2 = Flux.just("2")
|
|
|
- .doOnNext(System.out::println);
|
|
|
- Flux<String> stringFlux3 = Flux.just("3")
|
|
|
- .doOnNext(System.out::println);
|
|
|
- Flux.empty().concatWith(stringFlux)
|
|
|
-// Flux<Flux<String>> fluxFlux = Flux.fromStream(Stream.of(stringFlux, stringFlux2));
|
|
|
- .concatWith(stringFlux2).concatWith(stringFlux3)
|
|
|
- .doOnNext(s->{
|
|
|
- System.out.println(s);
|
|
|
- }).
|
|
|
- subscribe();
|
|
|
-// fluxFlux
|
|
|
-// .publishOn(Schedulers.elastic())
|
|
|
-// .flatMap(Function.identity())
|
|
|
-// .then()
|
|
|
-// .subscribe();
|
|
|
- }
|
|
|
}
|