This is an automated email from the ASF dual-hosted git repository. nevime pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new a764d3b ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end a764d3b is described below commit a764d3bafaaf593e5b3fe418975cd039c28f8494 Author: Jorge C. Leitao <jorgecarlei...@gmail.com> AuthorDate: Sun Oct 25 22:23:20 2020 +0200 ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end This PR proposes the following changes: 1. Make the CSV reader accept an optional argument to bound its iteration 2. Simplify the `next` code via iterators 3. Add a new struct to perform buffered iterations (useful to any reader) Closes #8482 from jorgecarleitao/csv_many Authored-by: Jorge C. Leitao <jorgecarlei...@gmail.com> Signed-off-by: Neville Dipale <nevilled...@gmail.com> --- rust/arrow/examples/read_csv.rs | 2 +- rust/arrow/src/array/builder.rs | 159 +++++++------ rust/arrow/src/csv/reader.rs | 395 +++++++++++++++++++------------ rust/arrow/src/util/buffered_iterator.rs | 138 +++++++++++ rust/arrow/src/util/mod.rs | 1 + rust/datafusion/src/physical_plan/csv.rs | 1 + 6 files changed, 460 insertions(+), 236 deletions(-) diff --git a/rust/arrow/examples/read_csv.rs b/rust/arrow/examples/read_csv.rs index dcbc44c..8c8dfa0 100644 --- a/rust/arrow/examples/read_csv.rs +++ b/rust/arrow/examples/read_csv.rs @@ -35,7 +35,7 @@ fn main() -> Result<()> { let file = File::open("test/data/uk_cities.csv").unwrap(); - let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None); + let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None); let _batch = csv.next().unwrap().unwrap(); #[cfg(feature = "prettyprint")] { diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index ca45f9e..7d1122f 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -1990,6 +1990,79 @@ impl ArrayBuilder for StructBuilder { } } +/// Returns a builder with capacity `capacity` that corresponds to the datatype 
`DataType` +/// This function is useful to construct arrays from arbitrary vectors with known/expected +/// schema. +pub fn make_builder(datatype: &DataType, capacity: usize) -> Box<ArrayBuilder> { + match datatype { + DataType::Null => unimplemented!(), + DataType::Boolean => Box::new(BooleanBuilder::new(capacity)), + DataType::Int8 => Box::new(Int8Builder::new(capacity)), + DataType::Int16 => Box::new(Int16Builder::new(capacity)), + DataType::Int32 => Box::new(Int32Builder::new(capacity)), + DataType::Int64 => Box::new(Int64Builder::new(capacity)), + DataType::UInt8 => Box::new(UInt8Builder::new(capacity)), + DataType::UInt16 => Box::new(UInt16Builder::new(capacity)), + DataType::UInt32 => Box::new(UInt32Builder::new(capacity)), + DataType::UInt64 => Box::new(UInt64Builder::new(capacity)), + DataType::Float32 => Box::new(Float32Builder::new(capacity)), + DataType::Float64 => Box::new(Float64Builder::new(capacity)), + DataType::Binary => Box::new(BinaryBuilder::new(capacity)), + DataType::FixedSizeBinary(len) => { + Box::new(FixedSizeBinaryBuilder::new(capacity, *len)) + } + DataType::Utf8 => Box::new(StringBuilder::new(capacity)), + DataType::Date32(DateUnit::Day) => Box::new(Date32Builder::new(capacity)), + DataType::Date64(DateUnit::Millisecond) => Box::new(Date64Builder::new(capacity)), + DataType::Time32(TimeUnit::Second) => { + Box::new(Time32SecondBuilder::new(capacity)) + } + DataType::Time32(TimeUnit::Millisecond) => { + Box::new(Time32MillisecondBuilder::new(capacity)) + } + DataType::Time64(TimeUnit::Microsecond) => { + Box::new(Time64MicrosecondBuilder::new(capacity)) + } + DataType::Time64(TimeUnit::Nanosecond) => { + Box::new(Time64NanosecondBuilder::new(capacity)) + } + DataType::Timestamp(TimeUnit::Second, _) => { + Box::new(TimestampSecondBuilder::new(capacity)) + } + DataType::Timestamp(TimeUnit::Millisecond, _) => { + Box::new(TimestampMillisecondBuilder::new(capacity)) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + 
Box::new(TimestampMicrosecondBuilder::new(capacity)) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + Box::new(TimestampNanosecondBuilder::new(capacity)) + } + DataType::Interval(IntervalUnit::YearMonth) => { + Box::new(IntervalYearMonthBuilder::new(capacity)) + } + DataType::Interval(IntervalUnit::DayTime) => { + Box::new(IntervalDayTimeBuilder::new(capacity)) + } + DataType::Duration(TimeUnit::Second) => { + Box::new(DurationSecondBuilder::new(capacity)) + } + DataType::Duration(TimeUnit::Millisecond) => { + Box::new(DurationMillisecondBuilder::new(capacity)) + } + DataType::Duration(TimeUnit::Microsecond) => { + Box::new(DurationMicrosecondBuilder::new(capacity)) + } + DataType::Duration(TimeUnit::Nanosecond) => { + Box::new(DurationNanosecondBuilder::new(capacity)) + } + DataType::Struct(fields) => { + Box::new(StructBuilder::from_fields(fields.clone(), capacity)) + } + t => panic!("Data type {:?} is not currently supported", t), + } +} + impl StructBuilder { pub fn new(fields: Vec<Field>, builders: Vec<Box<ArrayBuilder>>) -> Self { let mut field_anys = Vec::with_capacity(builders.len()); @@ -2016,86 +2089,12 @@ impl StructBuilder { } } - pub fn from_schema(schema: Schema, capacity: usize) -> Self { - let fields = schema.fields(); + pub fn from_fields(fields: Vec<Field>, capacity: usize) -> Self { let mut builders = Vec::with_capacity(fields.len()); - for f in schema.fields() { - builders.push(Self::from_field(f.clone(), capacity)); - } - Self::new(schema.fields, builders) - } - - fn from_field(f: Field, capacity: usize) -> Box<ArrayBuilder> { - match f.data_type() { - DataType::Null => unimplemented!(), - DataType::Boolean => Box::new(BooleanBuilder::new(capacity)), - DataType::Int8 => Box::new(Int8Builder::new(capacity)), - DataType::Int16 => Box::new(Int16Builder::new(capacity)), - DataType::Int32 => Box::new(Int32Builder::new(capacity)), - DataType::Int64 => Box::new(Int64Builder::new(capacity)), - DataType::UInt8 => 
Box::new(UInt8Builder::new(capacity)), - DataType::UInt16 => Box::new(UInt16Builder::new(capacity)), - DataType::UInt32 => Box::new(UInt32Builder::new(capacity)), - DataType::UInt64 => Box::new(UInt64Builder::new(capacity)), - DataType::Float32 => Box::new(Float32Builder::new(capacity)), - DataType::Float64 => Box::new(Float64Builder::new(capacity)), - DataType::Binary => Box::new(BinaryBuilder::new(capacity)), - DataType::FixedSizeBinary(len) => { - Box::new(FixedSizeBinaryBuilder::new(capacity, *len)) - } - DataType::Utf8 => Box::new(StringBuilder::new(capacity)), - DataType::Date32(DateUnit::Day) => Box::new(Date32Builder::new(capacity)), - DataType::Date64(DateUnit::Millisecond) => { - Box::new(Date64Builder::new(capacity)) - } - DataType::Time32(TimeUnit::Second) => { - Box::new(Time32SecondBuilder::new(capacity)) - } - DataType::Time32(TimeUnit::Millisecond) => { - Box::new(Time32MillisecondBuilder::new(capacity)) - } - DataType::Time64(TimeUnit::Microsecond) => { - Box::new(Time64MicrosecondBuilder::new(capacity)) - } - DataType::Time64(TimeUnit::Nanosecond) => { - Box::new(Time64NanosecondBuilder::new(capacity)) - } - DataType::Timestamp(TimeUnit::Second, _) => { - Box::new(TimestampSecondBuilder::new(capacity)) - } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - Box::new(TimestampMillisecondBuilder::new(capacity)) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - Box::new(TimestampMicrosecondBuilder::new(capacity)) - } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - Box::new(TimestampNanosecondBuilder::new(capacity)) - } - DataType::Interval(IntervalUnit::YearMonth) => { - Box::new(IntervalYearMonthBuilder::new(capacity)) - } - DataType::Interval(IntervalUnit::DayTime) => { - Box::new(IntervalDayTimeBuilder::new(capacity)) - } - DataType::Duration(TimeUnit::Second) => { - Box::new(DurationSecondBuilder::new(capacity)) - } - DataType::Duration(TimeUnit::Millisecond) => { - Box::new(DurationMillisecondBuilder::new(capacity)) - } - 
DataType::Duration(TimeUnit::Microsecond) => { - Box::new(DurationMicrosecondBuilder::new(capacity)) - } - DataType::Duration(TimeUnit::Nanosecond) => { - Box::new(DurationNanosecondBuilder::new(capacity)) - } - DataType::Struct(fields) => { - let schema = Schema::new(fields.clone()); - Box::new(Self::from_schema(schema, capacity)) - } - t => panic!("Data type {:?} is not currently supported", t), + for field in &fields { + builders.push(make_builder(field.data_type(), capacity)); } + Self::new(fields, builders) } /// Returns a mutable reference to the child field builder at index `i`. @@ -3369,7 +3368,7 @@ mod tests { let struct_type = DataType::Struct(sub_fields); fields.push(Field::new("f3", struct_type, false)); - let mut builder = StructBuilder::from_schema(Schema::new(fields), 5); + let mut builder = StructBuilder::from_fields(fields, 5); assert_eq!(3, builder.num_fields()); assert!(builder.field_builder::<Float32Builder>(0).is_some()); assert!(builder.field_builder::<StringBuilder>(1).is_some()); @@ -3384,7 +3383,7 @@ mod tests { let list_type = DataType::List(Box::new(DataType::Int64)); fields.push(Field::new("f2", list_type, false)); - let _ = StructBuilder::from_schema(Schema::new(fields), 5); + let _ = StructBuilder::from_fields(fields, 5); } #[test] diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index c7a6b77..4f926f9 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -36,26 +36,29 @@ //! //! let file = File::open("test/data/uk_cities.csv").unwrap(); //! -//! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None); +//! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None); //! let batch = csv.next().unwrap().unwrap(); //! 
``` use lazy_static::lazy_static; use regex::{Regex, RegexBuilder}; -use std::collections::HashSet; -use std::fmt; use std::fs::File; use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; +use std::{collections::HashSet, iter::Skip}; +use std::{fmt, iter::Take}; use csv as csv_crate; -use crate::array::{ArrayRef, PrimitiveArray, StringBuilder}; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; +use crate::{ + array::{ArrayRef, PrimitiveArray, StringBuilder}, + util::buffered_iterator::Buffered, +}; -use self::csv_crate::{StringRecord, StringRecordsIntoIter}; +use self::csv_crate::{Error, StringRecord, StringRecordsIntoIter}; lazy_static! { static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap(); @@ -216,6 +219,9 @@ pub fn infer_schema_from_files( Schema::try_merge(&schemas) } +// optional bounds of the reader, of the form (min line, max line). +type Bounds = Option<(usize, usize)>; + /// CSV file reader pub struct Reader<R: Read> { /// Explicit schema for the CSV file @@ -223,10 +229,9 @@ pub struct Reader<R: Read> { /// Optional projection for which columns to load (zero-based column indices) projection: Option<Vec<usize>>, /// File reader - record_iter: StringRecordsIntoIter<BufReader<R>>, - /// Batch size (number of records to load each time) - batch_size: usize, - /// Current line number, used in error reporting + record_iter: + Buffered<Skip<Take<StringRecordsIntoIter<BufReader<R>>>>, StringRecord, Error>, + /// Current line number line_number: usize, } @@ -238,7 +243,6 @@ where f.debug_struct("Reader") .field("schema", &self.schema) .field("projection", &self.projection) - .field("batch_size", &self.batch_size) .field("line_number", &self.line_number) .finish() } @@ -256,6 +260,7 @@ impl<R: Read> Reader<R> { has_header: bool, delimiter: Option<u8>, batch_size: usize, + bounds: Bounds, projection: Option<Vec<usize>>, ) -> Self { Self::from_buf_reader( @@ -264,6 +269,7 @@ impl<R: 
Read> Reader<R> { has_header, delimiter, batch_size, + bounds, projection, ) } @@ -293,6 +299,7 @@ impl<R: Read> Reader<R> { has_header: bool, delimiter: Option<u8>, batch_size: usize, + bounds: Bounds, projection: Option<Vec<usize>>, ) -> Self { let mut reader_builder = csv_crate::ReaderBuilder::new(); @@ -304,147 +311,44 @@ impl<R: Read> Reader<R> { let csv_reader = reader_builder.from_reader(buf_reader); let record_iter = csv_reader.into_records(); + + let (start, end) = match bounds { + None => (0, usize::MAX), + Some((start, end)) => (start, end), + }; + // Create an iterator that: + // * skips the first `start` items + // * runs up to `end` items + // * buffers `batch_size` items + // note that this skips by iteration. This is because in general it is not possible + // to seek in CSV. However, skipping still saves the burden of creating arrow arrays, + // which is a slow operation that scales with the number of columns + let record_iter = Buffered::new(record_iter.take(end).skip(start), batch_size); + Self { schema, projection, record_iter, - batch_size, - line_number: if has_header { 1 } else { 0 }, + line_number: if has_header { start + 1 } else { start + 0 }, } } - - fn parse(&self, rows: &[StringRecord]) -> Result<RecordBatch> { - let projection: Vec<usize> = match self.projection { - Some(ref v) => v.clone(), - None => self - .schema - .fields() - .iter() - .enumerate() - .map(|(i, _)| i) - .collect(), - }; - - let arrays: Result<Vec<ArrayRef>> = projection - .iter() - .map(|i| { - let i = *i; - let field = self.schema.field(i); - match field.data_type() { - &DataType::Boolean => { - self.build_primitive_array::<BooleanType>(rows, i) - } - &DataType::Int8 => self.build_primitive_array::<Int8Type>(rows, i), - &DataType::Int16 => self.build_primitive_array::<Int16Type>(rows, i), - &DataType::Int32 => self.build_primitive_array::<Int32Type>(rows, i), - &DataType::Int64 => self.build_primitive_array::<Int64Type>(rows, i), - &DataType::UInt8 => - 
self.build_primitive_array::<UInt8Type>(rows, i), - &DataType::UInt16 => { - self.build_primitive_array::<UInt16Type>(rows, i) - } - &DataType::UInt32 => { - self.build_primitive_array::<UInt32Type>(rows, i) - } - &DataType::UInt64 => { - self.build_primitive_array::<UInt64Type>(rows, i) - } - &DataType::Float32 => { - self.build_primitive_array::<Float32Type>(rows, i) - } - &DataType::Float64 => { - self.build_primitive_array::<Float64Type>(rows, i) - } - &DataType::Utf8 => { - let mut builder = StringBuilder::new(rows.len()); - for row in rows.iter() { - match row.get(i) { - Some(s) => builder.append_value(s).unwrap(), - _ => builder.append(false).unwrap(), - } - } - Ok(Arc::new(builder.finish()) as ArrayRef) - } - other => Err(ArrowError::ParseError(format!( - "Unsupported data type {:?}", - other - ))), - } - }) - .collect(); - - let schema_fields = self.schema.fields(); - - let projected_fields: Vec<Field> = projection - .iter() - .map(|i| schema_fields[*i].clone()) - .collect(); - - let projected_schema = Arc::new(Schema::new(projected_fields)); - - arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)) - } - - fn build_primitive_array<T: ArrowPrimitiveType>( - &self, - rows: &[StringRecord], - col_idx: usize, - ) -> Result<ArrayRef> { - let is_boolean_type = - *self.schema.field(col_idx).data_type() == DataType::Boolean; - - rows.iter() - .enumerate() - .map(|(row_index, row)| { - match row.get(col_idx) { - Some(s) => { - if s.is_empty() { - return Ok(None); - } - let parsed = if is_boolean_type { - s.to_lowercase().parse::<T::Native>() - } else { - s.parse::<T::Native>() - }; - match parsed { - Ok(e) => Ok(Some(e)), - Err(_) => Err(ArrowError::ParseError(format!( - // TODO: we should surface the underlying error here. 
- "Error while parsing value {} for column {} at line {}", - s, - col_idx, - self.line_number + row_index - ))), - } - } - None => Ok(None), - } - }) - .collect::<Result<PrimitiveArray<T>>>() - .map(|e| Arc::new(e) as ArrayRef) - } } impl<R: Read> Iterator for Reader<R> { type Item = Result<RecordBatch>; fn next(&mut self) -> Option<Self::Item> { - // read a batch of rows into memory - let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size); - for i in 0..self.batch_size { - match self.record_iter.next() { - Some(Ok(r)) => { - rows.push(r); - } - Some(Err(e)) => { - return Some(Err(ArrowError::ParseError(format!( - "Error parsing line {}: {:?}", - self.line_number + i, - e - )))); - } - None => break, + let rows = match self.record_iter.next() { + Some(Ok(r)) => r, + Some(Err(e)) => { + return Some(Err(ArrowError::ParseError(format!( + "Error parsing line {}: {:?}", + self.line_number + self.record_iter.n(), + e + )))); } - } + None => return None, + }; // return early if no data was loaded if rows.is_empty() { @@ -452,7 +356,12 @@ impl<R: Read> Iterator for Reader<R> { } // parse the batches into a RecordBatch - let result = self.parse(&rows); + let result = parse( + &rows, + &self.schema.fields(), + &self.projection, + self.line_number, + ); self.line_number += rows.len(); @@ -460,6 +369,120 @@ impl<R: Read> Iterator for Reader<R> { } } +/// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch]. 
+fn parse( + rows: &[StringRecord], + fields: &Vec<Field>, + projection: &Option<Vec<usize>>, + line_number: usize, +) -> Result<RecordBatch> { + let projection: Vec<usize> = match projection { + Some(ref v) => v.clone(), + None => fields.iter().enumerate().map(|(i, _)| i).collect(), + }; + + let arrays: Result<Vec<ArrayRef>> = projection + .iter() + .map(|i| { + let i = *i; + let field = &fields[i]; + match field.data_type() { + &DataType::Boolean => { + build_primitive_array::<BooleanType>(line_number, rows, i) + } + &DataType::Int8 => { + build_primitive_array::<Int8Type>(line_number, rows, i) + } + &DataType::Int16 => { + build_primitive_array::<Int16Type>(line_number, rows, i) + } + &DataType::Int32 => { + build_primitive_array::<Int32Type>(line_number, rows, i) + } + &DataType::Int64 => { + build_primitive_array::<Int64Type>(line_number, rows, i) + } + &DataType::UInt8 => { + build_primitive_array::<UInt8Type>(line_number, rows, i) + } + &DataType::UInt16 => { + build_primitive_array::<UInt16Type>(line_number, rows, i) + } + &DataType::UInt32 => { + build_primitive_array::<UInt32Type>(line_number, rows, i) + } + &DataType::UInt64 => { + build_primitive_array::<UInt64Type>(line_number, rows, i) + } + &DataType::Float32 => { + build_primitive_array::<Float32Type>(line_number, rows, i) + } + &DataType::Float64 => { + build_primitive_array::<Float64Type>(line_number, rows, i) + } + &DataType::Utf8 => { + let mut builder = StringBuilder::new(rows.len()); + for row in rows.iter() { + match row.get(i) { + Some(s) => builder.append_value(s).unwrap(), + _ => builder.append(false).unwrap(), + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + } + other => Err(ArrowError::ParseError(format!( + "Unsupported data type {:?}", + other + ))), + } + }) + .collect(); + + let projected_fields: Vec<Field> = + projection.iter().map(|i| fields[*i].clone()).collect(); + + let projected_schema = Arc::new(Schema::new(projected_fields)); + + arrays.and_then(|arr| 
RecordBatch::try_new(projected_schema, arr)) +} + +// parses a specific column (col_idx) into an Arrow Array. +fn build_primitive_array<T: ArrowPrimitiveType>( + line_number: usize, + rows: &[StringRecord], + col_idx: usize, +) -> Result<ArrayRef> { + rows.iter() + .enumerate() + .map(|(row_index, row)| { + match row.get(col_idx) { + Some(s) => { + if s.is_empty() { + return Ok(None); + } + let parsed = if T::DATA_TYPE == DataType::Boolean { + s.to_lowercase().parse::<T::Native>() + } else { + s.parse::<T::Native>() + }; + match parsed { + Ok(e) => Ok(Some(e)), + Err(_) => Err(ArrowError::ParseError(format!( + // TODO: we should surface the underlying error here. + "Error while parsing value {} for column {} at line {}", + s, + col_idx, + line_number + row_index + ))), + } + } + None => Ok(None), + } + }) + .collect::<Result<PrimitiveArray<T>>>() + .map(|e| Arc::new(e) as ArrayRef) +} + /// CSV file reader builder #[derive(Debug)] pub struct ReaderBuilder { @@ -483,6 +506,8 @@ pub struct ReaderBuilder { /// /// The default batch size when using the `ReaderBuilder` is 1024 records batch_size: usize, + /// The bounds over which to scan the reader. `None` starts from 0 and runs until EOF. 
+ bounds: Bounds, /// Optional projection for which columns to load (zero-based column indices) projection: Option<Vec<usize>>, } @@ -495,6 +520,7 @@ impl Default for ReaderBuilder { delimiter: None, max_records: None, batch_size: 1024, + bounds: None, projection: None, } } @@ -584,18 +610,15 @@ impl ReaderBuilder { Arc::new(inferred_schema) } }; - let csv_reader = csv_crate::ReaderBuilder::new() - .delimiter(delimiter) - .has_headers(self.has_header) - .from_reader(buf_reader); - let record_iter = csv_reader.into_records(); - Ok(Reader { + Ok(Reader::from_buf_reader( + buf_reader, schema, - projection: self.projection.clone(), - record_iter, - batch_size: self.batch_size, - line_number: if self.has_header { 1 } else { 0 }, - }) + self.has_header, + self.delimiter, + self.batch_size, + None, + self.projection.clone(), + )) } } @@ -620,8 +643,15 @@ mod tests { let file = File::open("test/data/uk_cities.csv").unwrap(); - let mut csv = - Reader::new(file, Arc::new(schema.clone()), false, None, 1024, None); + let mut csv = Reader::new( + file, + Arc::new(schema.clone()), + false, + None, + 1024, + None, + None, + ); assert_eq!(Arc::new(schema), csv.schema()); let batch = csv.next().unwrap().unwrap(); assert_eq!(37, batch.num_rows()); @@ -666,6 +696,7 @@ mod tests { None, 1024, None, + None, ); let batch = csv.next().unwrap().unwrap(); assert_eq!(74, batch.num_rows()); @@ -755,8 +786,15 @@ mod tests { let file = File::open("test/data/uk_cities.csv").unwrap(); - let mut csv = - Reader::new(file, Arc::new(schema), false, None, 1024, Some(vec![0, 1])); + let mut csv = Reader::new( + file, + Arc::new(schema), + false, + None, + 1024, + None, + Some(vec![0, 1]), + ); let projected_schema = Arc::new(Schema::new(vec![ Field::new("city", DataType::Utf8, false), Field::new("lat", DataType::Float64, false), @@ -778,7 +816,7 @@ mod tests { let file = File::open("test/data/null_test.csv").unwrap(); - let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, None); + let 
mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, None, None); let batch = csv.next().unwrap().unwrap(); assert_eq!(false, batch.column(1).is_null(0)); @@ -906,4 +944,51 @@ mod tests { Ok(()) } + + #[test] + fn test_bounded() -> Result<()> { + let schema = Schema::new(vec![Field::new("int", DataType::UInt32, false)]); + let data = vec![ + vec!["0"], + vec!["1"], + vec!["2"], + vec!["3"], + vec!["4"], + vec!["5"], + vec!["6"], + ]; + + let data = data + .iter() + .map(|x| x.join(",")) + .collect::<Vec<_>>() + .join("\n"); + let data = data.as_bytes(); + + let reader = std::io::Cursor::new(data); + + let mut csv = Reader::new( + reader, + Arc::new(schema), + false, + None, + 2, + // starting at row 2 and up to row 6. + Some((2, 6)), + Some(vec![0]), + ); + + let batch = csv.next().unwrap().unwrap(); + let a = batch.column(0); + let a = a.as_any().downcast_ref::<UInt32Array>().unwrap(); + assert_eq!(a, &UInt32Array::from(vec![2, 3])); + + let batch = csv.next().unwrap().unwrap(); + let a = batch.column(0); + let a = a.as_any().downcast_ref::<UInt32Array>().unwrap(); + assert_eq!(a, &UInt32Array::from(vec![4, 5])); + + assert!(csv.next().is_none()); + Ok(()) + } } diff --git a/rust/arrow/src/util/buffered_iterator.rs b/rust/arrow/src/util/buffered_iterator.rs new file mode 100644 index 0000000..059b824 --- /dev/null +++ b/rust/arrow/src/util/buffered_iterator.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other +//! containers that benefit from batching or chunking. + +use std::marker::PhantomData; + +/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items. +/// The items must be a [std::result::Result] and if an error is returned, that error is returned +/// and the iterator continues. +/// An invariant of this iterator is that every returned vector's size is at most the specified size. +#[derive(Debug)] +pub struct Buffered<I, T, R> +where + T: Clone, + I: Iterator<Item = Result<T, R>>, +{ + iter: I, + size: usize, + buffer: Vec<T>, + phantom: PhantomData<R>, +} + +impl<I, T, R> Buffered<I, T, R> +where + T: Clone, + I: Iterator<Item = Result<T, R>>, +{ + pub fn new(iter: I, size: usize) -> Self { + Buffered { + iter, + size, + buffer: Vec::with_capacity(size), + phantom: PhantomData, + } + } + + /// returns the number of items buffered so far. 
+ /// Useful to extract the exact item where an error occurred + #[inline] + pub fn n(&self) -> usize { + return self.buffer.len(); + } +} + +impl<I, T, R> Iterator for Buffered<I, T, R> +where + T: Clone, + I: Iterator<Item = Result<T, R>>, +{ + type Item = Result<Vec<T>, R>; + + fn next(&mut self) -> Option<Self::Item> { + for _ in 0..(self.size - self.n()) { + match self.iter.next() { + Some(Ok(item)) => self.buffer.push(item), + Some(Err(error)) => return Some(Err(error)), + None => break, + } + } + if self.buffer.is_empty() { + None + } else { + let result = self.buffer.clone(); + self.buffer.clear(); + Some(Ok(result)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug, PartialEq)] + struct AError {} + + impl std::fmt::Display for AError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Bla") + } + } + impl std::error::Error for AError {} + + #[test] + fn test_basic() { + let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)]; + let iter = a.into_iter(); + let mut iter = Buffered::new(iter, 2); + + assert_eq!(iter.next(), Some(Ok(vec![1, 2]))); + assert_eq!(iter.next(), Some(Ok(vec![3]))); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_error_first() { + let a: Vec<Result<i32, AError>> = + vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)]; + let iter = a.into_iter(); + let mut iter = Buffered::new(iter, 2); + + assert_eq!(iter.next(), Some(Ok(vec![1, 2]))); + assert_eq!(iter.next(), Some(Err(AError {}))); + // 4 is here: it was not skipped on the previous + assert_eq!(iter.n(), 0); + assert_eq!(iter.next(), Some(Ok(vec![4, 5]))); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_error_last() { + let a: Vec<Result<i32, AError>> = vec![Ok(1), Err(AError {}), Ok(3), Ok(4)]; + let iter = a.into_iter(); + let mut iter = Buffered::new(iter, 2); + + assert_eq!(iter.next(), Some(Err(AError {}))); + assert_eq!(iter.n(), 1); + assert_eq!(iter.next(), Some(Ok(vec![1, 3]))); + 
assert_eq!(iter.next(), Some(Ok(vec![4]))); + assert_eq!(iter.next(), None); + } +} diff --git a/rust/arrow/src/util/mod.rs b/rust/arrow/src/util/mod.rs index 0f95043..053d132 100644 --- a/rust/arrow/src/util/mod.rs +++ b/rust/arrow/src/util/mod.rs @@ -17,6 +17,7 @@ pub mod bit_chunk_iterator; pub mod bit_util; +pub mod buffered_iterator; pub mod display; pub mod integration_util; #[cfg(feature = "prettyprint")] diff --git a/rust/datafusion/src/physical_plan/csv.rs b/rust/datafusion/src/physical_plan/csv.rs index ba40ddd..f28523f 100644 --- a/rust/datafusion/src/physical_plan/csv.rs +++ b/rust/datafusion/src/physical_plan/csv.rs @@ -256,6 +256,7 @@ impl CsvStream { has_header, delimiter, batch_size, + None, projection.clone(), );