diff --git a/src/main/asciidoc/reference/elasticsearch-misc.adoc b/src/main/asciidoc/reference/elasticsearch-misc.adoc index 1ea9ae680..412c6be26 100644 --- a/src/main/asciidoc/reference/elasticsearch-misc.adoc +++ b/src/main/asciidoc/reference/elasticsearch-misc.adoc @@ -121,7 +121,7 @@ Query searchQuery = NativeQuery.builder() .withPageable(PageRequest.of(0, 10)) .build(); -SearchHitsIterator stream = elasticsearchOperations.searchForStream(searchQuery, SampleEntity.class, +SearchHitsIterator stream = elasticsearchOperations.searchForStream(searchQuery, SampleEntity.class, index); List sampleEntities = new ArrayList<>(); @@ -134,7 +134,7 @@ stream.close(); ==== There are no methods in the `SearchOperations` API to access the scroll id, if it should be necessary to access this, -the following methods of the `AbstractElasticsearchTemplate` can be used (this is the base implementation for the +the following methods of the `AbstractElasticsearchTemplate` can be used (this is the base implementation for the different `ElasticsearchOperations` implementations: ==== @@ -275,3 +275,42 @@ SearchHits searchHits = operations.search(query, SomethingToBuy. ==== This works with every implementation of the `Query` interface. + +[[elasticsearch.misc.point-in-time]] +== Point In Time (PIT) API + +`ElasticsearchOperations` supports the point in time API of Elasticsearch (see https://www.elastic +.co/guide/en/elasticsearch/reference/8.3/point-in-time-api.html). The following code snippet shows how to use this +feature with a fictional `Person` class: + +==== +[source,java] +---- +ElasticsearchOperations operations; // autowired +Duration tenSeconds = Duration.ofSeconds(10); + +String pit = operations.openPointInTime(IndexCoordinates.of("person"), tenSeconds); <.> + +// create query for the pit +Query query1 = new CriteriaQueryBuilder(Criteria.where("lastName").is("Smith")) + .withPointInTime(new Query.PointInTime(pit, tenSeconds)) <.> + .build(); +SearchHits searchHits1 = operations.search(query1, Person.class); +// do something with the data + +// create 2nd query for the pit, use the id returned in the previous result +Query query2 = new CriteriaQueryBuilder(Criteria.where("lastName").is("Miller")) + .withPointInTime( + new Query.PointInTime(searchHits1.getPointInTimeId(), tenSeconds)) <.> + .build(); +SearchHits searchHits2 = operations.search(query2, Person.class); +// do something with the data + +operations.closePointInTime(searchHits2.getPointInTimeId()); <.> + +---- +<.> create a point in time for an index (can be multiple names) and a keep-alive duration and retrieve its id +<.> pass that id into the query to search together with the next keep-alive value +<.> for the next query, use the id returned from the previous search +<.> when done, close the point in time using the last returned id +==== diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java index fcbce7edd..156bf43df 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java @@ -26,9 +26,6 @@ import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; -import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; -import org.springframework.data.elasticsearch.client.erhlc.ReactiveRestClients; -import org.springframework.data.elasticsearch.client.erhlc.RestClients; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; import org.springframework.web.reactive.function.client.WebClient; diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java index 36d13b4f2..50cf6b4be 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java @@ -31,8 +31,6 @@ import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint; import org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder; import org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder; -import org.springframework.data.elasticsearch.client.erhlc.ReactiveRestClients; -import org.springframework.data.elasticsearch.client.erhlc.RestClients; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; import org.springframework.util.Assert; diff --git a/src/main/java/org/springframework/data/elasticsearch/client/UnsupportedClientOperationException.java b/src/main/java/org/springframework/data/elasticsearch/client/UnsupportedClientOperationException.java new file mode 100644 index 000000000..86ab9fd3f --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/UnsupportedClientOperationException.java @@ -0,0 +1,25 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client; + +/** + * @author Peter-Josef Meisch + */ +public class UnsupportedClientOperationException extends RuntimeException { + public UnsupportedClientOperationException(Class clientClass, String operation) { + super("Client %1$s does not support the operation %2$s".formatted(clientClass, operation)); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/DocumentAdapters.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/DocumentAdapters.java index 80709d651..0e41d8c67 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/DocumentAdapters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/DocumentAdapters.java @@ -73,7 +73,7 @@ public static SearchDocument from(Hit hit, JsonpMapper jsonpMapper) { Map innerHits = new LinkedHashMap<>(); hit.innerHits().forEach((name, innerHitsResult) -> { // noinspection ReturnOfNull - innerHits.put(name, SearchDocumentResponseBuilder.from(innerHitsResult.hits(), null, null, null, + innerHits.put(name, SearchDocumentResponseBuilder.from(innerHitsResult.hits(), null, null, null, null, searchDocument -> null, jsonpMapper)); }); diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java index 394702786..ecac2b7ac 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java @@ -27,6 +27,7 @@ import co.elastic.clients.transport.Version; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -475,8 +476,30 @@ private List> doMultiSearch(List multiS /** * value class combining the information needed for a single query in a multisearch request. */ - record MultiSearchQueryParameter(Query query, Class clazz, IndexCoordinates index) { + record MultiSearchQueryParameter(Query query, Class clazz, IndexCoordinates index) { } + + @Override + public String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) { + + Assert.notNull(index, "index must not be null"); + Assert.notNull(keepAlive, "keepAlive must not be null"); + Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null"); + + var request = requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable); + return execute(client -> client.openPointInTime(request)).id(); + } + + @Override + public Boolean closePointInTime(String pit) { + + Assert.notNull(pit, "pit must not be null"); + + ClosePointInTimeRequest request = requestConverter.searchClosePointInTime(pit); + var response = execute(client -> client.closePointInTime(request)); + return response.succeeded(); + } + // endregion // region client callback diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java index f1cc656a4..e8b674faa 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java @@ -32,16 +32,7 @@ import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; import co.elastic.clients.elasticsearch._types.query_dsl.Like; import co.elastic.clients.elasticsearch.cluster.HealthRequest; -import co.elastic.clients.elasticsearch.core.BulkRequest; -import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest; -import co.elastic.clients.elasticsearch.core.DeleteRequest; -import co.elastic.clients.elasticsearch.core.GetRequest; -import co.elastic.clients.elasticsearch.core.IndexRequest; -import co.elastic.clients.elasticsearch.core.MgetRequest; -import co.elastic.clients.elasticsearch.core.MsearchRequest; -import co.elastic.clients.elasticsearch.core.SearchRequest; -import co.elastic.clients.elasticsearch.core.UpdateByQueryRequest; -import co.elastic.clients.elasticsearch.core.UpdateRequest; +import co.elastic.clients.elasticsearch.core.*; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import co.elastic.clients.elasticsearch.core.bulk.CreateOperation; import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; @@ -52,6 +43,7 @@ import co.elastic.clients.elasticsearch.core.search.Rescore; import co.elastic.clients.elasticsearch.core.search.SourceConfig; import co.elastic.clients.elasticsearch.indices.*; +import co.elastic.clients.elasticsearch.indices.ExistsRequest; import co.elastic.clients.elasticsearch.indices.update_aliases.Action; import co.elastic.clients.json.JsonData; import co.elastic.clients.json.JsonpDeserializer; @@ -1164,10 +1156,24 @@ private void prepareSearchRequest(Query query, @Nullable Class clazz, Ind ElasticsearchPersistentEntity persistentEntity = getPersistentEntity(clazz); builder // - .index(Arrays.asList(indexNames)) // .version(true) // .trackScores(query.getTrackScores()); + var pointInTime = query.getPointInTime(); + if (pointInTime != null) { + builder.pit(pb -> pb.id(pointInTime.id()).keepAlive(time(pointInTime.keepAlive()))); + } else { + builder.index(Arrays.asList(indexNames)); + + if (query.getRoute() != null) { + builder.routing(query.getRoute()); + } + + if (query.getPreference() != null) { + builder.preference(query.getPreference()); + } + } + if (persistentEntity != null && persistentEntity.hasSeqNoPrimaryTermProperty()) { builder.seqNoPrimaryTerm(true); } @@ -1205,10 +1211,6 @@ private void prepareSearchRequest(Query query, @Nullable Class clazz, Ind builder.minScore((double) query.getMinScore()); } - if (query.getPreference() != null) { - builder.preference(query.getPreference()); - } - builder.searchType(searchType(query.getSearchType())); if (query.getSort() != null) { @@ -1233,10 +1235,6 @@ private void prepareSearchRequest(Query query, @Nullable Class clazz, Ind builder.trackTotalHits(th -> th.count(query.getTrackTotalHitsUpTo())); } - if (query.getRoute() != null) { - builder.routing(query.getRoute()); - } - builder.timeout(timeStringMs(query.getTimeout())); if (query.getExplain()) { @@ -1507,6 +1505,27 @@ public co.elastic.clients.elasticsearch._types.query_dsl.MoreLikeThisQuery moreL return moreLikeThisQuery; } + public OpenPointInTimeRequest searchOpenPointInTimeRequest(IndexCoordinates index, Duration keepAlive, + Boolean ignoreUnavailable) { + + Assert.notNull(index, "index must not be null"); + Assert.notNull(keepAlive, "keepAlive must not be null"); + Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null"); + + return OpenPointInTimeRequest.of(opit -> opit // + .index(Arrays.asList(index.getIndexNames())) // + .ignoreUnavailable(ignoreUnavailable) // + .keepAlive(time(keepAlive)) // + ); + } + + public ClosePointInTimeRequest searchClosePointInTime(String pit) { + + Assert.notNull(pit, "pit must not be null"); + + return ClosePointInTimeRequest.of(cpit -> cpit.id(pit)); + } + // endregion // region helper functions diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/SearchDocumentResponseBuilder.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/SearchDocumentResponseBuilder.java index 077456229..8e1b96e89 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/SearchDocumentResponseBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/SearchDocumentResponseBuilder.java @@ -75,8 +75,9 @@ public static SearchDocumentResponse from(ResponseBody response String scrollId = responseBody.scrollId(); Map aggregations = responseBody.aggregations(); Map>> suggest = responseBody.suggest(); + var pointInTimeId = responseBody.pitId(); - return from(hitsMetadata, scrollId, aggregations, suggest, entityCreator, jsonpMapper); + return from(hitsMetadata, scrollId, pointInTimeId, aggregations, suggest, entityCreator, jsonpMapper); } /** @@ -93,8 +94,9 @@ public static SearchDocumentResponse from(ResponseBody response * @return the {@link SearchDocumentResponse} */ public static SearchDocumentResponse from(HitsMetadata hitsMetadata, @Nullable String scrollId, - @Nullable Map aggregations, Map>> suggestES, - SearchDocumentResponse.EntityCreator entityCreator, JsonpMapper jsonpMapper) { + @Nullable String pointInTimeId, @Nullable Map aggregations, + Map>> suggestES, SearchDocumentResponse.EntityCreator entityCreator, + JsonpMapper jsonpMapper) { Assert.notNull(hitsMetadata, "hitsMetadata must not be null"); @@ -126,7 +128,7 @@ public static SearchDocumentResponse from(HitsMetadata hitsMetadata, @Nul Suggest suggest = suggestFrom(suggestES, entityCreator); - return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, + return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, pointInTimeId, searchDocuments, aggregationsContainer, suggest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/erhlc/SearchDocumentResponseBuilder.java b/src/main/java/org/springframework/data/elasticsearch/client/erhlc/SearchDocumentResponseBuilder.java index c31adce4b..b80af0c71 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/erhlc/SearchDocumentResponseBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/erhlc/SearchDocumentResponseBuilder.java @@ -113,7 +113,8 @@ public static SearchDocumentResponse from(SearchHits searchHits, @Nullable S : null; Suggest suggest = suggestFrom(suggestES, entityCreator); - return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, + // no pointInTimeId for the deprecated implementation + return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, null, searchDocuments, aggregationsContainer, suggest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index 2b8690729..919c221de 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -28,6 +28,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.convert.EntityReader; +import org.springframework.data.elasticsearch.client.UnsupportedClientOperationException; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.Document; @@ -423,6 +424,16 @@ public void searchScrollClear(String scrollId) { abstract public void searchScrollClear(List scrollIds); + @Override + public String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) { + throw new UnsupportedClientOperationException(getClass(), "openPointInTime"); + } + + @Override + public Boolean closePointInTime(String pit) { + throw new UnsupportedClientOperationException(getClass(), "closePointInTime"); + } + // endregion // region Helper methods diff --git a/src/main/java/org/springframework/data/elasticsearch/core/SearchHitMapping.java b/src/main/java/org/springframework/data/elasticsearch/core/SearchHitMapping.java index d7a40d18c..8e373449b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/SearchHitMapping.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/SearchHitMapping.java @@ -85,6 +85,7 @@ private SearchHitsImpl mapHitsFromResponse(SearchDocumentResponse searchDocum long totalHits = searchDocumentResponse.getTotalHits(); float maxScore = searchDocumentResponse.getMaxScore(); String scrollId = searchDocumentResponse.getScrollId(); + String pointInTimeId = searchDocumentResponse.getPointInTimeId(); List> searchHits = new ArrayList<>(); List searchDocuments = searchDocumentResponse.getSearchDocuments(); @@ -100,7 +101,8 @@ private SearchHitsImpl mapHitsFromResponse(SearchDocumentResponse searchDocum Suggest suggest = searchDocumentResponse.getSuggest(); mapHitsInCompletionSuggestion(suggest); - return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, searchHits, aggregations, suggest); + return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, pointInTimeId, searchHits, + aggregations, suggest); } @SuppressWarnings("unchecked") @@ -232,6 +234,7 @@ private SearchHits mapInnerDocuments(SearchHits searchHits, C searchHits.getTotalHitsRelation(), // searchHits.getMaxScore(), // scrollId, // + searchHits.getPointInTimeId(), // convertedSearchHits, // searchHits.getAggregations(), // searchHits.getSuggest()); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/SearchHits.java b/src/main/java/org/springframework/data/elasticsearch/core/SearchHits.java index 2d62fe295..c5c3d4e98 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/SearchHits.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/SearchHits.java @@ -100,4 +100,12 @@ default Iterator> iterator() { return getSearchHits().iterator(); } + /** + * When doing a search with a point in time, the response contains a new point in time id value. + * + * @return the new point in time id, if one was returned from Elasticsearch + * @since 5.0 + */ + @Nullable + String getPointInTimeId(); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/SearchHitsImpl.java b/src/main/java/org/springframework/data/elasticsearch/core/SearchHitsImpl.java index 76da69ef3..ac7557746 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/SearchHitsImpl.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/SearchHitsImpl.java @@ -41,6 +41,7 @@ public class SearchHitsImpl implements SearchScrollHits { private final Lazy>> unmodifiableSearchHits; @Nullable private final AggregationsContainer aggregations; @Nullable private final Suggest suggest; + @Nullable private String pointInTimeId; /** * @param totalHits the number of total hits for the search @@ -51,8 +52,8 @@ public class SearchHitsImpl implements SearchScrollHits { * @param aggregations the aggregations if available */ public SearchHitsImpl(long totalHits, TotalHitsRelation totalHitsRelation, float maxScore, @Nullable String scrollId, - List> searchHits, @Nullable AggregationsContainer aggregations, - @Nullable Suggest suggest) { + @Nullable String pointInTimeId, List> searchHits, + @Nullable AggregationsContainer aggregations, @Nullable Suggest suggest) { Assert.notNull(searchHits, "searchHits must not be null"); @@ -60,6 +61,7 @@ public SearchHitsImpl(long totalHits, TotalHitsRelation totalHitsRelation, float this.totalHitsRelation = totalHitsRelation; this.maxScore = maxScore; this.scrollId = scrollId; + this.pointInTimeId = pointInTimeId; this.searchHits = searchHits; this.aggregations = aggregations; this.suggest = suggest; @@ -110,6 +112,12 @@ public Suggest getSuggest() { return suggest; } + @Nullable + @Override + public String getPointInTimeId() { + return pointInTimeId; + } + @Override public String toString() { return "SearchHits{" + // @@ -117,6 +125,7 @@ public String toString() { ", totalHitsRelation=" + totalHitsRelation + // ", maxScore=" + maxScore + // ", scrollId='" + scrollId + '\'' + // + ", pointInTimeId='" + pointInTimeId + '\'' + // ", searchHits={" + searchHits.size() + " elements}" + // ", aggregations=" + aggregations + // '}'; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/SearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/SearchOperations.java index da2234c8e..b1ef67b69 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/SearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/SearchOperations.java @@ -15,6 +15,7 @@ */ package org.springframework.data.elasticsearch.core; +import java.time.Duration; import java.util.List; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -216,4 +217,35 @@ default SearchHit searchOne(Query query, Class clazz, IndexCoordinates * @since 4.3 */ Query idsQuery(List ids); + + /** + * Opens a point in time (pit) in Elasticsearch. + * + * @param index the index name(s) to use + * @param keepAlive the duration the pit shoult be kept alive + * @return the pit identifier + * @since 5.0 + */ + default String openPointInTime(IndexCoordinates index, Duration keepAlive) { + return openPointInTime(index, keepAlive, false); + } + /** + * Opens a point in time (pit) in Elasticsearch. + * + * @param index the index name(s) to use + * @param keepAlive the duration the pit shoult be kept alive + * @param ignoreUnavailable if {$literal true} the call will fail if any of the indices is missing or closed + * @return the pit identifier + * @since 5.0 + */ + String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable); + + /** + * Closes a point in time + * + * @param pit the pit identifier as returned by {@link #openPointInTime(IndexCoordinates, Duration, Boolean)} + * @return {@literal true} on success + * @since 5.0 + */ + Boolean closePointInTime(String pit); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java index f1d01e427..8d03a4b41 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java @@ -39,13 +39,16 @@ public class SearchDocumentResponse { @Nullable private final AggregationsContainer aggregations; @Nullable private final Suggest suggest; + @Nullable String pointInTimeId; + public SearchDocumentResponse(long totalHits, String totalHitsRelation, float maxScore, @Nullable String scrollId, - List searchDocuments, @Nullable AggregationsContainer aggregationsContainer, - @Nullable Suggest suggest) { + @Nullable String pointInTimeId, List searchDocuments, + @Nullable AggregationsContainer aggregationsContainer, @Nullable Suggest suggest) { this.totalHits = totalHits; this.totalHitsRelation = totalHitsRelation; this.maxScore = maxScore; this.scrollId = scrollId; + this.pointInTimeId = pointInTimeId; this.searchDocuments = searchDocuments; this.aggregations = aggregationsContainer; this.suggest = suggest; @@ -82,6 +85,14 @@ public Suggest getSuggest() { return suggest; } + /** + * @since 5.0 + */ + @Nullable + public String getPointInTimeId() { + return pointInTimeId; + } + /** * A function to convert a {@link SearchDocument} async into an entity. Asynchronous so that it can be used from the * imperative and the reactive code. diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java index 009a79a4e..825f188f2 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java @@ -72,6 +72,7 @@ public class BaseQuery implements Query { @Nullable protected Boolean requestCache; protected List idsWithRouting = Collections.emptyList(); protected final List runtimeFields = new ArrayList<>(); + @Nullable protected PointInTime pointInTime; public BaseQuery() {} @@ -83,7 +84,7 @@ public > BaseQuery(BaseQue this.storedFields = builder.getStoredFields(); this.sourceFilter = builder.getSourceFilter(); this.minScore = builder.getMinScore(); - this.ids = builder.getIds().isEmpty() ? null : builder.getIds(); + this.ids = builder.getIds() == null ? null : builder.getIds(); this.route = builder.getRoute(); this.searchType = builder.getSearchType(); this.indicesOptions = builder.getIndicesOptions(); @@ -101,6 +102,7 @@ public > BaseQuery(BaseQue this.rescorerQueries = builder.getRescorerQueries(); this.requestCache = builder.getRequestCache(); this.idsWithRouting = builder.getIdsWithRouting(); + this.pointInTime = builder.getPointInTime(); } @Override @@ -285,7 +287,6 @@ public boolean getTrackScores() { /** * Configures whether to track scores. * - * @param trackScores * @since 3.1 */ public void setTrackScores(boolean trackScores) { @@ -370,7 +371,6 @@ public Duration getTimeout() { /** * set the query timeout * - * @param timeout * @since 4.2 */ public void setTimeout(@Nullable Duration timeout) { @@ -451,4 +451,19 @@ public List getRuntimeFields() { public List getIndicesBoost() { return indicesBoost; } + + /** + * @since 5.0 + */ + @Nullable + public PointInTime getPointInTime() { + return pointInTime; + } + + /** + * @since 5.0 + */ + public void setPointInTime(@Nullable PointInTime pointInTime) { + this.pointInTime = pointInTime; + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQueryBuilder.java b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQueryBuilder.java index 5a351e99c..70fac56e0 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQueryBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQueryBuilder.java @@ -64,6 +64,7 @@ public abstract class BaseQueryBuilder idsWithRouting = new ArrayList<>(); protected final List runtimeFields = new ArrayList<>(); + @Nullable protected Query.PointInTime pointInTime; @Nullable public Sort getSort() { @@ -182,6 +183,14 @@ public List getRescorerQueries() { return rescorerQueries; } + /** + * @since 5.0 + */ + @Nullable + public Query.PointInTime getPointInTime() { + return pointInTime; + } + public SELF withPageable(Pageable pageable) { this.pageable = pageable; return self(); @@ -358,6 +367,14 @@ public SELF withRescorerQuery(RescorerQuery rescorerQuery) { return self(); } + /** + * @since 5.0 + */ + public SELF withPointInTime(@Nullable Query.PointInTime pointInTime) { + this.pointInTime = pointInTime; + return self(); + } + public abstract Q build(); private SELF self() { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/Query.java b/src/main/java/org/springframework/data/elasticsearch/core/query/Query.java index 86b71e95e..a88a307cb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/Query.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/Query.java @@ -439,6 +439,15 @@ default List getRescorerQueries() { @Nullable List getIndicesBoost(); + /** + * @return the point in time id to use in the query + * @since 5.0 + */ + @Nullable + default PointInTime getPointInTime() { + return null; + }; + /** * @since 4.3 */ @@ -451,13 +460,22 @@ enum SearchType { * * @since 4.3 */ - record IdWithRouting(String id, @Nullable String routing) { + record IdWithRouting(String id, @Nullable String routing) { public IdWithRouting { Assert.notNull(id, "id must not be null"); } + } + /** + * Desscribes the point in time parameters for a query + * + * @param id the point in time id + * @param keepAlive the new keep alive value to be sent with the query + * @since 5.0 + */ + record PointInTime(String id, Duration keepAlive) { } } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java b/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java index 57ea3f73d..9ea2e5aa2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java @@ -108,8 +108,8 @@ public Object execute(Object[] parameters) { int itemCount = (int) elasticsearchOperations.count(query, clazz, index); if (itemCount == 0) { - result = new SearchHitsImpl<>(0, TotalHitsRelation.EQUAL_TO, Float.NaN, null, Collections.emptyList(), null, - null); + result = new SearchHitsImpl<>(0, TotalHitsRelation.EQUAL_TO, Float.NaN, null, + query.getPointInTime() != null ? query.getPointInTime().id() : null, Collections.emptyList(), null, null); } else { query.setPageable(PageRequest.of(0, Math.max(1, itemCount))); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeELCIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeELCIntegrationTests.java new file mode 100644 index 000000000..7e2e0029d --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeELCIntegrationTests.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchTemplateConfiguration; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Peter-Josef Meisch + */ +@ContextConfiguration(classes = {PointInTimeELCIntegrationTests.Config.class }) +public class PointInTimeELCIntegrationTests extends PointInTimeIntegrationTests { + + @Configuration + @Import({ElasticsearchTemplateConfiguration.class }) + static class Config { + @Bean + IndexNameProvider indexNameProvider() { + return new IndexNameProvider("point-in-time"); + } + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeERHLCIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeERHLCIntegrationTests.java new file mode 100644 index 000000000..67d0e6127 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeERHLCIntegrationTests.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import org.junit.jupiter.api.Disabled; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.test.context.ContextConfiguration; + +/** + * This test class is disabled on purpose. PIT will be introduced in Spring Data Elasticsearch 5.0 where the old + * RestHighLevelClient and the {@link org.springframework.data.elasticsearch.client.erhlc.ElasticsearchRestTemplate} are + * deprecated. We therefore do not add new features to this implementation anymore. + * + * @author Peter-Josef Meisch + */ +@Disabled +@ContextConfiguration(classes = { PointInTimeERHLCIntegrationTests.Config.class }) +public class PointInTimeERHLCIntegrationTests extends PointInTimeIntegrationTests { + + @Configuration + @Import({ ElasticsearchRestTemplateConfiguration.class }) + static class Config { + @Bean + IndexNameProvider indexNameProvider() { + return new IndexNameProvider("point-in-time-es7"); + } + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeIntegrationTests.java new file mode 100644 index 000000000..b55147562 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/PointInTimeIntegrationTests.java @@ -0,0 +1,110 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import static org.assertj.core.api.Assertions.*; + +import java.time.Duration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.annotation.Id; +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.annotations.Field; +import org.springframework.data.elasticsearch.annotations.FieldType; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.Criteria; +import org.springframework.data.elasticsearch.core.query.CriteriaQuery; +import org.springframework.data.elasticsearch.core.query.CriteriaQueryBuilder; +import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.lang.Nullable; +import org.springframework.util.StringUtils; + +/** + * Integration tests for the point in time API. + * + * @author Peter-Josef Meisch + */ +@SpringIntegrationTest +public abstract class PointInTimeIntegrationTests { + + @Autowired ElasticsearchOperations operations; + @Autowired IndexNameProvider indexNameProvider; + @Nullable IndexOperations indexOperations; + + @BeforeEach + void setUp() { + indexNameProvider.increment(); + indexOperations = operations.indexOps(SampleEntity.class); + indexOperations.createWithMapping(); + } + + @Test + @Order(Integer.MAX_VALUE) + void cleanup() { + operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + '*')).delete(); + } + + @Test // #1684 + @DisplayName("should create pit search with it and delete it again") + void shouldCreatePitSearchWithItAndDeleteItAgain() { + + // insert 2 records, one smith + operations.save(new SampleEntity("1", "John", "Smith"), new SampleEntity("2", "Mike", "Cutter")); + + // seach for smith + var searchQuery = new CriteriaQuery(Criteria.where("lastName").is("Smith")); + var searchHits = operations.search(searchQuery, SampleEntity.class); + assertThat(searchHits.getTotalHits()).isEqualTo(1); + + // create pit + var pit = operations.openPointInTime(IndexCoordinates.of(indexNameProvider.indexName()), Duration.ofMinutes(10)); + assertThat(StringUtils.hasText(pit)).isTrue(); + + // add another smith + operations.save(new SampleEntity("3", "Harry", "Smith")); + + // search with pit -> 1 smith + var pitQuery = new CriteriaQueryBuilder(Criteria.where("lastName").is("Smith")) // + .withPointInTime(new Query.PointInTime(pit, Duration.ofMinutes(10))) // + .build(); + searchHits = operations.search(pitQuery, SampleEntity.class); + assertThat(searchHits.getTotalHits()).isEqualTo(1); + var newPit = searchHits.getPointInTimeId(); + assertThat(StringUtils.hasText(newPit)).isTrue(); + + // search without pit -> 2 smiths + searchHits = operations.search(searchQuery, SampleEntity.class); + assertThat(searchHits.getTotalHits()).isEqualTo(2); + + // close pit + var success = operations.closePointInTime(newPit); + assertThat(success).isTrue(); + } + + @Document(indexName = "#{@indexNameProvider.indexName()}") + record SampleEntity( // + @Nullable @Id String id, // + @Field(type = FieldType.Text) String firstName, // + @Field(type = FieldType.Text) String lastName // + ) { + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/SearchHitSupportTest.java b/src/test/java/org/springframework/data/elasticsearch/core/SearchHitSupportTest.java index f58c27d51..a451d8207 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/SearchHitSupportTest.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/SearchHitSupportTest.java @@ -65,7 +65,7 @@ void shouldReturnTheSameListInstanceInSearchHitsAndGetContent() { hits.add(new SearchHit<>(null, null, null, 0, null, null, null, null, null, null, "five")); SearchHits originalSearchHits = new SearchHitsImpl<>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, "scroll", - hits, null, null); + null, hits, null, null); SearchPage searchPage = SearchHitSupport.searchPageFor(originalSearchHits, PageRequest.of(0, 3)); SearchHits searchHits = searchPage.getSearchHits(); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/StreamQueriesTest.java b/src/test/java/org/springframework/data/elasticsearch/core/StreamQueriesTest.java index b92132020..1cbe5ac51 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/StreamQueriesTest.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/StreamQueriesTest.java @@ -180,6 +180,6 @@ void shouldOnlyReturnRequestedCount() { } private SearchScrollHits newSearchScrollHits(List> hits, String scrollId) { - return new SearchHitsImpl<>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, scrollId, hits, null, null); + return new SearchHitsImpl<>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, scrollId, null, hits, null, null); } } diff --git a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/SpringDataElasticsearchExtension.java b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/SpringDataElasticsearchExtension.java index 962c31b55..ec110393c 100644 --- a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/SpringDataElasticsearchExtension.java +++ b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/SpringDataElasticsearchExtension.java @@ -33,7 +33,7 @@ import org.springframework.test.context.MergedContextConfiguration; /** - * This extension class check in the {@link #beforeAll(ExtensionContext)} call if there is already a Elasticsearch + * This extension class check in the {@link #beforeAll(ExtensionContext)} call if there is already an Elasticsearch * cluster connection defined in the root store. If no, the connection to the cluster is defined according to the * configuration, starting a local node if necessary. The connection is stored and will be closed when the store is * shutdown at the end of all tests. diff --git a/src/test/java/org/springframework/data/elasticsearch/utils/IndexNameProvider.java b/src/test/java/org/springframework/data/elasticsearch/utils/IndexNameProvider.java index 231803e19..5cd3d72bf 100644 --- a/src/test/java/org/springframework/data/elasticsearch/utils/IndexNameProvider.java +++ b/src/test/java/org/springframework/data/elasticsearch/utils/IndexNameProvider.java @@ -35,7 +35,7 @@ public IndexNameProvider(String prefix) { } public void increment() { - indexName = prefix + "-" + ++idx; + indexName = prefix + '-' + ++idx; } public String indexName() {