IMPALA-4835: Part 3: switch I/O buffers to buffer pool

This is the final patch to switch the Disk I/O manager to allocate all
buffers from the buffer pool and to reserve the buffers required for
a query up front.

* The planner reserves enough memory to run a single scanner per
  scan node.
* The multi-threaded scan node must increase reservation before
  spinning up more threads (see the sketch after this list).
* The scanner implementations must be careful to stay within their
  assigned reservation.
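
For the thread-per-range scan node, the handoff works roughly as in the
sketch below: a condensed, self-contained mock of the logic this patch
adds to HdfsScanNode (EnoughReservationForExtraThread() and friends in
be/src/exec/hdfs-scan-node.cc below). FakeBufferPoolClient and the
constants in main() are illustrative stand-ins, not Impala's real
buffer pool API.

  #include <atomic>
  #include <cstdint>
  #include <iostream>

  // Stand-in for Impala's buffer pool client (assumed interface): grants
  // reservation increases until a configured budget runs out.
  struct FakeBufferPoolClient {
    int64_t available = 0;  // how much more the pool can grant this node
    bool IncreaseReservation(int64_t bytes) {
      if (bytes > available) return false;
      available -= bytes;
      return true;
    }
  };

  // Condensed mock of the scanner-thread reservation handoff. The real code
  // calls these under the scan node's lock so the check and the deduction
  // are atomic with respect to other threads.
  struct ScanNodeReservations {
    FakeBufferPoolClient* client = nullptr;
    int64_t min_reservation = 0;    // enough to run one scanner at all
    int64_t ideal_reservation = 0;  // enough to run one scanner at full speed
    std::atomic<int64_t> spare{0};  // reservation not handed to any thread

    // Before spinning up an extra scanner thread: make sure 'spare' covers
    // a whole ideal reservation, growing the node's reservation if needed.
    bool EnoughReservationForExtraThread() {
      if (spare.load() >= ideal_reservation) return true;
      int64_t increase = ideal_reservation - spare.load();
      if (!client->IncreaseReservation(increase)) return false;
      spare.fetch_add(increase);
      return true;
    }

    // Hand reservation to a starting thread. Only the first thread may run
    // on the bare minimum; extra threads need a full ideal amount spare.
    int64_t DeductForScannerThread(bool first_thread) {
      int64_t amount = (first_thread && spare.load() < ideal_reservation)
          ? min_reservation : ideal_reservation;
      spare.fetch_sub(amount);
      return amount;
    }

    // A finishing scanner thread returns what it was given.
    void ReturnFromScannerThread(int64_t bytes) { spare.fetch_add(bytes); }
  };

  int main() {
    FakeBufferPoolClient pool;
    pool.available = 64 * 1024 * 1024;  // pretend 64MB can still be reserved
    ScanNodeReservations node;
    node.client = &pool;
    node.min_reservation = 1 * 1024 * 1024;
    node.ideal_reservation = 3 * 1024 * 1024;
    node.spare.store(node.min_reservation);  // minimum claimed at Open()
    int64_t t0 = node.DeductForScannerThread(true);          // first: 1MB
    bool extra_ok = node.EnoughReservationForExtraThread();  // grow to 3MB
    int64_t t1 = extra_ok ? node.DeductForScannerThread(false) : 0;
    node.ReturnFromScannerThread(t0);
    node.ReturnFromScannerThread(t1);
    std::cout << "spare after both threads finished: " << node.spare.load()
              << " bytes\n";  // 4194304
  }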

The row-oriented scanners were the most straightforward, since they
only have a single scan range active at a time. A single I/O buffer is
sufficient to scan the whole file, but more I/O buffers can improve I/O
throughput.

Parquet is more complex because it issues a scan range per column, and
the on-disk sizes of the columns are not known during planning. To
handle this, the frontend bases its reservation on a heuristic
involving the file size and the number of columns. The Parquet scanner
can then divide the reservation among columns based on the size of the
column data on disk.
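
The division algorithm itself lands in
DivideReservationBetweenColumnsHelper() (hdfs-parquet-scanner.cc
below). The following is a rough, self-contained sketch of the same
idea, assuming the 64KB/8MB buffer bounds and three-buffers-per-column
ideal used in the new unit tests; RoundUpPow2()/RoundDownPow2() stand
in for BitUtil, and details like error handling are omitted.

  #include <algorithm>
  #include <cstdint>
  #include <iostream>
  #include <utility>
  #include <vector>

  static int64_t RoundUpPow2(int64_t v) {
    int64_t p = 1;
    while (p < v) p <<= 1;
    return p;
  }

  static int64_t RoundDownPow2(int64_t v) {
    int64_t p = 1;
    while (p * 2 <= v) p <<= 1;
    return p;
  }

  // Returns (column index, bytes granted) pairs, longest column first.
  static std::vector<std::pair<int, int64_t>> DivideReservation(
      int64_t min_buf, int64_t max_buf,
      const std::vector<int64_t>& col_lens, int64_t reservation) {
    std::vector<std::pair<int, int64_t>> out;
    for (int i = 0; i < static_cast<int>(col_lens.size()); ++i) {
      out.emplace_back(i, 0);
    }
    // Longest columns first so the max-sized buffers (and therefore the
    // large I/Os) go to the columns with the most data.
    std::sort(out.begin(), out.end(),
        [&](const std::pair<int, int64_t>& l, const std::pair<int, int64_t>& r) {
          return col_lens[l.first] != col_lens[r.first]
              ? col_lens[l.first] > col_lens[r.first] : l.first < r.first;
        });
    // Set aside a minimum buffer per column, handed back on the first pass.
    reservation -= min_buf * static_cast<int64_t>(col_lens.size());
    for (int round = 0; round < 3; ++round) {  // up to 3 buffers per column
      for (auto& col : out) {
        if (round == 0) reservation += min_buf;
        int64_t left = col_lens[col.first] - col.second;
        int64_t grant = 0;
        if (left >= max_buf) {
          if (reservation >= max_buf) {
            grant = max_buf;
          } else if (round == 0) {
            grant = RoundDownPow2(reservation);  // guarantee one buffer
          }
          // else 0: don't mix small buffers into a max-sized-buffer column.
        } else if (left > 0 && reservation >= min_buf) {
          // Smallest power-of-two buffer covering the rest of the column,
          // capped by what is still unreserved.
          grant = std::min(std::max(min_buf, RoundUpPow2(left)),
              RoundDownPow2(reservation));
        }
        reservation -= grant;
        col.second += grant;
      }
    }
    return out;
  }

  int main() {
    const int64_t kMin = 64 * 1024, kMax = 8 * 1024 * 1024;
    // Mirrors one of the new unit-test cases: two long columns, memory for
    // five max-sized buffers. The longer column gets three, the shorter two.
    for (const auto& p : DivideReservation(kMin, kMax,
             {50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * kMax)) {
      std::cout << "column " << p.first << ": " << p.second << " bytes\n";
    }
  }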

I adjusted how the 'mem_limit' is divided between buffer pool and
non-buffer-pool memory for low mem_limit values, to account for the
increase in buffer pool memory.

Testing:
* Added more planner tests to cover reservation calculations for scan nodes.
* Tested scanners for all file formats with the reservation denial debug
  action, to check behaviour when the scanners hit reservation limits.
* Updated memory and buffer pool limits for tests.
* Added unit tests for dividing reservation between columns in parquet,
  since the algorithm is non-trivial.

Perf:
I ran TPC-H and targeted perf benchmarks locally, comparing against
master. Both showed small improvements of a few percent and no
regressions of note. Cluster perf tests showed no significant change.

Change-Id: Ic09c6196b31e55b301df45cc56d0b72cfece6786
Reviewed-on: http://gerrit.cloudera.org:8080/8966
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/24b4ed0b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/24b4ed0b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/24b4ed0b

Branch: refs/heads/master
Commit: 24b4ed0b29a44090350e630d625291c01b753a36
Parents: 5699b59
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Fri Jan 5 16:47:03 2018 -0800
Committer: Tim Armstrong <tarmstr...@cloudera.com>
Committed: Fri Feb 23 04:17:41 2018 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |    1 +
 be/src/exec/hdfs-parquet-scanner-test.cc        |   96 +
 be/src/exec/hdfs-parquet-scanner.cc             |  218 ++-
 be/src/exec/hdfs-parquet-scanner.h              |   38 +-
 be/src/exec/hdfs-scan-node-base.cc              |   27 +-
 be/src/exec/hdfs-scan-node-base.h               |    3 +
 be/src/exec/hdfs-scan-node-mt.cc                |   11 +-
 be/src/exec/hdfs-scan-node.cc                   |  143 +-
 be/src/exec/hdfs-scan-node.h                    |   66 +-
 be/src/exec/parquet-column-readers.cc           |  108 +-
 be/src/exec/parquet-column-readers.h            |   88 +-
 be/src/exec/scanner-context.cc                  |   28 +-
 be/src/exec/scanner-context.h                   |   54 +-
 .../bufferpool/reservation-tracker-test.cc      |    8 +-
 be/src/runtime/bufferpool/reservation-util.cc   |    2 +-
 be/src/runtime/io/disk-io-mgr-stress-test.cc    |   38 +-
 be/src/runtime/io/disk-io-mgr-stress.cc         |   69 +-
 be/src/runtime/io/disk-io-mgr-stress.h          |   26 +-
 be/src/runtime/io/disk-io-mgr-test.cc           |  542 +++---
 be/src/runtime/io/disk-io-mgr.cc                |   34 +-
 be/src/runtime/io/disk-io-mgr.h                 |   35 +-
 be/src/runtime/io/request-context.cc            |   29 +-
 be/src/runtime/io/request-context.h             |   14 +-
 be/src/runtime/io/request-ranges.h              |   15 +-
 be/src/runtime/io/scan-range.cc                 |    6 +-
 be/src/runtime/tmp-file-mgr.cc                  |    2 +-
 common/thrift/PlanNodes.thrift                  |    3 +
 .../apache/impala/analysis/SlotDescriptor.java  |   19 +
 .../org/apache/impala/analysis/SlotRef.java     |   20 -
 .../org/apache/impala/planner/HdfsScanNode.java |  167 +-
 .../java/org/apache/impala/util/BitUtil.java    |    6 +
 .../org/apache/impala/util/BitUtilTest.java     |    6 +
 .../queries/PlannerTest/constant-folding.test   |   42 +-
 .../queries/PlannerTest/disable-codegen.test    |   24 +-
 .../PlannerTest/fk-pk-join-detection.test       |   78 +-
 .../queries/PlannerTest/max-row-size.test       |   80 +-
 .../PlannerTest/min-max-runtime-filters.test    |    6 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   40 +-
 .../queries/PlannerTest/parquet-filtering.test  |   42 +-
 .../queries/PlannerTest/partition-pruning.test  |    4 +-
 .../PlannerTest/resource-requirements.test      | 1814 ++++++++++++++----
 .../PlannerTest/sort-expr-materialization.test  |   32 +-
 .../PlannerTest/spillable-buffer-sizing.test    |  192 +-
 .../queries/PlannerTest/tablesample.test        |   44 +-
 .../queries/PlannerTest/union.test              |    8 +-
 .../admission-reject-min-reservation.test       |   12 +-
 .../queries/QueryTest/analytic-fns.test         |    5 +-
 .../queries/QueryTest/codegen-mem-limit.test    |    5 +-
 .../QueryTest/disk-spill-encryption.test        |    2 +-
 .../queries/QueryTest/explain-level0.test       |    4 +-
 .../queries/QueryTest/explain-level1.test       |    4 +-
 .../queries/QueryTest/explain-level2.test       |   14 +-
 .../queries/QueryTest/explain-level3.test       |   14 +-
 .../queries/QueryTest/nested-types-tpch.test    |    6 +-
 .../queries/QueryTest/runtime_row_filters.test  |   11 +-
 .../queries/QueryTest/scanners.test             |    7 +
 .../functional-query/queries/QueryTest/set.test |    2 +-
 .../queries/QueryTest/spilling-aggs.test        |   19 +-
 .../spilling-naaj-no-deny-reservation.test      |    7 +-
 .../queries/QueryTest/spilling-naaj.test        |    8 +-
 .../QueryTest/spilling-no-debug-action.test     |   66 +
 .../QueryTest/spilling-sorts-exhaustive.test    |   10 +-
 .../queries/QueryTest/spilling.test             |   76 +-
 .../queries/QueryTest/stats-extrapolation.test  |   52 +-
 .../tpch/queries/sort-reservation-usage.test    |    9 +-
 tests/common/test_dimensions.py                 |   16 +-
 tests/custom_cluster/test_scratch_disk.py       |    2 +-
 tests/query_test/test_mem_usage_scaling.py      |   11 +-
 tests/query_test/test_query_mem_limit.py        |    4 +-
 tests/query_test/test_scanners.py               |   25 +-
 tests/query_test/test_scanners_fuzz.py          |    9 +-
 tests/query_test/test_sort.py                   |   16 +-
 tests/query_test/test_spilling.py               |    5 +
 73 files changed, 3204 insertions(+), 1545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index aab1383..c1f91d6 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -108,3 +108,4 @@ ADD_BE_TEST(parquet-version-test)
 ADD_BE_TEST(row-batch-list-test)
 ADD_BE_TEST(incr-stats-util-test)
 ADD_BE_TEST(hdfs-avro-scanner-test)
+ADD_BE_TEST(hdfs-parquet-scanner-test)

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-parquet-scanner-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc
new file mode 100644
index 0000000..cbc6e76
--- /dev/null
+++ b/be/src/exec/hdfs-parquet-scanner-test.cc
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-parquet-scanner.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+static const int64_t MIN_BUFFER_SIZE = 64 * 1024;
+static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024;
+
+namespace impala {
+
+class HdfsParquetScannerTest : public testing::Test {
+ protected:
+  void TestDivideReservation(const vector<int64_t>& col_range_lengths,
+      int64_t total_col_reservation, const vector<int64_t>& expected_reservations);
+};
+
+/// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for
+/// inputs 'col_range_lengths' and 'total_col_reservation'.
+void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths,
+      int64_t total_col_reservation, const vector<int64_t>& expected_reservations) {
+  vector<pair<int, int64_t>> reservations =
+      HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
+      MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, col_range_lengths, total_col_reservation);
+  for (int i = 0; i < reservations.size(); ++i) {
+    LOG(INFO) << i << " " << reservations[i].first << " " << reservations[i].second;
+  }
+  EXPECT_EQ(reservations.size(), expected_reservations.size());
+  vector<bool> present(expected_reservations.size(), false);
+  for (auto& reservation: reservations) {
+    // Ensure that each appears exactly once.
+    EXPECT_FALSE(present[reservation.first]);
+    present[reservation.first] = true;
+    EXPECT_EQ(expected_reservations[reservation.first], reservation.second)
+        << reservation.first;
+  }
+}
+
+TEST_F(HdfsParquetScannerTest, DivideReservation) {
+  // Test a long scan range with lots of memory - should allocate 3 max-size
+  // buffers per range.
+  TestDivideReservation({100 * 1024 * 1024}, 50 * 1024 * 1024, {3 * MAX_BUFFER_SIZE});
+  TestDivideReservation({100 * 1024 * 1024, 50 * 1024 * 1024}, 100 * 1024 * 1024,
+        {3 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+
+  // Long scan ranges, not enough memory for 3 buffers each. Should only allocate
+  // max-sized buffers, preferring the longer scan range.
+  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * MAX_BUFFER_SIZE,
+        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024},
+        5 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE,
+        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+  TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 6 * MAX_BUFFER_SIZE - 1,
+        {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+
+  // Test a short range with lots of memory - should round up buffer size.
+  TestDivideReservation({100 * 1024}, 50 * 1024 * 1024, {128 * 1024});
+
+  // Test a range << MIN_BUFFER_SIZE - should round up to buffer size.
+  TestDivideReservation({13}, 50 * 1024 * 1024, {MIN_BUFFER_SIZE});
+
+  // Test long ranges with limited memory.
+  TestDivideReservation({100 * 1024 * 1024}, 100 * 1024, {MIN_BUFFER_SIZE});
+  TestDivideReservation({100 * 1024 * 1024}, MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE});
+  TestDivideReservation({100 * 1024 * 1024}, 2 * MIN_BUFFER_SIZE, {2 * MIN_BUFFER_SIZE});
+  TestDivideReservation({100 * 1024 * 1024}, MAX_BUFFER_SIZE - 1, {MAX_BUFFER_SIZE / 2});
+  TestDivideReservation({100 * 1024 * 1024, 1024 * 1024, MIN_BUFFER_SIZE},
+      3 * MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE});
+
+  // Test a mix of scan range lengths larger than and smaller than the max I/O buffer
+  // size. Long ranges get allocated most memory.
+  TestDivideReservation(
+      {15145047, 5019635, 5019263, 15145047, 15145047, 5019635, 5019263, 317304},
+      25165824,
+      {8388608, 2097152, 524288, 8388608, 4194304, 1048576, 262144, 262144});
+}
+
+}
+
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 0188f08..51a39be 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -17,6 +17,7 @@
 
 #include "exec/hdfs-parquet-scanner.h"
 
+#include <algorithm>
 #include <queue>
 
 #include <gutil/strings/substitute.h>
@@ -27,6 +28,7 @@
 #include "exec/parquet-column-stats.h"
 #include "exec/scanner-context.inline.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/runtime-state.h"
 #include "runtime/runtime-filter.inline.h"
@@ -35,6 +37,7 @@
 #include "common/names.h"
 
 using std::move;
+using std::sort;
 using namespace impala;
 using namespace impala::io;
 
@@ -47,10 +50,6 @@ constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16;
 static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
     "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
 
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-const int MAX_DICT_HEADER_SIZE = 100;
-
 // Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
 // has fewer entries, then the entire column is dictionary encoded. This threshold
 // is guaranteed to be true for Impala versions 2.9 or below.
@@ -99,7 +98,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
             static_cast<ScanRangeMetadata*>(split->meta_data());
        // Each split is processed by first issuing a scan range for the file footer, which
        // is done here, followed by scan ranges for the columns of each row group within
-        // the actual split (in InitColumns()). The original split is stored in the
+        // the actual split (see InitScalarColumns()). The original split is stored in the
         // metadata associated with the footer range.
         ScanRange* footer_range;
         if (footer_split != nullptr) {
@@ -229,9 +228,14 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
+  int64_t stream_reservation = stream_->reservation();
+  stream_ = nullptr;
   context_->ReleaseCompletedResources(true);
   context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
+  // The scanner-wide stream was used only to read the file footer.  Each column has added
+  // its own stream. We can use the reservation from 'stream_' for the columns now.
+  total_col_reservation_ = stream_reservation;
 
  // Parse the file schema into an internal representation for schema resolution.
   schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
@@ -248,10 +252,6 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
 
   RETURN_IF_ERROR(InitDictFilterStructures());
-
-  // The scanner-wide stream was used only to read the file footer.  Each column has added
-  // its own stream.
-  stream_ = nullptr;
   return Status::OK();
 }
 
@@ -675,15 +675,13 @@ Status HdfsParquetScanner::NextRowGroup() {
     }
 
     InitCollectionColumns();
+    RETURN_IF_ERROR(InitScalarColumns());
 
-    // Prepare dictionary filtering columns for first read
-    // This must be done before dictionary filtering, because this code initializes
-    // the column offsets and streams needed to read the dictionaries.
-    // TODO: Restructure the code so that the dictionary can be read without the rest
-    // of the column.
-    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
+    // Start scanning dictionary filtering column readers, so we can read the dictionary
+    // pages in EvalDictionaryFilters().
+    RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));
 
-    // InitColumns() may have allocated resources to scan columns. If we skip this row
+    // StartScans() may have allocated resources to scan columns. If we skip this row
     // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
 
     // If there is a dictionary-encoded column where every value is eliminated
@@ -704,10 +702,10 @@ Status HdfsParquetScanner::NextRowGroup() {
     }
 
     // At this point, the row group has passed any filtering criteria
-    // Prepare non-dictionary filtering column readers for first read and
-    // initialize their dictionaries.
-    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_));
-    status = InitDictionaries(non_dict_filterable_readers_);
+    // Start scanning non-dictionary filtering column readers and initialize their
+    // dictionaries.
+    RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
+    status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
@@ -742,7 +740,6 @@ Status HdfsParquetScanner::NextRowGroup() {
       break;
     }
   }
-
   DCHECK(parse_status_.ok());
   return Status::OK();
 }
@@ -800,6 +797,7 @@ void HdfsParquetScanner::PartitionReaders(
     } else {
       BaseScalarColumnReader* scalar_reader =
           static_cast<BaseScalarColumnReader*>(reader);
+      scalar_readers_.push_back(scalar_reader);
       if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) {
         dict_filterable_readers_.push_back(scalar_reader);
       } else {
@@ -991,7 +989,7 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
 
   // Any columns that were not 100% dictionary encoded need to initialize
   // their dictionaries here.
-  RETURN_IF_ERROR(InitDictionaries(deferred_dict_init_list));
+  RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));
 
   return Status::OK();
 }
@@ -1648,23 +1646,16 @@ void HdfsParquetScanner::InitCollectionColumns() {
   }
 }
 
-Status HdfsParquetScanner::InitScalarColumns(
-    int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) {
+Status HdfsParquetScanner::InitScalarColumns() {
   int64_t partition_id = context_->partition_descriptor()->id();
   const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
   DCHECK(file_desc != nullptr);
-  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
 
-  // All the scan ranges (one for each column).
-  vector<ScanRange*> col_ranges;
   // Used to validate that the number of values in each reader in column_readers_ at the
   // same SchemaElement is the same.
   unordered_map<const parquet::SchemaElement*, int> num_values_map;
-  // Used to validate we issued the right number of scan ranges
-  int num_scalar_readers = 0;
-
-  for (BaseScalarColumnReader* scalar_reader: column_readers) {
-    ++num_scalar_readers;
+  for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
     const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()];
     auto num_values_it = num_values_map.find(&scalar_reader->schema_element());
     int num_values = -1;
@@ -1673,84 +1664,115 @@ Status HdfsParquetScanner::InitScalarColumns(
     } else {
      num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values;
    }
-    int64_t col_start = col_chunk.meta_data.data_page_offset;
-
     if (num_values != -1 && col_chunk.meta_data.num_values != num_values) {
       // TODO: improve this error message by saying which columns are different,
       // and also specify column in other error messages as appropriate
       return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
           col_chunk.meta_data.num_values, num_values, filename());
     }
+    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
+  }
+  RETURN_IF_ERROR(
+      DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_));
+  return Status::OK();
+}
 
-    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_,
-        filename(), row_group_idx, scalar_reader->col_idx(),
-        scalar_reader->schema_element(), state_));
-
-    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
-      // Already validated in ValidateColumnOffsets()
-      DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
-      col_start = col_chunk.meta_data.dictionary_page_offset;
-    }
-    int64_t col_len = col_chunk.meta_data.total_compressed_size;
-    if (col_len <= 0) {
-      return Status(Substitute("File '$0' contains invalid column chunk size: 
$1",
-          filename(), col_len));
-    }
-    int64_t col_end = col_start + col_len;
-
-    // Already validated in ValidateColumnOffsets()
-    DCHECK_GT(col_end, 0);
-    DCHECK_LT(col_end, file_desc->file_length);
-    if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) {
-      // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
-      // dictionary page header size in total_compressed_size and total_uncompressed_size
-      // (see IMPALA-694). We pad col_len to compensate.
-      int64_t bytes_remaining = file_desc->file_length - col_end;
-      int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
-      col_len += pad;
-    }
-
-    // TODO: this will need to change when we have co-located files and the columns
-    // are different files.
-    if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
-      return Status(Substitute("Expected parquet column file path '$0' to 
match "
-          "filename '$1'", col_chunk.file_path, filename()));
-    }
-
-    const ScanRange* split_range =
-        static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
-
-    // Determine if the column is completely contained within a local split.
-    bool col_range_local = split_range->expected_local()
-        && col_start >= split_range->offset()
-        && col_end <= split_range->offset() + split_range->len();
-    ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
-        filename(), col_len, col_start, partition_id, split_range->disk_id(),
-        col_range_local,
-        BufferOpts(split_range->try_cache(), file_desc->mtime));
-    col_ranges.push_back(col_range);
-
-    // Get the stream that will be used for this column
-    ScannerContext::Stream* stream = context_->AddStream(col_range);
-    DCHECK(stream != nullptr);
+Status HdfsParquetScanner::DivideReservationBetweenColumns(
+    const vector<BaseScalarColumnReader*>& column_readers,
+    int64_t reservation_to_distribute) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  const int64_t min_buffer_size = io_mgr->min_buffer_size();
+  const int64_t max_buffer_size = io_mgr->max_buffer_size();
+  // The HdfsScanNode reservation calculation in the planner ensures that we have
+  // reservation for at least one buffer per column.
+  if (reservation_to_distribute < min_buffer_size * column_readers.size()) {
+    return Status(TErrorCode::INTERNAL_ERROR,
+        Substitute("Not enough reservation in Parquet scanner for file '$0'. 
Need at "
+                   "least $1 bytes per column for $2 columns but had $3 bytes",
+            filename(), min_buffer_size, column_readers.size(),
+            reservation_to_distribute));
+  }
 
-    RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
+  vector<int64_t> col_range_lengths(column_readers.size());
+  for (int i = 0; i < column_readers.size(); ++i) {
+    col_range_lengths[i] = column_readers[i]->scan_range()->len();
   }
-  DCHECK_EQ(col_ranges.size(), num_scalar_readers);
+  vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
+      min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute);
+  for (auto& tmp_reservation : tmp_reservations) {
+    column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
+  }
+  return Status::OK();
+}
 
-  DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
-  // Issue all the column chunks to the IoMgr. We scan through all columns at the same
-  // time so need to read from all of them concurrently.
-  for (ScanRange* col_range : col_ranges) {
-    bool needs_buffers;
-    RETURN_IF_ERROR(io_mgr->StartScanRange(
-        scan_node_->reader_context(), col_range, &needs_buffers));
-    if (needs_buffers) {
-      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-          scan_node_->reader_context(), col_range, 3 * io_mgr->max_buffer_size()));
+vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
+    int64_t min_buffer_size, int64_t max_buffer_size,
+    const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {
+  // Pair of (column index, reservation allocated).
+  vector<pair<int, int64_t>> tmp_reservations;
+  for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);
+
+  // Sort in descending order of length, breaking ties by index so that larger columns
+  // get allocated reservation first. It is common to have dramatically different column
+  // sizes in a single file because of different value sizes and compressibility. E.g.
+  // consider a large STRING "comment" field versus a highly compressible
+  // dictionary-encoded column with only a few distinct values. We want to give max-sized
+  // buffers to large columns first to maximize the size of I/Os that we do while reading
+  // this row group.
+  sort(tmp_reservations.begin(), tmp_reservations.end(),
+      [&col_range_lengths](const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
+        int64_t left_len = col_range_lengths[left.first];
+        int64_t right_len = col_range_lengths[right.first];
+        return left_len != right_len ? left_len > right_len : left.first < right.first;
+      });
+
+  // Set aside the minimum reservation per column.
+  reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
+
+  // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
+  // or a large enough buffer to fit the remaining data for each column. Do this
+  // round-robin up to the ideal number of I/O buffers.
+  for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
+    for (auto& tmp_reservation : tmp_reservations) {
+      // Add back the reservation we set aside above.
+      if (i == 0) reservation_to_distribute += min_buffer_size;
+
+      int64_t bytes_left_in_range =
+          col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
+      int64_t bytes_to_add;
+      if (bytes_left_in_range >= max_buffer_size) {
+        if (reservation_to_distribute >= max_buffer_size) {
+          bytes_to_add = max_buffer_size;
+        } else if (i == 0) {
+          DCHECK_EQ(0, tmp_reservation.second);
+          // Ensure this range gets at least one buffer on the first iteration.
+          bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
+        } else {
+          DCHECK_GT(tmp_reservation.second, 0);
+          // We need to read more than the max buffer size, but can't allocate a
+          // max-sized buffer. Stop adding buffers to this column: we prefer to use
+          // the existing max-sized buffers without small buffers mixed in so that
+          // we will always do max-sized I/Os, which make efficient use of I/O devices.
+          bytes_to_add = 0;
+        }
+      } else if (bytes_left_in_range > 0 &&
+          reservation_to_distribute >= min_buffer_size) {
+        // Choose a buffer size that will fit the rest of the bytes left in the range.
+        bytes_to_add = max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
+        // But don't add more reservation than is available.
+        bytes_to_add =
+            min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
+      } else {
+        bytes_to_add = 0;
+      }
+      DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
+      reservation_to_distribute -= bytes_to_add;
+      tmp_reservation.second += bytes_to_add;
+      DCHECK_GE(reservation_to_distribute, 0);
+      DCHECK_GT(tmp_reservation.second, 0);
     }
   }
-  return Status::OK();
+  return tmp_reservations;
 }
 
 Status HdfsParquetScanner::InitDictionaries(

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 1fc3239..92f2550 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -34,7 +34,7 @@ class CollectionValueBuilder;
 struct HdfsFileDesc;
 
 /// Internal schema representation and resolution.
-class SchemaNode;
+struct SchemaNode;
 
 /// Class that implements Parquet definition and repetition level decoding.
 class ParquetLevelDecoder;
@@ -361,6 +361,7 @@ class HdfsParquetScanner : public HdfsScanner {
   template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
   friend class ScalarColumnReader;
   friend class BoolColumnReader;
+  friend class HdfsParquetScannerTest;
 
   /// Size of the file footer.  This is a guess.  If this value is too little, we will
   /// need to issue another read.
@@ -429,7 +430,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of scratch batches processed so far.
   int64_t row_batches_produced_;
 
-  /// Column reader for each materialized columns for this file.
+  /// Column reader for each top-level materialized slot in the output tuple.
   std::vector<ParquetColumnReader*> column_readers_;
 
   /// Column readers will write slot values into this scratch batch for
@@ -445,6 +446,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Scan range for the metadata.
   const io::ScanRange* metadata_range_;
 
+  /// Reservation available for scanning columns, in bytes.
+  int64_t total_col_reservation_ = 0;
+
   /// Pool to copy dictionary page buffer into. This pool is shared across all the
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
@@ -461,6 +465,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// or nested within a collection.
   std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
 
+  /// Flattened list of all scalar column readers in column_readers_.
+  std::vector<BaseScalarColumnReader*> scalar_readers_;
+
   /// Flattened collection column readers that point to readers in column_readers_.
   std::vector<CollectionColumnReader*> collection_readers_;
 
@@ -627,12 +634,24 @@ class HdfsParquetScanner : public HdfsScanner {
       WARN_UNUSED_RESULT;
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
-  /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
-  /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of
-  /// top-level scalar columns and readers of scalar columns within a collection node).
-  Status InitScalarColumns(
-      int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers)
-      WARN_UNUSED_RESULT;
+  /// initializes 'scalar_readers_' and divides reservation between the columns but
+  /// does not start any scan ranges.
+  Status InitScalarColumns() WARN_UNUSED_RESULT;
+
+  /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the
+  /// columns. Sets the reservation on each corresponding reader in 'column_readers'.
+  Status DivideReservationBetweenColumns(
+      const std::vector<BaseScalarColumnReader*>& column_readers,
+      int64_t reservation_to_distribute);
+
+  /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for
+  /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
+  /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
+  /// a vector with an entry per column with the index into 'col_range_lengths' and the
+  /// amount of reservation in bytes to give to that column.
+  static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper(
+      int64_t min_buffer_size, int64_t max_buffer_size,
+      const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute);
 
   /// Initializes the column readers in collection_readers_.
   void InitCollectionColumns();
@@ -668,7 +687,8 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Partitions the readers into scalar and collection readers. The collection readers
   /// are flattened into collection_readers_. The scalar readers are partitioned into
   /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
-  /// dictionary filtering is enabled and the reader can be dictionary filtered.
+  /// dictionary filtering is enabled and the reader can be dictionary filtered. All
+  /// scalar readers are also flattened into scalar_readers_.
   void PartitionReaders(const vector<ParquetColumnReader*>& readers,
                         bool can_eval_dict_filters);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 94af63b..b6a51f6 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -35,6 +35,7 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/request-context.h"
@@ -64,6 +65,7 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
+      ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation),
       min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
           tnode.hdfs_scan_node.min_max_tuple_id : -1),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
@@ -80,6 +82,7 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
           &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
       disks_accessed_bitmap_(TUnit::UNIT, 0),
       active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
+  DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation);
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -109,7 +112,6 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
         *min_max_row_desc, state, &min_max_conjuncts_));
   }
-
   return Status::OK();
 }
 
@@ -240,6 +242,16 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
   ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
 
+  // Check if reservation was enough to allocate at least one buffer. The
+  // reservation calculation in HdfsScanNode.java should guarantee this.
+  // Hitting this error indicates a misconfiguration or bug.
+  int64_t min_buffer_size = ExecEnv::GetInstance()->disk_io_mgr()->min_buffer_size();
+  if (scan_range_params_->size() > 0
+      && resource_profile_.min_reservation < min_buffer_size) {
+    return Status(TErrorCode::INTERNAL_ERROR,
+      Substitute("HDFS scan min reservation $0 must be >= min buffer size $1",
+       resource_profile_.min_reservation, min_buffer_size));
+  }
   // Add per volume stats to the runtime profile
   PerVolumeStats per_volume_stats;
   stringstream str;
@@ -326,7 +338,18 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
         partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state);
   }
 
-  reader_context_ = runtime_state_->io_mgr()->RegisterContext(mem_tracker());
+  RETURN_IF_ERROR(ClaimBufferReservation(state));
+  // We got the minimum reservation. Now try to get ideal reservation.
+  if (resource_profile_.min_reservation != ideal_scan_range_reservation_) {
+    bool increased = buffer_pool_client_.IncreaseReservation(
+        ideal_scan_range_reservation_ - resource_profile_.min_reservation);
+    VLOG_FILE << "Increasing reservation from minimum "
+              << resource_profile_.min_reservation << "B to ideal "
+              << ideal_scan_range_reservation_ << "B "
+              << (increased ? "succeeded" : "failed");
+  }
+
+  reader_context_ = runtime_state_->io_mgr()->RegisterContext();
 
   // Initialize HdfsScanNode specific counters
   // TODO: Revisit counters and move the counters specific to multi-threaded scans

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 70fbac2..3a9c37f 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -322,6 +322,9 @@ class HdfsScanNodeBase : public ScanNode {
   friend class ScannerContext;
   friend class HdfsScanner;
 
+  /// Ideal reservation to process each input split, computed by the planner.
+  const int64_t ideal_scan_range_reservation_;
+
   /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
   const int min_max_tuple_id_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index be75677..f4948d9 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -86,17 +86,18 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       StopAndFinalizeCounters();
       return Status::OK();
     }
+    int64_t scanner_reservation = buffer_pool_client_.GetReservation();
     if (needs_buffers) {
-      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(), scan_range_,
-          3 * io_mgr->max_buffer_size()));
+      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(),
+          &buffer_pool_client_, scan_range_, scanner_reservation));
     }
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
     int64_t partition_id = metadata->partition_id;
    HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
-    scanner_ctx_.reset(new ScannerContext(
-        runtime_state_, this, partition, scan_range_, filter_ctxs(),
-        expr_results_pool()));
+    scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_,
+        partition, filter_ctxs(), expr_results_pool()));
+        partition, filter_ctxs(), expr_results_pool()));
+    scanner_ctx_->AddStream(scan_range_, scanner_reservation);
    Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
     if (!status.ok()) {
       DCHECK(scanner_ == NULL);

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index f9d71e9..a95e47a 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -46,16 +46,6 @@ DECLARE_bool(skip_file_runtime_filtering);
 using namespace impala;
 using namespace impala::io;
 
-// Amount of memory that we approximate a scanner thread will use not including IoBuffers.
-// The memory used does not vary considerably between file formats (just a couple of MBs).
-// This value is conservative and taken from running against the tpch lineitem table.
-// TODO: revisit how we do this.
-const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;
-
-// Estimated upper bound on the compression ratio of compressed text files. Used to
-// estimate scanner thread memory usage.
-const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
-
 // Amount of time to block waiting for GetNext() to release scanner threads between
 // checking if a scanner thread should yield itself back to the global thread pool.
 const int SCANNER_THREAD_WAIT_TIME_MS = 20;
@@ -63,11 +53,6 @@ const int SCANNER_THREAD_WAIT_TIME_MS = 20;
 HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
                            const DescriptorTbl& descs)
     : HdfsScanNodeBase(pool, tnode, descs),
-      ranges_issued_barrier_(1),
-      scanner_thread_bytes_required_(0),
-      done_(false),
-      all_ranges_started_(false),
-      thread_avail_cb_id_(-1),
       max_num_scanner_threads_(CpuInfo::num_cores()) {
 }
 
@@ -168,36 +153,6 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
-
-  // Compute the minimum bytes required to start a new thread. This is based on the
-  // file format.
-  // The higher the estimate, the less likely it is the query will fail but more likely
-  // the query will be throttled when it does not need to be.
-  // TODO: how many buffers should we estimate per range. The IoMgr will throttle down to
-  // one but if there are already buffers queued before memory pressure was hit, we can't
-  // reclaim that memory.
-  if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) {
-    // Parquet files require buffers per column
-    scanner_thread_bytes_required_ =
-        materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_buffer_size();
-  } else {
-    scanner_thread_bytes_required_ =
-        3 * runtime_state_->io_mgr()->max_buffer_size();
-  }
-  // scanner_thread_bytes_required_ now contains the IoBuffer requirement.
-  // Next we add in the other memory the scanner thread will use.
-  // e.g. decompression buffers, tuple buffers, etc.
-  // For compressed text, we estimate this based on the file size (since the whole file
-  // will need to be decompressed at once). For all other formats, we use a constant.
-  // TODO: can we do something better?
-  int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE;
-  for (HdfsFileDesc* file: per_type_files_[THdfsFileFormat::TEXT]) {
-    if (file->file_compression != THdfsCompression::NONE) {
-      int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO;
-      scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage);
-    }
-  }
-  scanner_thread_bytes_required_ += scanner_thread_mem_usage;
   row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
   row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
   return Status::OK();
@@ -219,10 +174,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
    max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
   DCHECK_GT(max_num_scanner_threads_, 0);
-
+  spare_reservation_.Store(buffer_pool_client_.GetReservation());
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
-
   return Status::OK();
 }
 
@@ -263,37 +217,28 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
   return Status::OK();
 }
 
-// For controlling the amount of memory used for scanners, we approximate the
-// scanner mem usage based on scanner_thread_bytes_required_, rather than the
-// consumption in the scan node's mem tracker. The problem with the scan node
-// trackers is that it does not account for the memory the scanner will use.
-// For example, if there is 110 MB of memory left (based on the mem tracker)
-// and we estimate that a scanner will use 100MB, we want to make sure to only
-// start up one additional thread. However, after starting the first thread, the
-// mem tracker value will not change immediately (it takes some time before the
-// scanner is running and starts using memory). Therefore we just use the estimate
-// based on the number of running scanner threads.
-bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) {
-  int64_t committed_scanner_mem =
-      active_scanner_thread_counter_.value() * scanner_thread_bytes_required_;
-  int64_t tracker_consumption = mem_tracker()->consumption();
-  int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption;
-  if (est_additional_scanner_mem < 0) {
-    // This is the case where our estimate was too low. Expand the estimate based
-    // on the usage.
-    int64_t avg_consumption =
-        tracker_consumption / active_scanner_thread_counter_.value();
-    // Take the average and expand it by 50%. Some scanners will not have hit their
-    // steady state mem usage yet.
-    // TODO: how can we scale down if we've overestimated.
-    // TODO: better heuristic?
-    scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5);
-    est_additional_scanner_mem = 0;
-  }
+bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true;
+  int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load();
+  if (!buffer_pool_client_.IncreaseReservation(increase)) return false;
+  spare_reservation_.Add(increase);
+  return true;
+}
 
-  // If we are starting a new thread, take that into account now.
-  if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_;
-  return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
+int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock,
+    bool first_thread) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  int64_t amount;
+  if (first_thread) {
+    amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ?
+        ideal_scan_range_reservation_ : resource_profile_.min_reservation;
+  } else {
+    amount = ideal_scan_range_reservation_;
+  }
+  int64_t remainder = spare_reservation_.Add(-amount);
+  DCHECK_GE(remainder, 0);
+  return amount;
 }
 
 void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
@@ -323,36 +268,45 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
     // TODO: This still leans heavily on starvation-free locks, come up with a more
     // correct way to communicate between this method and ScannerThreadHelper
     unique_lock<mutex> lock(lock_);
+
+    const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value();
+    const bool first_thread = num_active_scanner_threads == 0;
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
-        active_scanner_thread_counter_.value() >= progress_.remaining()) {
+        num_active_scanner_threads >= progress_.remaining()) {
       break;
     }
 
     // Cases 5 and 6.
-    if (active_scanner_thread_counter_.value() > 0 &&
+    if (!first_thread &&
         (materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
-         !EnoughMemoryForScannerThread(true))) {
+         !EnoughReservationForExtraThread(lock))) {
       break;
     }
 
     // Case 7 and 8.
-    bool is_reserved = false;
-    if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
-        !pool->TryAcquireThreadToken(&is_reserved)) {
+    if (num_active_scanner_threads >= max_num_scanner_threads_ ||
+        !pool->TryAcquireThreadToken()) {
       break;
     }
 
+    // Deduct the reservation. We haven't dropped the lock since the
+    // first_thread/EnoughReservationForExtraThread() checks so spare reservation
+    // must be available.
+    int64_t scanner_thread_reservation =
+        DeductReservationForScannerThread(lock, first_thread);
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
     string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, 
thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
-
-    auto fn = [this]() { this->ScannerThread(); };
+    auto fn = [this, scanner_thread_reservation]() {
+      this->ScannerThread(scanner_thread_reservation);
+    };
     std::unique_ptr<Thread> t;
     status =
      Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
     if (!status.ok()) {
+      ReturnReservationFromScannerThread(scanner_thread_reservation);
       COUNTER_ADD(&active_scanner_thread_counter_, -1);
       // Release the token and skip running callbacks to find a replacement. Skipping
       // serves two purposes. First, it prevents a mutual recursion between this function
@@ -373,7 +327,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   }
 }
 
-void HdfsScanNode::ScannerThread() {
+void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
   DiskIoMgr* io_mgr = runtime_state_->io_mgr();
@@ -402,8 +356,7 @@ void HdfsScanNode::ScannerThread() {
       // this thread.
       unique_lock<mutex> l(lock_);
       if (active_scanner_thread_counter_.value() > 1) {
-        if (runtime_state_->resource_pool()->optional_exceeded() ||
-            !EnoughMemoryForScannerThread(false)) {
+        if (runtime_state_->resource_pool()->optional_exceeded()) {
           // We can't break here. We need to update the counter with the lock held or else
           // all threads might see active_scanner_thread_counter_.value > 1
           COUNTER_ADD(&active_scanner_thread_counter_, -1);
@@ -438,12 +391,13 @@ void HdfsScanNode::ScannerThread() {
     if (status.ok() && scan_range != nullptr) {
       if (needs_buffers) {
         status = io_mgr->AllocateBuffersForRange(
-            reader_context_.get(), scan_range, 3 * io_mgr->max_buffer_size());
+            reader_context_.get(), &buffer_pool_client_, scan_range,
+            scanner_thread_reservation);
       }
       if (status.ok()) {
         // Got a scan range. Process the range end to end (in this thread).
        status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-            &expr_results_pool, scan_range);
+            &expr_results_pool, scan_range, scanner_thread_reservation);
       }
     }
 
@@ -483,6 +437,7 @@ void HdfsScanNode::ScannerThread() {
   COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
 exit:
+  ReturnReservationFromScannerThread(scanner_thread_reservation);
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
@@ -490,7 +445,8 @@ exit:
 }
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
-    MemPool* expr_results_pool, ScanRange* scan_range) {
+    MemPool* expr_results_pool, ScanRange* scan_range,
+    int64_t scanner_thread_reservation) {
   DCHECK(scan_range != NULL);
 
  ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -515,8 +471,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     return Status::OK();
   }
 
-  ScannerContext context(
-      runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool);
+  ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition,
+      filter_ctxs, expr_results_pool);
+  context.AddStream(scan_range, scanner_thread_reservation);
   scoped_ptr<HdfsScanner> scanner;
   Status status = CreateAndOpenScanner(partition, &context, &scanner);
   if (!status.ok()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index a1c97cf..81e826e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -27,6 +27,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 
+#include "common/atomic.h"
 #include "exec/filter-context.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -58,8 +59,14 @@ class TPlanNode;
 /// 5. The scanner finishes the scan range and informs the scan node so it can track
 ///    end of stream.
 ///
-/// TODO: This class allocates a bunch of small utility objects that should be
-/// recycled.
+/// Buffer management:
+/// ------------------
+/// The different scanner threads all allocate I/O buffers from the node's Buffer Pool
+/// client. The scan node ensures that enough reservation is available to start a
+/// scanner thread before launching each one (see
+/// EnoughReservationForExtraThread()), after which the scanner thread is responsible
+/// for staying within the reservation handed off to it.
+///
 /// TODO: Remove this class once the fragment-based multi-threaded execution is
 /// fully functional.
 class HdfsScanNode : public HdfsScanNodeBase {
@@ -100,12 +107,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
  private:
   /// Released when initial ranges are issued in the first call to GetNext().
-  CountingBarrier ranges_issued_barrier_;
-
-  /// The estimated memory required to start up a new scanner thread. If the 
memory
-  /// left (due to limits) is less than this value, we won't start up optional
-  /// scanner threads.
-  int64_t scanner_thread_bytes_required_;
+  CountingBarrier ranges_issued_barrier_{1};
 
   /// Thread group for all scanner worker threads
   ThreadGroup scanner_threads_;
@@ -130,16 +132,16 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// are finished, an error/cancellation occurred, or the limit was reached.
   /// Setting this to true triggers the scanner threads to clean up.
   /// This should not be explicitly set. Instead, call SetDone().
-  bool done_;
+  bool done_ = false;
 
   /// Set to true if all ranges have started. Some of the ranges may still be in flight
   /// being processed by scanner threads, but no new ScannerThreads should be started.
-  bool all_ranges_started_;
+  bool all_ranges_started_ = false;
 
   /// The id of the callback added to the thread resource manager when thread token
   /// is available. Used to remove the callback before this scan node is destroyed.
   /// -1 if no callback is registered.
-  int thread_avail_cb_id_;
+  int thread_avail_cb_id_ = -1;
 
   /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
   /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
@@ -147,6 +149,14 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// the number of cores.
   int max_num_scanner_threads_;
 
+  /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner
+  /// threads. Doled out to scanner threads when they are started and returned when
+  /// those threads no longer need it. Can be atomically incremented without holding
+  /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for
+  /// reservation and the actual deduction are atomic with respect to other threads
+  /// trying to claim reservation.
+  AtomicInt64 spare_reservation_{0};
+
   /// The wait time for fetching a row batch from the row batch queue.
   RuntimeProfile::Counter* row_batches_get_timer_;
 
@@ -160,21 +170,35 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// Main function for scanner thread. This thread pulls the next range to be
   /// processed from the IoMgr and then processes the entire range end to end.
   /// This thread terminates when all scan ranges are complete or an error occurred.
-  void ScannerThread();
+  /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for
+  /// this thread with DeductReservationForScannerThread().
+  void ScannerThread(int64_t scanner_thread_reservation);
 
   /// Process the entire scan range with a new scanner object. Executed in scanner
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
-      MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT;
-
-  /// Returns true if there is enough memory (against the mem tracker limits) to
-  /// have a scanner thread.
-  /// If new_thread is true, the calculation is for starting a new scanner thread.
-  /// If false, it determines whether there's adequate memory for the existing
-  /// set of scanner threads.
-  /// lock_ must be taken before calling this.
-  bool EnoughMemoryForScannerThread(bool new_thread);
+      MemPool* expr_results_pool, io::ScanRange* scan_range,
+      int64_t scanner_thread_reservation) WARN_UNUSED_RESULT;
+
+  /// Return true if there is enough reservation to start an extra scanner thread.
+  /// Tries to increase reservation if enough is not already available in
+  /// 'spare_reservation_'. 'lock_' must be held via 'lock'.
+  bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock);
+
+  /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If
+  /// 'first_thread' is true, this is the first thread to be started and only the
+  /// minimum reservation is required to be available. Otherwise
+  /// EnoughReservationForExtraThread() must have returned true in the current
+  /// critical section so that 'ideal_scan_range_bytes_' is available for the extra
+  /// thread. Returns the amount deducted. 'lock_' must be held via 'lock'.
+  int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock,
+      bool first_thread);
+
+  /// Called by a scanner thread to return some or all of its reservation that is
+  /// not needed.
+  void ReturnReservationFromScannerThread(int64_t bytes) {
+    spare_reservation_.Add(bytes);
+  }
 
   /// Checks for eos conditions and returns batches from materialized_row_batches_.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)
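
The spare-reservation protocol above (claim under 'lock_', return with a plain
atomic add) can be shown as a minimal standalone sketch. This is a simplified
illustration, not the actual HdfsScanNode code -- Impala uses its own
AtomicInt64, and the real node also sizes each claim per scan range:

    #include <atomic>
    #include <cstdint>
    #include <mutex>

    // Sketch: spare reservation shared between a scan node and its scanner
    // threads. Claiming must check-then-deduct under a lock so two claimers
    // cannot both pass the check; returning reservation is a plain atomic
    // add, which can never invalidate another thread's claim.
    class SpareReservationSketch {
     public:
      explicit SpareReservationSketch(int64_t initial) : spare_(initial) {}

      // Claim 'bytes' for a new scanner thread; false if not enough spare.
      bool TryClaim(int64_t bytes) {
        std::lock_guard<std::mutex> l(lock_);
        if (spare_.load(std::memory_order_relaxed) < bytes) return false;
        spare_.fetch_sub(bytes, std::memory_order_relaxed);
        return true;
      }

      // Return reservation a scanner thread no longer needs. Safe without
      // 'lock_' because adding only increases what claimers can see.
      void Return(int64_t bytes) {
        spare_.fetch_add(bytes, std::memory_order_relaxed);
      }

     private:
      std::mutex lock_;
      std::atomic<int64_t> spare_;
    };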

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 7cf89ba..5afb4e5 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -30,6 +30,7 @@
 #include "exec/scanner-context.inline.h"
 #include "rpc/thrift-util.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/exec-env.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/runtime-state.h"
@@ -47,6 +48,10 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
     "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
     "be converted from UTC to local time. Writes are unaffected.");
 
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+static const int MAX_DICT_HEADER_SIZE = 100;
+
 // Max data page header size in bytes. This is an estimate and only needs to be an upper
 // bound. It is theoretically possible to have a page header of any size due to string
 // value statistics, but in practice we'll have trouble reading string values this large.
@@ -66,6 +71,8 @@ static int debug_count = 0;
 #define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
 #endif
 
+using namespace impala::io;
+
 namespace impala {
 
 const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
@@ -834,8 +841,99 @@ static bool RequiresSkippedDictionaryHeaderCheck(
   return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
 }
 
+Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
+    const parquet::ColumnChunk& col_chunk, int row_group_idx) {
+  num_buffered_values_ = 0;
+  data_ = nullptr;
+  data_end_ = nullptr;
+  stream_ = nullptr;
+  io_reservation_ = 0;
+  metadata_ = &col_chunk.meta_data;
+  num_values_read_ = 0;
+  def_level_ = -1;
+  // See ColumnReader constructor.
+  rep_level_ = max_rep_level() == 0 ? 0 : -1;
+  pos_current_value_ = -1;
+
+  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+    RETURN_IF_ERROR(Codec::CreateDecompressor(
+        nullptr, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
+  }
+  int64_t col_start = col_chunk.meta_data.data_page_offset;
+
+  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
+      parent_->filename(), row_group_idx, col_idx(), schema_element(),
+      parent_->state_));
+
+  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+    // Already validated in ValidateColumnOffsets()
+    DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
+    col_start = col_chunk.meta_data.dictionary_page_offset;
+  }
+  int64_t col_len = col_chunk.meta_data.total_compressed_size;
+  if (col_len <= 0) {
+    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
+        filename(), col_len));
+  }
+  int64_t col_end = col_start + col_len;
+
+  // Already validated in ValidateColumnOffsets()
+  DCHECK_GT(col_end, 0);
+  DCHECK_LT(col_end, file_desc.file_length);
+  const ParquetFileVersion& file_version = parent_->file_version_;
+  if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
+    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+    // dictionary page header size in total_compressed_size and total_uncompressed_size
+    // (see IMPALA-694). We pad col_len to compensate.
+    int64_t bytes_remaining = file_desc.file_length - col_end;
+    int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
+    col_len += pad;
+  }
+
+  // TODO: this will need to change when we have co-located files and the columns
+  // are different files.
+  if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+    return Status(Substitute("Expected parquet column file path '$0' to match "
+        "filename '$1'", col_chunk.file_path, filename()));
+  }
+
+  const ScanRange* metadata_range = parent_->metadata_range_;
+  int64_t partition_id = parent_->context_->partition_descriptor()->id();
+  const ScanRange* split_range =
+      static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
+  // Determine if the column is completely contained within a local split.
+  bool col_range_local = split_range->expected_local()
+      && col_start >= split_range->offset()
+      && col_end <= split_range->offset() + split_range->len();
+  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
+      filename(), col_len, col_start, partition_id, split_range->disk_id(),
+      col_range_local,
+      BufferOpts(split_range->try_cache(), file_desc.mtime));
+  ClearDictionaryDecoder();
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::StartScan() {
+  DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  ScannerContext* context = parent_->context_;
+  DCHECK_GT(io_reservation_, 0);
+  bool needs_buffers;
+  RETURN_IF_ERROR(io_mgr->StartScanRange(
+      parent_->scan_node_->reader_context(), scan_range_, &needs_buffers));
+  if (needs_buffers) {
+    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+        parent_->scan_node_->reader_context(), context->bp_client(),
+        scan_range_, io_reservation_));
+  }
+  stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
+  DCHECK(stream_ != nullptr);
+  return Status::OK();
+}
+
 Status BaseScalarColumnReader::ReadPageHeader(bool peek,
     parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
+  DCHECK(stream_ != nullptr);
   *eos = false;
 
   uint8_t* buffer;
@@ -919,7 +1017,7 @@ Status BaseScalarColumnReader::InitDictionary() {
   bool eos;
   parquet::PageHeader next_page_header;
   uint32_t next_header_size;
-
+  DCHECK(stream_ != nullptr);
   DCHECK(!HasDictionaryDecoder());
 
   RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
@@ -1031,6 +1129,14 @@ Status BaseScalarColumnReader::InitDictionary() {
   return Status::OK();
 }
 
+Status BaseScalarColumnReader::InitDictionaries(
+    const vector<BaseScalarColumnReader*> readers) {
+  for (BaseScalarColumnReader* reader : readers) {
+    RETURN_IF_ERROR(reader->InitDictionary());
+  }
+  return Status::OK();
+}
+
 Status BaseScalarColumnReader::ReadDataPage() {
   // We're about to move to the next data page.  The previous data page is
   // now complete, free up any memory allocated for it. If the data page contained
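
The new Reset()/StartScan() split in this file implies a per-row-group call
order for the Parquet scanner's column readers. A hypothetical driver sketch
follows; 'ScanRowGroupSketch' and 'per_col_reservation' are illustrative names,
and the scanner's heuristic for dividing reservation between columns is not
shown:

    // Hypothetical per-row-group driver, assuming 'readers' and a reservation
    // split 'per_col_reservation' were computed elsewhere.
    Status ScanRowGroupSketch(const HdfsFileDesc& file_desc,
        const parquet::RowGroup& row_group, int row_group_idx,
        const std::vector<BaseScalarColumnReader*>& readers,
        const std::vector<int64_t>& per_col_reservation) {
      for (size_t i = 0; i < readers.size(); ++i) {
        // Create the column's scan range for this row group; no I/O yet.
        RETURN_IF_ERROR(readers[i]->Reset(
            file_desc, row_group.columns[readers[i]->col_idx()], row_group_idx));
        // Hand this column its share of the scanner's I/O reservation.
        readers[i]->set_io_reservation(per_col_reservation[i]);
      }
      // Start all column ranges, then read each dictionary page up front.
      RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(readers));
      RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(readers));
      return Status::OK();
    }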

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 86ca239..f19d40a 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -322,42 +322,29 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : ParquetColumnReader(parent, node, slot_desc),
-      data_(NULL),
-      data_end_(NULL),
-      def_levels_(true),
-      rep_levels_(false),
-      page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
-      num_buffered_values_(0),
-      num_values_read_(0),
-      metadata_(NULL),
-      stream_(NULL),
       data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
     DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
   }
 
   virtual ~BaseScalarColumnReader() { }
 
-  /// This is called once for each row group in the file.
-  Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
-    DCHECK(stream != NULL);
-    DCHECK(metadata != NULL);
-
-    num_buffered_values_ = 0;
-    data_ = NULL;
-    data_end_ = NULL;
-    stream_ = stream;
-    metadata_ = metadata;
-    num_values_read_ = 0;
-    def_level_ = -1;
-    // See ColumnReader constructor.
-    rep_level_ = max_rep_level() == 0 ? 0 : -1;
-    pos_current_value_ = -1;
-
-    if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
-      RETURN_IF_ERROR(Codec::CreateDecompressor(
-          NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
-    }
-    ClearDictionaryDecoder();
+  /// Resets the reader for each row group in the file and creates the scan
+  /// range for the column, but does not start it. To start scanning,
+  /// set_io_reservation() must be called to assign reservation to this
+  /// column, followed by StartScan().
+  Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
+    int row_group_idx);
+
+  /// Starts the column scan range. The reader must be Reset() and have a
+  /// reservation assigned via set_io_reservation(). This must be called
+  /// before any of the column data can be read (including dictionary and
+  /// data pages). Returns an error status if there was an error starting the
+  /// scan or allocating buffers for it.
+  Status StartScan();
+
+  /// Helper to start scans for multiple columns at once.
+  static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) {
+    for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan());
     return Status::OK();
   }
 
@@ -372,22 +359,27 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     if (dict_decoder != nullptr) dict_decoder->Close();
   }
 
+  io::ScanRange* scan_range() const { return scan_range_; }
   int64_t total_len() const { return metadata_->total_compressed_size; }
   int col_idx() const { return node_.col_idx; }
   THdfsCompression::type codec() const {
     if (metadata_ == NULL) return THdfsCompression::NONE;
     return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
   }
+  void set_io_reservation(int bytes) { io_reservation_ = bytes; }
 
   /// Reads the next definition and repetition levels for this column. Initializes the
   /// next data page if necessary.
   virtual bool NextLevels() { return NextLevels<true>(); }
 
-  // Check the data stream to see if there is a dictionary page. If there is,
-  // use that page to initialize dict_decoder_ and advance the data stream
-  // past the dictionary page.
+  /// Check the data stream to see if there is a dictionary page. If there is,
+  /// use that page to initialize dict_decoder_ and advance the data stream
+  /// past the dictionary page.
   Status InitDictionary();
 
+  /// Convenience function to initialize multiple dictionaries.
+  static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers);
+
   // Returns the dictionary or NULL if the dictionary doesn't exist
   virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
 
@@ -413,33 +405,45 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   // fit in as few cache lines as possible.
 
   /// Pointer to start of next value in data page
-  uint8_t* data_;
+  uint8_t* data_ = nullptr;
 
   /// End of the data page.
-  const uint8_t* data_end_;
+  const uint8_t* data_end_ = nullptr;
 
   /// Decoder for definition levels.
-  ParquetLevelDecoder def_levels_;
+  ParquetLevelDecoder def_levels_{true};
 
   /// Decoder for repetition levels.
-  ParquetLevelDecoder rep_levels_;
+  ParquetLevelDecoder rep_levels_{false};
 
   /// Page encoding for values of the current data page. Cached here for perf. Set in
   /// InitDataPage().
-  parquet::Encoding::type page_encoding_;
+  parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
 
   /// Num values remaining in the current data page
-  int num_buffered_values_;
+  int num_buffered_values_ = 0;
 
   // Less frequently used members that are not accessed in inner loop should go below
   // here so they do not occupy precious cache line space.
 
   /// The number of values seen so far. Updated per data page.
-  int64_t num_values_read_;
+  int64_t num_values_read_ = 0;
+
+  /// Metadata for the column for the current row group.
+  const parquet::ColumnMetaData* metadata_ = nullptr;
 
-  const parquet::ColumnMetaData* metadata_;
   boost::scoped_ptr<Codec> decompressor_;
-  ScannerContext::Stream* stream_;
+
+  /// The scan range for the column's data. Initialized for each row group by Reset().
+  io::ScanRange* scan_range_ = nullptr;
+
+  // Stream used to read data from 'scan_range_'. Initialized by StartScan().
+  ScannerContext::Stream* stream_ = nullptr;
+
+  /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
+  /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
+  /// by Reset().
+  int64_t io_reservation_ = 0;
 
   /// Pool to allocate storage for data pages from - either decompression 
buffers for
   /// compressed data pages or copies of the data page with var-len data to 
attach to

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 0abf82f..c669e65 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,14 +41,15 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-    HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
-    const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
+    BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc,
+    const vector<FilterContext>& filter_ctxs,
+    MemPool* expr_results_pool)
   : state_(state),
     scan_node_(scan_node),
+    bp_client_(bp_client),
     partition_desc_(partition_desc),
     filter_ctxs_(filter_ctxs),
     expr_results_pool_(expr_results_pool) {
-  AddStream(scan_range);
 }
 
 ScannerContext::~ScannerContext() {
@@ -66,19 +67,20 @@ void ScannerContext::ClearStreams() {
 }
 
 ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
-    const HdfsFileDesc* file_desc)
+    int64_t reservation, const HdfsFileDesc* file_desc)
   : parent_(parent),
     scan_range_(scan_range),
     file_desc_(file_desc),
+    reservation_(reservation),
     file_len_(file_desc->file_length),
     next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
     boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
     boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
 }
 
-ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
-  streams_.emplace_back(new Stream(
-      this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) {
+  streams_.emplace_back(new Stream(this, range, reservation,
+      scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
   return streams_.back().get();
 }
 
@@ -101,7 +103,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
 
 Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   DCHECK_EQ(0, io_buffer_bytes_left_);
-  DiskIoMgr* io_mgr = parent_->state_->io_mgr();
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
   if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
   if (io_buffer_ != nullptr) ReturnIoBuffer();
 
@@ -134,6 +136,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
     read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
     read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
+    read_past_buffer_size = ::min(read_past_buffer_size, reservation_);
     // We're reading past the scan range. Be careful not to read past the end of file.
     DCHECK_GE(read_past_buffer_size, 0);
     if (read_past_buffer_size == 0) {
@@ -150,9 +153,14 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     if (needs_buffers) {
       // Allocate fresh buffers. The buffers for 'scan_range_' should be released now
       // since we hit EOS.
+      if (reservation_ < io_mgr->min_buffer_size()) {
+        return Status(Substitute("Could not read past end of scan range in file '$0'. "
+            "Reservation provided $1 was < the minimum I/O buffer size $2",
+            filename(), reservation_, io_mgr->min_buffer_size()));
+      }
       RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-          parent_->scan_node_->reader_context(), range,
-          3 * io_mgr->max_buffer_size()));
+          parent_->scan_node_->reader_context(), parent_->bp_client_, range,
+          reservation_));
     }
     RETURN_IF_ERROR(range->GetNext(&io_buffer_));
     DCHECK(io_buffer_->eosr());
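
The read-past clamping added above can be summarized as a pure function. This
is an illustrative restatement of the logic in GetNextBuffer(), not code from
the patch:

    #include <algorithm>
    #include <cstdint>

    // The candidate read-past size starts from the stream's doubling heuristic
    // (or the caller's hint) and is clamped so it never exceeds the bytes left
    // in the file, the IoMgr's max buffer size, or the stream's reservation.
    int64_t ComputeReadPastBufferSize(int64_t next_read_past_size_bytes,
        int64_t read_past_size, int64_t file_bytes_remaining,
        int64_t max_buffer_size, int64_t reservation) {
      int64_t size = std::max(next_read_past_size_bytes, read_past_size);
      size = std::min(size, file_bytes_remaining);
      size = std::min(size, max_buffer_size);
      size = std::min(size, reservation);
      return size;  // 0 means there is nothing more to read.
    }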

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index a131d3f..6292486 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -27,6 +27,7 @@
 #include "common/compiler-util.h"
 #include "common/status.h"
 #include "exec/filter-context.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/request-ranges.h"
 
 namespace impala {
@@ -84,10 +85,12 @@ class TupleRow;
 class ScannerContext {
  public:
   /// Create a scanner context with the parent scan_node (where materialized row batches
-  /// get pushed to) and the scan range to process.
-  /// This context starts with 1 stream.
-  ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
-      io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
+  /// get pushed to) and the scan range to process. Buffers are allocated using
+  /// 'bp_client'.
+  ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
+      BufferPool::ClientHandle* bp_client,
+      HdfsPartitionDescriptor* partition_desc,
+      const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
   /// Destructor verifies that all stream objects have been released.
   ~ScannerContext();
@@ -150,6 +153,7 @@ class ScannerContext {
     const char* filename() { return scan_range_->file(); }
     const io::ScanRange* scan_range() { return scan_range_; }
     const HdfsFileDesc* file_desc() { return file_desc_; }
+    int64_t reservation() const { return reservation_; }
 
     /// Returns the buffer's current offset in the file.
     int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; }
@@ -211,9 +215,15 @@ class ScannerContext {
 
    private:
     friend class ScannerContext;
-    ScannerContext* parent_;
-    io::ScanRange* scan_range_;
-    const HdfsFileDesc* file_desc_;
+    ScannerContext* const parent_;
+    io::ScanRange* const scan_range_;
+    const HdfsFileDesc* const file_desc_;
+
+    /// Reservation given to this stream for allocating I/O buffers. The reservation is
+    /// shared with 'scan_range_', so the context must be careful not to use this until
+    /// all of 'scan_range_'s buffers have been freed. Must be >= the minimum IoMgr
+    /// buffer size to allow reading past the end of 'scan_range_'.
+    const int64_t reservation_;
 
     /// Total number of bytes returned from GetBytes()
     int64_t total_bytes_returned_ = 0;
@@ -272,7 +282,8 @@ class ScannerContext {
     /// output_buffer_bytes_left_ will be set to something else.
     static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
 
-    Stream(ScannerContext* parent, io::ScanRange* scan_range,
+    /// Private constructor. See AddStream() for public API.
+    Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation,
         const HdfsFileDesc* file_desc);
 
     /// GetBytes helper to handle the slow path.
@@ -355,24 +366,37 @@ class ScannerContext {
   /// size to 0.
   void ClearStreams();
 
-  /// Add a stream to this ScannerContext for 'range'. The stream is owned by 
this
-  /// context.
-  Stream* AddStream(io::ScanRange* range);
+  /// Add a stream to this ScannerContext for 'range'. 'range' must already have any
+  /// buffers that it needs allocated. 'reservation' is the amount of reservation that
+  /// is given to this stream for allocating I/O buffers. The reservation is shared with
+  /// 'range', so the context must be careful not to use this until all of 'range's
+  /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading
+  /// past the end of 'range'.
+  ///
+  /// Returns the added stream. The returned stream is owned by this context.
+  Stream* AddStream(io::ScanRange* range, int64_t reservation);
 
   /// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not
   /// multi-threaded and is done (finished, cancelled or reached its limit).
   /// In all other cases returns false.
   bool cancelled() const;
 
-  HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; }
+  BufferPool::ClientHandle* bp_client() const { return bp_client_; }
+  HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; }
   const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
   MemPool* expr_results_pool() const { return expr_results_pool_; }
  private:
   friend class Stream;
 
-  RuntimeState* state_;
-  HdfsScanNodeBase* scan_node_;
-  HdfsPartitionDescriptor* partition_desc_;
+  RuntimeState* const state_;
+  HdfsScanNodeBase* const scan_node_;
+
+  /// Buffer pool client used to allocate I/O buffers. This is accessed by multiple
+  /// threads in the multi-threaded scan node, so those threads must take care to only
+  /// call thread-safe BufferPool methods with this client.
+  BufferPool::ClientHandle* const bp_client_;
+
+  HdfsPartitionDescriptor* const partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.
   std::vector<std::unique_ptr<Stream>> streams_;
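
Taken together, the reworked API means a caller starts the range and allocates
its buffers before attaching a stream. A hypothetical helper showing the call
order, based only on the signatures visible in this patch ('AddStartedStreamSketch'
is an illustrative name, and sizing 'reservation' is up to the scanner):

    // Start 'scan_range', allocate its buffers from the node's buffer pool
    // client, then attach it to 'context' as a stream that may use
    // 'reservation' bytes for I/O buffers.
    Status AddStartedStreamSketch(HdfsScanNodeBase* scan_node,
        BufferPool::ClientHandle* bp_client, io::ScanRange* scan_range,
        int64_t reservation, ScannerContext* context,
        ScannerContext::Stream** stream) {
      io::DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
      bool needs_buffers;
      RETURN_IF_ERROR(io_mgr->StartScanRange(
          scan_node->reader_context(), scan_range, &needs_buffers));
      if (needs_buffers) {
        RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
            scan_node->reader_context(), bp_client, scan_range, reservation));
      }
      *stream = context->AddStream(scan_range, reservation);
      return Status::OK();
    }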

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index c46c5ea..e441402 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -522,15 +522,15 @@ TEST_F(ReservationTrackerTest, TransferReservation) {
 TEST_F(ReservationTrackerTest, ReservationUtil) {
   const int64_t MEG = 1024 * 1024;
   const int64_t GIG = 1024 * 1024 * 1024;
-  EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
+  EXPECT_EQ(32 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
 
   EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0));
   EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1));
-  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG));
+  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(32 * MEG));
   EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG));
 
-  EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
-  EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
+  EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
+  EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
   EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG));
   EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/runtime/bufferpool/reservation-util.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc
index 85718ab..a27ab9d 100644
--- a/be/src/runtime/bufferpool/reservation-util.cc
+++ b/be/src/runtime/bufferpool/reservation-util.cc
@@ -24,7 +24,7 @@ namespace impala {
 // Most operators that accumulate memory use reservations, so the majority of memory
 // should be allocated to buffer reservations, as a heuristic.
 const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8;
-const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024;
+const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 32 * 1024 * 1024;
 
 int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) {
   int64_t max_reservation = std::min<int64_t>(
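
The hunk above is truncated mid-function. For reference, here is a
self-contained sketch of both heuristics that is consistent with the constants
and the updated test expectations; the authoritative definitions live in
reservation-util.cc/.h, and the constant names here are illustrative:

    #include <algorithm>
    #include <cstdint>

    constexpr double kReservationMemFraction = 0.8;
    constexpr int64_t kReservationMemMinRemaining = 32LL * 1024 * 1024;

    // At most 80% of mem_limit is reservable and at least 32MB must remain:
    // e.g. 10GB -> 8GB and 32MB -> 0, matching the test expectations above.
    int64_t GetReservationLimitFromMemLimit(int64_t mem_limit) {
      int64_t max_reservation = std::min<int64_t>(
          kReservationMemFraction * mem_limit,
          mem_limit - kReservationMemMinRemaining);
      return std::max<int64_t>(0, max_reservation);
    }

    // Inverse: the smallest mem_limit that admits 'reservation', e.g.
    // 400MB -> 500MB (400MB / 0.8) and 0 -> 32MB.
    int64_t GetMinMemLimitFromReservation(int64_t reservation) {
      reservation = std::max<int64_t>(0, reservation);
      return std::max<int64_t>(reservation / kReservationMemFraction,
          reservation + kReservationMemMinRemaining);
    }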

http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
index 0e41a6f..2ec1d09 100644
--- a/be/src/runtime/io/disk-io-mgr-stress-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/io/disk-io-mgr-stress.h"
+#include <gflags/gflags.h>
 
 #include "common/init.h"
+#include "runtime/io/disk-io-mgr-stress.h"
+#include "common/init.h"
 #include "runtime/test-env.h"
 #include "service/fe-support.h"
 #include "util/string-parser.h"
@@ -31,34 +33,32 @@ using namespace impala::io;
 // can be passed to control how long to run this test (0 for forever).
 
 // TODO: make these configurable once we decide how to run BE tests with args
-const int DEFAULT_DURATION_SEC = 1;
+constexpr int DEFAULT_DURATION_SEC = 1;
 const int NUM_DISKS = 5;
 const int NUM_THREADS_PER_DISK = 5;
 const int NUM_CLIENTS = 10;
 const bool TEST_CANCELLATION = true;
+const int64_t BUFFER_POOL_CAPACITY = 1024L * 1024L * 1024L * 4L;
+
+DEFINE_int64(duration_sec, DEFAULT_DURATION_SEC,
+    "Disk I/O Manager stress test duration in seconds. 0 means run 
indefinitely.");
 
 int main(int argc, char** argv) {
-  InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
-  InitFeSupport();
-  TestEnv test_env;
-  ABORT_IF_ERROR(test_env.Init());
-  int duration_sec = DEFAULT_DURATION_SEC;
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
 
-  if (argc == 2) {
-    StringParser::ParseResult status;
-    duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
-    if (status != StringParser::PARSE_SUCCESS) {
-      printf("Invalid arg: %s\n", argv[1]);
-      return 1;
-    }
-  }
-  if (duration_sec != 0) {
-    printf("Running stress test for %d seconds.\n", duration_sec);
+  if (FLAGS_duration_sec != 0) {
+    printf("Running stress test for %ld seconds.\n", FLAGS_duration_sec);
   } else {
     printf("Running stress test indefinitely.\n");
   }
-  DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
-  test.Run(duration_sec);
 
+  TestEnv test_env;
+  // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
+  test_env.SetBufferPoolArgs(DiskIoMgrStress::MIN_READ_BUFFER_SIZE, BUFFER_POOL_CAPACITY);
+  Status status = test_env.Init();
+  CHECK(status.ok()) << status.GetDetail();
+  DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
+  test.Run(FLAGS_duration_sec);
   return 0;
 }
