http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-async-table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index f71fbba..998e2f1 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -111,9 +111,10 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) { folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) { auto caller = CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout()) - ->action([=, &del](std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> { + ->action([=, &del]( + std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<folly::Unit> { return Call<hbase::Delete, hbase::Request, hbase::Response, folly::Unit>( rpc_client, controller, loc, del, &hbase::RequestConverter::DeleteToMutateRequest, [](const Response& r) -> folly::Unit { return folly::unit; }); @@ -143,4 +144,24 @@ folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::B return caller->Call().then([caller](auto r) { return r; }); } + +void RawAsyncTable::Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer) { + auto scanner = AsyncClientScanner::Create( + connection_, SetDefaultScanConfig(scan), table_name_, consumer, connection_conf_->pause(), + connection_conf_->max_retries(), connection_conf_->scan_timeout(), + connection_conf_->rpc_timeout(), connection_conf_->start_log_errors_count()); + scanner->Start(); +} + +std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Scan& scan) { + // always create a new 
scan object as we may reset the start row later. + auto new_scan = std::make_shared<hbase::Scan>(scan); + if (new_scan->Caching() <= 0) { + new_scan->SetCaching(default_scanner_caching_); + } + if (new_scan->MaxResultSize() <= 0) { + new_scan->SetMaxResultSize(default_scanner_max_result_size_); + } + return new_scan; +} } // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-async-table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h index c8e9f2f..8c40dae 100644 --- a/hbase-native-client/core/raw-async-table.h +++ b/hbase-native-client/core/raw-async-table.h @@ -24,7 +24,9 @@ #include <memory> #include <string> #include <vector> + #include "core/async-batch-rpc-retrying-caller.h" +#include "core/async-client-scanner.h" #include "core/async-connection.h" #include "core/async-rpc-retrying-caller-factory.h" #include "core/async-rpc-retrying-caller.h" @@ -34,6 +36,7 @@ #include "core/increment.h" #include "core/put.h" #include "core/result.h" +#include "core/scan.h" namespace hbase { @@ -48,14 +51,22 @@ class RawAsyncTable { : connection_(connection), connection_conf_(connection->connection_conf()), table_name_(table_name), - rpc_client_(connection->rpc_client()) {} + rpc_client_(connection->rpc_client()) { + default_scanner_caching_ = connection_conf_->scanner_caching(); + default_scanner_max_result_size_ = connection_conf_->scanner_max_result_size(); + } virtual ~RawAsyncTable() = default; folly::Future<std::shared_ptr<Result>> Get(const hbase::Get& get); - folly::Future<folly::Unit> Delete(const hbase::Delete& del); - folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment); + folly::Future<folly::Unit> Delete(const hbase::Delete& del); + + folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment); + folly::Future<folly::Unit> Put(const hbase::Put& put); + + void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer); + void Close() {} folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get( @@ -69,6 +80,8 @@ class RawAsyncTable { std::shared_ptr<ConnectionConfiguration> connection_conf_; std::shared_ptr<pb::TableName> 
table_name_; std::shared_ptr<RpcClient> rpc_client_; + int32_t default_scanner_caching_; + int64_t default_scanner_max_result_size_; /* Methods */ template <typename REQ, typename PREQ, typename PRESP, typename RESP> @@ -81,5 +94,7 @@ class RawAsyncTable { template <typename RESP> std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder( std::string row, std::chrono::nanoseconds rpc_timeout); + + std::shared_ptr<hbase::Scan> SetDefaultScanConfig(const hbase::Scan& scan); }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/raw-scan-result-consumer.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-scan-result-consumer.h b/hbase-native-client/core/raw-scan-result-consumer.h new file mode 100644 index 0000000..b7c3c48 --- /dev/null +++ b/hbase-native-client/core/raw-scan-result-consumer.h @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +#pragma once + +#include <folly/ExceptionWrapper.h> +#include <folly/Logging.h> +#include <chrono> +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include "core/result.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +namespace hbase { + +enum class ScanControllerState { kInitialized, kSuspended, kTerminated, kDestroyed }; + +enum class ScanResumerState { kInitialized, kSuspended, kResumed }; + +/** + * Used to resume a scan. + */ +class ScanResumer { + public: + virtual ~ScanResumer() = default; + + /** + * Resume the scan. You are free to call it multiple times but only the first call will take + * effect. + */ + virtual void Resume() = 0; +}; + +/** + * Used to suspend or stop a scan. + * <p> + * Notice that, you should only call the methods below inside onNext or onHeartbeat method. An + * IllegalStateException will be thrown if you call them at other places. + * <p> + * You can only call one of the methods below, i.e., call suspend or terminate (of course you are + * free to call neither), and the methods are not reentrant. An IllegalStateException will be + * thrown if you have already called one of the methods. + */ +class ScanController { + public: + virtual ~ScanController() = default; + + /** + * Suspend the scan. + * <p> + * This means we will stop fetching data in background, i.e., will not call onNext any more + * before you resume the scan. + * @return A resumer used to resume the scan later. + */ + virtual std::shared_ptr<ScanResumer> Suspend() = 0; + + /** + * Terminate the scan. + * <p> + * This is useful when you have got enough results and want to stop the scan in onNext method, + * or you want to stop the scan in onHeartbeat method because it has spent too much time. + */ + virtual void Terminate() = 0; +}; + +/** + * Receives {@link Result} for an asynchronous scan.
+ * <p> + * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread + * from which we send requests to the HBase service. So if you want the asynchronous scanner to fetch data from + * HBase in background while you process the returned data, you need to move the processing work to + * another thread to make the {@code onNext} call return immediately. And please do NOT do any time + * consuming tasks in all methods below unless you know what you are doing. + */ +class RawScanResultConsumer { + public: + virtual ~RawScanResultConsumer() = default; + + /** + * Indicate that we have received some data. + * @param results the data fetched from HBase service. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within scope of onNext method. You can only call its method in + * onNext, do NOT store it and call it later outside onNext. + */ + virtual void OnNext(const std::vector<std::shared_ptr<Result>> &results, + std::shared_ptr<ScanController> controller) {} + + /** + * Indicate that there is a heartbeat message but we have not accumulated enough cells to call + * onNext. + * <p> + * This method gives you a chance to terminate a slow scan operation. + * @param controller used to suspend or terminate the scan. Notice that the {@code controller} + * instance is only valid within the scope of onHeartbeat method. You can only call its + * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat. + */ + virtual void OnHeartbeat(std::shared_ptr<ScanController> controller) {} + + /** + * Indicate that we hit an unrecoverable error and the scan operation is terminated. + * <p> + * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}. + */ + virtual void OnError(const folly::exception_wrapper &error) {} + + /** + * Indicate that the scan operation is completed normally.
+ */ + virtual void OnComplete() {} +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/region-location.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index 4087d94..d5d9d67 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -26,7 +26,7 @@ namespace hbase { -enum RegionLocateType { kBefore, kCurrent, kAfter }; +enum class RegionLocateType { kBefore, kCurrent, kAfter }; /** * @brief class to hold where a region is located. http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter-test.cc b/hbase-native-client/core/request-converter-test.cc index c01f16c..6c07a19 100644 --- a/hbase-native-client/core/request-converter-test.cc +++ b/hbase-native-client/core/request-converter-test.cc @@ -83,7 +83,6 @@ TEST(RequestConverter, ToScan) { scan.SetReversed(true); scan.SetStartRow(start_row); scan.SetStopRow(stop_row); - scan.SetSmall(true); scan.SetCaching(3); scan.SetConsistency(hbase::pb::Consistency::TIMELINE); scan.SetCacheBlocks(true); @@ -105,7 +104,7 @@ TEST(RequestConverter, ToScan) { EXPECT_TRUE(msg->scan().reversed()); EXPECT_EQ(msg->scan().start_row(), start_row); EXPECT_EQ(msg->scan().stop_row(), stop_row); - EXPECT_TRUE(msg->scan().small()); + EXPECT_FALSE(msg->scan().small()); EXPECT_EQ(msg->scan().caching(), 3); EXPECT_EQ(msg->scan().consistency(), hbase::pb::Consistency::TIMELINE); EXPECT_TRUE(msg->scan().cache_blocks()); http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter.cc ---------------------------------------------------------------------- diff --git 
a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index a1e63fe..6eb2f04 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -53,19 +53,11 @@ std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get, return pb_req; } -std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, - const std::string &region_name) { - auto pb_req = Request::scan(); - - auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); - - RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); - - auto pb_scan = pb_msg->mutable_scan(); +std::unique_ptr<hbase::pb::Scan> RequestConverter::ToScan(const Scan &scan) { + auto pb_scan = std::make_unique<hbase::pb::Scan>(); pb_scan->set_max_versions(scan.MaxVersions()); pb_scan->set_cache_blocks(scan.CacheBlocks()); pb_scan->set_reversed(scan.IsReversed()); - pb_scan->set_small(scan.IsSmall()); pb_scan->set_caching(scan.Caching()); pb_scan->set_start_row(scan.StartRow()); pb_scan->set_stop_row(scan.StopRow()); @@ -94,12 +86,78 @@ std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, pb_scan->set_allocated_filter(Filter::ToProto(*(scan.filter())).release()); } - // TODO We will change this later.
+ return std::move(pb_scan); +} + +std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, + const std::string &region_name) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_scan(ToScan(scan).release()); + + SetCommonScanRequestFields(pb_msg, false); + + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan, + const std::string &region_name, + int32_t num_rows, bool close_scanner) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); + + RequestConverter::SetRegion(region_name, pb_msg->mutable_region()); + + pb_msg->set_allocated_scan(ToScan(scan).release()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + + SetCommonScanRequestFields(pb_msg, false); + + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + pb_msg->set_scanner_id(scanner_id); + + SetCommonScanRequestFields(pb_msg, false); + + return pb_req; +} + +std::unique_ptr<Request> RequestConverter::ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner, + int64_t next_call_seq_id, bool renew) { + auto pb_req = Request::scan(); + auto pb_msg = std::static_pointer_cast<ScanRequest>(pb_req->req_msg()); + + pb_msg->set_number_of_rows(num_rows); + pb_msg->set_close_scanner(close_scanner); + pb_msg->set_scanner_id(scanner_id); + pb_msg->set_next_call_seq(next_call_seq_id); + + SetCommonScanRequestFields(pb_msg, renew); + return pb_req; +} + +void
RequestConverter::SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest> pb_msg, + bool renew) { + // TODO We will change these later when we implement partial results and heartbeats, etc pb_msg->set_client_handles_partials(false); pb_msg->set_client_handles_heartbeats(false); pb_msg->set_track_scan_metrics(false); - - return pb_req; + pb_msg->set_renew(renew); + // TODO: set scan limit } std::unique_ptr<Request> RequestConverter::ToMultiRequest( @@ -123,7 +181,6 @@ std::unique_ptr<Request> RequestConverter::ToMultiRequest( } } - VLOG(3) << "Multi Req:-" << pb_req->req_msg()->ShortDebugString(); return pb_req; } @@ -190,13 +247,13 @@ std::unique_ptr<MutationProto> RequestConverter::ToMutation(const MutationType t DeleteType RequestConverter::ToDeleteType(const CellType type) { switch (type) { - case DELETE: + case CellType::DELETE: return pb::MutationProto_DeleteType_DELETE_ONE_VERSION; - case DELETE_COLUMN: + case CellType::DELETE_COLUMN: return pb::MutationProto_DeleteType_DELETE_MULTIPLE_VERSIONS; - case DELETE_FAMILY: + case CellType::DELETE_FAMILY: return pb::MutationProto_DeleteType_DELETE_FAMILY; - case DELETE_FAMILY_VERSION: + case CellType::DELETE_FAMILY_VERSION: return pb::MutationProto_DeleteType_DELETE_FAMILY_VERSION; default: throw std::runtime_error("Unknown delete type: " + folly::to<std::string>(type)); @@ -216,12 +273,11 @@ std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put, pb_msg->set_allocated_mutation( ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release()); - VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); return pb_req; } std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del, - const std::string &region_name) { + const std::string &region_name) { auto pb_req = Request::mutate(); auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg()); RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/request-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h index 6d57161..c807f45 100644 --- a/hbase-native-client/core/request-converter.h +++ b/hbase-native-client/core/request-converter.h @@ -66,9 +66,20 @@ class RequestConverter { */ static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name); + static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name, + int32_t num_rows, bool close_scanner); + + static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner); + + static std::unique_ptr<Request> ToScanRequest(int64_t scanner_id, int32_t num_rows, + bool close_scanner, int64_t next_call_seq_id, + bool renew); + static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion &region_requests); - static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del,const std::string &region_name); + static std::unique_ptr<Request> DeleteToMutateRequest(const Delete &del, + const std::string &region_name); static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string &region_name); @@ -91,8 +102,10 @@ class RequestConverter { */ static void SetRegion(const std::string &region_name, pb::RegionSpecifier *region_specifier); static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get); + static std::unique_ptr<hbase::pb::Scan> ToScan(const Scan &scan); static DeleteType ToDeleteType(const CellType type); static bool IsDelete(const CellType type); + static void SetCommonScanRequestFields(std::shared_ptr<hbase::pb::ScanRequest>, bool renew); }; } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/response-converter.cc
---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index 94b7875..d2d719b 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -54,7 +54,7 @@ std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& re } std::shared_ptr<Result> ResponseConverter::ToResult( - const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) { + const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) { std::vector<std::shared_ptr<Cell>> vcells; for (auto cell : result.cell()) { std::shared_ptr<Cell> pcell = @@ -82,34 +82,38 @@ std::shared_ptr<Result> ResponseConverter::ToResult( std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) { auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg()); - VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString(); - int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() - : scan_resp->results_size(); + return FromScanResponse(scan_resp, resp.cell_scanner()); +} + +std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse( + const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) { + VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString() + << " cell_scanner:" << (cell_scanner == nullptr); + int num_results = + cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size(); std::vector<std::shared_ptr<Result>> results{static_cast<size_t>(num_results)}; for (int i = 0; i < num_results; i++) { - if (resp.cell_scanner() != nullptr) { + if (cell_scanner != nullptr) { // Cells are out in cellblocks. Group them up again as Results. 
How many to read at a // time will be found in getCellsLength -- length here is how many Cells in the i'th Result int num_cells = scan_resp->cells_per_result(i); std::vector<std::shared_ptr<Cell>> vcells; - while (resp.cell_scanner()->Advance()) { - vcells.push_back(resp.cell_scanner()->Current()); - } - // TODO: check associated cell count? - - if (vcells.size() != num_cells) { - std::string msg = "Results sent from server=" + std::to_string(num_results) + - ". But only got " + std::to_string(i) + - " results completely at client. Resetting the scanner to scan again."; - LOG(ERROR) << msg; - throw std::runtime_error(msg); + for (int j = 0; j < num_cells; j++) { + if (!cell_scanner->Advance()) { + std::string msg = "Results sent from server=" + std::to_string(num_results) + + ". But only got " + std::to_string(i) + + " results completely at client. Resetting the scanner to scan again."; + LOG(ERROR) << msg; + throw std::runtime_error(msg); + } + vcells.push_back(cell_scanner->Current()); } // TODO: handle partial results per Result by checking partial_flag_per_result results[i] = std::make_shared<Result>(vcells, false, scan_resp->stale(), false); } else { - results[i] = ToResult(scan_resp->results(i), resp.cell_scanner()); + results[i] = ToResult(scan_resp->results(i), cell_scanner); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/response-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h index 0fdde89..b518d1c 100644 --- a/hbase-native-client/core/response-converter.h +++ b/hbase-native-client/core/response-converter.h @@ -39,7 +39,7 @@ class ResponseConverter { ~ResponseConverter(); static std::shared_ptr<Result> ToResult(const hbase::pb::Result& result, - const std::unique_ptr<CellScanner>& cell_scanner); + const std::shared_ptr<CellScanner> cell_scanner); /** * @brief Returns a Result 
object created by PB Message in passed Response object. @@ -51,6 +51,9 @@ class ResponseConverter { static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp); + static std::vector<std::shared_ptr<Result>> FromScanResponse( + const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner); + static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req, const Response& resp); http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result-scanner.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result-scanner.h b/hbase-native-client/core/result-scanner.h new file mode 100644 index 0000000..9460521 --- /dev/null +++ b/hbase-native-client/core/result-scanner.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <functional> +#include <iterator> +#include <map> +#include <memory> +#include <string> +#include <vector> + +#include "core/cell.h" +#include "core/result.h" + +namespace hbase { + +/** + * Interface for client-side scanning. Use Table to obtain instances. 
+ */ +class ResultScanner { + // TODO: should we implement forward iterators? + + public: + virtual ~ResultScanner() {} + + virtual void Close() = 0; + + virtual std::shared_ptr<Result> Next() = 0; +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result-test.cc b/hbase-native-client/core/result-test.cc index 520f4b9..3299ffc 100644 --- a/hbase-native-client/core/result-test.cc +++ b/hbase-native-client/core/result-test.cc @@ -17,6 +17,7 @@ * */ +#include <glog/logging.h> #include <gtest/gtest.h> #include <limits> #include <memory> @@ -71,7 +72,7 @@ void PopulateCells(std::vector<std::shared_ptr<Cell> > &cells) { } default: { cells.push_back(std::make_shared<Cell>( - row, family, column, std::numeric_limits<long>::max(), value, CellType::PUT)); + row, family, column, std::numeric_limits<int64_t>::max(), value, CellType::PUT)); } } } @@ -255,7 +256,7 @@ TEST(Result, FilledResult) { break; } default: { - EXPECT_EQ(std::numeric_limits<long>::max(), version_map.first); + EXPECT_EQ(std::numeric_limits<int64_t>::max(), version_map.first); EXPECT_EQ(value, version_map.second); } } @@ -297,3 +298,24 @@ TEST(Result, FilledResult) { EXPECT_EQ("value-9", qual_val_map.second); } } + +TEST(Result, ResultEstimatedSize) { + CellType cell_type = CellType::PUT; + int64_t timestamp = std::numeric_limits<int64_t>::max(); + std::vector<std::shared_ptr<Cell> > cells; + Result empty(cells, true, false, false); + + EXPECT_EQ(empty.EstimatedSize(), sizeof(Result)); + + cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type)); + Result result1(cells, true, false, false); + EXPECT_TRUE(result1.EstimatedSize() > empty.EstimatedSize()); + + cells.push_back(std::make_shared<Cell>("a", "a", "", timestamp, "", cell_type)); + Result result2(cells, true, false, false); + 
EXPECT_TRUE(result2.EstimatedSize() > result1.EstimatedSize()); + + LOG(INFO) << empty.EstimatedSize(); + LOG(INFO) << result1.EstimatedSize(); + LOG(INFO) << result2.EstimatedSize(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc index 9d9ddb3..44b4c86 100644 --- a/hbase-native-client/core/result.cc +++ b/hbase-native-client/core/result.cc @@ -25,13 +25,7 @@ Result::~Result() {} Result::Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale, bool partial) - : exists_(exists), stale_(stale), partial_(partial) { - for (const auto &cell : cells) { - cells_.push_back(cell); - // We create the map when cells are added. unlike java where map is created - // when result.getMap() is called - result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value(); - } + : exists_(exists), stale_(stale), partial_(partial), cells_(cells) { row_ = (cells_.size() == 0 ? 
"" : cells_[0]->Row()); } @@ -43,10 +37,10 @@ Result::Result(const Result &result) { if (!result.cells_.empty()) { for (const auto &cell : result.cells_) { cells_.push_back(cell); - result_map_[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value(); } } } + const std::vector<std::shared_ptr<Cell> > &Result::Cells() const { return cells_; } std::vector<std::shared_ptr<Cell> > Result::ColumnCells(const std::string &family, @@ -74,13 +68,12 @@ const std::shared_ptr<Cell> Result::ColumnLatestCell(const std::string &family, return nullptr; } -std::shared_ptr<std::string> Result::Value(const std::string &family, - const std::string &qualifier) const { +optional<std::string> Result::Value(const std::string &family, const std::string &qualifier) const { std::shared_ptr<Cell> latest_cell(ColumnLatestCell(family, qualifier)); if (latest_cell.get()) { - return std::make_shared<std::string>(latest_cell->Value()); + return optional<std::string>(latest_cell->Value()); } - return nullptr; + return optional<std::string>(); } bool Result::IsEmpty() const { return cells_.empty(); } @@ -89,24 +82,33 @@ const std::string &Result::Row() const { return row_; } int Result::Size() const { return cells_.size(); } -const ResultMap &Result::Map() const { return result_map_; } +ResultMap Result::Map() const { + ResultMap result_map; + for (const auto &cell : cells_) { + result_map[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value(); + } + return result_map; +} -const std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const { +std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const { std::map<std::string, std::string> family_map; if (!IsEmpty()) { - for (auto itr = result_map_.begin(); itr != result_map_.end(); ++itr) { - if (family == itr->first) { - for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) { - for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) { 
- // We break after inserting the first value. Result.java takes only - // the first value - family_map[qitr->first] = vitr->second; - break; - } - } + auto result_map = Map(); + auto itr = result_map.find(family); + if (itr == result_map.end()) { + return family_map; + } + + for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) { + for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) { + // We break after inserting the first value. Result.java takes only + // the first value + family_map[qitr->first] = vitr->second; + break; } } } + return family_map; } @@ -131,4 +133,14 @@ std::string Result::DebugString() const { return ret; } +size_t Result::EstimatedSize() const { + size_t s = sizeof(Result); + s += row_.capacity(); + for (const auto &c : cells_) { + s += sizeof(std::shared_ptr<Cell>); + s += c->EstimatedSize(); + } + return s; +} + } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/result.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h index 627d161..f18071b 100644 --- a/hbase-native-client/core/result.h +++ b/hbase-native-client/core/result.h @@ -26,6 +26,7 @@ #include <vector> #include "core/cell.h" +#include "utils/optional.h" namespace hbase { @@ -79,7 +80,7 @@ class Result { * @param family - column family * @param qualifier - column qualifier */ - std::shared_ptr<std::string> Value(const std::string &family, const std::string &qualifier) const; + optional<std::string> Value(const std::string &family, const std::string &qualifier) const; /** * @brief Returns if the underlying Cell vector is empty or not @@ -104,23 +105,32 @@ class Result { * All other map returning methods make use of this map internally * The Map is created when the Result instance is created */ - const ResultMap &Map() const; + ResultMap Map() const; /** * @brief Map of qualifiers to
values. * Returns a Map of the form: Map<qualifier,value> * @param family - column family to get */ - const std::map<std::string, std::string> FamilyMap(const std::string &family) const; + std::map<std::string, std::string> FamilyMap(const std::string &family) const; std::string DebugString() const; + bool Exists() const { return exists_; } + + bool Stale() const { return stale_; } + + bool Partial() const { return partial_; } + + /** Returns estimated size of the Result object including deep heap space usage + * of its Cells and data. Notice that this is a very rough estimate. */ + size_t EstimatedSize() const; + private: bool exists_ = false; bool stale_ = false; bool partial_ = false; std::string row_ = ""; std::vector<std::shared_ptr<Cell> > cells_; - ResultMap result_map_; }; } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan-result-cache-test.cc b/hbase-native-client/core/scan-result-cache-test.cc new file mode 100644 index 0000000..0bf83ce --- /dev/null +++ b/hbase-native-client/core/scan-result-cache-test.cc @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <folly/Conv.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include <vector> + +#include "core/cell.h" +#include "core/result.h" +#include "core/scan-result-cache.h" + +using hbase::ScanResultCache; +using hbase::Result; +using hbase::Cell; +using hbase::CellType; + +using ResultVector = std::vector<std::shared_ptr<Result>>; + +std::shared_ptr<Cell> CreateCell(const int32_t &key, const std::string &family, + const std::string &column) { + auto row = folly::to<std::string>(key); + return std::make_shared<Cell>(row, family, column, std::numeric_limits<int64_t>::max(), row, + CellType::PUT); +} + +std::shared_ptr<Result> CreateResult(std::shared_ptr<Cell> cell, bool partial) { + return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{cell}, false, false, partial); +} + +TEST(ScanResultCacheTest, NoPartial) { + ScanResultCache cache; + ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, false)); + ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, true)); + int32_t count = 10; + ResultVector results{}; + for (int32_t i = 0; i < count; i++) { + results.push_back(CreateResult(CreateCell(i, "cf", "cq1"), false)); + } + ASSERT_EQ(results, cache.AddAndGet(results, false)); +} + +TEST(ScanResultCacheTest, Combine1) { + ScanResultCache cache; + auto prev_result = CreateResult(CreateCell(0, "cf", "cq1"), true); + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true); + auto results = cache.AddAndGet(ResultVector{prev_result, result1}, false); + ASSERT_EQ(1L, results.size()); + ASSERT_EQ(prev_result, results[0]); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size()); + ASSERT_EQ(0, 
cache.AddAndGet(ResultVector{}, true).size()); + + results = cache.AddAndGet(ResultVector{}, false); + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(3, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3"))); +} + +TEST(ScanResultCacheTest, Combine2) { + ScanResultCache cache; + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true); + + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true); + auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq2"), false); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size()); + + auto results = cache.AddAndGet(ResultVector{next_result1}, false); + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(3, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3"))); + + results = cache.AddAndGet(ResultVector{next_to_next_result1}, false); + ASSERT_EQ(2, results.size()); + ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(1, results[0]->Cells().size()); + ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(3, folly::to<int32_t>(results[1]->Row())); + ASSERT_EQ(1, results[1]->Cells().size()); + ASSERT_EQ(3, folly::to<int32_t>(*results[1]->Value("cf", "cq2"))); +} + +TEST(ScanResultCacheTest, Combine3) { + ScanResultCache cache; + auto 
result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), false); + auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq1"), true); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); + + auto results = cache.AddAndGet(ResultVector{next_result1, next_to_next_result1}, false); + + ASSERT_EQ(2, results.size()); + ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); + ASSERT_EQ(2, folly::to<int32_t>(results[1]->Row())); + ASSERT_EQ(1, results[1]->Cells().size()); + ASSERT_EQ(2, folly::to<int32_t>(*results[1]->Value("cf", "cq1"))); + + results = cache.AddAndGet(ResultVector{}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(3, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(1, results[0]->Cells().size()); + ASSERT_EQ(3, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); +} + +TEST(ScanResultCacheTest, Combine4) { + ScanResultCache cache; + auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); + auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), false); + auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true); + auto next_result2 = CreateResult(CreateCell(2, "cf", "cq2"), false); + + ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); + + auto results = cache.AddAndGet(ResultVector{result2, next_result1}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); + + results = 
cache.AddAndGet(ResultVector{next_result2}, false); + + ASSERT_EQ(1, results.size()); + ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row())); + ASSERT_EQ(2, results[0]->Cells().size()); + ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); + ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); +} + +TEST(ScanResultCacheTest, SizeOf) { + std::string e{""}; + std::string f{"f"}; + std::string foo{"foo"}; + + LOG(INFO) << sizeof(e) << " " << e.capacity(); + LOG(INFO) << sizeof(f) << " " << f.capacity(); + LOG(INFO) << sizeof(foo) << " " << foo.capacity(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan-result-cache.cc b/hbase-native-client/core/scan-result-cache.cc new file mode 100644 index 0000000..62a51e0 --- /dev/null +++ b/hbase-native-client/core/scan-result-cache.cc @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include "core/scan-result-cache.h" +#include <algorithm> +#include <iterator> +#include <limits> +#include <stdexcept> + +namespace hbase { +/** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param is_hearthbeat indicates whether the results were obtained from a heartbeat response. + * @return valid results, never null. + */ +std::vector<std::shared_ptr<Result>> ScanResultCache::AddAndGet( + const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat) { + // If no results were returned it indicates that either we have all the partial results + // necessary to construct the complete result or the server had to send a heartbeat message + // to the client to keep the client-server connection alive + if (results.empty()) { + // If this response was an empty heartbeat message, then we have not exhausted the region + // and thus there may be more partials server side that still need to be added to the partial + // list before we form the complete Result + if (!partial_results_.empty() && !is_hearthbeat) { + return UpdateNumberOfCompleteResultsAndReturn( + std::vector<std::shared_ptr<Result>>{Combine()}); + } + return std::vector<std::shared_ptr<Result>>{}; + } + // In every RPC response there should be at most a single partial result. Furthermore, if + // there is a partial result, it is guaranteed to be in the last position of the array. 
+ auto last = results[results.size() - 1]; + if (last->Partial()) { + if (partial_results_.empty()) { + partial_results_.push_back(last); + std::vector<std::shared_ptr<Result>> new_results; + std::copy_n(results.begin(), results.size() - 1, std::back_inserter(new_results)); + return UpdateNumberOfCompleteResultsAndReturn(new_results); + } + // We have only one result and it is partial + if (results.size() == 1) { + // check if there is a row change + if (partial_results_.at(0)->Row() == last->Row()) { + partial_results_.push_back(last); + return std::vector<std::shared_ptr<Result>>{}; + } + auto complete_result = Combine(); + partial_results_.push_back(last); + return UpdateNumberOfCompleteResultsAndReturn(complete_result); + } + // We have some complete results + auto results_to_return = PrependCombined(results, results.size() - 1); + partial_results_.push_back(last); + return UpdateNumberOfCompleteResultsAndReturn(results_to_return); + } + if (!partial_results_.empty()) { + return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size())); + } + return UpdateNumberOfCompleteResultsAndReturn(results); +} + +void ScanResultCache::Clear() { partial_results_.clear(); } + +std::shared_ptr<Result> ScanResultCache::CreateCompleteResult( + const std::vector<std::shared_ptr<Result>> &partial_results) { + if (partial_results.empty()) { + return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{}, false, false, false); + } + std::vector<std::shared_ptr<Cell>> cells{}; + bool stale = false; + std::string prev_row = ""; + std::string current_row = ""; + size_t i = 0; + for (const auto &r : partial_results) { + current_row = r->Row(); + if (i != 0 && prev_row != current_row) { + throw std::runtime_error( + "Cannot form complete result. Rows of partial results do not match."); + } + // Ensure that all Results except the last one are marked as partials. 
The last result + // may not be marked as a partial because Results are only marked as partials when + // the scan on the server side must be stopped due to reaching the maxResultSize. + // Visualizing it makes it easier to understand: + // maxResultSize: 2 cells + // (-x-) represents cell number x in a row + // Example: row1: -1- -2- -3- -4- -5- (5 cells total) + // How row1 will be returned by the server as partial Results: + // Result1: -1- -2- (2 cells, size limit reached, mark as partial) + // Result2: -3- -4- (2 cells, size limit reached, mark as partial) + // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) + if (i != partial_results.size() - 1 && !r->Partial()) { + throw std::runtime_error("Cannot form complete result. Result is missing partial flag."); + } + prev_row = current_row; + stale = stale || r->Stale(); + for (const auto &c : r->Cells()) { + cells.push_back(c); + } + i++; + } + + return std::make_shared<Result>(cells, false, stale, false); +} + +std::shared_ptr<Result> ScanResultCache::Combine() { + auto result = CreateCompleteResult(partial_results_); + partial_results_.clear(); + return result; +} + +std::vector<std::shared_ptr<Result>> ScanResultCache::PrependCombined( + const std::vector<std::shared_ptr<Result>> &results, int length) { + if (length == 0) { + return std::vector<std::shared_ptr<Result>>{Combine()}; + } + // the last part of a partial result may not be marked as partial so here we need to check if + // there is a row change. 
+ size_t start; + if (partial_results_[0]->Row() == results[0]->Row()) { + partial_results_.push_back(results[0]); + start = 1; + length--; + } else { + start = 0; + } + std::vector<std::shared_ptr<Result>> prepend_results{}; + prepend_results.push_back(Combine()); + std::copy_n(results.begin() + start, length, std::back_inserter(prepend_results)); + return prepend_results; +} + +std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn( + const std::shared_ptr<Result> &result) { + return UpdateNumberOfCompleteResultsAndReturn(std::vector<std::shared_ptr<Result>>{result}); +} + +std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn( + const std::vector<std::shared_ptr<Result>> &results) { + num_complete_rows_ += results.size(); + return results; +} +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-result-cache.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan-result-cache.h b/hbase-native-client/core/scan-result-cache.h new file mode 100644 index 0000000..5d3d0ab --- /dev/null +++ b/hbase-native-client/core/scan-result-cache.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/Logging.h> +#include <algorithm> +#include <chrono> +#include <iterator> +#include <memory> +#include <string> +#include <vector> + +#include "core/result.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +namespace hbase { + +class ScanResultCache { + // In Java, there are 3 different implementations for this. We are not doing partial results, + // or scan batching in native code for now, so this version is simpler and + // only deals with giving back complete rows as Result. It is more or less implementation + // of CompleteScanResultCache.java + + public: + /** + * Add the given results to cache and get valid results back. + * @param results the results of a scan next. Must not be null. + * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response. + * @return valid results, never null. + */ + std::vector<std::shared_ptr<Result>> AddAndGet( + const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat); + + void Clear(); + + int64_t num_complete_rows() const { return num_complete_rows_; } + + private: + /** + * Forms a single result from the partial results in the partialResults list. This method is + * useful for reconstructing partial results on the client side. 
+ * @param partial_results list of partial results + * @return The complete result that is formed by combining all of the partial results together + */ + static std::shared_ptr<Result> CreateCompleteResult( + const std::vector<std::shared_ptr<Result>> &partial_results); + + std::shared_ptr<Result> Combine(); + + std::vector<std::shared_ptr<Result>> PrependCombined( + const std::vector<std::shared_ptr<Result>> &results, int length); + + std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn( + const std::shared_ptr<Result> &result); + + std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn( + const std::vector<std::shared_ptr<Result>> &results); + + private: + std::vector<std::shared_ptr<Result>> partial_results_; + int64_t num_complete_rows_ = 0; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan-test.cc b/hbase-native-client/core/scan-test.cc index 73fb6df..0ee054c 100644 --- a/hbase-native-client/core/scan-test.cc +++ b/hbase-native-client/core/scan-test.cc @@ -17,13 +17,13 @@ * */ -#include "core/scan.h" - +#include <gtest/gtest.h> #include <limits> -#include <gtest/gtest.h> +#include "core/scan.h" -using namespace hbase; +using hbase::Get; +using hbase::Scan; void CheckFamilies(Scan &scan) { EXPECT_EQ(false, scan.HasFamilies()); @@ -74,7 +74,7 @@ void CheckFamilies(Scan &scan) { EXPECT_EQ(it, scan.FamilyMap().end()); } -void CheckFamiliesAfterCopy(Scan &scan) { +void CheckFamiliesAfterCopy(const Scan &scan) { EXPECT_EQ(true, scan.HasFamilies()); EXPECT_EQ(3, scan.FamilyMap().size()); int i = 1; @@ -116,11 +116,6 @@ void ScanMethods(Scan &scan) { scan.SetStopRow(stop_row); EXPECT_EQ(stop_row, scan.StopRow()); - scan.SetSmall(true); - EXPECT_EQ(true, scan.IsSmall()); - scan.SetSmall(false); - EXPECT_EQ(false, scan.IsSmall()); - 
scan.SetCaching(3); EXPECT_EQ(3, scan.Caching()); http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan.cc b/hbase-native-client/core/scan.cc index 5f315ec..6dcc51b 100644 --- a/hbase-native-client/core/scan.cc +++ b/hbase-native-client/core/scan.cc @@ -38,7 +38,7 @@ Scan::Scan(const std::string &start_row, const std::string &stop_row) CheckRow(stop_row_); } -Scan::Scan(const Scan &scan) { +Scan::Scan(const Scan &scan) : Query(scan) { start_row_ = scan.start_row_; stop_row_ = scan.stop_row_; max_versions_ = scan.max_versions_; @@ -47,7 +47,6 @@ Scan::Scan(const Scan &scan) { cache_blocks_ = scan.cache_blocks_; load_column_families_on_demand_ = scan.load_column_families_on_demand_; reversed_ = scan.reversed_; - small_ = scan.small_; allow_partial_results_ = scan.allow_partial_results_; consistency_ = scan.consistency_; tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp())); @@ -55,6 +54,7 @@ Scan::Scan(const Scan &scan) { } Scan &Scan::operator=(const Scan &scan) { + Query::operator=(scan); start_row_ = scan.start_row_; stop_row_ = scan.stop_row_; max_versions_ = scan.max_versions_; @@ -63,7 +63,6 @@ Scan &Scan::operator=(const Scan &scan) { cache_blocks_ = scan.cache_blocks_; load_column_families_on_demand_ = scan.load_column_families_on_demand_; reversed_ = scan.reversed_; - small_ = scan.small_; allow_partial_results_ = scan.allow_partial_results_; consistency_ = scan.consistency_; tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp())); @@ -109,24 +108,14 @@ void Scan::SetReversed(bool reversed) { reversed_ = reversed; } bool Scan::IsReversed() const { return reversed_; } -void Scan::SetStartRow(const std::string &start_row) { - CheckRow(start_row); - start_row_ = start_row; -} +void Scan::SetStartRow(const std::string &start_row) { start_row_ = start_row; } const 
std::string &Scan::StartRow() const { return start_row_; } -void Scan::SetStopRow(const std::string &stop_row) { - CheckRow(stop_row); - stop_row_ = stop_row; -} +void Scan::SetStopRow(const std::string &stop_row) { stop_row_ = stop_row; } const std::string &Scan::StopRow() const { return stop_row_; } -void Scan::SetSmall(bool small) { small_ = small; } - -bool Scan::IsSmall() const { return small_; } - void Scan::SetCaching(int caching) { caching_ = caching; } int Scan::Caching() const { return caching_; } http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scan.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan.h b/hbase-native-client/core/scan.h index fb302b7..1085c4b 100644 --- a/hbase-native-client/core/scan.h +++ b/hbase-native-client/core/scan.h @@ -119,16 +119,6 @@ class Scan : public Query { const std::string &StopRow() const; /** - * @brief Set whether this scan is a small scan. - */ - void SetSmall(bool small); - - /** - * @brief Returns if the scan is a small scan. - */ - bool IsSmall() const; - - /** * @brief Set the number of rows for caching that will be passed to scanners. * Higher caching values will enable faster scanners but will use more memory. * @param caching - the number of rows for caching. 
@@ -258,12 +248,11 @@ class Scan : public Query { std::string start_row_ = ""; std::string stop_row_ = ""; uint32_t max_versions_ = 1; - int caching_ = -1; + int32_t caching_ = -1; int64_t max_result_size_ = -1; bool cache_blocks_ = true; bool load_column_families_on_demand_ = false; bool reversed_ = false; - bool small_ = false; bool allow_partial_results_ = false; hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG; std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/scanner-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scanner-test.cc b/hbase-native-client/core/scanner-test.cc new file mode 100644 index 0000000..1ecbd02 --- /dev/null +++ b/hbase-native-client/core/scanner-test.cc @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <folly/Conv.h> +#include <gtest/gtest.h> + +#include <chrono> +#include <thread> +#include <vector> + +#include "core/async-client-scanner.h" +#include "core/async-table-result-scanner.h" +#include "core/cell.h" +#include "core/client.h" +#include "core/configuration.h" +#include "core/filter.h" +#include "core/get.h" +#include "core/hbase-configuration-loader.h" +#include "core/put.h" +#include "core/result.h" +#include "core/row.h" +#include "core/table.h" +#include "if/Comparator.pb.h" +#include "if/Filter.pb.h" +#include "serde/table-name.h" +#include "test-util/test-util.h" +#include "utils/time-util.h" + +using hbase::Cell; +using hbase::ComparatorFactory; +using hbase::Comparator; +using hbase::Configuration; +using hbase::Get; +using hbase::Put; +using hbase::Result; +using hbase::Scan; +using hbase::Table; +using hbase::TestUtil; +using hbase::TimeUtil; +using hbase::AsyncClientScanner; +using hbase::AsyncTableResultScanner; +using hbase::FilterFactory; +using hbase::pb::CompareType; + +class ScannerTest : public ::testing::Test { + public: + static std::unique_ptr<hbase::TestUtil> test_util; + static const uint32_t num_rows; + + static void SetUpTestCase() { + google::InstallFailureSignalHandler(); + test_util = std::make_unique<hbase::TestUtil>(); + test_util->StartMiniCluster(2); + } +}; +std::unique_ptr<hbase::TestUtil> ScannerTest::test_util = nullptr; +const uint32_t ScannerTest::num_rows = 1000; + +std::string Family(uint32_t i) { return "f" + folly::to<std::string>(i); } + +std::string Row(uint32_t i, int width) { + std::ostringstream s; + s.fill('0'); + s.width(width); + s << i; + return "row" + s.str(); +} + +std::string Row(uint32_t i) { return Row(i, 3); } + +std::unique_ptr<Put> MakePut(const std::string &row, uint32_t num_families) { + auto put = std::make_unique<Put>(row); + + for (uint32_t i = 0; i < num_families; i++) { + put->AddColumn(Family(i), "q1", row); + put->AddColumn(Family(i), "q2", row + "-" + row); + } + + 
return std::move(put); +} + +void CheckResult(const Result &r, std::string expected_row, uint32_t num_families) { + VLOG(1) << r.DebugString(); + auto row = r.Row(); + ASSERT_EQ(row, expected_row); + ASSERT_EQ(r.Cells().size(), num_families * 2); + for (uint32_t i = 0; i < num_families; i++) { + ASSERT_EQ(*r.Value(Family(i), "q1"), row); + ASSERT_EQ(*r.Value(Family(i), "q2"), row + "-" + row); + } +} + +void CreateTable(std::string table_name, uint32_t num_families, uint32_t num_rows, + int32_t num_regions) { + LOG(INFO) << "Creating the table " << table_name + << " with num_regions:" << folly::to<std::string>(num_regions); + std::vector<std::string> families; + for (uint32_t i = 0; i < num_families; i++) { + families.push_back(Family(i)); + } + if (num_regions <= 1) { + ScannerTest::test_util->CreateTable(table_name, families); + } else { + std::vector<std::string> keys; + for (int32_t i = 0; i < num_regions - 1; i++) { + keys.push_back(Row(i * (num_rows / (num_regions - 1)))); + LOG(INFO) << "Split key:" << keys[keys.size() - 1]; + } + ScannerTest::test_util->CreateTable(table_name, families, keys); + } +} + +std::unique_ptr<hbase::Client> CreateTableAndWriteData(std::string table_name, + uint32_t num_families, uint32_t num_rows, + int32_t num_regions) { + CreateTable(table_name, num_families, num_rows, num_regions); + auto tn = folly::to<hbase::pb::TableName>(table_name); + auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf()); + auto table = client->Table(tn); + + LOG(INFO) << "Writing data to the table, num_rows:" << num_rows; + // Perform Puts + for (uint32_t i = 0; i < num_rows; i++) { + table->Put(*MakePut(Row(i), num_families)); + } + return std::move(client); +} + +void TestScan(const Scan &scan, uint32_t num_families, int32_t start, int32_t num_rows, + Table *table) { + LOG(INFO) << "Starting scan for the test with start:" << scan.StartRow() + << ", stop:" << scan.StopRow() << " expected_num_rows:" << num_rows; + auto scanner = 
table->Scan(scan); + + uint32_t i = start; + auto r = scanner->Next(); + while (r != nullptr) { + CheckResult(*r, Row(i++), num_families); + r = scanner->Next(); + } + ASSERT_EQ(i - start, num_rows); +} + +void TestScan(const Scan &scan, int32_t start, int32_t num_rows, Table *table) { + TestScan(scan, 1, start, num_rows, table); +} + +void TestScan(uint32_t num_families, int32_t start, int32_t stop, int32_t num_rows, Table *table) { + Scan scan{}; + if (start >= 0) { + scan.SetStartRow(Row(start)); + } else { + start = 0; // needed below: the callee uses start as the first expected row index + } + if (stop >= 0) { + scan.SetStopRow(Row(stop)); + } + + TestScan(scan, num_families, start, num_rows, table); +} + +void TestScan(int32_t start, int32_t stop, int32_t num_rows, Table *table) { + TestScan(1, start, stop, num_rows, table); +} + +void TestScan(uint32_t num_families, std::string start, std::string stop, int32_t num_rows, + Table *table) { + Scan scan{}; + + scan.SetStartRow(start); + scan.SetStopRow(stop); + + LOG(INFO) << "Starting scan for the test with start:" << start << ", stop:" << stop + << " expected_num_rows:" << num_rows; + auto scanner = table->Scan(scan); + + uint32_t i = 0; + auto r = scanner->Next(); + while (r != nullptr) { + VLOG(1) << r->DebugString(); + i++; + ASSERT_EQ(r->Map().size(), num_families); + r = scanner->Next(); + } + ASSERT_EQ(i, num_rows); +} + +void TestScan(std::string start, std::string stop, int32_t num_rows, Table *table) { + TestScan(1, start, stop, num_rows, table); +} + +void TestScanCombinations(Table *table, uint32_t num_families) { + // full table + TestScan(num_families, -1, -1, 1000, table); + TestScan(num_families, -1, 999, 999, table); + TestScan(num_families, 0, -1, 1000, table); + TestScan(num_families, 0, 999, 999, table); + TestScan(num_families, 10, 990, 980, table); + TestScan(num_families, 1, 998, 997, table); + + TestScan(num_families, 123, 345, 222, table); + TestScan(num_families, 234, 456, 222, table); + TestScan(num_families, 345, 567, 222, 
table); + TestScan(num_families, 456, 678, 222, table); + + // single results + TestScan(num_families, 111, 111, 1, table); // split keys are like 111, 222, 333, etc + TestScan(num_families, 111, 112, 1, table); + TestScan(num_families, 332, 332, 1, table); + TestScan(num_families, 332, 333, 1, table); + TestScan(num_families, 333, 333, 1, table); + TestScan(num_families, 333, 334, 1, table); + TestScan(num_families, 42, 42, 1, table); + TestScan(num_families, 921, 921, 1, table); + TestScan(num_families, 0, 0, 1, table); + TestScan(num_families, 0, 1, 1, table); + TestScan(num_families, 999, 999, 1, table); + + // few results + TestScan(num_families, 0, 0, 1, table); + TestScan(num_families, 0, 2, 2, table); + TestScan(num_families, 0, 5, 5, table); + TestScan(num_families, 10, 15, 5, table); + TestScan(num_families, 105, 115, 10, table); + TestScan(num_families, 111, 221, 110, table); + TestScan(num_families, 111, 222, 111, table); // crossing region boundary 111-222 + TestScan(num_families, 111, 223, 112, table); + TestScan(num_families, 111, 224, 113, table); + TestScan(num_families, 990, 999, 9, table); + TestScan(num_families, 900, 998, 98, table); + + // empty results + TestScan(num_families, "a", "a", 0, table); + TestScan(num_families, "a", "r", 0, table); + TestScan(num_families, "", "r", 0, table); + TestScan(num_families, "s", "", 0, table); + TestScan(num_families, "s", "z", 0, table); + TestScan(num_families, Row(110) + "a", Row(111), 0, table); + TestScan(num_families, Row(111) + "a", Row(112), 0, table); + TestScan(num_families, Row(123) + "a", Row(124), 0, table); + + // custom + TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table); + TestScan(num_families, Row(0, 3), Row(0, 4), 1, table); + TestScan(num_families, Row(999, 3), Row(9999, 4), 1, table); + TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table); + TestScan(num_families, Row(0, 3), Row(9999, 4), 1000, table); + TestScan(num_families, "a", "z", 1000, table); +} + +// some of 
these tests are from TestAsyncTableScan* and some from TestFromClientSide* and +// TestScannersFromClientSide* + +TEST_F(ScannerTest, SingleRegionScan) { + auto client = CreateTableAndWriteData("t_single_region_scan", 1, num_rows, 1); + auto table = client->Table(folly::to<hbase::pb::TableName>("t_single_region_scan")); + + TestScanCombinations(table.get(), 1); +} + +TEST_F(ScannerTest, MultiRegionScan) { + auto client = CreateTableAndWriteData("t_multi_region_scan", 1, num_rows, 10); + auto table = client->Table(folly::to<hbase::pb::TableName>("t_multi_region_scan")); + + TestScanCombinations(table.get(), 1); +} + +TEST_F(ScannerTest, ScanWithPauses) { + auto max_result_size = + ScannerTest::test_util->conf()->GetInt("hbase.client.scanner.max.result.size", 2097152); + ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", 100); + auto client = CreateTableAndWriteData("t_multi_region_scan", 1, num_rows, 5); + auto table = client->Table(folly::to<hbase::pb::TableName>("t_multi_region_scan")); + + VLOG(1) << "Starting scan for the test"; + Scan scan{}; + scan.SetCaching(100); + auto scanner = table->Scan(scan); + + uint32_t i = 0; + auto r = scanner->Next(); + while (r != nullptr) { + CheckResult(*r, Row(i++), 1); + r = scanner->Next(); + std::this_thread::sleep_for(TimeUtil::MillisToNanos(10)); + } + + auto s = static_cast<AsyncTableResultScanner *>(scanner.get()); + ASSERT_GT(s->num_prefetch_stopped(), 0); + + ASSERT_EQ(i, num_rows); + ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", max_result_size); +} + +TEST_F(ScannerTest, ScanWithFilters) { + auto client = CreateTableAndWriteData("t_scan_with_filters", 1, num_rows, 1); + auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_with_filters")); + + Scan scan{}; + scan.SetFilter(FilterFactory::ValueFilter(CompareType::GREATER_OR_EQUAL, + *ComparatorFactory::BinaryComparator(Row(800)))); + + TestScan(scan, 800, 200, table.get()); +} + 
+TEST_F(ScannerTest, ScanMultiFamily) { + auto client = CreateTableAndWriteData("t_scan_multi_family", 3, num_rows, 1); + auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_multi_family")); + + TestScanCombinations(table.get(), 3); +} + +TEST_F(ScannerTest, ScanNullQualifier) { + std::string table_name{"t_scan_null_qualifier"}; + std::string row{"row"}; + CreateTable(table_name, 1, 1, 1); + + auto tn = folly::to<hbase::pb::TableName>(table_name); + auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf()); + auto table = client->Table(tn); + + // Perform Puts + Put put{row}; + put.AddColumn(Family(0), "q1", row); + put.AddColumn(Family(0), "", row); + table->Put(put); + + Scan scan1{}; + scan1.AddColumn(Family(0), ""); + auto scanner1 = table->Scan(scan1); + auto r1 = scanner1->Next(); + ASSERT_EQ(r1->Cells().size(), 1); + ASSERT_EQ(scanner1->Next(), nullptr); + + Scan scan2{}; + scan2.AddFamily(Family(0)); + auto scanner2 = table->Scan(scan2); + auto r2 = scanner2->Next(); + ASSERT_EQ(r2->Cells().size(), 2); + ASSERT_EQ(scanner2->Next(), nullptr); +} + +TEST_F(ScannerTest, ScanNoResults) { + std::string table_name{"t_scan_no_results"}; + auto client = CreateTableAndWriteData(table_name, 1, num_rows, 3); + auto table = client->Table(folly::to<hbase::pb::TableName>(table_name)); + + Scan scan{}; + scan.AddColumn(Family(0), "non_existing_qualifier"); + + TestScan(scan, 0, 0, table.get()); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index 3a7d62b..f79d848 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -30,6 +30,7 @@ #include "core/client.h" #include "core/get.h" #include "core/put.h" +#include "core/scan.h" #include "core/table.h" #include 
"serde/server-name.h" #include "serde/table-name.h" @@ -38,6 +39,7 @@ using hbase::Client; using hbase::Configuration; using hbase::Get; +using hbase::Scan; using hbase::Put; using hbase::Table; using hbase::pb::TableName; @@ -48,6 +50,10 @@ DEFINE_string(table, "test_table", "What table to do the reads or writes"); DEFINE_string(row, "row_", "row prefix"); DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); DEFINE_uint64(num_rows, 10000, "How many rows to write and read"); +DEFINE_bool(puts, true, "Whether to perform puts"); +DEFINE_bool(gets, true, "Whether to perform gets"); +DEFINE_bool(multigets, true, "Whether to perform multi-gets"); +DEFINE_bool(scans, true, "Whether to perform scans"); DEFINE_bool(display_results, false, "Whether to display the Results from Gets"); DEFINE_int32(threads, 6, "How many cpu threads"); @@ -86,41 +92,72 @@ int main(int argc, char *argv[]) { auto start_ns = TimeUtil::GetNowNanos(); // Do the Put requests - for (uint64_t i = 0; i < num_puts; i++) { - table->Put(*MakePut(Row(FLAGS_row, i))); - } + if (FLAGS_puts) { + LOG(INFO) << "Sending put requests"; + for (uint64_t i = 0; i < num_puts; i++) { + table->Put(*MakePut(Row(FLAGS_row, i))); + } - LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; + LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } // Do the Get requests - start_ns = TimeUtil::GetNowNanos(); - for (uint64_t i = 0; i < num_puts; i++) { - auto result = table->Get(Get{Row(FLAGS_row, i)}); - if (FLAGS_display_results) { - LOG(INFO) << result->DebugString(); + if (FLAGS_gets) { + LOG(INFO) << "Sending get requests"; + start_ns = TimeUtil::GetNowNanos(); + for (uint64_t i = 0; i < num_puts; i++) { + auto result = table->Get(Get{Row(FLAGS_row, i)}); + if (FLAGS_display_results) { + LOG(INFO) << result->DebugString(); + } } - } - LOG(INFO) << "Successfully sent " << 
num_puts << " Get requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; + LOG(INFO) << "Successfully sent " << num_puts << " Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } // Do the Multi-Gets - std::vector<hbase::Get> gets; - for (uint64_t i = 0; i < num_puts; ++i) { - hbase::Get get(Row(FLAGS_row, i)); - gets.push_back(get); - } + if (FLAGS_multigets) { + std::vector<hbase::Get> gets; + for (uint64_t i = 0; i < num_puts; ++i) { + hbase::Get get(Row(FLAGS_row, i)); + gets.push_back(get); + } + + LOG(INFO) << "Sending multi-get requests"; + start_ns = TimeUtil::GetNowNanos(); + auto results = table->Get(gets); - start_ns = TimeUtil::GetNowNanos(); - auto results = table->Get(gets); + if (FLAGS_display_results) { + for (const auto &result : results) LOG(INFO) << result->DebugString(); + } - if (FLAGS_display_results) { - for (const auto &result : results) LOG(INFO) << result->DebugString(); + LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; } - LOG(INFO) << "Successfully sent " << gets.size() << " Multi-Get requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; + // Do the Scan + if (FLAGS_scans) { + LOG(INFO) << "Starting scanner"; + start_ns = TimeUtil::GetNowNanos(); + Scan scan{}; + auto scanner = table->Scan(scan); + + uint64_t i = 0; + auto r = scanner->Next(); + while (r != nullptr) { + if (FLAGS_display_results) { + LOG(INFO) << r->DebugString(); + } + r = scanner->Next(); + i++; + } + + LOG(INFO) << "Successfully iterated over " << i << " Scan results in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + scanner->Close(); + } table->Close(); client->Close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index aa51989..9cdd8b0 100644 --- 
a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -25,6 +25,7 @@ #include <vector> #include "core/async-connection.h" +#include "core/async-table-result-scanner.h" #include "core/request-converter.h" #include "core/response-converter.h" #include "if/Client.pb.h" @@ -52,6 +53,21 @@ std::shared_ptr<hbase::Result> Table::Get(const hbase::Get &get) { return context.get(operation_timeout()); } +std::shared_ptr<ResultScanner> Table::Scan(const hbase::Scan &scan) { + auto max_cache_size = ResultSize2CacheSize( + scan.MaxResultSize() > 0 ? scan.MaxResultSize() + : async_connection_->connection_conf()->scanner_max_result_size()); + auto scanner = std::make_shared<AsyncTableResultScanner>(max_cache_size); + async_table_->Scan(scan, scanner); + return scanner; +} + +int64_t Table::ResultSize2CacheSize(int64_t max_results_size) const { + // * 2 if possible + return max_results_size > (std::numeric_limits<int64_t>::max() / 2) ? max_results_size + : max_results_size * 2; +} + void Table::Put(const hbase::Put &put) { auto future = async_table_->Put(put); future.get(operation_timeout()); http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/core/table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 81ddc8e..1f6d9b7 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -32,6 +32,7 @@ #include "core/location-cache.h" #include "core/put.h" #include "core/raw-async-table.h" +#include "core/result-scanner.h" #include "core/result.h" #include "serde/table-name.h" @@ -74,6 +75,8 @@ class Table { std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment); // TODO: Batch Puts + std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan); + /** * @brief - Close the client connection. 
*/ @@ -92,5 +95,7 @@ class Table { private: std::chrono::milliseconds operation_timeout() const; + + int64_t ResultSize2CacheSize(int64_t max_results_size) const; }; } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/bde59b25/hbase-native-client/exceptions/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/exceptions/BUCK b/hbase-native-client/exceptions/BUCK index eef4437..07ffeaf 100644 --- a/hbase-native-client/exceptions/BUCK +++ b/hbase-native-client/exceptions/BUCK @@ -17,8 +17,12 @@ cxx_library( name="exceptions", - exported_headers=["exception.h",], + exported_headers=[ + "exception.h", + ], srcs=[], - deps=["//third-party:folly",], + deps=[ + "//third-party:folly", + ], compiler_flags=['-Weffc++'], - visibility=['//core/...','//connection//...'],) \ No newline at end of file + visibility=['//core/...', '//connection//...'],)