diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index caa70d87..13a2892a 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -158,6 +158,18 @@ static final class Builder { 12, "Add flush index to support ordering", "CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)")); + migrations.put( + 13, + new Migration( + 13, + "Add attemptFrequency", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN attemptFrequency INT")); + migrations.put( + 14, + new Migration( + 14, + "Add blockAfterAttempts", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN blockAfterAttempts INT")); } Builder setMigration(Migration migration) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index 52a00f93..e079db9b 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -11,6 +11,7 @@ import java.sql.SQLTimeoutException; import java.sql.Statement; import java.sql.Timestamp; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; @@ -38,7 +39,7 @@ public class DefaultPersistor implements Persistor, Validatable { private static final String ALL_FIELDS = - "id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; + "id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, attemptFrequency, blockAfterAttempts"; /** * @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write @@ -115,7 +116,7 @@ public void save(Transaction tx, TransactionOutboxEntry entry) + tableName + " (" + ALL_FIELDS - + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; var writer = new StringWriter(); serializer.serializeInvocation(entry.getInvocation(), writer); if (entry.getTopic() != null) { @@ -201,6 +202,8 @@ private void setupInsert( stmt.setBoolean(9, entry.isBlocked()); stmt.setBoolean(10, entry.isProcessed()); stmt.setInt(11, entry.getVersion()); + stmt.setObject(12, entry.getAttemptFrequency() == null ? null : entry.getAttemptFrequency().toMillis()); + stmt.setObject(13, entry.getBlockAfterAttempts()); } @Override @@ -392,6 +395,12 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio .blocked(rs.getBoolean("blocked")) .processed(rs.getBoolean("processed")) .version(rs.getInt("version")) + .attemptFrequency(rs.getObject("attemptFrequency") == null + ? null + : Duration.ofMillis(rs.getInt("attemptFrequency"))) + .blockAfterAttempts(rs.getObject("blockAfterAttempts") == null + ? null + : rs.getInt("blockAfterAttempts")) .build(); log.debug("Found {}", entry); return entry; diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index 51e0ab59..a4f5e0c9 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -394,6 +394,20 @@ interface ParameterizedScheduleBuilder { */ ParameterizedScheduleBuilder delayForAtLeast(Duration duration); + /** + * Use an attemptFrequency that is different from the default set on the {@link TransactionOutbox}. + * @param attemptFrequency How often tasks should be re-attempted. + * @return Builder. + */ + ParameterizedScheduleBuilder attemptFrequency(Duration attemptFrequency); + + /** + * Use a blockAfterAttempts that is different from the default set on the {@link TransactionOutbox}. + * @param blockAfterAttempts How many attempts a task should be retried before it is permanently blocked. + * @return Builder. + */ + ParameterizedScheduleBuilder blockAfterAttempts(Integer blockAfterAttempts); + /** * Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters * to the request as configured using {@link TransactionOutbox#with()}. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index 01749722..9cafc5fc 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -2,6 +2,7 @@ import static java.util.stream.Collectors.joining; +import java.time.Duration; import java.time.Instant; import java.util.Arrays; import lombok.*; @@ -83,6 +84,26 @@ public class TransactionOutboxEntry implements Validatable { @Setter private int attempts; + /** + * @param attemptFrequency How often tasks should be re-attempted. + * If null, the default configured on TransactionOutbox will apply. + * @return How often tasks should be re-attempted. + */ + @SuppressWarnings("JavaDoc") + @Getter + @Setter + private Duration attemptFrequency; + + /** + * @param blockAfterAttempts How many attempts a task should be retried before it is permanently blocked. + * If null, the default configured on TransactionOutbox will apply. + * @return How many attempts a task should be retried before it is permanently blocked. + */ + @SuppressWarnings("JavaDoc") + @Getter + @Setter + private Integer blockAfterAttempts; + /** * @param blocked True if the task has exceeded the configured maximum number of attempts. * @return True if the task has exceeded the configured maximum number of attempts. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 7ae2cb1d..b94dc543 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -4,6 +4,7 @@ import static com.gruelbox.transactionoutbox.spi.Utils.uncheckedly; import static java.time.temporal.ChronoUnit.MILLIS; import static java.time.temporal.ChronoUnit.MINUTES; +import static java.util.Objects.requireNonNullElse; import com.gruelbox.transactionoutbox.spi.ProxyFactory; import com.gruelbox.transactionoutbox.spi.Utils; @@ -82,7 +83,7 @@ public void initialize() { @Override public T schedule(Class clazz) { - return schedule(clazz, null, null, null); + return schedule(clazz, null, null, null, null, null); } @Override @@ -219,7 +220,8 @@ public boolean unblock(String entryId, Object transactionContext) { } private T schedule( - Class clazz, String uniqueRequestId, String topic, Duration delayForAtLeast) { + Class clazz, String uniqueRequestId, String topic, Duration delayForAtLeast, + Duration attemptFrequencyOverride, Integer blockAfterAttemptsOverride) { if (!initialized.get()) { throw new IllegalStateException("Not initialized"); } @@ -240,6 +242,8 @@ private T schedule( if (delayForAtLeast != null) { entry.setNextAttemptTime(entry.getNextAttemptTime().plus(delayForAtLeast)); } + entry.setAttemptFrequency(requireNonNullElse(attemptFrequencyOverride, attemptFrequency)); + entry.setBlockAfterAttempts(requireNonNullElse(blockAfterAttemptsOverride, blockAfterAttempts)); validator.validate(entry); persistor.save(extracted.getTransaction(), entry); extracted @@ -253,7 +257,7 @@ private T schedule( submitNow(entry); log.debug( "Scheduled {} for post-commit execution", entry.description()); - } else if (delayForAtLeast.compareTo(attemptFrequency) < 0) { + } else if (delayForAtLeast.compareTo(entry.getAttemptFrequency()) < 0) { scheduler.schedule( () -> submitNow(entry), delayForAtLeast.toMillis(), @@ -360,7 +364,7 @@ private void pushBack(Transaction transaction, TransactionOutboxEntry entry) throws OptimisticLockException { try { entry.setLastAttemptTime(clockProvider.get().instant()); - entry.setNextAttemptTime(after(attemptFrequency)); + entry.setNextAttemptTime(after(requireNonNullElse(entry.getAttemptFrequency(), attemptFrequency))); validator.validate(entry); persistor.update(transaction, entry); } catch (OptimisticLockException e) { @@ -377,7 +381,7 @@ private Instant after(Duration duration) { private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) { try { entry.setAttempts(entry.getAttempts() + 1); - var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts); + var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= requireNonNullElse(entry.getBlockAfterAttempts(), blockAfterAttempts)); entry.setBlocked(blocked); transactionManager.inTransactionThrows(tx -> pushBack(tx, entry)); listener.failure(entry, cause); @@ -445,13 +449,16 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB private String uniqueRequestId; private String ordered; private Duration delayForAtLeast; + private Duration attemptFrequency; + private Integer blockAfterAttempts; @Override public T schedule(Class clazz) { if (uniqueRequestId != null && uniqueRequestId.length() > 250) { throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters"); } - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast); + return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast, + attemptFrequency, blockAfterAttempts); } } } diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 878c0c17..b4280b3b 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -501,6 +501,40 @@ final void retryBehaviour() throws Exception { singleThreadPool); } + /** + * Overrides the default attemptFrequency and blockAfterAttempts, which was set on the TransactionOutbox. + */ + @Test + final void retryBehaviour_overrideAttempts() throws Exception { + TransactionManager transactionManager = txManager(); + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger attempts = new AtomicInteger(); + TransactionOutbox outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .persistor(Persistor.forDialect(connectionDetails().dialect())) + .instantiator(new FailingInstantiator(attempts)) + .submitter(Submitter.withExecutor(singleThreadPool)) + .attemptFrequency(Duration.ofSeconds(20)) // will be overridden + .blockAfterAttempts(2) // will be overridden + .listener(new LatchListener(latch)) + .build(); + + clearOutbox(); + + withRunningFlusher( + outbox, + () -> { + transactionManager.inTransaction( + () -> outbox.with() + .attemptFrequency(Duration.ofMillis(500)) // overrides the 20s above + .blockAfterAttempts(3) // overrides the 2 attempts from above (3 attempts necessary for success) + .schedule(InterfaceProcessor.class).process(3, "Whee")); + assertTrue(latch.await(15, SECONDS)); // will time out if the overrides do not work + }, + singleThreadPool); + } + @Test final void onSchedulingFailure_BubbleExceptionsUp() throws Exception { Assumptions.assumeTrue(