Implement the point in time API. #2273


Merged (1 commit) on Aug 17, 2022
43 changes: 41 additions & 2 deletions src/main/asciidoc/reference/elasticsearch-misc.adoc
@@ -121,7 +121,7 @@ Query searchQuery = NativeQuery.builder()
.withPageable(PageRequest.of(0, 10))
.build();

SearchHitsIterator<SampleEntity> stream = elasticsearchOperations.searchForStream(searchQuery, SampleEntity.class,
index);

List<SampleEntity> sampleEntities = new ArrayList<>();
@@ -134,7 +134,7 @@ stream.close();
====

There are no methods in the `SearchOperations` API to access the scroll id. If it is necessary to access it, the
following methods of `AbstractElasticsearchTemplate` (the base implementation for the different
`ElasticsearchOperations` implementations) can be used:
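
The committed example block that follows is collapsed in this diff view. As a placeholder, here is a hedged sketch of how those scroll methods are typically used; the method names and parameter order are recalled from the codebase at the time of this PR and should be checked against the collapsed example, and the `template` variable, `SampleEntity` class, `searchQuery`, `index` and the 1000 ms scroll timeout are illustrative assumptions, not part of this PR:

// start a scrolled search; 1000 ms is the keep-alive between scroll calls (illustrative value)
SearchScrollHits<SampleEntity> scroll = template.searchScrollStart(1000, searchQuery, SampleEntity.class, index);
String scrollId = scroll.getScrollId();
while (scroll.hasSearchHits()) {
    // process scroll.getSearchHits() ...
    scrollId = scroll.getScrollId();
    scroll = template.searchScrollContinue(scrollId, 1000, SampleEntity.class, index);
}
// release the server-side scroll context when done
template.searchScrollClear(scrollId);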

====
@@ -275,3 +275,42 @@ SearchHits<SomethingToBuy> searchHits = operations.search(query, SomethingToBuy.
====

This works with every implementation of the `Query` interface.

[[elasticsearch.misc.point-in-time]]
== Point In Time (PIT) API

`ElasticsearchOperations` supports the point in time API of Elasticsearch (see
https://www.elastic.co/guide/en/elasticsearch/reference/8.3/point-in-time-api.html).

Review comments on this hunk:

Contributor: wrong line break here 😄

Collaborator (author): Thanks, I'm currently checking the documentation anyway for the next release, so this will go in there as well.

Contributor: By the way, will PIT be added in a 4.x release? I didn't find milestones related to this PR.

Collaborator (author): This change was in 5.0.M6 (I obviously was neglecting to set the milestones when closing issues this summer), so it will be in version 5. It won't be backported to 4.4: after a minor version is released, it is only updated with bugfixes and dependency patch updates, not with new features or API changes.

Contributor: gotcha, thanks~

The following code snippet shows how to use this feature with a fictional `Person` class:

====
[source,java]
----
ElasticsearchOperations operations; // autowired
Duration tenSeconds = Duration.ofSeconds(10);

String pit = operations.openPointInTime(IndexCoordinates.of("person"), tenSeconds); <.>

// create query for the pit
Query query1 = new CriteriaQueryBuilder(Criteria.where("lastName").is("Smith"))
.withPointInTime(new Query.PointInTime(pit, tenSeconds)) <.>
.build();
SearchHits<Person> searchHits1 = operations.search(query1, Person.class);
// do something with the data

// create 2nd query for the pit, use the id returned in the previous result
Query query2 = new CriteriaQueryBuilder(Criteria.where("lastName").is("Miller"))
.withPointInTime(
new Query.PointInTime(searchHits1.getPointInTimeId(), tenSeconds)) <.>
.build();
SearchHits<Person> searchHits2 = operations.search(query2, Person.class);
// do something with the data

operations.closePointInTime(searchHits2.getPointInTimeId()); <.>

----
<.> create a point in time for an index (can be multiple names) and a keep-alive duration and retrieve its id
<.> pass that id into the query to search together with the next keep-alive value
<.> for the next query, use the id returned from the previous search
<.> when done, close the point in time using the last returned id
====
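
Not part of the committed documentation above, but grounded in the code further down in this PR: there is also a three-argument `openPointInTime(IndexCoordinates, Duration, Boolean ignoreUnavailable)` overload. A minimal sketch of using it, assuming the overload is reachable through `ElasticsearchOperations` as the `@Override` annotations below suggest; the second index name and the keep-alive value are made-up examples:

ElasticsearchOperations operations; // autowired, as in the example above
Duration keepAlive = Duration.ofMinutes(1);

// Boolean.TRUE: do not fail if one of the given index names does not exist
String pit = operations.openPointInTime(IndexCoordinates.of("person", "person-archive"), keepAlive, Boolean.TRUE);

Query query = new CriteriaQueryBuilder(Criteria.where("lastName").is("Smith"))
    .withPointInTime(new Query.PointInTime(pit, keepAlive))
    .build();
SearchHits<Person> hits = operations.search(query, Person.class);

// close with the id returned in the last result, as in the example above
operations.closePointInTime(hits.getPointInTimeId());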
@@ -26,9 +26,6 @@
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;

import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.springframework.data.elasticsearch.client.erhlc.ReactiveRestClients;
import org.springframework.data.elasticsearch.client.erhlc.RestClients;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.WebClient;
@@ -31,8 +31,6 @@
import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint;
import org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder;
import org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder;
import org.springframework.data.elasticsearch.client.erhlc.ReactiveRestClients;
import org.springframework.data.elasticsearch.client.erhlc.RestClients;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -0,0 +1,25 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client;

/**
* @author Peter-Josef Meisch
*/
public class UnsupportedClientOperationException extends RuntimeException {
public UnsupportedClientOperationException(Class<?> clientClass, String operation) {
super("Client %1$s does not support the operation %2$s".formatted(clientClass, operation));
}
}
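
A brief, hedged usage note on this new exception (the fallback shown is only an example, not something this PR prescribes): since it is a `RuntimeException`, callers that may run against an implementation without point in time support can catch it explicitly.

try {
    String pit = operations.openPointInTime(IndexCoordinates.of("person"), Duration.ofMinutes(1));
    // ... run point in time searches, closing the PIT with the id from the last SearchHits ...
} catch (UnsupportedClientOperationException e) {
    // the configured ElasticsearchOperations implementation does not override the
    // point in time methods (see the AbstractElasticsearchTemplate defaults later in
    // this diff); fall back to a regular search or a scroll query
}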
@@ -73,7 +73,7 @@ public static SearchDocument from(Hit<?> hit, JsonpMapper jsonpMapper) {
Map<String, SearchDocumentResponse> innerHits = new LinkedHashMap<>();
hit.innerHits().forEach((name, innerHitsResult) -> {
// noinspection ReturnOfNull
innerHits.put(name, SearchDocumentResponseBuilder.from(innerHitsResult.hits(), null, null, null,
innerHits.put(name, SearchDocumentResponseBuilder.from(innerHitsResult.hits(), null, null, null, null,
searchDocument -> null, jsonpMapper));
});

@@ -27,6 +27,7 @@
import co.elastic.clients.transport.Version;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -475,8 +476,30 @@ private List<SearchHits<?>> doMultiSearch(List<MultiSearchQueryParameter> multiS
/**
* value class combining the information needed for a single query in a multisearch request.
*/
record MultiSearchQueryParameter(Query query, Class<?> clazz, IndexCoordinates index) {
}

@Override
public String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) {

Assert.notNull(index, "index must not be null");
Assert.notNull(keepAlive, "keepAlive must not be null");
Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null");

var request = requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable);
return execute(client -> client.openPointInTime(request)).id();
}

@Override
public Boolean closePointInTime(String pit) {

Assert.notNull(pit, "pit must not be null");

ClosePointInTimeRequest request = requestConverter.searchClosePointInTime(pit);
var response = execute(client -> client.closePointInTime(request));
return response.succeeded();
}

// endregion

// region client callback
@@ -32,16 +32,7 @@
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch._types.query_dsl.Like;
import co.elastic.clients.elasticsearch.cluster.HealthRequest;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
import co.elastic.clients.elasticsearch.core.DeleteRequest;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.MgetRequest;
import co.elastic.clients.elasticsearch.core.MsearchRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.UpdateByQueryRequest;
import co.elastic.clients.elasticsearch.core.UpdateRequest;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
@@ -52,6 +43,7 @@
import co.elastic.clients.elasticsearch.core.search.Rescore;
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.elasticsearch.indices.update_aliases.Action;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpDeserializer;
@@ -1164,10 +1156,24 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind
ElasticsearchPersistentEntity<?> persistentEntity = getPersistentEntity(clazz);

builder //
.index(Arrays.asList(indexNames)) //
.version(true) //
.trackScores(query.getTrackScores());

var pointInTime = query.getPointInTime();
if (pointInTime != null) {
builder.pit(pb -> pb.id(pointInTime.id()).keepAlive(time(pointInTime.keepAlive())));
} else {
builder.index(Arrays.asList(indexNames));

if (query.getRoute() != null) {
builder.routing(query.getRoute());
}

if (query.getPreference() != null) {
builder.preference(query.getPreference());
}
}

if (persistentEntity != null && persistentEntity.hasSeqNoPrimaryTermProperty()) {
builder.seqNoPrimaryTerm(true);
}
@@ -1205,10 +1211,6 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind
builder.minScore((double) query.getMinScore());
}

if (query.getPreference() != null) {
builder.preference(query.getPreference());
}

builder.searchType(searchType(query.getSearchType()));

if (query.getSort() != null) {
@@ -1233,10 +1235,6 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind
builder.trackTotalHits(th -> th.count(query.getTrackTotalHitsUpTo()));
}

if (query.getRoute() != null) {
builder.routing(query.getRoute());
}

builder.timeout(timeStringMs(query.getTimeout()));

if (query.getExplain()) {
@@ -1507,6 +1505,27 @@ public co.elastic.clients.elasticsearch._types.query_dsl.MoreLikeThisQuery moreL
return moreLikeThisQuery;
}

public OpenPointInTimeRequest searchOpenPointInTimeRequest(IndexCoordinates index, Duration keepAlive,
Boolean ignoreUnavailable) {

Assert.notNull(index, "index must not be null");
Assert.notNull(keepAlive, "keepAlive must not be null");
Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null");

return OpenPointInTimeRequest.of(opit -> opit //
.index(Arrays.asList(index.getIndexNames())) //
.ignoreUnavailable(ignoreUnavailable) //
.keepAlive(time(keepAlive)) //
);
}

public ClosePointInTimeRequest searchClosePointInTime(String pit) {

Assert.notNull(pit, "pit must not be null");

return ClosePointInTimeRequest.of(cpit -> cpit.id(pit));
}

// endregion

// region helper functions
@@ -75,8 +75,9 @@ public static <T> SearchDocumentResponse from(ResponseBody<EntityAsMap> response
String scrollId = responseBody.scrollId();
Map<String, Aggregate> aggregations = responseBody.aggregations();
Map<String, List<Suggestion<EntityAsMap>>> suggest = responseBody.suggest();
var pointInTimeId = responseBody.pitId();

return from(hitsMetadata, scrollId, aggregations, suggest, entityCreator, jsonpMapper);
return from(hitsMetadata, scrollId, pointInTimeId, aggregations, suggest, entityCreator, jsonpMapper);
}

/**
@@ -93,8 +94,9 @@ public static <T> SearchDocumentResponse from(ResponseBody<EntityAsMap> response
* @return the {@link SearchDocumentResponse}
*/
public static <T> SearchDocumentResponse from(HitsMetadata<?> hitsMetadata, @Nullable String scrollId,
@Nullable Map<String, Aggregate> aggregations, Map<String, List<Suggestion<EntityAsMap>>> suggestES,
SearchDocumentResponse.EntityCreator<T> entityCreator, JsonpMapper jsonpMapper) {
@Nullable String pointInTimeId, @Nullable Map<String, Aggregate> aggregations,
Map<String, List<Suggestion<EntityAsMap>>> suggestES, SearchDocumentResponse.EntityCreator<T> entityCreator,
JsonpMapper jsonpMapper) {

Assert.notNull(hitsMetadata, "hitsMetadata must not be null");

@@ -126,7 +128,7 @@ public static <T> SearchDocumentResponse from(HitsMetadata<?> hitsMetadata, @Nul

Suggest suggest = suggestFrom(suggestES, entityCreator);

return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments,
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, pointInTimeId, searchDocuments,
aggregationsContainer, suggest);
}

@@ -113,7 +113,8 @@ public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable S
: null;
Suggest suggest = suggestFrom(suggestES, entityCreator);

return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments,
// no pointInTimeId for the deprecated implementation
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, null, searchDocuments,
aggregationsContainer, suggest);
}

@@ -28,6 +28,7 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.elasticsearch.client.UnsupportedClientOperationException;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
@@ -423,6 +424,16 @@ public void searchScrollClear(String scrollId) {

abstract public void searchScrollClear(List<String> scrollIds);

@Override
public String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) {
throw new UnsupportedClientOperationException(getClass(), "openPointInTime");
}

@Override
public Boolean closePointInTime(String pit) {
throw new UnsupportedClientOperationException(getClass(), "closePointInTime");
}

// endregion

// region Helper methods
@@ -85,6 +85,7 @@ private SearchHitsImpl<T> mapHitsFromResponse(SearchDocumentResponse searchDocum
long totalHits = searchDocumentResponse.getTotalHits();
float maxScore = searchDocumentResponse.getMaxScore();
String scrollId = searchDocumentResponse.getScrollId();
String pointInTimeId = searchDocumentResponse.getPointInTimeId();

List<SearchHit<T>> searchHits = new ArrayList<>();
List<SearchDocument> searchDocuments = searchDocumentResponse.getSearchDocuments();
Expand All @@ -100,7 +101,8 @@ private SearchHitsImpl<T> mapHitsFromResponse(SearchDocumentResponse searchDocum
Suggest suggest = searchDocumentResponse.getSuggest();
mapHitsInCompletionSuggestion(suggest);

return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, searchHits, aggregations, suggest);
return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, pointInTimeId, searchHits,
aggregations, suggest);
}

@SuppressWarnings("unchecked")
@@ -232,6 +234,7 @@ private SearchHits<?> mapInnerDocuments(SearchHits<SearchDocument> searchHits, C
searchHits.getTotalHitsRelation(), //
searchHits.getMaxScore(), //
scrollId, //
searchHits.getPointInTimeId(), //
convertedSearchHits, //
searchHits.getAggregations(), //
searchHits.getSuggest());
@@ -100,4 +100,12 @@ default Iterator<SearchHit<T>> iterator() {
return getSearchHits().iterator();
}

/**
* When doing a search with a point in time, the response contains a new point in time id value.
*
* @return the new point in time id, if one was returned from Elasticsearch
* @since 5.0
*/
@Nullable
String getPointInTimeId();
}