16
16
17
17
package org .springframework .integration .jdbc .channel ;
18
18
19
+ import java .time .Duration ;
19
20
import java .util .Optional ;
20
21
import java .util .concurrent .Executor ;
21
22
22
23
import org .jspecify .annotations .Nullable ;
23
24
24
25
import org .springframework .core .log .LogAccessor ;
26
+ import org .springframework .core .retry .RetryException ;
27
+ import org .springframework .core .retry .RetryPolicy ;
28
+ import org .springframework .core .retry .RetryTemplate ;
29
+ import org .springframework .core .retry .Retryable ;
25
30
import org .springframework .core .task .SimpleAsyncTaskExecutor ;
26
31
import org .springframework .integration .channel .AbstractSubscribableChannel ;
27
32
import org .springframework .integration .dispatcher .MessageDispatcher ;
28
33
import org .springframework .integration .dispatcher .UnicastingDispatcher ;
29
34
import org .springframework .integration .jdbc .store .JdbcChannelMessageStore ;
30
35
import org .springframework .messaging .Message ;
31
36
import org .springframework .messaging .MessageHandler ;
32
- import org .springframework .retry .support .RetryTemplate ;
33
37
import org .springframework .transaction .PlatformTransactionManager ;
34
38
import org .springframework .transaction .support .TransactionTemplate ;
35
39
import org .springframework .util .Assert ;
@@ -69,7 +73,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
69
73
70
74
private @ Nullable TransactionTemplate transactionTemplate ;
71
75
72
- private RetryTemplate retryTemplate = RetryTemplate .builder ().maxAttempts (1 ).build ();
76
+ private RetryTemplate retryTemplate =
77
+ new RetryTemplate (RetryPolicy .builder ().maxAttempts (1 ).delay (Duration .ZERO ).build ());
73
78
74
79
private ErrorHandler errorHandler = ReflectionUtils ::rethrowRuntimeException ;
75
80
@@ -117,7 +122,7 @@ public void setTransactionManager(PlatformTransactionManager transactionManager)
117
122
}
118
123
119
124
/**
120
- * Set the retry template to use for retries in case of exception in downstream processing
125
+ * Set the retry template to use it for retries in case of exception in downstream processing
121
126
* @param retryTemplate The retry template to use
122
127
* @since 6.0.5
123
128
* @see RetryTemplate
@@ -207,7 +212,7 @@ private Optional<?> doPollAndDispatchMessage() {
207
212
if (this .hasHandlers ) {
208
213
TransactionTemplate transactionTemplateToUse = this .transactionTemplate ;
209
214
if (transactionTemplateToUse != null ) {
210
- return this . retryTemplate . execute ( context ->
215
+ return executeWithRetry (() ->
211
216
transactionTemplateToUse .execute (status ->
212
217
pollMessage ()
213
218
.filter (message -> {
@@ -221,12 +226,26 @@ private Optional<?> doPollAndDispatchMessage() {
221
226
}
222
227
else {
223
228
return pollMessage ()
224
- .map (message -> this . retryTemplate . execute ( context -> dispatch (message )));
229
+ .map (message -> executeWithRetry (() -> dispatch (message )));
225
230
}
226
231
}
227
232
return Optional .empty ();
228
233
}
229
234
235
+ @ SuppressWarnings ("NullAway" ) // Never null, according to the logic in the 'doPollAndDispatchMessage()'.
236
+ private <T > T executeWithRetry (Retryable <T > retryable ) {
237
+ try {
238
+ return this .retryTemplate .execute (retryable );
239
+ }
240
+ catch (RetryException ex ) {
241
+ Throwable cause = ex .getCause ();
242
+ if (cause instanceof RuntimeException runtimeException ) {
243
+ throw runtimeException ;
244
+ }
245
+ throw new IllegalStateException (cause );
246
+ }
247
+ }
248
+
230
249
private Optional <Message <?>> pollMessage () {
231
250
return Optional .ofNullable (this .jdbcChannelMessageStore .pollMessageFromGroup (this .groupId ));
232
251
}
0 commit comments