|
|
@@ -1,5 +1,6 @@
|
|
|
package org.jetlinks.community.bridge.server.aliyun;
|
|
|
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.Data;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
@@ -31,6 +32,8 @@ import reactor.core.scheduler.Schedulers;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
import java.io.Serializable;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
|
|
|
/**
|
|
|
@@ -38,12 +41,13 @@ import java.util.Objects;
|
|
|
* @version 1.0.0
|
|
|
* @ClassName AliBridgeGateway.java
|
|
|
* @Description 阿里云网桥网关
|
|
|
+ * 处理网桥服务,解码/编码
|
|
|
* @createTime 2021年11月30日 08:35:00
|
|
|
*/
|
|
|
@Slf4j
|
|
|
@Service
|
|
|
public class AliBridgeGateway{
|
|
|
- private volatile AliBridgeServer aliBridgeServer;
|
|
|
+ private Map<String,AliBridgeServer> bridgeMap=new HashMap<>();
|
|
|
private final EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
|
|
|
|
|
|
private final FluxSink<Message> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
|
|
|
@@ -81,17 +85,17 @@ public class AliBridgeGateway{
|
|
|
MessageType messageType = message.getMessageType();
|
|
|
switch (messageType) {
|
|
|
case del:
|
|
|
- return delBridgeServer((String) message.getMsg());
|
|
|
+ return delBridgeServer(clusterId,message.getBridgeId());
|
|
|
case init:
|
|
|
return initBridge((AliIotBridgeEntity) message.getMsg());
|
|
|
case update:
|
|
|
- return replaceBridgeServer(clusterId, (AliIotBridgeEntity) message.getMsg());
|
|
|
+ return replaceBridgeServer(clusterId, message.getBridgeId(),(AliIotBridgeEntity) message.getMsg());
|
|
|
case register:
|
|
|
- return registerDevice(clusterId, (AliIotBridgeDeviceConfig) message.getMsg());
|
|
|
+ return registerDevice(clusterId, message.getBridgeId(),(AliIotBridgeDeviceConfig) message.getMsg());
|
|
|
case unregister:
|
|
|
- return unregisterDevice(clusterId, (String) message.getMsg());
|
|
|
+ return unregisterDevice(clusterId,message.getBridgeId(), (String) message.getMsg());
|
|
|
case restart:
|
|
|
- return reconnect(clusterId);
|
|
|
+ return reconnect(clusterId,message.getBridgeId());
|
|
|
default:
|
|
|
return Mono.empty();
|
|
|
}
|
|
|
@@ -101,20 +105,32 @@ public class AliBridgeGateway{
|
|
|
}
|
|
|
|
|
|
|
|
|
+ private AliBridgeServer getServer(String serverId){
|
|
|
+ return this.bridgeMap.get(serverId);
|
|
|
+ }
|
|
|
|
|
|
- public Mono<Void> reconnect(String serverId){
|
|
|
- return handleClusterOperation(serverId,MessageType.restart,serverId)
|
|
|
- .switchIfEmpty(aliBridgeServer.reconnect());
|
|
|
+ private AliBridgeServer putServer(String serverId,AliBridgeServer server){
|
|
|
+ return this.bridgeMap.put(serverId,server);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 重启网桥
|
|
|
+ * @param bridgeId 网桥id
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public Mono<Void> reconnect(String nodeId,String bridgeId){
|
|
|
+ return handleClusterOperation(nodeId,bridgeId,MessageType.restart,bridgeId)
|
|
|
+ .switchIfEmpty(getServer(bridgeId).reconnect());
|
|
|
}
|
|
|
/**
|
|
|
* 注册设备
|
|
|
- * @param serverId 节点id
|
|
|
+ * @param nodeId 节点id
|
|
|
+ * @param bridgeId 网桥id
|
|
|
* @param config 设备配置
|
|
|
* @return
|
|
|
*/
|
|
|
- public Mono<Void> registerDevice(String serverId, AliIotBridgeDeviceConfig config){
|
|
|
- return handleClusterOperation(serverId,MessageType.register,config)
|
|
|
- .switchIfEmpty(this.aliBridgeServer
|
|
|
+ public Mono<Void> registerDevice(String nodeId,String bridgeId,AliIotBridgeDeviceConfig config){
|
|
|
+ return handleClusterOperation(nodeId,bridgeId,MessageType.register,config)
|
|
|
+ .switchIfEmpty(getServer(bridgeId)
|
|
|
.register(config.getOriginalIdentity(),config.getProductKey(),config.getDeviceName(),config.getDeviceSecret()).then());
|
|
|
}
|
|
|
|
|
|
@@ -125,22 +141,22 @@ public class AliBridgeGateway{
|
|
|
* @param originalIdentity 设备id
|
|
|
* @return
|
|
|
*/
|
|
|
- public Mono<Void> unregisterDevice(String bridgeId,String originalIdentity){
|
|
|
- return handleClusterOperation(bridgeId,MessageType.unregister,originalIdentity)
|
|
|
- .switchIfEmpty(aliBridgeServer.unRegister(originalIdentity));
|
|
|
+ public Mono<Void> unregisterDevice(String nodeId,String bridgeId,String originalIdentity){
|
|
|
+ return handleClusterOperation(nodeId,bridgeId,MessageType.unregister,originalIdentity)
|
|
|
+ .switchIfEmpty(getServer(bridgeId).unRegister(originalIdentity));
|
|
|
}
|
|
|
|
|
|
public Mono<Void> initBridge(AliIotBridgeEntity bridgeEntity){
|
|
|
- return handleClusterOperation(bridgeEntity.getNodeId(),MessageType.init,bridgeEntity)
|
|
|
- .switchIfEmpty(Mono.justOrEmpty(aliBridgeServer)
|
|
|
+ return handleClusterOperation(bridgeEntity.getNodeId(),
|
|
|
+ bridgeEntity.getId(),MessageType.init,bridgeEntity)
|
|
|
+ .switchIfEmpty(Mono.justOrEmpty(getServer(bridgeEntity.getId()))
|
|
|
.then()
|
|
|
.switchIfEmpty(AliBridgeServer.create(eventBus,registry,bridgeEntity,clusterId)
|
|
|
.doOnNext(server -> {
|
|
|
- if(aliBridgeServer==null){
|
|
|
- aliBridgeServer=server;
|
|
|
- }else {
|
|
|
- server.stopBridge().subscribe();
|
|
|
- throw new BusinessException("网桥正在运行,请勿重复启动");
|
|
|
+ AliBridgeServer oldServer =
|
|
|
+ this.bridgeMap.putIfAbsent(server.getId(), server);
|
|
|
+ if(oldServer!=null){
|
|
|
+ oldServer.stopBridge().subscribe();
|
|
|
}
|
|
|
})
|
|
|
.doOnNext(server->{
|
|
|
@@ -153,26 +169,31 @@ public class AliBridgeGateway{
|
|
|
.then()
|
|
|
.onErrorResume(error->
|
|
|
Mono.error(()->{
|
|
|
- this.delBridgeServer(bridgeEntity.getNodeId());
|
|
|
+ this.delBridgeServer(bridgeEntity.getNodeId(),
|
|
|
+ bridgeEntity.getId());
|
|
|
return new BusinessException("网桥配置出错,请确定配置参数是否正确;或请勿重复启动网桥)");
|
|
|
})))
|
|
|
.then());
|
|
|
}
|
|
|
|
|
|
- public Mono<Void> delBridgeServer(String serverId){
|
|
|
- return handleClusterOperation(serverId,MessageType.del,serverId)
|
|
|
- .switchIfEmpty(Mono.justOrEmpty(aliBridgeServer)
|
|
|
- .filter(Objects::nonNull)
|
|
|
- .flatMap(AliBridgeServer::stopBridge));
|
|
|
- }
|
|
|
|
|
|
- public Mono<Void> replaceBridgeServer(String serverId,AliIotBridgeEntity bridge){
|
|
|
- return handleClusterOperation(serverId,MessageType.update,bridge)
|
|
|
- .switchIfEmpty(delBridgeServer(serverId)
|
|
|
+
|
|
|
+ public Mono<Void> replaceBridgeServer(String oldNodeId,String oldBridgeId,AliIotBridgeEntity bridge){
|
|
|
+ return handleClusterOperation(oldNodeId,oldBridgeId,MessageType.update,bridge)
|
|
|
+ .switchIfEmpty(delBridgeServer(oldNodeId,oldBridgeId)
|
|
|
.concatWith(initBridge(bridge))
|
|
|
.then());
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ public Mono<Void> delBridgeServer(String nodeId,String bridgeId){
|
|
|
+ return handleClusterOperation(nodeId,bridgeId,MessageType.del,bridgeId)
|
|
|
+ .switchIfEmpty(Mono.justOrEmpty(getServer(bridgeId))
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .flatMap(AliBridgeServer::stopBridge));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
//解码消息并处理
|
|
|
private Mono<Void> decodeAndHandleMessage(AliBridgeMessage msg) {
|
|
|
return Mono.justOrEmpty(AliBridgeCodec.decode(msg,msg.getProductKey(),msg.getDeviceName()))
|
|
|
@@ -217,19 +238,32 @@ public class AliBridgeGateway{
|
|
|
}
|
|
|
|
|
|
|
|
|
- private Mono<Void> handleClusterOperation(String serverId, MessageType messageType, Object msg){
|
|
|
- if(!this.clusterId.equals(serverId)){
|
|
|
- return clusterManager.getTopic(String.format(format,serverId))
|
|
|
- .publish(Mono.just(BridgeMessage.of(clusterId,messageType,msg))).then();
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * @param nodeId 网桥所在节点id
|
|
|
+ * @param bridgeId 网桥id
|
|
|
+ * @param messageType 消息类型
|
|
|
+ * @param msg 消息
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Mono<Void> handleClusterOperation(String nodeId,String bridgeId, MessageType messageType, Object msg){
|
|
|
+ if(StrUtil.isNullOrUndefined(bridgeId)){
|
|
|
+ log.error("AliBridgeGateway.handleClusterOperation 网桥id不能为空,");
|
|
|
+ return Mono.error(new BusinessException("系统内部错误"));
|
|
|
+ }
|
|
|
+ if(!this.clusterId.equals(nodeId)){
|
|
|
+ return clusterManager.getTopic(String.format(format,nodeId))
|
|
|
+ .publish(Mono.just(BridgeMessage.of(clusterId,messageType,bridgeId,msg))).then();
|
|
|
}
|
|
|
return Mono.empty();
|
|
|
}
|
|
|
|
|
|
@AllArgsConstructor(staticName = "of")
|
|
|
@Data
|
|
|
- static class BridgeMessage implements Serializable {
|
|
|
+ private static class BridgeMessage implements Serializable {
|
|
|
private String from;
|
|
|
private MessageType messageType;
|
|
|
+ private String bridgeId;
|
|
|
private Object msg;
|
|
|
}
|
|
|
|