http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream-v2.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc deleted file mode 100644 index 2153264..0000000 --- a/be/src/runtime/buffered-tuple-stream-v2.cc +++ /dev/null @@ -1,1084 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "runtime/buffered-tuple-stream-v2.inline.h" - -#include <boost/bind.hpp> -#include <gutil/strings/substitute.h> - -#include "runtime/bufferpool/reservation-tracker.h" -#include "runtime/collection-value.h" -#include "runtime/descriptors.h" -#include "runtime/exec-env.h" -#include "runtime/mem-tracker.h" -#include "runtime/row-batch.h" -#include "runtime/runtime-state.h" -#include "runtime/string-value.h" -#include "runtime/tuple-row.h" -#include "util/bit-util.h" -#include "util/debug-util.h" -#include "util/runtime-profile-counters.h" - -#include "common/names.h" - -#ifdef NDEBUG -#define CHECK_CONSISTENCY_FAST() -#define CHECK_CONSISTENCY_FULL() -#else -#define CHECK_CONSISTENCY_FAST() CheckConsistencyFast() -#define CHECK_CONSISTENCY_FULL() CheckConsistencyFull() -#endif - -using namespace impala; -using namespace strings; - -using BufferHandle = BufferPool::BufferHandle; - -BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state, - const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client, - int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots) - : state_(state), - desc_(row_desc), - node_id_(-1), - buffer_pool_(state->exec_env()->buffer_pool()), - buffer_pool_client_(buffer_pool_client), - num_pages_(0), - total_byte_size_(0), - has_read_iterator_(false), - read_page_reservation_(buffer_pool_client_), - read_page_rows_returned_(-1), - read_ptr_(nullptr), - read_end_ptr_(nullptr), - write_ptr_(nullptr), - write_end_ptr_(nullptr), - rows_returned_(0), - has_write_iterator_(false), - write_page_(nullptr), - write_page_reservation_(buffer_pool_client_), - bytes_pinned_(0), - num_rows_(0), - default_page_len_(default_page_len), - max_page_len_(max_page_len), - has_nullable_tuple_(row_desc->IsAnyTupleNullable()), - delete_on_read_(false), - closed_(false), - pinned_(true) { - DCHECK_GE(max_page_len, default_page_len); - DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len; - 
DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len; - read_page_ = pages_.end(); - for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) { - const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i]; - const int tuple_byte_size = tuple_desc->byte_size(); - fixed_tuple_sizes_.push_back(tuple_byte_size); - - vector<SlotDescriptor*> tuple_string_slots; - vector<SlotDescriptor*> tuple_coll_slots; - for (int j = 0; j < tuple_desc->slots().size(); ++j) { - SlotDescriptor* slot = tuple_desc->slots()[j]; - if (!slot->type().IsVarLenType()) continue; - if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) { - if (slot->type().IsVarLenStringType()) { - tuple_string_slots.push_back(slot); - } else { - DCHECK(slot->type().IsCollectionType()); - tuple_coll_slots.push_back(slot); - } - } - } - if (!tuple_string_slots.empty()) { - inlined_string_slots_.push_back(make_pair(i, tuple_string_slots)); - } - - if (!tuple_coll_slots.empty()) { - inlined_coll_slots_.push_back(make_pair(i, tuple_coll_slots)); - } - } -} - -BufferedTupleStreamV2::~BufferedTupleStreamV2() { - DCHECK(closed_); -} - -void BufferedTupleStreamV2::CheckConsistencyFull() const { - CheckConsistencyFast(); - // The below checks require iterating over all the pages in the stream. - DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString(); - DCHECK_EQ(pages_.size(), num_pages_) << DebugString(); - for (const Page& page : pages_) CheckPageConsistency(&page); -} - -void BufferedTupleStreamV2::CheckConsistencyFast() const { - // All the below checks should be O(1). - DCHECK(has_write_iterator() || write_page_ == nullptr); - if (write_page_ != nullptr) { - CheckPageConsistency(write_page_); - DCHECK(write_page_->is_pinned()); - DCHECK(write_page_->retrieved_buffer); - const BufferHandle* write_buffer; - Status status = write_page_->GetBuffer(&write_buffer); - DCHECK(status.ok()); // Write buffer should never have been unpinned. 
- DCHECK_GE(write_ptr_, write_buffer->data()); - DCHECK_EQ(write_end_ptr_, write_buffer->data() + write_page_->len()); - DCHECK_GE(write_end_ptr_, write_ptr_); - } - DCHECK(has_read_iterator() || read_page_ == pages_.end()); - if (read_page_ != pages_.end()) { - CheckPageConsistency(&*read_page_); - DCHECK(read_page_->is_pinned()); - DCHECK(read_page_->retrieved_buffer); - // Can't check read buffer without affecting behaviour, because a read may be in - // flight and this would required blocking on that write. - DCHECK_GE(read_end_ptr_, read_ptr_); - } - if (NeedReadReservation()) { - DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation()) - << DebugString(); - } else if (!read_page_reservation_.is_closed()) { - DCHECK_EQ(0, read_page_reservation_.GetReservation()); - } - if (NeedWriteReservation()) { - DCHECK_EQ(default_page_len_, write_page_reservation_.GetReservation()); - } else if (!write_page_reservation_.is_closed()) { - DCHECK_EQ(0, write_page_reservation_.GetReservation()); - } -} - -void BufferedTupleStreamV2::CheckPageConsistency(const Page* page) const { - DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString(); - // Only one large row per page. - if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1); - // We only create pages when we have a row to append to them. 
- DCHECK_GT(page->num_rows, 0); -} - -string BufferedTupleStreamV2::DebugString() const { - stringstream ss; - ss << "BufferedTupleStreamV2 num_rows=" << num_rows_ - << " rows_returned=" << rows_returned_ << " pinned=" << pinned_ - << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n" - << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_ - << " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_ - << " read_page="; - if (read_page_ == pages_.end()) { - ss << "<end>"; - } else { - ss << &*read_page_; - } - ss << "\n" - << " read_page_reservation="; - if (read_page_reservation_.is_closed()) { - ss << "<closed>"; - } else { - ss << read_page_reservation_.GetReservation(); - } - ss << " write_page_reservation="; - if (write_page_reservation_.is_closed()) { - ss << "<closed>"; - } else { - ss << write_page_reservation_.GetReservation(); - } - ss << "\n # pages=" << num_pages_ << " pages=[\n"; - for (const Page& page : pages_) { - ss << "{" << page.DebugString() << "}"; - if (&page != &pages_.back()) ss << ",\n"; - } - ss << "]"; - return ss.str(); -} - -string BufferedTupleStreamV2::Page::DebugString() const { - return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows); -} - -Status BufferedTupleStreamV2::Init(int node_id, bool pinned) { - if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT); - node_id_ = node_id; - return Status::OK(); -} - -Status BufferedTupleStreamV2::PrepareForWrite(bool* got_reservation) { - // This must be the first iterator created. - DCHECK(pages_.empty()); - DCHECK(!delete_on_read_); - DCHECK(!has_write_iterator()); - DCHECK(!has_read_iterator()); - CHECK_CONSISTENCY_FULL(); - - *got_reservation = buffer_pool_client_->IncreaseReservationToFit(default_page_len_); - if (!*got_reservation) return Status::OK(); - has_write_iterator_ = true; - // Save reservation for the write iterators. 
- buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - CHECK_CONSISTENCY_FULL(); - return Status::OK(); -} - -Status BufferedTupleStreamV2::PrepareForReadWrite( - bool delete_on_read, bool* got_reservation) { - // This must be the first iterator created. - DCHECK(pages_.empty()); - DCHECK(!delete_on_read_); - DCHECK(!has_write_iterator()); - DCHECK(!has_read_iterator()); - CHECK_CONSISTENCY_FULL(); - - *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * default_page_len_); - if (!*got_reservation) return Status::OK(); - has_write_iterator_ = true; - // Save reservation for both the read and write iterators. - buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_); - buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read)); - return Status::OK(); -} - -void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) { - for (Page& page : pages_) { - if (batch != nullptr && page.retrieved_buffer) { - // Subtle: We only need to attach buffers from pages that we may have returned - // references to. ExtractBuffer() cannot fail for these pages because the data - // is guaranteed to already be in -memory. 
- BufferPool::BufferHandle buffer; - Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer); - DCHECK(status.ok()); - batch->AddBuffer(buffer_pool_client_, move(buffer), flush); - } else { - buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle); - } - } - read_page_reservation_.Close(); - write_page_reservation_.Close(); - pages_.clear(); - num_pages_ = 0; - bytes_pinned_ = 0; - closed_ = true; -} - -int64_t BufferedTupleStreamV2::CalcBytesPinned() const { - int64_t result = 0; - for (const Page& page : pages_) result += page.pin_count() * page.len(); - return result; -} - -Status BufferedTupleStreamV2::PinPage(Page* page) { - RETURN_IF_ERROR(buffer_pool_->Pin(buffer_pool_client_, &page->handle)); - bytes_pinned_ += page->len(); - return Status::OK(); -} - -int BufferedTupleStreamV2::ExpectedPinCount(bool stream_pinned, const Page* page) const { - return (stream_pinned || is_read_page(page) || is_write_page(page)) ? 1 : 0; -} - -Status BufferedTupleStreamV2::PinPageIfNeeded(Page* page, bool stream_pinned) { - int new_pin_count = ExpectedPinCount(stream_pinned, page); - if (new_pin_count != page->pin_count()) { - DCHECK_EQ(new_pin_count, page->pin_count() + 1); - RETURN_IF_ERROR(PinPage(page)); - } - return Status::OK(); -} - -void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) { - int new_pin_count = ExpectedPinCount(stream_pinned, page); - if (new_pin_count != page->pin_count()) { - DCHECK_EQ(new_pin_count, page->pin_count() - 1); - buffer_pool_->Unpin(buffer_pool_client_, &page->handle); - bytes_pinned_ -= page->len(); - if (page->pin_count() == 0) page->retrieved_buffer = false; - } -} - -bool BufferedTupleStreamV2::NeedWriteReservation() const { - return NeedWriteReservation(pinned_); -} - -bool BufferedTupleStreamV2::NeedWriteReservation(bool stream_pinned) const { - return NeedWriteReservation(stream_pinned, num_pages_, has_write_iterator(), - write_page_ != nullptr, has_read_write_page()); -} 
- -bool BufferedTupleStreamV2::NeedWriteReservation(bool stream_pinned, int64_t num_pages, - bool has_write_iterator, bool has_write_page, bool has_read_write_page) { - if (!has_write_iterator) return false; - // If the stream is empty the write reservation hasn't been used yet. - if (num_pages == 0) return true; - if (stream_pinned) { - // Make sure we've saved the write reservation for the next page if the only - // page is a read/write page. - return has_read_write_page && num_pages == 1; - } else { - // Make sure we've saved the write reservation if it's not being used to pin - // a page in the stream. - return !has_write_page || has_read_write_page; - } -} - -bool BufferedTupleStreamV2::NeedReadReservation() const { - return NeedReadReservation(pinned_); -} - -bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned) const { - return NeedReadReservation( - stream_pinned, num_pages_, has_read_iterator(), read_page_ != pages_.end()); -} - -bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned, int64_t num_pages, - bool has_read_iterator, bool has_read_page) const { - return NeedReadReservation(stream_pinned, num_pages, has_read_iterator, has_read_page, - has_write_iterator(), write_page_ != nullptr); -} - -bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned, int64_t num_pages, - bool has_read_iterator, bool has_read_page, bool has_write_iterator, - bool has_write_page) { - if (!has_read_iterator) return false; - if (stream_pinned) { - // Need reservation if there are no pages currently pinned for reading but we may add - // a page. - return num_pages == 0 && has_write_iterator; - } else { - // Only need to save reservation for an unpinned stream if there is no read page - // and we may advance to one in the future. 
- return (has_write_iterator || num_pages > 0) && !has_read_page; - } -} - -Status BufferedTupleStreamV2::NewWritePage(int64_t page_len) noexcept { - DCHECK(!closed_); - DCHECK(write_page_ == nullptr); - - Page new_page; - const BufferHandle* write_buffer; - RETURN_IF_ERROR(buffer_pool_->CreatePage( - buffer_pool_client_, page_len, &new_page.handle, &write_buffer)); - bytes_pinned_ += page_len; - total_byte_size_ += page_len; - - pages_.push_back(std::move(new_page)); - ++num_pages_; - write_page_ = &pages_.back(); - DCHECK_EQ(write_page_->num_rows, 0); - write_ptr_ = write_buffer->data(); - write_end_ptr_ = write_ptr_ + page_len; - return Status::OK(); -} - -Status BufferedTupleStreamV2::CalcPageLenForRow(int64_t row_size, int64_t* page_len) { - if (UNLIKELY(row_size > max_page_len_)) { - return Status(TErrorCode::MAX_ROW_SIZE, - PrettyPrinter::Print(row_size, TUnit::BYTES), node_id_, - PrettyPrinter::Print(0, TUnit::BYTES)); - } - *page_len = max(default_page_len_, BitUtil::RoundUpToPowerOfTwo(row_size)); - return Status::OK(); -} - -Status BufferedTupleStreamV2::AdvanceWritePage( - int64_t row_size, bool* got_reservation) noexcept { - DCHECK(has_write_iterator()); - CHECK_CONSISTENCY_FAST(); - - int64_t page_len; - RETURN_IF_ERROR(CalcPageLenForRow(row_size, &page_len)); - - // Reservation may have been saved for the next write page, e.g. by PrepareForWrite() - // if the stream is empty. - int64_t write_reservation_to_restore = 0, read_reservation_to_restore = 0; - if (NeedWriteReservation( - pinned_, num_pages_, true, write_page_ != nullptr, has_read_write_page()) - && !NeedWriteReservation(pinned_, num_pages_ + 1, true, true, false)) { - write_reservation_to_restore = default_page_len_; - } - // If the stream is pinned, we need to keep the previous write page pinned for reading. - // Check if we saved reservation for this case. 
- if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(), - read_page_ != pages_.end(), true, write_page_ != nullptr) - && !NeedReadReservation(pinned_, num_pages_ + 1, has_read_iterator(), - read_page_ != pages_.end(), true, true)) { - read_reservation_to_restore = default_page_len_; - } - - // We may reclaim reservation by unpinning a page that was pinned for writing. - int64_t write_page_reservation_to_reclaim = - (write_page_ != nullptr && !pinned_ && !has_read_write_page()) ? - write_page_->len() : 0; - // Check to see if we can get the reservation before changing the state of the stream. - if (!buffer_pool_client_->IncreaseReservationToFit(page_len - - write_reservation_to_restore - read_reservation_to_restore - - write_page_reservation_to_reclaim)) { - DCHECK(pinned_ || page_len > default_page_len_) - << "If the stream is unpinned, this should only fail for large pages"; - CHECK_CONSISTENCY_FAST(); - *got_reservation = false; - return Status::OK(); - } - if (write_reservation_to_restore > 0) { - buffer_pool_client_->RestoreReservation( - &write_page_reservation_, write_reservation_to_restore); - } - if (read_reservation_to_restore > 0) { - buffer_pool_client_->RestoreReservation( - &read_page_reservation_, read_reservation_to_restore); - } - ResetWritePage(); - RETURN_IF_ERROR(NewWritePage(page_len)); - *got_reservation = true; - return Status::OK(); -} - -void BufferedTupleStreamV2::ResetWritePage() { - if (write_page_ == nullptr) return; - // Unpin the write page if we're reading in unpinned mode. - Page* prev_write_page = write_page_; - write_page_ = nullptr; - write_ptr_ = nullptr; - write_end_ptr_ = nullptr; - - // May need to decrement pin count now that it's not the write page, depending on - // the stream's mode. 
- UnpinPageIfNeeded(prev_write_page, pinned_); -} - -void BufferedTupleStreamV2::InvalidateWriteIterator() { - if (!has_write_iterator()) return; - ResetWritePage(); - has_write_iterator_ = false; - // No more pages will be appended to stream - do not need any write reservation. - write_page_reservation_.Close(); - // May not need a read reservation once the write iterator is invalidated. - if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(), - read_page_ != pages_.end(), true, write_page_ != nullptr) - && !NeedReadReservation(pinned_, num_pages_, has_read_iterator(), - read_page_ != pages_.end(), false, false)) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); - } -} - -Status BufferedTupleStreamV2::NextReadPage() { - DCHECK(has_read_iterator()); - DCHECK(!closed_); - CHECK_CONSISTENCY_FAST(); - - if (read_page_ == pages_.end()) { - // No rows read yet - start reading at first page. If the stream is unpinned, we can - // use the reservation saved in PrepareForReadWrite() to pin the first page. - read_page_ = pages_.begin(); - if (NeedReadReservation(pinned_, num_pages_, true, false) - && !NeedReadReservation(pinned_, num_pages_, true, true)) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); - } - } else if (delete_on_read_) { - DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " " - << DebugString(); - DCHECK_NE(&*read_page_, write_page_); - bytes_pinned_ -= pages_.front().len(); - buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle); - pages_.pop_front(); - --num_pages_; - read_page_ = pages_.begin(); - } else { - // Unpin pages after reading them if needed. 
- Page* prev_read_page = &*read_page_; - ++read_page_; - UnpinPageIfNeeded(prev_read_page, pinned_); - } - - if (read_page_ == pages_.end()) { - CHECK_CONSISTENCY_FULL(); - return Status::OK(); - } - - if (!pinned_ && read_page_->len() > default_page_len_ - && buffer_pool_client_->GetUnusedReservation() < read_page_->len()) { - // If we are iterating over an unpinned stream and encounter a page that is larger - // than the default page length, then unpinning the previous page may not have - // freed up enough reservation to pin the next one. The client is responsible for - // ensuring the reservation is available, so this indicates a bug. - return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: couldn't pin " - "large page of $0 bytes, client only had $1 bytes of unused reservation:\n$2", - read_page_->len(), buffer_pool_client_->GetUnusedReservation(), - buffer_pool_client_->DebugString())); - } - // Ensure the next page is pinned for reading. By this point we should have enough - // reservation to pin the page. If the stream is pinned, the page is already pinned. - // If the stream is unpinned, we freed up enough memory for a default-sized page by - // deleting or unpinning the previous page and ensured that, if the page was larger, - // that the reservation is available with the above check. - RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_)); - - // This waits for the pin to complete if the page was unpinned earlier. - const BufferHandle* read_buffer; - RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer)); - - read_page_rows_returned_ = 0; - read_ptr_ = read_buffer->data(); - read_end_ptr_ = read_ptr_ + read_buffer->len(); - - // We may need to save reservation for the write page in the case when the write page - // became a read/write page. 
- if (!NeedWriteReservation(pinned_, num_pages_, has_write_iterator(), - write_page_ != nullptr, false) - && NeedWriteReservation(pinned_, num_pages_, has_write_iterator(), - write_page_ != nullptr, has_read_write_page())) { - buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - } - CHECK_CONSISTENCY_FAST(); - return Status::OK(); -} - -void BufferedTupleStreamV2::InvalidateReadIterator() { - if (read_page_ != pages_.end()) { - // Unpin the write page if we're reading in unpinned mode. - Page* prev_read_page = &*read_page_; - read_page_ = pages_.end(); - read_ptr_ = nullptr; - read_end_ptr_ = nullptr; - - // May need to decrement pin count after destroying read iterator. - UnpinPageIfNeeded(prev_read_page, pinned_); - } - has_read_iterator_ = false; - if (read_page_reservation_.GetReservation() > 0) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); - } - // It is safe to re-read a delete-on-read stream if no rows were read and no pages - // were therefore deleted. - if (rows_returned_ == 0) delete_on_read_ = false; -} - -Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_reservation) { - CHECK_CONSISTENCY_FULL(); - InvalidateWriteIterator(); - InvalidateReadIterator(); - // If already pinned, no additional pin is needed (see ExpectedPinCount()). - *got_reservation = pinned_ || pages_.empty() - || buffer_pool_client_->IncreaseReservationToFit(default_page_len_); - if (!*got_reservation) return Status::OK(); - return PrepareForReadInternal(delete_on_read); -} - -Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) { - DCHECK(!closed_); - DCHECK(!delete_on_read_); - DCHECK(!has_read_iterator()); - - has_read_iterator_ = true; - if (pages_.empty()) { - // No rows to return, or a the first read/write page has not yet been allocated. 
- read_page_ = pages_.end(); - read_ptr_ = nullptr; - read_end_ptr_ = nullptr; - } else { - // Eagerly pin the first page in the stream. - read_page_ = pages_.begin(); - // Check if we need to increment the pin count of the read page. - RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_)); - DCHECK(read_page_->is_pinned()); - - // This waits for the pin to complete if the page was unpinned earlier. - const BufferHandle* read_buffer; - RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer)); - read_ptr_ = read_buffer->data(); - read_end_ptr_ = read_ptr_ + read_buffer->len(); - } - read_page_rows_returned_ = 0; - rows_returned_ = 0; - delete_on_read_ = delete_on_read; - CHECK_CONSISTENCY_FULL(); - return Status::OK(); -} - -Status BufferedTupleStreamV2::PinStream(bool* pinned) { - DCHECK(!closed_); - CHECK_CONSISTENCY_FULL(); - if (pinned_) { - *pinned = true; - return Status::OK(); - } - *pinned = false; - // First, make sure we have the reservation to pin all the pages for reading. - int64_t bytes_to_pin = 0; - for (Page& page : pages_) { - bytes_to_pin += (ExpectedPinCount(true, &page) - page.pin_count()) * page.len(); - } - - // Check if we have some reservation to restore. - bool restore_write_reservation = - NeedWriteReservation(false) && !NeedWriteReservation(true); - bool restore_read_reservation = - NeedReadReservation(false) && !NeedReadReservation(true); - int64_t increase_needed = bytes_to_pin - - (restore_write_reservation ? default_page_len_ : 0) - - (restore_read_reservation ? default_page_len_ : 0); - bool reservation_granted = - buffer_pool_client_->IncreaseReservationToFit(increase_needed); - if (!reservation_granted) return Status::OK(); - - // If there is no current write page we should have some saved reservation to use. - // Only continue saving it if the stream is empty and need it to pin the first page. 
- if (restore_write_reservation) { - buffer_pool_client_->RestoreReservation(&write_page_reservation_, default_page_len_); - } - if (restore_read_reservation) { - buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); - } - - // At this point success is guaranteed - go through to pin the pages we need to pin. - // If the page data was evicted from memory, the read I/O can happen in parallel - // because we defer calling GetBuffer() until NextReadPage(). - for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true)); - - pinned_ = true; - *pinned = true; - CHECK_CONSISTENCY_FULL(); - return Status::OK(); -} - -void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) { - CHECK_CONSISTENCY_FULL(); - DCHECK(!closed_); - if (mode == UNPIN_ALL) { - // Invalidate the iterators so they don't keep pages pinned. - InvalidateWriteIterator(); - InvalidateReadIterator(); - } - - if (pinned_) { - CHECK_CONSISTENCY_FULL(); - // If the stream was pinned, there may be some remaining pinned pages that should - // be unpinned at this point. - for (Page& page : pages_) UnpinPageIfNeeded(&page, false); - - // Check to see if we need to save some of the reservation we freed up. - if (!NeedWriteReservation(true) && NeedWriteReservation(false)) { - buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - } - if (!NeedReadReservation(true) && NeedReadReservation(false)) { - buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_); - } - pinned_ = false; - } - CHECK_CONSISTENCY_FULL(); -} - -Status BufferedTupleStreamV2::GetRows( - MemTracker* tracker, scoped_ptr<RowBatch>* batch, bool* got_rows) { - if (num_rows() > numeric_limits<int>::max()) { - // RowBatch::num_rows_ is a 32-bit int, avoid an overflow. - return Status(Substitute("Trying to read $0 rows into in-memory batch failed. 
Limit " - "is $1", - num_rows(), numeric_limits<int>::max())); - } - RETURN_IF_ERROR(PinStream(got_rows)); - if (!*got_rows) return Status::OK(); - bool got_reservation; - RETURN_IF_ERROR(PrepareForRead(false, &got_reservation)); - DCHECK(got_reservation) << "Stream was pinned"; - batch->reset(new RowBatch(desc_, num_rows(), tracker)); - bool eos = false; - // Loop until GetNext fills the entire batch. Each call can stop at page - // boundaries. We generally want it to stop, so that pages can be freed - // as we read. It is safe in this case because we pin the entire stream. - while (!eos) { - RETURN_IF_ERROR(GetNext(batch->get(), &eos)); - } - return Status::OK(); -} - -Status BufferedTupleStreamV2::GetNext(RowBatch* batch, bool* eos) { - return GetNextInternal<false>(batch, eos, nullptr); -} - -Status BufferedTupleStreamV2::GetNext( - RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { - return GetNextInternal<true>(batch, eos, flat_rows); -} - -template <bool FILL_FLAT_ROWS> -Status BufferedTupleStreamV2::GetNextInternal( - RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { - if (has_nullable_tuple_) { - return GetNextInternal<FILL_FLAT_ROWS, true>(batch, eos, flat_rows); - } else { - return GetNextInternal<FILL_FLAT_ROWS, false>(batch, eos, flat_rows); - } -} - -template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE> -Status BufferedTupleStreamV2::GetNextInternal( - RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { - DCHECK(!closed_); - DCHECK(batch->row_desc()->Equals(*desc_)); - DCHECK(is_pinned() || !FILL_FLAT_ROWS) - << "FlatRowPtrs are only valid for pinned streams"; - *eos = (rows_returned_ == num_rows_); - if (*eos) return Status::OK(); - - if (UNLIKELY(read_page_ == pages_.end() - || read_page_rows_returned_ == read_page_->num_rows)) { - // Get the next page in the stream (or the first page if read_page_ was not yet - // initialized.) 
We need to do this at the beginning of the GetNext() call to ensure - // the buffer management semantics. NextReadPage() may unpin or delete the buffer - // backing the rows returned from the *previous* call to GetNext(). - RETURN_IF_ERROR(NextReadPage()); - } - - DCHECK(has_read_iterator()); - DCHECK(read_page_ != pages_.end()); - DCHECK(read_page_->is_pinned()) << DebugString(); - DCHECK_GE(read_page_rows_returned_, 0); - - int rows_left_in_page = read_page_->num_rows - read_page_rows_returned_; - int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_page); - DCHECK_GE(rows_to_fill, 1); - uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows())); - - // Produce tuple rows from the current page and the corresponding position on the - // null tuple indicator. - if (FILL_FLAT_ROWS) { - DCHECK(flat_rows != nullptr); - DCHECK(!delete_on_read_); - DCHECK_EQ(batch->num_rows(), 0); - flat_rows->clear(); - flat_rows->reserve(rows_to_fill); - } - - const uint64_t tuples_per_row = desc_->tuple_descriptors().size(); - // Start reading from the current position in 'read_page_'. - for (int i = 0; i < rows_to_fill; ++i) { - if (FILL_FLAT_ROWS) { - flat_rows->push_back(read_ptr_); - DCHECK_EQ(flat_rows->size(), i + 1); - } - // Copy the row into the output batch. - TupleRow* output_row = reinterpret_cast<TupleRow*>(tuple_row_mem); - tuple_row_mem += sizeof(Tuple*) * tuples_per_row; - UnflattenTupleRow<HAS_NULLABLE_TUPLE>(&read_ptr_, output_row); - - // Update string slot ptrs, skipping external strings. - for (int j = 0; j < inlined_string_slots_.size(); ++j) { - Tuple* tuple = output_row->GetTuple(inlined_string_slots_[j].first); - if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; - FixUpStringsForRead(inlined_string_slots_[j].second, tuple); - } - - // Update collection slot ptrs, skipping external collections. 
We traverse the - // collection structure in the same order as it was written to the stream, allowing - // us to infer the data layout based on the length of collections and strings. - for (int j = 0; j < inlined_coll_slots_.size(); ++j) { - Tuple* tuple = output_row->GetTuple(inlined_coll_slots_[j].first); - if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; - FixUpCollectionsForRead(inlined_coll_slots_[j].second, tuple); - } - } - - batch->CommitRows(rows_to_fill); - rows_returned_ += rows_to_fill; - read_page_rows_returned_ += rows_to_fill; - *eos = (rows_returned_ == num_rows_); - if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) { - // No more data in this page. The batch must be immediately returned up the operator - // tree and deep copied so that NextReadPage() can reuse the read page's buffer. - // TODO: IMPALA-4179 - instead attach the buffer and flush the resources. - batch->MarkNeedsDeepCopy(); - } - if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill); - DCHECK_LE(read_ptr_, read_end_ptr_); - return Status::OK(); -} - -void BufferedTupleStreamV2::FixUpStringsForRead( - const vector<SlotDescriptor*>& string_slots, Tuple* tuple) { - DCHECK(tuple != nullptr); - for (const SlotDescriptor* slot_desc : string_slots) { - if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; - - StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); - DCHECK_LE(read_ptr_ + sv->len, read_end_ptr_); - sv->ptr = reinterpret_cast<char*>(read_ptr_); - read_ptr_ += sv->len; - } -} - -void BufferedTupleStreamV2::FixUpCollectionsForRead( - const vector<SlotDescriptor*>& collection_slots, Tuple* tuple) { - DCHECK(tuple != nullptr); - for (const SlotDescriptor* slot_desc : collection_slots) { - if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; - - CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); - const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); - 
int coll_byte_size = cv->num_tuples * item_desc.byte_size(); - DCHECK_LE(read_ptr_ + coll_byte_size, read_end_ptr_); - cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_); - read_ptr_ += coll_byte_size; - - if (!item_desc.HasVarlenSlots()) continue; - uint8_t* coll_data = cv->ptr; - for (int i = 0; i < cv->num_tuples; ++i) { - Tuple* item = reinterpret_cast<Tuple*>(coll_data); - FixUpStringsForRead(item_desc.string_slots(), item); - FixUpCollectionsForRead(item_desc.collection_slots(), item); - coll_data += item_desc.byte_size(); - } - } -} - -int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* row) const noexcept { - int64_t size = 0; - if (has_nullable_tuple_) { - size += NullIndicatorBytesPerRow(); - for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) { - if (row->GetTuple(i) != nullptr) size += fixed_tuple_sizes_[i]; - } - } else { - for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) { - size += fixed_tuple_sizes_[i]; - } - } - for (int i = 0; i < inlined_string_slots_.size(); ++i) { - Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first); - if (tuple == nullptr) continue; - const vector<SlotDescriptor*>& slots = inlined_string_slots_[i].second; - for (auto it = slots.begin(); it != slots.end(); ++it) { - if (tuple->IsNull((*it)->null_indicator_offset())) continue; - size += tuple->GetStringSlot((*it)->tuple_offset())->len; - } - } - - for (int i = 0; i < inlined_coll_slots_.size(); ++i) { - Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first); - if (tuple == nullptr) continue; - const vector<SlotDescriptor*>& slots = inlined_coll_slots_[i].second; - for (auto it = slots.begin(); it != slots.end(); ++it) { - if (tuple->IsNull((*it)->null_indicator_offset())) continue; - CollectionValue* cv = tuple->GetCollectionSlot((*it)->tuple_offset()); - const TupleDescriptor& item_desc = *(*it)->collection_item_descriptor(); - size += cv->num_tuples * item_desc.byte_size(); - - if (!item_desc.HasVarlenSlots()) continue; - for (int j = 0; j < cv->num_tuples; 
++j) { - Tuple* item = reinterpret_cast<Tuple*>(&cv->ptr[j * item_desc.byte_size()]); - size += item->VarlenByteSize(item_desc); - } - } - } - return size; -} - -bool BufferedTupleStreamV2::AddRowSlow(TupleRow* row, Status* status) noexcept { - // Use AddRowCustom*() to do the work of advancing the page. - int64_t row_size = ComputeRowSize(row); - uint8_t* data = AddRowCustomBeginSlow(row_size, status); - if (data == nullptr) return false; - bool success = DeepCopy(row, &data, data + row_size); - DCHECK(success); - DCHECK_EQ(data, write_ptr_); - AddRowCustomEnd(row_size); - return true; -} - -uint8_t* BufferedTupleStreamV2::AddRowCustomBeginSlow( - int64_t size, Status* status) noexcept { - bool got_reservation; - *status = AdvanceWritePage(size, &got_reservation); - if (!status->ok() || !got_reservation) return nullptr; - - // We have a large-enough page so now success is guaranteed. - uint8_t* result = AddRowCustomBegin(size, status); - DCHECK(result != nullptr); - return result; -} - -void BufferedTupleStreamV2::AddLargeRowCustomEnd(int64_t size) noexcept { - DCHECK_GT(size, default_page_len_); - // Immediately unpin the large write page so that we're not using up extra reservation - // and so we don't append another row to the page. - ResetWritePage(); - // Save some of the reservation we freed up so we can create the next write page when - // needed. - if (NeedWriteReservation()) { - buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); - } - // The stream should be in a consistent state once the row is added. 
- CHECK_CONSISTENCY_FAST(); -} - -bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept { - DCHECK(!closed_); - DCHECK(has_write_iterator()); - if (UNLIKELY(write_page_ == nullptr || !DeepCopy(row, &write_ptr_, write_end_ptr_))) { - return AddRowSlow(row, status); - } - ++num_rows_; - ++write_page_->num_rows; - return true; -} - -bool BufferedTupleStreamV2::DeepCopy( - TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept { - return has_nullable_tuple_ ? DeepCopyInternal<true>(row, data, data_end) : - DeepCopyInternal<false>(row, data, data_end); -} - -// TODO: consider codegening this. -// TODO: in case of duplicate tuples, this can redundantly serialize data. -template <bool HAS_NULLABLE_TUPLE> -bool BufferedTupleStreamV2::DeepCopyInternal( - TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept { - uint8_t* pos = *data; - const uint64_t tuples_per_row = desc_->tuple_descriptors().size(); - // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple - // indicator. - if (HAS_NULLABLE_TUPLE) { - int null_indicator_bytes = NullIndicatorBytesPerRow(); - if (UNLIKELY(pos + null_indicator_bytes > data_end)) return false; - uint8_t* null_indicators = pos; - pos += NullIndicatorBytesPerRow(); - memset(null_indicators, 0, null_indicator_bytes); - for (int i = 0; i < tuples_per_row; ++i) { - uint8_t* null_word = null_indicators + (i >> 3); - const uint32_t null_pos = i & 7; - const int tuple_size = fixed_tuple_sizes_[i]; - Tuple* t = row->GetTuple(i); - const uint8_t mask = 1 << (7 - null_pos); - if (t != nullptr) { - if (UNLIKELY(pos + tuple_size > data_end)) return false; - memcpy(pos, t, tuple_size); - pos += tuple_size; - } else { - *null_word |= mask; - } - } - } else { - // If we know that there are no nullable tuples no need to set the nullability flags. 
- for (int i = 0; i < tuples_per_row; ++i) { - const int tuple_size = fixed_tuple_sizes_[i]; - if (UNLIKELY(pos + tuple_size > data_end)) return false; - Tuple* t = row->GetTuple(i); - // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots) - // is delivered, the check below should become DCHECK(t != nullptr). - DCHECK(t != nullptr || tuple_size == 0); - memcpy(pos, t, tuple_size); - pos += tuple_size; - } - } - - // Copy inlined string slots. Note: we do not need to convert the string ptrs to offsets - // on the write path, only on the read. The tuple data is immediately followed - // by the string data so only the len information is necessary. - for (int i = 0; i < inlined_string_slots_.size(); ++i) { - const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first); - if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; - if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second, &pos, data_end))) - return false; - } - - // Copy inlined collection slots. We copy collection data in a well-defined order so - // we do not need to convert pointers to offsets on the write path. 
- for (int i = 0; i < inlined_coll_slots_.size(); ++i) { - const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first); - if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; - if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second, &pos, data_end))) - return false; - } - *data = pos; - return true; -} - -bool BufferedTupleStreamV2::CopyStrings(const Tuple* tuple, - const vector<SlotDescriptor*>& string_slots, uint8_t** data, const uint8_t* data_end) { - for (const SlotDescriptor* slot_desc : string_slots) { - if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; - const StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); - if (LIKELY(sv->len > 0)) { - if (UNLIKELY(*data + sv->len > data_end)) return false; - - memcpy(*data, sv->ptr, sv->len); - *data += sv->len; - } - } - return true; -} - -bool BufferedTupleStreamV2::CopyCollections(const Tuple* tuple, - const vector<SlotDescriptor*>& collection_slots, uint8_t** data, const uint8_t* data_end) { - for (const SlotDescriptor* slot_desc : collection_slots) { - if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; - const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); - const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); - if (LIKELY(cv->num_tuples > 0)) { - int coll_byte_size = cv->num_tuples * item_desc.byte_size(); - if (UNLIKELY(*data + coll_byte_size > data_end)) return false; - uint8_t* coll_data = *data; - memcpy(coll_data, cv->ptr, coll_byte_size); - *data += coll_byte_size; - - if (!item_desc.HasVarlenSlots()) continue; - // Copy variable length data when present in collection items. 
- for (int i = 0; i < cv->num_tuples; ++i) { - const Tuple* item = reinterpret_cast<Tuple*>(coll_data); - if (UNLIKELY(!CopyStrings(item, item_desc.string_slots(), data, data_end))) { - return false; - } - if (UNLIKELY( - !CopyCollections(item, item_desc.collection_slots(), data, data_end))) { - return false; - } - coll_data += item_desc.byte_size(); - } - } - } - return true; -} - -void BufferedTupleStreamV2::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const { - DCHECK(row != nullptr); - DCHECK(!closed_); - DCHECK(is_pinned()); - DCHECK(!delete_on_read_); - uint8_t* data = flat_row; - return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) : - UnflattenTupleRow<false>(&data, row); -} - -template <bool HAS_NULLABLE_TUPLE> -void BufferedTupleStreamV2::UnflattenTupleRow(uint8_t** data, TupleRow* row) const { - const int tuples_per_row = desc_->tuple_descriptors().size(); - uint8_t* ptr = *data; - if (has_nullable_tuple_) { - // Stitch together the tuples from the page and the NULL ones. - const uint8_t* null_indicators = ptr; - ptr += NullIndicatorBytesPerRow(); - for (int i = 0; i < tuples_per_row; ++i) { - const uint8_t* null_word = null_indicators + (i >> 3); - const uint32_t null_pos = i & 7; - const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); - row->SetTuple( - i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null)); - ptr += fixed_tuple_sizes_[i] * is_not_null; - } - } else { - for (int i = 0; i < tuples_per_row; ++i) { - row->SetTuple(i, reinterpret_cast<Tuple*>(ptr)); - ptr += fixed_tuple_sizes_[i]; - } - } - *data = ptr; -}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream-v2.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h deleted file mode 100644 index 2023124..0000000 --- a/be/src/runtime/buffered-tuple-stream-v2.h +++ /dev/null @@ -1,705 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H -#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H - -#include <set> -#include <vector> -#include <boost/scoped_ptr.hpp> -#include <boost/function.hpp> - -#include "common/global-types.h" -#include "common/status.h" -#include "gutil/macros.h" -#include "runtime/bufferpool/buffer-pool.h" -#include "runtime/row-batch.h" - -namespace impala { - -class MemTracker; -class RuntimeState; -class RowDescriptor; -class SlotDescriptor; -class Tuple; -class TupleRow; - -/// Class that provides an abstraction for a stream of tuple rows backed by BufferPool -/// Pages. Rows can be added to the stream and read back. Rows are returned in the order -/// they are added. 
-/// -/// The BufferedTupleStream is *not* thread safe from the caller's point of view. -/// Different threads should not concurrently call methods of the same BufferedTupleStream -/// object. -/// -/// Reading and writing the stream: -/// The stream supports two modes of reading/writing, depending on whether -/// PrepareForWrite() is called to initialize a write iterator only or -/// PrepareForReadWrite() is called to initialize both read and write iterators to enable -/// interleaved reads and writes. -/// -/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AddRowCustom*() -/// are called repeatedly to initialize then advance a write iterator through the stream. -/// Once the stream is fully written, it can be read back by calling PrepareForRead() -/// then GetNext() repeatedly to advance a read iterator through the stream, or by -/// calling GetRows() to get all of the rows at once. -/// -/// To use read/write mode, PrepareForReadWrite() is called once to initialize the read -/// and write iterators. AddRow()/AddRowCustom*() then advance a write iterator through -/// the stream, and GetNext() advances a trailing read iterator through the stream. -/// -/// Buffer management: -/// The tuple stream is backed by a sequence of BufferPool Pages. The tuple stream uses -/// the client's reservation to pin pages in memory. It will automatically try to -/// increase the client's reservation whenever it needs to do so to make progress. -/// -/// Normally pages are all of the same default page length, but larger pages up to the -/// max page length are used if needed to store rows that are too large for a -/// default-length page. -/// -/// The stream has both pinned and unpinned modes. In the pinned mode all pages are -/// pinned for reading. The pinned mode avoids I/O by keeping all pages pinned in memory -/// and allows clients to save pointers to rows in the stream and randomly access them. -/// E.g. 
hash tables can be backed by a BufferedTupleStream. In the unpinned mode, only -/// pages currently being read and written are pinned and other pages are unpinned and -/// therefore do not use the client's reservation and can be spilled to disk. The stream -/// always holds onto a default page's worth of reservation for the read and write -/// iterators (i.e. two page's worth if the stream is in read/write mode), even if that -/// many pages are not currently pinned. This means that UnpinStream() always succeeds, -/// and moving to the next default-length write page or read page on an unpinned stream -/// does not require additional reservation. This is implemented by saving reservations -/// in SubReservations. -/// -/// To read or write a row larger than the default page size to/from an unpinned stream, -/// the client must have max_page_len - default_page_len unused reservation. Writing a -/// large row to an unpinned stream only uses the reservation for the duration of the -/// AddRow()/AddRowCustom*() call. Reading a large row from an unpinned stream uses the -/// reservation until the next call to GetNext(). E.g. to partition a single unpinned -/// stream into n unpinned streams, the reservation needed is (n - 1) * -/// default_page_len + 2 * max_page_len: one large read buffer and one large write -/// buffer is needed to keep the row being processed in-memory, but only default-sized -/// buffers are needed for the other streams being written. -/// -/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag -/// to PrepareForRead() which deletes the stream's pages as it does a final read -/// pass over the stream. -/// -/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach -/// buffers to RowBatches. -/// -/// Page layout: -/// Rows are stored back to back starting at the first byte of each page's buffer, with -/// no interleaving of data from different rows. 
There is no padding or alignment -/// between rows. Rows larger than the default page length are stored on their own -/// page. -/// -/// Tuple row layout: -/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a -/// bitstring at the start of each row with null indicators for all tuples in each row -/// (including non-nullable tuples). The bitstring occupies ceil(num_tuples_per_row / 8) -/// bytes. A 1 indicates the tuple is null. -/// -/// The fixed length parts of the row's tuples are stored first, followed by var len data -/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can -/// point to var len data outside the stream. When reading the stream, the length of each -/// row's var len data in the stream must be computed to find the next row's start. -/// -/// The tuple stream supports reading from the stream into RowBatches without copying -/// out any data: the RowBatches' Tuple pointers will point directly into the stream's -/// pages' buffers. The fixed length parts follow Impala's internal tuple format, so for -/// the tuple to be valid, we only need to update pointers to point to the var len data -/// in the stream. These pointers need to be updated by the stream because a spilled -/// page's data may be relocated to a different buffer. The pointers are updated lazily -/// upon reading the stream via GetNext() or GetRows(). -/// -/// Example layout for a row with two non-nullable tuples ((1, "hello"), (2, "world")) -/// with all var len data stored in the stream: -/// <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ... -/// +--------+-----------+-----------+-----------+-------------+ -/// | IntVal | StringVal | BigIntVal | StringVal | | ... -/// +--------+-----------+-----------+-----------+-------------+ -/// | val: 1 | len: 5 | val: 2 | len: 5 | helloworld | ... -/// | | ptr: 0x.. | | ptr: 0x.. | | ...
-/// +--------+-----------+-----------+-----------+-------------+ -/// <--4b--> <---12b---> <----8b---> <---12b---> <----10b----> -/// -/// Example layout for a row with the second tuple nullable ((1, "hello"), NULL) -/// with all var len data stored in the stream: -/// <- null tuple bitstring -> <---- tuple 1 -----> <- var len -> <- next row ... -/// +-------------------------+--------+-----------+------------+ -/// | | IntVal | StringVal | | ... -/// +-------------------------+--------+-----------+------------+ -/// | 0100 0000 | val: 1 | len: 5 | hello | ... -/// | | | ptr: 0x.. | | ... -/// +-------------------------+--------+-----------+------------+ -/// <---------1b------------> <--4b--> <---12b---> <----5b----> -/// -/// Example layout for a row with a single non-nullable tuple (("hello", "world")) with -/// the second string slot stored externally to the stream: -/// <------ tuple 1 ------> <- var len -> <- next row ... -/// +-----------+-----------+-------------+ -/// | StringVal | StringVal | | ... -/// +-----------+-----------+-------------+ -/// | len: 5 | len: 5 | hello | ... -/// | ptr: 0x.. | ptr: 0x.. | | ... -/// +-----------+-----------+-------------+ -/// <---12b---> <---12b---> <-----5b----> -/// -/// The behavior of reads and writes is as follows: -/// Read: -/// 1. Unpinned: Only a single read page is pinned at a time. This means that only -/// enough reservation to pin a single page is needed to read the stream, regardless -/// of the stream's size. Each page is deleted or unpinned (if delete on read is true -/// or false respectively) before advancing to the next page. -/// 2. Pinned: All pages in the stream are pinned so do not need to be pinned or -/// unpinned when reading from the stream. If delete on read is true, pages are -/// deleted after being read. If the stream was previously unpinned, the page's data -/// may not yet be in memory - reading from the stream can block on I/O or fail with -/// an I/O error. -/// Write: -/// 1.
Unpinned: Unpin pages as they fill up. This means that only enough reservation -/// to pin a single write page is required to write to the stream, regardless of the -/// stream's size. -/// 2. Pinned: Pages are left pinned. If the next page in the stream cannot be pinned -/// because the client's reservation is insufficient (and could not be increased by -/// the stream), the append call will fail and the client can either unpin the stream -/// or free up other memory before retrying. -/// -/// Memory lifetime of rows read from stream: -/// If the stream is pinned and delete on read is false, it is valid to access any tuples -/// returned via GetNext() or GetRows() until the stream is unpinned. If the stream is -/// unpinned or delete on read is true, then the batch returned from GetNext() may have -/// the needs_deep_copy flag set, which means that any tuple memory returned so far from -/// the stream may be freed on the next call to GetNext(). -/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch. -/// -/// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd(): -/// The BufferedTupleStream supports allocation of uninitialized rows with -/// AddRowCustom*(). AddRowCustomBegin() is called instead of AddRow() if the client wants -/// to manually construct a row. The caller of AddRowCustomBegin() is responsible for -/// writing the row with exactly the layout described above then calling -/// AddRowCustomEnd() when done. -/// -/// If a caller constructs a tuple in this way, the caller can set the pointers and they -/// will not be modified until the stream is read via GetNext() or GetRows(). -/// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow(). -/// -/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a -/// page will need to be pinned soon. -class BufferedTupleStreamV2 { - public: - /// A pointer to the start of a flattened TupleRow in the stream.
- typedef uint8_t* FlatRowPtr; - - /// row_desc: description of rows stored in the stream. This is the desc for rows - /// that are added and the rows being returned. - /// default_page_len, max_page_len: the default and maximum size of pages to use in the stream - /// ext_varlen_slots: set of varlen slots with data stored externally to the stream - BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor* row_desc, - BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len, - int64_t max_page_len, - const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>()); - - virtual ~BufferedTupleStreamV2(); - - /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called - /// once before any of the other APIs. - /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned. - /// 'node_id' is only used for error reporting. - Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT; - - /// Prepares the stream for writing by saving enough reservation for a default-size - /// write page. Tries to increase reservation if there is not enough unused reservation - /// for a page. Called after Init() and before the first AddRow() or - /// AddRowCustomBegin() call. - /// 'got_reservation': set to true if there was enough reservation to initialize the - /// first write page and false if there was not enough reservation and no other - /// error was encountered. Undefined if an error status is returned. - Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT; - - /// Prepares the stream for interleaved reads and writes by saving enough reservation - /// for default-sized read and write pages. Called after Init() and before the first - /// AddRow() or AddRowCustomBegin() call. - /// 'delete_on_read': Pages are deleted after they are read. - /// 'got_reservation': set to true if there was enough reservation to initialize the - /// read and write pages and false if there was not enough reservation and no other - /// error was encountered.
Undefined if an error status is returned. - Status PrepareForReadWrite( - bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; - - /// Prepares the stream for reading, invalidating the write iterator (if there is one). - /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before - /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes - /// over the stream, unless rows were read from the stream after PrepareForRead() or - /// PrepareForReadWrite() was called with delete_on_read = true. - /// 'delete_on_read': Pages are deleted after they are read. - /// 'got_reservation': set to true if there was enough reservation to initialize the - /// first read page and false if there was not enough reservation and no other - /// error was encountered. Undefined if an error status is returned. - Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; - - /// Adds a single row to the stream. There are four possible outcomes: - /// a) The append succeeds. True is returned. - /// b) The append fails because the unused reservation was not sufficient to add - /// a new page to the stream large enough to fit 'row' and the stream could not - /// increase the reservation to get enough unused reservation. Returns false and - /// sets 'status' to OK. The append can be retried after freeing up memory or - /// unpinning the stream. - /// c) The append fails with a runtime error. Returns false and sets 'status' to an - /// error. - /// d) The append fails because the row is too large to fit in a page of a stream. - /// Returns false and sets 'status' to an error. - /// - /// Unpinned streams can only encounter case b) when appending a row larger than - /// the default page size and the reservation could not be increased sufficiently. - /// Otherwise enough memory is automatically freed up by unpinning the current write -/// page.
- /// - /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow() - /// returns an error, it should not be called again. - bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT; - - /// Allocates space to store a row of 'size' bytes (including fixed and variable length - /// data). If successful, returns a pointer to the allocated row. The caller then must - /// write valid data to the row and call AddRowCustomEnd(). - /// - /// If unsuccessful, returns nullptr. The failure modes are the same as described in the - /// AddRow() comment. - ALWAYS_INLINE uint8_t* AddRowCustomBegin(int64_t size, Status* status); - - /// Called after AddRowCustomBegin() when done writing the row. Should only be called - /// if AddRowCustomBegin() succeeded. See the AddRowCustomBegin() comment for - /// explanation. - /// 'size': the size passed into AddRowCustomBegin(). - void AddRowCustomEnd(int64_t size); - - /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call if the - /// stream is pinned. The row must have been allocated with the stream's row desc. - /// The returned 'row' is backed by memory from the stream so is only valid as long - /// as the stream is pinned. - void GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const; - - /// Pins all pages in this stream and switches to pinned mode. Has no effect if the - /// stream is already pinned. - /// If the current unused reservation is not sufficient to pin the stream in memory, - /// this will try to increase the reservation. If that fails, 'pinned' is set to false - /// and the stream is left unpinned. Otherwise 'pinned' is set to true. - Status PinStream(bool* pinned) WARN_UNUSED_RESULT; - - /// Modes for UnpinStream(). - enum UnpinMode { - /// All pages in the stream are unpinned and the read/write positions in the stream - /// are reset. No more rows can be written to the stream after this.
The stream can - /// be re-read from the beginning by calling PrepareForRead(). - UNPIN_ALL, - /// All pages are unpinned aside from the current read and write pages (if any), - /// which is left in the same state. The unpinned stream can continue being read - /// or written from the current read or write positions. - UNPIN_ALL_EXCEPT_CURRENT, - }; - - /// Unpins stream with the given 'mode' as described above. - void UnpinStream(UnpinMode mode); - - /// Get the next batch of output rows, which are backed by the stream's memory. - /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy' - /// flag may be set on 'batch' to signal that memory will be freed on the next - /// call to GetNext() and that the caller should copy out any data it needs from - /// rows in 'batch' or in previous batches returned from GetNext(). - /// - /// If the stream is pinned and 'delete_on_read' is false, the memory backing the - /// rows will remain valid until the stream is unpinned, destroyed, etc. - /// TODO: IMPALA-4179: update when we simplify the memory transfer model. - Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT; - - /// Same as above, but populate 'flat_rows' with a pointer to the flat version of - /// each returned row in the pinned stream. The pointers in 'flat_rows' are only - /// valid as long as the stream remains pinned. - Status GetNext( - RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) WARN_UNUSED_RESULT; - - /// Returns all the rows in the stream in batch. This pins the entire stream in the - /// process. If the current unused reservation is not sufficient to pin the stream in - /// memory, this will try to increase the reservation. If that fails, 'got_rows' is set - /// to false. - Status GetRows(MemTracker* tracker, boost::scoped_ptr<RowBatch>* batch, - bool* got_rows) WARN_UNUSED_RESULT; - - /// Must be called once at the end to cleanup all resources. 
If 'batch' is non-NULL, - /// attaches buffers from pinned pages that rows returned from GetNext() may reference. - /// Otherwise deletes all pages. Does nothing if the stream was already closed. The - /// 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching buffers. - void Close(RowBatch* batch, RowBatch::FlushMode flush); - - /// Number of rows in the stream. - int64_t num_rows() const { return num_rows_; } - - /// Number of rows returned via GetNext(). - int64_t rows_returned() const { return rows_returned_; } - - /// Returns the byte size necessary to store the entire stream in memory. - int64_t byte_size() const { return total_byte_size_; } - - /// Returns the number of bytes currently pinned in memory by the stream. - /// If ignore_current is true, the write_page_ memory is not included. - int64_t BytesPinned(bool ignore_current) const { - if (ignore_current && write_page_ != nullptr && write_page_->is_pinned()) { - return bytes_pinned_ - write_page_->len(); - } - return bytes_pinned_; - } - - bool is_closed() const { return closed_; } - bool is_pinned() const { return pinned_; } - bool has_read_iterator() const { return has_read_iterator_; } - bool has_write_iterator() const { return has_write_iterator_; } - - std::string DebugString() const; - - private: - DISALLOW_COPY_AND_ASSIGN(BufferedTupleStreamV2); - friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test; - friend class ArrayTupleStreamTest_TestComputeRowSize_Test; - friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test; - friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test; - - /// Wrapper around BufferPool::PageHandle that tracks additional info about the page. 
- struct Page { - Page() : num_rows(0), retrieved_buffer(true) {} - - inline int len() const { return handle.len(); } - inline bool is_pinned() const { return handle.is_pinned(); } - inline int pin_count() const { return handle.pin_count(); } - Status GetBuffer(const BufferPool::BufferHandle** buffer) { - RETURN_IF_ERROR(handle.GetBuffer(buffer)); - retrieved_buffer = true; - return Status::OK(); - } - std::string DebugString() const; - - BufferPool::PageHandle handle; - - /// Number of rows written to the page. - int num_rows; - - /// Whether we called GetBuffer() on the page since it was last pinned. This means - /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have - /// returned rows referencing the page's buffer. - bool retrieved_buffer; - }; - - /// Runtime state instance used to check for cancellation. Not owned. - RuntimeState* const state_; - - /// Description of rows stored in the stream. - const RowDescriptor* desc_; - - /// Plan node ID, used for error reporting. - int node_id_; - - /// The size of the fixed length portion for each tuple in the row. - std::vector<int> fixed_tuple_sizes_; - - /// Vectors of all the strings slots that have their varlen data stored in stream - /// grouped by tuple_idx. - std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_; - - /// Vectors of all the collection slots that have their varlen data stored in the - /// stream, grouped by tuple_idx. - std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_; - - /// Buffer pool and client used to allocate, pin and release pages. Not owned. - BufferPool* buffer_pool_; - BufferPool::ClientHandle* buffer_pool_client_; - - /// List of pages in the stream. - /// Empty iff one of two cases applies: - /// * before the first row has been added with AddRow() or AddRowCustom(). 
- /// * after the stream has been destructively read in 'delete_on_read' mode - std::list<Page> pages_; - // IMPALA-5629: avoid O(n) list.size() call by explicitly tracking the number of pages. - // TODO: remove when we switch to GCC5+, where list.size() is O(1). See GCC bug #49561. - int64_t num_pages_; - - /// Total size of pages_, including any pages already deleted in 'delete_on_read' - /// mode. - int64_t total_byte_size_; - - /// True if there is currently an active read iterator for the stream. - bool has_read_iterator_; - - /// The current page being read. When no read iterator is active, equal to list.end(). - /// When a read iterator is active, either points to the current read page, or equals - /// list.end() if no rows have yet been read. GetNext() does not advance this past - /// the end of the stream, so upon eos 'read_page_' points to the last page and - /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an error - /// status was returned. - std::list<Page>::iterator read_page_; - - /// Saved reservation for read iterator. 'default_page_len_' reservation is saved if - /// there is a read iterator, no pinned read page, and the possibility that the read - /// iterator will advance to a valid page. - BufferPool::SubReservation read_page_reservation_; - - /// Number of rows returned from the current read_page_. - uint32_t read_page_rows_returned_; - - /// Pointer into read_page_ to the byte after the last row read. - uint8_t* read_ptr_; - - /// Pointer to one byte past the end of read_page_. Used to detect overruns. - const uint8_t* read_end_ptr_; - - /// Pointer into write_page_ to the byte after the last row written. - uint8_t* write_ptr_; - - /// Pointer to one byte past the end of write_page_. Cached to speed up computation - const uint8_t* write_end_ptr_; - - /// Number of rows returned to the caller from GetNext() since the last - /// PrepareForRead() call. 
- int64_t rows_returned_; - - /// True if there is currently an active write iterator into the stream. - bool has_write_iterator_; - - /// The current page for writing. NULL if there is no write iterator or no current - /// write page. Always pinned. Size is 'default_page_len_', except temporarily while - /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd(). - Page* write_page_; - - /// Saved reservation for write iterator. 'default_page_len_' reservation is saved if - /// there is a write iterator, no page currently pinned for writing and the possibility - /// that a pin count will be needed for the write iterator in future. Specifically if: - /// * no rows have been appended to the stream and 'pages_' is empty, or - /// * the stream is unpinned, 'write_page_' is null and and the last page in 'pages_' - /// is a large page that we advanced past, or - /// * there is only one pinned page in the stream and it is already pinned for reading. - BufferPool::SubReservation write_page_reservation_; - - /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list - /// to compute it. - int64_t bytes_pinned_; - - /// Number of rows stored in the stream. Includes rows that were already deleted during - /// a destructive 'delete_on_read' pass over the stream. - int64_t num_rows_; - - /// The default length in bytes of pages used to store the stream's rows. All rows that - /// fit in a default-sized page are stored in default-sized page. - const int64_t default_page_len_; - - /// The maximum length in bytes of pages used to store the stream's rows. This is a - /// hard limit on the maximum size of row that can be stored in the stream and the - /// amount of reservation required to read or write to an unpinned stream. - const int64_t max_page_len_; - - /// Whether any tuple in the rows is nullable. - const bool has_nullable_tuple_; - - /// If true, pages are deleted after they are read during this read pass. 
Once rows - /// have been read from a stream with 'delete_on_read_' true, this is always true. - bool delete_on_read_; - - bool closed_; // Used for debugging. - - /// If true, this stream has been explicitly pinned by the caller and all pages are - /// kept pinned until the caller calls UnpinStream(). - bool pinned_; - - bool is_read_page(const Page* page) const { - return read_page_ != pages_.end() && &*read_page_ == page; - } - - bool is_write_page(const Page* page) const { return write_page_ == page; } - - /// Return true if the read and write page are the same. - bool has_read_write_page() const { - return write_page_ != nullptr && is_read_page(write_page_); - } - - /// The slow path for AddRow() that is called if there is not sufficient space in - /// the current page. - bool AddRowSlow(TupleRow* row, Status* status) noexcept; - - /// The slow path for AddRowCustomBegin() that is called if there is not sufficient space in - /// the current page. - uint8_t* AddRowCustomBeginSlow(int64_t size, Status* status) noexcept; - - /// The slow path for AddRowCustomEnd() that is called for large pages. - void AddLargeRowCustomEnd(int64_t size) noexcept; - - /// Copies 'row' into the buffer starting at *data and ending at the byte before - /// 'data_end'. On success, returns true and updates *data to point after the last - /// byte written. Returns false if there is not enough space in the buffer provided. - bool DeepCopy(TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept; - - /// Templated implementation of DeepCopy(). - template <bool HAS_NULLABLE_TUPLE> - bool DeepCopyInternal(TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept; - - /// Helper function to copy strings in string_slots from tuple into *data. - /// Updates *data to the end of the string data added. Returns false if the data - /// does not fit in the buffer [*data, data_end). 
- static bool CopyStrings(const Tuple* tuple, - const std::vector<SlotDescriptor*>& string_slots, uint8_t** data, - const uint8_t* data_end); - - /// Helper function to deep copy collections in collection_slots from tuple into - /// the buffer [*data, data_end). Updates *data to the end of the collection data - /// added. Returns false if the data does not fit in the buffer. - static bool CopyCollections(const Tuple* tuple, - const std::vector<SlotDescriptor*>& collection_slots, uint8_t** data, - const uint8_t* data_end); - - /// Gets a new page of 'page_len' bytes from buffer_pool_, updating write_page_, - /// write_ptr_ and write_end_ptr_. The caller must ensure there is 'page_len' unused - /// reservation. The caller must reset the write page (if there is one) before calling. - Status NewWritePage(int64_t page_len) noexcept WARN_UNUSED_RESULT; - - /// Determines what page size is needed to fit a row of 'row_size' bytes. - /// Returns an error if the row cannot fit in a page. - Status CalcPageLenForRow(int64_t row_size, int64_t* page_len); - - /// Wrapper around NewWritePage() that allocates a new write page that fits a row of - /// 'row_size' bytes. Increases reservation if needed to allocate the next page. - /// Returns OK and sets 'got_reservation' to true if the write page was successfully - /// allocated. Returns an error if the row cannot fit in a page. Returns OK and sets - /// 'got_reservation' to false if the reservation could not be increased and no other - /// error was encountered. - Status AdvanceWritePage( - int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT; - - /// Reset the write page, if there is one, and unpin pages accordingly. If there - /// is an active write iterator, the next row will be appended to a new page. - void ResetWritePage(); - - /// Invalidate the write iterator and release any resources associated with it. After - /// calling this, no more rows can be appended to the stream. 
- void InvalidateWriteIterator(); - - /// Same as PrepareForRead(), except the iterators are not invalidated and - /// the caller is assumed to have checked there is sufficient unused reservation. - Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT; - - /// Pins the next read page. This blocks reading from disk if necessary to bring the - /// page's data into memory. Updates read_page_, read_ptr_, and - /// read_page_rows_returned_. - Status NextReadPage() WARN_UNUSED_RESULT; - - /// Invalidate the read iterator, and release any resources associated with the active - /// iterator. - void InvalidateReadIterator(); - - /// Returns the total additional bytes that this row will consume in write_page_ if - /// appended to the page. This includes the row's null indicators, the fixed length - /// part of the row and the data for inlined_string_slots_ and inlined_coll_slots_. - int64_t ComputeRowSize(TupleRow* row) const noexcept; - - /// Pins page and updates tracking stats. - Status PinPage(Page* page) WARN_UNUSED_RESULT; - - /// Increment the page's pin count if this page needs a higher pin count given the - /// current read and write iterator positions and whether the stream will be pinned - /// ('stream_pinned'). Assumes that no scenarios occur when the pin count needs to - /// be incremented multiple times. The caller is responsible for ensuring sufficient - /// reservation is available. - Status PinPageIfNeeded(Page* page, bool stream_pinned) WARN_UNUSED_RESULT; - - /// Decrement the page's pin count if this page needs a lower pin count given the - /// current read and write iterator positions and whether the stream will be pinned - /// ('stream_pinned'). Assumes that no scenarios occur when the pin count needs to - /// be decremented multiple times. 
- void UnpinPageIfNeeded(Page* page, bool stream_pinned); - - /// Return the expected pin count for 'page' in the current stream based on the current - /// read and write pages and whether the stream is pinned. - int ExpectedPinCount(bool stream_pinned, const Page* page) const; - - /// Return true if the stream in its current state needs to have a reservation for - /// a write page stored in 'write_page_reservation_'. - bool NeedWriteReservation() const; - - /// Same as above, except assume the stream's 'pinned_' state is 'stream_pinned'. - bool NeedWriteReservation(bool stream_pinned) const; - - /// Same as above, except assume the stream has 'num_pages' pages and different - /// iterator state. - static bool NeedWriteReservation(bool stream_pinned, int64_t num_pages, - bool has_write_iterator, bool has_write_page, bool has_read_write_page); - - /// Return true if the stream in its current state needs to have a reservation for - /// a read page stored in 'read_page_reservation_'. - bool NeedReadReservation() const; - - /// Same as above, except assume the stream's 'pinned_' state is 'stream_pinned'. - bool NeedReadReservation(bool stream_pinned) const; - - /// Same as above, except assume the stream has 'num_pages' pages and a different - /// read iterator state. - bool NeedReadReservation(bool stream_pinned, int64_t num_pages, bool has_read_iterator, - bool has_read_page) const; - - /// Same as above, except assume the stream has 'num_pages' pages and a different - /// write iterator state. - static bool NeedReadReservation(bool stream_pinned, int64_t num_pages, - bool has_read_iterator, bool has_read_page, bool has_write_iterator, - bool has_write_page); - - /// Templated GetNext implementations. 
- template <bool FILL_FLAT_ROWS> - Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows); - template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE> - Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows); - - /// Helper function to convert a flattened TupleRow stored starting at '*data' into - /// 'row'. *data is updated to point to the first byte past the end of the row. - template <bool HAS_NULLABLE_TUPLE> - void UnflattenTupleRow(uint8_t** data, TupleRow* row) const; - - /// Helper function for GetNextInternal(). For each string slot in string_slots, - /// update StringValue's ptr field to point to the corresponding string data stored - /// inline in the stream (at the current value of read_ptr_) advance read_ptr_ by the - /// StringValue's length field. - void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple); - - /// Helper function for GetNextInternal(). For each collection slot in collection_slots, - /// recursively update any pointers in the CollectionValue to point to the corresponding - /// var len data stored inline in the stream, advancing read_ptr_ as data is read. - /// Assumes that the collection was serialized to the stream in DeepCopy()'s format. - void FixUpCollectionsForRead( - const vector<SlotDescriptor*>& collection_slots, Tuple* tuple); - - /// Returns the number of null indicator bytes per row. Only valid if this stream has - /// nullable tuples. - int NullIndicatorBytesPerRow() const; - - /// Returns the total bytes pinned. Only called in DCHECKs to validate bytes_pinned_. - int64_t CalcBytesPinned() const; - - /// DCHECKs if the stream is internally inconsistent. The stream should always be in - /// a consistent state after returning success from a public API call. The Fast version - /// has constant runtime and does not check all of 'pages_'. The Full version includes - /// O(n) checks that require iterating over the whole 'pages_' list (e.g. 
checking that - /// each page is in a valid state). - void CheckConsistencyFast() const; - void CheckConsistencyFull() const; - void CheckPageConsistency(const Page* page) const; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream-v2.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h b/be/src/runtime/buffered-tuple-stream-v2.inline.h deleted file mode 100644 index 7022249..0000000 --- a/be/src/runtime/buffered-tuple-stream-v2.inline.h +++ /dev/null @@ -1,56 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H -#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H - -#include "runtime/buffered-tuple-stream-v2.h" - -#include "runtime/descriptors.h" -#include "runtime/tuple-row.h" -#include "util/bit-util.h" - -namespace impala { - -inline int BufferedTupleStreamV2::NullIndicatorBytesPerRow() const { - DCHECK(has_nullable_tuple_); - return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size()); -} - -inline uint8_t* BufferedTupleStreamV2::AddRowCustomBegin(int64_t size, Status* status) { - DCHECK(!closed_); - DCHECK(has_write_iterator()); - if (UNLIKELY(write_page_ == nullptr || write_ptr_ + size > write_end_ptr_)) { - return AddRowCustomBeginSlow(size, status); - } - DCHECK(write_page_ != nullptr); - DCHECK(write_page_->is_pinned()); - DCHECK_LE(write_ptr_ + size, write_end_ptr_); - ++num_rows_; - ++write_page_->num_rows; - - uint8_t* data = write_ptr_; - write_ptr_ += size; - return data; -} - -inline void BufferedTupleStreamV2::AddRowCustomEnd(int64_t size) { - if (UNLIKELY(size > default_page_len_)) AddLargeRowCustomEnd(size); -} -} - -#endif