From 1151ccbf4b531c2dbd12dc298f3018535fdb4ebc Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Fri, 29 Sep 2023 20:54:30 +1000 Subject: [PATCH 1/7] Adds a predicate to DataLoaderRegistry and a per dataloader map of predicates is also possible --- .../org/dataloader/DataLoaderRegistry.java | 143 ++++++++++++++++-- .../registries/DispatchPredicate.java | 10 ++ .../ScheduledDataLoaderRegistry.java | 60 +------- 3 files changed, 142 insertions(+), 71 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 9b19c29..48e8d96 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -1,6 +1,7 @@ package org.dataloader; import org.dataloader.annotations.PublicApi; +import org.dataloader.registries.DispatchPredicate; import org.dataloader.stats.Statistics; import java.util.ArrayList; @@ -21,12 +22,17 @@ @PublicApi public class DataLoaderRegistry { protected final Map> dataLoaders = new ConcurrentHashMap<>(); + protected final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); + protected final DispatchPredicate dispatchPredicate; + public DataLoaderRegistry() { + this.dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; } - private DataLoaderRegistry(Builder builder) { + protected DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); + this.dispatchPredicate = builder.dispatchPredicate; } @@ -43,6 +49,21 @@ public DataLoaderRegistry register(String key, DataLoader dataLoader) { return this; } + /** + * This will register a new dataloader and dispatch predicate associated with that data loader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate to associate with this data loader + * + * @return this registry + */ + public DataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + dataLoaders.put(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return this; + } + /** * Computes a data loader if absent or return it if it was * already registered at that key. @@ -76,6 +97,8 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { this.dataLoaders.forEach(combined::register); registry.dataLoaders.forEach(combined::register); + combined.dataLoaderPredicates.putAll(this.dataLoaderPredicates); + combined.dataLoaderPredicates.putAll(registry.dataLoaderPredicates); return combined; } @@ -101,7 +124,10 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { * @return this registry */ public DataLoaderRegistry unregister(String key) { - dataLoaders.remove(key); + DataLoader dataLoader = dataLoaders.remove(key); + if (dataLoader != null) { + dataLoaderPredicates.remove(dataLoader); + } return this; } @@ -131,7 +157,7 @@ public Set getKeys() { * {@link org.dataloader.DataLoader}s */ public void dispatchAll() { - getDataLoaders().forEach(DataLoader::dispatch); + dispatchAllWithCount(); } /** @@ -142,8 +168,12 @@ public void dispatchAll() { */ public int dispatchAllWithCount() { int sum = 0; - for (DataLoader dataLoader : getDataLoaders()) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); + for (Map.Entry> entry : dataLoaders.entrySet()) { + DataLoader dataLoader = entry.getValue(); + String key = entry.getKey(); + if (shouldDispatch(key, dataLoader)) { + sum += dataLoader.dispatchWithCounts().getKeysCount(); + } } return sum; } @@ -154,12 +184,59 @@ public int dispatchAllWithCount() { */ public int dispatchDepth() { int totalDispatchDepth = 0; - for (DataLoader dataLoader : getDataLoaders()) { - totalDispatchDepth += dataLoader.dispatchDepth(); + for (Map.Entry> entry : dataLoaders.entrySet()) { + DataLoader dataLoader = entry.getValue(); + String key = entry.getKey(); + if (shouldDispatch(key, dataLoader)) { + totalDispatchDepth += dataLoader.dispatchDepth(); + } } return totalDispatchDepth; } + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicate + */ + public void dispatchAllImmediately() { + dispatchAllWithCountImmediately(); + } + + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicate + * + * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. + */ + public int dispatchAllWithCountImmediately() { + int sum = 0; + for (Map.Entry> entry : dataLoaders.entrySet()) { + DataLoader dataLoader = entry.getValue(); + sum += dataLoader.dispatchWithCounts().getKeysCount(); + } + return sum; + } + + + /** + * Returns true if the dataloader has a predicate which returned true, OR the overall + * registry predicate returned true. + * + * @param dataLoaderKey the key in the dataloader map + * @param dataLoader the dataloader + * + * @return true if it should dispatch + */ + protected boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { + DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); + if (dispatchPredicate != null) { + if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { + return true; + } + } + return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + } + /** * @return a combined set of statistics for all data loaders in this registry presented * as the sum of all their statistics @@ -175,13 +252,22 @@ public Statistics getStatistics() { /** * @return A builder of {@link DataLoaderRegistry}s */ - public static Builder newRegistry() { + public static Builder newRegistry() { + //noinspection rawtypes return new Builder(); } - public static class Builder { + public static class Builder> { private final Map> dataLoaders = new HashMap<>(); + private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); + + private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; + + private B self() { + //noinspection unchecked + return (B) this; + } /** * This will register a new dataloader @@ -191,22 +277,51 @@ public static class Builder { * * @return this builder for a fluent pattern */ - public Builder register(String key, DataLoader dataLoader) { + public B register(String key, DataLoader dataLoader) { dataLoaders.put(key, dataLoader); - return this; + return self(); } /** - * This will combine together the data loaders in this builder with the ones + * This will register a new dataloader with a specific {@link DispatchPredicate} + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate + * + * @return this builder for a fluent pattern + */ + public B register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + register(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return self(); + } + + /** + * This will combine the data loaders in this builder with the ones * from a previous {@link DataLoaderRegistry} * * @param otherRegistry the previous {@link DataLoaderRegistry} * * @return this builder for a fluent pattern */ - public Builder registerAll(DataLoaderRegistry otherRegistry) { + public B registerAll(DataLoaderRegistry otherRegistry) { dataLoaders.putAll(otherRegistry.dataLoaders); - return this; + dataLoaderPredicates.putAll(otherRegistry.dataLoaderPredicates); + return self(); + } + + /** + * This sets a predicate on the {@link DataLoaderRegistry} that will control + * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. + * + * @param dispatchPredicate the predicate + * + * @return this builder for a fluent pattern + */ + public B dispatchPredicate(DispatchPredicate dispatchPredicate) { + this.dispatchPredicate = dispatchPredicate; + return self(); } /** diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java index d5bd31b..45e1da0 100644 --- a/src/main/java/org/dataloader/registries/DispatchPredicate.java +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -10,6 +10,16 @@ */ @FunctionalInterface public interface DispatchPredicate { + + /** + * A predicate that always returns true + */ + DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true; + /** + * A predicate that always returns false + */ + DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> true; + /** * This predicate tests whether the data loader should be dispatched or not. * diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 4be317e..3e7a327 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -5,7 +5,6 @@ import org.dataloader.annotations.ExperimentalApi; import java.time.Duration; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -30,14 +29,12 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { private final ScheduledExecutorService scheduledExecutorService; - private final DispatchPredicate dispatchPredicate; private final Duration schedule; private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { - this.dataLoaders.putAll(builder.dataLoaders); + super(builder); this.scheduledExecutorService = builder.scheduledExecutorService; - this.dispatchPredicate = builder.dispatchPredicate; this.schedule = builder.schedule; this.closed = false; } @@ -77,24 +74,6 @@ public int dispatchAllWithCount() { return sum; } - /** - * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate - */ - public void dispatchAllImmediately() { - super.dispatchAll(); - } - - /** - * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate - * - * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. - */ - public int dispatchAllWithCountImmediately() { - return super.dispatchAllWithCount(); - } - /** * This will schedule a task to check the predicate and dispatch if true right now. It will not do * a pre check of the preodicate like {@link #dispatchAll()} would @@ -111,7 +90,7 @@ private void reschedule(String key, DataLoader dataLoader) { } private void dispatchOrReschedule(String key, DataLoader dataLoader) { - if (dispatchPredicate.test(key, dataLoader)) { + if (shouldDispatch(key, dataLoader)) { dataLoader.dispatch(); } else { reschedule(key, dataLoader); @@ -128,12 +107,10 @@ public static Builder newScheduledRegistry() { return new Builder(); } - public static class Builder { + public static class Builder extends DataLoaderRegistry.Builder { private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - private DispatchPredicate dispatchPredicate = (key, dl) -> true; private Duration schedule = Duration.ofMillis(10); - private final Map> dataLoaders = new HashMap<>(); public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); @@ -145,37 +122,6 @@ public Builder schedule(Duration schedule) { return this; } - public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { - this.dispatchPredicate = nonNull(dispatchPredicate); - return this; - } - - /** - * This will register a new dataloader - * - * @param key the key to put the data loader under - * @param dataLoader the data loader to register - * - * @return this builder for a fluent pattern - */ - public Builder register(String key, DataLoader dataLoader) { - dataLoaders.put(key, dataLoader); - return this; - } - - /** - * This will combine together the data loaders in this builder with the ones - * from a previous {@link DataLoaderRegistry} - * - * @param otherRegistry the previous {@link DataLoaderRegistry} - * - * @return this builder for a fluent pattern - */ - public Builder registerAll(DataLoaderRegistry otherRegistry) { - dataLoaders.putAll(otherRegistry.getDataLoadersMap()); - return this; - } - /** * @return the newly built {@link ScheduledDataLoaderRegistry} */ From fb0a072f171e8037f2d82a21da6f19125e9a8b46 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sat, 30 Sep 2023 11:09:01 +1000 Subject: [PATCH 2/7] Adds a predicate to DataLoaderRegistry - added tests --- .../org/dataloader/DataLoaderRegistry.java | 42 ++-- .../registries/DispatchPredicate.java | 2 +- .../DataLoaderRegistryPredicateTest.java | 198 ++++++++++++++++++ .../java/org/dataloader/fixtures/TestKit.java | 11 + 4 files changed, 233 insertions(+), 20 deletions(-) create mode 100644 src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 48e8d96..aa01baa 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -15,7 +15,7 @@ import java.util.function.Function; /** - * This allows data loaders to be registered together into a single place so + * This allows data loaders to be registered together into a single place, so * they can be dispatched as one. It also allows you to retrieve data loaders by * name from a central place */ @@ -32,6 +32,7 @@ public DataLoaderRegistry() { protected DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); + this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); this.dispatchPredicate = builder.dispatchPredicate; } @@ -116,6 +117,20 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { return new LinkedHashMap<>(dataLoaders); } + /** + * @return the current dispatch predicate + */ + public DispatchPredicate getDispatchPredicate() { + return dispatchPredicate; + } + + /** + * @return a map of data loaders to specific dispatch predicates + */ + public Map, DispatchPredicate> getDataLoaderPredicates() { + return new LinkedHashMap<>(dataLoaderPredicates); + } + /** * This will unregister a new dataloader * @@ -153,7 +168,7 @@ public Set getKeys() { } /** - * This will called {@link org.dataloader.DataLoader#dispatch()} on each of the registered + * This will be called {@link org.dataloader.DataLoader#dispatch()} on each of the registered * {@link org.dataloader.DataLoader}s */ public void dispatchAll() { @@ -183,20 +198,12 @@ public int dispatchAllWithCount() { * {@link org.dataloader.DataLoader}s */ public int dispatchDepth() { - int totalDispatchDepth = 0; - for (Map.Entry> entry : dataLoaders.entrySet()) { - DataLoader dataLoader = entry.getValue(); - String key = entry.getKey(); - if (shouldDispatch(key, dataLoader)) { - totalDispatchDepth += dataLoader.dispatchDepth(); - } - } - return totalDispatchDepth; + return dataLoaders.values().stream().mapToInt(DataLoader::dispatchDepth).sum(); } /** * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate + * without testing the predicates */ public void dispatchAllImmediately() { dispatchAllWithCountImmediately(); @@ -204,17 +211,14 @@ public void dispatchAllImmediately() { /** * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate + * without testing the predicates * * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. */ public int dispatchAllWithCountImmediately() { - int sum = 0; - for (Map.Entry> entry : dataLoaders.entrySet()) { - DataLoader dataLoader = entry.getValue(); - sum += dataLoader.dispatchWithCounts().getKeysCount(); - } - return sum; + return dataLoaders.values().stream() + .mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount()) + .sum(); } diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java index 45e1da0..247a51a 100644 --- a/src/main/java/org/dataloader/registries/DispatchPredicate.java +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -18,7 +18,7 @@ public interface DispatchPredicate { /** * A predicate that always returns false */ - DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> true; + DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false; /** * This predicate tests whether the data loader should be dispatched or not. diff --git a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java new file mode 100644 index 0000000..56e4f90 --- /dev/null +++ b/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java @@ -0,0 +1,198 @@ +package org.dataloader; + +import org.dataloader.registries.DispatchPredicate; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static java.util.Arrays.asList; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.fixtures.TestKit.asSet; +import static org.dataloader.registries.DispatchPredicate.DISPATCH_NEVER; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class DataLoaderRegistryPredicateTest { + final BatchLoader identityBatchLoader = CompletableFuture::completedFuture; + + static class CountingDispatchPredicate implements DispatchPredicate { + int count = 0; + int max = 0; + + public CountingDispatchPredicate(int max) { + this.max = max; + } + + @Override + public boolean test(String dataLoaderKey, DataLoader dataLoader) { + boolean shouldFire = count >= max; + count++; + return shouldFire; + } + } + + @Test + public void predicate_registration_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOverAll) + .build(); + + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB, dlC))); + assertThat(registry.getDataLoadersMap().keySet(), equalTo(asSet("a", "b", "c"))); + assertThat(asSet(registry.getDataLoadersMap().values()), equalTo(asSet(dlA, dlB, dlC))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB, predicateC))); + + // and unregister (fluently) + DataLoaderRegistry dlR = registry.unregister("c"); + assertThat(dlR, equalTo(registry)); + + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB))); + + // direct on the registry works + registry.register("c", dlC, predicateC); + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB, dlC))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB, predicateC))); + + } + + @Test + public void predicate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOverAll) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + // none should fire + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // second firing + // one should fire + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfA.join(), equalTo("A")); + + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // third firing + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfB.join(), equalTo("B")); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // fourth firing + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + assertThat(cfC.join(), equalTo("C")); + } + + @Test + public void test_the_registry_overall_predicate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateOverAllOnThree = new CountingDispatchPredicate(3); + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("a", dlA, DISPATCH_NEVER) + .register("b", dlB, DISPATCH_NEVER) + .register("c", dlC, DISPATCH_NEVER) + .dispatchPredicate(predicateOverAllOnThree) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + // none should fire + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // second firing but the overall been asked 3 times already + assertThat(count, equalTo(3)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + } + + @Test + public void dispatch_immediate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOverAll) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCountImmediately(); // all should fire + assertThat(count, equalTo(3)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfA.join(), equalTo("A")); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfB.join(), equalTo("B")); + assertThat(cfC.isDone(), equalTo(true)); + assertThat(cfC.join(), equalTo("C")); + } + +} diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 5c87148..a26c18f 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -6,8 +6,11 @@ import org.dataloader.DataLoaderOptions; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static java.util.stream.Collectors.toList; @@ -67,4 +70,12 @@ public static void snooze(int millis) { public static List sort(Collection collection) { return collection.stream().sorted().collect(toList()); } + + public static Set asSet(T... elements) { + return new LinkedHashSet<>(Arrays.asList(elements)); + } + + public static Set asSet(Collection elements) { + return new LinkedHashSet<>(elements); + } } From c528455564dab0f849c45a5e5fbd80ed123bacf1 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sat, 30 Sep 2023 19:36:07 +1000 Subject: [PATCH 3/7] Adds a predicate to DataLoaderRegistry - added more tests --- .../dataloader/DataLoaderRegistryPredicateTest.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java index 56e4f90..579ad81 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java +++ b/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java @@ -133,13 +133,13 @@ public void test_the_registry_overall_predicate_firing_works() { DataLoader dlB = newDataLoader(identityBatchLoader); DataLoader dlC = newDataLoader(identityBatchLoader); - DispatchPredicate predicateOverAllOnThree = new CountingDispatchPredicate(3); + DispatchPredicate predicateOnSix = new CountingDispatchPredicate(6); DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() .register("a", dlA, DISPATCH_NEVER) .register("b", dlB, DISPATCH_NEVER) .register("c", dlC, DISPATCH_NEVER) - .dispatchPredicate(predicateOverAllOnThree) + .dispatchPredicate(predicateOnSix) .build(); @@ -148,13 +148,18 @@ public void test_the_registry_overall_predicate_firing_works() { CompletableFuture cfC = dlC.load("C"); int count = registry.dispatchAllWithCount(); // first firing - // none should fire assertThat(count, equalTo(0)); assertThat(cfA.isDone(), equalTo(false)); assertThat(cfB.isDone(), equalTo(false)); assertThat(cfC.isDone(), equalTo(false)); - count = registry.dispatchAllWithCount(); // second firing but the overall been asked 3 times already + count = registry.dispatchAllWithCount(); // second firing but the overall been asked 6 times already + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // third firing but the overall been asked 9 times already assertThat(count, equalTo(3)); assertThat(cfA.isDone(), equalTo(true)); assertThat(cfB.isDone(), equalTo(true)); From c0c6eef673bc0b981a35c5bf4d24765451e4f21f Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Tue, 3 Oct 2023 10:34:58 +1100 Subject: [PATCH 4/7] Adds a predicate to ScheduledDataLoaderRegistry - no longer in DLR --- .../org/dataloader/DataLoaderRegistry.java | 138 ++------------- .../org/dataloader/annotations/GuardedBy.java | 2 +- .../ScheduledDataLoaderRegistry.java | 164 +++++++++++++++++- ...duledDataLoaderRegistryPredicateTest.java} | 57 +++++- 4 files changed, 222 insertions(+), 139 deletions(-) rename src/test/java/org/dataloader/{DataLoaderRegistryPredicateTest.java => registries/ScheduledDataLoaderRegistryPredicateTest.java} (77%) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index aa01baa..3128d2c 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -1,7 +1,6 @@ package org.dataloader; import org.dataloader.annotations.PublicApi; -import org.dataloader.registries.DispatchPredicate; import org.dataloader.stats.Statistics; import java.util.ArrayList; @@ -22,18 +21,12 @@ @PublicApi public class DataLoaderRegistry { protected final Map> dataLoaders = new ConcurrentHashMap<>(); - protected final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); - protected final DispatchPredicate dispatchPredicate; - public DataLoaderRegistry() { - this.dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; } protected DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); - this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); - this.dispatchPredicate = builder.dispatchPredicate; } @@ -50,21 +43,6 @@ public DataLoaderRegistry register(String key, DataLoader dataLoader) { return this; } - /** - * This will register a new dataloader and dispatch predicate associated with that data loader - * - * @param key the key to put the data loader under - * @param dataLoader the data loader to register - * @param dispatchPredicate the dispatch predicate to associate with this data loader - * - * @return this registry - */ - public DataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { - dataLoaders.put(key, dataLoader); - dataLoaderPredicates.put(dataLoader, dispatchPredicate); - return this; - } - /** * Computes a data loader if absent or return it if it was * already registered at that key. @@ -98,8 +76,6 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { this.dataLoaders.forEach(combined::register); registry.dataLoaders.forEach(combined::register); - combined.dataLoaderPredicates.putAll(this.dataLoaderPredicates); - combined.dataLoaderPredicates.putAll(registry.dataLoaderPredicates); return combined; } @@ -117,20 +93,6 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { return new LinkedHashMap<>(dataLoaders); } - /** - * @return the current dispatch predicate - */ - public DispatchPredicate getDispatchPredicate() { - return dispatchPredicate; - } - - /** - * @return a map of data loaders to specific dispatch predicates - */ - public Map, DispatchPredicate> getDataLoaderPredicates() { - return new LinkedHashMap<>(dataLoaderPredicates); - } - /** * This will unregister a new dataloader * @@ -139,10 +101,7 @@ public DispatchPredicate getDispatchPredicate() { * @return this registry */ public DataLoaderRegistry unregister(String key) { - DataLoader dataLoader = dataLoaders.remove(key); - if (dataLoader != null) { - dataLoaderPredicates.remove(dataLoader); - } + dataLoaders.remove(key); return this; } @@ -168,11 +127,11 @@ public Set getKeys() { } /** - * This will be called {@link org.dataloader.DataLoader#dispatch()} on each of the registered + * This will called {@link org.dataloader.DataLoader#dispatch()} on each of the registered * {@link org.dataloader.DataLoader}s */ public void dispatchAll() { - dispatchAllWithCount(); + getDataLoaders().forEach(DataLoader::dispatch); } /** @@ -183,12 +142,8 @@ public void dispatchAll() { */ public int dispatchAllWithCount() { int sum = 0; - for (Map.Entry> entry : dataLoaders.entrySet()) { - DataLoader dataLoader = entry.getValue(); - String key = entry.getKey(); - if (shouldDispatch(key, dataLoader)) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); - } + for (DataLoader dataLoader : getDataLoaders()) { + sum += dataLoader.dispatchWithCounts().getKeysCount(); } return sum; } @@ -198,47 +153,11 @@ public int dispatchAllWithCount() { * {@link org.dataloader.DataLoader}s */ public int dispatchDepth() { - return dataLoaders.values().stream().mapToInt(DataLoader::dispatchDepth).sum(); - } - - /** - * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicates - */ - public void dispatchAllImmediately() { - dispatchAllWithCountImmediately(); - } - - /** - * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicates - * - * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. - */ - public int dispatchAllWithCountImmediately() { - return dataLoaders.values().stream() - .mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount()) - .sum(); - } - - - /** - * Returns true if the dataloader has a predicate which returned true, OR the overall - * registry predicate returned true. - * - * @param dataLoaderKey the key in the dataloader map - * @param dataLoader the dataloader - * - * @return true if it should dispatch - */ - protected boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { - DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); - if (dispatchPredicate != null) { - if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { - return true; - } + int totalDispatchDepth = 0; + for (DataLoader dataLoader : getDataLoaders()) { + totalDispatchDepth += dataLoader.dispatchDepth(); } - return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + return totalDispatchDepth; } /** @@ -256,19 +175,15 @@ public Statistics getStatistics() { /** * @return A builder of {@link DataLoaderRegistry}s */ - public static Builder newRegistry() { - //noinspection rawtypes + public static Builder newRegistry() { return new Builder(); } public static class Builder> { private final Map> dataLoaders = new HashMap<>(); - private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); - - private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; - private B self() { + protected B self() { //noinspection unchecked return (B) this; } @@ -287,22 +202,7 @@ public B register(String key, DataLoader dataLoader) { } /** - * This will register a new dataloader with a specific {@link DispatchPredicate} - * - * @param key the key to put the data loader under - * @param dataLoader the data loader to register - * @param dispatchPredicate the dispatch predicate - * - * @return this builder for a fluent pattern - */ - public B register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { - register(key, dataLoader); - dataLoaderPredicates.put(dataLoader, dispatchPredicate); - return self(); - } - - /** - * This will combine the data loaders in this builder with the ones + * This will combine together the data loaders in this builder with the ones * from a previous {@link DataLoaderRegistry} * * @param otherRegistry the previous {@link DataLoaderRegistry} @@ -311,20 +211,6 @@ public B register(String key, DataLoader dataLoader, DispatchPredicate dis */ public B registerAll(DataLoaderRegistry otherRegistry) { dataLoaders.putAll(otherRegistry.dataLoaders); - dataLoaderPredicates.putAll(otherRegistry.dataLoaderPredicates); - return self(); - } - - /** - * This sets a predicate on the {@link DataLoaderRegistry} that will control - * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. - * - * @param dispatchPredicate the predicate - * - * @return this builder for a fluent pattern - */ - public B dispatchPredicate(DispatchPredicate dispatchPredicate) { - this.dispatchPredicate = dispatchPredicate; return self(); } diff --git a/src/main/java/org/dataloader/annotations/GuardedBy.java b/src/main/java/org/dataloader/annotations/GuardedBy.java index c26b2ef..85c5765 100644 --- a/src/main/java/org/dataloader/annotations/GuardedBy.java +++ b/src/main/java/org/dataloader/annotations/GuardedBy.java @@ -15,7 +15,7 @@ public @interface GuardedBy { /** - * The lock that should be held. + * @return The lock that should be held. */ String value(); } diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 3e7a327..e86b93e 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -5,7 +5,9 @@ import org.dataloader.annotations.ExperimentalApi; import java.time.Duration; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -28,6 +30,8 @@ @ExperimentalApi public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { + private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); + private final DispatchPredicate dispatchPredicate; private final ScheduledExecutorService scheduledExecutorService; private final Duration schedule; private volatile boolean closed; @@ -37,6 +41,8 @@ private ScheduledDataLoaderRegistry(Builder builder) { this.scheduledExecutorService = builder.scheduledExecutorService; this.schedule = builder.schedule; this.closed = false; + this.dispatchPredicate = builder.dispatchPredicate; + this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); } /** @@ -54,6 +60,86 @@ public Duration getScheduleDuration() { return schedule; } + /** + * This will combine all the current data loaders in this registry and all the data loaders from the specified registry + * and return a new combined registry + * + * @param registry the registry to combine into this registry + * + * @return a new combined registry + */ + public DataLoaderRegistry combine(DataLoaderRegistry registry) { + Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry() + .dispatchPredicate(this.dispatchPredicate); + combinedBuilder.registerAll(this); + combinedBuilder.registerAll(registry); + return combinedBuilder.build(); + } + + + /** + * This will unregister a new dataloader + * + * @param key the key of the data loader to unregister + * + * @return this registry + */ + public ScheduledDataLoaderRegistry unregister(String key) { + DataLoader dataLoader = dataLoaders.remove(key); + if (dataLoader != null) { + dataLoaderPredicates.remove(dataLoader); + } + return this; + } + + /** + * @return the current dispatch predicate + */ + public DispatchPredicate getDispatchPredicate() { + return dispatchPredicate; + } + + /** + * @return a map of data loaders to specific dispatch predicates + */ + public Map, DispatchPredicate> getDataLoaderPredicates() { + return new LinkedHashMap<>(dataLoaderPredicates); + } + + /** + * This will register a new dataloader and dispatch predicate associated with that data loader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate to associate with this data loader + * + * @return this registry + */ + public DataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + dataLoaders.put(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return this; + } + + /** + * Returns true if the dataloader has a predicate which returned true, OR the overall + * registry predicate returned true. + * + * @param dataLoaderKey the key in the dataloader map + * @param dataLoader the dataloader + * + * @return true if it should dispatch + */ + private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { + DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); + if (dispatchPredicate != null) { + if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { + return true; + } + } + return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + } + @Override public void dispatchAll() { dispatchAllWithCount(); @@ -65,7 +151,7 @@ public int dispatchAllWithCount() { for (Map.Entry> entry : dataLoaders.entrySet()) { DataLoader dataLoader = entry.getValue(); String key = entry.getKey(); - if (dispatchPredicate.test(key, dataLoader)) { + if (shouldDispatch(key, dataLoader)) { sum += dataLoader.dispatchWithCounts().getKeysCount(); } else { reschedule(key, dataLoader); @@ -74,6 +160,28 @@ public int dispatchAllWithCount() { return sum; } + + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicates + */ + public void dispatchAllImmediately() { + dispatchAllWithCountImmediately(); + } + + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicates + * + * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. + */ + public int dispatchAllWithCountImmediately() { + return dataLoaders.values().stream() + .mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount()) + .sum(); + } + + /** * This will schedule a task to check the predicate and dispatch if true right now. It will not do * a pre check of the preodicate like {@link #dispatchAll()} would @@ -112,14 +220,64 @@ public static class Builder extends DataLoaderRegistry.Builder, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); + + private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; + public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); - return this; + return self(); } public Builder schedule(Duration schedule) { this.schedule = schedule; - return this; + return self(); + } + + + /** + * This will register a new dataloader with a specific {@link DispatchPredicate} + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate + * + * @return this builder for a fluent pattern + */ + public Builder register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + register(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return self(); + } + + /** + * This will combine the data loaders in this builder with the ones + * from a previous {@link DataLoaderRegistry} + * + * @param otherRegistry the previous {@link DataLoaderRegistry} + * + * @return this builder for a fluent pattern + */ + public Builder registerAll(DataLoaderRegistry otherRegistry) { + super.registerAll(otherRegistry); + if (otherRegistry instanceof ScheduledDataLoaderRegistry) { + ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry; + dataLoaderPredicates.putAll(other.dataLoaderPredicates); + } + return self(); + } + + /** + * This sets a predicate on the {@link DataLoaderRegistry} that will control + * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. + * + * @param dispatchPredicate the predicate + * + * @return this builder for a fluent pattern + */ + public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { + this.dispatchPredicate = dispatchPredicate; + return self(); } /** diff --git a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java similarity index 77% rename from src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java rename to src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java index 579ad81..43da82f 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java @@ -1,18 +1,23 @@ -package org.dataloader; +package org.dataloader.registries; -import org.dataloader.registries.DispatchPredicate; +import org.dataloader.BatchLoader; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderRegistry; import org.junit.Test; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import static java.util.Arrays.asList; +import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.fixtures.TestKit.asSet; import static org.dataloader.registries.DispatchPredicate.DISPATCH_NEVER; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -public class DataLoaderRegistryPredicateTest { +public class ScheduledDataLoaderRegistryPredicateTest { final BatchLoader identityBatchLoader = CompletableFuture::completedFuture; static class CountingDispatchPredicate implements DispatchPredicate { @@ -43,7 +48,7 @@ public void predicate_registration_works() { DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); - DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() .register("a", dlA, predicateA) .register("b", dlB, predicateB) .register("c", dlC, predicateC) @@ -82,13 +87,14 @@ public void predicate_firing_works() { DispatchPredicate predicateB = new CountingDispatchPredicate(2); DispatchPredicate predicateC = new CountingDispatchPredicate(3); - DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + DispatchPredicate predicateOnTen = new CountingDispatchPredicate(10); - DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() .register("a", dlA, predicateA) .register("b", dlB, predicateB) .register("c", dlC, predicateC) - .dispatchPredicate(predicateOverAll) + .dispatchPredicate(predicateOnTen) + .schedule(Duration.ofHours(1000)) // make this so long its never rescheduled .build(); @@ -135,11 +141,12 @@ public void test_the_registry_overall_predicate_firing_works() { DispatchPredicate predicateOnSix = new CountingDispatchPredicate(6); - DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() .register("a", dlA, DISPATCH_NEVER) .register("b", dlB, DISPATCH_NEVER) .register("c", dlC, DISPATCH_NEVER) .dispatchPredicate(predicateOnSix) + .schedule(Duration.ofHours(1000)) .build(); @@ -178,11 +185,12 @@ public void dispatch_immediate_firing_works() { DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); - DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() .register("a", dlA, predicateA) .register("b", dlB, predicateB) .register("c", dlC, predicateC) .dispatchPredicate(predicateOverAll) + .schedule(Duration.ofHours(1000)) .build(); @@ -200,4 +208,35 @@ public void dispatch_immediate_firing_works() { assertThat(cfC.join(), equalTo("C")); } + @Test + public void test_the_registry_overall_predicate_firing_works_when_on_schedule() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateOnTwenty = new CountingDispatchPredicate(20); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA, DISPATCH_NEVER) + .register("b", dlB, DISPATCH_NEVER) + .register("c", dlC, DISPATCH_NEVER) + .dispatchPredicate(predicateOnTwenty) + .schedule(Duration.ofMillis(5)) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + assertThat(count, equalTo(0)); + + // the calls will be rescheduled until eventually the counting predicate returns true + await().until(cfA::isDone, is(true)); + + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + } } From 1c8d48c33f9e8416411f4ab995ab1bceef05d8da Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Tue, 3 Oct 2023 12:50:14 +1100 Subject: [PATCH 5/7] Adds a predicate to ScheduledDataLoaderRegistry - no longer in DLR - code tweaks --- .../registries/ScheduledDataLoaderRegistry.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index e86b93e..b109974 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -68,7 +68,7 @@ public Duration getScheduleDuration() { * * @return a new combined registry */ - public DataLoaderRegistry combine(DataLoaderRegistry registry) { + public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) { Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry() .dispatchPredicate(this.dispatchPredicate); combinedBuilder.registerAll(this); @@ -115,7 +115,7 @@ public DispatchPredicate getDispatchPredicate() { * * @return this registry */ - public DataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + public ScheduledDataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { dataLoaders.put(key, dataLoader); dataLoaderPredicates.put(dataLoader, dispatchPredicate); return this; @@ -206,8 +206,8 @@ private void dispatchOrReschedule(String key, DataLoader dataLoader) { } /** - * By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()} - * and a schedule duration of 10 milli seconds. + * By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()} + * and a schedule duration of 10 milliseconds. * * @return A builder of {@link ScheduledDataLoaderRegistry}s */ From 69528f1c41464fe312431187f058132c7c830ace Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Wed, 4 Oct 2023 09:57:00 +1100 Subject: [PATCH 6/7] Adds a predicate to ScheduledDataLoaderRegistry - removed generic builders --- .../org/dataloader/DataLoaderRegistry.java | 17 +++------ .../ScheduledDataLoaderRegistry.java | 37 +++++++++++++------ 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 3128d2c..0bc54cb 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -25,7 +25,7 @@ public class DataLoaderRegistry { public DataLoaderRegistry() { } - protected DataLoaderRegistry(Builder builder) { + private DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); } @@ -179,15 +179,10 @@ public static Builder newRegistry() { return new Builder(); } - public static class Builder> { + public static class Builder { private final Map> dataLoaders = new HashMap<>(); - protected B self() { - //noinspection unchecked - return (B) this; - } - /** * This will register a new dataloader * @@ -196,9 +191,9 @@ protected B self() { * * @return this builder for a fluent pattern */ - public B register(String key, DataLoader dataLoader) { + public Builder register(String key, DataLoader dataLoader) { dataLoaders.put(key, dataLoader); - return self(); + return this; } /** @@ -209,9 +204,9 @@ public B register(String key, DataLoader dataLoader) { * * @return this builder for a fluent pattern */ - public B registerAll(DataLoaderRegistry otherRegistry) { + public Builder registerAll(DataLoaderRegistry otherRegistry) { dataLoaders.putAll(otherRegistry.dataLoaders); - return self(); + return this; } /** diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index b109974..8ad5ecb 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -37,7 +37,8 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { - super(builder); + super(); + this.dataLoaders.putAll(builder.dataLoaders); this.scheduledExecutorService = builder.scheduledExecutorService; this.schedule = builder.schedule; this.closed = false; @@ -215,23 +216,35 @@ public static Builder newScheduledRegistry() { return new Builder(); } - public static class Builder extends DataLoaderRegistry.Builder { + public static class Builder { + private final Map> dataLoaders = new LinkedHashMap<>(); + private final Map, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>(); + private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private Duration schedule = Duration.ofMillis(10); - private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); - - private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; - public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); - return self(); + return this; } public Builder schedule(Duration schedule) { this.schedule = schedule; - return self(); + return this; + } + + /** + * This will register a new dataloader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * + * @return this builder for a fluent pattern + */ + public Builder register(String key, DataLoader dataLoader) { + dataLoaders.put(key, dataLoader); + return this; } @@ -247,7 +260,7 @@ public Builder schedule(Duration schedule) { public Builder register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { register(key, dataLoader); dataLoaderPredicates.put(dataLoader, dispatchPredicate); - return self(); + return this; } /** @@ -259,12 +272,12 @@ public Builder register(String key, DataLoader dataLoader, DispatchPredica * @return this builder for a fluent pattern */ public Builder registerAll(DataLoaderRegistry otherRegistry) { - super.registerAll(otherRegistry); + dataLoaders.putAll(otherRegistry.getDataLoadersMap()); if (otherRegistry instanceof ScheduledDataLoaderRegistry) { ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry; dataLoaderPredicates.putAll(other.dataLoaderPredicates); } - return self(); + return this; } /** @@ -277,7 +290,7 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) { */ public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { this.dispatchPredicate = dispatchPredicate; - return self(); + return this; } /** From 3099f1a9032963e99146f61d4fe8ffdf6a813eb8 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Fri, 6 Oct 2023 10:17:53 +1100 Subject: [PATCH 7/7] Adds a predicate to ScheduledDataLoaderRegistry - improved doco --- .../ScheduledDataLoaderRegistry.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 8ad5ecb..5b1af76 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -15,12 +15,15 @@ import static org.dataloader.impl.Assertions.nonNull; /** - * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called + * This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled * to perform that predicate dispatch again via the {@link ScheduledExecutorService}. *

+ * It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the + * whole {@link ScheduledDataLoaderRegistry}. + *

* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case - * no rescheduling will occur and you will need to call dispatch again to restart the process. + * no rescheduling will occur, and you will need to call dispatch again to restart the process. *

* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. @@ -94,17 +97,19 @@ public ScheduledDataLoaderRegistry unregister(String key) { } /** - * @return the current dispatch predicate + * @return a map of data loaders to specific dispatch predicates */ - public DispatchPredicate getDispatchPredicate() { - return dispatchPredicate; + public Map, DispatchPredicate> getDataLoaderPredicates() { + return new LinkedHashMap<>(dataLoaderPredicates); } /** - * @return a map of data loaders to specific dispatch predicates + * There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry} + * + * @return the default dispatch predicate */ - public Map, DispatchPredicate> getDataLoaderPredicates() { - return new LinkedHashMap<>(dataLoaderPredicates); + public DispatchPredicate getDispatchPredicate() { + return dispatchPredicate; } /** @@ -281,7 +286,7 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) { } /** - * This sets a predicate on the {@link DataLoaderRegistry} that will control + * This sets a default predicate on the {@link DataLoaderRegistry} that will control * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. * * @param dispatchPredicate the predicate