From 034c68f4ca4faa17cf762230891324b04b3df7be Mon Sep 17 00:00:00 2001
From: bbaker <bbaker@atlassian.com>
Date: Wed, 22 May 2024 20:25:48 +1000
Subject: [PATCH 1/2] More tests for Publishers

---
 .../java/org/dataloader/DataLoaderHelper.java |   2 +-
 .../java/org/dataloader/DataLoaderTest.java   | 190 +++++++++++++++---
 2 files changed, 163 insertions(+), 29 deletions(-)

diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java
index d01b930..cf3fc6e 100644
--- a/src/main/java/org/dataloader/DataLoaderHelper.java
+++ b/src/main/java/org/dataloader/DataLoaderHelper.java
@@ -790,7 +790,7 @@ public synchronized void onNext(Map.Entry<K, V> entry) {
             V value = entry.getValue();
 
             Object callContext = callContextByKey.get(key);
-            List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
+            List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());
 
             onNextValue(key, value, callContext, futures);
 
diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java
index a87c77d..c8d4562 100644
--- a/src/test/java/org/dataloader/DataLoaderTest.java
+++ b/src/test/java/org/dataloader/DataLoaderTest.java
@@ -16,12 +16,14 @@
 
 package org.dataloader;
 
+import org.awaitility.Duration;
 import org.dataloader.fixtures.CustomCacheMap;
 import org.dataloader.fixtures.JsonObject;
 import org.dataloader.fixtures.TestKit;
 import org.dataloader.fixtures.User;
 import org.dataloader.fixtures.UserManager;
 import org.dataloader.impl.CompletableFutureKit;
+import org.dataloader.impl.DataLoaderAssertionException;
 import org.junit.jupiter.api.Named;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -35,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
@@ -47,6 +50,7 @@
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static java.util.concurrent.CompletableFuture.*;
 import static org.awaitility.Awaitility.await;
 import static org.dataloader.DataLoaderFactory.newDataLoader;
 import static org.dataloader.DataLoaderFactory.newMappedDataLoader;
@@ -104,7 +108,7 @@ public void basic_map_batch_loading() {
                     mapOfResults.put(k, k);
                 }
             });
-            return CompletableFuture.completedFuture(mapOfResults);
+            return completedFuture(mapOfResults);
         };
         DataLoader<String, String> loader = DataLoaderFactory.newMappedDataLoader(evensOnlyMappedBatchLoader);
 
@@ -424,7 +428,7 @@ public void should_Allow_priming_the_cache_with_a_future(TestDataLoaderFactory f
         List<Collection<String>> loadCalls = new ArrayList<>();
         DataLoader<String, String> identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls);
 
-        DataLoader<String, String> dlFluency = identityLoader.prime("A", CompletableFuture.completedFuture("A"));
+        DataLoader<String, String> dlFluency = identityLoader.prime("A", completedFuture("A"));
         assertThat(dlFluency, equalTo(identityLoader));
 
         CompletableFuture<String> future1 = identityLoader.load("A");
