18339543638 пре 4 година
родитељ
комит
7136805ad8
19 измењених фајлова са 273 додато и 47 уклоњено
  1. 34 35
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/auth/MqttDefaultAuth.java
  2. 1 1
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java
  3. 6 0
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendProtocolSupportProvider.java
  4. 4 2
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetlinksExtendTopicMessageCodec.java
  5. 11 0
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/gb28181/GB28181MessageCodec.java
  6. 20 0
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/gb28181/GB28181ProtocolSupportProvider.java
  7. 2 0
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java
  8. 23 0
      jetlinks-core/src/main/java/org/jetlinks/core/message/CoordinateMessage.java
  9. 3 0
      jetlinks-core/src/main/java/org/jetlinks/core/message/MessageType.java
  10. 1 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java
  11. 11 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java
  12. 2 0
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java
  13. 21 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java
  14. 0 1
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java
  15. 1 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/RuleSceneEntity.java
  16. 22 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleSceneService.java
  17. 1 2
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java
  18. 19 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/message/ClusterMessage.java
  19. 91 0
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ruleEngine/ClusterRuleEngine.java

+ 34 - 35
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/auth/MqttDefaultAuth.java

@@ -2,16 +2,18 @@ package org.jetlinks.community.network.mqtt.auth;
 
 import cn.hutool.core.util.StrUtil;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.Value;
-import org.jetlinks.core.device.AuthenticationRequest;
-import org.jetlinks.core.device.AuthenticationResponse;
-import org.jetlinks.core.device.DeviceOperator;
-import org.jetlinks.core.device.MqttAuthenticationRequest;
+import org.jetlinks.core.device.*;
+import org.jetlinks.core.exception.DeviceOperationException;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.Transport;
 import org.jetlinks.supports.official.JetLinksAuthenticator;
 import reactor.core.publisher.Mono;
-
 import javax.annotation.Nonnull;
