IMPALA-4177,IMPALA-6039: batched bit reading and rle decoding

Switch the decoders to using more batch-oriented interfaces. As an
intermediate step this doesn't make the interfaces of LevelDecoder
or DictDecoder batch-oriented, only the lower-level utility classes.

The next step would be to change those interfaces to be batch-oriented
and make the corresponding optimisations in the Parquet column readers.
This could deliver much larger perf improvements than the current patch.

The high-level changes are:
* BitReader -> BatchedBitReader, which is built to unpack runs of 32
  bit-packed values efficiently.
* RleDecoder -> RleBatchDecoder, which exposes the repeated and literal
  runs to the caller and uses BatchedBitReader to unpack literal runs
  efficiently.
* Dict decoding uses RleBatchDecoder to decode repeated runs efficiently
  and uses the BitPacking utilities to unpack and encode in a single
  step.
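
The run-oriented decoding described above can be sketched as follows. This
is a simplified illustration, not the code from this patch: the free
functions UnpackBatch() and DecodeRuns() are hypothetical names, the
unpacking loop works one bit at a time for clarity rather than speed, and
the input is assumed to follow the Parquet RLE/bit-packed hybrid layout
(varint run header, low bit 0 for a repeated run, low bit 1 for a literal
run made of groups of 8 values) with no truncated final literal group.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: unpacks up to 'num_values' values of 'bit_width' bits,
// packed LSB-first, from 'in'. Returns the number of values unpacked, which is
// smaller than 'num_values' if the input is too short.
int UnpackBatch(const uint8_t* in, int64_t in_bytes, int bit_width,
                int num_values, uint32_t* out) {
  assert(bit_width >= 0 && bit_width <= 32);
  int64_t available = bit_width == 0 ? num_values : in_bytes * 8 / bit_width;
  int n = static_cast<int>(std::min<int64_t>(num_values, available));
  uint64_t bit_pos = 0;
  for (int i = 0; i < n; ++i) {
    uint32_t v = 0;
    for (int b = 0; b < bit_width; ++b, ++bit_pos) {
      uint32_t bit = (in[bit_pos / 8] >> (bit_pos % 8)) & 1u;
      v |= bit << b;
    }
    out[i] = v;
  }
  return n;
}

// Hypothetical helper: decodes 'num_values' values from a hybrid RLE stream.
// Repeated runs are filled in one step and literal runs are unpacked as a
// batch. Returns false if the stream is malformed or too short.
bool DecodeRuns(const uint8_t* in, int64_t len, int bit_width,
                size_t num_values, std::vector<uint32_t>* out) {
  int64_t pos = 0;
  const int value_bytes = (bit_width + 7) / 8;
  while (out->size() < num_values) {
    // Read the varint (ULEB128) run header.
    uint32_t header = 0;
    for (int shift = 0;; shift += 7) {
      if (pos >= len || shift > 28) return false;
      uint8_t byte = in[pos++];
      header |= static_cast<uint32_t>(byte & 0x7F) << shift;
      if ((byte & 0x80) == 0) break;
    }
    size_t remaining = num_values - out->size();
    if ((header & 1) == 0) {
      // Repeated run: a count followed by one byte-aligned value.
      uint32_t count = header >> 1;
      if (count == 0 || pos + value_bytes > len) return false;
      uint32_t value = 0;
      for (int i = 0; i < value_bytes; ++i) {
        value |= static_cast<uint32_t>(in[pos + i]) << (8 * i);
      }
      pos += value_bytes;
      out->insert(out->end(), std::min<size_t>(count, remaining), value);
    } else {
      // Literal run: groups of 8 bit-packed values, unpacked in one call.
      int64_t count = static_cast<int64_t>(header >> 1) * 8;
      int64_t run_bytes = count * bit_width / 8;
      if (count == 0 || pos + run_bytes > len) return false;
      size_t n = std::min<size_t>(static_cast<size_t>(count), remaining);
      size_t old_size = out->size();
      out->resize(old_size + n);
      UnpackBatch(in + pos, run_bytes, bit_width, static_cast<int>(n),
                  out->data() + old_size);
      pos += run_bytes;
    }
  }
  return true;
}
```

For example, with a bit width of 3, the byte stream 0x0A 0x04 0x03 0x88 0xC6
0xFA encodes a repeated run of five 4s followed by a literal run holding the
values 0 through 7, so asking DecodeRuns() for 13 values yields
4 4 4 4 4 0 1 2 3 4 5 6 7. The point of exposing runs to the caller is that
the per-value branch disappears: a repeated run becomes a single fill and a
literal run becomes a single unpack call.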

Also removes an older benchmark that isn't too interesting (since
the batch-oriented approach to encoding and decoding is so much
faster than the value-by-value approach).

Testing:
* Ran core tests.
* Updated unit tests to exercise new code.
* Added test coverage for the deprecated bit-packed level encoding to
  check that it still works (there was no coverage previously).

Perf:
Single-node benchmarks showed a performance gain of a few percent. 16-node
cluster benchmarks only showed a gain for TPC-H nested.

Change-Id: I35de0cf80c86f501c4a39270afc8fb8111552ac6
Reviewed-on: http://gerrit.cloudera.org:8080/8267
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ae116b5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ae116b5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ae116b5b

Branch: refs/heads/master
Commit: ae116b5bf7b8b2514d7a8655d9a6666ad3fd36dd
Parents: 155bb77
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Mon Oct 9 17:09:39 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Nov 16 21:23:09 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/CMakeLists.txt                |   1 -
 be/src/benchmarks/bit-packing-benchmark.cc      | 120 ++++++
 be/src/benchmarks/rle-benchmark.cc              | 244 ------------
 be/src/exec/parquet-column-readers.cc           | 174 +++++----
 be/src/exec/parquet-column-readers.h            |  90 +++--
 be/src/experiments/bit-stream-utils.8byte.h     | 137 -------
 .../experiments/bit-stream-utils.8byte.inline.h | 145 -------
 be/src/util/bit-packing.h                       |  61 ++-
 be/src/util/bit-packing.inline.h                | 210 ++++++++--
 be/src/util/bit-stream-utils.h                  |  77 ++--
 be/src/util/bit-stream-utils.inline.h           | 106 +++--
 be/src/util/dict-encoding.h                     |  94 +++--
 be/src/util/dict-test.cc                        |  96 ++++-
 be/src/util/parquet-reader.cc                   |  21 +-
 be/src/util/rle-encoding.h                      | 383 +++++++++++++++----
 be/src/util/rle-test.cc                         | 185 +++++----
 testdata/data/README                            |   9 +
 .../alltypes_agg_bitpacked_def_levels.parquet   | Bin 0 -> 208380 bytes
 .../queries/QueryTest/parquet-def-levels.test   |  52 +++
 tests/query_test/test_scanners.py               |  16 +
 20 files changed, 1253 insertions(+), 968 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt
index 1d67d45..a569a66 100644
--- a/be/src/benchmarks/CMakeLists.txt
+++ b/be/src/benchmarks/CMakeLists.txt
@@ -48,7 +48,6 @@ ADD_BE_BENCHMARK(multiint-benchmark)
 ADD_BE_BENCHMARK(network-perf-benchmark)
 ADD_BE_BENCHMARK(overflow-benchmark)
 ADD_BE_BENCHMARK(parse-timestamp-benchmark)
-ADD_BE_BENCHMARK(rle-benchmark)
 ADD_BE_BENCHMARK(row-batch-serialize-benchmark)
 ADD_BE_BENCHMARK(scheduler-benchmark)
 ADD_BE_BENCHMARK(status-benchmark)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/bit-packing-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/bit-packing-benchmark.cc b/be/src/benchmarks/bit-packing-benchmark.cc
index 955c0fb..7769182 100644
--- a/be/src/benchmarks/bit-packing-benchmark.cc
+++ b/be/src/benchmarks/bit-packing-benchmark.cc
@@ -285,6 +285,126 @@ struct BenchmarkParams {
   int64_t data_len;
 };
 
+/// Legacy value-at-a-time implementation of bit unpacking. Retained here for
+/// purposes of comparison in the benchmark.
+class BitReader {
+ public:
+  /// 'buffer' is the buffer to read from.  The buffer's length is 'buffer_len'.
+  /// Does not take ownership of the buffer.
+  BitReader(const uint8_t* buffer, int buffer_len) { Reset(buffer, buffer_len); }
+
+  BitReader() : buffer_(NULL), max_bytes_(0) {}
+
+  // The implicit copy constructor is left defined. If a BitReader is copied, the
+  // two copies do not share any state. Invoking functions on either copy continues
+  // reading from the current read position without modifying the state of the other
+  // copy.
+
+  /// Resets the read to start reading from the start of 'buffer'. The buffer's
+  /// length is 'buffer_len'. Does not take ownership of the buffer.
+  void Reset(const uint8_t* buffer, int buffer_len) {
+    buffer_ = buffer;
+    max_bytes_ = buffer_len;
+    byte_offset_ = 0;
+    bit_offset_ = 0;
+    int num_bytes = std::min(8, max_bytes_);
+    memcpy(&buffered_values_, buffer_, num_bytes);
+  }
+
+  /// Gets the next value from the buffer.  Returns true if 'v' could be read or false if
+  /// there are not enough bytes left. num_bits must be <= 32.
+  template<typename T>
+  bool GetValue(int num_bits, T* v);
+
+  /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a
+  /// little-endian native type and big enough to store 'num_bytes'. The value is assumed
+  /// to be byte-aligned so the stream will be advanced to the start of the next byte
+  /// before 'v' is read. Returns false if there are not enough bytes left.
+  template<typename T>
+  bool GetBytes(int num_bytes, T* v);
+
+  /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
+  /// there may be an additional fraction of a byte).
+  int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
+
+  /// Maximum supported bitwidth for reader.
+  static const int MAX_BITWIDTH = 32;
+
+ private:
+  const uint8_t* buffer_;
+  int max_bytes_;
+
+  /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
+  /// faster than reading values byte by byte directly from buffer_.
+  uint64_t buffered_values_;
+
+  int byte_offset_;       // Offset in buffer_
+  int bit_offset_;        // Offset in buffered_values_
+};
+
+template <typename T>
+bool BitReader::GetValue(int num_bits, T* v) {
+  DCHECK(num_bits == 0 || buffer_ != NULL);
+  // TODO: revisit this limit if necessary
+  DCHECK_LE(num_bits, MAX_BITWIDTH);
+  DCHECK_LE(num_bits, sizeof(T) * 8);
+
+  // First do a cheap check to see if we may read past the end of the stream, using
+  // constant upper bounds for 'bit_offset_' and 'num_bits'.
+  if (UNLIKELY(byte_offset_ + sizeof(buffered_values_) + MAX_BITWIDTH / 8 > max_bytes_)) {
+    // Now do the precise check.
+    if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) {
+      return false;
+    }
+  }
+
+  DCHECK_GE(bit_offset_, 0);
+  DCHECK_LE(bit_offset_, 64);
+  *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
+
+  bit_offset_ += num_bits;
+  if (bit_offset_ >= 64) {
+    byte_offset_ += 8;
+    bit_offset_ -= 64;
+
+    int bytes_remaining = max_bytes_ - byte_offset_;
+    if (LIKELY(bytes_remaining >= 8)) {
+      memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
+    } else {
+      memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+    }
+
+    // Read bits of v that crossed into new buffered_values_
+    *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_)
+          << (num_bits - bit_offset_);
+  }
+  DCHECK_LE(bit_offset_, 64);
+  return true;
+}
+
+template<typename T>
+bool BitReader::GetBytes(int num_bytes, T* v) {
+  DCHECK_LE(num_bytes, sizeof(T));
+  int bytes_read = BitUtil::Ceil(bit_offset_, 8);
+  if (UNLIKELY(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false;
+
+  // Advance byte_offset to next unread byte and read num_bytes
+  byte_offset_ += bytes_read;
+  *v = 0; // Ensure unset bytes are initialized to zero.
+  memcpy(v, buffer_ + byte_offset_, num_bytes);
+  byte_offset_ += num_bytes;
+
+  // Reset buffered_values_
+  bit_offset_ = 0;
+  int bytes_remaining = max_bytes_ - byte_offset_;
+  if (LIKELY(bytes_remaining >= 8)) {
+    memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
+  } else {
+    memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+  }
+  return true;
+}
+
 /// Benchmark calling BitReader::GetValue() in a loop to unpack 32 * 'batch_size' values.
 void BitReaderBenchmark(int batch_size, void* data) {
   const BenchmarkParams* p = reinterpret_cast<BenchmarkParams*>(data);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/rle-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/rle-benchmark.cc b/be/src/benchmarks/rle-benchmark.cc
deleted file mode 100644
index 439c630..0000000
--- a/be/src/benchmarks/rle-benchmark.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <iostream>
-#include <sstream>
-
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "experiments/bit-stream-utils.8byte.inline.h"
-#include "util/benchmark.h"
-#include "util/bit-stream-utils.inline.h"
-#include "util/cpu-info.h"
-
-#include "common/names.h"
-
-// Benchmark to measure how quickly we can do bit encoding and decoding.
-
-// encode:               Function     Rate (iters/ms)          Comparison
-// ----------------------------------------------------------------------
-//     "BitWriter (8 byte) 1-Bit"                66.9                  1X
-//              "BitWriter 1-Bit"               107.3              1.604X
-//     "BitWriter (8 byte) 2-Bit"               74.42                  1X
-//              "BitWriter 2-Bit"               105.6              1.419X
-//     "BitWriter (8 byte) 3-Bit"               76.91                  1X
-//              "BitWriter 3-Bit"                 104              1.353X
-//     "BitWriter (8 byte) 4-Bit"               80.37                  1X
-//              "BitWriter 4-Bit"               102.7              1.278X
-//     "BitWriter (8 byte) 5-Bit"               79.29                  1X
-//              "BitWriter 5-Bit"                 101              1.274X
-//     "BitWriter (8 byte) 6-Bit"               80.37                  1X
-//              "BitWriter 6-Bit"               99.28              1.235X
-//     "BitWriter (8 byte) 7-Bit"               80.19                  1X
-//              "BitWriter 7-Bit"               98.09              1.223X
-//     "BitWriter (8 byte) 8-Bit"               84.93                  1X
-//              "BitWriter 8-Bit"                  97              1.142X
-//     "BitWriter (8 byte) 9-Bit"               79.85                  1X
-//              "BitWriter 9-Bit"               95.09              1.191X
-//    "BitWriter (8 byte) 10-Bit"               80.51                  1X
-//             "BitWriter 10-Bit"               94.17               1.17X
-//    "BitWriter (8 byte) 11-Bit"               79.36                  1X
-//             "BitWriter 11-Bit"                93.2              1.174X
-//    "BitWriter (8 byte) 12-Bit"               80.79                  1X
-//             "BitWriter 12-Bit"               92.09               1.14X
-//    "BitWriter (8 byte) 13-Bit"               78.28                  1X
-//             "BitWriter 13-Bit"               90.83               1.16X
-//    "BitWriter (8 byte) 14-Bit"               78.57                  1X
-//             "BitWriter 14-Bit"               89.71              1.142X
-//    "BitWriter (8 byte) 15-Bit"               77.28                  1X
-//             "BitWriter 15-Bit"                  88              1.139X
-//    "BitWriter (8 byte) 16-Bit"               86.98                  1X
-//             "BitWriter 16-Bit"               88.08              1.013X
-
-// decode:               Function     Rate (iters/ms)          Comparison
-// ----------------------------------------------------------------------
-//     "BitWriter (8 byte) 1-Bit"               132.9                  1X
-//              "BitWriter 1-Bit"               126.9             0.9546X
-//     "BitWriter (8 byte) 2-Bit"               132.9                  1X
-//              "BitWriter 2-Bit"               125.6             0.9448X
-//     "BitWriter (8 byte) 3-Bit"               132.8                  1X
-//              "BitWriter 3-Bit"               122.7             0.9237X
-//     "BitWriter (8 byte) 4-Bit"               133.1                  1X
-//              "BitWriter 4-Bit"               123.6             0.9284X
-//     "BitWriter (8 byte) 5-Bit"               132.2                  1X
-//              "BitWriter 5-Bit"               118.2             0.8942X
-//     "BitWriter (8 byte) 6-Bit"               132.9                  1X
-//              "BitWriter 6-Bit"               117.6              0.885X
-//     "BitWriter (8 byte) 7-Bit"               132.3                  1X
-//              "BitWriter 7-Bit"               112.8             0.8525X
-//     "BitWriter (8 byte) 8-Bit"               132.9                  1X
-//              "BitWriter 8-Bit"               119.2             0.8971X
-//     "BitWriter (8 byte) 9-Bit"               131.8                  1X
-//              "BitWriter 9-Bit"               111.3             0.8447X
-//    "BitWriter (8 byte) 10-Bit"               131.4                  1X
-//             "BitWriter 10-Bit"               108.5             0.8255X
-//    "BitWriter (8 byte) 11-Bit"               131.7                  1X
-//             "BitWriter 11-Bit"               106.9             0.8118X
-//    "BitWriter (8 byte) 12-Bit"               132.9                  1X
-//             "BitWriter 12-Bit"               108.8             0.8189X
-//    "BitWriter (8 byte) 13-Bit"                 131                  1X
-//             "BitWriter 13-Bit"               103.1             0.7873X
-//    "BitWriter (8 byte) 14-Bit"               131.6                  1X
-//             "BitWriter 14-Bit"               101.6             0.7724X
-//    "BitWriter (8 byte) 15-Bit"               131.1                  1X
-//             "BitWriter 15-Bit"               99.91             0.7622X
-//    "BitWriter (8 byte) 16-Bit"                 133                  1X
-//             "BitWriter 16-Bit"               105.2             0.7907X
-
-using namespace impala;
-
-const int BUFFER_LEN = 64 * 4096;
-
-struct TestData {
-  uint8_t* array;
-  uint8_t* buffer;
-  int num_values;
-  int num_bits;
-  int max_value;
-  MemPool* pool;
-  bool result;
-};
-
-void TestBitWriterEncode(int batch_size, void* d) {
-  TestData* data = reinterpret_cast<TestData*>(d);
-  int buffer_size = BitUtil::Ceil(data->num_bits * data->num_values, 8);
-  for (int i = 0; i < batch_size; ++i) {
-    BitWriter writer(data->buffer, buffer_size);
-    // Unroll this to focus more on Put performance.
-    for (int j = 0; j < data->num_values; j += 8) {
-      writer.PutValue(j + 0, data->num_bits);
-      writer.PutValue(j + 1, data->num_bits);
-      writer.PutValue(j + 2, data->num_bits);
-      writer.PutValue(j + 3, data->num_bits);
-      writer.PutValue(j + 4, data->num_bits);
-      writer.PutValue(j + 5, data->num_bits);
-      writer.PutValue(j + 6, data->num_bits);
-      writer.PutValue(j + 7, data->num_bits);
-    }
-    writer.Flush();
-  }
-}
-
-void TestBitWriter8ByteEncode(int batch_size, void* d) {
-  TestData* data = reinterpret_cast<TestData*>(d);
-  int buffer_size = BitUtil::Ceil(data->num_bits * data->num_values, 8);
-  for (int i = 0; i < batch_size; ++i) {
-    BitWriter_8byte writer(data->buffer, buffer_size);
-    // Unroll this to focus more on Put performance.
-    for (int j = 0; j < data->num_values; j += 8) {
-      writer.PutValue(j + 0, data->num_bits);
-      writer.PutValue(j + 1, data->num_bits);
-      writer.PutValue(j + 2, data->num_bits);
-      writer.PutValue(j + 3, data->num_bits);
-      writer.PutValue(j + 4, data->num_bits);
-      writer.PutValue(j + 5, data->num_bits);
-      writer.PutValue(j + 6, data->num_bits);
-      writer.PutValue(j + 7, data->num_bits);
-    }
-  }
-}
-
-void TestBitWriterDecode(int batch_size, void* d) {
-  TestData* data = reinterpret_cast<TestData*>(d);
-  int64_t v;
-  for (int i = 0; i < batch_size; ++i) {
-    BitReader reader(data->buffer, BUFFER_LEN);
-    // Unroll this to focus more on Put performance.
-    for (int j = 0; j < data->num_values; j += 8) {
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-      reader.GetValue(data->num_bits, &v);
-    }
-  }
-}
-
-void TestBitWriter8ByteDecode(int batch_size, void* d) {
-  TestData* data = reinterpret_cast<TestData*>(d);
-  data->result = true;
-  int64_t v;
-  for (int i = 0; i < batch_size; ++i) {
-    BitReader_8byte reader(data->buffer, BUFFER_LEN);
-    // Unroll this to focus more on Put performance.
-    for (int j = 0; j < data->num_values; j += 8) {
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-      data->result &= reader.GetValue(data->num_bits, &v);
-    }
-  }
-  CHECK(data->result);
-}
-
-int main(int argc, char** argv) {
-  CpuInfo::Init();
-
-  MemTracker tracker;
-  MemPool pool(&tracker);
-
-  int num_values = 4096;
-  int max_bits = 16;
-
-  Benchmark encode_suite("encode");
-  TestData data[max_bits];
-  for (int i = 0; i < max_bits; ++i) {
-    data[i].buffer = new uint8_t[BUFFER_LEN];
-    data[i].num_values = num_values;
-    data[i].num_bits = i + 1;
-    data[i].max_value = 1 << i;
-    data[i].pool = &pool;
-
-    stringstream suffix;
-    suffix << " " << (i+1) << "-Bit";
-
-    stringstream name;
-    name << "\"BitWriter (8 byte)" << suffix.str() << "\"";
-    int baseline =
-        encode_suite.AddBenchmark(name.str(), TestBitWriter8ByteEncode, &data[i], -1);
-
-    name.str("");
-    name << "\"BitWriter" << suffix.str() << "\"";
-    encode_suite.AddBenchmark(name.str(), TestBitWriterEncode, &data[i], baseline);
-  }
-  cout << encode_suite.Measure() << endl;
-
-  Benchmark decode_suite("decode");
-  for (int i = 0; i < max_bits; ++i) {
-    stringstream suffix;
-    suffix << " " << (i+1) << "-Bit";
-
-    stringstream name;
-    name << "\"BitWriter (8 byte)" << suffix.str() << "\"";
-    int baseline =
-        decode_suite.AddBenchmark(name.str(), TestBitWriter8ByteDecode, &data[i], -1);
-
-    name.str("");
-    name << "\"BitWriter" << suffix.str() << "\"";
-    decode_suite.AddBenchmark(name.str(), TestBitWriterDecode, &data[i], baseline);
-  }
-  cout << decode_suite.Measure() << endl;
-
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 04127e3..5a2e90e 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -77,6 +77,8 @@ Status ParquetLevelDecoder::Init(const string& filename,
     parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
     int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
   DCHECK_GE(num_buffered_values, 0);
+  DCHECK_GT(cache_size, 0);
+  cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
   encoding_ = encoding;
   max_level_ = max_level;
   num_buffered_values_ = num_buffered_values;
@@ -97,7 +99,7 @@ Status ParquetLevelDecoder::Init(const string& filename,
         return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
       }
       int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
-      Reset(*data, num_bytes, bit_width);
+      rle_decoder_.Reset(*data, num_bytes, bit_width);
       break;
     }
     case parquet::Encoding::BIT_PACKED:
@@ -143,66 +145,80 @@ Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
 }
 
 inline int16_t ParquetLevelDecoder::ReadLevel() {
-  bool valid;
-  uint8_t level;
-  if (encoding_ == parquet::Encoding::RLE) {
-    valid = Get(&level);
-  } else {
-    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
-    valid = bit_reader_.GetValue(1, &level);
+  if (UNLIKELY(!CacheHasNext())) {
+    if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) {
+      return HdfsParquetScanner::INVALID_LEVEL;
+    }
+    DCHECK_GE(num_cached_levels_, 0);
+    if (UNLIKELY(num_cached_levels_ == 0)) {
+      return HdfsParquetScanner::INVALID_LEVEL;
+    }
   }
-  return LIKELY(valid) ? level : HdfsParquetScanner::INVALID_LEVEL;
+  return CacheGetNext();
 }
 
-Status ParquetLevelDecoder::CacheNextBatch(int batch_size) {
-  DCHECK_LE(batch_size, cache_size_);
-  cached_level_idx_ = 0;
+Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
+  /// Fill the cache completely if there are enough values remaining.
+  /// Otherwise don't try to read more values than are left.
+  int batch_size = min(vals_remaining, cache_size_);
   if (max_level_ > 0) {
-    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
+    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_) ||
+          num_cached_levels_ < batch_size)) {
       return Status(decoding_error_code_, num_buffered_values_, filename_);
     }
   } else {
     // No levels to read, e.g., because the field is required. The cache was
     // already initialized with all zeros, so we can hand out those values.
     DCHECK_EQ(max_level_, 0);
+    cached_level_idx_ = 0;
     num_cached_levels_ = batch_size;
   }
   return Status::OK();
 }
 
-bool ParquetLevelDecoder::FillCache(int batch_size,
-    int* num_cached_levels) {
-  DCHECK(num_cached_levels != NULL);
-  int num_values = 0;
+bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
+  DCHECK(!CacheHasNext());
+  DCHECK(num_cached_levels != nullptr);
+  DCHECK_GE(max_level_, 0);
+  DCHECK_EQ(num_cached_levels_ % 32, 0) << "Last batch was not a multiple of 32";
+  cached_level_idx_ = 0;
+  if (max_level_ == 0) {
+    // No levels to read, e.g., because the field is required. The cache was
+    // already initialized with all zeros, so we can hand out those values.
+    *num_cached_levels = batch_size;
+    return true;
+  }
   if (encoding_ == parquet::Encoding::RLE) {
-    while (true) {
-      // Add RLE encoded values by repeating the current value this number of times.
-      uint32_t num_repeats_to_set =
-          min<uint32_t>(repeat_count_, batch_size - num_values);
-      memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
-      num_values += num_repeats_to_set;
-      repeat_count_ -= num_repeats_to_set;
-
-      // Add remaining literal values, if any.
-      uint32_t num_literals_to_set =
-          min<uint32_t>(literal_count_, batch_size - num_values);
-      int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
-      for (; num_values < num_values_end; ++num_values) {
-        bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
-        if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
-      }
-      literal_count_ -= num_literals_to_set;
-
-      if (num_values == batch_size) break;
-      if (UNLIKELY(!NextCounts<int16_t>())) return false;
-      if (repeat_count_ > 0 && current_value_ > max_level_) return false;
-    }
+    return FillCacheRle(batch_size, num_cached_levels);
   } else {
     DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
-    for (; num_values < batch_size; ++num_values) {
-      bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
-      if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+    *num_cached_levels = bit_reader_.UnpackBatch(1, batch_size, cached_levels_);
+    return true;
+  }
+}
+
+bool ParquetLevelDecoder::FillCacheRle(int batch_size, int* num_cached_levels) {
+  int num_values = 0;
+  while (num_values < batch_size) {
+    // Add RLE encoded values by repeating the current value this number of times.
+    uint32_t num_repeats = rle_decoder_.NextNumRepeats();
+    if (num_repeats > 0) {
+      uint32_t num_repeats_to_set = min<uint32_t>(num_repeats, batch_size - num_values);
+      uint8_t repeated_value = rle_decoder_.GetRepeatedValue(num_repeats_to_set);
+      memset(cached_levels_ + num_values, repeated_value, num_repeats_to_set);
+      num_values += num_repeats_to_set;
+      continue;
+    }
+
+    // Add remaining literal values, if any.
+    uint32_t num_literals = rle_decoder_.NextNumLiterals();
+    if (num_literals == 0) break;
+    uint32_t num_literals_to_set = min<uint32_t>(num_literals, batch_size - num_values);
+    if (!rle_decoder_.GetLiteralValues(
+          num_literals_to_set, &cached_levels_[num_values])) {
+      return false;
     }
+    num_values += num_literals_to_set;
   }
   *num_cached_levels = num_values;
   return true;
@@ -282,9 +298,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     // NextLevels() should have already been called and def and rep levels should be in
     // valid range.
     DCHECK_GE(rep_level_, 0);
-    DCHECK_LE(rep_level_, max_rep_level());
     DCHECK_GE(def_level_, 0);
-    DCHECK_LE(def_level_, max_def_level());
     DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
         "Caller should have called NextLevels() until we are ready to read a value";
 
@@ -330,6 +344,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     int val_count = 0;
     bool continue_execution = true;
     while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+      DCHECK_GE(num_buffered_values_, 0);
       // Read next page if necessary.
       if (num_buffered_values_ == 0) {
         if (!NextPage()) {
@@ -338,26 +353,29 @@ class ScalarColumnReader : public BaseScalarColumnReader {
         }
       }
 
-      // Fill def/rep level caches if they are empty.
-      int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_);
-      if (!def_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
-      }
-      // We only need the repetition levels for populating the position slot since we
-      // are only populating top-level tuples.
-      if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
-      }
-      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-
-      // This special case is most efficiently handled here directly.
+      // Not materializing anything - skip decoding any levels and rely on the value
+      // count from page metadata to return the correct number of rows.
       if (!MATERIALIZED && !IN_COLLECTION) {
-        int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count);
+        int vals_to_add = min(num_buffered_values_, max_values - val_count);
         val_count += vals_to_add;
-        def_levels_.CacheSkipLevels(vals_to_add);
         num_buffered_values_ -= vals_to_add;
         continue;
       }
+      // Fill the rep level cache if needed. We are flattening out the fields of the
+      // nested collection into the top-level tuple returned by the scan, so we don't
+      // care about the nesting structure unless the position slot is being populated.
+      if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(
+            rep_levels_.CacheNextBatch(num_buffered_values_));
+        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+      }
+
+      // Fill def level cache if needed.
+      if (!def_levels_.CacheHasNext()) {
+        // TODO: add a fast path here if there's a run of repeated values.
+        parent_->parse_status_.MergeStatus(
+            def_levels_.CacheNextBatch(num_buffered_values_));
+      }
 
       // Read data page and cached levels to materialize values.
       int cache_start_idx = def_levels_.CacheCurrIdx();
@@ -685,7 +703,9 @@ class BoolColumnReader : public BaseScalarColumnReader {
 
   virtual Status InitDataPage(uint8_t* data, int size) {
     // Initialize bool decoder
-    bool_values_ = BitReader(data, size);
+    bool_values_.Reset(data, size);
+    num_unpacked_values_ = 0;
+    unpacked_value_idx_ = 0;
     return Status::OK();
   }
 
@@ -695,9 +715,7 @@ class BoolColumnReader : public BaseScalarColumnReader {
     DCHECK(slot_desc_ != NULL);
     // Def and rep levels should be in valid range.
     DCHECK_GE(rep_level_, 0);
-    DCHECK_LE(rep_level_, max_rep_level());
     DCHECK_GE(def_level_, 0);
-    DCHECK_LE(def_level_, max_def_level());
     DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
         "Caller should have called NextLevels() until we are ready to read a value";
 
@@ -719,14 +737,38 @@ class BoolColumnReader : public BaseScalarColumnReader {
   template<bool IN_COLLECTION>
   inline bool ReadSlot(Tuple* tuple, MemPool* pool)  {
     void* slot = tuple->GetSlot(tuple_offset_);
-    if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) {
-      parent_->parse_status_ = Status("Invalid bool column.");
-      return false;
+    bool val;
+    if (unpacked_value_idx_ < num_unpacked_values_) {
+      val = unpacked_values_[unpacked_value_idx_++];
+    } else {
+      // Unpack as many values as we can into the buffer. We expect to read at least one
+      // value.
+      int num_unpacked =
+          bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+      if (UNLIKELY(num_unpacked == 0)) {
+        parent_->parse_status_ = Status("Invalid bool column.");
+        return false;
+      }
+      val = unpacked_values_[0];
+      num_unpacked_values_ = num_unpacked;
+      unpacked_value_idx_ = 1;
     }
+    *reinterpret_cast<bool*>(slot) = val;
     return NextLevels<IN_COLLECTION>();
   }
 
-  BitReader bool_values_;
+  /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
+  /// batch-oriented interface of BatchedBitReader.
+  static const int UNPACKED_BUFFER_LEN = 128;
+  bool unpacked_values_[UNPACKED_BUFFER_LEN];
+
+  /// The number of valid values in 'unpacked_values_'.
+  int num_unpacked_values_ = 0;
+
+  /// The next value to return from 'unpacked_values_'.
+  int unpacked_value_idx_ = 0;
+
+  BatchedBitReader bool_values_;
 };
 
 // Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
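
The buffered read path that BoolColumnReader::ReadSlot() gains above can be sketched in isolation as follows. This is a minimal illustration, not the Impala implementation: OneBitBatchReader is a hypothetical stand-in for BatchedBitReader limited to 1-bit values, BufferedBoolReader and its member names are invented for the example, and the sketch assumes values are packed least-significant bit first.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical stand-in for BatchedBitReader, restricted to 1-bit values.
// Assumes bits are packed least-significant bit first within each byte.
class OneBitBatchReader {
 public:
  void Reset(const uint8_t* data, int size_bytes) {
    data_ = data;
    num_bits_ = static_cast<int64_t>(size_bytes) * 8;
    bit_pos_ = 0;
  }

  // Unpacks up to 'max_values' bools into 'out'. Returns the number unpacked,
  // which is 0 once the input is exhausted.
  int UnpackBatch(int max_values, bool* out) {
    const int64_t remaining = num_bits_ - bit_pos_;
    const int n = static_cast<int>(std::min<int64_t>(max_values, remaining));
    for (int i = 0; i < n; ++i, ++bit_pos_) {
      out[i] = ((data_[bit_pos_ / 8] >> (bit_pos_ % 8)) & 1) != 0;
    }
    return n;
  }

 private:
  const uint8_t* data_ = nullptr;
  int64_t num_bits_ = 0;
  int64_t bit_pos_ = 0;
};

// Serves values one at a time from a small buffer that is refilled in batches,
// mirroring the unpacked_values_ / unpacked_value_idx_ bookkeeping in the diff.
class BufferedBoolReader {
 public:
  void InitDataPage(const uint8_t* data, int size_bytes) {
    reader_.Reset(data, size_bytes);
    num_unpacked_values_ = 0;
    unpacked_value_idx_ = 0;
  }

  // Returns false if no more values could be unpacked from the page.
  bool ReadValue(bool* val) {
    if (unpacked_value_idx_ == num_unpacked_values_) {
      // Buffer exhausted: refill with as many values as fit.
      num_unpacked_values_ = reader_.UnpackBatch(kBufferLen, unpacked_values_);
      unpacked_value_idx_ = 0;
      if (num_unpacked_values_ == 0) return false;
    }
    *val = unpacked_values_[unpacked_value_idx_++];
    return true;
  }

 private:
  static constexpr int kBufferLen = 128;  // A multiple of 32, as in the diff.
  bool unpacked_values_[kBufferLen];
  int num_unpacked_values_ = 0;
  int unpacked_value_idx_ = 0;
  OneBitBatchReader reader_;
};
```

The point of the pattern is that the per-value call usually reduces to an index check and an array load; the bit manipulation runs once per batch rather than once per value.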

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index dea84a8..17b8c0f 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -22,6 +22,7 @@
 
 #include "exec/hdfs-parquet-scanner.h"
 #include "util/codec.h"
+#include "util/bit-stream-utils.h"
 #include "util/dict-encoding.h"
 #include "util/rle-encoding.h"
 
@@ -36,39 +37,34 @@ class MemPool;
 /// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
 /// populating the level cache (e.g., with RLE we can memset() repeated values).
 ///
-/// Inherits from RleDecoder instead of containing one for performance reasons.
-/// The containment design would require two BitReaders per column reader. The extra
-/// BitReader causes enough bloat for a column reader to require another cache line.
-/// TODO: It is not clear whether the inheritance vs. containment choice still makes
-/// sense with column-wise materialization. The containment design seems cleaner and
-/// we should revisit.
-class ParquetLevelDecoder : public RleDecoder {
+/// TODO: expose whether we're in a run of repeated values so that callers can
+/// optimise for that case.
+class ParquetLevelDecoder {
  public:
   ParquetLevelDecoder(bool is_def_level_decoder)
-    : cached_levels_(NULL),
-      num_cached_levels_(0),
-      cached_level_idx_(0),
-      encoding_(parquet::Encoding::PLAIN),
-      max_level_(0),
-      cache_size_(0),
-      num_buffered_values_(0),
-      decoding_error_code_(is_def_level_decoder ?
+    : decoding_error_code_(is_def_level_decoder ?
           TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
   }
 
   /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
-  /// encoding requires reading metadata from the page header.
+  /// encoding requires reading metadata from the page header. 'cache_size' will be
+  /// rounded up to a multiple of 32 internally.
   Status Init(const string& filename, parquet::Encoding::type encoding,
       MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
       uint8_t** data, int* data_size);
 
-  /// Returns the next level or INVALID_LEVEL if there was an error.
+  /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
+  /// as batched methods.
   inline int16_t ReadLevel();
 
-  /// Decodes and caches the next batch of levels. Resets members associated with the
-  /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
-  /// level was encountered with a value greater than max_level_.
-  Status CacheNextBatch(int batch_size);
+  /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
+  /// values left to decode in the page. Resets members associated with the cache.
+  /// Returns a non-ok status if there was a problem decoding a level, if a level was
+  /// encountered with a value greater than max_level_, or if fewer than
+  /// min(CacheSize(), vals_remaining) levels could be read, which indicates that the
+  /// input did not have the expected number of values. Only valid to call when
+  /// the cache has been exhausted, i.e. CacheHasNext() is false.
+  Status CacheNextBatch(int vals_remaining);
 
   /// Functions for working with the level cache.
   inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
@@ -83,33 +79,57 @@ class ParquetLevelDecoder : public RleDecoder {
   inline int CacheSize() const { return num_cached_levels_; }
   inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
   inline int CacheCurrIdx() const { return cached_level_idx_; }
-
  private:
   /// Initializes members associated with the level cache. Allocates memory for
   /// the cache from pool, if necessary.
   Status InitCache(MemPool* pool, int cache_size);
 
-  /// Decodes and writes a batch of levels into the cache. Sets the number of
-  /// values written to the cache in *num_cached_levels. Returns false if there was
-  /// an error decoding a level or if there was a level value greater than max_level_.
+  /// Decodes and writes a batch of levels into the cache. Returns true and sets
+  /// the number of values written to the cache via *num_cached_levels if no errors
+  /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the
+  /// end of input was hit without any other errors. Returns false if there was an
+  /// error decoding a level or if there was an invalid level value greater than
+  /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e.
+  /// CacheHasNext() is false.
   bool FillCache(int batch_size, int* num_cached_levels);
 
-  /// Buffer for a batch of levels. The memory is allocated and owned by a pool in
-  /// passed in Init().
-  uint8_t* cached_levels_;
+  /// Implementation of FillCache() for RLE encoding.
+  bool FillCacheRle(int batch_size, int* num_cached_levels);
+
+  /// RLE decoder, used if 'encoding_' is RLE.
+  RleBatchDecoder<uint8_t> rle_decoder_;
+
+  /// Bit unpacker, used if 'encoding_' is BIT_PACKED.
+  BatchedBitReader bit_reader_;
+
+  /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
+  /// in Init().
+  uint8_t* cached_levels_ = nullptr;
+
   /// Number of valid level values in the cache.
-  int num_cached_levels_;
+  int num_cached_levels_ = 0;
+
   /// Current index into cached_levels_.
-  int cached_level_idx_;
-  parquet::Encoding::type encoding_;
+  int cached_level_idx_ = 0;
+
+  /// The parquet encoding used for the levels. Usually RLE but the deprecated BIT_PACKED
+  /// encoding is also allowed.
+  parquet::Encoding::type encoding_ = parquet::Encoding::PLAIN;
 
   /// For error checking and reporting.
-  int max_level_;
-  /// Number of level values cached_levels_ has memory allocated for.
-  int cache_size_;
+  int max_level_ = 0;
+
+  /// Number of level values cached_levels_ has memory allocated for. Always
+  /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches.
+  int cache_size_ = 0;
+
   /// Number of remaining data values in the current data page.
-  int num_buffered_values_;
+  int num_buffered_values_ = 0;
+
+  /// Name of the parquet file. Used for reporting level decoding errors.
   string filename_;
+
+  /// Error code to use when reporting level decoding errors.
   TErrorCode::type decoding_error_code_;
 };
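
The contract documented for CacheNextBatch() above (cache size rounded up to a multiple of 32, at most min(cache size, vals_remaining) levels per batch, failure on out-of-range levels or short input) can be sketched as below. This is a hedged illustration only: LevelCacheSketch is a hypothetical class, the "decoder" is just a vector of already-decoded levels, and a plain bool replaces Status.

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Rounds 'v' up to the next multiple of 32 so batches of 32 always fit.
constexpr int RoundUpToMultipleOf32(int v) { return (v + 31) / 32 * 32; }

// Hypothetical level cache. 'levels' stands in for the RLE or bit-packed
// input; a real decoder would unpack into the cache instead of copying.
class LevelCacheSketch {
 public:
  LevelCacheSketch(std::vector<uint8_t> levels, int cache_size, int max_level)
    : input_(std::move(levels)),
      cache_(RoundUpToMultipleOf32(cache_size)),
      max_level_(max_level) {}

  bool CacheHasNext() const { return idx_ < num_cached_; }
  uint8_t CacheGetNext() { return cache_[idx_++]; }
  int CacheSize() const { return num_cached_; }
  int CacheCapacity() const { return static_cast<int>(cache_.size()); }

  // Refills the cache given that 'vals_remaining' values are left in the page.
  // Returns false if a level exceeds 'max_level_' or if the input ends before
  // min(capacity, vals_remaining) levels were read. Only call this once the
  // cache is exhausted, i.e. CacheHasNext() is false.
  bool CacheNextBatch(int vals_remaining) {
    num_cached_ = 0;
    idx_ = 0;
    const int batch_size = std::min<int>(CacheCapacity(), vals_remaining);
    const int available =
        std::min<int>(batch_size, static_cast<int>(input_.size() - input_pos_));
    for (int i = 0; i < available; ++i) {
      const uint8_t level = input_[input_pos_ + i];
      if (level > max_level_) return false;  // Invalid level value.
      cache_[i] = level;
    }
    input_pos_ += available;
    num_cached_ = available;
    // Fewer levels than expected means the page is truncated or corrupt.
    return available == batch_size;
  }

 private:
  std::vector<uint8_t> input_;
  size_t input_pos_ = 0;
  std::vector<uint8_t> cache_;
  int max_level_;
  int num_cached_ = 0;
  int idx_ = 0;
};
```

Treating a short read as an error, rather than as a partial success, lets the caller distinguish a truncated page from a normal end of batch without tracking the count itself.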
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/experiments/bit-stream-utils.8byte.h
----------------------------------------------------------------------
diff --git a/be/src/experiments/bit-stream-utils.8byte.h b/be/src/experiments/bit-stream-utils.8byte.h
deleted file mode 100644
index f9418a5..0000000
--- a/be/src/experiments/bit-stream-utils.8byte.h
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_H
-#define IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_H
-
-#include <boost/cstdint.hpp>
-#include <string.h>
-#include "common/compiler-util.h"
-#include "common/logging.h"
-#include "util/bit-util.h"
-
-namespace impala {
-
-/// Utility class to write bit/byte streams.  This class can write data to either be
-/// bit packed or byte aligned (and a single stream that has a mix of both).
-/// This class does not allocate memory.
-class BitWriter_8byte {
- public:
-  /// buffer: buffer to write bits to.  Buffer should be preallocated with
-  /// 'buffer_len' bytes. 'buffer_len' must be a multiple of 8.
-  BitWriter_8byte(uint8_t* buffer, int buffer_len) :
-      buffer_(reinterpret_cast<uint64_t*>(buffer)),
-      max_bytes_(buffer_len),
-      offset_(0),
-      bit_offset_(0) {
-    DCHECK_EQ(buffer_len % 8, 0);
-  }
-
-  void Clear() {
-    offset_ = 0;
-    bit_offset_ = 0;
-    memset(buffer_, 0, max_bytes_);
-  }
-
-  uint8_t* buffer() const { return reinterpret_cast<uint8_t*>(buffer_); }
-  int buffer_len() const { return max_bytes_; }
-
-  inline int bytes_written() const {
-    return offset_ * 8 + BitUtil::Ceil(bit_offset_, 8);
-  }
-
-  /// Writes a value to the buffer.  This is bit packed.  Returns false if
-  /// there was not enough space.
-  bool PutValue(uint64_t v, int num_bits);
-
-  /// Writes v to the next aligned byte.
-  template<typename T>
-  bool PutAligned(T v, int num_bits);
-
-  /// Write a Vlq encoded int to the buffer.  Returns false if there was not enough
-  /// room.  The value is written byte aligned.
-  /// For more details on vlq:
-  /// en.wikipedia.org/wiki/Variable-length_quantity
-  bool PutVlqInt(int32_t v);
-
-  /// Get a pointer to the next aligned byte and advance the underlying buffer
-  /// by num_bytes.
-  /// Returns NULL if there was not enough space.
-  uint8_t* GetNextBytePtr(int num_bytes = 1);
-
- private:
-  uint64_t* buffer_;
-  int max_bytes_;
-  int offset_;            // Offset into buffer_
-  int bit_offset_;        // Offset into current uint64_t
-};
-
-/// Utility class to read bit/byte stream.  This class can read bits or bytes
-/// that are either byte aligned or not.  It also has utilities to read multiple
-/// bytes in one read (e.g. encoded int).
-class BitReader_8byte {
- public:
-  /// buffer: buffer to read from.  The buffer's length is 'buffer_len' and must be a
-  /// multiple of 8.
-  BitReader_8byte(uint8_t* buffer, int buffer_len) :
-      buffer_(reinterpret_cast<uint64_t*>(buffer)),
-      max_bytes_(buffer_len),
-      offset_(0),
-      bit_offset_(0) {
-    DCHECK_EQ(buffer_len % 8, 0);
-  }
-
-  BitReader_8byte() : buffer_(NULL), max_bytes_(0) {}
-
-  /// Gets the next value from the buffer.
-  /// Returns true if 'v' could be read or false if there are not enough bytes left.
-  template<typename T>
-  bool GetValue(int num_bits, T* v);
-
-  /// Reads a T sized value from the buffer.  T needs to be a native type and little
-  /// endian.  The value is assumed to be byte aligned so the stream will be advance
-  /// to the start of the next byte before v is read.
-  template<typename T>
-  bool GetAligned(int num_bits, T* v);
-
-  /// Reads a vlq encoded int from the stream.  The encoded int must start at the
-  /// beginning of a byte. Return false if there were not enough bytes in the buffer.
-  bool GetVlqInt(int32_t* v);
-
-  /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
-  /// there may be an additional fraction of a byte).
-  inline int bytes_left() {
-    return max_bytes_ - (offset_ * 8 + BitUtil::Ceil(bit_offset_, 8));
-  }
-
-  /// Maximum byte length of a vlq encoded int
-  static const int MAX_VLQ_BYTE_LEN = 5;
- 
- private:
-  uint64_t* buffer_;
-  int max_bytes_;
-  int offset_;            // Offset into buffer_
-  int bit_offset_;        // Offset into current uint64_t
-
-  /// Advances offset_ and/or bit_offset_ to next byte boundary in buffer_.
-  inline void Align();
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/experiments/bit-stream-utils.8byte.inline.h
----------------------------------------------------------------------
diff --git a/be/src/experiments/bit-stream-utils.8byte.inline.h b/be/src/experiments/bit-stream-utils.8byte.inline.h
deleted file mode 100644
index 1b1c05a..0000000
--- a/be/src/experiments/bit-stream-utils.8byte.inline.h
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_INLINE_H
-#define IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_INLINE_H
-
-#include "experiments/bit-stream-utils.8byte.h"
-
-namespace impala {
-
-inline bool BitWriter_8byte::PutValue(uint64_t v, int num_bits) {
-  DCHECK_LE(num_bits, 64);
-  DCHECK(num_bits == 64 || v >> num_bits == 0)
-      << "v = " << v << ", num_bits = " << num_bits;
-
-  if (UNLIKELY(offset_ * 8 + bit_offset_ > max_bytes_ * 8 - num_bits)) return false;
-
-  buffer_[offset_] |= v << bit_offset_;
-  bit_offset_ += num_bits;
-  if (bit_offset_ >= 64) {
-    ++offset_;
-    bit_offset_ -= 64;
-    if (UNLIKELY(bit_offset_ > 0)) {
-      // Write out bits of v that crossed into new byte offset
-      // We cannot perform v >> num_bits (i.e. when bit_offset_ is 0) because v >> 64 != 0
-      buffer_[offset_] = v >> (num_bits - bit_offset_);
-    }
-  }
-  DCHECK_LT(bit_offset_, 64);
-  return true;
-}
-
-inline uint8_t* BitWriter_8byte::GetNextBytePtr(int num_bytes) {
-  if (UNLIKELY(bytes_written() + num_bytes > max_bytes_)) return NULL;
-
-  // Advance to next aligned byte if necessary
-  if (UNLIKELY(bit_offset_ > 56)) {
-    ++offset_;
-    bit_offset_ = 0;
-  } else {
-    bit_offset_ = BitUtil::RoundUpNumBytes(bit_offset_) * 8;
-  }
-
-  DCHECK_EQ(bit_offset_ % 8, 0);
-  uint8_t* ptr = reinterpret_cast<uint8_t*>(buffer_ + offset_) + bit_offset_ / 8;
-  bit_offset_ += num_bytes * 8;
-  offset_ += bit_offset_ / 64;
-  bit_offset_ %= 64;
-  return ptr;
-}
-
-template<typename T>
-inline bool BitWriter_8byte::PutAligned(T val, int num_bits) {
-  // Align to byte boundary
-  uint8_t* byte_ptr = GetNextBytePtr(0);
-  bool result = PutValue(val, num_bits);
-  if (!result) return false;
-  // Pad to next byte boundary
-  byte_ptr = GetNextBytePtr(0);
-  DCHECK(byte_ptr != NULL);
-  return true;
-}
-
-inline bool BitWriter_8byte::PutVlqInt(int32_t v) {
-  bool result = true;
-  while ((v & 0xFFFFFF80) != 0L) {
-    result &= PutAligned<uint8_t>((v & 0x7F) | 0x80, 8);
-    v >>= 7;
-  }
-  result &= PutAligned<uint8_t>(v & 0x7F, 8);
-  return result;
-}
-
-template<typename T>
-inline bool BitReader_8byte::GetValue(int num_bits, T* v) {
-  int size = sizeof(T) * 8;
-  DCHECK_LE(num_bits, size);
-  if (UNLIKELY(offset_ * 8 + bit_offset_ > max_bytes_ * 8 - num_bits)) return false;
-
-  *v = BitUtil::TrailingBits(buffer_[offset_], bit_offset_ + num_bits) >> bit_offset_;
-  bit_offset_ += num_bits;
-  if (bit_offset_ >= 64) {
-    ++offset_;
-    bit_offset_ -= 64;
-    // Read bits of v that crossed into new byte offset
-    *v |= BitUtil::TrailingBits(buffer_[offset_], bit_offset_)
-        << (num_bits - bit_offset_);
-  }
-  DCHECK_LT(bit_offset_, 64);
-  return true;
-}
-
-template<typename T>
-inline bool BitReader_8byte::GetAligned(int num_bits, T* v) {
- Align();
-  if (UNLIKELY(bytes_left() < BitUtil::Ceil(num_bits, 8))) return false;
-  DCHECK_EQ(bit_offset_ % 8, 0);
-  bool result = GetValue(num_bits, v);
-  DCHECK(result);
-  Align();
-  return true;
-}
-
-inline bool BitReader_8byte::GetVlqInt(int32_t* v) {
-  *v = 0;
-  int shift = 0;
-  int num_bytes = 0;
-  uint8_t byte = 0;
-  do {
-    if (!GetAligned<uint8_t>(8, &byte)) return false;
-    *v |= (byte & 0x7F) << shift;
-    shift += 7;
-    DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN);
-  } while ((byte & 0x80) != 0);
-  return true;
-}
-
-inline void BitReader_8byte::Align() {
-  if (UNLIKELY(bit_offset_ > 56)) {
-    ++offset_;
-    bit_offset_ = 0;
-    DCHECK_LE(offset_, max_bytes_);
-  } else {
-    bit_offset_ = BitUtil::RoundUpNumBytes(bit_offset_) * 8;
-  }
-}
-
-}
-
-#endif
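
For readers unfamiliar with the VLQ integers handled by the removed PutVlqInt() and GetVlqInt(), the encoding can be sketched as below: seven payload bits per byte, low-order group first, with the high bit set on every byte except the last. This is a standalone illustration over std::vector with unsigned values, not the removed code; the function names are reused only for readability and kMaxVlqByteLen mirrors MAX_VLQ_BYTE_LEN.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// A 32-bit value needs at most ceil(32 / 7) = 5 VLQ bytes.
constexpr int kMaxVlqByteLen = 5;

// Appends 'v' to 'out', 7 bits per byte, least-significant group first.
// The high bit of each byte signals that another byte follows.
void PutVlqInt(uint32_t v, std::vector<uint8_t>* out) {
  while ((v & ~0x7Fu) != 0) {
    out->push_back(static_cast<uint8_t>((v & 0x7F) | 0x80));
    v >>= 7;
  }
  out->push_back(static_cast<uint8_t>(v));
}

// Decodes one VLQ integer starting at '*pos' and advances '*pos' past it.
// Returns false if the input ends early or the encoding exceeds 5 bytes.
bool GetVlqInt(const std::vector<uint8_t>& in, size_t* pos, uint32_t* v) {
  uint32_t result = 0;
  for (int i = 0; i < kMaxVlqByteLen; ++i) {
    if (*pos >= in.size()) return false;
    const uint8_t byte = in[(*pos)++];
    result |= static_cast<uint32_t>(byte & 0x7F) << (7 * i);
    if ((byte & 0x80) == 0) {
      *v = result;
      return true;
    }
  }
  return false;  // Continuation bit still set on the fifth byte.
}
```

Unlike the removed reader, which only DCHECKs the byte count, this sketch reports an over-long encoding as a decode failure so that it is also caught in release builds.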

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-packing.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h
index 62e5e88..05036db 100644
--- a/be/src/util/bit-packing.h
+++ b/be/src/util/bit-packing.h
@@ -62,6 +62,27 @@ class BitPacking {
       const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
       OutType* __restrict__ out);
 
+  /// Same as above, templated by BIT_WIDTH.
+  template <typename OutType, int BIT_WIDTH>
+  static std::pair<const uint8_t*, int64_t> UnpackValues(const uint8_t* __restrict__ in,
+      int64_t in_bytes, int64_t num_values, OutType* __restrict__ out);
+
+  /// Unpack values as above, treating them as unsigned integers, and decode them
+  /// using the provided dict. Sets 'decode_error' to true if one of the packed
+  /// values was greater than 'dict_len'. Does not modify 'decode_error' on success.
+  template <typename OutType>
+  static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(int bit_width,
+      const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+      int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+      bool* __restrict__ decode_error);
+
+  /// Same as above, templated by BIT_WIDTH.
+  template <typename OutType, int BIT_WIDTH>
+  static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(
+      const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+      int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+      bool* __restrict__ decode_error);
+
   /// Unpack exactly 32 values of 'bit_width' from 'in' to 'out'. 'in' must point to
   /// 'in_bytes' of addressable memory, and 'in_bytes' must be at least
   /// (32 * bit_width / 8). 'out' must have space for 32 OutType values.
@@ -70,22 +91,42 @@ class BitPacking {
   static const uint8_t* Unpack32Values(int bit_width, const uint8_t* __restrict__ in,
       int64_t in_bytes, OutType* __restrict__ out);
 
- private:
-  /// Implementation of Unpack32Values() that uses 32-bit integer loads to
-  /// unpack values with the given BIT_WIDTH from 'in' to 'out'.
+  /// Same as Unpack32Values() but templated by BIT_WIDTH.
   template <typename OutType, int BIT_WIDTH>
   static const uint8_t* Unpack32Values(
       const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out);
 
-  /// Function that unpacks 'num_values' values with the given BIT_WIDTH from 'in' to
-  /// 'out'. 'num_values' can be at most 32. The version with 'bit_width' as an argument
-  /// dispatches based on 'bit_width' to the appropriate templated implementation.
-  template <typename OutType, int BIT_WIDTH>
-  static const uint8_t* UnpackUpTo32Values(const uint8_t* __restrict__ in,
-      int64_t in_bytes, int num_values, OutType* __restrict__ out);
+  /// Same as Unpack32Values() with dictionary decoding.
   template <typename OutType>
-  static const uint8_t* UnpackUpTo32Values(int bit_width, const uint8_t* __restrict__ in,
+  static const uint8_t* UnpackAndDecode32Values(int bit_width,
+      const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+      int64_t dict_len, OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+  /// Same as UnpackAndDecode32Values() but templated by BIT_WIDTH.
+  template <typename OutType, int BIT_WIDTH>
+  static const uint8_t* UnpackAndDecode32Values(const uint8_t* __restrict__ in,
+      int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
+      OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+  /// Unpacks 'num_values' values with the given BIT_WIDTH from 'in' to 'out'.
+  /// 'num_values' must be at most 31. 'in' must point to 'in_bytes' of addressable
+  /// memory, and 'in_bytes' must be at least ceil(num_values * bit_width / 8).
+  /// 'out' must have space for 'num_values' OutType values.
+  /// 0 <= 'bit_width' <= 32 and 'bit_width' <= # of bits in OutType.
+  template <typename OutType, int BIT_WIDTH>
+  static const uint8_t* UnpackUpTo31Values(const uint8_t* __restrict__ in,
       int64_t in_bytes, int num_values, OutType* __restrict__ out);
+
+  /// Same as UnpackUpTo31Values() with dictionary decoding.
+  template <typename OutType, int BIT_WIDTH>
+  static const uint8_t* UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
+      int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
+      OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+ private:
+  /// Compute the number of values with the given bit width that can be unpacked from
+  /// an input buffer of 'in_bytes' into an output buffer with space for 'num_values'.
+  static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values);
 };
 }
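
The unpack-and-decode interface declared above can be approximated by the following sketch. It is deliberately simplified and makes several assumptions: bit_width is a runtime argument rather than a template parameter, values are extracted bit by bit (least-significant bit first) instead of through the unrolled 32-value kernels, the dictionary pointer is const, and the free functions here are illustrative rather than the BitPacking members themselves.

```cpp
#include <climits>
#include <cstdint>
#include <cstring>
#include <utility>

// Number of values that can be unpacked: limited either by the output space
// ('num_values') or by how many whole values fit in 'in_bytes' of input.
int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values) {
  if (bit_width == 0 || (num_values * bit_width + 7) / 8 <= in_bytes) {
    return num_values;
  }
  return (in_bytes * CHAR_BIT) / bit_width;
}

// Extracts the 'idx'th packed value, assuming LSB-first bit order.
uint32_t UnpackOne(const uint8_t* in, int bit_width, int64_t idx) {
  uint32_t v = 0;
  int64_t bit = idx * bit_width;
  for (int b = 0; b < bit_width; ++b, ++bit) {
    v |= static_cast<uint32_t>((in[bit / 8] >> (bit % 8)) & 1) << b;
  }
  return v;
}

// Unpacks dictionary indices and writes the corresponding dictionary entries
// to 'out' in a single pass. Sets '*decode_error' to true if an index is out
// of range; leaves it untouched otherwise. Returns the position after the
// consumed input and the number of values processed.
template <typename OutType>
std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(int bit_width,
    const uint8_t* in, int64_t in_bytes, const OutType* dict, int64_t dict_len,
    int64_t num_values, OutType* out, bool* decode_error) {
  const int64_t n = NumValuesToUnpack(bit_width, in_bytes, num_values);
  for (int64_t i = 0; i < n; ++i) {
    const uint32_t idx = UnpackOne(in, bit_width, i);
    if (idx >= dict_len) {
      *decode_error = true;
    } else {
      // memcpy() avoids assuming that 'out' is aligned for OutType.
      std::memcpy(&out[i], &dict[idx], sizeof(OutType));
    }
  }
  return std::make_pair(in + (n * bit_width + 7) / 8, n);
}
```

Because the error flag is only ever set, a caller can decode a whole page with one flag and check it once at the end, which keeps branches out of the per-value loop.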
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-packing.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h
index 37d51ab..4b2bb33 100644
--- a/be/src/util/bit-packing.inline.h
+++ b/be/src/util/bit-packing.inline.h
@@ -31,32 +31,109 @@
 
 namespace impala {
 
+inline int64_t BitPacking::NumValuesToUnpack(
+    int bit_width, int64_t in_bytes, int64_t num_values) {
+  // Check if we have enough input bytes to decode 'num_values'.
+  if (bit_width == 0 || BitUtil::RoundUpNumBytes(num_values * bit_width) <= in_bytes) {
+    // Limited by output space.
+    return num_values;
+  } else {
+    // Limited by the number of input bytes. Compute the number of values that can be
+    // unpacked from the input.
+    return (in_bytes * CHAR_BIT) / bit_width;
+  }
+}
+
 template <typename OutType>
 std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(int bit_width,
     const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
     OutType* __restrict__ out) {
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+  case i:                                       \
+    return UnpackValues<OutType, i>(in, in_bytes, num_values, out);
+
+  switch (bit_width) {
+    // Expand cases from 0 to 32.
+    BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
+    default:
+      DCHECK(false);
+      return std::make_pair(nullptr, -1);
+  }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(
+    const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
+    OutType* __restrict__ out) {
   constexpr int BATCH_SIZE = 32;
-  const int64_t max_input_values =
-      bit_width ? (in_bytes * CHAR_BIT) / bit_width : num_values;
-  const int64_t values_to_read = std::min(num_values, max_input_values);
+  const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values);
   const int64_t batches_to_read = values_to_read / BATCH_SIZE;
   const int64_t remainder_values = values_to_read % BATCH_SIZE;
   const uint8_t* in_pos = in;
   OutType* out_pos = out;
   // First unpack as many full batches as possible.
   for (int64_t i = 0; i < batches_to_read; ++i) {
-    in_pos = Unpack32Values<OutType>(bit_width, in_pos, in_bytes, out_pos);
+    in_pos = Unpack32Values<OutType, BIT_WIDTH>(in_pos, in_bytes, out_pos);
     out_pos += BATCH_SIZE;
-    in_bytes -= (BATCH_SIZE * bit_width) / CHAR_BIT;
+    in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT;
   }
   // Then unpack the final partial batch.
   if (remainder_values > 0) {
-    in_pos = UnpackUpTo32Values<OutType>(bit_width,
+    in_pos = UnpackUpTo31Values<OutType, BIT_WIDTH>(
         in_pos, in_bytes, remainder_values, out_pos);
   }
   return std::make_pair(in_pos, values_to_read);
 }
 
+template <typename OutType>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(int bit_width,
+    const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+    int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+    bool* __restrict__ decode_error) {
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+  case i:                                       \
+    return UnpackAndDecodeValues<OutType, i>(   \
+        in, in_bytes, dict, dict_len, num_values, out, decode_error);
+
+  switch (bit_width) {
+    // Expand cases from 0 to 32.
+    BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
+    default:
+      DCHECK(false);
+      return std::make_pair(nullptr, -1);
+  }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(
+    const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+    int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+    bool* __restrict__ decode_error) {
+  constexpr int BATCH_SIZE = 32;
+  const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values);
+  const int64_t batches_to_read = values_to_read / BATCH_SIZE;
+  const int64_t remainder_values = values_to_read % BATCH_SIZE;
+  const uint8_t* in_pos = in;
+  OutType* out_pos = out;
+  // First unpack as many full batches as possible.
+  for (int64_t i = 0; i < batches_to_read; ++i) {
+    in_pos = UnpackAndDecode32Values<OutType, BIT_WIDTH>(
+        in_pos, in_bytes, dict, dict_len, out_pos, decode_error);
+    out_pos += BATCH_SIZE;
+    in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT;
+  }
+  // Then unpack the final partial batch.
+  if (remainder_values > 0) {
+    in_pos = UnpackAndDecodeUpTo31Values<OutType, BIT_WIDTH>(
+        in_pos, in_bytes, dict, dict_len, remainder_values, out_pos, decode_error);
+  }
+  return std::make_pair(in_pos, values_to_read);
+}
+
 // Loop body of unrolled loop that unpacks the value. BIT_WIDTH is the bit width of
 // the packed values. 'in_buf' is the start of the input buffer and 'out_vals' is the
 // start of the output values array. This function unpacks the VALUE_IDX'th packed value
@@ -111,60 +188,83 @@ inline uint32_t ALWAYS_INLINE UnpackValue(const uint8_t* __restrict__ in_buf) {
   }
 }
 
+template <typename OutType>
+inline void ALWAYS_INLINE DecodeValue(OutType* __restrict__ dict, int64_t dict_len,
+    uint32_t idx, OutType* __restrict__ out_val, bool* __restrict__ decode_error) {
+  if (UNLIKELY(idx >= dict_len)) {
+    *decode_error = true;
+  } else {
+    // Use memcpy() because we can't assume sufficient alignment in some cases (e.g.
+    // 16 byte decimals).
+    memcpy(out_val, &dict[idx], sizeof(OutType));
+  }
+}
+
 template <typename OutType, int BIT_WIDTH>
 const uint8_t* BitPacking::Unpack32Values(
     const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out) {
   static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
   static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
-  static_assert(
-      BIT_WIDTH <= sizeof(OutType) * CHAR_BIT, "BIT_WIDTH too high for output type");
+  DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
   constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
   DCHECK_GE(in_bytes, BYTES_TO_READ);
 
-// Call UnpackValue for 0 <= i < 32.
-#pragma push_macro("UNPACK_VALUES_CALL")
+  // Call UnpackValue for 0 <= i < 32.
+#pragma push_macro("UNPACK_VALUE_CALL")
 #define UNPACK_VALUE_CALL(ignore1, i, ignore2) \
   out[i] = static_cast<OutType>(UnpackValue<BIT_WIDTH, i>(in));
+
   BOOST_PP_REPEAT_FROM_TO(0, 32, UNPACK_VALUE_CALL, ignore);
-#pragma pop_macro("UNPACK_VALUES_CALL")
   return in + BYTES_TO_READ;
+#pragma pop_macro("UNPACK_VALUE_CALL")
 }
 
 template <typename OutType>
 const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restrict__ in,
     int64_t in_bytes, OutType* __restrict__ out) {
-  switch (bit_width) {
-    // Expand cases from 0 to 32.
 #pragma push_macro("UNPACK_VALUES_CASE")
 #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
     case i: return Unpack32Values<OutType, i>(in, in_bytes, out);
-    BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
-    default: DCHECK(false); return in;
-  }
-}
 
-template <typename OutType>
-const uint8_t* BitPacking::UnpackUpTo32Values(int bit_width, const uint8_t* __restrict__ in,
-    int64_t in_bytes, int num_values, OutType* __restrict__ out) {
   switch (bit_width) {
     // Expand cases from 0 to 32.
-#pragma push_macro("UNPACK_VALUES_CASE")
-#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
-    case i: return UnpackUpTo32Values<OutType, i>(in, in_bytes, num_values, out);
     BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
     default: DCHECK(false); return in;
   }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ in,
+    int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
+    OutType* __restrict__ out, bool* __restrict__ decode_error) {
+  static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+  static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
+  DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
+  constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
+  DCHECK_GE(in_bytes, BYTES_TO_READ);
+  // TODO: this could be optimised further by using SIMD instructions.
+  // https://lemire.me/blog/2016/08/25/faster-dictionary-decoding-with-simd-instructions/
+
+  // Call UnpackValue() and DecodeValue() for 0 <= i < 32.
+#pragma push_macro("DECODE_VALUE_CALL")
+#define DECODE_VALUE_CALL(ignore1, i, ignore2)               \
+  {                                                          \
+    uint32_t idx = UnpackValue<BIT_WIDTH, i>(in);            \
+    DecodeValue(dict, dict_len, idx, &out[i], decode_error); \
+  }
+
+  BOOST_PP_REPEAT_FROM_TO(0, 32, DECODE_VALUE_CALL, ignore);
+  return in + BYTES_TO_READ;
+#pragma pop_macro("DECODE_VALUE_CALL")
 }
 
 template <typename OutType, int BIT_WIDTH>
-const uint8_t* BitPacking::UnpackUpTo32Values(const uint8_t* __restrict__ in,
+const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in,
     int64_t in_bytes, int num_values, OutType* __restrict__ out) {
   static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
   static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
-  static_assert(
-      BIT_WIDTH <= sizeof(OutType) * CHAR_BIT, "BIT_WIDTH too high for output type");
+  DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
   constexpr int MAX_BATCH_SIZE = 31;
   const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH);
   DCHECK_GE(in_bytes, BYTES_TO_READ);
@@ -183,19 +283,65 @@ const uint8_t* BitPacking::UnpackUpTo32Values(const uint8_t* __restrict__ in,
     in_buffer = tmp_buffer;
   }
 
-  // Use switch with fall-through cases to minimise branching.
-  switch (num_values) {
-// Expand cases from 31 down to 1.
 #pragma push_macro("UNPACK_VALUES_CASE")
 #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
   case 31 - i: out[30 - i] = \
       static_cast<OutType>(UnpackValue<BIT_WIDTH, 30 - i>(in_buffer));
+
+  // Use switch with fall-through cases to minimise branching.
+  switch (num_values) {
+  // Expand cases from 31 down to 1.
     BOOST_PP_REPEAT_FROM_TO(0, 31, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
     case 0: break;
     default: DCHECK(false);
   }
   return in + BYTES_TO_READ;
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
+      int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
+      OutType* __restrict__ out, bool* __restrict__ decode_error) {
+  static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+  static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
+  DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
+  constexpr int MAX_BATCH_SIZE = 31;
+  const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH);
+  DCHECK_GE(in_bytes, BYTES_TO_READ);
+  DCHECK_LE(num_values, MAX_BATCH_SIZE);
+
+  // Make sure the buffer is at least 1 byte.
+  constexpr int TMP_BUFFER_SIZE = BIT_WIDTH ?
+    (BIT_WIDTH * (MAX_BATCH_SIZE + 1)) / CHAR_BIT : 1;
+  uint8_t tmp_buffer[TMP_BUFFER_SIZE];
+
+  const uint8_t* in_buffer = in;
+  // Copy into padded temporary buffer to avoid reading past the end of 'in' if the
+  // last 32-bit load would go past the end of the buffer.
+  if (BitUtil::RoundUp(BYTES_TO_READ, sizeof(uint32_t)) > in_bytes) {
+    memcpy(tmp_buffer, in, BYTES_TO_READ);
+    in_buffer = tmp_buffer;
+  }
+
+#pragma push_macro("DECODE_VALUES_CASE")
+#define DECODE_VALUES_CASE(ignore1, i, ignore2)                   \
+  case 31 - i: {                                                  \
+    uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i>(in_buffer);     \
+    DecodeValue(dict, dict_len, idx, &out[30 - i], decode_error); \
+  }
+
+  // Use switch with fall-through cases to minimise branching.
+  switch (num_values) {
+    // Expand cases from 31 down to 1.
+    BOOST_PP_REPEAT_FROM_TO(0, 31, DECODE_VALUES_CASE, ignore);
+    case 0:
+      break;
+    default:
+      DCHECK(false);
+  }
+  return in + BYTES_TO_READ;
+#pragma pop_macro("DECODE_VALUES_CASE")
 }
 }
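Note on the decode step: DecodeValue(), called by UnpackAndDecode32Values()
and UnpackAndDecodeUpTo31Values() above, is not shown in this part of the
diff. A minimal sketch of the behaviour its call sites imply is given below.
It assumes that an out-of-range index only sets a sticky error flag and
leaves the output slot untouched, and that the copy goes through memcpy().
DecodeOne and DecodeBatch are hypothetical names used for illustration, not
Impala functions.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Hypothetical helper: look up one dictionary index with a bounds check.
// On an invalid index, only the error flag is set; '*out' is not written.
template <typename T>
inline void DecodeOne(const T* dict, int64_t dict_len, uint32_t idx, T* out,
    bool* decode_error) {
  assert(dict_len >= 0);
  if (idx >= dict_len) {
    *decode_error = true;
  } else {
    // memcpy() avoids assuming that 'out' is suitably aligned for T.
    std::memcpy(out, &dict[idx], sizeof(T));
  }
}

// Hypothetical helper: decode 'n' already-unpacked indices. The error flag is
// checked once at the end instead of branching out of the loop per value.
template <typename T>
inline bool DecodeBatch(const T* dict, int64_t dict_len, const uint32_t* indices,
    int n, T* out) {
  bool decode_error = false;
  for (int i = 0; i < n; ++i) {
    DecodeOne(dict, dict_len, indices[i], &out[i], &decode_error);
  }
  return !decode_error;
}
```

Keeping the error flag sticky means a fully unrolled batch needs no early
exit; the caller tests the flag once per batch.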
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index 5acdeee..bac127a 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -92,53 +92,67 @@ class BitWriter {
   int bit_offset_;        // Offset in buffered_values_
 };
 
-/// Utility class to read bit/byte stream.  This class can read bits or bytes
-/// that are either byte aligned or not.  It also has utilities to read multiple
-/// bytes in one read (e.g. encoded int).
-class BitReader {
+/// Utility class to read bit/byte stream. This class can read bits or bytes that are
+/// either byte aligned or not. It also has utilities to read multiple bytes in one
+/// read (e.g. encoded int). Exposes a batch-oriented interface to allow efficient
+/// processing of multiple values at a time.
+class BatchedBitReader {
  public:
   /// 'buffer' is the buffer to read from.  The buffer's length is 'buffer_len'.
   /// Does not take ownership of the buffer.
-  BitReader(const uint8_t* buffer, int buffer_len) { Reset(buffer, buffer_len); }
+  BatchedBitReader(const uint8_t* buffer, int64_t buffer_len) {
+    Reset(buffer, buffer_len);
+  }
 
-  BitReader() : buffer_(NULL), max_bytes_(0) {}
+  BatchedBitReader() {}
 
-  // The implicit copy constructor is left defined. If a BitReader is copied, the
+  // The implicit copy constructor is left defined. If a BatchedBitReader is copied, the
   // two copies do not share any state. Invoking functions on either copy continues
   // reading from the current read position without modifying the state of the other
   // copy.
 
   /// Resets the read to start reading from the start of 'buffer'. The buffer's
   /// length is 'buffer_len'. Does not take ownership of the buffer.
-  void Reset(const uint8_t* buffer, int buffer_len) {
-    buffer_ = buffer;
-    max_bytes_ = buffer_len;
-    byte_offset_ = 0;
-    bit_offset_ = 0;
-    int num_bytes = std::min(8, max_bytes_);
-    memcpy(&buffered_values_, buffer_, num_bytes);
+  void Reset(const uint8_t* buffer, int64_t buffer_len) {
+    buffer_pos_ = buffer;
+    buffer_end_ = buffer + buffer_len;
   }
 
-  /// Gets the next value from the buffer.  Returns true if 'v' could be read or false if
-  /// there are not enough bytes left. num_bits must be <= 32.
+  /// Gets up to 'num_values' bit-packed values, starting from the current byte in the
+  /// buffer and advance the read position. 'bit_width' must be <= 32.
+  /// If 'bit_width' * 'num_values' is not a multiple of 8, the trailing bits are
+  /// skipped and the next UnpackBatch() call will start reading from the next byte.
+  ///
+  /// If the caller does not want to drop trailing bits, 'num_values' must be exactly the
+  /// total number of values the caller wants to read from a run of bit-packed values, or
+  /// 'bit_width' * 'num_values' must be a multiple of 8. This condition is always
+  /// satisfied if 'num_values' is a multiple of 32.
+  ///
+  /// Returns the number of values read.
   template<typename T>
-  bool GetValue(int num_bits, T* v);
+  int UnpackBatch(int bit_width, int num_values, T* v);
 
-  /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a
-  /// little-endian native type and big enough to store 'num_bytes'. The value is assumed
-  /// to be byte-aligned so the stream will be advanced to the start of the next byte
-  /// before 'v' is read. Returns false if there are not enough bytes left.
+  /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the
+  /// dictionary 'dict' with 'dict_len' entries. Return -1 if a decoding error is
+  /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'.
+  /// Otherwise returns the number of values decoded.
   template<typename T>
-  bool GetAligned(int num_bytes, T* v);
+  int UnpackAndDecodeBatch(
+      int bit_width, T* dict, int64_t dict_len, int num_values, T* v);
+
+  /// Reads an unpacked 'num_bytes'-sized value from the buffer and stores it in 'v'. T
+  /// needs to be a little-endian native type and big enough to store 'num_bytes'.
+  /// Returns false if there are not enough bytes left.
+  template<typename T>
+  bool GetBytes(int num_bytes, T* v);
 
   /// Reads a vlq encoded int from the stream.  The encoded int must start at the
   /// beginning of a byte. Return false if there were not enough bytes in the buffer or
   /// the int is invalid.
   bool GetVlqInt(int32_t* v);
 
-  /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
-  /// there may be an additional fraction of a byte).
-  int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
+  /// Returns the number of bytes left in the stream.
+  int bytes_left() { return buffer_end_ - buffer_pos_; }
 
   /// Maximum byte length of a vlq encoded int
   static const int MAX_VLQ_BYTE_LEN = 5;
@@ -147,17 +161,12 @@ class BitReader {
   static const int MAX_BITWIDTH = 32;
 
  private:
-  const uint8_t* buffer_;
-  int max_bytes_;
+  /// Current read position in the buffer.
+  const uint8_t* buffer_pos_ = nullptr;
 
-  /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
-  /// faster than reading values byte by byte directly from buffer_.
-  uint64_t buffered_values_;
-
-  int byte_offset_;       // Offset in buffer_
-  int bit_offset_;        // Offset in buffered_values_
+  /// Pointer to the byte after the end of the buffer.
+  const uint8_t* buffer_end_ = nullptr;
 };
-
 }
 
 #endif
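Note on UnpackBatch() semantics: the header comment above says that each
batch starts at a byte boundary and that trailing bits are dropped when
'bit_width' * 'num_values' is not a multiple of 8. The value-by-value
reference below is a sketch of that contract only, not the optimised
BitPacking code. It assumes the LSB-first bit order of Parquet bit-packed
runs and assumes the read count is capped by the bytes available;
UnpackValuesRef is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical reference unpacker. Reads up to 'num_values' values of
// 'bit_width' bits from [in, in + in_bytes) and returns the new read
// position together with the number of values actually read.
inline std::pair<const uint8_t*, int64_t> UnpackValuesRef(int bit_width,
    const uint8_t* in, int64_t in_bytes, int64_t num_values, uint32_t* out) {
  assert(bit_width >= 0 && bit_width <= 32);
  assert(in_bytes >= 0 && num_values >= 0);
  if (bit_width == 0) {
    // Zero-width values carry no bits: every value is 0, no input is consumed.
    std::fill(out, out + num_values, 0u);
    return {in, num_values};
  }
  // Only whole values that fit in the input can be read.
  const int64_t n = std::min(num_values, in_bytes * 8 / bit_width);
  for (int64_t i = 0; i < n; ++i) {
    uint64_t v = 0;
    for (int b = 0; b < bit_width; ++b) {
      const int64_t p = i * bit_width + b;  // Absolute bit position in 'in'.
      const uint64_t bit = (in[p / 8] >> (p % 8)) & 1;
      v |= bit << b;
    }
    out[i] = static_cast<uint32_t>(v);
  }
  // Round up to whole bytes: unused bits in the last byte are skipped.
  return {in + (n * bit_width + 7) / 8, n};
}
```

For example, reading three 3-bit values consumes 9 bits, so the read
position advances by two bytes and the remaining 7 bits of the second byte
are lost to the next call.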

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index c8744aa..3974492 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -18,9 +18,11 @@
 #ifndef IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
 #define IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
 
-#include "common/compiler-util.h"
 #include "util/bit-stream-utils.h"
 
+#include "common/compiler-util.h"
+#include "util/bit-packing.inline.h"
+
 namespace impala {
 
 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
@@ -84,83 +86,67 @@ inline bool BitWriter::PutVlqInt(int32_t v) {
   return result;
 }
 
-/// Force inlining - this is used in perf-critical loops in Parquet and GCC often doesn't
-/// inline it in cases where it's beneficial.
-template <typename T>
-ALWAYS_INLINE inline bool BitReader::GetValue(int num_bits, T* v) {
-  DCHECK(num_bits == 0 || buffer_ != NULL);
-  // TODO: revisit this limit if necessary
-  DCHECK_LE(num_bits, MAX_BITWIDTH);
-  DCHECK_LE(num_bits, sizeof(T) * 8);
-
-  // First do a cheap check to see if we may read past the end of the stream, using
-  // constant upper bounds for 'bit_offset_' and 'num_bits'.
-  if (UNLIKELY(byte_offset_ + sizeof(buffered_values_) + MAX_BITWIDTH / 8 > max_bytes_)) {
-    // Now do the precise check.
-    if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) {
-      return false;
-    }
-  }
-
-  DCHECK_GE(bit_offset_, 0);
-  DCHECK_LE(bit_offset_, 64);
-  *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
-
-  bit_offset_ += num_bits;
-  if (bit_offset_ >= 64) {
-    byte_offset_ += 8;
-    bit_offset_ -= 64;
-
-    int bytes_remaining = max_bytes_ - byte_offset_;
-    if (LIKELY(bytes_remaining >= 8)) {
-      memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
-    } else {
-      memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
-    }
+template<typename T>
+inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) {
+  DCHECK(buffer_pos_ != nullptr);
+  DCHECK_GE(bit_width, 0);
+  DCHECK_LE(bit_width, MAX_BITWIDTH);
+  DCHECK_LE(bit_width, sizeof(T) * 8);
+  DCHECK_GE(num_values, 0);
+
+  int64_t num_read;
+  std::tie(buffer_pos_, num_read) = BitPacking::UnpackValues(bit_width, buffer_pos_,
+      bytes_left(), num_values, v);
+  DCHECK_LE(buffer_pos_, buffer_end_);
+  DCHECK_LE(num_read, num_values);
+  return static_cast<int>(num_read);
+}
 
-    // Read bits of v that crossed into new buffered_values_
-    *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_)
-          << (num_bits - bit_offset_);
-  }
-  DCHECK_LE(bit_offset_, 64);
-  return true;
+template<typename T>
+inline int BatchedBitReader::UnpackAndDecodeBatch(
+      int bit_width, T* dict, int64_t dict_len, int num_values, T* v){
+  DCHECK(buffer_pos_ != nullptr);
+  DCHECK_GE(bit_width, 0);
+  DCHECK_LE(bit_width, MAX_BITWIDTH);
+  DCHECK_LE(bit_width, sizeof(T) * 8);
+  DCHECK_GE(num_values, 0);
+
+  const uint8_t* new_buffer_pos;
+  int64_t num_read;
+  bool decode_error = false;
+  std::tie(new_buffer_pos, num_read) = BitPacking::UnpackAndDecodeValues(bit_width,
+      buffer_pos_, bytes_left(), dict, dict_len, num_values, v, &decode_error);
+  if (UNLIKELY(decode_error)) return -1;
+  buffer_pos_ = new_buffer_pos;
+  DCHECK_LE(buffer_pos_, buffer_end_);
+  DCHECK_LE(num_read, num_values);
+  return static_cast<int>(num_read);
 }
 
 template<typename T>
-inline bool BitReader::GetAligned(int num_bytes, T* v) {
+inline bool BatchedBitReader::GetBytes(int num_bytes, T* v) {
+  DCHECK(buffer_pos_ != nullptr);
+  DCHECK_GE(num_bytes, 0);
   DCHECK_LE(num_bytes, sizeof(T));
-  int bytes_read = BitUtil::Ceil(bit_offset_, 8);
-  if (UNLIKELY(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false;
-
-  // Advance byte_offset to next unread byte and read num_bytes
-  byte_offset_ += bytes_read;
-  memcpy(v, buffer_ + byte_offset_, num_bytes);
-  byte_offset_ += num_bytes;
-
-  // Reset buffered_values_
-  bit_offset_ = 0;
-  int bytes_remaining = max_bytes_ - byte_offset_;
-  if (LIKELY(bytes_remaining >= 8)) {
-    memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
-  } else {
-    memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
-  }
+  if (UNLIKELY(buffer_pos_ + num_bytes > buffer_end_)) return false;
+  *v = 0; // Ensure unset bytes are initialized to zero.
+  memcpy(v, buffer_pos_, num_bytes);
+  buffer_pos_ += num_bytes;
   return true;
 }
 
-inline bool BitReader::GetVlqInt(int32_t* v) {
+inline bool BatchedBitReader::GetVlqInt(int32_t* v) {
   *v = 0;
   int shift = 0;
   uint8_t byte = 0;
   do {
     if (UNLIKELY(shift >= MAX_VLQ_BYTE_LEN * 7)) return false;
-    if (!GetAligned<uint8_t>(1, &byte)) return false;
+    if (!GetBytes(1, &byte)) return false;
     *v |= (byte & 0x7F) << shift;
     shift += 7;
   } while ((byte & 0x80) != 0);
   return true;
 }
-
 }
 
 #endif
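Note on GetVlqInt(): the loop above reads 7 payload bits per byte, least
significant group first, and stops at the first byte whose high bit is
clear. A standalone sketch of the same loop over a raw buffer follows. It
assumes the same 5-byte limit as MAX_VLQ_BYTE_LEN and accumulates in an
unsigned integer so that the final shift cannot overflow a signed type.
DecodeVlqInt and kMaxVlqByteLen are hypothetical names, not part of Impala.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical standalone helper. Decodes one VLQ-encoded int from
// [buf, buf + len). Returns the number of bytes consumed, or 0 if the input
// is truncated or the encoding is longer than kMaxVlqByteLen bytes.
inline size_t DecodeVlqInt(const uint8_t* buf, size_t len, int32_t* v) {
  assert(buf != nullptr && v != nullptr);
  constexpr int kMaxVlqByteLen = 5;
  uint32_t result = 0;
  int shift = 0;
  size_t pos = 0;
  uint8_t byte = 0;
  do {
    if (shift >= kMaxVlqByteLen * 7) return 0;  // Too many continuation bytes.
    if (pos >= len) return 0;                   // Ran out of input.
    byte = buf[pos++];
    result |= static_cast<uint32_t>(byte & 0x7F) << shift;
    shift += 7;
  } while ((byte & 0x80) != 0);
  *v = static_cast<int32_t>(result);
  return pos;
}
```

For example, the two bytes 0xAC 0x02 decode to 0x2C + (0x02 << 7) = 300.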

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 62b3d3a..fa5e798 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -174,13 +174,16 @@ class DictDecoderBase {
     DCHECK_GE(buffer_len, 0);
     if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes");
     uint8_t bit_width = *buffer;
-    if (UNLIKELY(bit_width < 0 || bit_width > BitReader::MAX_BITWIDTH)) {
+    if (UNLIKELY(bit_width < 0 || bit_width > BatchedBitReader::MAX_BITWIDTH)) {
       return Status(strings::Substitute("Dictionary has invalid or unsupported bit "
           "width: $0", bit_width));
     }
     ++buffer;
     --buffer_len;
     data_decoder_.Reset(buffer, buffer_len, bit_width);
+    num_repeats_ = 0;
+    num_literal_values_ = 0;
+    next_literal_idx_ = 0;
     return Status::OK();
   }
 
@@ -193,7 +196,22 @@ class DictDecoderBase {
   virtual void GetValue(int index, void* buffer) = 0;
 
  protected:
-  RleDecoder data_decoder_;
+  /// Number of decoded values to buffer at a time. A multiple of 32 is chosen to allow
+  /// efficient reading in batches from data_decoder_. Increasing the batch size up to
+  /// 128 seems to improve performance, but increasing further did not make a noticeable
+  /// difference.
+  static const int DECODED_BUFFER_SIZE = 128;
+
+  RleBatchDecoder<uint32_t> data_decoder_;
+
+  /// Greater than zero if we've started decoding a repeated run.
+  int64_t num_repeats_ = 0;
+
+  /// Greater than zero if we have buffered some literal values.
+  int num_literal_values_ = 0;
+
+  /// The index of the next decoded value to return.
+  int next_literal_idx_ = 0;
 };
 
 template<typename T>
@@ -211,7 +229,7 @@ class DictDecoder : public DictDecoderBase {
   /// Returns true if the dictionary values were all successfully decoded, or false
   /// if the dictionary was corrupt.
   template<parquet::Type::type PARQUET_TYPE>
-  bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
+  bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size) WARN_UNUSED_RESULT;
 
   virtual int num_entries() const { return dict_.size(); }
 
@@ -219,17 +237,26 @@ class DictDecoder : public DictDecoderBase {
     T* val_ptr = reinterpret_cast<T*>(buffer);
     DCHECK_GE(index, 0);
     DCHECK_LT(index, dict_.size());
-    // TODO: is there any circumstance where this should be a memcpy?
     *val_ptr = dict_[index];
   }
 
   /// Returns the next value.  Returns false if the data is invalid.
   /// For StringValues, this does not make a copy of the data.  Instead,
   /// the string data is from the dictionary buffer passed into the c'tor.
-  bool GetNextValue(T* value);
+  bool GetNextValue(T* value) WARN_UNUSED_RESULT;
 
  private:
   std::vector<T> dict_;
+
+  /// Decoded values, buffered to allow caller to consume one-by-one. If in the middle of
+  /// a repeated run, the first element is the current dict value. If in a literal run,
+  /// this contains 'num_literal_values_' values, with the next value to be returned at
+  /// 'next_literal_idx_'.
+  T decoded_values_[DECODED_BUFFER_SIZE];
+
+  /// Slow path for GetNextValue() where we need to decode new values. Should not be
+  /// inlined everywhere.
+  bool DecodeNextValue(T* value);
 };
 
 template<typename T>
@@ -290,30 +317,47 @@ inline int DictEncoder<StringValue>::AddToTable(const StringValue& value,
 // Force inlining - GCC does not always inline this into hot loops in Parquet scanner.
 template <typename T>
 ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValue(T* value) {
-  int index = -1; // Initialize to avoid compiler warning.
-  bool result = data_decoder_.Get(&index);
-  // Use & to avoid branches.
-  if (LIKELY(result & (index >= 0) & (index < dict_.size()))) {
-    *value = dict_[index];
+  // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
+  // byte aligned for Decimal16Values.
+  if (num_repeats_ > 0) {
+    --num_repeats_;
+    memcpy(value, &decoded_values_[0], sizeof(T));
+    return true;
+  } else if (next_literal_idx_ < num_literal_values_) {
+    int idx = next_literal_idx_++;
+    memcpy(value, &decoded_values_[idx], sizeof(T));
     return true;
   }
-  return false;
+  // No decoded values left - need to decode some more.
+  return DecodeNextValue(value);
 }
 
-// Force inlining - GCC does not always inline this into hot loops in Parquet scanner.
-template <>
-ALWAYS_INLINE inline bool DictDecoder<Decimal16Value>::GetNextValue(
-    Decimal16Value* value) {
-  int index;
-  bool result = data_decoder_.Get(&index);
-  if (!result) return false;
-  if (index >= dict_.size()) return false;
-  // Workaround for IMPALA-959. Use memcpy instead of '=' so addresses
-  // do not need to be 16 byte aligned.
-  uint8_t* addr = reinterpret_cast<uint8_t*>(dict_.data());
-  addr = addr + index * sizeof(*value);
-  memcpy(value, addr, sizeof(*value));
-  return true;
+template <typename T>
+bool DictDecoder<T>::DecodeNextValue(T* value) {
+  // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
+  // byte aligned for Decimal16Values.
+  uint32_t num_repeats = data_decoder_.NextNumRepeats();
+  if (num_repeats > 0) {
+    uint32_t idx = data_decoder_.GetRepeatedValue(num_repeats);
+    if (UNLIKELY(idx >= dict_.size())) return false;
+    memcpy(&decoded_values_[0], &dict_[idx], sizeof(T));
+    memcpy(value, &decoded_values_[0], sizeof(T));
+    num_repeats_ = num_repeats - 1;
+    return true;
+  } else {
+    uint32_t num_literals = data_decoder_.NextNumLiterals();
+    if (UNLIKELY(num_literals == 0)) return false;
+
+    uint32_t num_to_decode = std::min<uint32_t>(num_literals, DECODED_BUFFER_SIZE);
+    if (UNLIKELY(!data_decoder_.DecodeLiteralValues(
+            num_to_decode, dict_.data(), dict_.size(), &decoded_values_[0]))) {
+      return false;
+    }
+    num_literal_values_ = num_to_decode;
+    memcpy(value, &decoded_values_[0], sizeof(T));
+    next_literal_idx_ = 1;
+    return true;
+  }
 }
 
 template<typename T>
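Note on the GetNextValue()/DecodeNextValue() split: the fast path serves
values from a small buffer and the slow path refills it from the next
repeated or literal run. The model below is a simplified sketch of that
state machine only. It assumes the runs have already been parsed into a
vector, uses a buffer of 4 entries instead of DECODED_BUFFER_SIZE, and
works on plain int values. Run, RunLen, BufferedDictDecoder and kBufferSize
are hypothetical names, not Impala classes.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

constexpr size_t kBufferSize = 4;  // Stand-in for DECODED_BUFFER_SIZE.

// A pre-parsed run: 'count' repeats of indices[0], or indices.size() literals.
struct Run {
  bool repeated;
  uint32_t count;                 // Used only when 'repeated' is true.
  std::vector<uint32_t> indices;  // One entry if repeated, else the literals.
};

inline size_t RunLen(const Run& r) { return r.repeated ? r.count : r.indices.size(); }

class BufferedDictDecoder {
 public:
  BufferedDictDecoder(std::vector<int> dict, std::vector<Run> runs)
    : dict_(std::move(dict)), runs_(std::move(runs)) {}

  // Fast path: return a buffered value if one is available.
  bool GetNextValue(int* value) {
    if (num_repeats_ > 0) {
      --num_repeats_;
      *value = buffered_[0];
      return true;
    }
    if (next_literal_idx_ < num_literal_values_) {
      *value = buffered_[next_literal_idx_++];
      return true;
    }
    return DecodeNextValue(value);
  }

 private:
  // Slow path: refill the buffer. Returns false at end of input or if an
  // index is not a valid position in the dictionary.
  bool DecodeNextValue(int* value) {
    while (run_idx_ < runs_.size() && run_pos_ >= RunLen(runs_[run_idx_])) {
      ++run_idx_;
      run_pos_ = 0;
    }
    if (run_idx_ == runs_.size()) return false;
    const Run& run = runs_[run_idx_];
    assert(!run.indices.empty());
    if (run.repeated) {
      if (run.indices[0] >= dict_.size()) return false;
      buffered_[0] = dict_[run.indices[0]];
      num_repeats_ = run.count - 1;  // One repeat is returned right now.
      run_pos_ = run.count;
    } else {
      const size_t n = std::min(run.indices.size() - run_pos_, kBufferSize);
      for (size_t i = 0; i < n; ++i) {
        const uint32_t idx = run.indices[run_pos_ + i];
        if (idx >= dict_.size()) return false;
        buffered_[i] = dict_[idx];
      }
      run_pos_ += n;
      num_literal_values_ = n;
      next_literal_idx_ = 1;  // buffered_[0] is returned right now.
    }
    *value = buffered_[0];
    return true;
  }

  std::vector<int> dict_;
  std::vector<Run> runs_;
  size_t run_idx_ = 0;
  size_t run_pos_ = 0;
  int buffered_[kBufferSize];
  uint32_t num_repeats_ = 0;
  size_t num_literal_values_ = 0;
  size_t next_literal_idx_ = 0;
};
```

A literal run longer than the buffer is decoded in several refills, which is
why the sketch tracks a position within the current run.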

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index de0fb11..11043f0 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -17,7 +17,9 @@
 
 #include <stdlib.h>
 #include <stdio.h>
+
 #include <iostream>
+#include <utility>
 
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.inline.h"
@@ -65,7 +67,7 @@ void ValidateDict(const vector<InternalType>& values,
   ASSERT_OK(decoder.SetData(data_buffer, data_len));
   for (InternalType i: values) {
     InternalType j;
-    decoder.GetNextValue(&j);
+    ASSERT_TRUE(decoder.GetNextValue(&j));
     EXPECT_EQ(i, j);
   }
   pool.FreeAll();
@@ -196,6 +198,98 @@ TEST(DictTest, TestStringBufferOverrun) {
       0));
 }
 
+// Make sure that SetData() resets the dictionary decoder, including the embedded RLE
+// decoder to a clean state, even if the input is not fully consumed. The RLE decoder
+// has various state that needs to be reset.
+TEST(DictTest, SetDataAfterPartialRead) {
+  MemTracker tracker;
+  MemPool pool(&tracker);
+  DictEncoder<int> encoder(&pool, sizeof(int));
+
+  // Literal run followed by a repeated run.
+  vector<int> values{1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9};
+  for (int val: values) encoder.Put(val);
+
+  vector<uint8_t> dict_buffer(encoder.dict_encoded_size());
+  encoder.WriteDict(dict_buffer.data());
+  vector<uint8_t> data_buffer(encoder.EstimatedDataEncodedSize() * 2);
+  int data_len = encoder.WriteData(data_buffer.data(), data_buffer.size());
+  ASSERT_GT(data_len, 0);
+  encoder.ClearIndices();
+
+  DictDecoder<int> decoder;
+  ASSERT_TRUE(decoder.template Reset<parquet::Type::INT32>(
+      dict_buffer.data(), dict_buffer.size(), sizeof(int)));
+
+  // Test decoding some of the values, then resetting. If the decoder incorrectly
+  // caches some values, this could produce incorrect results.
+  for (int num_to_decode = 0; num_to_decode < values.size(); ++num_to_decode) {
+    ASSERT_OK(decoder.SetData(data_buffer.data(), data_buffer.size()));
+    for (int i = 0; i < num_to_decode; ++i) {
+      int val;
+      ASSERT_TRUE(decoder.GetNextValue(&val));
+      EXPECT_EQ(values[i], val) << num_to_decode << " " << i;
+    }
+  }
+}
+
+// Test handling of decode errors from out-of-range values.
+TEST(DictTest, DecodeErrors) {
+  MemTracker tracker;
+  MemPool pool(&tracker);
+  DictEncoder<int> small_dict_encoder(&pool, sizeof(int));
+
+  // Generate a dictionary with 9 values (requires 4 bits to encode).
+  vector<int> small_dict_values{1, 2, 3, 4, 5, 6, 7, 8, 9};
+  for (int val: small_dict_values) small_dict_encoder.Put(val);
+
+  vector<uint8_t> small_dict_buffer(small_dict_encoder.dict_encoded_size());
+  small_dict_encoder.WriteDict(small_dict_buffer.data());
+  small_dict_encoder.ClearIndices();
+
+  DictDecoder<int> small_dict_decoder;
+  ASSERT_TRUE(small_dict_decoder.template Reset<parquet::Type::INT32>(
+        small_dict_buffer.data(), small_dict_buffer.size(), sizeof(int)));
+
+  // Generate dictionary-encoded data with between 9 and 15 distinct values to test that
+  // error is detected when the decoder reads a 4-bit value that is out of range.
+  using TestCase = pair<string, vector<int>>;
+  vector<TestCase> test_cases{
+    {"Out-of-range value in a repeated run",
+        {10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10}},
+    {"Out-of-range literal run in the last < 32 element batch",
+      {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}},
+    {"Out-of-range literal run in the middle of a 32 element batch",
+      {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
+       11, 12, 13, 14, 15, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}}};
+  for (TestCase& test_case: test_cases) {
+    // Encode the values. This will produce a dictionary with more distinct values than
+    // the small dictionary that we'll use to decode it.
+    DictEncoder<int> large_dict_encoder(&pool, sizeof(int));
+    // Initialize the dictionary with the values already in the small dictionary.
+    for (int val : small_dict_values) large_dict_encoder.Put(val);
+    large_dict_encoder.ClearIndices();
+
+    for (int val: test_case.second) large_dict_encoder.Put(val);
+
+    vector<uint8_t> data_buffer(large_dict_encoder.EstimatedDataEncodedSize() * 2);
+    int data_len = large_dict_encoder.WriteData(data_buffer.data(), data_buffer.size());
+    ASSERT_GT(data_len, 0);
+    large_dict_encoder.ClearIndices();
+
+    ASSERT_OK(small_dict_decoder.SetData(data_buffer.data(), data_buffer.size()));
+    bool failed = false;
+    for (int i = 0; i < test_case.second.size(); ++i) {
+      int val;
+      failed = !small_dict_decoder.GetNextValue(&val);
+      if (failed) break;
+    }
+    EXPECT_TRUE(failed) << "Should have detected out-of-range dict-encoded value in test "
+        << test_case.first;
+  }
+}
+
 }
 
 IMPALA_TEST_MAIN();

