HBASE-15777 Fix needs header in client handler
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b3839162 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b3839162 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b3839162 Branch: refs/heads/HBASE-14850 Commit: b3839162d6f41ba66d762103a3a81fb688ccf057 Parents: def79c8 Author: Elliott Clark <ecl...@apache.org> Authored: Thu May 5 13:51:18 2016 -0700 Committer: Elliott Clark <ecl...@apache.org> Committed: Fri May 13 14:36:28 2016 -0700 ---------------------------------------------------------------------- .../connection/client-handler.cc | 31 ++++++++++++-------- hbase-native-client/connection/client-handler.h | 19 +++++++++++- 2 files changed, 36 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b3839162/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index b92ad89..4fdb7ae 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -37,7 +37,7 @@ using hbase::pb::GetResponse; using google::protobuf::Message; ClientHandler::ClientHandler(std::string user_name) - : user_name_(user_name), need_send_header_(true), serde_(), + : user_name_(user_name), serde_(), header_info_(), resp_msgs_( make_unique<folly::AtomicHashMap< uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {} @@ -81,22 +81,27 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { } } -// TODO(eclark): Figure out how to handle the -// network errors that are going to come. Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) { // Keep track of if we have sent the header. 
- if (UNLIKELY(need_send_header_)) { - need_send_header_ = false; + // + // even though the bool is atomic we can load it lazily here. + if (UNLIKELY(header_info_->need_.load(std::memory_order_relaxed))) { - // Should we be sending just one fireWrite? - // Right now we're sending one for the header - // and one for the request. + // Grab the lock. + // We need to make sure that no one gets past here without there being a + // header sent. + std::lock_guard<std::mutex> lock(header_info_->mutex_); + + // Now see if we are the first thread to get through. // - // That doesn't seem like too bad, but who knows. - auto pre = serde_.Preamble(); - auto header = serde_.Header(user_name_); - pre->appendChain(std::move(header)); - ctx->fireWrite(std::move(pre)); + // If this is the first thread to get through then the + // need_send_header will have been true before this. + if (header_info_->need_.exchange(false)) { + auto pre = serde_.Preamble(); + auto header = serde_.Header(user_name_); + pre->appendChain(std::move(header)); + ctx->fireWrite(std::move(pre)); + } } resp_msgs_->insert(r->call_id(), r->resp_msg()); http://git-wip-us.apache.org/repos/asf/hbase/blob/b3839162/hbase-native-client/connection/client-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index be5143c..1a4275f 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -21,6 +21,8 @@ #include <folly/AtomicHashMap.h> #include <wangle/channel/Handler.h> +#include <atomic> +#include <mutex> #include <string> #include "serde/rpc.h" @@ -29,6 +31,7 @@ namespace hbase { class Request; class Response; +class HeaderInfo; } namespace google { namespace protobuf { @@ -37,6 +40,7 @@ class Message; } namespace hbase { + class ClientHandler : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, 
std::unique_ptr<Request>, std::unique_ptr<folly::IOBuf>> { @@ -47,7 +51,7 @@ public: std::unique_ptr<Request> r) override; private: - bool need_send_header_; + std::unique_ptr<HeaderInfo> header_info_; std::string user_name_; RpcSerde serde_; @@ -56,4 +60,17 @@ private: uint32_t, std::shared_ptr<google::protobuf::Message>>> resp_msgs_; }; + +/** + * Class to contain the info about whether the connection header and preamble have + * been sent. + * + * We use a separate class here so that ClientHandler is relocatable. + */ +class HeaderInfo { +public: + HeaderInfo() : need_(true), mutex_() {} + std::atomic<bool> need_; + std::mutex mutex_; +}; } // namespace hbase