Repository: incubator-impala
Updated Branches:
  refs/heads/master 6b7425a57 -> aa0ee1e10


IMPALA-4703: reservation denial debug action

Add debug action to deny reservation increases with some probability.
This allows us to test various scenarios, particularly:
* The case when the node only gets its initial reservation and must
  run to completion without increasing its reservation.
* The case when there is some memory pressure and the node sometimes
  gets a reservation increase and sometimes doesn't.

E.g. to deny all reservation requests after an ExecNode has opened:

  set debug_action=-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0

This was applied to test_spilling. It caught a bug in the PAGG
with spilling string aggregations.

This required some minor extensions to the debug actions.
* Allow debug actions that apply to all ExecNodes if node_id is -1.
* Allow passing parameters to debug actions. The current grammar of the
  actions is not well-oriented towards extension, so I resorted to using
  @ as a new delimiter.

I also optimised ExecDebugAction() so that it is much faster in the
common case and extended --disable_mem_pools to prevent the buffer pool
from holding onto unused buffers.

Change-Id: Ied39bb091b12156e5dc61b528c6c0cd8de3fe657
Reviewed-on: http://gerrit.cloudera.org:8080/7022
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/aa0ee1e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/aa0ee1e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/aa0ee1e1

Branch: refs/heads/master
Commit: aa0ee1e10a971cb7aee71094e15a2b1970a195bb
Parents: 6b7425a
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Fri May 26 09:12:13 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Sat Aug 5 22:13:31 2017 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc                        | 33 ++++++++++++------
 be/src/exec/exec-node.h                         | 15 +++++++--
 be/src/runtime/bufferpool/buffer-allocator.cc   | 13 ++++++++
 be/src/runtime/bufferpool/buffer-pool.cc        |  4 +++
 be/src/runtime/bufferpool/buffer-pool.h         |  3 ++
 .../runtime/bufferpool/reservation-tracker.cc   | 15 ++++++---
 be/src/runtime/bufferpool/reservation-tracker.h | 18 +++++++---
 be/src/runtime/coordinator-backend-state.cc     |  2 +-
 be/src/runtime/debug-options.cc                 | 35 +++++++++++++++-----
 be/src/runtime/debug-options.h                  | 19 ++++++++---
 common/thrift/ImpalaInternalService.thrift      |  4 +++
 common/thrift/PlanNodes.thrift                  |  3 ++
 tests/query_test/test_spilling.py               | 13 +++++++-
 13 files changed, 140 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 61c8d40..de92fbe 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -62,6 +62,7 @@
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
+#include "util/string-parser.h"
 
 #include "common/names.h"
 
