Forráskód Böngészése

fixed 集群网桥

18339543638 4 éve
szülő
commit
84118e9e46

+ 0 - 1
jetlinks-core/src/main/java/org/jetlinks/core/cluster/AbstractClusterUniqueTask.java

@@ -8,7 +8,6 @@ import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
-
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.concurrent.*;

+ 2 - 3
jetlinks-core/src/main/java/org/jetlinks/core/cluster/ServerNode.java

@@ -38,9 +38,8 @@ public class ServerNode implements Serializable {
 
     private final long startTime=System.currentTimeMillis();
 
-    public void setLastKeepAlive(long lastKeepAlive) {
-        this.lastKeepAlive = lastKeepAlive;
-        this.keepAliveTime=lastKeepAlive-startTime;
+    public long getKeepAliveTime() {
+        return System.currentTimeMillis()-startTime;
     }
 
     public boolean hasTag(String tag) {

+ 0 - 46
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/Channel.java

@@ -1,46 +0,0 @@
-package org.jetlinks.community.bridge.server;
-
-import org.jetlinks.community.bridge.message.AliBridgeMessage;
-import org.jetlinks.core.server.session.DeviceSession;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import java.util.function.Consumer;
-
-/**
- * @author lifang
- * @version 1.0.0
- * @ClassName Channel.java
- * @Description 网桥连接设备
- * @createTime 2021年11月30日 09:02:00
- */
-public interface Channel {
-
-
-    String getOriginalIdentity();
-
-    String getDeviceName();
-
-    String getProductKey();
-
-    void init();
-
-    /**
-     * 向云端推送设备
-     * 当设备连接到网桥服务器上时,设备向网桥服务器发送消息,则此时网桥服务器会通过网桥连接将消息推送至云端
-     * @param message 设备消息
-     * @return
-     * @see Mono <Void>
-     */
-    void upLink(AliBridgeMessage message);
-
-    /**
-     * 设备上线
-     * @return
-     */
-    void online();
-
-    void doOffline();
-
-    void close();
-}

+ 3 - 2
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -19,7 +19,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.*;
 import reactor.core.scheduler.Schedulers;
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -95,7 +94,9 @@ public class AliBridgeGateway{
                                 resource.getId(),broadcast);
                             return new BusinessException(error.getMessage());
                         }))
-            ).flatMap(server -> server.initBridge(resource,broadcast));
+            )
+            .flatMap(server -> server.initBridge(resource,broadcast))
+            .flatMap(AliBridgeServer::refreshAllChannel);
     }
 
 

+ 19 - 10
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -20,7 +20,6 @@ import org.jetlinks.community.bridge.core.DefaultUplinkChannelHandler;
 import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.enums.BridgeStatus;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
-import org.jetlinks.community.bridge.server.Channel;
 import org.jetlinks.core.cluster.AbstractClusterUniqueTask;
 import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.device.DeviceOperator;
@@ -30,6 +29,7 @@ import org.redisson.api.RedissonClient;
 import reactor.core.publisher.*;
 import javax.validation.constraints.NotNull;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 @Slf4j
 public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer> {
-    private transient Map<String, Channel> channelMap=new ConcurrentHashMap<>();
+    private transient Map<String, DefaultAliBridgeChannel> channelMap=new ConcurrentHashMap<>();
 
     private transient final EventBus eventBus;
 
@@ -81,7 +81,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
         this.bridgeGateway= SpringUtil.getBean(AliBridgeGateway.class);
     }
 
-    public Mono<Void> initBridge(AliIotBridgeEntity params,boolean broadcast){
+    public Mono<AliBridgeServer> initBridge(AliIotBridgeEntity params,boolean broadcast){
         this.params=params;
         this.status=params.getState();
         verify(params);
@@ -116,7 +116,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
             });
         }
         params.setDeviceName(bridgeConfigManager.getDeviceName());
-        return Mono.empty();
+        return Mono.just(this);
     }
 
     public Mono<AliBridgeServer> refreshBootStrap(AliIotBridgeEntity params,boolean broadcast){
@@ -150,6 +150,14 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
         bridgeConfigManager = DefaultBridgeConfigManager.of(params.getId(),productKey, null, null, http2Endpoint, authEndpoint, MapUtil.of(Pair.of(BridgeConfigConsts.BRIDGE_INSTANCEID_KEY,params.getAccessConfig().getIotInstanceId())), getPopClientProfile(accessKey, accessSecret, regionId));
     }
 
+    public Mono<Void> refreshAllChannel(){
+        Collection<DefaultAliBridgeChannel> channels = channelMap.values();
+        return Flux.fromStream(channels.stream())
+            .flatMap(channel->
+                register(this.getId(),channel.getOriginalIdentity(),channel.getProductKey(),channel.getDeviceName(),channel.getDeviceSecret(),false))
+            .then();
+    }
+
 
     private PopClientConfiguration getPopClientProfile(@NotNull String accessKey,@NotNull String accessSecret,@NotNull String endpoint){
         PopClientConfiguration popClientConfiguration = new PopClientConfiguration();
@@ -163,13 +171,14 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
     }
 
 
