This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a764d3b  ARROW-10332: [Rust] Allow CSV reader to iterate from start up 
to end
a764d3b is described below

commit a764d3bafaaf593e5b3fe418975cd039c28f8494
Author: Jorge C. Leitao <jorgecarlei...@gmail.com>
AuthorDate: Sun Oct 25 22:23:20 2020 +0200

    ARROW-10332: [Rust] Allow CSV reader to iterate from start up to end
    
    This PR proposes the following changes:
    
    1. Make the CSV reader accept an optional argument to bound its iteration
    2. Simplify the `next` code via iterators
    3. Add a new struct to perform buffered iterations (useful to any reader)
    
    Closes #8482 from jorgecarleitao/csv_many
    
    Authored-by: Jorge C. Leitao <jorgecarlei...@gmail.com>
    Signed-off-by: Neville Dipale <nevilled...@gmail.com>
---
 rust/arrow/examples/read_csv.rs          |   2 +-
 rust/arrow/src/array/builder.rs          | 159 +++++++------
 rust/arrow/src/csv/reader.rs             | 395 +++++++++++++++++++------------
 rust/arrow/src/util/buffered_iterator.rs | 138 +++++++++++
 rust/arrow/src/util/mod.rs               |   1 +
 rust/datafusion/src/physical_plan/csv.rs |   1 +
 6 files changed, 460 insertions(+), 236 deletions(-)

diff --git a/rust/arrow/examples/read_csv.rs b/rust/arrow/examples/read_csv.rs
index dcbc44c..8c8dfa0 100644
--- a/rust/arrow/examples/read_csv.rs
+++ b/rust/arrow/examples/read_csv.rs
@@ -35,7 +35,7 @@ fn main() -> Result<()> {
 
     let file = File::open("test/data/uk_cities.csv").unwrap();
 
-    let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, 
None);
+    let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, 
None, None);
     let _batch = csv.next().unwrap().unwrap();
     #[cfg(feature = "prettyprint")]
     {
diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs
index ca45f9e..7d1122f 100644
--- a/rust/arrow/src/array/builder.rs
+++ b/rust/arrow/src/array/builder.rs
@@ -1990,6 +1990,79 @@ impl ArrayBuilder for StructBuilder {
     }
 }
 
