IMPALA-4177,IMPALA-6039: batched bit reading and rle decoding Switch the decoders to using more batch-oriented interfaces. As an intermediate step this doesn't make the interfaces of LevelDecoder or DictDecoder batch-oriented, only the lower-level utility classes.
The next step would be to change those interfaces to be batch-oriented and make the corresponding optimisations in parquet. This could deliver much larger perf improvements than the current patch. The high-level changes are: * BitReader -> BatchedBitReader, which is built to unpack runs of 32 bit-packed values efficiently. * RleDecoder -> RleBatchDecoder, which exposes the repeated and literal runs to the caller and uses BatchedBitReader to unpack literal runs efficiently. * Dict decoding uses RleBatchDecoder to decode repeated runs efficiently and uses the BitPacking utilities to unpack and encode in a single step. Also removes an older benchmark that isn't too interesting (since the batch-oriented approach to encoding and decoding is so much faster than the value-by-value approach). Testing: * Ran core tests. * Updated unit tests to exercise new code. * Added test coverage for the deprecated bit-packed level encoding so that it still works (there was no coverage previously). Perf: Single-node benchmarks showed a few % performance gain. 16-node cluster benchmarks only showed a gain for TPC-H nested. 
Change-Id: I35de0cf80c86f501c4a39270afc8fb8111552ac6 Reviewed-on: http://gerrit.cloudera.org:8080/8267 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ae116b5b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ae116b5b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ae116b5b Branch: refs/heads/master Commit: ae116b5bf7b8b2514d7a8655d9a6666ad3fd36dd Parents: 155bb77 Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Mon Oct 9 17:09:39 2017 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Thu Nov 16 21:23:09 2017 +0000 ---------------------------------------------------------------------- be/src/benchmarks/CMakeLists.txt | 1 - be/src/benchmarks/bit-packing-benchmark.cc | 120 ++++++ be/src/benchmarks/rle-benchmark.cc | 244 ------------ be/src/exec/parquet-column-readers.cc | 174 +++++---- be/src/exec/parquet-column-readers.h | 90 +++-- be/src/experiments/bit-stream-utils.8byte.h | 137 ------- .../experiments/bit-stream-utils.8byte.inline.h | 145 ------- be/src/util/bit-packing.h | 61 ++- be/src/util/bit-packing.inline.h | 210 ++++++++-- be/src/util/bit-stream-utils.h | 77 ++-- be/src/util/bit-stream-utils.inline.h | 106 +++-- be/src/util/dict-encoding.h | 94 +++-- be/src/util/dict-test.cc | 96 ++++- be/src/util/parquet-reader.cc | 21 +- be/src/util/rle-encoding.h | 383 +++++++++++++++---- be/src/util/rle-test.cc | 185 +++++---- testdata/data/README | 9 + .../alltypes_agg_bitpacked_def_levels.parquet | Bin 0 -> 208380 bytes .../queries/QueryTest/parquet-def-levels.test | 52 +++ tests/query_test/test_scanners.py | 16 + 20 files changed, 1253 insertions(+), 968 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt index 1d67d45..a569a66 100644 --- a/be/src/benchmarks/CMakeLists.txt +++ b/be/src/benchmarks/CMakeLists.txt @@ -48,7 +48,6 @@ ADD_BE_BENCHMARK(multiint-benchmark) ADD_BE_BENCHMARK(network-perf-benchmark) ADD_BE_BENCHMARK(overflow-benchmark) ADD_BE_BENCHMARK(parse-timestamp-benchmark) -ADD_BE_BENCHMARK(rle-benchmark) ADD_BE_BENCHMARK(row-batch-serialize-benchmark) ADD_BE_BENCHMARK(scheduler-benchmark) ADD_BE_BENCHMARK(status-benchmark) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/bit-packing-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/bit-packing-benchmark.cc b/be/src/benchmarks/bit-packing-benchmark.cc index 955c0fb..7769182 100644 --- a/be/src/benchmarks/bit-packing-benchmark.cc +++ b/be/src/benchmarks/bit-packing-benchmark.cc @@ -285,6 +285,126 @@ struct BenchmarkParams { int64_t data_len; }; +/// Legacy value-at-a-time implementation of bit unpacking. Retained here for +/// purposes of comparison in the benchmark. +class BitReader { + public: + /// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'. + /// Does not take ownership of the buffer. + BitReader(const uint8_t* buffer, int buffer_len) { Reset(buffer, buffer_len); } + + BitReader() : buffer_(NULL), max_bytes_(0) {} + + // The implicit copy constructor is left defined. If a BitReader is copied, the + // two copies do not share any state. Invoking functions on either copy continues + // reading from the current read position without modifying the state of the other + // copy. + + /// Resets the read to start reading from the start of 'buffer'. The buffer's + /// length is 'buffer_len'. 
Does not take ownership of the buffer. + void Reset(const uint8_t* buffer, int buffer_len) { + buffer_ = buffer; + max_bytes_ = buffer_len; + byte_offset_ = 0; + bit_offset_ = 0; + int num_bytes = std::min(8, max_bytes_); + memcpy(&buffered_values_, buffer_, num_bytes); + } + + /// Gets the next value from the buffer. Returns true if 'v' could be read or false if + /// there are not enough bytes left. num_bits must be <= 32. + template<typename T> + bool GetValue(int num_bits, T* v); + + /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a + /// little-endian native type and big enough to store 'num_bytes'. The value is assumed + /// to be byte-aligned so the stream will be advanced to the start of the next byte + /// before 'v' is read. Returns false if there are not enough bytes left. + template<typename T> + bool GetBytes(int num_bytes, T* v); + + /// Returns the number of bytes left in the stream, not including the current byte (i.e., + /// there may be an additional fraction of a byte). + int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); } + + /// Maximum supported bitwidth for reader. + static const int MAX_BITWIDTH = 32; + + private: + const uint8_t* buffer_; + int max_bytes_; + + /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is + /// faster than reading values byte by byte directly from buffer_. + uint64_t buffered_values_; + + int byte_offset_; // Offset in buffer_ + int bit_offset_; // Offset in buffered_values_ +}; + +template <typename T> +bool BitReader::GetValue(int num_bits, T* v) { + DCHECK(num_bits == 0 || buffer_ != NULL); + // TODO: revisit this limit if necessary + DCHECK_LE(num_bits, MAX_BITWIDTH); + DCHECK_LE(num_bits, sizeof(T) * 8); + + // First do a cheap check to see if we may read past the end of the stream, using + // constant upper bounds for 'bit_offset_' and 'num_bits'. 
+ if (UNLIKELY(byte_offset_ + sizeof(buffered_values_) + MAX_BITWIDTH / 8 > max_bytes_)) { + // Now do the precise check. + if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) { + return false; + } + } + + DCHECK_GE(bit_offset_, 0); + DCHECK_LE(bit_offset_, 64); + *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_; + + bit_offset_ += num_bits; + if (bit_offset_ >= 64) { + byte_offset_ += 8; + bit_offset_ -= 64; + + int bytes_remaining = max_bytes_ - byte_offset_; + if (LIKELY(bytes_remaining >= 8)) { + memcpy(&buffered_values_, buffer_ + byte_offset_, 8); + } else { + memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); + } + + // Read bits of v that crossed into new buffered_values_ + *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_) + << (num_bits - bit_offset_); + } + DCHECK_LE(bit_offset_, 64); + return true; +} + +template<typename T> +bool BitReader::GetBytes(int num_bytes, T* v) { + DCHECK_LE(num_bytes, sizeof(T)); + int bytes_read = BitUtil::Ceil(bit_offset_, 8); + if (UNLIKELY(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false; + + // Advance byte_offset to next unread byte and read num_bytes + byte_offset_ += bytes_read; + *v = 0; // Ensure unset bytes are initialized to zero. + memcpy(v, buffer_ + byte_offset_, num_bytes); + byte_offset_ += num_bytes; + + // Reset buffered_values_ + bit_offset_ = 0; + int bytes_remaining = max_bytes_ - byte_offset_; + if (LIKELY(bytes_remaining >= 8)) { + memcpy(&buffered_values_, buffer_ + byte_offset_, 8); + } else { + memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); + } + return true; +} + /// Benchmark calling BitReader::GetValue() in a loop to unpack 32 * 'batch_size' values. 
void BitReaderBenchmark(int batch_size, void* data) { const BenchmarkParams* p = reinterpret_cast<BenchmarkParams*>(data); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/rle-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/rle-benchmark.cc b/be/src/benchmarks/rle-benchmark.cc deleted file mode 100644 index 439c630..0000000 --- a/be/src/benchmarks/rle-benchmark.cc +++ /dev/null @@ -1,244 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <iostream> -#include <sstream> - -#include "runtime/mem-pool.h" -#include "runtime/mem-tracker.h" -#include "experiments/bit-stream-utils.8byte.inline.h" -#include "util/benchmark.h" -#include "util/bit-stream-utils.inline.h" -#include "util/cpu-info.h" - -#include "common/names.h" - -// Benchmark to measure how quickly we can do bit encoding and decoding. 
- -// encode: Function Rate (iters/ms) Comparison -// ---------------------------------------------------------------------- -// "BitWriter (8 byte) 1-Bit" 66.9 1X -// "BitWriter 1-Bit" 107.3 1.604X -// "BitWriter (8 byte) 2-Bit" 74.42 1X -// "BitWriter 2-Bit" 105.6 1.419X -// "BitWriter (8 byte) 3-Bit" 76.91 1X -// "BitWriter 3-Bit" 104 1.353X -// "BitWriter (8 byte) 4-Bit" 80.37 1X -// "BitWriter 4-Bit" 102.7 1.278X -// "BitWriter (8 byte) 5-Bit" 79.29 1X -// "BitWriter 5-Bit" 101 1.274X -// "BitWriter (8 byte) 6-Bit" 80.37 1X -// "BitWriter 6-Bit" 99.28 1.235X -// "BitWriter (8 byte) 7-Bit" 80.19 1X -// "BitWriter 7-Bit" 98.09 1.223X -// "BitWriter (8 byte) 8-Bit" 84.93 1X -// "BitWriter 8-Bit" 97 1.142X -// "BitWriter (8 byte) 9-Bit" 79.85 1X -// "BitWriter 9-Bit" 95.09 1.191X -// "BitWriter (8 byte) 10-Bit" 80.51 1X -// "BitWriter 10-Bit" 94.17 1.17X -// "BitWriter (8 byte) 11-Bit" 79.36 1X -// "BitWriter 11-Bit" 93.2 1.174X -// "BitWriter (8 byte) 12-Bit" 80.79 1X -// "BitWriter 12-Bit" 92.09 1.14X -// "BitWriter (8 byte) 13-Bit" 78.28 1X -// "BitWriter 13-Bit" 90.83 1.16X -// "BitWriter (8 byte) 14-Bit" 78.57 1X -// "BitWriter 14-Bit" 89.71 1.142X -// "BitWriter (8 byte) 15-Bit" 77.28 1X -// "BitWriter 15-Bit" 88 1.139X -// "BitWriter (8 byte) 16-Bit" 86.98 1X -// "BitWriter 16-Bit" 88.08 1.013X - -// decode: Function Rate (iters/ms) Comparison -// ---------------------------------------------------------------------- -// "BitWriter (8 byte) 1-Bit" 132.9 1X -// "BitWriter 1-Bit" 126.9 0.9546X -// "BitWriter (8 byte) 2-Bit" 132.9 1X -// "BitWriter 2-Bit" 125.6 0.9448X -// "BitWriter (8 byte) 3-Bit" 132.8 1X -// "BitWriter 3-Bit" 122.7 0.9237X -// "BitWriter (8 byte) 4-Bit" 133.1 1X -// "BitWriter 4-Bit" 123.6 0.9284X -// "BitWriter (8 byte) 5-Bit" 132.2 1X -// "BitWriter 5-Bit" 118.2 0.8942X -// "BitWriter (8 byte) 6-Bit" 132.9 1X -// "BitWriter 6-Bit" 117.6 0.885X -// "BitWriter (8 byte) 7-Bit" 132.3 1X -// "BitWriter 7-Bit" 112.8 0.8525X -// "BitWriter (8 
byte) 8-Bit" 132.9 1X -// "BitWriter 8-Bit" 119.2 0.8971X -// "BitWriter (8 byte) 9-Bit" 131.8 1X -// "BitWriter 9-Bit" 111.3 0.8447X -// "BitWriter (8 byte) 10-Bit" 131.4 1X -// "BitWriter 10-Bit" 108.5 0.8255X -// "BitWriter (8 byte) 11-Bit" 131.7 1X -// "BitWriter 11-Bit" 106.9 0.8118X -// "BitWriter (8 byte) 12-Bit" 132.9 1X -// "BitWriter 12-Bit" 108.8 0.8189X -// "BitWriter (8 byte) 13-Bit" 131 1X -// "BitWriter 13-Bit" 103.1 0.7873X -// "BitWriter (8 byte) 14-Bit" 131.6 1X -// "BitWriter 14-Bit" 101.6 0.7724X -// "BitWriter (8 byte) 15-Bit" 131.1 1X -// "BitWriter 15-Bit" 99.91 0.7622X -// "BitWriter (8 byte) 16-Bit" 133 1X -// "BitWriter 16-Bit" 105.2 0.7907X - -using namespace impala; - -const int BUFFER_LEN = 64 * 4096; - -struct TestData { - uint8_t* array; - uint8_t* buffer; - int num_values; - int num_bits; - int max_value; - MemPool* pool; - bool result; -}; - -void TestBitWriterEncode(int batch_size, void* d) { - TestData* data = reinterpret_cast<TestData*>(d); - int buffer_size = BitUtil::Ceil(data->num_bits * data->num_values, 8); - for (int i = 0; i < batch_size; ++i) { - BitWriter writer(data->buffer, buffer_size); - // Unroll this to focus more on Put performance. - for (int j = 0; j < data->num_values; j += 8) { - writer.PutValue(j + 0, data->num_bits); - writer.PutValue(j + 1, data->num_bits); - writer.PutValue(j + 2, data->num_bits); - writer.PutValue(j + 3, data->num_bits); - writer.PutValue(j + 4, data->num_bits); - writer.PutValue(j + 5, data->num_bits); - writer.PutValue(j + 6, data->num_bits); - writer.PutValue(j + 7, data->num_bits); - } - writer.Flush(); - } -} - -void TestBitWriter8ByteEncode(int batch_size, void* d) { - TestData* data = reinterpret_cast<TestData*>(d); - int buffer_size = BitUtil::Ceil(data->num_bits * data->num_values, 8); - for (int i = 0; i < batch_size; ++i) { - BitWriter_8byte writer(data->buffer, buffer_size); - // Unroll this to focus more on Put performance. 
- for (int j = 0; j < data->num_values; j += 8) { - writer.PutValue(j + 0, data->num_bits); - writer.PutValue(j + 1, data->num_bits); - writer.PutValue(j + 2, data->num_bits); - writer.PutValue(j + 3, data->num_bits); - writer.PutValue(j + 4, data->num_bits); - writer.PutValue(j + 5, data->num_bits); - writer.PutValue(j + 6, data->num_bits); - writer.PutValue(j + 7, data->num_bits); - } - } -} - -void TestBitWriterDecode(int batch_size, void* d) { - TestData* data = reinterpret_cast<TestData*>(d); - int64_t v; - for (int i = 0; i < batch_size; ++i) { - BitReader reader(data->buffer, BUFFER_LEN); - // Unroll this to focus more on Put performance. - for (int j = 0; j < data->num_values; j += 8) { - reader.GetValue(data->num_bits, &v); - reader.GetValue(data->num_bits, &v); - reader.GetValue(data->num_bits, &v); - reader.GetValue(data->num_bits, &v); - reader.GetValue(data->num_bits, &v); - reader.GetValue(data->num_bits, &v); - reader.GetValue(data->num_bits, &v); - reader.GetValue(data->num_bits, &v); - } - } -} - -void TestBitWriter8ByteDecode(int batch_size, void* d) { - TestData* data = reinterpret_cast<TestData*>(d); - data->result = true; - int64_t v; - for (int i = 0; i < batch_size; ++i) { - BitReader_8byte reader(data->buffer, BUFFER_LEN); - // Unroll this to focus more on Put performance. 
- for (int j = 0; j < data->num_values; j += 8) { - data->result &= reader.GetValue(data->num_bits, &v); - data->result &= reader.GetValue(data->num_bits, &v); - data->result &= reader.GetValue(data->num_bits, &v); - data->result &= reader.GetValue(data->num_bits, &v); - data->result &= reader.GetValue(data->num_bits, &v); - data->result &= reader.GetValue(data->num_bits, &v); - data->result &= reader.GetValue(data->num_bits, &v); - data->result &= reader.GetValue(data->num_bits, &v); - } - } - CHECK(data->result); -} - -int main(int argc, char** argv) { - CpuInfo::Init(); - - MemTracker tracker; - MemPool pool(&tracker); - - int num_values = 4096; - int max_bits = 16; - - Benchmark encode_suite("encode"); - TestData data[max_bits]; - for (int i = 0; i < max_bits; ++i) { - data[i].buffer = new uint8_t[BUFFER_LEN]; - data[i].num_values = num_values; - data[i].num_bits = i + 1; - data[i].max_value = 1 << i; - data[i].pool = &pool; - - stringstream suffix; - suffix << " " << (i+1) << "-Bit"; - - stringstream name; - name << "\"BitWriter (8 byte)" << suffix.str() << "\""; - int baseline = - encode_suite.AddBenchmark(name.str(), TestBitWriter8ByteEncode, &data[i], -1); - - name.str(""); - name << "\"BitWriter" << suffix.str() << "\""; - encode_suite.AddBenchmark(name.str(), TestBitWriterEncode, &data[i], baseline); - } - cout << encode_suite.Measure() << endl; - - Benchmark decode_suite("decode"); - for (int i = 0; i < max_bits; ++i) { - stringstream suffix; - suffix << " " << (i+1) << "-Bit"; - - stringstream name; - name << "\"BitWriter (8 byte)" << suffix.str() << "\""; - int baseline = - decode_suite.AddBenchmark(name.str(), TestBitWriter8ByteDecode, &data[i], -1); - - name.str(""); - name << "\"BitWriter" << suffix.str() << "\""; - decode_suite.AddBenchmark(name.str(), TestBitWriterDecode, &data[i], baseline); - } - cout << decode_suite.Measure() << endl; - - return 0; -} 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/exec/parquet-column-readers.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc index 04127e3..5a2e90e 100644 --- a/be/src/exec/parquet-column-readers.cc +++ b/be/src/exec/parquet-column-readers.cc @@ -77,6 +77,8 @@ Status ParquetLevelDecoder::Init(const string& filename, parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values, uint8_t** data, int* data_size) { DCHECK_GE(num_buffered_values, 0); + DCHECK_GT(cache_size, 0); + cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32); encoding_ = encoding; max_level_ = max_level; num_buffered_values_ = num_buffered_values; @@ -97,7 +99,7 @@ Status ParquetLevelDecoder::Init(const string& filename, return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes); } int bit_width = BitUtil::Log2Ceiling64(max_level + 1); - Reset(*data, num_bytes, bit_width); + rle_decoder_.Reset(*data, num_bytes, bit_width); break; } case parquet::Encoding::BIT_PACKED: @@ -143,66 +145,80 @@ Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) { } inline int16_t ParquetLevelDecoder::ReadLevel() { - bool valid; - uint8_t level; - if (encoding_ == parquet::Encoding::RLE) { - valid = Get(&level); - } else { - DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED); - valid = bit_reader_.GetValue(1, &level); + if (UNLIKELY(!CacheHasNext())) { + if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) { + return HdfsParquetScanner::INVALID_LEVEL; + } + DCHECK_GE(num_cached_levels_, 0); + if (UNLIKELY(num_cached_levels_ == 0)) { + return HdfsParquetScanner::INVALID_LEVEL; + } } - return LIKELY(valid) ? 
level : HdfsParquetScanner::INVALID_LEVEL; + return CacheGetNext(); } -Status ParquetLevelDecoder::CacheNextBatch(int batch_size) { - DCHECK_LE(batch_size, cache_size_); - cached_level_idx_ = 0; +Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) { + /// Fill the cache completely if there are enough values remaining. + /// Otherwise don't try to read more values than are left. + int batch_size = min(vals_remaining, cache_size_); if (max_level_ > 0) { - if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) { + if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_) || + num_cached_levels_ < batch_size)) { return Status(decoding_error_code_, num_buffered_values_, filename_); } } else { // No levels to read, e.g., because the field is required. The cache was // already initialized with all zeros, so we can hand out those values. DCHECK_EQ(max_level_, 0); + cached_level_idx_ = 0; num_cached_levels_ = batch_size; } return Status::OK(); } -bool ParquetLevelDecoder::FillCache(int batch_size, - int* num_cached_levels) { - DCHECK(num_cached_levels != NULL); - int num_values = 0; +bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) { + DCHECK(!CacheHasNext()); + DCHECK(num_cached_levels != nullptr); + DCHECK_GE(max_level_, 0); + DCHECK_EQ(num_cached_levels_ % 32, 0) << "Last batch was not a multiple of 32"; + cached_level_idx_ = 0; + if (max_level_ == 0) { + // No levels to read, e.g., because the field is required. The cache was + // already initialized with all zeros, so we can hand out those values. + *num_cached_levels = batch_size; + return true; + } if (encoding_ == parquet::Encoding::RLE) { - while (true) { - // Add RLE encoded values by repeating the current value this number of times. 
- uint32_t num_repeats_to_set = - min<uint32_t>(repeat_count_, batch_size - num_values); - memset(cached_levels_ + num_values, current_value_, num_repeats_to_set); - num_values += num_repeats_to_set; - repeat_count_ -= num_repeats_to_set; - - // Add remaining literal values, if any. - uint32_t num_literals_to_set = - min<uint32_t>(literal_count_, batch_size - num_values); - int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size); - for (; num_values < num_values_end; ++num_values) { - bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]); - if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false; - } - literal_count_ -= num_literals_to_set; - - if (num_values == batch_size) break; - if (UNLIKELY(!NextCounts<int16_t>())) return false; - if (repeat_count_ > 0 && current_value_ > max_level_) return false; - } + return FillCacheRle(batch_size, num_cached_levels); } else { DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED); - for (; num_values < batch_size; ++num_values) { - bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]); - if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false; + *num_cached_levels = bit_reader_.UnpackBatch(1, batch_size, cached_levels_); + return true; + } +} + +bool ParquetLevelDecoder::FillCacheRle(int batch_size, int* num_cached_levels) { + int num_values = 0; + while (num_values < batch_size) { + // Add RLE encoded values by repeating the current value this number of times. + uint32_t num_repeats = rle_decoder_.NextNumRepeats(); + if (num_repeats > 0) { + uint32_t num_repeats_to_set = min<uint32_t>(num_repeats, batch_size - num_values); + uint8_t repeated_value = rle_decoder_.GetRepeatedValue(num_repeats_to_set); + memset(cached_levels_ + num_values, repeated_value, num_repeats_to_set); + num_values += num_repeats_to_set; + continue; + } + + // Add remaining literal values, if any. 
+ uint32_t num_literals = rle_decoder_.NextNumLiterals(); + if (num_literals == 0) break; + uint32_t num_literals_to_set = min<uint32_t>(num_literals, batch_size - num_values); + if (!rle_decoder_.GetLiteralValues( + num_literals_to_set, &cached_levels_[num_values])) { + return false; } + num_values += num_literals_to_set; } *num_cached_levels = num_values; return true; @@ -282,9 +298,7 @@ class ScalarColumnReader : public BaseScalarColumnReader { // NextLevels() should have already been called and def and rep levels should be in // valid range. DCHECK_GE(rep_level_, 0); - DCHECK_LE(rep_level_, max_rep_level()); DCHECK_GE(def_level_, 0); - DCHECK_LE(def_level_, max_def_level()); DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << "Caller should have called NextLevels() until we are ready to read a value"; @@ -330,6 +344,7 @@ class ScalarColumnReader : public BaseScalarColumnReader { int val_count = 0; bool continue_execution = true; while (val_count < max_values && !RowGroupAtEnd() && continue_execution) { + DCHECK_GE(num_buffered_values_, 0); // Read next page if necessary. if (num_buffered_values_ == 0) { if (!NextPage()) { @@ -338,26 +353,29 @@ class ScalarColumnReader : public BaseScalarColumnReader { } } - // Fill def/rep level caches if they are empty. - int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_); - if (!def_levels_.CacheHasNext()) { - parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size)); - } - // We only need the repetition levels for populating the position slot since we - // are only populating top-level tuples. - if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) { - parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size)); - } - if (UNLIKELY(!parent_->parse_status_.ok())) return false; - - // This special case is most efficiently handled here directly. 
+ // Not materializing anything - skip decoding any levels and rely on the value + // count from page metadata to return the correct number of rows. if (!MATERIALIZED && !IN_COLLECTION) { - int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count); + int vals_to_add = min(num_buffered_values_, max_values - val_count); val_count += vals_to_add; - def_levels_.CacheSkipLevels(vals_to_add); num_buffered_values_ -= vals_to_add; continue; } + // Fill the rep level cache if needed. We are flattening out the fields of the + // nested collection into the top-level tuple returned by the scan, so we don't + // care about the nesting structure unless the position slot is being populated. + if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) { + parent_->parse_status_.MergeStatus( + rep_levels_.CacheNextBatch(num_buffered_values_)); + if (UNLIKELY(!parent_->parse_status_.ok())) return false; + } + + // Fill def level cache if needed. + if (!def_levels_.CacheHasNext()) { + // TODO: add a fast path here if there's a run of repeated values. + parent_->parse_status_.MergeStatus( + def_levels_.CacheNextBatch(num_buffered_values_)); + } // Read data page and cached levels to materialize values. int cache_start_idx = def_levels_.CacheCurrIdx(); @@ -685,7 +703,9 @@ class BoolColumnReader : public BaseScalarColumnReader { virtual Status InitDataPage(uint8_t* data, int size) { // Initialize bool decoder - bool_values_ = BitReader(data, size); + bool_values_.Reset(data, size); + num_unpacked_values_ = 0; + unpacked_value_idx_ = 0; return Status::OK(); } @@ -695,9 +715,7 @@ class BoolColumnReader : public BaseScalarColumnReader { DCHECK(slot_desc_ != NULL); // Def and rep levels should be in valid range. 
DCHECK_GE(rep_level_, 0); - DCHECK_LE(rep_level_, max_rep_level()); DCHECK_GE(def_level_, 0); - DCHECK_LE(def_level_, max_def_level()); DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << "Caller should have called NextLevels() until we are ready to read a value"; @@ -719,14 +737,38 @@ class BoolColumnReader : public BaseScalarColumnReader { template<bool IN_COLLECTION> inline bool ReadSlot(Tuple* tuple, MemPool* pool) { void* slot = tuple->GetSlot(tuple_offset_); - if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) { - parent_->parse_status_ = Status("Invalid bool column."); - return false; + bool val; + if (unpacked_value_idx_ < num_unpacked_values_) { + val = unpacked_values_[unpacked_value_idx_++]; + } else { + // Unpack as many values as we can into the buffer. We expect to read at least one + // value. + int num_unpacked = + bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]); + if (UNLIKELY(num_unpacked == 0)) { + parent_->parse_status_ = Status("Invalid bool column."); + return false; + } + val = unpacked_values_[0]; + num_unpacked_values_ = num_unpacked; + unpacked_value_idx_ = 1; } + *reinterpret_cast<bool*>(slot) = val; return NextLevels<IN_COLLECTION>(); } - BitReader bool_values_; + /// A buffer to store unpacked values. Must be a multiple of 32 size to use the + /// batch-oriented interface of BatchedBitReader. + static const int UNPACKED_BUFFER_LEN = 128; + bool unpacked_values_[UNPACKED_BUFFER_LEN]; + + /// The number of valid values in 'unpacked_values_'. + int num_unpacked_values_ = 0; + + /// The next value to return from 'unpacked_values_'. + int unpacked_value_idx_ = 0; + + BatchedBitReader bool_values_; }; // Change 'val_count' to zero to exercise IMPALA-5197. 
This verifies the error handling http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index dea84a8..17b8c0f 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -22,6 +22,7 @@ #include "exec/hdfs-parquet-scanner.h" #include "util/codec.h" +#include "util/bit-stream-utils.h" #include "util/dict-encoding.h" #include "util/rle-encoding.h" @@ -36,39 +37,34 @@ class MemPool; /// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up /// populating the level cache (e.g., with RLE we can memset() repeated values). /// -/// Inherits from RleDecoder instead of containing one for performance reasons. -/// The containment design would require two BitReaders per column reader. The extra -/// BitReader causes enough bloat for a column reader to require another cache line. -/// TODO: It is not clear whether the inheritance vs. containment choice still makes -/// sense with column-wise materialization. The containment design seems cleaner and -/// we should revisit. -class ParquetLevelDecoder : public RleDecoder { +/// TODO: expose whether we're in a run of repeated values so that callers can +/// optimise for that case. +class ParquetLevelDecoder { public: ParquetLevelDecoder(bool is_def_level_decoder) - : cached_levels_(NULL), - num_cached_levels_(0), - cached_level_idx_(0), - encoding_(parquet::Encoding::PLAIN), - max_level_(0), - cache_size_(0), - num_buffered_values_(0), - decoding_error_code_(is_def_level_decoder ? + : decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) { } /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the - /// encoding requires reading metadata from the page header. 
+ /// encoding requires reading metadata from the page header. 'cache_size' will be + /// rounded up to a multiple of 32 internally. Status Init(const string& filename, parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values, uint8_t** data, int* data_size); - /// Returns the next level or INVALID_LEVEL if there was an error. + /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient + /// as batched methods. inline int16_t ReadLevel(); - /// Decodes and caches the next batch of levels. Resets members associated with the - /// cache. Returns a non-ok status if there was a problem decoding a level, or if a - /// level was encountered with a value greater than max_level_. - Status CacheNextBatch(int batch_size); + /// Decodes and caches the next batch of levels given that there are 'vals_remaining' + /// values left to decode in the page. Resets members associated with the cache. + /// Returns a non-ok status if there was a problem decoding a level, if a level was + /// encountered with a value greater than max_level_, or if fewer than + /// min(CacheSize(), vals_remaining) levels could be read, which indicates that the + /// input did not have the expected number of values. Only valid to call when + /// the cache has been exhausted, i.e. CacheHasNext() is false. + Status CacheNextBatch(int vals_remaining); /// Functions for working with the level cache. inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; } @@ -83,33 +79,57 @@ class ParquetLevelDecoder : public RleDecoder { inline int CacheSize() const { return num_cached_levels_; } inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; } inline int CacheCurrIdx() const { return cached_level_idx_; } - private: /// Initializes members associated with the level cache. Allocates memory for /// the cache from pool, if necessary. 
Status InitCache(MemPool* pool, int cache_size); - /// Decodes and writes a batch of levels into the cache. Sets the number of - /// values written to the cache in *num_cached_levels. Returns false if there was - /// an error decoding a level or if there was a level value greater than max_level_. + /// Decodes and writes a batch of levels into the cache. Returns true and sets + /// the number of values written to the cache via *num_cached_levels if no errors + /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the + /// end of input was hit without any other errors. Returns false if there was an + /// error decoding a level or if there was an invalid level value greater than + /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e. + /// CacheHasNext() is false. bool FillCache(int batch_size, int* num_cached_levels); - /// Buffer for a batch of levels. The memory is allocated and owned by a pool in - /// passed in Init(). - uint8_t* cached_levels_; + /// Implementation of FillCache() for RLE encoding. + bool FillCacheRle(int batch_size, int* num_cached_levels); + + /// RLE decoder, used if 'encoding_' is RLE. + RleBatchDecoder<uint8_t> rle_decoder_; + + /// Bit unpacker, used if 'encoding_' is BIT_PACKED. + BatchedBitReader bit_reader_; + + /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed + /// in Init(). + uint8_t* cached_levels_ = nullptr; + /// Number of valid level values in the cache. - int num_cached_levels_; + int num_cached_levels_ = 0; + /// Current index into cached_levels_. - int cached_level_idx_; - parquet::Encoding::type encoding_; + int cached_level_idx_ = 0; + + /// The parquet encoding used for the levels. Usually RLE but the deprecated BIT_PACKED + /// encoding is also allowed. + parquet::Encoding::type encoding_ = parquet::Encoding::PLAIN; /// For error checking and reporting. - int max_level_; - /// Number of level values cached_levels_ has memory allocated for. 
- int cache_size_; + int max_level_ = 0; + + /// Number of level values cached_levels_ has memory allocated for. Always + /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches. + int cache_size_ = 0; + /// Number of remaining data values in the current data page. - int num_buffered_values_; + int num_buffered_values_ = 0; + + /// Name of the parquet file. Used for reporting level decoding errors. string filename_; + + /// Error code to use when reporting level decoding errors. TErrorCode::type decoding_error_code_; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/experiments/bit-stream-utils.8byte.h ---------------------------------------------------------------------- diff --git a/be/src/experiments/bit-stream-utils.8byte.h b/be/src/experiments/bit-stream-utils.8byte.h deleted file mode 100644 index f9418a5..0000000 --- a/be/src/experiments/bit-stream-utils.8byte.h +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- - -#ifndef IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_H -#define IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_H - -#include <boost/cstdint.hpp> -#include <string.h> -#include "common/compiler-util.h" -#include "common/logging.h" -#include "util/bit-util.h" - -namespace impala { - -/// Utility class to write bit/byte streams. This class can write data to either be -/// bit packed or byte aligned (and a single stream that has a mix of both). -/// This class does not allocate memory. -class BitWriter_8byte { - public: - /// buffer: buffer to write bits to. Buffer should be preallocated with - /// 'buffer_len' bytes. 'buffer_len' must be a multiple of 8. - BitWriter_8byte(uint8_t* buffer, int buffer_len) : - buffer_(reinterpret_cast<uint64_t*>(buffer)), - max_bytes_(buffer_len), - offset_(0), - bit_offset_(0) { - DCHECK_EQ(buffer_len % 8, 0); - } - - void Clear() { - offset_ = 0; - bit_offset_ = 0; - memset(buffer_, 0, max_bytes_); - } - - uint8_t* buffer() const { return reinterpret_cast<uint8_t*>(buffer_); } - int buffer_len() const { return max_bytes_; } - - inline int bytes_written() const { - return offset_ * 8 + BitUtil::Ceil(bit_offset_, 8); - } - - /// Writes a value to the buffer. This is bit packed. Returns false if - /// there was not enough space. - bool PutValue(uint64_t v, int num_bits); - - /// Writes v to the next aligned byte. - template<typename T> - bool PutAligned(T v, int num_bits); - - /// Write a Vlq encoded int to the buffer. Returns false if there was not enough - /// room. The value is written byte aligned. - /// For more details on vlq: - /// en.wikipedia.org/wiki/Variable-length_quantity - bool PutVlqInt(int32_t v); - - /// Get a pointer to the next aligned byte and advance the underlying buffer - /// by num_bytes. - /// Returns NULL if there was not enough space. 
- uint8_t* GetNextBytePtr(int num_bytes = 1); - - private: - uint64_t* buffer_; - int max_bytes_; - int offset_; // Offset into buffer_ - int bit_offset_; // Offset into current uint64_t -}; - -/// Utility class to read bit/byte stream. This class can read bits or bytes -/// that are either byte aligned or not. It also has utilities to read multiple -/// bytes in one read (e.g. encoded int). -class BitReader_8byte { - public: - /// buffer: buffer to read from. The buffer's length is 'buffer_len' and must be a - /// multiple of 8. - BitReader_8byte(uint8_t* buffer, int buffer_len) : - buffer_(reinterpret_cast<uint64_t*>(buffer)), - max_bytes_(buffer_len), - offset_(0), - bit_offset_(0) { - DCHECK_EQ(buffer_len % 8, 0); - } - - BitReader_8byte() : buffer_(NULL), max_bytes_(0) {} - - /// Gets the next value from the buffer. - /// Returns true if 'v' could be read or false if there are not enough bytes left. - template<typename T> - bool GetValue(int num_bits, T* v); - - /// Reads a T sized value from the buffer. T needs to be a native type and little - /// endian. The value is assumed to be byte aligned so the stream will be advance - /// to the start of the next byte before v is read. - template<typename T> - bool GetAligned(int num_bits, T* v); - - /// Reads a vlq encoded int from the stream. The encoded int must start at the - /// beginning of a byte. Return false if there were not enough bytes in the buffer. - bool GetVlqInt(int32_t* v); - - /// Returns the number of bytes left in the stream, not including the current byte (i.e., - /// there may be an additional fraction of a byte). 
- inline int bytes_left() { - return max_bytes_ - (offset_ * 8 + BitUtil::Ceil(bit_offset_, 8)); - } - - /// Maximum byte length of a vlq encoded int - static const int MAX_VLQ_BYTE_LEN = 5; - - private: - uint64_t* buffer_; - int max_bytes_; - int offset_; // Offset into buffer_ - int bit_offset_; // Offset into current uint64_t - - /// Advances offset_ and/or bit_offset_ to next byte boundary in buffer_. - inline void Align(); -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/experiments/bit-stream-utils.8byte.inline.h ---------------------------------------------------------------------- diff --git a/be/src/experiments/bit-stream-utils.8byte.inline.h b/be/src/experiments/bit-stream-utils.8byte.inline.h deleted file mode 100644 index 1b1c05a..0000000 --- a/be/src/experiments/bit-stream-utils.8byte.inline.h +++ /dev/null @@ -1,145 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- - -#ifndef IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_INLINE_H -#define IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_INLINE_H - -#include "experiments/bit-stream-utils.8byte.h" - -namespace impala { - -inline bool BitWriter_8byte::PutValue(uint64_t v, int num_bits) { - DCHECK_LE(num_bits, 64); - DCHECK(num_bits == 64 || v >> num_bits == 0) - << "v = " << v << ", num_bits = " << num_bits; - - if (UNLIKELY(offset_ * 8 + bit_offset_ > max_bytes_ * 8 - num_bits)) return false; - - buffer_[offset_] |= v << bit_offset_; - bit_offset_ += num_bits; - if (bit_offset_ >= 64) { - ++offset_; - bit_offset_ -= 64; - if (UNLIKELY(bit_offset_ > 0)) { - // Write out bits of v that crossed into new byte offset - // We cannot perform v >> num_bits (i.e. when bit_offset_ is 0) because v >> 64 != 0 - buffer_[offset_] = v >> (num_bits - bit_offset_); - } - } - DCHECK_LT(bit_offset_, 64); - return true; -} - -inline uint8_t* BitWriter_8byte::GetNextBytePtr(int num_bytes) { - if (UNLIKELY(bytes_written() + num_bytes > max_bytes_)) return NULL; - - // Advance to next aligned byte if necessary - if (UNLIKELY(bit_offset_ > 56)) { - ++offset_; - bit_offset_ = 0; - } else { - bit_offset_ = BitUtil::RoundUpNumBytes(bit_offset_) * 8; - } - - DCHECK_EQ(bit_offset_ % 8, 0); - uint8_t* ptr = reinterpret_cast<uint8_t*>(buffer_ + offset_) + bit_offset_ / 8; - bit_offset_ += num_bytes * 8; - offset_ += bit_offset_ / 64; - bit_offset_ %= 64; - return ptr; -} - -template<typename T> -inline bool BitWriter_8byte::PutAligned(T val, int num_bits) { - // Align to byte boundary - uint8_t* byte_ptr = GetNextBytePtr(0); - bool result = PutValue(val, num_bits); - if (!result) return false; - // Pad to next byte boundary - byte_ptr = GetNextBytePtr(0); - DCHECK(byte_ptr != NULL); - return true; -} - -inline bool BitWriter_8byte::PutVlqInt(int32_t v) { - bool result = true; - while ((v & 0xFFFFFF80) != 0L) { - result &= PutAligned<uint8_t>((v & 0x7F) | 0x80, 8); - v >>= 7; - } - result &= PutAligned<uint8_t>(v & 
0x7F, 8); - return result; -} - -template<typename T> -inline bool BitReader_8byte::GetValue(int num_bits, T* v) { - int size = sizeof(T) * 8; - DCHECK_LE(num_bits, size); - if (UNLIKELY(offset_ * 8 + bit_offset_ > max_bytes_ * 8 - num_bits)) return false; - - *v = BitUtil::TrailingBits(buffer_[offset_], bit_offset_ + num_bits) >> bit_offset_; - bit_offset_ += num_bits; - if (bit_offset_ >= 64) { - ++offset_; - bit_offset_ -= 64; - // Read bits of v that crossed into new byte offset - *v |= BitUtil::TrailingBits(buffer_[offset_], bit_offset_) - << (num_bits - bit_offset_); - } - DCHECK_LT(bit_offset_, 64); - return true; -} - -template<typename T> -inline bool BitReader_8byte::GetAligned(int num_bits, T* v) { - Align(); - if (UNLIKELY(bytes_left() < BitUtil::Ceil(num_bits, 8))) return false; - DCHECK_EQ(bit_offset_ % 8, 0); - bool result = GetValue(num_bits, v); - DCHECK(result); - Align(); - return true; -} - -inline bool BitReader_8byte::GetVlqInt(int32_t* v) { - *v = 0; - int shift = 0; - int num_bytes = 0; - uint8_t byte = 0; - do { - if (!GetAligned<uint8_t>(8, &byte)) return false; - *v |= (byte & 0x7F) << shift; - shift += 7; - DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN); - } while ((byte & 0x80) != 0); - return true; -} - -inline void BitReader_8byte::Align() { - if (UNLIKELY(bit_offset_ > 56)) { - ++offset_; - bit_offset_ = 0; - DCHECK_LE(offset_, max_bytes_); - } else { - bit_offset_ = BitUtil::RoundUpNumBytes(bit_offset_) * 8; - } -} - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-packing.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h index 62e5e88..05036db 100644 --- a/be/src/util/bit-packing.h +++ b/be/src/util/bit-packing.h @@ -62,6 +62,27 @@ class BitPacking { const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values, OutType* __restrict__ out); + /// Same as above, templated by BIT_WIDTH. 
+ template <typename OutType, int BIT_WIDTH> + static std::pair<const uint8_t*, int64_t> UnpackValues(const uint8_t* __restrict__ in, + int64_t in_bytes, int64_t num_values, OutType* __restrict__ out); + + /// Unpack values as above, treating them as unsigned integers, and decode them + /// using the provided dict. Sets 'decode_error' to true if one of the packed + /// values was greater than or equal to 'dict_len'. Does not modify 'decode_error' on success. + template <typename OutType> + static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(int bit_width, + const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int64_t num_values, OutType* __restrict__ out, + bool* __restrict__ decode_error); + + /// Same as above, templated by BIT_WIDTH. + template <typename OutType, int BIT_WIDTH> + static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues( + const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int64_t num_values, OutType* __restrict__ out, + bool* __restrict__ decode_error); + /// Unpack exactly 32 values of 'bit_width' from 'in' to 'out'. 'in' must point to /// 'in_bytes' of addressable memory, and 'in_bytes' must be at least /// (32 * bit_width / 8). 'out' must have space for 32 OutType values. @@ -70,22 +91,42 @@ class BitPacking { static const uint8_t* Unpack32Values(int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out); - private: - /// Implementation of Unpack32Values() that uses 32-bit integer loads to - /// unpack values with the given BIT_WIDTH from 'in' to 'out'. + /// Same as Unpack32Values() but templated by BIT_WIDTH. template <typename OutType, int BIT_WIDTH> static const uint8_t* Unpack32Values( const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out); - /// Function that unpacks 'num_values' values with the given BIT_WIDTH from 'in' to - /// 'out'. 'num_values' can be at most 32. 
The version with 'bit_width' as an argument - /// dispatches based on 'bit_width' to the appropriate templated implementation. - template <typename OutType, int BIT_WIDTH> - static const uint8_t* UnpackUpTo32Values(const uint8_t* __restrict__ in, - int64_t in_bytes, int num_values, OutType* __restrict__ out); + /// Same as Unpack32Values() with dictionary decoding. template <typename OutType> - static const uint8_t* UnpackUpTo32Values(int bit_width, const uint8_t* __restrict__ in, + static const uint8_t* UnpackAndDecode32Values(int bit_width, + const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, OutType* __restrict__ out, bool* __restrict__ decode_error); + + /// Same as UnpackAndDecode32Values() but templated by BIT_WIDTH. + template <typename OutType, int BIT_WIDTH> + static const uint8_t* UnpackAndDecode32Values(const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, + OutType* __restrict__ out, bool* __restrict__ decode_error); + + /// Unpacks 'num_values' values with the given BIT_WIDTH from 'in' to 'out'. + /// 'num_values' must be at most 31. 'in' must point to 'in_bytes' of addressable + /// memory, and 'in_bytes' must be at least ceil(num_values * bit_width / 8). + /// 'out' must have space for 'num_values' OutType values. + /// 0 <= 'bit_width' <= 32 and 'bit_width' <= # of bits in OutType. + template <typename OutType, int BIT_WIDTH> + static const uint8_t* UnpackUpTo31Values(const uint8_t* __restrict__ in, int64_t in_bytes, int num_values, OutType* __restrict__ out); + + /// Same as UnpackUpTo31Values() with dictionary decoding. 
+ template <typename OutType, int BIT_WIDTH> + static const uint8_t* UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values, + OutType* __restrict__ out, bool* __restrict__ decode_error); + + private: + /// Compute the number of values with the given bit width that can be unpacked from + /// an input buffer of 'in_bytes' into an output buffer with space for 'num_values'. + static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-packing.inline.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h index 37d51ab..4b2bb33 100644 --- a/be/src/util/bit-packing.inline.h +++ b/be/src/util/bit-packing.inline.h @@ -31,32 +31,109 @@ namespace impala { +inline int64_t BitPacking::NumValuesToUnpack( + int bit_width, int64_t in_bytes, int64_t num_values) { + // Check if we have enough input bytes to decode 'num_values'. + if (bit_width == 0 || BitUtil::RoundUpNumBytes(num_values * bit_width) <= in_bytes) { + // Limited by output space. + return num_values; + } else { + // Limited by the number of input bytes. Compute the number of values that can be + // unpacked from the input. + return (in_bytes * CHAR_BIT) / bit_width; + } +} + template <typename OutType> std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values, OutType* __restrict__ out) { +#pragma push_macro("UNPACK_VALUES_CASE") +#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ + case i: \ + return UnpackValues<OutType, i>(in, in_bytes, num_values, out); + + switch (bit_width) { + // Expand cases from 0 to 32. 
+ BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore); + default: + DCHECK(false); + return std::make_pair(nullptr, -1); + } +#pragma pop_macro("UNPACK_VALUES_CASE") +} + +template <typename OutType, int BIT_WIDTH> +std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues( + const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values, + OutType* __restrict__ out) { constexpr int BATCH_SIZE = 32; - const int64_t max_input_values = - bit_width ? (in_bytes * CHAR_BIT) / bit_width : num_values; - const int64_t values_to_read = std::min(num_values, max_input_values); + const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values); const int64_t batches_to_read = values_to_read / BATCH_SIZE; const int64_t remainder_values = values_to_read % BATCH_SIZE; const uint8_t* in_pos = in; OutType* out_pos = out; // First unpack as many full batches as possible. for (int64_t i = 0; i < batches_to_read; ++i) { - in_pos = Unpack32Values<OutType>(bit_width, in_pos, in_bytes, out_pos); + in_pos = Unpack32Values<OutType, BIT_WIDTH>(in_pos, in_bytes, out_pos); out_pos += BATCH_SIZE; - in_bytes -= (BATCH_SIZE * bit_width) / CHAR_BIT; + in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT; } // Then unpack the final partial batch. 
if (remainder_values > 0) { - in_pos = UnpackUpTo32Values<OutType>(bit_width, + in_pos = UnpackUpTo31Values<OutType, BIT_WIDTH>( in_pos, in_bytes, remainder_values, out_pos); } return std::make_pair(in_pos, values_to_read); } +template <typename OutType> +std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(int bit_width, + const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int64_t num_values, OutType* __restrict__ out, + bool* __restrict__ decode_error) { +#pragma push_macro("UNPACK_VALUES_CASE") +#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ + case i: \ + return UnpackAndDecodeValues<OutType, i>( \ + in, in_bytes, dict, dict_len, num_values, out, decode_error); + + switch (bit_width) { + // Expand cases from 0 to 32. + BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore); + default: + DCHECK(false); + return std::make_pair(nullptr, -1); + } +#pragma pop_macro("UNPACK_VALUES_CASE") +} + +template <typename OutType, int BIT_WIDTH> +std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues( + const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict, + int64_t dict_len, int64_t num_values, OutType* __restrict__ out, + bool* __restrict__ decode_error) { + constexpr int BATCH_SIZE = 32; + const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values); + const int64_t batches_to_read = values_to_read / BATCH_SIZE; + const int64_t remainder_values = values_to_read % BATCH_SIZE; + const uint8_t* in_pos = in; + OutType* out_pos = out; + // First unpack as many full batches as possible. + for (int64_t i = 0; i < batches_to_read; ++i) { + in_pos = UnpackAndDecode32Values<OutType, BIT_WIDTH>( + in_pos, in_bytes, dict, dict_len, out_pos, decode_error); + out_pos += BATCH_SIZE; + in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT; + } + // Then unpack the final partial batch. 
+ if (remainder_values > 0) { + in_pos = UnpackAndDecodeUpTo31Values<OutType, BIT_WIDTH>( + in_pos, in_bytes, dict, dict_len, remainder_values, out_pos, decode_error); + } + return std::make_pair(in_pos, values_to_read); +} + // Loop body of unrolled loop that unpacks the value. BIT_WIDTH is the bit width of // the packed values. 'in_buf' is the start of the input buffer and 'out_vals' is the // start of the output values array. This function unpacks the VALUE_IDX'th packed value @@ -111,60 +188,83 @@ inline uint32_t ALWAYS_INLINE UnpackValue(const uint8_t* __restrict__ in_buf) { } } +template <typename OutType> +inline void ALWAYS_INLINE DecodeValue(OutType* __restrict__ dict, int64_t dict_len, + uint32_t idx, OutType* __restrict__ out_val, bool* __restrict__ decode_error) { + if (UNLIKELY(idx >= dict_len)) { + *decode_error = true; + } else { + // Use memcpy() because we can't assume sufficient alignment in some cases (e.g. + // 16 byte decimals). + memcpy(out_val, &dict[idx], sizeof(OutType)); + } +} + template <typename OutType, int BIT_WIDTH> const uint8_t* BitPacking::Unpack32Values( const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out) { static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low"); static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32"); - static_assert( - BIT_WIDTH <= sizeof(OutType) * CHAR_BIT, "BIT_WIDTH too high for output type"); + DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output"; constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH); DCHECK_GE(in_bytes, BYTES_TO_READ); -// Call UnpackValue for 0 <= i < 32. -#pragma push_macro("UNPACK_VALUES_CALL") + // Call UnpackValue for 0 <= i < 32. 
+#pragma push_macro("UNPACK_VALUE_CALL") #define UNPACK_VALUE_CALL(ignore1, i, ignore2) \ out[i] = static_cast<OutType>(UnpackValue<BIT_WIDTH, i>(in)); + BOOST_PP_REPEAT_FROM_TO(0, 32, UNPACK_VALUE_CALL, ignore); -#pragma pop_macro("UNPACK_VALUES_CALL") return in + BYTES_TO_READ; +#pragma pop_macro("UNPACK_VALUE_CALL") } template <typename OutType> const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out) { - switch (bit_width) { - // Expand cases from 0 to 32. #pragma push_macro("UNPACK_VALUES_CASE") #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ case i: return Unpack32Values<OutType, i>(in, in_bytes, out); - BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore); -#pragma pop_macro("UNPACK_VALUES_CASE") - default: DCHECK(false); return in; - } -} -template <typename OutType> -const uint8_t* BitPacking::UnpackUpTo32Values(int bit_width, const uint8_t* __restrict__ in, - int64_t in_bytes, int num_values, OutType* __restrict__ out) { switch (bit_width) { // Expand cases from 0 to 32. 
-#pragma push_macro("UNPACK_VALUES_CASE") -#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ - case i: return UnpackUpTo32Values<OutType, i>(in, in_bytes, num_values, out); BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore); -#pragma pop_macro("UNPACK_VALUES_CASE") default: DCHECK(false); return in; } +#pragma pop_macro("UNPACK_VALUES_CASE") +} + +template <typename OutType, int BIT_WIDTH> +const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, + OutType* __restrict__ out, bool* __restrict__ decode_error) { + static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low"); + static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32"); + DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output"; + constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH); + DCHECK_GE(in_bytes, BYTES_TO_READ); + // TODO: this could be optimised further by using SIMD instructions. + // https://lemire.me/blog/2016/08/25/faster-dictionary-decoding-with-simd-instructions/ + + // Call UnpackValue() and DecodeValue() for 0 <= i < 32. 
+#pragma push_macro("DECODE_VALUE_CALL") +#define DECODE_VALUE_CALL(ignore1, i, ignore2) \ + { \ + uint32_t idx = UnpackValue<BIT_WIDTH, i>(in); \ + DecodeValue(dict, dict_len, idx, &out[i], decode_error); \ + } + + BOOST_PP_REPEAT_FROM_TO(0, 32, DECODE_VALUE_CALL, ignore); + return in + BYTES_TO_READ; +#pragma pop_macro("DECODE_VALUE_CALL") } template <typename OutType, int BIT_WIDTH> -const uint8_t* BitPacking::UnpackUpTo32Values(const uint8_t* __restrict__ in, +const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in, int64_t in_bytes, int num_values, OutType* __restrict__ out) { static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low"); static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32"); - static_assert( - BIT_WIDTH <= sizeof(OutType) * CHAR_BIT, "BIT_WIDTH too high for output type"); + DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output"; constexpr int MAX_BATCH_SIZE = 31; const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH); DCHECK_GE(in_bytes, BYTES_TO_READ); @@ -183,19 +283,65 @@ const uint8_t* BitPacking::UnpackUpTo32Values(const uint8_t* __restrict__ in, in_buffer = tmp_buffer; } - // Use switch with fall-through cases to minimise branching. - switch (num_values) { -// Expand cases from 31 down to 1. #pragma push_macro("UNPACK_VALUES_CASE") #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \ case 31 - i: out[30 - i] = \ static_cast<OutType>(UnpackValue<BIT_WIDTH, 30 - i>(in_buffer)); + + // Use switch with fall-through cases to minimise branching. + switch (num_values) { + // Expand cases from 31 down to 1. 
BOOST_PP_REPEAT_FROM_TO(0, 31, UNPACK_VALUES_CASE, ignore); -#pragma pop_macro("UNPACK_VALUES_CASE") case 0: break; default: DCHECK(false); } return in + BYTES_TO_READ; +#pragma pop_macro("UNPACK_VALUES_CASE") +} + +template <typename OutType, int BIT_WIDTH> +const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in, + int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values, + OutType* __restrict__ out, bool* __restrict__ decode_error) { + static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low"); + static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32"); + DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output"; + constexpr int MAX_BATCH_SIZE = 31; + const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH); + DCHECK_GE(in_bytes, BYTES_TO_READ); + DCHECK_LE(num_values, MAX_BATCH_SIZE); + + // Make sure the buffer is at least 1 byte. + constexpr int TMP_BUFFER_SIZE = BIT_WIDTH ? + (BIT_WIDTH * (MAX_BATCH_SIZE + 1)) / CHAR_BIT : 1; + uint8_t tmp_buffer[TMP_BUFFER_SIZE]; + + const uint8_t* in_buffer = in; + // Copy into padded temporary buffer to avoid reading past the end of 'in' if the + // last 32-bit load would go past the end of the buffer. + if (BitUtil::RoundUp(BYTES_TO_READ, sizeof(uint32_t)) > in_bytes) { + memcpy(tmp_buffer, in, BYTES_TO_READ); + in_buffer = tmp_buffer; + } + +#pragma push_macro("DECODE_VALUES_CASE") +#define DECODE_VALUES_CASE(ignore1, i, ignore2) \ + case 31 - i: { \ + uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i>(in_buffer); \ + DecodeValue(dict, dict_len, idx, &out[30 - i], decode_error); \ + } + + // Use switch with fall-through cases to minimise branching. + switch (num_values) { + // Expand cases from 31 down to 1. 
+ BOOST_PP_REPEAT_FROM_TO(0, 31, DECODE_VALUES_CASE, ignore); + case 0: + break; + default: + DCHECK(false); + } + return in + BYTES_TO_READ; +#pragma pop_macro("DECODE_VALUES_CASE") } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-stream-utils.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h index 5acdeee..bac127a 100644 --- a/be/src/util/bit-stream-utils.h +++ b/be/src/util/bit-stream-utils.h @@ -92,53 +92,67 @@ class BitWriter { int bit_offset_; // Offset in buffered_values_ }; -/// Utility class to read bit/byte stream. This class can read bits or bytes -/// that are either byte aligned or not. It also has utilities to read multiple -/// bytes in one read (e.g. encoded int). -class BitReader { +/// Utility class to read bit/byte stream. This class can read bits or bytes that are +/// either byte aligned or not. It also has utilities to read multiple bytes in one +/// read (e.g. encoded int). Exposes a batch-oriented interface to allow efficient +/// processing of multiple values at a time. +class BatchedBitReader { public: /// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'. /// Does not take ownership of the buffer. - BitReader(const uint8_t* buffer, int buffer_len) { Reset(buffer, buffer_len); } + BatchedBitReader(const uint8_t* buffer, int64_t buffer_len) { + Reset(buffer, buffer_len); + } - BitReader() : buffer_(NULL), max_bytes_(0) {} + BatchedBitReader() {} - // The implicit copy constructor is left defined. If a BitReader is copied, the + // The implicit copy constructor is left defined. If a BatchedBitReader is copied, the // two copies do not share any state. Invoking functions on either copy continues // reading from the current read position without modifying the state of the other // copy. /// Resets the read to start reading from the start of 'buffer'. 
The buffer's /// length is 'buffer_len'. Does not take ownership of the buffer. - void Reset(const uint8_t* buffer, int buffer_len) { - buffer_ = buffer; - max_bytes_ = buffer_len; - byte_offset_ = 0; - bit_offset_ = 0; - int num_bytes = std::min(8, max_bytes_); - memcpy(&buffered_values_, buffer_, num_bytes); + void Reset(const uint8_t* buffer, int64_t buffer_len) { + buffer_pos_ = buffer; + buffer_end_ = buffer + buffer_len; } - /// Gets the next value from the buffer. Returns true if 'v' could be read or false if - /// there are not enough bytes left. num_bits must be <= 32. + /// Gets up to 'num_values' bit-packed values, starting from the current byte in the + /// buffer and advances the read position. 'bit_width' must be <= 32. + /// If 'bit_width' * 'num_values' is not a multiple of 8, the trailing bits are + /// skipped and the next UnpackBatch() call will start reading from the next byte. + /// + /// If the caller does not want to drop trailing bits, 'num_values' must be exactly the + /// total number of values the caller wants to read from a run of bit-packed values, or + /// 'bit_width' * 'num_values' must be a multiple of 8. This condition is always + /// satisfied if 'num_values' is a multiple of 32. + /// + /// Returns the number of values read. template<typename T> - bool GetValue(int num_bits, T* v); + int UnpackBatch(int bit_width, int num_values, T* v); - /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a - /// little-endian native type and big enough to store 'num_bytes'. The value is assumed - /// to be byte-aligned so the stream will be advanced to the start of the next byte - /// before 'v' is read. Returns false if there are not enough bytes left. + /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the + /// dictionary 'dict' with 'dict_len' entries. Return -1 if a decoding error is + /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'. 
+ /// Otherwise returns the number of values decoded. template<typename T> - bool GetAligned(int num_bytes, T* v); + int UnpackAndDecodeBatch( + int bit_width, T* dict, int64_t dict_len, int num_values, T* v); + + /// Reads an unpacked 'num_bytes'-sized value from the buffer and stores it in 'v'. T + /// needs to be a little-endian native type and big enough to store 'num_bytes'. + /// Returns false if there are not enough bytes left. + template<typename T> + bool GetBytes(int num_bytes, T* v); /// Reads a vlq encoded int from the stream. The encoded int must start at the /// beginning of a byte. Return false if there were not enough bytes in the buffer or /// the int is invalid. bool GetVlqInt(int32_t* v); - /// Returns the number of bytes left in the stream, not including the current byte (i.e., - /// there may be an additional fraction of a byte). - int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); } + /// Returns the number of bytes left in the stream. + int bytes_left() { return buffer_end_ - buffer_pos_; } /// Maximum byte length of a vlq encoded int static const int MAX_VLQ_BYTE_LEN = 5; @@ -147,17 +161,12 @@ class BitReader { static const int MAX_BITWIDTH = 32; private: - const uint8_t* buffer_; - int max_bytes_; + /// Current read position in the buffer. + const uint8_t* buffer_pos_ = nullptr; - /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is - /// faster than reading values byte by byte directly from buffer_. - uint64_t buffered_values_; - - int byte_offset_; // Offset in buffer_ - int bit_offset_; // Offset in buffered_values_ + /// Pointer to the byte after the end of the buffer. 
+ const uint8_t* buffer_end_ = nullptr; }; - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-stream-utils.inline.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h index c8744aa..3974492 100644 --- a/be/src/util/bit-stream-utils.inline.h +++ b/be/src/util/bit-stream-utils.inline.h @@ -18,9 +18,11 @@ #ifndef IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H #define IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H -#include "common/compiler-util.h" #include "util/bit-stream-utils.h" +#include "common/compiler-util.h" +#include "util/bit-packing.inline.h" + namespace impala { inline bool BitWriter::PutValue(uint64_t v, int num_bits) { @@ -84,83 +86,67 @@ inline bool BitWriter::PutVlqInt(int32_t v) { return result; } -/// Force inlining - this is used in perf-critical loops in Parquet and GCC often doesn't -/// inline it in cases where it's beneficial. -template <typename T> -ALWAYS_INLINE inline bool BitReader::GetValue(int num_bits, T* v) { - DCHECK(num_bits == 0 || buffer_ != NULL); - // TODO: revisit this limit if necessary - DCHECK_LE(num_bits, MAX_BITWIDTH); - DCHECK_LE(num_bits, sizeof(T) * 8); - - // First do a cheap check to see if we may read past the end of the stream, using - // constant upper bounds for 'bit_offset_' and 'num_bits'. - if (UNLIKELY(byte_offset_ + sizeof(buffered_values_) + MAX_BITWIDTH / 8 > max_bytes_)) { - // Now do the precise check. 
- if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) { - return false; - } - } - - DCHECK_GE(bit_offset_, 0); - DCHECK_LE(bit_offset_, 64); - *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_; - - bit_offset_ += num_bits; - if (bit_offset_ >= 64) { - byte_offset_ += 8; - bit_offset_ -= 64; - - int bytes_remaining = max_bytes_ - byte_offset_; - if (LIKELY(bytes_remaining >= 8)) { - memcpy(&buffered_values_, buffer_ + byte_offset_, 8); - } else { - memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); - } +template<typename T> +inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) { + DCHECK(buffer_pos_ != nullptr); + DCHECK_GE(bit_width, 0); + DCHECK_LE(bit_width, MAX_BITWIDTH); + DCHECK_LE(bit_width, sizeof(T) * 8); + DCHECK_GE(num_values, 0); + + int64_t num_read; + std::tie(buffer_pos_, num_read) = BitPacking::UnpackValues(bit_width, buffer_pos_, + bytes_left(), num_values, v); + DCHECK_LE(buffer_pos_, buffer_end_); + DCHECK_LE(num_read, num_values); + return static_cast<int>(num_read); +} - // Read bits of v that crossed into new buffered_values_ - *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_) - << (num_bits - bit_offset_); - } - DCHECK_LE(bit_offset_, 64); - return true; +template<typename T> +inline int BatchedBitReader::UnpackAndDecodeBatch( + int bit_width, T* dict, int64_t dict_len, int num_values, T* v){ + DCHECK(buffer_pos_ != nullptr); + DCHECK_GE(bit_width, 0); + DCHECK_LE(bit_width, MAX_BITWIDTH); + DCHECK_LE(bit_width, sizeof(T) * 8); + DCHECK_GE(num_values, 0); + + const uint8_t* new_buffer_pos; + int64_t num_read; + bool decode_error = false; + std::tie(new_buffer_pos, num_read) = BitPacking::UnpackAndDecodeValues(bit_width, + buffer_pos_, bytes_left(), dict, dict_len, num_values, v, &decode_error); + if (UNLIKELY(decode_error)) return -1; + buffer_pos_ = new_buffer_pos; + DCHECK_LE(buffer_pos_, buffer_end_); + DCHECK_LE(num_read, num_values); 
+ return static_cast<int>(num_read); } template<typename T> -inline bool BitReader::GetAligned(int num_bytes, T* v) { +inline bool BatchedBitReader::GetBytes(int num_bytes, T* v) { + DCHECK(buffer_pos_ != nullptr); + DCHECK_GE(num_bytes, 0); DCHECK_LE(num_bytes, sizeof(T)); - int bytes_read = BitUtil::Ceil(bit_offset_, 8); - if (UNLIKELY(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false; - - // Advance byte_offset to next unread byte and read num_bytes - byte_offset_ += bytes_read; - memcpy(v, buffer_ + byte_offset_, num_bytes); - byte_offset_ += num_bytes; - - // Reset buffered_values_ - bit_offset_ = 0; - int bytes_remaining = max_bytes_ - byte_offset_; - if (LIKELY(bytes_remaining >= 8)) { - memcpy(&buffered_values_, buffer_ + byte_offset_, 8); - } else { - memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); - } + if (UNLIKELY(buffer_pos_ + num_bytes > buffer_end_)) return false; + *v = 0; // Ensure unset bytes are initialized to zero. + memcpy(v, buffer_pos_, num_bytes); + buffer_pos_ += num_bytes; return true; } -inline bool BitReader::GetVlqInt(int32_t* v) { +inline bool BatchedBitReader::GetVlqInt(int32_t* v) { *v = 0; int shift = 0; uint8_t byte = 0; do { if (UNLIKELY(shift >= MAX_VLQ_BYTE_LEN * 7)) return false; - if (!GetAligned<uint8_t>(1, &byte)) return false; + if (!GetBytes(1, &byte)) return false; *v |= (byte & 0x7F) << shift; shift += 7; } while ((byte & 0x80) != 0); return true; } - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/dict-encoding.h ---------------------------------------------------------------------- diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h index 62b3d3a..fa5e798 100644 --- a/be/src/util/dict-encoding.h +++ b/be/src/util/dict-encoding.h @@ -174,13 +174,16 @@ class DictDecoderBase { DCHECK_GE(buffer_len, 0); if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes"); uint8_t bit_width = *buffer; - if 
(UNLIKELY(bit_width < 0 || bit_width > BitReader::MAX_BITWIDTH)) { + if (UNLIKELY(bit_width < 0 || bit_width > BatchedBitReader::MAX_BITWIDTH)) { return Status(strings::Substitute("Dictionary has invalid or unsupported bit " "width: $0", bit_width)); } ++buffer; --buffer_len; data_decoder_.Reset(buffer, buffer_len, bit_width); + num_repeats_ = 0; + num_literal_values_ = 0; + next_literal_idx_ = 0; return Status::OK(); } @@ -193,7 +196,22 @@ class DictDecoderBase { virtual void GetValue(int index, void* buffer) = 0; protected: - RleDecoder data_decoder_; + /// Number of decoded values to buffer at a time. A multiple of 32 is chosen to allow + /// efficient reading in batches from data_decoder_. Increasing the batch size up to + /// 128 seems to improve performance, but increasing further did not make a noticeable + /// difference. + static const int DECODED_BUFFER_SIZE = 128; + + RleBatchDecoder<uint32_t> data_decoder_; + + /// Greater than zero if we've started decoding a repeated run. + int64_t num_repeats_ = 0; + + /// Greater than zero if we have buffered some literal values. + int num_literal_values_ = 0; + + /// The index of the next decoded value to return. + int next_literal_idx_ = 0; }; template<typename T> @@ -211,7 +229,7 @@ class DictDecoder : public DictDecoderBase { /// Returns true if the dictionary values were all successfully decoded, or false /// if the dictionary was corrupt. template<parquet::Type::type PARQUET_TYPE> - bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size); + bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size) WARN_UNUSED_RESULT; virtual int num_entries() const { return dict_.size(); } @@ -219,17 +237,26 @@ class DictDecoder : public DictDecoderBase { T* val_ptr = reinterpret_cast<T*>(buffer); DCHECK_GE(index, 0); DCHECK_LT(index, dict_.size()); - // TODO: is there any circumstance where this should be a memcpy? *val_ptr = dict_[index]; } /// Returns the next value. Returns false if the data is invalid. 
/// For StringValues, this does not make a copy of the data. Instead, /// the string data is from the dictionary buffer passed into the c'tor. - bool GetNextValue(T* value); + bool GetNextValue(T* value) WARN_UNUSED_RESULT; private: std::vector<T> dict_; + + /// Decoded values, buffered to allow caller to consume one-by-one. If in the middle of + /// a repeated run, the first element is the current dict value. If in a literal run, + /// this contains 'num_literal_values_' values, with the next value to be returned at + /// 'next_literal_idx_'. + T decoded_values_[DECODED_BUFFER_SIZE]; + + /// Slow path for GetNextValue() where we need to decode new values. Should not be + /// inlined everywhere. + bool DecodeNextValue(T* value); }; template<typename T> @@ -290,30 +317,47 @@ inline int DictEncoder<StringValue>::AddToTable(const StringValue& value, // Force inlining - GCC does not always inline this into hot loops in Parquet scanner. template <typename T> ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValue(T* value) { - int index = -1; // Initialize to avoid compiler warning. - bool result = data_decoder_.Get(&index); - // Use & to avoid branches. - if (LIKELY(result & (index >= 0) & (index < dict_.size()))) { - *value = dict_[index]; + // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16 + // byte aligned for Decimal16Values. + if (num_repeats_ > 0) { + --num_repeats_; + memcpy(value, &decoded_values_[0], sizeof(T)); + return true; + } else if (next_literal_idx_ < num_literal_values_) { + int idx = next_literal_idx_++; + memcpy(value, &decoded_values_[idx], sizeof(T)); return true; } - return false; + // No decoded values left - need to decode some more. + return DecodeNextValue(value); } -// Force inlining - GCC does not always inline this into hot loops in Parquet scanner. 
-template <> -ALWAYS_INLINE inline bool DictDecoder<Decimal16Value>::GetNextValue( - Decimal16Value* value) { - int index; - bool result = data_decoder_.Get(&index); - if (!result) return false; - if (index >= dict_.size()) return false; - // Workaround for IMPALA-959. Use memcpy instead of '=' so addresses - // do not need to be 16 byte aligned. - uint8_t* addr = reinterpret_cast<uint8_t*>(dict_.data()); - addr = addr + index * sizeof(*value); - memcpy(value, addr, sizeof(*value)); - return true; +template <typename T> +bool DictDecoder<T>::DecodeNextValue(T* value) { + // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16 + // byte aligned for Decimal16Values. + uint32_t num_repeats = data_decoder_.NextNumRepeats(); + if (num_repeats > 0) { + uint32_t idx = data_decoder_.GetRepeatedValue(num_repeats); + if (UNLIKELY(idx >= dict_.size())) return false; + memcpy(&decoded_values_[0], &dict_[idx], sizeof(T)); + memcpy(value, &decoded_values_[0], sizeof(T)); + num_repeats_ = num_repeats - 1; + return true; + } else { + uint32_t num_literals = data_decoder_.NextNumLiterals(); + if (UNLIKELY(num_literals == 0)) return false; + + uint32_t num_to_decode = std::min<uint32_t>(num_literals, DECODED_BUFFER_SIZE); + if (UNLIKELY(!data_decoder_.DecodeLiteralValues( + num_to_decode, dict_.data(), dict_.size(), &decoded_values_[0]))) { + return false; + } + num_literal_values_ = num_to_decode; + memcpy(value, &decoded_values_[0], sizeof(T)); + next_literal_idx_ = 1; + return true; + } } template<typename T> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/dict-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc index de0fb11..11043f0 100644 --- a/be/src/util/dict-test.cc +++ b/be/src/util/dict-test.cc @@ -17,7 +17,9 @@ #include <stdlib.h> #include <stdio.h> + #include <iostream> +#include <utility> #include 
"runtime/mem-tracker.h" #include "runtime/string-value.inline.h" @@ -65,7 +67,7 @@ void ValidateDict(const vector<InternalType>& values, ASSERT_OK(decoder.SetData(data_buffer, data_len)); for (InternalType i: values) { InternalType j; - decoder.GetNextValue(&j); + ASSERT_TRUE(decoder.GetNextValue(&j)); EXPECT_EQ(i, j); } pool.FreeAll(); @@ -196,6 +198,98 @@ TEST(DictTest, TestStringBufferOverrun) { 0)); } +// Make sure that SetData() resets the dictionary decoder, including the embedded RLE +// decoder to a clean state, even if the input is not fully consumed. The RLE decoder +// has various state that needs to be reset. +TEST(DictTest, SetDataAfterPartialRead) { + MemTracker tracker; + MemPool pool(&tracker); + DictEncoder<int> encoder(&pool, sizeof(int)); + + // Literal run followed by a repeated run. + vector<int> values{1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9}; + for (int val: values) encoder.Put(val); + + vector<uint8_t> dict_buffer(encoder.dict_encoded_size()); + encoder.WriteDict(dict_buffer.data()); + vector<uint8_t> data_buffer(encoder.EstimatedDataEncodedSize() * 2); + int data_len = encoder.WriteData(data_buffer.data(), data_buffer.size()); + ASSERT_GT(data_len, 0); + encoder.ClearIndices(); + + DictDecoder<int> decoder; + ASSERT_TRUE(decoder.template Reset<parquet::Type::INT32>( + dict_buffer.data(), dict_buffer.size(), sizeof(int))); + + // Test decoding some of the values, then resetting. If the decoder incorrectly + // caches some values, this could produce incorrect results. + for (int num_to_decode = 0; num_to_decode < values.size(); ++num_to_decode) { + ASSERT_OK(decoder.SetData(data_buffer.data(), data_buffer.size())); + for (int i = 0; i < num_to_decode; ++i) { + int val; + ASSERT_TRUE(decoder.GetNextValue(&val)); + EXPECT_EQ(values[i], val) << num_to_decode << " " << i; + } + } +} + +// Test handling of decode errors from out-of-range values. 
+TEST(DictTest, DecodeErrors) { + MemTracker tracker; + MemPool pool(&tracker); + DictEncoder<int> small_dict_encoder(&pool, sizeof(int)); + + // Generate a dictionary with 9 values (requires 4 bits to encode). + vector<int> small_dict_values{1, 2, 3, 4, 5, 6, 7, 8, 9}; + for (int val: small_dict_values) small_dict_encoder.Put(val); + + vector<uint8_t> small_dict_buffer(small_dict_encoder.dict_encoded_size()); + small_dict_encoder.WriteDict(small_dict_buffer.data()); + small_dict_encoder.ClearIndices(); + + DictDecoder<int> small_dict_decoder; + ASSERT_TRUE(small_dict_decoder.template Reset<parquet::Type::INT32>( + small_dict_buffer.data(), small_dict_buffer.size(), sizeof(int))); + + // Generate dictionary-encoded data with between 9 and 15 distinct values to test that + // error is detected when the decoder reads a 4-bit value that is out of range. + using TestCase = pair<string, vector<int>>; + vector<TestCase> test_cases{ + {"Out-of-range value in a repeated run", + {10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10}}, + {"Out-of-range literal run in the last < 32 element batch", + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}}, + {"Out-of-range literal run in the middle of a 32 element batch", + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}}}; + for (TestCase& test_case: test_cases) { + // Encode the values. This will produce a dictionary with more distinct values than + // the small dictionary that we'll use to decode it. + DictEncoder<int> large_dict_encoder(&pool, sizeof(int)); + // Initialize the dictionary with the values already in the small dictionary. 
+ for (int val : small_dict_values) large_dict_encoder.Put(val); + large_dict_encoder.ClearIndices(); + + for (int val: test_case.second) large_dict_encoder.Put(val); + + vector<uint8_t> data_buffer(large_dict_encoder.EstimatedDataEncodedSize() * 2); + int data_len = large_dict_encoder.WriteData(data_buffer.data(), data_buffer.size()); + ASSERT_GT(data_len, 0); + large_dict_encoder.ClearIndices(); + + ASSERT_OK(small_dict_decoder.SetData(data_buffer.data(), data_buffer.size())); + bool failed = false; + for (int i = 0; i < test_case.second.size(); ++i) { + int val; + failed = !small_dict_decoder.GetNextValue(&val); + if (failed) break; + } + EXPECT_TRUE(failed) << "Should have detected out-of-range dict-encoded value in test " + << test_case.first; + } +} + } IMPALA_TEST_MAIN();