Skip to content

Commit f2457ec

Browse files
authored
Merge pull request #87 from graphql-java/scheduled-dataloader-registry
Adding a ScheduledDataLoaderRegistry
2 parents 7e2e609 + 80dafa1 commit f2457ec

File tree

10 files changed

+722
-3
lines changed

10 files changed

+722
-3
lines changed

README.md

+27
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,33 @@ and there are also gains to this different mode of operation:
478478
However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures
479479
in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs.
480480

481+
## Scheduled Dispatching
482+
483+
`ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a
484+
predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked.
485+
486+
If that predicate is true, it will make a `dispatch` call on the data loader, otherwise is will schedule a task to
487+
perform that check again. Once a predicate evaluated to true, it will not reschedule and another call to
488+
`dispatchAll` is required to be made.
489+
490+
This allows you to do things like "dispatch ONLY if the queue depth is > 10 deep or more than 200 millis have passed
491+
since it was last dispatched".
492+
493+
```java
494+
495+
DispatchPredicate depthOrTimePredicate = DispatchPredicate
496+
.dispatchIfDepthGreaterThan(10)
497+
.or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));
498+
499+
ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
500+
.dispatchPredicate(depthOrTimePredicate)
501+
.schedule(Duration.ofMillis(10))
502+
.register("users",userDataLoader)
503+
.build();
504+
```
505+
506+
The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less
507+
than or equal to 10 but if 200ms pass it will dispatch.
481508

482509
## Let's get started!
483510

src/main/java/org/dataloader/DataLoaderRegistry.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.ArrayList;
77
import java.util.HashMap;
88
import java.util.HashSet;
9+
import java.util.LinkedHashMap;
910
import java.util.List;
1011
import java.util.Map;
1112
import java.util.Set;
@@ -19,7 +20,7 @@
1920
*/
2021
@PublicApi
2122
public class DataLoaderRegistry {
22-
private final Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>();
23+
protected final Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>();
2324

2425
public DataLoaderRegistry() {
2526
}
@@ -28,6 +29,7 @@ private DataLoaderRegistry(Builder builder) {
2829
this.dataLoaders.putAll(builder.dataLoaders);
2930
}
3031

32+
3133
/**
3234
* This will register a new dataloader
3335
*
@@ -84,6 +86,13 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) {
8486
return new ArrayList<>(dataLoaders.values());
8587
}
8688

89+
/**
90+
* @return the currently registered data loaders as a map
91+
*/
92+
public Map<String, DataLoader<?, ?>> getDataLoadersMap() {
93+
return new LinkedHashMap<>(dataLoaders);
94+
}
95+
8796
/**
8897
* This will unregister a new dataloader
8998
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.dataloader.annotations;
2+
3+
import java.lang.annotation.Documented;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
import static java.lang.annotation.ElementType.CONSTRUCTOR;
9+
import static java.lang.annotation.ElementType.FIELD;
10+
import static java.lang.annotation.ElementType.METHOD;
11+
import static java.lang.annotation.ElementType.TYPE;
12+
13+
/**
14+
* This represents code that the graphql-java project considers experimental API and while our intention is that it will
15+
* progress to be {@link PublicApi}, its existence, signature of behavior may change between releases.
16+
*
17+
* In general unnecessary changes will be avoided but you should not depend on experimental classes being stable
18+
*/
19+
@Retention(RetentionPolicy.RUNTIME)
20+
@Target(value = {CONSTRUCTOR, METHOD, TYPE, FIELD})
21+
@Documented
22+
public @interface ExperimentalApi {
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package org.dataloader.registries;
2+
3+
import org.dataloader.DataLoader;
4+
5+
import java.time.Duration;
6+
import java.util.Objects;
7+
8+
/**
9+
* A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not
10+
*/
11+
@FunctionalInterface
12+
public interface DispatchPredicate {
13+
/**
14+
* This predicate tests whether the data loader should be dispatched or not.
15+
*
16+
* @param dataLoaderKey the key of the data loader when registered
17+
* @param dataLoader the dataloader to dispatch
18+
*
19+
* @return true if the data loader SHOULD be dispatched
20+
*/
21+
boolean test(String dataLoaderKey, DataLoader<?, ?> dataLoader);
22+
23+
24+
/**
25+
* Returns a composed predicate that represents a short-circuiting logical
26+
* AND of this predicate and another.
27+
*
28+
* @param other a predicate that will be logically-ANDed with this
29+
* predicate
30+
*
31+
* @return a composed predicate that represents the short-circuiting logical
32+
* AND of this predicate and the {@code other} predicate
33+
*/
34+
default DispatchPredicate and(DispatchPredicate other) {
35+
Objects.requireNonNull(other);
36+
return (k, dl) -> test(k, dl) && other.test(k, dl);
37+
}
38+
39+
/**
40+
* Returns a predicate that represents the logical negation of this
41+
* predicate.
42+
*
43+
* @return a predicate that represents the logical negation of this
44+
* predicate
45+
*/
46+
default DispatchPredicate negate() {
47+
return (k, dl) -> !test(k, dl);
48+
}
49+
50+
/**
51+
* Returns a composed predicate that represents a short-circuiting logical
52+
* OR of this predicate and another.
53+
*
54+
* @param other a predicate that will be logically-ORed with this
55+
* predicate
56+
*
57+
* @return a composed predicate that represents the short-circuiting logical
58+
* OR of this predicate and the {@code other} predicate
59+
*/
60+
default DispatchPredicate or(DispatchPredicate other) {
61+
Objects.requireNonNull(other);
62+
return (k, dl) -> test(k, dl) || other.test(k, dl);
63+
}
64+
65+
/**
66+
* This predicate will return true if the {@link DataLoader} has not be dispatched
67+
* for at least the duration length of time.
68+
*
69+
* @param duration the length of time to check
70+
*
71+
* @return true if the data loader has not been dispatched in duration time
72+
*/
73+
static DispatchPredicate dispatchIfLongerThan(Duration duration) {
74+
return (dataLoaderKey, dataLoader) -> {
75+
int i = dataLoader.getTimeSinceDispatch().compareTo(duration);
76+
return i > 0;
77+
};
78+
}
79+
80+
/**
81+
* This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
82+
*
83+
* This will act as minimum batch size. There must be more than `depth` items queued for the predicate to return true.
84+
*
85+
* @param depth the value to be greater than
86+
*
87+
* @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
88+
*/
89+
static DispatchPredicate dispatchIfDepthGreaterThan(int depth) {
90+
return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth;
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package org.dataloader.registries;
2+
3+
import org.dataloader.DataLoader;
4+
import org.dataloader.DataLoaderRegistry;
5+
import org.dataloader.annotations.ExperimentalApi;
6+
7+
import java.time.Duration;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.concurrent.TimeUnit;
13+
14+
import static org.dataloader.impl.Assertions.nonNull;
15+
16+
/**
17+
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
18+
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
19+
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
20+
* <p>
21+
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
22+
* no rescheduling will occur and you will need to call dispatch again to restart the process.
23+
* <p>
24+
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
25+
* call {@link #rescheduleNow()}.
26+
* <p>
27+
* This code is currently marked as {@link ExperimentalApi}
28+
*/
29+
@ExperimentalApi
30+
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {
31+
32+
private final ScheduledExecutorService scheduledExecutorService;
33+
private final DispatchPredicate dispatchPredicate;
34+
private final Duration schedule;
35+
private volatile boolean closed;
36+
37+
private ScheduledDataLoaderRegistry(Builder builder) {
38+
this.dataLoaders.putAll(builder.dataLoaders);
39+
this.scheduledExecutorService = builder.scheduledExecutorService;
40+
this.dispatchPredicate = builder.dispatchPredicate;
41+
this.schedule = builder.schedule;
42+
this.closed = false;
43+
}
44+
45+
/**
46+
* Once closed this registry will never again reschedule checks
47+
*/
48+
@Override
49+
public void close() {
50+
closed = true;
51+
}
52+
53+
/**
54+
* @return how long the {@link ScheduledExecutorService} task will wait before checking the predicate again
55+
*/
56+
public Duration getScheduleDuration() {
57+
return schedule;
58+
}
59+
60+
@Override
61+
public void dispatchAll() {
62+
dispatchAllWithCount();
63+
}
64+
65+
@Override
66+
public int dispatchAllWithCount() {
67+
int sum = 0;
68+
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
69+
DataLoader<?, ?> dataLoader = entry.getValue();
70+
String key = entry.getKey();
71+
if (dispatchPredicate.test(key, dataLoader)) {
72+
sum += dataLoader.dispatchWithCounts().getKeysCount();
73+
} else {
74+
reschedule(key, dataLoader);
75+
}
76+
}
77+
return sum;
78+
}
79+
80+
/**
81+
* This will immediately dispatch the {@link DataLoader}s in the registry
82+
* without testing the predicate
83+
*/
84+
public void dispatchAllImmediately() {
85+
super.dispatchAll();
86+
}
87+
88+
/**
89+
* This will immediately dispatch the {@link DataLoader}s in the registry
90+
* without testing the predicate
91+
*
92+
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
93+
*/
94+
public int dispatchAllWithCountImmediately() {
95+
return super.dispatchAllWithCount();
96+
}
97+
98+
/**
99+
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
100+
* a pre check of the preodicate like {@link #dispatchAll()} would
101+
*/
102+
public void rescheduleNow() {
103+
dataLoaders.forEach(this::reschedule);
104+
}
105+
106+
private void reschedule(String key, DataLoader<?, ?> dataLoader) {
107+
if (!closed) {
108+
Runnable runThis = () -> dispatchOrReschedule(key, dataLoader);
109+
scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS);
110+
}
111+
}
112+
113+
private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
114+
if (dispatchPredicate.test(key, dataLoader)) {
115+
dataLoader.dispatch();
116+
} else {
117+
reschedule(key, dataLoader);
118+
}
119+
}
120+
121+
/**
122+
* By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
123+
* and a schedule duration of 10 milli seconds.
124+
*
125+
* @return A builder of {@link ScheduledDataLoaderRegistry}s
126+
*/
127+
public static Builder newScheduledRegistry() {
128+
return new Builder();
129+
}
130+
131+
public static class Builder {
132+
133+
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
134+
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
135+
private Duration schedule = Duration.ofMillis(10);
136+
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();
137+
138+
public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
139+
this.scheduledExecutorService = nonNull(executorService);
140+
return this;
141+
}
142+
143+
public Builder schedule(Duration schedule) {
144+
this.schedule = schedule;
145+
return this;
146+
}
147+
148+
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
149+
this.dispatchPredicate = nonNull(dispatchPredicate);
150+
return this;
151+
}
152+
153+
/**
154+
* This will register a new dataloader
155+
*
156+
* @param key the key to put the data loader under
157+
* @param dataLoader the data loader to register
158+
*
159+
* @return this builder for a fluent pattern
160+
*/
161+
public Builder register(String key, DataLoader<?, ?> dataLoader) {
162+
dataLoaders.put(key, dataLoader);
163+
return this;
164+
}
165+
166+
/**
167+
* This will combine together the data loaders in this builder with the ones
168+
* from a previous {@link DataLoaderRegistry}
169+
*
170+
* @param otherRegistry the previous {@link DataLoaderRegistry}
171+
*
172+
* @return this builder for a fluent pattern
173+
*/
174+
public Builder registerAll(DataLoaderRegistry otherRegistry) {
175+
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
176+
return this;
177+
}
178+
179+
/**
180+
* @return the newly built {@link ScheduledDataLoaderRegistry}
181+
*/
182+
public ScheduledDataLoaderRegistry build() {
183+
return new ScheduledDataLoaderRegistry(this);
184+
}
185+
}
186+
}

0 commit comments

Comments
 (0)