+/// Returns a builder with capacity `capacity` that corresponds to the 
datatype `DataType`
+/// This function is useful to construct arrays from arbitrary vectors with 
known/expected
+/// schema.
+pub fn make_builder(datatype: &DataType, capacity: usize) -> Box<ArrayBuilder> 
{
+    match datatype {
+        DataType::Null => unimplemented!(),
+        DataType::Boolean => Box::new(BooleanBuilder::new(capacity)),
+        DataType::Int8 => Box::new(Int8Builder::new(capacity)),
+        DataType::Int16 => Box::new(Int16Builder::new(capacity)),
+        DataType::Int32 => Box::new(Int32Builder::new(capacity)),
+        DataType::Int64 => Box::new(Int64Builder::new(capacity)),
+        DataType::UInt8 => Box::new(UInt8Builder::new(capacity)),
+        DataType::UInt16 => Box::new(UInt16Builder::new(capacity)),
+        DataType::UInt32 => Box::new(UInt32Builder::new(capacity)),
+        DataType::UInt64 => Box::new(UInt64Builder::new(capacity)),
+        DataType::Float32 => Box::new(Float32Builder::new(capacity)),
+        DataType::Float64 => Box::new(Float64Builder::new(capacity)),
+        DataType::Binary => Box::new(BinaryBuilder::new(capacity)),
+        DataType::FixedSizeBinary(len) => {
+            Box::new(FixedSizeBinaryBuilder::new(capacity, *len))
+        }
+        DataType::Utf8 => Box::new(StringBuilder::new(capacity)),
+        DataType::Date32(DateUnit::Day) => 
Box::new(Date32Builder::new(capacity)),
+        DataType::Date64(DateUnit::Millisecond) => 
Box::new(Date64Builder::new(capacity)),
+        DataType::Time32(TimeUnit::Second) => {
+            Box::new(Time32SecondBuilder::new(capacity))
+        }
+        DataType::Time32(TimeUnit::Millisecond) => {
+            Box::new(Time32MillisecondBuilder::new(capacity))
+        }
+        DataType::Time64(TimeUnit::Microsecond) => {
+            Box::new(Time64MicrosecondBuilder::new(capacity))
+        }
+        DataType::Time64(TimeUnit::Nanosecond) => {
+            Box::new(Time64NanosecondBuilder::new(capacity))
+        }
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            Box::new(TimestampSecondBuilder::new(capacity))
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            Box::new(TimestampMillisecondBuilder::new(capacity))
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            Box::new(TimestampMicrosecondBuilder::new(capacity))
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            Box::new(TimestampNanosecondBuilder::new(capacity))
+        }
+        DataType::Interval(IntervalUnit::YearMonth) => {
+            Box::new(IntervalYearMonthBuilder::new(capacity))
+        }
+        DataType::Interval(IntervalUnit::DayTime) => {
+            Box::new(IntervalDayTimeBuilder::new(capacity))
+        }
+        DataType::Duration(TimeUnit::Second) => {
+            Box::new(DurationSecondBuilder::new(capacity))
+        }
+        DataType::Duration(TimeUnit::Millisecond) => {
+            Box::new(DurationMillisecondBuilder::new(capacity))
+        }
+        DataType::Duration(TimeUnit::Microsecond) => {
+            Box::new(DurationMicrosecondBuilder::new(capacity))
+        }
+        DataType::Duration(TimeUnit::Nanosecond) => {
+            Box::new(DurationNanosecondBuilder::new(capacity))
+        }
+        DataType::Struct(fields) => {
+            Box::new(StructBuilder::from_fields(fields.clone(), capacity))
+        }
+        t => panic!("Data type {:?} is not currently supported", t),
+    }
+}
+
 impl StructBuilder {
     pub fn new(fields: Vec<Field>, builders: Vec<Box<ArrayBuilder>>) -> Self {
         let mut field_anys = Vec::with_capacity(builders.len());
@@ -2016,86 +2089,12 @@ impl StructBuilder {
         }
     }
 
-    pub fn from_schema(schema: Schema, capacity: usize) -> Self {
-        let fields = schema.fields();
+    pub fn from_fields(fields: Vec<Field>, capacity: usize) -> Self {
         let mut builders = Vec::with_capacity(fields.len());
-        for f in schema.fields() {
-            builders.push(Self::from_field(f.clone(), capacity));
-        }
-        Self::new(schema.fields, builders)
-    }
-
-    fn from_field(f: Field, capacity: usize) -> Box<ArrayBuilder> {
-        match f.data_type() {
-            DataType::Null => unimplemented!(),
-            DataType::Boolean => Box::new(BooleanBuilder::new(capacity)),
-            DataType::Int8 => Box::new(Int8Builder::new(capacity)),
-            DataType::Int16 => Box::new(Int16Builder::new(capacity)),
-            DataType::Int32 => Box::new(Int32Builder::new(capacity)),
-            DataType::Int64 => Box::new(Int64Builder::new(capacity)),
-            DataType::UInt8 => Box::new(UInt8Builder::new(capacity)),
-            DataType::UInt16 => Box::new(UInt16Builder::new(capacity)),
-            DataType::UInt32 => Box::new(UInt32Builder::new(capacity)),
-            DataType::UInt64 => Box::new(UInt64Builder::new(capacity)),
-            DataType::Float32 => Box::new(Float32Builder::new(capacity)),
-            DataType::Float64 => Box::new(Float64Builder::new(capacity)),
-            DataType::Binary => Box::new(BinaryBuilder::new(capacity)),
-            DataType::FixedSizeBinary(len) => {
-                Box::new(FixedSizeBinaryBuilder::new(capacity, *len))
-            }
-            DataType::Utf8 => Box::new(StringBuilder::new(capacity)),
-            DataType::Date32(DateUnit::Day) => 
Box::new(Date32Builder::new(capacity)),
-            DataType::Date64(DateUnit::Millisecond) => {
-                Box::new(Date64Builder::new(capacity))
-            }
-            DataType::Time32(TimeUnit::Second) => {
-                Box::new(Time32SecondBuilder::new(capacity))
-            }
-            DataType::Time32(TimeUnit::Millisecond) => {
-                Box::new(Time32MillisecondBuilder::new(capacity))
-            }
-            DataType::Time64(TimeUnit::Microsecond) => {
-                Box::new(Time64MicrosecondBuilder::new(capacity))
-            }
-            DataType::Time64(TimeUnit::Nanosecond) => {
-                Box::new(Time64NanosecondBuilder::new(capacity))
-            }
-            DataType::Timestamp(TimeUnit::Second, _) => {
-                Box::new(TimestampSecondBuilder::new(capacity))
-            }
-            DataType::Timestamp(TimeUnit::Millisecond, _) => {
-                Box::new(TimestampMillisecondBuilder::new(capacity))
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, _) => {
-                Box::new(TimestampMicrosecondBuilder::new(capacity))
-            }
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                Box::new(TimestampNanosecondBuilder::new(capacity))
-            }
-            DataType::Interval(IntervalUnit::YearMonth) => {
-                Box::new(IntervalYearMonthBuilder::new(capacity))
-            }
-            DataType::Interval(IntervalUnit::DayTime) => {
-                Box::new(IntervalDayTimeBuilder::new(capacity))
-            }
-            DataType::Duration(TimeUnit::Second) => {
-                Box::new(DurationSecondBuilder::new(capacity))
-            }
-            DataType::Duration(TimeUnit::Millisecond) => {
-                Box::new(DurationMillisecondBuilder::new(capacity))
-            }
-            DataType::Duration(TimeUnit::Microsecond) => {
-                Box::new(DurationMicrosecondBuilder::new(capacity))
-            }
-            DataType::Duration(TimeUnit::Nanosecond) => {
-                Box::new(DurationNanosecondBuilder::new(capacity))
-            }
-            DataType::Struct(fields) => {
-                let schema = Schema::new(fields.clone());
-                Box::new(Self::from_schema(schema, capacity))
-            }
-            t => panic!("Data type {:?} is not currently supported", t),
+        for field in &fields {
+            builders.push(make_builder(field.data_type(), capacity));
         }
+        Self::new(fields, builders)
     }
 
     /// Returns a mutable reference to the child field builder at index `i`.
@@ -3369,7 +3368,7 @@ mod tests {
         let struct_type = DataType::Struct(sub_fields);
         fields.push(Field::new("f3", struct_type, false));
 
-        let mut builder = StructBuilder::from_schema(Schema::new(fields), 5);
+        let mut builder = StructBuilder::from_fields(fields, 5);
         assert_eq!(3, builder.num_fields());
         assert!(builder.field_builder::<Float32Builder>(0).is_some());
         assert!(builder.field_builder::<StringBuilder>(1).is_some());
@@ -3384,7 +3383,7 @@ mod tests {
         let list_type = DataType::List(Box::new(DataType::Int64));
         fields.push(Field::new("f2", list_type, false));
 
-        let _ = StructBuilder::from_schema(Schema::new(fields), 5);
+        let _ = StructBuilder::from_fields(fields, 5);
     }
 
     #[test]
diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index c7a6b77..4f926f9 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -36,26 +36,29 @@
 //!
 //! let file = File::open("test/data/uk_cities.csv").unwrap();
 //!
-//! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, 
None);
+//! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, 
None, None);
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
 
 use lazy_static::lazy_static;
 use regex::{Regex, RegexBuilder};
-use std::collections::HashSet;
-use std::fmt;
 use std::fs::File;
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
+use std::{collections::HashSet, iter::Skip};
+use std::{fmt, iter::Take};
 
 use csv as csv_crate;
 
-use crate::array::{ArrayRef, PrimitiveArray, StringBuilder};
 use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
+use crate::{
+    array::{ArrayRef, PrimitiveArray, StringBuilder},
+    util::buffered_iterator::Buffered,
+};
 
-use self::csv_crate::{StringRecord, StringRecordsIntoIter};
+use self::csv_crate::{Error, StringRecord, StringRecordsIntoIter};
 
 lazy_static! {
     static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap();
@@ -216,6 +219,9 @@ pub fn infer_schema_from_files(
     Schema::try_merge(&schemas)
 }
 
+// optional bounds of the reader, of the form (min line, max line).
+type Bounds = Option<(usize, usize)>;
+
 /// CSV file reader
 pub struct Reader<R: Read> {
     /// Explicit schema for the CSV file
@@ -223,10 +229,9 @@ pub struct Reader<R: Read> {
     /// Optional projection for which columns to load (zero-based column 
indices)
     projection: Option<Vec<usize>>,
     /// File reader
-    record_iter: StringRecordsIntoIter<BufReader<R>>,
-    /// Batch size (number of records to load each time)
-    batch_size: usize,
-    /// Current line number, used in error reporting
+    record_iter:
+        Buffered<Skip<Take<StringRecordsIntoIter<BufReader<R>>>>, 
StringRecord, Error>,
+    /// Current line number
     line_number: usize,
 }
 
@@ -238,7 +243,6 @@ where
         f.debug_struct("Reader")
             .field("schema", &self.schema)
             .field("projection", &self.projection)
-            .field("batch_size", &self.batch_size)
             .field("line_number", &self.line_number)
             .finish()
     }
@@ -256,6 +260,7 @@ impl<R: Read> Reader<R> {
         has_header: bool,
         delimiter: Option<u8>,
         batch_size: usize,
+        bounds: Bounds,
         projection: Option<Vec<usize>>,
     ) -> Self {
         Self::from_buf_reader(
@@ -264,6 +269,7 @@ impl<R: Read> Reader<R> {
             has_header,
             delimiter,
             batch_size,
+            bounds,
             projection,
         )
     }
@@ -293,6 +299,7 @@ impl<R: Read> Reader<R> {
         has_header: bool,
         delimiter: Option<u8>,
         batch_size: usize,
+        bounds: Bounds,
         projection: Option<Vec<usize>>,
     ) -> Self {
         let mut reader_builder = csv_crate::ReaderBuilder::new();
@@ -304,147 +311,44 @@ impl<R: Read> Reader<R> {
 
         let csv_reader = reader_builder.from_reader(buf_reader);
         let record_iter = csv_reader.into_records();
+
+        let (start, end) = match bounds {
+            None => (0, usize::MAX),
+            Some((start, end)) => (start, end),
+        };
+        // Create an iterator that:
+        // * skips the first `start` items
+        // * runs up to `end` items
+        // * buffers `batch_size` items
+        // note that this skips by iteration. This is because in general it is 
not possible
+// to seek in CSV. However, skipping still saves the burden of creating 
arrow arrays,
+        // which is a slow operation that scales with the number of columns
+        let record_iter = Buffered::new(record_iter.take(end).skip(start), 
batch_size);
+
         Self {
             schema,
             projection,
             record_iter,
-            batch_size,
-            line_number: if has_header { 1 } else { 0 },
+            line_number: if has_header { start + 1 } else { start + 0 },
         }
     }
-
-    fn parse(&self, rows: &[StringRecord]) -> Result<RecordBatch> {
-        let projection: Vec<usize> = match self.projection {
-            Some(ref v) => v.clone(),
-            None => self
-                .schema
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect(),
-        };
-
-        let arrays: Result<Vec<ArrayRef>> = projection
-            .iter()
-            .map(|i| {
-                let i = *i;
-                let field = self.schema.field(i);
-                match field.data_type() {
-                    &DataType::Boolean => {
-                        self.build_primitive_array::<BooleanType>(rows, i)
-                    }
-                    &DataType::Int8 => 
self.build_primitive_array::<Int8Type>(rows, i),
-                    &DataType::Int16 => 
self.build_primitive_array::<Int16Type>(rows, i),
-                    &DataType::Int32 => 
self.build_primitive_array::<Int32Type>(rows, i),
-                    &DataType::Int64 => 
self.build_primitive_array::<Int64Type>(rows, i),
-                    &DataType::UInt8 => 
self.build_primitive_array::<UInt8Type>(rows, i),
-                    &DataType::UInt16 => {
-                        self.build_primitive_array::<UInt16Type>(rows, i)
-                    }
-                    &DataType::UInt32 => {
-                        self.build_primitive_array::<UInt32Type>(rows, i)
-                    }
-                    &DataType::UInt64 => {
-                        self.build_primitive_array::<UInt64Type>(rows, i)
-                    }
-                    &DataType::Float32 => {
-                        self.build_primitive_array::<Float32Type>(rows, i)
-                    }
-                    &DataType::Float64 => {
-                        self.build_primitive_array::<Float64Type>(rows, i)
-                    }
-                    &DataType::Utf8 => {
-                        let mut builder = StringBuilder::new(rows.len());
-                        for row in rows.iter() {
-                            match row.get(i) {
-                                Some(s) => builder.append_value(s).unwrap(),
-                                _ => builder.append(false).unwrap(),
-                            }
-                        }
-                        Ok(Arc::new(builder.finish()) as ArrayRef)
-                    }
-                    other => Err(ArrowError::ParseError(format!(
-                        "Unsupported data type {:?}",
-                        other
-                    ))),
-                }
-            })
-            .collect();
-
-        let schema_fields = self.schema.fields();
-
-        let projected_fields: Vec<Field> = projection
-            .iter()
-            .map(|i| schema_fields[*i].clone())
-            .collect();
-
-        let projected_schema = Arc::new(Schema::new(projected_fields));
-
-        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
-    }
-
-    fn build_primitive_array<T: ArrowPrimitiveType>(
-        &self,
-        rows: &[StringRecord],
-        col_idx: usize,
-    ) -> Result<ArrayRef> {
-        let is_boolean_type =
-            *self.schema.field(col_idx).data_type() == DataType::Boolean;
-
-        rows.iter()
-            .enumerate()
-            .map(|(row_index, row)| {
-                match row.get(col_idx) {
-                    Some(s) => {
-                        if s.is_empty() {
-                            return Ok(None);
-                        }
-                        let parsed = if is_boolean_type {
-                            s.to_lowercase().parse::<T::Native>()
-                        } else {
-                            s.parse::<T::Native>()
-                        };
-                        match parsed {
-                            Ok(e) => Ok(Some(e)),
-                            Err(_) => Err(ArrowError::ParseError(format!(
-                                // TODO: we should surface the underlying 
error here.
-                                "Error while parsing value {} for column {} at 
line {}",
-                                s,
-                                col_idx,
-                                self.line_number + row_index
-                            ))),
-                        }
-                    }
-                    None => Ok(None),
-                }
-            })
-            .collect::<Result<PrimitiveArray<T>>>()
-            .map(|e| Arc::new(e) as ArrayRef)
-    }
 }
 
 impl<R: Read> Iterator for Reader<R> {
     type Item = Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // read a batch of rows into memory
-        let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size);
-        for i in 0..self.batch_size {
-            match self.record_iter.next() {
-                Some(Ok(r)) => {
-                    rows.push(r);
-                }
-                Some(Err(e)) => {
-                    return Some(Err(ArrowError::ParseError(format!(
-                        "Error parsing line {}: {:?}",
-                        self.line_number + i,
-                        e
-                    ))));
-                }
-                None => break,
+        let rows = match self.record_iter.next() {
+            Some(Ok(r)) => r,
+            Some(Err(e)) => {
+                return Some(Err(ArrowError::ParseError(format!(
+                    "Error parsing line {}: {:?}",
+                    self.line_number + self.record_iter.n(),
+                    e
+                ))));
             }
-        }
+            None => return None,
+        };
 
         // return early if no data was loaded
         if rows.is_empty() {
@@ -452,7 +356,12 @@ impl<R: Read> Iterator for Reader<R> {
         }
 
         // parse the batches into a RecordBatch
-        let result = self.parse(&rows);
+        let result = parse(
+            &rows,
+            &self.schema.fields(),
+            &self.projection,
+            self.line_number,
+        );
 
         self.line_number += rows.len();
 
@@ -460,6 +369,120 @@ impl<R: Read> Iterator for Reader<R> {
     }
 }
 
+/// parses a slice of [csv_crate::StringRecord] into a 
[array::record_batch::RecordBatch].
+fn parse(
+    rows: &[StringRecord],
+    fields: &Vec<Field>,
+    projection: &Option<Vec<usize>>,
+    line_number: usize,
+) -> Result<RecordBatch> {
+    let projection: Vec<usize> = match projection {
+        Some(ref v) => v.clone(),
+        None => fields.iter().enumerate().map(|(i, _)| i).collect(),
+    };
+
+    let arrays: Result<Vec<ArrayRef>> = projection
+        .iter()
+        .map(|i| {
+            let i = *i;
+            let field = &fields[i];
+            match field.data_type() {
+                &DataType::Boolean => {
+                    build_primitive_array::<BooleanType>(line_number, rows, i)
+                }
+                &DataType::Int8 => {
+                    build_primitive_array::<Int8Type>(line_number, rows, i)
+                }
+                &DataType::Int16 => {
+                    build_primitive_array::<Int16Type>(line_number, rows, i)
+                }
+                &DataType::Int32 => {
+                    build_primitive_array::<Int32Type>(line_number, rows, i)
+                }
+                &DataType::Int64 => {
+                    build_primitive_array::<Int64Type>(line_number, rows, i)
+                }
+                &DataType::UInt8 => {
+                    build_primitive_array::<UInt8Type>(line_number, rows, i)
+                }
+                &DataType::UInt16 => {
+                    build_primitive_array::<UInt16Type>(line_number, rows, i)
+                }
+                &DataType::UInt32 => {
+                    build_primitive_array::<UInt32Type>(line_number, rows, i)
+                }
+                &DataType::UInt64 => {
+                    build_primitive_array::<UInt64Type>(line_number, rows, i)
+                }
+                &DataType::Float32 => {
+                    build_primitive_array::<Float32Type>(line_number, rows, i)
+                }
+                &DataType::Float64 => {
+                    build_primitive_array::<Float64Type>(line_number, rows, i)
+                }
+                &DataType::Utf8 => {
+                    let mut builder = StringBuilder::new(rows.len());
+                    for row in rows.iter() {
+                        match row.get(i) {
+                            Some(s) => builder.append_value(s).unwrap(),
+                            _ => builder.append(false).unwrap(),
+                        }
+                    }
+                    Ok(Arc::new(builder.finish()) as ArrayRef)
+                }
+                other => Err(ArrowError::ParseError(format!(
+                    "Unsupported data type {:?}",
+                    other
+                ))),
+            }
+        })
+        .collect();
+
+    let projected_fields: Vec<Field> =
+        projection.iter().map(|i| fields[*i].clone()).collect();
+
+    let projected_schema = Arc::new(Schema::new(projected_fields));
+
+    arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
+}
+
+// parses a specific column (col_idx) into an Arrow Array.
+fn build_primitive_array<T: ArrowPrimitiveType>(
+    line_number: usize,
+    rows: &[StringRecord],
+    col_idx: usize,
+) -> Result<ArrayRef> {
+    rows.iter()
+        .enumerate()
+        .map(|(row_index, row)| {
+            match row.get(col_idx) {
+                Some(s) => {
+                    if s.is_empty() {
+                        return Ok(None);
+                    }
+                    let parsed = if T::DATA_TYPE == DataType::Boolean {
+                        s.to_lowercase().parse::<T::Native>()
+                    } else {
+                        s.parse::<T::Native>()
+                    };
+                    match parsed {
+                        Ok(e) => Ok(Some(e)),
+                        Err(_) => Err(ArrowError::ParseError(format!(
+                            // TODO: we should surface the underlying error 
here.
+                            "Error while parsing value {} for column {} at 
line {}",
+                            s,
+                            col_idx,
+                            line_number + row_index
+                        ))),
+                    }
+                }
+                None => Ok(None),
+            }
+        })
+        .collect::<Result<PrimitiveArray<T>>>()
+        .map(|e| Arc::new(e) as ArrayRef)
+}
+
 /// CSV file reader builder
 #[derive(Debug)]
 pub struct ReaderBuilder {
@@ -483,6 +506,8 @@ pub struct ReaderBuilder {
     ///
     /// The default batch size when using the `ReaderBuilder` is 1024 records
     batch_size: usize,
+    /// The bounds over which to scan the reader. `None` starts from 0 and 
runs until EOF.
+    bounds: Bounds,
     /// Optional projection for which columns to load (zero-based column 
indices)
     projection: Option<Vec<usize>>,
 }
@@ -495,6 +520,7 @@ impl Default for ReaderBuilder {
             delimiter: None,
             max_records: None,
             batch_size: 1024,
+            bounds: None,
             projection: None,
         }
     }
@@ -584,18 +610,15 @@ impl ReaderBuilder {
                 Arc::new(inferred_schema)
             }
         };
-        let csv_reader = csv_crate::ReaderBuilder::new()
-            .delimiter(delimiter)
-            .has_headers(self.has_header)
-            .from_reader(buf_reader);
-        let record_iter = csv_reader.into_records();
-        Ok(Reader {
+        Ok(Reader::from_buf_reader(
+            buf_reader,
             schema,
-            projection: self.projection.clone(),
-            record_iter,
-            batch_size: self.batch_size,
-            line_number: if self.has_header { 1 } else { 0 },
-        })
+            self.has_header,
+            self.delimiter,
+            self.batch_size,
+            None,
+            self.projection.clone(),
+        ))
     }
 }
 
