package org.dataloader;

import org.dataloader.impl.CompletableFutureKit;
+import org.dataloader.stats.Statistics;
+import org.dataloader.stats.StatisticsCollector;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
@@ -64,6 +67,7 @@ public class DataLoader<K, V> {
    private final DataLoaderOptions loaderOptions;
    private final CacheMap<Object, CompletableFuture<V>> futureCache;
    private final Map<K, CompletableFuture<V>> loaderQueue;
+   private final StatisticsCollector stats;

    /**
     * Creates new DataLoader with the specified batch loader function and default options
@@ -153,6 +157,7 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
        this.futureCache = determineCacheMap(loaderOptions);
        // order of keys matter in data loader
        this.loaderQueue = new LinkedHashMap<>();
+       this.stats = nonNull(this.loaderOptions.getStatisticsCollector());
    }

    @SuppressWarnings("unchecked")
@@ -173,8 +178,11 @@ private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptio
     */
    public CompletableFuture<V> load(K key) {
        Object cacheKey = getCacheKey(nonNull(key));
+       stats.incrementLoadCount();
+
        synchronized (futureCache) {
            if (loaderOptions.cachingEnabled() && futureCache.containsKey(cacheKey)) {
+               stats.incrementCacheHitCount();
                return futureCache.get(cacheKey);
            }
        }
@@ -185,6 +193,7 @@ public CompletableFuture<V> load(K key) {
                loaderQueue.put(key, future);
            }
        } else {
+           stats.incrementBatchLoadCountBy(1);
            // immediate execution of batch function
            CompletableFuture<List<V>> batchedLoad = batchLoadFunction
                    .load(singletonList(key))
@@ -291,7 +300,14 @@ private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<

    @SuppressWarnings("unchecked")
    private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<CompletableFuture<V>> queuedFutures) {
-       return batchLoadFunction.load(keys)
+       stats.incrementBatchLoadCountBy(keys.size());
+       CompletionStage<List<V>> batchLoad;
+       try {
+           batchLoad = nonNull(batchLoadFunction.load(keys), "Your batch loader function MUST return a non null CompletionStage promise");
+       } catch (Exception e) {
+           batchLoad = CompletableFutureKit.failedFuture(e);
+       }
+       return batchLoad
                .toCompletableFuture()
                .thenApply(values -> {
                    assertState(keys.size() == values.size(), "The size of the promised values MUST be the same size as the key list");
@@ -300,20 +316,28 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Complet
                        Object value = values.get(idx);
                        CompletableFuture<V> future = queuedFutures.get(idx);
                        if (value instanceof Throwable) {
+                           stats.incrementLoadErrorCount();
                            future.completeExceptionally((Throwable) value);
                            // we don't clear the cached view of this entry to avoid
                            // frequently loading the same error
                        } else if (value instanceof Try) {
                            // we allow the batch loader to return a Try so we can better represent a computation
                            // that might have worked or not.
-                           handleTry((Try<V>) value, future);
+                           Try<V> tryValue = (Try<V>) value;
+                           if (tryValue.isSuccess()) {
+                               future.complete(tryValue.get());
+                           } else {
+                               stats.incrementLoadErrorCount();
+                               future.completeExceptionally(tryValue.getThrowable());
+                           }
                        } else {
                            V val = (V) value;
                            future.complete(val);
                        }
                    }
                    return values;
                }).exceptionally(ex -> {
+                   stats.incrementBatchLoadExceptionCount();
                    for (int idx = 0; idx < queuedFutures.size(); idx++) {
                        K key = keys.get(idx);
                        CompletableFuture<V> future = queuedFutures.get(idx);
@@ -325,14 +349,6 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Complet
                });
    }

-   private void handleTry(Try<V> vTry, CompletableFuture<V> future) {
-       if (vTry.isSuccess()) {
-           future.complete(vTry.get());
-       } else {
-           future.completeExceptionally(vTry.getThrowable());
-       }
-   }
-
    /**
     * Normally {@link #dispatch()} is an asynchronous operation but this version will 'join' on the
     * results if dispatch and wait for them to complete. If the {@link CompletableFuture} callbacks make more
@@ -441,4 +457,15 @@ public Object getCacheKey(K key) {
        return loaderOptions.cacheKeyFunction().isPresent() ?
                loaderOptions.cacheKeyFunction().get().getKey(key) : key;
    }
+
+   /**
+    * Gets the statistics associated with this data loader. These will have been gathered via
+    * the {@link org.dataloader.stats.StatisticsCollector} passed in via {@link DataLoaderOptions#getStatisticsCollector()}
+    *
+    * @return statistics for this data loader
+    */
+   public Statistics getStatistics() {
+       return stats.getStatistics();
+   }
+
}
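For orientation, here is a minimal usage sketch of the statistics support introduced in this commit. It is illustrative only: the diff confirms DataLoaderOptions#getStatisticsCollector(), DataLoader#getStatistics(), the collector's increment methods and the batch loader contract, but the setStatisticsCollector setter, the SimpleStatisticsCollector class and the accessor names on Statistics do not appear above and are assumed here.

import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.dataloader.BatchLoader;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderOptions;
import org.dataloader.stats.SimpleStatisticsCollector; // assumed collector implementation, not shown in this diff
import org.dataloader.stats.Statistics;

public class StatisticsUsageSketch {
    public static void main(String[] args) {
        // The batch loader must return a non-null CompletionStage whose value list is the
        // same size as the key list, per the checks added in dispatchQueueBatch above.
        BatchLoader<Long, String> userBatchLoader = keys ->
                CompletableFuture.completedFuture(
                        keys.stream().map(k -> "user-" + k).collect(Collectors.toList()));

        // Assumes DataLoaderOptions offers a setter matching the getStatisticsCollector()
        // accessor that the constructor reads; the exact signature is not shown in this diff.
        DataLoaderOptions options = new DataLoaderOptions()
                .setStatisticsCollector(new SimpleStatisticsCollector());

        DataLoader<Long, String> userLoader = new DataLoader<>(userBatchLoader, options);

        CompletableFuture<String> user1 = userLoader.load(1L);   // counted via incrementLoadCount()
        CompletableFuture<String> user2 = userLoader.load(2L);
        CompletableFuture<String> again = userLoader.load(1L);   // expected cache hit (incrementCacheHitCount()), assuming caching is enabled by default

        userLoader.dispatch();   // dispatchQueueBatch records the batch size via incrementBatchLoadCountBy(keys.size())

        Statistics statistics = userLoader.getStatistics();
        // Accessor names on Statistics are assumed; the diff only shows that load, batch-load,
        // cache-hit and error counts are collected.
        System.out.println("loads       : " + statistics.getLoadCount());
        System.out.println("batch loads : " + statistics.getBatchLoadCount());
        System.out.println("cache hits  : " + statistics.getCacheHitCount());
    }
}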