|
|
@@ -0,0 +1,199 @@
|
|
|
+package cn.tr.plugin.eventbus.config;
|
|
|
+
|
|
|
+import cn.hutool.core.collection.CollectionUtil;
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.tr.core.utils.JsonUtils;
|
|
|
+import cn.tr.plugin.eventbus.annotation.Subscribe;
|
|
|
+import cn.tr.plugin.eventbus.bean.Subscription;
|
|
|
+import cn.tr.plugin.eventbus.proxy.ProxyMessageListener;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.aop.framework.Advised;
|
|
|
+import org.springframework.aop.support.AopUtils;
|
|
|
+import org.springframework.beans.BeansException;
|
|
|
+import org.springframework.beans.factory.config.BeanPostProcessor;
|
|
|
+import org.springframework.core.annotation.MergedAnnotation;
|
|
|
+import org.springframework.core.annotation.MergedAnnotations;
|
|
|
+import org.springframework.util.ReflectionUtils;
|
|
|
+
|
|
|
+import java.lang.reflect.AnnotatedElement;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+import java.lang.reflect.Method;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @ClassName : SubscribeListenerAnnotationBeanPostProcessor
|
|
|
+ * @Description :
|
|
|
+ * @Author : LF
|
|
|
+ * @Date: 2023年03月17日
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public class SubscribeListenerAnnotationBeanPostProcessor implements BeanPostProcessor {
|
|
|
+ private final ConcurrentMap<Class<?>, TypeMetadata> typeCache = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅者集合
|
|
|
+ */
|
|
|
+ private final static Set<Subscription> subscriptionSet=new HashSet<>();
|
|
|
+
|
|
|
+ private final EventBus eventBus;
|
|
|
+
|
|
|
+ public SubscribeListenerAnnotationBeanPostProcessor(EventBus eventBus) {
|
|
|
+ this.eventBus = eventBus;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
|
|
+ Class<?> targetClass = AopUtils.getTargetClass(bean);
|
|
|
+ final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
|
|
|
+ for (ListenerMethod lm : metadata.listenerMethods) {
|
|
|
+ for (Subscribe subscribe : lm.annotations) {
|
|
|
+ if(subscribe.value().length==0){
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ processSubscribe(targetClass,subscribe, lm.method, bean, beanName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return bean;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理订阅者
|
|
|
+ * @param objClass 对象class
|
|
|
+ * @param subscribe 订阅属性
|
|
|
+ * @param method 订阅方法
|
|
|
+ * @param bean 订阅bean
|
|
|
+ * @param beanName 订阅bean名称
|
|
|
+ */
|
|
|
+ protected void processSubscribe(Class objClass,Subscribe subscribe,Method method,Object bean,String beanName){
|
|
|
+ checkProxy(method, bean);
|
|
|
+ Optional<Class<?>> isArray = Stream.of(method.getParameterTypes()).filter(this::returnsMany).findAny();
|
|
|
+ if(isArray.isPresent()){
|
|
|
+ throw new UnsupportedOperationException("The parameter of class :"+ objClass.getName()+"method : "+method.getName()+" cannot be an array");
|
|
|
+ }
|
|
|
+ ProxyMessageListener proxyMessageListener = new ProxyMessageListener(bean, method);
|
|
|
+ Class<? extends Class> aClass = objClass.getClass();
|
|
|
+ Class<?>[] parameterTypes = method.getParameterTypes();
|
|
|
+ String id =subscribe.id();
|
|
|
+ if(StrUtil.isEmpty(id)){
|
|
|
+ id= aClass.getName() + method.getName() + (parameterTypes.length == 0 ? "" : parameterTypes[0].getName());
|
|
|
+ }
|
|
|
+ Subscription subscription = Subscription.builder()
|
|
|
+ .feature(subscribe.feature())
|
|
|
+ .topics(subscribe.value())
|
|
|
+ .subscriberId(id)
|
|
|
+ .build();
|
|
|
+ if (subscriptionSet.contains(subscription)) {
|
|
|
+ log.warn("subscription : [{}] has exist ,con't repeat register", JsonUtils.toJsonString(subscription));
|
|
|
+ }else {
|
|
|
+ eventBus.subscribe(subscription,proxyMessageListener);
|
|
|
+ subscriptionSet.add(subscription);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 判断返回值类型是否是集合或者数组类型
|
|
|
+ * @param returnType 类型
|
|
|
+ * @return 是否是集合或者数组类型
|
|
|
+ */
|
|
|
+ private boolean returnsMany(Class<?> returnType) {
|
|
|
+ //判断返回类型是否是集合类型
|
|
|
+ boolean isCollection = Collection.class.isAssignableFrom(returnType);
|
|
|
+ //判断返回类型是否是数组类型
|
|
|
+ boolean isArray = returnType.isArray();
|
|
|
+ return isCollection || isArray;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Method checkProxy(Method methodArg, Object bean) {
|
|
|
+ Method method = methodArg;
|
|
|
+ if (AopUtils.isJdkDynamicProxy(bean)) {
|
|
|
+ try {
|
|
|
+ // Found a @RabbitListener method on the target class for this JDK proxy ->
|
|
|
+ // is it also present on the proxy itself?
|
|
|
+ method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
|
|
|
+ Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
|
|
|
+ for (Class<?> iface : proxiedInterfaces) {
|
|
|
+ try {
|
|
|
+ method = iface.getMethod(method.getName(), method.getParameterTypes());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (SecurityException ex) {
|
|
|
+ ReflectionUtils.handleReflectionException(ex);
|
|
|
+ }
|
|
|
+ catch (NoSuchMethodException ex) {
|
|
|
+ throw new IllegalStateException(String.format(
|
|
|
+ "@Subscribe method '%s' found on bean target class '%s', " +
|
|
|
+ "but not found in any interface(s) for a bean JDK proxy. Either " +
|
|
|
+ "pull the method up to an interface or switch to subclass (CGLIB) " +
|
|
|
+ "proxies by setting proxy-target-class/proxyTargetClass " +
|
|
|
+ "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return method;
|
|
|
+ }
|
|
|
+
|
|
|
+ private TypeMetadata buildMetadata(Class<?> targetClass){
|
|
|
+ final List<ListenerMethod> methods = new ArrayList<>();
|
|
|
+ //找到类中所有带有 @Subscribe 注解的方法
|
|
|
+ ReflectionUtils.doWithMethods(targetClass,method->{
|
|
|
+ List<Subscribe> methodSubscribes = findSubscribeAnnotation(method);
|
|
|
+ if (CollectionUtil.isNotEmpty(methodSubscribes)) {
|
|
|
+ methods.add(new ListenerMethod(method,methodSubscribes.toArray(new Subscribe[CollectionUtil.size(methodSubscribes)])));
|
|
|
+ }
|
|
|
+ }, ReflectionUtils.USER_DECLARED_METHODS);
|
|
|
+
|
|
|
+ return new TypeMetadata(methods.toArray(new ListenerMethod[methods.size()]));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private List<Subscribe> findSubscribeAnnotation(AnnotatedElement element){
|
|
|
+ return MergedAnnotations.from(element, MergedAnnotations.SearchStrategy.TYPE_HIERARCHY)
|
|
|
+ .stream(Subscribe.class)
|
|
|
+ .map(MergedAnnotation::synthesize)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private static class TypeMetadata{
|
|
|
+ final ListenerMethod[] listenerMethods;
|
|
|
+
|
|
|
+ static final TypeMetadata EMPTY = new TypeMetadata();
|
|
|
+
|
|
|
+ private TypeMetadata() {
|
|
|
+ this.listenerMethods = new SubscribeListenerAnnotationBeanPostProcessor.ListenerMethod[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ TypeMetadata(ListenerMethod[] methods) {
|
|
|
+ this.listenerMethods = methods;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class ListenerMethod {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅方法 带有{@link Subscribe}注解的方法
|
|
|
+ */
|
|
|
+ final Method method;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅注解 {@link Subscribe}
|
|
|
+ */
|
|
|
+ final Subscribe[] annotations;
|
|
|
+
|
|
|
+ ListenerMethod(Method method, Subscribe[] annotations) {
|
|
|
+ this.method = method;
|
|
|
+ this.annotations = annotations;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|