-    public Mono<Channel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret,
-                                  boolean broadcast) {
+    public Mono<DefaultAliBridgeChannel> register(@NotNull String bridgeId,@NotNull String originalIdentity,@NotNull String productKey,@NotNull String deviceName,@NotNull String deviceSecret,
+                                                  boolean broadcast) {
         //注册设备信息
         DefaultDeviceConfigManager.register(bridgeId,originalIdentity,productKey,deviceName,deviceSecret);
         DefaultUplinkChannelHandler uplinkChannelHandler = new DefaultUplinkChannelHandler(bridgeConfigManager, DefaultDeviceConfigManager.getInstance());
         channelMap
-            .putIfAbsent(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName, uplinkChannelHandler, deviceRegistry, eventBus));
+            .putIfAbsent(originalIdentity, new DefaultAliBridgeChannel(originalIdentity, productKey, deviceName,deviceSecret, uplinkChannelHandler, deviceRegistry, eventBus));
+
         if(broadcast){
             getClusterOperationTopic()
                 .publish(Mono.just(OperationMessage.builder()
@@ -184,7 +193,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
             return Mono.empty();
         }
         return Mono.just(channelMap.get(originalIdentity))
-            .doOnNext(Channel::init)
+            .doOnNext(DefaultAliBridgeChannel::init)
             .flatMap(channel ->
                 deviceRegistry.getDevice(channel.getOriginalIdentity())
                     .flatMap(DeviceOperator::isOnline)
@@ -209,7 +218,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
 
     public Mono<Void> unRegister(String originalIdentity,boolean broadcast) {
         return Mono.fromRunnable(()->{
-            Channel channel = channelMap.remove(originalIdentity);
+            DefaultAliBridgeChannel channel = channelMap.remove(originalIdentity);
             if(broadcast){
                 getClusterOperationTopic()
                     .publish(Mono.just(OperationMessage.builder()
@@ -251,7 +260,7 @@ public class AliBridgeServer extends AbstractClusterUniqueTask<AliBridgeServer>
             if(bootstrap!=null){
                 if(!isReplica()){
                     bootstrap.reconnectBridge();
-                    channelMap.values().forEach(Channel::online);
+                    channelMap.values().forEach(DefaultAliBridgeChannel::online);
                 }
             }
             changeStatus(BridgeStatus.running,broadcast);

+ 7 - 8
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/DefaultAliBridgeChannel.java

@@ -8,9 +8,7 @@ import org.jetlinks.community.bridge.core.AliBridgeCodec;
 import org.jetlinks.community.bridge.core.DefaultBridgeBootstrap;
 import org.jetlinks.community.bridge.core.DefaultUplinkChannelHandler;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
-import org.jetlinks.community.bridge.server.Channel;
 import org.jetlinks.core.device.DeviceOperator;
-import org.jetlinks.core.device.DeviceProductOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
@@ -29,7 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 @Slf4j
 @Data
-public class DefaultAliBridgeChannel implements Channel {
+public class DefaultAliBridgeChannel {
     private String originalIdentity;
 
     private DeviceRegistry deviceRegistry;
@@ -46,6 +44,7 @@ public class DefaultAliBridgeChannel implements Channel {
 
     private AtomicBoolean start=new AtomicBoolean(false);
 
+    private String deviceSecret;
     private  String productId="**";
     /**
      * @see DefaultBridgeBootstrap 网桥数据下行
@@ -54,6 +53,7 @@ public class DefaultAliBridgeChannel implements Channel {
     public DefaultAliBridgeChannel(String originalIdentity,
                                    String productKey,
                                    String deviceName,
+                                   String deviceSecret,
                                    DefaultUplinkChannelHandler uplinkChannelHandler,
                                    DeviceRegistry deviceRegistry,
                                    EventBus eventBus) {
@@ -63,9 +63,9 @@ public class DefaultAliBridgeChannel implements Channel {
         this.productKey=productKey;
         this.deviceName=deviceName;
         this.uplinkChannelHandler = uplinkChannelHandler;
+        this.deviceSecret=deviceSecret;
     }
 
-    @Override
     public void init(){
         if(start.get()){
             return;
@@ -116,7 +116,6 @@ public class DefaultAliBridgeChannel implements Channel {
     /**
      * 处理上行数据   自有平台-》阿里云
      */
-    @Override
     public void upLink(AliBridgeMessage msg){
         if(msg.isOnline()){
             uplinkChannelHandler.doOnline(Session.newInstance(this.originalIdentity,new Object()),this.originalIdentity);
@@ -127,18 +126,18 @@ public class DefaultAliBridgeChannel implements Channel {
         }
     }
 
-    @Override
+
     public void online() {
         uplinkChannelHandler.doOnline(Session.newInstance(this.originalIdentity,new Object()),this.originalIdentity);
     }
 
-    @Override
+
     public void doOffline(){
         uplinkChannelHandler.doOffline(this.originalIdentity);
         log.info("设备网桥{}下线",this.originalIdentity);
     }
 
-    @Override
+
     public void close(){
         disposable.dispose();
         doOffline();