Skip to content

[DE-510] Retriable cursor (v6) #506

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/main/java/com/arangodb/ArangoCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@

import com.arangodb.entity.CursorEntity.Stats;
import com.arangodb.entity.CursorEntity.Warning;
import com.arangodb.model.AqlQueryOptions;


import java.io.Closeable;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;

/**
* @author Mark Vollmary
Expand Down Expand Up @@ -76,4 +79,27 @@ public interface ArangoCursor<T> extends ArangoIterable<T>, ArangoIterator<T>, C
*/
boolean isPotentialDirtyRead();

/**
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
* 1 with every batch. Only set if the allowRetry query option is enabled.
* @since ArangoDB 3.11
*/
String getNextBatchId();

/**
* Returns the next element in the iteration.
* <p/>
* If the cursor allows retries (see {@link AqlQueryOptions#allowRetry(Boolean)}), then it is safe to retry invoking
* this method in case of I/O exceptions (which are actually thrown as {@link com.arangodb.ArangoDBException} with
* cause {@link java.io.IOException}).
* <p/>
* If the cursor does not allow retries (default), then it is not safe to retry invoking this method in case of I/O
* exceptions, since the request to fetch the next batch is not idempotent (i.e. the cursor may advance multiple
* times on the server).
*
* @return the next element in the iteration
* @throws NoSuchElementException if the iteration has no more elements
*/
@Override
T next();
}
14 changes: 14 additions & 0 deletions src/main/java/com/arangodb/ArangoDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,20 @@ <T> ArangoCursor<T> query(String query, Map<String, Object> bindVars, AqlQueryOp
*/
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type) throws ArangoDBException;

/**
* Return an cursor from the given cursor-ID if still existing
*
* @param cursorId The ID of the cursor
* @param type The type of the result (POJO class, VPackSlice, String for JSON, or Collection/List/Map)
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
* {@link AqlQueryOptions#allowRetry(Boolean)}
* @return cursor of the results
* @see <a href= "https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors
* .html#read-next-batch-from-cursor">API Documentation</a>
* @since ArangoDB 3.11
*/
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId);

/**
* Explain an AQL query and return information about it
*
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/arangodb/async/ArangoDatabaseAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.arangodb.async;

import com.arangodb.ArangoCursor;
import com.arangodb.ArangoDBException;
import com.arangodb.ArangoSerializationAccessor;
import com.arangodb.DbName;
Expand Down Expand Up @@ -331,6 +332,20 @@ <T> CompletableFuture<ArangoCursorAsync<T>> query(
*/
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type);

/**
* Return an cursor from the given cursor-ID if still existing
*
* @param cursorId The ID of the cursor
* @param type The type of the result (POJO class, VPackSlice, String for JSON, or Collection/List/Map)
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
* {@link AqlQueryOptions#allowRetry(Boolean)}
* @return cursor of the results
* @see <a href= "https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors
* .html#read-next-batch-from-cursor">API Documentation</a>
* @since ArangoDB 3.11
*/
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId);

