Skip to content

Commit 45b69fb

Browse files
lucasmcdonald3Lucas McDonald
and
Lucas McDonald
authored
fix: Close threads when calling PutObject (#180)
* fix: Close threads when calling PutObject --------- Co-authored-by: Lucas McDonald <[email protected]>
1 parent a9fdaa3 commit 45b69fb

File tree

5 files changed

+57
-10
lines changed

5 files changed

+57
-10
lines changed

src/main/java/software/amazon/encryption/s3/S3EncryptionClient.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,28 @@ public PutObjectResponse putObject(PutObjectRequest putObjectRequest, RequestBod
192192
.secureRandom(_secureRandom)
193193
.build();
194194

195+
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
196+
195197
try {
196-
CompletableFuture<PutObjectResponse> futurePut = pipeline.putObject(putObjectRequest, AsyncRequestBody.fromInputStream(requestBody.contentStreamProvider().newStream(), requestBody.optionalContentLength().orElse(-1L), Executors.newSingleThreadExecutor()));
197-
return futurePut.join();
198+
CompletableFuture<PutObjectResponse> futurePut = pipeline.putObject(putObjectRequest,
199+
AsyncRequestBody.fromInputStream(
200+
requestBody.contentStreamProvider().newStream(),
201+
requestBody.optionalContentLength().orElse(-1L),
202+
singleThreadExecutor
203+
)
204+
);
205+
206+
PutObjectResponse response = futurePut.join();
207+
208+
singleThreadExecutor.shutdown();
209+
210+
return response;
211+
198212
} catch (CompletionException completionException) {
213+
singleThreadExecutor.shutdownNow();
199214
throw new S3EncryptionClientException(completionException.getMessage(), completionException.getCause());
200215
} catch (Exception exception) {
216+
singleThreadExecutor.shutdownNow();
201217
throw new S3EncryptionClientException(exception.getMessage(), exception);
202218
}
203219

@@ -279,7 +295,6 @@ private CompleteMultipartUploadResponse multipartPutObject(PutObjectRequest requ
279295
}
280296

281297
ExecutorService es = multipartConfiguration.executorService();
282-
final boolean defaultExecutorService = es == null;
283298
if (es == null) {
284299
throw new S3EncryptionClientException("ExecutorService should not be null, Please initialize during MultipartConfiguration");
285300
}
@@ -315,8 +330,8 @@ private CompleteMultipartUploadResponse multipartPutObject(PutObjectRequest requ
315330
} catch (IOException | InterruptedException | ExecutionException | RuntimeException | Error ex) {
316331
throw onAbort(observer, ex);
317332
} finally {
318-
if (defaultExecutorService) {
319-
// shut down the locally created thread pool
333+
if (multipartConfiguration.usingDefaultExecutorService()) {
334+
// shut down the thread pool if it was created by the encryption client
320335
es.shutdownNow();
321336
}
322337
// delete left-over temp files

src/main/java/software/amazon/encryption/s3/internal/MultipartUploadObjectPipeline.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Collections;
3333
import java.util.HashMap;
3434
import java.util.Map;
35+
import java.util.concurrent.ExecutorService;
3536
import java.util.concurrent.Executors;
3637

3738
import static software.amazon.encryption.s3.internal.ApiNameVersion.API_NAME_INTERCEPTOR;
@@ -134,10 +135,18 @@ public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requ
134135
// Checks the parts are uploaded in series
135136
materials.beginPartUpload(actualRequest.partNumber(), partContentLength);
136137
Cipher cipher = materials.getCipher(materials.getIv());
138+
139+
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
140+
137141
try {
138-
final AsyncRequestBody cipherAsyncRequestBody = new CipherAsyncRequestBody(AsyncRequestBody.fromInputStream(requestBody.contentStreamProvider().newStream(),
142+
final AsyncRequestBody cipherAsyncRequestBody = new CipherAsyncRequestBody(
143+
AsyncRequestBody.fromInputStream(
144+
requestBody.contentStreamProvider().newStream(),
139145
partContentLength, // this MUST be the original contentLength; it refers to the plaintext stream
140-
Executors.newSingleThreadExecutor()), ciphertextLength, materials, cipher.getIV(), isLastPart);
146+
singleThreadExecutor
147+
),
148+
ciphertextLength, materials, cipher.getIV(), isLastPart
149+
);
141150

142151
// Ensure we haven't already seen the last part
143152
if (isLastPart) {
@@ -156,6 +165,9 @@ public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requ
156165
if (isLastPart) {
157166
materials.setHasFinalPartBeenSeen(true);
158167
}
168+
169+
singleThreadExecutor.shutdown();
170+
159171
return response;
160172
}
161173

src/main/java/software/amazon/encryption/s3/materials/MultipartConfiguration.java

+10
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class MultipartConfiguration {
1414
private final long _diskLimit;
1515
private final UploadObjectObserver _observer;
1616
private final ExecutorService _es;
17+
private final boolean _usingDefaultExecutorService;
1718
private final MultiFileOutputStream _outputStream;
1819

1920
public MultipartConfiguration(Builder builder) {
@@ -22,6 +23,7 @@ public MultipartConfiguration(Builder builder) {
2223
this._diskLimit = builder._diskLimit;
2324
this._observer = builder._observer;
2425
this._es = builder._es;
26+
this._usingDefaultExecutorService = builder._usingDefaultExecutorService;
2527
this._outputStream = builder._outputStream;
2628
}
2729

@@ -53,6 +55,10 @@ public ExecutorService executorService() {
5355
return _es;
5456
}
5557

58+
public boolean usingDefaultExecutorService() {
59+
return _usingDefaultExecutorService;
60+
}
61+
5662
static public class Builder {
5763
private final long MIN_PART_SIZE = 5 << 20;
5864
private MultiFileOutputStream _outputStream = new MultiFileOutputStream();
@@ -64,6 +70,7 @@ static public class Builder {
6470
private UploadObjectObserver _observer = new UploadObjectObserver();
6571
// If null, ExecutorService will be initialized in build() based on maxConnections.
6672
private ExecutorService _es = null;
73+
private boolean _usingDefaultExecutorService;
6774

6875
private Builder() {
6976
}
@@ -104,6 +111,9 @@ public Builder multiFileOutputStream(MultiFileOutputStream outputStream) {
104111
public MultipartConfiguration build() {
105112
if (_es == null) {
106113
_es = Executors.newFixedThreadPool(_maxConnections);
114+
_usingDefaultExecutorService = true;
115+
} else {
116+
_usingDefaultExecutorService = false;
107117
}
108118

109119
return new MultipartConfiguration(this);

src/test/java/software/amazon/encryption/s3/S3EncryptionClientMultipartUploadTest.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.List;
3636
import java.util.Map;
3737
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.ExecutorService;
3839
import java.util.concurrent.Executors;
3940

4041
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,11 +74,14 @@ public void multipartPutObjectAsync() throws IOException {
7374
Map<String, String> encryptionContext = new HashMap<>();
7475
encryptionContext.put("user-metadata-key", "user-metadata-value-v3-to-v3");
7576

77+
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
78+
7679
CompletableFuture<PutObjectResponse> futurePut = v3Client.putObject(builder -> builder
7780
.bucket(BUCKET)
7881
.overrideConfiguration(withAdditionalConfiguration(encryptionContext))
79-
.key(objectKey), AsyncRequestBody.fromInputStream(inputStream, fileSizeLimit, Executors.newSingleThreadExecutor()));
82+
.key(objectKey), AsyncRequestBody.fromInputStream(inputStream, fileSizeLimit, singleThreadExecutor));
8083
futurePut.join();
84+
singleThreadExecutor.shutdown();
8185

8286
// Asserts
8387
CompletableFuture<ResponseInputStream<GetObjectResponse>> getFuture = v3Client.getObject(builder -> builder
@@ -110,12 +114,15 @@ public void multipartPutObjectAsyncLargeObjectFails() {
110114
Map<String, String> encryptionContext = new HashMap<>();
111115
encryptionContext.put("user-metadata-key", "user-metadata-value-v3-to-v3");
112116

117+
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
118+
113119
assertThrows(S3EncryptionClientException.class, () -> v3Client.putObject(builder -> builder
114120
.bucket(BUCKET)
115121
.overrideConfiguration(withAdditionalConfiguration(encryptionContext))
116-
.key(objectKey), AsyncRequestBody.fromInputStream(inputStream, fileSizeLimit, Executors.newSingleThreadExecutor())));
122+
.key(objectKey), AsyncRequestBody.fromInputStream(inputStream, fileSizeLimit, singleThreadExecutor)));
117123

118124
v3Client.close();
125+
singleThreadExecutor.shutdown();
119126
}
120127

121128
@Test

src/test/java/software/amazon/encryption/s3/S3EncryptionClientStreamTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.security.Security;
4141
import java.util.concurrent.CompletableFuture;
4242
import java.util.concurrent.CompletionException;
43+
import java.util.concurrent.ExecutorService;
4344
import java.util.concurrent.Executors;
4445

4546
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -324,13 +325,15 @@ public void customSetBufferSizeWithLargeObjectAsyncClient() throws IOException {
324325
// Tight bound on the custom buffer size limit of 32MiB
325326
final long fileSizeExceedingDefaultLimit = 1024 * 1024 * 32 + 1;
326327
final InputStream largeObjectStream = new BoundedInputStream(fileSizeExceedingDefaultLimit);
328+
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
327329
CompletableFuture<PutObjectResponse> futurePut = v3ClientWithBuffer32MiB.putObject(PutObjectRequest.builder()
328330
.bucket(BUCKET)
329331
.key(objectKey)
330-
.build(), AsyncRequestBody.fromInputStream(largeObjectStream, fileSizeExceedingDefaultLimit, Executors.newSingleThreadExecutor()));
332+
.build(), AsyncRequestBody.fromInputStream(largeObjectStream, fileSizeExceedingDefaultLimit, singleThreadExecutor));
331333

332334
futurePut.join();
333335
largeObjectStream.close();
336+
singleThreadExecutor.shutdown();
334337

335338
try {
336339
// Object is larger than Buffer, so getObject fails

0 commit comments

Comments
 (0)