IMPALA-6957: calc thread resource requirement in planner

This only factors in fragment execution threads. For example, it does
*not* try to account for the number of threads on the old Thrift RPC
code path if that is enabled.

This is loosely related to the old VCores estimate, but is different in
that it:
* Directly ties into the notion of required threads in
  ThreadResourceMgr.
* Is a strict upper bound on the number of such threads, rather than
  an estimate.

The computed value does not include "optional" threads. ThreadResourceMgr
in the backend bounds the number of "optional" threads per impalad, so
the number of execution threads on a backend is limited by

  sum(required threads per query) +
      CpuInfo::num_cores() * FLAGS_num_threads_per_core
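
As a rough illustration of that bound (a hedged sketch, not code from this
patch; MaxExecThreads, 'required_per_query', 'num_cores' and
'threads_per_core' are invented stand-ins for the planner's per-query
thread reservations, CpuInfo::num_cores() and FLAGS_num_threads_per_core):

  #include <cstdint>
  #include <numeric>
  #include <vector>

  // Worst-case execution threads on one impalad: "required" tokens are
  // always granted, while "optional" tokens are capped process-wide.
  int64_t MaxExecThreads(const std::vector<int64_t>& required_per_query,
      int num_cores, int threads_per_core) {
    int64_t required = std::accumulate(
        required_per_query.begin(), required_per_query.end(), int64_t{0});
    return required + int64_t{num_cores} * threads_per_core;
  }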

DCHECKs in the backend enforce that the calculation is correct. They
were actually hit in KuduScanNode, because races in thread management
could leave multiple "required" threads running. Now the first thread
in the multithreaded scans never exits, which means that it is always
safe for any of the other threads to exit early. This simplifies the
logic considerably.
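
A hedged sketch of the resulting exit rule (the loop shape, done() and
ProcessNextScanRange() are illustrative only; optional_exceeded() is the
real ThreadResourcePool check used in the diff below):

  void ScannerThreadLoop(bool first_thread, ThreadResourcePool* pool) {
    while (!done()) {
      // Only non-first threads consult the (racy) optional-token quota, so
      // the first "required" thread always survives to finish the scan and
      // any other thread can exit early without stranding work.
      if (!first_thread && pool->optional_exceeded()) break;
      ProcessNextScanRange();
    }
  }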

Testing:
Updated planner tests.

Ran core tests.

Change-Id: I982837ef883457fa4d2adc3bdbdc727353469140
Reviewed-on: http://gerrit.cloudera.org:8080/10256
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e12ee485
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e12ee485
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e12ee485

Branch: refs/heads/master
Commit: e12ee485cf4c77203b144c053ee167509cc39374
Parents: 6ca87e4
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Mon Apr 30 16:34:47 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Sat May 12 01:43:37 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node.cc                   |   45 +-
 be/src/exec/hdfs-scan-node.h                    |    5 +-
 be/src/exec/kudu-scan-node.cc                   |   33 +-
 be/src/exec/kudu-scan-node.h                    |    5 +-
 be/src/runtime/coordinator-backend-state.cc     |   11 +-
 be/src/runtime/query-state.cc                   |    4 +-
 be/src/runtime/runtime-state.cc                 |    4 +
 be/src/runtime/thread-resource-mgr.cc           |   12 +-
 be/src/runtime/thread-resource-mgr.h            |   31 +-
 be/src/scheduling/admission-controller.cc       |   44 +-
 be/src/scheduling/query-schedule.h              |    4 +-
 be/src/scheduling/scheduler.cc                  |   16 +-
 common/thrift/Frontend.thrift                   |    9 +-
 common/thrift/ImpalaInternalService.thrift      |    6 +-
 common/thrift/Planner.thrift                    |   12 +-
 .../apache/impala/planner/AggregationNode.java  |   10 +-
 .../apache/impala/planner/AnalyticEvalNode.java |    4 +-
 .../org/apache/impala/planner/HashJoinNode.java |    4 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   48 +-
 .../org/apache/impala/planner/KuduScanNode.java |    5 +-
 .../org/apache/impala/planner/PlanFragment.java |   44 +-
 .../java/org/apache/impala/planner/Planner.java |   30 +-
 .../apache/impala/planner/ResourceProfile.java  |   71 +-
 .../impala/planner/ResourceProfileBuilder.java  |   25 +-
 .../org/apache/impala/planner/SortNode.java     |    8 +-
 .../queries/PlannerTest/constant-folding.test   |  116 +-
 .../queries/PlannerTest/disable-codegen.test    |   20 +-
 .../PlannerTest/fk-pk-join-detection.test       |  142 +-
 .../queries/PlannerTest/kudu-selectivity.test   |  102 +-
 .../queries/PlannerTest/max-row-size.test       |  152 +-
 .../PlannerTest/min-max-runtime-filters.test    |   52 +-
 .../queries/PlannerTest/mt-dop-validation.test  |  146 +-
 .../queries/PlannerTest/parquet-filtering.test  |  150 +-
 .../queries/PlannerTest/partition-pruning.test  |    6 +-
 .../PlannerTest/resource-requirements.test      | 1904 +++++++++---------
 .../PlannerTest/sort-expr-materialization.test  |   78 +-
 .../PlannerTest/spillable-buffer-sizing.test    |  404 ++--
 .../queries/PlannerTest/tablesample.test        |   70 +-
 .../queries/PlannerTest/union.test              |   28 +-
 .../queries/QueryTest/explain-level0.test       |    2 +-
 .../queries/QueryTest/explain-level1.test       |    2 +-
 .../queries/QueryTest/explain-level2.test       |   20 +-
 .../queries/QueryTest/explain-level3.test       |   24 +-
 43 files changed, 1977 insertions(+), 1931 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 3f5baab..e434731 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -338,25 +338,11 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
   while (!done_) {
     // Prevent memory accumulating across scan ranges.
     expr_results_pool.Clear();
-    {
-      // Check if we have enough resources (thread token and memory) to keep using
-      // this thread.
-      unique_lock<mutex> l(lock_);
-      if (active_scanner_thread_counter_.value() > 1) {
-        if (runtime_state_->resource_pool()->optional_exceeded()) {
-          // We can't break here. We need to update the counter with the lock held or else
-          // all threads might see active_scanner_thread_counter_.value > 1
-          COUNTER_ADD(&active_scanner_thread_counter_, -1);
-          // Unlock before releasing the thread token to avoid deadlock in
-          // ThreadTokenAvailableCb().
-          l.unlock();
-          goto exit;
-        }
-      } else {
-        // If this is the only scanner thread, it should keep running regardless
-        // of resource constraints.
-      }
-    }
+    // Check if we have enough thread tokens to keep using this optional thread. This
+    // check is racy: multiple threads may notice that the optional tokens are exceeded
+    // and shut themselves down. If we shut down too many and there are more optional
+    // tokens, ThreadAvailableCb() will be invoked again.
+    if (!first_thread && runtime_state_->resource_pool()->optional_exceeded()) break;
 
     bool unused = false;
     // Wake up every SCANNER_THREAD_COUNTERS to yield scanner threads back if unused, or
@@ -367,9 +353,6 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     // We don't want num_unqueued_files_ to go to zero between the return from
     // StartNextScanRange() and the check for when all ranges are complete.
     int num_unqueued_files = num_unqueued_files_.Load();
-    // TODO: the Load() acts as an acquire barrier.  Is this needed? (i.e. any earlier
-    // stores that need to complete?)
-    AtomicUtil::MemoryBarrier();
     ScanRange* scan_range;
     Status status = StartNextScanRange(&scanner_thread_reservation, &scan_range);
     if (status.ok() && scan_range != nullptr) {
@@ -401,9 +384,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       break;
     }
 
-    if (scan_range == NULL && num_unqueued_files == 0) {
-      // TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not
-      // needed to acquire the lock in x86.
+    if (scan_range == nullptr && num_unqueued_files == 0) {
       unique_lock<mutex> l(lock_);
       // All ranges have been queued and DiskIoMgr has no more new ranges for this scan
       // node to process. This means that every range is either done or being processed by
@@ -412,30 +393,28 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       break;
     }
   }
-  COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
-exit:
   {
     unique_lock<mutex> l(lock_);
     ReturnReservationFromScannerThread(l, scanner_thread_reservation);
   }
-  runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
   expr_results_pool.FreeAll();
+  runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
+  COUNTER_ADD(&active_scanner_thread_counter_, -1);
 }
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     MemPool* expr_results_pool, ScanRange* scan_range,
     int64_t* scanner_thread_reservation) {
-  DCHECK(scan_range != NULL);
-
+  DCHECK(scan_range != nullptr);
   ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
   int64_t partition_id = metadata->partition_id;
   HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
-  DCHECK(partition != NULL) << "table_id=" << hdfs_table_->id()
-                            << " partition_id=" << partition_id
-                            << "\n" << PrintThrift(runtime_state_->instance_ctx());
+  DCHECK(partition != nullptr) << "table_id=" << hdfs_table_->id()
+                               << " partition_id=" << partition_id
+                               << "\n" << PrintThrift(runtime_state_->instance_ctx());
 
   if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
     // Avoid leaking unread buffers in scan_range.

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 48684bd..53d5c71 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -170,7 +170,10 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// processed from the IoMgr and then processes the entire range end to end.
   /// This thread terminates when all scan ranges are complete or an error occurred.
   /// 'first_thread' is true if this was the first scanner thread to start and
-  /// it acquired a "required" thread token from ThreadResourceMgr.
+  /// it acquired a "required" thread token from ThreadResourceMgr. The first thread
+  /// will continue running until 'done_' is true or an error is encountered. Other
+  /// threads may terminate early if the optional tokens in
+  /// runtime_state_->resource_pool() are exceeded.
   /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for
   /// this thread. Before returning, this function releases the reservation with
   /// ReturnReservationFromScannerThread().

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 16e0633..9ae5710 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -143,7 +143,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
     unique_lock<mutex> lock(lock_);
     // All done or all tokens are assigned.
     if (done_ || !HasScanToken()) break;
-    bool first_thread = active_scanner_thread_counter_.value() == 0;
+    bool first_thread = num_active_scanners_ == 0;
 
     // Check if we can get a token. We need at least one thread to run.
     if (first_thread) {
@@ -211,12 +211,9 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_t
 
 void KuduScanNode::RunScannerThread(
     bool first_thread, const string& name, const string* initial_token) {
-  DCHECK(initial_token != NULL);
+  DCHECK(initial_token != nullptr);
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
-  // Set to true if this thread observes that the number of optional threads has been
-  // exceeded and is exiting early.
-  bool optional_thread_exiting = false;
   KuduScanner scanner(this, runtime_state_);
 
   const string* scan_token = initial_token;
@@ -224,21 +221,16 @@ void KuduScanNode::RunScannerThread(
   if (status.ok()) {
     // Here, even though a read of 'done_' may conflict with a write to it,
     // ProcessScanToken() will return early, as will GetNextScanToken().
-    while (!done_ && scan_token != NULL) {
+    while (!done_ && scan_token != nullptr) {
       status = ProcessScanToken(&scanner, *scan_token);
       if (!status.ok()) break;
 
-      // Check if the number of optional threads has been exceeded.
-      if (runtime_state_->resource_pool()->optional_exceeded()) {
-        unique_lock<mutex> l(lock_);
-        // Don't exit if this is the last thread. Otherwise, the scan will indicate it's
-        // done before all scan tokens have been processed.
-        if (num_active_scanners_ > 1) {
-          --num_active_scanners_;
-          optional_thread_exiting = true;
-          break;
-        }
-      }
+      // Check if we have enough thread tokens to keep using this optional thread. This
+      // check is racy: multiple threads may notice that the optional tokens are exceeded
+      // and shut themselves down. If we shut down too many and there are more optional
+      // tokens, ThreadAvailableCb() will be invoked again.
+      if (!first_thread && runtime_state_->resource_pool()->optional_exceeded()) break;
+
       unique_lock<mutex> l(lock_);
       if (!done_) {
         scan_token = GetNextScanToken();
@@ -255,12 +247,7 @@ void KuduScanNode::RunScannerThread(
       status_ = status;
       SetDoneInternal();
     }
-    // Decrement num_active_scanners_ unless handling the case of an early exit when
-    // optional threads have been exceeded, in which case it already was decremented.
-    if (!optional_thread_exiting) --num_active_scanners_;
-    if (num_active_scanners_ == 0) {
-      SetDoneInternal();
-    }
+    if (--num_active_scanners_ == 0) SetDoneInternal();
   }
 
   // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 2f8d808..651c0f5 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -97,7 +97,10 @@ class KuduScanNode : public KuduScanNodeBase {
   /// until there are none left, an error occurs, or the limit is reached. The caller must
   /// have acquired a thread token from the ThreadResourceMgr for this thread. The token
   /// is released before this function returns. 'first_thread' is true if this was the
-  /// first scanner thread to start and it acquired a "required" thread token.
+  /// first scanner thread to start and it acquired a "required" thread token. The first
+  /// thread will continue running until 'done_' is true or an error is encountered. Other
+  /// threads may terminate early if the optional tokens in
+  /// runtime_state_->resource_pool() are exceeded.
   void RunScannerThread(
       bool first_thread, const std::string& name, const std::string* initial_token);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 3099412..b9b2f15 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -75,14 +75,15 @@ void Coordinator::BackendState::Init(
   }
 }
 
-void Coordinator::BackendState::SetRpcParams(
-    const DebugOptions& debug_options, const FilterRoutingTable& filter_routing_table,
+void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
+    const FilterRoutingTable& filter_routing_table,
     TExecQueryFInstancesParams* rpc_params) {
   rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
   rpc_params->__set_coord_state_idx(state_idx_);
-  rpc_params->__set_min_reservation_bytes(backend_exec_params_->min_reservation_bytes);
-  rpc_params->__set_initial_reservation_total_claims(
-      backend_exec_params_->initial_reservation_total_claims);
+  rpc_params->__set_min_mem_reservation_bytes(
+      backend_exec_params_->min_mem_reservation_bytes);
+  rpc_params->__set_initial_mem_reservation_total_claims(
+      backend_exec_params_->initial_mem_reservation_total_claims);
 
   // set fragment_ctxs and fragment_instance_ctxs
   rpc_params->__isset.fragment_ctxs = true;

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 22616ed..a06b8e4 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -142,9 +142,9 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   // to handle releasing it if a later step fails.
   initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
       buffer_reservation_, query_mem_tracker_,
-      rpc_params.initial_reservation_total_claims));
+      rpc_params.initial_mem_reservation_total_claims));
   RETURN_IF_ERROR(
-      initial_reservations_->Init(query_id(), rpc_params.min_reservation_bytes));
+      initial_reservations_->Init(query_id(), rpc_params.min_mem_reservation_bytes));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index c8776ac..15b3354 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -109,6 +109,10 @@ void RuntimeState::Init() {
   // Register with the thread mgr
   resource_pool_ = exec_env_->thread_mgr()->CreatePool();
   DCHECK(resource_pool_ != nullptr);
+  if (fragment_ctx_ != nullptr) {
+    // Ensure that the planner correctly determined the required threads.
+    resource_pool_->set_max_required_threads(fragment_ctx_->fragment.thread_reservation);
+  }
 
   total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads");
   total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/runtime/thread-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.cc b/be/src/runtime/thread-resource-mgr.cc
index 86ea794..a328d5e 100644
--- a/be/src/runtime/thread-resource-mgr.cc
+++ b/be/src/runtime/thread-resource-mgr.cc
@@ -125,10 +125,10 @@ void ThreadResourceMgr::UpdatePoolQuotas(ThreadResourcePool* new_pool) {
 bool ThreadResourcePool::TryAcquireThreadToken() {
   while (true) {
     int64_t previous_num_threads = num_threads_.Load();
-    int64_t new_optional_threads = (previous_num_threads >> 32) + 1;
-    int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
+    int64_t new_optional_threads = (previous_num_threads >> OPTIONAL_SHIFT) + 1;
+    int64_t new_required_threads = previous_num_threads & REQUIRED_MASK;
     if (new_optional_threads + new_required_threads > quota()) return false;
-    int64_t new_value = new_optional_threads << 32 | new_required_threads;
+    int64_t new_value = new_optional_threads << OPTIONAL_SHIFT | new_required_threads;
     // Atomically swap the new value if no one updated num_threads_.  We do not
     // care about the ABA problem here.
     if (num_threads_.CompareAndSwap(previous_num_threads, new_value)) return true;
@@ -144,9 +144,9 @@ void ThreadResourcePool::ReleaseThreadToken(
     DCHECK_GT(num_optional_threads(), 0);
     while (true) {
       int64_t previous_num_threads = num_threads_.Load();
-      int64_t new_optional_threads = (previous_num_threads >> 32) - 1;
-      int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
-      int64_t new_value = new_optional_threads << 32 | new_required_threads;
+      int64_t new_optional_threads = (previous_num_threads >> OPTIONAL_SHIFT) - 1;
+      int64_t new_required_threads = previous_num_threads & REQUIRED_MASK;
+      int64_t new_value = new_optional_threads << OPTIONAL_SHIFT | new_required_threads;
       if (num_threads_.CompareAndSwap(previous_num_threads, new_value)) break;
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h
index e6c73a6..7e63ae1 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -122,7 +122,11 @@ class ThreadResourcePool {
   /// Acquire a thread for the pool. This will always succeed; the pool will go over the
   /// quota if needed. Pools should use this API to reserve threads they need in order to
   /// make progress.
-  void AcquireThreadToken() { num_threads_.Add(1); }
+  void AcquireThreadToken() {
+    int64_t num_threads = num_threads_.Add(1);
+    int64_t num_required = num_threads & REQUIRED_MASK;
+    DCHECK_LE(num_required, max_required_threads_);
+  }
 
   /// Try to acquire a thread for this pool. If the pool is at the quota, this will
   /// return false and the pool should not run. Pools should use this API for resources
@@ -149,11 +153,11 @@ class ThreadResourcePool {
   void RemoveThreadAvailableCb(int id);
 
   /// Returns the number of threads that are from AcquireThreadToken.
-  int num_required_threads() const { return num_threads_.Load() & 0xFFFFFFFF; }
+  int num_required_threads() const { return num_threads_.Load() & REQUIRED_MASK; }
 
   /// Returns the number of thread resources returned by successful calls
   /// to TryAcquireThreadToken.
-  int num_optional_threads() const { return num_threads_.Load() >> 32; }
+  int num_optional_threads() const { return num_threads_.Load() >> OPTIONAL_SHIFT; }
 
   /// Returns the total number of thread resources for this pool
   /// (i.e. num_optional_threads + num_required_threads).
@@ -165,8 +169,8 @@ class ThreadResourcePool {
   bool optional_exceeded() {
     // Cache this so optional/required are computed based on the same value.
     int64_t num_threads = num_threads_.Load();
-    int64_t optional_threads = num_threads >> 32;
-    int64_t required_threads = num_threads & 0xFFFFFFFF;
+    int64_t optional_threads = num_threads >> OPTIONAL_SHIFT;
+    int64_t required_threads = num_threads & REQUIRED_MASK;
     return optional_threads + required_threads > quota();
   }
 
@@ -179,9 +183,22 @@ class ThreadResourcePool {
   /// number of registered resource pools.
   int quota() const { return parent_->per_pool_quota_.Load(); }
 
+  /// Set the maximum number of required threads that will be running at one time.
+  /// The caller should not create more required threads than this, otherwise this
+  /// will DCHECK. Not thread-safe.
+  void set_max_required_threads(int max_required_threads) {
+    max_required_threads_ = max_required_threads;
+  }
+
  private:
   friend class ThreadResourceMgr;
 
+  /// Mask to extract required threads from 'num_threads_'.
+  static constexpr int64_t REQUIRED_MASK = 0xFFFFFFFF;
+
+  /// Shift to extract optional threads from 'num_threads_'.
+  static constexpr int OPTIONAL_SHIFT = 32;
+
   ThreadResourcePool(ThreadResourceMgr* parent);
 
   /// Invoke registered callbacks in round-robin manner until the quota is exhausted.
@@ -190,6 +207,10 @@ class ThreadResourcePool {
   /// The parent resource manager. Set to NULL when unregistered.
   ThreadResourceMgr* parent_;
 
+  /// Maximum number of required threads that should be running at one time. DCHECKs
+  /// if this is exceeded.
+  int64_t max_required_threads_ = std::numeric_limits<int32_t>::max();
+
   /// A single 64 bit value to store both the number of optional and required threads.
   /// This is combined to allow atomic compare-and-swap of both fields. The number of
   /// required threads is the lower 32 bits and the number of optional threads is the
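
(For illustration: a minimal standalone sketch of this packing scheme using
std::atomic, as a hedged approximation; the real code uses Impala's
AtomicInt64 and the REQUIRED_MASK/OPTIONAL_SHIFT constants above.)

  #include <atomic>
  #include <cstdint>

  static constexpr int64_t kRequiredMask = 0xFFFFFFFF;
  static constexpr int kOptionalShift = 32;
  std::atomic<int64_t> num_threads{0};

  // Optional acquisition fails once required + optional would exceed the
  // quota; a single CAS updates both packed counters atomically.
  bool TryAcquireOptional(int64_t quota) {
    int64_t prev = num_threads.load();
    while (true) {
      int64_t optional = (prev >> kOptionalShift) + 1;
      int64_t required = prev & kRequiredMask;
      if (optional + required > quota) return false;
      int64_t next = (optional << kOptionalShift) | required;
      // On failure compare_exchange_weak reloads 'prev', so just retry.
      if (num_threads.compare_exchange_weak(prev, next)) return true;
    }
  }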

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 48f5e3b..eef18ef 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -403,35 +403,35 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule,
   // the checks isn't particularly important, though some thought was given to ordering
   // them in a way that might make the sense for a user.
 
-  // Compute the max (over all backends) min_reservation_bytes and the cluster total
-  // (across all backends) min_reservation_bytes.
-  int64_t max_min_reservation_bytes = -1;
-  int64_t cluster_min_reservation_bytes = 0;
-  for (const auto& e: schedule->per_backend_exec_params()) {
-    cluster_min_reservation_bytes += e.second.min_reservation_bytes;
-    if (e.second.min_reservation_bytes > max_min_reservation_bytes) {
-      max_min_reservation_bytes = e.second.min_reservation_bytes;
+  // Compute the max (over all backends) min_mem_reservation_bytes and the cluster total
+  // (across all backends) min_mem_reservation_bytes.
+  int64_t max_min_mem_reservation_bytes = -1;
+  int64_t cluster_min_mem_reservation_bytes = 0;
+  for (const auto& e : schedule->per_backend_exec_params()) {
+    cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
+    if (e.second.min_mem_reservation_bytes > max_min_mem_reservation_bytes) {
+      max_min_mem_reservation_bytes = e.second.min_mem_reservation_bytes;
     }
   }
 
   // Checks related to the min buffer reservation against configured query memory limits:
-  if (schedule->query_options().__isset.buffer_pool_limit &&
-      schedule->query_options().buffer_pool_limit > 0) {
-    if (max_min_reservation_bytes > schedule->query_options().buffer_pool_limit) {
+  if (schedule->query_options().__isset.buffer_pool_limit
+      && schedule->query_options().buffer_pool_limit > 0) {
+    if (max_min_mem_reservation_bytes > schedule->query_options().buffer_pool_limit) {
       *rejection_reason = Substitute(REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION,
-          PrintBytes(max_min_reservation_bytes));
+          PrintBytes(max_min_mem_reservation_bytes));
       return true;
     }
-  } else if (schedule->query_options().__isset.mem_limit &&
-      schedule->query_options().mem_limit > 0) {
+  } else if (schedule->query_options().__isset.mem_limit
+      && schedule->query_options().mem_limit > 0) {
     const int64_t mem_limit = schedule->query_options().mem_limit;
     const int64_t max_reservation =
         ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
-    if (max_min_reservation_bytes > max_reservation) {
+    if (max_min_mem_reservation_bytes > max_reservation) {
       const int64_t required_mem_limit =
-          ReservationUtil::GetMinMemLimitFromReservation(max_min_reservation_bytes);
+          ReservationUtil::GetMinMemLimitFromReservation(max_min_mem_reservation_bytes);
       *rejection_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
-          PrintBytes(max_min_reservation_bytes), PrintBytes(required_mem_limit));
+          PrintBytes(max_min_mem_reservation_bytes), PrintBytes(required_mem_limit));
       return true;
     }
   }
@@ -447,15 +447,15 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule,
     *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
     return true;
   }
-  if (pool_cfg.max_mem_resources > 0 &&
-      cluster_min_reservation_bytes > pool_cfg.max_mem_resources) {
+  if (pool_cfg.max_mem_resources > 0
+      && cluster_min_mem_reservation_bytes > pool_cfg.max_mem_resources) {
     *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM,
         PrintBytes(pool_cfg.max_mem_resources),
-        PrintBytes(cluster_min_reservation_bytes));
+        PrintBytes(cluster_min_mem_reservation_bytes));
     return true;
   }
-  if (pool_cfg.max_mem_resources > 0 &&
-      schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
+  if (pool_cfg.max_mem_resources > 0
+      && schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
     *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM,
         PrintBytes(schedule->GetClusterMemoryEstimate()),
         PrintBytes(pool_cfg.max_mem_resources));

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index be219a8..fa200c6 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -58,13 +58,13 @@ struct BackendExecParams {
   // concurrently-executing operators at any point in query execution. It may be less
   // than the initial reservation total claims (below) if execution of some operators
   // never overlaps, which allows reuse of reservations.
-  int64_t min_reservation_bytes;
+  int64_t min_mem_reservation_bytes;
 
   // Total of the initial buffer reservations that we expect to be claimed on this
   // backend for all fragment instances in instance_params. I.e. the sum over all
   // operators in all fragment instances that execute on this backend. This is used for
   // an optimization in InitialReservation. Measured in bytes.
-  int64_t initial_reservation_total_claims;
+  int64_t initial_mem_reservation_total_claims;
 };
 
 /// map from an impalad host address to the list of assigned fragment instance params.

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index d8aa88b..5721221 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -714,21 +714,21 @@ void Scheduler::ComputeBackendExecParams(QuerySchedule* schedule) {
       // instances on this backend can consume their peak resources at the same time,
       // i.e. that this backend's peak resources is the sum of the per-fragment-instance
       // peak resources for the instances executing on this backend.
-      be_params.min_reservation_bytes += f.fragment.min_reservation_bytes;
-      be_params.initial_reservation_total_claims +=
-          f.fragment.initial_reservation_total_claims;
+      be_params.min_mem_reservation_bytes += f.fragment.min_mem_reservation_bytes;
+      be_params.initial_mem_reservation_total_claims +=
+          f.fragment.initial_mem_reservation_total_claims;
     }
   }
   schedule->set_per_backend_exec_params(per_backend_params);
 
-  stringstream min_reservation_ss;
+  stringstream min_mem_reservation_ss;
   for (const auto& e: per_backend_params) {
-    min_reservation_ss << TNetworkAddressToString(e.first) << "("
-         << PrettyPrinter::Print(e.second.min_reservation_bytes, TUnit::BYTES)
+    min_mem_reservation_ss << TNetworkAddressToString(e.first) << "("
+         << PrettyPrinter::Print(e.second.min_mem_reservation_bytes, TUnit::BYTES)
          << ") ";
   }
-  schedule->summary_profile()->AddInfoString("Per Host Min Reservation",
-      min_reservation_ss.str());
+  schedule->summary_profile()->AddInfoString("Per Host Min Memory Reservation",
+      min_mem_reservation_ss.str());
 }
 
 Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& executor_config,

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 8ba57e5..66b1af2 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -408,8 +408,13 @@ struct TQueryExecRequest {
   9: optional i64 per_host_mem_estimate
 
   // Maximum possible (in the case all fragments are scheduled on all hosts with
-  // max DOP) minimum reservation required per host, in bytes.
-  10: optional i64 max_per_host_min_reservation;
+  // max DOP) minimum memory reservation required per host, in bytes.
+  10: optional i64 max_per_host_min_mem_reservation;
+
+  // Maximum possible (in the case all fragments are scheduled on all hosts with
+  // max DOP) required threads per host, i.e. the number of threads that this query
+  // needs to execute successfully. Does not include "optional" threads.
+  11: optional i64 max_per_host_thread_reservation;
 }
 
 enum TCatalogOpType {

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index b394443..dad46f7 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -502,19 +502,19 @@ struct TExecQueryFInstancesParams {
   // required in V1
   5: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
 
-  // The minimum query-wide buffer reservation size (in bytes) required for the backend
+  // The minimum query-wide memory reservation (in bytes) required for the backend
   // executing the instances in fragment_instance_ctxs. This is the peak minimum
   // reservation that may be required by the concurrently-executing operators at any
   // point in query execution. It may be less than the initial reservation total claims
   // (below) if execution of some operators never overlaps, which allows reuse of
   // reservations. required in V1
-  6: optional i64 min_reservation_bytes
+  6: optional i64 min_mem_reservation_bytes
 
   // Total of the initial buffer reservations that we expect to be claimed on this
   // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
   // operators in all fragment instances that execute on this backend. This is used for
   // an optimization in InitialReservation. Measured in bytes. required in V1
-  7: optional i64 initial_reservation_total_claims
+  7: optional i64 initial_mem_reservation_total_claims
 }
 
 struct TExecQueryFInstancesResult {

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 74256c3..9d2fc9d 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -61,19 +61,23 @@ struct TPlanFragment {
   // output, which is specified by output_sink.output_partitioning.
   6: required Partitions.TDataPartition partition
 
-  // The minimum reservation size (in bytes) required for an instance of this plan
+  // The minimum memory reservation (in bytes) required for an instance of this plan
   // fragment to execute on a single host.
-  7: optional i64 min_reservation_bytes
+  7: optional i64 min_mem_reservation_bytes
 
-  // Total of the initial buffer reservations that we expect to be claimed by this
+  // Total of the initial memory reservations that we expect to be claimed by this
   // fragment. I.e. the sum of the min reservations over all operators (including the
   // sink) in a single instance of this fragment. This is used for an optimization in
   // InitialReservation. Measured in bytes. required in V1
-  8: optional i64 initial_reservation_total_claims
+  8: optional i64 initial_mem_reservation_total_claims
 
   // The total memory (in bytes) required for the runtime filters used by the plan nodes
   // managed by this fragment. Is included in min_reservation_bytes.
   9: optional i64 runtime_filters_reservation_bytes
+
+  // Maximum number of required threads that will be executing concurrently for this plan
+  // fragment, i.e. the number of threads that this query needs to execute successfully.
+  10: optional i64 thread_reservation
 }
 
 // location information for a single scan range

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index c31f448..03010b8 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -300,12 +300,12 @@ public class AggregationNode extends PlanNode {
     }
 
     // Must be kept in sync with PartitionedAggregationNode::MinReservation() in be.
-    long perInstanceMinReservation;
+    long perInstanceMinMemReservation;
     long bufferSize = queryOptions.getDefault_spillable_buffer_size();
     long maxRowBufferSize =
         computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
     if (aggInfo_.getGroupingExprs().isEmpty()) {
-      perInstanceMinReservation = 0;
+      perInstanceMinMemReservation = 0;
     } else {
       // This is a grouping pre-aggregation or merge aggregation.
       final int PARTITION_FANOUT = 16;
@@ -327,19 +327,19 @@ public class AggregationNode extends PlanNode {
         // and at least 64kb for hash tables per partition. We must reserve at least one
         // full buffer for hash tables for the suballocator to subdivide. We don't need to
         // reserve memory for large rows since they can be passed through if needed.
-        perInstanceMinReservation = bufferSize * PARTITION_FANOUT +
+        perInstanceMinMemReservation = bufferSize * PARTITION_FANOUT +
             Math.max(64 * 1024 * PARTITION_FANOUT, bufferSize);
       } else {
         long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0);
         // Two of the buffers need to be buffers large enough to hold the maximum-sized
         // row to serve as input and output buffers while repartitioning.
-        perInstanceMinReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
+        perInstanceMinMemReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
       }
     }
 
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
-        .setMinReservationBytes(perInstanceMinReservation)
+        .setMinMemReservationBytes(perInstanceMinMemReservation)
         .setSpillableBufferBytes(bufferSize)
         .setMaxRowBufferBytes(maxRowBufferSize).build();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index 1ce4884..5ca666c 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -253,10 +253,10 @@ public class AnalyticEvalNode extends PlanNode {
         queryOptions.getDefault_spillable_buffer_size(), queryOptions.getMax_row_size());
 
     // Must be kept in sync with MIN_REQUIRED_BUFFERS in AnalyticEvalNode in be.
-    long perInstanceMinBufferBytes = 2 * bufferSize;
+    long perInstanceMinMemReservation = 2 * bufferSize;
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
-        .setMinReservationBytes(perInstanceMinBufferBytes)
+        .setMinMemReservationBytes(perInstanceMinMemReservation)
         .setSpillableBufferBytes(bufferSize).setMaxRowBufferBytes(bufferSize).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 2730cfc..3c90fa8 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -243,11 +243,11 @@ public class HashJoinNode extends JoinNode {
     // to serve as input and output buffers while repartitioning.
     long maxRowBufferSize =
         computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
-    long perInstanceMinBufferBytes =
+    long perInstanceMinMemReservation =
         bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
-        .setMinReservationBytes(perInstanceMinBufferBytes)
+        .setMinMemReservationBytes(perInstanceMinMemReservation)
         .setSpillableBufferBytes(bufferSize)
         .setMaxRowBufferBytes(maxRowBufferSize).build();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 398393b..dbaa965 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1261,7 +1261,7 @@ public class HdfsScanNode extends ScanNode {
     List<Long> columnReservations = null;
     if (fileFormats_.contains(HdfsFileFormat.PARQUET)
         || fileFormats_.contains(HdfsFileFormat.ORC)) {
-      columnReservations = computeMinColumnReservations();
+      columnReservations = computeMinColumnMemReservations();
     }
 
     int perHostScanRanges;
@@ -1277,6 +1277,8 @@ public class HdfsScanNode extends ScanNode {
           (double) scanRanges_.size() / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
     }
 
+    // The non-MT scan node requires at least one scanner thread.
+    int requiredThreads = useMtScanNode_ ? 0 : 1;
     int maxScannerThreads;
     if (queryOptions.getMt_dop() >= 1) {
       maxScannerThreads = 1;
@@ -1313,15 +1315,16 @@ public class HdfsScanNode extends ScanNode {
 
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
-        .setMinReservationBytes(computeMinReservation(columnReservations)).build();
+        .setMinMemReservationBytes(computeMinMemReservation(columnReservations))
+        .setThreadReservation(requiredThreads).build();
   }
 
   /**
-   *  Compute the minimum reservation to process a single scan range (i.e. hdfs split).
-   *  We aim to choose a reservation that is as low as possible while still giving OK
-   *  performance when running with only the minimum reservation. The lower bound is one
-   *  minimum-sized buffer per IoMgr scan range - the absolute minimum required to scan
-   *  the data. The upper bounds are:
+   *  Compute the minimum memory reservation to process a single scan range
+   *  (i.e. hdfs split). We aim to choose a reservation that is as low as possible while
+   *  still giving OK performance when running with only the minimum reservation. The
+   *  lower bound is one minimum-sized buffer per IoMgr scan range - the absolute minimum
+   *  required to scan the data. The upper bounds are:
    * - One max-sized I/O buffer per IoMgr scan range. One max-sized I/O buffer avoids
    *   issuing small I/O unnecessarily. The backend can try to increase the reservation
    *   further if more memory would speed up processing.
@@ -1330,7 +1333,7 @@ public class HdfsScanNode extends ScanNode {
    * - The hdfs split size, to avoid reserving excessive memory for small files or ranges,
    *   e.g. small dimension tables with very few rows.
    */
-  private long computeMinReservation(List<Long> columnReservations) {
+  private long computeMinMemReservation(List<Long> columnReservations) {
     Preconditions.checkState(largestScanRangeBytes_ >= 0);
     long maxIoBufferSize =
         BitUtil.roundUpToPowerOf2(BackendConfig.INSTANCE.getReadSize());
@@ -1364,7 +1367,8 @@ public class HdfsScanNode extends ScanNode {
     //   the amount of buffers required to read it all at once.
     int iomgrScanRangesPerSplit = columnReservations != null ?
         Math.max(1, columnReservations.size()) : 1;
-    long maxReservationBytes = roundUpToIoBuffer(largestScanRangeBytes_, maxIoBufferSize);
+    long maxReservationBytes =
+        roundUpToIoBuffer(largestScanRangeBytes_, maxIoBufferSize);
     return Math.max(iomgrScanRangesPerSplit * BackendConfig.INSTANCE.getMinBufferSize(),
         Math.min(reservationBytes, maxReservationBytes));
   }
@@ -1380,7 +1384,7 @@ public class HdfsScanNode extends ScanNode {
    * logic for nested types in non-shredded columnar formats (e.g. IMPALA-6503 - ORC)
    * if/when that is added.
    */
-  private List<Long> computeMinColumnReservations() {
+  private List<Long> computeMinColumnMemReservations() {
     List<Long> columnByteSizes = Lists.newArrayList();
     HdfsTable table = (HdfsTable) desc_.getTable();
     boolean havePosSlot = false;
@@ -1399,10 +1403,10 @@ public class HdfsScanNode extends ScanNode {
             // collections.
             columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
           } else {
-            columnByteSizes.add(computeMinScalarColumnReservation(column));
+            columnByteSizes.add(computeMinScalarColumnMemReservation(column));
           }
         } else {
-          appendMinColumnReservationsForCollection(slot, columnByteSizes);
+          appendMinColumnMemReservationsForCollection(slot, columnByteSizes);
         }
       }
     }
@@ -1415,12 +1419,12 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Helper for computeMinColumnReservations() - compute minimum memory reservations for
-   * all of the scalar columns read from disk when materializing collectionSlot. Appends
-   * one number per scalar column to columnByteSizes.
+   * Helper for computeMinColumnMemReservations() - compute minimum memory reservations
+   * for all of the scalar columns read from disk when materializing collectionSlot.
+   * Appends one number per scalar column to columnMemReservations.
    */
-  private void appendMinColumnReservationsForCollection(SlotDescriptor collectionSlot,
-      List<Long> columnByteSizes) {
+  private void appendMinColumnMemReservationsForCollection(SlotDescriptor collectionSlot,
+      List<Long> columnMemReservations) {
     Preconditions.checkState(collectionSlot.getType().isCollectionType());
     boolean addedColumn = false;
     for (SlotDescriptor nestedSlot: collectionSlot.getItemTupleDesc().getSlots()) {
@@ -1429,16 +1433,16 @@ public class HdfsScanNode extends ScanNode {
       if (nestedSlot.getType().isScalarType()) {
         // No column stats are available for nested collections so use the default
         // reservation.
-        columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+        columnMemReservations.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
         addedColumn = true;
       } else {
-        appendMinColumnReservationsForCollection(nestedSlot, columnByteSizes);
+        appendMinColumnMemReservationsForCollection(nestedSlot, columnMemReservations);
       }
     }
     // Need to scan at least one column to materialize the pos virtual slot and/or
     // determine the size of the nested array. Assume it is the size of a single I/O
     // buffer.
-    if (!addedColumn) columnByteSizes.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
+    if (!addedColumn) columnMemReservations.add(DEFAULT_COLUMN_SCAN_RANGE_RESERVATION);
   }
 
   /**
@@ -1449,12 +1453,12 @@ public class HdfsScanNode extends ScanNode {
    * infer that the column data is smaller than this starting value (and therefore the
    * extra memory would not be useful). These estimates are quite conservative so this
    * will still often overestimate the column size. An overestimate does not necessarily
-   * result in memory being wasted becase the Parquet scanner distributes the total
+   * result in memory being wasted because the Parquet scanner distributes the total
    * reservation between columns based on actual column size, so if multiple columns are
    * scanned, memory over-reserved for one column can be used to help scan a different
    * larger column.
    */
-  private long computeMinScalarColumnReservation(Column column) {
+  private long computeMinScalarColumnMemReservation(Column column) {
     Preconditions.checkNotNull(column);
     long reservationBytes = DEFAULT_COLUMN_SCAN_RANGE_RESERVATION;
     ColumnStats stats = column.getStats();

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index f60d519..22de3e2 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -271,7 +271,10 @@ public class KuduScanNode extends ScanNode {
 
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
-    nodeResourceProfile_ = ResourceProfile.noReservation(0);
+    // TODO: add a memory estimate when we revisit memory estimates overall.
+    nodeResourceProfile_ = new ResourceProfileBuilder()
+        .setMemEstimateBytes(0)
+        .setThreadReservation(useMtScanNode_ ? 0 : 1).build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 775219f..7a90c23 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -19,11 +19,8 @@ package org.apache.impala.planner;
 
 import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
-import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.PrintUtils;
@@ -111,13 +108,13 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   // computeResourceProfile().
   private ResourceProfile resourceProfile_ = ResourceProfile.invalid();
 
-  // The total of initial reservations (in bytes) that will be claimed over the lifetime
-  // of this fragment. Computed in computeResourceProfile().
-  private long initialReservationTotalClaims_ = -1;
+  // The total of initial memory reservations (in bytes) that will be claimed over the
+  // lifetime of this fragment. Computed in computeResourceProfile().
+  private long initialMemReservationTotalClaims_ = -1;
 
   // The total memory (in bytes) required for the runtime filters used by the plan nodes
   // managed by this fragment.
-  private long runtimeFiltersReservationBytes_ = 0;
+  private long runtimeFiltersMemReservationBytes_ = 0;
 
   /**
    * C'tor for fragment with specific partition; the output is by default broadcast.
@@ -239,7 +236,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         // so only add it once.
         if (!filterSet.contains(filter.getFilterId())) {
           filterSet.add(filter.getFilterId());
-          runtimeFiltersReservationBytes_ += filter.getFilterSize();
+          runtimeFiltersMemReservationBytes_ += filter.getFilterSize();
         }
       }
     }
@@ -257,15 +254,17 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     // The sink is opened after the plan tree.
     ResourceProfile fInstancePostOpenProfile =
         planTreeProfile.postOpenProfile.sum(sink_.getResourceProfile());
+    // One thread is required to execute the plan tree.
     resourceProfile_ = new ResourceProfileBuilder()
-        .setMemEstimateBytes(runtimeFiltersReservationBytes_)
-        .setMinReservationBytes(runtimeFiltersReservationBytes_).build()
+        .setMemEstimateBytes(runtimeFiltersMemReservationBytes_)
+        .setMinMemReservationBytes(runtimeFiltersMemReservationBytes_)
+        .setThreadReservation(1).build()
         .sum(planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile));
-    initialReservationTotalClaims_ = sink_.getResourceProfile().getMinReservationBytes() +
-        runtimeFiltersReservationBytes_;
+    initialMemReservationTotalClaims_ = sink_.getResourceProfile().getMinMemReservationBytes() +
+        runtimeFiltersMemReservationBytes_;
     for (PlanNode node: collectPlanNodes()) {
-      initialReservationTotalClaims_ +=
-          node.getNodeResourceProfile().getMinReservationBytes();
+      initialMemReservationTotalClaims_ +=
+          node.getNodeResourceProfile().getMinMemReservationBytes();
     }
   }
 
@@ -337,13 +336,14 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     if (sink_ != null) result.setOutput_sink(sink_.toThrift());
     result.setPartition(dataPartition_.toThrift());
     if (resourceProfile_.isValid()) {
-      Preconditions.checkArgument(initialReservationTotalClaims_ > -1);
-      result.setMin_reservation_bytes(resourceProfile_.getMinReservationBytes());
-      result.setInitial_reservation_total_claims(initialReservationTotalClaims_);
-      result.setRuntime_filters_reservation_bytes(runtimeFiltersReservationBytes_);
+      Preconditions.checkArgument(initialMemReservationTotalClaims_ > -1);
+      result.setMin_mem_reservation_bytes(resourceProfile_.getMinMemReservationBytes());
+      result.setInitial_mem_reservation_total_claims(initialMemReservationTotalClaims_);
+      result.setRuntime_filters_reservation_bytes(runtimeFiltersMemReservationBytes_);
+      result.setThread_reservation(resourceProfile_.getThreadReservation());
     } else {
-      result.setMin_reservation_bytes(0);
-      result.setInitial_reservation_total_claims(0);
+      result.setMin_mem_reservation_bytes(0);
+      result.setInitial_mem_reservation_total_claims(0);
       result.setRuntime_filters_reservation_bytes(0);
     }
     return result;
@@ -428,9 +428,9 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     } else {
       builder.append(resourceProfile_.multiply(getNumInstancesPerHost(mt_dop))
           .getExplainString());
-      if (resourceProfile_.isValid() && runtimeFiltersReservationBytes_ > 0) {
+      if (resourceProfile_.isValid() && runtimeFiltersMemReservationBytes_ > 0) {
         builder.append(" runtime-filters-memory=");
-        builder.append(PrintUtils.printBytes(runtimeFiltersReservationBytes_));
+        builder.append(PrintUtils.printBytes(runtimeFiltersMemReservationBytes_));
       }
     }
     builder.append("\n");

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index c18bc76..dbf4f35 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -67,7 +67,7 @@ public class Planner {
 
   public static final ResourceProfile MIN_PER_HOST_RESOURCES =
      new ResourceProfileBuilder().setMemEstimateBytes(MIN_PER_HOST_MEM_ESTIMATE_BYTES)
-      .setMinReservationBytes(0).build();
+      .setMinMemReservationBytes(0).build();
 
   private final PlannerContext ctx_;
 
@@ -264,12 +264,14 @@ public class Planner {
       TQueryExecRequest request, TExplainLevel explainLevel) {
     StringBuilder str = new StringBuilder();
     boolean hasHeader = false;
-    if (request.isSetMax_per_host_min_reservation()) {
-      str.append(String.format("Max Per-Host Resource Reservation: 
Memory=%s\n",
-          PrintUtils.printBytes(request.getMax_per_host_min_reservation())));
-      hasHeader = true;
-    }
-    if (request.isSetPer_host_mem_estimate()) {
+    // Only some requests (queries, DML, etc) have a resource profile.
+    if (request.isSetMax_per_host_min_mem_reservation()) {
+      Preconditions.checkState(request.isSetMax_per_host_thread_reservation());
+      Preconditions.checkState(request.isSetPer_host_mem_estimate());
+      str.append(String.format(
+          "Max Per-Host Resource Reservation: Memory=%s Threads=%d\n",
+          PrintUtils.printBytes(request.getMax_per_host_min_mem_reservation()),
+          request.getMax_per_host_thread_reservation()));
       str.append(String.format("Per-Host Resource Estimates: Memory=%s\n",
           PrintUtils.printBytes(request.getPer_host_mem_estimate())));
       hasHeader = true;
@@ -376,21 +378,25 @@ public class Planner {
 
     Preconditions.checkState(maxPerHostPeakResources.getMemEstimateBytes() >= 0,
         maxPerHostPeakResources.getMemEstimateBytes());
-    Preconditions.checkState(maxPerHostPeakResources.getMinReservationBytes() >= 0,
-        maxPerHostPeakResources.getMinReservationBytes());
+    Preconditions.checkState(maxPerHostPeakResources.getMinMemReservationBytes() >= 0,
+        maxPerHostPeakResources.getMinMemReservationBytes());
 
     maxPerHostPeakResources = MIN_PER_HOST_RESOURCES.max(maxPerHostPeakResources);
 
     // TODO: Remove per_host_mem_estimate from the TQueryExecRequest when AC no longer
     // needs it.
     request.setPer_host_mem_estimate(maxPerHostPeakResources.getMemEstimateBytes());
-    request.setMax_per_host_min_reservation(
-        maxPerHostPeakResources.getMinReservationBytes());
+    request.setMax_per_host_min_mem_reservation(
+        maxPerHostPeakResources.getMinMemReservationBytes());
+    request.setMax_per_host_thread_reservation(
+        maxPerHostPeakResources.getThreadReservation());
     if (LOG.isTraceEnabled()) {
       LOG.trace("Max per-host min reservation: " +
-          maxPerHostPeakResources.getMinReservationBytes());
+          maxPerHostPeakResources.getMinMemReservationBytes());
       LOG.trace("Max estimated per-host memory: " +
           maxPerHostPeakResources.getMemEstimateBytes());
+      LOG.trace("Max estimated per-host thread reservation: " +
+          maxPerHostPeakResources.getThreadReservation());
     }
   }
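
With the Planner.java change above, any plan that carries a resource profile
(queries, DML) prints the thread total next to the memory reservation in the
explain header. Using the figures from the first constant-folding test further
down, the header would read:

  Max Per-Host Resource Reservation: Memory=16.00MB Threads=2
  Per-Host Resource Estimates: Memory=264.00MB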
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
index 0494078..5cf4d87 100644
--- a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
@@ -39,13 +39,13 @@ public class ResourceProfile {
   // fix them.
   private final long memEstimateBytes_;
 
-  // Minimum buffer reservation required to execute in bytes.
+  // Minimum memory reservation required to execute in bytes.
   // The valid range is [0, maxReservationBytes_].
-  private final long minReservationBytes_;
+  private final long minMemReservationBytes_;
 
-  // Maximum buffer reservation allowed for this plan node.
-  // The valid range is [minReservationBytes_, Long.MAX_VALUE].
-  private final long maxReservationBytes_;
+  // Maximum memory reservation allowed for this plan node.
+  // The valid range is [minMemReservationBytes_, Long.MAX_VALUE].
+  private final long maxMemReservationBytes_;
 
   // The default spillable buffer size to use in a plan node. Only valid for resource
   // profiles for spilling PlanNodes. Operations like sum(), max(), etc., produce
@@ -56,38 +56,48 @@ public class ResourceProfile {
   // Must be set to a valid power-of-two value if spillableBufferBytes_ is set.
   private final long maxRowBufferBytes_;
 
-  ResourceProfile(boolean isValid, long memEstimateBytes,
-      long minReservationBytes, long maxReservationBytes, long spillableBufferBytes,
-      long maxRowBufferBytes) {
+  // The number of threads required to execute the plan node or plan tree. Does not
+  // include any optional threads that may be dynamically created or any threads outside
+  // of plan execution threads (e.g. system threads or threads used in the RPC stack).
+  // -1 if the profile is invalid (i.e. isValid_ is false).
+  private final long threadReservation_;
+
+  ResourceProfile(boolean isValid, long memEstimateBytes, long minMemReservationBytes,
+      long maxMemReservationBytes, long spillableBufferBytes,
+      long maxRowBufferBytes, long threadReservation) {
     Preconditions.checkArgument(spillableBufferBytes == -1 || maxRowBufferBytes != -1);
     Preconditions.checkArgument(spillableBufferBytes == -1
         || LongMath.isPowerOfTwo(spillableBufferBytes));
     Preconditions.checkArgument(maxRowBufferBytes == -1
         || LongMath.isPowerOfTwo(maxRowBufferBytes));
+    Preconditions.checkArgument(!isValid || threadReservation >= 0, threadReservation);
     isValid_ = isValid;
-    memEstimateBytes_ = (minReservationBytes != -1) ?
-        Math.max(memEstimateBytes, minReservationBytes) : memEstimateBytes;
-    minReservationBytes_ = minReservationBytes;
-    maxReservationBytes_ = maxReservationBytes;
+    memEstimateBytes_ = (minMemReservationBytes != -1) ?
+        Math.max(memEstimateBytes, minMemReservationBytes) : memEstimateBytes;
+    minMemReservationBytes_ = minMemReservationBytes;
+    maxMemReservationBytes_ = maxMemReservationBytes;
     spillableBufferBytes_ = spillableBufferBytes;
     maxRowBufferBytes_ = maxRowBufferBytes;
+    threadReservation_ = threadReservation;
   }
 
-  // Create a resource profile with zero min or max reservation.
+  // Create a resource profile with zero min or max reservation and zero required
+  // threads.
   public static ResourceProfile noReservation(long memEstimateBytes) {
-    return new ResourceProfile(true, memEstimateBytes, 0, 0, -1, -1);
+    return new ResourceProfile(true, memEstimateBytes, 0, 0, -1, -1, 0);
   }
 
   public static ResourceProfile invalid() {
-    return new ResourceProfile(false, -1, -1, -1, -1, -1);
+    return new ResourceProfile(false, -1, -1, -1, -1, -1, -1);
   }
 
   public boolean isValid() { return isValid_; }
   public long getMemEstimateBytes() { return memEstimateBytes_; }
-  public long getMinReservationBytes() { return minReservationBytes_; }
-  public long getMaxReservationBytes() { return maxReservationBytes_; }
+  public long getMinMemReservationBytes() { return minMemReservationBytes_; }
+  public long getMaxMemReservationBytes() { return maxMemReservationBytes_; }
   public long getSpillableBufferBytes() { return spillableBufferBytes_; }
   public long getMaxRowBufferBytes() { return maxRowBufferBytes_; }
+  public long getThreadReservation() { return threadReservation_; }
 
   // Return a string with the resource profile information suitable for display in an
   // explain plan in a format like: "resource1=value resource2=value"
@@ -96,13 +106,15 @@ public class ResourceProfile {
     output.append("mem-estimate=");
     output.append(isValid_ ? PrintUtils.printBytes(memEstimateBytes_) : "invalid");
     output.append(" mem-reservation=");
-    output.append(isValid_ ? PrintUtils.printBytes(minReservationBytes_) : "invalid");
+    output.append(isValid_ ? PrintUtils.printBytes(minMemReservationBytes_) : "invalid");
     // TODO: output maxReservation_ here if the planner becomes more sophisticated in
     // choosing it (beyond 0/unlimited).
     if (isValid_ && spillableBufferBytes_ != -1) {
       output.append(" spill-buffer=");
       output.append(PrintUtils.printBytes(spillableBufferBytes_));
     }
+    output.append(" thread-reservation=");
+    output.append(isValid_ ? threadReservation_ : "invalid");
     return output.toString();
   }
 
@@ -112,8 +124,9 @@ public class ResourceProfile {
     if (!other.isValid()) return this;
     return new ResourceProfile(true,
         Math.max(getMemEstimateBytes(), other.getMemEstimateBytes()),
-        Math.max(getMinReservationBytes(), other.getMinReservationBytes()),
-        Math.max(getMaxReservationBytes(), other.getMaxReservationBytes()), -1, -1);
+        Math.max(getMinMemReservationBytes(), other.getMinMemReservationBytes()),
+        Math.max(getMaxMemReservationBytes(), other.getMaxMemReservationBytes()), -1, -1,
+        Math.max(getThreadReservation(), other.getThreadReservation()));
   }
 
   // Returns a profile with the sum of each value in 'this' and 'other'.
@@ -122,9 +135,12 @@ public class ResourceProfile {
     if (!other.isValid()) return this;
     return new ResourceProfile(true,
         MathUtil.saturatingAdd(getMemEstimateBytes(), other.getMemEstimateBytes()),
-        MathUtil.saturatingAdd(getMinReservationBytes(),other.getMinReservationBytes()),
-        MathUtil.saturatingAdd(getMaxReservationBytes(), other.getMaxReservationBytes()),
-        -1, -1);
+        MathUtil.saturatingAdd(
+            getMinMemReservationBytes(),other.getMinMemReservationBytes()),
+        MathUtil.saturatingAdd(
+            getMaxMemReservationBytes(), other.getMaxMemReservationBytes()),
+        -1, -1,
+        MathUtil.saturatingAdd(getThreadReservation(), other.getThreadReservation()));
   }
 
   // Returns a profile with all values multiplied by 'factor'.
@@ -132,14 +148,15 @@ public class ResourceProfile {
     if (!isValid()) return this;
     return new ResourceProfile(true,
         MathUtil.saturatingMultiply(memEstimateBytes_, factor),
-        MathUtil.saturatingMultiply(minReservationBytes_, factor),
-        MathUtil.saturatingMultiply(maxReservationBytes_, factor), -1, -1);
+        MathUtil.saturatingMultiply(minMemReservationBytes_, factor),
+        MathUtil.saturatingMultiply(maxMemReservationBytes_, factor), -1, -1,
+        MathUtil.saturatingMultiply(threadReservation_, factor));
   }
 
   public TBackendResourceProfile toThrift() {
     TBackendResourceProfile result = new TBackendResourceProfile();
-    result.setMin_reservation(minReservationBytes_);
-    result.setMax_reservation(maxReservationBytes_);
+    result.setMin_reservation(minMemReservationBytes_);
+    result.setMax_reservation(maxMemReservationBytes_);
     if (spillableBufferBytes_ != -1) {
       result.setSpillable_buffer_size(spillableBufferBytes_);
     }
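
The thread reservation combines under the same rules as the memory components:
max() takes the component-wise maximum, sum() adds (with saturating arithmetic
in the real code), and multiply() scales by the per-host instance count. A
simplified, self-contained sketch of those semantics, with illustrative names
rather than the Impala classes:

  public class MiniProfile {
    final long minMemReservationBytes;
    final long threadReservation;

    MiniProfile(long memBytes, long threads) {
      minMemReservationBytes = memBytes;
      threadReservation = threads;
    }

    // Peak across alternatives that do not run concurrently: take the max.
    MiniProfile max(MiniProfile o) {
      return new MiniProfile(
          Math.max(minMemReservationBytes, o.minMemReservationBytes),
          Math.max(threadReservation, o.threadReservation));
    }

    // Concurrent consumers on a host: add the requirements.
    MiniProfile sum(MiniProfile o) {
      return new MiniProfile(minMemReservationBytes + o.minMemReservationBytes,
          threadReservation + o.threadReservation);
    }

    // Several instances per host: scale every component.
    MiniProfile multiply(long factor) {
      return new MiniProfile(minMemReservationBytes * factor,
          threadReservation * factor);
    }
  }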

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
index 5420200..394e5dc 100644
--- a/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
@@ -28,8 +28,8 @@ public class ResourceProfileBuilder {
   private long memEstimateBytes_ = -1;
 
   // Assume no reservation is used unless the caller explicitly sets it.
-  private long minReservationBytes_ = 0;
-  private long maxReservationBytes_ = 0;
+  private long minMemReservationBytes_ = 0;
+  private long maxMemReservationBytes_ = 0;
 
   // The spillable buffer size is only set by plan nodes that use it.
   private long spillableBufferBytes_= -1;
@@ -37,17 +37,20 @@ public class ResourceProfileBuilder {
   // Must be set if spillableBufferBytes_ is set.
   private long maxRowBufferBytes_= -1;
 
+  // Defaults to zero, because most ExecNodes do not create additional threads.
+  private long threadReservation_ = 0;
+
   public ResourceProfileBuilder setMemEstimateBytes(long memEstimateBytes) {
     memEstimateBytes_ = memEstimateBytes;
     return this;
   }
 
   /**
-   * Sets the minimum reservation and an unbounded maximum reservation.
+   * Sets the minimum memory reservation and an unbounded maximum memory reservation.
    */
-  public ResourceProfileBuilder setMinReservationBytes(long minReservationBytes) {
-    minReservationBytes_ = minReservationBytes;
-    maxReservationBytes_ = Long.MAX_VALUE;
+  public ResourceProfileBuilder setMinMemReservationBytes(long minMemReservationBytes) {
+    minMemReservationBytes_ = minMemReservationBytes;
+    maxMemReservationBytes_ = Long.MAX_VALUE;
     return this;
   }
 
@@ -61,9 +64,15 @@ public class ResourceProfileBuilder {
     return this;
   }
 
+  public ResourceProfileBuilder setThreadReservation(long threadReservation) {
+    threadReservation_ = threadReservation;
+    return this;
+  }
+
   ResourceProfile build() {
     Preconditions.checkState(memEstimateBytes_ >= 0, "Mem estimate must be set");
-    return new ResourceProfile(true, memEstimateBytes_, minReservationBytes_,
-        maxReservationBytes_, spillableBufferBytes_, maxRowBufferBytes_);
+    return new ResourceProfile(true, memEstimateBytes_, minMemReservationBytes_,
+        maxMemReservationBytes_, spillableBufferBytes_, maxRowBufferBytes_,
+        threadReservation_);
   }
 }
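
A typical caller of the updated builder (from within the planner package,
since build() is package-private) sets the memory figures and, when the node
runs its own threads, a thread reservation. The values below are illustrative,
chosen to match the HDFS scan figures in the planner tests:

  ResourceProfile profile = new ResourceProfileBuilder()
      .setMemEstimateBytes(128L * 1024 * 1024)  // 128.00MB estimate
      .setMinMemReservationBytes(32 * 1024)     // 32.00KB min reservation
      .setThreadReservation(1)                  // one required scanner thread
      .build();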

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 45ee763..ccef721 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -288,7 +288,7 @@ public class SortNode extends PlanNode {
     // Must be kept in sync with ComputeMinReservation() in Sorter in be.
     int pageMultiplier = usesVarLenBlocks ? 2 : 1;
     long perInstanceMemEstimate;
-    long perInstanceMinReservation;
+    long perInstanceMinMemReservation;
     if (type_ == TSortType.PARTIAL) {
       // The memory limit cannot be less than the size of the required blocks.
       long mem_limit = Math.max(PARTIAL_SORT_MEM_LIMIT, bufferSize * pageMultiplier);
@@ -296,16 +296,16 @@ public class SortNode extends PlanNode {
       perInstanceMemEstimate = fullInputSize < 0 ?
           mem_limit :
           Math.min((long) Math.ceil(fullInputSize), mem_limit);
-      perInstanceMinReservation = bufferSize * pageMultiplier;
+      perInstanceMinMemReservation = bufferSize * pageMultiplier;
     } else {
       double numInputBlocks = Math.ceil(fullInputSize / (bufferSize * pageMultiplier));
       perInstanceMemEstimate =
           bufferSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
-      perInstanceMinReservation = 3 * bufferSize * pageMultiplier;
+      perInstanceMinMemReservation = 3 * bufferSize * pageMultiplier;
     }
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
-        .setMinReservationBytes(perInstanceMinReservation)
+        .setMinMemReservationBytes(perInstanceMinMemReservation)
         .setSpillableBufferBytes(bufferSize).setMaxRowBufferBytes(bufferSize).build();
   }
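
The rename keeps the existing formula, which the test expectations below
exercise: with the default 2MB spillable buffer, a full sort over fixed-length
data (pageMultiplier = 1) reserves

  perInstanceMinMemReservation = 3 * bufferSize * pageMultiplier
                               = 3 * 2MB * 1 = 6MB

matching the 6.00MB SORT nodes in the tests, while the analytic test's sort
materializes a var-len key (pageMultiplier = 2) and so reserves 12MB.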
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e12ee485/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 5b3b426..9552caf 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -5,44 +5,44 @@ where 5 + 5 < c_custkey and o_orderkey = (2 + 2)
   and (coalesce(2, 3, 4) * 10) + l_linenumber < (0 * 1)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=264.00MB mem-reservation=16.00MB
+|  Per-Host Resources: mem-estimate=264.00MB mem-reservation=16.00MB thread-reservation=2
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:SUBPLAN
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=2,1,0 row-size=52B cardinality=1500000
 |
 |--08:NESTED LOOP JOIN [CROSS JOIN]
-|  |  mem-estimate=24B mem-reservation=0B
+|  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=2,1,0 row-size=52B cardinality=100
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
-|  |     mem-estimate=0B mem-reservation=0B
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=0 row-size=24B cardinality=1
 |  |
 |  04:SUBPLAN
-|  |  mem-estimate=0B mem-reservation=0B
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=2,1 row-size=28B cardinality=100
 |  |
 |  |--07:NESTED LOOP JOIN [CROSS JOIN]
-|  |  |  mem-estimate=24B mem-reservation=0B
+|  |  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
 |  |  |  tuple-ids=2,1 row-size=28B cardinality=10
 |  |  |
 |  |  |--05:SINGULAR ROW SRC
 |  |  |     parent-subplan=04
-|  |  |     mem-estimate=0B mem-reservation=0B
+|  |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  |     tuple-ids=1 row-size=24B cardinality=1
 |  |  |
 |  |  06:UNNEST [o.o_lineitems]
 |  |     parent-subplan=04
-|  |     mem-estimate=0B mem-reservation=0B
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  03:UNNEST [c.c_orders o]
 |     parent-subplan=01
-|     mem-estimate=0B mem-reservation=0B
+|     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
@@ -59,7 +59,7 @@ PLAN-ROOT SINK
    parquet dictionary predicates: c_custkey > 10
    parquet dictionary predicates on o: o_orderkey = 4
    parquet dictionary predicates on o_lineitems: 20 + l_linenumber < 0
-   mem-estimate=264.00MB mem-reservation=16.00MB
+   mem-estimate=264.00MB mem-reservation=16.00MB thread-reservation=1
    tuple-ids=0 row-size=24B cardinality=15000
 ====
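
To read the new figures: each operator now prints a thread-reservation next to
its memory numbers, and the fragment's Per-Host Resources line carries the
per-host total. In the single-fragment plan above, that total is the fragment's
one execution thread plus the HDFS scan's one required scanner thread, i.e.
thread-reservation=2.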
 # Test HBase scan node.
@@ -68,9 +68,9 @@ where string_col = cast(4 as string) and 2 + 3 = tinyint_col
   and id between concat('1', '0') and upper('20')
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HBASE [functional_hbase.stringids]
    start key: 10
@@ -80,7 +80,7 @@ PLAN-ROOT SINK
    stored statistics:
      table: rows=10000
      columns: all
-   mem-estimate=1.00GB mem-reservation=0B
+   mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=119B cardinality=1
 ====
 # Test datasource scan node.
@@ -88,14 +88,14 @@ select * from functional.alltypes_datasource
 where tinyint_col < (pow(2, 8)) and float_col != 0 and 1 + 1 > int_col
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
 data source predicates: tinyint_col < 256, int_col < 2
 predicates: float_col != 0
-   mem-estimate=1.00GB mem-reservation=0B
+   mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=116B cardinality=500
 ====
 # Test aggregation.
@@ -107,15 +107,15 @@ having 1024 * 1024 * count(*) % 2 = 0
   and (sum(1 + 1 + id) between 5 and 10)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.97MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.97MB thread-reservation=2
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: sum(2 + id) <= 10, sum(2 + id) > 1, sum(2 + id) >= 5, 1048576 * count(*) % 2 = 0
-|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=1 row-size=17B cardinality=0
 |
 00:SCAN HDFS [functional.alltypes]
@@ -125,7 +125,7 @@ PLAN-ROOT SINK
      partitions: 24/24 rows=7300
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
-   mem-estimate=128.00MB mem-reservation=32.00KB
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=20B cardinality=7300
 ====
 # Test hash join.
@@ -136,16 +136,16 @@ left outer join functional.alltypes b
 where round(1.11 + 2.22 + 3.33 + 4.44, 1) < cast(b.double_col as decimal(3, 2))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=257.94MB mem-reservation=2.00MB
+|  Per-Host Resources: mem-estimate=257.94MB mem-reservation=2.00MB thread-reservation=3
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: 2 + a.id = b.id - 2
 |  fk/pk conjuncts: assumed fk/pk
 |  other join predicates: a.int_col <= b.bigint_col + 97, a.int_col >= 0 + b.bigint_col
 |  other predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1N row-size=28B cardinality=7300
 |
 |--01:SCAN HDFS [functional.alltypes b]
@@ -157,7 +157,7 @@ PLAN-ROOT SINK
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=310
 |     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|     mem-estimate=128.00MB mem-reservation=32.00KB
+|     mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
 |     tuple-ids=1 row-size=20B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
@@ -167,7 +167,7 @@ PLAN-ROOT SINK
      partitions: 24/24 rows=7300
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
-   mem-estimate=128.00MB mem-reservation=32.00KB
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test nested-loop join. Same as above but with a disjunction in the On clause.
@@ -179,14 +179,14 @@ left outer join functional.alltypes b
 where cast(b.double_col as decimal(3, 2)) > round(1.11 + 2.22 + 3.33 + 4.44, 1)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=256.01MB mem-reservation=64.00KB
+|  Per-Host Resources: mem-estimate=256.01MB mem-reservation=64.00KB thread-reservation=3
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 02:NESTED LOOP JOIN [LEFT OUTER JOIN]
 |  join predicates: (2 + a.id = b.id - 2 OR a.int_col >= 0 + b.bigint_col AND a.int_col <= b.bigint_col + 97)
 |  predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|  mem-estimate=14.26KB mem-reservation=0B
+|  mem-estimate=14.26KB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1N row-size=28B cardinality=7300
 |
 |--01:SCAN HDFS [functional.alltypes b]
@@ -198,7 +198,7 @@ PLAN-ROOT SINK
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=310
 |     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|     mem-estimate=128.00MB mem-reservation=32.00KB
+|     mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
 |     tuple-ids=1 row-size=20B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
@@ -208,7 +208,7 @@ PLAN-ROOT SINK
      partitions: 24/24 rows=7300
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
-   mem-estimate=128.00MB mem-reservation=32.00KB
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test distinct aggregation with grouping.
@@ -218,21 +218,21 @@ group by timestamp_col = cast('2015-11-15' as timestamp) + interval 1 year
 having 1024 * 1024 * count(*) % 2 = 0
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=3.88MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=3.88MB thread-reservation=2
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count:merge(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: 1048576 * count(*) % 2 = 0
-|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=2 row-size=17B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00', 2 + id
-|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=1 row-size=17B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -242,7 +242,7 @@ PLAN-ROOT SINK
      partitions: 24/24 rows=7300
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
-   mem-estimate=128.00MB mem-reservation=32.00KB
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=20B cardinality=7300
 ====
 # Test non-grouping distinct aggregation.
@@ -251,20 +251,20 @@ from functional.alltypes
 having 1024 * 1024 * count(*) % 2 = 0
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.97MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.97MB thread-reservation=2
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count:merge(*)
 |  having: 1048576 * zeroifnull(count(*)) % 2 = 0
-|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=2 row-size=16B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: 2 + id
-|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -274,7 +274,7 @@ PLAN-ROOT SINK
      partitions: 24/24 rows=7300
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
-   mem-estimate=128.00MB mem-reservation=32.00KB
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=4B cardinality=7300
 ====
 # Test analytic eval node.
@@ -284,22 +284,22 @@ select first_value(1 + 1 + int_col - (1 - 1)) over
 from functional.alltypes
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=144.00MB mem-reservation=16.03MB
+|  Per-Host Resources: mem-estimate=144.00MB mem-reservation=16.03MB thread-reservation=2
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 02:ANALYTIC
 |  functions: first_value(2 + int_col - 0)
 |  partition by: concat('ab', string_col)
 |  order by: greatest(20, bigint_col) ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3,2 row-size=61B cardinality=7300
 |
 01:SORT
 |  order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC
 |  materialized: concat('ab', string_col), greatest(20, bigint_col)
-|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB
+|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=53B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -309,7 +309,7 @@ PLAN-ROOT SINK
      partitions: 24/24 rows=7300
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
-   mem-estimate=128.00MB mem-reservation=32.00KB
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=29B cardinality=7300
 ====
 # Test sort node.
@@ -317,13 +317,13 @@ select int_col from functional.alltypes
 order by id * abs((factorial(5) / power(2, 4)))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=134.00MB mem-reservation=6.03MB
+|  Per-Host Resources: mem-estimate=134.00MB mem-reservation=6.03MB thread-reservation=2
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:SORT
 |  order by: id * 7.5 ASC
-|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -333,7 +333,7 @@ PLAN-ROOT SINK
      partitions: 24/24 rows=7300
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
-   mem-estimate=128.00MB mem-reservation=32.00KB
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test HDFS table sink.
@@ -342,14 +342,14 @@ select id, int_col, cast(1 + 1 + 1 + year as int), cast(month - (1 - 1 - 1) as i
 from functional.alltypessmall
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=38.00MB mem-reservation=6.01MB
+|  Per-Host Resources: mem-estimate=38.00MB mem-reservation=6.01MB thread-reservation=2
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(3 + year AS INT),CAST(month - -1 AS INT))]
 |  partitions=4
-|  mem-estimate=1.56KB mem-reservation=0B
+|  mem-estimate=1.56KB mem-reservation=0B thread-reservation=0
 |
 01:SORT
 |  order by: CAST(3 + year AS INT) ASC NULLS LAST, CAST(month - -1 AS INT) ASC NULLS LAST
-|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=100
 |
 00:SCAN HDFS [functional.alltypessmall]
@@ -359,7 +359,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(3 + ye
      partitions: 4/4 rows=100
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=25
-   mem-estimate=32.00MB mem-reservation=8.00KB
+   mem-estimate=32.00MB mem-reservation=8.00KB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=100
 ====
 # Constant folding does not work across query blocks.
@@ -371,13 +371,13 @@ select sum(id + c3) from
   ) v3
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=32.00KB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=32.00KB thread-reservation=2
 PLAN-ROOT SINK
-|  mem-estimate=0B mem-reservation=0B
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum(id + 10 + 20 + 30)
-|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=4 row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
@@ -388,6 +388,6 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=310
    limit: 2
-   mem-estimate=128.00MB mem-reservation=32.00KB
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=4B cardinality=2
 ====
