|
|
@@ -27,6 +27,7 @@ import org.hswebframework.utils.time.DateFormatter;
|
|
|
import org.hswebframework.utils.time.DefaultDateFormatter;
|
|
|
import org.hswebframework.web.api.crud.entity.PagerResult;
|
|
|
import org.hswebframework.web.bean.FastBeanCopier;
|
|
|
+import org.jetlinks.community.elastic.search.geo.EsGeoFilter;
|
|
|
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
|
|
|
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
|
|
|
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
|
|
|
@@ -139,8 +140,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
|
|
|
- return this.doQuery(index, queryParam)
|
|
|
+ public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, EsGeoFilter filter, Function<Map<String, Object>, T> mapper) {
|
|
|
+ return this.doQuery(index, queryParam,filter)
|
|
|
.flatMap(tp2 -> this
|
|
|
.convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
|
|
|
.collectList()
|
|
|
@@ -153,6 +154,22 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
|
|
|
+// return this.doQuery(index, queryParam)
|
|
|
+// .flatMap(tp2 -> this
|
|
|
+// .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
|
|
|
+// .collectList()
|
|
|
+// .filter(CollectionUtils::isNotEmpty)
|
|
|
+// .map(list -> PagerResult.of((int) tp2
|
|
|
+// .getT2()
|
|
|
+// .getHits()
|
|
|
+// .getTotalHits().value, list, queryParam))
|
|
|
+// )
|
|
|
+// .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
|
|
|
+ return queryPager(index,queryParam,null,mapper);
|
|
|
+ }
|
|
|
+
|
|
|
private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList,
|
|
|
SearchResponse response,
|
|
|
Function<Map<String, Object>, T> mapper) {
|
|
|
@@ -174,15 +191,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
});
|
|
|
|
|
|
}
|
|
|
-
|
|
|
private Mono<Tuple2<List<ElasticSearchIndexMetadata>, SearchResponse>> doQuery(String[] index,
|
|
|
- QueryParam queryParam) {
|
|
|
+ QueryParam queryParam,EsGeoFilter filter) {
|
|
|
return indexManager
|
|
|
.getIndexesMetadata(index)
|
|
|
.collectList()
|
|
|
.filter(CollectionUtils::isNotEmpty)
|
|
|
.flatMap(metadataList -> this
|
|
|
- .createSearchRequest(queryParam, metadataList)
|
|
|
+ .createSearchRequest(queryParam,filter, metadataList)
|
|
|
.flatMap(restClient::searchForPage)
|
|
|
.map(response -> Tuples.of(metadataList, response))
|
|
|
).onErrorResume(err -> {
|
|
|
@@ -191,6 +207,24 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ private Mono<Tuple2<List<ElasticSearchIndexMetadata>, SearchResponse>> doQuery(String[] index,
|
|
|
+ QueryParam queryParam) {
|
|
|
+// return indexManager
|
|
|
+// .getIndexesMetadata(index)
|
|
|
+// .collectList()
|
|
|
+// .filter(CollectionUtils::isNotEmpty)
|
|
|
+// .flatMap(metadataList -> this
|
|
|
+// .createSearchRequest(queryParam, metadataList)
|
|
|
+// .flatMap(restClient::searchForPage)
|
|
|
+// .map(response -> Tuples.of(metadataList, response))
|
|
|
+// ).onErrorResume(err -> {
|
|
|
+// log.error(err.getMessage(), err);
|
|
|
+// return Mono.empty();
|
|
|
+// });
|
|
|
+ return doQuery(index,queryParam,null);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public Mono<Long> count(String[] index, QueryParam queryParam) {
|
|
|
QueryParam param = queryParam.clone();
|
|
|
@@ -412,9 +446,9 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
.flatMap(list -> createSearchRequest(queryParam, list));
|
|
|
}
|
|
|
|
|
|
- protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
|
|
|
+ protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, EsGeoFilter filter,List<ElasticSearchIndexMetadata> indexes) {
|
|
|
|
|
|
- SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
|
|
|
+ SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, filter,indexes.get(0));
|
|
|
return Flux.fromIterable(indexes)
|
|
|
.flatMap(index -> getIndexForSearch(index.getIndex()))
|
|
|
.collectList()
|
|
|
@@ -424,6 +458,19 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
|
|
|
.indicesOptions(indexOptions));
|
|
|
}
|
|
|
|
|
|
+ protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
|
|
|
+
|
|
|
+// SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
|
|
|
+// return Flux.fromIterable(indexes)
|
|
|
+// .flatMap(index -> getIndexForSearch(index.getIndex()))
|
|
|
+// .collectList()
|
|
|
+// .map(indexList ->
|
|
|
+// new SearchRequest(indexList.toArray(new String[0]))
|
|
|
+// .source(builder)
|
|
|
+// .indicesOptions(indexOptions));
|
|
|
+ return createSearchRequest(queryParam,null,indexes);
|
|
|
+ }
|
|
|
+
|
|
|
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
|
|
|
return indexManager
|
|
|
.getIndexMetadata(index)
|