Skip to content

Commit c969e3c

Browse files
committed
Added close support
1 parent 4675d0e commit c969e3c

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,27 @@
2525
* call {@link #rescheduleNow()}.
2626
*/
2727
@PublicApi
28-
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry {
28+
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {
2929

3030
private final ScheduledExecutorService scheduledExecutorService;
3131
private final DispatchPredicate dispatchPredicate;
3232
private final Duration schedule;
33+
private volatile boolean closed;
3334

3435
private ScheduledDataLoaderRegistry(Builder builder) {
3536
this.dataLoaders.putAll(builder.dataLoaders);
3637
this.scheduledExecutorService = builder.scheduledExecutorService;
3738
this.dispatchPredicate = builder.dispatchPredicate;
3839
this.schedule = builder.schedule;
40+
this.closed = false;
41+
}
42+
43+
/**
44+
* Once closed this registry will never again reschedule checks
45+
*/
46+
@Override
47+
public void close() {
48+
closed = true;
3949
}
4050

4151
/**
@@ -92,8 +102,10 @@ public void rescheduleNow() {
92102
}
93103

94104
private void reschedule(String key, DataLoader<?, ?> dataLoader) {
95-
Runnable runThis = () -> dispatchOrReschedule(key, dataLoader);
96-
scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS);
105+
if (!closed) {
106+
Runnable runThis = () -> dispatchOrReschedule(key, dataLoader);
107+
scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS);
108+
}
97109
}
98110

99111
private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {

src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,49 @@ public void test_it_will_take_out_the_schedule_once_it_dispatches() {
212212
snooze(2000);
213213

214214
assertThat(calls, equalTo(asList(asList("K1", "K2"), asList("K3", "K4"))));
215+
}
216+
217+
public void test_close_is_a_one_way_door() {
218+
AtomicInteger counter = new AtomicInteger();
219+
DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> {
220+
counter.incrementAndGet();
221+
return false;
222+
};
223+
224+
DataLoader<String, String> dlA = TestKit.idLoader();
225+
dlA.load("K1");
226+
dlA.load("K2");
227+
228+
ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
229+
.register("a", dlA)
230+
.dispatchPredicate(countingPredicate)
231+
.schedule(Duration.ofMillis(10))
232+
.build();
233+
234+
registry.rescheduleNow();
215235

236+
snooze(200);
237+
238+
assertTrue(counter.get() > 0);
239+
240+
registry.close();
241+
242+
snooze(100);
243+
int countThen = counter.get();
244+
245+
registry.rescheduleNow();
246+
snooze(200);
247+
assertEquals(counter.get(), countThen);
216248

249+
registry.rescheduleNow();
250+
snooze(200);
251+
assertEquals(counter.get(), countThen);
252+
253+
registry.dispatchAll();
254+
snooze(200);
255+
assertEquals(counter.get(), countThen + 1); // will have re-entered
256+
257+
snooze(200);
258+
assertEquals(counter.get(), countThen + 1);
217259
}
218260
}

0 commit comments

Comments
 (0)