Repository: hbase Updated Branches: refs/heads/HBASE-14850 66f8f36ec -> 8aa8a9251
HBASE-17726 [C++] Move implementation from header to cc for request retry Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8aa8a925 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8aa8a925 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8aa8a925 Branch: refs/heads/HBASE-14850 Commit: 8aa8a92519bd93e403ea45863490eafbf25e7eb9 Parents: 66f8f36 Author: Enis Soztutar <e...@apache.org> Authored: Tue Apr 11 06:14:41 2017 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Apr 11 06:14:41 2017 -0700 ---------------------------------------------------------------------- hbase-native-client/core/BUCK | 7 +- hbase-native-client/core/async-connection.h | 10 +- hbase-native-client/core/async-region-locator.h | 1 + .../core/async-rpc-retrying-caller.cc | 197 ++++++++++++++++++- .../core/async-rpc-retrying-caller.h | 168 +--------------- .../core/async-rpc-retrying-test.cc | 30 ++- hbase-native-client/core/get-test.cc | 2 +- hbase-native-client/core/mutation.cc | 4 +- hbase-native-client/core/mutation.h | 2 +- hbase-native-client/core/region-request.h | 4 +- hbase-native-client/core/request-converter.h | 4 +- hbase-native-client/core/response-converter.cc | 42 ++-- 12 files changed, 265 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 30c3390..412ee3b 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -21,6 +21,8 @@ cxx_library( exported_headers=[ "async-connection.h", "async-region-locator.h", + "async-rpc-retrying-caller-factory.h", + "async-rpc-retrying-caller.h", "client.h", "cell.h", "hbase-macros.h", @@ -42,8 +44,6 @@ cxx_library( "response-converter.h", 
"table.h", "raw-async-table.h", - "async-rpc-retrying-caller-factory.h", - "async-rpc-retrying-caller.h", "hbase-rpc-controller.h", "time-range.h", "zk-util.h", @@ -56,8 +56,11 @@ cxx_library( ], srcs=[ "async-connection.cc", + "async-rpc-retrying-caller-factory.cc", + "async-rpc-retrying-caller.cc", "cell.cc", "client.cc", + "hbase-rpc-controller.cc", "keyvalue-codec.cc", "location-cache.cc", "meta-utils.cc", http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-connection.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h index 6a61124..ff11577 100644 --- a/hbase-native-client/core/async-connection.h +++ b/hbase-native-client/core/async-connection.h @@ -26,12 +26,12 @@ #include <memory> #include <string> +#include <utility> #include "connection/rpc-client.h" #include "core/async-region-locator.h" #include "core/configuration.h" #include "core/connection-configuration.h" -#include "core/connection-configuration.h" #include "core/hbase-configuration-loader.h" #include "core/hbase-rpc-controller.h" #include "core/keyvalue-codec.h" @@ -45,8 +45,8 @@ class AsyncRpcRetryingCallerFactory; class AsyncConnection { public: - AsyncConnection(){}; - virtual ~AsyncConnection(){}; + AsyncConnection() {} + virtual ~AsyncConnection() {} virtual std::shared_ptr<Configuration> conf() = 0; virtual std::shared_ptr<ConnectionConfiguration> connection_conf() = 0; virtual std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() = 0; @@ -82,7 +82,7 @@ class AsyncConnectionImpl : public AsyncConnection, return std::make_shared<HBaseRpcController>(); } - virtual void Close() override; + void Close() override; protected: AsyncConnectionImpl() {} @@ -105,7 +105,7 @@ class AsyncConnectionImpl : public AsyncConnection, bool is_closed_ = false; private: - AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {} 
+ explicit AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {} void Init(); }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-region-locator.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-region-locator.h b/hbase-native-client/core/async-region-locator.h index b0019e0..c606dcb 100644 --- a/hbase-native-client/core/async-region-locator.h +++ b/hbase-native-client/core/async-region-locator.h @@ -20,6 +20,7 @@ #pragma once #include <folly/futures/Future.h> +#include <memory> #include <string> #include "core/region-location.h" http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-rpc-retrying-caller.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc index 743b6bb..965a44b 100644 --- a/hbase-native-client/core/async-rpc-retrying-caller.cc +++ b/hbase-native-client/core/async-rpc-retrying-caller.cc @@ -19,4 +19,199 @@ #include "core/async-rpc-retrying-caller.h" -namespace hbase {} /* namespace hbase */ +#include <folly/Format.h> +#include <folly/Logging.h> +#include <folly/futures/Unit.h> + +#include "connection/rpc-client.h" +#include "core/async-connection.h" +#include "core/hbase-rpc-controller.h" +#include "core/region-location.h" +#include "core/result.h" +#include "exceptions/exception.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +namespace hbase { + +template <typename RESP> +AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller( + std::shared_ptr<AsyncConnection> conn, std::shared_ptr<hbase::pb::TableName> table_name, + const std::string& row, RegionLocateType locate_type, Callable<RESP> callable, + nanoseconds pause, uint32_t 
max_retries, nanoseconds operation_timeout_nanos, + nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) + : conn_(conn), + table_name_(table_name), + row_(row), + locate_type_(locate_type), + callable_(callable), + pause_(pause), + max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared<folly::Promise<RESP>>()), + tries_(1) { + controller_ = conn_->CreateRpcController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>(); + retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); +} + +template <typename RESP> +AsyncSingleRequestRpcRetryingCaller<RESP>::~AsyncSingleRequestRpcRetryingCaller() {} + +template <typename RESP> +folly::Future<RESP> AsyncSingleRequestRpcRetryingCaller<RESP>::Call() { + auto f = promise_->getFuture(); + LocateThenCall(); + return f; +} + +template <typename RESP> +void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() { + int64_t locate_timeout_ns; + if (operation_timeout_nanos_.count() > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + locate_timeout_ns = -1L; + } + + conn_->region_locator() + ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); }) + .onError([this](const std::exception& e) { + OnError(e, + [this]() -> std::string { + return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + + table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) + + ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " + + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; + }, 
+ [](const std::exception& error) {}); + }); +} + +template <typename RESP> +void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError( + const std::exception& error, Supplier<std::string> err_msg, + Consumer<std::exception> update_cached_location) { + ThrowableWithExtraContext twec(std::make_shared<std::exception>(error), TimeUtil::GetNowNanos()); + exceptions_->push_back(twec); + if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) || tries_ >= max_retries_) { + CompleteExceptionally(); + return; + } + + int64_t delay_ns; + if (operation_timeout_nanos_.count() > 0) { + int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; + if (max_delay_ns <= 0) { + CompleteExceptionally(); + return; + } + delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1); + } + update_cached_location(error); + tries_++; + retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); }, + milliseconds(TimeUtil::ToMillis(delay_ns))); +} + +template <typename RESP> +void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) { + int64_t call_timeout_ns; + if (operation_timeout_nanos_.count() > 0) { + call_timeout_ns = this->RemainingTimeNs(); + if (call_timeout_ns <= 0) { + this->CompleteExceptionally(); + return; + } + call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count()); + } else { + call_timeout_ns = rpc_timeout_nanos_.count(); + } + + std::shared_ptr<RpcClient> rpc_client; + try { + // TODO: There is no connection attempt happening here, no need to try-catch. 
+ rpc_client = conn_->rpc_client(); + } catch (const IOException& e) { + OnError(e, + [&, this]() -> std::string { + return "Get async rpc_client to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; + }, + [&, this](const std::exception& error) { + conn_->region_locator()->UpdateCachedLocation(loc, error); + }); + return; + } + + ResetController(controller_, call_timeout_ns); + + callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client) + .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) + .onError([&, this](const std::exception& e) { + OnError(e, + [&, this]() -> std::string { + return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + + " ms"; + }, + [&, this](const std::exception& error) { + conn_->region_locator()->UpdateCachedLocation(loc, error); + }); + return; + }); +} + +template <typename RESP> +void AsyncSingleRequestRpcRetryingCaller<RESP>::CompleteExceptionally() { + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); +} + +template <typename RESP> +int64_t AsyncSingleRequestRpcRetryingCaller<RESP>::RemainingTimeNs() { + return 
operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); +} + +template <typename RESP> +void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController( + std::shared_ptr<HBaseRpcController> controller, const int64_t& timeout_ns) { + controller->Reset(); + if (timeout_ns >= 0) { + controller->set_call_timeout( + milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns)))); + } +} + +// explicit instantiations for the linker. Otherwise, you have to #include the .cc file for the +// templetized +// class definitions. +template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>; +template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-rpc-retrying-caller.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h index 6503301..6006388 100644 --- a/hbase-native-client/core/async-rpc-retrying-caller.h +++ b/hbase-native-client/core/async-rpc-retrying-caller.h @@ -18,11 +18,10 @@ */ #pragma once -#include <folly/Format.h> -#include <folly/Logging.h> #include <folly/futures/Future.h> #include <folly/io/async/EventBase.h> #include <folly/io/async/HHWheelTimer.h> + #include <algorithm> #include <chrono> #include <functional> @@ -31,15 +30,11 @@ #include <type_traits> #include <utility> #include <vector> -#include "connection/rpc-client.h" #include "core/async-connection.h" #include "core/hbase-rpc-controller.h" #include "core/region-location.h" #include "exceptions/exception.h" #include "if/HBase.pb.h" -#include "utils/connection-util.h" -#include "utils/sys-util.h" -#include "utils/time-util.h" using std::chrono::nanoseconds; using std::chrono::milliseconds; @@ -80,168 +75,26 @@ class AsyncSingleRequestRpcRetryingCaller { Callable<RESP> callable, 
nanoseconds pause, uint32_t max_retries, nanoseconds operation_timeout_nanos, nanoseconds rpc_timeout_nanos, - uint32_t start_log_errors_count) - : conn_(conn), - table_name_(table_name), - row_(row), - locate_type_(locate_type), - callable_(callable), - pause_(pause), - max_retries_(max_retries), - operation_timeout_nanos_(operation_timeout_nanos), - rpc_timeout_nanos_(rpc_timeout_nanos), - start_log_errors_count_(start_log_errors_count), - promise_(std::make_shared<folly::Promise<RESP>>()), - tries_(1) { - controller_ = conn_->CreateRpcController(); - start_ns_ = TimeUtil::GetNowNanos(); - max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); - exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>(); - retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); - } + uint32_t start_log_errors_count); - virtual ~AsyncSingleRequestRpcRetryingCaller() {} + virtual ~AsyncSingleRequestRpcRetryingCaller(); - folly::Future<RESP> Call() { - auto f = promise_->getFuture(); - LocateThenCall(); - return f; - } + folly::Future<RESP> Call(); private: - void LocateThenCall() { - int64_t locate_timeout_ns; - if (operation_timeout_nanos_.count() > 0) { - locate_timeout_ns = RemainingTimeNs(); - if (locate_timeout_ns <= 0) { - CompleteExceptionally(); - return; - } - } else { - locate_timeout_ns = -1L; - } - - conn_->region_locator() - ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns) - .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); }) - .onError([this](const std::exception& e) { - OnError(e, - [this]() -> std::string { - return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + - table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) + - ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + - TimeUtil::ToMillisStr(operation_timeout_nanos_) + - " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + - " ms"; - }, - [](const std::exception& error) {}); - }); 
- } + void LocateThenCall(); void OnError(const std::exception& error, Supplier<std::string> err_msg, - Consumer<std::exception> update_cached_location) { - ThrowableWithExtraContext twec(std::make_shared<std::exception>(error), - TimeUtil::GetNowNanos()); - exceptions_->push_back(twec); - if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) || - tries_ >= max_retries_) { - CompleteExceptionally(); - return; - } - - int64_t delay_ns; - if (operation_timeout_nanos_.count() > 0) { - int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; - if (max_delay_ns <= 0) { - CompleteExceptionally(); - return; - } - delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1)); - } else { - delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1); - } - update_cached_location(error); - tries_++; - retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); }, - milliseconds(TimeUtil::ToMillis(delay_ns))); - } - - void Call(const RegionLocation& loc) { - int64_t call_timeout_ns; - if (operation_timeout_nanos_.count() > 0) { - call_timeout_ns = this->RemainingTimeNs(); - if (call_timeout_ns <= 0) { - this->CompleteExceptionally(); - return; - } - call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count()); - } else { - call_timeout_ns = rpc_timeout_nanos_.count(); - } + Consumer<std::exception> update_cached_location); - std::shared_ptr<RpcClient> rpc_client; - try { - // TODO: There is no connection attempt happening here, no need to try-catch. 
- rpc_client = conn_->rpc_client(); - } catch (const IOException& e) { - OnError(e, - [&, this]() -> std::string { - return "Get async rpc_client to " + - folly::sformat("{0}:{1}", loc.server_name().host_name(), - loc.server_name().port()) + - " for '" + row_ + "' in " + loc.DebugString() + " of " + - table_name_->namespace_() + "::" + table_name_->qualifier() + - " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + - std::to_string(max_attempts_) + ", timeout = " + - TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + - " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; - }, - [&, this](const std::exception& error) { - conn_->region_locator()->UpdateCachedLocation(loc, error); - }); - return; - } + void Call(const RegionLocation& loc); - ResetController(controller_, call_timeout_ns); + void CompleteExceptionally(); - callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client) - .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) - .onError([&, this](const std::exception& e) { - OnError(e, - [&, this]() -> std::string { - return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), - loc.server_name().port()) + - " for '" + row_ + "' in " + loc.DebugString() + " of " + - table_name_->namespace_() + "::" + table_name_->qualifier() + - " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + - std::to_string(max_attempts_) + ", timeout = " + - TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + - " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + - " ms"; - }, - [&, this](const std::exception& error) { - conn_->region_locator()->UpdateCachedLocation(loc, error); - }); - return; - }); - } - - void CompleteExceptionally() { - this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); - } - - int64_t RemainingTimeNs() { - return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); - } + 
int64_t RemainingTimeNs(); static void ResetController(std::shared_ptr<HBaseRpcController> controller, - const int64_t& timeout_ns) { - controller->Reset(); - if (timeout_ns >= 0) { - controller->set_call_timeout( - milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns)))); - } - } + const int64_t& timeout_ns); private: folly::HHWheelTimer::UniquePtr retry_timer_; @@ -263,5 +116,4 @@ class AsyncSingleRequestRpcRetryingCaller { uint32_t max_attempts_; folly::EventBase event_base_; }; - } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/async-rpc-retrying-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc index 3ed6866..4956972 100644 --- a/hbase-native-client/core/async-rpc-retrying-test.cc +++ b/hbase-native-client/core/async-rpc-retrying-test.cc @@ -47,11 +47,23 @@ #include "test-util/test-util.h" #include "utils/time-util.h" -using namespace google::protobuf; -using namespace hbase; -using namespace hbase::pb; -using namespace std::placeholders; -using namespace testing; +using hbase::AsyncRpcRetryingCallerFactory; +using hbase::AsyncConnection; +using hbase::AsyncRegionLocator; +using hbase::ConnectionConfiguration; +using hbase::Configuration; +using hbase::HBaseRpcController; +using hbase::RegionLocation; +using hbase::RegionLocateType; +using hbase::RpcClient; +using hbase::RequestConverter; +using hbase::ResponseConverter; +using hbase::ReqConverter; +using hbase::RespConverter; +using hbase::Put; +using hbase::TimeUtil; +using hbase::Client; + using ::testing::Return; using ::testing::_; using std::chrono::nanoseconds; @@ -62,10 +74,10 @@ class MockAsyncRegionLocator : public AsyncRegionLocator { : region_location_(region_location) {} ~MockAsyncRegionLocator() = default; - folly::Future<std::shared_ptr<RegionLocation>> 
LocateRegion(const hbase::pb::TableName&, - const std::string&, - const RegionLocateType, - const int64_t) override { + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName&, + const std::string&, + const RegionLocateType, + const int64_t) override { folly::Promise<std::shared_ptr<RegionLocation>> promise; promise.setValue(region_location_); return promise.getFuture(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/get-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/get-test.cc b/hbase-native-client/core/get-test.cc index 6127e23..4a44a26 100644 --- a/hbase-native-client/core/get-test.cc +++ b/hbase-native-client/core/get-test.cc @@ -17,8 +17,8 @@ * */ -#include "core/cell.h" #include "core/get.h" +#include "core/cell.h" #include <glog/logging.h> #include <gtest/gtest.h> http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/mutation.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/mutation.cc b/hbase-native-client/core/mutation.cc index ab33105..7182202 100644 --- a/hbase-native-client/core/mutation.cc +++ b/hbase-native-client/core/mutation.cc @@ -26,8 +26,8 @@ namespace hbase { -Mutation::Mutation(const std::string &row) : Row(row) { } -Mutation::Mutation(const std::string &row, int64_t timestamp) : Row(row), timestamp_(timestamp) { } +Mutation::Mutation(const std::string &row) : Row(row) {} +Mutation::Mutation(const std::string &row, int64_t timestamp) : Row(row), timestamp_(timestamp) {} Mutation::Mutation(const Mutation &mutation) { row_ = mutation.row_; http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/mutation.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/mutation.h b/hbase-native-client/core/mutation.h index 5e0381b..496891e 
100644 --- a/hbase-native-client/core/mutation.h +++ b/hbase-native-client/core/mutation.h @@ -31,7 +31,7 @@ namespace hbase { -class Mutation: public Row { +class Mutation : public Row { public: /** * Constructors http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/region-request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-request.h b/hbase-native-client/core/region-request.h index 6f29d44..7ce7c96 100644 --- a/hbase-native-client/core/region-request.h +++ b/hbase-native-client/core/region-request.h @@ -34,9 +34,7 @@ class RegionRequest { explicit RegionRequest(const std::shared_ptr<hbase::RegionLocation> &region_loc) : region_loc_(region_loc) {} ~RegionRequest() {} - void AddAction(std::shared_ptr<Action> action) { - actions_.push_back(action); - } + void AddAction(std::shared_ptr<Action> action) { actions_.push_back(action); } std::shared_ptr<hbase::RegionLocation> region_location() const { return region_loc_; } const ActionList &actions() const { return actions_; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/request-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index ff6b290..6861604 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -26,11 +26,11 @@ #include "core/action.h" #include "core/cell.h" #include "core/get.h" +#include "core/mutation.h" +#include "core/put.h" #include "core/region-request.h" #include "core/scan.h" #include "core/server-request.h" -#include "core/mutation.h" -#include "core/put.h" #include "if/Client.pb.h" #include "if/HBase.pb.h" http://git-wip-us.apache.org/repos/asf/hbase/blob/8aa8a925/hbase-native-client/core/response-converter.cc 
---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index 7729257..b29a819 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -61,11 +61,11 @@ std::shared_ptr<Result> ResponseConverter::ToResult( int cells_read = 0; while (cells_read != result.associated_cell_count()) { if (cell_scanner->Advance()) { - vcells.push_back(cell_scanner->Current()); + vcells.push_back(cell_scanner->Current()); cells_read += 1; } else { - LOG(ERROR)<< "CellScanner::Advance() returned false unexpectedly. Cells Read:- " - << cells_read << "; Expected Cell Count:- " << result.associated_cell_count(); + LOG(ERROR) << "CellScanner::Advance() returned false unexpectedly. Cells Read:- " + << cells_read << "; Expected Cell Count:- " << result.associated_cell_count(); std::runtime_error("CellScanner::Advance() returned false unexpectedly"); } } @@ -111,16 +111,15 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const R std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ptr<Request> req, const Response& resp) { - auto multi_req = std::static_pointer_cast < hbase::pb::MultiRequest > (req->req_msg()); - auto multi_resp = std::static_pointer_cast < hbase::pb::MultiResponse > (resp.resp_msg()); + auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg()); + auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg()); VLOG(3) << "GetResults:" << multi_resp->ShortDebugString(); int req_region_action_count = multi_req->regionaction_size(); int res_region_action_count = multi_resp->regionactionresult_size(); if (req_region_action_count != res_region_action_count) { - throw std::runtime_error( - "Request mutation count=" + std::to_string(req_region_action_count) - + " does not match response mutation result 
count=" - + std::to_string(res_region_action_count)); + throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) + + " does not match response mutation result count=" + + std::to_string(res_region_action_count)); } auto multi_response = std::make_unique<hbase::MultiResponse>(); for (int32_t num = 0; num < res_region_action_count; num++) { @@ -134,7 +133,7 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ auto region_name = rs.value(); if (action_result.has_exception()) { if (action_result.exception().has_value()) { - auto exc = std::make_shared < hbase::IOException > (action_result.exception().value()); + auto exc = std::make_shared<hbase::IOException>(action_result.exception().value()); VLOG(8) << "Store Region Exception:- " << exc->what(); multi_response->AddRegionException(region_name, exc); } @@ -142,19 +141,18 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ } if (actions.action_size() != action_result.resultorexception_size()) { - throw std::runtime_error( - "actions.action_size=" + std::to_string(actions.action_size()) - + ", action_result.resultorexception_size=" - + std::to_string(action_result.resultorexception_size()) + " for region " - + actions.region().value()); + throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) + + ", action_result.resultorexception_size=" + + std::to_string(action_result.resultorexception_size()) + + " for region " + actions.region().value()); } for (hbase::pb::ResultOrException roe : action_result.resultorexception()) { - std::shared_ptr < Result > result; - std::shared_ptr < std::exception > exc; + std::shared_ptr<Result> result; + std::shared_ptr<std::exception> exc; if (roe.has_exception()) { if (roe.exception().has_value()) { - exc = std::make_shared < hbase::IOException > (roe.exception().value()); + exc = std::make_shared<hbase::IOException>(roe.exception().value()); VLOG(8) << "Store 
ResultOrException:- " << exc->what(); } } else if (roe.has_result()) { @@ -165,9 +163,9 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ // Sometimes, the response is just "it was processed". Generally, this occurs for things // like mutateRows where either we get back 'processed' (or not) and optionally some // statistics about the regions we touched. - std::vector < std::shared_ptr < Cell >> empty_cells; - result = std::make_shared < Result - > (empty_cells, multi_resp->processed() ? true : false, false, false); + std::vector<std::shared_ptr<Cell>> empty_cells; + result = std::make_shared<Result>(empty_cells, multi_resp->processed() ? true : false, + false, false); } multi_response->AddRegionResult(region_name, roe.index(), std::move(result), exc); } @@ -177,7 +175,7 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ hbase::pb::MultiRegionLoadStats stats = multi_resp->regionstatistics(); for (int i = 0; i < stats.region_size(); i++) { multi_response->AddStatistic(stats.region(i).value(), - std::make_shared < RegionLoadStats > (stats.stat(i))); + std::make_shared<RegionLoadStats>(stats.stat(i))); } } return multi_response;