-import java.util.concurrent.TimeUnit;
+
+import static org.jetlinks.core.enums.ErrorCode.UNSUPPORTED_MESSAGE;
 
 /**
  * @author lifang
@@ -25,40 +27,37 @@ public class MqttDefaultAuth extends JetLinksAuthenticator {
     public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) {
         if (request instanceof MqttAuthenticationRequest) {
             MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request);
-            // secureId|timestamp
+            Transport transport = request.getTransport();
+            if(!transport.equals(DefaultTransport.MQTT) &&!transport.equals(DefaultTransport.MQTT_TLS)){
+                //非mqtt协议
+                return Mono.just(AuthenticationResponse.error(400, "设备不支持的连接协议类型:" + transport.getName()));
+            }
             String username = mqtt.getUsername();
-            // md5(secureId|timestamp|secureKey)
             String password = mqtt.getPassword();
-            String requestSecureId;
+
             try {
-//                String[] arr = username.split("[|]");
-//                if (arr.length <= 1) {
-//                    return Mono.just(AuthenticationResponse.error(401, "用户名格式错误"));
-//                }
-//                requestSecureId = arr[0];
-//                long time = Long.parseLong(arr[1]);
-                //和设备时间差大于5分钟则认为无效
-//                if (Math.abs(System.currentTimeMillis() - time) > TimeUnit.MINUTES.toMillis(5)) {
-//                    return Mono.just(AuthenticationResponse.error(401, "设备时间不同步"));
-//                }
                 return deviceOperation
-                    .getConfigs("secureId", "secureKey")
-                    .map(conf -> {
-                        String secureId = conf.getValue("secureId").map(Value::asString).orElse(null);
+                    .getProduct()
+                    .flatMap(deviceProductOperator -> deviceProductOperator.getProtocol()
+                        .flatMap(protocolSupport -> protocolSupport.authenticate(request,deviceOperation)))
+                    .switchIfEmpty(Mono.error(()->new DeviceOperationException(UNSUPPORTED_MESSAGE)))
+                    .flatMap(ignore->deviceOperation
+                        .getConfigs("secureId", "secureKey")
+                        .map(conf -> {
+                            String secureId = conf.getValue("secureId").map(Value::asString).orElse(null);
 
-                        String secureKey = conf
-                            .getValue("secureKey")
-                            .map(Value::asString)
-                            .orElse(null);
-                        //签名
-                        String digest = DigestUtils.md5Hex(username + "|" + secureKey);
-                        if ((StrUtil.isEmpty(secureId)||username.equals(secureId))
-                            && (StrUtil.isEmpty(secureKey)||password.equals(secureKey))) {
-                            return AuthenticationResponse.success(deviceOperation.getDeviceId());
-                        } else {
-                            return AuthenticationResponse.error(401, "密钥错误");
-                        }
-                    });
+                            String secureKey = conf
+                                .getValue("secureKey")
+                                .map(Value::asString)
+                                .orElse(null);
+                            //签名
+                            if ((StrUtil.isEmpty(secureId)||username.equals(secureId))
+                                && (StrUtil.isEmpty(secureKey)||password.equals(secureKey))) {
+                                return AuthenticationResponse.success(deviceOperation.getDeviceId());
+                            } else {
+                                return AuthenticationResponse.error(401, "密钥错误");
+                            }
+                        }));
             } catch (NumberFormatException e) {
                 return Mono.just(AuthenticationResponse.error(401, "用户名格式错误"));
             }

+ 1 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java

@@ -96,7 +96,7 @@ public class MqttClientDeviceGateway implements DeviceGateway {
         disposable
             .add(mqttClient
                      .subscribe(topics)
-//                     .filter((msg) -> started.get())
+                     .filter((msg) -> started.get())
                      .flatMap(mqttMessage -> {
                          AtomicReference<Duration> timeoutRef = new AtomicReference<>();
                          return this

+ 6 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetLinksExtendProtocolSupportProvider.java

@@ -69,8 +69,14 @@ public class JetLinksExtendProtocolSupportProvider implements ProtocolSupportPro
 
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.MQTT));
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
+
             support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.INNER));
 
+//            support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.TCP));
+//            support.addMessageCodecSupport(new JetLinksExtendMqttDeviceMessageCodec(DefaultTransport.TCP_TLS));
+
+
+
             support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
             support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec());
 

+ 4 - 2
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/JetlinksExtendTopicMessageCodec.java

@@ -44,7 +44,7 @@ public class JetlinksExtendTopicMessageCodec {
         private boolean reportFirmware;
         private boolean upgradeFirmwareProgress;
         private boolean readFirmwareReply;
-
+        private boolean coordinate;
         private boolean timeSync;
         private boolean timeSyncReply;
         private boolean direct;
@@ -77,7 +77,7 @@ public class JetlinksExtendTopicMessageCodec {
             } else if (readFirmwareReply = topic.endsWith("firmware/read/reply")) {
             } else if (derivedMetadata = topic.endsWith("metadata/derived")) {
             } else if(  tag= topic.endsWith("/tags")){
-            }
+            }else if(coordinate=topic.endsWith("/coordinate")){}
             if(timeSync=topic.endsWith("/time-sync")){
 
             }else if(direct=topic.endsWith("/direct")){
@@ -220,6 +220,8 @@ public class JetlinksExtendTopicMessageCodec {
             message=object.toJavaObject(LogMessage.class);
         }else if(result.isTag()){
             message=object.toJavaObject(UpdateTagMessage.class);
+        }else if(result.isCoordinate()){
+            message=object.toJavaObject(CoordinateMessage.class);
         }
 
         if (result.isChild()) {

+ 11 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/gb28181/GB28181MessageCodec.java

@@ -0,0 +1,11 @@
+package org.jetlinks.community.support.gb28181;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName GB28181MessageCodec.java
+ * @Description TODO
+ * @createTime 2021年11月10日 14:29:00
+ */
+public class GB28181MessageCodec {
+}

+ 20 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/gb28181/GB28181ProtocolSupportProvider.java

@@ -0,0 +1,20 @@
+package org.jetlinks.community.support.gb28181;
+
+import org.jetlinks.core.ProtocolSupport;
+import org.jetlinks.core.spi.ProtocolSupportProvider;
+import org.jetlinks.core.spi.ServiceContext;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName GB28181ProtpcplSupportProvider.java
+ * @Description TODO
+ * @createTime 2021年11月10日 14:30:00
+ */
+public class GB28181ProtocolSupportProvider implements ProtocolSupportProvider {
+    @Override
+    public Mono<? extends ProtocolSupport> create(ServiceContext context) {
+        return null;
+    }
+}

+ 2 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java

@@ -7,6 +7,7 @@ import org.jetlinks.rule.engine.api.scheduler.Scheduler;
 import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
 import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
 import org.jetlinks.rule.engine.api.worker.Worker;
