KUDU-2289: Tablet deletion should be throttled

When a table is deleted, the master eagerly sends DeleteTablet
requests for every replica of every tablet. Since DeleteTablet
can be IO-heavy and runs on RPC service threads, deleting
tables could harm other concurrent workloads.

This changes DeleteTablet to run on a threadpool. The number
of threads is capped by --num_tablets_to_delete_simultaneously,
which defaults to the number of data dirs, a proxy for the
number of disks. This should help throttle tablet deletions,
both preventing them from monopolizing service threads and
limiting their IO.

Change-Id: I3819bf8a3acf8ea03a76cc6cacd92d85bb114998
Reviewed-on: http://gerrit.cloudera.org:8080/9551
Reviewed-by: Adar Dembo <a...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/165688f8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/165688f8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/165688f8

Branch: refs/heads/master
Commit: 165688f83bbb00a41090e909ae4e17c463fa75e4
Parents: 7e35aee
Author: Will Berkeley <wdberke...@apache.org>
Authored: Wed Mar 7 08:52:59 2018 -0800
Committer: Will Berkeley <wdberke...@gmail.com>
Committed: Thu Apr 5 19:52:34 2018 +0000

----------------------------------------------------------------------
 src/kudu/tserver/tablet_service.cc    |  22 ++---
 src/kudu/tserver/ts_tablet_manager.cc | 140 +++++++++++++++++++++++------
 src/kudu/tserver/ts_tablet_manager.h  |  27 ++++--
 src/kudu/util/threadpool.h            |   2 +-
 4 files changed, 147 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/165688f8/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 3854f5c..2f4d285 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -805,16 +805,18 @@ void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req,
   if (req->has_cas_config_opid_index_less_or_equal()) {
     cas_config_opid_index_less_or_equal = 
req->cas_config_opid_index_less_or_equal();
   }
-  TabletServerErrorPB::Code error_code;
-  Status s = server_->tablet_manager()->DeleteTablet(req->tablet_id(),
-                                                     delete_type,
-                                                     
cas_config_opid_index_less_or_equal,
-                                                     &error_code);
-  if (PREDICT_FALSE(!s.ok())) {
-    HandleErrorResponse(req, resp, context, error_code, s);
-    return;
-  }
-  context->RespondSuccess();
+
+  auto response_callback = [context, req, resp](const Status& s, 
TabletServerErrorPB::Code code) {
+    if (PREDICT_FALSE(!s.ok())) {
+      HandleErrorResponse(req, resp, context, code, s);
+      return;
+    }
+    context->RespondSuccess();
+  };
+  server_->tablet_manager()->DeleteTabletAsync(req->tablet_id(),
+                                               delete_type,
+                                               
cas_config_opid_index_less_or_equal,
+                                               response_callback);
 }
 
 void TabletServiceImpl::Write(const WriteRequestPB* req,

http://git-wip-us.apache.org/repos/asf/kudu/blob/165688f8/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index f9341af..034d6ab 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -84,6 +84,13 @@ DEFINE_int32(num_tablets_to_open_simultaneously, 0,
              "may make sense to manually tune this.");
 TAG_FLAG(num_tablets_to_open_simultaneously, advanced);
 
+DEFINE_int32(num_tablets_to_delete_simultaneously, 0,
+             "Number of threads available to delete tablets. If this is set to 0 (the "
+             "default), then the number of delete threads will be set based on the number "
+             "of data directories. If the data directories are on some very fast storage "
+             "device such as SSD or a RAID array, it may make sense to manually tune this.");
+TAG_FLAG(num_tablets_to_delete_simultaneously, advanced);
+
 DEFINE_int32(tablet_start_warn_threshold_ms, 500,
              "If a tablet takes more than this number of millis to start, 
issue "
              "a warning with a trace.");
@@ -119,6 +126,10 @@ DEFINE_int32(tablet_state_walk_min_period_ms, 1000,
              "tablet map to update tablet state counts.");
 TAG_FLAG(tablet_state_walk_min_period_ms, advanced);
 