@@ -392,10 +393,10 @@ void ExecNode::SetDebugOptions(const TDebugOptions& 
debug_options, ExecNode* roo
   DCHECK(debug_options.__isset.node_id);
   DCHECK(debug_options.__isset.phase);
   DCHECK(debug_options.__isset.action);
-  if (root->id_ == debug_options.node_id) {
+  if (debug_options.node_id == -1 || root->id_ == debug_options.node_id) {
     root->debug_phase_ = debug_options.phase;
     root->debug_action_ = debug_options.action;
-    return;
+    root->debug_action_param_ = debug_options.action_param;
   }
   for (int i = 0; i < root->children_.size(); ++i) {
     SetDebugOptions(debug_options, root->children_[i]);
@@ -436,25 +437,35 @@ void ExecNode::InitRuntimeProfile(const string& name) {
   runtime_profile_->set_metadata(id_);
 }
 
-Status ExecNode::ExecDebugAction(TExecNodePhase::type phase, RuntimeState* 
state) {
-  DCHECK(phase != TExecNodePhase::INVALID);
-  if (debug_phase_ != phase) return Status::OK();
+Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* 
state) {
+  DCHECK_EQ(debug_phase_, phase);
   if (debug_action_ == TDebugAction::FAIL) {
     return Status(TErrorCode::INTERNAL_ERROR, "Debug Action: FAIL");
-  }
-  if (debug_action_ == TDebugAction::WAIT) {
+  } else if (debug_action_ == TDebugAction::WAIT) {
     while (!state->is_cancelled()) {
       sleep(1);
     }
     return Status::CANCELLED;
-  }
-  if (debug_action_ == TDebugAction::INJECT_ERROR_LOG) {
+  } else if (debug_action_ == TDebugAction::INJECT_ERROR_LOG) {
     state->LogError(
         ErrorMsg(TErrorCode::INTERNAL_ERROR, "Debug Action: 
INJECT_ERROR_LOG"));
     return Status::OK();
-  }
-  if (debug_action_ == TDebugAction::MEM_LIMIT_EXCEEDED) {
+  } else if (debug_action_ == TDebugAction::MEM_LIMIT_EXCEEDED) {
     return mem_tracker()->MemLimitExceeded(state, "Debug Action: 
MEM_LIMIT_EXCEEDED");
+  } else {
+    DCHECK_EQ(debug_action_, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+    if (buffer_pool_client_.is_registered()) {
+      // Parse [0.0, 1.0] probability.
+      StringParser::ParseResult parse_result;
+      double probability = StringParser::StringToFloat<double>(
+          debug_action_param_.c_str(), debug_action_param_.size(), 
&parse_result);
+      if (parse_result != StringParser::PARSE_SUCCESS || probability < 0.0
+          || probability > 1.0) {
+        return Status(Substitute(
+            "Invalid SET_DENY_RESERVATION_PROBABILITY param: '$0'", 
debug_action_param_));
+      }
+      buffer_pool_client_.SetDebugDenyIncreaseReservation(probability);
+    }
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 60efff0..55c51ab 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -291,6 +291,7 @@ class ExecNode {
   /// debug_phase_
   TExecNodePhase::type debug_phase_;
   TDebugAction::type debug_action_;
+  std::string debug_action_param_;
 
   int64_t limit_;  // -1: no limit
   int64_t num_rows_returned_;
@@ -341,10 +342,15 @@ class ExecNode {
 
   void InitRuntimeProfile(const std::string& name);
 
-  /// Executes debug_action_ if phase matches debug_phase_.
+  /// Executes 'debug_action_' if 'phase' matches 'debug_phase_'.
   /// 'phase' must not be INVALID.
   Status ExecDebugAction(
-      TExecNodePhase::type phase, RuntimeState* state) WARN_UNUSED_RESULT;
+      TExecNodePhase::type phase, RuntimeState* state) WARN_UNUSED_RESULT {
+    DCHECK_NE(phase, TExecNodePhase::INVALID);
+    // Fast path for the common case when an action is not enabled for this 
phase.
+    if (LIKELY(debug_phase_ != phase)) return Status::OK();
+    return ExecDebugActionImpl(phase, state);
+  }
 
   /// Frees any local allocations made by evals_to_free_ and returns the 
result of
   /// state->CheckQueryState(). Nodes should call this periodically, e.g. once 
per input
@@ -364,6 +370,11 @@ class ExecNode {
   void AddEvaluatorsToFree(const std::vector<ScalarExprEvaluator*>& evals);
 
  private:
+  /// Implementation of ExecDebugAction(). This is the slow path we take when 
there is
+  /// actually a debug action enabled for 'phase'.
+  Status ExecDebugActionImpl(
+      TExecNodePhase::type phase, RuntimeState* state) WARN_UNUSED_RESULT;
+
   /// Set in ExecNode::Close(). Used to make Close() idempotent. This is not 
protected
   /// by a lock, it assumes all calls to Close() are made by the same thread.
   bool is_closed_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/runtime/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.cc 
b/be/src/runtime/bufferpool/buffer-allocator.cc
index 0978cca..a2e108a 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator.cc
@@ -30,6 +30,8 @@
 
 #include "common/names.h"
 
+DECLARE_bool(disable_mem_pools);
+
 namespace impala {
 
 /// An arena containing free buffers and clean pages that are associated with a
@@ -454,6 +456,12 @@ BufferPool::FreeBufferArena::~FreeBufferArena() {
 
 void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
   lock_guard<SpinLock> al(lock_);
+  if (FLAGS_disable_mem_pools) {
+    int64_t len = buffer.len();
+    parent_->system_allocator_->Free(move(buffer));
+    parent_->system_bytes_remaining_.Add(len);
+    return;
+  }
   PerSizeLists* lists = GetListsForSize(buffer.len());
   FreeList* list = &lists->free_buffers;
   DCHECK_EQ(lists->num_free_buffers.Load(), list->Size());
@@ -586,6 +594,11 @@ pair<int64_t, int64_t> 
BufferPool::FreeBufferArena::FreeSystemMemory(
 }
 
 void BufferPool::FreeBufferArena::AddCleanPage(Page* page) {
+  if (FLAGS_disable_mem_pools) {
+    // Immediately evict the page.
+    AddFreeBuffer(move(page->buffer));
+    return;
+  }
   lock_guard<SpinLock> al(lock_);
   PerSizeLists* lists = GetListsForSize(page->len);
   DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc 
b/be/src/runtime/bufferpool/buffer-pool.cc
index 83f2e6a..df92928 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -330,6 +330,10 @@ void 
BufferPool::ClientHandle::RestoreReservation(SubReservation* src, int64_t b
   DCHECK(success); // Transferring reservation to parent shouldn't fail.
 }
 
+void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double 
probability) {
+  impl_->reservation()->SetDebugDenyIncreaseReservation(probability);
+}
+
 BufferPool::SubReservation::SubReservation(ClientHandle* client) {
   tracker_.reset(new ReservationTracker);
   tracker_->InitChildTracker(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h 
b/be/src/runtime/bufferpool/buffer-pool.h
index e3df8df..dab875c 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -346,6 +346,9 @@ class BufferPool::ClientHandle {
   /// ReservationTracker::TransferReservationTo().
   bool TransferReservationTo(ReservationTracker* dst, int64_t bytes);
 
+  /// Call SetDebugDenyIncreaseReservation() on this client's 
ReservationTracker.
+  void SetDebugDenyIncreaseReservation(double probability);
+
   bool is_registered() const { return impl_ != NULL; }
 
   std::string DebugString() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc 
b/be/src/runtime/bufferpool/reservation-tracker.cc
index 972d825..b40f8b5 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -18,6 +18,7 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 
 #include <algorithm>
+#include <cstdlib>
 
 #include "common/object-pool.h"
 #include "gutil/strings/substitute.h"
@@ -29,7 +30,7 @@
 
 namespace impala {
 
-ReservationTracker::ReservationTracker() : initialized_(false), 
mem_tracker_(nullptr) {}
+ReservationTracker::ReservationTracker() {}
 
 ReservationTracker::~ReservationTracker() {
   DCHECK(!initialized_);
@@ -147,10 +148,16 @@ bool 
ReservationTracker::IncreaseReservationInternalLocked(
 
   bool granted;
   // Check if the increase is allowed, starting at the bottom of hierarchy.
-  if (reservation_ + reservation_increase > reservation_limit_) {
-    granted = false;
-  } else if (reservation_increase == 0) {
+  if (reservation_increase == 0) {
     granted = true;
+  } else if (increase_deny_probability_ != 0.0
+      && rand() < increase_deny_probability_ * (RAND_MAX + 1L)) {
+    // Randomly deny reservation if requested. Use rand() to avoid needing to 
set up a RNG.
+    // Should be good enough. If the probability is 0.0, this never triggers. 
If it is 1.0
+    // it always triggers.
+    granted = false;
+  } else if (reservation_ + reservation_increase > reservation_limit_) {
+    granted = false;
   } else {
     if (parent_ == nullptr) {
       granted = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h 
b/be/src/runtime/bufferpool/reservation-tracker.h
index 80084bc..4126eb7 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -163,6 +163,11 @@ class ReservationTracker {
   /// Returns the total reservations of children in bytes.
   int64_t GetChildReservations();
 
+  /// Support for debug actions: deny reservation increase with probability 
'probability'.
+  void SetDebugDenyIncreaseReservation(double probability) {
+    increase_deny_probability_ = probability;
+  }
+
   ReservationTracker* parent() const { return parent_; }
 
   std::string DebugString();
@@ -175,7 +180,7 @@ class ReservationTracker {
 
   /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL 
otherwise.
   MemTracker* GetParentMemTracker() const {
-    return parent_ == NULL ? NULL : parent_->mem_tracker_;
+    return parent_ == nullptr ? nullptr : parent_->mem_tracker_;
   }
 
   /// Initializes 'counters_', storing the counters in 'profile'.
@@ -236,7 +241,10 @@ class ReservationTracker {
   /// 'lock_' must be held by caller.
   void UpdateReservation(int64_t delta);
 
-  /// lock_ protects all members. The lock order in a tree of 
ReservationTrackers is
+  /// Support for debug actions: see SetDebugDenyIncreaseReservation() for 
behaviour.
+  double increase_deny_probability_ = 0.0;
+
+  /// lock_ protects all below members. The lock order in a tree of 
ReservationTrackers is
   /// based on a post-order traversal of the tree, with children visited in 
order of the
   /// memory address of the ReservationTracker object. The following rules can 
be applied
   /// to determine the relative positions of two trackers t1 and t2 in the 
lock order:
@@ -249,7 +257,7 @@ class ReservationTracker {
   SpinLock lock_;
 
   /// True if the tracker is initialized.
-  bool initialized_;
+  bool initialized_ = false;
 
   /// A dummy profile to hold the counters in 'counters_' in the case that no 
profile
   /// is provided.
@@ -260,12 +268,12 @@ class ReservationTracker {
   ReservationTrackerCounters counters_;
 
   /// The parent of this tracker in the hierarchy. Does not change after 
initialization.
-  ReservationTracker* parent_;
+  ReservationTracker* parent_ = nullptr;
 
   /// If non-NULL, reservations are counted as memory consumption against this 
tracker.
   /// Does not change after initialization. Not owned.
   /// TODO: remove once all memory is accounted via ReservationTrackers.
-  MemTracker* mem_tracker_;
+  MemTracker* mem_tracker_ = nullptr;
 
   /// The maximum reservation in bytes that this tracker can have.
   int64_t reservation_limit_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index 3f1b2fc..93a3af2 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -116,7 +116,7 @@ void Coordinator::BackendState::SetRpcParams(
     instance_ctx.__set_per_exch_num_senders(
         params.fragment_exec_params.per_exch_num_senders);
     instance_ctx.__set_sender_id(params.sender_id);
-    if (debug_options.node_id() != -1
+    if (debug_options.enabled()
         && (debug_options.instance_idx() == -1
             || debug_options.instance_idx() == 
GetInstanceIdx(params.instance_id))) {
       instance_ctx.__set_debug_options(debug_options.ToThrift());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/runtime/debug-options.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/debug-options.cc b/be/src/runtime/debug-options.cc
index 7a63742..193daee 100644
--- a/be/src/runtime/debug-options.cc
+++ b/be/src/runtime/debug-options.cc
@@ -37,16 +37,25 @@ DebugOptions::DebugOptions(const TQueryOptions& 
query_options)
   vector<string> components;
   split(components, query_options.debug_action, is_any_of(":"), 
token_compress_on);
   if (components.size() < 3 || components.size() > 4) return;
+
+  const string* phase_str;
+  const string* action_str;
   if (components.size() == 3) {
     instance_idx_ = -1;
     node_id_ = atoi(components[0].c_str());
-    phase_ = GetExecNodePhase(components[1]);
-    action_ = GetDebugAction(components[2]);
+    phase_str = &components[1];
+    action_str = &components[2];
   } else {
     instance_idx_ = atoi(components[0].c_str());
     node_id_ = atoi(components[1].c_str());
-    phase_ = GetExecNodePhase(components[2]);
-    action_ = GetDebugAction(components[3]);
+    phase_str = &components[2];
+    action_str = &components[3];
+  }
+  phase_ = GetExecNodePhase(*phase_str);
+  if (!GetDebugAction(*action_str, &action_, &action_param_)) {
+    LOG(WARNING) << "Invalid debug action " << *action_str;
+    phase_ = TExecNodePhase::INVALID;
+    return;
   }
   DCHECK(!(phase_ == TExecNodePhase::CLOSE && action_ == TDebugAction::WAIT))
       << "Do not use CLOSE:WAIT debug actions because nodes cannot be 
cancelled in "
@@ -64,13 +73,20 @@ TExecNodePhase::type DebugOptions::GetExecNodePhase(const 
string& key) {
   return TExecNodePhase::INVALID;
 }
 
-TDebugAction::type DebugOptions::GetDebugAction(const string& key) {
-  for (auto entry: _TDebugAction_VALUES_TO_NAMES) {
-    if (iequals(key, entry.second)) {
-      return static_cast<TDebugAction::type>(entry.first);
+bool DebugOptions::GetDebugAction(
+    const string& action, TDebugAction::type* type, string* action_param) {
+  // Either "ACTION_TYPE" or "ACTION_TYPE@<integer value>".
+  vector<string> tokens;
+  split(tokens, action, is_any_of("@"), token_compress_on);
+  if (tokens.size() < 1 || tokens.size() > 2) return false;
+  if (tokens.size() == 2) *action_param = tokens[1];
+  for (auto& entry : _TDebugAction_VALUES_TO_NAMES) {
+    if (iequals(tokens[0], entry.second)) {
+      *type = static_cast<TDebugAction::type>(entry.first);
+      return true;
     }
   }
-  return TDebugAction::WAIT;
+  return false;
 }
 
 
@@ -79,5 +95,6 @@ TDebugOptions DebugOptions::ToThrift() const {
   result.__set_node_id(node_id_);
   result.__set_phase(phase_);
   result.__set_action(action_);
+  result.__set_action_param(action_param_);
   return result;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/be/src/runtime/debug-options.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/debug-options.h b/be/src/runtime/debug-options.h
index 95f1f65..2b76f5c 100644
--- a/be/src/runtime/debug-options.h
+++ b/be/src/runtime/debug-options.h
@@ -37,22 +37,33 @@ class DebugOptions {
   /// query-wide fragment instance index; -1 if not set
   int instance_idx() const { return instance_idx_; }
 
-  /// -1 if no debug options set
+  /// The node the debug option should be applied to. -1 if it should be 
applied to all
+  /// nodes.
   int node_id() const { return node_id_; }
 
+  /// True if a debug action is enabled.
+  bool enabled() const { return phase_ != TExecNodePhase::INVALID; }
+
   TDebugAction::type action() const { return action_; }
   TExecNodePhase::type phase() const { return phase_; }
+  std::string action_param() const { return action_param_; }
 
  private:
   int instance_idx_;
   int node_id_;
   TDebugAction::type action_;
-  TExecNodePhase::type phase_;  // INVALID: debug options invalid
+  TExecNodePhase::type phase_; // INVALID: debug options invalid
+  // A string parameter that goes along with 'action_'. The semantics depend 
on the
+  // specific action.
+  std::string action_param_;
 
   static TExecNodePhase::type GetExecNodePhase(const std::string& key);
-  static TDebugAction::type GetDebugAction(const std::string& key);
+  // Parse a debug action from a string. Either of format "ACTION_TYPE" or
+  // "ACTION_TYPE@<param value>". Returns true when 'action' is a valid debug 
action
+  // string.
+  static bool GetDebugAction(
+      const std::string& action, TDebugAction::type* type, std::string* 
action_param);
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index b477299..b1084ed 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -310,9 +310,13 @@ struct TClientRequest {
 // Debug options: perform some action in a particular phase of a particular 
node
 // TODO: find a better name
 struct TDebugOptions {
+  // The plan node that this action should be applied to. If -1 it is applied 
to all plan
+  // nodes.
   1: optional Types.TPlanNodeId node_id
   2: optional PlanNodes.TExecNodePhase phase
   3: optional PlanNodes.TDebugAction action
+  // Optional parameter that goes along with the action.
+  4: optional string action_param
 }
 
 // Context of this query, including the client request, session state and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 468ca44..ae9faa9 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -67,6 +67,9 @@ enum TDebugAction {
   FAIL,
   INJECT_ERROR_LOG,
   MEM_LIMIT_EXCEEDED,
+  // A floating point number in range [0.0, 1.0] that gives the probability of 
denying
+  // each reservation increase request after the initial reservation.
+  SET_DENY_RESERVATION_PROBABILITY,
 }
 
 // Preference for replica selection

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa0ee1e1/tests/query_test/test_spilling.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_spilling.py 
b/tests/query_test/test_spilling.py
index e2d5141..d029f44 100644
--- a/tests/query_test/test_spilling.py
+++ b/tests/query_test/test_spilling.py
@@ -21,6 +21,16 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import 
(create_exec_option_dimension_from_dict,
     create_parquet_dimension)
 
+# Test with denial of reservations at varying frequency.
+DEBUG_ACTION_DIMS = [None,
+  '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@0.1',
+  '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@0.5',
+  '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@0.9',
+  '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0']
+
+
+@pytest.mark.xfail(pytest.config.option.testing_remote_cluster,
+                   reason='Queries may not spill on larger clusters')
 class TestSpilling(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -33,7 +43,8 @@ class TestSpilling(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_parquet_dimension('tpch'))
     # Tests are calibrated so that they can execute and spill with this page 
size.
     cls.ImpalaTestMatrix.add_dimension(
-        
create_exec_option_dimension_from_dict({'default_spillable_buffer_size' : 
['256k']}))
+        
create_exec_option_dimension_from_dict({'default_spillable_buffer_size' : 
['256k'],
+          'debug_action' : DEBUG_ACTION_DIMS}))
 
   def test_spilling(self, vector):
     self.run_test_case('QueryTest/spilling', vector)

Reply via email to