Skip to content

Commit 730c89f

Browse files
author
Keith Dreibelbis
authored
Merge pull request #1 from getmargin/feature/dataloader-counts
Allow dispatch caller to get stats back
2 parents 4600885 + 0613e12 commit 730c89f

File tree

4 files changed

+87
-15
lines changed

4 files changed

+87
-15
lines changed

src/main/java/org/dataloader/DataLoader.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,19 @@ public CompletableFuture<List<V>> loadMany(List<K> keys, List<Object> keyContext
470470
* @return the promise of the queued load requests
471471
*/
472472
public CompletableFuture<List<V>> dispatch() {
473+
return helper.dispatch().futureList;
474+
}
475+
476+
/**
477+
* Dispatches the queued load requests to the batch execution function and returns both the promise of the result
478+
* and the number of entries that were dispatched.
479+
* <p>
480+
* If batching is disabled, or there are no queued requests, then a succeeded promise with no entries dispatched is
481+
* returned.
482+
*
483+
* @return the promise of the queued load requests and the number of entries dispatched.
484+
*/
485+
public DataLoaderHelper.DispatchResult<V> dispatchWithCounts() {
473486
return helper.dispatch();
474487
}
475488

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,16 @@ Object getCacheKey(K key) {
134134
loaderOptions.cacheKeyFunction().get().getKey(key) : key;
135135
}
136136

137-
CompletableFuture<List<V>> dispatch() {
137+
public static class DispatchResult<X> {
138+
public final CompletableFuture<List<X>> futureList;
139+
public final int totalEntriesHandled;
140+
public DispatchResult(CompletableFuture<List<X>> futureList, int totalEntriesHandled) {
141+
this.futureList = futureList;
142+
this.totalEntriesHandled = totalEntriesHandled;
143+
}
144+
}
145+
146+
DispatchResult<V> dispatch() {
138147
boolean batchingEnabled = loaderOptions.batchingEnabled();
139148
//
140149
// we copy the pre-loaded set of futures ready for dispatch
@@ -149,9 +158,10 @@ CompletableFuture<List<V>> dispatch() {
149158
});
150159
loaderQueue.clear();
151160
}
152-
if (!batchingEnabled || keys.size() == 0) {
153-
return CompletableFuture.completedFuture(emptyList());
161+
if (!batchingEnabled || keys.isEmpty()) {
162+
return new DispatchResult<V>(CompletableFuture.completedFuture(emptyList()), 0);
154163
}
164+
final int totalEntriesHandled = keys.size();
155165
//
156166
// order of keys -> values matter in data loader hence the use of linked hash map
157167
//
@@ -164,11 +174,13 @@ CompletableFuture<List<V>> dispatch() {
164174
// via calls to load("foo") and loadMany(["foo","bar"])
165175
//
166176
int maxBatchSize = loaderOptions.maxBatchSize();
177+
CompletableFuture<List<V>> futureList;
167178
if (maxBatchSize > 0 && maxBatchSize < keys.size()) {
168-
return sliceIntoBatchesOfBatches(keys, queuedFutures, callContexts, maxBatchSize);
179+
futureList = sliceIntoBatchesOfBatches(keys, queuedFutures, callContexts, maxBatchSize);
169180
} else {
170-
return dispatchQueueBatch(keys, callContexts, queuedFutures);
181+
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
171182
}
183+
return new DispatchResult<V>(futureList, totalEntriesHandled);
172184
}
173185

174186
private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {

src/main/java/org/dataloader/DataLoaderRegistry.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Set;
88
import java.util.concurrent.ConcurrentHashMap;
99
import java.util.function.Function;
10+
1011
import org.dataloader.stats.Statistics;
1112

1213
/**
@@ -22,7 +23,6 @@ public class DataLoaderRegistry {
2223
*
2324
* @param key the key to put the data loader under
2425
* @param dataLoader the data loader to register
25-
*
2626
* @return this registry
2727
*/
2828
public DataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader) {
@@ -33,15 +33,14 @@ public DataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader) {
3333
/**
3434
* Computes a data loader if absent or return it if it was
3535
* already registered at that key.
36-
*
36+
* <p>
3737
* Note: The entire method invocation is performed atomically,
3838
* so the function is applied at most once per key.
3939
*
40-
* @param key the key of the data loader
40+
* @param key the key of the data loader
4141
* @param mappingFunction the function to compute a data loader
42-
* @param <K> the type of keys
43-
* @param <V> the type of values
44-
*
42+
* @param <K> the type of keys
43+
* @param <V> the type of values
4544
* @return a data loader
4645
*/
4746
@SuppressWarnings("unchecked")
@@ -55,7 +54,6 @@ public <K, V> DataLoader<K, V> computeIfAbsent(final String key,
5554
* and return a new combined registry
5655
*
5756
* @param registry the registry to combine into this registry
58-
*
5957
* @return a new combined registry
6058
*/
6159
public DataLoaderRegistry combine(DataLoaderRegistry registry) {
@@ -77,7 +75,6 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) {
7775
* This will unregister a new dataloader
7876
*
7977
* @param key the key of the data loader to unregister
80-
*
8178
* @return this registry
8279
*/
8380
public DataLoaderRegistry unregister(String key) {
@@ -91,7 +88,6 @@ public DataLoaderRegistry unregister(String key) {
9188
* @param key the key of the data loader
9289
* @param <K> the type of keys
9390
* @param <V> the type of values
94-
*
9591
* @return a data loader or null if its not present
9692
*/
9793
@SuppressWarnings("unchecked")
@@ -114,6 +110,32 @@ public void dispatchAll() {
114110
getDataLoaders().forEach(DataLoader::dispatch);
115111
}
116112

113+
/**
114+
* Similar to {@link DataLoaderRegistry#dispatchAll()}, this calls {@link org.dataloader.DataLoader#dispatch()} on
115+
* each of the registered {@link org.dataloader.DataLoader}s, but returns the number of dispatches.
116+
*
117+
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
118+
*/
119+
public int dispatchAllWithCount() {
120+
int sum = 0;
121+
for (DataLoader dataLoader : getDataLoaders()) {
122+
sum += dataLoader.dispatchWithCounts().totalEntriesHandled;
123+
}
124+
return sum;
125+
}
126+
127+
/**
128+
* @return The sum of all batched key loads that need to be dispatched from all registered
129+
* {@link org.dataloader.DataLoader}s
130+
*/
131+
public int dispatchDepth() {
132+
int totalDispatchDepth = 0;
133+
for (DataLoader dataLoader : getDataLoaders()) {
134+
totalDispatchDepth += dataLoader.dispatchDepth();
135+
}
136+
return totalDispatchDepth;
137+
}
138+
117139
/**
118140
* @return a combined set of statistics for all data loaders in this registry presented
119141
* as the sum of all their statistics

src/test/java/org/dataloader/DataLoaderTest.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,19 @@ public void should_Resolve_to_empty_list_when_no_keys_supplied() {
101101
assertThat(futureEmpty.join(), empty());
102102
}
103103

104+
@Test
105+
public void should_Return_zero_entries_dispatched_when_no_keys_supplied() {
106+
AtomicBoolean success = new AtomicBoolean();
107+
DataLoader<Integer, Integer> identityLoader = new DataLoader<>(keysAsValues());
108+
CompletableFuture<List<Integer>> futureEmpty = identityLoader.loadMany(emptyList());
109+
futureEmpty.thenAccept(promisedValues -> {
110+
assertThat(promisedValues.size(), is(0));
111+
success.set(true);
112+
});
113+
DataLoaderHelper.DispatchResult dispatchResult = identityLoader.dispatchWithCounts();
114+
await().untilAtomic(success, is(true));
115+
assertThat(dispatchResult.totalEntriesHandled, equalTo(0));
116+
}
104117
@Test
105118
public void should_Batch_multiple_requests() throws ExecutionException, InterruptedException {
106119
List<Collection<Integer>> loadCalls = new ArrayList<>();
@@ -113,7 +126,19 @@ public void should_Batch_multiple_requests() throws ExecutionException, Interrup
113126
await().until(() -> future1.isDone() && future2.isDone());
114127
assertThat(future1.get(), equalTo(1));
115128
assertThat(future2.get(), equalTo(2));
116-
assertThat(loadCalls, equalTo(singletonList(asList(1, 2))));
129+
assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); }
130+
131+
@Test
132+
public void should_Return_number_of_batched_entries() throws ExecutionException, InterruptedException {
133+
List<Collection<Integer>> loadCalls = new ArrayList<>();
134+
DataLoader<Integer, Integer> identityLoader = idLoader(new DataLoaderOptions(), loadCalls);
135+
136+
CompletableFuture<Integer> future1 = identityLoader.load(1);
137+
CompletableFuture<Integer> future2 = identityLoader.load(2);
138+
DataLoaderHelper.DispatchResult dispatchResult = identityLoader.dispatchWithCounts();
139+
140+
await().until(() -> future1.isDone() && future2.isDone());
141+
assertThat(dispatchResult.totalEntriesHandled, equalTo(2));
117142
}
118143

119144
@Test

0 commit comments

Comments
 (0)