IMPALA-4674: Part 2.5: Rename BufferedTupleStreamV2

This is cleanup that wasn't included in Part 2.

Testing:
Confirmed that everything (including be tests) built OK, that
buffered-tuple-stream-v2-test passed, and that I could run a
couple of basic queries.

Change-Id: Ib8b23d7c2d7488d9f74b08cc9adb4ed1a93e3591
Reviewed-on: http://gerrit.cloudera.org:8080/7609
Reviewed-by: Matthew Jacobs <m...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0c46147e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0c46147e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0c46147e

Branch: refs/heads/master
Commit: 0c46147e5fd93e2a9a63d145d60b656d2f6a7612
Parents: 5caadbb
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Mon Aug 7 09:07:00 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Tue Aug 8 10:22:20 2017 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc               |    8 +-
 be/src/exec/analytic-eval-node.h                |    4 +-
 be/src/exec/hash-table-test.cc                  |   12 +-
 be/src/exec/hash-table.cc                       |    4 +-
 be/src/exec/hash-table.h                        |   14 +-
 be/src/exec/hash-table.inline.h                 |    2 +-
 be/src/exec/partitioned-aggregation-node-ir.cc  |    2 +-
 be/src/exec/partitioned-aggregation-node.cc     |   28 +-
 be/src/exec/partitioned-aggregation-node.h      |   12 +-
 be/src/exec/partitioned-hash-join-builder-ir.cc |   10 +-
 be/src/exec/partitioned-hash-join-builder.cc    |   34 +-
 be/src/exec/partitioned-hash-join-builder.h     |   24 +-
 be/src/exec/partitioned-hash-join-node-ir.cc    |    4 +-
 be/src/exec/partitioned-hash-join-node.cc       |   34 +-
 be/src/exec/partitioned-hash-join-node.h        |   14 +-
 be/src/exec/partitioned-hash-join-node.inline.h |    2 +-
 be/src/runtime/CMakeLists.txt                   |    4 +-
 be/src/runtime/buffered-tuple-stream-test.cc    | 1462 ++++++++++++++++++
 be/src/runtime/buffered-tuple-stream-v2-test.cc | 1462 ------------------
 be/src/runtime/buffered-tuple-stream-v2.cc      | 1084 -------------
 be/src/runtime/buffered-tuple-stream-v2.h       |  705 ---------
 .../runtime/buffered-tuple-stream-v2.inline.h   |   56 -
 be/src/runtime/buffered-tuple-stream.cc         | 1084 +++++++++++++
 be/src/runtime/buffered-tuple-stream.h          |  705 +++++++++
 be/src/runtime/buffered-tuple-stream.inline.h   |   56 +
 25 files changed, 3413 insertions(+), 3413 deletions(-)
----------------------------------------------------------------------
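
The change is purely mechanical: the files are renamed and every declaration
and call site swaps BufferedTupleStreamV2 for BufferedTupleStream, with no
signature or behavior changes. As a representative example, a call site from
the analytic-eval-node.cc hunk below changes like this (C++ excerpt trimmed
from the diff):

  // Before the rename:
  input_stream_.reset(new BufferedTupleStreamV2(state, child(0)->row_desc(),
      &buffer_pool_client_, resource_profile_.spillable_buffer_size,
      resource_profile_.spillable_buffer_size));

  // After the rename (same arguments, same semantics):
  input_stream_.reset(new BufferedTupleStream(state, child(0)->row_desc(),
      &buffer_pool_client_, resource_profile_.spillable_buffer_size,
      resource_profile_.spillable_buffer_size));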


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index f6d96ae..af4f866 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -23,7 +23,7 @@
 #include "exprs/agg-fn-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
@@ -195,7 +195,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
   DCHECK(input_stream_ == nullptr);
-  input_stream_.reset(new BufferedTupleStreamV2(state, child(0)->row_desc(),
+  input_stream_.reset(new BufferedTupleStream(state, child(0)->row_desc(),
       &buffer_pool_client_, resource_profile_.spillable_buffer_size,
       resource_profile_.spillable_buffer_size));
   RETURN_IF_ERROR(input_stream_->Init(id(), true));
@@ -363,7 +363,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     // TODO: Consider re-pinning later if the output stream is fully consumed.
     RETURN_IF_ERROR(status);
     RETURN_IF_ERROR(state_->StartSpilling(mem_tracker()));
-    input_stream_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+    input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
     VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
     if (!input_stream_->AddRow(row, &status)) {
       // Rows should be added in unpinned mode unless an error occurs.
@@ -623,7 +623,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
             << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes();
   SCOPED_TIMER(evaluation_timer_);
 
-  // BufferedTupleStreamV2::num_rows() returns the total number of rows that have been
+  // BufferedTupleStream::num_rows() returns the total number of rows that have been
   // inserted into the stream (it does not decrease when we read rows), so the index of
   // the next input row that will be inserted will be the current size of the stream.
   int64_t stream_idx = input_stream_->num_rows();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index eab9198..671eaa4 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -19,7 +19,7 @@
 #define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
 
 #include "exec/exec-node.h"
-#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream.h"
 #include "runtime/tuple.h"
 
 namespace impala {
@@ -339,7 +339,7 @@ class AnalyticEvalNode : public ExecNode {
   /// buffers with tuple data are attached to an output row batch on eos or
   /// ReachedLimit().
   /// TODO: Consider re-pinning unpinned streams when possible.
-  boost::scoped_ptr<BufferedTupleStreamV2> input_stream_;
+  boost::scoped_ptr<BufferedTupleStream> input_stream_;
 
   /// Pool used for O(1) allocations that live until Close() or Reset().
   /// Does not own data backing tuples returned in GetNext(), so it does not

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 7a6ec9d..aad9134 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -309,7 +309,7 @@ class HashTableTest : public testing::Test {
 
     for (int i = 0; i < 2; ++i) {
       if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
-      BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+      BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
       EXPECT_TRUE(hash_table->stores_tuples_);
       Status status;
       bool inserted =
@@ -350,7 +350,7 @@ class HashTableTest : public testing::Test {
     ASSERT_TRUE(success);
     for (int i = 0; i < 5; ++i) {
       if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
-      BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+      BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
       EXPECT_TRUE(hash_table->stores_tuples_);
       bool inserted =
           hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status);
@@ -418,7 +418,7 @@ class HashTableTest : public testing::Test {
       for (int i = 0; i < val; ++i) {
         TupleRow* row = CreateTupleRow(val);
         if (!ht_ctx->EvalAndHashBuild(row)) continue;
-        BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+        BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
         EXPECT_TRUE(hash_table->stores_tuples_);
         ASSERT_TRUE(hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status));
         ASSERT_OK(status);
@@ -481,7 +481,7 @@ class HashTableTest : public testing::Test {
       for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
         TupleRow* row = CreateTupleRow(build_row_val);
         if (!ht_ctx->EvalAndHashBuild(row)) continue;
-        BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+        BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
         EXPECT_TRUE(hash_table->stores_tuples_);
         bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
         ASSERT_OK(status);
@@ -518,7 +518,7 @@ class HashTableTest : public testing::Test {
     while (true) {
       TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL);
       if (!ht_ctx->EvalAndHashBuild(duplicate_row)) continue;
-      BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+      BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
       bool inserted =
           hash_table->Insert(ht_ctx.get(), dummy_flat_row, duplicate_row, &status);
       ASSERT_OK(status);
@@ -569,7 +569,7 @@ class HashTableTest : public testing::Test {
 
       // Insert using both Insert() and FindBucket() methods.
       if (build_row_val % 2 == 0) {
-        BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+        BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr;
         EXPECT_TRUE(hash_table->stores_tuples_);
         bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
         EXPECT_TRUE(inserted);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index aacedc2..e65d9f1 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -385,14 +385,14 @@ constexpr double HashTable::MAX_FILL_FACTOR;
 constexpr int64_t HashTable::DATA_PAGE_SIZE;
 
 HashTable* HashTable::Create(Suballocator* allocator, bool stores_duplicates,
-    int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+    int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets,
     int64_t initial_num_buckets) {
   return new HashTable(FLAGS_enable_quadratic_probing, allocator, stores_duplicates,
       num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets);
 }
 
 HashTable::HashTable(bool quadratic_probing, Suballocator* allocator,
-    bool stores_duplicates, int num_build_tuples, BufferedTupleStreamV2* stream,
+    bool stores_duplicates, int num_build_tuples, BufferedTupleStream* stream,
     int64_t max_num_buckets, int64_t num_buckets)
   : allocator_(allocator),
     tuple_stream_(stream),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 297e619..d764640 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -26,8 +26,8 @@
 #include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
-#include "runtime/buffered-tuple-stream-v2.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.h"
+#include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/suballocator.h"
 #include "runtime/tuple-row.h"
@@ -533,7 +533,7 @@ class HashTable {
   /// of two formats, depending on the number of tuples in the row.
   union HtData {
     // For rows with multiple tuples per row, a pointer to the flattened TupleRow.
-    BufferedTupleStreamV2::FlatRowPtr flat_row;
+    BufferedTupleStream::FlatRowPtr flat_row;
     // For rows with one tuple per row, a pointer to the Tuple itself.
     Tuple* tuple;
   };
@@ -600,7 +600,7 @@ class HashTable {
   ///  - initial_num_buckets: number of buckets that the hash table should be initialized
   ///    with.
   static HashTable* Create(Suballocator* allocator, bool stores_duplicates,
-      int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+      int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets,
       int64_t initial_num_buckets);
 
   /// Allocates the initial bucket structure. Returns a non-OK status if an error is
@@ -623,7 +623,7 @@ class HashTable {
   /// is stored. The 'row' is not copied by the hash table and the caller must guarantee
   /// it stays in memory. This will not grow the hash table.
   bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
-      BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row,
+      BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row,
       Status* status) WARN_UNUSED_RESULT;
 
   /// Prefetch the hash table bucket which the given hash value 'hash' maps to.
@@ -819,7 +819,7 @@ class HashTable {
   ///  - quadratic_probing: set to true when the probing algorithm is quadratic, as
   ///    opposed to linear.
   HashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates,
-      int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+      int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets,
       int64_t initial_num_buckets);
 
   /// Performs the probing operation according to the probing algorithm (linear or
@@ -918,7 +918,7 @@ class HashTable {
   /// Stream contains the rows referenced by the hash table. Can be NULL if the
   /// row only contains a single tuple, in which case the TupleRow indirection
   /// is removed by the hash table.
-  BufferedTupleStreamV2* tuple_stream_;
+  BufferedTupleStream* tuple_stream_;
 
   /// Constants on how the hash table should behave.
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index ce2f784..85d7ad6 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -109,7 +109,7 @@ inline HashTable::HtData* HashTable::InsertInternal(
 }
 
 inline bool HashTable::Insert(HashTableCtx* ht_ctx,
-    BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row, Status* status) {
+    BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row, Status* status) {
   HtData* htdata = InsertInternal(ht_ctx, status);
   // If successful insert, update the contents of the newly inserted entry with 'idx'.
   if (LIKELY(htdata != NULL)) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index 126a2a5..9baada1 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -21,7 +21,7 @@
 #include "exprs/agg-fn-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index fc0a4a6..16db5cc 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -31,7 +31,7 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
@@ -275,7 +275,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
           &buffer_pool_client_, resource_profile_.spillable_buffer_size));
 
       if (!is_streaming_preagg_ && needs_serialize_) {
-        serialize_stream_.reset(new BufferedTupleStreamV2(state, &intermediate_row_desc_,
+        serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_,
             &buffer_pool_client_, resource_profile_.spillable_buffer_size,
             resource_profile_.spillable_buffer_size));
         RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
@@ -722,7 +722,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
     }
   }
 
-  aggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_,
+  aggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
       &parent->intermediate_row_desc_, &parent->buffer_pool_client_,
       parent->resource_profile_.spillable_buffer_size,
       parent->resource_profile_.spillable_buffer_size, external_varlen_slots));
@@ -740,7 +740,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
   }
 
   if (!parent->is_streaming_preagg_) {
-    unaggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_,
+    unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
         parent->child(0)->row_desc(), &parent->buffer_pool_client_,
         parent->resource_profile_.spillable_buffer_size,
         parent->resource_profile_.spillable_buffer_size));
@@ -786,7 +786,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
 
     // Serialize and copy the spilled partition's stream into the new stream.
     Status status;
-    BufferedTupleStreamV2* new_stream = parent->serialize_stream_.get();
+    BufferedTupleStream* new_stream = parent->serialize_stream_.get();
     HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
@@ -811,7 +811,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
     // when we need to spill again. We need to have this available before we need
     // to spill to make sure it is available. This should be acquirable since we just
     // freed at least one buffer from this partition's (old) aggregated_row_stream.
-    parent->serialize_stream_.reset(new BufferedTupleStreamV2(parent->state_,
+    parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_,
         &parent->intermediate_row_desc_, &parent->buffer_pool_client_,
         parent->resource_profile_.spillable_buffer_size,
         parent->resource_profile_.spillable_buffer_size));
@@ -866,9 +866,9 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
   DCHECK(aggregated_row_stream->has_write_iterator());
   DCHECK(!unaggregated_row_stream->has_write_iterator());
   if (more_aggregate_rows) {
-    aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+    aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
   } else {
-    aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+    aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
     bool got_buffer;
     RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
     DCHECK(got_buffer)
@@ -932,7 +932,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
 }
 
 Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
-    const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStreamV2* stream,
+    const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream,
     Status* status) noexcept {
   DCHECK(stream != NULL && status != NULL);
   // Allocate space for the entire tuple in the stream.
@@ -1077,7 +1077,7 @@ Status PartitionedAggregationNode::AppendSpilledRow(
     Partition* __restrict__ partition, TupleRow* __restrict__ row) {
   DCHECK(!is_streaming_preagg_);
   DCHECK(partition->is_spilled());
-  BufferedTupleStreamV2* stream = AGGREGATED_ROWS ?
+  BufferedTupleStream* stream = AGGREGATED_ROWS ?
       partition->aggregated_row_stream.get() :
       partition->unaggregated_row_stream.get();
   DCHECK(!stream->is_pinned());
@@ -1297,7 +1297,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
     if (!hash_partition->is_spilled()) continue;
     // The aggregated rows have been repartitioned. Free up at least a buffer's worth of
     // reservation and use it to pin the unaggregated write buffer.
-    hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+    hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
     bool got_buffer;
     RETURN_IF_ERROR(
         hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
@@ -1332,7 +1332,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
 }
 
 template <bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessStream(BufferedTupleStreamV2* input_stream) {
+Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stream) {
   DCHECK(!is_streaming_preagg_);
   if (input_stream->num_rows() > 0) {
     while (true) {
@@ -1430,8 +1430,8 @@ void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
   // Ensure all pages in the spilled partition's streams are unpinned by invalidating
   // the streams' read and write iterators. We may need all the memory to process the
   // next spilled partitions.
-  partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
-  partition->unaggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+  partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+  partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
   spilled_partitions_.push_front(partition);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 4f8b622..c230630 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -25,7 +25,7 @@
 
 #include "exec/exec-node.h"
 #include "exec/hash-table.h"
-#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream.h"
 #include "runtime/bufferpool/suballocator.h"
 #include "runtime/descriptors.h" // for TupleId
 #include "runtime/mem-pool.h"
@@ -425,18 +425,18 @@ class PartitionedAggregationNode : public ExecNode {
     /// For streaming preaggs, this may be NULL if sufficient memory is not available.
     /// In that case hash_tbl is also NULL and all rows for the partition will be passed
     /// through.
-    boost::scoped_ptr<BufferedTupleStreamV2> aggregated_row_stream;
+    boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream;
 
     /// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations.
     /// Always unpinned. Has a write buffer allocated when the partition is spilled and
     /// unaggregated rows are being processed.
-    boost::scoped_ptr<BufferedTupleStreamV2> unaggregated_row_stream;
+    boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream;
   };
 
   /// Stream used to store serialized spilled rows. Only used if needs_serialize_
   /// is set. This stream is never pinned and only used in Partition::Spill as a
   /// a temporary buffer.
-  boost::scoped_ptr<BufferedTupleStreamV2> serialize_stream_;
+  boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
 
   /// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
   HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
@@ -471,7 +471,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// FunctionContexts, so is stored outside the stream. If stream's small buffers get
   /// full, it will attempt to switch to IO-buffers.
   Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
-      BufferedTupleStreamV2* stream, Status* status) noexcept;
+      BufferedTupleStream* stream, Status* status) noexcept;
 
   /// Constructs intermediate tuple, allocating memory from pool instead of the stream.
   /// Returns NULL and sets status if there is not enough memory to allocate the tuple.
@@ -571,7 +571,7 @@ class PartitionedAggregationNode : public ExecNode {
 
   /// Reads all the rows from input_stream and process them by calling ProcessBatch().
   template <bool AGGREGATED_ROWS>
-  Status ProcessStream(BufferedTupleStreamV2* input_stream) WARN_UNUSED_RESULT;
+  Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT;
 
   /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
   void GetSingletonOutput(RowBatch* row_batch);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index df58036..e15e116 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -19,7 +19,7 @@
 
 #include "codegen/impala-ir.h"
 #include "exec/hash-table.inline.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.h"
@@ -30,7 +30,7 @@
 using namespace impala;
 
 inline bool PhjBuilder::AppendRow(
-    BufferedTupleStreamV2* stream, TupleRow* row, Status* status) {
+    BufferedTupleStream* stream, TupleRow* row, Status* status) {
   if (LIKELY(stream->AddRow(row, status))) return true;
   if (UNLIKELY(!status->ok())) return false;
   return AppendRowStreamFull(stream, row, status);
@@ -73,12 +73,12 @@ Status PhjBuilder::ProcessBuildBatch(
 
 bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode,
     HashTableCtx* ht_ctx, RowBatch* batch,
-    const vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows, Status* status) {
+    const vector<BufferedTupleStream::FlatRowPtr>& flat_rows, Status* status) {
   // Compute the hash values and prefetch the hash table buckets.
   const int num_rows = batch->num_rows();
   HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   const int prefetch_size = expr_vals_cache->capacity();
-  const BufferedTupleStreamV2::FlatRowPtr* flat_rows_data = flat_rows.data();
+  const BufferedTupleStream::FlatRowPtr* flat_rows_data = flat_rows.data();
   for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
        prefetch_group_row += prefetch_size) {
     int cur_row = prefetch_group_row;
@@ -97,7 +97,7 @@ bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode,
     expr_vals_cache->ResetForRead();
     FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
       TupleRow* row = batch_iter.Get();
-      BufferedTupleStreamV2::FlatRowPtr flat_row = flat_rows_data[cur_row];
+      BufferedTupleStream::FlatRowPtr flat_row = flat_rows_data[cur_row];
       if (!expr_vals_cache->IsRowNull()
           && UNLIKELY(!hash_tbl_->Insert(ht_ctx, flat_row, row, status))) {
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index a2f7c96..2dc2d8d 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -25,7 +25,7 @@
 #include "exec/hash-table.inline.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
@@ -293,11 +293,11 @@ Status PhjBuilder::CreateHashPartitions(int level) {
 }
 
 bool PhjBuilder::AppendRowStreamFull(
-    BufferedTupleStreamV2* stream, TupleRow* row, Status* status) noexcept {
+    BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept {
   while (true) {
     // We ran out of memory. Pick a partition to spill. If we ran out of unspilled
     // partitions, SpillPartition() will return an error status.
-    *status = SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+    *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
     if (!status->ok()) return false;
     if (stream->AddRow(row, status)) return true;
     if (!status->ok()) return false;
@@ -307,7 +307,7 @@ bool PhjBuilder::AppendRowStreamFull(
 }
 
 // TODO: can we do better with a different spilling heuristic?
-Status PhjBuilder::SpillPartition(BufferedTupleStreamV2::UnpinMode mode) {
+Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) {
   DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
   int64_t max_freed_mem = 0;
   int partition_idx = -1;
@@ -367,7 +367,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
       partition->Close(NULL);
     } else if (partition->is_spilled()) {
       // We don't need any build-side data for spilled partitions in memory.
-      partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+      partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
     }
   }
 
@@ -386,7 +386,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
     RETURN_IF_ERROR(partition->BuildHashTable(&built));
     // If we did not have enough memory to build this hash table, we need to spill this
     // partition (clean up the hash table, unpin build).
-    if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStreamV2::UNPIN_ALL));
+    if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
   }
 
   // We may have spilled additional partitions while building hash tables, we need to
@@ -423,9 +423,9 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
   while (probe_streams_to_create > 0) {
     // Create stream in vector, so that it will be cleaned up after any failure.
     spilled_partition_probe_streams_.emplace_back(
-        make_unique<BufferedTupleStreamV2>(runtime_state_, probe_row_desc_,
+        make_unique<BufferedTupleStream>(runtime_state_, probe_row_desc_,
             buffer_pool_client_, spillable_buffer_size_, spillable_buffer_size_));
-    BufferedTupleStreamV2* probe_stream = spilled_partition_probe_streams_.back().get();
+    BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
     RETURN_IF_ERROR(probe_stream->Init(join_node_id_, false));
 
     // Loop until either the stream gets a buffer or all partitions are spilled (in which
@@ -435,7 +435,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
       RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer));
       if (got_buffer) break;
 
-      RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL));
+      RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL));
       ++probe_streams_to_create;
     }
     --probe_streams_to_create;
@@ -443,7 +443,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
   return Status::OK();
 }
 
-vector<unique_ptr<BufferedTupleStreamV2>> PhjBuilder::TransferProbeStreams() {
+vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() {
   return std::move(spilled_partition_probe_streams_);
 }
 
@@ -453,7 +453,7 @@ void PhjBuilder::CloseAndDeletePartitions() {
   all_partitions_.clear();
   hash_partitions_.clear();
   null_aware_partition_ = NULL;
-  for (unique_ptr<BufferedTupleStreamV2>& stream : spilled_partition_probe_streams_) {
+  for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) {
     stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
   spilled_partition_probe_streams_.clear();
@@ -505,14 +505,14 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
 }
 
 Status PhjBuilder::RepartitionBuildInput(
-    Partition* input_partition, int level, BufferedTupleStreamV2* input_probe_rows) {
+    Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) {
   DCHECK_GE(level, 1);
   SCOPED_TIMER(repartition_timer_);
   COUNTER_ADD(num_repartitions_, 1);
   RuntimeState* state = runtime_state_;
 
   // Setup the read buffer and the new partitions.
-  BufferedTupleStreamV2* build_rows = input_partition->build_rows();
+  BufferedTupleStream* build_rows = input_partition->build_rows();
   DCHECK(build_rows != NULL);
   bool got_read_buffer;
   RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer));
@@ -545,7 +545,7 @@ Status PhjBuilder::RepartitionBuildInput(
     bool got_buffer;
     RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer));
     if (got_buffer) break;
-    RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT));
+    RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   }
 
   RETURN_IF_ERROR(FlushFinal(state));
@@ -573,7 +573,7 @@ bool PhjBuilder::HashTableStoresNulls() const {
 
 PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level)
   : parent_(parent), is_spilled_(false), level_(level) {
-  build_rows_ = make_unique<BufferedTupleStreamV2>(state, parent_->row_desc_,
+  build_rows_ = make_unique<BufferedTupleStream>(state, parent_->row_desc_,
       parent_->buffer_pool_client_, parent->spillable_buffer_size_,
       parent->spillable_buffer_size_);
 }
@@ -602,7 +602,7 @@ void PhjBuilder::Partition::Close(RowBatch* batch) {
   }
 }
 
-Status PhjBuilder::Partition::Spill(BufferedTupleStreamV2::UnpinMode mode) {
+Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
   DCHECK(!IsClosed());
   RETURN_IF_ERROR(parent_->runtime_state_->StartSpilling(parent_->mem_tracker()));
   // Close the hash table and unpin the stream backing it to free memory.
@@ -634,7 +634,7 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) {
   HashTableCtx* ctx = parent_->ht_ctx_.get();
   ctx->set_level(level()); // Set the hash function for building the hash table.
   RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker());
-  vector<BufferedTupleStreamV2::FlatRowPtr> flat_rows;
+  vector<BufferedTupleStream::FlatRowPtr> flat_rows;
   bool eos = false;
 
   // Allocate the partition-local hash table. Initialize the number of buckets based on

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 912613d..277579c 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -26,7 +26,7 @@
 #include "exec/data-sink.h"
 #include "exec/filter-context.h"
 #include "exec/hash-table.h"
-#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/suballocator.h"
 
@@ -103,7 +103,7 @@ class PhjBuilder : public DataSink {
   /// Transfer ownership of the probe streams to the caller. One stream was allocated per
   /// spilled partition in FlushFinal(). The probe streams are empty but prepared for
   /// writing with a write buffer allocated.
-  std::vector<std::unique_ptr<BufferedTupleStreamV2>> TransferProbeStreams();
+  std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams();
 
   /// Clears the current list of hash partitions. Called after probing of the partitions
   /// is done. The partitions are not closed or destroyed, since they may be spilled or
@@ -124,7 +124,7 @@ class PhjBuilder : public DataSink {
   /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
   /// has enough buffers preallocated to execute successfully.
   Status RepartitionBuildInput(Partition* input_partition, int level,
-      BufferedTupleStreamV2* input_probe_rows) WARN_UNUSED_RESULT;
+      BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT;
 
   /// Returns the largest build row count out of the current hash partitions.
   int64_t LargestPartitionRows() const;
@@ -201,10 +201,10 @@ class PhjBuilder : public DataSink {
 
     /// Spills this partition, the partition's stream is unpinned with 'mode' and
     /// its hash table is destroyed if it was built.
-    Status Spill(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT;
+    Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
 
     bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; }
-    BufferedTupleStreamV2* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
+    BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
     HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
     bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
     int ALWAYS_INLINE level() const { return level_; }
@@ -220,7 +220,7 @@ class PhjBuilder : public DataSink {
     /// failed: if 'status' is ok, inserting failed because not enough reservation
     /// was available and if 'status' is an error, inserting failed because of that error.
     bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
-        RowBatch* batch, const std::vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows,
+        RowBatch* batch, const std::vector<BufferedTupleStream::FlatRowPtr>& flat_rows,
         Status* status);
 
     const PhjBuilder* parent_;
@@ -239,7 +239,7 @@ class PhjBuilder : public DataSink {
     /// Stream of build tuples in this partition. Initially owned by this object but
     /// transferred to the parent exec node (via the row batch) when the partition
     /// is closed. If NULL, ownership has been transferred and the partition is closed.
-    std::unique_ptr<BufferedTupleStreamV2> build_rows_;
+    std::unique_ptr<BufferedTupleStream> build_rows_;
   };
 
   /// Computes the minimum number of buffers required to execute the spilling partitioned
@@ -288,19 +288,19 @@ class PhjBuilder : public DataSink {
   /// partitions. This odd return convention is used to avoid emitting unnecessary code
   /// for ~Status in perf-critical code.
   bool AppendRow(
-      BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
+      BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
 
   /// Slow path for AppendRow() above. It is called when the stream has failed to append
   /// the row. We need to find more memory by either switching to IO-buffers, in case the
   /// stream still uses small buffers, or spilling a partition. Returns false and sets
   /// 'status' if it was unable to append the row, even after spilling partitions.
-  bool AppendRowStreamFull(BufferedTupleStreamV2* stream, TupleRow* row,
+  bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row,
       Status* status) noexcept WARN_UNUSED_RESULT;
 
   /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
   /// to the Spill() call for the selected partition. The current policy is to spill the
   /// largest partition. Returns non-ok status if we couldn't spill a partition.
-  Status SpillPartition(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT;
+  Status SpillPartition(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
 
   /// Tries to build hash tables for all unspilled hash partitions. Called after
   /// FlushFinal() when all build rows have been partitioned and added to the appropriate
@@ -464,7 +464,7 @@ class PhjBuilder : public DataSink {
   ///
   /// Because of this, at the end of the build phase, we always have sufficient memory
   /// to execute the probe phase of the algorithm without spilling more partitions.
-  std::vector<std::unique_ptr<BufferedTupleStreamV2>> spilled_partition_probe_streams_;
+  std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
@@ -479,7 +479,7 @@ class PhjBuilder : public DataSink {
   ProcessBuildBatchFn process_build_batch_fn_level0_;
 
   typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*,
-      const std::vector<BufferedTupleStreamV2::FlatRowPtr>&, Status*);
+      const std::vector<BufferedTupleStream::FlatRowPtr>&, Status*);
   /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
   InsertBatchFn insert_batch_fn_;
   InsertBatchFn insert_batch_fn_level0_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index b890eb9..3106419 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -313,7 +313,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
           // The partition is not in memory, spill the probe row and move to the next row.
           // Skip the current row if we manage to append to the spilled partition's BTS.
           // Otherwise, we need to bail out and report the failure.
-          BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows();
+          BufferedTupleStream* probe_rows = probe_partition->probe_rows();
           if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) {
             DCHECK(!status->ok());
             return false;
@@ -438,7 +438,7 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
 }
 
 inline bool PartitionedHashJoinNode::AppendProbeRow(
-    BufferedTupleStreamV2* stream, TupleRow* row, Status* status) {
+    BufferedTupleStream* stream, TupleRow* row, Status* status) {
   DCHECK(stream->has_write_iterator());
   DCHECK(!stream->is_pinned());
   return stream->AddRow(row, status);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 2db9e00..806bdc0 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -27,7 +27,7 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -265,7 +265,7 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
 
 PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state,
     PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition,
-    unique_ptr<BufferedTupleStreamV2> probe_rows)
+    unique_ptr<BufferedTupleStream> probe_rows)
   : build_partition_(build_partition),
     probe_rows_(std::move(probe_rows)) {
   DCHECK(probe_rows_->has_write_iterator());
@@ -328,7 +328,7 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
     probe_batch_pos_ = -1;
     return Status::OK();
   }
-  BufferedTupleStreamV2* probe_rows = input_partition_->probe_rows();
+  BufferedTupleStream* probe_rows = input_partition_->probe_rows();
   if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) {
     // Continue from the current probe stream.
     bool eos = false;
@@ -420,9 +420,9 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
     ht_ctx_->set_level(next_partition_level);
 
     // Spill to free memory from hash tables and pinned streams for use in new partitions.
-    RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStreamV2::UNPIN_ALL));
+    RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL));
     // Temporarily free up the probe buffer to use when repartitioning.
-    input_partition_->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+    input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
     DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString();
     DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
     int64_t num_input_rows = build_partition->build_rows()->num_rows();
@@ -822,7 +822,7 @@ static Status NullAwareAntiJoinError(bool build) {
 
 Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
   RuntimeState* state = runtime_state_;
-  unique_ptr<BufferedTupleStreamV2> probe_rows = make_unique<BufferedTupleStreamV2>(
+  unique_ptr<BufferedTupleStream> probe_rows = make_unique<BufferedTupleStream>(
       state, child(0)->row_desc(), &buffer_pool_client_,
       resource_profile_.spillable_buffer_size,
       resource_profile_.spillable_buffer_size);
@@ -847,7 +847,7 @@ error:
 
 Status PartitionedHashJoinNode::InitNullProbeRows() {
   RuntimeState* state = runtime_state_;
-  null_probe_rows_ = make_unique<BufferedTupleStreamV2>(state, child(0)->row_desc(),
+  null_probe_rows_ = make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
       &buffer_pool_client_, resource_profile_.spillable_buffer_size,
       resource_profile_.spillable_buffer_size);
   // TODO: we shouldn't start with this unpinned if spilling is disabled.
@@ -866,8 +866,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
   DCHECK_EQ(probe_batch_pos_, -1);
   DCHECK_EQ(probe_batch_->num_rows(), 0);
 
-  BufferedTupleStreamV2* build_stream = builder_->null_aware_partition()->build_rows();
-  BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows();
+  BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows();
+  BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
 
   if (build_stream->num_rows() == 0) {
     // There were no build rows. Nothing to do. Just prepare to output the null
@@ -904,7 +904,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
   int num_join_conjuncts = other_join_conjuncts_.size();
   DCHECK(probe_batch_ != NULL);
 
-  BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows();
+  BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
     probe_batch_pos_ = 0;
     probe_batch_->TransferResourceOwnership(out_batch);
@@ -952,7 +952,7 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
   DCHECK(probe_hash_partitions_.empty());
 
   // Initialize the probe partitions, providing them with probe streams.
-  vector<unique_ptr<BufferedTupleStreamV2>> probe_streams =
+  vector<unique_ptr<BufferedTupleStream>> probe_streams =
       builder_->TransferProbeStreams();
   probe_hash_partitions_.resize(PARTITION_FANOUT);
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
@@ -989,7 +989,7 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
 }
 
 void PartitionedHashJoinNode::CreateProbePartition(
-    int partition_idx, unique_ptr<BufferedTupleStreamV2> probe_rows) {
+    int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) {
   DCHECK_GE(partition_idx, 0);
   DCHECK_LT(partition_idx, probe_hash_partitions_.size());
   DCHECK(probe_hash_partitions_[partition_idx] == NULL);
@@ -998,7 +998,7 @@ void PartitionedHashJoinNode::CreateProbePartition(
 }
 
 Status PartitionedHashJoinNode::EvaluateNullProbe(
-    RuntimeState* state, BufferedTupleStreamV2* build) {
+    RuntimeState* state, BufferedTupleStream* build) {
   if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
     return Status::OK();
   }
@@ -1067,9 +1067,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
       // can recurse the algorithm and create new hash partitions from spilled partitions.
       // TODO: we shouldn't need to unpin the build stream if we stop spilling
       // while probing.
-      build_partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+      build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
       DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0);
-      probe_partition->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+      probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
 
       if (probe_partition->probe_rows()->num_rows() != 0
           || NeedToProcessUnmatchedBuildRows()) {
@@ -1108,7 +1108,7 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
   // Just finished evaluating the null probe rows with all the non-spilled build
   // partitions. Unpin this now to free this memory for repartitioning.
   if (null_probe_rows_ != NULL) {
-    null_probe_rows_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+    null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
   }
 
   builder_->ClearHashPartitions();
@@ -1170,7 +1170,7 @@ string PartitionedHashJoinNode::NodeDebugString() const {
     ss << "  Probe hash partition " << i << ": ";
     if (probe_partition != NULL) {
       ss << "probe ptr=" << probe_partition;
-      BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows();
+      BufferedTupleStream* probe_rows = probe_partition->probe_rows();
       if (probe_rows != NULL) {
         ss << "    Probe Rows: " << probe_rows->num_rows()
            << "    (Bytes pinned: " << probe_rows->BytesPinned(false) << ")";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h 
b/be/src/exec/partitioned-hash-join-node.h
index b3f663e..6ed5269 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -162,7 +162,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Creates an initialized probe partition at 'partition_idx' in
   /// 'probe_hash_partitions_'.
   void CreateProbePartition(
-      int partition_idx, std::unique_ptr<BufferedTupleStreamV2> probe_rows);
+      int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows);
 
   /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have
   /// a write buffer allocated, so this will succeed unless an error is encountered.
@@ -170,7 +170,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// return convention is used to avoid emitting unnecessary code for ~Status in perf-
   /// critical code.
   bool AppendProbeRow(
-      BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
+      BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
 
   /// Probes the hash table for rows matching the current probe row and appends
   /// all the matching build rows (with probe row) to output batch. Returns true
@@ -325,7 +325,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// conjuncts pass (i.e. there is a match).
   /// This is used for NAAJ, when there are NULL probe rows.
   Status EvaluateNullProbe(
-      RuntimeState* state, BufferedTupleStreamV2* build) WARN_UNUSED_RESULT;
+      RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT;
 
   /// Prepares to output NULLs on the probe side for NAAJ. Before calling this,
   /// matched_null_probe_ should have been fully evaluated.
@@ -472,7 +472,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// For NAAJ, this stream contains all probe rows that had NULL on the hash table
   /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches.
-  std::unique_ptr<BufferedTupleStreamV2> null_probe_rows_;
+  std::unique_ptr<BufferedTupleStream> null_probe_rows_;
 
   /// For each row in null_probe_rows_, true if this row has matched any build row
   /// (i.e. the resulting joined row passes other_join_conjuncts).
@@ -504,7 +504,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// that has been prepared for writing with an I/O-sized write buffer.
     ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent,
         PhjBuilder::Partition* build_partition,
-        std::unique_ptr<BufferedTupleStreamV2> probe_rows);
+        std::unique_ptr<BufferedTupleStream> probe_rows);
     ~ProbePartition();
 
     /// Prepare to read the probe rows. Allocates the first read block, so reads will
@@ -517,7 +517,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// resources if 'batch' is NULL. Idempotent.
     void Close(RowBatch* batch);
 
-    BufferedTupleStreamV2* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
+    BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
     PhjBuilder::Partition* build_partition() { return build_partition_; }
 
     inline bool IsClosed() const { return probe_rows_ == NULL; }
@@ -529,7 +529,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// Stream of probe tuples in this partition. Initially owned by this object but
     /// transferred to the parent exec node (via the row batch) when the partition
     /// is complete. If NULL, ownership was transferred and the partition is closed.
-    std::unique_ptr<BufferedTupleStreamV2> probe_rows_;
+    std::unique_ptr<BufferedTupleStream> probe_rows_;
   };
 
   /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index 3441aac..a53b40e 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -20,7 +20,7 @@
 
 #include "exec/partitioned-hash-join-node.h"
 
-#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 92af968..391fd01 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -24,7 +24,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 
 add_library(Runtime
-  buffered-tuple-stream-v2.cc
+  buffered-tuple-stream.cc
   client-cache.cc
   coordinator.cc
   coordinator-backend-state.cc
@@ -91,7 +91,7 @@ ADD_BE_TEST(thread-resource-mgr-test)
 ADD_BE_TEST(mem-tracker-test)
 ADD_BE_TEST(multi-precision-test)
 ADD_BE_TEST(decimal-test)
-ADD_BE_TEST(buffered-tuple-stream-v2-test)
+ADD_BE_TEST(buffered-tuple-stream-test)
 ADD_BE_TEST(hdfs-fs-cache-test)
 ADD_BE_TEST(tmp-file-mgr-test)
 ADD_BE_TEST(row-batch-serialize-test)
