Forráskód Böngészése

fixed 网桥设备接口

18339543638 4 éve
szülő
commit
4bf1dc1049

+ 16 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java

@@ -9,6 +9,7 @@ import org.jetlinks.community.network.mqtt.server.MqttServer;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
 
 import java.util.Collection;
 import java.util.function.Function;
@@ -97,4 +98,19 @@ public class VertxMqttServer implements MqttServer {
         }
 
     }
+
+
+    public static void main(String[] args) {
+        EmitterProcessor<String> connectionProcessor = EmitterProcessor.create(false);
+        FluxSink<String> sink = connectionProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
+
+
+        connectionProcessor.doOnNext(s->{
+            System.out.println(s);
+        }).subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            sink.next(String.valueOf(i));
+        }
+    }
 }

+ 0 - 2
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java

@@ -6,11 +6,9 @@ import lombok.Setter;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.id.IDGenerator;
 import org.hswebframework.web.utils.ExpressionUtils;
-import org.jetlinks.core.defaults.DefaultDeviceOperator;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceProductOperator;
 import org.jetlinks.core.device.DeviceRegistry;
-import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.DeviceMessageReply;
 import org.jetlinks.core.message.Headers;
 import org.jetlinks.core.message.MessageType;

+ 25 - 4
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/AliBridgeCodec.java

@@ -4,16 +4,21 @@ package org.jetlinks.community.bridge.core;
 import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.aliyun.iot.as.bridge.core.model.Session;
 import com.aliyun.iot.as.bridge.core.model.tsl.*;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.event.EventMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
+import org.jetlinks.core.message.function.FunctionParameter;
 import org.jetlinks.core.message.property.ReportPropertyMessage;
+import org.jetlinks.core.message.property.WritePropertyMessage;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
 import org.jetlinks.core.metadata.types.GeoPoint;
+import org.jetlinks.core.utils.IdUtils;
 
 import javax.annotation.Nonnull;
+import java.util.*;
 
 /**
  * @author lifang
@@ -28,15 +33,31 @@ public class AliBridgeCodec{
 
     public static DeviceMessage decode(@Nonnull AliBridgeMessage payload,String productKey,String deviceName) {
         String topic = payload.getTopic();
-        if(topic==null){
+        Session session = payload.getSession();
+        if(topic==null||session==null){
             return null;
         }
-        if(topic.equals( ThingTopicTemplates.PROPERTY_SET_REPLY_TOPIC_TEMPLATE.getTopic(productKey, deviceName))){
-            //属性设置回复
+        if(topic.equals( ThingTopicTemplates.PROPERTY_SET_TOPIC_TEMPLATE.getTopic(session.getProductKey(), session.getDeviceName()))){
+            //属性设置
+            WritePropertyMessage writePropertyMessage = new WritePropertyMessage();
+            writePropertyMessage.setDeviceId(payload.getOriginalIdentity());
+            writePropertyMessage.setMessageId(IdUtils.newUUID());
+            writePropertyMessage.setProperties(JSONObject.parseObject(new String(payload.getPayload())));
+            return writePropertyMessage;
         }
         if(topic.startsWith(ThingTopicTemplates.SERVICE_TOPIC_TEMPLATE_PREFIX.getTopic(productKey,deviceName))){
-            //功能调用回复
+            //获取属性
+            FunctionInvokeMessage functionInvokeMessage = new FunctionInvokeMessage();
+            functionInvokeMessage.setDeviceId(payload.getOriginalIdentity());
+            functionInvokeMessage.setMessageId(IdUtils.newUUID());
+            functionInvokeMessage.setFunctionId(topic.substring(topic.lastIndexOf("/")+1));
+            JSONObject params = (JSONObject) JSONObject.parseObject(new String(payload.getPayload())).get("params");
+            List<FunctionParameter> functionParameters = new ArrayList<>();
+            params.forEach((k,v)->functionParameters.add(new FunctionParameter(k,v)));
+            functionInvokeMessage.setInputs(functionParameters);
+            return functionInvokeMessage;
         }
+
         return null;
     }
 

+ 37 - 20
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/core/DefaultBridgeConfigManager.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.bridge.core;
 
 import cn.hutool.core.util.HashUtil;
+import cn.hutool.core.util.StrUtil;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigConsts;
 import com.aliyun.iot.as.bridge.core.config.BridgeConfigManager;
 import com.aliyun.iot.as.bridge.core.exception.BootException;
@@ -9,14 +10,22 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 
 import java.net.*;
+import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 
 @Getter
-@NoArgsConstructor
 public class DefaultBridgeConfigManager implements BridgeConfigManager {
 
+    /**
+     * 网桥id,以数据库中存储的主键字段id为准
+     * @see  org.jetlinks.community.bridge.entity.AliIotBridgeEntity
+     */
+    private String bridgeId;
+
+    private final String macId;
+
     /**
      * 网桥设备所属产品的ProductKey。
      */
