This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new fd0b90a  ARROW-3769: [C++] Add support for reading non-dictionary 
encoded binary Parquet columns directly as DictionaryArray
fd0b90a is described below

commit fd0b90a7f7e65fde32af04c4746004a1240914cf
Author: Hatem Helal <hhe...@mathworks.com>
AuthorDate: Sun Mar 17 19:13:41 2019 -0500

    ARROW-3769: [C++] Add support for reading non-dictionary encoded binary 
Parquet columns directly as DictionaryArray
    
    This patch addresses the following JIRAS:
    
    * [ARROW-3769](https://issues.apache.org/jira/browse/ARROW-3769): 
refactored record reader logic to toggle between the different builder 
depending on the column type (String or Binary) and the requested array type 
(Chunked "dense" or Dictionary).  These changes are covered by unittests and 
benchmarks.
    * [PARQUET-1537](https://issues.apache.org/jira/browse/PARQUET-1537): fixed 
increment and covered by unittests.
    
    Also included is an experimental class `ArrowReaderProperties` that can be 
used to select which columns are read directly as an `arrow::DictionaryArray`.  
I think some more work is needed to fully address the requests in 
[ARROW-3772](https://issues.apache.org/jira/browse/ARROW-3772).  Namely, the 
ability automatically infer which columns in a parquet file should be read as 
`DictionaryArray`.  My current thinking is that this would be solved by 
introducing optional arrow type metadata  [...]
    
    Note that the behavior with this patch is that incremental reading of a 
parquet file will not resolve the global dictionary for all of the row groups.  
There are a few possible solutions for this:
    
    * Introduce a concept of an "unknown" dictionary.  This will enable 
concatenating multiple row groups together so long as we define unknown 
dictionaries as equal (assuming indices have the same data type)
    * Add an API for merging the schemas from multiple tables together.  This 
could be used after reading multiple row groups to enable concatenating the 
tables together into one.
    * Add an API for inferring the global dictionary for the entire file.  This 
could be an expensive operation so ideally would be made optional.
    * Allow a user-specified dictionary.  This could be useful in the limited 
case where a caller already knows the global dictionary list (computed through 
some other mechanism).
    
    Author: Hatem Helal <hhe...@mathworks.com>
    Author: Hatem Helal <hatem.he...@gmail.com>
    Author: Hatem Helal <hatem.he...@mathworks.co.uk>
    
    Closes #3721 from hatemhelal/arrow-3769 and squashes the following commits:
    
    f644fff9c <Hatem Helal> Move schema fix logic to post-processing step
    023c022c3 <Hatem Helal> Add virtual destructor to WrappedBuilderInterface
    99e9dee12 <Hatem Helal> Removed dependencies on arrow builder in 
parquet/encoding
    2026b513c <Hatem Helal> Rework ByteArrayDecoder interface to reduce code 
duplication
    5bc933b97 <Hatem Helal> use PutSpaced in test setup to correctly initialize 
encoded data
    2c8fa7efd <Hatem Helal> revert incorrect changes to 
PlainByteArrayDecoder::DecodeArrow method
    7719b944f <Hatem Helal> Use random string generator instead of poor JSON
    e6ca0db43 <Hatem Helal> Fix DictEncoding test: need to use PutSpaced 
instead of Put in setup
    9da133142 <Hatem Helal> Temporarily disable tests for arrow builder 
decoding from dictionary encoded col
    7347cfa26 <Hatem Helal> Fix DecodeArrow from plain encoded columns
    5fb9e860a <Hatem Helal> Rework parquet encoding tests
    4d7bb30de <Hatem Helal> Refactor dictionary data generation into 
RandomArrayGenerator
    6e65fdbdf <Hatem Helal> simplify ArrowReaderProperties and mark as 
experimental
    babe52e38 <Hatem Helal> replace deprecated ReadableFileInterface with 
RandomAccessFile
    a267a27d4 <Hatem Helal> remove unnecessary inlines
    7aac84c45 <Hatem Helal> Reworked encoding benchmark to reduce code 
duplication
    077a8f1ae <Hatem Helal> Move function definition to (hopefully) resolve 
appveyor build failure due to C2491
    a35754456 <Hatem Helal> Basic unittests for reading DictionaryArray 
directly from parquet
    a6740f31e <Hatem Helal> Make sure to update the schema when reading a 
column as a DictionaryArray
    a8c15354e <Hatem Helal> Add support for requesting a parquet column be read 
as a DictionaryArray
    28d76b7b2 <Hatem Helal> Add benchmark for dictionary decoding using arrow 
builder
    8f59198e8 <Hatem Helal> Add overloads for decoding using a 
StringDictionaryBuilder
    b16eaa978 <Hatem Helal> prefer default_random_engine to avoid potential 
slowdown with Mersenne Twister prng
    ff380211c <Hatem Helal> prefer mersenne twister prng over default one which 
is implemenation defined
    78eddb8af <Hatem Helal> Use value parameterization in decoding tests
    84df23bfa <Hatem Helal> prefer range-based for loop to KeepRunning while 
loop pattern
    f234ca2a2 <Hatem Helal> respond to code review feedback - many readability 
fixes in benchmark and tests
    4fbcf1fab <Hatem Helal> fix loop increment in templated 
PlainByteArrayDecoder::DecodeArrow method
    39a5f1994 <Hatem Helal> fix appveyor windows failure
    89de5d5be <Hatem Helal> rework data generation so that decoding benchmark 
runs using a more realistic dataset
    ef55081a0 <Hatem Helal> added benchmarks for decoding plain encoded data 
using arrow builders
    31667ffbb <Hatem Helal> added tests for DictByteArrayDecoder and reworked 
previous tests
    4a26f7405 <Hatem Helal> remove todo message
    fa504158f <Hatem Helal> Implement DecodeArrowNonNull and unit tests
    21fc45083 <Hatem Helal> Add some basic unittests that exercise the 
DecodeArrow methods
---
 cpp/src/arrow/testing/random.cc                   |  63 ++++-
 cpp/src/arrow/testing/random.h                    |  29 +++
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc |  75 ++++++
 cpp/src/parquet/arrow/reader.cc                   | 101 +++++++--
 cpp/src/parquet/arrow/reader.h                    |  48 +++-
 cpp/src/parquet/arrow/record_reader.cc            | 260 ++++++++++++---------
 cpp/src/parquet/arrow/record_reader.h             |   7 +-
 cpp/src/parquet/encoding-benchmark.cc             | 224 +++++++++++++++++-
 cpp/src/parquet/encoding-test.cc                  | 265 ++++++++++++++++++++++
 cpp/src/parquet/encoding.cc                       | 102 ++-------
 cpp/src/parquet/encoding.h                        |  78 +++++--
 11 files changed, 1004 insertions(+), 248 deletions(-)

diff --git a/cpp/src/arrow/testing/random.cc b/cpp/src/arrow/testing/random.cc
index b050c5d..f693a45 100644
--- a/cpp/src/arrow/testing/random.cc
+++ b/cpp/src/arrow/testing/random.cc
@@ -79,7 +79,7 @@ std::shared_ptr<Array> RandomArrayGenerator::Boolean(int64_t 
size, double probab
   // only calls the GenerateBitmap method.
   using GenOpt = GenerateOptions<int, std::uniform_int_distribution<int>>;
 
-  std::vector<std::shared_ptr<Buffer>> buffers{2};
+  BufferVector buffers{2};
   // Need 2 distinct generators such that probabilities are not shared.
   GenOpt value_gen(seed(), 0, 1, probability);
   GenOpt null_gen(seed(), 0, 1, null_probability);
@@ -100,7 +100,7 @@ static std::shared_ptr<NumericArray<ArrowType>> 
GenerateNumericArray(int64_t siz
                                                                      
OptionType options) {
   using CType = typename ArrowType::c_type;
   auto type = TypeTraits<ArrowType>::type_singleton();
-  std::vector<std::shared_ptr<Buffer>> buffers{2};
+  BufferVector buffers{2};
 
   int64_t null_count = 0;
   ABORT_NOT_OK(AllocateEmptyBitmap(size, &buffers[0]));
@@ -145,5 +145,64 @@ PRIMITIVE_RAND_FLOAT_IMPL(Float64, double, DoubleType)
 #undef PRIMITIVE_RAND_FLOAT_IMPL
 #undef PRIMITIVE_RAND_IMPL
 
+std::shared_ptr<arrow::Array> RandomArrayGenerator::String(int64_t size,
+                                                           int32_t min_length,
+                                                           int32_t max_length,
+                                                           double 
null_probability) {
+  if (null_probability < 0 || null_probability > 1) {
+    ABORT_NOT_OK(Status::Invalid("null_probability must be between 0 and 1"));
+  }
+
+  auto int32_lengths = Int32(size, min_length, max_length, null_probability);
+  auto lengths = std::dynamic_pointer_cast<Int32Array>(int32_lengths);
+
+  // Visual Studio does not implement uniform_int_distribution for char types.
+  using GenOpt = GenerateOptions<uint8_t, 
std::uniform_int_distribution<uint16_t>>;
+  GenOpt options(seed(), static_cast<uint8_t>('A'), static_cast<uint8_t>('z'),
+                 /*null_probability=*/0);
+
+  std::vector<uint8_t> str_buffer(max_length);
+  StringBuilder builder;
+
+  for (int64_t i = 0; i < size; ++i) {
+    if (lengths->IsValid(i)) {
+      options.GenerateData(str_buffer.data(), lengths->Value(i));
+      ABORT_NOT_OK(builder.Append(str_buffer.data(), lengths->Value(i)));
+    } else {
+      ABORT_NOT_OK(builder.AppendNull());
+    }
+  }
+
+  std::shared_ptr<arrow::Array> result;
+  ABORT_NOT_OK(builder.Finish(&result));
+  return result;
+}
+
+std::shared_ptr<arrow::Array> RandomArrayGenerator::StringWithRepeats(
+    int64_t size, int64_t unique, int32_t min_length, int32_t max_length,
+    double null_probability) {
+  // Generate a random string dictionary without any nulls
+  auto array = String(unique, min_length, max_length, /*null_probability=*/0);
+  auto dictionary = std::dynamic_pointer_cast<StringArray>(array);
+
+  // Generate random indices to sample the dictionary with
+  auto id_array = Int64(size, 0, unique - 1, null_probability);
+  auto indices = std::dynamic_pointer_cast<Int64Array>(id_array);
+  StringBuilder builder;
+
+  for (int64_t i = 0; i < size; ++i) {
+    if (indices->IsValid(i)) {
+      const auto index = indices->Value(i);
+      const auto value = dictionary->GetView(index);
+      ABORT_NOT_OK(builder.Append(value));
+    } else {
+      ABORT_NOT_OK(builder.AppendNull());
+    }
+  }
+
+  std::shared_ptr<Array> result;
+  ABORT_NOT_OK(builder.Finish(&result));
+  return result;
+}
 }  // namespace random
 }  // namespace arrow
diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h
index 1ed8d03..f69b705 100644
--- a/cpp/src/arrow/testing/random.h
+++ b/cpp/src/arrow/testing/random.h
@@ -198,6 +198,35 @@ class ARROW_EXPORT RandomArrayGenerator {
     }
   }
 
+  /// \brief Generates a random StringArray
+  ///
+  /// \param[in] size the size of the array to generate
+  /// \param[in] min_length the lower bound of the string length
+  ///            determined by the uniform distribution
+  /// \param[in] max_length the upper bound of the string length
+  ///            determined by the uniform distribution
+  /// \param[in] null_probability the probability of a row being null
+  ///
+  /// \return a generated Array
+  std::shared_ptr<arrow::Array> String(int64_t size, int32_t min_length,
+                                       int32_t max_length, double 
null_probability);
+
+  /// \brief Generates a random StringArray with repeated values
+  ///
+  /// \param[in] size the size of the array to generate
+  /// \param[in] unique the number of unique string values used
+  ///            to populate the array
+  /// \param[in] min_length the lower bound of the string length
+  ///            determined by the uniform distribution
+  /// \param[in] max_length the upper bound of the string length
+  ///            determined by the uniform distribution
+  /// \param[in] null_probability the probability of a row being null
+  ///
+  /// \return a generated Array
+  std::shared_ptr<arrow::Array> StringWithRepeats(int64_t size, int64_t unique,
+                                                  int32_t min_length, int32_t 
max_length,
+                                                  double null_probability);
+
  private:
   SeedType seed() { return seed_distribution_(seed_rng_); }
 
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc 
b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 3995d2c..b54c3f4 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -30,6 +30,7 @@
 #include <vector>
 
 #include "arrow/api.h"
+#include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/decimal.h"
@@ -2439,6 +2440,80 @@ TEST(TestArrowWriterAdHoc, SchemaMismatch) {
   ASSERT_RAISES(Invalid, writer->WriteTable(*tbl, 1));
 }
 
+// ----------------------------------------------------------------------
+// Tests for directly reading DictionaryArray
+class TestArrowReadDictionary : public ::testing::TestWithParam<double> {
+ public:
+  void SetUp() override {
+    GenerateData(GetParam());
+    ASSERT_NO_FATAL_FAILURE(
+        WriteTableToBuffer(expected_dense_, expected_dense_->num_rows() / 2,
+                           default_arrow_writer_properties(), &buffer_));
+
+    properties_ = default_arrow_reader_properties();
+  }
+
+  void GenerateData(double null_probability) {
+    constexpr int num_unique = 100;
+    constexpr int repeat = 10;
+    constexpr int64_t min_length = 2;
+    constexpr int64_t max_length = 10;
+    ::arrow::random::RandomArrayGenerator rag(0);
+    auto dense_array = rag.StringWithRepeats(repeat * num_unique, num_unique, 
min_length,
+                                             max_length, null_probability);
+    expected_dense_ = MakeSimpleTable(dense_array, /*nullable=*/true);
+
+    ::arrow::StringDictionaryBuilder builder(default_memory_pool());
+    const auto& string_array = static_cast<const 
::arrow::StringArray&>(*dense_array);
+    ASSERT_OK(builder.AppendArray(string_array));
+
+    std::shared_ptr<::arrow::Array> dict_array;
+    ASSERT_OK(builder.Finish(&dict_array));
+    expected_dict_ = MakeSimpleTable(dict_array, /*nullable=*/true);
+
+    // TODO(hatemhelal): Figure out if we can use the following to init the 
expected_dict_
+    // Currently fails due to DataType mismatch for indices array.
+    //    Datum out;
+    //    FunctionContext ctx(default_memory_pool());
+    //    ASSERT_OK(DictionaryEncode(&ctx, Datum(dense_array), &out));
+    //    expected_dict_ = MakeSimpleTable(out.make_array(), 
/*nullable=*/true);
+  }
+
+  void TearDown() override {}
+
+  void CheckReadWholeFile(const Table& expected) {
+    std::unique_ptr<FileReader> reader;
+    ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer_),
+                                ::arrow::default_memory_pool(), properties_, 
&reader));
+
+    std::shared_ptr<Table> actual;
+    ASSERT_OK_NO_THROW(reader->ReadTable(&actual));
+    ::arrow::AssertTablesEqual(*actual, expected, /*same_chunk_layout=*/false);
+  }
+
+  static std::vector<double> null_probabilites() { return {0.0, 0.5, 1}; }
+
+ protected:
+  std::shared_ptr<Table> expected_dense_;
+  std::shared_ptr<Table> expected_dict_;
+  std::shared_ptr<Buffer> buffer_;
+  ArrowReaderProperties properties_;
+};
+
+TEST_P(TestArrowReadDictionary, ReadWholeFileDict) {
+  properties_.set_read_dictionary(0, true);
+  CheckReadWholeFile(*expected_dict_);
+}
+
+TEST_P(TestArrowReadDictionary, ReadWholeFileDense) {
+  properties_.set_read_dictionary(0, false);
+  CheckReadWholeFile(*expected_dense_);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    ReadDictionary, TestArrowReadDictionary,
+    ::testing::ValuesIn(TestArrowReadDictionary::null_probabilites()));
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 61f5bb2..f891682 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -101,6 +101,11 @@ Status GetSingleChunk(const ChunkedArray& chunked, 
std::shared_ptr<Array>* out)
 
 }  // namespace
 
