This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 92a3e45  Consolidate `batch_size` configuration in `ExecutionConfig`, `RuntimeConfig` and `PhysicalPlanConfig` (#1562)
92a3e45 is described below

commit 92a3e454baf8bffc3afd4f1aa0d8adeaa4cc768d
Author: Yijie Shen <henry.yijies...@gmail.com>
AuthorDate: Mon Jan 17 21:51:35 2022 +0800

    Consolidate `batch_size` configuration in `ExecutionConfig`, `RuntimeConfig` and `PhysicalPlanConfig` (#1562)
    
    * Remove batch_size from PhysicalPlanConfig first
    
    * Fix newly emerged lint problems
    
    * Remove batch_size from `ExecutionConfig`, rename `PhysicalPlanConfig` to `FileScanConfig` to make it clearer
    
    * Revert "Fix newly emerged lint problems"
    
    This reverts commit 7be7ccb4558aca5159884e75cc46a3ff68c40922.
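
    For illustration, a minimal sketch of the consolidated flow, assuming only
    the APIs visible in the diff below (`RuntimeConfig::with_batch_size`,
    `RuntimeEnv::new`, `ExecutionPlan::execute`, and `runtime.batch_size()`
    inside operators); the plan `exec` is a hypothetical placeholder, not part
    of this commit:

        use std::sync::Arc;
        use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

        // batch_size is now a runtime-level setting instead of a field on
        // ExecutionConfig or on each per-scan FileScanConfig.
        let runtime = Arc::new(RuntimeEnv::new(
            RuntimeConfig::new().with_batch_size(4096),
        )?);

        // Operators read the batch size at execution time via
        // runtime.batch_size() (see AvroExec/CsvExec/NdJsonExec below);
        // `exec` is an assumed Arc<dyn ExecutionPlan> built elsewhere.
        let stream = exec.execute(0, runtime).await?;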
---
 ballista/rust/core/proto/ballista.proto            |  1 -
 .../rust/core/src/serde/logical_plan/from_proto.rs |  4 +-
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 10 ++--
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  4 +-
 .../core/src/serde/physical_plan/from_proto.rs     |  9 ++--
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  7 ++-
 benchmarks/src/bin/tpch.rs                         | 10 ++--
 datafusion/benches/physical_plan.rs                |  2 +-
 datafusion/benches/sort_limit_query_sql.rs         |  7 ++-
 datafusion/src/datasource/datasource.rs            |  1 -
 datafusion/src/datasource/empty.rs                 |  1 -
 datafusion/src/datasource/file_format/avro.rs      | 54 ++++++++++++----------
 datafusion/src/datasource/file_format/csv.rs       | 22 ++++-----
 datafusion/src/datasource/file_format/json.rs      | 22 ++++-----
 datafusion/src/datasource/file_format/mod.rs       |  4 +-
 datafusion/src/datasource/file_format/parquet.rs   | 30 ++++++------
 datafusion/src/datasource/listing/table.rs         | 12 ++---
 datafusion/src/datasource/memory.rs                | 12 ++---
 datafusion/src/execution/context.rs                | 15 ++----
 datafusion/src/optimizer/filter_push_down.rs       |  1 -
 .../src/physical_optimizer/coalesce_batches.rs     |  2 +-
 datafusion/src/physical_optimizer/repartition.rs   |  8 ++--
 .../src/physical_plan/coalesce_partitions.rs       |  5 +-
 datafusion/src/physical_plan/file_format/avro.rs   | 21 ++++-----
 datafusion/src/physical_plan/file_format/csv.rs    | 24 ++++------
 datafusion/src/physical_plan/file_format/json.rs   | 19 ++++----
 datafusion/src/physical_plan/file_format/mod.rs    | 11 ++---
 .../src/physical_plan/file_format/parquet.rs       | 21 ++++-----
 datafusion/src/physical_plan/filter.rs             |  5 +-
 datafusion/src/physical_plan/limit.rs              |  5 +-
 datafusion/src/physical_plan/mod.rs                |  2 +-
 datafusion/src/physical_plan/planner.rs            |  4 +-
 datafusion/src/physical_plan/projection.rs         |  5 +-
 .../src/physical_plan/sorts/external_sort.rs       |  6 +--
 datafusion/src/physical_plan/sorts/sort.rs         |  5 +-
 .../physical_plan/sorts/sort_preserving_merge.rs   | 44 ++++++------------
 datafusion/src/physical_plan/union.rs              |  8 ++--
 datafusion/src/physical_plan/windows/mod.rs        |  5 +-
 datafusion/tests/custom_sources.rs                 |  1 -
 datafusion/tests/provider_filter_pushdown.rs       |  1 -
 datafusion/tests/sql/avro.rs                       |  2 +-
 datafusion/tests/sql/explain_analyze.rs            |  8 ++--
 datafusion/tests/statistics.rs                     |  1 -
 datafusion/tests/user_defined_plan.rs              |  6 +--
 44 files changed, 188 insertions(+), 259 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index cdd27cf..a0bb841 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -626,7 +626,6 @@ message ScanLimit {
 message FileScanExecConf {
   repeated FileGroup file_groups = 1;
   Schema schema = 2;
-  uint32 batch_size = 3;
   repeated uint32 projection = 4;
   ScanLimit limit = 5;
   Statistics statistics = 6;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index dfac547..427e6bf 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -246,8 +246,8 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                             .collect::<Result<Vec<_>, _>>()?,
                         partition_count as usize,
                     ),
-                    PartitionMethod::RoundRobin(batch_size) => {
-                        Partitioning::RoundRobinBatch(batch_size as usize)
+                    PartitionMethod::RoundRobin(partition_count) => {
+                        Partitioning::RoundRobinBatch(partition_count as usize)
                     }
                 };
 
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 94d4e8e..c09b8a5 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -65,7 +65,7 @@ mod roundtrip_tests {
     async fn roundtrip_repartition() -> Result<()> {
         use datafusion::logical_plan::Partitioning;
 
-        let test_batch_sizes = [usize::MIN, usize::MAX, 43256];
+        let test_partition_counts = [usize::MIN, usize::MAX, 43256];
 
         let test_expr: Vec<Expr> =
             vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
@@ -92,8 +92,8 @@ mod roundtrip_tests {
             .map_err(BallistaError::DataFusionError)?,
         );
 
-        for batch_size in test_batch_sizes.iter() {
-            let rr_repartition = Partitioning::RoundRobinBatch(*batch_size);
+        for partition_count in test_partition_counts.iter() {
+            let rr_repartition = Partitioning::RoundRobinBatch(*partition_count);
 
             let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
@@ -102,7 +102,7 @@ mod roundtrip_tests {
 
             roundtrip_test!(roundtrip_plan);
 
-            let h_repartition = Partitioning::Hash(test_expr.clone(), *batch_size);
+            let h_repartition = Partitioning::Hash(test_expr.clone(), *partition_count);
 
             let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
@@ -111,7 +111,7 @@ mod roundtrip_tests {
 
             roundtrip_test!(roundtrip_plan);
 
-            let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *batch_size);
+            let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *partition_count);
 
             let roundtrip_plan = LogicalPlan::Repartition(Repartition {
                 input: plan.clone(),
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index b783b54..5d77a97 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -826,8 +826,8 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                             partition_count: *partition_count as u64,
                         })
                     }
-                    Partitioning::RoundRobinBatch(batch_size) => {
-                        PartitionMethod::RoundRobin(*batch_size as u64)
+                    Partitioning::RoundRobinBatch(partition_count) => {
+                        PartitionMethod::RoundRobin(*partition_count as u64)
                     }
                 };
 
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index cad27b3..0b3d503 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -48,7 +48,7 @@ use datafusion::logical_plan::{
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::file_format::{
-    AvroExec, CsvExec, ParquetExec, PhysicalPlanConfig,
+    AvroExec, CsvExec, FileScanConfig, ParquetExec,
 };
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::PartitionMode;
@@ -770,10 +770,10 @@ impl TryInto<Statistics> for &protobuf::Statistics {
     }
 }
 
-impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
+impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {
     type Error = BallistaError;
 
-    fn try_into(self) -> Result<PhysicalPlanConfig, Self::Error> {
+    fn try_into(self) -> Result<FileScanConfig, Self::Error> {
         let schema = Arc::new(convert_required!(self.schema)?);
         let projection = self
             .projection
@@ -787,7 +787,7 @@ impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
         };
         let statistics = convert_required!(self.statistics)?;
 
-        Ok(PhysicalPlanConfig {
+        Ok(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
             file_schema: schema,
             file_groups: self
@@ -797,7 +797,6 @@ impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
                 .collect::<Result<Vec<_>, _>>()?,
             statistics,
             projection,
-            batch_size: self.batch_size as usize,
             limit: self.limit.as_ref().map(|sl| sl.limit as usize),
             table_partition_cols: vec![],
         })
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 930f075..4c6db9a 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -43,7 +43,7 @@ use datafusion::physical_plan::{
 };
 use datafusion::physical_plan::{file_format::AvroExec, filter::FilterExec};
 use datafusion::physical_plan::{
-    file_format::PhysicalPlanConfig, hash_aggregate::AggregateMode,
+    file_format::FileScanConfig, hash_aggregate::AggregateMode,
 };
 use datafusion::{
     datasource::PartitionedFile, physical_plan::coalesce_batches::CoalesceBatchesExec,
@@ -677,10 +677,10 @@ impl From<&Statistics> for protobuf::Statistics {
     }
 }
 
-impl TryFrom<&PhysicalPlanConfig> for protobuf::FileScanExecConf {
+impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
     type Error = BallistaError;
     fn try_from(
-        conf: &PhysicalPlanConfig,
+        conf: &FileScanConfig,
     ) -> Result<protobuf::FileScanExecConf, Self::Error> {
         let file_groups = conf
             .file_groups
@@ -700,7 +700,6 @@ impl TryFrom<&PhysicalPlanConfig> for protobuf::FileScanExecConf {
                 .map(|n| *n as u32)
                 .collect(),
             schema: Some(conf.file_schema.as_ref().into()),
-            batch_size: conf.batch_size as u32,
             table_partition_cols: conf.table_partition_cols.to_vec(),
         })
     }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 9563434..0b9fba5 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -277,13 +277,9 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
             println!("Loading table '{}' into memory", table);
             let start = Instant::now();
 
-            let memtable = MemTable::load(
-                table_provider,
-                opt.batch_size,
-                Some(opt.partitions),
-                runtime.clone(),
-            )
-            .await?;
+            let memtable =
+                MemTable::load(table_provider, Some(opt.partitions), runtime.clone())
+                    .await?;
             println!(
                 "Loaded table '{}' into memory in {} ms",
                 table,
diff --git a/datafusion/benches/physical_plan.rs b/datafusion/benches/physical_plan.rs
index e9eb53d..8dd1f49 100644
--- a/datafusion/benches/physical_plan.rs
+++ b/datafusion/benches/physical_plan.rs
@@ -56,7 +56,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
         None,
     )
     .unwrap();
-    let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 8192));
+    let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
 
     let rt = Runtime::new().unwrap();
     let rt_env = Arc::new(RuntimeEnv::default());
diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs
index 8f40928..41f8c17 100644
--- a/datafusion/benches/sort_limit_query_sql.rs
+++ b/datafusion/benches/sort_limit_query_sql.rs
@@ -84,10 +84,9 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
         ctx.state.lock().unwrap().config.target_partitions = 1;
         let runtime = ctx.state.lock().unwrap().runtime_env.clone();
 
-        let mem_table =
-            MemTable::load(Arc::new(csv), 16 * 1024, Some(partitions), runtime)
-                .await
-                .unwrap();
+        let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
+            .await
+            .unwrap();
         ctx.register_table("aggregate_test_100", Arc::new(mem_table))
             .unwrap();
         ctx_holder.lock().unwrap().push(Arc::new(Mutex::new(ctx)))
diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs
index 823b408..1b59c85 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -77,7 +77,6 @@ pub trait TableProvider: Sync + Send {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         filters: &[Expr],
         // limit can be used to reduce the amount scanned
         // from the datasource as a performance optimization.
diff --git a/datafusion/src/datasource/empty.rs b/datafusion/src/datasource/empty.rs
index 380c5a7..9665605 100644
--- a/datafusion/src/datasource/empty.rs
+++ b/datafusion/src/datasource/empty.rs
@@ -53,7 +53,6 @@ impl TableProvider for EmptyTable {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        _batch_size: usize,
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs
index e1ae174..08eb343 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -30,7 +30,7 @@ use crate::avro_to_arrow::read_avro_schema_from_reader;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::file_format::{AvroExec, PhysicalPlanConfig};
+use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
@@ -61,7 +61,7 @@ impl FileFormat for AvroFormat {
 
     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = AvroExec::new(conf);
@@ -81,7 +81,7 @@ mod tests {
     };
 
     use super::*;
-    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use arrow::array::{
         BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
         TimestampMicrosecondArray,
@@ -90,9 +90,9 @@ mod tests {
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
         let projection = None;
-        let runtime = Arc::new(RuntimeEnv::default());
-        let exec = get_exec("alltypes_plain.avro", &projection, 2, 
None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
         let stream = exec.execute(0, runtime).await?;
 
         let tt_batches = stream
@@ -111,9 +111,10 @@ mod tests {
 
     #[tokio::test]
     async fn read_limit() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, 
Some(1)).await?;
-        let batches = collect(exec).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 
Some(1)).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
         assert_eq!(11, batches[0].num_columns());
         assert_eq!(1, batches[0].num_rows());
@@ -123,8 +124,9 @@ mod tests {
 
     #[tokio::test]
     async fn read_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -149,7 +151,7 @@ mod tests {
             x
         );
 
-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
 
         let expected =  vec![
@@ -173,10 +175,11 @@ mod tests {
 
     #[tokio::test]
     async fn read_bool_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
 
-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -201,10 +204,11 @@ mod tests {
 
     #[tokio::test]
     async fn read_i32_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
 
-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -226,10 +230,11 @@ mod tests {
 
     #[tokio::test]
     async fn read_i96_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
 
-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -251,10 +256,11 @@ mod tests {
 
     #[tokio::test]
     async fn read_f32_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
 
-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -279,10 +285,11 @@ mod tests {
 
     #[tokio::test]
     async fn read_f64_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
 
-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -307,10 +314,11 @@ mod tests {
 
     #[tokio::test]
     async fn read_binary_alltypes_plain_avro() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
 
-        let batches = collect(exec).await?;
+        let batches = collect(exec, runtime).await?;
         assert_eq!(batches.len(), 1);
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
@@ -336,7 +344,6 @@ mod tests {
     async fn get_exec(
         file_name: &str,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::arrow_test_data();
@@ -353,13 +360,12 @@ mod tests {
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
                     file_schema,
                     file_groups,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: vec![],
                 },
diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
index 99770a8..f0a70d9 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -29,7 +29,7 @@ use super::FileFormat;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
@@ -123,7 +123,7 @@ impl FileFormat for CsvFormat {
 
     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = CsvExec::new(conf, self.has_header, self.delimiter);
@@ -136,10 +136,10 @@ mod tests {
     use arrow::array::StringArray;
 
     use super::*;
-    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::{
         datasource::{
-            file_format::PhysicalPlanConfig,
+            file_format::FileScanConfig,
             object_store::local::{
                 local_object_reader, local_object_reader_stream,
                 local_unpartitioned_file, LocalFileSystem,
@@ -150,10 +150,10 @@ mod tests {
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
         // skip column 9 that overflows the automaticly discovered column type of i64 (u64 would work)
         let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, 2, 
None).await?;
+        let exec = get_exec("aggregate_test_100.csv", &projection, 
None).await?;
         let stream = exec.execute(0, runtime).await?;
 
         let tt_batches: i32 = stream
@@ -178,7 +178,7 @@ mod tests {
     async fn read_limit() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0, 1, 2, 3]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, 1024, 
Some(1)).await?;
+        let exec = get_exec("aggregate_test_100.csv", &projection, 
Some(1)).await?;
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
         assert_eq!(4, batches[0].num_columns());
@@ -190,7 +190,7 @@ mod tests {
     #[tokio::test]
     async fn infer_schema() -> Result<()> {
         let projection = None;
-        let exec = get_exec("aggregate_test_100.csv", &projection, 1024, 
None).await?;
+        let exec = get_exec("aggregate_test_100.csv", &projection, 
None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -224,7 +224,7 @@ mod tests {
     async fn read_char_column() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0]);
-        let exec = get_exec("aggregate_test_100.csv", &projection, 1024, 
None).await?;
+        let exec = get_exec("aggregate_test_100.csv", &projection, 
None).await?;
 
         let batches = collect(exec, runtime).await.expect("Collect batches");
 
@@ -250,7 +250,6 @@ mod tests {
     async fn get_exec(
         file_name: &str,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::arrow_test_data();
@@ -267,13 +266,12 @@ mod tests {
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
                     file_schema,
                     file_groups,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: vec![],
                 },
diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs
index a8f1176..d7a278d 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -29,7 +29,7 @@ use async_trait::async_trait;
 use futures::StreamExt;
 
 use super::FileFormat;
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -85,7 +85,7 @@ impl FileFormat for JsonFormat {
 
     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = NdJsonExec::new(conf);
@@ -98,10 +98,10 @@ mod tests {
     use arrow::array::Int64Array;
 
     use super::*;
-    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::{
         datasource::{
-            file_format::PhysicalPlanConfig,
+            file_format::FileScanConfig,
             object_store::local::{
                 local_object_reader, local_object_reader_stream,
                 local_unpartitioned_file, LocalFileSystem,
@@ -112,9 +112,9 @@ mod tests {
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
         let projection = None;
-        let exec = get_exec(&projection, 2, None).await?;
+        let exec = get_exec(&projection, None).await?;
         let stream = exec.execute(0, runtime).await?;
 
         let tt_batches: i32 = stream
@@ -139,7 +139,7 @@ mod tests {
     async fn read_limit() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec(&projection, 1024, Some(1)).await?;
+        let exec = get_exec(&projection, Some(1)).await?;
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
         assert_eq!(4, batches[0].num_columns());
@@ -151,7 +151,7 @@ mod tests {
     #[tokio::test]
     async fn infer_schema() -> Result<()> {
         let projection = None;
-        let exec = get_exec(&projection, 1024, None).await?;
+        let exec = get_exec(&projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -168,7 +168,7 @@ mod tests {
     async fn read_int_column() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0]);
-        let exec = get_exec(&projection, 1024, None).await?;
+        let exec = get_exec(&projection, None).await?;
 
         let batches = collect(exec, runtime).await.expect("Collect batches");
 
@@ -196,7 +196,6 @@ mod tests {
 
     async fn get_exec(
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let filename = "tests/jsons/2.json";
@@ -212,13 +211,12 @@ mod tests {
         let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
                     file_schema,
                     file_groups,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: vec![],
                 },
diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs
index 5449161..21da2e1 100644
--- a/datafusion/src/datasource/file_format/mod.rs
+++ b/datafusion/src/datasource/file_format/mod.rs
@@ -29,7 +29,7 @@ use std::sync::Arc;
 use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::file_format::PhysicalPlanConfig;
+use crate::physical_plan::file_format::FileScanConfig;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
 use async_trait::async_trait;
@@ -59,7 +59,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// according to this file format.
     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>>;
 }
diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index 608795a..a947518 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -35,7 +35,7 @@ use parquet::file::serialized_reader::SerializedFileReader;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 
 use super::FileFormat;
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;
 use crate::arrow::datatypes::{DataType, Field};
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::datasource::{create_max_min_accs, get_col_stats};
@@ -104,7 +104,7 @@ impl FileFormat for ParquetFormat {
 
     async fn create_physical_plan(
         &self,
-        conf: PhysicalPlanConfig,
+        conf: FileScanConfig,
         filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // If enable pruning then combine the filters to build the predicate.
@@ -341,7 +341,7 @@ mod tests {
     };
 
     use super::*;
-    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use arrow::array::{
         BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
         TimestampNanosecondArray,
@@ -350,9 +350,9 @@ mod tests {
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, 2, 
None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
None).await?;
         let stream = exec.execute(0, runtime).await?;
 
         let tt_batches = stream
@@ -377,7 +377,7 @@ mod tests {
     async fn read_limit() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, 
Some(1)).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
Some(1)).await?;
 
         // note: even if the limit is set, the executor rounds up to the batch size
         assert_eq!(exec.statistics().num_rows, Some(8));
@@ -395,7 +395,7 @@ mod tests {
     async fn read_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -432,7 +432,7 @@ mod tests {
     async fn read_bool_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
None).await?;
 
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -461,7 +461,7 @@ mod tests {
     async fn read_i32_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
None).await?;
 
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -487,7 +487,7 @@ mod tests {
     async fn read_i96_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
None).await?;
 
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -513,7 +513,7 @@ mod tests {
     async fn read_f32_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
None).await?;
 
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -542,7 +542,7 @@ mod tests {
     async fn read_f64_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
None).await?;
 
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -571,7 +571,7 @@ mod tests {
     async fn read_binary_alltypes_plain_parquet() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, 1024, 
None).await?;
+        let exec = get_exec("alltypes_plain.parquet", &projection, 
None).await?;
 
         let batches = collect(exec, runtime).await?;
         assert_eq!(1, batches.len());
@@ -599,7 +599,6 @@ mod tests {
     async fn get_exec(
         file_name: &str,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
@@ -616,13 +615,12 @@ mod tests {
         let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
         let exec = format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::new(LocalFileSystem {}),
                     file_schema,
                     file_groups,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: vec![],
                 },
diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs
index 22e3f75..ff6d322 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -28,7 +28,7 @@ use crate::{
     logical_plan::Expr,
     physical_plan::{
         empty::EmptyExec,
-        file_format::{PhysicalPlanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
+        file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
         ExecutionPlan, Statistics,
     },
 };
@@ -170,7 +170,6 @@ impl TableProvider for ListingTable {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        batch_size: usize,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -193,13 +192,12 @@ impl TableProvider for ListingTable {
         self.options
             .format
             .create_physical_plan(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: Arc::clone(&self.object_store),
                     file_schema: Arc::clone(&self.file_schema),
                     file_groups: partitioned_file_lists,
                     statistics,
                     projection: projection.clone(),
-                    batch_size,
                     limit,
                     table_partition_cols: self.options.table_partition_cols.clone(),
                 },
@@ -289,7 +287,7 @@ mod tests {
         let table = load_table("alltypes_plain.parquet").await?;
         let projection = None;
         let exec = table
-            .scan(&projection, 1024, &[], None)
+            .scan(&projection, &[], None)
             .await
             .expect("Scan table");
 
@@ -313,7 +311,7 @@ mod tests {
             .await?;
         let table =
             ListingTable::new(Arc::new(LocalFileSystem {}), filename, schema, opt);
-        let exec = table.scan(&None, 1024, &[], None).await?;
+        let exec = table.scan(&None, &[], None).await?;
         assert_eq!(exec.statistics().num_rows, Some(8));
         assert_eq!(exec.statistics().total_byte_size, Some(671));
 
@@ -345,7 +343,7 @@ mod tests {
         let filter = Expr::not_eq(col("p1"), lit("v1"));
 
         let scan = table
-            .scan(&None, 1024, &[filter], None)
+            .scan(&None, &[filter], None)
             .await
             .expect("Empty execution plan");
 
diff --git a/datafusion/src/datasource/memory.rs b/datafusion/src/datasource/memory.rs
index f09e691..c732b17 100644
--- a/datafusion/src/datasource/memory.rs
+++ b/datafusion/src/datasource/memory.rs
@@ -64,12 +64,11 @@ impl MemTable {
     /// Create a mem table by reading from another data source
     pub async fn load(
         t: Arc<dyn TableProvider>,
-        batch_size: usize,
         output_partitions: Option<usize>,
         runtime: Arc<RuntimeEnv>,
     ) -> Result<Self> {
         let schema = t.schema();
-        let exec = t.scan(&None, batch_size, &[], None).await?;
+        let exec = t.scan(&None, &[], None).await?;
         let partition_count = exec.output_partitioning().partition_count();
 
         let tasks = (0..partition_count)
@@ -131,7 +130,6 @@ impl TableProvider for MemTable {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        _batch_size: usize,
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -175,7 +173,7 @@ mod tests {
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;
 
         // scan with projection
-        let exec = provider.scan(&Some(vec![2, 1]), 1024, &[], None).await?;
+        let exec = provider.scan(&Some(vec![2, 1]), &[], None).await?;
         let mut it = exec.execute(0, runtime).await?;
         let batch2 = it.next().await.unwrap()?;
         assert_eq!(2, batch2.schema().fields().len());
@@ -206,7 +204,7 @@ mod tests {
 
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;
 
-        let exec = provider.scan(&None, 1024, &[], None).await?;
+        let exec = provider.scan(&None, &[], None).await?;
         let mut it = exec.execute(0, runtime).await?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
@@ -236,7 +234,7 @@ mod tests {
 
         let projection: Vec<usize> = vec![0, 4];
 
-        match provider.scan(&Some(projection), 1024, &[], None).await {
+        match provider.scan(&Some(projection), &[], None).await {
             Err(DataFusionError::Internal(e)) => {
                 assert_eq!("\"Projection index out of range\"", format!("{:?}", e))
             }
@@ -358,7 +356,7 @@ mod tests {
         let provider =
             MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;
 
-        let exec = provider.scan(&None, 1024, &[], None).await?;
+        let exec = provider.scan(&None, &[], None).await?;
         let mut it = exec.execute(0, runtime).await?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 03a0b4d..7a47b37 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -179,8 +179,7 @@ impl ExecutionContext {
                 .register_catalog(config.default_catalog.clone(), default_catalog);
         }
 
-        let runtime_env =
-            Arc::new(RuntimeEnv::new(config.runtime_config.clone()).unwrap());
+        let runtime_env = Arc::new(RuntimeEnv::new(config.runtime.clone()).unwrap());
 
         Self {
             state: Arc::new(Mutex::new(ExecutionContextState {
@@ -872,8 +871,6 @@ impl QueryPlanner for DefaultQueryPlanner {
 pub struct ExecutionConfig {
     /// Number of partitions for query execution. Increasing partitions can increase concurrency.
     pub target_partitions: usize,
-    /// Default batch size when reading data sources
-    pub batch_size: usize,
     /// Responsible for optimizing a logical plan
     optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
     /// Responsible for optimizing a physical execution plan
@@ -901,14 +898,13 @@ pub struct ExecutionConfig {
     /// Should Datafusion parquet reader using the predicate to prune data
     parquet_pruning: bool,
     /// Runtime configurations such as memory threshold and local disk for spill
-    pub runtime_config: RuntimeConfig,
+    pub runtime: RuntimeConfig,
 }
 
 impl Default for ExecutionConfig {
     fn default() -> Self {
         Self {
             target_partitions: num_cpus::get(),
-            batch_size: 8192,
             optimizers: vec![
                 // Simplify expressions first to maximize the chance
                 // of applying other optimizations
@@ -936,7 +932,7 @@ impl Default for ExecutionConfig {
             repartition_aggregations: true,
             repartition_windows: true,
             parquet_pruning: true,
-            runtime_config: RuntimeConfig::default(),
+            runtime: RuntimeConfig::default(),
         }
     }
 }
@@ -959,7 +955,7 @@ impl ExecutionConfig {
     pub fn with_batch_size(mut self, n: usize) -> Self {
         // batch size must be greater than zero
         assert!(n > 0);
-        self.batch_size = n;
+        self.runtime.batch_size = n;
         self
     }
 
@@ -1057,7 +1053,7 @@ impl ExecutionConfig {
 
     /// Customize runtime config
     pub fn with_runtime_config(mut self, config: RuntimeConfig) -> Self {
-        self.runtime_config = config;
+        self.runtime = config;
         self
     }
 }
@@ -3618,7 +3614,6 @@ mod tests {
             async fn scan(
                 &self,
                 _: &Option<Vec<usize>>,
-                _: usize,
                 _: &[Expr],
                 _: Option<usize>,
             ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index c721712..ef171e1 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -1158,7 +1158,6 @@ mod tests {
         async fn scan(
             &self,
             _: &Option<Vec<usize>>,
-            _: usize,
             _: &[Expr],
             _: Option<usize>,
         ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/src/physical_optimizer/coalesce_batches.rs b/datafusion/src/physical_optimizer/coalesce_batches.rs
index 14616a7..98e65a2 100644
--- a/datafusion/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/src/physical_optimizer/coalesce_batches.rs
@@ -75,7 +75,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
                 // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
                 // implemented. For now, we choose half the configured batch size to avoid copies
                 // when a small number of rows are removed from a batch
-                let target_batch_size = config.batch_size / 2;
+                let target_batch_size = config.runtime.batch_size / 2;
                 Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
             } else {
                 plan.clone()
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 26f77ef..0926ed7 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -111,7 +111,7 @@ mod tests {
 
     use super::*;
     use crate::datasource::PartitionedFile;
-    use crate::physical_plan::file_format::{ParquetExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::projection::ProjectionExec;
     use crate::physical_plan::Statistics;
     use crate::test::object_store::TestObjectStore;
@@ -122,13 +122,12 @@ mod tests {
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
-                PhysicalPlanConfig {
+                FileScanConfig {
                     object_store: TestObjectStore::new_arc(&[("x", 100)]),
                     file_schema,
                     file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
                     statistics: Statistics::default(),
                     projection: None,
-                    batch_size: 2048,
                     limit: None,
                     table_partition_cols: vec![],
                 },
@@ -161,7 +160,7 @@ mod tests {
             Arc::new(ProjectionExec::try_new(
                 vec![],
                 Arc::new(ParquetExec::new(
-                    PhysicalPlanConfig {
+                    FileScanConfig {
                         object_store: TestObjectStore::new_arc(&[("x", 100)]),
                         file_schema,
                         file_groups: vec![vec![PartitionedFile::new(
@@ -170,7 +169,6 @@ mod tests {
                         )]],
                         statistics: Statistics::default(),
                         projection: None,
-                        batch_size: 2048,
                         limit: None,
                         table_partition_cols: vec![],
                     },
diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
index 3fcacbb..2e3a552 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -213,7 +213,7 @@ mod tests {
 
     use super::*;
     use crate::datasource::object_store::local::LocalFileSystem;
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{collect, common};
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
@@ -228,13 +228,12 @@ mod tests {
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: schema,
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs
index 6ab5bf9..9de8ffd 100644
--- a/datafusion/src/physical_plan/file_format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -33,19 +33,19 @@ use std::sync::Arc;
 
 #[cfg(feature = "avro")]
 use super::file_stream::{BatchIter, FileStream};
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;
 
 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
 pub struct AvroExec {
-    base_config: PhysicalPlanConfig,
+    base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
 }
 
 impl AvroExec {
     /// Create a new Avro reader execution plan provided base configurations
-    pub fn new(base_config: PhysicalPlanConfig) -> Self {
+    pub fn new(base_config: FileScanConfig) -> Self {
         let (projected_schema, projected_statistics) = base_config.project();
 
         Self {
@@ -55,7 +55,7 @@ impl AvroExec {
         }
     }
     /// Ref to the base configs
-    pub fn base_config(&self) -> &PhysicalPlanConfig {
+    pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }
 }
@@ -107,11 +107,11 @@ impl ExecutionPlan for AvroExec {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
         let proj = self.base_config.projected_file_column_names();
 
-        let batch_size = self.base_config.batch_size;
+        let batch_size = runtime.batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);
 
         // The avro reader cannot limit the number of records, so `remaining` is ignored.
@@ -149,9 +149,8 @@ impl ExecutionPlan for AvroExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "AvroExec: files={}, batch_size={}, limit={:?}",
+                    "AvroExec: files={}, limit={:?}",
                     super::FileGroupsDisplay(&self.base_config.file_groups),
-                    self.base_config.batch_size,
                     self.base_config.limit,
                 )
             }
@@ -180,7 +179,7 @@ mod tests {
     async fn avro_exec_without_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let avro_exec = AvroExec::new(PhysicalPlanConfig {
+        let avro_exec = AvroExec::new(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
             file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
             file_schema: AvroFormat {}
@@ -188,7 +187,6 @@ mod tests {
                 .await?,
             statistics: Statistics::default(),
             projection: Some(vec![0, 1, 2]),
-            batch_size: 1024,
             limit: None,
             table_partition_cols: vec![],
         });
@@ -241,7 +239,7 @@ mod tests {
             .infer_schema(local_object_reader_stream(vec![filename]))
             .await?;
 
-        let avro_exec = AvroExec::new(PhysicalPlanConfig {
+        let avro_exec = AvroExec::new(FileScanConfig {
             // select specific columns of the files as well as the partitioning
             // column which is supposed to be the last column in the table schema.
             projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
@@ -249,7 +247,6 @@ mod tests {
             file_groups: vec![vec![partitioned_file]],
             file_schema: file_schema,
             statistics: Statistics::default(),
-            batch_size: 1024,
             limit: None,
             table_partition_cols: vec!["date".to_owned()],
         });
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index ea965a4..5cff3b6 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -31,12 +31,12 @@ use crate::execution::runtime_env::RuntimeEnv;
 use async_trait::async_trait;
 
 use super::file_stream::{BatchIter, FileStream};
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;
 
 /// Execution plan for scanning a CSV file
 #[derive(Debug, Clone)]
 pub struct CsvExec {
-    base_config: PhysicalPlanConfig,
+    base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
     has_header: bool,
@@ -45,7 +45,7 @@ pub struct CsvExec {
 
 impl CsvExec {
     /// Create a new CSV reader execution plan provided base and specific configurations
-    pub fn new(base_config: PhysicalPlanConfig, has_header: bool, delimiter: u8) -> Self {
+    pub fn new(base_config: FileScanConfig, has_header: bool, delimiter: u8) -> Self {
         let (projected_schema, projected_statistics) = base_config.project();
 
         Self {
@@ -58,7 +58,7 @@ impl CsvExec {
     }
 
     /// Ref to the base configs
-    pub fn base_config(&self) -> &PhysicalPlanConfig {
+    pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }
     /// true if the first line of each file is a header
@@ -110,9 +110,9 @@ impl ExecutionPlan for CsvExec {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
-        let batch_size = self.base_config.batch_size;
+        let batch_size = runtime.batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);
         let file_projection = self.base_config.file_column_projection_indices();
         let has_header = self.has_header;
@@ -153,10 +153,9 @@ impl ExecutionPlan for CsvExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "CsvExec: files={}, has_header={}, batch_size={}, 
limit={:?}",
+                    "CsvExec: files={}, has_header={}, limit={:?}",
                     super::FileGroupsDisplay(&self.base_config.file_groups),
                     self.has_header,
-                    self.base_config.batch_size,
                     self.base_config.limit,
                 )
             }
@@ -186,13 +185,12 @@ mod tests {
         let filename = "aggregate_test_100.csv";
         let path = format!("{}/csv/{}", testdata, filename);
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema,
                 file_groups: vec![vec![local_unpartitioned_file(path)]],
                 statistics: Statistics::default(),
                 projection: Some(vec![0, 2, 4]),
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
@@ -233,13 +231,12 @@ mod tests {
         let filename = "aggregate_test_100.csv";
         let path = format!("{}/csv/{}", testdata, filename);
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema,
                 file_groups: vec![vec![local_unpartitioned_file(path)]],
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: Some(5),
                 table_partition_cols: vec![],
             },
@@ -285,7 +282,7 @@ mod tests {
         partitioned_file.partition_values =
             vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 // we should be able to project on the partition column
                // which is supposed to be after the file fields
                 projection: Some(vec![0, file_schema.fields().len()]),
@@ -293,7 +290,6 @@ mod tests {
                 file_schema,
                 file_groups: vec![vec![partitioned_file]],
                 statistics: Statistics::default(),
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec!["date".to_owned()],
             },
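
With `batch_size` gone from the scan config, CSV scans size their batches from
the runtime instead. A minimal sketch of the new calling pattern (the `csv`
plan and an enclosing async fn returning `Result` are assumed, not shown):

    use std::sync::Arc;
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

    // Configure the batch size once on the runtime; CsvExec::execute now
    // reads it via runtime.batch_size() instead of carrying its own copy.
    let runtime = Arc::new(RuntimeEnv::new(
        RuntimeConfig::new().with_batch_size(4096),
    )?);
    let stream = csv.execute(0, runtime).await?;
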
diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs
index a0959c2..0fc95d1 100644
--- a/datafusion/src/physical_plan/file_format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -28,19 +28,19 @@ use std::any::Any;
 use std::sync::Arc;
 
 use super::file_stream::{BatchIter, FileStream};
-use super::PhysicalPlanConfig;
+use super::FileScanConfig;
 
 /// Execution plan for scanning NdJson data source
 #[derive(Debug, Clone)]
 pub struct NdJsonExec {
-    base_config: PhysicalPlanConfig,
+    base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
 }
 
 impl NdJsonExec {
     /// Create a new JSON reader execution plan provided base configurations
-    pub fn new(base_config: PhysicalPlanConfig) -> Self {
+    pub fn new(base_config: FileScanConfig) -> Self {
         let (projected_schema, projected_statistics) = base_config.project();
 
         Self {
@@ -86,11 +86,11 @@ impl ExecutionPlan for NdJsonExec {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
         let proj = self.base_config.projected_file_column_names();
 
-        let batch_size = self.base_config.batch_size;
+        let batch_size = runtime.batch_size();
         let file_schema = Arc::clone(&self.base_config.file_schema);
 
         // The json reader cannot limit the number of records, so `remaining` is ignored.
@@ -122,8 +122,7 @@ impl ExecutionPlan for NdJsonExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "JsonExec: batch_size={}, limit={:?}, files={}",
-                    self.base_config.batch_size,
+                    "JsonExec: limit={:?}, files={}",
                     self.base_config.limit,
                     super::FileGroupsDisplay(&self.base_config.file_groups),
                 )
@@ -162,13 +161,12 @@ mod tests {
         let runtime = Arc::new(RuntimeEnv::default());
         use arrow::datatypes::DataType;
         let path = format!("{}/1.json", TEST_DATA_BASE);
-        let exec = NdJsonExec::new(PhysicalPlanConfig {
+        let exec = NdJsonExec::new(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
             file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
             file_schema: infer_schema(path).await?,
             statistics: Statistics::default(),
             projection: None,
-            batch_size: 1024,
             limit: Some(3),
             table_partition_cols: vec![],
         });
@@ -217,13 +215,12 @@ mod tests {
     async fn nd_json_exec_file_projection() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let path = format!("{}/1.json", TEST_DATA_BASE);
-        let exec = NdJsonExec::new(PhysicalPlanConfig {
+        let exec = NdJsonExec::new(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
             file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
             file_schema: infer_schema(path).await?,
             statistics: Statistics::default(),
             projection: Some(vec![0, 2]),
-            batch_size: 1024,
             limit: None,
             table_partition_cols: vec![],
         });
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 17ec9f1..b655cdb 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -57,7 +57,7 @@ lazy_static! {
 /// The base configurations to provide when creating a physical plan for
 /// any given file format.
 #[derive(Debug, Clone)]
-pub struct PhysicalPlanConfig {
+pub struct FileScanConfig {
     /// Store from which the `files` should be fetched
     pub object_store: Arc<dyn ObjectStore>,
     /// Schema before projection. It contains the columns that are expected
@@ -70,15 +70,13 @@ pub struct PhysicalPlanConfig {
     /// Columns on which to project the data. Indexes that are higher than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
     pub projection: Option<Vec<usize>>,
-    /// The maximum number of records per arrow column
-    pub batch_size: usize,
     /// The minimum number of records required from this source plan
     pub limit: Option<usize>,
     /// The partitioning column names
     pub table_partition_cols: Vec<String>,
 }
 
-impl PhysicalPlanConfig {
+impl FileScanConfig {
     /// Project the schema and the statistics on the given column indices
     fn project(&self) -> (SchemaRef, Statistics) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
@@ -475,9 +473,8 @@ mod tests {
         projection: Option<Vec<usize>>,
         statistics: Statistics,
         table_partition_cols: Vec<String>,
-    ) -> PhysicalPlanConfig {
-        PhysicalPlanConfig {
-            batch_size: 1024,
+    ) -> FileScanConfig {
+        FileScanConfig {
             file_schema,
             file_groups: vec![vec![]],
             limit: None,
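
The renamed struct is built exactly as before, minus the `batch_size` field.
A hedged sketch (the schema, file, and store values are placeholders), which
also illustrates the projection rule from the doc comment above: an index
equal to `file_schema.fields().len()` selects the first partition column, not
a file column:

    use std::sync::Arc;
    use datafusion::datasource::object_store::local::LocalFileSystem;
    use datafusion::physical_plan::file_format::FileScanConfig;
    use datafusion::physical_plan::Statistics;

    let config = FileScanConfig {
        object_store: Arc::new(LocalFileSystem {}),
        file_schema,                               // assumed: Arc<Schema> with 3 fields
        file_groups: vec![vec![partitioned_file]], // assumed: one PartitionedFile
        statistics: Statistics::default(),
        // Indices 0-2 are file columns; index 3 is the "date" partition column.
        projection: Some(vec![0, 1, 2, 3]),
        limit: None,
        table_partition_cols: vec!["date".to_owned()],
    };
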
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 73f0b8d..17abb43 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -29,7 +29,7 @@ use crate::{
     logical_plan::{Column, Expr},
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
     physical_plan::{
-        file_format::PhysicalPlanConfig,
+        file_format::FileScanConfig,
         metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
         stream::RecordBatchReceiverStream,
         DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
@@ -67,7 +67,7 @@ use super::PartitionColumnProjector;
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
-    base_config: PhysicalPlanConfig,
+    base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
     /// Execution metrics
@@ -88,7 +88,7 @@ struct ParquetFileMetrics {
 impl ParquetExec {
     /// Create a new Parquet reader execution plan provided file list and schema.
     /// Even if `limit` is set, ParquetExec rounds up the number of records to the next `batch_size`.
-    pub fn new(base_config: PhysicalPlanConfig, predicate: Option<Expr>) -> Self {
+    pub fn new(base_config: FileScanConfig, predicate: Option<Expr>) -> Self {
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: 
{:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, 
base_config.limit);
 
@@ -125,7 +125,7 @@ impl ParquetExec {
     }
 
     /// Ref to the base configs
-    pub fn base_config(&self) -> &PhysicalPlanConfig {
+    pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }
 }
@@ -190,7 +190,7 @@ impl ExecutionPlan for ParquetExec {
     async fn execute(
         &self,
         partition_index: usize,
-        _runtime: Arc<RuntimeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<SendableRecordBatchStream> {
         // because the parquet implementation is not thread-safe, it is necessary to execute
         // on a thread and communicate with channels
@@ -206,7 +206,7 @@ impl ExecutionPlan for ParquetExec {
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
         let pruning_predicate = self.pruning_predicate.clone();
-        let batch_size = self.base_config.batch_size;
+        let batch_size = runtime.batch_size();
         let limit = self.base_config.limit;
         let object_store = Arc::clone(&self.base_config.object_store);
         let partition_col_proj = PartitionColumnProjector::new(
@@ -247,8 +247,7 @@ impl ExecutionPlan for ParquetExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "ParquetExec: batch_size={}, limit={:?}, partitions={}",
-                    self.base_config.batch_size,
+                    "ParquetExec: limit={:?}, partitions={}",
                     self.base_config.limit,
                     super::FileGroupsDisplay(&self.base_config.file_groups)
                 )
@@ -480,7 +479,7 @@ mod tests {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
         let parquet_exec = ParquetExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
                 file_schema: ParquetFormat::default()
@@ -488,7 +487,6 @@ mod tests {
                     .await?,
                 statistics: Statistics::default(),
                 projection: Some(vec![0, 1, 2]),
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
@@ -531,7 +529,7 @@ mod tests {
             ScalarValue::Utf8(Some("26".to_owned())),
         ];
         let parquet_exec = ParquetExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: ParquetFormat::default()
@@ -540,7 +538,6 @@ mod tests {
                 statistics: Statistics::default(),
                 // file has 10 cols so index 12 should be month
                 projection: Some(vec![0, 1, 2, 12]),
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![
                     "year".to_owned(),
diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs
index dab5fac..a071d0e 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -229,7 +229,7 @@ mod tests {
     use super::*;
     use crate::datasource::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::*;
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::ExecutionPlan;
     use crate::scalar::ScalarValue;
     use crate::test;
@@ -247,13 +247,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
 
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: Arc::clone(&schema),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index 3e09636..f022557 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -396,7 +396,7 @@ mod tests {
     use crate::datasource::object_store::local::LocalFileSystem;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::{test, test_util};
 
     #[tokio::test]
@@ -409,13 +409,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
 
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: schema,
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 836d399..b019b52 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -223,7 +223,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
 ///              \n  CoalesceBatchesExec: target_batch_size=4096\
 ///              \n    FilterExec: a@0 < 5\
 ///              \n      RepartitionExec: partitioning=RoundRobinBatch(3)\
-///              \n        CsvExec: files=[tests/example.csv], has_header=true, batch_size=8192, limit=None",
+///              \n        CsvExec: files=[tests/example.csv], has_header=true, limit=None",
 ///               plan_string.trim());
 /// }
 /// ```
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index fa0c101..2dcde9d 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -327,8 +327,6 @@ impl DefaultPhysicalPlanner {
         ctx_state: &'a ExecutionContextState,
     ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
         async move {
-            let batch_size = ctx_state.config.batch_size;
-
             let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
                 LogicalPlan::TableScan(TableScan {
                     source,
@@ -342,7 +340,7 @@ impl DefaultPhysicalPlanner {
                     // referred to in the query
                     let filters = unnormalize_cols(filters.iter().cloned());
                     let unaliased: Vec<Expr> = filters.into_iter().map(unalias).collect();
-                    source.scan(projection, batch_size, &unaliased, *limit).await
+                    source.scan(projection, &unaliased, *limit).await
                 }
                 LogicalPlan::Values(Values {
                     values,
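
The call site above implies the narrowed `TableProvider::scan` signature that
the test providers later in this patch also adopt; roughly:

    async fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

Providers that previously sized their reads from the removed `batch_size`
argument should instead call `runtime.batch_size()` inside
`ExecutionPlan::execute`, as the file sources above now do.
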
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index d86548b..6d9d587 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -290,7 +290,7 @@ mod tests {
     use super::*;
     use crate::datasource::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::{self, col};
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::scalar::ScalarValue;
     use crate::test::{self};
     use crate::test_util;
@@ -306,13 +306,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
 
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: Arc::clone(&schema),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/sorts/external_sort.rs b/datafusion/src/physical_plan/sorts/external_sort.rs
index 8550cb5..6c60aac 100644
--- a/datafusion/src/physical_plan/sorts/external_sort.rs
+++ b/datafusion/src/physical_plan/sorts/external_sort.rs
@@ -153,7 +153,6 @@ impl ExternalSorter {
                 streams,
                 self.schema.clone(),
                 &self.expr,
-                self.runtime.batch_size(),
                 baseline_metrics,
                 partition,
                 self.runtime.clone(),
@@ -523,7 +522,7 @@ mod tests {
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::{
         collect,
-        file_format::{CsvExec, PhysicalPlanConfig},
+        file_format::{CsvExec, FileScanConfig},
     };
     use crate::test;
     use crate::test_util;
@@ -538,13 +537,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
 
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: Arc::clone(&schema),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs
index 678ad03..a210b93 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -323,7 +323,7 @@ mod tests {
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::{
         collect,
-        file_format::{CsvExec, PhysicalPlanConfig},
+        file_format::{CsvExec, FileScanConfig},
     };
     use crate::test::assert_is_pending;
     use crate::test::exec::assert_strong_count_converges_to_zero;
@@ -342,13 +342,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
 
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: Arc::clone(&schema),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
index f3bed69..9f12891 100644
--- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -65,23 +65,16 @@ pub struct SortPreservingMergeExec {
     input: Arc<dyn ExecutionPlan>,
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
-    /// The target size of yielded batches
-    target_batch_size: usize,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
 
 impl SortPreservingMergeExec {
     /// Create a new sort execution plan
-    pub fn new(
-        expr: Vec<PhysicalSortExpr>,
-        input: Arc<dyn ExecutionPlan>,
-        target_batch_size: usize,
-    ) -> Self {
+    pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
         Self {
             input,
             expr,
-            target_batch_size,
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
@@ -129,7 +122,6 @@ impl ExecutionPlan for SortPreservingMergeExec {
             1 => Ok(Arc::new(SortPreservingMergeExec::new(
                 self.expr.clone(),
                 children[0].clone(),
-                self.target_batch_size,
             ))),
             _ => Err(DataFusionError::Internal(
                 "SortPreservingMergeExec wrong number of children".to_string(),
@@ -182,7 +174,6 @@ impl ExecutionPlan for SortPreservingMergeExec {
                         AbortOnDropMany(join_handles),
                         self.schema(),
                         &self.expr,
-                        self.target_batch_size,
                         baseline_metrics,
                         partition,
                         runtime.clone(),
@@ -304,9 +295,6 @@ pub(crate) struct SortPreservingMergeStream {
     /// The sort options for each expression
     sort_options: Arc<Vec<SortOptions>>,
 
-    /// The desired RecordBatch size to yield
-    target_batch_size: usize,
-
     /// used to record execution metrics
     baseline_metrics: BaselineMetrics,
 
@@ -333,7 +321,6 @@ impl SortPreservingMergeStream {
         _drop_helper: AbortOnDropMany<()>,
         schema: SchemaRef,
         expressions: &[PhysicalSortExpr],
-        target_batch_size: usize,
         baseline_metrics: BaselineMetrics,
         partition: usize,
         runtime: Arc<RuntimeEnv>,
@@ -354,7 +341,6 @@ impl SortPreservingMergeStream {
             _drop_helper,
             column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
             sort_options: Arc::new(expressions.iter().map(|x| x.options).collect()),
-            target_batch_size,
             baseline_metrics,
             aborted: false,
             in_progress: vec![],
@@ -367,7 +353,6 @@ impl SortPreservingMergeStream {
         streams: Vec<SortedStream>,
         schema: SchemaRef,
         expressions: &[PhysicalSortExpr],
-        target_batch_size: usize,
         baseline_metrics: BaselineMetrics,
         partition: usize,
         runtime: Arc<RuntimeEnv>,
@@ -392,7 +377,6 @@ impl SortPreservingMergeStream {
             _drop_helper: AbortOnDropMany(vec![]),
             column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
             sort_options: Arc::new(expressions.iter().map(|x| x.options).collect()),
-            target_batch_size,
             baseline_metrics,
             aborted: false,
             in_progress: vec![],
@@ -628,7 +612,7 @@ impl SortPreservingMergeStream {
                 row_idx,
             });
 
-            if self.in_progress.len() == self.target_batch_size {
+            if self.in_progress.len() == self.runtime.batch_size() {
                 return Poll::Ready(Some(self.build_record_batch()));
             }
 
@@ -665,7 +649,7 @@ mod tests {
     use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::sorts::sort::SortExec;
     use crate::physical_plan::{collect, common};
@@ -673,6 +657,7 @@ mod tests {
     use crate::{assert_batches_eq, test_util};
 
     use super::*;
+    use crate::execution::runtime_env::RuntimeConfig;
     use arrow::datatypes::{DataType, Field, Schema};
     use futures::{FutureExt, SinkExt};
     use tokio_stream::StreamExt;
@@ -903,7 +888,7 @@ mod tests {
             },
         ];
         let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
-        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024));
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
 
         let collected = collect(merge, runtime).await.unwrap();
         assert_batches_eq!(exp, collected.as_slice());
@@ -914,7 +899,7 @@ mod tests {
         sort: Vec<PhysicalSortExpr>,
         runtime: Arc<RuntimeEnv>,
     ) -> RecordBatch {
-        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 1024));
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input));
         let mut result = collect(merge, runtime).await.unwrap();
         assert_eq!(result.len(), 1);
         result.remove(0)
@@ -951,13 +936,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
 
         let csv = Arc::new(CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: Arc::clone(&schema),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
@@ -1039,13 +1023,12 @@ mod tests {
             test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
 
         let csv = Arc::new(CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: schema,
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
@@ -1106,7 +1089,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_partition_sort_streaming_input_output() {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let runtime =
+            Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(23)).unwrap());
         let schema = test_util::aggr_test_schema();
 
         let sort = vec![
@@ -1126,7 +1110,7 @@ mod tests {
             sorted_partitioned_input(sort.clone(), &[10, 5, 13], runtime.clone()).await;
         let basic = basic_sort(input.clone(), sort.clone(), runtime.clone()).await;
 
-        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 23));
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input));
         let merged = collect(merge, runtime.clone()).await.unwrap();
 
         assert_eq!(merged.len(), 14);
@@ -1199,7 +1183,7 @@ mod tests {
             },
         ];
         let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
-        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024));
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
 
         let collected = collect(merge, runtime).await.unwrap();
         assert_eq!(collected.len(), 1);
@@ -1264,7 +1248,6 @@ mod tests {
             AbortOnDropMany(vec![]),
             batches.schema(),
             sort.as_slice(),
-            1024,
             baseline_metrics,
             0,
             runtime.clone(),
@@ -1313,7 +1296,7 @@ mod tests {
             options: Default::default(),
         }];
         let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
-        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024));
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
 
         let collected = collect(merge.clone(), runtime).await.unwrap();
         let expected = vec![
@@ -1366,7 +1349,6 @@ mod tests {
                 options: SortOptions::default(),
             }],
             blocking_exec,
-            1,
         ));
 
         let fut = collect(sort_preserving_merge_exec, runtime);
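
`SortPreservingMergeExec` loses its `target_batch_size` argument along the
same lines: merged output batches are now sized by the runtime. A small
sketch (the `sort` expressions and `input` plan are assumed; the module path
follows this repository's layout):

    use std::sync::Arc;
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
    use datafusion::physical_plan::collect;
    use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;

    // Before: SortPreservingMergeExec::new(sort, input, 1024)
    let merge = Arc::new(SortPreservingMergeExec::new(sort, input));
    // After: the merge yields batches of runtime.batch_size() rows.
    let runtime =
        Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(1024)).unwrap());
    let batches = collect(merge, runtime).await?;
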
diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs
index efbc623..93ecf22 100644
--- a/datafusion/src/physical_plan/union.rs
+++ b/datafusion/src/physical_plan/union.rs
@@ -229,7 +229,7 @@ mod tests {
     use crate::{
         physical_plan::{
             collect,
-            file_format::{CsvExec, PhysicalPlanConfig},
+            file_format::{CsvExec, FileScanConfig},
         },
         scalar::ScalarValue,
     };
@@ -246,13 +246,12 @@ mod tests {
         let (_, files2) = test::create_partitioned_csv("aggregate_test_100.csv", 5)?;
 
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::clone(&fs),
                 file_schema: Arc::clone(&schema),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
@@ -261,13 +260,12 @@ mod tests {
         );
 
         let csv2 = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::clone(&fs),
                 file_schema: Arc::clone(&schema),
                 file_groups: files2,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
index 42bc27c..243c571 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -177,7 +177,7 @@ mod tests {
     use crate::execution::runtime_env::RuntimeEnv;
     use crate::physical_plan::aggregates::AggregateFunction;
     use crate::physical_plan::expressions::col;
-    use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{collect, Statistics};
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
@@ -192,13 +192,12 @@ mod tests {
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
         let csv = CsvExec::new(
-            PhysicalPlanConfig {
+            FileScanConfig {
                 object_store: Arc::new(LocalFileSystem {}),
                 file_schema: aggr_test_schema(),
                 file_groups: files,
                 statistics: Statistics::default(),
                 projection: None,
-                batch_size: 1024,
                 limit: None,
                 table_partition_cols: vec![],
             },
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index 6f0daa4..c2511ba 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -201,7 +201,6 @@ impl TableProvider for CustomTableProvider {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        _batch_size: usize,
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
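
For a custom provider like this one, the migration is mechanical: drop the
batch-size parameter and keep everything else. A sketch, assuming the test's
`CustomExecutionPlan` simply records the projection (the other required
trait methods are elided):

    #[async_trait]
    impl TableProvider for CustomTableProvider {
        async fn scan(
            &self,
            projection: &Option<Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            // No batch_size parameter any more; if the produced plan needs
            // one, it reads runtime.batch_size() in ExecutionPlan::execute.
            Ok(Arc::new(CustomExecutionPlan {
                projection: projection.clone(),
            }))
        }
    }
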
diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs
index 330e95c..5e14524 100644
--- a/datafusion/tests/provider_filter_pushdown.rs
+++ b/datafusion/tests/provider_filter_pushdown.rs
@@ -128,7 +128,6 @@ impl TableProvider for CustomProvider {
     async fn scan(
         &self,
         _: &Option<Vec<usize>>,
-        _: usize,
         filters: &[Expr],
         _: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/tests/sql/avro.rs b/datafusion/tests/sql/avro.rs
index f3c0f0c..d0cdf71 100644
--- a/datafusion/tests/sql/avro.rs
+++ b/datafusion/tests/sql/avro.rs
@@ -154,7 +154,7 @@ async fn avro_explain() {
             \n    CoalescePartitionsExec\
             \n      HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
             \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
-            \n          AvroExec: files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], batch_size=8192, limit=None\
+            \n          AvroExec: files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], limit=None\
             \n",
         ],
     ];
diff --git a/datafusion/tests/sql/explain_analyze.rs b/datafusion/tests/sql/explain_analyze.rs
index 128a0d8..7c1fa69 100644
--- a/datafusion/tests/sql/explain_analyze.rs
+++ b/datafusion/tests/sql/explain_analyze.rs
@@ -658,7 +658,7 @@ async fn test_physical_plan_display_indent() {
         "                CoalesceBatchesExec: target_batch_size=4096",
         "                  FilterExec: c12@1 < CAST(10 AS Float64)",
         "                    RepartitionExec: partitioning=RoundRobinBatch(3)",
-        "                      CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, batch_size=8192, limit=None",
+        "                      CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None",
     ];
 
     let data_path = datafusion::test_util::arrow_test_data();
@@ -703,13 +703,13 @@ async fn test_physical_plan_display_indent_multi_children() {
         "          ProjectionExec: expr=[c1@0 as c1]",
         "            ProjectionExec: expr=[c1@0 as c1]",
         "              RepartitionExec: partitioning=RoundRobinBatch(3)",
-        "                CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, batch_size=8192, limit=None",
+        "                CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None",
         "      CoalesceBatchesExec: target_batch_size=4096",
         "        RepartitionExec: partitioning=Hash([Column { name: \"c2\", 
index: 0 }], 3)",
         "          ProjectionExec: expr=[c2@0 as c2]",
         "            ProjectionExec: expr=[c1@0 as c2]",
         "              RepartitionExec: partitioning=RoundRobinBatch(3)",
-        "                CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, batch_size=8192, limit=None",
+        "                CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None",
     ];
 
     let data_path = datafusion::test_util::arrow_test_data();
@@ -751,7 +751,7 @@ async fn csv_explain() {
               \n  CoalesceBatchesExec: target_batch_size=4096\
               \n    FilterExec: CAST(c2@1 AS Int64) > 10\
               \n      RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
-              \n        CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, batch_size=8192, limit=None\
+              \n        CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None\
         ]];
     assert_eq!(expected, actual);
diff --git a/datafusion/tests/statistics.rs b/datafusion/tests/statistics.rs
index 0e97717..4964baf 100644
--- a/datafusion/tests/statistics.rs
+++ b/datafusion/tests/statistics.rs
@@ -73,7 +73,6 @@ impl TableProvider for StatisticsValidation {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        _batch_size: usize,
         filters: &[Expr],
         // limit is ignored because it is not mandatory for a `TableProvider` to honor it
         _limit: Option<usize>,
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index a8d19d6..a4d9e97 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -220,9 +220,9 @@ async fn topk_plan() -> Result<()> {
     let mut ctx = setup_table(make_topk_context()).await?;
 
     let expected = vec![
-        "| logical_plan after topk                               | TopK: k=3                                                                                  |",
-        "|                                                       |   Projection: #sales.customer_id, #sales.revenue                                          |",
-        "|                                                       |     TableScan: sales projection=Some([0, 1])                                              |",
+        "| logical_plan after topk                               | TopK: k=3                                                                      |",
+        "|                                                       |   Projection: #sales.customer_id, #sales.revenue                              |",
+        "|                                                       |     TableScan: sales projection=Some([0, 1])                                  |",
     ].join("\n");
 
     let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);
