Repository: arrow
Updated Branches:
  refs/heads/master 86154f0be -> e44ede87c


ARROW-1343: [Java] Aligning serialized schema, end of buffers in RecordBatches

Author: Emilio Lahr-Vivaz <elahrvi...@ccri.com>
Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #954 from elahrvivaz/align_end and squashes the following commits:

79ac1204 [Wes McKinney] Revert to NDEBUG because it's a standard define in 
release builds in MSVC
ae6bc9f2 [Wes McKinney] Use __declspec(noreturn) in MSVC. Not sure why this 
suddenly showed up
74b29ccf [Wes McKinney] Add notes to IPC.md to make alignment contract more 
clear
e2f0114b [Wes McKinney] Add C++ DCHECKs on read path for aligned buffers, 
aligned file block offset, lengths
3d64c9f5 [Wes McKinney] Align stream schema message in C++, DCHECKs for 
FileBlocks
4778ee1f [Emilio Lahr-Vivaz] adding padding to magic bytes in file format
53429159 [Emilio Lahr-Vivaz] using asserts instead of padding checks, adding 
padding to ArrowRecordBatch.calculateBodySize, moving align to 
writeBufferBatches
a12b4ff8 [Emilio Lahr-Vivaz] comments
0b32265b [Emilio Lahr-Vivaz] aligning schema write
26bbc255 [Emilio Lahr-Vivaz] Merge branch 'ARROW-1340' into align_end
a307779e [Emilio Lahr-Vivaz] ARROW-1340: [Java] Fix NullableMapVector field 
metadata
b2bf86d4 [Emilio Lahr-Vivaz] WIP for aligning end of buffers


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e44ede87
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e44ede87
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e44ede87

Branch: refs/heads/master
Commit: e44ede87c069087e11b4f57682090e01ae06a746
Parents: 86154f0
Author: Emilio Lahr-Vivaz <elahrvi...@ccri.com>
Authored: Wed Aug 9 09:36:18 2017 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Wed Aug 9 09:36:18 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/ipc/metadata.cc                   | 15 ++++++++
 cpp/src/arrow/ipc/reader.cc                     | 13 +++++++
 cpp/src/arrow/ipc/util.h                        |  4 ++-
 cpp/src/arrow/ipc/writer.cc                     | 37 +++++++++++---------
 cpp/src/arrow/util/bit-util.h                   | 18 +++++++---
 cpp/src/arrow/util/logging.h                    |  6 ++--
 cpp/src/arrow/util/macros.h                     |  6 ++++
 format/IPC.md                                   | 10 +++++-
 .../arrow/vector/file/ArrowFileWriter.java      |  4 +--
 .../apache/arrow/vector/file/ArrowMagic.java    |  5 ++-
 .../arrow/vector/schema/ArrowRecordBatch.java   |  3 ++
 .../arrow/vector/stream/MessageSerializer.java  | 34 +++++++++++++-----
 12 files changed, 118 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index faf01a5..c953421 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -36,6 +36,7 @@
 #include "arrow/status.h"
 #include "arrow/tensor.h"
 #include "arrow/type.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
@@ -773,6 +774,20 @@ Status WriteFileFooter(const Schema& schema, const 
std::vector<FileBlock>& dicti
   flatbuffers::Offset<flatbuf::Schema> fb_schema;
   RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
 
+#ifndef NDEBUG
+  for (size_t i = 0; i < dictionaries.size(); ++i) {
+    DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].offset)) << i;
+    DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].metadata_length)) << i;
+    DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].body_length)) << i;
+  }
+
+  for (size_t i = 0; i < record_batches.size(); ++i) {
+    DCHECK(BitUtil::IsMultipleOf8(record_batches[i].offset)) << i;
+    DCHECK(BitUtil::IsMultipleOf8(record_batches[i].metadata_length)) << i;
+    DCHECK(BitUtil::IsMultipleOf8(record_batches[i].body_length)) << i;
+  }
+#endif
+
   auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
   auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 8ae8280..6ea907e 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -34,6 +34,7 @@
 #include "arrow/table.h"
 #include "arrow/tensor.h"
 #include "arrow/type.h"