@@ -992,7 +996,7 @@ public void batches_multiple_requests_with_max_batch_size(TestDataLoaderFactory
 
         identityLoader.dispatch();
 
-        CompletableFuture.allOf(f1, f2, f3).join();
+        allOf(f1, f2, f3).join();
 
         assertThat(f1.join(), equalTo(1));
         assertThat(f2.join(), equalTo(2));
@@ -1035,13 +1039,13 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa
 
         AtomicBoolean v4Called = new AtomicBoolean();
 
-        CompletableFuture.supplyAsync(nullValue).thenAccept(v1 -> {
+        supplyAsync(nullValue).thenAccept(v1 -> {
             identityLoader.load("a");
-            CompletableFuture.supplyAsync(nullValue).thenAccept(v2 -> {
+            supplyAsync(nullValue).thenAccept(v2 -> {
                 identityLoader.load("b");
-                CompletableFuture.supplyAsync(nullValue).thenAccept(v3 -> {
+                supplyAsync(nullValue).thenAccept(v3 -> {
                     identityLoader.load("c");
-                    CompletableFuture.supplyAsync(nullValue).thenAccept(
+                    supplyAsync(nullValue).thenAccept(
                             v4 -> {
                                 identityLoader.load("d");
                                 v4Called.set(true);
@@ -1058,12 +1062,68 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa
                 singletonList(asList("a", "b", "c", "d"))));
     }
 
+    @ParameterizedTest
+    @MethodSource("dataLoaderFactories")
+    public void should_blowup_after_N_keys(TestDataLoaderFactory factory) {
+        if (!(factory instanceof TestReactiveDataLoaderFactory)) {
+            return;
+        }
+        //
+        // if we blow up after emitting N keys, the N keys should work but the rest of the keys
+        // should be exceptional
+        DataLoader<Integer, Integer> identityLoader = ((TestReactiveDataLoaderFactory) factory).idLoaderBlowsUpsAfterN(3, new DataLoaderOptions(), new ArrayList<>());
+        CompletableFuture<Integer> cf1 = identityLoader.load(1);
+        CompletableFuture<Integer> cf2 = identityLoader.load(2);
+        CompletableFuture<Integer> cf3 = identityLoader.load(3);
+        CompletableFuture<Integer> cf4 = identityLoader.load(4);
+        CompletableFuture<Integer> cf5 = identityLoader.load(5);
+        identityLoader.dispatch();
+        await().until(cf5::isDone);
+
+        assertThat(cf1.join(), equalTo(1));
+        assertThat(cf2.join(), equalTo(2));
+        assertThat(cf3.join(), equalTo(3));
+        assertThat(cf4.isCompletedExceptionally(), is(true));
+        assertThat(cf5.isCompletedExceptionally(), is(true));
+
+    }
+
+    @ParameterizedTest
+    @MethodSource("dataLoaderFactories")
+    public void should_assert_values_size_equals_key_size(TestDataLoaderFactory factory) {
+        //
+        // what happens if we want 4 values but are only given 2 back say
+        //
+        DataLoader<String, String> identityLoader = factory.onlyReturnsNValues(2, new DataLoaderOptions(), new ArrayList<>());
+        CompletableFuture<String> cf1 = identityLoader.load("A");
+        CompletableFuture<String> cf2 = identityLoader.load("B");
+        CompletableFuture<String> cf3 = identityLoader.load("C");
+        CompletableFuture<String> cf4 = identityLoader.load("D");
+        identityLoader.dispatch();
+
+        await().atMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> cf1.isDone() && cf2.isDone() && cf3.isDone() && cf4.isDone());
+
+        if (factory instanceof ListDataLoaderFactory | factory instanceof PublisherDataLoaderFactory) {
+            assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class));
+            assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class));
+            assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class));
+            assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class));
+        } else {
+            // with the maps it's ok to have fewer results
+            assertThat(cf1.join(), equalTo("A"));
+            assertThat(cf2.join(), equalTo("B"));
+            assertThat(cf3.join(), equalTo(null));
+            assertThat(cf4.join(), equalTo(null));
+        }
+
+    }
+
     @Test
     public void can_call_a_loader_from_a_loader() throws Exception {
         List<Collection<String>> deepLoadCalls = new ArrayList<>();
         DataLoader<String, String> deepLoader = newDataLoader(keys -> {
             deepLoadCalls.add(keys);
-            return CompletableFuture.completedFuture(keys);
+            return completedFuture(keys);
         });
 
         List<Collection<String>> aLoadCalls = new ArrayList<>();
@@ -1083,7 +1143,7 @@ public void can_call_a_loader_from_a_loader() throws Exception {
         CompletableFuture<String> b1 = bLoader.load("B1");
         CompletableFuture<String> b2 = bLoader.load("B2");
 
-        CompletableFuture.allOf(
+        allOf(
                 aLoader.dispatch(),
                 deepLoader.dispatch(),
                 bLoader.dispatch(),
@@ -1109,11 +1169,10 @@ public void can_call_a_loader_from_a_loader() throws Exception {
     public void should_allow_composition_of_data_loader_calls() {
         UserManager userManager = new UserManager();
 
-        BatchLoader<Long, User> userBatchLoader = userIds -> CompletableFuture
-                .supplyAsync(() -> userIds
-                        .stream()
-                        .map(userManager::loadUserById)
-                        .collect(Collectors.toList()));
+        BatchLoader<Long, User> userBatchLoader = userIds -> supplyAsync(() -> userIds
+                .stream()
+                .map(userManager::loadUserById)
+                .collect(Collectors.toList()));
         DataLoader<Long, User> userLoader = newDataLoader(userBatchLoader);
 
         AtomicBoolean gandalfCalled = new AtomicBoolean(false);
@@ -1160,9 +1219,18 @@ private static Stream<Arguments> dataLoaderFactories() {
 
     public interface TestDataLoaderFactory {
         <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls);
+
         <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls);
+
         <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options, List<Collection<K>> loadCalls);
+
         DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions options, List<Collection<Integer>> loadCalls);
+
+        DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls);
+    }
+
+    public interface TestReactiveDataLoaderFactory {
+        <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls);
     }
 
     private static class ListDataLoaderFactory implements TestDataLoaderFactory {
@@ -1170,7 +1238,7 @@ private static class ListDataLoaderFactory implements TestDataLoaderFactory {
         public <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls) {
             return newDataLoader(keys -> {
                 loadCalls.add(new ArrayList<>(keys));
-                return CompletableFuture.completedFuture(keys);
+                return completedFuture(keys);
             }, options);
         }
 
@@ -1189,7 +1257,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options
                 loadCalls.add(new ArrayList<>(keys));
 
                 List<Object> errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList());
-                return CompletableFuture.completedFuture(errors);
+                return completedFuture(errors);
             }, options);
         }
 
@@ -1206,7 +1274,15 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions o
                         errors.add(new IllegalStateException("Error"));
                     }
                 }
