condition_variable: rework timed waiting mechanisms and associated cleanup

This patch reworks ConditionVariable::TimedWait into two new methods:
1. WaitFor(delta), equivalent to the old TimedWait: waits until 'delta' has elapsed.
2. WaitUntil(deadline): waits until the absolute point in time 'deadline' is reached.
Besides the improved ergonomics of using WaitUntil() when all one has on hand is a deadline, WaitUntil() also yields more precise timeouts when dealing with spurious wakeups while waiting in a loop; the WaitFor methods in CountDownLatch and ThreadPool stand to benefit from this. I also cleaned up some dead portability code in condition_variable.cc. Change-Id: Ie2d3d0d9d84c3b1a76f3efc8ae706ddcaa455630 Reviewed-on: http://gerrit.cloudera.org:8080/9572 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <t...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/038c3ebe Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/038c3ebe Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/038c3ebe Branch: refs/heads/master Commit: 038c3ebe77f41e3d2ebfc1d08ea8cf849edb2a5d Parents: cda07be Author: Adar Dembo <a...@cloudera.com> Authored: Fri Mar 9 17:22:13 2018 -0800 Committer: Todd Lipcon <t...@apache.org> Committed: Mon Mar 12 23:47:08 2018 +0000 ---------------------------------------------------------------------- src/kudu/master/catalog_manager.cc | 2 +- src/kudu/server/diagnostics_log.cc | 2 +- src/kudu/tserver/heartbeater.cc | 12 ++--- src/kudu/tserver/scanners.cc | 2 +- src/kudu/util/async_logger.cc | 2 +- src/kudu/util/blocking_queue.h | 3 +- src/kudu/util/condition_variable.cc | 89 +++++++++++++------------------ src/kudu/util/condition_variable.h | 10 +++- src/kudu/util/countdown_latch.h | 15 +++--- src/kudu/util/maintenance_manager.cc | 2 +- src/kudu/util/monotime.cc | 5 ++ src/kudu/util/monotime.h | 9 +++- src/kudu/util/pstack_watcher.cc | 2 +- src/kudu/util/threadpool.cc | 24 ++++----- 14 files changed, 88 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/master/catalog_manager.cc ---------------------------------------------------------------------- diff --git 
a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 1a76b12..35ec262 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -457,7 +457,7 @@ class CatalogManagerBgTasks { MutexLock lock(lock_); if (closing_) return; if (!pending_updates_) { - cond_.TimedWait(MonoDelta::FromMilliseconds(msec)); + cond_.WaitFor(MonoDelta::FromMilliseconds(msec)); } pending_updates_ = false; } http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/server/diagnostics_log.cc ---------------------------------------------------------------------- diff --git a/src/kudu/server/diagnostics_log.cc b/src/kudu/server/diagnostics_log.cc index f4dce27..a185bd2 100644 --- a/src/kudu/server/diagnostics_log.cc +++ b/src/kudu/server/diagnostics_log.cc @@ -193,7 +193,7 @@ void DiagnosticsLog::RunThread() { while (!stop_) { MonoTime next_log = wakeups.top().first; - wake_.TimedWait(next_log - MonoTime::Now()); + wake_.WaitUntil(next_log); string reason; WakeupType what; http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/tserver/heartbeater.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc index e2657fb..26d0a58 100644 --- a/src/kudu/tserver/heartbeater.cc +++ b/src/kudu/tserver/heartbeater.cc @@ -564,14 +564,10 @@ void Heartbeater::Thread::RunThread() { // or for the signal to shut down. 
{ MutexLock l(mutex_); - while (true) { - MonoDelta remaining = next_heartbeat - MonoTime::Now(); - if (remaining.ToMilliseconds() <= 0 || - heartbeat_asap_ || - !should_run_) { - break; - } - cond_.TimedWait(remaining); + while (next_heartbeat > MonoTime::Now() && + !heartbeat_asap_ && + should_run_) { + cond_.WaitUntil(next_heartbeat); } heartbeat_asap_ = false; http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/tserver/scanners.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc index eaf4836..96dccf2 100644 --- a/src/kudu/tserver/scanners.cc +++ b/src/kudu/tserver/scanners.cc @@ -119,7 +119,7 @@ void ScannerManager::RunRemovalThread() { if (shutdown_) { return; } - shutdown_cv_.TimedWait(MonoDelta::FromMicroseconds(FLAGS_scanner_gc_check_interval_us)); + shutdown_cv_.WaitFor(MonoDelta::FromMicroseconds(FLAGS_scanner_gc_check_interval_us)); } RemoveExpiredScanners(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/async_logger.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/async_logger.cc b/src/kudu/util/async_logger.cc index 9eb0bc6..3214a42 100644 --- a/src/kudu/util/async_logger.cc +++ b/src/kudu/util/async_logger.cc @@ -114,7 +114,7 @@ void AsyncLogger::RunThread() { MutexLock l(lock_); while (state_ == RUNNING || active_buf_->needs_flush_or_write()) { while (!active_buf_->needs_flush_or_write() && state_ == RUNNING) { - if (!wake_flusher_cond_.TimedWait(MonoDelta::FromSeconds(FLAGS_logbufsecs))) { + if (!wake_flusher_cond_.WaitFor(MonoDelta::FromSeconds(FLAGS_logbufsecs))) { // In case of wait timeout, force it to flush regardless whether there is anything enqueued. 
active_buf_->flush = true; } http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/blocking_queue.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h index 2907030..7331c12 100644 --- a/src/kudu/util/blocking_queue.h +++ b/src/kudu/util/blocking_queue.h @@ -26,6 +26,7 @@ #include "kudu/gutil/basictypes.h" #include "kudu/gutil/gscoped_ptr.h" #include "kudu/util/condition_variable.h" +#include "kudu/util/monotime.h" #include "kudu/util/mutex.h" #include "kudu/util/status.h" @@ -131,7 +132,7 @@ class BlockingQueue { } if (!deadline.Initialized()) { not_empty_.Wait(); - } else if (PREDICT_FALSE(!not_empty_.TimedWait(deadline - MonoTime::Now()))) { + } else if (PREDICT_FALSE(!not_empty_.WaitUntil(deadline))) { return Status::TimedOut(""); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/condition_variable.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/condition_variable.cc b/src/kudu/util/condition_variable.cc index c79a0a4..690188a 100644 --- a/src/kudu/util/condition_variable.cc +++ b/src/kudu/util/condition_variable.cc @@ -26,39 +26,23 @@ ConditionVariable::ConditionVariable(Mutex* user_lock) #endif { int rv = 0; - // http://crbug.com/293736 - // NaCl doesn't support monotonic clock based absolute deadlines. - // On older Android platform versions, it's supported through the - // non-standard pthread_cond_timedwait_monotonic_np. Newer platform - // versions have pthread_condattr_setclock. - // Mac can use relative time deadlines. 
-#if !defined(__APPLE__) && !defined(OS_NACL) && \ - !(defined(OS_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC)) +#if defined(__APPLE__) + rv = pthread_cond_init(&condition_, nullptr); +#else + // On Linux we can't use relative times like on macOS; reconfiguring the + // condition variable to use the monotonic clock means we can use support + // WaitFor with our MonoTime implementation. pthread_condattr_t attrs; rv = pthread_condattr_init(&attrs); DCHECK_EQ(0, rv); pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); rv = pthread_cond_init(&condition_, &attrs); pthread_condattr_destroy(&attrs); -#else - rv = pthread_cond_init(&condition_, nullptr); #endif DCHECK_EQ(0, rv); } ConditionVariable::~ConditionVariable() { -#if defined(OS_MACOSX) - // This hack is necessary to avoid a fatal pthreads subsystem bug in the - // Darwin kernel. https://codereview.chromium.org/1323293005/ - { - Mutex lock; - MutexLock l(lock); - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 1; - pthread_cond_timedwait_relative_np(&condition_, lock.native_handle, &ts); - } -#endif int rv = pthread_cond_destroy(&condition_); DCHECK_EQ(0, rv); } @@ -75,54 +59,55 @@ void ConditionVariable::Wait() const { #endif } -bool ConditionVariable::TimedWait(const MonoDelta& max_time) const { +bool ConditionVariable::WaitUntil(const MonoTime& until) const { + ThreadRestrictions::AssertWaitAllowed(); + + // Have we already timed out? 
+ if (MonoTime::Now() > until) { + return false; + } + +#if !defined(NDEBUG) + user_lock_->CheckHeldAndUnmark(); +#endif + + struct timespec absolute_time; + until.ToTimeSpec(&absolute_time); + int rv = pthread_cond_timedwait(&condition_, user_mutex_, &absolute_time); + DCHECK(rv == 0 || rv == ETIMEDOUT) + << "unexpected pthread_cond_timedwait return value: " << rv; + +#if !defined(NDEBUG) + user_lock_->CheckUnheldAndMark(); +#endif + return rv == 0; +} + +bool ConditionVariable::WaitFor(const MonoDelta& delta) const { ThreadRestrictions::AssertWaitAllowed(); // Negative delta means we've already timed out. - int64_t nsecs = max_time.ToNanoseconds(); + int64_t nsecs = delta.ToNanoseconds(); if (nsecs < 0) { return false; } - struct timespec relative_time; - max_time.ToTimeSpec(&relative_time); - #if !defined(NDEBUG) user_lock_->CheckHeldAndUnmark(); #endif #if defined(__APPLE__) + struct timespec relative_time; + delta.ToTimeSpec(&relative_time); int rv = pthread_cond_timedwait_relative_np( &condition_, user_mutex_, &relative_time); #else // The timeout argument to pthread_cond_timedwait is in absolute time. struct timespec absolute_time; -#if defined(OS_NACL) - // See comment in constructor for why this is different in NaCl. 
- struct timeval now; - gettimeofday(&now, NULL); - absolute_time.tv_sec = now.tv_sec; - absolute_time.tv_nsec = now.tv_usec * MonoTime::kNanosecondsPerMicrosecond; -#else - struct timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); - absolute_time.tv_sec = now.tv_sec; - absolute_time.tv_nsec = now.tv_nsec; -#endif - - absolute_time.tv_sec += relative_time.tv_sec; - absolute_time.tv_nsec += relative_time.tv_nsec; - absolute_time.tv_sec += absolute_time.tv_nsec / MonoTime::kNanosecondsPerSecond; - absolute_time.tv_nsec %= MonoTime::kNanosecondsPerSecond; - DCHECK_GE(absolute_time.tv_sec, now.tv_sec); // Overflow paranoia - -#if defined(OS_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC) - int rv = pthread_cond_timedwait_monotonic_np( - &condition_, user_mutex_, &absolute_time); -#else + MonoTime deadline = MonoTime::Now() + delta; + deadline.ToTimeSpec(&absolute_time); int rv = pthread_cond_timedwait(&condition_, user_mutex_, &absolute_time); -#endif // OS_ANDROID && HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC -#endif // __APPLE__ +#endif DCHECK(rv == 0 || rv == ETIMEDOUT) << "unexpected pthread_cond_timedwait return value: " << rv; http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/condition_variable.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/condition_variable.h b/src/kudu/util/condition_variable.h index 1f251ca..1245646 100644 --- a/src/kudu/util/condition_variable.h +++ b/src/kudu/util/condition_variable.h @@ -72,6 +72,7 @@ namespace kudu { class MonoDelta; +class MonoTime; class Mutex; class ConditionVariable { @@ -85,10 +86,15 @@ class ConditionVariable { // sleep, and the reacquires it when it is signaled. void Wait() const; + // Like Wait(), but only waits up to a certain point in time. + // + // Returns true if we were Signal()'ed, or false if we reached 'until'. + bool WaitUntil(const MonoTime& until) const; + // Like Wait(), but only waits up to a limited amount of time. 
// - // Returns true if we were Signal()'ed, or false if 'max_time' elapsed. - bool TimedWait(const MonoDelta& max_time) const; + // Returns true if we were Signal()'ed, or false if 'delta' elapsed. + bool WaitFor(const MonoDelta& delta) const; // Broadcast() revives all waiting threads. void Broadcast(); http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/countdown_latch.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/countdown_latch.h b/src/kudu/util/countdown_latch.h index 7024c1c..9a8000d 100644 --- a/src/kudu/util/countdown_latch.h +++ b/src/kudu/util/countdown_latch.h @@ -79,22 +79,21 @@ class CountDownLatch { // Returns true if the count became zero, false otherwise. bool WaitUntil(const MonoTime& when) const { ThreadRestrictions::AssertWaitAllowed(); - return WaitFor(when - MonoTime::Now()); - } - - // Waits for the count on the latch to reach zero, or until 'delta' time elapses. - // Returns true if the count became zero, false otherwise. - bool WaitFor(const MonoDelta& delta) const { - ThreadRestrictions::AssertWaitAllowed(); MutexLock lock(lock_); while (count_ > 0) { - if (!cond_.TimedWait(delta)) { + if (!cond_.WaitUntil(when)) { return false; } } return true; } + // Waits for the count on the latch to reach zero, or until 'delta' time elapses. + // Returns true if the count became zero, false otherwise. + bool WaitFor(const MonoDelta& delta) const { + return WaitUntil(MonoTime::Now() + delta); + } + // Reset the latch with the given count. This is equivalent to reconstructing // the latch. If 'count' is 0, and there are currently waiters, those waiters // will be triggered as if you counted down to 0. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/maintenance_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc index 61b2d0b..2694d6c 100644 --- a/src/kudu/util/maintenance_manager.cc +++ b/src/kudu/util/maintenance_manager.cc @@ -262,7 +262,7 @@ void MaintenanceManager::RunSchedulerThread() { // However, if it's time to shut down, we want to do so immediately. while ((running_ops_ >= num_threads_ || prev_iter_found_no_work || disabled_for_tests()) && !shutdown_) { - cond_.TimedWait(polling_interval); + cond_.WaitFor(polling_interval); prev_iter_found_no_work = false; } if (shutdown_) { http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/monotime.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/monotime.cc b/src/kudu/util/monotime.cc index abe3664..89c795d 100644 --- a/src/kudu/util/monotime.cc +++ b/src/kudu/util/monotime.cc @@ -223,6 +223,11 @@ std::string MonoTime::ToString() const { return StringPrintf("%.3fs", ToSeconds()); } +void MonoTime::ToTimeSpec(struct timespec* ts) const { + DCHECK(Initialized()); + MonoDelta::NanosToTimeSpec(nanos_, ts); +} + bool MonoTime::Equals(const MonoTime& other) const { return nanos_ == other.nanos_; } http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/monotime.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/monotime.h b/src/kudu/util/monotime.h index 59d92f0..bb8ec35 100644 --- a/src/kudu/util/monotime.h +++ b/src/kudu/util/monotime.h @@ -35,8 +35,6 @@ #include "kudu/util/kudu_export.h" -struct timeval; // IWYU pragma: keep - namespace kudu { /// @brief A representation of a time interval. @@ -213,6 +211,13 @@ class KUDU_EXPORT MonoTime { /// @return String representation of the object (in seconds). 
std::string ToString() const; + /// Represent this point in time as a timespec structure, with nanosecond + /// accuracy. + /// + /// @param [out] ts + /// Placeholder for the result value. + void ToTimeSpec(struct timespec* ts) const; + /// Check whether this object represents the same point in time as the other. /// /// @param [in] other http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/pstack_watcher.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/pstack_watcher.cc b/src/kudu/util/pstack_watcher.cc index 6cf63d3..5ab4b63 100644 --- a/src/kudu/util/pstack_watcher.cc +++ b/src/kudu/util/pstack_watcher.cc @@ -79,7 +79,7 @@ void PstackWatcher::Wait() const { void PstackWatcher::Run() { MutexLock guard(lock_); if (!running_) return; - cond_.TimedWait(timeout_); + cond_.WaitFor(timeout_); if (!running_) return; WARN_NOT_OK(DumpStacks(DUMP_FULL), "Unable to print pstack from watcher"); http://git-wip-us.apache.org/repos/asf/kudu/blob/038c3ebe/src/kudu/util/threadpool.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc index 8e27878..23dda3d 100644 --- a/src/kudu/util/threadpool.cc +++ b/src/kudu/util/threadpool.cc @@ -230,20 +230,20 @@ void ThreadPoolToken::Wait() { } bool ThreadPoolToken::WaitUntil(const MonoTime& until) { - return WaitFor(until - MonoTime::Now()); -} - -bool ThreadPoolToken::WaitFor(const MonoDelta& delta) { MutexLock unique_lock(pool_->lock_); pool_->CheckNotPoolThreadUnlocked(); while (IsActive()) { - if (!not_running_cond_.TimedWait(delta)) { + if (!not_running_cond_.WaitUntil(until)) { return false; } } return true; } +bool ThreadPoolToken::WaitFor(const MonoDelta& delta) { + return WaitUntil(MonoTime::Now() + delta); +} + void ThreadPoolToken::Transition(State new_state) { #ifndef NDEBUG CHECK_NE(state_, new_state); @@ -581,20 +581,20 @@ void ThreadPool::Wait() { } bool 
ThreadPool::WaitUntil(const MonoTime& until) { - return WaitFor(until - MonoTime::Now()); -} - -bool ThreadPool::WaitFor(const MonoDelta& delta) { MutexLock unique_lock(lock_); CheckNotPoolThreadUnlocked(); while (total_queued_tasks_ > 0 || active_threads_ > 0) { - if (!idle_cond_.TimedWait(delta)) { + if (!idle_cond_.WaitUntil(until)) { return false; } } return true; } +bool ThreadPool::WaitFor(const MonoDelta& delta) { + return WaitUntil(MonoTime::Now() + delta); +} + void ThreadPool::DispatchThread() { MutexLock unique_lock(lock_); InsertOrDie(&threads_, Thread::current_thread()); @@ -623,7 +623,7 @@ void ThreadPool::DispatchThread() { SCOPED_CLEANUP({ // For some wake ups (i.e. Shutdown or DoSubmit) this thread is // guaranteed to be unlinked after being awakened. In others (i.e. - // spurious wake-up or TimedWait timeout), it'll still be linked. + // spurious wake-up or Wait timeout), it'll still be linked. if (me.is_linked()) { idle_threads_.erase(idle_threads_.iterator_to(me)); } @@ -631,7 +631,7 @@ void ThreadPool::DispatchThread() { if (permanent) { me.not_empty.Wait(); } else { - if (!me.not_empty.TimedWait(idle_timeout_)) { + if (!me.not_empty.WaitFor(idle_timeout_)) { // After much investigation, it appears that pthread condition variables have // a weird behavior in which they can return ETIMEDOUT from timed_wait even if // another thread did in fact signal. Apparently after a timeout there is some