Skip to content

Commit 6c84866

Browse files
merge schema for tables in query
1 parent 712c7e2 commit 6c84866

File tree

5 files changed

+28
-17
lines changed

5 files changed

+28
-17
lines changed

src/alerts/alerts_utils.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use tracing::{trace, warn};
3434

3535
use crate::{
3636
alerts::LogicalOperator,
37+
handlers::http::query::update_schema_when_distributed,
3738
parseable::PARSEABLE,
3839
query::{resolve_stream_names, QUERY_SESSION},
3940
utils::time::TimeRange,
@@ -117,12 +118,18 @@ async fn execute_base_query(
117118
query: &crate::query::Query,
118119
original_query: &str,
119120
) -> Result<DataFrame, AlertError> {
120-
let streams = resolve_stream_names(&original_query)?;
121+
let streams = resolve_stream_names(original_query)?;
121122
let stream_name = streams.first().ok_or_else(|| {
122123
AlertError::CustomError(format!("Table name not found in query- {original_query}"))
123124
})?;
124-
125-
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
125+
update_schema_when_distributed(&streams)
126+
.await
127+
.map_err(|err| {
128+
AlertError::CustomError(format!(
129+
"Failed to update schema for distributed streams: {err}"
130+
))
131+
})?;
132+
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
126133
query
127134
.get_dataframe(time_partition.as_ref())
128135
.await

src/handlers/airplane.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,7 @@ impl FlightService for AirServiceImpl {
143143
// create a visitor to extract the table name
144144

145145
let stream_name = streams
146-
.iter()
147-
.next()
146+
.first()
148147
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
149148
.to_owned();
150149

@@ -199,9 +198,11 @@ impl FlightService for AirServiceImpl {
199198

200199
let permissions = Users.get_permissions(&key);
201200

202-
user_auth_for_datasets(&permissions, &streams).await.map_err(|_| {
203-
Status::permission_denied("User Does not have permission to access this")
204-
})?;
201+
user_auth_for_datasets(&permissions, &streams)
202+
.await
203+
.map_err(|_| {
204+
Status::permission_denied("User Does not have permission to access this")
205+
})?;
205206
let time = Instant::now();
206207

207208
let (records, _) = execute(query, &stream_name, false)

src/handlers/http/query.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ use crate::event::commit_schema;
4646
use crate::metrics::QUERY_EXECUTE_TIME;
4747
use crate::parseable::{StreamNotFound, PARSEABLE};
4848
use crate::query::error::ExecuteError;
49-
use crate::query::{resolve_stream_names, QUERY_SESSION};
5049
use crate::query::{execute, CountsRequest, Query as LogicalQuery};
50+
use crate::query::{resolve_stream_names, QUERY_SESSION};
5151
use crate::rbac::Users;
5252
use crate::response::QueryResponse;
5353
use crate::storage::ObjectStorageError;
@@ -95,8 +95,8 @@ pub async fn get_records_and_fields(
9595
.first()
9696
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
9797
user_auth_for_datasets(&permissions, &tables).await?;
98-
99-
let (records, fields) = execute(query, &table_name, false).await?;
98+
update_schema_when_distributed(&tables).await?;
99+
let (records, fields) = execute(query, table_name, false).await?;
100100

101101
let records = match records {
102102
Either::Left(vec_rb) => vec_rb,
@@ -122,25 +122,25 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
122122
.first()
123123
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
124124
user_auth_for_datasets(&permissions, &tables).await?;
125-
125+
update_schema_when_distributed(&tables).await?;
126126
let time = Instant::now();
127127

128128
// if the query is `select count(*) from <dataset>`
129129
// we use the `get_bin_density` method to get the count of records in the dataset
130130
// instead of executing the query using datafusion
131131
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
132-
return handle_count_query(&query_request, &table_name, column_name, time).await;
132+
return handle_count_query(&query_request, table_name, column_name, time).await;
133133
}
134134

135135
// if the query request has streaming = false (default)
136136
// we use datafusion's `execute` method to get the records
137137
if !query_request.streaming {
138-
return handle_non_streaming_query(query, &table_name, &query_request, time).await;
138+
return handle_non_streaming_query(query, table_name, &query_request, time).await;
139139
}
140140

141141
// if the query request has streaming = true
142142
// we use datafusion's `execute_stream` method to get the records
143-
handle_streaming_query(query, &table_name, &query_request, time).await
143+
handle_streaming_query(query, table_name, &query_request, time).await
144144
}
145145

146146
/// Handles count queries (e.g., `SELECT COUNT(*) FROM <dataset-name>`)

src/users/filters.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,10 @@ impl Filters {
193193
}
194194
} else if *filter_type == FilterType::Search || *filter_type == FilterType::Filter {
195195
let dataset_name = &f.stream_name;
196-
if user_auth_for_datasets(&permissions, &[dataset_name.to_string()]).await.is_ok() {
196+
if user_auth_for_datasets(&permissions, &[dataset_name.to_string()])
197+
.await
198+
.is_ok()
199+
{
197200
filters.push(f.clone())
198201
}
199202
}

src/utils/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ pub async fn user_auth_for_datasets(
104104
break;
105105
}
106106
Permission::Resource(Action::Query, ParseableResourceType::Stream(stream)) => {
107-
if !PARSEABLE.check_or_load_stream(&stream).await {
107+
if !PARSEABLE.check_or_load_stream(stream).await {
108108
return Err(actix_web::error::ErrorUnauthorized(format!(
109109
"Stream not found: {stream}"
110110
)));

0 commit comments

Comments
 (0)