Fix calculation of multipart S3 upload chunk size #1648

Open
wants to merge 1 commit into base: main

41 changes: 33 additions & 8 deletions nativelink-store/src/s3_store.rs
@@ -74,6 +74,10 @@ use tracing::{Level, event};

use crate::cas_utils::is_zero_digest;

// S3 object cannot be larger than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_UPLOAD_SIZE: u64 = 5 * 1024 * 1024 * 1024 * 1024; // 5TB.

// S3 parts cannot be smaller than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
@@ -84,7 +88,8 @@ const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.

// S3 parts cannot be more than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_UPLOAD_PARTS: usize = 10_000;
// Note: Type 'u64' chosen to simplify calculations
const MAX_UPLOAD_PARTS: u64 = 10_000;

// Default max buffer size for retrying upload requests.
// Note: If you change this, adjust the docs in the config.
@@ -595,6 +600,14 @@ where
UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
};

// Sanity check S3 maximum upload size.
if max_size > MAX_UPLOAD_SIZE {
return Err(make_err!(
Code::FailedPrecondition,
"File size exceeds max of {MAX_UPLOAD_SIZE}"
));
}

// Note(allada) It might be more optimal to use a different
// heuristic here, but for simplicity we use a hard coded value.
// Anything going down this if-statement will have the advantage of only
@@ -705,9 +718,24 @@ where
.await?;

// S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
// 5mb (except last part) and can have up to 10,000 parts.
// 5MB (except last part) and can have up to 10,000 parts.

// Calculate the number of chunks if we upload in 5MB chunks (the minimum chunk size), clamping to
// 10,000 parts and correcting for lossy integer division. This provides the first approximation
// of the chunk count used below.
let chunk_count = (max_size / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1;

// Using the clamped first approximation of the number of chunks, calculate the byte count of
// each chunk (excluding the last chunk), clamping to the 5MB/5GB min/max part size.
let bytes_per_upload_part =
(max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
(max_size / chunk_count).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);

// Sanity check before continuing.
if !(MIN_MULTIPART_SIZE..MAX_MULTIPART_SIZE).contains(&bytes_per_upload_part) {
return Err(make_err!(
Code::FailedPrecondition,
"Failed to calculate file chuck size (min, max, calc): {MIN_MULTIPART_SIZE}, {MAX_MULTIPART_SIZE}, {bytes_per_upload_part}",
));
}

let upload_parts = move || async move {
// This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
@@ -774,11 +802,8 @@ where
let mut upload_futures = FuturesUnordered::new();

let mut completed_parts = Vec::with_capacity(
usize::try_from(cmp::min(
MAX_UPLOAD_PARTS as u64,
(max_size / bytes_per_upload_part) + 1,
))
.err_tip(|| "Could not convert u64 to usize")?,
usize::try_from(cmp::min(MAX_UPLOAD_PARTS, chunk_count))
.err_tip(|| "Could not convert u64 to usize")?,
);
tokio::pin!(read_stream_fut);
loop {
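
For context, and not part of the diff itself: a small standalone Rust sketch that reproduces the new chunk-size arithmetic for the upload size exercised by the test added below (ten full 5MB parts plus 50 trailing bytes). The constants mirror those in s3_store.rs; the `main` wrapper exists only for illustration.

const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB, as in s3_store.rs.
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB, as in s3_store.rs.
const MAX_UPLOAD_PARTS: u64 = 10_000;

fn main() {
    // Upload size used by the new multipart test: 10 * 5MB + 50 bytes.
    let max_size: u64 = 10 * MIN_MULTIPART_SIZE + 50; // 52_428_850 bytes.

    // First approximation of the chunk count, clamped to the 10,000-part limit and
    // corrected for lossy integer division.
    let chunk_count = (max_size / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1;
    assert_eq!(chunk_count, 11);

    // Per-part byte count derived from the clamped count, kept inside the 5MB..5GB window.
    // 52_428_850 / 11 = 4_766_259, which is below 5MB, so it clamps up to 5MB.
    let bytes_per_upload_part =
        (max_size / chunk_count).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
    assert_eq!(bytes_per_upload_part, MIN_MULTIPART_SIZE);

    // Result: ten full 5MB parts plus a final 50-byte part, matching the test's expectations.
    println!("{chunk_count} parts, {bytes_per_upload_part} bytes per full part");
}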
191 changes: 191 additions & 0 deletions nativelink-store/tests/s3_store_test.rs
@@ -36,6 +36,7 @@ use nativelink_util::common::DigestInfo;
use nativelink_util::instant_wrapper::MockInstantWrapped;
use nativelink_util::spawn;
use nativelink_util::store_trait::{StoreLike, UploadSizeInfo};
use patricia_tree::BorrowedBytes;
use pretty_assertions::assert_eq;
use sha2::{Digest, Sha256};

@@ -769,3 +770,193 @@ async fn has_with_expired_result() -> Result<(), Error> {

Ok(())
}

// TODO: Consider moving the business logic to a helper so that multiple tests can take advantage of
// creating multipart uploads with various chunk sizes. This could be done alongside an effort to test
// for chunk sizes > MIN_MULTIPART_SIZE. A rough sketch of such a helper follows this test.
#[nativelink_test]
async fn multipart_chunk_size_clamp_min() -> Result<(), Error> {
// Same as in s3_store.
const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5MiB
const MAX_MULTIPART_SIZE: usize = 5 * 1024 * 1024 * 1024; // 5GiB
const MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024 * 1024 * 1024; // 5 TiB
const MAX_UPLOAD_PARTS: usize = 10_000;

// It is impossible to trigger a clamp down to MAX_MULTIPART_SIZE...
assert!(MAX_UPLOAD_SIZE / MAX_UPLOAD_PARTS < MAX_MULTIPART_SIZE);

// ... so choose an upload size that will force the chunk size calculation to be clamped up to MIN_MULTIPART_SIZE
const AC_ENTRY_SIZE: usize = 10 * MIN_MULTIPART_SIZE + 50;

// Expected values
const CHUNK_SIZE: usize = MIN_MULTIPART_SIZE;
const CHUNK_COUNT: usize = AC_ENTRY_SIZE / CHUNK_SIZE + 1; // 11
const LAST_CHUNK_SIZE: usize = AC_ENTRY_SIZE % MIN_MULTIPART_SIZE;

// Allocate a manageable amount of data to send repeatedly until the chosen number of bytes have
// been uploaded.
let mut send_data = Vec::with_capacity(MIN_MULTIPART_SIZE);
for i in 0..send_data.capacity() {
send_data.push(((i * 3) % 256) as u8);
}

let digest = DigestInfo::try_new(VALID_HASH1, AC_ENTRY_SIZE)?;

let mut events = Vec::new();
// InitiateMultipartUpload
events.push(
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?uploads",
))
.method("POST")
.body(SdkBody::empty())
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::from(
r#"
<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<UploadId>Dummy-uploadid</UploadId>
</InitiateMultipartUploadResult>"#
.as_bytes(),
))
.unwrap(),
)
);

// UploadPart
for i in 1..=(AC_ENTRY_SIZE / MIN_MULTIPART_SIZE) {
events.push(
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber={i}&uploadId=Dummy-uploadid",
))
.method("PUT")
.header("content-type", "application/octet-stream")
.header("content-length", "5242880")
.body(SdkBody::from(&send_data[0..MIN_MULTIPART_SIZE]))
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::empty())
.unwrap(),
),
);
}

// Last UploadPart
events.push(
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber={}&uploadId=Dummy-uploadid",
CHUNK_COUNT,
))
.method("PUT")
.header("content-type", "application/octet-stream")
.header("content-length", "50")
.body(SdkBody::from(
&send_data[0..LAST_CHUNK_SIZE],
))
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::empty())
.unwrap(),
),
);

// Build body of CompleteMultipartUpload
let mut body = String::from(
r#"<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#,
);
for i in 1..=CHUNK_COUNT {
body.push_str(&format!("<Part><PartNumber>{i}</PartNumber></Part>"));
}
body.push_str("</CompleteMultipartUpload>");

// CompleteMultipartUpload
events.push(
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?uploadId=Dummy-uploadid",
))
.method("POST")
.header("content-length", format!("{}", body.len()))
.body(SdkBody::from(body.as_str()))
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::from(concat!(
"<CompleteMultipartUploadResult>",
"</CompleteMultipartUploadResult>",
)))
.unwrap(),
),
);

// Keep this here to preserve our sanity in the event the test parameters are modified.
assert!(
CHUNK_COUNT < MAX_UPLOAD_PARTS,
"Expected chunck count to be less than 10,000 (max)"
);

let mock_client = StaticReplayClient::new(events);
let test_config = Builder::new()
.behavior_version(BehaviorVersion::v2025_01_17())
.region(Region::from_static(REGION))
.http_client(mock_client.clone())
.build();
let s3_client = aws_sdk_s3::Client::from_conf(test_config);
let store = S3Store::new_with_client_and_jitter(
&S3Spec {
bucket: BUCKET_NAME.to_string(),
..Default::default()
},
s3_client,
Arc::new(move |_delay| Duration::from_secs(0)),
MockInstantWrapped::default,
)?;

// To avoid allocating a buffer for the entire upload, we pump data from a
// reasonably-sized buffer repeatedly until the expected number of bytes have been written.
let (mut tx, rx) = make_buf_channel_pair();
// Make future responsible for processing the datastream
// and forwarding it to the s3 backend/server.
let update_fut = Box::pin(async move {
store
.update(digest, rx, UploadSizeInfo::ExactSize(AC_ENTRY_SIZE as u64))
.await
});

let send_data_copy = Bytes::copy_from_slice(send_data.as_bytes());
// Create spawn that is responsible for sending the stream of data
// to the S3Store and processing/forwarding to the S3 backend.
// NOTE: Unlike other streaming examples in these tests, we will write more than one byte at a
// time to save time.
spawn!("multipart_upload", async move {
tokio::try_join!(update_fut, async move {
for _ in 0..(AC_ENTRY_SIZE / MIN_MULTIPART_SIZE) {
tx.send(send_data_copy.slice(0..MIN_MULTIPART_SIZE)).await?;
}
tx.send(send_data_copy.slice(0..LAST_CHUNK_SIZE)).await?;
tx.send_eof()
})
.or_else(|e| {
// Printing error to make it easier to debug, since ordering
// of futures is not guaranteed.
eprintln!("Error updating or sending in spawn: {e:?}");
Err(e)
})
})
.await
.err_tip(|| "Failed to launch spawn")?
.err_tip(|| "In spawn")?;

mock_client.assert_requests_match(&[]);
Ok(())
}
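
As a follow-up to the TODO above, here is a rough, untested sketch of what such a helper might look like. It is not part of this PR; it assumes the same imports and constants as the test above (http, StatusCode, SdkBody, ReplayEvent, BUCKET_NAME, REGION), and the name `upload_part_events` and its signature are hypothetical.

// Hypothetical helper (not in this PR): builds the UploadPart replay events for an
// object of `total_size` bytes uploaded in `chunk_size` chunks, reusing `send_data`
// as the repeated payload.
fn upload_part_events(
    object_key: &str,
    upload_id: &str,
    total_size: usize,
    chunk_size: usize,
    send_data: &[u8],
) -> Vec<ReplayEvent> {
    let mut events = Vec::new();
    let mut offset = 0;
    let mut part_number = 1;
    while offset < total_size {
        // The last part carries whatever remains and may be smaller than `chunk_size`.
        let len = std::cmp::min(chunk_size, total_size - offset);
        events.push(ReplayEvent::new(
            http::Request::builder()
                .uri(format!(
                    "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{object_key}?x-id=UploadPart&partNumber={part_number}&uploadId={upload_id}",
                ))
                .method("PUT")
                .header("content-type", "application/octet-stream")
                .header("content-length", len.to_string())
                .body(SdkBody::from(&send_data[0..len]))
                .unwrap(),
            http::Response::builder()
                .status(StatusCode::OK)
                .body(SdkBody::empty())
                .unwrap(),
        ));
        offset += len;
        part_number += 1;
    }
    events
}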