This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
     new 92a3e45  Consolidate `batch_size` configuration in `ExecutionConfig`, `RuntimeConfig` and `PhysicalPlanConfig` (#1562)
92a3e45 is described below

commit 92a3e454baf8bffc3afd4f1aa0d8adeaa4cc768d
Author: Yijie Shen <henry.yijies...@gmail.com>
AuthorDate: Mon Jan 17 21:51:35 2022 +0800

    Consolidate `batch_size` configuration in `ExecutionConfig`, `RuntimeConfig` and `PhysicalPlanConfig` (#1562)

    * Remove batch_size from PhysicalPlanConfig first

    * Fix newly emerged lint problems

    * Remove batch_size from `ExecutionConfig`, rename `PhysicalPlanConfig` to `FileScanConfig` to make it clearer

    * Revert "Fix newly emerged lint problems"

      This reverts commit 7be7ccb4558aca5159884e75cc46a3ff68c40922.
---
 ballista/rust/core/proto/ballista.proto            |  1 -
 .../rust/core/src/serde/logical_plan/from_proto.rs |  4 +-
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 10 ++--
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  4 +-
 .../core/src/serde/physical_plan/from_proto.rs     |  9 ++--
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  7 ++-
 benchmarks/src/bin/tpch.rs                         | 10 ++--
 datafusion/benches/physical_plan.rs                |  2 +-
 datafusion/benches/sort_limit_query_sql.rs         |  7 ++-
 datafusion/src/datasource/datasource.rs            |  1 -
 datafusion/src/datasource/empty.rs                 |  1 -
 datafusion/src/datasource/file_format/avro.rs      | 54 ++++++++++++----------
 datafusion/src/datasource/file_format/csv.rs       | 22 ++++-----
 datafusion/src/datasource/file_format/json.rs      | 22 ++++-----
 datafusion/src/datasource/file_format/mod.rs       |  4 +-
 datafusion/src/datasource/file_format/parquet.rs   | 30 ++++++------
 datafusion/src/datasource/listing/table.rs         | 12 ++---
 datafusion/src/datasource/memory.rs                | 12 ++---
 datafusion/src/execution/context.rs                | 15 ++----
 datafusion/src/optimizer/filter_push_down.rs       |  1 -
 .../src/physical_optimizer/coalesce_batches.rs     |  2 +-
 datafusion/src/physical_optimizer/repartition.rs   |  8 ++--
 .../src/physical_plan/coalesce_partitions.rs       |  5 +-
 datafusion/src/physical_plan/file_format/avro.rs   | 21 ++++-----
 datafusion/src/physical_plan/file_format/csv.rs    | 24 ++++------
 datafusion/src/physical_plan/file_format/json.rs   | 19 ++++----
 datafusion/src/physical_plan/file_format/mod.rs    | 11 ++---
 .../src/physical_plan/file_format/parquet.rs       | 21 ++++-----
 datafusion/src/physical_plan/filter.rs             |  5 +-
 datafusion/src/physical_plan/limit.rs              |  5 +-
 datafusion/src/physical_plan/mod.rs                |  2 +-
 datafusion/src/physical_plan/planner.rs            |  4 +-
 datafusion/src/physical_plan/projection.rs         |  5 +-
 .../src/physical_plan/sorts/external_sort.rs       |  6 +--
 datafusion/src/physical_plan/sorts/sort.rs         |  5 +-
 .../physical_plan/sorts/sort_preserving_merge.rs   | 44 ++++++------------
 datafusion/src/physical_plan/union.rs              |  8 ++--
 datafusion/src/physical_plan/windows/mod.rs        |  5 +-
 datafusion/tests/custom_sources.rs                 |  1 -
 datafusion/tests/provider_filter_pushdown.rs      |  1 -
 datafusion/tests/sql/avro.rs                       |  2 +-
 datafusion/tests/sql/explain_analyze.rs            |  8 ++--
 datafusion/tests/statistics.rs                     |  1 -
 datafusion/tests/user_defined_plan.rs              |  6 +--
 44 files changed, 188 insertions(+), 259 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index cdd27cf..a0bb841 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -626,7 +626,6 @@ message ScanLimit {
 message FileScanExecConf {
   repeated FileGroup file_groups = 1;
   Schema schema = 2;
-  uint32 batch_size = 3;
   repeated uint32 projection = 4;
   ScanLimit limit = 5;
   Statistics statistics = 6;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index dfac547..427e6bf 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -246,8 +246,8 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                         .collect::<Result<Vec<_>, _>>()?,
                     partition_count as usize,
                 ),
-                PartitionMethod::RoundRobin(batch_size) => {
-                    Partitioning::RoundRobinBatch(batch_size as usize)
+                PartitionMethod::RoundRobin(partition_count) => {
+                    Partitioning::RoundRobinBatch(partition_count as usize)
                 }
             };
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 94d4e8e..c09b8a5 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -65,7 +65,7 @@ mod roundtrip_tests {
     async fn roundtrip_repartition() -> Result<()> {
         use datafusion::logical_plan::Partitioning;

-        let test_batch_sizes = [usize::MIN, usize::MAX, 43256];
+        let test_partition_counts = [usize::MIN, usize::MAX, 43256];

         let test_expr: Vec<Expr> =
             vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
@@ -92,8 +92,8 @@ mod roundtrip_tests {
                 .map_err(BallistaError::DataFusionError)?,
         );

-        for batch_size in test_batch_sizes.iter() {
-            let rr_repartition = Partitioning::RoundRobinBatch(*batch_size);
+        for partition_count in test_partition_counts.iter() {
+            let rr_repartition = Partitioning::RoundRobinBatch(*partition_count);

             let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
@@ -102,7 +102,7 @@ mod roundtrip_tests {

             roundtrip_test!(roundtrip_plan);

-            let h_repartition = Partitioning::Hash(test_expr.clone(), *batch_size);
+            let h_repartition = Partitioning::Hash(test_expr.clone(), *partition_count);

             let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
@@ -111,7 +111,7 @@ mod roundtrip_tests {

             roundtrip_test!(roundtrip_plan);

-            let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *batch_size);
+            let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *partition_count);

             let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index b783b54..5d77a97 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -826,8 +826,8 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                         partition_count: *partition_count as u64,
                     })
                 }
-                Partitioning::RoundRobinBatch(batch_size) => {
-                    PartitionMethod::RoundRobin(*batch_size as u64)
+                Partitioning::RoundRobinBatch(partition_count) => {
+                    PartitionMethod::RoundRobin(*partition_count as u64)
                 }
             };
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index cad27b3..0b3d503 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -48,7 +48,7 @@ use datafusion::logical_plan::{
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::file_format::{
-    AvroExec, CsvExec, ParquetExec, PhysicalPlanConfig,
+    AvroExec, CsvExec, FileScanConfig, ParquetExec,
 };
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::PartitionMode;
@@ -770,10 +770,10 @@ impl TryInto<Statistics> for &protobuf::Statistics {
     }
 }

-impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
+impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {
     type Error = BallistaError;

-    fn try_into(self) -> Result<PhysicalPlanConfig, Self::Error> {
+    fn try_into(self) -> Result<FileScanConfig, Self::Error> {
         let schema = Arc::new(convert_required!(self.schema)?);
         let projection = self
             .projection
@@ -787,7 +787,7 @@ impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
         };
         let statistics = convert_required!(self.statistics)?;

-        Ok(PhysicalPlanConfig {
+        Ok(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
             file_schema: schema,
             file_groups: self
@@ -797,7 +797,6 @@ impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
                 .collect::<Result<Vec<_>, _>>()?,
             statistics,
             projection,
-            batch_size: self.batch_size as usize,
             limit: self.limit.as_ref().map(|sl| sl.limit as usize),
             table_partition_cols: vec![],
         })
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 930f075..4c6db9a 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -43,7 +43,7 @@ use datafusion::physical_plan::{
 };
 use datafusion::physical_plan::{file_format::AvroExec, filter::FilterExec};
 use datafusion::physical_plan::{
-    file_format::PhysicalPlanConfig, hash_aggregate::AggregateMode,
+    file_format::FileScanConfig, hash_aggregate::AggregateMode,
 };
 use datafusion::{
     datasource::PartitionedFile, physical_plan::coalesce_batches::CoalesceBatchesExec,
@@ -677,10 +677,10 @@ impl From<&Statistics> for protobuf::Statistics {
     }
 }

-impl TryFrom<&PhysicalPlanConfig> for protobuf::FileScanExecConf {
+impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
     type Error = BallistaError;
     fn try_from(
-        conf: &PhysicalPlanConfig,
+        conf: &FileScanConfig,
     ) -> Result<protobuf::FileScanExecConf, Self::Error> {
         let file_groups = conf
             .file_groups
@@ -700,7 +700,6 @@ impl TryFrom<&PhysicalPlanConfig> for protobuf::FileScanExecConf {
                 .map(|n| *n as u32)
                 .collect(),
             schema: Some(conf.file_schema.as_ref().into()),
-            batch_size: conf.batch_size as u32,
             table_partition_cols: conf.table_partition_cols.to_vec(),
         })
     }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 9563434..0b9fba5 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -277,13 +277,9 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
             println!("Loading table '{}' into memory", table);
             let start = Instant::now();

-            let memtable = MemTable::load(
-                table_provider,
-                opt.batch_size,
-                Some(opt.partitions),
-                runtime.clone(),
-            )
-            .await?;
+            let memtable =
+                MemTable::load(table_provider, Some(opt.partitions), runtime.clone())
+                    .await?;
             println!(
                 "Loaded table '{}' into memory in {} ms",
                 table,
diff --git a/datafusion/benches/physical_plan.rs b/datafusion/benches/physical_plan.rs
index e9eb53d..8dd1f49 100644
--- a/datafusion/benches/physical_plan.rs
+++ b/datafusion/benches/physical_plan.rs
@@ -56,7 +56,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
         None,
     )
     .unwrap();
-    let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 8192));
+    let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

     let rt = Runtime::new().unwrap();
     let rt_env = Arc::new(RuntimeEnv::default());
diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs
index 8f40928..41f8c17 100644
--- a/datafusion/benches/sort_limit_query_sql.rs
+++ b/datafusion/benches/sort_limit_query_sql.rs
@@ -84,10 +84,9 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
             ctx.state.lock().unwrap().config.target_partitions = 1;
             let runtime = ctx.state.lock().unwrap().runtime_env.clone();

-            let mem_table =
-                MemTable::load(Arc::new(csv), 16 * 1024, Some(partitions), runtime)
-                    .await
-                    .unwrap();
+            let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
+                .await
+                .unwrap();
             ctx.register_table("aggregate_test_100", Arc::new(mem_table))
                 .unwrap();
             ctx_holder.lock().unwrap().push(Arc::new(Mutex::new(ctx)))
diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs
index 823b408..1b59c85 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -77,7 +77,6 @@ pub trait TableProvider: Sync + Send {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         filters: &[Expr],
         // limit can be used to reduce the amount scanned
         // from the datasource as a performance optimization.
diff --git a/datafusion/src/datasource/empty.rs b/datafusion/src/datasource/empty.rs
index 380c5a7..9665605 100644
--- a/datafusion/src/datasource/empty.rs
+++ b/datafusion/src/datasource/empty.rs
@@ -53,7 +53,6 @@ impl TableProvider for EmptyTable {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        _batch_size: usize,
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs
index e1ae174..08eb343 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -30,7 +30,7 @@ use crate::avro_to_arrow::read_avro_schema_from_reader;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::file_format::{AvroExec, PhysicalPlanConfig};
+use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;

@@ -61,7 +61,7 @@ impl FileFormat for AvroFormat {

     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = AvroExec::new(conf);
@@ -81,7 +81,7 @@ mod tests {
     };

     use super::*;
-    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use arrow::array::{
         BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
         TimestampMicrosecondArray,
@@ -90,9 +90,9 @@ mod tests {

     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
         let projection = None;
-        let runtime = Arc::new(RuntimeEnv::default());
-        let exec = get_exec("alltypes_plain.avro", &projection, 2, None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
         let stream = exec.execute(0, runtime).await?;

         let tt_batches = stream
@@ -111,9 +111,10 @@ mod tests {

     #[tokio::test]
     async fn read_limit() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, Some(1)).await?;
-        let batches = collect(exec).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, Some(1)).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
         assert_eq!(11, batches[0].num_columns());
         assert_eq!(1, batches[0].num_rows());
@@ -123,8 +124,9 @@ mod tests {

     #[tokio::test]
     async fn read_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

         let x: Vec<String> = exec
             .schema()
@@ -149,7 +151,7 @@ mod tests {
             x
         );

-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);

         let expected = vec![
@@ -173,10 +175,11 @@ mod tests {

     #[tokio::test]
     async fn read_bool_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -201,10 +204,11 @@ mod tests {

     #[tokio::test]
     async fn read_i32_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -226,10 +230,11 @@ mod tests {

     #[tokio::test]
     async fn read_i96_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -251,10 +256,11 @@ mod tests {

     #[tokio::test]
     async fn read_f32_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -279,10 +285,11 @@ mod tests {

     #[tokio::test]
     async fn read_f64_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -307,10 +314,11 @@ mod tests {

     #[tokio::test]
     async fn read_binary_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -336,7 +344,6 @@ mod tests {
     async fn get_exec(
         file_name: &str,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::arrow_test_data();
@@ -353,13 +360,12 @@ mod tests {
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
                     file_schema,
                     file_groups,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: vec![],
                 },
diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
index 99770a8..f0a70d9 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -29,7 +29,7 @@ use super::FileFormat;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;

@@ -123,7 +123,7 @@ impl FileFormat for CsvFormat {

     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = CsvExec::new(conf, self.has_header, self.delimiter);
@@ -136,10 +136,10 @@ mod tests {
     use arrow::array::StringArray;

     use super::*;
-    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::{
         datasource::{
-            file_format::PhysicalPlanConfig,
+            file_format::FileScanConfig,
             object_store::local::{
                 local_object_reader, local_object_reader_stream,
                 local_unpartitioned_file, LocalFileSystem,
@@ -150,10 +150,10 @@ mod tests {

     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
         // skip column 9 that overflows the automaticly discovered column type of i64 (u64 would work)
         let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, 2, None).await?;
+        let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
         let stream = exec.execute(0, runtime).await?;

         let tt_batches: i32 = stream
@@ -178,7 +178,7 @@ mod tests {
     async fn read_limit() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0, 1, 2, 3]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, 1024, Some(1)).await?;
+        let exec = get_exec("aggregate_test_100.csv", &projection, Some(1)).await?;
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
         assert_eq!(4, batches[0].num_columns());
@@ -190,7 +190,7 @@ mod tests {
     #[tokio::test]
     async fn infer_schema() -> Result<()> {
         let projection = None;
-        let exec = get_exec("aggregate_test_100.csv", &projection, 1024, None).await?;
+        let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;

         let x: Vec<String> = exec
             .schema()
@@ -224,7 +224,7 @@ mod tests {
     async fn read_char_column() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, 1024, None).await?;
+        let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;

         let batches = collect(exec, runtime).await.expect("Collect batches");

@@ -250,7 +250,6 @@ mod tests {
     async fn get_exec(
         file_name: &str,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::arrow_test_data();
@@ -267,13 +266,12 @@ mod tests {
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
                     file_schema,
                     file_groups,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: vec![],
                 },
diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs
index a8f1176..d7a278d 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -29,7 +29,7 @@ use async_trait::async_trait;
 use futures::StreamExt;

 use super::FileFormat;
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -85,7 +85,7 @@ impl FileFormat for JsonFormat {

     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = NdJsonExec::new(conf);
@@ -98,10 +98,10 @@ mod tests {
     use arrow::array::Int64Array;

     use super::*;
-    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::{
         datasource::{
-            file_format::PhysicalPlanConfig,
+            file_format::FileScanConfig,
             object_store::local::{
                 local_object_reader, local_object_reader_stream,
                 local_unpartitioned_file, LocalFileSystem,
@@ -112,9 +112,9 @@ mod tests {

     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
         let projection = None;
-        let exec = get_exec(&projection, 2, None).await?;
+        let exec = get_exec(&projection, None).await?;
         let stream = exec.execute(0, runtime).await?;

         let tt_batches: i32 = stream
@@ -139,7 +139,7 @@ mod tests {
     async fn read_limit() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec(&projection, 1024, Some(1)).await?;
+        let exec = get_exec(&projection, Some(1)).await?;
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
         assert_eq!(4, batches[0].num_columns());
@@ -151,7 +151,7 @@ mod tests {
     #[tokio::test]
     async fn infer_schema() -> Result<()> {
         let projection = None;
-        let exec = get_exec(&projection, 1024, None).await?;
+        let exec = get_exec(&projection, None).await?;

         let x: Vec<String> = exec
             .schema()
@@ -168,7 +168,7 @@ mod tests {
     async fn read_int_column() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0]);
-        let exec = get_exec(&projection, 1024, None).await?;
+        let exec = get_exec(&projection, None).await?;

         let batches = collect(exec, runtime).await.expect("Collect batches");

@@ -196,7 +196,6 @@ mod tests {
     async fn get_exec(
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let filename = "tests/jsons/2.json";
@@ -212,13 +211,12 @@ mod tests {
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
                     file_schema,
                     file_groups,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: vec![],
                 },
diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs
index 5449161..21da2e1 100644
--- a/datafusion/src/datasource/file_format/mod.rs
+++ b/datafusion/src/datasource/file_format/mod.rs
@@ -29,7 +29,7 @@ use std::sync::Arc;
 use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::file_format::PhysicalPlanConfig;
+use crate::physical_plan::file_format::FileScanConfig;
 use crate::physical_plan::{ExecutionPlan, Statistics};

 use async_trait::async_trait;
@@ -59,7 +59,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// according to this file format.
     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>>;
 }
diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index 608795a..a947518 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -35,7 +35,7 @@ use parquet::file::serialized_reader::SerializedFileReader;
 use parquet::file::statistics::Statistics as ParquetStatistics;

 use super::FileFormat;
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;
 use crate::arrow::datatypes::{DataType, Field};
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::datasource::{create_max_min_accs, get_col_stats};
@@ -104,7 +104,7 @@ impl FileFormat for ParquetFormat {

     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // If enable pruning then combine the filters to build the predicate.
@@ -341,7 +341,7 @@ mod tests {
     };

     use super::*;
-    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use arrow::array::{
         BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
         TimestampNanosecondArray,
@@ -350,9 +350,9 @@ mod tests {

     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, 2, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
         let stream = exec.execute(0, runtime).await?;

         let tt_batches = stream
@@ -377,7 +377,7 @@ mod tests {
     async fn read_limit() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, Some(1)).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, Some(1)).await?;

         // note: even if the limit is set, the executor rounds up to the batch size
         assert_eq!(exec.statistics().num_rows, Some(8));
@@ -395,7 +395,7 @@ mod tests {
     async fn read_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;

         let x: Vec<String> = exec
             .schema()
@@ -432,7 +432,7 @@ mod tests {
     async fn read_bool_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;

         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -461,7 +461,7 @@ mod tests {
     async fn read_i32_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;

         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -487,7 +487,7 @@ mod tests {
     async fn read_i96_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;

         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -513,7 +513,7 @@ mod tests {
     async fn read_f32_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;

         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -542,7 +542,7 @@ mod tests {
     async fn read_f64_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;

         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -571,7 +571,7 @@ mod tests {
     async fn read_binary_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;

         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -599,7 +599,6 @@ mod tests {
     async fn get_exec(
         file_name: &str,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
@@ -616,13 +615,12 @@ mod tests {
         let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
         let exec = format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
                     file_schema,
                     file_groups,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: vec![],
                 },
diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs
index 22e3f75..ff6d322 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -28,7 +28,7 @@ use crate::{
     logical_plan::Expr,
     physical_plan::{
         empty::EmptyExec,
-        file_format::{PhysicalPlanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
+        file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
         ExecutionPlan, Statistics,
     },
 };
@@ -170,7 +170,6 @@ impl TableProvider for ListingTable {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -193,13 +192,12 @@ impl TableProvider for ListingTable {
         self.options
             .format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::clone(&self.object_store),
                     file_schema: Arc::clone(&self.file_schema),
                     file_groups: partitioned_file_lists,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: self.options.table_partition_cols.clone(),
                 },
@@ -289,7 +287,7 @@ mod tests {
         let table = load_table("alltypes_plain.parquet").await?;
         let projection = None;
         let exec = table
-            .scan(&projection, 1024, &[], None)
+            .scan(&projection, &[], None)
             .await
             .expect("Scan table");

@@ -313,7 +311,7 @@ mod tests {
             .await?;
         let table = ListingTable::new(Arc::new(LocalFileSystem {}), filename, schema, opt);

-        let exec = table.scan(&None, 1024, &[], None).await?;
+        let exec = table.scan(&None, &[], None).await?;
         assert_eq!(exec.statistics().num_rows, Some(8));
         assert_eq!(exec.statistics().total_byte_size, Some(671));

@@ -345,7 +343,7 @@ mod tests {
         let filter = Expr::not_eq(col("p1"), lit("v1"));

         let scan = table
-            .scan(&None, 1024, &[filter], None)
+            .scan(&None, &[filter], None)
             .await
             .expect("Empty execution plan");
diff --git a/datafusion/src/datasource/memory.rs b/datafusion/src/datasource/memory.rs
index f09e691..c732b17 100644
--- a/datafusion/src/datasource/memory.rs
+++ b/datafusion/src/datasource/memory.rs
@@ -64,12 +64,11 @@ impl MemTable {
     /// Create a mem table by reading from another data source
     pub async fn load(
         t: Arc<dyn TableProvider>,
-        batch_size: usize,
         output_partitions: Option<usize>,
         runtime: Arc<RuntimeEnv>,
     ) -> Result<Self> {
         let schema = t.schema();
-        let exec = t.scan(&None, batch_size, &[], None).await?;
+        let exec = t.scan(&None, &[], None).await?;
         let partition_count = exec.output_partitioning().partition_count();

         let tasks = (0..partition_count)
@@ -131,7 +130,6 @@ impl TableProvider for MemTable {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        _batch_size: usize,
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -175,7 +173,7 @@ mod tests {
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;

         // scan with projection
-        let exec = provider.scan(&Some(vec![2, 1]), 1024, &[], None).await?;
+        let exec = provider.scan(&Some(vec![2, 1]), &[], None).await?;
         let mut it = exec.execute(0, runtime).await?;
         let batch2 = it.next().await.unwrap()?;
         assert_eq!(2, batch2.schema().fields().len());
@@ -206,7 +204,7 @@ mod tests {

         let provider = MemTable::try_new(schema, vec![vec![batch]])?;

-        let exec = provider.scan(&None, 1024, &[], None).await?;
+        let exec = provider.scan(&None, &[], None).await?;
         let mut it = exec.execute(0, runtime).await?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
@@ -236,7 +234,7 @@ mod tests {

         let projection: Vec<usize> = vec![0, 4];

-        match provider.scan(&Some(projection), 1024, &[], None).await {
+        match provider.scan(&Some(projection), &[], None).await {
             Err(DataFusionError::Internal(e)) => {
                 assert_eq!("\"Projection index out of range\"", format!("{:?}", e))
             }
@@ -358,7 +356,7 @@ mod tests {
         let provider =
             MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;

-        let exec = provider.scan(&None, 1024, &[], None).await?;
+        let exec = provider.scan(&None, &[], None).await?;
         let mut it = exec.execute(0, runtime).await?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 03a0b4d..7a47b37 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -179,8 +179,7 @@ impl ExecutionContext {
                 .register_catalog(config.default_catalog.clone(), default_catalog);
         }

-        let runtime_env =
-            Arc::new(RuntimeEnv::new(config.runtime_config.clone()).unwrap());
+        let runtime_env = Arc::new(RuntimeEnv::new(config.runtime.clone()).unwrap());

         Self {
             state: Arc::new(Mutex::new(ExecutionContextState {
@@ -872,8 +871,6 @@ impl QueryPlanner for DefaultQueryPlanner {
 pub struct ExecutionConfig {
     /// Number of partitions for query execution. Increasing partitions can increase concurrency.
     pub target_partitions: usize,
-    /// Default batch size when reading data sources
-    pub batch_size: usize,
     /// Responsible for optimizing a logical plan
     optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
     /// Responsible for optimizing a physical execution plan
@@ -901,14 +898,13 @@ pub struct ExecutionConfig {
     /// Should Datafusion parquet reader using the predicate to prune data
     parquet_pruning: bool,
     /// Runtime configurations such as memory threshold and local disk for spill
-    pub runtime_config: RuntimeConfig,
+    pub runtime: RuntimeConfig,
 }

 impl Default for ExecutionConfig {
     fn default() -> Self {
         Self {
             target_partitions: num_cpus::get(),
-            batch_size: 8192,
             optimizers: vec![
                 // Simplify expressions first to maximize the chance
                 // of applying other optimizations
@@ -936,7 +932,7 @@ impl Default for ExecutionConfig {
             repartition_aggregations: true,
             repartition_windows: true,
             parquet_pruning: true,
-            runtime_config: RuntimeConfig::default(),
+            runtime: RuntimeConfig::default(),
         }
     }
 }
@@ -959,7 +955,7 @@ impl ExecutionConfig {
     pub fn with_batch_size(mut self, n: usize) -> Self {
         // batch size must be greater than zero
        assert!(n > 0);
-        self.batch_size = n;
+        self.runtime.batch_size = n;
         self
     }

@@ -1057,7 +1053,7 @@ impl ExecutionConfig {

     /// Customize runtime config
     pub fn with_runtime_config(mut self, config: RuntimeConfig) -> Self {
-        self.runtime_config = config;
+        self.runtime = config;
         self
     }
 }
@@ -3618,7 +3614,6 @@ mod tests {
         async fn scan(
             &self,
             _: &Option<Vec<usize>>,
-            _: usize,
             _: &[Expr],
             _: Option<usize>,
         ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index c721712..ef171e1 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -1158,7 +1158,6 @@ mod tests {
         async fn scan(
             &self,
             _: &Option<Vec<usize>>,
-            _: usize,
             _: &[Expr],
             _: Option<usize>,
         ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/src/physical_optimizer/coalesce_batches.rs b/datafusion/src/physical_optimizer/coalesce_batches.rs
index 14616a7..98e65a2 100644
--- a/datafusion/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/src/physical_optimizer/coalesce_batches.rs
@@ -75,7 +75,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
             // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
             // implemented. For now, we choose half the configured batch size to avoid copies
             // when a small number of rows are removed from a batch
-            let target_batch_size = config.batch_size / 2;
+            let target_batch_size = config.runtime.batch_size / 2;
             Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
         } else {
             plan.clone()
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 26f77ef..0926ed7 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -111,7 +111,7 @@ mod tests {

     use super::*;
     use crate::datasource::PartitionedFile;
-    use crate::physical_plan::file_format::{ParquetExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::projection::ProjectionExec;
     use crate::physical_plan::Statistics;
     use crate::test::object_store::TestObjectStore;
@@ -122,13 +122,12 @@ mod tests {
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: TestObjectStore::new_arc(&[("x", 100)]),
                     file_schema,
                     file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
                     statistics: Statistics::default(),
                     projection: None,
-                    batch_size: 2048,
                     limit: None,
                     table_partition_cols: vec![],
                 },
@@ -161,7 +160,7 @@ mod tests {
             Arc::new(ProjectionExec::try_new(
                 vec![],
                 Arc::new(ParquetExec::new(
-                    PhysicalPlanConfig {
+                    FileScanConfig {
                         object_store: TestObjectStore::new_arc(&[("x", 100)]),
                         file_schema,
                         file_groups: vec![vec![PartitionedFile::new(
@@ -170,7 +169,6 @@ mod tests {
                         )]],
                         statistics: Statistics::default(),
                         projection: None,
-                        batch_size: 2048,
                         limit: None,
                         table_partition_cols: vec![],
                     },
diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
index 3fcacbb..2e3a552 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -213,7 +213,7 @@ mod tests {

     use super::*;
     use crate::datasource::object_store::local::LocalFileSystem;
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{collect, common};
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
@@ -228,13 +228,12 @@ mod tests {
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: schema,
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs
index 6ab5bf9..9de8ffd 100644
--- a/datafusion/src/physical_plan/file_format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -33,19 +33,19 @@ use std::sync::Arc;

 #[cfg(feature = "avro")]
 use super::file_stream::{BatchIter, FileStream};
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;

 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
 pub struct AvroExec {
-    base_config: PhysicalPlanConfig,
+    base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
 }

 impl AvroExec {
     /// Create a new Avro reader execution plan provided base configurations
-    pub fn new(base_config: PhysicalPlanConfig) -> Self {
+    pub fn new(base_config: FileScanConfig) -> Self {
         let (projected_schema, projected_statistics) = base_config.project();

         Self {
@@ -55,7 +55,7 @@ impl AvroExec {
         }
     }
     /// Ref to the base configs
-    pub fn base_config(&self) -> &PhysicalPlanConfig {
+    pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }
 }
@@ -107,11 +107,11 @@ impl ExecutionPlan for AvroExec {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
         let proj = self.base_config.projected_file_column_names();

-        let batch_size = self.base_config.batch_size;
+        let batch_size = runtime.batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);

         // The avro reader cannot limit the number of records, so `remaining` is ignored.
@@ -149,9 +149,8 @@ impl ExecutionPlan for AvroExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "AvroExec: files={}, batch_size={}, limit={:?}",
+                    "AvroExec: files={}, limit={:?}",
                     super::FileGroupsDisplay(&self.base_config.file_groups),
-                    self.base_config.batch_size,
                     self.base_config.limit,
                 )
             }
@@ -180,7 +179,7 @@ mod tests {
     async fn avro_exec_without_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let avro_exec = AvroExec::new(PhysicalPlanConfig {
+        let avro_exec = AvroExec::new(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
             file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
             file_schema: AvroFormat {}
@@ -188,7 +187,6 @@ mod tests {
                 .await?,
             statistics: Statistics::default(),
             projection: Some(vec![0, 1, 2]),
-            batch_size: 1024,
             limit: None,
             table_partition_cols: vec![],
         });
@@ -241,7 +239,7 @@ mod tests {
             .infer_schema(local_object_reader_stream(vec![filename]))
             .await?;

-        let avro_exec = AvroExec::new(PhysicalPlanConfig {
+        let avro_exec = AvroExec::new(FileScanConfig {
             // select specific columns of the files as well as the partitioning
             // column which is supposed to be the last column in the table schema.
             projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
@@ -249,7 +247,6 @@ mod tests {
             file_groups: vec![vec![partitioned_file]],
             file_schema: file_schema,
             statistics: Statistics::default(),
-            batch_size: 1024,
             limit: None,
             table_partition_cols: vec!["date".to_owned()],
         });
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index ea965a4..5cff3b6 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -31,12 +31,12 @@ use crate::execution::runtime_env::RuntimeEnv;
 use async_trait::async_trait;

 use super::file_stream::{BatchIter, FileStream};
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;

 /// Execution plan for scanning a CSV file
 #[derive(Debug, Clone)]
 pub struct CsvExec {
-    base_config: PhysicalPlanConfig,
+    base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
     has_header: bool,
@@ -45,7 +45,7 @@ pub struct CsvExec {

 impl CsvExec {
     /// Create a new CSV reader execution plan provided base and specific configurations
-    pub fn new(base_config: PhysicalPlanConfig, has_header: bool, delimiter: u8) -> Self {
+    pub fn new(base_config: FileScanConfig, has_header: bool, delimiter: u8) -> Self {
         let (projected_schema, projected_statistics) = base_config.project();

         Self {
@@ -58,7 +58,7 @@ impl CsvExec {
     }

     /// Ref to the base configs
-    pub fn base_config(&self) -> &PhysicalPlanConfig {
+    pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }
     /// true if the first line of each file is a header
@@ -110,9 +110,9 @@ impl ExecutionPlan for CsvExec {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
-        let batch_size = self.base_config.batch_size;
+        let batch_size = runtime.batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);
         let file_projection = self.base_config.file_column_projection_indices();
         let has_header = self.has_header;
@@ -153,10 +153,9 @@ impl ExecutionPlan for CsvExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "CsvExec: files={}, has_header={}, batch_size={}, limit={:?}",
+                    "CsvExec: files={}, has_header={}, limit={:?}",
                     super::FileGroupsDisplay(&self.base_config.file_groups),
                     self.has_header,
-                    self.base_config.batch_size,
                     self.base_config.limit,
                 )
             }
@@ -186,13 +185,12 @@ mod tests {
         let filename = "aggregate_test_100.csv";
         let path = format!("{}/csv/{}", testdata, filename);
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema,
                 file_groups: vec![vec![local_unpartitioned_file(path)]],
                 statistics: Statistics::default(),
                 projection: Some(vec![0, 2, 4]),
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
@@ -233,13 +231,12 @@ mod tests {
         let filename = "aggregate_test_100.csv";
         let path = format!("{}/csv/{}", testdata, filename);
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema,
                 file_groups: vec![vec![local_unpartitioned_file(path)]],
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: Some(5),
                 table_partition_cols: vec![],
             },
@@ -285,7 +282,7 @@ mod tests {
         partitioned_file.partition_values =
             vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 // we should be able to project on the partition column
                 // wich is supposed to be after the file fields
                 projection: Some(vec![0, file_schema.fields().len()]),
@@ -293,7 +290,6 @@ mod tests {
                 file_schema,
                 file_groups: vec![vec![partitioned_file]],
                 statistics: Statistics::default(),
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec!["date".to_owned()],
             },
diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs
index a0959c2..0fc95d1 100644
--- a/datafusion/src/physical_plan/file_format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -28,19 +28,19 @@ use std::any::Any;
 use std::sync::Arc;

 use super::file_stream::{BatchIter, FileStream};
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;

 /// Execution plan for scanning NdJson data source
 #[derive(Debug, Clone)]
 pub struct NdJsonExec {
-    base_config: PhysicalPlanConfig,
+    base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
 }

 impl NdJsonExec {
     /// Create a new JSON reader execution plan provided base configurations
-    pub fn new(base_config: PhysicalPlanConfig) -> Self {
+    pub fn new(base_config: FileScanConfig) -> Self {
         let (projected_schema, projected_statistics) = base_config.project();

         Self {
@@ -86,11 +86,11 @@ impl ExecutionPlan for NdJsonExec {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
         let proj = self.base_config.projected_file_column_names();

-        let batch_size = self.base_config.batch_size;
+        let batch_size = runtime.batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);

         // The json reader cannot limit the number of records, so `remaining` is ignored.
@@ -122,8 +122,7 @@ impl ExecutionPlan for NdJsonExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "JsonExec: batch_size={}, limit={:?}, files={}",
-                    self.base_config.batch_size,
+                    "JsonExec: limit={:?}, files={}",
                     self.base_config.limit,
                     super::FileGroupsDisplay(&self.base_config.file_groups),
                 )
@@ -162,13 +161,12 @@ mod tests {
         let runtime = Arc::new(RuntimeEnv::default());
         use arrow::datatypes::DataType;
         let path = format!("{}/1.json", TEST_DATA_BASE);
-        let exec = NdJsonExec::new(PhysicalPlanConfig {
+        let exec = NdJsonExec::new(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
             file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
             file_schema: infer_schema(path).await?,
             statistics: Statistics::default(),
             projection: None,
-            batch_size: 1024,
             limit: Some(3),
             table_partition_cols: vec![],
         });
@@ -217,13 +215,12 @@ mod tests {
     async fn nd_json_exec_file_projection() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let path = format!("{}/1.json", TEST_DATA_BASE);
-        let exec = NdJsonExec::new(PhysicalPlanConfig {
+        let exec = NdJsonExec::new(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
             file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
             file_schema: infer_schema(path).await?,
             statistics: Statistics::default(),
             projection: Some(vec![0, 2]),
-            batch_size: 1024,
             limit: None,
             table_partition_cols: vec![],
         });
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 17ec9f1..b655cdb 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -57,7 +57,7 @@ lazy_static! {
 /// The base configurations to provide when creating a physical plan for
 /// any given file format.
 #[derive(Debug, Clone)]
-pub struct PhysicalPlanConfig {
+pub struct FileScanConfig {
     /// Store from which the `files` should be fetched
     pub object_store: Arc<dyn ObjectStore>,
     /// Schema before projection. It contains the columns that are expected
@@ -70,15 +70,13 @@ pub struct PhysicalPlanConfig {
     /// Columns on which to project the data. Indexes that are higher than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
     pub projection: Option<Vec<usize>>,
-    /// The maximum number of records per arrow column
-    pub batch_size: usize,
     /// The minimum number of records required from this source plan
     pub limit: Option<usize>,
     /// The partitioning column names
     pub table_partition_cols: Vec<String>,
 }

-impl PhysicalPlanConfig {
+impl FileScanConfig {
     /// Project the schema and the statistics on the given column indices
     fn project(&self) -> (SchemaRef, Statistics) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
@@ -475,9 +473,8 @@ mod tests {
         projection: Option<Vec<usize>>,
         statistics: Statistics,
         table_partition_cols: Vec<String>,
-    ) -> PhysicalPlanConfig {
-        PhysicalPlanConfig {
-            batch_size: 1024,
+    ) -> FileScanConfig {
+        FileScanConfig {
             file_schema,
             file_groups: vec![vec![]],
             limit: None,
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 73f0b8d..17abb43 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -29,7 +29,7 @@ use crate::{
     logical_plan::{Column, Expr},
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
     physical_plan::{
-        file_format::PhysicalPlanConfig,
+        file_format::FileScanConfig,
         metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
         stream::RecordBatchReceiverStream,
         DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
@@ -67,7 +67,7 @@ use super::PartitionColumnProjector;
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    base_config: PhysicalPlanConfig,
+    base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
     /// Execution metrics
@@ -88,7 +88,7 @@ struct ParquetFileMetrics {
 impl ParquetExec {
     /// Create a new Parquet reader execution plan provided file list and schema.
     /// Even if `limit` is set, ParquetExec rounds up the number of records to the next `batch_size`.
-    pub fn new(base_config: PhysicalPlanConfig, predicate: Option<Expr>) -> Self {
+    pub fn new(base_config: FileScanConfig, predicate: Option<Expr>) -> Self {
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, base_config.limit);

@@ -125,7 +125,7 @@ impl ParquetExec {
     }

     /// Ref to the base configs
-    pub fn base_config(&self) -> &PhysicalPlanConfig {
+    pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }
 }
@@ -190,7 +190,7 @@ impl ExecutionPlan for ParquetExec {
     async fn execute(
         &self,
         partition_index: usize,
-        _runtime: Arc<RuntimeeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
         // because the parquet implementation is not thread-safe, it is necessary to execute
         // on a thread and communicate with channels
@@ -206,7 +206,7 @@ impl ExecutionPlan for ParquetExec {
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
         let pruning_predicate = self.pruning_predicate.clone();
-        let batch_size = self.base_config.batch_size;
+        let batch_size = runtime.batch_size();
         let limit = self.base_config.limit;
         let object_store = Arc::clone(&self.base_config.object_store);
         let partition_col_proj = PartitionColumnProjector::new(
@@ -247,8 +247,7 @@ impl ExecutionPlan for ParquetExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "ParquetExec: batch_size={}, limit={:?}, partitions={}",
-                    self.base_config.batch_size,
+                    "ParquetExec: limit={:?}, partitions={}",
                     self.base_config.limit,
                     super::FileGroupsDisplay(&self.base_config.file_groups)
                 )
@@ -480,7 +479,7 @@ mod tests {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
         let parquet_exec = ParquetExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
                 file_schema: ParquetFormat::default()
@@ -488,7 +487,6 @@ mod tests {
                     .await?,
                 statistics: Statistics::default(),
                 projection: Some(vec![0, 1, 2]),
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
@@ -531,7 +529,7 @@ mod tests {
             ScalarValue::Utf8(Some("26".to_owned())),
         ];
         let parquet_exec = ParquetExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: ParquetFormat::default()
@@ -540,7 +538,6 @@ mod tests {
                 statistics: Statistics::default(),
                 // file has 10 cols so index 12 should be month
                 projection: Some(vec![0, 1, 2, 12]),
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![
                     "year".to_owned(),
diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs
index dab5fac..a071d0e 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -229,7 +229,7 @@ mod tests {
     use super::*;
     use crate::datasource::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::*;
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::ExecutionPlan;
     use crate::scalar::ScalarValue;
     use crate::test;
@@ -247,13 +247,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;

         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: Arc::clone(&schema),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index 3e09636..f022557 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -396,7 +396,7 @@ mod tests {
     use crate::datasource::object_store::local::LocalFileSystem;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::{test, test_util};

     #[tokio::test]
@@ -409,13 +409,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;

         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: schema,
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 836d399..b019b52 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -223,7 +223,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
 ///               \n  CoalesceBatchesExec: target_batch_size=4096\
 ///               \n    FilterExec: a@0 < 5\
 ///               \n      RepartitionExec: partitioning=RoundRobinBatch(3)\
-///               \n        CsvExec: files=[tests/example.csv], has_header=true, batch_size=8192, limit=None",
+///               \n        CsvExec: files=[tests/example.csv], has_header=true, limit=None",
 ///               plan_string.trim());
 /// }
 /// ```
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index fa0c101..2dcde9d 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -327,8 +327,6 @@ impl DefaultPhysicalPlanner {
         ctx_state: &'a ExecutionContextState,
     ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
         async move {
-            let batch_size = ctx_state.config.batch_size;
-
             let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
                 LogicalPlan::TableScan(TableScan {
                     source,
@@ -342,7 +340,7 @@ impl DefaultPhysicalPlanner {
                     // referred to in the query
                     let filters = unnormalize_cols(filters.iter().cloned());
                     let unaliased: Vec<Expr> = filters.into_iter().map(unalias).collect();
-                    source.scan(projection, batch_size, &unaliased, *limit).await
+                    source.scan(projection, &unaliased, *limit).await
                 }
                 LogicalPlan::Values(Values {
                     values,
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index d86548b..6d9d587 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -290,7 +290,7 @@ mod tests {
     use super::*;
     use crate::datasource::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::{self, col};
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::scalar::ScalarValue;
     use crate::test::{self};
     use crate::test_util;
@@ -306,13 +306,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;

         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: Arc::clone(&schema),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
table_partition_cols: vec![], }, diff --git a/datafusion/src/physical_plan/sorts/external_sort.rs b/datafusion/src/physical_plan/sorts/external_sort.rs index 8550cb5..6c60aac 100644 --- a/datafusion/src/physical_plan/sorts/external_sort.rs +++ b/datafusion/src/physical_plan/sorts/external_sort.rs @@ -153,7 +153,6 @@ impl ExternalSorter { streams, self.schema.clone(), &self.expr, - self.runtime.batch_size(), baseline_metrics, partition, self.runtime.clone(), @@ -523,7 +522,7 @@ mod tests { use crate::physical_plan::expressions::col; use crate::physical_plan::{ collect, - file_format::{CsvExec, PhysicalPlanConfig}, + file_format::{CsvExec, FileScanConfig}, }; use crate::test; use crate::test_util; @@ -538,13 +537,12 @@ mod tests { test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; let csv = CsvExec::new( - PhysicalPlanConfig { + FileScanConfig { object_store: Arc::new(LocalFileSystem {}), file_schema: Arc::clone(&schema), file_groups: files, statistics: Statistics::default(), projection: None, - batch_size: 1024, limit: None, table_partition_cols: vec![], }, diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 678ad03..a210b93 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -323,7 +323,7 @@ mod tests { use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::{ collect, - file_format::{CsvExec, PhysicalPlanConfig}, + file_format::{CsvExec, FileScanConfig}, }; use crate::test::assert_is_pending; use crate::test::exec::assert_strong_count_converges_to_zero; @@ -342,13 +342,12 @@ mod tests { test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; let csv = CsvExec::new( - PhysicalPlanConfig { + FileScanConfig { object_store: Arc::new(LocalFileSystem {}), file_schema: Arc::clone(&schema), file_groups: files, statistics: Statistics::default(), projection: None, - batch_size: 1024, limit: None, table_partition_cols: vec![], }, diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index f3bed69..9f12891 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -65,23 +65,16 @@ pub struct SortPreservingMergeExec { input: Arc<dyn ExecutionPlan>, /// Sort expressions expr: Vec<PhysicalSortExpr>, - /// The target size of yielded batches - target_batch_size: usize, /// Execution metrics metrics: ExecutionPlanMetricsSet, } impl SortPreservingMergeExec { /// Create a new sort execution plan - pub fn new( - expr: Vec<PhysicalSortExpr>, - input: Arc<dyn ExecutionPlan>, - target_batch_size: usize, - ) -> Self { + pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self { Self { input, expr, - target_batch_size, metrics: ExecutionPlanMetricsSet::new(), } } @@ -129,7 +122,6 @@ impl ExecutionPlan for SortPreservingMergeExec { 1 => Ok(Arc::new(SortPreservingMergeExec::new( self.expr.clone(), children[0].clone(), - self.target_batch_size, ))), _ => Err(DataFusionError::Internal( "SortPreservingMergeExec wrong number of children".to_string(), @@ -182,7 +174,6 @@ impl ExecutionPlan for SortPreservingMergeExec { AbortOnDropMany(join_handles), self.schema(), &self.expr, - self.target_batch_size, baseline_metrics, partition, runtime.clone(), @@ -304,9 +295,6 @@ pub(crate) struct SortPreservingMergeStream { /// The sort options for each expression sort_options: 
Arc<Vec<SortOptions>>, - /// The desired RecordBatch size to yield - target_batch_size: usize, - /// used to record execution metrics baseline_metrics: BaselineMetrics, @@ -333,7 +321,6 @@ impl SortPreservingMergeStream { _drop_helper: AbortOnDropMany<()>, schema: SchemaRef, expressions: &[PhysicalSortExpr], - target_batch_size: usize, baseline_metrics: BaselineMetrics, partition: usize, runtime: Arc<RuntimeEnv>, @@ -354,7 +341,6 @@ impl SortPreservingMergeStream { _drop_helper, column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), sort_options: Arc::new(expressions.iter().map(|x| x.options).collect()), - target_batch_size, baseline_metrics, aborted: false, in_progress: vec![], @@ -367,7 +353,6 @@ impl SortPreservingMergeStream { streams: Vec<SortedStream>, schema: SchemaRef, expressions: &[PhysicalSortExpr], - target_batch_size: usize, baseline_metrics: BaselineMetrics, partition: usize, runtime: Arc<RuntimeEnv>, @@ -392,7 +377,6 @@ impl SortPreservingMergeStream { _drop_helper: AbortOnDropMany(vec![]), column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), sort_options: Arc::new(expressions.iter().map(|x| x.options).collect()), - target_batch_size, baseline_metrics, aborted: false, in_progress: vec![], @@ -628,7 +612,7 @@ impl SortPreservingMergeStream { row_idx, }); - if self.in_progress.len() == self.target_batch_size { + if self.in_progress.len() == self.runtime.batch_size() { return Poll::Ready(Some(self.build_record_batch())); } @@ -665,7 +649,7 @@ mod tests { use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray}; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; - use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig}; + use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{collect, common}; @@ -673,6 +657,7 @@ mod tests { use crate::{assert_batches_eq, test_util}; use super::*; + use crate::execution::runtime_env::RuntimeConfig; use arrow::datatypes::{DataType, Field, Schema}; use futures::{FutureExt, SinkExt}; use tokio_stream::StreamExt; @@ -903,7 +888,7 @@ mod tests { }, ]; let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); - let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024)); + let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); let collected = collect(merge, runtime).await.unwrap(); assert_batches_eq!(exp, collected.as_slice()); @@ -914,7 +899,7 @@ mod tests { sort: Vec<PhysicalSortExpr>, runtime: Arc<RuntimeEnv>, ) -> RecordBatch { - let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 1024)); + let merge = Arc::new(SortPreservingMergeExec::new(sort, input)); let mut result = collect(merge, runtime).await.unwrap(); assert_eq!(result.len(), 1); result.remove(0) @@ -951,13 +936,12 @@ mod tests { test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap(); let csv = Arc::new(CsvExec::new( - PhysicalPlanConfig { + FileScanConfig { object_store: Arc::new(LocalFileSystem {}), file_schema: Arc::clone(&schema), file_groups: files, statistics: Statistics::default(), projection: None, - batch_size: 1024, limit: None, table_partition_cols: vec![], }, @@ -1039,13 +1023,12 @@ mod tests { test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap(); let csv = Arc::new(CsvExec::new( - PhysicalPlanConfig { + 
FileScanConfig { object_store: Arc::new(LocalFileSystem {}), file_schema: schema, file_groups: files, statistics: Statistics::default(), projection: None, - batch_size: 1024, limit: None, table_partition_cols: vec![], }, @@ -1106,7 +1089,8 @@ mod tests { #[tokio::test] async fn test_partition_sort_streaming_input_output() { - let runtime = Arc::new(RuntimeEnv::default()); + let runtime = + Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(23)).unwrap()); let schema = test_util::aggr_test_schema(); let sort = vec![ @@ -1126,7 +1110,7 @@ mod tests { sorted_partitioned_input(sort.clone(), &[10, 5, 13], runtime.clone()).await; let basic = basic_sort(input.clone(), sort.clone(), runtime.clone()).await; - let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 23)); + let merge = Arc::new(SortPreservingMergeExec::new(sort, input)); let merged = collect(merge, runtime.clone()).await.unwrap(); assert_eq!(merged.len(), 14); @@ -1199,7 +1183,7 @@ mod tests { }, ]; let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap(); - let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024)); + let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); let collected = collect(merge, runtime).await.unwrap(); assert_eq!(collected.len(), 1); @@ -1264,7 +1248,6 @@ mod tests { AbortOnDropMany(vec![]), batches.schema(), sort.as_slice(), - 1024, baseline_metrics, 0, runtime.clone(), @@ -1313,7 +1296,7 @@ mod tests { options: Default::default(), }]; let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap(); - let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024)); + let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); let collected = collect(merge.clone(), runtime).await.unwrap(); let expected = vec![ @@ -1366,7 +1349,6 @@ mod tests { options: SortOptions::default(), }], blocking_exec, - 1, )); let fut = collect(sort_preserving_merge_exec, runtime); diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index efbc623..93ecf22 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -229,7 +229,7 @@ mod tests { use crate::{ physical_plan::{ collect, - file_format::{CsvExec, PhysicalPlanConfig}, + file_format::{CsvExec, FileScanConfig}, }, scalar::ScalarValue, }; @@ -246,13 +246,12 @@ mod tests { let (_, files2) = test::create_partitioned_csv("aggregate_test_100.csv", 5)?; let csv = CsvExec::new( - PhysicalPlanConfig { + FileScanConfig { object_store: Arc::clone(&fs), file_schema: Arc::clone(&schema), file_groups: files, statistics: Statistics::default(), projection: None, - batch_size: 1024, limit: None, table_partition_cols: vec![], }, @@ -261,13 +260,12 @@ mod tests { ); let csv2 = CsvExec::new( - PhysicalPlanConfig { + FileScanConfig { object_store: Arc::clone(&fs), file_schema: Arc::clone(&schema), file_groups: files2, statistics: Statistics::default(), projection: None, - batch_size: 1024, limit: None, table_partition_cols: vec![], }, diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs index 42bc27c..243c571 100644 --- a/datafusion/src/physical_plan/windows/mod.rs +++ b/datafusion/src/physical_plan/windows/mod.rs @@ -177,7 +177,7 @@ mod tests { use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::aggregates::AggregateFunction; use crate::physical_plan::expressions::col; - use crate::physical_plan::file_format::{CsvExec, 
PhysicalPlanConfig}; + use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::{collect, Statistics}; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::{self, assert_is_pending}; @@ -192,13 +192,12 @@ mod tests { let (_, files) = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; let csv = CsvExec::new( - PhysicalPlanConfig { + FileScanConfig { object_store: Arc::new(LocalFileSystem {}), file_schema: aggr_test_schema(), file_groups: files, statistics: Statistics::default(), projection: None, - batch_size: 1024, limit: None, table_partition_cols: vec![], }, diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index 6f0daa4..c2511ba 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -201,7 +201,6 @@ impl TableProvider for CustomTableProvider { async fn scan( &self, projection: &Option<Vec<usize>>, - _batch_size: usize, _filters: &[Expr], _limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs index 330e95c..5e14524 100644 --- a/datafusion/tests/provider_filter_pushdown.rs +++ b/datafusion/tests/provider_filter_pushdown.rs @@ -128,7 +128,6 @@ impl TableProvider for CustomProvider { async fn scan( &self, _: &Option<Vec<usize>>, - _: usize, filters: &[Expr], _: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { diff --git a/datafusion/tests/sql/avro.rs b/datafusion/tests/sql/avro.rs index f3c0f0c..d0cdf71 100644 --- a/datafusion/tests/sql/avro.rs +++ b/datafusion/tests/sql/avro.rs @@ -154,7 +154,7 @@ async fn avro_explain() { \n CoalescePartitionsExec\ \n HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ - \n AvroExec: files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], batch_size=8192, limit=None\ + \n AvroExec: files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], limit=None\ \n", ], ]; diff --git a/datafusion/tests/sql/explain_analyze.rs b/datafusion/tests/sql/explain_analyze.rs index 128a0d8..7c1fa69 100644 --- a/datafusion/tests/sql/explain_analyze.rs +++ b/datafusion/tests/sql/explain_analyze.rs @@ -658,7 +658,7 @@ async fn test_physical_plan_display_indent() { " CoalesceBatchesExec: target_batch_size=4096", " FilterExec: c12@1 < CAST(10 AS Float64)", " RepartitionExec: partitioning=RoundRobinBatch(3)", - " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, batch_size=8192, limit=None", + " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None", ]; let data_path = datafusion::test_util::arrow_test_data(); @@ -703,13 +703,13 @@ async fn test_physical_plan_display_indent_multi_children() { " ProjectionExec: expr=[c1@0 as c1]", " ProjectionExec: expr=[c1@0 as c1]", " RepartitionExec: partitioning=RoundRobinBatch(3)", - " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, batch_size=8192, limit=None", + " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 3)", " ProjectionExec: expr=[c2@0 as c2]", " ProjectionExec: expr=[c1@0 as c2]", " RepartitionExec: partitioning=RoundRobinBatch(3)", - " CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, batch_size=8192, limit=None", + 
" CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None", ]; let data_path = datafusion::test_util::arrow_test_data(); @@ -751,7 +751,7 @@ async fn csv_explain() { \n CoalesceBatchesExec: target_batch_size=4096\ \n FilterExec: CAST(c2@1 AS Int64) > 10\ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ - \n CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, batch_size=8192, limit=None\ + \n CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None\ \n" ]]; assert_eq!(expected, actual); diff --git a/datafusion/tests/statistics.rs b/datafusion/tests/statistics.rs index 0e97717..4964baf 100644 --- a/datafusion/tests/statistics.rs +++ b/datafusion/tests/statistics.rs @@ -73,7 +73,6 @@ impl TableProvider for StatisticsValidation { async fn scan( &self, projection: &Option<Vec<usize>>, - _batch_size: usize, filters: &[Expr], // limit is ignored because it is not mandatory for a `TableProvider` to honor it _limit: Option<usize>, diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index a8d19d6..a4d9e97 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -220,9 +220,9 @@ async fn topk_plan() -> Result<()> { let mut ctx = setup_table(make_topk_context()).await?; let expected = vec![ - "| logical_plan after topk | TopK: k=3 |", - "| | Projection: #sales.customer_id, #sales.revenue |", - "| | TableScan: sales projection=Some([0, 1]) |", + "| logical_plan after topk | TopK: k=3 |", + "| | Projection: #sales.customer_id, #sales.revenue |", + "| | TableScan: sales projection=Some([0, 1]) |", ].join("\n"); let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);