KUDU-2289: Tablet deletion should be throttled

When a table is deleted, the master eagerly sends DeleteTablet requests for every replica of every tablet. Since DeleteTablet can be IO-heavy and is run by service threads, deleting tables could harm other concurrent workloads.
This changes DeleteTablet to run on a threadpool. The number of threads is capped by --num_tablets_to_delete_simultaneously, which defaults to the number of data dirs, a proxy for the number of disks. This should help throttle tablet deletions, both preventing them from monopolizing service threads and limiting their IO. Change-Id: I3819bf8a3acf8ea03a76cc6cacd92d85bb114998 Reviewed-on: http://gerrit.cloudera.org:8080/9551 Reviewed-by: Adar Dembo <a...@cloudera.com> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/165688f8 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/165688f8 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/165688f8 Branch: refs/heads/master Commit: 165688f83bbb00a41090e909ae4e17c463fa75e4 Parents: 7e35aee Author: Will Berkeley <wdberke...@apache.org> Authored: Wed Mar 7 08:52:59 2018 -0800 Committer: Will Berkeley <wdberke...@gmail.com> Committed: Thu Apr 5 19:52:34 2018 +0000 ---------------------------------------------------------------------- src/kudu/tserver/tablet_service.cc | 22 ++--- src/kudu/tserver/ts_tablet_manager.cc | 140 +++++++++++++++++++++++------ src/kudu/tserver/ts_tablet_manager.h | 27 ++++-- src/kudu/util/threadpool.h | 2 +- 4 files changed, 147 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/165688f8/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 3854f5c..2f4d285 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -805,16 +805,18 @@ void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req, if (req->has_cas_config_opid_index_less_or_equal()) { cas_config_opid_index_less_or_equal = 
req->cas_config_opid_index_less_or_equal(); } - TabletServerErrorPB::Code error_code; - Status s = server_->tablet_manager()->DeleteTablet(req->tablet_id(), - delete_type, - cas_config_opid_index_less_or_equal, - &error_code); - if (PREDICT_FALSE(!s.ok())) { - HandleErrorResponse(req, resp, context, error_code, s); - return; - } - context->RespondSuccess(); + + auto response_callback = [context, req, resp](const Status& s, TabletServerErrorPB::Code code) { + if (PREDICT_FALSE(!s.ok())) { + HandleErrorResponse(req, resp, context, code, s); + return; + } + context->RespondSuccess(); + }; + server_->tablet_manager()->DeleteTabletAsync(req->tablet_id(), + delete_type, + cas_config_opid_index_less_or_equal, + response_callback); } void TabletServiceImpl::Write(const WriteRequestPB* req, http://git-wip-us.apache.org/repos/asf/kudu/blob/165688f8/src/kudu/tserver/ts_tablet_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc index f9341af..034d6ab 100644 --- a/src/kudu/tserver/ts_tablet_manager.cc +++ b/src/kudu/tserver/ts_tablet_manager.cc @@ -84,6 +84,13 @@ DEFINE_int32(num_tablets_to_open_simultaneously, 0, "may make sense to manually tune this."); TAG_FLAG(num_tablets_to_open_simultaneously, advanced); +DEFINE_int32(num_tablets_to_delete_simultaneously, 0, + "Number of threads available to delete tablets. If this is set to 0 (the " + "default), then the number of delete threads will be set based on the number " + "of data directories. 
If the data directories are on some very fast storage " + "device such as SSD or a RAID array, it may make sense to manually tune this."); +TAG_FLAG(num_tablets_to_delete_simultaneously, advanced); + DEFINE_int32(tablet_start_warn_threshold_ms, 500, "If a tablet takes more than this number of millis to start, issue " "a warning with a trace."); @@ -119,6 +126,10 @@ DEFINE_int32(tablet_state_walk_min_period_ms, 1000, "tablet map to update tablet state counts."); TAG_FLAG(tablet_state_walk_min_period_ms, advanced); +DEFINE_int32(delete_tablet_inject_latency_ms, 0, + "Amount of delay in milliseconds to inject into delete tablet operations."); +TAG_FLAG(delete_tablet_inject_latency_ms, unsafe); + DECLARE_bool(raft_prepare_replacement_before_eviction); METRIC_DEFINE_gauge_int32(server, tablets_num_not_initialized, @@ -247,6 +258,41 @@ TSTabletManager::TSTabletManager(TabletServer* server) ->AutoDetach(&metric_detacher_); } +// Base class for Runnables submitted against TSTabletManager threadpools whose +// whose callback must fire, for example if the callback responds to an RPC. +class TabletManagerRunnable : public Runnable { +public: + TabletManagerRunnable(TSTabletManager* ts_tablet_manager, + std::function<void(const Status&, TabletServerErrorPB::Code)> cb) + : ts_tablet_manager_(ts_tablet_manager), + cb_(std::move(cb)) { + } + + virtual ~TabletManagerRunnable() { + // If the Runnable is destroyed without the Run() method being invoked, we + // must invoke the user callback ourselves in order to free request + // resources. This may happen when the ThreadPool is shut down while the + // Runnable is enqueued. + if (!cb_invoked_) { + cb_(Status::ServiceUnavailable("Tablet server shutting down"), + TabletServerErrorPB::THROTTLED); + } + } + + // Disable automatic invocation of the callback by the destructor. + // Does not disable invocation of the callback by Run(). 
+ void DisableCallback() { + cb_invoked_ = true; + } + +protected: + TSTabletManager* const ts_tablet_manager_; + const std::function<void(const Status&, TabletServerErrorPB::Code)> cb_; + bool cb_invoked_ = false; + + DISALLOW_COPY_AND_ASSIGN(TabletManagerRunnable); +}; + TSTabletManager::~TSTabletManager() { } @@ -261,7 +307,7 @@ Status TSTabletManager::Init() { .set_max_threads(FLAGS_num_tablets_to_copy_simultaneously) .Build(&tablet_copy_pool_)); - // Start the threadpool we'll use to open tablets. + // Start the threadpools we'll use to open and delete tablets. // This has to be done in Init() instead of the constructor, since the // FsManager isn't initialized until this point. int max_open_threads = FLAGS_num_tablets_to_open_simultaneously; @@ -272,6 +318,14 @@ Status TSTabletManager::Init() { RETURN_NOT_OK(ThreadPoolBuilder("tablet-open") .set_max_threads(max_open_threads) .Build(&open_tablet_pool_)); + int max_delete_threads = FLAGS_num_tablets_to_delete_simultaneously; + if (max_delete_threads == 0) { + // Default to the number of disks. + max_delete_threads = fs_manager_->GetDataRootDirs().size(); + } + RETURN_NOT_OK(ThreadPoolBuilder("tablet-delete") + .set_max_threads(max_delete_threads) + .Build(&delete_tablet_pool_)); // Search for tablets in the metadata dir. vector<string> tablet_ids; @@ -417,42 +471,22 @@ Status TSTabletManager::CheckLeaderTermNotLower(const string& tablet_id, } // Tablet Copy runnable that will run on a ThreadPool. 
-class TabletCopyRunnable : public Runnable { +class TabletCopyRunnable : public TabletManagerRunnable { public: TabletCopyRunnable(TSTabletManager* ts_tablet_manager, const StartTabletCopyRequestPB* req, std::function<void(const Status&, TabletServerErrorPB::Code)> cb) - : ts_tablet_manager_(ts_tablet_manager), - req_(req), - cb_(std::move(cb)) { + : TabletManagerRunnable(ts_tablet_manager, cb), + req_(req) { } - virtual ~TabletCopyRunnable() { - // If the Runnable is destroyed without the Run() method being invoked, we - // must invoke the user callback ourselves in order to free request - // resources. This may happen when the ThreadPool is shut down while the - // Runnable is enqueued. - if (!cb_invoked_) { - cb_(Status::ServiceUnavailable("Tablet server shutting down"), - TabletServerErrorPB::THROTTLED); - } - } - - virtual void Run() override { + void Run() override { ts_tablet_manager_->RunTabletCopy(req_, cb_); cb_invoked_ = true; } - // Disable automatic invocation of the callback by the destructor. - void DisableCallback() { - cb_invoked_ = true; - } - private: - TSTabletManager* const ts_tablet_manager_; const StartTabletCopyRequestPB* const req_; - const std::function<void(const Status&, TabletServerErrorPB::Code)> cb_; - bool cb_invoked_ = false; DISALLOW_COPY_AND_ASSIGN(TabletCopyRunnable); }; @@ -774,6 +808,53 @@ Status TSTabletManager::BeginReplicaStateTransition( return Status::OK(); } +// Delete Tablet runnable that will run on a ThreadPool. 
+class DeleteTabletRunnable : public TabletManagerRunnable { +public: + DeleteTabletRunnable(TSTabletManager* ts_tablet_manager, + const std::string& tablet_id, + tablet::TabletDataState delete_type, + const boost::optional<int64_t>& cas_config_index, + std::function<void(const Status&, TabletServerErrorPB::Code)> cb) + : TabletManagerRunnable(ts_tablet_manager, cb), + tablet_id_(tablet_id), + delete_type_(delete_type), + cas_config_index_(cas_config_index) { + } + + void Run() override { + TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR; + Status s = ts_tablet_manager_->DeleteTablet(tablet_id_, delete_type_, cas_config_index_, &code); + cb_(s, code); + cb_invoked_ = true; + } + +private: + const string tablet_id_; + const tablet::TabletDataState delete_type_; + const boost::optional<int64_t> cas_config_index_; + + DISALLOW_COPY_AND_ASSIGN(DeleteTabletRunnable); +}; + +void TSTabletManager::DeleteTabletAsync( + const std::string& tablet_id, + tablet::TabletDataState delete_type, + const boost::optional<int64_t>& cas_config_index, + std::function<void(const Status&, TabletServerErrorPB::Code)> cb) { + auto runnable = std::make_shared<DeleteTabletRunnable>(this, tablet_id, delete_type, + cas_config_index, cb); + Status s = delete_tablet_pool_->Submit(runnable); + if (PREDICT_TRUE(s.ok())) { + return; + } + + // Threadpool submission failed, so we'll invoke the callback ourselves. + runnable->DisableCallback(); + cb(s, s.IsServiceUnavailable() ? 
TabletServerErrorPB::THROTTLED : + TabletServerErrorPB::UNKNOWN_ERROR); +} + Status TSTabletManager::DeleteTablet( const string& tablet_id, TabletDataState delete_type, @@ -789,6 +870,12 @@ Status TSTabletManager::DeleteTablet( TRACE("Deleting tablet $0", tablet_id); + if (PREDICT_FALSE(FLAGS_delete_tablet_inject_latency_ms > 0)) { + LOG(WARNING) << "Injecting " << FLAGS_delete_tablet_inject_latency_ms + << "ms of latency into DeleteTablet"; + SleepFor(MonoDelta::FromMilliseconds(FLAGS_delete_tablet_inject_latency_ms)); + } + scoped_refptr<TabletReplica> replica; scoped_refptr<TransitionInProgressDeleter> deleter; RETURN_NOT_OK(BeginReplicaStateTransition(tablet_id, "deleting tablet", &replica, @@ -1044,6 +1131,9 @@ void TSTabletManager::Shutdown() { // Shut down the bootstrap pool, so no new tablets are registered after this point. open_tablet_pool_->Shutdown(); + // Shut down the delete pool, so no new tablets are deleted after this point. + delete_tablet_pool_->Shutdown(); + // Take a snapshot of the replicas list -- that way we don't have to hold // on to the lock while shutting them down, which might cause a lock // inversion. (see KUDU-308 for example). http://git-wip-us.apache.org/repos/asf/kudu/blob/165688f8/src/kudu/tserver/ts_tablet_manager.h ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h index a02a13c..742b5a4 100644 --- a/src/kudu/tserver/ts_tablet_manager.h +++ b/src/kudu/tserver/ts_tablet_manager.h @@ -122,14 +122,22 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf { consensus::RaftConfigPB config, scoped_refptr<tablet::TabletReplica>* replica); - // Delete the specified tablet. - // 'delete_type' must be one of TABLET_DATA_DELETED or TABLET_DATA_TOMBSTONED - // or else returns Status::IllegalArgument. 
- // 'cas_config_index' is optionally specified to enable an - // atomic DeleteTablet operation that only occurs if the latest committed - // Raft config change op has an opid_index equal to or less than the specified - // value. If not, 'error_code' is set to CAS_FAILED and a non-OK Status is - // returned. + // Delete the specified tablet asynchronously with callback 'cb'. + // - If the async task cannot be started, 'cb' will be called with + // Status::ServiceUnavailable and TabletServerErrorPB::THROTTLED. + // - 'delete_type' must be one of TABLET_DATA_DELETED or TABLET_DATA_TOMBSTONED. + // - 'cas_config_index' is optionally specified to enable an + // atomic DeleteTablet operation that only occurs if the latest committed + // Raft config change op has an opid_index equal to or less than the specified + // value. If not, the callback is called with a non-OK Status and error code + // CAS_FAILED. + void DeleteTabletAsync(const std::string& tablet_id, + tablet::TabletDataState delete_type, + const boost::optional<int64_t>& cas_config_index, + std::function<void(const Status&, TabletServerErrorPB::Code)> cb); + + // Delete the specified tablet synchronously. + // See DeleteTabletAsync() for more information. Status DeleteTablet(const std::string& tablet_id, tablet::TabletDataState delete_type, const boost::optional<int64_t>& cas_config_index, @@ -361,6 +369,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf { // Thread pool used to open the tablets async, whether bootstrap is required or not. gscoped_ptr<ThreadPool> open_tablet_pool_; + // Thread pool used to delete tablets asynchronously. 
+ gscoped_ptr<ThreadPool> delete_tablet_pool_; + FunctionGaugeDetacher metric_detacher_; DISALLOW_COPY_AND_ASSIGN(TSTabletManager); http://git-wip-us.apache.org/repos/asf/kudu/blob/165688f8/src/kudu/util/threadpool.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h index f1a7a37..1557486 100644 --- a/src/kudu/util/threadpool.h +++ b/src/kudu/util/threadpool.h @@ -88,7 +88,7 @@ struct ThreadPoolMetrics { // The TraceMetrics implementation relies on the number of distinct counter // names being small. Thus, if the thread pool name itself is dynamically // generated, the default behavior described above would result in an -// unbounded number of distinct cunter names. The 'trace_metric_prefix' +// unbounded number of distinct counter names. The 'trace_metric_prefix' // setting can be used to override the prefix used in generating the trace // metric names. //