Skip to content

Commit e55b643

Browse files
committed
Instrumentatin support for dataloader
1 parent 2036771 commit e55b643

File tree

7 files changed

+264
-20
lines changed

7 files changed

+264
-20
lines changed

src/main/java/org/dataloader/DataLoaderFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ public Builder<K, V> options(DataLoaderOptions options) {
561561
return this;
562562
}
563563

564-
DataLoader<K, V> build() {
564+
public DataLoader<K, V> build() {
565565
return mkDataLoader(batchLoadFunction, options);
566566
}
567567
}

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import org.dataloader.annotations.GuardedBy;
44
import org.dataloader.annotations.Internal;
55
import org.dataloader.impl.CompletableFutureKit;
6+
import org.dataloader.instrumentation.DataLoaderInstrumentation;
7+
import org.dataloader.instrumentation.DataLoaderInstrumentationContext;
68
import org.dataloader.reactive.ReactiveSupport;
79
import org.dataloader.scheduler.BatchLoaderScheduler;
810
import org.dataloader.stats.StatisticsCollector;
@@ -34,6 +36,7 @@
3436
import static java.util.stream.Collectors.toList;
3537
import static org.dataloader.impl.Assertions.assertState;
3638
import static org.dataloader.impl.Assertions.nonNull;
39+
import static org.dataloader.instrumentation.DataLoaderInstrumentationHelper.ctxOrNoopCtx;
3740

