condition_variable: rework timed waiting mechanisms and associated cleanup

This patch replaces ConditionVariable::TimedWait with two new methods:
1. WaitFor(delta), equivalent to the old TimedWait.
2. WaitUntil(deadline), which waits until an absolute deadline is reached.

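Besides the improved ergonomics of using WaitUntil() when all one has on
hand is a deadline, WaitUntil() also yields more precise timeouts when
waiting in a loop that must tolerate spurious wakeups: the deadline stays
fixed, whereas re-issuing TimedWait(delta) after each wakeup restarted the
full timeout. The WaitFor methods in CountDownLatch and ThreadPool stand to
benefit from this, and are now implemented on top of WaitUntil().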

I also cleaned up some dead portability code in condition_variable.cc (the
NaCl and Android code paths and an OS_MACOSX-guarded destructor workaround).

Change-Id: Ie2d3d0d9d84c3b1a76f3efc8ae706ddcaa455630
Reviewed-on: http://gerrit.cloudera.org:8080/9572
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <t...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/038c3ebe
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/038c3ebe
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/038c3ebe

Branch: refs/heads/master
Commit: 038c3ebe77f41e3d2ebfc1d08ea8cf849edb2a5d
Parents: cda07be
Author: Adar Dembo <a...@cloudera.com>
Authored: Fri Mar 9 17:22:13 2018 -0800
Committer: Todd Lipcon <t...@apache.org>
Committed: Mon Mar 12 23:47:08 2018 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc   |  2 +-
 src/kudu/server/diagnostics_log.cc   |  2 +-
 src/kudu/tserver/heartbeater.cc      | 12 ++---
 src/kudu/tserver/scanners.cc         |  2 +-
 src/kudu/util/async_logger.cc        |  2 +-
 src/kudu/util/blocking_queue.h       |  3 +-
 src/kudu/util/condition_variable.cc  | 89 +++++++++++++------------------
 src/kudu/util/condition_variable.h   | 10 +++-
 src/kudu/util/countdown_latch.h      | 15 +++---
 src/kudu/util/maintenance_manager.cc |  2 +-
 src/kudu/util/monotime.cc            |  5 ++
 src/kudu/util/monotime.h             |  9 +++-
 src/kudu/util/pstack_watcher.cc      |  2 +-
 src/kudu/util/threadpool.cc          | 24 ++++-----
 14 files changed, 88 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 1a76b12..35ec262 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -457,7 +457,7 @@ class CatalogManagerBgTasks {
     MutexLock lock(lock_);
     if (closing_) return;
     if (!pending_updates_) {
-      cond_.TimedWait(MonoDelta::FromMilliseconds(msec));
+      cond_.WaitFor(MonoDelta::FromMilliseconds(msec));
     }
     pending_updates_ = false;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/server/diagnostics_log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/diagnostics_log.cc b/src/kudu/server/diagnostics_log.cc
index f4dce27..a185bd2 100644
--- a/src/kudu/server/diagnostics_log.cc
+++ b/src/kudu/server/diagnostics_log.cc
@@ -193,7 +193,7 @@ void DiagnosticsLog::RunThread() {
 
   while (!stop_) {
     MonoTime next_log = wakeups.top().first;
-    wake_.TimedWait(next_log - MonoTime::Now());
+    wake_.WaitUntil(next_log);
 
     string reason;
     WakeupType what;

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index e2657fb..26d0a58 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -564,14 +564,10 @@ void Heartbeater::Thread::RunThread() {
     // or for the signal to shut down.
     {
       MutexLock l(mutex_);
-      while (true) {
-        MonoDelta remaining = next_heartbeat - MonoTime::Now();
-        if (remaining.ToMilliseconds() <= 0 ||
-            heartbeat_asap_ ||
-            !should_run_) {
-          break;
-        }
-        cond_.TimedWait(remaining);
+      while (next_heartbeat > MonoTime::Now() &&
+          !heartbeat_asap_ &&
+          should_run_) {
+        cond_.WaitUntil(next_heartbeat);
       }
 
       heartbeat_asap_ = false;

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/tserver/scanners.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index eaf4836..96dccf2 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -119,7 +119,7 @@ void ScannerManager::RunRemovalThread() {
       if (shutdown_) {
         return;
       }
-      
shutdown_cv_.TimedWait(MonoDelta::FromMicroseconds(FLAGS_scanner_gc_check_interval_us));
+      
shutdown_cv_.WaitFor(MonoDelta::FromMicroseconds(FLAGS_scanner_gc_check_interval_us));
     }
     RemoveExpiredScanners();
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/async_logger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_logger.cc b/src/kudu/util/async_logger.cc
index 9eb0bc6..3214a42 100644
--- a/src/kudu/util/async_logger.cc
+++ b/src/kudu/util/async_logger.cc
@@ -114,7 +114,7 @@ void AsyncLogger::RunThread() {
   MutexLock l(lock_);
   while (state_ == RUNNING || active_buf_->needs_flush_or_write()) {
     while (!active_buf_->needs_flush_or_write() && state_ == RUNNING) {
-      if 
(!wake_flusher_cond_.TimedWait(MonoDelta::FromSeconds(FLAGS_logbufsecs))) {
+      if 
(!wake_flusher_cond_.WaitFor(MonoDelta::FromSeconds(FLAGS_logbufsecs))) {
         // In case of wait timeout, force it to flush regardless whether there is anything enqueued.
         active_buf_->flush = true;
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/blocking_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index 2907030..7331c12 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -26,6 +26,7 @@
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
 
@@ -131,7 +132,7 @@ class BlockingQueue {
       }
       if (!deadline.Initialized()) {
         not_empty_.Wait();
-      } else if (PREDICT_FALSE(!not_empty_.TimedWait(deadline - 
MonoTime::Now()))) {
+      } else if (PREDICT_FALSE(!not_empty_.WaitUntil(deadline))) {
         return Status::TimedOut("");
       }
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/condition_variable.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/condition_variable.cc b/src/kudu/util/condition_variable.cc
index c79a0a4..690188a 100644
--- a/src/kudu/util/condition_variable.cc
+++ b/src/kudu/util/condition_variable.cc
@@ -26,39 +26,23 @@ ConditionVariable::ConditionVariable(Mutex* user_lock)
 #endif
 {
   int rv = 0;
-  // http://crbug.com/293736
-  // NaCl doesn't support monotonic clock based absolute deadlines.
-  // On older Android platform versions, it's supported through the
-  // non-standard pthread_cond_timedwait_monotonic_np. Newer platform
-  // versions have pthread_condattr_setclock.
-  // Mac can use relative time deadlines.
-#if !defined(__APPLE__) && !defined(OS_NACL) && \
-      !(defined(OS_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC))
+#if defined(__APPLE__)
+  rv = pthread_cond_init(&condition_, nullptr);
+#else
+  // On Linux we can't use relative times like on macOS; configuring the
+  // condition variable to use the monotonic clock lets WaitFor and WaitUntil
+  // wait on absolute deadlines derived from our MonoTime implementation.
   pthread_condattr_t attrs;
   rv = pthread_condattr_init(&attrs);
   DCHECK_EQ(0, rv);
   pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
   rv = pthread_cond_init(&condition_, &attrs);
   pthread_condattr_destroy(&attrs);
-#else
-  rv = pthread_cond_init(&condition_, nullptr);
 #endif
   DCHECK_EQ(0, rv);
 }
 
 ConditionVariable::~ConditionVariable() {
-#if defined(OS_MACOSX)
-  // This hack is necessary to avoid a fatal pthreads subsystem bug in the
-  // Darwin kernel. https://codereview.chromium.org/1323293005/
-  {
-    Mutex lock;
-    MutexLock l(lock);
-    struct timespec ts;
-    ts.tv_sec = 0;
-    ts.tv_nsec = 1;
-    pthread_cond_timedwait_relative_np(&condition_, lock.native_handle, &ts);
-  }
-#endif
   int rv = pthread_cond_destroy(&condition_);
   DCHECK_EQ(0, rv);
 }
@@ -75,54 +59,55 @@ void ConditionVariable::Wait() const {
 #endif
 }
 
-bool ConditionVariable::TimedWait(const MonoDelta& max_time) const {
+bool ConditionVariable::WaitUntil(const MonoTime& until) const {
+  ThreadRestrictions::AssertWaitAllowed();
+
+  // Have we already timed out?
+  if (MonoTime::Now() > until) {
+    return false;
+  }
+
+#if !defined(NDEBUG)
+  user_lock_->CheckHeldAndUnmark();
+#endif
+
+  struct timespec absolute_time;
+  until.ToTimeSpec(&absolute_time);
+  int rv = pthread_cond_timedwait(&condition_, user_mutex_, &absolute_time);
+  DCHECK(rv == 0 || rv == ETIMEDOUT)
+    << "unexpected pthread_cond_timedwait return value: " << rv;
+
+#if !defined(NDEBUG)
+  user_lock_->CheckUnheldAndMark();
+#endif
+  return rv == 0;
+}
+
+bool ConditionVariable::WaitFor(const MonoDelta& delta) const {
   ThreadRestrictions::AssertWaitAllowed();
 
   // Negative delta means we've already timed out.
-  int64_t nsecs = max_time.ToNanoseconds();
+  int64_t nsecs = delta.ToNanoseconds();
   if (nsecs < 0) {
     return false;
   }
 
-  struct timespec relative_time;
-  max_time.ToTimeSpec(&relative_time);
-
 #if !defined(NDEBUG)
   user_lock_->CheckHeldAndUnmark();
 #endif
 
 #if defined(__APPLE__)
+  struct timespec relative_time;
+  delta.ToTimeSpec(&relative_time);
   int rv = pthread_cond_timedwait_relative_np(
       &condition_, user_mutex_, &relative_time);
 #else
   // The timeout argument to pthread_cond_timedwait is in absolute time.
   struct timespec absolute_time;
-#if defined(OS_NACL)
-  // See comment in constructor for why this is different in NaCl.
-  struct timeval now;
-  gettimeofday(&now, NULL);
-  absolute_time.tv_sec = now.tv_sec;
-  absolute_time.tv_nsec = now.tv_usec * MonoTime::kNanosecondsPerMicrosecond;
-#else
-  struct timespec now;
-  clock_gettime(CLOCK_MONOTONIC, &now);
-  absolute_time.tv_sec = now.tv_sec;
-  absolute_time.tv_nsec = now.tv_nsec;
-#endif
-
-  absolute_time.tv_sec += relative_time.tv_sec;
-  absolute_time.tv_nsec += relative_time.tv_nsec;
-  absolute_time.tv_sec += absolute_time.tv_nsec / 
MonoTime::kNanosecondsPerSecond;
-  absolute_time.tv_nsec %= MonoTime::kNanosecondsPerSecond;
-  DCHECK_GE(absolute_time.tv_sec, now.tv_sec);  // Overflow paranoia
-
-#if defined(OS_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC)
-  int rv = pthread_cond_timedwait_monotonic_np(
-      &condition_, user_mutex_, &absolute_time);
-#else
+  MonoTime deadline = MonoTime::Now() + delta;
+  deadline.ToTimeSpec(&absolute_time);
   int rv = pthread_cond_timedwait(&condition_, user_mutex_, &absolute_time);
-#endif  // OS_ANDROID && HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
-#endif  // __APPLE__
+#endif
 
   DCHECK(rv == 0 || rv == ETIMEDOUT)
     << "unexpected pthread_cond_timedwait return value: " << rv;

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/condition_variable.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/condition_variable.h b/src/kudu/util/condition_variable.h
index 1f251ca..1245646 100644
--- a/src/kudu/util/condition_variable.h
+++ b/src/kudu/util/condition_variable.h
@@ -72,6 +72,7 @@
 namespace kudu {
 
 class MonoDelta;
+class MonoTime;
 class Mutex;
 
 class ConditionVariable {
@@ -85,10 +86,15 @@ class ConditionVariable {
   // sleep, and the reacquires it when it is signaled.
   void Wait() const;
 
+  // Like Wait(), but only waits up to a certain point in time.
+  //
+  // Returns true if we were Signal()'ed, or false if we reached 'until'.
+  bool WaitUntil(const MonoTime& until) const;
+
   // Like Wait(), but only waits up to a limited amount of time.
   //
-  // Returns true if we were Signal()'ed, or false if 'max_time' elapsed.
-  bool TimedWait(const MonoDelta& max_time) const;
+  // Returns true if we were Signal()'ed, or false if 'delta' elapsed.
+  bool WaitFor(const MonoDelta& delta) const;
 
   // Broadcast() revives all waiting threads.
   void Broadcast();

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/countdown_latch.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/countdown_latch.h b/src/kudu/util/countdown_latch.h
index 7024c1c..9a8000d 100644
--- a/src/kudu/util/countdown_latch.h
+++ b/src/kudu/util/countdown_latch.h
@@ -79,22 +79,21 @@ class CountDownLatch {
   // Returns true if the count became zero, false otherwise.
   bool WaitUntil(const MonoTime& when) const {
     ThreadRestrictions::AssertWaitAllowed();
-    return WaitFor(when - MonoTime::Now());
-  }
-
-  // Waits for the count on the latch to reach zero, or until 'delta' time elapses.
-  // Returns true if the count became zero, false otherwise.
-  bool WaitFor(const MonoDelta& delta) const {
-    ThreadRestrictions::AssertWaitAllowed();
     MutexLock lock(lock_);
     while (count_ > 0) {
-      if (!cond_.TimedWait(delta)) {
+      if (!cond_.WaitUntil(when)) {
         return false;
       }
     }
     return true;
   }
 
+  // Waits for the count on the latch to reach zero, or until 'delta' time elapses.
+  // Returns true if the count became zero, false otherwise.
+  bool WaitFor(const MonoDelta& delta) const {
+    return WaitUntil(MonoTime::Now() + delta);
+  }
+
   // Reset the latch with the given count. This is equivalent to reconstructing
   // the latch. If 'count' is 0, and there are currently waiters, those waiters
   // will be triggered as if you counted down to 0.

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 61b2d0b..2694d6c 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -262,7 +262,7 @@ void MaintenanceManager::RunSchedulerThread() {
     // However, if it's time to shut down, we want to do so immediately.
     while ((running_ops_ >= num_threads_ || prev_iter_found_no_work || 
disabled_for_tests()) &&
            !shutdown_) {
-      cond_.TimedWait(polling_interval);
+      cond_.WaitFor(polling_interval);
       prev_iter_found_no_work = false;
     }
     if (shutdown_) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/monotime.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/monotime.cc b/src/kudu/util/monotime.cc
index abe3664..89c795d 100644
--- a/src/kudu/util/monotime.cc
+++ b/src/kudu/util/monotime.cc
@@ -223,6 +223,11 @@ std::string MonoTime::ToString() const {
   return StringPrintf("%.3fs", ToSeconds());
 }
 
+void MonoTime::ToTimeSpec(struct timespec* ts) const {
+  DCHECK(Initialized());
+  MonoDelta::NanosToTimeSpec(nanos_, ts);
+}
+
 bool MonoTime::Equals(const MonoTime& other) const {
   return nanos_ == other.nanos_;
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/monotime.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/monotime.h b/src/kudu/util/monotime.h
index 59d92f0..bb8ec35 100644
--- a/src/kudu/util/monotime.h
+++ b/src/kudu/util/monotime.h
@@ -35,8 +35,6 @@
 
 #include "kudu/util/kudu_export.h"
 
-struct timeval;   // IWYU pragma: keep
-
 namespace kudu {
 
 /// @brief A representation of a time interval.
@@ -213,6 +211,13 @@ class KUDU_EXPORT MonoTime {
   /// @return String representation of the object (in seconds).
   std::string ToString() const;
 
+  /// Represent this point in time as a timespec structure, with nanosecond
+  /// accuracy.
+  ///
+  /// @param [out] ts
+  ///   Placeholder for the result value.
+  void ToTimeSpec(struct timespec* ts) const;
+
   /// Check whether this object represents the same point in time as the other.
   ///
   /// @param [in] other

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/pstack_watcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pstack_watcher.cc b/src/kudu/util/pstack_watcher.cc
index 6cf63d3..5ab4b63 100644
--- a/src/kudu/util/pstack_watcher.cc
+++ b/src/kudu/util/pstack_watcher.cc
@@ -79,7 +79,7 @@ void PstackWatcher::Wait() const {
 void PstackWatcher::Run() {
   MutexLock guard(lock_);
   if (!running_) return;
-  cond_.TimedWait(timeout_);
+  cond_.WaitFor(timeout_);
   if (!running_) return;
 
   WARN_NOT_OK(DumpStacks(DUMP_FULL), "Unable to print pstack from watcher");

http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 8e27878..23dda3d 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -230,20 +230,20 @@ void ThreadPoolToken::Wait() {
 }
 
 bool ThreadPoolToken::WaitUntil(const MonoTime& until) {
-  return WaitFor(until - MonoTime::Now());
-}
-
-bool ThreadPoolToken::WaitFor(const MonoDelta& delta) {
   MutexLock unique_lock(pool_->lock_);
   pool_->CheckNotPoolThreadUnlocked();
   while (IsActive()) {
-    if (!not_running_cond_.TimedWait(delta)) {
+    if (!not_running_cond_.WaitUntil(until)) {
       return false;
     }
   }
   return true;
 }
 
+bool ThreadPoolToken::WaitFor(const MonoDelta& delta) {
+  return WaitUntil(MonoTime::Now() + delta);
+}
+
 void ThreadPoolToken::Transition(State new_state) {
 #ifndef NDEBUG
   CHECK_NE(state_, new_state);
@@ -581,20 +581,20 @@ void ThreadPool::Wait() {
 }
 
 bool ThreadPool::WaitUntil(const MonoTime& until) {
-  return WaitFor(until - MonoTime::Now());
-}
-
-bool ThreadPool::WaitFor(const MonoDelta& delta) {
   MutexLock unique_lock(lock_);
   CheckNotPoolThreadUnlocked();
   while (total_queued_tasks_ > 0 || active_threads_ > 0) {
-    if (!idle_cond_.TimedWait(delta)) {
+    if (!idle_cond_.WaitUntil(until)) {
       return false;
     }
   }
   return true;
 }
 
+bool ThreadPool::WaitFor(const MonoDelta& delta) {
+  return WaitUntil(MonoTime::Now() + delta);
+}
+
 void ThreadPool::DispatchThread() {
   MutexLock unique_lock(lock_);
   InsertOrDie(&threads_, Thread::current_thread());
@@ -623,7 +623,7 @@ void ThreadPool::DispatchThread() {
       SCOPED_CLEANUP({
         // For some wake ups (i.e. Shutdown or DoSubmit) this thread is
         // guaranteed to be unlinked after being awakened. In others (i.e.
-        // spurious wake-up or TimedWait timeout), it'll still be linked.
+        // spurious wake-up or WaitFor timeout), it'll still be linked.
         if (me.is_linked()) {
           idle_threads_.erase(idle_threads_.iterator_to(me));
         }
@@ -631,7 +631,7 @@ void ThreadPool::DispatchThread() {
       if (permanent) {
         me.not_empty.Wait();
       } else {
-        if (!me.not_empty.TimedWait(idle_timeout_)) {
+        if (!me.not_empty.WaitFor(idle_timeout_)) {
           // After much investigation, it appears that pthread condition variables have
           // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
           // another thread did in fact signal. Apparently after a timeout there is some

Reply via email to