+DEFINE_int32(delete_tablet_inject_latency_ms, 0,
+             "Amount of delay in milliseconds to inject into delete tablet operations.");
+TAG_FLAG(delete_tablet_inject_latency_ms, unsafe);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 
 METRIC_DEFINE_gauge_int32(server, tablets_num_not_initialized,
@@ -247,6 +258,41 @@ TSTabletManager::TSTabletManager(TabletServer* server)
       ->AutoDetach(&metric_detacher_);
 }
 
+// Base class for Runnables submitted against TSTabletManager threadpools
+// whose callback must fire, for example if the callback responds to an RPC.
+class TabletManagerRunnable : public Runnable {
+ public:
+  TabletManagerRunnable(TSTabletManager* ts_tablet_manager,
+                        std::function<void(const Status&, TabletServerErrorPB::Code)> cb)
+      : ts_tablet_manager_(ts_tablet_manager),
+        cb_(std::move(cb)) {
+  }
+
+  virtual ~TabletManagerRunnable() {
+    // If the Runnable is destroyed without the Run() method being invoked, we
+    // must invoke the user callback ourselves in order to free request
+    // resources. This may happen when the ThreadPool is shut down while the
+    // Runnable is enqueued.
+    if (!cb_invoked_) {
+      cb_(Status::ServiceUnavailable("Tablet server shutting down"),
+          TabletServerErrorPB::THROTTLED);
+    }
+  }
+
+  // Disable automatic invocation of the callback by the destructor.
+  // Does not disable invocation of the callback by Run().
+  void DisableCallback() {
+    cb_invoked_ = true;
+  }
+
+ protected:
+  TSTabletManager* const ts_tablet_manager_;
+  const std::function<void(const Status&, TabletServerErrorPB::Code)> cb_;
+  bool cb_invoked_ = false;
+
+  DISALLOW_COPY_AND_ASSIGN(TabletManagerRunnable);
+};
+
 TSTabletManager::~TSTabletManager() {
 }
 
@@ -261,7 +307,7 @@ Status TSTabletManager::Init() {
                 .set_max_threads(FLAGS_num_tablets_to_copy_simultaneously)
                 .Build(&tablet_copy_pool_));
 
-  // Start the threadpool we'll use to open tablets.
+  // Start the threadpools we'll use to open and delete tablets.
   // This has to be done in Init() instead of the constructor, since the
   // FsManager isn't initialized until this point.
   int max_open_threads = FLAGS_num_tablets_to_open_simultaneously;
