Skip to content

Commit 7cbef3a

Browse files
committed
New design for BlockingIdentifierGenerator
1 parent 0c1c026 commit 7cbef3a

File tree

2 files changed

+122
-53
lines changed

2 files changed

+122
-53
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java

Lines changed: 121 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@
55
*/
66
package org.hibernate.reactive.id.impl;
77

8+
import io.vertx.core.Context;
9+
import io.vertx.core.Vertx;
10+
import io.vertx.core.net.impl.pool.CombinerExecutor;
11+
import io.vertx.core.net.impl.pool.Executor;
12+
import io.vertx.core.net.impl.pool.Task;
813
import org.hibernate.reactive.id.ReactiveIdentifierGenerator;
914
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
15+
import org.hibernate.reactive.util.impl.CompletionStages;
1016

11-
import java.util.ArrayList;
12-
import java.util.List;
17+
import java.util.Objects;
1318
import java.util.concurrent.CompletableFuture;
1419
import java.util.concurrent.CompletionStage;
1520

@@ -18,84 +23,149 @@
1823
/**
1924
* A {@link ReactiveIdentifierGenerator} which uses the database to allocate
2025
* blocks of ids. A block is identified by its "hi" value (the first id in
21-
* the block). While a new block is being allocated, concurrent streams wait
22-
* without blocking.
26+
* the block). While a new block is being allocated, concurrent streams will
27+
* defer the operation without blocking.
2328
*
2429
* @author Gavin King
30+
* @author Davide D'Alto
31+
* @author Sanne Grinovero
32+
*
2533
*/
2634
public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierGenerator<Long> {
2735

2836
/**
2937
* The block size (the number of "lo" values for each "hi" value)
3038
*/
31-
3239
protected abstract int getBlockSize();
3340

41+
private final GeneratorState state = new GeneratorState();
42+
43+
//Access to the critical section is to be performed exclusively
44+
//via an Action passed to this executor, to ensure exclusive
45+
//modification access.
46+
//This replaces the synchronization blocks one would see in a similar
47+
//service in Hibernate ORM, but using a non-blocking cooperative design.
48+
private final CombinerExecutor executor = new CombinerExecutor( state );
49+
3450
/**
3551
* Allocate a new block, by obtaining the next "hi" value from the database
3652
*/
3753
protected abstract CompletionStage<Long> nextHiValue(ReactiveConnectionSupplier session);
3854

39-
private int loValue;
40-
private long hiValue;
41-
42-
private volatile List<Runnable> queue = null;
55+
//Not strictly necessary to put these fields into a dedicated class, but it help
56+
//to reason about what the current state is and what the CombinerExecutor is
57+
//supposed to work on.
58+
private static class GeneratorState {
59+
private int loValue;
60+
private long hiValue;
61+
}
4362

44-
protected synchronized long next() {
45-
return loValue > 0 && loValue < getBlockSize()
46-
? hiValue + loValue++
63+
//Critical section: needs to be accessed exclusively via the CombinerExecutor
64+
//when there's contention; direct invocation is allowed in the fast path.
65+
private synchronized long next() {
66+
return state.loValue > 0 && state.loValue < getBlockSize()
67+
? state.hiValue + state.loValue++
4768
: -1; //flag value indicating that we need to hit db
4869
}
4970

50-
protected synchronized long next(long hi) {
51-
hiValue = hi;
52-
loValue = 1;
71+
//Critical section: needs to be accessed exclusively via the CombinerExecutor
72+
private synchronized long next(long hi) {
73+
state.hiValue = hi;
74+
state.loValue = 1;
5375
return hi;
5476
}
5577

5678
@Override
57-
public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object entity) {
58-
if ( getBlockSize() <= 1 ) {
59-
//special case where we're not using blocking at all
60-
return nextHiValue( session );
79+
public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSupplier, Object ignored) {
80+
Objects.requireNonNull( connectionSupplier );
81+
82+
//Before submitting a task to the executor, let's try our luck via the fast-path
83+
//(this does actually hit a synchronization, but it's extremely short)
84+
final long next = next();
85+
if ( next != -1 ) {
86+
return CompletionStages.completedFuture( next );
6187
}
6288

63-
long local = next();
64-
if ( local >= 0 ) {
65-
// We don't need to update or initialize the hi
66-
// value in the table, so just increment the lo
67-
// value and return the next id in the block
68-
return completedFuture( local );
89+
//Another special case we need to deal with; this is an unlikely configuration, but
90+
//if it were to happen we should be better off with direct execution rather than using
91+
//the co-operative executor:
92+
if ( getBlockSize() <= 1 ) {
93+
return nextHiValue( connectionSupplier )
94+
.thenApply( i -> next( i ) );
6995
}
70-
else {
71-
synchronized (this) {
72-
CompletableFuture<Long> result = new CompletableFuture<>();
73-
if ( queue == null ) {
74-
// make a queue for any concurrent streams
75-
queue = new ArrayList<>();
76-
// go off and fetch the next hi value from db
77-
nextHiValue( session ).thenAccept( id -> {
78-
// Vertx.currentContext().runOnContext(v -> {
79-
List<Runnable> list;
80-
synchronized (this) {
81-
// clone ref to the queue
82-
list = queue;
83-
queue = null;
84-
// use the fetched hi value in this stream
85-
result.complete( next( id ) );
86-
}
87-
// send waiting streams back to try again
88-
list.forEach( Runnable::run );
89-
// } );
90-
} );
96+
97+
final CompletableFuture<Long> resultForThisEventLoop = new CompletableFuture<>();
98+
final CompletableFuture<Long> result = new CompletableFuture<>();
99+
executor.submit( new GenerateIdAction( connectionSupplier, result ) );
100+
final Context context = Vertx.currentContext();
101+
result.whenComplete( (id,t) -> {
102+
final Context newContext = Vertx.currentContext();
103+
//Need to be careful in resuming processing on the same context as the original
104+
//request, potentially having to switch back if we're no longer executing on the same:
105+
if ( newContext != context ) {
106+
if ( t != null ) {
107+
context.runOnContext( ( v ) -> resultForThisEventLoop.completeExceptionally( t ) );
108+
} else {
109+
context.runOnContext( ( v ) -> resultForThisEventLoop.complete( id ) );
91110
}
92-
else {
93-
// wait for the concurrent fetch to complete
94-
// note that we carefully capture the right session,entity here!
95-
queue.add( () -> generate( session, entity ).thenAccept( result::complete ) );
111+
}
112+
else {
113+
if ( t != null ) {
114+
resultForThisEventLoop.completeExceptionally( t );
115+
} else {
116+
resultForThisEventLoop.complete( id );
96117
}
97-
return result;
118+
}
119+
});
120+
return resultForThisEventLoop;
121+
}
122+
123+
private final class GenerateIdAction implements Executor.Action<GeneratorState> {
124+
125+
private final ReactiveConnectionSupplier connectionSupplier;
126+
private final CompletableFuture<Long> result;
127+
128+
public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture<Long> result) {
129+
this.connectionSupplier = Objects.requireNonNull(connectionSupplier);
130+
this.result = Objects.requireNonNull(result);
131+
}
132+
133+
@Override
134+
public Task execute(GeneratorState state) {
135+
long local = next();
136+
if ( local >= 0 ) {
137+
// We don't need to update or initialize the hi
138+
// value in the table, so just increment the lo
139+
// value and return the next id in the block
140+
completedFuture( local )
141+
.whenComplete( this::acceptAsReturnValue );
142+
return null;
143+
} else {
144+
nextHiValue( connectionSupplier )
145+
.whenComplete( (newlyGeneratedHi, throwable) -> {
146+
if ( throwable != null ) {
147+
result.completeExceptionally( throwable );
148+
} else {
149+
//We ignore the state argument as we actually use the field directly
150+
//for convenience, but they are the same object.
151+
executor.submit( stateIgnored -> {
152+
result.complete( next( newlyGeneratedHi ) );
153+
return null;
154+
});
155+
}
156+
} );
157+
return null;
158+
}
159+
}
160+
161+
private void acceptAsReturnValue(final Long aLong, final Throwable throwable) {
162+
if ( throwable != null ) {
163+
result.completeExceptionally( throwable );
164+
}
165+
else {
166+
result.complete( aLong );
98167
}
99168
}
100169
}
170+
101171
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/ReactiveSequenceIdentifierGenerator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ protected int getBlockSize() {
5959
@Override
6060
protected CompletionStage<Long> nextHiValue(ReactiveConnectionSupplier session) {
6161
return session.getReactiveConnection()
62-
.selectIdentifier( sql, NO_PARAMS, Long.class )
63-
.thenApply( this::next );
62+
.selectIdentifier( sql, NO_PARAMS, Long.class );
6463
}
6564

6665
// First one to get called during initialization

0 commit comments

Comments
 (0)