Skip to content

fix: flatten before detect #1361

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ mod ui {
.expect("has segemnts")
.find(|v| v.starts_with('v'))
.expect("version segement");
println!("cargo:rustc-env=UI_VERSION={}", ui_version);
println!("cargo:rustc-env=UI_VERSION={ui_version}");
}

Ok(())
Expand Down
12 changes: 6 additions & 6 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn execute_base_query(
original_query: &str,
) -> Result<DataFrame, AlertError> {
let stream_name = query.first_table_name().ok_or_else(|| {
AlertError::CustomError(format!("Table name not found in query- {}", original_query))
AlertError::CustomError(format!("Table name not found in query- {original_query}"))
})?;

let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
Expand Down Expand Up @@ -412,7 +412,7 @@ pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
let value = match NumberOrString::from_string(value.to_owned()) {
NumberOrString::Number(val) => format!("{val}"),
NumberOrString::String(val) => {
format!("'{}'", val)
format!("'{val}'")
}
};
format!("{} {}", condition.operator, value)
Expand Down Expand Up @@ -456,23 +456,23 @@ fn match_alert_operator(expr: &ConditionConfig) -> Expr {
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexIMatch,
Box::new(lit(format!("^{}", value))),
Box::new(lit(format!("^{value}"))),
)),
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexIMatch,
Box::new(lit(format!("{}$", value))),
Box::new(lit(format!("{value}$"))),
)),
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(value)),
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexNotIMatch,
Box::new(lit(format!("^{}", value))),
Box::new(lit(format!("^{value}"))),
)),
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexNotIMatch,
Box::new(lit(format!("{}$", value))),
Box::new(lit(format!("{value}$"))),
)),
_ => unreachable!("value must not be null for operators other than `is null` and `is not null`. Should've been caught in validation")
}
Expand Down
2 changes: 1 addition & 1 deletion src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl Conditions {
format!("{} {}", expr2.column, expr2.operator)
};

