Просмотр исходного кода

fix 规则引擎集群状态变化

18339543638 3 лет назад
Родитель
Сommit
b14408b104

+ 1 - 1
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/RuleAbstractClusterUniqueTask.java

@@ -117,7 +117,7 @@ class RuleAbstractClusterUniqueTask extends AbstractClusterUniqueTask implements
         if (message instanceof OperationMessage) {
             //操作信息
             OperationMessage operationMessage = (OperationMessage) message;
-            if (operationMessage.getFromServerId().equals(this.getWorkerId())) {
+            if (operationMessage.getFromServerId().equals(this.getCurrentSeverId())) {
                 return Mono.empty();
             }
             //任务存活

+ 3 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java

@@ -9,6 +9,7 @@ import org.jetlinks.rule.engine.api.RuleConstants;
 import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.RuleDataHelper;
 import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.Task;
 import org.jetlinks.rule.engine.api.task.TaskExecutor;
 import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
 import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
@@ -68,6 +69,7 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
                         }));
             } else {
                 dataStream = reactorQL
+
                     .start(table -> {
                         if (table == null || table.equalsIgnoreCase("dual")) {
                             return Flux.just(1);
@@ -84,6 +86,7 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
                                                .topics(table)
                                                .local()
                                                .build())
+                                .filter(ignore -> state == Task.State.running)
                                 .flatMap(payload -> {
                                     try {
                                         return Mono.just(payload.bodyToJson(true));

+ 8 - 5
jetlinks-manager/media-manager/src/main/java/org/jetlinks/community/media/service/LocalMediaDeviceChannelService.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.media.service;
 
+import cn.hutool.core.collection.CollectionUtil;
 import io.vertx.core.spi.launcher.Command;
 import lombok.AllArgsConstructor;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
@@ -70,10 +71,12 @@ public class LocalMediaDeviceChannelService extends GenericReactiveCrudService<M
     @Override
     public void run(String... args)  {
         Set<Object> deviceIds = redisUtil.members("session_" + clusterManager.getCurrentServerId());
-        this.createUpdate()
-            .in(MediaDeviceChannel::getDeviceId,deviceIds)
-            .set(MediaDeviceChannel::getStatus,DeviceState.offline)
-            .execute()
-            .subscribe();
+        if(CollectionUtil.isNotEmpty(deviceIds)){
+            this.createUpdate()
+                .in(MediaDeviceChannel::getDeviceId,deviceIds)
+                .set(MediaDeviceChannel::getStatus,DeviceState.offline)
+                .execute()
+                .subscribe();
+        }
     }
 }

+ 11 - 7
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleInstanceService.java

@@ -125,14 +125,18 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
     public void run(String... args) {
         createQuery()
             .where()
-            .is(RuleInstanceEntity::getState, RuleInstanceState.started)
             .fetch()
-            .flatMap(e -> this
-                .doStart(e)
-                .onErrorResume(err -> {
-                    log.warn("启动规则[{}]失败", e.getName(), e);
-                    return Mono.empty();
-                }))
+            .flatMap(e ->
+                this
+                    .doStart(e)
+                    .then(Mono.just(e)
+                        .filter(rule->rule.getState()==RuleInstanceState.stopped))
+                    .flatMap(rule->stop(rule.getId()))
+                    .onErrorResume(err -> {
+                        log.warn("启动规则[{}]失败", e.getName(), e);
+                        return Mono.empty();
+                    })
+            )
             .subscribe();
     }