Skip to content

Commit 478b282

Browse files
remove stats calculation from conversion
1 parent 829c97d commit 478b282

File tree

2 files changed

+5
-12
lines changed

2 files changed

+5
-12
lines changed

src/parseable/streams.rs

Lines changed: 4 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -463,7 +463,7 @@ impl Stream {
463463
}
464464

465465
/// Converts arrow files in staging into parquet files; does so only for past minutes when run with `!shutdown_signal`.
466-
pub async fn prepare_parquet(
466+
pub fn prepare_parquet(
467467
&self,
468468
init_signal: bool,
469469
shutdown_signal: bool,
@@ -637,8 +637,6 @@ impl Stream {
637637
}
638638

639639
self.update_staging_metrics(&staging_files);
640-
641-
let mut record_batches = Vec::new();
642640
for (parquet_path, arrow_files) in staging_files {
643641
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
644642
if record_reader.readers.is_empty() {
@@ -656,7 +654,6 @@ impl Stream {
656654
&schema,
657655
&props,
658656
time_partition,
659-
&mut record_batches,
660657
)? {
661658
continue;
662659
}
@@ -682,7 +679,6 @@ impl Stream {
682679
schema: &Arc<Schema>,
683680
props: &WriterProperties,
684681
time_partition: Option<&String>,
685-
record_batches: &mut Vec<RecordBatch>,
686682
) -> Result<bool, StagingError> {
687683
let mut part_file = OpenOptions::new()
688684
.create(true)
@@ -692,8 +688,6 @@ impl Stream {
692688
let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?;
693689
for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) {
694690
writer.write(record)?;
695-
// Collect record batches for finding statistics later
696-
record_batches.push(record.clone());
697691
}
698692
writer.close()?;
699693

@@ -960,7 +954,7 @@ impl Stream {
960954
}
961955

962956
/// First flushes arrows onto disk and then converts them into parquet
963-
pub async fn flush_and_convert(
957+
pub fn flush_and_convert(
964958
&self,
965959
init_signal: bool,
966960
shutdown_signal: bool,
@@ -975,7 +969,7 @@ impl Stream {
975969

976970
let start_convert = Instant::now();
977971

978-
self.prepare_parquet(init_signal, shutdown_signal).await?;
972+
self.prepare_parquet(init_signal, shutdown_signal)?;
979973
trace!(
980974
"Converting arrows to parquet on stream ({}) took: {}s",
981975
self.stream_name,
@@ -1070,8 +1064,7 @@ impl Streams {
10701064
.map(Arc::clone)
10711065
.collect();
10721066
for stream in streams {
1073-
joinset
1074-
.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal).await });
1067+
joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal) });
10751068
}
10761069
}
10771070
}

src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -909,7 +909,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
909909
// perform local sync for the `pstats` dataset
910910
task::spawn(async move {
911911
if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) {
912-
if let Err(err) = stats_stream.flush_and_convert(false, false).await {
912+
if let Err(err) = stats_stream.flush_and_convert(false, false) {
913913
error!("Failed in local sync for dataset stats stream: {err}");
914914
}
915915
}

0 commit comments

Comments
 (0)