From f962e3ef504b50a5d72812304a73be4a54f31987 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 29 Jul 2025 12:44:03 -0600 Subject: [PATCH 01/32] Add TODO --- src/storage_engine/data_store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index ab625306..2403cd5b 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -767,6 +767,7 @@ impl DataStoreWriter for DataStore { fn delete(&self, key: &[u8]) -> Result { let key_hash = compute_hash(key); + // TODO: Check prior exists before deletion self.batch_write_hashed_payloads(vec![(key_hash, &NULL_BYTE)], true) } } From b20591ea9aad95915efe91385e660c08edd8cae5 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 29 Jul 2025 12:44:44 -0600 Subject: [PATCH 02/32] Add additional TODO --- src/storage_engine/data_store.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 2403cd5b..4fb788ff 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -770,6 +770,8 @@ impl DataStoreWriter for DataStore { // TODO: Check prior exists before deletion self.batch_write_hashed_payloads(vec![(key_hash, &NULL_BYTE)], true) } + + // TODO: Implement batch_delete } impl DataStoreReader for DataStore { From 1a82ba44d179ec8f0211ee6bb8358ab20e5fcfd5 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 29 Jul 2025 12:55:35 -0600 Subject: [PATCH 03/32] Add TODO --- src/storage_engine/data_store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 4fb788ff..d11ff8ae 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -714,6 +714,7 @@ impl DataStoreWriter for DataStore { self.write_with_key_hash(key_hash, payload) } + // TODO: Change signature to: fn batch_write(&self, entries: Vec<(Vec, Vec)>) -> Result { fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result { let (keys, payloads): (Vec<_>, Vec<_>) = entries.iter().cloned().unzip(); let hashes = compute_hash_batch(&keys); From 7f44e36af2087304c96eb0b24e1c5e501781530e Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 29 Jul 2025 13:34:25 -0600 Subject: [PATCH 04/32] Add TODO --- src/storage_engine/data_store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index d11ff8ae..8bd2e881 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -229,6 +229,7 @@ impl DataStore { self.path.clone() } + // TODO: Ensure this doesn't include deleted entries /// Retrieves an iterator over all valid entries in the storage. 
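// A minimal sketch, not from any patch in this series, of how the two delete-related
// TODOs above could be addressed: "check prior exists before deletion" (PATCH 01) and
// "implement batch_delete" (PATCH 02). It only uses APIs already visible in these
// diffs (`compute_hash_batch`, `read`, `batch_write_hashed_payloads`, the `NULL_BYTE`
// tombstone with `allow_null_bytes = true`, mirroring `delete`). The method name, its
// placement alongside `delete` in the writer impl, and the skip-missing-keys policy
// are assumptions; note the exists-check is not atomic with respect to concurrent writers.
fn batch_delete(&self, keys: &[&[u8]]) -> std::io::Result<u64> {
    let hashes = compute_hash_batch(keys);
    // Tombstone only keys that currently resolve to a live entry.
    let mut tombstones: Vec<(u64, &[u8])> = Vec::with_capacity(keys.len());
    for (key_hash, key) in hashes.into_iter().zip(keys.iter().copied()) {
        if matches!(self.read(key), Ok(Some(_))) {
            tombstones.push((key_hash, &NULL_BYTE[..]));
        }
    }
    if tombstones.is_empty() {
        // Nothing to delete; report the current tail offset unchanged.
        return Ok(self.tail_offset.load(Ordering::Acquire));
    }
    // `true` allows the NULL-byte tombstone payloads, exactly as `delete` does.
    self.batch_write_hashed_payloads(tombstones, true)
}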
/// /// This iterator allows scanning the storage file and retrieving **only the most recent** From cce3a16d5160d06da0168de0d844b031b98be58f Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 29 Jul 2025 14:00:33 -0600 Subject: [PATCH 05/32] Add TODO --- src/storage_engine/data_store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 8bd2e881..9064ca8e 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -408,6 +408,7 @@ impl DataStore { self.batch_write_hashed_payloads(vec![(key_hash, payload)], false) } + // TODO: Change `hashed_payloads: Vec<(u64, &[u8])>` to `hashed_payloads: Vec<(u64, Vec)>` /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. /// /// This method efficiently appends multiple entries in a **batch operation**, From 26b5927d14066e75e763b52b628dd6e61030a03e Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 29 Jul 2025 18:02:04 -0600 Subject: [PATCH 06/32] Add TODO --- src/storage_engine/data_store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 9064ca8e..43272717 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -47,6 +47,7 @@ impl From for DataStore { } impl DataStore { + // TODO: Use owned `PathBuf` /// Opens an **existing** or **new** append-only storage file. /// /// This function: From 9fa23a7872b449ef3a0a9d8fa8412c43485a6233 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Fri, 1 Aug 2025 12:55:09 -0600 Subject: [PATCH 07/32] Add TODO --- src/storage_engine/data_store.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 43272717..4ac57ceb 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -25,6 +25,7 @@ pub struct DataStore { path: PathBuf, } +// TODO: Add feature to iterate via `rayon` impl IntoIterator for DataStore { type Item = EntryHandle; type IntoIter = EntryIterator; From 2099bc2d024cff01d100ae0be78a08f91947bfe0 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Fri, 1 Aug 2025 12:57:10 -0600 Subject: [PATCH 08/32] Update TODO --- src/storage_engine/data_store.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 4ac57ceb..142d382f 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -25,7 +25,6 @@ pub struct DataStore { path: PathBuf, } -// TODO: Add feature to iterate via `rayon` impl IntoIterator for DataStore { type Item = EntryHandle; type IntoIter = EntryIterator; @@ -35,6 +34,22 @@ impl IntoIterator for DataStore { } } +// TODO: Add feature to iterate via `rayon` +// pub struct ParallelEntryIter { +// entries: Arc<[EntryHandle]>, // or offsets +// } +// +// impl ParallelIterator for ParallelEntryIter { +// type Item = EntryHandle; +// +// fn drive_unindexed(self, consumer: C) -> C::Result +// where +// C: rayon::iter::plumbing::UnindexedConsumer, +// { +// rayon::slice::from_arc(&self.entries).into_par_iter().drive_unindexed(consumer) +// } +// } + impl From for DataStore { /// Creates an `DataStore` instance from a `PathBuf`. 
/// From 9793672a3dd8f306a9c7d96899b4eac7fa3cb148 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Fri, 8 Aug 2025 16:37:10 -0600 Subject: [PATCH 09/32] Centralize dependencies --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 16 ++++++++++++++++ .../Cargo.toml | 4 ++-- experiments/simd-r-drive-ws-client/Cargo.toml | 12 ++++++------ experiments/simd-r-drive-ws-server/Cargo.toml | 14 +++++++------- extensions/Cargo.toml | 6 +++--- 6 files changed, 46 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba9fe49e..79e5424b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -960,6 +960,8 @@ dependencies = [ [[package]] name = "muxio" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2e143d83f97695de4bd77bc811d6e542026cbdce3b690b9feeff8e9129e4969" dependencies = [ "chrono", "once_cell", @@ -969,6 +971,8 @@ dependencies = [ [[package]] name = "muxio-rpc-service" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75a5f5807934e07012687f744b4e8bf10e2391c83c7e807a5f119d5f6f873233" dependencies = [ "async-trait", "futures", @@ -980,6 +984,8 @@ dependencies = [ [[package]] name = "muxio-rpc-service-caller" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24337541fc4a5b0e08b8f41cfdb8861a82b9ad66d0772365d59d099463b04413" dependencies = [ "async-trait", "futures", @@ -991,6 +997,8 @@ dependencies = [ [[package]] name = "muxio-rpc-service-endpoint" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c799f999c7d5dbfad3ebd16fe0d93425c32f7242dcf18cc0adc57862a4525773" dependencies = [ "async-trait", "futures", @@ -1004,6 +1012,8 @@ dependencies = [ [[package]] name = "muxio-tokio-rpc-client" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5656d1fc8093efe90fd90dab6f4f9f61348b9871b64da6a25478a2576e894e17" dependencies = [ "async-trait", "bytes", @@ -1020,6 +1030,8 @@ dependencies = [ [[package]] name = "muxio-tokio-rpc-server" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "770aec8718a85c4854f124c0e247a4ec9c983d38b560ccf4b4dada4feabfd5b1" dependencies = [ "async-trait", "axum", diff --git a/Cargo.toml b/Cargo.toml index fd978900..d946a37e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,22 @@ publish.workspace = true # Inherit from workspace simd-r-drive = { path = ".", version = "0.11.0-alpha" } simd-r-drive-ws-client = { path = "./experiments/simd-r-drive-ws-client", version = "0.11.0-alpha" } simd-r-drive-muxio-service-definition = { path = "./experiments/simd-r-drive-muxio-service-definition", version = "0.11.0-alpha" } +muxio-tokio-rpc-client = "0.9.0-alpha" +muxio-tokio-rpc-server = "0.9.0-alpha" +muxio-rpc-service = "0.9.0-alpha" +muxio-rpc-service-caller = "0.9.0-alpha" + +# Third-party crates (note, not all dependencies are used in the base drive) +async-trait = "0.1.88" +bincode = "1.3.3" # TODO: Replace with `bitcode` +bitcode = "0.6.6" +clap = "4.5.40" +indoc = "2.0.6" +serde = "1.0.219" +tokio = "1.45.1" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +walkdir = "2" [dependencies] async-trait = "0.1.88" diff --git a/experiments/simd-r-drive-muxio-service-definition/Cargo.toml b/experiments/simd-r-drive-muxio-service-definition/Cargo.toml index b80017ff..0529b725 100644 --- a/experiments/simd-r-drive-muxio-service-definition/Cargo.toml +++ 
b/experiments/simd-r-drive-muxio-service-definition/Cargo.toml @@ -11,6 +11,6 @@ keywords.workspace = true # Inherit from workspace publish.workspace = true # Inherit from workspace [dependencies] -bitcode = "0.6.6" -muxio-rpc-service = "0.9.0-alpha" +bitcode = { workspace = true } +muxio-rpc-service = { workspace = true } diff --git a/experiments/simd-r-drive-ws-client/Cargo.toml b/experiments/simd-r-drive-ws-client/Cargo.toml index ef7cc618..1d4dcfb1 100644 --- a/experiments/simd-r-drive-ws-client/Cargo.toml +++ b/experiments/simd-r-drive-ws-client/Cargo.toml @@ -13,9 +13,9 @@ publish.workspace = true # Inherit from workspace [dependencies] simd-r-drive = { workspace = true } simd-r-drive-muxio-service-definition = { workspace = true } -muxio-tokio-rpc-client = "0.9.0-alpha" -muxio-rpc-service = "0.9.0-alpha" -muxio-rpc-service-caller = "0.9.0-alpha" -tokio = "1.45.1" -tracing = "0.1.41" -async-trait = "0.1.88" +muxio-tokio-rpc-client = { workspace = true } +muxio-rpc-service = { workspace = true } +muxio-rpc-service-caller = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } diff --git a/experiments/simd-r-drive-ws-server/Cargo.toml b/experiments/simd-r-drive-ws-server/Cargo.toml index 51f6020c..d2ff2e83 100644 --- a/experiments/simd-r-drive-ws-server/Cargo.toml +++ b/experiments/simd-r-drive-ws-server/Cargo.toml @@ -13,10 +13,10 @@ publish.workspace = true # Inherit from workspace [dependencies] simd-r-drive = { workspace = true } simd-r-drive-muxio-service-definition = { workspace = true } -muxio-tokio-rpc-server = "0.9.0-alpha" -muxio-rpc-service = "0.9.0-alpha" -tokio = "1.45.1" -tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } -tracing = "0.1.41" -clap = { version = "4.5.40", features = ["derive"] } -indoc = "2.0.6" +muxio-tokio-rpc-server = { workspace = true } +muxio-rpc-service = { workspace = true } +tokio = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing = { workspace = true } +clap = { workspace = true, features = ["derive"] } +indoc = { workspace = true } diff --git a/extensions/Cargo.toml b/extensions/Cargo.toml index c1722f18..fcb8fd5b 100644 --- a/extensions/Cargo.toml +++ b/extensions/Cargo.toml @@ -11,10 +11,10 @@ keywords.workspace = true # Inherit from workspace publish.workspace = true # Inherit from workspace [dependencies] -bincode = "1.3.3" -serde = { version = "1.0.219", features = ["derive"] } +bincode = { workspace = true } +serde = { workspace = true, features = ["derive"] } simd-r-drive = { workspace = true } -walkdir = "2" +walkdir = { workspace = true} [dev-dependencies] doc-comment = "0.3.3" From b3c463071b265ccd956de6a0a3ae0b74cea7b0cc Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Fri, 8 Aug 2025 16:50:20 -0600 Subject: [PATCH 10/32] Fix `error: hiding a lifetime that's elided elsewhere is confusing` --- src/utils/align_or_copy.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/utils/align_or_copy.rs b/src/utils/align_or_copy.rs index 25cfc6eb..9192a6e7 100644 --- a/src/utils/align_or_copy.rs +++ b/src/utils/align_or_copy.rs @@ -41,7 +41,10 @@ use std::{borrow::Cow, mem}; /// let result: Cow<[f32]> = simd_r_drive::utils::align_or_copy::(raw, f32::from_le_bytes); /// assert_eq!(result[0], 1.0); /// ``` -pub fn align_or_copy(bytes: &[u8], from_le_bytes: fn([u8; N]) -> T) -> Cow<[T]> +pub fn align_or_copy( + bytes: &[u8], + from_le_bytes: fn([u8; N]) -> T, +) -> Cow<'_, [T]> where T: 
Copy, { From e238fae8bb859561542664a8781613da2c12b9a4 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Fri, 8 Aug 2025 16:52:25 -0600 Subject: [PATCH 11/32] Add comment --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index d946a37e..1a2b92ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ bitcode = "0.6.6" clap = "4.5.40" indoc = "2.0.6" serde = "1.0.219" -tokio = "1.45.1" +tokio = "1.45.1" # Tokio is not used in base `SIMD R Drive`, only extensions tracing = "0.1.41" tracing-subscriber = "0.3.19" walkdir = "2" From 9d055d01bba93436457f2c7018f2e517fc51ce37 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 10:50:36 -0600 Subject: [PATCH 12/32] Preempt version `0.12.0-alpha` --- Cargo.lock | 10 +++++----- Cargo.toml | 10 +++++----- .../bindings/python_(old_client)/pyproject.toml | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79e5424b..097baa53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1497,7 +1497,7 @@ dependencies = [ [[package]] name = "simd-r-drive" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "async-trait", "bincode", @@ -1522,7 +1522,7 @@ dependencies = [ [[package]] name = "simd-r-drive-extensions" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "bincode", "doc-comment", @@ -1534,7 +1534,7 @@ dependencies = [ [[package]] name = "simd-r-drive-muxio-service-definition" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "bitcode", "muxio-rpc-service", @@ -1542,7 +1542,7 @@ dependencies = [ [[package]] name = "simd-r-drive-ws-client" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "async-trait", "muxio-rpc-service", @@ -1556,7 +1556,7 @@ dependencies = [ [[package]] name = "simd-r-drive-ws-server" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "clap", "indoc", diff --git a/Cargo.toml b/Cargo.toml index 1a2b92ee..a6bd1c8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,12 +1,12 @@ [workspace.package] authors = ["Jeremy Harris "] -version = "0.11.0-alpha" +version = "0.12.0-alpha" edition = "2024" repository = "https://github.com/jzombie/rust-simd-r-drive" license = "Apache-2.0" categories = ["database-implementations", "data-structures", "filesystem"] keywords = ["storage-engine", "binary-storage", "append-only", "simd", "mmap"] -publish = true +publish = false [package] name = "simd-r-drive" @@ -22,9 +22,9 @@ publish.workspace = true # Inherit from workspace [workspace.dependencies] # Intra-workspace crates -simd-r-drive = { path = ".", version = "0.11.0-alpha" } -simd-r-drive-ws-client = { path = "./experiments/simd-r-drive-ws-client", version = "0.11.0-alpha" } -simd-r-drive-muxio-service-definition = { path = "./experiments/simd-r-drive-muxio-service-definition", version = "0.11.0-alpha" } +simd-r-drive = { path = ".", version = "0.12.0-alpha" } +simd-r-drive-ws-client = { path = "./experiments/simd-r-drive-ws-client", version = "0.12.0-alpha" } +simd-r-drive-muxio-service-definition = { path = "./experiments/simd-r-drive-muxio-service-definition", version = "0.12.0-alpha" } muxio-tokio-rpc-client = "0.9.0-alpha" muxio-tokio-rpc-server = "0.9.0-alpha" muxio-rpc-service = "0.9.0-alpha" diff --git a/experiments/bindings/python_(old_client)/pyproject.toml b/experiments/bindings/python_(old_client)/pyproject.toml index 232a53eb..a0d97026 100644 --- a/experiments/bindings/python_(old_client)/pyproject.toml +++ 
b/experiments/bindings/python_(old_client)/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "simd-r-drive-py" -version = "0.11.0-alpha" +version = "0.12.0-alpha" description = "SIMD-optimized append-only schema-less storage engine. Key-based binary storage in a single-file storage container." repository = "https://github.com/jzombie/rust-simd-r-drive" license = "Apache-2.0" From 1703dd67efbbba99245839997149b650b9a49e92 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 10:53:01 -0600 Subject: [PATCH 13/32] Centralize additional deps --- Cargo.toml | 2 ++ extensions/Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a6bd1c8a..9113c0d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,11 @@ async-trait = "0.1.88" bincode = "1.3.3" # TODO: Replace with `bitcode` bitcode = "0.6.6" clap = "4.5.40" +doc-comment = "0.3.3" indoc = "2.0.6" serde = "1.0.219" tokio = "1.45.1" # Tokio is not used in base `SIMD R Drive`, only extensions +tempfile = "3.19.0" tracing = "0.1.41" tracing-subscriber = "0.3.19" walkdir = "2" diff --git a/extensions/Cargo.toml b/extensions/Cargo.toml index fcb8fd5b..a0b852c2 100644 --- a/extensions/Cargo.toml +++ b/extensions/Cargo.toml @@ -17,5 +17,5 @@ simd-r-drive = { workspace = true } walkdir = { workspace = true} [dev-dependencies] -doc-comment = "0.3.3" -tempfile = "3.19.0" +doc-comment = { workspace = true } +tempfile = { workspace = true } From 1eac14b33da6a060946b74862e2c76085c6901e3 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 10:56:41 -0600 Subject: [PATCH 14/32] Remove TODO --- src/storage_engine/data_store.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 142d382f..f593117c 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -63,7 +63,6 @@ impl From for DataStore { } impl DataStore { - // TODO: Use owned `PathBuf` /// Opens an **existing** or **new** append-only storage file. /// /// This function: From cd0da0fea4034379657313126a58162b9e5ae774 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 10:57:15 -0600 Subject: [PATCH 15/32] Remove TODO --- src/storage_engine/data_store.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index f593117c..fecf05ec 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -245,7 +245,6 @@ impl DataStore { self.path.clone() } - // TODO: Ensure this doesn't include deleted entries /// Retrieves an iterator over all valid entries in the storage. /// /// This iterator allows scanning the storage file and retrieving **only the most recent** From 5adb64fe25d170552bedbce525fb32b910cdf579 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 11:15:49 -0600 Subject: [PATCH 16/32] Add TODOs --- src/storage_engine/data_store.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index fecf05ec..e46e3cd4 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -332,6 +332,7 @@ impl DataStore { Ok(best_valid_offset.unwrap_or(0)) } + // TODO: Move to writer trait /// Writes an entry using a **precomputed key hash** and a streaming `Read` source. 
/// /// This is a **low-level** method that operates like `write_stream`, but requires @@ -401,6 +402,7 @@ impl DataStore { Ok(tail_offset) } + // TODO: Move to writer trait /// Writes an entry using a **precomputed key hash** and a payload. /// /// This method is a **low-level** alternative to `write()`, allowing direct @@ -423,6 +425,7 @@ impl DataStore { self.batch_write_hashed_payloads(vec![(key_hash, payload)], false) } + // TODO: Move to writer trait // TODO: Change `hashed_payloads: Vec<(u64, &[u8])>` to `hashed_payloads: Vec<(u64, Vec)>` /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. /// From 1281657c020327c68ccd59c8a28ec45d6deb27d8 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 11:19:30 -0600 Subject: [PATCH 17/32] Rename `hashed_payloads` to `prehashed_keys` --- src/storage_engine/data_store.rs | 20 ++++++++++---------- src/storage_engine/digest/compute_hash.rs | 2 +- src/storage_engine/traits/writer.rs | 8 ++++---- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index e46e3cd4..6b8d3223 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -420,20 +420,20 @@ impl DataStore { /// # Notes: /// - The caller is responsible for ensuring that `key_hash` is correctly computed. /// - This method **locks the file for writing** to maintain consistency. - /// - If writing **multiple entries**, consider using `batch_write_hashed_payloads()`. + /// - If writing **multiple entries**, consider using `batch_write_prehashed_keys()`. pub fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result { - self.batch_write_hashed_payloads(vec![(key_hash, payload)], false) + self.batch_write_prehashed_keys(vec![(key_hash, payload)], false) } // TODO: Move to writer trait - // TODO: Change `hashed_payloads: Vec<(u64, &[u8])>` to `hashed_payloads: Vec<(u64, Vec)>` + // TODO: Change `prehashed_keys: Vec<(u64, &[u8])>` to `prehashed_keys: Vec<(u64, Vec)>` /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. /// /// This method efficiently appends multiple entries in a **batch operation**, /// reducing lock contention and improving performance for bulk writes. /// /// # Parameters: - /// - `hashed_payloads`: A **vector of precomputed key hashes and payloads**, where: + /// - `prehashed_keys`: A **vector of precomputed key hashes and payloads**, where: /// - `key_hash`: The **precomputed hash** of the key. /// - `payload`: The **data payload** to be stored. /// @@ -453,9 +453,9 @@ impl DataStore { /// - **Faster than multiple `write()` calls**, since it reduces lock contention. /// - Suitable for **bulk insertions** where key hashes are known beforehand. /// - If keys are available but not hashed, use `batch_write()` instead. 
- pub fn batch_write_hashed_payloads( + pub fn batch_write_prehashed_keys( &self, - hashed_payloads: Vec<(u64, &[u8])>, + prehashed_keys: Vec<(u64, &[u8])>, allow_null_bytes: bool, ) -> Result { let mut file = self @@ -466,10 +466,10 @@ impl DataStore { let mut buffer = Vec::new(); let mut tail_offset = self.tail_offset.load(Ordering::Acquire); - let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(hashed_payloads.len()); + let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(prehashed_keys.len()); let mut deleted_keys: HashSet = HashSet::new(); - for (key_hash, payload) in hashed_payloads { + for (key_hash, payload) in prehashed_keys { if payload == NULL_BYTE { if !allow_null_bytes { return Err(std::io::Error::new( @@ -739,7 +739,7 @@ impl DataStoreWriter for DataStore { let (keys, payloads): (Vec<_>, Vec<_>) = entries.iter().cloned().unzip(); let hashes = compute_hash_batch(&keys); let hashed_entries = hashes.into_iter().zip(payloads).collect::>(); - self.batch_write_hashed_payloads(hashed_entries, false) + self.batch_write_prehashed_keys(hashed_entries, false) } fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result { @@ -789,7 +789,7 @@ impl DataStoreWriter for DataStore { fn delete(&self, key: &[u8]) -> Result { let key_hash = compute_hash(key); // TODO: Check prior exists before deletion - self.batch_write_hashed_payloads(vec![(key_hash, &NULL_BYTE)], true) + self.batch_write_prehashed_keys(vec![(key_hash, &NULL_BYTE)], true) } // TODO: Implement batch_delete diff --git a/src/storage_engine/digest/compute_hash.rs b/src/storage_engine/digest/compute_hash.rs index 6b829c33..4fef4d0a 100644 --- a/src/storage_engine/digest/compute_hash.rs +++ b/src/storage_engine/digest/compute_hash.rs @@ -37,7 +37,7 @@ pub fn compute_hash(key: &[u8]) -> u64 { /// * call the hasher exactly **once** from the high-level API, /// * pre-allocate the `Vec` only once, /// * hand the resulting `(hash, payload)` tuples straight to -/// `batch_write_hashed_payloads`, keeping the critical section (the `RwLock`) +/// `batch_write_prehashed_keys`, keeping the critical section (the `RwLock`) /// as small as possible. /// /// # Parameters diff --git a/src/storage_engine/traits/writer.rs b/src/storage_engine/traits/writer.rs index 59e4cb0a..34624e05 100644 --- a/src/storage_engine/traits/writer.rs +++ b/src/storage_engine/traits/writer.rs @@ -52,7 +52,7 @@ pub trait DataStoreWriter { /// Writes multiple key-value pairs as a **single transaction**. /// /// This method computes the hashes of the provided keys and delegates to - /// `batch_write_hashed_payloads()`, ensuring all writes occur in a single + /// `batch_write_prehashed_keys()`, ensuring all writes occur in a single /// locked operation for efficiency. /// /// # Parameters: @@ -67,7 +67,7 @@ pub trait DataStoreWriter { /// # Notes: /// - This method improves efficiency by **minimizing file lock contention**. /// - If a large number of entries are written, **batching reduces overhead**. - /// - If the key hashes are already computed, use `batch_write_hashed_payloads()`. + /// - If the key hashes are already computed, use `batch_write_prehashed_keys()`. fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; /// Renames an existing entry by copying it under a new key and marking the old key as deleted. @@ -196,7 +196,7 @@ pub trait AsyncDataStoreWriter { /// Writes multiple key-value pairs as a **single transaction**. 
/// /// This method computes the hashes of the provided keys and delegates to - /// `batch_write_hashed_payloads()`, ensuring all writes occur in a single + /// `batch_write_prehashed_keys()`, ensuring all writes occur in a single /// locked operation for efficiency. /// /// # Parameters: @@ -211,7 +211,7 @@ pub trait AsyncDataStoreWriter { /// # Notes: /// - This method improves efficiency by **minimizing file lock contention**. /// - If a large number of entries are written, **batching reduces overhead**. - /// - If the key hashes are already computed, use `batch_write_hashed_payloads()`. + /// - If the key hashes are already computed, use `batch_write_prehashed_keys()`. async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; /// Renames an existing entry by copying it under a new key and marking the old key as deleted. From 4a2174d998562f0fe9bd270baddca47c82dc4cb5 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 11:28:32 -0600 Subject: [PATCH 18/32] Bump lock for local dev --- Cargo.lock | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 097baa53..c538a745 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2205,3 +2205,19 @@ dependencies = [ "quote", "syn", ] + +[[patch.unused]] +name = "muxio-rpc-service" +version = "0.10.0-alpha" + +[[patch.unused]] +name = "muxio-rpc-service-caller" +version = "0.10.0-alpha" + +[[patch.unused]] +name = "muxio-tokio-rpc-client" +version = "0.10.0-alpha" + +[[patch.unused]] +name = "muxio-tokio-rpc-server" +version = "0.10.0-alpha" From e9b0306bc4b38627005b4715af9914814afbd7e3 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 11:29:36 -0600 Subject: [PATCH 19/32] Rename to `batch_write_with_key_hashes` --- src/storage_engine/data_store.rs | 10 +++++----- src/storage_engine/digest/compute_hash.rs | 2 +- src/storage_engine/traits/writer.rs | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 6b8d3223..26feadb1 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -420,9 +420,9 @@ impl DataStore { /// # Notes: /// - The caller is responsible for ensuring that `key_hash` is correctly computed. /// - This method **locks the file for writing** to maintain consistency. - /// - If writing **multiple entries**, consider using `batch_write_prehashed_keys()`. + /// - If writing **multiple entries**, consider using `batch_write_with_key_hashes()`. pub fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result { - self.batch_write_prehashed_keys(vec![(key_hash, payload)], false) + self.batch_write_with_key_hashes(vec![(key_hash, payload)], false) } // TODO: Move to writer trait @@ -453,7 +453,7 @@ impl DataStore { /// - **Faster than multiple `write()` calls**, since it reduces lock contention. /// - Suitable for **bulk insertions** where key hashes are known beforehand. /// - If keys are available but not hashed, use `batch_write()` instead. 
- pub fn batch_write_prehashed_keys( + pub fn batch_write_with_key_hashes( &self, prehashed_keys: Vec<(u64, &[u8])>, allow_null_bytes: bool, @@ -739,7 +739,7 @@ impl DataStoreWriter for DataStore { let (keys, payloads): (Vec<_>, Vec<_>) = entries.iter().cloned().unzip(); let hashes = compute_hash_batch(&keys); let hashed_entries = hashes.into_iter().zip(payloads).collect::>(); - self.batch_write_prehashed_keys(hashed_entries, false) + self.batch_write_with_key_hashes(hashed_entries, false) } fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result { @@ -789,7 +789,7 @@ impl DataStoreWriter for DataStore { fn delete(&self, key: &[u8]) -> Result { let key_hash = compute_hash(key); // TODO: Check prior exists before deletion - self.batch_write_prehashed_keys(vec![(key_hash, &NULL_BYTE)], true) + self.batch_write_with_key_hashes(vec![(key_hash, &NULL_BYTE)], true) } // TODO: Implement batch_delete diff --git a/src/storage_engine/digest/compute_hash.rs b/src/storage_engine/digest/compute_hash.rs index 4fef4d0a..2eefcb99 100644 --- a/src/storage_engine/digest/compute_hash.rs +++ b/src/storage_engine/digest/compute_hash.rs @@ -37,7 +37,7 @@ pub fn compute_hash(key: &[u8]) -> u64 { /// * call the hasher exactly **once** from the high-level API, /// * pre-allocate the `Vec` only once, /// * hand the resulting `(hash, payload)` tuples straight to -/// `batch_write_prehashed_keys`, keeping the critical section (the `RwLock`) +/// `batch_write_with_key_hashes`, keeping the critical section (the `RwLock`) /// as small as possible. /// /// # Parameters diff --git a/src/storage_engine/traits/writer.rs b/src/storage_engine/traits/writer.rs index 34624e05..4fd651c8 100644 --- a/src/storage_engine/traits/writer.rs +++ b/src/storage_engine/traits/writer.rs @@ -52,7 +52,7 @@ pub trait DataStoreWriter { /// Writes multiple key-value pairs as a **single transaction**. /// /// This method computes the hashes of the provided keys and delegates to - /// `batch_write_prehashed_keys()`, ensuring all writes occur in a single + /// `batch_write_with_key_hashes()`, ensuring all writes occur in a single /// locked operation for efficiency. /// /// # Parameters: @@ -67,7 +67,7 @@ pub trait DataStoreWriter { /// # Notes: /// - This method improves efficiency by **minimizing file lock contention**. /// - If a large number of entries are written, **batching reduces overhead**. - /// - If the key hashes are already computed, use `batch_write_prehashed_keys()`. + /// - If the key hashes are already computed, use `batch_write_with_key_hashes()`. fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; /// Renames an existing entry by copying it under a new key and marking the old key as deleted. @@ -196,7 +196,7 @@ pub trait AsyncDataStoreWriter { /// Writes multiple key-value pairs as a **single transaction**. /// /// This method computes the hashes of the provided keys and delegates to - /// `batch_write_prehashed_keys()`, ensuring all writes occur in a single + /// `batch_write_with_key_hashes()`, ensuring all writes occur in a single /// locked operation for efficiency. /// /// # Parameters: @@ -211,7 +211,7 @@ pub trait AsyncDataStoreWriter { /// # Notes: /// - This method improves efficiency by **minimizing file lock contention**. /// - If a large number of entries are written, **batching reduces overhead**. - /// - If the key hashes are already computed, use `batch_write_prehashed_keys()`. + /// - If the key hashes are already computed, use `batch_write_with_key_hashes()`. 
async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; /// Renames an existing entry by copying it under a new key and marking the old key as deleted. From 2313733845cb0ff266aa4cbd5386a359f5ea2ff3 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 11:42:00 -0600 Subject: [PATCH 20/32] Add add'l writer trait methods --- .../simd-r-drive-ws-client/src/ws_client.rs | 20 ++ src/storage_engine/data_store.rs | 302 +++++++----------- src/storage_engine/traits/writer.rs | 138 ++++++++ 3 files changed, 277 insertions(+), 183 deletions(-) diff --git a/experiments/simd-r-drive-ws-client/src/ws_client.rs b/experiments/simd-r-drive-ws-client/src/ws_client.rs index 07b04fde..c9d447ba 100644 --- a/experiments/simd-r-drive-ws-client/src/ws_client.rs +++ b/experiments/simd-r-drive-ws-client/src/ws_client.rs @@ -39,6 +39,14 @@ impl AsyncDataStoreWriter for WsClient { unimplemented!("`write_stream` is not currently implemented"); } + async fn write_stream_with_key_hash( + &self, + _key_hash: u64, + _reader: &mut R, + ) -> Result { + unimplemented!("`write_stream_with_key_hash` is not currently implemented"); + } + async fn write(&self, key: &[u8], payload: &[u8]) -> Result { let response_params = Write::call( &self.rpc_client, @@ -52,6 +60,10 @@ impl AsyncDataStoreWriter for WsClient { Ok(response_params.tail_offset) } + async fn write_with_key_hash(&self, _key_hash: u64, _payload: &[u8]) -> Result { + unimplemented!("`write_with_key_hash` is not currently implemented"); + } + async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result { let response_params = BatchWrite::call( &self.rpc_client, @@ -67,6 +79,14 @@ impl AsyncDataStoreWriter for WsClient { Ok(response_params.tail_offset) } + async fn batch_write_with_key_hashes( + &self, + _prehashed_keys: Vec<(u64, &[u8])>, + _allow_null_bytes: bool, + ) -> Result { + unimplemented!("`batch_write_with_key_hashes` is not currently implemented"); + } + async fn rename(&self, _old_key: &[u8], _new_key: &[u8]) -> Result { unimplemented!("`rename` is not currently implemented"); } diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 26feadb1..f68c2ada 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -332,188 +332,6 @@ impl DataStore { Ok(best_valid_offset.unwrap_or(0)) } - // TODO: Move to writer trait - /// Writes an entry using a **precomputed key hash** and a streaming `Read` source. - /// - /// This is a **low-level** method that operates like `write_stream`, but requires - /// the key to be hashed beforehand. It is primarily used internally to avoid - /// redundant hash computations when writing multiple entries. - /// - /// # Parameters: - /// - `key_hash`: The **precomputed hash** of the key. - /// - `reader`: A **streaming reader** (`Read` trait) supplying the entry's content. - /// - /// # Returns: - /// - `Ok(offset)`: The file offset where the entry was written. - /// - `Err(std::io::Error)`: If a write or I/O operation fails. 
- pub fn write_stream_with_key_hash( - &self, - key_hash: u64, - reader: &mut R, - ) -> Result { - let mut file = self - .file - .write() - .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?; - let prev_offset = self.tail_offset.load(Ordering::Acquire); - - let mut buffer = vec![0; WRITE_STREAM_BUFFER_SIZE]; - let mut total_written = 0; - let mut checksum_state = crc32fast::Hasher::new(); - let mut is_null_only = true; - - while let Ok(bytes_read) = reader.read(&mut buffer) { - if bytes_read == 0 { - break; - } - - if buffer[..bytes_read].iter().any(|&b| b != NULL_BYTE[0]) { - is_null_only = false; - } - - file.write_all(&buffer[..bytes_read])?; - checksum_state.update(&buffer[..bytes_read]); - total_written += bytes_read; - } - - if total_written > 0 && is_null_only { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "NULL-byte-only streams cannot be written directly.", - )); - } - - let checksum = checksum_state.finalize().to_le_bytes(); - let metadata = EntryMetadata { - key_hash, - prev_offset, - checksum, - }; - file.write_all(&metadata.serialize())?; - file.flush()?; - - let tail_offset = prev_offset + total_written as u64 + METADATA_SIZE as u64; - self.reindex( - &file, - &[(key_hash, tail_offset - METADATA_SIZE as u64)], - tail_offset, - None, - )?; - Ok(tail_offset) - } - - // TODO: Move to writer trait - /// Writes an entry using a **precomputed key hash** and a payload. - /// - /// This method is a **low-level** alternative to `write()`, allowing direct - /// specification of the key hash. It is mainly used for optimized workflows - /// where the key hash is already known, avoiding redundant computations. - /// - /// # Parameters: - /// - `key_hash`: The **precomputed hash** of the key. - /// - `payload`: The **data payload** to be stored. - /// - /// # Returns: - /// - `Ok(offset)`: The file offset where the entry was written. - /// - `Err(std::io::Error)`: If a write operation fails. - /// - /// # Notes: - /// - The caller is responsible for ensuring that `key_hash` is correctly computed. - /// - This method **locks the file for writing** to maintain consistency. - /// - If writing **multiple entries**, consider using `batch_write_with_key_hashes()`. - pub fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result { - self.batch_write_with_key_hashes(vec![(key_hash, payload)], false) - } - - // TODO: Move to writer trait - // TODO: Change `prehashed_keys: Vec<(u64, &[u8])>` to `prehashed_keys: Vec<(u64, Vec)>` - /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. - /// - /// This method efficiently appends multiple entries in a **batch operation**, - /// reducing lock contention and improving performance for bulk writes. - /// - /// # Parameters: - /// - `prehashed_keys`: A **vector of precomputed key hashes and payloads**, where: - /// - `key_hash`: The **precomputed hash** of the key. - /// - `payload`: The **data payload** to be stored. - /// - /// # Returns: - /// - `Ok(final_offset)`: The file offset after all writes. - /// - `Err(std::io::Error)`: If a write operation fails. - /// - /// # Notes: - /// - **File locking is performed only once** for all writes, improving efficiency. - /// - If an entry's `payload` is empty, an error is returned. - /// - This method uses **SIMD-accelerated memory copy (`simd_copy`)** to optimize write - /// performance. - /// - **Metadata (checksums, offsets) is written after payloads** to ensure data integrity. 
- /// - After writing, the memory-mapped file (`mmap`) is **remapped** to reflect updates. - /// - /// # Efficiency Considerations: - /// - **Faster than multiple `write()` calls**, since it reduces lock contention. - /// - Suitable for **bulk insertions** where key hashes are known beforehand. - /// - If keys are available but not hashed, use `batch_write()` instead. - pub fn batch_write_with_key_hashes( - &self, - prehashed_keys: Vec<(u64, &[u8])>, - allow_null_bytes: bool, - ) -> Result { - let mut file = self - .file - .write() - .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?; - - let mut buffer = Vec::new(); - let mut tail_offset = self.tail_offset.load(Ordering::Acquire); - - let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(prehashed_keys.len()); - let mut deleted_keys: HashSet = HashSet::new(); - - for (key_hash, payload) in prehashed_keys { - if payload == NULL_BYTE { - if !allow_null_bytes { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "NULL-byte payloads cannot be written directly.", - )); - } - - deleted_keys.insert(key_hash); - } - - if payload.is_empty() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Payload cannot be empty.", - )); - } - - let prev_offset = tail_offset; - let checksum = compute_checksum(payload); - let metadata = EntryMetadata { - key_hash, - prev_offset, - checksum, - }; - let payload_len = payload.len(); - - let mut entry: Vec = vec![0u8; payload_len + METADATA_SIZE]; - simd_copy(&mut entry[..payload.len()], payload); - entry[payload.len()..].copy_from_slice(&metadata.serialize()); - buffer.extend_from_slice(&entry); - - tail_offset += entry.len() as u64; - key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64)); - } - - file.write_all(&buffer)?; - file.flush()?; - - self.reindex(&file, &key_hash_offsets, tail_offset, Some(&deleted_keys))?; - - Ok(self.tail_offset.load(Ordering::Acquire)) - } - /// Performs the core logic of reading an entry from the store. /// /// This private helper centralizes the logic for both `read` and `batch_read`. 
@@ -729,12 +547,68 @@ impl DataStoreWriter for DataStore { self.write_stream_with_key_hash(key_hash, reader) } + fn write_stream_with_key_hash(&self, key_hash: u64, reader: &mut R) -> Result { + let mut file = self + .file + .write() + .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?; + let prev_offset = self.tail_offset.load(Ordering::Acquire); + + let mut buffer = vec![0; WRITE_STREAM_BUFFER_SIZE]; + let mut total_written = 0; + let mut checksum_state = crc32fast::Hasher::new(); + let mut is_null_only = true; + + while let Ok(bytes_read) = reader.read(&mut buffer) { + if bytes_read == 0 { + break; + } + + if buffer[..bytes_read].iter().any(|&b| b != NULL_BYTE[0]) { + is_null_only = false; + } + + file.write_all(&buffer[..bytes_read])?; + checksum_state.update(&buffer[..bytes_read]); + total_written += bytes_read; + } + + if total_written > 0 && is_null_only { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "NULL-byte-only streams cannot be written directly.", + )); + } + + let checksum = checksum_state.finalize().to_le_bytes(); + let metadata = EntryMetadata { + key_hash, + prev_offset, + checksum, + }; + file.write_all(&metadata.serialize())?; + file.flush()?; + + let tail_offset = prev_offset + total_written as u64 + METADATA_SIZE as u64; + self.reindex( + &file, + &[(key_hash, tail_offset - METADATA_SIZE as u64)], + tail_offset, + None, + )?; + Ok(tail_offset) + } + fn write(&self, key: &[u8], payload: &[u8]) -> Result { let key_hash = compute_hash(key); self.write_with_key_hash(key_hash, payload) } - // TODO: Change signature to: fn batch_write(&self, entries: Vec<(Vec, Vec)>) -> Result { + fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result { + self.batch_write_with_key_hashes(vec![(key_hash, payload)], false) + } + + // TODO: Consider change signature to: fn batch_write(&self, entries: Vec<(Vec, Vec)>) -> Result { fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result { let (keys, payloads): (Vec<_>, Vec<_>) = entries.iter().cloned().unzip(); let hashes = compute_hash_batch(&keys); @@ -742,6 +616,68 @@ impl DataStoreWriter for DataStore { self.batch_write_with_key_hashes(hashed_entries, false) } + // TODO: Consider change `prehashed_keys: Vec<(u64, &[u8])>` to `prehashed_keys: Vec<(u64, Vec)>` + fn batch_write_with_key_hashes( + &self, + prehashed_keys: Vec<(u64, &[u8])>, + allow_null_bytes: bool, + ) -> Result { + let mut file = self + .file + .write() + .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?; + + let mut buffer = Vec::new(); + let mut tail_offset = self.tail_offset.load(Ordering::Acquire); + + let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(prehashed_keys.len()); + let mut deleted_keys: HashSet = HashSet::new(); + + for (key_hash, payload) in prehashed_keys { + if payload == NULL_BYTE { + if !allow_null_bytes { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "NULL-byte payloads cannot be written directly.", + )); + } + + deleted_keys.insert(key_hash); + } + + if payload.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Payload cannot be empty.", + )); + } + + let prev_offset = tail_offset; + let checksum = compute_checksum(payload); + let metadata = EntryMetadata { + key_hash, + prev_offset, + checksum, + }; + let payload_len = payload.len(); + + let mut entry: Vec = vec![0u8; payload_len + METADATA_SIZE]; + simd_copy(&mut entry[..payload.len()], payload); + 
entry[payload.len()..].copy_from_slice(&metadata.serialize()); + buffer.extend_from_slice(&entry); + + tail_offset += entry.len() as u64; + key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64)); + } + + file.write_all(&buffer)?; + file.flush()?; + + self.reindex(&file, &key_hash_offsets, tail_offset, Some(&deleted_keys))?; + + Ok(self.tail_offset.load(Ordering::Acquire)) + } + fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result { if old_key == new_key { return Err(std::io::Error::new( diff --git a/src/storage_engine/traits/writer.rs b/src/storage_engine/traits/writer.rs index 4fd651c8..735ea4b8 100644 --- a/src/storage_engine/traits/writer.rs +++ b/src/storage_engine/traits/writer.rs @@ -28,6 +28,21 @@ pub trait DataStoreWriter { /// - Metadata is appended **after** the full entry is written. fn write_stream(&self, key: &[u8], reader: &mut R) -> Result; + /// Writes an entry using a **precomputed key hash** and a streaming `Read` source. + /// + /// This is a **low-level** method that operates like `write_stream`, but requires + /// the key to be hashed beforehand. It is primarily used internally to avoid + /// redundant hash computations when writing multiple entries. + /// + /// # Parameters: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `reader`: A **streaming reader** (`Read` trait) supplying the entry's content. + /// + /// # Returns: + /// - `Ok(offset)`: The file offset where the entry was written. + /// - `Err(std::io::Error)`: If a write or I/O operation fails. + fn write_stream_with_key_hash(&self, key_hash: u64, reader: &mut R) -> Result; + /// Writes an entry with a given key and payload. /// /// This method computes the hash of the key and delegates to `write_with_key_hash()`. @@ -49,6 +64,26 @@ pub trait DataStoreWriter { /// - For writing **multiple entries at once**, use `batch_write()`. fn write(&self, key: &[u8], payload: &[u8]) -> Result; + /// Writes an entry using a **precomputed key hash** and a payload. + /// + /// This method is a **low-level** alternative to `write()`, allowing direct + /// specification of the key hash. It is mainly used for optimized workflows + /// where the key hash is already known, avoiding redundant computations. + /// + /// # Parameters: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `payload`: The **data payload** to be stored. + /// + /// # Returns: + /// - `Ok(offset)`: The file offset where the entry was written. + /// - `Err(std::io::Error)`: If a write operation fails. + /// + /// # Notes: + /// - The caller is responsible for ensuring that `key_hash` is correctly computed. + /// - This method **locks the file for writing** to maintain consistency. + /// - If writing **multiple entries**, consider using `batch_write_with_key_hashes()`. + fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result; + /// Writes multiple key-value pairs as a **single transaction**. /// /// This method computes the hashes of the provided keys and delegates to @@ -70,6 +105,38 @@ pub trait DataStoreWriter { /// - If the key hashes are already computed, use `batch_write_with_key_hashes()`. fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; + /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. + /// + /// This method efficiently appends multiple entries in a **batch operation**, + /// reducing lock contention and improving performance for bulk writes. 
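// Offset bookkeeping sketch mirroring the arithmetic in the implementation above:
// the payload is appended first, its `EntryMetadata` record directly after it, and
// the key index stores the offset where that metadata record starts (hence the
// `tail_offset - METADATA_SIZE` pushed into `key_hash_offsets`). `metadata_size`
// is taken as a parameter rather than hard-coded; in the crate it is the
// `METADATA_SIZE` constant. Values are illustrative.
fn next_offsets(prev_tail: u64, payload_len: u64, metadata_size: u64) -> (u64, u64) {
    let entry_len = payload_len + metadata_size;   // payload bytes + metadata record
    let new_tail = prev_tail + entry_len;          // becomes the next entry's `prev_offset`
    let indexed_offset = new_tail - metadata_size; // start of this entry's metadata
    (new_tail, indexed_offset)
}
// e.g. `next_offsets(0, 5, METADATA_SIZE as u64)` for a 5-byte payload appended
// to a fresh file.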
+ /// + /// # Parameters: + /// - `prehashed_keys`: A **vector of precomputed key hashes and payloads**, where: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `payload`: The **data payload** to be stored. + /// + /// # Returns: + /// - `Ok(final_offset)`: The file offset after all writes. + /// - `Err(std::io::Error)`: If a write operation fails. + /// + /// # Notes: + /// - **File locking is performed only once** for all writes, improving efficiency. + /// - If an entry's `payload` is empty, an error is returned. + /// - This method uses **SIMD-accelerated memory copy (`simd_copy`)** to optimize write + /// performance. + /// - **Metadata (checksums, offsets) is written after payloads** to ensure data integrity. + /// - After writing, the memory-mapped file (`mmap`) is **remapped** to reflect updates. + /// + /// # Efficiency Considerations: + /// - **Faster than multiple `write()` calls**, since it reduces lock contention. + /// - Suitable for **bulk insertions** where key hashes are known beforehand. + /// - If keys are available but not hashed, use `batch_write()` instead. + fn batch_write_with_key_hashes( + &self, + prehashed_keys: Vec<(u64, &[u8])>, + allow_null_bytes: bool, + ) -> Result; + /// Renames an existing entry by copying it under a new key and marking the old key as deleted. /// /// This function: @@ -172,6 +239,25 @@ pub trait AsyncDataStoreWriter { /// - Metadata is appended **after** the full entry is written. async fn write_stream(&self, key: &[u8], reader: &mut R) -> Result; + /// Writes an entry using a **precomputed key hash** and a streaming `Read` source. + /// + /// This is a **low-level** method that operates like `write_stream`, but requires + /// the key to be hashed beforehand. It is primarily used internally to avoid + /// redundant hash computations when writing multiple entries. + /// + /// # Parameters: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `reader`: A **streaming reader** (`Read` trait) supplying the entry's content. + /// + /// # Returns: + /// - `Ok(offset)`: The file offset where the entry was written. + /// - `Err(std::io::Error)`: If a write or I/O operation fails. + async fn write_stream_with_key_hash( + &self, + key_hash: u64, + reader: &mut R, + ) -> Result; + /// Writes an entry with a given key and payload. /// /// This method computes the hash of the key and delegates to `write_with_key_hash()`. @@ -193,6 +279,26 @@ pub trait AsyncDataStoreWriter { /// - For writing **multiple entries at once**, use `batch_write()`. async fn write(&self, key: &[u8], payload: &[u8]) -> Result; + /// Writes an entry using a **precomputed key hash** and a payload. + /// + /// This method is a **low-level** alternative to `write()`, allowing direct + /// specification of the key hash. It is mainly used for optimized workflows + /// where the key hash is already known, avoiding redundant computations. + /// + /// # Parameters: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `payload`: The **data payload** to be stored. + /// + /// # Returns: + /// - `Ok(offset)`: The file offset where the entry was written. + /// - `Err(std::io::Error)`: If a write operation fails. + /// + /// # Notes: + /// - The caller is responsible for ensuring that `key_hash` is correctly computed. + /// - This method **locks the file for writing** to maintain consistency. + /// - If writing **multiple entries**, consider using `batch_write_with_key_hashes()`. 
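// Usage sketch written against the async trait rather than a concrete type, so the
// same helper can drive the remote `WsClient` or any other backend that implements
// `AsyncDataStoreWriter`. Assumptions: the async trait is exported under
// `simd_r_drive::traits` like its sync counterpart, an async runtime (e.g. Tokio)
// drives the future, and the chosen backend actually implements these methods
// (the `WsClient` above still stubs several of them with `unimplemented!` at this
// point in the series). Keys and payloads are illustrative.
use simd_r_drive::traits::AsyncDataStoreWriter;

async fn seed_defaults<W: AsyncDataStoreWriter + Sync>(writer: &W) -> std::io::Result<u64> {
    // A single write followed by a batched write; both return the new tail offset.
    writer.write(b"config/mode", b"append-only").await?;
    writer
        .batch_write(&[
            (b"config/simd".as_slice(), b"enabled".as_slice()),
            (b"config/mmap".as_slice(), b"enabled".as_slice()),
        ])
        .await
}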
+ async fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result; + /// Writes multiple key-value pairs as a **single transaction**. /// /// This method computes the hashes of the provided keys and delegates to @@ -214,6 +320,38 @@ pub trait AsyncDataStoreWriter { /// - If the key hashes are already computed, use `batch_write_with_key_hashes()`. async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; + /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. + /// + /// This method efficiently appends multiple entries in a **batch operation**, + /// reducing lock contention and improving performance for bulk writes. + /// + /// # Parameters: + /// - `prehashed_keys`: A **vector of precomputed key hashes and payloads**, where: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `payload`: The **data payload** to be stored. + /// + /// # Returns: + /// - `Ok(final_offset)`: The file offset after all writes. + /// - `Err(std::io::Error)`: If a write operation fails. + /// + /// # Notes: + /// - **File locking is performed only once** for all writes, improving efficiency. + /// - If an entry's `payload` is empty, an error is returned. + /// - This method uses **SIMD-accelerated memory copy (`simd_copy`)** to optimize write + /// performance. + /// - **Metadata (checksums, offsets) is written after payloads** to ensure data integrity. + /// - After writing, the memory-mapped file (`mmap`) is **remapped** to reflect updates. + /// + /// # Efficiency Considerations: + /// - **Faster than multiple `write()` calls**, since it reduces lock contention. + /// - Suitable for **bulk insertions** where key hashes are known beforehand. + /// - If keys are available but not hashed, use `batch_write()` instead. + async fn batch_write_with_key_hashes( + &self, + prehashed_keys: Vec<(u64, &[u8])>, + allow_null_bytes: bool, + ) -> Result; + /// Renames an existing entry by copying it under a new key and marking the old key as deleted. 
/// /// This function: From 42413ffb46bfa03971f7dbae54183089836b29dc Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 12:25:00 -0600 Subject: [PATCH 21/32] Implement `batch_read_hashed_keys` --- .../simd-r-drive-ws-client/src/ws_client.rs | 8 ++ src/storage_engine/data_store.rs | 68 ++++++++-- src/storage_engine/traits/reader.rs | 74 +++++++++++ tests/batch_ops_tests.rs | 116 +++++++++++++++++- 4 files changed, 252 insertions(+), 14 deletions(-) diff --git a/experiments/simd-r-drive-ws-client/src/ws_client.rs b/experiments/simd-r-drive-ws-client/src/ws_client.rs index c9d447ba..85028179 100644 --- a/experiments/simd-r-drive-ws-client/src/ws_client.rs +++ b/experiments/simd-r-drive-ws-client/src/ws_client.rs @@ -142,6 +142,14 @@ impl AsyncDataStoreReader for WsClient { Ok(batch_read_result.entries_payloads) } + async fn batch_read_hashed_keys( + &self, + _prehashed_keys: &[u64], + _non_hashed_keys: Option<&[&[u8]]>, + ) -> Result>> { + unimplemented!("`batch_read_hashed_keys` is not currently implemented"); + } + async fn read_metadata(&self, _key: &[u8]) -> Result> { unimplemented!("`read_metadata` is not currently implemented"); } diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index f68c2ada..bc820b2a 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -351,7 +351,7 @@ impl DataStore { #[inline] fn read_entry_with_context<'a>( &self, - key: &[u8], + non_hashed_key: Option<&[u8]>, key_hash: u64, mmap_arc: &Arc, key_indexer_guard: &RwLockReadGuard<'a, KeyIndexer>, @@ -360,9 +360,13 @@ impl DataStore { let (tag, offset) = KeyIndexer::unpack(packed); // The crucial verification check, now centralized. - if tag != KeyIndexer::tag_from_key(key) { - warn!("Tag mismatch detected for key, likely a hash collision or index corruption."); - return None; + if let Some(non_hashed_key) = non_hashed_key { + if tag != KeyIndexer::tag_from_key(non_hashed_key) { + warn!( + "Tag mismatch detected for `non_hashed_key`, likely a hash collision or index corruption." + ); + return None; + } } let offset = offset as usize; @@ -746,7 +750,7 @@ impl DataStoreReader for DataStore { .map_err(|_| Error::other("key-index lock poisoned"))?; let mmap_arc = self.get_mmap_arc(); - Ok(self.read_entry_with_context(key, key_hash, &mmap_arc, &key_indexer_guard)) + Ok(self.read_entry_with_context(Some(key), key_hash, &mmap_arc, &key_indexer_guard)) } fn read_last_entry(&self) -> Result> { @@ -778,21 +782,59 @@ impl DataStoreReader for DataStore { } fn batch_read(&self, keys: &[&[u8]]) -> Result>> { + let hashed_keys = compute_hash_batch(keys); + + self.batch_read_hashed_keys(&hashed_keys, Some(&keys)) + } + + fn batch_read_hashed_keys( + &self, + prehashed_keys: &[u64], + non_hashed_keys: Option<&[&[u8]]>, + ) -> Result>> { let mmap_arc = self.get_mmap_arc(); let key_indexer_guard = self .key_indexer .read() .map_err(|_| Error::other("Key-index lock poisoned during `batch_read`"))?; - let hashes = compute_hash_batch(keys); + // Use a match to handle the two possible scenarios + let results = match non_hashed_keys { + // Case 1: We have the original keys for tag verification. + Some(keys) => { + // Good practice to ensure lengths match. 
+ if keys.len() != prehashed_keys.len() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Mismatched lengths for hashed and non-hashed keys.", + )); + } - let results = hashes - .into_iter() - .zip(keys.iter()) - .map(|(key_hash, &key)| { - self.read_entry_with_context(key, key_hash, &mmap_arc, &key_indexer_guard) - }) - .collect(); + prehashed_keys + .iter() + .zip(keys.iter()) + .map(|(key_hash, &key)| { + // Correctly pass `Some(key)` for verification + self.read_entry_with_context( + Some(key), + *key_hash, + &mmap_arc, + &key_indexer_guard, + ) + }) + .collect() + } + // Case 2: We only have the hashes and must skip tag verification. + None => { + prehashed_keys + .iter() + .map(|key_hash| { + // Correctly pass `None` as we don't have the original key + self.read_entry_with_context(None, *key_hash, &mmap_arc, &key_indexer_guard) + }) + .collect() + } + }; Ok(results) } diff --git a/src/storage_engine/traits/reader.rs b/src/storage_engine/traits/reader.rs index edb034b4..7f3aae91 100644 --- a/src/storage_engine/traits/reader.rs +++ b/src/storage_engine/traits/reader.rs @@ -65,6 +65,43 @@ pub trait DataStoreReader { /// - `Err(std::io::Error)`: On I/O failure. fn batch_read(&self, keys: &[&[u8]]) -> Result>>; + /// Reads many keys in one shot using pre-computed hashes. + /// + /// This is a lower-level, high-performance version of [`Self::batch_read`]. + /// It is designed for scenarios where the caller has already computed the key + /// hashes and wants to avoid the overhead of re-hashing. The method offers + /// an optional verification step to safeguard against hash collisions. + /// + /// * **Zero-copy**: Each `Some(EntryHandle)` provides a direct, zero-copy view + /// into the memory-mapped file. + /// * **High-performance**: Bypasses the key hashing step if hashes are already + /// available. + /// * **Thread-safe**: Acquires a single read lock for the entire batch + /// operation, minimizing contention. + /// + /// # Parameters + /// - `prehashed_keys`: A slice of `u64` key hashes to look up. + /// - `non_hashed_keys`: An optional slice of the original, non-hashed keys + /// corresponding to `prehashed_keys`. + /// - If `Some(keys)`, the method performs a tag-based verification to ensure + /// that the found entry truly belongs to the original key, preventing + /// data retrieval from a hash collision. The length of this slice + /// **must** match the length of `prehashed_keys`. + /// - If `None`, this verification is skipped. The lookup relies solely + /// on the hash, which is faster but carries a theoretical risk of + /// returning incorrect data in the event of a hash collision. + /// + /// # Returns + /// - `Ok(results)`: A `Vec>` where each element + /// corresponds to the result of looking up the key at the same index. + /// - `Err(std::io::Error)`: On I/O failure or if the lengths of `prehashed_keys` + /// and `non_hashed_keys` (when `Some`) do not match. + fn batch_read_hashed_keys( + &self, + prehashed_keys: &[u64], + non_hashed_keys: Option<&[&[u8]]>, + ) -> Result>>; + /// Retrieves metadata for a given key. /// /// This method looks up a key in the storage and returns its associated metadata. @@ -165,6 +202,43 @@ pub trait AsyncDataStoreReader { /// - `Err(std::io::Error)`: On I/O failure. async fn batch_read(&self, keys: &[&[u8]]) -> Result>>; + /// Reads many keys in one shot using pre-computed hashes. + /// + /// This is a lower-level, high-performance version of [`Self::batch_read`]. 
+ /// It is designed for scenarios where the caller has already computed the key + /// hashes and wants to avoid the overhead of re-hashing. The method offers + /// an optional verification step to safeguard against hash collisions. + /// + /// * **Zero-copy**: Each `Some(EntryHandle)` provides a direct, zero-copy view + /// into the memory-mapped file. + /// * **High-performance**: Bypasses the key hashing step if hashes are already + /// available. + /// * **Thread-safe**: Acquires a single read lock for the entire batch + /// operation, minimizing contention. + /// + /// # Parameters + /// - `prehashed_keys`: A slice of `u64` key hashes to look up. + /// - `non_hashed_keys`: An optional slice of the original, non-hashed keys + /// corresponding to `prehashed_keys`. + /// - If `Some(keys)`, the method performs a tag-based verification to ensure + /// that the found entry truly belongs to the original key, preventing + /// data retrieval from a hash collision. The length of this slice + /// **must** match the length of `prehashed_keys`. + /// - If `None`, this verification is skipped. The lookup relies solely + /// on the hash, which is faster but carries a theoretical risk of + /// returning incorrect data in the event of a hash collision. + /// + /// # Returns + /// - `Ok(results)`: A `Vec>` where each element + /// corresponds to the result of looking up the key at the same index. + /// - `Err(std::io::Error)`: On I/O failure or if the lengths of `prehashed_keys` + /// and `non_hashed_keys` (when `Some`) do not match. + async fn batch_read_hashed_keys( + &self, + prehashed_keys: &[u64], + non_hashed_keys: Option<&[&[u8]]>, + ) -> Result>>; + /// Retrieves metadata for a given key. /// /// This method looks up a key in the storage and returns its associated metadata. diff --git a/tests/batch_ops_tests.rs b/tests/batch_ops_tests.rs index 425d5b22..d58a24a1 100644 --- a/tests/batch_ops_tests.rs +++ b/tests/batch_ops_tests.rs @@ -1,7 +1,7 @@ //! Integration-tests for the batch-write / batch-read API. use simd_r_drive::{ - DataStore, + DataStore, compute_hash_batch, traits::{DataStoreReader, DataStoreWriter}, }; use tempfile::tempdir; @@ -165,3 +165,117 @@ fn test_batch_read_with_missing_key() { "wrong payload for exists_2" ); } + +#[test] +fn test_batch_write_rejects_null_byte_among_many() { + let (_dir, storage) = create_temp_storage(); + // First call should fail because the middle entry is a null byte + let entries = vec![ + (b"k1".as_slice(), b"payload1".as_slice()), + (b"k_null".as_slice(), b"\0".as_slice()), // ← invalid + (b"k2".as_slice(), b"payload2".as_slice()), + ]; + let err = storage.batch_write(&entries).expect_err("should fail"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + + // Ensure *nothing* was persisted + assert!(storage.read(b"k1").unwrap().is_none()); + assert!(storage.read(b"k2").unwrap().is_none()); + assert_eq!( + storage.len().unwrap(), + 0, + "no entries should have been written" + ); +} + +/// End-to-end test of `batch_read_hashed_keys` with full verification. 
+/// * write with `batch_write` +/// * compute hashes +/// * fetch with `batch_read_hashed_keys` providing both hashes and original keys +/// * verify ordering & presence match +#[test] +fn test_batch_read_hashed_keys_with_verification() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![ + (b"key1".as_slice(), b"val1".as_slice()), + (b"key2".as_slice(), b"val2".as_slice()), + ]; + storage.batch_write(&entries).expect("batch_write failed"); + + let keys: Vec<&[u8]> = entries.iter().map(|(k, _)| *k).collect(); + let hashes = compute_hash_batch(&keys); + + // Read back using the hashed key method with original keys for verification + let results = storage + .batch_read_hashed_keys(&hashes, Some(&keys)) + .unwrap(); + assert_eq!(results.len(), keys.len()); + + for ((_expected_key, expected_val), got_opt) in entries.iter().zip(results.iter()) { + let got = got_opt + .as_ref() + .expect("missing key in batch_read_hashed_keys"); + assert_eq!(got.as_slice(), *expected_val); + } +} + +/// End-to-end test of `batch_read_hashed_keys` without verification (hash-only). +#[test] +fn test_batch_read_hashed_keys_without_verification() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![(b"key1".as_slice(), b"val1".as_slice())]; + storage.batch_write(&entries).expect("batch_write failed"); + + let keys: Vec<&[u8]> = entries.iter().map(|(k, _)| *k).collect(); + let hashes = compute_hash_batch(&keys); + + // Read back using only the hash, passing `None` for the original keys + let results = storage.batch_read_hashed_keys(&hashes, None).unwrap(); + assert_eq!(results.len(), 1); + assert_eq!(results[0].as_ref().unwrap().as_slice(), entries[0].1); +} + +/// `batch_read_hashed_keys` should return `None` for keys that are not present. +#[test] +fn test_batch_read_hashed_keys_with_missing_keys() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![(b"exists".as_slice(), b"payload".as_slice())]; + storage.batch_write(&entries).expect("batch_write failed"); + + let existing_hash = compute_hash_batch(&[b"exists" as &[u8]])[0]; + let missing_hash = 12345_u64; // A key that was never written + + let hashes = vec![existing_hash, missing_hash]; + let results = storage.batch_read_hashed_keys(&hashes, None).unwrap(); + + assert_eq!(results.len(), 2); + assert!(results[0].is_some(), "expected entry for existing key"); + assert!(results[1].is_none(), "expected None for missing key"); +} + +/// Verifies that `batch_read_hashed_keys` with key verification enabled +/// will reject a match if the key's tag doesn't align with the hash. +/// This simulates a hash collision and confirms the safety check works. +#[test] +fn test_batch_read_hashed_keys_detects_collision() { + let (_dir, storage) = create_temp_storage(); + let real_key = b"real_key"; + let fake_key = b"fake_key"; // A different key + let payload = b"some data"; + storage.write(real_key, payload).unwrap(); + + // Get the hash of the key that actually exists in storage. + let real_hash = compute_hash_batch(&[real_key])[0]; + + // Now, try to read using the *real hash* but providing the *fake key* + // for verification. The tag check inside the read logic should fail. 
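+    // (The tag is a 16-bit fingerprint derived from the original key, packed
+    // into the high bits of the index entry alongside the offset.)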
+ let results = storage + .batch_read_hashed_keys(&[real_hash], Some(&[fake_key])) + .unwrap(); + + assert_eq!(results.len(), 1); + assert!( + results[0].is_none(), + "Read should fail due to tag mismatch, simulating a hash collision" + ); +} From 0ba9f0ac58f451ee91d0e86bd4122807c82e4ad8 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 12:40:33 -0600 Subject: [PATCH 22/32] Implement `batch_delete` --- .../simd-r-drive-ws-client/src/ws_client.rs | 8 ++ src/storage_engine/data_store.rs | 39 +++++++++- src/storage_engine/traits/writer.rs | 58 +++++++++++++++ tests/batch_ops_tests.rs | 74 ++++++++++++++++++- 4 files changed, 174 insertions(+), 5 deletions(-) diff --git a/experiments/simd-r-drive-ws-client/src/ws_client.rs b/experiments/simd-r-drive-ws-client/src/ws_client.rs index 85028179..ee93fee0 100644 --- a/experiments/simd-r-drive-ws-client/src/ws_client.rs +++ b/experiments/simd-r-drive-ws-client/src/ws_client.rs @@ -105,6 +105,14 @@ impl AsyncDataStoreWriter for WsClient { Ok(resp.tail_offset) } + + async fn batch_delete(&self, _keys: &[&[u8]]) -> Result { + unimplemented!("`batch_delete` is not currently implemented"); + } + + async fn batch_delete_key_hashes(&self, _prehashed_keys: &[u64]) -> Result { + unimplemented!("`batch_delete_key_hashes` is not currently implemented"); + } } #[async_trait::async_trait] diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index bc820b2a..6ec93207 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -727,12 +727,43 @@ impl DataStoreWriter for DataStore { } fn delete(&self, key: &[u8]) -> Result { - let key_hash = compute_hash(key); - // TODO: Check prior exists before deletion - self.batch_write_with_key_hashes(vec![(key_hash, &NULL_BYTE)], true) + self.batch_delete(&[key]) + } + + fn batch_delete(&self, keys: &[&[u8]]) -> Result { + let key_hashes = compute_hash_batch(keys); + self.batch_delete_key_hashes(&key_hashes) } - // TODO: Implement batch_delete + fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result { + // First, check which keys actually exist to avoid writing useless tombstones. + let keys_to_delete: Vec = { + let key_indexer_guard = self + .key_indexer + .read() + .map_err(|_| Error::other("Key-index lock poisoned during batch_delete check"))?; + + prehashed_keys + .iter() + .filter(|&&key_hash| key_indexer_guard.get_packed(&key_hash).is_some()) + .cloned() + .collect() + }; + + // If no keys were found to delete, we can exit early without any file I/O. + if keys_to_delete.is_empty() { + return Ok(self.tail_offset.load(Ordering::Acquire)); + } + + // Prepare the delete operations (a key hash + a null byte payload). + let delete_ops: Vec<(u64, &[u8])> = keys_to_delete + .iter() + .map(|&key_hash| (key_hash, &NULL_BYTE as &[u8])) + .collect(); + + // Use the underlying batch write method, allowing null bytes for tombstones. + self.batch_write_with_key_hashes(delete_ops, true) + } } impl DataStoreReader for DataStore { diff --git a/src/storage_engine/traits/writer.rs b/src/storage_engine/traits/writer.rs index 735ea4b8..9d1062ac 100644 --- a/src/storage_engine/traits/writer.rs +++ b/src/storage_engine/traits/writer.rs @@ -209,6 +209,35 @@ pub trait DataStoreWriter { /// # Returns: /// - The **new file offset** where the delete marker was appended. fn delete(&self, key: &[u8]) -> Result; + + /// Deletes a batch of entries from the storage by their keys. 
+ /// + /// This method computes the hash for each key and then calls the underlying + /// `batch_delete_key_hashes` method. It will only write deletion markers + /// (tombstones) for keys that currently exist in the store. + /// + /// # Parameters + /// - `keys`: A slice of keys to be deleted. + /// + /// # Returns + /// - `Ok(tail_offset)`: The new tail offset of the file after the operation. + /// - `Err(std::io::Error)`: On I/O failure. + fn batch_delete(&self, keys: &[&[u8]]) -> Result; + + /// Deletes a batch of entries from the storage using pre-computed key hashes. + /// + /// This is the lowest-level batch deletion method. It checks for the existence + /// of each key hash in the in-memory index before writing a deletion marker. + /// This prevents the store from being filled with unnecessary tombstones for + /// keys that were never present. + /// + /// # Parameters + /// - `prehashed_keys`: A slice of `u64` key hashes to be deleted. + /// + /// # Returns + /// - `Ok(tail_offset)`: The new tail offset of the file after the operation. + /// - `Err(std::io::Error)`: On I/O failure. + fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result; } #[async_trait::async_trait] @@ -424,4 +453,33 @@ pub trait AsyncDataStoreWriter { /// # Returns: /// - The **new file offset** where the delete marker was appended. async fn delete(&self, key: &[u8]) -> Result; + + /// Deletes a batch of entries from the storage by their keys. + /// + /// This method computes the hash for each key and then calls the underlying + /// `batch_delete_key_hashes` method. It will only write deletion markers + /// (tombstones) for keys that currently exist in the store. + /// + /// # Parameters + /// - `keys`: A slice of keys to be deleted. + /// + /// # Returns + /// - `Ok(tail_offset)`: The new tail offset of the file after the operation. + /// - `Err(std::io::Error)`: On I/O failure. + async fn batch_delete(&self, keys: &[&[u8]]) -> Result; + + /// Deletes a batch of entries from the storage using pre-computed key hashes. + /// + /// This is the lowest-level batch deletion method. It checks for the existence + /// of each key hash in the in-memory index before writing a deletion marker. + /// This prevents the store from being filled with unnecessary tombstones for + /// keys that were never present. + /// + /// # Parameters + /// - `prehashed_keys`: A slice of `u64` key hashes to be deleted. + /// + /// # Returns + /// - `Ok(tail_offset)`: The new tail offset of the file after the operation. + /// - `Err(std::io::Error)`: On I/O failure. + async fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result; } diff --git a/tests/batch_ops_tests.rs b/tests/batch_ops_tests.rs index d58a24a1..68d9ca7d 100644 --- a/tests/batch_ops_tests.rs +++ b/tests/batch_ops_tests.rs @@ -1,7 +1,7 @@ //! Integration-tests for the batch-write / batch-read API. use simd_r_drive::{ - DataStore, compute_hash_batch, + DataStore, compute_hash, compute_hash_batch, traits::{DataStoreReader, DataStoreWriter}, }; use tempfile::tempdir; @@ -279,3 +279,75 @@ fn test_batch_read_hashed_keys_detects_collision() { "Read should fail due to tag mismatch, simulating a hash collision" ); } + +/// Happy-path: write a handful of entries, then delete a subset of them +/// in a single batch operation. 
+#[test] +fn test_batch_delete() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![ + (b"alpha".as_slice(), b"one".as_slice()), + (b"beta".as_slice(), b"two".as_slice()), + (b"gamma".as_slice(), b"three".as_slice()), + (b"delta".as_slice(), b"four".as_slice()), + ]; + storage.batch_write(&entries).expect("batch_write failed"); + assert_eq!(storage.len().unwrap(), 4); + + // Delete two of the entries + let keys_to_delete = [b"beta".as_slice(), b"delta".as_slice()]; + storage + .batch_delete(&keys_to_delete) + .expect("batch_delete failed"); + + // Verify store state + assert_eq!(storage.len().unwrap(), 2, "Length should be reduced by 2"); + assert!(storage.read(b"beta").unwrap().is_none()); + assert!(storage.read(b"delta").unwrap().is_none()); + + // Ensure other keys are unaffected + assert!(storage.read(b"alpha").unwrap().is_some()); + assert!(storage.read(b"gamma").unwrap().is_some()); +} + +/// Verify that `batch_delete` correctly handles a mix of keys that +/// exist and keys that do not. The operation should succeed, and only +/// existing keys should be deleted. +#[test] +fn test_batch_delete_with_missing_keys() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![(b"key1".as_slice(), b"val1".as_slice())]; + storage.batch_write(&entries).expect("batch_write failed"); + assert_eq!(storage.len().unwrap(), 1); + + // Attempt to delete one existing and one non-existent key + let keys_to_delete = [b"key1".as_slice(), b"non_existent_key".as_slice()]; + storage + .batch_delete(&keys_to_delete) + .expect("batch_delete should not fail on missing keys"); + + // Verify only the existing key was deleted + assert_eq!(storage.len().unwrap(), 0); + assert!(storage.is_empty().unwrap()); + assert!(storage.read(b"key1").unwrap().is_none()); +} + +/// Verify the lowest-level batch delete function works as intended, +/// ignoring hashes for keys that are not present in the store. +#[test] +fn test_batch_delete_key_hashes() { + let (_dir, storage) = create_temp_storage(); + storage.write(b"real", b"data").unwrap(); + assert_eq!(storage.len().unwrap(), 1); + + let real_hash = compute_hash(b"real"); + let fake_hash = 1234567890_u64; // A hash for a key that doesn't exist + + let hashes_to_delete = [real_hash, fake_hash]; + storage + .batch_delete_key_hashes(&hashes_to_delete) + .expect("batch_delete_key_hashes failed"); + + // The store should now be empty because the only real key was deleted. + assert!(storage.is_empty().unwrap()); +} From 3a02528a47d206701ebaa2dee11d9c8ec84a87f9 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 13:20:51 -0600 Subject: [PATCH 23/32] Update comments --- src/storage_engine/traits/writer.rs | 34 +++++++++++++++++++---------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/storage_engine/traits/writer.rs b/src/storage_engine/traits/writer.rs index 9d1062ac..fc326f8c 100644 --- a/src/storage_engine/traits/writer.rs +++ b/src/storage_engine/traits/writer.rs @@ -198,16 +198,21 @@ pub trait DataStoreWriter { /// the append-only structure. fn transfer(&self, key: &[u8], target: &DataStore) -> Result; - /// Deletes a key by appending a **null byte marker**. + /// Logically deletes an entry by its key. /// - /// The storage engine is **append-only**, so keys cannot be removed directly. - /// Instead, a **null byte is appended** as a tombstone entry to mark the key as deleted. + /// The storage engine is **append-only**, so entries are not removed directly. 
+ /// Instead, this method appends a **tombstone marker** to logically delete the key. /// - /// # Parameters: + /// This operation first **verifies that the key exists** before appending a tombstone. + /// If the key is not found, no data is written to the file, and the operation + /// succeeds without changing the store's state. + /// + /// # Parameters /// - `key`: The **binary key** to mark as deleted. /// - /// # Returns: - /// - The **new file offset** where the delete marker was appended. + /// # Returns + /// - `Ok(tail_offset)`: The file's tail offset after the operation completes. + /// - `Err(std::io::Error)`: On I/O failure. fn delete(&self, key: &[u8]) -> Result; /// Deletes a batch of entries from the storage by their keys. @@ -442,16 +447,21 @@ pub trait AsyncDataStoreWriter { /// the append-only structure. async fn transfer(&self, key: &[u8], target: &DataStore) -> Result; - /// Deletes a key by appending a **null byte marker**. + /// Logically deletes an entry by its key. /// - /// The storage engine is **append-only**, so keys cannot be removed directly. - /// Instead, a **null byte is appended** as a tombstone entry to mark the key as deleted. + /// The storage engine is **append-only**, so entries are not removed directly. + /// Instead, this method appends a **tombstone marker** to logically delete the key. /// - /// # Parameters: + /// This operation first **verifies that the key exists** before appending a tombstone. + /// If the key is not found, no data is written to the file, and the operation + /// succeeds without changing the store's state. + /// + /// # Parameters /// - `key`: The **binary key** to mark as deleted. /// - /// # Returns: - /// - The **new file offset** where the delete marker was appended. + /// # Returns + /// - `Ok(tail_offset)`: The file's tail offset after the operation completes. + /// - `Err(std::io::Error)`: On I/O failure. async fn delete(&self, key: &[u8]) -> Result; /// Deletes a batch of entries from the storage by their keys. 
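Patches 21 through 23 round out the synchronous batch API (`batch_write`, `batch_read_hashed_keys`, `batch_delete`). A minimal end-to-end sketch of how those calls compose is below; the store path, variable names, and the `main` wrapper are illustrative assumptions rather than anything taken from the patches themselves:

use simd_r_drive::{
    DataStore, compute_hash_batch,
    traits::{DataStoreReader, DataStoreWriter},
};
use std::path::PathBuf;

fn main() -> std::io::Result<()> {
    // Hypothetical path; any writable location works.
    let path = PathBuf::from("example_store.bin");
    let store = DataStore::open(&path).expect("failed to open store");

    // Bulk insert as a single transaction.
    let entries = vec![
        (b"alpha".as_slice(), b"one".as_slice()),
        (b"beta".as_slice(), b"two".as_slice()),
    ];
    store.batch_write(&entries)?;

    // Hash once, then reuse the hashes for reads and deletes.
    let keys: Vec<&[u8]> = entries.iter().map(|(k, _)| *k).collect();
    let hashes = compute_hash_batch(&keys);

    // Verified batch read: passing the original keys enables the tag check.
    for entry in store
        .batch_read_hashed_keys(&hashes, Some(&keys))?
        .into_iter()
        .flatten()
    {
        println!("read {} bytes", entry.as_slice().len());
    }

    // Tombstones are appended only for keys that actually exist.
    store.batch_delete(&keys)?;
    Ok(())
}

Passing `None` instead of `Some(&keys)` skips the tag verification and trades collision safety for a slightly cheaper lookup, as the reader-trait docs above spell out.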
From a54b66b91c6c44a8d4f3de8de46ba73b1f46ef1c Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 13:44:46 -0600 Subject: [PATCH 24/32] Add `exists_with_key_hash` and `read_with_key_hash` methods --- .../simd-r-drive-ws-client/src/ws_client.rs | 11 + src/lib.rs | 2 + src/storage_engine.rs | 1 + src/storage_engine/data_store.rs | 17 + src/storage_engine/traits/reader.rs | 71 +++ tests/basic_operations_tests.rs | 529 ++++++++++-------- 6 files changed, 398 insertions(+), 233 deletions(-) diff --git a/experiments/simd-r-drive-ws-client/src/ws_client.rs b/experiments/simd-r-drive-ws-client/src/ws_client.rs index ee93fee0..e519c92e 100644 --- a/experiments/simd-r-drive-ws-client/src/ws_client.rs +++ b/experiments/simd-r-drive-ws-client/src/ws_client.rs @@ -127,6 +127,10 @@ impl AsyncDataStoreReader for WsClient { Ok(response_params.exists) } + async fn exists_with_key_hash(&self, _prehashed_key: u64) -> Result { + unimplemented!("`exists_with_key_hash` is not currently implemented"); + } + async fn read(&self, key: &[u8]) -> Result> { let response_params = Read::call(&self.rpc_client, ReadRequestParams { key: key.to_vec() }).await?; @@ -134,6 +138,13 @@ impl AsyncDataStoreReader for WsClient { Ok(response_params.entry_payload) } + async fn read_with_key_hash( + &self, + _prehashed_key: u64, + ) -> Result> { + unimplemented!("`read_with_key_hash` is not currently implemented"); + } + async fn read_last_entry(&self) -> Result> { unimplemented!("`read_last_entry` is not currently implemented"); } diff --git a/src/lib.rs b/src/lib.rs index 070516f5..7cabbd4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,3 +132,5 @@ pub use storage_engine::digest::*; pub use storage_engine::*; pub mod utils; + +pub use storage_engine::NULL_BYTE; diff --git a/src/storage_engine.rs b/src/storage_engine.rs index a1d123fe..bea45024 100644 --- a/src/storage_engine.rs +++ b/src/storage_engine.rs @@ -1,4 +1,5 @@ mod constants; +pub use constants::NULL_BYTE; use constants::*; mod data_store; diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 6ec93207..923df20f 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -773,6 +773,11 @@ impl DataStoreReader for DataStore { Ok(self.read(key)?.is_some()) } + fn exists_with_key_hash(&self, prehashed_key: u64) -> Result { + // This is a lightweight wrapper around the read method, just like exists(). + Ok(self.read_with_key_hash(prehashed_key)?.is_some()) + } + fn read(&self, key: &[u8]) -> Result> { let key_hash = compute_hash(key); let key_indexer_guard = self @@ -784,6 +789,18 @@ impl DataStoreReader for DataStore { Ok(self.read_entry_with_context(Some(key), key_hash, &mmap_arc, &key_indexer_guard)) } + fn read_with_key_hash(&self, prehashed_key: u64) -> Result> { + let key_indexer_guard = self + .key_indexer + .read() + .map_err(|_| Error::other("key-index lock poisoned"))?; + let mmap_arc = self.get_mmap_arc(); + + // Call the core logic with `None` for the key, as we are only using the hash + // and want to skip the tag verification check. 
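+        // Without the original key there is no tag check, so a hash collision
+        // could in principle surface another key's entry (see the trait-level
+        // warning on `read_with_key_hash`).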
+ Ok(self.read_entry_with_context(None, prehashed_key, &mmap_arc, &key_indexer_guard)) + } + fn read_last_entry(&self) -> Result> { let mmap_arc = self.get_mmap_arc(); let tail_offset = self.tail_offset.load(std::sync::atomic::Ordering::Acquire); diff --git a/src/storage_engine/traits/reader.rs b/src/storage_engine/traits/reader.rs index 7f3aae91..100b5437 100644 --- a/src/storage_engine/traits/reader.rs +++ b/src/storage_engine/traits/reader.rs @@ -20,6 +20,22 @@ pub trait DataStoreReader { /// - `Err(std::io::Error)`: On I/O failure. fn exists(&self, key: &[u8]) -> Result; + /// Checks whether a key with a pre-computed hash exists in the store. + /// + /// This is a more direct version of [`Self::exists`] that skips the hashing step, + /// making it faster if the hash is already known. Because the original key is not + /// provided, this check does not perform tag verification and relies solely on the + /// hash's presence in the index. + /// + /// # Parameters + /// - `prehashed_key`: The **pre-computed hash** of the key to check. + /// + /// # Returns + /// - `Ok(true)` if the key hash exists in the index. + /// - `Ok(false)` if the key hash is absent. + /// - `Err(std::io::Error)`: On I/O failure. + fn exists_with_key_hash(&self, prehashed_key: u64) -> Result; + /// Retrieves the most recent value associated with a given key. /// /// This method **efficiently looks up a key** using a fast in-memory index, @@ -37,6 +53,25 @@ pub trait DataStoreReader { /// - The returned `EntryHandle` provides zero-copy access to the stored data. fn read(&self, key: &[u8]) -> Result>; + /// Retrieves the most recent value associated with a pre-computed key hash. + /// + /// This is a low-level alternative to [`Self::read`] that looks up an entry using + /// only its hash, bypassing the hashing step. + /// + /// # Warning + /// This method does **not** perform tag verification, as the original key is not + /// provided. This means that in the rare event of a hash collision, this function + /// could return the entry for a different key. + /// + /// # Parameters + /// - `prehashed_key`: The **pre-computed hash** of the key to retrieve. + /// + /// # Returns + /// - `Ok(Some(EntryHandle))`: Handle to the entry if found. + /// - `Ok(None)`: If the key hash does not exist or is deleted. + /// - `Err(std::io::Error)`: On I/O failure. + fn read_with_key_hash(&self, prehashed_key: u64) -> Result>; + /// Retrieves the last entry written to the file. /// /// # Returns: @@ -157,6 +192,22 @@ pub trait AsyncDataStoreReader { /// - `Err(std::io::Error)`: On I/O failure. async fn exists(&self, key: &[u8]) -> Result; + /// Checks whether a key with a pre-computed hash exists in the store. + /// + /// This is a more direct version of [`Self::exists`] that skips the hashing step, + /// making it faster if the hash is already known. Because the original key is not + /// provided, this check does not perform tag verification and relies solely on the + /// hash's presence in the index. + /// + /// # Parameters + /// - `prehashed_key`: The **pre-computed hash** of the key to check. + /// + /// # Returns + /// - `Ok(true)` if the key hash exists in the index. + /// - `Ok(false)` if the key hash is absent. + /// - `Err(std::io::Error)`: On I/O failure. + async fn exists_with_key_hash(&self, prehashed_key: u64) -> Result; + /// Retrieves the most recent value associated with a given key. 
/// /// This method **efficiently looks up a key** using a fast in-memory index, @@ -174,6 +225,26 @@ pub trait AsyncDataStoreReader { /// - The returned `EntryHandle` provides zero-copy access to the stored data. async fn read(&self, key: &[u8]) -> Result>; + /// Retrieves the most recent value associated with a pre-computed key hash. + /// + /// This is a low-level alternative to [`Self::read`] that looks up an entry using + /// only its hash, bypassing the hashing step. + /// + /// # Warning + /// This method does **not** perform tag verification, as the original key is not + /// provided. This means that in the rare event of a hash collision, this function + /// could return the entry for a different key. + /// + /// # Parameters + /// - `prehashed_key`: The **pre-computed hash** of the key to retrieve. + /// + /// # Returns + /// - `Ok(Some(EntryHandle))`: Handle to the entry if found. + /// - `Ok(None)`: If the key hash does not exist or is deleted. + /// - `Err(std::io::Error)`: On I/O failure. + async fn read_with_key_hash(&self, prehashed_key: u64) + -> Result>; + /// Retrieves the last entry written to the file. /// /// # Returns: diff --git a/tests/basic_operations_tests.rs b/tests/basic_operations_tests.rs index 42a75ded..d25dc5c5 100644 --- a/tests/basic_operations_tests.rs +++ b/tests/basic_operations_tests.rs @@ -1,265 +1,328 @@ -#[cfg(test)] -mod tests { - - use simd_r_drive::{ - DataStore, - traits::{DataStoreReader, DataStoreWriter}, - }; - use tempfile::tempdir; - - /// Helper function to create a temporary file for testing - fn create_temp_storage() -> (tempfile::TempDir, DataStore) { - let dir = tempdir().expect("Failed to create temp dir"); - let path = dir.path().join("test_storage.bin"); - - let storage = DataStore::open(&path).expect("Failed to open storage"); - (dir, storage) - } +use simd_r_drive::{ + DataStore, compute_hash, + traits::{DataStoreReader, DataStoreWriter}, +}; +use tempfile::tempdir; + +/// Helper function to create a temporary file for testing +fn create_temp_storage() -> (tempfile::TempDir, DataStore) { + let dir = tempdir().expect("Failed to create temp dir"); + let path = dir.path().join("test_storage.bin"); + + let storage = DataStore::open(&path).expect("Failed to open storage"); + (dir, storage) +} - #[test] - fn test_emptiness_check() { - let (_dir, storage) = create_temp_storage(); +#[test] +fn test_emptiness_check() { + let (_dir, storage) = create_temp_storage(); - assert!(storage.is_empty().unwrap()); + assert!(storage.is_empty().unwrap()); - let key = b"test_key".as_slice(); - let payload = b"Hello, world!".as_slice(); - storage.write(key, payload).expect("Failed to append entry"); + let key = b"test_key".as_slice(); + let payload = b"Hello, world!".as_slice(); + storage.write(key, payload).expect("Failed to append entry"); - assert!(!storage.is_empty().unwrap()); - } + assert!(!storage.is_empty().unwrap()); +} - #[test] - fn test_exists_checks_key_presence() { - let (_dir, storage) = create_temp_storage(); - - let key = b"exists_key".as_slice(); - let payload = b"some payload".as_slice(); - - // 1. Key should NOT exist before any write. - assert!( - !storage.exists(key).unwrap(), - "Key unexpectedly exists before write" - ); - - // 2. After write, key should exist. - storage.write(key, payload).expect("Failed to write entry"); - assert!(storage.exists(key).unwrap(), "Key should exist after write"); - - // 3. After delete, key should no longer exist. 
- storage.delete(key).expect("Failed to delete entry"); - assert!( - !storage.exists(key).unwrap(), - "Key should not exist after delete" - ); - } +#[test] +fn test_exists_checks_key_presence() { + let (_dir, storage) = create_temp_storage(); + + let key = b"exists_key".as_slice(); + let payload = b"some payload".as_slice(); + + // 1. Key should NOT exist before any write. + assert!( + !storage.exists(key).unwrap(), + "Key unexpectedly exists before write" + ); + + // 2. After write, key should exist. + storage.write(key, payload).expect("Failed to write entry"); + assert!(storage.exists(key).unwrap(), "Key should exist after write"); + + // 3. After delete, key should no longer exist. + storage.delete(key).expect("Failed to delete entry"); + assert!( + !storage.exists(key).unwrap(), + "Key should not exist after delete" + ); +} - #[test] - fn test_append_and_read_last_entry() { - let (_dir, storage) = create_temp_storage(); +#[test] +fn test_append_and_read_last_entry() { + let (_dir, storage) = create_temp_storage(); - let key = b"test_key".as_slice(); - let payload = b"Hello, world!".as_slice(); - storage.write(key, payload).expect("Failed to append entry"); + let key = b"test_key".as_slice(); + let payload = b"Hello, world!".as_slice(); + storage.write(key, payload).expect("Failed to append entry"); - let last_entry = storage.read_last_entry().unwrap().expect("No entry found"); - assert_eq!( - last_entry.as_slice(), - payload, - "Stored payload does not match expected value" - ); - } + let last_entry = storage.read_last_entry().unwrap().expect("No entry found"); + assert_eq!( + last_entry.as_slice(), + payload, + "Stored payload does not match expected value" + ); +} + +#[test] +fn test_multiple_appends_and_reads() { + let (_dir, storage) = create_temp_storage(); + + let entries = vec![ + (b"key1".as_slice(), b"First Entry".as_slice()), + (b"key2".as_slice(), b"Second Entry".as_slice()), + (b"key3".as_slice(), b"Third Entry".as_slice()), + ]; - #[test] - fn test_multiple_appends_and_reads() { - let (_dir, storage) = create_temp_storage(); - - let entries = vec![ - (b"key1".as_slice(), b"First Entry".as_slice()), - (b"key2".as_slice(), b"Second Entry".as_slice()), - (b"key3".as_slice(), b"Third Entry".as_slice()), - ]; - - for (key, payload) in &entries { - storage.write(key, payload).expect("Failed to append entry"); - } - - let last_entry = storage - .read_last_entry() - .unwrap() - .expect("No last entry found"); - assert_eq!( - last_entry.as_slice(), - entries.last().unwrap().1, - "Last entry does not match expected value" - ); + for (key, payload) in &entries { + storage.write(key, payload).expect("Failed to append entry"); } - #[test] - fn test_varying_payload_sizes() { - let (_dir, storage) = create_temp_storage(); - - let payloads = [ - vec![b'a'; 10], // Small payload - vec![b'b'; 1024], // Medium payload - vec![b'c'; 4096], - ]; - - for (i, payload) in payloads.iter().enumerate() { - storage - .write(format!("key{i}").as_bytes(), payload.as_slice()) - .expect("Failed to append entry"); - } - - let last_entry = storage - .read_last_entry() - .unwrap() - .expect("No last entry found"); - assert_eq!( - last_entry.as_slice(), - payloads.last().unwrap().as_slice(), - "Last entry payload does not match expected value" - ); + let last_entry = storage + .read_last_entry() + .unwrap() + .expect("No last entry found"); + assert_eq!( + last_entry.as_slice(), + entries.last().unwrap().1, + "Last entry does not match expected value" + ); +} + +#[test] +fn test_varying_payload_sizes() { + 
let (_dir, storage) = create_temp_storage(); + + let payloads = [ + vec![b'a'; 10], // Small payload + vec![b'b'; 1024], // Medium payload + vec![b'c'; 4096], + ]; + + for (i, payload) in payloads.iter().enumerate() { + storage + .write(format!("key{i}").as_bytes(), payload.as_slice()) + .expect("Failed to append entry"); } - #[test] - fn test_retrieve_entry_by_key() { - let (_dir, storage) = create_temp_storage(); + let last_entry = storage + .read_last_entry() + .unwrap() + .expect("No last entry found"); + assert_eq!( + last_entry.as_slice(), + payloads.last().unwrap().as_slice(), + "Last entry payload does not match expected value" + ); +} - let key = b"test_key".as_slice(); - let payload = b"Hello, world!".as_slice(); - storage.write(key, payload).expect("Failed to append entry"); +#[test] +fn test_retrieve_entry_by_key() { + let (_dir, storage) = create_temp_storage(); - let retrieved = storage.read(key).unwrap(); + let key = b"test_key".as_slice(); + let payload = b"Hello, world!".as_slice(); + storage.write(key, payload).expect("Failed to append entry"); - assert!( - retrieved.is_some(), - "Entry should be found by key, but got None" - ); + let retrieved = storage.read(key).unwrap(); - assert_eq!( - retrieved.unwrap().as_slice(), - payload, - "Retrieved payload does not match expected value" - ); + assert!( + retrieved.is_some(), + "Entry should be found by key, but got None" + ); + + assert_eq!( + retrieved.unwrap().as_slice(), + payload, + "Retrieved payload does not match expected value" + ); +} + +#[test] +fn test_update_entries_with_varying_lengths() { + let (_dir, storage) = create_temp_storage(); + + let key1 = b"key1".as_slice(); + let key2 = b"key2".as_slice(); + let key3 = b"key3".as_slice(); + + let initial_payload1 = b"Short".as_slice(); + let initial_payload2 = b"Medium length payload".as_slice(); + let initial_payload3 = b"Longer initial payload data".as_slice(); + + storage + .write(key1, initial_payload1) + .expect("Failed to append entry"); + storage + .write(key2, initial_payload2) + .expect("Failed to append entry"); + storage + .write(key3, initial_payload3) + .expect("Failed to append entry"); + + let updated_payload1 = b"Updated with longer data!".as_slice(); + let updated_payload2 = b"Short".as_slice(); + + storage + .write(key1, updated_payload1) + .expect("Failed to update entry"); + storage + .write(key2, updated_payload2) + .expect("Failed to update entry"); + + let retrieved1 = storage + .read(key1) + .unwrap() + .expect("Entry for key1 should be found"); + assert_eq!( + retrieved1.as_slice(), + updated_payload1, + "Latest version of key1 was not retrieved" + ); + + let retrieved2 = storage + .read(key2) + .unwrap() + .expect("Entry for key2 should be found"); + assert_eq!( + retrieved2.as_slice(), + updated_payload2, + "Latest version of key2 was not retrieved" + ); + + let retrieved3 = storage + .read(key3) + .unwrap() + .expect("Entry for key3 should be found"); + assert_eq!( + retrieved3.as_slice(), + initial_payload3, + "Key3 should remain unchanged" + ); +} + +#[test] +fn test_open_existing_storage() { + let dir = tempdir().expect("Failed to create temp dir"); + let path = dir.path().join("test_storage_existing.bin"); + + // Create the file first + { + let _storage = DataStore::open(&path).expect("Failed to create storage file"); } - #[test] - fn test_update_entries_with_varying_lengths() { - let (_dir, storage) = create_temp_storage(); + // Now attempt to open it with `open_existing` + let storage = DataStore::open_existing(&path).expect("Failed 
to open existing storage file"); + + // Ensure storage is accessible + let key = b"test_key".as_slice(); + let payload = b"Existing file test".as_slice(); + storage.write(key, payload).expect("Failed to write entry"); + + let retrieved = storage + .read(key) + .unwrap() + .expect("Entry should exist in storage"); + assert_eq!( + retrieved.as_slice(), + payload, + "Retrieved payload does not match expected value" + ); +} - let key1 = b"key1".as_slice(); - let key2 = b"key2".as_slice(); - let key3 = b"key3".as_slice(); +#[test] +fn test_open_existing_fails_for_missing_file() { + let dir = tempdir().expect("Failed to create temp dir"); + let path = dir.path().join("non_existent_storage.bin"); - let initial_payload1 = b"Short".as_slice(); - let initial_payload2 = b"Medium length payload".as_slice(); - let initial_payload3 = b"Longer initial payload data".as_slice(); + let result = DataStore::open_existing(&path); + assert!( + result.is_err(), + "Expected error when opening non-existent file" + ); +} - storage - .write(key1, initial_payload1) - .expect("Failed to append entry"); - storage - .write(key2, initial_payload2) - .expect("Failed to append entry"); - storage - .write(key3, initial_payload3) - .expect("Failed to append entry"); +#[test] +fn test_write_null_byte_fails() { + let (_dir, storage) = create_temp_storage(); - let updated_payload1 = b"Updated with longer data!".as_slice(); - let updated_payload2 = b"Short".as_slice(); + let key = b"test_key"; - storage - .write(key1, updated_payload1) - .expect("Failed to update entry"); - storage - .write(key2, updated_payload2) - .expect("Failed to update entry"); - - let retrieved1 = storage - .read(key1) - .unwrap() - .expect("Entry for key1 should be found"); - assert_eq!( - retrieved1.as_slice(), - updated_payload1, - "Latest version of key1 was not retrieved" - ); - - let retrieved2 = storage - .read(key2) - .unwrap() - .expect("Entry for key2 should be found"); - assert_eq!( - retrieved2.as_slice(), - updated_payload2, - "Latest version of key2 was not retrieved" - ); - - let retrieved3 = storage - .read(key3) - .unwrap() - .expect("Entry for key3 should be found"); - assert_eq!( - retrieved3.as_slice(), - initial_payload3, - "Key3 should remain unchanged" - ); - } + let result = storage.write(key, b"\x00"); - #[test] - fn test_open_existing_storage() { - let dir = tempdir().expect("Failed to create temp dir"); - let path = dir.path().join("test_storage_existing.bin"); - - // Create the file first - { - let _storage = DataStore::open(&path).expect("Failed to create storage file"); - } - - // Now attempt to open it with `open_existing` - let storage = - DataStore::open_existing(&path).expect("Failed to open existing storage file"); - - // Ensure storage is accessible - let key = b"test_key".as_slice(); - let payload = b"Existing file test".as_slice(); - storage.write(key, payload).expect("Failed to write entry"); - - let retrieved = storage - .read(key) - .unwrap() - .expect("Entry should exist in storage"); - assert_eq!( - retrieved.as_slice(), - payload, - "Retrieved payload does not match expected value" - ); - } + assert!( + result.is_err(), + "Expected error when writing a null-byte payload" + ); +} - #[test] - fn test_open_existing_fails_for_missing_file() { - let dir = tempdir().expect("Failed to create temp dir"); - let path = dir.path().join("non_existent_storage.bin"); +#[test] +fn test_read_with_key_hash() { + let (_dir, storage) = create_temp_storage(); + let key = b"test_key_for_hash_read"; + let payload = b"some data"; + 
storage.write(key, payload).expect("Write failed"); + + // Case 1: Read an existing key using its pre-computed hash + let key_hash = compute_hash(key); + let result = storage + .read_with_key_hash(key_hash) + .expect("Read with hash failed"); + + assert!(result.is_some(), "Expected to find an entry with the hash"); + assert_eq!(result.unwrap().as_slice(), payload); + + // Case 2: Attempt to read a key that doesn't exist + let missing_hash = compute_hash(b"a_key_that_was_never_written"); + let result_missing = storage + .read_with_key_hash(missing_hash) + .expect("Read with missing hash failed"); + + assert!( + result_missing.is_none(), + "Expected to get None for a missing hash" + ); +} - let result = DataStore::open_existing(&path); - assert!( - result.is_err(), - "Expected error when opening non-existent file" - ); - } +#[test] +fn test_exists_with_key_hash() { + let (_dir, storage) = create_temp_storage(); + let key = b"test_key_for_hash_exists"; + storage.write(key, b"some data").expect("Write failed"); + + let existing_hash = compute_hash(key); + let missing_hash = compute_hash(b"a_missing_key"); + + // Case 1: Check for a key that exists + assert!( + storage.exists_with_key_hash(existing_hash).unwrap(), + "Expected exists_with_key_hash to be true for an existing key" + ); + + // Case 2: Check for a key that does not exist + assert!( + !storage.exists_with_key_hash(missing_hash).unwrap(), + "Expected exists_with_key_hash to be false for a missing key" + ); +} - #[test] - fn test_write_null_byte_fails() { - let (_dir, storage) = create_temp_storage(); +#[test] +fn test_exists_with_key_hash_after_deletion() { + let (_dir, storage) = create_temp_storage(); + let key = b"key_to_be_deleted"; + storage.write(key, b"data").expect("Write failed"); - let key = b"test_key"; + let key_hash = compute_hash(key); - let result = storage.write(key, b"\x00"); + // Verify it exists before deletion + assert!(storage.exists_with_key_hash(key_hash).unwrap()); - assert!( - result.is_err(), - "Expected error when writing a null-byte payload" - ); - } + // Delete the key + storage.delete(key).expect("Delete failed"); + + // Verify it no longer exists + assert!(!storage.exists_with_key_hash(key_hash).unwrap()); } From e9bebf5c8d278534857f56f2e7c6f622f9f261d4 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 13:53:40 -0600 Subject: [PATCH 25/32] Fix Clippy warnings --- src/storage_engine/data_store.rs | 2 +- src/storage_engine/traits/reader.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 923df20f..cde9bc9e 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -832,7 +832,7 @@ impl DataStoreReader for DataStore { fn batch_read(&self, keys: &[&[u8]]) -> Result>> { let hashed_keys = compute_hash_batch(keys); - self.batch_read_hashed_keys(&hashed_keys, Some(&keys)) + self.batch_read_hashed_keys(&hashed_keys, Some(keys)) } fn batch_read_hashed_keys( diff --git a/src/storage_engine/traits/reader.rs b/src/storage_engine/traits/reader.rs index 100b5437..f389767f 100644 --- a/src/storage_engine/traits/reader.rs +++ b/src/storage_engine/traits/reader.rs @@ -108,11 +108,11 @@ pub trait DataStoreReader { /// an optional verification step to safeguard against hash collisions. /// /// * **Zero-copy**: Each `Some(EntryHandle)` provides a direct, zero-copy view - /// into the memory-mapped file. + /// into the memory-mapped file. 
/// * **High-performance**: Bypasses the key hashing step if hashes are already - /// available. + /// available. /// * **Thread-safe**: Acquires a single read lock for the entire batch - /// operation, minimizing contention. + /// operation, minimizing contention. /// /// # Parameters /// - `prehashed_keys`: A slice of `u64` key hashes to look up. From b120aa4d5663c35ab8caea3e40c2a30a92ee5c92 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 13:57:53 -0600 Subject: [PATCH 26/32] Fix additional Clippy warning --- src/storage_engine/data_store.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index cde9bc9e..98b059af 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -360,13 +360,13 @@ impl DataStore { let (tag, offset) = KeyIndexer::unpack(packed); // The crucial verification check, now centralized. - if let Some(non_hashed_key) = non_hashed_key { - if tag != KeyIndexer::tag_from_key(non_hashed_key) { - warn!( - "Tag mismatch detected for `non_hashed_key`, likely a hash collision or index corruption." - ); - return None; - } + if let Some(non_hashed_key) = non_hashed_key + && tag != KeyIndexer::tag_from_key(non_hashed_key) + { + warn!( + "Tag mismatch detected for `non_hashed_key`, likely a hash collision or index corruption." + ); + return None; } let offset = offset as usize; From 378d9cb862f2fed1467c05eb6d70ae5a89ba0b68 Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 14:03:20 -0600 Subject: [PATCH 27/32] Add TODO --- src/storage_engine/digest/compute_hash.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/storage_engine/digest/compute_hash.rs b/src/storage_engine/digest/compute_hash.rs index 2eefcb99..da38d326 100644 --- a/src/storage_engine/digest/compute_hash.rs +++ b/src/storage_engine/digest/compute_hash.rs @@ -67,6 +67,8 @@ pub fn compute_hash_batch(keys: &[&[u8]]) -> Vec { // A plain loop beats an iterator here; it lets LLVM unroll/vectorize freely. let mut out = Vec::with_capacity(keys.len()); + + // TODO: For a large amount of keys, consider distributing the work with Rayon for k in keys { // xxh3_64 already uses SIMD internally where available. 
out.push(xxh3_64(k)); From 9b1a85aa9832b4f43638397076f98d2d5577e65c Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Sat, 9 Aug 2025 14:25:59 -0600 Subject: [PATCH 28/32] Add optional `parallel` feature --- .github/workflows/rust-tests.yml | 42 ++++++++++++-- Cargo.lock | 1 + Cargo.toml | 2 + src/storage_engine/data_store.rs | 84 ++++++++++++++++++++++------ src/storage_engine/key_indexer.rs | 13 +++++ tests/parallel_iterator_tests.rs | 91 +++++++++++++++++++++++++++++++ 6 files changed, 212 insertions(+), 21 deletions(-) create mode 100644 tests/parallel_iterator_tests.rs diff --git a/.github/workflows/rust-tests.yml b/.github/workflows/rust-tests.yml index 785eca2c..108b6024 100644 --- a/.github/workflows/rust-tests.yml +++ b/.github/workflows/rust-tests.yml @@ -9,11 +9,25 @@ on: jobs: test: - name: Run Rust Tests (OS = ${{ matrix.os }}) + name: Test (OS=${{ matrix.os }}, Features=${{ matrix.name }}) runs-on: ${{ matrix.os }} strategy: + fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] + include: + - name: "Default" + flags: "" + - name: "No Default Features" + flags: "--no-default-features" + - name: "Parallel" + flags: "--features parallel" + - name: "Expose Internal API" + flags: "--features expose-internal-api" + - name: "Parallel + Expose API" + flags: "--features=parallel,expose-internal-api" + - name: "All Features" + flags: "--all-features" steps: - name: Checkout repository @@ -22,8 +36,26 @@ jobs: - name: Install Rust uses: dtolnay/rust-toolchain@stable - - name: Build workspace - run: cargo build --workspace + # Added caching step to speed up dependency builds. + - name: Cache Cargo dependencies + uses: actions/cache@v4 + with: + path: | + ~/.cargo/bin/ + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + target/ + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-${{ matrix.flags }} + restore-keys: | + ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}- - - name: Run tests - run: cargo test --workspace --all-targets --verbose + - name: Build + run: cargo build --workspace --all-targets ${{ matrix.flags }} + + - name: Test + run: cargo test --workspace --all-targets --verbose ${{ matrix.flags }} + + # Added step to ensure benchmarks compile. `--no-run` is important. 
+ - name: Check benchmarks compile + run: cargo bench --workspace --no-run ${{ matrix.flags }} diff --git a/Cargo.lock b/Cargo.lock index c538a745..4b9975c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1509,6 +1509,7 @@ dependencies = [ "indoc", "memmap2", "rand", + "rayon", "serde", "serde_json", "serial_test", diff --git a/Cargo.toml b/Cargo.toml index 9113c0d3..4358fb80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ memmap2 = "0.9.5" dashmap = "6.1.0" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +rayon = { version = "1.10.0", optional = true } [dependencies.clap] version = "4.5.32" @@ -84,6 +85,7 @@ harness = false [features] default = [] expose-internal-api = [] +parallel = ["rayon"] [workspace] members = [ diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index 98b059af..4a04f404 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -16,6 +16,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use tracing::{debug, info, warn}; +#[cfg(feature = "parallel")] +use rayon::prelude::*; + /// Append-Only Storage Engine pub struct DataStore { file: Arc>>, @@ -25,6 +28,13 @@ pub struct DataStore { path: PathBuf, } +/// Provides a **consuming sequential** iterator over the valid entries. +/// +/// This allows a `DataStore` to be consumed to produce a sequential iterator. +/// For non-consuming iteration, iterate over a reference (`&storage`). +/// +/// The iterator produced is **sequential**. For parallel processing, +/// enable the `parallel` feature and use the `.par_iter_entries()` method instead. impl IntoIterator for DataStore { type Item = EntryHandle; type IntoIter = EntryIterator; @@ -34,22 +44,6 @@ impl IntoIterator for DataStore { } } -// TODO: Add feature to iterate via `rayon` -// pub struct ParallelEntryIter { -// entries: Arc<[EntryHandle]>, // or offsets -// } -// -// impl ParallelIterator for ParallelEntryIter { -// type Item = EntryHandle; -// -// fn drive_unindexed(self, consumer: C) -> C::Result -// where -// C: rayon::iter::plumbing::UnindexedConsumer, -// { -// rayon::slice::from_arc(&self.entries).into_par_iter().drive_unindexed(consumer) -// } -// } - impl From for DataStore { /// Creates an `DataStore` instance from a `PathBuf`. /// @@ -258,6 +252,64 @@ impl DataStore { EntryIterator::new(mmap_clone, tail_offset) } + /// Provides a parallel iterator over all valid entries in the storage. + /// + /// This method is only available when the `parallel` feature is enabled. + /// It leverages the Rayon crate to process entries across multiple threads, + /// which can be significantly faster for bulk operations on multi-core machines. + /// + /// The iterator is efficient, collecting only the necessary entry offsets first + /// and then constructing the `EntryHandle` objects in parallel. + /// + /// # Returns + /// - A Rayon `ParallelIterator` that yields `EntryHandle` items. + #[cfg(feature = "parallel")] + pub fn par_iter_entries(&self) -> impl ParallelIterator { + // First, acquire a read lock and collect all the packed offset values. + // This is a short, fast operation. + let key_indexer_guard = self.key_indexer.read().unwrap(); + let packed_values: Vec = key_indexer_guard.values().cloned().collect(); + drop(key_indexer_guard); // Release the lock as soon as possible. + + // Clone the mmap Arc once to be moved into the parallel iterator. 
+ let mmap_arc = self.get_mmap_arc(); + + // Create a parallel iterator over the collected offsets. The `filter_map` + // operation is the part that will run in parallel across threads. + packed_values.into_par_iter().filter_map(move |packed| { + let (_tag, offset) = KeyIndexer::unpack(packed); + let offset = offset as usize; + + // This logic is a simplified, read-only version of `read_entry_with_context`. + // We perform basic bounds checks to ensure safety. + if offset + METADATA_SIZE > mmap_arc.len() { + return None; + } + + let metadata_bytes = &mmap_arc[offset..offset + METADATA_SIZE]; + let metadata = EntryMetadata::deserialize(metadata_bytes); + let entry_start = metadata.prev_offset as usize; + let entry_end = offset; + + if entry_start >= entry_end || entry_end > mmap_arc.len() { + return None; + } + + // Important: We must filter out tombstone entries, which are marked + // by a single null byte payload. + if entry_end - entry_start == 1 && &mmap_arc[entry_start..entry_end] == NULL_BYTE { + return None; + } + + // If all checks pass, construct and return the EntryHandle. + Some(EntryHandle { + mmap_arc: mmap_arc.clone(), // Each handle gets a clone of the Arc + range: entry_start..entry_end, + metadata, + }) + }) + } + /// Recovers the **latest valid chain** of entries from the storage file. /// /// This function **scans backward** through the file, verifying that each entry diff --git a/src/storage_engine/key_indexer.rs b/src/storage_engine/key_indexer.rs index 4944f110..e99df0e9 100644 --- a/src/storage_engine/key_indexer.rs +++ b/src/storage_engine/key_indexer.rs @@ -3,6 +3,7 @@ use crate::storage_engine::constants::*; use crate::storage_engine::digest::{Xxh3BuildHasher, compute_hash}; use memmap2::Mmap; use std::collections::hash_map::Entry; +use std::collections::hash_map::Values; use std::collections::{HashMap, HashSet}; /// Number of high bits reserved for collision-detection tag (16 bits). @@ -185,4 +186,16 @@ impl KeyIndexer { pub fn is_empty(&self) -> bool { self.index.is_empty() } + + /// Returns a memory-efficient iterator over the packed (tag|offset) values. + /// + /// This method is preferable to collecting all values into a `Vec` when the + /// index is large, as it avoids a large upfront memory allocation. The iterator + /// borrows the underlying index, so it must be used within the lifetime of the + /// `KeyIndexer`'s read lock. + /// + #[inline] + pub fn values(&self) -> Values<'_, u64, u64> { + self.index.values() + } } diff --git a/tests/parallel_iterator_tests.rs b/tests/parallel_iterator_tests.rs new file mode 100644 index 00000000..c6d356a0 --- /dev/null +++ b/tests/parallel_iterator_tests.rs @@ -0,0 +1,91 @@ +// This attribute ensures the entire file is only compiled and run when +// the "parallel" feature is enabled. +#![cfg(feature = "parallel")] + +use rayon::prelude::*; +use simd_r_drive::{ + DataStore, + traits::{DataStoreReader, DataStoreWriter}, +}; +use std::collections::HashSet; +use tempfile::tempdir; + +/// Helper function to create a temporary file for testing. 
+fn create_temp_storage() -> (tempfile::TempDir, DataStore) {
+    let dir = tempdir().expect("Failed to create temp dir");
+    let path = dir.path().join("test_storage.bin");
+    let storage = DataStore::open(&path).expect("Failed to open storage");
+    (dir, storage)
+}
+
+#[test]
+fn test_par_iter_produces_correct_entries() {
+    let (_dir, storage) = create_temp_storage();
+    let entries = vec![
+        (b"key1".as_slice(), b"payload1".as_slice()),
+        (b"key2".as_slice(), b"payload2".as_slice()),
+        (b"key3".as_slice(), b"payload3".as_slice()),
+    ];
+    storage.batch_write(&entries).expect("Batch write failed");
+
+    // Use a HashSet to verify that the parallel iterator produces the exact
+    // same set of payloads as the sequential one, ignoring order.
+    let expected_payloads: HashSet<Vec<u8>> = storage
+        .iter_entries()
+        .map(|e| e.as_slice().to_vec())
+        .collect();
+
+    let parallel_payloads: HashSet<Vec<u8>> = storage
+        .par_iter_entries()
+        .map(|e| e.as_slice().to_vec())
+        .collect();
+
+    assert_eq!(
+        expected_payloads, parallel_payloads,
+        "Parallel iterator should produce the same set of entries as the sequential one"
+    );
+    assert_eq!(parallel_payloads.len(), 3);
+}
+
+#[test]
+fn test_par_iter_skips_deleted_entries() {
+    let (_dir, storage) = create_temp_storage();
+    let entries = vec![
+        (b"key1".as_slice(), b"payload1".as_slice()),
+        (b"key_to_delete".as_slice(), b"payload_to_delete".as_slice()),
+        (b"key3".as_slice(), b"payload3".as_slice()),
+    ];
+    storage.batch_write(&entries).expect("Batch write failed");
+    storage.delete(b"key_to_delete").expect("Delete failed");
+
+    // Collect all payloads found by the parallel iterator.
+    let found_payloads: Vec<Vec<u8>> = storage
+        .par_iter_entries()
+        .map(|e| e.as_slice().to_vec())
+        .collect();
+
+    assert_eq!(
+        found_payloads.len(),
+        2,
+        "Parallel iterator should not include deleted entries"
+    );
+
+    // Ensure the deleted payload is not present.
+    let deleted_payload = b"payload_to_delete".to_vec();
+    assert!(
+        !found_payloads.contains(&deleted_payload),
+        "Deleted payload should not be found in parallel iteration results"
+    );
+}
+
+#[test]
+fn test_par_iter_on_empty_store() {
+    let (_dir, storage) = create_temp_storage();
+
+    let count = storage.par_iter_entries().count();
+
+    assert_eq!(
+        count, 0,
+        "Parallel iterator should produce zero items for an empty store"
+    );
+}
From 304e8b48fc81ac40dc2a08260cbb84ba5e0901f7 Mon Sep 17 00:00:00 2001
From: Jeremy Harris
Date: Sat, 9 Aug 2025 14:34:56 -0600
Subject: [PATCH 29/32] Update README

---
 README.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/README.md b/README.md
index 421dfb5f..cc4bbcf9 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,7 @@ Can be used as a command line interface (CLI) app, or as a library in another ap
   - [Multiple Read Modes](#multiple-read-modes)
     - [Direct memory access](#direct-memory-access)
     - [Streaming](#streaming)
+  - [Parallel Iteration (via Rayon)](#parallel-iteration-via-rayon)
 - [SIMD Write & Query Acceleration](#simd-write--query-acceleration)
 - [Python Bindings and Experiments](#python-bindings-and-experiments)
 - [License](#license)
@@ -207,6 +208,11 @@ This avoids high memory overhead while still leveraging `mmap` for efficient acc
 
 > ⚠️ Streaming reads are non-zero-copy since they are read through a buffer.
 
+### Parallel Iteration (via Rayon)
+
+For high-throughput, bulk processing on multi-core machines, `SIMD R Drive` offers an optional parallel iterator. When the `parallel` feature is enabled, you can use the Rayon-powered `.par_iter_entries()` method to process all valid entries in the data store across multiple threads.
+
+This is ideal for data analytics, batch processing, or building in-memory caches where you need to scan the entire dataset as quickly as possible.
 
 ## SIMD Write & Query Acceleration
 
From ad81d9915840a50d0524c70f9a9f8e1524299f14 Mon Sep 17 00:00:00 2001
From: Jeremy Harris
Date: Sat, 9 Aug 2025 14:37:00 -0600
Subject: [PATCH 30/32] Fix Clippy warnings

---
 src/storage_engine/data_store.rs | 2 +-
 tests/parallel_iterator_tests.rs | 5 +----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs
index 4a04f404..974a0e00 100644
--- a/src/storage_engine/data_store.rs
+++ b/src/storage_engine/data_store.rs
@@ -297,7 +297,7 @@ impl DataStore {
 
         // Important: We must filter out tombstone entries, which are marked
         // by a single null byte payload.
-        if entry_end - entry_start == 1 && &mmap_arc[entry_start..entry_end] == NULL_BYTE {
+        if entry_end - entry_start == 1 && mmap_arc[entry_start..entry_end] == NULL_BYTE {
             return None;
         }
 
diff --git a/tests/parallel_iterator_tests.rs b/tests/parallel_iterator_tests.rs
index c6d356a0..18f023e2 100644
--- a/tests/parallel_iterator_tests.rs
+++ b/tests/parallel_iterator_tests.rs
@@ -3,10 +3,7 @@
 #![cfg(feature = "parallel")]
 
 use rayon::prelude::*;
-use simd_r_drive::{
-    DataStore,
-    traits::{DataStoreReader, DataStoreWriter},
-};
+use simd_r_drive::{DataStore, traits::DataStoreWriter};
 use std::collections::HashSet;
 use tempfile::tempdir;
 
From a2310709284f6949dd99ed9a8482928235537f86 Mon Sep 17 00:00:00 2001
From: Jeremy Harris
Date: Sat, 9 Aug 2025 14:40:35 -0600
Subject: [PATCH 31/32] Add additional tests

---
 tests/parallel_iterator_tests.rs | 67 ++++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/tests/parallel_iterator_tests.rs b/tests/parallel_iterator_tests.rs
index 18f023e2..6263f1f2 100644
--- a/tests/parallel_iterator_tests.rs
+++ b/tests/parallel_iterator_tests.rs
@@ -86,3 +86,70 @@ fn test_par_iter_on_empty_store() {
         "Parallel iterator should produce zero items for an empty store"
     );
 }
+
+#[test]
+fn test_par_iter_yields_only_latest_version_of_updated_entry() {
+    let (_dir, storage) = create_temp_storage();
+
+    // Write initial versions of two keys
+    storage
+        .write(b"updated_key", b"version1")
+        .expect("Write failed");
+    storage
+        .write(b"stable_key", b"stable_version")
+        .expect("Write failed");
+
+    // Update one of the keys
+    storage
+        .write(b"updated_key", b"version2_final")
+        .expect("Update failed");
+
+    // Collect the results from the parallel iterator
+    let final_payloads: HashSet<Vec<u8>> = storage
+        .par_iter_entries()
+        .map(|e| e.as_slice().to_vec())
+        .collect();
+
+    // The iterator should yield two entries: the final version of the updated key
+    // and the stable key.
+    assert_eq!(final_payloads.len(), 2);
+    assert!(final_payloads.contains(&b"version2_final".to_vec()));
+    assert!(final_payloads.contains(&b"stable_version".to_vec()));
+
+    // Crucially, the stale, older version should NOT be present.
+    assert!(!final_payloads.contains(&b"version1".to_vec()));
+}
+
+#[test]
+fn test_par_iter_excludes_entries_that_were_updated_then_deleted() {
+    let (_dir, storage) = create_temp_storage();
+
+    // Write and then update a key that we intend to delete
+    storage
+        .write(b"deleted_key", b"version1")
+        .expect("Write failed");
+    storage
+        .write(b"deleted_key", b"version2")
+        .expect("Update failed");
+
+    // Write another key that will remain
+    storage
+        .write(b"stable_key", b"stable_version")
+        .expect("Write failed");
+
+    // Now, delete the key that has multiple versions
+    storage.delete(b"deleted_key").expect("Delete failed");
+
+    let final_payloads: HashSet<Vec<u8>> = storage
+        .par_iter_entries()
+        .map(|e| e.as_slice().to_vec())
+        .collect();
+
+    // The iterator should only yield the one remaining stable key.
+    assert_eq!(final_payloads.len(), 1);
+    assert!(final_payloads.contains(&b"stable_version".to_vec()));
+
+    // Assert that NEITHER version of the deleted key is present.
+    assert!(!final_payloads.contains(&b"version1".to_vec()));
+    assert!(!final_payloads.contains(&b"version2".to_vec()));
+}
From cab83fa8e970a4f1550060dd2bc75be86b40b512 Mon Sep 17 00:00:00 2001
From: Jeremy Harris
Date: Sat, 9 Aug 2025 14:52:16 -0600
Subject: [PATCH 32/32] Fix Clippy warnings

---
 tests/parallel_iterator_tests.rs | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/parallel_iterator_tests.rs b/tests/parallel_iterator_tests.rs
index 6263f1f2..1789a57c 100644
--- a/tests/parallel_iterator_tests.rs
+++ b/tests/parallel_iterator_tests.rs
@@ -113,11 +113,11 @@ fn test_par_iter_yields_only_latest_version_of_updated_entry() {
     // The iterator should yield two entries: the final version of the updated key
     // and the stable key.
     assert_eq!(final_payloads.len(), 2);
-    assert!(final_payloads.contains(&b"version2_final".to_vec()));
-    assert!(final_payloads.contains(&b"stable_version".to_vec()));
+    assert!(final_payloads.contains(b"version2_final".as_slice()));
+    assert!(final_payloads.contains(b"stable_version".as_slice()));
 
     // Crucially, the stale, older version should NOT be present.
-    assert!(!final_payloads.contains(&b"version1".to_vec()));
+    assert!(!final_payloads.contains(b"version1".as_slice()));
 }
 
 #[test]
@@ -147,9 +147,9 @@ fn test_par_iter_excludes_entries_that_were_updated_then_deleted() {
 
     // The iterator should only yield the one remaining stable key.
     assert_eq!(final_payloads.len(), 1);
-    assert!(final_payloads.contains(&b"stable_version".to_vec()));
+    assert!(final_payloads.contains(b"stable_version".as_slice()));
 
     // Assert that NEITHER version of the deleted key is present.
-    assert!(!final_payloads.contains(&b"version1".to_vec()));
-    assert!(!final_payloads.contains(&b"version2".to_vec()));
+    assert!(!final_payloads.contains(b"version1".as_slice()));
+    assert!(!final_payloads.contains(b"version2".as_slice()));
 }
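
To show how the new API is intended to be consumed once this series lands, here is a minimal usage sketch. It assumes the crate is built with the `parallel` feature (`cargo run --features parallel`); the `main` function, the file name `example_storage.bin`, the use of `tempfile`, and the byte-counting aggregation are illustrative only and are not part of the patches above.

```rust
// Illustrative sketch (not part of the patch series); requires `--features parallel`.
use rayon::prelude::*;
use simd_r_drive::{DataStore, traits::DataStoreWriter};
use tempfile::tempdir;

fn main() {
    // Open a store in a throwaway directory, mirroring the test helper.
    let dir = tempdir().expect("Failed to create temp dir");
    let storage = DataStore::open(&dir.path().join("example_storage.bin"))
        .expect("Failed to open storage");

    // Write a handful of entries as a single batch.
    let entries = vec![
        (b"key1".as_slice(), b"payload1".as_slice()),
        (b"key2".as_slice(), b"payload2".as_slice()),
        (b"key3".as_slice(), b"payload3".as_slice()),
    ];
    storage.batch_write(&entries).expect("Batch write failed");

    // Scan every valid (latest, non-deleted) entry across threads and
    // aggregate the total payload size.
    let total_bytes: usize = storage
        .par_iter_entries()
        .map(|entry| entry.as_slice().len())
        .sum();

    println!("total payload bytes: {total_bytes}");
}
```

The call pattern follows the design of `par_iter_entries()` itself: the packed offsets are snapshotted under a short read lock, and the `EntryHandle`s are then constructed in parallel against the shared mmap, so the bulk of the scan runs without holding the index lock.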