This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 794b9a35c37daced3139cc477bf3c60ee7af300b
Author: Tianyi Wang <tia...@apache.org>
AuthorDate: Sat Jun 9 15:57:02 2018 -0700

    IMPALA-3816: (prep) Move TupleSorter to sorter-ir.cc
    
    To inline calls to Compare() in TupleSorter, we need to cross-compile
    TupleSorter to LLVM IR first.
    This patch also adopts some clang-tidy suggestions, including the use
    of nullptr.
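    
    For intuition, a standalone sketch of why the cross-compilation matters
    (illustrative C++ only, not code from this patch): a comparison reached
    through an opaque indirect call cannot be inlined into the sort loop,
    while one visible in the same module can. Cross-compiling sorter-ir.cc
    to IR gives LLVM that same-module visibility for Compare() at runtime.
    
        #include <algorithm>
        #include <vector>
        
        using Cmp = bool (*)(int, int);
        
        // Indirect: the optimizer sees only a function pointer at the call
        // site, so the comparison stays an out-of-line call.
        void SortIndirect(std::vector<int>* v, Cmp cmp) {
          std::sort(v->begin(), v->end(),
              [cmp](int a, int b) { return cmp(a, b); });
        }
        
        // Direct: the comparison is visible in this module and gets inlined.
        void SortDirect(std::vector<int>* v) {
          std::sort(v->begin(), v->end(), [](int a, int b) { return a < b; });
        }
        
        int main() {
          std::vector<int> v = {3, 1, 2};
          SortDirect(&v);
          SortIndirect(&v, [](int a, int b) { return a > b; });
          return 0;
        }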
    
    Change-Id: Iaaf2b75c2f789002c42939865c018f728d29a113
    Reviewed-on: http://gerrit.cloudera.org:8080/10679
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/codegen/impala-ir.cc      |   1 +
 be/src/runtime/CMakeLists.txt    |   1 +
 be/src/runtime/sorter-internal.h | 500 ++++++++++++++++++++
 be/src/runtime/sorter-ir.cc      | 252 ++++++++++
 be/src/runtime/sorter.cc         | 972 ++++++---------------------------------
 5 files changed, 903 insertions(+), 823 deletions(-)

diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 52586f9..b1e72a1 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -61,6 +61,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/raw-value-ir.cc"
 #include "runtime/runtime-filter-ir.cc"
+#include "runtime/sorter-ir.cc"
 #include "runtime/tuple-ir.cc"
 #include "udf/udf-ir.cc"
 #include "util/bloom-filter-ir.cc"
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index f3a3554..a84344d 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -69,6 +69,7 @@ add_library(Runtime
   scanner-mem-limiter.cc
   sorted-run-merger.cc
   sorter.cc
+  sorter-ir.cc
   string-value.cc
   thread-resource-mgr.cc
   timestamp-parse-util.cc
diff --git a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h
new file mode 100644
index 0000000..ea8275a
--- /dev/null
+++ b/be/src/runtime/sorter-internal.h
@@ -0,0 +1,500 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Definitions of classes that are internal to the Sorter but shared between .cc files.
+
+#pragma once
+
+#include "sorter.h"
+
+namespace impala {
+
+/// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
+/// The Page can be in four states:
+/// * Closed: The page starts in this state before Init() is called. Calling
+///   ExtractBuffer() or Close() puts the page back in this state. No other operations
+///   are valid on a closed page.
+/// * In memory: the page is pinned and the buffer is in memory. data() is valid. The
+///   page is in this state after Init(). If the page is pinned but not in memory, it
+///   can be brought into this state by calling WaitForBuffer().
+/// * Unpinned: the page was unpinned by calling Unpin(). It is invalid to access the
+///   page's buffer.
+/// * Pinned but not in memory: Pin() was called on the unpinned page, but
+///   WaitForBuffer() has not been called. It is invalid to access the page's buffer.
+class Sorter::Page {
+ public:
+  Page() { Reset(); }
+
+  /// Create a new page of length 'sorter->page_len_' bytes using
+  /// 'sorter->buffer_pool_client_'. Caller must ensure the client has enough
+  /// reservation for the page.
+  Status Init(Sorter* sorter);
+
+  /// Extract the buffer from the page. The page must be in memory. When this function
+  /// returns the page is closed.
+  BufferPool::BufferHandle ExtractBuffer(BufferPool::ClientHandle* client);
+
+  /// Allocate 'len' bytes in the current page. The page must be in memory, and the
+  /// amount to allocate cannot exceed BytesRemaining().
+  uint8_t* AllocateBytes(int64_t len);
+
+  /// Free the last 'len' bytes allocated from AllocateBytes(). The page must be in
+  /// memory.
+  void FreeBytes(int64_t len);
+
+  /// Return number of bytes remaining in page.
+  int64_t BytesRemaining() { return len() - valid_data_len_; }
+
+  /// Brings a pinned page into memory, if not already in memory, and sets 'data_' to
+  /// point to the page's buffer.
+  Status WaitForBuffer();
+
+  /// Helper to pin the page. Caller must ensure the client has enough reservation
+  /// remaining to pin the page. Only valid to call on an unpinned page.
+  Status Pin(BufferPool::ClientHandle* client);
+
+  /// Helper to unpin the page.
+  void Unpin(BufferPool::ClientHandle* client);
+
+  /// Destroy the page with 'client'.
+  void Close(BufferPool::ClientHandle* client);
+
+  int64_t valid_data_len() const { return valid_data_len_; }
+  /// Returns a pointer to the start of the page's buffer. Only valid to call if the
+  /// page is in memory.
+  uint8_t* data() const {
+    DCHECK(data_ != nullptr);
+    return data_;
+  }
+  int64_t len() const { return handle_.len(); }
+  bool is_open() const { return handle_.is_open(); }
+  bool is_pinned() const { return handle_.is_pinned(); }
+  std::string DebugString() const { return handle_.DebugString(); }
+
+ private:
+  /// Reset the page to an uninitialized state. 'handle_' must already be closed.
+  void Reset();
+
+  /// Helper to get the singleton buffer pool.
+  static BufferPool* pool();
+
+  BufferPool::PageHandle handle_;
+
+  /// Length of valid data written to the page.
+  int64_t valid_data_len_;
+
+  /// Cached pointer to the buffer in 'handle_'. NULL if the page is unpinned. May be
+  /// NULL or not NULL if the page is pinned. Can be populated by calling
+  /// WaitForBuffer() on a pinned page.
+  uint8_t* data_;
+};
+
+/// A run is a sequence of tuples. The run can be sorted or unsorted (in which case the
+/// Sorter will sort it). A run comprises a sequence of fixed-length pages containing
+/// the tuples themselves (i.e. fixed-len slots that may contain ptrs to var-length
+/// data), and an optional sequence of var-length pages containing the var-length data.
+///
+/// Runs are either "initial runs" constructed from the sorter's input by evaluating
+/// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed
+/// by merging already-sorted runs. Initial runs are sorted in-place in memory. Once
+/// sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by
+/// SortedRunMerger, either to produce the final sorted output or to produce another
+/// sorted run.
+///
+/// The expected calling sequence of functions is as follows (see the usage sketch
+/// below):
+/// * Init() to initialize the run and allocate initial pages.
+/// * Add*Batch() to add batches of tuples to the run.
+/// * FinalizeInput() to signal that no more batches will be added.
+/// * If the run is unsorted, it must be sorted. After that set_sorted() must be called.
+/// * Once sorted, the run is ready to read in sorted order for merging or final output.
+/// * PrepareRead() to allocate resources for reading the run.
+/// * GetNext() (if there was a single run) or GetNextBatch() (when merging multiple
+///   runs) to read from the run.
+/// * Once reading is done, CloseAllPages() should be called to free resources.
+class Sorter::Run {
+ public:
+  Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run);
+
+  ~Run();
+
+  /// Initialize the run for input rows by allocating the minimum number of required
+  /// pages - one page for fixed-len data added to fixed_len_pages_, one for the
+  /// initially unsorted var-len data added to var_len_pages_, and one to copy sorted
+  /// var-len data into var_len_copy_page_.
+  Status Init();
+
+  /// Add the rows from 'batch' starting at 'start_index' to the current run. Returns
+  /// the number of rows actually added in 'num_processed'. If the run is full (no more
+  /// pages can be allocated), 'num_processed' may be less than the number of remaining
+  /// rows in the batch. AddInputBatch() materializes the input rows using the
+  /// expressions in sorter_->sort_tuple_expr_evals_, while AddIntermediateBatch() just
+  /// copies rows.
+  Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed);
+
+  Status AddIntermediateBatch(RowBatch* batch, int start_index, int* num_processed);
+
+  /// Called after the final call to Add*Batch() to do any bookkeeping necessary to
+  /// finalize the run. Must be called before sorting or merging the run.
+  Status FinalizeInput();
+
+  /// Unpins all the pages in a sorted run. Var-length column data is copied into new
+  /// pages in sorted order. Pointers in the original tuples are converted to offsets
+  /// from the beginning of the sequence of var-len data pages. Returns an error and
+  /// may leave some pages pinned if an error is encountered.
+  Status UnpinAllPages();
+
+  /// Closes all pages and clears vectors of pages.
+  void CloseAllPages();
+
+  /// Prepare to read a sorted run. Pins the first page(s) in the run if the run was
+  /// previously unpinned. If the run was unpinned, try to pin the initial fixed and
+  /// var len pages in the run. If it couldn't pin them, an error Status is returned.
+  Status PrepareRead();
+
+  /// Interface for merger - get the next batch of rows from this run. This run still
+  /// owns the returned batch. Calls GetNext(RowBatch*, bool*).
+  Status GetNextBatch(RowBatch** sorted_batch);
+
+  /// Fill output_batch with rows from this run. If CONVERT_OFFSET_TO_PTR is true,
+  /// offsets in var-length slots are converted back to pointers. Only row pointers are
+  /// copied into output_batch. eos is set to true after all rows from the run are
+  /// returned. If eos is true, the returned output_batch has zero rows and has no
+  /// attached pages. If this run was unpinned, one page (two if there are var-len
+  /// slots) is pinned while rows are filled into output_batch. The page is unpinned
+  /// before the next page is pinned, so at most one (two if there are var-len slots)
+  /// page(s) will be pinned at once. If the run was pinned, the pages are not unpinned
+  /// and each page is attached to 'output_batch' once all rows referencing data in the
+  /// page have been returned, either in the current batch or previous batches. In both
+  /// pinned and unpinned cases, all rows in output_batch will reference at most one
+  /// fixed-len and one var-len page.
+  template <bool CONVERT_OFFSET_TO_PTR>
+  Status GetNext(RowBatch* output_batch, bool* eos);
+
+  /// Delete all pages in 'runs' and clear 'runs'.
+  static void CleanupRuns(std::deque<Run*>* runs);
+
+  /// Return total amount of fixed and var len data in run, not including pages that
+  /// were already transferred or closed.
+  int64_t TotalBytes() const;
+
+  bool is_pinned() const { return is_pinned_; }
+  bool is_finalized() const { return is_finalized_; }
+  bool is_sorted() const { return is_sorted_; }
+  void set_sorted() { is_sorted_ = true; }
+  int64_t num_tuples() const { return num_tuples_; }
+  /// Returns true if we have var-len pages in the run.
+  bool HasVarLenPages() const {
+    // Shouldn't have any pages unless there are slots.
+    DCHECK(var_len_pages_.empty() || has_var_len_slots_);
+    return !var_len_pages_.empty();
+  }
+
+ private:
+  /// TupleIterator needs access to internals to iterate over tuples.
+  friend class TupleIterator;
+
+  /// Templatized implementation of Add*Batch() functions.
+  /// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must
+  /// match 'initial_run_' and 'has_var_len_slots_'.
+  template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
+  Status AddBatchInternal(
+      RowBatch* batch, int start_index, int* num_processed);
+
+  /// Finalize the list of pages: delete empty final pages and unpin the previous page
+  /// if the run is unpinned.
+  Status FinalizePages(vector<Page>* pages);
+
+  /// Collect the non-null var-len (e.g. STRING) slots from 'src' in 'var_len_values' and
+  /// return the total length of all var-len values in 'total_var_len'.
+  void CollectNonNullVarSlots(
+      Tuple* src, vector<StringValue*>* var_len_values, int* total_var_len);
+
+  enum AddPageMode { KEEP_PREV_PINNED, UNPIN_PREV };
+
+  /// Try to extend the current run by a page. If 'mode' is KEEP_PREV_PINNED, try to
+  /// allocate a new page, which may fail to extend the run due to lack of memory. If
+  /// mode is 'UNPIN_PREV', unpin the previous page in page_sequence before allocating
+  /// and adding a new page - this never fails due to lack of memory.
+  ///
+  /// Returns an error status only if the buffer pool returns an error. If no error is
+  /// encountered, sets 'added' to indicate whether the run was extended and returns
+  /// Status::OK(). The new page is appended to 'page_sequence'.
+  Status TryAddPage(AddPageMode mode, vector<Page>* page_sequence, bool* added);
+
+  /// Adds a new page to 'page_sequence'. Caller must ensure enough
+  /// reservation is available to create the page.
+  ///
+  /// Returns an error status only if the buffer pool returns an error. If an error
+  /// is returned 'page_sequence' is left unmodified.
+  Status AddPage(vector<Page>* page_sequence);
+
+  /// Advance to the next read page. If the run is pinned, has no effect. If the run
+  /// is unpinned, the pin at 'page_index' was already attached to an output batch and
+  /// this function will pin the page at 'page_index' + 1 in 'pages'.
+  Status PinNextReadPage(vector<Page>* pages, int page_index);
+
+  /// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
+  /// ptrs in 'dest' to point to the copied data.
+  void CopyVarLenData(const vector<StringValue*>& var_values, uint8_t* dest);
+
+  /// Copy the StringValues in 'var_values' to 'dest' in order. Update the StringValue
+  /// ptrs in 'dest' to contain a packed offset for the copied data comprising
+  /// page_index and the offset relative to page_start.
+  void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values, int page_index,
+      const uint8_t* page_start, uint8_t* dest);
+
+  /// Convert encoded offsets to valid pointers in tuple with layout 'sort_tuple_desc_'.
+  /// 'tuple' is modified in-place. Returns true if the pointers refer to the page at
+  /// 'var_len_pages_index_' and were successfully converted or false if the var len
+  /// data is in the next page, in which case 'tuple' is unmodified.
+  bool ConvertOffsetsToPtrs(Tuple* tuple);
+
+  int NumOpenPages(const vector<Page>& pages);
+
+  /// Close all open pages and clear vector.
+  void DeleteAndClearPages(vector<Page>* pages);
+
+  /// Parent sorter object.
+  Sorter* const sorter_;
+
+  /// Materialized sort tuple. Input rows are materialized into 1 tuple (with descriptor
+  /// sort_tuple_desc_) before sorting.
+  const TupleDescriptor* sort_tuple_desc_;
+
+  /// The size in bytes of the sort tuple.
+  const int sort_tuple_size_;
+
+  /// Number of tuples per page in a run. This gets multiplied with
+  /// TupleIterator::page_index_ in various places and to make sure we don't overflow
+  /// the result of that operation we make this int64_t here.
+  const int64_t page_capacity_;
+
+  const bool has_var_len_slots_;
+
+  /// True if this is an initial run. False implies this is a sorted intermediate run
+  /// resulting from merging other runs.
+  const bool initial_run_;
+
+  /// True if all pages in the run are pinned. Initial runs start off pinned and
+  /// can be unpinned. Intermediate runs are always unpinned.
+  bool is_pinned_;
+
+  /// True after FinalizeInput() is called. No more tuples can be added after the
+  /// run is finalized.
+  bool is_finalized_;
+
+  /// True if the tuples in the run are currently in sorted order.
+  /// Always true for intermediate runs.
+  bool is_sorted_;
+
+  /// Sequence of pages in this run containing the fixed-length portion of the sort
+  /// tuples comprising this run. The data pointed to by the var-len slots are in
+  /// var_len_pages_. A run can have zero pages if no rows are appended.
+  /// If the run is sorted, the tuples in fixed_len_pages_ will be in sorted order.
+  /// fixed_len_pages_[i] is closed iff it has been transferred or deleted.
+  vector<Page> fixed_len_pages_;
+
+  /// Sequence of pages in this run containing the var-length data corresponding to the
+  /// var-length columns from fixed_len_pages_. In intermediate runs, the var-len data
+  /// is always stored in the same order as the fixed-length tuples. In initial runs,
+  /// the var-len data is initially in unsorted order, but is reshuffled into sorted
+  /// order in UnpinAllPages(). A run can have no var len pages if there are no var len
+  /// slots or if all the var len data is empty or NULL.
+  /// var_len_pages_[i] is closed iff it has been transferred or deleted.
+  vector<Page> var_len_pages_;
+
+  /// For initial unsorted runs, an extra pinned page is needed to reorder var-len data
+  /// into fixed order in UnpinAllPages(). 'var_len_copy_page_' stores this extra
+  /// page. Deleted in UnpinAllPages().
+  /// TODO: in case of in-memory runs, this could be deleted earlier to free up memory.
+  Page var_len_copy_page_;
+
+  /// Number of tuples added so far to this run.
+  int64_t num_tuples_;
+
+  /// Number of tuples returned via GetNext(), maintained for debug purposes.
+  int64_t num_tuples_returned_;
+
+  /// Used to implement GetNextBatch() interface required for the merger.
+  boost::scoped_ptr<RowBatch> buffered_batch_;
+
+  /// Members used when a run is read in GetNext().
+  /// The index into 'fixed_' and 'var_len_pages_' of the pages being read in GetNext().
+  int fixed_len_pages_index_;
+  int var_len_pages_index_;
+
+  /// If true, the last call to GetNext() reached the end of the previous fixed or
+  /// var-len page. The next call to GetNext() must increment 'fixed_len_pages_index_'
+  /// or 'var_len_pages_index_'. It must also pin the next page if the run is unpinned.
+  bool end_of_fixed_len_page_;
+  bool end_of_var_len_page_;
+
+  /// Offset into the current fixed length data page being processed.
+  int fixed_len_page_offset_;
+};
+
+/// Helper class used to iterate over tuples in a run during sorting.
+class Sorter::TupleIterator {
+ public:
+  /// Creates an iterator pointing at the tuple with the given 'index' in the 'run'.
+  /// The index can be in the range [0, run->num_tuples()]. If it is equal to
+  /// run->num_tuples(), the iterator points to one past the end of the run, so
+  /// invoking Prev() will cause the iterator to point at the last tuple in the run.
+  /// 'run' must be finalized.
+  TupleIterator(Sorter::Run* run, int64_t index);
+
+  /// Default constructor used for local variable. Produces invalid iterator that must
+  /// be assigned before use.
+  TupleIterator() : index_(-1), tuple_(nullptr), buffer_start_index_(-1),
+      buffer_end_index_(-1), page_index_(-1) { }
+
+  /// Create an iterator pointing to the first tuple in the run.
+  static TupleIterator Begin(Sorter::Run* run) { return {run, 0}; }
+
+  /// Create an iterator pointing one past the end of the run.
+  static TupleIterator End(Sorter::Run* run) { return {run, run->num_tuples()}; }
+
+  /// Increments 'index_' and sets 'tuple_' to point to the next tuple in the run.
+  /// Increments 'page_index_' and advances to the next page if the next tuple is in
+  /// the next page. Can be advanced one past the last tuple in the run, but is not
+  /// valid to dereference 'tuple_' in that case. 'run' and 'tuple_size' are passed as
+  /// arguments to avoid redundantly storing the same values in multiple iterators in
+  /// perf-critical algorithms.
+  void Next(Sorter::Run* run, int tuple_size);
+
+  /// The reverse of Next(). Can advance one before the first tuple in the run, but it
+  /// is invalid to dereference 'tuple_' in that case.
+  void Prev(Sorter::Run* run, int tuple_size);
+
+  int64_t index() const { return index_; }
+  Tuple* tuple() const { return reinterpret_cast<Tuple*>(tuple_); }
+  /// Returns current tuple in TupleRow format. The caller should not modify the row.
+  const TupleRow* row() const {
+    return reinterpret_cast<const TupleRow*>(&tuple_);
+  }
+
+ private:
+  /// Move to the next page in the run (or do nothing if at end of run).
+  /// This is the slow path for Next();
+  void NextPage(Sorter::Run* run);
+
+  /// Move to the previous page in the run (or do nothing if at beginning of run).
+  /// This is the slow path for Prev();
+  void PrevPage(Sorter::Run* run);
+
+  /// Index of the current tuple in the run.
+  /// Can be -1 or run->num_rows() if Next() or Prev() moves iterator outside of run.
+  int64_t index_;
+
+  /// Pointer to the current tuple.
+  /// Will be an invalid pointer outside of current buffer if Next() or Prev() moves
+  /// iterator outside of run.
+  uint8_t* tuple_;
+
+  /// Indices of start and end tuples of page at page_index_. I.e. the current page
+  /// has tuples with indices in range [buffer_start_index_, buffer_end_index).
+  int64_t buffer_start_index_;
+  int64_t buffer_end_index_;
+
+  /// Index into fixed_len_pages_ of the page containing the current tuple.
+  /// If index_ is negative or past end of run, will point to the first or last page
+  /// in run respectively.
+  int page_index_;
+};
+
+/// Sorts a sequence of tuples from a run in place using a provided tuple comparator.
+/// Quick sort is used for sequences of tuples larger than 16 elements, and insertion
+/// sort is used for smaller sequences. The TupleSorter is initialized with a
+/// RuntimeState instance to check for cancellation during an in-memory sort.
+class Sorter::TupleSorter {
+ public:
+  TupleSorter(Sorter* parent, const TupleRowComparator& comparator,
+        int tuple_size, RuntimeState* state);
+
+  ~TupleSorter();
+
+  /// Performs a quicksort for tuples in 'run' followed by an insertion sort to
+  /// finish smaller ranges. Only valid to call if this is an initial run that has not
+  /// yet been sorted. Returns an error status if any error is encountered or if the
+  /// query is cancelled.
+  Status Sort(Run* run);
+
+ private:
+  static const int INSERTION_THRESHOLD = 16;
+
+  Sorter* const parent_;
+
+  /// Size of the tuples in memory.
+  const int tuple_size_;
+
+  /// Tuple comparator with method Less() that returns true if lhs < rhs.
+  const TupleRowComparator& comparator_;
+
+  /// Number of times comparator_.Less() can be invoked again before
+  /// comparator_.expr_results_pool_.Clear() needs to be called.
+  int num_comparisons_till_free_;
+
+  /// Runtime state instance to check for cancellation. Not owned.
+  RuntimeState* const state_;
+
+  /// The run to be sorted.
+  Run* run_;
+
+  /// Temporarily allocated space to copy and swap tuples (Both are used in
+  /// Partition()). Owned by this TupleSorter instance.
+  uint8_t* temp_tuple_buffer_;
+  uint8_t* swap_buffer_;
+
+  /// Random number generator used to randomly choose pivots. We need a RNG that
+  /// can generate 64-bit ints. Quality of randomness doesn't need to be especially
+  /// high: Mersenne Twister should be more than adequate.
+  std::mt19937_64 rng_;
+
+  /// Wrapper around comparator_.Less(). Also call expr_results_pool_.Clear()
+  /// on every 'state_->batch_size()' invocations of comparator_.Less(). Returns true
+  /// if 'lhs' is less than 'rhs'.
+  bool Less(const TupleRow* lhs, const TupleRow* rhs);
+
+  /// Perform an insertion sort for rows in the range [begin, end) in a run.
+  /// Only valid to call for ranges of size at least 1.
+  Status InsertionSort(const TupleIterator& begin, const TupleIterator& end);
+
+  /// Partitions the sequence of tuples in the range [begin, end) in a run into two
+  /// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and
+  /// tuples in the second group are >= pivot. Tuples are swapped in place to create the
+  /// groups and the index to the first element in the second group is returned in
+  /// 'cut'. Return an error status if any error is encountered or if the query is
+  /// cancelled.
+  Status Partition(TupleIterator begin, TupleIterator end,
+      const Tuple* pivot, TupleIterator* cut);
+
+  /// Performs a quicksort of rows in the range [begin, end) followed by insertion sort
+  /// for smaller groups of elements. Return an error status for any errors or if the
+  /// query is cancelled.
+  Status SortHelper(TupleIterator begin, TupleIterator end);
+
+  /// Select a pivot to partition [begin, end).
+  Tuple* SelectPivot(TupleIterator begin, TupleIterator end);
+
+  /// Return median of three tuples according to the sort comparator.
+  Tuple* MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3);
+
+  /// Swaps tuples pointed to by left and right using 'swap_tuple'.
+  static void Swap(Tuple* left, Tuple* right, Tuple* swap_tuple, int tuple_size);
+};
+
+} // namespace impala
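
For reference, the Run calling sequence documented in the header above, condensed
into a driver sketch (hypothetical code, not part of this patch: it assumes a
single pinned in-memory initial run, that buffer reservations are already set up,
and that such code would live inside the Sorter, since Run and TupleSorter are
internal classes; all identifiers come from sorter-internal.h):

    // Hypothetical driver following the documented Run lifecycle.
    Status SortSingleRun(Sorter::Run* run, Sorter::TupleSorter* tuple_sorter,
        RowBatch* input, RowBatch* output) {
      RETURN_IF_ERROR(run->Init());             // allocate initial pages
      int num_processed = 0;
      RETURN_IF_ERROR(run->AddInputBatch(input, 0, &num_processed));
      RETURN_IF_ERROR(run->FinalizeInput());    // no more batches will be added
      RETURN_IF_ERROR(tuple_sorter->Sort(run)); // in-place quicksort + insertion sort
      run->set_sorted();
      RETURN_IF_ERROR(run->PrepareRead());      // pin the first page(s) for reading
      bool eos = false;
      while (!eos) {
        // CONVERT_OFFSET_TO_PTR is false: the run stayed pinned, so var-len
        // slots still hold pointers rather than packed offsets.
        RETURN_IF_ERROR(run->GetNext<false>(output, &eos));
        // ... consume 'output' here ...
        output->Reset();
      }
      run->CloseAllPages();                     // free all remaining resources
      return Status::OK();
    }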
diff --git a/be/src/runtime/sorter-ir.cc b/be/src/runtime/sorter-ir.cc
new file mode 100644
index 0000000..4da6bdd
--- /dev/null
+++ b/be/src/runtime/sorter-ir.cc
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/sorter-internal.h"
+
+#include <boost/random/uniform_int.hpp>
+
+#include "runtime/exec-env.h"
+#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+
+namespace impala {
+
+void Sorter::TupleIterator::NextPage(Sorter::Run* run) {
+  // When moving after the last tuple, stay at the last page.
+  if (index_ >= run->num_tuples()) return;
+  ++page_index_;
+  DCHECK_LT(page_index_, run->fixed_len_pages_.size());
+  buffer_start_index_ = page_index_ * run->page_capacity_;
+  DCHECK_EQ(index_, buffer_start_index_);
+  buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
+  tuple_ = run->fixed_len_pages_[page_index_].data();
+}
+
+void Sorter::TupleIterator::PrevPage(Sorter::Run* run) {
+  // When moving before the first tuple, stay at the first page.
+  if (index_ < 0) return;
+  --page_index_;
+  DCHECK_GE(page_index_, 0);
+  buffer_start_index_ = page_index_ * run->page_capacity_;
+  buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
+  DCHECK_EQ(index_, buffer_end_index_ - 1);
+  int last_tuple_page_offset = run->sort_tuple_size_ * (run->page_capacity_ - 1);
+  tuple_ = run->fixed_len_pages_[page_index_].data() + last_tuple_page_offset;
+}
+
+void Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) {
+  DCHECK_LT(index_, run->num_tuples()) << "Can only advance one past end of run";
+  tuple_ += tuple_size;
+  ++index_;
+  if (UNLIKELY(index_ >= buffer_end_index_)) NextPage(run);
+}
+
+void Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) {
+  DCHECK_GE(index_, 0) << "Can only advance one before start of run";
+  tuple_ -= tuple_size;
+  --index_;
+  if (UNLIKELY(index_ < buffer_start_index_)) PrevPage(run);
+}
+
+bool Sorter::TupleSorter::Less(const TupleRow* lhs, const TupleRow* rhs) {
+  --num_comparisons_till_free_;
+  DCHECK_GE(num_comparisons_till_free_, 0);
+  if (UNLIKELY(num_comparisons_till_free_ == 0)) {
+    parent_->expr_results_pool_.Clear();
+    num_comparisons_till_free_ = state_->batch_size();
+  }
+  return comparator_.Less(lhs, rhs);
+}
+
+Status Sorter::TupleSorter::Partition(TupleIterator begin,
+    TupleIterator end, const Tuple* pivot, TupleIterator* cut) {
+  // Hoist member variable lookups out of loop to avoid extra loads inside loop.
+  Run* run = run_;
+  int tuple_size = tuple_size_;
+  Tuple* temp_tuple = reinterpret_cast<Tuple*>(temp_tuple_buffer_);
+  Tuple* swap_tuple = reinterpret_cast<Tuple*>(swap_buffer_);
+
+  // Copy pivot into temp_tuple since it points to a tuple within [begin, end).
+  DCHECK(temp_tuple != nullptr);
+  DCHECK(pivot != nullptr);
+  memcpy(temp_tuple, pivot, tuple_size);
+
+  TupleIterator left = begin;
+  TupleIterator right = end;
+  right.Prev(run, tuple_size); // Set 'right' to the last tuple in range.
+  while (true) {
+    // Search for the first and last out-of-place elements, and swap them.
+    while (Less(left.row(), reinterpret_cast<TupleRow*>(&temp_tuple))) {
+      left.Next(run, tuple_size);
+    }
+    while (Less(reinterpret_cast<TupleRow*>(&temp_tuple), right.row())) {
+      right.Prev(run, tuple_size);
+    }
+
+    if (left.index() >= right.index()) break;
+    // Swap first and last tuples.
+    Swap(left.tuple(), right.tuple(), swap_tuple, tuple_size);
+
+    left.Next(run, tuple_size);
+    right.Prev(run, tuple_size);
+    RETURN_IF_CANCELLED(state_);
+    RETURN_IF_ERROR(state_->GetQueryStatus());
+  }
+  *cut = left;
+  return Status::OK();
+}
+
+// Sort the sequence of tuples from [begin, end).
+// Begin with a sorted sequence of size 1 [begin, begin+1).
+// During each pass of the outermost loop, add the next tuple (at position 'i') to
+// the sorted sequence by comparing it to each element of the sorted sequence
+// (reverse order) to find its correct place in the sorted sequence, copying tuples
+// along the way.
+Status Sorter::TupleSorter::InsertionSort(const TupleIterator& begin,
+    const TupleIterator& end) {
+  DCHECK_LT(begin.index(), end.index());
+
+  // Hoist member variable lookups out of loop to avoid extra loads inside loop.
+  Run* run = run_;
+  int tuple_size = tuple_size_;
+  uint8_t* temp_tuple_buffer = temp_tuple_buffer_;
+
+  TupleIterator insert_iter = begin;
+  insert_iter.Next(run, tuple_size);
+  for (; insert_iter.index() < end.index(); insert_iter.Next(run, tuple_size)) {
+    // insert_iter points to the tuple after the currently sorted sequence that must
+    // be inserted into the sorted sequence. Copy to temp_tuple_buffer_ since it may be
+    // overwritten by the one at position 'insert_iter - 1'.
+    memcpy(temp_tuple_buffer, insert_iter.tuple(), tuple_size);
+
+    // 'iter' points to the tuple that temp_tuple_buffer will be compared to.
+    // 'copy_to' is where 'iter' should be copied to if it is >= temp_tuple_buffer.
+    // 'copy_to' always points to the next row after 'iter'.
+    TupleIterator iter = insert_iter;
+    iter.Prev(run, tuple_size);
+    Tuple* copy_to = insert_iter.tuple();
+    while (Less(reinterpret_cast<TupleRow*>(&temp_tuple_buffer), iter.row())) {
+      memcpy(copy_to, iter.tuple(), tuple_size);
+      copy_to = iter.tuple();
+      // Break if 'iter' has reached the first row, meaning that the temp row
+      // will be inserted in position 'begin'
+      if (iter.index() <= begin.index()) break;
+      iter.Prev(run, tuple_size);
+    }
+
+    memcpy(copy_to, temp_tuple_buffer, tuple_size);
+  }
+  RETURN_IF_CANCELLED(state_);
+  RETURN_IF_ERROR(state_->GetQueryStatus());
+  return Status::OK();
+}
+
+
+Status Sorter::TupleSorter::SortHelper(TupleIterator begin, TupleIterator end) {
+  // Use insertion sort for smaller sequences.
+  while (end.index() - begin.index() > INSERTION_THRESHOLD) {
+    // Select a pivot and call Partition() to split the tuples in [begin, end) into two
+    // groups (<= pivot and >= pivot) in-place. 'cut' is the index of the first tuple in
+    // the second group.
+    Tuple* pivot = SelectPivot(begin, end);
+    TupleIterator cut;
+    RETURN_IF_ERROR(Partition(begin, end, pivot, &cut));
+
+    // Recurse on the smaller partition. This limits stack size to log(n) stack frames.
+    if (cut.index() - begin.index() < end.index() - cut.index()) {
+      // Left partition is smaller.
+      RETURN_IF_ERROR(SortHelper(begin, cut));
+      begin = cut;
+    } else {
+      // Right partition is equal or smaller.
+      RETURN_IF_ERROR(SortHelper(cut, end));
+      end = cut;
+    }
+  }
+
+  if (begin.index() < end.index()) RETURN_IF_ERROR(InsertionSort(begin, end));
+  return Status::OK();
+}
+
+Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator begin, TupleIterator end) {
+  // Select the median of three random tuples. The random selection avoids pathological
+  // behaviour associated with techniques that pick a fixed element (e.g. picking
+  // first/last/middle element) and taking the median tends to help us select better
+  // pivots that more evenly split the input range. This method makes selection of
+  // bad pivots very infrequent.
+  //
+  // To illustrate, if we define a bad pivot as one in the lower or upper 10% of values,
+  // then the median of three is a bad pivot only if all three randomly-selected values
+  // are in the lower or upper 10%. The probability of that is 0.2 * 0.2 * 0.2 = 0.008:
+  // less than 1%. Since selection is random each time, the chance of repeatedly picking
+  // bad pivots decreases exponentially and becomes negligibly small after a few
+  // iterations.
+  Tuple* tuples[3];
+  for (auto& tuple : tuples) {
+    int64_t index = boost::uniform_int<int64_t>(begin.index(), end.index() - 1)(rng_);
+    TupleIterator iter(run_, index);
+    DCHECK(iter.tuple() != nullptr);
+    tuple = iter.tuple();
+  }
+
+  return MedianOfThree(tuples[0], tuples[1], tuples[2]);
+}
+
+Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3) {
+  TupleRow* tr1 = reinterpret_cast<TupleRow*>(&t1);
+  TupleRow* tr2 = reinterpret_cast<TupleRow*>(&t2);
+  TupleRow* tr3 = reinterpret_cast<TupleRow*>(&t3);
+
+  bool t1_lt_t2 = Less(tr1, tr2);
+  bool t2_lt_t3 = Less(tr2, tr3);
+  bool t1_lt_t3 = Less(tr1, tr3);
+
+  if (t1_lt_t2) {
+    // t1 < t2
+    if (t2_lt_t3) {
+      // t1 < t2 < t3
+      return t2;
+    } else if (t1_lt_t3) {
+      // t1 < t3 <= t2
+      return t3;
+    } else {
+      // t3 <= t1 < t2
+      return t1;
+    }
+  } else {
+    // t2 <= t1
+    if (t1_lt_t3) {
+      // t2 <= t1 < t3
+      return t1;
+    } else if (t2_lt_t3) {
+      // t2 < t3 <= t1
+      return t3;
+    } else {
+      // t3 <= t2 <= t1
+      return t2;
+    }
+  }
+}
+
+void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tuple,
+    int tuple_size) {
+  memcpy(swap_tuple, left, tuple_size);
+  memcpy(left, right, tuple_size);
+  memcpy(right, swap_tuple, tuple_size);
+}
+
+}
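
The MedianOfThree() branching above is easy to sanity-check against a plain
integer analogue (a standalone sketch, not Impala code) that uses the same three
comparisons and the same branch structure:

    #include <cassert>

    // Returns the median of three ints, mirroring TupleSorter::MedianOfThree().
    static int MedianOfThree(int t1, int t2, int t3) {
      bool t1_lt_t2 = t1 < t2;
      bool t2_lt_t3 = t2 < t3;
      bool t1_lt_t3 = t1 < t3;
      if (t1_lt_t2) {
        if (t2_lt_t3) return t2;    // t1 < t2 < t3
        return t1_lt_t3 ? t3 : t1;  // t1 < t3 <= t2, or t3 <= t1 < t2
      }
      if (t1_lt_t3) return t1;      // t2 <= t1 < t3
      return t2_lt_t3 ? t3 : t2;    // t2 < t3 <= t1, or t3 <= t2 <= t1
    }

    int main() {
      assert(MedianOfThree(1, 2, 3) == 2);
      assert(MedianOfThree(3, 1, 2) == 2);
      assert(MedianOfThree(2, 2, 1) == 2);
      return 0;
    }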
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 806a0b4..a7a5a64 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/sorter.h"
-
-#include <limits>
+#include "runtime/sorter-internal.h"
 
 #include <boost/random/mersenne_twister.hpp>
 #include <boost/random/uniform_int.hpp>
@@ -27,11 +25,8 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
-#include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
-#include "util/pretty-printer.h"
-#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 
@@ -44,562 +39,72 @@ namespace impala {
 // Number of pinned pages required for a merge with fixed-length data only.
 const int MIN_BUFFERS_PER_MERGE = 3;
 
-/// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
-/// The Page can be in four states:
-/// * Closed: The page starts in this state before Init() is called. Calling
-///   ExtractBuffer() or Close() puts the page back in this state. No other operations
-///   are valid on a closed page.
-/// * In memory: the page is pinned and the buffer is in memory. data() is valid. The
-///   page is in this state after Init(). If the page is pinned but not in memory, it
-///   can be brought into this state by calling WaitForBuffer().
-/// * Unpinned: the page was unpinned by calling Unpin(). It is invalid to access the
-///   page's buffer.
-/// * Pinned but not in memory: Pin() was called on the unpinned page, but
-///   WaitForBuffer() has not been called. It is invalid to access the page's buffer.
-class Sorter::Page {
- public:
-  Page() { Reset(); }
-
-  /// Create a new page of length 'sorter->page_len_' bytes using
-  /// 'sorter->buffer_pool_client_'. Caller must ensure the client has enough
-  /// reservation for the page.
-  Status Init(Sorter* sorter) WARN_UNUSED_RESULT {
-    const BufferPool::BufferHandle* page_buffer;
-    RETURN_IF_ERROR(pool()->CreatePage(sorter->buffer_pool_client_, sorter->page_len_,
-        &handle_, &page_buffer));
-    data_ = page_buffer->data();
-    return Status::OK();
-  }
-
-  /// Extract the buffer from the page. The page must be in memory. When this function
-  /// returns the page is closed.
-  BufferPool::BufferHandle ExtractBuffer(BufferPool::ClientHandle* client) {
-    DCHECK(data_ != nullptr) << "Page must be in memory";
-    BufferPool::BufferHandle buffer;
-    Status status = pool()->ExtractBuffer(client, &handle_, &buffer);
-    DCHECK(status.ok()) << "Page was in memory, ExtractBuffer() shouldn't fail";
-    Reset();
-    return buffer;
-  }
-
-  /// Allocate 'len' bytes in the current page. The page must be in memory, and the
-  /// amount to allocate cannot exceed BytesRemaining().
-  uint8_t* AllocateBytes(int64_t len) {
-    DCHECK_GE(len, 0);
-    DCHECK_LE(len, BytesRemaining());
-    DCHECK(data_ != nullptr);
-    uint8_t* result = data_ + valid_data_len_;
-    valid_data_len_ += len;
-    return result;
-  }
-
-  /// Free the last 'len' bytes allocated from AllocateBytes(). The page must be in
-  /// memory.
-  void FreeBytes(int64_t len) {
-    DCHECK_GE(len, 0);
-    DCHECK_LE(len, valid_data_len_);
-    DCHECK(data_ != nullptr);
-    valid_data_len_ -= len;
-  }
-
-  /// Return number of bytes remaining in page.
-  int64_t BytesRemaining() { return len() - valid_data_len_; }
-
-  /// Brings a pinned page into memory, if not already in memory, and sets 'data_' to
-  /// point to the page's buffer.
-  Status WaitForBuffer() WARN_UNUSED_RESULT {
-    DCHECK(handle_.is_pinned());
-    if (data_ != nullptr) return Status::OK();
-    const BufferPool::BufferHandle* page_buffer;
-    RETURN_IF_ERROR(handle_.GetBuffer(&page_buffer));
-    data_ = page_buffer->data();
-    return Status::OK();
-  }
-
-  /// Helper to pin the page. Caller must ensure the client has enough reservation
-  /// remaining to pin the page. Only valid to call on an unpinned page.
-  Status Pin(BufferPool::ClientHandle* client) WARN_UNUSED_RESULT {
-    DCHECK(!handle_.is_pinned());
-    return pool()->Pin(client, &handle_);
-  }
-
-  /// Helper to unpin the page.
-  void Unpin(BufferPool::ClientHandle* client) {
-    pool()->Unpin(client, &handle_);
-    data_ = nullptr;
-  }
-
-  /// Destroy the page with 'client'.
-  void Close(BufferPool::ClientHandle* client) {
-    pool()->DestroyPage(client, &handle_);
-    Reset();
-  }
-
-  int64_t valid_data_len() const { return valid_data_len_; }
-  /// Returns a pointer to the start of the page's buffer. Only valid to call if the
-  /// page is in memory.
-  uint8_t* data() const {
-    DCHECK(data_ != nullptr);
-    return data_;
-  }
-  int64_t len() const { return handle_.len(); }
-  bool is_open() const { return handle_.is_open(); }
-  bool is_pinned() const { return handle_.is_pinned(); }
-  std::string DebugString() const { return handle_.DebugString(); }
-
- private:
-  /// Reset the page to an unitialized state. 'handle_' must already be closed.
-  void Reset() {
-    DCHECK(!handle_.is_open());
-    valid_data_len_ = 0;
-    data_ = nullptr;
-  }
-
-  /// Helper to get the singleton buffer pool.
-  static BufferPool* pool() { return ExecEnv::GetInstance()->buffer_pool(); }
-
-  BufferPool::PageHandle handle_;
-
-  /// Length of valid data written to the page.
-  int64_t valid_data_len_;
-
-  /// Cached pointer to the buffer in 'handle_'. NULL if the page is unpinned. May be NULL
-  /// or not NULL if the page is pinned. Can be populated by calling WaitForBuffer() on a
-  /// pinned page.
-  uint8_t* data_;
-};
-
-/// A run is a sequence of tuples. The run can be sorted or unsorted (in which case the
-/// Sorter will sort it). A run comprises a sequence of fixed-length pages containing the
-/// tuples themselves (i.e. fixed-len slots that may contain ptrs to var-length data), and
-/// an optional sequence of var-length pages containing the var-length data.
-///
-/// Runs are either "initial runs" constructed from the sorter's input by evaluating
-/// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed
-/// by merging already-sorted runs. Initial runs are sorted in-place in memory. Once
-/// sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by
-/// SortedRunMerger, either to produce the final sorted output or to produce another
-/// sorted run.
-///
-/// The expected calling sequence of functions is as follows:
-/// * Init() to initialize the run and allocate initial pages.
-/// * Add*Batch() to add batches of tuples to the run.
-/// * FinalizeInput() to signal that no more batches will be added.
-/// * If the run is unsorted, it must be sorted. After that set_sorted() must be called.
-/// * Once sorted, the run is ready to read in sorted order for merging or final output.
-/// * PrepareRead() to allocate resources for reading the run.
-/// * GetNext() (if there was a single run) or GetNextBatch() (when merging multiple runs)
-///   to read from the run.
-/// * Once reading is done, CloseAllPages() should be called to free resources.
-class Sorter::Run {
- public:
-  Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run);
-
-  ~Run() {
-    DCHECK(fixed_len_pages_.empty());
-    DCHECK(var_len_pages_.empty());
-    DCHECK(!var_len_copy_page_.is_open());
-  }
+Status Sorter::Page::Init(Sorter* sorter) {
+  const BufferPool::BufferHandle* page_buffer;
+  RETURN_IF_ERROR(pool()->CreatePage(sorter->buffer_pool_client_, sorter->page_len_,
+      &handle_, &page_buffer));
+  data_ = page_buffer->data();
+  return Status::OK();
+}
 
-  /// Initialize the run for input rows by allocating the minimum number of required
-  /// pages - one page for fixed-len data added to fixed_len_pages_, one for the
-  /// initially unsorted var-len data added to var_len_pages_, and one to copy sorted
-  /// var-len data into var_len_copy_page_.
-  Status Init() WARN_UNUSED_RESULT;
-
-  /// Add the rows from 'batch' starting at 'start_index' to the current run. Returns the
-  /// number of rows actually added in 'num_processed'. If the run is full (no more pages
-  /// can be allocated), 'num_processed' may be less than the number of remaining rows in
-  /// the batch. AddInputBatch() materializes the input rows using the expressions in
-  /// sorter_->sort_tuple_expr_evals_, while AddIntermediateBatch() just copies rows.
-  Status AddInputBatch(
-      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT {
-    DCHECK(initial_run_);
-    if (has_var_len_slots_) {
-      return AddBatchInternal<true, true>(batch, start_index, num_processed);
-    } else {
-      return AddBatchInternal<false, true>(batch, start_index, num_processed);
-    }
-  }
+BufferPool::BufferHandle Sorter::Page::ExtractBuffer(BufferPool::ClientHandle* client) {
+  DCHECK(data_ != nullptr) << "Page must be in memory";
+  BufferPool::BufferHandle buffer;
+  Status status = pool()->ExtractBuffer(client, &handle_, &buffer);
+  DCHECK(status.ok()) << "Page was in memory, ExtractBuffer() shouldn't fail";
+  Reset();
+  return buffer;
+}
 
-  Status AddIntermediateBatch(
-      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT {
-    DCHECK(!initial_run_);
-    if (has_var_len_slots_) {
-      return AddBatchInternal<true, false>(batch, start_index, num_processed);
-    } else {
-      return AddBatchInternal<false, false>(batch, start_index, num_processed);
-    }
-  }
+uint8_t* Sorter::Page::AllocateBytes(int64_t len) {
+  DCHECK_GE(len, 0);
+  DCHECK_LE(len, BytesRemaining());
+  DCHECK(data_ != nullptr);
+  uint8_t* result = data_ + valid_data_len_;
+  valid_data_len_ += len;
+  return result;
+}
 
-  /// Called after the final call to Add*Batch() to do any bookkeeping necessary to
-  /// finalize the run. Must be called before sorting or merging the run.
-  Status FinalizeInput() WARN_UNUSED_RESULT;
-
-  /// Unpins all the pages in a sorted run. Var-length column data is copied into new
-  /// pages in sorted order. Pointers in the original tuples are converted to offsets
-  /// from the beginning of the sequence of var-len data pages. Returns an error and
-  /// may leave some pages pinned if an error is encountered.
-  Status UnpinAllPages() WARN_UNUSED_RESULT;
-
-  /// Closes all pages and clears vectors of pages.
-  void CloseAllPages();
-
-  /// Prepare to read a sorted run. Pins the first page(s) in the run if the run was
-  /// previously unpinned. If the run was unpinned, try to pin the initial fixed and
-  /// var len pages in the run. If it couldn't pin them, an error Status is returned.
-  Status PrepareRead() WARN_UNUSED_RESULT;
-
-  /// Interface for merger - get the next batch of rows from this run. This run still
-  /// owns the returned batch. Calls GetNext(RowBatch*, bool*).
-  Status GetNextBatch(RowBatch** sorted_batch) WARN_UNUSED_RESULT;
-
-  /// Fill output_batch with rows from this run. If CONVERT_OFFSET_TO_PTR is true, offsets
-  /// in var-length slots are converted back to pointers. Only row pointers are copied
-  /// into output_batch. eos is set to true after all rows from the run are returned.
-  /// If eos is true, the returned output_batch has zero rows and has no attached pages.
-  /// If this run was unpinned, one page (two if there are var-len slots) is pinned while
-  /// rows are filled into output_batch. The page is unpinned before the next page is
-  /// pinned, so at most one (two if there are var-len slots) page(s) will be pinned at
-  /// once. If the run was pinned, the pages are not unpinned and each page is attached
-  /// to 'output_batch' once all rows referencing data in the page have been returned,
-  /// either in the current batch or previous batches. In both pinned and unpinned cases,
-  /// all rows in output_batch will reference at most one fixed-len and one var-len page.
-  template <bool CONVERT_OFFSET_TO_PTR>
-  Status GetNext(RowBatch* output_batch, bool* eos) WARN_UNUSED_RESULT;
-
-  /// Delete all pages in 'runs' and clear 'runs'.
-  static void CleanupRuns(deque<Run*>* runs) {
-    for (Run* run : *runs) {
-      run->CloseAllPages();
-    }
-    runs->clear();
-  }
+void Sorter::Page::FreeBytes(int64_t len) {
+  DCHECK_GE(len, 0);
+  DCHECK_LE(len, valid_data_len_);
+  DCHECK(data_ != nullptr);
+  valid_data_len_ -= len;
+}
 
-  /// Return total amount of fixed and var len data in run, not including pages that
-  /// were already transferred or closed.
-  int64_t TotalBytes() const;
-
-  inline bool is_pinned() const { return is_pinned_; }
-  inline bool is_finalized() const { return is_finalized_; }
-  inline bool is_sorted() const { return is_sorted_; }
-  inline void set_sorted() { is_sorted_ = true; }
-  inline int64_t num_tuples() const { return num_tuples_; }
-
-  /// Returns true if we have var-len pages in the run.
-  inline bool HasVarLenPages() const {
-    // Shouldn't have any pages unless there are slots.
-    DCHECK(var_len_pages_.empty() || has_var_len_slots_);
-    return !var_len_pages_.empty();
-  }
+Status Sorter::Page::WaitForBuffer() {
+  DCHECK(handle_.is_pinned());
+  if (data_ != nullptr) return Status::OK();
+  const BufferPool::BufferHandle* page_buffer;
+  RETURN_IF_ERROR(handle_.GetBuffer(&page_buffer));
+  data_ = page_buffer->data();
+  return Status::OK();
+}
 
- private:
-  /// TupleIterator needs access to internals to iterate over tuples.
-  friend class TupleIterator;
-
-  /// Templatized implementation of Add*Batch() functions.
-  /// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must
-  /// match 'initial_run_' and 'has_var_len_slots_'.
-  template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
-  Status AddBatchInternal(
-      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT;
-
-  /// Finalize the list of pages: delete empty final pages and unpin the previous page
-  /// if the run is unpinned.
-  Status FinalizePages(vector<Page>* pages) WARN_UNUSED_RESULT;
-
-  /// Collect the non-null var-len (e.g. STRING) slots from 'src' in 'var_len_values' and
-  /// return the total length of all var-len values in 'total_var_len'.
-  void CollectNonNullVarSlots(
-      Tuple* src, vector<StringValue*>* var_len_values, int* total_var_len);
-
-  enum AddPageMode { KEEP_PREV_PINNED, UNPIN_PREV };
-
-  /// Try to extend the current run by a page. If 'mode' is KEEP_PREV_PINNED, try to
-  /// allocate a new page, which may fail to extend the run due to lack of memory. If
-  /// mode is 'UNPIN_PREV', unpin the previous page in page_sequence before allocating
-  /// and adding a new page - this never fails due to lack of memory.
-  ///
-  /// Returns an error status only if the buffer pool returns an error. If no error is
-  /// encountered, sets 'added' to indicate whether the run was extended and returns
-  /// Status::OK(). The new page is appended to 'page_sequence'.
-  Status TryAddPage(
-      AddPageMode mode, vector<Page>* page_sequence, bool* added) WARN_UNUSED_RESULT;
-
-  /// Adds a new page to 'page_sequence' by a page. Caller must ensure enough
-  /// reservation is available to create the page.
-  ///
-  /// Returns an error status only if the buffer pool returns an error. If an error
-  /// is returned 'page_sequence' is left unmodified.
-  Status AddPage(vector<Page>* page_sequence) WARN_UNUSED_RESULT;
-
-  /// Advance to the next read page. If the run is pinned, has no effect. If the run
-  /// is unpinned, the pin at 'page_index' was already attached to an output batch and
-  /// this function will pin the page at 'page_index' + 1 in 'pages'.
-  Status PinNextReadPage(vector<Page>* pages, int page_index) WARN_UNUSED_RESULT;
-
-  /// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
-  /// ptrs in 'dest' to point to the copied data.
-  void CopyVarLenData(const vector<StringValue*>& var_values, uint8_t* dest);
-
-  /// Copy the StringValues in 'var_values' to 'dest' in order. Update the StringValue
-  /// ptrs in 'dest' to contain a packed offset for the copied data comprising
-  /// page_index and the offset relative to page_start.
-  void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values, int page_index,
-      const uint8_t* page_start, uint8_t* dest);
-
-  /// Convert encoded offsets to valid pointers in tuple with layout 'sort_tuple_desc_'.
-  /// 'tuple' is modified in-place. Returns true if the pointers refer to the page at
-  /// 'var_len_pages_index_' and were successfully converted or false if the var len
-  /// data is in the next page, in which case 'tuple' is unmodified.
-  bool ConvertOffsetsToPtrs(Tuple* tuple);
-
-  static int NumOpenPages(const vector<Page>& pages) {
-    int count = 0;
-    for (const Page& page : pages) {
-      if (page.is_open()) ++count;
-    }
-    return count;
-  }
+Status Sorter::Page::Pin(BufferPool::ClientHandle* client) {
+  DCHECK(!handle_.is_pinned());
+  return pool()->Pin(client, &handle_);
+}
 
-  /// Close all open pages and clear vector.
-  void DeleteAndClearPages(vector<Page>* pages) {
-    for (Page& page : *pages) {
-      if (page.is_open()) page.Close(sorter_->buffer_pool_client_);
-    }
-    pages->clear();
-  }
+void Sorter::Page::Unpin(BufferPool::ClientHandle* client) {
+  pool()->Unpin(client, &handle_);
+  data_ = nullptr;
+}
 
-  /// Parent sorter object.
-  Sorter* const sorter_;
-
-  /// Materialized sort tuple. Input rows are materialized into 1 tuple (with descriptor
-  /// sort_tuple_desc_) before sorting.
-  const TupleDescriptor* sort_tuple_desc_;
-
-  /// The size in bytes of the sort tuple.
-  const int sort_tuple_size_;
-
-  /// Number of tuples per page in a run. This gets multiplied with
-  /// TupleIterator::page_index_ in various places and to make sure we don't overflow the
-  /// result of that operation we make this int64_t here.
-  const int64_t page_capacity_;
-
-  const bool has_var_len_slots_;
-
-  /// True if this is an initial run. False implies this is an sorted intermediate run
-  /// resulting from merging other runs.
-  const bool initial_run_;
-
-  /// True if all pages in the run are pinned. Initial runs start off pinned and
-  /// can be unpinned. Intermediate runs are always unpinned.
-  bool is_pinned_;
-
-  /// True after FinalizeInput() is called. No more tuples can be added after the
-  /// run is finalized.
-  bool is_finalized_;
-
-  /// True if the tuples in the run are currently in sorted order.
-  /// Always true for intermediate runs.
-  bool is_sorted_;
-
-  /// Sequence of pages in this run containing the fixed-length portion of the sort
-  /// tuples comprising this run. The data pointed to by the var-len slots are in
-  /// var_len_pages_. A run can have zero pages if no rows are appended.
-  /// If the run is sorted, the tuples in fixed_len_pages_ will be in sorted order.
-  /// fixed_len_pages_[i] is closed iff it has been transferred or deleted.
-  vector<Page> fixed_len_pages_;
-
-  /// Sequence of pages in this run containing the var-length data corresponding to the
-  /// var-length columns from fixed_len_pages_. In intermediate runs, the var-len data is
-  /// always stored in the same order as the fixed-length tuples. In initial runs, the
-  /// var-len data is initially in unsorted order, but is reshuffled into sorted order in
-  /// UnpinAllPages(). A run can have no var len pages if there are no var len slots or
-  /// if all the var len data is empty or NULL.
-  /// var_len_pages_[i] is closed iff it has been transferred or deleted.
-  vector<Page> var_len_pages_;
-
-  /// For initial unsorted runs, an extra pinned page is needed to reorder var-len data
-  /// into fixed order in UnpinAllPages(). 'var_len_copy_page_' stores this extra
-  /// page. Deleted in UnpinAllPages().
-  /// TODO: in case of in-memory runs, this could be deleted earlier to free up memory.
-  Page var_len_copy_page_;
-
-  /// Number of tuples added so far to this run.
-  int64_t num_tuples_;
-
-  /// Number of tuples returned via GetNext(), maintained for debug purposes.
-  int64_t num_tuples_returned_;
-
-  /// Used to implement GetNextBatch() interface required for the merger.
-  scoped_ptr<RowBatch> buffered_batch_;
-
-  /// Members used when a run is read in GetNext().
-  /// The index into 'fixed_' and 'var_len_pages_' of the pages being read in GetNext().
-  int fixed_len_pages_index_;
-  int var_len_pages_index_;
-
-  /// If true, the last call to GetNext() reached the end of the previous fixed or
-  /// var-len page. The next call to GetNext() must increment 'fixed_len_pages_index_'
-  /// or 'var_len_pages_index_'. It must also pin the next page if the run is unpinned.
-  bool end_of_fixed_len_page_;
-  bool end_of_var_len_page_;
-
-  /// Offset into the current fixed length data page being processed.
-  int fixed_len_page_offset_;
-};
-
-/// Helper class used to iterate over tuples in a run during sorting.
-class Sorter::TupleIterator {
- public:
-  /// Creates an iterator pointing at the tuple with the given 'index' in the 'run'.
-  /// The index can be in the range [0, run->num_tuples()]. If it is equal to
-  /// run->num_tuples(), the iterator points to one past the end of the run, so
-  /// invoking Prev() will cause the iterator to point at the last tuple in the run.
-  /// 'run' must be finalized.
-  TupleIterator(Sorter::Run* run, int64_t index);
-
-  /// Default constructor used for local variables. Produces an invalid iterator that
-  /// must be assigned before use.
-  TupleIterator() : index_(-1), tuple_(NULL), buffer_start_index_(-1),
-      buffer_end_index_(-1), page_index_(-1) { }
-
-  /// Create an iterator pointing to the first tuple in the run.
-  static inline TupleIterator Begin(Sorter::Run* run) { return TupleIterator(run, 0); }
-
-  /// Create an iterator pointing one past the end of the run.
-  static inline TupleIterator End(Sorter::Run* run) {
-    return TupleIterator(run, run->num_tuples());
-  }
+void Sorter::Page::Close(BufferPool::ClientHandle* client) {
+  pool()->DestroyPage(client, &handle_);
+  Reset();
+}
 
-  /// Increments 'index_' and sets 'tuple_' to point to the next tuple in the run.
-  /// Increments 'page_index_' and advances to the next page if the next tuple is in
-  /// the next page. Can be advanced one past the last tuple in the run, but it is not
-  /// valid to dereference 'tuple_' in that case. 'run' and 'tuple_size' are passed as
-  /// arguments to avoid redundantly storing the same values in multiple iterators in
-  /// perf-critical algorithms.
-  inline void Next(Sorter::Run* run, int tuple_size);
-
-  /// The reverse of Next(). Can advance one before the first tuple in the run, but
-  /// it is invalid to dereference 'tuple_' in that case.
-  inline void Prev(Sorter::Run* run, int tuple_size);
-
-  inline int64_t index() const { return index_; }
-  inline Tuple* tuple() const { return reinterpret_cast<Tuple*>(tuple_); }
-  /// Returns current tuple in TupleRow format. The caller should not modify the row.
-  inline const TupleRow* row() const {
-    return reinterpret_cast<const TupleRow*>(&tuple_);
-  }
+void Sorter::Page::Reset() {
+  DCHECK(!handle_.is_open());
+  valid_data_len_ = 0;
+  data_ = nullptr;
+}
 
- private:
-  // Move to the next page in the run (or do nothing if at end of run).
-  // This is the slow path for Next().
-  void NextPage(Sorter::Run* run, int tuple_size);
-
-  // Move to the previous page in the run (or do nothing if at beginning of run).
-  // This is the slow path for Prev().
-  void PrevPage(Sorter::Run* run, int tuple_size);
-
-  /// Index of the current tuple in the run.
-  /// Can be -1 or run->num_tuples() if Next() or Prev() moves the iterator outside
-  /// of the run.
-  int64_t index_;
-
-  /// Pointer to the current tuple.
-  /// Will be an invalid pointer outside of the current buffer if Next() or Prev()
-  /// moves the iterator outside of the run.
-  uint8_t* tuple_;
-
-  /// Indices of the start and end tuples of the page at page_index_. I.e. the current
-  /// page has tuples with indices in range [buffer_start_index_, buffer_end_index_).
-  int64_t buffer_start_index_;
-  int64_t buffer_end_index_;
-
-  /// Index into fixed_len_pages_ of the page containing the current tuple.
-  /// If index_ is negative or past the end of the run, this will point to the first
-  /// or last page in the run, respectively.
-  int page_index_;
-};
-
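
For orientation, the TupleIterator removed above walks fixed-size tuples packed into
fixed-capacity pages, caching the tuple-index bounds of the current page so that the
common case in Next()/Prev() is a single pointer bump. A standalone sketch of the same
pattern (plain C++; 'FakeRun' and 'Iter' are invented stand-ins, not Impala's types):

#include <cstdint>
#include <cstdio>
#include <vector>

// Hypothetical stand-in for a run: fixed-size records packed into pages.
struct FakeRun {
  int record_size;
  int64_t page_capacity;  // records per page
  int64_t num_records;
  std::vector<std::vector<uint8_t>> pages;
};

struct Iter {
  int64_t index;                 // record index in the run
  uint8_t* rec;                  // pointer to the current record
  int64_t page_start, page_end;  // record-index bounds of the current page
  int page_index;
};

// Slow path: advance to the next page and recompute the cached bounds.
static void NextPage(FakeRun* run, Iter* it) {
  if (it->index >= run->num_records) return;  // stay one past the end
  ++it->page_index;
  it->page_start = it->page_index * run->page_capacity;
  it->page_end = it->page_start + run->page_capacity;
  it->rec = run->pages[it->page_index].data();
}

// Fast path: bump the pointer; fall into the slow path only at a page boundary.
static void Next(FakeRun* run, Iter* it) {
  it->rec += run->record_size;
  ++it->index;
  if (it->index >= it->page_end) NextPage(run, it);
}

int main() {
  FakeRun run{4, 2, 5, {}};  // 4-byte records, 2 per page, 5 records total
  for (int p = 0; p < 3; ++p) run.pages.emplace_back(8, uint8_t(p));
  Iter it{0, run.pages[0].data(), 0, run.page_capacity, 0};
  for (; it.index < run.num_records; Next(&run, &it)) {
    std::printf("record %lld in page %d\n", (long long)it.index, it.page_index);
  }
  return 0;
}
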
-/// Sorts a sequence of tuples from a run in place using a provided tuple comparator.
-/// Quick sort is used for sequences of tuples larger than 16 elements, and insertion
-/// sort is used for smaller sequences. The TupleSorter is initialized with a
-/// RuntimeState instance to check for cancellation during an in-memory sort.
-class Sorter::TupleSorter {
- public:
-  TupleSorter(Sorter* parent, const TupleRowComparator& comparator, int64_t page_size,
-      int tuple_size, RuntimeState* state);
-
-  ~TupleSorter();
-
-  /// Performs a quicksort for tuples in 'run' followed by an insertion sort to
-  /// finish smaller ranges. Only valid to call if this is an initial run that has not
-  /// yet been sorted. Returns an error status if any error is encountered or if the
-  /// query is cancelled.
-  Status Sort(Run* run) WARN_UNUSED_RESULT;
-
- private:
-  static const int INSERTION_THRESHOLD = 16;
-
-  Sorter* const parent_;
-
-  /// Size of the tuples in memory.
-  const int tuple_size_;
-
-  /// Tuple comparator with method Less() that returns true if lhs < rhs.
-  const TupleRowComparator& comparator_;
-
-  /// Number of times comparator_.Less() can be invoked again before
-  /// expr_results_pool_.Clear() needs to be called.
-  int num_comparisons_till_free_;
-
-  /// Runtime state instance to check for cancellation. Not owned.
-  RuntimeState* const state_;
-
-  /// The run to be sorted.
-  Run* run_;
-
-  /// Temporarily allocated space to copy and swap tuples (Both are used in Partition()).
-  /// Owned by this TupleSorter instance.
-  uint8_t* temp_tuple_buffer_;
-  uint8_t* swap_buffer_;
-
-  /// Random number generator used to randomly choose pivots. We need an RNG that
-  /// can generate 64-bit ints. Quality of randomness doesn't need to be especially
-  /// high: Mersenne Twister should be more than adequate.
-  mt19937_64 rng_;
-
-  /// Wrapper around comparator_.Less(). Also calls expr_results_pool_.Clear()
-  /// every 'state_->batch_size()' invocations of comparator_.Less(). Returns true
-  /// if 'lhs' is less than 'rhs'.
-  bool Less(const TupleRow* lhs, const TupleRow* rhs);
-
-  /// Perform an insertion sort for rows in the range [begin, end) in a run.
-  /// Only valid to call for ranges of size at least 1.
-  Status InsertionSort(
-      const TupleIterator& begin, const TupleIterator& end) WARN_UNUSED_RESULT;
-
-  /// Partitions the sequence of tuples in the range [begin, end) in a run into two
-  /// groups around the pivot tuple - i.e. tuples in the first group are <= the pivot,
-  /// and tuples in the second group are >= the pivot. Tuples are swapped in place to
-  /// create the groups, and the index of the first element in the second group is
-  /// returned in 'cut'.
-  /// Returns an error status if any error is encountered or if the query is cancelled.
-  Status Partition(TupleIterator begin, TupleIterator end, const Tuple* pivot,
-      TupleIterator* cut) WARN_UNUSED_RESULT;
-
-  /// Performs a quicksort of rows in the range [begin, end) followed by insertion sort
-  /// for smaller groups of elements. Returns an error status for any errors or if the
-  /// query is cancelled.
-  Status SortHelper(TupleIterator begin, TupleIterator end) WARN_UNUSED_RESULT;
-
-  /// Select a pivot to partition [begin, end).
-  Tuple* SelectPivot(TupleIterator begin, TupleIterator end);
-
-  /// Return median of three tuples according to the sort comparator.
-  Tuple* MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3);
-
-  /// Swaps tuples pointed to by left and right using 'swap_tuple'.
-  static void Swap(Tuple* left, Tuple* right, Tuple* swap_tuple, int tuple_size);
-};
+BufferPool* Sorter::Page::pool() {
+  return ExecEnv::GetInstance()->buffer_pool();
+}
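
A minimal standalone illustration of the Close()-then-Reset() contract added above
('FakePool', 'FakeHandle', and 'PageSketch' are invented stand-ins, not BufferPool's
real API): once the buffer is returned to the pool, the cached data pointer and length
are cleared so a closed page cannot expose stale state.

#include <cassert>
#include <cstdint>
#include <cstdio>

// Hypothetical stand-ins for the buffer pool and its page handle.
struct FakeHandle { bool open = false; };
struct FakePool {
  void DestroyPage(FakeHandle* h) { h->open = false; std::puts("page destroyed"); }
};

class PageSketch {
 public:
  void Open(FakePool* pool) { pool_ = pool; handle_.open = true; }
  // Return the buffer to the pool, then clear all cached state.
  void Close() {
    pool_->DestroyPage(&handle_);
    Reset();
  }
  bool is_open() const { return handle_.open; }

 private:
  void Reset() {
    assert(!handle_.open);  // only legal once the handle is closed
    valid_data_len_ = 0;
    data_ = nullptr;
  }
  FakePool* pool_ = nullptr;
  FakeHandle handle_;
  uint8_t* data_ = nullptr;
  int64_t valid_data_len_ = 0;
};

int main() {
  FakePool pool;
  PageSketch page;
  page.Open(&pool);
  page.Close();
  std::printf("open after close? %d\n", page.is_open());  // prints 0
  return 0;
}
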
 
 // Sorter::Run methods
 Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_run)
@@ -685,7 +190,7 @@ Status Sorter::Run::AddBatchInternal(
           reinterpret_cast<Tuple*>(cur_fixed_len_page->AllocateBytes(sort_tuple_size_));
       if (INITIAL_RUN) {
         new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row,
-            *sort_tuple_desc_, sorter_->sort_tuple_expr_evals_, NULL,
+            *sort_tuple_desc_, sorter_->sort_tuple_expr_evals_, nullptr,
             &string_values, &total_var_len);
         if (total_var_len > sorter_->page_len_) {
           int64_t max_row_size = sorter_->state_->query_options().max_row_size;
@@ -799,7 +304,7 @@ Status Sorter::Run::UnpinAllPages() {
   vector<StringValue*> string_values;
   int total_var_len;
   string_values.reserve(sort_tuple_desc_->string_slots().size());
-  Page* cur_sorted_var_len_page = NULL;
+  Page* cur_sorted_var_len_page = nullptr;
   if (HasVarLenPages()) {
     DCHECK(var_len_copy_page_.is_open());
     sorted_var_len_pages.push_back(move(var_len_copy_page_));
@@ -813,8 +318,8 @@ Status Sorter::Run::UnpinAllPages() {
   }
 
   Status status;
-  for (int i = 0; i < fixed_len_pages_.size(); ++i) {
-    Page* cur_fixed_page = &fixed_len_pages_[i];
+  for (auto& fixed_len_page : fixed_len_pages_) {
+    Page* cur_fixed_page = &fixed_len_page;
     // Skip converting the pointers if no var-len slots, or if all the values are null
     // or zero-length. This will possibly leave zero-length pointers pointing to
     // arbitrary memory, but zero-length data cannot be dereferenced anyway.
@@ -874,7 +379,7 @@ Status Sorter::Run::PrepareRead() {
   if (is_pinned_) return Status::OK();
 
   // Pins the first fixed len page.
-  if (fixed_len_pages_.size() > 0) {
+  if (!fixed_len_pages_.empty()) {
     RETURN_IF_ERROR(fixed_len_pages_[0].Pin(sorter_->buffer_pool_client_));
   }
 
@@ -887,7 +392,7 @@ Status Sorter::Run::PrepareRead() {
 }
 
 Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
-  DCHECK(buffered_batch_ != NULL);
+  DCHECK(buffered_batch_ != nullptr);
   buffered_batch_->Reset();
   // Fill more rows into buffered_batch_.
   bool eos;
@@ -898,11 +403,11 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
   }
 
   if (eos) {
-    // Setting output_batch to NULL signals eos to the caller, so GetNext() is not
+    // Setting output_batch to nullptr signals eos to the caller, so GetNext() is not
     // allowed to attach resources to the batch on eos.
     DCHECK_EQ(buffered_batch_->num_rows(), 0);
     DCHECK_EQ(buffered_batch_->num_buffers(), 0);
-    *output_batch = NULL;
+    *output_batch = nullptr;
     return Status::OK();
   }
   *output_batch = buffered_batch_.get();
@@ -945,7 +450,7 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
 
   // Fills rows into the output batch until a page boundary is reached.
   Page* fixed_len_page = &fixed_len_pages_[fixed_len_pages_index_];
-  DCHECK(fixed_len_page != NULL);
+  DCHECK(fixed_len_page != nullptr);
 
   // Ensure we have a reference to the fixed-length page's buffer.
   RETURN_IF_ERROR(fixed_len_page->WaitForBuffer());
@@ -958,7 +463,7 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
 
   while (!output_batch->AtCapacity()
       && fixed_len_page_offset_ < fixed_len_page->valid_data_len()) {
-    DCHECK(fixed_len_page != NULL);
+    DCHECK(fixed_len_page != nullptr);
     Tuple* input_tuple =
         reinterpret_cast<Tuple*>(fixed_len_page->data() + fixed_len_page_offset_);
 
@@ -1091,14 +596,13 @@ void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& string
 
 bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
   // We need to be careful to handle the case where var_len_pages_ is empty,
-  // e.g. if all strings are NULL.
+  // e.g. if all strings are nullptr.
   uint8_t* page_start =
-      var_len_pages_.empty() ? NULL : var_len_pages_[var_len_pages_index_].data();
+      var_len_pages_.empty() ? nullptr : var_len_pages_[var_len_pages_index_].data();
 
   const vector<SlotDescriptor*>& string_slots = sort_tuple_desc_->string_slots();
   int num_non_null_string_slots = 0;
-  for (int i = 0; i < string_slots.size(); ++i) {
-    SlotDescriptor* slot_desc = string_slots[i];
+  for (auto slot_desc : string_slots) {
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
     ++num_non_null_string_slots;
 
@@ -1130,9 +634,9 @@ bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
     } else {
       DCHECK_LE(page_offset + value->len, var_len_pages_[page_index].valid_data_len());
     }
-    // Calculate the address implied by the offset and assign it. May be NULL for
-    // zero-length strings if there are no pages in the run since page_start is NULL.
-    DCHECK(page_start != NULL || page_offset == 0);
+    // Calculate the address implied by the offset and assign it. May be nullptr for
+    // zero-length strings if there are no pages in the run since page_start is nullptr.
+    DCHECK(page_start != nullptr || page_offset == 0);
     value->ptr = reinterpret_cast<char*>(page_start + page_offset);
   }
   return true;
@@ -1150,8 +654,55 @@ int64_t Sorter::Run::TotalBytes() const {
   return total_bytes;
 }
 
+Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int* num_processed) {
+  DCHECK(initial_run_);
+  if (has_var_len_slots_) {
+    return AddBatchInternal<true, true>(batch, start_index, num_processed);
+  } else {
+    return AddBatchInternal<false, true>(batch, start_index, num_processed);
+  }
+}
+
+Status Sorter::Run::AddIntermediateBatch(
+    RowBatch* batch, int start_index, int* num_processed) {
+  DCHECK(!initial_run_);
+  if (has_var_len_slots_) {
+    return AddBatchInternal<true, false>(batch, start_index, num_processed);
+  } else {
+    return AddBatchInternal<false, false>(batch, start_index, num_processed);
+  }
+}
+
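AddInputBatch() and AddIntermediateBatch() above funnel runtime flags into the
compile-time template parameters of AddBatchInternal(), so each of the four
instantiations is fully specialized (and can be inlined under codegen). A
self-contained illustration of that dispatch pattern, with hypothetical names:

#include <cstdio>

// Hypothetical kernel: both flags become compile-time constants, so the
// branches below can compile away in each of the four instantiations.
template <bool HAS_VAR_LEN, bool INITIAL>
static int AddBatchInternalSketch(int n) {
  int cost = n;
  if (HAS_VAR_LEN) cost += n;  // dead code when HAS_VAR_LEN == false
  if (INITIAL) cost += 1;
  return cost;
}

// Runtime dispatch: one branch per flag, then a fully specialized call.
static int AddBatchSketch(bool has_var_len, bool initial, int n) {
  if (has_var_len) {
    return initial ? AddBatchInternalSketch<true, true>(n)
                   : AddBatchInternalSketch<true, false>(n);
  }
  return initial ? AddBatchInternalSketch<false, true>(n)
                 : AddBatchInternalSketch<false, false>(n);
}

int main() {
  std::printf("%d\n", AddBatchSketch(true, false, 10));  // prints 20
  return 0;
}
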
+void Sorter::Run::CleanupRuns(deque<Sorter::Run*>* runs) {
+  for (Run* run : *runs) {
+    run->CloseAllPages();
+  }
+  runs->clear();
+}
+
+int Sorter::Run::NumOpenPages(const vector<Sorter::Page>& pages) {
+  int count = 0;
+  for (const Page& page : pages) {
+    if (page.is_open()) ++count;
+  }
+  return count;
+}
+
+void Sorter::Run::DeleteAndClearPages(vector<Sorter::Page>* pages) {
+  for (Page& page : *pages) {
+    if (page.is_open()) page.Close(sorter_->buffer_pool_client_);
+  }
+  pages->clear();
+}
+
+Sorter::Run::~Run() {
+  DCHECK(fixed_len_pages_.empty());
+  DCHECK(var_len_pages_.empty());
+  DCHECK(!var_len_copy_page_.is_open());
+}
+
 Sorter::TupleIterator::TupleIterator(Sorter::Run* run, int64_t index)
-    : index_(index), tuple_(NULL) {
+    : index_(index), tuple_(nullptr) {
   DCHECK(run->is_finalized_);
   DCHECK_GE(index, 0);
   DCHECK_LE(index, run->num_tuples());
@@ -1179,45 +730,8 @@ Sorter::TupleIterator::TupleIterator(Sorter::Run* run, int64_t index)
   tuple_ = run->fixed_len_pages_[page_index_].data() + page_offset;
 }
 
-void Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) {
-  DCHECK_LT(index_, run->num_tuples()) << "Can only advance one past end of run";
-  tuple_ += tuple_size;
-  ++index_;
-  if (UNLIKELY(index_ >= buffer_end_index_)) NextPage(run, tuple_size);
-}
-
-void Sorter::TupleIterator::NextPage(Sorter::Run* run, int tuple_size) {
-  // When moving after the last tuple, stay at the last page.
-  if (index_ >= run->num_tuples()) return;
-  ++page_index_;
-  DCHECK_LT(page_index_, run->fixed_len_pages_.size());
-  buffer_start_index_ = page_index_ * run->page_capacity_;
-  DCHECK_EQ(index_, buffer_start_index_);
-  buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
-  tuple_ = run->fixed_len_pages_[page_index_].data();
-}
-
-void Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) {
-  DCHECK_GE(index_, 0) << "Can only advance one before start of run";
-  tuple_ -= tuple_size;
-  --index_;
-  if (UNLIKELY(index_ < buffer_start_index_)) PrevPage(run, tuple_size);
-}
-
-void Sorter::TupleIterator::PrevPage(Sorter::Run* run, int tuple_size) {
-  // When moving before the first tuple, stay at the first page.
-  if (index_ < 0) return;
-  --page_index_;
-  DCHECK_GE(page_index_, 0);
-  buffer_start_index_ = page_index_ * run->page_capacity_;
-  buffer_end_index_ = buffer_start_index_ + run->page_capacity_;
-  DCHECK_EQ(index_, buffer_end_index_ - 1);
-  int last_tuple_page_offset = run->sort_tuple_size_ * (run->page_capacity_ - 1);
-  tuple_ = run->fixed_len_pages_[page_index_].data() + last_tuple_page_offset;
-}
-
 Sorter::TupleSorter::TupleSorter(Sorter* parent, const TupleRowComparator& comp,
-    int64_t page_size, int tuple_size, RuntimeState* state)
+    int tuple_size, RuntimeState* state)
   : parent_(parent),
     tuple_size_(tuple_size),
     comparator_(comp),
@@ -1232,16 +746,6 @@ Sorter::TupleSorter::~TupleSorter() {
   delete[] swap_buffer_;
 }
 
-bool Sorter::TupleSorter::Less(const TupleRow* lhs, const TupleRow* rhs) {
-  --num_comparisons_till_free_;
-  DCHECK_GE(num_comparisons_till_free_, 0);
-  if (UNLIKELY(num_comparisons_till_free_ == 0)) {
-    parent_->expr_results_pool_.Clear();
-    num_comparisons_till_free_ = state_->batch_size();
-  }
-  return comparator_.Less(lhs, rhs);
-}
-
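
The removed Less() wrapper above (now cross-compiled in sorter-ir.cc) amortizes
expression-result cleanup by clearing the pool only once every batch_size()
comparisons instead of on every call. The countdown pattern in isolation, with
invented stand-in types:

#include <cstdio>

// Hypothetical stand-in for expr_results_pool_.
struct FakePool {
  void Clear() { std::puts("pool cleared"); }
};

struct ComparatorSketch {
  FakePool* pool;
  int batch_size;
  int comparisons_till_free;  // countdown mirroring num_comparisons_till_free_

  bool Less(int lhs, int rhs) {
    if (--comparisons_till_free == 0) {
      // Clear accumulated comparator allocations once per 'batch_size' calls,
      // instead of on every comparison.
      pool->Clear();
      comparisons_till_free = batch_size;
    }
    return lhs < rhs;
  }
};

int main() {
  FakePool pool;
  ComparatorSketch cmp{&pool, 4, 4};
  for (int i = 0; i < 10; ++i) cmp.Less(i, i + 1);  // clears twice
  return 0;
}
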
 Status Sorter::TupleSorter::Sort(Run* run) {
   DCHECK(run->is_finalized());
   DCHECK(!run->is_sorted());
@@ -1251,184 +755,6 @@ Status Sorter::TupleSorter::Sort(Run* run) {
   return Status::OK();
 }
 
-// Sort the sequence of tuples from [begin, end).
-// Begin with a sorted sequence of size 1 [begin, begin+1).
-// During each pass of the outermost loop, add the next tuple (at position 'i') to
-// the sorted sequence by comparing it to each element of the sorted sequence
-// (reverse order) to find its correct place in the sorted sequence, copying tuples
-// along the way.
-Status Sorter::TupleSorter::InsertionSort(const TupleIterator& begin,
-    const TupleIterator& end) {
-  DCHECK_LT(begin.index(), end.index());
-
-  // Hoist member variable lookups out of loop to avoid extra loads inside loop.
-  Run* run = run_;
-  int tuple_size = tuple_size_;
-  uint8_t* temp_tuple_buffer = temp_tuple_buffer_;
-
-  TupleIterator insert_iter = begin;
-  insert_iter.Next(run, tuple_size);
-  for (; insert_iter.index() < end.index(); insert_iter.Next(run, tuple_size)) {
-    // insert_iter points to the tuple after the currently sorted sequence that must
-    // be inserted into the sorted sequence. Copy to temp_tuple_buffer_ since it may be
-    // overwritten by the one at position 'insert_iter - 1'.
-    memcpy(temp_tuple_buffer, insert_iter.tuple(), tuple_size);
-
-    // 'iter' points to the tuple that temp_tuple_buffer will be compared to.
-    // 'copy_to' is where iter should be copied to if it is >= temp_tuple_buffer.
-    // copy_to always points to the next row after 'iter'.
-    TupleIterator iter = insert_iter;
-    iter.Prev(run, tuple_size);
-    Tuple* copy_to = insert_iter.tuple();
-    while (Less(reinterpret_cast<TupleRow*>(&temp_tuple_buffer), iter.row())) {
-      memcpy(copy_to, iter.tuple(), tuple_size);
-      copy_to = iter.tuple();
-      // Break if 'iter' has reached the first row, meaning that the temp row
-      // will be inserted in position 'begin'.
-      if (iter.index() <= begin.index()) break;
-      iter.Prev(run, tuple_size);
-    }
-
-    memcpy(copy_to, temp_tuple_buffer, tuple_size);
-  }
-  RETURN_IF_CANCELLED(state_);
-  RETURN_IF_ERROR(state_->GetQueryStatus());
-  return Status::OK();
-}
-
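
The same insertion sort on a flat byte buffer, as a runnable standalone sketch
(contiguous fixed-size records rather than tuples in pages; all names here are
invented for illustration):

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

// Insertion sort over fixed-size records: copy the record being inserted to a
// temp buffer first, since shifting may overwrite its slot.
static void InsertionSortRecords(uint8_t* base, int n, int record_size,
    bool (*less)(const uint8_t*, const uint8_t*)) {
  std::vector<uint8_t> temp(record_size);
  for (int i = 1; i < n; ++i) {
    uint8_t* insert = base + i * record_size;
    std::memcpy(temp.data(), insert, record_size);
    int j = i - 1;
    // Shift greater-or-equal records one slot to the right.
    while (j >= 0 && less(temp.data(), base + j * record_size)) {
      std::memcpy(base + (j + 1) * record_size, base + j * record_size,
          record_size);
      --j;
    }
    std::memcpy(base + (j + 1) * record_size, temp.data(), record_size);
  }
}

static bool LessInt32(const uint8_t* a, const uint8_t* b) {
  int32_t x, y;
  std::memcpy(&x, a, 4);
  std::memcpy(&y, b, 4);
  return x < y;
}

int main() {
  int32_t v[] = {5, 1, 4, 2, 3};
  InsertionSortRecords(reinterpret_cast<uint8_t*>(v), 5, 4, LessInt32);
  for (int32_t x : v) std::printf("%d ", x);  // 1 2 3 4 5
  return 0;
}
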
-Status Sorter::TupleSorter::Partition(TupleIterator begin,
-    TupleIterator end, const Tuple* pivot, TupleIterator* cut) {
-  // Hoist member variable lookups out of loop to avoid extra loads inside loop.
-  Run* run = run_;
-  int tuple_size = tuple_size_;
-  Tuple* temp_tuple = reinterpret_cast<Tuple*>(temp_tuple_buffer_);
-  Tuple* swap_tuple = reinterpret_cast<Tuple*>(swap_buffer_);
-
-  // Copy pivot into temp_tuple since it points to a tuple within [begin, end).
-  DCHECK(temp_tuple != NULL);
-  DCHECK(pivot != NULL);
-  memcpy(temp_tuple, pivot, tuple_size);
-
-  TupleIterator left = begin;
-  TupleIterator right = end;
-  right.Prev(run, tuple_size); // Set 'right' to the last tuple in range.
-  while (true) {
-    // Search for the first and last out-of-place elements, and swap them.
-    while (Less(left.row(), reinterpret_cast<TupleRow*>(&temp_tuple))) {
-      left.Next(run, tuple_size);
-    }
-    while (Less(reinterpret_cast<TupleRow*>(&temp_tuple), right.row())) {
-      right.Prev(run, tuple_size);
-    }
-
-    if (left.index() >= right.index()) break;
-    // Swap first and last tuples.
-    Swap(left.tuple(), right.tuple(), swap_tuple, tuple_size);
-
-    left.Next(run, tuple_size);
-    right.Prev(run, tuple_size);
-
-    RETURN_IF_CANCELLED(state_);
-    RETURN_IF_ERROR(state_->GetQueryStatus());
-  }
-
-  *cut = left;
-  return Status::OK();
-}
-
-Status Sorter::TupleSorter::SortHelper(TupleIterator begin, TupleIterator end) {
-  // Use insertion sort for smaller sequences.
-  while (end.index() - begin.index() > INSERTION_THRESHOLD) {
-    // Select a pivot and call Partition() to split the tuples in [begin, end) into two
-    // groups (<= pivot and >= pivot) in-place. 'cut' is the index of the first tuple in
-    // the second group.
-    Tuple* pivot = SelectPivot(begin, end);
-    TupleIterator cut;
-    RETURN_IF_ERROR(Partition(begin, end, pivot, &cut));
-
-    // Recurse on the smaller partition. This limits stack size to log(n) stack frames.
-    if (cut.index() - begin.index() < end.index() - cut.index()) {
-      // Left partition is smaller.
-      RETURN_IF_ERROR(SortHelper(begin, cut));
-      begin = cut;
-    } else {
-      // Right partition is equal or smaller.
-      RETURN_IF_ERROR(SortHelper(cut, end));
-      end = cut;
-    }
-  }
-
-  if (begin.index() < end.index()) RETURN_IF_ERROR(InsertionSort(begin, end));
-  return Status::OK();
-}
-
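
The loop-plus-single-recursion shape of SortHelper() above is what bounds the stack:
each step recurses only into the smaller half and loops on the larger one, so at most
about log2(n) frames are live. The same shape on a plain int array, as an illustrative
sketch (the insertion-sort threshold is omitted for focus; Hoare partition is used so
each cut strictly shrinks both sides):

#include <cstdio>
#include <utility>

// Hoare partition: returns j with lo <= j < hi such that a[lo..j] <= pivot
// and a[j+1..hi] >= pivot. 'hi' is inclusive.
static int HoarePartition(int* a, int lo, int hi) {
  int pivot = a[lo + (hi - lo) / 2];  // never the last element, so we always progress
  int i = lo - 1, j = hi + 1;
  while (true) {
    do { ++i; } while (a[i] < pivot);
    do { --j; } while (a[j] > pivot);
    if (i >= j) return j;
    std::swap(a[i], a[j]);
  }
}

// Recurse only into the smaller side; loop on the larger. Live stack frames
// are therefore bounded by about log2(n).
static void QuickSortBoundedStack(int* a, int lo, int hi) {  // inclusive range
  while (lo < hi) {
    int cut = HoarePartition(a, lo, hi);
    if (cut - lo < hi - cut) {
      QuickSortBoundedStack(a, lo, cut);
      lo = cut + 1;
    } else {
      QuickSortBoundedStack(a, cut + 1, hi);
      hi = cut;
    }
  }
}

int main() {
  int v[] = {9, 3, 7, 1, 8, 2, 5, 4, 6, 0};
  QuickSortBoundedStack(v, 0, 9);
  for (int x : v) std::printf("%d ", x);  // 0 1 2 3 4 5 6 7 8 9
  return 0;
}
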
-Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator begin, TupleIterator end) {
-  // Select the median of three random tuples. The random selection avoids pathological
-  // behaviour associated with techniques that pick a fixed element (e.g. picking
-  // first/last/middle element) and taking the median tends to help us select better
-  // pivots that more evenly split the input range. This method makes selection of
-  // bad pivots very infrequent.
-  //
-  // To illustrate, if we define a bad pivot as one in the lower or upper 10% of values,
-  // then the median of three is a bad pivot only if all three randomly-selected values
-  // are in the lower or upper 10%. The probability of that is 0.2 * 0.2 * 0.2 = 0.008:
-  // less than 1%. Since selection is random each time, the chance of repeatedly picking
-  // bad pivots decreases exponentially and becomes negligibly small after a few
-  // iterations.
-  Tuple* tuples[3];
-  for (int i = 0; i < 3; ++i) {
-    int64_t index = uniform_int<int64_t>(begin.index(), end.index() - 1)(rng_);
-    TupleIterator iter(run_, index);
-    DCHECK(iter.tuple() != NULL);
-    tuples[i] = iter.tuple();
-  }
-
-  return MedianOfThree(tuples[0], tuples[1], tuples[2]);
-}
-
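
The uniform_int spelling above is presumably Boost's; the standard library's
equivalent, std::uniform_int_distribution, likewise takes inclusive bounds. Random
median-of-three selection in isolation, on a plain array with hypothetical names:

#include <algorithm>
#include <cstdio>
#include <random>

// Pick three random indices in [begin, end) and return the median element.
static int MedianOfThreePivot(const int* a, int begin, int end,
    std::mt19937_64* rng) {
  std::uniform_int_distribution<int> dist(begin, end - 1);  // inclusive bounds
  int t1 = a[dist(*rng)], t2 = a[dist(*rng)], t3 = a[dist(*rng)];
  // Median of three: max(min(t1, t2), min(max(t1, t2), t3)).
  return std::max(std::min(t1, t2), std::min(std::max(t1, t2), t3));
}

int main() {
  int v[] = {40, 10, 30, 20, 50};
  std::mt19937_64 rng(42);  // fixed seed for a reproducible example
  std::printf("pivot = %d\n", MedianOfThreePivot(v, 0, 5, &rng));
  return 0;
}
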
-Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3) {
-  TupleRow* tr1 = reinterpret_cast<TupleRow*>(&t1);
-  TupleRow* tr2 = reinterpret_cast<TupleRow*>(&t2);
-  TupleRow* tr3 = reinterpret_cast<TupleRow*>(&t3);
-
-  bool t1_lt_t2 = Less(tr1, tr2);
-  bool t2_lt_t3 = Less(tr2, tr3);
-  bool t1_lt_t3 = Less(tr1, tr3);
-
-  if (t1_lt_t2) {
-    // t1 < t2
-    if (t2_lt_t3) {
-      // t1 < t2 < t3
-      return t2;
-    } else if (t1_lt_t3) {
-      // t1 < t3 <= t2
-      return t3;
-    } else {
-      // t3 <= t1 < t2
-      return t1;
-    }
-  } else {
-    // t2 <= t1
-    if (t1_lt_t3) {
-      // t2 <= t1 < t3
-      return t1;
-    } else if (t2_lt_t3) {
-      // t2 < t3 <= t1
-      return t3;
-    } else {
-      // t3 <= t2 <= t1
-      return t2;
-    }
-  }
-}
-
-inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tuple,
-    int tuple_size) {
-  memcpy(swap_tuple, left, tuple_size);
-  memcpy(left, right, tuple_size);
-  memcpy(right, swap_tuple, tuple_size);
-}
-
 Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
       const std::vector<bool>& is_asc_order, const std::vector<bool>& nulls_first,
     const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
@@ -1440,7 +766,7 @@ Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
     expr_perm_pool_(mem_tracker),
     expr_results_pool_(mem_tracker),
     compare_less_than_(ordering_exprs, is_asc_order, nulls_first),
-    in_mem_tuple_sorter_(NULL),
+    in_mem_tuple_sorter_(nullptr),
     buffer_pool_client_(buffer_pool_client),
     page_len_(page_len),
     has_var_len_slots_(false),
@@ -1448,24 +774,24 @@ Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
     mem_tracker_(mem_tracker),
     output_row_desc_(output_row_desc),
     enable_spilling_(enable_spilling),
-    unsorted_run_(NULL),
-    merge_output_run_(NULL),
+    unsorted_run_(nullptr),
+    merge_output_run_(nullptr),
     profile_(profile),
-    initial_runs_counter_(NULL),
-    num_merges_counter_(NULL),
-    in_mem_sort_timer_(NULL),
-    sorted_data_size_(NULL),
-    run_sizes_(NULL) {}
+    initial_runs_counter_(nullptr),
+    num_merges_counter_(nullptr),
+    in_mem_sort_timer_(nullptr),
+    sorted_data_size_(nullptr),
+    run_sizes_(nullptr) {}
 
 Sorter::~Sorter() {
   DCHECK(sorted_runs_.empty());
   DCHECK(merging_runs_.empty());
-  DCHECK(unsorted_run_ == NULL);
-  DCHECK(merge_output_run_ == NULL);
+  DCHECK(unsorted_run_ == nullptr);
+  DCHECK(merge_output_run_ == nullptr);
 }
 
 Status Sorter::Prepare(ObjectPool* obj_pool) {
-  DCHECK(in_mem_tuple_sorter_ == NULL) << "Already prepared";
+  DCHECK(in_mem_tuple_sorter_ == nullptr) << "Already prepared";
   // Page byte offsets are packed into uint32_t values, which limits the supported
   // page size.
   if (page_len_ > numeric_limits<uint32_t>::max()) {
@@ -1482,8 +808,8 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
         PrettyPrinter::Print(state_->query_options().max_row_size, TUnit::BYTES));
   }
   has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
-  in_mem_tuple_sorter_.reset(new TupleSorter(this, compare_less_than_, page_len_,
-      sort_tuple_desc->byte_size(), state_));
+  in_mem_tuple_sorter_.reset(
+      new TupleSorter(this, compare_less_than_, sort_tuple_desc->byte_size(), state_));
 
   if (enable_spilling_) {
     initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
@@ -1506,8 +832,8 @@ Status Sorter::Codegen(RuntimeState* state) {
 }
 
 Status Sorter::Open() {
-  DCHECK(in_mem_tuple_sorter_ != NULL) << "Not prepared";
-  DCHECK(unsorted_run_ == NULL) << "Already open";
+  DCHECK(in_mem_tuple_sorter_ != nullptr) << "Not prepared";
+  DCHECK(unsorted_run_ == nullptr) << "Already open";
   RETURN_IF_ERROR(compare_less_than_.Open(&obj_pool_, state_, &expr_perm_pool_,
       &expr_results_pool_));
   TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
@@ -1527,8 +853,8 @@ int64_t Sorter::ComputeMinReservation() const {
 }
 
 Status Sorter::AddBatch(RowBatch* batch) {
-  DCHECK(unsorted_run_ != NULL);
-  DCHECK(batch != NULL);
+  DCHECK(unsorted_run_ != nullptr);
+  DCHECK(batch != nullptr);
   DCHECK(enable_spilling_);
   int num_processed = 0;
   int cur_batch_index = 0;
@@ -1594,7 +920,7 @@ Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
 }
 
 void Sorter::Reset() {
-  DCHECK(unsorted_run_ == NULL) << "Cannot Reset() before calling InputDone()";
+  DCHECK(unsorted_run_ == nullptr) << "Cannot Reset() before calling 
InputDone()";
   merger_.reset();
   // Free resources from the current runs.
   CleanupAllRuns();
@@ -1613,10 +939,10 @@ void Sorter::Close(RuntimeState* state) {
 void Sorter::CleanupAllRuns() {
   Run::CleanupRuns(&sorted_runs_);
   Run::CleanupRuns(&merging_runs_);
-  if (unsorted_run_ != NULL) unsorted_run_->CloseAllPages();
-  unsorted_run_ = NULL;
-  if (merge_output_run_ != NULL) merge_output_run_->CloseAllPages();
-  merge_output_run_ = NULL;
+  if (unsorted_run_ != nullptr) unsorted_run_->CloseAllPages();
+  unsorted_run_ = nullptr;
+  if (merge_output_run_ != nullptr) merge_output_run_->CloseAllPages();
+  merge_output_run_ = nullptr;
   run_pool_.Clear();
 }
 
@@ -1630,7 +956,7 @@ Status Sorter::SortCurrentInputRun() {
   sorted_runs_.push_back(unsorted_run_);
   sorted_data_size_->Add(unsorted_run_->TotalBytes());
   run_sizes_->UpdateCounter(unsorted_run_->num_tuples());
-  unsorted_run_ = NULL;
+  unsorted_run_ = nullptr;
 
   RETURN_IF_CANCELLED(state_);
   return Status::OK();
@@ -1714,7 +1040,7 @@ Status Sorter::MergeIntermediateRuns() {
   while (true) {
     int num_of_runs_to_merge = GetNumOfRunsForMerge();
 
-    DCHECK(merge_output_run_ == NULL) << "Should have finished previous merge.";
+    DCHECK(merge_output_run_ == nullptr) << "Should have finished previous merge.";
     RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge));
 
     // If CreateMerger() consumed all the sorted runs, we have set up the final merge.
@@ -1725,7 +1051,7 @@ Status Sorter::MergeIntermediateRuns() {
     RETURN_IF_ERROR(merge_output_run_->Init());
     RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_));
     sorted_runs_.push_back(merge_output_run_);
-    merge_output_run_ = NULL;
+    merge_output_run_ = nullptr;
   }
   return Status::OK();
 }
@@ -1750,7 +1076,7 @@ Status Sorter::CreateMerger(int num_runs) {
 
     // Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge
     // from this run.
-    merge_runs.push_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));
+    merge_runs.emplace_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));
     sorted_runs_.pop_front();
     merging_runs_.push_back(run);
   }
