IMPALA-4138: Fix AcquireState() for batches that change capacity If MarkAtCapacity() is called on a row batch, it is difficult to call AcquireState() with that batch as the source. The destination batch must be constructed with a capacity that matches the source's tuple_ptrs_size_, and tuple_ptrs_size_ is not accessible. That capacity is usually taken from capacity(), but after MarkAtCapacity() capacity() no longer matches the size of the source's tuple pointer array, so it is the wrong value for this purpose.
Add RowBatch::InitialCapacity() to return the initial capacity of the batch, which capacity() no longer reports after MarkAtCapacity(). Add row-batch-test to provide initial coverage of the AcquireState() API. Also add a DCHECK to the RowBatch constructor that takes a TRowBatch, checking that input_batch.row_tuples has one entry per tuple descriptor in the row descriptor. Change-Id: I6ceca53c406b05cd04b7d95a4f9f2eec7bc127f5 Reviewed-on: http://gerrit.cloudera.org:8080/4428 Reviewed-by: Dan Hecht <dhe...@cloudera.com> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cd6d86b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cd6d86b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cd6d86b8 Branch: refs/heads/master Commit: cd6d86b83078290c62c2d92b071a15ec19bbeb4f Parents: 3aa4351 Author: Henry Robinson <he...@cloudera.com> Authored: Thu Sep 15 12:08:52 2016 -0700 Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org> Committed: Wed Sep 21 03:08:02 2016 +0000 ---------------------------------------------------------------------- be/src/runtime/CMakeLists.txt | 1 + be/src/runtime/row-batch-test.cc | 69 +++++++++++++++++++++++++++++++++++ be/src/runtime/row-batch.cc | 1 + be/src/runtime/row-batch.h | 8 +++- 4 files changed, 78 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd6d86b8/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 6bdce48..a0b0a94 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -88,4 +88,5 @@ ADD_BE_TEST(buffered-tuple-stream-test) ADD_BE_TEST(hdfs-fs-cache-test) ADD_BE_TEST(tmp-file-mgr-test) ADD_BE_TEST(row-batch-serialize-test) +ADD_BE_TEST(row-batch-test) ADD_BE_TEST(collection-value-builder-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd6d86b8/be/src/runtime/row-batch-test.cc 
---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch-test.cc b/be/src/runtime/row-batch-test.cc new file mode 100644 index 0000000..2a8304a --- /dev/null +++ b/be/src/runtime/row-batch-test.cc @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "testutil/death-test-util.h" +#include "testutil/gtest-util.h" +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "testutil/desc-tbl-builder.h" + +#include <gtest/gtest.h> + +#include "common/names.h" + +namespace impala { + +TEST(RowBatchTest, AcquireStateWithMarkAtCapacity) { + // Test that AcquireState() can be correctly called with MarkAtCapacity() on the + // source batch. + ObjectPool pool; + DescriptorTblBuilder builder(&pool); + builder.DeclareTuple() << TYPE_INT; + DescriptorTbl* desc_tbl = builder.Build(); + + vector<bool> nullable_tuples = {false}; + vector<TTupleId> tuple_id = {static_cast<TupleId>(0)}; + RowDescriptor row_desc(*desc_tbl, tuple_id, nullable_tuples); + MemTracker tracker; + { + RowBatch src(row_desc, 1024, &tracker); + src.AddRow(); src.CommitLastRow(); + // Calls MarkAtCapacity(). 
+ src.MarkNeedToReturn(); + + // Note InitialCapacity(), not capacity(). Latter will DCHECK. + RowBatch dest(row_desc, src.InitialCapacity(), &tracker); + dest.AcquireState(&src); + } + + // Confirm the bad pattern causes an error. + { + RowBatch src(row_desc, 1024, &tracker); + src.AddRow(); src.CommitLastRow(); + // Calls MarkAtCapacity(). + src.MarkNeedToReturn(); + RowBatch bad_dest(row_desc, src.capacity(), &tracker); + IMPALA_ASSERT_DEBUG_DEATH(bad_dest.AcquireState(&src), ""); + } +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd6d86b8/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index 3d076bf..daac913 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -83,6 +83,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch, mem_tracker_(mem_tracker) { DCHECK(mem_tracker_ != NULL); tuple_ptrs_size_ = num_rows_ * input_batch.row_tuples.size() * sizeof(Tuple*); + DCHECK_EQ(input_batch.row_tuples.size(), row_desc.tuple_descriptors().size()); DCHECK_GT(tuple_ptrs_size_, 0); // TODO: switch to Init() pattern so we can check memory limit and return Status. 
if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cd6d86b8/be/src/runtime/row-batch.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index f529c3d..75c1f44 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -279,6 +279,12 @@ class RowBatch { int ALWAYS_INLINE num_rows() const { return num_rows_; } int ALWAYS_INLINE capacity() const { return capacity_; } + /// The maximum value that capacity_ ever took, before MarkAtCapacity() might have + /// changed it. Derived from tuple_ptrs_size_ rather than from capacity_. + int ALWAYS_INLINE InitialCapacity() const { + return tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*)); + } + const RowDescriptor& row_desc() const { return row_desc_; } /// Max memory that this row batch can accumulate before it is considered at capacity. @@ -351,7 +357,7 @@ class RowBatch { const int num_tuples_per_row_; - /// Array of pointers with capacity_ * num_tuples_per_row_ elements. + /// Array of pointers with InitialCapacity() * num_tuples_per_row_ elements. /// The memory ownership depends on whether legacy joins and aggs are enabled. /// /// Memory is malloc'd and owned by RowBatch: