Skip to content

Identifier generator fix #1608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
*/
package org.hibernate.reactive.id.impl;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.net.impl.pool.CombinerExecutor;
import io.vertx.core.net.impl.pool.Executor;
import io.vertx.core.net.impl.pool.Task;
import org.hibernate.reactive.id.ReactiveIdentifierGenerator;
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
import org.hibernate.reactive.util.impl.CompletionStages;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

Expand All @@ -18,84 +23,149 @@
/**
* A {@link ReactiveIdentifierGenerator} which uses the database to allocate
* blocks of ids. A block is identified by its "hi" value (the first id in
* the block). While a new block is being allocated, concurrent streams wait
* without blocking.
* the block). While a new block is being allocated, concurrent streams will
* defer the operation without blocking.
*
* @author Gavin King
* @author Davide D'Alto
* @author Sanne Grinovero
*
*/
public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierGenerator<Long> {

/**
* The block size (the number of "lo" values for each "hi" value)
*/

protected abstract int getBlockSize();

private final GeneratorState state = new GeneratorState();

//Access to the critical section is to be performed exclusively
//via an Action passed to this executor, to ensure exclusive
//modification access.
//This replaces the synchronization blocks one would see in a similar
//service in Hibernate ORM, but using a non-blocking cooperative design.
private final CombinerExecutor executor = new CombinerExecutor( state );

/**
* Allocate a new block, by obtaining the next "hi" value from the database
*/
protected abstract CompletionStage<Long> nextHiValue(ReactiveConnectionSupplier session);

private int loValue;
private long hiValue;

private volatile List<Runnable> queue = null;
//Not strictly necessary to put these fields into a dedicated class, but it help
//to reason about what the current state is and what the CombinerExecutor is
//supposed to work on.
private static class GeneratorState {
private int loValue;
private long hiValue;
}

protected synchronized long next() {
return loValue > 0 && loValue < getBlockSize()
? hiValue + loValue++
//Critical section: needs to be accessed exclusively via the CombinerExecutor
//when there's contention; direct invocation is allowed in the fast path.
private synchronized long next() {
return state.loValue > 0 && state.loValue < getBlockSize()
? state.hiValue + state.loValue++
: -1; //flag value indicating that we need to hit db
}

protected synchronized long next(long hi) {
hiValue = hi;
loValue = 1;
//Critical section: needs to be accessed exclusively via the CombinerExecutor
private synchronized long next(long hi) {
state.hiValue = hi;
state.loValue = 1;
return hi;
}

@Override
public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object entity) {
if ( getBlockSize() <= 1 ) {
//special case where we're not using blocking at all
return nextHiValue( session );
public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSupplier, Object ignored) {
Objects.requireNonNull( connectionSupplier );

//Before submitting a task to the executor, let's try our luck via the fast-path
//(this does actually hit a synchronization, but it's extremely short)
final long next = next();
if ( next != -1 ) {
return CompletionStages.completedFuture( next );
}

long local = next();
if ( local >= 0 ) {
// We don't need to update or initialize the hi
// value in the table, so just increment the lo
// value and return the next id in the block
return completedFuture( local );
//Another special case we need to deal with; this is an unlikely configuration, but
//if it were to happen we should be better off with direct execution rather than using
//the co-operative executor:
if ( getBlockSize() <= 1 ) {
return nextHiValue( connectionSupplier )
.thenApply( i -> next( i ) );
}
else {
synchronized (this) {
CompletableFuture<Long> result = new CompletableFuture<>();
if ( queue == null ) {
// make a queue for any concurrent streams
queue = new ArrayList<>();
// go off and fetch the next hi value from db
nextHiValue( session ).thenAccept( id -> {
// Vertx.currentContext().runOnContext(v -> {
List<Runnable> list;
synchronized (this) {
// clone ref to the queue
list = queue;
queue = null;
// use the fetched hi value in this stream
result.complete( next( id ) );
}
// send waiting streams back to try again
list.forEach( Runnable::run );
// } );
} );

final CompletableFuture<Long> resultForThisEventLoop = new CompletableFuture<>();
final CompletableFuture<Long> result = new CompletableFuture<>();
executor.submit( new GenerateIdAction( connectionSupplier, result ) );
final Context context = Vertx.currentContext();
result.whenComplete( (id,t) -> {
final Context newContext = Vertx.currentContext();
//Need to be careful in resuming processing on the same context as the original
//request, potentially having to switch back if we're no longer executing on the same:
if ( newContext != context ) {
if ( t != null ) {
context.runOnContext( ( v ) -> resultForThisEventLoop.completeExceptionally( t ) );
} else {
context.runOnContext( ( v ) -> resultForThisEventLoop.complete( id ) );
}
else {
// wait for the concurrent fetch to complete
// note that we carefully capture the right session,entity here!
queue.add( () -> generate( session, entity ).thenAccept( result::complete ) );
}
else {
if ( t != null ) {
resultForThisEventLoop.completeExceptionally( t );
} else {
resultForThisEventLoop.complete( id );
}
return result;
}
});
return resultForThisEventLoop;
}

private final class GenerateIdAction implements Executor.Action<GeneratorState> {

private final ReactiveConnectionSupplier connectionSupplier;
private final CompletableFuture<Long> result;

public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture<Long> result) {
this.connectionSupplier = Objects.requireNonNull(connectionSupplier);
this.result = Objects.requireNonNull(result);
}

@Override
public Task execute(GeneratorState state) {
long local = next();
if ( local >= 0 ) {
// We don't need to update or initialize the hi
// value in the table, so just increment the lo
// value and return the next id in the block
completedFuture( local )
.whenComplete( this::acceptAsReturnValue );
return null;
} else {
nextHiValue( connectionSupplier )
.whenComplete( (newlyGeneratedHi, throwable) -> {
if ( throwable != null ) {
result.completeExceptionally( throwable );
} else {
//We ignore the state argument as we actually use the field directly
//for convenience, but they are the same object.
executor.submit( stateIgnored -> {
result.complete( next( newlyGeneratedHi ) );
return null;
});
}
} );
return null;
}
}

private void acceptAsReturnValue(final Long aLong, final Throwable throwable) {
if ( throwable != null ) {
result.completeExceptionally( throwable );
}
else {
result.complete( aLong );
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ protected int getBlockSize() {
@Override
protected CompletionStage<Long> nextHiValue(ReactiveConnectionSupplier session) {
return session.getReactiveConnection()
.selectIdentifier( sql, NO_PARAMS, Long.class )
.thenApply( this::next );
.selectIdentifier( sql, NO_PARAMS, Long.class );
}

// First one to get called during initialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import jakarta.persistence.EntityGraph;
import jakarta.persistence.FlushModeType;
import jakarta.persistence.LockModeType;
import jakarta.persistence.PersistenceException;
import jakarta.persistence.criteria.CriteriaDelete;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.CriteriaUpdate;
Expand All @@ -24,6 +25,7 @@
import org.hibernate.reactive.common.AffectedEntities;
import org.hibernate.reactive.common.Identifier;
import org.hibernate.reactive.common.ResultSetMapping;
import org.hibernate.reactive.engine.spi.ReactiveSharedSessionContractImplementor;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.logging.impl.LoggerFactory;
import org.hibernate.reactive.mutiny.Mutiny;
Expand All @@ -32,6 +34,8 @@
import org.hibernate.reactive.mutiny.Mutiny.SelectionQuery;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.query.sqm.iternal.ReactiveQuerySqmImpl;
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
import org.hibernate.reactive.session.ReactiveQueryProducer;
import org.hibernate.reactive.session.ReactiveSession;

import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -538,4 +542,21 @@ public <T> EntityGraph<T> createEntityGraph(Class<T> rootType) {
public <T> EntityGraph<T> createEntityGraph(Class<T> rootType, String graphName) {
return delegate.createEntityGraph( rootType, graphName );
}

public <T> T unwrap(Class<T> clazz) {
if ( ReactiveConnectionSupplier.class.isAssignableFrom( clazz ) ) {
return clazz.cast( this.delegate );
}
else if ( ReactiveSession.class.isAssignableFrom( clazz ) ) {
return clazz.cast( this.delegate );
}
else if ( ReactiveQueryProducer.class.isAssignableFrom( clazz ) ) {
return clazz.cast( this.delegate );
}
else if ( ReactiveSharedSessionContractImplementor.class.isAssignableFrom( clazz ) ) {
return clazz.cast( this.delegate );
}
throw new PersistenceException( "Cannot unwrap type " + clazz );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import jakarta.persistence.EntityGraph;
import jakarta.persistence.FlushModeType;
import jakarta.persistence.LockModeType;
import jakarta.persistence.PersistenceException;
import jakarta.persistence.criteria.CriteriaDelete;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.CriteriaUpdate;
Expand All @@ -25,9 +26,12 @@
import org.hibernate.reactive.common.Identifier;
import org.hibernate.reactive.common.ResultSetMapping;
import org.hibernate.reactive.engine.ReactiveActionQueue;
import org.hibernate.reactive.engine.spi.ReactiveSharedSessionContractImplementor;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.logging.impl.LoggerFactory;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
import org.hibernate.reactive.session.ReactiveQueryProducer;
import org.hibernate.reactive.session.ReactiveSession;
import org.hibernate.reactive.stage.Stage;
import org.hibernate.reactive.stage.Stage.MutationQuery;
Expand Down Expand Up @@ -540,4 +544,20 @@ public <R> Query<R> createNativeQuery(String queryString) {
public <R> Query<R> createNativeQuery(String queryString, AffectedEntities affectedEntities) {
return new StageQueryImpl<>( delegate.createReactiveNativeQuery( queryString, affectedEntities ) );
}

public <T> T unwrap(Class<T> clazz) {
if ( ReactiveConnectionSupplier.class.isAssignableFrom( clazz ) ) {
return clazz.cast( this.delegate );
}
else if ( ReactiveSession.class.isAssignableFrom( clazz ) ) {
return clazz.cast( this.delegate );
}
else if ( ReactiveQueryProducer.class.isAssignableFrom( clazz ) ) {
return clazz.cast( this.delegate );
}
else if ( ReactiveSharedSessionContractImplementor.class.isAssignableFrom( clazz ) ) {
return clazz.cast( this.delegate );
}
throw new PersistenceException( "Cannot unwrap type " + clazz );
}
}
Loading