Synchronizer::WaitFor thread-safety

Previously, WaitFor's implementation was not thread-safe when the waiter
deallocated the Synchronizer after receiving a timeout: a callback that ran
afterwards would access the already-destroyed Synchronizer.

Change-Id: I9565a50839ffd23b5bac6986a6fdee41ac21aa3a
Reviewed-on: http://gerrit.cloudera.org:8080/10783
Reviewed-by: Adar Dembo <a...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f6e8fe6c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f6e8fe6c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f6e8fe6c

Branch: refs/heads/master
Commit: f6e8fe6c6bb6dd83e6485bc1b56364b11b250e94
Parents: f1e652b
Author: Dan Burkert <danburk...@apache.org>
Authored: Thu Jun 21 11:49:39 2018 -0700
Committer: Dan Burkert <danburk...@apache.org>
Committed: Fri Jun 22 22:15:55 2018 +0000

----------------------------------------------------------------------
 src/kudu/util/CMakeLists.txt     |   1 +
 src/kudu/util/async_util-test.cc | 129 ++++++++++++++++++++++++++++++++++
 src/kudu/util/async_util.h       |  71 +++++++++++--------
 3 files changed, 170 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f6e8fe6c/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 044660c..743e1af 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -331,6 +331,7 @@ target_link_libraries(protoc-gen-insertions gutil protobuf protoc ${KUDU_BASE_LI
 #######################################
 
 set(KUDU_TEST_LINK_LIBS kudu_util gutil ${KUDU_MIN_TEST_LIBS})
+ADD_KUDU_TEST(async_util-test)
 ADD_KUDU_TEST(atomic-test)
 ADD_KUDU_TEST(bit-util-test)
 ADD_KUDU_TEST(bitmap-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/f6e8fe6c/src/kudu/util/async_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc
new file mode 100644
index 0000000..5cb7a63
--- /dev/null
+++ b/src/kudu/util/async_util-test.cc
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/async_util.h"
+
+#include <unistd.h>
+
+#include <functional>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::thread;
+using std::vector;
+
+namespace kudu {
+
+class AsyncUtilTest : public KuduTest {
+ public:
+  AsyncUtilTest() {
+    // Set up an alarm to fail the test in case of deadlock.
+    alarm(30);
+  }
+  ~AsyncUtilTest() {
+    // Disable the alarm on test exit.
+    alarm(0);
+  }
+};
+
+// Test completing the synchronizer through each of the APIs it exposes.
+TEST_F(AsyncUtilTest, TestSynchronizerCompletion) {
+  Synchronizer sync;
+
+  {
+    auto waiter = thread([sync] {
+        ignore_result(sync.Wait());
+    });
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    sync.StatusCB(Status::OK());
+    waiter.join();
+  }
+  sync.Reset();
+  {
+    auto cb = sync.AsStatusCallback();
+    auto waiter = thread([sync] {
+        ignore_result(sync.Wait());
+    });
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    cb.Run(Status::OK());
+    waiter.join();
+  }
+  sync.Reset();
+  {
+    auto cb = sync.AsStdStatusCallback();
+    auto waiter = thread([sync] {
+        ignore_result(sync.Wait());
+    });
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    cb(Status::OK());
+    waiter.join();
+  }
+}
+
+TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) {
+  Synchronizer sync;
+  vector<thread> waiters;
+  for (int i = 0; i < 5; i++) {
+    waiters.emplace_back([sync] {
+        ignore_result(sync.Wait());
+    });
+  }
+  SleepFor(MonoDelta::FromMilliseconds(5));
+  sync.StatusCB(Status::OK());
+
+  for (auto& waiter : waiters) {
+    waiter.join();
+  }
+}
+
+TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) {
+  thread waiter;
+  {
+    Synchronizer sync;
+    auto cb = sync.AsStatusCallback();
+    waiter = thread([cb] {
+        SleepFor(MonoDelta::FromMilliseconds(5));
+        cb.Run(Status::OK());
+    });
+    ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000)));
+  }
+  waiter.join();
+
+  {
+    Synchronizer sync;
+    auto cb = sync.AsStatusCallback();
+    waiter = thread([cb] {
+        SleepFor(MonoDelta::FromMilliseconds(1000));
+        cb.Run(Status::OK());
+    });
+    ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut());
+  }
+
+  // Waiting on the thread gives TSAN a chance to check that no thread safety
+  // issues occurred.
+  waiter.join();
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/f6e8fe6c/src/kudu/util/async_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h
index 727b7f7..338c6c2 100644
--- a/src/kudu/util/async_util.h
+++ b/src/kudu/util/async_util.h
@@ -16,10 +16,11 @@
 // under the License.
 //
 // Utility functions which are handy when doing async/callback-based programming.