-                return CompletableFuture.completedFuture(errors);
+                return completedFuture(errors);
+            }, options);
+        }
+
+        @Override
+        public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+            return newDataLoader(keys -> {
+                loadCalls.add(new ArrayList<>(keys));
+                return completedFuture(keys.subList(0, N));
             }, options);
         }
     }
@@ -1220,7 +1296,7 @@ public <K> DataLoader<K, K> idLoader(
                 loadCalls.add(new ArrayList<>(keys));
                 Map<K, K> map = new HashMap<>();
                 keys.forEach(k -> map.put(k, k));
-                return CompletableFuture.completedFuture(map);
+                return completedFuture(map);
             }, options);
         }
 
@@ -1239,7 +1315,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(
                 loadCalls.add(new ArrayList<>(keys));
                 Map<K, Object> errorByKey = new HashMap<>();
                 keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error")));
-                return CompletableFuture.completedFuture(errorByKey);
+                return completedFuture(errorByKey);
             }, options);
         }
 
@@ -1257,16 +1333,28 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
                         errorByKey.put(key, new IllegalStateException("Error"));
                     }
                 }
-                return CompletableFuture.completedFuture(errorByKey);
+                return completedFuture(errorByKey);
+            }, options);
+        }
+
+        @Override
+        public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+            return newMappedDataLoader(keys -> {
+                loadCalls.add(new ArrayList<>(keys));
+
+                Map<String, String> collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap(
+                        k -> k, v -> v
+                ));
+                return completedFuture(collect);
             }, options);
         }
     }
 
