rpc: micro-optimize delayed task handling

Previously we kept all scheduled DelayedTasks in an STL std::set<>. However,
we don't really need a set -- using an intrusive doubly linked list is
sufficient and provides O(1) removal instead of O(log n). This speeds up
the new unit test significantly.

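I ran the new test three times before and three times after the change
using the following command line (appending to /tmp/before for the
baseline runs and to /tmp/after for the patched runs):
  periodic-test --gtest_filter=\*Perf\* --gtest_repeat=10 2>&1 | grep User | tee -a /tmp/after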

and then ran a Welch two-sample t-test comparing the user CPU times:

data:  d.before and d.after
t = 19.097, df = 48.327, p-value < 2.2e-16
alternative hypothesis: true difference in means is not equal to 0
95 percent confidence interval:
 0.09643424 0.11912596
sample estimates:
mean of x mean of y
0.4324359 0.3246558

So this saves a noticeable amount of CPU when there are a lot of scheduled
tasks: mean user CPU per one-second run drops from ~0.432s to ~0.325s, a
reduction of roughly 25%.

Change-Id: I3b6be5ef7e8f464f3bc4c62f904e2692b30ddc65
Reviewed-on: http://gerrit.cloudera.org:8080/9048
Tested-by: Kudu Jenkins
Reviewed-by: Michael Ho <k...@cloudera.com>
Reviewed-by: Todd Lipcon <t...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bf6c2c07
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bf6c2c07
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bf6c2c07

Branch: refs/heads/master
Commit: bf6c2c07b1edfc8d38d64ffb242b3f303d54144b
Parents: f27f5ad
Author: Todd Lipcon <t...@apache.org>
Authored: Wed Jan 17 15:59:44 2018 -0800
Committer: Todd Lipcon <t...@apache.org>
Committed: Mon Feb 5 04:21:34 2018 +0000

----------------------------------------------------------------------
 src/kudu/rpc/periodic-test.cc | 36 ++++++++++++++++++++++++++++++++++++
 src/kudu/rpc/reactor.cc       | 19 +++++++++++--------
 src/kudu/rpc/reactor.h        |  3 +--
 3 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bf6c2c07/src/kudu/rpc/periodic-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/periodic-test.cc b/src/kudu/rpc/periodic-test.cc
index 08f379f..cd793bd 100644
--- a/src/kudu/rpc/periodic-test.cc
+++ b/src/kudu/rpc/periodic-test.cc
@@ -18,19 +18,25 @@
 #include <atomic>
 #include <cstdint>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <utility>
+#include <vector>
 
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/periodic.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 using std::atomic;
 using std::shared_ptr;
+using std::vector;
 
 namespace kudu {
 namespace rpc {
@@ -255,5 +261,35 @@ TEST_F(PeriodicTimerTest, TestCallbackRestartsOneShotTimer) {
   messenger->Shutdown();
 }
 
+TEST_F(PeriodicTimerTest, TestPerformance) {
+  const int kNumTimers = 1000;
+  shared_ptr<Messenger> messenger;
+  ASSERT_OK(MessengerBuilder("test")
+            .set_num_reactors(1)
+            .Build(&messenger));
+  SCOPED_CLEANUP({ messenger->Shutdown(); });
+
+  vector<shared_ptr<PeriodicTimer>> timers;
+  for (int i = 0; i < kNumTimers; i++) {
+    timers.emplace_back(PeriodicTimer::Create(
+        messenger,
+        [&] {}, // No-op.
+        MonoDelta::FromMilliseconds(10)));
+    timers.back()->Start();
+  }
+
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  sw.start();
+  SleepFor(MonoDelta::FromSeconds(1));
+  sw.stop();
+  LOG(INFO) << "User CPU for running " << kNumTimers << " timers for 1 second: 
"
+            << sw.elapsed().user_cpu_seconds() << "s";
+
+  for (auto& t : timers) {
+    t->Stop();
+  }
+
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/bf6c2c07/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index f7ff78b..e841be6 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -148,7 +148,7 @@ Status ReactorThread::Init() {
   DVLOG(6) << "Called ReactorThread::Init()";
   // Register to get async notifications in our epoll loop.
   async_.set(loop_);
-  async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this);
+  async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this); // NOLINT(*)
   async_.start();
 
   // Register the timer watcher.
@@ -243,10 +243,11 @@ void ReactorThread::ShutdownInternal() {
   // These won't be found in the ReactorThread's list of pending tasks
   // because they've been "run" (that is, they've been scheduled).
   Status aborted = ShutdownError(true); // aborted
-  for (DelayedTask* task : scheduled_tasks_) {
-    task->Abort(aborted); // should also free the task.
+  while (!scheduled_tasks_.empty()) {
+    DelayedTask* t = &scheduled_tasks_.front();
+    scheduled_tasks_.pop_front();
+    t->Abort(aborted); // should also free the task.
   }
-  scheduled_tasks_.clear();
 
   // Remove the OpenSSL thread state.
   ERR_remove_thread_state(nullptr);
@@ -675,14 +676,15 @@ DelayedTask::DelayedTask(boost::function<void(const Status&)> func,
 void DelayedTask::Run(ReactorThread* thread) {
   DCHECK(thread_ == nullptr) << "Task has already been scheduled";
   DCHECK(thread->IsCurrentThread());
+  DCHECK(!is_linked()) << "Should not be linked on pending_tasks_ anymore";
 
   // Schedule the task to run later.
   thread_ = thread;
   timer_.set(thread->loop_);
-  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this);
+  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this); // NOLINT(*)
   timer_.start(when_.ToSeconds(), // after
                0);                // repeat
-  thread_->scheduled_tasks_.insert(this);
+  thread_->scheduled_tasks_.push_back(*this);
 }
 
 void DelayedTask::Abort(const Status& abort_status) {
@@ -690,9 +692,10 @@ void DelayedTask::Abort(const Status& abort_status) {
   delete this;
 }
 
-void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
+void DelayedTask::TimerHandler(ev::timer& /*watcher*/, int revents) {
+  DCHECK(is_linked()) << "should be linked on scheduled_tasks_";
   // We will free this task's memory.
-  thread_->scheduled_tasks_.erase(this);
+  
thread_->scheduled_tasks_.erase(thread_->scheduled_tasks_.iterator_to(*this));
 
   if (EV_ERROR & revents) {
     string msg = "Delayed task got an error in its timer handler";

http://git-wip-us.apache.org/repos/asf/kudu/blob/bf6c2c07/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index cef966b..8884f54 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -20,7 +20,6 @@
 #include <cstdint>
 #include <list>
 #include <memory>
-#include <set>
 #include <string>
 #include <unordered_map>
 
@@ -287,7 +286,7 @@ class ReactorThread {
   //
   // Each task owns its own memory and must be freed by its TaskRun and
   // Abort members, provided it was allocated on the heap.
-  std::set<DelayedTask*> scheduled_tasks_;
+  boost::intrusive::list<DelayedTask> scheduled_tasks_;
 
   // The current monotonic time.  Updated every coarse_timer_granularity_secs_.
   MonoTime cur_time_;

Reply via email to