Skip to content

Commit 26f91ee

Browse files
committed
Now with better tests and synchronisation that is more complete
1 parent 59d81fe commit 26f91ee

File tree

2 files changed

+47
-29
lines changed

2 files changed

+47
-29
lines changed

src/main/java/org/dataloader/DataLoader.java

+27-28
Original file line numberDiff line numberDiff line change
@@ -176,38 +176,37 @@ private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptio
176176
* @return the future of the value
177177
*/
178178
public CompletableFuture<V> load(K key) {
179-
Object cacheKey = getCacheKey(nonNull(key));
180-
stats.incrementLoadCount();
179+
synchronized (this) {
180+
Object cacheKey = getCacheKey(nonNull(key));
181+
stats.incrementLoadCount();
181182

182-
if (loaderOptions.cachingEnabled()) {
183-
synchronized (futureCache) {
183+
boolean batchingEnabled = loaderOptions.batchingEnabled();
184+
boolean cachingEnabled = loaderOptions.cachingEnabled();
185+
186+
if (cachingEnabled) {
184187
if (futureCache.containsKey(cacheKey)) {
185188
stats.incrementCacheHitCount();
186189
return futureCache.get(cacheKey);
187190
}
188191
}
189-
}
190192

191-
CompletableFuture<V> future = new CompletableFuture<>();
192-
if (loaderOptions.batchingEnabled()) {
193-
synchronized (loaderQueue) {
193+
CompletableFuture<V> future = new CompletableFuture<>();
194+
if (batchingEnabled) {
194195
loaderQueue.add(new SimpleImmutableEntry<>(key, future));
196+
} else {
197+
stats.incrementBatchLoadCountBy(1);
198+
// immediate execution of batch function
199+
CompletableFuture<List<V>> batchedLoad = batchLoadFunction
200+
.load(singletonList(key))
201+
.toCompletableFuture();
202+
future = batchedLoad
203+
.thenApply(list -> list.get(0));
195204
}
196-
} else {
197-
stats.incrementBatchLoadCountBy(1);
198-
// immediate execution of batch function
199-
CompletableFuture<List<V>> batchedLoad = batchLoadFunction
200-
.load(singletonList(key))
201-
.toCompletableFuture();
202-
future = batchedLoad
203-
.thenApply(list -> list.get(0));
204-
}
205-
if (loaderOptions.cachingEnabled()) {
206-
synchronized (futureCache) {
205+
if (cachingEnabled) {
207206
futureCache.set(cacheKey, future);
208207
}
208+
return future;
209209
}
210-
return future;
211210
}
212211

213212
/**
@@ -223,8 +222,7 @@ public CompletableFuture<V> load(K key) {
223222
* @return the composite future of the list of values
224223
*/
225224
public CompletableFuture<List<V>> loadMany(List<K> keys) {
226-
synchronized (loaderQueue) {
227-
225+
synchronized (this) {
228226
List<CompletableFuture<V>> collect = keys.stream()
229227
.map(this::load)
230228
.collect(Collectors.toList());
@@ -241,18 +239,19 @@ public CompletableFuture<List<V>> loadMany(List<K> keys) {
241239
* @return the promise of the queued load requests
242240
*/
243241
public CompletableFuture<List<V>> dispatch() {
242+
boolean batchingEnabled = loaderOptions.batchingEnabled();
244243
//
245244
// we copy the pre-loaded set of futures ready for dispatch
246245
final List<K> keys = new ArrayList<>();
247246
final List<CompletableFuture<V>> queuedFutures = new ArrayList<>();
248-
synchronized (loaderQueue) {
247+
synchronized (this) {
249248
loaderQueue.forEach(entry -> {
250249
keys.add(entry.getKey());
251250
queuedFutures.add(entry.getValue());
252251
});
253252
loaderQueue.clear();
254253
}
255-
if (!loaderOptions.batchingEnabled() || keys.size() == 0) {
254+
if (!batchingEnabled || keys.size() == 0) {
256255
return CompletableFuture.completedFuture(emptyList());
257256
}
258257
//
@@ -375,7 +374,7 @@ public List<V> dispatchAndJoin() {
375374
* @return the depth of the batched key loads that need to be dispatched
376375
*/
377376
public int dispatchDepth() {
378-
synchronized (loaderQueue) {
377+
synchronized (this) {
379378
return loaderQueue.size();
380379
}
381380
}
@@ -391,7 +390,7 @@ public int dispatchDepth() {
391390
*/
392391
public DataLoader<K, V> clear(K key) {
393392
Object cacheKey = getCacheKey(key);
394-
synchronized (futureCache) {
393+
synchronized (this) {
395394
futureCache.delete(cacheKey);
396395
}
397396
return this;
@@ -403,7 +402,7 @@ public DataLoader<K, V> clear(K key) {
403402
* @return the data loader for fluent coding
404403
*/
405404
public DataLoader<K, V> clearAll() {
406-
synchronized (futureCache) {
405+
synchronized (this) {
407406
futureCache.clear();
408407
}
409408
return this;
@@ -419,7 +418,7 @@ public DataLoader<K, V> clearAll() {
419418
*/
420419
public DataLoader<K, V> prime(K key, V value) {
421420
Object cacheKey = getCacheKey(key);
422-
synchronized (futureCache) {
421+
synchronized (this) {
423422
if (!futureCache.containsKey(cacheKey)) {
424423
futureCache.set(cacheKey, CompletableFuture.completedFuture(value));
425424
}

src/test/java/org/dataloader/DataLoaderTest.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -575,8 +575,9 @@ public void should_Disable_caching() throws ExecutionException, InterruptedExcep
575575
assertThat(loadCalls, equalTo(asList(asList("A", "B"),
576576
asList("A", "C"), asList("A", "B", "C"))));
577577
}
578+
578579
@Test
579-
public void should_work_with_duplicate_keys() throws ExecutionException, InterruptedException {
580+
public void should_work_with_duplicate_keys_when_caching_disabled() throws ExecutionException, InterruptedException {
580581
List<Collection<String>> loadCalls = new ArrayList<>();
581582
DataLoader<String, String> identityLoader =
582583
idLoader(newOptions().setCachingEnabled(false), loadCalls);
@@ -593,6 +594,24 @@ public void should_work_with_duplicate_keys() throws ExecutionException, Interru
593594
assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A"))));
594595
}
595596

597+
@Test
598+
public void should_work_with_duplicate_keys_when_caching_enabled() throws ExecutionException, InterruptedException {
599+
List<Collection<String>> loadCalls = new ArrayList<>();
600+
DataLoader<String, String> identityLoader =
601+
idLoader(newOptions().setCachingEnabled(true), loadCalls);
602+
603+
CompletableFuture<String> future1 = identityLoader.load("A");
604+
CompletableFuture<String> future2 = identityLoader.load("B");
605+
CompletableFuture<String> future3 = identityLoader.load("A");
606+
identityLoader.dispatch();
607+
608+
await().until(() -> future1.isDone() && future2.isDone() && future3.isDone());
609+
assertThat(future1.get(), equalTo("A"));
610+
assertThat(future2.get(), equalTo("B"));
611+
assertThat(future3.get(), equalTo("A"));
612+
assertThat(loadCalls, equalTo(singletonList(asList("A", "B"))));
613+
}
614+
596615
// It is resilient to job queue ordering
597616

598617
@Test

0 commit comments

Comments
 (0)