@@ -272,6 +318,14 @@ Status TSTabletManager::Init() {
   RETURN_NOT_OK(ThreadPoolBuilder("tablet-open")
                 .set_max_threads(max_open_threads)
                 .Build(&open_tablet_pool_));
+  int max_delete_threads = FLAGS_num_tablets_to_delete_simultaneously;
+  if (max_delete_threads == 0) {
+    // Default to the number of disks.
+    max_delete_threads = fs_manager_->GetDataRootDirs().size();
+  }
+  RETURN_NOT_OK(ThreadPoolBuilder("tablet-delete")
+                .set_max_threads(max_delete_threads)
+                .Build(&delete_tablet_pool_));
 
   // Search for tablets in the metadata dir.
   vector<string> tablet_ids;
@@ -417,42 +471,22 @@ Status TSTabletManager::CheckLeaderTermNotLower(const string& tablet_id,
 }
 
 // Tablet Copy runnable that will run on a ThreadPool.
-class TabletCopyRunnable : public Runnable {
+class TabletCopyRunnable : public TabletManagerRunnable {
  public:
   TabletCopyRunnable(TSTabletManager* ts_tablet_manager,
                      const StartTabletCopyRequestPB* req,
                      std::function<void(const Status&, TabletServerErrorPB::Code)> cb)
-      : ts_tablet_manager_(ts_tablet_manager),
-        req_(req),
-        cb_(std::move(cb)) {
+      : TabletManagerRunnable(ts_tablet_manager, std::move(cb)),
+        req_(req) {
   }
 
-  virtual ~TabletCopyRunnable() {
-    // If the Runnable is destroyed without the Run() method being invoked, we
-    // must invoke the user callback ourselves in order to free request
-    // resources. This may happen when the ThreadPool is shut down while the
-    // Runnable is enqueued.
-    if (!cb_invoked_) {
-      cb_(Status::ServiceUnavailable("Tablet server shutting down"),
-          TabletServerErrorPB::THROTTLED);
-    }
-  }
-
-  virtual void Run() override {
+  void Run() override {
     ts_tablet_manager_->RunTabletCopy(req_, cb_);
     cb_invoked_ = true;
   }
 
-  // Disable automatic invocation of the callback by the destructor.
-  void DisableCallback() {
-    cb_invoked_ = true;
-  }
-
  private:
-  TSTabletManager* const ts_tablet_manager_;
   const StartTabletCopyRequestPB* const req_;
-  const std::function<void(const Status&, TabletServerErrorPB::Code)> cb_;
-  bool cb_invoked_ = false;
 
   DISALLOW_COPY_AND_ASSIGN(TabletCopyRunnable);
 };
@@ -774,6 +808,53 @@ Status TSTabletManager::BeginReplicaStateTransition(
   return Status::OK();
 }
 
+// Runnable that calls TSTabletManager::DeleteTablet() on a ThreadPool and reports to 'cb'.
+class DeleteTabletRunnable : public TabletManagerRunnable {
+ public:
+  DeleteTabletRunnable(TSTabletManager* ts_tablet_manager,
+                       const std::string& tablet_id,
+                       tablet::TabletDataState delete_type,
+                       const boost::optional<int64_t>& cas_config_index,
+                       std::function<void(const Status&, TabletServerErrorPB::Code)> cb)
+      : TabletManagerRunnable(ts_tablet_manager, std::move(cb)),
+        tablet_id_(tablet_id),
+        delete_type_(delete_type),
+        cas_config_index_(cas_config_index) {
+  }
+
+  void Run() override {
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    Status s = ts_tablet_manager_->DeleteTablet(tablet_id_, delete_type_, cas_config_index_, &code);
+    cb_(s, code);
+    cb_invoked_ = true;
+  }
+
+ private:
+  const std::string tablet_id_;
+  const tablet::TabletDataState delete_type_;
+  const boost::optional<int64_t> cas_config_index_;
+
+  DISALLOW_COPY_AND_ASSIGN(DeleteTabletRunnable);
+};
+
+void TSTabletManager::DeleteTabletAsync(
+    const std::string& tablet_id,
+    tablet::TabletDataState delete_type,
+    const boost::optional<int64_t>& cas_config_index,
+    std::function<void(const Status&, TabletServerErrorPB::Code)> cb) {
+  auto runnable = std::make_shared<DeleteTabletRunnable>(this, tablet_id, delete_type,
+                                                         cas_config_index, cb);
+  Status s = delete_tablet_pool_->Submit(runnable);
+  if (PREDICT_TRUE(s.ok())) {
+    return;
+  }
+
+  // Submission failed: suppress the runnable's destructor callback and respond here.
+  runnable->DisableCallback();
+  cb(s, s.IsServiceUnavailable() ? TabletServerErrorPB::THROTTLED :
+                                   TabletServerErrorPB::UNKNOWN_ERROR);
+}
+
 Status TSTabletManager::DeleteTablet(
     const string& tablet_id,
     TabletDataState delete_type,
@@ -789,6 +870,12 @@ Status TSTabletManager::DeleteTablet(
 
   TRACE("Deleting tablet $0", tablet_id);
 
+  if (PREDICT_FALSE(FLAGS_delete_tablet_inject_latency_ms > 0)) {
+    LOG(WARNING) << "Injecting " << FLAGS_delete_tablet_inject_latency_ms
+                 << "ms of latency into DeleteTablet";
+    
SleepFor(MonoDelta::FromMilliseconds(FLAGS_delete_tablet_inject_latency_ms));
+  }
+
   scoped_refptr<TabletReplica> replica;
   scoped_refptr<TransitionInProgressDeleter> deleter;
   RETURN_NOT_OK(BeginReplicaStateTransition(tablet_id, "deleting tablet", 
&replica,
@@ -1044,6 +1131,9 @@ void TSTabletManager::Shutdown() {
   // Shut down the bootstrap pool, so no new tablets are registered after this 
point.
   open_tablet_pool_->Shutdown();
 
+  // Shut down the delete pool, so no new tablets are deleted after this point.
+  delete_tablet_pool_->Shutdown();
+
   // Take a snapshot of the replicas list -- that way we don't have to hold
   // on to the lock while shutting them down, which might cause a lock
   // inversion. (see KUDU-308 for example).

http://git-wip-us.apache.org/repos/asf/kudu/blob/165688f8/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index a02a13c..742b5a4 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -122,14 +122,22 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
                          consensus::RaftConfigPB config,
                          scoped_refptr<tablet::TabletReplica>* replica);
 
-  // Delete the specified tablet.
-  // 'delete_type' must be one of TABLET_DATA_DELETED or TABLET_DATA_TOMBSTONED
-  // or else returns Status::IllegalArgument.
-  // 'cas_config_index' is optionally specified to enable an
-  // atomic DeleteTablet operation that only occurs if the latest committed
-  // Raft config change op has an opid_index equal to or less than the 
specified
-  // value. If not, 'error_code' is set to CAS_FAILED and a non-OK Status is
-  // returned.
+  // Delete the specified tablet asynchronously with callback 'cb'.
+  // - If the task cannot be started, or is dropped because the pool shuts down,
+  //   'cb' gets a non-OK Status (THROTTLED if ServiceUnavailable, else UNKNOWN_ERROR).
+  // - 'delete_type' must be one of TABLET_DATA_DELETED or TABLET_DATA_TOMBSTONED.
+  // - 'cas_config_index' is optionally specified to enable an
+  //   atomic DeleteTablet operation that only occurs if the latest committed
+  //   Raft config change op has an opid_index equal to or less than the specified
+  //   value. If not, the callback is called with a non-OK Status and error code
+  //   CAS_FAILED.
+  void DeleteTabletAsync(const std::string& tablet_id,
+                         tablet::TabletDataState delete_type,
+                         const boost::optional<int64_t>& cas_config_index,
+                         std::function<void(const Status&, TabletServerErrorPB::Code)> cb);
+
+  // Delete the specified tablet synchronously.
+  // See DeleteTabletAsync() for more information.
   Status DeleteTablet(const std::string& tablet_id,
                       tablet::TabletDataState delete_type,
                       const boost::optional<int64_t>& cas_config_index,
@@ -361,6 +369,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Thread pool used to open the tablets async, whether bootstrap is required 
or not.
   gscoped_ptr<ThreadPool> open_tablet_pool_;
 
+  // Thread pool used to delete tablets asynchronously.
+  gscoped_ptr<ThreadPool> delete_tablet_pool_;
+
   FunctionGaugeDetacher metric_detacher_;
 
   DISALLOW_COPY_AND_ASSIGN(TSTabletManager);

http://git-wip-us.apache.org/repos/asf/kudu/blob/165688f8/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index f1a7a37..1557486 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -88,7 +88,7 @@ struct ThreadPoolMetrics {
 //    The TraceMetrics implementation relies on the number of distinct counter
 //    names being small. Thus, if the thread pool name itself is dynamically
 //    generated, the default behavior described above would result in an
-//    unbounded number of distinct cunter names. The 'trace_metric_prefix'
+//    unbounded number of distinct counter names. The 'trace_metric_prefix'
 //    setting can be used to override the prefix used in generating the trace
 //    metric names.
 //

Reply via email to