Skip to content

Commit 06b9fc1

Browse files
BURJA Lucianlburja
BURJA Lucian
authored andcommitted
Store attemptFrequency and blockAfterAttempts, to allow overriding the default values per schedule request
1 parent b6f8a11 commit 06b9fc1

File tree

6 files changed

+105
-8
lines changed

6 files changed

+105
-8
lines changed

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java

+12
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,18 @@ static final class Builder {
158158
12,
159159
"Add flush index to support ordering",
160160
"CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)"));
161+
migrations.put(
162+
13,
163+
new Migration(
164+
13,
165+
"Add attemptFrequency",
166+
"ALTER TABLE TXNO_OUTBOX ADD COLUMN attemptFrequency INT"));
167+
migrations.put(
168+
14,
169+
new Migration(
170+
14,
171+
"Add blockAfterAttempts",
172+
"ALTER TABLE TXNO_OUTBOX ADD COLUMN blockAfterAttempts INT"));
161173
}
162174

163175
Builder setMigration(Migration migration) {

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.sql.SQLTimeoutException;
1212
import java.sql.Statement;
1313
import java.sql.Timestamp;
14+
import java.time.Duration;
1415
import java.time.Instant;
1516
import java.util.ArrayList;
1617
import java.util.Collection;
@@ -38,7 +39,7 @@
3839
public class DefaultPersistor implements Persistor, Validatable {
3940

4041
private static final String ALL_FIELDS =
41-
"id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version";
42+
"id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, attemptFrequency, blockAfterAttempts";
4243

4344
/**
4445
* @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write
@@ -115,7 +116,7 @@ public void save(Transaction tx, TransactionOutboxEntry entry)
115116
+ tableName
116117
+ " ("
117118
+ ALL_FIELDS
118-
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
119+
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
119120
var writer = new StringWriter();
120121
serializer.serializeInvocation(entry.getInvocation(), writer);
121122
if (entry.getTopic() != null) {
@@ -201,6 +202,8 @@ private void setupInsert(
201202
stmt.setBoolean(9, entry.isBlocked());
202203
stmt.setBoolean(10, entry.isProcessed());
203204
stmt.setInt(11, entry.getVersion());
205+
stmt.setObject(12, entry.getAttemptFrequency() == null ? null : entry.getAttemptFrequency().toMillis());
206+
stmt.setObject(13, entry.getBlockAfterAttempts());
204207
}
205208

206209
@Override
@@ -392,6 +395,12 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio
392395
.blocked(rs.getBoolean("blocked"))
393396
.processed(rs.getBoolean("processed"))
394397
.version(rs.getInt("version"))
398+
.attemptFrequency(rs.getObject("attemptFrequency") == null
399+
? null
400+
: Duration.ofMillis(rs.getInt("attemptFrequency")))
401+
.blockAfterAttempts(rs.getObject("blockAfterAttempts") == null
402+
? null
403+
: rs.getInt("blockAfterAttempts"))
395404
.build();
396405
log.debug("Found {}", entry);
397406
return entry;

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java

+14
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,20 @@ interface ParameterizedScheduleBuilder {
394394
*/
395395
ParameterizedScheduleBuilder delayForAtLeast(Duration duration);
396396

397+
/**
398+
* Use an attemptFrequency that is different from the default set on the {@link TransactionOutbox}.
399+
* @param attemptFrequency How often tasks should be re-attempted.
400+
* @return Builder.
401+
*/
402+
ParameterizedScheduleBuilder attemptFrequency(Duration attemptFrequency);
403+
404+
/**
405+
* Use a blockAfterAttempts that is different from the default set on the {@link TransactionOutbox}.
406+
* @param blockAfterAttempts How many attempts a task should be retried before it is permanently blocked.
407+
* @return Builder.
408+
*/
409+
ParameterizedScheduleBuilder blockAfterAttempts(Integer blockAfterAttempts);
410+
397411
/**
398412
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters
399413
* to the request as configured using {@link TransactionOutbox#with()}.

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java

+21
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static java.util.stream.Collectors.joining;
44

5+
import java.time.Duration;
56
import java.time.Instant;
67
import java.util.Arrays;
78
import lombok.*;
@@ -83,6 +84,26 @@ public class TransactionOutboxEntry implements Validatable {
8384
@Setter
8485
private int attempts;
8586

87+
/**
88+
* @param attemptFrequency How often tasks should be re-attempted.
89+
* If null, the default configured on TransactionOutbox will apply.
90+
* @return How often tasks should be re-attempted.
91+
*/
92+
@SuppressWarnings("JavaDoc")
93+
@Getter
94+
@Setter
95+
private Duration attemptFrequency;
96+
97+
/**
98+
* @param blockAfterAttempts How many attempts a task should be retried before it is permanently blocked.
99+
* If null, the default configured on TransactionOutbox will apply.
100+
* @return How many attempts a task should be retried before it is permanently blocked.
101+
*/
102+
@SuppressWarnings("JavaDoc")
103+
@Getter
104+
@Setter
105+
private Integer blockAfterAttempts;
106+
86107
/**
87108
* @param blocked True if the task has exceeded the configured maximum number of attempts.
88109
* @return True if the task has exceeded the configured maximum number of attempts.

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java

+13-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static com.gruelbox.transactionoutbox.spi.Utils.uncheckedly;
55
import static java.time.temporal.ChronoUnit.MILLIS;
66
import static java.time.temporal.ChronoUnit.MINUTES;
7+
import static java.util.Objects.requireNonNullElse;
78

89
import com.gruelbox.transactionoutbox.spi.ProxyFactory;
910
import com.gruelbox.transactionoutbox.spi.Utils;
@@ -82,7 +83,7 @@ public void initialize() {
8283

8384
@Override
8485
public <T> T schedule(Class<T> clazz) {
85-
return schedule(clazz, null, null, null);
86+
return schedule(clazz, null, null, null, null, null);
8687
}
8788

8889
@Override
@@ -219,7 +220,8 @@ public boolean unblock(String entryId, Object transactionContext) {
219220
}
220221

221222
private <T> T schedule(
222-
Class<T> clazz, String uniqueRequestId, String topic, Duration delayForAtLeast) {
223+
Class<T> clazz, String uniqueRequestId, String topic, Duration delayForAtLeast,
224+
Duration attemptFrequencyOverride, Integer blockAfterAttemptsOverride) {
223225
if (!initialized.get()) {
224226
throw new IllegalStateException("Not initialized");
225227
}
@@ -240,6 +242,8 @@ private <T> T schedule(
240242
if (delayForAtLeast != null) {
241243
entry.setNextAttemptTime(entry.getNextAttemptTime().plus(delayForAtLeast));
242244
}
245+
entry.setAttemptFrequency(requireNonNullElse(attemptFrequencyOverride, attemptFrequency));
246+
entry.setBlockAfterAttempts(requireNonNullElse(blockAfterAttemptsOverride, blockAfterAttempts));
243247
validator.validate(entry);
244248
persistor.save(extracted.getTransaction(), entry);
245249
extracted
@@ -253,7 +257,7 @@ private <T> T schedule(
253257
submitNow(entry);
254258
log.debug(
255259
"Scheduled {} for post-commit execution", entry.description());
256-
} else if (delayForAtLeast.compareTo(attemptFrequency) < 0) {
260+
} else if (delayForAtLeast.compareTo(entry.getAttemptFrequency()) < 0) {
257261
scheduler.schedule(
258262
() -> submitNow(entry),
259263
delayForAtLeast.toMillis(),
@@ -360,7 +364,7 @@ private void pushBack(Transaction transaction, TransactionOutboxEntry entry)
360364
throws OptimisticLockException {
361365
try {
362366
entry.setLastAttemptTime(clockProvider.get().instant());
363-
entry.setNextAttemptTime(after(attemptFrequency));
367+
entry.setNextAttemptTime(after(requireNonNullElse(entry.getAttemptFrequency(), attemptFrequency)));
364368
validator.validate(entry);
365369
persistor.update(transaction, entry);
366370
} catch (OptimisticLockException e) {
@@ -377,7 +381,7 @@ private Instant after(Duration duration) {
377381
private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) {
378382
try {
379383
entry.setAttempts(entry.getAttempts() + 1);
380-
var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts);
384+
var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= requireNonNullElse(entry.getBlockAfterAttempts(), blockAfterAttempts));
381385
entry.setBlocked(blocked);
382386
transactionManager.inTransactionThrows(tx -> pushBack(tx, entry));
383387
listener.failure(entry, cause);
@@ -445,13 +449,16 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB
445449
private String uniqueRequestId;
446450
private String ordered;
447451
private Duration delayForAtLeast;
452+
private Duration attemptFrequency;
453+
private Integer blockAfterAttempts;
448454

449455
@Override
450456
public <T> T schedule(Class<T> clazz) {
451457
if (uniqueRequestId != null && uniqueRequestId.length() > 250) {
452458
throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters");
453459
}
454-
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast);
460+
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast,
461+
attemptFrequency, blockAfterAttempts);
455462
}
456463
}
457464
}

transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,40 @@ final void retryBehaviour() throws Exception {
501501
singleThreadPool);
502502
}
503503

504+
/**
505+
* Overrides the default attemptFrequency and blockAfterAttempts, which was set on the TransactionOutbox.
506+
*/
507+
@Test
508+
final void retryBehaviour_overrideAttempts() throws Exception {
509+
TransactionManager transactionManager = txManager();
510+
CountDownLatch latch = new CountDownLatch(1);
511+
AtomicInteger attempts = new AtomicInteger();
512+
TransactionOutbox outbox =
513+
TransactionOutbox.builder()
514+
.transactionManager(transactionManager)
515+
.persistor(Persistor.forDialect(connectionDetails().dialect()))
516+
.instantiator(new FailingInstantiator(attempts))
517+
.submitter(Submitter.withExecutor(singleThreadPool))
518+
.attemptFrequency(Duration.ofSeconds(20)) // will be overridden
519+
.blockAfterAttempts(2) // will be overridden
520+
.listener(new LatchListener(latch))
521+
.build();
522+
523+
clearOutbox();
524+
525+
withRunningFlusher(
526+
outbox,
527+
() -> {
528+
transactionManager.inTransaction(
529+
() -> outbox.with()
530+
.attemptFrequency(Duration.ofMillis(500)) // overrides the 20s above
531+
.blockAfterAttempts(3) // overrides the 2 attempts from above (3 attempts necessary for success)
532+
.schedule(InterfaceProcessor.class).process(3, "Whee"));
533+
assertTrue(latch.await(15, SECONDS)); // will time out if the overrides do not work
534+
},
535+
singleThreadPool);
536+
}
537+
504538
@Test
505539
final void onSchedulingFailure_BubbleExceptionsUp() throws Exception {
506540
Assumptions.assumeTrue(

0 commit comments

Comments
 (0)