+#include "arrow/util/bit-util.h"
 #include "arrow/util/logging.h"
 #include "arrow/visitor_inline.h"
 
@@ -59,6 +60,9 @@ class IpcComponentSource {
       *out = nullptr;
       return Status::OK();
     } else {
+      DCHECK(BitUtil::IsMultipleOf8(buffer->offset()))
+          << "Buffer " << buffer_index
+          << " did not start on 8-byte aligned offset: " << buffer->offset();
       return file_->ReadAt(buffer->offset(), buffer->length(), out);
     }
   }
@@ -550,6 +554,10 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
     DCHECK_LT(i, num_record_batches());
     FileBlock block = record_batch(i);
 
+    DCHECK(BitUtil::IsMultipleOf8(block.offset));
+    DCHECK(BitUtil::IsMultipleOf8(block.metadata_length));
+    DCHECK(BitUtil::IsMultipleOf8(block.body_length));
+
     std::unique_ptr<Message> message;
     RETURN_NOT_OK(
         ReadMessage(block.offset, block.metadata_length, file_.get(), 
&message));
@@ -564,6 +572,11 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
     // Read all the dictionaries
     for (int i = 0; i < num_dictionaries(); ++i) {
       FileBlock block = dictionary(i);
+
+      DCHECK(BitUtil::IsMultipleOf8(block.offset));
+      DCHECK(BitUtil::IsMultipleOf8(block.metadata_length));
+      DCHECK(BitUtil::IsMultipleOf8(block.body_length));
+
       std::unique_ptr<Message> message;
       RETURN_NOT_OK(
           ReadMessage(block.offset, block.metadata_length, file_.get(), 
&message));

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/ipc/util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 49a7d01..412f312 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -27,10 +27,12 @@
 namespace arrow {
 namespace ipc {
 
-// Align on 8-byte boundaries
 // Buffers are padded to 64-byte boundaries (for SIMD)
 static constexpr int kArrowAlignment = 64;
 
+// Align on 8-byte boundaries in IPC
+static constexpr int kArrowIpcAlignment = 8;
+
 static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
 
 static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = 
kArrowAlignment) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 163b27b..bc07dc6 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -156,7 +156,7 @@ class RecordBatchSerializer : public ArrayVisitor {
       // The buffer might be null if we are handling zero row lengths.
       if (buffer) {
         size = buffer->size();
-        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+        padding = BitUtil::RoundUpToMultipleOf8(size) - size;
       }
 
       // TODO(wesm): We currently have no notion of shared memory page id's,
@@ -172,7 +172,7 @@ class RecordBatchSerializer : public ArrayVisitor {
     }
 
     *body_length = offset - buffer_start_offset_;
-    DCHECK(BitUtil::IsMultipleOf64(*body_length));
+    DCHECK(BitUtil::IsMultipleOf8(*body_length));
 
     return Status::OK();
   }
@@ -216,7 +216,7 @@ class RecordBatchSerializer : public ArrayVisitor {
       // The buffer might be null if we are handling zero row lengths.
       if (buffer) {
         size = buffer->size();
-        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+        padding = BitUtil::RoundUpToMultipleOf8(size) - size;
       }
 
       if (size > 0) {
@@ -251,7 +251,7 @@ class RecordBatchSerializer : public ArrayVisitor {
 
       // Send padding if it's available
       const int64_t buffer_length =
-          std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
+          std::min(BitUtil::RoundUpToMultipleOf8(array.length() * type_width),
                    data->size() - byte_offset);
       data = SliceBuffer(data, byte_offset, buffer_length);
     }
@@ -618,15 +618,7 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl 
{
   }
 
   virtual Status Start() {
-    std::shared_ptr<Buffer> schema_fb;
-    RETURN_NOT_OK(WriteSchemaMessage(*schema_, &dictionary_memo_, &schema_fb));
-
-    int32_t flatbuffer_size = static_cast<int32_t>(schema_fb->size());
-    RETURN_NOT_OK(
-        Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), 
sizeof(int32_t)));
-
-    // Write the flatbuffer
-    RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size));
+    RETURN_NOT_OK(WriteSchema());
 
     // If there are any dictionaries, write them as the next messages
     RETURN_NOT_OK(WriteDictionaries());
@@ -635,6 +627,17 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl 
{
     return Status::OK();
   }
 
+  Status WriteSchema() {
+    std::shared_ptr<Buffer> schema_fb;
+    RETURN_NOT_OK(WriteSchemaMessage(*schema_, &dictionary_memo_, &schema_fb));
+
+    int32_t metadata_length = 0;
+    RETURN_NOT_OK(WriteMessage(*schema_fb, sink_, &metadata_length));
+    RETURN_NOT_OK(UpdatePosition());
+    DCHECK_EQ(0, position_ % 8) << "WriteSchema did not perform an aligned 
write";
+    return Status::OK();
+  }
+
   virtual Status Close() {
     // Write the schema if not already written
     // User is responsible for closing the OutputStream
@@ -701,9 +704,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
                             &record_batches_[record_batches_.size() - 1]);
   }
 
