|
16 | 16 | import java.util.concurrent.CompletableFuture;
|
17 | 17 | import java.util.concurrent.CompletionStage;
|
18 | 18 | import java.util.concurrent.atomic.AtomicReference;
|
| 19 | +import java.util.function.BiConsumer; |
19 | 20 | import java.util.stream.Collectors;
|
20 | 21 |
|
21 | 22 | import static java.util.Collections.emptyList;
|
@@ -61,17 +62,25 @@ Object getCallContext() {
|
61 | 62 | private final DataLoader<K, V> dataLoader;
|
62 | 63 | private final Object batchLoadFunction;
|
63 | 64 | private final DataLoaderOptions loaderOptions;
|
64 |
| - private final CacheMap<Object, CompletableFuture<V>> futureCache; |
| 65 | + private final CacheMap<Object, V> futureCache; |
| 66 | + private final ValueCache<Object, V> valueCache; |
65 | 67 | private final List<LoaderQueueEntry<K, CompletableFuture<V>>> loaderQueue;
|
66 | 68 | private final StatisticsCollector stats;
|
67 | 69 | private final Clock clock;
|
68 | 70 | private final AtomicReference<Instant> lastDispatchTime;
|
69 | 71 |
|
70 |
| - DataLoaderHelper(DataLoader<K, V> dataLoader, Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap<Object, CompletableFuture<V>> futureCache, StatisticsCollector stats, Clock clock) { |
| 72 | + DataLoaderHelper(DataLoader<K, V> dataLoader, |
| 73 | + Object batchLoadFunction, |
| 74 | + DataLoaderOptions loaderOptions, |
| 75 | + CacheMap<Object, V> futureCache, |
| 76 | + ValueCache<Object, V> valueCache, |
| 77 | + StatisticsCollector stats, |
| 78 | + Clock clock) { |
71 | 79 | this.dataLoader = dataLoader;
|
72 | 80 | this.batchLoadFunction = batchLoadFunction;
|
73 | 81 | this.loaderOptions = loaderOptions;
|
74 | 82 | this.futureCache = futureCache;
|
| 83 | + this.valueCache = valueCache; |
75 | 84 | this.loaderQueue = new ArrayList<>();
|
76 | 85 | this.stats = stats;
|
77 | 86 | this.clock = clock;
|
@@ -120,35 +129,13 @@ CompletableFuture<V> load(K key, Object loadContext) {
|
120 | 129 | boolean batchingEnabled = loaderOptions.batchingEnabled();
|
121 | 130 | boolean cachingEnabled = loaderOptions.cachingEnabled();
|
122 | 131 |
|
123 |
| - Object cacheKey = null; |
124 |
| - if (cachingEnabled) { |
125 |
| - if (loadContext == null) { |
126 |
| - cacheKey = getCacheKey(key); |
127 |
| - } else { |
128 |
| - cacheKey = getCacheKeyWithContext(key, loadContext); |
129 |
| - } |
130 |
| - } |
131 | 132 | stats.incrementLoadCount();
|
132 | 133 |
|
133 | 134 | if (cachingEnabled) {
|
134 |
| - if (futureCache.containsKey(cacheKey)) { |
135 |
| - stats.incrementCacheHitCount(); |
136 |
| - return futureCache.get(cacheKey); |
137 |
| - } |
138 |
| - } |
139 |
| - |
140 |
| - CompletableFuture<V> future = new CompletableFuture<>(); |
141 |
| - if (batchingEnabled) { |
142 |
| - loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext)); |
| 135 | + return loadFromCache(key, loadContext, batchingEnabled); |
143 | 136 | } else {
|
144 |
| - stats.incrementBatchLoadCountBy(1); |
145 |
| - // immediate execution of batch function |
146 |
| - future = invokeLoaderImmediately(key, loadContext); |
147 |
| - } |
148 |
| - if (cachingEnabled) { |
149 |
| - futureCache.set(cacheKey, future); |
| 137 | + return queueOrInvokeLoader(key, loadContext, batchingEnabled); |
150 | 138 | }
|
151 |
| - return future; |
152 | 139 | }
|
153 | 140 | }
|
154 | 141 |
|
@@ -296,6 +283,66 @@ private void possiblyClearCacheEntriesOnExceptions(List<K> keys) {
|
296 | 283 | }
|
297 | 284 | }
|
298 | 285 |
|
| 286 | + private CompletableFuture<V> loadFromCache(K key, Object loadContext, boolean batchingEnabled) { |
| 287 | + final Object cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext); |
| 288 | + |
| 289 | + if (futureCache.containsKey(cacheKey)) { |
| 290 | + // We already have a promise for this key, no need to check value cache or queue up load |
| 291 | + stats.incrementCacheHitCount(); |
| 292 | + return futureCache.get(cacheKey); |
| 293 | + } |
| 294 | + |
| 295 | + /* |
| 296 | + We haven't been asked for this key yet. We want to do one of two things: |
| 297 | +
|
| 298 | + 1. Check if our cache store has it. If so: |
| 299 | + a. Get the value from the cache store |
| 300 | + b. Add a recovery case so we queue the load if fetching from cache store fails |
| 301 | + c. Put that future in our futureCache to hit the early return next time |
| 302 | + d. Return the resilient future |
| 303 | + 2. If not in value cache: |
| 304 | + a. queue or invoke the load |
| 305 | + b. Add a success handler to store the result in the cache store |
| 306 | + c. Return the result |
| 307 | + */ |
| 308 | + final CompletableFuture<V> future = new CompletableFuture<>(); |
| 309 | + |
| 310 | + valueCache.get(cacheKey).whenComplete((cachedValue, getCallEx) -> { |
| 311 | + if (getCallEx == null) { |
| 312 | + future.complete(cachedValue); |
| 313 | + } else { |
| 314 | + queueOrInvokeLoader(key, loadContext, batchingEnabled) |
| 315 | + .whenComplete(setValueIntoCacheAndCompleteFuture(cacheKey, future)); |
| 316 | + } |
| 317 | + }); |
| 318 | + |
| 319 | + futureCache.set(cacheKey, future); |
| 320 | + |
| 321 | + return future; |
| 322 | + } |
| 323 | + |
| 324 | + private BiConsumer<V, Throwable> setValueIntoCacheAndCompleteFuture(Object cacheKey, CompletableFuture<V> future) { |
| 325 | + return (result, loadCallEx) -> { |
| 326 | + if (loadCallEx == null) { |
| 327 | + valueCache.set(cacheKey, result) |
| 328 | + .whenComplete((v, setCallExIgnored) -> future.complete(result)); |
| 329 | + } else { |
| 330 | + future.completeExceptionally(loadCallEx); |
| 331 | + } |
| 332 | + }; |
| 333 | + } |
| 334 | + |
| 335 | + private CompletableFuture<V> queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled) { |
| 336 | + if (batchingEnabled) { |
| 337 | + CompletableFuture<V> future = new CompletableFuture<>(); |
| 338 | + loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext)); |
| 339 | + return future; |
| 340 | + } else { |
| 341 | + stats.incrementBatchLoadCountBy(1); |
| 342 | + // immediate execution of batch function |
| 343 | + return invokeLoaderImmediately(key, loadContext); |
| 344 | + } |
| 345 | + } |
299 | 346 |
|
300 | 347 | CompletableFuture<V> invokeLoaderImmediately(K key, Object keyContext) {
|
301 | 348 | List<K> keys = singletonList(key);
|
|
0 commit comments