@@ -620,8 +643,15 @@ mod tests {
 
         let file = File::open("test/data/uk_cities.csv").unwrap();
 
-        let mut csv =
-            Reader::new(file, Arc::new(schema.clone()), false, None, 1024, 
None);
+        let mut csv = Reader::new(
+            file,
+            Arc::new(schema.clone()),
+            false,
+            None,
+            1024,
+            None,
+            None,
+        );
         assert_eq!(Arc::new(schema), csv.schema());
         let batch = csv.next().unwrap().unwrap();
         assert_eq!(37, batch.num_rows());
@@ -666,6 +696,7 @@ mod tests {
             None,
             1024,
             None,
+            None,
         );
         let batch = csv.next().unwrap().unwrap();
         assert_eq!(74, batch.num_rows());
@@ -755,8 +786,15 @@ mod tests {
 
         let file = File::open("test/data/uk_cities.csv").unwrap();
 
-        let mut csv =
-            Reader::new(file, Arc::new(schema), false, None, 1024, 
Some(vec![0, 1]));
+        let mut csv = Reader::new(
+            file,
+            Arc::new(schema),
+            false,
+            None,
+            1024,
+            None,
+            Some(vec![0, 1]),
+        );
         let projected_schema = Arc::new(Schema::new(vec![
             Field::new("city", DataType::Utf8, false),
             Field::new("lat", DataType::Float64, false),
@@ -778,7 +816,7 @@ mod tests {
 
         let file = File::open("test/data/null_test.csv").unwrap();
 
-        let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, 
None);
+        let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, 
None, None);
         let batch = csv.next().unwrap().unwrap();
 
         assert_eq!(false, batch.column(1).is_null(0));
@@ -906,4 +944,51 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_bounded() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("int", DataType::UInt32, 
false)]);
+        let data = vec![
+            vec!["0"],
+            vec!["1"],
+            vec!["2"],
+            vec!["3"],
+            vec!["4"],
+            vec!["5"],
+            vec!["6"],
+        ];
+
+        let data = data
+            .iter()
+            .map(|x| x.join(","))
+            .collect::<Vec<_>>()
+            .join("\n");
+        let data = data.as_bytes();
+
+        let reader = std::io::Cursor::new(data);
+
+        let mut csv = Reader::new(
+            reader,
+            Arc::new(schema),
+            false,
+            None,
+            2,
+            // starting at row 2 and up to row 6.
+            Some((2, 6)),
+            Some(vec![0]),
+        );
+
+        let batch = csv.next().unwrap().unwrap();
+        let a = batch.column(0);
+        let a = a.as_any().downcast_ref::<UInt32Array>().unwrap();
+        assert_eq!(a, &UInt32Array::from(vec![2, 3]));
+
+        let batch = csv.next().unwrap().unwrap();
+        let a = batch.column(0);
+        let a = a.as_any().downcast_ref::<UInt32Array>().unwrap();
+        assert_eq!(a, &UInt32Array::from(vec![4, 5]));
+
+        assert!(csv.next().is_none());
+        Ok(())
+    }
 }
