diff --git a/.github/workflows/rust-tests.yml b/.github/workflows/rust-tests.yml
index 785eca2c..108b6024 100644
--- a/.github/workflows/rust-tests.yml
+++ b/.github/workflows/rust-tests.yml
@@ -9,11 +9,30 @@ on:
 jobs:
   test:
-    name: Run Rust Tests (OS = ${{ matrix.os }})
+    name: Test (OS=${{ matrix.os }}, Features=${{ matrix.name }})
     runs-on: ${{ matrix.os }}
     strategy:
+      fail-fast: false
       matrix:
         os: [ubuntu-latest, macos-latest, windows-latest]
+        # NOTE: `flags` must be a real matrix axis so it is crossed with every
+        # OS. An `include` list whose entries contain only new keys does NOT
+        # expand the matrix: each entry merges into every combination and later
+        # entries overwrite earlier ones, leaving only the last ("All Features")
+        # job per OS. `include` is used here only to attach a display name.
+        flags: ["", "--no-default-features", "--features parallel", "--features expose-internal-api", "--features=parallel,expose-internal-api", "--all-features"]
+        include:
+          - flags: ""
+            name: "Default"
+          - flags: "--no-default-features"
+            name: "No Default Features"
+          - flags: "--features parallel"
+            name: "Parallel"
+          - flags: "--features expose-internal-api"
+            name: "Expose Internal API"
+          - flags: "--features=parallel,expose-internal-api"
+            name: "Parallel + Expose API"
+          - flags: "--all-features"
+            name: "All Features"
 
     steps:
       - name: Checkout repository
 
@@ -22,8 +41,27 @@ jobs:
       - name: Install Rust
         uses: dtolnay/rust-toolchain@stable
 
-      - name: Build workspace
-        run: cargo build --workspace
+      # Added caching step to speed up dependency builds.
+      - name: Cache Cargo dependencies
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/bin/
+            ~/.cargo/registry/index/
+            ~/.cargo/registry/cache/
+            ~/.cargo/git/db/
+            target/
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-${{ matrix.flags }}
+          restore-keys: |
+            ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-
 
