ARROW-459: [C++] Dictionary IPC support in file and stream formats Also fixes ARROW-565
Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #347 from wesm/ARROW-459 and squashes the following commits: 6a987b7 [Wes McKinney] Fix clang warning with forward declaration 8e0e6fb [Wes McKinney] Fix bug causing valgrind failure dee044e [Wes McKinney] Review comments 7ac756e [Wes McKinney] Fix Python build e5cec27 [Wes McKinney] Add some less trivial dictionary-encoded arrays to test case acfa994 [Wes McKinney] cpplint ef9dea8 [Wes McKinney] More dictionary support in FileReader. Simple test passes cb04a41 [Wes McKinney] Refactoring. Remove FileFooter class in favor of private impl in FileReader 1cee0ff [Wes McKinney] More progress toward file/stream roundtrips with dictionaries ae389fa [Wes McKinney] WIP progress toward stream/file dictionary roundtrip 6858e12 [Wes McKinney] Add union and dictionary to file/stream tests d537004 [Wes McKinney] Add support for deconstructing and reconstructing DictionaryArray with known schema Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/d28f1c1e Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/d28f1c1e Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/d28f1c1e Branch: refs/heads/master Commit: d28f1c1e0f21bc578b84ab4bed4cf259c333fbc9 Parents: 5e279f0 Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Fri Feb 24 09:16:32 2017 -0500 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Fri Feb 24 09:16:32 2017 -0500 ---------------------------------------------------------------------- cpp/CMakeLists.txt | 4 +- cpp/src/arrow/array.h | 2 + cpp/src/arrow/io/CMakeLists.txt | 9 +- cpp/src/arrow/ipc/CMakeLists.txt | 15 +- cpp/src/arrow/ipc/adapter.cc | 189 ++++++++++------ cpp/src/arrow/ipc/adapter.h | 30 +-- cpp/src/arrow/ipc/file.cc | 306 +++++++++++++------------- cpp/src/arrow/ipc/file.h | 51 +---- cpp/src/arrow/ipc/ipc-adapter-test.cc | 46 ++-- cpp/src/arrow/ipc/ipc-file-test.cc | 78 +++---- 
cpp/src/arrow/ipc/ipc-metadata-test.cc | 17 +- cpp/src/arrow/ipc/metadata-internal.cc | 239 ++++++++++++++++---- cpp/src/arrow/ipc/metadata-internal.h | 45 ++-- cpp/src/arrow/ipc/metadata.cc | 185 ++++++++++++++-- cpp/src/arrow/ipc/metadata.h | 97 ++++++-- cpp/src/arrow/ipc/stream.cc | 207 ++++++++++++----- cpp/src/arrow/ipc/stream.h | 34 ++- cpp/src/arrow/ipc/test-common.h | 80 +++++++ cpp/src/arrow/type.cc | 6 +- cpp/src/arrow/type.h | 14 +- python/pyarrow/includes/libarrow_ipc.pxd | 1 - python/pyarrow/io.pyx | 5 - 22 files changed, 1093 insertions(+), 567 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 0888a8b..b77f8c7 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -102,7 +102,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") ON) endif() -if(NOT ARROW_BUILD_TESTS) +if(ARROW_BUILD_TESTS) + set(ARROW_BUILD_STATIC ON) +else() set(NO_TESTS 1) endif() http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/array.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 32d156b..9bb06af 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -40,6 +40,8 @@ class Status; class ArrayVisitor { public: + virtual ~ArrayVisitor() = default; + virtual Status Visit(const NullArray& array) = 0; virtual Status Visit(const BooleanArray& array) = 0; virtual Status Visit(const Int8Array& array) = 0; http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/io/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index b8882e4..ceb7b73 100644 --- a/cpp/src/arrow/io/CMakeLists.txt 
+++ b/cpp/src/arrow/io/CMakeLists.txt @@ -70,13 +70,8 @@ set(ARROW_IO_STATIC_PRIVATE_LINK_LIBS boost_system_static boost_filesystem_static) -if (ARROW_BUILD_STATIC) - set(ARROW_IO_TEST_LINK_LIBS - arrow_io_static) -else() - set(ARROW_IO_TEST_LINK_LIBS - arrow_io_shared) -endif() +set(ARROW_IO_TEST_LINK_LIBS + arrow_io_static) set(ARROW_IO_SRCS file.cc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index c047f53..e7a3fdb 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -24,20 +24,9 @@ set(ARROW_IPC_SHARED_LINK_LIBS arrow_shared ) -set(ARROW_IPC_STATIC_LINK_LIBS - arrow_static +set(ARROW_IPC_TEST_LINK_LIBS arrow_io_static -) - -if (ARROW_BUILD_STATIC) - set(ARROW_IPC_TEST_LINK_LIBS - arrow_io_static - arrow_ipc_static) -else() - set(ARROW_IPC_TEST_LINK_LIBS - arrow_io_shared - arrow_ipc_shared) -endif() + arrow_ipc_static) set(ARROW_IPC_SRCS adapter.cc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index a24c007..08ac983 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -51,12 +51,15 @@ namespace ipc { class RecordBatchWriter : public ArrayVisitor { public: - RecordBatchWriter(MemoryPool* pool, const RecordBatch& batch, - int64_t buffer_start_offset, int max_recursion_depth) + RecordBatchWriter( + MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth) : pool_(pool), - batch_(batch), max_recursion_depth_(max_recursion_depth), - buffer_start_offset_(buffer_start_offset) {} + buffer_start_offset_(buffer_start_offset) { + DCHECK_GT(max_recursion_depth, 0); + } + + virtual ~RecordBatchWriter() = default; Status 
VisitArray(const Array& arr) { if (max_recursion_depth_ <= 0) { @@ -81,7 +84,7 @@ class RecordBatchWriter : public ArrayVisitor { return arr.Accept(this); } - Status Assemble(int64_t* body_length) { + Status Assemble(const RecordBatch& batch, int64_t* body_length) { if (field_nodes_.size() > 0) { field_nodes_.clear(); buffer_meta_.clear(); @@ -89,8 +92,8 @@ class RecordBatchWriter : public ArrayVisitor { } // Perform depth-first traversal of the row-batch - for (int i = 0; i < batch_.num_columns(); ++i) { - RETURN_NOT_OK(VisitArray(*batch_.column(i))); + for (int i = 0; i < batch.num_columns(); ++i) { + RETURN_NOT_OK(VisitArray(*batch.column(i))); } // The position for the start of a buffer relative to the passed frame of @@ -127,16 +130,22 @@ class RecordBatchWriter : public ArrayVisitor { return Status::OK(); } - Status WriteMetadata( - int64_t body_length, io::OutputStream* dst, int32_t* metadata_length) { + // Override this for writing dictionary metadata + virtual Status WriteMetadataMessage( + int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) { + return WriteRecordBatchMessage( + num_rows, body_length, field_nodes_, buffer_meta_, out); + } + + Status WriteMetadata(int32_t num_rows, int64_t body_length, io::OutputStream* dst, + int32_t* metadata_length) { // Now that we have computed the locations of all of the buffers in shared // memory, the data header can be converted to a flatbuffer and written out // // Note: The memory written here is prefixed by the size of the flatbuffer // itself as an int32_t. 
std::shared_ptr<Buffer> metadata_fb; - RETURN_NOT_OK(WriteRecordBatchMetadata( - batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb)); + RETURN_NOT_OK(WriteMetadataMessage(num_rows, body_length, &metadata_fb)); // Need to write 4 bytes (metadata size), the metadata, plus padding to // end on an 8-byte offset @@ -166,15 +175,16 @@ return Status::OK(); } - Status Write(io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { - RETURN_NOT_OK(Assemble(body_length)); + Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length) { + RETURN_NOT_OK(Assemble(batch, body_length)); #ifndef NDEBUG int64_t start_position, current_position; RETURN_NOT_OK(dst->Tell(&start_position)); #endif - RETURN_NOT_OK(WriteMetadata(*body_length, dst, metadata_length)); + RETURN_NOT_OK(WriteMetadata(batch.num_rows(), *body_length, dst, metadata_length)); #ifndef NDEBUG RETURN_NOT_OK(dst->Tell(&current_position)); @@ -206,17 +216,17 @@ return Status::OK(); } - Status GetTotalSize(int64_t* size) { // emulates the behavior of Write without actually writing int32_t metadata_length = 0; int64_t body_length = 0; MockOutputStream dst; - RETURN_NOT_OK(Write(&dst, &metadata_length, &body_length)); + RETURN_NOT_OK(Write(batch, &dst, &metadata_length, &body_length)); *size = dst.GetExtentBytesWritten(); return Status::OK(); } - private: + protected: Status Visit(const NullArray& array) override { return Status::NotImplemented("null"); } template <typename ArrayType> @@ -468,15 +478,12 @@ } Status Visit(const DictionaryArray& array) override { - // Dictionary written out separately - const auto& indices = static_cast<const PrimitiveArray&>(*array.indices().get()); - buffers_.push_back(indices.data()); - return
Status::OK(); + // Dictionary written out separately. Slice offset contained in the indices + return array.indices()->Accept(this); } // In some cases, intermediate buffers may need to be allocated (with sliced arrays) MemoryPool* pool_; - const RecordBatch& batch_; std::vector<flatbuf::FieldNode> field_nodes_; std::vector<flatbuf::Buffer> buffer_meta_; @@ -486,17 +493,51 @@ class RecordBatchWriter : public ArrayVisitor { int64_t buffer_start_offset_; }; +class DictionaryWriter : public RecordBatchWriter { + public: + using RecordBatchWriter::RecordBatchWriter; + + Status WriteMetadataMessage( + int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override { + return WriteDictionaryMessage( + dictionary_id_, num_rows, body_length, field_nodes_, buffer_meta_, out); + } + + Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, + io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { + dictionary_id_ = dictionary_id; + + // Make a dummy record batch. 
A bit tedious as we have to make a schema + std::vector<std::shared_ptr<Field>> fields = { + arrow::field("dictionary", dictionary->type())}; + auto schema = std::make_shared<Schema>(fields); + RecordBatch batch(schema, dictionary->length(), {dictionary}); + + return RecordBatchWriter::Write(batch, dst, metadata_length, body_length); + } + + private: + // TODO(wesm): Setting this in Write is a bit unclean, but it works + int64_t dictionary_id_; +}; + Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool, int max_recursion_depth) { - DCHECK_GT(max_recursion_depth, 0); - RecordBatchWriter serializer(pool, batch, buffer_start_offset, max_recursion_depth); - return serializer.Write(dst, metadata_length, body_length); + RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth); + return writer.Write(batch, dst, metadata_length, body_length); +} + +Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, + int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, MemoryPool* pool) { + DictionaryWriter writer(pool, buffer_start_offset, kMaxIpcRecursionDepth); + return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length); } Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { - RecordBatchWriter serializer(default_memory_pool(), batch, 0, kMaxIpcRecursionDepth); - RETURN_NOT_OK(serializer.GetTotalSize(size)); + RecordBatchWriter writer(default_memory_pool(), 0, kMaxIpcRecursionDepth); + RETURN_NOT_OK(writer.GetTotalSize(batch, size)); return Status::OK(); } @@ -580,10 +621,9 @@ class ArrayLoader : public TypeVisitor { Status LoadPrimitive(const DataType& type) { FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap; - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + std::shared_ptr<Buffer> null_bitmap, data; - 
std::shared_ptr<Buffer> data; + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); if (field_meta.length > 0) { RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data)); } else { @@ -597,11 +637,9 @@ class ArrayLoader : public TypeVisitor { template <typename CONTAINER> Status LoadBinary() { FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap; - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + std::shared_ptr<Buffer> null_bitmap, offsets, values; - std::shared_ptr<Buffer> offsets; - std::shared_ptr<Buffer> values; + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); if (field_meta.length > 0) { RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets)); RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values)); @@ -661,11 +699,9 @@ class ArrayLoader : public TypeVisitor { Status Visit(const ListType& type) override { FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap; + std::shared_ptr<Buffer> null_bitmap, offsets; RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - - std::shared_ptr<Buffer> offsets; if (field_meta.length > 0) { RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets)); } else { @@ -715,12 +751,9 @@ class ArrayLoader : public TypeVisitor { Status Visit(const UnionType& type) override { FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap; - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - - std::shared_ptr<Buffer> type_ids = nullptr; - std::shared_ptr<Buffer> offsets = nullptr; + std::shared_ptr<Buffer> null_bitmap, type_ids, offsets; + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); if (field_meta.length > 0) { RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids)); if (type.mode == UnionMode::DENSE) { @@ -738,13 +771,23 @@ class ArrayLoader : public TypeVisitor { } Status Visit(const DictionaryType& type) override { - return Status::NotImplemented("dictionary"); + FieldMetadata field_meta; + std::shared_ptr<Buffer> null_bitmap, indices_data; + 
RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &indices_data)); + + std::shared_ptr<Array> indices; + RETURN_NOT_OK(MakePrimitiveArray(type.index_type(), field_meta.length, indices_data, + null_bitmap, field_meta.null_count, 0, &indices)); + + result_ = std::make_shared<DictionaryArray>(field_.type, indices); + return Status::OK(); }; }; class RecordBatchReader { public: - RecordBatchReader(const std::shared_ptr<RecordBatchMetadata>& metadata, + RecordBatchReader(const RecordBatchMetadata& metadata, const std::shared_ptr<Schema>& schema, int max_recursion_depth, io::ReadableFileInterface* file) : metadata_(metadata), @@ -758,7 +801,7 @@ class RecordBatchReader { // The field_index and buffer_index are incremented in the ArrayLoader // based on how much of the batch is "consumed" (through nested data // reconstruction, for example) - context_.metadata = metadata_.get(); + context_.metadata = &metadata_; context_.field_index = 0; context_.buffer_index = 0; context_.max_recursion_depth = max_recursion_depth_; @@ -768,50 +811,58 @@ class RecordBatchReader { RETURN_NOT_OK(loader.Load(&arrays[i])); } - *out = std::make_shared<RecordBatch>(schema_, metadata_->length(), arrays); + *out = std::make_shared<RecordBatch>(schema_, metadata_.length(), arrays); return Status::OK(); } private: RecordBatchContext context_; - std::shared_ptr<RecordBatchMetadata> metadata_; + const RecordBatchMetadata& metadata_; std::shared_ptr<Schema> schema_; int max_recursion_depth_; io::ReadableFileInterface* file_; }; -Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length, - io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata) { - std::shared_ptr<Buffer> buffer; - RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); - - int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data()); - - if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) { - 
std::stringstream ss; - ss << "flatbuffer size " << metadata_length << " invalid. File offset: " << offset - << ", metadata length: " << metadata_length; - return Status::Invalid(ss.str()); - } - - std::shared_ptr<Message> message; - RETURN_NOT_OK(Message::Open(buffer, 4, &message)); - *metadata = std::make_shared<RecordBatchMetadata>(message); - return Status::OK(); -} - -Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata, +Status ReadRecordBatch(const RecordBatchMetadata& metadata, const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) { return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out); } -Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata, +Status ReadRecordBatch(const RecordBatchMetadata& metadata, const std::shared_ptr<Schema>& schema, int max_recursion_depth, io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) { RecordBatchReader reader(metadata, schema, max_recursion_depth, file); return reader.Read(out); } +Status ReadDictionary(const DictionaryBatchMetadata& metadata, + const DictionaryTypeMap& dictionary_types, io::ReadableFileInterface* file, + std::shared_ptr<Array>* out) { + int64_t id = metadata.id(); + auto it = dictionary_types.find(id); + if (it == dictionary_types.end()) { + std::stringstream ss; + ss << "Do not have type metadata for dictionary with id: " << id; + return Status::KeyError(ss.str()); + } + + std::vector<std::shared_ptr<Field>> fields = {it->second}; + + // We need a schema for the record batch + auto dummy_schema = std::make_shared<Schema>(fields); + + // The dictionary is embedded in a record batch with a single column + std::shared_ptr<RecordBatch> batch; + RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file, &batch)); + + if (batch->num_columns() != 1) { + return Status::Invalid("Dictionary record batch must only contain one field"); + } + + *out = batch->column(0); 
+ return Status::OK(); +} + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/adapter.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index 83542d0..b7d8fa9 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -25,6 +25,7 @@ #include <memory> #include <vector> +#include "arrow/ipc/metadata.h" #include "arrow/util/visibility.h" namespace arrow { @@ -44,8 +45,6 @@ class OutputStream; namespace ipc { -class RecordBatchMetadata; - // ---------------------------------------------------------------------- // Write path // We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice round number @@ -72,34 +71,35 @@ constexpr int kMaxIpcRecursionDepth = 64; // // @param(out) body_length: the size of the contiguous buffer block plus // padding bytes -Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch, +Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, + io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, + MemoryPool* pool, int max_recursion_depth = kMaxIpcRecursionDepth); + +// Write Array as a DictionaryBatch message +Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length, MemoryPool* pool, - int max_recursion_depth = kMaxIpcRecursionDepth); + int64_t* body_length, MemoryPool* pool); // Compute the precise number of bytes needed in a contiguous memory segment to // write the record batch. This involves generating the complete serialized // Flatbuffers metadata. 
-Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size); +Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size); // ---------------------------------------------------------------------- // "Read" path; does not copy data if the input supports zero copy reads -// Read the record batch flatbuffer metadata starting at the indicated file offset -// -// The flatbuffer is expected to be length-prefixed, so the metadata_length -// includes at least the length prefix and the flatbuffer -Status ARROW_EXPORT ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length, - io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata); - -Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata, +Status ReadRecordBatch(const RecordBatchMetadata& metadata, const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out); -Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata, +Status ReadRecordBatch(const RecordBatchMetadata& metadata, const std::shared_ptr<Schema>& schema, int max_recursion_depth, io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out); +Status ReadDictionary(const DictionaryBatchMetadata& metadata, + const DictionaryTypeMap& dictionary_types, io::ReadableFileInterface* file, + std::shared_ptr<Array>* out); + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc index 3b18326..c1d483f 100644 --- a/cpp/src/arrow/ipc/file.cc +++ b/cpp/src/arrow/ipc/file.cc @@ -36,8 +36,6 @@ namespace arrow { namespace ipc { static constexpr const char* kArrowMagicBytes = "ARROW1"; -// ---------------------------------------------------------------------- -// File footer static 
flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>> FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { @@ -51,11 +49,12 @@ FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { } Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, io::OutputStream* out) { + const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, + io::OutputStream* out) { FBB fbb; flatbuffers::Offset<flatbuf::Schema> fb_schema; - RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema)); + RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); @@ -74,87 +73,6 @@ static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); } -class FileFooter::FileFooterImpl { - public: - FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer) - : buffer_(buffer), footer_(footer) {} - - int num_dictionaries() const { return footer_->dictionaries()->size(); } - - int num_record_batches() const { return footer_->recordBatches()->size(); } - - MetadataVersion::type version() const { - switch (footer_->version()) { - case flatbuf::MetadataVersion_V1: - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - return MetadataVersion::V2; - // Add cases as other versions become available - default: - return MetadataVersion::V2; - } - } - - FileBlock record_batch(int i) const { - return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); - } - - FileBlock dictionary(int i) const { - return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); - } - - Status GetSchema(std::shared_ptr<Schema>* out) const { - auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, 
footer_->schema()); - return schema_msg->GetSchema(out); - } - - private: - // Retain reference to memory - std::shared_ptr<Buffer> buffer_; - - const flatbuf::Footer* footer_; -}; - -FileFooter::FileFooter() {} - -FileFooter::~FileFooter() {} - -Status FileFooter::Open( - const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) { - const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data()); - - *out = std::unique_ptr<FileFooter>(new FileFooter()); - - // TODO(wesm): Verify the footer - (*out)->impl_.reset(new FileFooterImpl(buffer, footer)); - - return Status::OK(); -} - -int FileFooter::num_dictionaries() const { - return impl_->num_dictionaries(); -} - -int FileFooter::num_record_batches() const { - return impl_->num_record_batches(); -} - -MetadataVersion::type FileFooter::version() const { - return impl_->version(); -} - -FileBlock FileFooter::record_batch(int i) const { - return impl_->record_batch(i); -} - -FileBlock FileFooter::dictionary(int i) const { - return impl_->dictionary(i); -} - -Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const { - return impl_->GetSchema(out); -} - // ---------------------------------------------------------------------- // File writer implementation @@ -171,22 +89,17 @@ Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& s Status FileWriter::Start() { RETURN_NOT_OK(WriteAligned( reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes))); - started_ = true; - return Status::OK(); -} -Status FileWriter::WriteRecordBatch(const RecordBatch& batch) { - // Push an empty FileBlock - // Append metadata, to be written in the footer later - record_batches_.emplace_back(0, 0, 0); - return StreamWriter::WriteRecordBatch( - batch, &record_batches_[record_batches_.size() - 1]); + // We write the schema at the start of the file (and the end). 
This also + // writes all the dictionaries at the beginning of the file + return StreamWriter::Start(); } Status FileWriter::Close() { // Write metadata int64_t initial_position = position_; - RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_)); + RETURN_NOT_OK(WriteFileFooter( + *schema_, dictionaries_, record_batches_, dictionary_memo_.get(), sink_)); RETURN_NOT_OK(UpdatePosition()); // Write footer length @@ -204,89 +117,180 @@ Status FileWriter::Close() { // ---------------------------------------------------------------------- // Reader implementation -FileReader::FileReader( - const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset) - : file_(file), footer_offset_(footer_offset) {} +class FileReader::FileReaderImpl { + public: + FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); } -FileReader::~FileReader() {} + Status ReadFooter() { + int magic_size = static_cast<int>(strlen(kArrowMagicBytes)); -Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file, - std::shared_ptr<FileReader>* reader) { - int64_t footer_offset; - RETURN_NOT_OK(file->GetSize(&footer_offset)); - return Open(file, footer_offset, reader); -} + if (footer_offset_ <= magic_size * 2 + 4) { + std::stringstream ss; + ss << "File is too small: " << footer_offset_; + return Status::Invalid(ss.str()); + } -Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file, - int64_t footer_offset, std::shared_ptr<FileReader>* reader) { - *reader = std::shared_ptr<FileReader>(new FileReader(file, footer_offset)); - return (*reader)->ReadFooter(); -} + std::shared_ptr<Buffer> buffer; + int file_end_size = magic_size + sizeof(int32_t); + RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer)); + + if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) { + return Status::Invalid("Not an Arrow file"); + } + + int32_t footer_length = 
*reinterpret_cast<const int32_t*>(buffer->data()); + + if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) { + return Status::Invalid("File is smaller than indicated metadata size"); + } -Status FileReader::ReadFooter() { - int magic_size = static_cast<int>(strlen(kArrowMagicBytes)); + // Now read the footer + RETURN_NOT_OK(file_->ReadAt( + footer_offset_ - footer_length - file_end_size, footer_length, &footer_buffer_)); - if (footer_offset_ <= magic_size * 2 + 4) { - std::stringstream ss; - ss << "File is too small: " << footer_offset_; - return Status::Invalid(ss.str()); + // TODO(wesm): Verify the footer + footer_ = flatbuf::GetFooter(footer_buffer_->data()); + schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema())); + + return Status::OK(); + } + + int num_dictionaries() const { return footer_->dictionaries()->size(); } + + int num_record_batches() const { return footer_->recordBatches()->size(); } + + MetadataVersion::type version() const { + switch (footer_->version()) { + case flatbuf::MetadataVersion_V1: + return MetadataVersion::V1; + case flatbuf::MetadataVersion_V2: + return MetadataVersion::V2; + // Add cases as other versions become available + default: + return MetadataVersion::V2; + } + } + + FileBlock record_batch(int i) const { + return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); + } + + FileBlock dictionary(int i) const { + return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); } - std::shared_ptr<Buffer> buffer; - int file_end_size = magic_size + sizeof(int32_t); - RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer)); + const SchemaMetadata& schema_metadata() const { return *schema_metadata_; } + + Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { + DCHECK_GE(i, 0); + DCHECK_LT(i, num_record_batches()); + FileBlock block = record_batch(i); + + std::shared_ptr<Message> message; + RETURN_NOT_OK( + ReadMessage(block.offset, 
block.metadata_length, file_.get(), &message)); + auto metadata = std::make_shared<RecordBatchMetadata>(message); - if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) { - return Status::Invalid("Not an Arrow file"); + // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see + // ARROW-384). + std::shared_ptr<Buffer> buffer_block; + RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); + io::BufferReader reader(buffer_block); + + return ReadRecordBatch(*metadata, schema_, &reader, batch); } - int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data()); + Status ReadSchema() { + RETURN_NOT_OK(schema_metadata_->GetDictionaryTypes(&dictionary_fields_)); + + // Read all the dictionaries + for (int i = 0; i < num_dictionaries(); ++i) { + FileBlock block = dictionary(i); + std::shared_ptr<Message> message; + RETURN_NOT_OK( + ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); + + // TODO(wesm): ARROW-577: This code is duplicated, can be fixed with a more + // invasive refactor + DictionaryBatchMetadata metadata(message); + + // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see + // ARROW-384). 
+ std::shared_ptr<Buffer> buffer_block; + RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); + io::BufferReader reader(buffer_block); + + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(ReadDictionary(metadata, dictionary_fields_, &reader, &dictionary)); + RETURN_NOT_OK(dictionary_memo_->AddDictionary(metadata.id(), dictionary)); + } - if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) { - return Status::Invalid("File is smaller than indicated metadata size"); + // Get the schema + return schema_metadata_->GetSchema(*dictionary_memo_, &schema_); } - // Now read the footer - RETURN_NOT_OK(file_->ReadAt( - footer_offset_ - footer_length - file_end_size, footer_length, &buffer)); - RETURN_NOT_OK(FileFooter::Open(buffer, &footer_)); + Status Open( + const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset) { + file_ = file; + footer_offset_ = footer_offset; + RETURN_NOT_OK(ReadFooter()); + return ReadSchema(); + } + + std::shared_ptr<Schema> schema() const { return schema_; } + + private: + std::shared_ptr<io::ReadableFileInterface> file_; - // Get the schema - return footer_->GetSchema(&schema_); + // The location where the Arrow file layout ends. May be the end of the file + // or some other location if embedded in a larger file. 
+ int64_t footer_offset_; + + // Footer metadata + std::shared_ptr<Buffer> footer_buffer_; + const flatbuf::Footer* footer_; + std::unique_ptr<SchemaMetadata> schema_metadata_; + + DictionaryTypeMap dictionary_fields_; + std::shared_ptr<DictionaryMemo> dictionary_memo_; + + // Reconstructed schema, including any read dictionaries + std::shared_ptr<Schema> schema_; +}; + +FileReader::FileReader() { + impl_.reset(new FileReaderImpl()); } -std::shared_ptr<Schema> FileReader::schema() const { - return schema_; +FileReader::~FileReader() {} + +Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file, + std::shared_ptr<FileReader>* reader) { + int64_t footer_offset; + RETURN_NOT_OK(file->GetSize(&footer_offset)); + return Open(file, footer_offset, reader); +} + +Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file, + int64_t footer_offset, std::shared_ptr<FileReader>* reader) { + *reader = std::shared_ptr<FileReader>(new FileReader()); + return (*reader)->impl_->Open(file, footer_offset); } -int FileReader::num_dictionaries() const { - return footer_->num_dictionaries(); +std::shared_ptr<Schema> FileReader::schema() const { + return impl_->schema(); } int FileReader::num_record_batches() const { - return footer_->num_record_batches(); + return impl_->num_record_batches(); } MetadataVersion::type FileReader::version() const { - return footer_->version(); + return impl_->version(); } Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { - DCHECK_GE(i, 0); - DCHECK_LT(i, num_record_batches()); - FileBlock block = footer_->record_batch(i); - - std::shared_ptr<RecordBatchMetadata> metadata; - RETURN_NOT_OK(ReadRecordBatchMetadata( - block.offset, block.metadata_length, file_.get(), &metadata)); - - // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see - // ARROW-384). 
- std::shared_ptr<Buffer> buffer_block; - RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); - io::BufferReader reader(buffer_block); - - return ReadRecordBatch(metadata, schema_, &reader, batch); + return impl_->GetRecordBatch(i, batch); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/file.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h index cf0baab..524766c 100644 --- a/cpp/src/arrow/ipc/file.h +++ b/cpp/src/arrow/ipc/file.h @@ -45,45 +45,21 @@ class ReadableFileInterface; namespace ipc { Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, io::OutputStream* out); - -class ARROW_EXPORT FileFooter { - public: - ~FileFooter(); - - static Status Open( - const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out); - - int num_dictionaries() const; - int num_record_batches() const; - MetadataVersion::type version() const; - - FileBlock record_batch(int i) const; - FileBlock dictionary(int i) const; - - Status GetSchema(std::shared_ptr<Schema>* out) const; - - private: - FileFooter(); - class FileFooterImpl; - std::unique_ptr<FileFooterImpl> impl_; -}; + const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, + io::OutputStream* out); class ARROW_EXPORT FileWriter : public StreamWriter { public: static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, std::shared_ptr<FileWriter>* out); - Status WriteRecordBatch(const RecordBatch& batch) override; + using StreamWriter::WriteRecordBatch; Status Close() override; private: FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema); Status Start() override; - - std::vector<FileBlock> dictionaries_; - std::vector<FileBlock> record_batches_; }; class ARROW_EXPORT FileReader { @@ -108,13 +84,9 @@ class ARROW_EXPORT 
FileReader { static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset, std::shared_ptr<FileReader>* reader); + /// The schema includes any dictionaries std::shared_ptr<Schema> schema() const; - // Shared dictionaries for dictionary-encoding cross record batches - // TODO(wesm): Implement dictionary reading when we also have dictionary - // encoding - int num_dictionaries() const; - int num_record_batches() const; MetadataVersion::type version() const; @@ -127,19 +99,10 @@ class ARROW_EXPORT FileReader { Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch); private: - FileReader( - const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset); - - Status ReadFooter(); - - std::shared_ptr<io::ReadableFileInterface> file_; - - // The location where the Arrow file layout ends. May be the end of the file - // or some other location if embedded in a larger file. - int64_t footer_offset_; + FileReader(); - std::unique_ptr<FileFooter> footer_; - std::shared_ptr<Schema> schema_; + class ARROW_NO_EXPORT FileReaderImpl; + std::unique_ptr<FileReaderImpl> impl_; }; } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-adapter-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index d11b95b..8999363 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -27,6 +27,7 @@ #include "arrow/io/memory.h" #include "arrow/io/test-common.h" #include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata.h" #include "arrow/ipc/test-common.h" #include "arrow/ipc/util.h" @@ -40,12 +41,8 @@ namespace arrow { namespace ipc { -class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>, - public io::MemoryMapFixture { +class IpcTestFixture : public io::MemoryMapFixture { public: - void SetUp() { pool_ = 
default_memory_pool(); } - void TearDown() { io::MemoryMapFixture::TearDown(); } - Status RoundTripHelper(const RecordBatch& batch, int memory_map_size, std::shared_ptr<RecordBatch>* batch_result) { std::string path = "test-write-row-batch"; @@ -59,8 +56,9 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>, RETURN_NOT_OK(WriteRecordBatch( batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_)); - std::shared_ptr<RecordBatchMetadata> metadata; - RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); + auto metadata = std::make_shared<RecordBatchMetadata>(message); // The buffer offsets start at 0, so we must construct a // ReadableFileInterface according to that frame of reference @@ -68,7 +66,7 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>, RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload)); io::BufferReader buffer_reader(buffer_payload); - return ReadRecordBatch(metadata, batch.schema(), &buffer_reader, batch_result); + return ReadRecordBatch(*metadata, batch.schema(), &buffer_reader, batch_result); } void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) { @@ -112,14 +110,29 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>, MemoryPool* pool_; }; -TEST_P(TestWriteRecordBatch, RoundTrip) { +class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture { + public: + void SetUp() { pool_ = default_memory_pool(); } + void TearDown() { io::MemoryMapFixture::TearDown(); } +}; + +class TestRecordBatchParam : public ::testing::TestWithParam<MakeRecordBatch*>, + public IpcTestFixture { + public: + void SetUp() { pool_ = default_memory_pool(); } + void TearDown() { io::MemoryMapFixture::TearDown(); } + using IpcTestFixture::RoundTripHelper; + using 
IpcTestFixture::CheckRoundtrip; +}; + +TEST_P(TestRecordBatchParam, RoundTrip) { std::shared_ptr<RecordBatch> batch; ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue CheckRoundtrip(*batch, 1 << 20); } -TEST_P(TestWriteRecordBatch, SliceRoundTrip) { +TEST_P(TestRecordBatchParam, SliceRoundTrip) { std::shared_ptr<RecordBatch> batch; ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue @@ -130,7 +143,7 @@ TEST_P(TestWriteRecordBatch, SliceRoundTrip) { CheckRoundtrip(*sliced_batch, 1 << 20); } -TEST_P(TestWriteRecordBatch, ZeroLengthArrays) { +TEST_P(TestRecordBatchParam, ZeroLengthArrays) { std::shared_ptr<RecordBatch> batch; ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue @@ -159,10 +172,10 @@ TEST_P(TestWriteRecordBatch, ZeroLengthArrays) { } INSTANTIATE_TEST_CASE_P( - RoundTripTests, TestWriteRecordBatch, + RoundTripTests, TestRecordBatchParam, ::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch, &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch, - &MakeDeeplyNestedList, &MakeStruct, &MakeUnion)); + &MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary)); void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) { ipc::MockOutputStream mock; @@ -251,8 +264,9 @@ TEST_F(RecursionLimits, ReadLimit) { std::shared_ptr<Schema> schema; ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema)); - std::shared_ptr<RecordBatchMetadata> metadata; - ASSERT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); + std::shared_ptr<Message> message; + ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); + auto metadata = std::make_shared<RecordBatchMetadata>(message); std::shared_ptr<Buffer> payload; ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload)); @@ -260,7 +274,7 @@ TEST_F(RecursionLimits, ReadLimit) { io::BufferReader reader(payload); std::shared_ptr<RecordBatch> batch; - ASSERT_RAISES(Invalid, 
ReadRecordBatch(metadata, schema, &reader, &batch)); + ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &batch)); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc index 7cd8054..4b82aab 100644 --- a/cpp/src/arrow/ipc/ipc-file-test.cc +++ b/cpp/src/arrow/ipc/ipc-file-test.cc @@ -180,72 +180,44 @@ TEST_P(TestStreamFormat, RoundTrip) { #define BATCH_CASES() \ ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \ &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch, \ - &MakeStruct); + &MakeStruct, &MakeDictionary); INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES()); INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES()); -class TestFileFooter : public ::testing::Test { - public: - void SetUp() {} - - void CheckRoundtrip(const Schema& schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches) { - auto buffer = std::make_shared<PoolBuffer>(); - io::BufferOutputStream stream(buffer); - - ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream)); - - std::unique_ptr<FileFooter> footer; - ASSERT_OK(FileFooter::Open(buffer, &footer)); - - ASSERT_EQ(MetadataVersion::V2, footer->version()); +void CheckBatchDictionaries(const RecordBatch& batch) { + // Check that dictionaries that should be the same are the same + auto schema = batch.schema(); - // Check schema - std::shared_ptr<Schema> schema2; - ASSERT_OK(footer->GetSchema(&schema2)); - AssertSchemaEqual(schema, *schema2); + const auto& t0 = static_cast<const DictionaryType&>(*schema->field(0)->type); + const auto& t1 = static_cast<const DictionaryType&>(*schema->field(1)->type); - // Check blocks - 
ASSERT_EQ(dictionaries.size(), footer->num_dictionaries()); - ASSERT_EQ(record_batches.size(), footer->num_record_batches()); + ASSERT_EQ(t0.dictionary().get(), t1.dictionary().get()); - for (int i = 0; i < footer->num_dictionaries(); ++i) { - CheckBlocks(dictionaries[i], footer->dictionary(i)); - } - - for (int i = 0; i < footer->num_record_batches(); ++i) { - CheckBlocks(record_batches[i], footer->record_batch(i)); - } - } + // Same dictionary used for list values + const auto& t3 = static_cast<const ListType&>(*schema->field(3)->type); + const auto& t3_value = static_cast<const DictionaryType&>(*t3.value_type()); + ASSERT_EQ(t0.dictionary().get(), t3_value.dictionary().get()); +} - void CheckBlocks(const FileBlock& left, const FileBlock& right) { - ASSERT_EQ(left.offset, right.offset); - ASSERT_EQ(left.metadata_length, right.metadata_length); - ASSERT_EQ(left.body_length, right.body_length); - } +TEST_F(TestStreamFormat, DictionaryRoundTrip) { + std::shared_ptr<RecordBatch> batch; + ASSERT_OK(MakeDictionary(&batch)); - private: - std::shared_ptr<Schema> example_schema_; -}; + std::vector<std::shared_ptr<RecordBatch>> out_batches; + ASSERT_OK(RoundTripHelper(*batch, &out_batches)); -TEST_F(TestFileFooter, Basics) { - auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>()); - auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>()); - Schema schema({f0, f1}); + CheckBatchDictionaries(*out_batches[0]); +} - std::vector<FileBlock> dictionaries; - dictionaries.emplace_back(8, 92, 900); - dictionaries.emplace_back(1000, 100, 1900); - dictionaries.emplace_back(3000, 100, 2900); +TEST_F(TestFileFormat, DictionaryRoundTrip) { + std::shared_ptr<RecordBatch> batch; + ASSERT_OK(MakeDictionary(&batch)); - std::vector<FileBlock> record_batches; - record_batches.emplace_back(6000, 100, 900); - record_batches.emplace_back(7000, 100, 1900); - record_batches.emplace_back(9000, 100, 2900); - record_batches.emplace_back(12000, 100, 3900); + 
std::vector<std::shared_ptr<RecordBatch>> out_batches; + ASSERT_OK(RoundTripHelper({batch}, &out_batches)); - CheckRoundtrip(schema, dictionaries, record_batches); + CheckBatchDictionaries(*out_batches[0]); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-metadata-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index 098f996..4fb3204 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -22,6 +22,7 @@ #include "gtest/gtest.h" #include "arrow/io/memory.h" +#include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/test-common.h" #include "arrow/schema.h" @@ -39,9 +40,9 @@ class TestSchemaMetadata : public ::testing::Test { public: void SetUp() {} - void CheckRoundtrip(const Schema& schema) { + void CheckRoundtrip(const Schema& schema, DictionaryMemo* memo) { std::shared_ptr<Buffer> buffer; - ASSERT_OK(WriteSchema(schema, &buffer)); + ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer)); std::shared_ptr<Message> message; ASSERT_OK(Message::Open(buffer, 0, &message)); @@ -51,8 +52,10 @@ class TestSchemaMetadata : public ::testing::Test { auto schema_msg = std::make_shared<SchemaMetadata>(message); ASSERT_EQ(schema.num_fields(), schema_msg->num_fields()); + DictionaryMemo empty_memo; + std::shared_ptr<Schema> schema2; - ASSERT_OK(schema_msg->GetSchema(&schema2)); + ASSERT_OK(schema_msg->GetSchema(empty_memo, &schema2)); AssertSchemaEqual(schema, *schema2); } @@ -74,7 +77,9 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) { auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>()); Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10}); - CheckRoundtrip(schema); + DictionaryMemo memo; + + CheckRoundtrip(schema, &memo); } TEST_F(TestSchemaMetadata, NestedFields) { @@ -86,7 +91,9 @@ 
TEST_F(TestSchemaMetadata, NestedFields) { auto f1 = std::make_shared<Field>("f1", type2); Schema schema({f0, f1}); - CheckRoundtrip(schema); + DictionaryMemo memo; + + CheckRoundtrip(schema, &memo); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index cd77220..7c8ddb9 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -25,6 +25,7 @@ #include "flatbuffers/flatbuffers.h" +#include "arrow/array.h" #include "arrow/buffer.h" #include "arrow/ipc/Message_generated.h" #include "arrow/schema.h" @@ -115,8 +116,8 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, } // Forward declaration -static Status FieldToFlatbuffer( - FBB& fbb, const std::shared_ptr<Field>& field, FieldOffset* offset); +static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, + DictionaryMemo* dictionary_memo, FieldOffset* offset); static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) { return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union(); @@ -126,34 +127,73 @@ static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) { return flatbuf::CreateFloatingPoint(fbb, precision).Union(); } -static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, Offset* offset) { +static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type, + std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) { FieldOffset field; - RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(0), &field)); - out_children->push_back(field); + for (int i = 0; i < type->num_children(); ++i) { + RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field)); + 
out_children->push_back(field); + } + return Status::OK(); +} + +static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, + std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, + Offset* offset) { + RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); *offset = flatbuf::CreateList(fbb).Union(); return Status::OK(); } static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, Offset* offset) { - FieldOffset field; - for (int i = 0; i < type->num_children(); ++i) { - RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), &field)); - out_children->push_back(field); - } + std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, + Offset* offset) { + RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); *offset = flatbuf::CreateStruct_(fbb).Union(); return Status::OK(); } +static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type, + std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, + Offset* offset) { + RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); + + const auto& union_type = static_cast<const UnionType&>(*type); + + flatbuf::UnionMode mode = union_type.mode == UnionMode::SPARSE + ? 
flatbuf::UnionMode_Sparse + : flatbuf::UnionMode_Dense; + + std::vector<int32_t> type_ids; + type_ids.reserve(union_type.type_codes.size()); + for (uint8_t code : union_type.type_codes) { + type_ids.push_back(code); + } + + auto fb_type_ids = fbb.CreateVector(type_ids); + + *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union(); + return Status::OK(); +} + #define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \ *out_type = flatbuf::Type_Int; \ *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \ break; +// TODO(wesm): Convert this to visitor pattern static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout, - flatbuf::Type* out_type, Offset* offset) { + flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) { + if (type->type == Type::DICTIONARY) { + // In this library, the dictionary "type" is a logical construct. Here we + // pass through to the value type, as we've already captured the index + // type in the DictionaryEncoding metadata in the parent field + const auto& dict_type = static_cast<const DictionaryType&>(*type); + return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout, + out_type, dictionary_memo, offset); + } + std::vector<BufferDescr> buffer_layout = type->GetBufferLayout(); for (const BufferDescr& descr : buffer_layout) { flatbuf::VectorType vector_type; @@ -217,10 +257,13 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, break; case Type::LIST: *out_type = flatbuf::Type_List; - return ListToFlatbuffer(fbb, type, children, offset); + return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset); case Type::STRUCT: *out_type = flatbuf::Type_Struct_; - return StructToFlatbuffer(fbb, type, children, offset); + return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset); + case Type::UNION: + *out_type = flatbuf::Type_Union; + return UnionToFlatBuffer(fbb, 
type, children, dictionary_memo, offset); default: *out_type = flatbuf::Type_NONE; // Make clang-tidy happy std::stringstream ss; @@ -230,35 +273,63 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, return Status::OK(); } -static Status FieldToFlatbuffer( - FBB& fbb, const std::shared_ptr<Field>& field, FieldOffset* offset) { +using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>; + +static DictionaryOffset GetDictionaryEncoding( + FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) { + int64_t dictionary_id = memo->GetId(type.dictionary()); + + // We assume that the dictionary index type (as an integer) has already been + // validated elsewhere, and can safely assume we are dealing with signed + // integers + const auto& fw_index_type = static_cast<const FixedWidthType&>(*type.index_type()); + + auto index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true); + + // TODO(wesm): ordered dictionaries + return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset); +} + +static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, + DictionaryMemo* dictionary_memo, FieldOffset* offset) { auto fb_name = fbb.CreateString(field->name); flatbuf::Type type_enum; - Offset type_data; + Offset type_offset; Offset type_layout; std::vector<FieldOffset> children; std::vector<VectorLayoutOffset> layout; - RETURN_NOT_OK( - TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data)); + RETURN_NOT_OK(TypeToFlatbuffer( + fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset)); auto fb_children = fbb.CreateVector(children); auto fb_layout = fbb.CreateVector(layout); + DictionaryOffset dictionary = 0; + if (field->type->type == Type::DICTIONARY) { + dictionary = GetDictionaryEncoding( + fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo); + } + // TODO: produce the list of VectorTypes - *offset = 
flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data, - field->dictionary, fb_children, fb_layout); + *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset, + dictionary, fb_children, fb_layout); return Status::OK(); } -Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out) { - std::shared_ptr<DataType> type; +Status FieldFromFlatbufferDictionary( + const flatbuf::Field* field, std::shared_ptr<Field>* out) { + // Need an empty memo to pass down for constructing children + DictionaryMemo dummy_memo; + + // Any DictionaryEncoding set is ignored here + std::shared_ptr<DataType> type; auto children = field->children(); std::vector<std::shared_ptr<Field>> child_fields(children->size()); for (size_t i = 0; i < children->size(); ++i) { - RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), &child_fields[i])); + RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i])); } RETURN_NOT_OK( @@ -268,6 +339,39 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* return Status::OK(); } +Status FieldFromFlatbuffer(const flatbuf::Field* field, + const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) { + std::shared_ptr<DataType> type; + + const flatbuf::DictionaryEncoding* encoding = field->dictionary(); + + if (encoding == nullptr) { + // The field is not dictionary encoded. We must potentially visit its + // children to fully reconstruct the data type + auto children = field->children(); + std::vector<std::shared_ptr<Field>> child_fields(children->size()); + for (size_t i = 0; i < children->size(); ++i) { + RETURN_NOT_OK( + FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i])); + } + RETURN_NOT_OK( + TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type)); + } else { + // The field is dictionary encoded. 
The type of the dictionary values has + // been determined elsewhere, and is stored in the DictionaryMemo. Here we + // construct the logical DictionaryType object + + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary)); + + std::shared_ptr<DataType> index_type; + RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type)); + type = std::make_shared<DictionaryType>(index_type, dictionary); + } + *out = std::make_shared<Field>(field->name()->str(), type, field->nullable()); + return Status::OK(); +} + // Implement MessageBuilder // will return the endianness of the system we are running on @@ -281,13 +385,13 @@ flatbuf::Endianness endianness() { return bint.c[0] == 1 ? flatbuf::Endianness_Big : flatbuf::Endianness_Little; } -Status SchemaToFlatbuffer( - FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out) { +Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo, + flatbuffers::Offset<flatbuf::Schema>* out) { std::vector<FieldOffset> field_offsets; for (int i = 0; i < schema.num_fields(); ++i) { std::shared_ptr<Field> field = schema.field(i); FieldOffset offset; - RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset)); + RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset)); field_offsets.push_back(offset); } @@ -295,29 +399,63 @@ Status SchemaToFlatbuffer( return Status::OK(); } -Status MessageBuilder::SetSchema(const Schema& schema) { - flatbuffers::Offset<flatbuf::Schema> fb_schema; - RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema)); +class MessageBuilder { + public: + Status SetSchema(const Schema& schema, DictionaryMemo* dictionary_memo) { + flatbuffers::Offset<flatbuf::Schema> fb_schema; + RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, dictionary_memo, &fb_schema)); - header_type_ = flatbuf::MessageHeader_Schema; - header_ = fb_schema.Union(); - body_length_ = 0; - return Status::OK(); -} + header_type_ = 
flatbuf::MessageHeader_Schema; + header_ = fb_schema.Union(); + body_length_ = 0; + return Status::OK(); + } -Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length, - const std::vector<flatbuf::FieldNode>& nodes, - const std::vector<flatbuf::Buffer>& buffers) { - header_type_ = flatbuf::MessageHeader_RecordBatch; - header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes), - fbb_.CreateVectorOfStructs(buffers)) - .Union(); - body_length_ = body_length; + Status SetRecordBatch(int32_t length, int64_t body_length, + const std::vector<flatbuf::FieldNode>& nodes, + const std::vector<flatbuf::Buffer>& buffers) { + header_type_ = flatbuf::MessageHeader_RecordBatch; + header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes), + fbb_.CreateVectorOfStructs(buffers)) + .Union(); + body_length_ = body_length; - return Status::OK(); + return Status::OK(); + } + + Status SetDictionary(int64_t id, int32_t length, int64_t body_length, + const std::vector<flatbuf::FieldNode>& nodes, + const std::vector<flatbuf::Buffer>& buffers) { + header_type_ = flatbuf::MessageHeader_DictionaryBatch; + + auto record_batch = flatbuf::CreateRecordBatch(fbb_, length, + fbb_.CreateVectorOfStructs(nodes), fbb_.CreateVectorOfStructs(buffers)); + + header_ = flatbuf::CreateDictionaryBatch(fbb_, id, record_batch).Union(); + body_length_ = body_length; + return Status::OK(); + } + + Status Finish(); + + Status GetBuffer(std::shared_ptr<Buffer>* out); + + private: + flatbuf::MessageHeader header_type_; + flatbuffers::Offset<void> header_; + int64_t body_length_; + flatbuffers::FlatBufferBuilder fbb_; +}; + +Status WriteSchemaMessage( + const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) { + MessageBuilder message; + RETURN_NOT_OK(message.SetSchema(schema, dictionary_memo)); + RETURN_NOT_OK(message.Finish()); + return message.GetBuffer(out); } -Status WriteRecordBatchMetadata(int32_t length, int64_t 
body_length, +Status WriteRecordBatchMessage(int32_t length, int64_t body_length, const std::vector<flatbuf::FieldNode>& nodes, const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) { MessageBuilder builder; @@ -326,6 +464,15 @@ Status WriteRecordBatchMetadata(int32_t length, int64_t body_length, return builder.GetBuffer(out); } +Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length, + const std::vector<flatbuf::FieldNode>& nodes, + const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) { + MessageBuilder builder; + RETURN_NOT_OK(builder.SetDictionary(id, length, body_length, nodes, buffers)); + RETURN_NOT_OK(builder.Finish()); + return builder.GetBuffer(out); +} + Status MessageBuilder::Finish() { auto message = flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_); http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index d94a8ab..59afecb 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -46,31 +46,34 @@ using Offset = flatbuffers::Offset<void>; static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2; -Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out); +// Construct a field with type for a dictionary-encoded field. None of its +// children or children's descendents can be dictionary encoded +Status FieldFromFlatbufferDictionary( + const flatbuf::Field* field, std::shared_ptr<Field>* out); -Status SchemaToFlatbuffer( - FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out); +// Construct a field for a non-dictionary-encoded field. 
Its children may be +// dictionary encoded +Status FieldFromFlatbuffer(const flatbuf::Field* field, + const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out); -class MessageBuilder { - public: - Status SetSchema(const Schema& schema); +Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo, + flatbuffers::Offset<flatbuf::Schema>* out); - Status SetRecordBatch(int32_t length, int64_t body_length, - const std::vector<flatbuf::FieldNode>& nodes, - const std::vector<flatbuf::Buffer>& buffers); - - Status Finish(); - - Status GetBuffer(std::shared_ptr<Buffer>* out); - - private: - flatbuf::MessageHeader header_type_; - flatbuffers::Offset<void> header_; - int64_t body_length_; - flatbuffers::FlatBufferBuilder fbb_; -}; +// Serialize arrow::Schema as a Flatbuffer +// +// \param[in] schema a Schema instance +// \param[inout] dictionary_memo class for tracking dictionaries and assigning +// dictionary ids +// \param[out] out the serialized arrow::Buffer +// \return Status outcome +Status WriteSchemaMessage( + const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out); + +Status WriteRecordBatchMessage(int32_t length, int64_t body_length, + const std::vector<flatbuf::FieldNode>& nodes, + const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out); -Status WriteRecordBatchMetadata(int32_t length, int64_t body_length, +Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length, const std::vector<flatbuf::FieldNode>& nodes, const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out); http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index a97965c..2ba44ac 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -19,6 +19,7 @@ #include <cstdint> 
#include <memory> +#include <sstream> #include <vector> #include "flatbuffers/flatbuffers.h" @@ -38,11 +39,60 @@ namespace flatbuf = org::apache::arrow::flatbuf; namespace ipc { -Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out) { - MessageBuilder message; - RETURN_NOT_OK(message.SetSchema(schema)); - RETURN_NOT_OK(message.Finish()); - return message.GetBuffer(out); +// ---------------------------------------------------------------------- +// Memoization data structure for handling shared dictionaries + +DictionaryMemo::DictionaryMemo() {} + +// Returns KeyError if dictionary not found +Status DictionaryMemo::GetDictionary( + int64_t id, std::shared_ptr<Array>* dictionary) const { + auto it = id_to_dictionary_.find(id); + if (it == id_to_dictionary_.end()) { + std::stringstream ss; + ss << "Dictionary with id " << id << " not found"; + return Status::KeyError(ss.str()); + } + *dictionary = it->second; + return Status::OK(); +} + +int64_t DictionaryMemo::GetId(const std::shared_ptr<Array>& dictionary) { + intptr_t address = reinterpret_cast<intptr_t>(dictionary.get()); + auto it = dictionary_to_id_.find(address); + if (it != dictionary_to_id_.end()) { + // Dictionary already observed, return the id + return it->second; + } else { + int64_t new_id = static_cast<int64_t>(dictionary_to_id_.size()) + 1; + dictionary_to_id_[address] = new_id; + id_to_dictionary_[new_id] = dictionary; + return new_id; + } +} + +bool DictionaryMemo::HasDictionary(const std::shared_ptr<Array>& dictionary) const { + intptr_t address = reinterpret_cast<intptr_t>(dictionary.get()); + auto it = dictionary_to_id_.find(address); + return it != dictionary_to_id_.end(); +} + +bool DictionaryMemo::HasDictionaryId(int64_t id) const { + auto it = id_to_dictionary_.find(id); + return it != id_to_dictionary_.end(); +} + +Status DictionaryMemo::AddDictionary( + int64_t id, const std::shared_ptr<Array>& dictionary) { + if (HasDictionaryId(id)) { + std::stringstream ss; + ss << 
"Dictionary with id " << id << " already exists"; + return Status::KeyError(ss.str()); + } + intptr_t address = reinterpret_cast<intptr_t>(dictionary.get()); + id_to_dictionary_[id] = dictionary; + dictionary_to_id_[address] = id; + return Status::OK(); } //---------------------------------------------------------------------- @@ -113,10 +163,35 @@ class SchemaMetadata::SchemaMetadataImpl { explicit SchemaMetadataImpl(const void* schema) : schema_(static_cast<const flatbuf::Schema*>(schema)) {} - const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); } + const flatbuf::Field* get_field(int i) const { return schema_->fields()->Get(i); } int num_fields() const { return schema_->fields()->size(); } + Status VisitField(const flatbuf::Field* field, DictionaryTypeMap* id_to_field) const { + const flatbuf::DictionaryEncoding* dict_metadata = field->dictionary(); + if (dict_metadata == nullptr) { + // Field is not dictionary encoded. Visit children + auto children = field->children(); + for (flatbuffers::uoffset_t i = 0; i < children->size(); ++i) { + RETURN_NOT_OK(VisitField(children->Get(i), id_to_field)); + } + } else { + // Field is dictionary encoded. 
Construct the data type for the + // dictionary (no descendents can be dictionary encoded) + std::shared_ptr<Field> dictionary_field; + RETURN_NOT_OK(FieldFromFlatbufferDictionary(field, &dictionary_field)); + (*id_to_field)[dict_metadata->id()] = dictionary_field; + } + return Status::OK(); + } + + Status GetDictionaryTypes(DictionaryTypeMap* id_to_field) const { + for (int i = 0; i < num_fields(); ++i) { + RETURN_NOT_OK(VisitField(get_field(i), id_to_field)); + } + return Status::OK(); + } + private: const flatbuf::Schema* schema_; }; @@ -138,15 +213,16 @@ int SchemaMetadata::num_fields() const { return impl_->num_fields(); } -Status SchemaMetadata::GetField(int i, std::shared_ptr<Field>* out) const { - const flatbuf::Field* field = impl_->field(i); - return FieldFromFlatbuffer(field, out); +Status SchemaMetadata::GetDictionaryTypes(DictionaryTypeMap* id_to_field) const { + return impl_->GetDictionaryTypes(id_to_field); } -Status SchemaMetadata::GetSchema(std::shared_ptr<Schema>* out) const { +Status SchemaMetadata::GetSchema( + const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const { std::vector<std::shared_ptr<Field>> fields(num_fields()); for (int i = 0; i < this->num_fields(); ++i) { - RETURN_NOT_OK(GetField(i, &fields[i])); + const flatbuf::Field* field = impl_->get_field(i); + RETURN_NOT_OK(FieldFromFlatbuffer(field, dictionary_memo, &fields[i])); } *out = std::make_shared<Schema>(fields); return Status::OK(); @@ -173,28 +249,34 @@ class RecordBatchMetadata::RecordBatchMetadataImpl { int num_fields() const { return batch_->nodes()->size(); } + void set_message(const std::shared_ptr<Message>& message) { message_ = message; } + + void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; } + private: const flatbuf::RecordBatch* batch_; const flatbuffers::Vector<const flatbuf::FieldNode*>* nodes_; const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_; + + // Possible parents, owns the flatbuffer data + 
std::shared_ptr<Message> message_; + std::shared_ptr<Buffer> buffer_; }; RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) { - message_ = message; impl_.reset(new RecordBatchMetadataImpl(message->impl_->header())); + impl_->set_message(message); } -RecordBatchMetadata::RecordBatchMetadata( - const std::shared_ptr<Buffer>& buffer, int64_t offset) { - message_ = nullptr; - buffer_ = buffer; - - const flatbuf::RecordBatch* metadata = - flatbuffers::GetRoot<flatbuf::RecordBatch>(buffer->data() + offset); - - // TODO(wesm): validate table +RecordBatchMetadata::RecordBatchMetadata(const void* header) { + impl_.reset(new RecordBatchMetadataImpl(header)); +} - impl_.reset(new RecordBatchMetadataImpl(metadata)); +RecordBatchMetadata::RecordBatchMetadata( + const std::shared_ptr<Buffer>& buffer, int64_t offset) + : RecordBatchMetadata(buffer->data() + offset) { + // Preserve ownership + impl_->set_buffer(buffer); } RecordBatchMetadata::~RecordBatchMetadata() {} @@ -232,5 +314,64 @@ int RecordBatchMetadata::num_fields() const { return impl_->num_fields(); } +// ---------------------------------------------------------------------- +// DictionaryBatchMetadata + +class DictionaryBatchMetadata::DictionaryBatchMetadataImpl { + public: + explicit DictionaryBatchMetadataImpl(const void* dictionary) + : metadata_(static_cast<const flatbuf::DictionaryBatch*>(dictionary)) { + record_batch_.reset(new RecordBatchMetadata(metadata_->data())); + } + + int64_t id() const { return metadata_->id(); } + const RecordBatchMetadata& record_batch() const { return *record_batch_; } + + void set_message(const std::shared_ptr<Message>& message) { message_ = message; } + + private: + const flatbuf::DictionaryBatch* metadata_; + + std::unique_ptr<RecordBatchMetadata> record_batch_; + + // Parent, owns the flatbuffer data + std::shared_ptr<Message> message_; +}; + +DictionaryBatchMetadata::DictionaryBatchMetadata( + const std::shared_ptr<Message>& message) { + 
impl_.reset(new DictionaryBatchMetadataImpl(message->impl_->header())); + impl_->set_message(message); +} + +DictionaryBatchMetadata::~DictionaryBatchMetadata() {} + +int64_t DictionaryBatchMetadata::id() const { + return impl_->id(); +} + +const RecordBatchMetadata& DictionaryBatchMetadata::record_batch() const { + return impl_->record_batch(); +} + +// ---------------------------------------------------------------------- +// Conveniences + +Status ReadMessage(int64_t offset, int32_t metadata_length, + io::ReadableFileInterface* file, std::shared_ptr<Message>* message) { + std::shared_ptr<Buffer> buffer; + RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); + + int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data()); + + if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) { + std::stringstream ss; + ss << "flatbuffer size " << metadata_length << " invalid. File offset: " << offset + << ", metadata length: " << metadata_length; + return Status::Invalid(ss.str()); + } + return Message::Open(buffer, 4, message); +} + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 81e3dbd..0091067 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -22,13 +22,17 @@ #include <cstdint> #include <memory> +#include <unordered_map> #include <vector> +#include "arrow/util/macros.h" #include "arrow/util/visibility.h" namespace arrow { +class Array; class Buffer; +struct DataType; struct Field; class Schema; class Status; @@ -36,6 +40,7 @@ class Status; namespace io { class OutputStream; +class ReadableFileInterface; } // namespace io @@ -47,9 +52,38 @@ struct MetadataVersion { //---------------------------------------------------------------------- -// Serialize arrow::Schema as a 
Flatbuffer -ARROW_EXPORT -Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out); +using DictionaryMap = std::unordered_map<int64_t, std::shared_ptr<Array>>; +using DictionaryTypeMap = std::unordered_map<int64_t, std::shared_ptr<Field>>; + +// Memoization data structure for handling shared dictionaries +class DictionaryMemo { + public: + DictionaryMemo(); + + // Returns KeyError if dictionary not found + Status GetDictionary(int64_t id, std::shared_ptr<Array>* dictionary) const; + + int64_t GetId(const std::shared_ptr<Array>& dictionary); + + bool HasDictionary(const std::shared_ptr<Array>& dictionary) const; + bool HasDictionaryId(int64_t id) const; + + // Add a dictionary to the memo with a particular id. Returns KeyError if + // that dictionary already exists + Status AddDictionary(int64_t id, const std::shared_ptr<Array>& dictionary); + + const DictionaryMap& id_to_dictionary() const { return id_to_dictionary_; } + + private: + // Dictionary memory addresses, to track whether a dictionary has been seen + // before + std::unordered_map<intptr_t, int64_t> dictionary_to_id_; + + // Map of dictionary id to dictionary array + DictionaryMap id_to_dictionary_; + + DISALLOW_COPY_AND_ASSIGN(DictionaryMemo); +}; // Read interface classes. We do not fully deserialize the flatbuffers so that // individual fields metadata can be retrieved from very large schema without @@ -69,12 +103,15 @@ class ARROW_EXPORT SchemaMetadata { int num_fields() const; - // Construct an arrow::Field for the i-th value in the metadata - Status GetField(int i, std::shared_ptr<Field>* out) const; + // Retrieve a list of all the dictionary ids and types required by the schema for + // reconstruction. The presumption is that these will be loaded either from + // the stream or file (or they may already be somewhere else in memory) + Status GetDictionaryTypes(DictionaryTypeMap* id_to_field) const; // Construct a complete Schema from the message. 
May be expensive for very // large schemas if you are only interested in a few fields - Status GetSchema(std::shared_ptr<Schema>* out) const; + Status GetSchema( + const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const; private: // Parent, owns the flatbuffer data @@ -82,6 +119,8 @@ class ARROW_EXPORT SchemaMetadata { class SchemaMetadataImpl; std::unique_ptr<SchemaMetadataImpl> impl_; + + DISALLOW_COPY_AND_ASSIGN(SchemaMetadata); }; // Field metadata @@ -99,8 +138,10 @@ struct ARROW_EXPORT BufferMetadata { // Container for serialized record batch metadata contained in an IPC message class ARROW_EXPORT RecordBatchMetadata { public: + // Instantiate from opaque pointer. Memory ownership must be preserved + // elsewhere (e.g. in a dictionary batch) + explicit RecordBatchMetadata(const void* header); explicit RecordBatchMetadata(const std::shared_ptr<Message>& message); - RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset); ~RecordBatchMetadata(); @@ -113,18 +154,25 @@ class ARROW_EXPORT RecordBatchMetadata { int num_fields() const; private: - // Parent, owns the flatbuffer data - std::shared_ptr<Message> message_; - std::shared_ptr<Buffer> buffer_; - class RecordBatchMetadataImpl; std::unique_ptr<RecordBatchMetadataImpl> impl_; + + DISALLOW_COPY_AND_ASSIGN(RecordBatchMetadata); }; class ARROW_EXPORT DictionaryBatchMetadata { public: + explicit DictionaryBatchMetadata(const std::shared_ptr<Message>& message); + ~DictionaryBatchMetadata(); + int64_t id() const; - std::unique_ptr<RecordBatchMetadata> data() const; + const RecordBatchMetadata& record_batch() const; + + private: + class DictionaryBatchMetadataImpl; + std::unique_ptr<DictionaryBatchMetadataImpl> impl_; + + DISALLOW_COPY_AND_ASSIGN(DictionaryBatchMetadata); }; class ARROW_EXPORT Message { @@ -141,24 +189,31 @@ class ARROW_EXPORT Message { private: Message(const std::shared_ptr<Buffer>& buffer, int64_t offset); + friend class DictionaryBatchMetadata; friend class 
RecordBatchMetadata; friend class SchemaMetadata; // Hide serialization details from user API class MessageImpl; std::unique_ptr<MessageImpl> impl_; -}; -struct ARROW_EXPORT FileBlock { - FileBlock() {} - FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length) - : offset(offset), metadata_length(metadata_length), body_length(body_length) {} - - int64_t offset; - int32_t metadata_length; - int64_t body_length; + DISALLOW_COPY_AND_ASSIGN(Message); }; +/// Read a length-prefixed message flatbuffer starting at the indicated file +/// offset +/// +/// The metadata_length includes at least the length prefix and the flatbuffer +/// +/// \param[in] offset the position in the file where the message starts. The +/// first 4 bytes after the offset are the message length +/// \param[in] metadata_length the total number of bytes to read from file +/// \param[in] file the seekable file interface to read from +/// \param[out] message the message read +/// \return Status success or failure +Status ReadMessage(int64_t offset, int32_t metadata_length, + io::ReadableFileInterface* file, std::shared_ptr<Message>* message); + } // namespace ipc } // namespace arrow