Skip to content

Commit 4675d0e

Browse files
committed
Adding a ScheduledDataLoaderRegistry
1 parent 11a7348 commit 4675d0e

File tree

9 files changed

+916
-268
lines changed

9 files changed

+916
-268
lines changed

README.md

Lines changed: 300 additions & 265 deletions
Large diffs are not rendered by default.

src/main/java/org/dataloader/DataLoaderRegistry.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.ArrayList;
77
import java.util.HashMap;
88
import java.util.HashSet;
9+
import java.util.LinkedHashMap;
910
import java.util.List;
1011
import java.util.Map;
1112
import java.util.Set;
@@ -19,7 +20,7 @@
1920
*/
2021
@PublicApi
2122
public class DataLoaderRegistry {
22-
private final Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>();
23+
protected final Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>();
2324

2425
public DataLoaderRegistry() {
2526
}
@@ -28,6 +29,7 @@ private DataLoaderRegistry(Builder builder) {
2829
this.dataLoaders.putAll(builder.dataLoaders);
2930
}
3031

32+
3133
/**
3234
* This will register a new dataloader
3335
*
@@ -84,6 +86,13 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) {
8486
return new ArrayList<>(dataLoaders.values());
8587
}
8688

89+
/**
90+
* @return the currently registered data loaders as a map
91+
*/
92+
public Map<String, DataLoader<?, ?>> getDataLoadersMap() {
93+
return new LinkedHashMap<>(dataLoaders);
94+
}
95+
8796
/**
8897
* This will unregister a new dataloader
8998
*
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package org.dataloader.registries;
2+
3+
import org.dataloader.DataLoader;
4+
5+
import java.time.Duration;
6+
import java.util.Objects;
7+
8+
/**
9+
* A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not
10+
*/
11+
@FunctionalInterface
12+
public interface DispatchPredicate {
13+
/**
14+
* This predicate tests whether the data loader should be dispatched or not.
15+
*
16+
* @param dataLoaderKey the key of the data loader when registered
17+
* @param dataLoader the dataloader to dispatch
18+
*
19+
* @return true if the data loader SHOULD be dispatched
20+
*/
21+
boolean test(String dataLoaderKey, DataLoader<?, ?> dataLoader);
22+
23+
24+
/**
25+
* Returns a composed predicate that represents a short-circuiting logical
26+
* AND of this predicate and another.
27+
*
28+
* @param other a predicate that will be logically-ANDed with this
29+
* predicate
30+
*
31+
* @return a composed predicate that represents the short-circuiting logical
32+
* AND of this predicate and the {@code other} predicate
33+
*/
34+
default DispatchPredicate and(DispatchPredicate other) {
35+
Objects.requireNonNull(other);
36+
return (k, dl) -> test(k, dl) && other.test(k, dl);
37+
}
38+
39+
/**
40+
* Returns a predicate that represents the logical negation of this
41+
* predicate.
42+
*
43+
* @return a predicate that represents the logical negation of this
44+
* predicate
45+
*/
46+
default DispatchPredicate negate() {
47+
return (k, dl) -> !test(k, dl);
48+
}
49+
50+
/**
51+
* Returns a composed predicate that represents a short-circuiting logical
52+
* OR of this predicate and another.
53+
*
54+
* @param other a predicate that will be logically-ORed with this
55+
* predicate
56+
*
57+
* @return a composed predicate that represents the short-circuiting logical
58+
* OR of this predicate and the {@code other} predicate
59+
*/
60+
default DispatchPredicate or(DispatchPredicate other) {
61+
Objects.requireNonNull(other);
62+
return (k, dl) -> test(k, dl) || other.test(k, dl);
63+
}
64+
65+
/**
66+
* This predicate will return true if the {@link DataLoader} has not be dispatched
67+
* for at least the duration length of time.
68+
*
69+
* @param duration the length of time to check
70+
*
71+
* @return true if the data loader has not been dispatched in duration time
72+
*/
73+
static DispatchPredicate dispatchIfLongerThan(Duration duration) {
74+
return (dataLoaderKey, dataLoader) -> {
75+
int i = dataLoader.getTimeSinceDispatch().compareTo(duration);
76+
return i > 0;
77+
};
78+
}
79+
80+
/**
81+
* This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
82+
*
83+
* This will act as minimum batch size. There must be more then `depth` items queued for the predicate to return true.
84+
*
85+
* @param depth the value to be greater than
86+
*
87+
* @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
88+
*/
89+
static DispatchPredicate dispatchIfDepthGreaterThan(int depth) {
90+
return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth;
91+
}
92+
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package org.dataloader.registries;
2+
3+
import org.dataloader.DataLoader;
4+
import org.dataloader.DataLoaderRegistry;
5+
import org.dataloader.annotations.PublicApi;
6+
7+
import java.time.Duration;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.concurrent.TimeUnit;
13+
14+
import static org.dataloader.impl.Assertions.nonNull;
15+
16+
/**
17+
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
18+
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
19+
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
20+
* <p>
21+
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
22+
* no rescheduling will occur and you will need to call dispatch again to restart the process.
23+
* <p>
24+
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
25+
* call {@link #rescheduleNow()}.
26+
*/
27+
@PublicApi
28+
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry {
29+
30+
private final ScheduledExecutorService scheduledExecutorService;
31+
private final DispatchPredicate dispatchPredicate;
32+
private final Duration schedule;
33+
34+
private ScheduledDataLoaderRegistry(Builder builder) {
35+
this.dataLoaders.putAll(builder.dataLoaders);
36+
this.scheduledExecutorService = builder.scheduledExecutorService;
37+
this.dispatchPredicate = builder.dispatchPredicate;
38+
this.schedule = builder.schedule;
39+
}
40+
41+
/**
42+
* @return how long the {@link ScheduledExecutorService} task will wait before checking the predicate again
43+
*/
44+
public Duration getScheduleDuration() {
45+
return schedule;
46+
}
47+
48+
@Override
49+
public void dispatchAll() {
50+
dispatchAllWithCount();
51+
}
52+
53+
@Override
54+
public int dispatchAllWithCount() {
55+
int sum = 0;
56+
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
57+
DataLoader<?, ?> dataLoader = entry.getValue();
58+
String key = entry.getKey();
59+
if (dispatchPredicate.test(key, dataLoader)) {
60+
sum += dataLoader.dispatchWithCounts().getKeysCount();
61+
} else {
62+
reschedule(key, dataLoader);
63+
}
64+
}
65+
return sum;
66+
}
67+
68+
/**
69+
* This will immediately dispatch the {@link DataLoader}s in the registry
70+
* without testing the predicate
71+
*/
72+
public void dispatchAllImmediately() {
73+
super.dispatchAll();
74+
}
75+
76+
/**
77+
* This will immediately dispatch the {@link DataLoader}s in the registry
78+
* without testing the predicate
79+
*
80+
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
81+
*/
82+
public int dispatchAllWithCountImmediately() {
83+
return super.dispatchAllWithCount();
84+
}
85+
86+
/**
87+
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
88+
* a pre check of the preodicate like {@link #dispatchAll()} would
89+
*/
90+
public void rescheduleNow() {
91+
dataLoaders.forEach(this::reschedule);
92+
}
93+
94+
private void reschedule(String key, DataLoader<?, ?> dataLoader) {
95+
Runnable runThis = () -> dispatchOrReschedule(key, dataLoader);
96+
scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS);
97+
}
98+
99+
private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
100+
if (dispatchPredicate.test(key, dataLoader)) {
101+
dataLoader.dispatch();
102+
} else {
103+
reschedule(key, dataLoader);
104+
}
105+
}
106+
107+
/**
108+
* By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
109+
* and a schedule duration of 10 milli seconds.
110+
*
111+
* @return A builder of {@link ScheduledDataLoaderRegistry}s
112+
*/
113+
public static Builder newScheduledRegistry() {
114+
return new Builder();
115+
}
116+
117+
public static class Builder {
118+
119+
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
120+
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
121+
private Duration schedule = Duration.ofMillis(10);
122+
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();
123+
124+
public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
125+
this.scheduledExecutorService = nonNull(executorService);
126+
return this;
127+
}
128+
129+
public Builder schedule(Duration schedule) {
130+
this.schedule = schedule;
131+
return this;
132+
}
133+
134+
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
135+
this.dispatchPredicate = nonNull(dispatchPredicate);
136+
return this;
137+
}
138+
139+
/**
140+
* This will register a new dataloader
141+
*
142+
* @param key the key to put the data loader under
143+
* @param dataLoader the data loader to register
144+
*
145+
* @return this builder for a fluent pattern
146+
*/
147+
public Builder register(String key, DataLoader<?, ?> dataLoader) {
148+
dataLoaders.put(key, dataLoader);
149+
return this;
150+
}
151+
152+
/**
153+
* This will combine together the data loaders in this builder with the ones
154+
* from a previous {@link DataLoaderRegistry}
155+
*
156+
* @param otherRegistry the previous {@link DataLoaderRegistry}
157+
*
158+
* @return this builder for a fluent pattern
159+
*/
160+
public Builder registerAll(DataLoaderRegistry otherRegistry) {
161+
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
162+
return this;
163+
}
164+
165+
/**
166+
* @return the newly built {@link ScheduledDataLoaderRegistry}
167+
*/
168+
public ScheduledDataLoaderRegistry build() {
169+
return new ScheduledDataLoaderRegistry(this);
170+
}
171+
}
172+
}

src/test/java/ReadmeExamples.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010
import org.dataloader.fixtures.SecurityCtx;
1111
import org.dataloader.fixtures.User;
1212
import org.dataloader.fixtures.UserManager;
13+
import org.dataloader.registries.DispatchPredicate;
14+
import org.dataloader.registries.ScheduledDataLoaderRegistry;
1315
import org.dataloader.stats.Statistics;
1416
import org.dataloader.stats.ThreadLocalStatisticsCollector;
1517

18+
import java.time.Duration;
1619
import java.util.ArrayList;
1720
import java.util.List;
1821
import java.util.Map;
@@ -269,4 +272,14 @@ private void statsConfigExample() {
269272
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options);
270273
}
271274

275+
private void ScheduledDispatche() {
276+
DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10)
277+
.or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));
278+
279+
ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
280+
.dispatchPredicate(depthOrTimePredicate)
281+
.schedule(Duration.ofMillis(10))
282+
.register("users", userDataLoader)
283+
.build();
284+
}
272285
}

src/test/java/org/dataloader/ClockDataLoader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
public class ClockDataLoader<K, V> extends DataLoader<K, V> {
66

7-
ClockDataLoader(Object batchLoadFunction, Clock clock) {
7+
public ClockDataLoader(Object batchLoadFunction, Clock clock) {
88
this(batchLoadFunction, null, clock);
99
}
1010

11-
ClockDataLoader(Object batchLoadFunction, DataLoaderOptions options, Clock clock) {
11+
public ClockDataLoader(Object batchLoadFunction, DataLoaderOptions options, Clock clock) {
1212
super(batchLoadFunction, options, clock);
1313
}
1414

src/test/java/org/dataloader/fixtures/TestKit.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public static <K, V> BatchLoader<K, V> keysAsValues(List<List<K>> loadCalls) {
3131
};
3232
}
3333

34+
public static <K, V> DataLoader<K, V> idLoader() {
35+
return idLoader(null, new ArrayList<>());
36+
}
37+
3438
public static <K, V> DataLoader<K, V> idLoader(List<List<K>> loadCalls) {
3539
return idLoader(null, loadCalls);
3640
}

0 commit comments

Comments
 (0)