Skip to content

Commit de0c195

Browse files
authored
[DE-510] Retriable cursor (v6) (#506)
* sync implementation * async implementation * sync tests * async tests
1 parent d546094 commit de0c195

13 files changed

+292
-57
lines changed

src/main/java/com/arangodb/ArangoCursor.java

+26
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@
2222

2323
import com.arangodb.entity.CursorEntity.Stats;
2424
import com.arangodb.entity.CursorEntity.Warning;
25+
import com.arangodb.model.AqlQueryOptions;
26+
2527

2628
import java.io.Closeable;
2729
import java.util.Collection;
2830
import java.util.List;
31+
import java.util.NoSuchElementException;
2932

3033
/**
3134
* @author Mark Vollmary
@@ -76,4 +79,27 @@ public interface ArangoCursor<T> extends ArangoIterable<T>, ArangoIterator<T>, C
7679
*/
7780
boolean isPotentialDirtyRead();
7881

82+
/**
83+
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
84+
* 1 with every batch. Only set if the allowRetry query option is enabled.
85+
* @since ArangoDB 3.11
86+
*/
87+
String getNextBatchId();
88+
89+
/**
90+
* Returns the next element in the iteration.
91+
* <p/>
92+
* If the cursor allows retries (see {@link AqlQueryOptions#allowRetry(Boolean)}), then it is safe to retry invoking
93+
* this method in case of I/O exceptions (which are actually thrown as {@link com.arangodb.ArangoDBException} with
94+
* cause {@link java.io.IOException}).
95+
* <p/>
96+
* If the cursor does not allow retries (default), then it is not safe to retry invoking this method in case of I/O
97+
* exceptions, since the request to fetch the next batch is not idempotent (i.e. the cursor may advance multiple
98+
* times on the server).
99+
*
100+
* @return the next element in the iteration
101+
* @throws NoSuchElementException if the iteration has no more elements
102+
*/
103+
@Override
104+
T next();
79105
}

src/main/java/com/arangodb/ArangoDatabase.java

+14
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,20 @@ <T> ArangoCursor<T> query(String query, Map<String, Object> bindVars, AqlQueryOp
338338
*/
339339
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type) throws ArangoDBException;
340340

341+
/**
342+
* Return an cursor from the given cursor-ID if still existing
343+
*
344+
* @param cursorId The ID of the cursor
345+
* @param type The type of the result (POJO class, VPackSlice, String for JSON, or Collection/List/Map)
346+
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
347+
* {@link AqlQueryOptions#allowRetry(Boolean)}
348+
* @return cursor of the results
349+
* @see <a href= "https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors
350+
* .html#read-next-batch-from-cursor">API Documentation</a>
351+
* @since ArangoDB 3.11
352+
*/
353+
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId);
354+
341355
/**
342356
* Explain an AQL query and return information about it
343357
*

src/main/java/com/arangodb/async/ArangoDatabaseAsync.java

+15
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.arangodb.async;
2222

23+
import com.arangodb.ArangoCursor;
2324
import com.arangodb.ArangoDBException;
2425
import com.arangodb.ArangoSerializationAccessor;
2526
import com.arangodb.DbName;
@@ -331,6 +332,20 @@ <T> CompletableFuture<ArangoCursorAsync<T>> query(
331332
*/
332333
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type);
333334

335+
/**
336+
* Return an cursor from the given cursor-ID if still existing
337+
*
338+
* @param cursorId The ID of the cursor
339+
* @param type The type of the result (POJO class, VPackSlice, String for JSON, or Collection/List/Map)
340+
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
341+
* {@link AqlQueryOptions#allowRetry(Boolean)}
342+
* @return cursor of the results
343+
* @see <a href= "https://www.arangodb.com/docs/stable/http/aql-query-cursor-accessing-cursors
344+
* .html#read-next-batch-from-cursor">API Documentation</a>
345+
* @since ArangoDB 3.11
346+
*/
347+
<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId);
348+
334349
/**
335350
* Explain an AQL query and return information about it
336351
*

src/main/java/com/arangodb/async/internal/ArangoDatabaseAsyncImpl.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -223,16 +223,28 @@ public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId,
223223
return execution.thenApply(result -> createCursor(result, type, null, hostHandle));
224224
}
225225

226+
@Override
227+
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type,
228+
final String nextBatchId) {
229+
final HostHandle hostHandle = new HostHandle();
230+
final CompletableFuture<CursorEntity> execution = executor.execute(queryNextByBatchIdRequest(cursorId,
231+
nextBatchId, null,
232+
null),
233+
CursorEntity.class, hostHandle);
234+
return execution.thenApply(result -> createCursor(result, type, null, hostHandle));
235+
}
236+
226237
private <T> ArangoCursorAsync<T> createCursor(
227238
final CursorEntity result,
228239
final Class<T> type,
229240
final AqlQueryOptions options,
230241
final HostHandle hostHandle) {
231242
return new ArangoCursorAsyncImpl<>(this, new ArangoCursorExecute() {
232243
@Override
233-
public CursorEntity next(final String id, Map<String, String> meta) {
234-
final CompletableFuture<CursorEntity> result = executor.execute(queryNextRequest(id, options, meta),
235-
CursorEntity.class, hostHandle);
244+
public CursorEntity next(final String id, Map<String, String> meta, String nextBatchId) {
245+
Request request = nextBatchId == null ?
246+
queryNextRequest(id, options, meta) : queryNextByBatchIdRequest(id, nextBatchId, options, meta);
247+
final CompletableFuture<CursorEntity> result = executor.execute(request, CursorEntity.class, hostHandle);
236248
try {
237249
return result.get();
238250
} catch (InterruptedException | ExecutionException e) {

src/main/java/com/arangodb/entity/CursorEntity.java

+5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class CursorEntity implements Entity, MetaAware {
4343
private VPackSlice result;
4444

4545
private Map<String, String> meta;
46+
private String nextBatchId;
4647

4748
public String getId() {
4849
return id;
@@ -94,6 +95,10 @@ public Map<String, String> getMeta() {
9495
return meta;
9596
}
9697

98+
public String getNextBatchId() {
99+
return nextBatchId;
100+
}
101+
97102
/**
98103
* @return remove not allowed (valid storable) meta information
99104
*/

