Skip to content

Commit 5289e41

Browse files
authored
Enable staged data querying (#371)
1 parent 4319205 commit 5289e41

File tree

5 files changed

+193
-8
lines changed

5 files changed

+193
-8
lines changed

server/src/event/writer.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,19 @@ impl WriterTable {
150150
}
151151
}
152152
}
153+
154+
pub fn clone_read_buf(&self, stream_name: &str) -> Option<ReadBuf> {
155+
let hashmap_guard = self.read().unwrap();
156+
let (writer, context) = hashmap_guard.get(stream_name)?;
157+
let writer = writer.lock().unwrap();
158+
match &*writer {
159+
StreamWriter::Mem(mem) => Some(ReadBuf {
160+
time: context.time,
161+
buf: mem.recordbatch_cloned(),
162+
}),
163+
StreamWriter::Disk(_) => None,
164+
}
165+
}
153166
}
154167

155168
pub mod errors {

server/src/event/writer/mem_writer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ impl<const N: usize> MemWriter<N> {
4646
self.mutable_buffer.push(rb)
4747
}
4848

49-
#[allow(unused)]
5049
pub fn recordbatch_cloned(&self) -> Vec<RecordBatch> {
5150
let mut read_buffer = self.read_buffer.clone();
5251
let rb = self.mutable_buffer.recordbatch_cloned();

server/src/query.rs

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*
1717
*/
1818

19+
pub mod table_provider;
20+
1921
use chrono::TimeZone;
2022
use chrono::{DateTime, Utc};
2123
use datafusion::arrow::datatypes::Schema;
@@ -27,13 +29,16 @@ use serde_json::Value;
2729
use std::path::Path;
2830
use std::sync::Arc;
2931

32+
use crate::event::STREAM_WRITERS;
3033
use crate::option::CONFIG;
34+
use crate::storage::staging::{ReadBuf, MEMORY_READ_BUFFERS};
3135
use crate::storage::ObjectStorageError;
3236
use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
3337
use crate::utils::TimePeriod;
3438
use crate::validator;
3539

3640
use self::error::{ExecuteError, ParseError};
41+
use self::table_provider::QueryTableProvider;
3742

3843
type Key = &'static str;
3944
fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -73,10 +78,6 @@ impl Query {
7378
.collect()
7479
}
7580

76-
pub fn get_schema(&self) -> &Schema {
77-
&self.schema
78-
}
79-
8081
/// Execute the query on object storage (and, if necessary, on the cache as well) using the given stream information.
8182
/// TODO: find a way to query all selected parquet files together in a single context.
8283
pub async fn execute(
@@ -88,9 +89,13 @@ impl Query {
8889
CONFIG.storage().get_datafusion_runtime(),
8990
);
9091

91-
let Some(table) = storage.query_table(self.get_prefixes(), Arc::new(self.get_schema().clone()))? else {
92-
return Ok((Vec::new(), Vec::new()));
93-
};
92+
let prefixes = self.get_prefixes();
93+
let table = QueryTableProvider::new(
94+
prefixes,
95+
storage,
96+
get_all_read_buf(&self.stream_name, self.start, self.end),
97+
Arc::clone(&self.schema),
98+
);
9499

95100
ctx.register_table(
96101
&*self.stream_name,
@@ -176,6 +181,30 @@ pub mod error {
176181
}
177182
}
178183

184+
fn get_all_read_buf(stream_name: &str, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<ReadBuf> {
185+
let now = Utc::now();
186+
let include_mutable = start <= now && now <= end;
187+
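// collect the read buffers whose time lies within [start, end]; the writer's buffer is appended afterwards when `now` is inside the query range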
188+
let mut queryable_read_buffer = Vec::new();
189+
190+
if let Some(mem) = MEMORY_READ_BUFFERS.read().unwrap().get(stream_name) {
191+
for read_buffer in mem {
192+
let time = read_buffer.time;
193+
if start.naive_utc() <= time && time <= end.naive_utc() {
194+
queryable_read_buffer.push(read_buffer.clone())
195+
}
196+
}
197+
}
198+
199+
if include_mutable {
200+
if let Some(x) = STREAM_WRITERS.clone_read_buf(stream_name) {
201+
queryable_read_buffer.push(x);
202+
}
203+
}
204+
205+
queryable_read_buffer
206+
}
207+
179208
#[cfg(test)]
180209
mod tests {
181210
use super::time_from_path;

server/src/query/table_provider.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use async_trait::async_trait;
20+
use datafusion::arrow::datatypes::{Schema, SchemaRef};
21+
use datafusion::datasource::{MemTable, TableProvider};
22+
use datafusion::error::DataFusionError;
23+
use datafusion::execution::context::SessionState;
24+
use datafusion::logical_expr::TableType;
25+
use datafusion::physical_plan::empty::EmptyExec;
26+
use datafusion::physical_plan::union::UnionExec;
27+
use datafusion::physical_plan::ExecutionPlan;
28+
use datafusion::prelude::Expr;
29+
use std::any::Any;
30+
use std::sync::Arc;
31+
32+
use crate::storage::staging::ReadBuf;
33+
use crate::storage::ObjectStorage;
34+
use crate::utils::arrow::adapt_batch;
35+
36+
pub struct QueryTableProvider {
37+
storage_prefixes: Vec<String>,
38+
storage: Arc<dyn ObjectStorage + Send>,
39+
readable_buffer: Vec<ReadBuf>,
40+
schema: Arc<Schema>,
41+
}
42+
43+
impl QueryTableProvider {
44+
pub fn new(
45+
storage_prefixes: Vec<String>,
46+
storage: Arc<dyn ObjectStorage + Send>,
47+
readable_buffer: Vec<ReadBuf>,
48+
schema: Arc<Schema>,
49+
) -> Self {
50+
Self {
51+
storage_prefixes,
52+
storage,
53+
readable_buffer,
54+
schema,
55+
}
56+
}
57+
58+
async fn create_physical_plan(
59+
&self,
60+
ctx: &SessionState,
61+
projection: Option<&Vec<usize>>,
62+
filters: &[Expr],
63+
limit: Option<usize>,
64+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
65+
let memexec = self.get_mem_exec(ctx, projection, filters, limit).await?;
66+
let table = self
67+
.storage
68+
.query_table(self.storage_prefixes.clone(), Arc::clone(&self.schema))?;
69+
70+
let mut exec = Vec::new();
71+
if let Some(memexec) = memexec {
72+
exec.push(memexec);
73+
}
74+
75+
if let Some(ref storage_listing) = table {
76+
exec.push(
77+
storage_listing
78+
.scan(ctx, projection, filters, limit)
79+
.await?,
80+
);
81+
}
82+
83+
if exec.is_empty() {
84+
Ok(Arc::new(EmptyExec::new(false, Arc::clone(&self.schema))))
85+
} else {
86+
Ok(Arc::new(UnionExec::new(exec)))
87+
}
88+
}
89+
90+
async fn get_mem_exec(
91+
&self,
92+
ctx: &SessionState,
93+
projection: Option<&Vec<usize>>,
94+
filters: &[Expr],
95+
limit: Option<usize>,
96+
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
97+
if self.readable_buffer.is_empty() {
98+
return Ok(None);
99+
}
100+
101+
let mem_records: Vec<Vec<_>> = self
102+
.readable_buffer
103+
.iter()
104+
.map(|r| {
105+
r.buf
106+
.iter()
107+
.cloned()
108+
.map(|rb| adapt_batch(&self.schema, rb))
109+
.collect()
110+
})
111+
.collect();
112+
113+
let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?;
114+
let memexec = memtable.scan(ctx, projection, filters, limit).await?;
115+
Ok(Some(memexec))
116+
}
117+
}
118+
119+
#[async_trait]
120+
impl TableProvider for QueryTableProvider {
121+
fn as_any(&self) -> &dyn Any {
122+
self
123+
}
124+
125+
fn schema(&self) -> SchemaRef {
126+
Arc::clone(&self.schema)
127+
}
128+
129+
fn table_type(&self) -> TableType {
130+
TableType::Base
131+
}
132+
133+
async fn scan(
134+
&self,
135+
ctx: &SessionState,
136+
projection: Option<&Vec<usize>>,
137+
filters: &[Expr],
138+
limit: Option<usize>,
139+
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
140+
self.create_physical_plan(ctx, projection, filters, limit)
141+
.await
142+
}
143+
}

server/src/storage/staging.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub fn take_all_read_bufs() -> Vec<(String, Vec<ReadBuf>)> {
6262
res
6363
}
6464

65+
#[derive(Debug, Clone)]
6566
pub struct ReadBuf {
6667
pub time: NaiveDateTime,
6768
pub buf: Vec<RecordBatch>,

0 commit comments

Comments
 (0)