Revert IMPALA-4835 and dependent changes Revert "IMPALA-6585: increase test_low_mem_limit_q21 limit"
This reverts commit 25bcb258dfd712f1514cf188206667a5e6be0e26. Revert "IMPALA-6588: don't add empty list of ranges in text scan" This reverts commit d57fbec6f67b83227b4c6117476da8f7d75fc4f6. Revert "IMPALA-4835: Part 3: switch I/O buffers to buffer pool" This reverts commit 24b4ed0b29a44090350e630d625291c01b753a36. Revert "IMPALA-4835: Part 2: Allocate scan range buffers upfront" This reverts commit 5699b59d0c5cbe37e888a367adb42fa12dfb0916. Revert "IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation" This reverts commit 65680dc42107db4ff2273c635cedf83d20f0ea94. Change-Id: Ie5ca451cd96602886b0a8ecaa846957df0269cbb Reviewed-on: http://gerrit.cloudera.org:8080/9480 Reviewed-by: Dan Hecht <dhe...@cloudera.com> Tested-by: Impala Public Jenkins Reviewed-on: http://gerrit.cloudera.org:8080/9485 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e5689fb5 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e5689fb5 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e5689fb5 Branch: refs/heads/2.x Commit: e5689fb5c6a011af9e526effa9aa5399b194e977 Parents: 42c604e Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Fri Mar 2 16:09:25 2018 -0800 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Sat Mar 3 22:07:15 2018 +0000 ---------------------------------------------------------------------- be/src/exec/CMakeLists.txt | 1 - be/src/exec/base-sequence-scanner.cc | 1 - be/src/exec/base-sequence-scanner.h | 3 +- be/src/exec/hdfs-lzo-text-scanner.cc | 1 - be/src/exec/hdfs-parquet-scanner-test.cc | 96 - be/src/exec/hdfs-parquet-scanner.cc | 234 +-- be/src/exec/hdfs-parquet-scanner.h | 44 +- be/src/exec/hdfs-scan-node-base.cc | 95 +- be/src/exec/hdfs-scan-node-base.h | 3 - be/src/exec/hdfs-scan-node-mt.cc | 20 +- be/src/exec/hdfs-scan-node.cc | 172 +- be/src/exec/hdfs-scan-node.h | 66 +- be/src/exec/hdfs-text-scanner.cc | 6 +- be/src/exec/parquet-column-readers.cc | 108 +- be/src/exec/parquet-column-readers.h | 88 +- be/src/exec/scanner-context.cc | 42 +- be/src/exec/scanner-context.h | 54 +- be/src/runtime/bufferpool/buffer-pool.h | 1 - .../bufferpool/reservation-tracker-test.cc | 8 +- be/src/runtime/bufferpool/reservation-util.cc | 2 +- be/src/runtime/exec-env.cc | 7 +- be/src/runtime/io/disk-io-mgr-internal.h | 16 - be/src/runtime/io/disk-io-mgr-stress-test.cc | 43 +- be/src/runtime/io/disk-io-mgr-stress.cc | 89 +- be/src/runtime/io/disk-io-mgr-stress.h | 26 +- be/src/runtime/io/disk-io-mgr-test.cc | 849 ++++---- be/src/runtime/io/disk-io-mgr.cc | 716 +++++-- be/src/runtime/io/disk-io-mgr.h | 383 ++-- be/src/runtime/io/request-context.cc | 239 +-- be/src/runtime/io/request-context.h | 318 ++- be/src/runtime/io/request-ranges.h | 196 +- be/src/runtime/io/scan-range.cc | 309 ++- be/src/runtime/mem-tracker.h | 1 + be/src/runtime/test-env.cc | 2 +- be/src/runtime/tmp-file-mgr-test.cc | 3 +- be/src/runtime/tmp-file-mgr.cc | 18 +- be/src/runtime/tmp-file-mgr.h | 17 +- be/src/util/bit-util-test.cc | 11 - be/src/util/bit-util.h | 8 +- be/src/util/impalad-metrics.cc | 13 +- be/src/util/impalad-metrics.h | 9 + common/thrift/PlanNodes.thrift | 3 - .../apache/impala/analysis/SlotDescriptor.java | 19 - .../org/apache/impala/analysis/SlotRef.java | 20 + .../org/apache/impala/planner/HdfsScanNode.java | 167 +- .../java/org/apache/impala/util/BitUtil.java | 6 - .../org/apache/impala/util/BitUtilTest.java | 6 - .../queries/PlannerTest/constant-folding.test | 42 +- .../queries/PlannerTest/disable-codegen.test | 20 +- .../PlannerTest/fk-pk-join-detection.test | 78 +- .../queries/PlannerTest/max-row-size.test | 80 +- .../PlannerTest/min-max-runtime-filters.test | 6 +- .../queries/PlannerTest/mt-dop-validation.test | 40 +- .../queries/PlannerTest/parquet-filtering.test | 42 +- .../queries/PlannerTest/partition-pruning.test | 4 +- .../PlannerTest/resource-requirements.test | 1814 ++++-------------- .../PlannerTest/sort-expr-materialization.test | 32 +- .../PlannerTest/spillable-buffer-sizing.test | 192 +- .../queries/PlannerTest/tablesample.test | 44 +- .../queries/PlannerTest/union.test | 8 +- .../admission-reject-min-reservation.test | 12 +- .../queries/QueryTest/analytic-fns.test | 5 +- .../queries/QueryTest/codegen-mem-limit.test | 5 +- .../QueryTest/disk-spill-encryption.test | 2 +- .../queries/QueryTest/explain-level0.test | 4 +- .../queries/QueryTest/explain-level1.test | 4 +- .../queries/QueryTest/explain-level2.test | 14 +- .../queries/QueryTest/explain-level3.test | 14 +- .../queries/QueryTest/nested-types-tpch.test | 6 +- .../queries/QueryTest/runtime_row_filters.test | 11 +- .../queries/QueryTest/scanners.test | 7 - .../functional-query/queries/QueryTest/set.test | 2 +- .../queries/QueryTest/spilling-aggs.test | 19 +- .../spilling-naaj-no-deny-reservation.test | 7 +- .../queries/QueryTest/spilling-naaj.test | 8 +- .../QueryTest/spilling-no-debug-action.test | 66 - .../QueryTest/spilling-sorts-exhaustive.test | 10 +- .../queries/QueryTest/spilling.test | 76 +- .../queries/QueryTest/stats-extrapolation.test | 52 +- .../tpch/queries/sort-reservation-usage.test | 9 +- tests/common/test_dimensions.py | 16 +- tests/custom_cluster/test_scratch_disk.py | 2 +- tests/query_test/test_mem_usage_scaling.py | 11 +- tests/query_test/test_query_mem_limit.py | 4 +- tests/query_test/test_scanners.py | 25 +- tests/query_test/test_scanners_fuzz.py | 9 +- tests/query_test/test_sort.py | 16 +- tests/query_test/test_spilling.py | 5 - 88 files changed, 2799 insertions(+), 4563 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index c1f91d6..aab1383 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -108,4 +108,3 @@ ADD_BE_TEST(parquet-version-test) ADD_BE_TEST(row-batch-list-test) ADD_BE_TEST(incr-stats-util-test) ADD_BE_TEST(hdfs-avro-scanner-test) -ADD_BE_TEST(hdfs-parquet-scanner-test) http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/base-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index 9d95b0b..9cb6330 100644 --- a/be/src/exec/base-sequence-scanner.cc +++ b/be/src/exec/base-sequence-scanner.cc @@ -46,7 +46,6 @@ static const int MIN_SYNC_READ_SIZE = 64 * 1024; // bytes Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, const vector<HdfsFileDesc*>& files) { - DCHECK(!files.empty()); // Issue just the header range for each file. When the header is complete, // we'll issue the splits for that file. Splits cannot be processed until the // header is parsed (the header object is then shared across splits for that file). http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/base-sequence-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h index 3c2326e..887ff6f 100644 --- a/be/src/exec/base-sequence-scanner.h +++ b/be/src/exec/base-sequence-scanner.h @@ -47,8 +47,7 @@ class ScannerContext; /// situation, causing the block to be incorrectly skipped. class BaseSequenceScanner : public HdfsScanner { public: - /// Issue the initial ranges for all sequence container files. 'files' must not be - /// empty. + /// Issue the initial ranges for all sequence container files. static Status IssueInitialRanges(HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT; http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-lzo-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-lzo-text-scanner.cc b/be/src/exec/hdfs-lzo-text-scanner.cc index 8af89f2..88ae295 100644 --- a/be/src/exec/hdfs-lzo-text-scanner.cc +++ b/be/src/exec/hdfs-lzo-text-scanner.cc @@ -62,7 +62,6 @@ HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner( Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, const vector<HdfsFileDesc*>& files) { - DCHECK(!files.empty()); if (LzoIssueInitialRanges == NULL) { lock_guard<SpinLock> l(lzo_load_lock_); if (library_load_status_.ok()) { http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-parquet-scanner-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc deleted file mode 100644 index cbc6e76..0000000 --- a/be/src/exec/hdfs-parquet-scanner-test.cc +++ /dev/null @@ -1,96 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/hdfs-parquet-scanner.h" -#include "testutil/gtest-util.h" - -#include "common/names.h" - -static const int64_t MIN_BUFFER_SIZE = 64 * 1024; -static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024; - -namespace impala { - -class HdfsParquetScannerTest : public testing::Test { - protected: - void TestDivideReservation(const vector<int64_t>& col_range_lengths, - int64_t total_col_reservation, const vector<int64_t>& expected_reservations); -}; - -/// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for -/// inputs 'col_range_lengths' and 'total_col_reservation'. -void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths, - int64_t total_col_reservation, const vector<int64_t>& expected_reservations) { - vector<pair<int, int64_t>> reservations = - HdfsParquetScanner::DivideReservationBetweenColumnsHelper( - MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, col_range_lengths, total_col_reservation); - for (int i = 0; i < reservations.size(); ++i) { - LOG(INFO) << i << " " << reservations[i].first << " " << reservations[i].second; - } - EXPECT_EQ(reservations.size(), expected_reservations.size()); - vector<bool> present(expected_reservations.size(), false); - for (auto& reservation: reservations) { - // Ensure that each appears exactly once. - EXPECT_FALSE(present[reservation.first]); - present[reservation.first] = true; - EXPECT_EQ(expected_reservations[reservation.first], reservation.second) - << reservation.first; - } -} - -TEST_F(HdfsParquetScannerTest, DivideReservation) { - // Test a long scan ranges with lots of memory - should allocate 3 max-size - // buffers per range. - TestDivideReservation({100 * 1024 * 1024}, 50 * 1024 * 1024, {3 * MAX_BUFFER_SIZE}); - TestDivideReservation({100 * 1024 * 1024, 50 * 1024 * 1024}, 100 * 1024 * 1024, - {3 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE}); - - // Long scan ranges, not enough memory for 3 buffers each. Should only allocate - // max-sized buffers, preferring the longer scan range. - TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * MAX_BUFFER_SIZE, - {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE}); - TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, - 5 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE, - {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE}); - TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 6 * MAX_BUFFER_SIZE - 1, - {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE}); - - // Test a short range with lots of memory - should round up buffer size. - TestDivideReservation({100 * 1024}, 50 * 1024 * 1024, {128 * 1024}); - - // Test a range << MIN_BUFFER_SIZE - should round up to buffer size. - TestDivideReservation({13}, 50 * 1024 * 1024, {MIN_BUFFER_SIZE}); - - // Test long ranges with limited memory. - TestDivideReservation({100 * 1024 * 1024}, 100 * 1024, {MIN_BUFFER_SIZE}); - TestDivideReservation({100 * 1024 * 1024}, MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE}); - TestDivideReservation({100 * 1024 * 1024}, 2 * MIN_BUFFER_SIZE, {2 * MIN_BUFFER_SIZE}); - TestDivideReservation({100 * 1024 * 1024}, MAX_BUFFER_SIZE - 1, {MAX_BUFFER_SIZE / 2}); - TestDivideReservation({100 * 1024 * 1024, 1024 * 1024, MIN_BUFFER_SIZE}, - 3 * MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE}); - - // Test a mix of scan range lengths larger than and smaller than the max I/O buffer - // size. Long ranges get allocated most memory. - TestDivideReservation( - {15145047, 5019635, 5019263, 15145047, 15145047, 5019635, 5019263, 317304}, - 25165824, - {8388608, 2097152, 524288, 8388608, 4194304, 1048576, 262144, 262144}); -} - -} - -IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 574ddb0..e279369 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -17,7 +17,6 @@ #include "exec/hdfs-parquet-scanner.h" -#include <algorithm> #include <queue> #include <gutil/strings/substitute.h> @@ -28,7 +27,6 @@ #include "exec/parquet-column-stats.h" #include "exec/scanner-context.inline.h" #include "runtime/collection-value-builder.h" -#include "runtime/exec-env.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/runtime-state.h" #include "runtime/runtime-filter.inline.h" @@ -37,7 +35,6 @@ #include "common/names.h" using std::move; -using std::sort; using namespace impala; using namespace impala::io; @@ -50,6 +47,10 @@ constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16; static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK), "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two"); +// Max dictionary page header size in bytes. This is an estimate and only needs to be an +// upper bound. +const int MAX_DICT_HEADER_SIZE = 100; + // Max entries in the dictionary before switching to PLAIN encoding. If a dictionary // has fewer entries, then the entire column is dictionary encoded. This threshold // is guaranteed to be true for Impala versions 2.9 or below. @@ -68,7 +69,6 @@ const string PARQUET_MEM_LIMIT_EXCEEDED = Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, const vector<HdfsFileDesc*>& files) { - DCHECK(!files.empty()); vector<ScanRange*> footer_ranges; for (int i = 0; i < files.size(); ++i) { // If the file size is less than 12 bytes, it is an invalid Parquet file. @@ -98,7 +98,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, static_cast<ScanRangeMetadata*>(split->meta_data()); // Each split is processed by first issuing a scan range for the file footer, which // is done here, followed by scan ranges for the columns of each row group within - // the actual split (see InitScalarColumns()). The original split is stored in the + // the actual split (in InitColumns()). The original split is stored in the // metadata associated with the footer range. ScanRange* footer_range; if (footer_split != nullptr) { @@ -121,13 +121,9 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, } } } - // We may not have any scan ranges if this scan node does not have the footer split for - // any Parquet file. - if (footer_ranges.size() > 0) { - // The threads that process the footer will also do the scan, so we mark all the files - // as complete here. - RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size())); - } + // The threads that process the footer will also do the scan, so we mark all the files + // as complete here. + RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size())); return Status::OK(); } @@ -232,14 +228,9 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { // Release I/O buffers immediately to make sure they are cleaned up // in case we return a non-OK status anywhere below. - int64_t stream_reservation = stream_->reservation(); - stream_ = nullptr; context_->ReleaseCompletedResources(true); context_->ClearStreams(); RETURN_IF_ERROR(footer_status); - // The scanner-wide stream was used only to read the file footer. Each column has added - // its own stream. We can use the reservation from 'stream_' for the columns now. - total_col_reservation_ = stream_reservation; // Parse the file schema into an internal representation for schema resolution. schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(), @@ -256,6 +247,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()]; RETURN_IF_ERROR(InitDictFilterStructures()); + + // The scanner-wide stream was used only to read the file footer. Each column has added + // its own stream. + stream_ = nullptr; return Status::OK(); } @@ -679,13 +674,15 @@ Status HdfsParquetScanner::NextRowGroup() { } InitCollectionColumns(); - RETURN_IF_ERROR(InitScalarColumns()); - // Start scanning dictionary filtering column readers, so we can read the dictionary - // pages in EvalDictionaryFilters(). - RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_)); + // Prepare dictionary filtering columns for first read + // This must be done before dictionary filtering, because this code initializes + // the column offsets and streams needed to read the dictionaries. + // TODO: Restructure the code so that the dictionary can be read without the rest + // of the column. + RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_)); - // StartScans() may have allocated resources to scan columns. If we skip this row + // InitColumns() may have allocated resources to scan columns. If we skip this row // group below, we must call ReleaseSkippedRowGroupResources() before continuing. // If there is a dictionary-encoded column where every value is eliminated @@ -706,10 +703,10 @@ Status HdfsParquetScanner::NextRowGroup() { } // At this point, the row group has passed any filtering criteria - // Start scanning non-dictionary filtering column readers and initialize their - // dictionaries. - RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_)); - status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_); + // Prepare non-dictionary filtering column readers for first read and + // initialize their dictionaries. + RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_)); + status = InitDictionaries(non_dict_filterable_readers_); if (!status.ok()) { // Either return an error or skip this row group if it is ok to ignore errors RETURN_IF_ERROR(state_->LogOrReturnError(status.msg())); @@ -744,6 +741,7 @@ Status HdfsParquetScanner::NextRowGroup() { break; } } + DCHECK(parse_status_.ok()); return Status::OK(); } @@ -801,7 +799,6 @@ void HdfsParquetScanner::PartitionReaders( } else { BaseScalarColumnReader* scalar_reader = static_cast<BaseScalarColumnReader*>(reader); - scalar_readers_.push_back(scalar_reader); if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) { dict_filterable_readers_.push_back(scalar_reader); } else { @@ -993,7 +990,7 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr // Any columns that were not 100% dictionary encoded need to initialize // their dictionaries here. - RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list)); + RETURN_IF_ERROR(InitDictionaries(deferred_dict_init_list)); return Status::OK(); } @@ -1442,15 +1439,12 @@ Status HdfsParquetScanner::ProcessFooter() { BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size)); unique_ptr<BufferDescriptor> io_buffer; - bool needs_buffers; - RETURN_IF_ERROR(io_mgr->StartScanRange( - scan_node_->reader_context(), metadata_range, &needs_buffers)); - DCHECK(!needs_buffers) << "Already provided a buffer"; - RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer)); + RETURN_IF_ERROR( + io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer)); DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer()); DCHECK_EQ(io_buffer->len(), metadata_size); DCHECK(io_buffer->eosr()); - metadata_range->ReturnBuffer(move(io_buffer)); + io_mgr->ReturnBuffer(move(io_buffer)); } // Deserialize file header @@ -1650,16 +1644,23 @@ void HdfsParquetScanner::InitCollectionColumns() { } } -Status HdfsParquetScanner::InitScalarColumns() { +Status HdfsParquetScanner::InitScalarColumns( + int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) { int64_t partition_id = context_->partition_descriptor()->id(); const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename()); DCHECK(file_desc != nullptr); - parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; + parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx]; + // All the scan ranges (one for each column). + vector<ScanRange*> col_ranges; // Used to validate that the number of values in each reader in column_readers_ at the // same SchemaElement is the same. unordered_map<const parquet::SchemaElement*, int> num_values_map; - for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { + // Used to validate we issued the right number of scan ranges + int num_scalar_readers = 0; + + for (BaseScalarColumnReader* scalar_reader: column_readers) { + ++num_scalar_readers; const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()]; auto num_values_it = num_values_map.find(&scalar_reader->schema_element()); int num_values = -1; @@ -1668,115 +1669,78 @@ Status HdfsParquetScanner::InitScalarColumns() { } else { num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values; } + int64_t col_start = col_chunk.meta_data.data_page_offset; + if (num_values != -1 && col_chunk.meta_data.num_values != num_values) { // TODO: improve this error message by saying which columns are different, // and also specify column in other error messages as appropriate return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(), col_chunk.meta_data.num_values, num_values, filename()); } - RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_)); - } - RETURN_IF_ERROR( - DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_)); - return Status::OK(); -} -Status HdfsParquetScanner::DivideReservationBetweenColumns( - const vector<BaseScalarColumnReader*>& column_readers, - int64_t reservation_to_distribute) { - DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); - const int64_t min_buffer_size = io_mgr->min_buffer_size(); - const int64_t max_buffer_size = io_mgr->max_buffer_size(); - // The HdfsScanNode reservation calculation in the planner ensures that we have - // reservation for at least one buffer per column. - if (reservation_to_distribute < min_buffer_size * column_readers.size()) { - return Status(TErrorCode::INTERNAL_ERROR, - Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at " - "least $1 bytes per column for $2 columns but had $3 bytes", - filename(), min_buffer_size, column_readers.size(), - reservation_to_distribute)); - } + RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_, + filename(), row_group_idx, scalar_reader->col_idx(), + scalar_reader->schema_element(), state_)); - vector<int64_t> col_range_lengths(column_readers.size()); - for (int i = 0; i < column_readers.size(); ++i) { - col_range_lengths[i] = column_readers[i]->scan_range()->len(); - } - vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper( - min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute); - for (auto& tmp_reservation : tmp_reservations) { - column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second); - } - return Status::OK(); -} + if (col_chunk.meta_data.__isset.dictionary_page_offset) { + // Already validated in ValidateColumnOffsets() + DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start); + col_start = col_chunk.meta_data.dictionary_page_offset; + } + int64_t col_len = col_chunk.meta_data.total_compressed_size; + if (col_len <= 0) { + return Status(Substitute("File '$0' contains invalid column chunk size: $1", + filename(), col_len)); + } + int64_t col_end = col_start + col_len; + + // Already validated in ValidateColumnOffsets() + DCHECK_GT(col_end, 0); + DCHECK_LT(col_end, file_desc->file_length); + if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) { + // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the + // dictionary page header size in total_compressed_size and total_uncompressed_size + // (see IMPALA-694). We pad col_len to compensate. + int64_t bytes_remaining = file_desc->file_length - col_end; + int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining); + col_len += pad; + } -vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper( - int64_t min_buffer_size, int64_t max_buffer_size, - const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) { - // Pair of (column index, reservation allocated). - vector<pair<int, int64_t>> tmp_reservations; - for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0); - - // Sort in descending order of length, breaking ties by index so that larger columns - // get allocated reservation first. It is common to have dramatically different column - // sizes in a single file because of different value sizes and compressibility. E.g. - // consider a large STRING "comment" field versus a highly compressible - // dictionary-encoded column with only a few distinct values. We want to give max-sized - // buffers to large columns first to maximize the size of I/Os that we do while reading - // this row group. - sort(tmp_reservations.begin(), tmp_reservations.end(), - [&col_range_lengths](const pair<int, int64_t>& left, const pair<int, int64_t>& right) { - int64_t left_len = col_range_lengths[left.first]; - int64_t right_len = col_range_lengths[right.first]; - return left_len != right_len ? left_len > right_len : left.first < right.first; - }); - - // Set aside the minimum reservation per column. - reservation_to_distribute -= min_buffer_size * col_range_lengths.size(); - - // Allocate reservations to columns by repeatedly allocating either a max-sized buffer - // or a large enough buffer to fit the remaining data for each column. Do this - // round-robin up to the ideal number of I/O buffers. - for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) { - for (auto& tmp_reservation : tmp_reservations) { - // Add back the reservation we set aside above. - if (i == 0) reservation_to_distribute += min_buffer_size; - - int64_t bytes_left_in_range = - col_range_lengths[tmp_reservation.first] - tmp_reservation.second; - int64_t bytes_to_add; - if (bytes_left_in_range >= max_buffer_size) { - if (reservation_to_distribute >= max_buffer_size) { - bytes_to_add = max_buffer_size; - } else if (i == 0) { - DCHECK_EQ(0, tmp_reservation.second); - // Ensure this range gets at least one buffer on the first iteration. - bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute); - } else { - DCHECK_GT(tmp_reservation.second, 0); - // We need to read more than the max buffer size, but can't allocate a - // max-sized buffer. Stop adding buffers to this column: we prefer to use - // the existing max-sized buffers without small buffers mixed in so that - // we will alway do max-sized I/Os, which make efficient use of I/O devices. - bytes_to_add = 0; - } - } else if (bytes_left_in_range > 0 && - reservation_to_distribute >= min_buffer_size) { - // Choose a buffer size that will fit the rest of the bytes left in the range. - bytes_to_add = max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range)); - // But don't add more reservation than is available. - bytes_to_add = - min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute)); - } else { - bytes_to_add = 0; - } - DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add; - reservation_to_distribute -= bytes_to_add; - tmp_reservation.second += bytes_to_add; - DCHECK_GE(reservation_to_distribute, 0); - DCHECK_GT(tmp_reservation.second, 0); + // TODO: this will need to change when we have co-located files and the columns + // are different files. + if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) { + return Status(Substitute("Expected parquet column file path '$0' to match " + "filename '$1'", col_chunk.file_path, filename())); } + + const ScanRange* split_range = + static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split; + + // Determine if the column is completely contained within a local split. + bool col_range_local = split_range->expected_local() + && col_start >= split_range->offset() + && col_end <= split_range->offset() + split_range->len(); + ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(), + filename(), col_len, col_start, partition_id, split_range->disk_id(), + col_range_local, + BufferOpts(split_range->try_cache(), file_desc->mtime)); + col_ranges.push_back(col_range); + + // Get the stream that will be used for this column + ScannerContext::Stream* stream = context_->AddStream(col_range); + DCHECK(stream != nullptr); + + RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream)); } - return tmp_reservations; + DCHECK_EQ(col_ranges.size(), num_scalar_readers); + + // Issue all the column chunks to the io mgr and have them scheduled immediately. + // This means these ranges aren't returned via DiskIoMgr::GetNextRange and + // instead are scheduled to be read immediately. + RETURN_IF_ERROR(scan_node_->runtime_state()->io_mgr()->AddScanRanges( + scan_node_->reader_context(), col_ranges, true)); + + return Status::OK(); } Status HdfsParquetScanner::InitDictionaries( http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index 92f2550..f0043b5 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -34,7 +34,7 @@ class CollectionValueBuilder; struct HdfsFileDesc; /// Internal schema representation and resolution. -struct SchemaNode; +class SchemaNode; /// Class that implements Parquet definition and repetition level decoding. class ParquetLevelDecoder; @@ -69,8 +69,8 @@ class BoolColumnReader; /// the split size, the mid point guarantees that we have at least 50% of the row group in /// the current split. ProcessSplit() then computes the column ranges for these row groups /// and submits them to the IoMgr for immediate scheduling (so they don't surface in -/// DiskIoMgr::GetNextUnstartedRange()). Scheduling them immediately also guarantees they -/// are all read at once. +/// DiskIoMgr::GetNextRange()). Scheduling them immediately also guarantees they are all +/// read at once. /// /// Like the other scanners, each parquet scanner object is one to one with a /// ScannerContext. Unlike the other scanners though, the context will have multiple @@ -328,7 +328,7 @@ class HdfsParquetScanner : public HdfsScanner { virtual ~HdfsParquetScanner() {} /// Issue just the footer range for each file. We'll then parse the footer and pick - /// out the columns we want. 'files' must not be empty. + /// out the columns we want. static Status IssueInitialRanges(HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT; @@ -361,7 +361,6 @@ class HdfsParquetScanner : public HdfsScanner { template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> friend class ScalarColumnReader; friend class BoolColumnReader; - friend class HdfsParquetScannerTest; /// Size of the file footer. This is a guess. If this value is too little, we will /// need to issue another read. @@ -430,7 +429,7 @@ class HdfsParquetScanner : public HdfsScanner { /// Number of scratch batches processed so far. int64_t row_batches_produced_; - /// Column reader for each top-level materialized slot in the output tuple. + /// Column reader for each materialized columns for this file. std::vector<ParquetColumnReader*> column_readers_; /// Column readers will write slot values into this scratch batch for @@ -446,9 +445,6 @@ class HdfsParquetScanner : public HdfsScanner { /// Scan range for the metadata. const io::ScanRange* metadata_range_; - /// Reservation available for scanning columns, in bytes. - int64_t total_col_reservation_ = 0; - /// Pool to copy dictionary page buffer into. This pool is shared across all the /// pages in a column chunk. boost::scoped_ptr<MemPool> dictionary_pool_; @@ -465,9 +461,6 @@ class HdfsParquetScanner : public HdfsScanner { /// or nested within a collection. std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_; - /// Flattened list of all scalar column readers in column_readers_. - std::vector<BaseScalarColumnReader*> scalar_readers_; - /// Flattened collection column readers that point to readers in column_readers_. std::vector<CollectionColumnReader*> collection_readers_; @@ -634,24 +627,12 @@ class HdfsParquetScanner : public HdfsScanner { WARN_UNUSED_RESULT; /// Walks file_metadata_ and initiates reading the materialized columns. This - /// initializes 'scalar_readers_' and divides reservation between the columns but - /// does not start any scan ranges. - Status InitScalarColumns() WARN_UNUSED_RESULT; - - /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the - /// columns. Sets the reservation on each corresponding reader in 'column_readers'. - Status DivideReservationBetweenColumns( - const std::vector<BaseScalarColumnReader*>& column_readers, - int64_t reservation_to_distribute); - - /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for - /// dividing a reservation of 'reservation_to_distribute' bytes between columns with - /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns - /// a vector with an entry per column with the index into 'col_range_lengths' and the - /// amount of reservation in bytes to give to that column. - static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper( - int64_t min_buffer_size, int64_t max_buffer_size, - const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute); + /// initializes 'column_readers' and issues the reads for the columns. 'column_readers' + /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of + /// top-level scalar columns and readers of scalar columns within a collection node). + Status InitScalarColumns( + int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers) + WARN_UNUSED_RESULT; /// Initializes the column readers in collection_readers_. void InitCollectionColumns(); @@ -687,8 +668,7 @@ class HdfsParquetScanner : public HdfsScanner { /// Partitions the readers into scalar and collection readers. The collection readers /// are flattened into collection_readers_. The scalar readers are partitioned into /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether - /// dictionary filtering is enabled and the reader can be dictionary filtered. All - /// scalar readers are also flattened into scalar_readers_. + /// dictionary filtering is enabled and the reader can be dictionary filtered. void PartitionReaders(const vector<ParquetColumnReader*>& readers, bool can_eval_dict_filters); http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 0dfd6f0..80cb6c5 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -35,7 +35,6 @@ #include "exprs/scalar-expr-evaluator.h" #include "exprs/scalar-expr.h" #include "runtime/descriptors.h" -#include "runtime/exec-env.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/io/request-context.h" @@ -68,7 +67,6 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024; HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ScanNode(pool, tnode, descs), - ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation), min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ? tnode.hdfs_scan_node.min_max_tuple_id : -1), skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ? @@ -85,7 +83,6 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr), disks_accessed_bitmap_(TUnit::UNIT, 0), active_hdfs_read_thread_counter_(TUnit::UNIT, 0) { - DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation); } HdfsScanNodeBase::~HdfsScanNodeBase() { @@ -115,6 +112,7 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts, *min_max_row_desc, state, &min_max_conjuncts_)); } + return Status::OK(); } @@ -248,16 +246,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size()); ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id); - // Check if reservation was enough to allocate at least one buffer. The - // reservation calculation in HdfsScanNode.java should guarantee this. - // Hitting this error indicates a misconfiguration or bug. - int64_t min_buffer_size = ExecEnv::GetInstance()->disk_io_mgr()->min_buffer_size(); - if (scan_range_params_->size() > 0 - && resource_profile_.min_reservation < min_buffer_size) { - return Status(TErrorCode::INTERNAL_ERROR, - Substitute("HDFS scan min reservation $0 must be >= min buffer size $1", - resource_profile_.min_reservation, min_buffer_size)); - } // Add per volume stats to the runtime profile PerVolumeStats per_volume_stats; stringstream str; @@ -344,18 +332,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state); } - RETURN_IF_ERROR(ClaimBufferReservation(state)); - // We got the minimum reservation. Now try to get ideal reservation. - if (resource_profile_.min_reservation != ideal_scan_range_reservation_) { - bool increased = buffer_pool_client_.IncreaseReservation( - ideal_scan_range_reservation_ - resource_profile_.min_reservation); - VLOG_FILE << "Increasing reservation from minimum " - << resource_profile_.min_reservation << "B to ideal " - << ideal_scan_range_reservation_ << "B " - << (increased ? "succeeded" : "failed"); - } - - reader_context_ = runtime_state_->io_mgr()->RegisterContext(); + reader_context_ = runtime_state_->io_mgr()->RegisterContext(mem_tracker()); // Initialize HdfsScanNode specific counters // TODO: Revisit counters and move the counters specific to multi-threaded scans @@ -376,11 +353,14 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { num_scanner_threads_started_counter_ = ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT); - reader_context_->set_bytes_read_counter(bytes_read_counter()); - reader_context_->set_read_timer(read_timer()); - reader_context_->set_open_file_timer(open_file_timer()); - reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_); - reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_); + runtime_state_->io_mgr()->set_bytes_read_counter( + reader_context_.get(), bytes_read_counter()); + runtime_state_->io_mgr()->set_read_timer(reader_context_.get(), read_timer()); + runtime_state_->io_mgr()->set_open_file_timer(reader_context_.get(), open_file_timer()); + runtime_state_->io_mgr()->set_active_read_thread_counter( + reader_context_.get(), &active_hdfs_read_thread_counter_); + runtime_state_->io_mgr()->set_disks_access_bitmap( + reader_context_.get(), &disks_accessed_bitmap_); average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter( AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_); @@ -474,27 +454,18 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) { } } - // Issue initial ranges for all file types. Only call functions for file types that - // actually exist - trying to add empty lists of ranges can result in spurious - // CANCELLED errors - see IMPALA-6564. - for (const auto& entry : matching_per_type_files) { - if (entry.second.empty()) continue; - switch (entry.first) { - case THdfsFileFormat::PARQUET: - RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, entry.second)); - break; - case THdfsFileFormat::TEXT: - RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, entry.second)); - break; - case THdfsFileFormat::SEQUENCE_FILE: - case THdfsFileFormat::RC_FILE: - case THdfsFileFormat::AVRO: - RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, entry.second)); - break; - default: - DCHECK(false) << "Unexpected file type " << entry.first; - } - } + // Issue initial ranges for all file types. + RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, + matching_per_type_files[THdfsFileFormat::PARQUET])); + RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, + matching_per_type_files[THdfsFileFormat::TEXT])); + RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, + matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE])); + RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, + matching_per_type_files[THdfsFileFormat::RC_FILE])); + RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, + matching_per_type_files[THdfsFileFormat::AVRO])); + return Status::OK(); } @@ -553,8 +524,6 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, Status HdfsScanNodeBase::AddDiskIoRanges( const vector<ScanRange*>& ranges, int num_files_queued) { - DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished."; - DCHECK_GT(ranges.size(), 0); RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges)); num_unqueued_files_.Add(-num_files_queued); DCHECK_GE(num_unqueued_files_.Load(), 0); @@ -857,14 +826,20 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() { Substitute("Codegen enabled: $0 out of $1", num_enabled, total)); if (reader_context_ != nullptr) { - bytes_read_local_->Set(reader_context_->bytes_read_local()); - bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit()); - bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache()); - num_remote_ranges_->Set(reader_context_->num_remote_ranges()); - unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes()); - cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count()); + bytes_read_local_->Set( + runtime_state_->io_mgr()->bytes_read_local(reader_context_.get())); + bytes_read_short_circuit_->Set( + runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_.get())); + bytes_read_dn_cache_->Set( + runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_.get())); + num_remote_ranges_->Set(static_cast<int64_t>( + runtime_state_->io_mgr()->num_remote_ranges(reader_context_.get()))); + unexpected_remote_bytes_->Set( + runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_.get())); + cached_file_handles_hit_count_->Set( + runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_.get())); cached_file_handles_miss_count_->Set( - reader_context_->cached_file_handles_miss_count()); + runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_.get())); if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) { runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute( http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 3a9c37f..70fbac2 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -322,9 +322,6 @@ class HdfsScanNodeBase : public ScanNode { friend class ScannerContext; friend class HdfsScanner; - /// Ideal reservation to process each input split, computed by the planner. - const int64_t ideal_scan_range_reservation_; - /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics. const int min_max_tuple_id_; http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-scan-node-mt.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc index f4948d9..7ea4d80 100644 --- a/be/src/exec/hdfs-scan-node-mt.cc +++ b/be/src/exec/hdfs-scan-node-mt.cc @@ -26,7 +26,6 @@ #include "gen-cpp/PlanNodes_types.h" -using namespace impala::io; using std::stringstream; namespace impala { @@ -77,27 +76,20 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e scanner_->Close(row_batch); scanner_.reset(); } - DiskIoMgr* io_mgr = runtime_state_->io_mgr(); - bool needs_buffers; - RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange( - reader_context_.get(), &scan_range_, &needs_buffers)); - if (scan_range_ == nullptr) { + RETURN_IF_ERROR( + runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range_)); + if (scan_range_ == NULL) { *eos = true; StopAndFinalizeCounters(); return Status::OK(); } - int64_t scanner_reservation = buffer_pool_client_.GetReservation(); - if (needs_buffers) { - RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(), - &buffer_pool_client_, scan_range_, scanner_reservation)); - } ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range_->meta_data()); int64_t partition_id = metadata->partition_id; HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id); - scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_, - partition, filter_ctxs(), expr_results_pool())); - scanner_ctx_->AddStream(scan_range_, scanner_reservation); + scanner_ctx_.reset(new ScannerContext( + runtime_state_, this, partition, scan_range_, filter_ctxs(), + expr_results_pool())); Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_); if (!status.ok()) { DCHECK(scanner_ == NULL); http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index a95e47a..710a8af 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -26,7 +26,6 @@ #include "exec/scanner-context.h" #include "runtime/descriptors.h" #include "runtime/fragment-instance-state.h" -#include "runtime/io/request-context.h" #include "runtime/runtime-filter.inline.h" #include "runtime/runtime-state.h" #include "runtime/mem-tracker.h" @@ -46,6 +45,16 @@ DECLARE_bool(skip_file_runtime_filtering); using namespace impala; using namespace impala::io; +// Amount of memory that we approximate a scanner thread will use not including IoBuffers. +// The memory used does not vary considerably between file formats (just a couple of MBs). +// This value is conservative and taken from running against the tpch lineitem table. +// TODO: revisit how we do this. +const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024; + +// Estimated upper bound on the compression ratio of compressed text files. Used to +// estimate scanner thread memory usage. +const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11; + // Amount of time to block waiting for GetNext() to release scanner threads between // checking if a scanner thread should yield itself back to the global thread pool. const int SCANNER_THREAD_WAIT_TIME_MS = 20; @@ -53,6 +62,11 @@ const int SCANNER_THREAD_WAIT_TIME_MS = 20; HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : HdfsScanNodeBase(pool, tnode, descs), + ranges_issued_barrier_(1), + scanner_thread_bytes_required_(0), + done_(false), + all_ranges_started_(false), + thread_avail_cb_id_(-1), max_num_scanner_threads_(CpuInfo::num_cores()) { } @@ -153,6 +167,36 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) { Status HdfsScanNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state)); + + // Compute the minimum bytes required to start a new thread. This is based on the + // file format. + // The higher the estimate, the less likely it is the query will fail but more likely + // the query will be throttled when it does not need to be. + // TODO: how many buffers should we estimate per range. The IoMgr will throttle down to + // one but if there are already buffers queued before memory pressure was hit, we can't + // reclaim that memory. + if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) { + // Parquet files require buffers per column + scanner_thread_bytes_required_ = + materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_read_buffer_size(); + } else { + scanner_thread_bytes_required_ = + 3 * runtime_state_->io_mgr()->max_read_buffer_size(); + } + // scanner_thread_bytes_required_ now contains the IoBuffer requirement. + // Next we add in the other memory the scanner thread will use. + // e.g. decompression buffers, tuple buffers, etc. + // For compressed text, we estimate this based on the file size (since the whole file + // will need to be decompressed at once). For all other formats, we use a constant. + // TODO: can we do something better? + int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE; + for (HdfsFileDesc* file: per_type_files_[THdfsFileFormat::TEXT]) { + if (file->file_compression != THdfsCompression::NONE) { + int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO; + scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage); + } + } + scanner_thread_bytes_required_ += scanner_thread_mem_usage; row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime"); row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime"); return Status::OK(); @@ -174,9 +218,10 @@ Status HdfsScanNode::Open(RuntimeState* state) { max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads; } DCHECK_GT(max_num_scanner_threads_, 0); - spare_reservation_.Store(buffer_pool_client_.GetReservation()); + thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb( bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1)); + return Status::OK(); } @@ -217,28 +262,37 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges, return Status::OK(); } -bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) { - DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); - if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true; - int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load(); - if (!buffer_pool_client_.IncreaseReservation(increase)) return false; - spare_reservation_.Add(increase); - return true; -} - -int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock, - bool first_thread) { - DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); - int64_t amount; - if (first_thread) { - amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ? - ideal_scan_range_reservation_ : resource_profile_.min_reservation; - } else { - amount = ideal_scan_range_reservation_; +// For controlling the amount of memory used for scanners, we approximate the +// scanner mem usage based on scanner_thread_bytes_required_, rather than the +// consumption in the scan node's mem tracker. The problem with the scan node +// trackers is that it does not account for the memory the scanner will use. +// For example, if there is 110 MB of memory left (based on the mem tracker) +// and we estimate that a scanner will use 100MB, we want to make sure to only +// start up one additional thread. However, after starting the first thread, the +// mem tracker value will not change immediately (it takes some time before the +// scanner is running and starts using memory). Therefore we just use the estimate +// based on the number of running scanner threads. +bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) { + int64_t committed_scanner_mem = + active_scanner_thread_counter_.value() * scanner_thread_bytes_required_; + int64_t tracker_consumption = mem_tracker()->consumption(); + int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption; + if (est_additional_scanner_mem < 0) { + // This is the case where our estimate was too low. Expand the estimate based + // on the usage. + int64_t avg_consumption = + tracker_consumption / active_scanner_thread_counter_.value(); + // Take the average and expand it by 50%. Some scanners will not have hit their + // steady state mem usage yet. + // TODO: how can we scale down if we've overestimated. + // TODO: better heuristic? + scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5); + est_additional_scanner_mem = 0; } - int64_t remainder = spare_reservation_.Add(-amount); - DCHECK_GE(remainder, 0); - return amount; + + // If we are starting a new thread, take that into account now. + if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_; + return est_additional_scanner_mem < mem_tracker()->SpareCapacity(); } void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) { @@ -268,45 +322,36 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) // TODO: This still leans heavily on starvation-free locks, come up with a more // correct way to communicate between this method and ScannerThreadHelper unique_lock<mutex> lock(lock_); - - const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value(); - const bool first_thread = num_active_scanner_threads == 0; // Cases 1, 2, 3. if (done_ || all_ranges_started_ || - num_active_scanner_threads >= progress_.remaining()) { + active_scanner_thread_counter_.value() >= progress_.remaining()) { break; } // Cases 5 and 6. - if (!first_thread && + if (active_scanner_thread_counter_.value() > 0 && (materialized_row_batches_->Size() >= max_materialized_row_batches_ || - !EnoughReservationForExtraThread(lock))) { + !EnoughMemoryForScannerThread(true))) { break; } // Case 7 and 8. - if (num_active_scanner_threads >= max_num_scanner_threads_ || - !pool->TryAcquireThreadToken()) { + bool is_reserved = false; + if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ || + !pool->TryAcquireThreadToken(&is_reserved)) { break; } - // Deduct the reservation. We haven't dropped the lock since the - // first_thread/EnoughReservationForExtraThread() checks so spare reservation - // must be available. - int64_t scanner_thread_reservation = - DeductReservationForScannerThread(lock, first_thread); COUNTER_ADD(&active_scanner_thread_counter_, 1); string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)", PrintId(runtime_state_->fragment_instance_id()), id(), num_scanner_threads_started_counter_->value()); - auto fn = [this, scanner_thread_reservation]() { - this->ScannerThread(scanner_thread_reservation); - }; + + auto fn = [this]() { this->ScannerThread(); }; std::unique_ptr<Thread> t; status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true); if (!status.ok()) { - ReturnReservationFromScannerThread(scanner_thread_reservation); COUNTER_ADD(&active_scanner_thread_counter_, -1); // Release the token and skip running callbacks to find a replacement. Skipping // serves two purposes. First, it prevents a mutual recursion between this function @@ -327,10 +372,9 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) } } -void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) { +void HdfsScanNode::ScannerThread() { SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters()); SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics()); - DiskIoMgr* io_mgr = runtime_state_->io_mgr(); // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the // scanner for finer-grained filtering. Use a thread-local MemPool for the filter @@ -356,7 +400,8 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) { // this thread. unique_lock<mutex> l(lock_); if (active_scanner_thread_counter_.value() > 1) { - if (runtime_state_->resource_pool()->optional_exceeded()) { + if (runtime_state_->resource_pool()->optional_exceeded() || + !EnoughMemoryForScannerThread(false)) { // We can't break here. We need to update the counter with the lock held or else // all threads might see active_scanner_thread_counter_.value > 1 COUNTER_ADD(&active_scanner_thread_counter_, -1); @@ -376,29 +421,21 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) { // to return if there's an error. ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused); - // Take a snapshot of num_unqueued_files_ before calling GetNextUnstartedRange(). + ScanRange* scan_range; + // Take a snapshot of num_unqueued_files_ before calling GetNextRange(). // We don't want num_unqueued_files_ to go to zero between the return from - // GetNextUnstartedRange() and the check for when all ranges are complete. + // GetNextRange() and the check for when all ranges are complete. int num_unqueued_files = num_unqueued_files_.Load(); // TODO: the Load() acts as an acquire barrier. Is this needed? (i.e. any earlier // stores that need to complete?) AtomicUtil::MemoryBarrier(); - ScanRange* scan_range; - bool needs_buffers; Status status = - io_mgr->GetNextUnstartedRange(reader_context_.get(), &scan_range, &needs_buffers); + runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range); - if (status.ok() && scan_range != nullptr) { - if (needs_buffers) { - status = io_mgr->AllocateBuffersForRange( - reader_context_.get(), &buffer_pool_client_, scan_range, - scanner_thread_reservation); - } - if (status.ok()) { - // Got a scan range. Process the range end to end (in this thread). - status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(), - &expr_results_pool, scan_range, scanner_thread_reservation); - } + if (status.ok() && scan_range != NULL) { + // Got a scan range. Process the range end to end (in this thread). + status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(), + &expr_results_pool, scan_range); } if (!status.ok()) { @@ -428,8 +465,8 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) { // TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not // needed to acquire the lock in x86. unique_lock<mutex> l(lock_); - // All ranges have been queued and GetNextUnstartedRange() returned NULL. This means - // that every range is either done or being processed by another thread. + // All ranges have been queued and GetNextRange() returned NULL. This means that + // every range is either done or being processed by another thread. all_ranges_started_ = true; break; } @@ -437,7 +474,6 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) { COUNTER_ADD(&active_scanner_thread_counter_, -1); exit: - ReturnReservationFromScannerThread(scanner_thread_reservation); runtime_state_->resource_pool()->ReleaseThreadToken(false); for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_); filter_mem_pool.FreeAll(); @@ -445,8 +481,7 @@ exit: } Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, - MemPool* expr_results_pool, ScanRange* scan_range, - int64_t scanner_thread_reservation) { + MemPool* expr_results_pool, ScanRange* scan_range) { DCHECK(scan_range != NULL); ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data()); @@ -471,9 +506,8 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, return Status::OK(); } - ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition, - filter_ctxs, expr_results_pool); - context.AddStream(scan_range, scanner_thread_reservation); + ScannerContext context( + runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool); scoped_ptr<HdfsScanner> scanner; Status status = CreateAndOpenScanner(partition, &context, &scanner); if (!status.ok()) { @@ -515,7 +549,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, void HdfsScanNode::SetDoneInternal() { if (done_) return; done_ = true; - if (reader_context_ != nullptr) reader_context_->Cancel(); + if (reader_context_ != nullptr) { + runtime_state_->io_mgr()->CancelContext(reader_context_.get()); + } materialized_row_batches_->Shutdown(); } http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index 81e826e..a1c97cf 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -27,7 +27,6 @@ #include <boost/scoped_ptr.hpp> #include <boost/thread/mutex.hpp> -#include "common/atomic.h" #include "exec/filter-context.h" #include "exec/hdfs-scan-node-base.h" #include "runtime/io/disk-io-mgr.h" @@ -59,14 +58,8 @@ class TPlanNode; /// 5. The scanner finishes the scan range and informs the scan node so it can track /// end of stream. /// -/// Buffer management: -/// ------------------ -/// The different scanner threads all allocate I/O buffers from the node's Buffer Pool -/// client. The scan node ensures that enough reservation is available to start a -/// scanner thread before launching each one with (see -/// EnoughReservationForExtraThread()), after which the scanner thread is responsible -/// for staying within the reservation handed off to it. -/// +/// TODO: This class allocates a bunch of small utility objects that should be +/// recycled. /// TODO: Remove this class once the fragment-based multi-threaded execution is /// fully functional. class HdfsScanNode : public HdfsScanNodeBase { @@ -107,7 +100,12 @@ class HdfsScanNode : public HdfsScanNodeBase { private: /// Released when initial ranges are issued in the first call to GetNext(). - CountingBarrier ranges_issued_barrier_{1}; + CountingBarrier ranges_issued_barrier_; + + /// The estimated memory required to start up a new scanner thread. If the memory + /// left (due to limits) is less than this value, we won't start up optional + /// scanner threads. + int64_t scanner_thread_bytes_required_; /// Thread group for all scanner worker threads ThreadGroup scanner_threads_; @@ -132,16 +130,16 @@ class HdfsScanNode : public HdfsScanNodeBase { /// are finished, an error/cancellation occurred, or the limit was reached. /// Setting this to true triggers the scanner threads to clean up. /// This should not be explicitly set. Instead, call SetDone(). - bool done_ = false; + bool done_; /// Set to true if all ranges have started. Some of the ranges may still be in flight /// being processed by scanner threads, but no new ScannerThreads should be started. - bool all_ranges_started_ = false; + bool all_ranges_started_; /// The id of the callback added to the thread resource manager when thread token /// is available. Used to remove the callback before this scan node is destroyed. /// -1 if no callback is registered. - int thread_avail_cb_id_ = -1; + int thread_avail_cb_id_; /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads @@ -149,14 +147,6 @@ class HdfsScanNode : public HdfsScanNodeBase { /// the number of cores. int max_num_scanner_threads_; - /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner - /// threads. Doled out to scanner threads when they are started and returned when - /// those threads no longer need it. Can be atomically incremented without holding - /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for - /// reservation and the actual deduction are atomic with respect to other threads - /// trying to claim reservation. - AtomicInt64 spare_reservation_{0}; - /// The wait time for fetching a row batch from the row batch queue. RuntimeProfile::Counter* row_batches_get_timer_; @@ -170,35 +160,21 @@ class HdfsScanNode : public HdfsScanNodeBase { /// Main function for scanner thread. This thread pulls the next range to be /// processed from the IoMgr and then processes the entire range end to end. /// This thread terminates when all scan ranges are complete or an error occurred. - /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for - /// this thread with DeductReservationForScannerThread(). - void ScannerThread(int64_t scanner_thread_reservation); + void ScannerThread(); /// Process the entire scan range with a new scanner object. Executed in scanner /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows /// in this split. Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs, - MemPool* expr_results_pool, io::ScanRange* scan_range, - int64_t scanner_thread_reservation) WARN_UNUSED_RESULT; - - /// Return true if there is enough reservation to start an extra scanner thread. - /// Tries to increase reservation if enough is not already available in - /// 'spare_reservation_'. 'lock_' must be held via 'lock' - bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock); - - /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If - /// 'first_thread' is true, this is the first thread to be started and only the - /// minimum reservation is required to be available. Otherwise - /// EnoughReservationForExtra() thread must have returned true in the current - /// critical section so that 'ideal_scan_range_bytes_' is available for the extra - /// thread. Returns the amount deducted. 'lock_' must be held via 'lock'. - int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock, - bool first_thread); - - /// Called by scanner thread to return or all of its reservation that is not needed. - void ReturnReservationFromScannerThread(int64_t bytes) { - spare_reservation_.Add(bytes); - } + MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT; + + /// Returns true if there is enough memory (against the mem tracker limits) to + /// have a scanner thread. + /// If new_thread is true, the calculation is for starting a new scanner thread. + /// If false, it determines whether there's adequate memory for the existing + /// set of scanner threads. + /// lock_ must be taken before calling this. + bool EnoughMemoryForScannerThread(bool new_thread); /// Checks for eos conditions and returns batches from materialized_row_batches_. Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos) http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 2a6a912..253bcc8 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -144,10 +144,8 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, DCHECK(false); } } - if (compressed_text_scan_ranges.size() > 0) { - RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges, - compressed_text_files)); - } + RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges, + compressed_text_files)); if (lzo_text_files.size() > 0) { // This will dlopen the lzo binary and can fail if the lzo binary is not present. RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files)); http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/parquet-column-readers.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc index 153b24a..2939691 100644 --- a/be/src/exec/parquet-column-readers.cc +++ b/be/src/exec/parquet-column-readers.cc @@ -30,7 +30,6 @@ #include "exec/scanner-context.inline.h" #include "rpc/thrift-util.h" #include "runtime/collection-value-builder.h" -#include "runtime/exec-env.h" #include "runtime/tuple-row.h" #include "runtime/tuple.h" #include "runtime/runtime-state.h" @@ -50,10 +49,6 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false, static const int BITPACKED_DEPRECATION_WARNING_FREQUENCY = 100; -// Max dictionary page header size in bytes. This is an estimate and only needs to be an -// upper bound. -static const int MAX_DICT_HEADER_SIZE = 100; - // Max data page header size in bytes. This is an estimate and only needs to be an upper // bound. It is theoretically possible to have a page header of any size due to string // value statistics, but in practice we'll have trouble reading string values this large. @@ -73,8 +68,6 @@ static int debug_count = 0; #define SHOULD_TRIGGER_DEBUG_ACTION(x) (false) #endif -using namespace impala::io; - namespace impala { const string PARQUET_COL_MEM_LIMIT_EXCEEDED = @@ -854,99 +847,8 @@ static bool RequiresSkippedDictionaryHeaderCheck( return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal); } -Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc, - const parquet::ColumnChunk& col_chunk, int row_group_idx) { - num_buffered_values_ = 0; - data_ = nullptr; - data_end_ = nullptr; - stream_ = nullptr; - io_reservation_ = 0; - metadata_ = &col_chunk.meta_data; - num_values_read_ = 0; - def_level_ = -1; - // See ColumnReader constructor. - rep_level_ = max_rep_level() == 0 ? 0 : -1; - pos_current_value_ = -1; - - if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) { - RETURN_IF_ERROR(Codec::CreateDecompressor( - nullptr, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_)); - } - int64_t col_start = col_chunk.meta_data.data_page_offset; - - RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_, - parent_->filename(), row_group_idx, col_idx(), schema_element(), - parent_->state_)); - - if (col_chunk.meta_data.__isset.dictionary_page_offset) { - // Already validated in ValidateColumnOffsets() - DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start); - col_start = col_chunk.meta_data.dictionary_page_offset; - } - int64_t col_len = col_chunk.meta_data.total_compressed_size; - if (col_len <= 0) { - return Status(Substitute("File '$0' contains invalid column chunk size: $1", - filename(), col_len)); - } - int64_t col_end = col_start + col_len; - - // Already validated in ValidateColumnOffsets() - DCHECK_GT(col_end, 0); - DCHECK_LT(col_end, file_desc.file_length); - const ParquetFileVersion& file_version = parent_->file_version_; - if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) { - // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the - // dictionary page header size in total_compressed_size and total_uncompressed_size - // (see IMPALA-694). We pad col_len to compensate. - int64_t bytes_remaining = file_desc.file_length - col_end; - int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining); - col_len += pad; - } - - // TODO: this will need to change when we have co-located files and the columns - // are different files. - if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) { - return Status(Substitute("Expected parquet column file path '$0' to match " - "filename '$1'", col_chunk.file_path, filename())); - } - - const ScanRange* metadata_range = parent_->metadata_range_; - int64_t partition_id = parent_->context_->partition_descriptor()->id(); - const ScanRange* split_range = - static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split; - // Determine if the column is completely contained within a local split. - bool col_range_local = split_range->expected_local() - && col_start >= split_range->offset() - && col_end <= split_range->offset() + split_range->len(); - scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(), - filename(), col_len, col_start, partition_id, split_range->disk_id(), - col_range_local, - BufferOpts(split_range->try_cache(), file_desc.mtime)); - ClearDictionaryDecoder(); - return Status::OK(); -} - -Status BaseScalarColumnReader::StartScan() { - DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan."; - DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr(); - ScannerContext* context = parent_->context_; - DCHECK_GT(io_reservation_, 0); - bool needs_buffers; - RETURN_IF_ERROR(io_mgr->StartScanRange( - parent_->scan_node_->reader_context(), scan_range_, &needs_buffers)); - if (needs_buffers) { - RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange( - parent_->scan_node_->reader_context(), context->bp_client(), - scan_range_, io_reservation_)); - } - stream_ = parent_->context_->AddStream(scan_range_, io_reservation_); - DCHECK(stream_ != nullptr); - return Status::OK(); -} - Status BaseScalarColumnReader::ReadPageHeader(bool peek, parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) { - DCHECK(stream_ != nullptr); *eos = false; uint8_t* buffer; @@ -1030,7 +932,7 @@ Status BaseScalarColumnReader::InitDictionary() { bool eos; parquet::PageHeader next_page_header; uint32_t next_header_size; - DCHECK(stream_ != nullptr); + DCHECK(!HasDictionaryDecoder()); RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header, @@ -1142,14 +1044,6 @@ Status BaseScalarColumnReader::InitDictionary() { return Status::OK(); } -Status BaseScalarColumnReader::InitDictionaries( - const vector<BaseScalarColumnReader*> readers) { - for (BaseScalarColumnReader* reader : readers) { - RETURN_IF_ERROR(reader->InitDictionary()); - } - return Status::OK(); -} - Status BaseScalarColumnReader::ReadDataPage() { // We're about to move to the next data page. The previous data page is // now complete, free up any memory allocated for it. If the data page contained http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index bfb20ec..3a8ad70 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -326,29 +326,42 @@ class BaseScalarColumnReader : public ParquetColumnReader { BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc) : ParquetColumnReader(parent, node, slot_desc), + data_(NULL), + data_end_(NULL), + def_levels_(true), + rep_levels_(false), + page_encoding_(parquet::Encoding::PLAIN_DICTIONARY), + num_buffered_values_(0), + num_values_read_(0), + metadata_(NULL), + stream_(NULL), data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) { DCHECK_GE(node_.col_idx, 0) << node_.DebugString(); } virtual ~BaseScalarColumnReader() { } - /// Resets the reader for each row group in the file and creates the scan - /// range for the column, but does not start it. To start scanning, - /// set_io_reservation() must be called to assign reservation to this - /// column, followed by StartScan(). - Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk, - int row_group_idx); - - /// Starts the column scan range. The reader must be Reset() and have a - /// reservation assigned via set_io_reservation(). This must be called - /// before any of the column data can be read (including dictionary and - /// data pages). Returns an error status if there was an error starting the - /// scan or allocating buffers for it. - Status StartScan(); - - /// Helper to start scans for multiple columns at once. - static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) { - for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan()); + /// This is called once for each row group in the file. + Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) { + DCHECK(stream != NULL); + DCHECK(metadata != NULL); + + num_buffered_values_ = 0; + data_ = NULL; + data_end_ = NULL; + stream_ = stream; + metadata_ = metadata; + num_values_read_ = 0; + def_level_ = -1; + // See ColumnReader constructor. + rep_level_ = max_rep_level() == 0 ? 0 : -1; + pos_current_value_ = -1; + + if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) { + RETURN_IF_ERROR(Codec::CreateDecompressor( + NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_)); + } + ClearDictionaryDecoder(); return Status::OK(); } @@ -363,27 +376,22 @@ class BaseScalarColumnReader : public ParquetColumnReader { if (dict_decoder != nullptr) dict_decoder->Close(); } - io::ScanRange* scan_range() const { return scan_range_; } int64_t total_len() const { return metadata_->total_compressed_size; } int col_idx() const { return node_.col_idx; } THdfsCompression::type codec() const { if (metadata_ == NULL) return THdfsCompression::NONE; return PARQUET_TO_IMPALA_CODEC[metadata_->codec]; } - void set_io_reservation(int bytes) { io_reservation_ = bytes; } /// Reads the next definition and repetition levels for this column. Initializes the /// next data page if necessary. virtual bool NextLevels() { return NextLevels<true>(); } - /// Check the data stream to see if there is a dictionary page. If there is, - /// use that page to initialize dict_decoder_ and advance the data stream - /// past the dictionary page. + // Check the data stream to see if there is a dictionary page. If there is, + // use that page to initialize dict_decoder_ and advance the data stream + // past the dictionary page. Status InitDictionary(); - /// Convenience function to initialize multiple dictionaries. - static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers); - // Returns the dictionary or NULL if the dictionary doesn't exist virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; } @@ -409,45 +417,33 @@ class BaseScalarColumnReader : public ParquetColumnReader { // fit in as few cache lines as possible. /// Pointer to start of next value in data page - uint8_t* data_ = nullptr; + uint8_t* data_; /// End of the data page. - const uint8_t* data_end_ = nullptr; + const uint8_t* data_end_; /// Decoder for definition levels. - ParquetLevelDecoder def_levels_{true}; + ParquetLevelDecoder def_levels_; /// Decoder for repetition levels. - ParquetLevelDecoder rep_levels_{false}; + ParquetLevelDecoder rep_levels_; /// Page encoding for values of the current data page. Cached here for perf. Set in /// InitDataPage(). - parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; + parquet::Encoding::type page_encoding_; /// Num values remaining in the current data page - int num_buffered_values_ = 0; + int num_buffered_values_; // Less frequently used members that are not accessed in inner loop should go below // here so they do not occupy precious cache line space. /// The number of values seen so far. Updated per data page. - int64_t num_values_read_ = 0; - - /// Metadata for the column for the current row group. - const parquet::ColumnMetaData* metadata_ = nullptr; + int64_t num_values_read_; + const parquet::ColumnMetaData* metadata_; boost::scoped_ptr<Codec> decompressor_; - - /// The scan range for the column's data. Initialized for each row group by Reset(). - io::ScanRange* scan_range_ = nullptr; - - // Stream used to read data from 'scan_range_'. Initialized by StartScan(). - ScannerContext::Stream* stream_ = nullptr; - - /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set - /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group - /// by Reset(). - int64_t io_reservation_ = 0; + ScannerContext::Stream* stream_; /// Pool to allocate storage for data pages from - either decompression buffers for /// compressed data pages or copies of the data page with var-len data to attach to