瀏覽代碼

add 规则引擎集群

18339543638 4 年之前
父節點
當前提交
186d342ead

+ 10 - 5
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -305,7 +305,13 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> reload() {
-        return operation(OperationMessage.of(TaskOperation.RELOAD,currentSeverId));
+        return operation(OperationMessage.of(TaskOperation.RELOAD,currentSeverId))
+            .concatWith(Mono.fromRunnable(
+                ()->
+                    this.clusterManager.getTopic(this.operationTopic)
+                        .publish(Mono.just(OperationMessage.of(TaskOperation.RELOAD,null,this.currentSeverId)))
+                        .then()))
+            .then();
     }
 
     @Override
@@ -355,10 +361,9 @@ class ClusterUniqueTask implements Task ,Serializable{
     private Mono<Void> operation(OperationMessage message) {
         TaskOperation operation=message.operation;
         this.taskState=State.unknown.equals(operation.getState())?this.taskState:operation.getState();
-        if(isReplica()&&
-            (this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed())){
-            //当前为任务副本不在此处进行广播操作
-            if(!TaskOperation.JOB.equals(message.operation)){
+        if(isReplica()){
+            //当前为任务副本或重新加载不在此处进行广播操作
+            if(!TaskOperation.JOB.equals(message.operation)&&!TaskOperation.RELOAD.equals(message.getOperation())){
                 return  this.clusterManager.getTopic(this.operationTopic)
                     .publish(Mono.just(message))
                     .then();