This is an automated email from the ASF dual-hosted git repository. uwe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 635ee1f ARROW-2521: [Rust] Refactor Rust API to use traits and generic to represent Array instead of enum 635ee1f is described below commit 635ee1f59e550981eed0e493685383d1378a6224 Author: Andy Grove <andygrov...@gmail.com> AuthorDate: Thu May 17 18:15:44 2018 +0200 ARROW-2521: [Rust] Refactor Rust API to use traits and generic to represent Array instead of enum Author: Andy Grove <andygrov...@gmail.com> Closes #1971 from andygrove/refactor_rust_api_v2 and squashes the following commits: a04d66a9 <Andy Grove> cargo fmt with 1.26.0 f3f71dda <Andy Grove> Rename BufferArray to PrimitiveArray 10714a1f <Andy Grove> cargo fmt b2d9e42e <Andy Grove> add assertions to RecordBatch d577510c <Andy Grove> Remove need to clone array be3a981d <Andy Grove> cargo fmt 22f907ab <Andy Grove> Renaming structs and traits and adding documentation 4add4f05 <Andy Grove> Revert "Add type coercion helper method" 51270de5 <Andy Grove> Add type coercion helper method cc40ba45 <Andy Grove> Removing macros, implemented min/max for arrays of primitives 01bc9538 <Andy Grove> implement min/max for primitive array b2659b10 <Andy Grove> run cargo fmt with stable rust 66c016e3 <Andy Grove> use usize instead of i32 (except for list offsets) dbe49a74 <Andy Grove> Rebase d1bfdca5 <Andy Grove> Merge branch 'master' of github.com:andygrove/arrow 2bae169e <Andy Grove> Refactor Rust API to use traits and generic to represent Array instead of enum 52de6a10 <Andy Grove> Merge branch 'master' of github.com:andygrove/arrow 0e2606b2 <Andy Grove> Merge remote-tracking branch 'upstream/master' d883da2f <Andy Grove> Merge remote-tracking branch 'upstream/master' 589ef71d <Andy Grove> Merge remote-tracking branch 'upstream/master' bd4fbb55 <Andy Grove> Merge remote-tracking branch 'upstream/master' 9c8a10a4 <Andy Grove> Merge remote-tracking branch 'upstream/master' 05592f8c <Andy Grove> Merge remote-tracking branch 'upstream/master' 8c0e6982 <Andy Grove> Merge remote-tracking branch 'upstream/master' 31ef90ba <Andy Grove> Merge remote-tracking branch 'upstream/master' 2f87c703 <Andy Grove> Fix build - add missing import --- rust/benches/array_from_builder.rs | 2 +- rust/benches/array_from_vec.rs | 2 +- rust/examples/array_from_builder.rs | 10 +- rust/examples/array_from_vec.rs | 10 +- rust/examples/dynamic_types.rs | 89 ++++++ rust/src/array.rs | 528 ++++++++++++++++++++++-------------- rust/src/bitmap.rs | 2 +- rust/src/buffer.rs | 92 ++++--- rust/src/builder.rs | 27 +- rust/src/datatypes.rs | 43 ++- rust/src/lib.rs | 1 + rust/src/list.rs | 31 ++- rust/src/list_builder.rs | 25 +- rust/src/record_batch.rs | 83 ++++++ 14 files changed, 646 insertions(+), 299 deletions(-) diff --git a/rust/benches/array_from_builder.rs b/rust/benches/array_from_builder.rs index 3d02003..2398e2e 100644 --- a/rust/benches/array_from_builder.rs +++ b/rust/benches/array_from_builder.rs @@ -30,7 +30,7 @@ fn array_from_builder(n: usize) { for i in 0..n { v.push(i as i32); } - Array::from(v.finish()); + PrimitiveArray::from(v.finish()); } fn criterion_benchmark(c: &mut Criterion) { diff --git a/rust/benches/array_from_vec.rs b/rust/benches/array_from_vec.rs index 0feb0de..50f0c52 100644 --- a/rust/benches/array_from_vec.rs +++ b/rust/benches/array_from_vec.rs @@ -29,7 +29,7 @@ fn array_from_vec(n: usize) { for i in 0..n { v.push(i as i32); } - Array::from(v); + PrimitiveArray::from(v); } fn criterion_benchmark(c: &mut Criterion) { diff --git a/rust/examples/array_from_builder.rs b/rust/examples/array_from_builder.rs index ea1ecec..d18953d 100644 --- a/rust/examples/array_from_builder.rs +++ b/rust/examples/array_from_builder.rs @@ -37,12 +37,6 @@ fn main() { // builder.build(); // create a memory-aligned Arrow from the builder (zero-copy) - let array = Array::from(buffer); - - match array.data() { - &ArrayData::Int32(ref buffer) => { - println!("array contents: {:?}", buffer.iter().collect::<Vec<i32>>()); - } - _ => {} - } + let array = PrimitiveArray::from(buffer); + println!("array contents: {:?}", array.iter().collect::<Vec<i32>>()); } diff --git a/rust/examples/array_from_vec.rs b/rust/examples/array_from_vec.rs index 8cb4b26..bf0f6c7 100644 --- a/rust/examples/array_from_vec.rs +++ b/rust/examples/array_from_vec.rs @@ -21,12 +21,6 @@ use arrow::array::*; fn main() { // create a memory-aligned Arrow array from an existing Vec - let array = Array::from(vec![1, 2, 3, 4, 5]); - - match array.data() { - &ArrayData::Int32(ref buffer) => { - println!("array contents: {:?}", buffer.iter().collect::<Vec<i32>>()); - } - _ => {} - } + let array = PrimitiveArray::from(vec![1, 2, 3, 4, 5]); + println!("array contents: {:?}", array.iter().collect::<Vec<i32>>()); } diff --git a/rust/examples/dynamic_types.rs b/rust/examples/dynamic_types.rs new file mode 100644 index 0000000..acfd515 --- /dev/null +++ b/rust/examples/dynamic_types.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +///! This example demonstrates dealing with mixed types dynamically at runtime +use std::rc::Rc; + +extern crate arrow; + +use arrow::array::*; +use arrow::datatypes::*; +use arrow::record_batch::*; + +fn main() { + // define schema + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "nested", + DataType::Struct(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Float64, false), + Field::new("c", DataType::Float64, false), + ]), + false, + ), + ]); + + // create some data + let id = PrimitiveArray::from(vec![1, 2, 3, 4, 5]); + + let nested = StructArray::from(vec![ + Rc::new(PrimitiveArray::from(vec!["a", "b", "c", "d", "e"])) as Rc<Array>, + Rc::new(PrimitiveArray::from(vec![1.1, 2.2, 3.3, 4.4, 5.5])), + Rc::new(PrimitiveArray::from(vec![2.2, 3.3, 4.4, 5.5, 6.6])), + ]); + + // build a record batch + let batch = RecordBatch::new(Rc::new(schema), vec![Rc::new(id), Rc::new(nested)]); + + process(&batch); +} + +/// Create a new batch by performing a projection of id, (nested.b + nested.c) AS sum +fn process(batch: &RecordBatch) { + let id = batch.column(0); + let nested = batch + .column(1) + .as_any() + .downcast_ref::<StructArray>() + .unwrap(); + + let nested_b = nested + .column(1) + .as_any() + .downcast_ref::<PrimitiveArray<f64>>() + .unwrap(); + let nested_c = nested + .column(2) + .as_any() + .downcast_ref::<PrimitiveArray<f64>>() + .unwrap(); + + let projected_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("sum", DataType::Float64, false), + ]); + + let _ = RecordBatch::new( + Rc::new(projected_schema), + vec![ + id.clone(), //NOTE: this is cloning the Rc not the array data + Rc::new(nested_b.add(nested_c)), + ], + ); +} diff --git a/rust/src/array.rs b/rust/src/array.rs index 55881f5..5305279 100644 --- a/rust/src/array.rs +++ b/rust/src/array.rs @@ -15,297 +15,421 @@ // specific language governing permissions and limitations // under the License. +///! Array types +use std::any::Any; use std::convert::From; -use std::iter::Iterator; +use std::ops::Add; use std::rc::Rc; use std::str; use std::string::String; use super::bitmap::Bitmap; -use super::buffer::Buffer; -use super::list::List; - -pub enum ArrayData { - Boolean(Buffer<bool>), - Float32(Buffer<f32>), - Float64(Buffer<f64>), - Int8(Buffer<i8>), - Int16(Buffer<i16>), - Int32(Buffer<i32>), - Int64(Buffer<i64>), - UInt8(Buffer<u8>), - UInt16(Buffer<u16>), - UInt32(Buffer<u32>), - UInt64(Buffer<u64>), - Utf8(List<u8>), - Struct(Vec<Rc<Array>>), -} +use super::buffer::*; +use super::builder::*; +use super::datatypes::*; +use super::list::*; +use super::list_builder::*; -macro_rules! arraydata_from_primitive { - ($DT:ty, $AT:ident) => { - impl From<Vec<$DT>> for ArrayData { - fn from(v: Vec<$DT>) -> Self { - ArrayData::$AT(Buffer::from(v)) - } - } - impl From<Buffer<$DT>> for ArrayData { - fn from(v: Buffer<$DT>) -> Self { - ArrayData::$AT(v) - } - } - }; +/// Trait for dealing with different types of Array at runtime when the type of the +/// array is not known in advance +pub trait Array { + /// Returns the length of the array (number of items in the array) + fn len(&self) -> usize; + /// Returns the number of null values in the array + fn null_count(&self) -> usize; + /// Optional validity bitmap (can be None if there are no null values) + fn validity_bitmap(&self) -> &Option<Bitmap>; + /// Return the array as Any so that it can be downcast to a specific implementation + fn as_any(&self) -> &Any; } -arraydata_from_primitive!(bool, Boolean); -arraydata_from_primitive!(f32, Float32); -arraydata_from_primitive!(f64, Float64); -arraydata_from_primitive!(i8, Int8); -arraydata_from_primitive!(i16, Int16); -arraydata_from_primitive!(i32, Int32); -arraydata_from_primitive!(i64, Int64); -arraydata_from_primitive!(u8, UInt8); -arraydata_from_primitive!(u16, UInt16); -arraydata_from_primitive!(u32, UInt32); -arraydata_from_primitive!(u64, UInt64); - -pub struct Array { - /// number of elements in the array - len: i32, - /// number of null elements in the array - null_count: i32, - /// If null_count is greater than zero then the validity_bitmap will be Some(Bitmap) +/// Array of List<T> +pub struct ListArray<T: ArrowPrimitiveType> { + len: usize, + data: List<T>, + null_count: usize, validity_bitmap: Option<Bitmap>, - /// The array of elements - data: ArrayData, } -impl Array { - /// Create a new array where there are no null values - pub fn new(len: usize, data: ArrayData) -> Self { - Array { - len: len as i32, - data, +impl<T> ListArray<T> +where + T: ArrowPrimitiveType, +{ + pub fn get(&self, i: usize) -> &[T] { + self.data.get(i) + } + + pub fn list(&self) -> &List<T> { + &self.data + } +} + +/// Create a ListArray<T> from a List<T> without null values +impl<T> From<List<T>> for ListArray<T> +where + T: ArrowPrimitiveType, +{ + fn from(list: List<T>) -> Self { + let len = list.len(); + ListArray { + len, + data: list, validity_bitmap: None, null_count: 0, } } +} - /// Get a reference to the array data - pub fn data(&self) -> &ArrayData { - &self.data +/// Create ListArray<u8> from Vec<&'static str> +impl From<Vec<&'static str>> for ListArray<u8> { + fn from(v: Vec<&'static str>) -> Self { + let mut builder: ListBuilder<u8> = ListBuilder::with_capacity(v.len()); + for s in v { + builder.push(s.as_bytes()) + } + ListArray::from(builder.finish()) } +} - /// number of elements in the array - pub fn len(&self) -> usize { - self.len as usize +/// Create ListArray<u8> from Vec<String> +impl From<Vec<String>> for ListArray<u8> { + fn from(v: Vec<String>) -> Self { + let mut builder: ListBuilder<u8> = ListBuilder::with_capacity(v.len()); + for s in v { + builder.push(s.as_bytes()) + } + ListArray::from(builder.finish()) } +} - /// number of null elements in the array - pub fn null_count(&self) -> usize { - self.null_count as usize +impl<T> Array for ListArray<T> +where + T: ArrowPrimitiveType, +{ + fn len(&self) -> usize { + self.len } - - /// If null_count is greater than zero then the validity_bitmap will be Some(Bitmap) - pub fn validity_bitmap(&self) -> &Option<Bitmap> { + fn null_count(&self) -> usize { + self.null_count + } + fn validity_bitmap(&self) -> &Option<Bitmap> { &self.validity_bitmap } + fn as_any(&self) -> &Any { + self + } } -macro_rules! array_from_primitive { - ($DT:ty) => { - impl From<Vec<$DT>> for Array { - fn from(v: Vec<$DT>) -> Self { - Array { - len: v.len() as i32, - null_count: 0, - validity_bitmap: None, - data: ArrayData::from(v), - } - } +/// Array of T +pub struct PrimitiveArray<T: ArrowPrimitiveType> { + len: usize, + data: Buffer<T>, + null_count: usize, + validity_bitmap: Option<Bitmap>, +} + +impl<T> PrimitiveArray<T> +where + T: ArrowPrimitiveType, +{ + pub fn new(data: Buffer<T>, null_count: usize, validity_bitmap: Option<Bitmap>) -> Self { + PrimitiveArray { + len: data.len(), + data: data, + null_count, + validity_bitmap, } - impl From<Buffer<$DT>> for Array { - fn from(v: Buffer<$DT>) -> Self { - Array { - len: v.len() as i32, - null_count: 0, - validity_bitmap: None, - data: ArrayData::from(v), + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn iter(&self) -> BufferIterator<T> { + self.data.iter() + } + + pub fn buffer(&self) -> &Buffer<T> { + &self.data + } + + /// Determine the minimum value in the array + pub fn min(&self) -> Option<T> { + let mut n: Option<T> = None; + match &self.validity_bitmap { + &Some(ref bitmap) => for i in 0..self.len { + if bitmap.is_set(i) { + let mut m = self.data.get(i); + match n { + None => n = Some(*m), + Some(nn) => if *m < nn { + n = Some(*m) + }, + } } - } + }, + &None => for i in 0..self.len { + let mut m = self.data.get(i); + match n { + None => n = Some(*m), + Some(nn) => if *m < nn { + n = Some(*m) + }, + } + }, } - }; -} + n + } -array_from_primitive!(bool); -array_from_primitive!(f32); -array_from_primitive!(f64); -array_from_primitive!(u8); -array_from_primitive!(u16); -array_from_primitive!(u32); -array_from_primitive!(u64); -array_from_primitive!(i8); -array_from_primitive!(i16); -array_from_primitive!(i32); -array_from_primitive!(i64); - -macro_rules! array_from_optional_primitive { - ($DT:ty, $DEFAULT:expr) => { - impl From<Vec<Option<$DT>>> for Array { - fn from(v: Vec<Option<$DT>>) -> Self { - let mut null_count = 0; - let mut validity_bitmap = Bitmap::new(v.len()); - for i in 0..v.len() { - if v[i].is_none() { - null_count += 1; - validity_bitmap.clear(i); + /// Determine the maximum value in the array + pub fn max(&self) -> Option<T> { + let mut n: Option<T> = None; + match &self.validity_bitmap { + &Some(ref bitmap) => for i in 0..self.len { + if bitmap.is_set(i) { + let mut m = self.data.get(i); + match n { + None => n = Some(*m), + Some(nn) => if *m > nn { + n = Some(*m) + }, } } - let values = v.iter() - .map(|x| x.unwrap_or($DEFAULT)) - .collect::<Vec<$DT>>(); - Array { - len: values.len() as i32, - null_count, - validity_bitmap: Some(validity_bitmap), - data: ArrayData::from(values), + }, + &None => for i in 0..self.len { + let mut m = self.data.get(i); + match n { + None => n = Some(*m), + Some(nn) => if *m > nn { + n = Some(*m) + }, } - } + }, } - }; + n + } } -array_from_optional_primitive!(bool, false); -array_from_optional_primitive!(f32, 0_f32); -array_from_optional_primitive!(f64, 0_f64); -array_from_optional_primitive!(u8, 0_u8); -array_from_optional_primitive!(u16, 0_u16); -array_from_optional_primitive!(u32, 0_u32); -array_from_optional_primitive!(u64, 0_u64); -array_from_optional_primitive!(i8, 0_i8); -array_from_optional_primitive!(i16, 0_i16); -array_from_optional_primitive!(i32, 0_i32); -array_from_optional_primitive!(i64, 0_i64); - -/// This method mostly just used for unit tests -impl From<Vec<&'static str>> for Array { - fn from(v: Vec<&'static str>) -> Self { - Array::from(v.iter().map(|s| s.to_string()).collect::<Vec<String>>()) +/// Implement the Add operation for types that support Add +impl<T> PrimitiveArray<T> +where + T: ArrowPrimitiveType + Add<Output = T>, +{ + pub fn add(&self, other: &PrimitiveArray<T>) -> PrimitiveArray<T> { + let mut builder: Builder<T> = Builder::new(); + for i in 0..self.len { + let x = *self.data.get(i) + *other.data.get(i); + builder.push(x); + } + PrimitiveArray::from(builder.finish()) } } -impl From<Vec<String>> for Array { - fn from(v: Vec<String>) -> Self { - Array { - len: v.len() as i32, - null_count: 0, +impl<T> Array for PrimitiveArray<T> +where + T: ArrowPrimitiveType, +{ + fn len(&self) -> usize { + self.len + } + fn null_count(&self) -> usize { + self.null_count + } + fn validity_bitmap(&self) -> &Option<Bitmap> { + &self.validity_bitmap + } + fn as_any(&self) -> &Any { + self + } +} + +/// Create a BufferArray<T> from a Buffer<T> without null values +impl<T> From<Buffer<T>> for PrimitiveArray<T> +where + T: ArrowPrimitiveType, +{ + fn from(data: Buffer<T>) -> Self { + PrimitiveArray { + len: data.len(), + data: data, validity_bitmap: None, - data: ArrayData::Utf8(List::from(v)), + null_count: 0, } } } -impl From<Vec<Rc<Array>>> for Array { - fn from(v: Vec<Rc<Array>>) -> Self { - Array { - len: v.len() as i32, +/// Create a BufferArray<T> from a Vec<T> of primitive values +impl<T> From<Vec<T>> for PrimitiveArray<T> +where + T: ArrowPrimitiveType + 'static, +{ + fn from(vec: Vec<T>) -> Self { + PrimitiveArray::from(Buffer::from(vec)) + } +} + +/// Create a BufferArray<T> from a Vec<Optional<T>> with null handling +impl<T> From<Vec<Option<T>>> for PrimitiveArray<T> +where + T: ArrowPrimitiveType + 'static, +{ + fn from(v: Vec<Option<T>>) -> Self { + let mut builder: Builder<T> = Builder::with_capacity(v.len()); + builder.set_len(v.len()); + let mut null_count = 0; + let mut validity_bitmap = Bitmap::new(v.len()); + for i in 0..v.len() { + match v[i] { + Some(value) => builder.set(i, value), + None => { + null_count += 1; + validity_bitmap.clear(i); + } + } + } + PrimitiveArray::new(builder.finish(), null_count, Some(validity_bitmap)) + } +} + +/// An Array of structs +pub struct StructArray { + len: usize, + columns: Vec<Rc<Array>>, + null_count: usize, + validity_bitmap: Option<Bitmap>, +} + +impl StructArray { + pub fn num_columns(&self) -> usize { + self.columns.len() + } + pub fn column(&self, i: usize) -> &Rc<Array> { + &self.columns[i] + } +} + +impl Array for StructArray { + fn len(&self) -> usize { + self.len + } + fn null_count(&self) -> usize { + self.null_count + } + fn validity_bitmap(&self) -> &Option<Bitmap> { + &self.validity_bitmap + } + fn as_any(&self) -> &Any { + self + } +} + +/// Create a StructArray from a list of arrays representing the fields of the struct. The fields +/// must be in the same order as the schema defining the struct. +impl From<Vec<Rc<Array>>> for StructArray { + fn from(data: Vec<Rc<Array>>) -> Self { + StructArray { + len: data[0].len(), + columns: data, null_count: 0, validity_bitmap: None, - data: ArrayData::Struct(v.iter().map(|a| a.clone()).collect()), } } } #[cfg(test)] mod tests { - use super::super::datatypes::*; use super::*; #[test] - fn test_utf8_offsets() { - let a = Array::from(vec!["this", "is", "a", "test"]); - assert_eq!(4, a.len()); - match *a.data() { - ArrayData::Utf8(ref list) => { - assert_eq!(11, list.data().len()); - assert_eq!(0, *list.offsets().get(0)); - assert_eq!(4, *list.offsets().get(1)); - assert_eq!(6, *list.offsets().get(2)); - assert_eq!(7, *list.offsets().get(3)); - assert_eq!(11, *list.offsets().get(4)); - } - _ => panic!(), - } + fn array_data_from_list_u8() { + let mut b: ListBuilder<u8> = ListBuilder::new(); + b.push(&[1, 2, 3, 4, 5]); + b.push(&[5, 4, 3, 2, 1]); + let array_data = ListArray::from(b.finish()); + assert_eq!(2, array_data.len()); } #[test] - fn test_utf8_slices() { - let a = Array::from(vec!["this", "is", "a", "test"]); - match *a.data() { - ArrayData::Utf8(ref d) => { - assert_eq!(4, d.len()); - assert_eq!("this", str::from_utf8(d.slice(0)).unwrap()); - assert_eq!("is", str::from_utf8(d.slice(1)).unwrap()); - assert_eq!("a", str::from_utf8(d.slice(2)).unwrap()); - assert_eq!("test", str::from_utf8(d.slice(3)).unwrap()); - } - _ => panic!(), - } + fn array_from_list_u8() { + let mut b: ListBuilder<u8> = ListBuilder::new(); + b.push("Hello, ".as_bytes()); + b.push("World!".as_bytes()); + let array = ListArray::from(b.finish()); + // downcast back to the data + let array_list_u8 = array.as_any().downcast_ref::<ListArray<u8>>().unwrap(); + assert_eq!(2, array_list_u8.len()); + assert_eq!("Hello, ", str::from_utf8(array_list_u8.get(0)).unwrap()); + assert_eq!("World!", str::from_utf8(array_list_u8.get(1)).unwrap()); } #[test] fn test_from_bool() { - let a = Array::from(vec![false, false, true, false]); + let a = PrimitiveArray::from(vec![false, false, true, false]); assert_eq!(4, a.len()); + assert_eq!(0, a.null_count()); } #[test] fn test_from_f32() { - let a = Array::from(vec![1.23, 2.34, 3.45, 4.56]); + let a = PrimitiveArray::from(vec![1.23, 2.34, 3.45, 4.56]); assert_eq!(4, a.len()); } #[test] fn test_from_i32() { - let a = Array::from(vec![15, 14, 13, 12, 11]); + let a = PrimitiveArray::from(vec![15, 14, 13, 12, 11]); assert_eq!(5, a.len()); - match *a.data() { - ArrayData::Int32(ref b) => { - assert_eq!(vec![15, 14, 13, 12, 11], b.iter().collect::<Vec<i32>>()); - } - _ => panic!(), - } } #[test] fn test_from_empty_vec() { let v: Vec<i32> = vec![]; - let a = Array::from(v); + let a = PrimitiveArray::from(v); assert_eq!(0, a.len()); } #[test] fn test_from_optional_i32() { - let a = Array::from(vec![Some(1), None, Some(2), Some(3), None]); + let a = PrimitiveArray::from(vec![Some(1), None, Some(2), Some(3), None]); assert_eq!(5, a.len()); + assert_eq!(2, a.null_count()); // 1 == not null - let validity_bitmap = a.validity_bitmap.unwrap(); - assert_eq!(true, validity_bitmap.is_set(0)); - assert_eq!(false, validity_bitmap.is_set(1)); - assert_eq!(true, validity_bitmap.is_set(2)); - assert_eq!(true, validity_bitmap.is_set(3)); - assert_eq!(false, validity_bitmap.is_set(4)); + match a.validity_bitmap() { + &Some(ref validity_bitmap) => { + assert_eq!(true, validity_bitmap.is_set(0)); + assert_eq!(false, validity_bitmap.is_set(1)); + assert_eq!(true, validity_bitmap.is_set(2)); + assert_eq!(true, validity_bitmap.is_set(3)); + assert_eq!(false, validity_bitmap.is_set(4)); + } + _ => panic!(), + } } #[test] fn test_struct() { - let _schema = DataType::Struct(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Float32, false), - ]); - - let a = Rc::new(Array::from(vec![1, 2, 3, 4, 5])); - let b = Rc::new(Array::from(vec![1.1, 2.2, 3.3, 4.4, 5.5])); - let _ = Rc::new(Array::from(vec![a, b])); + let a: Rc<Array> = Rc::new(PrimitiveArray::from(Buffer::from(vec![1, 2, 3, 4, 5]))); + let b: Rc<Array> = Rc::new(PrimitiveArray::from(Buffer::from(vec![ + 1.1, 2.2, 3.3, 4.4, 5.5 + ]))); + + let s = StructArray::from(vec![a, b]); + assert_eq!(2, s.num_columns()); + assert_eq!(0, s.null_count()); + } + + #[test] + fn test_buffer_array_min_max() { + let a = PrimitiveArray::from(Buffer::from(vec![5, 6, 7, 8, 9])); + assert_eq!(5, a.min().unwrap()); + assert_eq!(9, a.max().unwrap()); + } + + #[test] + fn test_buffer_array_min_max_with_nulls() { + let a = PrimitiveArray::from(vec![Some(5), None, None, Some(8), Some(9)]); + assert_eq!(5, a.min().unwrap()); + assert_eq!(9, a.max().unwrap()); } } diff --git a/rust/src/bitmap.rs b/rust/src/bitmap.rs index 59c6513..27bf2d6 100644 --- a/rust/src/bitmap.rs +++ b/rust/src/bitmap.rs @@ -39,7 +39,7 @@ impl Bitmap { } } - pub fn len(&self) -> i32 { + pub fn len(&self) -> usize { self.bits.len() } diff --git a/rust/src/buffer.rs b/rust/src/buffer.rs index e8168f2..b6e68d8 100644 --- a/rust/src/buffer.rs +++ b/rust/src/buffer.rs @@ -20,25 +20,32 @@ use libc; use std::mem; use std::slice; +use super::datatypes::*; use super::memory::*; /// Buffer<T> is essentially just a Vec<T> for fixed-width primitive types and the start of the /// memory region is aligned at a 64-byte boundary -pub struct Buffer<T> { +pub struct Buffer<T> +where + T: ArrowPrimitiveType, +{ /// Contiguous memory region holding instances of primitive T data: *const T, /// Number of elements in the buffer - len: i32, + len: usize, } -impl<T> Buffer<T> { +impl<T> Buffer<T> +where + T: ArrowPrimitiveType, +{ /// create a buffer from an existing region of memory (must already be byte-aligned) - pub unsafe fn from_raw_parts(data: *const T, len: i32) -> Self { + pub unsafe fn from_raw_parts(data: *const T, len: usize) -> Self { Buffer { data, len } } /// Get the number of elements in the buffer - pub fn len(&self) -> i32 { + pub fn len(&self) -> usize { self.len } @@ -48,20 +55,20 @@ impl<T> Buffer<T> { } pub fn slice(&self, start: usize, end: usize) -> &[T] { - assert!(end <= self.len as usize); + assert!(end <= self.len); assert!(start <= end); - unsafe { slice::from_raw_parts(self.data.offset(start as isize), (end - start) as usize) } + unsafe { slice::from_raw_parts(self.data.offset(start as isize), end - start) } } /// Get a reference to the value at the specified offset pub fn get(&self, i: usize) -> &T { - assert!(i < self.len as usize); + assert!(i < self.len); unsafe { &(*self.data.offset(i as isize)) } } /// Write to a slot in the buffer pub fn set(&mut self, i: usize, v: T) { - assert!(i < self.len as usize); + assert!(i < self.len); let p = self.data as *mut T; unsafe { *p.offset(i as isize) = v; @@ -79,22 +86,28 @@ impl<T> Buffer<T> { } /// Release the underlying memory when the Buffer goes out of scope -impl<T> Drop for Buffer<T> { +impl<T> Drop for Buffer<T> +where + T: ArrowPrimitiveType, +{ fn drop(&mut self) { free_aligned(self.data as *const u8); } } /// Iterator over the elements of a buffer -pub struct BufferIterator<T> { +pub struct BufferIterator<T> +where + T: ArrowPrimitiveType, +{ data: *const T, - len: i32, + len: usize, index: isize, } impl<T> Iterator for BufferIterator<T> where - T: Copy, + T: ArrowPrimitiveType, { type Item = T; @@ -110,39 +123,30 @@ where } /// Copy the memory from a Vec<T> into a newly allocated Buffer<T> -macro_rules! array_from_primitive { - ($DT:ty) => { - impl From<Vec<$DT>> for Buffer<$DT> { - fn from(v: Vec<$DT>) -> Self { - // allocate aligned memory buffer - let len = v.len(); - let sz = mem::size_of::<$DT>(); - let buffer = allocate_aligned((len * sz) as i64).unwrap(); - Buffer { - len: len as i32, - data: unsafe { - let dst = buffer as *mut libc::c_void; - libc::memcpy(dst, v.as_ptr() as *const libc::c_void, len * sz); - dst as *const $DT - }, - } - } +impl<T> From<Vec<T>> for Buffer<T> +where + T: ArrowPrimitiveType, +{ + fn from(v: Vec<T>) -> Self { + // allocate aligned memory buffer + let len = v.len(); + let sz = mem::size_of::<T>(); + let buffer = allocate_aligned((len * sz) as i64).unwrap(); + Buffer { + len, + data: unsafe { + let dst = mem::transmute::<*const u8, *mut libc::c_void>(buffer); + libc::memcpy( + dst, + mem::transmute::<*const T, *const libc::c_void>(v.as_ptr()), + len * sz, + ); + mem::transmute::<*mut libc::c_void, *const T>(dst) + }, } - }; + } } -array_from_primitive!(bool); -array_from_primitive!(f32); -array_from_primitive!(f64); -array_from_primitive!(u8); -array_from_primitive!(u16); -array_from_primitive!(u32); -array_from_primitive!(u64); -array_from_primitive!(i8); -array_from_primitive!(i16); -array_from_primitive!(i32); -array_from_primitive!(i64); - impl From<Bytes> for Buffer<u8> { fn from(bytes: Bytes) -> Self { // allocate aligned @@ -151,7 +155,7 @@ impl From<Bytes> for Buffer<u8> { let buf_mem = allocate_aligned((len * sz) as i64).unwrap(); let dst = buf_mem as *mut libc::c_void; Buffer { - len: len as i32, + len, data: unsafe { libc::memcpy(dst, bytes.as_ptr() as *const libc::c_void, len * sz); dst as *mut u8 diff --git a/rust/src/builder.rs b/rust/src/builder.rs index 2d5e321..833d6e8 100644 --- a/rust/src/builder.rs +++ b/rust/src/builder.rs @@ -22,16 +22,23 @@ use std::ptr; use std::slice; use super::buffer::*; +use super::datatypes::*; use super::memory::*; /// Buffer builder with zero-copy build method -pub struct Builder<T> { +pub struct Builder<T> +where + T: ArrowPrimitiveType, +{ data: *mut T, len: usize, capacity: usize, } -impl<T> Builder<T> { +impl<T> Builder<T> +where + T: ArrowPrimitiveType, +{ /// Creates a builder with a default capacity pub fn new() -> Self { Builder::with_capacity(64) @@ -87,6 +94,15 @@ impl<T> Builder<T> { self.len += 1; } + /// Set a value at a slot in the allocated memory without adjusting the length + pub fn set(&mut self, i: usize, v: T) { + assert!(!self.data.is_null()); + assert!(i < self.capacity); + unsafe { + *self.data.offset(i as isize) = v; + } + } + /// push a slice of type T, growing the internal buffer as needed pub fn push_slice(&mut self, slice: &[T]) { self.reserve(slice.len()); @@ -131,11 +147,14 @@ impl<T> Builder<T> { assert!(!self.data.is_null()); let p = self.data as *const T; self.data = ptr::null_mut(); // ensure builder cannot be re-used - unsafe { Buffer::from_raw_parts(p, self.len as i32) } + unsafe { Buffer::from_raw_parts(p, self.len) } } } -impl<T> Drop for Builder<T> { +impl<T> Drop for Builder<T> +where + T: ArrowPrimitiveType, +{ fn drop(&mut self) { if !self.data.is_null() { free_aligned(self.data as *const u8); diff --git a/rust/src/datatypes.rs b/rust/src/datatypes.rs index d486ab3..d1ac454 100644 --- a/rust/src/datatypes.rs +++ b/rust/src/datatypes.rs @@ -35,9 +35,34 @@ pub enum DataType { Float32, Float64, Utf8, + List(Box<DataType>), Struct(Vec<Field>), } +/// Arrow struct/schema field +#[derive(Debug, Clone, PartialEq)] +pub struct Field { + name: String, + data_type: DataType, + nullable: bool, +} + +/// Primitive type (ints, floats, strings) +pub trait ArrowPrimitiveType: Copy + PartialOrd + 'static {} + +impl ArrowPrimitiveType for bool {} +impl ArrowPrimitiveType for u8 {} +impl ArrowPrimitiveType for u16 {} +impl ArrowPrimitiveType for u32 {} +impl ArrowPrimitiveType for u64 {} +impl ArrowPrimitiveType for i8 {} +impl ArrowPrimitiveType for i16 {} +impl ArrowPrimitiveType for i32 {} +impl ArrowPrimitiveType for i64 {} +impl ArrowPrimitiveType for f32 {} +impl ArrowPrimitiveType for f64 {} +impl ArrowPrimitiveType for &'static str {} + impl DataType { /// Parse a data type from a JSON representation fn from(json: &Value) -> Result<DataType, ArrowError> { @@ -129,18 +154,14 @@ impl DataType { Value::Array(fields.iter().map(|f| f.to_json()).collect::<Vec<Value>>()); json!({ "fields": field_json_array }) } + DataType::List(ref t) => { + let child_json = t.to_json(); + json!({ "name": "list", "children": child_json }) + } } } } -/// Arrow Field -#[derive(Debug, Clone, PartialEq)] -pub struct Field { - name: String, - data_type: DataType, - nullable: bool, -} - impl Field { pub fn new(name: &str, data_type: DataType, nullable: bool) -> Self { Field { @@ -243,8 +264,12 @@ impl Schema { &self.columns } + pub fn column(&self, i: usize) -> &Field { + &self.columns[i] + } + /// look up a column by name and return a reference to the column along with it's index - pub fn column(&self, name: &str) -> Option<(usize, &Field)> { + pub fn column_with_name(&self, name: &str) -> Option<(usize, &Field)> { self.columns .iter() .enumerate() diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 80b53a1..b67b31b 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -31,3 +31,4 @@ pub mod list; pub mod list_builder; pub mod memory; pub mod memory_pool; +pub mod record_batch; diff --git a/rust/src/list.rs b/rust/src/list.rs index f1e9e6a..32dcb6f 100644 --- a/rust/src/list.rs +++ b/rust/src/list.rs @@ -18,25 +18,32 @@ use std::str; use super::buffer::Buffer; +use super::datatypes::*; use super::list_builder::ListBuilder; /// List<T> is a nested type in which each array slot contains a variable-size sequence of values of /// the same type T -pub struct List<T> { +pub struct List<T> +where + T: ArrowPrimitiveType, +{ /// Contiguous region of memory holding contents of the lists data: Buffer<T>, /// offsets to start of each array slot offsets: Buffer<i32>, } -impl<T> List<T> { +impl<T> List<T> +where + T: ArrowPrimitiveType, +{ /// Create a List from raw parts pub fn from_raw_parts(data: Buffer<T>, offsets: Buffer<i32>) -> Self { List { data, offsets } } /// Get the length of the List (number of array slots) - pub fn len(&self) -> i32 { + pub fn len(&self) -> usize { self.offsets.len() - 1 } @@ -51,7 +58,7 @@ impl<T> List<T> { } /// Get the contents of a single array slot - pub fn slice(&self, index: usize) -> &[T] { + pub fn get(&self, index: usize) -> &[T] { let start = *self.offsets.get(index) as usize; let end = *self.offsets.get(index + 1) as usize; self.data.slice(start, end) @@ -84,20 +91,20 @@ mod tests { fn test_utf8_slices() { let list = List::from(vec!["this", "is", "a", "test"]); assert_eq!(4, list.len()); - assert_eq!("this", str::from_utf8(list.slice(0)).unwrap()); - assert_eq!("is", str::from_utf8(list.slice(1)).unwrap()); - assert_eq!("a", str::from_utf8(list.slice(2)).unwrap()); - assert_eq!("test", str::from_utf8(list.slice(3)).unwrap()); + assert_eq!("this", str::from_utf8(list.get(0)).unwrap()); + assert_eq!("is", str::from_utf8(list.get(1)).unwrap()); + assert_eq!("a", str::from_utf8(list.get(2)).unwrap()); + assert_eq!("test", str::from_utf8(list.get(3)).unwrap()); } #[test] fn test_utf8_empty_strings() { let list = List::from(vec!["", "", "", ""]); assert_eq!(4, list.len()); - assert_eq!("", str::from_utf8(list.slice(0)).unwrap()); - assert_eq!("", str::from_utf8(list.slice(1)).unwrap()); - assert_eq!("", str::from_utf8(list.slice(2)).unwrap()); - assert_eq!("", str::from_utf8(list.slice(3)).unwrap()); + assert_eq!("", str::from_utf8(list.get(0)).unwrap()); + assert_eq!("", str::from_utf8(list.get(1)).unwrap()); + assert_eq!("", str::from_utf8(list.get(2)).unwrap()); + assert_eq!("", str::from_utf8(list.get(3)).unwrap()); } } diff --git a/rust/src/list_builder.rs b/rust/src/list_builder.rs index acda6be..970ff00 100644 --- a/rust/src/list_builder.rs +++ b/rust/src/list_builder.rs @@ -16,15 +16,22 @@ // under the License. use super::builder::*; +use super::datatypes::*; use super::list::List; /// Builder for List<T> -pub struct ListBuilder<T> { +pub struct ListBuilder<T> +where + T: ArrowPrimitiveType, +{ data: Builder<T>, offsets: Builder<i32>, } -impl<T> ListBuilder<T> { +impl<T> ListBuilder<T> +where + T: ArrowPrimitiveType, +{ /// Create a ListBuilder with a default capacity pub fn new() -> Self { ListBuilder::with_capacity(64) @@ -63,8 +70,8 @@ mod tests { let buffer = b.finish(); assert_eq!(2, buffer.len()); - assert_eq!("Hello, ".as_bytes(), buffer.slice(0)); - assert_eq!("World!".as_bytes(), buffer.slice(1)); + assert_eq!("Hello, ".as_bytes(), buffer.get(0)); + assert_eq!("World!".as_bytes(), buffer.get(1)); } #[test] @@ -74,8 +81,8 @@ mod tests { b.push("World!".as_bytes()); let buffer = b.finish(); assert_eq!(2, buffer.len()); - assert_eq!("Hello, ".as_bytes(), buffer.slice(0)); - assert_eq!("World!".as_bytes(), buffer.slice(1)); + assert_eq!("Hello, ".as_bytes(), buffer.get(0)); + assert_eq!("World!".as_bytes(), buffer.get(1)); } #[test] @@ -87,8 +94,8 @@ mod tests { let buffer = b.finish(); assert_eq!(3, buffer.len()); - assert_eq!("Hello, ".as_bytes(), buffer.slice(0)); - assert_eq!("".as_bytes(), buffer.slice(1)); - assert_eq!("World!".as_bytes(), buffer.slice(2)); + assert_eq!("Hello, ".as_bytes(), buffer.get(0)); + assert_eq!("".as_bytes(), buffer.get(1)); + assert_eq!("World!".as_bytes(), buffer.get(2)); } } diff --git a/rust/src/record_batch.rs b/rust/src/record_batch.rs new file mode 100644 index 0000000..d8ca2f1 --- /dev/null +++ b/rust/src/record_batch.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::array::*; +use super::datatypes::*; +use std::rc::Rc; + +/// A batch of column-oriented data +pub struct RecordBatch { + schema: Rc<Schema>, + columns: Vec<Rc<Array>>, +} + +impl RecordBatch { + pub fn new(schema: Rc<Schema>, columns: Vec<Rc<Array>>) -> Self { + // assert that there are some columns + assert!(columns.len() > 0); + // assert that all columns have the same row count + let len = columns[0].len(); + for i in 1..columns.len() { + assert_eq!(len, columns[i].len()); + } + RecordBatch { schema, columns } + } + + pub fn schema(&self) -> &Rc<Schema> { + &self.schema + } + + pub fn num_columns(&self) -> usize { + self.columns.len() + } + + pub fn num_rows(&self) -> usize { + self.columns[0].len() + } + + pub fn column(&self, i: usize) -> &Rc<Array> { + &self.columns[i] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_record_batch() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ]); + + let a = PrimitiveArray::from(vec![1, 2, 3, 4, 5]); + let b = PrimitiveArray::from(vec!["a", "b", "c", "d", "e"]); + + let record_batch = RecordBatch::new(Rc::new(schema), vec![Rc::new(a), Rc::new(b)]); + + assert_eq!(5, record_batch.num_rows()); + assert_eq!(2, record_batch.num_columns()); + assert_eq!( + &DataType::Int32, + record_batch.schema().column(0).data_type() + ); + assert_eq!(&DataType::Utf8, record_batch.schema().column(1).data_type()); + assert_eq!(5, record_batch.column(0).len()); + assert_eq!(5, record_batch.column(1).len()); + } +} -- To stop receiving notification emails like this one, please contact u...@apache.org.