http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/raw-async-table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/raw-async-table.cc b/hbase-native-client/src/hbase/client/raw-async-table.cc new file mode 100644 index 0000000..96361e4 --- /dev/null +++ b/hbase-native-client/src/hbase/client/raw-async-table.cc @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <utility> + +#include "hbase/client/async-batch-rpc-retrying-caller.h" +#include "hbase/client/raw-async-table.h" +#include "hbase/client/request-converter.h" +#include "hbase/client/response-converter.h" + +using hbase::security::User; + +namespace hbase { + +template <typename RESP> +std::shared_ptr<SingleRequestCallerBuilder<RESP>> RawAsyncTable::CreateCallerBuilder( + std::string row, std::chrono::nanoseconds rpc_timeout) { + return connection_->caller_factory() + ->Single<RESP>() + ->table(table_name_) + ->row(row) + ->rpc_timeout(rpc_timeout) + ->operation_timeout(connection_conf_->operation_timeout()) + ->pause(connection_conf_->pause()) + ->max_retries(connection_conf_->max_retries()) + ->start_log_errors_count(connection_conf_->start_log_errors_count()); +} + +template <typename REQ, typename PREQ, typename PRESP, typename RESP> +folly::Future<RESP> RawAsyncTable::Call( + std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, + std::shared_ptr<RegionLocation> loc, const REQ& req, + const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, + const RespConverter<RESP, PRESP> resp_converter) { + std::unique_ptr<PREQ> preq = req_converter(req, loc->region_name()); + + // No need to make take a callable argument, it is always the same + return rpc_client + ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), + User::defaultUser(), "ClientService") + .then( + [resp_converter](const std::unique_ptr<PRESP>& presp) { return resp_converter(*presp); }); +} + +folly::Future<std::shared_ptr<Result>> RawAsyncTable::Get(const hbase::Get& get) { + auto caller = + CreateCallerBuilder<std::shared_ptr<Result>>(get.row(), connection_conf_->read_rpc_timeout()) + ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) + -> folly::Future<std::shared_ptr<hbase::Result>> { + return Call<hbase::Get, hbase::Request, hbase::Response, + std::shared_ptr<hbase::Result>>( + rpc_client, controller, loc, get, + &hbase::RequestConverter::ToGetRequest, + &hbase::ResponseConverter::FromGetResponse); + }) + ->Build(); + + // Return the Future we obtain from the call(). However, we do not want the Caller to go out of + // context and get deallocated since the caller injects a lot of closures which capture [this, &] + // which is use-after-free. We are just passing an identity closure capturing caller by value to + // ensure that the lifecycle of the Caller object is longer than the retry lambdas. + return caller->Call().then([caller](const auto r) { return r; }); +} +folly::Future<std::shared_ptr<Result>> RawAsyncTable::Increment(const hbase::Increment& incr) { + auto caller = + CreateCallerBuilder<std::shared_ptr<Result>>(incr.row(), + connection_conf_->write_rpc_timeout()) + ->action([=, &incr](std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> + rpc_client) -> folly::Future<std::shared_ptr<Result>> { + return Call<hbase::Increment, hbase::Request, hbase::Response, std::shared_ptr<Result>>( + rpc_client, controller, loc, incr, + &hbase::RequestConverter::IncrementToMutateRequest, + &hbase::ResponseConverter::FromMutateResponse); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} + +folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) { + auto caller = + CreateCallerBuilder<folly::Unit>(put.row(), connection_conf_->write_rpc_timeout()) + ->action([=, &put]( + std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> { + return Call<hbase::Put, hbase::Request, hbase::Response, folly::Unit>( + rpc_client, controller, loc, put, &hbase::RequestConverter::ToMutateRequest, + [](const Response& r) -> folly::Unit { return folly::unit; }); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} + +folly::Future<bool> RawAsyncTable::CheckAndPut(const std::string& row, const std::string& family, + const std::string& qualifier, + const std::string& value, const hbase::Put& put, + const pb::CompareType& compare_op) { + auto caller = + CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout()) + ->action([=, &put](std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> { + return Call<hbase::Put, hbase::Request, hbase::Response, bool>( + rpc_client, controller, loc, put, + // request conversion + [=, &put](const hbase::Put& put, + const std::string& region_name) -> std::unique_ptr<Request> { + auto checkReq = RequestConverter::CheckAndPutToMutateRequest( + row, family, qualifier, value, compare_op, put, region_name); + return checkReq; + }, + // response conversion + &ResponseConverter::BoolFromMutateResponse); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} + +folly::Future<bool> RawAsyncTable::CheckAndDelete(const std::string& row, const std::string& family, + const std::string& qualifier, + const std::string& value, + const hbase::Delete& del, + const pb::CompareType& compare_op) { + auto caller = + CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout()) + ->action([=, &del](std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> { + return Call<hbase::Delete, hbase::Request, hbase::Response, bool>( + rpc_client, controller, loc, del, + // request conversion + [=, &del](const hbase::Delete& del, + const std::string& region_name) -> std::unique_ptr<Request> { + auto checkReq = RequestConverter::CheckAndDeleteToMutateRequest( + row, family, qualifier, value, compare_op, del, region_name); + return checkReq; + }, + // response conversion + &ResponseConverter::BoolFromMutateResponse); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} + +folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) { + auto caller = + CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout()) + ->action([=, &del]( + std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> { + return Call<hbase::Delete, hbase::Request, hbase::Response, folly::Unit>( + rpc_client, controller, loc, del, &hbase::RequestConverter::DeleteToMutateRequest, + [](const Response& r) -> folly::Unit { return folly::unit; }); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} + +folly::Future<std::shared_ptr<Result>> RawAsyncTable::Append(const hbase::Append& append) { + auto caller = + CreateCallerBuilder<std::shared_ptr<Result>>(append.row(), + connection_conf_->write_rpc_timeout()) + ->action([=, &append](std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> + rpc_client) -> folly::Future<std::shared_ptr<Result>> { + return Call<hbase::Append, hbase::Request, hbase::Response, std::shared_ptr<Result>>( + rpc_client, controller, loc, append, + &hbase::RequestConverter::AppendToMutateRequest, + &hbase::ResponseConverter::FromMutateResponse); + }) + ->Build(); + + return caller->Call().then([caller](const auto r) { return r; }); +} + +folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Get( + const std::vector<hbase::Get>& gets) { + std::vector<std::shared_ptr<hbase::Row>> rows; + for (auto get : gets) { + std::shared_ptr<hbase::Row> srow = std::make_shared<hbase::Get>(get); + rows.push_back(srow); + } + return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>( + rows, connection_conf_->read_rpc_timeout()); +} + +template <typename REQ, typename RESP> +folly::Future<std::vector<folly::Try<RESP>>> RawAsyncTable::Batch( + const std::vector<REQ>& rows, std::chrono::nanoseconds timeout) { + auto caller = connection_->caller_factory() + ->Batch<REQ, RESP>() + ->table(table_name_) + ->actions(std::make_shared<std::vector<REQ>>(rows)) + ->rpc_timeout(timeout) + ->operation_timeout(connection_conf_->operation_timeout()) + ->pause(connection_conf_->pause()) + ->max_attempts(connection_conf_->max_retries()) + ->start_log_errors_count(connection_conf_->start_log_errors_count()) + ->Build(); + + return caller->Call().then([caller](auto r) { return r; }); +} + +void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer) { + auto scanner = AsyncClientScanner::Create( + connection_, SetDefaultScanConfig(scan), table_name_, consumer, connection_conf_->pause(), + connection_conf_->max_retries(), connection_conf_->scan_timeout(), + connection_conf_->rpc_timeout(), connection_conf_->start_log_errors_count()); + scanner->Start(); +} + +std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) { + // always create a new scan object as we may reset the start row later. + auto new_scan = std::make_shared<hbase::Scan>(scan); + if (new_scan->Caching() <= 0) { + new_scan->SetCaching(default_scanner_caching_); + } + if (new_scan->MaxResultSize() <= 0) { + new_scan->SetMaxResultSize(default_scanner_max_result_size_); + } + return new_scan; +} + +folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Put( + const std::vector<hbase::Put>& puts) { + std::vector<std::shared_ptr<hbase::Row>> rows; + for (auto put : puts) { + std::shared_ptr<hbase::Row> srow = std::make_shared<hbase::Put>(put); + rows.push_back(srow); + } + return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>( + rows, connection_conf_->write_rpc_timeout()); +} +} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/region-result.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/region-result.cc b/hbase-native-client/src/hbase/client/region-result.cc new file mode 100644 index 0000000..28c4861 --- /dev/null +++ b/hbase-native-client/src/hbase/client/region-result.cc @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/region-result.h" +#include <glog/logging.h> +#include <stdexcept> + +using hbase::pb::RegionLoadStats; + +namespace hbase { + +RegionResult::RegionResult() {} + +RegionResult::~RegionResult() {} + +void RegionResult::AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result, + std::shared_ptr<folly::exception_wrapper> exc) { + auto index_found = result_or_excption_.find(index); + if (index_found == result_or_excption_.end()) { + result_or_excption_[index] = std::make_tuple(result ? result : nullptr, exc ? exc : nullptr); + } else { + throw std::runtime_error("Index " + std::to_string(index) + + " already set with ResultOrException"); + } +} + +void RegionResult::set_stat(std::shared_ptr<RegionLoadStats> stat) { stat_ = stat; } + +int RegionResult::ResultOrExceptionSize() const { return result_or_excption_.size(); } + +std::shared_ptr<ResultOrExceptionTuple> RegionResult::ResultOrException(int32_t index) const { + return std::make_shared<ResultOrExceptionTuple>(result_or_excption_.at(index)); +} + +const std::shared_ptr<RegionLoadStats>& RegionResult::stat() const { return stat_; } + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/request-converter-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/request-converter-test.cc b/hbase-native-client/src/hbase/client/request-converter-test.cc new file mode 100644 index 0000000..0878519 --- /dev/null +++ b/hbase-native-client/src/hbase/client/request-converter-test.cc @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/request-converter.h" + +#include <gtest/gtest.h> +#include <limits> +#include "hbase/connection/request.h" +#include "hbase/client/get.h" +#include "hbase/client/scan.h" + +using hbase::Get; +using hbase::Scan; + +using hbase::pb::GetRequest; +using hbase::pb::RegionSpecifier; +using hbase::pb::RegionSpecifier_RegionSpecifierType; +using hbase::pb::ScanRequest; + +TEST(RequestConverter, ToGet) { + std::string row_str = "row-test"; + Get get(row_str); + get.AddFamily("family-1"); + get.AddFamily("family-2"); + get.AddFamily("family-3"); + get.AddColumn("family-2", "qualifier-1"); + get.AddColumn("family-2", "qualifier-2"); + get.AddColumn("family-2", "qualifier-3"); + get.SetCacheBlocks(false); + get.SetConsistency(hbase::pb::Consistency::TIMELINE); + get.SetMaxVersions(2); + get.SetTimeRange(10000, 20000); + std::string region_name("RegionName"); + + auto req = hbase::RequestConverter::ToGetRequest(get, region_name); + auto msg = std::static_pointer_cast<GetRequest>(req->req_msg()); + + // Tests whether the PB object is properly set or not. + ASSERT_TRUE(msg->has_region()); + ASSERT_TRUE(msg->region().has_value()); + EXPECT_EQ(msg->region().value(), region_name); + + ASSERT_TRUE(msg->has_get()); + EXPECT_EQ(msg->get().row(), row_str); + EXPECT_FALSE(msg->get().cache_blocks()); + EXPECT_EQ(msg->get().consistency(), hbase::pb::Consistency::TIMELINE); + EXPECT_EQ(msg->get().max_versions(), 2); + EXPECT_EQ(msg->get().column_size(), 3); + for (int i = 0; i < msg->get().column_size(); ++i) { + EXPECT_EQ(msg->get().column(i).family(), "family-" + std::to_string(i + 1)); + for (int j = 0; j < msg->get().column(i).qualifier_size(); ++j) { + EXPECT_EQ(msg->get().column(i).qualifier(j), "qualifier-" + std::to_string(j + 1)); + } + } +} + +TEST(RequestConverter, ToScan) { + std::string start_row("start-row"); + std::string stop_row("stop-row"); + hbase::Scan scan; + scan.AddFamily("family-1"); + scan.AddFamily("family-2"); + scan.AddFamily("family-3"); + scan.AddColumn("family-2", "qualifier-1"); + scan.AddColumn("family-2", "qualifier-2"); + scan.AddColumn("family-2", "qualifier-3"); + scan.SetReversed(true); + scan.SetStartRow(start_row); + scan.SetStopRow(stop_row); + scan.SetCaching(3); + scan.SetConsistency(hbase::pb::Consistency::TIMELINE); + scan.SetCacheBlocks(true); + scan.SetAllowPartialResults(true); + scan.SetLoadColumnFamiliesOnDemand(true); + scan.SetMaxVersions(5); + scan.SetTimeRange(10000, 20000); + std::string region_name("RegionName"); + + auto req = hbase::RequestConverter::ToScanRequest(scan, region_name); + auto msg = std::static_pointer_cast<ScanRequest>(req->req_msg()); + + // Tests whether the PB object is properly set or not. + ASSERT_TRUE(msg->has_region()); + ASSERT_TRUE(msg->region().has_value()); + EXPECT_EQ(msg->region().value(), region_name); + + ASSERT_TRUE(msg->has_scan()); + EXPECT_TRUE(msg->scan().reversed()); + EXPECT_EQ(msg->scan().start_row(), start_row); + EXPECT_EQ(msg->scan().stop_row(), stop_row); + EXPECT_FALSE(msg->scan().small()); + EXPECT_EQ(msg->scan().caching(), 3); + EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE); + EXPECT_TRUE(msg->scan().cache_blocks()); + EXPECT_TRUE(msg->scan().allow_partial_results()); + EXPECT_TRUE(msg->scan().load_column_families_on_demand()); + EXPECT_EQ(msg->scan().max_versions(), 5); + EXPECT_EQ(msg->scan().max_result_size(), std::numeric_limits<uint64_t>::max()); + + EXPECT_EQ(msg->scan().column_size(), 3); + for (int i = 0; i < msg->scan().column_size(); ++i) { + EXPECT_EQ(msg->scan().column(i).family(), "family-" + std::to_string(i + 1)); + for (int j = 0; j < msg->scan().column(i).qualifier_size(); ++j) { + EXPECT_EQ(msg->scan().column(i).qualifier(j), "qualifier-" + std::to_string(j + 1)); + } + } + ASSERT_FALSE(msg->client_handles_partials()); + ASSERT_FALSE(msg->client_handles_heartbeats()); + ASSERT_FALSE(msg->track_scan_metrics()); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/request-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/request-converter.cc b/hbase-native-client/src/hbase/client/request-converter.cc new file mode 100644 index 0000000..a57ac31 --- /dev/null +++ b/hbase-native-client/src/hbase/client/request-converter.cc @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/request-converter.h" + +#include <folly/Conv.h> + +#include <utility> +#include "hbase/if/Client.pb.h" + +using hbase::pb::GetRequest; +using hbase::pb::MutationProto; +using hbase::pb::RegionAction; +using hbase::pb::RegionSpecifier; +using hbase::pb::RegionSpecifier_RegionSpecifierType; +using hbase::pb::ScanRequest; + +namespace hbase { + +RequestConverter::~RequestConverter() {} + +RequestConverter::RequestConverter() {} + +void RequestConverter::SetRegion(const std::string ®ion_name, + RegionSpecifier *region_specifier) { + region_specifier->set_type( + RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_REGION_NAME); + region_specifier->set_value(region_name); +} + +std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get, + const std::string ®ion_name) { + auto pb_req = Request::get(); + auto pb_msg = std::static_pointer_cast<GetRequest>(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release()); + return pb_req; +} + +std::unique_ptr<hbase::pb::Scan> RequestConverter::ToScan(const Scan &scan) { + auto pb_scan = std::make_unique<hbase::pb::Scan>(); + pb_scan->set_max_versions(scan.MaxVersions()); + pb_scan->set_cache_blocks(scan.CacheBlocks()); + pb_scan->set_reversed(scan.IsReversed()); + pb_scan->set_caching(scan.Caching()); + pb_scan->set_start_row(scan.StartRow()); + pb_scan->set_stop_row(scan.StopRow()); + pb_scan->set_consistency(scan.Consistency()); + pb_scan->set_max_result_size(scan.MaxResultSize()); + pb_scan->set_allow_partial_results(scan.AllowPartialResults()); + pb_scan->set_load_column_families_on_demand(scan.LoadColumnFamiliesOnDemand()); + + if (!scan.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_scan->mutable_time_range(); + pb_time_range->set_from(scan.Timerange().MinTimeStamp()); + pb_time_range->set_to(scan.Timerange().MaxTimeStamp()); + } + + if (scan.HasFamilies()) { + for (const auto &family : scan.FamilyMap()) { + auto column = pb_scan->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + if (scan.filter() != nullptr) { + pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release()); + } + + return std::move(pb_scan); +} + +std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, + const std::string ®ion_name) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_scan(ToScan(scan).release()); + + SetCommonScanRequestFields(pb_msg, false); + + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, + const std::string ®ion_name, + int32_t num_rows, bool close_scanner) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_scan(ToScan(scan).release()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + + SetCommonScanRequestFields(pb_msg, false); + + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + pb_msg->set_scanner_id(scanner_id); + + SetCommonScanRequestFields(pb_msg, false); + + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner, + int64_t next_call_seq_id, bool renew) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + pb_msg->set_scanner_id(scanner_id); + pb_msg->set_next_call_seq(next_call_seq_id); + + SetCommonScanRequestFields(pb_msg, renew); + return pb_req; +} + +void RequestConverter::SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg, + bool renew) { + // TODO We will change these later when we implement partial results and heartbeats, etc + pb_msg->set_client_handles_partials(false); + pb_msg->set_client_handles_heartbeats(false); + pb_msg->set_track_scan_metrics(false); + pb_msg->set_renew(renew); + // TODO: set scan limit +} + +std::unique_ptr<Request> RequestConverter::ToMultiRequest( + const ActionsByRegion &actions_by_region) { + auto pb_req = Request::multi(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MultiRequest>(pb_req->req_msg()); + + for (const auto &action_by_region : actions_by_region) { + auto pb_region_action = pb_msg->add_regionaction(); + RequestConverter::SetRegion(action_by_region.first, pb_region_action->mutable_region()); + int action_num = 0; + for (const auto ®ion_action : action_by_region.second->actions()) { + auto pb_action = pb_region_action->add_action(); + auto pget = region_action->action(); + // We store only hbase::Get in hbase::Action as of now. It will be changed later on. + CHECK(pget) << "Unexpected. action can't be null."; + std::string error_msg(""); + if (typeid(*pget) == typeid(hbase::Get)) { + auto getp = dynamic_cast<hbase::Get *>(pget.get()); + pb_action->set_allocated_get(RequestConverter::ToGet(*getp).release()); + } else if (typeid(*pget) == typeid(hbase::Put)) { + auto putp = dynamic_cast<hbase::Put *>(pget.get()); + pb_action->set_allocated_mutation( + RequestConverter::ToMutation(MutationType::MutationProto_MutationType_PUT, *putp, -1) + .release()); + } else { + throw std::runtime_error("Unexpected action type encountered."); + } + pb_action->set_index(action_num); + action_num++; + } + } + return pb_req; +} + +std::unique_ptr<hbase::pb::Get> RequestConverter::ToGet(const Get &get) { + auto pb_get = std::make_unique<hbase::pb::Get>(); + pb_get->set_max_versions(get.MaxVersions()); + pb_get->set_cache_blocks(get.CacheBlocks()); + pb_get->set_consistency(get.Consistency()); + + if (!get.Timerange().IsAllTime()) { + hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range(); + pb_time_range->set_from(get.Timerange().MinTimeStamp()); + pb_time_range->set_to(get.Timerange().MaxTimeStamp()); + } + pb_get->set_row(get.row()); + if (get.HasFamilies()) { + for (const auto &family : get.FamilyMap()) { + auto column = pb_get->add_column(); + column->set_family(family.first); + for (const auto &qualifier : family.second) { + column->add_qualifier(qualifier); + } + } + } + + if (get.filter() != nullptr) { + pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release()); + } + return pb_get; +} + +std::unique_ptr<MutationProto> RequestConverter::ToMutation(const MutationType type, + const Mutation &mutation, + const int64_t nonce) { + auto pb_mut = std::make_unique<MutationProto>(); + pb_mut->set_row(mutation.row()); + pb_mut->set_mutate_type(type); + pb_mut->set_durability(mutation.Durability()); + pb_mut->set_timestamp(mutation.TimeStamp()); + // TODO: set attributes from the mutation (key value pairs). + + if (nonce > 0) { + pb_mut->set_nonce(nonce); + } + + for (const auto &family : mutation.FamilyMap()) { + for (const auto &cell : family.second) { + auto column = pb_mut->add_column_value(); + column->set_family(cell->Family()); + auto qual = column->add_qualifier_value(); + qual->set_qualifier(cell->Qualifier()); + qual->set_timestamp(cell->Timestamp()); + auto cell_type = cell->Type(); + if (type == pb::MutationProto_MutationType_DELETE || + (type == pb::MutationProto_MutationType_PUT && IsDelete(cell_type))) { + qual->set_delete_type(ToDeleteType(cell_type)); + } + + qual->set_value(cell->Value()); + } + } + return std::move(pb_mut); +} + +DeleteType RequestConverter::ToDeleteType(const CellType type) { + switch (type) { + case CellType::DELETE: + return pb::MutationProto_DeleteType_DELETE_ONE_VERSION; + case CellType::DELETE_COLUMN: + return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS; + case CellType::DELETE_FAMILY: + return pb::MutationProto_DeleteType_DELETE_FAMILY; + case CellType::DELETE_FAMILY_VERSION: + return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION; + default: + throw std::runtime_error("Unknown delete type: " + folly::to<std::string>(type)); + } +} + +bool RequestConverter::IsDelete(const CellType type) { + return CellType::DELETE <= type && type <= CellType::DELETE_FAMILY; +} + +std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put, + const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); + + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::CheckAndPutToMutateRequest( + const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const pb::CompareType compare_op, const hbase::Put &put, + const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); + ::hbase::pb::Condition *cond = pb_msg->mutable_condition(); + cond->set_row(row); + cond->set_family(family); + cond->set_qualifier(qualifier); + cond->set_allocated_comparator( + Comparator::ToProto(*(ComparatorFactory::BinaryComparator(value).get())).release()); + cond->set_compare_type(compare_op); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::CheckAndDeleteToMutateRequest( + const std::string &row, const std::string &family, const std::string &qualifier, + const std::string &value, const pb::CompareType compare_op, const hbase::Delete &del, + const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_DELETE, del, -1).release()); + ::hbase::pb::Condition *cond = pb_msg->mutable_condition(); + cond->set_row(row); + cond->set_family(family); + cond->set_qualifier(qualifier); + cond->set_allocated_comparator( + Comparator::ToProto(*(ComparatorFactory::BinaryComparator(value).get())).release()); + cond->set_compare_type(compare_op); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del, + const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_DELETE, del, -1).release()); + + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} +std::unique_ptr<Request> RequestConverter::IncrementToMutateRequest( + const Increment &incr, const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_INCREMENT, incr, -1).release()); + + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::AppendToMutateRequest(const Append &append, + const std::string ®ion_name) { + auto pb_req = Request::mutate(); + auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_mutation( + ToMutation(MutationType::MutationProto_MutationType_APPEND, append, -1).release()); + + VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); + return pb_req; +} + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/response-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/response-converter.cc b/hbase-native-client/src/hbase/client/response-converter.cc new file mode 100644 index 0000000..f3b78fd --- /dev/null +++ b/hbase-native-client/src/hbase/client/response-converter.cc @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/response-converter.h" +#include <glog/logging.h> +#include <stdexcept> +#include <string> +#include <utility> +#include <vector> +#include "hbase/client/cell.h" +#include "hbase/client/multi-response.h" +#include "hbase/exceptions/exception.h" + +using hbase::pb::GetResponse; +using hbase::pb::MutateResponse; +using hbase::pb::ScanResponse; +using hbase::pb::RegionLoadStats; + +namespace hbase { + +ResponseConverter::ResponseConverter() {} + +ResponseConverter::~ResponseConverter() {} + +// impl note: we are returning shared_ptr's instead of unique_ptr's because these +// go inside folly::Future's, making the move semantics extremely tricky. +std::shared_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) { + auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg()); + VLOG(3) << "FromGetResponse:" << get_resp->ShortDebugString(); + return ToResult(get_resp->result(), resp.cell_scanner()); +} + +std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& resp) { + auto mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg()); + hbase::pb::Result result = mutate_resp->result(); + return ToResult(mutate_resp->result(), resp.cell_scanner()); +} + +bool ResponseConverter::BoolFromMutateResponse(const Response& resp) { + auto mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg()); + return mutate_resp->processed(); +} + +std::shared_ptr<Result> ResponseConverter::ToResult( + const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) { + std::vector<std::shared_ptr<Cell>> vcells; + for (auto cell : result.cell()) { + std::shared_ptr<Cell> pcell = + std::make_shared<Cell>(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), + cell.value(), static_cast<hbase::CellType>(cell.cell_type())); + vcells.push_back(pcell); + } + + // iterate over the cells coming from rpc codec + if (cell_scanner != nullptr) { + int cells_read = 0; + while (cells_read != result.associated_cell_count()) { + if (cell_scanner->Advance()) { + vcells.push_back(cell_scanner->Current()); + cells_read += 1; + } else { + LOG(ERROR) << "CellScanner::Advance() returned false unexpectedly. Cells Read:- " + << cells_read << "; Expected Cell Count:- " << result.associated_cell_count(); + std::runtime_error("CellScanner::Advance() returned false unexpectedly"); + } + } + } + return std::make_shared<Result>(vcells, result.exists(), result.stale(), result.partial()); +} + +std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) { + auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg()); + return FromScanResponse(scan_resp, resp.cell_scanner()); +} + +std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse( + const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) { + VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString() + << " cell_scanner:" << (cell_scanner != nullptr); + int num_results = + cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size(); + + std::vector<std::shared_ptr<Result>> results{static_cast<size_t>(num_results)}; + for (int i = 0; i < num_results; i++) { + if (cell_scanner != nullptr) { + // Cells are out in cellblocks. Group them up again as Results. How many to read at a + // time will be found in getCellsLength -- length here is how many Cells in the i'th Result + int num_cells = scan_resp->cells_per_result(i); + + std::vector<std::shared_ptr<Cell>> vcells; + for (int j = 0; j < num_cells; j++) { + if (!cell_scanner->Advance()) { + std::string msg = "Results sent from server=" + std::to_string(num_results) + + ". But only got " + std::to_string(i) + + " results completely at client. Resetting the scanner to scan again."; + LOG(ERROR) << msg; + throw std::runtime_error(msg); + } + vcells.push_back(cell_scanner->Current()); + } + // TODO: handle partial results per Result by checking partial_flag_per_result + results[i] = std::make_shared<Result>(vcells, false, scan_resp->stale(), false); + } else { + results[i] = ToResult(scan_resp->results(i), cell_scanner); + } + } + + return results; +} + +std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults( + std::shared_ptr<Request> req, const Response& resp, + const ServerRequest::ActionsByRegion& actions_by_region) { + auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg()); + auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg()); + VLOG(3) << "GetResults:" << multi_resp->ShortDebugString(); + int req_region_action_count = multi_req->regionaction_size(); + int res_region_action_count = multi_resp->regionactionresult_size(); + if (req_region_action_count != res_region_action_count) { + throw std::runtime_error("Request mutation count=" + std::to_string(req_region_action_count) + + " does not match response mutation result count=" + + std::to_string(res_region_action_count)); + } + auto multi_response = std::make_unique<hbase::MultiResponse>(); + for (int32_t num = 0; num < res_region_action_count; num++) { + hbase::pb::RegionAction actions = multi_req->regionaction(num); + hbase::pb::RegionActionResult action_result = multi_resp->regionactionresult(num); + hbase::pb::RegionSpecifier rs = actions.region(); + if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) { + throw std::runtime_error("We support only encoded types for protobuf multi response."); + } + + auto region_name = rs.value(); + if (action_result.has_exception()) { + auto ew = ResponseConverter::GetRemoteException(action_result.exception()); + VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region[" + << region_name << "];"; + multi_response->AddRegionException(region_name, ew); + continue; + } + + if (actions.action_size() != action_result.resultorexception_size()) { + throw std::runtime_error("actions.action_size=" + std::to_string(actions.action_size()) + + ", action_result.resultorexception_size=" + + std::to_string(action_result.resultorexception_size()) + + " for region " + actions.region().value()); + } + + auto multi_actions = actions_by_region.at(region_name)->actions(); + uint64_t multi_actions_num = 0; + for (hbase::pb::ResultOrException roe : action_result.resultorexception()) { + std::shared_ptr<Result> result; + std::shared_ptr<folly::exception_wrapper> ew; + if (roe.has_exception()) { + auto ew = ResponseConverter::GetRemoteException(roe.exception()); + VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region[" + << region_name << "];"; + multi_response->AddRegionException(region_name, ew); + } else if (roe.has_result()) { + result = ToResult(roe.result(), resp.cell_scanner()); + } else if (roe.has_service_result()) { + // TODO Not processing Coprocessor Service Result; + } else { + // Sometimes, the response is just "it was processed". Generally, this occurs for things + // like mutateRows where either we get back 'processed' (or not) and optionally some + // statistics about the regions we touched. + std::vector<std::shared_ptr<Cell>> empty_cells; + result = std::make_shared<Result>(empty_cells, multi_resp->processed() ? true : false, + false, false); + } + // We add the original index of the multi-action so that when populating the response back we + // do it as per the action index + multi_response->AddRegionResult( + region_name, multi_actions[multi_actions_num]->original_index(), std::move(result), ew); + multi_actions_num++; + } + } + + if (multi_resp->has_regionstatistics()) { + hbase::pb::MultiRegionLoadStats stats = multi_resp->regionstatistics(); + for (int i = 0; i < stats.region_size(); i++) { + multi_response->AddStatistic(stats.region(i).value(), + std::make_shared<RegionLoadStats>(stats.stat(i))); + } + } + return multi_response; +} + +std::shared_ptr<folly::exception_wrapper> ResponseConverter::GetRemoteException( + const hbase::pb::NameBytesPair& exc_resp) { + std::string what; + std::string exception_class_name = exc_resp.has_name() ? exc_resp.name() : ""; + std::string stack_trace = exc_resp.has_value() ? exc_resp.value() : ""; + + what.append(exception_class_name).append(stack_trace); + auto remote_exception = std::make_unique<RemoteException>(what); + remote_exception->set_exception_class_name(exception_class_name) + ->set_stack_trace(stack_trace) + ->set_hostname("") + ->set_port(0); + + return std::make_shared<folly::exception_wrapper>( + folly::make_exception_wrapper<RemoteException>(*remote_exception)); +} +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/result-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/result-test.cc b/hbase-native-client/src/hbase/client/result-test.cc new file mode 100644 index 0000000..684c08d --- /dev/null +++ b/hbase-native-client/src/hbase/client/result-test.cc @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <limits> +#include <memory> +#include <string> +#include <vector> + +#include "hbase/client/cell.h" +#include "hbase/client/result.h" + +using hbase::Cell; +using hbase::CellType; +using hbase::Result; +using std::experimental::nullopt; + +void PopulateCells(std::vector<std::shared_ptr<Cell> > &cells) { + // Populate some Results + // We assume that for a single Cell, the corresponding row, families and + // qualifiers are present. + // We have also considered different versions in the test for the same row. + std::string row = "row"; + for (int i = 0; i < 10; i++) { + std::string family = "family-" + std::to_string(i); + std::string column = "column-" + std::to_string(i); + std::string value = "value-" + std::to_string(i); + + switch (i) { + case 5: { + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482113040506, "value-5", CellType::PUT)); + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482111803856, "value-X", CellType::PUT)); + break; + } + case 8: { + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482113040506, "value-8", CellType::PUT)); + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482111803856, "value-X", CellType::PUT)); + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482110969958, "value-Y", CellType::PUT)); + break; + } + case 9: { + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482113040506, "value-9", CellType::PUT)); + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482111803856, "value-X", CellType::PUT)); + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482110969958, "value-Y", CellType::PUT)); + cells.push_back( + std::make_shared<Cell>(row, family, column, 1482110876075, "value-Z", CellType::PUT)); + break; + } + default: { + cells.push_back(std::make_shared<Cell>( + row, family, column, std::numeric_limits<int64_t>::max(), value, CellType::PUT)); + } + } + } + return; +} + +TEST(Result, EmptyResult) { + std::vector<std::shared_ptr<Cell> > cells; + Result result(cells, true, false, false); + EXPECT_EQ(true, result.IsEmpty()); + EXPECT_EQ(0, result.Size()); +} + +TEST(Result, FilledResult) { + std::vector<std::shared_ptr<Cell> > cells; + PopulateCells(cells); + + Result result(cells, true, false, false); + + EXPECT_EQ(false, result.IsEmpty()); + EXPECT_EQ(16, result.Size()); + + // Get Latest Cell for the given family and qualifier. + auto latest_cell(result.ColumnLatestCell("family", "column")); + // Nothing of the above family/qualifier combo is present so it should be + // nullptr + ASSERT_FALSE(latest_cell.get()); + + // Try to get the latest cell for the given family and qualifier. + latest_cell = result.ColumnLatestCell("family-4", "column-4"); + // Now shouldn't be a nullptr + ASSERT_TRUE(latest_cell.get()); + // And Value must match too + EXPECT_EQ("value-4", latest_cell->Value()); + + // Value will be nullptr as no such family and qualifier is present + ASSERT_FALSE(result.Value("family-4", "qualifier")); + // Value will be present as family and qualifier is present + ASSERT_TRUE(result.Value("family-4", "column-4") != nullopt); + // Value should be present and match. + EXPECT_EQ(latest_cell->Value(), (*result.ColumnLatestCell("family-4", "column-4")).Value()); + EXPECT_EQ("value-5", (*result.ColumnLatestCell("family-5", "column-5")).Value()); + EXPECT_EQ("value-8", (*result.ColumnLatestCell("family-8", "column-8")).Value()); + EXPECT_EQ("value-7", *result.Value("family-7", "column-7")); + + // Get cells for the given family and qualifier + auto column_cells = result.ColumnCells("family", "column"); + // Size should be 0 + EXPECT_EQ(0, column_cells.size()); + + // Size shouldn't be 0 and Row() and Value() must match + column_cells = result.ColumnCells("family-0", "column-0"); + EXPECT_EQ(1, column_cells.size()); + EXPECT_EQ("row", column_cells[0]->Row()); + EXPECT_EQ("row", result.Row()); + + // Size shouldn't be 0 and Row() and Value() must match + column_cells = result.ColumnCells("family-5", "column-5"); + EXPECT_EQ(2, column_cells.size()); + EXPECT_EQ("row", column_cells[0]->Row()); + EXPECT_EQ("row", column_cells[1]->Row()); + EXPECT_EQ("value-5", column_cells[0]->Value()); + EXPECT_EQ("value-X", column_cells[1]->Value()); + EXPECT_EQ("row", result.Row()); + + // Size shouldn't be 0 and Row() and Value() must match + column_cells = result.ColumnCells("family-8", "column-8"); + EXPECT_EQ(3, column_cells.size()); + EXPECT_EQ("row", column_cells[0]->Row()); + EXPECT_EQ("row", column_cells[1]->Row()); + EXPECT_EQ("row", column_cells[2]->Row()); + EXPECT_EQ("value-8", column_cells[0]->Value()); + EXPECT_EQ("value-X", column_cells[1]->Value()); + EXPECT_EQ("value-Y", column_cells[2]->Value()); + EXPECT_EQ("row", result.Row()); + + // Size shouldn't be 0 and Row() and Value() must match + column_cells = result.ColumnCells("family-9", "column-9"); + EXPECT_EQ(4, column_cells.size()); + EXPECT_EQ("row", column_cells[0]->Row()); + EXPECT_EQ("row", column_cells[1]->Row()); + EXPECT_EQ("row", column_cells[2]->Row()); + EXPECT_EQ("row", column_cells[3]->Row()); + EXPECT_EQ("value-9", column_cells[0]->Value()); + EXPECT_EQ("value-X", column_cells[1]->Value()); + EXPECT_EQ("value-Y", column_cells[2]->Value()); + EXPECT_EQ("value-Z", column_cells[3]->Value()); + EXPECT_EQ("row", result.Row()); + + // Test all the Cell values + const auto &result_cells = result.Cells(); + int i = 0, j = 0; + for (const auto &cell : result_cells) { + std::string row = "row"; + std::string family = "family-" + std::to_string(i); + std::string column = "column-" + std::to_string(i); + std::string value = "value-" + std::to_string(i); + switch (j) { + case 6: + case 10: + case 13: { + EXPECT_EQ("value-X", cell->Value()); + ++j; + continue; + } + case 11: + case 14: { + EXPECT_EQ("value-Y", cell->Value()); + ++j; + continue; + } + case 15: { + EXPECT_EQ("value-Z", cell->Value()); + ++j; + continue; + } + } + EXPECT_EQ(row, cell->Row()); + EXPECT_EQ(family, cell->Family()); + EXPECT_EQ(column, cell->Qualifier()); + EXPECT_EQ(value, cell->Value()); + ++i; + ++j; + } + + auto result_map_tmp = result.Map(); + result_map_tmp["testf"]["testq"][1] = "value"; + EXPECT_EQ(11, result_map_tmp.size()); + + auto result_map = result.Map(); + EXPECT_EQ(10, result_map.size()); + + i = 0; + for (auto family_map : result_map) { + std::string family = "family-" + std::to_string(i); + std::string qualifier = "column-" + std::to_string(i); + std::string value = "value-" + std::to_string(i); + EXPECT_EQ(family, family_map.first); + for (auto qualifier_map : family_map.second) { + EXPECT_EQ(qualifier, qualifier_map.first); + j = 0; + for (auto version_map : qualifier_map.second) { + switch (i) { + case 5: { + if (1 == j) { + EXPECT_EQ(1482111803856, version_map.first); + EXPECT_EQ("value-X", version_map.second); + } else if (0 == j) { + EXPECT_EQ(1482113040506, version_map.first); + EXPECT_EQ("value-5", version_map.second); + } + break; + } + case 8: { + if (2 == j) { + EXPECT_EQ(1482110969958, version_map.first); + EXPECT_EQ("value-Y", version_map.second); + } else if (1 == j) { + EXPECT_EQ(1482111803856, version_map.first); + EXPECT_EQ("value-X", version_map.second); + } else if (0 == j) { + EXPECT_EQ(1482113040506, version_map.first); + EXPECT_EQ("value-8", version_map.second); + } + break; + } + case 9: { + if (3 == j) { + EXPECT_EQ(1482110876075, version_map.first); + EXPECT_EQ("value-Z", version_map.second); + } else if (2 == j) { + EXPECT_EQ(1482110969958, version_map.first); + EXPECT_EQ("value-Y", version_map.second); + } else if (1 == j) { + EXPECT_EQ(1482111803856, version_map.first); + EXPECT_EQ("value-X", version_map.second); + } else if (0 == j) { + EXPECT_EQ(1482113040506, version_map.first); + EXPECT_EQ("value-9", version_map.second); + } + break; + } + default: { + EXPECT_EQ(std::numeric_limits<int64_t>::max(), version_map.first); + EXPECT_EQ(value, version_map.second); + } + } + ++j; + } + } + ++i; + } + + auto family_map = result.FamilyMap("family-0"); + EXPECT_EQ(1, family_map.size()); + i = 0; + for (auto qual_val_map : family_map) { + EXPECT_EQ("column-0", qual_val_map.first); + EXPECT_EQ("value-0", qual_val_map.second); + } + + family_map = result.FamilyMap("family-1"); + EXPECT_EQ(1, family_map.size()); + i = 0; + for (auto qual_val_map : family_map) { + EXPECT_EQ("column-1", qual_val_map.first); + EXPECT_EQ("value-1", qual_val_map.second); + } + + family_map = result.FamilyMap("family-5"); + EXPECT_EQ(1, family_map.size()); + i = 0; + for (auto qual_val_map : family_map) { + EXPECT_EQ("column-5", qual_val_map.first); + EXPECT_EQ("value-5", qual_val_map.second); + } + + family_map = result.FamilyMap("family-9"); + EXPECT_EQ(1, family_map.size()); + i = 0; + for (auto qual_val_map : family_map) { + EXPECT_EQ("column-9", qual_val_map.first); + EXPECT_EQ("value-9", qual_val_map.second); + } +} + +TEST(Result, ResultEstimatedSize) { + CellType cell_type = CellType::PUT; + int64_t timestamp = std::numeric_limits<int64_t>::max(); + std::vector<std::shared_ptr<Cell> > cells; + Result empty(cells, true, false, false); + + EXPECT_EQ(empty.EstimatedSize(), sizeof(Result)); + + cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type)); + Result result1(cells, true, false, false); + EXPECT_TRUE(result1.EstimatedSize() > empty.EstimatedSize()); + + cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type)); + Result result2(cells, true, false, false); + EXPECT_TRUE(result2.EstimatedSize() > result1.EstimatedSize()); + + LOG(INFO) << empty.EstimatedSize(); + LOG(INFO) << result1.EstimatedSize(); + LOG(INFO) << result2.EstimatedSize(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/result.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/result.cc b/hbase-native-client/src/hbase/client/result.cc new file mode 100644 index 0000000..a2f56aa --- /dev/null +++ b/hbase-native-client/src/hbase/client/result.cc @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/result.h" + +namespace hbase { + +Result::~Result() {} + +Result::Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale, + bool partial) + : exists_(exists), stale_(stale), partial_(partial), cells_(cells) { + row_ = (cells_.size() == 0 ? "" : cells_[0]->Row()); +} + +Result::Result(const Result &result) { + exists_ = result.exists_; + stale_ = result.stale_; + partial_ = result.partial_; + row_ = result.row_; + if (!result.cells_.empty()) { + for (const auto &cell : result.cells_) { + cells_.push_back(cell); + } + } +} + +const std::vector<std::shared_ptr<Cell> > &Result::Cells() const { return cells_; } + +std::vector<std::shared_ptr<Cell> > Result::ColumnCells(const std::string &family, + const std::string &qualifier) const { + std::vector<std::shared_ptr<Cell> > column_cells; + // TODO implement a BinarySearch here ? + for (const auto &cell : cells_) { + if (cell->Family() == family && cell->Qualifier() == qualifier) { + column_cells.push_back(cell); + } + } + return column_cells; +} + +const std::shared_ptr<Cell> Result::ColumnLatestCell(const std::string &family, + const std::string &qualifier) const { + // TODO implement a BinarySearch here ? + for (const auto &cell : cells_) { + // We find the latest(first) occurrence of the Cell for a given column and + // qualifier and break + if (cell->Family() == family && cell->Qualifier() == qualifier) { + return cell; + } + } + return nullptr; +} + +optional<std::string> Result::Value(const std::string &family, const std::string &qualifier) const { + std::shared_ptr<Cell> latest_cell(ColumnLatestCell(family, qualifier)); + if (latest_cell.get()) { + return optional<std::string>(latest_cell->Value()); + } + return optional<std::string>(); +} + +bool Result::IsEmpty() const { return cells_.empty(); } + +const std::string &Result::Row() const { return row_; } + +int Result::Size() const { return cells_.size(); } + +ResultMap Result::Map() const { + ResultMap result_map; + for (const auto &cell : cells_) { + result_map[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value(); + } + return result_map; +} + +std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const { + std::map<std::string, std::string> family_map; + if (!IsEmpty()) { + auto result_map = Map(); + auto itr = result_map.find(family); + if (itr == result_map.end()) { + return family_map; + } + + for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) { + for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) { + // We break after inserting the first value. Result.java takes only + // the first value + family_map[qitr->first] = vitr->second; + break; + } + } + } + + return family_map; +} + +std::string Result::DebugString() const { + std::string ret{"keyvalues="}; + if (IsEmpty()) { + ret += "NONE"; + return ret; + } + ret += "{"; + bool is_first = true; + for (const auto &cell : cells_) { + if (is_first) { + is_first = false; + } else { + ret += ", "; + } + ret += cell->DebugString(); + } + ret += "}"; + + return ret; +} + +size_t Result::EstimatedSize() const { + size_t s = sizeof(Result); + s += row_.capacity(); + for (const auto c : cells_) { + s += sizeof(std::shared_ptr<Cell>); + s + c->EstimatedSize(); + } + return s; +} + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/scan-result-cache-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/scan-result-cache-test.cc b/hbase-native-client/src/hbase/client/scan-result-cache-test.cc new file mode 100644 index 0000000..4c10a05 --- /dev/null +++ b/hbase-native-client/src/hbase/client/scan-result-cache-test.cc @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <folly/Conv.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include <vector> + +#include "hbase/client/cell.h" +#include "hbase/client/result.h" +#include "hbase/client/scan-result-cache.h" + +using hbase::ScanResultCache; +using hbase::Result; +using hbase::Cell; +using hbase::CellType; + +using ResultVector = std::vector<std::shared_ptr<Result>>; + +std::shared_ptr<Cell> CreateCell(const int32_t &key, const std::string &family, + const std::string &column) { + auto row = folly::to<std::string>(key); + return std::make_shared<Cell>(row, family, column, std::numeric_limits<int64_t>::max(), row, + CellType::PUT); +} + +std::shared_ptr<Result> CreateResult(std::shared_ptr<Cell> cell, bool partial) { + return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{cell}, false, false, partial); +} + +TEST(ScanResultCacheTest, NoPartial) { + ScanResultCache cache; + ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, false)); + ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, true)); + int32_t count = 10; + ResultVector results{}; + for (int32_t i = 0; i < count; i++) { + results.push_back(CreateResult(CreateCell(i, "cf", "cq1"), false)); + } + ASSERT_EQ(results, cache.AddAndGet(results, false)); +} + +TEST(ScanResultCacheTest, Combine1) { + ScanResultCache cache; + auto prev_result = CreateResult(CreateCell(0, "cf", "cq1"), true); + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true); + auto results = cache.AddAndGet(ResultVector{prev_result, result1}, false); + ASSERT_EQ(1L, results.size()); + ASSERT_EQ(prev_result, results[0]); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{}, true).size()); + + results = cache.AddAndGet(ResultVector{}, false); + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(3, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3"))); +} + +TEST(ScanResultCacheTest, Combine2) { + ScanResultCache cache; + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true); + + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true); + auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq2"), false); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size()); + + auto results = cache.AddAndGet(ResultVector{next_result1}, false); + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(3, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3"))); + + results = cache.AddAndGet(ResultVector{next_to_next_result1}, false); + ASSERT_EQ(2, results.size()); + ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(1, results[0]->Cells().size()); + ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(3, folly::to<int32_t>(results[1]->Row())); + ASSERT_EQ(1, results[1]->Cells().size()); + ASSERT_EQ(3, folly::to<int32_t>(*results[1]->Value("cf", "cq2"))); +} + +TEST(ScanResultCacheTest, Combine3) { + ScanResultCache cache; + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), false); + auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq1"), true); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + + auto results = cache.AddAndGet(ResultVector{next_result1, next_to_next_result1}, false); + + ASSERT_EQ(2, results.size()); + ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(2, folly::to<int32_t>(results[1]->Row())); + ASSERT_EQ(1, results[1]->Cells().size()); + ASSERT_EQ(2, folly::to<int32_t>(*results[1]->Value("cf", "cq1"))); + + results = cache.AddAndGet(ResultVector{}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(3, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(1, results[0]->Cells().size()); + ASSERT_EQ(3, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); +} + +TEST(ScanResultCacheTest, Combine4) { + ScanResultCache cache; + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), false); + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true); + auto next_result2 = CreateResult(CreateCell(2, "cf", "cq2"), false); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + + auto results = cache.AddAndGet(ResultVector{result2, next_result1}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); + + results = cache.AddAndGet(ResultVector{next_result2}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); +} + +TEST(ScanResultCacheTest, SizeOf) { + std::string e{""}; + std::string f{"f"}; + std::string foo{"foo"}; + + LOG(INFO) << sizeof(e) << " " << e.capacity(); + LOG(INFO) << sizeof(f) << " " << f.capacity(); + LOG(INFO) << sizeof(foo) << " " << foo.capacity(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/scan-result-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/scan-result-cache.cc b/hbase-native-client/src/hbase/client/scan-result-cache.cc new file mode 100644 index 0000000..e74a7d6 --- /dev/null +++ b/hbase-native-client/src/hbase/client/scan-result-cache.cc @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/scan-result-cache.h" +#include <algorithm> +#include <iterator> +#include <limits> +#include <stdexcept> + +namespace hbase { +/** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response. + * @return valid results, never null. + */ +std::vector<std::shared_ptr<Result>> ScanResultCache::AddAndGet( + const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat) { + // If no results were returned it indicates that either we have the all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive + if (results.empty()) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partial_results_.empty() && !is_hearthbeat) { + return UpdateNumberOfCompleteResultsAndReturn( + std::vector<std::shared_ptr<Result>>{Combine()}); + } + return std::vector<std::shared_ptr<Result>>{}; + } + // In every RPC response there should be at most a single partial result. Furthermore, if + // there is a partial result, it is guaranteed to be in the last position of the array. + auto last = results[results.size() - 1]; + if (last->Partial()) { + if (partial_results_.empty()) { + partial_results_.push_back(last); + std::vector<std::shared_ptr<Result>> new_results; + std::copy_n(results.begin(), results.size() - 1, std::back_inserter(new_results)); + return UpdateNumberOfCompleteResultsAndReturn(new_results); + } + // We have only one result and it is partial + if (results.size() == 1) { + // check if there is a row change + if (partial_results_.at(0)->Row() == last->Row()) { + partial_results_.push_back(last); + return std::vector<std::shared_ptr<Result>>{}; + } + auto complete_result = Combine(); + partial_results_.push_back(last); + return UpdateNumberOfCompleteResultsAndReturn(complete_result); + } + // We have some complete results + auto results_to_return = PrependCombined(results, results.size() - 1); + partial_results_.push_back(last); + return UpdateNumberOfCompleteResultsAndReturn(results_to_return); + } + if (!partial_results_.empty()) { + return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size())); + } + return UpdateNumberOfCompleteResultsAndReturn(results); +} + +void ScanResultCache::Clear() { partial_results_.clear(); } + +std::shared_ptr<Result> ScanResultCache::CreateCompleteResult( + const std::vector<std::shared_ptr<Result>> &partial_results) { + if (partial_results.empty()) { + return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{}, false, false, false); + } + std::vector<std::shared_ptr<Cell>> cells{}; + bool stale = false; + std::string prev_row = ""; + std::string current_row = ""; + size_t i = 0; + for (const auto &r : partial_results) { + current_row = r->Row(); + if (i != 0 && prev_row != current_row) { + throw new std::runtime_error( + "Cannot form complete result. Rows of partial results do not match."); + } + // Ensure that all Results except the last one are marked as partials. The last result + // may not be marked as a partial because Results are only marked as partials when + // the scan on the server side must be stopped due to reaching the maxResultSize. + // Visualizing it makes it easier to understand: + // maxResultSize: 2 cells + // (-x-) represents cell number x in a row + // Example: row1: -1- -2- -3- -4- -5- (5 cells total) + // How row1 will be returned by the server as partial Results: + // Result1: -1- -2- (2 cells, size limit reached, mark as partial) + // Result2: -3- -4- (2 cells, size limit reached, mark as partial) + // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) + if (i != partial_results.size() - 1 && !r->Partial()) { + throw new std::runtime_error("Cannot form complete result. Result is missing partial flag."); + } + prev_row = current_row; + stale = stale || r->Stale(); + for (const auto &c : r->Cells()) { + cells.push_back(c); + } + i++; + } + + return std::make_shared<Result>(cells, false, stale, false); +} + +std::shared_ptr<Result> ScanResultCache::Combine() { + auto result = CreateCompleteResult(partial_results_); + partial_results_.clear(); + return result; +} + +std::vector<std::shared_ptr<Result>> ScanResultCache::PrependCombined( + const std::vector<std::shared_ptr<Result>> &results, int length) { + if (length == 0) { + return std::vector<std::shared_ptr<Result>>{Combine()}; + } + // the last part of a partial result may not be marked as partial so here we need to check if + // there is a row change. + size_t start; + if (partial_results_[0]->Row() == results[0]->Row()) { + partial_results_.push_back(results[0]); + start = 1; + length--; + } else { + start = 0; + } + std::vector<std::shared_ptr<Result>> prepend_results{}; + prepend_results.push_back(Combine()); + std::copy_n(results.begin() + start, length, std::back_inserter(prepend_results)); + return prepend_results; +} + +std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn( + const std::shared_ptr<Result> &result) { + return UpdateNumberOfCompleteResultsAndReturn(std::vector<std::shared_ptr<Result>>{result}); +} + +std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn( + const std::vector<std::shared_ptr<Result>> &results) { + num_complete_rows_ += results.size(); + return results; +} +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/scan-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/scan-test.cc b/hbase-native-client/src/hbase/client/scan-test.cc new file mode 100644 index 0000000..ba3a029 --- /dev/null +++ b/hbase-native-client/src/hbase/client/scan-test.cc @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <gtest/gtest.h> +#include <limits> + +#include "hbase/client/scan.h" + +using hbase::Get; +using hbase::Scan; + +void CheckFamilies(Scan &scan) { + EXPECT_EQ(false, scan.HasFamilies()); + scan.AddFamily("family-1"); + EXPECT_EQ(true, scan.HasFamilies()); + EXPECT_EQ(1, scan.FamilyMap().size()); + for (const auto &family : scan.FamilyMap()) { + EXPECT_STREQ("family-1", family.first.c_str()); + EXPECT_EQ(0, family.second.size()); + } + // Not allowed to add the same CF. + scan.AddFamily("family-1"); + EXPECT_EQ(1, scan.FamilyMap().size()); + scan.AddFamily("family-2"); + EXPECT_EQ(2, scan.FamilyMap().size()); + scan.AddFamily("family-3"); + EXPECT_EQ(3, scan.FamilyMap().size()); + int i = 1; + for (const auto &family : scan.FamilyMap()) { + std::string family_name = "family-" + std::to_string(i); + EXPECT_STREQ(family_name.c_str(), family.first.c_str()); + EXPECT_EQ(0, family.second.size()); + i += 1; + } + + scan.AddColumn("family-1", "column-1"); + scan.AddColumn("family-1", "column-2"); + scan.AddColumn("family-1", ""); + scan.AddColumn("family-1", "column-3"); + scan.AddColumn("family-2", "column-X"); + + EXPECT_EQ(3, scan.FamilyMap().size()); + auto it = scan.FamilyMap().begin(); + EXPECT_STREQ("family-1", it->first.c_str()); + EXPECT_EQ(4, it->second.size()); + EXPECT_STREQ("column-1", it->second[0].c_str()); + EXPECT_STREQ("column-2", it->second[1].c_str()); + EXPECT_STREQ("", it->second[2].c_str()); + EXPECT_STREQ("column-3", it->second[3].c_str()); + ++it; + EXPECT_STREQ("family-2", it->first.c_str()); + EXPECT_EQ(1, it->second.size()); + EXPECT_STREQ("column-X", it->second[0].c_str()); + ++it; + EXPECT_STREQ("family-3", it->first.c_str()); + EXPECT_EQ(0, it->second.size()); + ++it; + EXPECT_EQ(it, scan.FamilyMap().end()); +} + +void CheckFamiliesAfterCopy(const Scan &scan) { + EXPECT_EQ(true, scan.HasFamilies()); + EXPECT_EQ(3, scan.FamilyMap().size()); + int i = 1; + for (const auto &family : scan.FamilyMap()) { + std::string family_name = "family-" + std::to_string(i); + EXPECT_STREQ(family_name.c_str(), family.first.c_str()); + i += 1; + } + // Check if the alreaday added CF's and CQ's are as expected + auto it = scan.FamilyMap().begin(); + EXPECT_STREQ("family-1", it->first.c_str()); + EXPECT_EQ(4, it->second.size()); + EXPECT_STREQ("column-1", it->second[0].c_str()); + EXPECT_STREQ("column-2", it->second[1].c_str()); + EXPECT_STREQ("", it->second[2].c_str()); + EXPECT_STREQ("column-3", it->second[3].c_str()); + ++it; + EXPECT_STREQ("family-2", it->first.c_str()); + EXPECT_EQ(1, it->second.size()); + EXPECT_STREQ("column-X", it->second[0].c_str()); + ++it; + EXPECT_STREQ("family-3", it->first.c_str()); + EXPECT_EQ(0, it->second.size()); + ++it; + EXPECT_EQ(it, scan.FamilyMap().end()); +} + +void ScanMethods(Scan &scan) { + scan.SetReversed(true); + EXPECT_EQ(true, scan.IsReversed()); + scan.SetReversed(false); + EXPECT_EQ(false, scan.IsReversed()); + + std::string start_row("start-row"); + std::string stop_row("stop-row"); + scan.SetStartRow(start_row); + EXPECT_EQ(start_row, scan.StartRow()); + + scan.SetStopRow(stop_row); + EXPECT_EQ(stop_row, scan.StopRow()); + + scan.SetCaching(3); + EXPECT_EQ(3, scan.Caching()); + + scan.SetConsistency(hbase::pb::Consistency::STRONG); + EXPECT_EQ(hbase::pb::Consistency::STRONG, scan.Consistency()); + scan.SetConsistency(hbase::pb::Consistency::TIMELINE); + EXPECT_EQ(hbase::pb::Consistency::TIMELINE, scan.Consistency()); + + scan.SetCacheBlocks(true); + EXPECT_EQ(true, scan.CacheBlocks()); + scan.SetCacheBlocks(false); + EXPECT_EQ(false, scan.CacheBlocks()); + + scan.SetAllowPartialResults(true); + EXPECT_EQ(true, scan.AllowPartialResults()); + scan.SetAllowPartialResults(false); + EXPECT_EQ(false, scan.AllowPartialResults()); + + scan.SetLoadColumnFamiliesOnDemand(true); + EXPECT_EQ(true, scan.LoadColumnFamiliesOnDemand()); + scan.SetLoadColumnFamiliesOnDemand(false); + EXPECT_EQ(false, scan.LoadColumnFamiliesOnDemand()); + + scan.SetMaxVersions(); + EXPECT_EQ(1, scan.MaxVersions()); + scan.SetMaxVersions(20); + EXPECT_EQ(20, scan.MaxVersions()); + + scan.SetMaxResultSize(1024); + EXPECT_EQ(1024, scan.MaxResultSize()); + + // Test initial values + EXPECT_EQ(0, scan.Timerange().MinTimeStamp()); + EXPECT_EQ(std::numeric_limits<int64_t>::max(), scan.Timerange().MaxTimeStamp()); + + // Set & Test new values using TimeRange and TimeStamp + scan.SetTimeRange(1000, 2000); + EXPECT_EQ(1000, scan.Timerange().MinTimeStamp()); + EXPECT_EQ(2000, scan.Timerange().MaxTimeStamp()); + scan.SetTimeStamp(0); + EXPECT_EQ(0, scan.Timerange().MinTimeStamp()); + EXPECT_EQ(1, scan.Timerange().MaxTimeStamp()); + + // Test some exceptions + ASSERT_THROW(scan.SetTimeRange(-1000, 2000), std::runtime_error); + ASSERT_THROW(scan.SetTimeRange(1000, -2000), std::runtime_error); + ASSERT_THROW(scan.SetTimeRange(1000, 200), std::runtime_error); + ASSERT_THROW(scan.SetTimeStamp(std::numeric_limits<int64_t>::max()), std::runtime_error); +} + +TEST(Scan, Object) { + Scan scan; + ScanMethods(scan); + CheckFamilies(scan); + + // Resetting TimeRange values so that the copy construction and assignment + // operator tests pass. + scan.SetTimeRange(0, std::numeric_limits<int64_t>::max()); + Scan scancp(scan); + ScanMethods(scancp); + CheckFamiliesAfterCopy(scancp); + + Scan scaneq; + scaneq = scan; + ScanMethods(scaneq); + CheckFamiliesAfterCopy(scaneq); +} + +TEST(Scan, WithStartRow) { + Scan("row-test"); + Scan scan("row-test"); + ScanMethods(scan); + CheckFamilies(scan); +} + +TEST(Scan, WithStartAndStopRow) { + Scan("start-row", "stop-row"); + Scan scan("start-row", "stop-row"); + ScanMethods(scan); + CheckFamilies(scan); +} + +TEST(Scan, FromGet) { + std::string row_str = "row-test"; + Get get = Get(row_str); + + get.SetCacheBlocks(true); + get.SetMaxVersions(5); + get.AddFamily("family-1"); + get.AddFamily("family-1"); + get.AddFamily("family-2"); + get.AddFamily("family-3"); + get.AddColumn("family-1", "column-1"); + get.AddColumn("family-1", "column-2"); + get.AddColumn("family-1", ""); + get.AddColumn("family-1", "column-3"); + get.AddColumn("family-2", "column-X"); + + EXPECT_EQ(3, get.FamilyMap().size()); + + Scan scan(get); + ScanMethods(scan); + CheckFamiliesAfterCopy(scan); +} + +TEST(Scan, Exception) { + std::string row(std::numeric_limits<int16_t>::max() + 1, 'X'); + ASSERT_THROW(Scan tmp(row), std::runtime_error); + ASSERT_THROW(Scan tmp(""), std::runtime_error); +}