Browse Source

add 规则引擎集群

18339543638 4 năm trước cách đây
mục cha
commit
e1c0b93460

+ 5 - 3
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -170,12 +170,14 @@ public class ClusterUniqueTask implements Task ,Serializable{
                     return true;
                 })
                 .flatMap(ignore->{
-                    lock.lock(pingTime,TimeUnit.SECONDS);
+                    try {
+                        lock.tryLock(pingTime,TimeUnit.SECONDS);
+                    }catch (Exception e){}
                     if (listenDisposable!=null) {
                         //终止监听心跳
                         listenDisposable.dispose();
                     }
-                    if(State.running.equals(taskState)||State.running.equals(executor.getState())){
+                    if(State.running.equals(taskState)&&!State.running.equals(executor.getState())){
                         executor.start();
                     }
                     return clusterManager.getTopic(pingTopic).publish(Mono.just(this)).then(Mono.empty());
@@ -304,7 +306,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
          * 抢夺失败时
          *
          */
-        if(listenDisposable==null) {
+        if(listenDisposable!=null) {
             return;
         }
         listenDisposable = clusterManager.getTopic(pingTopic)

+ 0 - 5
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -79,11 +79,6 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
                             Object obj = message.getMessage();
                             if(obj instanceof ClusterMessage){
                                 msg= (ClusterMessage) obj;
-                                String fromServer = msg.getFromServer();
-                                if(this.serverId.equals(fromServer)){
-                                    //消息消费者排除自身
-                                    return Mono.empty();
-                                }
                             }
                             if(msg==null){
                                 return Mono.empty();