-  // Adds padding bytes if necessary to ensure all memory blocks are written on
-  // 64-byte (or other alignment) boundaries.
-  Status Align(int64_t alignment = kArrowAlignment) {
+  Status Align(int64_t alignment = kArrowIpcAlignment) {
+    // Adds padding bytes if necessary to ensure all memory blocks are written 
on
+    // 8-byte (or other alignment) boundaries.
     int64_t remainder = PaddedLength(position_, alignment) - position_;
     if (remainder > 0) {
       return Write(kPaddingBytes, remainder);
@@ -774,7 +777,7 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl
     // It is only necessary to align to 8-byte boundary at the start of the 
file
     RETURN_NOT_OK(Write(reinterpret_cast<const uint8_t*>(kArrowMagicBytes),
                         strlen(kArrowMagicBytes)));
-    RETURN_NOT_OK(Align(8));
+    RETURN_NOT_OK(Align());
 
     // We write the schema at the start of the file (and the end). This also
     // writes all the dictionaries at the beginning of the file

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index fc360ba..5c3938a 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -217,13 +217,13 @@ static inline uint32_t RoundUpNumi64(uint32_t bits) { 
return (bits + 63) >> 6; }
 /// Returns the rounded down to 64 multiple.
 static inline uint32_t RoundDownNumi64(uint32_t bits) { return bits >> 6; }
 
-static inline int64_t RoundUpToMultipleOf64(int64_t num) {
+template <int64_t ROUND_TO>
+static inline int64_t RoundToPowerOfTwo(int64_t num) {
   // TODO(wesm): is this definitely needed?
   // DCHECK_GE(num, 0);
-  constexpr int64_t round_to = 64;
-  constexpr int64_t force_carry_addend = round_to - 1;
-  constexpr int64_t truncate_bitmask = ~(round_to - 1);
-  constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - 
round_to;
+  constexpr int64_t force_carry_addend = ROUND_TO - 1;
+  constexpr int64_t truncate_bitmask = ~(ROUND_TO - 1);
+  constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - 
ROUND_TO;
   if (num <= max_roundable_num) {
     return (num + force_carry_addend) & truncate_bitmask;
   }
@@ -231,6 +231,14 @@ static inline int64_t RoundUpToMultipleOf64(int64_t num) {
   return num;
 }
 
+static inline int64_t RoundUpToMultipleOf64(int64_t num) {
+  return RoundToPowerOfTwo<64>(num);
+}
+
+static inline int64_t RoundUpToMultipleOf8(int64_t num) {
+  return RoundToPowerOfTwo<8>(num);
+}
+
 /// Non hw accelerated pop count.
 /// TODO: we don't use this in any perf sensitive code paths currently.  There
 /// might be a much faster way to implement this.

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index 89e69f9..998f7ed 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -21,6 +21,8 @@
 #include <cstdlib>
 #include <iostream>
 
+#include "arrow/util/macros.h"
+
 namespace arrow {
 
 // Stubbed versions of macros defined in glog/logging.h, intended for
@@ -127,9 +129,9 @@ class CerrLog {
 class FatalLog : public CerrLog {
  public:
   explicit FatalLog(int /* severity */)  // NOLINT
-      : CerrLog(ARROW_FATAL){}           // NOLINT
+      : CerrLog(ARROW_FATAL) {}          // NOLINT
 
-            [[noreturn]] ~FatalLog() {
+  ARROW_NORETURN ~FatalLog() {
     if (has_logged_) {
       std::cerr << std::endl;
     }

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/cpp/src/arrow/util/macros.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h
index a2f704f..fe2d768 100644
--- a/cpp/src/arrow/util/macros.h
+++ b/cpp/src/arrow/util/macros.h
@@ -36,7 +36,13 @@
 #if defined(__GNUC__)
 #define ARROW_PREDICT_FALSE(x) (__builtin_expect(x, 0))
 #define ARROW_PREDICT_TRUE(x) (__builtin_expect(!!(x), 1))
+#define ARROW_NORETURN __attribute__((noreturn))
+#elif defined(_MSC_VER)
+#define ARROW_NORETURN __declspec(noreturn)
+#define ARROW_PREDICT_FALSE(x) x
+#define ARROW_PREDICT_TRUE(x) x
 #else
+#define ARROW_NORETURN
 #define ARROW_PREDICT_FALSE(x) x
 #define ARROW_PREDICT_TRUE(x) x
 #endif

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/format/IPC.md
----------------------------------------------------------------------
diff --git a/format/IPC.md b/format/IPC.md
index 3fd234e..2f79031 100644
--- a/format/IPC.md
+++ b/format/IPC.md
@@ -27,7 +27,7 @@ Data components in the stream and file formats are 
represented as encapsulated
 * A length prefix indicating the metadata size
 * The message metadata as a [Flatbuffer][3]
 * Padding bytes to an 8-byte boundary
-* The message body
+* The message body, which must be a multiple of 8 bytes
 
 Schematically, we have:
 
@@ -38,6 +38,10 @@ Schematically, we have:
 <message body>
 ```
 
+The complete serialized message must be a multiple of 8 bytes so that messages
+can be relocated between streams. Otherwise the amount of padding between the
+metadata and the message body could be non-deterministic.
+
 The `metadata_size` includes the size of the flatbuffer plus padding. The
 `Message` flatbuffer includes a version number, the particular message (as a
 flatbuffer union), and the size of the message body:
@@ -154,6 +158,10 @@ struct Block {
 }
 ```
 
+The `metaDataLength` here includes the metadata length prefix, serialized
+metadata, and any additional padding bytes, and by construction must be a
+multiple of 8 bytes.
+
 Some notes about this
 
 * The `Block` offset indicates the starting byte of the record batch.

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java 
b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
index 06519bc..1d92d2b 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
@@ -38,7 +38,7 @@ public class ArrowFileWriter extends ArrowWriter {
 
   @Override
   protected void startInternal(WriteChannel out) throws IOException {
-    ArrowMagic.writeMagic(out);
+    ArrowMagic.writeMagic(out, true);
   }
 
   @Override
@@ -54,7 +54,7 @@ public class ArrowFileWriter extends ArrowWriter {
     }
     out.writeIntLittleEndian(footerLength);
     LOGGER.debug(String.format("Footer starts at %d, length: %d", footerStart, 
footerLength));
-    ArrowMagic.writeMagic(out);
+    ArrowMagic.writeMagic(out, false);
     LOGGER.debug(String.format("magic written, now at %d", 
out.getCurrentPosition()));
   }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java 
b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
index 0d2da37..68313e7 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
@@ -28,8 +28,11 @@ public class ArrowMagic {
 
   public static final int MAGIC_LENGTH = MAGIC.length;
 
-  public static void writeMagic(WriteChannel out) throws IOException {
+  public static void writeMagic(WriteChannel out, boolean align) throws 
IOException {
     out.write(MAGIC);
+    if (align) {
+      out.align();
+    }
   }
 
   public static boolean validateMagic(byte[] array) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
index d2f3782..c842d4c 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
@@ -168,6 +168,9 @@ public class ArrowRecordBatch implements ArrowMessage {
       ByteBuffer nioBuffer =
           buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes());
       size += nioBuffer.remaining();
+      if (size % 8 != 0) {
+        size += 8 - (size % 8);
+      }
     }
     return size;
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/e44ede87/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
index a70d029..f69aa41 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
@@ -78,12 +78,25 @@ public class MessageSerializer {
    * @throws IOException if something went wrong
    */
   public static long serialize(WriteChannel out, Schema schema) throws 
IOException {
+    long start = out.getCurrentPosition();
+    assert start % 8 == 0;
+
     FlatBufferBuilder builder = new FlatBufferBuilder();
     int schemaOffset = schema.getSchema(builder);
     ByteBuffer serializedMessage = serializeMessage(builder, 
MessageHeader.Schema, schemaOffset, 0);
-    long size = out.writeIntLittleEndian(serializedMessage.remaining());
-    size += out.write(serializedMessage);
-    return size;
+
+    int size = serializedMessage.remaining();
+    // ensure that message aligns to 8 byte padding - 4 bytes for size, then 
message body
+    if ((size + 4) % 8 != 0) {
+      size += 8 - (size + 4) % 8;
+    }
+
+    out.writeIntLittleEndian(size);
+    out.write(serializedMessage);
+    out.align(); // any bytes written are already captured by our size 
modification above
+
+    assert (size + 4) % 8 == 0;
+    return size + 4;
   }
 
   /**
@@ -120,6 +133,7 @@ public class MessageSerializer {
 
     long start = out.getCurrentPosition();
     int bodyLength = batch.computeBodyLength();
+    assert bodyLength % 8 == 0;
 
     FlatBufferBuilder builder = new FlatBufferBuilder();
     int batchOffset = batch.writeTo(builder);
@@ -141,6 +155,7 @@ public class MessageSerializer {
     out.align();
 
     long bufferLength = writeBatchBuffers(out, batch);
+    assert bufferLength % 8 == 0;
 
     // Metadata size in the Block account for the size prefix
     return new ArrowBlock(start, metadataLength + 4, bufferLength);
@@ -164,6 +179,7 @@ public class MessageSerializer {
             " != " + startPosition + layout.getSize());
       }
     }
+    out.align();
     return out.getCurrentPosition() - bufferStart;
   }
 
@@ -268,6 +284,7 @@ public class MessageSerializer {
   public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch 
batch) throws IOException {
     long start = out.getCurrentPosition();
     int bodyLength = batch.computeBodyLength();
+    assert bodyLength % 8 == 0;
 
     FlatBufferBuilder builder = new FlatBufferBuilder();
     int batchOffset = batch.writeTo(builder);
@@ -276,10 +293,10 @@ public class MessageSerializer {
 
     int metadataLength = serializedMessage.remaining();
 
-    // Add extra padding bytes so that length prefix + metadata is a multiple
-    // of 8 after alignment
-    if ((start + metadataLength + 4) % 8 != 0) {
-      metadataLength += 8 - (start + metadataLength + 4) % 8;
+    // calculate alignment bytes so that metadata length points to the correct 
location after alignment
+    int padding = (int) ((start + metadataLength + 4) % 8);
+    if (padding != 0) {
+      metadataLength += (8 - padding);
     }
 
     out.writeIntLittleEndian(metadataLength);
@@ -290,9 +307,10 @@ public class MessageSerializer {
 
     // write the embedded record batch
     long bufferLength = writeBatchBuffers(out, batch.getDictionary());
+    assert bufferLength % 8 == 0;
 
     // Metadata size in the Block account for the size prefix
-    return new ArrowBlock(start, metadataLength + 4, bufferLength + 8);
+    return new ArrowBlock(start, metadataLength + 4, bufferLength);
   }
 
   /**

Reply via email to