This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b577f3  Multiple files per partitions for CSV Avro Json (#1138)
4b577f3 is described below

commit 4b577f374ce0922f61608be25d8d91c59a65c2cf
Author: rdettai <rdet...@gmail.com>
AuthorDate: Wed Oct 20 12:54:14 2021 +0200

    Multiple files per partitions for CSV Avro Json (#1138)
    
    * [feat] multi file partition for csv avro json
    
    * [fix] typos
    
    * [fix] aliasing closure trait
---
 ballista/rust/core/proto/ballista.proto            |   4 +-
 .../core/src/serde/physical_plan/from_proto.rs     |  12 +-
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  22 +-
 datafusion/src/datasource/file_format/avro.rs      |   3 +-
 datafusion/src/datasource/file_format/csv.rs       |   3 +-
 datafusion/src/datasource/file_format/json.rs      |   3 +-
 datafusion/src/physical_plan/file_format/avro.rs   | 158 ++++--------
 datafusion/src/physical_plan/file_format/csv.rs    | 146 ++++--------
 .../src/physical_plan/file_format/file_stream.rs   | 265 +++++++++++++++++++++
 datafusion/src/physical_plan/file_format/json.rs   | 138 +++--------
 datafusion/src/physical_plan/file_format/mod.rs    |   1 +
 datafusion/src/test/mod.rs                         |  19 +-
 12 files changed, 436 insertions(+), 338 deletions(-)
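
In short: the scan executors for CSV, JSON and Avro now take their input as
`file_groups: Vec<Vec<PartitionedFile>>` instead of a flat `files: Vec<PartitionedFile>`,
and each group becomes one output partition whose files are read back to back by the
new FileStream. A minimal standalone sketch of that shape (illustrative only; the real
PartitionedFile carries file metadata, not a bare path):

    // Stand-in for datafusion::datasource::PartitionedFile (assumed shape).
    #[derive(Debug, Clone)]
    struct PartitionedFile {
        path: String,
    }

    fn main() {
        // Mirrors the new `file_groups: Vec<Vec<PartitionedFile>>` field:
        // each inner Vec is one scan partition, read file after file.
        let file_groups: Vec<Vec<PartitionedFile>> = vec![
            vec![
                PartitionedFile { path: "data/part-0.csv".into() },
                PartitionedFile { path: "data/part-1.csv".into() },
            ],
            vec![PartitionedFile { path: "data/part-2.csv".into() }],
        ];

        // Corresponds to Partitioning::UnknownPartitioning(file_groups.len())
        // in the executors below.
        println!("output partitions: {}", file_groups.len());
        println!("first group: {:?}", file_groups[0]);
    }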

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 49b65cf..338c5a6 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -615,7 +615,7 @@ message ParquetScanExecNode {
 }
 
 message CsvScanExecNode {
-  repeated PartitionedFile files = 1;
+  repeated FileGroup file_groups = 1;
   Schema schema = 2;
   bool has_header = 3;
   uint32 batch_size = 4;
@@ -626,7 +626,7 @@ message CsvScanExecNode {
 }
 
 message AvroScanExecNode {
-  repeated PartitionedFile files = 1;
+  repeated FileGroup file_groups = 1;
   Schema schema = 2;
   uint32 batch_size = 4;
   repeated uint32 projection = 6;
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 75dd915..dce354a 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -125,10 +125,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
 
                 Ok(Arc::new(CsvExec::new(
                     Arc::new(LocalFileSystem {}),
-                    scan.files
+                    scan.file_groups
                         .iter()
-                        .map(|f| f.into())
-                        .collect::<Vec<PartitionedFile>>(),
+                        .map(|p| p.into())
+                        .collect::<Vec<Vec<PartitionedFile>>>(),
                     statistics,
                     schema,
                     scan.has_header,
@@ -165,10 +165,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
 
                 Ok(Arc::new(AvroExec::new(
                     Arc::new(LocalFileSystem {}),
-                    scan.files
+                    scan.file_groups
                         .iter()
-                        .map(|f| f.into())
-                        .collect::<Vec<PartitionedFile>>(),
+                        .map(|p| p.into())
+                        .collect::<Vec<Vec<PartitionedFile>>>(),
                     statistics,
                     schema,
                     Some(projection),
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index e5e6347..52285ee 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -244,14 +244,15 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                 ))),
             })
         } else if let Some(exec) = plan.downcast_ref::<CsvExec>() {
+            let file_groups = exec
+                .file_groups()
+                .iter()
+                .map(|p| p.as_slice().into())
+                .collect();
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::CsvScan(
                     protobuf::CsvScanExecNode {
-                        files: exec
-                            .files()
-                            .iter()
-                            .map(|f| f.into())
-                            .collect::<Vec<protobuf::PartitionedFile>>(),
+                        file_groups,
                         statistics: Some((&exec.statistics()).into()),
                         limit: exec
                             .limit()
@@ -301,14 +302,15 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                 )),
             })
         } else if let Some(exec) = plan.downcast_ref::<AvroExec>() {
+            let file_groups = exec
+                .file_groups()
+                .iter()
+                .map(|p| p.as_slice().into())
+                .collect();
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::AvroScan(
                     protobuf::AvroScanExecNode {
-                        files: exec
-                            .files()
-                            .iter()
-                            .map(|f| f.into())
-                            .collect::<Vec<protobuf::PartitionedFile>>(),
+                        file_groups,
                         statistics: Some((&exec.statistics()).into()),
                         limit: exec
                             .limit()
diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs
index 7728747..c632696 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -64,8 +64,7 @@ impl FileFormat for AvroFormat {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = AvroExec::new(
             conf.object_store,
-            // flattening this for now because CsvExec does not support partitioning yet
-            conf.files.into_iter().flatten().collect::<Vec<_>>(),
+            conf.files,
             conf.statistics,
             conf.schema,
             conf.projection,
diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
index 4d75c65..f995994 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -126,8 +126,7 @@ impl FileFormat for CsvFormat {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = CsvExec::new(
             conf.object_store,
-            // flattening this for now because CsvExec does not support partitioning yet
-            conf.files.into_iter().flatten().collect::<Vec<_>>(),
+            conf.files,
             conf.statistics,
             conf.schema,
             self.has_header,
diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs
index 2741da3..a579831 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -96,8 +96,7 @@ impl FileFormat for JsonFormat {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = NdJsonExec::new(
             conf.object_store,
-            // flattening this for now because NdJsonExec does not support partitioning yet
-            conf.files.into_iter().flatten().collect::<Vec<_>>(),
+            conf.files,
             conf.statistics,
             conf.schema,
             conf.projection,
diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs
index 0a57f8b..2420040 100644
--- a/datafusion/src/physical_plan/file_format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -16,36 +16,32 @@
 // under the License.
 
 //! Execution plan for reading line-delimited Avro files
+#[cfg(feature = "avro")]
+use crate::avro_to_arrow;
 use crate::datasource::object_store::ObjectStore;
 use crate::datasource::PartitionedFile;
 use crate::error::{DataFusionError, Result};
-#[cfg(feature = "avro")]
-use crate::physical_plan::RecordBatchStream;
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use arrow::datatypes::{Schema, SchemaRef};
 #[cfg(feature = "avro")]
-use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use arrow::error::ArrowError;
+
 use async_trait::async_trait;
-#[cfg(feature = "avro")]
-use futures::Stream;
 use std::any::Any;
 use std::sync::Arc;
+
 #[cfg(feature = "avro")]
-use std::{
-    io::Read,
-    pin::Pin,
-    task::{Context, Poll},
-};
+use super::file_stream::{BatchIter, FileStream};
 
 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
 pub struct AvroExec {
     object_store: Arc<dyn ObjectStore>,
-    files: Vec<PartitionedFile>,
+    file_groups: Vec<Vec<PartitionedFile>>,
     statistics: Statistics,
-    schema: SchemaRef,
+    file_schema: SchemaRef,
     projection: Option<Vec<usize>>,
     projected_schema: SchemaRef,
     batch_size: usize,
@@ -53,29 +49,28 @@ pub struct AvroExec {
 }
 
 impl AvroExec {
-    /// Create a new JSON reader execution plan provided file list and schema
-    /// TODO: support partitiond file list (Vec<Vec<PartitionedFile>>)
+    /// Create a new Avro reader execution plan provided file list and schema
     pub fn new(
         object_store: Arc<dyn ObjectStore>,
-        files: Vec<PartitionedFile>,
+        file_groups: Vec<Vec<PartitionedFile>>,
         statistics: Statistics,
-        schema: SchemaRef,
+        file_schema: SchemaRef,
         projection: Option<Vec<usize>>,
         batch_size: usize,
         limit: Option<usize>,
     ) -> Self {
         let projected_schema = match &projection {
-            None => Arc::clone(&schema),
+            None => Arc::clone(&file_schema),
             Some(p) => Arc::new(Schema::new(
-                p.iter().map(|i| schema.field(*i).clone()).collect(),
+                p.iter().map(|i| file_schema.field(*i).clone()).collect(),
             )),
         };
 
         Self {
             object_store,
-            files,
+            file_groups,
             statistics,
-            schema,
+            file_schema,
             projection,
             projected_schema,
             batch_size,
@@ -83,12 +78,12 @@ impl AvroExec {
         }
     }
     /// List of data files
-    pub fn files(&self) -> &[PartitionedFile] {
-        &self.files
+    pub fn file_groups(&self) -> &[Vec<PartitionedFile>] {
+        &self.file_groups
     }
     /// The schema before projection
     pub fn file_schema(&self) -> &SchemaRef {
-        &self.schema
+        &self.file_schema
     }
     /// Optional projection for which columns to load
     pub fn projection(&self) -> &Option<Vec<usize>> {
@@ -115,7 +110,7 @@ impl ExecutionPlan for AvroExec {
     }
 
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.files.len())
+        Partitioning::UnknownPartitioning(self.file_groups.len())
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -145,26 +140,39 @@ impl ExecutionPlan for AvroExec {
 
     #[cfg(feature = "avro")]
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        let file = self
-            .object_store
-            .file_reader(self.files[partition].file_meta.sized_file.clone())?
-            .sync_reader()?;
-
         let proj = self.projection.as_ref().map(|p| {
             p.iter()
-                .map(|col_idx| self.schema.field(*col_idx).name())
+                .map(|col_idx| self.file_schema.field(*col_idx).name())
                 .cloned()
                 .collect()
         });
 
-        let avro_reader = crate::avro_to_arrow::Reader::try_new(
-            file,
-            self.schema(),
-            self.batch_size,
-            proj,
-        )?;
+        let batch_size = self.batch_size;
+        let file_schema = Arc::clone(&self.file_schema);
+
+        // The avro reader cannot limit the number of records, so `remaining` is ignored.
+        let fun = move |file, _remaining: &Option<usize>| {
+            let reader_res = avro_to_arrow::Reader::try_new(
+                file,
+                Arc::clone(&file_schema),
+                batch_size,
+                proj.clone(),
+            );
+            match reader_res {
+                Ok(r) => Box::new(r) as BatchIter,
+                Err(e) => Box::new(
+                    vec![Err(ArrowError::ExternalError(Box::new(e)))].into_iter(),
+                ),
+            }
+        };
 
-        Ok(Box::pin(AvroStream::new(avro_reader, self.limit)))
+        Ok(Box::pin(FileStream::new(
+            Arc::clone(&self.object_store),
+            self.file_groups[partition].clone(),
+            fun,
+            Arc::clone(&self.projected_schema),
+            self.limit,
+        )))
     }
 
     fn fmt_as(
@@ -176,12 +184,8 @@ impl ExecutionPlan for AvroExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "AvroExec: files=[{}], batch_size={}, limit={:?}",
-                    self.files
-                        .iter()
-                        .map(|f| f.file_meta.path())
-                        .collect::<Vec<_>>()
-                        .join(", "),
+                    "AvroExec: files={}, batch_size={}, limit={:?}",
+                    super::FileGroupsDisplay(&self.file_groups),
                     self.batch_size,
                     self.limit,
                 )
@@ -194,70 +198,6 @@ impl ExecutionPlan for AvroExec {
     }
 }
 
-#[cfg(feature = "avro")]
-struct AvroStream<'a, R: Read> {
-    reader: crate::avro_to_arrow::Reader<'a, R>,
-    remain: Option<usize>,
-}
-
-#[cfg(feature = "avro")]
-impl<'a, R: Read> AvroStream<'a, R> {
-    fn new(reader: crate::avro_to_arrow::Reader<'a, R>, limit: Option<usize>) -> Self {
-        Self {
-            reader,
-            remain: limit,
-        }
-    }
-}
-
-#[cfg(feature = "avro")]
-impl<R: Read + Unpin> Stream for AvroStream<'_, R> {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        if let Some(remain) = self.remain.as_mut() {
-            if *remain < 1 {
-                return Poll::Ready(None);
-            }
-        }
-
-        Poll::Ready(match self.reader.next() {
-            Ok(Some(item)) => {
-                if let Some(remain) = self.remain.as_mut() {
-                    if *remain >= item.num_rows() {
-                        *remain -= item.num_rows();
-                        Some(Ok(item))
-                    } else {
-                        let len = *remain;
-                        *remain = 0;
-                        Some(Ok(RecordBatch::try_new(
-                            item.schema(),
-                            item.columns()
-                                .iter()
-                                .map(|column| column.slice(0, len))
-                                .collect(),
-                        )?))
-                    }
-                } else {
-                    Some(Ok(item))
-                }
-            }
-            Ok(None) => None,
-            Err(err) => Some(Err(err)),
-        })
-    }
-}
-
-#[cfg(feature = "avro")]
-impl<R: Read + Unpin> RecordBatchStream for AvroStream<'_, R> {
-    fn schema(&self) -> SchemaRef {
-        self.reader.schema()
-    }
-}
-
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
@@ -278,9 +218,9 @@ mod tests {
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
         let avro_exec = AvroExec::new(
             Arc::new(LocalFileSystem {}),
-            vec![PartitionedFile {
+            vec![vec![PartitionedFile {
                 file_meta: local_file_meta(filename.clone()),
-            }],
+            }]],
             Statistics::default(),
             AvroFormat {}
                 .infer_schema(local_object_reader_stream(vec![filename]))
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index 329dd6b..fc82c8f 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -21,29 +21,25 @@ use crate::datasource::object_store::ObjectStore;
 use crate::datasource::PartitionedFile;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
+
 use arrow::csv;
 use arrow::datatypes::{Schema, SchemaRef};
-use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
-use futures::Stream;
 use std::any::Any;
-use std::io::Read;
-use std::pin::Pin;
 use std::sync::Arc;
-use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 
+use super::file_stream::{BatchIter, FileStream};
+
 /// Execution plan for scanning a CSV file
 #[derive(Debug, Clone)]
 pub struct CsvExec {
     object_store: Arc<dyn ObjectStore>,
-    files: Vec<PartitionedFile>,
+    file_groups: Vec<Vec<PartitionedFile>>,
     /// Schema representing the CSV file
-    schema: SchemaRef,
+    file_schema: SchemaRef,
     /// Schema after the projection has been applied
     projected_schema: SchemaRef,
     statistics: Statistics,
@@ -56,13 +52,12 @@ pub struct CsvExec {
 
 impl CsvExec {
     /// Create a new CSV reader execution plan provided file list and schema
-    /// TODO: support partitiond file list (Vec<Vec<PartitionedFile>>)
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         object_store: Arc<dyn ObjectStore>,
-        files: Vec<PartitionedFile>,
+        file_groups: Vec<Vec<PartitionedFile>>,
         statistics: Statistics,
-        schema: SchemaRef,
+        file_schema: SchemaRef,
         has_header: bool,
         delimiter: u8,
         projection: Option<Vec<usize>>,
@@ -70,16 +65,16 @@ impl CsvExec {
         limit: Option<usize>,
     ) -> Self {
         let projected_schema = match &projection {
-            None => Arc::clone(&schema),
+            None => Arc::clone(&file_schema),
             Some(p) => Arc::new(Schema::new(
-                p.iter().map(|i| schema.field(*i).clone()).collect(),
+                p.iter().map(|i| file_schema.field(*i).clone()).collect(),
             )),
         };
 
         Self {
             object_store,
-            files,
-            schema,
+            file_groups,
+            file_schema,
             statistics,
             has_header,
             delimiter,
@@ -91,12 +86,12 @@ impl CsvExec {
     }
 
     /// List of data files
-    pub fn files(&self) -> &[PartitionedFile] {
-        &self.files
+    pub fn file_groups(&self) -> &[Vec<PartitionedFile>] {
+        &self.file_groups
     }
     /// The schema before projection
     pub fn file_schema(&self) -> &SchemaRef {
-        &self.schema
+        &self.file_schema
     }
     /// true if the first line of each file is a header
     pub fn has_header(&self) -> bool {
@@ -134,7 +129,7 @@ impl ExecutionPlan for CsvExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.files.len())
+        Partitioning::UnknownPartitioning(self.file_groups.len())
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -157,20 +152,33 @@ impl ExecutionPlan for CsvExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        let file = self
-            .object_store
-            .file_reader(self.files[partition].file_meta.sized_file.clone())?
-            .sync_reader()?;
+        let batch_size = self.batch_size;
+        let file_schema = Arc::clone(&self.file_schema);
+        let projection = self.projection.clone();
+        let has_header = self.has_header;
+        let delimiter = self.delimiter;
+        let start_line = if has_header { 1 } else { 0 };
 
-        Ok(Box::pin(CsvStream::try_new_from_reader(
-            file,
-            self.schema.clone(),
-            self.has_header,
-            self.delimiter,
-            &self.projection,
-            self.batch_size,
+        let fun = move |file, remaining: &Option<usize>| {
+            let bounds = remaining.map(|x| (0, x + start_line));
+            Box::new(csv::Reader::new(
+                file,
+                Arc::clone(&file_schema),
+                has_header,
+                Some(delimiter),
+                batch_size,
+                bounds,
+                projection.clone(),
+            )) as BatchIter
+        };
+
+        Ok(Box::pin(FileStream::new(
+            Arc::clone(&self.object_store),
+            self.file_groups[partition].clone(),
+            fun,
+            Arc::clone(&self.projected_schema),
             self.limit,
-        )?))
+        )))
     }
 
     fn fmt_as(
@@ -182,12 +190,8 @@ impl ExecutionPlan for CsvExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "CsvExec: files=[{}], has_header={}, batch_size={}, limit={:?}",
-                    self.files
-                        .iter()
-                        .map(|f| f.file_meta.path())
-                        .collect::<Vec<_>>()
-                        .join(", "),
+                    "CsvExec: files={}, has_header={}, batch_size={}, limit={:?}",
+                    super::FileGroupsDisplay(&self.file_groups),
                     self.has_header,
                     self.batch_size,
                     self.limit,
@@ -201,58 +205,6 @@ impl ExecutionPlan for CsvExec {
     }
 }
 
-/// Iterator over batches
-struct CsvStream<R: Read> {
-    /// Arrow CSV reader
-    reader: csv::Reader<R>,
-}
-
-impl<R: Read> CsvStream<R> {
-    /// Create an iterator for a reader
-    pub fn try_new_from_reader(
-        reader: R,
-        schema: SchemaRef,
-        has_header: bool,
-        delimiter: u8,
-        projection: &Option<Vec<usize>>,
-        batch_size: usize,
-        limit: Option<usize>,
-    ) -> Result<CsvStream<R>> {
-        let start_line = if has_header { 1 } else { 0 };
-        let bounds = limit.map(|x| (0, x + start_line));
-
-        let reader = csv::Reader::new(
-            reader,
-            schema,
-            has_header,
-            Some(delimiter),
-            batch_size,
-            bounds,
-            projection.clone(),
-        );
-
-        Ok(Self { reader })
-    }
-}
-
-impl<R: Read + Unpin> Stream for CsvStream<R> {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        Poll::Ready(self.reader.next())
-    }
-}
-
-impl<R: Read + Unpin> RecordBatchStream for CsvStream<R> {
-    /// Get the schema
-    fn schema(&self) -> SchemaRef {
-        self.reader.schema()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -270,9 +222,9 @@ mod tests {
         let path = format!("{}/csv/{}", testdata, filename);
         let csv = CsvExec::new(
             Arc::new(LocalFileSystem {}),
-            vec![PartitionedFile {
+            vec![vec![PartitionedFile {
                 file_meta: local_file_meta(path),
-            }],
+            }]],
             Statistics::default(),
             schema,
             true,
@@ -281,7 +233,7 @@ mod tests {
             1024,
             None,
         );
-        assert_eq!(13, csv.schema.fields().len());
+        assert_eq!(13, csv.file_schema.fields().len());
         assert_eq!(3, csv.projected_schema.fields().len());
         assert_eq!(3, csv.schema().fields().len());
         let mut stream = csv.execute(0).await?;
@@ -303,9 +255,9 @@ mod tests {
         let path = format!("{}/csv/{}", testdata, filename);
         let csv = CsvExec::new(
             Arc::new(LocalFileSystem {}),
-            vec![PartitionedFile {
+            vec![vec![PartitionedFile {
                 file_meta: local_file_meta(path),
-            }],
+            }]],
             Statistics::default(),
             schema,
             true,
@@ -314,7 +266,7 @@ mod tests {
             1024,
             None,
         );
-        assert_eq!(13, csv.schema.fields().len());
+        assert_eq!(13, csv.file_schema.fields().len());
         assert_eq!(13, csv.projected_schema.fields().len());
         assert_eq!(13, csv.schema().fields().len());
         let mut it = csv.execute(0).await?;
diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs
new file mode 100644
index 0000000..55a66f4
--- /dev/null
+++ b/datafusion/src/physical_plan/file_format/file_stream.rs
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A generic stream over file format readers that can be used by
+//! any file format that read its files from start to end.
+//!
+//! Note: Most traits here need to be marked `Sync + Send` to be
+//! compliant with the `SendableRecordBatchStream` trait.
+
+use crate::{
+    datasource::{object_store::ObjectStore, PartitionedFile},
+    error::Result as DataFusionResult,
+    physical_plan::RecordBatchStream,
+};
+use arrow::{
+    datatypes::SchemaRef,
+    error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
+};
+use futures::Stream;
+use std::{
+    io::Read,
+    iter,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+pub type FileIter =
+    Box<dyn Iterator<Item = DataFusionResult<Box<dyn Read + Send + Sync>>> + Send + Sync>;
+pub type BatchIter = Box<dyn Iterator<Item = ArrowResult<RecordBatch>> + Send + Sync>;
+
+/// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object
+/// and an optional number of required records.
+pub trait FormatReaderOpener:
+    FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter + Send + Unpin + 'static
+{
+}
+
+impl<T> FormatReaderOpener for T where
+    T: FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter
+        + Send
+        + Unpin
+        + 'static
+{
+}
+
+/// A stream that iterates record batch by record batch, file over file.
+pub struct FileStream<F: FormatReaderOpener> {
+    /// An iterator over record batches of the last file returned by file_iter
+    batch_iter: BatchIter,
+    /// An iterator over input files
+    file_iter: FileIter,
+    /// The stream schema (file schema after projection)
+    schema: SchemaRef,
+    /// The remaining number of records to parse, None if no limit
+    remain: Option<usize>,
+    /// A closure that takes a reader and an optional remaining number of lines
+    /// (before reaching the limit) and returns a batch iterator. If the file reader
+    /// is not capable of limiting the number of records in the last batch, the file
+    /// stream will take care of truncating it.
+    file_reader: F,
+}
+
+impl<F: FormatReaderOpener> FileStream<F> {
+    pub fn new(
+        object_store: Arc<dyn ObjectStore>,
+        files: Vec<PartitionedFile>,
+        file_reader: F,
+        schema: SchemaRef,
+        limit: Option<usize>,
+    ) -> Self {
+        let read_iter = files.into_iter().map(move |f| -> DataFusionResult<_> {
+            object_store
+                .file_reader(f.file_meta.sized_file)?
+                .sync_reader()
+        });
+
+        Self {
+            file_iter: Box::new(read_iter),
+            batch_iter: Box::new(iter::empty()),
+            remain: limit,
+            schema,
+            file_reader,
+        }
+    }
+
+    /// Acts as a flat_map of record batches over files.
+    fn next_batch(&mut self) -> Option<ArrowResult<RecordBatch>> {
+        match self.batch_iter.next() {
+            Some(batch) => Some(batch),
+            None => match self.file_iter.next() {
+                Some(Ok(f)) => {
+                    self.batch_iter = (self.file_reader)(f, &self.remain);
+                    self.next_batch()
+                }
+                Some(Err(e)) => Some(Err(ArrowError::ExternalError(Box::new(e)))),
+                None => None,
+            },
+        }
+    }
+}
+
+impl<F: FormatReaderOpener> Stream for FileStream<F> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        // check if finished or no limit
+        match self.remain {
+            Some(r) if r == 0 => return Poll::Ready(None),
+            None => return Poll::Ready(self.get_mut().next_batch()),
+            Some(r) => r,
+        };
+
+        Poll::Ready(match self.as_mut().next_batch() {
+            Some(Ok(item)) => {
+                if let Some(remain) = self.remain.as_mut() {
+                    if *remain >= item.num_rows() {
+                        *remain -= item.num_rows();
+                        Some(Ok(item))
+                    } else {
+                        let len = *remain;
+                        *remain = 0;
+                        Some(Ok(RecordBatch::try_new(
+                            item.schema(),
+                            item.columns()
+                                .iter()
+                                .map(|column| column.slice(0, len))
+                                .collect(),
+                        )?))
+                    }
+                } else {
+                    Some(Ok(item))
+                }
+            }
+            other => other,
+        })
+    }
+}
+
+impl<F: FormatReaderOpener> RecordBatchStream for FileStream<F> {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::StreamExt;
+
+    use super::*;
+    use crate::{
+        error::Result,
+        test::{make_partition, object_store::TestObjectStore},
+    };
+
+    /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
+    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
+        let records = vec![make_partition(3), make_partition(2)];
+
+        let source_schema = records[0].schema();
+
+        let reader = move |_file, _remain: &Option<usize>| {
+            // this reader returns the same batch regardless of the file
+            Box::new(records.clone().into_iter().map(Ok)) as BatchIter
+        };
+
+        let file_stream = FileStream::new(
+            TestObjectStore::new_arc(&[("mock_file1", 10), ("mock_file2", 20)]),
+            vec![
+                PartitionedFile::new("mock_file1".to_owned(), 10),
+                PartitionedFile::new("mock_file2".to_owned(), 20),
+            ],
+            reader,
+            source_schema,
+            limit,
+        );
+
+        file_stream
+            .map(|b| b.expect("No error expected in stream"))
+            .collect::<Vec<_>>()
+            .await
+    }
+
+    #[tokio::test]
+    async fn without_limit() -> Result<()> {
+        let batches = create_and_collect(None).await;
+
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn with_limit_between_files() -> Result<()> {
+        let batches = create_and_collect(Some(5)).await;
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "+---+",
+        ], &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn with_limit_at_middle_of_batch() -> Result<()> {
+        let batches = create_and_collect(Some(6)).await;
+        #[rustfmt::skip]
+        crate::assert_batches_eq!(&[
+            "+---+",
+            "| i |",
+            "+---+",
+            "| 0 |",
+            "| 1 |",
+            "| 2 |",
+            "| 0 |",
+            "| 1 |",
+            "| 0 |",
+            "+---+",
+        ], &batches);
+
+        Ok(())
+    }
+}
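
The new FileStream above is what makes multi-file partitions work: it lazily opens each
file of the group, flat-maps the resulting record batches into a single stream, and
truncates the last batch once the optional row limit is reached. A rough synchronous
sketch of that control flow, using integer vectors in place of Arrow RecordBatches
(illustrative only, not the DataFusion API):

    // Chain the batches of several files and stop at `limit` rows, like
    // FileStream::poll_next does with `remain`.
    fn chain_with_limit(
        files: Vec<Vec<Vec<usize>>>, // files -> batches -> rows
        limit: Option<usize>,
    ) -> Vec<Vec<usize>> {
        let mut remain = limit;
        let mut out = Vec::new();
        for batches in files {
            for mut batch in batches {
                match remain {
                    // No limit: pass every batch through untouched.
                    None => out.push(batch),
                    // Limit already reached: stop consuming files.
                    Some(0) => return out,
                    // The whole batch still fits under the limit.
                    Some(r) if r >= batch.len() => {
                        remain = Some(r - batch.len());
                        out.push(batch);
                    }
                    // The limit falls inside this batch: keep the first r rows,
                    // mirroring the column slicing on the last RecordBatch.
                    Some(r) => {
                        batch.truncate(r);
                        remain = Some(0);
                        out.push(batch);
                    }
                }
            }
        }
        out
    }

    fn main() {
        // Two files, each yielding batches of 3 and 2 rows, as in the tests.
        let files = vec![vec![vec![0, 1, 2], vec![0, 1]]; 2];
        // A limit of 6 stops in the middle of the third batch, matching the
        // `with_limit_at_middle_of_batch` case.
        let expected: Vec<Vec<usize>> = vec![vec![0, 1, 2], vec![0, 1], vec![0]];
        assert_eq!(chain_with_limit(files, Some(6)), expected);
    }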
diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs
index 068e53a..f9dde67 100644
--- a/datafusion/src/physical_plan/file_format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -17,36 +17,29 @@
 
 //! Execution plan for reading line-delimited JSON files
 use async_trait::async_trait;
-use futures::Stream;
 
 use crate::datasource::object_store::ObjectStore;
 use crate::datasource::PartitionedFile;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use arrow::{
     datatypes::{Schema, SchemaRef},
-    error::Result as ArrowResult,
     json,
-    record_batch::RecordBatch,
 };
 use std::any::Any;
-use std::{
-    io::Read,
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-};
+use std::sync::Arc;
+
+use super::file_stream::{BatchIter, FileStream};
 
 /// Execution plan for scanning NdJson data source
 #[derive(Debug, Clone)]
 pub struct NdJsonExec {
     object_store: Arc<dyn ObjectStore>,
-    files: Vec<PartitionedFile>,
+    file_groups: Vec<Vec<PartitionedFile>>,
     statistics: Statistics,
-    schema: SchemaRef,
+    file_schema: SchemaRef,
     projection: Option<Vec<usize>>,
     projected_schema: SchemaRef,
     batch_size: usize,
@@ -55,28 +48,27 @@ pub struct NdJsonExec {
 
 impl NdJsonExec {
     /// Create a new JSON reader execution plan provided file list and schema
-    /// TODO: support partitiond file list (Vec<Vec<PartitionedFile>>)
     pub fn new(
         object_store: Arc<dyn ObjectStore>,
-        files: Vec<PartitionedFile>,
+        file_groups: Vec<Vec<PartitionedFile>>,
         statistics: Statistics,
-        schema: SchemaRef,
+        file_schema: SchemaRef,
         projection: Option<Vec<usize>>,
         batch_size: usize,
         limit: Option<usize>,
     ) -> Self {
         let projected_schema = match &projection {
-            None => Arc::clone(&schema),
+            None => Arc::clone(&file_schema),
             Some(p) => Arc::new(Schema::new(
-                p.iter().map(|i| schema.field(*i).clone()).collect(),
+                p.iter().map(|i| file_schema.field(*i).clone()).collect(),
             )),
         };
 
         Self {
             object_store,
-            files,
+            file_groups,
             statistics,
-            schema,
+            file_schema,
             projection,
             projected_schema,
             batch_size,
@@ -96,7 +88,7 @@ impl ExecutionPlan for NdJsonExec {
     }
 
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.files.len())
+        Partitioning::UnknownPartitioning(self.file_groups.len())
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -120,19 +112,31 @@ impl ExecutionPlan for NdJsonExec {
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
         let proj = self.projection.as_ref().map(|p| {
             p.iter()
-                .map(|col_idx| self.schema.field(*col_idx).name())
+                .map(|col_idx| self.file_schema.field(*col_idx).name())
                 .cloned()
                 .collect()
         });
 
-        let file = self
-            .object_store
-            .file_reader(self.files[partition].file_meta.sized_file.clone())?
-            .sync_reader()?;
-
-        let json_reader = json::Reader::new(file, self.schema(), self.batch_size, proj);
+        let batch_size = self.batch_size;
+        let file_schema = Arc::clone(&self.file_schema);
+
+        // The json reader cannot limit the number of records, so `remaining` is ignored.
+        let fun = move |file, _remaining: &Option<usize>| {
+            Box::new(json::Reader::new(
+                file,
+                Arc::clone(&file_schema),
+                batch_size,
+                proj.clone(),
+            )) as BatchIter
+        };
 
-        Ok(Box::pin(NdJsonStream::new(json_reader, self.limit)))
+        Ok(Box::pin(FileStream::new(
+            Arc::clone(&self.object_store),
+            self.file_groups[partition].clone(),
+            fun,
+            Arc::clone(&self.projected_schema),
+            self.limit,
+        )))
     }
 
     fn fmt_as(
@@ -144,14 +148,10 @@ impl ExecutionPlan for NdJsonExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "JsonExec: batch_size={}, limit={:?}, files=[{}]",
+                    "JsonExec: batch_size={}, limit={:?}, files={}",
                     self.batch_size,
                     self.limit,
-                    self.files
-                        .iter()
-                        .map(|f| f.file_meta.path())
-                        .collect::<Vec<_>>()
-                        .join(", ")
+                    super::FileGroupsDisplay(&self.file_groups),
                 )
             }
         }
@@ -162,66 +162,6 @@ impl ExecutionPlan for NdJsonExec {
     }
 }
 
-struct NdJsonStream<R: Read> {
-    reader: json::Reader<R>,
-    remain: Option<usize>,
-}
-
-impl<R: Read> NdJsonStream<R> {
-    fn new(reader: json::Reader<R>, limit: Option<usize>) -> Self {
-        Self {
-            reader,
-            remain: limit,
-        }
-    }
-}
-
-impl<R: Read + Unpin> Stream for NdJsonStream<R> {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        if let Some(remain) = self.remain.as_mut() {
-            if *remain < 1 {
-                return Poll::Ready(None);
-            }
-        }
-
-        Poll::Ready(match self.reader.next() {
-            Ok(Some(item)) => {
-                if let Some(remain) = self.remain.as_mut() {
-                    if *remain >= item.num_rows() {
-                        *remain -= item.num_rows();
-                        Some(Ok(item))
-                    } else {
-                        let len = *remain;
-                        *remain = 0;
-                        Some(Ok(RecordBatch::try_new(
-                            item.schema(),
-                            item.columns()
-                                .iter()
-                                .map(|column| column.slice(0, len))
-                                .collect(),
-                        )?))
-                    }
-                } else {
-                    Some(Ok(item))
-                }
-            }
-            Ok(None) => None,
-            Err(err) => Some(Err(err)),
-        })
-    }
-}
-
-impl<R: Read + Unpin> RecordBatchStream for NdJsonStream<R> {
-    fn schema(&self) -> SchemaRef {
-        self.reader.schema()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use futures::StreamExt;
@@ -249,9 +189,9 @@ mod tests {
         let path = format!("{}/1.json", TEST_DATA_BASE);
         let exec = NdJsonExec::new(
             Arc::new(LocalFileSystem {}),
-            vec![PartitionedFile {
+            vec![vec![PartitionedFile {
                 file_meta: local_file_meta(path.clone()),
-            }],
+            }]],
             Default::default(),
             infer_schema(path).await?,
             None,
@@ -304,9 +244,9 @@ mod tests {
         let path = format!("{}/1.json", TEST_DATA_BASE);
         let exec = NdJsonExec::new(
             Arc::new(LocalFileSystem {}),
-            vec![PartitionedFile {
+            vec![vec![PartitionedFile {
                 file_meta: local_file_meta(path.clone()),
-            }],
+            }]],
             Default::default(),
             infer_schema(path).await?,
             Some(vec![0, 2]),
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index aa4f116..b0b6905 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -19,6 +19,7 @@
 
 mod avro;
 mod csv;
+mod file_stream;
 mod json;
 mod parquet;
 
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 4ad4722..f673eb0 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -57,7 +57,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
 pub fn create_partitioned_csv(
     filename: &str,
     partitions: usize,
-) -> Result<(String, Vec<PartitionedFile>)> {
+) -> Result<(String, Vec<Vec<PartitionedFile>>)> {
     let testdata = crate::test_util::arrow_test_data();
     let path = format!("{}/csv/{}", testdata, filename);
 
@@ -96,15 +96,16 @@ pub fn create_partitioned_csv(
         w.flush().unwrap();
     }
 
-    Ok((
-        tmp_dir.into_path().to_str().unwrap().to_string(),
-        files
-            .into_iter()
-            .map(|f| PartitionedFile {
+    let groups = files
+        .into_iter()
+        .map(|f| {
+            vec![PartitionedFile {
                 file_meta: local_file_meta(f.to_str().unwrap().to_owned()),
-            })
-            .collect(),
-    ))
+            }]
+        })
+        .collect::<Vec<_>>();
+
+    Ok((tmp_dir.into_path().to_str().unwrap().to_string(), groups))
 }
 
 /// Get the schema for the aggregate_test_* csv files
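
For context on how the formats plug into FileStream: CsvExec, NdJsonExec and AvroExec
each build a closure (a FormatReaderOpener) that turns an opened file into an iterator
of record batches, and only the CSV reader makes use of the remaining-rows hint. A toy,
self-contained version of that pattern, with strings and integer batches standing in
for `Read` sources and Arrow types, and the limit hint taken by value rather than by
reference for simplicity:

    // Simplified stand-in for the BatchIter alias in file_stream.rs.
    type BatchIter = Box<dyn Iterator<Item = Vec<u64>>>;

    // Build an "opener" that parses one integer per line and groups the rows
    // into fixed-size batches; `_remaining` plays the role of the limit hint
    // that the CSV reader honours and the JSON/Avro readers ignore.
    fn make_opener(batch_size: usize) -> impl FnMut(String, Option<usize>) -> BatchIter {
        move |contents, _remaining| {
            let rows: Vec<u64> = contents
                .lines()
                .filter_map(|line| line.trim().parse().ok())
                .collect();
            let batches: Vec<Vec<u64>> = rows
                .chunks(batch_size)
                .map(|chunk| chunk.to_vec())
                .collect();
            Box::new(batches.into_iter()) as BatchIter
        }
    }

    fn main() {
        let mut opener = make_opener(2);
        let batches: Vec<Vec<u64>> = opener("1\n2\n3\n".to_string(), None).collect();
        assert_eq!(batches, vec![vec![1_u64, 2], vec![3]]);
    }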