+ArrowReaderProperties default_arrow_reader_properties() {
+  static ArrowReaderProperties default_reader_props;
+  return default_reader_props;
+}
+
 // ----------------------------------------------------------------------
 // Iteration utilities
 
@@ -236,8 +241,9 @@ using FileColumnIteratorFactory =
 
 class FileReader::Impl {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
-      : pool_(pool), reader_(std::move(reader)), use_threads_(false) {}
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
+       const ArrowReaderProperties& properties)
+      : pool_(pool), reader_(std::move(reader)), 
reader_properties_(properties) {}
 
   virtual ~Impl() {}
 
@@ -279,14 +285,21 @@ class FileReader::Impl {
 
   int num_columns() const { return reader_->metadata()->num_columns(); }
 
-  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
+  void set_use_threads(bool use_threads) {
+    reader_properties_.set_use_threads(use_threads);
+  }
 
   ParquetFileReader* reader() { return reader_.get(); }
 
+  std::vector<int> GetDictionaryIndices(const std::vector<int>& indices);
+  std::shared_ptr<::arrow::Schema> FixSchema(
+      const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
+      std::vector<std::shared_ptr<::arrow::Column>>& columns);
+
  private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
-  bool use_threads_;
+  ArrowReaderProperties reader_properties_;
 };
 
 class ColumnReader::ColumnReaderImpl {
@@ -302,9 +315,10 @@ class ColumnReader::ColumnReaderImpl {
 // Reader implementation for primitive arrays
 class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
  public:
-  PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
+  PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input,
+                const bool read_dictionary)
       : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
-    record_reader_ = RecordReader::Make(descr_, pool_);
+    record_reader_ = RecordReader::Make(descr_, pool_, read_dictionary);
     Status s = NodeToField(*input_->descr()->schema_node(), &field_);
     DCHECK_OK(s);
     NextRowGroup();
@@ -358,17 +372,19 @@ class PARQUET_NO_EXPORT StructImpl : public 
ColumnReader::ColumnReaderImpl {
                  const std::vector<std::shared_ptr<ColumnReaderImpl>>& 
children);
 };
 
-FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> 
reader)
-    : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> 
reader,
+                       const ArrowReaderProperties& properties)
+    : impl_(new FileReader::Impl(pool, std::move(reader), properties)) {}
 
 FileReader::~FileReader() {}
 
 Status FileReader::Impl::GetColumn(int i, FileColumnIteratorFactory 
iterator_factory,
                                    std::unique_ptr<ColumnReader>* out) {
   std::unique_ptr<FileColumnIterator> input(iterator_factory(i, 
reader_.get()));
+  bool read_dict = reader_properties_.read_dictionary(i);
 
   std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
-      new PrimitiveImpl(pool_, std::move(input)));
+      new PrimitiveImpl(pool_, std::move(input), read_dict));
   *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
   return Status::OK();
 }