-    private static class PublisherDataLoaderFactory implements TestDataLoaderFactory {
+    private static class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {
 
         @Override
         public <K> DataLoader<K, K> idLoader(
-            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+                DataLoaderOptions options, List<Collection<K>> loadCalls) {
             return newPublisherDataLoader((keys, subscriber) -> {
                 loadCalls.add(new ArrayList<>(keys));
                 Flux.fromIterable(keys).subscribe(subscriber);
@@ -1283,7 +1371,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Col
 
         @Override
         public <K> DataLoader<K, Object> idLoaderAllExceptions(
-            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+                DataLoaderOptions options, List<Collection<K>> loadCalls) {
             return newPublisherDataLoaderWithTry((keys, subscriber) -> {
                 loadCalls.add(new ArrayList<>(keys));
                 Stream<Try<Object>> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error")));
@@ -1293,7 +1381,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(
 
         @Override
         public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
-            DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
+                DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
             return newPublisherDataLoaderWithTry((keys, subscriber) -> {
                 loadCalls.add(new ArrayList<>(keys));
 
@@ -1308,13 +1396,36 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
                 Flux.fromIterable(errors).subscribe(subscriber);
             }, options);
         }
+
+        @Override
+        public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
+            return newPublisherDataLoader((keys, subscriber) -> {
+                loadCalls.add(new ArrayList<>(keys));
+
+                List<K> nKeys = keys.subList(0, N);
+                Flux<K> subFlux = Flux.fromIterable(nKeys);
+                subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
+                        .subscribe(subscriber);
+            }, options);
+        }
+
+        @Override
+        public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+            return newPublisherDataLoader((keys, subscriber) -> {
+                loadCalls.add(new ArrayList<>(keys));
+
+                List<String> nKeys = keys.subList(0, N);
+                Flux.fromIterable(nKeys)
+                        .subscribe(subscriber);
+            }, options);
+        }
     }
 
-    private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory {
+    private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {
 
         @Override
         public <K> DataLoader<K, K> idLoader(
-            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+                DataLoaderOptions options, List<Collection<K>> loadCalls) {
             return newMappedPublisherDataLoader((keys, subscriber) -> {
                 loadCalls.add(new ArrayList<>(keys));
                 Map<K, K> map = new HashMap<>();
@@ -1333,7 +1444,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Col
 
         @Override
         public <K> DataLoader<K, Object> idLoaderAllExceptions(
-            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+                DataLoaderOptions options, List<Collection<K>> loadCalls) {
             return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
                 loadCalls.add(new ArrayList<>(keys));
                 Stream<Map.Entry<K, Try<Object>>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error"))));
@@ -1343,7 +1454,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(
 
         @Override
         public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
-            DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
+                DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
             return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
                 loadCalls.add(new ArrayList<>(keys));
 
@@ -1358,6 +1469,29 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
                 Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber);
             }, options);
         }
+
+        @Override
+        public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
+            return newMappedPublisherDataLoader((keys, subscriber) -> {
+                loadCalls.add(new ArrayList<>(keys));
+
+                List<K> nKeys = keys.subList(0, N);
+                Flux<Map.Entry<K, K>> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k));
+                subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
+                        .subscribe(subscriber);
+            }, options);
+        }
+
+        @Override
+        public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+            return newMappedPublisherDataLoader((keys, subscriber) -> {
+                loadCalls.add(new ArrayList<>(keys));
+
+                List<String> nKeys = keys.subList(0, N);
+                Flux.fromIterable(nKeys).map(k -> Map.entry(k, k))
+                        .subscribe(subscriber);
+            }, options);
+        }
     }
 
     private static class ThrowingCacheMap extends CustomCacheMap {

From 8b344dbdba430c2ca56896112c0b9f85a2d01cb4 Mon Sep 17 00:00:00 2001
From: bbaker <bbaker@atlassian.com>
Date: Thu, 23 May 2024 15:46:21 +1000
Subject: [PATCH 2/2] Now the builds pass - broken out the fixtures

---
 .../java/org/dataloader/DataLoaderHelper.java |  50 ++-
 .../java/org/dataloader/DataLoaderTest.java   | 343 +++---------------
 .../java/org/dataloader/fixtures/TestKit.java |   9 +
 .../parameterized/ListDataLoaderFactory.java  |  79 ++++
 .../MappedDataLoaderFactory.java              |  95 +++++
 .../MappedPublisherDataLoaderFactory.java     | 104 ++++++
 .../PublisherDataLoaderFactory.java           | 100 +++++
 .../parameterized/TestDataLoaderFactory.java  |  22 ++
 .../TestReactiveDataLoaderFactory.java        |  11 +
 9 files changed, 516 insertions(+), 297 deletions(-)
 create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java
 create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java
 create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java
 create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java
 create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java
 create mode 100644 src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java

diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java
index cf3fc6e..edbf348 100644
--- a/src/main/java/org/dataloader/DataLoaderHelper.java
+++ b/src/main/java/org/dataloader/DataLoaderHelper.java
@@ -3,6 +3,7 @@
 import org.dataloader.annotations.GuardedBy;
 import org.dataloader.annotations.Internal;
 import org.dataloader.impl.CompletableFutureKit;
+import org.dataloader.impl.DataLoaderAssertionException;
 import org.dataloader.scheduler.BatchLoaderScheduler;
 import org.dataloader.stats.StatisticsCollector;
 import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext;
@@ -624,6 +625,15 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
         return (DispatchResult<T>) EMPTY_DISPATCH_RESULT;
     }
 
+    /**********************************************************************************************
+     * ********************************************************************************************
+     * <p>
+     * The reactive support classes start here
+     *
+     * @param <T> for two
+     **********************************************************************************************
+     **********************************************************************************************
+     */
     private abstract class DataLoaderSubscriberBase<T> implements Subscriber<T> {
 
         final CompletableFuture<List<V>> valuesFuture;
@@ -721,6 +731,11 @@ private DataLoaderSubscriber(
         public synchronized void onNext(V value) {
             super.onNext(value);
 
+            if (idx >= keys.size()) {
+                // hang on they have given us more values than we asked for in keys
+                // we cant handle this
+                return;
+            }
             K key = keys.get(idx);
             Object callContext = callContexts.get(idx);
             CompletableFuture<V> future = queuedFutures.get(idx);
@@ -734,8 +749,16 @@ public synchronized void onNext(V value) {
         @Override
         public synchronized void onComplete() {
             super.onComplete();
-            assertResultSize(keys, completedValues);
-
+            if (keys.size() != completedValues.size()) {
+                // we have more or less values than promised
+                // we will go through all the outstanding promises and mark those that
+                // have not finished as failed
+                for (CompletableFuture<V> queuedFuture : queuedFutures) {
+                    if (!queuedFuture.isDone()) {
+                        queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list"));
+                    }
+                }
+            }
             possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
             valuesFuture.complete(completedValues);
         }
@@ -748,9 +771,11 @@ public synchronized void onError(Throwable ex) {
             for (int i = idx; i < queuedFutures.size(); i++) {
                 K key = keys.get(i);
                 CompletableFuture<V> future = queuedFutures.get(i);
-                future.completeExceptionally(ex);
-                // clear any cached view of this key because they all failed
-                dataLoader.clear(key);
+                if (! future.isDone()) {
+                    future.completeExceptionally(ex);
+                    // clear any cached view of this key because it failed
+                    dataLoader.clear(key);
+                }
             }
             valuesFuture.completeExceptionally(ex);
         }
@@ -794,7 +819,10 @@ public synchronized void onNext(Map.Entry<K, V> entry) {
 
             onNextValue(key, value, callContext, futures);
 
-            completedValuesByKey.put(key, value);
+            // did we have an actual key for this value - ignore it if they send us one outside the key set
+            if (!futures.isEmpty()) {
+                completedValuesByKey.put(key, value);
+            }
         }
 
         @Override
@@ -806,6 +834,16 @@ public synchronized void onComplete() {
             for (K key : keys) {
                 V value = completedValuesByKey.get(key);
                 values.add(value);
+
+                List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());
+                for (CompletableFuture<V> future : futures) {
+                    if (! future.isDone()) {
+                        // we have a future that never came back for that key
+                        // but the publisher is done sending in data - it must be null
+                        // e.g. for key X when found no value
+                        future.complete(null);
+                    }
+                }
             }
             valuesFuture.complete(values);
         }
diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java
index c8d4562..1f748fb 100644
--- a/src/test/java/org/dataloader/DataLoaderTest.java
+++ b/src/test/java/org/dataloader/DataLoaderTest.java
@@ -19,9 +19,14 @@
 import org.awaitility.Duration;
 import org.dataloader.fixtures.CustomCacheMap;
 import org.dataloader.fixtures.JsonObject;
-import org.dataloader.fixtures.TestKit;
 import org.dataloader.fixtures.User;
 import org.dataloader.fixtures.UserManager;
+import org.dataloader.fixtures.parameterized.ListDataLoaderFactory;
+import org.dataloader.fixtures.parameterized.MappedDataLoaderFactory;
+import org.dataloader.fixtures.parameterized.MappedPublisherDataLoaderFactory;
+import org.dataloader.fixtures.parameterized.PublisherDataLoaderFactory;
+import org.dataloader.fixtures.parameterized.TestDataLoaderFactory;
+import org.dataloader.fixtures.parameterized.TestReactiveDataLoaderFactory;
 import org.dataloader.impl.CompletableFutureKit;
 import org.dataloader.impl.DataLoaderAssertionException;
 import org.junit.jupiter.api.Named;
@@ -29,7 +34,6 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import reactor.core.publisher.Flux;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,7 +41,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
@@ -59,7 +62,7 @@
 import static org.dataloader.DataLoaderFactory.newPublisherDataLoader;
 import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry;
 import static org.dataloader.DataLoaderOptions.newOptions;
-import static org.dataloader.fixtures.TestKit.futureError;
+import static org.dataloader.fixtures.TestKit.areAllDone;
 import static org.dataloader.fixtures.TestKit.listFrom;
 import static org.dataloader.impl.CompletableFutureKit.cause;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -68,6 +71,7 @@
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * Tests for {@link DataLoader}.
@@ -1090,7 +1094,7 @@ public void should_blowup_after_N_keys(TestDataLoaderFactory factory) {
 
     @ParameterizedTest
     @MethodSource("dataLoaderFactories")
-    public void should_assert_values_size_equals_key_size(TestDataLoaderFactory factory) {
+    public void when_values_size_are_less_then_key_size(TestDataLoaderFactory factory) {
         //
         // what happens if we want 4 values but are only given 2 back say
         //
@@ -1101,13 +1105,19 @@ public void should_assert_values_size_equals_key_size(TestDataLoaderFactory fact
         CompletableFuture<String> cf4 = identityLoader.load("D");
         identityLoader.dispatch();
 
-        await().atMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> cf1.isDone() && cf2.isDone() && cf3.isDone() && cf4.isDone());
+        await().atMost(Duration.FIVE_SECONDS).until(() -> areAllDone(cf1, cf2, cf3, cf4));
 
-        if (factory instanceof ListDataLoaderFactory | factory instanceof PublisherDataLoaderFactory) {
+        if (factory instanceof ListDataLoaderFactory) {
             assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class));
             assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class));
             assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class));
             assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class));
+        } else if (factory instanceof PublisherDataLoaderFactory) {
+            // some have completed progressively but the other never did
+            assertThat(cf1.join(), equalTo("A"));
+            assertThat(cf2.join(), equalTo("B"));
+            assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class));
+            assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class));
         } else {
             // with the maps it's ok to have fewer results
             assertThat(cf1.join(), equalTo("A"));
@@ -1115,7 +1125,34 @@ public void should_assert_values_size_equals_key_size(TestDataLoaderFactory fact
             assertThat(cf3.join(), equalTo(null));
             assertThat(cf4.join(), equalTo(null));
         }
+    }
+
+    @ParameterizedTest
+    @MethodSource("dataLoaderFactories")
+    public void when_values_size_are_more_then_key_size(TestDataLoaderFactory factory) {
+        //
+        // what happens if we want 4 values but only given 6 back say
+        //
+        DataLoader<String, String> identityLoader = factory.idLoaderReturnsTooMany(2, new DataLoaderOptions(), new ArrayList<>());
+        CompletableFuture<String> cf1 = identityLoader.load("A");
+        CompletableFuture<String> cf2 = identityLoader.load("B");
+        CompletableFuture<String> cf3 = identityLoader.load("C");
+        CompletableFuture<String> cf4 = identityLoader.load("D");
+        identityLoader.dispatch();
+        await().atMost(Duration.FIVE_SECONDS).until(() -> areAllDone(cf1, cf2, cf3, cf4));
+
 
+        if (factory instanceof ListDataLoaderFactory) {
+            assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class));
+            assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class));
+            assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class));
+            assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class));
+        } else {
+            assertThat(cf1.join(), equalTo("A"));
+            assertThat(cf2.join(), equalTo("B"));
+            assertThat(cf3.join(), equalTo("C"));
+            assertThat(cf4.join(), equalTo("D"));
+        }
     }
 
     @Test
