From 0be610b0ee43ee23a54a5d9c2d9b89edab35f686 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 2 May 2019 16:23:48 +0200 Subject: [PATCH] Support for reactive transactions in TransactionInterceptor Introduces TransactionManager marker interface for PlatformTransactionManager as well as ReactiveTransactionManager, allowing for a common configuration type in TransactionAspectSupport and TransactionManagementConfigurer. Closes gh-22590 --- .../TestContextTransactionUtils.java | 12 +- .../PlatformTransactionManager.java | 4 +- .../ReactiveTransactionManager.java | 3 +- .../transaction/TransactionManager.java | 30 ++ ...actTransactionManagementConfiguration.java | 6 +- .../TransactionManagementConfigurer.java | 19 +- .../interceptor/TransactionAspectSupport.java | 353 ++++++++++++++- ...bstractReactiveTransactionAspectTests.java | 406 ++++++++++++++++++ .../ReactiveTransactionInterceptorTests.java | 112 +++++ 9 files changed, 919 insertions(+), 26 deletions(-) create mode 100644 spring-tx/src/main/java/org/springframework/transaction/TransactionManager.java create mode 100644 spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java create mode 100644 spring-tx/src/test/java/org/springframework/transaction/interceptor/ReactiveTransactionInterceptorTests.java diff --git a/spring-test/src/main/java/org/springframework/test/context/transaction/TestContextTransactionUtils.java b/spring-test/src/main/java/org/springframework/test/context/transaction/TestContextTransactionUtils.java index ce3271f19499..3efba1e16d8a 100644 --- a/spring-test/src/main/java/org/springframework/test/context/transaction/TestContextTransactionUtils.java +++ b/spring-test/src/main/java/org/springframework/test/context/transaction/TestContextTransactionUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.lang.Nullable; import org.springframework.test.context.TestContext; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionManager; import org.springframework.transaction.annotation.TransactionManagementConfigurer; import org.springframework.transaction.interceptor.DelegatingTransactionAttribute; import org.springframework.transaction.interceptor.TransactionAttribute; @@ -202,7 +203,14 @@ public static PlatformTransactionManager retrieveTransactionManager(TestContext Assert.state(configurers.size() <= 1, "Only one TransactionManagementConfigurer may exist in the ApplicationContext"); if (configurers.size() == 1) { - return configurers.values().iterator().next().annotationDrivenTransactionManager(); + TransactionManager tm = configurers.values().iterator().next().annotationDrivenTransactionManager(); + if (tm instanceof PlatformTransactionManager) { + return (PlatformTransactionManager) tm; + } + else { + throw new IllegalStateException( + "Specified transaction manager is not a PlatformTransactionManager: " + tm); + } } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/PlatformTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/PlatformTransactionManager.java index 3723549452ee..a3806200cceb 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/PlatformTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/PlatformTransactionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,7 @@ * @see org.springframework.transaction.interceptor.TransactionInterceptor * @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean */ -public interface PlatformTransactionManager { +public interface PlatformTransactionManager extends TransactionManager { /** * Return a currently active transaction or create a new one, according to diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java index 553e4e329ff5..d9d5b26835dd 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java @@ -27,9 +27,8 @@ * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 - * @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean */ -public interface ReactiveTransactionManager { +public interface ReactiveTransactionManager extends TransactionManager { /** * Emit a currently active reactive transaction or create a new one, according to diff --git a/spring-tx/src/main/java/org/springframework/transaction/TransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/TransactionManager.java new file mode 100644 index 000000000000..6504495b59cb --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/TransactionManager.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction; + +/** + * Marker interface for Spring transaction manager implementations, + * either traditional or reactive. + * + * @author Juergen Hoeller + * @since 5.2 + * @see PlatformTransactionManager + * @see ReactiveTransactionManager + */ +public interface TransactionManager { + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/annotation/AbstractTransactionManagementConfiguration.java b/spring-tx/src/main/java/org/springframework/transaction/annotation/AbstractTransactionManagementConfiguration.java index 93a88a80f9d1..460105d9ea56 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/annotation/AbstractTransactionManagementConfiguration.java +++ b/spring-tx/src/main/java/org/springframework/transaction/annotation/AbstractTransactionManagementConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ import org.springframework.core.annotation.AnnotationAttributes; import org.springframework.core.type.AnnotationMetadata; import org.springframework.lang.Nullable; -import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionManager; import org.springframework.transaction.config.TransactionManagementConfigUtils; import org.springframework.transaction.event.TransactionalEventListenerFactory; import org.springframework.util.CollectionUtils; @@ -51,7 +51,7 @@ public abstract class AbstractTransactionManagementConfiguration implements Impo * Default transaction manager, as configured through a {@link TransactionManagementConfigurer}. */ @Nullable - protected PlatformTransactionManager txManager; + protected TransactionManager txManager; @Override diff --git a/spring-tx/src/main/java/org/springframework/transaction/annotation/TransactionManagementConfigurer.java b/spring-tx/src/main/java/org/springframework/transaction/annotation/TransactionManagementConfigurer.java index c7d6a07f468b..fdab4eafa73f 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/annotation/TransactionManagementConfigurer.java +++ b/spring-tx/src/main/java/org/springframework/transaction/annotation/TransactionManagementConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,16 @@ package org.springframework.transaction.annotation; -import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionManager; /** * Interface to be implemented by @{@link org.springframework.context.annotation.Configuration * Configuration} classes annotated with @{@link EnableTransactionManagement} that wish to - * or need to explicitly specify the default {@link PlatformTransactionManager} bean to be - * used for annotation-driven transaction management, as opposed to the default approach - * of a by-type lookup. One reason this might be necessary is if there are two - * {@code PlatformTransactionManager} beans present in the container. + * (or need to) explicitly specify the default {@code PlatformTransactionManager} bean + * (or {@code ReactiveTransactionManager} bean) to be used for annotation-driven + * transaction management, as opposed to the default approach of a by-type lookup. + * One reason this might be necessary is if there are two {@code PlatformTransactionManager} + * beans present in the container. * *

See @{@link EnableTransactionManagement} for general examples and context; * see {@link #annotationDrivenTransactionManager()} for detailed instructions. @@ -40,6 +41,8 @@ * @since 3.1 * @see EnableTransactionManagement * @see org.springframework.context.annotation.Primary + * @see org.springframework.transaction.PlatformTransactionManager + * @see org.springframework.transaction.ReactiveTransactionManager */ public interface TransactionManagementConfigurer { @@ -76,7 +79,9 @@ public interface TransactionManagementConfigurer { * container as all {@code PlatformTransactionManager} implementations take advantage * of Spring lifecycle callbacks such as {@code InitializingBean} and * {@code BeanFactoryAware}. + * @return a {@link org.springframework.transaction.PlatformTransactionManager} or + * {@link org.springframework.transaction.ReactiveTransactionManager} implementation */ - PlatformTransactionManager annotationDrivenTransactionManager(); + TransactionManager annotationDrivenTransactionManager(); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index 262f227817da..7c3a4797cb53 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -23,17 +23,25 @@ import io.vavr.control.Try; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; import org.springframework.core.NamedThreadLocal; +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.lang.Nullable; import org.springframework.transaction.NoTransactionException; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.ReactiveTransaction; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.TransactionManager; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionSystemException; +import org.springframework.transaction.reactive.TransactionContextManager; import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -86,6 +94,12 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init private static final boolean vavrPresent = ClassUtils.isPresent( "io.vavr.control.Try", TransactionAspectSupport.class.getClassLoader()); + /** + * Reactive Streams API present on the classpath? + */ + private static final boolean reactiveStreamsPresent = + ClassUtils.isPresent("org.reactivestreams.Publisher", TransactionAspectSupport.class.getClassLoader()); + /** * Holder to support the {@code currentTransactionStatus()} method, * and to support communication between different cooperating advices @@ -136,11 +150,14 @@ public static TransactionStatus currentTransactionStatus() throws NoTransactionE protected final Log logger = LogFactory.getLog(getClass()); + @Nullable + private final ReactiveAdapterRegistry reactiveAdapterRegistry; + @Nullable private String transactionManagerBeanName; @Nullable - private PlatformTransactionManager transactionManager; + private TransactionManager transactionManager; @Nullable private TransactionAttributeSource transactionAttributeSource; @@ -148,12 +165,23 @@ public static TransactionStatus currentTransactionStatus() throws NoTransactionE @Nullable private BeanFactory beanFactory; - private final ConcurrentMap transactionManagerCache = - new ConcurrentReferenceHashMap<>(4); + private final ConcurrentMap transactionManagerCache = new ConcurrentReferenceHashMap<>(4); + + + protected TransactionAspectSupport() { + if (reactiveStreamsPresent) { + this.reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance(); + } + else { + this.reactiveAdapterRegistry = null; + } + } /** * Specify the name of the default transaction manager bean. + * This can either point to a traditional {@link PlatformTransactionManager} or a + * {@link ReactiveTransactionManager} for reactive transaction management. */ public void setTransactionManagerBeanName(@Nullable String transactionManagerBeanName) { this.transactionManagerBeanName = transactionManagerBeanName; @@ -169,20 +197,24 @@ protected final String getTransactionManagerBeanName() { /** * Specify the default transaction manager to use to drive transactions. + * This can either be a traditional {@link PlatformTransactionManager} or a + * {@link ReactiveTransactionManager} for reactive transaction management. *

The default transaction manager will be used if a qualifier * has not been declared for a given transaction or if an explicit name for the * default transaction manager bean has not been specified. * @see #setTransactionManagerBeanName */ - public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) { + public void setTransactionManager(@Nullable TransactionManager transactionManager) { this.transactionManager = transactionManager; } /** * Return the default transaction manager, or {@code null} if unknown. + * This can either be a traditional {@link PlatformTransactionManager} or a + * {@link ReactiveTransactionManager} for reactive transaction management. */ @Nullable - public PlatformTransactionManager getTransactionManager() { + public TransactionManager getTransactionManager() { return this.transactionManager; } @@ -285,6 +317,13 @@ public void afterPropertiesSet() { protected Object invokeWithinTransaction(Method method, @Nullable Class targetClass, final InvocationCallback invocation) throws Throwable { + if (this.reactiveAdapterRegistry != null) { + ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); + if (adapter != null) { + return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation); + } + } + // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); @@ -398,7 +437,7 @@ protected void clearTransactionManagerCache() { protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) { // Do not attempt to lookup tx manager if no tx attributes are set if (txAttr == null || this.beanFactory == null) { - return getTransactionManager(); + return asPlatformTransactionManager(getTransactionManager()); } String qualifier = txAttr.getQualifier(); @@ -409,9 +448,9 @@ else if (StringUtils.hasText(this.transactionManagerBeanName)) { return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName); } else { - PlatformTransactionManager defaultTransactionManager = getTransactionManager(); + PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager()); if (defaultTransactionManager == null) { - defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY); + defaultTransactionManager = asPlatformTransactionManager(this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY)); if (defaultTransactionManager == null) { defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); this.transactionManagerCache.putIfAbsent( @@ -423,7 +462,7 @@ else if (StringUtils.hasText(this.transactionManagerBeanName)) { } private PlatformTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) { - PlatformTransactionManager txManager = this.transactionManagerCache.get(qualifier); + PlatformTransactionManager txManager = asPlatformTransactionManager(this.transactionManagerCache.get(qualifier)); if (txManager == null) { txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType( beanFactory, PlatformTransactionManager.class, qualifier); @@ -432,6 +471,18 @@ private PlatformTransactionManager determineQualifiedTransactionManager(BeanFact return txManager; } + + @Nullable + private PlatformTransactionManager asPlatformTransactionManager(@Nullable Object transactionManager) { + if (transactionManager == null || transactionManager instanceof PlatformTransactionManager) { + return (PlatformTransactionManager) transactionManager; + } + else { + throw new IllegalStateException( + "Specified transaction manager is not a PlatformTransactionManager: " + transactionManager); + } + } + private String methodIdentification(Method method, @Nullable Class targetClass, @Nullable TransactionAttribute txAttr) { @@ -614,7 +665,7 @@ protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) { /** - * Opaque object used to hold Transaction information. Subclasses + * Opaque object used to hold transaction information. Subclasses * must pass it back to methods on this class, but not see its internals. */ protected static final class TransactionInfo { @@ -753,4 +804,286 @@ public static Object evaluateTryFailure(Object retVal, TransactionAttribute txAt } } + + /** + * Delegate for Reactor-based management of transactional methods with a + * reactive return type. + */ + private class ReactiveTransactionSupport { + + private final ReactiveAdapter adapter; + + public ReactiveTransactionSupport(ReactiveAdapter adapter) { + this.adapter = adapter; + } + + @SuppressWarnings("unchecked") + public Object invokeWithinTransaction(Method method, @Nullable Class targetClass, InvocationCallback invocation) { + // If the transaction attribute is null, the method is non-transactional. + TransactionAttributeSource tas = getTransactionAttributeSource(); + TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); + ReactiveTransactionManager tm = determineTransactionManager(txAttr); + String joinpointIdentification = methodIdentification(method, targetClass, txAttr); + + // Optimize for Mono + if (Mono.class.isAssignableFrom(method.getReturnType())) { + return TransactionContextManager.currentContext().flatMap(context -> { + // Standard transaction demarcation with getTransaction and commit/rollback calls. + Mono txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); + return txInfo.flatMap(it -> { + try { + // This is an around advice: Invoke the next interceptor in the chain. + // This will normally result in a target object being invoked. + Mono retVal = (Mono) invocation.proceedWithInvocation(); + return retVal + .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))).materialize() + .flatMap(signal -> { + if (signal.isOnComplete() || signal.isOnNext()) { + return commitTransactionAfterReturning(it).thenReturn(signal); + } + return Mono.just(signal); + }).dematerialize(); + } + catch (Throwable ex) { + // target invocation exception + return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); + } + }); + }).subscriberContext(TransactionContextManager.getOrCreateContext()) + .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + } + + return TransactionContextManager.currentContext().flatMapMany(context -> { + // Standard transaction demarcation with getTransaction and commit/rollback calls. + Mono txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); + return txInfo.flatMapMany(it -> { + try { + // This is an around advice: Invoke the next interceptor in the chain. + // This will normally result in a target object being invoked. + Flux retVal = Flux.from(this.adapter.toPublisher(invocation.proceedWithInvocation())); + return retVal + .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))) + .materialize().flatMap(signal -> { + if (signal.isOnComplete()) { + return commitTransactionAfterReturning(it).materialize(); + } + return Mono.just(signal); + }).dematerialize(); + } + catch (Throwable ex) { + // target invocation exception + return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); + } + }); + }).subscriberContext(TransactionContextManager.getOrCreateContext()) + .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + } + + @Nullable + private ReactiveTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) { + // Do not attempt to lookup tx manager if no tx attributes are set + if (txAttr == null || beanFactory == null) { + return asReactiveTransactionManager(getTransactionManager()); + } + + String qualifier = txAttr.getQualifier(); + if (StringUtils.hasText(qualifier)) { + return determineQualifiedTransactionManager(beanFactory, qualifier); + } + else if (StringUtils.hasText(transactionManagerBeanName)) { + return determineQualifiedTransactionManager(beanFactory, transactionManagerBeanName); + } + else { + ReactiveTransactionManager defaultTransactionManager = asReactiveTransactionManager(getTransactionManager()); + if (defaultTransactionManager == null) { + defaultTransactionManager = asReactiveTransactionManager(transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY)); + if (defaultTransactionManager == null) { + defaultTransactionManager = beanFactory.getBean(ReactiveTransactionManager.class); + transactionManagerCache.putIfAbsent( + DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); + } + } + return defaultTransactionManager; + } + } + + private ReactiveTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) { + ReactiveTransactionManager txManager = asReactiveTransactionManager(transactionManagerCache.get(qualifier)); + if (txManager == null) { + txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType( + beanFactory, ReactiveTransactionManager.class, qualifier); + transactionManagerCache.putIfAbsent(qualifier, txManager); + } + return txManager; + } + + @Nullable + private ReactiveTransactionManager asReactiveTransactionManager(@Nullable Object transactionManager) { + if (transactionManager == null || transactionManager instanceof ReactiveTransactionManager) { + return (ReactiveTransactionManager) transactionManager; + } + else { + throw new IllegalStateException( + "Specified transaction manager is not a ReactiveTransactionManager: " + transactionManager); + } + } + + @SuppressWarnings("serial") + private Mono createTransactionIfNecessary(@Nullable ReactiveTransactionManager tm, + @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { + + // If no name specified, apply method identification as transaction name. + if (txAttr != null && txAttr.getName() == null) { + txAttr = new DelegatingTransactionAttribute(txAttr) { + @Override + public String getName() { + return joinpointIdentification; + } + }; + } + TransactionAttribute attrToUse = txAttr; + + Mono tx = Mono.empty(); + if (txAttr != null) { + if (tm != null) { + tx = tm.getReactiveTransaction(txAttr); + } + else { + if (logger.isDebugEnabled()) { + logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + + "] because no transaction manager has been configured"); + } + } + } + + return tx.map(it -> prepareTransactionInfo(tm, attrToUse, joinpointIdentification, it)) + .switchIfEmpty(Mono.defer(() -> Mono.just(prepareTransactionInfo(tm, attrToUse, joinpointIdentification, null)))); + } + + private ReactiveTransactionInfo prepareTransactionInfo(@Nullable ReactiveTransactionManager tm, + @Nullable TransactionAttribute txAttr, String joinpointIdentification, + @Nullable ReactiveTransaction transaction) { + + ReactiveTransactionInfo txInfo = new ReactiveTransactionInfo(tm, txAttr, joinpointIdentification); + if (txAttr != null) { + // We need a transaction for this method... + if (logger.isTraceEnabled()) { + logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); + } + // The transaction manager will flag an error if an incompatible tx already exists. + txInfo.newReactiveTransaction(transaction); + } + else { + // The TransactionInfo.hasTransaction() method will return false. We created it only + // to preserve the integrity of the ThreadLocal stack maintained in this class. + if (logger.isTraceEnabled()) { + logger.trace("Don't need to create transaction for [" + joinpointIdentification + + "]: This method isn't transactional."); + } + } + + return txInfo; + } + + private Mono commitTransactionAfterReturning(@Nullable ReactiveTransactionInfo txInfo) { + if (txInfo != null && txInfo.getReactiveTransaction() != null) { + if (logger.isTraceEnabled()) { + logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); + } + return txInfo.getTransactionManager().commit(txInfo.getReactiveTransaction()); + } + return Mono.empty(); + } + + private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactionInfo txInfo, Throwable ex) { + if (txInfo != null && txInfo.getReactiveTransaction() != null) { + if (logger.isTraceEnabled()) { + logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + + "] after exception: " + ex); + } + if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { + return txInfo.getTransactionManager().rollback(txInfo.getReactiveTransaction()).onErrorMap(ex2 -> { + logger.error("Application exception overridden by rollback exception", ex); + if (ex2 instanceof TransactionSystemException) { + ((TransactionSystemException) ex2).initApplicationException(ex); + } + return ex2; + } + ); + } + else { + // We don't roll back on this exception. + // Will still roll back if TransactionStatus.isRollbackOnly() is true. + return txInfo.getTransactionManager().commit(txInfo.getReactiveTransaction()).onErrorMap(ex2 -> { + logger.error("Application exception overridden by commit exception", ex); + if (ex2 instanceof TransactionSystemException) { + ((TransactionSystemException) ex2).initApplicationException(ex); + } + return ex2; + } + ); + } + } + return Mono.empty(); + } + } + + + /** + * Opaque object used to hold transaction information for reactive methods. + */ + private static final class ReactiveTransactionInfo { + + @Nullable + private final ReactiveTransactionManager transactionManager; + + @Nullable + private final TransactionAttribute transactionAttribute; + + private final String joinpointIdentification; + + @Nullable + private ReactiveTransaction reactiveTransaction; + + public ReactiveTransactionInfo(@Nullable ReactiveTransactionManager transactionManager, + @Nullable TransactionAttribute transactionAttribute, String joinpointIdentification) { + + this.transactionManager = transactionManager; + this.transactionAttribute = transactionAttribute; + this.joinpointIdentification = joinpointIdentification; + } + + public ReactiveTransactionManager getTransactionManager() { + Assert.state(this.transactionManager != null, "No ReactiveTransactionManager set"); + return this.transactionManager; + } + + @Nullable + public TransactionAttribute getTransactionAttribute() { + return this.transactionAttribute; + } + + /** + * Return a String representation of this joinpoint (usually a Method call) + * for use in logging. + */ + public String getJoinpointIdentification() { + return this.joinpointIdentification; + } + + public void newReactiveTransaction(@Nullable ReactiveTransaction transaction) { + this.reactiveTransaction = transaction; + } + + @Nullable + public ReactiveTransaction getReactiveTransaction() { + return this.reactiveTransaction; + } + + @Override + public String toString() { + return (this.transactionAttribute != null ? this.transactionAttribute.toString() : "No transaction"); + } + } + } diff --git a/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java new file mode 100644 index 000000000000..4e7213c84b66 --- /dev/null +++ b/spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java @@ -0,0 +1,406 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.interceptor; + +import java.lang.reflect.Method; + +import org.junit.Before; +import org.junit.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import org.springframework.transaction.CannotCreateTransactionException; +import org.springframework.transaction.ReactiveTransaction; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.TransactionSystemException; +import org.springframework.transaction.UnexpectedRollbackException; +import org.springframework.transaction.reactive.TransactionContext; + +import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Fail.fail; +import static org.mockito.Mockito.*; + +/** + * Abstract support class to test {@link TransactionAspectSupport} with reactive methods. + * + * @author Mark Paluch + * @author Juergen Hoeller + */ +public abstract class AbstractReactiveTransactionAspectTests { + + protected Method exceptionalMethod; + + protected Method getNameMethod; + + protected Method setNameMethod; + + + @Before + public void setup() throws Exception { + exceptionalMethod = TestBean.class.getMethod("exceptional", Throwable.class); + getNameMethod = TestBean.class.getMethod("getName"); + setNameMethod = TestBean.class.getMethod("setName", String.class); + } + + + @Test + public void noTransaction() throws Exception { + ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class); + + DefaultTestBean tb = new DefaultTestBean(); + TransactionAttributeSource tas = new MapTransactionAttributeSource(); + + // All the methods in this class use the advised() template method + // to obtain a transaction object, configured with the when PlatformTransactionManager + // and transaction attribute source + TestBean itb = (TestBean) advised(tb, rtm, tas); + + checkReactiveTransaction(false); + itb.getName(); + checkReactiveTransaction(false); + + // expect no calls + verifyZeroInteractions(rtm); + } + + /** + * Check that a transaction is created and committed. + */ + @Test + public void transactionShouldSucceed() throws Exception { + TransactionAttribute txatt = new DefaultTransactionAttribute(); + + MapTransactionAttributeSource tas = new MapTransactionAttributeSource(); + tas.register(getNameMethod, txatt); + + ReactiveTransaction status = mock(ReactiveTransaction.class); + ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class); + // expect a transaction + when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status)); + when(rtm.commit(status)).thenReturn(Mono.empty()); + + DefaultTestBean tb = new DefaultTestBean(); + TestBean itb = (TestBean) advised(tb, rtm, tas); + + itb.getName() + .as(StepVerifier::create) + .verifyComplete(); + + verify(rtm).commit(status); + } + + /** + * Check that two transactions are created and committed. + */ + @Test + public void twoTransactionsShouldSucceed() throws Exception { + TransactionAttribute txatt = new DefaultTransactionAttribute(); + + MapTransactionAttributeSource tas1 = new MapTransactionAttributeSource(); + tas1.register(getNameMethod, txatt); + MapTransactionAttributeSource tas2 = new MapTransactionAttributeSource(); + tas2.register(setNameMethod, txatt); + + ReactiveTransaction status = mock(ReactiveTransaction.class); + ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class); + // expect a transaction + when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status)); + when(rtm.commit(status)).thenReturn(Mono.empty()); + + DefaultTestBean tb = new DefaultTestBean(); + TestBean itb = (TestBean) advised(tb, rtm, new TransactionAttributeSource[] {tas1, tas2}); + + itb.getName() + .as(StepVerifier::create) + .verifyComplete(); + + Mono.from(itb.setName("myName")) + .as(StepVerifier::create) + .verifyComplete(); + + verify(rtm, times(2)).commit(status); + } + + /** + * Check that a transaction is created and committed. + */ + @Test + public void transactionShouldSucceedWithNotNew() throws Exception { + TransactionAttribute txatt = new DefaultTransactionAttribute(); + + MapTransactionAttributeSource tas = new MapTransactionAttributeSource(); + tas.register(getNameMethod, txatt); + + ReactiveTransaction status = mock(ReactiveTransaction.class); + ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class); + // expect a transaction + when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status)); + when(rtm.commit(status)).thenReturn(Mono.empty()); + + DefaultTestBean tb = new DefaultTestBean(); + TestBean itb = (TestBean) advised(tb, rtm, tas); + + itb.getName() + .as(StepVerifier::create) + .verifyComplete(); + + verify(rtm).commit(status); + } + + + @Test + public void rollbackOnCheckedException() throws Throwable { + doTestRollbackOnException(new Exception(), true, false); + } + + @Test + public void noRollbackOnCheckedException() throws Throwable { + doTestRollbackOnException(new Exception(), false, false); + } + + @Test + public void rollbackOnUncheckedException() throws Throwable { + doTestRollbackOnException(new RuntimeException(), true, false); + } + + @Test + public void noRollbackOnUncheckedException() throws Throwable { + doTestRollbackOnException(new RuntimeException(), false, false); + } + + @Test + public void rollbackOnCheckedExceptionWithRollbackException() throws Throwable { + doTestRollbackOnException(new Exception(), true, true); + } + + @Test + public void noRollbackOnCheckedExceptionWithRollbackException() throws Throwable { + doTestRollbackOnException(new Exception(), false, true); + } + + @Test + public void rollbackOnUncheckedExceptionWithRollbackException() throws Throwable { + doTestRollbackOnException(new RuntimeException(), true, true); + } + + @Test + public void noRollbackOnUncheckedExceptionWithRollbackException() throws Throwable { + doTestRollbackOnException(new RuntimeException(), false, true); + } + + /** + * Check that the when exception thrown by the target can produce the + * desired behavior with the appropriate transaction attribute. + * @param ex exception to be thrown by the target + * @param shouldRollback whether this should cause a transaction rollback + */ + @SuppressWarnings("serial") + protected void doTestRollbackOnException( + final Exception ex, final boolean shouldRollback, boolean rollbackException) throws Exception { + + TransactionAttribute txatt = new DefaultTransactionAttribute() { + @Override + public boolean rollbackOn(Throwable t) { + assertThat(t).isSameAs(ex); + return shouldRollback; + } + }; + + Method m = exceptionalMethod; + MapTransactionAttributeSource tas = new MapTransactionAttributeSource(); + tas.register(m, txatt); + + ReactiveTransaction status = mock(ReactiveTransaction.class); + ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class); + // Gets additional call(s) from TransactionControl + + when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status)); + + TransactionSystemException tex = new TransactionSystemException("system exception"); + if (rollbackException) { + if (shouldRollback) { + when(rtm.rollback(status)).thenReturn(Mono.error(tex)); + } + else { + when(rtm.commit(status)).thenReturn(Mono.error(tex)); + } + }else{ + when(rtm.commit(status)).thenReturn(Mono.empty()); + when(rtm.rollback(status)).thenReturn(Mono.empty()); + } + + DefaultTestBean tb = new DefaultTestBean(); + TestBean itb = (TestBean) advised(tb, rtm, tas); + + itb.exceptional(ex) + .as(StepVerifier::create) + .expectErrorSatisfies(actual -> { + + if (rollbackException) { + assertThat(actual).isEqualTo(tex); + } else { + assertThat(actual).isEqualTo(ex); + } + }).verify(); + + if (!rollbackException) { + if (shouldRollback) { + verify(rtm).rollback(status); + } + else { + verify(rtm).commit(status); + } + } + } + + /** + * Simulate a transaction infrastructure failure. + * Shouldn't invoke target method. + */ + @Test + public void cannotCreateTransaction() throws Exception { + TransactionAttribute txatt = new DefaultTransactionAttribute(); + + Method m = getNameMethod; + MapTransactionAttributeSource tas = new MapTransactionAttributeSource(); + tas.register(m, txatt); + + ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class); + // Expect a transaction + CannotCreateTransactionException ex = new CannotCreateTransactionException("foobar", null); + when(rtm.getReactiveTransaction(txatt)).thenThrow(ex); + + DefaultTestBean tb = new DefaultTestBean() { + @Override + public Mono getName() { + throw new UnsupportedOperationException( + "Shouldn't have invoked target method when couldn't create transaction for transactional method"); + } + }; + TestBean itb = (TestBean) advised(tb, rtm, tas); + + itb.getName() + .as(StepVerifier::create) + .expectError(CannotCreateTransactionException.class) + .verify(); + } + + /** + * Simulate failure of the underlying transaction infrastructure to commit. + * Check that the target method was invoked, but that the transaction + * infrastructure exception was thrown to the client + */ + @Test + public void cannotCommitTransaction() throws Exception { + TransactionAttribute txatt = new DefaultTransactionAttribute(); + + Method m = setNameMethod; + MapTransactionAttributeSource tas = new MapTransactionAttributeSource(); + tas.register(m, txatt); + // Method m2 = getNameMethod; + // No attributes for m2 + + ReactiveTransactionManager rtm = mock(ReactiveTransactionManager.class); + + ReactiveTransaction status = mock(ReactiveTransaction.class); + when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status)); + UnexpectedRollbackException ex = new UnexpectedRollbackException("foobar", null); + when(rtm.commit(status)).thenReturn(Mono.error(ex)); + + DefaultTestBean tb = new DefaultTestBean(); + TestBean itb = (TestBean) advised(tb, rtm, tas); + + String name = "new name"; + + Mono.from(itb.setName(name)) + .as(StepVerifier::create) + .expectError(UnexpectedRollbackException.class) + .verify(); + + // Should have invoked target and changed name + + itb.getName() + .as(StepVerifier::create) + .expectNext(name) + .verifyComplete(); + } + + private void checkReactiveTransaction(boolean expected) { + Mono.subscriberContext().handle((context, sink) -> { + if (context.hasKey(TransactionContext.class) != expected){ + fail("Should have thrown NoTransactionException"); + } + sink.complete(); + }).block(); + } + + + protected Object advised( + Object target, ReactiveTransactionManager rtm, TransactionAttributeSource[] tas) throws Exception { + + return advised(target, rtm, new CompositeTransactionAttributeSource(tas)); + } + + /** + * Subclasses must implement this to create an advised object based on the + * when target. In the case of AspectJ, the advised object will already + * have been created, as there's no distinction between target and proxy. + * In the case of Spring's own AOP framework, a proxy must be created + * using a suitably configured transaction interceptor + * @param target target if there's a distinct target. If not (AspectJ), + * return target. + * @return transactional advised object + */ + protected abstract Object advised( + Object target, ReactiveTransactionManager rtm, TransactionAttributeSource tas) throws Exception; + + + public interface TestBean { + + Mono exceptional(Throwable t); + + Mono getName(); + + Publisher setName(String name); + } + + + public class DefaultTestBean implements TestBean { + + private String name; + + @Override + public Mono getName() { + return Mono.justOrEmpty(name); + } + + @Override + public Mono setName(String name) { + return Mono.fromRunnable(() -> this.name = name); + } + + @Override + public Mono exceptional(Throwable t) { + if (t != null) { + return Mono.error(t); + } + return Mono.empty(); + } + } + +} diff --git a/spring-tx/src/test/java/org/springframework/transaction/interceptor/ReactiveTransactionInterceptorTests.java b/spring-tx/src/test/java/org/springframework/transaction/interceptor/ReactiveTransactionInterceptorTests.java new file mode 100644 index 000000000000..902a4d149e7e --- /dev/null +++ b/spring-tx/src/test/java/org/springframework/transaction/interceptor/ReactiveTransactionInterceptorTests.java @@ -0,0 +1,112 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.interceptor; + +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.transaction.ReactiveTransactionManager; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for {@link TransactionInterceptor} with reactive methods. + * + * @author Mark Paluch + */ +public class ReactiveTransactionInterceptorTests extends AbstractReactiveTransactionAspectTests { + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + + @Override + protected Object advised(Object target, ReactiveTransactionManager ptm, TransactionAttributeSource[] tas) { + TransactionInterceptor ti = new TransactionInterceptor(); + ti.setTransactionManager(ptm); + ti.setTransactionAttributeSources(tas); + + ProxyFactory pf = new ProxyFactory(target); + pf.addAdvice(0, ti); + return pf.getProxy(); + } + + /** + * Template method to create an advised object given the + * target object and transaction setup. + * Creates a TransactionInterceptor and applies it. + */ + @Override + protected Object advised(Object target, ReactiveTransactionManager ptm, TransactionAttributeSource tas) { + TransactionInterceptor ti = new TransactionInterceptor(); + ti.setTransactionManager(ptm); + + assertThat(ti.getTransactionManager()).isEqualTo(ptm); + ti.setTransactionAttributeSource(tas); + assertThat(ti.getTransactionAttributeSource()).isEqualTo(tas); + + ProxyFactory pf = new ProxyFactory(target); + pf.addAdvice(0, ti); + return pf.getProxy(); + } + + private TransactionInterceptor createTransactionInterceptor(BeanFactory beanFactory, + String transactionManagerName, ReactiveTransactionManager transactionManager) { + + TransactionInterceptor ti = new TransactionInterceptor(); + if (beanFactory != null) { + ti.setBeanFactory(beanFactory); + } + if (transactionManagerName != null) { + ti.setTransactionManagerBeanName(transactionManagerName); + + } + if (transactionManager != null) { + ti.setTransactionManager(transactionManager); + } + ti.setTransactionAttributeSource(new NameMatchTransactionAttributeSource()); + ti.afterPropertiesSet(); + return ti; + } + + private TransactionInterceptor transactionInterceptorWithTransactionManager( + ReactiveTransactionManager transactionManager, BeanFactory beanFactory) { + + return createTransactionInterceptor(beanFactory, null, transactionManager); + } + + private TransactionInterceptor transactionInterceptorWithTransactionManagerName( + String transactionManagerName, BeanFactory beanFactory) { + + return createTransactionInterceptor(beanFactory, transactionManagerName, null); + } + + private TransactionInterceptor simpleTransactionInterceptor(BeanFactory beanFactory) { + return createTransactionInterceptor(beanFactory, null, null); + } + + private ReactiveTransactionManager associateTransactionManager(BeanFactory beanFactory, String name) { + ReactiveTransactionManager transactionManager = mock(ReactiveTransactionManager.class); + when(beanFactory.containsBean(name)).thenReturn(true); + when(beanFactory.getBean(name, ReactiveTransactionManager.class)).thenReturn(transactionManager); + return transactionManager; + } + +} \ No newline at end of file