3841
/**
3942
* This helps break up the large DataLoader class functionality, and it contains the logic to dispatch the
@@ -167,6 +170,8 @@ Object getCacheKeyWithContext(K key, Object context) {
167170
}
168171

169172
DispatchResult<V> dispatch() {
173+
DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));
174+
170175
boolean batchingEnabled = loaderOptions.batchingEnabled();
171176
final List<K> keys;
172177
final List<Object> callContexts;
@@ -175,7 +180,8 @@ DispatchResult<V> dispatch() {
175180
int queueSize = loaderQueue.size();
176181
if (queueSize == 0) {
177182
lastDispatchTime.set(now());
178-
return emptyDispatchResult();
183+
instrCtx.onDispatched();
184+
return endDispatchCtx(instrCtx, emptyDispatchResult());
179185
}
180186

181187
// we copy the pre-loaded set of futures ready for dispatch
@@ -192,7 +198,8 @@ DispatchResult<V> dispatch() {
192198
lastDispatchTime.set(now());
193199
}
194200
if (!batchingEnabled) {
195-
return emptyDispatchResult();
201+
instrCtx.onDispatched();
202+
return endDispatchCtx(instrCtx, emptyDispatchResult());
196203
}
197204
final int totalEntriesHandled = keys.size();
198205
//
@@ -213,7 +220,15 @@ DispatchResult<V> dispatch() {
213220
} else {
214221
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
215222
}
216-
return new DispatchResult<>(futureList, totalEntriesHandled);
223+
instrCtx.onDispatched();
224+
return endDispatchCtx(instrCtx, new DispatchResult<>(futureList, totalEntriesHandled));
225+
}
226+
227+
private DispatchResult<V> endDispatchCtx(DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx, DispatchResult<V> dispatchResult) {
228+
// once the CF completes, we can tell the instrumentation
229+
dispatchResult.getPromisedResults()
230+
.whenComplete((result, throwable) -> instrCtx.onCompleted(dispatchResult, throwable));
231+
return dispatchResult;
217232
}
218233

219234
private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {
@@ -427,11 +442,14 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
427442
}
428443

429444
CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures) {
445+
Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
446+
BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
447+
.context(context).keyContexts(keys, keyContexts).build();
448+
449+
DataLoaderInstrumentationContext<List<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginBatchLoader(dataLoader, keys, environment));
450+
430451
CompletableFuture<List<V>> batchLoad;
431452
try {
432-
Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
433-
BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
434-
.context(context).keyContexts(keys, keyContexts).build();
435453
if (isMapLoader()) {
436454
batchLoad = invokeMapBatchLoader(keys, environment);
437455
} else if (isPublisher()) {
@@ -441,12 +459,16 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
441459
} else {
442460
batchLoad = invokeListBatchLoader(keys, environment);
443461
}
462+
instrCtx.onDispatched();
444463
} catch (Exception e) {
464+
instrCtx.onDispatched();
445465
batchLoad = CompletableFutureKit.failedFuture(e);
446466
}
467+
batchLoad.whenComplete(instrCtx::onCompleted);
447468
return batchLoad;
448469
}
449470

471+
450472
@SuppressWarnings("unchecked")
451473
private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
452474
CompletionStage<List<V>> loadResult;
@@ -575,6 +597,10 @@ private boolean isMappedPublisher() {
575597
return batchLoadFunction instanceof MappedBatchPublisher;
576598
}
577599

600+
private DataLoaderInstrumentation instrumentation() {
601+
return loaderOptions.getInstrumentation();
602+
}
603+
578604
int dispatchDepth() {
579605
synchronized (dataLoader) {
580606
return loaderQueue.size();

src/main/java/org/dataloader/DataLoaderOptions.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import org.dataloader.annotations.PublicApi;
2020
import org.dataloader.impl.Assertions;
21+
import org.dataloader.instrumentation.DataLoaderInstrumentation;
22+
import org.dataloader.instrumentation.DataLoaderInstrumentationHelper;
2123
import org.dataloader.scheduler.BatchLoaderScheduler;
2224
import org.dataloader.stats.NoOpStatisticsCollector;
2325
import org.dataloader.stats.StatisticsCollector;
@@ -48,6 +50,7 @@ public class DataLoaderOptions {
4850
private BatchLoaderContextProvider environmentProvider;
4951
private ValueCacheOptions valueCacheOptions;
5052
private BatchLoaderScheduler batchLoaderScheduler;
53+
private DataLoaderInstrumentation instrumentation;
5154

5255
/**
5356
* Creates a new data loader options with default settings.
@@ -61,6 +64,7 @@ public DataLoaderOptions() {
6164
environmentProvider = NULL_PROVIDER;
6265
valueCacheOptions = ValueCacheOptions.newOptions();
6366
batchLoaderScheduler = null;
67+
instrumentation = DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION;
6468
}
6569

6670
/**
@@ -80,7 +84,8 @@ public DataLoaderOptions(DataLoaderOptions other) {
8084
this.statisticsCollector = other.statisticsCollector;
8185
this.environmentProvider = other.environmentProvider;
8286
this.valueCacheOptions = other.valueCacheOptions;
83-
batchLoaderScheduler = other.batchLoaderScheduler;
87+
this.batchLoaderScheduler = other.batchLoaderScheduler;
88+
this.instrumentation = other.instrumentation;
8489
}
8590

8691
/**
@@ -103,7 +108,6 @@ public boolean batchingEnabled() {
103108
* Sets the option that determines whether batch loading is enabled.
104109
*
105110
* @param batchingEnabled {@code true} to enable batch loading, {@code false} otherwise
106-
*
107111
* @return the data loader options for fluent coding
108112
*/
109113
public DataLoaderOptions setBatchingEnabled(boolean batchingEnabled) {
@@ -124,7 +128,6 @@ public boolean cachingEnabled() {
124128
* Sets the option that determines whether caching is enabled.
125129
*
126130
* @param cachingEnabled {@code true} to enable caching, {@code false} otherwise
127-
*
128131
* @return the data loader options for fluent coding
129132
*/
130133
public DataLoaderOptions setCachingEnabled(boolean cachingEnabled) {
@@ -134,7 +137,7 @@ public DataLoaderOptions setCachingEnabled(boolean cachingEnabled) {
134137

135138
/**
136139
* Option that determines whether to cache exceptional values (the default), or not.
137-
*
140+
* <p>
138141
* For short-lived caches (that is request caches) it makes sense to cache exceptions since
139142
* it's likely the key is still poisoned. However, if you have long-lived caches, then it may make
140143
* sense to set this to false since the downstream system may have recovered from its failure
@@ -150,7 +153,6 @@ public boolean cachingExceptionsEnabled() {
150153
* Sets the option that determines whether exceptional values are cache enabled.
151154
*
152155
* @param cachingExceptionsEnabled {@code true} to enable caching exceptional values, {@code false} otherwise
153-
*
154156
* @return the data loader options for fluent coding
155157
*/
156158
public DataLoaderOptions setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) {
@@ -173,7 +175,6 @@ public Optional<CacheKey> cacheKeyFunction() {
173175
* Sets the function to use for creating the cache key, if caching is enabled.
174176
*
175177
* @param cacheKeyFunction the cache key function to use
176-
*
177178
* @return the data loader options for fluent coding
178179
*/
179180
public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
@@ -196,7 +197,6 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
196197
* Sets the cache map implementation to use for caching, if caching is enabled.
197198
*
198199
* @param cacheMap the cache map instance
199-
*
200200
* @return the data loader options for fluent coding
201201
*/
202202
public DataLoaderOptions setCacheMap(CacheMap<?, ?> cacheMap) {
@@ -219,7 +219,6 @@ public int maxBatchSize() {
219219
* before they are split into multiple class
220220
*
221221
* @param maxBatchSize the maximum batch size
222-
*
223222
* @return the data loader options for fluent coding
224223
*/
225224
public DataLoaderOptions setMaxBatchSize(int maxBatchSize) {
@@ -240,7 +239,6 @@ public StatisticsCollector getStatisticsCollector() {
240239
* a common value
241240
*
242241
* @param statisticsCollector the statistics collector to use
243-
*
244242
* @return the data loader options for fluent coding
245243
*/
246244
public DataLoaderOptions setStatisticsCollector(Supplier<StatisticsCollector> statisticsCollector) {
@@ -259,7 +257,6 @@ public BatchLoaderContextProvider getBatchLoaderContextProvider() {
259257
* Sets the batch loader environment provider that will be used to give context to batch load functions
260258
*
261259
* @param contextProvider the batch loader context provider
262-
*
263260
* @return the data loader options for fluent coding
264261
*/
265262
public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvider contextProvider) {
@@ -282,7 +279,6 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide
282279
* Sets the value cache implementation to use for caching values, if caching is enabled.
283280
*
284281
* @param valueCache the value cache instance
285-
*
286282
* @return the data loader options for fluent coding
287283
*/
288284
public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
@@ -301,7 +297,6 @@ public ValueCacheOptions getValueCacheOptions() {
301297
* Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
302298
*
303299
* @param valueCacheOptions the value cache options
304-
*
305300
* @return the data loader options for fluent coding
306301
*/
307302
public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) {
@@ -321,11 +316,28 @@ public BatchLoaderScheduler getBatchLoaderScheduler() {
321316
* to some future time.
322317
*
323318
* @param batchLoaderScheduler the scheduler
324-
*
325319
* @return the data loader options for fluent coding
326320
*/
327321
public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) {
328322
this.batchLoaderScheduler = batchLoaderScheduler;
329323
return this;
330324
}
325+
326+
/**
327+
* @return the {@link DataLoaderInstrumentation} to use
328+
*/
329+
public DataLoaderInstrumentation getInstrumentation() {
330+
return instrumentation;
331+
}
332+
333+
/**
334+
* Sets in a new {@link DataLoaderInstrumentation}
335+
*
336+
* @param instrumentation the new {@link DataLoaderInstrumentation}
337+
* @return the data loader options for fluent coding
338+
*/
339+
public DataLoaderOptions setInstrumentation(DataLoaderInstrumentation instrumentation) {
340+
this.instrumentation = nonNull(instrumentation);
341+
return this;
342+
}
331343
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.dataloader.instrumentation;
2+
3+
import org.dataloader.BatchLoaderEnvironment;
4+
import org.dataloader.DataLoader;
5+
import org.dataloader.DispatchResult;
6+
7+
import java.util.List;
8+
9+
/**
10+
* This interface is called when certain actions happen inside a data loader
11+
*/
12+
public interface DataLoaderInstrumentation {
13+
/**
14+
* This call back is done just before the {@link DataLoader#dispatch()} is invoked
15+
* and it completes when the dispatch call promise is done.
16+
*
17+
* @param dataLoader the {@link DataLoader} in question
18+
* @return a DataLoaderInstrumentationContext or null to be more performant
19+
*/
20+
default DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
21+
return null;
22+
}
23+
24+
/**
25+
* This call back is done just before the batch loader of a {@link DataLoader} is invoked. Remember a batch loader
26+
* could be called multiple times during a dispatch event (because of max batch sizes)
27+
*
28+
* @param dataLoader the {@link DataLoader} in question
29+
* @param keys the set of keys being fetched
30+
* @param environment the {@link BatchLoaderEnvironment}
31+
* @return a DataLoaderInstrumentationContext or null to be more performant
32+
*/
33+
default DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
34+
return null;
35+
}
36+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.dataloader.instrumentation;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
5+
/**
6+
* When a {@link DataLoaderInstrumentation}.'beginXXX()' method is called then it must return a {@link DataLoaderInstrumentationContext}
7+
* that will be invoked when the step is first dispatched and then when it completes. Sometimes this is effectively the same time
8+
* whereas at other times it's when an asynchronous {@link CompletableFuture} completes.
9+
* <p>
10+
* This pattern of construction of an object then call back is intended to allow "timers" to be created that can instrument what has
11+
* just happened or "loggers" to be called to record what has happened.
12+
*/
13+
public interface DataLoaderInstrumentationContext<T> {
14+
/**
15+
* This is invoked when the instrumentation step is initially dispatched
16+
*/
17+
default void onDispatched() {
18+
}
19+
20+
/**
21+
* This is invoked when the instrumentation step is fully completed
22+
*
23+
* @param result the result of the step (which may be null)
24+
* @param t this exception will be non-null if an exception was thrown during the step
25+
*/
26+
default void onCompleted(T result, Throwable t) {
27+
}
28+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.dataloader.instrumentation;
2+
3+
public class DataLoaderInstrumentationHelper {
4+
5+
private static final DataLoaderInstrumentationContext<?> NOOP_CTX = new DataLoaderInstrumentationContext<>() {
6+
@Override
7+
public void onDispatched() {
8+
}
9+
10+
@Override
11+
public void onCompleted(Object result, Throwable t) {
12+
}
13+
};
14+
15+
public static <T> DataLoaderInstrumentationContext<T> noOpCtx() {
16+
//noinspection unchecked
17+
return (DataLoaderInstrumentationContext<T>) NOOP_CTX;
18+
}
19+
20+
public static final DataLoaderInstrumentation NOOP_INSTRUMENTATION = new DataLoaderInstrumentation() {
21+
};
22+
23+
/**
24+
* Check the {@link DataLoaderInstrumentationContext} to see if its null and returns a noop if it is or else the original
25+
* context
26+
*
27+
* @param ic the context in play
28+
* @param <T> for two
29+
* @return a non null context
30+
*/
31+
public static <T> DataLoaderInstrumentationContext<T> ctxOrNoopCtx(DataLoaderInstrumentationContext<T> ic) {
32+
return ic == null ? noOpCtx() : ic;
33+
}
34+
}

0 commit comments

Comments
 (0)