diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java index 1752d163d..d15de1779 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java @@ -5,11 +5,16 @@ */ package org.hibernate.reactive.id.impl; +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import io.vertx.core.net.impl.pool.CombinerExecutor; +import io.vertx.core.net.impl.pool.Executor; +import io.vertx.core.net.impl.pool.Task; import org.hibernate.reactive.id.ReactiveIdentifierGenerator; import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.util.impl.CompletionStages; -import java.util.ArrayList; -import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -18,84 +23,149 @@ /** * A {@link ReactiveIdentifierGenerator} which uses the database to allocate * blocks of ids. A block is identified by its "hi" value (the first id in - * the block). While a new block is being allocated, concurrent streams wait - * without blocking. + * the block). While a new block is being allocated, concurrent streams will + * defer the operation without blocking. * * @author Gavin King + * @author Davide D'Alto + * @author Sanne Grinovero + * */ public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierGenerator { /** * The block size (the number of "lo" values for each "hi" value) */ - protected abstract int getBlockSize(); + private final GeneratorState state = new GeneratorState(); + + //Access to the critical section is to be performed exclusively + //via an Action passed to this executor, to ensure exclusive + //modification access. + //This replaces the synchronization blocks one would see in a similar + //service in Hibernate ORM, but using a non-blocking cooperative design. + private final CombinerExecutor executor = new CombinerExecutor( state ); + /** * Allocate a new block, by obtaining the next "hi" value from the database */ protected abstract CompletionStage nextHiValue(ReactiveConnectionSupplier session); - private int loValue; - private long hiValue; - - private volatile List queue = null; + //Not strictly necessary to put these fields into a dedicated class, but it help + //to reason about what the current state is and what the CombinerExecutor is + //supposed to work on. + private static class GeneratorState { + private int loValue; + private long hiValue; + } - protected synchronized long next() { - return loValue > 0 && loValue < getBlockSize() - ? hiValue + loValue++ + //Critical section: needs to be accessed exclusively via the CombinerExecutor + //when there's contention; direct invocation is allowed in the fast path. + private synchronized long next() { + return state.loValue > 0 && state.loValue < getBlockSize() + ? state.hiValue + state.loValue++ : -1; //flag value indicating that we need to hit db } - protected synchronized long next(long hi) { - hiValue = hi; - loValue = 1; + //Critical section: needs to be accessed exclusively via the CombinerExecutor + private synchronized long next(long hi) { + state.hiValue = hi; + state.loValue = 1; return hi; } @Override - public CompletionStage generate(ReactiveConnectionSupplier session, Object entity) { - if ( getBlockSize() <= 1 ) { - //special case where we're not using blocking at all - return nextHiValue( session ); + public CompletionStage generate(ReactiveConnectionSupplier connectionSupplier, Object ignored) { + Objects.requireNonNull( connectionSupplier ); + + //Before submitting a task to the executor, let's try our luck via the fast-path + //(this does actually hit a synchronization, but it's extremely short) + final long next = next(); + if ( next != -1 ) { + return CompletionStages.completedFuture( next ); } - long local = next(); - if ( local >= 0 ) { - // We don't need to update or initialize the hi - // value in the table, so just increment the lo - // value and return the next id in the block - return completedFuture( local ); + //Another special case we need to deal with; this is an unlikely configuration, but + //if it were to happen we should be better off with direct execution rather than using + //the co-operative executor: + if ( getBlockSize() <= 1 ) { + return nextHiValue( connectionSupplier ) + .thenApply( i -> next( i ) ); } - else { - synchronized (this) { - CompletableFuture result = new CompletableFuture<>(); - if ( queue == null ) { - // make a queue for any concurrent streams - queue = new ArrayList<>(); - // go off and fetch the next hi value from db - nextHiValue( session ).thenAccept( id -> { -// Vertx.currentContext().runOnContext(v -> { - List list; - synchronized (this) { - // clone ref to the queue - list = queue; - queue = null; - // use the fetched hi value in this stream - result.complete( next( id ) ); - } - // send waiting streams back to try again - list.forEach( Runnable::run ); -// } ); - } ); + + final CompletableFuture resultForThisEventLoop = new CompletableFuture<>(); + final CompletableFuture result = new CompletableFuture<>(); + executor.submit( new GenerateIdAction( connectionSupplier, result ) ); + final Context context = Vertx.currentContext(); + result.whenComplete( (id,t) -> { + final Context newContext = Vertx.currentContext(); + //Need to be careful in resuming processing on the same context as the original + //request, potentially having to switch back if we're no longer executing on the same: + if ( newContext != context ) { + if ( t != null ) { + context.runOnContext( ( v ) -> resultForThisEventLoop.completeExceptionally( t ) ); + } else { + context.runOnContext( ( v ) -> resultForThisEventLoop.complete( id ) ); } - else { - // wait for the concurrent fetch to complete - // note that we carefully capture the right session,entity here! - queue.add( () -> generate( session, entity ).thenAccept( result::complete ) ); + } + else { + if ( t != null ) { + resultForThisEventLoop.completeExceptionally( t ); + } else { + resultForThisEventLoop.complete( id ); } - return result; + } + }); + return resultForThisEventLoop; + } + + private final class GenerateIdAction implements Executor.Action { + + private final ReactiveConnectionSupplier connectionSupplier; + private final CompletableFuture result; + + public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture result) { + this.connectionSupplier = Objects.requireNonNull(connectionSupplier); + this.result = Objects.requireNonNull(result); + } + + @Override + public Task execute(GeneratorState state) { + long local = next(); + if ( local >= 0 ) { + // We don't need to update or initialize the hi + // value in the table, so just increment the lo + // value and return the next id in the block + completedFuture( local ) + .whenComplete( this::acceptAsReturnValue ); + return null; + } else { + nextHiValue( connectionSupplier ) + .whenComplete( (newlyGeneratedHi, throwable) -> { + if ( throwable != null ) { + result.completeExceptionally( throwable ); + } else { + //We ignore the state argument as we actually use the field directly + //for convenience, but they are the same object. + executor.submit( stateIgnored -> { + result.complete( next( newlyGeneratedHi ) ); + return null; + }); + } + } ); + return null; + } + } + + private void acceptAsReturnValue(final Long aLong, final Throwable throwable) { + if ( throwable != null ) { + result.completeExceptionally( throwable ); + } + else { + result.complete( aLong ); } } } + } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/ReactiveSequenceIdentifierGenerator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/ReactiveSequenceIdentifierGenerator.java index ecf5884f7..6331d178b 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/ReactiveSequenceIdentifierGenerator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/ReactiveSequenceIdentifierGenerator.java @@ -59,8 +59,7 @@ protected int getBlockSize() { @Override protected CompletionStage nextHiValue(ReactiveConnectionSupplier session) { return session.getReactiveConnection() - .selectIdentifier( sql, NO_PARAMS, Long.class ) - .thenApply( this::next ); + .selectIdentifier( sql, NO_PARAMS, Long.class ); } // First one to get called during initialization diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java index 717397e51..8754434e3 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java @@ -11,6 +11,7 @@ import jakarta.persistence.EntityGraph; import jakarta.persistence.FlushModeType; import jakarta.persistence.LockModeType; +import jakarta.persistence.PersistenceException; import jakarta.persistence.criteria.CriteriaDelete; import jakarta.persistence.criteria.CriteriaQuery; import jakarta.persistence.criteria.CriteriaUpdate; @@ -24,6 +25,7 @@ import org.hibernate.reactive.common.AffectedEntities; import org.hibernate.reactive.common.Identifier; import org.hibernate.reactive.common.ResultSetMapping; +import org.hibernate.reactive.engine.spi.ReactiveSharedSessionContractImplementor; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.mutiny.Mutiny; @@ -32,6 +34,8 @@ import org.hibernate.reactive.mutiny.Mutiny.SelectionQuery; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.query.sqm.iternal.ReactiveQuerySqmImpl; +import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.session.ReactiveQueryProducer; import org.hibernate.reactive.session.ReactiveSession; import java.lang.invoke.MethodHandles; @@ -538,4 +542,21 @@ public EntityGraph createEntityGraph(Class rootType) { public EntityGraph createEntityGraph(Class rootType, String graphName) { return delegate.createEntityGraph( rootType, graphName ); } + + public T unwrap(Class clazz) { + if ( ReactiveConnectionSupplier.class.isAssignableFrom( clazz ) ) { + return clazz.cast( this.delegate ); + } + else if ( ReactiveSession.class.isAssignableFrom( clazz ) ) { + return clazz.cast( this.delegate ); + } + else if ( ReactiveQueryProducer.class.isAssignableFrom( clazz ) ) { + return clazz.cast( this.delegate ); + } + else if ( ReactiveSharedSessionContractImplementor.class.isAssignableFrom( clazz ) ) { + return clazz.cast( this.delegate ); + } + throw new PersistenceException( "Cannot unwrap type " + clazz ); + } + } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java index 939f723a2..a4046ce0b 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java @@ -10,6 +10,7 @@ import jakarta.persistence.EntityGraph; import jakarta.persistence.FlushModeType; import jakarta.persistence.LockModeType; +import jakarta.persistence.PersistenceException; import jakarta.persistence.criteria.CriteriaDelete; import jakarta.persistence.criteria.CriteriaQuery; import jakarta.persistence.criteria.CriteriaUpdate; @@ -25,9 +26,12 @@ import org.hibernate.reactive.common.Identifier; import org.hibernate.reactive.common.ResultSetMapping; import org.hibernate.reactive.engine.ReactiveActionQueue; +import org.hibernate.reactive.engine.spi.ReactiveSharedSessionContractImplementor; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.session.ReactiveQueryProducer; import org.hibernate.reactive.session.ReactiveSession; import org.hibernate.reactive.stage.Stage; import org.hibernate.reactive.stage.Stage.MutationQuery; @@ -540,4 +544,20 @@ public Query createNativeQuery(String queryString) { public Query createNativeQuery(String queryString, AffectedEntities affectedEntities) { return new StageQueryImpl<>( delegate.createReactiveNativeQuery( queryString, affectedEntities ) ); } + + public T unwrap(Class clazz) { + if ( ReactiveConnectionSupplier.class.isAssignableFrom( clazz ) ) { + return clazz.cast( this.delegate ); + } + else if ( ReactiveSession.class.isAssignableFrom( clazz ) ) { + return clazz.cast( this.delegate ); + } + else if ( ReactiveQueryProducer.class.isAssignableFrom( clazz ) ) { + return clazz.cast( this.delegate ); + } + else if ( ReactiveSharedSessionContractImplementor.class.isAssignableFrom( clazz ) ) { + return clazz.cast( this.delegate ); + } + throw new PersistenceException( "Cannot unwrap type " + clazz ); + } } diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedIdentityGenerationTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedIdentityGenerationTest.java new file mode 100644 index 000000000..c9220cbf8 --- /dev/null +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedIdentityGenerationTest.java @@ -0,0 +1,304 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import org.hibernate.SessionFactory; +import org.hibernate.boot.registry.StandardServiceRegistry; +import org.hibernate.boot.registry.StandardServiceRegistryBuilder; +import org.hibernate.cfg.Configuration; +import org.hibernate.reactive.id.impl.ReactiveGeneratorWrapper; +import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder; +import org.hibernate.reactive.provider.Settings; +import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.session.impl.ReactiveSessionFactoryImpl; +import org.hibernate.reactive.stage.Stage; +import org.hibernate.reactive.stage.impl.StageSessionImpl; +import org.hibernate.reactive.testing.DatabaseSelectionRule; +import org.hibernate.reactive.util.impl.CompletionStages; +import org.hibernate.reactive.vertx.VertxInstance; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.SQLSERVER; + +/** + * This is a multi-threaded stress test, intentionally consuming some time. + * The purpose is to verify that the sequence optimizer used by Hibernate Reactive + * is indeed able to generate unique IDs backed by the database sequences, while + * running multiple operations in different threads and on multiple Vert.x eventloops. + * A typical reactive application will not require multiple threads, but we + * specifically want to test for the case in which the single ID source is being + * shared across multiple threads and eventloops. + */ +@RunWith(VertxUnitRunner.class) +public class MultithreadedIdentityGenerationTest { + + /* The number of threads should be higher than the default size of the connection pool so that + * this test is also effective in detecting problems with resource starvation. + */ + private static final int N_THREADS = 48; + private static final int IDS_GENERATED_PER_THREAD = 10000; + + //Should finish much sooner, but generating this amount of IDs could be slow on some CIs + private static final int TIMEOUT_MINUTES = 10; + + private static final boolean LOG_SQL = false; + private static final Latch startLatch = new Latch( "start", N_THREADS ); + private static final Latch endLatch = new Latch( "end", N_THREADS ); + + private static Stage.SessionFactory stageSessionFactory; + private static Vertx vertx; + private static SessionFactory sessionFactory; + + @Rule // Currently failing for unrelated reasons on SQL Server https://github.com/hibernate/hibernate-reactive/issues/1609 + public DatabaseSelectionRule dbRule = DatabaseSelectionRule.skipTestsFor( SQLSERVER ); + + @BeforeClass + public static void setupSessionFactory() { + final VertxOptions vertxOptions = new VertxOptions(); + vertxOptions.setEventLoopPoolSize( N_THREADS ); + //We relax the blocked thread checks as we'll actually use latches to block them + //intentionally for the purpose of the test; functionally this isn't required + //but it's useful as self-test in the design of this, to ensure that the way + //things are setup are indeed being run in multiple, separate threads. + vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES ); + vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES ); + vertx = Vertx.vertx( vertxOptions ); + Configuration configuration = new Configuration(); + configuration.addAnnotatedClass( EntityWithGeneratedId.class ); + BaseReactiveTest.setDefaultProperties( configuration ); + configuration.setProperty( Settings.SHOW_SQL, String.valueOf( LOG_SQL ) ); + StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder() + .applySettings( configuration.getProperties() ) + //Inject our custom vert.x instance: + .addService( VertxInstance.class, () -> vertx ); + StandardServiceRegistry registry = builder.build(); + sessionFactory = configuration.buildSessionFactory( registry ); + stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class ); + } + + @AfterClass + public static void closeSessionFactory() { + stageSessionFactory.close(); + } + + private ReactiveGeneratorWrapper getIdGenerator() { + final ReactiveSessionFactoryImpl hibernateSessionFactory = (ReactiveSessionFactoryImpl) sessionFactory; + final ReactiveGeneratorWrapper identifierGenerator = (ReactiveGeneratorWrapper) hibernateSessionFactory.getIdentifierGenerator( + "org.hibernate.reactive.MultithreadedIdentityGenerationTest$EntityWithGeneratedId" ); + return identifierGenerator; + } + + @Test(timeout = ( 1000 * 60 * 10 ))//10 minutes timeout + public void testIdentityGenerator(TestContext context) { + final Async async = context.async(); + final ReactiveGeneratorWrapper idGenerator = getIdGenerator(); + context.assertNotNull( idGenerator ); + + final DeploymentOptions deploymentOptions = new DeploymentOptions(); + deploymentOptions.setInstances( N_THREADS ); + + ResultsCollector allResults = new ResultsCollector(); + + vertx + .deployVerticle( () -> new IdGenVerticle( idGenerator, allResults ), deploymentOptions ) + .onSuccess( res -> { + endLatch.waitForEveryone(); + if ( allResultsAreUnique( allResults ) ) { + async.complete(); + } + else { + context.fail( "Non unique numbers detected" ); + } + } ) + .onFailure( context::fail ) + .eventually( unused -> vertx.close() ); + } + + private boolean allResultsAreUnique(ResultsCollector allResults) { + //Add 50 per thread to the total amount of generated ids to allow for gaps + //in the hi/lo partitioning (not likely to be necessary) + final int expectedSize = N_THREADS * ( IDS_GENERATED_PER_THREAD + 50 ); + BitSet resultsSeen = new BitSet( expectedSize ); + boolean failed = false; + for ( List partialResult : allResults.resultsByThread.values() ) { + for ( Long aLong : partialResult ) { + final int intValue = aLong.intValue(); + final boolean existing = resultsSeen.get( intValue ); + if ( existing ) { + System.out.println( "Duplicate ID detected: " + intValue ); + failed = true; + } + resultsSeen.set( intValue ); + } + } + return !failed; + } + + private static class IdGenVerticle extends AbstractVerticle { + + private final ReactiveGeneratorWrapper idGenerator; + private final ResultsCollector allResults; + private final ArrayList generatedIds = new ArrayList<>( IDS_GENERATED_PER_THREAD ); + + public IdGenVerticle(ReactiveGeneratorWrapper idGenerator, ResultsCollector allResults) { + this.idGenerator = idGenerator; + this.allResults = allResults; + } + + @Override + public void start(Promise startPromise) { + try { + startLatch.reached(); + startLatch.waitForEveryone();//Not essential, but to ensure a good level of parallelism + final String initialThreadName = Thread.currentThread().getName(); + stageSessionFactory.withSession( + s -> generateMultipleIds( idGenerator, s, generatedIds ) + ) + .whenComplete( (o, throwable) -> { + endLatch.reached(); + if ( throwable != null ) { + startPromise.fail( throwable ); + } + else { + if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) { + startPromise.fail( "Thread switch detected!" ); + } + else { + allResults.deliverResulst( generatedIds ); + startPromise.complete(); + } + } + } ); + } + catch (RuntimeException e) { + startPromise.fail( e ); + } + } + + @Override + public void stop() { + prettyOut( "Verticle stopped " + super.toString() ); + } + } + + private static class ResultsCollector { + + private final ConcurrentMap> resultsByThread = new ConcurrentHashMap<>(); + + public void deliverResulst(List generatedIds) { + final String threadName = Thread.currentThread().getName(); + resultsByThread.put( threadName, generatedIds ); + } + } + + private static CompletionStage generateMultipleIds( + ReactiveGeneratorWrapper idGenerator, + Stage.Session s, + ArrayList collector) { + return CompletionStages.loop( 0, IDS_GENERATED_PER_THREAD, index -> generateIds( idGenerator, s, collector ) ); + } + + private static CompletionStage generateIds( + ReactiveGeneratorWrapper idGenerator, + Stage.Session s, + ArrayList collector) { + final Thread beforeOperationThread = Thread.currentThread(); + return idGenerator.generate( ( (StageSessionImpl) s ) + .unwrap( ReactiveConnectionSupplier.class ), new EntityWithGeneratedId() ) + .thenAccept( o -> { + if ( beforeOperationThread != Thread.currentThread() ) { + throw new IllegalStateException( "Detected an unexpected switch of carrier threads!" ); + } + collector.add( (Long) o ); + } ); + } + + /** + * Trivial entity using a Sequence for Id generation + */ + @Entity + @Table(name="Entity") + private static class EntityWithGeneratedId { + @Id + @GeneratedValue + Long id; + + String name; + + public EntityWithGeneratedId() { + } + } + + /** + * Custom latch which is rather verbose about threads reaching the milestones, to help verifying the design + */ + private static final class Latch { + private final String label; + private final CountDownLatch countDownLatch; + + public Latch(String label, int membersCount) { + this.label = label; + this.countDownLatch = new CountDownLatch( membersCount ); + } + + public void reached() { + final long count = countDownLatch.getCount(); + countDownLatch.countDown(); + prettyOut( "Reached latch '" + label + "', current countdown is " + ( count - 1 ) ); + } + + public void waitForEveryone() { + try { + countDownLatch.await( TIMEOUT_MINUTES, TimeUnit.MINUTES ); + prettyOut( "Everyone has now breached '" + label + "'" ); + } + catch ( InterruptedException e ) { + e.printStackTrace(); + } + } + } + + private static void prettyOut(final String message) { + final String threadName = Thread.currentThread().getName(); + final long l = System.currentTimeMillis(); + final long seconds = ( l / 1000 ) - initialSecond; + //We prefix log messages by seconds since bootstrap; I'm preferring this over millisecond precision + //as it's not very relevant to see exactly how long each stage took (it's actually distracting) + //but it's more useful to group things coarsely when some lock or timeout introduces a significant + //divide between some operations (when a starvation or timeout happens it takes some seconds). + System.out.println( seconds + " - " + threadName + ": " + message ); + } + + private static final long initialSecond = ( System.currentTimeMillis() / 1000 ); + +} diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionTest.java new file mode 100644 index 000000000..aae2a1e37 --- /dev/null +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionTest.java @@ -0,0 +1,248 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive; + +import io.vertx.core.*; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import org.hibernate.SessionFactory; +import org.hibernate.boot.registry.StandardServiceRegistry; +import org.hibernate.boot.registry.StandardServiceRegistryBuilder; +import org.hibernate.cfg.Configuration; +import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder; +import org.hibernate.reactive.provider.Settings; +import org.hibernate.reactive.stage.Stage; +import org.hibernate.reactive.testing.DatabaseSelectionRule; +import org.hibernate.reactive.util.impl.CompletionStages; +import org.hibernate.reactive.vertx.VertxInstance; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.SQLSERVER; + +/** + * This is a multi-threaded stress test, intentionally consuming some time. + * The purpose is to verify that the sequence optimizer used by Hibernate Reactive + * is indeed able to generate unique IDs backed by the database sequences, while + * running multiple operations in different threads and on multiple Vert.x eventloops. + * This is very similar to MultithreadedIdentityGenerationTest except it models + * the full operations including the insert statements, while the latter focuses + * on the generated IDs to be unique; it's useful to maintain both tests as: + * - ID generation needs to be unique so it's good to stress that aspect + * in isolation + * - insert operations are downstream events, so this allows us to test that + * such downstream events are not being unintentionally duplicated/dropped, + * which could actually happen when the id generator triggers unintended + * threading behaviours. + * + * N.B. We actually had a case in which the IDs were uniquely generated but the + * downstream event was being processed twice (or more) concurrently, so it's + * useful to have both integration tests. + * + * A typical reactive application will not require multiple threads, but we + * specifically want to test for the case in which the single ID source is being + * shared across multiple threads and also multiple eventloops. + */ +@RunWith(VertxUnitRunner.class) +public class MultithreadedInsertionTest { + + @Rule // Currently failing for unrelated reasons on SQL Server https://github.com/hibernate/hibernate-reactive/issues/1609 + public DatabaseSelectionRule dbRule = DatabaseSelectionRule.skipTestsFor( SQLSERVER ); + + /** + * The number of threads should be higher than the default size of the connection pool so that + * this test is also effective in detecting problems with resource starvation. + */ + private static final int N_THREADS = 12; + private static final int ENTITIES_STORED_PER_THREAD = 2000; + + //Should finish much sooner, but generating this amount of IDs could be slow on some CIs + private static final int TIMEOUT_MINUTES = 10; + + private static final boolean LOG_SQL = false; + private static final Latch startLatch = new Latch( "start", N_THREADS ); + private static final Latch endLatch = new Latch( "end", N_THREADS ); + + private static Stage.SessionFactory stageSessionFactory; + private static Vertx vertx; + private static SessionFactory sessionFactory; + + @BeforeClass + public static void setupSessionFactory() { + final VertxOptions vertxOptions = new VertxOptions(); + vertxOptions.setEventLoopPoolSize( N_THREADS ); + //We relax the blocked thread checks as we'll actually use latches to block them + //intentionally for the purpose of the test; functionally this isn't required + //but it's useful as self-test in the design of this, to ensure that the way + //things are setup are indeed being run in multiple, separate threads. + vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES ); + vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES ); + vertx = Vertx.vertx( vertxOptions ); + Configuration configuration = new Configuration(); + configuration.addAnnotatedClass( EntityWithGeneratedId.class ); + BaseReactiveTest.setDefaultProperties( configuration ); + configuration.setProperty( Settings.SHOW_SQL, String.valueOf( LOG_SQL ) ); + StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder() + .applySettings( configuration.getProperties() ) + //Inject our custom vert.x instance: + .addService( VertxInstance.class, () -> vertx ); + StandardServiceRegistry registry = builder.build(); + sessionFactory = configuration.buildSessionFactory( registry ); + stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class ); + } + + @AfterClass + public static void closeSessionFactory() { + stageSessionFactory.close(); + } + + @Test(timeout = ( 1000 * 60 * 10 ))//10 minutes timeout + public void testIdentityGenerator(TestContext context) { + final Async async = context.async(); + + final DeploymentOptions deploymentOptions = new DeploymentOptions(); + deploymentOptions.setInstances( N_THREADS ); + + vertx + .deployVerticle( () -> new InsertEntitiesVerticle(), deploymentOptions ) + .onSuccess( res -> { + endLatch.waitForEveryone(); + async.complete(); + } ) + .onFailure( context::fail ) + .eventually( unused -> vertx.close() ); + } + + private static class InsertEntitiesVerticle extends AbstractVerticle { + + int sequentialOperation = 0; + + public InsertEntitiesVerticle() { + } + + @Override + public void start(Promise startPromise) { + startLatch.reached(); + startLatch.waitForEveryone();//Not essential, but to ensure a good level of parallelism + final String initialThreadName = Thread.currentThread().getName(); + stageSessionFactory.withSession( + s -> storeMultipleEntities( s ) + ) + .whenComplete( (o, throwable) -> { + endLatch.reached(); + if ( throwable != null ) { + startPromise.fail( throwable ); + } + else { + if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) { + startPromise.fail( "Thread switch detected!" ); + } + else { + startPromise.complete(); + } + } + } ); + } + + private CompletionStage storeMultipleEntities( Stage.Session s) { + return CompletionStages.loop( 0, ENTITIES_STORED_PER_THREAD, index -> storeEntity( s ) ); + } + + private CompletionStage storeEntity(Stage.Session s) { + final Thread beforeOperationThread = Thread.currentThread(); + final int localVerticleOperationSequence = sequentialOperation++; + final EntityWithGeneratedId entity = new EntityWithGeneratedId(); + entity.name = beforeOperationThread + "__" + localVerticleOperationSequence; + + return s.persist( entity ) + .thenCompose( v -> s.flush() ) + .thenAccept( v -> { + s.clear(); + if ( beforeOperationThread != Thread.currentThread() ) { + throw new IllegalStateException( "Detected an unexpected switch of carrier threads!" ); + } + }); + } + + @Override + public void stop() { + prettyOut( "Verticle stopped " + super.toString() ); + } + } + + + /** + * Trivial entity using a Sequence for Id generation + */ + @Entity + @Table(name="Entity") + private static class EntityWithGeneratedId { + @Id + @GeneratedValue + Long id; + + String name; + + public EntityWithGeneratedId() { + } + } + + /** + * Custom latch which is rather verbose about threads reaching the milestones, to help verifying the design + */ + private static final class Latch { + private final String label; + private final CountDownLatch countDownLatch; + + public Latch(String label, int membersCount) { + this.label = label; + this.countDownLatch = new CountDownLatch( membersCount ); + } + + public void reached() { + final long count = countDownLatch.getCount(); + countDownLatch.countDown(); + prettyOut( "Reached latch '" + label + "', current countdown is " + ( count - 1 ) ); + } + + public void waitForEveryone() { + try { + countDownLatch.await( TIMEOUT_MINUTES, TimeUnit.MINUTES ); + prettyOut( "Everyone has now breached '" + label + "'" ); + } + catch ( InterruptedException e ) { + e.printStackTrace(); + } + } + } + + private static void prettyOut(final String message) { + final String threadName = Thread.currentThread().getName(); + final long l = System.currentTimeMillis(); + final long seconds = ( l / 1000 ) - initialSecond; + //We prefix log messages by seconds since bootstrap; I'm preferring this over millisecond precision + //as it's not very relevant to see exactly how long each stage took (it's actually distracting) + //but it's more useful to group things coarsely when some lock or timeout introduces a significant + //divide between some operations (when a starvation or timeout happens it takes some seconds). + System.out.println( seconds + " - " + threadName + ": " + message ); + } + + private static final long initialSecond = ( System.currentTimeMillis() / 1000 ); + +}