Skip to content

fix: flatten before detect #1361

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 28, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ mod ui {
.expect("has segemnts")
.find(|v| v.starts_with('v'))
.expect("version segement");
println!("cargo:rustc-env=UI_VERSION={}", ui_version);
println!("cargo:rustc-env=UI_VERSION={ui_version}");
}

Ok(())
Expand Down
65 changes: 53 additions & 12 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
use crate::storage::retention::Retention;
use crate::storage::{StreamInfo, StreamType};
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::json::flatten::{
self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels,
};
use crate::{stats, validator, LOCK_EXPECT};

use actix_web::http::StatusCode;
Expand Down Expand Up @@ -102,22 +105,60 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
}

// NOTE(review): this span looks like a rendered diff whose +/- markers were lost, so
// lines from the old and new implementation appear interleaved (e.g. the two
// consecutive `msg:` fields and the two `Ok((web::Json(schema), ...))` tails).
// Confirm against the repository before treating it as compilable code.

/// HTTP handler that derives a schema from the JSON request body.
///
/// The newer path first checks nesting depth with `has_more_than_max_allowed_levels`,
/// then runs `generic_flattening`, `convert_to_array` and `flatten::flatten` on the
/// payload, and only afterwards infers the schema from the flattened records.
///
/// # Returns
/// The inferred schema serialized as JSON together with `StatusCode::OK`.
///
/// # Errors
/// Returns `StreamError::Custom` with `StatusCode::BAD_REQUEST` when flattening,
/// array conversion or schema inference fails, or when the payload is reported as
/// too deeply nested.
pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, StreamError> {
let log_records: Vec<Value> = match json {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => {
// flatten before infer
if !has_more_than_max_allowed_levels(&json, 1) {
//perform generic flattening, return error if failed to flatten
let mut flattened_json = match generic_flattening(&json) {
Ok(flattened) => match convert_to_array(flattened) {
Ok(array) => array,
Err(e) => {
return Err(StreamError::Custom {
msg: format!("Failed to convert to array: {}", e),
status: StatusCode::BAD_REQUEST,
})
}
},
Err(e) => {
return Err(StreamError::Custom {
msg: e.to_string(),
status: StatusCode::BAD_REQUEST,
})
}
};
// Second flattening pass over the value by mutable reference; "_" is presumably
// the key separator (TODO confirm against flatten::flatten). Errors become a 400.
if let Err(err) = flatten::flatten(&mut flattened_json, "_", None, None, None, false) {
return Err(StreamError::Custom {
msg: "please send json events as part of the request".to_string(),
msg: err.to_string(),
status: StatusCode::BAD_REQUEST,
})
});
}
};

let mut schema = Arc::new(infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap());
for log_record in log_records {
schema = override_data_type(schema, log_record, SchemaVersion::V1);
// Normalize to a list of records: an array is used as-is, a single object is
// wrapped in a one-element Vec; any other shape is treated as impossible here.
let flattened_json_arr = match flattened_json {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => unreachable!("flatten would have failed beforehand"),
};
// Inference failures are returned to the caller as a 400 instead of unwrapping.
let mut schema = match infer_json_schema_from_iterator(flattened_json_arr.iter().map(Ok)) {
Ok(schema) => Arc::new(schema),
Err(e) => {
return Err(StreamError::Custom {
msg: format!("Failed to infer schema: {}", e),
status: StatusCode::BAD_REQUEST,
})
}
};
// Feed each record through override_data_type, threading the schema through.
for flattened_json in flattened_json_arr {
schema = override_data_type(schema, flattened_json, SchemaVersion::V1);
}
Ok((web::Json(schema), StatusCode::OK))
} else {
// error out if the JSON is heavily nested
Err(StreamError::Custom {
msg: format!(
"JSON is too deeply nested (exceeds level {}), cannot flatten",
PARSEABLE.options.event_flatten_level
),
status: StatusCode::BAD_REQUEST,
})
}
Ok((web::Json(schema), StatusCode::OK))
}

pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
Expand Down
Loading