/**
* Explain an AQL query and return information about it
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,28 @@ public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId,
return execution.thenApply(result -> createCursor(result, type, null, hostHandle));
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type,
final String nextBatchId) {
final HostHandle hostHandle = new HostHandle();
final CompletableFuture<CursorEntity> execution = executor.execute(queryNextByBatchIdRequest(cursorId,
nextBatchId, null,
null),
CursorEntity.class, hostHandle);
return execution.thenApply(result -> createCursor(result, type, null, hostHandle));
}

private <T> ArangoCursorAsync<T> createCursor(
final CursorEntity result,
final Class<T> type,
final AqlQueryOptions options,
final HostHandle hostHandle) {
return new ArangoCursorAsyncImpl<>(this, new ArangoCursorExecute() {
@Override
public CursorEntity next(final String id, Map<String, String> meta) {
final CompletableFuture<CursorEntity> result = executor.execute(queryNextRequest(id, options, meta),
CursorEntity.class, hostHandle);
public CursorEntity next(final String id, Map<String, String> meta, String nextBatchId) {
Request request = nextBatchId == null ?
queryNextRequest(id, options, meta) : queryNextByBatchIdRequest(id, nextBatchId, options, meta);
final CompletableFuture<CursorEntity> result = executor.execute(request, CursorEntity.class, hostHandle);
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/arangodb/entity/CursorEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class CursorEntity implements Entity, MetaAware {
private VPackSlice result;

private Map<String, String> meta;
private String nextBatchId;

public String getId() {
return id;
Expand Down Expand Up @@ -94,6 +95,10 @@ public Map<String, String> getMeta() {
return meta;
}

public String getNextBatchId() {
return nextBatchId;
}

/**
* @return remove not allowed (valid storable) meta information
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
public interface ArangoCursorExecute {

CursorEntity next(String id, Map<String, String> meta) throws ArangoDBException;
CursorEntity next(String id, Map<String, String> meta, String nextBatchId) throws ArangoDBException;

void close(String id, Map<String, String> meta) throws ArangoDBException;

Expand Down
14 changes: 12 additions & 2 deletions src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,14 @@ public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) th
return createCursor(result, type, null, hostHandle);
}

@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
final HostHandle hostHandle = new HostHandle();
final CursorEntity result = executor
.execute(queryNextByBatchIdRequest(cursorId, nextBatchId, null, null), CursorEntity.class, hostHandle);
return createCursor(result, type, null, hostHandle);
}

private <T> ArangoCursor<T> createCursor(
final CursorEntity result,
final Class<T> type,
Expand All @@ -208,8 +216,10 @@ private <T> ArangoCursor<T> createCursor(

final ArangoCursorExecute execute = new ArangoCursorExecute() {
@Override
public CursorEntity next(final String id, Map<String, String> meta) {
return executor.execute(queryNextRequest(id, options, meta), CursorEntity.class, hostHandle);
public CursorEntity next(String id, Map<String, String> meta, String nextBatchId) {
Request request = nextBatchId == null ?
queryNextRequest(id, options, meta) : queryNextByBatchIdRequest(id, nextBatchId, options, meta);
return executor.execute(request, CursorEntity.class, hostHandle);
}

@Override
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/arangodb/internal/InternalArangoDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,17 @@ protected Request queryRequest(
return request;
}

protected Request queryNextRequest(final String id, final AqlQueryOptions options, Map<String, String> meta) {

protected Request queryNextRequest(String id, AqlQueryOptions options, Map<String, String> meta) {
final Request request = request(dbName, RequestType.POST, PATH_API_CURSOR, id);
return completeQueryNextRequest(request, options, meta);
}

protected Request queryNextByBatchIdRequest(String id, String nextBatchId, AqlQueryOptions options, Map<String, String> meta) {
final Request request = request(dbName, RequestType.POST, PATH_API_CURSOR, id, nextBatchId);
return completeQueryNextRequest(request, options, meta);
}

private Request completeQueryNextRequest(Request request, AqlQueryOptions options, Map<String, String> meta) {
if (meta != null) {
request.getHeaderParam().putAll(meta);
}
Expand Down
22 changes: 18 additions & 4 deletions src/main/java/com/arangodb/internal/cursor/ArangoCursorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.arangodb.entity.CursorEntity.Warning;
import com.arangodb.internal.ArangoCursorExecute;
import com.arangodb.internal.InternalArangoDatabase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -38,12 +40,14 @@
* @author Mark Vollmary
*/
public class ArangoCursorImpl<T> extends AbstractArangoIterable<T> implements ArangoCursor<T> {
private final static Logger LOG = LoggerFactory.getLogger(ArangoCursorImpl.class);

private final Class<T> type;
protected final ArangoCursorIterator<T> iterator;
private final String id;
private final ArangoCursorExecute execute;
private final boolean isPontentialDirtyRead;
private final boolean pontentialDirtyRead;
private final boolean allowRetry;

public ArangoCursorImpl(final InternalArangoDatabase<?, ?> db, final ArangoCursorExecute execute,
final Class<T> type, final CursorEntity result) {
Expand All @@ -52,7 +56,8 @@ public ArangoCursorImpl(final InternalArangoDatabase<?, ?> db, final ArangoCurso
this.type = type;
iterator = createIterator(this, db, execute, result);
id = result.getId();
this.isPontentialDirtyRead = Boolean.parseBoolean(result.getMeta().get("X-Arango-Potential-Dirty-Read"));
this.pontentialDirtyRead = Boolean.parseBoolean(result.getMeta().get("X-Arango-Potential-Dirty-Read"));
this.allowRetry = result.getNextBatchId() != null;
}

protected ArangoCursorIterator<T> createIterator(
Expand Down Expand Up @@ -98,7 +103,7 @@ public boolean isCached() {

@Override
public void close() {
if (id != null && hasNext()) {
if (getId() != null && (allowRetry || iterator.getResult().getHasMore())) {
execute.close(id, iterator.getResult().getMeta());
}
}
Expand All @@ -119,12 +124,17 @@ public List<T> asListRemaining() {
while (hasNext()) {
remaining.add(next());
}
try {
close();
} catch (final Exception e) {
LOG.warn("Could not close cursor: ", e);
}
return remaining;
}

@Override
public boolean isPotentialDirtyRead() {
return isPontentialDirtyRead;
return pontentialDirtyRead;
}

@Override
Expand All @@ -144,4 +154,8 @@ public void foreach(final Consumer<? super T> action) {
}
}

public String getNextBatchId() {
return iterator.getResult().getNextBatchId();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public class ArangoCursorIterator<T> implements ArangoIterator<T> {
private final InternalArangoDatabase<?, ?> db;
private final ArangoCursorExecute execute;

protected ArangoCursorIterator(final ArangoCursor<T> cursor, final ArangoCursorExecute execute,
protected ArangoCursorIterator(final ArangoCursor<T> cursor,
final ArangoCursorExecute execute,
final InternalArangoDatabase<?, ?> db, final CursorEntity result) {
super();
this.cursor = cursor;
this.execute = execute;
this.db = db;
Expand All @@ -66,7 +66,7 @@ public boolean hasNext() {
@Override
public T next() {
if (!arrayIterator.hasNext() && result.getHasMore()) {
result = execute.next(cursor.getId(), result.getMeta());
result = execute.next(cursor.getId(), result.getMeta(), result.getNextBatchId());
arrayIterator = result.getResult().arrayIterator();
}
if (!hasNext()) {
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/arangodb/model/AqlQueryOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,33 @@ public AqlQueryOptions forceOneShardAttributeValue(final String forceOneShardAtt
return this;
}

public Boolean getAllowRetry() {
return getOptions().allowRetry;
}

/**
* @param allowRetry Set this option to true to make it possible to retry fetching the latest batch from a cursor.
* <p/>
* This makes possible to safely retry invoking {@link com.arangodb.ArangoCursor#next()} in
* case of I/O exceptions (which are actually thrown as {@link com.arangodb.ArangoDBException}
* with cause {@link java.io.IOException})
* <p/>
* If set to false (default), then it is not safe to retry invoking
* {@link com.arangodb.ArangoCursor#next()} in case of I/O exceptions, since the request to
* fetch the next batch is not idempotent (i.e. the cursor may advance multiple times on the
* server).
* <p/>
* Note: once you successfully received the last batch, you should call
* {@link com.arangodb.ArangoCursor#close()} so that the server does not unnecessary keep the
* batch until the cursor times out ({@link AqlQueryOptions#ttl(Integer)}).
* @return options
* @since ArangoDB 3.11
*/
public AqlQueryOptions allowRetry(final Boolean allowRetry) {
getOptions().allowRetry = allowRetry;
return this;
}

private Options getOptions() {
if (options == null) {
options = new Options();
Expand Down Expand Up @@ -463,6 +490,7 @@ public static class Options implements Serializable, Cloneable {
private Double maxRuntime;
private Boolean fillBlockCache;
private String forceOneShardAttributeValue;
private Boolean allowRetry;

protected Optimizer getOptimizer() {
if (optimizer == null) {
Expand Down
Loading