Skip to content

Commit dbede58

Browse files
committed
Refactor to support a separate SPI package
This commit includes a large refactoring with various improvements, but generally is aimed towards having a separate SPI package that provides access to most of the Failsafe internals such that it's easy to implement policies. The changes in this commit include: - Separating the execution types into separate public and internal interfaces - Creating an spi package and putting the various internal extensible classes in there - Improved the way internal classes interact with FailsafeFuture Fixes #292
1 parent ef64f94 commit dbede58

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1694
-1222
lines changed

src/main/java/net/jodah/failsafe/AbstractExecution.java

Lines changed: 0 additions & 148 deletions
This file was deleted.
Lines changed: 13 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,26 @@
1-
/*
2-
* Copyright 2016 the original author or authors.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License
15-
*/
161
package net.jodah.failsafe;
172

18-
import net.jodah.failsafe.internal.util.Assert;
19-
import net.jodah.failsafe.util.concurrent.Scheduler;
20-
21-
import java.util.List;
223
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.CompletionException;
24-
import java.util.concurrent.Future;
25-
import java.util.function.Function;
264

275
/**
28-
* Tracks asynchronous executions and handles failures according to one or more {@link Policy policies}. Execution
29-
* results must be explicitly recorded via one of the {@code record} methods.
6+
* Allows asynchronous executions to record their results or complete an execution.
307
*
318
* @param <R> result type
329
* @author Jonathan Halterman
3310
*/
34-
public final class AsyncExecution<R> extends AbstractExecution<R> {
35-
// Cross-attempt state --
36-
// The outer-most function that executions begin with
37-
private Function<AsyncExecution<R>, CompletableFuture<ExecutionResult>> outerFn;
38-
private final boolean asyncExecution;
39-
final FailsafeFuture<R> future;
40-
41-
// Per-attempt state --
42-
// Whether a result has been explicitly recorded
43-
volatile boolean recordCalled;
44-
// The future for the thread that the innerFn is running in
45-
volatile Future<?> innerFuture;
46-
// Whether a policy executor completed post execution
47-
private final boolean[] policyPostExecuted;
48-
49-
AsyncExecution(List<Policy<R>> policies, Scheduler scheduler, FailsafeFuture<R> future, boolean asyncExecution,
50-
Function<AsyncExecution<R>, CompletableFuture<ExecutionResult>> innerFn) {
51-
super(policies, scheduler);
52-
this.future = future;
53-
this.asyncExecution = asyncExecution;
54-
this.policyPostExecuted = new boolean[policyExecutors.size()];
55-
56-
outerFn = asyncExecution ? Functions.toExecutionAware(innerFn) : innerFn;
57-
outerFn = Functions.toAsync(outerFn, scheduler);
58-
59-
for (PolicyExecutor<R, ? extends Policy<R>> policyExecutor : policyExecutors)
60-
outerFn = policyExecutor.applyAsync(outerFn, scheduler, this.future);
61-
}
62-
63-
private AsyncExecution(AsyncExecution<R> execution) {
64-
super(execution);
65-
outerFn = execution.outerFn;
66-
future = execution.future;
67-
asyncExecution = execution.asyncExecution;
68-
policyPostExecuted = new boolean[policyExecutors.size()];
69-
}
70-
11+
public interface AsyncExecution<R> extends ExecutionContext<R> {
7112
/**
7213
* Completes the execution and the associated {@code CompletableFuture}.
7314
*
74-
* @throws IllegalStateException if the execution is already complete
15+
* @throws IllegalStateException if the execution is already recorded or complete
7516
*/
76-
public void complete() {
77-
Assert.state(!recordCalled, "The most recent execution has already been recorded");
78-
recordCalled = true;
17+
void complete();
7918

80-
// Guard against race with a timeout expiring
81-
synchronized (future) {
82-
ExecutionResult result = this.result != null ? this.result : ExecutionResult.NONE;
83-
complete(postExecute(result), null);
84-
}
85-
}
19+
/**
20+
* Returns whether the execution is complete or if it can be retried. An execution is considered complete only when
21+
* all configured policies consider the execution complete.
22+
*/
23+
boolean isComplete();
8624

8725
/**
8826
* Records an execution {@code result} or {@code failure} which triggers failure handling, if needed, by the
@@ -91,78 +29,21 @@ public void complete() {
9129
*
9230
* @throws IllegalStateException if the most recent execution was already recorded or the execution is complete
9331
*/
94-
public void record(R result, Throwable failure) {
95-
Assert.state(!recordCalled, "The most recent execution has already been recorded");
96-
recordCalled = true;
97-
98-
// Guard against race with a timeout expiring
99-
synchronized (future) {
100-
if (!attemptRecorded) {
101-
ExecutionResult er = new ExecutionResult(result, failure).withDelay(delayNanos);
102-
record(er);
103-
}
104-
105-
// Proceed with handling the recorded result
106-
executeAsync();
107-
}
108-
}
32+
void record(R result, Throwable failure);
10933

11034
/**
11135
* Records an execution {@code result} which triggers failure handling, if needed, by the configured policies. If
11236
* policy handling is not possible or already complete, the resulting {@link CompletableFuture} is completed.
11337
*
11438
* @throws IllegalStateException if the most recent execution was already recorded or the execution is complete
11539
*/
116-
public void recordResult(R result) {
117-
record(result, null);
118-
}
40+
void recordResult(R result);
11941

12042
/**
12143
* Records an execution {@code failure} which triggers failure handling, if needed, by the configured policies. If
12244
* policy handling is not possible or already complete, the resulting {@link CompletableFuture} is completed.
12345
*
12446
* @throws IllegalStateException if the most recent execution was already recorded or the execution is complete
12547
*/
126-
public void recordFailure(Throwable failure) {
127-
record(null, failure);
128-
}
129-
130-
/**
131-
* Performs an asynchronous execution.
132-
*/
133-
void executeAsync() {
134-
outerFn.apply(this).whenComplete(this::complete);
135-
}
136-
137-
private void complete(ExecutionResult result, Throwable error) {
138-
if (result == null && error == null)
139-
return;
140-
141-
completed = true;
142-
if (!future.isDone()) {
143-
if (result != null)
144-
future.completeResult(result);
145-
else {
146-
if (error instanceof CompletionException)
147-
error = error.getCause();
148-
future.completeResult(ExecutionResult.failure(error));
149-
}
150-
}
151-
}
152-
153-
synchronized void setPostExecuted(int policyIndex) {
154-
policyPostExecuted[policyIndex] = true;
155-
}
156-
157-
synchronized boolean isPostExecuted(int policyIndex) {
158-
return policyPostExecuted[policyIndex];
159-
}
160-
161-
boolean isAsyncExecution() {
162-
return asyncExecution;
163-
}
164-
165-
AsyncExecution<R> copy() {
166-
return new AsyncExecution<>(this);
167-
}
168-
}
48+
void recordFailure(Throwable failure);
49+
}

0 commit comments

Comments
 (0)