src/main/java/com/arangodb/internal/ArangoCursorExecute.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
*/
3131
public interface ArangoCursorExecute {
3232

33-
CursorEntity next(String id, Map<String, String> meta) throws ArangoDBException;
33+
CursorEntity next(String id, Map<String, String> meta, String nextBatchId) throws ArangoDBException;
3434

3535
void close(String id, Map<String, String> meta) throws ArangoDBException;
3636

src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) th
200200
return createCursor(result, type, null, hostHandle);
201201
}
202202

203+
@Override
204+
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
205+
final HostHandle hostHandle = new HostHandle();
206+
final CursorEntity result = executor
207+
.execute(queryNextByBatchIdRequest(cursorId, nextBatchId, null, null), CursorEntity.class, hostHandle);
208+
return createCursor(result, type, null, hostHandle);
209+
}
210+
203211
private <T> ArangoCursor<T> createCursor(
204212
final CursorEntity result,
205213
final Class<T> type,
@@ -208,8 +216,10 @@ private <T> ArangoCursor<T> createCursor(
208216

209217
final ArangoCursorExecute execute = new ArangoCursorExecute() {
210218
@Override
211-
public CursorEntity next(final String id, Map<String, String> meta) {
212-
return executor.execute(queryNextRequest(id, options, meta), CursorEntity.class, hostHandle);
219+
public CursorEntity next(String id, Map<String, String> meta, String nextBatchId) {
220+
Request request = nextBatchId == null ?
221+
queryNextRequest(id, options, meta) : queryNextByBatchIdRequest(id, nextBatchId, options, meta);
222+
return executor.execute(request, CursorEntity.class, hostHandle);
213223
}
214224

215225
@Override

src/main/java/com/arangodb/internal/InternalArangoDatabase.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,17 @@ protected Request queryRequest(
182182
return request;
183183
}
184184

185-
protected Request queryNextRequest(final String id, final AqlQueryOptions options, Map<String, String> meta) {
186-
185+
protected Request queryNextRequest(String id, AqlQueryOptions options, Map<String, String> meta) {
187186
final Request request = request(dbName, RequestType.POST, PATH_API_CURSOR, id);
187+
return completeQueryNextRequest(request, options, meta);
188+
}
189+
190+
protected Request queryNextByBatchIdRequest(String id, String nextBatchId, AqlQueryOptions options, Map<String, String> meta) {
191+
final Request request = request(dbName, RequestType.POST, PATH_API_CURSOR, id, nextBatchId);
192+
return completeQueryNextRequest(request, options, meta);
193+
}
188194

195+
private Request completeQueryNextRequest(Request request, AqlQueryOptions options, Map<String, String> meta) {
189196
if (meta != null) {
190197
request.getHeaderParam().putAll(meta);
191198
}

src/main/java/com/arangodb/internal/cursor/ArangoCursorImpl.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import com.arangodb.entity.CursorEntity.Warning;
3030
import com.arangodb.internal.ArangoCursorExecute;
3131
import com.arangodb.internal.InternalArangoDatabase;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3234

3335
import java.util.ArrayList;
3436
import java.util.Collection;
@@ -38,12 +40,14 @@
3840
* @author Mark Vollmary
3941
*/
4042
public class ArangoCursorImpl<T> extends AbstractArangoIterable<T> implements ArangoCursor<T> {
43+
private final static Logger LOG = LoggerFactory.getLogger(ArangoCursorImpl.class);
4144

4245
private final Class<T> type;
4346
protected final ArangoCursorIterator<T> iterator;
4447
private final String id;
4548
private final ArangoCursorExecute execute;
46-
private final boolean isPontentialDirtyRead;
49+
private final boolean pontentialDirtyRead;
50+
private final boolean allowRetry;
4751

4852
public ArangoCursorImpl(final InternalArangoDatabase<?, ?> db, final ArangoCursorExecute execute,
4953
final Class<T> type, final CursorEntity result) {
@@ -52,7 +56,8 @@ public ArangoCursorImpl(final InternalArangoDatabase<?, ?> db, final ArangoCurso
5256
this.type = type;
5357
iterator = createIterator(this, db, execute, result);
5458
id = result.getId();
55-
this.isPontentialDirtyRead = Boolean.parseBoolean(result.getMeta().get("X-Arango-Potential-Dirty-Read"));
59+
this.pontentialDirtyRead = Boolean.parseBoolean(result.getMeta().get("X-Arango-Potential-Dirty-Read"));
60+
this.allowRetry = result.getNextBatchId() != null;
5661
}
5762

5863
protected ArangoCursorIterator<T> createIterator(
@@ -98,7 +103,7 @@ public boolean isCached() {
98103

99104
@Override
100105
public void close() {
101-
if (id != null && hasNext()) {
106+
if (getId() != null && (allowRetry || iterator.getResult().getHasMore())) {
102107
execute.close(id, iterator.getResult().getMeta());
103108
}
104109
}
@@ -119,12 +124,17 @@ public List<T> asListRemaining() {
119124
while (hasNext()) {
120125
remaining.add(next());
121126
}
127+
try {
128+
close();
129+
} catch (final Exception e) {
130+
LOG.warn("Could not close cursor: ", e);
131+
}
122132
return remaining;
123133
}
124134

125135
@Override
126136
public boolean isPotentialDirtyRead() {
127-
return isPontentialDirtyRead;
137+
return pontentialDirtyRead;
128138
}
129139

130140
@Override
@@ -144,4 +154,8 @@ public void foreach(final Consumer<? super T> action) {
144154
}
145155
}
146156

157+
public String getNextBatchId() {
158+
return iterator.getResult().getNextBatchId();
159+
}
160+
147161
}

src/main/java/com/arangodb/internal/cursor/ArangoCursorIterator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ public class ArangoCursorIterator<T> implements ArangoIterator<T> {
4444
private final InternalArangoDatabase<?, ?> db;
4545
private final ArangoCursorExecute execute;
4646

47-
protected ArangoCursorIterator(final ArangoCursor<T> cursor, final ArangoCursorExecute execute,
47+
protected ArangoCursorIterator(final ArangoCursor<T> cursor,
48+
final ArangoCursorExecute execute,
4849
final InternalArangoDatabase<?, ?> db, final CursorEntity result) {
49-
super();
5050
this.cursor = cursor;
5151
this.execute = execute;
5252
this.db = db;
@@ -66,7 +66,7 @@ public boolean hasNext() {
6666
@Override
6767
public T next() {
6868
if (!arrayIterator.hasNext() && result.getHasMore()) {
69-
result = execute.next(cursor.getId(), result.getMeta());
69+
result = execute.next(cursor.getId(), result.getMeta(), result.getNextBatchId());
7070
arrayIterator = result.getResult().arrayIterator();
7171
}
7272
if (!hasNext()) {

src/main/java/com/arangodb/model/AqlQueryOptions.java

+28
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,33 @@ public AqlQueryOptions forceOneShardAttributeValue(final String forceOneShardAtt
425425
return this;
426426
}
427427

428+
public Boolean getAllowRetry() {
429+
return getOptions().allowRetry;
430+
}
431+
432+
/**
433+
* @param allowRetry Set this option to true to make it possible to retry fetching the latest batch from a cursor.
434+
* <p/>
435+
* This makes possible to safely retry invoking {@link com.arangodb.ArangoCursor#next()} in
436+
* case of I/O exceptions (which are actually thrown as {@link com.arangodb.ArangoDBException}
437+
* with cause {@link java.io.IOException})
438+
* <p/>
439+
* If set to false (default), then it is not safe to retry invoking
440+
* {@link com.arangodb.ArangoCursor#next()} in case of I/O exceptions, since the request to
441+
* fetch the next batch is not idempotent (i.e. the cursor may advance multiple times on the
442+
* server).
443+
* <p/>
444+
* Note: once you successfully received the last batch, you should call
445+
* {@link com.arangodb.ArangoCursor#close()} so that the server does not unnecessary keep the
446+
* batch until the cursor times out ({@link AqlQueryOptions#ttl(Integer)}).
447+
* @return options
448+
* @since ArangoDB 3.11
449+
*/
450+
public AqlQueryOptions allowRetry(final Boolean allowRetry) {
451+
getOptions().allowRetry = allowRetry;
452+
return this;
453+
}
454+
428455
private Options getOptions() {
429456
if (options == null) {
430457
options = new Options();
@@ -463,6 +490,7 @@ public static class Options implements Serializable, Cloneable {
463490
private Double maxRuntime;
464491
private Boolean fillBlockCache;
465492
private String forceOneShardAttributeValue;
493+
private Boolean allowRetry;
466494

467495
protected Optimizer getOptimizer() {
468496
if (optimizer == null) {

0 commit comments

Comments
 (0)