-      - name: Run tests
-        run: cargo test --workspace --all-targets --verbose
+      - name: Build
+        run: cargo build --workspace --all-targets ${{ matrix.flags }}
+
+      - name: Test
+        run: cargo test --workspace --all-targets --verbose ${{ matrix.flags }}
+
+      # Added step to ensure benchmarks compile. `--no-run` is important.
+ - name: Check benchmarks compile + run: cargo bench --workspace --no-run ${{ matrix.flags }} diff --git a/Cargo.lock b/Cargo.lock index ba9fe49e..4b9975c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -960,6 +960,8 @@ dependencies = [ [[package]] name = "muxio" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2e143d83f97695de4bd77bc811d6e542026cbdce3b690b9feeff8e9129e4969" dependencies = [ "chrono", "once_cell", @@ -969,6 +971,8 @@ dependencies = [ [[package]] name = "muxio-rpc-service" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75a5f5807934e07012687f744b4e8bf10e2391c83c7e807a5f119d5f6f873233" dependencies = [ "async-trait", "futures", @@ -980,6 +984,8 @@ dependencies = [ [[package]] name = "muxio-rpc-service-caller" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24337541fc4a5b0e08b8f41cfdb8861a82b9ad66d0772365d59d099463b04413" dependencies = [ "async-trait", "futures", @@ -991,6 +997,8 @@ dependencies = [ [[package]] name = "muxio-rpc-service-endpoint" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c799f999c7d5dbfad3ebd16fe0d93425c32f7242dcf18cc0adc57862a4525773" dependencies = [ "async-trait", "futures", @@ -1004,6 +1012,8 @@ dependencies = [ [[package]] name = "muxio-tokio-rpc-client" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5656d1fc8093efe90fd90dab6f4f9f61348b9871b64da6a25478a2576e894e17" dependencies = [ "async-trait", "bytes", @@ -1020,6 +1030,8 @@ dependencies = [ [[package]] name = "muxio-tokio-rpc-server" version = "0.9.0-alpha" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "770aec8718a85c4854f124c0e247a4ec9c983d38b560ccf4b4dada4feabfd5b1" dependencies = [ "async-trait", "axum", @@ -1485,7 +1497,7 @@ dependencies = [ [[package]] name = 
"simd-r-drive" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "async-trait", "bincode", @@ -1497,6 +1509,7 @@ dependencies = [ "indoc", "memmap2", "rand", + "rayon", "serde", "serde_json", "serial_test", @@ -1510,7 +1523,7 @@ dependencies = [ [[package]] name = "simd-r-drive-extensions" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "bincode", "doc-comment", @@ -1522,7 +1535,7 @@ dependencies = [ [[package]] name = "simd-r-drive-muxio-service-definition" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "bitcode", "muxio-rpc-service", @@ -1530,7 +1543,7 @@ dependencies = [ [[package]] name = "simd-r-drive-ws-client" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "async-trait", "muxio-rpc-service", @@ -1544,7 +1557,7 @@ dependencies = [ [[package]] name = "simd-r-drive-ws-server" -version = "0.11.0-alpha" +version = "0.12.0-alpha" dependencies = [ "clap", "indoc", @@ -2193,3 +2206,19 @@ dependencies = [ "quote", "syn", ] + +[[patch.unused]] +name = "muxio-rpc-service" +version = "0.10.0-alpha" + +[[patch.unused]] +name = "muxio-rpc-service-caller" +version = "0.10.0-alpha" + +[[patch.unused]] +name = "muxio-tokio-rpc-client" +version = "0.10.0-alpha" + +[[patch.unused]] +name = "muxio-tokio-rpc-server" +version = "0.10.0-alpha" diff --git a/Cargo.toml b/Cargo.toml index fd978900..4358fb80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,12 +1,12 @@ [workspace.package] authors = ["Jeremy Harris "] -version = "0.11.0-alpha" +version = "0.12.0-alpha" edition = "2024" repository = "https://github.com/jzombie/rust-simd-r-drive" license = "Apache-2.0" categories = ["database-implementations", "data-structures", "filesystem"] keywords = ["storage-engine", "binary-storage", "append-only", "simd", "mmap"] -publish = true +publish = false [package] name = "simd-r-drive" @@ -22,9 +22,27 @@ publish.workspace = true # Inherit from workspace [workspace.dependencies] # Intra-workspace crates 
-simd-r-drive = { path = ".", version = "0.11.0-alpha" } -simd-r-drive-ws-client = { path = "./experiments/simd-r-drive-ws-client", version = "0.11.0-alpha" } -simd-r-drive-muxio-service-definition = { path = "./experiments/simd-r-drive-muxio-service-definition", version = "0.11.0-alpha" } +simd-r-drive = { path = ".", version = "0.12.0-alpha" } +simd-r-drive-ws-client = { path = "./experiments/simd-r-drive-ws-client", version = "0.12.0-alpha" } +simd-r-drive-muxio-service-definition = { path = "./experiments/simd-r-drive-muxio-service-definition", version = "0.12.0-alpha" } +muxio-tokio-rpc-client = "0.9.0-alpha" +muxio-tokio-rpc-server = "0.9.0-alpha" +muxio-rpc-service = "0.9.0-alpha" +muxio-rpc-service-caller = "0.9.0-alpha" + +# Third-party crates (note, not all dependencies are used in the base drive) +async-trait = "0.1.88" +bincode = "1.3.3" # TODO: Replace with `bitcode` +bitcode = "0.6.6" +clap = "4.5.40" +doc-comment = "0.3.3" +indoc = "2.0.6" +serde = "1.0.219" +tokio = "1.45.1" # Tokio is not used in base `SIMD R Drive`, only extensions +tempfile = "3.19.0" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +walkdir = "2" [dependencies] async-trait = "0.1.88" @@ -34,6 +52,7 @@ memmap2 = "0.9.5" dashmap = "6.1.0" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +rayon = { version = "1.10.0", optional = true } [dependencies.clap] version = "4.5.32" @@ -66,6 +85,7 @@ harness = false [features] default = [] expose-internal-api = [] +parallel = ["rayon"] [workspace] members = [ diff --git a/README.md b/README.md index 421dfb5f..cc4bbcf9 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ Can be used as a command line interface (CLI) app, or as a library in another ap - [Multiple Read Modes](#multiple-read-modes) - [Direct memory access](#direct-memory-access) - [Streaming](#streaming) + - [Parallel Iteration (via Rayon)](#parallel-iteration-via-rayon) - [SIMD Write & Query 
Acceleration](#simd-write--query-acceleration) - [Python Bindings and Experiments](#python-bindings-and-experiments) - [License](#license) @@ -207,6 +208,11 @@ This avoids high memory overhead while still leveraging `mmap` for efficient acc > ⚠️ Streaming reads are non-zero-copy since they are read through a buffer. +### Parallel Iteration (via Rayon) + +For high-throughput, bulk processing on multi-core machines, `SIMD R Drive` offers an optional parallel iterator. When the `parallel` feature is enabled, you can use the Rayon-powered `.par_iter_entries()` method to process all valid entries in the data store across multiple threads. + +This is ideal for data analytics, batch processing, or building in-memory caches where you need to scan the entire dataset as quickly as possible. ## SIMD Write & Query Acceleration diff --git a/experiments/bindings/python_(old_client)/pyproject.toml b/experiments/bindings/python_(old_client)/pyproject.toml index 232a53eb..a0d97026 100644 --- a/experiments/bindings/python_(old_client)/pyproject.toml +++ b/experiments/bindings/python_(old_client)/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "simd-r-drive-py" -version = "0.11.0-alpha" +version = "0.12.0-alpha" description = "SIMD-optimized append-only schema-less storage engine. Key-based binary storage in a single-file storage container." 
repository = "https://github.com/jzombie/rust-simd-r-drive" license = "Apache-2.0" diff --git a/experiments/simd-r-drive-muxio-service-definition/Cargo.toml b/experiments/simd-r-drive-muxio-service-definition/Cargo.toml index b80017ff..0529b725 100644 --- a/experiments/simd-r-drive-muxio-service-definition/Cargo.toml +++ b/experiments/simd-r-drive-muxio-service-definition/Cargo.toml @@ -11,6 +11,6 @@ keywords.workspace = true # Inherit from workspace publish.workspace = true # Inherit from workspace [dependencies] -bitcode = "0.6.6" -muxio-rpc-service = "0.9.0-alpha" +bitcode = { workspace = true } +muxio-rpc-service = { workspace = true } diff --git a/experiments/simd-r-drive-ws-client/Cargo.toml b/experiments/simd-r-drive-ws-client/Cargo.toml index ef7cc618..1d4dcfb1 100644 --- a/experiments/simd-r-drive-ws-client/Cargo.toml +++ b/experiments/simd-r-drive-ws-client/Cargo.toml @@ -13,9 +13,9 @@ publish.workspace = true # Inherit from workspace [dependencies] simd-r-drive = { workspace = true } simd-r-drive-muxio-service-definition = { workspace = true } -muxio-tokio-rpc-client = "0.9.0-alpha" -muxio-rpc-service = "0.9.0-alpha" -muxio-rpc-service-caller = "0.9.0-alpha" -tokio = "1.45.1" -tracing = "0.1.41" -async-trait = "0.1.88" +muxio-tokio-rpc-client = { workspace = true } +muxio-rpc-service = { workspace = true } +muxio-rpc-service-caller = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } diff --git a/experiments/simd-r-drive-ws-client/src/ws_client.rs b/experiments/simd-r-drive-ws-client/src/ws_client.rs index 07b04fde..e519c92e 100644 --- a/experiments/simd-r-drive-ws-client/src/ws_client.rs +++ b/experiments/simd-r-drive-ws-client/src/ws_client.rs @@ -39,6 +39,14 @@ impl AsyncDataStoreWriter for WsClient { unimplemented!("`write_stream` is not currently implemented"); } + async fn write_stream_with_key_hash( + &self, + _key_hash: u64, + _reader: &mut R, + ) -> Result { + 
unimplemented!("`write_stream_with_key_hash` is not currently implemented"); + } + async fn write(&self, key: &[u8], payload: &[u8]) -> Result { let response_params = Write::call( &self.rpc_client, @@ -52,6 +60,10 @@ impl AsyncDataStoreWriter for WsClient { Ok(response_params.tail_offset) } + async fn write_with_key_hash(&self, _key_hash: u64, _payload: &[u8]) -> Result { + unimplemented!("`write_with_key_hash` is not currently implemented"); + } + async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result { let response_params = BatchWrite::call( &self.rpc_client, @@ -67,6 +79,14 @@ impl AsyncDataStoreWriter for WsClient { Ok(response_params.tail_offset) } + async fn batch_write_with_key_hashes( + &self, + _prehashed_keys: Vec<(u64, &[u8])>, + _allow_null_bytes: bool, + ) -> Result { + unimplemented!("`batch_write_with_key_hashes` is not currently implemented"); + } + async fn rename(&self, _old_key: &[u8], _new_key: &[u8]) -> Result { unimplemented!("`rename` is not currently implemented"); } @@ -85,6 +105,14 @@ impl AsyncDataStoreWriter for WsClient { Ok(resp.tail_offset) } + + async fn batch_delete(&self, _keys: &[&[u8]]) -> Result { + unimplemented!("`batch_delete` is not currently implemented"); + } + + async fn batch_delete_key_hashes(&self, _prehashed_keys: &[u64]) -> Result { + unimplemented!("`batch_delete_key_hashes` is not currently implemented"); + } } #[async_trait::async_trait] @@ -99,6 +127,10 @@ impl AsyncDataStoreReader for WsClient { Ok(response_params.exists) } + async fn exists_with_key_hash(&self, _prehashed_key: u64) -> Result { + unimplemented!("`exists_with_key_hash` is not currently implemented"); + } + async fn read(&self, key: &[u8]) -> Result> { let response_params = Read::call(&self.rpc_client, ReadRequestParams { key: key.to_vec() }).await?; @@ -106,6 +138,13 @@ impl AsyncDataStoreReader for WsClient { Ok(response_params.entry_payload) } + async fn read_with_key_hash( + &self, + _prehashed_key: u64, + ) -> Result> { + 
unimplemented!("`read_with_key_hash` is not currently implemented"); + } + async fn read_last_entry(&self) -> Result> { unimplemented!("`read_last_entry` is not currently implemented"); } @@ -122,6 +161,14 @@ impl AsyncDataStoreReader for WsClient { Ok(batch_read_result.entries_payloads) } + async fn batch_read_hashed_keys( + &self, + _prehashed_keys: &[u64], + _non_hashed_keys: Option<&[&[u8]]>, + ) -> Result>> { + unimplemented!("`batch_read_hashed_keys` is not currently implemented"); + } + async fn read_metadata(&self, _key: &[u8]) -> Result> { unimplemented!("`read_metadata` is not currently implemented"); } diff --git a/experiments/simd-r-drive-ws-server/Cargo.toml b/experiments/simd-r-drive-ws-server/Cargo.toml index 51f6020c..d2ff2e83 100644 --- a/experiments/simd-r-drive-ws-server/Cargo.toml +++ b/experiments/simd-r-drive-ws-server/Cargo.toml @@ -13,10 +13,10 @@ publish.workspace = true # Inherit from workspace [dependencies] simd-r-drive = { workspace = true } simd-r-drive-muxio-service-definition = { workspace = true } -muxio-tokio-rpc-server = "0.9.0-alpha" -muxio-rpc-service = "0.9.0-alpha" -tokio = "1.45.1" -tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } -tracing = "0.1.41" -clap = { version = "4.5.40", features = ["derive"] } -indoc = "2.0.6" +muxio-tokio-rpc-server = { workspace = true } +muxio-rpc-service = { workspace = true } +tokio = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing = { workspace = true } +clap = { workspace = true, features = ["derive"] } +indoc = { workspace = true } diff --git a/extensions/Cargo.toml b/extensions/Cargo.toml index c1722f18..a0b852c2 100644 --- a/extensions/Cargo.toml +++ b/extensions/Cargo.toml @@ -11,11 +11,11 @@ keywords.workspace = true # Inherit from workspace publish.workspace = true # Inherit from workspace [dependencies] -bincode = "1.3.3" -serde = { version = "1.0.219", features = ["derive"] } +bincode = { workspace = true } 
+serde = { workspace = true, features = ["derive"] } simd-r-drive = { workspace = true } -walkdir = "2" +walkdir = { workspace = true } [dev-dependencies] -doc-comment = "0.3.3" -tempfile = "3.19.0" +doc-comment = { workspace = true } +tempfile = { workspace = true } diff --git a/src/lib.rs b/src/lib.rs index 070516f5..7cabbd4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,3 +132,5 @@ pub use storage_engine::digest::*; pub use storage_engine::*; pub mod utils; + +pub use storage_engine::NULL_BYTE; diff --git a/src/storage_engine.rs b/src/storage_engine.rs index a1d123fe..bea45024 100644 --- a/src/storage_engine.rs +++ b/src/storage_engine.rs @@ -1,4 +1,5 @@ mod constants; +pub use constants::NULL_BYTE; use constants::*; mod data_store; diff --git a/src/storage_engine/data_store.rs b/src/storage_engine/data_store.rs index ab625306..974a0e00 100644 --- a/src/storage_engine/data_store.rs +++ b/src/storage_engine/data_store.rs @@ -16,6 +16,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use tracing::{debug, info, warn}; +#[cfg(feature = "parallel")] +use rayon::prelude::*; + /// Append-Only Storage Engine pub struct DataStore { file: Arc>>, @@ -25,6 +28,13 @@ path: PathBuf, } +/// Provides a **consuming sequential** iterator over the valid entries. +/// +/// This allows a `DataStore` to be consumed to produce a sequential iterator. +/// For non-consuming iteration, iterate over a reference (`&storage`). +/// +/// The iterator produced is **sequential**. For parallel processing, +/// enable the `parallel` feature and use the `.par_iter_entries()` method instead. impl IntoIterator for DataStore { type Item = EntryHandle; type IntoIter = EntryIterator; @@ -242,6 +252,64 @@ EntryIterator::new(mmap_clone, tail_offset) } + /// Provides a parallel iterator over all valid entries in the storage. + /// + /// This method is only available when the `parallel` feature is enabled.
+ /// It leverages the Rayon crate to process entries across multiple threads, + /// which can be significantly faster for bulk operations on multi-core machines. + /// + /// The iterator is efficient, collecting only the necessary entry offsets first + /// and then constructing the `EntryHandle` objects in parallel. + /// + /// # Returns + /// - A Rayon `ParallelIterator` that yields `EntryHandle` items. + #[cfg(feature = "parallel")] + pub fn par_iter_entries(&self) -> impl ParallelIterator { + // First, acquire a read lock and collect all the packed offset values. + // This is a short, fast operation. + let key_indexer_guard = self.key_indexer.read().unwrap(); + let packed_values: Vec = key_indexer_guard.values().cloned().collect(); + drop(key_indexer_guard); // Release the lock as soon as possible. + + // Clone the mmap Arc once to be moved into the parallel iterator. + let mmap_arc = self.get_mmap_arc(); + + // Create a parallel iterator over the collected offsets. The `filter_map` + // operation is the part that will run in parallel across threads. + packed_values.into_par_iter().filter_map(move |packed| { + let (_tag, offset) = KeyIndexer::unpack(packed); + let offset = offset as usize; + + // This logic is a simplified, read-only version of `read_entry_with_context`. + // We perform basic bounds checks to ensure safety. + if offset + METADATA_SIZE > mmap_arc.len() { + return None; + } + + let metadata_bytes = &mmap_arc[offset..offset + METADATA_SIZE]; + let metadata = EntryMetadata::deserialize(metadata_bytes); + let entry_start = metadata.prev_offset as usize; + let entry_end = offset; + + if entry_start >= entry_end || entry_end > mmap_arc.len() { + return None; + } + + // Important: We must filter out tombstone entries, which are marked + // by a single null byte payload. + if entry_end - entry_start == 1 && mmap_arc[entry_start..entry_end] == NULL_BYTE { + return None; + } + + // If all checks pass, construct and return the EntryHandle. 
+ Some(EntryHandle { + mmap_arc: mmap_arc.clone(), // Each handle gets a clone of the Arc + range: entry_start..entry_end, + metadata, + }) + }) + } + /// Recovers the **latest valid chain** of entries from the storage file. /// /// This function **scans backward** through the file, verifying that each entry @@ -316,184 +384,6 @@ impl DataStore { Ok(best_valid_offset.unwrap_or(0)) } - /// Writes an entry using a **precomputed key hash** and a streaming `Read` source. - /// - /// This is a **low-level** method that operates like `write_stream`, but requires - /// the key to be hashed beforehand. It is primarily used internally to avoid - /// redundant hash computations when writing multiple entries. - /// - /// # Parameters: - /// - `key_hash`: The **precomputed hash** of the key. - /// - `reader`: A **streaming reader** (`Read` trait) supplying the entry's content. - /// - /// # Returns: - /// - `Ok(offset)`: The file offset where the entry was written. - /// - `Err(std::io::Error)`: If a write or I/O operation fails. 
- pub fn write_stream_with_key_hash( - &self, - key_hash: u64, - reader: &mut R, - ) -> Result { - let mut file = self - .file - .write() - .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?; - let prev_offset = self.tail_offset.load(Ordering::Acquire); - - let mut buffer = vec![0; WRITE_STREAM_BUFFER_SIZE]; - let mut total_written = 0; - let mut checksum_state = crc32fast::Hasher::new(); - let mut is_null_only = true; - - while let Ok(bytes_read) = reader.read(&mut buffer) { - if bytes_read == 0 { - break; - } - - if buffer[..bytes_read].iter().any(|&b| b != NULL_BYTE[0]) { - is_null_only = false; - } - - file.write_all(&buffer[..bytes_read])?; - checksum_state.update(&buffer[..bytes_read]); - total_written += bytes_read; - } - - if total_written > 0 && is_null_only { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "NULL-byte-only streams cannot be written directly.", - )); - } - - let checksum = checksum_state.finalize().to_le_bytes(); - let metadata = EntryMetadata { - key_hash, - prev_offset, - checksum, - }; - file.write_all(&metadata.serialize())?; - file.flush()?; - - let tail_offset = prev_offset + total_written as u64 + METADATA_SIZE as u64; - self.reindex( - &file, - &[(key_hash, tail_offset - METADATA_SIZE as u64)], - tail_offset, - None, - )?; - Ok(tail_offset) - } - - /// Writes an entry using a **precomputed key hash** and a payload. - /// - /// This method is a **low-level** alternative to `write()`, allowing direct - /// specification of the key hash. It is mainly used for optimized workflows - /// where the key hash is already known, avoiding redundant computations. - /// - /// # Parameters: - /// - `key_hash`: The **precomputed hash** of the key. - /// - `payload`: The **data payload** to be stored. - /// - /// # Returns: - /// - `Ok(offset)`: The file offset where the entry was written. - /// - `Err(std::io::Error)`: If a write operation fails. 
- /// - /// # Notes: - /// - The caller is responsible for ensuring that `key_hash` is correctly computed. - /// - This method **locks the file for writing** to maintain consistency. - /// - If writing **multiple entries**, consider using `batch_write_hashed_payloads()`. - pub fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result { - self.batch_write_hashed_payloads(vec![(key_hash, payload)], false) - } - - /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. - /// - /// This method efficiently appends multiple entries in a **batch operation**, - /// reducing lock contention and improving performance for bulk writes. - /// - /// # Parameters: - /// - `hashed_payloads`: A **vector of precomputed key hashes and payloads**, where: - /// - `key_hash`: The **precomputed hash** of the key. - /// - `payload`: The **data payload** to be stored. - /// - /// # Returns: - /// - `Ok(final_offset)`: The file offset after all writes. - /// - `Err(std::io::Error)`: If a write operation fails. - /// - /// # Notes: - /// - **File locking is performed only once** for all writes, improving efficiency. - /// - If an entry's `payload` is empty, an error is returned. - /// - This method uses **SIMD-accelerated memory copy (`simd_copy`)** to optimize write - /// performance. - /// - **Metadata (checksums, offsets) is written after payloads** to ensure data integrity. - /// - After writing, the memory-mapped file (`mmap`) is **remapped** to reflect updates. - /// - /// # Efficiency Considerations: - /// - **Faster than multiple `write()` calls**, since it reduces lock contention. - /// - Suitable for **bulk insertions** where key hashes are known beforehand. - /// - If keys are available but not hashed, use `batch_write()` instead. 
- pub fn batch_write_hashed_payloads( - &self, - hashed_payloads: Vec<(u64, &[u8])>, - allow_null_bytes: bool, - ) -> Result { - let mut file = self - .file - .write() - .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?; - - let mut buffer = Vec::new(); - let mut tail_offset = self.tail_offset.load(Ordering::Acquire); - - let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(hashed_payloads.len()); - let mut deleted_keys: HashSet = HashSet::new(); - - for (key_hash, payload) in hashed_payloads { - if payload == NULL_BYTE { - if !allow_null_bytes { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "NULL-byte payloads cannot be written directly.", - )); - } - - deleted_keys.insert(key_hash); - } - - if payload.is_empty() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "Payload cannot be empty.", - )); - } - - let prev_offset = tail_offset; - let checksum = compute_checksum(payload); - let metadata = EntryMetadata { - key_hash, - prev_offset, - checksum, - }; - let payload_len = payload.len(); - - let mut entry: Vec = vec![0u8; payload_len + METADATA_SIZE]; - simd_copy(&mut entry[..payload.len()], payload); - entry[payload.len()..].copy_from_slice(&metadata.serialize()); - buffer.extend_from_slice(&entry); - - tail_offset += entry.len() as u64; - key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64)); - } - - file.write_all(&buffer)?; - file.flush()?; - - self.reindex(&file, &key_hash_offsets, tail_offset, Some(&deleted_keys))?; - - Ok(self.tail_offset.load(Ordering::Acquire)) - } - /// Performs the core logic of reading an entry from the store. /// /// This private helper centralizes the logic for both `read` and `batch_read`. 
@@ -513,7 +403,7 @@ impl DataStore { #[inline] fn read_entry_with_context<'a>( &self, - key: &[u8], + non_hashed_key: Option<&[u8]>, key_hash: u64, mmap_arc: &Arc, key_indexer_guard: &RwLockReadGuard<'a, KeyIndexer>, @@ -522,8 +412,12 @@ impl DataStore { let (tag, offset) = KeyIndexer::unpack(packed); // The crucial verification check, now centralized. - if tag != KeyIndexer::tag_from_key(key) { - warn!("Tag mismatch detected for key, likely a hash collision or index corruption."); + if let Some(non_hashed_key) = non_hashed_key + && tag != KeyIndexer::tag_from_key(non_hashed_key) + { + warn!( + "Tag mismatch detected for `non_hashed_key`, likely a hash collision or index corruption." + ); return None; } @@ -709,16 +603,135 @@ impl DataStoreWriter for DataStore { self.write_stream_with_key_hash(key_hash, reader) } + fn write_stream_with_key_hash(&self, key_hash: u64, reader: &mut R) -> Result { + let mut file = self + .file + .write() + .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?; + let prev_offset = self.tail_offset.load(Ordering::Acquire); + + let mut buffer = vec![0; WRITE_STREAM_BUFFER_SIZE]; + let mut total_written = 0; + let mut checksum_state = crc32fast::Hasher::new(); + let mut is_null_only = true; + + while let Ok(bytes_read) = reader.read(&mut buffer) { + if bytes_read == 0 { + break; + } + + if buffer[..bytes_read].iter().any(|&b| b != NULL_BYTE[0]) { + is_null_only = false; + } + + file.write_all(&buffer[..bytes_read])?; + checksum_state.update(&buffer[..bytes_read]); + total_written += bytes_read; + } + + if total_written > 0 && is_null_only { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "NULL-byte-only streams cannot be written directly.", + )); + } + + let checksum = checksum_state.finalize().to_le_bytes(); + let metadata = EntryMetadata { + key_hash, + prev_offset, + checksum, + }; + file.write_all(&metadata.serialize())?; + file.flush()?; + + let tail_offset = prev_offset + total_written as u64 + 
METADATA_SIZE as u64; + self.reindex( + &file, + &[(key_hash, tail_offset - METADATA_SIZE as u64)], + tail_offset, + None, + )?; + Ok(tail_offset) + } + fn write(&self, key: &[u8], payload: &[u8]) -> Result { let key_hash = compute_hash(key); self.write_with_key_hash(key_hash, payload) } + fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result { + self.batch_write_with_key_hashes(vec![(key_hash, payload)], false) + } + + // TODO: Consider change signature to: fn batch_write(&self, entries: Vec<(Vec, Vec)>) -> Result { fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result { let (keys, payloads): (Vec<_>, Vec<_>) = entries.iter().cloned().unzip(); let hashes = compute_hash_batch(&keys); let hashed_entries = hashes.into_iter().zip(payloads).collect::>(); - self.batch_write_hashed_payloads(hashed_entries, false) + self.batch_write_with_key_hashes(hashed_entries, false) + } + + // TODO: Consider change `prehashed_keys: Vec<(u64, &[u8])>` to `prehashed_keys: Vec<(u64, Vec)>` + fn batch_write_with_key_hashes( + &self, + prehashed_keys: Vec<(u64, &[u8])>, + allow_null_bytes: bool, + ) -> Result { + let mut file = self + .file + .write() + .map_err(|_| std::io::Error::other("Failed to acquire file lock"))?; + + let mut buffer = Vec::new(); + let mut tail_offset = self.tail_offset.load(Ordering::Acquire); + + let mut key_hash_offsets: Vec<(u64, u64)> = Vec::with_capacity(prehashed_keys.len()); + let mut deleted_keys: HashSet = HashSet::new(); + + for (key_hash, payload) in prehashed_keys { + if payload == NULL_BYTE { + if !allow_null_bytes { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "NULL-byte payloads cannot be written directly.", + )); + } + + deleted_keys.insert(key_hash); + } + + if payload.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Payload cannot be empty.", + )); + } + + let prev_offset = tail_offset; + let checksum = compute_checksum(payload); + let metadata = EntryMetadata 
{ + key_hash, + prev_offset, + checksum, + }; + let payload_len = payload.len(); + + let mut entry: Vec = vec![0u8; payload_len + METADATA_SIZE]; + simd_copy(&mut entry[..payload.len()], payload); + entry[payload.len()..].copy_from_slice(&metadata.serialize()); + buffer.extend_from_slice(&entry); + + tail_offset += entry.len() as u64; + key_hash_offsets.push((key_hash, tail_offset - METADATA_SIZE as u64)); + } + + file.write_all(&buffer)?; + file.flush()?; + + self.reindex(&file, &key_hash_offsets, tail_offset, Some(&deleted_keys))?; + + Ok(self.tail_offset.load(Ordering::Acquire)) } fn rename(&self, old_key: &[u8], new_key: &[u8]) -> Result { @@ -766,8 +779,42 @@ impl DataStoreWriter for DataStore { } fn delete(&self, key: &[u8]) -> Result { - let key_hash = compute_hash(key); - self.batch_write_hashed_payloads(vec![(key_hash, &NULL_BYTE)], true) + self.batch_delete(&[key]) + } + + fn batch_delete(&self, keys: &[&[u8]]) -> Result { + let key_hashes = compute_hash_batch(keys); + self.batch_delete_key_hashes(&key_hashes) + } + + fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result { + // First, check which keys actually exist to avoid writing useless tombstones. + let keys_to_delete: Vec = { + let key_indexer_guard = self + .key_indexer + .read() + .map_err(|_| Error::other("Key-index lock poisoned during batch_delete check"))?; + + prehashed_keys + .iter() + .filter(|&&key_hash| key_indexer_guard.get_packed(&key_hash).is_some()) + .cloned() + .collect() + }; + + // If no keys were found to delete, we can exit early without any file I/O. + if keys_to_delete.is_empty() { + return Ok(self.tail_offset.load(Ordering::Acquire)); + } + + // Prepare the delete operations (a key hash + a null byte payload). + let delete_ops: Vec<(u64, &[u8])> = keys_to_delete + .iter() + .map(|&key_hash| (key_hash, &NULL_BYTE as &[u8])) + .collect(); + + // Use the underlying batch write method, allowing null bytes for tombstones. 
+ self.batch_write_with_key_hashes(delete_ops, true) } } @@ -778,6 +825,11 @@ impl DataStoreReader for DataStore { Ok(self.read(key)?.is_some()) } + fn exists_with_key_hash(&self, prehashed_key: u64) -> Result { + // This is a lightweight wrapper around the read method, just like exists(). + Ok(self.read_with_key_hash(prehashed_key)?.is_some()) + } + fn read(&self, key: &[u8]) -> Result> { let key_hash = compute_hash(key); let key_indexer_guard = self @@ -786,7 +838,19 @@ impl DataStoreReader for DataStore { .map_err(|_| Error::other("key-index lock poisoned"))?; let mmap_arc = self.get_mmap_arc(); - Ok(self.read_entry_with_context(key, key_hash, &mmap_arc, &key_indexer_guard)) + Ok(self.read_entry_with_context(Some(key), key_hash, &mmap_arc, &key_indexer_guard)) + } + + fn read_with_key_hash(&self, prehashed_key: u64) -> Result> { + let key_indexer_guard = self + .key_indexer + .read() + .map_err(|_| Error::other("key-index lock poisoned"))?; + let mmap_arc = self.get_mmap_arc(); + + // Call the core logic with `None` for the key, as we are only using the hash + // and want to skip the tag verification check. + Ok(self.read_entry_with_context(None, prehashed_key, &mmap_arc, &key_indexer_guard)) } fn read_last_entry(&self) -> Result> { @@ -818,21 +882,59 @@ impl DataStoreReader for DataStore { } fn batch_read(&self, keys: &[&[u8]]) -> Result>> { + let hashed_keys = compute_hash_batch(keys); + + self.batch_read_hashed_keys(&hashed_keys, Some(keys)) + } + + fn batch_read_hashed_keys( + &self, + prehashed_keys: &[u64], + non_hashed_keys: Option<&[&[u8]]>, + ) -> Result>> { let mmap_arc = self.get_mmap_arc(); let key_indexer_guard = self .key_indexer .read() .map_err(|_| Error::other("Key-index lock poisoned during `batch_read`"))?; - let hashes = compute_hash_batch(keys); + // Use a match to handle the two possible scenarios + let results = match non_hashed_keys { + // Case 1: We have the original keys for tag verification. 
+ Some(keys) => { + // Good practice to ensure lengths match. + if keys.len() != prehashed_keys.len() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Mismatched lengths for hashed and non-hashed keys.", + )); + } - let results = hashes - .into_iter() - .zip(keys.iter()) - .map(|(key_hash, &key)| { - self.read_entry_with_context(key, key_hash, &mmap_arc, &key_indexer_guard) - }) - .collect(); + prehashed_keys + .iter() + .zip(keys.iter()) + .map(|(key_hash, &key)| { + // Correctly pass `Some(key)` for verification + self.read_entry_with_context( + Some(key), + *key_hash, + &mmap_arc, + &key_indexer_guard, + ) + }) + .collect() + } + // Case 2: We only have the hashes and must skip tag verification. + None => { + prehashed_keys + .iter() + .map(|key_hash| { + // Correctly pass `None` as we don't have the original key + self.read_entry_with_context(None, *key_hash, &mmap_arc, &key_indexer_guard) + }) + .collect() + } + }; Ok(results) } diff --git a/src/storage_engine/digest/compute_hash.rs b/src/storage_engine/digest/compute_hash.rs index 6b829c33..da38d326 100644 --- a/src/storage_engine/digest/compute_hash.rs +++ b/src/storage_engine/digest/compute_hash.rs @@ -37,7 +37,7 @@ pub fn compute_hash(key: &[u8]) -> u64 { /// * call the hasher exactly **once** from the high-level API, /// * pre-allocate the `Vec` only once, /// * hand the resulting `(hash, payload)` tuples straight to -/// `batch_write_hashed_payloads`, keeping the critical section (the `RwLock`) +/// `batch_write_with_key_hashes`, keeping the critical section (the `RwLock`) /// as small as possible. /// /// # Parameters @@ -67,6 +67,8 @@ pub fn compute_hash_batch(keys: &[&[u8]]) -> Vec { // A plain loop beats an iterator here; it lets LLVM unroll/vectorize freely. let mut out = Vec::with_capacity(keys.len()); + + // TODO: For a large amount of keys, consider distributing the work with Rayon for k in keys { // xxh3_64 already uses SIMD internally where available. 
out.push(xxh3_64(k)); diff --git a/src/storage_engine/key_indexer.rs b/src/storage_engine/key_indexer.rs index 4944f110..e99df0e9 100644 --- a/src/storage_engine/key_indexer.rs +++ b/src/storage_engine/key_indexer.rs @@ -3,6 +3,7 @@ use crate::storage_engine::constants::*; use crate::storage_engine::digest::{Xxh3BuildHasher, compute_hash}; use memmap2::Mmap; use std::collections::hash_map::Entry; +use std::collections::hash_map::Values; use std::collections::{HashMap, HashSet}; /// Number of high bits reserved for collision-detection tag (16 bits). @@ -185,4 +186,16 @@ impl KeyIndexer { pub fn is_empty(&self) -> bool { self.index.is_empty() } + + /// Returns a memory-efficient iterator over the packed (tag|offset) values. + /// + /// This method is preferable to collecting all values into a `Vec` when the + /// index is large, as it avoids a large upfront memory allocation. The iterator + /// borrows the underlying index, so it must be used within the lifetime of the + /// `KeyIndexer`'s read lock. + /// + #[inline] + pub fn values(&self) -> Values<'_, u64, u64> { + self.index.values() + } } diff --git a/src/storage_engine/traits/reader.rs b/src/storage_engine/traits/reader.rs index edb034b4..f389767f 100644 --- a/src/storage_engine/traits/reader.rs +++ b/src/storage_engine/traits/reader.rs @@ -20,6 +20,22 @@ pub trait DataStoreReader { /// - `Err(std::io::Error)`: On I/O failure. fn exists(&self, key: &[u8]) -> Result; + /// Checks whether a key with a pre-computed hash exists in the store. + /// + /// This is a more direct version of [`Self::exists`] that skips the hashing step, + /// making it faster if the hash is already known. Because the original key is not + /// provided, this check does not perform tag verification and relies solely on the + /// hash's presence in the index. + /// + /// # Parameters + /// - `prehashed_key`: The **pre-computed hash** of the key to check. + /// + /// # Returns + /// - `Ok(true)` if the key hash exists in the index. 
+ /// - `Ok(false)` if the key hash is absent. + /// - `Err(std::io::Error)`: On I/O failure. + fn exists_with_key_hash(&self, prehashed_key: u64) -> Result; + /// Retrieves the most recent value associated with a given key. /// /// This method **efficiently looks up a key** using a fast in-memory index, @@ -37,6 +53,25 @@ pub trait DataStoreReader { /// - The returned `EntryHandle` provides zero-copy access to the stored data. fn read(&self, key: &[u8]) -> Result>; + /// Retrieves the most recent value associated with a pre-computed key hash. + /// + /// This is a low-level alternative to [`Self::read`] that looks up an entry using + /// only its hash, bypassing the hashing step. + /// + /// # Warning + /// This method does **not** perform tag verification, as the original key is not + /// provided. This means that in the rare event of a hash collision, this function + /// could return the entry for a different key. + /// + /// # Parameters + /// - `prehashed_key`: The **pre-computed hash** of the key to retrieve. + /// + /// # Returns + /// - `Ok(Some(EntryHandle))`: Handle to the entry if found. + /// - `Ok(None)`: If the key hash does not exist or is deleted. + /// - `Err(std::io::Error)`: On I/O failure. + fn read_with_key_hash(&self, prehashed_key: u64) -> Result>; + /// Retrieves the last entry written to the file. /// /// # Returns: @@ -65,6 +100,43 @@ pub trait DataStoreReader { /// - `Err(std::io::Error)`: On I/O failure. fn batch_read(&self, keys: &[&[u8]]) -> Result>>; + /// Reads many keys in one shot using pre-computed hashes. + /// + /// This is a lower-level, high-performance version of [`Self::batch_read`]. + /// It is designed for scenarios where the caller has already computed the key + /// hashes and wants to avoid the overhead of re-hashing. The method offers + /// an optional verification step to safeguard against hash collisions. 
+ /// + /// * **Zero-copy**: Each `Some(EntryHandle)` provides a direct, zero-copy view + /// into the memory-mapped file. + /// * **High-performance**: Bypasses the key hashing step if hashes are already + /// available. + /// * **Thread-safe**: Acquires a single read lock for the entire batch + /// operation, minimizing contention. + /// + /// # Parameters + /// - `prehashed_keys`: A slice of `u64` key hashes to look up. + /// - `non_hashed_keys`: An optional slice of the original, non-hashed keys + /// corresponding to `prehashed_keys`. + /// - If `Some(keys)`, the method performs a tag-based verification to ensure + /// that the found entry truly belongs to the original key, preventing + /// data retrieval from a hash collision. The length of this slice + /// **must** match the length of `prehashed_keys`. + /// - If `None`, this verification is skipped. The lookup relies solely + /// on the hash, which is faster but carries a theoretical risk of + /// returning incorrect data in the event of a hash collision. + /// + /// # Returns + /// - `Ok(results)`: A `Vec>` where each element + /// corresponds to the result of looking up the key at the same index. + /// - `Err(std::io::Error)`: On I/O failure or if the lengths of `prehashed_keys` + /// and `non_hashed_keys` (when `Some`) do not match. + fn batch_read_hashed_keys( + &self, + prehashed_keys: &[u64], + non_hashed_keys: Option<&[&[u8]]>, + ) -> Result>>; + /// Retrieves metadata for a given key. /// /// This method looks up a key in the storage and returns its associated metadata. @@ -120,6 +192,22 @@ pub trait AsyncDataStoreReader { /// - `Err(std::io::Error)`: On I/O failure. async fn exists(&self, key: &[u8]) -> Result; + /// Checks whether a key with a pre-computed hash exists in the store. + /// + /// This is a more direct version of [`Self::exists`] that skips the hashing step, + /// making it faster if the hash is already known. 
Because the original key is not + /// provided, this check does not perform tag verification and relies solely on the + /// hash's presence in the index. + /// + /// # Parameters + /// - `prehashed_key`: The **pre-computed hash** of the key to check. + /// + /// # Returns + /// - `Ok(true)` if the key hash exists in the index. + /// - `Ok(false)` if the key hash is absent. + /// - `Err(std::io::Error)`: On I/O failure. + async fn exists_with_key_hash(&self, prehashed_key: u64) -> Result; + /// Retrieves the most recent value associated with a given key. /// /// This method **efficiently looks up a key** using a fast in-memory index, @@ -137,6 +225,26 @@ pub trait AsyncDataStoreReader { /// - The returned `EntryHandle` provides zero-copy access to the stored data. async fn read(&self, key: &[u8]) -> Result>; + /// Retrieves the most recent value associated with a pre-computed key hash. + /// + /// This is a low-level alternative to [`Self::read`] that looks up an entry using + /// only its hash, bypassing the hashing step. + /// + /// # Warning + /// This method does **not** perform tag verification, as the original key is not + /// provided. This means that in the rare event of a hash collision, this function + /// could return the entry for a different key. + /// + /// # Parameters + /// - `prehashed_key`: The **pre-computed hash** of the key to retrieve. + /// + /// # Returns + /// - `Ok(Some(EntryHandle))`: Handle to the entry if found. + /// - `Ok(None)`: If the key hash does not exist or is deleted. + /// - `Err(std::io::Error)`: On I/O failure. + async fn read_with_key_hash(&self, prehashed_key: u64) + -> Result>; + /// Retrieves the last entry written to the file. /// /// # Returns: @@ -165,6 +273,43 @@ pub trait AsyncDataStoreReader { /// - `Err(std::io::Error)`: On I/O failure. async fn batch_read(&self, keys: &[&[u8]]) -> Result>>; + /// Reads many keys in one shot using pre-computed hashes. 
+ /// + /// This is a lower-level, high-performance version of [`Self::batch_read`]. + /// It is designed for scenarios where the caller has already computed the key + /// hashes and wants to avoid the overhead of re-hashing. The method offers + /// an optional verification step to safeguard against hash collisions. + /// + /// * **Zero-copy**: Each `Some(EntryHandle)` provides a direct, zero-copy view + /// into the memory-mapped file. + /// * **High-performance**: Bypasses the key hashing step if hashes are already + /// available. + /// * **Thread-safe**: Acquires a single read lock for the entire batch + /// operation, minimizing contention. + /// + /// # Parameters + /// - `prehashed_keys`: A slice of `u64` key hashes to look up. + /// - `non_hashed_keys`: An optional slice of the original, non-hashed keys + /// corresponding to `prehashed_keys`. + /// - If `Some(keys)`, the method performs a tag-based verification to ensure + /// that the found entry truly belongs to the original key, preventing + /// data retrieval from a hash collision. The length of this slice + /// **must** match the length of `prehashed_keys`. + /// - If `None`, this verification is skipped. The lookup relies solely + /// on the hash, which is faster but carries a theoretical risk of + /// returning incorrect data in the event of a hash collision. + /// + /// # Returns + /// - `Ok(results)`: A `Vec>` where each element + /// corresponds to the result of looking up the key at the same index. + /// - `Err(std::io::Error)`: On I/O failure or if the lengths of `prehashed_keys` + /// and `non_hashed_keys` (when `Some`) do not match. + async fn batch_read_hashed_keys( + &self, + prehashed_keys: &[u64], + non_hashed_keys: Option<&[&[u8]]>, + ) -> Result>>; + /// Retrieves metadata for a given key. /// /// This method looks up a key in the storage and returns its associated metadata. 
diff --git a/src/storage_engine/traits/writer.rs b/src/storage_engine/traits/writer.rs index 59e4cb0a..fc326f8c 100644 --- a/src/storage_engine/traits/writer.rs +++ b/src/storage_engine/traits/writer.rs @@ -28,6 +28,21 @@ pub trait DataStoreWriter { /// - Metadata is appended **after** the full entry is written. fn write_stream(&self, key: &[u8], reader: &mut R) -> Result; + /// Writes an entry using a **precomputed key hash** and a streaming `Read` source. + /// + /// This is a **low-level** method that operates like `write_stream`, but requires + /// the key to be hashed beforehand. It is primarily used internally to avoid + /// redundant hash computations when writing multiple entries. + /// + /// # Parameters: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `reader`: A **streaming reader** (`Read` trait) supplying the entry's content. + /// + /// # Returns: + /// - `Ok(offset)`: The file offset where the entry was written. + /// - `Err(std::io::Error)`: If a write or I/O operation fails. + fn write_stream_with_key_hash(&self, key_hash: u64, reader: &mut R) -> Result; + /// Writes an entry with a given key and payload. /// /// This method computes the hash of the key and delegates to `write_with_key_hash()`. @@ -49,10 +64,30 @@ pub trait DataStoreWriter { /// - For writing **multiple entries at once**, use `batch_write()`. fn write(&self, key: &[u8], payload: &[u8]) -> Result; + /// Writes an entry using a **precomputed key hash** and a payload. + /// + /// This method is a **low-level** alternative to `write()`, allowing direct + /// specification of the key hash. It is mainly used for optimized workflows + /// where the key hash is already known, avoiding redundant computations. + /// + /// # Parameters: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `payload`: The **data payload** to be stored. + /// + /// # Returns: + /// - `Ok(offset)`: The file offset where the entry was written. 
+ /// - `Err(std::io::Error)`: If a write operation fails. + /// + /// # Notes: + /// - The caller is responsible for ensuring that `key_hash` is correctly computed. + /// - This method **locks the file for writing** to maintain consistency. + /// - If writing **multiple entries**, consider using `batch_write_with_key_hashes()`. + fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result; + /// Writes multiple key-value pairs as a **single transaction**. /// /// This method computes the hashes of the provided keys and delegates to - /// `batch_write_hashed_payloads()`, ensuring all writes occur in a single + /// `batch_write_with_key_hashes()`, ensuring all writes occur in a single /// locked operation for efficiency. /// /// # Parameters: @@ -67,9 +102,41 @@ pub trait DataStoreWriter { /// # Notes: /// - This method improves efficiency by **minimizing file lock contention**. /// - If a large number of entries are written, **batching reduces overhead**. - /// - If the key hashes are already computed, use `batch_write_hashed_payloads()`. + /// - If the key hashes are already computed, use `batch_write_with_key_hashes()`. fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; + /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. + /// + /// This method efficiently appends multiple entries in a **batch operation**, + /// reducing lock contention and improving performance for bulk writes. + /// + /// # Parameters: + /// - `prehashed_keys`: A **vector of precomputed key hashes and payloads**, where: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `payload`: The **data payload** to be stored. + /// + /// # Returns: + /// - `Ok(final_offset)`: The file offset after all writes. + /// - `Err(std::io::Error)`: If a write operation fails. + /// + /// # Notes: + /// - **File locking is performed only once** for all writes, improving efficiency. 
+ /// - If an entry's `payload` is empty, an error is returned. + /// - This method uses **SIMD-accelerated memory copy (`simd_copy`)** to optimize write + /// performance. + /// - **Metadata (checksums, offsets) is written after payloads** to ensure data integrity. + /// - After writing, the memory-mapped file (`mmap`) is **remapped** to reflect updates. + /// + /// # Efficiency Considerations: + /// - **Faster than multiple `write()` calls**, since it reduces lock contention. + /// - Suitable for **bulk insertions** where key hashes are known beforehand. + /// - If keys are available but not hashed, use `batch_write()` instead. + fn batch_write_with_key_hashes( + &self, + prehashed_keys: Vec<(u64, &[u8])>, + allow_null_bytes: bool, + ) -> Result; + /// Renames an existing entry by copying it under a new key and marking the old key as deleted. /// /// This function: @@ -131,17 +198,51 @@ pub trait DataStoreWriter { /// the append-only structure. fn transfer(&self, key: &[u8], target: &DataStore) -> Result; - /// Deletes a key by appending a **null byte marker**. + /// Logically deletes an entry by its key. /// - /// The storage engine is **append-only**, so keys cannot be removed directly. - /// Instead, a **null byte is appended** as a tombstone entry to mark the key as deleted. + /// The storage engine is **append-only**, so entries are not removed directly. + /// Instead, this method appends a **tombstone marker** to logically delete the key. /// - /// # Parameters: + /// This operation first **verifies that the key exists** before appending a tombstone. + /// If the key is not found, no data is written to the file, and the operation + /// succeeds without changing the store's state. + /// + /// # Parameters /// - `key`: The **binary key** to mark as deleted. /// - /// # Returns: - /// - The **new file offset** where the delete marker was appended. + /// # Returns + /// - `Ok(tail_offset)`: The file's tail offset after the operation completes. 
+ /// - `Err(std::io::Error)`: On I/O failure. fn delete(&self, key: &[u8]) -> Result; + + /// Deletes a batch of entries from the storage by their keys. + /// + /// This method computes the hash for each key and then calls the underlying + /// `batch_delete_key_hashes` method. It will only write deletion markers + /// (tombstones) for keys that currently exist in the store. + /// + /// # Parameters + /// - `keys`: A slice of keys to be deleted. + /// + /// # Returns + /// - `Ok(tail_offset)`: The new tail offset of the file after the operation. + /// - `Err(std::io::Error)`: On I/O failure. + fn batch_delete(&self, keys: &[&[u8]]) -> Result; + + /// Deletes a batch of entries from the storage using pre-computed key hashes. + /// + /// This is the lowest-level batch deletion method. It checks for the existence + /// of each key hash in the in-memory index before writing a deletion marker. + /// This prevents the store from being filled with unnecessary tombstones for + /// keys that were never present. + /// + /// # Parameters + /// - `prehashed_keys`: A slice of `u64` key hashes to be deleted. + /// + /// # Returns + /// - `Ok(tail_offset)`: The new tail offset of the file after the operation. + /// - `Err(std::io::Error)`: On I/O failure. + fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result; } #[async_trait::async_trait] @@ -172,6 +273,25 @@ pub trait AsyncDataStoreWriter { /// - Metadata is appended **after** the full entry is written. async fn write_stream(&self, key: &[u8], reader: &mut R) -> Result; + /// Writes an entry using a **precomputed key hash** and a streaming `Read` source. + /// + /// This is a **low-level** method that operates like `write_stream`, but requires + /// the key to be hashed beforehand. It is primarily used internally to avoid + /// redundant hash computations when writing multiple entries. + /// + /// # Parameters: + /// - `key_hash`: The **precomputed hash** of the key. 
+ /// - `reader`: A **streaming reader** (`Read` trait) supplying the entry's content. + /// + /// # Returns: + /// - `Ok(offset)`: The file offset where the entry was written. + /// - `Err(std::io::Error)`: If a write or I/O operation fails. + async fn write_stream_with_key_hash( + &self, + key_hash: u64, + reader: &mut R, + ) -> Result; + /// Writes an entry with a given key and payload. /// /// This method computes the hash of the key and delegates to `write_with_key_hash()`. @@ -193,10 +313,30 @@ pub trait AsyncDataStoreWriter { /// - For writing **multiple entries at once**, use `batch_write()`. async fn write(&self, key: &[u8], payload: &[u8]) -> Result; + /// Writes an entry using a **precomputed key hash** and a payload. + /// + /// This method is a **low-level** alternative to `write()`, allowing direct + /// specification of the key hash. It is mainly used for optimized workflows + /// where the key hash is already known, avoiding redundant computations. + /// + /// # Parameters: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `payload`: The **data payload** to be stored. + /// + /// # Returns: + /// - `Ok(offset)`: The file offset where the entry was written. + /// - `Err(std::io::Error)`: If a write operation fails. + /// + /// # Notes: + /// - The caller is responsible for ensuring that `key_hash` is correctly computed. + /// - This method **locks the file for writing** to maintain consistency. + /// - If writing **multiple entries**, consider using `batch_write_with_key_hashes()`. + async fn write_with_key_hash(&self, key_hash: u64, payload: &[u8]) -> Result; + /// Writes multiple key-value pairs as a **single transaction**. /// /// This method computes the hashes of the provided keys and delegates to - /// `batch_write_hashed_payloads()`, ensuring all writes occur in a single + /// `batch_write_with_key_hashes()`, ensuring all writes occur in a single /// locked operation for efficiency. 
/// /// # Parameters: @@ -211,9 +351,41 @@ pub trait AsyncDataStoreWriter { /// # Notes: /// - This method improves efficiency by **minimizing file lock contention**. /// - If a large number of entries are written, **batching reduces overhead**. - /// - If the key hashes are already computed, use `batch_write_hashed_payloads()`. + /// - If the key hashes are already computed, use `batch_write_with_key_hashes()`. async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result; + /// Writes multiple key-value pairs as a **single transaction**, using precomputed key hashes. + /// + /// This method efficiently appends multiple entries in a **batch operation**, + /// reducing lock contention and improving performance for bulk writes. + /// + /// # Parameters: + /// - `prehashed_keys`: A **vector of precomputed key hashes and payloads**, where: + /// - `key_hash`: The **precomputed hash** of the key. + /// - `payload`: The **data payload** to be stored. + /// + /// # Returns: + /// - `Ok(final_offset)`: The file offset after all writes. + /// - `Err(std::io::Error)`: If a write operation fails. + /// + /// # Notes: + /// - **File locking is performed only once** for all writes, improving efficiency. + /// - If an entry's `payload` is empty, an error is returned. + /// - This method uses **SIMD-accelerated memory copy (`simd_copy`)** to optimize write + /// performance. + /// - **Metadata (checksums, offsets) is written after payloads** to ensure data integrity. + /// - After writing, the memory-mapped file (`mmap`) is **remapped** to reflect updates. + /// + /// # Efficiency Considerations: + /// - **Faster than multiple `write()` calls**, since it reduces lock contention. + /// - Suitable for **bulk insertions** where key hashes are known beforehand. + /// - If keys are available but not hashed, use `batch_write()` instead. 
+ async fn batch_write_with_key_hashes( + &self, + prehashed_keys: Vec<(u64, &[u8])>, + allow_null_bytes: bool, + ) -> Result; + /// Renames an existing entry by copying it under a new key and marking the old key as deleted. /// /// This function: @@ -275,15 +447,49 @@ pub trait AsyncDataStoreWriter { /// the append-only structure. async fn transfer(&self, key: &[u8], target: &DataStore) -> Result; - /// Deletes a key by appending a **null byte marker**. + /// Logically deletes an entry by its key. /// - /// The storage engine is **append-only**, so keys cannot be removed directly. - /// Instead, a **null byte is appended** as a tombstone entry to mark the key as deleted. + /// The storage engine is **append-only**, so entries are not removed directly. + /// Instead, this method appends a **tombstone marker** to logically delete the key. /// - /// # Parameters: + /// This operation first **verifies that the key exists** before appending a tombstone. + /// If the key is not found, no data is written to the file, and the operation + /// succeeds without changing the store's state. + /// + /// # Parameters /// - `key`: The **binary key** to mark as deleted. /// - /// # Returns: - /// - The **new file offset** where the delete marker was appended. + /// # Returns + /// - `Ok(tail_offset)`: The file's tail offset after the operation completes. + /// - `Err(std::io::Error)`: On I/O failure. async fn delete(&self, key: &[u8]) -> Result; + + /// Deletes a batch of entries from the storage by their keys. + /// + /// This method computes the hash for each key and then calls the underlying + /// `batch_delete_key_hashes` method. It will only write deletion markers + /// (tombstones) for keys that currently exist in the store. + /// + /// # Parameters + /// - `keys`: A slice of keys to be deleted. + /// + /// # Returns + /// - `Ok(tail_offset)`: The new tail offset of the file after the operation. + /// - `Err(std::io::Error)`: On I/O failure. 
+ async fn batch_delete(&self, keys: &[&[u8]]) -> Result; + + /// Deletes a batch of entries from the storage using pre-computed key hashes. + /// + /// This is the lowest-level batch deletion method. It checks for the existence + /// of each key hash in the in-memory index before writing a deletion marker. + /// This prevents the store from being filled with unnecessary tombstones for + /// keys that were never present. + /// + /// # Parameters + /// - `prehashed_keys`: A slice of `u64` key hashes to be deleted. + /// + /// # Returns + /// - `Ok(tail_offset)`: The new tail offset of the file after the operation. + /// - `Err(std::io::Error)`: On I/O failure. + async fn batch_delete_key_hashes(&self, prehashed_keys: &[u64]) -> Result; } diff --git a/src/utils/align_or_copy.rs b/src/utils/align_or_copy.rs index 25cfc6eb..9192a6e7 100644 --- a/src/utils/align_or_copy.rs +++ b/src/utils/align_or_copy.rs @@ -41,7 +41,10 @@ use std::{borrow::Cow, mem}; /// let result: Cow<[f32]> = simd_r_drive::utils::align_or_copy::(raw, f32::from_le_bytes); /// assert_eq!(result[0], 1.0); /// ``` -pub fn align_or_copy(bytes: &[u8], from_le_bytes: fn([u8; N]) -> T) -> Cow<[T]> +pub fn align_or_copy( + bytes: &[u8], + from_le_bytes: fn([u8; N]) -> T, +) -> Cow<'_, [T]> where T: Copy, { diff --git a/tests/basic_operations_tests.rs b/tests/basic_operations_tests.rs index 42a75ded..d25dc5c5 100644 --- a/tests/basic_operations_tests.rs +++ b/tests/basic_operations_tests.rs @@ -1,265 +1,328 @@ -#[cfg(test)] -mod tests { - - use simd_r_drive::{ - DataStore, - traits::{DataStoreReader, DataStoreWriter}, - }; - use tempfile::tempdir; - - /// Helper function to create a temporary file for testing - fn create_temp_storage() -> (tempfile::TempDir, DataStore) { - let dir = tempdir().expect("Failed to create temp dir"); - let path = dir.path().join("test_storage.bin"); - - let storage = DataStore::open(&path).expect("Failed to open storage"); - (dir, storage) - } +use simd_r_drive::{ + DataStore, 
compute_hash, + traits::{DataStoreReader, DataStoreWriter}, +}; +use tempfile::tempdir; + +/// Helper function to create a temporary file for testing +fn create_temp_storage() -> (tempfile::TempDir, DataStore) { + let dir = tempdir().expect("Failed to create temp dir"); + let path = dir.path().join("test_storage.bin"); + + let storage = DataStore::open(&path).expect("Failed to open storage"); + (dir, storage) +} - #[test] - fn test_emptiness_check() { - let (_dir, storage) = create_temp_storage(); +#[test] +fn test_emptiness_check() { + let (_dir, storage) = create_temp_storage(); - assert!(storage.is_empty().unwrap()); + assert!(storage.is_empty().unwrap()); - let key = b"test_key".as_slice(); - let payload = b"Hello, world!".as_slice(); - storage.write(key, payload).expect("Failed to append entry"); + let key = b"test_key".as_slice(); + let payload = b"Hello, world!".as_slice(); + storage.write(key, payload).expect("Failed to append entry"); - assert!(!storage.is_empty().unwrap()); - } + assert!(!storage.is_empty().unwrap()); +} - #[test] - fn test_exists_checks_key_presence() { - let (_dir, storage) = create_temp_storage(); - - let key = b"exists_key".as_slice(); - let payload = b"some payload".as_slice(); - - // 1. Key should NOT exist before any write. - assert!( - !storage.exists(key).unwrap(), - "Key unexpectedly exists before write" - ); - - // 2. After write, key should exist. - storage.write(key, payload).expect("Failed to write entry"); - assert!(storage.exists(key).unwrap(), "Key should exist after write"); - - // 3. After delete, key should no longer exist. - storage.delete(key).expect("Failed to delete entry"); - assert!( - !storage.exists(key).unwrap(), - "Key should not exist after delete" - ); - } +#[test] +fn test_exists_checks_key_presence() { + let (_dir, storage) = create_temp_storage(); + + let key = b"exists_key".as_slice(); + let payload = b"some payload".as_slice(); + + // 1. Key should NOT exist before any write. 
+ assert!( + !storage.exists(key).unwrap(), + "Key unexpectedly exists before write" + ); + + // 2. After write, key should exist. + storage.write(key, payload).expect("Failed to write entry"); + assert!(storage.exists(key).unwrap(), "Key should exist after write"); + + // 3. After delete, key should no longer exist. + storage.delete(key).expect("Failed to delete entry"); + assert!( + !storage.exists(key).unwrap(), + "Key should not exist after delete" + ); +} - #[test] - fn test_append_and_read_last_entry() { - let (_dir, storage) = create_temp_storage(); +#[test] +fn test_append_and_read_last_entry() { + let (_dir, storage) = create_temp_storage(); - let key = b"test_key".as_slice(); - let payload = b"Hello, world!".as_slice(); - storage.write(key, payload).expect("Failed to append entry"); + let key = b"test_key".as_slice(); + let payload = b"Hello, world!".as_slice(); + storage.write(key, payload).expect("Failed to append entry"); - let last_entry = storage.read_last_entry().unwrap().expect("No entry found"); - assert_eq!( - last_entry.as_slice(), - payload, - "Stored payload does not match expected value" - ); - } + let last_entry = storage.read_last_entry().unwrap().expect("No entry found"); + assert_eq!( + last_entry.as_slice(), + payload, + "Stored payload does not match expected value" + ); +} + +#[test] +fn test_multiple_appends_and_reads() { + let (_dir, storage) = create_temp_storage(); + + let entries = vec![ + (b"key1".as_slice(), b"First Entry".as_slice()), + (b"key2".as_slice(), b"Second Entry".as_slice()), + (b"key3".as_slice(), b"Third Entry".as_slice()), + ]; - #[test] - fn test_multiple_appends_and_reads() { - let (_dir, storage) = create_temp_storage(); - - let entries = vec![ - (b"key1".as_slice(), b"First Entry".as_slice()), - (b"key2".as_slice(), b"Second Entry".as_slice()), - (b"key3".as_slice(), b"Third Entry".as_slice()), - ]; - - for (key, payload) in &entries { - storage.write(key, payload).expect("Failed to append entry"); - } - - let 
last_entry = storage - .read_last_entry() - .unwrap() - .expect("No last entry found"); - assert_eq!( - last_entry.as_slice(), - entries.last().unwrap().1, - "Last entry does not match expected value" - ); + for (key, payload) in &entries { + storage.write(key, payload).expect("Failed to append entry"); } - #[test] - fn test_varying_payload_sizes() { - let (_dir, storage) = create_temp_storage(); - - let payloads = [ - vec![b'a'; 10], // Small payload - vec![b'b'; 1024], // Medium payload - vec![b'c'; 4096], - ]; - - for (i, payload) in payloads.iter().enumerate() { - storage - .write(format!("key{i}").as_bytes(), payload.as_slice()) - .expect("Failed to append entry"); - } - - let last_entry = storage - .read_last_entry() - .unwrap() - .expect("No last entry found"); - assert_eq!( - last_entry.as_slice(), - payloads.last().unwrap().as_slice(), - "Last entry payload does not match expected value" - ); + let last_entry = storage + .read_last_entry() + .unwrap() + .expect("No last entry found"); + assert_eq!( + last_entry.as_slice(), + entries.last().unwrap().1, + "Last entry does not match expected value" + ); +} + +#[test] +fn test_varying_payload_sizes() { + let (_dir, storage) = create_temp_storage(); + + let payloads = [ + vec![b'a'; 10], // Small payload + vec![b'b'; 1024], // Medium payload + vec![b'c'; 4096], + ]; + + for (i, payload) in payloads.iter().enumerate() { + storage + .write(format!("key{i}").as_bytes(), payload.as_slice()) + .expect("Failed to append entry"); } - #[test] - fn test_retrieve_entry_by_key() { - let (_dir, storage) = create_temp_storage(); + let last_entry = storage + .read_last_entry() + .unwrap() + .expect("No last entry found"); + assert_eq!( + last_entry.as_slice(), + payloads.last().unwrap().as_slice(), + "Last entry payload does not match expected value" + ); +} - let key = b"test_key".as_slice(); - let payload = b"Hello, world!".as_slice(); - storage.write(key, payload).expect("Failed to append entry"); +#[test] +fn 
test_retrieve_entry_by_key() { + let (_dir, storage) = create_temp_storage(); - let retrieved = storage.read(key).unwrap(); + let key = b"test_key".as_slice(); + let payload = b"Hello, world!".as_slice(); + storage.write(key, payload).expect("Failed to append entry"); - assert!( - retrieved.is_some(), - "Entry should be found by key, but got None" - ); + let retrieved = storage.read(key).unwrap(); - assert_eq!( - retrieved.unwrap().as_slice(), - payload, - "Retrieved payload does not match expected value" - ); + assert!( + retrieved.is_some(), + "Entry should be found by key, but got None" + ); + + assert_eq!( + retrieved.unwrap().as_slice(), + payload, + "Retrieved payload does not match expected value" + ); +} + +#[test] +fn test_update_entries_with_varying_lengths() { + let (_dir, storage) = create_temp_storage(); + + let key1 = b"key1".as_slice(); + let key2 = b"key2".as_slice(); + let key3 = b"key3".as_slice(); + + let initial_payload1 = b"Short".as_slice(); + let initial_payload2 = b"Medium length payload".as_slice(); + let initial_payload3 = b"Longer initial payload data".as_slice(); + + storage + .write(key1, initial_payload1) + .expect("Failed to append entry"); + storage + .write(key2, initial_payload2) + .expect("Failed to append entry"); + storage + .write(key3, initial_payload3) + .expect("Failed to append entry"); + + let updated_payload1 = b"Updated with longer data!".as_slice(); + let updated_payload2 = b"Short".as_slice(); + + storage + .write(key1, updated_payload1) + .expect("Failed to update entry"); + storage + .write(key2, updated_payload2) + .expect("Failed to update entry"); + + let retrieved1 = storage + .read(key1) + .unwrap() + .expect("Entry for key1 should be found"); + assert_eq!( + retrieved1.as_slice(), + updated_payload1, + "Latest version of key1 was not retrieved" + ); + + let retrieved2 = storage + .read(key2) + .unwrap() + .expect("Entry for key2 should be found"); + assert_eq!( + retrieved2.as_slice(), + updated_payload2, + 
"Latest version of key2 was not retrieved" + ); + + let retrieved3 = storage + .read(key3) + .unwrap() + .expect("Entry for key3 should be found"); + assert_eq!( + retrieved3.as_slice(), + initial_payload3, + "Key3 should remain unchanged" + ); +} + +#[test] +fn test_open_existing_storage() { + let dir = tempdir().expect("Failed to create temp dir"); + let path = dir.path().join("test_storage_existing.bin"); + + // Create the file first + { + let _storage = DataStore::open(&path).expect("Failed to create storage file"); } - #[test] - fn test_update_entries_with_varying_lengths() { - let (_dir, storage) = create_temp_storage(); + // Now attempt to open it with `open_existing` + let storage = DataStore::open_existing(&path).expect("Failed to open existing storage file"); + + // Ensure storage is accessible + let key = b"test_key".as_slice(); + let payload = b"Existing file test".as_slice(); + storage.write(key, payload).expect("Failed to write entry"); + + let retrieved = storage + .read(key) + .unwrap() + .expect("Entry should exist in storage"); + assert_eq!( + retrieved.as_slice(), + payload, + "Retrieved payload does not match expected value" + ); +} - let key1 = b"key1".as_slice(); - let key2 = b"key2".as_slice(); - let key3 = b"key3".as_slice(); +#[test] +fn test_open_existing_fails_for_missing_file() { + let dir = tempdir().expect("Failed to create temp dir"); + let path = dir.path().join("non_existent_storage.bin"); - let initial_payload1 = b"Short".as_slice(); - let initial_payload2 = b"Medium length payload".as_slice(); - let initial_payload3 = b"Longer initial payload data".as_slice(); + let result = DataStore::open_existing(&path); + assert!( + result.is_err(), + "Expected error when opening non-existent file" + ); +} - storage - .write(key1, initial_payload1) - .expect("Failed to append entry"); - storage - .write(key2, initial_payload2) - .expect("Failed to append entry"); - storage - .write(key3, initial_payload3) - .expect("Failed to append entry"); 
+#[test] +fn test_write_null_byte_fails() { + let (_dir, storage) = create_temp_storage(); - let updated_payload1 = b"Updated with longer data!".as_slice(); - let updated_payload2 = b"Short".as_slice(); + let key = b"test_key"; - storage - .write(key1, updated_payload1) - .expect("Failed to update entry"); - storage - .write(key2, updated_payload2) - .expect("Failed to update entry"); - - let retrieved1 = storage - .read(key1) - .unwrap() - .expect("Entry for key1 should be found"); - assert_eq!( - retrieved1.as_slice(), - updated_payload1, - "Latest version of key1 was not retrieved" - ); - - let retrieved2 = storage - .read(key2) - .unwrap() - .expect("Entry for key2 should be found"); - assert_eq!( - retrieved2.as_slice(), - updated_payload2, - "Latest version of key2 was not retrieved" - ); - - let retrieved3 = storage - .read(key3) - .unwrap() - .expect("Entry for key3 should be found"); - assert_eq!( - retrieved3.as_slice(), - initial_payload3, - "Key3 should remain unchanged" - ); - } + let result = storage.write(key, b"\x00"); - #[test] - fn test_open_existing_storage() { - let dir = tempdir().expect("Failed to create temp dir"); - let path = dir.path().join("test_storage_existing.bin"); - - // Create the file first - { - let _storage = DataStore::open(&path).expect("Failed to create storage file"); - } - - // Now attempt to open it with `open_existing` - let storage = - DataStore::open_existing(&path).expect("Failed to open existing storage file"); - - // Ensure storage is accessible - let key = b"test_key".as_slice(); - let payload = b"Existing file test".as_slice(); - storage.write(key, payload).expect("Failed to write entry"); - - let retrieved = storage - .read(key) - .unwrap() - .expect("Entry should exist in storage"); - assert_eq!( - retrieved.as_slice(), - payload, - "Retrieved payload does not match expected value" - ); - } + assert!( + result.is_err(), + "Expected error when writing a null-byte payload" + ); +} - #[test] - fn 
test_open_existing_fails_for_missing_file() { - let dir = tempdir().expect("Failed to create temp dir"); - let path = dir.path().join("non_existent_storage.bin"); +#[test] +fn test_read_with_key_hash() { + let (_dir, storage) = create_temp_storage(); + let key = b"test_key_for_hash_read"; + let payload = b"some data"; + storage.write(key, payload).expect("Write failed"); + + // Case 1: Read an existing key using its pre-computed hash + let key_hash = compute_hash(key); + let result = storage + .read_with_key_hash(key_hash) + .expect("Read with hash failed"); + + assert!(result.is_some(), "Expected to find an entry with the hash"); + assert_eq!(result.unwrap().as_slice(), payload); + + // Case 2: Attempt to read a key that doesn't exist + let missing_hash = compute_hash(b"a_key_that_was_never_written"); + let result_missing = storage + .read_with_key_hash(missing_hash) + .expect("Read with missing hash failed"); + + assert!( + result_missing.is_none(), + "Expected to get None for a missing hash" + ); +} - let result = DataStore::open_existing(&path); - assert!( - result.is_err(), - "Expected error when opening non-existent file" - ); - } +#[test] +fn test_exists_with_key_hash() { + let (_dir, storage) = create_temp_storage(); + let key = b"test_key_for_hash_exists"; + storage.write(key, b"some data").expect("Write failed"); + + let existing_hash = compute_hash(key); + let missing_hash = compute_hash(b"a_missing_key"); + + // Case 1: Check for a key that exists + assert!( + storage.exists_with_key_hash(existing_hash).unwrap(), + "Expected exists_with_key_hash to be true for an existing key" + ); + + // Case 2: Check for a key that does not exist + assert!( + !storage.exists_with_key_hash(missing_hash).unwrap(), + "Expected exists_with_key_hash to be false for a missing key" + ); +} - #[test] - fn test_write_null_byte_fails() { - let (_dir, storage) = create_temp_storage(); +#[test] +fn test_exists_with_key_hash_after_deletion() { + let (_dir, storage) = 
create_temp_storage(); + let key = b"key_to_be_deleted"; + storage.write(key, b"data").expect("Write failed"); - let key = b"test_key"; + let key_hash = compute_hash(key); - let result = storage.write(key, b"\x00"); + // Verify it exists before deletion + assert!(storage.exists_with_key_hash(key_hash).unwrap()); - assert!( - result.is_err(), - "Expected error when writing a null-byte payload" - ); - } + // Delete the key + storage.delete(key).expect("Delete failed"); + + // Verify it no longer exists + assert!(!storage.exists_with_key_hash(key_hash).unwrap()); } diff --git a/tests/batch_ops_tests.rs b/tests/batch_ops_tests.rs index 425d5b22..68d9ca7d 100644 --- a/tests/batch_ops_tests.rs +++ b/tests/batch_ops_tests.rs @@ -1,7 +1,7 @@ //! Integration-tests for the batch-write / batch-read API. use simd_r_drive::{ - DataStore, + DataStore, compute_hash, compute_hash_batch, traits::{DataStoreReader, DataStoreWriter}, }; use tempfile::tempdir; @@ -165,3 +165,189 @@ fn test_batch_read_with_missing_key() { "wrong payload for exists_2" ); } + +#[test] +fn test_batch_write_rejects_null_byte_among_many() { + let (_dir, storage) = create_temp_storage(); + // First call should fail because the middle entry is a null byte + let entries = vec![ + (b"k1".as_slice(), b"payload1".as_slice()), + (b"k_null".as_slice(), b"\0".as_slice()), // ← invalid + (b"k2".as_slice(), b"payload2".as_slice()), + ]; + let err = storage.batch_write(&entries).expect_err("should fail"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + + // Ensure *nothing* was persisted + assert!(storage.read(b"k1").unwrap().is_none()); + assert!(storage.read(b"k2").unwrap().is_none()); + assert_eq!( + storage.len().unwrap(), + 0, + "no entries should have been written" + ); +} + +/// End-to-end test of `batch_read_hashed_keys` with full verification. 
+/// * write with `batch_write` +/// * compute hashes +/// * fetch with `batch_read_hashed_keys` providing both hashes and original keys +/// * verify ordering & presence match +#[test] +fn test_batch_read_hashed_keys_with_verification() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![ + (b"key1".as_slice(), b"val1".as_slice()), + (b"key2".as_slice(), b"val2".as_slice()), + ]; + storage.batch_write(&entries).expect("batch_write failed"); + + let keys: Vec<&[u8]> = entries.iter().map(|(k, _)| *k).collect(); + let hashes = compute_hash_batch(&keys); + + // Read back using the hashed key method with original keys for verification + let results = storage + .batch_read_hashed_keys(&hashes, Some(&keys)) + .unwrap(); + assert_eq!(results.len(), keys.len()); + + for ((_expected_key, expected_val), got_opt) in entries.iter().zip(results.iter()) { + let got = got_opt + .as_ref() + .expect("missing key in batch_read_hashed_keys"); + assert_eq!(got.as_slice(), *expected_val); + } +} + +/// End-to-end test of `batch_read_hashed_keys` without verification (hash-only). +#[test] +fn test_batch_read_hashed_keys_without_verification() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![(b"key1".as_slice(), b"val1".as_slice())]; + storage.batch_write(&entries).expect("batch_write failed"); + + let keys: Vec<&[u8]> = entries.iter().map(|(k, _)| *k).collect(); + let hashes = compute_hash_batch(&keys); + + // Read back using only the hash, passing `None` for the original keys + let results = storage.batch_read_hashed_keys(&hashes, None).unwrap(); + assert_eq!(results.len(), 1); + assert_eq!(results[0].as_ref().unwrap().as_slice(), entries[0].1); +} + +/// `batch_read_hashed_keys` should return `None` for keys that are not present. 
+#[test] +fn test_batch_read_hashed_keys_with_missing_keys() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![(b"exists".as_slice(), b"payload".as_slice())]; + storage.batch_write(&entries).expect("batch_write failed"); + + let existing_hash = compute_hash_batch(&[b"exists" as &[u8]])[0]; + let missing_hash = 12345_u64; // A key that was never written + + let hashes = vec![existing_hash, missing_hash]; + let results = storage.batch_read_hashed_keys(&hashes, None).unwrap(); + + assert_eq!(results.len(), 2); + assert!(results[0].is_some(), "expected entry for existing key"); + assert!(results[1].is_none(), "expected None for missing key"); +} + +/// Verifies that `batch_read_hashed_keys` with key verification enabled +/// will reject a match if the key's tag doesn't align with the hash. +/// This simulates a hash collision and confirms the safety check works. +#[test] +fn test_batch_read_hashed_keys_detects_collision() { + let (_dir, storage) = create_temp_storage(); + let real_key = b"real_key"; + let fake_key = b"fake_key"; // A different key + let payload = b"some data"; + storage.write(real_key, payload).unwrap(); + + // Get the hash of the key that actually exists in storage. + let real_hash = compute_hash_batch(&[real_key])[0]; + + // Now, try to read using the *real hash* but providing the *fake key* + // for verification. The tag check inside the read logic should fail. + let results = storage + .batch_read_hashed_keys(&[real_hash], Some(&[fake_key])) + .unwrap(); + + assert_eq!(results.len(), 1); + assert!( + results[0].is_none(), + "Read should fail due to tag mismatch, simulating a hash collision" + ); +} + +/// Happy-path: write a handful of entries, then delete a subset of them +/// in a single batch operation. 
+#[test] +fn test_batch_delete() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![ + (b"alpha".as_slice(), b"one".as_slice()), + (b"beta".as_slice(), b"two".as_slice()), + (b"gamma".as_slice(), b"three".as_slice()), + (b"delta".as_slice(), b"four".as_slice()), + ]; + storage.batch_write(&entries).expect("batch_write failed"); + assert_eq!(storage.len().unwrap(), 4); + + // Delete two of the entries + let keys_to_delete = [b"beta".as_slice(), b"delta".as_slice()]; + storage + .batch_delete(&keys_to_delete) + .expect("batch_delete failed"); + + // Verify store state + assert_eq!(storage.len().unwrap(), 2, "Length should be reduced by 2"); + assert!(storage.read(b"beta").unwrap().is_none()); + assert!(storage.read(b"delta").unwrap().is_none()); + + // Ensure other keys are unaffected + assert!(storage.read(b"alpha").unwrap().is_some()); + assert!(storage.read(b"gamma").unwrap().is_some()); +} + +/// Verify that `batch_delete` correctly handles a mix of keys that +/// exist and keys that do not. The operation should succeed, and only +/// existing keys should be deleted. +#[test] +fn test_batch_delete_with_missing_keys() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![(b"key1".as_slice(), b"val1".as_slice())]; + storage.batch_write(&entries).expect("batch_write failed"); + assert_eq!(storage.len().unwrap(), 1); + + // Attempt to delete one existing and one non-existent key + let keys_to_delete = [b"key1".as_slice(), b"non_existent_key".as_slice()]; + storage + .batch_delete(&keys_to_delete) + .expect("batch_delete should not fail on missing keys"); + + // Verify only the existing key was deleted + assert_eq!(storage.len().unwrap(), 0); + assert!(storage.is_empty().unwrap()); + assert!(storage.read(b"key1").unwrap().is_none()); +} + +/// Verify the lowest-level batch delete function works as intended, +/// ignoring hashes for keys that are not present in the store. 
+#[test] +fn test_batch_delete_key_hashes() { + let (_dir, storage) = create_temp_storage(); + storage.write(b"real", b"data").unwrap(); + assert_eq!(storage.len().unwrap(), 1); + + let real_hash = compute_hash(b"real"); + let fake_hash = 1234567890_u64; // A hash for a key that doesn't exist + + let hashes_to_delete = [real_hash, fake_hash]; + storage + .batch_delete_key_hashes(&hashes_to_delete) + .expect("batch_delete_key_hashes failed"); + + // The store should now be empty because the only real key was deleted. + assert!(storage.is_empty().unwrap()); +} diff --git a/tests/parallel_iterator_tests.rs b/tests/parallel_iterator_tests.rs new file mode 100644 index 00000000..1789a57c --- /dev/null +++ b/tests/parallel_iterator_tests.rs @@ -0,0 +1,155 @@ +// This attribute ensures the entire file is only compiled and run when +// the "parallel" feature is enabled. +#![cfg(feature = "parallel")] + +use rayon::prelude::*; +use simd_r_drive::{DataStore, traits::DataStoreWriter}; +use std::collections::HashSet; +use tempfile::tempdir; + +/// Helper function to create a temporary file for testing. +fn create_temp_storage() -> (tempfile::TempDir, DataStore) { + let dir = tempdir().expect("Failed to create temp dir"); + let path = dir.path().join("test_storage.bin"); + let storage = DataStore::open(&path).expect("Failed to open storage"); + (dir, storage) +} + +#[test] +fn test_par_iter_produces_correct_entries() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![ + (b"key1".as_slice(), b"payload1".as_slice()), + (b"key2".as_slice(), b"payload2".as_slice()), + (b"key3".as_slice(), b"payload3".as_slice()), + ]; + storage.batch_write(&entries).expect("Batch write failed"); + + // Use a HashSet to verify that the parallel iterator produces the exact + // same set of payloads as the sequential one, ignoring order. 
+ let expected_payloads: HashSet> = storage + .iter_entries() + .map(|e| e.as_slice().to_vec()) + .collect(); + + let parallel_payloads: HashSet> = storage + .par_iter_entries() + .map(|e| e.as_slice().to_vec()) + .collect(); + + assert_eq!( + expected_payloads, parallel_payloads, + "Parallel iterator should produce the same set of entries as the sequential one" + ); + assert_eq!(parallel_payloads.len(), 3); +} + +#[test] +fn test_par_iter_skips_deleted_entries() { + let (_dir, storage) = create_temp_storage(); + let entries = vec![ + (b"key1".as_slice(), b"payload1".as_slice()), + (b"key_to_delete".as_slice(), b"payload_to_delete".as_slice()), + (b"key3".as_slice(), b"payload3".as_slice()), + ]; + storage.batch_write(&entries).expect("Batch write failed"); + storage.delete(b"key_to_delete").expect("Delete failed"); + + // Collect all payloads found by the parallel iterator. + let found_payloads: Vec> = storage + .par_iter_entries() + .map(|e| e.as_slice().to_vec()) + .collect(); + + assert_eq!( + found_payloads.len(), + 2, + "Parallel iterator should not include deleted entries" + ); + + // Ensure the deleted payload is not present. 
+ let deleted_payload = b"payload_to_delete".to_vec(); + assert!( + !found_payloads.contains(&deleted_payload), + "Deleted payload should not be found in parallel iteration results" + ); +} + +#[test] +fn test_par_iter_on_empty_store() { + let (_dir, storage) = create_temp_storage(); + + let count = storage.par_iter_entries().count(); + + assert_eq!( + count, 0, + "Parallel iterator should produce zero items for an empty store" + ); +} + +#[test] +fn test_par_iter_yields_only_latest_version_of_updated_entry() { + let (_dir, storage) = create_temp_storage(); + + // Write initial versions of two keys + storage + .write(b"updated_key", b"version1") + .expect("Write failed"); + storage + .write(b"stable_key", b"stable_version") + .expect("Write failed"); + + // Update one of the keys + storage + .write(b"updated_key", b"version2_final") + .expect("Update failed"); + + // Collect the results from the parallel iterator + let final_payloads: HashSet> = storage + .par_iter_entries() + .map(|e| e.as_slice().to_vec()) + .collect(); + + // The iterator should yield two entries: the final version of the updated key + // and the stable key. + assert_eq!(final_payloads.len(), 2); + assert!(final_payloads.contains(b"version2_final".as_slice())); + assert!(final_payloads.contains(b"stable_version".as_slice())); + + // Crucially, the stale, older version should NOT be present. 
+ assert!(!final_payloads.contains(b"version1".as_slice())); +} + +#[test] +fn test_par_iter_excludes_entries_that_were_updated_then_deleted() { + let (_dir, storage) = create_temp_storage(); + + // Write and then update a key that we intend to delete + storage + .write(b"deleted_key", b"version1") + .expect("Write failed"); + storage + .write(b"deleted_key", b"version2") + .expect("Update failed"); + + // Write another key that will remain + storage + .write(b"stable_key", b"stable_version") + .expect("Write failed"); + + // Now, delete the key that has multiple versions + storage.delete(b"deleted_key").expect("Delete failed"); + + let final_payloads: HashSet> = storage + .par_iter_entries() + .map(|e| e.as_slice().to_vec()) + .collect(); + + // The iterator should only yield the one remaining stable key. + assert_eq!(final_payloads.len(), 1); + assert!(final_payloads.contains(b"stable_version".as_slice())); + + // Assert that NEITHER version of the deleted key is present. + assert!(!final_payloads.contains(b"version1".as_slice())); + assert!(!final_payloads.contains(b"version2".as_slice())); +}