@@ -552,7 +568,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
     return Status::OK();
   };
 
-  if (use_threads_) {
+  if (reader_properties_.use_threads()) {
     std::vector<std::future<Status>> futures;
     auto pool = ::arrow::internal::GetCpuThreadPool();
     for (int i = 0; i < num_fields; i++) {
@@ -572,7 +588,15 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
     }
   }
 
-  *out = Table::Make(schema, columns);
+  auto dict_indices = GetDictionaryIndices(indices);
+
+  if (!dict_indices.empty()) {
+    schema = FixSchema(*schema, dict_indices, columns);
+  }
+
+  std::shared_ptr<Table> table = Table::Make(schema, columns);
+  RETURN_NOT_OK(table->Validate());
+  *out = table;
   return Status::OK();
 }
 
@@ -599,7 +623,7 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& 
indices,
     return Status::OK();
   };
 
-  if (use_threads_) {
+  if (reader_properties_.use_threads()) {
     std::vector<std::future<Status>> futures;
     auto pool = ::arrow::internal::GetCpuThreadPool();
     for (int i = 0; i < num_fields; i++) {
@@ -619,6 +643,12 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& 
indices,
     }
   }
 
+  auto dict_indices = GetDictionaryIndices(indices);
+
+  if (!dict_indices.empty()) {
+    schema = FixSchema(*schema, dict_indices, columns);
+  }
+
   std::shared_ptr<Table> table = Table::Make(schema, columns);
   RETURN_NOT_OK(table->Validate());
   *out = table;
@@ -664,6 +694,32 @@ Status FileReader::Impl::ReadRowGroup(int i, 
std::shared_ptr<Table>* table) {
   return ReadRowGroup(i, indices, table);
 }
 
+std::vector<int> FileReader::Impl::GetDictionaryIndices(const 
std::vector<int>& indices) {
+  // Select the column indices that were read as DictionaryArray
+  std::vector<int> dict_indices(indices);
+  auto remove_func = [this](int i) { return 
!reader_properties_.read_dictionary(i); };
+  auto it = std::remove_if(dict_indices.begin(), dict_indices.end(), 
remove_func);
+  dict_indices.erase(it, dict_indices.end());
+  return dict_indices;
+}
+
+std::shared_ptr<::arrow::Schema> FileReader::Impl::FixSchema(
+    const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
+    std::vector<std::shared_ptr<::arrow::Column>>& columns) {
+  // Fix the schema with the actual DictionaryType that was read
+  auto fields = old_schema.fields();
+
+  for (int idx : dict_indices) {
+    auto name = columns[idx]->name();
+    auto dict_array = columns[idx]->data();
+    auto dict_field = std::make_shared<::arrow::Field>(name, 
dict_array->type());
+    fields[idx] = dict_field;
+    columns[idx] = std::make_shared<Column>(dict_field, dict_array);
+  }
+
+  return std::make_shared<::arrow::Schema>(fields, old_schema.metadata());
+}
+
 // Static ctor
 Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
                 MemoryPool* allocator, const ReaderProperties& props,
@@ -683,6 +739,18 @@ Status OpenFile(const 
std::shared_ptr<::arrow::io::RandomAccessFile>& file,
                   reader);
 }
 
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+                ::arrow::MemoryPool* allocator, const ArrowReaderProperties& 
properties,
+                std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(file));
+  std::unique_ptr<ParquetReader> pq_reader;
+  PARQUET_CATCH_NOT_OK(
+      pq_reader = ParquetReader::Open(std::move(io_wrapper),
+                                      ::parquet::default_reader_properties(), 
nullptr));
+  reader->reset(new FileReader(allocator, std::move(pq_reader), properties));
+  return Status::OK();
+}
+
 Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
   FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* 
reader) {
     return new AllRowGroupsIterator(i, reader);
@@ -1138,15 +1206,6 @@ struct TransferFunctor<
   Status operator()(RecordReader* reader, MemoryPool* pool,
                     const std::shared_ptr<::arrow::DataType>& type, Datum* 
out) {
     std::vector<std::shared_ptr<Array>> chunks = reader->GetBuilderChunks();
-
-    if (type->id() == ::arrow::Type::STRING) {
-      // Convert from BINARY type to STRING
-      for (size_t i = 0; i < chunks.size(); ++i) {
-        auto new_data = chunks[i]->data()->Copy();
-        new_data->type = type;
-        chunks[i] = ::arrow::MakeArray(new_data);
-      }
-    }
     *out = std::make_shared<ChunkedArray>(chunks);
     return Status::OK();
   }
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 7ef21fd..52fcec8 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <unordered_set>
 #include <vector>
 
 #include "parquet/util/visibility.h"
@@ -51,6 +52,42 @@ class ColumnChunkReader;
 class ColumnReader;
 class RowGroupReader;
 
+static constexpr bool DEFAULT_USE_THREADS = false;
+
+/// EXPERIMENTAL: Properties for configuring FileReader behavior.
+class PARQUET_EXPORT ArrowReaderProperties {
+ public:
+  explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS)
+      : use_threads_(use_threads), read_dict_indices_() {}
+
+  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
+
+  bool use_threads() const { return use_threads_; }
+
+  void set_read_dictionary(int column_index, bool read_dict) {
+    if (read_dict) {
+      read_dict_indices_.insert(column_index);
+    } else {
+      read_dict_indices_.erase(column_index);
+    }
+  }
+  bool read_dictionary(int column_index) const {
+    if (read_dict_indices_.find(column_index) != read_dict_indices_.end()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+ private:
+  bool use_threads_;
+  std::unordered_set<int> read_dict_indices_;
+};
+
+/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
+PARQUET_EXPORT
+ArrowReaderProperties default_arrow_reader_properties();
+
 // Arrow read adapter class for deserializing Parquet files as Arrow row
 // batches.
 //
@@ -109,7 +146,8 @@ class RowGroupReader;
 // arrays
 class PARQUET_EXPORT FileReader {
  public:
-  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> 
reader);
+  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> 
reader,
+             const ArrowReaderProperties& properties = 
default_arrow_reader_properties());
 
   // Since the distribution of columns amongst a Parquet file's row groups may
   // be uneven (the number of values in each column chunk can be different), we
@@ -291,7 +329,7 @@ class PARQUET_EXPORT ColumnReader {
 };
 
 // Helper function to create a file reader from an implementation of an Arrow
-// readable file
+// random access file
 //
 // metadata : separately-computed file metadata, can be nullptr
 PARQUET_EXPORT
@@ -306,6 +344,12 @@ PARQUET_EXPORT
                          ::arrow::MemoryPool* allocator,
                          std::unique_ptr<FileReader>* reader);
 
+PARQUET_EXPORT
+::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& 
file,
+                         ::arrow::MemoryPool* allocator,
+                         const ArrowReaderProperties& properties,
+                         std::unique_ptr<FileReader>* reader);
+
 }  // namespace arrow
 }  // namespace parquet
 
diff --git a/cpp/src/parquet/arrow/record_reader.cc 
b/cpp/src/parquet/arrow/record_reader.cc
index c800f36..42334b9 100644
--- a/cpp/src/parquet/arrow/record_reader.cc
+++ b/cpp/src/parquet/arrow/record_reader.cc
@@ -449,35 +449,16 @@ class RecordReader::RecordReaderImpl {
 };
 
 template <typename DType>
-struct RecordReaderTraits {
-  using BuilderType = ::arrow::ArrayBuilder;
-};
-
-template <>
-struct RecordReaderTraits<ByteArrayType> {
-  using BuilderType = ::arrow::internal::ChunkedBinaryBuilder;
-};
-
-template <>
-struct RecordReaderTraits<FLBAType> {
-  using BuilderType = ::arrow::FixedSizeBinaryBuilder;
-};
-
-template <typename DType>
 class TypedRecordReader : public RecordReader::RecordReaderImpl {
  public:
   using T = typename DType::c_type;
 
-  using BuilderType = typename RecordReaderTraits<DType>::BuilderType;
-
   TypedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
-      : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) 
{
-    InitializeBuilder();
-  }
+      : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) 
{}
 
   void ResetDecoders() override { decoders_.clear(); }
 
