Commit b7a992b

Author: Альберт Скальт (committed)
push down filter: extend a projection if some pushed filters become unsupported
Consider the following scenario:

1. `supports_filters_pushdown` returns `Exact` for some filter, e.g. "a = 1", where column "a" is not required by the query projection.
2. "a" is removed from the table provider projection by the "optimize projections" rule.
3. On the next pass, `supports_filters_pushdown` changes its decision and returns `Inexact` for this filter, for example because the set of input filters changed and the provider now prefers a different one (see the sketch below).
4. "a" is never added back to the table provider projection, which leaves a filter referencing a column that is not part of the schema.

This patch fixes the issue by introducing the following logic in the filter push-down rule:

1. Collect the columns that are not part of the current table provider projection but are required by the filter expressions; call this set `additional_projection`.
2. If `additional_projection` is empty, leave everything as it was before.
3. Otherwise, extend the table provider projection and wrap the plan in an additional projection node to preserve the schema used prior to this rule.
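The sketch below is not part of this commit; it only illustrates the kind of provider behaviour step 3 refers to: a `supports_filters_pushdown` answer that depends on the whole set of offered filters, so a predicate reported as `Exact` on one optimizer pass can be downgraded to `Inexact` on the next. The function name and the "prefer the last offered filter" heuristic are hypothetical.

    use datafusion_common::Result;
    use datafusion_expr::{Expr, TableProviderFilterPushDown};

    // Hypothetical stand-in for a provider's `supports_filters_pushdown`
    // decision: only the last offered filter is pushed down exactly,
    // everything else is reported as Inexact.
    fn supports_filters_pushdown_sketch(
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        let preferred = filters.len().saturating_sub(1);
        Ok(filters
            .iter()
            .enumerate()
            .map(|(i, _)| {
                if i == preferred {
                    TableProviderFilterPushDown::Exact
                } else {
                    TableProviderFilterPushDown::Inexact
                }
            })
            .collect())
    }

With such a provider, a first pass that offers only "a = 1" gets `Exact` back, so both the `Filter` node and column "a" are dropped; a later pass that offers an additional filter gets `Inexact` for "a = 1", and the scan projection then has to be extended again.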
1 parent 6162c56 commit b7a992b

2 files changed (+129, −24 lines)

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 125 additions & 19 deletions
@@ -28,15 +28,13 @@ use datafusion_common::{
     JoinConstraint, Result,
 };
 use datafusion_expr::expr_rewriter::replace_col;
-use datafusion_expr::logical_plan::{
-    CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union,
-};
+use datafusion_expr::logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, Union};
 use datafusion_expr::utils::{
     conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
 };
 use datafusion_expr::{
     and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator,
-    Projection, TableProviderFilterPushDown,
+    Projection, TableProviderFilterPushDown, TableScan,
 };

 use crate::optimizer::ApplyOrder;
@@ -897,23 +895,103 @@ impl OptimizerRule for PushDownFilter {
                     .map(|(pred, _)| pred);
                 let new_scan_filters: Vec<Expr> =
                     new_scan_filters.unique().cloned().collect();
+
+                let source_schema = scan.source.schema();
+                let mut additional_projection = HashSet::new();
                 let new_predicate: Vec<Expr> = zip
-                    .filter(|(_, res)| res != &TableProviderFilterPushDown::Exact)
+                    .filter(|(expr, res)| {
+                        if *res == TableProviderFilterPushDown::Exact {
+                            return false;
+                        }
+                        expr.apply(|expr| {
+                            if let Expr::Column(column) = expr {
+                                if let Ok(idx) = source_schema.index_of(column.name()) {
+                                    if scan
+                                        .projection
+                                        .as_ref()
+                                        .is_some_and(|p| !p.contains(&idx))
+                                    {
+                                        additional_projection.insert(idx);
+                                    }
+                                }
+                            }
+                            Ok(TreeNodeRecursion::Continue)
+                        })
+                        .unwrap();
+                        true
+                    })
                     .map(|(pred, _)| pred.clone())
                     .collect();

-                let new_scan = LogicalPlan::TableScan(TableScan {
-                    filters: new_scan_filters,
-                    ..scan
-                });
-
-                Transformed::yes(new_scan).transform_data(|new_scan| {
-                    if let Some(predicate) = conjunction(new_predicate) {
-                        make_filter(predicate, Arc::new(new_scan)).map(Transformed::yes)
+                // Wraps with a filter if some filters are not supported exactly.
+                let filtered = move |plan| {
+                    if let Some(new_predicate) = conjunction(new_predicate) {
+                        Filter::try_new(new_predicate, Arc::new(plan))
+                            .map(LogicalPlan::Filter)
                     } else {
-                        Ok(Transformed::no(new_scan))
+                        Ok(plan)
                     }
-                })
+                };
+
+                if additional_projection.is_empty() {
+                    // No additional projection is required.
+                    let new_scan = LogicalPlan::TableScan(TableScan {
+                        filters: new_scan_filters,
+                        ..scan
+                    });
+                    return filtered(new_scan).map(Transformed::yes);
+                }
+
+                let scan_table_name = &scan.table_name;
+                let new_scan = filtered(
+                    LogicalPlanBuilder::scan_with_filters_fetch(
+                        scan_table_name.clone(),
+                        Arc::clone(&scan.source),
+                        scan.projection.clone().map(|mut projection| {
+                            // Extend a projection.
+                            projection.extend(additional_projection);
+                            projection
+                        }),
+                        new_scan_filters,
+                        scan.fetch,
+                    )?
+                    .build()?,
+                )?;
+
+                // Project fields required by the initial projection.
+                let new_plan = LogicalPlan::Projection(Projection::try_new_with_schema(
+                    scan.projection
+                        .as_ref()
+                        .map(|projection| {
+                            projection
+                                .into_iter()
+                                .cloned()
+                                .map(|idx| {
+                                    Expr::Column(Column::new(
+                                        Some(scan_table_name.clone()),
+                                        source_schema.field(idx).name(),
+                                    ))
+                                })
+                                .collect()
+                        })
+                        .unwrap_or_else(|| {
+                            source_schema
+                                .fields()
+                                .iter()
+                                .map(|field| {
+                                    Expr::Column(Column::new(
+                                        Some(scan_table_name.clone()),
+                                        field.name(),
+                                    ))
+                                })
+                                .collect()
+                        }),
+                    Arc::new(new_scan),
+                    // Preserve a projected schema.
+                    scan.projected_schema,
+                )?);
+
+                Ok(Transformed::yes(new_plan))
             }
             LogicalPlan::Extension(extension_plan) => {
                 let prevent_cols =
@@ -1206,8 +1284,8 @@ mod tests {
     use datafusion_expr::logical_plan::table_scan;
     use datafusion_expr::{
         col, in_list, in_subquery, lit, ColumnarValue, Extension, ScalarUDF,
-        ScalarUDFImpl, Signature, TableSource, TableType, UserDefinedLogicalNodeCore,
-        Volatility,
+        ScalarUDFImpl, Signature, TableScan, TableSource, TableType,
+        UserDefinedLogicalNodeCore, Volatility,
     };

     use crate::optimizer::Optimizer;
@@ -2452,6 +2530,34 @@ mod tests {
             .build()
     }

+    #[test]
+    fn projection_is_updated_when_filter_becomes_unsupported() -> Result<()> {
+        let test_provider = PushDownProvider {
+            filter_support: TableProviderFilterPushDown::Unsupported,
+        };
+
+        let projected_schema = test_provider.schema().project(&[0])?;
+        let table_scan = LogicalPlan::TableScan(TableScan {
+            table_name: "test".into(),
+            // Emulate that filters were pushed down earlier but the
+            // provider can no longer support them.
+            filters: vec![col("b").eq(lit(1i64))],
+            projected_schema: Arc::new(DFSchema::try_from(projected_schema)?),
+            projection: Some(vec![0]),
+            source: Arc::new(test_provider),
+            fetch: None,
+        });
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(col("a").eq(lit(1i64)))?
+            .build()?;
+
+        let expected = "Projection: test.a\
+        \n  Filter: a = Int64(1) AND b = Int64(1)\
+        \n    TableScan: test projection=[a, b]";
+        assert_optimized_plan_eq(plan, expected)
+    }
+
     #[test]
     fn filter_with_table_provider_exact() -> Result<()> {
         let plan = table_scan_with_pushdown_provider(TableProviderFilterPushDown::Exact)?;
@@ -2514,7 +2620,7 @@
             projected_schema: Arc::new(DFSchema::try_from(
                 (*test_provider.schema()).clone(),
             )?),
-            projection: Some(vec![0]),
+            projection: Some(vec![0, 1]),
             source: Arc::new(test_provider),
             fetch: None,
         });
@@ -2526,7 +2632,7 @@

         let expected = "Projection: a, b\
         \n  Filter: a = Int64(10) AND b > Int64(11)\
-        \n    TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]";
+        \n    TableScan: test projection=[a, b], partial_filters=[a = Int64(10), b > Int64(11)]";

         assert_optimized_plan_eq(plan, expected)
     }

datafusion/sql/src/parser.rs

Lines changed: 4 additions & 5 deletions
@@ -279,15 +279,14 @@ impl<'a> DFParser<'a> {
         sql: &str,
         dialect: &'a dyn Dialect,
     ) -> Result<Self, ParserError> {
-        let tokens = Tokenizer::new(dialect, sql).into_tokens().collect::<Result<_, _>>()?;
+        let tokens = Tokenizer::new(dialect, sql)
+            .into_tokens()
+            .collect::<Result<_, _>>()?;
         Ok(Self::from_dialect_and_tokens(dialect, tokens))
     }

     /// Create a new parser from specified dialect and tokens.
-    pub fn from_dialect_and_tokens(
-        dialect: &'a dyn Dialect,
-        tokens: Vec<Token>,
-    ) -> Self {
+    pub fn from_dialect_and_tokens(dialect: &'a dyn Dialect, tokens: Vec<Token>) -> Self {
         let parser = Parser::new(dialect).with_tokens(tokens);
         DFParser { parser }
     }
