diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java index 4fa478305ba..bef4ccd691e 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java @@ -16,12 +16,17 @@ package org.springframework.integration.jdbc.channel; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.Executor; import org.jspecify.annotations.Nullable; import org.springframework.core.log.LogAccessor; +import org.springframework.core.retry.RetryException; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; +import org.springframework.core.retry.Retryable; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.integration.channel.AbstractSubscribableChannel; import org.springframework.integration.dispatcher.MessageDispatcher; @@ -29,7 +34,6 @@ import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; -import org.springframework.retry.support.RetryTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.Assert; @@ -69,7 +73,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel private @Nullable TransactionTemplate transactionTemplate; - private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build(); + private RetryTemplate retryTemplate = + new RetryTemplate(RetryPolicy.builder().maxAttempts(1).delay(Duration.ZERO).build()); private ErrorHandler errorHandler = ReflectionUtils::rethrowRuntimeException; @@ -117,7 +122,7 @@ public void setTransactionManager(PlatformTransactionManager transactionManager) } /** - * Set the retry template to use for retries in case of exception in downstream processing + * Set the retry template to use it for retries in case of exception in downstream processing * @param retryTemplate The retry template to use * @since 6.0.5 * @see RetryTemplate @@ -207,7 +212,7 @@ private Optional doPollAndDispatchMessage() { if (this.hasHandlers) { TransactionTemplate transactionTemplateToUse = this.transactionTemplate; if (transactionTemplateToUse != null) { - return this.retryTemplate.execute(context -> + return executeWithRetry(() -> transactionTemplateToUse.execute(status -> pollMessage() .filter(message -> { @@ -221,12 +226,26 @@ private Optional doPollAndDispatchMessage() { } else { return pollMessage() - .map(message -> this.retryTemplate.execute(context -> dispatch(message))); + .map(message -> executeWithRetry(() -> dispatch(message))); } } return Optional.empty(); } + @SuppressWarnings("NullAway") // Never null, according to the logic in the 'doPollAndDispatchMessage()'. + private T executeWithRetry(Retryable retryable) { + try { + return this.retryTemplate.execute(retryable); + } + catch (RetryException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof RuntimeException runtimeException) { + throw runtimeException; + } + throw new IllegalStateException(cause); + } + } + private Optional> pollMessage() { return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)); } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java index 71bd80f08ba..ded06169799 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/postgres/PostgresChannelMessageTableSubscriberTests.java @@ -43,6 +43,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.jdbc.channel.PgConnectionSupplier; import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber; @@ -57,7 +59,6 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.GenericMessage; -import org.springframework.retry.support.RetryTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -246,7 +247,8 @@ void testRetryOnErrorDuringDispatch(boolean transactionsEnabled) throws Interrup AtomicInteger actualTries = new AtomicInteger(); int maxAttempts = 2; - postgresSubscribableChannel.setRetryTemplate(RetryTemplate.builder().maxAttempts(maxAttempts).build()); + postgresSubscribableChannel.setRetryTemplate( + new RetryTemplate(RetryPolicy.builder().maxAttempts(maxAttempts).build())); if (transactionsEnabled) { postgresSubscribableChannel.setTransactionManager(transactionManager);