-  inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
+  virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) 
{
     uint8_t* valid_bits = valid_bits_->mutable_data();
     const int64_t valid_bits_offset = values_written_;
 
@@ -487,7 +468,7 @@ class TypedRecordReader : public 
RecordReader::RecordReaderImpl {
     DCHECK_EQ(num_decoded, values_with_nulls);
   }
 
-  inline void ReadValuesDense(int64_t values_to_read) {
+  virtual void ReadValuesDense(int64_t values_to_read) {
     int64_t num_decoded =
         current_decoder_->Decode(ValuesHead<T>(), 
static_cast<int>(values_to_read));
     DCHECK_EQ(num_decoded, values_to_read);
@@ -568,18 +549,17 @@ class TypedRecordReader : public 
RecordReader::RecordReaderImpl {
     throw ParquetException("GetChunks only implemented for binary types");
   }
 
- private:
+ protected:
   using DecoderType = typename EncodingTraits<DType>::Decoder;
 
+  DecoderType* current_decoder_;
+
+ private:
   // Map of encoding type to the respective decoder object. For example, a
   // column chunk's data pages may include both dictionary-encoded and
   // plain-encoded data.
   std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;
 
-  std::unique_ptr<BuilderType> builder_;
-
-  DecoderType* current_decoder_;
-
   // Initialize repetition and definition level decoders on the next data page.
   int64_t InitializeLevelDecoders(const DataPage& page,
                                   Encoding::type repetition_level_encoding,
@@ -590,103 +570,143 @@ class TypedRecordReader : public 
RecordReader::RecordReaderImpl {
   // Advance to the next data page
   bool ReadNewPage() override;
 
-  void InitializeBuilder() {}
-
   void ConfigureDictionary(const DictionaryPage* page);
 };
 
-// TODO(wesm): Implement these to some satisfaction
-template <>
-void TypedRecordReader<Int96Type>::DebugPrintState() {}
+class FLBARecordReader : public TypedRecordReader<FLBAType> {
+ public:
+  FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) {
+    DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
+    int byte_width = descr_->type_length();
+    std::shared_ptr<::arrow::DataType> type = 
::arrow::fixed_size_binary(byte_width);
+    builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_));
+  }
 
-template <>
-void TypedRecordReader<ByteArrayType>::DebugPrintState() {}
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    std::shared_ptr<::arrow::Array> chunk;
+    PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
+    return ::arrow::ArrayVector({chunk});
+  }
 
-template <>
-void TypedRecordReader<FLBAType>::DebugPrintState() {}
+  void ReadValuesDense(int64_t values_to_read) override {
+    auto values = ValuesHead<FLBA>();
+    int64_t num_decoded =
+        current_decoder_->Decode(values, static_cast<int>(values_to_read));
+    DCHECK_EQ(num_decoded, values_to_read);
 
-template <>
-void TypedRecordReader<ByteArrayType>::InitializeBuilder() {
-  // Maximum of 16MB chunks
-  constexpr int32_t kBinaryChunksize = 1 << 24;
-  DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
-  builder_.reset(new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, 
pool_));
-}
+    for (int64_t i = 0; i < num_decoded; i++) {
+      PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
+    }
+    ResetValues();
+  }
 
-template <>
-void TypedRecordReader<FLBAType>::InitializeBuilder() {
-  DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
-  int byte_width = descr_->type_length();
-  std::shared_ptr<::arrow::DataType> type = 
::arrow::fixed_size_binary(byte_width);
-  builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_));
-}
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    uint8_t* valid_bits = valid_bits_->mutable_data();
+    const int64_t valid_bits_offset = values_written_;
+    auto values = ValuesHead<FLBA>();
 
-template <>
-::arrow::ArrayVector TypedRecordReader<ByteArrayType>::GetBuilderChunks() {
-  ::arrow::ArrayVector chunks;
-  PARQUET_THROW_NOT_OK(builder_->Finish(&chunks));
-  return chunks;
-}
+    int64_t num_decoded = current_decoder_->DecodeSpaced(
+        values, static_cast<int>(values_to_read), static_cast<int>(null_count),
+        valid_bits, valid_bits_offset);
+    DCHECK_EQ(num_decoded, values_to_read);
 
-template <>
-::arrow::ArrayVector TypedRecordReader<FLBAType>::GetBuilderChunks() {
-  std::shared_ptr<::arrow::Array> chunk;
-  PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
-  return ::arrow::ArrayVector({chunk});
-}
+    for (int64_t i = 0; i < num_decoded; i++) {
+      if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+        PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
+      } else {
+        PARQUET_THROW_NOT_OK(builder_->AppendNull());
+      }
+    }
+    ResetValues();
+  }
 
-template <>
-inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t 
values_to_read) {
-  int64_t num_decoded = current_decoder_->DecodeArrowNonNull(
-      static_cast<int>(values_to_read), builder_.get());
-  DCHECK_EQ(num_decoded, values_to_read);
-  ResetValues();
-}
+ private:
+  std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
+};
 
-template <>
-inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t 
values_to_read) {
-  auto values = ValuesHead<FLBA>();
-  int64_t num_decoded =
-      current_decoder_->Decode(values, static_cast<int>(values_to_read));
-  DCHECK_EQ(num_decoded, values_to_read);
+class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType> {
+ public:
+  ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, 
::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(nullptr) {
+    // Maximum of 16MB chunks
+    constexpr int32_t kBinaryChunksize = 1 << 24;
+    DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+    if (descr_->logical_type() == LogicalType::UTF8) {
+      builder_.reset(
+          new ::arrow::internal::ChunkedStringBuilder(kBinaryChunksize, 
pool_));
+    } else {
+      builder_.reset(
+          new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, 
pool_));
+    }
+  }
 
-  for (int64_t i = 0; i < num_decoded; i++) {
-    PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    ::arrow::ArrayVector chunks;
+    PARQUET_THROW_NOT_OK(builder_->Finish(&chunks));
+    return chunks;
   }
-  ResetValues();
-}
 
+  void ReadValuesDense(int64_t values_to_read) override {
+    int64_t num_decoded = current_decoder_->DecodeArrowNonNull(
+        static_cast<int>(values_to_read), builder_.get());
+    DCHECK_EQ(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    int64_t num_decoded = current_decoder_->DecodeArrow(
+        static_cast<int>(values_to_read), static_cast<int>(null_count),
+        valid_bits_->mutable_data(), values_written_, builder_.get());
+    DCHECK_EQ(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+ private:
+  std::unique_ptr<::arrow::internal::ChunkedBinaryBuilder> builder_;
+};
+
+template <typename BuilderType>
+class ByteArrayDictionaryRecordReader : public 
TypedRecordReader<ByteArrayType> {
+ public:
+  ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr,
+                                  ::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(new 
BuilderType(pool)) {}
+
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    std::shared_ptr<::arrow::Array> chunk;
+    PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
+    return ::arrow::ArrayVector({chunk});
+  }
+
+  void ReadValuesDense(int64_t values_to_read) override {
+    int64_t num_decoded = current_decoder_->DecodeArrowNonNull(
+        static_cast<int>(values_to_read), builder_.get());
+    DCHECK_EQ(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    int64_t num_decoded = current_decoder_->DecodeArrow(
+        static_cast<int>(values_to_read), static_cast<int>(null_count),
+        valid_bits_->mutable_data(), values_written_, builder_.get());
+    DCHECK_EQ(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+ private:
+  std::unique_ptr<BuilderType> builder_;
+};
+
+// TODO(wesm): Implement these to some satisfaction
 template <>
-inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t 
values_to_read,
-                                                               int64_t 
null_count) {
-  int64_t num_decoded = current_decoder_->DecodeArrow(
-      static_cast<int>(values_to_read), static_cast<int>(null_count),
-      valid_bits_->mutable_data(), values_written_, builder_.get());
-  DCHECK_EQ(num_decoded, values_to_read);
-  ResetValues();
-}
+void TypedRecordReader<Int96Type>::DebugPrintState() {}
 
 template <>
-inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t 
values_to_read,
-                                                          int64_t null_count) {
-  uint8_t* valid_bits = valid_bits_->mutable_data();
-  const int64_t valid_bits_offset = values_written_;
-  auto values = ValuesHead<FLBA>();
-
-  int64_t num_decoded = current_decoder_->DecodeSpaced(
-      values, static_cast<int>(values_to_read), static_cast<int>(null_count), 
valid_bits,
-      valid_bits_offset);
-  DCHECK_EQ(num_decoded, values_to_read);
-
-  for (int64_t i = 0; i < num_decoded; i++) {
-    if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
-      PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
-    } else {
-      PARQUET_THROW_NOT_OK(builder_->AppendNull());
-    }
-  }
-  ResetValues();
-}
+void TypedRecordReader<ByteArrayType>::DebugPrintState() {}
+
+template <>
+void TypedRecordReader<FLBAType>::DebugPrintState() {}
 
 template <typename DType>
 inline void TypedRecordReader<DType>::ConfigureDictionary(const 
DictionaryPage* page) {
@@ -845,8 +865,27 @@ bool TypedRecordReader<DType>::ReadNewPage() {
   return true;
 }
 
+std::shared_ptr<RecordReader> RecordReader::MakeByteArrayRecordReader(
+    const ColumnDescriptor* descr, arrow::MemoryPool* pool, bool 
read_dictionary) {
+  if (read_dictionary) {
+    if (descr->logical_type() == LogicalType::UTF8) {
+      using Builder = ::arrow::StringDictionaryBuilder;
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new ByteArrayDictionaryRecordReader<Builder>(descr, 
pool)));
+    } else {
+      using Builder = ::arrow::BinaryDictionaryBuilder;
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new ByteArrayDictionaryRecordReader<Builder>(descr, 
pool)));
+    }
+  } else {
+    return std::shared_ptr<RecordReader>(
+        new RecordReader(new ByteArrayChunkedRecordReader(descr, pool)));
+  }
+}
+
 std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
