IMPALA-4674: Part 2.5: Rename BufferedTupleStreamV2 to BufferedTupleStream. This is cleanup that wasn't included in Part 2.
Testing: Confirmed that everything (including be tests) built ok, buffered-tuple-stream-v2-test passed and that I could run a couple of basic queries. Change-Id: Ib8b23d7c2d7488d9f74b08cc9adb4ed1a93e3591 Reviewed-on: http://gerrit.cloudera.org:8080/7609 Reviewed-by: Matthew Jacobs <m...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0c46147e Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0c46147e Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0c46147e Branch: refs/heads/master Commit: 0c46147e5fd93e2a9a63d145d60b656d2f6a7612 Parents: 5caadbb Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Mon Aug 7 09:07:00 2017 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Tue Aug 8 10:22:20 2017 +0000 ---------------------------------------------------------------------- be/src/exec/analytic-eval-node.cc | 8 +- be/src/exec/analytic-eval-node.h | 4 +- be/src/exec/hash-table-test.cc | 12 +- be/src/exec/hash-table.cc | 4 +- be/src/exec/hash-table.h | 14 +- be/src/exec/hash-table.inline.h | 2 +- be/src/exec/partitioned-aggregation-node-ir.cc | 2 +- be/src/exec/partitioned-aggregation-node.cc | 28 +- be/src/exec/partitioned-aggregation-node.h | 12 +- be/src/exec/partitioned-hash-join-builder-ir.cc | 10 +- be/src/exec/partitioned-hash-join-builder.cc | 34 +- be/src/exec/partitioned-hash-join-builder.h | 24 +- be/src/exec/partitioned-hash-join-node-ir.cc | 4 +- be/src/exec/partitioned-hash-join-node.cc | 34 +- be/src/exec/partitioned-hash-join-node.h | 14 +- be/src/exec/partitioned-hash-join-node.inline.h | 2 +- be/src/runtime/CMakeLists.txt | 4 +- be/src/runtime/buffered-tuple-stream-test.cc | 1462 ++++++++++++++++++ be/src/runtime/buffered-tuple-stream-v2-test.cc | 1462 ------------------ be/src/runtime/buffered-tuple-stream-v2.cc | 1084 
------------- be/src/runtime/buffered-tuple-stream-v2.h | 705 --------- .../runtime/buffered-tuple-stream-v2.inline.h | 56 - be/src/runtime/buffered-tuple-stream.cc | 1084 +++++++++++++ be/src/runtime/buffered-tuple-stream.h | 705 +++++++++ be/src/runtime/buffered-tuple-stream.inline.h | 56 + 25 files changed, 3413 insertions(+), 3413 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/analytic-eval-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc index f6d96ae..af4f866 100644 --- a/be/src/exec/analytic-eval-node.cc +++ b/be/src/exec/analytic-eval-node.cc @@ -23,7 +23,7 @@ #include "exprs/agg-fn-evaluator.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" -#include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/buffered-tuple-stream.inline.h" #include "runtime/descriptors.h" #include "runtime/mem-tracker.h" #include "runtime/query-state.h" @@ -195,7 +195,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { RETURN_IF_ERROR(ClaimBufferReservation(state)); } DCHECK(input_stream_ == nullptr); - input_stream_.reset(new BufferedTupleStreamV2(state, child(0)->row_desc(), + input_stream_.reset(new BufferedTupleStream(state, child(0)->row_desc(), &buffer_pool_client_, resource_profile_.spillable_buffer_size, resource_profile_.spillable_buffer_size)); RETURN_IF_ERROR(input_stream_->Init(id(), true)); @@ -363,7 +363,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) { // TODO: Consider re-pinning later if the output stream is fully consumed. 
RETURN_IF_ERROR(status); RETURN_IF_ERROR(state_->StartSpilling(mem_tracker())); - input_stream_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT); + input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx; if (!input_stream_->AddRow(row, &status)) { // Rows should be added in unpinned mode unless an error occurs. @@ -623,7 +623,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) { << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes(); SCOPED_TIMER(evaluation_timer_); - // BufferedTupleStreamV2::num_rows() returns the total number of rows that have been + // BufferedTupleStream::num_rows() returns the total number of rows that have been // inserted into the stream (it does not decrease when we read rows), so the index of // the next input row that will be inserted will be the current size of the stream. int64_t stream_idx = input_stream_->num_rows(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/analytic-eval-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h index eab9198..671eaa4 100644 --- a/be/src/exec/analytic-eval-node.h +++ b/be/src/exec/analytic-eval-node.h @@ -19,7 +19,7 @@ #define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H #include "exec/exec-node.h" -#include "runtime/buffered-tuple-stream-v2.h" +#include "runtime/buffered-tuple-stream.h" #include "runtime/tuple.h" namespace impala { @@ -339,7 +339,7 @@ class AnalyticEvalNode : public ExecNode { /// buffers with tuple data are attached to an output row batch on eos or /// ReachedLimit(). /// TODO: Consider re-pinning unpinned streams when possible. - boost::scoped_ptr<BufferedTupleStreamV2> input_stream_; + boost::scoped_ptr<BufferedTupleStream> input_stream_; /// Pool used for O(1) allocations that live until Close() or Reset(). 
/// Does not own data backing tuples returned in GetNext(), so it does not http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc index 7a6ec9d..aad9134 100644 --- a/be/src/exec/hash-table-test.cc +++ b/be/src/exec/hash-table-test.cc @@ -309,7 +309,7 @@ class HashTableTest : public testing::Test { for (int i = 0; i < 2; ++i) { if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue; - BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; + BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); Status status; bool inserted = @@ -350,7 +350,7 @@ class HashTableTest : public testing::Test { ASSERT_TRUE(success); for (int i = 0; i < 5; ++i) { if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue; - BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; + BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status); @@ -418,7 +418,7 @@ class HashTableTest : public testing::Test { for (int i = 0; i < val; ++i) { TupleRow* row = CreateTupleRow(val); if (!ht_ctx->EvalAndHashBuild(row)) continue; - BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; + BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); ASSERT_TRUE(hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status)); ASSERT_OK(status); @@ -481,7 +481,7 @@ class HashTableTest : public testing::Test { for (int j = 0; j < num_to_add; ++build_row_val, ++j) { TupleRow* row = CreateTupleRow(build_row_val); if (!ht_ctx->EvalAndHashBuild(row)) continue; - BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; + BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr; 
EXPECT_TRUE(hash_table->stores_tuples_); bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status); ASSERT_OK(status); @@ -518,7 +518,7 @@ class HashTableTest : public testing::Test { while (true) { TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL); if (!ht_ctx->EvalAndHashBuild(duplicate_row)) continue; - BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; + BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr; bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, duplicate_row, &status); ASSERT_OK(status); @@ -569,7 +569,7 @@ class HashTableTest : public testing::Test { // Insert using both Insert() and FindBucket() methods. if (build_row_val % 2 == 0) { - BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; + BufferedTupleStream::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status); EXPECT_TRUE(inserted); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc index aacedc2..e65d9f1 100644 --- a/be/src/exec/hash-table.cc +++ b/be/src/exec/hash-table.cc @@ -385,14 +385,14 @@ constexpr double HashTable::MAX_FILL_FACTOR; constexpr int64_t HashTable::DATA_PAGE_SIZE; HashTable* HashTable::Create(Suballocator* allocator, bool stores_duplicates, - int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets, + int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets, int64_t initial_num_buckets) { return new HashTable(FLAGS_enable_quadratic_probing, allocator, stores_duplicates, num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets); } HashTable::HashTable(bool quadratic_probing, Suballocator* allocator, - bool stores_duplicates, int num_build_tuples, BufferedTupleStreamV2* 
stream, + bool stores_duplicates, int num_build_tuples, BufferedTupleStream* stream, int64_t max_num_buckets, int64_t num_buckets) : allocator_(allocator), tuple_stream_(stream), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h index 297e619..d764640 100644 --- a/be/src/exec/hash-table.h +++ b/be/src/exec/hash-table.h @@ -26,8 +26,8 @@ #include "codegen/impala-ir.h" #include "common/compiler-util.h" #include "common/logging.h" -#include "runtime/buffered-tuple-stream-v2.h" -#include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/buffered-tuple-stream.h" +#include "runtime/buffered-tuple-stream.inline.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/suballocator.h" #include "runtime/tuple-row.h" @@ -533,7 +533,7 @@ class HashTable { /// of two formats, depending on the number of tuples in the row. union HtData { // For rows with multiple tuples per row, a pointer to the flattened TupleRow. - BufferedTupleStreamV2::FlatRowPtr flat_row; + BufferedTupleStream::FlatRowPtr flat_row; // For rows with one tuple per row, a pointer to the Tuple itself. Tuple* tuple; }; @@ -600,7 +600,7 @@ class HashTable { /// - initial_num_buckets: number of buckets that the hash table should be initialized /// with. static HashTable* Create(Suballocator* allocator, bool stores_duplicates, - int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets, + int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets, int64_t initial_num_buckets); /// Allocates the initial bucket structure. Returns a non-OK status if an error is @@ -623,7 +623,7 @@ class HashTable { /// is stored. The 'row' is not copied by the hash table and the caller must guarantee /// it stays in memory. This will not grow the hash table. 
bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx, - BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row, + BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row, Status* status) WARN_UNUSED_RESULT; /// Prefetch the hash table bucket which the given hash value 'hash' maps to. @@ -819,7 +819,7 @@ class HashTable { /// - quadratic_probing: set to true when the probing algorithm is quadratic, as /// opposed to linear. HashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates, - int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets, + int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets, int64_t initial_num_buckets); /// Performs the probing operation according to the probing algorithm (linear or @@ -918,7 +918,7 @@ class HashTable { /// Stream contains the rows referenced by the hash table. Can be NULL if the /// row only contains a single tuple, in which case the TupleRow indirection /// is removed by the hash table. - BufferedTupleStreamV2* tuple_stream_; + BufferedTupleStream* tuple_stream_; /// Constants on how the hash table should behave. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/hash-table.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h index ce2f784..85d7ad6 100644 --- a/be/src/exec/hash-table.inline.h +++ b/be/src/exec/hash-table.inline.h @@ -109,7 +109,7 @@ inline HashTable::HtData* HashTable::InsertInternal( } inline bool HashTable::Insert(HashTableCtx* ht_ctx, - BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row, Status* status) { + BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row, Status* status) { HtData* htdata = InsertInternal(ht_ctx, status); // If successful insert, update the contents of the newly inserted entry with 'idx'. 
if (LIKELY(htdata != NULL)) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc index 126a2a5..9baada1 100644 --- a/be/src/exec/partitioned-aggregation-node-ir.cc +++ b/be/src/exec/partitioned-aggregation-node-ir.cc @@ -21,7 +21,7 @@ #include "exprs/agg-fn-evaluator.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" -#include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/buffered-tuple-stream.inline.h" #include "runtime/row-batch.h" #include "runtime/tuple-row.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index fc0a4a6..16db5cc 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -31,7 +31,7 @@ #include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" #include "gutil/strings/substitute.h" -#include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/buffered-tuple-stream.inline.h" #include "runtime/descriptors.h" #include "runtime/exec-env.h" #include "runtime/mem-pool.h" @@ -275,7 +275,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) { &buffer_pool_client_, resource_profile_.spillable_buffer_size)); if (!is_streaming_preagg_ && needs_serialize_) { - serialize_stream_.reset(new BufferedTupleStreamV2(state, &intermediate_row_desc_, + serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_, &buffer_pool_client_, resource_profile_.spillable_buffer_size, resource_profile_.spillable_buffer_size)); 
RETURN_IF_ERROR(serialize_stream_->Init(id(), false)); @@ -722,7 +722,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() { } } - aggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_, + aggregated_row_stream.reset(new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_, &parent->buffer_pool_client_, parent->resource_profile_.spillable_buffer_size, parent->resource_profile_.spillable_buffer_size, external_varlen_slots)); @@ -740,7 +740,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() { } if (!parent->is_streaming_preagg_) { - unaggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_, + unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_, parent->child(0)->row_desc(), &parent->buffer_pool_client_, parent->resource_profile_.spillable_buffer_size, parent->resource_profile_.spillable_buffer_size)); @@ -786,7 +786,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { // Serialize and copy the spilled partition's stream into the new stream. Status status; - BufferedTupleStreamV2* new_stream = parent->serialize_stream_.get(); + BufferedTupleStream* new_stream = parent->serialize_stream_.get(); HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get()); while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); @@ -811,7 +811,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { // when we need to spill again. We need to have this available before we need // to spill to make sure it is available. This should be acquirable since we just // freed at least one buffer from this partition's (old) aggregated_row_stream. 
- parent->serialize_stream_.reset(new BufferedTupleStreamV2(parent->state_, + parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_, &parent->buffer_pool_client_, parent->resource_profile_.spillable_buffer_size, parent->resource_profile_.spillable_buffer_size)); @@ -866,9 +866,9 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) { DCHECK(aggregated_row_stream->has_write_iterator()); DCHECK(!unaggregated_row_stream->has_write_iterator()); if (more_aggregate_rows) { - aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT); + aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); } else { - aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL); bool got_buffer; RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer)); DCHECK(got_buffer) @@ -932,7 +932,7 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( } Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( - const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStreamV2* stream, + const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream, Status* status) noexcept { DCHECK(stream != NULL && status != NULL); // Allocate space for the entire tuple in the stream. @@ -1077,7 +1077,7 @@ Status PartitionedAggregationNode::AppendSpilledRow( Partition* __restrict__ partition, TupleRow* __restrict__ row) { DCHECK(!is_streaming_preagg_); DCHECK(partition->is_spilled()); - BufferedTupleStreamV2* stream = AGGREGATED_ROWS ? + BufferedTupleStream* stream = AGGREGATED_ROWS ? 
partition->aggregated_row_stream.get() : partition->unaggregated_row_stream.get(); DCHECK(!stream->is_pinned()); @@ -1297,7 +1297,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() { if (!hash_partition->is_spilled()) continue; // The aggregated rows have been repartitioned. Free up at least a buffer's worth of // reservation and use it to pin the unaggregated write buffer. - hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL); bool got_buffer; RETURN_IF_ERROR( hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer)); @@ -1332,7 +1332,7 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() { } template <bool AGGREGATED_ROWS> -Status PartitionedAggregationNode::ProcessStream(BufferedTupleStreamV2* input_stream) { +Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stream) { DCHECK(!is_streaming_preagg_); if (input_stream->num_rows() > 0) { while (true) { @@ -1430,8 +1430,8 @@ void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) { // Ensure all pages in the spilled partition's streams are unpinned by invalidating // the streams' read and write iterators. We may need all the memory to process the // next spilled partitions. 
- partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); - partition->unaggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL); + partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL); spilled_partitions_.push_front(partition); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index 4f8b622..c230630 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -25,7 +25,7 @@ #include "exec/exec-node.h" #include "exec/hash-table.h" -#include "runtime/buffered-tuple-stream-v2.h" +#include "runtime/buffered-tuple-stream.h" #include "runtime/bufferpool/suballocator.h" #include "runtime/descriptors.h" // for TupleId #include "runtime/mem-pool.h" @@ -425,18 +425,18 @@ class PartitionedAggregationNode : public ExecNode { /// For streaming preaggs, this may be NULL if sufficient memory is not available. /// In that case hash_tbl is also NULL and all rows for the partition will be passed /// through. - boost::scoped_ptr<BufferedTupleStreamV2> aggregated_row_stream; + boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream; /// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations. /// Always unpinned. Has a write buffer allocated when the partition is spilled and /// unaggregated rows are being processed. - boost::scoped_ptr<BufferedTupleStreamV2> unaggregated_row_stream; + boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream; }; /// Stream used to store serialized spilled rows. Only used if needs_serialize_ /// is set. This stream is never pinned and only used in Partition::Spill as a /// a temporary buffer. 
- boost::scoped_ptr<BufferedTupleStreamV2> serialize_stream_; + boost::scoped_ptr<BufferedTupleStream> serialize_stream_; /// Accessor for 'hash_tbls_' that verifies consistency with the partitions. HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) { @@ -471,7 +471,7 @@ class PartitionedAggregationNode : public ExecNode { /// FunctionContexts, so is stored outside the stream. If stream's small buffers get /// full, it will attempt to switch to IO-buffers. Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, - BufferedTupleStreamV2* stream, Status* status) noexcept; + BufferedTupleStream* stream, Status* status) noexcept; /// Constructs intermediate tuple, allocating memory from pool instead of the stream. /// Returns NULL and sets status if there is not enough memory to allocate the tuple. @@ -571,7 +571,7 @@ class PartitionedAggregationNode : public ExecNode { /// Reads all the rows from input_stream and process them by calling ProcessBatch(). template <bool AGGREGATED_ROWS> - Status ProcessStream(BufferedTupleStreamV2* input_stream) WARN_UNUSED_RESULT; + Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT; /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'. 
void GetSingletonOutput(RowBatch* row_batch); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc index df58036..e15e116 100644 --- a/be/src/exec/partitioned-hash-join-builder-ir.cc +++ b/be/src/exec/partitioned-hash-join-builder-ir.cc @@ -19,7 +19,7 @@ #include "codegen/impala-ir.h" #include "exec/hash-table.inline.h" -#include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/buffered-tuple-stream.inline.h" #include "runtime/raw-value.inline.h" #include "runtime/row-batch.h" #include "runtime/runtime-filter.h" @@ -30,7 +30,7 @@ using namespace impala; inline bool PhjBuilder::AppendRow( - BufferedTupleStreamV2* stream, TupleRow* row, Status* status) { + BufferedTupleStream* stream, TupleRow* row, Status* status) { if (LIKELY(stream->AddRow(row, status))) return true; if (UNLIKELY(!status->ok())) return false; return AppendRowStreamFull(stream, row, status); @@ -73,12 +73,12 @@ Status PhjBuilder::ProcessBuildBatch( bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch, - const vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows, Status* status) { + const vector<BufferedTupleStream::FlatRowPtr>& flat_rows, Status* status) { // Compute the hash values and prefetch the hash table buckets. 
const int num_rows = batch->num_rows(); HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); const int prefetch_size = expr_vals_cache->capacity(); - const BufferedTupleStreamV2::FlatRowPtr* flat_rows_data = flat_rows.data(); + const BufferedTupleStream::FlatRowPtr* flat_rows_data = flat_rows.data(); for (int prefetch_group_row = 0; prefetch_group_row < num_rows; prefetch_group_row += prefetch_size) { int cur_row = prefetch_group_row; @@ -97,7 +97,7 @@ bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode, expr_vals_cache->ResetForRead(); FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) { TupleRow* row = batch_iter.Get(); - BufferedTupleStreamV2::FlatRowPtr flat_row = flat_rows_data[cur_row]; + BufferedTupleStream::FlatRowPtr flat_row = flat_rows_data[cur_row]; if (!expr_vals_cache->IsRowNull() && UNLIKELY(!hash_tbl_->Insert(ht_ctx, flat_row, row, status))) { return false; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index a2f7c96..2dc2d8d 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -25,7 +25,7 @@ #include "exec/hash-table.inline.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" -#include "runtime/buffered-tuple-stream-v2.h" +#include "runtime/buffered-tuple-stream.h" #include "runtime/exec-env.h" #include "runtime/mem-tracker.h" #include "runtime/query-state.h" @@ -293,11 +293,11 @@ Status PhjBuilder::CreateHashPartitions(int level) { } bool PhjBuilder::AppendRowStreamFull( - BufferedTupleStreamV2* stream, TupleRow* row, Status* status) noexcept { + BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept { while (true) { // We ran out of memory. 
Pick a partition to spill. If we ran out of unspilled // partitions, SpillPartition() will return an error status. - *status = SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT); + *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); if (!status->ok()) return false; if (stream->AddRow(row, status)) return true; if (!status->ok()) return false; @@ -307,7 +307,7 @@ bool PhjBuilder::AppendRowStreamFull( } // TODO: can we do better with a different spilling heuristic? -Status PhjBuilder::SpillPartition(BufferedTupleStreamV2::UnpinMode mode) { +Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) { DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT); int64_t max_freed_mem = 0; int partition_idx = -1; @@ -367,7 +367,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() { partition->Close(NULL); } else if (partition->is_spilled()) { // We don't need any build-side data for spilled partitions in memory. - partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL); } } @@ -386,7 +386,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() { RETURN_IF_ERROR(partition->BuildHashTable(&built)); // If we did not have enough memory to build this hash table, we need to spill this // partition (clean up the hash table, unpin build). - if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStreamV2::UNPIN_ALL)); + if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL)); } // We may have spilled additional partitions while building hash tables, we need to @@ -423,9 +423,9 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() { while (probe_streams_to_create > 0) { // Create stream in vector, so that it will be cleaned up after any failure. 
spilled_partition_probe_streams_.emplace_back( - make_unique<BufferedTupleStreamV2>(runtime_state_, probe_row_desc_, + make_unique<BufferedTupleStream>(runtime_state_, probe_row_desc_, buffer_pool_client_, spillable_buffer_size_, spillable_buffer_size_)); - BufferedTupleStreamV2* probe_stream = spilled_partition_probe_streams_.back().get(); + BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get(); RETURN_IF_ERROR(probe_stream->Init(join_node_id_, false)); // Loop until either the stream gets a buffer or all partitions are spilled (in which @@ -435,7 +435,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() { RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer)); if (got_buffer) break; - RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL)); + RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL)); ++probe_streams_to_create; } --probe_streams_to_create; @@ -443,7 +443,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() { return Status::OK(); } -vector<unique_ptr<BufferedTupleStreamV2>> PhjBuilder::TransferProbeStreams() { +vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() { return std::move(spilled_partition_probe_streams_); } @@ -453,7 +453,7 @@ void PhjBuilder::CloseAndDeletePartitions() { all_partitions_.clear(); hash_partitions_.clear(); null_aware_partition_ = NULL; - for (unique_ptr<BufferedTupleStreamV2>& stream : spilled_partition_probe_streams_) { + for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) { stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } spilled_partition_probe_streams_.clear(); @@ -505,14 +505,14 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) { } Status PhjBuilder::RepartitionBuildInput( - Partition* input_partition, int level, BufferedTupleStreamV2* input_probe_rows) { + Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) { DCHECK_GE(level, 1); 
SCOPED_TIMER(repartition_timer_); COUNTER_ADD(num_repartitions_, 1); RuntimeState* state = runtime_state_; // Setup the read buffer and the new partitions. - BufferedTupleStreamV2* build_rows = input_partition->build_rows(); + BufferedTupleStream* build_rows = input_partition->build_rows(); DCHECK(build_rows != NULL); bool got_read_buffer; RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer)); @@ -545,7 +545,7 @@ Status PhjBuilder::RepartitionBuildInput( bool got_buffer; RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer)); if (got_buffer) break; - RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT)); + RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); } RETURN_IF_ERROR(FlushFinal(state)); @@ -573,7 +573,7 @@ bool PhjBuilder::HashTableStoresNulls() const { PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level) : parent_(parent), is_spilled_(false), level_(level) { - build_rows_ = make_unique<BufferedTupleStreamV2>(state, parent_->row_desc_, + build_rows_ = make_unique<BufferedTupleStream>(state, parent_->row_desc_, parent_->buffer_pool_client_, parent->spillable_buffer_size_, parent->spillable_buffer_size_); } @@ -602,7 +602,7 @@ void PhjBuilder::Partition::Close(RowBatch* batch) { } } -Status PhjBuilder::Partition::Spill(BufferedTupleStreamV2::UnpinMode mode) { +Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) { DCHECK(!IsClosed()); RETURN_IF_ERROR(parent_->runtime_state_->StartSpilling(parent_->mem_tracker())); // Close the hash table and unpin the stream backing it to free memory. @@ -634,7 +634,7 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) { HashTableCtx* ctx = parent_->ht_ctx_.get(); ctx->set_level(level()); // Set the hash function for building the hash table. 
RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker()); - vector<BufferedTupleStreamV2::FlatRowPtr> flat_rows; + vector<BufferedTupleStream::FlatRowPtr> flat_rows; bool eos = false; // Allocate the partition-local hash table. Initialize the number of buckets based on http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-builder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h index 912613d..277579c 100644 --- a/be/src/exec/partitioned-hash-join-builder.h +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -26,7 +26,7 @@ #include "exec/data-sink.h" #include "exec/filter-context.h" #include "exec/hash-table.h" -#include "runtime/buffered-tuple-stream-v2.h" +#include "runtime/buffered-tuple-stream.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/suballocator.h" @@ -103,7 +103,7 @@ class PhjBuilder : public DataSink { /// Transfer ownership of the probe streams to the caller. One stream was allocated per /// spilled partition in FlushFinal(). The probe streams are empty but prepared for /// writing with a write buffer allocated. - std::vector<std::unique_ptr<BufferedTupleStreamV2>> TransferProbeStreams(); + std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams(); /// Clears the current list of hash partitions. Called after probing of the partitions /// is done. The partitions are not closed or destroyed, since they may be spilled or @@ -124,7 +124,7 @@ class PhjBuilder : public DataSink { /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase /// has enough buffers preallocated to execute successfully. 
Status RepartitionBuildInput(Partition* input_partition, int level, - BufferedTupleStreamV2* input_probe_rows) WARN_UNUSED_RESULT; + BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT; /// Returns the largest build row count out of the current hash partitions. int64_t LargestPartitionRows() const; @@ -201,10 +201,10 @@ class PhjBuilder : public DataSink { /// Spills this partition, the partition's stream is unpinned with 'mode' and /// its hash table is destroyed if it was built. - Status Spill(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT; + Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT; bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; } - BufferedTupleStreamV2* ALWAYS_INLINE build_rows() { return build_rows_.get(); } + BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); } HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); } bool ALWAYS_INLINE is_spilled() const { return is_spilled_; } int ALWAYS_INLINE level() const { return level_; } @@ -220,7 +220,7 @@ class PhjBuilder : public DataSink { /// failed: if 'status' is ok, inserting failed because not enough reservation /// was available and if 'status' is an error, inserting failed because of that error. bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx, - RowBatch* batch, const std::vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows, + RowBatch* batch, const std::vector<BufferedTupleStream::FlatRowPtr>& flat_rows, Status* status); const PhjBuilder* parent_; @@ -239,7 +239,7 @@ class PhjBuilder : public DataSink { /// Stream of build tuples in this partition. Initially owned by this object but /// transferred to the parent exec node (via the row batch) when the partition /// is closed. If NULL, ownership has been transferred and the partition is closed. 
- std::unique_ptr<BufferedTupleStreamV2> build_rows_; + std::unique_ptr<BufferedTupleStream> build_rows_; }; /// Computes the minimum number of buffers required to execute the spilling partitioned @@ -288,19 +288,19 @@ class PhjBuilder : public DataSink { /// partitions. This odd return convention is used to avoid emitting unnecessary code /// for ~Status in perf-critical code. bool AppendRow( - BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; + BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; /// Slow path for AppendRow() above. It is called when the stream has failed to append /// the row. We need to find more memory by either switching to IO-buffers, in case the /// stream still uses small buffers, or spilling a partition. Returns false and sets /// 'status' if it was unable to append the row, even after spilling partitions. - bool AppendRowStreamFull(BufferedTupleStreamV2* stream, TupleRow* row, + bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT; /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed /// to the Spill() call for the selected partition. The current policy is to spill the /// largest partition. Returns non-ok status if we couldn't spill a partition. - Status SpillPartition(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT; + Status SpillPartition(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT; /// Tries to build hash tables for all unspilled hash partitions. Called after /// FlushFinal() when all build rows have been partitioned and added to the appropriate @@ -464,7 +464,7 @@ class PhjBuilder : public DataSink { /// /// Because of this, at the end of the build phase, we always have sufficient memory /// to execute the probe phase of the algorithm without spilling more partitions. 
- std::vector<std::unique_ptr<BufferedTupleStreamV2>> spilled_partition_probe_streams_; + std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_; /// END: Members that must be Reset() ///////////////////////////////////////// @@ -479,7 +479,7 @@ class PhjBuilder : public DataSink { ProcessBuildBatchFn process_build_batch_fn_level0_; typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*, - const std::vector<BufferedTupleStreamV2::FlatRowPtr>&, Status*); + const std::vector<BufferedTupleStream::FlatRowPtr>&, Status*); /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled. InsertBatchFn insert_batch_fn_; InsertBatchFn insert_batch_fn_level0_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc index b890eb9..3106419 100644 --- a/be/src/exec/partitioned-hash-join-node-ir.cc +++ b/be/src/exec/partitioned-hash-join-node-ir.cc @@ -313,7 +313,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( // The partition is not in memory, spill the probe row and move to the next row. // Skip the current row if we manage to append to the spilled partition's BTS. // Otherwise, we need to bail out and report the failure. 
- BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows(); + BufferedTupleStream* probe_rows = probe_partition->probe_rows(); if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) { DCHECK(!status->ok()); return false; @@ -438,7 +438,7 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode } inline bool PartitionedHashJoinNode::AppendProbeRow( - BufferedTupleStreamV2* stream, TupleRow* row, Status* status) { + BufferedTupleStream* stream, TupleRow* row, Status* status) { DCHECK(stream->has_write_iterator()); DCHECK(!stream->is_pinned()); return stream->AddRow(row, status); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index 2db9e00..806bdc0 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -27,7 +27,7 @@ #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" -#include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/buffered-tuple-stream.inline.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" @@ -265,7 +265,7 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) { PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition, - unique_ptr<BufferedTupleStreamV2> probe_rows) + unique_ptr<BufferedTupleStream> probe_rows) : build_partition_(build_partition), probe_rows_(std::move(probe_rows)) { DCHECK(probe_rows_->has_write_iterator()); @@ -328,7 +328,7 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch( probe_batch_pos_ = -1; return Status::OK(); } - BufferedTupleStreamV2* probe_rows = input_partition_->probe_rows(); 
+ BufferedTupleStream* probe_rows = input_partition_->probe_rows(); if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) { // Continue from the current probe stream. bool eos = false; @@ -420,9 +420,9 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe( ht_ctx_->set_level(next_partition_level); // Spill to free memory from hash tables and pinned streams for use in new partitions. - RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStreamV2::UNPIN_ALL)); + RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL)); // Temporarily free up the probe buffer to use when repartitioning. - input_partition_->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL); DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString(); DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString(); int64_t num_input_rows = build_partition->build_rows()->num_rows(); @@ -822,7 +822,7 @@ static Status NullAwareAntiJoinError(bool build) { Status PartitionedHashJoinNode::InitNullAwareProbePartition() { RuntimeState* state = runtime_state_; - unique_ptr<BufferedTupleStreamV2> probe_rows = make_unique<BufferedTupleStreamV2>( + unique_ptr<BufferedTupleStream> probe_rows = make_unique<BufferedTupleStream>( state, child(0)->row_desc(), &buffer_pool_client_, resource_profile_.spillable_buffer_size, resource_profile_.spillable_buffer_size); @@ -847,7 +847,7 @@ error: Status PartitionedHashJoinNode::InitNullProbeRows() { RuntimeState* state = runtime_state_; - null_probe_rows_ = make_unique<BufferedTupleStreamV2>(state, child(0)->row_desc(), + null_probe_rows_ = make_unique<BufferedTupleStream>(state, child(0)->row_desc(), &buffer_pool_client_, resource_profile_.spillable_buffer_size, resource_profile_.spillable_buffer_size); // TODO: we shouldn't start with this unpinned if spilling is disabled. 
@@ -866,8 +866,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() { DCHECK_EQ(probe_batch_pos_, -1); DCHECK_EQ(probe_batch_->num_rows(), 0); - BufferedTupleStreamV2* build_stream = builder_->null_aware_partition()->build_rows(); - BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows(); + BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows(); + BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows(); if (build_stream->num_rows() == 0) { // There were no build rows. Nothing to do. Just prepare to output the null @@ -904,7 +904,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state, int num_join_conjuncts = other_join_conjuncts_.size(); DCHECK(probe_batch_ != NULL); - BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows(); + BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows(); if (probe_batch_pos_ == probe_batch_->num_rows()) { probe_batch_pos_ = 0; probe_batch_->TransferResourceOwnership(out_batch); @@ -952,7 +952,7 @@ Status PartitionedHashJoinNode::PrepareForProbe() { DCHECK(probe_hash_partitions_.empty()); // Initialize the probe partitions, providing them with probe streams. 
- vector<unique_ptr<BufferedTupleStreamV2>> probe_streams = + vector<unique_ptr<BufferedTupleStream>> probe_streams = builder_->TransferProbeStreams(); probe_hash_partitions_.resize(PARTITION_FANOUT); for (int i = 0; i < PARTITION_FANOUT; ++i) { @@ -989,7 +989,7 @@ Status PartitionedHashJoinNode::PrepareForProbe() { } void PartitionedHashJoinNode::CreateProbePartition( - int partition_idx, unique_ptr<BufferedTupleStreamV2> probe_rows) { + int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) { DCHECK_GE(partition_idx, 0); DCHECK_LT(partition_idx, probe_hash_partitions_.size()); DCHECK(probe_hash_partitions_[partition_idx] == NULL); @@ -998,7 +998,7 @@ void PartitionedHashJoinNode::CreateProbePartition( } Status PartitionedHashJoinNode::EvaluateNullProbe( - RuntimeState* state, BufferedTupleStreamV2* build) { + RuntimeState* state, BufferedTupleStream* build) { if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) { return Status::OK(); } @@ -1067,9 +1067,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions( // can recurse the algorithm and create new hash partitions from spilled partitions. // TODO: we shouldn't need to unpin the build stream if we stop spilling // while probing. - build_partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL); DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0); - probe_partition->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL); if (probe_partition->probe_rows()->num_rows() != 0 || NeedToProcessUnmatchedBuildRows()) { @@ -1108,7 +1108,7 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions( // Just finished evaluating the null probe rows with all the non-spilled build // partitions. Unpin this now to free this memory for repartitioning. 
if (null_probe_rows_ != NULL) { - null_probe_rows_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT); + null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); } builder_->ClearHashPartitions(); @@ -1170,7 +1170,7 @@ string PartitionedHashJoinNode::NodeDebugString() const { ss << " Probe hash partition " << i << ": "; if (probe_partition != NULL) { ss << "probe ptr=" << probe_partition; - BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows(); + BufferedTupleStream* probe_rows = probe_partition->probe_rows(); if (probe_rows != NULL) { ss << " Probe Rows: " << probe_rows->num_rows() << " (Bytes pinned: " << probe_rows->BytesPinned(false) << ")"; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index b3f663e..6ed5269 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -162,7 +162,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// Creates an initialized probe partition at 'partition_idx' in /// 'probe_hash_partitions_'. void CreateProbePartition( - int partition_idx, std::unique_ptr<BufferedTupleStreamV2> probe_rows); + int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows); /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have /// a write buffer allocated, so this will succeed unless an error is encountered. @@ -170,7 +170,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// return convention is used to avoid emitting unnecessary code for ~Status in perf- /// critical code. 
bool AppendProbeRow( - BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; + BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; /// Probes the hash table for rows matching the current probe row and appends /// all the matching build rows (with probe row) to output batch. Returns true @@ -325,7 +325,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// conjuncts pass (i.e. there is a match). /// This is used for NAAJ, when there are NULL probe rows. Status EvaluateNullProbe( - RuntimeState* state, BufferedTupleStreamV2* build) WARN_UNUSED_RESULT; + RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT; /// Prepares to output NULLs on the probe side for NAAJ. Before calling this, /// matched_null_probe_ should have been fully evaluated. @@ -472,7 +472,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// For NAAJ, this stream contains all probe rows that had NULL on the hash table /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches. - std::unique_ptr<BufferedTupleStreamV2> null_probe_rows_; + std::unique_ptr<BufferedTupleStream> null_probe_rows_; /// For each row in null_probe_rows_, true if this row has matched any build row /// (i.e. the resulting joined row passes other_join_conjuncts). @@ -504,7 +504,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// that has been prepared for writing with an I/O-sized write buffer. ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition, - std::unique_ptr<BufferedTupleStreamV2> probe_rows); + std::unique_ptr<BufferedTupleStream> probe_rows); ~ProbePartition(); /// Prepare to read the probe rows. Allocates the first read block, so reads will @@ -517,7 +517,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// resources if 'batch' is NULL. Idempotent. 
void Close(RowBatch* batch); - BufferedTupleStreamV2* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); } + BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); } PhjBuilder::Partition* build_partition() { return build_partition_; } inline bool IsClosed() const { return probe_rows_ == NULL; } @@ -529,7 +529,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// Stream of probe tuples in this partition. Initially owned by this object but /// transferred to the parent exec node (via the row batch) when the partition /// is complete. If NULL, ownership was transferred and the partition is closed. - std::unique_ptr<BufferedTupleStreamV2> probe_rows_; + std::unique_ptr<BufferedTupleStream> probe_rows_; }; /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/exec/partitioned-hash-join-node.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h index 3441aac..a53b40e 100644 --- a/be/src/exec/partitioned-hash-join-node.inline.h +++ b/be/src/exec/partitioned-hash-join-node.inline.h @@ -20,7 +20,7 @@ #include "exec/partitioned-hash-join-node.h" -#include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/buffered-tuple-stream.inline.h" namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 92af968..391fd01 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -24,7 +24,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime") set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime") add_library(Runtime - 
buffered-tuple-stream-v2.cc + buffered-tuple-stream.cc client-cache.cc coordinator.cc coordinator-backend-state.cc @@ -91,7 +91,7 @@ ADD_BE_TEST(thread-resource-mgr-test) ADD_BE_TEST(mem-tracker-test) ADD_BE_TEST(multi-precision-test) ADD_BE_TEST(decimal-test) -ADD_BE_TEST(buffered-tuple-stream-v2-test) +ADD_BE_TEST(buffered-tuple-stream-test) ADD_BE_TEST(hdfs-fs-cache-test) ADD_BE_TEST(tmp-file-mgr-test) ADD_BE_TEST(row-batch-serialize-test)