-#ifndef KUDU_UTIL_ASYNC_UTIL_H
-#define KUDU_UTIL_ASYNC_UTIL_H
+
+#pragma once
 
 #include <functional>
+#include <memory>
 
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/macros.h"
@@ -32,59 +33,67 @@ namespace kudu {
 // Simple class which can be used to make async methods synchronous.
 // For example:
 //   Synchronizer s;
-//   SomeAsyncMethod(s.callback());
+//   SomeAsyncMethod(s.AsStatusCallback());
 //   CHECK_OK(s.Wait());
+//
+// The lifetime of the synchronizer is decoupled from the callback it produces.
+// If the callback outlives the synchronizer then executing it will be a no-op.
+// Callers must be careful not to allow the callback to be destructed without
+// completing it, otherwise the thread waiting on the synchronizer will block
+// indefinitely.
 class Synchronizer {
  public:
   Synchronizer()
-    : l_(1) {
+    : data_(std::make_shared<Data>()) {
   }
 
   void StatusCB(const Status& status) {
-    s_ = status;
-    l_.CountDown();
+    Data::Callback(std::weak_ptr<Data>(data_), status);
   }
 
   StatusCallback AsStatusCallback() {
-    // Synchronizers are often declared on the stack, so it doesn't make
-    // sense for a callback to take a reference to its synchronizer.
-    //
-    // Note: this means the returned callback _must_ go out of scope before
-    // its synchronizer.
-    return Bind(&Synchronizer::StatusCB, Unretained(this));
+    return Bind(Data::Callback, std::weak_ptr<Data>(data_));
   }
 
   StdStatusCallback AsStdStatusCallback() {
-    // Synchronizers are often declared on the stack, so it doesn't make
-    // sense for a callback to take a reference to its synchronizer.
-    //
-    // Note: this means the returned callback _must_ go out of scope before
-    // its synchronizer.
-    return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1);
+    return std::bind(Data::Callback, std::weak_ptr<Data>(data_), std::placeholders::_1);
   }
 
-  Status Wait() {
-    l_.Wait();
-    return s_;
+  Status Wait() const {
+    data_->latch.Wait();
+    return data_->status;
   }
 
-  Status WaitFor(const MonoDelta& delta) {
-    if (PREDICT_FALSE(!l_.WaitFor(delta))) {
-      return Status::TimedOut("Timed out while waiting for the callback to be called.");
+  Status WaitFor(const MonoDelta& delta) const {
+    if (PREDICT_FALSE(!data_->latch.WaitFor(delta))) {
+      return Status::TimedOut("timed out while waiting for the callback to be called");
     }
-    return s_;
+    return data_->status;
   }
 
   void Reset() {
-    l_.Reset(1);
+    data_->latch.Reset(1);
   }
 
  private:
-  Status s_;
-  CountDownLatch l_;
 
-  DISALLOW_COPY_AND_ASSIGN(Synchronizer);
-};
+  struct Data {
+    Data() : latch(1) {
+    }
 
+    static void Callback(std::weak_ptr<Data> weak, const Status& status) {
+      auto ptr = weak.lock();
+      if (ptr) {
+        ptr->status = status;
+        ptr->latch.CountDown();
+      }
+    }
+
+    Status status;
+    CountDownLatch latch;
+    DISALLOW_COPY_AND_ASSIGN(Data);
+  };
+
+  std::shared_ptr<Data> data_;
+};
 } // namespace kudu
-#endif /* KUDU_UTIL_ASYNC_UTIL_H */

Reply via email to