Repository: hbase Updated Branches: refs/heads/HBASE-14850 f27075a6f -> 73b65b475
HBASE-18178 [C++] Retrying meta location lookup and zookeeper connection Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/73b65b47 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/73b65b47 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/73b65b47 Branch: refs/heads/HBASE-14850 Commit: 73b65b475c2277115cff4ded67e44987a7b2b157 Parents: f27075a Author: Enis Soztutar <e...@apache.org> Authored: Fri Jun 16 11:55:51 2017 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Fri Jun 16 11:55:51 2017 -0700 ---------------------------------------------------------------------- .../connection/client-handler.cc | 9 +- hbase-native-client/core/BUCK | 12 ++ .../core/async-rpc-retrying-caller.cc | 17 +-- .../core/async-rpc-retrying-test.cc | 3 +- .../core/connection-configuration.h | 2 +- .../core/location-cache-retry-test.cc | 112 +++++++++++++++++++ hbase-native-client/core/location-cache.cc | 100 ++++++++++++----- hbase-native-client/core/location-cache.h | 8 +- hbase-native-client/core/meta-utils.cc | 42 +++++-- hbase-native-client/core/meta-utils.h | 24 +++- hbase-native-client/core/region-location.h | 19 +--- hbase-native-client/core/response-converter.cc | 2 +- hbase-native-client/core/simple-client.cc | 18 ++- hbase-native-client/core/zk-util.cc | 4 + hbase-native-client/core/zk-util.h | 5 + hbase-native-client/serde/region-info.h | 4 +- hbase-native-client/serde/table-name.h | 5 +- hbase-native-client/test-util/mini-cluster.cc | 4 + hbase-native-client/test-util/test-util.cc | 4 + hbase-native-client/test-util/test-util.h | 1 + hbase-native-client/utils/bytes-util-test.cc | 3 +- hbase-native-client/utils/bytes-util.cc | 4 +- 22 files changed, 315 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 894ecb3..775df68 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -51,7 +51,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) { int used_bytes = serde_.ParseDelimited(buf.get(), &header); VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() - << " has_exception=" << header.has_exception(); + << " has_exception=" << header.has_exception() << ", server: " << server_; // Get the response protobuf from the map auto search = resp_msgs_->find(header.call_id()); @@ -80,7 +80,8 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) { } VLOG(3) << "Read RPCResponse, buf length:" << buf->length() - << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length; + << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length + << ", server: " << server_; // Make sure that bytes were parsed. CHECK((used_bytes + cell_block_length) == buf->length()); @@ -113,7 +114,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) { VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id() << " exception.what=" << remote_exception->what() - << ", do_not_retry=" << remote_exception->do_not_retry(); + << ", do_not_retry=" << remote_exception->do_not_retry() << ", server: " << server_; received->set_exception(folly::exception_wrapper{*remote_exception}); } ctx->fireRead(std::move(received)); @@ -129,7 +130,7 @@ folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Re ctx->fireWrite(std::move(header)); }); - VLOG(3) << "Writing RPC Request:" << r->DebugString(); + VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_; // Now store the call id to response. resp_msgs_->insert(r->call_id(), r->resp_msg()); http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 47e97f5..464c010 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -137,6 +137,18 @@ cxx_test( ], run_test_separately=True,) cxx_test( + name="location-cache-retry-test", + srcs=[ + "location-cache-retry-test.cc", + ], + deps=[ + ":core", + "//if:if", + "//serde:serde", + "//test-util:test-util", + ], + run_test_separately=True,) +cxx_test( name="cell-test", srcs=[ "cell-test.cc", http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/async-rpc-retrying-caller.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc index 0302ad3..aee7d0b 100644 --- a/hbase-native-client/core/async-rpc-retrying-caller.cc +++ b/hbase-native-client/core/async-rpc-retrying-caller.cc @@ -168,14 +168,17 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) ResetController(controller_, call_timeout_ns); - callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client) - .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) - .onError([&, this](const exception_wrapper& e) { + // TODO: RegionLocation should propagate through these method chains as a shared_ptr. + // Otherwise, it may get deleted underneat us. We are just copying for now. + auto loc_ptr = std::make_shared<RegionLocation>(loc); + callable_(controller_, loc_ptr, rpc_client) + .then([loc_ptr, this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) + .onError([&, loc_ptr, this](const exception_wrapper& e) { OnError(e, [&, this]() -> std::string { - return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), - loc.server_name().port()) + - " for '" + row_ + "' in " + loc.DebugString() + " of " + + return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(), + loc_ptr->server_name().port()) + + " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " + table_name_->namespace_() + "::" + table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + @@ -184,7 +187,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) " ms"; }, [&, this](const exception_wrapper& error) { - conn_->region_locator()->UpdateCachedLocation(loc, error); + conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error); }); }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/async-rpc-retrying-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc index 0f83914..f887815 100644 --- a/hbase-native-client/core/async-rpc-retrying-test.cc +++ b/hbase-native-client/core/async-rpc-retrying-test.cc @@ -147,8 +147,7 @@ class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase { folly::Promise<std::shared_ptr<RegionLocation>> promise; /* set random region name, simulating invalid region */ auto result = std::make_shared<RegionLocation>( - "whatever-region-name", region_location_->region_info(), region_location_->server_name(), - region_location_->service()); + "whatever-region-name", region_location_->region_info(), region_location_->server_name()); promise.setValue(result); return promise.getFuture(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/connection-configuration.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/connection-configuration.h b/hbase-native-client/core/connection-configuration.h index adc8d5b..995798e 100644 --- a/hbase-native-client/core/connection-configuration.h +++ b/hbase-native-client/core/connection-configuration.h @@ -143,7 +143,7 @@ class ConnectionConfiguration { */ static constexpr const char* kClientRetriesNumber = "hbase.client.retries.number"; - static constexpr const uint32_t kDefaultClientRetriesNumber = 31; + static constexpr const uint32_t kDefaultClientRetriesNumber = 35; /** * Configure the number of failures after which the client will start logging. A few failures http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/location-cache-retry-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache-retry-test.cc b/hbase-native-client/core/location-cache-retry-test.cc new file mode 100644 index 0000000..988f994 --- /dev/null +++ b/hbase-native-client/core/location-cache-retry-test.cc @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <gtest/gtest.h> + +#include "core/append.h" +#include "core/cell.h" +#include "core/client.h" +#include "core/configuration.h" +#include "core/delete.h" +#include "core/get.h" +#include "core/hbase-configuration-loader.h" +#include "core/increment.h" +#include "core/meta-utils.h" +#include "core/put.h" +#include "core/result.h" +#include "core/table.h" +#include "exceptions/exception.h" +#include "serde/table-name.h" +#include "test-util/test-util.h" +#include "utils/bytes-util.h" + +using hbase::Cell; +using hbase::Configuration; +using hbase::Get; +using hbase::MetaUtil; +using hbase::RetriesExhaustedException; +using hbase::Put; +using hbase::Table; +using hbase::TestUtil; + +using std::chrono_literals::operator""s; + +class LocationCacheRetryTest : public ::testing::Test { + public: + static std::unique_ptr<hbase::TestUtil> test_util; + static void SetUpTestCase() { + google::InstallFailureSignalHandler(); + test_util = std::make_unique<hbase::TestUtil>(); + test_util->StartMiniCluster(2); + test_util->conf()->SetInt("hbase.client.retries.number", 5); + } +}; + +std::unique_ptr<hbase::TestUtil> LocationCacheRetryTest::test_util = nullptr; + +TEST_F(LocationCacheRetryTest, GetFromMetaTable) { + auto tn = folly::to<hbase::pb::TableName>("hbase:meta"); + auto row = "test1"; + + hbase::Client client(*LocationCacheRetryTest::test_util->conf()); + + // do a get against the other table, but not the actual table "t". + auto table = client.Table(tn); + hbase::Get get(row); + auto result = table->Get(get); + + LocationCacheRetryTest::test_util->MoveRegion(MetaUtil::kMetaRegion, ""); + + std::this_thread::sleep_for(3s); // sleep 3 sec + + result = table->Get(get); +} + +TEST_F(LocationCacheRetryTest, PutGet) { + LocationCacheRetryTest::test_util->CreateTable("t", "d"); + LocationCacheRetryTest::test_util->CreateTable("t2", "d"); + + auto tn = folly::to<hbase::pb::TableName>("t"); + auto tn2 = folly::to<hbase::pb::TableName>("t2"); + auto row = "test1"; + + hbase::Client client(*LocationCacheRetryTest::test_util->conf()); + + // do a get against the other table, but not the actual table "t". + auto table = client.Table(tn); + auto table2 = client.Table(tn2); + hbase::Get get(row); + auto result = table2->Get(get); + + // we should have already cached the location of meta right now. Now + // move the meta region to the other server so that we will get a NotServingRegionException + // when we do the actual location lookup request. If there is no invalidation + // of the meta's own location, then following put/get will result in retries exhausted. + LocationCacheRetryTest::test_util->MoveRegion(MetaUtil::kMetaRegion, ""); + + std::this_thread::sleep_for(3s); // sleep 3 sec + + table->Put(Put{row}.AddColumn("d", "1", "value1")); + + result = table->Get(get); + + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test1", result->Row()); + EXPECT_EQ("value1", *(result->Value("d", "1"))); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index dfe3e9f..5f68420 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -24,8 +24,12 @@ #include <wangle/concurrent/CPUThreadPoolExecutor.h> #include <wangle/concurrent/IOThreadPoolExecutor.h> +#include <map> +#include <utility> + #include "connection/response.h" #include "connection/rpc-connection.h" +#include "core/meta-utils.h" #include "exceptions/exception.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" @@ -33,8 +37,6 @@ #include "serde/server-name.h" #include "serde/zk.h" -#include <utility> - using hbase::pb::MetaRegionServer; using hbase::pb::ServerName; using hbase::pb::TableName; @@ -54,27 +56,49 @@ LocationCache::LocationCache(std::shared_ptr<hbase::Configuration> conf, cached_locations_(), locations_lock_() { zk_quorum_ = ZKUtil::ParseZooKeeperQuorum(*conf_); - zk_ = zookeeper_init(zk_quorum_.c_str(), nullptr, 1000, 0, 0, 0); + EnsureZooKeeperConnection(); +} + +LocationCache::~LocationCache() { CloseZooKeeperConnection(); } + +void LocationCache::CloseZooKeeperConnection() { + if (zk_ != nullptr) { + zookeeper_close(zk_); + zk_ = nullptr; + LOG(INFO) << "Closed connection to ZooKeeper."; + } } -LocationCache::~LocationCache() { - zookeeper_close(zk_); - zk_ = nullptr; - LOG(INFO) << "Closed connection to ZooKeeper."; +void LocationCache::EnsureZooKeeperConnection() { + if (zk_ == nullptr) { + LOG(INFO) << "Connecting to ZooKeeper. Quorum:" + zk_quorum_; + auto session_timeout = ZKUtil::SessionTimeout(*conf_); + zk_ = zookeeper_init(zk_quorum_.c_str(), nullptr, session_timeout, nullptr, nullptr, 0); + } } folly::Future<ServerName> LocationCache::LocateMeta() { - std::lock_guard<std::mutex> g(meta_lock_); + std::lock_guard<std::recursive_mutex> g(meta_lock_); if (meta_promise_ == nullptr) { this->RefreshMetaLocation(); } - return meta_promise_->getFuture(); + return meta_promise_->getFuture().onError([&](const folly::exception_wrapper &ew) { + auto promise = InvalidateMeta(); + promise->setException(ew); + return ServerName{}; + }); } -void LocationCache::InvalidateMeta() { +std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> LocationCache::InvalidateMeta() { + VLOG(2) << "Invalidating meta location"; + std::lock_guard<std::recursive_mutex> g(meta_lock_); if (meta_promise_ != nullptr) { - std::lock_guard<std::mutex> g(meta_lock_); - meta_promise_ = nullptr; + // return the unique_ptr back to the caller. + std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> ret = nullptr; + std::swap(ret, meta_promise_); + return ret; + } else { + return nullptr; } } @@ -84,18 +108,21 @@ void LocationCache::RefreshMetaLocation() { cpu_executor_->add([&] { meta_promise_->setWith([&] { return this->ReadMetaLocation(); }); }); } +// Note: this is a blocking call to zookeeper ServerName LocationCache::ReadMetaLocation() { auto buf = folly::IOBuf::create(4096); ZkDeserializer derser; + EnsureZooKeeperConnection(); // This needs to be int rather than size_t as that's what ZK expects. int len = buf->capacity(); std::string zk_node = ZKUtil::MetaZNode(*conf_); - // TODO(elliott): handle disconnects/reconntion as needed. int zk_result = zoo_get(this->zk_, zk_node.c_str(), 0, reinterpret_cast<char *>(buf->writableData()), &len, nullptr); if (zk_result != ZOK || len < 9) { LOG(ERROR) << "Error getting meta location."; + // We just close the zk connection, and let the upper levels retry. + CloseZooKeeperConnection(); throw std::runtime_error("Error getting meta location. Quorum: " + zk_quorum_); } buf->append(len); @@ -103,6 +130,8 @@ ServerName LocationCache::ReadMetaLocation() { MetaRegionServer mrs; if (derser.Parse(buf.get(), &mrs) == false) { LOG(ERROR) << "Unable to decode"; + throw std::runtime_error("Error getting meta location (Unable to decode). Quorum: " + + zk_quorum_); } return mrs.server(); } @@ -118,10 +147,15 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta( .then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) { return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row))); }) - .then([this](std::unique_ptr<Response> resp) { + .onError([&](const folly::exception_wrapper &ew) { + auto promise = InvalidateMeta(); + throw ew; + return static_cast<std::unique_ptr<Response>>(nullptr); + }) + .then([tn, this](std::unique_ptr<Response> resp) { // take the protobuf response and make it into // a region location. - return meta_util_.CreateLocation(std::move(*resp)); + return meta_util_.CreateLocation(std::move(*resp), tn); }) .then([tn, this](std::shared_ptr<RegionLocation> rl) { // Make sure that the correct location was found. @@ -134,9 +168,6 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta( .then([this](std::shared_ptr<RegionLocation> rl) { auto remote_id = std::make_shared<ConnectionId>(rl->server_name().host_name(), rl->server_name().port()); - // Now fill out the connection. - // rl->set_service(cp_->GetConnection(remote_id)->get_service()); TODO: this causes wangle - // assertion errors return rl; }) .then([tn, this](std::shared_ptr<RegionLocation> rl) { @@ -146,9 +177,20 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta( }); } +constexpr const char *MetaUtil::kMetaRegionName; + folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateRegion( const TableName &tn, const std::string &row, const RegionLocateType locate_type, const int64_t locate_ns) { + // We maybe asked to locate meta itself + if (MetaUtil::IsMeta(tn)) { + return LocateMeta().then([this](const ServerName &server_name) { + auto rl = std::make_shared<RegionLocation>(MetaUtil::kMetaRegionName, + meta_util_.meta_region_info(), server_name); + return rl; + }); + } + // TODO: implement region locate type and timeout auto cached_loc = this->GetCachedLocation(tn, row); if (cached_loc != nullptr) { @@ -164,34 +206,28 @@ std::shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb auto t_locs = this->GetTableLocations(tn); std::shared_lock<folly::SharedMutexWritePriority> lock(locations_lock_); - if (VLOG_IS_ON(2)) { - for (const auto &p : *t_locs) { - VLOG(2) << "t_locs[" << p.first << "] = " << p.second->DebugString(); - } - } - // looking for the "floor" key as a start key auto possible_region = t_locs->upper_bound(row); if (t_locs->empty()) { - VLOG(2) << "Could not find region in cache, table map is empty"; + VLOG(5) << "Could not find region in cache, table map is empty"; return nullptr; } if (possible_region == t_locs->begin()) { - VLOG(2) << "Could not find region in cache, all keys are greater, row:" << row + VLOG(5) << "Could not find region in cache, all keys are greater, row:" << row << " ,possible_region:" << possible_region->second->DebugString(); return nullptr; } --possible_region; - VLOG(2) << "Found possible region in cache for row:" << row + VLOG(5) << "Found possible region in cache for row:" << row << " ,possible_region:" << possible_region->second->DebugString(); // found possible start key, now need to check end key if (possible_region->second->region_info().end_key() == "" || possible_region->second->region_info().end_key() > row) { - VLOG(1) << "Found region in cache for row:" << row + VLOG(2) << "Found region in cache for row:" << row << " ,region:" << possible_region->second->DebugString(); return possible_region->second; } else { @@ -261,15 +297,23 @@ void LocationCache::ClearCache() { // must hold unique lock on locations_lock_ void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) { + VLOG(1) << "ClearCachedLocations, table:" << folly::to<std::string>(tn); std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_); cached_locations_.erase(tn); + if (MetaUtil::IsMeta(tn)) { + InvalidateMeta(); + } } // must hold unique lock on locations_lock_ void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row) { + VLOG(1) << "ClearCachedLocation, table:" << folly::to<std::string>(tn) << ", row:" << row; auto table_locs = this->GetTableLocations(tn); std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_); table_locs->erase(row); + if (MetaUtil::IsMeta(tn)) { + InvalidateMeta(); + } } void LocationCache::UpdateCachedLocation(const RegionLocation &loc, http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/location-cache.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index a3c15cb..a374fb6 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -137,7 +137,7 @@ class LocationCache : public AsyncRegionLocator { /** * Remove the cached location of meta. */ - void InvalidateMeta(); + std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> InvalidateMeta(); /** * Return cached region location corresponding to this row, @@ -186,6 +186,10 @@ class LocationCache : public AsyncRegionLocator { const std::string &zk_quorum() { return zk_quorum_; } private: + void CloseZooKeeperConnection(); + void EnsureZooKeeperConnection(); + + private: void RefreshMetaLocation(); hbase::pb::ServerName ReadMetaLocation(); std::shared_ptr<RegionLocation> CreateLocation(const Response &resp); @@ -198,7 +202,7 @@ class LocationCache : public AsyncRegionLocator { std::string zk_quorum_; std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_; - std::mutex meta_lock_; + std::recursive_mutex meta_lock_; MetaUtil meta_util_; std::shared_ptr<ConnectionPool> cp_; http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/meta-utils.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc index 8efecc8..31349a5 100644 --- a/hbase-native-client/core/meta-utils.cc +++ b/hbase-native-client/core/meta-utils.cc @@ -21,10 +21,13 @@ #include <folly/Conv.h> #include <memory> +#include <utility> +#include <vector> #include "connection/request.h" #include "connection/response.h" #include "core/response-converter.h" +#include "exceptions/exception.h" #include "if/Client.pb.h" #include "serde/region-info.h" #include "serde/server-name.h" @@ -38,10 +41,17 @@ using hbase::pb::ServerName; namespace hbase { -static const std::string META_REGION = "1588230740"; -static const std::string CATALOG_FAMILY = "info"; -static const std::string REGION_INFO_COLUMN = "regioninfo"; -static const std::string SERVER_COLUMN = "server"; +MetaUtil::MetaUtil() { + meta_region_info_.set_start_key(""); + meta_region_info_.set_end_key(""); + meta_region_info_.set_offline(false); + meta_region_info_.set_split(false); + meta_region_info_.set_replica_id(0); + meta_region_info_.set_split(false); + meta_region_info_.set_region_id(1); + meta_region_info_.mutable_table_name()->set_namespace_(MetaUtil::kSystemNamespace); + meta_region_info_.mutable_table_name()->set_qualifier(MetaUtil::kMetaTableQualifier); +} std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const { return folly::to<std::string>(tn, ",", row, ",", "999999999999999999"); @@ -56,7 +66,7 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::st // Set the region this scan goes to auto region = msg->mutable_region(); - region->set_value(META_REGION); + region->set_value(MetaUtil::kMetaRegion); region->set_type( RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME); @@ -78,30 +88,38 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::st // Set the columns that we need auto info_col = scan->add_column(); - info_col->set_family("info"); - info_col->add_qualifier("server"); - info_col->add_qualifier("regioninfo"); + info_col->set_family(MetaUtil::kCatalogFamily); + info_col->add_qualifier(MetaUtil::kServerColumn); + info_col->add_qualifier(MetaUtil::kRegionInfoColumn); scan->set_start_row(RegionLookupRowkey(tn, row)); return request; } -std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp) { +std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp, + const TableName &tn) { std::vector<std::shared_ptr<Result>> results = ResponseConverter::FromScanResponse(resp); + if (results.size() == 0) { + throw TableNotFoundException(folly::to<std::string>(tn)); + } if (results.size() != 1) { throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" + std::to_string(results.size())); } auto result = *results[0]; - auto region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN); - auto server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN); + auto region_info_str = result.Value(MetaUtil::kCatalogFamily, MetaUtil::kRegionInfoColumn); + auto server_str = result.Value(MetaUtil::kCatalogFamily, MetaUtil::kServerColumn); CHECK(region_info_str); CHECK(server_str); auto row = result.Row(); auto region_info = folly::to<RegionInfo>(*region_info_str); auto server_name = folly::to<ServerName>(*server_str); - return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr); + return std::make_shared<RegionLocation>(row, std::move(region_info), server_name); +} + +bool MetaUtil::IsMeta(const hbase::pb::TableName &tn) { + return folly::to<std::string>(tn) == MetaUtil::kMetaTableName; } } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/meta-utils.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h index d67f32d..d178179 100644 --- a/hbase-native-client/core/meta-utils.h +++ b/hbase-native-client/core/meta-utils.h @@ -34,6 +34,17 @@ namespace hbase { */ class MetaUtil { public: + static constexpr const char *kSystemNamespace = "hbase"; + static constexpr const char *kMetaTableQualifier = "meta"; + static constexpr const char *kMetaTableName = "hbase:meta"; + static constexpr const char *kMetaRegion = "1588230740"; + static constexpr const char *kMetaRegionName = "hbase:meta,,1"; + static constexpr const char *kCatalogFamily = "info"; + static constexpr const char *kRegionInfoColumn = "regioninfo"; + static constexpr const char *kServerColumn = "server"; + + MetaUtil(); + /** * Given a table and a row give the row key from which to start a scan to find * region locations. @@ -49,6 +60,17 @@ class MetaUtil { /** * Return a RegionLocation from the parsed Response */ - std::shared_ptr<RegionLocation> CreateLocation(const Response &resp); + std::shared_ptr<RegionLocation> CreateLocation(const Response &resp, + const hbase::pb::TableName &tn); + + /** + * Return whether the table is the meta table. + */ + static bool IsMeta(const hbase::pb::TableName &tn); + + const pb::RegionInfo &meta_region_info() const { return meta_region_info_; } + + private: + pb::RegionInfo meta_region_info_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/region-location.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index d5d9d67..822180b 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -44,9 +44,8 @@ class RegionLocation { * this region. * @param service the connected service to the regionserver. */ - RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName sn, - std::shared_ptr<HBaseService> service) - : region_name_(region_name), ri_(ri), sn_(sn), service_(service) {} + RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName sn) + : region_name_(region_name), ri_(ri), sn_(sn) {} /** * Get a reference to the regio info @@ -64,19 +63,6 @@ class RegionLocation { const std::string ®ion_name() const { return region_name_; } /** - * Get a service. This could be closed or null. It's the caller's - * responsibility to check. - */ - std::shared_ptr<HBaseService> service() { return service_; } - - /** - * Set the service. - * This should be used if the region moved or if the connection is thought to - * be bad and a new tcp connection needs to be made. - */ - void set_service(std::shared_ptr<HBaseService> s) { service_ = s; } - - /** * Set the servername if the region has moved. */ void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; } @@ -89,7 +75,6 @@ class RegionLocation { std::string region_name_; hbase::pb::RegionInfo ri_; hbase::pb::ServerName sn_; - std::shared_ptr<HBaseService> service_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/response-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index 9bc4892..4f9bfb1 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -93,7 +93,7 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const R std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse( const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) { VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString() - << " cell_scanner:" << (cell_scanner == nullptr); + << " cell_scanner:" << (cell_scanner != nullptr); int num_results = cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size(); http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index f79d848..2fd7108 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -29,6 +29,7 @@ #include "connection/rpc-client.h" #include "core/client.h" #include "core/get.h" +#include "core/hbase-configuration-loader.h" #include "core/put.h" #include "core/scan.h" #include "core/table.h" @@ -39,6 +40,7 @@ using hbase::Client; using hbase::Configuration; using hbase::Get; +using hbase::HBaseConfigurationLoader; using hbase::Scan; using hbase::Put; using hbase::Table; @@ -49,6 +51,7 @@ using hbase::TimeUtil; DEFINE_string(table, "test_table", "What table to do the reads or writes"); DEFINE_string(row, "row_", "row prefix"); DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); +DEFINE_string(conf, "", "Conf directory to read the config from (optional)"); DEFINE_uint64(num_rows, 10000, "How many rows to write and read"); DEFINE_bool(puts, true, "Whether to perform puts"); DEFINE_bool(gets, true, "Whether to perform gets"); @@ -76,10 +79,17 @@ int main(int argc, char *argv[]) { FLAGS_logtostderr = 1; FLAGS_stderrthreshold = 1; - // Configuration - auto conf = std::make_shared<Configuration>(); - conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper); - conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads); + std::shared_ptr<Configuration> conf = nullptr; + if (FLAGS_conf == "") { + // Configuration + conf = std::make_shared<Configuration>(); + conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper); + conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads); + } else { + setenv("HBASE_CONF", FLAGS_conf.c_str(), 1); + hbase::HBaseConfigurationLoader loader; + conf = std::make_shared<Configuration>(loader.LoadDefaultResources().value()); + } auto row = FLAGS_row; http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/zk-util.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/zk-util.cc b/hbase-native-client/core/zk-util.cc index 50ea92a..d29c8c3 100644 --- a/hbase-native-client/core/zk-util.cc +++ b/hbase-native-client/core/zk-util.cc @@ -55,4 +55,8 @@ std::string ZKUtil::MetaZNode(const hbase::Configuration& conf) { return zk_node; } +int32_t ZKUtil::SessionTimeout(const hbase::Configuration& conf) { + return conf.GetInt(kHBaseZookeeperSessionTimeout_, kDefHBaseZookeeperSessionTimeout_); +} + } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/zk-util.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/zk-util.h b/hbase-native-client/core/zk-util.h index 8f2d627..403fbe4 100644 --- a/hbase-native-client/core/zk-util.h +++ b/hbase-native-client/core/zk-util.h @@ -34,8 +34,13 @@ class ZKUtil { static constexpr const char* kDefHBaseZnodeParent_ = "/hbase"; static constexpr const char* kHBaseMetaRegionServer_ = "meta-region-server"; + static constexpr const char* kHBaseZookeeperSessionTimeout_ = "zookeeper.session.timeout"; + static constexpr const int32_t kDefHBaseZookeeperSessionTimeout_ = 90000; + static std::string ParseZooKeeperQuorum(const hbase::Configuration& conf); static std::string MetaZNode(const hbase::Configuration& conf); + + static int32_t SessionTimeout(const hbase::Configuration& conf); }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/serde/region-info.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h index 1f08298..8010042 100644 --- a/hbase-native-client/serde/region-info.h +++ b/hbase-native-client/serde/region-info.h @@ -19,13 +19,13 @@ #pragma once -#include "if/HBase.pb.h" - #include <folly/Conv.h> #include <boost/algorithm/string/predicate.hpp> #include <string> +#include "if/HBase.pb.h" + namespace hbase { namespace pb { template <class String> http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/serde/table-name.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/table-name.h b/hbase-native-client/serde/table-name.h index b8b7776..3594802 100644 --- a/hbase-native-client/serde/table-name.h +++ b/hbase-native-client/serde/table-name.h @@ -18,12 +18,13 @@ */ #pragma once +#include <folly/Conv.h> +#include <folly/String.h> + #include <memory> #include <string> #include <vector> -#include <folly/Conv.h> -#include <folly/String.h> #include "if/HBase.pb.h" namespace hbase { http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/test-util/mini-cluster.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/test-util/mini-cluster.cc b/hbase-native-client/test-util/mini-cluster.cc index 688ea8e..56461e1 100644 --- a/hbase-native-client/test-util/mini-cluster.cc +++ b/hbase-native-client/test-util/mini-cluster.cc @@ -59,6 +59,7 @@ JNIEnv *MiniCluster::CreateVM(JavaVM **jvm) { } fd.close(); } + auto options = std::string{"-Djava.class.path="} + clspath; jvm_options.optionString = const_cast<char *>(options.c_str()); args.options = &jvm_options; @@ -185,6 +186,9 @@ JNIEnv *MiniCluster::env() { } // converts C char* to Java byte[] jbyteArray MiniCluster::StrToByteChar(const std::string &str) { + if (str.size() == 0) { + return nullptr; + } char *p = const_cast<char *>(str.c_str()); int n = str.length(); jbyteArray arr = env_->NewByteArray(n); http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/test-util/test-util.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/test-util/test-util.cc b/hbase-native-client/test-util/test-util.cc index 26862d8..b32c635 100644 --- a/hbase-native-client/test-util/test-util.cc +++ b/hbase-native-client/test-util/test-util.cc @@ -79,6 +79,10 @@ void TestUtil::CreateTable(const std::string &table, const std::vector<std::stri mini_->CreateTable(table, families, keys); } +void TestUtil::MoveRegion(const std::string ®ion, const std::string &server) { + mini_->MoveRegion(region, server); +} + void TestUtil::StartStandAloneInstance() { auto p = temp_dir_.path().string(); auto cmd = std::string{"bin/start-local-hbase.sh " + p}; http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/test-util/test-util.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/test-util/test-util.h b/hbase-native-client/test-util/test-util.h index e26558b..40e99d1 100644 --- a/hbase-native-client/test-util/test-util.h +++ b/hbase-native-client/test-util/test-util.h @@ -68,6 +68,7 @@ class TestUtil { void StartStandAloneInstance(); void StopStandAloneInstance(); void RunShellCmd(const std::string &); + void MoveRegion(const std::string ®ion, const std::string &server); private: std::unique_ptr<MiniCluster> mini_; http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/utils/bytes-util-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/utils/bytes-util-test.cc b/hbase-native-client/utils/bytes-util-test.cc index 16af021..4a49593 100644 --- a/hbase-native-client/utils/bytes-util-test.cc +++ b/hbase-native-client/utils/bytes-util-test.cc @@ -23,8 +23,7 @@ #include "utils/bytes-util.h" -using namespace std; -using namespace hbase; +using hbase::BytesUtil; TEST(TestBytesUtil, TestToStringBinary) { std::string empty{""}; http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/utils/bytes-util.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/utils/bytes-util.cc b/hbase-native-client/utils/bytes-util.cc index a937782..12037c3 100644 --- a/hbase-native-client/utils/bytes-util.cc +++ b/hbase-native-client/utils/bytes-util.cc @@ -21,11 +21,11 @@ #include <bits/stdc++.h> #include <boost/predef.h> +#include <glog/logging.h> + #include <memory> #include <string> -#include <glog/logging.h> - namespace hbase { constexpr char BytesUtil::kHexChars[];