@@ -48,11 +57,13 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
 
     private PopClientConfiguration popConfiguration;
 
-    public static DefaultBridgeConfigManager of(String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint, Map<String, Object> configMaps,PopClientConfiguration popConfiguration) {
-        return new DefaultBridgeConfigManager(bridgeProductKey,bridgeDeviceName,bridgeDeviceSecret,http2Endpoint,authEndpoint,configMaps,popConfiguration);
+    public static DefaultBridgeConfigManager of(String bridgeId,String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint, Map<String, Object> configMaps,PopClientConfiguration popConfiguration) {
+        return new DefaultBridgeConfigManager(bridgeId,bridgeProductKey,bridgeDeviceName,bridgeDeviceSecret,http2Endpoint,authEndpoint,configMaps,popConfiguration);
     }
 
-    private DefaultBridgeConfigManager(String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint,Map<String, Object> configMaps, PopClientConfiguration popConfiguration) {
+    private DefaultBridgeConfigManager(String bridgeId,String bridgeProductKey, String bridgeDeviceName, String bridgeDeviceSecret, String http2Endpoint,String authEndpoint,Map<String, Object> configMaps, PopClientConfiguration popConfiguration) {
+        this.bridgeId=bridgeId;
+        this.macId="";
         this.bridgeProductKey = bridgeProductKey;
         this.bridgeDeviceName = bridgeDeviceName;
         this.bridgeDeviceSecret = bridgeDeviceSecret;
@@ -62,6 +73,26 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
         this.popConfiguration = popConfiguration;
     }
 
+    public static void main(String[] args) throws UnknownHostException {
+        System.out.println(createMacId("123"));
+        System.out.println(createMacId("123456789420"));
+    }
+
+    private static String createMacId(String id){
+        String result="%s%s-%s%s-%s%s-%s%s-%s%s-%s%s";
+        String[] params=new String[12];
+        Arrays.fill(params,"0");
+        String reverse = StrUtil.reverse(id);
+        for (int i = 0; i < reverse.length(); i++) {
+            if(i>=12){
+                break;
+            }
+            params[i]=String.valueOf(reverse.charAt(i));
+        }
+        result=String.format(result,params);
+        return result;
+    }
+
     @Override
     public String getBridgeInstanceId() {
         return (String) configMaps.get(BridgeConfigConsts.BRIDGE_INSTANCEID_KEY);
@@ -119,7 +150,8 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
     @Override
     public String getMacAddress() {
 //        return String.valueOf(HashUtil.rsHash(originalProductId));
-        return getMac(getInetAddress());
+//        return getMac(getInetAddress());
+        return macId;
     }
     @Override
     public void updateBridgeDeviceSecret(String deviceSecret) {
@@ -153,21 +185,6 @@ public class DefaultBridgeConfigManager implements BridgeConfigManager {
         }
     }
 
-    public static void main(String[] args) throws UnknownHostException {
-        byte[] macBytes ="123456".getBytes();
-        StringBuffer buffer = new StringBuffer();
-        for(int i = 0; i < macBytes.length; i++){
-            if(i != 0) { buffer.append("-"); }
-            int intMac = macBytes[i]&0xff;
-            String str = Integer.toHexString(intMac);
-            if(str.length() == 0){
-                buffer.append("0");
-            }
-            buffer.append(str);
-        }
-        System.out.println(buffer.toString().toUpperCase());
-
-    }
     private InetAddress getInetAddress() {
         try {
             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();

+ 5 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/entity/AliIotBridgeEntity.java

@@ -60,6 +60,11 @@ public class AliIotBridgeEntity extends GenericEntity<String> {
     @Deprecated
     private String protocol;
 
+    @Comment("阿里云实例id")
+    @Column(name = "iotInstanceId")
+    @Schema(description = "阿里云实例id")
+    @Deprecated
+    private String iotInstanceId;
 
     @Column(name = "state",length = 16,nullable = false)
     @EnumCodec

+ 5 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/message/AliBridgeMessage.java

@@ -39,6 +39,11 @@ public class AliBridgeMessage {
         this.topic = topic;
         this.payload = payload;
         this.session=session;
+        if(session!=null){
+            this.originalIdentity=session.getOriginalIdentity();
+            this.productKey=session.getProductKey();
+            this.deviceName=session.getDeviceName();
+        }
     }
 
     private String productKey;

+ 24 - 16
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeGateway.java

@@ -11,11 +11,18 @@ import org.jetlinks.community.bridge.entity.AliIotBridgeEntity;
 import org.jetlinks.community.bridge.message.AliBridgeMessage;
 import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.cluster.ClusterTopic;
+import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.event.TopicPayload;
 import org.jetlinks.core.message.CommonDeviceMessage;
 import org.jetlinks.core.message.CommonDeviceMessageReply;
+import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.Message;
+import org.jetlinks.core.message.function.FunctionInvokeMessage;
+import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
+import org.jetlinks.core.message.property.WritePropertyMessage;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
@@ -114,20 +121,15 @@ public class AliBridgeGateway{
 
     /**
      * 取消设备注册
+     * @param bridgeId 网桥id
+     * @param originalIdentity 设备id
      * @return
      */
-    public Mono<Void> unregisterDevice(String serverId,String originalIdentity){
-        return handleClusterOperation(serverId,MessageType.unregister,originalIdentity)
+    public Mono<Void> unregisterDevice(String bridgeId,String originalIdentity){
+        return handleClusterOperation(bridgeId,MessageType.unregister,originalIdentity)
             .switchIfEmpty(aliBridgeServer.unRegister(originalIdentity));
     }
 
-    public static void main(String[] args) {
-        Mono.justOrEmpty(null)
-            .then()
-            .switchIfEmpty(Mono.just("123").doOnNext(s->{
-                System.out.println(s);
-            }).then()).subscribe();
-    }
     public Mono<Void> initBridge(AliIotBridgeEntity bridgeEntity){
         return handleClusterOperation(bridgeEntity.getNodeId(),MessageType.init,bridgeEntity)
             .switchIfEmpty(Mono.justOrEmpty(aliBridgeServer)
@@ -151,9 +153,9 @@ public class AliBridgeGateway{
                     .then()
                     .onErrorResume(error->
                         Mono.error(()->{
-                        this.delBridgeServer(bridgeEntity.getNodeId());
-                        return new BusinessException("网桥配置出错,请确定配置参数是否正确;或请勿重复启动网桥)");
-                    })))
+                            this.delBridgeServer(bridgeEntity.getNodeId());
+                            return new BusinessException("网桥配置出错,请确定配置参数是否正确;或请勿重复启动网桥)");
+                        })))
                 .then());
     }
 
@@ -201,10 +203,16 @@ public class AliBridgeGateway{
 
     private Mono<Void> handleChannelMessage(Message message,String deviceId) {
         return registry.getDevice(deviceId)
-            .flatMap(operator ->
-                messageHandler
-                    .handleMessage(operator, message)
-                    .then()
+            .flatMap(operator ->{
+                    if(message instanceof WritePropertyMessage || message instanceof FunctionInvokeMessage){
+                        return operator.messageSender().send((DeviceMessage) message)
+                            .then();
+                    }
+                    return  messageHandler
+                        .handleMessage(operator, message)
+                        .then();
+                }
+
             );
     }
 

+ 3 - 3
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/server/aliyun/AliBridgeServer.java

@@ -61,10 +61,10 @@ public class AliBridgeServer  implements BridgeServer {
         this.clusterId=clusterId;
     }
 
-    public static Mono<AliBridgeServer> create(EventBus eventBus,DeviceRegistry deviceRegistry, AliIotBridgeEntity productMapping,String clusterId) {
+    public static Mono<AliBridgeServer> create(EventBus eventBus,DeviceRegistry deviceRegistry, AliIotBridgeEntity bridge,String clusterId) {
         AliBridgeServer aliBridgeServer = new AliBridgeServer(eventBus,deviceRegistry,clusterId);
         return  Mono.just(aliBridgeServer)
-            .flatMap(server->server.initBridge(productMapping))
+            .flatMap(server->server.initBridge(bridge))
             .thenReturn(aliBridgeServer);
     }
 
@@ -85,7 +85,7 @@ public class AliBridgeServer  implements BridgeServer {
         Assert.notNull(http2Endpoint,"创建网桥 http2Endpoint 不能为空, mapping id {%s}",params.getId());
         String regionId = accessConfig.getRegionId();
         Assert.notNull(regionId,"创建网桥 regionId 不能为空, mapping id {%s}",params.getId());
-        bridgeConfigManager = DefaultBridgeConfigManager.of(productKey, null, null, http2Endpoint, authEndpoint, null, getPopClientProfile(accessKey, accessSecret, regionId));
+        bridgeConfigManager = DefaultBridgeConfigManager.of(params.getId(),productKey, null, null, http2Endpoint, authEndpoint, null, getPopClientProfile(accessKey, accessSecret, regionId));
         bootstrap=new DefaultBridgeBootstrap(bridgeConfigManager);
         if(start.get()){
             return Mono.empty();

+ 2 - 0
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/service/AliIotService.java

@@ -30,6 +30,7 @@ public class AliIotService {
         DefaultProfile profile = DefaultProfile.getProfile(param.getRegionId(), param.getAccessKey(), param.getAccessSecret());
         DefaultAcsClient client = new DefaultAcsClient(profile);
         QueryProductListRequest request = new QueryProductListRequest();
+        request.setIotInstanceId(param.getIotInstanceId());
         request.setPageSize(pageSize);
         request.setCurrentPage(currentPage);
         QueryProductListResponse response = client.getAcsResponse(request);
@@ -51,6 +52,7 @@ public class AliIotService {
         DefaultProfile profile = DefaultProfile.getProfile(param.getRegionId(), param.getAccessKey(), param.getAccessSecret());
         DefaultAcsClient client = new DefaultAcsClient(profile);
         QueryDeviceRequest request = new QueryDeviceRequest();
+        request.setIotInstanceId(param.getIotInstanceId());
         request.setPageSize(pageSize);
         request.setCurrentPage(currentPage);
         request.setProductKey(param.getProductKey());

+ 30 - 21
jetlinks-manager/bridge-manager/src/main/java/org/jetlinks/community/bridge/web/AliBridgeServerController.java

@@ -55,11 +55,13 @@ public class AliBridgeServerController implements
     @DeleteMapping("/delete/{serverId}/{bridgeId}")
     @Operation(summary = "删除网桥")
     @DeleteAction
-    public Mono<Void> deleteBridge(@PathVariable("serverId")String serverId,@PathVariable("bridgeId")String bridgeId){
-        return Mono.zip(bridgeGateway.delBridgeServer(serverId)
-            ,bridgeService.deleteById(bridgeId))
-            .concatWith(bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getBridgeId,bridgeId).execute().then(Mono.empty()))
-            .then();
+    public Mono<Void> deleteBridge(@PathVariable("serverId")String serverId,@PathVariable("bridgeId")String bridgeId,String id){
+        return bridgeService.findById(id)
+            .flatMap(bridge->Mono.zip(bridgeGateway.delBridgeServer(bridge.getNodeId())
+                ,bridgeService.deleteById(id))
+                .concatWith(bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getBridgeId,id).execute().then(Mono.empty()))
+                .then()
+            );
     }
 
 
@@ -94,12 +96,15 @@ public class AliBridgeServerController implements
     @PostMapping("/unregister/{serverId}")
     @Operation(summary = "取消注册网桥设备")
     @DeleteAction
-    public Mono<Void> unRegister(@PathVariable("serverId")String serverId,@RequestBody String originalIdentity){
-        return Mono.zip(
-            //删除网桥设备
-            bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getOriginalIdentity,originalIdentity).execute(),
-            //取消注册
-            bridgeGateway.unregisterDevice(serverId,originalIdentity)).then();
+    public Mono<Void> unRegister(@PathVariable("serverId")String serverId,@RequestBody String originalIdentity,String id){
+        return bridgeDeviceService.findById(id)
+            .flatMap(bridgeDevice->
+                Mono.zip(
+                    //删除网桥设备
+                    bridgeDeviceService.createDelete().where(AliIotBridgeDeviceConfig::getOriginalIdentity,originalIdentity).execute(),
+                    //取消注册
+                    bridgeGateway.unregisterDevice(bridgeDevice.getBridgeId(),bridgeDevice.getOriginalIdentity())).then()
+            );
     }
 
 
@@ -114,17 +119,21 @@ public class AliBridgeServerController implements
     @PutMapping("/update/{serverId}")
     @Operation(summary = "更新网桥信息")
     @CreateAction
-    public Mono<Void> updateBridge(@PathVariable("serverId")String serverId,@RequestBody AliIotBridgeEntity bridge){
+    public Mono<Void> updateBridge(@PathVariable("serverId")String serverId,@RequestBody AliIotBridgeEntity bridge,String id){
         //删除设备后重新注册设备
-        return   Mono.zip(
-            bridgeGateway.replaceBridgeServer(serverId,bridge)
-            , bridgeService.save(bridge))
-            .flatMap(tp2->bridgeDeviceService.createQuery()
-                .where(AliIotBridgeDeviceConfig::getBridgeId,bridge.getId())
-                .fetch()
-                .flatMap(deviceConfig->
-                    bridgeGateway.registerDevice(serverId,deviceConfig)
-                ).then());
+        return bridgeService.findById(id)
+            .flatMap(oldBridge->
+                Mono.zip(
+                    bridgeGateway.replaceBridgeServer(oldBridge.getNodeId(),bridge)
+                    , bridgeService.save(bridge))
+                    .flatMap(tp2->bridgeDeviceService.createQuery()
+                        .where(AliIotBridgeDeviceConfig::getBridgeId,bridge.getId())
+                        .fetch()
+                        .flatMap(deviceConfig->
+                            bridgeGateway.registerDevice(oldBridge.getNodeId(),deviceConfig)
+                        ).then()
+                    )
+            );
     }