Repository: arrow
Updated Branches:
  refs/heads/master 8ca7033fc -> 5888e10cf
ARROW-495: [C++] Implement streaming binary format, refactoring cc @nongli Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #293 from wesm/ARROW-495 and squashes the following commits: 279583b [Wes McKinney] FileBlock is a struct c88e61a [Wes McKinney] Fix Python bindings after API changes 645a329 [Wes McKinney] Install stream.h 21378b4 [Wes McKinney] Collapse BaseStreamWriter and StreamWriter b6c4578 [Wes McKinney] clang-format 12eb2cb [Wes McKinney] Add unit tests for streaming format, fix EOS, metadata length padding issues 3200b17 [Wes McKinney] Implement StreamReader 69fe82e [Wes McKinney] Implement rough draft of StreamWriter, share code with FileWriter Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5888e10c Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5888e10c Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5888e10c Branch: refs/heads/master Commit: 5888e10cffac222e359d1b440b4684d16c061085 Parents: 8ca7033 Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Sat Jan 21 11:11:06 2017 -0500 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Sat Jan 21 11:11:06 2017 -0500 ---------------------------------------------------------------------- cpp/CMakeLists.txt | 1 - cpp/src/arrow/io/memory.cc | 4 +- cpp/src/arrow/ipc/CMakeLists.txt | 2 + cpp/src/arrow/ipc/adapter.cc | 44 ++--- cpp/src/arrow/ipc/adapter.h | 11 +- cpp/src/arrow/ipc/file.cc | 167 +++++++++++++------ cpp/src/arrow/ipc/file.h | 54 ++++--- cpp/src/arrow/ipc/ipc-adapter-test.cc | 16 +- cpp/src/arrow/ipc/ipc-file-test.cc | 188 +++++++++++++++++---- cpp/src/arrow/ipc/ipc-json-test.cc | 5 +- cpp/src/arrow/ipc/ipc-metadata-test.cc | 83 +--------- cpp/src/arrow/ipc/json-integration-test.cc | 4 +- cpp/src/arrow/ipc/json.cc | 19 +-- cpp/src/arrow/ipc/json.h | 3 +- cpp/src/arrow/ipc/metadata-internal.cc | 8 +- cpp/src/arrow/ipc/metadata-internal.h | 4 +- cpp/src/arrow/ipc/metadata.cc | 121 
+------------- cpp/src/arrow/ipc/metadata.h | 32 +--- cpp/src/arrow/ipc/stream.cc | 206 ++++++++++++++++++++++++ cpp/src/arrow/ipc/stream.h | 112 +++++++++++++ cpp/src/arrow/ipc/test-common.h | 9 ++ python/pyarrow/includes/libarrow_ipc.pxd | 3 +- python/pyarrow/ipc.pyx | 5 +- python/src/pyarrow/adapters/pandas.cc | 34 ++-- 24 files changed, 718 insertions(+), 417 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 885ab19..9039ffb 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -90,7 +90,6 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") option(ARROW_ALTIVEC "Build Arrow with Altivec" ON) - endif() if(NOT ARROW_BUILD_TESTS) http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/io/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 0f5a0dc..1339a99 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -116,13 +116,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { int64_t size = std::min(nbytes, size_ - position_); - if (buffer_ != nullptr) { + if (size > 0 && buffer_ != nullptr) { *out = SliceBuffer(buffer_, position_, size); } else { *out = std::make_shared<Buffer>(data_ + position_, size); } - position_ += nbytes; + position_ += size; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index b7ac5f0..c047f53 100644 --- 
a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -46,6 +46,7 @@ set(ARROW_IPC_SRCS json-internal.cc metadata.cc metadata-internal.cc + stream.cc ) if(NOT APPLE) @@ -151,6 +152,7 @@ install(FILES file.h json.h metadata.h + stream.h DESTINATION include/arrow/ipc) # pkg-config support http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 7b4d18c..9da7b39 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -49,10 +49,9 @@ namespace ipc { class RecordBatchWriter : public ArrayVisitor { public: - RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows, - int64_t buffer_start_offset, int max_recursion_depth) - : columns_(columns), - num_rows_(num_rows), + RecordBatchWriter( + const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth) + : batch_(batch), max_recursion_depth_(max_recursion_depth), buffer_start_offset_(buffer_start_offset) {} @@ -79,8 +78,8 @@ class RecordBatchWriter : public ArrayVisitor { } // Perform depth-first traversal of the row-batch - for (size_t i = 0; i < columns_.size(); ++i) { - RETURN_NOT_OK(VisitArray(*columns_[i].get())); + for (int i = 0; i < batch_.num_columns(); ++i) { + RETURN_NOT_OK(VisitArray(*batch_.column(i))); } // The position for the start of a buffer relative to the passed frame of @@ -126,18 +125,23 @@ class RecordBatchWriter : public ArrayVisitor { // itself as an int32_t. 
std::shared_ptr<Buffer> metadata_fb; RETURN_NOT_OK(WriteRecordBatchMetadata( - num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb)); + batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb)); // Need to write 4 bytes (metadata size), the metadata, plus padding to - // fall on an 8-byte offset - int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4); + // end on an 8-byte offset + int64_t start_offset; + RETURN_NOT_OK(dst->Tell(&start_offset)); + + int64_t padded_metadata_length = metadata_fb->size() + 4; + const int remainder = (padded_metadata_length + start_offset) % 8; + if (remainder != 0) { padded_metadata_length += 8 - remainder; } // The returned metadata size includes the length prefix, the flatbuffer, // plus padding *metadata_length = static_cast<int32_t>(padded_metadata_length); - // Write the flatbuffer size prefix - int32_t flatbuffer_size = metadata_fb->size(); + // Write the flatbuffer size prefix including padding + int32_t flatbuffer_size = padded_metadata_length - 4; RETURN_NOT_OK( dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t))); @@ -294,9 +298,7 @@ class RecordBatchWriter : public ArrayVisitor { return Status::OK(); } - // Do not copy this vector. 
Ownership must be retained elsewhere - const std::vector<std::shared_ptr<Array>>& columns_; - int32_t num_rows_; + const RecordBatch& batch_; std::vector<flatbuf::FieldNode> field_nodes_; std::vector<flatbuf::Buffer> buffer_meta_; @@ -306,18 +308,16 @@ class RecordBatchWriter : public ArrayVisitor { int64_t buffer_start_offset_; }; -Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns, - int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) { +Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, + io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, + int max_recursion_depth) { DCHECK_GT(max_recursion_depth, 0); - RecordBatchWriter serializer( - columns, num_rows, buffer_start_offset, max_recursion_depth); + RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth); return serializer.Write(dst, metadata_length, body_length); } -Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) { - RecordBatchWriter serializer( - batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth); +Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { + RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth); RETURN_NOT_OK(serializer.GetTotalSize(size)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/adapter.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index 963b9ee..f9ef7d9 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -71,17 +71,14 @@ constexpr int kMaxIpcRecursionDepth = 64; // // @param(out) body_length: the size of the contiguous buffer block plus // padding bytes -ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns, - int32_t num_rows, int64_t 
buffer_start_offset, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, - int max_recursion_depth = kMaxIpcRecursionDepth); - -// int64_t GetRecordBatchMetadata(const RecordBatch* batch); +ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch, + int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth); // Compute the precise number of bytes needed in a contiguous memory segment to // write the record batch. This involves generating the complete serialized // Flatbuffers metadata. -ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size); +ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size); // ---------------------------------------------------------------------- // "Read" path; does not copy data if the input supports zero copy reads http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc index d7d2e61..bc086e3 100644 --- a/cpp/src/arrow/ipc/file.cc +++ b/cpp/src/arrow/ipc/file.cc @@ -26,6 +26,7 @@ #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" #include "arrow/status.h" @@ -35,82 +36,154 @@ namespace arrow { namespace ipc { static constexpr const char* kArrowMagicBytes = "ARROW1"; - // ---------------------------------------------------------------------- -// Writer implementation +// File footer + +static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>> +FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { + std::vector<flatbuf::Block> fb_blocks; -FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) - : sink_(sink), 
schema_(schema), position_(-1), started_(false) {} + for (const FileBlock& block : blocks) { + fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length); + } -Status FileWriter::UpdatePosition() { - return sink_->Tell(&position_); + return fbb.CreateVectorOfStructs(fb_blocks); } -Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<FileWriter>* out) { - *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema)); // ctor is private - RETURN_NOT_OK((*out)->UpdatePosition()); - return Status::OK(); +Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, + const std::vector<FileBlock>& record_batches, io::OutputStream* out) { + FBB fbb; + + flatbuffers::Offset<flatbuf::Schema> fb_schema; + RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema)); + + auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); + auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); + + auto footer = flatbuf::CreateFooter( + fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); + + fbb.Finish(footer); + + int32_t size = fbb.GetSize(); + + return out->Write(fbb.GetBufferPointer(), size); } -Status FileWriter::Write(const uint8_t* data, int64_t nbytes) { - RETURN_NOT_OK(sink_->Write(data, nbytes)); - position_ += nbytes; - return Status::OK(); +static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { + return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); } -Status FileWriter::Align() { - int64_t remainder = PaddedLength(position_) - position_; - if (remainder > 0) { return Write(kPaddingBytes, remainder); } +class FileFooter::FileFooterImpl { + public: + FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer) + : buffer_(buffer), footer_(footer) {} + + int num_dictionaries() const { return footer_->dictionaries()->size(); } + + int num_record_batches() const { 
return footer_->recordBatches()->size(); } + + MetadataVersion::type version() const { + switch (footer_->version()) { + case flatbuf::MetadataVersion_V1: + return MetadataVersion::V1; + case flatbuf::MetadataVersion_V2: + return MetadataVersion::V2; + // Add cases as other versions become available + default: + return MetadataVersion::V2; + } + } + + FileBlock record_batch(int i) const { + return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); + } + + FileBlock dictionary(int i) const { + return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); + } + + Status GetSchema(std::shared_ptr<Schema>* out) const { + auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema()); + return schema_msg->GetSchema(out); + } + + private: + // Retain reference to memory + std::shared_ptr<Buffer> buffer_; + + const flatbuf::Footer* footer_; +}; + +FileFooter::FileFooter() {} + +FileFooter::~FileFooter() {} + +Status FileFooter::Open( + const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) { + const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data()); + + *out = std::unique_ptr<FileFooter>(new FileFooter()); + + // TODO(wesm): Verify the footer + (*out)->impl_.reset(new FileFooterImpl(buffer, footer)); + return Status::OK(); } -Status FileWriter::WriteAligned(const uint8_t* data, int64_t nbytes) { - RETURN_NOT_OK(Write(data, nbytes)); - return Align(); +int FileFooter::num_dictionaries() const { + return impl_->num_dictionaries(); } -Status FileWriter::Start() { - RETURN_NOT_OK(WriteAligned( - reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes))); - started_ = true; - return Status::OK(); +int FileFooter::num_record_batches() const { + return impl_->num_record_batches(); } -Status FileWriter::CheckStarted() { - if (!started_) { return Start(); } - return Status::OK(); +MetadataVersion::type FileFooter::version() const { + return impl_->version(); } -Status FileWriter::WriteRecordBatch( - const 
std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) { - RETURN_NOT_OK(CheckStarted()); - - int64_t offset = position_; +FileBlock FileFooter::record_batch(int i) const { + return impl_->record_batch(i); +} - // There may be padding ever the end of the metadata, so we cannot rely on - // position_ - int32_t metadata_length; - int64_t body_length; +FileBlock FileFooter::dictionary(int i) const { + return impl_->dictionary(i); +} - // Frame of reference in file format is 0, see ARROW-384 - const int64_t buffer_start_offset = 0; - RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( - columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length)); - RETURN_NOT_OK(UpdatePosition()); +Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const { + return impl_->GetSchema(out); +} - DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes"; +// ---------------------------------------------------------------------- +// File writer implementation - // Append metadata, to be written in the footer later - record_batches_.emplace_back(offset, metadata_length, body_length); +Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<FileWriter>* out) { + *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema)); // ctor is private + RETURN_NOT_OK((*out)->UpdatePosition()); + return Status::OK(); +} +Status FileWriter::Start() { + RETURN_NOT_OK(WriteAligned( + reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes))); + started_ = true; return Status::OK(); } +Status FileWriter::WriteRecordBatch(const RecordBatch& batch) { + // Push an empty FileBlock + // Append metadata, to be written in the footer later + record_batches_.emplace_back(0, 0, 0); + return StreamWriter::WriteRecordBatch( + batch, &record_batches_[record_batches_.size() - 1]); +} + Status FileWriter::Close() { // Write metadata int64_t initial_position = position_; - 
RETURN_NOT_OK(WriteFileFooter(schema_.get(), dictionaries_, record_batches_, sink_)); + RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_)); RETURN_NOT_OK(UpdatePosition()); // Write footer length http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/file.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h index 4f35c37..7696954 100644 --- a/cpp/src/arrow/ipc/file.h +++ b/cpp/src/arrow/ipc/file.h @@ -25,13 +25,12 @@ #include <vector> #include "arrow/ipc/metadata.h" +#include "arrow/ipc/stream.h" #include "arrow/util/visibility.h" namespace arrow { -class Array; class Buffer; -struct Field; class RecordBatch; class Schema; class Status; @@ -45,40 +44,43 @@ class ReadableFileInterface; namespace ipc { -class ARROW_EXPORT FileWriter { - public: - static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<FileWriter>* out); +Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, + const std::vector<FileBlock>& record_batches, io::OutputStream* out); - // TODO(wesm): Write dictionaries +class ARROW_EXPORT FileFooter { + public: + ~FileFooter(); - Status WriteRecordBatch( - const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows); + static Status Open( + const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out); - Status Close(); + int num_dictionaries() const; + int num_record_batches() const; + MetadataVersion::type version() const; - private: - FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema); + FileBlock record_batch(int i) const; + FileBlock dictionary(int i) const; - Status CheckStarted(); - Status Start(); + Status GetSchema(std::shared_ptr<Schema>* out) const; - Status UpdatePosition(); + private: + FileFooter(); + class FileFooterImpl; + std::unique_ptr<FileFooterImpl> impl_; +}; - // Adds padding 
bytes if necessary to ensure all memory blocks are written on - // 8-byte boundaries. - Status Align(); +class ARROW_EXPORT FileWriter : public StreamWriter { + public: + static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<FileWriter>* out); - // Write data and update position - Status Write(const uint8_t* data, int64_t nbytes); + Status WriteRecordBatch(const RecordBatch& batch) override; + Status Close() override; - // Write and align - Status WriteAligned(const uint8_t* data, int64_t nbytes); + private: + using StreamWriter::StreamWriter; - io::OutputStream* sink_; - std::shared_ptr<Schema> schema_; - int64_t position_; - bool started_; + Status Start() override; std::vector<FileBlock> dictionaries_; std::vector<FileBlock> record_batches_; http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-adapter-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 6ba0a6e..17868f8 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -55,8 +55,8 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>, const int64_t buffer_offset = 0; - RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset, - mmap_.get(), &metadata_length, &body_length)); + RETURN_NOT_OK(WriteRecordBatch( + batch, buffer_offset, mmap_.get(), &metadata_length, &body_length)); std::shared_ptr<RecordBatchMetadata> metadata; RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); @@ -102,9 +102,8 @@ void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) { int32_t mock_metadata_length = -1; int64_t mock_body_length = -1; int64_t size = -1; - ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock, - &mock_metadata_length, &mock_body_length)); - 
ASSERT_OK(GetRecordBatchSize(batch.get(), &size)); + ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length)); + ASSERT_OK(GetRecordBatchSize(*batch, &size)); ASSERT_EQ(mock.GetExtentBytesWritten(), size); } @@ -157,11 +156,10 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); if (override_level) { - return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(), - metadata_length, body_length, recursion_level + 1); + return WriteRecordBatch( + *batch, 0, mmap_.get(), metadata_length, body_length, recursion_level + 1); } else { - return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(), - metadata_length, body_length); + return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc index 0a9f677..15ceb80 100644 --- a/cpp/src/arrow/ipc/ipc-file-test.cc +++ b/cpp/src/arrow/ipc/ipc-file-test.cc @@ -29,6 +29,7 @@ #include "arrow/io/test-common.h" #include "arrow/ipc/adapter.h" #include "arrow/ipc/file.h" +#include "arrow/ipc/stream.h" #include "arrow/ipc/test-common.h" #include "arrow/ipc/util.h" @@ -41,6 +42,19 @@ namespace arrow { namespace ipc { +void CompareBatch(const RecordBatch& left, const RecordBatch& right) { + ASSERT_TRUE(left.schema()->Equals(right.schema())); + ASSERT_EQ(left.num_columns(), right.num_columns()) + << left.schema()->ToString() << " result: " << right.schema()->ToString(); + EXPECT_EQ(left.num_rows(), right.num_rows()); + for (int i = 0; i < left.num_columns(); ++i) { + EXPECT_TRUE(left.column(i)->Equals(right.column(i))) + << "Idx: " << i << " Name: " << left.column_name(i); + } +} + +using BatchVector = 
std::vector<std::shared_ptr<RecordBatch>>; + class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> { public: void SetUp() { @@ -50,43 +64,94 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> { } void TearDown() {} - Status RoundTripHelper( - const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) { + Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) { // Write the file - RETURN_NOT_OK(FileWriter::Open(sink_.get(), batch.schema(), &file_writer_)); - int num_batches = 3; - for (int i = 0; i < num_batches; ++i) { - RETURN_NOT_OK(file_writer_->WriteRecordBatch(batch.columns(), batch.num_rows())); + std::shared_ptr<FileWriter> writer; + RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer)); + + const int num_batches = static_cast<int>(in_batches.size()); + + for (const auto& batch : in_batches) { + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } - RETURN_NOT_OK(file_writer_->Close()); + RETURN_NOT_OK(writer->Close()); // Current offset into stream is the end of the file int64_t footer_offset; RETURN_NOT_OK(sink_->Tell(&footer_offset)); // Open the file - auto reader = std::make_shared<io::BufferReader>(buffer_); - RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_)); + auto buf_reader = std::make_shared<io::BufferReader>(buffer_); + std::shared_ptr<FileReader> reader; + RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader)); - EXPECT_EQ(num_batches, file_reader_->num_record_batches()); - - out_batches->resize(num_batches); + EXPECT_EQ(num_batches, reader->num_record_batches()); for (int i = 0; i < num_batches; ++i) { - RETURN_NOT_OK(file_reader_->GetRecordBatch(i, &(*out_batches)[i])); + std::shared_ptr<RecordBatch> chunk; + RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); + out_batches->emplace_back(chunk); } return Status::OK(); } - void CompareBatch(const RecordBatch* left, const RecordBatch* right) { - 
ASSERT_TRUE(left->schema()->Equals(right->schema())); - ASSERT_EQ(left->num_columns(), right->num_columns()) - << left->schema()->ToString() << " result: " << right->schema()->ToString(); - EXPECT_EQ(left->num_rows(), right->num_rows()); - for (int i = 0; i < left->num_columns(); ++i) { - EXPECT_TRUE(left->column(i)->Equals(right->column(i))) - << "Idx: " << i << " Name: " << left->column_name(i); + protected: + MemoryPool* pool_; + + std::unique_ptr<io::BufferOutputStream> sink_; + std::shared_ptr<PoolBuffer> buffer_; +}; + +TEST_P(TestFileFormat, RoundTrip) { + std::shared_ptr<RecordBatch> batch1; + std::shared_ptr<RecordBatch> batch2; + ASSERT_OK((*GetParam())(&batch1)); // NOLINT clang-tidy gtest issue + ASSERT_OK((*GetParam())(&batch2)); // NOLINT clang-tidy gtest issue + + std::vector<std::shared_ptr<RecordBatch>> in_batches = {batch1, batch2}; + std::vector<std::shared_ptr<RecordBatch>> out_batches; + + ASSERT_OK(RoundTripHelper(in_batches, &out_batches)); + + // Compare batches + for (size_t i = 0; i < in_batches.size(); ++i) { + CompareBatch(*in_batches[i], *out_batches[i]); + } +} + +class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> { + public: + void SetUp() { + pool_ = default_memory_pool(); + buffer_ = std::make_shared<PoolBuffer>(pool_); + sink_.reset(new io::BufferOutputStream(buffer_)); + } + void TearDown() {} + + Status RoundTripHelper( + const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) { + // Write the file + std::shared_ptr<StreamWriter> writer; + RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer)); + int num_batches = 5; + for (int i = 0; i < num_batches; ++i) { + RETURN_NOT_OK(writer->WriteRecordBatch(batch)); + } + RETURN_NOT_OK(writer->Close()); + + // Open the file + auto buf_reader = std::make_shared<io::BufferReader>(buffer_); + + std::shared_ptr<StreamReader> reader; + RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader)); + + std::shared_ptr<RecordBatch> 
chunk; + while (true) { + RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk)); + if (chunk == nullptr) { break; } + out_batches->emplace_back(chunk); } + return Status::OK(); } protected: @@ -94,12 +159,9 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> { std::unique_ptr<io::BufferOutputStream> sink_; std::shared_ptr<PoolBuffer> buffer_; - - std::shared_ptr<FileWriter> file_writer_; - std::shared_ptr<FileReader> file_reader_; }; -TEST_P(TestFileFormat, RoundTrip) { +TEST_P(TestStreamFormat, RoundTrip) { std::shared_ptr<RecordBatch> batch; ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue @@ -109,14 +171,80 @@ TEST_P(TestFileFormat, RoundTrip) { // Compare batches. Same for (size_t i = 0; i < out_batches.size(); ++i) { - CompareBatch(batch.get(), out_batches[i].get()); + CompareBatch(*batch, *out_batches[i]); } } -INSTANTIATE_TEST_CASE_P(RoundTripTests, TestFileFormat, - ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, - &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, - &MakeStringTypesRecordBatch, &MakeStruct)); +#define BATCH_CASES() \ + ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \ + &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch, \ + &MakeStruct); + +INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES()); +INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES()); + +class TestFileFooter : public ::testing::Test { + public: + void SetUp() {} + + void CheckRoundtrip(const Schema& schema, const std::vector<FileBlock>& dictionaries, + const std::vector<FileBlock>& record_batches) { + auto buffer = std::make_shared<PoolBuffer>(); + io::BufferOutputStream stream(buffer); + + ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream)); + + std::unique_ptr<FileFooter> footer; + ASSERT_OK(FileFooter::Open(buffer, &footer)); + + ASSERT_EQ(MetadataVersion::V2, 
footer->version()); + + // Check schema + std::shared_ptr<Schema> schema2; + ASSERT_OK(footer->GetSchema(&schema2)); + AssertSchemaEqual(schema, *schema2); + + // Check blocks + ASSERT_EQ(dictionaries.size(), footer->num_dictionaries()); + ASSERT_EQ(record_batches.size(), footer->num_record_batches()); + + for (int i = 0; i < footer->num_dictionaries(); ++i) { + CheckBlocks(dictionaries[i], footer->dictionary(i)); + } + + for (int i = 0; i < footer->num_record_batches(); ++i) { + CheckBlocks(record_batches[i], footer->record_batch(i)); + } + } + + void CheckBlocks(const FileBlock& left, const FileBlock& right) { + ASSERT_EQ(left.offset, right.offset); + ASSERT_EQ(left.metadata_length, right.metadata_length); + ASSERT_EQ(left.body_length, right.body_length); + } + + private: + std::shared_ptr<Schema> example_schema_; +}; + +TEST_F(TestFileFooter, Basics) { + auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>()); + auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>()); + Schema schema({f0, f1}); + + std::vector<FileBlock> dictionaries; + dictionaries.emplace_back(8, 92, 900); + dictionaries.emplace_back(1000, 100, 1900); + dictionaries.emplace_back(3000, 100, 2900); + + std::vector<FileBlock> record_batches; + record_batches.emplace_back(6000, 100, 900); + record_batches.emplace_back(7000, 100, 1900); + record_batches.emplace_back(9000, 100, 2900); + record_batches.emplace_back(12000, 100, 3900); + + CheckRoundtrip(schema, dictionaries, record_batches); +} } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-json-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index 0750989..30f968c 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -245,8 +245,9 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) { 
std::vector<std::shared_ptr<Array>> arrays; MakeBatchArrays(schema, num_rows, &arrays); - batches.emplace_back(std::make_shared<RecordBatch>(schema, num_rows, arrays)); - ASSERT_OK(writer->WriteRecordBatch(arrays, num_rows)); + auto batch = std::make_shared<RecordBatch>(schema, num_rows, arrays); + batches.push_back(batch); + ASSERT_OK(writer->WriteRecordBatch(*batch)); } std::string result; http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-metadata-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index 7c5744a..098f996 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -23,6 +23,7 @@ #include "arrow/io/memory.h" #include "arrow/ipc/metadata.h" +#include "arrow/ipc/test-common.h" #include "arrow/schema.h" #include "arrow/status.h" #include "arrow/test-util.h" @@ -34,20 +35,11 @@ class Buffer; namespace ipc { -static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) { - if (!lhs->Equals(*rhs)) { - std::stringstream ss; - ss << "left schema: " << lhs->ToString() << std::endl - << "right schema: " << rhs->ToString() << std::endl; - FAIL() << ss.str(); - } -} - class TestSchemaMetadata : public ::testing::Test { public: void SetUp() {} - void CheckRoundtrip(const Schema* schema) { + void CheckRoundtrip(const Schema& schema) { std::shared_ptr<Buffer> buffer; ASSERT_OK(WriteSchema(schema, &buffer)); @@ -57,12 +49,12 @@ class TestSchemaMetadata : public ::testing::Test { ASSERT_EQ(Message::SCHEMA, message->type()); auto schema_msg = std::make_shared<SchemaMetadata>(message); - ASSERT_EQ(schema->num_fields(), schema_msg->num_fields()); + ASSERT_EQ(schema.num_fields(), schema_msg->num_fields()); std::shared_ptr<Schema> schema2; ASSERT_OK(schema_msg->GetSchema(&schema2)); - assert_schema_equal(schema, schema2.get()); + AssertSchemaEqual(schema, *schema2); 
} }; @@ -82,7 +74,7 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) { auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>()); Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10}); - CheckRoundtrip(&schema); + CheckRoundtrip(schema); } TEST_F(TestSchemaMetadata, NestedFields) { @@ -94,70 +86,7 @@ TEST_F(TestSchemaMetadata, NestedFields) { auto f1 = std::make_shared<Field>("f1", type2); Schema schema({f0, f1}); - CheckRoundtrip(&schema); -} - -class TestFileFooter : public ::testing::Test { - public: - void SetUp() {} - - void CheckRoundtrip(const Schema* schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches) { - auto buffer = std::make_shared<PoolBuffer>(); - io::BufferOutputStream stream(buffer); - - ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream)); - - std::unique_ptr<FileFooter> footer; - ASSERT_OK(FileFooter::Open(buffer, &footer)); - - ASSERT_EQ(MetadataVersion::V2, footer->version()); - - // Check schema - std::shared_ptr<Schema> schema2; - ASSERT_OK(footer->GetSchema(&schema2)); - assert_schema_equal(schema, schema2.get()); - - // Check blocks - ASSERT_EQ(dictionaries.size(), footer->num_dictionaries()); - ASSERT_EQ(record_batches.size(), footer->num_record_batches()); - - for (int i = 0; i < footer->num_dictionaries(); ++i) { - CheckBlocks(dictionaries[i], footer->dictionary(i)); - } - - for (int i = 0; i < footer->num_record_batches(); ++i) { - CheckBlocks(record_batches[i], footer->record_batch(i)); - } - } - - void CheckBlocks(const FileBlock& left, const FileBlock& right) { - ASSERT_EQ(left.offset, right.offset); - ASSERT_EQ(left.metadata_length, right.metadata_length); - ASSERT_EQ(left.body_length, right.body_length); - } - - private: - std::shared_ptr<Schema> example_schema_; -}; - -TEST_F(TestFileFooter, Basics) { - auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>()); - auto f1 = std::make_shared<Field>("f1", 
std::make_shared<Int16Type>()); - Schema schema({f0, f1}); - - std::vector<FileBlock> dictionaries; - dictionaries.emplace_back(8, 92, 900); - dictionaries.emplace_back(1000, 100, 1900); - dictionaries.emplace_back(3000, 100, 2900); - - std::vector<FileBlock> record_batches; - record_batches.emplace_back(6000, 100, 900); - record_batches.emplace_back(7000, 100, 1900); - record_batches.emplace_back(9000, 100, 2900); - record_batches.emplace_back(12000, 100, 3900); - - CheckRoundtrip(&schema, dictionaries, record_batches); + CheckRoundtrip(schema); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json-integration-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 757e6c0..95bc742 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -81,7 +81,7 @@ static Status ConvertJsonToArrow( for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr<RecordBatch> batch; RETURN_NOT_OK(reader->GetRecordBatch(i, &batch)); - RETURN_NOT_OK(writer->WriteRecordBatch(batch->columns(), batch->num_rows())); + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } return writer->Close(); } @@ -108,7 +108,7 @@ static Status ConvertArrowToJson( for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr<RecordBatch> batch; RETURN_NOT_OK(reader->GetRecordBatch(i, &batch)); - RETURN_NOT_OK(writer->WriteRecordBatch(batch->columns(), batch->num_rows())); + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } std::string result; http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc index 6e3a993..773fb74 100644 --- a/cpp/src/arrow/ipc/json.cc +++ b/cpp/src/arrow/ipc/json.cc @@ 
-64,25 +64,23 @@ class JsonWriter::JsonWriterImpl { return Status::OK(); } - Status WriteRecordBatch( - const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) { - DCHECK_EQ(static_cast<int>(columns.size()), schema_->num_fields()); + Status WriteRecordBatch(const RecordBatch& batch) { + DCHECK_EQ(batch.num_columns(), schema_->num_fields()); writer_->StartObject(); writer_->Key("count"); - writer_->Int(num_rows); + writer_->Int(batch.num_rows()); writer_->Key("columns"); writer_->StartArray(); for (int i = 0; i < schema_->num_fields(); ++i) { - const std::shared_ptr<Array>& column = columns[i]; + const std::shared_ptr<Array>& column = batch.column(i); - DCHECK_EQ(num_rows, column->length()) + DCHECK_EQ(batch.num_rows(), column->length()) << "Array length did not match record batch length"; - RETURN_NOT_OK( - WriteJsonArray(schema_->field(i)->name, *column.get(), writer_.get())); + RETURN_NOT_OK(WriteJsonArray(schema_->field(i)->name, *column, writer_.get())); } writer_->EndArray(); @@ -113,9 +111,8 @@ Status JsonWriter::Finish(std::string* result) { return impl_->Finish(result); } -Status JsonWriter::WriteRecordBatch( - const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) { - return impl_->WriteRecordBatch(columns, num_rows); +Status JsonWriter::WriteRecordBatch(const RecordBatch& batch) { + return impl_->WriteRecordBatch(batch); } // ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h index 7395be4..88afdfa 100644 --- a/cpp/src/arrow/ipc/json.h +++ b/cpp/src/arrow/ipc/json.h @@ -46,8 +46,7 @@ class ARROW_EXPORT JsonWriter { // TODO(wesm): Write dictionaries - Status WriteRecordBatch( - const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows); + Status WriteRecordBatch(const RecordBatch& 
batch); Status Finish(std::string* result); http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index cc160c4..cd77220 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -282,10 +282,10 @@ flatbuf::Endianness endianness() { } Status SchemaToFlatbuffer( - FBB& fbb, const Schema* schema, flatbuffers::Offset<flatbuf::Schema>* out) { + FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out) { std::vector<FieldOffset> field_offsets; - for (int i = 0; i < schema->num_fields(); ++i) { - std::shared_ptr<Field> field = schema->field(i); + for (int i = 0; i < schema.num_fields(); ++i) { + std::shared_ptr<Field> field = schema.field(i); FieldOffset offset; RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset)); field_offsets.push_back(offset); @@ -295,7 +295,7 @@ Status SchemaToFlatbuffer( return Status::OK(); } -Status MessageBuilder::SetSchema(const Schema* schema) { +Status MessageBuilder::SetSchema(const Schema& schema) { flatbuffers::Offset<flatbuf::Schema> fb_schema; RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema)); http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index 4826ebe..d94a8ab 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -49,11 +49,11 @@ static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVe Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out); Status SchemaToFlatbuffer( - FBB& fbb, const Schema* schema, flatbuffers::Offset<flatbuf::Schema>* out); + FBB& fbb, 
const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out); class MessageBuilder { public: - Status SetSchema(const Schema* schema); + Status SetSchema(const Schema& schema); Status SetRecordBatch(int32_t length, int64_t body_length, const std::vector<flatbuf::FieldNode>& nodes, http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index f0674ff..a97965c 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -38,7 +38,7 @@ namespace flatbuf = org::apache::arrow::flatbuf; namespace ipc { -Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out) { +Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out) { MessageBuilder message; RETURN_NOT_OK(message.SetSchema(schema)); RETURN_NOT_OK(message.Finish()); @@ -232,124 +232,5 @@ int RecordBatchMetadata::num_fields() const { return impl_->num_fields(); } -// ---------------------------------------------------------------------- -// File footer - -static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>> -FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { - std::vector<flatbuf::Block> fb_blocks; - - for (const FileBlock& block : blocks) { - fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length); - } - - return fbb.CreateVectorOfStructs(fb_blocks); -} - -Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, io::OutputStream* out) { - FBB fbb; - - flatbuffers::Offset<flatbuf::Schema> fb_schema; - RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema)); - - auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); - auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); - - auto footer = flatbuf::CreateFooter( - fbb, 
kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); - - fbb.Finish(footer); - - int32_t size = fbb.GetSize(); - - return out->Write(fbb.GetBufferPointer(), size); -} - -static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { - return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); -} - -class FileFooter::FileFooterImpl { - public: - FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer) - : buffer_(buffer), footer_(footer) {} - - int num_dictionaries() const { return footer_->dictionaries()->size(); } - - int num_record_batches() const { return footer_->recordBatches()->size(); } - - MetadataVersion::type version() const { - switch (footer_->version()) { - case flatbuf::MetadataVersion_V1: - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - return MetadataVersion::V2; - // Add cases as other versions become available - default: - return MetadataVersion::V2; - } - } - - FileBlock record_batch(int i) const { - return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i)); - } - - FileBlock dictionary(int i) const { - return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); - } - - Status GetSchema(std::shared_ptr<Schema>* out) const { - auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema()); - return schema_msg->GetSchema(out); - } - - private: - // Retain reference to memory - std::shared_ptr<Buffer> buffer_; - - const flatbuf::Footer* footer_; -}; - -FileFooter::FileFooter() {} - -FileFooter::~FileFooter() {} - -Status FileFooter::Open( - const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) { - const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data()); - - *out = std::unique_ptr<FileFooter>(new FileFooter()); - - // TODO(wesm): Verify the footer - (*out)->impl_.reset(new FileFooterImpl(buffer, footer)); - - return Status::OK(); -} - -int FileFooter::num_dictionaries() const { - return 
impl_->num_dictionaries(); -} - -int FileFooter::num_record_batches() const { - return impl_->num_record_batches(); -} - -MetadataVersion::type FileFooter::version() const { - return impl_->version(); -} - -FileBlock FileFooter::record_batch(int i) const { - return impl_->record_batch(i); -} - -FileBlock FileFooter::dictionary(int i) const { - return impl_->dictionary(i); -} - -Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const { - return impl_->GetSchema(out); -} - } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 1c4ef64..6e15ef3 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -49,7 +49,7 @@ struct MetadataVersion { // Serialize arrow::Schema as a Flatbuffer ARROW_EXPORT -Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out); +Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out); // Read interface classes. 
We do not fully deserialize the flatbuffers so that // individual fields metadata can be retrieved from very large schema without @@ -149,10 +149,8 @@ class ARROW_EXPORT Message { std::unique_ptr<MessageImpl> impl_; }; -// ---------------------------------------------------------------------- -// File footer for file-like representation - struct FileBlock { + FileBlock() {} FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length) : offset(offset), metadata_length(metadata_length), body_length(body_length) {} @@ -161,32 +159,6 @@ struct FileBlock { int64_t body_length; }; -ARROW_EXPORT -Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, io::OutputStream* out); - -class ARROW_EXPORT FileFooter { - public: - ~FileFooter(); - - static Status Open( - const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out); - - int num_dictionaries() const; - int num_record_batches() const; - MetadataVersion::type version() const; - - FileBlock record_batch(int i) const; - FileBlock dictionary(int i) const; - - Status GetSchema(std::shared_ptr<Schema>* out) const; - - private: - FileFooter(); - class FileFooterImpl; - std::unique_ptr<FileFooterImpl> impl_; -}; - } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/stream.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc new file mode 100644 index 0000000..a2ca672 --- /dev/null +++ b/cpp/src/arrow/ipc/stream.cc @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/ipc/stream.h" + +#include <cstdint> +#include <cstring> +#include <sstream> +#include <vector> + +#include "arrow/buffer.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata.h" +#include "arrow/ipc/util.h" +#include "arrow/schema.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace ipc { + +// ---------------------------------------------------------------------- +// Stream writer implementation + +StreamWriter::~StreamWriter() {} + +StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) + : sink_(sink), schema_(schema), position_(-1), started_(false) {} + +Status StreamWriter::UpdatePosition() { + return sink_->Tell(&position_); +} + +Status StreamWriter::Write(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(sink_->Write(data, nbytes)); + position_ += nbytes; + return Status::OK(); +} + +Status StreamWriter::Align() { + int64_t remainder = PaddedLength(position_) - position_; + if (remainder > 0) { return Write(kPaddingBytes, remainder); } + return Status::OK(); +} + +Status StreamWriter::WriteAligned(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(Write(data, nbytes)); + return Align(); +} + +Status StreamWriter::CheckStarted() { + if (!started_) { return Start(); } + return Status::OK(); +} + +Status 
StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block) { + RETURN_NOT_OK(CheckStarted()); + + block->offset = position_; + + // Frame of reference in file format is 0, see ARROW-384 + const int64_t buffer_start_offset = 0; + RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( + batch, buffer_start_offset, sink_, &block->metadata_length, &block->body_length)); + RETURN_NOT_OK(UpdatePosition()); + + DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes"; + + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// StreamWriter implementation + +Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<StreamWriter>* out) { + // ctor is private + *out = std::shared_ptr<StreamWriter>(new StreamWriter(sink, schema)); + RETURN_NOT_OK((*out)->UpdatePosition()); + return Status::OK(); +} + +Status StreamWriter::Start() { + std::shared_ptr<Buffer> schema_fb; + RETURN_NOT_OK(WriteSchema(*schema_, &schema_fb)); + + int32_t flatbuffer_size = schema_fb->size(); + RETURN_NOT_OK( + Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t))); + + // Write the flatbuffer + RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size)); + started_ = true; + return Status::OK(); +} + +Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) { + // Pass FileBlock, but results not used + FileBlock dummy_block; + return WriteRecordBatch(batch, &dummy_block); +} + +Status StreamWriter::Close() { + // Close the stream + RETURN_NOT_OK(CheckStarted()); + return sink_->Close(); +} + +// ---------------------------------------------------------------------- +// StreamReader implementation + +StreamReader::StreamReader(const std::shared_ptr<io::InputStream>& stream) + : stream_(stream), schema_(nullptr) {} + +StreamReader::~StreamReader() {} + +Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream, + 
std::shared_ptr<StreamReader>* reader) { + // Private ctor + *reader = std::shared_ptr<StreamReader>(new StreamReader(stream)); + return (*reader)->ReadSchema(); +} + +Status StreamReader::ReadSchema() { + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadNextMessage(&message)); + + if (message->type() != Message::SCHEMA) { + return Status::IOError("First message was not schema type"); + } + + SchemaMetadata schema_meta(message); + + // TODO(wesm): If the schema contains dictionaries, we must read all the + // dictionaries from the stream before constructing the final Schema + return schema_meta.GetSchema(&schema_); +} + +Status StreamReader::ReadNextMessage(std::shared_ptr<Message>* message) { + std::shared_ptr<Buffer> buffer; + RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer)); + + if (buffer->size() != sizeof(int32_t)) { + *message = nullptr; + return Status::OK(); + } + + int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data()); + + RETURN_NOT_OK(stream_->Read(message_length, &buffer)); + if (buffer->size() != message_length) { + return Status::IOError("Unexpected end of stream trying to read message"); + } + return Message::Open(buffer, 0, message); +} + +std::shared_ptr<Schema> StreamReader::schema() const { + return schema_; +} + +Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadNextMessage(&message)); + + if (message == nullptr) { + // End of stream + *batch = nullptr; + return Status::OK(); + } + + if (message->type() != Message::RECORD_BATCH) { + return Status::IOError("Metadata not record batch"); + } + + auto batch_metadata = std::make_shared<RecordBatchMetadata>(message); + + std::shared_ptr<Buffer> batch_body; + RETURN_NOT_OK(stream_->Read(message->body_length(), &batch_body)); + + if (batch_body->size() < message->body_length()) { + return Status::IOError("Unexpected EOS when reading message body"); + } + + io::BufferReader 
reader(batch_body); + + return ReadRecordBatch(batch_metadata, schema_, &reader, batch); +} + +} // namespace ipc +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/stream.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h new file mode 100644 index 0000000..0b0e62f --- /dev/null +++ b/cpp/src/arrow/ipc/stream.h @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Implement Arrow streaming binary format + +#ifndef ARROW_IPC_STREAM_H +#define ARROW_IPC_STREAM_H + +#include <cstdint> +#include <memory> + +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class Buffer; +struct Field; +class RecordBatch; +class Schema; +class Status; + +namespace io { + +class InputStream; +class OutputStream; + +} // namespace io + +namespace ipc { + +struct FileBlock; +class Message; + +class ARROW_EXPORT StreamWriter { + public: + virtual ~StreamWriter(); + + static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, + std::shared_ptr<StreamWriter>* out); + + virtual Status WriteRecordBatch(const RecordBatch& batch); + virtual Status Close(); + + protected: + StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema); + + virtual Status Start(); + + Status CheckStarted(); + Status UpdatePosition(); + + Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block); + + // Adds padding bytes if necessary to ensure all memory blocks are written on + // 8-byte boundaries. + Status Align(); + + // Write data and update position + Status Write(const uint8_t* data, int64_t nbytes); + + // Write and align + Status WriteAligned(const uint8_t* data, int64_t nbytes); + + io::OutputStream* sink_; + std::shared_ptr<Schema> schema_; + int64_t position_; + bool started_; +}; + +class ARROW_EXPORT StreamReader { + public: + ~StreamReader(); + + // Open an stream. 
+ static Status Open(const std::shared_ptr<io::InputStream>& stream, + std::shared_ptr<StreamReader>* reader); + + std::shared_ptr<Schema> schema() const; + + // Returned batch is nullptr when end of stream reached + Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch); + + private: + explicit StreamReader(const std::shared_ptr<io::InputStream>& stream); + + Status ReadSchema(); + + Status ReadNextMessage(std::shared_ptr<Message>* message); + + std::shared_ptr<io::InputStream> stream_; + std::shared_ptr<Schema> schema_; +}; + +} // namespace ipc +} // namespace arrow + +#endif // ARROW_IPC_STREAM_H http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/test-common.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 3faeebf..ca790de 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -36,6 +36,15 @@ namespace arrow { namespace ipc { +static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) { + if (!lhs.Equals(rhs)) { + std::stringstream ss; + ss << "left schema: " << lhs.ToString() << std::endl + << "right schema: " << rhs.ToString() << std::endl; + FAIL() << ss.str(); + } +} + const auto kListInt32 = list(int32()); const auto kListListInt32 = list(kListInt32); http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/pyarrow/includes/libarrow_ipc.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd index b3185b1..8295760 100644 --- a/python/pyarrow/includes/libarrow_ipc.pxd +++ b/python/pyarrow/includes/libarrow_ipc.pxd @@ -29,8 +29,7 @@ cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil: CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, shared_ptr[CFileWriter]* out) - CStatus WriteRecordBatch(const 
vector[shared_ptr[CArray]]& columns, - int32_t num_rows) + CStatus WriteRecordBatch(const CRecordBatch& batch) CStatus Close() http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/pyarrow/ipc.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx index abc5e1b..22069a7 100644 --- a/python/pyarrow/ipc.pyx +++ b/python/pyarrow/ipc.pyx @@ -21,6 +21,8 @@ # distutils: language = c++ # cython: embedsignature = True +from cython.operator cimport dereference as deref + from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_io cimport * from pyarrow.includes.libarrow_ipc cimport * @@ -58,10 +60,9 @@ cdef class ArrowFileWriter: self.close() def write_record_batch(self, RecordBatch batch): - cdef CRecordBatch* bptr = batch.batch with nogil: check_status(self.writer.get() - .WriteRecordBatch(bptr.columns(), bptr.num_rows())) + .WriteRecordBatch(deref(batch.batch))) def close(self): with nogil: http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/src/pyarrow/adapters/pandas.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 6623e23..feafa3d 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -254,16 +254,16 @@ struct arrow_traits<Type::BOOL> { static constexpr bool is_numeric_nullable = false; }; -#define INT_DECL(TYPE) \ - template <> \ - struct arrow_traits<Type::TYPE> { \ - static constexpr int npy_type = NPY_##TYPE; \ - static constexpr bool supports_nulls = false; \ - static constexpr double na_value = NAN; \ - static constexpr bool is_boolean = false; \ - static constexpr bool is_numeric_not_nullable = true; \ - static constexpr bool is_numeric_nullable = false; \ - typedef typename npy_traits<NPY_##TYPE>::value_type T; \ +#define INT_DECL(TYPE) \ + template <> \ + struct 
arrow_traits<Type::TYPE> { \ + static constexpr int npy_type = NPY_##TYPE; \ + static constexpr bool supports_nulls = false; \ + static constexpr double na_value = NAN; \ + static constexpr bool is_boolean = false; \ + static constexpr bool is_numeric_not_nullable = true; \ + static constexpr bool is_numeric_nullable = false; \ + typedef typename npy_traits<NPY_##TYPE>::value_type T; \ }; INT_DECL(INT8); @@ -1803,7 +1803,7 @@ class ArrowDeserializer { // types Status Convert(PyObject** out) { -#define CONVERT_CASE(TYPE) \ +#define CONVERT_CASE(TYPE) \ case Type::TYPE: { \ RETURN_NOT_OK(ConvertValues<Type::TYPE>()); \ } break; @@ -1857,8 +1857,7 @@ class ArrowDeserializer { } template <int TYPE> - inline typename std::enable_if<TYPE == Type::DATE, Status>::type - ConvertValues() { + inline typename std::enable_if<TYPE == Type::DATE, Status>::type ConvertValues() { typedef typename arrow_traits<TYPE>::T T; RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type)); @@ -1910,24 +1909,21 @@ class ArrowDeserializer { // UTF8 strings template <int TYPE> - inline typename std::enable_if<TYPE == Type::STRING, Status>::type - ConvertValues() { + inline typename std::enable_if<TYPE == Type::STRING, Status>::type ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); return ConvertBinaryLike<arrow::StringArray>(data_, out_values); } template <int T2> - inline typename std::enable_if<T2 == Type::BINARY, Status>::type - ConvertValues() { + inline typename std::enable_if<T2 == Type::BINARY, Status>::type ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); return ConvertBinaryLike<arrow::BinaryArray>(data_, out_values); } template <int TYPE> - inline typename std::enable_if<TYPE == Type::DICTIONARY, Status>::type - ConvertValues() { + inline typename std::enable_if<TYPE == Type::DICTIONARY, Status>::type ConvertValues() { 
std::shared_ptr<PandasBlock> block; RETURN_NOT_OK(MakeCategoricalBlock(col_->type(), col_->length(), &block)); RETURN_NOT_OK(block->Write(col_, 0, 0));