format!("[{} {op} {}]", expr1_msg, expr2_msg)
format!("[{expr1_msg} {op} {expr2_msg}]")
}
},
None => {
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,8 @@ pub fn partition_path(
let lower = lower_bound.date_naive().format("%Y-%m-%d").to_string();
let upper = upper_bound.date_naive().format("%Y-%m-%d").to_string();
if lower == upper {
RelativePathBuf::from_iter([stream, &format!("date={}", lower)])
RelativePathBuf::from_iter([stream, &format!("date={lower}")])
} else {
RelativePathBuf::from_iter([stream, &format!("date={}:{}", lower, upper)])
RelativePathBuf::from_iter([stream, &format!("date={lower}:{upper}")])
}
}
17 changes: 6 additions & 11 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,7 @@ impl Options {
} else {
if endpoint.starts_with("http") {
panic!(
"Invalid value `{}`, please set the environment variable `{}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
endpoint, env_var
"Invalid value `{endpoint}`, please set the environment variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
);
}
endpoint.to_string()
Expand All @@ -579,15 +578,14 @@ impl Options {

if addr_parts.len() != 2 {
panic!(
"Invalid value `{}`, please set the environment variable to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
endpoint
"Invalid value `{endpoint}`, please set the environment variable to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details."
);
}

let hostname = self.resolve_env_var(addr_parts[0]);
let port = self.resolve_env_var(addr_parts[1]);

self.build_url(&format!("{}:{}", hostname, port))
self.build_url(&format!("{hostname}:{port}"))
}

/// resolve the env var
Expand All @@ -597,15 +595,13 @@ impl Options {
if let Some(env_var) = value.strip_prefix('$') {
let resolved_value = env::var(env_var).unwrap_or_else(|_| {
panic!(
"The environment variable `{}` is not set. Please set it to a valid value. Refer to the documentation: https://logg.ing/env for more details.",
env_var
"The environment variable `{env_var}` is not set. Please set it to a valid value. Refer to the documentation: https://logg.ing/env for more details."
);
});

if resolved_value.starts_with("http") {
panic!(
"Invalid value `{}`, please set the environment variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
resolved_value, env_var
"Invalid value `{resolved_value}`, please set the environment variable `{env_var}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
);
}

Expand All @@ -621,8 +617,7 @@ impl Options {
.parse::<Url>()
.unwrap_or_else(|err| {
panic!(
"{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.",
address
"{err}, failed to parse `{address}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details."
);
})
}
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl FromStr for BadData {
"drop" => Ok(BadData::Drop),
"fail" => Ok(BadData::Fail),
"dlt" => Ok(BadData::Dlt),
_ => Err(format!("Invalid bad data policy: {}", s)),
_ => Err(format!("Invalid bad data policy: {s}")),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/connectors/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ impl std::str::FromStr for SecurityProtocol {
"SSL" => Ok(SecurityProtocol::Ssl),
"SASL_SSL" => Ok(SecurityProtocol::SaslSsl),
"SASL_PLAINTEXT" => Ok(SecurityProtocol::SaslPlaintext),
_ => Err(format!("Invalid security protocol: {}", s)),
_ => Err(format!("Invalid security protocol: {s}")),
}
}
}
Expand Down Expand Up @@ -965,7 +965,7 @@ impl std::str::FromStr for SaslMechanism {
"SCRAM-SHA-512" => Ok(SaslMechanism::ScramSha512),
"GSSAPI" => Ok(SaslMechanism::Gssapi),
"OAUTHBEARER" => Ok(SaslMechanism::OAuthBearer),
_ => Err(format!("Invalid SASL mechanism: {}", s)),
_ => Err(format!("Invalid SASL mechanism: {s}")),
}
}
}
Expand All @@ -985,7 +985,7 @@ impl std::str::FromStr for Acks {
"0" => Ok(Acks::None),
"1" => Ok(Acks::Leader),
"all" => Ok(Acks::All),
_ => Err(format!("Invalid acks value: {}", s)),
_ => Err(format!("Invalid acks value: {s}")),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl FlightService for AirServiceImpl {

let time = time.elapsed().as_secs_f64();
QUERY_EXECUTE_TIME
.with_label_values(&[&format!("flight-query-{}", stream_name)])
.with_label_values(&[&format!("flight-query-{stream_name}")])
.observe(time);

// Airplane takes off 🛫
Expand Down
5 changes: 2 additions & 3 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,12 @@ pub async fn remove_node(node_url: Path<String>) -> Result<impl Responder, PostE

if removed_ingestor || removed_indexer || removed_querier || removed_prism {
return Ok((
format!("node {} removed successfully", domain_name),
format!("node {domain_name} removed successfully"),
StatusCode::OK,
));
}
Err(PostError::Invalid(anyhow::anyhow!(
"node {} not found",
domain_name
"node {domain_name} not found"
)))
}

Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,5 @@ pub fn to_url_string(str: String) -> String {
return str;
}

format!("http://{}/", str)
format!("http://{str}/")
}
9 changes: 4 additions & 5 deletions src/handlers/http/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ impl From<&arrow_schema::Field> for Field {

fn build_prompt(stream: &str, prompt: &str, schema_json: &str) -> String {
format!(
r#"I have a table called {}.
It has the columns:\n{}
Based on this schema, generate valid SQL for the query: "{}"
Generate only simple SQL as output. Also add comments in SQL syntax to explain your actions. Don't output anything else. If it is not possible to generate valid SQL, output an SQL comment saying so."#,
stream, schema_json, prompt
r#"I have a table called {stream}.
It has the columns:\n{schema_json}
Based on this schema, generate valid SQL for the query: "{prompt}"
Generate only simple SQL as output. Also add comments in SQL syntax to explain your actions. Don't output anything else. If it is not possible to generate valid SQL, output an SQL comment saying so."#
)
}

Expand Down
65 changes: 53 additions & 12 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
use crate::storage::retention::Retention;
use crate::storage::{StreamInfo, StreamType};
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::json::flatten::{
self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels,
};
use crate::{stats, validator, LOCK_EXPECT};

use actix_web::http::StatusCode;
Expand Down Expand Up @@ -102,22 +105,60 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
}

pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, StreamError> {
let log_records: Vec<Value> = match json {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => {
// flatten before infer
if !has_more_than_max_allowed_levels(&json, 1) {
//perform generic flattening, return error if failed to flatten
let mut flattened_json = match generic_flattening(&json) {
Ok(flattened) => match convert_to_array(flattened) {
Ok(array) => array,
Err(e) => {
return Err(StreamError::Custom {
msg: format!("Failed to convert to array: {e}"),
status: StatusCode::BAD_REQUEST,
})
}
},
Err(e) => {
return Err(StreamError::Custom {
msg: e.to_string(),
status: StatusCode::BAD_REQUEST,
})
}
};
if let Err(err) = flatten::flatten(&mut flattened_json, "_", None, None, None, false) {
return Err(StreamError::Custom {
msg: "please send json events as part of the request".to_string(),
msg: err.to_string(),
status: StatusCode::BAD_REQUEST,
})
});
}
};

let mut schema = Arc::new(infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap());
for log_record in log_records {
schema = override_data_type(schema, log_record, SchemaVersion::V1);
let flattened_json_arr = match flattened_json {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => unreachable!("flatten would have failed beforehand"),
};
let mut schema = match infer_json_schema_from_iterator(flattened_json_arr.iter().map(Ok)) {
Ok(schema) => Arc::new(schema),
Err(e) => {
return Err(StreamError::Custom {
msg: format!("Failed to infer schema: {e}"),
status: StatusCode::BAD_REQUEST,
})
}
};
for flattened_json in flattened_json_arr {
schema = override_data_type(schema, flattened_json, SchemaVersion::V1);
}
Ok((web::Json(schema), StatusCode::OK))
} else {
// error out if the JSON is heavily nested
Err(StreamError::Custom {
msg: format!(
"JSON is too deeply nested (exceeds level {}), cannot flatten",
PARSEABLE.options.event_flatten_level
),
status: StatusCode::BAD_REQUEST,
})
}
Ok((web::Json(schema), StatusCode::OK))
}

pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn base_path_without_preceding_slash() -> String {
/// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
let path_prefix =
relative_path::RelativePathBuf::from(format!("{}/{}", stream_name, STREAM_ROOT_DIRECTORY));
relative_path::RelativePathBuf::from(format!("{stream_name}/{STREAM_ROOT_DIRECTORY}"));
let store = PARSEABLE.storage.get_object_store();
let res: Vec<Schema> = store
.get_objects(
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ async fn validate_credentials() -> anyhow::Result<()> {
PARSEABLE.options.username, PARSEABLE.options.password
));

let token = format!("Basic {}", token);
let token = format!("Basic {token}");

if check != token {
return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/oidc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ fn redirect_to_oidc(
}

fn redirect_to_oidc_logout(mut logout_endpoint: Url, redirect: &Url) -> HttpResponse {
logout_endpoint.set_query(Some(&format!("post_logout_redirect_uri={}", redirect)));
logout_endpoint.set_query(Some(&format!("post_logout_redirect_uri={redirect}")));
HttpResponse::TemporaryRedirect()
.insert_header((header::CACHE_CONTROL, "no-store"))
.insert_header((header::LOCATION, logout_endpoint.to_string()))
Expand Down
5 changes: 2 additions & 3 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,7 @@ async fn handle_streaming_query(

// Combine the initial fields chunk with the records stream
let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!(
"{}\n",
fields_json
"{fields_json}\n"
))));
Box::pin(fields_chunk.chain(records_stream))
as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
Expand Down Expand Up @@ -356,7 +355,7 @@ fn create_batch_processor(
error!("Failed to parse record batch into JSON: {}", e);
actix_web::error::ErrorInternalServerError(e)
})?;
Ok(Bytes::from(format!("{}\n", response)))
Ok(Bytes::from(format!("{response}\n")))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
}
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/http/users/dashboards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub async fn post(
}
DASHBOARDS.update(&dashboard).await;

let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
let path = dashboard_path(&user_id, &format!("{dashboard_id}.json"));

let store = PARSEABLE.storage.get_object_store();
let dashboard_bytes = serde_json::to_vec(&dashboard)?;
Expand Down Expand Up @@ -120,7 +120,7 @@ pub async fn update(
}
DASHBOARDS.update(&dashboard).await;

let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
let path = dashboard_path(&user_id, &format!("{dashboard_id}.json"));

let store = PARSEABLE.storage.get_object_store();
let dashboard_bytes = serde_json::to_vec(&dashboard)?;
Expand All @@ -145,7 +145,7 @@ pub async fn delete(
{
return Err(DashboardError::Metadata("Dashboard does not exist"));
}
let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
let path = dashboard_path(&user_id, &format!("{dashboard_id}.json"));
let store = PARSEABLE.storage.get_object_store();
store.delete_object(&path).await?;

Expand Down
Loading
Loading