-                                                 MemoryPool* pool) {
+                                                 MemoryPool* pool,
+                                                 const bool read_dictionary) {
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
       return std::shared_ptr<RecordReader>(
@@ -867,11 +906,10 @@ std::shared_ptr<RecordReader> RecordReader::Make(const 
ColumnDescriptor* descr,
       return std::shared_ptr<RecordReader>(
           new RecordReader(new TypedRecordReader<DoubleType>(descr, pool)));
     case Type::BYTE_ARRAY:
-      return std::shared_ptr<RecordReader>(
-          new RecordReader(new TypedRecordReader<ByteArrayType>(descr, pool)));
+      return RecordReader::MakeByteArrayRecordReader(descr, pool, 
read_dictionary);
     case Type::FIXED_LEN_BYTE_ARRAY:
       return std::shared_ptr<RecordReader>(
-          new RecordReader(new TypedRecordReader<FLBAType>(descr, pool)));
+          new RecordReader(new FLBARecordReader(descr, pool)));
     default: {
       // PARQUET-1481: This can occur if the file is corrupt
       std::stringstream ss;
diff --git a/cpp/src/parquet/arrow/record_reader.h 
b/cpp/src/parquet/arrow/record_reader.h
index cc932c2..c999dd0 100644
--- a/cpp/src/parquet/arrow/record_reader.h
+++ b/cpp/src/parquet/arrow/record_reader.h
@@ -51,7 +51,8 @@ class RecordReader {
 
   static std::shared_ptr<RecordReader> Make(
       const ColumnDescriptor* descr,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
+      const bool read_dictionary = false);
 
   virtual ~RecordReader();
 
@@ -111,6 +112,10 @@ class RecordReader {
  private:
   std::unique_ptr<RecordReaderImpl> impl_;
   explicit RecordReader(RecordReaderImpl* impl);
+
+  static std::shared_ptr<RecordReader> MakeByteArrayRecordReader(
+      const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
+      const bool read_dictionary);
 };
 
 }  // namespace internal
diff --git a/cpp/src/parquet/encoding-benchmark.cc 
b/cpp/src/parquet/encoding-benchmark.cc
index 8031aeb..0e88190 100644
--- a/cpp/src/parquet/encoding-benchmark.cc
+++ b/cpp/src/parquet/encoding-benchmark.cc
@@ -17,13 +17,29 @@
 
 #include "benchmark/benchmark.h"
 
+#include "arrow/array.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_dict.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+
 #include "parquet/encoding.h"
 #include "parquet/schema.h"
 #include "parquet/util/memory.h"
 
+#include <random>
+
 using arrow::default_memory_pool;
 using arrow::MemoryPool;
 
+namespace {
+// The min/max number of values used to drive each family of encoding 
benchmarks
+constexpr int MIN_RANGE = 1024;
+constexpr int MAX_RANGE = 65536;
+}  // namespace
+
 namespace parquet {
 
 using schema::PrimitiveNode;
@@ -39,14 +55,14 @@ static void BM_PlainEncodingBoolean(benchmark::State& 
state) {
   auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::PLAIN);
   auto typed_encoder = dynamic_cast<BooleanEncoder*>(encoder.get());
 
-  while (state.KeepRunning()) {
+  for (auto _ : state) {
     typed_encoder->Put(values, static_cast<int>(values.size()));
     typed_encoder->FlushValues();
   }
   state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(bool));
 }
 
-BENCHMARK(BM_PlainEncodingBoolean)->Range(1024, 65536);
+BENCHMARK(BM_PlainEncodingBoolean)->Range(MIN_RANGE, MAX_RANGE);
 
 static void BM_PlainDecodingBoolean(benchmark::State& state) {
   std::vector<bool> values(state.range(0), true);
@@ -56,7 +72,7 @@ static void BM_PlainDecodingBoolean(benchmark::State& state) {
   typed_encoder->Put(values, static_cast<int>(values.size()));
   std::shared_ptr<Buffer> buf = encoder->FlushValues();
 
-  while (state.KeepRunning()) {
+  for (auto _ : state) {
     auto decoder = MakeTypedDecoder<BooleanType>(Encoding::PLAIN);
     decoder->SetData(static_cast<int>(values.size()), buf->data(),
                      static_cast<int>(buf->size()));
@@ -67,19 +83,19 @@ static void BM_PlainDecodingBoolean(benchmark::State& 
state) {
   delete[] output;
 }
 
-BENCHMARK(BM_PlainDecodingBoolean)->Range(1024, 65536);
+BENCHMARK(BM_PlainDecodingBoolean)->Range(MIN_RANGE, MAX_RANGE);
 
 static void BM_PlainEncodingInt64(benchmark::State& state) {
   std::vector<int64_t> values(state.range(0), 64);
   auto encoder = MakeTypedEncoder<Int64Type>(Encoding::PLAIN);
-  while (state.KeepRunning()) {
+  for (auto _ : state) {
     encoder->Put(values.data(), static_cast<int>(values.size()));
     encoder->FlushValues();
   }
   state.SetBytesProcessed(state.iterations() * state.range(0) * 
sizeof(int64_t));
 }
 
-BENCHMARK(BM_PlainEncodingInt64)->Range(1024, 65536);
+BENCHMARK(BM_PlainEncodingInt64)->Range(MIN_RANGE, MAX_RANGE);
 
 static void BM_PlainDecodingInt64(benchmark::State& state) {
   std::vector<int64_t> values(state.range(0), 64);
@@ -87,7 +103,7 @@ static void BM_PlainDecodingInt64(benchmark::State& state) {
   encoder->Put(values.data(), static_cast<int>(values.size()));
   std::shared_ptr<Buffer> buf = encoder->FlushValues();
 
-  while (state.KeepRunning()) {
+  for (auto _ : state) {
     auto decoder = MakeTypedDecoder<Int64Type>(Encoding::PLAIN);
     decoder->SetData(static_cast<int>(values.size()), buf->data(),
                      static_cast<int>(buf->size()));
@@ -96,7 +112,7 @@ static void BM_PlainDecodingInt64(benchmark::State& state) {
   state.SetBytesProcessed(state.iterations() * state.range(0) * 
sizeof(int64_t));
 }
 
-BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536);
+BENCHMARK(BM_PlainDecodingInt64)->Range(MIN_RANGE, MAX_RANGE);
 
 template <typename Type>
 static void DecodeDict(std::vector<typename Type::c_type>& values,
@@ -126,7 +142,7 @@ static void DecodeDict(std::vector<typename Type::c_type>& 
values,
 
   PARQUET_THROW_NOT_OK(indices->Resize(actual_bytes));
 
-  while (state.KeepRunning()) {
+  for (auto _ : state) {
     auto dict_decoder = MakeTypedDecoder<Type>(Encoding::PLAIN, descr.get());
     dict_decoder->SetData(dict_traits->num_entries(), dict_buffer->data(),
                           static_cast<int>(dict_buffer->size()));
@@ -148,7 +164,7 @@ static void BM_DictDecodingInt64_repeats(benchmark::State& 
state) {
   DecodeDict<Type>(values, state);
 }
 
-BENCHMARK(BM_DictDecodingInt64_repeats)->Range(1024, 65536);
+BENCHMARK(BM_DictDecodingInt64_repeats)->Range(MIN_RANGE, MAX_RANGE);
 
 static void BM_DictDecodingInt64_literals(benchmark::State& state) {
   typedef Int64Type Type;
@@ -161,6 +177,192 @@ static void 
BM_DictDecodingInt64_literals(benchmark::State& state) {
   DecodeDict<Type>(values, state);
 }
 
-BENCHMARK(BM_DictDecodingInt64_literals)->Range(1024, 65536);
+BENCHMARK(BM_DictDecodingInt64_literals)->Range(MIN_RANGE, MAX_RANGE);
+
+// ----------------------------------------------------------------------
+// Shared benchmarks for decoding using arrow builders
+class BenchmarkDecodeArrow : public ::benchmark::Fixture {
+ public:
+  void SetUp(const ::benchmark::State& state) override {
+    num_values_ = static_cast<int>(state.range());
+    InitDataInputs();
+    DoEncodeData();
+  }
+
+  void TearDown(const ::benchmark::State& state) override {}
+
+  void InitDataInputs() {
+    // Generate a random string dictionary without any nulls so that this 
dataset can be
+    // used for benchmarking the DecodeArrowNonNull API
+    constexpr int repeat_factor = 8;
+    constexpr int64_t min_length = 2;
+    constexpr int64_t max_length = 10;
+    ::arrow::random::RandomArrayGenerator rag(0);
+    input_array_ = rag.StringWithRepeats(num_values_, num_values_ / 
repeat_factor,
+                                         min_length, max_length, 
/*null_probability=*/0);
+    valid_bits_ = input_array_->null_bitmap()->data();
+    values_ = std::vector<ByteArray>();
+    values_.reserve(num_values_);
+    total_size_ = 0;
+    const auto& binary_array = static_cast<const 
::arrow::BinaryArray&>(*input_array_);
+
+    for (int64_t i = 0; i < binary_array.length(); i++) {
+      auto view = binary_array.GetView(i);
+      values_.emplace_back(static_cast<uint32_t>(view.length()),
+                           reinterpret_cast<const uint8_t*>(view.data()));
+      total_size_ += view.length();
+    }
+  }
+
+  virtual void DoEncodeData() = 0;
+
+  virtual std::unique_ptr<ByteArrayDecoder> InitializeDecoder() = 0;
+
+  template <typename BuilderType>
+  std::unique_ptr<BuilderType> CreateBuilder();
+
+  template <typename BuilderType>
+  void DecodeArrowBenchmark(benchmark::State& state) {
+    for (auto _ : state) {
+      auto decoder = InitializeDecoder();
+      auto builder = CreateBuilder<BuilderType>();
+      decoder->DecodeArrow(num_values_, 0, valid_bits_, 0, builder.get());
+    }
+
+    state.SetBytesProcessed(state.iterations() * total_size_);
+  }
+
+  template <typename BuilderType>
+  void DecodeArrowNonNullBenchmark(benchmark::State& state) {
+    for (auto _ : state) {
+      auto decoder = InitializeDecoder();
+      auto builder = CreateBuilder<BuilderType>();
+      decoder->DecodeArrowNonNull(num_values_, builder.get());
+    }
+
+    state.SetBytesProcessed(state.iterations() * total_size_);
+  }
+
+ protected:
+  int num_values_;
+  std::shared_ptr<::arrow::Array> input_array_;
+  uint64_t total_size_;
+  std::vector<ByteArray> values_;
+  const uint8_t* valid_bits_;
+  std::shared_ptr<Buffer> buffer_;
+};
+
+using ::arrow::BinaryDictionaryBuilder;
+using ::arrow::internal::ChunkedBinaryBuilder;
+
+template <>
+std::unique_ptr<ChunkedBinaryBuilder> BenchmarkDecodeArrow::CreateBuilder() {
+  int chunk_size = static_cast<int>(buffer_->size());
+  return std::unique_ptr<ChunkedBinaryBuilder>(
+      new ChunkedBinaryBuilder(chunk_size, default_memory_pool()));
+}
+
+template <>
+std::unique_ptr<BinaryDictionaryBuilder> BenchmarkDecodeArrow::CreateBuilder() 
{
+  return std::unique_ptr<BinaryDictionaryBuilder>(
+      new BinaryDictionaryBuilder(default_memory_pool()));
+}
+
+// ----------------------------------------------------------------------
+// Benchmark Decoding from Plain Encoding
+class BM_PlainDecodingByteArray : public BenchmarkDecodeArrow {
+ public:
+  void DoEncodeData() override {
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN);
+    encoder->Put(values_.data(), num_values_);
+    buffer_ = encoder->FlushValues();
+  }
+
+  std::unique_ptr<ByteArrayDecoder> InitializeDecoder() override {
+    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN);
+    decoder->SetData(num_values_, buffer_->data(), 
static_cast<int>(buffer_->size()));
+    return decoder;
+  }
+};
+
+BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrow_Dense)
+(benchmark::State& state) { DecodeArrowBenchmark<ChunkedBinaryBuilder>(state); 
}
+BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrow_Dense)
+    ->Range(MIN_RANGE, MAX_RANGE);
+
+BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dense)
+(benchmark::State& state) { 
DecodeArrowNonNullBenchmark<ChunkedBinaryBuilder>(state); }
+BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dense)
+    ->Range(MIN_RANGE, MAX_RANGE);
+
+BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrow_Dict)
+(benchmark::State& state) { 
DecodeArrowBenchmark<BinaryDictionaryBuilder>(state); }
+BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrow_Dict)
+    ->Range(MIN_RANGE, MAX_RANGE);
+
+BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dict)
+(benchmark::State& state) { 
DecodeArrowNonNullBenchmark<BinaryDictionaryBuilder>(state); }
+BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dict)
+    ->Range(MIN_RANGE, MAX_RANGE);
+
+// ----------------------------------------------------------------------
+// Benchmark Decoding from Dictionary Encoding
+class BM_DictDecodingByteArray : public BenchmarkDecodeArrow {
+ public:
+  void DoEncodeData() override {
+    auto node = schema::ByteArray("name");
+    descr_ = std::unique_ptr<ColumnDescriptor>(new ColumnDescriptor(node, 0, 
0));
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN,
+                                                   /*use_dictionary=*/true, 
descr_.get());
+    ASSERT_NO_THROW(encoder->Put(values_.data(), num_values_));
+    buffer_ = encoder->FlushValues();
+
+    auto dict_encoder = 
dynamic_cast<DictEncoder<ByteArrayType>*>(encoder.get());
+    ASSERT_NE(dict_encoder, nullptr);
+    dict_buffer_ =
+        AllocateBuffer(default_memory_pool(), 
dict_encoder->dict_encoded_size());
+    dict_encoder->WriteDict(dict_buffer_->mutable_data());
+    num_dict_entries_ = dict_encoder->num_entries();
+  }
+
+  std::unique_ptr<ByteArrayDecoder> InitializeDecoder() override {
+    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN, 
descr_.get());
+    decoder->SetData(num_dict_entries_, dict_buffer_->data(),
+                     static_cast<int>(dict_buffer_->size()));
+    auto dict_decoder = MakeDictDecoder<ByteArrayType>(descr_.get());
+    dict_decoder->SetDict(decoder.get());
+    dict_decoder->SetData(num_values_, buffer_->data(),
+                          static_cast<int>(buffer_->size()));
+    return std::unique_ptr<ByteArrayDecoder>(
+        dynamic_cast<ByteArrayDecoder*>(dict_decoder.release()));
+  }
+
+ protected:
+  std::unique_ptr<ColumnDescriptor> descr_;
+  std::unique_ptr<DictDecoder<ByteArrayType>> dict_decoder_;
+  std::shared_ptr<Buffer> dict_buffer_;
+  int num_dict_entries_;
+};
+
+BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, 
DecodeArrow_Dense)(benchmark::State& state) {
+  DecodeArrowBenchmark<ChunkedBinaryBuilder>(state);
+}
+BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrow_Dense)
+    ->Range(MIN_RANGE, MAX_RANGE);
+
+BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dense)
+(benchmark::State& state) { 
DecodeArrowNonNullBenchmark<ChunkedBinaryBuilder>(state); }
+BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dense)
+    ->Range(MIN_RANGE, MAX_RANGE);
+
+BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrow_Dict)
+(benchmark::State& state) { 
DecodeArrowBenchmark<BinaryDictionaryBuilder>(state); }
+BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrow_Dict)
+    ->Range(MIN_RANGE, MAX_RANGE);
+
+BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dict)
+(benchmark::State& state) { 
DecodeArrowNonNullBenchmark<BinaryDictionaryBuilder>(state); }
+BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dict)
+    ->Range(MIN_RANGE, MAX_RANGE);
 
 }  // namespace parquet
