
POC on DataLoader automatic dispatch capability #46


Closed
wants to merge 28 commits

Conversation


@gkesler gkesler commented Jun 28, 2019

Following our twitter conversation with @andimarek I have created a POC demonstrating an ability to trigger DataLoader dispatch automatically when the requestor (client) thread blocks on the DataLoader promises.

In the current state, DataLoader fulfills load promises when the DataLoader.dispatch() method is explicitly triggered. This, to a great extent, couples the client code and the GraphQL engine. In order to trigger DataLoader.dispatch() outside a client DataFetcher, a client needs to configure a DataLoader-aware Instrumentation, either supplied by the GraphQL framework or provided by the client, which is not a trivial exercise.

The main idea of the automatic dispatch mechanism is to asynchronously check when the client thread enters a waiting (actually, any not-RUNNABLE) state and then launch the DataLoader dispatch process. Upon completion, the dispatch task is either destroyed or returned to the thread pool it was taken from. To minimize resource contention, the dispatch thread is not running permanently. The client can supply their own Executor to fully control thread usage.
If an Executor is not configured explicitly, the common ForkJoinPool is used. It is always available, lightweight, and can handle a large number of tasks without resource contention. The CompletableFuture xxxAsync methods use the common pool by default to complete asynchronously.
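For comparison, the JDK's CompletableFuture follows the same convention: its xxxAsync methods run on the common ForkJoinPool unless an explicit Executor is passed, which is how a client would take full control of thread usage (a minimal JDK-only sketch, not PR code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorDemo {
    public static void main(String[] args) {
        // With no Executor argument, supplyAsync runs on ForkJoinPool.commonPool();
        // passing one routes the work onto threads the client controls.
        ExecutorService custom = Executors.newFixedThreadPool(2);
        CompletableFuture<String> f = CompletableFuture.supplyAsync(
                () -> Thread.currentThread().getName(), custom);
        System.out.println(f.join()); // a pool thread name, e.g. "pool-1-thread-1"
        custom.shutdown();
    }
}
```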

The main usage pattern is:

import org.dataloader.nextgen.DataLoader;

...
BatchLoader<K, V> batchLoader = ...
DataLoader<K, V> dataLoader = new AutoDataLoader<>(batchLoader);
CompletableFuture<V> f1 = dataLoader.load(key1);
CompletableFuture<V> f2 = dataLoader.load(key2);

// note, no dispatch
V v1 = f1.get();
V v2 = f2.get();

This is an attempt to match NodeJS DataLoader capabilities.

Here is a sequence diagram illustrating automatic dispatch.
[sequence diagram image]

@bbakerman WDYT?

@gkesler
Author

gkesler commented Jun 28, 2019

@andimarek @bbakerman how can I view the test results to see what is failing?

gkesler added 6 commits June 27, 2019 19:31
…T [timed] waiting

- added yielding while waiting to force task switch
- simplified access to Dispatcher.dataLoaders map
…ng with reading DataLoader.dispatchDepth. I.e. added proper synchronization when accessing AutoDataLoader member variables
@kaqqao

kaqqao commented Jun 30, 2019

I neither have the full context nor yet fully understand the PR code, so pardon my uninformed interjection, but this sentence caught my attention:

trigger DataLoader dispatch automatically when the requestor (client) thread blocks on the DataLoader promises

If the main idea is to dispatch once a thread is waiting on the promise, wouldn't it (at least theoretically) be enough to wrap each such promise with one that dispatches when its blocking operations (e.g. get) are called?

In extremely simplified terms, do something like this in the DataLoader:

public CompletableFuture<V> load(K key) {
    CompletableFuture<V> original = ...; // whatever it does now
    return new CompletableFuture<V>() {
        @Override
        public V get() throws InterruptedException, ExecutionException {
            DataLoader.this.dispatch(); // dispatch when something blocks on this future
            return original.get();
        }
    };
}

@gkesler
Author

gkesler commented Jun 30, 2019

@kaqqao the problem is that not every call to DataLoader.load should trigger a dispatch. That would lead to calling the load mechanism for every key. This PR attempts to demonstrate (not to solve yet!) the ability to allow individual .load or .loadMany calls to be queued in the DataLoader - as it does today - and then, when the caller starts waiting, to batch all pending load requests and execute only one batched load request. That is the essence of solving the N+1 problem without tight coupling between graphql execution and the client/application code.

This is what is natively built into the JS dataloader implementation, where load calls are batched and executed only after the main thread stops execution, awaiting the promises to resolve. This happens at the end of the JS event loop, when JS executes callbacks; that triggers the data loaders to batch their pending load requests into single batched ones.

Additionally, waiting for promise completion in AsyncExecutionStrategy is done via a CompletableFuture.allOf call that invokes neither CompletableFuture.get nor CompletableFuture.join, so overriding CompletableFuture.get won't help in this case.
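This is easy to confirm with a small JDK-only check (not PR code): allOf wires up internal completions rather than calling get() on its inputs, so an overridden get() is never invoked:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class AllOfNoGetDemo {
    public static void main(String[] args) {
        AtomicBoolean getCalled = new AtomicBoolean(false);
        CompletableFuture<String> wrapped = new CompletableFuture<String>() {
            @Override
            public String get() throws InterruptedException, ExecutionException {
                getCalled.set(true); // would be the dispatch hook in the wrapper scheme
                return super.get();
            }
        };
        CompletableFuture<Void> all = CompletableFuture.allOf(wrapped);
        wrapped.complete("x");
        all.join();                          // allOf completed normally...
        System.out.println(getCalled.get()); // ...but get() was never called: false
    }
}
```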

@andimarek
Member

thanks a lot @gkesler ... give us some time please to review it ... thanks!

@bbakerman
Member

I had a brief look at this (on the train home) but not in enough detail.

I can however tell you my concerns - let's think of them as design criteria.

The #1 thing that MUST not happen with this change is that callers of dataLoader.load(key) HANG.

That is, the completable future they get never completes because the auto dispatching did not run for some reason.

We had bugs under certain conditions in the past, even with the simpler dispatching, where we didn't count the graphql fields correctly and it was possible to lock a request forever.

Over-dispatching is preferable to under-dispatching.

Therefore, how can we test that complex and multi-threaded conditions won't HANG?

The other question I have is: how will this approach go in a reactive code base, that is, things like RxJava or Reactor? While they do use executors to pump messages - is this a fit?

As I said, I have not had more than 10 mins looking at this so far, so I thought I would put up some "criteria" before I look in more detail.

As for why the tests are failing - perhaps it's a permission problem in Travis.

@bbakerman
Member

Can you not see https://travis-ci.org/graphql-java/java-dataloader/builds/552002096?utm_source=github_status&utm_medium=notification

:processTestResources NO-SOURCE
:testClasses
:test
org.dataloader.nextgen.AutoDataLoaderTest > should_Batch_loads_occurring_within_futures FAILED
    java.lang.AssertionError at AutoDataLoaderTest.java:952
123 tests completed, 1 failed
:test FAILED


dataLoaders.put(dataLoader, Thread.currentThread());
return this;
}
Member

I suspect this won't work.

You do registration (aka association of an AutoDataLoader instance to a thread) at the construction of the AutoDataLoader. But in an execution runtime in graphql, say, that may happen on a different thread.

Are you saying it's a hard contract that an AutoDataLoader must be used on the thread it was created on in order to track its dispatched-ness?

Member

I guess we could make that a requirement - but you would want to check it on each call to load, such that if currentThread() is not the registered one then you throw an IllegalStateException, say.

Member

Thinking more on this with Andreas M - I am not sure this can work.

That is, in graphql-java we invoke fields on whatever thread completed the value:

 // it's more complicated than this but I am trying to show it in pseudo code
 CF result = dataFetcher.get(env)
 result.thenApply( val -> lowerLevelCode(val))

So graphql-java MAY jump all over a set of threads. So really, how do we decide that a thread for an autoDataLoader.load(key) call is in WAIT? How can we know which Thread is the one we should watch for?

The current registration on creation of the AutoDataLoader will not be enough.

Author

@gkesler gkesler Jul 8, 2019

Of course a DataLoader needs an association with a Thread or Threads that need the promises to complete. This is just a POC. If this is a requirement, Dispatcher can be modified to have a 1:N DataLoader->Thread association, and registration can be deferred to, for instance, the load call.

PS. Or, as you've mentioned in the next comment, we can verify the calling thread.
So far, I haven't seen examples using other threads, but maybe that could be changed.
BTW, I have tested AutoDataLoader with the existing DataLoader examples in graphql-java - it worked without problems.

Author

The current field resolution strategy is (pseudocode)

List<CF> futures = new ArrayList<>();

foreach field in selection {
   // this calls DataFetcher to load data
   CF future = resolveField(field, env);
   futures.add(future);
}

// this causes the current thread to wait for the futures to complete
Async.each(futures);

The current graphql execution strategy is effectively single threaded (on a request thread) and after each round of field resolutions it waits for the fields to be resolved in order to move to the next set of fields. Of course we cannot control the client code, but the contract is that the client code generates promises to provide data. Graphql execution waits for that data and uses it as source(s) to resolve next set of fields. The main purpose of DataLoader is to decouple the engine from how the data is provided. The engine does not even need to know that DataLoader is used - that is totally up to the client.

In your example CF.get() causes the graphql execution thread to WAIT for the CF to complete similarly to my example above.
The goal of AutoDataLoader is to detect when execution thread enters WAIT state and then execute dispatch to fulfill the promises.
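In sketch form, an Async.each-style wait can be approximated with allOf (a simplified stand-in for illustration, not graphql-java's actual implementation):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class EachSketch {
    // Simplified stand-in for an Async.each-style helper:
    // completes once every input future has completed.
    static <T> CompletableFuture<List<T>> each(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // join() cannot block here: allOf guarantees every future is done
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> fs = List.of(
                CompletableFuture.completedFuture(1),
                CompletableFuture.completedFuture(2));
        System.out.println(each(fs).join()); // [1, 2]
    }
}
```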

Member

The current graphql execution strategy is effectively single threaded (on a request thread)

yes, as implemented right now the returned CFs are aligned with the request thread. However, our next gen engine is toying with the idea that we send off N CFs for each node and weave them together as they resolve, not coordinated as they are right now.

So I might be getting ahead of ourselves here, but I think the contract that the thread that created the AutoDL MUST be the thread that is tracked for WAIT state (for dispatching) is too strict.

Imagine some Project Reactor code that set up data loaders in thread A but ended up running the query in thread B. You only have two threads in action, but already you have broken the contract and you WILL hang (since no one will call dispatch()).

Member

Dispatcher can be modified to have 1:N DataLoader->Thread association and registration can be either deferred to for instance load call.

yes, I think we would need this. I think a contract of "any thread that calls dl.load() is tracked" would work.

return true;
}

return false;
Member

how does java.lang.Thread.State#BLOCKED apply here? is that not a waiting state?

Author

BLOCKED
A thread that is blocked waiting for a monitor lock is in this state.

As far as I understand, a Thread enters the BLOCKED state when it tries to acquire a lock entering a critical section (like a synchronized block or Lock.lock()). This state is different from the WAITING or TIMED_WAITING states, in which a thread waits to be notified. When a thread waits for a CF to complete, it has one of those states, not BLOCKED.
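A quick JDK-only check of this: a thread parked on CompletableFuture.get() reports WAITING (it would be TIMED_WAITING for get(timeout)), not BLOCKED:

```java
import java.util.concurrent.CompletableFuture;

public class WaitStateDemo {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> cf = new CompletableFuture<>();
        Thread waiter = new Thread(() -> {
            try { cf.get(); } catch (Exception ignored) { }
        });
        waiter.start();
        Thread.sleep(500); // give the waiter time to park inside get()
        System.out.println(waiter.getState()); // WAITING - parked, not BLOCKED
        cf.complete("done");
        waiter.join();
    }
}
```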

@bbakerman
Member

One other thing I am interested in (that is a unsolved problem we have today in DataLoader) is how to chain data loader calls

Today people want to do this

fetch for field X

   CF promiseA = dataLoaderA.load(key)
   // no dispatch pump here
   CF promiseB  = promiseA.thenApply( a -> dataLoaderB.load(a.key2B)
   return promiseB

This does not work in graphql-java today because, while the promise is returned as expected, no one is pumping the dispatch on dataLoaderB - field tracking says that dispatch has been called for field X and there is no need to dispatchAll again.

While this is not the end of the world, it would be nice. People could just chain futures, but they want batch loading and caching of the values.

So perhaps a AutoDataLoader could help here?

@gkesler
Author

gkesler commented Jul 8, 2019

@bbakerman thanks for your requirements.

I absolutely agree that the callers shall not HANG waiting forever for the data promised via DataLoader.load. That was also my #1 reason not to have a dispatch thread blocked on some sort of semaphore waiting for load requests to arrive. Instead, a load request causes a short-lived thread to execute, which dies (usually, gets cycled back to the pool) after that load request, and possibly inferred ones, has been dispatched. A load request that wasn't dispatched by the previous thread would be dispatched by the next one if necessary, so from this perspective this POC already favors over-dispatching to make sure the clients don't HANG.

I would like to know more about the use of reactive executors (DataFetchers?).
It is my understanding that with the current graphql-java architecture/API, reactive data fetching won't make any difference. DataLoader is NOT reactive, and even if the client somehow wraps it into a Publisher or Observable, the graphql-java contract expects a CompletableFuture that in this case must be completed by the client, not by the framework itself, so most likely AutoDataLoader is not even a concern here and the client can use the current DataLoader.

In any case, AutoDataLoader needs an Executor that has at least 1 spare thread in order to asynchronously dispatch the promises (should AutoDataLoader be renamed to AsyncDataLoader?). The recommended source of threads would be the common ForkJoinPool, as it is always there and the ForkJoinTasks that run in that pool are much lighter weight compared to standard Java Threads. Same-thread executors like NonBlockingMutexExecutor will not HANG the client, but they won't do a good job of batching load calls, as every single load call will be dispatched separately.
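For illustration (plain JDK, not the PR's NonBlockingMutexExecutor), the simplest same-thread Executor is one that runs tasks on the caller's thread - any dispatch scheduled through it would run inline, one load at a time, defeating batching:

```java
import java.util.concurrent.Executor;

public class CallerRunsDemo {
    public static void main(String[] args) {
        // Simplest possible same-thread Executor: a method reference to Runnable.run,
        // so execute() runs the task synchronously on the caller's thread.
        Executor sameThread = Runnable::run;
        sameThread.execute(() ->
                System.out.println(Thread.currentThread().getName())); // main
    }
}
```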

@gkesler
Author

gkesler commented Jul 8, 2019

@bbakerman
RE: chaining dataloader calls

AutoDataLoader solves that natively.
In your example

CF promiseA = dataLoaderA.load(key)
   // no dispatch pump here
   CF promiseB  = promiseA.thenApply( a -> dataLoaderB.load(a.key2B)
   return promiseB

the execution/calling thread is waiting either on both promiseA and promiseB to complete or just on promiseB. Right?

First call to dataLoaderA.load will cause associated Dispatcher to run dataLoaderA.dispatch. During that call promiseA completes and immediately after (on the same thread actually) the next load call dataLoaderB.load will take place. The second load will be queued by the Dispatcher and it will cause dataLoaderB.dispatch to execute right after the first one finishes.

@bbakerman
Member

I would like to know better about use of reactive executors (DataFetchers?).
It is my understanding that with current graphql-java architecture/API reactive data fetching won't make any difference.

No, we have no native reactive DataFetchers etc. - we only use JDK CompletableFutures.

However, IF the surrounding application code is reactive (e.g. imagine a graphql-java engine inside a WebFlux app engine) then what you see is code completing on many different threads. Those threads are managed by the Project Reactor thread pools.

Reactor, for example, has a nice Mono.toFuture that a reactive app can use to interop with CF-based code bases like graphql-java.

The short of it is that the execution phase can run on different threads, and there are almost always different threads supplying values to the DataFetcher - even if they are today synced back to the graphql-java execution thread via Async.each.

@gkesler
Author

gkesler commented Aug 12, 2019

Thanks for the explanation @bbakerman

DataFetchers can supply their values on multiple threads if they need to.
Execution control runs on a single thread; making it run on different threads would require significant investment. Do you know if there are any plans to run execution on multiple threads?

When execution needs to resolve the current set of fields, it waits via Async.each, which is effectively a CompletableFuture.allOf, and upon resolving all CFs it recursively manages to resolve the next set of values [on the same thread], which will again wait on an Async.each call, and so on.
So the idea of automatic dispatch is based on the fact that the current graphql execution thread waits on unresolved CFs, and that triggers a dispatcher thread that monitors the state of the execution thread in order to trigger the DataLoader.dispatch method on the DataLoaders that have promised to load values via their DataLoader.load calls.
Please look at the tests: all existing DataLoader tests - even multithreaded ones - are passing now with AutoDataLoader, except AutoDataLoaderTest.should_Batch_loads_occurring_within_futures, which fails on the batching expectations, i.e. it over-fetches due to its completely async structure, where the execution thread falls into the wait before the supplier threads request data from the DataLoader. But other than that, there is no difference in the supplied values, i.e. there is no under-fetch. I wonder, though, how common is this scenario in the field?

@TGNThump

TGNThump commented Nov 11, 2019

Any updates on what’s left to do for this PR to be merged?

@sheepdreamofandroids

sheepdreamofandroids commented Nov 12, 2019

Just thinking out loud here:
The AutoDataLoader needs some kind of trigger to know when to dispatch. In this POC that trigger is a particular thread blocking on a future. Most of the discussion seems to be that this makes assumptions about the user of the AutoDataLoader. Theoretically, graphql-java could be completely async, so that no thread ever waits.
We could remedy that problem by introducing additional triggers, since additional calls to dispatch are cheap.

  • when the batch size has been reached (that one probably already exists).
  • when no keys have been added to a dataloader for some (configurable) time.

This will make it impossible to ever hang waiting for the dataloader.

@zmchenry

zmchenry commented Mar 4, 2020

Any updates on this PR?

@bbakerman
Member

As the code stands today we are not in a good position to accept this code.

However, we are considering changes to the library which will allow more custom implementations:

https://github.com/graphql-java/java-dataloader/wiki/Thoughts-on-a-3.x-breaking-change

Once we have that we could have other implementations of DataLoader such as this one.

The fundamental problem (as outlined above by @sheepdreamofandroids) is: when, in a JVM, is the right time to dispatch? This is in some ways an unsolved generalised problem.

@kaqqao

kaqqao commented Mar 5, 2020

@bbakerman I'm just waxing philosophical here... Would going full reactive solve this? Since the reactive types are designed mostly for pull-based use, a dispatch could maybe be triggered on subscription only...

@bbakerman
Member

a dispatch could maybe be triggered on subscription only...

Kinda, but no, I don't think it's enough. In a reactive setting you get one subscription event, say.

However, with the DataLoader pattern you can call it multiple times per request, e.g. use it to get all the users, then use it to get all their friends, and then use it to get all the friends' friends.

Now perhaps you meant that a DataFetcher is a reactive stream and hence it's got a subscription event per field fetch. Well, maybe, BUT the idea behind DataLoader is that it can reach across multiple fields and "cache/batch" calls for the same objects and only "dispatch" them at an optimal time.

Finding that optimal time is the problem. In node.js they have nextTick, which works well from a generic algo point of view. However, we have tested this and found that it's not as optimal as the graphql-java field tracking approach. But it's pretty good and much simpler, so pretty good is good enough.

Some of the reactive systems have event loop style "ticks" and hence that could be used for the idea that the system has gone quiet and hence can dispatch. I think Vertx has such a thing. I have looked in Reactor and have not found it. Would love for Reactor experts to tell me otherwise.

@sheepdreamofandroids

Even "ticks" can only be optimal for some use cases. There is always a tradeoff between fetching early and fetching a large batch. Which one is best really depends on the use case.

There are many ways of dealing with this, for example the graphql client could send a header with a preference for efficiency vs low latency. Maybe even using some machine learning algorithm that finds the optimal strategy for each datafetcher.

Making DataLoader an interface would be a great idea given that we clearly don't have the one implementation to rule them all ;-)

Another possibility would be to have a pluggable AutoDispatchStrategy (also an interface) for dealing with automatic dispatch. Default implementations could be NoAutoDispatch, TimedAutoDispatch and BlockedPromiseAutoDispatch.

@sheepdreamofandroids

Just as an update: I implemented a poor man's version of this by extending DataLoaderRegistry (in Kotlin):

object : DataLoaderRegistry() {
        override fun dispatchAll() {
            val dispatchedFutures: List<CompletableFuture<out List<*>>> = dataLoaders.map { it.dispatch() }
            CompletableFuture.allOf(* dispatchedFutures.toTypedArray())
                .handleAsync { _, _ ->
                    val noNewDataIsReceived = dispatchedFutures.all {
                        it.isCompletedExceptionally
                                || it.isCancelled
                                || it.get().isNullOrEmpty() // it definitely completed because of allOf() above
                    }
                    if (!noNewDataIsReceived) dispatchAll()
                }
        }
    }

So after a dispatchAll I wait for all futures to be completed, and if any of them produced data, a new dispatch might be necessary.
This will fail when any future hangs. And many independent calls to dispatchAll() will lead to many redundant additional calls to dispatchAll(). But then, it is a poor man's version...

I'm very open to suggestions on how to improve this. For example, how to "debounce" the calls to the actual dataloaders to avoid calling them too often.
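One possible debounce, sketched with plain JDK scheduling (ScheduledDispatcher here is hypothetical, not part of java-dataloader): reset a single scheduled dispatch every time a key arrives, so the dispatch fires only after a quiet period with no new keys:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DebounceDemo {
    // Hypothetical helper: runs `dispatch` only after `quietMillis` with no new keys.
    static class ScheduledDispatcher {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        private final long quietMillis;
        private final Runnable dispatch;
        private ScheduledFuture<?> pending;

        ScheduledDispatcher(long quietMillis, Runnable dispatch) {
            this.quietMillis = quietMillis;
            this.dispatch = dispatch;
        }

        synchronized void keyAdded() {
            if (pending != null) pending.cancel(false); // reset the quiet-period timer
            pending = scheduler.schedule(dispatch, quietMillis, TimeUnit.MILLISECONDS);
        }

        void shutdown() { scheduler.shutdown(); }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger dispatches = new AtomicInteger();
        ScheduledDispatcher d = new ScheduledDispatcher(100, dispatches::incrementAndGet);
        for (int i = 0; i < 5; i++) { d.keyAdded(); Thread.sleep(10); } // a burst of keys
        Thread.sleep(300); // quiet period elapses once
        System.out.println(dispatches.get()); // 1: the burst collapsed into one dispatch
        d.shutdown();
    }
}
```

This trades latency (up to one quiet period per dispatch) for batch size, which is exactly the tradeoff discussed above.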

@bbakerman
Member

Closing as out of date - interesting idea of course - but it was always a POC

@bbakerman bbakerman closed this Feb 15, 2022
7 participants