rpc: micro-optimize delayed task handling

Previously, we kept all scheduled DelayedTasks in an STL set<>. However, we don't really need a set: an intrusive doubly linked list is sufficient and provides O(1) insertion and removal instead of O(log n). This speeds up the new unit test (PeriodicTimerTest.TestPerformance) significantly.
I ran the new test three times before and three times after this change, using the following command line:

  periodic-test --gtest_filter=\*Perf\* --gtest_repeat=10 2>&1 | grep User | tee -a /tmp/after

and then ran a t-test on the difference in user CPU time:

  data:  d.before and d.after
  t = 19.097, df = 48.327, p-value < 2.2e-16
  alternative hypothesis: true difference in means is not equal to 0
  95 percent confidence interval:
   0.09643424 0.11912596
  sample estimates:
  mean of x mean of y
  0.4324359 0.3246558

So this saves a noticeable amount of CPU (about 0.108s of user CPU per run, roughly 25%) when many delayed tasks are scheduled.

Change-Id: I3b6be5ef7e8f464f3bc4c62f904e2692b30ddc65 Reviewed-on: http://gerrit.cloudera.org:8080/9048 Tested-by: Kudu Jenkins Reviewed-by: Michael Ho <k...@cloudera.com> Reviewed-by: Todd Lipcon <t...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bf6c2c07 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bf6c2c07 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bf6c2c07 Branch: refs/heads/master Commit: bf6c2c07b1edfc8d38d64ffb242b3f303d54144b Parents: f27f5ad Author: Todd Lipcon <t...@apache.org> Authored: Wed Jan 17 15:59:44 2018 -0800 Committer: Todd Lipcon <t...@apache.org> Committed: Mon Feb 5 04:21:34 2018 +0000 ---------------------------------------------------------------------- src/kudu/rpc/periodic-test.cc | 36 ++++++++++++++++++++++++++++++++++++ src/kudu/rpc/reactor.cc | 19 +++++++++++-------- src/kudu/rpc/reactor.h | 3 +-- 3 files changed, 48 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/bf6c2c07/src/kudu/rpc/periodic-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/periodic-test.cc b/src/kudu/rpc/periodic-test.cc index 08f379f..cd793bd 100644 --- a/src/kudu/rpc/periodic-test.cc +++ b/src/kudu/rpc/periodic-test.cc @@ -18,19 +18,25 @@ #include 
<atomic> #include <cstdint> #include <memory> +#include <ostream> #include <string> #include <utility> +#include <vector> +#include <glog/logging.h> #include <gtest/gtest.h> #include "kudu/rpc/messenger.h" #include "kudu/rpc/periodic.h" #include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" using std::atomic; using std::shared_ptr; +using std::vector; namespace kudu { namespace rpc { @@ -255,5 +261,35 @@ TEST_F(PeriodicTimerTest, TestCallbackRestartsOneShotTimer) { messenger->Shutdown(); } +TEST_F(PeriodicTimerTest, TestPerformance) { + const int kNumTimers = 1000; + shared_ptr<Messenger> messenger; + ASSERT_OK(MessengerBuilder("test") + .set_num_reactors(1) + .Build(&messenger)); + SCOPED_CLEANUP({ messenger->Shutdown(); }); + + vector<shared_ptr<PeriodicTimer>> timers; + for (int i = 0; i < kNumTimers; i++) { + timers.emplace_back(PeriodicTimer::Create( + messenger, + [&] {}, // No-op. + MonoDelta::FromMilliseconds(10))); + timers.back()->Start(); + } + + Stopwatch sw(Stopwatch::ALL_THREADS); + sw.start(); + SleepFor(MonoDelta::FromSeconds(1)); + sw.stop(); + LOG(INFO) << "User CPU for running " << kNumTimers << " timers for 1 second: " + << sw.elapsed().user_cpu_seconds() << "s"; + + for (auto& t : timers) { + t->Stop(); + } + +} + } // namespace rpc } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/bf6c2c07/src/kudu/rpc/reactor.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc index f7ff78b..e841be6 100644 --- a/src/kudu/rpc/reactor.cc +++ b/src/kudu/rpc/reactor.cc @@ -148,7 +148,7 @@ Status ReactorThread::Init() { DVLOG(6) << "Called ReactorThread::Init()"; // Register to get async notifications in our epoll loop. 
async_.set(loop_); - async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this); + async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this); // NOLINT(*) async_.start(); // Register the timer watcher. @@ -243,10 +243,11 @@ void ReactorThread::ShutdownInternal() { // These won't be found in the ReactorThread's list of pending tasks // because they've been "run" (that is, they've been scheduled). Status aborted = ShutdownError(true); // aborted - for (DelayedTask* task : scheduled_tasks_) { - task->Abort(aborted); // should also free the task. + while (!scheduled_tasks_.empty()) { + DelayedTask* t = &scheduled_tasks_.front(); + scheduled_tasks_.pop_front(); + t->Abort(aborted); // should also free the task. } - scheduled_tasks_.clear(); // Remove the OpenSSL thread state. ERR_remove_thread_state(nullptr); @@ -675,14 +676,15 @@ DelayedTask::DelayedTask(boost::function<void(const Status&)> func, void DelayedTask::Run(ReactorThread* thread) { DCHECK(thread_ == nullptr) << "Task has already been scheduled"; DCHECK(thread->IsCurrentThread()); + DCHECK(!is_linked()) << "Should not be linked on pending_tasks_ anymore"; // Schedule the task to run later. thread_ = thread; timer_.set(thread->loop_); - timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this); + timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this); // NOLINT(*) timer_.start(when_.ToSeconds(), // after 0); // repeat - thread_->scheduled_tasks_.insert(this); + thread_->scheduled_tasks_.push_back(*this); } void DelayedTask::Abort(const Status& abort_status) { @@ -690,9 +692,10 @@ void DelayedTask::Abort(const Status& abort_status) { delete this; } -void DelayedTask::TimerHandler(ev::timer& watcher, int revents) { +void DelayedTask::TimerHandler(ev::timer& /*watcher*/, int revents) { + DCHECK(is_linked()) << "should be linked on scheduled_tasks_"; // We will free this task's memory. 
- thread_->scheduled_tasks_.erase(this); + thread_->scheduled_tasks_.erase(thread_->scheduled_tasks_.iterator_to(*this)); if (EV_ERROR & revents) { string msg = "Delayed task got an error in its timer handler"; http://git-wip-us.apache.org/repos/asf/kudu/blob/bf6c2c07/src/kudu/rpc/reactor.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h index cef966b..8884f54 100644 --- a/src/kudu/rpc/reactor.h +++ b/src/kudu/rpc/reactor.h @@ -20,7 +20,6 @@ #include <cstdint> #include <list> #include <memory> -#include <set> #include <string> #include <unordered_map> @@ -287,7 +286,7 @@ class ReactorThread { // // Each task owns its own memory and must be freed by its TaskRun and // Abort members, provided it was allocated on the heap. - std::set<DelayedTask*> scheduled_tasks_; + boost::intrusive::list<DelayedTask> scheduled_tasks_; // The current monotonic time. Updated every coarse_timer_granularity_secs_. MonoTime cur_time_;