+import org.jetlinks.rule.engine.cluster.ClusterRuleEngine;
 import org.jetlinks.rule.engine.condition.ConditionEvaluatorStrategy;
 import org.jetlinks.rule.engine.condition.DefaultConditionEvaluator;
 import org.jetlinks.rule.engine.condition.supports.DefaultScriptEvaluator;
@@ -95,6 +96,7 @@ public class RuleEngineConfiguration {
 
     @Bean
     public RuleEngine defaultRuleEngine(Scheduler scheduler) {
+//        ClusterRuleEngine clusterRuleEngine = new ClusterRuleEngine();
         return new DefaultRuleEngine(scheduler);
     }
 

+ 23 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/CoordinateMessage.java

@@ -0,0 +1,23 @@
+package org.jetlinks.core.message;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName CoordinateMessage.java
+ * @Description TODO
+ * @createTime 2021年11月09日 15:50:00
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class CoordinateMessage extends CommonDeviceMessage{
+
+    private double lon;
+    private double lat;
+    @Override
+    public MessageType getMessageType() {
+        return MessageType.COORDINATE;
+    }
+}

+ 3 - 0
jetlinks-core/src/main/java/org/jetlinks/core/message/MessageType.java

@@ -71,6 +71,9 @@ public enum MessageType {
     //断开回复
     DISCONNECT_REPLY(DisconnectDeviceMessageReply::new),
 
+    //地理坐标
+    COORDINATE(CoordinateMessage::new),
+
     //派生属性
     DERIVED_METADATA(DerivedMetadataMessage::new),
 

+ 1 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java

@@ -69,7 +69,7 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
     private String productName;
 
 
-    //todo
+
     @Comment("地理位置")
     @Column(name = "location")
     @JsonCodec

+ 11 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java

@@ -30,7 +30,10 @@ public enum DeviceLogType implements EnumDict<String> {
     tag("标签更新"),
     offline("离线"),
     online("上线"),
-    other("其它");
+    other("其它"),
+    readFirmware("读取固件"),
+    upgradeFirmware("更新固件"),
+    coordinate("读取坐标");
 
 
     @JSONField(serialize = false)
@@ -66,6 +69,13 @@ public enum DeviceLogType implements EnumDict<String> {
         typeMapping.put(MessageType.REGISTER, register);
         typeMapping.put(MessageType.UN_REGISTER, unregister);
 
+        typeMapping.put(MessageType.READ_FIRMWARE, readFirmware);
+        typeMapping.put(MessageType.UPGRADE_FIRMWARE, upgradeFirmware);
+
+        typeMapping.put(MessageType.COORDINATE, coordinate);
+
+        ;
+
     }
 
     public static DeviceLogType of(DeviceMessage message) {

+ 2 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -100,6 +100,8 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
         createFastBuilder(MessageType.DISCONNECT, "/disconnect");
         //断开连接回复
         createFastBuilder(MessageType.DISCONNECT_REPLY, "/disconnect/reply");
+        //坐标
+        createFastBuilder(MessageType.COORDINATE, "/coordinate");
         //子设备消息
         createFastBuilder(MessageType.CHILD, (message, builder) -> {
             Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage();

+ 21 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -11,12 +11,16 @@ import org.hswebframework.ezorm.rdb.operator.dml.Terms;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.community.device.configuration.GeoIndexProvider;
+import org.jetlinks.community.device.dto.DeviceGeoDto;
 import org.jetlinks.community.device.entity.*;
 import org.jetlinks.community.device.enums.DeviceFeature;
 import org.jetlinks.community.device.enums.DeviceState;
 import org.jetlinks.community.device.response.DeviceDeployResult;
 import org.jetlinks.community.device.response.DeviceDetail;
 import org.jetlinks.community.device.session.MockSession;
+import org.jetlinks.community.elastic.search.service.ElasticSearchService;
+import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.utils.ErrorUtils;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
@@ -24,6 +28,7 @@ import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.enums.ErrorCode;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.exception.DeviceOperationException;
+import org.jetlinks.core.message.CoordinateMessage;
 import org.jetlinks.core.message.DeviceMessageReply;
 import org.jetlinks.core.message.FunctionInvokeMessageSender;
 import org.jetlinks.core.message.WritePropertyMessageSender;
@@ -32,6 +37,7 @@ import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.ReadPropertyMessageReply;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
 import org.jetlinks.core.metadata.*;
+import org.jetlinks.core.metadata.types.GeoPoint;
 import org.jetlinks.core.metadata.types.StringType;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
@@ -66,8 +72,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
 
     private final DeviceSessionManager deviceSessionManager;
 
-
-
+    private final ElasticSearchService elasticSearchService;
     @Override
     public Mono<SaveResult> save(Publisher<DeviceInstanceEntity> entityPublisher) {
         return Flux.from(entityPublisher)
@@ -470,4 +475,18 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
            .then();
     }
 
+    @Subscribe("/device/**/**/coordinate")
+    public Mono<Void> handleCoordinate(CoordinateMessage message){
+        GeoPoint geoPoint = new GeoPoint(message.getLon(), message.getLat());
+        return this.createUpdate()
+            .where(DeviceInstanceEntity::getId,message.getDeviceId())
+            .set(DeviceInstanceEntity::getLocation,geoPoint)
+            .execute()
+            .flatMap(ignore->
+                this.findById(message.getDeviceId())
+                    .map(DeviceGeoDto::of)
+                    .flatMap(geo-> elasticSearchService.save(GeoIndexProvider.GEO.getIndex(),geo))
+            )
+            .then();
+    }
 }

+ 0 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java

@@ -39,7 +39,6 @@ import org.jetlinks.community.device.web.excel.DeviceExcelInfo;
 import org.jetlinks.community.device.web.excel.DeviceWrapper;
 import org.jetlinks.community.device.web.request.AggRequest;
 import org.jetlinks.community.device.web.request.GeoRequestParam;
-import org.jetlinks.community.elastic.search.geo.EsGeoFilter;
 import org.jetlinks.community.elastic.search.service.ElasticSearchService;
 import org.jetlinks.community.io.excel.ImportExportService;
 import org.jetlinks.community.io.utils.FileUtils;

+ 1 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/RuleSceneEntity.java

@@ -35,7 +35,7 @@ import java.util.*;
  * @author lifang
  * @version 1.0.0
  * @ClassName RuleScenceEntity.java
- * @Description TODO
+ * @Description 场景联动
  * @createTime 2021年08月31日 14:59:00
  */
 @Getter

+ 22 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleSceneService.java

@@ -1,13 +1,18 @@
 package org.jetlinks.community.rule.engine.service;
 
 import lombok.AllArgsConstructor;
+import lombok.extern.java.Log;
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.community.rule.engine.entity.DeviceAlarmEntity;
+import org.jetlinks.community.rule.engine.entity.RuleInstanceEntity;
 import org.jetlinks.community.rule.engine.entity.RuleSceneEntity;
 import org.jetlinks.community.rule.engine.enums.AlarmState;
+import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
 import org.jetlinks.community.rule.engine.enums.RuleSceneState;
 import org.jetlinks.rule.engine.api.RuleData;
 import org.reactivestreams.Publisher;
+import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -25,7 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 @Service
 @AllArgsConstructor
-public class RuleSceneService extends GenericReactiveCrudService<RuleSceneEntity, String> {
+@Slf4j
+public class RuleSceneService extends GenericReactiveCrudService<RuleSceneEntity, String> implements CommandLineRunner {
     private final RuleInstanceService instanceService;
 
 
@@ -66,4 +72,19 @@ public class RuleSceneService extends GenericReactiveCrudService<RuleSceneEntity
     public Mono<Void> execute(String id) {
         return instanceService.execute(id, RuleData.create(null));
     }
+
+    @Override
+    public void run(String... args) {
+        createQuery()
+            .where()
+            .is(RuleSceneEntity::getState, RuleSceneState.started)
+            .fetch()
+            .flatMap(e -> this
+                .doStart(e)
+                .onErrorResume(err -> {
+                    log.warn("启动规则[{}]失败", e.getName(), e);
+                    return Mono.empty();
+                }))
+            .subscribe();
+    }
 }

+ 1 - 2
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ClusterDeviceMessageConnector.java

@@ -26,7 +26,6 @@ import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * @author lifang
@@ -142,7 +141,7 @@ public class ClusterDeviceMessageConnector extends DeviceMessageConnector {
         }else {
             return  device.getConnectionServerId()
                 .flatMap(serverId->{
-                    ClusterMessage acceptMessage = messageServerMap.get(message.getMessageId());
+                    ClusterMessage acceptMessage = messageServerMap.remove(message.getMessageId());
                     if(acceptMessage==null){
                         return super.handleMessage(device, message);
                     }

+ 19 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/message/ClusterMessage.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.standalone.configuration.cluster.message;
 
+import cn.hutool.core.util.ObjectUtil;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import org.jetlinks.core.cluster.ClusterTopic;
@@ -28,6 +29,7 @@ public class ClusterMessage implements ClusterTopic.TopicMessage, Serializable {
 
     private long timestamp;
 
+    private ServiceMessage service;
     public ClusterMessage(Message payload, String fromServer, String address) {
         this.payload = payload;
         this.fromServer = fromServer;
@@ -35,6 +37,13 @@ public class ClusterMessage implements ClusterTopic.TopicMessage, Serializable {
         this.messageId = payload.getMessageId();
     }
 
+    public ClusterMessage(ServiceMessage service, String fromServer, String address) {
+        this.service = service;
+        this.fromServer = fromServer;
+        this.address = address;
+        this.messageId = payload.getMessageId();
+    }
+
     @Override
     public String getTopic() {
         return address;
@@ -44,4 +53,14 @@ public class ClusterMessage implements ClusterTopic.TopicMessage, Serializable {
     public Object getMessage() {
         return payload;
     }
+
+
+    @AllArgsConstructor(staticName = "of")
+    @Data
+    public static class ServiceMessage{
+        //任务消息
+        String id;
+        Object payload;
+        String type;
+    }
 }

+ 91 - 0
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/cluster/ruleEngine/ClusterRuleEngine.java

@@ -0,0 +1,91 @@
+package org.jetlinks.community.standalone.configuration.cluster.ruleEngine;
+
+import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
+import org.jetlinks.community.standalone.configuration.cluster.message.ClusterMessage;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.rule.engine.api.model.RuleModel;
+import org.jetlinks.rule.engine.api.scheduler.Scheduler;
+import org.jetlinks.rule.engine.api.task.Task;
+import org.jetlinks.rule.engine.defaults.DefaultRuleEngine;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName ClusterRuleEngine.java
+ * @Description TODO
+ * @createTime 2021年11月10日 16:10:00
+ */
+
+public class ClusterRuleEngine extends DefaultRuleEngine {
+    private ClusterManager clusterManager;
+    private volatile  boolean started=false;
+    private String serverId;
+    public ClusterRuleEngine(Scheduler scheduler) {
+        super(scheduler);
+    }
+
+
+    public synchronized void init(){
+        if(started){
+            return;
+        }
+        started=true;
+        //广播消息
+        clusterManager
+            .getTopic("")
+            .subscribePattern()
+            .publishOn(Schedulers.boundedElastic())
+            .flatMap(obj -> {
+                if(obj instanceof ClusterMessage){
+                    ClusterMessage clusterMessage = (ClusterMessage) obj;
+                    String fromServer = clusterMessage.getFromServer();
+                    if(this.serverId.equals(fromServer)){
+                        return Flux.empty();
+                    }
+                    ClusterMessage.ServiceMessage serviceMessage = clusterMessage.getService();
+                    String type = serviceMessage.getType();
+                    RuleInstanceState ruleInstanceState = RuleInstanceState.valueOf(type);
+                    return execute(ruleInstanceState,serviceMessage.getId(),serviceMessage.getPayload());
+                }
+                return Flux.empty();
+            }).subscribe();
+    }
+
+    private Mono<Void> execute(RuleInstanceState state, String id, Object payload){
+        RuleModel model= (RuleModel) payload;
+        switch (state){
+            case started:
+                return super.startRule(id,model).then();
+            case stopped:
+                return super.shutdown(id);
+            case disable:
+            default:return Mono.empty();
+        }
+    }
+
+    @Override
+    public Flux<Task> startRule(String instanceId, RuleModel model) {
+        ClusterMessage.ServiceMessage serviceMessage = ClusterMessage.ServiceMessage.of(instanceId, model, RuleInstanceState.started.getValue());
+        return super.startRule(instanceId, model)
+            .publishOn(Schedulers.parallel())
+            .doOnNext(ignore->clusterManager.getTopic("cluster-rule-engine")
+                .publish(Mono.just(
+                    new ClusterMessage(
+                        serviceMessage,this.serverId,null))).subscribe());
+    }
+
+    @Override
+    public Mono<Void> shutdown(String instanceId) {
+        ClusterMessage.ServiceMessage serviceMessage = ClusterMessage.ServiceMessage.of(instanceId, null, RuleInstanceState.stopped.getValue());
+        return super.shutdown(instanceId)
+            .publishOn(Schedulers.parallel())
+            .doOnNext(ignore->clusterManager.getTopic("cluster-rule-engine")
+                .publish(Mono.just(
+                    new ClusterMessage(
+                        serviceMessage,this.serverId,null))).subscribe());
+    }
+
+}