This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push: new 4b577f3 Multiple files per partitions for CSV Avro Json (#1138) 4b577f3 is described below commit 4b577f374ce0922f61608be25d8d91c59a65c2cf Author: rdettai <rdet...@gmail.com> AuthorDate: Wed Oct 20 12:54:14 2021 +0200 Multiple files per partitions for CSV Avro Json (#1138) * [feat] multi file partition for csv avro json * [fix] typos * [fix] aliasing closure trait --- ballista/rust/core/proto/ballista.proto | 4 +- .../core/src/serde/physical_plan/from_proto.rs | 12 +- .../rust/core/src/serde/physical_plan/to_proto.rs | 22 +- datafusion/src/datasource/file_format/avro.rs | 3 +- datafusion/src/datasource/file_format/csv.rs | 3 +- datafusion/src/datasource/file_format/json.rs | 3 +- datafusion/src/physical_plan/file_format/avro.rs | 158 ++++-------- datafusion/src/physical_plan/file_format/csv.rs | 146 ++++-------- .../src/physical_plan/file_format/file_stream.rs | 265 +++++++++++++++++++++ datafusion/src/physical_plan/file_format/json.rs | 138 +++-------- datafusion/src/physical_plan/file_format/mod.rs | 1 + datafusion/src/test/mod.rs | 19 +- 12 files changed, 436 insertions(+), 338 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 49b65cf..338c5a6 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -615,7 +615,7 @@ message ParquetScanExecNode { } message CsvScanExecNode { - repeated PartitionedFile files = 1; + repeated FileGroup file_groups = 1; Schema schema = 2; bool has_header = 3; uint32 batch_size = 4; @@ -626,7 +626,7 @@ message CsvScanExecNode { } message AvroScanExecNode { - repeated PartitionedFile files = 1; + repeated FileGroup file_groups = 1; Schema schema = 2; uint32 batch_size = 4; repeated uint32 projection = 6; diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 75dd915..dce354a 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -125,10 +125,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode { Ok(Arc::new(CsvExec::new( Arc::new(LocalFileSystem {}), - scan.files + scan.file_groups .iter() - .map(|f| f.into()) - .collect::<Vec<PartitionedFile>>(), + .map(|p| p.into()) + .collect::<Vec<Vec<PartitionedFile>>>(), statistics, schema, scan.has_header, @@ -165,10 +165,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode { Ok(Arc::new(AvroExec::new( Arc::new(LocalFileSystem {}), - scan.files + scan.file_groups .iter() - .map(|f| f.into()) - .collect::<Vec<PartitionedFile>>(), + .map(|p| p.into()) + .collect::<Vec<Vec<PartitionedFile>>>(), statistics, schema, Some(projection), diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index e5e6347..52285ee 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -244,14 +244,15 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> { ))), }) } else if let Some(exec) = plan.downcast_ref::<CsvExec>() { + let file_groups = exec + .file_groups() + .iter() + .map(|p| p.as_slice().into()) + .collect(); Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::CsvScan( protobuf::CsvScanExecNode { - files: exec - .files() - .iter() - .map(|f| f.into()) - .collect::<Vec<protobuf::PartitionedFile>>(), + file_groups, statistics: Some((&exec.statistics()).into()), limit: exec .limit() @@ -301,14 +302,15 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> { )), }) } else if let Some(exec) = plan.downcast_ref::<AvroExec>() { + let file_groups = exec + .file_groups() + .iter() + .map(|p| p.as_slice().into()) + .collect(); Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::AvroScan( protobuf::AvroScanExecNode { - files: exec - .files() - .iter() - .map(|f| f.into()) - .collect::<Vec<protobuf::PartitionedFile>>(), + file_groups, statistics: Some((&exec.statistics()).into()), limit: exec .limit() diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs index 7728747..c632696 100644 --- a/datafusion/src/datasource/file_format/avro.rs +++ b/datafusion/src/datasource/file_format/avro.rs @@ -64,8 +64,7 @@ impl FileFormat for AvroFormat { ) -> Result<Arc<dyn ExecutionPlan>> { let exec = AvroExec::new( conf.object_store, - // flattening this for now because CsvExec does not support partitioning yet - conf.files.into_iter().flatten().collect::<Vec<_>>(), + conf.files, conf.statistics, conf.schema, conf.projection, diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs index 4d75c65..f995994 100644 --- a/datafusion/src/datasource/file_format/csv.rs +++ b/datafusion/src/datasource/file_format/csv.rs @@ -126,8 +126,7 @@ impl FileFormat for CsvFormat { ) -> Result<Arc<dyn ExecutionPlan>> { let exec = CsvExec::new( conf.object_store, - // flattening this for now because CsvExec does not support partitioning yet - conf.files.into_iter().flatten().collect::<Vec<_>>(), + conf.files, conf.statistics, conf.schema, self.has_header, diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs index 2741da3..a579831 100644 --- a/datafusion/src/datasource/file_format/json.rs +++ b/datafusion/src/datasource/file_format/json.rs @@ -96,8 +96,7 @@ impl FileFormat for JsonFormat { ) -> Result<Arc<dyn ExecutionPlan>> { let exec = NdJsonExec::new( conf.object_store, - // flattening this for now because NdJsonExec does not support partitioning yet - conf.files.into_iter().flatten().collect::<Vec<_>>(), + conf.files, conf.statistics, conf.schema, conf.projection, diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs index 0a57f8b..2420040 100644 --- a/datafusion/src/physical_plan/file_format/avro.rs +++ b/datafusion/src/physical_plan/file_format/avro.rs @@ -16,36 +16,32 @@ // under the License. //! Execution plan for reading line-delimited Avro files +#[cfg(feature = "avro")] +use crate::avro_to_arrow; use crate::datasource::object_store::ObjectStore; use crate::datasource::PartitionedFile; use crate::error::{DataFusionError, Result}; -#[cfg(feature = "avro")] -use crate::physical_plan::RecordBatchStream; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use arrow::datatypes::{Schema, SchemaRef}; #[cfg(feature = "avro")] -use arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; +use arrow::error::ArrowError; + use async_trait::async_trait; -#[cfg(feature = "avro")] -use futures::Stream; use std::any::Any; use std::sync::Arc; + #[cfg(feature = "avro")] -use std::{ - io::Read, - pin::Pin, - task::{Context, Poll}, -}; +use super::file_stream::{BatchIter, FileStream}; /// Execution plan for scanning Avro data source #[derive(Debug, Clone)] pub struct AvroExec { object_store: Arc<dyn ObjectStore>, - files: Vec<PartitionedFile>, + file_groups: Vec<Vec<PartitionedFile>>, statistics: Statistics, - schema: SchemaRef, + file_schema: SchemaRef, projection: Option<Vec<usize>>, projected_schema: SchemaRef, batch_size: usize, @@ -53,29 +49,28 @@ pub struct AvroExec { } impl AvroExec { - /// Create a new JSON reader execution plan provided file list and schema - /// TODO: support partitiond file list (Vec<Vec<PartitionedFile>>) + /// Create a new Avro reader execution plan provided file list and schema pub fn new( object_store: Arc<dyn ObjectStore>, - files: Vec<PartitionedFile>, + file_groups: Vec<Vec<PartitionedFile>>, statistics: Statistics, - schema: SchemaRef, + file_schema: SchemaRef, projection: Option<Vec<usize>>, batch_size: usize, limit: Option<usize>, ) -> Self { let projected_schema = match &projection { - None => Arc::clone(&schema), + None => Arc::clone(&file_schema), Some(p) => Arc::new(Schema::new( - p.iter().map(|i| schema.field(*i).clone()).collect(), + p.iter().map(|i| file_schema.field(*i).clone()).collect(), )), }; Self { object_store, - files, + file_groups, statistics, - schema, + file_schema, projection, projected_schema, batch_size, @@ -83,12 +78,12 @@ impl AvroExec { } } /// List of data files - pub fn files(&self) -> &[PartitionedFile] { - &self.files + pub fn file_groups(&self) -> &[Vec<PartitionedFile>] { + &self.file_groups } /// The schema before projection pub fn file_schema(&self) -> &SchemaRef { - &self.schema + &self.file_schema } /// Optional projection for which columns to load pub fn projection(&self) -> &Option<Vec<usize>> { @@ -115,7 +110,7 @@ impl ExecutionPlan for AvroExec { } fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.files.len()) + Partitioning::UnknownPartitioning(self.file_groups.len()) } fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { @@ -145,26 +140,39 @@ impl ExecutionPlan for AvroExec { #[cfg(feature = "avro")] async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { - let file = self - .object_store - .file_reader(self.files[partition].file_meta.sized_file.clone())? - .sync_reader()?; - let proj = self.projection.as_ref().map(|p| { p.iter() - .map(|col_idx| self.schema.field(*col_idx).name()) + .map(|col_idx| self.file_schema.field(*col_idx).name()) .cloned() .collect() }); - let avro_reader = crate::avro_to_arrow::Reader::try_new( - file, - self.schema(), - self.batch_size, - proj, - )?; + let batch_size = self.batch_size; + let file_schema = Arc::clone(&self.file_schema); + + // The avro reader cannot limit the number of records, so `remaining` is ignored. + let fun = move |file, _remaining: &Option<usize>| { + let reader_res = avro_to_arrow::Reader::try_new( + file, + Arc::clone(&file_schema), + batch_size, + proj.clone(), + ); + match reader_res { + Ok(r) => Box::new(r) as BatchIter, + Err(e) => Box::new( + vec![Err(ArrowError::ExternalError(Box::new(e)))].into_iter(), + ), + } + }; - Ok(Box::pin(AvroStream::new(avro_reader, self.limit))) + Ok(Box::pin(FileStream::new( + Arc::clone(&self.object_store), + self.file_groups[partition].clone(), + fun, + Arc::clone(&self.projected_schema), + self.limit, + ))) } fn fmt_as( @@ -176,12 +184,8 @@ impl ExecutionPlan for AvroExec { DisplayFormatType::Default => { write!( f, - "AvroExec: files=[{}], batch_size={}, limit={:?}", - self.files - .iter() - .map(|f| f.file_meta.path()) - .collect::<Vec<_>>() - .join(", "), + "AvroExec: files={}, batch_size={}, limit={:?}", + super::FileGroupsDisplay(&self.file_groups), self.batch_size, self.limit, ) @@ -194,70 +198,6 @@ impl ExecutionPlan for AvroExec { } } -#[cfg(feature = "avro")] -struct AvroStream<'a, R: Read> { - reader: crate::avro_to_arrow::Reader<'a, R>, - remain: Option<usize>, -} - -#[cfg(feature = "avro")] -impl<'a, R: Read> AvroStream<'a, R> { - fn new(reader: crate::avro_to_arrow::Reader<'a, R>, limit: Option<usize>) -> Self { - Self { - reader, - remain: limit, - } - } -} - -#[cfg(feature = "avro")] -impl<R: Read + Unpin> Stream for AvroStream<'_, R> { - type Item = ArrowResult<RecordBatch>; - - fn poll_next( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - if let Some(remain) = self.remain.as_mut() { - if *remain < 1 { - return Poll::Ready(None); - } - } - - Poll::Ready(match self.reader.next() { - Ok(Some(item)) => { - if let Some(remain) = self.remain.as_mut() { - if *remain >= item.num_rows() { - *remain -= item.num_rows(); - Some(Ok(item)) - } else { - let len = *remain; - *remain = 0; - Some(Ok(RecordBatch::try_new( - item.schema(), - item.columns() - .iter() - .map(|column| column.slice(0, len)) - .collect(), - )?)) - } - } else { - Some(Ok(item)) - } - } - Ok(None) => None, - Err(err) => Some(Err(err)), - }) - } -} - -#[cfg(feature = "avro")] -impl<R: Read + Unpin> RecordBatchStream for AvroStream<'_, R> { - fn schema(&self) -> SchemaRef { - self.reader.schema() - } -} - #[cfg(test)] #[cfg(feature = "avro")] mod tests { @@ -278,9 +218,9 @@ mod tests { let filename = format!("{}/avro/alltypes_plain.avro", testdata); let avro_exec = AvroExec::new( Arc::new(LocalFileSystem {}), - vec![PartitionedFile { + vec![vec![PartitionedFile { file_meta: local_file_meta(filename.clone()), - }], + }]], Statistics::default(), AvroFormat {} .infer_schema(local_object_reader_stream(vec![filename])) diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 329dd6b..fc82c8f 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -21,29 +21,25 @@ use crate::datasource::object_store::ObjectStore; use crate::datasource::PartitionedFile; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; + use arrow::csv; use arrow::datatypes::{Schema, SchemaRef}; -use arrow::error::Result as ArrowResult; -use arrow::record_batch::RecordBatch; -use futures::Stream; use std::any::Any; -use std::io::Read; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use async_trait::async_trait; +use super::file_stream::{BatchIter, FileStream}; + /// Execution plan for scanning a CSV file #[derive(Debug, Clone)] pub struct CsvExec { object_store: Arc<dyn ObjectStore>, - files: Vec<PartitionedFile>, + file_groups: Vec<Vec<PartitionedFile>>, /// Schema representing the CSV file - schema: SchemaRef, + file_schema: SchemaRef, /// Schema after the projection has been applied projected_schema: SchemaRef, statistics: Statistics, @@ -56,13 +52,12 @@ pub struct CsvExec { impl CsvExec { /// Create a new CSV reader execution plan provided file list and schema - /// TODO: support partitiond file list (Vec<Vec<PartitionedFile>>) #[allow(clippy::too_many_arguments)] pub fn new( object_store: Arc<dyn ObjectStore>, - files: Vec<PartitionedFile>, + file_groups: Vec<Vec<PartitionedFile>>, statistics: Statistics, - schema: SchemaRef, + file_schema: SchemaRef, has_header: bool, delimiter: u8, projection: Option<Vec<usize>>, @@ -70,16 +65,16 @@ impl CsvExec { limit: Option<usize>, ) -> Self { let projected_schema = match &projection { - None => Arc::clone(&schema), + None => Arc::clone(&file_schema), Some(p) => Arc::new(Schema::new( - p.iter().map(|i| schema.field(*i).clone()).collect(), + p.iter().map(|i| file_schema.field(*i).clone()).collect(), )), }; Self { object_store, - files, - schema, + file_groups, + file_schema, statistics, has_header, delimiter, @@ -91,12 +86,12 @@ impl CsvExec { } /// List of data files - pub fn files(&self) -> &[PartitionedFile] { - &self.files + pub fn file_groups(&self) -> &[Vec<PartitionedFile>] { + &self.file_groups } /// The schema before projection pub fn file_schema(&self) -> &SchemaRef { - &self.schema + &self.file_schema } /// true if the first line of each file is a header pub fn has_header(&self) -> bool { @@ -134,7 +129,7 @@ impl ExecutionPlan for CsvExec { /// Get the output partitioning of this plan fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.files.len()) + Partitioning::UnknownPartitioning(self.file_groups.len()) } fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { @@ -157,20 +152,33 @@ impl ExecutionPlan for CsvExec { } async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { - let file = self - .object_store - .file_reader(self.files[partition].file_meta.sized_file.clone())? - .sync_reader()?; + let batch_size = self.batch_size; + let file_schema = Arc::clone(&self.file_schema); + let projection = self.projection.clone(); + let has_header = self.has_header; + let delimiter = self.delimiter; + let start_line = if has_header { 1 } else { 0 }; - Ok(Box::pin(CsvStream::try_new_from_reader( - file, - self.schema.clone(), - self.has_header, - self.delimiter, - &self.projection, - self.batch_size, + let fun = move |file, remaining: &Option<usize>| { + let bounds = remaining.map(|x| (0, x + start_line)); + Box::new(csv::Reader::new( + file, + Arc::clone(&file_schema), + has_header, + Some(delimiter), + batch_size, + bounds, + projection.clone(), + )) as BatchIter + }; + + Ok(Box::pin(FileStream::new( + Arc::clone(&self.object_store), + self.file_groups[partition].clone(), + fun, + Arc::clone(&self.projected_schema), self.limit, - )?)) + ))) } fn fmt_as( @@ -182,12 +190,8 @@ impl ExecutionPlan for CsvExec { DisplayFormatType::Default => { write!( f, - "CsvExec: files=[{}], has_header={}, batch_size={}, limit={:?}", - self.files - .iter() - .map(|f| f.file_meta.path()) - .collect::<Vec<_>>() - .join(", "), + "CsvExec: files={}, has_header={}, batch_size={}, limit={:?}", + super::FileGroupsDisplay(&self.file_groups), self.has_header, self.batch_size, self.limit, @@ -201,58 +205,6 @@ impl ExecutionPlan for CsvExec { } } -/// Iterator over batches -struct CsvStream<R: Read> { - /// Arrow CSV reader - reader: csv::Reader<R>, -} - -impl<R: Read> CsvStream<R> { - /// Create an iterator for a reader - pub fn try_new_from_reader( - reader: R, - schema: SchemaRef, - has_header: bool, - delimiter: u8, - projection: &Option<Vec<usize>>, - batch_size: usize, - limit: Option<usize>, - ) -> Result<CsvStream<R>> { - let start_line = if has_header { 1 } else { 0 }; - let bounds = limit.map(|x| (0, x + start_line)); - - let reader = csv::Reader::new( - reader, - schema, - has_header, - Some(delimiter), - batch_size, - bounds, - projection.clone(), - ); - - Ok(Self { reader }) - } -} - -impl<R: Read + Unpin> Stream for CsvStream<R> { - type Item = ArrowResult<RecordBatch>; - - fn poll_next( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - Poll::Ready(self.reader.next()) - } -} - -impl<R: Read + Unpin> RecordBatchStream for CsvStream<R> { - /// Get the schema - fn schema(&self) -> SchemaRef { - self.reader.schema() - } -} - #[cfg(test)] mod tests { use super::*; @@ -270,9 +222,9 @@ mod tests { let path = format!("{}/csv/{}", testdata, filename); let csv = CsvExec::new( Arc::new(LocalFileSystem {}), - vec![PartitionedFile { + vec![vec![PartitionedFile { file_meta: local_file_meta(path), - }], + }]], Statistics::default(), schema, true, @@ -281,7 +233,7 @@ mod tests { 1024, None, ); - assert_eq!(13, csv.schema.fields().len()); + assert_eq!(13, csv.file_schema.fields().len()); assert_eq!(3, csv.projected_schema.fields().len()); assert_eq!(3, csv.schema().fields().len()); let mut stream = csv.execute(0).await?; @@ -303,9 +255,9 @@ mod tests { let path = format!("{}/csv/{}", testdata, filename); let csv = CsvExec::new( Arc::new(LocalFileSystem {}), - vec![PartitionedFile { + vec![vec![PartitionedFile { file_meta: local_file_meta(path), - }], + }]], Statistics::default(), schema, true, @@ -314,7 +266,7 @@ mod tests { 1024, None, ); - assert_eq!(13, csv.schema.fields().len()); + assert_eq!(13, csv.file_schema.fields().len()); assert_eq!(13, csv.projected_schema.fields().len()); assert_eq!(13, csv.schema().fields().len()); let mut it = csv.execute(0).await?; diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs new file mode 100644 index 0000000..55a66f4 --- /dev/null +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A generic stream over file format readers that can be used by +//! any file format that read its files from start to end. +//! +//! Note: Most traits here need to be marked `Sync + Send` to be +//! compliant with the `SendableRecordBatchStream` trait. + +use crate::{ + datasource::{object_store::ObjectStore, PartitionedFile}, + error::Result as DataFusionResult, + physical_plan::RecordBatchStream, +}; +use arrow::{ + datatypes::SchemaRef, + error::{ArrowError, Result as ArrowResult}, + record_batch::RecordBatch, +}; +use futures::Stream; +use std::{ + io::Read, + iter, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +pub type FileIter = + Box<dyn Iterator<Item = DataFusionResult<Box<dyn Read + Send + Sync>>> + Send + Sync>; +pub type BatchIter = Box<dyn Iterator<Item = ArrowResult<RecordBatch>> + Send + Sync>; + +/// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object +/// and an optional number of required records. +pub trait FormatReaderOpener: + FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter + Send + Unpin + 'static +{ +} + +impl<T> FormatReaderOpener for T where + T: FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter + + Send + + Unpin + + 'static +{ +} + +/// A stream that iterates record batch by record batch, file over file. +pub struct FileStream<F: FormatReaderOpener> { + /// An iterator over record batches of the last file returned by file_iter + batch_iter: BatchIter, + /// An iterator over input files + file_iter: FileIter, + /// The stream schema (file schema after projection) + schema: SchemaRef, + /// The remaining number of records to parse, None if no limit + remain: Option<usize>, + /// A closure that takes a reader and an optional remaining number of lines + /// (before reaching the limit) and returns a batch iterator. If the file reader + /// is not capable of limiting the number of records in the last batch, the file + /// stream will take care of truncating it. + file_reader: F, +} + +impl<F: FormatReaderOpener> FileStream<F> { + pub fn new( + object_store: Arc<dyn ObjectStore>, + files: Vec<PartitionedFile>, + file_reader: F, + schema: SchemaRef, + limit: Option<usize>, + ) -> Self { + let read_iter = files.into_iter().map(move |f| -> DataFusionResult<_> { + object_store + .file_reader(f.file_meta.sized_file)? + .sync_reader() + }); + + Self { + file_iter: Box::new(read_iter), + batch_iter: Box::new(iter::empty()), + remain: limit, + schema, + file_reader, + } + } + + /// Acts as a flat_map of record batches over files. + fn next_batch(&mut self) -> Option<ArrowResult<RecordBatch>> { + match self.batch_iter.next() { + Some(batch) => Some(batch), + None => match self.file_iter.next() { + Some(Ok(f)) => { + self.batch_iter = (self.file_reader)(f, &self.remain); + self.next_batch() + } + Some(Err(e)) => Some(Err(ArrowError::ExternalError(Box::new(e)))), + None => None, + }, + } + } +} + +impl<F: FormatReaderOpener> Stream for FileStream<F> { + type Item = ArrowResult<RecordBatch>; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + // check if finished or no limit + match self.remain { + Some(r) if r == 0 => return Poll::Ready(None), + None => return Poll::Ready(self.get_mut().next_batch()), + Some(r) => r, + }; + + Poll::Ready(match self.as_mut().next_batch() { + Some(Ok(item)) => { + if let Some(remain) = self.remain.as_mut() { + if *remain >= item.num_rows() { + *remain -= item.num_rows(); + Some(Ok(item)) + } else { + let len = *remain; + *remain = 0; + Some(Ok(RecordBatch::try_new( + item.schema(), + item.columns() + .iter() + .map(|column| column.slice(0, len)) + .collect(), + )?)) + } + } else { + Some(Ok(item)) + } + } + other => other, + }) + } +} + +impl<F: FormatReaderOpener> RecordBatchStream for FileStream<F> { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +#[cfg(test)] +mod tests { + use futures::StreamExt; + + use super::*; + use crate::{ + error::Result, + test::{make_partition, object_store::TestObjectStore}, + }; + + /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1]) + async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> { + let records = vec![make_partition(3), make_partition(2)]; + + let source_schema = records[0].schema(); + + let reader = move |_file, _remain: &Option<usize>| { + // this reader returns the same batch regardless of the file + Box::new(records.clone().into_iter().map(Ok)) as BatchIter + }; + + let file_stream = FileStream::new( + TestObjectStore::new_arc(&[("mock_file1", 10), ("mock_file2", 20)]), + vec![ + PartitionedFile::new("mock_file1".to_owned(), 10), + PartitionedFile::new("mock_file2".to_owned(), 20), + ], + reader, + source_schema, + limit, + ); + + file_stream + .map(|b| b.expect("No error expected in stream")) + .collect::<Vec<_>>() + .await + } + + #[tokio::test] + async fn without_limit() -> Result<()> { + let batches = create_and_collect(None).await; + + #[rustfmt::skip] + crate::assert_batches_eq!(&[ + "+---+", + "| i |", + "+---+", + "| 0 |", + "| 1 |", + "| 2 |", + "| 0 |", + "| 1 |", + "| 0 |", + "| 1 |", + "| 2 |", + "| 0 |", + "| 1 |", + "+---+", + ], &batches); + + Ok(()) + } + + #[tokio::test] + async fn with_limit_between_files() -> Result<()> { + let batches = create_and_collect(Some(5)).await; + #[rustfmt::skip] + crate::assert_batches_eq!(&[ + "+---+", + "| i |", + "+---+", + "| 0 |", + "| 1 |", + "| 2 |", + "| 0 |", + "| 1 |", + "+---+", + ], &batches); + + Ok(()) + } + + #[tokio::test] + async fn with_limit_at_middle_of_batch() -> Result<()> { + let batches = create_and_collect(Some(6)).await; + #[rustfmt::skip] + crate::assert_batches_eq!(&[ + "+---+", + "| i |", + "+---+", + "| 0 |", + "| 1 |", + "| 2 |", + "| 0 |", + "| 1 |", + "| 0 |", + "+---+", + ], &batches); + + Ok(()) + } +} diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs index 068e53a..f9dde67 100644 --- a/datafusion/src/physical_plan/file_format/json.rs +++ b/datafusion/src/physical_plan/file_format/json.rs @@ -17,36 +17,29 @@ //! Execution plan for reading line-delimited JSON files use async_trait::async_trait; -use futures::Stream; use crate::datasource::object_store::ObjectStore; use crate::datasource::PartitionedFile; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, Statistics, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use arrow::{ datatypes::{Schema, SchemaRef}, - error::Result as ArrowResult, json, - record_batch::RecordBatch, }; use std::any::Any; -use std::{ - io::Read, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; +use std::sync::Arc; + +use super::file_stream::{BatchIter, FileStream}; /// Execution plan for scanning NdJson data source #[derive(Debug, Clone)] pub struct NdJsonExec { object_store: Arc<dyn ObjectStore>, - files: Vec<PartitionedFile>, + file_groups: Vec<Vec<PartitionedFile>>, statistics: Statistics, - schema: SchemaRef, + file_schema: SchemaRef, projection: Option<Vec<usize>>, projected_schema: SchemaRef, batch_size: usize, @@ -55,28 +48,27 @@ pub struct NdJsonExec { impl NdJsonExec { /// Create a new JSON reader execution plan provided file list and schema - /// TODO: support partitiond file list (Vec<Vec<PartitionedFile>>) pub fn new( object_store: Arc<dyn ObjectStore>, - files: Vec<PartitionedFile>, + file_groups: Vec<Vec<PartitionedFile>>, statistics: Statistics, - schema: SchemaRef, + file_schema: SchemaRef, projection: Option<Vec<usize>>, batch_size: usize, limit: Option<usize>, ) -> Self { let projected_schema = match &projection { - None => Arc::clone(&schema), + None => Arc::clone(&file_schema), Some(p) => Arc::new(Schema::new( - p.iter().map(|i| schema.field(*i).clone()).collect(), + p.iter().map(|i| file_schema.field(*i).clone()).collect(), )), }; Self { object_store, - files, + file_groups, statistics, - schema, + file_schema, projection, projected_schema, batch_size, @@ -96,7 +88,7 @@ impl ExecutionPlan for NdJsonExec { } fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.files.len()) + Partitioning::UnknownPartitioning(self.file_groups.len()) } fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { @@ -120,19 +112,31 @@ impl ExecutionPlan for NdJsonExec { async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { let proj = self.projection.as_ref().map(|p| { p.iter() - .map(|col_idx| self.schema.field(*col_idx).name()) + .map(|col_idx| self.file_schema.field(*col_idx).name()) .cloned() .collect() }); - let file = self - .object_store - .file_reader(self.files[partition].file_meta.sized_file.clone())? - .sync_reader()?; - - let json_reader = json::Reader::new(file, self.schema(), self.batch_size, proj); + let batch_size = self.batch_size; + let file_schema = Arc::clone(&self.file_schema); + + // The json reader cannot limit the number of records, so `remaining` is ignored. + let fun = move |file, _remaining: &Option<usize>| { + Box::new(json::Reader::new( + file, + Arc::clone(&file_schema), + batch_size, + proj.clone(), + )) as BatchIter + }; - Ok(Box::pin(NdJsonStream::new(json_reader, self.limit))) + Ok(Box::pin(FileStream::new( + Arc::clone(&self.object_store), + self.file_groups[partition].clone(), + fun, + Arc::clone(&self.projected_schema), + self.limit, + ))) } fn fmt_as( @@ -144,14 +148,10 @@ impl ExecutionPlan for NdJsonExec { DisplayFormatType::Default => { write!( f, - "JsonExec: batch_size={}, limit={:?}, files=[{}]", + "JsonExec: batch_size={}, limit={:?}, files={}", self.batch_size, self.limit, - self.files - .iter() - .map(|f| f.file_meta.path()) - .collect::<Vec<_>>() - .join(", ") + super::FileGroupsDisplay(&self.file_groups), ) } } @@ -162,66 +162,6 @@ impl ExecutionPlan for NdJsonExec { } } -struct NdJsonStream<R: Read> { - reader: json::Reader<R>, - remain: Option<usize>, -} - -impl<R: Read> NdJsonStream<R> { - fn new(reader: json::Reader<R>, limit: Option<usize>) -> Self { - Self { - reader, - remain: limit, - } - } -} - -impl<R: Read + Unpin> Stream for NdJsonStream<R> { - type Item = ArrowResult<RecordBatch>; - - fn poll_next( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - if let Some(remain) = self.remain.as_mut() { - if *remain < 1 { - return Poll::Ready(None); - } - } - - Poll::Ready(match self.reader.next() { - Ok(Some(item)) => { - if let Some(remain) = self.remain.as_mut() { - if *remain >= item.num_rows() { - *remain -= item.num_rows(); - Some(Ok(item)) - } else { - let len = *remain; - *remain = 0; - Some(Ok(RecordBatch::try_new( - item.schema(), - item.columns() - .iter() - .map(|column| column.slice(0, len)) - .collect(), - )?)) - } - } else { - Some(Ok(item)) - } - } - Ok(None) => None, - Err(err) => Some(Err(err)), - }) - } -} - -impl<R: Read + Unpin> RecordBatchStream for NdJsonStream<R> { - fn schema(&self) -> SchemaRef { - self.reader.schema() - } -} - #[cfg(test)] mod tests { use futures::StreamExt; @@ -249,9 +189,9 @@ mod tests { let path = format!("{}/1.json", TEST_DATA_BASE); let exec = NdJsonExec::new( Arc::new(LocalFileSystem {}), - vec![PartitionedFile { + vec![vec![PartitionedFile { file_meta: local_file_meta(path.clone()), - }], + }]], Default::default(), infer_schema(path).await?, None, @@ -304,9 +244,9 @@ mod tests { let path = format!("{}/1.json", TEST_DATA_BASE); let exec = NdJsonExec::new( Arc::new(LocalFileSystem {}), - vec![PartitionedFile { + vec![vec![PartitionedFile { file_meta: local_file_meta(path.clone()), - }], + }]], Default::default(), infer_schema(path).await?, Some(vec![0, 2]), diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index aa4f116..b0b6905 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -19,6 +19,7 @@ mod avro; mod csv; +mod file_stream; mod json; mod parquet; diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs index 4ad4722..f673eb0 100644 --- a/datafusion/src/test/mod.rs +++ b/datafusion/src/test/mod.rs @@ -57,7 +57,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> { pub fn create_partitioned_csv( filename: &str, partitions: usize, -) -> Result<(String, Vec<PartitionedFile>)> { +) -> Result<(String, Vec<Vec<PartitionedFile>>)> { let testdata = crate::test_util::arrow_test_data(); let path = format!("{}/csv/{}", testdata, filename); @@ -96,15 +96,16 @@ pub fn create_partitioned_csv( w.flush().unwrap(); } - Ok(( - tmp_dir.into_path().to_str().unwrap().to_string(), - files - .into_iter() - .map(|f| PartitionedFile { + let groups = files + .into_iter() + .map(|f| { + vec![PartitionedFile { file_meta: local_file_meta(f.to_str().unwrap().to_owned()), - }) - .collect(), - )) + }] + }) + .collect::<Vec<_>>(); + + Ok((tmp_dir.into_path().to_str().unwrap().to_string(), groups)) } /// Get the schema for the aggregate_test_* csv files