Repository: arrow Updated Branches: refs/heads/master 86154f0be -> e44ede87c
ARROW-1343: [Java] Aligning serialized schema, end of buffers in RecordBatches Author: Emilio Lahr-Vivaz <elahrvi...@ccri.com> Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #954 from elahrvivaz/align_end and squashes the following commits: 79ac1204 [Wes McKinney] Revert to NDEBUG because it's a standard define in release builds in MSVC ae6bc9f2 [Wes McKinney] Use __declspec(noreturn) in MSVC. Not sure why this suddenly showed up 74b29ccf [Wes McKinney] Add notes to IPC.md to make alignment contract more clear e2f0114b [Wes McKinney] Add C++ DCHECKs on read path for aligned buffers, aligned file block offset, lengths 3d64c9f5 [Wes McKinney] Align stream schema message in C++, DCHECKs for FileBlocks 4778ee1f [Emilio Lahr-Vivaz] adding padding to magic bytes in file format 53429159 [Emilio Lahr-Vivaz] using asserts instead of padding checks, adding padding to ArrowRecordBatch.calculateBodySize, moving align to writeBufferBatches a12b4ff8 [Emilio Lahr-Vivaz] comments 0b32265b [Emilio Lahr-Vivaz] aligning schema write 26bbc255 [Emilio Lahr-Vivaz] Merge branch 'ARROW-1340' into align_end a307779e [Emilio Lahr-Vivaz] ARROW-1340: [Java] Fix NullableMapVector field metadata b2bf86d4 [Emilio Lahr-Vivaz] WIP for aligning end of buffers Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e44ede87 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e44ede87 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e44ede87 Branch: refs/heads/master Commit: e44ede87c069087e11b4f57682090e01ae06a746 Parents: 86154f0 Author: Emilio Lahr-Vivaz <elahrvi...@ccri.com> Authored: Wed Aug 9 09:36:18 2017 -0400 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Wed Aug 9 09:36:18 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/ipc/metadata.cc | 15 ++++++++ cpp/src/arrow/ipc/reader.cc | 13 +++++++ cpp/src/arrow/ipc/util.h | 4 ++- cpp/src/arrow/ipc/writer.cc | 37 +++++++++++--------- cpp/src/arrow/util/bit-util.h | 18 +++++++--- cpp/src/arrow/util/logging.h | 6 ++-- cpp/src/arrow/util/macros.h | 6 ++++ format/IPC.md | 10 +++++- .../arrow/vector/file/ArrowFileWriter.java | 4 +-- .../apache/arrow/vector/file/ArrowMagic.java | 5 ++- .../arrow/vector/schema/ArrowRecordBatch.java | 3 ++ .../arrow/vector/stream/MessageSerializer.java | 34 +++++++++++++----- 12 files changed, 118 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index faf01a5..c953421 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -36,6 +36,7 @@ #include "arrow/status.h" #include "arrow/tensor.h" #include "arrow/type.h" +#include "arrow/util/logging.h" namespace arrow { @@ -773,6 +774,20 @@ Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dicti flatbuffers::Offset<flatbuf::Schema> fb_schema; RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); +#ifndef NDEBUG + for (size_t i = 0; i < dictionaries.size(); ++i) { + DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].offset)) << i; + DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].metadata_length)) << i; + DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].body_length)) << i; + } + + for (size_t i = 0; i < record_batches.size(); ++i) { + DCHECK(BitUtil::IsMultipleOf8(record_batches[i].offset)) << i; + DCHECK(BitUtil::IsMultipleOf8(record_batches[i].metadata_length)) << i; + DCHECK(BitUtil::IsMultipleOf8(record_batches[i].body_length)) << i; + } +#endif + auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/ipc/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 8ae8280..6ea907e 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -34,6 +34,7 @@ #include "arrow/table.h" #include "arrow/tensor.h" #include "arrow/type.h" +#include "arrow/util/bit-util.h" #include "arrow/util/logging.h" #include "arrow/visitor_inline.h" @@ -59,6 +60,9 @@ class IpcComponentSource { *out = nullptr; return Status::OK(); } else { + DCHECK(BitUtil::IsMultipleOf8(buffer->offset())) + << "Buffer " << buffer_index + << " did not start on 8-byte aligned offset: " << buffer->offset(); return file_->ReadAt(buffer->offset(), buffer->length(), out); } } @@ -550,6 +554,10 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { DCHECK_LT(i, num_record_batches()); FileBlock block = record_batch(i); + DCHECK(BitUtil::IsMultipleOf8(block.offset)); + DCHECK(BitUtil::IsMultipleOf8(block.metadata_length)); + DCHECK(BitUtil::IsMultipleOf8(block.body_length)); + std::unique_ptr<Message> message; RETURN_NOT_OK( ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); @@ -564,6 +572,11 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { // Read all the dictionaries for (int i = 0; i < num_dictionaries(); ++i) { FileBlock block = dictionary(i); + + DCHECK(BitUtil::IsMultipleOf8(block.offset)); + DCHECK(BitUtil::IsMultipleOf8(block.metadata_length)); + DCHECK(BitUtil::IsMultipleOf8(block.body_length)); + std::unique_ptr<Message> message; RETURN_NOT_OK( ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/ipc/util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h index 49a7d01..412f312 100644 --- a/cpp/src/arrow/ipc/util.h +++ b/cpp/src/arrow/ipc/util.h @@ -27,10 +27,12 @@ namespace arrow { namespace ipc { -// Align on 8-byte boundaries // Buffers are padded to 64-byte boundaries (for SIMD) static constexpr int kArrowAlignment = 64; +// Align on 8-byte boundaries in IPC +static constexpr int kArrowIpcAlignment = 8; + static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0}; static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) { http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 163b27b..bc07dc6 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -156,7 +156,7 @@ class RecordBatchSerializer : public ArrayVisitor { // The buffer might be null if we are handling zero row lengths. if (buffer) { size = buffer->size(); - padding = BitUtil::RoundUpToMultipleOf64(size) - size; + padding = BitUtil::RoundUpToMultipleOf8(size) - size; } // TODO(wesm): We currently have no notion of shared memory page id's, @@ -172,7 +172,7 @@ class RecordBatchSerializer : public ArrayVisitor { } *body_length = offset - buffer_start_offset_; - DCHECK(BitUtil::IsMultipleOf64(*body_length)); + DCHECK(BitUtil::IsMultipleOf8(*body_length)); return Status::OK(); } @@ -216,7 +216,7 @@ class RecordBatchSerializer : public ArrayVisitor { // The buffer might be null if we are handling zero row lengths. if (buffer) { size = buffer->size(); - padding = BitUtil::RoundUpToMultipleOf64(size) - size; + padding = BitUtil::RoundUpToMultipleOf8(size) - size; } if (size > 0) { @@ -251,7 +251,7 @@ class RecordBatchSerializer : public ArrayVisitor { // Send padding if it's available const int64_t buffer_length = - std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width), + std::min(BitUtil::RoundUpToMultipleOf8(array.length() * type_width), data->size() - byte_offset); data = SliceBuffer(data, byte_offset, buffer_length); } @@ -618,15 +618,7 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { } virtual Status Start() { - std::shared_ptr<Buffer> schema_fb; - RETURN_NOT_OK(WriteSchemaMessage(*schema_, &dictionary_memo_, &schema_fb)); - - int32_t flatbuffer_size = static_cast<int32_t>(schema_fb->size()); - RETURN_NOT_OK( - Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t))); - - // Write the flatbuffer - RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size)); + RETURN_NOT_OK(WriteSchema()); // If there are any dictionaries, write them as the next messages RETURN_NOT_OK(WriteDictionaries()); @@ -635,6 +627,17 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { return Status::OK(); } + Status WriteSchema() { + std::shared_ptr<Buffer> schema_fb; + RETURN_NOT_OK(WriteSchemaMessage(*schema_, &dictionary_memo_, &schema_fb)); + + int32_t metadata_length = 0; + RETURN_NOT_OK(WriteMessage(*schema_fb, sink_, &metadata_length)); + RETURN_NOT_OK(UpdatePosition()); + DCHECK_EQ(0, position_ % 8) << "WriteSchema did not perform an aligned write"; + return Status::OK(); + } + virtual Status Close() { // Write the schema if not already written // User is responsible for closing the OutputStream @@ -701,9 +704,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { &record_batches_[record_batches_.size() - 1]); } - // Adds padding bytes if necessary to ensure all memory blocks are written on - // 64-byte (or other alignment) boundaries. - Status Align(int64_t alignment = kArrowAlignment) { + Status Align(int64_t alignment = kArrowIpcAlignment) { + // Adds padding bytes if necessary to ensure all memory blocks are written on + // 8-byte (or other alignment) boundaries. int64_t remainder = PaddedLength(position_, alignment) - position_; if (remainder > 0) { return Write(kPaddingBytes, remainder); @@ -774,7 +777,7 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl // It is only necessary to align to 8-byte boundary at the start of the file RETURN_NOT_OK(Write(reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes))); - RETURN_NOT_OK(Align(8)); + RETURN_NOT_OK(Align()); // We write the schema at the start of the file (and the end). This also // writes all the dictionaries at the beginning of the file http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/util/bit-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h index fc360ba..5c3938a 100644 --- a/cpp/src/arrow/util/bit-util.h +++ b/cpp/src/arrow/util/bit-util.h @@ -217,13 +217,13 @@ static inline uint32_t RoundUpNumi64(uint32_t bits) { return (bits + 63) >> 6; } /// Returns the rounded down to 64 multiple. static inline uint32_t RoundDownNumi64(uint32_t bits) { return bits >> 6; } -static inline int64_t RoundUpToMultipleOf64(int64_t num) { +template <int64_t ROUND_TO> +static inline int64_t RoundToPowerOfTwo(int64_t num) { // TODO(wesm): is this definitely needed? // DCHECK_GE(num, 0); - constexpr int64_t round_to = 64; - constexpr int64_t force_carry_addend = round_to - 1; - constexpr int64_t truncate_bitmask = ~(round_to - 1); - constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to; + constexpr int64_t force_carry_addend = ROUND_TO - 1; + constexpr int64_t truncate_bitmask = ~(ROUND_TO - 1); + constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - ROUND_TO; if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; } @@ -231,6 +231,14 @@ static inline int64_t RoundUpToMultipleOf64(int64_t num) { return num; } +static inline int64_t RoundUpToMultipleOf64(int64_t num) { + return RoundToPowerOfTwo<64>(num); +} + +static inline int64_t RoundUpToMultipleOf8(int64_t num) { + return RoundToPowerOfTwo<8>(num); +} + /// Non hw accelerated pop count. /// TODO: we don't use this in any perf sensitive code paths currently. There /// might be a much faster way to implement this. http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/util/logging.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index 89e69f9..998f7ed 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -21,6 +21,8 @@ #include <cstdlib> #include <iostream> +#include "arrow/util/macros.h" + namespace arrow { // Stubbed versions of macros defined in glog/logging.h, intended for @@ -127,9 +129,9 @@ class CerrLog { class FatalLog : public CerrLog { public: explicit FatalLog(int /* severity */) // NOLINT - : CerrLog(ARROW_FATAL){} // NOLINT + : CerrLog(ARROW_FATAL) {} // NOLINT - [[noreturn]] ~FatalLog() { + ARROW_NORETURN ~FatalLog() { if (has_logged_) { std::cerr << std::endl; } http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/util/macros.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h index a2f704f..fe2d768 100644 --- a/cpp/src/arrow/util/macros.h +++ b/cpp/src/arrow/util/macros.h @@ -36,7 +36,13 @@ #if defined(__GNUC__) #define ARROW_PREDICT_FALSE(x) (__builtin_expect(x, 0)) #define ARROW_PREDICT_TRUE(x) (__builtin_expect(!!(x), 1)) +#define ARROW_NORETURN __attribute__((noreturn)) +#elif defined(_MSC_VER) +#define ARROW_NORETURN __declspec(noreturn) +#define ARROW_PREDICT_FALSE(x) x +#define ARROW_PREDICT_TRUE(x) x #else +#define ARROW_NORETURN #define ARROW_PREDICT_FALSE(x) x #define ARROW_PREDICT_TRUE(x) x #endif http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/format/IPC.md ---------------------------------------------------------------------- diff --git a/format/IPC.md b/format/IPC.md index 3fd234e..2f79031 100644 --- a/format/IPC.md +++ b/format/IPC.md @@ -27,7 +27,7 @@ Data components in the stream and file formats are represented as encapsulated * A length prefix indicating the metadata size * The message metadata as a [Flatbuffer][3] * Padding bytes to an 8-byte boundary -* The message body +* The message body, which must be a multiple of 8 bytes Schematically, we have: @@ -38,6 +38,10 @@ Schematically, we have: <message body> ``` +The complete serialized message must be a multiple of 8 bytes so that messages +can be relocated between streams. Otherwise the amount of padding between the +metadata and the message body could be non-deterministic. + The `metadata_size` includes the size of the flatbuffer plus padding. The `Message` flatbuffer includes a version number, the particular message (as a flatbuffer union), and the size of the message body: @@ -154,6 +158,10 @@ struct Block { } ``` +The `metaDataLength` here includes the metadata length prefix, serialized +metadata, and any additional padding bytes, and by construction must be a +multiple of 8 bytes. + Some notes about this * The `Block` offset indicates the starting byte of the record batch. http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java index 06519bc..1d92d2b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java @@ -38,7 +38,7 @@ public class ArrowFileWriter extends ArrowWriter { @Override protected void startInternal(WriteChannel out) throws IOException { - ArrowMagic.writeMagic(out); + ArrowMagic.writeMagic(out, true); } @Override @@ -54,7 +54,7 @@ public class ArrowFileWriter extends ArrowWriter { } out.writeIntLittleEndian(footerLength); LOGGER.debug(String.format("Footer starts at %d, length: %d", footerStart, footerLength)); - ArrowMagic.writeMagic(out); + ArrowMagic.writeMagic(out, false); LOGGER.debug(String.format("magic written, now at %d", out.getCurrentPosition())); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java index 0d2da37..68313e7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java @@ -28,8 +28,11 @@ public class ArrowMagic { public static final int MAGIC_LENGTH = MAGIC.length; - public static void writeMagic(WriteChannel out) throws IOException { + public static void writeMagic(WriteChannel out, boolean align) throws IOException { out.write(MAGIC); + if (align) { + out.align(); + } } public static boolean validateMagic(byte[] array) { http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java index d2f3782..c842d4c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java @@ -168,6 +168,9 @@ public class ArrowRecordBatch implements ArrowMessage { ByteBuffer nioBuffer = buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes()); size += nioBuffer.remaining(); + if (size % 8 != 0) { + size += 8 - (size % 8); + } } return size; } http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index a70d029..f69aa41 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -78,12 +78,25 @@ public class MessageSerializer { * @throws IOException if something went wrong */ public static long serialize(WriteChannel out, Schema schema) throws IOException { + long start = out.getCurrentPosition(); + assert start % 8 == 0; + FlatBufferBuilder builder = new FlatBufferBuilder(); int schemaOffset = schema.getSchema(builder); ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.Schema, schemaOffset, 0); - long size = out.writeIntLittleEndian(serializedMessage.remaining()); - size += out.write(serializedMessage); - return size; + + int size = serializedMessage.remaining(); + // ensure that message aligns to 8 byte padding - 4 bytes for size, then message body + if ((size + 4) % 8 != 0) { + size += 8 - (size + 4) % 8; + } + + out.writeIntLittleEndian(size); + out.write(serializedMessage); + out.align(); // any bytes written are already captured by our size modification above + + assert (size + 4) % 8 == 0; + return size + 4; } /** @@ -120,6 +133,7 @@ public class MessageSerializer { long start = out.getCurrentPosition(); int bodyLength = batch.computeBodyLength(); + assert bodyLength % 8 == 0; FlatBufferBuilder builder = new FlatBufferBuilder(); int batchOffset = batch.writeTo(builder); @@ -141,6 +155,7 @@ public class MessageSerializer { out.align(); long bufferLength = writeBatchBuffers(out, batch); + assert bufferLength % 8 == 0; // Metadata size in the Block account for the size prefix return new ArrowBlock(start, metadataLength + 4, bufferLength); @@ -164,6 +179,7 @@ public class MessageSerializer { " != " + startPosition + layout.getSize()); } } + out.align(); return out.getCurrentPosition() - bufferStart; } @@ -268,6 +284,7 @@ public class MessageSerializer { public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException { long start = out.getCurrentPosition(); int bodyLength = batch.computeBodyLength(); + assert bodyLength % 8 == 0; FlatBufferBuilder builder = new FlatBufferBuilder(); int batchOffset = batch.writeTo(builder); @@ -276,10 +293,10 @@ public class MessageSerializer { int metadataLength = serializedMessage.remaining(); - // Add extra padding bytes so that length prefix + metadata is a multiple - // of 8 after alignment - if ((start + metadataLength + 4) % 8 != 0) { - metadataLength += 8 - (start + metadataLength + 4) % 8; + // calculate alignment bytes so that metadata length points to the correct location after alignment + int padding = (int) ((start + metadataLength + 4) % 8); + if (padding != 0) { + metadataLength += (8 - padding); } out.writeIntLittleEndian(metadataLength); @@ -290,9 +307,10 @@ public class MessageSerializer { // write the embedded record batch long bufferLength = writeBatchBuffers(out, batch.getDictionary()); + assert bufferLength % 8 == 0; // Metadata size in the Block account for the size prefix - return new ArrowBlock(start, metadataLength + 4, bufferLength + 8); + return new ArrowBlock(start, metadataLength + 4, bufferLength); } /**