Skip to content

Commit 6d3c4eb

Browse files
committed
Making the Subscribers use a common base class - merged in main branch
1 parent 86ec5c8 commit 6d3c4eb

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,13 @@ CompletableFuture<V> load(K key, Object loadContext) {
155155
}
156156
}
157157

158+
@SuppressWarnings("unchecked")
158159
Object getCacheKey(K key) {
159160
return loaderOptions.cacheKeyFunction().isPresent() ?
160161
loaderOptions.cacheKeyFunction().get().getKey(key) : key;
161162
}
162163

164+
@SuppressWarnings("unchecked")
163165
Object getCacheKeyWithContext(K key, Object context) {
164166
return loaderOptions.cacheKeyFunction().isPresent() ?
165167
loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key;
@@ -511,6 +513,7 @@ private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Objec
511513

512514
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
513515
if (batchLoadFunction instanceof BatchPublisherWithContext) {
516+
//noinspection unchecked
514517
BatchPublisherWithContext<K, V> loadFunction = (BatchPublisherWithContext<K, V>) batchLoadFunction;
515518
if (batchLoaderScheduler != null) {
516519
BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber, environment);
@@ -519,6 +522,7 @@ private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Objec
519522
loadFunction.load(keys, subscriber, environment);
520523
}
521524
} else {
525+
//noinspection unchecked
522526
BatchPublisher<K, V> loadFunction = (BatchPublisher<K, V>) batchLoadFunction;
523527
if (batchLoaderScheduler != null) {
524528
BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber);
@@ -536,6 +540,7 @@ private CompletableFuture<List<V>> invokeMappedBatchPublisher(List<K> keys, List
536540

537541
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
538542
if (batchLoadFunction instanceof MappedBatchPublisherWithContext) {
543+
//noinspection unchecked
539544
MappedBatchPublisherWithContext<K, V> loadFunction = (MappedBatchPublisherWithContext<K, V>) batchLoadFunction;
540545
if (batchLoaderScheduler != null) {
541546
BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber, environment);
@@ -544,6 +549,7 @@ private CompletableFuture<List<V>> invokeMappedBatchPublisher(List<K> keys, List
544549
loadFunction.load(keys, subscriber, environment);
545550
}
546551
} else {
552+
//noinspection unchecked
547553
MappedBatchPublisher<K, V> loadFunction = (MappedBatchPublisher<K, V>) batchLoadFunction;
548554
if (batchLoaderScheduler != null) {
549555
BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber);
@@ -670,21 +676,21 @@ public void onError(Throwable throwable) {
670676
/*
671677
* A value has arrived - how do we complete the future that's associated with it in a common way
672678
*/
673-
void onNextValue(K key, V value, Object callContext, CompletableFuture<V> future) {
679+
void onNextValue(K key, V value, Object callContext, List<CompletableFuture<V>> futures) {
674680
if (value instanceof Try) {
675681
// we allow the batch loader to return a Try so we can better represent a computation
676682
// that might have worked or not.
677683
//noinspection unchecked
678684
Try<V> tryValue = (Try<V>) value;
679685
if (tryValue.isSuccess()) {
680-
future.complete(tryValue.get());
686+
futures.forEach(f -> f.complete(tryValue.get()));
681687
} else {
682688
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
683-
future.completeExceptionally(tryValue.getThrowable());
689+
futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable()));
684690
clearCacheKeys.add(key);
685691
}
686692
} else {
687-
future.complete(value);
693+
futures.forEach(f -> f.complete(value));
688694
}
689695
}
690696

@@ -718,7 +724,7 @@ public synchronized void onNext(V value) {
718724
K key = keys.get(idx);
719725
Object callContext = callContexts.get(idx);
720726
CompletableFuture<V> future = queuedFutures.get(idx);
721-
onNextValue(key, value, callContext, future);
727+
onNextValue(key, value, callContext, List.of(future));
722728

723729
completedValues.add(value);
724730
idx++;
@@ -754,7 +760,7 @@ public synchronized void onError(Throwable ex) {
754760
private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase<Map.Entry<K, V>> {
755761

756762
private final Map<K, Object> callContextByKey;
757-
private final Map<K, CompletableFuture<V>> queuedFutureByKey;
763+
private final Map<K, List<CompletableFuture<V>>> queuedFuturesByKey;
758764
private final Map<K, V> completedValuesByKey = new HashMap<>();
759765

760766

@@ -766,13 +772,13 @@ private DataLoaderMapEntrySubscriber(
766772
) {
767773
super(valuesFuture, keys, callContexts, queuedFutures);
768774
this.callContextByKey = new HashMap<>();
769-
this.queuedFutureByKey = new HashMap<>();
775+
this.queuedFuturesByKey = new HashMap<>();
770776
for (int idx = 0; idx < queuedFutures.size(); idx++) {
771777
K key = keys.get(idx);
772778
Object callContext = callContexts.get(idx);
773779
CompletableFuture<V> queuedFuture = queuedFutures.get(idx);
774780
callContextByKey.put(key, callContext);
775-
queuedFutureByKey.put(key, queuedFuture);
781+
queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture);
776782
}
777783
}
778784

@@ -784,9 +790,9 @@ public synchronized void onNext(Map.Entry<K, V> entry) {
784790
V value = entry.getValue();
785791

786792
Object callContext = callContextByKey.get(key);
787-
CompletableFuture<V> future = queuedFutureByKey.get(key);
793+
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
788794

789-
onNextValue(key, value, callContext, future);
795+
onNextValue(key, value, callContext, futures);
790796

791797
completedValuesByKey.put(key, value);
792798
}
@@ -811,15 +817,16 @@ public synchronized void onError(Throwable ex) {
811817
// Complete the futures for the remaining keys with the exception.
812818
for (int idx = 0; idx < queuedFutures.size(); idx++) {
813819
K key = keys.get(idx);
814-
CompletableFuture<V> future = queuedFutureByKey.get(key);
820+
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
815821
if (!completedValuesByKey.containsKey(key)) {
816-
future.completeExceptionally(ex);
822+
for (CompletableFuture<V> future : futures) {
823+
future.completeExceptionally(ex);
824+
}
817825
// clear any cached view of this key because they all failed
818826
dataLoader.clear(key);
819827
}
820828
}
821829
valuesFuture.completeExceptionally(ex);
822830
}
823-
824831
}
825832
}

0 commit comments

Comments
 (0)