Forráskód Böngészése

add 规则引擎集群

18339543638 4 éve
szülő
commit
d180550b8a

+ 65 - 67
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/cluster/ClusterUniqueTask.java

@@ -39,6 +39,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @Slf4j
 class ClusterUniqueTask implements Task ,Serializable{
 
+    private String id;
+
     private static final long serialVersionUID=1L;
     /**
      * 工作服务器节点id
@@ -69,7 +71,7 @@ class ClusterUniqueTask implements Task ,Serializable{
     /**
      * 默认心跳时间10s
      */
-    private int pingTime=10;
+    private static final int pingTime=10;
 
     /**
      * 操作主题
@@ -83,9 +85,9 @@ class ClusterUniqueTask implements Task ,Serializable{
     /**
      * 心跳,心跳包传递任务状态,集群实时更新任务状态
      */
-    private transient String pingTopic ="cluster-unique-task-ping-%s";
+    private transient String pingTopic ="task-ping-%s";
 
-    private transient String operationTopic ="cluster-unique-task-operation-%s";
+    private transient String operationTopic ="task-operation-%s";
 
     private transient RLock lock;
 
@@ -117,9 +119,10 @@ class ClusterUniqueTask implements Task ,Serializable{
         this.executor = executor;
         this.currentSeverId=currentSeverId;
         this.workerId=currentSeverId;
+        this.id=context.getInstanceId()+"-"+context.getJob().getNodeId();
         this.clusterManager=clusterManager;
-        this.pingTopic=String.format(this.pingTopic,context.getInstanceId());
-        this.operationTopic=String.format(this.operationTopic,context.getInstanceId());
+        this.pingTopic=String.format(this.pingTopic,this.id);
+        this.operationTopic=String.format(this.operationTopic,this.id);
         this.lock = redissonClient.getLock("cluster-unique-"+this.getId());
         this.redissonClient=redissonClient;
         //先创建本地任务,再争夺任务的唯一锁,避免消息漏发
@@ -127,14 +130,9 @@ class ClusterUniqueTask implements Task ,Serializable{
             .subscribe();
     }
 
-    @Override
-    public String getId(){
-        return this.getContext().getInstanceId();
-    }
-
     @Override
     public String getName() {
-        return executor.getName();
+        return this.executor.getName();
     }
 
     @Override
@@ -144,12 +142,13 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> setJob(ScheduleJob job) {
-        return operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),currentSeverId));
+        return operation(OperationMessage.of(TaskOperation.JOB,Collections.singletonList(job),this.currentSeverId));
     }
 
 
 
     private Mono<Void> init(){
+
         return generatePingMsg()
             .mergeWith(this.handlePingMsg())
             .then();
@@ -163,18 +162,18 @@ class ClusterUniqueTask implements Task ,Serializable{
         return Flux.interval(Duration.ofSeconds(pingTime/2))
             .filter(ignore->{
                 if (isReplica()) {
-                    heldLockThread.execute(()->{
-                        if (lock.isHeldByThread(Thread.currentThread().getId())) {
+                    this.heldLockThread.execute(()->{
+                        if (this.lock.isHeldByThread(Thread.currentThread().getId())) {
                             //非工作节点且占据锁,则解锁
-                            lock.unlock();
-                        }else if(!isAlive.get()&&!lock.isLocked()){
+                            this.lock.unlock();
+                        }else if(!this.isAlive.get()&&!this.lock.isLocked()){
                             //争夺锁
                             try {
                                 //获取锁
-                                if (lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
+                                if (this.lock.tryLock(-1, pingTime, TimeUnit.SECONDS)) {
                                     //获取锁成功后改写工作节点id
                                     this.workerId=this.getCurrentSeverId();
-                                    isAlive.set(true);
+                                    this.isAlive.set(true);
                                 }else {
                                     //等待心跳传递过来
                                     this.workerId=null;
@@ -190,13 +189,15 @@ class ClusterUniqueTask implements Task ,Serializable{
             .doOnNext(ignore->{
                 //锁续约
                 AtomicBoolean finished= new AtomicBoolean(false);
-                heldLockThread.execute(()->{
+                this.heldLockThread.execute(()->{
                     try {
-                        if (lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
+                        if (this.lock.tryLock(-1,pingTime/2, TimeUnit.SECONDS)) {
                             this.workerId=this.getCurrentSeverId();
                         }else {
                             this.workerId=null;
-                            generatePingMsgDisposable.dispose();
+                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+                                this.generatePingMsgDisposable.dispose();
+                            }
                         }
 
                     } catch (InterruptedException e) {
@@ -207,14 +208,14 @@ class ClusterUniqueTask implements Task ,Serializable{
                 while (!finished.get()){
                 }
                 if(!isReplica()){
-                    if(State.running.equals(taskState)&&!State.running.equals(executor.getState())){
-                        executor.start();
+                    if(State.running.equals(this.taskState)&&!State.running.equals(this.executor.getState())){
+                        this.executor.start();
                     }
-                    if(handleDisposable!=null&&!handleDisposable.isDisposed()){
-                        handleDisposable.dispose();
+                    if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
+                        this.handleDisposable.dispose();
                     }
-                    if(generatePingMsgDisposable==null||generatePingMsgDisposable.isDisposed()){
-                        generatePingMsgDisposable = clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
+                    if(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()){
+                        this.generatePingMsgDisposable = this.clusterManager.getTopic(pingTopic).publish(Mono.just(this)).subscribe();
                     }
                 }
 
@@ -229,40 +230,37 @@ class ClusterUniqueTask implements Task ,Serializable{
      */
     private Mono<Void> handlePingMsg(){
         return Flux.interval(Duration.ofSeconds(pingTime/3))
-            .filter(ignore-> (handleDisposable ==null|| handleDisposable.isDisposed())&&(generatePingMsgDisposable==null||generatePingMsgDisposable.isDisposed()))
+            .filter(ignore-> (this.handleDisposable ==null|| this.handleDisposable.isDisposed())&&(this.generatePingMsgDisposable==null||this.generatePingMsgDisposable.isDisposed()))
             .doOnNext(ignore->{
-                    if(generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed()){
-                        if(handleDisposable!=null&&!handleDisposable.isDisposed()){
-                            handleDisposable.isDisposed();
+                    if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+                        if(this.handleDisposable!=null&&!this.handleDisposable.isDisposed()){
+                            this.handleDisposable.isDisposed();
                         }
                         return;
                     }
-                    handleDisposable = clusterManager.getTopic(pingTopic)
+                this.handleDisposable = this.clusterManager.getTopic(this.pingTopic)
                         .subscribePattern()
-                        .mergeWith(clusterManager.getTopic(operationTopic).subscribePattern())
-                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> {
-                            isAlive.set(false);
-                            workerId=null;
-                        }))
+                        .mergeWith(this.clusterManager.getTopic(this.operationTopic).subscribePattern())
+                        .timeout(Duration.ofSeconds(Math.multiplyExact(pingTime,2)), Mono.fromRunnable(() -> this.isAlive.set(false)))
                         .doOnNext(__->{
-                            if(executor.getState().equals(State.running)){
+                            if(isReplica()&&this.executor.getState().equals(State.running)){
                                 this.taskState=State.running;
-                                executor.pause();
+                                this.executor.pause();
                             }
                         })
                         .publishOn(Schedulers.boundedElastic())
                         .flatMap(obj -> {
-                            if(generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed()){
-                                generatePingMsgDisposable.dispose();
+                            if(this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed()){
+                                this.generatePingMsgDisposable.dispose();
                             }
                             Object message = obj.getMessage();
-                            isAlive.set(true);
+                            this.isAlive.set(true);
                             if (message instanceof ClusterUniqueTask) {
                                 //心跳信息
                                 ClusterUniqueTask task = (ClusterUniqueTask) message;
                                 if(task.getCurrentSeverId().equals(this.workerId)){
                                     //本地节点关闭
-                                    handleDisposable.dispose();
+                                    this.handleDisposable.dispose();
                                     return Mono.empty();
                                 }
                                 this.workerId = task.currentSeverId;
@@ -273,7 +271,7 @@ class ClusterUniqueTask implements Task ,Serializable{
                                 //操作信息
                                 OperationMessage operationMessage = (OperationMessage) message;
                                 if(operationMessage.fromServerId.equals(this.workerId)){
-                                    handleDisposable.dispose();
+                                    this.handleDisposable.dispose();
                                     return Mono.empty();
                                 }
                                 //任务存活
@@ -293,22 +291,22 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> start() {
-        return operation(OperationMessage.of(TaskOperation.START,currentSeverId));
+        return operation(OperationMessage.of(TaskOperation.START,this.currentSeverId));
     }
 
     @Override
     public Mono<Void> pause() {
-        return operation(OperationMessage.of(TaskOperation.PAUSE,currentSeverId));
+        return operation(OperationMessage.of(TaskOperation.PAUSE,this.currentSeverId));
     }
 
     @Override
     public Mono<Void> shutdown() {
-        return operation(OperationMessage.of(TaskOperation.SHUTDOWN,currentSeverId));
+        return operation(OperationMessage.of(TaskOperation.SHUTDOWN,this.currentSeverId));
     }
 
     @Override
     public Mono<Void> execute(RuleData data) {
-        return operation(OperationMessage.of(TaskOperation.EXECUTE,Collections.singletonList(data),currentSeverId));
+        return operation(OperationMessage.of(TaskOperation.EXECUTE,Collections.singletonList(data),this.currentSeverId));
     }
 
     @Override
@@ -322,67 +320,67 @@ class ClusterUniqueTask implements Task ,Serializable{
 
     @Override
     public Mono<Void> debug(boolean debug) {
-        return operation(debug ?OperationMessage.of(TaskOperation.ENABLE_DEBUG,currentSeverId)  :OperationMessage.of(TaskOperation.DISABLE_DEBUG,currentSeverId));
+        return operation(debug ?OperationMessage.of(TaskOperation.ENABLE_DEBUG,this.currentSeverId)  :OperationMessage.of(TaskOperation.DISABLE_DEBUG,this.currentSeverId));
     }
 
     @Override
     public Mono<Long> getLastStateTime() {
-        return Mono.just(lastStateTime);
+        return Mono.just(this.lastStateTime);
     }
 
     @Override
     public Mono<Long> getStartTime() {
-        return Mono.just(startTime);
+        return Mono.just(this.startTime);
     }
 
     private Mono<Void> operation(OperationMessage message) {
         TaskOperation operation=message.operation;
         this.taskState=operation.getState();
         if(isReplica()&&
-            (generatePingMsgDisposable!=null&&!generatePingMsgDisposable.isDisposed())){
+            (this.generatePingMsgDisposable!=null&&!this.generatePingMsgDisposable.isDisposed())){
             //当前为任务副本,传递给任务执行者
-            return  clusterManager.getTopic(operationTopic)
+            return  this.clusterManager.getTopic(this.operationTopic)
                 .publish(Mono.just(message))
                 .then();
         }
         long currentTimeMillis = System.currentTimeMillis();
-        lastStateTime=currentTimeMillis;
+        this.lastStateTime=currentTimeMillis;
         //该任务不是其他任务的副本,即运行实例位于该机器中,改变状态后进行广播操作
         switch (operation){
             case JOB:
                 return Mono.fromRunnable(() -> {
                     ScheduleJob job = (ScheduleJob) message.getParams().get(0);
-                    ScheduleJob old = context.getJob();
-                    context.setJob(job);
+                    ScheduleJob old = this.context.getJob();
+                    this.context.setJob(job);
                     try {
-                        executor.validate();
+                        this.executor.validate();
                     } catch (Throwable e) {
-                        context.setJob(old);
+                        this.context.setJob(old);
                         throw e;
                     }
                 });
             case START:
-                return Mono.<Void>fromRunnable(executor::start)
-                    .doOnSuccess((v) -> startTime =currentTimeMillis)
+                return Mono.<Void>fromRunnable(this.executor::start)
+                    .doOnSuccess((v) ->this. startTime =currentTimeMillis)
                     .subscribeOn(Schedulers.boundedElastic());
             case PAUSE:
-                return Mono.fromRunnable(executor::pause);
+                return Mono.fromRunnable(this.executor::pause);
             case RELOAD:
-                return Mono.<Void>fromRunnable(executor::reload)
+                return Mono.<Void>fromRunnable(this.executor::reload)
                     .subscribeOn(Schedulers.boundedElastic());
             case SHUTDOWN:
                 this.taskState=State.shutdown;
                 //解锁
-                heldLockThread.execute(()->lock.unlock());
+                this.heldLockThread.execute(()->this.lock.unlock());
                 break;
             case ENABLE_DEBUG:
-                return Mono.fromRunnable(() -> context.setDebug(true));
+                return Mono.fromRunnable(() ->this. context.setDebug(true));
             case DISABLE_DEBUG:
-                return Mono.fromRunnable(() -> context.setDebug(false));
+                return Mono.fromRunnable(() -> this.context.setDebug(false));
             case EXECUTE:
-                if(executor instanceof ExecutableTaskExecutor){
+                if(this.executor instanceof ExecutableTaskExecutor){
                     RuleData data = (RuleData) message.getParams().get(0);
-                    return ((ExecutableTaskExecutor) executor).execute(data);
+                    return ((ExecutableTaskExecutor) this.executor).execute(data);
                 }
                 return Mono.empty();
 

+ 2 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/RuleSceneTaskExecutorProvider.java

@@ -154,7 +154,7 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
             Flux<Map<String, Object>> result =null;
             List<Object> binds = new ArrayList<>();
             for (RuleSceneEntity.Trigger triggerMix : rule.getTriggers()) {
-            RuleSceneEntity.DeviceTrigger trigger = rule.getTriggers().get(0).getDevice();
+            RuleSceneEntity.DeviceTrigger trigger = triggerMix.getDevice();
             String topic = trigger.getType().getTopic(trigger.getProductId(), trigger.getDeviceId(), trigger.getModelId());
             binds.addAll(trigger.toFilterBinds());
             //订阅主题
@@ -202,7 +202,7 @@ public class RuleSceneTaskExecutorProvider implements TaskExecutorProvider {
                             return tp2.getT2();
                         }));
             }
-                result=result==null?resultFlux:result.concatWith(resultFlux);
+                result=result==null?resultFlux:result.mergeWith(resultFlux);
             }
             return result;
         }