Skip to content

Allow overriding of attemptFrequency and blockAfterAttempts at schedule request level #739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ static final class Builder {
12,
"Add flush index to support ordering",
"CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)"));
migrations.put(
13,
new Migration(
13,
"Add attemptFrequency",
"ALTER TABLE TXNO_OUTBOX ADD COLUMN attemptFrequency INT"));
migrations.put(
14,
new Migration(
14,
"Add blockAfterAttempts",
"ALTER TABLE TXNO_OUTBOX ADD COLUMN blockAfterAttempts INT"));
}

Builder setMigration(Migration migration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -38,7 +39,7 @@
public class DefaultPersistor implements Persistor, Validatable {

private static final String ALL_FIELDS =
"id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version";
"id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, attemptFrequency, blockAfterAttempts";

/**
* @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write
Expand Down Expand Up @@ -115,7 +116,7 @@ public void save(Transaction tx, TransactionOutboxEntry entry)
+ tableName
+ " ("
+ ALL_FIELDS
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
var writer = new StringWriter();
serializer.serializeInvocation(entry.getInvocation(), writer);
if (entry.getTopic() != null) {
Expand Down Expand Up @@ -201,6 +202,8 @@ private void setupInsert(
stmt.setBoolean(9, entry.isBlocked());
stmt.setBoolean(10, entry.isProcessed());
stmt.setInt(11, entry.getVersion());
stmt.setObject(12, entry.getAttemptFrequency() == null ? null : entry.getAttemptFrequency().toMillis());
stmt.setObject(13, entry.getBlockAfterAttempts());
}

@Override
Expand Down Expand Up @@ -392,6 +395,12 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio
.blocked(rs.getBoolean("blocked"))
.processed(rs.getBoolean("processed"))
.version(rs.getInt("version"))
.attemptFrequency(rs.getObject("attemptFrequency") == null
? null
: Duration.ofMillis(rs.getInt("attemptFrequency")))
.blockAfterAttempts(rs.getObject("blockAfterAttempts") == null
? null
: rs.getInt("blockAfterAttempts"))
.build();
log.debug("Found {}", entry);
return entry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,20 @@ interface ParameterizedScheduleBuilder {
*/
ParameterizedScheduleBuilder delayForAtLeast(Duration duration);

/**
* Use an attemptFrequency that is different from the default set on the {@link TransactionOutbox}.
* @param attemptFrequency How often tasks should be re-attempted.
* @return Builder.
*/
ParameterizedScheduleBuilder attemptFrequency(Duration attemptFrequency);

/**
* Use a blockAfterAttempts that is different from the default set on the {@link TransactionOutbox}.
* @param blockAfterAttempts How many attempts a task should be retried before it is permanently blocked.
* @return Builder.
*/
ParameterizedScheduleBuilder blockAfterAttempts(Integer blockAfterAttempts);

/**
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters
* to the request as configured using {@link TransactionOutbox#with()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.util.stream.Collectors.joining;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import lombok.*;
Expand Down Expand Up @@ -83,6 +84,26 @@ public class TransactionOutboxEntry implements Validatable {
@Setter
private int attempts;

/**
* @param attemptFrequency How often tasks should be re-attempted.
* If null, the default configured on TransactionOutbox will apply.
* @return How often tasks should be re-attempted.
*/
@SuppressWarnings("JavaDoc")
@Getter
@Setter
private Duration attemptFrequency;

/**
* @param blockAfterAttempts How many attempts a task should be retried before it is permanently blocked.
* If null, the default configured on TransactionOutbox will apply.
* @return How many attempts a task should be retried before it is permanently blocked.
*/
@SuppressWarnings("JavaDoc")
@Getter
@Setter
private Integer blockAfterAttempts;

/**
* @param blocked True if the task has exceeded the configured maximum number of attempts.
* @return True if the task has exceeded the configured maximum number of attempts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static com.gruelbox.transactionoutbox.spi.Utils.uncheckedly;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.util.Objects.requireNonNullElse;

import com.gruelbox.transactionoutbox.spi.ProxyFactory;
import com.gruelbox.transactionoutbox.spi.Utils;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void initialize() {

@Override
public <T> T schedule(Class<T> clazz) {
return schedule(clazz, null, null, null);
return schedule(clazz, null, null, null, null, null);
}

@Override
Expand Down Expand Up @@ -219,7 +220,8 @@ public boolean unblock(String entryId, Object transactionContext) {
}

private <T> T schedule(
Class<T> clazz, String uniqueRequestId, String topic, Duration delayForAtLeast) {
Class<T> clazz, String uniqueRequestId, String topic, Duration delayForAtLeast,
Duration attemptFrequencyOverride, Integer blockAfterAttemptsOverride) {
if (!initialized.get()) {
throw new IllegalStateException("Not initialized");
}
Expand All @@ -240,6 +242,8 @@ private <T> T schedule(
if (delayForAtLeast != null) {
entry.setNextAttemptTime(entry.getNextAttemptTime().plus(delayForAtLeast));
}
entry.setAttemptFrequency(requireNonNullElse(attemptFrequencyOverride, attemptFrequency));
entry.setBlockAfterAttempts(requireNonNullElse(blockAfterAttemptsOverride, blockAfterAttempts));
validator.validate(entry);
persistor.save(extracted.getTransaction(), entry);
extracted
Expand All @@ -253,7 +257,7 @@ private <T> T schedule(
submitNow(entry);
log.debug(
"Scheduled {} for post-commit execution", entry.description());
} else if (delayForAtLeast.compareTo(attemptFrequency) < 0) {
} else if (delayForAtLeast.compareTo(entry.getAttemptFrequency()) < 0) {
scheduler.schedule(
() -> submitNow(entry),
delayForAtLeast.toMillis(),
Expand Down Expand Up @@ -360,7 +364,7 @@ private void pushBack(Transaction transaction, TransactionOutboxEntry entry)
throws OptimisticLockException {
try {
entry.setLastAttemptTime(clockProvider.get().instant());
entry.setNextAttemptTime(after(attemptFrequency));
entry.setNextAttemptTime(after(requireNonNullElse(entry.getAttemptFrequency(), attemptFrequency)));
validator.validate(entry);
persistor.update(transaction, entry);
} catch (OptimisticLockException e) {
Expand All @@ -377,7 +381,7 @@ private Instant after(Duration duration) {
private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) {
try {
entry.setAttempts(entry.getAttempts() + 1);
var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts);
var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= requireNonNullElse(entry.getBlockAfterAttempts(), blockAfterAttempts));
entry.setBlocked(blocked);
transactionManager.inTransactionThrows(tx -> pushBack(tx, entry));
listener.failure(entry, cause);
Expand Down Expand Up @@ -445,13 +449,16 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB
private String uniqueRequestId;
private String ordered;
private Duration delayForAtLeast;
private Duration attemptFrequency;
private Integer blockAfterAttempts;

@Override
public <T> T schedule(Class<T> clazz) {
if (uniqueRequestId != null && uniqueRequestId.length() > 250) {
throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters");
}
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast);
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast,
attemptFrequency, blockAfterAttempts);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,40 @@ final void retryBehaviour() throws Exception {
singleThreadPool);
}

/**
* Overrides the default attemptFrequency and blockAfterAttempts, which was set on the TransactionOutbox.
*/
@Test
final void retryBehaviour_overrideAttempts() throws Exception {
TransactionManager transactionManager = txManager();
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger attempts = new AtomicInteger();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(new FailingInstantiator(attempts))
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofSeconds(20)) // will be overridden
.blockAfterAttempts(2) // will be overridden
.listener(new LatchListener(latch))
.build();

clearOutbox();

withRunningFlusher(
outbox,
() -> {
transactionManager.inTransaction(
() -> outbox.with()
.attemptFrequency(Duration.ofMillis(500)) // overrides the 20s above
.blockAfterAttempts(3) // overrides the 2 attempts from above (3 attempts necessary for success)
.schedule(InterfaceProcessor.class).process(3, "Whee"));
assertTrue(latch.await(15, SECONDS)); // will time out if the overrides do not work
},
singleThreadPool);
}

@Test
final void onSchedulingFailure_BubbleExceptionsUp() throws Exception {
Assumptions.assumeTrue(
Expand Down