Repository: arrow Updated Branches: refs/heads/master 1f04f7ff9 -> 4bd13b852
ARROW-91: Basic Parquet read support Depends on (mainly one line fixes): - [x] https://github.com/apache/parquet-cpp/pull/99 - [x] https://github.com/apache/parquet-cpp/pull/98 - [x] https://github.com/apache/parquet-cpp/pull/97 Author: Uwe L. Korn <uw...@xhochy.com> Author: Wes McKinney <w...@apache.org> Closes #73 from xhochy/arrow-91 and squashes the following commits: 7579fed [Uwe L. Korn] Mark single argument constructor as explicit 47441a1 [Uwe L. Korn] Assert that no exception was thrown 5fa1026 [Uwe L. Korn] Incorporate review comments 8d2db22 [Uwe L. Korn] ARROW-91: Basic Parquet read support d9940d8 [Wes McKinney] Public API draft Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/4bd13b85 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/4bd13b85 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/4bd13b85 Branch: refs/heads/master Commit: 4bd13b852d376065fdb16c36fa821ab0e167f0fc Parents: 1f04f7f Author: Uwe L. 
Korn <uw...@xhochy.com> Authored: Tue May 10 15:58:04 2016 -0700 Committer: Wes McKinney <w...@apache.org> Committed: Tue May 10 15:58:04 2016 -0700 ---------------------------------------------------------------------- cpp/src/arrow/parquet/CMakeLists.txt | 4 + cpp/src/arrow/parquet/parquet-reader-test.cc | 116 +++++++++++++ cpp/src/arrow/parquet/reader.cc | 194 ++++++++++++++++++++++ cpp/src/arrow/parquet/reader.h | 134 +++++++++++++++ cpp/src/arrow/parquet/schema.cc | 8 +- cpp/src/arrow/parquet/schema.h | 2 +- cpp/src/arrow/parquet/utils.h | 38 +++++ 7 files changed, 488 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/4bd13b85/cpp/src/arrow/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt index 0d5cf26..1ae6709 100644 --- a/cpp/src/arrow/parquet/CMakeLists.txt +++ b/cpp/src/arrow/parquet/CMakeLists.txt @@ -19,6 +19,7 @@ # arrow_parquet : Arrow <-> Parquet adapter set(PARQUET_SRCS + reader.cc schema.cc ) @@ -36,6 +37,9 @@ SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX) ADD_ARROW_TEST(parquet-schema-test) ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet) +ADD_ARROW_TEST(parquet-reader-test) +ARROW_TEST_LINK_LIBRARIES(parquet-reader-test arrow_parquet) + # Headers: top level install(FILES DESTINATION include/arrow/parquet) http://git-wip-us.apache.org/repos/asf/arrow/blob/4bd13b85/cpp/src/arrow/parquet/parquet-reader-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/parquet-reader-test.cc b/cpp/src/arrow/parquet/parquet-reader-test.cc new file mode 100644 index 0000000..a7fc2a8 --- /dev/null +++ b/cpp/src/arrow/parquet/parquet-reader-test.cc @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gtest/gtest.h" + +#include "arrow/test-util.h" +#include "arrow/parquet/reader.h" +#include "arrow/types/primitive.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +#include "parquet/api/reader.h" +#include "parquet/api/writer.h" + +using ParquetBuffer = parquet::Buffer; +using parquet::BufferReader; +using parquet::InMemoryOutputStream; +using parquet::Int64Writer; +using parquet::ParquetFileReader; +using parquet::ParquetFileWriter; +using parquet::RandomAccessSource; +using parquet::Repetition; +using parquet::SchemaDescriptor; +using ParquetType = parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace arrow { + +namespace parquet { + +class TestReadParquet : public ::testing::Test { + public: + virtual void SetUp() {} + + std::shared_ptr<GroupNode> Int64Schema() { + auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64); + NodePtr node_ = + GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode})); + return std::static_pointer_cast<GroupNode>(node_); + } + + std::unique_ptr<ParquetFileReader> Int64File( + std::vector<int64_t>& values, int num_chunks) { + 
std::shared_ptr<GroupNode> schema = Int64Schema(); + std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream()); + auto file_writer = ParquetFileWriter::Open(sink, schema); + size_t chunk_size = values.size() / num_chunks; + for (int i = 0; i < num_chunks; i++) { + auto row_group_writer = file_writer->AppendRowGroup(chunk_size); + auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn()); + int64_t* data = values.data() + i * chunk_size; + column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); + column_writer->Close(); + row_group_writer->Close(); + } + file_writer->Close(); + + std::shared_ptr<ParquetBuffer> buffer = sink->GetBuffer(); + std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer)); + return ParquetFileReader::Open(std::move(source)); + } + + private: +}; + +TEST_F(TestReadParquet, SingleColumnInt64) { + std::vector<int64_t> values(100, 128); + std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1); + arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); + std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader; + ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); + ASSERT_NE(nullptr, column_reader.get()); + std::shared_ptr<Array> out; + ASSERT_OK(column_reader->NextBatch(100, &out)); + ASSERT_NE(nullptr, out.get()); + Int64Array* out_array = static_cast<Int64Array*>(out.get()); + for (size_t i = 0; i < values.size(); i++) { + EXPECT_EQ(values[i], out_array->raw_data()[i]); + } +} + +TEST_F(TestReadParquet, SingleColumnInt64Chunked) { + std::vector<int64_t> values(100, 128); + std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4); + arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); + std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader; + ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); + ASSERT_NE(nullptr, column_reader.get()); + std::shared_ptr<Array> 
out; + ASSERT_OK(column_reader->NextBatch(100, &out)); + ASSERT_NE(nullptr, out.get()); + Int64Array* out_array = static_cast<Int64Array*>(out.get()); + for (size_t i = 0; i < values.size(); i++) { + EXPECT_EQ(values[i], out_array->raw_data()[i]); + } +} + +} // namespace parquet + +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/4bd13b85/cpp/src/arrow/parquet/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc new file mode 100644 index 0000000..481ded5 --- /dev/null +++ b/cpp/src/arrow/parquet/reader.cc @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/parquet/reader.h" + +#include <queue> + +#include "arrow/parquet/schema.h" +#include "arrow/parquet/utils.h" +#include "arrow/schema.h" +#include "arrow/types/primitive.h" +#include "arrow/util/status.h" + +using parquet::ColumnReader; +using parquet::TypedColumnReader; + +namespace arrow { +namespace parquet { + +class FileReader::Impl { + public: + Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); + virtual ~Impl() {} + + Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); + Status ReadFlatColumn(int i, std::shared_ptr<Array>* out); + + private: + MemoryPool* pool_; + std::unique_ptr<::parquet::ParquetFileReader> reader_; +}; + +class FlatColumnReader::Impl { + public: + Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, + ::parquet::ParquetFileReader* reader, int column_index); + virtual ~Impl() {} + + Status NextBatch(int batch_size, std::shared_ptr<Array>* out); + template <typename ArrowType, typename ParquetType, typename CType> + Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out); + + private: + void NextRowGroup(); + + MemoryPool* pool_; + const ::parquet::ColumnDescriptor* descr_; + ::parquet::ParquetFileReader* reader_; + int column_index_; + int next_row_group_; + std::shared_ptr<ColumnReader> column_reader_; + std::shared_ptr<Field> field_; + + PoolBuffer values_buffer_; + PoolBuffer def_levels_buffer_; + PoolBuffer rep_levels_buffer_; +}; + +FileReader::Impl::Impl( + MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) + : pool_(pool), reader_(std::move(reader)) {} + +Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) { + std::unique_ptr<FlatColumnReader::Impl> impl( + new FlatColumnReader::Impl(pool_, reader_->descr()->Column(i), reader_.get(), i)); + *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl))); + return Status::OK(); +} + +Status FileReader::Impl::ReadFlatColumn(int i, 
std::shared_ptr<Array>* out) { + std::unique_ptr<FlatColumnReader> flat_column_reader; + RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader)); + return flat_column_reader->NextBatch(reader_->num_rows(), out); +} + +FileReader::FileReader( + MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) + : impl_(new FileReader::Impl(pool, std::move(reader))) {} + +FileReader::~FileReader() {} + +Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) { + return impl_->GetFlatColumn(i, out); +} + +Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) { + return impl_->ReadFlatColumn(i, out); +} + +FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, + ::parquet::ParquetFileReader* reader, int column_index) + : pool_(pool), + descr_(descr), + reader_(reader), + column_index_(column_index), + next_row_group_(0), + values_buffer_(pool), + def_levels_buffer_(pool), + rep_levels_buffer_(pool) { + NodeToField(descr_->schema_node(), &field_); + NextRowGroup(); +} + +template <typename ArrowType, typename ParquetType, typename CType> +Status FlatColumnReader::Impl::TypedReadBatch( + int batch_size, std::shared_ptr<Array>* out) { + int values_to_read = batch_size; + NumericBuilder<ArrowType> builder(pool_, field_->type); + while ((values_to_read > 0) && column_reader_) { + values_buffer_.Resize(values_to_read * sizeof(CType)); + if (descr_->max_definition_level() > 0) { + def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + if (descr_->max_repetition_level() > 0) { + rep_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get()); + int64_t values_read; + CType* values = reinterpret_cast<CType*>(values_buffer_.mutable_data()); + PARQUET_CATCH_NOT_OK( + values_to_read -= reader->ReadBatch(values_to_read, + reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()), + 
reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()), values, + &values_read)); + if (descr_->max_definition_level() == 0) { + RETURN_NOT_OK(builder.Append(values, values_read)); + } else { + return Status::NotImplemented("no support for definition levels yet"); + } + if (!column_reader_->HasNext()) { NextRowGroup(); } + } + *out = builder.Finish(); + return Status::OK(); +} + +#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType) \ + case Type::ENUM: \ + return TypedReadBatch<ArrowType, ParquetType, CType>(batch_size, out); \ + break; + +Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) { + if (!column_reader_) { + // Exhausted all row groups. + *out = nullptr; + return Status::OK(); + } + + if (descr_->max_repetition_level() > 0) { + return Status::NotImplemented("no support for repetition yet"); + } + + switch (field_->type->type) { + TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t) + TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t) + TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float) + TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double) + default: + return Status::NotImplemented(field_->type->ToString()); + } +} + +void FlatColumnReader::Impl::NextRowGroup() { + if (next_row_group_ < reader_->num_row_groups()) { + column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_); + next_row_group_++; + } else { + column_reader_ = nullptr; + } +} + +FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {} + +FlatColumnReader::~FlatColumnReader() {} + +Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) { + return impl_->NextBatch(batch_size, out); +} + +} // namespace parquet +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/4bd13b85/cpp/src/arrow/parquet/reader.h ---------------------------------------------------------------------- diff --git 
a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h new file mode 100644 index 0000000..41ca7eb --- /dev/null +++ b/cpp/src/arrow/parquet/reader.h @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_PARQUET_READER_H +#define ARROW_PARQUET_READER_H + +#include <memory> + +#include "parquet/api/reader.h" +#include "parquet/api/schema.h" + +namespace arrow { + +class Array; +class MemoryPool; +class RowBatch; +class Status; + +namespace parquet { + +class FlatColumnReader; + +// Arrow read adapter class for deserializing Parquet files as Arrow row +// batches. +// +// TODO(wesm): nested data does not always make sense with this user +// interface unless you are only reading a single leaf node from a branch of +// a table. 
For example: +// +// repeated group data { +// optional group record { +// optional int32 val1; +// optional byte_array val2; +// optional bool val3; +// } +// optional int32 val4; +// } +// +// In the Parquet file, there are 4 leaf nodes: +// +// * data.record.val1 +// * data.record.val2 +// * data.record.val3 +// * data.val4 +// +// When materializing this data in an Arrow array, we would have: +// +// data: list<struct< +// record: struct< +// val1: int32, +// val2: string (= list<uint8>), +// val3: bool, +// >, +// val4: int32 +// >> +// +// However, in the Parquet format, each leaf node has its own repetition and +// definition levels describing the structure of the intermediate nodes in +// this array structure. Thus, we will need to scan the leaf data for a group +// of leaf nodes that are part of the same type tree to create a single result Arrow +// nested array structure. +// +// This is additionally complicated by "chunky" repeated fields or very large byte +// arrays +class FileReader { + public: + FileReader(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); + + // Since the distribution of columns amongst a Parquet file's row groups may + // be uneven (the number of values in each column chunk can be different), we + // provide a column-oriented read interface. The FlatColumnReader hides the + // details of paging through the file's row groups and yielding + // fully-materialized arrow::Array instances. + // + // Returns error status if the column of interest is not flat. + Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); + // Read column as a whole into an Array. + Status ReadFlatColumn(int i, std::shared_ptr<Array>* out); + + virtual ~FileReader(); + + private: + class Impl; + std::unique_ptr<Impl> impl_; +}; + +// At this point, the column reader is a stream iterator. It only knows how to +// read the next batch of values for a particular column from the file until it +// runs out. 
+// +// We also do not expose any internal Parquet details, such as row groups. This +// might change in the future. +class FlatColumnReader { + public: + virtual ~FlatColumnReader(); + + // Scan the next array of the indicated size. The actual size of the + // returned array may be less than the passed size depending on how much data is + // available in the file. + // + // When all the data in the file has been exhausted, the result is set to + // nullptr. + // + // Returns Status::OK on a successful read, including if you have exhausted + // the data available in the file. + Status NextBatch(int batch_size, std::shared_ptr<Array>* out); + + private: + class Impl; + std::unique_ptr<Impl> impl_; + explicit FlatColumnReader(std::unique_ptr<Impl> impl); + + friend class FileReader; +}; + +} // namespace parquet + +} // namespace arrow + +#endif // ARROW_PARQUET_READER_H http://git-wip-us.apache.org/repos/asf/arrow/blob/4bd13b85/cpp/src/arrow/parquet/schema.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index 214c764..fd75894 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -21,13 +21,12 @@ #include <vector> #include "parquet/api/schema.h" -#include "parquet/exception.h" +#include "arrow/parquet/utils.h" #include "arrow/types/decimal.h" #include "arrow/types/string.h" #include "arrow/util/status.h" -using parquet::ParquetException; using parquet::Repetition; using parquet::schema::Node; using parquet::schema::NodePtr; @@ -41,11 +40,6 @@ namespace arrow { namespace parquet { -#define PARQUET_CATCH_NOT_OK(s) \ - try { \ - (s); \ - } catch (const ParquetException& e) { return Status::Invalid(e.what()); } - const auto BOOL = std::make_shared<BooleanType>(); const auto UINT8 = std::make_shared<UInt8Type>(); const auto INT32 = std::make_shared<Int32Type>(); 
http://git-wip-us.apache.org/repos/asf/arrow/blob/4bd13b85/cpp/src/arrow/parquet/schema.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h index bfc7d21..ec5f960 100644 --- a/cpp/src/arrow/parquet/schema.h +++ b/cpp/src/arrow/parquet/schema.h @@ -45,4 +45,4 @@ Status ToParquetSchema( } // namespace arrow -#endif +#endif // ARROW_PARQUET_SCHEMA_H http://git-wip-us.apache.org/repos/asf/arrow/blob/4bd13b85/cpp/src/arrow/parquet/utils.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/utils.h b/cpp/src/arrow/parquet/utils.h new file mode 100644 index 0000000..b32792f --- /dev/null +++ b/cpp/src/arrow/parquet/utils.h @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_PARQUET_UTILS_H +#define ARROW_PARQUET_UTILS_H + +#include "arrow/util/status.h" + +#include "parquet/exception.h" + +namespace arrow { + +namespace parquet { + +#define PARQUET_CATCH_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) { return Status::Invalid(e.what()); } + +} // namespace parquet + +} // namespace arrow + +#endif // ARROW_PARQUET_UTILS_H