Skip to content

Commit 14beb79

Browse files
Move DataSink to datasource and add session crate (apache#15371)
* session * clippy * fmt * session * minor * Update README.md * doc * fmt * doc --------- Co-authored-by: berkaysynnada <[email protected]>
1 parent 923bfb7 commit 14beb79

File tree

38 files changed

+299
-121
lines changed

38 files changed

+299
-121
lines changed

Cargo.lock

Lines changed: 31 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ members = [
4747
"datafusion/proto/gen",
4848
"datafusion/proto-common",
4949
"datafusion/proto-common/gen",
50+
"datafusion/session",
5051
"datafusion/sql",
5152
"datafusion/sqllogictest",
5253
"datafusion/substrait",
@@ -136,6 +137,7 @@ datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", versio
136137
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "46.0.1" }
137138
datafusion-proto = { path = "datafusion/proto", version = "46.0.1" }
138139
datafusion-proto-common = { path = "datafusion/proto-common", version = "46.0.1" }
140+
datafusion-session = { path = "datafusion/session", version = "46.0.1" }
139141
datafusion-sql = { path = "datafusion/sql", version = "46.0.1" }
140142
doc-comment = "0.3"
141143
env_logger = "0.11"

datafusion/catalog-listing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ datafusion-expr = { workspace = true }
4141
datafusion-physical-expr = { workspace = true }
4242
datafusion-physical-expr-common = { workspace = true }
4343
datafusion-physical-plan = { workspace = true }
44+
datafusion-session = { workspace = true }
4445
futures = { workspace = true }
4546
log = { workspace = true }
4647
object_store = { workspace = true }

datafusion/catalog/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ async-trait = { workspace = true }
3636
dashmap = { workspace = true }
3737
datafusion-common = { workspace = true }
3838
datafusion-common-runtime = { workspace = true }
39+
datafusion-datasource = { workspace = true }
3940
datafusion-execution = { workspace = true }
4041
datafusion-expr = { workspace = true }
4142
datafusion-physical-expr = { workspace = true }
4243
datafusion-physical-plan = { workspace = true }
44+
datafusion-session = { workspace = true }
4345
datafusion-sql = { workspace = true }
4446
futures = { workspace = true }
4547
itertools = { workspace = true }

datafusion/catalog/src/lib.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,32 @@
3131
//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]
3232
//! * Listing schema: [`listing_schema`]
3333
34+
pub mod cte_worktable;
35+
pub mod default_table_source;
36+
pub mod information_schema;
37+
pub mod listing_schema;
3438
pub mod memory;
35-
pub use memory::{
36-
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
37-
};
39+
pub mod stream;
40+
pub mod streaming;
41+
pub mod view;
42+
3843
mod r#async;
3944
mod catalog;
4045
mod dynamic_file;
41-
pub mod information_schema;
42-
pub mod listing_schema;
4346
mod schema;
44-
mod session;
4547
mod table;
48+
4649
pub use catalog::*;
50+
pub use datafusion_session::Session;
4751
pub use dynamic_file::catalog::*;
52+
pub use memory::{
53+
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
54+
};
4855
pub use r#async::*;
4956
pub use schema::*;
50-
pub use session::*;
5157
pub use table::*;
52-
pub mod cte_worktable;
53-
pub mod default_table_source;
54-
pub mod stream;
55-
pub mod streaming;
56-
pub mod view;
58+
59+
// For backwards compatibility,
60+
mod session {
61+
pub use datafusion_session::Session;
62+
}

datafusion/catalog/src/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
3030
use arrow::datatypes::SchemaRef;
3131
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
3232
use datafusion_common_runtime::SpawnedTask;
33+
use datafusion_datasource::sink::{DataSink, DataSinkExec};
3334
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3435
use datafusion_expr::dml::InsertOp;
3536
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
3637
use datafusion_physical_expr::create_ordering;
37-
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
3838
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
3939
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
4040
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};

datafusion/catalog/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ pub trait TableProvider: Debug + Sync + Send {
288288
/// See [`DataSinkExec`] for the common pattern of inserting a
289289
/// streams of `RecordBatch`es as files to an ObjectStore.
290290
///
291-
/// [`DataSinkExec`]: datafusion_physical_plan::insert::DataSinkExec
291+
/// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
292292
async fn insert_into(
293293
&self,
294294
_state: &dyn Session,

datafusion/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ datafusion-physical-expr = { workspace = true }
123123
datafusion-physical-expr-common = { workspace = true }
124124
datafusion-physical-optimizer = { workspace = true }
125125
datafusion-physical-plan = { workspace = true }
126+
datafusion-session = { workspace = true }
126127
datafusion-sql = { workspace = true }
127128
flate2 = { version = "1.1.0", optional = true }
128129
futures = { workspace = true }

datafusion/core/src/datasource/dynamic_file.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,17 @@
2020
2121
use std::sync::Arc;
2222

23-
use async_trait::async_trait;
24-
use datafusion_catalog::{SessionStore, UrlTableFactory};
25-
use datafusion_common::plan_datafusion_err;
26-
2723
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
2824
use crate::datasource::TableProvider;
2925
use crate::error::Result;
3026
use crate::execution::context::SessionState;
3127

28+
use datafusion_catalog::UrlTableFactory;
29+
use datafusion_common::plan_datafusion_err;
30+
use datafusion_session::SessionStore;
31+
32+
use async_trait::async_trait;
33+
3234
/// [DynamicListTableFactory] is a factory that can create a [ListingTable] from the given url.
3335
#[derive(Default, Debug)]
3436
pub struct DynamicListTableFactory {

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ use datafusion_common_runtime::{JoinSet, SpawnedTask};
5050
use datafusion_datasource::display::FileGroupDisplay;
5151
use datafusion_datasource::file::FileSource;
5252
use datafusion_datasource::file_scan_config::FileScanConfig;
53+
use datafusion_datasource::sink::{DataSink, DataSinkExec};
5354
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5455
use datafusion_expr::dml::InsertOp;
5556
use datafusion_physical_expr::PhysicalExpr;
5657
use datafusion_physical_expr_common::sort_expr::LexRequirement;
57-
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
5858

5959
use async_trait::async_trait;
6060
use bytes::Bytes;

0 commit comments

Comments
 (0)