Kaynağa Gözat

add 规则引擎集群

18339543638 4 yıl önce
ebeveyn
işleme
87798556e0

+ 17 - 8
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -57,14 +57,14 @@ class ClusterUniqueTask implements Task ,Serializable{
      */
     private long lastStateTime;
 
-    private transient final RedissonClient redissonClient;
+
 
     /**
      * 开始时间
      */
     private long startTime;
 
-    private transient final AbstractExecutionContext context;
+
 
     /**
      * 默认心跳时间10s
@@ -72,16 +72,20 @@ class ClusterUniqueTask implements Task ,Serializable{
     private int pingTime=10;
 
     /**
-     * 心跳,心跳包传递任务状态,集群实时更新任务状态
+     * 操作主题
      */
-    private transient String pingTopic ="cluster-unique-task-ping-%s";
+    private Task.State taskState;
+
+    private transient final AbstractExecutionContext context;
+
+    private transient final RedissonClient redissonClient;
 
     /**
-     * 操作主题
+     * 心跳,心跳包传递任务状态,集群实时更新任务状态
      */
-    private transient String operationTopic ="cluster-unique-task-operation-%s";
+    private transient String pingTopic ="cluster-unique-task-ping-%s";
 
-    private Task.State taskState;
+    private transient String operationTopic ="cluster-unique-task-operation-%s";
 
     private transient RLock lock;
 
@@ -89,7 +93,7 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     private transient final TaskExecutor executor;
 
-    private AtomicBoolean isAlive=new AtomicBoolean(true);
+    private transient AtomicBoolean isAlive=new AtomicBoolean(true);
 
     private transient Disposable handleDisposable =null;
 
@@ -170,6 +174,9 @@ class ClusterUniqueTask implements Task ,Serializable{
                                     //获取锁成功后改写工作节点id
                                     this.workerId=this.getCurrentSeverId();
                                     isAlive.set(true);
+                                }else {
+                                    //等待心跳传递过来
+                                    this.workerId=null;
                                 }
                             }catch (InterruptedException e){
                             }
@@ -185,6 +192,8 @@ class ClusterUniqueTask implements Task ,Serializable{
                     try {
                         if (lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
                             this.workerId=this.getCurrentSeverId();
+                        }else {
+                            this.workerId=null;
                         }
                     } catch (InterruptedException e) {
                     }