Browse Source

add webhook 通知配置

18339543638 4 years ago
parent
commit
066a321305

+ 11 - 0
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/support/message/TimeSyncReplyMessage.java

@@ -1,8 +1,18 @@
 package org.jetlinks.community.support.message;
 
+import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.util.EnumUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.ezorm.core.CastUtil;
 import org.jetlinks.core.message.CommonDeviceMessageReply;
 import org.jetlinks.core.message.MessageType;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import java.io.File;
 
 /**
  * @author lifang
@@ -11,6 +21,7 @@ import org.jetlinks.core.message.MessageType;
  * @Description TODO
  * @createTime 2021年08月27日 09:11:00
  */
+@Slf4j
 public class TimeSyncReplyMessage extends CommonDeviceMessageReply<TimeSyncReplyMessage> {
     private static MessageType messageType= EnumUtil.likeValueOf(MessageType.class, "timeSyncReply");
     @Override

+ 1 - 0
jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/DefaultNotifyType.java

@@ -12,6 +12,7 @@ public enum DefaultNotifyType implements NotifyType {
     voice("语音"),
     dingTalk("钉钉"),
     weixin("微信"),
+    webHook("web资源服务")
 
     ;
 

+ 27 - 0
jetlinks-components/notify-component/notify-webhook/pom.xml

@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>notify-component</artifactId>
+        <groupId>org.jetlinks.community</groupId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>notify-webhook</artifactId>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-http</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>notify-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

+ 117 - 0
jetlinks-components/notify-component/notify-webhook/src/main/java/org/jetlinks/community/notify/webhook/WebHookNotifier.java

@@ -0,0 +1,117 @@
+package org.jetlinks.community.notify.webhook;
+
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.HttpUtil;
+import cn.hutool.http.Method;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.exception.BusinessException;
+import org.jetlinks.community.notify.*;
+import org.jetlinks.community.notify.template.TemplateManager;
+import org.jetlinks.core.Values;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName WebHookNotifier.java
+ * @Description TODO
+ * @createTime 2021年08月30日 09:35:00
+ */
+@Slf4j
+public class WebHookNotifier extends AbstractNotifier<WebHookTemplate> {
+    private String method;
+    private String url;
+    private Long connectionTimeout;
+
+    private Long readTimeout;
+
+    @Getter
+    private String notifierId;
+
+
+    private HttpRequest request;
+    public WebHookNotifier(NotifierProperties profile, TemplateManager templateManager) {
+        super(templateManager);
+        Map<String, Object> config = profile.getConfiguration();
+        this.method = (String) Objects.requireNonNull(config.get("method"), "请求方法不能为空");
+        this.url=(String) Objects.requireNonNull(config.get("url"), "url不能为空");
+        this.connectionTimeout=(Long) Objects.requireNonNull(config.get("connectionTimeout"), "连接超时时长不能为空");
+        this.readTimeout=(Long) Objects.requireNonNull(config.get("readTimeout"), "请求超时时长不能为空");
+        switch (method){
+            case "post":
+                request = HttpUtil.createPost(url);
+                break;
+            case "get":
+                request = HttpUtil.createGet(url);
+                break;
+            default: throw new BusinessException(String.format("http不支持该方法{%s}",method));
+        }
+        request.setReadTimeout(Long.valueOf(TimeUnit.SECONDS.toMillis(readTimeout)).intValue());
+        request.setConnectionTimeout(Long.valueOf(TimeUnit.SECONDS.toMillis(connectionTimeout)).intValue());
+        this.notifierId = profile.getId();
+    }
+
+    public WebHookNotifier(TemplateManager templateManager) {
+        super(templateManager);
+    }
+
+    @Override
+    public String getNotifierId() {
+        return notifierId;
+    }
+
+    @Nonnull
+    @Override
+    public NotifyType getType() {
+        return DefaultNotifyType.webHook;
+    }
+
+    @Nonnull
+    @Override
+    public Provider getProvider() {
+        return WebHookProvider.webHook;
+    }
+
+    @Nonnull
+    @Override
+    public Mono<Void> send(@Nonnull WebHookTemplate template, @Nonnull Values context) {
+        return Mono.defer(()->{
+                try {
+                    Method method = request.getMethod();
+                    switch (method){
+                        case GET:
+                            Map<String, String> headers = new HashMap<>();
+                            context.getAllValues().forEach((k,v)->headers.put(k,String.valueOf(v)));
+                            request.addHeaders(headers);
+                            break;
+                        case PUT:
+                            request.body(context.toString());
+                            break;
+                        default:break;
+                    }
+                    HttpResponse response = request.execute();
+                    if (200!=response.getStatus()) {
+                        return Mono.error(new BusinessException(response.body(), response.getStatus()));
+                    }
+                }catch (Exception e){
+                    return Mono.error(e);
+                }
+                return Mono.empty();
+            }
+        );
+    }
+
+    @Nonnull
+    @Override
+    public Mono<Void> close() {
+        return Mono.fromRunnable(()->request.getConnection().disconnect());
+    }
+}

+ 70 - 0
jetlinks-components/notify-component/notify-webhook/src/main/java/org/jetlinks/community/notify/webhook/WebHookNotifyProvider.java

@@ -0,0 +1,70 @@
+package org.jetlinks.community.notify.webhook;
+
+import com.alibaba.fastjson.JSON;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.validator.ValidatorUtils;
+import org.jetlinks.community.ConfigMetadataConstants;
+import org.jetlinks.community.notify.*;
+import org.jetlinks.community.notify.template.TemplateManager;
+import org.jetlinks.community.notify.template.TemplateProperties;
+import org.jetlinks.community.notify.template.TemplateProvider;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.jetlinks.core.metadata.DefaultConfigMetadata;
+import org.jetlinks.core.metadata.types.LongType;
+import org.jetlinks.core.metadata.types.StringType;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName WebHookNotifyProvider.java
+ * @Description TODO
+ * @createTime 2021年08月30日 08:54:00
+ */
+@Component
+@Slf4j
+@AllArgsConstructor
+public class WebHookNotifyProvider implements NotifierProvider, TemplateProvider {
+
+    private final TemplateManager templateManager;
+
+
+    public static final DefaultConfigMetadata notifierConfig = new DefaultConfigMetadata("通知配置", "")
+        .add("url", "地址", "", new StringType().expand(ConfigMetadataConstants.required.value(true)))
+        .add("method","方法","",new StringType().expand(ConfigMetadataConstants.required.value(true)))
+        .add("connectionTimeout","连接超时时长","",new LongType().expand(ConfigMetadataConstants.required.value(true)))
+        .add("readTimeout","请求超时时长","",new LongType().expand(ConfigMetadataConstants.required.value(true)));
+
+
+    @Override
+    public ConfigMetadata getNotifierConfigMetadata() {
+        return notifierConfig;
+    }
+
+    @Nonnull
+    @Override
+    public NotifyType getType() {
+        return DefaultNotifyType.webHook;
+    }
+
+    @Nonnull
+    @Override
+    public Provider getProvider() {
+        return WebHookProvider.webHook;
+    }
+
+    @Override
+    public Mono<WebHookTemplate> createTemplate(TemplateProperties properties) {
+        return Mono.fromCallable(() -> ValidatorUtils.tryValidate(JSON.parseObject(properties.getTemplate(), WebHookTemplate.class)));
+    }
+
+    @Nonnull
+    @Override
+    public Mono<Notifier<WebHookTemplate>> createNotifier(@Nonnull NotifierProperties properties) {
+        return Mono.fromSupplier(() -> new WebHookNotifier(properties, templateManager));
+    }
+}

+ 26 - 0
jetlinks-components/notify-component/notify-webhook/src/main/java/org/jetlinks/community/notify/webhook/WebHookProvider.java

@@ -0,0 +1,26 @@
+package org.jetlinks.community.notify.webhook;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.jetlinks.community.notify.*;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName WebHookProvider.java
+ * @Description TODO
+ * @createTime 2021年08月30日 08:32:00
+ */
+
+@Getter
+@AllArgsConstructor
+public enum  WebHookProvider  implements Provider {
+    webHook("默认");
+
+    private String name;
+
+    @Override
+    public String getId() {
+        return name();
+    }
+}

+ 42 - 0
jetlinks-components/notify-component/notify-webhook/src/main/java/org/jetlinks/community/notify/webhook/WebHookTemplate.java

@@ -0,0 +1,42 @@
+package org.jetlinks.community.notify.webhook;
+
+import cn.hutool.core.util.URLUtil;
+import cn.hutool.http.HttpUtil;
+import io.vertx.core.http.HttpMethod;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.jetlinks.community.notify.template.Template;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * @author lifang
+ * @version 1.0.0
+ * @ClassName WebHookTemplate.java
+ * @Description TODO
+ * @createTime 2021年08月30日 08:57:00
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+public class WebHookTemplate implements Template {
+    @NotBlank(message = "[url]不能为空")
+    private String url;
+
+    private Long connectionTimeout;
+
+    private Long requestTimeout;
+
+    @NotBlank(message = "[请求方法]不能为空")
+    private HttpMethod httpMethod;
+
+    private String sslKey;
+
+    private String sslCert;
+
+    private String description;
+
+    private boolean verify(){
+        return true;
+    }
+
+}

+ 1 - 0
jetlinks-components/notify-component/pom.xml

@@ -19,6 +19,7 @@
         <module>notify-wechat</module>
         <module>notify-dingtalk</module>
         <module>notify-voice</module>
+        <module>notify-webhook</module>
     </modules>
 
 

+ 3 - 1
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalFirmwareUpgradeHistoryService.java

@@ -16,8 +16,9 @@ import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.firmware.*;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
@@ -39,6 +40,7 @@ public class LocalFirmwareUpgradeHistoryService extends GenericReactiveCrudServi
     private final LocalDeviceInstanceService deviceService;
     private final DeviceRegistry registry;
     public static final Map<String,ScheduledFuture<?>> taskDelayService=new ConcurrentHashMap<>();
+
     /**
      * 上报版本信息
      * @param message

+ 3 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceFirmwareUpgradeHistoryController.java

@@ -13,8 +13,11 @@ import org.jetlinks.community.device.service.LocalDeviceFirmwareTaskService;
 import org.jetlinks.community.device.service.LocalFirmwareUpgradeHistoryService;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.ArrayList;
+
 /**
  * @author lifang
  * @version 1.0.0

+ 6 - 0
jetlinks-manager/notify-manager/pom.xml

@@ -59,6 +59,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.jetlinks.community</groupId>
+            <artifactId>notify-webhook</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>notify-voice</artifactId>

+ 6 - 19
jetlinks-standalone/src/test/java/org/jetlinks/community/network/manager/web/GateWayTest.java

@@ -31,6 +31,7 @@ import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -52,7 +53,8 @@ public class GateWayTest {
 
     @Autowired
     private DeviceGatewayManager gatewayManager;
-
+    @Autowired
+    private LocalDeviceInstanceService instanceService;
     @Autowired
     private NetworkConfigService networkConfigService;
     @Autowired
@@ -65,27 +67,12 @@ public class GateWayTest {
 
     @Autowired
     private LocalDeviceFirmwareService firmwareService;
-    @Autowired
-    private LocalDeviceInstanceService instanceService;
 
     @Test
     public void test(){
-        Optional<DeviceFirmwareEntity> deviceFirmwareEntity = firmwareService.createQuery().where(DeviceFirmwareEntity::getProductId, Optional.of(3))
-            .fetchOne()
-            .blockOptional();
-
-        System.out.println(deviceFirmwareEntity.get());
-    }
-    public static void main(String[] args) throws InterruptedException {
-        VertxMqttServerProvider mqttServerManager = new VertxMqttServerProvider(id -> Mono.empty(), vertx);
-
-        VertxMqttServerProperties properties = new VertxMqttServerProperties();
-        properties.setId("test");
-        properties.setInstance(4);
-        properties.setOptions(new MqttServerOptions().setPort(1800));
-
-        mqttServer = mqttServerManager.createNetwork(properties);
+        instanceService.createQuery()
+            .fetch()
 
-        Thread.currentThread().join();
+            .subscribe();
     }
 }

+ 23 - 0
pom.xml

@@ -178,6 +178,29 @@
                 <version>${hutool.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>cn.hutool</groupId>
+                <artifactId>hutool-http</artifactId>
+                <version>${hutool.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>cn.hutool</groupId>
+                <artifactId>hutool-json</artifactId>
+                <version>${hutool.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>cn.hutool</groupId>
+                <artifactId>hutool-core</artifactId>
+                <version>${hutool.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>cn.hutool</groupId>
+                <artifactId>hutool-crypto</artifactId>
+                <version>${hutool.version}</version>
+            </dependency>
             <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty-bom</artifactId>