HBASE-15774 Fix Upgrade lock usage in connection pool.

Summary:
Use upgrade lock better.
Fix a clang warning around initializing the UpgradeHolder incorrectly.
Remove dead code. (We'll need to add it back when there's a better plan.)
Add more comments.

Test Plan: buck test --all

Differential Revision: https://reviews.facebook.net/D58005


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f6ea4937
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f6ea4937
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f6ea4937

Branch: refs/heads/HBASE-14850
Commit: f6ea49374a552bc321e6954fc3e36587cbf4740f
Parents: eb4cde4
Author: Elliott Clark <ecl...@apache.org>
Authored: Tue May 10 17:44:41 2016 -0700
Committer: Elliott Clark <ecl...@apache.org>
Committed: Mon Jul 11 16:47:26 2016 -0700

----------------------------------------------------------------------
 .../connection/connection-pool-test.cc          | 12 ++--
 .../connection/connection-pool.cc               | 47 +++++++++++---
 .../connection/connection-pool.h                |  6 +-
 hbase-native-client/core/client.h               |  2 +-
 hbase-native-client/core/location-cache.cc      | 68 ++++----------------
 5 files changed, 62 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool-test.cc 
b/hbase-native-client/connection/connection-pool-test.cc
index c0c346f..bd2d585 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -79,9 +79,9 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) {
   sn.set_host_name(hostname);
   sn.set_port(port);
 
-  auto result = cp.get(sn);
+  auto result = cp.Get(sn);
   ASSERT_TRUE(result != nullptr);
-  result = cp.get(sn);
+  result = cp.Get(sn);
 }
 
 TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) {
@@ -102,13 +102,13 @@ TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) {
   ConnectionPool cp{mock_cf};
 
   {
-    auto result_one = cp.get(folly::to<ServerName>(
+    auto result_one = cp.Get(folly::to<ServerName>(
         hostname_one + ":" + folly::to<std::string>(port)));
-    auto result_two = cp.get(folly::to<ServerName>(
+    auto result_two = cp.Get(folly::to<ServerName>(
         hostname_two + ":" + folly::to<std::string>(port)));
   }
-  auto result_one = cp.get(
+  auto result_one = cp.Get(
       folly::to<ServerName>(hostname_one + ":" + 
folly::to<std::string>(port)));
-  auto result_two = cp.get(
+  auto result_two = cp.Get(
       folly::to<ServerName>(hostname_two + ":" + 
folly::to<std::string>(port)));
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc 
b/hbase-native-client/connection/connection-pool.cc
index 90e2056..aa3d094 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -48,25 +48,54 @@ ConnectionPool::~ConnectionPool() {
   clients_.clear();
 }
 
-std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) {
-  // Create a read lock.
-  SharedMutexWritePriority::UpgradeHolder holder(map_mutex_);
+std::shared_ptr<HBaseService> ConnectionPool::Get(const ServerName &sn) {
+  // Try and get th cached connection.
+  auto found_ptr = GetCached(sn);
 
+  // If there's no connection then create it.
+  if (found_ptr == nullptr) {
+    found_ptr = GetNew(sn);
+  }
+  return found_ptr;
+}
+
+std::shared_ptr<HBaseService> ConnectionPool::GetCached(const ServerName &sn) {
+  SharedMutexWritePriority::ReadHolder holder(map_mutex_);
   auto found = connections_.find(sn);
-  if (found == connections_.end() || found->second == nullptr) {
-    // Move the upgradable lock into the write lock if the connection
-    // hasn't been found.
-    SharedMutexWritePriority::WriteHolder holder(std::move(holder));
+  if (found == connections_.end()) {
+    return nullptr;
+  }
+  return found->second;
+}
+
+std::shared_ptr<HBaseService> ConnectionPool::GetNew(const ServerName &sn) {
+  // Grab the upgrade lock. While we are double checking other readers can
+  // continue on
+  SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_};
+
+  // Now check if someone else created the connection before we got the lock
+  // This is safe since we hold the upgrade lock.
+  // upgrade lock is more power than the reader lock.
+  auto found = connections_.find(sn);
+  if (found != connections_.end() && found->second != nullptr) {
+    return found->second;
+  } else {
+    // Yeah it looks a lot like there's no connection
+    SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)};
+
+    // Make double sure there are not stale connections hanging around.
+    connections_.erase(sn);
+
+    // Nope we are the ones who should create the new connection.
     auto client = cf_->MakeBootstrap();
     auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port());
     clients_.insert(std::make_pair(sn, client));
     connections_.insert(std::make_pair(sn, dispatcher));
     return dispatcher;
   }
-  return found->second;
 }
 
-void ConnectionPool::close(const ServerName &sn) {
+void ConnectionPool::Close(const ServerName &sn) {
   SharedMutexWritePriority::WriteHolder holder{map_mutex_};
 
   auto found = connections_.find(sn);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/connection/connection-pool.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.h 
b/hbase-native-client/connection/connection-pool.h
index 60f00de..605a81b 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -80,14 +80,16 @@ public:
    * Get a connection to the server name. Start time is ignored.
    * This can be a blocking operation for a short time.
    */
-  std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn);
+  std::shared_ptr<HBaseService> Get(const hbase::pb::ServerName &sn);
 
   /**
    * Close/remove a connection.
    */
-  void close(const hbase::pb::ServerName &sn);
+  void Close(const hbase::pb::ServerName &sn);
 
 private:
+  std::shared_ptr<HBaseService> GetCached(const hbase::pb::ServerName& sn);
+  std::shared_ptr<HBaseService> GetNew(const hbase::pb::ServerName& sn);
   std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>,
                      ServerNameHash, ServerNameEquals>
       connections_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/core/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.h 
b/hbase-native-client/core/client.h
index 4db82c4..ba24bb9 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -48,9 +48,9 @@ public:
   ~Client();
 
 private:
-  LocationCache location_cache_;
   std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
   std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
+  LocationCache location_cache_;
 };
 
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc 
b/hbase-native-client/core/location-cache.cc
index 9f2a0ef..6ba8add 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -115,7 +115,7 @@ Future<std::shared_ptr<RegionLocation>>
 LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
   return this->LocateMeta()
       .via(cpu_executor_.get())
-      .then([this](ServerName sn) { return this->cp_.get(sn); })
+      .then([this](ServerName sn) { return this->cp_.Get(sn); })
     .then([tn, row, this](std::shared_ptr<HBaseService> service) {
         return (*service)(std::move(meta_util_.MetaRequest(tn, row)));
       })
@@ -134,67 +134,25 @@ LocationCache::LocateFromMeta(const TableName &tn, const 
string &row) {
       })
       .then([this](std::shared_ptr<RegionLocation> rl) {
         // Now fill out the connection.
-        rl->set_service(cp_.get(rl->server_name()));
+        rl->set_service(cp_.Get(rl->server_name()));
         return rl;
       });
 }
 
-/**
- * Filter to remove a service from the location cache and the connection cache
- * on errors
- * or on cloase.
- */
-class RemoveServiceFilter
-    : public ServiceFilter<std::unique_ptr<Request>, Response> {
-
-public:
-  /** Create a new filter. */
-  RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn,
-                      ConnectionPool &cp)
-      : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn),
-        cp_(cp) {}
-
-  /**
-   * Close will remove the connection from all caches.
-   */
-  folly::Future<folly::Unit> close() override {
-    if (!released.exchange(true)) {
-      return this->service_->close().then([this]() {
-        // TODO(eclark): remove the service from the meta cache.
-        this->cp_.close(this->sn_);
-      });
-    } else {
-      return folly::makeFuture();
-    }
-  }
-
-  /** Has this been closed */
-  virtual bool isAvailable() override {
-    return !released && service_->isAvailable();
-  }
-
-  /** Send the message. */
-  folly::Future<Response> operator()(unique_ptr<Request> req) override {
-    // TODO(eclark): add in an on error handler that will
-    // remove the region location from the cache if needed.
-    // Also close the connection if this is likely to be an error
-    // that needs to get a new connection.
-    return (*this->service_)(std::move(req));
-  }
-
-private:
-  std::atomic<bool> released{false};
-  hbase::pb::ServerName sn_;
-  ConnectionPool &cp_;
-};
-
 std::shared_ptr<RegionLocation>
 LocationCache::CreateLocation(const Response &resp) {
   auto resp_msg = static_pointer_cast<ScanResponse>(resp.resp_msg());
   auto &results = resp_msg->results().Get(0);
   auto &cells = results.cell();
-  auto ri = folly::to<RegionInfo>(cells.Get(0).value());
-  auto sn = folly::to<ServerName>(cells.Get(1).value());
-  return std::make_shared<RegionLocation>(cells.Get(0).row(), std::move(ri), 
sn,
-                                          nullptr);
+
+  // TODO(eclark): There should probably be some better error
+  // handling around this.
+  auto cell_zero = cells.Get(0).value();
+  auto cell_one = cells.Get(1).value();
+  auto row = cells.Get(0).row();
+
+  auto region_info = folly::to<RegionInfo>(cell_zero);
+  auto server_name = folly::to<ServerName>(cell_one);
+  return std::make_shared<RegionLocation>(row, std::move(region_info),
+                                          server_name, nullptr);
 }

Reply via email to