
Commit 9f230c9

Fix calculation of multipart S3 upload chunk size

- Add check for maximum file size
- Calculate minimum chunk size (maximum chunk count) for multipart S3 file upload, checking for proper limit of chunk count before upload
- Make MAX_UPLOAD_PARTS u64 for uniformity and simplicity
1 parent 840a5b3 commit 9f230c9

2 files changed: +224, -8 lines changed


nativelink-store/src/s3_store.rs (+33, -8)
@@ -74,6 +74,10 @@ use tracing::{Level, event};
 
 use crate::cas_utils::is_zero_digest;
 
+// S3 object cannot be larger than this number. See:
+// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+const MAX_UPLOAD_SIZE: u64 = 5 * 1024 * 1024 * 1024 * 1024; // 5TB.
+
 // S3 parts cannot be smaller than this number. See:
 // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
 const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
@@ -84,7 +88,8 @@ const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.
 
 // S3 parts cannot be more than this number. See:
 // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
-const MAX_UPLOAD_PARTS: usize = 10_000;
+// Note: Type 'u64' chosen to simplify calculations
+const MAX_UPLOAD_PARTS: u64 = 10_000;
 
 // Default max buffer size for retrying upload requests.
 // Note: If you change this, adjust the docs in the config.
@@ -594,6 +599,14 @@ where
             UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
         };
 
+        // Sanity check S3 maximum upload size.
+        if max_size > MAX_UPLOAD_SIZE {
+            return Err(make_err!(
+                Code::FailedPrecondition,
+                "File size exceeds max of {MAX_UPLOAD_SIZE}"
+            ));
+        }
+
         // Note(allada) It might be more optimal to use a different
         // heuristic here, but for simplicity we use a hard coded value.
         // Anything going down this if-statement will have the advantage of only
@@ -704,9 +717,24 @@ where
             .await?;
 
         // S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
-        // 5mb (except last part) and can have up to 10,000 parts.
+        // 5MB (except last part) and can have up to 10,000 parts.
+
+        // Calculate the number of chunks if we upload in 5MB chunks (the minimum chunk size),
+        // clamping to 10,000 parts and correcting for lossy integer division. This provides the
+        // first approximation of the chunk count.
+        let chunk_count = (max_size / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1;
+
+        // Using the clamped first approximation of the number of chunks, calculate the byte count
+        // of each chunk (excluding the last chunk), clamping to the 5MB/5GB min/max upload sizes.
         let bytes_per_upload_part =
-            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
+            (max_size / chunk_count).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
+
+        // Sanity check before continuing.
+        if !(MIN_MULTIPART_SIZE..MAX_MULTIPART_SIZE).contains(&bytes_per_upload_part) {
+            return Err(make_err!(
+                Code::FailedPrecondition,
+                "Failed to calculate file chunk size (min, max, calc): {MIN_MULTIPART_SIZE}, {MAX_MULTIPART_SIZE}, {bytes_per_upload_part}",
+            ));
+        }
 
         let upload_parts = move || async move {
             // This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
@@ -773,11 +801,8 @@ where
             let mut upload_futures = FuturesUnordered::new();
 
             let mut completed_parts = Vec::with_capacity(
-                usize::try_from(cmp::min(
-                    MAX_UPLOAD_PARTS as u64,
-                    (max_size / bytes_per_upload_part) + 1,
-                ))
-                .err_tip(|| "Could not convert u64 to usize")?,
+                usize::try_from(cmp::min(MAX_UPLOAD_PARTS, chunk_count))
+                    .err_tip(|| "Could not convert u64 to usize")?,
            );
            tokio::pin!(read_stream_fut);
            loop {
nativelink-store/tests/s3_store_test.rs (+191)
@@ -36,6 +36,7 @@ use nativelink_util::common::DigestInfo;
 use nativelink_util::instant_wrapper::MockInstantWrapped;
 use nativelink_util::spawn;
 use nativelink_util::store_trait::{StoreLike, UploadSizeInfo};
+use patricia_tree::BorrowedBytes;
 use pretty_assertions::assert_eq;
 use sha2::{Digest, Sha256};
 
@@ -769,3 +770,193 @@ async fn has_with_expired_result() -> Result<(), Error> {
 
     Ok(())
 }
+
+// TODO: Consider moving business logic to a helper so that multiple tests can take advantage of
+// creating multipart uploads with various chunk sizes. This could be done alongside effort to test
+// for chunk size > MIN_MULTIPART_SIZE.
+#[nativelink_test]
+async fn multipart_chunk_size_clamp_min() -> Result<(), Error> {
+    // Same as in s3_store.
+    const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5MiB
+    const MAX_MULTIPART_SIZE: usize = 5 * 1024 * 1024 * 1024; // 5GiB
+    const MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024 * 1024 * 1024; // 5TiB
+    const MAX_UPLOAD_PARTS: usize = 10_000;
+
+    // It is impossible to trigger a clamp down to MAX_MULTIPART_SIZE...
+    assert!(MAX_UPLOAD_SIZE / MAX_UPLOAD_PARTS < MAX_MULTIPART_SIZE);
+
+    // ... so choose an upload size that forces the chunk size calculation to be clamped up to MIN_MULTIPART_SIZE.
+    const AC_ENTRY_SIZE: usize = 10 * MIN_MULTIPART_SIZE + 50;
+
+    // Expected values
+    const CHUNK_SIZE: usize = MIN_MULTIPART_SIZE;
+    const CHUNK_COUNT: usize = AC_ENTRY_SIZE / CHUNK_SIZE + 1; // 11
+    const LAST_CHUNK_SIZE: usize = AC_ENTRY_SIZE % MIN_MULTIPART_SIZE;
+
+    // Allocate a palatable amount of data to send repeatedly until the chosen number of bytes have
+    // been uploaded.
+    let mut send_data = Vec::with_capacity(MIN_MULTIPART_SIZE);
+    for i in 0..send_data.capacity() {
+        send_data.push(((i * 3) % 256) as u8);
+    }
+
+    let digest = DigestInfo::try_new(VALID_HASH1, AC_ENTRY_SIZE)?;
+
+    let mut events = Vec::new();
+    // InitiateMultipartUpload
+    events.push(
+        ReplayEvent::new(
+            http::Request::builder()
+                .uri(format!(
+                    "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?uploads",
+                ))
+                .method("POST")
+                .body(SdkBody::empty())
+                .unwrap(),
+            http::Response::builder()
+                .status(StatusCode::OK)
+                .body(SdkBody::from(
+                    r#"
+                    <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+                        <UploadId>Dummy-uploadid</UploadId>
+                    </InitiateMultipartUploadResult>"#
+                        .as_bytes(),
+                ))
+                .unwrap(),
+        )
+    );
+
+    // UploadPart
+    for i in 1..=(AC_ENTRY_SIZE / MIN_MULTIPART_SIZE) {
+        events.push(
+            ReplayEvent::new(
+                http::Request::builder()
+                    .uri(format!(
+                        "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber={i}&uploadId=Dummy-uploadid",
+                    ))
+                    .method("PUT")
+                    .header("content-type", "application/octet-stream")
+                    .header("content-length", "5242880")
+                    .body(SdkBody::from(&send_data[0..MIN_MULTIPART_SIZE]))
+                    .unwrap(),
+                http::Response::builder()
+                    .status(StatusCode::OK)
+                    .body(SdkBody::empty())
+                    .unwrap(),
+            ),
+        );
+    }
+
+    // Last UploadPart
+    events.push(
+        ReplayEvent::new(
+            http::Request::builder()
+                .uri(format!(
+                    "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber={}&uploadId=Dummy-uploadid",
+                    CHUNK_COUNT,
+                ))
+                .method("PUT")
+                .header("content-type", "application/octet-stream")
+                .header("content-length", "50")
+                .body(SdkBody::from(
+                    &send_data[0..LAST_CHUNK_SIZE],
+                ))
+                .unwrap(),
+            http::Response::builder()
+                .status(StatusCode::OK)
+                .body(SdkBody::empty())
+                .unwrap(),
+        ),
+    );
+
+    // Build body of CompleteMultipartUpload
+    let mut body = String::from(
+        r#"<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#,
+    );
+    for i in 1..=CHUNK_COUNT {
+        body.push_str(&format!("<Part><PartNumber>{i}</PartNumber></Part>"));
+    }
+    body.push_str("</CompleteMultipartUpload>");
+
+    // CompleteMultipartUpload
+    events.push(
+        ReplayEvent::new(
+            http::Request::builder()
+                .uri(format!(
+                    "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?uploadId=Dummy-uploadid",
+                ))
+                .method("POST")
+                .header("content-length", format!("{}", body.len()))
+                .body(SdkBody::from(body.as_str()))
+                .unwrap(),
+            http::Response::builder()
+                .status(StatusCode::OK)
+                .body(SdkBody::from(concat!(
+                    "<CompleteMultipartUploadResult>",
+                    "</CompleteMultipartUploadResult>",
+                )))
+                .unwrap(),
+        ),
+    );
+
+    // Left here as a sanity check in the event the test parameters are modified.
+    assert!(
+        CHUNK_COUNT < MAX_UPLOAD_PARTS,
+        "Expected chunk count to be less than 10,000 (max)"
+    );
+
+    let mock_client = StaticReplayClient::new(events);
+    let test_config = Builder::new()
+        .behavior_version(BehaviorVersion::v2025_01_17())
+        .region(Region::from_static(REGION))
+        .http_client(mock_client.clone())
+        .build();
+    let s3_client = aws_sdk_s3::Client::from_conf(test_config);
+    let store = S3Store::new_with_client_and_jitter(
+        &S3Spec {
+            bucket: BUCKET_NAME.to_string(),
+            ..Default::default()
+        },
+        s3_client,
+        Arc::new(move |_delay| Duration::from_secs(0)),
+        MockInstantWrapped::default,
+    )?;
+
+    // To avoid needing a complete buffer of a certain size, we're going to pump data from a
+    // reasonably-sized buffer repeatedly until the expected number of bytes have been written.
+    let (mut tx, rx) = make_buf_channel_pair();
+    // Make a future responsible for processing the data stream
+    // and forwarding it to the S3 backend/server.
+    let update_fut = Box::pin(async move {
+        store
+            .update(digest, rx, UploadSizeInfo::ExactSize(AC_ENTRY_SIZE as u64))
+            .await
+    });
+
+    let send_data_copy = Bytes::copy_from_slice(send_data.as_bytes());
+    // Create a spawn that is responsible for sending the stream of data
+    // to the S3Store and processing/forwarding it to the S3 backend.
+    // NOTE: Unlike other streaming examples in these tests, we will write more than one byte at a
+    // time to save time.
+    spawn!("multipart_upload", async move {
+        tokio::try_join!(update_fut, async move {
+            for _ in 0..(AC_ENTRY_SIZE / MIN_MULTIPART_SIZE) {
+                tx.send(send_data_copy.slice(0..MIN_MULTIPART_SIZE)).await?;
+            }
+            tx.send(send_data_copy.slice(0..LAST_CHUNK_SIZE)).await?;
+            tx.send_eof()
+        })
+        .or_else(|e| {
+            // Printing the error to make it easier to debug, since the ordering
+            // of futures is not guaranteed.
+            eprintln!("Error updating or sending in spawn: {e:?}");
+            Err(e)
+        })
+    })
+    .await
+    .err_tip(|| "Failed to launch spawn")?
+    .err_tip(|| "In spawn")?;
+
+    mock_client.assert_requests_match(&[]);
+    Ok(())
+}
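
As a quick cross-check of the expected values in this test, the sketch below applies the same arithmetic used in s3_store.rs to the test's parameters. It is illustrative only; the constants mirror those declared in the test above.

fn main() {
    const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MiB
    const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GiB
    const MAX_UPLOAD_PARTS: u64 = 10_000;
    const AC_ENTRY_SIZE: u64 = 10 * MIN_MULTIPART_SIZE + 50; // 52,428,850 bytes

    // First approximation of the chunk count, as computed in s3_store.rs: 10 + 1 = 11.
    let chunk_count = (AC_ENTRY_SIZE / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1;
    assert_eq!(chunk_count, 11);

    // 52,428,850 / 11 = 4,766,259 bytes, which clamps up to the 5MiB minimum, so every part
    // except the last is exactly MIN_MULTIPART_SIZE (hence the "content-length: 5242880" requests)...
    let bytes_per_upload_part =
        (AC_ENTRY_SIZE / chunk_count).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
    assert_eq!(bytes_per_upload_part, MIN_MULTIPART_SIZE);

    // ...and the last part carries the 50-byte remainder ("content-length: 50").
    assert_eq!(AC_ENTRY_SIZE % MIN_MULTIPART_SIZE, 50);
}
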
