Synchronizer::WaitFor thread-safety

WaitFor's implementation was previously not thread-safe when the waiter deallocated the Synchronizer after receiving a timeout.
Change-Id: I9565a50839ffd23b5bac6986a6fdee41ac21aa3a Reviewed-on: http://gerrit.cloudera.org:8080/10783 Reviewed-by: Adar Dembo <a...@cloudera.com> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f6e8fe6c Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f6e8fe6c Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f6e8fe6c Branch: refs/heads/master Commit: f6e8fe6c6bb6dd83e6485bc1b56364b11b250e94 Parents: f1e652b Author: Dan Burkert <danburk...@apache.org> Authored: Thu Jun 21 11:49:39 2018 -0700 Committer: Dan Burkert <danburk...@apache.org> Committed: Fri Jun 22 22:15:55 2018 +0000 ---------------------------------------------------------------------- src/kudu/util/CMakeLists.txt | 1 + src/kudu/util/async_util-test.cc | 129 ++++++++++++++++++++++++++++++++++ src/kudu/util/async_util.h | 71 +++++++++++-------- 3 files changed, 170 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/f6e8fe6c/src/kudu/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index 044660c..743e1af 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -331,6 +331,7 @@ target_link_libraries(protoc-gen-insertions gutil protobuf protoc ${KUDU_BASE_LI ####################################### set(KUDU_TEST_LINK_LIBS kudu_util gutil ${KUDU_MIN_TEST_LIBS}) +ADD_KUDU_TEST(async_util-test) ADD_KUDU_TEST(atomic-test) ADD_KUDU_TEST(bit-util-test) ADD_KUDU_TEST(bitmap-test) http://git-wip-us.apache.org/repos/asf/kudu/blob/f6e8fe6c/src/kudu/util/async_util-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc new file mode 100644 index 0000000..5cb7a63 --- /dev/null 
+++ b/src/kudu/util/async_util-test.cc @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/async_util.h" + +#include <unistd.h> + +#include <functional> +#include <thread> +#include <vector> + +#include <gtest/gtest.h> + +#include "kudu/gutil/basictypes.h" +#include "kudu/gutil/callback.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using std::thread; +using std::vector; + +namespace kudu { + +class AsyncUtilTest : public KuduTest { + public: + AsyncUtilTest() { + // Set up an alarm to fail the test in case of deadlock. + alarm(30); + } + ~AsyncUtilTest() { + // Disable the alarm on test exit. + alarm(0); + } +}; + +// Test completing the synchronizer through each of the APIs it exposes. 
+TEST_F(AsyncUtilTest, TestSynchronizerCompletion) { + Synchronizer sync; + + { + auto waiter = thread([sync] { + ignore_result(sync.Wait()); + }); + SleepFor(MonoDelta::FromMilliseconds(5)); + sync.StatusCB(Status::OK()); + waiter.join(); + } + sync.Reset(); + { + auto cb = sync.AsStatusCallback(); + auto waiter = thread([sync] { + ignore_result(sync.Wait()); + }); + SleepFor(MonoDelta::FromMilliseconds(5)); + cb.Run(Status::OK()); + waiter.join(); + } + sync.Reset(); + { + auto cb = sync.AsStdStatusCallback(); + auto waiter = thread([sync] { + ignore_result(sync.Wait()); + }); + SleepFor(MonoDelta::FromMilliseconds(5)); + cb(Status::OK()); + waiter.join(); + } +} + +TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) { + Synchronizer sync; + vector<thread> waiters; + for (int i = 0; i < 5; i++) { + waiters.emplace_back([sync] { + ignore_result(sync.Wait()); + }); + } + SleepFor(MonoDelta::FromMilliseconds(5)); + sync.StatusCB(Status::OK()); + + for (auto& waiter : waiters) { + waiter.join(); + } +} + +TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) { + thread waiter; + { + Synchronizer sync; + auto cb = sync.AsStatusCallback(); + waiter = thread([cb] { + SleepFor(MonoDelta::FromMilliseconds(5)); + cb.Run(Status::OK()); + }); + ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000))); + } + waiter.join(); + + { + Synchronizer sync; + auto cb = sync.AsStatusCallback(); + waiter = thread([cb] { + SleepFor(MonoDelta::FromMilliseconds(1000)); + cb.Run(Status::OK()); + }); + ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut()); + } + + // Waiting on the thread gives TSAN a chance to check that no thread-safety + // issues occurred. 
+ waiter.join(); +} +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/f6e8fe6c/src/kudu/util/async_util.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h index 727b7f7..338c6c2 100644 --- a/src/kudu/util/async_util.h +++ b/src/kudu/util/async_util.h @@ -16,10 +16,11 @@ // under the License. // // Utility functions which are handy when doing async/callback-based programming. -#ifndef KUDU_UTIL_ASYNC_UTIL_H -#define KUDU_UTIL_ASYNC_UTIL_H + +#pragma once #include <functional> +#include <memory> #include "kudu/gutil/bind.h" #include "kudu/gutil/macros.h" @@ -32,59 +33,67 @@ namespace kudu { // Simple class which can be used to make async methods synchronous. // For example: // Synchronizer s; -// SomeAsyncMethod(s.callback()); +// SomeAsyncMethod(s.AsStatusCallback()); // CHECK_OK(s.Wait()); +// +// The lifetime of the synchronizer is decoupled from the callback it produces. +// If the callback outlives the synchronizer then executing it will be a no-op. +// Callers must be careful not to allow the callback to be destructed without +// completing it, otherwise the thread waiting on the synchronizer will block +// indefinitely. class Synchronizer { public: Synchronizer() - : l_(1) { + : data_(std::make_shared<Data>()) { } void StatusCB(const Status& status) { - s_ = status; - l_.CountDown(); + Data::Callback(std::weak_ptr<Data>(data_), status); } StatusCallback AsStatusCallback() { - // Synchronizers are often declared on the stack, so it doesn't make - // sense for a callback to take a reference to its synchronizer. - // - // Note: this means the returned callback _must_ go out of scope before - // its synchronizer. 
- return Bind(&Synchronizer::StatusCB, Unretained(this)); + return Bind(Data::Callback, std::weak_ptr<Data>(data_)); } StdStatusCallback AsStdStatusCallback() { - // Synchronizers are often declared on the stack, so it doesn't make - // sense for a callback to take a reference to its synchronizer. - // - // Note: this means the returned callback _must_ go out of scope before - // its synchronizer. - return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1); + return std::bind(Data::Callback, std::weak_ptr<Data>(data_), std::placeholders::_1); } - Status Wait() { - l_.Wait(); - return s_; + Status Wait() const { + data_->latch.Wait(); + return data_->status; } - Status WaitFor(const MonoDelta& delta) { - if (PREDICT_FALSE(!l_.WaitFor(delta))) { - return Status::TimedOut("Timed out while waiting for the callback to be called."); + Status WaitFor(const MonoDelta& delta) const { + if (PREDICT_FALSE(!data_->latch.WaitFor(delta))) { + return Status::TimedOut("timed out while waiting for the callback to be called"); } - return s_; + return data_->status; } void Reset() { - l_.Reset(1); + data_->latch.Reset(1); } private: - Status s_; - CountDownLatch l_; - DISALLOW_COPY_AND_ASSIGN(Synchronizer); -}; + struct Data { + Data() : latch(1) { + } + static void Callback(std::weak_ptr<Data> weak, const Status& status) { + auto ptr = weak.lock(); + if (ptr) { + ptr->status = status; + ptr->latch.CountDown(); + } + } + + Status status; + CountDownLatch latch; + DISALLOW_COPY_AND_ASSIGN(Data); + }; + + std::shared_ptr<Data> data_; +}; } // namespace kudu -#endif /* KUDU_UTIL_ASYNC_UTIL_H */