IMPALA-4458: Fix resource cleanup of cancelled mt scan nodes.

The bug was that HdfsScanNodeMt::Close() did not properly
clean up all in-flight resources when called through the
query cancellation path.

The main change is to clean up all resources when passing
a NULL batch into HdfsParquetScanner::Close(), which also
required similar changes in the scanner context.

Testing: Ran test_cancellation.py, test_scanners.py and
test_nested_types.py with MT_DOP=3. Added a test query
with a limit that was failing before.
A regular private hdfs/core test run succeeded.

Change-Id: Ib32f87b3289ed9e8fc2db0885675845e11207438
Reviewed-on: http://gerrit.cloudera.org:8080/5274
Reviewed-by: Alex Behm <alex.b...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c97bffcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c97bffcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c97bffcc

Branch: refs/heads/master
Commit: c97bffcce1e3d053ad6152dae300bf5233507f34
Parents: 6d8fd7e
Author: Alex Behm <alex.b...@cloudera.com>
Authored: Tue Nov 15 18:27:20 2016 -0800
Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org>
Committed: Thu Dec 1 11:04:48 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             | 25 ++++++++++----------
 be/src/exec/hdfs-scan-node-mt.cc                |  1 +
 be/src/exec/parquet-column-readers.h            | 10 ++++----
 be/src/exec/scanner-context.cc                  | 21 ++++++++--------
 be/src/exec/scanner-context.h                   | 17 ++++++-------
 .../queries/QueryTest/mt-dop-parquet.test       | 14 +++++++++++
 6 files changed, 53 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index 1f8ff1a..f16f9d9 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -229,25 +229,31 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 }
 
 void HdfsParquetScanner::Close(RowBatch* row_batch) {
-  if (row_batch != NULL) {
+  if (row_batch != nullptr) {
     FlushRowGroupResources(row_batch);
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), 
false);
     if (scan_node_->HasRowBatchQueue()) {
       
static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
     }
   } else {
-    if (template_tuple_pool_.get() != NULL) template_tuple_pool_->FreeAll();
-    if (!FLAGS_enable_partitioned_hash_join ||
-        !FLAGS_enable_partitioned_aggregation) {
+    if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll();
+    dictionary_pool_.get()->FreeAll();
+    context_->ReleaseCompletedResources(nullptr, true);
+    for (ParquetColumnReader* col_reader: column_readers_) 
col_reader->Close(nullptr);
+    if (!FLAGS_enable_partitioned_hash_join || 
!FLAGS_enable_partitioned_aggregation) {
       // With the legacy aggs/joins the tuple ptrs of the scratch batch are 
allocated
       // from the scratch batch's mem pool. We can get into this case if 
Open() fails.
       scratch_batch_->mem_pool()->FreeAll();
     }
   }
+  if (level_cache_pool_ != nullptr) {
+    level_cache_pool_->FreeAll();
+    level_cache_pool_.reset();
+  }
 
   // Verify all resources (if any) have been transferred.
-  DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
-  DCHECK_EQ(dictionary_pool_.get()->total_allocated_bytes(), 0);
+  DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
   DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
 
@@ -273,12 +279,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
   if (compression_types.empty()) 
compression_types.push_back(THdfsCompression::NONE);
   scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
 
-  if (level_cache_pool_.get() != NULL) {
-    level_cache_pool_->FreeAll();
-    level_cache_pool_.reset();
-  }
-
-  if (schema_resolver_.get() != NULL) schema_resolver_.reset();
+  if (schema_resolver_.get() != nullptr) schema_resolver_.reset();
 
   for (int i = 0; i < filter_ctxs_.size(); ++i) {
     const FilterStats* stats = filter_ctxs_[i]->stats;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 84e3eef..bde9f81 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -118,6 +118,7 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, 
RowBatch* row_batch, bool* e
 
 void HdfsScanNodeMt::Close(RuntimeState* state) {
   if (is_closed()) return;
+  if (scanner_.get() != nullptr) scanner_->Close(nullptr);
   scanner_.reset();
   scanner_ctx_.reset();
   HdfsScanNodeBase::Close(state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h 
b/be/src/exec/parquet-column-readers.h
index 9eefc1a..85acb88 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -227,8 +227,8 @@ class ParquetColumnReader {
   /// Returns true if this column reader has reached the end of the row group.
   inline bool RowGroupAtEnd() { return rep_level_ == 
HdfsParquetScanner::ROW_GROUP_END; }
 
-  /// Transfers the remaining resources backing tuples to the given row batch,
-  /// and frees up other resources.
+  /// If 'row_batch' is non-NULL, transfers the remaining resources backing 
tuples to it,
+  /// and frees up other resources. If 'row_batch' is NULL frees all resources 
instead.
   virtual void Close(RowBatch* row_batch) = 0;
 
  protected:
@@ -343,10 +343,12 @@ class BaseScalarColumnReader : public ParquetColumnReader 
{
   }
 
   virtual void Close(RowBatch* row_batch) {
-    if (decompressed_data_pool_.get() != NULL) {
+    if (row_batch != nullptr) {
       row_batch->tuple_data_pool()->AcquireData(decompressed_data_pool_.get(), 
false);
+    } else {
+      decompressed_data_pool_->FreeAll();
     }
-    if (decompressor_.get() != NULL) decompressor_->Close();
+    if (decompressor_ != nullptr) decompressor_->Close();
   }
 
   int64_t total_len() const { return metadata_->total_compressed_size; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 8e8e84c..66f112d 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -92,37 +92,36 @@ ScannerContext::Stream* 
ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
 }
 
 void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool 
done) {
-  DCHECK((batch != NULL) || (batch == NULL && !contains_tuple_data_));
+  DCHECK(batch != nullptr || done);
   if (done) {
     // Mark any pending resources as completed
-    if (io_buffer_ != NULL) {
+    if (io_buffer_ != nullptr) {
       ++parent_->num_completed_io_buffers_;
       completed_io_buffers_.push_back(io_buffer_);
     }
-    // Set variables to NULL to make sure streams are not used again
-    io_buffer_ = NULL;
-    io_buffer_pos_ = NULL;
+    // Set variables to nullptr to make sure streams are not used again
+    io_buffer_ = nullptr;
+    io_buffer_pos_ = nullptr;
     io_buffer_bytes_left_ = 0;
     // Cancel the underlying scan range to clean up any queued buffers there
     scan_range_->Cancel(Status::CANCELLED);
   }
 
-  for (list<DiskIoMgr::BufferDescriptor*>::iterator it = 
completed_io_buffers_.begin();
-       it != completed_io_buffers_.end(); ++it) {
-    if (contains_tuple_data_) {
-      batch->AddIoBuffer(*it);
+  for (DiskIoMgr::BufferDescriptor* buffer: completed_io_buffers_) {
+    if (contains_tuple_data_ && batch != nullptr) {
+      batch->AddIoBuffer(buffer);
       // TODO: We can do row batch compaction here.  This is the only place io 
buffers are
       // queued.  A good heuristic is to check the number of io buffers queued 
and if
       // there are too many, we should compact.
     } else {
-      (*it)->Return();
+      buffer->Return();
       parent_->scan_node_->num_owned_io_buffers_.Add(-1);
     }
   }
   parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
   completed_io_buffers_.clear();
 
-  if (contains_tuple_data_) {
+  if (contains_tuple_data_ && batch != nullptr) {
     // If we're not done, keep using the last chunk allocated in 
boundary_pool_ so we
     // don't have to reallocate. If we are done, transfer it to the row batch.
     batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), /* 
keep_current */ !done);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 1f1bc0d..0f4e36f 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -254,9 +254,11 @@ class ScannerContext {
     /// never set to NULL, even if it contains 0 bytes.
     Status GetNextBuffer(int64_t read_past_size = 0);
 
-    /// If 'batch' is not NULL, attaches all completed io buffers and the 
boundary mem
-    /// pool to batch.  If 'done' is set, releases the completed resources.
-    /// If 'batch' is NULL then contains_tuple_data_ should be false.
+    /// If 'batch' is not NULL and 'contains_tuple_data_' is true, attaches 
all completed
+    /// io buffers and the boundary mem pool to 'batch'. If 'done' is set, all 
in-flight
+    /// resources are also attached or released.
+    /// If 'batch' is NULL then 'done' must be true. Such a call will release 
all
+    /// completed and in-flight resources.
     void ReleaseCompletedResources(RowBatch* batch, bool done);
 
     /// Error-reporting functions.
@@ -275,14 +277,13 @@ class ScannerContext {
   /// from all streams to 'batch'. Attaching only completed resources ensures 
that buffers
   /// (and their cleanup) trail the rows that reference them (row batches are 
consumed and
   /// cleaned up in order by the rest of the query).
-  /// If a NULL 'batch' is passed, then it tries to release whatever resource 
can be
-  /// released, ie. completed io buffers if 'done' is not set, and the mem 
pool if 'done'
-  /// is set. In that case, contains_tuple_data_ should be false.
-  //
   /// If 'done' is true, this is the final call for the current streams and 
any pending
   /// resources in each stream are also passed to the row batch. Callers which 
want to
   /// clear the streams from the context should also call ClearStreams().
-  //
+  ///
+  /// A NULL 'batch' may be passed to free all resources. It is only valid to 
pass a NULL
+  /// 'batch' when also passing 'done'.
+  ///
   /// This must be called with 'done' set when the scanner is complete and no 
longer needs
   /// any resources (e.g. tuple memory, io buffers) returned from the current 
streams.
   /// After calling with 'done' set, this should be called again if new 
streams are

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c97bffcc/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test 
b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
index cbd14ef..0523f1d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
@@ -24,3 +24,17 @@ limit 10
 ---- TYPES
 bigint,bigint
 ====
+---- QUERY
+# IMPALA-4458: Test proper resource cleanup for cancelled fragments.
+# This test is duplicated from nested-types-subplan.test
+select c_custkey, c_mktsegment, o_orderkey, o_orderdate
+from tpch_nested_parquet.customer c, c.c_orders o
+where c_custkey = 1
+limit 3
+---- RESULTS
+1,regex:.*,regex:.*,regex:.*
+1,regex:.*,regex:.*,regex:.*
+1,regex:.*,regex:.*,regex:.*
+---- TYPES
+bigint,string,bigint,string
+====

Reply via email to