This is an automated email from the ASF dual-hosted git repository. emkornfield pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 3129732 ARROW-4772: [C++] new ORC adapter interface for stripe and row iteration 3129732 is described below commit 3129732a18210d0c8921b45f79be4f34eadf0cc3 Author: Yurui Zhou <yurui....@alibaba-inc.com> AuthorDate: Sun Mar 24 21:21:50 2019 -0700 ARROW-4772: [C++] new ORC adapter interface for stripe and row iteration Improvement of the current ORC adapter interface that enables the following operations: - enable seek operation to a designated row - enable iteration over stripes with StripeReader - StripeReader is necessary since ORC supports stripe-level dictionary encoding; for this reason the Arrow schema could vary between stripes if a dictionary-based type is enabled. - enable row level iteration with StripeReader Author: Yurui Zhou <yurui....@alibaba-inc.com> Closes #3843 from yuruiz/OrcAdapterInterface and squashes the following commits: 9c3229b5 <Yurui Zhou> resolve comments e3911374 <Yurui Zhou> resolve comments 74ea0727 <Yurui Zhou> fix clang format error 94d34827 <Yurui Zhou> resolve comments 0a525dcb <Yurui Zhou> Resolve comments b241b64d <Yurui Zhou> fix lint error bf12efed <Yurui Zhou> remove unnecessary deprecation mark 340f4f96 <Yurui Zhou> resolve code style issues 1f5ecdce <Yurui Zhou> fix clang format error 0547348f <Yurui Zhou> Fix cpplint errors 1d258540 <Yurui Zhou> Fix cmake format error eb187c0e <Yurui Zhou> ARROW-4713: new ORC adapter interface for stripe and row iteration Improvement of the current ORC adapter interface that enables the following operations: - enable seek operation to a designated row - enable iteration over stripes with StripeReader - StripeReader is necessary since ORC supports stripe-level dictionary encoding; for this reason the Arrow schema could vary between stripes if a dictionary-based type is enabled. 
- enable row level iteration with StripeReader --- cpp/src/arrow/CMakeLists.txt | 2 +- cpp/src/arrow/adapters/orc/CMakeLists.txt | 19 ++ cpp/src/arrow/adapters/orc/adapter-test.cc | 158 ++++++++++ cpp/src/arrow/adapters/orc/adapter.cc | 485 +++++++---------------------- cpp/src/arrow/adapters/orc/adapter.h | 31 +- cpp/src/arrow/adapters/orc/adapter_util.cc | 427 +++++++++++++++++++++++++ cpp/src/arrow/adapters/orc/adapter_util.h | 44 +++ 7 files changed, 792 insertions(+), 374 deletions(-) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 865c453..3ff8ba1 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -202,7 +202,7 @@ endif() if(ARROW_ORC) add_subdirectory(adapters/orc) - set(ARROW_SRCS adapters/orc/adapter.cc ${ARROW_SRCS}) + set(ARROW_SRCS adapters/orc/adapter.cc adapters/orc/adapter_util.cc ${ARROW_SRCS}) endif() if(ARROW_TENSORFLOW) diff --git a/cpp/src/arrow/adapters/orc/CMakeLists.txt b/cpp/src/arrow/adapters/orc/CMakeLists.txt index 6c8b47e..97bff89 100644 --- a/cpp/src/arrow/adapters/orc/CMakeLists.txt +++ b/cpp/src/arrow/adapters/orc/CMakeLists.txt @@ -26,3 +26,22 @@ install(FILES adapter.h DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/ configure_file(arrow-orc.pc.in "${CMAKE_CURRENT_BINARY_DIR}/arrow-orc.pc" @ONLY) install(FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-orc.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/") + +set(ORC_MIN_TEST_LIBS GTest::Main GTest::GTest) + +if(ARROW_BUILD_STATIC) + set(ARROW_LIBRARIES_FOR_STATIC_TESTS arrow_testing_static arrow_static) +else() + set(ARROW_LIBRARIES_FOR_STATIC_TESTS arrow_testing_shared arrow_shared) +endif() + +if(APPLE) + set(ORC_MIN_TEST_LIBS ${ORC_MIN_TEST_LIBS} ${CMAKE_DL_LIBS}) +elseif(NOT MSVC) + set(ORC_MIN_TEST_LIBS ${ORC_MIN_TEST_LIBS} pthread ${CMAKE_DL_LIBS}) +endif() + +set(ORC_STATIC_TEST_LINK_LIBS ${ORC_MIN_TEST_LIBS} ${ARROW_LIBRARIES_FOR_STATIC_TESTS} + orc_static) + +add_arrow_test(adapter-test PREFIX "orc" 
STATIC_LINK_LIBS ${ORC_STATIC_TEST_LINK_LIBS}) diff --git a/cpp/src/arrow/adapters/orc/adapter-test.cc b/cpp/src/arrow/adapters/orc/adapter-test.cc new file mode 100644 index 0000000..f42144e --- /dev/null +++ b/cpp/src/arrow/adapters/orc/adapter-test.cc @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/adapters/orc/adapter.h" +#include "arrow/array.h" +#include "arrow/io/api.h" + +#include <gtest/gtest.h> +#include <orc/OrcFile.hh> + +namespace liborc = orc; + +namespace arrow { + +constexpr int DEFAULT_MEM_STREAM_SIZE = 100 * 1024 * 1024; + +class MemoryOutputStream : public liborc::OutputStream { + public: + explicit MemoryOutputStream(ssize_t capacity) + : data_(capacity), name_("MemoryOutputStream"), length_(0) {} + + uint64_t getLength() const override { return length_; } + + uint64_t getNaturalWriteSize() const override { return natural_write_size_; } + + void write(const void* buf, size_t size) override { + memcpy(data_.data() + length_, buf, size); + length_ += size; + } + + const std::string& getName() const override { return name_; } + + const char* getData() const { return data_.data(); } + + void close() override {} + + void reset() { length_ = 0; } + + private: + std::vector<char> data_; + std::string name_; + uint64_t length_, natural_write_size_; +}; + +std::unique_ptr<liborc::Writer> CreateWriter(uint64_t stripe_size, + const liborc::Type& type, + liborc::OutputStream* stream) { + liborc::WriterOptions options; + options.setStripeSize(stripe_size); + options.setCompressionBlockSize(1024); + options.setMemoryPool(liborc::getDefaultPool()); + options.setRowIndexStride(0); + return liborc::createWriter(type, stream, options); +} + +TEST(TestAdapter, readIntAndStringFileMultipleStripes) { + MemoryOutputStream mem_stream(DEFAULT_MEM_STREAM_SIZE); + ORC_UNIQUE_PTR<liborc::Type> type( + liborc::Type::buildTypeFromString("struct<col1:int,col2:string>")); + + constexpr uint64_t stripe_size = 1024; // 1K + constexpr uint64_t stripe_count = 10; + constexpr uint64_t stripe_row_count = 65535; + constexpr uint64_t reader_batch_size = 1024; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(stripe_row_count); + auto struct_batch = dynamic_cast<liborc::StructVectorBatch*>(batch.get()); + auto 
long_batch = dynamic_cast<liborc::LongVectorBatch*>(struct_batch->fields[0]); + auto str_batch = dynamic_cast<liborc::StringVectorBatch*>(struct_batch->fields[1]); + int64_t accumulated = 0; + + for (uint64_t j = 0; j < stripe_count; ++j) { + char data_buffer[327675]; + uint64_t offset = 0; + for (uint64_t i = 0; i < stripe_row_count; ++i) { + std::string str_data = std::to_string(accumulated % stripe_row_count); + long_batch->data[i] = static_cast<int64_t>(accumulated % stripe_row_count); + str_batch->data[i] = data_buffer + offset; + str_batch->length[i] = static_cast<int64_t>(str_data.size()); + memcpy(data_buffer + offset, str_data.c_str(), str_data.size()); + accumulated++; + offset += str_data.size(); + } + struct_batch->numElements = stripe_row_count; + long_batch->numElements = stripe_row_count; + str_batch->numElements = stripe_row_count; + + writer->add(*batch); + } + + writer->close(); + + std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader( + std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(mem_stream.getData()), + static_cast<int64_t>(mem_stream.getLength())))); + + std::unique_ptr<adapters::orc::ORCFileReader> reader; + ASSERT_TRUE( + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader).ok()); + + ASSERT_EQ(stripe_row_count * stripe_count, reader->NumberOfRows()); + ASSERT_EQ(stripe_count, reader->NumberOfStripes()); + accumulated = 0; + std::shared_ptr<RecordBatchReader> stripe_reader; + EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok()); + while (stripe_reader) { + std::shared_ptr<RecordBatch> record_batch; + EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok()); + while (record_batch) { + auto int32_array = std::dynamic_pointer_cast<Int32Array>(record_batch->column(0)); + auto str_array = std::dynamic_pointer_cast<StringArray>(record_batch->column(1)); + for (int j = 0; j < record_batch->num_rows(); ++j) { + EXPECT_EQ(accumulated % stripe_row_count, 
int32_array->Value(j)); + EXPECT_EQ(std::to_string(accumulated % stripe_row_count), + str_array->GetString(j)); + accumulated++; + } + EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok()); + } + EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok()); + } + + // test seek operation + int64_t start_offset = 830; + EXPECT_TRUE(reader->Seek(stripe_row_count + start_offset).ok()); + + EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok()); + std::shared_ptr<RecordBatch> record_batch; + EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok()); + while (record_batch) { + auto int32_array = std::dynamic_pointer_cast<Int32Array>(record_batch->column(0)); + auto str_array = std::dynamic_pointer_cast<StringArray>(record_batch->column(1)); + for (int j = 0; j < record_batch->num_rows(); ++j) { + std::ostringstream os; + os << start_offset % stripe_row_count; + EXPECT_EQ(start_offset % stripe_row_count, int32_array->Value(j)); + EXPECT_EQ(os.str(), str_array->GetString(j)); + start_offset++; + } + EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok()); + } +} +} // namespace arrow diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 78a321f..89ad597 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -16,6 +16,7 @@ // under the License. 
#include "arrow/adapters/orc/adapter.h" +#include "arrow/adapters/orc/adapter_util.h" #include <algorithm> #include <cstdint> @@ -102,120 +103,58 @@ struct StripeInformation { uint64_t offset; uint64_t length; uint64_t num_rows; + uint64_t first_row_of_stripe; }; -Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) { - // When subselecting fields on read, liborc will set some nodes to nullptr, - // so we need to check for nullptr before progressing - if (type == nullptr) { - *out = null(); - return Status::OK(); - } - liborc::TypeKind kind = type->getKind(); - const int subtype_count = static_cast<int>(type->getSubtypeCount()); - - switch (kind) { - case liborc::BOOLEAN: - *out = boolean(); - break; - case liborc::BYTE: - *out = int8(); - break; - case liborc::SHORT: - *out = int16(); - break; - case liborc::INT: - *out = int32(); - break; - case liborc::LONG: - *out = int64(); - break; - case liborc::FLOAT: - *out = float32(); - break; - case liborc::DOUBLE: - *out = float64(); - break; - case liborc::VARCHAR: - case liborc::STRING: - *out = utf8(); - break; - case liborc::BINARY: - *out = binary(); - break; - case liborc::CHAR: - *out = fixed_size_binary(static_cast<int>(type->getMaximumLength())); - break; - case liborc::TIMESTAMP: - *out = timestamp(TimeUnit::NANO); - break; - case liborc::DATE: - *out = date32(); - break; - case liborc::DECIMAL: { - const int precision = static_cast<int>(type->getPrecision()); - const int scale = static_cast<int>(type->getScale()); - if (precision == 0) { - // In HIVE 0.11/0.12 precision is set as 0, but means max precision - *out = decimal(38, 6); - } else { - *out = decimal(precision, scale); - } - break; - } - case liborc::LIST: { - if (subtype_count != 1) { - return Status::Invalid("Invalid Orc List type"); - } - std::shared_ptr<DataType> elemtype; - RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &elemtype)); - *out = list(elemtype); - break; - } - case liborc::MAP: { - if (subtype_count != 2) { 
- return Status::Invalid("Invalid Orc Map type"); - } - std::shared_ptr<DataType> keytype; - std::shared_ptr<DataType> valtype; - RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &keytype)); - RETURN_NOT_OK(GetArrowType(type->getSubtype(1), &valtype)); - *out = list(struct_({field("key", keytype), field("value", valtype)})); - break; +// The number of rows to read in a ColumnVectorBatch +constexpr int64_t kReadRowsBatch = 1000; + +class OrcStripeReader : public RecordBatchReader { + public: + OrcStripeReader(std::unique_ptr<liborc::RowReader> row_reader, + std::shared_ptr<Schema> schema, int64_t batch_size, MemoryPool* pool) + : row_reader_(std::move(row_reader)), + schema_(schema), + pool_(pool), + batch_size_{batch_size} {} + + std::shared_ptr<Schema> schema() const override { return schema_; } + + Status ReadNext(std::shared_ptr<RecordBatch>* out) override { + std::unique_ptr<liborc::ColumnVectorBatch> batch; + try { + batch = row_reader_->createRowBatch(batch_size_); + } catch (const liborc::ParseError& e) { + return Status::Invalid(e.what()); } - case liborc::STRUCT: { - std::vector<std::shared_ptr<Field>> fields; - for (int child = 0; child < subtype_count; ++child) { - std::shared_ptr<DataType> elemtype; - RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype)); - std::string name = type->getFieldName(child); - fields.push_back(field(name, elemtype)); - } - *out = struct_(fields); - break; + + const liborc::Type& type = row_reader_->getSelectedType(); + if (!row_reader_->next(*batch)) { + out->reset(); + return Status::OK(); } - case liborc::UNION: { - std::vector<std::shared_ptr<Field>> fields; - std::vector<uint8_t> type_codes; - for (int child = 0; child < subtype_count; ++child) { - std::shared_ptr<DataType> elemtype; - RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype)); - fields.push_back(field("_union_" + std::to_string(child), elemtype)); - type_codes.push_back(static_cast<uint8_t>(child)); - } - *out = union_(fields, type_codes); 
- break; + + std::unique_ptr<RecordBatchBuilder> builder; + RETURN_NOT_OK(RecordBatchBuilder::Make(schema_, pool_, batch->numElements, &builder)); + + // The top-level type must be a struct to read into an arrow table + const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch); + + for (int i = 0; i < builder->num_fields(); i++) { + RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0, + batch->numElements, builder->GetField(i))); } - default: { return Status::Invalid("Unknown Orc type kind: ", kind); } - } - return Status::OK(); -} -// The number of rows to read in a ColumnVectorBatch -constexpr int64_t kReadRowsBatch = 1000; + RETURN_NOT_OK(builder->Flush(out)); + return Status::OK(); + } -// The numer of nanoseconds in a second -constexpr int64_t kOneSecondNanos = 1000000000LL; + private: + std::unique_ptr<liborc::RowReader> row_reader_; + std::shared_ptr<Schema> schema_; + MemoryPool* pool_; + int64_t batch_size_; +}; class ORCFileReader::Impl { public: @@ -233,6 +172,7 @@ class ORCFileReader::Impl { } pool_ = pool; reader_ = std::move(liborc_reader); + current_row_ = 0; return Init(); } @@ -241,10 +181,12 @@ class ORCFileReader::Impl { int64_t nstripes = reader_->getNumberOfStripes(); stripes_.resize(nstripes); std::unique_ptr<liborc::StripeInformation> stripe; + uint64_t first_row_of_stripe = 0; for (int i = 0; i < nstripes; ++i) { stripe = reader_->getStripe(i); - stripes_[i] = StripeInformation( - {stripe->getOffset(), stripe->getLength(), stripe->getNumberOfRows()}); + stripes_[i] = StripeInformation({stripe->getOffset(), stripe->getLength(), + stripe->getNumberOfRows(), first_row_of_stripe}); + first_row_of_stripe += stripe->getNumberOfRows(); } return Status::OK(); } @@ -349,6 +291,23 @@ class ORCFileReader::Impl { return Status::OK(); } + Status SelectStripeWithRowNumber(liborc::RowReaderOptions* opts, int64_t row_number, + StripeInformation* out) { + ARROW_RETURN_IF(row_number >= NumberOfRows(), + 
Status::Invalid("Out of bounds row number: ", row_number)); + + for (auto it = stripes_.begin(); it != stripes_.end(); it++) { + if (static_cast<uint64_t>(row_number) >= it->first_row_of_stripe && + static_cast<uint64_t>(row_number) < it->first_row_of_stripe + it->num_rows) { + opts->range(it->offset, it->length); + *out = *it; + return Status::OK(); + } + } + + return Status::Invalid("Invalid row number", row_number); + } + Status SelectIndices(liborc::RowReaderOptions* opts, const std::vector<int>& include_indices) { std::list<uint64_t> include_indices_list; @@ -374,11 +333,11 @@ class ORCFileReader::Impl { Status ReadBatch(const liborc::RowReaderOptions& opts, const std::shared_ptr<Schema>& schema, int64_t nrows, std::shared_ptr<RecordBatch>* out) { - std::unique_ptr<liborc::RowReader> rowreader; + std::unique_ptr<liborc::RowReader> row_reader; std::unique_ptr<liborc::ColumnVectorBatch> batch; try { - rowreader = reader_->createRowReader(opts); - batch = rowreader->createRowBatch(std::min(nrows, kReadRowsBatch)); + row_reader = reader_->createRowReader(opts); + batch = row_reader->createRowBatch(std::min(nrows, kReadRowsBatch)); } catch (const liborc::ParseError& e) { return Status::Invalid(e.what()); } @@ -388,8 +347,8 @@ class ORCFileReader::Impl { // The top-level type must be a struct to read into an arrow table const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch); - const liborc::Type& type = rowreader->getSelectedType(); - while (rowreader->next(*batch)) { + const liborc::Type& type = row_reader->getSelectedType(); + while (row_reader->next(*batch)) { for (int i = 0; i < builder->num_fields(); i++) { RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0, batch->numElements, builder->GetField(i))); @@ -399,283 +358,52 @@ class ORCFileReader::Impl { return Status::OK(); } - Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch, - int64_t offset, int64_t length, ArrayBuilder* builder) { - if 
(type == nullptr) { - return Status::OK(); - } - liborc::TypeKind kind = type->getKind(); - switch (kind) { - case liborc::STRUCT: - return AppendStructBatch(type, batch, offset, length, builder); - case liborc::LIST: - return AppendListBatch(type, batch, offset, length, builder); - case liborc::MAP: - return AppendMapBatch(type, batch, offset, length, builder); - case liborc::LONG: - return AppendNumericBatch<Int64Builder, liborc::LongVectorBatch, int64_t>( - batch, offset, length, builder); - case liborc::INT: - return AppendNumericBatchCast<Int32Builder, int32_t, liborc::LongVectorBatch, - int64_t>(batch, offset, length, builder); - case liborc::SHORT: - return AppendNumericBatchCast<Int16Builder, int16_t, liborc::LongVectorBatch, - int64_t>(batch, offset, length, builder); - case liborc::BYTE: - return AppendNumericBatchCast<Int8Builder, int8_t, liborc::LongVectorBatch, - int64_t>(batch, offset, length, builder); - case liborc::DOUBLE: - return AppendNumericBatch<DoubleBuilder, liborc::DoubleVectorBatch, double>( - batch, offset, length, builder); - case liborc::FLOAT: - return AppendNumericBatchCast<FloatBuilder, float, liborc::DoubleVectorBatch, - double>(batch, offset, length, builder); - case liborc::BOOLEAN: - return AppendBoolBatch(batch, offset, length, builder); - case liborc::VARCHAR: - case liborc::STRING: - return AppendBinaryBatch<StringBuilder>(batch, offset, length, builder); - case liborc::BINARY: - return AppendBinaryBatch<BinaryBuilder>(batch, offset, length, builder); - case liborc::CHAR: - return AppendFixedBinaryBatch(batch, offset, length, builder); - case liborc::DATE: - return AppendNumericBatchCast<Date32Builder, int32_t, liborc::LongVectorBatch, - int64_t>(batch, offset, length, builder); - case liborc::TIMESTAMP: - return AppendTimestampBatch(batch, offset, length, builder); - case liborc::DECIMAL: - return AppendDecimalBatch(type, batch, offset, length, builder); - default: - return Status::NotImplemented("Not implemented type kind: 
", kind); - } - } + Status Seek(int64_t row_number) { + ARROW_RETURN_IF(row_number >= NumberOfRows(), + Status::Invalid("Out of bounds row number: ", row_number)); - Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch, - int64_t offset, int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<StructBuilder*>(abuilder); - auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch); - - const uint8_t* valid_bytes = nullptr; - if (batch->hasNulls) { - valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; - } - RETURN_NOT_OK(builder->AppendValues(length, valid_bytes)); - - for (int i = 0; i < builder->num_fields(); i++) { - RETURN_NOT_OK(AppendBatch(type->getSubtype(i), batch->fields[i], offset, length, - builder->field_builder(i))); - } + current_row_ = row_number; return Status::OK(); } - Status AppendListBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch, - int64_t offset, int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<ListBuilder*>(abuilder); - auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch); - liborc::ColumnVectorBatch* elements = batch->elements.get(); - const liborc::Type* elemtype = type->getSubtype(0); - - const bool has_nulls = batch->hasNulls; - for (int64_t i = offset; i < length + offset; i++) { - if (!has_nulls || batch->notNull[i]) { - int64_t start = batch->offsets[i]; - int64_t end = batch->offsets[i + 1]; - RETURN_NOT_OK(builder->Append()); - RETURN_NOT_OK(AppendBatch(elemtype, elements, start, end - start, - builder->value_builder())); - } else { - RETURN_NOT_OK(builder->AppendNull()); - } - } - return Status::OK(); - } - - Status AppendMapBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch, - int64_t offset, int64_t length, ArrayBuilder* abuilder) { - auto list_builder = checked_cast<ListBuilder*>(abuilder); - auto struct_builder = checked_cast<StructBuilder*>(list_builder->value_builder()); - auto batch = 
checked_cast<liborc::MapVectorBatch*>(cbatch); - liborc::ColumnVectorBatch* keys = batch->keys.get(); - liborc::ColumnVectorBatch* vals = batch->elements.get(); - const liborc::Type* keytype = type->getSubtype(0); - const liborc::Type* valtype = type->getSubtype(1); - - const bool has_nulls = batch->hasNulls; - for (int64_t i = offset; i < length + offset; i++) { - RETURN_NOT_OK(list_builder->Append()); - int64_t start = batch->offsets[i]; - int64_t list_length = batch->offsets[i + 1] - start; - if (list_length && (!has_nulls || batch->notNull[i])) { - RETURN_NOT_OK(struct_builder->AppendValues(list_length, nullptr)); - RETURN_NOT_OK(AppendBatch(keytype, keys, start, list_length, - struct_builder->field_builder(0))); - RETURN_NOT_OK(AppendBatch(valtype, vals, start, list_length, - struct_builder->field_builder(1))); - } - } - return Status::OK(); - } - - template <class builder_type, class batch_type, class elem_type> - Status AppendNumericBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, - int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<builder_type*>(abuilder); - auto batch = checked_cast<batch_type*>(cbatch); - - if (length == 0) { - return Status::OK(); - } - const uint8_t* valid_bytes = nullptr; - if (batch->hasNulls) { - valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; - } - const elem_type* source = batch->data.data() + offset; - RETURN_NOT_OK(builder->AppendValues(source, length, valid_bytes)); - return Status::OK(); - } - - template <class builder_type, class target_type, class batch_type, class source_type> - Status AppendNumericBatchCast(liborc::ColumnVectorBatch* cbatch, int64_t offset, - int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<builder_type*>(abuilder); - auto batch = checked_cast<batch_type*>(cbatch); - - if (length == 0) { - return Status::OK(); - } - - const uint8_t* valid_bytes = nullptr; - if (batch->hasNulls) { - valid_bytes = reinterpret_cast<const 
uint8_t*>(batch->notNull.data()) + offset; - } - const source_type* source = batch->data.data() + offset; - auto cast_iter = internal::MakeLazyRange( - [&source](int64_t index) { return static_cast<target_type>(source[index]); }, - length); - - RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes)); - - return Status::OK(); - } - - Status AppendBoolBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, - int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<BooleanBuilder*>(abuilder); - auto batch = checked_cast<liborc::LongVectorBatch*>(cbatch); - - if (length == 0) { - return Status::OK(); - } - - const uint8_t* valid_bytes = nullptr; - if (batch->hasNulls) { - valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; - } - const int64_t* source = batch->data.data() + offset; - - auto cast_iter = internal::MakeLazyRange( - [&source](int64_t index) { return static_cast<bool>(source[index]); }, length); - - RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes)); - - return Status::OK(); - } - - Status AppendTimestampBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, - int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<TimestampBuilder*>(abuilder); - auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch); - - if (length == 0) { + Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices, + std::shared_ptr<RecordBatchReader>* out) { + if (current_row_ >= NumberOfRows()) { + out->reset(); return Status::OK(); } - const uint8_t* valid_bytes = nullptr; - if (batch->hasNulls) { - valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; + liborc::RowReaderOptions opts; + if (!include_indices.empty()) { + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); } - - const int64_t* seconds = batch->data.data() + offset; - const int64_t* nanos = batch->nanoseconds.data() + offset; - - auto 
transform_timestamp = [seconds, nanos](int64_t index) { - return seconds[index] * kOneSecondNanos + nanos[index]; - }; - - auto transform_range = internal::MakeLazyRange(transform_timestamp, length); - - RETURN_NOT_OK(builder->AppendValues(transform_range.begin(), transform_range.end(), - valid_bytes)); - return Status::OK(); - } - - template <class builder_type> - Status AppendBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, - int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<builder_type*>(abuilder); - auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch); - - const bool has_nulls = batch->hasNulls; - for (int64_t i = offset; i < length + offset; i++) { - if (!has_nulls || batch->notNull[i]) { - RETURN_NOT_OK( - builder->Append(batch->data[i], static_cast<int32_t>(batch->length[i]))); - } else { - RETURN_NOT_OK(builder->AppendNull()); - } + StripeInformation stripe_info; + RETURN_NOT_OK(SelectStripeWithRowNumber(&opts, current_row_, &stripe_info)); + std::shared_ptr<Schema> schema; + RETURN_NOT_OK(ReadSchema(opts, &schema)); + std::unique_ptr<liborc::RowReader> row_reader; + try { + row_reader = reader_->createRowReader(opts); + row_reader->seekToRow(current_row_); + current_row_ = stripe_info.first_row_of_stripe + stripe_info.num_rows; + } catch (const liborc::ParseError& e) { + return Status::Invalid(e.what()); } - return Status::OK(); - } - Status AppendFixedBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, - int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<FixedSizeBinaryBuilder*>(abuilder); - auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch); - - const bool has_nulls = batch->hasNulls; - for (int64_t i = offset; i < length + offset; i++) { - if (!has_nulls || batch->notNull[i]) { - RETURN_NOT_OK(builder->Append(batch->data[i])); - } else { - RETURN_NOT_OK(builder->AppendNull()); - } - } + *out = std::shared_ptr<RecordBatchReader>( + new 
OrcStripeReader(std::move(row_reader), schema, batch_size, pool_)); return Status::OK(); } - Status AppendDecimalBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch, - int64_t offset, int64_t length, ArrayBuilder* abuilder) { - auto builder = checked_cast<Decimal128Builder*>(abuilder); - - const bool has_nulls = cbatch->hasNulls; - if (type->getPrecision() == 0 || type->getPrecision() > 18) { - auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch); - for (int64_t i = offset; i < length + offset; i++) { - if (!has_nulls || batch->notNull[i]) { - RETURN_NOT_OK(builder->Append( - Decimal128(batch->values[i].getHighBits(), batch->values[i].getLowBits()))); - } else { - RETURN_NOT_OK(builder->AppendNull()); - } - } - } else { - auto batch = checked_cast<liborc::Decimal64VectorBatch*>(cbatch); - for (int64_t i = offset; i < length + offset; i++) { - if (!has_nulls || batch->notNull[i]) { - RETURN_NOT_OK(builder->Append(Decimal128(batch->values[i]))); - } else { - RETURN_NOT_OK(builder->AppendNull()); - } - } - } - return Status::OK(); + Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out) { + return NextStripeReader(batch_size, {}, out); } private: MemoryPool* pool_; std::unique_ptr<liborc::Reader> reader_; std::vector<StripeInformation> stripes_; + int64_t current_row_; }; ORCFileReader::ORCFileReader() { impl_.reset(new ORCFileReader::Impl()); } @@ -721,6 +449,19 @@ Status ORCFileReader::ReadStripe(int64_t stripe, const std::vector<int>& include return impl_->ReadStripe(stripe, include_indices, out); } +Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); } + +Status ORCFileReader::NextStripeReader(int64_t batch_size, + std::shared_ptr<RecordBatchReader>* out) { + return impl_->NextStripeReader(batch_size, out); +} + +Status ORCFileReader::NextStripeReader(int64_t batch_size, + const std::vector<int>& include_indices, + std::shared_ptr<RecordBatchReader>* out) { + return 
impl_->NextStripeReader(batch_size, include_indices, out); +} + int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); } int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); } diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 5ae19b2..6279f68 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -41,7 +41,7 @@ class ARROW_EXPORT ORCFileReader { public: ~ORCFileReader(); - /// \brief Create a new ORC reader + /// \brief Creates a new ORC reader. /// /// \param[in] file the data source /// \param[in] pool a MemoryPool to use for buffer allocations @@ -102,6 +102,35 @@ class ARROW_EXPORT ORCFileReader { Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices, std::shared_ptr<RecordBatch>* out); + /// \brief Seek to the designated row. Calling NextStripeReader() after a seek + /// returns a stripe reader starting from the designated row. + /// + /// \param[in] row_number the row number to seek to + Status Seek(int64_t row_number); + + /// \brief Get a stripe level record batch iterator with specified row count + /// in each record batch. NextStripeReader serves as a fine-grained + /// alternative to ReadStripe, which may cause OOM issues by loading + /// the whole stripe into memory. + /// + /// \param[in] batch_size the number of rows each record batch contains in + /// record batch iteration. + /// \param[out] out the returned stripe reader + Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out); + + /// \brief Get a stripe level record batch iterator with specified row count + /// in each record batch. NextStripeReader serves as a fine-grained + /// alternative to ReadStripe, which may cause OOM issues by loading + /// the whole stripe into memory. + /// + /// \param[in] batch_size the number of rows each record batch contains in + /// record batch iteration.
+ /// + /// \param[in] include_indices the selected field indices to read + /// \param[out] out the returned stripe reader + Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices, + std::shared_ptr<RecordBatchReader>* out); + /// \brief The number of stripes in the file int64_t NumberOfStripes(); diff --git a/cpp/src/arrow/adapters/orc/adapter_util.cc b/cpp/src/arrow/adapters/orc/adapter_util.cc new file mode 100644 index 0000000..235e5ba --- /dev/null +++ b/cpp/src/arrow/adapters/orc/adapter_util.cc @@ -0,0 +1,427 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include <string> +#include <vector> + +#include "arrow/adapters/orc/adapter_util.h" +#include "arrow/array/builder_base.h" +#include "arrow/builder.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/decimal.h" +#include "arrow/util/lazy.h" + +#include "orc/Exceptions.hh" +#include "orc/OrcFile.hh" + +// alias to not interfere with nested orc namespace +namespace liborc = orc; + +namespace arrow { + +namespace adapters { + +namespace orc { + +using internal::checked_cast; + +// The number of nanoseconds in a second +constexpr int64_t kOneSecondNanos = 1000000000LL; + +Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch, + int64_t offset, int64_t length, ArrayBuilder* abuilder) { + auto builder = checked_cast<StructBuilder*>(abuilder); + auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch); + + const uint8_t* valid_bytes = nullptr; + if (batch->hasNulls) { + valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; + } + RETURN_NOT_OK(builder->AppendValues(length, valid_bytes)); + + for (int i = 0; i < builder->num_fields(); i++) { + RETURN_NOT_OK(AppendBatch(type->getSubtype(i), batch->fields[i], offset, length, + builder->field_builder(i))); + } + return Status::OK(); +} + +Status AppendListBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch, + int64_t offset, int64_t length, ArrayBuilder* abuilder) { + auto builder = checked_cast<ListBuilder*>(abuilder); + auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch); + liborc::ColumnVectorBatch* elements = batch->elements.get(); + const liborc::Type* elemtype = type->getSubtype(0); + + const bool has_nulls = batch->hasNulls; + for (int64_t i = offset; i < length + offset; i++) { + if (!has_nulls || batch->notNull[i]) { + int64_t start = batch->offsets[i]; + int64_t end = batch->offsets[i + 1]; + RETURN_NOT_OK(builder->Append()); + RETURN_NOT_OK( + AppendBatch(elemtype, elements, start, end - start,
builder->value_builder())); + } else { + RETURN_NOT_OK(builder->AppendNull()); + } + } + return Status::OK(); +} + +Status AppendMapBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch, + int64_t offset, int64_t length, ArrayBuilder* abuilder) { + auto list_builder = checked_cast<ListBuilder*>(abuilder); + auto struct_builder = checked_cast<StructBuilder*>(list_builder->value_builder()); + auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch); + liborc::ColumnVectorBatch* keys = batch->keys.get(); + liborc::ColumnVectorBatch* vals = batch->elements.get(); + const liborc::Type* keytype = type->getSubtype(0); + const liborc::Type* valtype = type->getSubtype(1); + // A null ORC map entry becomes a null list slot with no key/value children. + const bool has_nulls = batch->hasNulls; + for (int64_t i = offset; i < length + offset; i++) { + RETURN_NOT_OK(list_builder->Append(!has_nulls || batch->notNull[i])); + int64_t start = batch->offsets[i]; + int64_t list_length = batch->offsets[i + 1] - start; + if (list_length && (!has_nulls || batch->notNull[i])) { + RETURN_NOT_OK(struct_builder->AppendValues(list_length, nullptr)); + RETURN_NOT_OK(AppendBatch(keytype, keys, start, list_length, + struct_builder->field_builder(0))); + RETURN_NOT_OK(AppendBatch(valtype, vals, start, list_length, + struct_builder->field_builder(1))); + } + } + return Status::OK(); +} + +template <class builder_type, class batch_type, class elem_type> +Status AppendNumericBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, + int64_t length, ArrayBuilder* abuilder) { + auto builder = checked_cast<builder_type*>(abuilder); + auto batch = checked_cast<batch_type*>(cbatch); + + if (length == 0) { + return Status::OK(); + } + const uint8_t* valid_bytes = nullptr; + if (batch->hasNulls) { + valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; + } + const elem_type* source = batch->data.data() + offset; + RETURN_NOT_OK(builder->AppendValues(source, length, valid_bytes)); + return Status::OK(); +} + +template <class builder_type, class target_type, class batch_type,
class source_type> +Status AppendNumericBatchCast(liborc::ColumnVectorBatch* cbatch, int64_t offset, + int64_t length, ArrayBuilder* abuilder) { + auto builder = checked_cast<builder_type*>(abuilder); + auto batch = checked_cast<batch_type*>(cbatch); + + if (length == 0) { + return Status::OK(); + } + + const uint8_t* valid_bytes = nullptr; + if (batch->hasNulls) { + valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; + } + const source_type* source = batch->data.data() + offset; + auto cast_iter = internal::MakeLazyRange( + [&source](int64_t index) { return static_cast<target_type>(source[index]); }, + length); + + RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes)); + + return Status::OK(); +} + +Status AppendBoolBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, int64_t length, + ArrayBuilder* abuilder) { + auto builder = checked_cast<BooleanBuilder*>(abuilder); + auto batch = checked_cast<liborc::LongVectorBatch*>(cbatch); + + if (length == 0) { + return Status::OK(); + } + + const uint8_t* valid_bytes = nullptr; + if (batch->hasNulls) { + valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; + } + const int64_t* source = batch->data.data() + offset; + + auto cast_iter = internal::MakeLazyRange( + [&source](int64_t index) { return static_cast<bool>(source[index]); }, length); + + RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes)); + + return Status::OK(); +} + +Status AppendTimestampBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, + int64_t length, ArrayBuilder* abuilder) { + auto builder = checked_cast<TimestampBuilder*>(abuilder); + auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch); + + if (length == 0) { + return Status::OK(); + } + + const uint8_t* valid_bytes = nullptr; + if (batch->hasNulls) { + valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset; + } + + const int64_t* seconds =
batch->data.data() + offset; + const int64_t* nanos = batch->nanoseconds.data() + offset; + + auto transform_timestamp = [seconds, nanos](int64_t index) { + return seconds[index] * kOneSecondNanos + nanos[index]; + }; + + auto transform_range = internal::MakeLazyRange(transform_timestamp, length); + + RETURN_NOT_OK( + builder->AppendValues(transform_range.begin(), transform_range.end(), valid_bytes)); + return Status::OK(); +} + +template <class builder_type> +Status AppendBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, + int64_t length, ArrayBuilder* abuilder) { + auto builder = checked_cast<builder_type*>(abuilder); + auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch); + + const bool has_nulls = batch->hasNulls; + for (int64_t i = offset; i < length + offset; i++) { + if (!has_nulls || batch->notNull[i]) { + RETURN_NOT_OK( + builder->Append(batch->data[i], static_cast<int32_t>(batch->length[i]))); + } else { + RETURN_NOT_OK(builder->AppendNull()); + } + } + return Status::OK(); +} + +Status AppendFixedBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, + int64_t length, ArrayBuilder* abuilder) { + auto builder = checked_cast<FixedSizeBinaryBuilder*>(abuilder); + auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch); + + const bool has_nulls = batch->hasNulls; + for (int64_t i = offset; i < length + offset; i++) { + if (!has_nulls || batch->notNull[i]) { + RETURN_NOT_OK(builder->Append(batch->data[i])); + } else { + RETURN_NOT_OK(builder->AppendNull()); + } + } + return Status::OK(); +} + +Status AppendDecimalBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch, + int64_t offset, int64_t length, ArrayBuilder* abuilder) { + auto builder = checked_cast<Decimal128Builder*>(abuilder); + + const bool has_nulls = cbatch->hasNulls; + if (type->getPrecision() == 0 || type->getPrecision() > 18) { + auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch); + for (int64_t i = offset; i < length + offset; i++)
{ + if (!has_nulls || batch->notNull[i]) { + RETURN_NOT_OK(builder->Append( + Decimal128(batch->values[i].getHighBits(), batch->values[i].getLowBits()))); + } else { + RETURN_NOT_OK(builder->AppendNull()); + } + } + } else { + auto batch = checked_cast<liborc::Decimal64VectorBatch*>(cbatch); + for (int64_t i = offset; i < length + offset; i++) { + if (!has_nulls || batch->notNull[i]) { + RETURN_NOT_OK(builder->Append(Decimal128(batch->values[i]))); + } else { + RETURN_NOT_OK(builder->AppendNull()); + } + } + } + return Status::OK(); +} + +Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch, + int64_t offset, int64_t length, ArrayBuilder* builder) { + if (type == nullptr) { + return Status::OK(); + } + liborc::TypeKind kind = type->getKind(); + switch (kind) { + case liborc::STRUCT: + return AppendStructBatch(type, batch, offset, length, builder); + case liborc::LIST: + return AppendListBatch(type, batch, offset, length, builder); + case liborc::MAP: + return AppendMapBatch(type, batch, offset, length, builder); + case liborc::LONG: + return AppendNumericBatch<Int64Builder, liborc::LongVectorBatch, int64_t>( + batch, offset, length, builder); + case liborc::INT: + return AppendNumericBatchCast<Int32Builder, int32_t, liborc::LongVectorBatch, + int64_t>(batch, offset, length, builder); + case liborc::SHORT: + return AppendNumericBatchCast<Int16Builder, int16_t, liborc::LongVectorBatch, + int64_t>(batch, offset, length, builder); + case liborc::BYTE: + return AppendNumericBatchCast<Int8Builder, int8_t, liborc::LongVectorBatch, + int64_t>(batch, offset, length, builder); + case liborc::DOUBLE: + return AppendNumericBatch<DoubleBuilder, liborc::DoubleVectorBatch, double>( + batch, offset, length, builder); + case liborc::FLOAT: + return AppendNumericBatchCast<FloatBuilder, float, liborc::DoubleVectorBatch, + double>(batch, offset, length, builder); + case liborc::BOOLEAN: + return AppendBoolBatch(batch, offset, length, builder); + case
liborc::VARCHAR: + case liborc::STRING: + return AppendBinaryBatch<StringBuilder>(batch, offset, length, builder); + case liborc::BINARY: + return AppendBinaryBatch<BinaryBuilder>(batch, offset, length, builder); + case liborc::CHAR: + return AppendFixedBinaryBatch(batch, offset, length, builder); + case liborc::DATE: + return AppendNumericBatchCast<Date32Builder, int32_t, liborc::LongVectorBatch, + int64_t>(batch, offset, length, builder); + case liborc::TIMESTAMP: + return AppendTimestampBatch(batch, offset, length, builder); + case liborc::DECIMAL: + return AppendDecimalBatch(type, batch, offset, length, builder); + default: + return Status::NotImplemented("Not implemented type kind: ", kind); + } +} + +Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) { + // When subselecting fields on read, liborc will set some nodes to nullptr, + // so we need to check for nullptr before progressing + if (type == nullptr) { + *out = null(); + return Status::OK(); + } + liborc::TypeKind kind = type->getKind(); + const int subtype_count = static_cast<int>(type->getSubtypeCount()); + + switch (kind) { + case liborc::BOOLEAN: + *out = boolean(); + break; + case liborc::BYTE: + *out = int8(); + break; + case liborc::SHORT: + *out = int16(); + break; + case liborc::INT: + *out = int32(); + break; + case liborc::LONG: + *out = int64(); + break; + case liborc::FLOAT: + *out = float32(); + break; + case liborc::DOUBLE: + *out = float64(); + break; + case liborc::VARCHAR: + case liborc::STRING: + *out = utf8(); + break; + case liborc::BINARY: + *out = binary(); + break; + case liborc::CHAR: + *out = fixed_size_binary(static_cast<int>(type->getMaximumLength())); + break; + case liborc::TIMESTAMP: + *out = timestamp(TimeUnit::NANO); + break; + case liborc::DATE: + *out = date32(); + break; + case liborc::DECIMAL: { + const int precision = static_cast<int>(type->getPrecision()); + const int scale = static_cast<int>(type->getScale()); + if (precision == 0) { + //
In HIVE 0.11/0.12 precision is set as 0, but means max precision + *out = decimal(38, 6); + } else { + *out = decimal(precision, scale); + } + break; + } + case liborc::LIST: { + if (subtype_count != 1) { + return Status::Invalid("Invalid Orc List type"); + } + std::shared_ptr<DataType> elemtype; + RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &elemtype)); + *out = list(elemtype); + break; + } + case liborc::MAP: { + if (subtype_count != 2) { + return Status::Invalid("Invalid Orc Map type"); + } + std::shared_ptr<DataType> keytype; + std::shared_ptr<DataType> valtype; + RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &keytype)); + RETURN_NOT_OK(GetArrowType(type->getSubtype(1), &valtype)); + *out = list(struct_({field("key", keytype), field("value", valtype)})); + break; + } + case liborc::STRUCT: { + std::vector<std::shared_ptr<Field>> fields; + for (int child = 0; child < subtype_count; ++child) { + std::shared_ptr<DataType> elemtype; + RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype)); + std::string name = type->getFieldName(child); + fields.push_back(field(name, elemtype)); + } + *out = struct_(fields); + break; + } + case liborc::UNION: { + std::vector<std::shared_ptr<Field>> fields; + std::vector<uint8_t> type_codes; + for (int child = 0; child < subtype_count; ++child) { + std::shared_ptr<DataType> elemtype; + RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype)); + fields.push_back(field("_union_" + std::to_string(child), elemtype)); + type_codes.push_back(static_cast<uint8_t>(child)); + } + *out = union_(fields, type_codes); + break; + } + default: { return Status::Invalid("Unknown Orc type kind: ", kind); } + } + return Status::OK(); +} + +} // namespace orc +} // namespace adapters +} // namespace arrow diff --git a/cpp/src/arrow/adapters/orc/adapter_util.h b/cpp/src/arrow/adapters/orc/adapter_util.h new file mode 100644 index 0000000..eede230 --- /dev/null +++ b/cpp/src/arrow/adapters/orc/adapter_util.h @@ -0,0 +1,44 @@ +//
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_ADAPTERS_ORC_ADAPTER_UTIL_H +#define ARROW_ADAPTERS_ORC_ADAPTER_UTIL_H + +#include <cstdint> +#include <memory> + +#include "arrow/array/builder_base.h" +#include "arrow/status.h" +#include "orc/OrcFile.hh" + +namespace liborc = orc; + +namespace arrow { +namespace adapters { +namespace orc { + +// Convert an ORC type to the matching Arrow type; a null type pointer yields null(). +Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out); + +// Append rows [offset, offset + length) of an ORC batch to builder; null type is a no-op. +Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch, + int64_t offset, int64_t length, ArrayBuilder* builder); +} // namespace orc +} // namespace adapters +} // namespace arrow + +#endif // ARROW_ADAPTERS_ORC_ADAPTER_UTIL_H