Skip to content

Commit 1edcb43

Browse files
committed
fix
1 parent 5eb78bf commit 1edcb43

File tree

5 files changed

+29
-27
lines changed

5 files changed

+29
-27
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/expression/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ educe = { workspace = true }
3333
either = { workspace = true }
3434
enum-as-inner = { workspace = true }
3535
ethnum = { workspace = true, features = ["serde", "macros"] }
36+
fastrace = { workspace = true }
3637
futures = { workspace = true }
3738
geo = { workspace = true }
3839
geozero = { workspace = true }

src/query/expression/src/aggregate/aggregate_hashtable.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ impl AggregateHashTable {
167167
}
168168
}
169169

170+
#[fastrace::trace(name = "AggregateHashTable::add_groups_inner")]
170171
// Add new groups and combine the states
171172
fn add_groups_inner(
172173
&mut self,

src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -534,11 +534,7 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> {
534534
return Ok(());
535535
}
536536

537-
if !get_flag(place) {
538-
// initial the state to remove the dirty stats
539-
self.init_state(place);
540-
}
541-
set_flag(place, true);
537+
self.update_flag(place);
542538
self.nested
543539
.merge(place.remove_last_loc(), &data[..data.len() - 1])
544540
}
@@ -556,35 +552,29 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> {
556552

557553
match state {
558554
BlockEntry::Column(Column::Tuple(tuple)) => {
559-
let nested_state = tuple[0..tuple.len() - 1].to_vec();
555+
let nested_state = Column::Tuple(tuple[0..tuple.len() - 1].to_vec());
560556
let flag = tuple.last().unwrap().as_boolean().unwrap();
561557
let flag = match filter {
562558
Some(filter) => filter & flag,
563559
None => flag.clone(),
564560
};
565-
if flag.null_count() == 0 {
566-
return self.nested.batch_merge(
567-
places,
568-
loc,
569-
&Column::Tuple(nested_state).into(),
570-
filter,
571-
);
572-
}
573-
574-
for (place, flag) in places.iter().zip(flag.iter()) {
575-
if flag {
576-
let addr = AggrState::new(*place, loc);
577-
if !get_flag(addr) {
578-
// initial the state to remove the dirty stats
579-
self.init_state(AggrState::new(*place, loc));
580-
}
581-
set_flag(addr, true);
561+
let filter = if flag.null_count() == 0 {
562+
for place in places.iter() {
563+
self.update_flag(AggrState::new(*place, loc));
582564
}
583-
}
584-
585-
let nested_state = Column::Tuple(nested_state).into();
565+
None
566+
} else {
567+
for place in places
568+
.iter()
569+
.zip(flag.iter())
570+
.filter_map(|(place, flag)| flag.then_some(place))
571+
{
572+
self.update_flag(AggrState::new(*place, loc));
573+
}
574+
Some(&flag)
575+
};
586576
self.nested
587-
.batch_merge(places, &loc[..loc.len() - 1], &nested_state, Some(&flag))
577+
.batch_merge(places, &loc[..loc.len() - 1], &nested_state.into(), filter)
588578
}
589579
_ => {
590580
let state = state.downcast::<AnyType>().unwrap();
@@ -643,6 +633,14 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> {
643633
self.nested.drop_state(place.remove_last_loc())
644634
}
645635
}
636+
637+
fn update_flag(&self, place: AggrState) {
638+
if !get_flag(place) {
639+
// initial the state to remove the dirty stats
640+
self.init_state(place);
641+
}
642+
set_flag(place, true);
643+
}
646644
}
647645

648646
fn set_flag(place: AggrState, flag: bool) {

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl SerializedPayload {
4545
entry.as_column().unwrap()
4646
}
4747

48+
#[fastrace::trace(name = "SerializedPayload::convert_to_aggregate_table")]
4849
pub fn convert_to_aggregate_table(
4950
&self,
5051
group_types: Vec<DataType>,

0 commit comments

Comments
 (0)