@@ -1208,6 +1245,14 @@ private static CacheKey<JsonObject> getJsonObjectCacheMapFn() {
                 .collect(Collectors.joining());
     }
 
+    private static class ThrowingCacheMap extends CustomCacheMap {
+
+        @Override
+        public CompletableFuture<Object> get(String key) {
+            throw new RuntimeException("Cache implementation failed.");
+        }
+    }
+
     private static Stream<Arguments> dataLoaderFactories() {
         return Stream.of(
                 Arguments.of(Named.of("List DataLoader", new ListDataLoaderFactory())),
@@ -1216,289 +1261,5 @@ private static Stream<Arguments> dataLoaderFactories() {
                 Arguments.of(Named.of("Mapped Publisher DataLoader", new MappedPublisherDataLoaderFactory()))
         );
     }
-
-    public interface TestDataLoaderFactory {
-        <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls);
-
-        <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls);
-
-        <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options, List<Collection<K>> loadCalls);
-
-        DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions options, List<Collection<Integer>> loadCalls);
-
-        DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls);
-    }
-
-    public interface TestReactiveDataLoaderFactory {
-        <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls);
-    }
-
-    private static class ListDataLoaderFactory implements TestDataLoaderFactory {
-        @Override
-        public <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newDataLoader(keys -> {
-                loadCalls.add(new ArrayList<>(keys));
-                return completedFuture(keys);
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, K> idLoaderBlowsUps(
-                DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newDataLoader(keys -> {
-                loadCalls.add(new ArrayList<>(keys));
-                return TestKit.futureError();
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newDataLoader(keys -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                List<Object> errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList());
-                return completedFuture(errors);
-            }, options);
-        }
-
-        @Override
-        public DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
-            return newDataLoader(keys -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                List<Object> errors = new ArrayList<>();
-                for (Integer key : keys) {
-                    if (key % 2 == 0) {
-                        errors.add(key);
-                    } else {
-                        errors.add(new IllegalStateException("Error"));
-                    }
-                }
-                return completedFuture(errors);
-            }, options);
-        }
-
-        @Override
-        public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
-            return newDataLoader(keys -> {
-                loadCalls.add(new ArrayList<>(keys));
-                return completedFuture(keys.subList(0, N));
-            }, options);
-        }
-    }
-
-    private static class MappedDataLoaderFactory implements TestDataLoaderFactory {
-
-        @Override
-        public <K> DataLoader<K, K> idLoader(
-                DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newMappedDataLoader((keys) -> {
-                loadCalls.add(new ArrayList<>(keys));
-                Map<K, K> map = new HashMap<>();
-                keys.forEach(k -> map.put(k, k));
-                return completedFuture(map);
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newMappedDataLoader((keys) -> {
-                loadCalls.add(new ArrayList<>(keys));
-                return futureError();
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, Object> idLoaderAllExceptions(
-                DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newMappedDataLoader(keys -> {
-                loadCalls.add(new ArrayList<>(keys));
-                Map<K, Object> errorByKey = new HashMap<>();
-                keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error")));
-                return completedFuture(errorByKey);
-            }, options);
-        }
-
-        @Override
-        public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
-                DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
-            return newMappedDataLoader(keys -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                Map<Integer, Object> errorByKey = new HashMap<>();
-                for (Integer key : keys) {
-                    if (key % 2 == 0) {
-                        errorByKey.put(key, key);
-                    } else {
-                        errorByKey.put(key, new IllegalStateException("Error"));
-                    }
-                }
-                return completedFuture(errorByKey);
-            }, options);
-        }
-
-        @Override
-        public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
-            return newMappedDataLoader(keys -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                Map<String, String> collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap(
-                        k -> k, v -> v
-                ));
-                return completedFuture(collect);
-            }, options);
-        }
-    }
-
-    private static class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {
-
-        @Override
-        public <K> DataLoader<K, K> idLoader(
-                DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newPublisherDataLoader((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-                Flux.fromIterable(keys).subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newPublisherDataLoader((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-                Flux.<K>error(new IllegalStateException("Error")).subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, Object> idLoaderAllExceptions(
-                DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newPublisherDataLoaderWithTry((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-                Stream<Try<Object>> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error")));
-                Flux.fromStream(failures).subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
-                DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
-            return newPublisherDataLoaderWithTry((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                List<Try<Object>> errors = new ArrayList<>();
-                for (Integer key : keys) {
-                    if (key % 2 == 0) {
-                        errors.add(Try.succeeded(key));
-                    } else {
-                        errors.add(Try.failed(new IllegalStateException("Error")));
-                    }
-                }
-                Flux.fromIterable(errors).subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newPublisherDataLoader((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                List<K> nKeys = keys.subList(0, N);
-                Flux<K> subFlux = Flux.fromIterable(nKeys);
-                subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
-                        .subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
-            return newPublisherDataLoader((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                List<String> nKeys = keys.subList(0, N);
-                Flux.fromIterable(nKeys)
-                        .subscribe(subscriber);
-            }, options);
-        }
-    }
-
-    private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {
-
-        @Override
-        public <K> DataLoader<K, K> idLoader(
-                DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newMappedPublisherDataLoader((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-                Map<K, K> map = new HashMap<>();
-                keys.forEach(k -> map.put(k, k));
-                Flux.fromIterable(map.entrySet()).subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newMappedPublisherDataLoader((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-                Flux.<Map.Entry<K, K>>error(new IllegalStateException("Error")).subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, Object> idLoaderAllExceptions(
-                DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-                Stream<Map.Entry<K, Try<Object>>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error"))));
-                Flux.fromStream(failures).subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
-                DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
-            return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                Map<Integer, Try<Object>> errorByKey = new HashMap<>();
-                for (Integer key : keys) {
-                    if (key % 2 == 0) {
-                        errorByKey.put(key, Try.succeeded(key));
-                    } else {
-                        errorByKey.put(key, Try.failed(new IllegalStateException("Error")));
-                    }
-                }
-                Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
-            return newMappedPublisherDataLoader((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                List<K> nKeys = keys.subList(0, N);
-                Flux<Map.Entry<K, K>> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k));
-                subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
-                        .subscribe(subscriber);
-            }, options);
-        }
-
-        @Override
-        public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
-            return newMappedPublisherDataLoader((keys, subscriber) -> {
-                loadCalls.add(new ArrayList<>(keys));
-
-                List<String> nKeys = keys.subList(0, N);
-                Flux.fromIterable(nKeys).map(k -> Map.entry(k, k))
-                        .subscribe(subscriber);
-            }, options);
-        }
-    }
-
-    private static class ThrowingCacheMap extends CustomCacheMap {
-        @Override
-        public CompletableFuture<Object> get(String key) {
-            throw new RuntimeException("Cache implementation failed.");
-        }
-    }
 }
 
diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java
index adffb06..c22988d 100644
--- a/src/test/java/org/dataloader/fixtures/TestKit.java
+++ b/src/test/java/org/dataloader/fixtures/TestKit.java
@@ -131,4 +131,13 @@ public static <T> Set<T> asSet(T... elements) {
     public static <T> Set<T> asSet(Collection<T> elements) {
         return new LinkedHashSet<>(elements);
     }
+
+    public static boolean areAllDone(CompletableFuture<?>... cfs) {
+        for (CompletableFuture<?> cf : cfs) {
+            if (! cf.isDone()) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git a/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java
new file mode 100644
index 0000000..ee1f1d7
--- /dev/null
+++ b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java
@@ -0,0 +1,79 @@
+package org.dataloader.fixtures.parameterized;
+
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderOptions;
+import org.dataloader.fixtures.TestKit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.dataloader.DataLoaderFactory.newDataLoader;
+
+public class ListDataLoaderFactory implements TestDataLoaderFactory {
+    @Override
+    public <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+            return completedFuture(keys);
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, K> idLoaderBlowsUps(
+            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+            return TestKit.futureError();
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<Object> errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList());
+            return completedFuture(errors);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
+        return newDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<Object> errors = new ArrayList<>();
+            for (Integer key : keys) {
+                if (key % 2 == 0) {
+                    errors.add(key);
+                } else {
+                    errors.add(new IllegalStateException("Error"));
+                }
+            }
+            return completedFuture(errors);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+        return newDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+            return completedFuture(keys.subList(0, N));
+        }, options);
+    }
+
+    @Override
+    public DataLoader<String, String> idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+        return newDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+            List<String> l = new ArrayList<>(keys);
+            for (int i = 0; i < howManyMore; i++) {
+                l.add("extra-" + i);
+            }
+            return completedFuture(l);
+        }, options);
+    }
+}
diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java
new file mode 100644
index 0000000..8f41441
--- /dev/null
+++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java
@@ -0,0 +1,95 @@
+package org.dataloader.fixtures.parameterized;
+
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderOptions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.dataloader.DataLoaderFactory.newMappedDataLoader;
+import static org.dataloader.fixtures.TestKit.futureError;
+
+public class MappedDataLoaderFactory implements TestDataLoaderFactory {
+
+    @Override
+    public <K> DataLoader<K, K> idLoader(
+            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newMappedDataLoader((keys) -> {
+            loadCalls.add(new ArrayList<>(keys));
+            Map<K, K> map = new HashMap<>();
+            keys.forEach(k -> map.put(k, k));
+            return completedFuture(map);
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newMappedDataLoader((keys) -> {
+            loadCalls.add(new ArrayList<>(keys));
+            return futureError();
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, Object> idLoaderAllExceptions(
+            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newMappedDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+            Map<K, Object> errorByKey = new HashMap<>();
+            keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error")));
+            return completedFuture(errorByKey);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
+            DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
+        return newMappedDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            Map<Integer, Object> errorByKey = new HashMap<>();
+            for (Integer key : keys) {
+                if (key % 2 == 0) {
+                    errorByKey.put(key, key);
+                } else {
+                    errorByKey.put(key, new IllegalStateException("Error"));
+                }
+            }
+            return completedFuture(errorByKey);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+        return newMappedDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            Map<String, String> collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap(
+                    k -> k, v -> v
+            ));
+            return completedFuture(collect);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<String, String> idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+        return newMappedDataLoader(keys -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<String> l = new ArrayList<>(keys);
+            for (int i = 0; i < howManyMore; i++) {
+                l.add("extra-" + i);
+            }
+
+            Map<String, String> collect = l.stream().collect(Collectors.toMap(
+                    k -> k, v -> v
+            ));
+            return completedFuture(collect);
+        }, options);
+    }
+}
diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java
new file mode 100644
index 0000000..f5c1ad5
--- /dev/null
+++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java
@@ -0,0 +1,104 @@
+package org.dataloader.fixtures.parameterized;
+
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderOptions;
+import org.dataloader.Try;
+import reactor.core.publisher.Flux;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader;
+import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry;
+
+public class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {
+
+    @Override
+    public <K> DataLoader<K, K> idLoader(
+            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newMappedPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+            Map<K, K> map = new HashMap<>();
+            keys.forEach(k -> map.put(k, k));
+            Flux.fromIterable(map.entrySet()).subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newMappedPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+            Flux.<Map.Entry<K, K>>error(new IllegalStateException("Error")).subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, Object> idLoaderAllExceptions(
+            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+            Stream<Map.Entry<K, Try<Object>>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error"))));
+            Flux.fromStream(failures).subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
+            DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
+        return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            Map<Integer, Try<Object>> errorByKey = new HashMap<>();
+            for (Integer key : keys) {
+                if (key % 2 == 0) {
+                    errorByKey.put(key, Try.succeeded(key));
+                } else {
+                    errorByKey.put(key, Try.failed(new IllegalStateException("Error")));
+                }
+            }
+            Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newMappedPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<K> nKeys = keys.subList(0, N);
+            Flux<Map.Entry<K, K>> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k));
+            subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
+                    .subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+        return newMappedPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<String> nKeys = keys.subList(0, N);
+            Flux.fromIterable(nKeys).map(k -> Map.entry(k, k))
+                    .subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<String, String> idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+        return newMappedPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<String> l = new ArrayList<>(keys);
+            for (int i = 0; i < howManyMore; i++) {
+                l.add("extra-" + i);
+            }
+
+            Flux.fromIterable(l).map(k -> Map.entry(k, k))
+                    .subscribe(subscriber);
+        }, options);
+    }
+}
diff --git a/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java
new file mode 100644
index 0000000..d75ff38
--- /dev/null
+++ b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java
@@ -0,0 +1,100 @@
+package org.dataloader.fixtures.parameterized;
+
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderOptions;
+import org.dataloader.Try;
+import reactor.core.publisher.Flux;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.dataloader.DataLoaderFactory.newPublisherDataLoader;
+import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry;
+
+public class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {
+
+    @Override
+    public <K> DataLoader<K, K> idLoader(
+            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+            Flux.fromIterable(keys).subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+            Flux.<K>error(new IllegalStateException("Error")).subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, Object> idLoaderAllExceptions(
+            DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newPublisherDataLoaderWithTry((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+            Stream<Try<Object>> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error")));
+            Flux.fromStream(failures).subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
+            DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
+        return newPublisherDataLoaderWithTry((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<Try<Object>> errors = new ArrayList<>();
+            for (Integer key : keys) {
+                if (key % 2 == 0) {
+                    errors.add(Try.succeeded(key));
+                } else {
+                    errors.add(Try.failed(new IllegalStateException("Error")));
+                }
+            }
+            Flux.fromIterable(errors).subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
+        return newPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<K> nKeys = keys.subList(0, N);
+            Flux<K> subFlux = Flux.fromIterable(nKeys);
+            subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
+                    .subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+        return newPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<String> nKeys = keys.subList(0, N);
+            Flux.fromIterable(nKeys)
+                    .subscribe(subscriber);
+        }, options);
+    }
+
+    @Override
+    public DataLoader<String, String> idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList<Object> loadCalls) {
+        return newPublisherDataLoader((keys, subscriber) -> {
+            loadCalls.add(new ArrayList<>(keys));
+
+            List<String> l = new ArrayList<>(keys);
+            for (int i = 0; i < howManyMore; i++) {
+                l.add("extra-" + i);
+            }
+
+            Flux.fromIterable(l)
+                    .subscribe(subscriber);
+        }, options);
+    }
+}
diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java
new file mode 100644
index 0000000..8c1bc22
--- /dev/null
+++ b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java
@@ -0,0 +1,22 @@
+package org.dataloader.fixtures.parameterized;
+
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderOptions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public interface TestDataLoaderFactory {
+    <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls);
+
+    <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls);
+
+    <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options, List<Collection<K>> loadCalls);
+
+    DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions options, List<Collection<Integer>> loadCalls);
+
+    DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls);
+
+    DataLoader<String, String> idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList<Object> loadCalls);
+}
diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java
new file mode 100644
index 0000000..d45932c
--- /dev/null
+++ b/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java
@@ -0,0 +1,11 @@
+package org.dataloader.fixtures.parameterized;
+
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderOptions;
+
+import java.util.Collection;
+import java.util.List;
+
+public interface TestReactiveDataLoaderFactory {
+    <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls);
+}