Repository: arrow Updated Branches: refs/heads/master 8197f246d -> ec66ddd1f
ARROW-203: Python: Basic filename based Parquet read/write Author: Uwe L. Korn <uw...@xhochy.com> Closes #83 from xhochy/arrow-203 and squashes the following commits: 405f85d [Uwe L. Korn] Remove FindParquet duplication 38d786c [Uwe L. Korn] Make code more readable by using using ec07768 [Uwe L. Korn] Set LD_LIBRARY_PATH in python build 8d90d3f [Uwe L. Korn] Do not set LD_LIBRARY_PATH in python build 000e1e3 [Uwe L. Korn] Use unique_ptr and shared_ptr from Cython 8f6010a [Uwe L. Korn] Linter fixes 0514d01 [Uwe L. Korn] Handle exceptions on RowGroupWriter::Close better 77bd21a [Uwe L. Korn] Add pandas roundtrip to tests f583b61 [Uwe L. Korn] Fix rpath for libarrow_parquet 00c1461 [Uwe L. Korn] Also ensure correct OSX compiler flags in PyArrow 4a80116 [Uwe L. Korn] Handle Python3 strings correctly 066c08a [Uwe L. Korn] Add missing functions to smart pointers 5706db2 [Uwe L. Korn] Use length and offset instead of slicing 443de8b [Uwe L. Korn] Add miniconda to the LD_LIBRARY_PATH 2dffc14 [Uwe L. Korn] Fix min mistake, use equals instead of == 2006e70 [Uwe L. Korn] Rewrite test py.test style 9520c39 [Uwe L. Korn] Use PARQUET from miniconda path cd3b9a9 [Uwe L. Korn] Also search for Parquet in PyArrow 6a41d23 [Uwe L. Korn] Re-use conda installation from C++ 81f501e [Uwe L. Korn] No need to install conda in travis_script_python anymore b505feb [Uwe L. Korn] Install parquet-cpp via conda 5d4929a [Uwe L. Korn] Add test-util.h 9b06e41 [Uwe L. Korn] Make tests templated be6415c [Uwe L. Korn] Incorportate review comments 0fbed3f [Uwe L. Korn] Remove obsolete parquet files 081db5f [Uwe L. Korn] Limit and document chunk_size 7192cfb [Uwe L. Korn] Add const to slicing parameters 0463995 [Uwe L. 
Korn] ARROW-203: Python: Basic filename based Parquet read/write Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ec66ddd1 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ec66ddd1 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ec66ddd1 Branch: refs/heads/master Commit: ec66ddd1fd4954b78967bfa1893480473e4d380c Parents: 8197f24 Author: Uwe L. Korn <uw...@xhochy.com> Authored: Fri Jun 10 15:08:23 2016 -0700 Committer: Wes McKinney <w...@apache.org> Committed: Fri Jun 10 15:08:23 2016 -0700 ---------------------------------------------------------------------- ci/travis_before_script_cpp.sh | 6 +- ci/travis_conda_build.sh | 22 +-- ci/travis_install_conda.sh | 26 +++ ci/travis_script_python.sh | 21 +-- cpp/src/arrow/column.h | 2 + cpp/src/arrow/parquet/CMakeLists.txt | 7 + cpp/src/arrow/parquet/parquet-io-test.cc | 256 +++++++++++++++++++------- cpp/src/arrow/parquet/reader.cc | 25 +++ cpp/src/arrow/parquet/reader.h | 3 + cpp/src/arrow/parquet/test-util.h | 77 ++++++++ cpp/src/arrow/parquet/utils.h | 5 + cpp/src/arrow/parquet/writer.cc | 99 +++++++--- cpp/src/arrow/parquet/writer.h | 12 +- cpp/src/arrow/util/status.h | 9 + python/CMakeLists.txt | 8 + python/cmake_modules/FindArrow.cmake | 14 +- python/conda.recipe/build.sh | 13 ++ python/pyarrow/array.pyx | 3 + python/pyarrow/error.pxd | 2 + python/pyarrow/error.pyx | 8 + python/pyarrow/includes/common.pxd | 9 +- python/pyarrow/includes/libarrow.pxd | 3 + python/pyarrow/includes/parquet.pxd | 46 +++++ python/pyarrow/parquet.pyx | 50 ++++- python/pyarrow/schema.pyx | 9 +- python/pyarrow/tests/test_parquet.py | 59 ++++++ python/setup.py | 4 +- 27 files changed, 654 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_before_script_cpp.sh ---------------------------------------------------------------------- diff --git 
a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 193c76f..6159f67 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -2,6 +2,10 @@ set -e +source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh +conda install -y --channel apache/channel/dev parquet-cpp +export PARQUET_HOME=$MINICONDA + : ${CPP_BUILD_DIR=$TRAVIS_BUILD_DIR/cpp-build} mkdir $CPP_BUILD_DIR @@ -19,7 +23,7 @@ echo $GTEST_HOME : ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install} -CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL" +CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DARROW_PARQUET=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL" if [ $TRAVIS_OS_NAME == "linux" ]; then cmake -DARROW_TEST_MEMCHECK=on $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_conda_build.sh ---------------------------------------------------------------------- diff --git a/ci/travis_conda_build.sh b/ci/travis_conda_build.sh index afa531d..c43a851 100755 --- a/ci/travis_conda_build.sh +++ b/ci/travis_conda_build.sh @@ -2,27 +2,7 @@ set -e -if [ $TRAVIS_OS_NAME == "linux" ]; then - MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh" -else - MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh" -fi - -wget -O miniconda.sh $MINICONDA_URL -MINICONDA=$TRAVIS_BUILD_DIR/miniconda -bash miniconda.sh -b -p $MINICONDA -export PATH="$MINICONDA/bin:$PATH" -conda update -y -q conda -conda info -a - -conda config --set show_channel_urls yes -conda config --add channels conda-forge -conda config --add channels apache - -conda install --yes conda-build jinja2 anaconda-client - -# faster builds, please -conda install -y nomkl +source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh # Build libarrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_install_conda.sh 
---------------------------------------------------------------------- diff --git a/ci/travis_install_conda.sh b/ci/travis_install_conda.sh new file mode 100644 index 0000000..bef667d --- /dev/null +++ b/ci/travis_install_conda.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +set -e + +if [ $TRAVIS_OS_NAME == "linux" ]; then + MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh" +else + MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh" +fi + +wget -O miniconda.sh $MINICONDA_URL +export MINICONDA=$TRAVIS_BUILD_DIR/miniconda +bash miniconda.sh -b -p $MINICONDA +export PATH="$MINICONDA/bin:$PATH" +conda update -y -q conda +conda info -a + +conda config --set show_channel_urls yes +conda config --add channels conda-forge +conda config --add channels apache + +conda install --yes conda-build jinja2 anaconda-client + +# faster builds, please +conda install -y nomkl + http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_script_python.sh ---------------------------------------------------------------------- diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh index d45b895..6d35785 100755 --- a/ci/travis_script_python.sh +++ b/ci/travis_script_python.sh @@ -4,6 +4,12 @@ set -e PYTHON_DIR=$TRAVIS_BUILD_DIR/python +# Re-use conda installation from C++ +export MINICONDA=$TRAVIS_BUILD_DIR/miniconda +export PATH="$MINICONDA/bin:$PATH" +export LD_LIBRARY_PATH="$MINICONDA/lib:$LD_LIBRARY_PATH" +export PARQUET_HOME=$MINICONDA + # Share environment with C++ pushd $CPP_BUILD_DIR source setup_build_env.sh @@ -11,21 +17,6 @@ popd pushd $PYTHON_DIR -# Bootstrap a Conda Python environment - -if [ $TRAVIS_OS_NAME == "linux" ]; then - MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh" -else - MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh" -fi - -curl $MINICONDA_URL > miniconda.sh -MINICONDA=$TRAVIS_BUILD_DIR/miniconda -bash 
miniconda.sh -b -p $MINICONDA -export PATH="$MINICONDA/bin:$PATH" -conda update -y -q conda -conda info -a - python_version_tests() { PYTHON_VERSION=$1 CONDA_ENV_NAME="pyarrow-test-${PYTHON_VERSION}" http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/column.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h index 22becc3..e409566 100644 --- a/cpp/src/arrow/column.h +++ b/cpp/src/arrow/column.h @@ -67,6 +67,8 @@ class Column { int64_t null_count() const { return data_->null_count(); } + const std::shared_ptr<Field>& field() const { return field_; } + // @returns: the column's name in the passed metadata const std::string& name() const { return field_->name; } http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt index c00cc9f..f00bb53 100644 --- a/cpp/src/arrow/parquet/CMakeLists.txt +++ b/cpp/src/arrow/parquet/CMakeLists.txt @@ -35,6 +35,13 @@ add_library(arrow_parquet SHARED target_link_libraries(arrow_parquet ${PARQUET_LIBS}) SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX) +if (APPLE) + set_target_properties(arrow_parquet + PROPERTIES + BUILD_WITH_INSTALL_RPATH ON + INSTALL_NAME_DIR "@rpath") +endif() + ADD_ARROW_TEST(parquet-schema-test) ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet) http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/parquet-io-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 845574d..db779d8 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -18,6 +18,7 @@ #include "gtest/gtest.h" #include 
"arrow/test-util.h" +#include "arrow/parquet/test-util.h" #include "arrow/parquet/reader.h" #include "arrow/parquet/writer.h" #include "arrow/types/primitive.h" @@ -44,36 +45,45 @@ namespace arrow { namespace parquet { -template <typename ArrowType> -std::shared_ptr<PrimitiveArray> NonNullArray( - size_t size, typename ArrowType::c_type value) { - std::vector<typename ArrowType::c_type> values(size, value); - NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>()); - builder.Append(values.data(), values.size()); - return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); -} +const int SMALL_SIZE = 100; +const int LARGE_SIZE = 10000; -// This helper function only supports (size/2) nulls yet. -template <typename ArrowType> -std::shared_ptr<PrimitiveArray> NullableArray( - size_t size, typename ArrowType::c_type value, size_t num_nulls) { - std::vector<typename ArrowType::c_type> values(size, value); - std::vector<uint8_t> valid_bytes(size, 1); +template <typename TestType> +struct test_traits {}; - for (size_t i = 0; i < num_nulls; i++) { - valid_bytes[i * 2] = 0; - } +template <> +struct test_traits<Int32Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; +}; - NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>()); - builder.Append(values.data(), values.size(), valid_bytes.data()); - return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); -} +template <> +struct test_traits<Int64Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT64; +}; + +template <> +struct test_traits<FloatType> { + static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; +}; + +template <> +struct test_traits<DoubleType> { + static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; +}; + +template <typename T> +using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>; +template <typename T> +using ParquetWriter = 
::parquet::TypedColumnWriter<ParquetDataType<T>>; + +template <typename TestType> class TestParquetIO : public ::testing::Test { public: + typedef typename TestType::c_type T; virtual void SetUp() {} - std::shared_ptr<GroupNode> Schema( + std::shared_ptr<GroupNode> MakeSchema( ParquetType::type parquet_type, Repetition::type repetition) { auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type); NodePtr node_ = @@ -98,20 +108,27 @@ class TestParquetIO : public ::testing::Test { std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader; ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); ASSERT_NE(nullptr, column_reader.get()); - ASSERT_OK(column_reader->NextBatch(100, out)); + ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out)); + ASSERT_NE(nullptr, out->get()); + } + + void ReadTableFromFile( + std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) { + arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); + ASSERT_NO_THROW(ASSERT_OK(reader.ReadFlatTable(out))); ASSERT_NE(nullptr, out->get()); } - std::unique_ptr<ParquetFileReader> Int64File( - std::vector<int64_t>& values, int num_chunks) { - std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED); + std::unique_ptr<ParquetFileReader> TestFile(std::vector<T>& values, int num_chunks) { + std::shared_ptr<GroupNode> schema = + MakeSchema(test_traits<TestType>::parquet_enum, Repetition::REQUIRED); std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema); size_t chunk_size = values.size() / num_chunks; for (int i = 0; i < num_chunks; i++) { auto row_group_writer = file_writer->AppendRowGroup(chunk_size); - auto column_writer = - static_cast<::parquet::Int64Writer*>(row_group_writer->NextColumn()); - int64_t* data = values.data() + i * chunk_size; + auto column_writer = static_cast<ParquetWriter<TestType>*>( + row_group_writer->NextColumn()); + T* data = values.data() + i * chunk_size; 
column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); column_writer->Close(); row_group_writer->Close(); @@ -120,71 +137,135 @@ class TestParquetIO : public ::testing::Test { return ReaderFromSink(); } - private: std::shared_ptr<InMemoryOutputStream> sink_; }; -TEST_F(TestParquetIO, SingleColumnInt64Read) { - std::vector<int64_t> values(100, 128); - std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1); +typedef ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType> TestTypes; + +TYPED_TEST_CASE(TestParquetIO, TestTypes); + +TYPED_TEST(TestParquetIO, SingleColumnRequiredRead) { + std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128); + std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1); std::shared_ptr<Array> out; - ReadSingleColumnFile(std::move(file_reader), &out); + this->ReadSingleColumnFile(std::move(file_reader), &out); - Int64Array* out_array = static_cast<Int64Array*>(out.get()); - for (size_t i = 0; i < values.size(); i++) { - EXPECT_EQ(values[i], out_array->raw_data()[i]); - } + ExpectArray<typename TypeParam::c_type>(values.data(), out.get()); } -TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) { - std::vector<int64_t> values(100, 128); - std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4); +TYPED_TEST(TestParquetIO, SingleColumnRequiredTableRead) { + std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128); + std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1); + + std::shared_ptr<Table> out; + this->ReadTableFromFile(std::move(file_reader), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(SMALL_SIZE, out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get()); +} + +TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedRead) { + std::vector<typename TypeParam::c_type> 
values(SMALL_SIZE, 128); + std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4); std::shared_ptr<Array> out; - ReadSingleColumnFile(std::move(file_reader), &out); + this->ReadSingleColumnFile(std::move(file_reader), &out); - Int64Array* out_array = static_cast<Int64Array*>(out.get()); - for (size_t i = 0; i < values.size(); i++) { - EXPECT_EQ(values[i], out_array->raw_data()[i]); - } + ExpectArray<typename TypeParam::c_type>(values.data(), out.get()); } -TEST_F(TestParquetIO, SingleColumnInt64Write) { - std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128); +TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedTableRead) { + std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128); + std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4); + + std::shared_ptr<Table> out; + this->ReadTableFromFile(std::move(file_reader), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(SMALL_SIZE, out->num_rows()); - std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED); - FileWriter writer(default_memory_pool(), MakeWriter(schema)); + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get()); +} + +TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { + std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128); + + std::shared_ptr<GroupNode> schema = + this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED); + FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); ASSERT_NO_THROW(ASSERT_OK(writer.Close())); std::shared_ptr<Array> out; - ReadSingleColumnFile(ReaderFromSink(), &out); + this->ReadSingleColumnFile(this->ReaderFromSink(), &out); 
ASSERT_TRUE(values->Equals(out)); } -TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) { +TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { + std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128); + std::shared_ptr<Table> table = MakeSimpleTable(values, false); + this->sink_ = std::make_shared<InMemoryOutputStream>(); + ASSERT_NO_THROW(ASSERT_OK( + WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length()))); + + std::shared_ptr<Table> out; + this->ReadTableFromFile(this->ReaderFromSink(), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(100, out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +} + +TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { // This also tests max_definition_level = 1 - std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10); + std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10); - std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL); - FileWriter writer(default_memory_pool(), MakeWriter(schema)); + std::shared_ptr<GroupNode> schema = + this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL); + FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); ASSERT_NO_THROW(ASSERT_OK(writer.Close())); std::shared_ptr<Array> out; - ReadSingleColumnFile(ReaderFromSink(), &out); + this->ReadSingleColumnFile(this->ReaderFromSink(), &out); ASSERT_TRUE(values->Equals(out)); } -TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) { - std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128); - std::shared_ptr<PrimitiveArray> values_chunk = NonNullArray<Int64Type>(25, 
128); +TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { + // This also tests max_definition_level = 1 + std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10); + std::shared_ptr<Table> table = MakeSimpleTable(values, true); + this->sink_ = std::make_shared<InMemoryOutputStream>(); + ASSERT_NO_THROW(ASSERT_OK( + WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length()))); + + std::shared_ptr<Table> out; + this->ReadTableFromFile(this->ReaderFromSink(), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(SMALL_SIZE, out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +} - std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED); - FileWriter writer(default_memory_pool(), MakeWriter(schema)); +TYPED_TEST(TestParquetIO, SingleColumnIntRequiredChunkedWrite) { + std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128); + std::shared_ptr<PrimitiveArray> values_chunk = + NonNullArray<TypeParam>(SMALL_SIZE / 4, 128); + + std::shared_ptr<GroupNode> schema = + this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED); + FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length()))); ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get()))); @@ -192,18 +273,37 @@ TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) { ASSERT_NO_THROW(ASSERT_OK(writer.Close())); std::shared_ptr<Array> out; - ReadSingleColumnFile(ReaderFromSink(), &out); + this->ReadSingleColumnFile(this->ReaderFromSink(), &out); ASSERT_TRUE(values->Equals(out)); } -TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) { - std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10); 
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { + std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(LARGE_SIZE, 128); + std::shared_ptr<Table> table = MakeSimpleTable(values, false); + this->sink_ = std::make_shared<InMemoryOutputStream>(); + ASSERT_NO_THROW( + ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512))); + + std::shared_ptr<Table> out; + this->ReadTableFromFile(this->ReaderFromSink(), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(LARGE_SIZE, out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +} + +TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { + std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10); std::shared_ptr<PrimitiveArray> values_chunk_nulls = - NullableArray<DoubleType>(25, 128, 10); - std::shared_ptr<PrimitiveArray> values_chunk = NullableArray<DoubleType>(25, 128, 0); + NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 10); + std::shared_ptr<PrimitiveArray> values_chunk = + NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 0); - std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL); - FileWriter writer(default_memory_pool(), MakeWriter(schema)); + std::shared_ptr<GroupNode> schema = + this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL); + FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length()))); ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get()))); for (int i = 0; i < 3; i++) { @@ -213,10 +313,28 @@ TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) { ASSERT_NO_THROW(ASSERT_OK(writer.Close())); std::shared_ptr<Array> out; - ReadSingleColumnFile(ReaderFromSink(), &out); + this->ReadSingleColumnFile(this->ReaderFromSink(), 
&out); ASSERT_TRUE(values->Equals(out)); } +TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { + // This also tests max_definition_level = 1 + std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(LARGE_SIZE, 128, 100); + std::shared_ptr<Table> table = MakeSimpleTable(values, true); + this->sink_ = std::make_shared<InMemoryOutputStream>(); + ASSERT_NO_THROW( + ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512))); + + std::shared_ptr<Table> out; + this->ReadTableFromFile(this->ReaderFromSink(), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(LARGE_SIZE, out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +} + } // namespace parquet } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 346de25..3b4882d 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -18,10 +18,14 @@ #include "arrow/parquet/reader.h" #include <queue> +#include <string> +#include <vector> +#include "arrow/column.h" #include "arrow/parquet/schema.h" #include "arrow/parquet/utils.h" #include "arrow/schema.h" +#include "arrow/table.h" #include "arrow/types/primitive.h" #include "arrow/util/status.h" @@ -40,6 +44,7 @@ class FileReader::Impl { bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr); Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); Status ReadFlatColumn(int i, std::shared_ptr<Array>* out); + Status ReadFlatTable(std::shared_ptr<Table>* out); private: MemoryPool* pool_; @@ -103,6 +108,22 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) { return 
flat_column_reader->NextBatch(reader_->num_rows(), out); } +Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) { + const std::string& name = reader_->descr()->schema()->name(); + std::shared_ptr<Schema> schema; + RETURN_NOT_OK(FromParquetSchema(reader_->descr(), &schema)); + + std::vector<std::shared_ptr<Column>> columns(reader_->num_columns()); + for (int i = 0; i < reader_->num_columns(); i++) { + std::shared_ptr<Array> array; + RETURN_NOT_OK(ReadFlatColumn(i, &array)); + columns[i] = std::make_shared<Column>(schema->field(i), array); + } + + *table = std::make_shared<Table>(name, schema, columns); + return Status::OK(); +} + FileReader::FileReader( MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) : impl_(new FileReader::Impl(pool, std::move(reader))) {} @@ -117,6 +138,10 @@ Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) { return impl_->ReadFlatColumn(i, out); } +Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) { + return impl_->ReadFlatTable(out); +} + FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, ::parquet::ParquetFileReader* reader, int column_index) : pool_(pool), http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index 41ca7eb..db7a157 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -29,6 +29,7 @@ class Array; class MemoryPool; class RowBatch; class Status; +class Table; namespace parquet { @@ -90,6 +91,8 @@ class FileReader { Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out); // Read column as a whole into an Array. Status ReadFlatColumn(int i, std::shared_ptr<Array>* out); + // Read a table of flat columns into a Table. 
+ Status ReadFlatTable(std::shared_ptr<Table>* out); virtual ~FileReader(); http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/test-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/test-util.h b/cpp/src/arrow/parquet/test-util.h new file mode 100644 index 0000000..1496082 --- /dev/null +++ b/cpp/src/arrow/parquet/test-util.h @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <string> +#include <vector> + +#include "arrow/types/primitive.h" + +namespace arrow { + +namespace parquet { + +template <typename ArrowType> +std::shared_ptr<PrimitiveArray> NonNullArray( + size_t size, typename ArrowType::c_type value) { + std::vector<typename ArrowType::c_type> values(size, value); + NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>()); + builder.Append(values.data(), values.size()); + return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); +} + +// This helper function only supports (size/2) nulls yet. 
+template <typename ArrowType> +std::shared_ptr<PrimitiveArray> NullableArray( + size_t size, typename ArrowType::c_type value, size_t num_nulls) { + std::vector<typename ArrowType::c_type> values(size, value); + std::vector<uint8_t> valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>()); + builder.Append(values.data(), values.size(), valid_bytes.data()); + return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); +} + +std::shared_ptr<Column> MakeColumn(const std::string& name, + const std::shared_ptr<PrimitiveArray>& array, bool nullable) { + auto field = std::make_shared<Field>(name, array->type(), nullable); + return std::make_shared<Column>(field, array); +} + +std::shared_ptr<Table> MakeSimpleTable( + const std::shared_ptr<PrimitiveArray>& values, bool nullable) { + std::shared_ptr<Column> column = MakeColumn("col", values, nullable); + std::vector<std::shared_ptr<Column>> columns({column}); + std::vector<std::shared_ptr<Field>> fields({column->field()}); + auto schema = std::make_shared<Schema>(fields); + return std::make_shared<Table>("table", schema, columns); +} + +template <typename T> +void ExpectArray(T* expected, Array* result) { + PrimitiveArray* p_array = static_cast<PrimitiveArray*>(result); + for (size_t i = 0; i < result->length(); i++) { + EXPECT_EQ(expected[i], reinterpret_cast<const T*>(p_array->data()->data())[i]); + } +} + +} // namespace parquet + +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/utils.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/utils.h b/cpp/src/arrow/parquet/utils.h index b32792f..409bcd9 100644 --- a/cpp/src/arrow/parquet/utils.h +++ b/cpp/src/arrow/parquet/utils.h @@ -31,6 +31,11 @@ namespace parquet { (s); \ } catch (const ::parquet::ParquetException& e) { return 
Status::Invalid(e.what()); } +#define PARQUET_IGNORE_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) {} + } // namespace parquet } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc index 3ad2c5b..1223901 100644 --- a/cpp/src/arrow/parquet/writer.cc +++ b/cpp/src/arrow/parquet/writer.cc @@ -17,11 +17,21 @@ #include "arrow/parquet/writer.h" +#include <algorithm> +#include <vector> + #include "arrow/array.h" +#include "arrow/column.h" +#include "arrow/table.h" +#include "arrow/types/construct.h" #include "arrow/types/primitive.h" +#include "arrow/parquet/schema.h" #include "arrow/parquet/utils.h" #include "arrow/util/status.h" +using parquet::ParquetFileWriter; +using parquet::schema::GroupNode; + namespace arrow { namespace parquet { @@ -32,8 +42,9 @@ class FileWriter::Impl { Status NewRowGroup(int64_t chunk_size); template <typename ParquetType> - Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data); - Status WriteFlatColumnChunk(const PrimitiveArray* data); + Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data, + int64_t offset, int64_t length); + Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length); Status Close(); virtual ~Impl() {} @@ -60,31 +71,31 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) { } template <typename ParquetType> -Status FileWriter::Impl::TypedWriteBatch( - ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) { +Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer, + const PrimitiveArray* data, int64_t offset, int64_t length) { + // TODO: DCHECK((offset + length) <= data->length()); auto data_ptr = - reinterpret_cast<const typename 
ParquetType::c_type*>(data->data()->data()); + reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data()) + + offset; auto writer = reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer); if (writer->descr()->max_definition_level() == 0) { // no nulls, just dump the data - PARQUET_CATCH_NOT_OK(writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr)); + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_ptr)); } else if (writer->descr()->max_definition_level() == 1) { - RETURN_NOT_OK(def_levels_buffer_.Resize(data->length() * sizeof(int16_t))); + RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); if (data->null_count() == 0) { - std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1); - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr)); + std::fill(def_levels_ptr, def_levels_ptr + length, 1); + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, data_ptr)); } else { - RETURN_NOT_OK(data_buffer_.Resize( - (data->length() - data->null_count()) * sizeof(typename ParquetType::c_type))); + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(typename ParquetType::c_type))); auto buffer_ptr = reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data()); int buffer_idx = 0; - for (size_t i = 0; i < data->length(); i++) { - if (data->IsNull(i)) { + for (size_t i = 0; i < length; i++) { + if (data->IsNull(offset + i)) { def_levels_ptr[i] = 0; } else { def_levels_ptr[i] = 1; @@ -92,7 +103,7 @@ Status FileWriter::Impl::TypedWriteBatch( } } PARQUET_CATCH_NOT_OK( - writer->WriteBatch(data->length(), def_levels_ptr, nullptr, buffer_ptr)); + writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); } } else { return Status::NotImplemented("no support for max definition level > 1 yet"); @@ -107,12 +118,13 @@ Status 
FileWriter::Impl::Close() { return Status::OK(); } -#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ - case Type::ENUM: \ - return TypedWriteBatch<ParquetType>(writer, data); \ +#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ + case Type::ENUM: \ + return TypedWriteBatch<ParquetType>(writer, data, offset, length); \ break; -Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) { +Status FileWriter::Impl::WriteFlatColumnChunk( + const PrimitiveArray* data, int64_t offset, int64_t length) { ::parquet::ColumnWriter* writer; PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn()); switch (data->type_enum()) { @@ -133,8 +145,11 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) { return impl_->NewRowGroup(chunk_size); } -Status FileWriter::WriteFlatColumnChunk(const PrimitiveArray* data) { - return impl_->WriteFlatColumnChunk(data); +Status FileWriter::WriteFlatColumnChunk( + const PrimitiveArray* data, int64_t offset, int64_t length) { + int64_t real_length = length; + if (length == -1) { real_length = data->length(); } + return impl_->WriteFlatColumnChunk(data, offset, real_length); } Status FileWriter::Close() { @@ -143,6 +158,48 @@ Status FileWriter::Close() { FileWriter::~FileWriter() {} +Status WriteFlatTable(const Table* table, MemoryPool* pool, + std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) { + std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema; + RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema)); + auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema()); + std::unique_ptr<ParquetFileWriter> parquet_writer = + ParquetFileWriter::Open(sink, schema_node); + FileWriter writer(pool, std::move(parquet_writer)); + + // TODO: Support writing chunked arrays. 
+ for (int i = 0; i < table->num_columns(); i++) { + if (table->column(i)->data()->num_chunks() != 1) { + return Status::NotImplemented("No support for writing chunked arrays yet."); + } + } + + // Cast to PrimitiveArray instances as we work with them. + std::vector<std::shared_ptr<PrimitiveArray>> arrays(table->num_columns()); + for (int i = 0; i < table->num_columns(); i++) { + // num_chunks == 1 as per above loop + std::shared_ptr<Array> array = table->column(i)->data()->chunk(0); + auto primitive_array = std::dynamic_pointer_cast<PrimitiveArray>(array); + if (!primitive_array) { + PARQUET_IGNORE_NOT_OK(writer.Close()); + return Status::NotImplemented("Table must consist of PrimitiveArray instances"); + } + arrays[i] = primitive_array; + } + + for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) { + int64_t offset = chunk * chunk_size; + int64_t size = std::min(chunk_size, table->num_rows() - offset); + RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close())); + for (int i = 0; i < table->num_columns(); i++) { + RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(arrays[i].get(), offset, size), + PARQUET_IGNORE_NOT_OK(writer.Close())); + } + } + + return writer.Close(); +} + } // namespace parquet } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/writer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h index 38f7d0b..83e799f 100644 --- a/cpp/src/arrow/parquet/writer.h +++ b/cpp/src/arrow/parquet/writer.h @@ -29,6 +29,7 @@ class MemoryPool; class PrimitiveArray; class RowBatch; class Status; +class Table; namespace parquet { @@ -42,7 +43,8 @@ class FileWriter { FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer); Status NewRowGroup(int64_t chunk_size); - Status WriteFlatColumnChunk(const PrimitiveArray* data); + Status WriteFlatColumnChunk( + 
const PrimitiveArray* data, int64_t offset = 0, int64_t length = -1); Status Close(); virtual ~FileWriter(); @@ -52,6 +54,14 @@ class FileWriter { std::unique_ptr<Impl> impl_; }; +/** + * Write a flat Table to Parquet. + * + * The table shall only consist of nullable, non-repeated columns of primitive type. + */ +Status WriteFlatTable(const Table* table, MemoryPool* pool, + std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size); + } // namespace parquet } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/util/status.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/status.h b/cpp/src/arrow/util/status.h index 6ddc177..d1a7425 100644 --- a/cpp/src/arrow/util/status.h +++ b/cpp/src/arrow/util/status.h @@ -63,6 +63,15 @@ namespace arrow { if (!_s.ok()) { return _s; } \ } while (0); +#define RETURN_NOT_OK_ELSE(s, else_) \ + do { \ + Status _s = (s); \ + if (!_s.ok()) { \ + else_; \ + return _s; \ + } \ + } while (0); + enum class StatusCode : char { OK = 0, OutOfMemory = 1, http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 2173232..f1becfc 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -339,11 +339,17 @@ if (PYARROW_BUILD_TESTS) STATIC_LIB ${GTEST_STATIC_LIB}) endif() +## Parquet +find_package(Parquet REQUIRED) +include_directories(SYSTEM ${PARQUET_INCLUDE_DIR}) + ## Arrow find_package(Arrow REQUIRED) include_directories(SYSTEM ${ARROW_INCLUDE_DIR}) ADD_THIRDPARTY_LIB(arrow SHARED_LIB ${ARROW_SHARED_LIB}) +ADD_THIRDPARTY_LIB(arrow_parquet + SHARED_LIB ${ARROW_PARQUET_SHARED_LIB}) ############################################################ # Linker setup @@ -422,6 +428,7 @@ set(PYARROW_SRCS set(LINK_LIBS arrow + arrow_parquet ) SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) @@ 
-442,6 +449,7 @@ set(CYTHON_EXTENSIONS array config error + parquet scalar schema table http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/cmake_modules/FindArrow.cmake ---------------------------------------------------------------------- diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake index 3d99838..f0b258e 100644 --- a/python/cmake_modules/FindArrow.cmake +++ b/python/cmake_modules/FindArrow.cmake @@ -42,19 +42,27 @@ find_library(ARROW_LIB_PATH NAMES arrow ${ARROW_SEARCH_LIB_PATH} NO_DEFAULT_PATH) -if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH) +find_library(ARROW_PARQUET_LIB_PATH NAMES arrow_parquet + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + +if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH AND ARROW_PARQUET_LIB_PATH) set(ARROW_FOUND TRUE) set(ARROW_LIB_NAME libarrow) + set(ARROW_PARQUET_LIB_NAME libarrow_parquet) set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH}) set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a) set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + set(ARROW_PARQUET_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_PARQUET_LIB_NAME}.a) + set(ARROW_PARQUET_SHARED_LIB ${ARROW_LIBS}/${ARROW_PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) else () set(ARROW_FOUND FALSE) endif () if (ARROW_FOUND) if (NOT Arrow_FIND_QUIETLY) - message(STATUS "Found the Arrow library: ${ARROW_LIB_PATH}") + message(STATUS "Found the Arrow library: ${ARROW_LIB_PATH}, ${ARROW_PARQUET_LIB_PATH}") endif () else () if (NOT Arrow_FIND_QUIETLY) @@ -74,4 +82,6 @@ mark_as_advanced( ARROW_LIBS ARROW_STATIC_LIB ARROW_SHARED_LIB + ARROW_PARQUET_STATIC_LIB + ARROW_PARQUET_SHARED_LIB ) http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/conda.recipe/build.sh ---------------------------------------------------------------------- diff --git a/python/conda.recipe/build.sh b/python/conda.recipe/build.sh index a9d9aed..a164c1a 100644 --- a/python/conda.recipe/build.sh +++ 
b/python/conda.recipe/build.sh @@ -6,6 +6,19 @@ export ARROW_HOME=$PREFIX cd $RECIPE_DIR +if [ "$(uname)" == "Darwin" ]; then + # C++11 finagling for Mac OSX + export CC=clang + export CXX=clang++ + export MACOSX_VERSION_MIN="10.7" + CXXFLAGS="${CXXFLAGS} -mmacosx-version-min=${MACOSX_VERSION_MIN}" + CXXFLAGS="${CXXFLAGS} -stdlib=libc++ -std=c++11" + export LDFLAGS="${LDFLAGS} -mmacosx-version-min=${MACOSX_VERSION_MIN}" + export LDFLAGS="${LDFLAGS} -stdlib=libc++ -std=c++11" + export LINKFLAGS="${LDFLAGS}" + export MACOSX_DEPLOYMENT_TARGET=10.7 +fi + echo Setting the compiler... if [ `uname` == Linux ]; then EXTRA_CMAKE_ARGS=-DCMAKE_SHARED_LINKER_FLAGS=-static-libstdc++ http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/array.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx index a80b3ce..619e5ef 100644 --- a/python/pyarrow/array.pyx +++ b/python/pyarrow/array.pyx @@ -68,6 +68,9 @@ cdef class Array: values = array_format(self, window=10) return '{0}\n{1}'.format(type_format, values) + def equals(Array self, Array other): + return self.ap.Equals(other.sp_array) + def __len__(self): if self.sp_array.get(): return self.sp_array.get().length() http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/error.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd index d226abe..97ba0ef 100644 --- a/python/pyarrow/error.pxd +++ b/python/pyarrow/error.pxd @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. 
+from pyarrow.includes.libarrow cimport CStatus from pyarrow.includes.pyarrow cimport * +cdef check_cstatus(const CStatus& status) cdef check_status(const Status& status) http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/error.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx index 3f8d7dd..5a6a038 100644 --- a/python/pyarrow/error.pyx +++ b/python/pyarrow/error.pyx @@ -15,12 +15,20 @@ # specific language governing permissions and limitations # under the License. +from pyarrow.includes.libarrow cimport CStatus from pyarrow.includes.common cimport c_string from pyarrow.compat import frombytes class ArrowException(Exception): pass +cdef check_cstatus(const CStatus& status): + if status.ok(): + return + + cdef c_string c_message = status.ToString() + raise ArrowException(frombytes(c_message)) + cdef check_status(const Status& status): if status.ok(): return http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/common.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index e86d5d7..1f6ecee 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -19,6 +19,7 @@ from libc.stdint cimport * from libcpp cimport bool as c_bool +from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string as c_string from libcpp.vector cimport vector @@ -32,11 +33,3 @@ cdef extern from "<iostream>": cdef extern from "<Python.h>": void Py_XDECREF(PyObject* o) -cdef extern from "<memory>" namespace "std" nogil: - - cdef cppclass shared_ptr[T]: - shared_ptr() - shared_ptr(T*) - T* get() - void reset() - void reset(T* p) http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff 
--git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index b2ef45a..90414e3 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -72,6 +72,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: cdef cppclass MemoryPool" arrow::MemoryPool": int64_t bytes_allocated() + cdef MemoryPool* default_memory_pool() + cdef cppclass CListType" arrow::ListType"(CDataType): CListType(const shared_ptr[CDataType]& value_type) @@ -103,6 +105,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: int32_t null_count() Type type_enum() + c_bool Equals(const shared_ptr[CArray]& arr) c_bool IsNull(int i) cdef cppclass CBooleanArray" arrow::BooleanArray"(CArray): http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/parquet.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd index ffdc5d4..0918344 100644 --- a/python/pyarrow/includes/parquet.pxd +++ b/python/pyarrow/includes/parquet.pxd @@ -18,6 +18,26 @@ # distutils: language = c++ from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool + + +cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil: + cdef cppclass Node: + pass + + cdef cppclass GroupNode(Node): + pass + + cdef cppclass PrimitiveNode(Node): + pass + +cdef extern from "parquet/api/schema.h" namespace "parquet" nogil: + cdef cppclass SchemaDescriptor: + shared_ptr[Node] schema() + GroupNode* group() + + cdef cppclass ColumnDescriptor: + pass cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: cdef cppclass ColumnReader: @@ -48,4 +68,30 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: pass cdef cppclass ParquetFileReader: + # TODO: Some default arguments are missing + @staticmethod + unique_ptr[ParquetFileReader] OpenFile(const c_string& 
path) + +cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: + cdef cppclass OutputStream: pass + + cdef cppclass LocalFileOutputStream(OutputStream): + LocalFileOutputStream(const c_string& path) + void Close() + + +cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil: + cdef cppclass FileReader: + FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader) + CStatus ReadFlatTable(shared_ptr[CTable]* out); + + +cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil: + CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, shared_ptr[CSchema]* out) + CStatus ToParquetSchema(const CSchema* arrow_schema, shared_ptr[SchemaDescriptor]* out) + + +cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil: + cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, shared_ptr[OutputStream] sink, int64_t chunk_size) + http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/parquet.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx index 622e7d0..3d5355e 100644 --- a/python/pyarrow/parquet.pyx +++ b/python/pyarrow/parquet.pyx @@ -19,5 +19,61 @@ # distutils: language = c++ # cython: embedsignature = True -from pyarrow.compat import frombytes, tobytes +from pyarrow.includes.libarrow cimport * +cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.includes.parquet cimport * + +from pyarrow.compat import tobytes +from pyarrow.error cimport check_cstatus +from pyarrow.table cimport Table + +def read_table(filename, columns=None): + """ + Read a Table from Parquet format + + Parameters + ---------- + filename : string + Path of the Parquet file to read + columns : list, optional + Currently ignored; column selection is not yet implemented + + Returns + ------- + table: pyarrow.Table + """ + cdef unique_ptr[FileReader] reader + cdef Table table = Table() + cdef shared_ptr[CTable] ctable + + # Must be in one expression to avoid calling std::move which is not possible + # in Cython (due to missing rvalue support) + reader = 
unique_ptr[FileReader](new FileReader(default_memory_pool(), + ParquetFileReader.OpenFile(tobytes(filename)))) + check_cstatus(reader.get().ReadFlatTable(&ctable)) + table.init(ctable) + return table + +def write_table(table, filename, chunk_size=None): + """ + Write a Table to Parquet format + + Parameters + ---------- + table : pyarrow.Table + filename : string + chunk_size : int + The maximum number of rows in each Parquet RowGroup + """ + cdef Table table_ = table + cdef CTable* ctable_ = table_.table + cdef shared_ptr[OutputStream] sink + cdef int64_t chunk_size_ = 0 + if chunk_size is None: + chunk_size_ = min(ctable_.num_rows(), int(2**16)) + else: + chunk_size_ = chunk_size + + sink.reset(new LocalFileOutputStream(tobytes(filename))) + check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_)) + http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/schema.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx index 22ddf0c..084c304 100644 --- a/python/pyarrow/schema.pyx +++ b/python/pyarrow/schema.pyx @@ -201,7 +201,9 @@ def string(): def list_(DataType value_type): cdef DataType out = DataType() - out.init(shared_ptr[CDataType](new CListType(value_type.sp_type))) + cdef shared_ptr[CDataType] list_type + list_type.reset(new CListType(value_type.sp_type)) + out.init(list_type) return out def struct(fields): @@ -212,12 +214,13 @@ def struct(fields): DataType out = DataType() Field field vector[shared_ptr[CField]] c_fields + cdef shared_ptr[CDataType] struct_type for field in fields: c_fields.push_back(field.sp_field) - out.init(shared_ptr[CDataType]( - new CStructType(c_fields))) + struct_type.reset(new CStructType(c_fields)) + out.init(struct_type) return out def schema(fields): http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/tests/test_parquet.py 
---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py new file mode 100644 index 0000000..d92cf4c --- /dev/null +++ b/python/pyarrow/tests/test_parquet.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from pyarrow.compat import unittest +import pyarrow as arrow +import pyarrow.parquet + +A = arrow + +import numpy as np +import os.path +import pandas as pd + +import pandas.util.testing as pdt + + +def test_single_pylist_column_roundtrip(tmpdir): + for dtype in [int, float]: + filename = tmpdir.join('single_{}_column.parquet'.format(dtype.__name__)) + data = [A.from_pylist(list(map(dtype, range(5))))] + table = A.Table.from_arrays(('a', 'b'), data, 'table_name') + A.parquet.write_table(table, filename.strpath) + table_read = pyarrow.parquet.read_table(filename.strpath) + for col_written, col_read in zip(table.itercolumns(), table_read.itercolumns()): + assert col_written.name == col_read.name + assert col_read.data.num_chunks == 1 + data_written = col_written.data.chunk(0) + data_read = col_read.data.chunk(0) + assert data_written.equals(data_read) + +def test_pandas_rountrip(tmpdir): + size = 10000 + df = pd.DataFrame({ + 'int32': np.arange(size, dtype=np.int32), + 'int64': np.arange(size, dtype=np.int64), + 'float32': np.arange(size, dtype=np.float32), + 'float64': np.arange(size, dtype=np.float64) + }) + filename = tmpdir.join('pandas_rountrip.parquet') + arrow_table = A.from_pandas_dataframe(df) + A.parquet.write_table(arrow_table, filename.strpath) + table_read = pyarrow.parquet.read_table(filename.strpath) + df_read = table_read.to_pandas() + pdt.assert_frame_equal(df, df_read) + http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index 5f228ed..7edeb91 100644 --- a/python/setup.py +++ b/python/setup.py @@ -214,7 +214,7 @@ class build_ext(_build_ext): return name + suffix def get_cmake_cython_names(self): - return ['array', 'config', 'error', 'scalar', 'schema', 'table'] + return ['array', 'config', 'error', 'parquet', 'scalar', 'schema', 'table'] def get_names(self): return self._found_names @@ -242,7 +242,7 @@ 
setup( 'clean': clean, 'build_ext': build_ext }, - install_requires=['cython >= 0.21', 'numpy >= 1.9'], + install_requires=['cython >= 0.23', 'numpy >= 1.9'], description=DESC, license='Apache License, Version 2.0', maintainer="Apache Arrow Developers",