Skip to content

Commit 005469a

Browse files
refactor: Integrate the materialized CTE into the plan and pipeline (#18226)
* add MaterializedCTE plan * build pipeline * build pipeline * add operator * remove m cte temp table * bind * fix * remove unused field * fix bind * fix schema * fix * make lint * fix * fix join * fix * refine explain * fix * fix * fix * fix * fix * fix * fix * fix * CleanupUnusedCTE * fix * fix * fix * fix * refine * refine * make lint * fix * add log * fix * fix * make lint * fix * fix * fix * fix * fix * disable distributed optimization * fix merge * fix explain join * fix logic test * fix logic test * add ref count * refactor: streaming CTE consumption * refactor plan * fix * fix * enable distributed * fix logic test * fix serial cte * fix test * fix fragment type * fix replace range join * fix explain join order * fix logic test * simplify * ref_count calculation is not required when constructing MaterializedCTE * simplify MaterializedCTE * simplify CTEConsumer * Update src/query/service/src/pipelines/builders/builder_sequence.rs Co-authored-by: Winter Zhang <[email protected]> * Update src/query/service/src/pipelines/builders/builder_materialized_cte.rs Co-authored-by: Winter Zhang <[email protected]> * make lint * rename CTEConsumer to MaterializeCTERef * Update src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs Co-authored-by: Winter Zhang <[email protected]> * add channel size config --------- Co-authored-by: Winter Zhang <[email protected]>
1 parent a4e72e5 commit 005469a

File tree

65 files changed

+1747
-476
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1747
-476
lines changed

โ€Žsrc/query/catalog/src/table_context.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,10 +406,6 @@ pub trait TableContext: Send + Sync {
406406
fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool;
407407
fn get_shared_settings(&self) -> Arc<Settings>;
408408

409-
fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);
410-
411-
async fn drop_m_cte_temp_table(&self) -> Result<()>;
412-
413409
fn add_streams_ref(&self, _catalog: &str, _database: &str, _stream: &str, _consume: bool) {
414410
unimplemented!()
415411
}

โ€Žsrc/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use log::info;
3333

3434
use crate::interpreters::common::metrics_inc_compact_hook_compact_time_ms;
3535
use crate::interpreters::common::metrics_inc_compact_hook_main_operation_time_ms;
36-
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
3736
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
3837
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
3938
use crate::interpreters::Interpreter;
@@ -188,7 +187,6 @@ async fn compact_table(
188187
let query_ctx = ctx.clone();
189188
build_res.main_pipeline.set_on_finished(always_callback(
190189
move |_info: &ExecutionInfo| {
191-
hook_clear_m_cte_temp_table(&query_ctx)?;
192190
hook_vacuum_temp_files(&query_ctx)?;
193191
hook_disk_temp_dir(&query_ctx)?;
194192
Ok(())

โ€Žsrc/query/service/src/interpreters/hook/refresh_hook.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use databend_storages_common_table_meta::meta::Location;
3838
use log::info;
3939
use parking_lot::RwLock;
4040

41-
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
4241
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
4342
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
4443
use crate::interpreters::Interpreter;
@@ -127,7 +126,6 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
127126
let query_ctx = ctx_cloned.clone();
128127
build_res.main_pipeline.set_on_finished(always_callback(
129128
move |_: &ExecutionInfo| {
130-
hook_clear_m_cte_temp_table(&query_ctx)?;
131129
hook_vacuum_temp_files(&query_ctx)?;
132130
hook_disk_temp_dir(&query_ctx)?;
133131
Ok(())
@@ -164,7 +162,6 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
164162
let query_ctx = ctx_cloned.clone();
165163
build_res.main_pipeline.set_on_finished(always_callback(
166164
move |_info: &ExecutionInfo| {
167-
hook_clear_m_cte_temp_table(&query_ctx)?;
168165
hook_vacuum_temp_files(&query_ctx)?;
169166
hook_disk_temp_dir(&query_ctx)?;
170167
Ok(())

โ€Žsrc/query/service/src/interpreters/hook/vacuum_hook.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,3 @@ pub fn hook_disk_temp_dir(query_ctx: &Arc<QueryContext>) -> Result<()> {
105105

106106
Ok(())
107107
}
108-
109-
pub fn hook_clear_m_cte_temp_table(query_ctx: &Arc<QueryContext>) -> Result<()> {
110-
let _ = GlobalIORuntime::instance().block_on(async move {
111-
query_ctx.drop_m_cte_temp_table().await?;
112-
Ok(())
113-
});
114-
Ok(())
115-
}

โ€Žsrc/query/service/src/interpreters/interpreter.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ use log::info;
4949
use md5::Digest;
5050
use md5::Md5;
5151

52-
use super::hook::vacuum_hook::hook_clear_m_cte_temp_table;
5352
use super::hook::vacuum_hook::hook_disk_temp_dir;
5453
use super::hook::vacuum_hook::hook_vacuum_temp_files;
5554
use super::InterpreterMetrics;
@@ -363,7 +362,6 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc<QueryContext>)
363362
);
364363
}
365364

366-
hook_clear_m_cte_temp_table(&query_ctx)?;
367365
hook_vacuum_temp_files(&query_ctx)?;
368366
hook_disk_temp_dir(&query_ctx)?;
369367

โ€Žsrc/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ use crate::interpreters::common::table_option_validation::is_valid_data_retentio
7474
use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
7575
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
7676
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
77-
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
7877
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
7978
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
8079
use crate::interpreters::InsertInterpreter;
@@ -283,7 +282,6 @@ impl CreateTableInterpreter {
283282
pipeline
284283
.main_pipeline
285284
.set_on_finished(always_callback(move |_: &ExecutionInfo| {
286-
hook_clear_m_cte_temp_table(&query_ctx)?;
287285
hook_vacuum_temp_files(&query_ctx)?;
288286
hook_disk_temp_dir(&query_ctx)?;
289287
Ok(())

โ€Žsrc/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ use derive_visitor::DriveMut;
6868
use log::error;
6969
use log::warn;
7070

71-
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
7271
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
7372
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
7473
use crate::interpreters::interpreter_insert_multi_table::scalar_expr_to_remote_expr;
@@ -247,7 +246,6 @@ impl ReclusterTableInterpreter {
247246
ctx.evict_table_from_cache(&catalog, &database, &table)?;
248247

249248
ctx.unload_spill_meta();
250-
hook_clear_m_cte_temp_table(&ctx)?;
251249
hook_vacuum_temp_files(&ctx)?;
252250
hook_disk_temp_dir(&ctx)?;
253251
match &info.res {

โ€Žsrc/query/service/src/interpreters/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ mod util;
175175

176176
pub use access::ManagementModeAccess;
177177
pub use common::InterpreterQueryLog;
178-
pub use hook::vacuum_hook::hook_clear_m_cte_temp_table;
179178
pub use hook::HookOperator;
180179
pub use interpreter::interpreter_plan_sql;
181180
pub use interpreter::Interpreter;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
use databend_common_sql::executor::physical_plans::MaterializeCTERef;
17+
use databend_common_storages_fuse::TableContext;
18+
19+
use crate::pipelines::processors::transforms::CTESource;
20+
use crate::pipelines::PipelineBuilder;
21+
22+
impl PipelineBuilder {
23+
pub(crate) fn build_cte_consumer(&mut self, cte: &MaterializeCTERef) -> Result<()> {
24+
let receiver = self.ctx.get_materialized_cte_receiver(&cte.cte_name);
25+
self.main_pipeline.add_source(
26+
|output_port| {
27+
CTESource::create(self.ctx.clone(), output_port.clone(), receiver.clone())
28+
},
29+
self.ctx.get_settings().get_max_threads()? as usize,
30+
)?;
31+
Ok(())
32+
}
33+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
use databend_common_sql::executor::physical_plans::MaterializedCTE;
17+
18+
use crate::pipelines::processors::transforms::MaterializedCteSink;
19+
use crate::pipelines::PipelineBuilder;
20+
impl PipelineBuilder {
21+
pub(crate) fn build_materialized_cte(&mut self, cte: &MaterializedCTE) -> Result<()> {
22+
self.build_pipeline(&cte.input)?;
23+
let input_schema = cte.input.output_schema()?;
24+
if let Some(output_columns) = &cte.cte_output_columns {
25+
Self::build_result_projection(
26+
&self.func_ctx,
27+
input_schema,
28+
output_columns,
29+
&mut self.main_pipeline,
30+
false,
31+
)?;
32+
}
33+
self.main_pipeline.try_resize(1)?;
34+
let tx =
35+
self.ctx
36+
.get_materialized_cte_senders(&cte.cte_name, cte.ref_count, cte.channel_size);
37+
self.main_pipeline
38+
.add_sink(|input| MaterializedCteSink::create(input, tx.clone()))
39+
}
40+
}

0 commit comments

Comments
ย (0)