diff --git a/cpp/src/parquet/encoding-test.cc b/cpp/src/parquet/encoding-test.cc
index 28d9812..4ec537a 100644
--- a/cpp/src/parquet/encoding-test.cc
+++ b/cpp/src/parquet/encoding-test.cc
@@ -22,6 +22,12 @@
 #include <string>
 #include <vector>
 
+#include "arrow/array.h"
+#include "arrow/compute/api.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
 #include "arrow/util/bit-util.h"
 
 #include "parquet/encoding.h"
@@ -36,6 +42,13 @@ using arrow::MemoryPool;
 using std::string;
 using std::vector;
 
+// TODO(hatemhelal): investigate whether this can be replaced with GTEST_SKIP 
in a future
+// gtest release that contains https://github.com/google/googletest/pull/1544
+#define SKIP_TEST_IF(condition) \
+  if (condition) {              \
+    return;                     \
+  }
+
 namespace parquet {
 
 namespace test {
@@ -314,6 +327,258 @@ TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) {
   ASSERT_THROW(MakeDictDecoder<BooleanType>(nullptr), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// Shared arrow builder decode tests
+template <typename T>
+struct BuilderTraits {};
+
+template <>
+struct BuilderTraits<::arrow::BinaryType> {
+  using DenseArrayBuilder = ::arrow::internal::ChunkedBinaryBuilder;
+  using DictArrayBuilder = ::arrow::BinaryDictionaryBuilder;
+};
+
+template <>
+struct BuilderTraits<::arrow::StringType> {
+  using DenseArrayBuilder = ::arrow::internal::ChunkedStringBuilder;
+  using DictArrayBuilder = ::arrow::StringDictionaryBuilder;
+};
+
+template <typename DType>
+class TestArrowBuilderDecoding : public ::testing::Test {
+ public:
+  using DenseBuilder = typename BuilderTraits<DType>::DenseArrayBuilder;
+  using DictBuilder = typename BuilderTraits<DType>::DictArrayBuilder;
+
+  void SetUp() override { null_probabilities_ = {0.0, 0.5, 1.0}; }
+  void TearDown() override {}
+
+  void InitTestCase(double null_probability) {
+    GenerateInputData(null_probability);
+    SetupEncoderDecoder();
+  }
+
+  void GenerateInputData(double null_probability) {
+    constexpr int num_unique = 100;
+    constexpr int repeat = 10;
+    constexpr int64_t min_length = 2;
+    constexpr int64_t max_length = 10;
+    ::arrow::random::RandomArrayGenerator rag(0);
+    expected_dense_ = rag.StringWithRepeats(repeat * num_unique, num_unique, 
min_length,
+                                            max_length, null_probability);
+
+    std::shared_ptr<::arrow::DataType> data_type = std::make_shared<DType>();
+
+    if (data_type->id() == ::arrow::BinaryType::type_id) {
+      // TODO(hatemhelal):  this is a kludge.  Probably best to extend the
+      // RandomArrayGenerator to also generate BinaryType arrays.
+      auto data = expected_dense_->data()->Copy();
+      data->type = data_type;
+      expected_dense_ = std::make_shared<::arrow::BinaryArray>(data);
+    }
+
+    num_values_ = static_cast<int>(expected_dense_->length());
+    null_count_ = static_cast<int>(expected_dense_->null_count());
+    valid_bits_ = expected_dense_->null_bitmap()->data();
+
+    auto builder = CreateDictBuilder();
+    ASSERT_OK(builder->AppendArray(*expected_dense_));
+    ASSERT_OK(builder->Finish(&expected_dict_));
+
+    // Initialize input_data_ for the encoder from the expected_array_ values
+    const auto& binary_array = static_cast<const 
::arrow::BinaryArray&>(*expected_dense_);
+    input_data_.reserve(binary_array.length());
+
+    for (int64_t i = 0; i < binary_array.length(); ++i) {
+      auto view = binary_array.GetView(i);
+      input_data_[i] = {static_cast<uint32_t>(view.length()),
+                        reinterpret_cast<const uint8_t*>(view.data())};
+    }
+  }
+
+  std::unique_ptr<DenseBuilder> CreateDenseBuilder() {
+    // Use same default chunk size of 16MB as used in 
ByteArrayChunkedRecordReader
+    constexpr int32_t kChunkSize = 1 << 24;
+    return std::unique_ptr<DenseBuilder>(
+        new DenseBuilder(kChunkSize, default_memory_pool()));
+  }
+
+  std::unique_ptr<DictBuilder> CreateDictBuilder() {
+    return std::unique_ptr<DictBuilder>(new 
DictBuilder(default_memory_pool()));
+  }
+
+  // Setup encoder/decoder pair for testing with
+  virtual void SetupEncoderDecoder() = 0;
+
+  template <typename Builder>
+  void CheckDense(int actual_num_values, Builder& builder) {
+    ASSERT_EQ(actual_num_values, num_values_);
+    ::arrow::ArrayVector actual_vec;
+    ASSERT_OK(builder.Finish(&actual_vec));
+    ASSERT_EQ(actual_vec.size(), 1);
+    ASSERT_ARRAYS_EQUAL(*actual_vec[0], *expected_dense_);
+  }
+
+  template <typename Builder>
+  void CheckDict(int actual_num_values, Builder& builder) {
+    ASSERT_EQ(actual_num_values, num_values_);
+    std::shared_ptr<::arrow::Array> actual;
+    ASSERT_OK(builder.Finish(&actual));
+    ASSERT_ARRAYS_EQUAL(*actual, *expected_dict_);
+  }
+
+  void CheckDecodeArrowUsingDenseBuilder() {
+    for (auto np : null_probabilities_) {
+      InitTestCase(np);
+      auto builder = CreateDenseBuilder();
+      auto actual_num_values =
+          decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, 
builder.get());
+      CheckDense(actual_num_values, *builder);
+    }
+  }
+
+  void CheckDecodeArrowUsingDictBuilder() {
+    for (auto np : null_probabilities_) {
+      InitTestCase(np);
+      auto builder = CreateDictBuilder();
+      auto actual_num_values =
+          decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, 
builder.get());
+      CheckDict(actual_num_values, *builder);
+    }
+  }
+
+  void CheckDecodeArrowNonNullUsingDenseBuilder() {
+    for (auto np : null_probabilities_) {
+      InitTestCase(np);
+      SKIP_TEST_IF(null_count_ > 0)
+      auto builder = CreateDenseBuilder();
+      auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, 
builder.get());
+      CheckDense(actual_num_values, *builder);
+    }
+  }
+
+  void CheckDecodeArrowNonNullUsingDictBuilder() {
+    for (auto np : null_probabilities_) {
+      InitTestCase(np);
+      SKIP_TEST_IF(null_count_ > 0)
+      auto builder = CreateDictBuilder();
+      auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, 
builder.get());
+      CheckDict(actual_num_values, *builder);
+    }
+  }
+
+ protected:
+  std::vector<double> null_probabilities_;
+  std::shared_ptr<::arrow::Array> expected_dict_;
+  std::shared_ptr<::arrow::Array> expected_dense_;
+  int num_values_;
+  int null_count_;
+  vector<ByteArray> input_data_;
+  const uint8_t* valid_bits_;
+  std::unique_ptr<ByteArrayEncoder> encoder_;
+  std::unique_ptr<ByteArrayDecoder> decoder_;
+  std::shared_ptr<Buffer> buffer_;
+};
+
+#define TEST_ARROW_BUILDER_BASE_MEMBERS()             \
+  using TestArrowBuilderDecoding<DType>::encoder_;    \
+  using TestArrowBuilderDecoding<DType>::decoder_;    \
+  using TestArrowBuilderDecoding<DType>::input_data_; \
+  using TestArrowBuilderDecoding<DType>::valid_bits_; \
+  using TestArrowBuilderDecoding<DType>::num_values_; \
+  using TestArrowBuilderDecoding<DType>::buffer_
+
+template <typename DType>
+class PlainEncoding : public TestArrowBuilderDecoding<DType> {
+ public:
+  void SetupEncoderDecoder() override {
+    encoder_ = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN);
+    decoder_ = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN);
+    ASSERT_NO_THROW(encoder_->PutSpaced(input_data_.data(), num_values_, 
valid_bits_, 0));
+    buffer_ = encoder_->FlushValues();
+    decoder_->SetData(num_values_, buffer_->data(), 
static_cast<int>(buffer_->size()));
+  }
+
+ protected:
+  TEST_ARROW_BUILDER_BASE_MEMBERS();
+};
+
+using BuilderArrayTypes = ::testing::Types<::arrow::BinaryType, 
::arrow::StringType>;
+// using BuilderArrayTypes = ::testing::Types<::arrow::StringType>;
+TYPED_TEST_CASE(PlainEncoding, BuilderArrayTypes);
+
+TYPED_TEST(PlainEncoding, CheckDecodeArrowUsingDenseBuilder) {
+  this->CheckDecodeArrowUsingDenseBuilder();
+}
+
+TYPED_TEST(PlainEncoding, CheckDecodeArrowUsingDictBuilder) {
+  this->CheckDecodeArrowUsingDictBuilder();
+}
+
+TYPED_TEST(PlainEncoding, CheckDecodeArrowNonNullDenseBuilder) {
+  this->CheckDecodeArrowNonNullUsingDenseBuilder();
+}
+
+TYPED_TEST(PlainEncoding, CheckDecodeArrowNonNullDictBuilder) {
+  this->CheckDecodeArrowNonNullUsingDictBuilder();
+}
+
+template <typename DType>
+class DictEncoding : public TestArrowBuilderDecoding<DType> {
+ public:
+  void SetupEncoderDecoder() override {
+    auto node = schema::ByteArray("name");
+    descr_ = std::unique_ptr<ColumnDescriptor>(new ColumnDescriptor(node, 0, 
0));
+    encoder_ = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN, 
/*use_dictionary=*/true,
+                                               descr_.get());
+    ASSERT_NO_THROW(encoder_->PutSpaced(input_data_.data(), num_values_, 
valid_bits_, 0));
+    buffer_ = encoder_->FlushValues();
+
+    auto dict_encoder = 
dynamic_cast<DictEncoder<ByteArrayType>*>(encoder_.get());
+    ASSERT_NE(dict_encoder, nullptr);
+    dict_buffer_ =
+        AllocateBuffer(default_memory_pool(), 
dict_encoder->dict_encoded_size());
+    dict_encoder->WriteDict(dict_buffer_->mutable_data());
+
+    // Simulate reading the dictionary page followed by a data page
+    plain_decoder_ = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN, 
descr_.get());
+    plain_decoder_->SetData(dict_encoder->num_entries(), dict_buffer_->data(),
+                            static_cast<int>(dict_buffer_->size()));
+
+    dict_decoder_ = MakeDictDecoder<ByteArrayType>(descr_.get());
+    dict_decoder_->SetDict(plain_decoder_.get());
+    dict_decoder_->SetData(num_values_, buffer_->data(),
+                           static_cast<int>(buffer_->size()));
+    decoder_ = std::unique_ptr<ByteArrayDecoder>(
+        dynamic_cast<ByteArrayDecoder*>(dict_decoder_.release()));
+  }
+
+ protected:
+  TEST_ARROW_BUILDER_BASE_MEMBERS();
+  std::unique_ptr<ColumnDescriptor> descr_;
+  std::unique_ptr<ByteArrayDecoder> plain_decoder_;
+  std::unique_ptr<DictDecoder<ByteArrayType>> dict_decoder_;
+  std::shared_ptr<Buffer> dict_buffer_;
+};
+
+TYPED_TEST_CASE(DictEncoding, BuilderArrayTypes);
+
+TYPED_TEST(DictEncoding, CheckDecodeArrowUsingDenseBuilder) {
+  this->CheckDecodeArrowUsingDenseBuilder();
+}
+
+TYPED_TEST(DictEncoding, CheckDecodeArrowUsingDictBuilder) {
+  this->CheckDecodeArrowUsingDictBuilder();
+}
+
+TYPED_TEST(DictEncoding, CheckDecodeArrowNonNullDenseBuilder) {
+  this->CheckDecodeArrowNonNullUsingDenseBuilder();
+}
+
+TYPED_TEST(DictEncoding, CheckDecodeArrowNonNullDictBuilder) {
+  this->CheckDecodeArrowNonNullUsingDictBuilder();
+}
+
 }  // namespace test
 
 }  // namespace parquet
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index da63067..217ee80 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -24,7 +24,6 @@
 #include <utility>
 #include <vector>
 
