IMPALA-4458: Fix resource cleanup of cancelled mt scan nodes. The bug was that HdfsScanNodeMt::Close() did not properly clean up all in-flight resources when called through the query cancellation path.
The main change is to clean up all resources when passing a NULL batch into HdfsParquetScanner::Close() which also needed similar changes in the scanner context. Testing: Ran test_cancellation.py, test_scanners.py and test_nested_types.py with MT_DOP=3. Added a test query with a limit that was failing before. A regular private hdfs/core test run succeeded. Change-Id: Ib32f87b3289ed9e8fc2db0885675845e11207438 Reviewed-on: http://gerrit.cloudera.org:8080/5274 Reviewed-by: Alex Behm <alex.b...@cloudera.com> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c97bffcc Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c97bffcc Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c97bffcc Branch: refs/heads/master Commit: c97bffcce1e3d053ad6152dae300bf5233507f34 Parents: 6d8fd7e Author: Alex Behm <alex.b...@cloudera.com> Authored: Tue Nov 15 18:27:20 2016 -0800 Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org> Committed: Thu Dec 1 11:04:48 2016 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-parquet-scanner.cc | 25 ++++++++++---------- be/src/exec/hdfs-scan-node-mt.cc | 1 + be/src/exec/parquet-column-readers.h | 10 ++++---- be/src/exec/scanner-context.cc | 21 ++++++++-------- be/src/exec/scanner-context.h | 17 ++++++------- .../queries/QueryTest/mt-dop-parquet.test | 14 +++++++++++ 6 files changed, 53 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 1f8ff1a..f16f9d9 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ 
b/be/src/exec/hdfs-parquet-scanner.cc @@ -229,25 +229,31 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { } void HdfsParquetScanner::Close(RowBatch* row_batch) { - if (row_batch != NULL) { + if (row_batch != nullptr) { FlushRowGroupResources(row_batch); row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false); if (scan_node_->HasRowBatchQueue()) { static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch); } } else { - if (template_tuple_pool_.get() != NULL) template_tuple_pool_->FreeAll(); - if (!FLAGS_enable_partitioned_hash_join || - !FLAGS_enable_partitioned_aggregation) { + if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll(); + dictionary_pool_.get()->FreeAll(); + context_->ReleaseCompletedResources(nullptr, true); + for (ParquetColumnReader* col_reader: column_readers_) col_reader->Close(nullptr); + if (!FLAGS_enable_partitioned_hash_join || !FLAGS_enable_partitioned_aggregation) { // With the legacy aggs/joins the tuple ptrs of the scratch batch are allocated // from the scratch batch's mem pool. We can get into this case if Open() fails. scratch_batch_->mem_pool()->FreeAll(); } } + if (level_cache_pool_ != nullptr) { + level_cache_pool_->FreeAll(); + level_cache_pool_.reset(); + } // Verify all resources (if any) have been transferred. 
- DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0); - DCHECK_EQ(dictionary_pool_.get()->total_allocated_bytes(), 0); + DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0); + DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0); DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0); DCHECK_EQ(context_->num_completed_io_buffers(), 0); @@ -273,12 +279,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) { if (compression_types.empty()) compression_types.push_back(THdfsCompression::NONE); scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types); - if (level_cache_pool_.get() != NULL) { - level_cache_pool_->FreeAll(); - level_cache_pool_.reset(); - } - - if (schema_resolver_.get() != NULL) schema_resolver_.reset(); + if (schema_resolver_.get() != nullptr) schema_resolver_.reset(); for (int i = 0; i < filter_ctxs_.size(); ++i) { const FilterStats* stats = filter_ctxs_[i]->stats; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/hdfs-scan-node-mt.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc index 84e3eef..bde9f81 100644 --- a/be/src/exec/hdfs-scan-node-mt.cc +++ b/be/src/exec/hdfs-scan-node-mt.cc @@ -118,6 +118,7 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e void HdfsScanNodeMt::Close(RuntimeState* state) { if (is_closed()) return; + if (scanner_.get() != nullptr) scanner_->Close(nullptr); scanner_.reset(); scanner_ctx_.reset(); HdfsScanNodeBase::Close(state); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 9eefc1a..85acb88 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ 
-227,8 +227,8 @@ class ParquetColumnReader { /// Returns true if this column reader has reached the end of the row group. inline bool RowGroupAtEnd() { return rep_level_ == HdfsParquetScanner::ROW_GROUP_END; } - /// Transfers the remaining resources backing tuples to the given row batch, - /// and frees up other resources. + /// If 'row_batch' is non-NULL, transfers the remaining resources backing tuples to it, + /// and frees up other resources. If 'row_batch' is NULL frees all resources instead. virtual void Close(RowBatch* row_batch) = 0; protected: @@ -343,10 +343,12 @@ class BaseScalarColumnReader : public ParquetColumnReader { } virtual void Close(RowBatch* row_batch) { - if (decompressed_data_pool_.get() != NULL) { + if (row_batch != nullptr) { row_batch->tuple_data_pool()->AcquireData(decompressed_data_pool_.get(), false); + } else { + decompressed_data_pool_->FreeAll(); } - if (decompressor_.get() != NULL) decompressor_->Close(); + if (decompressor_ != nullptr) decompressor_->Close(); } int64_t total_len() const { return metadata_->total_compressed_size; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 8e8e84c..66f112d 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -92,37 +92,36 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) { } void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) { - DCHECK((batch != NULL) || (batch == NULL && !contains_tuple_data_)); + DCHECK(batch != nullptr || done); if (done) { // Mark any pending resources as completed - if (io_buffer_ != NULL) { + if (io_buffer_ != nullptr) { ++parent_->num_completed_io_buffers_; completed_io_buffers_.push_back(io_buffer_); } - // Set variables to NULL to make sure streams are not used again - 
io_buffer_ = NULL; - io_buffer_pos_ = NULL; + // Set variables to nullptr to make sure streams are not used again + io_buffer_ = nullptr; + io_buffer_pos_ = nullptr; io_buffer_bytes_left_ = 0; // Cancel the underlying scan range to clean up any queued buffers there scan_range_->Cancel(Status::CANCELLED); } - for (list<DiskIoMgr::BufferDescriptor*>::iterator it = completed_io_buffers_.begin(); - it != completed_io_buffers_.end(); ++it) { - if (contains_tuple_data_) { - batch->AddIoBuffer(*it); + for (DiskIoMgr::BufferDescriptor* buffer: completed_io_buffers_) { + if (contains_tuple_data_ && batch != nullptr) { + batch->AddIoBuffer(buffer); // TODO: We can do row batch compaction here. This is the only place io buffers are // queued. A good heuristic is to check the number of io buffers queued and if // there are too many, we should compact. } else { - (*it)->Return(); + buffer->Return(); parent_->scan_node_->num_owned_io_buffers_.Add(-1); } } parent_->num_completed_io_buffers_ -= completed_io_buffers_.size(); completed_io_buffers_.clear(); - if (contains_tuple_data_) { + if (contains_tuple_data_ && batch != nullptr) { // If we're not done, keep using the last chunk allocated in boundary_pool_ so we // don't have to reallocate. If we are done, transfer it to the row batch. batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), /* keep_current */ !done); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index 1f1bc0d..0f4e36f 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -254,9 +254,11 @@ class ScannerContext { /// never set to NULL, even if it contains 0 bytes. Status GetNextBuffer(int64_t read_past_size = 0); - /// If 'batch' is not NULL, attaches all completed io buffers and the boundary mem - /// pool to batch. 
If 'done' is set, releases the completed resources. - /// If 'batch' is NULL then contains_tuple_data_ should be false. + /// If 'batch' is not NULL and 'contains_tuple_data_' is true, attaches all completed + /// io buffers and the boundary mem pool to 'batch'. If 'done' is set, all in-flight + /// resources are also attached or released. + /// If 'batch' is NULL then 'done' must be true. Such a call will release all + /// completed and in-flight resources. void ReleaseCompletedResources(RowBatch* batch, bool done); /// Error-reporting functions. @@ -275,14 +277,13 @@ class ScannerContext { /// from all streams to 'batch'. Attaching only completed resources ensures that buffers /// (and their cleanup) trail the rows that reference them (row batches are consumed and /// cleaned up in order by the rest of the query). - /// If a NULL 'batch' is passed, then it tries to release whatever resource can be - /// released, ie. completed io buffers if 'done' is not set, and the mem pool if 'done' - /// is set. In that case, contains_tuple_data_ should be false. - // /// If 'done' is true, this is the final call for the current streams and any pending /// resources in each stream are also passed to the row batch. Callers which want to /// clear the streams from the context should also call ClearStreams(). - // + /// + /// A NULL 'batch' may be passed to free all resources. It is only valid to pass a NULL + /// 'batch' when also passing 'done'. + /// /// This must be called with 'done' set when the scanner is complete and no longer needs /// any resources (e.g. tuple memory, io buffers) returned from the current streams. 
/// After calling with 'done' set, this should be called again if new streams are http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test index cbd14ef..0523f1d 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test +++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test @@ -24,3 +24,17 @@ limit 10 ---- TYPES bigint,bigint ==== +---- QUERY +# IMPALA-4458: Test proper resource cleanup for cancelled fragments. +# This test is duplicated from nested-types-subplan.test +select c_custkey, c_mktsegment, o_orderkey, o_orderdate +from tpch_nested_parquet.customer c, c.c_orders o +where c_custkey = 1 +limit 3 +---- RESULTS +1,regex:.*,regex:.*,regex:.* +1,regex:.*,regex:.*,regex:.* +1,regex:.*,regex:.*,regex:.* +---- TYPES +bigint,string,bigint,string +====