Skip to content

chore: find tables from DFParser, schema merge when required #1380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 18, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ use datafusion::{
logical_expr::{BinaryExpr, Literal, Operator},
prelude::{col, lit, DataFrame, Expr},
};
use tracing::trace;
use tokio::task::JoinSet;
use tracing::{trace, warn};

use crate::{
alerts::LogicalOperator, parseable::PARSEABLE, query::QUERY_SESSION, utils::time::TimeRange,
alerts::LogicalOperator,
parseable::PARSEABLE,
query::{resolve_stream_names, QUERY_SESSION},
utils::time::TimeRange,
};

use super::{
Expand Down Expand Up @@ -71,11 +75,37 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert

let session_state = QUERY_SESSION.state();
let select_query = alert.get_base_query();
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;

let time_range = TimeRange::parse_human_time(start_time, end_time)
.map_err(|err| AlertError::CustomError(err.to_string()))?;

let streams = resolve_stream_names(&select_query)?;
let raw_logical_plan = match session_state.create_logical_plan(&select_query).await {
Ok(plan) => plan,
Err(_) => {
let mut join_set = JoinSet::new();
for stream_name in streams {
let stream_name = stream_name.clone();
join_set.spawn(async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
}

(stream_name, result)
});
}

while let Some(result) = join_set.join_next().await {
if let Err(join_error) = result {
warn!("Task join error: {}", join_error);
}
}
session_state.create_logical_plan(&select_query).await?
}
};
Ok(crate::query::Query {
raw_logical_plan,
time_range,
Expand All @@ -87,7 +117,8 @@ async fn execute_base_query(
query: &crate::query::Query,
original_query: &str,
) -> Result<DataFrame, AlertError> {
let stream_name = query.first_table_name().ok_or_else(|| {
let streams = resolve_stream_names(&original_query)?;
let stream_name = streams.first().ok_or_else(|| {
AlertError::CustomError(format!("Table name not found in query- {original_query}"))
})?;

Expand Down
4 changes: 4 additions & 0 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use actix_web::http::header::ContentType;
use async_trait::async_trait;
use chrono::Utc;
use datafusion::sql::sqlparser::parser::ParserError;
use derive_more::derive::FromStr;
use derive_more::FromStrError;
use http::StatusCode;
Expand Down Expand Up @@ -860,6 +861,8 @@ pub enum AlertError {
InvalidTargetModification(String),
#[error("Can't delete a Target which is being used")]
TargetInUse,
#[error("{0}")]
ParserError(#[from] ParserError),
}

impl actix_web::ResponseError for AlertError {
Expand All @@ -880,6 +883,7 @@ impl actix_web::ResponseError for AlertError {
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
Self::InvalidTargetModification(_) => StatusCode::BAD_REQUEST,
Self::TargetInUse => StatusCode::CONFLICT,
Self::ParserError(_) => StatusCode::BAD_REQUEST,
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Correlations {
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_datasets(&permissions, tables).is_ok() {
if user_auth_for_datasets(&permissions, tables).await.is_ok() {
user_correlations.push(correlation.clone());
}
}
Expand Down Expand Up @@ -281,7 +281,7 @@ impl CorrelationConfig {
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_datasets(&permissions, tables)?;
user_auth_for_datasets(&permissions, tables).await?;

// to validate table config, we need to check whether the mentioned fields
// are present in the table or not
Expand Down
35 changes: 10 additions & 25 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use arrow_array::RecordBatch;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::PollInfo;
use arrow_schema::ArrowError;

use datafusion::common::tree_node::TreeNode;
use serde_json::json;
use std::net::SocketAddr;
use std::time::Instant;
Expand All @@ -35,11 +33,11 @@ use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_node_info;
use crate::handlers::http::modal::{NodeMetadata, NodeType};
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
use crate::handlers::http::query::into_query;
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::parseable::PARSEABLE;
use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
use crate::query::{execute, resolve_stream_names, QUERY_SESSION};
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
Expand Down Expand Up @@ -131,40 +129,27 @@ impl FlightService for AirServiceImpl {

let ticket =
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;

let streams = resolve_stream_names(&ticket.query).map_err(|e| {
error!("Failed to extract table names from SQL: {}", e);
Status::invalid_argument("Invalid SQL query syntax")
})?;
info!("query requested to airplane: {:?}", ticket);

// get the query session_state
let session_state = QUERY_SESSION.state();

// get the logical plan and extract the table name
let raw_logical_plan = session_state
.create_logical_plan(&ticket.query)
.await
.map_err(|err| {
error!("Datafusion Error: Failed to create logical plan: {}", err);
Status::internal("Failed to create logical plan")
})?;

let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
.map_err(|e| Status::internal(e.to_string()))?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let streams = visitor.into_inner();

let stream_name = streams
.first()
.iter()
.next()
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();

update_schema_when_distributed(&streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let query = into_query(&ticket, &session_state, time_range)
let query = into_query(&ticket, &session_state, time_range, &streams)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

Expand Down Expand Up @@ -214,7 +199,7 @@ impl FlightService for AirServiceImpl {

let permissions = Users.get_permissions(&key);

user_auth_for_datasets(&permissions, &streams).map_err(|_| {
user_auth_for_datasets(&permissions, &streams).await.map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn get(
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_datasets(&permissions, tables)?;
user_auth_for_datasets(&permissions, tables).await?;

Ok(web::Json(correlation))
}
Expand Down
98 changes: 49 additions & 49 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
use arrow_array::RecordBatch;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::sql::sqlparser::parser::ParserError;
use futures::stream::once;
use futures::{future, Stream, StreamExt};
use futures_util::Future;
Expand All @@ -44,11 +44,10 @@ use tracing::{error, warn};

use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::query::error::ExecuteError;
use crate::query::{resolve_stream_names, QUERY_SESSION};
use crate::query::{execute, CountsRequest, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::storage::ObjectStorageError;
Expand Down Expand Up @@ -81,31 +80,21 @@ pub async fn get_records_and_fields(
query_request: &Query,
req: &HttpRequest,
) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
let tables = resolve_stream_names(&query_request.query)?;
let session_state = QUERY_SESSION.state();

// get the logical plan and extract the table name
let raw_logical_plan = session_state
.create_logical_plan(&query_request.query)
.await?;

let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();
update_schema_when_distributed(&tables).await?;
let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;

let query: LogicalQuery =
into_query(query_request, &session_state, time_range, &tables).await?;
let creds = extract_session_key_from_req(req)?;
let permissions = Users.get_permissions(&creds);

let table_name = query
.first_table_name()
let table_name = tables
.first()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;

user_auth_for_datasets(&permissions, &tables)?;
user_auth_for_datasets(&permissions, &tables).await?;

let (records, fields) = execute(query, &table_name, false).await?;

Expand All @@ -121,35 +110,18 @@ pub async fn get_records_and_fields(

pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
let session_state = QUERY_SESSION.state();
let raw_logical_plan = match session_state
.create_logical_plan(&query_request.query)
.await
{
Ok(raw_logical_plan) => raw_logical_plan,
Err(_) => {
create_streams_for_querier().await?;
session_state
.create_logical_plan(&query_request.query)
.await?
}
};
let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let tables = visitor.into_inner();
update_schema_when_distributed(&tables).await?;
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;

let tables = resolve_stream_names(&query_request.query)?;
let query: LogicalQuery =
into_query(&query_request, &session_state, time_range, &tables).await?;
let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);

let table_name = query
.first_table_name()
let table_name = tables
.first()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;

user_auth_for_datasets(&permissions, &tables)?;
user_auth_for_datasets(&permissions, &tables).await?;

let time = Instant::now();

Expand Down Expand Up @@ -372,7 +344,7 @@ pub async fn get_counts(
let body = counts_request.into_inner();

// does user have access to table?
user_auth_for_datasets(&permissions, &[body.stream.clone()])?;
user_auth_for_datasets(&permissions, &[body.stream.clone()]).await?;

// if the user has given a sql query (counts call with filters applied), then use this flow
// this could include filters or group by
Expand Down Expand Up @@ -420,11 +392,9 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
// if the mode is query or prism, we need to update the schema in memory
// no need to commit schema to storage
// as the schema is read from memory every time
if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
for table in tables {
if let Ok(new_schema) = fetch_schema(table).await {
commit_schema(table, Arc::new(new_schema))?;
}
for table in tables {
if let Ok(new_schema) = fetch_schema(table).await {
commit_schema(table, Arc::new(new_schema))?;
}
}

Expand Down Expand Up @@ -520,6 +490,7 @@ pub async fn into_query(
query: &Query,
session_state: &SessionState,
time_range: TimeRange,
tables: &Vec<String>,
) -> Result<LogicalQuery, QueryError> {
if query.query.is_empty() {
return Err(QueryError::EmptyQuery);
Expand All @@ -532,9 +503,36 @@ pub async fn into_query(
if query.end_time.is_empty() {
return Err(QueryError::EmptyEndTime);
}
let raw_logical_plan = match session_state.create_logical_plan(&query.query).await {
Ok(plan) => plan,
Err(_) => {
let mut join_set = JoinSet::new();
for stream_name in tables {
let stream_name = stream_name.clone();
join_set.spawn(async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
}

(stream_name, result)
});
}

while let Some(result) = join_set.join_next().await {
if let Err(join_error) = result {
warn!("Task join error: {}", join_error);
}
}
session_state.create_logical_plan(&query.query).await?
}
};

Ok(crate::query::Query {
raw_logical_plan: session_state.create_logical_plan(&query.query).await?,
raw_logical_plan,
time_range,
filter_tag: query.filter_tags.clone(),
})
Expand Down Expand Up @@ -618,6 +616,8 @@ Description: {0}"#
CustomError(String),
#[error("No available queriers found")]
NoAvailableQuerier,
#[error("{0}")]
ParserError(#[from] ParserError),
}

impl actix_web::ResponseError for QueryError {
Expand Down
Loading
Loading