From fbf4f38285498aed11b66053dcff4e38dbc16a2b Mon Sep 17 00:00:00 2001 From: Laurin <42897588+suxatcode@users.noreply.github.com> Date: Fri, 6 Jun 2025 16:44:11 +0200 Subject: [PATCH 1/5] WIP: refactor gateway to use runtime abstraction (#1) --- README.md | 2 +- docs/simulation-testing.md | 28 +++++++++++++++++ helix-container/src/main.rs | 3 +- .../helix_gateway/connection/connection.rs | 22 +++++++------- helixdb/src/helix_gateway/gateway.rs | 15 ++++++---- .../helix_gateway/thread_pool/thread_pool.rs | 30 +++++++++++-------- helixdb/src/helix_runtime/mod.rs | 22 ++++++++++++++ helixdb/src/helix_runtime/tokio_runtime.rs | 24 +++++++++++++++ helixdb/src/lib.rs | 1 + 9 files changed, 116 insertions(+), 31 deletions(-) create mode 100644 docs/simulation-testing.md create mode 100644 helixdb/src/helix_runtime/mod.rs create mode 100644 helixdb/src/helix_runtime/tokio_runtime.rs diff --git a/README.md b/README.md index 5bfee28c..914e2e99 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,7 @@ Our current focus areas include: - Expanding vector data type capabilities for RAG applications - Implementing a test suite to enable end-to-end testing of queries before deployment -- Building a Deterministic Simulation Testing engine enabling us to robustly iterate faster +- [Building a Deterministic Simulation Testing engine](docs/simulation-testing.md) enabling us to robustly iterate faster - Binary quantisation for even better performance Long term projects: diff --git a/docs/simulation-testing.md b/docs/simulation-testing.md new file mode 100644 index 00000000..2bf52a1b --- /dev/null +++ b/docs/simulation-testing.md @@ -0,0 +1,28 @@ +# Deterministic Simulation Testing + +This document outlines the design for a deterministic simulation testing engine inspired by TigerBeetle's VOPR. The goal is to run HelixDB in a fully controlled environment where network, storage and asynchronous execution are scheduled deterministically. + +## Motivation +Deterministic simulation allows us to reproduce race conditions, inject faults and run large workloads at accelerated speed. By running real code under a custom scheduler we can explore many failure scenarios in CI. + +## Architecture Overview +1. **Async Runtime Abstraction** + - Introduce an `AsyncRuntime` trait providing primitives such as `spawn` and `sleep`. + - Production uses a Tokio backed implementation. + - A deterministic scheduler will implement the same trait for testing, controlling the order of task execution. + +2. **Transport Abstraction** + - Extract a `Transport` trait that wraps accepting and connecting TCP streams. + - `ConnectionHandler` and the thread pool depend on this trait instead of `TcpListener`/`TcpStream` directly. + - The simulator implements in-memory transport that can delay, drop or reorder packets. + +3. **Storage Abstraction** + - Define a `Storage` trait around LMDB operations in `helix_engine`. + - Implementations include the current LMDB backend and a simulated store capable of injecting errors or latency. + +4. **Simulation Harness** + - Compose the deterministic runtime, simulated transport and storage. + - Run one or more HelixDB instances inside the same process to model a cluster. + - Drive workloads, schedule events and collect metrics for fuzzing and invariant checking. + + diff --git a/helix-container/src/main.rs b/helix-container/src/main.rs index 77d700d6..c9e57eba 100644 --- a/helix-container/src/main.rs +++ b/helix-container/src/main.rs @@ -4,6 +4,7 @@ use helixdb::helix_gateway::{ gateway::{GatewayOpts, HelixGateway}, router::router::{HandlerFn, HandlerSubmission}, }; +use helixdb::helix_runtime::tokio_runtime::TokioRuntime; use inventory; use std::{collections::HashMap, sync::Arc}; @@ -76,8 +77,8 @@ async fn main() { graph, GatewayOpts::DEFAULT_POOL_SIZE, Some(routes), + TokioRuntime::default(), ).await; - // start server println!("Starting server..."); let a = gateway.connection_handler.accept_conns().await.unwrap(); diff --git a/helixdb/src/helix_gateway/connection/connection.rs b/helixdb/src/helix_gateway/connection/connection.rs index b3e53d39..b2e0c1e6 100644 --- a/helixdb/src/helix_gateway/connection/connection.rs +++ b/helixdb/src/helix_gateway/connection/connection.rs @@ -7,17 +7,16 @@ use std::{ collections::HashMap, sync::{Arc, Mutex}, }; -use tokio::{ - net::TcpListener, - task::JoinHandle, -}; +use tokio::net::TcpListener; +use crate::helix_runtime::AsyncRuntime; use crate::helix_gateway::{router::router::HelixRouter, thread_pool::thread_pool::ThreadPool}; -pub struct ConnectionHandler { +pub struct ConnectionHandler { pub address: String, pub active_connections: Arc>>, - pub thread_pool: ThreadPool, + pub thread_pool: ThreadPool, + pub runtime: R, } pub struct ClientConnection { @@ -26,21 +25,23 @@ pub struct ClientConnection { pub addr: SocketAddr, } -impl ConnectionHandler { +impl ConnectionHandler { pub fn new( address: &str, graph: Arc, size: usize, router: HelixRouter, + runtime: R, ) -> Result { Ok(Self { address: address.to_string(), active_connections: Arc::new(Mutex::new(HashMap::new())), - thread_pool: ThreadPool::new(size, graph, Arc::new(router))?, + thread_pool: ThreadPool::new(size, graph, Arc::new(router), runtime.clone())?, + runtime, }) } - pub async fn accept_conns(&self) -> Result, GraphError> { + pub async fn accept_conns(&self) -> Result<::JoinHandle<()>, GraphError> { // Create a new TcpListener for each accept_conns call let listener = TcpListener::bind(&self.address).await.map_err(|e| { eprintln!("Failed to bind to address {}: {}", self.address, e); @@ -54,7 +55,8 @@ impl ConnectionHandler { let address = self.address.clone(); - let handle = tokio::spawn(async move { + let runtime = self.runtime.clone(); + let handle = runtime.spawn(async move { loop { match listener.accept().await { diff --git a/helixdb/src/helix_gateway/gateway.rs b/helixdb/src/helix_gateway/gateway.rs index 8aa339c9..f33abb89 100644 --- a/helixdb/src/helix_gateway/gateway.rs +++ b/helixdb/src/helix_gateway/gateway.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use super::connection::connection::ConnectionHandler; +use crate::helix_runtime::AsyncRuntime; use crate::helix_engine::graph_core::graph_core::HelixGraphEngine; use super::router::router::{HandlerFn, HelixRouter}; @@ -10,21 +11,23 @@ impl GatewayOpts { pub const DEFAULT_POOL_SIZE: usize = 1024; } -pub struct HelixGateway { - pub connection_handler: ConnectionHandler, +pub struct HelixGateway { + pub connection_handler: ConnectionHandler, + pub runtime: R, } -impl HelixGateway { +impl HelixGateway { pub async fn new( address: &str, graph: Arc, size: usize, routes: Option>, - ) -> HelixGateway { + runtime: R, + ) -> HelixGateway { let router = HelixRouter::new(routes); - let connection_handler = ConnectionHandler::new(address, graph, size, router).unwrap(); + let connection_handler = ConnectionHandler::new(address, graph, size, router, runtime.clone()).unwrap(); println!("Gateway created"); - HelixGateway { connection_handler } + HelixGateway { connection_handler, runtime } } } diff --git a/helixdb/src/helix_gateway/thread_pool/thread_pool.rs b/helixdb/src/helix_gateway/thread_pool/thread_pool.rs index a29bd622..ec7b419d 100644 --- a/helixdb/src/helix_gateway/thread_pool/thread_pool.rs +++ b/helixdb/src/helix_gateway/thread_pool/thread_pool.rs @@ -1,7 +1,7 @@ use crate::helix_engine::graph_core::graph_core::HelixGraphEngine; use flume::{Receiver, Sender}; use std::sync::{Arc, Mutex}; -use tokio::task::JoinHandle; +use crate::helix_runtime::AsyncRuntime; use crate::helix_gateway::router::router::{HelixRouter, RouterError}; use crate::protocol::request::Request; @@ -12,19 +12,21 @@ extern crate tokio; use tokio::net::TcpStream; -pub struct Worker { +pub struct Worker { pub id: usize, - pub handle: JoinHandle<()>, + pub handle: ::JoinHandle<()>, + pub runtime: R, } -impl Worker { +impl Worker { fn new( id: usize, graph_access: Arc, router: Arc, rx: Receiver, - ) -> Worker { - let handle = tokio::spawn(async move { + runtime: R, + ) -> Worker { + let handle = runtime.spawn(async move { loop { let mut conn = match rx.recv_async().await { Ok(stream) => stream, @@ -66,23 +68,24 @@ impl Worker { } }); - Worker { id, handle } + Worker { id, handle, runtime } } } -pub struct ThreadPool { +pub struct ThreadPool { pub sender: Sender, pub num_unused_workers: Mutex, pub num_used_workers: Mutex, - pub workers: Vec, + pub workers: Vec>, + pub runtime: R, } - -impl ThreadPool { +impl ThreadPool { pub fn new( size: usize, graph: Arc, router: Arc, - ) -> Result { + runtime: R, + ) -> Result, RouterError> { assert!( size > 0, "Expected number of threads in thread pool to be more than 0, got {}", @@ -92,7 +95,7 @@ impl ThreadPool { let (tx, rx) = flume::unbounded::(); let mut workers = Vec::with_capacity(size); for id in 0..size { - workers.push(Worker::new(id, Arc::clone(&graph), Arc::clone(&router), rx.clone())); + workers.push(Worker::new(id, Arc::clone(&graph), Arc::clone(&router), rx.clone(), runtime.clone())); } println!("Thread pool initialized with {} workers", workers.len()); @@ -101,6 +104,7 @@ impl ThreadPool { sender: tx, num_unused_workers: Mutex::new(size), num_used_workers: Mutex::new(0), + runtime: runtime, workers, }) } diff --git a/helixdb/src/helix_runtime/mod.rs b/helixdb/src/helix_runtime/mod.rs new file mode 100644 index 00000000..8481fea4 --- /dev/null +++ b/helixdb/src/helix_runtime/mod.rs @@ -0,0 +1,22 @@ +pub mod tokio_runtime; + +use std::future::Future; +use std::pin::Pin; + +/// Trait representing the minimal async runtime capabilities required by HelixDB. +/// +/// Production code uses a Tokio-backed implementation while tests can +/// provide deterministic schedulers by implementing this trait. +pub trait AsyncRuntime { + type JoinHandle: Future + Send + 'static; + + /// Spawn a future onto the runtime. + fn spawn(&self, fut: F) -> Self::JoinHandle + where + F: Future + Send + 'static, + T: Send + 'static; + + /// Sleep for the specified duration. + fn sleep(&self, dur: std::time::Duration) -> Pin + Send>>; +} + diff --git a/helixdb/src/helix_runtime/tokio_runtime.rs b/helixdb/src/helix_runtime/tokio_runtime.rs new file mode 100644 index 00000000..8e2a3175 --- /dev/null +++ b/helixdb/src/helix_runtime/tokio_runtime.rs @@ -0,0 +1,24 @@ +use super::AsyncRuntime; +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +/// Tokio based implementation of [`AsyncRuntime`]. +#[derive(Clone, Default)] +pub struct TokioRuntime; + +impl AsyncRuntime for TokioRuntime { + type JoinHandle = tokio::task::JoinHandle; + + fn spawn(&self, fut: F) -> Self::JoinHandle + where + F: Future + Send + 'static, + T: Send + 'static, + { + tokio::spawn(fut) + } + + fn sleep(&self, dur: Duration) -> Pin + Send>> { + Box::pin(tokio::time::sleep(dur)) + } +} diff --git a/helixdb/src/lib.rs b/helixdb/src/lib.rs index d54be464..728101ee 100644 --- a/helixdb/src/lib.rs +++ b/helixdb/src/lib.rs @@ -5,3 +5,4 @@ pub mod helixc; #[cfg(feature = "ingestion")] pub mod ingestion_engine; pub mod protocol; +pub mod helix_runtime; From b66dcf9d328744309fa4c384ec434f61d9dae9b9 Mon Sep 17 00:00:00 2001 From: Laurin <42897588+suxatcode@users.noreply.github.com> Date: Sat, 7 Jun 2025 12:08:28 +0200 Subject: [PATCH 2/5] fix compile by adjusting async runtime (#4) --- helix-container/src/main.rs | 4 +-- helixdb/src/helix_runtime/mod.rs | 4 ++- helixdb/src/helix_runtime/tokio_runtime.rs | 30 ++++++++++++++++++++-- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/helix-container/src/main.rs b/helix-container/src/main.rs index c9e57eba..1df7e8e2 100644 --- a/helix-container/src/main.rs +++ b/helix-container/src/main.rs @@ -81,8 +81,8 @@ async fn main() { ).await; // start server println!("Starting server..."); - let a = gateway.connection_handler.accept_conns().await.unwrap(); - let b = a.await.unwrap(); + let handle = gateway.connection_handler.accept_conns().await.unwrap(); + handle.await; } diff --git a/helixdb/src/helix_runtime/mod.rs b/helixdb/src/helix_runtime/mod.rs index 8481fea4..6885b9be 100644 --- a/helixdb/src/helix_runtime/mod.rs +++ b/helixdb/src/helix_runtime/mod.rs @@ -8,7 +8,9 @@ use std::pin::Pin; /// Production code uses a Tokio-backed implementation while tests can /// provide deterministic schedulers by implementing this trait. pub trait AsyncRuntime { - type JoinHandle: Future + Send + 'static; + type JoinHandle: Future + Send + 'static + where + T: Send + 'static; /// Spawn a future onto the runtime. fn spawn(&self, fut: F) -> Self::JoinHandle diff --git a/helixdb/src/helix_runtime/tokio_runtime.rs b/helixdb/src/helix_runtime/tokio_runtime.rs index 8e2a3175..ca57ed98 100644 --- a/helixdb/src/helix_runtime/tokio_runtime.rs +++ b/helixdb/src/helix_runtime/tokio_runtime.rs @@ -1,21 +1,47 @@ use super::AsyncRuntime; use std::future::Future; use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; /// Tokio based implementation of [`AsyncRuntime`]. #[derive(Clone, Default)] pub struct TokioRuntime; +/// Wrapper around [`tokio::task::JoinHandle`] that yields the task result +/// directly and panics if the task failed. +pub struct TokioJoinHandle(tokio::task::JoinHandle); + +// SAFETY: `TokioJoinHandle` is only constructed via `TokioRuntime::spawn`, +// which requires `T: Send + 'static`. Therefore it is safe to mark this type +// as `Send` for all `T`. +unsafe impl Send for TokioJoinHandle {} + +impl Future for TokioJoinHandle { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Safety: just projecting the inner JoinHandle + let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) }; + match inner.poll(cx) { + Poll::Ready(Ok(val)) => Poll::Ready(val), + Poll::Ready(Err(err)) => panic!("Tokio task failed: {}", err), + Poll::Pending => Poll::Pending, + } + } +} + impl AsyncRuntime for TokioRuntime { - type JoinHandle = tokio::task::JoinHandle; + type JoinHandle = TokioJoinHandle + where + T: Send + 'static; fn spawn(&self, fut: F) -> Self::JoinHandle where F: Future + Send + 'static, T: Send + 'static, { - tokio::spawn(fut) + TokioJoinHandle(tokio::spawn(fut)) } fn sleep(&self, dur: Duration) -> Pin + Send>> { From b21e4642097a959220a0440cd43dc7a1809d4c4d Mon Sep 17 00:00:00 2001 From: skep Date: Sun, 8 Jun 2025 12:02:50 +0000 Subject: [PATCH 3/5] WIP: transport abstraction --- helix-container/src/main.rs | 2 + .../helix_gateway/connection/connection.rs | 58 ++++++++++--------- helixdb/src/helix_gateway/gateway.rs | 22 +++++-- .../helix_gateway/thread_pool/thread_pool.rs | 42 ++++++++------ helixdb/src/helix_transport/mod.rs | 40 +++++++++++++ .../src/helix_transport/tokio_transport.rs | 38 ++++++++++++ helixdb/src/lib.rs | 1 + 7 files changed, 155 insertions(+), 48 deletions(-) create mode 100644 helixdb/src/helix_transport/mod.rs create mode 100644 helixdb/src/helix_transport/tokio_transport.rs diff --git a/helix-container/src/main.rs b/helix-container/src/main.rs index 8b8f388a..13b0c787 100644 --- a/helix-container/src/main.rs +++ b/helix-container/src/main.rs @@ -6,6 +6,7 @@ use helixdb::helix_gateway::{ router::router::{HandlerFn, HandlerSubmission}, }; use helixdb::helix_runtime::tokio_runtime::TokioRuntime; +use helixdb::helix_transport::tokio_transport::TokioTransport; use inventory; use std::{collections::HashMap, sync::Arc}; @@ -102,6 +103,7 @@ async fn main() { Some(routes), Some(mcp_routes), TokioRuntime::default(), + TokioTransport, ) .await; diff --git a/helixdb/src/helix_gateway/connection/connection.rs b/helixdb/src/helix_gateway/connection/connection.rs index b2e0c1e6..856139bf 100644 --- a/helixdb/src/helix_gateway/connection/connection.rs +++ b/helixdb/src/helix_gateway/connection/connection.rs @@ -1,22 +1,27 @@ use crate::helix_engine::graph_core::graph_core::HelixGraphEngine; use crate::helix_engine::types::GraphError; +use crate::helix_gateway::{router::router::HelixRouter, thread_pool::thread_pool::ThreadPool}; +use crate::helix_runtime::AsyncRuntime; +use crate::helix_transport::{Listener, Transport}; use chrono::{DateTime, Utc}; -use uuid::Uuid; use std::{ - net::SocketAddr, collections::HashMap, + io, + net::SocketAddr, sync::{Arc, Mutex}, }; -use tokio::net::TcpListener; -use crate::helix_runtime::AsyncRuntime; - -use crate::helix_gateway::{router::router::HelixRouter, thread_pool::thread_pool::ThreadPool}; +use uuid::Uuid; -pub struct ConnectionHandler { +pub struct ConnectionHandler +where + R: AsyncRuntime + Clone + Send + Sync + 'static, + T: Transport, +{ pub address: String, pub active_connections: Arc>>, - pub thread_pool: ThreadPool, + pub thread_pool: ThreadPool, pub runtime: R, + transport: T, } pub struct ClientConnection { @@ -25,49 +30,49 @@ pub struct ClientConnection { pub addr: SocketAddr, } -impl ConnectionHandler { +impl ConnectionHandler +where + R: AsyncRuntime + Clone + Send + Sync + 'static, + T: Transport, + T::Stream: 'static, +{ pub fn new( address: &str, graph: Arc, size: usize, router: HelixRouter, runtime: R, + transport: T, ) -> Result { Ok(Self { address: address.to_string(), active_connections: Arc::new(Mutex::new(HashMap::new())), thread_pool: ThreadPool::new(size, graph, Arc::new(router), runtime.clone())?, runtime, + transport, }) } pub async fn accept_conns(&self) -> Result<::JoinHandle<()>, GraphError> { - // Create a new TcpListener for each accept_conns call - let listener = TcpListener::bind(&self.address).await.map_err(|e| { + let addr: SocketAddr = self.address.parse().map_err(|e| { + GraphError::GraphConnectionError( + "Invalid address".to_string(), + io::Error::new(io::ErrorKind::InvalidInput, e), + ) + })?; + let listener = self.transport.bind(addr).await.map_err(|e| { eprintln!("Failed to bind to address {}: {}", self.address, e); GraphError::GraphConnectionError("Failed to bind to address".to_string(), e) })?; - // Log binding success to stderr since stdout might be buffered - let active_connections = Arc::clone(&self.active_connections); let thread_pool_sender = self.thread_pool.sender.clone(); - let address = self.address.clone(); - let runtime = self.runtime.clone(); let handle = runtime.spawn(async move { - loop { match listener.accept().await { Ok((stream, addr)) => { - - // Configure TCP stream - if let Err(e) = stream.set_nodelay(true) { - eprintln!("Failed to set TCP_NODELAY: {}", e); - } - - // Create a client connection record let client_id = Uuid::new_v4().to_string(); let client = ClientConnection { id: client_id.clone(), @@ -75,17 +80,18 @@ impl ConnectionHandler { addr, }; - // Add to active connections active_connections .lock() .unwrap() .insert(client_id.clone(), client); - // Send to thread pool match thread_pool_sender.send_async(stream).await { Ok(_) => (), Err(e) => { - eprintln!("Error sending connection {} to thread pool: {}", client_id, e); + eprintln!( + "Error sending connection {} to thread pool: {}", + client_id, e + ); active_connections.lock().unwrap().remove(&client_id); } } diff --git a/helixdb/src/helix_gateway/gateway.rs b/helixdb/src/helix_gateway/gateway.rs index ad4a627d..3199c406 100644 --- a/helixdb/src/helix_gateway/gateway.rs +++ b/helixdb/src/helix_gateway/gateway.rs @@ -6,6 +6,7 @@ use super::router::router::{HandlerFn, HelixRouter}; use crate::{ helix_engine::graph_core::graph_core::HelixGraphEngine, helix_gateway::mcp::mcp::MCPHandlerFn, }; +use crate::helix_transport::Transport; pub struct GatewayOpts {} @@ -13,12 +14,21 @@ impl GatewayOpts { pub const DEFAULT_POOL_SIZE: usize = 8; } -pub struct HelixGateway { - pub connection_handler: ConnectionHandler, +pub struct HelixGateway +where + R: AsyncRuntime + Clone + Send + Sync + 'static, + T: Transport, +{ + pub connection_handler: ConnectionHandler, pub runtime: R, } -impl HelixGateway { +impl HelixGateway +where + R: AsyncRuntime + Clone + Send + Sync + 'static, + T: Transport, + T::Stream: 'static, +{ pub async fn new( address: &str, graph: Arc, @@ -26,10 +36,12 @@ impl HelixGateway { routes: Option>, mcp_routes: Option>, runtime: R, - ) -> HelixGateway { + transport: T, + ) -> HelixGateway { let router = HelixRouter::new(routes, mcp_routes); let connection_handler = - ConnectionHandler::new(address, graph, size, router, runtime.clone()).unwrap(); + ConnectionHandler::new(address, graph, size, router, runtime.clone(), transport) + .unwrap(); println!("Gateway created"); HelixGateway { connection_handler, diff --git a/helixdb/src/helix_gateway/thread_pool/thread_pool.rs b/helixdb/src/helix_gateway/thread_pool/thread_pool.rs index 1cdd24cf..bb80a915 100644 --- a/helixdb/src/helix_gateway/thread_pool/thread_pool.rs +++ b/helixdb/src/helix_gateway/thread_pool/thread_pool.rs @@ -7,25 +7,23 @@ use crate::helix_gateway::router::router::{HelixRouter, RouterError}; use crate::protocol::request::Request; use crate::protocol::response::Response; +use crate::helix_transport::Stream; -extern crate tokio; - -use tokio::net::TcpStream; - -pub struct Worker { +pub struct Worker { pub id: usize, pub handle: ::JoinHandle<()>, pub runtime: R, + _marker: std::marker::PhantomData, } -impl Worker { +impl Worker { fn new( id: usize, graph_access: Arc, router: Arc, - rx: Receiver, + rx: Receiver, runtime: R, - ) -> Worker { + ) -> Worker { let handle = runtime.spawn(async move { loop { let mut conn = match rx.recv_async().await { @@ -68,38 +66,48 @@ impl Worker { } }); - Worker { id, handle, runtime } + Worker { + id, + handle, + runtime, + _marker: std::marker::PhantomData, + } } } -pub struct ThreadPool { - pub sender: Sender, +pub struct ThreadPool { + pub sender: Sender, pub num_unused_workers: Mutex, pub num_used_workers: Mutex, - pub workers: Vec>, + pub workers: Vec>, pub runtime: R, } -impl ThreadPool { +impl ThreadPool { pub fn new( size: usize, graph: Arc, router: Arc, runtime: R, - ) -> Result, RouterError> { + ) -> Result, RouterError> { assert!( size > 0, "Expected number of threads in thread pool to be more than 0, got {}", size ); - let (tx, rx) = flume::bounded::(1000); + let (tx, rx) = flume::bounded::(1000); let mut workers = Vec::with_capacity(size); for id in 0..size { - workers.push(Worker::new(id, Arc::clone(&graph), Arc::clone(&router), rx.clone(), runtime.clone())); + workers.push(Worker::new( + id, + Arc::clone(&graph), + Arc::clone(&router), + rx.clone(), + runtime.clone(), + )); } println!("Thread pool initialized with {} workers", workers.len()); - Ok(ThreadPool { sender: tx, num_unused_workers: Mutex::new(size), diff --git a/helixdb/src/helix_transport/mod.rs b/helixdb/src/helix_transport/mod.rs new file mode 100644 index 00000000..2a50a7e9 --- /dev/null +++ b/helixdb/src/helix_transport/mod.rs @@ -0,0 +1,40 @@ +pub mod tokio_transport; + +use std::future::Future; +use std::io; +use std::net::SocketAddr; +use tokio::io::{AsyncRead, AsyncWrite}; + +/// A trait for objects that can be read from and written to asynchronously. +/// +/// This is a marker trait that is automatically implemented for any type that +/// implements `AsyncRead`, `AsyncWrite`, `Send`, `Sync`, and `Unpin`. +pub trait Stream: AsyncRead + AsyncWrite + Send + Sync + Unpin {} +impl Stream for T {} + +/// A trait for listeners that can accept incoming connections. +pub trait Listener: Send + Sync { + /// The type of stream that this listener produces. + type Stream: Stream; + + /// Accepts a new incoming connection from this listener. + fn accept(&self) -> impl Future> + Send; + + /// Returns the local address that this listener is bound to. + fn local_addr(&self) -> io::Result; +} + +/// A trait for transports that can create listeners and connect to peers. +pub trait Transport: Send + Sync + 'static { + /// The type of listener that this transport produces. + type Listener: Listener; + + /// The type of stream that this transport produces. + type Stream: Stream; + + /// Creates a new listener which will be bound to the specified address. + fn bind(&self, addr: SocketAddr) -> impl Future> + Send; + + /// Opens a connection to a remote host. + fn connect(&self, addr: SocketAddr) -> impl Future> + Send; +} \ No newline at end of file diff --git a/helixdb/src/helix_transport/tokio_transport.rs b/helixdb/src/helix_transport/tokio_transport.rs new file mode 100644 index 00000000..da31a517 --- /dev/null +++ b/helixdb/src/helix_transport/tokio_transport.rs @@ -0,0 +1,38 @@ +use super::{Listener, Transport}; +use std::future::Future; +use std::io; +use std::net::SocketAddr; +use tokio::net::{TcpListener, TcpStream}; + +#[derive(Clone)] +pub struct TokioTransport; + +impl Transport for TokioTransport { + type Listener = TokioListener; + type Stream = TcpStream; + + fn bind(&self, addr: SocketAddr) -> impl Future> + Send { + async move { + let listener = TcpListener::bind(addr).await?; + Ok(TokioListener(listener)) + } + } + + fn connect(&self, addr: SocketAddr) -> impl Future> + Send { + TcpStream::connect(addr) + } +} + +pub struct TokioListener(TcpListener); + +impl Listener for TokioListener { + type Stream = TcpStream; + + fn accept(&self) -> impl Future> + Send { + self.0.accept() + } + + fn local_addr(&self) -> io::Result { + self.0.local_addr() + } +} \ No newline at end of file diff --git a/helixdb/src/lib.rs b/helixdb/src/lib.rs index 728101ee..eea32142 100644 --- a/helixdb/src/lib.rs +++ b/helixdb/src/lib.rs @@ -6,3 +6,4 @@ pub mod helixc; pub mod ingestion_engine; pub mod protocol; pub mod helix_runtime; +pub mod helix_transport; From 3de360946dbc44cda11c288f05f70e6381e40f59 Mon Sep 17 00:00:00 2001 From: skep Date: Sun, 8 Jun 2025 14:01:13 +0000 Subject: [PATCH 4/5] fix: explicit short param names --- helix-cli/src/args.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helix-cli/src/args.rs b/helix-cli/src/args.rs index ce5d01e5..d94b14c1 100644 --- a/helix-cli/src/args.rs +++ b/helix-cli/src/args.rs @@ -69,13 +69,13 @@ pub enum CommandType { #[derive(Debug, Args)] #[clap(name = "deploy", about = "Deploy a Helix project")] pub struct DeployCommand { - #[clap(short, long, help = "The path to the project")] + #[clap(short = 'P', long, help = "The path to the project")] pub path: Option, #[clap(short, long, help = "The output path")] pub output: Option, - #[clap(short, long, help = "Port to run the instance on")] + #[clap(short = 'p', long, help = "Port to run the instance on")] pub port: Option, } From a3c25607e02c6a5fe0c95d652a1d418be0e02457 Mon Sep 17 00:00:00 2001 From: skep Date: Mon, 9 Jun 2025 19:31:10 +0000 Subject: [PATCH 5/5] chore: forward heed3 imports to be replace by fault injection engine --- helixdb/src/helix_engine/bm25/bm25.rs | 2 +- helixdb/src/helix_engine/bm25/bm25_tests.rs | 4 ++-- .../src/helix_engine/graph_core/ops/bm25/search_bm25.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/g.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/in_/in_.rs | 8 ++++---- helixdb/src/helix_engine/graph_core/ops/in_/in_e.rs | 8 ++++---- helixdb/src/helix_engine/graph_core/ops/in_/to_n.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/out/from_n.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/out/out.rs | 8 ++++---- helixdb/src/helix_engine/graph_core/ops/out/out_e.rs | 8 ++++---- helixdb/src/helix_engine/graph_core/ops/source/add_e.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/source/add_n.rs | 2 +- .../src/helix_engine/graph_core/ops/source/bulk_add_e.rs | 2 +- .../src/helix_engine/graph_core/ops/source/bulk_add_n.rs | 2 +- .../src/helix_engine/graph_core/ops/source/e_from_id.rs | 2 +- .../src/helix_engine/graph_core/ops/source/e_from_type.rs | 4 ++-- .../src/helix_engine/graph_core/ops/source/n_from_id.rs | 2 +- .../helix_engine/graph_core/ops/source/n_from_index.rs | 4 ++-- .../src/helix_engine/graph_core/ops/source/n_from_type.rs | 4 ++-- helixdb/src/helix_engine/graph_core/ops/util/drop.rs | 2 +- .../src/helix_engine/graph_core/ops/util/filter_mut.rs | 4 ++-- .../src/helix_engine/graph_core/ops/util/filter_ref.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/util/map.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/util/paths.rs | 2 +- .../graph_core/ops/vectors/brute_force_search.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/vectors/insert.rs | 2 +- helixdb/src/helix_engine/graph_core/ops/vectors/search.rs | 2 +- helixdb/src/helix_engine/graph_core/traversal_iter.rs | 2 +- helixdb/src/helix_engine/storage_core/storage_core.rs | 4 ++-- helixdb/src/helix_engine/storage_core/storage_methods.rs | 2 +- helixdb/src/helix_engine/types.rs | 2 +- helixdb/src/helix_engine/vector_core/hnsw.rs | 2 +- helixdb/src/helix_engine/vector_core/hnsw_tests.rs | 2 +- helixdb/src/helix_engine/vector_core/vector_core.rs | 2 +- helixdb/src/helix_gateway/mcp/mcp.rs | 2 +- helixdb/src/helix_gateway/mcp/tools.rs | 2 +- helixdb/src/helix_storage.rs | 3 +++ helixdb/src/helixc/generator/utils.rs | 2 +- helixdb/src/lib.rs | 1 + 39 files changed, 59 insertions(+), 55 deletions(-) create mode 100644 helixdb/src/helix_storage.rs diff --git a/helixdb/src/helix_engine/bm25/bm25.rs b/helixdb/src/helix_engine/bm25/bm25.rs index 96038823..e648cd57 100644 --- a/helixdb/src/helix_engine/bm25/bm25.rs +++ b/helixdb/src/helix_engine/bm25/bm25.rs @@ -1,4 +1,4 @@ -use heed3::{types::*, Database, Env, RoTxn, RwTxn}; +use crate::helix_storage::heed3::{types::*, Database, Env, RoTxn, RwTxn}; use serde::{Deserialize, Serialize}; use std::{borrow::Cow, collections::HashMap, sync::Arc}; diff --git a/helixdb/src/helix_engine/bm25/bm25_tests.rs b/helixdb/src/helix_engine/bm25/bm25_tests.rs index 2d7a52d0..e8163c1e 100644 --- a/helixdb/src/helix_engine/bm25/bm25_tests.rs +++ b/helixdb/src/helix_engine/bm25/bm25_tests.rs @@ -5,7 +5,7 @@ mod tests { storage_core::storage_core::HelixGraphStorage, graph_core::config::Config, }; - use heed3::{EnvOpenOptions, Env}; + use crate::helix_storage::heed3::{EnvOpenOptions, Env}; use tempfile::tempdir; fn setup_test_env() -> (Env, tempfile::TempDir) { @@ -496,4 +496,4 @@ mod tests { wtxn.commit().unwrap(); } -} \ No newline at end of file +} diff --git a/helixdb/src/helix_engine/graph_core/ops/bm25/search_bm25.rs b/helixdb/src/helix_engine/graph_core/ops/bm25/search_bm25.rs index 7e33c301..dc3cfa1c 100644 --- a/helixdb/src/helix_engine/graph_core/ops/bm25/search_bm25.rs +++ b/helixdb/src/helix_engine/graph_core/ops/bm25/search_bm25.rs @@ -1,4 +1,4 @@ -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use super::super::tr_val::TraversalVal; use crate::helix_engine::{ diff --git a/helixdb/src/helix_engine/graph_core/ops/g.rs b/helixdb/src/helix_engine/graph_core/ops/g.rs index dd722305..3a89781d 100644 --- a/helixdb/src/helix_engine/graph_core/ops/g.rs +++ b/helixdb/src/helix_engine/graph_core/ops/g.rs @@ -4,7 +4,7 @@ use crate::helix_engine::{ storage_core::storage_core::HelixGraphStorage, types::GraphError, }; -use heed3::{RoTxn, RwTxn}; +use crate::helix_storage::heed3::{RoTxn, RwTxn}; use std::sync::Arc; pub struct G {} diff --git a/helixdb/src/helix_engine/graph_core/ops/in_/in_.rs b/helixdb/src/helix_engine/graph_core/ops/in_/in_.rs index 238ef1dd..b37fc420 100644 --- a/helixdb/src/helix_engine/graph_core/ops/in_/in_.rs +++ b/helixdb/src/helix_engine/graph_core/ops/in_/in_.rs @@ -12,15 +12,15 @@ use crate::{ }, protocol::label_hash::hash_label, }; -use heed3::{types::Bytes, RoTxn}; +use crate::helix_storage::heed3::{types::Bytes, RoTxn}; use std::sync::Arc; pub struct InNodesIterator<'a, T> { - pub iter: heed3::RoIter< + pub iter: crate::helix_storage::heed3::RoIter< 'a, Bytes, - heed3::types::LazyDecode, - heed3::iteration_method::MoveOnCurrentKeyDuplicates, + crate::helix_storage::heed3::types::LazyDecode, + crate::helix_storage::heed3::iteration_method::MoveOnCurrentKeyDuplicates, >, pub storage: Arc, pub txn: &'a T, diff --git a/helixdb/src/helix_engine/graph_core/ops/in_/in_e.rs b/helixdb/src/helix_engine/graph_core/ops/in_/in_e.rs index 850a4e5d..6ba584a1 100644 --- a/helixdb/src/helix_engine/graph_core/ops/in_/in_e.rs +++ b/helixdb/src/helix_engine/graph_core/ops/in_/in_e.rs @@ -9,15 +9,15 @@ use crate::{ }, protocol::label_hash::hash_label, }; -use heed3::{types::Bytes, RoTxn}; +use crate::helix_storage::heed3::{types::Bytes, RoTxn}; use std::sync::Arc; pub struct InEdgesIterator<'a, T> { - pub iter: heed3::RoIter< + pub iter: crate::helix_storage::heed3::RoIter< 'a, Bytes, - heed3::types::LazyDecode, - heed3::iteration_method::MoveOnCurrentKeyDuplicates, + crate::helix_storage::heed3::types::LazyDecode, + crate::helix_storage::heed3::iteration_method::MoveOnCurrentKeyDuplicates, >, pub storage: Arc, pub txn: &'a T, diff --git a/helixdb/src/helix_engine/graph_core/ops/in_/to_n.rs b/helixdb/src/helix_engine/graph_core/ops/in_/to_n.rs index 3e590700..e549fa5e 100644 --- a/helixdb/src/helix_engine/graph_core/ops/in_/to_n.rs +++ b/helixdb/src/helix_engine/graph_core/ops/in_/to_n.rs @@ -3,7 +3,7 @@ use crate::helix_engine::{ storage_core::{storage_core::HelixGraphStorage, storage_methods::StorageMethods}, types::GraphError, }; -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use std::sync::Arc; pub struct ToNIterator<'a, I, T> { diff --git a/helixdb/src/helix_engine/graph_core/ops/out/from_n.rs b/helixdb/src/helix_engine/graph_core/ops/out/from_n.rs index a5dd8e2d..514b7e79 100644 --- a/helixdb/src/helix_engine/graph_core/ops/out/from_n.rs +++ b/helixdb/src/helix_engine/graph_core/ops/out/from_n.rs @@ -3,7 +3,7 @@ use crate::helix_engine::{ storage_core::{storage_core::HelixGraphStorage, storage_methods::StorageMethods}, types::GraphError, }; -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use std::sync::Arc; pub struct FromNIterator<'a, I, T> { diff --git a/helixdb/src/helix_engine/graph_core/ops/out/out.rs b/helixdb/src/helix_engine/graph_core/ops/out/out.rs index b402945f..58f8a7c8 100644 --- a/helixdb/src/helix_engine/graph_core/ops/out/out.rs +++ b/helixdb/src/helix_engine/graph_core/ops/out/out.rs @@ -12,15 +12,15 @@ use crate::{ }, protocol::label_hash::hash_label, }; -use heed3::{types::Bytes, RoTxn, WithTls}; +use crate::helix_storage::heed3::{types::Bytes, RoTxn, WithTls}; use std::sync::Arc; pub struct OutNodesIterator<'a, T> { - pub iter: heed3::RoIter< + pub iter: crate::helix_storage::heed3::RoIter< 'a, Bytes, - heed3::types::LazyDecode, - heed3::iteration_method::MoveOnCurrentKeyDuplicates, + crate::helix_storage::heed3::types::LazyDecode, + crate::helix_storage::heed3::iteration_method::MoveOnCurrentKeyDuplicates, >, pub storage: Arc, pub edge_type: &'a EdgeType, diff --git a/helixdb/src/helix_engine/graph_core/ops/out/out_e.rs b/helixdb/src/helix_engine/graph_core/ops/out/out_e.rs index 1634729a..f7585cc3 100644 --- a/helixdb/src/helix_engine/graph_core/ops/out/out_e.rs +++ b/helixdb/src/helix_engine/graph_core/ops/out/out_e.rs @@ -9,15 +9,15 @@ use crate::{ }, protocol::label_hash::hash_label, }; -use heed3::{types::Bytes, RoTxn}; +use crate::helix_storage::heed3::{types::Bytes, RoTxn}; use std::sync::Arc; pub struct OutEdgesIterator<'a, T> { - pub iter: heed3::RoIter< + pub iter: crate::helix_storage::heed3::RoIter< 'a, Bytes, - heed3::types::LazyDecode, - heed3::iteration_method::MoveOnCurrentKeyDuplicates, + crate::helix_storage::heed3::types::LazyDecode, + crate::helix_storage::heed3::iteration_method::MoveOnCurrentKeyDuplicates, >, pub storage: Arc, pub txn: &'a T, diff --git a/helixdb/src/helix_engine/graph_core/ops/source/add_e.rs b/helixdb/src/helix_engine/graph_core/ops/source/add_e.rs index 70c387f2..0a18272d 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/add_e.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/add_e.rs @@ -12,7 +12,7 @@ use crate::{ value::Value, }, }; -use heed3::PutFlags; +use crate::helix_storage::heed3::PutFlags; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] diff --git a/helixdb/src/helix_engine/graph_core/ops/source/add_n.rs b/helixdb/src/helix_engine/graph_core/ops/source/add_n.rs index af2f04d1..35a0c150 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/add_n.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/add_n.rs @@ -11,7 +11,7 @@ use crate::{ value::Value, }, }; -use heed3::PutFlags; +use crate::helix_storage::heed3::PutFlags; pub struct AddNIterator { inner: std::iter::Once>, diff --git a/helixdb/src/helix_engine/graph_core/ops/source/bulk_add_e.rs b/helixdb/src/helix_engine/graph_core/ops/source/bulk_add_e.rs index 027c5bcd..df2ab074 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/bulk_add_e.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/bulk_add_e.rs @@ -6,7 +6,7 @@ use crate::{ }, protocol::{items::Edge, label_hash::hash_label}, }; -use heed3::PutFlags; +use crate::helix_storage::heed3::PutFlags; pub struct BulkAddE { inner: std::iter::Once>, diff --git a/helixdb/src/helix_engine/graph_core/ops/source/bulk_add_n.rs b/helixdb/src/helix_engine/graph_core/ops/source/bulk_add_n.rs index 759b54d0..c0d3c4a5 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/bulk_add_n.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/bulk_add_n.rs @@ -2,7 +2,7 @@ use crate::{ helix_engine::{graph_core::traversal_iter::RwTraversalIterator, types::GraphError}, protocol::items::Node, }; -use heed3::PutFlags; +use crate::helix_storage::heed3::PutFlags; use super::super::tr_val::TraversalVal; pub struct BulkAddN { diff --git a/helixdb/src/helix_engine/graph_core/ops/source/e_from_id.rs b/helixdb/src/helix_engine/graph_core/ops/source/e_from_id.rs index c80b63d5..28014bbd 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/e_from_id.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/e_from_id.rs @@ -6,7 +6,7 @@ use crate::{ }, protocol::items::Edge, }; -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use std::{iter::Once, sync::Arc}; pub struct EFromId<'a, T> { diff --git a/helixdb/src/helix_engine/graph_core/ops/source/e_from_type.rs b/helixdb/src/helix_engine/graph_core/ops/source/e_from_type.rs index 3bf77b5b..854eb3c3 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/e_from_type.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/e_from_type.rs @@ -5,13 +5,13 @@ use crate::{ }, protocol::items::Edge, }; -use heed3::{ +use crate::helix_storage::heed3::{ byteorder::BE, types::{Bytes, U128}, }; pub struct EFromType<'a> { - pub iter: heed3::RoIter<'a, U128, heed3::types::LazyDecode>, + pub iter: crate::helix_storage::heed3::RoIter<'a, U128, crate::helix_storage::heed3::types::LazyDecode>, pub label: &'a str, } diff --git a/helixdb/src/helix_engine/graph_core/ops/source/n_from_id.rs b/helixdb/src/helix_engine/graph_core/ops/source/n_from_id.rs index 66580018..92ed4200 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/n_from_id.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/n_from_id.rs @@ -6,7 +6,7 @@ use crate::{ }, protocol::items::Node, }; -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use std::{iter::Once, sync::Arc}; pub struct NFromId<'a, T> { diff --git a/helixdb/src/helix_engine/graph_core/ops/source/n_from_index.rs b/helixdb/src/helix_engine/graph_core/ops/source/n_from_index.rs index 7d8eba60..bda60b16 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/n_from_index.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/n_from_index.rs @@ -6,13 +6,13 @@ use crate::{ }, protocol::{items::Node, value::Value}, }; -use heed3::{byteorder::BE, types::Bytes, RoTxn}; +use crate::helix_storage::heed3::{byteorder::BE, types::Bytes, RoTxn}; use serde::Serialize; use std::{iter::Once, sync::Arc}; pub struct NFromIndex<'a> { iter: - heed3::RoPrefix<'a, heed3::types::Bytes, heed3::types::LazyDecode>>, + crate::helix_storage::heed3::RoPrefix<'a, crate::helix_storage::heed3::types::Bytes, crate::helix_storage::heed3::types::LazyDecode>>, txn: &'a RoTxn<'a>, storage: Arc, } diff --git a/helixdb/src/helix_engine/graph_core/ops/source/n_from_type.rs b/helixdb/src/helix_engine/graph_core/ops/source/n_from_type.rs index b3f685fa..2787c1e9 100644 --- a/helixdb/src/helix_engine/graph_core/ops/source/n_from_type.rs +++ b/helixdb/src/helix_engine/graph_core/ops/source/n_from_type.rs @@ -5,13 +5,13 @@ use crate::{ }, protocol::items::Node, }; -use heed3::{ +use crate::helix_storage::heed3::{ byteorder::BE, types::{Bytes, U128}, }; pub struct NFromType<'a> { - pub iter: heed3::RoIter<'a, U128, heed3::types::LazyDecode>, + pub iter: crate::helix_storage::heed3::RoIter<'a, U128, crate::helix_storage::heed3::types::LazyDecode>, pub label: &'a str, } diff --git a/helixdb/src/helix_engine/graph_core/ops/util/drop.rs b/helixdb/src/helix_engine/graph_core/ops/util/drop.rs index c0a25d0e..44cac643 100644 --- a/helixdb/src/helix_engine/graph_core/ops/util/drop.rs +++ b/helixdb/src/helix_engine/graph_core/ops/util/drop.rs @@ -3,7 +3,7 @@ use crate::helix_engine::{ storage_core::{storage_core::HelixGraphStorage, storage_methods::StorageMethods}, types::GraphError, }; -use heed3::RwTxn; +use crate::helix_storage::heed3::RwTxn; use std::sync::Arc; pub struct Drop { diff --git a/helixdb/src/helix_engine/graph_core/ops/util/filter_mut.rs b/helixdb/src/helix_engine/graph_core/ops/util/filter_mut.rs index 1be07fa5..0541ada2 100644 --- a/helixdb/src/helix_engine/graph_core/ops/util/filter_mut.rs +++ b/helixdb/src/helix_engine/graph_core/ops/util/filter_mut.rs @@ -1,4 +1,4 @@ -use heed3::RwTxn; +use crate::helix_storage::heed3::RwTxn; use crate::helix_engine::{ graph_core::ops::tr_val::TraversalVal, @@ -27,4 +27,4 @@ where None => None, } } -} \ No newline at end of file +} diff --git a/helixdb/src/helix_engine/graph_core/ops/util/filter_ref.rs b/helixdb/src/helix_engine/graph_core/ops/util/filter_ref.rs index d0876f0f..933cf85c 100644 --- a/helixdb/src/helix_engine/graph_core/ops/util/filter_ref.rs +++ b/helixdb/src/helix_engine/graph_core/ops/util/filter_ref.rs @@ -1,7 +1,7 @@ use crate::helix_engine::{graph_core::traversal_iter::RoTraversalIterator, types::GraphError}; use super::super::tr_val::TraversalVal; -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; pub struct FilterRef<'a, I, F> { iter: I, diff --git a/helixdb/src/helix_engine/graph_core/ops/util/map.rs b/helixdb/src/helix_engine/graph_core/ops/util/map.rs index 93a090b8..60be882b 100644 --- a/helixdb/src/helix_engine/graph_core/ops/util/map.rs +++ b/helixdb/src/helix_engine/graph_core/ops/util/map.rs @@ -4,7 +4,7 @@ use crate::helix_engine::{ }; use super::super::tr_val::TraversalVal; -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; pub struct Map<'a, I, F> { iter: I, diff --git a/helixdb/src/helix_engine/graph_core/ops/util/paths.rs b/helixdb/src/helix_engine/graph_core/ops/util/paths.rs index 661f2687..77178276 100644 --- a/helixdb/src/helix_engine/graph_core/ops/util/paths.rs +++ b/helixdb/src/helix_engine/graph_core/ops/util/paths.rs @@ -6,7 +6,7 @@ use crate::{ }, protocol::{items::Edge, label_hash::hash_label}, }; -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use std::{ collections::{HashMap, HashSet, VecDeque}, sync::Arc, diff --git a/helixdb/src/helix_engine/graph_core/ops/vectors/brute_force_search.rs b/helixdb/src/helix_engine/graph_core/ops/vectors/brute_force_search.rs index 18442c23..6e9db9a9 100644 --- a/helixdb/src/helix_engine/graph_core/ops/vectors/brute_force_search.rs +++ b/helixdb/src/helix_engine/graph_core/ops/vectors/brute_force_search.rs @@ -1,4 +1,4 @@ -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use super::super::tr_val::TraversalVal; use crate::helix_engine::{ diff --git a/helixdb/src/helix_engine/graph_core/ops/vectors/insert.rs b/helixdb/src/helix_engine/graph_core/ops/vectors/insert.rs index adf7aef2..9477b89e 100644 --- a/helixdb/src/helix_engine/graph_core/ops/vectors/insert.rs +++ b/helixdb/src/helix_engine/graph_core/ops/vectors/insert.rs @@ -1,4 +1,4 @@ -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use super::super::tr_val::TraversalVal; use crate::{ diff --git a/helixdb/src/helix_engine/graph_core/ops/vectors/search.rs b/helixdb/src/helix_engine/graph_core/ops/vectors/search.rs index 48ab94ff..9baa9d25 100644 --- a/helixdb/src/helix_engine/graph_core/ops/vectors/search.rs +++ b/helixdb/src/helix_engine/graph_core/ops/vectors/search.rs @@ -1,4 +1,4 @@ -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use super::super::tr_val::TraversalVal; use crate::helix_engine::{ diff --git a/helixdb/src/helix_engine/graph_core/traversal_iter.rs b/helixdb/src/helix_engine/graph_core/traversal_iter.rs index bea91977..c7ecc7fc 100644 --- a/helixdb/src/helix_engine/graph_core/traversal_iter.rs +++ b/helixdb/src/helix_engine/graph_core/traversal_iter.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use heed3::{RoTxn, RwTxn, WithTls}; +use crate::helix_storage::heed3::{RoTxn, RwTxn, WithTls}; use super::ops::tr_val::TraversalVal; use crate::helix_engine::{storage_core::storage_core::HelixGraphStorage, types::GraphError}; diff --git a/helixdb/src/helix_engine/storage_core/storage_core.rs b/helixdb/src/helix_engine/storage_core/storage_core.rs index 1b66f3cb..744e1024 100644 --- a/helixdb/src/helix_engine/storage_core/storage_core.rs +++ b/helixdb/src/helix_engine/storage_core/storage_core.rs @@ -18,8 +18,8 @@ use crate::{ }, }; -use heed3::byteorder::BE; -use heed3::{types::*, Database, DatabaseFlags, Env, EnvOpenOptions, RoTxn, RwTxn, WithTls}; +use crate::helix_storage::heed3::byteorder::BE; +use crate::helix_storage::heed3::{types::*, Database, DatabaseFlags, Env, EnvOpenOptions, RoTxn, RwTxn, WithTls}; use std::collections::HashMap; use std::fs; use std::path::Path; diff --git a/helixdb/src/helix_engine/storage_core/storage_methods.rs b/helixdb/src/helix_engine/storage_core/storage_methods.rs index ddf205f5..69fc1b52 100644 --- a/helixdb/src/helix_engine/storage_core/storage_methods.rs +++ b/helixdb/src/helix_engine/storage_core/storage_methods.rs @@ -3,7 +3,7 @@ use crate::protocol::{ items::{Edge, Node}, value::Value, }; -use heed3::{RoTxn, RwTxn}; +use crate::helix_storage::heed3::{RoTxn, RwTxn}; pub trait DBMethods { /// Creates a new database with a given name for a secondary index diff --git a/helixdb/src/helix_engine/types.rs b/helixdb/src/helix_engine/types.rs index 26fd59eb..6378be49 100644 --- a/helixdb/src/helix_engine/types.rs +++ b/helixdb/src/helix_engine/types.rs @@ -3,7 +3,7 @@ use crate::{ protocol::traversal_value::TraversalValueError, }; use core::fmt; -use heed3::Error as HeedError; +use crate::helix_storage::heed3::Error as HeedError; use sonic_rs::Error as SonicError; use std::{ net::AddrParseError, diff --git a/helixdb/src/helix_engine/vector_core/hnsw.rs b/helixdb/src/helix_engine/vector_core/hnsw.rs index 27e4d0eb..9742a5d1 100644 --- a/helixdb/src/helix_engine/vector_core/hnsw.rs +++ b/helixdb/src/helix_engine/vector_core/hnsw.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use crate::{helix_engine::types::VectorError, protocol::value::Value}; use crate::helix_engine::vector_core::vector::HVector; -use heed3::{RoTxn, RwTxn}; +use crate::helix_storage::heed3::{RoTxn, RwTxn}; pub trait HNSW { diff --git a/helixdb/src/helix_engine/vector_core/hnsw_tests.rs b/helixdb/src/helix_engine/vector_core/hnsw_tests.rs index fe376be0..b0502cbe 100644 --- a/helixdb/src/helix_engine/vector_core/hnsw_tests.rs +++ b/helixdb/src/helix_engine/vector_core/hnsw_tests.rs @@ -16,7 +16,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use heed3::{RwTxn, RoTxn}; +use crate::helix_storage::heed3::{RwTxn, RoTxn}; use rand::seq::SliceRandom; use polars::prelude::*; use kdam::tqdm; diff --git a/helixdb/src/helix_engine/vector_core/vector_core.rs b/helixdb/src/helix_engine/vector_core/vector_core.rs index dabadcbc..d5627df7 100644 --- a/helixdb/src/helix_engine/vector_core/vector_core.rs +++ b/helixdb/src/helix_engine/vector_core/vector_core.rs @@ -3,7 +3,7 @@ use crate::helix_engine::{ vector_core::{hnsw::HNSW, vector::HVector}, }; use crate::protocol::value::Value; -use heed3::{ +use crate::helix_storage::heed3::{ types::{Bytes, Unit}, Database, Env, RoTxn, RwTxn, }; diff --git a/helixdb/src/helix_gateway/mcp/mcp.rs b/helixdb/src/helix_gateway/mcp/mcp.rs index fa9b8bec..bb655cd5 100644 --- a/helixdb/src/helix_gateway/mcp/mcp.rs +++ b/helixdb/src/helix_gateway/mcp/mcp.rs @@ -10,7 +10,7 @@ use std::{ }; use get_routes::{local_handler, mcp_handler}; -use heed3::{AnyTls, RoTxn}; +use crate::helix_storage::heed3::{AnyTls, RoTxn}; use serde::Deserialize; use crate::{ diff --git a/helixdb/src/helix_gateway/mcp/tools.rs b/helixdb/src/helix_gateway/mcp/tools.rs index 52a8eee7..03e8a00a 100644 --- a/helixdb/src/helix_gateway/mcp/tools.rs +++ b/helixdb/src/helix_gateway/mcp/tools.rs @@ -14,7 +14,7 @@ use crate::helix_gateway::router::router::HandlerInput; use crate::protocol::label_hash::hash_label; use crate::protocol::response::Response; use get_routes::local_handler; -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use serde::{Deserialize, Deserializer}; use std::collections::HashMap; use std::sync::Arc; diff --git a/helixdb/src/helix_storage.rs b/helixdb/src/helix_storage.rs new file mode 100644 index 00000000..d97182c3 --- /dev/null +++ b/helixdb/src/helix_storage.rs @@ -0,0 +1,3 @@ +pub mod heed3 { + pub use ::heed3::*; +} \ No newline at end of file diff --git a/helixdb/src/helixc/generator/utils.rs b/helixdb/src/helixc/generator/utils.rs index 84f52737..d99a2839 100644 --- a/helixdb/src/helixc/generator/utils.rs +++ b/helixdb/src/helixc/generator/utils.rs @@ -303,7 +303,7 @@ impl Separator { pub fn write_headers() -> String { r#" -use heed3::RoTxn; +use crate::helix_storage::heed3::RoTxn; use get_routes::handler; use helixdb::{field_remapping, identifier_remapping, traversal_remapping, exclude_field}; use helixdb::helix_engine::vector_core::vector::HVector; diff --git a/helixdb/src/lib.rs b/helixdb/src/lib.rs index eea32142..c93bd7c9 100644 --- a/helixdb/src/lib.rs +++ b/helixdb/src/lib.rs @@ -7,3 +7,4 @@ pub mod ingestion_engine; pub mod protocol; pub mod helix_runtime; pub mod helix_transport; +pub mod helix_storage;