-#include "arrow/builder.h"
 #include "arrow/status.h"
 #include "arrow/util/bit-stream-utils.h"
 #include "arrow/util/bit-util.h"
@@ -697,40 +696,12 @@ class PlainByteArrayDecoder : public 
PlainDecoder<ByteArrayType>,
   using Base::DecodeSpaced;
   using Base::PlainDecoder;
 
-  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                  int64_t valid_bits_offset,
-                  ::arrow::internal::ChunkedBinaryBuilder* out) override {
-    int result = 0;
-    PARQUET_THROW_NOT_OK(
-        DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, 
out, &result));
-    return result;
-  }
-
-  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                  int64_t valid_bits_offset,
-                  ::arrow::BinaryDictionaryBuilder* out) override {
-    int result = 0;
-    PARQUET_THROW_NOT_OK(
-        DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, 
out, &result));
-    return result;
-  }
-
-  int DecodeArrowNonNull(int num_values,
-                         ::arrow::internal::ChunkedBinaryBuilder* out) 
override {
-    int result = 0;
-    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, out, &result));
-    return result;
-  }
-
  private:
-  template <typename BuilderType>
   ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* 
valid_bits,
-                              int64_t valid_bits_offset, BuilderType* out,
-                              int* values_decoded) {
+                              int64_t valid_bits_offset, 
WrappedBuilderInterface* builder,
+                              int* values_decoded) override {
     num_values = std::min(num_values, num_values_);
-
-    ARROW_RETURN_NOT_OK(out->Reserve(num_values));
-
+    builder->Reserve(num_values);
     ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, 
num_values);
     int increment;
     int i = 0;