diff --git a/rust/arrow/src/util/buffered_iterator.rs 
b/rust/arrow/src/util/buffered_iterator.rs
new file mode 100644
index 0000000..059b824
--- /dev/null
+++ b/rust/arrow/src/util/buffered_iterator.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [Buffered] is an iterator useful to build an [arrow::array::Array] and 
other
+//! containers that benefit from batching or chunking.
+
+use std::marker::PhantomData;
+
+/// An iterator that buffers results in a vector so that the iterator returns 
a vector of `size` items.
+/// The items must be a [std::result::Result] and if an error is returned, that
error is returned
+/// and the iterator continues.
+/// An invariant of this iterator is that every returned vector's size is at 
most the specified size.
+#[derive(Debug)]
+pub struct Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    iter: I,
+    size: usize,
+    buffer: Vec<T>,
+    phantom: PhantomData<R>,
+}
+
+impl<I, T, R> Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    pub fn new(iter: I, size: usize) -> Self {
+        Buffered {
+            iter,
+            size,
+            buffer: Vec::with_capacity(size),
+            phantom: PhantomData,
+        }
+    }
+
+    /// returns the number of items buffered so far.
+    /// Useful to extract the exact item where an error occurred
+    #[inline]
+    pub fn n(&self) -> usize {
+        return self.buffer.len();
+    }
+}
+
+impl<I, T, R> Iterator for Buffered<I, T, R>
+where
+    T: Clone,
+    I: Iterator<Item = Result<T, R>>,
+{
+    type Item = Result<Vec<T>, R>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        for _ in 0..(self.size - self.n()) {
+            match self.iter.next() {
+                Some(Ok(item)) => self.buffer.push(item),
+                Some(Err(error)) => return Some(Err(error)),
+                None => break,
+            }
+        }
+        if self.buffer.is_empty() {
+            None
+        } else {
+            let result = self.buffer.clone();
+            self.buffer.clear();
+            Some(Ok(result))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[derive(Debug, PartialEq)]
+    struct AError {}
+
+    impl std::fmt::Display for AError {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "Bla")
+        }
+    }
+    impl std::error::Error for AError {}
+
+    #[test]
+    fn test_basic() {
+        let a: Vec<Result<i32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Ok(vec![3])));
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn test_error_first() {
+        let a: Vec<Result<i32, AError>> =
+            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
+        assert_eq!(iter.next(), Some(Err(AError {})));
+        // 4 is here: it was not skipped by the previous error
+        assert_eq!(iter.n(), 0);
+        assert_eq!(iter.next(), Some(Ok(vec![4, 5])));
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn test_error_last() {
+        let a: Vec<Result<i32, AError>> = vec![Ok(1), Err(AError {}), Ok(3), 
Ok(4)];
+        let iter = a.into_iter();
+        let mut iter = Buffered::new(iter, 2);
+
+        assert_eq!(iter.next(), Some(Err(AError {})));
+        assert_eq!(iter.n(), 1);
+        assert_eq!(iter.next(), Some(Ok(vec![1, 3])));
+        assert_eq!(iter.next(), Some(Ok(vec![4])));
+        assert_eq!(iter.next(), None);
+    }
+}
diff --git a/rust/arrow/src/util/mod.rs b/rust/arrow/src/util/mod.rs
index 0f95043..053d132 100644
--- a/rust/arrow/src/util/mod.rs
+++ b/rust/arrow/src/util/mod.rs
@@ -17,6 +17,7 @@
 
 pub mod bit_chunk_iterator;
 pub mod bit_util;
+pub mod buffered_iterator;
 pub mod display;
 pub mod integration_util;
 #[cfg(feature = "prettyprint")]
diff --git a/rust/datafusion/src/physical_plan/csv.rs 
b/rust/datafusion/src/physical_plan/csv.rs
index ba40ddd..f28523f 100644
--- a/rust/datafusion/src/physical_plan/csv.rs
+++ b/rust/datafusion/src/physical_plan/csv.rs
@@ -256,6 +256,7 @@ impl CsvStream {
             has_header,
             delimiter,
             batch_size,
+            None,
             projection.clone(),
         );
 

Reply via email to