diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs
index 1413a52e5..044a736b6 100644
--- a/nativelink-store/src/s3_store.rs
+++ b/nativelink-store/src/s3_store.rs
@@ -74,6 +74,10 @@ use tracing::{Level, event};
 
 use crate::cas_utils::is_zero_digest;
 
+// S3 object cannot be larger than this number. See:
+// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+const MAX_UPLOAD_SIZE: u64 = 5 * 1024 * 1024 * 1024 * 1024; // 5TB.
+
 // S3 parts cannot be smaller than this number. See:
 // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
 const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
@@ -84,7 +88,8 @@ const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.
 
 // S3 parts cannot be more than this number. See:
 // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
-const MAX_UPLOAD_PARTS: usize = 10_000;
+// Note: Type 'u64' chosen to simplify calculations.
+const MAX_UPLOAD_PARTS: u64 = 10_000;
 
 // Default max buffer size for retrying upload requests.
 // Note: If you change this, adjust the docs in the config.
@@ -595,6 +600,14 @@ where
             UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
         };
 
+        // Sanity check S3 maximum upload size.
+        if max_size > MAX_UPLOAD_SIZE {
+            return Err(make_err!(
+                Code::FailedPrecondition,
+                "File size exceeds max of {MAX_UPLOAD_SIZE}"
+            ));
+        }
+
         // Note(allada) It might be more optimal to use a different
         // heuristic here, but for simplicity we use a hard coded value.
        // Anything going down this if-statement will have the advantage of only
@@ -705,9 +718,24 @@ where
             .await?;
 
         // S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
-        // 5mb (except last part) and can have up to 10,000 parts.
+        // 5MB (except last part) and can have up to 10,000 parts.
+
+        // Calculate the number of chunks needed if we upload in 5MB chunks (the minimum chunk size),
+        // clamping to 10,000 parts and correcting for lossy integer division.
+        let chunk_count = (max_size / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1;
+
+        // Using the clamped first approximation of the chunk count, calculate the byte count of
+        // each chunk (excluding the last chunk), clamping to the 5MB/5GB part size limits.
         let bytes_per_upload_part =
-            (max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
+            (max_size / chunk_count).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
+
+        // Sanity check before continuing.
+        if !(MIN_MULTIPART_SIZE..=MAX_MULTIPART_SIZE).contains(&bytes_per_upload_part) {
+            return Err(make_err!(
+                Code::FailedPrecondition,
+                "Failed to calculate file chunk size (min, max, calc): {MIN_MULTIPART_SIZE}, {MAX_MULTIPART_SIZE}, {bytes_per_upload_part}",
+            ));
+        }
 
         let upload_parts = move || async move {
             // This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
@@ -774,11 +802,8 @@ where
             let mut upload_futures = FuturesUnordered::new();
 
             let mut completed_parts = Vec::with_capacity(
-                usize::try_from(cmp::min(
-                    MAX_UPLOAD_PARTS as u64,
-                    (max_size / bytes_per_upload_part) + 1,
-                ))
-                .err_tip(|| "Could not convert u64 to usize")?,
+                usize::try_from(cmp::min(MAX_UPLOAD_PARTS, chunk_count))
+                    .err_tip(|| "Could not convert u64 to usize")?,
             );
             tokio::pin!(read_stream_fut);
             loop {
diff --git a/nativelink-store/tests/s3_store_test.rs b/nativelink-store/tests/s3_store_test.rs
index d3aed892f..cb7c276fc 100644
--- a/nativelink-store/tests/s3_store_test.rs
+++ b/nativelink-store/tests/s3_store_test.rs
@@ -36,6 +36,7 @@ use nativelink_util::common::DigestInfo;
 use nativelink_util::instant_wrapper::MockInstantWrapped;
 use nativelink_util::spawn;
 use nativelink_util::store_trait::{StoreLike, UploadSizeInfo};
+use patricia_tree::BorrowedBytes;
 use pretty_assertions::assert_eq;
 use sha2::{Digest, Sha256};
@@ -769,3 +770,193 @@ async fn has_with_expired_result() -> Result<(), Error> {
 
     Ok(())
 }
+
+// TODO: Consider moving business logic to a helper so that multiple tests can take advantage of
+// creating multipart uploads with various chunk sizes. This could be done alongside effort to test
+// for chunk size > MIN_MULTIPART_SIZE.
+#[nativelink_test]
+async fn multipart_chunk_size_clamp_min() -> Result<(), Error> {
+    // Same as in s3_store.
+    const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5MiB
+    const MAX_MULTIPART_SIZE: usize = 5 * 1024 * 1024 * 1024; // 5GiB
+    const MAX_UPLOAD_SIZE: usize = 5 * 1024 * 1024 * 1024 * 1024; // 5TiB
+    const MAX_UPLOAD_PARTS: usize = 10_000;
+
+    // It is impossible to trigger a clamp down to MAX_MULTIPART_SIZE...
+    assert!(MAX_UPLOAD_SIZE / MAX_UPLOAD_PARTS < MAX_MULTIPART_SIZE);
+
+    // ... so choose an upload size that will force the chunk size calculation to be clamped up to MIN_MULTIPART_SIZE.
+    const AC_ENTRY_SIZE: usize = 10 * MIN_MULTIPART_SIZE + 50;
+
+    // Expected values.
+    const CHUNK_SIZE: usize = MIN_MULTIPART_SIZE;
+    const CHUNK_COUNT: usize = AC_ENTRY_SIZE / CHUNK_SIZE + 1; // 11
+    const LAST_CHUNK_SIZE: usize = AC_ENTRY_SIZE % MIN_MULTIPART_SIZE;
+
+    // Allocate a manageable amount of data to send repeatedly until the chosen number of bytes
+    // have been uploaded.
+    let mut send_data = Vec::with_capacity(MIN_MULTIPART_SIZE);
+    for i in 0..send_data.capacity() {
+        send_data.push(((i * 3) % 256) as u8);
+    }
+
+    let digest = DigestInfo::try_new(VALID_HASH1, AC_ENTRY_SIZE)?;
+
+    let mut events = Vec::new();
+    // InitiateMultipartUpload
+    events.push(
+        ReplayEvent::new(
+            http::Request::builder()
+                .uri(format!(
+                    "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?uploads",
+                ))
+                .method("POST")
+                .body(SdkBody::empty())
+                .unwrap(),
+            http::Response::builder()
+                .status(StatusCode::OK)
+                .body(SdkBody::from(
+                    r#"
+                    <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+                      <UploadId>Dummy-uploadid</UploadId>
+                    </InitiateMultipartUploadResult>"#
+                        .as_bytes(),
+                ))
+                .unwrap(),
+        ),
+    );
+
+    // UploadPart
+    for i in 1..=(AC_ENTRY_SIZE / MIN_MULTIPART_SIZE) {
+        events.push(
+            ReplayEvent::new(
+                http::Request::builder()
+                    .uri(format!(
+                        "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber={i}&uploadId=Dummy-uploadid",
+                    ))
+                    .method("PUT")
+                    .header("content-type", "application/octet-stream")
+                    .header("content-length", "5242880")
+                    .body(SdkBody::from(&send_data[0..MIN_MULTIPART_SIZE]))
+                    .unwrap(),
+                http::Response::builder()
+                    .status(StatusCode::OK)
+                    .body(SdkBody::empty())
+                    .unwrap(),
+            ),
+        );
+    }
+
+    // Last UploadPart
+    events.push(
+        ReplayEvent::new(
+            http::Request::builder()
+                .uri(format!(
+                    "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber={}&uploadId=Dummy-uploadid",
+                    CHUNK_COUNT,
+                ))
+                .method("PUT")
+                .header("content-type", "application/octet-stream")
+                .header("content-length", "50")
+                .body(SdkBody::from(&send_data[0..LAST_CHUNK_SIZE]))
+                .unwrap(),
+            http::Response::builder()
+                .status(StatusCode::OK)
+                .body(SdkBody::empty())
+                .unwrap(),
+        ),
+    );
+
+    // Build body of CompleteMultipartUpload.
+    let mut body = String::from(
+        r#"<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#,
+    );
+    for i in 1..=CHUNK_COUNT {
+        body.push_str(&format!("<Part><PartNumber>{i}</PartNumber></Part>"));
+    }
+    body.push_str("</CompleteMultipartUpload>");
+
+    // CompleteMultipartUpload
+    events.push(
+        ReplayEvent::new(
+            http::Request::builder()
+                .uri(format!(
+                    "https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?uploadId=Dummy-uploadid",
+                ))
+                .method("POST")
+                .header("content-length", format!("{}", body.len()))
+                .body(SdkBody::from(body.as_str()))
+                .unwrap(),
+            http::Response::builder()
+                .status(StatusCode::OK)
+                .body(SdkBody::from(concat!(
+                    "<CompleteMultipartUploadResult>",
+                    "</CompleteMultipartUploadResult>",
+                )))
+                .unwrap(),
+        ),
+    );
+
+    // Keep this here as a sanity check in case the test parameters are modified.
+    assert!(
+        CHUNK_COUNT < MAX_UPLOAD_PARTS,
+        "Expected chunk count to be less than 10,000 (max)"
+    );
+
+    let mock_client = StaticReplayClient::new(events);
+    let test_config = Builder::new()
+        .behavior_version(BehaviorVersion::v2025_01_17())
+        .region(Region::from_static(REGION))
+        .http_client(mock_client.clone())
+        .build();
+    let s3_client = aws_sdk_s3::Client::from_conf(test_config);
+    let store = S3Store::new_with_client_and_jitter(
+        &S3Spec {
+            bucket: BUCKET_NAME.to_string(),
+            ..Default::default()
+        },
+        s3_client,
+        Arc::new(move |_delay| Duration::from_secs(0)),
+        MockInstantWrapped::default,
+    )?;
+
+    // To avoid needing a complete buffer of a certain size, we're going to pump data from a
+    // reasonably-sized buffer repeatedly until the expected number of bytes have been written.
+    let (mut tx, rx) = make_buf_channel_pair();
+    // Make a future responsible for processing the data stream
+    // and forwarding it to the S3 backend/server.
+    let update_fut = Box::pin(async move {
+        store
+            .update(digest, rx, UploadSizeInfo::ExactSize(AC_ENTRY_SIZE as u64))
+            .await
+    });
+
+    let send_data_copy = Bytes::copy_from_slice(send_data.as_bytes());
+    // Create spawn that is responsible for sending the stream of data
+    // to the S3Store and processing/forwarding to the S3 backend.
+    // NOTE: Unlike other streaming examples in these tests, we will write more than one byte at a
+    // time to save time.
+    spawn!("multipart_upload", async move {
+        tokio::try_join!(update_fut, async move {
+            for _ in 0..(AC_ENTRY_SIZE / MIN_MULTIPART_SIZE) {
+                tx.send(send_data_copy.slice(0..MIN_MULTIPART_SIZE)).await?;
+            }
+            tx.send(send_data_copy.slice(0..LAST_CHUNK_SIZE)).await?;
+            tx.send_eof()
+        })
+        .or_else(|e| {
+            // Printing error to make it easier to debug, since ordering
+            // of futures is not guaranteed.
+            eprintln!("Error updating or sending in spawn: {e:?}");
+            Err(e)
+        })
+    })
+    .await
+    .err_tip(|| "Failed to launch spawn")?
+    .err_tip(|| "In spawn")?;
+
+    mock_client.assert_requests_match(&[]);
+    Ok(())
+}
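
For reviewers who want to sanity-check the arithmetic, here is a minimal standalone sketch (not part of the patch) that mirrors the chunk-count and part-size calculation added to s3_store.rs. The constants are copied from the diff; the helper name part_layout and the main wrapper are illustrative only. Run on the upload size the new test uses (ten full 5MB parts plus a 50-byte tail), it reproduces the layout the mock replay expects: 11 parts, with the per-part size clamped up to the 5MB minimum.

// Standalone sketch -- not part of the patch. Constants copied from s3_store.rs above;
// the helper and main() are illustrative only.
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.
const MAX_UPLOAD_PARTS: u64 = 10_000;

// Returns (chunk_count, bytes_per_upload_part) for a given upload size.
fn part_layout(max_size: u64) -> (u64, u64) {
    // First approximation of the chunk count at the 5MB minimum part size, clamped to the
    // 10,000-part limit and corrected for lossy integer division.
    let chunk_count = (max_size / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1;
    // Size of every part except (possibly) the last, clamped to the 5MB..5GB part limits.
    let bytes_per_upload_part =
        (max_size / chunk_count).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
    (chunk_count, bytes_per_upload_part)
}

fn main() {
    // The upload size used by the new test: ten full 5MB parts plus a 50-byte tail.
    let max_size = 10 * MIN_MULTIPART_SIZE + 50;
    let (chunk_count, bytes_per_upload_part) = part_layout(max_size);
    assert_eq!(chunk_count, 11);
    assert_eq!(bytes_per_upload_part, MIN_MULTIPART_SIZE);
    assert_eq!(max_size % bytes_per_upload_part, 50); // Size of the final part.
    println!("{chunk_count} parts of {bytes_per_upload_part} bytes, 50-byte final part");
}

The same arithmetic also shows why the test's other assertion holds: at the 5TB object-size cap and 10,000 parts, each part is roughly 550MB, well under the 5GB maximum, so the MAX_MULTIPART_SIZE clamp can never be the binding limit.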