Skip to content

Commit 02c49b0

Browse files
committed
When virtual threads are enabled, configure Pulsar to use them
Closes gh-36347
1 parent 8b115c8 commit 02c49b0

File tree

2 files changed

+56
-2
lines changed

2 files changed

+56
-2
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,13 @@
3131
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
3232
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
3333
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
34+
import org.springframework.boot.autoconfigure.thread.Threading;
3435
import org.springframework.boot.util.LambdaSafe;
3536
import org.springframework.context.annotation.Bean;
3637
import org.springframework.context.annotation.Configuration;
3738
import org.springframework.context.annotation.Import;
39+
import org.springframework.core.env.Environment;
40+
import org.springframework.core.task.VirtualThreadTaskExecutor;
3841
import org.springframework.pulsar.annotation.EnablePulsar;
3942
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
4043
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
@@ -149,10 +152,13 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
149152
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
150153
ConcurrentPulsarListenerContainerFactory<Object> pulsarListenerContainerFactory(
151154
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
152-
TopicResolver topicResolver) {
155+
TopicResolver topicResolver, Environment environment) {
153156
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
154157
containerProperties.setSchemaResolver(schemaResolver);
155158
containerProperties.setTopicResolver(topicResolver);
159+
if (Threading.VIRTUAL.isActive(environment)) {
160+
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor());
161+
}
156162
this.propertiesMapper.customizeContainerProperties(containerProperties);
157163
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
158164
}
@@ -178,9 +184,12 @@ private void applyReaderBuilderCustomizers(List<ReaderBuilderCustomizer<?>> cust
178184
@Bean
179185
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
180186
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
181-
SchemaResolver schemaResolver) {
187+
SchemaResolver schemaResolver, Environment environment) {
182188
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
183189
readerContainerProperties.setSchemaResolver(schemaResolver);
190+
if (Threading.VIRTUAL.isActive(environment)) {
191+
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor());
192+
}
184193
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
185194
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
186195
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

+45
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.pulsar.common.schema.SchemaType;
3030
import org.junit.jupiter.api.Nested;
3131
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.condition.EnabledForJreRange;
33+
import org.junit.jupiter.api.condition.JRE;
3234
import org.junit.jupiter.params.ParameterizedTest;
3335
import org.junit.jupiter.params.provider.ValueSource;
3436

@@ -39,6 +41,7 @@
3941
import org.springframework.context.annotation.Bean;
4042
import org.springframework.context.annotation.Configuration;
4143
import org.springframework.core.annotation.Order;
44+
import org.springframework.core.task.VirtualThreadTaskExecutor;
4245
import org.springframework.pulsar.annotation.PulsarBootstrapConfiguration;
4346
import org.springframework.pulsar.annotation.PulsarListenerAnnotationBeanPostProcessor;
4447
import org.springframework.pulsar.annotation.PulsarReaderAnnotationBeanPostProcessor;
@@ -464,6 +467,27 @@ void whenObservationsDisabledDoesNotEnableObservation() {
464467
.hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false));
465468
}
466469

470+
@Test
471+
@EnabledForJreRange(min = JRE.JAVA_21)
472+
void whenVirtualThreadsAreEnabledOnJava21AndLaterListenerContainerShouldUseVirtualThreads() {
473+
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
474+
ConcurrentPulsarListenerContainerFactory<?> factory = context
475+
.getBean(ConcurrentPulsarListenerContainerFactory.class);
476+
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
477+
.isInstanceOf(VirtualThreadTaskExecutor.class);
478+
});
479+
}
480+
481+
@Test
482+
@EnabledForJreRange(max = JRE.JAVA_20)
483+
void whenVirtualThreadsAreEnabledOnJava20AndEarlierListenerContainerShouldNotUseVirtualThreads() {
484+
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
485+
ConcurrentPulsarListenerContainerFactory<?> factory = context
486+
.getBean(ConcurrentPulsarListenerContainerFactory.class);
487+
assertThat(factory.getContainerProperties().getConsumerTaskExecutor()).isNull();
488+
});
489+
}
490+
467491
}
468492

469493
@Nested
@@ -498,6 +522,27 @@ <T> void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
498522
});
499523
}
500524

525+
@Test
526+
@EnabledForJreRange(min = JRE.JAVA_21)
527+
void whenVirtualThreadsAreEnabledOnJava21AndLaterReaderShouldUseVirtualThreads() {
528+
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
529+
DefaultPulsarReaderContainerFactory<?> factory = context
530+
.getBean(DefaultPulsarReaderContainerFactory.class);
531+
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
532+
.isInstanceOf(VirtualThreadTaskExecutor.class);
533+
});
534+
}
535+
536+
@Test
537+
@EnabledForJreRange(max = JRE.JAVA_20)
538+
void whenVirtualThreadsAreEnabledOnJava20AndEarlierReaderShouldNotUseVirtualThreads() {
539+
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
540+
DefaultPulsarReaderContainerFactory<?> factory = context
541+
.getBean(DefaultPulsarReaderContainerFactory.class);
542+
assertThat(factory.getContainerProperties().getReaderTaskExecutor()).isNull();
543+
});
544+
}
545+
501546
@TestConfiguration(proxyBeanMethods = false)
502547
static class ReaderBuilderCustomizersConfig {
503548

0 commit comments

Comments
 (0)