18339543638 4 роки тому
батько
коміт
b5935b2c29

+ 24 - 49
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -40,6 +40,7 @@ import java.util.function.Function;
 @Slf4j
 public class ClusterUniqueTask implements Task ,Serializable{
 
+    private static final long serialVersionUID=1L;
     private String workerId;
 
     /**
@@ -91,8 +92,6 @@ public class ClusterUniqueTask implements Task ,Serializable{
 
     private transient final TaskExecutor executor;
 
-    private transient final UnicastProcessor<ClusterTopic.TopicMessage> listenPing= UnicastProcessor.create();
-
     /**
      * 监听任务
      */
@@ -115,7 +114,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
     @Override
     public Mono<Void> setJob(ScheduleJob job) {
         if(isReplica()){
-            return null;
+            return operation(OperationMessage.of(TaskOperation.JOB,Arrays.asList(job)));
         }
         return Mono.fromRunnable(() -> {
             ScheduleJob old = context.getJob();
@@ -147,8 +146,7 @@ public class ClusterUniqueTask implements Task ,Serializable{
         this.redissonClient=redissonClient;
         //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
         initUniqueTask()
-            .flatMap(ignore->handlePingMessage()).
-            subscribe();
+            .subscribe();
     }
 
     private Mono<Void> initUniqueTask(){
@@ -293,20 +291,38 @@ public class ClusterUniqueTask implements Task ,Serializable{
         listenDisposable = clusterManager.getTopic(pingTopic)
             .subscribePattern()
             .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
-            .filter(ignore -> listenPing.hasDownstreams())
-            .doOnNext(listenPing::onNext)
             .timeout(Duration.ofSeconds(pingTime),
                 Mono.empty()
                     .doOnNext(ignore -> isDead = true)
                     .concatWith(initUniqueTask())
                     .then(Mono.empty()))
-            .subscribe();
+            .publishOn(Schedulers.boundedElastic())
+            .subscribe(obj->{
+                isDead=false;
+                Object message = obj.getMessage();
+                if (message instanceof ClusterUniqueTask) {
+                    //心跳信息
+                    ClusterUniqueTask task = (ClusterUniqueTask) message;
+                    this.workerId = task.currentSeverId;
+                    this.taskState = task.taskState;
+                    this.lastStateTime = task.lastStateTime;
+                    this.startTime = task.startTime;
+                } else if (message instanceof OperationMessage) {
+                    //操作信息
+                    OperationMessage operationMessage = (OperationMessage) message;
+                    if(!isDead){
+                        //任务存活
+                         operation(operationMessage).subscribe();
+                    }
+                }
+            });
     }
     enum TaskOperation implements Serializable {
         START,
         PAUSE,
         RELOAD,
         SHUTDOWN,
+        JOB,
         EXECUTE,
         ENABLE_DEBUG,
         DISABLE_DEBUG
@@ -330,45 +346,4 @@ public class ClusterUniqueTask implements Task ,Serializable{
             return new OperationMessage(operation,null);
         }
     }
-
-    private Flux<ClusterTopic.TopicMessage> getPingMessage(){
-        return listenPing.map(Function.identity());
-    }
-
-    private Mono<Void> handlePingMessage(){
-        return getPingMessage()
-            .flatMap(obj->{
-                isDead=false;
-                Object message = obj.getMessage();
-                if (message instanceof ClusterUniqueTask) {
-                    ClusterUniqueTask task = (ClusterUniqueTask) message;
-                    //心跳信息
-                    return Mono.just(task);
-                } else if (message instanceof TaskOperation) {
-                    //操作信息
-                    TaskOperation task = (TaskOperation) message;
-                    return Mono.just(task);
-                }
-                return Mono.empty();
-            })
-            .flatMap(msg -> {
-                if (msg instanceof ClusterUniqueTask) {
-                    //同步心跳信息
-                    ClusterUniqueTask task = (ClusterUniqueTask) msg;
-                    this.workerId = task.currentSeverId;
-                    this.taskState = task.taskState;
-                    this.lastStateTime = task.lastStateTime;
-                    this.startTime = task.startTime;
-                    return Mono.empty();
-                } else if (msg instanceof OperationMessage) {
-                    //同步操作信息
-                    OperationMessage message = (OperationMessage) msg;
-                    if(!isDead){
-                        //任务存活
-                        return operation(message);
-                    }
-                }
-                return Mono.empty();
-            }).then();
-    }
 }