diff --git a/README.md b/README.md
index 2a0e08f..61180d9 100644
--- a/README.md
+++ b/README.md
@@ -750,6 +750,65 @@ When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as f
         * If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future
 * The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()`
 
+## Instrumenting the data loader code
+
+A `DataLoader` can have a `DataLoaderInstrumentation` associated with it.  This callback interface is intended to provide
+insight into working of the `DataLoader` such as how long it takes to run or to allow for logging of key events.
+
+You set the `DataLoaderInstrumentation` into the `DataLoaderOptions` at build time.
+
+```java
+
+
+        DataLoaderInstrumentation timingInstrumentation = new DataLoaderInstrumentation() {
+            @Override
+            public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
+                long then = System.currentTimeMillis();
+                return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
+                    long ms = System.currentTimeMillis() - then;
+                    System.out.println(format("dispatch time: %d ms", ms));
+                });
+            }
+
+            @Override
+            public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
+                long then = System.currentTimeMillis();
+                return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
+                    long ms = System.currentTimeMillis() - then;
+                    System.out.println(format("batch loader time: %d ms", ms));
+                });
+            }
+        };
+        DataLoaderOptions options = DataLoaderOptions.newOptions().setInstrumentation(timingInstrumentation);
+        DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options);
+        
+```
+
+The example shows how long the overall `DataLoader` dispatch takes or how long the batch loader takes to run.
+
+### Instrumenting the DataLoaderRegistry
+
+You can also associate a `DataLoaderInstrumentation` with a `DataLoaderRegistry`.  Every `DataLoader` registered will be changed so that the registry
+`DataLoaderInstrumentation` is associated with it.  This allows you to set just the one `DataLoaderInstrumentation` in place and it applies to all 
+data loaders.
+
+```java
+    DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader);
+    DataLoader<String, User> teamsDataLoader = DataLoaderFactory.newDataLoader(teamsBatchLoader);
+    
+    DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+            .instrumentation(timingInstrumentation)
+            .register("users", userDataLoader)
+            .register("teams", teamsDataLoader)
+            .build();
+    
+    DataLoader<String, User> changedUsersDataLoader = registry.getDataLoader("users");
+```
+
+The `timingInstrumentation` here will be associated with the `DataLoader` under the key  `users` and the key `teams`.  Note that since
+DataLoader is immutable, a new changed object is created so you must use the registry to get the `DataLoader`.  
+
+
 ## Other information sources
 
 - [Facebook DataLoader Github repo](https://github.com/facebook/dataloader)
diff --git a/src/main/java/org/dataloader/DataLoaderFactory.java b/src/main/java/org/dataloader/DataLoaderFactory.java
index 0dc029a..a87e4eb 100644
--- a/src/main/java/org/dataloader/DataLoaderFactory.java
+++ b/src/main/java/org/dataloader/DataLoaderFactory.java
@@ -561,7 +561,7 @@ public Builder<K, V> options(DataLoaderOptions options) {
             return this;
         }
 
-        DataLoader<K, V> build() {
+        public DataLoader<K, V> build() {
             return mkDataLoader(batchLoadFunction, options);
         }
     }
diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java
index 29701b5..9b5a59e 100644
--- a/src/main/java/org/dataloader/DataLoaderHelper.java
+++ b/src/main/java/org/dataloader/DataLoaderHelper.java
@@ -3,6 +3,8 @@
 import org.dataloader.annotations.GuardedBy;
 import org.dataloader.annotations.Internal;
 import org.dataloader.impl.CompletableFutureKit;
+import org.dataloader.instrumentation.DataLoaderInstrumentation;
+import org.dataloader.instrumentation.DataLoaderInstrumentationContext;
 import org.dataloader.reactive.ReactiveSupport;
 import org.dataloader.scheduler.BatchLoaderScheduler;
 import org.dataloader.stats.StatisticsCollector;
@@ -34,6 +36,7 @@
 import static java.util.stream.Collectors.toList;
 import static org.dataloader.impl.Assertions.assertState;
 import static org.dataloader.impl.Assertions.nonNull;
+import static org.dataloader.instrumentation.DataLoaderInstrumentationHelper.ctxOrNoopCtx;
 
 /**
  * This helps break up the large DataLoader class functionality, and it contains the logic to dispatch the
@@ -167,6 +170,8 @@ Object getCacheKeyWithContext(K key, Object context) {
     }
 
     DispatchResult<V> dispatch() {
+        DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));
+
         boolean batchingEnabled = loaderOptions.batchingEnabled();
         final List<K> keys;
         final List<Object> callContexts;
@@ -175,7 +180,8 @@ DispatchResult<V> dispatch() {
             int queueSize = loaderQueue.size();
             if (queueSize == 0) {
                 lastDispatchTime.set(now());
-                return emptyDispatchResult();
+                instrCtx.onDispatched();
+                return endDispatchCtx(instrCtx, emptyDispatchResult());
             }
 
             // we copy the pre-loaded set of futures ready for dispatch
@@ -192,7 +198,8 @@ DispatchResult<V> dispatch() {
             lastDispatchTime.set(now());
         }
         if (!batchingEnabled) {
-            return emptyDispatchResult();
+            instrCtx.onDispatched();
+            return endDispatchCtx(instrCtx, emptyDispatchResult());
         }
         final int totalEntriesHandled = keys.size();
         //
@@ -213,7 +220,15 @@ DispatchResult<V> dispatch() {
         } else {
             futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
         }
-        return new DispatchResult<>(futureList, totalEntriesHandled);
+        instrCtx.onDispatched();
+        return endDispatchCtx(instrCtx, new DispatchResult<>(futureList, totalEntriesHandled));
+    }
+
+    private DispatchResult<V> endDispatchCtx(DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx, DispatchResult<V> dispatchResult) {
+        // once the CF completes, we can tell the instrumentation
+        dispatchResult.getPromisedResults()
+                .whenComplete((result, throwable) -> instrCtx.onCompleted(dispatchResult, throwable));
+        return dispatchResult;
     }
 
     private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {
@@ -427,11 +442,14 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
     }
 
     CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures) {
+        Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
+        BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
+                .context(context).keyContexts(keys, keyContexts).build();
+
+        DataLoaderInstrumentationContext<List<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginBatchLoader(dataLoader, keys, environment));
+
         CompletableFuture<List<V>> batchLoad;
         try {
-            Object context = loaderOptions.getBatchLoaderContextProvider().getContext();
-            BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment()
-                    .context(context).keyContexts(keys, keyContexts).build();
             if (isMapLoader()) {
                 batchLoad = invokeMapBatchLoader(keys, environment);
             } else if (isPublisher()) {
@@ -441,12 +459,16 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
             } else {
                 batchLoad = invokeListBatchLoader(keys, environment);
             }
+            instrCtx.onDispatched();
         } catch (Exception e) {
+            instrCtx.onDispatched();
             batchLoad = CompletableFutureKit.failedFuture(e);
         }
+        batchLoad.whenComplete(instrCtx::onCompleted);
         return batchLoad;
     }
 
+
     @SuppressWarnings("unchecked")
     private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
         CompletionStage<List<V>> loadResult;
@@ -575,6 +597,10 @@ private boolean isMappedPublisher() {
         return batchLoadFunction instanceof MappedBatchPublisher;
     }
 
+    private DataLoaderInstrumentation instrumentation() {
+        return loaderOptions.getInstrumentation();
+    }
+
     int dispatchDepth() {
         synchronized (dataLoader) {
             return loaderQueue.size();
diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java
index f8ea95c..e4b286a 100644
--- a/src/main/java/org/dataloader/DataLoaderOptions.java
+++ b/src/main/java/org/dataloader/DataLoaderOptions.java
@@ -17,6 +17,8 @@
 package org.dataloader;
 
 import org.dataloader.annotations.PublicApi;
+import org.dataloader.instrumentation.DataLoaderInstrumentation;
+import org.dataloader.instrumentation.DataLoaderInstrumentationHelper;
 import org.dataloader.scheduler.BatchLoaderScheduler;
 import org.dataloader.stats.NoOpStatisticsCollector;
 import org.dataloader.stats.StatisticsCollector;
@@ -52,6 +54,7 @@ public class DataLoaderOptions {
     private final BatchLoaderContextProvider environmentProvider;
     private final ValueCacheOptions valueCacheOptions;
     private final BatchLoaderScheduler batchLoaderScheduler;
+    private final DataLoaderInstrumentation instrumentation;
 
     /**
      * Creates a new data loader options with default settings.
@@ -68,6 +71,7 @@ public DataLoaderOptions() {
         environmentProvider = NULL_PROVIDER;
         valueCacheOptions = DEFAULT_VALUE_CACHE_OPTIONS;
         batchLoaderScheduler = null;
+        instrumentation = DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION;
     }
 
     private DataLoaderOptions(Builder builder) {
@@ -82,6 +86,7 @@ private DataLoaderOptions(Builder builder) {
         this.environmentProvider = builder.environmentProvider;
         this.valueCacheOptions = builder.valueCacheOptions;
         this.batchLoaderScheduler = builder.batchLoaderScheduler;
+        this.instrumentation = builder.instrumentation;
     }
 
     /**
@@ -101,7 +106,8 @@ public DataLoaderOptions(DataLoaderOptions other) {
         this.statisticsCollector = other.statisticsCollector;
         this.environmentProvider = other.environmentProvider;
         this.valueCacheOptions = other.valueCacheOptions;
-        batchLoaderScheduler = other.batchLoaderScheduler;
+        this.batchLoaderScheduler = other.batchLoaderScheduler;
+        this.instrumentation = other.instrumentation;
     }
 
     /**
@@ -169,7 +175,7 @@ public boolean batchingEnabled() {
      * Sets the option that determines whether batch loading is enabled.
      *
      * @param batchingEnabled {@code true} to enable batch loading, {@code false} otherwise
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setBatchingEnabled(boolean batchingEnabled) {
         return builder().setBatchingEnabled(batchingEnabled).build();
@@ -188,7 +194,7 @@ public boolean cachingEnabled() {
      * Sets the option that determines whether caching is enabled.
      *
      * @param cachingEnabled {@code true} to enable caching, {@code false} otherwise
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setCachingEnabled(boolean cachingEnabled) {
         return builder().setCachingEnabled(cachingEnabled).build();
@@ -212,7 +218,7 @@ public boolean cachingExceptionsEnabled() {
      * Sets the option that determines whether exceptional values are cache enabled.
      *
      * @param cachingExceptionsEnabled {@code true} to enable caching exceptional values, {@code false} otherwise
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) {
         return builder().setCachingExceptionsEnabled(cachingExceptionsEnabled).build();
@@ -233,7 +239,7 @@ public Optional<CacheKey> cacheKeyFunction() {
      * Sets the function to use for creating the cache key, if caching is enabled.
      *
      * @param cacheKeyFunction the cache key function to use
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
         return builder().setCacheKeyFunction(cacheKeyFunction).build();
@@ -254,7 +260,7 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
      * Sets the cache map implementation to use for caching, if caching is enabled.
      *
      * @param cacheMap the cache map instance
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setCacheMap(CacheMap<?, ?> cacheMap) {
         return builder().setCacheMap(cacheMap).build();
@@ -275,7 +281,7 @@ public int maxBatchSize() {
      * before they are split into multiple class
      *
      * @param maxBatchSize the maximum batch size
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setMaxBatchSize(int maxBatchSize) {
         return builder().setMaxBatchSize(maxBatchSize).build();
@@ -294,7 +300,7 @@ public StatisticsCollector getStatisticsCollector() {
      * a common value
      *
      * @param statisticsCollector the statistics collector to use
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setStatisticsCollector(Supplier<StatisticsCollector> statisticsCollector) {
         return builder().setStatisticsCollector(nonNull(statisticsCollector)).build();
@@ -311,7 +317,7 @@ public BatchLoaderContextProvider getBatchLoaderContextProvider() {
      * Sets the batch loader environment provider that will be used to give context to batch load functions
      *
      * @param contextProvider the batch loader context provider
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvider contextProvider) {
         return builder().setBatchLoaderContextProvider(nonNull(contextProvider)).build();
@@ -332,7 +338,7 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide
      * Sets the value cache implementation to use for caching values, if caching is enabled.
      *
      * @param valueCache the value cache instance
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
         return builder().setValueCache(valueCache).build();
@@ -349,7 +355,7 @@ public ValueCacheOptions getValueCacheOptions() {
      * Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
      *
      * @param valueCacheOptions the value cache options
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) {
         return builder().setValueCacheOptions(nonNull(valueCacheOptions)).build();
@@ -367,12 +373,29 @@ public BatchLoaderScheduler getBatchLoaderScheduler() {
      * to some future time.
      *
      * @param batchLoaderScheduler the scheduler
-     * @return the data loader options for fluent coding
+     * @return a new data loader options instance for fluent coding
      */
     public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) {
         return builder().setBatchLoaderScheduler(batchLoaderScheduler).build();
     }
 
+    /**
+     * @return the {@link DataLoaderInstrumentation} to use
+     */
+    public DataLoaderInstrumentation getInstrumentation() {
+        return instrumentation;
+    }
+
+    /**
+     * Sets in a new {@link DataLoaderInstrumentation}
+     *
+     * @param instrumentation the new {@link DataLoaderInstrumentation}
+     * @return a new data loader options instance for fluent coding
+     */
+    public DataLoaderOptions setInstrumentation(DataLoaderInstrumentation instrumentation) {
+        return builder().setInstrumentation(instrumentation).build();
+    }
+
     private Builder builder() {
         return new Builder(this);
     }
@@ -389,6 +412,7 @@ public static class Builder {
         private BatchLoaderContextProvider environmentProvider;
         private ValueCacheOptions valueCacheOptions;
         private BatchLoaderScheduler batchLoaderScheduler;
+        private DataLoaderInstrumentation instrumentation;
 
         public Builder() {
             this(new DataLoaderOptions()); // use the defaults of the DataLoaderOptions for this builder
@@ -406,6 +430,7 @@ public Builder() {
             this.environmentProvider = other.environmentProvider;
             this.valueCacheOptions = other.valueCacheOptions;
             this.batchLoaderScheduler = other.batchLoaderScheduler;
+            this.instrumentation = other.instrumentation;
         }
 
         public Builder setBatchingEnabled(boolean batchingEnabled) {
@@ -463,6 +488,11 @@ public Builder setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler
             return this;
         }
 
+        public Builder setInstrumentation(DataLoaderInstrumentation instrumentation) {
+            this.instrumentation = nonNull(instrumentation);
+            return this;
+        }
+
         public DataLoaderOptions build() {
             return new DataLoaderOptions(this);
         }
diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java
index 5a3f90f..06c93c4 100644
--- a/src/main/java/org/dataloader/DataLoaderRegistry.java
+++ b/src/main/java/org/dataloader/DataLoaderRegistry.java
@@ -1,6 +1,9 @@
 package org.dataloader;
 
 import org.dataloader.annotations.PublicApi;
+import org.dataloader.instrumentation.ChainedDataLoaderInstrumentation;
+import org.dataloader.instrumentation.DataLoaderInstrumentation;
+import org.dataloader.instrumentation.DataLoaderInstrumentationHelper;
 import org.dataloader.stats.Statistics;
 
 import java.util.ArrayList;
@@ -16,30 +19,108 @@
 /**
  * This allows data loaders to be registered together into a single place, so
  * they can be dispatched as one.  It also allows you to retrieve data loaders by
- * name from a central place
+ * name from a central place.
+ * <p>
+ * Notes on {@link DataLoaderInstrumentation} : A {@link DataLoaderRegistry} can have an instrumentation
+ * associated with it.  As each {@link DataLoader} is added to the registry, the {@link DataLoaderInstrumentation}
+ * of the registry is applied to that {@link DataLoader}.
+ * <p>
+ * The {@link DataLoader} is changed and hence the object in the registry is not the
+ * same one as was originally registered.  So you MUST get access to the {@link DataLoader} via {@link DataLoaderRegistry#getDataLoader(String)} methods
+ * and not use the original {@link DataLoader} object.
+ * <p>
+ * If the {@link DataLoader} has no {@link DataLoaderInstrumentation} then the registry one is added to it.  If it does have one already
+ * then a {@link ChainedDataLoaderInstrumentation} is created with the registry {@link DataLoaderInstrumentation} in it first and then any other
+ * {@link DataLoaderInstrumentation}s added after that.  If the registry {@link DataLoaderInstrumentation} instance and {@link DataLoader} {@link DataLoaderInstrumentation} instance
+ * are the same object, then nothing is changed, since the same instrumentation code is being run.
  */
 @PublicApi
 public class DataLoaderRegistry {
-    protected final Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>();
+    protected final Map<String, DataLoader<?, ?>> dataLoaders;
+    protected final DataLoaderInstrumentation instrumentation;
+
 
     public DataLoaderRegistry() {
+        this(new ConcurrentHashMap<>(), null);
     }
 
     private DataLoaderRegistry(Builder builder) {
-        this.dataLoaders.putAll(builder.dataLoaders);
+        this(builder.dataLoaders, builder.instrumentation);
+    }
+
+    protected DataLoaderRegistry(Map<String, DataLoader<?, ?>> dataLoaders, DataLoaderInstrumentation instrumentation) {
+        this.dataLoaders = instrumentDLs(dataLoaders, instrumentation);
+        this.instrumentation = instrumentation;
+    }
+
+    private Map<String, DataLoader<?, ?>> instrumentDLs(Map<String, DataLoader<?, ?>> incomingDataLoaders, DataLoaderInstrumentation registryInstrumentation) {
+        Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>(incomingDataLoaders);
+        if (registryInstrumentation != null) {
+            dataLoaders.replaceAll((k, existingDL) -> instrumentDL(registryInstrumentation, existingDL));
+        }
+        return dataLoaders;
+    }
+
+    /**
+     * Can be called to tweak a {@link DataLoader} so that it has the registry {@link DataLoaderInstrumentation} added as the first one.
+     *
+     * @param registryInstrumentation the common registry {@link DataLoaderInstrumentation}
+     * @param existingDL              the existing data loader
+     * @return a new {@link DataLoader} or the same one if there is nothing to change
+     */
+    private static DataLoader<?, ?> instrumentDL(DataLoaderInstrumentation registryInstrumentation, DataLoader<?, ?> existingDL) {
+        if (registryInstrumentation == null) {
+            return existingDL;
+        }
+        DataLoaderOptions options = existingDL.getOptions();
+        DataLoaderInstrumentation existingInstrumentation = options.getInstrumentation();
+        // if they have any instrumentations then add to it
+        if (existingInstrumentation != null) {
+            if (existingInstrumentation == registryInstrumentation) {
+                // nothing to change
+                return existingDL;
+            }
+            if (existingInstrumentation == DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION) {
+                // replace it with the registry one
+                return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation);
+            }
+            if (existingInstrumentation instanceof ChainedDataLoaderInstrumentation) {
+                // avoids calling a chained inside a chained
+                DataLoaderInstrumentation newInstrumentation = ((ChainedDataLoaderInstrumentation) existingInstrumentation).prepend(registryInstrumentation);
+                return mkInstrumentedDataLoader(existingDL, options, newInstrumentation);
+            } else {
+                DataLoaderInstrumentation newInstrumentation = new ChainedDataLoaderInstrumentation().add(registryInstrumentation).add(existingInstrumentation);
+                return mkInstrumentedDataLoader(existingDL, options, newInstrumentation);
+            }
+        } else {
+            return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation);
+        }
+    }
+
+    private static DataLoader<?, ?> mkInstrumentedDataLoader(DataLoader<?, ?> existingDL, DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation) {
+        return existingDL.transform(builder -> builder.options(setInInstrumentation(options, newInstrumentation)));
     }
 
+    private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation) {
+        return options.transform(optionsBuilder -> optionsBuilder.setInstrumentation(newInstrumentation));
+    }
+
+    /**
+     * @return the {@link DataLoaderInstrumentation} associated with this registry which can be null
+     */
+    public DataLoaderInstrumentation getInstrumentation() {
+        return instrumentation;
+    }
 
     /**
      * This will register a new dataloader
      *
      * @param key        the key to put the data loader under
      * @param dataLoader the data loader to register
-     *
      * @return this registry
      */
     public DataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader) {
-        dataLoaders.put(key, dataLoader);
+        dataLoaders.put(key, instrumentDL(instrumentation, dataLoader));
         return this;
     }
 
@@ -54,13 +135,15 @@ public DataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader) {
      * @param mappingFunction the function to compute a data loader
      * @param <K>             the type of keys
      * @param <V>             the type of values
-     *
      * @return a data loader
      */
     @SuppressWarnings("unchecked")
     public <K, V> DataLoader<K, V> computeIfAbsent(final String key,
                                                    final Function<String, DataLoader<?, ?>> mappingFunction) {
-        return (DataLoader<K, V>) dataLoaders.computeIfAbsent(key, mappingFunction);
+        return (DataLoader<K, V>) dataLoaders.computeIfAbsent(key, (k) -> {
+            DataLoader<?, ?> dl = mappingFunction.apply(k);
+            return instrumentDL(instrumentation, dl);
+        });
     }
 
     /**
@@ -68,7 +151,6 @@ public <K, V> DataLoader<K, V> computeIfAbsent(final String key,
      * and return a new combined registry
      *
      * @param registry the registry to combine into this registry
-     *
      * @return a new combined registry
      */
     public DataLoaderRegistry combine(DataLoaderRegistry registry) {
@@ -97,7 +179,6 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) {
      * This will unregister a new dataloader
      *
      * @param key the key of the data loader to unregister
-     *
      * @return this registry
      */
     public DataLoaderRegistry unregister(String key) {
@@ -111,7 +192,6 @@ public DataLoaderRegistry unregister(String key) {
      * @param key the key of the data loader
      * @param <K> the type of keys
      * @param <V> the type of values
-     *
      * @return a data loader or null if its not present
      */
     @SuppressWarnings("unchecked")
@@ -182,13 +262,13 @@ public static Builder newRegistry() {
     public static class Builder {
 
         private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();
+        private DataLoaderInstrumentation instrumentation;
 
         /**
          * This will register a new dataloader
          *
          * @param key        the key to put the data loader under
          * @param dataLoader the data loader to register
-         *
          * @return this builder for a fluent pattern
          */
         public Builder register(String key, DataLoader<?, ?> dataLoader) {
@@ -201,7 +281,6 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
          * from a previous {@link DataLoaderRegistry}
          *
          * @param otherRegistry the previous {@link DataLoaderRegistry}
-         *
          * @return this builder for a fluent pattern
          */
         public Builder registerAll(DataLoaderRegistry otherRegistry) {
@@ -209,6 +288,11 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) {
             return this;
         }
 
+        public Builder instrumentation(DataLoaderInstrumentation instrumentation) {
+            this.instrumentation = instrumentation;
+            return this;
+        }
+
         /**
          * @return the newly built {@link DataLoaderRegistry}
          */
diff --git a/src/main/java/org/dataloader/instrumentation/ChainedDataLoaderInstrumentation.java b/src/main/java/org/dataloader/instrumentation/ChainedDataLoaderInstrumentation.java
new file mode 100644
index 0000000..eb9af0a
--- /dev/null
+++ b/src/main/java/org/dataloader/instrumentation/ChainedDataLoaderInstrumentation.java
@@ -0,0 +1,118 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.BatchLoaderEnvironment;
+import org.dataloader.DataLoader;
+import org.dataloader.DispatchResult;
+import org.dataloader.annotations.PublicApi;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * This {@link DataLoaderInstrumentation} can chain together multiple instrumentations and have them all called in
+ * the order of the provided list.
+ */
+@PublicApi
+public class ChainedDataLoaderInstrumentation implements DataLoaderInstrumentation {
+    private final List<DataLoaderInstrumentation> instrumentations;
+
+    public ChainedDataLoaderInstrumentation() {
+        instrumentations = List.of();
+    }
+
+    public ChainedDataLoaderInstrumentation(List<DataLoaderInstrumentation> instrumentations) {
+        this.instrumentations = List.copyOf(instrumentations);
+    }
+
+    public List<DataLoaderInstrumentation> getInstrumentations() {
+        return instrumentations;
+    }
+
+    /**
+     * Adds a new {@link DataLoaderInstrumentation} to the list and creates a new {@link ChainedDataLoaderInstrumentation}
+     *
+     * @param instrumentation the one to add
+     * @return a new ChainedDataLoaderInstrumentation object
+     */
+    public ChainedDataLoaderInstrumentation add(DataLoaderInstrumentation instrumentation) {
+        ArrayList<DataLoaderInstrumentation> list = new ArrayList<>(this.instrumentations);
+        list.add(instrumentation);
+        return new ChainedDataLoaderInstrumentation(list);
+    }
+
+    /**
+     * Prepends a new {@link DataLoaderInstrumentation} to the list and creates a new {@link ChainedDataLoaderInstrumentation}
+     *
+     * @param instrumentation the one to add
+     * @return a new ChainedDataLoaderInstrumentation object
+     */
+    public ChainedDataLoaderInstrumentation prepend(DataLoaderInstrumentation instrumentation) {
+        ArrayList<DataLoaderInstrumentation> list = new ArrayList<>();
+        list.add(instrumentation);
+        list.addAll(this.instrumentations);
+        return new ChainedDataLoaderInstrumentation(list);
+    }
+
+    /**
+     * Adds a collection of {@link DataLoaderInstrumentation} to the list and creates a new {@link ChainedDataLoaderInstrumentation}
+     *
+     * @param instrumentations the new ones to add
+     * @return a new ChainedDataLoaderInstrumentation object
+     */
+    public ChainedDataLoaderInstrumentation addAll(Collection<DataLoaderInstrumentation> instrumentations) {
+        ArrayList<DataLoaderInstrumentation> list = new ArrayList<>(this.instrumentations);
+        list.addAll(instrumentations);
+        return new ChainedDataLoaderInstrumentation(list);
+    }
+
+    @Override
+    public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
+        return chainedCtx(it -> it.beginDispatch(dataLoader));
+    }
+
+    @Override
+    public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
+        return chainedCtx(it -> it.beginBatchLoader(dataLoader, keys, environment));
+    }
+
+    private <T> DataLoaderInstrumentationContext<T> chainedCtx(Function<DataLoaderInstrumentation, DataLoaderInstrumentationContext<T>> mapper) {
+        // if we have zero or 1 instrumentations (and 1 is the most common), then we can avoid an object allocation
+        // of the ChainedInstrumentationContext since it won't be needed
+        if (instrumentations.isEmpty()) {
+            return DataLoaderInstrumentationHelper.noOpCtx();
+        }
+        if (instrumentations.size() == 1) {
+            return mapper.apply(instrumentations.get(0));
+        }
+        return new ChainedInstrumentationContext<>(dropNullContexts(mapper));
+    }
+
+    private <T> List<DataLoaderInstrumentationContext<T>> dropNullContexts(Function<DataLoaderInstrumentation, DataLoaderInstrumentationContext<T>> mapper) {
+        return instrumentations.stream()
+                .map(mapper)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
+
+    private static class ChainedInstrumentationContext<T> implements DataLoaderInstrumentationContext<T> {
+        private final List<DataLoaderInstrumentationContext<T>> contexts;
+
+        public ChainedInstrumentationContext(List<DataLoaderInstrumentationContext<T>> contexts) {
+            this.contexts = contexts;
+        }
+
+        @Override
+        public void onDispatched() {
+            contexts.forEach(DataLoaderInstrumentationContext::onDispatched);
+        }
+
+        @Override
+        public void onCompleted(T result, Throwable t) {
+            contexts.forEach(it -> it.onCompleted(result, t));
+        }
+    }
+}
diff --git a/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentation.java b/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentation.java
new file mode 100644
index 0000000..78f2cf5
--- /dev/null
+++ b/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentation.java
@@ -0,0 +1,38 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.BatchLoaderEnvironment;
+import org.dataloader.DataLoader;
+import org.dataloader.DispatchResult;
+import org.dataloader.annotations.PublicSpi;
+
+import java.util.List;
+
+/**
+ * This interface is called when certain actions happen inside a data loader
+ */
+@PublicSpi
+public interface DataLoaderInstrumentation {
+    /**
+     * This call back is done just before the {@link DataLoader#dispatch()} is invoked,
+     * and it completes when the dispatch call promise is done.
+     *
+     * @param dataLoader the {@link DataLoader} in question
+     * @return a DataLoaderInstrumentationContext or null to be more performant
+     */
+    default DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
+        return null;
+    }
+
+    /**
+     * This call back is done just before the `batch loader` of a {@link DataLoader} is invoked.  Remember a batch loader
+     * could be called multiple times during a dispatch event (because of max batch sizes)
+     *
+     * @param dataLoader  the {@link DataLoader} in question
+     * @param keys        the set of keys being fetched
+     * @param environment the {@link BatchLoaderEnvironment}
+     * @return a DataLoaderInstrumentationContext or null to be more performant
+     */
+    default DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
+        return null;
+    }
+}
diff --git a/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentationContext.java b/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentationContext.java
new file mode 100644
index 0000000..88b08ef
--- /dev/null
+++ b/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentationContext.java
@@ -0,0 +1,33 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.annotations.PublicSpi;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * When a {@link DataLoaderInstrumentation}.'beginXXX()' method is called then it must return a {@link DataLoaderInstrumentationContext}
+ * that will be invoked when the step is first dispatched and then when it completes.  Sometimes this is effectively the same time
+ * whereas at other times it's when an asynchronous {@link CompletableFuture} completes.
+ * <p>
+ * This pattern of construction of an object then call back is intended to allow "timers" to be created that can instrument what has
+ * just happened or "loggers" to be called to record what has happened.
+ */
+@PublicSpi
+public interface DataLoaderInstrumentationContext<T> {
+    /**
+     * This is invoked when the instrumentation step is initially dispatched.  Note this is NOT
+     * the same time as the {@link DataLoaderInstrumentation}`beginXXX()` starts, but rather after all the inner
+     * work has been done.
+     */
+    default void onDispatched() {
+    }
+
+    /**
+     * This is invoked when the instrumentation step is fully completed.
+     *
+     * @param result the result of the step (which may be null)
+     * @param t      this exception will be non-null if an exception was thrown during the step
+     */
+    default void onCompleted(T result, Throwable t) {
+    }
+}
diff --git a/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentationHelper.java b/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentationHelper.java
new file mode 100644
index 0000000..9e60060
--- /dev/null
+++ b/src/main/java/org/dataloader/instrumentation/DataLoaderInstrumentationHelper.java
@@ -0,0 +1,74 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.annotations.PublicApi;
+
+import java.util.function.BiConsumer;
+
+@PublicApi
+public class DataLoaderInstrumentationHelper {
+
+    @SuppressWarnings("RedundantMethodOverride")
+    private static final DataLoaderInstrumentationContext<?> NOOP_CTX = new DataLoaderInstrumentationContext<>() {
+        @Override
+        public void onDispatched() {
+        }
+
+        @Override
+        public void onCompleted(Object result, Throwable t) {
+        }
+    };
+
+    /**
+     * Returns a noop {@link DataLoaderInstrumentationContext} of the right type
+     *
+     * @param <T> for two
+     * @return a noop context
+     */
+    public static <T> DataLoaderInstrumentationContext<T> noOpCtx() {
+        //noinspection unchecked
+        return (DataLoaderInstrumentationContext<T>) NOOP_CTX;
+    }
+
+    /**
+     * A well known noop {@link DataLoaderInstrumentation}
+     */
+    public static final DataLoaderInstrumentation NOOP_INSTRUMENTATION = new DataLoaderInstrumentation() {
+    };
+
+    /**
+     * Allows for the more fluent away to return an instrumentation context that runs the specified
+     * code on instrumentation step dispatch.
+     *
+     * @param codeToRun the code to run on dispatch
+     * @param <U>       the generic type
+     * @return an instrumentation context
+     */
+    public static <U> DataLoaderInstrumentationContext<U> whenDispatched(Runnable codeToRun) {
+        return new SimpleDataLoaderInstrumentationContext<>(codeToRun, null);
+    }
+
+    /**
+     * Allows for the more fluent away to return an instrumentation context that runs the specified
+     * code on instrumentation step completion.
+     *
+     * @param codeToRun the code to run on completion
+     * @param <U>       the generic type
+     * @return an instrumentation context
+     */
+    public static <U> DataLoaderInstrumentationContext<U> whenCompleted(BiConsumer<U, Throwable> codeToRun) {
+        return new SimpleDataLoaderInstrumentationContext<>(null, codeToRun);
+    }
+
+
+    /**
+     * Check the {@link DataLoaderInstrumentationContext} to see if its null and returns a noop if it is or else the original
+     * context.  This is a bit of a helper method.
+     *
+     * @param ic  the context in play
+     * @param <T> for two
+     * @return a non null context
+     */
+    public static <T> DataLoaderInstrumentationContext<T> ctxOrNoopCtx(DataLoaderInstrumentationContext<T> ic) {
+        return ic == null ? noOpCtx() : ic;
+    }
+}
diff --git a/src/main/java/org/dataloader/instrumentation/SimpleDataLoaderInstrumentationContext.java b/src/main/java/org/dataloader/instrumentation/SimpleDataLoaderInstrumentationContext.java
new file mode 100644
index 0000000..f629a05
--- /dev/null
+++ b/src/main/java/org/dataloader/instrumentation/SimpleDataLoaderInstrumentationContext.java
@@ -0,0 +1,35 @@
+package org.dataloader.instrumentation;
+
+
+import org.dataloader.annotations.Internal;
+
+import java.util.function.BiConsumer;
+
+/**
+ * A simple implementation of {@link DataLoaderInstrumentationContext}
+ */
+@Internal
+class SimpleDataLoaderInstrumentationContext<T> implements DataLoaderInstrumentationContext<T> {
+
+    private final BiConsumer<T, Throwable> codeToRunOnComplete;
+    private final Runnable codeToRunOnDispatch;
+
+    SimpleDataLoaderInstrumentationContext(Runnable codeToRunOnDispatch, BiConsumer<T, Throwable> codeToRunOnComplete) {
+        this.codeToRunOnComplete = codeToRunOnComplete;
+        this.codeToRunOnDispatch = codeToRunOnDispatch;
+    }
+
+    @Override
+    public void onDispatched() {
+        if (codeToRunOnDispatch != null) {
+            codeToRunOnDispatch.run();
+        }
+    }
+
+    @Override
+    public void onCompleted(T result, Throwable t) {
+        if (codeToRunOnComplete != null) {
+            codeToRunOnComplete.accept(result, t);
+        }
+    }
+}
diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
index 6ea9425..b6bc257 100644
--- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
+++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
@@ -3,6 +3,7 @@
 import org.dataloader.DataLoader;
 import org.dataloader.DataLoaderRegistry;
 import org.dataloader.annotations.ExperimentalApi;
+import org.dataloader.instrumentation.DataLoaderInstrumentation;
 
 import java.time.Duration;
 import java.util.LinkedHashMap;
@@ -64,8 +65,7 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A
     private volatile boolean closed;
 
     private ScheduledDataLoaderRegistry(Builder builder) {
-        super();
-        this.dataLoaders.putAll(builder.dataLoaders);
+        super(builder.dataLoaders, builder.instrumentation);
         this.scheduledExecutorService = builder.scheduledExecutorService;
         this.defaultExecutorUsed = builder.defaultExecutorUsed;
         this.schedule = builder.schedule;
@@ -271,6 +271,8 @@ public static class Builder {
         private boolean defaultExecutorUsed = false;
         private Duration schedule = Duration.ofMillis(10);
         private boolean tickerMode = false;
+        private DataLoaderInstrumentation instrumentation;
+
 
         /**
          * If you provide a {@link ScheduledExecutorService} then it will NOT be shutdown when
@@ -363,6 +365,11 @@ public Builder tickerMode(boolean tickerMode) {
             return this;
         }
 
+        public Builder instrumentation(DataLoaderInstrumentation instrumentation) {
+            this.instrumentation = instrumentation;
+            return this;
+        }
+
         /**
          * @return the newly built {@link ScheduledDataLoaderRegistry}
          */
diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java
index 9e30c90..1f718aa 100644
--- a/src/test/java/ReadmeExamples.java
+++ b/src/test/java/ReadmeExamples.java
@@ -6,12 +6,17 @@
 import org.dataloader.DataLoader;
 import org.dataloader.DataLoaderFactory;
 import org.dataloader.DataLoaderOptions;
+import org.dataloader.DataLoaderRegistry;
+import org.dataloader.DispatchResult;
 import org.dataloader.MappedBatchLoaderWithContext;
 import org.dataloader.MappedBatchPublisher;
 import org.dataloader.Try;
 import org.dataloader.fixtures.SecurityCtx;
 import org.dataloader.fixtures.User;
 import org.dataloader.fixtures.UserManager;
+import org.dataloader.instrumentation.DataLoaderInstrumentation;
+import org.dataloader.instrumentation.DataLoaderInstrumentationContext;
+import org.dataloader.instrumentation.DataLoaderInstrumentationHelper;
 import org.dataloader.registries.DispatchPredicate;
 import org.dataloader.registries.ScheduledDataLoaderRegistry;
 import org.dataloader.scheduler.BatchLoaderScheduler;
@@ -228,6 +233,7 @@ private void clearCacheOnError() {
     }
 
     BatchLoader<String, User> userBatchLoader;
+    BatchLoader<String, User> teamsBatchLoader;
 
     private void disableCache() {
         DataLoaderFactory.newDataLoader(userBatchLoader, DataLoaderOptions.newOptions().setCachingEnabled(false));
@@ -380,4 +386,63 @@ private void ScheduledDispatcherChained() {
                 .build();
 
     }
+
+    private DataLoaderInstrumentation timingInstrumentation = DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION;
+
+    private void instrumentationExample() {
+
+        DataLoaderInstrumentation timingInstrumentation = new DataLoaderInstrumentation() {
+            @Override
+            public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
+                long then = System.currentTimeMillis();
+                return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
+                    long ms = System.currentTimeMillis() - then;
+                    System.out.println(format("dispatch time: %d ms", ms));
+                });
+            }
+
+            @Override
+            public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
+                long then = System.currentTimeMillis();
+                return DataLoaderInstrumentationHelper.whenCompleted((result, err) -> {
+                    long ms = System.currentTimeMillis() - then;
+                    System.out.println(format("batch loader time: %d ms", ms));
+                });
+            }
+        };
+        DataLoaderOptions options = DataLoaderOptions.newOptions().setInstrumentation(timingInstrumentation);
+        DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options);
+    }
+
+    private void registryExample() {
+        DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader);
+        DataLoader<String, User> teamsDataLoader = DataLoaderFactory.newDataLoader(teamsBatchLoader);
+
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .instrumentation(timingInstrumentation)
+                .register("users", userDataLoader)
+                .register("teams", teamsDataLoader)
+                .build();
+
+        DataLoader<String, User> changedUsersDataLoader = registry.getDataLoader("users");
+
+    }
+
+    private void combiningRegistryExample() {
+        DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader);
+        DataLoader<String, User> teamsDataLoader = DataLoaderFactory.newDataLoader(teamsBatchLoader);
+
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .register("users", userDataLoader)
+                .register("teams", teamsDataLoader)
+                .build();
+
+        DataLoaderRegistry registryCombined = DataLoaderRegistry.newRegistry()
+                .instrumentation(timingInstrumentation)
+                .registerAll(registry)
+                .build();
+
+        DataLoader<String, User> changedUsersDataLoader = registryCombined.getDataLoader("users");
+
+    }
 }
diff --git a/src/test/java/org/dataloader/fixtures/Stopwatch.java b/src/test/java/org/dataloader/fixtures/Stopwatch.java
new file mode 100644
index 0000000..c815a8b
--- /dev/null
+++ b/src/test/java/org/dataloader/fixtures/Stopwatch.java
@@ -0,0 +1,57 @@
+package org.dataloader.fixtures;
+
+import java.time.Duration;
+
+public class Stopwatch {
+
+    public static Stopwatch stopwatchStarted() {
+        return new Stopwatch().start();
+    }
+
+    public static Stopwatch stopwatchUnStarted() {
+        return new Stopwatch();
+    }
+
+    private long started = -1;
+    private long stopped = -1;
+
+    public Stopwatch start() {
+        synchronized (this) {
+            if (started != -1) {
+                throw new IllegalStateException("You have started it before");
+            }
+            started = System.currentTimeMillis();
+        }
+        return this;
+    }
+
+    private Stopwatch() {
+    }
+
+    public long elapsed() {
+        synchronized (this) {
+            if (started == -1) {
+                throw new IllegalStateException("You haven't started it");
+            }
+            if (stopped == -1) {
+                return System.currentTimeMillis() - started;
+            } else {
+                return stopped - started;
+            }
+        }
+    }
+
+    public Duration duration() {
+        return Duration.ofMillis(elapsed());
+    }
+
+    public Duration stop() {
+        synchronized (this) {
+            if (started != -1) {
+                throw new IllegalStateException("You have started it");
+            }
+            stopped = System.currentTimeMillis();
+            return duration();
+        }
+    }
+}
diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java
index 97e35a8..8cbe86c 100644
--- a/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java
+++ b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java
@@ -25,6 +25,10 @@ public interface TestDataLoaderFactory {
 
     // Convenience methods
 
+    default <K> DataLoader<K, K> idLoader(DataLoaderOptions options) {
+        return idLoader(options, new ArrayList<>());
+    }
+
     default <K> DataLoader<K, K> idLoader(List<Collection<K>> calls) {
         return idLoader(null, calls);
     }
diff --git a/src/test/java/org/dataloader/instrumentation/CapturingInstrumentation.java b/src/test/java/org/dataloader/instrumentation/CapturingInstrumentation.java
new file mode 100644
index 0000000..f5af683
--- /dev/null
+++ b/src/test/java/org/dataloader/instrumentation/CapturingInstrumentation.java
@@ -0,0 +1,49 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.BatchLoaderEnvironment;
+import org.dataloader.DataLoader;
+import org.dataloader.DispatchResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class CapturingInstrumentation implements DataLoaderInstrumentation {
+    String name;
+    List<String> methods = new ArrayList<>();
+
+    public CapturingInstrumentation(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
+        methods.add(name + "_beginDispatch");
+        return new DataLoaderInstrumentationContext<>() {
+            @Override
+            public void onDispatched() {
+                methods.add(name + "_beginDispatch_onDispatched");
+            }
+
+            @Override
+            public void onCompleted(DispatchResult<?> result, Throwable t) {
+                methods.add(name + "_beginDispatch_onCompleted");
+            }
+        };
+    }
+
+    @Override
+    public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
+        methods.add(name + "_beginBatchLoader");
+        return new DataLoaderInstrumentationContext<>() {
+            @Override
+            public void onDispatched() {
+                methods.add(name + "_beginBatchLoader_onDispatched");
+            }
+
+            @Override
+            public void onCompleted(List<?> result, Throwable t) {
+                methods.add(name + "_beginBatchLoader_onCompleted");
+            }
+        };
+    }
+}
diff --git a/src/test/java/org/dataloader/instrumentation/CapturingInstrumentationReturnsNull.java b/src/test/java/org/dataloader/instrumentation/CapturingInstrumentationReturnsNull.java
new file mode 100644
index 0000000..0c16429
--- /dev/null
+++ b/src/test/java/org/dataloader/instrumentation/CapturingInstrumentationReturnsNull.java
@@ -0,0 +1,26 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.BatchLoaderEnvironment;
+import org.dataloader.DataLoader;
+import org.dataloader.DispatchResult;
+
+import java.util.List;
+
+class CapturingInstrumentationReturnsNull extends CapturingInstrumentation {
+
+    public CapturingInstrumentationReturnsNull(String name) {
+        super(name);
+    }
+
+    @Override
+    public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
+        methods.add(name + "_beginDispatch");
+        return null;
+    }
+
+    @Override
+    public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
+        methods.add(name + "_beginBatchLoader");
+        return null;
+    }
+}
diff --git a/src/test/java/org/dataloader/instrumentation/ChainedDataLoaderInstrumentationTest.java b/src/test/java/org/dataloader/instrumentation/ChainedDataLoaderInstrumentationTest.java
new file mode 100644
index 0000000..d791762
--- /dev/null
+++ b/src/test/java/org/dataloader/instrumentation/ChainedDataLoaderInstrumentationTest.java
@@ -0,0 +1,120 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderFactory;
+import org.dataloader.DataLoaderOptions;
+import org.dataloader.fixtures.TestKit;
+import org.dataloader.fixtures.parameterized.TestDataLoaderFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.awaitility.Awaitility.await;
+import static org.dataloader.DataLoaderOptions.newOptionsBuilder;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ChainedDataLoaderInstrumentationTest {
+
+    CapturingInstrumentation capturingA;
+    CapturingInstrumentation capturingB;
+    CapturingInstrumentation capturingButReturnsNull;
+
+
+    @BeforeEach
+    void setUp() {
+        capturingA = new CapturingInstrumentation("A");
+        capturingB = new CapturingInstrumentation("B");
+        capturingButReturnsNull = new CapturingInstrumentationReturnsNull("NULL");
+    }
+
+    @Test
+    void canChainTogetherZeroInstrumentation() {
+        // just to prove its useless but harmless
+        ChainedDataLoaderInstrumentation chainedItn = new ChainedDataLoaderInstrumentation();
+
+        DataLoaderOptions options = newOptionsBuilder().setInstrumentation(chainedItn).build();
+
+        DataLoader<String, String> dl = DataLoaderFactory.newDataLoader(TestKit.keysAsValues(), options);
+
+        dl.load("A");
+        dl.load("B");
+
+        CompletableFuture<List<String>> dispatch = dl.dispatch();
+
+        await().until(dispatch::isDone);
+        assertThat(dispatch.join(), equalTo(List.of("A", "B")));
+    }
+
+    @Test
+    void canChainTogetherOneInstrumentation() {
+        CapturingInstrumentation capturingA = new CapturingInstrumentation("A");
+
+        ChainedDataLoaderInstrumentation chainedItn = new ChainedDataLoaderInstrumentation()
+                .add(capturingA);
+
+        DataLoaderOptions options = newOptionsBuilder().setInstrumentation(chainedItn).build();
+
+        DataLoader<String, String> dl = DataLoaderFactory.newDataLoader(TestKit.keysAsValues(), options);
+
+        dl.load("A");
+        dl.load("B");
+
+        CompletableFuture<List<String>> dispatch = dl.dispatch();
+
+        await().until(dispatch::isDone);
+
+        assertThat(capturingA.methods, equalTo(List.of("A_beginDispatch",
+                "A_beginBatchLoader", "A_beginBatchLoader_onDispatched", "A_beginBatchLoader_onCompleted",
+                "A_beginDispatch_onDispatched", "A_beginDispatch_onCompleted")));
+    }
+
+
+    @ParameterizedTest
+    @MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
+    public void canChainTogetherManyInstrumentationsWithDifferentBatchLoaders(TestDataLoaderFactory factory) {
+
+        ChainedDataLoaderInstrumentation chainedItn = new ChainedDataLoaderInstrumentation()
+                .add(capturingA)
+                .add(capturingB)
+                .add(capturingButReturnsNull);
+
+        DataLoaderOptions options = newOptionsBuilder().setInstrumentation(chainedItn).build();
+
+        DataLoader<String, String> dl = factory.idLoader(options);
+
+        dl.load("A");
+        dl.load("B");
+
+        CompletableFuture<List<String>> dispatch = dl.dispatch();
+
+        await().until(dispatch::isDone);
+
+        //
+        // A_beginBatchLoader happens before A_beginDispatch_onDispatched because these are sync
+        // and no async - a batch scheduler or async batch loader would change that
+        //
+        assertThat(capturingA.methods, equalTo(List.of("A_beginDispatch",
+                "A_beginBatchLoader", "A_beginBatchLoader_onDispatched", "A_beginBatchLoader_onCompleted",
+                "A_beginDispatch_onDispatched", "A_beginDispatch_onCompleted")));
+
+        assertThat(capturingB.methods, equalTo(List.of("B_beginDispatch",
+                "B_beginBatchLoader", "B_beginBatchLoader_onDispatched", "B_beginBatchLoader_onCompleted",
+                "B_beginDispatch_onDispatched", "B_beginDispatch_onCompleted")));
+
+        // it returned null on all its contexts - nothing to call back on
+        assertThat(capturingButReturnsNull.methods, equalTo(List.of("NULL_beginDispatch", "NULL_beginBatchLoader")));
+    }
+
+    @Test
+    void addition_works() {
+        ChainedDataLoaderInstrumentation chainedItn = new ChainedDataLoaderInstrumentation()
+                .add(capturingA).prepend(capturingB).addAll(List.of(capturingButReturnsNull));
+
+        assertThat(chainedItn.getInstrumentations(), equalTo(List.of(capturingB, capturingA, capturingButReturnsNull)));
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/dataloader/instrumentation/DataLoaderInstrumentationTest.java b/src/test/java/org/dataloader/instrumentation/DataLoaderInstrumentationTest.java
new file mode 100644
index 0000000..a35e13a
--- /dev/null
+++ b/src/test/java/org/dataloader/instrumentation/DataLoaderInstrumentationTest.java
@@ -0,0 +1,116 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.BatchLoader;
+import org.dataloader.BatchLoaderEnvironment;
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderFactory;
+import org.dataloader.DataLoaderOptions;
+import org.dataloader.DispatchResult;
+import org.dataloader.fixtures.Stopwatch;
+import org.dataloader.fixtures.TestKit;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+
+public class DataLoaderInstrumentationTest {
+
+    BatchLoader<String, String> snoozingBatchLoader = keys -> CompletableFuture.supplyAsync(() -> {
+        TestKit.snooze(100);
+        return keys;
+    });
+
+    @Test
+    void canMonitorDispatching() {
+        Stopwatch stopwatch = Stopwatch.stopwatchUnStarted();
+        AtomicReference<DataLoader<?, ?>> dlRef = new AtomicReference<>();
+
+        DataLoaderInstrumentation instrumentation = new DataLoaderInstrumentation() {
+
+            @Override
+            public DataLoaderInstrumentationContext<DispatchResult<?>> beginDispatch(DataLoader<?, ?> dataLoader) {
+                dlRef.set(dataLoader);
+                stopwatch.start();
+                return new DataLoaderInstrumentationContext<>() {
+                    @Override
+                    public void onCompleted(DispatchResult<?> result, Throwable t) {
+                        stopwatch.stop();
+                    }
+                };
+            }
+
+            @Override
+            public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
+                return DataLoaderInstrumentationHelper.noOpCtx();
+            }
+        };
+
+        DataLoaderOptions options = new DataLoaderOptions()
+                .setInstrumentation(instrumentation)
+                .setMaxBatchSize(5);
+
+        DataLoader<String, String> dl = DataLoaderFactory.newDataLoader(snoozingBatchLoader, options);
+
+        List<String> keys = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            String key = "X" + i;
+            keys.add(key);
+            dl.load(key);
+        }
+
+        CompletableFuture<List<String>> dispatch = dl.dispatch();
+
+        await().until(dispatch::isDone);
+        // we must have called batch load 4 times at 100ms snooze  per call
+        // but its in parallel via supplyAsync
+        assertThat(stopwatch.elapsed(), greaterThan(75L));
+        assertThat(dlRef.get(), is(dl));
+        assertThat(dispatch.join(), equalTo(keys));
+    }
+
+    @Test
+    void canMonitorBatchLoading() {
+        Stopwatch stopwatch = Stopwatch.stopwatchUnStarted();
+        AtomicReference<BatchLoaderEnvironment> beRef = new AtomicReference<>();
+        AtomicReference<DataLoader<?, ?>> dlRef = new AtomicReference<>();
+
+        DataLoaderInstrumentation instrumentation = new DataLoaderInstrumentation() {
+
+            @Override
+            public DataLoaderInstrumentationContext<List<?>> beginBatchLoader(DataLoader<?, ?> dataLoader, List<?> keys, BatchLoaderEnvironment environment) {
+                dlRef.set(dataLoader);
+                beRef.set(environment);
+
+                stopwatch.start();
+                return new DataLoaderInstrumentationContext<>() {
+                    @Override
+                    public void onCompleted(List<?> result, Throwable t) {
+                        stopwatch.stop();
+                    }
+                };
+            }
+        };
+
+        DataLoaderOptions options = new DataLoaderOptions().setInstrumentation(instrumentation);
+        DataLoader<String, String> dl = DataLoaderFactory.newDataLoader(snoozingBatchLoader, options);
+
+        dl.load("A", "kcA");
+        dl.load("B", "kcB");
+
+        CompletableFuture<List<String>> dispatch = dl.dispatch();
+
+        await().until(dispatch::isDone);
+        assertThat(stopwatch.elapsed(), greaterThan(50L));
+        assertThat(dlRef.get(), is(dl));
+        assertThat(beRef.get().getKeyContexts().keySet(), equalTo(Set.of("A", "B")));
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/dataloader/instrumentation/DataLoaderRegistryInstrumentationTest.java b/src/test/java/org/dataloader/instrumentation/DataLoaderRegistryInstrumentationTest.java
new file mode 100644
index 0000000..465aa4d
--- /dev/null
+++ b/src/test/java/org/dataloader/instrumentation/DataLoaderRegistryInstrumentationTest.java
@@ -0,0 +1,231 @@
+package org.dataloader.instrumentation;
+
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderOptions;
+import org.dataloader.DataLoaderRegistry;
+import org.dataloader.fixtures.TestKit;
+import org.dataloader.fixtures.parameterized.TestDataLoaderFactory;
+import org.dataloader.registries.ScheduledDataLoaderRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class DataLoaderRegistryInstrumentationTest {
+    DataLoader<String, String> dlX;
+    DataLoader<String, String> dlY;
+    DataLoader<String, String> dlZ;
+
+    CapturingInstrumentation instrA;
+    CapturingInstrumentation instrB;
+    ChainedDataLoaderInstrumentation chainedInstrA;
+    ChainedDataLoaderInstrumentation chainedInstrB;
+
+    @BeforeEach
+    void setUp() {
+        dlX = TestKit.idLoader();
+        dlY = TestKit.idLoader();
+        dlZ = TestKit.idLoader();
+        instrA = new CapturingInstrumentation("A");
+        instrB = new CapturingInstrumentation("B");
+        chainedInstrA = new ChainedDataLoaderInstrumentation().add(instrA);
+        chainedInstrB = new ChainedDataLoaderInstrumentation().add(instrB);
+    }
+
+    @Test
+    void canInstrumentRegisteredDLsViaBuilder() {
+
+        assertThat(dlX.getOptions().getInstrumentation(), equalTo(DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION));
+
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .instrumentation(chainedInstrA)
+                .register("X", dlX)
+                .register("Y", dlY)
+                .register("Z", dlZ)
+                .build();
+
+        assertThat(registry.getInstrumentation(), equalTo(chainedInstrA));
+
+        for (String key : List.of("X", "Y", "Z")) {
+            DataLoaderInstrumentation instrumentation = registry.getDataLoader(key).getOptions().getInstrumentation();
+            assertThat(instrumentation, instanceOf(ChainedDataLoaderInstrumentation.class));
+            List<DataLoaderInstrumentation> instrumentations = ((ChainedDataLoaderInstrumentation) instrumentation).getInstrumentations();
+            assertThat(instrumentations, equalTo(List.of(instrA)));
+        }
+    }
+
+    @Test
+    void canInstrumentRegisteredDLsViaBuilderCombined() {
+
+        DataLoaderRegistry registry1 = DataLoaderRegistry.newRegistry()
+                .register("X", dlX)
+                .register("Y", dlY)
+                .build();
+
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .instrumentation(chainedInstrA)
+                .register("Z", dlZ)
+                .registerAll(registry1)
+                .build();
+
+        for (String key : List.of("X", "Y", "Z")) {
+            DataLoaderInstrumentation instrumentation = registry.getDataLoader(key).getOptions().getInstrumentation();
+            assertThat(instrumentation, instanceOf(ChainedDataLoaderInstrumentation.class));
+            List<DataLoaderInstrumentation> instrumentations = ((ChainedDataLoaderInstrumentation) instrumentation).getInstrumentations();
+            assertThat(instrumentations, equalTo(List.of(instrA)));
+        }
+    }
+
+    @Test
+    void canInstrumentViaMutativeRegistration() {
+
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .instrumentation(chainedInstrA)
+                .build();
+
+        registry.register("X", dlX);
+        registry.computeIfAbsent("Y", l -> dlY);
+        registry.computeIfAbsent("Z", l -> dlZ);
+
+        for (String key : List.of("X", "Y", "Z")) {
+            DataLoaderInstrumentation instrumentation = registry.getDataLoader(key).getOptions().getInstrumentation();
+            assertThat(instrumentation, instanceOf(ChainedDataLoaderInstrumentation.class));
+            List<DataLoaderInstrumentation> instrumentations = ((ChainedDataLoaderInstrumentation) instrumentation).getInstrumentations();
+            assertThat(instrumentations, equalTo(List.of(instrA)));
+        }
+    }
+
+    @Test
+    void wontDoAnyThingIfThereIsNoRegistryInstrumentation() {
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .register("X", dlX)
+                .register("Y", dlY)
+                .register("Z", dlZ)
+                .build();
+
+        for (String key : List.of("X", "Y", "Z")) {
+            DataLoaderInstrumentation instrumentation = registry.getDataLoader(key).getOptions().getInstrumentation();
+            assertThat(instrumentation, equalTo(DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION));
+        }
+    }
+
+    @Test
+    void wontDoAnyThingIfThereTheyAreTheSameInstrumentationAlready() {
+        DataLoader<String, String> newX = dlX.transform(builder -> builder.options(dlX.getOptions().setInstrumentation(instrA)));
+        DataLoader<String, String> newY = dlX.transform(builder ->  builder.options(dlY.getOptions().setInstrumentation(instrA)));
+        DataLoader<String, String> newZ = dlX.transform(builder ->  builder.options(dlZ.getOptions().setInstrumentation(instrA)));
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .instrumentation(instrA)
+                .register("X", newX)
+                .register("Y", newY)
+                .register("Z", newZ)
+                .build();
+
+        Map<String, DataLoader<String, String>> dls = Map.of("X", newX, "Y", newY, "Z", newZ);
+
+        assertThat(registry.getInstrumentation(), equalTo(instrA));
+
+        for (String key : List.of("X", "Y", "Z")) {
+            DataLoader<Object, Object> dataLoader = registry.getDataLoader(key);
+            DataLoaderInstrumentation instrumentation = dataLoader.getOptions().getInstrumentation();
+            assertThat(instrumentation, equalTo(instrA));
+            // it's the same DL - it's not changed because it has the same instrumentation
+            assertThat(dls.get(key), equalTo(dataLoader));
+        }
+    }
+
+    @Test
+    void ifTheDLHasAInstrumentationThenItsTurnedIntoAChainedOne() {
+        DataLoaderOptions options = dlX.getOptions().setInstrumentation(instrA);
+        DataLoader<String, String> newX = dlX.transform(builder -> builder.options(options));
+
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .instrumentation(instrB)
+                .register("X", newX)
+                .build();
+
+        DataLoader<Object, Object> dataLoader = registry.getDataLoader("X");
+        DataLoaderInstrumentation instrumentation = dataLoader.getOptions().getInstrumentation();
+        assertThat(instrumentation, instanceOf(ChainedDataLoaderInstrumentation.class));
+
+        List<DataLoaderInstrumentation> instrumentations = ((ChainedDataLoaderInstrumentation) instrumentation).getInstrumentations();
+        // it gets turned into a chained one and the registry one goes first
+        assertThat(instrumentations, equalTo(List.of(instrB, instrA)));
+    }
+
+    @Test
+    void chainedInstrumentationsWillBeCombined() {
+        DataLoaderOptions options = dlX.getOptions().setInstrumentation(chainedInstrB);
+        DataLoader<String, String> newX = dlX.transform(builder -> builder.options(options));
+
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .instrumentation(instrA)
+                .register("X", newX)
+                .build();
+
+        DataLoader<Object, Object> dataLoader = registry.getDataLoader("X");
+        DataLoaderInstrumentation instrumentation = dataLoader.getOptions().getInstrumentation();
+        assertThat(instrumentation, instanceOf(ChainedDataLoaderInstrumentation.class));
+
+        List<DataLoaderInstrumentation> instrumentations = ((ChainedDataLoaderInstrumentation) instrumentation).getInstrumentations();
+        // it gets turned into a chained one and the registry one goes first
+        assertThat(instrumentations, equalTo(List.of(instrA, instrB)));
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    void canInstrumentScheduledRegistryViaBuilder() {
+
+        assertThat(dlX.getOptions().getInstrumentation(), equalTo(DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION));
+
+        ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
+                .instrumentation(chainedInstrA)
+                .register("X", dlX)
+                .register("Y", dlY)
+                .register("Z", dlZ)
+                .build();
+
+        assertThat(registry.getInstrumentation(), equalTo(chainedInstrA));
+
+        for (String key : List.of("X", "Y", "Z")) {
+            DataLoaderInstrumentation instrumentation = registry.getDataLoader(key).getOptions().getInstrumentation();
+            assertThat(instrumentation, instanceOf(ChainedDataLoaderInstrumentation.class));
+            List<DataLoaderInstrumentation> instrumentations = ((ChainedDataLoaderInstrumentation) instrumentation).getInstrumentations();
+            assertThat(instrumentations, equalTo(List.of(instrA)));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
+    public void endToEndIntegrationTest(TestDataLoaderFactory factory) {
+        DataLoader<String, String> dl = factory.idLoader();
+
+        DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
+                .instrumentation(instrA)
+                .register("X", dl)
+                .build();
+
+        // since the data-loader changed when registered you MUST get the data loader from the registry
+        // not direct to the old one
+        DataLoader<String, String> dataLoader = registry.getDataLoader("X");
+        CompletableFuture<String> loadA = dataLoader.load("A");
+
+        registry.dispatchAll();
+
+        await().until(loadA::isDone);
+        assertThat(loadA.join(), equalTo("A"));
+
+        assertThat(instrA.methods, equalTo(List.of("A_beginDispatch",
+                "A_beginBatchLoader", "A_beginBatchLoader_onDispatched", "A_beginBatchLoader_onCompleted",
+                "A_beginDispatch_onDispatched", "A_beginDispatch_onCompleted")));
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/dataloader/instrumentation/SimpleDataLoaderInstrumentationContextTest.java b/src/test/java/org/dataloader/instrumentation/SimpleDataLoaderInstrumentationContextTest.java
new file mode 100644
index 0000000..38328eb
--- /dev/null
+++ b/src/test/java/org/dataloader/instrumentation/SimpleDataLoaderInstrumentationContextTest.java
@@ -0,0 +1,49 @@
+package org.dataloader.instrumentation;
+
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
+
+public class SimpleDataLoaderInstrumentationContextTest {
+
+    @Test
+    void canRunCompletedCodeAsExpected() {
+        AtomicReference<Object> actual = new AtomicReference<>();
+        AtomicReference<Object> actualErr = new AtomicReference<>();
+
+        DataLoaderInstrumentationContext<Object> ctx = DataLoaderInstrumentationHelper.whenCompleted((r, err) -> {
+            actualErr.set(err);
+            actual.set(r);
+        });
+
+        ctx.onDispatched(); // nothing happens
+        assertThat(actual.get(), nullValue());
+        assertThat(actualErr.get(), nullValue());
+
+        ctx.onCompleted("X", null);
+        assertThat(actual.get(), Matchers.equalTo("X"));
+        assertThat(actualErr.get(), nullValue());
+
+        ctx.onCompleted(null, new RuntimeException());
+        assertThat(actual.get(), nullValue());
+        assertThat(actualErr.get(), Matchers.instanceOf(RuntimeException.class));
+    }
+
+    @Test
+    void canRunOnDispatchCodeAsExpected() {
+        AtomicBoolean dispatchedCalled = new AtomicBoolean();
+
+        DataLoaderInstrumentationContext<Object> ctx = DataLoaderInstrumentationHelper.whenDispatched(() -> dispatchedCalled.set(true));
+
+        ctx.onCompleted("X", null); // nothing happens
+        assertThat(dispatchedCalled.get(), Matchers.equalTo(false));
+
+        ctx.onDispatched();
+        assertThat(dispatchedCalled.get(), Matchers.equalTo(true));
+    }
+}
\ No newline at end of file