ARROW-459: [C++] Dictionary IPC support in file and stream formats

Also fixes ARROW-565

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #347 from wesm/ARROW-459 and squashes the following commits:

6a987b7 [Wes McKinney] Fix clang warning with forward declaration
8e0e6fb [Wes McKinney] Fix bug causing valgrind failure
dee044e [Wes McKinney] Review comments
7ac756e [Wes McKinney] Fix Python build
e5cec27 [Wes McKinney] Add some less trivial dictionary-encoded arrays to test case
acfa994 [Wes McKinney] cpplint
ef9dea8 [Wes McKinney] More dictionary support in FileReader. Simple test passes
cb04a41 [Wes McKinney] Refactoring. Remove FileFooter class in favor of private impl in FileReader
1cee0ff [Wes McKinney] More progress toward file/stream roundtrips with dictionaries
ae389fa [Wes McKinney] WIP progress toward stream/file dictionary roundtrip
6858e12 [Wes McKinney] Add union and dictionary to file/stream tests
d537004 [Wes McKinney] Add support for deconstructing and reconstructing DictionaryArray with known schema


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/d28f1c1e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/d28f1c1e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/d28f1c1e

Branch: refs/heads/master
Commit: d28f1c1e0f21bc578b84ab4bed4cf259c333fbc9
Parents: 5e279f0
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Fri Feb 24 09:16:32 2017 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Fri Feb 24 09:16:32 2017 -0500

----------------------------------------------------------------------
 cpp/CMakeLists.txt                       |   4 +-
 cpp/src/arrow/array.h                    |   2 +
 cpp/src/arrow/io/CMakeLists.txt          |   9 +-
 cpp/src/arrow/ipc/CMakeLists.txt         |  15 +-
 cpp/src/arrow/ipc/adapter.cc             | 189 ++++++++++------
 cpp/src/arrow/ipc/adapter.h              |  30 +--
 cpp/src/arrow/ipc/file.cc                | 306 +++++++++++++-------------
 cpp/src/arrow/ipc/file.h                 |  51 +----
 cpp/src/arrow/ipc/ipc-adapter-test.cc    |  46 ++--
 cpp/src/arrow/ipc/ipc-file-test.cc       |  78 +++----
 cpp/src/arrow/ipc/ipc-metadata-test.cc   |  17 +-
 cpp/src/arrow/ipc/metadata-internal.cc   | 239 ++++++++++++++++----
 cpp/src/arrow/ipc/metadata-internal.h    |  45 ++--
 cpp/src/arrow/ipc/metadata.cc            | 185 ++++++++++++++--
 cpp/src/arrow/ipc/metadata.h             |  97 ++++++--
 cpp/src/arrow/ipc/stream.cc              | 207 ++++++++++++-----
 cpp/src/arrow/ipc/stream.h               |  34 ++-
 cpp/src/arrow/ipc/test-common.h          |  80 +++++++
 cpp/src/arrow/type.cc                    |   6 +-
 cpp/src/arrow/type.h                     |  14 +-
 python/pyarrow/includes/libarrow_ipc.pxd |   1 -
 python/pyarrow/io.pyx                    |   5 -
 22 files changed, 1093 insertions(+), 567 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0888a8b..b77f8c7 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -102,7 +102,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
     ON)
 endif()
 
-if(NOT ARROW_BUILD_TESTS)
+if(ARROW_BUILD_TESTS)
+  set(ARROW_BUILD_STATIC ON)
+else()
   set(NO_TESTS 1)
 endif()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 32d156b..9bb06af 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -40,6 +40,8 @@ class Status;
 
 class ArrayVisitor {
  public:
+  virtual ~ArrayVisitor() = default;
+
   virtual Status Visit(const NullArray& array) = 0;
   virtual Status Visit(const BooleanArray& array) = 0;
   virtual Status Visit(const Int8Array& array) = 0;

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index b8882e4..ceb7b73 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -70,13 +70,8 @@ set(ARROW_IO_STATIC_PRIVATE_LINK_LIBS
   boost_system_static
   boost_filesystem_static)
 
-if (ARROW_BUILD_STATIC)
-  set(ARROW_IO_TEST_LINK_LIBS
-    arrow_io_static)
-else()
-  set(ARROW_IO_TEST_LINK_LIBS
-    arrow_io_shared)
-endif()
+set(ARROW_IO_TEST_LINK_LIBS
+  arrow_io_static)
 
 set(ARROW_IO_SRCS
   file.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index c047f53..e7a3fdb 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -24,20 +24,9 @@ set(ARROW_IPC_SHARED_LINK_LIBS
   arrow_shared
 )
 
-set(ARROW_IPC_STATIC_LINK_LIBS
-  arrow_static
+set(ARROW_IPC_TEST_LINK_LIBS
   arrow_io_static
-)
-
-if (ARROW_BUILD_STATIC)
-  set(ARROW_IPC_TEST_LINK_LIBS
-    arrow_io_static
-    arrow_ipc_static)
-else()
-  set(ARROW_IPC_TEST_LINK_LIBS
-    arrow_io_shared
-    arrow_ipc_shared)
-endif()
+  arrow_ipc_static)
 
 set(ARROW_IPC_SRCS
   adapter.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index a24c007..08ac983 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -51,12 +51,15 @@ namespace ipc {
 
 class RecordBatchWriter : public ArrayVisitor {
  public:
-  RecordBatchWriter(MemoryPool* pool, const RecordBatch& batch,
-      int64_t buffer_start_offset, int max_recursion_depth)
+  RecordBatchWriter(
+      MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth)
       : pool_(pool),
-        batch_(batch),
         max_recursion_depth_(max_recursion_depth),
-        buffer_start_offset_(buffer_start_offset) {}
+        buffer_start_offset_(buffer_start_offset) {
+    DCHECK_GT(max_recursion_depth, 0);
+  }
+
+  virtual ~RecordBatchWriter() = default;
 
   Status VisitArray(const Array& arr) {
     if (max_recursion_depth_ <= 0) {
@@ -81,7 +84,7 @@ class RecordBatchWriter : public ArrayVisitor {
     return arr.Accept(this);
   }
 
-  Status Assemble(int64_t* body_length) {
+  Status Assemble(const RecordBatch& batch, int64_t* body_length) {
     if (field_nodes_.size() > 0) {
       field_nodes_.clear();
       buffer_meta_.clear();
@@ -89,8 +92,8 @@ class RecordBatchWriter : public ArrayVisitor {
     }
 
     // Perform depth-first traversal of the row-batch
-    for (int i = 0; i < batch_.num_columns(); ++i) {
-      RETURN_NOT_OK(VisitArray(*batch_.column(i)));
+    for (int i = 0; i < batch.num_columns(); ++i) {
+      RETURN_NOT_OK(VisitArray(*batch.column(i)));
     }
 
     // The position for the start of a buffer relative to the passed frame of
@@ -127,16 +130,22 @@ class RecordBatchWriter : public ArrayVisitor {
     return Status::OK();
   }
 
-  Status WriteMetadata(
-      int64_t body_length, io::OutputStream* dst, int32_t* metadata_length) {
+  // Override this for writing dictionary metadata
+  virtual Status WriteMetadataMessage(
+      int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) {
+    return WriteRecordBatchMessage(
+        num_rows, body_length, field_nodes_, buffer_meta_, out);
+  }
+
+  Status WriteMetadata(int32_t num_rows, int64_t body_length, io::OutputStream* dst,
+      int32_t* metadata_length) {
     // Now that we have computed the locations of all of the buffers in shared
     // memory, the data header can be converted to a flatbuffer and written out
     //
     // Note: The memory written here is prefixed by the size of the flatbuffer
     // itself as an int32_t.
     std::shared_ptr<Buffer> metadata_fb;
-    RETURN_NOT_OK(WriteRecordBatchMetadata(
-        batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb));
+    RETURN_NOT_OK(WriteMetadataMessage(num_rows, body_length, &metadata_fb));
 
     // Need to write 4 bytes (metadata size), the metadata, plus padding to
     // end on an 8-byte offset
@@ -166,15 +175,16 @@ class RecordBatchWriter : public ArrayVisitor {
     return Status::OK();
   }
 
-  Status Write(io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
-    RETURN_NOT_OK(Assemble(body_length));
+  Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length,
+      int64_t* body_length) {
+    RETURN_NOT_OK(Assemble(batch, body_length));
 
 #ifndef NDEBUG
     int64_t start_position, current_position;
     RETURN_NOT_OK(dst->Tell(&start_position));
 #endif
 
-    RETURN_NOT_OK(WriteMetadata(*body_length, dst, metadata_length));
+    RETURN_NOT_OK(WriteMetadata(batch.num_rows(), *body_length, dst, metadata_length));
 
 #ifndef NDEBUG
     RETURN_NOT_OK(dst->Tell(&current_position));
@@ -206,17 +216,17 @@ class RecordBatchWriter : public ArrayVisitor {
     return Status::OK();
   }
 
-  Status GetTotalSize(int64_t* size) {
+  Status GetTotalSize(const RecordBatch& batch, int64_t* size) {
     // emulates the behavior of Write without actually writing
     int32_t metadata_length = 0;
     int64_t body_length = 0;
     MockOutputStream dst;
-    RETURN_NOT_OK(Write(&dst, &metadata_length, &body_length));
+    RETURN_NOT_OK(Write(batch, &dst, &metadata_length, &body_length));
     *size = dst.GetExtentBytesWritten();
     return Status::OK();
   }
 
- private:
+ protected:
   Status Visit(const NullArray& array) override { return Status::NotImplemented("null"); }
 
   template <typename ArrayType>
@@ -468,15 +478,12 @@ class RecordBatchWriter : public ArrayVisitor {
   }
 
   Status Visit(const DictionaryArray& array) override {
-    // Dictionary written out separately
-    const auto& indices = static_cast<const PrimitiveArray&>(*array.indices().get());
-    buffers_.push_back(indices.data());
-    return Status::OK();
+    // Dictionary written out separately. Slice offset contained in the indices
+    return array.indices()->Accept(this);
   }
 
   // In some cases, intermediate buffers may need to be allocated (with sliced arrays)
   MemoryPool* pool_;
-  const RecordBatch& batch_;
 
   std::vector<flatbuf::FieldNode> field_nodes_;
   std::vector<flatbuf::Buffer> buffer_meta_;
@@ -486,17 +493,51 @@ class RecordBatchWriter : public ArrayVisitor {
   int64_t buffer_start_offset_;
 };
 
+class DictionaryWriter : public RecordBatchWriter {
+ public:
+  using RecordBatchWriter::RecordBatchWriter;
+
+  Status WriteMetadataMessage(
+      int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
+    return WriteDictionaryMessage(
+        dictionary_id_, num_rows, body_length, field_nodes_, buffer_meta_, out);
+  }
+
+  Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+      io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
+    dictionary_id_ = dictionary_id;
+
+    // Make a dummy record batch. A bit tedious as we have to make a schema
+    std::vector<std::shared_ptr<Field>> fields = {
+        arrow::field("dictionary", dictionary->type())};
+    auto schema = std::make_shared<Schema>(fields);
+    RecordBatch batch(schema, dictionary->length(), {dictionary});
+
+    return RecordBatchWriter::Write(batch, dst, metadata_length, body_length);
+  }
+
+ private:
+  // TODO(wesm): Setting this in Write is a bit unclean, but it works
+  int64_t dictionary_id_;
+};
+
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
     io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
     MemoryPool* pool, int max_recursion_depth) {
-  DCHECK_GT(max_recursion_depth, 0);
-  RecordBatchWriter serializer(pool, batch, buffer_start_offset, max_recursion_depth);
-  return serializer.Write(dst, metadata_length, body_length);
+  RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth);
+  return writer.Write(batch, dst, metadata_length, body_length);
+}
+
+Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+    int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+    int64_t* body_length, MemoryPool* pool) {
+  DictionaryWriter writer(pool, buffer_start_offset, kMaxIpcRecursionDepth);
+  return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
-  RecordBatchWriter serializer(default_memory_pool(), batch, 0, kMaxIpcRecursionDepth);
-  RETURN_NOT_OK(serializer.GetTotalSize(size));
+  RecordBatchWriter writer(default_memory_pool(), 0, kMaxIpcRecursionDepth);
+  RETURN_NOT_OK(writer.GetTotalSize(batch, size));
   return Status::OK();
 }
 
@@ -580,10 +621,9 @@ class ArrayLoader : public TypeVisitor {
 
   Status LoadPrimitive(const DataType& type) {
     FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap;
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    std::shared_ptr<Buffer> null_bitmap, data;
 
-    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
     if (field_meta.length > 0) {
       RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data));
     } else {
@@ -597,11 +637,9 @@ class ArrayLoader : public TypeVisitor {
   template <typename CONTAINER>
   Status LoadBinary() {
     FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap;
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    std::shared_ptr<Buffer> null_bitmap, offsets, values;
 
-    std::shared_ptr<Buffer> offsets;
-    std::shared_ptr<Buffer> values;
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
     if (field_meta.length > 0) {
       RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets));
       RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
@@ -661,11 +699,9 @@ class ArrayLoader : public TypeVisitor {
 
   Status Visit(const ListType& type) override {
     FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap;
+    std::shared_ptr<Buffer> null_bitmap, offsets;
 
     RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-
-    std::shared_ptr<Buffer> offsets;
     if (field_meta.length > 0) {
       RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets));
     } else {
@@ -715,12 +751,9 @@ class ArrayLoader : public TypeVisitor {
 
   Status Visit(const UnionType& type) override {
     FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap;
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-
-    std::shared_ptr<Buffer> type_ids = nullptr;
-    std::shared_ptr<Buffer> offsets = nullptr;
+    std::shared_ptr<Buffer> null_bitmap, type_ids, offsets;
 
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
     if (field_meta.length > 0) {
       RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids));
       if (type.mode == UnionMode::DENSE) {
@@ -738,13 +771,23 @@ class ArrayLoader : public TypeVisitor {
   }
 
   Status Visit(const DictionaryType& type) override {
-    return Status::NotImplemented("dictionary");
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, indices_data;
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &indices_data));
+
+    std::shared_ptr<Array> indices;
+    RETURN_NOT_OK(MakePrimitiveArray(type.index_type(), field_meta.length, indices_data,
+        null_bitmap, field_meta.null_count, 0, &indices));
+
+    result_ = std::make_shared<DictionaryArray>(field_.type, indices);
+    return Status::OK();
   };
 };
 
 class RecordBatchReader {
  public:
-  RecordBatchReader(const std::shared_ptr<RecordBatchMetadata>& metadata,
+  RecordBatchReader(const RecordBatchMetadata& metadata,
       const std::shared_ptr<Schema>& schema, int max_recursion_depth,
       io::ReadableFileInterface* file)
       : metadata_(metadata),
@@ -758,7 +801,7 @@ class RecordBatchReader {
     // The field_index and buffer_index are incremented in the ArrayLoader
     // based on how much of the batch is "consumed" (through nested data
     // reconstruction, for example)
-    context_.metadata = metadata_.get();
+    context_.metadata = &metadata_;
     context_.field_index = 0;
     context_.buffer_index = 0;
     context_.max_recursion_depth = max_recursion_depth_;
@@ -768,50 +811,58 @@ class RecordBatchReader {
       RETURN_NOT_OK(loader.Load(&arrays[i]));
     }
 
-    *out = std::make_shared<RecordBatch>(schema_, metadata_->length(), arrays);
+    *out = std::make_shared<RecordBatch>(schema_, metadata_.length(), arrays);
     return Status::OK();
   }
 
  private:
   RecordBatchContext context_;
-  std::shared_ptr<RecordBatchMetadata> metadata_;
+  const RecordBatchMetadata& metadata_;
   std::shared_ptr<Schema> schema_;
   int max_recursion_depth_;
   io::ReadableFileInterface* file_;
 };
 
-Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
-    io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata) {
-  std::shared_ptr<Buffer> buffer;
-  RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
-
-  int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
-
-  if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
-    std::stringstream ss;
-    ss << "flatbuffer size " << metadata_length << " invalid. File offset: " 
<< offset
-       << ", metadata length: " << metadata_length;
-    return Status::Invalid(ss.str());
-  }
-
-  std::shared_ptr<Message> message;
-  RETURN_NOT_OK(Message::Open(buffer, 4, &message));
-  *metadata = std::make_shared<RecordBatchMetadata>(message);
-  return Status::OK();
-}
-
-Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
     const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
     std::shared_ptr<RecordBatch>* out) {
   return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out);
 }
 
-Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
     const std::shared_ptr<Schema>& schema, int max_recursion_depth,
     io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) {
   RecordBatchReader reader(metadata, schema, max_recursion_depth, file);
   return reader.Read(out);
 }
 
+Status ReadDictionary(const DictionaryBatchMetadata& metadata,
+    const DictionaryTypeMap& dictionary_types, io::ReadableFileInterface* file,
+    std::shared_ptr<Array>* out) {
+  int64_t id = metadata.id();
+  auto it = dictionary_types.find(id);
+  if (it == dictionary_types.end()) {
+    std::stringstream ss;
+    ss << "Do not have type metadata for dictionary with id: " << id;
+    return Status::KeyError(ss.str());
+  }
+
+  std::vector<std::shared_ptr<Field>> fields = {it->second};
+
+  // We need a schema for the record batch
+  auto dummy_schema = std::make_shared<Schema>(fields);
+
+  // The dictionary is embedded in a record batch with a single column
+  std::shared_ptr<RecordBatch> batch;
+  RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file, &batch));
+
+  if (batch->num_columns() != 1) {
+    return Status::Invalid("Dictionary record batch must only contain one 
field");
+  }
+
+  *out = batch->column(0);
+  return Status::OK();
+}
+
 }  // namespace ipc
 }  // namespace arrow
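
For reference, a rough usage sketch (not part of the patch) of the new WriteDictionary/ReadDictionary pair above, which serializes each dictionary as a DictionaryBatch message wrapping a single-column record batch. The calls are the ones appearing in this file and in the FileReader changes below; `dictionary` (std::shared_ptr<Array>), `sink` (io::OutputStream*), `file` (io::ReadableFileInterface*), `offset` and a SchemaMetadata `schema_metadata` are assumed to exist, and the buffer frame-of-reference handling (ARROW-388) shown in file.cc is glossed over:

    // Write one dictionary under an application-chosen id
    int32_t metadata_length = 0;
    int64_t body_length = 0;
    RETURN_NOT_OK(WriteDictionary(/*dictionary_id=*/0, dictionary, /*buffer_start_offset=*/0,
        sink, &metadata_length, &body_length, default_memory_pool()));

    // Read it back: the id -> field mapping comes from the schema metadata
    DictionaryTypeMap id_to_field;
    RETURN_NOT_OK(schema_metadata.GetDictionaryTypes(&id_to_field));

    std::shared_ptr<Message> message;
    RETURN_NOT_OK(ReadMessage(offset, metadata_length, file, &message));
    DictionaryBatchMetadata dict_metadata(message);

    std::shared_ptr<Array> values;
    RETURN_NOT_OK(ReadDictionary(dict_metadata, id_to_field, file, &values));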

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 83542d0..b7d8fa9 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -25,6 +25,7 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/ipc/metadata.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -44,8 +45,6 @@ class OutputStream;
 
 namespace ipc {
 
-class RecordBatchMetadata;
-
 // ----------------------------------------------------------------------
 // Write path
 // We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice round number
@@ -72,34 +71,35 @@ constexpr int kMaxIpcRecursionDepth = 64;
 //
 // @param(out) body_length: the size of the contiguous buffer block plus
 // padding bytes
-Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch,
+Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    MemoryPool* pool, int max_recursion_depth = kMaxIpcRecursionDepth);
+
+// Write Array as a DictionaryBatch message
+Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
     int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
-    int64_t* body_length, MemoryPool* pool,
-    int max_recursion_depth = kMaxIpcRecursionDepth);
+    int64_t* body_length, MemoryPool* pool);
 
 // Compute the precise number of bytes needed in a contiguous memory segment to
 // write the record batch. This involves generating the complete serialized
 // Flatbuffers metadata.
-Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
+Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
 
 // ----------------------------------------------------------------------
 // "Read" path; does not copy data if the input supports zero copy reads
 
-// Read the record batch flatbuffer metadata starting at the indicated file offset
-//
-// The flatbuffer is expected to be length-prefixed, so the metadata_length
-// includes at least the length prefix and the flatbuffer
-Status ARROW_EXPORT ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
-    io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata);
-
-Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
     const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
     std::shared_ptr<RecordBatch>* out);
 
-Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
     const std::shared_ptr<Schema>& schema, int max_recursion_depth,
     io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out);
 
+Status ReadDictionary(const DictionaryBatchMetadata& metadata,
+    const DictionaryTypeMap& dictionary_types, io::ReadableFileInterface* file,
+    std::shared_ptr<Array>* out);
+
 }  // namespace ipc
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
index 3b18326..c1d483f 100644
--- a/cpp/src/arrow/ipc/file.cc
+++ b/cpp/src/arrow/ipc/file.cc
@@ -36,8 +36,6 @@ namespace arrow {
 namespace ipc {
 
 static constexpr const char* kArrowMagicBytes = "ARROW1";
-// ----------------------------------------------------------------------
-// File footer
 
 static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
 FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
@@ -51,11 +49,12 @@ FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
 }
 
 Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
-    const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
+    const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+    io::OutputStream* out) {
   FBB fbb;
 
   flatbuffers::Offset<flatbuf::Schema> fb_schema;
-  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
 
   auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
   auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
@@ -74,87 +73,6 @@ static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
   return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
 }
 
-class FileFooter::FileFooterImpl {
- public:
-  FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer)
-      : buffer_(buffer), footer_(footer) {}
-
-  int num_dictionaries() const { return footer_->dictionaries()->size(); }
-
-  int num_record_batches() const { return footer_->recordBatches()->size(); }
-
-  MetadataVersion::type version() const {
-    switch (footer_->version()) {
-      case flatbuf::MetadataVersion_V1:
-        return MetadataVersion::V1;
-      case flatbuf::MetadataVersion_V2:
-        return MetadataVersion::V2;
-      // Add cases as other versions become available
-      default:
-        return MetadataVersion::V2;
-    }
-  }
-
-  FileBlock record_batch(int i) const {
-    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
-  }
-
-  FileBlock dictionary(int i) const {
-    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
-  }
-
-  Status GetSchema(std::shared_ptr<Schema>* out) const {
-    auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
-    return schema_msg->GetSchema(out);
-  }
-
- private:
-  // Retain reference to memory
-  std::shared_ptr<Buffer> buffer_;
-
-  const flatbuf::Footer* footer_;
-};
-
-FileFooter::FileFooter() {}
-
-FileFooter::~FileFooter() {}
-
-Status FileFooter::Open(
-    const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
-  const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
-
-  *out = std::unique_ptr<FileFooter>(new FileFooter());
-
-  // TODO(wesm): Verify the footer
-  (*out)->impl_.reset(new FileFooterImpl(buffer, footer));
-
-  return Status::OK();
-}
-
-int FileFooter::num_dictionaries() const {
-  return impl_->num_dictionaries();
-}
-
-int FileFooter::num_record_batches() const {
-  return impl_->num_record_batches();
-}
-
-MetadataVersion::type FileFooter::version() const {
-  return impl_->version();
-}
-
-FileBlock FileFooter::record_batch(int i) const {
-  return impl_->record_batch(i);
-}
-
-FileBlock FileFooter::dictionary(int i) const {
-  return impl_->dictionary(i);
-}
-
-Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
-  return impl_->GetSchema(out);
-}
-
 // ----------------------------------------------------------------------
 // File writer implementation
 
@@ -171,22 +89,17 @@ Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& s
 Status FileWriter::Start() {
   RETURN_NOT_OK(WriteAligned(
       reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
-  started_ = true;
-  return Status::OK();
-}
 
-Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
-  // Push an empty FileBlock
-  // Append metadata, to be written in the footer later
-  record_batches_.emplace_back(0, 0, 0);
-  return StreamWriter::WriteRecordBatch(
-      batch, &record_batches_[record_batches_.size() - 1]);
+  // We write the schema at the start of the file (and the end). This also
+  // writes all the dictionaries at the beginning of the file
+  return StreamWriter::Start();
 }
 
 Status FileWriter::Close() {
   // Write metadata
   int64_t initial_position = position_;
-  RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_));
+  RETURN_NOT_OK(WriteFileFooter(
+      *schema_, dictionaries_, record_batches_, dictionary_memo_.get(), sink_));
   RETURN_NOT_OK(UpdatePosition());
 
   // Write footer length
@@ -204,89 +117,180 @@ Status FileWriter::Close() {
 // ----------------------------------------------------------------------
 // Reader implementation
 
-FileReader::FileReader(
-    const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset)
-    : file_(file), footer_offset_(footer_offset) {}
+class FileReader::FileReaderImpl {
+ public:
+  FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }
 
-FileReader::~FileReader() {}
+  Status ReadFooter() {
+    int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
 
-Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
-    std::shared_ptr<FileReader>* reader) {
-  int64_t footer_offset;
-  RETURN_NOT_OK(file->GetSize(&footer_offset));
-  return Open(file, footer_offset, reader);
-}
+    if (footer_offset_ <= magic_size * 2 + 4) {
+      std::stringstream ss;
+      ss << "File is too small: " << footer_offset_;
+      return Status::Invalid(ss.str());
+    }
 
-Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
-    int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
-  *reader = std::shared_ptr<FileReader>(new FileReader(file, footer_offset));
-  return (*reader)->ReadFooter();
-}
+    std::shared_ptr<Buffer> buffer;
+    int file_end_size = magic_size + sizeof(int32_t);
+    RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer));
+
+    if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
+      return Status::Invalid("Not an Arrow file");
+    }
+
+    int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data());
+
+    if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) {
+      return Status::Invalid("File is smaller than indicated metadata size");
+    }
 
-Status FileReader::ReadFooter() {
-  int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
+    // Now read the footer
+    RETURN_NOT_OK(file_->ReadAt(
+        footer_offset_ - footer_length - file_end_size, footer_length, &footer_buffer_));
 
-  if (footer_offset_ <= magic_size * 2 + 4) {
-    std::stringstream ss;
-    ss << "File is too small: " << footer_offset_;
-    return Status::Invalid(ss.str());
+    // TODO(wesm): Verify the footer
+    footer_ = flatbuf::GetFooter(footer_buffer_->data());
+    schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema()));
+
+    return Status::OK();
+  }
+
+  int num_dictionaries() const { return footer_->dictionaries()->size(); }
+
+  int num_record_batches() const { return footer_->recordBatches()->size(); }
+
+  MetadataVersion::type version() const {
+    switch (footer_->version()) {
+      case flatbuf::MetadataVersion_V1:
+        return MetadataVersion::V1;
+      case flatbuf::MetadataVersion_V2:
+        return MetadataVersion::V2;
+      // Add cases as other versions become available
+      default:
+        return MetadataVersion::V2;
+    }
+  }
+
+  FileBlock record_batch(int i) const {
+    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+  }
+
+  FileBlock dictionary(int i) const {
+    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
   }
 
-  std::shared_ptr<Buffer> buffer;
-  int file_end_size = magic_size + sizeof(int32_t);
-  RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer));
+  const SchemaMetadata& schema_metadata() const { return *schema_metadata_; }
+
+  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+    DCHECK_GE(i, 0);
+    DCHECK_LT(i, num_record_batches());
+    FileBlock block = record_batch(i);
+
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(
+        ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
+    auto metadata = std::make_shared<RecordBatchMetadata>(message);
 
-  if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
-    return Status::Invalid("Not an Arrow file");
+    // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+    // ARROW-384).
+    std::shared_ptr<Buffer> buffer_block;
+    RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
+    io::BufferReader reader(buffer_block);
+
+    return ReadRecordBatch(*metadata, schema_, &reader, batch);
   }
 
-  int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data());
+  Status ReadSchema() {
+    RETURN_NOT_OK(schema_metadata_->GetDictionaryTypes(&dictionary_fields_));
+
+    // Read all the dictionaries
+    for (int i = 0; i < num_dictionaries(); ++i) {
+      FileBlock block = dictionary(i);
+      std::shared_ptr<Message> message;
+      RETURN_NOT_OK(
+          ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
+
+      // TODO(wesm): ARROW-577: This code is duplicated, can be fixed with a more
+      // invasive refactor
+      DictionaryBatchMetadata metadata(message);
+
+      // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+      // ARROW-384).
+      std::shared_ptr<Buffer> buffer_block;
+      RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
+      io::BufferReader reader(buffer_block);
+
+      std::shared_ptr<Array> dictionary;
+      RETURN_NOT_OK(ReadDictionary(metadata, dictionary_fields_, &reader, &dictionary));
+      RETURN_NOT_OK(dictionary_memo_->AddDictionary(metadata.id(), dictionary));
+    }
 
-  if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) {
-    return Status::Invalid("File is smaller than indicated metadata size");
+    // Get the schema
+    return schema_metadata_->GetSchema(*dictionary_memo_, &schema_);
   }
 
-  // Now read the footer
-  RETURN_NOT_OK(file_->ReadAt(
-      footer_offset_ - footer_length - file_end_size, footer_length, &buffer));
-  RETURN_NOT_OK(FileFooter::Open(buffer, &footer_));
+  Status Open(
+      const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset) {
+    file_ = file;
+    footer_offset_ = footer_offset;
+    RETURN_NOT_OK(ReadFooter());
+    return ReadSchema();
+  }
+
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+ private:
+  std::shared_ptr<io::ReadableFileInterface> file_;
 
-  // Get the schema
-  return footer_->GetSchema(&schema_);
+  // The location where the Arrow file layout ends. May be the end of the file
+  // or some other location if embedded in a larger file.
+  int64_t footer_offset_;
+
+  // Footer metadata
+  std::shared_ptr<Buffer> footer_buffer_;
+  const flatbuf::Footer* footer_;
+  std::unique_ptr<SchemaMetadata> schema_metadata_;
+
+  DictionaryTypeMap dictionary_fields_;
+  std::shared_ptr<DictionaryMemo> dictionary_memo_;
+
+  // Reconstructed schema, including any read dictionaries
+  std::shared_ptr<Schema> schema_;
+};
+
+FileReader::FileReader() {
+  impl_.reset(new FileReaderImpl());
 }
 
-std::shared_ptr<Schema> FileReader::schema() const {
-  return schema_;
+FileReader::~FileReader() {}
+
+Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+    std::shared_ptr<FileReader>* reader) {
+  int64_t footer_offset;
+  RETURN_NOT_OK(file->GetSize(&footer_offset));
+  return Open(file, footer_offset, reader);
+}
+
+Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+    int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
+  *reader = std::shared_ptr<FileReader>(new FileReader());
+  return (*reader)->impl_->Open(file, footer_offset);
 }
 
-int FileReader::num_dictionaries() const {
-  return footer_->num_dictionaries();
+std::shared_ptr<Schema> FileReader::schema() const {
+  return impl_->schema();
 }
 
 int FileReader::num_record_batches() const {
-  return footer_->num_record_batches();
+  return impl_->num_record_batches();
 }
 
 MetadataVersion::type FileReader::version() const {
-  return footer_->version();
+  return impl_->version();
 }
 
 Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
-  DCHECK_GE(i, 0);
-  DCHECK_LT(i, num_record_batches());
-  FileBlock block = footer_->record_batch(i);
-
-  std::shared_ptr<RecordBatchMetadata> metadata;
-  RETURN_NOT_OK(ReadRecordBatchMetadata(
-      block.offset, block.metadata_length, file_.get(), &metadata));
-
-  // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
-  // ARROW-384).
-  std::shared_ptr<Buffer> buffer_block;
-  RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
-  io::BufferReader reader(buffer_block);
-
-  return ReadRecordBatch(metadata, schema_, &reader, batch);
+  return impl_->GetRecordBatch(i, batch);
 }
 
 }  // namespace ipc
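
As context for FileReaderImpl::ReadFooter above, the trailer it parses is laid out as <footer flatbuffer> <int32 footer length> "ARROW1". The snippet below merely restates the arithmetic used in that method (illustrative only; `footer_offset` and `footer_length` stand in for the values read there):

    // Trailing layout parsed by ReadFooter: <footer> <int32 footer_length> "ARROW1"
    int magic_size = static_cast<int>(strlen("ARROW1"));
    int file_end_size = magic_size + sizeof(int32_t);          // length prefix + magic
    int64_t length_position = footer_offset - file_end_size;   // where the int32 starts
    int64_t footer_start = footer_offset - footer_length - file_end_size;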

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h
index cf0baab..524766c 100644
--- a/cpp/src/arrow/ipc/file.h
+++ b/cpp/src/arrow/ipc/file.h
@@ -45,45 +45,21 @@ class ReadableFileInterface;
 namespace ipc {
 
 Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
-    const std::vector<FileBlock>& record_batches, io::OutputStream* out);
-
-class ARROW_EXPORT FileFooter {
- public:
-  ~FileFooter();
-
-  static Status Open(
-      const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out);
-
-  int num_dictionaries() const;
-  int num_record_batches() const;
-  MetadataVersion::type version() const;
-
-  FileBlock record_batch(int i) const;
-  FileBlock dictionary(int i) const;
-
-  Status GetSchema(std::shared_ptr<Schema>* out) const;
-
- private:
-  FileFooter();
-  class FileFooterImpl;
-  std::unique_ptr<FileFooterImpl> impl_;
-};
+    const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+    io::OutputStream* out);
 
 class ARROW_EXPORT FileWriter : public StreamWriter {
  public:
   static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
       std::shared_ptr<FileWriter>* out);
 
-  Status WriteRecordBatch(const RecordBatch& batch) override;
+  using StreamWriter::WriteRecordBatch;
   Status Close() override;
 
  private:
   FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
 
   Status Start() override;
-
-  std::vector<FileBlock> dictionaries_;
-  std::vector<FileBlock> record_batches_;
 };
 
 class ARROW_EXPORT FileReader {
@@ -108,13 +84,9 @@ class ARROW_EXPORT FileReader {
   static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
       int64_t footer_offset, std::shared_ptr<FileReader>* reader);
 
+  /// The schema includes any dictionaries
   std::shared_ptr<Schema> schema() const;
 
-  // Shared dictionaries for dictionary-encoding cross record batches
-  // TODO(wesm): Implement dictionary reading when we also have dictionary
-  // encoding
-  int num_dictionaries() const;
-
   int num_record_batches() const;
 
   MetadataVersion::type version() const;
@@ -127,19 +99,10 @@ class ARROW_EXPORT FileReader {
   Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
 
  private:
-  FileReader(
-      const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset);
-
-  Status ReadFooter();
-
-  std::shared_ptr<io::ReadableFileInterface> file_;
-
-  // The location where the Arrow file layout ends. May be the end of the file
-  // or some other location if embedded in a larger file.
-  int64_t footer_offset_;
+  FileReader();
 
-  std::unique_ptr<FileFooter> footer_;
-  std::shared_ptr<Schema> schema_;
+  class ARROW_NO_EXPORT FileReaderImpl;
+  std::unique_ptr<FileReaderImpl> impl_;
 };
 
 }  // namespace ipc
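
A rough sketch of how the public API declared here is used after this change (illustrative only; `sink`, `file`, `schema` and `batch` are assumed to exist). Dictionaries are written by Start() and read back while opening the file, so the caller does not handle them explicitly:

    // Write path
    std::shared_ptr<FileWriter> writer;
    RETURN_NOT_OK(FileWriter::Open(sink, schema, &writer));
    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
    RETURN_NOT_OK(writer->Close());

    // Read path: schema() already includes any dictionaries
    std::shared_ptr<FileReader> reader;
    RETURN_NOT_OK(FileReader::Open(file, &reader));
    for (int i = 0; i < reader->num_record_batches(); ++i) {
      std::shared_ptr<RecordBatch> out;
      RETURN_NOT_OK(reader->GetRecordBatch(i, &out));
    }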

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index d11b95b..8999363 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -27,6 +27,7 @@
 #include "arrow/io/memory.h"
 #include "arrow/io/test-common.h"
 #include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata.h"
 #include "arrow/ipc/test-common.h"
 #include "arrow/ipc/util.h"
 
@@ -40,12 +41,8 @@
 namespace arrow {
 namespace ipc {
 
-class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
-                             public io::MemoryMapFixture {
+class IpcTestFixture : public io::MemoryMapFixture {
  public:
-  void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { io::MemoryMapFixture::TearDown(); }
-
   Status RoundTripHelper(const RecordBatch& batch, int memory_map_size,
       std::shared_ptr<RecordBatch>* batch_result) {
     std::string path = "test-write-row-batch";
@@ -59,8 +56,9 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
     RETURN_NOT_OK(WriteRecordBatch(
         batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
 
-    std::shared_ptr<RecordBatchMetadata> metadata;
-    RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+    auto metadata = std::make_shared<RecordBatchMetadata>(message);
 
     // The buffer offsets start at 0, so we must construct a
     // ReadableFileInterface according to that frame of reference
@@ -68,7 +66,7 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
     RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
     io::BufferReader buffer_reader(buffer_payload);
 
-    return ReadRecordBatch(metadata, batch.schema(), &buffer_reader, batch_result);
+    return ReadRecordBatch(*metadata, batch.schema(), &buffer_reader, batch_result);
   }
 
   void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) {
@@ -112,14 +110,29 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
   MemoryPool* pool_;
 };
 
-TEST_P(TestWriteRecordBatch, RoundTrip) {
+class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture {
+ public:
+  void SetUp() { pool_ = default_memory_pool(); }
+  void TearDown() { io::MemoryMapFixture::TearDown(); }
+};
+
+class TestRecordBatchParam : public ::testing::TestWithParam<MakeRecordBatch*>,
+                             public IpcTestFixture {
+ public:
+  void SetUp() { pool_ = default_memory_pool(); }
+  void TearDown() { io::MemoryMapFixture::TearDown(); }
+  using IpcTestFixture::RoundTripHelper;
+  using IpcTestFixture::CheckRoundtrip;
+};
+
+TEST_P(TestRecordBatchParam, RoundTrip) {
   std::shared_ptr<RecordBatch> batch;
   ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
 
   CheckRoundtrip(*batch, 1 << 20);
 }
 
-TEST_P(TestWriteRecordBatch, SliceRoundTrip) {
+TEST_P(TestRecordBatchParam, SliceRoundTrip) {
   std::shared_ptr<RecordBatch> batch;
   ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
 
@@ -130,7 +143,7 @@ TEST_P(TestWriteRecordBatch, SliceRoundTrip) {
   CheckRoundtrip(*sliced_batch, 1 << 20);
 }
 
-TEST_P(TestWriteRecordBatch, ZeroLengthArrays) {
+TEST_P(TestRecordBatchParam, ZeroLengthArrays) {
   std::shared_ptr<RecordBatch> batch;
   ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
 
@@ -159,10 +172,10 @@ TEST_P(TestWriteRecordBatch, ZeroLengthArrays) {
 }
 
 INSTANTIATE_TEST_CASE_P(
-    RoundTripTests, TestWriteRecordBatch,
+    RoundTripTests, TestRecordBatchParam,
     ::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch,
         &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch,
-        &MakeDeeplyNestedList, &MakeStruct, &MakeUnion));
+        &MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary));
 
 void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
   ipc::MockOutputStream mock;
@@ -251,8 +264,9 @@ TEST_F(RecursionLimits, ReadLimit) {
   std::shared_ptr<Schema> schema;
   ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema));
 
-  std::shared_ptr<RecordBatchMetadata> metadata;
-  ASSERT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+  std::shared_ptr<Message> message;
+  ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+  auto metadata = std::make_shared<RecordBatchMetadata>(message);
 
   std::shared_ptr<Buffer> payload;
   ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
@@ -260,7 +274,7 @@ TEST_F(RecursionLimits, ReadLimit) {
   io::BufferReader reader(payload);
 
   std::shared_ptr<RecordBatch> batch;
-  ASSERT_RAISES(Invalid, ReadRecordBatch(metadata, schema, &reader, &batch));
+  ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &batch));
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index 7cd8054..4b82aab 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -180,72 +180,44 @@ TEST_P(TestStreamFormat, RoundTrip) {
 #define BATCH_CASES()                                                                   \
   ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
       &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch,   \
-      &MakeStruct);
+      &MakeStruct, &MakeDictionary);
 
 INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
 INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
 
-class TestFileFooter : public ::testing::Test {
- public:
-  void SetUp() {}
-
-  void CheckRoundtrip(const Schema& schema, const std::vector<FileBlock>& dictionaries,
-      const std::vector<FileBlock>& record_batches) {
-    auto buffer = std::make_shared<PoolBuffer>();
-    io::BufferOutputStream stream(buffer);
-
-    ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream));
-
-    std::unique_ptr<FileFooter> footer;
-    ASSERT_OK(FileFooter::Open(buffer, &footer));
-
-    ASSERT_EQ(MetadataVersion::V2, footer->version());
+void CheckBatchDictionaries(const RecordBatch& batch) {
+  // Check that dictionaries that should be the same are the same
+  auto schema = batch.schema();
 
-    // Check schema
-    std::shared_ptr<Schema> schema2;
-    ASSERT_OK(footer->GetSchema(&schema2));
-    AssertSchemaEqual(schema, *schema2);
+  const auto& t0 = static_cast<const DictionaryType&>(*schema->field(0)->type);
+  const auto& t1 = static_cast<const DictionaryType&>(*schema->field(1)->type);
 
-    // Check blocks
-    ASSERT_EQ(dictionaries.size(), footer->num_dictionaries());
-    ASSERT_EQ(record_batches.size(), footer->num_record_batches());
+  ASSERT_EQ(t0.dictionary().get(), t1.dictionary().get());
 
-    for (int i = 0; i < footer->num_dictionaries(); ++i) {
-      CheckBlocks(dictionaries[i], footer->dictionary(i));
-    }
-
-    for (int i = 0; i < footer->num_record_batches(); ++i) {
-      CheckBlocks(record_batches[i], footer->record_batch(i));
-    }
-  }
+  // Same dictionary used for list values
+  const auto& t3 = static_cast<const ListType&>(*schema->field(3)->type);
+  const auto& t3_value = static_cast<const DictionaryType&>(*t3.value_type());
+  ASSERT_EQ(t0.dictionary().get(), t3_value.dictionary().get());
+}
 
-  void CheckBlocks(const FileBlock& left, const FileBlock& right) {
-    ASSERT_EQ(left.offset, right.offset);
-    ASSERT_EQ(left.metadata_length, right.metadata_length);
-    ASSERT_EQ(left.body_length, right.body_length);
-  }
+TEST_F(TestStreamFormat, DictionaryRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeDictionary(&batch));
 
- private:
-  std::shared_ptr<Schema> example_schema_;
-};
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
 
-TEST_F(TestFileFooter, Basics) {
-  auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
-  auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
-  Schema schema({f0, f1});
+  CheckBatchDictionaries(*out_batches[0]);
+}
 
-  std::vector<FileBlock> dictionaries;
-  dictionaries.emplace_back(8, 92, 900);
-  dictionaries.emplace_back(1000, 100, 1900);
-  dictionaries.emplace_back(3000, 100, 2900);
+TEST_F(TestFileFormat, DictionaryRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeDictionary(&batch));
 
-  std::vector<FileBlock> record_batches;
-  record_batches.emplace_back(6000, 100, 900);
-  record_batches.emplace_back(7000, 100, 1900);
-  record_batches.emplace_back(9000, 100, 2900);
-  record_batches.emplace_back(12000, 100, 3900);
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+  ASSERT_OK(RoundTripHelper({batch}, &out_batches));
 
-  CheckRoundtrip(schema, dictionaries, record_batches);
+  CheckBatchDictionaries(*out_batches[0]);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
index 098f996..4fb3204 100644
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -22,6 +22,7 @@
 #include "gtest/gtest.h"
 
 #include "arrow/io/memory.h"
+#include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/test-common.h"
 #include "arrow/schema.h"
@@ -39,9 +40,9 @@ class TestSchemaMetadata : public ::testing::Test {
  public:
   void SetUp() {}
 
-  void CheckRoundtrip(const Schema& schema) {
+  void CheckRoundtrip(const Schema& schema, DictionaryMemo* memo) {
     std::shared_ptr<Buffer> buffer;
-    ASSERT_OK(WriteSchema(schema, &buffer));
+    ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer));
 
     std::shared_ptr<Message> message;
     ASSERT_OK(Message::Open(buffer, 0, &message));
@@ -51,8 +52,10 @@ class TestSchemaMetadata : public ::testing::Test {
     auto schema_msg = std::make_shared<SchemaMetadata>(message);
     ASSERT_EQ(schema.num_fields(), schema_msg->num_fields());
 
+    DictionaryMemo empty_memo;
+
     std::shared_ptr<Schema> schema2;
-    ASSERT_OK(schema_msg->GetSchema(&schema2));
+    ASSERT_OK(schema_msg->GetSchema(empty_memo, &schema2));
 
     AssertSchemaEqual(schema, *schema2);
   }
@@ -74,7 +77,9 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) {
   auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
 
   Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
-  CheckRoundtrip(schema);
+  DictionaryMemo memo;
+
+  CheckRoundtrip(schema, &memo);
 }
 
 TEST_F(TestSchemaMetadata, NestedFields) {
@@ -86,7 +91,9 @@ TEST_F(TestSchemaMetadata, NestedFields) {
   auto f1 = std::make_shared<Field>("f1", type2);
 
   Schema schema({f0, f1});
-  CheckRoundtrip(schema);
+  DictionaryMemo memo;
+
+  CheckRoundtrip(schema, &memo);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index cd77220..7c8ddb9 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -25,6 +25,7 @@
 
 #include "flatbuffers/flatbuffers.h"
 
+#include "arrow/array.h"
 #include "arrow/buffer.h"
 #include "arrow/ipc/Message_generated.h"
 #include "arrow/schema.h"
@@ -115,8 +116,8 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
 }
 
 // Forward declaration
-static Status FieldToFlatbuffer(
-    FBB& fbb, const std::shared_ptr<Field>& field, FieldOffset* offset);
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+    DictionaryMemo* dictionary_memo, FieldOffset* offset);
 
 static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) {
   return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union();
@@ -126,34 +127,73 @@ static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) {
   return flatbuf::CreateFloatingPoint(fbb, precision).Union();
 }
 
-static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* out_children, Offset* offset) {
+static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) {
   FieldOffset field;
-  RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(0), &field));
-  out_children->push_back(field);
+  for (int i = 0; i < type->num_children(); ++i) {
+    RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field));
+    out_children->push_back(field);
+  }
+  return Status::OK();
+}
+
+static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
   *offset = flatbuf::CreateList(fbb).Union();
   return Status::OK();
 }
 
 static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* out_children, Offset* offset) {
-  FieldOffset field;
-  for (int i = 0; i < type->num_children(); ++i) {
-    RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), &field));
-    out_children->push_back(field);
-  }
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
   *offset = flatbuf::CreateStruct_(fbb).Union();
   return Status::OK();
 }
 
+static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
+
+  const auto& union_type = static_cast<const UnionType&>(*type);
+
+  flatbuf::UnionMode mode = union_type.mode == UnionMode::SPARSE
+                                ? flatbuf::UnionMode_Sparse
+                                : flatbuf::UnionMode_Dense;
+
+  std::vector<int32_t> type_ids;
+  type_ids.reserve(union_type.type_codes.size());
+  for (uint8_t code : union_type.type_codes) {
+    type_ids.push_back(code);
+  }
+
+  auto fb_type_ids = fbb.CreateVector(type_ids);
+
+  *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union();
+  return Status::OK();
+}
+
 #define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED)            \
   *out_type = flatbuf::Type_Int;                        \
   *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
   break;
 
+// TODO(wesm): Convert this to visitor pattern
 static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
     std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
-    flatbuf::Type* out_type, Offset* offset) {
+    flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) {
+  if (type->type == Type::DICTIONARY) {
+    // In this library, the dictionary "type" is a logical construct. Here we
+    // pass through to the value type, as we've already captured the index
+    // type in the DictionaryEncoding metadata in the parent field
+    const auto& dict_type = static_cast<const DictionaryType&>(*type);
+    return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout,
+        out_type, dictionary_memo, offset);
+  }
+
   std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
   for (const BufferDescr& descr : buffer_layout) {
     flatbuf::VectorType vector_type;
@@ -217,10 +257,13 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
       break;
     case Type::LIST:
       *out_type = flatbuf::Type_List;
-      return ListToFlatbuffer(fbb, type, children, offset);
+      return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset);
     case Type::STRUCT:
       *out_type = flatbuf::Type_Struct_;
-      return StructToFlatbuffer(fbb, type, children, offset);
+      return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset);
+    case Type::UNION:
+      *out_type = flatbuf::Type_Union;
+      return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset);
     default:
       *out_type = flatbuf::Type_NONE;  // Make clang-tidy happy
       std::stringstream ss;
@@ -230,35 +273,63 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
   return Status::OK();
 }
 
-static Status FieldToFlatbuffer(
-    FBB& fbb, const std::shared_ptr<Field>& field, FieldOffset* offset) {
+using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>;
+
+static DictionaryOffset GetDictionaryEncoding(
+    FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) {
+  int64_t dictionary_id = memo->GetId(type.dictionary());
+
+  // We assume that the dictionary index type (as an integer) has already been
+  // validated elsewhere, and can safely assume we are dealing with signed
+  // integers
+  const auto& fw_index_type = static_cast<const FixedWidthType&>(*type.index_type());
+
+  auto index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true);
+
+  // TODO(wesm): ordered dictionaries
+  return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset);
+}
+
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+    DictionaryMemo* dictionary_memo, FieldOffset* offset) {
   auto fb_name = fbb.CreateString(field->name);
 
   flatbuf::Type type_enum;
-  Offset type_data;
+  Offset type_offset;
   Offset type_layout;
   std::vector<FieldOffset> children;
   std::vector<VectorLayoutOffset> layout;
 
-  RETURN_NOT_OK(
-      TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data));
+  RETURN_NOT_OK(TypeToFlatbuffer(
+      fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset));
   auto fb_children = fbb.CreateVector(children);
   auto fb_layout = fbb.CreateVector(layout);
 
+  DictionaryOffset dictionary = 0;
+  if (field->type->type == Type::DICTIONARY) {
+    dictionary = GetDictionaryEncoding(
+        fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo);
+  }
+
   // TODO: produce the list of VectorTypes
-  *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data,
-      field->dictionary, fb_children, fb_layout);
+  *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset,
+      dictionary, fb_children, fb_layout);
 
   return Status::OK();
 }
 
-Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out) {
-  std::shared_ptr<DataType> type;
+Status FieldFromFlatbufferDictionary(
+    const flatbuf::Field* field, std::shared_ptr<Field>* out) {
+  // Need an empty memo to pass down for constructing children
+  DictionaryMemo dummy_memo;
+
+  // Any DictionaryEncoding set is ignored here
 
+  std::shared_ptr<DataType> type;
   auto children = field->children();
   std::vector<std::shared_ptr<Field>> child_fields(children->size());
   for (size_t i = 0; i < children->size(); ++i) {
-    RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), &child_fields[i]));
+    RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i]));
   }
 
   RETURN_NOT_OK(
@@ -268,6 +339,39 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>*
   return Status::OK();
 }
 
+Status FieldFromFlatbuffer(const flatbuf::Field* field,
+    const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) {
+  std::shared_ptr<DataType> type;
+
+  const flatbuf::DictionaryEncoding* encoding = field->dictionary();
+
+  if (encoding == nullptr) {
+    // The field is not dictionary encoded. We must potentially visit its
+    // children to fully reconstruct the data type
+    auto children = field->children();
+    std::vector<std::shared_ptr<Field>> child_fields(children->size());
+    for (size_t i = 0; i < children->size(); ++i) {
+      RETURN_NOT_OK(
+          FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
+    }
+    RETURN_NOT_OK(
+        TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
+  } else {
+    // The field is dictionary encoded. The type of the dictionary values has
+    // been determined elsewhere, and is stored in the DictionaryMemo. Here we
+    // construct the logical DictionaryType object
+
+    std::shared_ptr<Array> dictionary;
+    RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary));
+
+    std::shared_ptr<DataType> index_type;
+    RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type));
+    type = std::make_shared<DictionaryType>(index_type, dictionary);
+  }
+  *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
+  return Status::OK();
+}
+
 // Implement MessageBuilder
 
 // will return the endianness of the system we are running on
@@ -281,13 +385,13 @@ flatbuf::Endianness endianness() {
   return bint.c[0] == 1 ? flatbuf::Endianness_Big : flatbuf::Endianness_Little;
 }
 
-Status SchemaToFlatbuffer(
-    FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out) {
+Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo,
+    flatbuffers::Offset<flatbuf::Schema>* out) {
   std::vector<FieldOffset> field_offsets;
   for (int i = 0; i < schema.num_fields(); ++i) {
     std::shared_ptr<Field> field = schema.field(i);
     FieldOffset offset;
-    RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset));
+    RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset));
     field_offsets.push_back(offset);
   }
 
@@ -295,29 +399,63 @@ Status SchemaToFlatbuffer(
   return Status::OK();
 }
 
-Status MessageBuilder::SetSchema(const Schema& schema) {
-  flatbuffers::Offset<flatbuf::Schema> fb_schema;
-  RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema));
+class MessageBuilder {
+ public:
+  Status SetSchema(const Schema& schema, DictionaryMemo* dictionary_memo) {
+    flatbuffers::Offset<flatbuf::Schema> fb_schema;
+    RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, dictionary_memo, &fb_schema));
 
-  header_type_ = flatbuf::MessageHeader_Schema;
-  header_ = fb_schema.Union();
-  body_length_ = 0;
-  return Status::OK();
-}
+    header_type_ = flatbuf::MessageHeader_Schema;
+    header_ = fb_schema.Union();
+    body_length_ = 0;
+    return Status::OK();
+  }
 
-Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
-    const std::vector<flatbuf::FieldNode>& nodes,
-    const std::vector<flatbuf::Buffer>& buffers) {
-  header_type_ = flatbuf::MessageHeader_RecordBatch;
-  header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes),
-                fbb_.CreateVectorOfStructs(buffers))
-                .Union();
-  body_length_ = body_length;
+  Status SetRecordBatch(int32_t length, int64_t body_length,
+      const std::vector<flatbuf::FieldNode>& nodes,
+      const std::vector<flatbuf::Buffer>& buffers) {
+    header_type_ = flatbuf::MessageHeader_RecordBatch;
+    header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes),
+                  fbb_.CreateVectorOfStructs(buffers))
+                  .Union();
+    body_length_ = body_length;
 
-  return Status::OK();
+    return Status::OK();
+  }
+
+  Status SetDictionary(int64_t id, int32_t length, int64_t body_length,
+      const std::vector<flatbuf::FieldNode>& nodes,
+      const std::vector<flatbuf::Buffer>& buffers) {
+    header_type_ = flatbuf::MessageHeader_DictionaryBatch;
+
+    auto record_batch = flatbuf::CreateRecordBatch(fbb_, length,
+        fbb_.CreateVectorOfStructs(nodes), fbb_.CreateVectorOfStructs(buffers));
+
+    header_ = flatbuf::CreateDictionaryBatch(fbb_, id, record_batch).Union();
+    body_length_ = body_length;
+    return Status::OK();
+  }
+
+  Status Finish();
+
+  Status GetBuffer(std::shared_ptr<Buffer>* out);
+
+ private:
+  flatbuf::MessageHeader header_type_;
+  flatbuffers::Offset<void> header_;
+  int64_t body_length_;
+  flatbuffers::FlatBufferBuilder fbb_;
+};
+
+Status WriteSchemaMessage(
+    const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) {
+  MessageBuilder message;
+  RETURN_NOT_OK(message.SetSchema(schema, dictionary_memo));
+  RETURN_NOT_OK(message.Finish());
+  return message.GetBuffer(out);
 }
 
-Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
+Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
     const std::vector<flatbuf::FieldNode>& nodes,
     const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
   MessageBuilder builder;
@@ -326,6 +464,15 @@ Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
   return builder.GetBuffer(out);
 }
 
+Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
+    const std::vector<flatbuf::FieldNode>& nodes,
+    const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
+  MessageBuilder builder;
+  RETURN_NOT_OK(builder.SetDictionary(id, length, body_length, nodes, buffers));
+  RETURN_NOT_OK(builder.Finish());
+  return builder.GetBuffer(out);
+}
+
 Status MessageBuilder::Finish() {
   auto message =
       flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_);

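For orientation, here is a rough sketch of how the write-side pieces above are meant to be combined. This is not code from the commit: the helper name is hypothetical, the stream/file framing (length prefixes, alignment, record batch body layout) is omitted, and the Arrow status and IPC headers are assumed to be included; names are as declared in the arrow::ipc namespace above.

// Sketch only: serialize the schema, letting DictionaryMemo assign an id to
// every distinct dictionary array referenced by dictionary-encoded fields.
Status WriteSchemaWithDictionaries(const Schema& schema,
    std::shared_ptr<Buffer>* schema_message, DictionaryMemo* memo) {
  // WriteSchemaMessage walks the fields via FieldToFlatbuffer; each
  // DictionaryType field gets a DictionaryEncoding entry keyed by a memo id.
  RETURN_NOT_OK(WriteSchemaMessage(schema, memo, schema_message));

  // memo->id_to_dictionary() now maps id -> dictionary Array. A stream or
  // file writer would lay each dictionary out as a record batch and call
  // WriteDictionaryMessage(id, length, body_length, nodes, buffers, &out)
  // for its metadata before writing any record batch that references that id.
  return Status::OK();
}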
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index d94a8ab..59afecb 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -46,31 +46,34 @@ using Offset = flatbuffers::Offset<void>;
 
 static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
 
-Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
+// Construct a field with type for a dictionary-encoded field. None of its
+// children or children's descendants can be dictionary encoded
+Status FieldFromFlatbufferDictionary(
+    const flatbuf::Field* field, std::shared_ptr<Field>* out);
 
-Status SchemaToFlatbuffer(
-    FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out);
+// Construct a field for a non-dictionary-encoded field. Its children may be
+// dictionary encoded
+Status FieldFromFlatbuffer(const flatbuf::Field* field,
+    const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out);
 
-class MessageBuilder {
- public:
-  Status SetSchema(const Schema& schema);
+Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo,
+    flatbuffers::Offset<flatbuf::Schema>* out);
 
-  Status SetRecordBatch(int32_t length, int64_t body_length,
-      const std::vector<flatbuf::FieldNode>& nodes,
-      const std::vector<flatbuf::Buffer>& buffers);
-
-  Status Finish();
-
-  Status GetBuffer(std::shared_ptr<Buffer>* out);
-
- private:
-  flatbuf::MessageHeader header_type_;
-  flatbuffers::Offset<void> header_;
-  int64_t body_length_;
-  flatbuffers::FlatBufferBuilder fbb_;
-};
+// Serialize arrow::Schema as a Flatbuffer
+//
+// \param[in] schema a Schema instance
+// \param[inout] dictionary_memo class for tracking dictionaries and assigning
+// dictionary ids
+// \param[out] out the serialized arrow::Buffer
+// \return Status outcome
+Status WriteSchemaMessage(
+    const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
+
+Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
+    const std::vector<flatbuf::FieldNode>& nodes,
+    const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
 
-Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
+Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
     const std::vector<flatbuf::FieldNode>& nodes,
     const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index a97965c..2ba44ac 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <sstream>
 #include <vector>
 
 #include "flatbuffers/flatbuffers.h"
@@ -38,11 +39,60 @@ namespace flatbuf = org::apache::arrow::flatbuf;
 
 namespace ipc {
 
-Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out) {
-  MessageBuilder message;
-  RETURN_NOT_OK(message.SetSchema(schema));
-  RETURN_NOT_OK(message.Finish());
-  return message.GetBuffer(out);
+// ----------------------------------------------------------------------
+// Memoization data structure for handling shared dictionaries
+
+DictionaryMemo::DictionaryMemo() {}
+
+// Returns KeyError if dictionary not found
+Status DictionaryMemo::GetDictionary(
+    int64_t id, std::shared_ptr<Array>* dictionary) const {
+  auto it = id_to_dictionary_.find(id);
+  if (it == id_to_dictionary_.end()) {
+    std::stringstream ss;
+    ss << "Dictionary with id " << id << " not found";
+    return Status::KeyError(ss.str());
+  }
+  *dictionary = it->second;
+  return Status::OK();
+}
+
+int64_t DictionaryMemo::GetId(const std::shared_ptr<Array>& dictionary) {
+  intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
+  auto it = dictionary_to_id_.find(address);
+  if (it != dictionary_to_id_.end()) {
+    // Dictionary already observed, return the id
+    return it->second;
+  } else {
+    int64_t new_id = static_cast<int64_t>(dictionary_to_id_.size()) + 1;
+    dictionary_to_id_[address] = new_id;
+    id_to_dictionary_[new_id] = dictionary;
+    return new_id;
+  }
+}
+
+bool DictionaryMemo::HasDictionary(const std::shared_ptr<Array>& dictionary) const {
+  intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
+  auto it = dictionary_to_id_.find(address);
+  return it != dictionary_to_id_.end();
+}
+
+bool DictionaryMemo::HasDictionaryId(int64_t id) const {
+  auto it = id_to_dictionary_.find(id);
+  return it != id_to_dictionary_.end();
+}
+
+Status DictionaryMemo::AddDictionary(
+    int64_t id, const std::shared_ptr<Array>& dictionary) {
+  if (HasDictionaryId(id)) {
+    std::stringstream ss;
+    ss << "Dictionary with id " << id << " already exists";
+    return Status::KeyError(ss.str());
+  }
+  intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
+  id_to_dictionary_[id] = dictionary;
+  dictionary_to_id_[address] = id;
+  return Status::OK();
 }
 
 //----------------------------------------------------------------------
@@ -113,10 +163,35 @@ class SchemaMetadata::SchemaMetadataImpl {
   explicit SchemaMetadataImpl(const void* schema)
       : schema_(static_cast<const flatbuf::Schema*>(schema)) {}
 
-  const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); }
+  const flatbuf::Field* get_field(int i) const { return schema_->fields()->Get(i); }
 
   int num_fields() const { return schema_->fields()->size(); }
 
+  Status VisitField(const flatbuf::Field* field, DictionaryTypeMap* id_to_field) const {
+    const flatbuf::DictionaryEncoding* dict_metadata = field->dictionary();
+    if (dict_metadata == nullptr) {
+      // Field is not dictionary encoded. Visit children
+      auto children = field->children();
+      for (flatbuffers::uoffset_t i = 0; i < children->size(); ++i) {
+        RETURN_NOT_OK(VisitField(children->Get(i), id_to_field));
+      }
+    } else {
+      // Field is dictionary encoded. Construct the data type for the
+      // dictionary (no descendants can be dictionary encoded)
+      std::shared_ptr<Field> dictionary_field;
+      RETURN_NOT_OK(FieldFromFlatbufferDictionary(field, &dictionary_field));
+      (*id_to_field)[dict_metadata->id()] = dictionary_field;
+    }
+    return Status::OK();
+  }
+
+  Status GetDictionaryTypes(DictionaryTypeMap* id_to_field) const {
+    for (int i = 0; i < num_fields(); ++i) {
+      RETURN_NOT_OK(VisitField(get_field(i), id_to_field));
+    }
+    return Status::OK();
+  }
+
  private:
   const flatbuf::Schema* schema_;
 };
@@ -138,15 +213,16 @@ int SchemaMetadata::num_fields() const {
   return impl_->num_fields();
 }
 
-Status SchemaMetadata::GetField(int i, std::shared_ptr<Field>* out) const {
-  const flatbuf::Field* field = impl_->field(i);
-  return FieldFromFlatbuffer(field, out);
+Status SchemaMetadata::GetDictionaryTypes(DictionaryTypeMap* id_to_field) const {
+  return impl_->GetDictionaryTypes(id_to_field);
 }
 
-Status SchemaMetadata::GetSchema(std::shared_ptr<Schema>* out) const {
+Status SchemaMetadata::GetSchema(
+    const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const {
   std::vector<std::shared_ptr<Field>> fields(num_fields());
   for (int i = 0; i < this->num_fields(); ++i) {
-    RETURN_NOT_OK(GetField(i, &fields[i]));
+    const flatbuf::Field* field = impl_->get_field(i);
+    RETURN_NOT_OK(FieldFromFlatbuffer(field, dictionary_memo, &fields[i]));
   }
   *out = std::make_shared<Schema>(fields);
   return Status::OK();
@@ -173,28 +249,34 @@ class RecordBatchMetadata::RecordBatchMetadataImpl {
 
   int num_fields() const { return batch_->nodes()->size(); }
 
+  void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
+
+  void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; }
+
  private:
   const flatbuf::RecordBatch* batch_;
   const flatbuffers::Vector<const flatbuf::FieldNode*>* nodes_;
   const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_;
+
+  // Possible parents, owns the flatbuffer data
+  std::shared_ptr<Message> message_;
+  std::shared_ptr<Buffer> buffer_;
 };
 
 RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) {
-  message_ = message;
   impl_.reset(new RecordBatchMetadataImpl(message->impl_->header()));
+  impl_->set_message(message);
 }
 
-RecordBatchMetadata::RecordBatchMetadata(
-    const std::shared_ptr<Buffer>& buffer, int64_t offset) {
-  message_ = nullptr;
-  buffer_ = buffer;
-
-  const flatbuf::RecordBatch* metadata =
-      flatbuffers::GetRoot<flatbuf::RecordBatch>(buffer->data() + offset);
-
-  // TODO(wesm): validate table
+RecordBatchMetadata::RecordBatchMetadata(const void* header) {
+  impl_.reset(new RecordBatchMetadataImpl(header));
+}
 
-  impl_.reset(new RecordBatchMetadataImpl(metadata));
+RecordBatchMetadata::RecordBatchMetadata(
+    const std::shared_ptr<Buffer>& buffer, int64_t offset)
+    : RecordBatchMetadata(buffer->data() + offset) {
+  // Preserve ownership
+  impl_->set_buffer(buffer);
 }
 
 RecordBatchMetadata::~RecordBatchMetadata() {}
@@ -232,5 +314,64 @@ int RecordBatchMetadata::num_fields() const {
   return impl_->num_fields();
 }
 
+// ----------------------------------------------------------------------
+// DictionaryBatchMetadata
+
+class DictionaryBatchMetadata::DictionaryBatchMetadataImpl {
+ public:
+  explicit DictionaryBatchMetadataImpl(const void* dictionary)
+      : metadata_(static_cast<const flatbuf::DictionaryBatch*>(dictionary)) {
+    record_batch_.reset(new RecordBatchMetadata(metadata_->data()));
+  }
+
+  int64_t id() const { return metadata_->id(); }
+  const RecordBatchMetadata& record_batch() const { return *record_batch_; }
+
+  void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
+
+ private:
+  const flatbuf::DictionaryBatch* metadata_;
+
+  std::unique_ptr<RecordBatchMetadata> record_batch_;
+
+  // Parent, owns the flatbuffer data
+  std::shared_ptr<Message> message_;
+};
+
+DictionaryBatchMetadata::DictionaryBatchMetadata(
+    const std::shared_ptr<Message>& message) {
+  impl_.reset(new DictionaryBatchMetadataImpl(message->impl_->header()));
+  impl_->set_message(message);
+}
+
+DictionaryBatchMetadata::~DictionaryBatchMetadata() {}
+
+int64_t DictionaryBatchMetadata::id() const {
+  return impl_->id();
+}
+
+const RecordBatchMetadata& DictionaryBatchMetadata::record_batch() const {
+  return impl_->record_batch();
+}
+
+// ----------------------------------------------------------------------
+// Conveniences
+
+Status ReadMessage(int64_t offset, int32_t metadata_length,
+    io::ReadableFileInterface* file, std::shared_ptr<Message>* message) {
+  std::shared_ptr<Buffer> buffer;
+  RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
+
+  int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
+
+  if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
+    std::stringstream ss;
+    ss << "flatbuffer size " << flatbuffer_size << " invalid. File offset: " << offset
+       << ", metadata length: " << metadata_length;
+    return Status::Invalid(ss.str());
+  }
+  return Message::Open(buffer, 4, message);
+}
+
 }  // namespace ipc
 }  // namespace arrow

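As a small illustration of the memoization semantics implemented above (a sketch, not part of the commit): GetId keys dictionaries by the Array's memory address, so asking for the id of the same shared_ptr twice yields the same id, while AddDictionary refuses to reuse an id that is already registered. The dictionary array passed in is assumed to have been built elsewhere.

// Sketch: exercises DictionaryMemo with an already-constructed dictionary array
Status ExerciseDictionaryMemo(const std::shared_ptr<Array>& dictionary) {
  DictionaryMemo memo;

  int64_t id = memo.GetId(dictionary);  // first sighting assigns a new id (1, 2, ...)
  if (memo.GetId(dictionary) != id) {
    return Status::Invalid("same pointer should map to the same id");
  }

  // The id is already present, so registering it again is a KeyError
  Status st = memo.AddDictionary(id, dictionary);
  if (!st.IsKeyError()) { return Status::Invalid("expected KeyError"); }

  // The same Array comes back out when looked up by id
  std::shared_ptr<Array> roundtripped;
  RETURN_NOT_OK(memo.GetDictionary(id, &roundtripped));
  return Status::OK();
}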
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 81e3dbd..0091067 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -22,13 +22,17 @@
 
 #include <cstdint>
 #include <memory>
+#include <unordered_map>
 #include <vector>
 
+#include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
+class Array;
 class Buffer;
+struct DataType;
 struct Field;
 class Schema;
 class Status;
@@ -36,6 +40,7 @@ class Status;
 namespace io {
 
 class OutputStream;
+class ReadableFileInterface;
 
 }  // namespace io
 
@@ -47,9 +52,38 @@ struct MetadataVersion {
 
 //----------------------------------------------------------------------
 
-// Serialize arrow::Schema as a Flatbuffer
-ARROW_EXPORT
-Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out);
+using DictionaryMap = std::unordered_map<int64_t, std::shared_ptr<Array>>;
+using DictionaryTypeMap = std::unordered_map<int64_t, std::shared_ptr<Field>>;
+
+// Memoization data structure for handling shared dictionaries
+class DictionaryMemo {
+ public:
+  DictionaryMemo();
+
+  // Returns KeyError if dictionary not found
+  Status GetDictionary(int64_t id, std::shared_ptr<Array>* dictionary) const;
+
+  int64_t GetId(const std::shared_ptr<Array>& dictionary);
+
+  bool HasDictionary(const std::shared_ptr<Array>& dictionary) const;
+  bool HasDictionaryId(int64_t id) const;
+
+  // Add a dictionary to the memo with a particular id. Returns KeyError if
+  // that dictionary already exists
+  Status AddDictionary(int64_t id, const std::shared_ptr<Array>& dictionary);
+
+  const DictionaryMap& id_to_dictionary() const { return id_to_dictionary_; }
+
+ private:
+  // Dictionary memory addresses, to track whether a dictionary has been seen
+  // before
+  std::unordered_map<intptr_t, int64_t> dictionary_to_id_;
+
+  // Map of dictionary id to dictionary array
+  DictionaryMap id_to_dictionary_;
+
+  DISALLOW_COPY_AND_ASSIGN(DictionaryMemo);
+};
 
 // Read interface classes. We do not fully deserialize the flatbuffers so that
 // individual fields metadata can be retrieved from very large schema without
@@ -69,12 +103,15 @@ class ARROW_EXPORT SchemaMetadata {
 
   int num_fields() const;
 
-  // Construct an arrow::Field for the i-th value in the metadata
-  Status GetField(int i, std::shared_ptr<Field>* out) const;
+  // Retrieve a list of all the dictionary ids and types required by the schema for
+  // reconstruction. The presumption is that these will be loaded either from
+  // the stream or file (or they may already be somewhere else in memory)
+  Status GetDictionaryTypes(DictionaryTypeMap* id_to_field) const;
 
   // Construct a complete Schema from the message. May be expensive for very
   // large schemas if you are only interested in a few fields
-  Status GetSchema(std::shared_ptr<Schema>* out) const;
+  Status GetSchema(
+      const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const;
 
  private:
   // Parent, owns the flatbuffer data
@@ -82,6 +119,8 @@ class ARROW_EXPORT SchemaMetadata {
 
   class SchemaMetadataImpl;
   std::unique_ptr<SchemaMetadataImpl> impl_;
+
+  DISALLOW_COPY_AND_ASSIGN(SchemaMetadata);
 };
 
 // Field metadata
@@ -99,8 +138,10 @@ struct ARROW_EXPORT BufferMetadata {
 // Container for serialized record batch metadata contained in an IPC message
 class ARROW_EXPORT RecordBatchMetadata {
  public:
+  // Instantiate from opaque pointer. Memory ownership must be preserved
+  // elsewhere (e.g. in a dictionary batch)
+  explicit RecordBatchMetadata(const void* header);
   explicit RecordBatchMetadata(const std::shared_ptr<Message>& message);
-
   RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
 
   ~RecordBatchMetadata();
@@ -113,18 +154,25 @@ class ARROW_EXPORT RecordBatchMetadata {
   int num_fields() const;
 
  private:
-  // Parent, owns the flatbuffer data
-  std::shared_ptr<Message> message_;
-  std::shared_ptr<Buffer> buffer_;
-
   class RecordBatchMetadataImpl;
   std::unique_ptr<RecordBatchMetadataImpl> impl_;
+
+  DISALLOW_COPY_AND_ASSIGN(RecordBatchMetadata);
 };
 
 class ARROW_EXPORT DictionaryBatchMetadata {
  public:
+  explicit DictionaryBatchMetadata(const std::shared_ptr<Message>& message);
+  ~DictionaryBatchMetadata();
+
   int64_t id() const;
-  std::unique_ptr<RecordBatchMetadata> data() const;
+  const RecordBatchMetadata& record_batch() const;
+
+ private:
+  class DictionaryBatchMetadataImpl;
+  std::unique_ptr<DictionaryBatchMetadataImpl> impl_;
+
+  DISALLOW_COPY_AND_ASSIGN(DictionaryBatchMetadata);
 };
 
 class ARROW_EXPORT Message {
@@ -141,24 +189,31 @@ class ARROW_EXPORT Message {
  private:
   Message(const std::shared_ptr<Buffer>& buffer, int64_t offset);
 
+  friend class DictionaryBatchMetadata;
   friend class RecordBatchMetadata;
   friend class SchemaMetadata;
 
   // Hide serialization details from user API
   class MessageImpl;
   std::unique_ptr<MessageImpl> impl_;
-};
 
-struct ARROW_EXPORT FileBlock {
-  FileBlock() {}
-  FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
-      : offset(offset), metadata_length(metadata_length), body_length(body_length) {}
-
-  int64_t offset;
-  int32_t metadata_length;
-  int64_t body_length;
+  DISALLOW_COPY_AND_ASSIGN(Message);
 };
 
+/// Read a length-prefixed message flatbuffer starting at the indicated file
+/// offset
+///
+/// The metadata_length includes at least the length prefix and the flatbuffer
+///
+/// \param[in] offset the position in the file where the message starts. The
+/// first 4 bytes after the offset are the message length
+/// \param[in] metadata_length the total number of bytes to read from file
+/// \param[in] file the seekable file interface to read from
+/// \param[out] message the message read
+/// \return Status success or failure
+Status ReadMessage(int64_t offset, int32_t metadata_length,
+    io::ReadableFileInterface* file, std::shared_ptr<Message>* message);
+
 }  // namespace ipc
 }  // namespace arrow
 
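Putting the read-side classes above together, schema reconstruction proceeds roughly as follows. This is a sketch rather than commit code: ReadDictionary stands in for whatever loads a DictionaryBatch body into an Array (handled by the stream and file readers), names are used as declared in arrow::ipc above, and the necessary headers are assumed to be included.

// Hypothetical helper (not in this commit): materialize the dictionary Array
// for one id, given the value field type reported by the schema metadata.
Status ReadDictionary(int64_t id, const std::shared_ptr<Field>& value_field,
    io::ReadableFileInterface* file, std::shared_ptr<Array>* out);

// Sketch: reconstruct a Schema whose fields may be dictionary encoded
Status ReconstructSchema(const SchemaMetadata& schema_meta,
    io::ReadableFileInterface* file, std::shared_ptr<Schema>* out) {
  // 1. Ask the schema metadata which dictionary ids (and value types) it needs
  DictionaryTypeMap dictionary_fields;
  RETURN_NOT_OK(schema_meta.GetDictionaryTypes(&dictionary_fields));

  // 2. Load each dictionary and register it in the memo under its id
  DictionaryMemo memo;
  for (const auto& entry : dictionary_fields) {
    std::shared_ptr<Array> dictionary;
    RETURN_NOT_OK(ReadDictionary(entry.first, entry.second, file, &dictionary));
    RETURN_NOT_OK(memo.AddDictionary(entry.first, dictionary));
  }

  // 3. With all dictionaries known, build the complete Schema
  return schema_meta.GetSchema(memo, out);
}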
