IMPALA-4835: Part 3: switch I/O buffers to buffer pool This is the final patch to switch the Disk I/O manager to allocate all buffers from the buffer pool and to reserve the buffers required for a query upfront.
* The planner reserves enough memory to run a single scanner per scan node. * The multi-threaded scan node must increase reservation before spinning up more threads. * The scanner implementations must be careful to stay within their assigned reservation. The row-oriented scanners were the most straightforward, since they only have a single scan range active at a time. A single I/O buffer is sufficient to scan the whole file but more I/O buffers can improve I/O throughput. Parquet is more complex because it issues a scan range per column and the sizes of the columns on disk are not known during planning. To deal with this, the reservation in the frontend is based on a heuristic involving the file size and number of columns. The Parquet scanner can then divvy up reservation to columns based on the size of column data on disk. I adjusted how the 'mem_limit' is divided between buffer pool and non-buffer-pool memory for low mem_limits to account for the increase in buffer pool memory. Testing: * Added more planner tests to cover reservation calcs for scan node. * Test scanners for all file formats with the reservation denial debug action, to test behaviour when the scanners hit reservation limits. * Updated memory and buffer pool limits for tests. * Added unit tests for dividing reservation between columns in Parquet, since the algorithm is non-trivial. Perf: I ran TPC-H and targeted perf locally comparing with master. Both showed small improvements of a few percent and no regressions of note. Cluster perf tests showed no significant change. Conflicts: be/src/exec/parquet-column-readers.cc testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test Other backport notes: File sizes of functional_parquet.alltypes are slightly less than 16kb on 2.x but slightly more than 16kb on master because sorts are not clustered by default. This changes resource requirements in some plans. 
Change-Id: Ic09c6196b31e55b301df45cc56d0b72cfece6786 Reviewed-on: http://gerrit.cloudera.org:8080/9417 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8c922a6e Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8c922a6e Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8c922a6e Branch: refs/heads/2.x Commit: 8c922a6ef675181c8457bf712a0f2146ab2fefd2 Parents: 0b6fab7 Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Fri Jan 5 16:47:03 2018 -0800 Committer: Tim Armstrong <tarmstr...@cloudera.com> Committed: Fri Feb 23 22:51:01 2018 +0000 ---------------------------------------------------------------------- be/src/exec/CMakeLists.txt | 1 + be/src/exec/hdfs-parquet-scanner-test.cc | 96 + be/src/exec/hdfs-parquet-scanner.cc | 218 ++- be/src/exec/hdfs-parquet-scanner.h | 38 +- be/src/exec/hdfs-scan-node-base.cc | 27 +- be/src/exec/hdfs-scan-node-base.h | 3 + be/src/exec/hdfs-scan-node-mt.cc | 11 +- be/src/exec/hdfs-scan-node.cc | 143 +- be/src/exec/hdfs-scan-node.h | 66 +- be/src/exec/parquet-column-readers.cc | 109 +- be/src/exec/parquet-column-readers.h | 88 +- be/src/exec/scanner-context.cc | 28 +- be/src/exec/scanner-context.h | 54 +- .../bufferpool/reservation-tracker-test.cc | 8 +- be/src/runtime/bufferpool/reservation-util.cc | 2 +- be/src/runtime/io/disk-io-mgr-stress-test.cc | 38 +- be/src/runtime/io/disk-io-mgr-stress.cc | 69 +- be/src/runtime/io/disk-io-mgr-stress.h | 26 +- be/src/runtime/io/disk-io-mgr-test.cc | 542 +++--- be/src/runtime/io/disk-io-mgr.cc | 34 +- be/src/runtime/io/disk-io-mgr.h | 35 +- be/src/runtime/io/request-context.cc | 29 +- be/src/runtime/io/request-context.h | 14 +- be/src/runtime/io/request-ranges.h | 15 +- be/src/runtime/io/scan-range.cc | 6 +- be/src/runtime/tmp-file-mgr.cc | 2 +- common/thrift/PlanNodes.thrift | 3 + 
.../apache/impala/analysis/SlotDescriptor.java | 19 + .../org/apache/impala/analysis/SlotRef.java | 20 - .../org/apache/impala/planner/HdfsScanNode.java | 167 +- .../java/org/apache/impala/util/BitUtil.java | 6 + .../org/apache/impala/util/BitUtilTest.java | 6 + .../queries/PlannerTest/constant-folding.test | 42 +- .../queries/PlannerTest/disable-codegen.test | 24 +- .../PlannerTest/fk-pk-join-detection.test | 78 +- .../queries/PlannerTest/max-row-size.test | 80 +- .../PlannerTest/min-max-runtime-filters.test | 6 +- .../queries/PlannerTest/mt-dop-validation.test | 40 +- .../queries/PlannerTest/parquet-filtering.test | 42 +- .../queries/PlannerTest/partition-pruning.test | 4 +- .../PlannerTest/resource-requirements.test | 1814 ++++++++++++++---- .../PlannerTest/sort-expr-materialization.test | 32 +- .../PlannerTest/spillable-buffer-sizing.test | 192 +- .../queries/PlannerTest/tablesample.test | 44 +- .../queries/PlannerTest/union.test | 8 +- .../admission-reject-min-reservation.test | 12 +- .../queries/QueryTest/analytic-fns.test | 5 +- .../queries/QueryTest/codegen-mem-limit.test | 5 +- .../QueryTest/disk-spill-encryption.test | 2 +- .../queries/QueryTest/explain-level0.test | 4 +- .../queries/QueryTest/explain-level1.test | 4 +- .../queries/QueryTest/explain-level2.test | 14 +- .../queries/QueryTest/explain-level3.test | 14 +- .../queries/QueryTest/nested-types-tpch.test | 6 +- .../queries/QueryTest/runtime_row_filters.test | 11 +- .../queries/QueryTest/scanners.test | 7 + .../functional-query/queries/QueryTest/set.test | 2 +- .../queries/QueryTest/spilling-aggs.test | 19 +- .../spilling-naaj-no-deny-reservation.test | 7 +- .../queries/QueryTest/spilling-naaj.test | 8 +- .../QueryTest/spilling-no-debug-action.test | 66 + .../QueryTest/spilling-sorts-exhaustive.test | 10 +- .../queries/QueryTest/spilling.test | 76 +- .../queries/QueryTest/stats-extrapolation.test | 52 +- .../tpch/queries/sort-reservation-usage.test | 9 +- tests/common/test_dimensions.py | 16 +- 
tests/custom_cluster/test_scratch_disk.py | 2 +- tests/query_test/test_mem_usage_scaling.py | 11 +- tests/query_test/test_query_mem_limit.py | 4 +- tests/query_test/test_scanners.py | 25 +- tests/query_test/test_scanners_fuzz.py | 9 +- tests/query_test/test_sort.py | 16 +- tests/query_test/test_spilling.py | 5 + 73 files changed, 3204 insertions(+), 1546 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index aab1383..c1f91d6 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -108,3 +108,4 @@ ADD_BE_TEST(parquet-version-test) ADD_BE_TEST(row-batch-list-test) ADD_BE_TEST(incr-stats-util-test) ADD_BE_TEST(hdfs-avro-scanner-test) +ADD_BE_TEST(hdfs-parquet-scanner-test) http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/hdfs-parquet-scanner-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc new file mode 100644 index 0000000..cbc6e76 --- /dev/null +++ b/be/src/exec/hdfs-parquet-scanner-test.cc @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/hdfs-parquet-scanner.h" +#include "testutil/gtest-util.h" + +#include "common/names.h" + +static const int64_t MIN_BUFFER_SIZE = 64 * 1024; +static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024; + +namespace impala { + +class HdfsParquetScannerTest : public testing::Test { + protected: + void TestDivideReservation(const vector<int64_t>& col_range_lengths, + int64_t total_col_reservation, const vector<int64_t>& expected_reservations); +}; + +/// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for +/// inputs 'col_range_lengths' and 'total_col_reservation'. +void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths, + int64_t total_col_reservation, const vector<int64_t>& expected_reservations) { + vector<pair<int, int64_t>> reservations = + HdfsParquetScanner::DivideReservationBetweenColumnsHelper( + MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, col_range_lengths, total_col_reservation); + for (int i = 0; i < reservations.size(); ++i) { + LOG(INFO) << i << " " << reservations[i].first << " " << reservations[i].second; + } + EXPECT_EQ(reservations.size(), expected_reservations.size()); + vector<bool> present(expected_reservations.size(), false); + for (auto& reservation: reservations) { + // Ensure that each appears exactly once. 
+ EXPECT_FALSE(present[reservation.first]); + present[reservation.first] = true; + EXPECT_EQ(expected_reservations[reservation.first], reservation.second) + << reservation.first; + } +} + +TEST_F(HdfsParquetScannerTest, DivideReservation) { + // Test a long scan ranges with lots of memory - should allocate 3 max-size + // buffers per range. + TestDivideReservation({100 * 1024 * 1024}, 50 * 1024 * 1024, {3 * MAX_BUFFER_SIZE}); + TestDivideReservation({100 * 1024 * 1024, 50 * 1024 * 1024}, 100 * 1024 * 1024, + {3 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE}); + + // Long scan ranges, not enough memory for 3 buffers each. Should only allocate + // max-sized buffers, preferring the longer scan range. + TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * MAX_BUFFER_SIZE, + {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE}); + TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, + 5 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE, + {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE}); + TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 6 * MAX_BUFFER_SIZE - 1, + {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE}); + + // Test a short range with lots of memory - should round up buffer size. + TestDivideReservation({100 * 1024}, 50 * 1024 * 1024, {128 * 1024}); + + // Test a range << MIN_BUFFER_SIZE - should round up to buffer size. + TestDivideReservation({13}, 50 * 1024 * 1024, {MIN_BUFFER_SIZE}); + + // Test long ranges with limited memory. 
+ TestDivideReservation({100 * 1024 * 1024}, 100 * 1024, {MIN_BUFFER_SIZE}); + TestDivideReservation({100 * 1024 * 1024}, MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE}); + TestDivideReservation({100 * 1024 * 1024}, 2 * MIN_BUFFER_SIZE, {2 * MIN_BUFFER_SIZE}); + TestDivideReservation({100 * 1024 * 1024}, MAX_BUFFER_SIZE - 1, {MAX_BUFFER_SIZE / 2}); + TestDivideReservation({100 * 1024 * 1024, 1024 * 1024, MIN_BUFFER_SIZE}, + 3 * MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE}); + + // Test a mix of scan range lengths larger than and smaller than the max I/O buffer + // size. Long ranges get allocated most memory. + TestDivideReservation( + {15145047, 5019635, 5019263, 15145047, 15145047, 5019635, 5019263, 317304}, + 25165824, + {8388608, 2097152, 524288, 8388608, 4194304, 1048576, 262144, 262144}); +} + +} + +IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 0188f08..51a39be 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -17,6 +17,7 @@ #include "exec/hdfs-parquet-scanner.h" +#include <algorithm> #include <queue> #include <gutil/strings/substitute.h> @@ -27,6 +28,7 @@ #include "exec/parquet-column-stats.h" #include "exec/scanner-context.inline.h" #include "runtime/collection-value-builder.h" +#include "runtime/exec-env.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/runtime-state.h" #include "runtime/runtime-filter.inline.h" @@ -35,6 +37,7 @@ #include "common/names.h" using std::move; +using std::sort; using namespace impala; using namespace impala::io; @@ -47,10 +50,6 @@ constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16; static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK), "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two"); -// Max 
dictionary page header size in bytes. This is an estimate and only needs to be an -// upper bound. -const int MAX_DICT_HEADER_SIZE = 100; - // Max entries in the dictionary before switching to PLAIN encoding. If a dictionary // has fewer entries, then the entire column is dictionary encoded. This threshold // is guaranteed to be true for Impala versions 2.9 or below. @@ -99,7 +98,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, static_cast<ScanRangeMetadata*>(split->meta_data()); // Each split is processed by first issuing a scan range for the file footer, which // is done here, followed by scan ranges for the columns of each row group within - // the actual split (in InitColumns()). The original split is stored in the + // the actual split (see InitScalarColumns()). The original split is stored in the // metadata associated with the footer range. ScanRange* footer_range; if (footer_split != nullptr) { @@ -229,9 +228,14 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { // Release I/O buffers immediately to make sure they are cleaned up // in case we return a non-OK status anywhere below. + int64_t stream_reservation = stream_->reservation(); + stream_ = nullptr; context_->ReleaseCompletedResources(true); context_->ClearStreams(); RETURN_IF_ERROR(footer_status); + // The scanner-wide stream was used only to read the file footer. Each column has added + // its own stream. We can use the reservation from 'stream_' for the columns now. + total_col_reservation_ = stream_reservation; // Parse the file schema into an internal representation for schema resolution. schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(), @@ -248,10 +252,6 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()]; RETURN_IF_ERROR(InitDictFilterStructures()); - - // The scanner-wide stream was used only to read the file footer. 
Each column has added - // its own stream. - stream_ = nullptr; return Status::OK(); } @@ -675,15 +675,13 @@ Status HdfsParquetScanner::NextRowGroup() { } InitCollectionColumns(); + RETURN_IF_ERROR(InitScalarColumns()); - // Prepare dictionary filtering columns for first read - // This must be done before dictionary filtering, because this code initializes - // the column offsets and streams needed to read the dictionaries. - // TODO: Restructure the code so that the dictionary can be read without the rest - // of the column. - RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_)); + // Start scanning dictionary filtering column readers, so we can read the dictionary + // pages in EvalDictionaryFilters(). + RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_)); - // InitColumns() may have allocated resources to scan columns. If we skip this row + // StartScans() may have allocated resources to scan columns. If we skip this row // group below, we must call ReleaseSkippedRowGroupResources() before continuing. // If there is a dictionary-encoded column where every value is eliminated @@ -704,10 +702,10 @@ Status HdfsParquetScanner::NextRowGroup() { } // At this point, the row group has passed any filtering criteria - // Prepare non-dictionary filtering column readers for first read and - // initialize their dictionaries. - RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_)); - status = InitDictionaries(non_dict_filterable_readers_); + // Start scanning non-dictionary filtering column readers and initialize their + // dictionaries. 
+ RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_)); + status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_); if (!status.ok()) { // Either return an error or skip this row group if it is ok to ignore errors RETURN_IF_ERROR(state_->LogOrReturnError(status.msg())); @@ -742,7 +740,6 @@ Status HdfsParquetScanner::NextRowGroup() { break; } } - DCHECK(parse_status_.ok()); return Status::OK(); } @@ -800,6 +797,7 @@ void HdfsParquetScanner::PartitionReaders( } else { BaseScalarColumnReader* scalar_reader = static_cast<BaseScalarColumnReader*>(reader); + scalar_readers_.push_back(scalar_reader); if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) { dict_filterable_readers_.push_back(scalar_reader); } else { @@ -991,7 +989,7 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr // Any columns that were not 100% dictionary encoded need to initialize // their dictionaries here. - RETURN_IF_ERROR(InitDictionaries(deferred_dict_init_list)); + RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list)); return Status::OK(); } @@ -1648,23 +1646,16 @@ void HdfsParquetScanner::InitCollectionColumns() { } } -Status HdfsParquetScanner::InitScalarColumns( - int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) { +Status HdfsParquetScanner::InitScalarColumns() { int64_t partition_id = context_->partition_descriptor()->id(); const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename()); DCHECK(file_desc != nullptr); - parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx]; + parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; - // All the scan ranges (one for each column). - vector<ScanRange*> col_ranges; // Used to validate that the number of values in each reader in column_readers_ at the // same SchemaElement is the same. 
unordered_map<const parquet::SchemaElement*, int> num_values_map; - // Used to validate we issued the right number of scan ranges - int num_scalar_readers = 0; - - for (BaseScalarColumnReader* scalar_reader: column_readers) { - ++num_scalar_readers; + for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()]; auto num_values_it = num_values_map.find(&scalar_reader->schema_element()); int num_values = -1; @@ -1673,84 +1664,115 @@ Status HdfsParquetScanner::InitScalarColumns( } else { num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values; } - int64_t col_start = col_chunk.meta_data.data_page_offset; - if (num_values != -1 && col_chunk.meta_data.num_values != num_values) { // TODO: improve this error message by saying which columns are different, // and also specify column in other error messages as appropriate return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(), col_chunk.meta_data.num_values, num_values, filename()); } + RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_)); + } + RETURN_IF_ERROR( + DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_)); + return Status::OK(); +} - RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_, - filename(), row_group_idx, scalar_reader->col_idx(), - scalar_reader->schema_element(), state_)); - - if (col_chunk.meta_data.__isset.dictionary_page_offset) { - // Already validated in ValidateColumnOffsets() - DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start); - col_start = col_chunk.meta_data.dictionary_page_offset; - } - int64_t col_len = col_chunk.meta_data.total_compressed_size; - if (col_len <= 0) { - return Status(Substitute("File '$0' contains invalid column chunk size: $1", - filename(), col_len)); - } - int64_t col_end = col_start + col_len; - - // Already validated in ValidateColumnOffsets() - 
DCHECK_GT(col_end, 0); - DCHECK_LT(col_end, file_desc->file_length); - if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) { - // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the - // dictionary page header size in total_compressed_size and total_uncompressed_size - // (see IMPALA-694). We pad col_len to compensate. - int64_t bytes_remaining = file_desc->file_length - col_end; - int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining); - col_len += pad; - } - - // TODO: this will need to change when we have co-located files and the columns - // are different files. - if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) { - return Status(Substitute("Expected parquet column file path '$0' to match " - "filename '$1'", col_chunk.file_path, filename())); - } - - const ScanRange* split_range = - static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split; - - // Determine if the column is completely contained within a local split. 
- bool col_range_local = split_range->expected_local() - && col_start >= split_range->offset() - && col_end <= split_range->offset() + split_range->len(); - ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(), - filename(), col_len, col_start, partition_id, split_range->disk_id(), - col_range_local, - BufferOpts(split_range->try_cache(), file_desc->mtime)); - col_ranges.push_back(col_range); - - // Get the stream that will be used for this column - ScannerContext::Stream* stream = context_->AddStream(col_range); - DCHECK(stream != nullptr); +Status HdfsParquetScanner::DivideReservationBetweenColumns( + const vector<BaseScalarColumnReader*>& column_readers, + int64_t reservation_to_distribute) { + DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); + const int64_t min_buffer_size = io_mgr->min_buffer_size(); + const int64_t max_buffer_size = io_mgr->max_buffer_size(); + // The HdfsScanNode reservation calculation in the planner ensures that we have + // reservation for at least one buffer per column. + if (reservation_to_distribute < min_buffer_size * column_readers.size()) { + return Status(TErrorCode::INTERNAL_ERROR, + Substitute("Not enough reservation in Parquet scanner for file '$0'. 
Need at " + "least $1 bytes per column for $2 columns but had $3 bytes", + filename(), min_buffer_size, column_readers.size(), + reservation_to_distribute)); + } - RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream)); + vector<int64_t> col_range_lengths(column_readers.size()); + for (int i = 0; i < column_readers.size(); ++i) { + col_range_lengths[i] = column_readers[i]->scan_range()->len(); } - DCHECK_EQ(col_ranges.size(), num_scalar_readers); + vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper( + min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute); + for (auto& tmp_reservation : tmp_reservations) { + column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second); + } + return Status::OK(); +} - DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr(); - // Issue all the column chunks to the IoMgr. We scan through all columns at the same - // time so need to read from all of them concurrently. - for (ScanRange* col_range : col_ranges) { - bool needs_buffers; - RETURN_IF_ERROR(io_mgr->StartScanRange( - scan_node_->reader_context(), col_range, &needs_buffers)); - if (needs_buffers) { - RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange( - scan_node_->reader_context(), col_range, 3 * io_mgr->max_buffer_size())); +vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper( + int64_t min_buffer_size, int64_t max_buffer_size, + const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) { + // Pair of (column index, reservation allocated). + vector<pair<int, int64_t>> tmp_reservations; + for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0); + + // Sort in descending order of length, breaking ties by index so that larger columns + // get allocated reservation first. It is common to have dramatically different column + // sizes in a single file because of different value sizes and compressibility. E.g. 
+ // consider a large STRING "comment" field versus a highly compressible + // dictionary-encoded column with only a few distinct values. We want to give max-sized + // buffers to large columns first to maximize the size of I/Os that we do while reading + // this row group. + sort(tmp_reservations.begin(), tmp_reservations.end(), + [&col_range_lengths](const pair<int, int64_t>& left, const pair<int, int64_t>& right) { + int64_t left_len = col_range_lengths[left.first]; + int64_t right_len = col_range_lengths[right.first]; + return left_len != right_len ? left_len > right_len : left.first < right.first; + }); + + // Set aside the minimum reservation per column. + reservation_to_distribute -= min_buffer_size * col_range_lengths.size(); + + // Allocate reservations to columns by repeatedly allocating either a max-sized buffer + // or a large enough buffer to fit the remaining data for each column. Do this + // round-robin up to the ideal number of I/O buffers. + for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) { + for (auto& tmp_reservation : tmp_reservations) { + // Add back the reservation we set aside above. + if (i == 0) reservation_to_distribute += min_buffer_size; + + int64_t bytes_left_in_range = + col_range_lengths[tmp_reservation.first] - tmp_reservation.second; + int64_t bytes_to_add; + if (bytes_left_in_range >= max_buffer_size) { + if (reservation_to_distribute >= max_buffer_size) { + bytes_to_add = max_buffer_size; + } else if (i == 0) { + DCHECK_EQ(0, tmp_reservation.second); + // Ensure this range gets at least one buffer on the first iteration. + bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute); + } else { + DCHECK_GT(tmp_reservation.second, 0); + // We need to read more than the max buffer size, but can't allocate a + // max-sized buffer. 
Stop adding buffers to this column: we prefer to use + // the existing max-sized buffers without small buffers mixed in so that + // we will alway do max-sized I/Os, which make efficient use of I/O devices. + bytes_to_add = 0; + } + } else if (bytes_left_in_range > 0 && + reservation_to_distribute >= min_buffer_size) { + // Choose a buffer size that will fit the rest of the bytes left in the range. + bytes_to_add = max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range)); + // But don't add more reservation than is available. + bytes_to_add = + min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute)); + } else { + bytes_to_add = 0; + } + DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add; + reservation_to_distribute -= bytes_to_add; + tmp_reservation.second += bytes_to_add; + DCHECK_GE(reservation_to_distribute, 0); + DCHECK_GT(tmp_reservation.second, 0); } } - return Status::OK(); + return tmp_reservations; } Status HdfsParquetScanner::InitDictionaries( http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index 1fc3239..92f2550 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -34,7 +34,7 @@ class CollectionValueBuilder; struct HdfsFileDesc; /// Internal schema representation and resolution. -class SchemaNode; +struct SchemaNode; /// Class that implements Parquet definition and repetition level decoding. class ParquetLevelDecoder; @@ -361,6 +361,7 @@ class HdfsParquetScanner : public HdfsScanner { template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> friend class ScalarColumnReader; friend class BoolColumnReader; + friend class HdfsParquetScannerTest; /// Size of the file footer. This is a guess. 
If this value is too little, we will /// need to issue another read. @@ -429,7 +430,7 @@ class HdfsParquetScanner : public HdfsScanner { /// Number of scratch batches processed so far. int64_t row_batches_produced_; - /// Column reader for each materialized columns for this file. + /// Column reader for each top-level materialized slot in the output tuple. std::vector<ParquetColumnReader*> column_readers_; /// Column readers will write slot values into this scratch batch for @@ -445,6 +446,9 @@ class HdfsParquetScanner : public HdfsScanner { /// Scan range for the metadata. const io::ScanRange* metadata_range_; + /// Reservation available for scanning columns, in bytes. + int64_t total_col_reservation_ = 0; + /// Pool to copy dictionary page buffer into. This pool is shared across all the /// pages in a column chunk. boost::scoped_ptr<MemPool> dictionary_pool_; @@ -461,6 +465,9 @@ class HdfsParquetScanner : public HdfsScanner { /// or nested within a collection. std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_; + /// Flattened list of all scalar column readers in column_readers_. + std::vector<BaseScalarColumnReader*> scalar_readers_; + /// Flattened collection column readers that point to readers in column_readers_. std::vector<CollectionColumnReader*> collection_readers_; @@ -627,12 +634,24 @@ class HdfsParquetScanner : public HdfsScanner { WARN_UNUSED_RESULT; /// Walks file_metadata_ and initiates reading the materialized columns. This - /// initializes 'column_readers' and issues the reads for the columns. 'column_readers' - /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of - /// top-level scalar columns and readers of scalar columns within a collection node). - Status InitScalarColumns( - int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers) - WARN_UNUSED_RESULT; + /// initializes 'scalar_readers_' and divides reservation between the columns but + /// does not start any scan ranges. 
+ Status InitScalarColumns() WARN_UNUSED_RESULT; + + /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the + /// columns. Sets the reservation on each corresponding reader in 'column_readers'. + Status DivideReservationBetweenColumns( + const std::vector<BaseScalarColumnReader*>& column_readers, + int64_t reservation_to_distribute); + + /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for + /// dividing a reservation of 'reservation_to_distribute' bytes between columns with + /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns + /// a vector with an entry per column with the index into 'col_range_lengths' and the + /// amount of reservation in bytes to give to that column. + static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper( + int64_t min_buffer_size, int64_t max_buffer_size, + const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute); /// Initializes the column readers in collection_readers_. void InitCollectionColumns(); @@ -668,7 +687,8 @@ class HdfsParquetScanner : public HdfsScanner { /// Partitions the readers into scalar and collection readers. The collection readers /// are flattened into collection_readers_. The scalar readers are partitioned into /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether - /// dictionary filtering is enabled and the reader can be dictionary filtered. + /// dictionary filtering is enabled and the reader can be dictionary filtered. All + /// scalar readers are also flattened into scalar_readers_. 
void PartitionReaders(const vector<ParquetColumnReader*>& readers, bool can_eval_dict_filters); http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 861d5dc..164197d 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -35,6 +35,7 @@ #include "exprs/scalar-expr-evaluator.h" #include "exprs/scalar-expr.h" #include "runtime/descriptors.h" +#include "runtime/exec-env.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/io/request-context.h" @@ -67,6 +68,7 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024; HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ScanNode(pool, tnode, descs), + ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation), min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ? tnode.hdfs_scan_node.min_max_tuple_id : -1), skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ? 
@@ -83,6 +85,7 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr), disks_accessed_bitmap_(TUnit::UNIT, 0), active_hdfs_read_thread_counter_(TUnit::UNIT, 0) { + DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation); } HdfsScanNodeBase::~HdfsScanNodeBase() { @@ -112,7 +115,6 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts, *min_max_row_desc, state, &min_max_conjuncts_)); } - return Status::OK(); } @@ -246,6 +248,16 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size()); ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id); + // Check if reservation was enough to allocate at least one buffer. The + // reservation calculation in HdfsScanNode.java should guarantee this. + // Hitting this error indicates a misconfiguration or bug. + int64_t min_buffer_size = ExecEnv::GetInstance()->disk_io_mgr()->min_buffer_size(); + if (scan_range_params_->size() > 0 + && resource_profile_.min_reservation < min_buffer_size) { + return Status(TErrorCode::INTERNAL_ERROR, + Substitute("HDFS scan min reservation $0 must be >= min buffer size $1", + resource_profile_.min_reservation, min_buffer_size)); + } // Add per volume stats to the runtime profile PerVolumeStats per_volume_stats; stringstream str; @@ -332,7 +344,18 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state); } - reader_context_ = runtime_state_->io_mgr()->RegisterContext(mem_tracker()); + RETURN_IF_ERROR(ClaimBufferReservation(state)); + // We got the minimum reservation. Now try to get ideal reservation. 
+ if (resource_profile_.min_reservation != ideal_scan_range_reservation_) { + bool increased = buffer_pool_client_.IncreaseReservation( + ideal_scan_range_reservation_ - resource_profile_.min_reservation); + VLOG_FILE << "Increasing reservation from minimum " + << resource_profile_.min_reservation << "B to ideal " + << ideal_scan_range_reservation_ << "B " + << (increased ? "succeeded" : "failed"); + } + + reader_context_ = runtime_state_->io_mgr()->RegisterContext(); // Initialize HdfsScanNode specific counters // TODO: Revisit counters and move the counters specific to multi-threaded scans http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 70fbac2..3a9c37f 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -322,6 +322,9 @@ class HdfsScanNodeBase : public ScanNode { friend class ScannerContext; friend class HdfsScanner; + /// Ideal reservation to process each input split, computed by the planner. + const int64_t ideal_scan_range_reservation_; + /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics. 
const int min_max_tuple_id_; http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/hdfs-scan-node-mt.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc index be75677..f4948d9 100644 --- a/be/src/exec/hdfs-scan-node-mt.cc +++ b/be/src/exec/hdfs-scan-node-mt.cc @@ -86,17 +86,18 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e StopAndFinalizeCounters(); return Status::OK(); } + int64_t scanner_reservation = buffer_pool_client_.GetReservation(); if (needs_buffers) { - RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(), scan_range_, - 3 * io_mgr->max_buffer_size())); + RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(), + &buffer_pool_client_, scan_range_, scanner_reservation)); } ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range_->meta_data()); int64_t partition_id = metadata->partition_id; HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id); - scanner_ctx_.reset(new ScannerContext( - runtime_state_, this, partition, scan_range_, filter_ctxs(), - expr_results_pool())); + scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_, + partition, filter_ctxs(), expr_results_pool())); + scanner_ctx_->AddStream(scan_range_, scanner_reservation); Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_); if (!status.ok()) { DCHECK(scanner_ == NULL); http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index f9d71e9..a95e47a 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -46,16 +46,6 @@ DECLARE_bool(skip_file_runtime_filtering); using namespace impala; using namespace impala::io; 
-// Amount of memory that we approximate a scanner thread will use not including IoBuffers. -// The memory used does not vary considerably between file formats (just a couple of MBs). -// This value is conservative and taken from running against the tpch lineitem table. -// TODO: revisit how we do this. -const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024; - -// Estimated upper bound on the compression ratio of compressed text files. Used to -// estimate scanner thread memory usage. -const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11; - // Amount of time to block waiting for GetNext() to release scanner threads between // checking if a scanner thread should yield itself back to the global thread pool. const int SCANNER_THREAD_WAIT_TIME_MS = 20; @@ -63,11 +53,6 @@ const int SCANNER_THREAD_WAIT_TIME_MS = 20; HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : HdfsScanNodeBase(pool, tnode, descs), - ranges_issued_barrier_(1), - scanner_thread_bytes_required_(0), - done_(false), - all_ranges_started_(false), - thread_avail_cb_id_(-1), max_num_scanner_threads_(CpuInfo::num_cores()) { } @@ -168,36 +153,6 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) { Status HdfsScanNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state)); - - // Compute the minimum bytes required to start a new thread. This is based on the - // file format. - // The higher the estimate, the less likely it is the query will fail but more likely - // the query will be throttled when it does not need to be. - // TODO: how many buffers should we estimate per range. The IoMgr will throttle down to - // one but if there are already buffers queued before memory pressure was hit, we can't - // reclaim that memory. 
- if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) { - // Parquet files require buffers per column - scanner_thread_bytes_required_ = - materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_buffer_size(); - } else { - scanner_thread_bytes_required_ = - 3 * runtime_state_->io_mgr()->max_buffer_size(); - } - // scanner_thread_bytes_required_ now contains the IoBuffer requirement. - // Next we add in the other memory the scanner thread will use. - // e.g. decompression buffers, tuple buffers, etc. - // For compressed text, we estimate this based on the file size (since the whole file - // will need to be decompressed at once). For all other formats, we use a constant. - // TODO: can we do something better? - int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE; - for (HdfsFileDesc* file: per_type_files_[THdfsFileFormat::TEXT]) { - if (file->file_compression != THdfsCompression::NONE) { - int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO; - scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage); - } - } - scanner_thread_bytes_required_ += scanner_thread_mem_usage; row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime"); row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime"); return Status::OK(); @@ -219,10 +174,9 @@ Status HdfsScanNode::Open(RuntimeState* state) { max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads; } DCHECK_GT(max_num_scanner_threads_, 0); - + spare_reservation_.Store(buffer_pool_client_.GetReservation()); thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb( bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1)); - return Status::OK(); } @@ -263,37 +217,28 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges, return Status::OK(); } -// For controlling the amount of memory used for scanners, we approximate the -// scanner mem usage based on 
scanner_thread_bytes_required_, rather than the -// consumption in the scan node's mem tracker. The problem with the scan node -// trackers is that it does not account for the memory the scanner will use. -// For example, if there is 110 MB of memory left (based on the mem tracker) -// and we estimate that a scanner will use 100MB, we want to make sure to only -// start up one additional thread. However, after starting the first thread, the -// mem tracker value will not change immediately (it takes some time before the -// scanner is running and starts using memory). Therefore we just use the estimate -// based on the number of running scanner threads. -bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) { - int64_t committed_scanner_mem = - active_scanner_thread_counter_.value() * scanner_thread_bytes_required_; - int64_t tracker_consumption = mem_tracker()->consumption(); - int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption; - if (est_additional_scanner_mem < 0) { - // This is the case where our estimate was too low. Expand the estimate based - // on the usage. - int64_t avg_consumption = - tracker_consumption / active_scanner_thread_counter_.value(); - // Take the average and expand it by 50%. Some scanners will not have hit their - // steady state mem usage yet. - // TODO: how can we scale down if we've overestimated. - // TODO: better heuristic? 
- scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5); - est_additional_scanner_mem = 0; - } +bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) { + DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); + if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true; + int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load(); + if (!buffer_pool_client_.IncreaseReservation(increase)) return false; + spare_reservation_.Add(increase); + return true; +} - // If we are starting a new thread, take that into account now. - if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_; - return est_additional_scanner_mem < mem_tracker()->SpareCapacity(); +int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock, + bool first_thread) { + DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); + int64_t amount; + if (first_thread) { + amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ? + ideal_scan_range_reservation_ : resource_profile_.min_reservation; + } else { + amount = ideal_scan_range_reservation_; + } + int64_t remainder = spare_reservation_.Add(-amount); + DCHECK_GE(remainder, 0); + return amount; } void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) { @@ -323,36 +268,45 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) // TODO: This still leans heavily on starvation-free locks, come up with a more // correct way to communicate between this method and ScannerThreadHelper unique_lock<mutex> lock(lock_); + + const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value(); + const bool first_thread = num_active_scanner_threads == 0; // Cases 1, 2, 3. if (done_ || all_ranges_started_ || - active_scanner_thread_counter_.value() >= progress_.remaining()) { + num_active_scanner_threads >= progress_.remaining()) { break; } // Cases 5 and 6. 
- if (active_scanner_thread_counter_.value() > 0 && + if (!first_thread && (materialized_row_batches_->Size() >= max_materialized_row_batches_ || - !EnoughMemoryForScannerThread(true))) { + !EnoughReservationForExtraThread(lock))) { break; } // Case 7 and 8. - bool is_reserved = false; - if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ || - !pool->TryAcquireThreadToken(&is_reserved)) { + if (num_active_scanner_threads >= max_num_scanner_threads_ || + !pool->TryAcquireThreadToken()) { break; } + // Deduct the reservation. We haven't dropped the lock since the + // first_thread/EnoughReservationForExtraThread() checks so spare reservation + // must be available. + int64_t scanner_thread_reservation = + DeductReservationForScannerThread(lock, first_thread); COUNTER_ADD(&active_scanner_thread_counter_, 1); string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)", PrintId(runtime_state_->fragment_instance_id()), id(), num_scanner_threads_started_counter_->value()); - - auto fn = [this]() { this->ScannerThread(); }; + auto fn = [this, scanner_thread_reservation]() { + this->ScannerThread(scanner_thread_reservation); + }; std::unique_ptr<Thread> t; status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true); if (!status.ok()) { + ReturnReservationFromScannerThread(scanner_thread_reservation); COUNTER_ADD(&active_scanner_thread_counter_, -1); // Release the token and skip running callbacks to find a replacement. Skipping // serves two purposes. 
First, it prevents a mutual recursion between this function @@ -373,7 +327,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) } } -void HdfsScanNode::ScannerThread() { +void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) { SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters()); SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics()); DiskIoMgr* io_mgr = runtime_state_->io_mgr(); @@ -402,8 +356,7 @@ void HdfsScanNode::ScannerThread() { // this thread. unique_lock<mutex> l(lock_); if (active_scanner_thread_counter_.value() > 1) { - if (runtime_state_->resource_pool()->optional_exceeded() || - !EnoughMemoryForScannerThread(false)) { + if (runtime_state_->resource_pool()->optional_exceeded()) { // We can't break here. We need to update the counter with the lock held or else // all threads might see active_scanner_thread_counter_.value > 1 COUNTER_ADD(&active_scanner_thread_counter_, -1); @@ -438,12 +391,13 @@ void HdfsScanNode::ScannerThread() { if (status.ok() && scan_range != nullptr) { if (needs_buffers) { status = io_mgr->AllocateBuffersForRange( - reader_context_.get(), scan_range, 3 * io_mgr->max_buffer_size()); + reader_context_.get(), &buffer_pool_client_, scan_range, + scanner_thread_reservation); } if (status.ok()) { // Got a scan range. Process the range end to end (in this thread). status = ProcessSplit(filter_status.ok() ? 
filter_ctxs : vector<FilterContext>(), - &expr_results_pool, scan_range); + &expr_results_pool, scan_range, scanner_thread_reservation); } } @@ -483,6 +437,7 @@ void HdfsScanNode::ScannerThread() { COUNTER_ADD(&active_scanner_thread_counter_, -1); exit: + ReturnReservationFromScannerThread(scanner_thread_reservation); runtime_state_->resource_pool()->ReleaseThreadToken(false); for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_); filter_mem_pool.FreeAll(); @@ -490,7 +445,8 @@ exit: } Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, - MemPool* expr_results_pool, ScanRange* scan_range) { + MemPool* expr_results_pool, ScanRange* scan_range, + int64_t scanner_thread_reservation) { DCHECK(scan_range != NULL); ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data()); @@ -515,8 +471,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, return Status::OK(); } - ScannerContext context( - runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool); + ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition, + filter_ctxs, expr_results_pool); + context.AddStream(scan_range, scanner_thread_reservation); scoped_ptr<HdfsScanner> scanner; Status status = CreateAndOpenScanner(partition, &context, &scanner); if (!status.ok()) { http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index a1c97cf..81e826e 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -27,6 +27,7 @@ #include <boost/scoped_ptr.hpp> #include <boost/thread/mutex.hpp> +#include "common/atomic.h" #include "exec/filter-context.h" #include "exec/hdfs-scan-node-base.h" #include "runtime/io/disk-io-mgr.h" @@ -58,8 +59,14 @@ class TPlanNode; /// 5. 
The scanner finishes the scan range and informs the scan node so it can track /// end of stream. /// -/// TODO: This class allocates a bunch of small utility objects that should be -/// recycled. +/// Buffer management: +/// ------------------ +/// The different scanner threads all allocate I/O buffers from the node's Buffer Pool +/// client. The scan node ensures that enough reservation is available to start a +/// scanner thread before launching each one (see +/// EnoughReservationForExtraThread()), after which the scanner thread is responsible +/// for staying within the reservation handed off to it. +/// /// TODO: Remove this class once the fragment-based multi-threaded execution is /// fully functional. class HdfsScanNode : public HdfsScanNodeBase { @@ -100,12 +107,7 @@ class HdfsScanNode : public HdfsScanNodeBase { private: /// Released when initial ranges are issued in the first call to GetNext(). - CountingBarrier ranges_issued_barrier_; - - /// The estimated memory required to start up a new scanner thread. If the memory - /// left (due to limits) is less than this value, we won't start up optional - /// scanner threads. - int64_t scanner_thread_bytes_required_; + CountingBarrier ranges_issued_barrier_{1}; /// Thread group for all scanner worker threads ThreadGroup scanner_threads_; @@ -130,16 +132,16 @@ class HdfsScanNode : public HdfsScanNodeBase { /// are finished, an error/cancellation occurred, or the limit was reached. /// Setting this to true triggers the scanner threads to clean up. /// This should not be explicitly set. Instead, call SetDone(). - bool done_; + bool done_ = false; /// Set to true if all ranges have started. Some of the ranges may still be in flight /// being processed by scanner threads, but no new ScannerThreads should be started. - bool all_ranges_started_; + bool all_ranges_started_ = false; /// The id of the callback added to the thread resource manager when thread token /// is available.
Used to remove the callback before this scan node is destroyed. /// -1 if no callback is registered. - int thread_avail_cb_id_; + int thread_avail_cb_id_ = -1; /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads @@ -147,6 +149,14 @@ class HdfsScanNode : public HdfsScanNodeBase { /// the number of cores. int max_num_scanner_threads_; + /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner + /// threads. Doled out to scanner threads when they are started and returned when + /// those threads no longer need it. Can be atomically incremented without holding + /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for + /// reservation and the actual deduction are atomic with respect to other threads + /// trying to claim reservation. + AtomicInt64 spare_reservation_{0}; + /// The wait time for fetching a row batch from the row batch queue. RuntimeProfile::Counter* row_batches_get_timer_; @@ -160,21 +170,35 @@ class HdfsScanNode : public HdfsScanNodeBase { /// Main function for scanner thread. This thread pulls the next range to be /// processed from the IoMgr and then processes the entire range end to end. /// This thread terminates when all scan ranges are complete or an error occurred. - void ScannerThread(); + /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for + /// this thread with DeductReservationForScannerThread(). + void ScannerThread(int64_t scanner_thread_reservation); /// Process the entire scan range with a new scanner object. Executed in scanner /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows /// in this split. 
Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs, - MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT; - - /// Returns true if there is enough memory (against the mem tracker limits) to - /// have a scanner thread. - /// If new_thread is true, the calculation is for starting a new scanner thread. - /// If false, it determines whether there's adequate memory for the existing - /// set of scanner threads. - /// lock_ must be taken before calling this. - bool EnoughMemoryForScannerThread(bool new_thread); + MemPool* expr_results_pool, io::ScanRange* scan_range, + int64_t scanner_thread_reservation) WARN_UNUSED_RESULT; + + /// Return true if there is enough reservation to start an extra scanner thread. + /// Tries to increase reservation if enough is not already available in + /// 'spare_reservation_'. 'lock_' must be held via 'lock'. + bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock); + + /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If + /// 'first_thread' is true, this is the first thread to be started and only the + /// minimum reservation is required to be available. Otherwise + /// EnoughReservationForExtraThread() must have returned true in the current + /// critical section so that 'ideal_scan_range_reservation_' is available for the + /// extra thread. Returns the amount deducted. 'lock_' must be held via 'lock'. + int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock, + bool first_thread); + + /// Called by a scanner thread to return reservation that it no longer needs. + void ReturnReservationFromScannerThread(int64_t bytes) { + spare_reservation_.Add(bytes); + } /// Checks for eos conditions and returns batches from materialized_row_batches_.
Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos) http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/parquet-column-readers.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc index 099fdce..153b24a 100644 --- a/be/src/exec/parquet-column-readers.cc +++ b/be/src/exec/parquet-column-readers.cc @@ -30,6 +30,7 @@ #include "exec/scanner-context.inline.h" #include "rpc/thrift-util.h" #include "runtime/collection-value-builder.h" +#include "runtime/exec-env.h" #include "runtime/tuple-row.h" #include "runtime/tuple.h" #include "runtime/runtime-state.h" @@ -47,9 +48,12 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false, "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will " "be converted from UTC to local time. Writes are unaffected."); -// Throttle deprecation warnings to - only print warning with this frequency. static const int BITPACKED_DEPRECATION_WARNING_FREQUENCY = 100; +// Max dictionary page header size in bytes. This is an estimate and only needs to be an +// upper bound. +static const int MAX_DICT_HEADER_SIZE = 100; + // Max data page header size in bytes. This is an estimate and only needs to be an upper // bound. It is theoretically possible to have a page header of any size due to string // value statistics, but in practice we'll have trouble reading string values this large. 
@@ -69,6 +73,8 @@ static int debug_count = 0; #define SHOULD_TRIGGER_DEBUG_ACTION(x) (false) #endif +using namespace impala::io; + namespace impala { const string PARQUET_COL_MEM_LIMIT_EXCEEDED = @@ -848,8 +854,99 @@ static bool RequiresSkippedDictionaryHeaderCheck( return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal); } +Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc, + const parquet::ColumnChunk& col_chunk, int row_group_idx) { + num_buffered_values_ = 0; + data_ = nullptr; + data_end_ = nullptr; + stream_ = nullptr; + io_reservation_ = 0; + metadata_ = &col_chunk.meta_data; + num_values_read_ = 0; + def_level_ = -1; + // See ColumnReader constructor. + rep_level_ = max_rep_level() == 0 ? 0 : -1; + pos_current_value_ = -1; + + if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) { + RETURN_IF_ERROR(Codec::CreateDecompressor( + nullptr, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_)); + } + int64_t col_start = col_chunk.meta_data.data_page_offset; + + RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_, + parent_->filename(), row_group_idx, col_idx(), schema_element(), + parent_->state_)); + + if (col_chunk.meta_data.__isset.dictionary_page_offset) { + // Already validated in ValidateColumnOffsets() + DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start); + col_start = col_chunk.meta_data.dictionary_page_offset; + } + int64_t col_len = col_chunk.meta_data.total_compressed_size; + if (col_len <= 0) { + return Status(Substitute("File '$0' contains invalid column chunk size: $1", + filename(), col_len)); + } + int64_t col_end = col_start + col_len; + + // Already validated in ValidateColumnOffsets() + DCHECK_GT(col_end, 0); + DCHECK_LT(col_end, file_desc.file_length); + const ParquetFileVersion& file_version = parent_->file_version_; + if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) { + // The Parquet MR writer had a bug 
in 1.2.8 and below where it didn't include the + // dictionary page header size in total_compressed_size and total_uncompressed_size + // (see IMPALA-694). We pad col_len to compensate. + int64_t bytes_remaining = file_desc.file_length - col_end; + int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining); + col_len += pad; + } + + // TODO: this will need to change when we have co-located files and the columns + // are different files. + if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) { + return Status(Substitute("Expected parquet column file path '$0' to match " + "filename '$1'", col_chunk.file_path, filename())); + } + + const ScanRange* metadata_range = parent_->metadata_range_; + int64_t partition_id = parent_->context_->partition_descriptor()->id(); + const ScanRange* split_range = + static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split; + // Determine if the column is completely contained within a local split. + bool col_range_local = split_range->expected_local() + && col_start >= split_range->offset() + && col_end <= split_range->offset() + split_range->len(); + scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(), + filename(), col_len, col_start, partition_id, split_range->disk_id(), + col_range_local, + BufferOpts(split_range->try_cache(), file_desc.mtime)); + ClearDictionaryDecoder(); + return Status::OK(); +} + +Status BaseScalarColumnReader::StartScan() { + DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan."; + DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); + ScannerContext* context = parent_->context_; + DCHECK_GT(io_reservation_, 0); + bool needs_buffers; + RETURN_IF_ERROR(io_mgr->StartScanRange( + parent_->scan_node_->reader_context(), scan_range_, &needs_buffers)); + if (needs_buffers) { + RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange( + parent_->scan_node_->reader_context(), context->bp_client(), + scan_range_, io_reservation_)); + } + 
stream_ = parent_->context_->AddStream(scan_range_, io_reservation_); + DCHECK(stream_ != nullptr); + return Status::OK(); +} + Status BaseScalarColumnReader::ReadPageHeader(bool peek, parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) { + DCHECK(stream_ != nullptr); *eos = false; uint8_t* buffer; @@ -933,7 +1030,7 @@ Status BaseScalarColumnReader::InitDictionary() { bool eos; parquet::PageHeader next_page_header; uint32_t next_header_size; - + DCHECK(stream_ != nullptr); DCHECK(!HasDictionaryDecoder()); RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header, @@ -1045,6 +1142,14 @@ Status BaseScalarColumnReader::InitDictionary() { return Status::OK(); } +Status BaseScalarColumnReader::InitDictionaries( + const vector<BaseScalarColumnReader*>& readers) { + for (BaseScalarColumnReader* reader : readers) { + RETURN_IF_ERROR(reader->InitDictionary()); + } + return Status::OK(); +} + Status BaseScalarColumnReader::ReadDataPage() { // We're about to move to the next data page. The previous data page is // now complete, free up any memory allocated for it.
If the data page contained http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 3a8ad70..bfb20ec 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -326,42 +326,29 @@ class BaseScalarColumnReader : public ParquetColumnReader { BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc) : ParquetColumnReader(parent, node, slot_desc), - data_(NULL), - data_end_(NULL), - def_levels_(true), - rep_levels_(false), - page_encoding_(parquet::Encoding::PLAIN_DICTIONARY), - num_buffered_values_(0), - num_values_read_(0), - metadata_(NULL), - stream_(NULL), data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) { DCHECK_GE(node_.col_idx, 0) << node_.DebugString(); } virtual ~BaseScalarColumnReader() { } - /// This is called once for each row group in the file. - Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) { - DCHECK(stream != NULL); - DCHECK(metadata != NULL); - - num_buffered_values_ = 0; - data_ = NULL; - data_end_ = NULL; - stream_ = stream; - metadata_ = metadata; - num_values_read_ = 0; - def_level_ = -1; - // See ColumnReader constructor. - rep_level_ = max_rep_level() == 0 ? 0 : -1; - pos_current_value_ = -1; - - if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) { - RETURN_IF_ERROR(Codec::CreateDecompressor( - NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_)); - } - ClearDictionaryDecoder(); + /// Resets the reader for each row group in the file and creates the scan + /// range for the column, but does not start it. To start scanning, + /// set_io_reservation() must be called to assign reservation to this + /// column, followed by StartScan().
+ Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk, + int row_group_idx); + + /// Starts the column scan range. The reader must be Reset() and have a + /// reservation assigned via set_io_reservation(). This must be called + /// before any of the column data can be read (including dictionary and + /// data pages). Returns an error status if there was an error starting the + /// scan or allocating buffers for it. + Status StartScan(); + + /// Helper to start scans for multiple columns at once. + static Status StartScans(const std::vector<BaseScalarColumnReader*>& readers) { + for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan()); return Status::OK(); } @@ -376,22 +363,27 @@ class BaseScalarColumnReader : public ParquetColumnReader { if (dict_decoder != nullptr) dict_decoder->Close(); } + io::ScanRange* scan_range() const { return scan_range_; } int64_t total_len() const { return metadata_->total_compressed_size; } int col_idx() const { return node_.col_idx; } THdfsCompression::type codec() const { if (metadata_ == NULL) return THdfsCompression::NONE; return PARQUET_TO_IMPALA_CODEC[metadata_->codec]; } + void set_io_reservation(int64_t bytes) { io_reservation_ = bytes; } /// Reads the next definition and repetition levels for this column. Initializes the /// next data page if necessary. virtual bool NextLevels() { return NextLevels<true>(); } - // Check the data stream to see if there is a dictionary page. If there is, - // use that page to initialize dict_decoder_ and advance the data stream - // past the dictionary page. + /// Check the data stream to see if there is a dictionary page. If there is, + /// use that page to initialize dict_decoder_ and advance the data stream + /// past the dictionary page. Status InitDictionary(); + /// Convenience function to initialize multiple dictionaries.
+ static Status InitDictionaries(const std::vector<BaseScalarColumnReader*>& readers); + // Returns the dictionary or NULL if the dictionary doesn't exist virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; } @@ -417,33 +409,45 @@ class BaseScalarColumnReader : public ParquetColumnReader { // fit in as few cache lines as possible. /// Pointer to start of next value in data page - uint8_t* data_; + uint8_t* data_ = nullptr; /// End of the data page. - const uint8_t* data_end_; + const uint8_t* data_end_ = nullptr; /// Decoder for definition levels. - ParquetLevelDecoder def_levels_; + ParquetLevelDecoder def_levels_{true}; /// Decoder for repetition levels. - ParquetLevelDecoder rep_levels_; + ParquetLevelDecoder rep_levels_{false}; /// Page encoding for values of the current data page. Cached here for perf. Set in /// InitDataPage(). - parquet::Encoding::type page_encoding_; + parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; /// Num values remaining in the current data page - int num_buffered_values_; + int num_buffered_values_ = 0; // Less frequently used members that are not accessed in inner loop should go below // here so they do not occupy precious cache line space. /// The number of values seen so far. Updated per data page. - int64_t num_values_read_; + int64_t num_values_read_ = 0; + + /// Metadata for the column for the current row group. + const parquet::ColumnMetaData* metadata_ = nullptr; - const parquet::ColumnMetaData* metadata_; boost::scoped_ptr<Codec> decompressor_; - ScannerContext::Stream* stream_; + + /// The scan range for the column's data. Initialized for each row group by Reset(). + io::ScanRange* scan_range_ = nullptr; + + /// Stream used to read data from 'scan_range_'. Initialized by StartScan(). + ScannerContext::Stream* stream_ = nullptr; + + /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set + /// with set_io_reservation() before 'stream_' is initialized.
Reset for each row group + /// by Reset(). + int64_t io_reservation_ = 0; /// Pool to allocate storage for data pages from - either decompression buffers for /// compressed data pages or copies of the data page with var-len data to attach to http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 0abf82f..c669e65 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -41,14 +41,15 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024; const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT; ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node, - HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range, - const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool) + BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc, + const vector<FilterContext>& filter_ctxs, + MemPool* expr_results_pool) : state_(state), scan_node_(scan_node), + bp_client_(bp_client), partition_desc_(partition_desc), filter_ctxs_(filter_ctxs), expr_results_pool_(expr_results_pool) { - AddStream(scan_range); } ScannerContext::~ScannerContext() { @@ -66,19 +67,20 @@ void ScannerContext::ClearStreams() { } ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range, - const HdfsFileDesc* file_desc) + int64_t reservation, const HdfsFileDesc* file_desc) : parent_(parent), scan_range_(scan_range), file_desc_(file_desc), + reservation_(reservation), file_len_(file_desc->file_length), next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES), boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())), boundary_buffer_(new StringBuffer(boundary_pool_.get())) { } -ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) { - streams_.emplace_back(new Stream( - this, range, 
scan_node_->GetFileDesc(partition_desc_->id(), range->file()))); +ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) { + streams_.emplace_back(new Stream(this, range, reservation, + scan_node_->GetFileDesc(partition_desc_->id(), range->file()))); return streams_.back().get(); } @@ -101,7 +103,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) { Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { DCHECK_EQ(0, io_buffer_bytes_left_); - DiskIoMgr* io_mgr = parent_->state_->io_mgr(); + DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED; if (io_buffer_ != nullptr) ReturnIoBuffer(); @@ -134,6 +136,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { read_past_buffer_size = ::max(read_past_buffer_size, read_past_size); read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining); read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size); + read_past_buffer_size = ::min(read_past_buffer_size, reservation_); // We're reading past the scan range. Be careful not to read past the end of file. DCHECK_GE(read_past_buffer_size, 0); if (read_past_buffer_size == 0) { @@ -150,9 +153,14 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { if (needs_buffers) { // Allocate fresh buffers. The buffers for 'scan_range_' should be released now // since we hit EOS. + if (reservation_ < io_mgr->min_buffer_size()) { + return Status(Substitute("Could not read past end of scan range in file '$0'.
" "Reservation provided $1 was < the minimum I/O buffer size $2", + filename(), reservation_, io_mgr->min_buffer_size())); + } RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange( - parent_->scan_node_->reader_context(), range, - 3 * io_mgr->max_buffer_size())); + parent_->scan_node_->reader_context(), parent_->bp_client_, range, + reservation_)); } RETURN_IF_ERROR(range->GetNext(&io_buffer_)); DCHECK(io_buffer_->eosr()); http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index a131d3f..6292486 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -27,6 +27,7 @@ #include "common/compiler-util.h" #include "common/status.h" #include "exec/filter-context.h" +#include "runtime/bufferpool/buffer-pool.h" #include "runtime/io/request-ranges.h" namespace impala { @@ -84,10 +85,12 @@ class TupleRow; class ScannerContext { public: /// Create a scanner context with the parent scan_node (where materialized row batches - /// get pushed to) and the scan range to process. - /// This context starts with 1 stream. - ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*, - io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs, + /// get pushed to). Scan ranges are added afterwards with AddStream(). Buffers are + /// allocated using 'bp_client'. + ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node, + BufferPool::ClientHandle* bp_client, + HdfsPartitionDescriptor* partition_desc, + const std::vector<FilterContext>& filter_ctxs, + MemPool* expr_results_pool); /// Destructor verifies that all stream objects have been released.
~ScannerContext(); @@ -150,6 +153,7 @@ class ScannerContext { const char* filename() { return scan_range_->file(); } const io::ScanRange* scan_range() { return scan_range_; } const HdfsFileDesc* file_desc() { return file_desc_; } + int64_t reservation() const { return reservation_; } /// Returns the buffer's current offset in the file. int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; } @@ -211,9 +215,15 @@ class ScannerContext { private: friend class ScannerContext; - ScannerContext* parent_; - io::ScanRange* scan_range_; - const HdfsFileDesc* file_desc_; + ScannerContext* const parent_; + io::ScanRange* const scan_range_; + const HdfsFileDesc* const file_desc_; + + /// Reservation given to this stream for allocating I/O buffers. The reservation is + /// shared with 'scan_range_', so the context must be careful not to use this until + /// all of 'scan_range_'s buffers have been freed. Must be >= the minimum IoMgr + /// buffer size to allow reading past the end of 'scan_range_'. + const int64_t reservation_; /// Total number of bytes returned from GetBytes() int64_t total_bytes_returned_ = 0; @@ -272,7 +282,8 @@ class ScannerContext { /// output_buffer_bytes_left_ will be set to something else. static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0; - Stream(ScannerContext* parent, io::ScanRange* scan_range, + /// Private constructor. See AddStream() for public API. + Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation, const HdfsFileDesc* file_desc); /// GetBytes helper to handle the slow path. @@ -355,24 +366,37 @@ class ScannerContext { /// size to 0. void ClearStreams(); - /// Add a stream to this ScannerContext for 'range'. The stream is owned by this - /// context. - Stream* AddStream(io::ScanRange* range); + /// Add a stream to this ScannerContext for 'range'. 'range' must already have any + /// buffers that it needs allocated.
'reservation' is the amount of reservation that + /// is given to this stream for allocating I/O buffers. The reservation is shared with + /// 'range', so the context must be careful not to use this until all of 'range's + /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading + /// past the end of 'range'. + /// + /// Returns the added stream. The returned stream is owned by this context. + Stream* AddStream(io::ScanRange* range, int64_t reservation); /// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not /// multi-threaded and is done (finished, cancelled or reached it's limit). /// In all other cases returns false. bool cancelled() const; - HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; } + BufferPool::ClientHandle* bp_client() const { return bp_client_; } + HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; } const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; } MemPool* expr_results_pool() const { return expr_results_pool_; } private: friend class Stream; - RuntimeState* state_; - HdfsScanNodeBase* scan_node_; - HdfsPartitionDescriptor* partition_desc_; + RuntimeState* const state_; + HdfsScanNodeBase* const scan_node_; + + /// Buffer pool client used to allocate I/O buffers. This is accessed by multiple + /// threads in the multi-threaded scan node, so those threads must take care to only + /// call thread-safe BufferPool methods with this client. + BufferPool::ClientHandle* const bp_client_; + + HdfsPartitionDescriptor* const partition_desc_; /// Vector of streams. Non-columnar formats will always have one stream per context.
std::vector<std::unique_ptr<Stream>> streams_; http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc index c46c5ea..e441402 100644 --- a/be/src/runtime/bufferpool/reservation-tracker-test.cc +++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc @@ -522,15 +522,15 @@ TEST_F(ReservationTrackerTest, TransferReservation) { TEST_F(ReservationTrackerTest, ReservationUtil) { const int64_t MEG = 1024 * 1024; const int64_t GIG = 1024 * 1024 * 1024; - EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING); + EXPECT_EQ(32 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING); EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0)); EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1)); - EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG)); + EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(32 * MEG)); EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG)); - EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0)); - EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1)); + EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0)); + EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1)); EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG)); EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG)); http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/bufferpool/reservation-util.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc index 85718ab..a27ab9d 100644 --- 
a/be/src/runtime/bufferpool/reservation-util.cc +++ b/be/src/runtime/bufferpool/reservation-util.cc @@ -24,7 +24,7 @@ namespace impala { // Most operators that accumulate memory use reservations, so the majority of memory // should be allocated to buffer reservations, as a heuristic. const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8; -const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024; +const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 32 * 1024 * 1024; int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) { int64_t max_reservation = std::min<int64_t>( http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-stress-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc index 0e41a6f..2ec1d09 100644 --- a/be/src/runtime/io/disk-io-mgr-stress-test.cc +++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "runtime/io/disk-io-mgr-stress.h" +#include <gflags/gflags.h> #include "common/init.h" +#include "runtime/io/disk-io-mgr-stress.h" #include "runtime/test-env.h" #include "service/fe-support.h" #include "util/string-parser.h" @@ -31,34 +32,32 @@ using namespace impala::io; // can be passed to control how long to run this test (0 for forever). // TODO: make these configurable once we decide how to run BE tests with args -const int DEFAULT_DURATION_SEC = 1; +constexpr int DEFAULT_DURATION_SEC = 1; const int NUM_DISKS = 5; const int NUM_THREADS_PER_DISK = 5; const int NUM_CLIENTS = 10; const bool TEST_CANCELLATION = true; +const int64_t BUFFER_POOL_CAPACITY = 4LL * 1024 * 1024 * 1024; + +DEFINE_int64(duration_sec, DEFAULT_DURATION_SEC, + "Disk I/O Manager stress test duration in seconds.
0 means run indefinitely."); int main(int argc, char** argv) { - InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST); - InitFeSupport(); - TestEnv test_env; - ABORT_IF_ERROR(test_env.Init()); - int duration_sec = DEFAULT_DURATION_SEC; + impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); + impala::InitFeSupport(); - if (argc == 2) { - StringParser::ParseResult status; - duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status); - if (status != StringParser::PARSE_SUCCESS) { - printf("Invalid arg: %s\n", argv[1]); - return 1; - } - } - if (duration_sec != 0) { - printf("Running stress test for %d seconds.\n", duration_sec); + if (FLAGS_duration_sec != 0) { + printf("Running stress test for %ld seconds.\n", FLAGS_duration_sec); } else { printf("Running stress test indefinitely.\n"); } - DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION); - test.Run(duration_sec); + TestEnv test_env; + // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it. + test_env.SetBufferPoolArgs(DiskIoMgrStress::MIN_READ_BUFFER_SIZE, BUFFER_POOL_CAPACITY); + Status status = test_env.Init(); + CHECK(status.ok()) << status.GetDetail(); + DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION); + test.Run(FLAGS_duration_sec); return 0; }