Skip to content

Commit 6523015

Browse files
committed
made them non public and created a static factory support class
1 parent e98621b commit 6523015

6 files changed

+58
-35
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
import org.dataloader.annotations.GuardedBy;
44
import org.dataloader.annotations.Internal;
55
import org.dataloader.impl.CompletableFutureKit;
6-
import org.dataloader.reactive.HelperIntegration;
7-
import org.dataloader.reactive.MappedBatchSubscriber;
8-
import org.dataloader.reactive.BatchSubscriber;
6+
import org.dataloader.reactive.ReactiveSupport;
97
import org.dataloader.scheduler.BatchLoaderScheduler;
108
import org.dataloader.stats.StatisticsCollector;
119
import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext;
@@ -510,7 +508,7 @@ private CompletableFuture<List<V>> invokeMapBatchLoader(List<K> keys, BatchLoade
510508

511509
private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
512510
CompletableFuture<List<V>> loadResult = new CompletableFuture<>();
513-
Subscriber<V> subscriber = new BatchSubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration());
511+
Subscriber<V> subscriber = ReactiveSupport.batchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration());
514512

515513
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
516514
if (batchLoadFunction instanceof BatchPublisherWithContext) {
@@ -535,8 +533,8 @@ private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Objec
535533
return loadResult;
536534
}
537535

538-
private HelperIntegration<K> helperIntegration() {
539-
return new HelperIntegration<>() {
536+
private ReactiveSupport.HelperIntegration<K> helperIntegration() {
537+
return new ReactiveSupport.HelperIntegration<>() {
540538
@Override
541539
public StatisticsCollector getStats() {
542540
return stats;
@@ -556,7 +554,7 @@ public void clearCacheEntriesOnExceptions(List<K> keys) {
556554

557555
private CompletableFuture<List<V>> invokeMappedBatchPublisher(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
558556
CompletableFuture<List<V>> loadResult = new CompletableFuture<>();
559-
Subscriber<Map.Entry<K, V>> subscriber = new MappedBatchSubscriber<>(loadResult, keys, keyContexts, queuedFutures, helperIntegration());
557+
Subscriber<Map.Entry<K, V>> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration());
560558

561559
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
562560
if (batchLoadFunction instanceof MappedBatchPublisherWithContext) {

src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ abstract class AbstractBatchSubscriber<K, V, T> implements Subscriber<T> {
2424
final List<K> keys;
2525
final List<Object> callContexts;
2626
final List<CompletableFuture<V>> queuedFutures;
27-
final HelperIntegration<K> helperIntegration;
27+
final ReactiveSupport.HelperIntegration<K> helperIntegration;
2828

2929
List<K> clearCacheKeys = new ArrayList<>();
3030
List<V> completedValues = new ArrayList<>();
@@ -36,7 +36,7 @@ abstract class AbstractBatchSubscriber<K, V, T> implements Subscriber<T> {
3636
List<K> keys,
3737
List<Object> callContexts,
3838
List<CompletableFuture<V>> queuedFutures,
39-
HelperIntegration<K> helperIntegration
39+
ReactiveSupport.HelperIntegration<K> helperIntegration
4040
) {
4141
this.valuesFuture = valuesFuture;
4242
this.keys = keys;

src/main/java/org/dataloader/reactive/BatchSubscriber.java renamed to src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
* @param <K> the type of keys
1616
* @param <V> the type of values
1717
*/
18-
public class BatchSubscriber<K, V> extends AbstractBatchSubscriber<K, V, V> {
18+
class BatchSubscriberImpl<K, V> extends AbstractBatchSubscriber<K, V, V> {
1919

2020
private int idx = 0;
2121

22-
public BatchSubscriber(
22+
BatchSubscriberImpl(
2323
CompletableFuture<List<V>> valuesFuture,
2424
List<K> keys,
2525
List<Object> callContexts,
2626
List<CompletableFuture<V>> queuedFutures,
27-
HelperIntegration<K> helperIntegration
27+
ReactiveSupport.HelperIntegration<K> helperIntegration
2828
) {
2929
super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration);
3030
}

src/main/java/org/dataloader/reactive/HelperIntegration.java

-19
This file was deleted.

src/main/java/org/dataloader/reactive/MappedBatchSubscriber.java renamed to src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,19 @@
1515
* @param <K> the type of keys
1616
* @param <V> the type of values
1717
*/
18-
public class MappedBatchSubscriber<K, V> extends AbstractBatchSubscriber<K, V, Map.Entry<K, V>> {
18+
class MappedBatchSubscriberImpl<K, V> extends AbstractBatchSubscriber<K, V, Map.Entry<K, V>> {
1919

2020
private final Map<K, Object> callContextByKey;
2121
private final Map<K, List<CompletableFuture<V>>> queuedFuturesByKey;
2222
private final Map<K, V> completedValuesByKey = new HashMap<>();
2323

2424

25-
public MappedBatchSubscriber(
25+
MappedBatchSubscriberImpl(
2626
CompletableFuture<List<V>> valuesFuture,
2727
List<K> keys,
2828
List<Object> callContexts,
2929
List<CompletableFuture<V>> queuedFutures,
30-
HelperIntegration<K> helperIntegration
31-
30+
ReactiveSupport.HelperIntegration<K> helperIntegration
3231
) {
3332
super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration);
3433
this.callContextByKey = new HashMap<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.dataloader.reactive;
2+
3+
import org.dataloader.stats.StatisticsCollector;
4+
import org.reactivestreams.Subscriber;
5+
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.concurrent.CompletableFuture;
9+
10+
public class ReactiveSupport {
11+
12+
public static <K, V> Subscriber<V> batchSubscriber(
13+
CompletableFuture<List<V>> valuesFuture,
14+
List<K> keys,
15+
List<Object> callContexts,
16+
List<CompletableFuture<V>> queuedFutures,
17+
ReactiveSupport.HelperIntegration<K> helperIntegration
18+
) {
19+
return new BatchSubscriberImpl<>(valuesFuture, keys, callContexts, queuedFutures, helperIntegration);
20+
}
21+
22+
public static <K, V> Subscriber<Map.Entry<K, V>> mappedBatchSubscriber(
23+
CompletableFuture<List<V>> valuesFuture,
24+
List<K> keys,
25+
List<Object> callContexts,
26+
List<CompletableFuture<V>> queuedFutures,
27+
ReactiveSupport.HelperIntegration<K> helperIntegration
28+
) {
29+
return new MappedBatchSubscriberImpl<>(valuesFuture, keys, callContexts, queuedFutures, helperIntegration);
30+
}
31+
32+
/**
33+
* Just some callbacks to the data loader code to do common tasks
34+
*
35+
* @param <K> for keys
36+
*/
37+
public interface HelperIntegration<K> {
38+
39+
StatisticsCollector getStats();
40+
41+
void clearCacheView(K key);
42+
43+
void clearCacheEntriesOnExceptions(List<K> keys);
44+
}
45+
}

0 commit comments

Comments
 (0)