IMPALA-4138: Fix AcquireState() for batches that change capacity

If MarkAtCapacity() is called on a row batch, it is difficult to call
AcquireState() on that batch because tuple_ptrs_size_ is not accessible
to initialise the destination batch - this is usually calculated from
capacity(), but that value is wrong for these purposes after
MarkAtCapacity().

Add RowBatch::InitialCapacity() to return the initial capacity value of
the batch.

Add row-batch-test to add initial coverage of AcquireState() API.

Change-Id: I6ceca53c406b05cd04b7d95a4f9f2eec7bc127f5
Reviewed-on: http://gerrit.cloudera.org:8080/4428
Reviewed-by: Dan Hecht <dhe...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cd6d86b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cd6d86b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cd6d86b8

Branch: refs/heads/master
Commit: cd6d86b83078290c62c2d92b071a15ec19bbeb4f
Parents: 3aa4351
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu Sep 15 12:08:52 2016 -0700
Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org>
Committed: Wed Sep 21 03:08:02 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/CMakeLists.txt    |  1 +
 be/src/runtime/row-batch-test.cc | 69 +++++++++++++++++++++++++++++++++++
 be/src/runtime/row-batch.cc      |  1 +
 be/src/runtime/row-batch.h       |  8 +++-
 4 files changed, 78 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd6d86b8/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 6bdce48..a0b0a94 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -88,4 +88,5 @@ ADD_BE_TEST(buffered-tuple-stream-test)
 ADD_BE_TEST(hdfs-fs-cache-test)
 ADD_BE_TEST(tmp-file-mgr-test)
 ADD_BE_TEST(row-batch-serialize-test)
+ADD_BE_TEST(row-batch-test)
 ADD_BE_TEST(collection-value-builder-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd6d86b8/be/src/runtime/row-batch-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-test.cc b/be/src/runtime/row-batch-test.cc
new file mode 100644
index 0000000..2a8304a
--- /dev/null
+++ b/be/src/runtime/row-batch-test.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "testutil/death-test-util.h"
+#include "testutil/gtest-util.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "testutil/desc-tbl-builder.h"
+
+#include <gtest/gtest.h>
+
+#include "common/names.h"
+
+namespace impala {
+
+TEST(RowBatchTest, AcquireStateWithMarkAtCapacity) {
+  // Test that AcquireState() can be correctly called with MarkAtCapacity() on the
+  // source batch.
+  ObjectPool pool;
+  DescriptorTblBuilder builder(&pool);
+  builder.DeclareTuple() << TYPE_INT;
+  DescriptorTbl* desc_tbl = builder.Build();
+
+  vector<bool> nullable_tuples = {false};
+  vector<TTupleId> tuple_id = {static_cast<TupleId>(0)};
+  RowDescriptor row_desc(*desc_tbl, tuple_id, nullable_tuples);
+  MemTracker tracker;
+  {
+    RowBatch src(row_desc, 1024, &tracker);
+    src.AddRow(); src.CommitLastRow();
+    // Calls MarkAtCapacity().
+    src.MarkNeedToReturn();
+
+    // Note InitialCapacity(), not capacity(). Latter will DCHECK.
+    RowBatch dest(row_desc, src.InitialCapacity(), &tracker);
+    dest.AcquireState(&src);
+  }
+
+  // Confirm the bad pattern causes an error.
+  {
+    RowBatch src(row_desc, 1024, &tracker);
+    src.AddRow(); src.CommitLastRow();
+    // Calls MarkAtCapacity().
+    src.MarkNeedToReturn();
+    RowBatch bad_dest(row_desc, src.capacity(), &tracker);
+    IMPALA_ASSERT_DEBUG_DEATH(bad_dest.AcquireState(&src), "");
+  }
+}
+
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd6d86b8/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 3d076bf..daac913 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -83,6 +83,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     mem_tracker_(mem_tracker) {
   DCHECK(mem_tracker_ != NULL);
   tuple_ptrs_size_ = num_rows_ * input_batch.row_tuples.size() * sizeof(Tuple*);
+  DCHECK_EQ(input_batch.row_tuples.size(), row_desc.tuple_descriptors().size());
   DCHECK_GT(tuple_ptrs_size_, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
   if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd6d86b8/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index f529c3d..75c1f44 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -279,6 +279,12 @@ class RowBatch {
   int ALWAYS_INLINE num_rows() const { return num_rows_; }
   int ALWAYS_INLINE capacity() const { return capacity_; }
 
+  // The maximum value that capacity_ ever took, before MarkAtCapacity() might have changed
+  // it.
+  int ALWAYS_INLINE InitialCapacity() const {
+    return tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
+  }
+
   const RowDescriptor& row_desc() const { return row_desc_; }
 
   /// Max memory that this row batch can accumulate before it is considered at capacity.
@@ -351,7 +357,7 @@ class RowBatch {
 
   const int num_tuples_per_row_;
 
-  /// Array of pointers with capacity_ * num_tuples_per_row_ elements.
+  /// Array of pointers with InitialCapacity() * num_tuples_per_row_ elements.
   /// The memory ownership depends on whether legacy joins and aggs are enabled.
   ///
   /// Memory is malloc'd and owned by RowBatch:

Reply via email to