@@ -744,15 +715,15 @@ class PlainByteArrayDecoder : public 
PlainDecoder<ByteArrayType>,
         if (data_size < increment) {
           ParquetException::EofException();
         }
-        ARROW_RETURN_NOT_OK(out->Append(data + sizeof(uint32_t), len));
+        builder->Append(data + sizeof(uint32_t), len);
         data += increment;
         data_size -= increment;
         bytes_decoded += increment;
-        ++i;
       } else {
-        ARROW_RETURN_NOT_OK(out->AppendNull());
+        builder->AppendNull();
       }
       bit_reader.Next();
+      ++i;
     }
 
     data_ += bytes_decoded;
@@ -762,23 +733,24 @@ class PlainByteArrayDecoder : public 
PlainDecoder<ByteArrayType>,
     return ::arrow::Status::OK();
   }
 
-  ::arrow::Status DecodeArrowNonNull(int num_values,
-                                     ::arrow::internal::ChunkedBinaryBuilder* 
out,
-                                     int* values_decoded) {
+  ::arrow::Status DecodeArrowNonNull(int num_values, WrappedBuilderInterface* 
builder,
+                                     int* values_decoded) override {
     num_values = std::min(num_values, num_values_);
-    ARROW_RETURN_NOT_OK(out->Reserve(num_values));
+    builder->Reserve(num_values);
     int i = 0;
     const uint8_t* data = data_;
     int64_t data_size = len_;
     int bytes_decoded = 0;
+
     while (i < num_values) {
       uint32_t len = *reinterpret_cast<const uint32_t*>(data);
       int increment = static_cast<int>(sizeof(uint32_t) + len);
       if (data_size < increment) ParquetException::EofException();
-      ARROW_RETURN_NOT_OK(out->Append(data + sizeof(uint32_t), len));
+      builder->Append(data + sizeof(uint32_t), len);
       data += increment;
       data_size -= increment;
       bytes_decoded += increment;
+      ++i;
     }
 
     data_ += bytes_decoded;
@@ -916,39 +888,13 @@ class DictByteArrayDecoder : public 
DictDecoderImpl<ByteArrayType>,
   using BASE = DictDecoderImpl<ByteArrayType>;
   using BASE::DictDecoderImpl;
 
-  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                  int64_t valid_bits_offset,
-                  ::arrow::internal::ChunkedBinaryBuilder* out) override {
-    int result = 0;
-    PARQUET_THROW_NOT_OK(
-        DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, 
out, &result));
-    return result;
-  }
-
-  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                  int64_t valid_bits_offset,
-                  ::arrow::BinaryDictionaryBuilder* out) override {
-    int result = 0;
-    PARQUET_THROW_NOT_OK(
-        DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, 
out, &result));
-    return result;
-  }
-
-  int DecodeArrowNonNull(int num_values,
-                         ::arrow::internal::ChunkedBinaryBuilder* out) 
override {
-    int result = 0;
-    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, out, &result));
-    return result;
-  }
-
  private:
-  template <typename BuilderType>
   ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* 
valid_bits,
-                              int64_t valid_bits_offset, BuilderType* builder,
-                              int* out_num_values) {
+                              int64_t valid_bits_offset, 
WrappedBuilderInterface* builder,
+                              int* out_num_values) override {
     constexpr int32_t buffer_size = 1024;
     int32_t indices_buffer[buffer_size];
-
+    builder->Reserve(num_values);
     ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, 
num_values);
 
     int values_decoded = 0;
@@ -966,10 +912,10 @@ class DictByteArrayDecoder : public 
DictDecoderImpl<ByteArrayType>,
           // Consume all indices
           if (is_valid) {
             const auto& val = dictionary_[indices_buffer[i]];
-            ARROW_RETURN_NOT_OK(builder->Append(val.ptr, val.len));
+            builder->Append(val.ptr, val.len);
             ++i;
           } else {
-            ARROW_RETURN_NOT_OK(builder->AppendNull());
+            builder->AppendNull();
             --null_count;
           }
           ++values_decoded;
@@ -982,7 +928,7 @@ class DictByteArrayDecoder : public 
DictDecoderImpl<ByteArrayType>,
           bit_reader.Next();
         }
       } else {
-        ARROW_RETURN_NOT_OK(builder->AppendNull());
+        builder->AppendNull();
         --null_count;
         ++values_decoded;
       }
@@ -995,18 +941,20 @@ class DictByteArrayDecoder : public 
DictDecoderImpl<ByteArrayType>,
     return ::arrow::Status::OK();
   }
 
-  template <typename BuilderType>
-  ::arrow::Status DecodeArrowNonNull(int num_values, BuilderType* builder,
-                                     int* out_num_values) {
+  ::arrow::Status DecodeArrowNonNull(int num_values, WrappedBuilderInterface* 
builder,
+                                     int* out_num_values) override {
     constexpr int32_t buffer_size = 2048;
     int32_t indices_buffer[buffer_size];
     int values_decoded = 0;
+    builder->Reserve(num_values);
+
     while (values_decoded < num_values) {
-      int num_indices = idx_decoder_.GetBatch(indices_buffer, buffer_size);
+      int32_t batch_size = std::min<int32_t>(buffer_size, num_values - 
values_decoded);
+      int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
       if (num_indices == 0) break;
       for (int i = 0; i < num_indices; ++i) {
         const auto& val = dictionary_[indices_buffer[i]];
-        PARQUET_THROW_NOT_OK(builder->Append(val.ptr, val.len));
+        builder->Append(val.ptr, val.len);
       }
       values_decoded += num_indices;
     }
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 046296c..09c1d0f 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -32,17 +32,6 @@
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
-namespace arrow {
-
-class BinaryDictionaryBuilder;
-
-namespace internal {
-
-class ChunkedBinaryBuilder;
-
-}  // namespace internal
-}  // namespace arrow
-
 namespace parquet {
 
 class ColumnDescriptor;
@@ -207,18 +196,61 @@ using DoubleDecoder = TypedDecoder<DoubleType>;
 class ByteArrayDecoder : virtual public TypedDecoder<ByteArrayType> {
  public:
   using TypedDecoder<ByteArrayType>::DecodeSpaced;
-  virtual int DecodeArrow(int num_values, int null_count, const uint8_t* 
valid_bits,
-                          int64_t valid_bits_offset,
-                          ::arrow::internal::ChunkedBinaryBuilder* builder) = 
0;
-
-  virtual int DecodeArrow(int num_values, int null_count, const uint8_t* 
valid_bits,
-                          int64_t valid_bits_offset,
-                          ::arrow::BinaryDictionaryBuilder* builder) = 0;
-
-  // TODO(wesm): Implement DecodeArrowNonNull as part of ARROW-3325
-  // See also ARROW-3772, ARROW-3769
-  virtual int DecodeArrowNonNull(int num_values,
-                                 ::arrow::internal::ChunkedBinaryBuilder* 
builder) = 0;
+
+  class WrappedBuilderInterface {
+   public:
+    virtual void Reserve(int64_t values) = 0;
+    virtual void Append(const uint8_t* value, uint32_t length) = 0;
+    virtual void AppendNull() = 0;
+    virtual ~WrappedBuilderInterface() = default;
+  };
+
+  template <typename Builder>
+  class WrappedBuilder : public WrappedBuilderInterface {
+   public:
+    explicit WrappedBuilder(Builder* builder) : builder_(builder) {}
+
+    void Reserve(int64_t values) override {
+      PARQUET_THROW_NOT_OK(builder_->Reserve(values));
+    }
+    void Append(const uint8_t* value, uint32_t length) override {
+      PARQUET_THROW_NOT_OK(builder_->Append(value, length));
+    }
+
+    void AppendNull() override { PARQUET_THROW_NOT_OK(builder_->AppendNull()); 
}
+
+   private:
+    Builder* builder_;
+  };
+
+  template <typename Builder>
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset, Builder* builder) {
+    int result = 0;
+    WrappedBuilder<Builder> wrapped_builder(builder);
+    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
+                                     valid_bits_offset, &wrapped_builder, 
&result));
+    return result;
+  }
+
+  template <typename Builder>
+  int DecodeArrowNonNull(int num_values, Builder* builder) {
+    int result = 0;
+    WrappedBuilder<Builder> wrapped_builder(builder);
+    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, &wrapped_builder, 
&result));
+    return result;
+  }
+
+ private:
+  virtual ::arrow::Status DecodeArrow(int num_values, int null_count,
+                                      const uint8_t* valid_bits,
+                                      int64_t valid_bits_offset,
+                                      WrappedBuilderInterface* builder,
+                                      int* values_decoded) = 0;
+
+  virtual ::arrow::Status DecodeArrowNonNull(int num_values,
+                                             WrappedBuilderInterface* builder,
+                                             int* values_decoded) = 0;
 };
 
 class FLBADecoder : virtual public TypedDecoder<FLBAType> {

Reply via email to