diff --git a/framework-docs/modules/ROOT/pages/integration/scheduling.adoc b/framework-docs/modules/ROOT/pages/integration/scheduling.adoc index eae7ff2e7045..e8f723af0c7b 100644 --- a/framework-docs/modules/ROOT/pages/integration/scheduling.adoc +++ b/framework-docs/modules/ROOT/pages/integration/scheduling.adoc @@ -393,6 +393,117 @@ container and once through the `@Configurable` aspect), with the consequence of `@Scheduled` method being invoked twice. ==== +[[scheduling-annotation-support-scheduled-reactive]] +=== The `@Scheduled` annotation on Reactive methods or Kotlin suspending functions + +As of Spring Framework 6.1, `@Scheduled` methods are also supported on several types +of reactive methods: + + - methods with a `Publisher` return type (or any concrete implementation of `Publisher`) +like in the following example: + +[source,java,indent=0,subs="verbatim,quotes"] +---- + @Scheduled(fixedDelay = 500) + public Publisher reactiveSomething() { + // return an instance of Publisher + } +---- + + - methods with a return type that can be adapted to `Publisher` via the shared instance +of the `ReactiveAdapterRegistry`, provided the type supports _deferred subscription_ like +in the following example: + +[source,java,indent=0,subs="verbatim,quotes"] +---- + @Scheduled(fixedDelay = 500) + public Single rxjavaNonPublisher() { + return Single.just("example"); + } +---- + +[NOTE] +==== +The `CompletableFuture` class is an example of a type that can typically be adapted +to `Publisher` but doesn't support deferred subscription. Its `ReactiveAdapter` in the +registry denotes that by having the `getDescriptor().isDeferred()` method return `false`. +==== + + + - Kotlin suspending functions, like in the following example: + +[source,kotlin,indent=0,subs="verbatim,quotes"] +---- + @Scheduled(fixedDelay = 500) + suspend fun something() { + // do something asynchronous + } +---- + + - methods that return a Kotlin `Flow` or `Deferred` instance, like in the following example: + +[source,kotlin,indent=0,subs="verbatim,quotes"] +---- + @Scheduled(fixedDelay = 500) + fun something(): Flow { + flow { + // do something asynchronous + } + } +---- + +All these types of methods must be declared without any arguments. In the case of Kotlin +suspending functions the `kotlinx.coroutines.reactor` bridge must also be present to allow +the framework to invoke a suspending function as a `Publisher`. + +The Spring Framework will obtain a `Publisher` out of the annotated method once and will +schedule a `Runnable` in which it subscribes to said `Publisher`. These inner regular +subscriptions happen according to the `cron`/fixedDelay`/`fixedRate` configuration. + +If the `Publisher` emits `onNext` signal(s), these are ignored and discarded (the same way +return values from synchronous `@Scheduled` methods are ignored). + +In the following example, the `Flux` emits `onNext("Hello"), onNext("World")` every 5 +seconds, but these values are unused: + +[source,java,indent=0,subs="verbatim,quotes"] +---- + @Scheduled(initialDelay = 5000, fixedRate = 5000) + public Flux reactiveSomething() { + return Flux.just("Hello", "World"); + } +---- + +If the `Publisher` emits an `onError` signal, it is logged at WARN level and recovered. +As a result, further scheduled subscription do happen despite the error. + +In the following example, the `Mono` subscription fails twice in the first five seconds +then subscriptions start succeeding, printing a message to the standard output every five +seconds: + +[source,java,indent=0,subs="verbatim,quotes"] +---- + @Scheduled(initialDelay = 0, fixedRate = 5000) + public Mono reactiveSomething() { + AtomicInteger countdown = new AtomicInteger(2); + + return Mono.defer(() -> { + if (countDown.get() == 0 || countDown.decrementAndGet() == 0) { + return Mono.fromRunnable(() -> System.out.println("Message")); + } + return Mono.error(new IllegalStateException("Cannot deliver message")); + }) + } +---- + +[NOTE] +==== +When destroying the annotated bean or closing the application context Spring Framework cancels +scheduled tasks, which includes the next scheduled subscription to the `Publisher` as well +as any past subscription that is still currently active (e.g. for long-running publishers, +or even infinite publishers). +==== + [[scheduling-annotation-support-async]] === The `@Async` annotation diff --git a/spring-context/spring-context.gradle b/spring-context/spring-context.gradle index 754bfcb72fe4..333ec1dd2ad8 100644 --- a/spring-context/spring-context.gradle +++ b/spring-context/spring-context.gradle @@ -27,6 +27,7 @@ dependencies { optional("org.jetbrains.kotlin:kotlin-reflect") optional("org.jetbrains.kotlin:kotlin-stdlib") optional("org.reactivestreams:reactive-streams") + optional("io.projectreactor:reactor-core") testImplementation(project(":spring-core-test")) testImplementation(testFixtures(project(":spring-aop"))) testImplementation(testFixtures(project(":spring-beans"))) @@ -38,6 +39,8 @@ dependencies { testImplementation("org.awaitility:awaitility") testImplementation("jakarta.inject:jakarta.inject-tck") testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core") + testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") + testImplementation("io.reactivex.rxjava3:rxjava") testRuntimeOnly("jakarta.xml.bind:jakarta.xml.bind-api") testRuntimeOnly("org.glassfish:jakarta.el") // Substitute for javax.management:jmxremote_optional:1.0.1_04 (not available on Maven Central) diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java index 1b4cca077887..5f2fe23b0a3a 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java @@ -36,6 +36,20 @@ * a {@code void} return type; if not, the returned value will be ignored * when called through the scheduler. * + *

Methods that return a reactive {@code Publisher} or a type which can be adapted + * to {@code Publisher} by the default {@code ReactiveAdapterRegistry} are supported. + * The {@code Publisher} MUST support multiple subsequent subscriptions (i.e. be cold). + * The returned Publisher is only produced once, and the scheduling infrastructure + * then periodically {@code subscribe()} to it according to configuration. + * Values emitted by the publisher are ignored. Errors are logged at WARN level, which + * doesn't prevent further iterations. If a {@code fixed delay} is configured, the + * subscription is blocked upon in order to respect the fixed delay semantics. + * + *

Kotlin suspending functions are also supported, provided the coroutine-reactor + * bridge ({@code kotlinx.coroutine.reactor}) is present at runtime. This bridge is + * used to adapt the suspending function into a {@code Publisher} which is treated + * the same way as in the reactive method case (see above). + * *

Processing of {@code @Scheduled} annotations is performed by * registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be * done manually or, more conveniently, through the {@code } diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java index 14272d35e776..01a5244d8459 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -98,6 +99,7 @@ * @author Elizabeth Chatman * @author Victor Brown * @author Sam Brannen + * @author Simon Baslé * @since 3.0 * @see Scheduled * @see EnableScheduling @@ -143,6 +145,8 @@ public class ScheduledAnnotationBeanPostProcessor private final Map> scheduledTasks = new IdentityHashMap<>(16); + private final Map> reactiveSubscriptions = new IdentityHashMap<>(16); + /** * Create a default {@code ScheduledAnnotationBeanPostProcessor}. @@ -385,15 +389,33 @@ public Object postProcessAfterInitialization(Object bean, String beanName) { } /** - * Process the given {@code @Scheduled} method declaration on the given bean. + * Process the given {@code @Scheduled} method declaration on the given bean, + * attempting to distinguish {@link #processScheduledAsync(Scheduled, Method, Object) reactive} + * methods from {@link #processScheduledSync(Scheduled, Method, Object) synchronous} methods. * @param scheduled the {@code @Scheduled} annotation * @param method the method that the annotation has been declared on * @param bean the target bean instance - * @see #createRunnable(Object, Method) + * @see #processScheduledSync(Scheduled, Method, Object) + * @see #processScheduledAsync(Scheduled, Method, Object) */ protected void processScheduled(Scheduled scheduled, Method method, Object bean) { + // Is method a Kotlin suspending function? Throws if true but reactor bridge isn't on the classpath. + // Is method returning a reactive type? Throws if true, but it isn't a deferred Publisher type. + if (ScheduledAnnotationReactiveSupport.isReactive(method)) { + processScheduledAsync(scheduled, method, bean); + return; + } + processScheduledSync(scheduled, method, bean); + } + + /** + * Parse the {@code Scheduled} annotation and schedule the provided {@code Runnable} + * accordingly. The Runnable can represent either a synchronous method invocation + * (see {@link #processScheduledSync(Scheduled, Method, Object)}) or an asynchronous + * one (see {@link #processScheduledAsync(Scheduled, Method, Object)}). + */ + protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) { try { - Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; @@ -516,6 +538,53 @@ protected void processScheduled(Scheduled scheduled, Method method, Object bean) } } + /** + * Process the given {@code @Scheduled} method declaration on the given bean, + * as a synchronous method. The method MUST take no arguments. Its return value + * is ignored (if any) and the scheduled invocations of the method take place + * using the underlying {@link TaskScheduler} infrastructure. + * @param scheduled the {@code @Scheduled} annotation + * @param method the method that the annotation has been declared on + * @param bean the target bean instance + * @see #createRunnable(Object, Method) + */ + protected void processScheduledSync(Scheduled scheduled, Method method, Object bean) { + Runnable task; + try { + task = createRunnable(bean, method); + } + catch (IllegalArgumentException ex) { + throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage()); + } + processScheduledTask(scheduled, task, method, bean); + } + + /** + * Process the given {@code @Scheduled} bean method declaration which returns + * a {@code Publisher}, or the given Kotlin suspending function converted to a + * Publisher. A {@code Runnable} which subscribes to that publisher is then repeatedly + * scheduled according to the annotation configuration. + *

Note that for fixed delay configuration, the subscription is turned into a blocking + * call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot + * be deferred (i.e. not a {@code Publisher}) are not supported. + * @param scheduled the {@code @Scheduled} annotation + * @param method the method that the annotation has been declared on, which + * MUST either return a Publisher-adaptable type or be a Kotlin suspending function + * @param bean the target bean instance + * @see ScheduledAnnotationReactiveSupport + */ + protected void processScheduledAsync(Scheduled scheduled, Method method, Object bean) { + Runnable task; + try { + task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled, + this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>())); + } + catch (IllegalArgumentException ex) { + throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage()); + } + processScheduledTask(scheduled, task, method, bean); + } + /** * Create a {@link Runnable} for the given bean instance, * calling the specified scheduled method. @@ -554,6 +623,8 @@ private static boolean isP(char ch) { /** * Return all currently scheduled tasks, from {@link Scheduled} methods * as well as from programmatic {@link SchedulingConfigurer} interaction. + *

Note this includes upcoming scheduled subscriptions for reactive methods, + * but doesn't cover any currently active subscription for such methods. * @since 5.0.2 */ @Override @@ -572,20 +643,27 @@ public Set getScheduledTasks() { @Override public void postProcessBeforeDestruction(Object bean, String beanName) { Set tasks; + List liveSubscriptions; synchronized (this.scheduledTasks) { tasks = this.scheduledTasks.remove(bean); + liveSubscriptions = this.reactiveSubscriptions.remove(bean); } if (tasks != null) { for (ScheduledTask task : tasks) { task.cancel(); } } + if (liveSubscriptions != null) { + for (Runnable subscription : liveSubscriptions) { + subscription.run(); // equivalent to cancelling the subscription + } + } } @Override public boolean requiresDestruction(Object bean) { synchronized (this.scheduledTasks) { - return this.scheduledTasks.containsKey(bean); + return this.scheduledTasks.containsKey(bean) || this.reactiveSubscriptions.containsKey(bean); } } @@ -599,6 +677,12 @@ public void destroy() { } } this.scheduledTasks.clear(); + Collection> allLiveSubscriptions = this.reactiveSubscriptions.values(); + for (List liveSubscriptions : allLiveSubscriptions) { + for (Runnable liveSubscription : liveSubscriptions) { + liveSubscription.run(); //equivalent to cancelling the subscription + } + } } this.registrar.destroy(); } diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java new file mode 100644 index 000000000000..f5998132d319 --- /dev/null +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java @@ -0,0 +1,268 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.annotation; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.publisher.Flux; + +import org.springframework.aop.support.AopUtils; +import org.springframework.core.CoroutinesUtils; +import org.springframework.core.KotlinDetector; +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; +import org.springframework.util.StringUtils; + +/** + * Helper class for @{@link ScheduledAnnotationBeanPostProcessor} to support reactive cases + * without a dependency on optional classes. + * @author Simon Baslé + * @since 6.1.0 + */ +abstract class ScheduledAnnotationReactiveSupport { + + static final boolean reactorPresent = ClassUtils.isPresent( + "reactor.core.publisher.Flux", ScheduledAnnotationReactiveSupport.class.getClassLoader()); + + static final boolean coroutinesReactorPresent = ClassUtils.isPresent( + "kotlinx.coroutines.reactor.MonoKt", ScheduledAnnotationReactiveSupport.class.getClassLoader()); + + private static final Log LOGGER = LogFactory.getLog(ScheduledAnnotationReactiveSupport.class); + + /** + * Checks that if the method is reactive, it can be scheduled. Methods are considered + * eligible for reactive scheduling if they either return an instance of a type that + * can be converted to {@code Publisher} or are a Kotlin Suspending Function. + * If the method isn't matching these criteria then this check returns {@code false}. + *

For scheduling of Kotlin Suspending Functions, the Coroutine-Reactor bridge + * {@code kotlinx.coroutines.reactor} MUST be present at runtime (in order to invoke + * suspending functions as a {@code Publisher}). + * Provided that is the case, this method returns {@code true}. Otherwise, it throws + * an {@code IllegalStateException}. + * @throws IllegalStateException if the method is reactive but Reactor and/or the + * Kotlin coroutines bridge are not present at runtime + */ + static boolean isReactive(Method method) { + if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) { + //Note that suspending functions declared without args have a single Continuation parameter in reflective inspection + Assert.isTrue(method.getParameterCount() == 1,"Kotlin suspending functions may only be" + + " annotated with @Scheduled if declared without arguments"); + Assert.isTrue(coroutinesReactorPresent, "Kotlin suspending functions may only be annotated with" + + " @Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime"); + return true; + } + ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); + if (!registry.hasAdapters()) { + return false; + } + Class returnType = method.getReturnType(); + ReactiveAdapter candidateAdapter = registry.getAdapter(returnType); + if (candidateAdapter == null) { + return false; + } + Assert.isTrue(method.getParameterCount() == 0, "Reactive methods may only be annotated with" + + " @Scheduled if declared without arguments"); + Assert.isTrue(candidateAdapter.getDescriptor().isDeferred(), "Reactive methods may only be annotated with" + + " @Scheduled if the return type supports deferred execution"); + return true; + } + + /** + * Turn the invocation of the provided {@code Method} into a {@code Publisher}, + * either by reflectively invoking it and converting the result to a {@code Publisher} + * via {@link ReactiveAdapterRegistry} or by converting a Kotlin suspending function + * into a {@code Publisher} via {@link CoroutinesUtils}. + * The {@link #isReactive(Method)} check is a precondition to calling this method. + * If Reactor is present at runtime, the Publisher is additionally converted to a {@code Flux} + * with a checkpoint String, allowing for better debugging. + */ + static Publisher getPublisherFor(Method method, Object bean) { + if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) { + return CoroutinesUtils.invokeSuspendingFunction(method, bean, (Object[]) method.getParameters()); + } + + ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); + Class returnType = method.getReturnType(); + ReactiveAdapter adapter = registry.getAdapter(returnType); + if (adapter == null) { + throw new IllegalArgumentException("Cannot convert the @Scheduled reactive method return type to Publisher"); + } + if (!adapter.getDescriptor().isDeferred()) { + throw new IllegalArgumentException("Cannot convert the @Scheduled reactive method return type to Publisher, " + + returnType.getSimpleName() + " is not a deferred reactive type"); + } + Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); + try { + ReflectionUtils.makeAccessible(invocableMethod); + Object r = invocableMethod.invoke(bean); + + Publisher publisher = adapter.toPublisher(r); + //if Reactor is on the classpath, we could benefit from having a checkpoint for debuggability + if (reactorPresent) { + final String checkpoint = "@Scheduled '"+ method.getName() + "()' in bean '" + + method.getDeclaringClass().getName() + "'"; + return Flux.from(publisher).checkpoint(checkpoint); + } + else { + return publisher; + } + } + catch (InvocationTargetException ex) { + throw new IllegalArgumentException("Cannot obtain a Publisher-convertible value from the @Scheduled reactive method", ex.getTargetException()); + } + catch (IllegalAccessException ex) { + throw new IllegalArgumentException("Cannot obtain a Publisher-convertible value from the @Scheduled reactive method", ex); + } + } + + /** + * Create a {@link Runnable} for the Scheduled infrastructure, allowing for scheduled + * subscription to the publisher produced by a reactive method. + *

Note that the reactive method is invoked once, but the resulting {@code Publisher} + * is subscribed to repeatedly, once per each invocation of the {@code Runnable}. + *

In the case of a {@code fixed delay} configuration, the subscription inside the + * Runnable is turned into a blocking call in order to maintain fixedDelay semantics + * (i.e. the task blocks until completion of the Publisher, then the delay is applied + * until next iteration). + */ + static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled, + List subscriptionTrackerRegistry) { + boolean shouldBlock = scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString()); + final Publisher publisher = getPublisherFor(method, targetBean); + return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry); + } + + /** + * Utility implementation of {@code Runnable} that subscribes to a {@code Publisher} + * or subscribes-then-blocks if {@code shouldBlock} is set to {@code true}. + */ + static final class SubscribingRunnable implements Runnable { + + final Publisher publisher; + final boolean shouldBlock; + final List subscriptionTrackerRegistry; + + SubscribingRunnable(Publisher publisher, boolean shouldBlock, List subscriptionTrackerRegistry) { + this.publisher = publisher; + this.shouldBlock = shouldBlock; + this.subscriptionTrackerRegistry = subscriptionTrackerRegistry; + } + + @Override + public void run() { + if (this.shouldBlock) { + final CountDownLatch latch = new CountDownLatch(1); + TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, latch); + this.subscriptionTrackerRegistry.add(subscriber); + this.publisher.subscribe(subscriber); + try { + latch.await(); + } + catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + else { + final TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry); + this.subscriptionTrackerRegistry.add(subscriber); + this.publisher.subscribe(subscriber); + } + } + } + + /** + * A {@code Subscriber} which keeps track of its {@code Subscription} and exposes the + * capacity to cancel the subscription as a {@code Runnable}. Can optionally support + * blocking if a {@code CountDownLatch} is passed at construction. + */ + private static final class TrackingSubscriber implements Subscriber, Runnable { + + private final List subscriptionTrackerRegistry; + + @Nullable + private final CountDownLatch blockingLatch; + + /* + Implementation note: since this is created last minute when subscribing, + there shouldn't be a way to cancel the tracker externally from the + ScheduledAnnotationBeanProcessor before the #setSubscription(Subscription) + method is called. + */ + @Nullable + private Subscription s; + + TrackingSubscriber(List subscriptionTrackerRegistry) { + this(subscriptionTrackerRegistry, null); + } + + TrackingSubscriber(List subscriptionTrackerRegistry, @Nullable CountDownLatch latch) { + this.subscriptionTrackerRegistry = subscriptionTrackerRegistry; + this.blockingLatch = latch; + } + + @Override + public void run() { + if (this.s != null) { + this.s.cancel(); + } + if (this.blockingLatch != null) { + this.blockingLatch.countDown(); + } + } + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + s.request(Integer.MAX_VALUE); + } + + @Override + public void onNext(Object o) { + // NO-OP + } + + @Override + public void onError(Throwable ex) { + this.subscriptionTrackerRegistry.remove(this); + LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex); + if (this.blockingLatch != null) { + this.blockingLatch.countDown(); + } + } + + @Override + public void onComplete() { + this.subscriptionTrackerRegistry.remove(this); + if (this.blockingLatch != null) { + this.blockingLatch.countDown(); + } + } + } + +} diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupportTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupportTests.java new file mode 100644 index 000000000000..d5357c35174f --- /dev/null +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupportTests.java @@ -0,0 +1,238 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.annotation; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.util.ReflectionUtils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.createSubscriptionRunnable; +import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.getPublisherFor; +import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.isReactive; + +class ScheduledAnnotationReactiveSupportTests { + + @Test + void ensureReactor() { + assertThat(ScheduledAnnotationReactiveSupport.reactorPresent).isTrue(); + } + + @ParameterizedTest + @ValueSource(strings = { "mono", "flux", "monoString", "fluxString", "publisherMono", + "publisherString", "monoThrows", "flowable", "completable" }) //note: monoWithParams can't be found by this test + void checkIsReactive(String method) { + Method m = ReflectionUtils.findMethod(ReactiveMethods.class, method); + assertThat(isReactive(m)).as(m.getName()).isTrue(); + } + + @Test + void checkNotReactive() { + Method string = ReflectionUtils.findMethod(ReactiveMethods.class, "oops"); + + assertThat(isReactive(string)) + .as("String-returning").isFalse(); + } + + @Test + void rejectReactiveAdaptableButNotDeferred() { + Method future = ReflectionUtils.findMethod(ReactiveMethods.class, "future"); + + assertThatIllegalArgumentException().isThrownBy(() -> isReactive(future)) + .withMessage("Reactive methods may only be annotated with @Scheduled if the return type supports deferred execution"); + } + + @Test + void isReactiveRejectsWithParams() { + Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoWithParam", String.class); + + //isReactive rejects with context + assertThatIllegalArgumentException().isThrownBy(() -> isReactive(m)) + .withMessage("Reactive methods may only be annotated with @Scheduled if declared without arguments") + .withNoCause(); + } + + @Test + void rejectCantProducePublisher() { + ReactiveMethods target = new ReactiveMethods(); + Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoThrows"); + + //static helper method + assertThatIllegalArgumentException().isThrownBy(() -> getPublisherFor(m, target)) + .withMessage("Cannot obtain a Publisher-convertible value from the @Scheduled reactive method") + .withCause(new IllegalStateException("expected")); + } + + @Test + void rejectCantAccessMethod() { + ReactiveMethods target = new ReactiveMethods(); + Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoThrowsIllegalAccess"); + + //static helper method + assertThatIllegalArgumentException().isThrownBy(() -> getPublisherFor(m, target)) + .withMessage("Cannot obtain a Publisher-convertible value from the @Scheduled reactive method") + .withCause(new IllegalAccessException("expected")); + } + + @Test + void fixedDelayIsBlocking() { + ReactiveMethods target = new ReactiveMethods(); + Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono"); + Scheduled fixedDelayString = AnnotationUtils.synthesizeAnnotation(Map.of("fixedDelayString", "123"), Scheduled.class, null); + Scheduled fixedDelayLong = AnnotationUtils.synthesizeAnnotation(Map.of("fixedDelay", 123L), Scheduled.class, null); + List tracker = new ArrayList<>(); + + assertThat(createSubscriptionRunnable(m, target, fixedDelayString, tracker)) + .isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr -> + assertThat(sr.shouldBlock).as("fixedDelayString.shouldBlock").isTrue() + ); + + assertThat(createSubscriptionRunnable(m, target, fixedDelayLong, tracker)) + .isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr -> + assertThat(sr.shouldBlock).as("fixedDelayLong.shouldBlock").isTrue() + ); + } + + @Test + void fixedRateIsNotBlocking() { + ReactiveMethods target = new ReactiveMethods(); + Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono"); + Scheduled fixedRateString = AnnotationUtils.synthesizeAnnotation(Map.of("fixedRateString", "123"), Scheduled.class, null); + Scheduled fixedRateLong = AnnotationUtils.synthesizeAnnotation(Map.of("fixedRate", 123L), Scheduled.class, null); + List tracker = new ArrayList<>(); + + assertThat(createSubscriptionRunnable(m, target, fixedRateString, tracker)) + .isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr -> + assertThat(sr.shouldBlock).as("fixedRateString.shouldBlock").isFalse() + ); + + assertThat(createSubscriptionRunnable(m, target, fixedRateLong, tracker)) + .isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr -> + assertThat(sr.shouldBlock).as("fixedRateLong.shouldBlock").isFalse() + ); + } + + @Test + void cronIsNotBlocking() { + ReactiveMethods target = new ReactiveMethods(); + Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono"); + Scheduled cron = AnnotationUtils.synthesizeAnnotation(Map.of("cron", "-"), Scheduled.class, null); + List tracker = new ArrayList<>(); + + assertThat(createSubscriptionRunnable(m, target, cron, tracker)) + .isInstanceOfSatisfying(ScheduledAnnotationReactiveSupport.SubscribingRunnable.class, sr -> + assertThat(sr.shouldBlock).as("cron.shouldBlock").isFalse() + ); + } + + @Test + void hasCheckpointToString() { + ReactiveMethods target = new ReactiveMethods(); + Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono"); + Publisher p = getPublisherFor(m, target); + + assertThat(p.getClass().getName()) + .as("checkpoint class") + .isEqualTo("reactor.core.publisher.FluxOnAssembly"); + + assertThat(p).hasToString("checkpoint(\"@Scheduled 'mono()' in bean 'org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupportTests$ReactiveMethods'\")"); + } + + static class ReactiveMethods { + + public String oops() { + return "oops"; + } + + public Mono mono() { + return Mono.empty(); + } + + public Flux flux() { + return Flux.empty(); + } + + public Mono monoString() { + return Mono.just("example"); + } + + public Flux fluxString() { + return Flux.just("example"); + } + + public Publisher publisherMono() { + return Mono.empty(); + } + + public Publisher publisherString() { + return fluxString(); + } + + public CompletableFuture future() { + return CompletableFuture.completedFuture("example"); + } + + public Mono monoWithParam(String param) { + return Mono.just(param).then(); + } + + public Mono monoThrows() { + throw new IllegalStateException("expected"); + } + + public Mono monoThrowsIllegalAccess() throws IllegalAccessException { + //simulate a reflection issue + throw new IllegalAccessException("expected"); + } + + public Flowable flowable() { + return Flowable.empty(); + } + + public Completable completable() { + return Completable.complete(); + } + + AtomicInteger subscription = new AtomicInteger(); + + public Mono trackingMono() { + return Mono.empty() + .doOnSubscribe(s -> subscription.incrementAndGet()); + } + + public Mono monoError() { + return Mono.error(new IllegalStateException("expected")); + } + + } +} diff --git a/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/KotlinScheduledAnnotationReactiveSupportTests.kt b/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/KotlinScheduledAnnotationReactiveSupportTests.kt new file mode 100644 index 000000000000..51cf0e5481a8 --- /dev/null +++ b/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/KotlinScheduledAnnotationReactiveSupportTests.kt @@ -0,0 +1,155 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.annotation + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOf +import org.assertj.core.api.Assertions +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.getPublisherFor +import org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.isReactive +import org.springframework.util.ReflectionUtils +import reactor.core.publisher.Mono +import java.util.concurrent.atomic.AtomicInteger +import kotlin.coroutines.Continuation + +class KotlinScheduledAnnotationReactiveSupportTests { + + @Test + fun ensureReactor() { + assertThat(ScheduledAnnotationReactiveSupport.reactorPresent).isTrue + } + + @Test + fun ensureKotlinCoroutineReactorBridge() { + assertThat(ScheduledAnnotationReactiveSupport.coroutinesReactorPresent).isTrue + } + + @ParameterizedTest + @ValueSource(strings = ["suspending", "suspendingReturns"]) + fun isReactiveSuspending(methodName: String) { + val method = ReflectionUtils.findMethod(SuspendingFunctions::class.java, methodName, Continuation::class.java)!! + assertThat(isReactive(method)).isTrue + } + + @ParameterizedTest + @ValueSource(strings = ["flow", "deferred"]) + fun isReactiveKotlinType(methodName: String) { + val method = ReflectionUtils.findMethod(SuspendingFunctions::class.java, methodName)!! + assertThat(isReactive(method)).isTrue + } + + @Test + fun isNotReactive() { + val method = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "notSuspending")!! + assertThat(isReactive(method)).isFalse + } + + internal class SuspendingFunctions { + suspend fun suspending() { + } + + suspend fun suspendingReturns(): String = "suspended" + + suspend fun withParam(param: String): String { + return param + } + + suspend fun throwsIllegalState() { + throw IllegalStateException("expected") + } + + var subscription = AtomicInteger() + suspend fun suspendingTracking() { + subscription.incrementAndGet() + } + + fun notSuspending() { } + + fun flow(): Flow { + return flowOf() + } + + fun deferred(): Deferred { + return CompletableDeferred() + } + } + + + private var target: SuspendingFunctions? = null + + @BeforeEach + fun init() { + target = SuspendingFunctions() + } + + @Test + fun checkKotlinRuntimeIfNeeded() { + val suspendingMethod = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspending", Continuation::class.java)!! + val notSuspendingMethod = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "notSuspending")!! + + assertThat(isReactive(suspendingMethod)).describedAs("suspending").isTrue() + assertThat(isReactive(notSuspendingMethod)).describedAs("not suspending").isFalse() + } + + @Test + fun isReactiveRejectsWithParams() { + val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "withParam", String::class.java, Continuation::class.java)!! + + //isReactive rejects with some context + Assertions.assertThatIllegalArgumentException().isThrownBy { isReactive(m) } + .withMessage("Kotlin suspending functions may only be annotated with @Scheduled if declared without arguments") + .withNoCause() + } + + @Test + fun rejectNotSuspending() { + val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "notSuspending") + + //static helper method + Assertions.assertThatIllegalArgumentException().isThrownBy { getPublisherFor(m!!, target!!) } + .withMessage("Cannot convert the @Scheduled reactive method return type to Publisher") + .withNoCause() + } + + @Test + fun suspendingThrowIsTurnedToMonoError() { + val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "throwsIllegalState", Continuation::class.java) + + val mono = Mono.from(getPublisherFor(m!!, target!!)) + + Assertions.assertThatIllegalStateException().isThrownBy { mono.block() } + .withMessage("expected") + .withNoCause() + } + + @Test + fun turningSuspendingFunctionToMonoDoesntExecuteTheMethod() { + val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java) + val mono = Mono.from(getPublisherFor(m!!, target!!)) + + assertThat(target!!.subscription).hasValue(0) + mono.block() + assertThat(target!!.subscription).describedAs("after subscription").hasValue(1) + } +} \ No newline at end of file