소스 검색

add 场景联动解析

18339543638 4 년 전
부모
커밋
b8a2ddea76

+ 1 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.rule.engine.executor;
 
+import com.alibaba.fastjson.JSONObject;
 import lombok.AllArgsConstructor;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;

+ 26 - 41
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java

@@ -4,6 +4,7 @@ import io.swagger.v3.oas.annotations.Hidden;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 import org.jetlinks.community.rule.engine.model.Action;
 import org.jetlinks.core.message.DeviceMessage;
@@ -92,6 +93,14 @@ public class DeviceAlarmRule implements Serializable {
     @Schema(description = "防抖限制")
     private ShakeLimit shakeLimit;
 
+    @Schema(description = "告警级别")
+    @Hidden
+    private Integer level;
+
+    @Schema(description = "告警类别")
+    @Hidden
+    private String type;
+
     public void validate() {
         if (org.apache.commons.collections.CollectionUtils.isEmpty(getTriggers())) {
             throw new IllegalArgumentException("触发条件不能为空");
@@ -173,7 +182,7 @@ public class DeviceAlarmRule implements Serializable {
     @Getter
     @AllArgsConstructor
     public enum TriggerType implements Serializable {
-        //设备消息 todo 产品
+        //设备消息
         device(Arrays.asList(
             MessageType.values()
         )),
@@ -181,10 +190,7 @@ public class DeviceAlarmRule implements Serializable {
         timer(Arrays.asList(
             MessageType.properties,
             MessageType.function
-        )),
-        product(Arrays.asList(
-            MessageType.values()
-        )),;
+        ));
 
         final List<MessageType> supportMessageTypes;
 
@@ -194,16 +200,10 @@ public class DeviceAlarmRule implements Serializable {
     @Setter
     public static class Trigger implements Serializable {
 
-        //触发方式,定时,设备,产品
+        //触发方式,定时,设备
         @Schema(description = "触发方式")
         private TriggerType trigger = TriggerType.device;
 
-        @Schema(description = "设备id")
-        private String deviceId;
-
-        @Schema(description = "产品id")
-        private String productId;
-
         //trigger为定时任务时的cron表达式
         @Schema(description = "定时触发cron表达式")
         private String cron;
@@ -217,7 +217,7 @@ public class DeviceAlarmRule implements Serializable {
         private List<FunctionParameter> parameters;
 
         //物模型属性或者事件的标识 如: fire_alarm
-        @Schema(description = "物模型表示,如:属性ID,事件ID")
+        @Schema(description = "物模型标识,如:属性ID,事件ID")
         private String modelId;
 
         //过滤条件
@@ -280,34 +280,10 @@ public class DeviceAlarmRule implements Serializable {
         }
     }
 
-    /**
-     * 抖动限制
-     * <a href="https://github.com/jetlinks/jetlinks-community/issues/8">https://github.com/jetlinks/jetlinks-community/issues/8</a>
-     *
-     * @since 1.3
-     */
-    @Getter
-    @Setter
-    public static class ShakeLimit implements Serializable {
-        @Schema(description = "是否开启防抖")
-        private boolean enabled;
-
-        //时间限制,单位时间内发生多次告警时,只算一次。单位:秒
-        @Schema(description = "时间间隔(秒)")
-        private int time;
-
-        //触发阈值,单位时间内发生n次告警,只算一次。
-        @Schema(description = "触发阈值(次)")
-        private int threshold;
-
-        //当发生第一次告警时就触发,为false时表示最后一次才触发(告警有延迟,但是可以统计出次数)
-        @Schema(description = "是否第一次满足条件就触发")
-        private boolean alarmFirst;
-
-    }
-
     @Getter
     @Setter
+    @AllArgsConstructor
+    @NoArgsConstructor
     public static class ConditionFilter implements Serializable {
         //过滤条件key 如: temperature
         @Schema(description = "条件key")
@@ -326,11 +302,20 @@ public class DeviceAlarmRule implements Serializable {
         }
 
         public String createExpression(MessageType type) {
+            return createExpression(type, true);
+        }
+
+        public String createExpression(MessageType type, boolean prepareSQL) {
             //函数和this忽略前缀
             if (key.contains("(") || key.startsWith("this")) {
-                return key;
+                return key + operator.symbol + " ? ";
             }
-            return type.getPropertyPrefix() + "this['" + (key.trim()) + "'] " + operator.symbol + " ? ";
+            return type.getPropertyPrefix() + "this['" + (key.trim()) + "'] " + operator.symbol
+                + (prepareSQL ? " ? " : valueIsExpression() ? value : "'" + value + "'");
+        }
+
+        public boolean valueIsExpression() {
+            return false;
         }
 
         public Object convertValue() {

+ 1 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java

@@ -178,7 +178,6 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                 topics.toArray(new String[0]),
                 Subscription.Feature.local
             );
-//            List<Subscription> subscriptions = topics.stream().map(Subscription::new).collect(Collectors.toList());
 
             ReactorQLContext context = ReactorQLContext
                 .ofDatasource(ignore ->
@@ -204,7 +203,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                 .start(context)
                 .map(ReactorQLRecord::asMap);
 
-            DeviceAlarmRule.ShakeLimit shakeLimit;
+            ShakeLimit shakeLimit;
             if ((shakeLimit = rule.getShakeLimit()) != null
                 && shakeLimit.isEnabled()
                 && shakeLimit.getTime() > 0) {

+ 222 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/RuleSceneTaskExecutorProvider.java

@@ -0,0 +1,222 @@
+package org.jetlinks.community.rule.engine.device;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.ValueObject;
+import org.jetlinks.community.rule.engine.entity.RuleSceneEntity;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.metadata.Jsonable;
+import org.jetlinks.reactor.ql.ReactorQL;
+import org.jetlinks.reactor.ql.ReactorQLContext;
+import org.jetlinks.reactor.ql.ReactorQLRecord;
+import org.jetlinks.rule.engine.api.RuleConstants;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.Task;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuples;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+@AllArgsConstructor
+@Component
+public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
+
+    private final EventBus eventBus;
+
+    private final Scheduler scheduler;
+
+    @Override
+    public String getExecutor() {
+        return "rule_scene";
+    }
+
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new RuleSceneTaskExecutor(context));
+    }
+
+    class RuleSceneTaskExecutor extends AbstractTaskExecutor {
+
+        List<String> default_columns = Arrays.asList(
+            "timestamp", "deviceId"
+        );
+
+        private RuleSceneEntity rule;
+
+        private ReactorQL ql;
+
+        public RuleSceneTaskExecutor(ExecutionContext context) {
+            super(context);
+            rule = createRule();
+            ql = createQL(rule);
+        }
+
+        @Override
+        public String getName() {
+            return "场景联动";
+        }
+
+        @Override
+        protected Disposable doStart() {
+            rule.validate();
+            return doSubscribe(eventBus)
+//                .filter(ignore -> state == Task.State.running)
+                .flatMap(result -> {
+                    RuleData data = RuleData.create(result);
+                    //输出到下一节点
+                    return context
+                        .getOutput()
+                        .write(Mono.just(data))
+                        .then(context.fireEvent(RuleConstants.Event.result, data));
+                })
+                .onErrorResume(err -> context.onError(err, null))
+                .subscribe();
+        }
+
+        @Override
+        public void reload() {
+            rule = createRule();
+            ql = createQL(rule);
+            if (disposable != null) {
+                disposable.dispose();
+            }
+            disposable = doStart();
+        }
+
+        private RuleSceneEntity createRule() {
+            RuleSceneEntity rule = ValueObject.of(context.getJob().getConfiguration())
+                .get("rule")
+                .map(val ->
+                    FastBeanCopier.copy(val, new RuleSceneEntity())).orElseThrow(() -> new IllegalArgumentException("场景联动配置错误"));
+            rule.validate();
+            return rule;
+        }
+
+        @Override
+        public void validate() {
+            RuleSceneEntity rule = createRule();
+            try {
+                createQL(rule);
+            } catch (Exception e) {
+                throw new IllegalArgumentException("配置错误:" + e.getMessage(), e);
+            }
+        }
+
+        private ReactorQL createQL(RuleSceneEntity rule) {
+            List<String> columns = new ArrayList<>(default_columns);
+            List<String> wheres = new ArrayList<>();
+
+            List<RuleSceneEntity.Trigger> triggers = rule.getTriggers();
+
+            for (int i = 0; i < triggers.size(); i++) {
+                RuleSceneEntity.DeviceTrigger trigger = triggers.get(i).getDevice();
+                if(trigger==null){
+                    continue;
+                }
+                // select this.properties.this trigger0
+                columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + i);
+                columns.addAll(trigger.toColumns());
+                trigger.createExpression()
+                    .ifPresent(expr -> wheres.add("(" + expr + ")"));
+            }
+            String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
+
+            if (!wheres.isEmpty()) {
+                sql += "\n\twhere " + String.join("\n\t\t or ", wheres);
+            }
+
+            log.debug("create rule scene sql : \n{}", sql);
+
+            return ReactorQL.builder().sql(sql).build();
+        }
+
+        public Flux<Map<String, Object>> doSubscribe(EventBus eventBus) {
+            Flux<Map<String, Object>> result =null;
+            for (RuleSceneEntity.Trigger triggerMix : rule.getTriggers()) {
+                RuleSceneEntity.DeviceTrigger trigger = triggerMix.getDevice();
+                String topic = trigger.getType().getTopic(trigger.getProductId(), trigger.getDeviceId(), trigger.getModelId());
+                //订阅主题
+                Subscription subscription = Subscription.of(
+                    "rule_scene:" + topic,
+                    topic,
+                    Subscription.Feature.local
+                );
+                ReactorQLContext context = ReactorQLContext
+                    .ofDatasource(ignore ->
+                        eventBus
+                            .subscribe(subscription, DeviceMessage.class)
+                            .map(Jsonable::toJson)
+                            .doOnNext(json -> {
+                                json.put("ruleSceneId", rule.getId());
+                                json.put("ruleSceneName", rule.getName());
+                            })
+                    );
+
+                Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
+                    .start(context)
+                    .map(ReactorQLRecord::asMap);
+                ShakeLimit shakeLimit = trigger.getShakeLimit();
+                if (shakeLimit != null
+                    && shakeLimit.isEnabled()
+                    && shakeLimit.getTime() > 0) {
+                    int thresholdNumber = shakeLimit.getThreshold();
+                    Duration windowTime = Duration.ofSeconds(shakeLimit.getTime());
+                    resultFlux
+                        .as(flux ->
+                            StringUtils.hasText(trigger.getDeviceId())
+                                ? flux.window(windowTime, scheduler)//规则已经指定了固定的设备,直接开启时间窗口就行
+                                : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
+                                .groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
+                                .flatMap(group -> group.window(windowTime, scheduler), Integer.MAX_VALUE))
+                        //处理每一组数据
+                        .flatMap(group -> group
+                            .index((index, data) -> Tuples.of(index + 1, data)) //给数据打上索引,索引号就是告警次数
+                            .filter(tp -> tp.getT1() >= thresholdNumber)//超过阈值告警
+                            .as(flux -> shakeLimit.isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
+                            .map(tp2 -> {
+                                tp2.getT2().put("totalScene", tp2.getT1());
+                                return tp2.getT2();
+                            }));
+                }
+                result=result==null?resultFlux:result.concatWith(resultFlux);
+            }
+            return result;
+        }
+    }
+
+    public static void main(String[] args) {
+        Flux<String> stringFlux = Flux.just("1")
+            .doOnNext(System.out::println);
+        Flux<String> stringFlux2 = Flux.just("2")
+            .doOnNext(System.out::println);
+        Flux<String> stringFlux3 = Flux.just("3")
+            .doOnNext(System.out::println);
+        Flux.empty().concatWith(stringFlux)
+//        Flux<Flux<String>> fluxFlux = Flux.fromStream(Stream.of(stringFlux, stringFlux2));
+        .concatWith(stringFlux2).concatWith(stringFlux3).subscribe();
+//        fluxFlux
+//            .publishOn(Schedulers.elastic())
+//            .flatMap(Function.identity())
+//            .then()
+//            .subscribe();
+    }
+}

+ 103 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/ShakeLimit.java

@@ -0,0 +1,103 @@
+package org.jetlinks.community.rule.engine.device;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+import reactor.util.function.Tuples;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/**
+ * 抖动限制
+ * <a href="https://github.com/jetlinks/jetlinks-community/issues/8">https://github.com/jetlinks/jetlinks-community/issues/8</a>
+ *
+ * @since 1.3
+ */
+@Getter
+@Setter
+public class ShakeLimit implements Serializable {
+    @Schema(description = "是否开启防抖")
+    private boolean enabled;
+
+    //时间限制,单位时间内发生多次告警时,只算一次。单位:秒
+    @Schema(description = "时间间隔(秒)")
+    private int time;
+
+    //触发阈值,单位时间内发生n次告警,只算一次。
+    @Schema(description = "触发阈值(次)")
+    private int threshold;
+
+    //当发生第一次告警时就触发,为false时表示最后一次才触发(告警有延迟,但是可以统计出次数)
+    @Schema(description = "是否第一次满足条件就触发")
+    private boolean alarmFirst;
+
+    /**
+     *
+     * 利用窗口函数,将ReactorQL语句包装为支持抖动限制的SQL.
+     *
+     * select * from ( sql )
+     * group by
+     * _window('1s') --时间窗口
+     * ,trace() -- 跟踪分组内行号信息
+     * ,take(-1) --取最后一条数据
+     * having row.index >= 2"; -- 行号信息索引就是数据量
+     *
+     * @param sql 原始SQL
+     * @return 防抖SQL
+     */
+    public String wrapReactorQl(@Nonnull String sql,
+                                @Nullable String groupBy) {
+        if (!enabled || time <= 0) {
+            return sql;
+        }
+        int takes = Math.max(threshold, 1);
+
+        return "select t.* from (" + sql + ") t" +
+            " group by " + (StringUtils.hasText(groupBy) ? groupBy + "," : "") +
+            "_window('" + time + "s')" + //时间窗口
+            ",trace()" +    //跟踪分组后新的行信息,row.index为分组内的行号,row.elapsed为与上一行数据间隔时间(毫秒)
+            ",take(" + (alarmFirst ? takes : -1) + ")" +
+            " having row.index >= " + takes;
+
+    }
+
+    /**
+     * 将流转换为支持抖动限制的流
+     *
+     * @param source         数据源
+     * @param windowFunction 窗口函数
+     * @param totalConsumer  总数接收器
+     * @param <T>            数据类型
+     * @return 新流
+     */
+    public <T> Flux<T> transfer(Flux<T> source,
+                                BiFunction<Duration, Flux<T>, Flux<Flux<T>>> windowFunction,
+                                BiConsumer<T, Long> totalConsumer) {
+        if (!enabled || time <= 0) {
+            return source;
+        }
+        int thresholdNumber = getThreshold();
+        Duration windowTime = Duration.ofSeconds(getTime());
+
+        return source
+            .as(flux -> windowFunction.apply(windowTime, flux))
+            //处理每一组数据
+            .flatMap(group -> group
+                //给数据打上索引,索引号就是告警次数
+                .index((index, data) -> Tuples.of(index + 1, data))
+                //超过阈值告警时
+                .filter(tp -> tp.getT1() >= thresholdNumber)
+                .as(flux -> isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
+                .map(tp2 -> {
+                    totalConsumer.accept(tp2.getT2(), tp2.getT1());
+                    return tp2.getT2();
+                }));
+    }
+}

+ 78 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/RuleSceneEntity.java

@@ -1,6 +1,10 @@
 package org.jetlinks.community.rule.engine.entity;
 
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSON;
 import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
@@ -12,8 +16,15 @@ import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
 import org.hswebframework.web.validator.CreateGroup;
 import org.jetlinks.community.rule.engine.device.DeviceAlarmRule;
+import org.jetlinks.community.rule.engine.device.ShakeLimit;
 import org.jetlinks.community.rule.engine.enums.RuleSceneState;
 import org.jetlinks.community.rule.engine.model.Action;
+import org.jetlinks.community.rule.engine.model.RuleSceneModelParser;
+import org.jetlinks.core.message.function.FunctionParameter;
+import org.springframework.scheduling.support.CronSequenceGenerator;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
+
 import javax.persistence.Column;
 import javax.persistence.GeneratedValue;
 import javax.persistence.Table;
@@ -21,6 +32,9 @@ import javax.validation.constraints.Pattern;
 import java.io.Serializable;
 import java.sql.JDBCType;
 import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * @author lifang
  * @version 1.0.0
@@ -80,10 +94,72 @@ public class RuleSceneEntity  extends GenericEntity<String> implements RecordCre
     @Schema(description = "创建者ID")
     private String creatorId;
 
-
     @Data
     public static class Trigger implements Serializable{
-        private DeviceAlarmRule.Trigger device;
+        private DeviceTrigger device;
         private DeviceAlarmRule.TriggerType trigger;
+
+        private String cron;
+
+        public void validate() {
+            if (device.getType() == null) {
+                throw new IllegalArgumentException("类型不能为空");
+            }
+
+            if (device.getType() != DeviceAlarmRule.MessageType.online && device.getType() != DeviceAlarmRule.MessageType.offline && StringUtils.isEmpty(device.getModelId())) {
+                throw new IllegalArgumentException("属性/事件/功能ID不能为空");
+            }
+
+            if (trigger == DeviceAlarmRule.TriggerType.timer) {
+                if (StrUtil.isEmpty(cron)) {
+                    throw new IllegalArgumentException("cron表达式不能为空");
+                }
+                try {
+                    new CronSequenceGenerator(cron);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("cron表达式格式错误", e);
+                }
+
+            }
+        }
+
+    }
+
+    public void validate() {
+        if (CollectionUtil.isEmpty(triggers)) {
+            throw new IllegalArgumentException("触发条件不能为空");
+        }
+        getTriggers().forEach(Trigger::validate);
+    }
+
+    public RuleInstanceEntity toRuleInstance() {
+        RuleInstanceEntity instanceEntity = new RuleInstanceEntity();
+        if (triggers == null) {
+            throw new IllegalArgumentException("未设置触发器");
+        }
+
+        instanceEntity.setModelVersion(1);
+        instanceEntity.setId(getId());
+        instanceEntity.setModelId(getId());
+        instanceEntity.setDescription("场景联动");
+        instanceEntity.setModelType(RuleSceneModelParser.format);
+        instanceEntity.setName("场景联动:" + name);
+        instanceEntity.setModelMeta(JSON.toJSONString(this));
+        instanceEntity.setCreateTimeNow();
+        return instanceEntity;
+    }
+
+    @Getter
+    @Setter
+    public static class DeviceTrigger extends DeviceAlarmRule.Trigger implements Serializable {
+
+        @Schema(description = "设备id")
+        private String deviceId;
+
+        @Schema(description = "产品id")
+        private String productId;
+
+        @Schema(description = "防抖设置")
+        private ShakeLimit shakeLimit;
     }
 }

+ 48 - 137
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/RuleSceneModelParser.java

@@ -5,11 +5,9 @@ import cn.hutool.json.JSONUtil;
 import org.apache.commons.collections.CollectionUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.rule.engine.device.DeviceAlarmRule;
-import org.jetlinks.community.rule.engine.entity.RuleInstanceEntity;
 import org.jetlinks.community.rule.engine.entity.RuleSceneEntity;
 import org.jetlinks.community.rule.engine.executor.DeviceMessageSendTaskExecutorProvider;
 import org.jetlinks.core.message.DeviceMessage;
-import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
 import org.jetlinks.rule.engine.api.model.RuleLink;
 import org.jetlinks.rule.engine.api.model.RuleModel;
 import org.jetlinks.rule.engine.api.model.RuleNodeModel;
@@ -45,98 +43,27 @@ public class RuleSceneModelParser implements RuleModelParserStrategy {
         model.setId(format+":"+rule.getId());
         model.setName(rule.getName());
         List<RuleSceneEntity.Trigger> triggers = rule.getTriggers();
-//        //执行动作
-//        List<Action> actions = rule.getActions();
-//        //处理定时触发
-//        {
-//            List<DeviceAlarmRule.Trigger> timerTriggers = triggers.stream()
-//                .filter(trigger -> trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer)
-//                .collect(Collectors.toList());
-//            int index = 0;
-//            for (DeviceAlarmRule.Trigger trigger : timerTriggers) {
-//                DeviceMessage msg = trigger.getType().createMessage(trigger).orElse(null);
-//                if (msg == null) {
-//                    throw new UnsupportedOperationException("不支持定时条件类型:" + trigger.getType());
-//                }
-//                RuleNodeModel timer = new RuleNodeModel();
-//                timer.setId("timer:" + (++index));
-//                timer.setName("定时发送设备消息");
-//                timer.setExecutor("timer");
-//                timer.setConfiguration(Collections.singletonMap("cron", trigger.getCron()));
-//
-//                DeviceMessageSendTaskExecutorProvider.Config senderConfig = new DeviceMessageSendTaskExecutorProvider.Config();
-//                senderConfig.setAsync(true);
-//                senderConfig.setDeviceId(trigger.getDeviceId());
-//                senderConfig.setProductId(trigger.getProductId());
-//                senderConfig.setMessage(msg.toJson());
-//
-//                RuleNodeModel messageSender = new RuleNodeModel();
-//                messageSender.setId("message-sender:" + (++index));
-//                messageSender.setName("定时发送设备消息");
-//                messageSender.setExecutor("device-message-sender");
-//                messageSender.setConfiguration(FastBeanCopier.copy(senderConfig, new HashMap<>()));
-//
-//                RuleLink link = new RuleLink();
-//                link.setId(timer.getId().concat(":").concat(messageSender.getId()));
-//                link.setName("执行动作:" + index);
-//                link.setSource(timer);
-//                link.setTarget(messageSender);
-//                timer.getOutputs().add(link);
-//                messageSender.getInputs().add(link);
-//                model.getNodes().add(timer);
-//                model.getNodes().add(messageSender);
-//            }
-//        }
-//        RuleNodeModel conditionNode = new RuleNodeModel();
-//        conditionNode.setId("conditions");
-//        conditionNode.setName("预警条件");
-//        conditionNode.setExecutor("device-message-sender");
-//        conditionNode.addConfiguration("rule", rule.getTriggers());
-////        conditionNode.setConfiguration(Collections.singletonMap("rule", rule.getTriggers()));
-//        model.getNodes().add(conditionNode);
-//        if (CollectionUtils.isNotEmpty(actions)) {
-//            int index = 0;
-//            for (Action operation : actions) {
-//                if (!StringUtils.hasText(operation.getExecutor())) {
-//                    continue;
-//                }
-//                index++;
-//                RuleNodeModel action = new RuleNodeModel();
-//                action.setId("device-message-sender:" + index);
-//                action.setName("执行动作:" + index);
-//                action.setExecutor(operation.getExecutor());
-//                action.setConfiguration(operation.getConfiguration());
-//
-//                RuleLink link = new RuleLink();
-//                link.setId(action.getId().concat(":").concat(conditionNode.getId()));
-//                link.setName("执行动作:" + index);
-//                link.setSource(conditionNode);
-//                link.setTarget(action);
-//                model.getNodes().add(action);
-//                action.getInputs().add(link);
-//                conditionNode.getOutputs().add(link);
-//            }
-//        }
-        return model;
-    }
 
-
-    /**
-     * 设备触发
-     */
-    private void handlerDeviceTrigger(List<DeviceAlarmRule.Trigger> deviceTriggers,RuleModel model){
-        if(!CollectionUtil.isEmpty(deviceTriggers)){
+        //执行动作
+        List<Action> actions = rule.getActions();
+        //处理定时触发
+        {
+            List<RuleSceneEntity.Trigger> timerTriggers = triggers.stream()
+                .filter(trigger -> trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer)
+                .collect(Collectors.toList());
             int index = 0;
-            for (DeviceAlarmRule.Trigger trigger : deviceTriggers) {
+
+            for (RuleSceneEntity.Trigger triggerMix : timerTriggers) {
+                RuleSceneEntity.DeviceTrigger trigger = triggerMix.getDevice();
                 DeviceMessage msg = trigger.getType().createMessage(trigger).orElse(null);
                 if (msg == null) {
                     throw new UnsupportedOperationException("不支持定时条件类型:" + trigger.getType());
                 }
-                RuleNodeModel node = new RuleNodeModel();
-                node.setId("device:" + (++index));
-                node.setName("设备触发发送消息");
-                node.setExecutor("device_alarm");
-                node.setConfiguration(Collections.singletonMap("rule", trigger.getCron()));
+                RuleNodeModel timer = new RuleNodeModel();
+                timer.setId("timer:" + (++index));
+                timer.setName("定时发送设备消息");
+                timer.setExecutor("timer");
+                timer.setConfiguration(Collections.singletonMap("cron", triggerMix.getCron()));
 
                 DeviceMessageSendTaskExecutorProvider.Config senderConfig = new DeviceMessageSendTaskExecutorProvider.Config();
                 senderConfig.setAsync(true);
@@ -145,71 +72,55 @@ public class RuleSceneModelParser implements RuleModelParserStrategy {
                 senderConfig.setMessage(msg.toJson());
 
                 RuleNodeModel messageSender = new RuleNodeModel();
-                messageSender.setId("device_alarm_action:" + (++index));
-                messageSender.setName("设备触发发送消息");
+                messageSender.setId("message-sender:" + (++index));
+                messageSender.setName("定时发送设备消息");
                 messageSender.setExecutor("device-message-sender");
                 messageSender.setConfiguration(FastBeanCopier.copy(senderConfig, new HashMap<>()));
 
                 RuleLink link = new RuleLink();
-                link.setId(node.getId().concat(":").concat(messageSender.getId()));
+                link.setId(timer.getId().concat(":").concat(messageSender.getId()));
                 link.setName("执行动作:" + index);
-                link.setSource(node);
+                link.setSource(timer);
                 link.setTarget(messageSender);
-                node.getOutputs().add(link);
+                timer.getOutputs().add(link);
                 messageSender.getInputs().add(link);
-                model.getNodes().add(node);
+                model.getNodes().add(timer);
                 model.getNodes().add(messageSender);
             }
         }
-    }
-
 
-    /**
-     * 产品触发
-     */
-    private void handlerProductTrigger(List<DeviceAlarmRule.Trigger> productTriggers,RuleModel model){
-        if(!CollectionUtil.isEmpty(productTriggers)){
-            if(!CollectionUtil.isEmpty(productTriggers)){
-                int index = 0;
-                for (DeviceAlarmRule.Trigger trigger : productTriggers) {
-                    DeviceMessage msg = trigger.getType().createMessage(trigger).orElse(null);
-                    if (msg == null) {
-                        throw new UnsupportedOperationException("不支持定时条件类型:" + trigger.getType());
-                    }
-                    RuleNodeModel node = new RuleNodeModel();
-                    node.setId("device:" + (++index));
-                    node.setName("产品触发发送消息");
-                    node.setExecutor("device_alarm");
-                    node.setConfiguration(Collections.singletonMap("rule", trigger.getCron()));
 
-                    DeviceMessageSendTaskExecutorProvider.Config senderConfig = new DeviceMessageSendTaskExecutorProvider.Config();
-                    senderConfig.setAsync(true);
-                    senderConfig.setDeviceId(trigger.getDeviceId());
-                    senderConfig.setProductId(trigger.getProductId());
-                    senderConfig.setMessage(msg.toJson());
 
-                    RuleNodeModel messageSender = new RuleNodeModel();
-                    messageSender.setId("device_alarm_action:" + (++index));
-                    messageSender.setName("设备触发发送消息");
-                    messageSender.setExecutor("device-message-sender");
-                    messageSender.setConfiguration(FastBeanCopier.copy(senderConfig, new HashMap<>()));
+        RuleNodeModel conditionNode = new RuleNodeModel();
+        conditionNode.setId("conditions");
+        conditionNode.setName("联动条件");
+        conditionNode.setExecutor("rule_scene");
+        conditionNode.addConfiguration("rule", rule);
+        model.getNodes().add(conditionNode);
 
-                    RuleLink link = new RuleLink();
-                    link.setId(node.getId().concat(":").concat(messageSender.getId()));
-                    link.setName("执行动作:" + index);
-                    link.setSource(node);
-                    link.setTarget(messageSender);
-                    node.getOutputs().add(link);
-                    messageSender.getInputs().add(link);
-                    model.getNodes().add(node);
-                    model.getNodes().add(messageSender);
+        if (CollectionUtils.isNotEmpty(actions)) {
+            int index = 0;
+            for (Action operation : actions) {
+                if (!StringUtils.hasText(operation.getExecutor())) {
+                    continue;
                 }
+                index++;
+                RuleNodeModel action = new RuleNodeModel();
+                action.setId("device-message-sender:" + index);
+                action.setName("执行动作:" + index);
+                action.setExecutor(operation.getExecutor());
+                action.setConfiguration(operation.getConfiguration());
+
+                RuleLink link = new RuleLink();
+                link.setId(action.getId().concat(":").concat(conditionNode.getId()));
+                link.setName("执行动作:" + index);
+                link.setSource(conditionNode);
+                link.setTarget(action);
+                model.getNodes().add(action);
+                action.getInputs().add(link);
+                conditionNode.getOutputs().add(link);
             }
         }
-    }
-
-
-    private void handlerTrigger(){
-
+        return model;
     }
 }

+ 43 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleSceneService.java

@@ -1,8 +1,14 @@
 package org.jetlinks.community.rule.engine.service;
 
+import lombok.AllArgsConstructor;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.jetlinks.community.rule.engine.entity.DeviceAlarmEntity;
 import org.jetlinks.community.rule.engine.entity.RuleSceneEntity;
+import org.jetlinks.community.rule.engine.enums.AlarmState;
+import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * @author lifang
@@ -12,5 +18,42 @@ import org.springframework.stereotype.Service;
  * @createTime 2021年09月16日 17:23:00
  */
 @Service
+@AllArgsConstructor
 public class RuleSceneService extends GenericReactiveCrudService<RuleSceneEntity, String> {
+    private final RuleInstanceService instanceService;
+
+
+    public Mono<Void> start(String id) {
+        return findById(id)
+            .flatMap(this::doStart);
+    }
+
+    private Mono<Void> doStart(RuleSceneEntity entity) {
+        return instanceService
+            .save(Mono.just(entity.toRuleInstance()))
+            .then(instanceService.start(entity.getId()))
+            .then(createUpdate()
+                .set(RuleSceneEntity::getState, AlarmState.running)
+                .where(entity::getId).execute())
+            .then();
+    }
+
+
+    public Mono<Void> stop(String id) {
+        return instanceService.stop(id)
+            .then(createUpdate()
+                .set(RuleSceneEntity::getState,AlarmState.stopped)
+                .where(DeviceAlarmEntity::getId,id)
+                .execute())
+            .then();
+    }
+
+    @Override
+    public Mono<Integer> deleteById(Publisher<String> idPublisher) {
+        return Flux.from(idPublisher)
+            .flatMap(id -> instanceService.stop(id)
+                .then(instanceService.deleteById(Mono.just(id)))
+                .then(RuleSceneService.super.deleteById(Mono.just(id)))
+            ).reduce(Math::addExact);
+    }
 }

+ 26 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleSceneController.java

@@ -1,12 +1,15 @@
 package org.jetlinks.community.rule.engine.web;
 
+import io.swagger.v3.oas.annotations.Operation;
 import lombok.AllArgsConstructor;
 import org.hswebframework.web.authorization.annotation.Resource;
+import org.hswebframework.web.authorization.annotation.SaveAction;
 import org.hswebframework.web.crud.service.ReactiveCrudService;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.jetlinks.community.rule.engine.entity.RuleSceneEntity;
 import org.jetlinks.community.rule.engine.service.RuleSceneService;
 import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Mono;
 
 @RestController
 @RequestMapping("rule-engine/scene")
@@ -23,4 +26,27 @@ public class RuleSceneController implements ReactiveServiceCrudController<RuleSc
         return sceneService;
     }
 
+
+    @PostMapping("/{id}/_start")
+    @SaveAction
+    @Operation(summary = "启动告警配置")
+    public Mono<Void> startAlarm(@PathVariable String id) {
+        return sceneService.start(id);
+    }
+
+
+    @PostMapping("/{id}/_stop")
+    @SaveAction
+    @Operation(summary = "停止告警配置")
+    public Mono<Void> stopAlarm(@PathVariable String id) {
+        return sceneService.stop(id);
+    }
+
+    @DeleteMapping("/{id}")
+    @SaveAction
+    @Operation(summary = "删除告警配置")
+    public Mono<Void> deleteAlarm(@PathVariable String id) {
+        return sceneService.deleteById(Mono.just(id))
+            .then();
+    }
 }