Ver código fonte

add 规则引擎集群

18339543638 4 anos atrás
pai
commit
bf39af87fa

+ 21 - 15
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -139,7 +139,7 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> setJob(ScheduleJob job) {
-        return operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job)));
+        return operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),currentSeverId));
     }
 
 
@@ -148,8 +148,12 @@ class ClusterUniqueTask implements Task ,Serializable{
         return generatePingMsg()
             .mergeWith(this.handlePingMsg())
             .then();
-
     }
+
+    /**
+     * 产生心跳信息
+     * @return
+     */
     private Mono<Void> generatePingMsg() {
         return Flux.interval(Duration.ofSeconds(pingTime/2))
             .filter(ignore->{
@@ -204,27 +208,27 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> reload() {
-        return operation(OperationMessage.of(TaskOperation.RELOAD));
+        return operation(OperationMessage.of(TaskOperation.RELOAD,currentSeverId));
     }
 
     @Override
     public Mono<Void> start() {
-        return operation(OperationMessage.of(TaskOperation.START));
+        return operation(OperationMessage.of(TaskOperation.START,currentSeverId));
     }
 
     @Override
     public Mono<Void> pause() {
-        return operation(OperationMessage.of(TaskOperation.PAUSE));
+        return operation(OperationMessage.of(TaskOperation.PAUSE,currentSeverId));
     }
 
     @Override
     public Mono<Void> shutdown() {
-        return operation(OperationMessage.of(TaskOperation.SHUTDOWN));
+        return operation(OperationMessage.of(TaskOperation.SHUTDOWN,currentSeverId));
     }
 
     @Override
     public Mono<Void> execute(RuleData data) {
-        return operation(OperationMessage.of(TaskOperation.EXECUTE,Collections.singletonList(data)));
+        return operation(OperationMessage.of(TaskOperation.EXECUTE,Collections.singletonList(data),currentSeverId));
     }
 
     @Override
@@ -238,7 +242,7 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> debug(boolean debug) {
-        return operation(debug ?OperationMessage.of(TaskOperation.ENABLE_DEBUG)  :OperationMessage.of(TaskOperation.DISABLE_DEBUG));
+        return operation(debug ?OperationMessage.of(TaskOperation.ENABLE_DEBUG,currentSeverId)  :OperationMessage.of(TaskOperation.DISABLE_DEBUG,currentSeverId));
     }
 
     @Override
@@ -315,12 +319,12 @@ class ClusterUniqueTask implements Task ,Serializable{
     }
 
     /**
-     * 抢夺失败时
+     * 处理消息
      *
      */
     private Mono<Void> handlePingMsg(){
         return Flux.interval(Duration.ofSeconds(pingTime/3))
-            .filter(ignore-> handleDisposable ==null|| handleDisposable.isDisposed())
+            .filter(ignore-> (handleDisposable ==null|| handleDisposable.isDisposed())&&(generatePingMsgDisposable==null||generatePingMsgDisposable.isDisposed()))
             .doOnNext(ignore->
                 handleDisposable = clusterManager.getTopic(pingTopic)
                     .subscribePattern()
@@ -388,18 +392,20 @@ class ClusterUniqueTask implements Task ,Serializable{
     public static class OperationMessage implements Serializable{
         private TaskOperation operation;
         private List<Object> params;
+        private String fromServerId;
 
-        private OperationMessage(TaskOperation operation, List<Object> params) {
+        private OperationMessage(TaskOperation operation, List<Object> params, String fromServerId) {
             this.operation = operation;
             this.params = params;
+            this.fromServerId=fromServerId;
         }
 
-        public static OperationMessage of(TaskOperation operation, List<Object> params) {
-            return new OperationMessage(operation,params);
+        public static OperationMessage of(TaskOperation operation, List<Object> params,String fromServerId) {
+            return new OperationMessage(operation,params,fromServerId);
         }
 
-        public static OperationMessage of(TaskOperation operation) {
-            return new OperationMessage(operation,null);
+        public static OperationMessage of(TaskOperation operation,String fromServerId) {
+            return new OperationMessage(operation,null,fromServerId);
         }
     }
 }