Skip to content

feat(query): add rule_grouping_sets_to_union #18413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 92 commits into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
e880453
add MaterializedCTE plan
SkyFan2002 Jun 19, 2025
466e163
build pipeline
SkyFan2002 Jun 19, 2025
db16044
build pipeline
SkyFan2002 Jun 20, 2025
26bb0ab
add operator
SkyFan2002 Jun 23, 2025
7e986a9
remove m cte temp table
SkyFan2002 Jun 23, 2025
e4ee842
bind
SkyFan2002 Jun 23, 2025
307809c
Merge remote-tracking branch 'upstream/main' into cte_plan
SkyFan2002 Jun 23, 2025
e5d7472
fix
SkyFan2002 Jun 23, 2025
9a53ba2
remove unused field
SkyFan2002 Jun 29, 2025
ff73950
fix bind
SkyFan2002 Jun 30, 2025
5197134
fix schema
SkyFan2002 Jun 30, 2025
ba5be42
fix
SkyFan2002 Jun 30, 2025
7690dbf
make lint
SkyFan2002 Jun 30, 2025
1f22235
Merge branch 'main' into cte_plan
SkyFan2002 Jun 30, 2025
f8f4d7a
fix
SkyFan2002 Jun 30, 2025
4c75ded
fix join
SkyFan2002 Jun 30, 2025
5bb786c
Merge branch 'main' into cte_plan
SkyFan2002 Jun 30, 2025
5a4c0ca
Merge branch 'main' into cte_plan
SkyFan2002 Jun 30, 2025
cc89312
fix
SkyFan2002 Jun 30, 2025
291204a
refine explain
SkyFan2002 Jun 30, 2025
4835fb8
fix
SkyFan2002 Jul 1, 2025
046bd04
fix
SkyFan2002 Jul 2, 2025
4979e0b
fix
SkyFan2002 Jul 2, 2025
af0eeb7
fix
SkyFan2002 Jul 2, 2025
67c2bc3
fix
SkyFan2002 Jul 3, 2025
9a8eb3b
fix
SkyFan2002 Jul 8, 2025
30aa9f3
fix
SkyFan2002 Jul 9, 2025
e39af8e
fix
SkyFan2002 Jul 9, 2025
7b5b406
Merge branch 'main' into cte_plan
SkyFan2002 Jul 9, 2025
a39a569
CleanupUnusedCTE
SkyFan2002 Jul 9, 2025
3686686
fix
SkyFan2002 Jul 10, 2025
fb7bfbd
fix
SkyFan2002 Jul 10, 2025
e600f31
fix
SkyFan2002 Jul 10, 2025
5e0752a
fix
SkyFan2002 Jul 10, 2025
4606b12
refine
SkyFan2002 Jul 10, 2025
70df77d
refine
SkyFan2002 Jul 10, 2025
e38c70f
make lint
SkyFan2002 Jul 10, 2025
6829408
fix
SkyFan2002 Jul 10, 2025
bc4efde
add log
SkyFan2002 Jul 13, 2025
b5ccca9
fix
SkyFan2002 Jul 13, 2025
2f897f4
fix
SkyFan2002 Jul 13, 2025
cf7e7f9
make lint
SkyFan2002 Jul 13, 2025
2a5f371
fix
SkyFan2002 Jul 13, 2025
8b0efe5
fix
SkyFan2002 Jul 15, 2025
38067f5
fix
SkyFan2002 Jul 15, 2025
3758bbd
fix
SkyFan2002 Jul 15, 2025
488a08d
fix
SkyFan2002 Jul 15, 2025
f24bb7f
disable distributed optimization
SkyFan2002 Jul 16, 2025
6d70dd6
Merge remote-tracking branch 'upstream/main' into cte_plan
SkyFan2002 Jul 16, 2025
00b299a
fix merge
SkyFan2002 Jul 16, 2025
4d24f11
Merge branch 'main' into cte_plan
SkyFan2002 Jul 16, 2025
ad405bb
fix explain join
SkyFan2002 Jul 17, 2025
a5853a0
fix logic test
SkyFan2002 Jul 17, 2025
0d8ee4e
fix logic test
SkyFan2002 Jul 17, 2025
321a70a
add ref count
SkyFan2002 Jul 21, 2025
a5949c1
refactor: streaming CTE consumption
SkyFan2002 Jul 21, 2025
9c146f8
refactor plan
SkyFan2002 Jul 22, 2025
993659a
fix
SkyFan2002 Jul 22, 2025
db1c089
fix
SkyFan2002 Jul 22, 2025
3e90fbd
Merge branch 'main' into cte_plan
SkyFan2002 Jul 22, 2025
082eccf
enable distributed
SkyFan2002 Jul 23, 2025
2ad7a25
fix logic test
SkyFan2002 Jul 23, 2025
b2d42af
fix serial cte
SkyFan2002 Jul 23, 2025
5e20f1c
fix test
SkyFan2002 Jul 23, 2025
c612c56
fix fragment type
SkyFan2002 Jul 23, 2025
5fc45ba
fix replace range join
SkyFan2002 Jul 23, 2025
7b38411
fix explain join order
SkyFan2002 Jul 23, 2025
e385732
fix logic test
SkyFan2002 Jul 23, 2025
4fcfba5
feat(query): add rule_grouping_sets_to_union
sundy-li Jul 24, 2025
c7b9d95
feat(query): add rule_grouping_sets_to_union
sundy-li Jul 24, 2025
dedc246
Merge remote-tracking branch 'fb/cte_plan' into rule_grouping_sets_to…
sundy-li Jul 24, 2025
1f8df6c
simplify
SkyFan2002 Jul 24, 2025
a1178bf
ref_count calculation is not required when constructing MaterializedCTE
SkyFan2002 Jul 24, 2025
934a2d4
Merge fb
sundy-li Jul 24, 2025
9a91ebb
simplify MaterializedCTE
SkyFan2002 Jul 24, 2025
b79eb41
simplify CTEConsumer
SkyFan2002 Jul 24, 2025
1740ff0
Merge remote-tracking branch 'fb/cte_plan' into rule_grouping_sets_to…
sundy-li Jul 24, 2025
e38ca13
Merge
sundy-li Jul 25, 2025
576f98a
Update src/query/service/src/pipelines/builders/builder_sequence.rs
SkyFan2002 Jul 25, 2025
df268f7
Update src/query/service/src/pipelines/builders/builder_materialized_…
SkyFan2002 Jul 25, 2025
a053834
make lint
SkyFan2002 Jul 25, 2025
0a01f28
rename CTEConsumer to MaterializeCTERef
SkyFan2002 Jul 25, 2025
4513b15
Update src/query/sql/src/planner/optimizer/optimizers/operator/cte/cl…
SkyFan2002 Jul 25, 2025
5f951f0
Merge
sundy-li Jul 25, 2025
2c6399a
Merge
sundy-li Jul 25, 2025
8ff3944
add channel size config
SkyFan2002 Jul 27, 2025
acc747d
Merge remote-tracking branch 'fb/cte_plan' into rule_grouping_sets_to…
sundy-li Jul 27, 2025
c2eacfa
Merge
sundy-li Jul 27, 2025
a71c994
Merge
sundy-li Jul 27, 2025
e9986a1
Merge branch 'main' into rule_grouping_sets_to_union
sundy-li Jul 27, 2025
da58283
update
sundy-li Jul 27, 2025
16bb077
update
sundy-li Jul 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ impl PipelineBuilder {
self.main_pipeline.extend_sinks(left_sinks);
self.main_pipeline.extend_sinks(right_sinks);

match self.ctx.get_settings().get_enable_parallel_union_all()? {
let enable_parallel_union_all = self.ctx.get_settings().get_enable_parallel_union_all()?
|| self.ctx.get_settings().get_grouping_sets_to_union()?;
match enable_parallel_union_all {
true => self.main_pipeline.resize(outputs, false),
false => self.main_pipeline.sequence_group(sequence_groups, outputs),
}
Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("grouping_sets_to_union", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables grouping sets to union.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("storage_fetch_part_num", DefaultSettingValue {
value: UserSettingValue::UInt64(2),
desc: "Sets the number of partitions that are fetched in parallel from storage during query execution.",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@ impl Settings {
self.try_get_string("group_by_shuffle_mode")
}

/// Reports whether the `grouping_sets_to_union` setting is switched on.
///
/// The setting is stored as an unsigned integer; it counts as enabled only
/// when the stored value is exactly `1`. Any error from reading the setting
/// is propagated to the caller.
pub fn get_grouping_sets_to_union(&self) -> Result<bool> {
    let raw_value = self.try_get_u64("grouping_sets_to_union")?;
    Ok(raw_value == 1)
}

/// Reports whether the `efficiently_memory_group_by` setting is switched on.
///
/// The setting is stored as an unsigned integer; it counts as enabled only
/// when the stored value is exactly `1`. Any error from reading the setting
/// is propagated to the caller.
pub fn get_efficiently_memory_group_by(&self) -> Result<bool> {
    let raw_value = self.try_get_u64("efficiently_memory_group_by")?;
    Ok(raw_value == 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,13 @@ impl PhysicalPlanBuilder {
settings.get_enable_experimental_aggregate_hashtable()?;

if let Some(grouping_sets) = agg.grouping_sets.as_ref() {
assert_eq!(grouping_sets.dup_group_items.len(), group_items.len() - 1); // ignore `_grouping_id`.
// If the aggregation function argument if a group item,
// we cannot use the group item directly.
// It's because the group item will be wrapped with nullable and fill dummy NULLs (in `AggregateExpand` plan),
// which will cause panic while executing aggregation function.
// To avoid the panic, we will duplicate (`Arc::clone`) original group item columns in `AggregateExpand`,
// we should use these columns instead.
// ignore `_grouping_id`.
// If the aggregation function argument is a group item,
// we cannot use the group item directly.
// It's because the group item will be wrapped with nullable and fill dummy NULLs (in `AggregateExpand` plan),
// which will cause panic while executing aggregation function.
// To avoid the panic, we will duplicate (`Arc::clone`) original group item columns in `AggregateExpand`,
// we should use these columns instead.
for func in agg_funcs.iter_mut() {
for arg in func.arg_indices.iter_mut() {
if let Some(pos) = group_items.iter().position(|g| g == arg) {
Expand Down Expand Up @@ -480,7 +480,6 @@ impl PhysicalPlanBuilder {

if let Some(grouping_sets) = agg.grouping_sets.as_ref() {
// The argument types are wrapped nullable due to `AggregateExpand` plan. We should recover them to original types.
assert_eq!(grouping_sets.dup_group_items.len(), group_items.len() - 1); // ignore `_grouping_id`.
for func in agg_funcs.iter_mut() {
for (arg, ty) in func.arg_indices.iter_mut().zip(func.sig.args.iter_mut()) {
if let Some(pos) = group_items.iter().position(|g| g == arg) {
Expand Down
19 changes: 11 additions & 8 deletions src/query/sql/src/planner/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ impl Binder {
);
dup_group_items.push((dummy.index, *dummy.data_type));
}

// Add a virtual column `_grouping_id` to group items.
let grouping_id_column = self.create_derived_column_binding(
"_grouping_id".to_string(),
Expand All @@ -783,14 +784,16 @@ impl Binder {
column: grouping_id_column.clone(),
};

agg_info.group_items_map.insert(
bound_grouping_id_col.clone().into(),
agg_info.group_items.len(),
);
agg_info.group_items.push(ScalarItem {
index: grouping_id_column.index,
scalar: bound_grouping_id_col.into(),
});
if !self.ctx.get_settings().get_grouping_sets_to_union()? {
agg_info.group_items_map.insert(
bound_grouping_id_col.clone().into(),
agg_info.group_items.len(),
);
agg_info.group_items.push(ScalarItem {
index: grouping_id_column.index,
scalar: bound_grouping_id_col.into(),
});
}

let grouping_sets_info = GroupingSetsInfo {
grouping_id_column,
Expand Down
8 changes: 8 additions & 0 deletions src/query/sql/src/planner/optimizer/optimizer_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_catalog::table_context::TableContext;
use educe::Educe;
use parking_lot::RwLock;

use crate::optimizer::optimizers::rule::RuleID;
use crate::planner::QueryExecutor;
use crate::MetadataRef;

Expand Down Expand Up @@ -152,6 +153,13 @@ impl OptimizerContext {
/// Check if an optimizer or rule is disabled based on optimizer_skip_list setting
pub fn is_optimizer_disabled(self: &Arc<Self>, name: &str) -> bool {
let settings = self.get_table_ctx().get_settings();

if !settings.get_grouping_sets_to_union().unwrap_or_default()
&& name == RuleID::GroupingSetsToUnion.to_string()
{
return true;
}

match settings.get_optimizer_skip_list() {
Ok(skip_list) if !skip_list.is_empty() => {
let name_lower = name.to_lowercase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
mod agg_index;
mod rule_eager_aggregation;
mod rule_fold_count_aggregate;
mod rule_grouping_sets_to_union;
mod rule_push_down_filter_aggregate;
mod rule_push_down_limit_aggregate;
mod rule_split_aggregate;
mod rule_try_apply_agg_index;

pub use rule_eager_aggregation::RuleEagerAggregation;
pub use rule_fold_count_aggregate::RuleFoldCountAggregate;
pub use rule_grouping_sets_to_union::RuleGroupingSetsToUnion;
pub use rule_push_down_filter_aggregate::RulePushDownFilterAggregate;
pub use rule_push_down_limit_aggregate::RulePushDownRankLimitAggregate;
pub use rule_split_aggregate::RuleSplitAggregate;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::types::NumberScalar;
use databend_common_expression::Scalar;

use crate::optimizer::ir::Matcher;
use crate::optimizer::ir::RelExpr;
use crate::optimizer::ir::SExpr;
use crate::optimizer::optimizers::rule::Rule;
use crate::optimizer::optimizers::rule::RuleID;
use crate::optimizer::optimizers::rule::TransformResult;
use crate::plans::walk_expr_mut;
use crate::plans::Aggregate;
use crate::plans::AggregateMode;
use crate::plans::CastExpr;
use crate::plans::ConstantExpr;
use crate::plans::EvalScalar;
use crate::plans::MaterializeCTERef;
use crate::plans::MaterializedCTE;
use crate::plans::RelOp;
use crate::plans::Sequence;
use crate::plans::UnionAll;
use crate::plans::VisitorMut;
use crate::IndexType;
use crate::ScalarExpr;

// TODO
const ID: RuleID = RuleID::GroupingSetsToUnion;
/// Splits a `GROUPING SETS` aggregation into a `UNION ALL` of plain `GROUP BY`
/// aggregations, one branch per grouping set.
///
/// Example:
///
/// ```sql
/// select number % 10 AS a, number % 3 AS b, number % 4 AS c
/// from numbers(100000000)
/// group by grouping sets((a,b),(a,c));
/// ```
///
/// is rewritten into a plan shaped like:
///
/// ```sql
/// select number % 10 AS a, number % 3 AS b, number % 4 AS c
/// from numbers(100000000)
/// group by a,b
/// union all
/// select number % 10 AS a, number % 3 AS b, number % 4 AS c
/// from numbers(100000000)
/// group by a,c
/// ```
///
/// In each branch, group columns that are not part of that branch's grouping
/// set are replaced by typed `NULL` constants (`c` in the first branch, `b` in
/// the second), and the aggregate input is shared between the branches through
/// a materialized CTE instead of being planned once per branch.
pub struct RuleGroupingSetsToUnion {
    /// Identifier returned by `Rule::id`; `new` sets it to `RuleID::GroupingSetsToUnion`.
    id: RuleID,
    /// Plan patterns returned by `Rule::matchers`.
    matchers: Vec<Matcher>,
}

impl RuleGroupingSetsToUnion {
    /// Builds the rule together with the single plan pattern it matches.
    pub fn new() -> Self {
        // Pattern matched by this rule:
        //
        // EvalScalar
        //  \
        //   Aggregate
        //    \
        //     *
        let aggregate_over_any = Matcher::MatchOp {
            op_type: RelOp::Aggregate,
            children: vec![Matcher::Leaf],
        };
        let eval_scalar_over_aggregate = Matcher::MatchOp {
            op_type: RelOp::EvalScalar,
            children: vec![aggregate_over_any],
        };

        Self {
            id: ID,
            matchers: vec![eval_scalar_over_aggregate],
        }
    }
}

// Must go before `RuleSplitAggregate`
impl Rule for RuleGroupingSetsToUnion {
    /// Returns the identifier of this rule.
    fn id(&self) -> RuleID {
        self.id
    }

    /// Rewrites `EvalScalar -> Aggregate(grouping sets) -> input` into
    ///
    /// ```text
    /// Sequence
    /// ├── MaterializedCTE(input)
    /// └── UnionAll(... UnionAll(branch_0, branch_1) ..., branch_n)
    /// ```
    ///
    /// where every `branch_i` is `EvalScalar -> Aggregate -> MaterializeCTERef`
    /// and its aggregate groups only by the columns of the i-th grouping set.
    ///
    /// Nothing is added to `state` when the aggregate is not in
    /// `AggregateMode::Initial`, has no grouping sets, or has an empty list of
    /// grouping sets.
    ///
    /// # Panics
    ///
    /// Panics if a grouping set references a column that is not one of the
    /// aggregate's group items.
    fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
        let eval_scalar: EvalScalar = s_expr.plan().clone().try_into()?;
        let agg: Aggregate = s_expr.child(0)?.plan().clone().try_into()?;
        if agg.mode != AggregateMode::Initial {
            return Ok(());
        }

        // Bail out before doing any real work when there is nothing to split.
        let grouping_sets = match &agg.grouping_sets {
            Some(grouping_sets) if !grouping_sets.sets.is_empty() => grouping_sets,
            _ => return Ok(()),
        };

        let agg_input = s_expr.child(0)?.child(0)?;
        let agg_input_columns: Vec<IndexType> = RelExpr::with_s_expr(agg_input)
            .derive_relational_prop()?
            .output_columns
            .iter()
            .cloned()
            .collect();

        // The CTE name is derived only from the grouping sets definition.
        // NOTE(review): two aggregates with identical grouping sets in one query
        // would get the same name — confirm whether CTE names must be unique.
        let mut hasher = DefaultHasher::new();
        agg.grouping_sets.hash(&mut hasher);
        let hash = hasher.finish();
        let temp_cte_name = format!("cte_groupingsets_{hash}");

        // The aggregate input is produced once and read by every branch.
        let cte_materialized_sexpr = SExpr::create_unary(
            MaterializedCTE::new(temp_cte_name.clone(), None, Some(1)),
            agg_input.clone(),
        );
        let cte_consumer = SExpr::create_leaf(MaterializeCTERef {
            cte_name: temp_cte_name,
            output_columns: agg_input_columns,
            def: agg_input.clone(),
        });

        // Column indexes of the group items, in group-item order. The position
        // of a column in this list is its bit position in the grouping id.
        let group_item_indexes: Vec<IndexType> =
            agg.group_items.iter().map(|item| item.index).collect();
        let mask: u32 = (1 << grouping_sets.dup_group_items.len()) - 1;

        let mut branches = Vec::with_capacity(grouping_sets.sets.len());
        for set in &grouping_sets.sets {
            // Bit `i` of the grouping id is 1 when the i-th group item is NOT
            // part of the current grouping set (i.e. it is NULL in this branch).
            // Example: group items [a, b]
            //   set (a, b) -> 0b00
            //   set (a)    -> 0b10
            //   set (b)    -> 0b01
            //   set ()     -> 0b11
            let mut present_bits: u32 = 0;
            for g in set {
                // Grouping sets store column indexes, so the lookup has to be
                // done against the group items' column indexes as well.
                let bit = group_item_indexes
                    .iter()
                    .position(|index| *index == *g)
                    .expect("grouping set column must be one of the aggregate's group items");
                present_bits |= 1 << bit;
            }
            let grouping_id = !present_bits & mask;

            // Split the group columns into the ones kept by this branch and the
            // ones that have to be emitted as NULL.
            let (group_ids, null_group_ids): (Vec<IndexType>, Vec<IndexType>) = group_item_indexes
                .iter()
                .copied()
                .partition(|index| set.contains(index));

            let mut branch_agg = agg.clone();
            branch_agg.grouping_sets = None;
            branch_agg
                .group_items
                .retain(|item| set.contains(&item.index));

            let mut visitor = ReplaceColumnForGroupingSetsVisitor {
                group_indexes: group_ids,
                exclude_group_indexes: null_group_ids,
                grouping_id_index: grouping_sets.grouping_id_index,
                grouping_id_value: grouping_id,
            };

            let mut branch_eval_scalar = eval_scalar.clone();
            for scalar in branch_eval_scalar.items.iter_mut() {
                visitor.visit(&mut scalar.scalar)?;
            }

            let agg_plan = SExpr::create_unary(branch_agg, cte_consumer.clone());
            let eval_plan = SExpr::create_unary(branch_eval_scalar, agg_plan);
            branches.push(eval_plan);
        }

        // Every union node exposes the same columns as the original EvalScalar.
        let output_indexes: Vec<IndexType> =
            eval_scalar.items.iter().map(|item| item.index).collect();
        let union_outputs: Vec<(IndexType, Option<ScalarExpr>)> =
            output_indexes.iter().map(|index| (*index, None)).collect();

        // Fold the branches into a left-deep chain of UnionAll nodes.
        let mut remaining = branches.into_iter();
        let mut result = remaining
            .next()
            .expect("grouping sets were checked to be non-empty");
        for other in remaining {
            let union_plan = UnionAll {
                left_outputs: union_outputs.clone(),
                right_outputs: union_outputs.clone(),
                cte_scan_names: vec![],
                output_indexes: output_indexes.clone(),
            };
            result = SExpr::create_binary(Arc::new(union_plan.into()), result, other);
        }

        result = SExpr::create_binary(Sequence, cte_materialized_sexpr, result);
        state.add_result(result);
        Ok(())
    }

    /// Returns the plan patterns this rule is tried against.
    fn matchers(&self) -> &[Matcher] {
        &self.matchers
    }
}

impl Default for RuleGroupingSetsToUnion {
fn default() -> Self {
Self::new()
}
}

/// Expression rewriter used for one branch of the grouping-sets union.
///
/// It replaces column references depending on which of the lists below the
/// referenced column index belongs to; all other column references are left
/// untouched.
struct ReplaceColumnForGroupingSetsVisitor {
    /// Column indexes grouped by in this branch; references to them are
    /// wrapped in a try-cast to the nullable version of their type.
    group_indexes: Vec<IndexType>,
    /// Group column indexes that are not part of this branch's grouping set;
    /// references to them become typed `NULL` constants.
    exclude_group_indexes: Vec<IndexType>,
    /// Column index of the grouping id column; references to it become the
    /// constant `grouping_id_value`.
    grouping_id_index: IndexType,
    /// Grouping id constant substituted for `grouping_id_index` in this branch.
    grouping_id_value: u32,
}

impl VisitorMut<'_> for ReplaceColumnForGroupingSetsVisitor {
    /// Rewrites column references inside `expr` for one grouping-set branch.
    ///
    /// * A column in `group_indexes` is wrapped in a try-cast to its nullable type.
    /// * A column in `exclude_group_indexes` becomes a `NULL` constant typed as
    ///   the nullable version of the column type.
    /// * The grouping id column becomes a `UInt32` constant holding
    ///   `grouping_id_value`.
    /// * Any other column is left as is.
    ///
    /// Non-column expressions are not changed themselves; their children are
    /// visited through `walk_expr_mut`.
    fn visit(&mut self, expr: &mut ScalarExpr) -> Result<()> {
        // Only column references are rewritten. Recurse right away for anything
        // else so that large sub-expressions are never cloned.
        if !matches!(&*expr, ScalarExpr::BoundColumnRef(_)) {
            return walk_expr_mut(self, expr);
        }

        // Cloning here copies a single column reference only.
        let old = expr.clone();

        if let ScalarExpr::BoundColumnRef(col) = expr {
            if self.group_indexes.contains(&col.column.index) {
                *expr = ScalarExpr::CastExpr(CastExpr {
                    argument: Box::new(old),
                    is_try: true,
                    target_type: Box::new(col.column.data_type.wrap_nullable()),
                    span: col.span,
                });
            } else if self.exclude_group_indexes.contains(&col.column.index) {
                *expr = ScalarExpr::TypedConstantExpr(
                    ConstantExpr {
                        value: Scalar::Null,
                        span: col.span,
                    },
                    col.column.data_type.wrap_nullable(),
                );
            } else if self.grouping_id_index == col.column.index {
                *expr = ScalarExpr::ConstantExpr(ConstantExpr {
                    value: Scalar::Number(NumberScalar::UInt32(self.grouping_id_value)),
                    span: col.span,
                });
            }
        }
        Ok(())
    }
}
Loading
Loading