http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/result.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc deleted file mode 100644 index 44b4c86..0000000 --- a/hbase-native-client/core/result.cc +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/result.h" - -namespace hbase { - -Result::~Result() {} - -Result::Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale, - bool partial) - : exists_(exists), stale_(stale), partial_(partial), cells_(cells) { - row_ = (cells_.size() == 0 ? "" : cells_[0]->Row()); -} - -Result::Result(const Result &result) { - exists_ = result.exists_; - stale_ = result.stale_; - partial_ = result.partial_; - row_ = result.row_; - if (!result.cells_.empty()) { - for (const auto &cell : result.cells_) { - cells_.push_back(cell); - } - } -} - -const std::vector<std::shared_ptr<Cell> > &Result::Cells() const { return cells_; } - -std::vector<std::shared_ptr<Cell> > Result::ColumnCells(const std::string &family, - const std::string &qualifier) const { - std::vector<std::shared_ptr<Cell> > column_cells; - // TODO implement a BinarySearch here ? - for (const auto &cell : cells_) { - if (cell->Family() == family && cell->Qualifier() == qualifier) { - column_cells.push_back(cell); - } - } - return column_cells; -} - -const std::shared_ptr<Cell> Result::ColumnLatestCell(const std::string &family, - const std::string &qualifier) const { - // TODO implement a BinarySearch here ? - for (const auto &cell : cells_) { - // We find the latest(first) occurrence of the Cell for a given column and - // qualifier and break - if (cell->Family() == family && cell->Qualifier() == qualifier) { - return cell; - } - } - return nullptr; -} - -optional<std::string> Result::Value(const std::string &family, const std::string &qualifier) const { - std::shared_ptr<Cell> latest_cell(ColumnLatestCell(family, qualifier)); - if (latest_cell.get()) { - return optional<std::string>(latest_cell->Value()); - } - return optional<std::string>(); -} - -bool Result::IsEmpty() const { return cells_.empty(); } - -const std::string &Result::Row() const { return row_; } - -int Result::Size() const { return cells_.size(); } - -ResultMap Result::Map() const { - ResultMap result_map; - for (const auto &cell : cells_) { - result_map[cell->Family()][cell->Qualifier()][cell->Timestamp()] = cell->Value(); - } - return result_map; -} - -std::map<std::string, std::string> Result::FamilyMap(const std::string &family) const { - std::map<std::string, std::string> family_map; - if (!IsEmpty()) { - auto result_map = Map(); - auto itr = result_map.find(family); - if (itr == result_map.end()) { - return family_map; - } - - for (auto qitr = itr->second.begin(); qitr != itr->second.end(); ++qitr) { - for (auto vitr = qitr->second.begin(); vitr != qitr->second.end(); ++vitr) { - // We break after inserting the first value. Result.java takes only - // the first value - family_map[qitr->first] = vitr->second; - break; - } - } - } - - return family_map; -} - -std::string Result::DebugString() const { - std::string ret{"keyvalues="}; - if (IsEmpty()) { - ret += "NONE"; - return ret; - } - ret += "{"; - bool is_first = true; - for (const auto &cell : cells_) { - if (is_first) { - is_first = false; - } else { - ret += ", "; - } - ret += cell->DebugString(); - } - ret += "}"; - - return ret; -} - -size_t Result::EstimatedSize() const { - size_t s = sizeof(Result); - s += row_.capacity(); - for (const auto c : cells_) { - s += sizeof(std::shared_ptr<Cell>); - s + c->EstimatedSize(); - } - return s; -} - -} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/result.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h deleted file mode 100644 index f18071b..0000000 --- a/hbase-native-client/core/result.h +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <functional> -#include <map> -#include <memory> -#include <string> -#include <vector> - -#include "core/cell.h" -#include "utils/optional.h" - -namespace hbase { - -/** - * @brief Map of families to all versions of its qualifiers and values - * We need to have a reverse ordered map, when storing TS -> value, so that the - * most recent value is stored first - */ -using ResultMap = - std::map<std::string, - std::map<std::string, std::map<int64_t, std::string, std::greater<int64_t> > > >; - -class Result { - public: - /** - * Constructors - */ - Result(const std::vector<std::shared_ptr<Cell> > &cells, bool exists, bool stale, bool partial); - Result(const Result &result); - ~Result(); - - /** - * @brief Return the vector of Cells backing this Result instance. This vector - * will be ordered in the same manner - * as the one which was passed while creation of the Result instance. - */ - const std::vector<std::shared_ptr<Cell> > &Cells() const; - - /** - * @brief Return a vector of Cells for the family and qualifier or empty list - * if the column - * did not exist in the result. - * @param family - column family - * @param qualifier - column qualifier - */ - std::vector<std::shared_ptr<Cell> > ColumnCells(const std::string &family, - const std::string &qualifier) const; - - /** - * @brief Returns the Cell for the most recent timestamp for a given family - * and qualifier. - * Returns map of qualifiers to values, only includes latest values - * @param family - column family. - * @param qualifier - column qualifier - */ - const std::shared_ptr<Cell> ColumnLatestCell(const std::string &family, - const std::string &qualifier) const; - - /** - * @brief Get the latest version of the specified family and qualifier. - * @param family - column family - * @param qualifier - column qualifier - */ - optional<std::string> Value(const std::string &family, const std::string &qualifier) const; - - /** - * @brief Returns if the underlying Cell vector is empty or not - */ - bool IsEmpty() const; - - /** - * @brief Retrieves the row key that corresponds to the row from which this - * Result was created. - */ - const std::string &Row() const; - - /** - * @brief Returns the size of the underlying Cell vector - */ - int Size() const; - - /** - * @brief Map of families to all versions of its qualifiers and values. - * Returns a three level Map of the form: - * Map<family,Map<qualifier,Map<timestamp,value>>>> - * All other map returning methods make use of this map internally - * The Map is created when the Result instance is created - */ - ResultMap Map() const; - - /** - * @brief Map of qualifiers to values. - * Returns a Map of the form: Map<qualifier,value> - * @param family - column family to get - */ - std::map<std::string, std::string> FamilyMap(const std::string &family) const; - - std::string DebugString() const; - - bool Exists() const { return exists_; } - - bool Stale() const { return stale_; } - - bool Partial() const { return partial_; } - - /** Returns estimated size of the Result object including deep heap space usage - * of its Cells and data. Notice that this is a very rough estimate. */ - size_t EstimatedSize() const; - - private: - bool exists_ = false; - bool stale_ = false; - bool partial_ = false; - std::string row_ = ""; - std::vector<std::shared_ptr<Cell> > cells_; -}; -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/row.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/row.h b/hbase-native-client/core/row.h deleted file mode 100644 index 2c7bdd1..0000000 --- a/hbase-native-client/core/row.h +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <limits> -#include <stdexcept> -#include <string> - -#pragma once - -namespace hbase { - -class Row { - public: - Row() {} - explicit Row(const std::string &row) : row_(row) { CheckRow(row_); } - - /** - * @brief Returns the row for the Row interface. - */ - const std::string &row() const { return row_; } - virtual ~Row() {} - - private: - /** - * @brief Checks if the row for this Get operation is proper or not - * @param row Row to check - * @throws std::runtime_error if row is empty or greater than - * MAX_ROW_LENGTH(i.e. std::numeric_limits<short>::max()) - */ - void CheckRow(const std::string &row) { - const int16_t kMaxRowLength = std::numeric_limits<int16_t>::max(); - size_t row_length = row.size(); - if (0 == row_length) { - throw std::runtime_error("Row length can't be 0"); - } - if (row_length > kMaxRowLength) { - throw std::runtime_error("Length of " + row + " is greater than max row size: " + - std::to_string(kMaxRowLength)); - } - } - - protected: - std::string row_ = ""; -}; - -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/scan-result-cache-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan-result-cache-test.cc b/hbase-native-client/core/scan-result-cache-test.cc deleted file mode 100644 index 0bf83ce..0000000 --- a/hbase-native-client/core/scan-result-cache-test.cc +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <folly/Conv.h> -#include <glog/logging.h> -#include <gtest/gtest.h> - -#include <vector> - -#include "core/cell.h" -#include "core/result.h" -#include "core/scan-result-cache.h" - -using hbase::ScanResultCache; -using hbase::Result; -using hbase::Cell; -using hbase::CellType; - -using ResultVector = std::vector<std::shared_ptr<Result>>; - -std::shared_ptr<Cell> CreateCell(const int32_t &key, const std::string &family, - const std::string &column) { - auto row = folly::to<std::string>(key); - return std::make_shared<Cell>(row, family, column, std::numeric_limits<int64_t>::max(), row, - CellType::PUT); -} - -std::shared_ptr<Result> CreateResult(std::shared_ptr<Cell> cell, bool partial) { - return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{cell}, false, false, partial); -} - -TEST(ScanResultCacheTest, NoPartial) { - ScanResultCache cache; - ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, false)); - ASSERT_EQ(ResultVector{}, cache.AddAndGet(ResultVector{}, true)); - int32_t count = 10; - ResultVector results{}; - for (int32_t i = 0; i < count; i++) { - results.push_back(CreateResult(CreateCell(i, "cf", "cq1"), false)); - } - ASSERT_EQ(results, cache.AddAndGet(results, false)); -} - -TEST(ScanResultCacheTest, Combine1) { - ScanResultCache cache; - auto prev_result = CreateResult(CreateCell(0, "cf", "cq1"), true); - auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); - auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); - auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true); - auto results = cache.AddAndGet(ResultVector{prev_result, result1}, false); - ASSERT_EQ(1L, results.size()); - ASSERT_EQ(prev_result, results[0]); - - ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); - ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size()); - ASSERT_EQ(0, cache.AddAndGet(ResultVector{}, true).size()); - - results = cache.AddAndGet(ResultVector{}, false); - ASSERT_EQ(1, results.size()); - ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); - ASSERT_EQ(3, results[0]->Cells().size()); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3"))); -} - -TEST(ScanResultCacheTest, Combine2) { - ScanResultCache cache; - auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); - auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); - auto result3 = CreateResult(CreateCell(1, "cf", "cq3"), true); - - auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true); - auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq2"), false); - - ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); - ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); - ASSERT_EQ(0, cache.AddAndGet(ResultVector{result3}, false).size()); - - auto results = cache.AddAndGet(ResultVector{next_result1}, false); - ASSERT_EQ(1, results.size()); - ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); - ASSERT_EQ(3, results[0]->Cells().size()); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq3"))); - - results = cache.AddAndGet(ResultVector{next_to_next_result1}, false); - ASSERT_EQ(2, results.size()); - ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row())); - ASSERT_EQ(1, results[0]->Cells().size()); - ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); - ASSERT_EQ(3, folly::to<int32_t>(results[1]->Row())); - ASSERT_EQ(1, results[1]->Cells().size()); - ASSERT_EQ(3, folly::to<int32_t>(*results[1]->Value("cf", "cq2"))); -} - -TEST(ScanResultCacheTest, Combine3) { - ScanResultCache cache; - auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); - auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), true); - auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), false); - auto next_to_next_result1 = CreateResult(CreateCell(3, "cf", "cq1"), true); - - ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); - ASSERT_EQ(0, cache.AddAndGet(ResultVector{result2}, false).size()); - - auto results = cache.AddAndGet(ResultVector{next_result1, next_to_next_result1}, false); - - ASSERT_EQ(2, results.size()); - ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); - ASSERT_EQ(2, results[0]->Cells().size()); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); - ASSERT_EQ(2, folly::to<int32_t>(results[1]->Row())); - ASSERT_EQ(1, results[1]->Cells().size()); - ASSERT_EQ(2, folly::to<int32_t>(*results[1]->Value("cf", "cq1"))); - - results = cache.AddAndGet(ResultVector{}, false); - - ASSERT_EQ(1, results.size()); - ASSERT_EQ(3, folly::to<int32_t>(results[0]->Row())); - ASSERT_EQ(1, results[0]->Cells().size()); - ASSERT_EQ(3, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); -} - -TEST(ScanResultCacheTest, Combine4) { - ScanResultCache cache; - auto result1 = CreateResult(CreateCell(1, "cf", "cq1"), true); - auto result2 = CreateResult(CreateCell(1, "cf", "cq2"), false); - auto next_result1 = CreateResult(CreateCell(2, "cf", "cq1"), true); - auto next_result2 = CreateResult(CreateCell(2, "cf", "cq2"), false); - - ASSERT_EQ(0, cache.AddAndGet(ResultVector{result1}, false).size()); - - auto results = cache.AddAndGet(ResultVector{result2, next_result1}, false); - - ASSERT_EQ(1, results.size()); - ASSERT_EQ(1, folly::to<int32_t>(results[0]->Row())); - ASSERT_EQ(2, results[0]->Cells().size()); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); - ASSERT_EQ(1, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); - - results = cache.AddAndGet(ResultVector{next_result2}, false); - - ASSERT_EQ(1, results.size()); - ASSERT_EQ(2, folly::to<int32_t>(results[0]->Row())); - ASSERT_EQ(2, results[0]->Cells().size()); - ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq1"))); - ASSERT_EQ(2, folly::to<int32_t>(*results[0]->Value("cf", "cq2"))); -} - -TEST(ScanResultCacheTest, SizeOf) { - std::string e{""}; - std::string f{"f"}; - std::string foo{"foo"}; - - LOG(INFO) << sizeof(e) << " " << e.capacity(); - LOG(INFO) << sizeof(f) << " " << f.capacity(); - LOG(INFO) << sizeof(foo) << " " << foo.capacity(); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/scan-result-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan-result-cache.cc b/hbase-native-client/core/scan-result-cache.cc deleted file mode 100644 index 62a51e0..0000000 --- a/hbase-native-client/core/scan-result-cache.cc +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/scan-result-cache.h" -#include <algorithm> -#include <iterator> -#include <limits> -#include <stdexcept> - -namespace hbase { -/** - * Add the given results to cache and get valid results back. - * @param results the results of a scan next. Must not be null. - * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response. - * @return valid results, never null. - */ -std::vector<std::shared_ptr<Result>> ScanResultCache::AddAndGet( - const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat) { - // If no results were returned it indicates that either we have the all the partial results - // necessary to construct the complete result or the server had to send a heartbeat message - // to the client to keep the client-server connection alive - if (results.empty()) { - // If this response was an empty heartbeat message, then we have not exhausted the region - // and thus there may be more partials server side that still need to be added to the partial - // list before we form the complete Result - if (!partial_results_.empty() && !is_hearthbeat) { - return UpdateNumberOfCompleteResultsAndReturn( - std::vector<std::shared_ptr<Result>>{Combine()}); - } - return std::vector<std::shared_ptr<Result>>{}; - } - // In every RPC response there should be at most a single partial result. Furthermore, if - // there is a partial result, it is guaranteed to be in the last position of the array. - auto last = results[results.size() - 1]; - if (last->Partial()) { - if (partial_results_.empty()) { - partial_results_.push_back(last); - std::vector<std::shared_ptr<Result>> new_results; - std::copy_n(results.begin(), results.size() - 1, std::back_inserter(new_results)); - return UpdateNumberOfCompleteResultsAndReturn(new_results); - } - // We have only one result and it is partial - if (results.size() == 1) { - // check if there is a row change - if (partial_results_.at(0)->Row() == last->Row()) { - partial_results_.push_back(last); - return std::vector<std::shared_ptr<Result>>{}; - } - auto complete_result = Combine(); - partial_results_.push_back(last); - return UpdateNumberOfCompleteResultsAndReturn(complete_result); - } - // We have some complete results - auto results_to_return = PrependCombined(results, results.size() - 1); - partial_results_.push_back(last); - return UpdateNumberOfCompleteResultsAndReturn(results_to_return); - } - if (!partial_results_.empty()) { - return UpdateNumberOfCompleteResultsAndReturn(PrependCombined(results, results.size())); - } - return UpdateNumberOfCompleteResultsAndReturn(results); -} - -void ScanResultCache::Clear() { partial_results_.clear(); } - -std::shared_ptr<Result> ScanResultCache::CreateCompleteResult( - const std::vector<std::shared_ptr<Result>> &partial_results) { - if (partial_results.empty()) { - return std::make_shared<Result>(std::vector<std::shared_ptr<Cell>>{}, false, false, false); - } - std::vector<std::shared_ptr<Cell>> cells{}; - bool stale = false; - std::string prev_row = ""; - std::string current_row = ""; - size_t i = 0; - for (const auto &r : partial_results) { - current_row = r->Row(); - if (i != 0 && prev_row != current_row) { - throw new std::runtime_error( - "Cannot form complete result. Rows of partial results do not match."); - } - // Ensure that all Results except the last one are marked as partials. The last result - // may not be marked as a partial because Results are only marked as partials when - // the scan on the server side must be stopped due to reaching the maxResultSize. - // Visualizing it makes it easier to understand: - // maxResultSize: 2 cells - // (-x-) represents cell number x in a row - // Example: row1: -1- -2- -3- -4- -5- (5 cells total) - // How row1 will be returned by the server as partial Results: - // Result1: -1- -2- (2 cells, size limit reached, mark as partial) - // Result2: -3- -4- (2 cells, size limit reached, mark as partial) - // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != partial_results.size() - 1 && !r->Partial()) { - throw new std::runtime_error("Cannot form complete result. Result is missing partial flag."); - } - prev_row = current_row; - stale = stale || r->Stale(); - for (const auto &c : r->Cells()) { - cells.push_back(c); - } - i++; - } - - return std::make_shared<Result>(cells, false, stale, false); -} - -std::shared_ptr<Result> ScanResultCache::Combine() { - auto result = CreateCompleteResult(partial_results_); - partial_results_.clear(); - return result; -} - -std::vector<std::shared_ptr<Result>> ScanResultCache::PrependCombined( - const std::vector<std::shared_ptr<Result>> &results, int length) { - if (length == 0) { - return std::vector<std::shared_ptr<Result>>{Combine()}; - } - // the last part of a partial result may not be marked as partial so here we need to check if - // there is a row change. - size_t start; - if (partial_results_[0]->Row() == results[0]->Row()) { - partial_results_.push_back(results[0]); - start = 1; - length--; - } else { - start = 0; - } - std::vector<std::shared_ptr<Result>> prepend_results{}; - prepend_results.push_back(Combine()); - std::copy_n(results.begin() + start, length, std::back_inserter(prepend_results)); - return prepend_results; -} - -std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn( - const std::shared_ptr<Result> &result) { - return UpdateNumberOfCompleteResultsAndReturn(std::vector<std::shared_ptr<Result>>{result}); -} - -std::vector<std::shared_ptr<Result>> ScanResultCache::UpdateNumberOfCompleteResultsAndReturn( - const std::vector<std::shared_ptr<Result>> &results) { - num_complete_rows_ += results.size(); - return results; -} -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/scan-result-cache.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan-result-cache.h b/hbase-native-client/core/scan-result-cache.h deleted file mode 100644 index 5d3d0ab..0000000 --- a/hbase-native-client/core/scan-result-cache.h +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <folly/Logging.h> -#include <algorithm> -#include <chrono> -#include <iterator> -#include <memory> -#include <string> -#include <vector> - -#include "core/result.h" -#include "if/Client.pb.h" -#include "if/HBase.pb.h" - -namespace hbase { - -class ScanResultCache { - // In Java, there are 3 different implementations for this. We are not doing partial results, - // or scan batching in native code for now, so this version is simpler and - // only deals with giving back complete rows as Result. It is more or less implementation - // of CompleteScanResultCache.java - - public: - /** - * Add the given results to cache and get valid results back. - * @param results the results of a scan next. Must not be null. - * @param is_hearthbeat indicate whether the results is gotten from a heartbeat response. - * @return valid results, never null. - */ - std::vector<std::shared_ptr<Result>> AddAndGet( - const std::vector<std::shared_ptr<Result>> &results, bool is_hearthbeat); - - void Clear(); - - int64_t num_complete_rows() const { return num_complete_rows_; } - - private: - /** - * Forms a single result from the partial results in the partialResults list. This method is - * useful for reconstructing partial results on the client side. - * @param partial_results list of partial results - * @return The complete result that is formed by combining all of the partial results together - */ - static std::shared_ptr<Result> CreateCompleteResult( - const std::vector<std::shared_ptr<Result>> &partial_results); - - std::shared_ptr<Result> Combine(); - - std::vector<std::shared_ptr<Result>> PrependCombined( - const std::vector<std::shared_ptr<Result>> &results, int length); - - std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn( - const std::shared_ptr<Result> &result); - - std::vector<std::shared_ptr<Result>> UpdateNumberOfCompleteResultsAndReturn( - const std::vector<std::shared_ptr<Result>> &results); - - private: - std::vector<std::shared_ptr<Result>> partial_results_; - int64_t num_complete_rows_ = 0; -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/scan-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan-test.cc b/hbase-native-client/core/scan-test.cc deleted file mode 100644 index 0ee054c..0000000 --- a/hbase-native-client/core/scan-test.cc +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <gtest/gtest.h> -#include <limits> - -#include "core/scan.h" - -using hbase::Get; -using hbase::Scan; - -void CheckFamilies(Scan &scan) { - EXPECT_EQ(false, scan.HasFamilies()); - scan.AddFamily("family-1"); - EXPECT_EQ(true, scan.HasFamilies()); - EXPECT_EQ(1, scan.FamilyMap().size()); - for (const auto &family : scan.FamilyMap()) { - EXPECT_STREQ("family-1", family.first.c_str()); - EXPECT_EQ(0, family.second.size()); - } - // Not allowed to add the same CF. - scan.AddFamily("family-1"); - EXPECT_EQ(1, scan.FamilyMap().size()); - scan.AddFamily("family-2"); - EXPECT_EQ(2, scan.FamilyMap().size()); - scan.AddFamily("family-3"); - EXPECT_EQ(3, scan.FamilyMap().size()); - int i = 1; - for (const auto &family : scan.FamilyMap()) { - std::string family_name = "family-" + std::to_string(i); - EXPECT_STREQ(family_name.c_str(), family.first.c_str()); - EXPECT_EQ(0, family.second.size()); - i += 1; - } - - scan.AddColumn("family-1", "column-1"); - scan.AddColumn("family-1", "column-2"); - scan.AddColumn("family-1", ""); - scan.AddColumn("family-1", "column-3"); - scan.AddColumn("family-2", "column-X"); - - EXPECT_EQ(3, scan.FamilyMap().size()); - auto it = scan.FamilyMap().begin(); - EXPECT_STREQ("family-1", it->first.c_str()); - EXPECT_EQ(4, it->second.size()); - EXPECT_STREQ("column-1", it->second[0].c_str()); - EXPECT_STREQ("column-2", it->second[1].c_str()); - EXPECT_STREQ("", it->second[2].c_str()); - EXPECT_STREQ("column-3", it->second[3].c_str()); - ++it; - EXPECT_STREQ("family-2", it->first.c_str()); - EXPECT_EQ(1, it->second.size()); - EXPECT_STREQ("column-X", it->second[0].c_str()); - ++it; - EXPECT_STREQ("family-3", it->first.c_str()); - EXPECT_EQ(0, it->second.size()); - ++it; - EXPECT_EQ(it, scan.FamilyMap().end()); -} - -void CheckFamiliesAfterCopy(const Scan &scan) { - EXPECT_EQ(true, scan.HasFamilies()); - EXPECT_EQ(3, scan.FamilyMap().size()); - int i = 1; - for (const auto &family : scan.FamilyMap()) { - std::string family_name = "family-" + std::to_string(i); - EXPECT_STREQ(family_name.c_str(), family.first.c_str()); - i += 1; - } - // Check if the alreaday added CF's and CQ's are as expected - auto it = scan.FamilyMap().begin(); - EXPECT_STREQ("family-1", it->first.c_str()); - EXPECT_EQ(4, it->second.size()); - EXPECT_STREQ("column-1", it->second[0].c_str()); - EXPECT_STREQ("column-2", it->second[1].c_str()); - EXPECT_STREQ("", it->second[2].c_str()); - EXPECT_STREQ("column-3", it->second[3].c_str()); - ++it; - EXPECT_STREQ("family-2", it->first.c_str()); - EXPECT_EQ(1, it->second.size()); - EXPECT_STREQ("column-X", it->second[0].c_str()); - ++it; - EXPECT_STREQ("family-3", it->first.c_str()); - EXPECT_EQ(0, it->second.size()); - ++it; - EXPECT_EQ(it, scan.FamilyMap().end()); -} - -void ScanMethods(Scan &scan) { - scan.SetReversed(true); - EXPECT_EQ(true, scan.IsReversed()); - scan.SetReversed(false); - EXPECT_EQ(false, scan.IsReversed()); - - std::string start_row("start-row"); - std::string stop_row("stop-row"); - scan.SetStartRow(start_row); - EXPECT_EQ(start_row, scan.StartRow()); - - scan.SetStopRow(stop_row); - EXPECT_EQ(stop_row, scan.StopRow()); - - scan.SetCaching(3); - EXPECT_EQ(3, scan.Caching()); - - scan.SetConsistency(hbase::pb::Consistency::STRONG); - EXPECT_EQ(hbase::pb::Consistency::STRONG, scan.Consistency()); - scan.SetConsistency(hbase::pb::Consistency::TIMELINE); - EXPECT_EQ(hbase::pb::Consistency::TIMELINE, scan.Consistency()); - - scan.SetCacheBlocks(true); - EXPECT_EQ(true, scan.CacheBlocks()); - scan.SetCacheBlocks(false); - EXPECT_EQ(false, scan.CacheBlocks()); - - scan.SetAllowPartialResults(true); - EXPECT_EQ(true, scan.AllowPartialResults()); - scan.SetAllowPartialResults(false); - EXPECT_EQ(false, scan.AllowPartialResults()); - - scan.SetLoadColumnFamiliesOnDemand(true); - EXPECT_EQ(true, scan.LoadColumnFamiliesOnDemand()); - scan.SetLoadColumnFamiliesOnDemand(false); - EXPECT_EQ(false, scan.LoadColumnFamiliesOnDemand()); - - scan.SetMaxVersions(); - EXPECT_EQ(1, scan.MaxVersions()); - scan.SetMaxVersions(20); - EXPECT_EQ(20, scan.MaxVersions()); - - scan.SetMaxResultSize(1024); - EXPECT_EQ(1024, scan.MaxResultSize()); - - // Test initial values - EXPECT_EQ(0, scan.Timerange().MinTimeStamp()); - EXPECT_EQ(std::numeric_limits<int64_t>::max(), scan.Timerange().MaxTimeStamp()); - - // Set & Test new values using TimeRange and TimeStamp - scan.SetTimeRange(1000, 2000); - EXPECT_EQ(1000, scan.Timerange().MinTimeStamp()); - EXPECT_EQ(2000, scan.Timerange().MaxTimeStamp()); - scan.SetTimeStamp(0); - EXPECT_EQ(0, scan.Timerange().MinTimeStamp()); - EXPECT_EQ(1, scan.Timerange().MaxTimeStamp()); - - // Test some exceptions - ASSERT_THROW(scan.SetTimeRange(-1000, 2000), std::runtime_error); - ASSERT_THROW(scan.SetTimeRange(1000, -2000), std::runtime_error); - ASSERT_THROW(scan.SetTimeRange(1000, 200), std::runtime_error); - ASSERT_THROW(scan.SetTimeStamp(std::numeric_limits<int64_t>::max()), std::runtime_error); -} - -TEST(Scan, Object) { - Scan scan; - ScanMethods(scan); - CheckFamilies(scan); - - // Resetting TimeRange values so that the copy construction and assignment - // operator tests pass. - scan.SetTimeRange(0, std::numeric_limits<int64_t>::max()); - Scan scancp(scan); - ScanMethods(scancp); - CheckFamiliesAfterCopy(scancp); - - Scan scaneq; - scaneq = scan; - ScanMethods(scaneq); - CheckFamiliesAfterCopy(scaneq); -} - -TEST(Scan, WithStartRow) { - Scan("row-test"); - Scan scan("row-test"); - ScanMethods(scan); - CheckFamilies(scan); -} - -TEST(Scan, WithStartAndStopRow) { - Scan("start-row", "stop-row"); - Scan scan("start-row", "stop-row"); - ScanMethods(scan); - CheckFamilies(scan); -} - -TEST(Scan, FromGet) { - std::string row_str = "row-test"; - Get get = Get(row_str); - - get.SetCacheBlocks(true); - get.SetMaxVersions(5); - get.AddFamily("family-1"); - get.AddFamily("family-1"); - get.AddFamily("family-2"); - get.AddFamily("family-3"); - get.AddColumn("family-1", "column-1"); - get.AddColumn("family-1", "column-2"); - get.AddColumn("family-1", ""); - get.AddColumn("family-1", "column-3"); - get.AddColumn("family-2", "column-X"); - - EXPECT_EQ(3, get.FamilyMap().size()); - - Scan scan(get); - ScanMethods(scan); - CheckFamiliesAfterCopy(scan); -} - -TEST(Scan, Exception) { - std::string row(std::numeric_limits<int16_t>::max() + 1, 'X'); - ASSERT_THROW(Scan tmp(row), std::runtime_error); - ASSERT_THROW(Scan tmp(""), std::runtime_error); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/scan.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan.cc b/hbase-native-client/core/scan.cc deleted file mode 100644 index 6dcc51b..0000000 --- a/hbase-native-client/core/scan.cc +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/scan.h" - -#include <algorithm> -#include <iterator> -#include <limits> -#include <stdexcept> - -namespace hbase { - -Scan::Scan() {} - -Scan::~Scan() {} - -Scan::Scan(const std::string &start_row) : start_row_(start_row) { CheckRow(start_row_); } - -Scan::Scan(const std::string &start_row, const std::string &stop_row) - : start_row_(start_row), stop_row_(stop_row) { - CheckRow(start_row_); - CheckRow(stop_row_); -} - -Scan::Scan(const Scan &scan) : Query(scan) { - start_row_ = scan.start_row_; - stop_row_ = scan.stop_row_; - max_versions_ = scan.max_versions_; - caching_ = scan.caching_; - max_result_size_ = scan.max_result_size_; - cache_blocks_ = scan.cache_blocks_; - load_column_families_on_demand_ = scan.load_column_families_on_demand_; - reversed_ = scan.reversed_; - allow_partial_results_ = scan.allow_partial_results_; - consistency_ = scan.consistency_; - tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp())); - family_map_.insert(scan.family_map_.begin(), scan.family_map_.end()); -} - -Scan &Scan::operator=(const Scan &scan) { - Query::operator=(scan); - start_row_ = scan.start_row_; - stop_row_ = scan.stop_row_; - max_versions_ = scan.max_versions_; - caching_ = scan.caching_; - max_result_size_ = scan.max_result_size_; - cache_blocks_ = scan.cache_blocks_; - load_column_families_on_demand_ = scan.load_column_families_on_demand_; - reversed_ = scan.reversed_; - allow_partial_results_ = scan.allow_partial_results_; - consistency_ = scan.consistency_; - tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp())); - family_map_.insert(scan.family_map_.begin(), scan.family_map_.end()); - return *this; -} - -Scan::Scan(const Get &get) { - cache_blocks_ = get.CacheBlocks(); - max_versions_ = get.MaxVersions(); - tr_.reset(new TimeRange(get.Timerange().MinTimeStamp(), get.Timerange().MaxTimeStamp())); - family_map_.insert(get.FamilyMap().begin(), get.FamilyMap().end()); -} - -Scan &Scan::AddFamily(const std::string &family) { - const auto &it = family_map_.find(family); - /** - * Check if any qualifiers are already present or not. - * Remove all existing qualifiers if the given family is already present in - * the map - */ - if (family_map_.end() != it) { - it->second.clear(); - } else { - family_map_[family]; - } - return *this; -} - -Scan &Scan::AddColumn(const std::string &family, const std::string &qualifier) { - const auto &it = std::find(family_map_[family].begin(), family_map_[family].end(), qualifier); - /** - * Check if any qualifiers are already present or not. - * Add only if qualifiers for a given family are not present - */ - if (it == family_map_[family].end()) { - family_map_[family].push_back(qualifier); - } - return *this; -} - -void Scan::SetReversed(bool reversed) { reversed_ = reversed; } - -bool Scan::IsReversed() const { return reversed_; } - -void Scan::SetStartRow(const std::string &start_row) { start_row_ = start_row; } - -const std::string &Scan::StartRow() const { return start_row_; } - -void Scan::SetStopRow(const std::string &stop_row) { stop_row_ = stop_row; } - -const std::string &Scan::StopRow() const { return stop_row_; } - -void Scan::SetCaching(int caching) { caching_ = caching; } - -int Scan::Caching() const { return caching_; } - -Scan &Scan::SetConsistency(const hbase::pb::Consistency consistency) { - consistency_ = consistency; - return *this; -} - -hbase::pb::Consistency Scan::Consistency() const { return consistency_; } - -void Scan::SetCacheBlocks(bool cache_blocks) { cache_blocks_ = cache_blocks; } - -bool Scan::CacheBlocks() const { return cache_blocks_; } - -void Scan::SetAllowPartialResults(bool allow_partial_results) { - allow_partial_results_ = allow_partial_results; -} - -bool Scan::AllowPartialResults() const { return allow_partial_results_; } - -void Scan::SetLoadColumnFamiliesOnDemand(bool load_column_families_on_demand) { - load_column_families_on_demand_ = load_column_families_on_demand; -} - -bool Scan::LoadColumnFamiliesOnDemand() const { return load_column_families_on_demand_; } - -Scan &Scan::SetMaxVersions(uint32_t max_versions) { - max_versions_ = max_versions; - return *this; -} - -int Scan::MaxVersions() const { return max_versions_; } - -void Scan::SetMaxResultSize(int64_t max_result_size) { max_result_size_ = max_result_size; } - -int64_t Scan::MaxResultSize() const { return max_result_size_; } - -Scan &Scan::SetTimeRange(int64_t min_stamp, int64_t max_stamp) { - tr_.reset(new TimeRange(min_stamp, max_stamp)); - return *this; -} - -Scan &Scan::SetTimeStamp(int64_t timestamp) { - tr_.reset(new TimeRange(timestamp, timestamp + 1)); - return *this; -} - -const TimeRange &Scan::Timerange() const { return *tr_; } - -void Scan::CheckRow(const std::string &row) { - const int32_t kMaxRowLength = std::numeric_limits<int16_t>::max(); - int row_length = row.size(); - if (0 == row_length) { - throw std::runtime_error("Row length can't be 0"); - } - if (row_length > kMaxRowLength) { - throw std::runtime_error("Length of " + row + " is greater than max row size: " + - std::to_string(kMaxRowLength)); - } -} - -bool Scan::HasFamilies() const { return !family_map_.empty(); } - -const std::map<std::string, std::vector<std::string>> &Scan::FamilyMap() const { - return family_map_; -} -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/scan.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scan.h b/hbase-native-client/core/scan.h deleted file mode 100644 index 1085c4b..0000000 --- a/hbase-native-client/core/scan.h +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <cstdint> -#include <map> -#include <memory> -#include <string> -#include <vector> -#include "core/get.h" -#include "core/time-range.h" -#include "if/Client.pb.h" - -namespace hbase { - -class Scan : public Query { - public: - /** - * @brief Constructors. Create a Scan operation across all rows. - */ - Scan(); - Scan(const Scan &scan); - Scan &operator=(const Scan &scan); - - ~Scan(); - - /** - * @brief Create a Scan operation starting at the specified row. If the - * specified row does not exist, - * the Scanner will start from the next closest row after the specified row. - * @param start_row - row to start scanner at or after - */ - explicit Scan(const std::string &start_row); - - /** - * @brief Create a Scan operation for the range of rows specified. - * @param start_row - row to start scanner at or after (inclusive). - * @param stop_row - row to stop scanner before (exclusive). - */ - Scan(const std::string &start_row, const std::string &stop_row); - - /** - * @brief Builds a scan object with the same specs as get. - * @param get - get to model scan after - */ - explicit Scan(const Get &get); - - /** - * @brief Get all columns from the specified family.Overrides previous calls - * to AddColumn for this family. - * @param family - family name - */ - Scan &AddFamily(const std::string &family); - - /** - * @brief Get the column from the specified family with the specified - * qualifier.Overrides previous calls to AddFamily for this family. - * @param family - family name. - * @param qualifier - column qualifier. - */ - Scan &AddColumn(const std::string &family, const std::string &qualifier); - - /** - * @brief Set whether this scan is a reversed one. This is false by default - * which means forward(normal) scan. - * @param reversed - if true, scan will be backward order - */ - void SetReversed(bool reversed); - - /** - * @brief Get whether this scan is a reversed one. Returns true if backward - * scan, false if forward(default) scan - */ - bool IsReversed() const; - - /** - * @brief Set the start row of the scan.If the specified row does not exist, - * the Scanner will start from the next closest row after the specified row. - * @param start_row - row to start scanner at or after - * @throws std::runtime_error if start_row length is 0 or greater than - * MAX_ROW_LENGTH - */ - void SetStartRow(const std::string &start_row); - - /** - * @brief returns start_row of the Scan. - */ - const std::string &StartRow() const; - - /** - * @brief Set the stop row of the scan. The scan will include rows that are - * lexicographically less than the provided stop_row. - * @param stop_row - row to end at (exclusive) - * @throws std::runtime_error if stop_row length is 0 or greater than - * MAX_ROW_LENGTH - */ - void SetStopRow(const std::string &stop_row); - - /** - * @brief returns stop_row of the Scan. - */ - const std::string &StopRow() const; - - /** - * @brief Set the number of rows for caching that will be passed to scanners. - * Higher caching values will enable faster scanners but will use more memory. - * @param caching - the number of rows for caching. - */ - void SetCaching(int caching); - - /** - * @brief caching the number of rows fetched when calling next on a scanner. - */ - int Caching() const; - - /** - * @brief Sets the consistency level for this operation. - * @param consistency - the consistency level - */ - Scan &SetConsistency(const hbase::pb::Consistency consistency); - - /** - * @brief Returns the consistency level for this operation. - */ - hbase::pb::Consistency Consistency() const; - - /** - * @brief Set whether blocks should be cached for this Scan.This is true by - * default. When true, default settings of the table and family are used (this - * will never override caching blocks if the block cache is disabled for that - * family or entirely). - * @param cache_blocks - if false, default settings are overridden and blocks - * will not be cached - */ - void SetCacheBlocks(bool cache_blocks); - - /** - * @brief Get whether blocks should be cached for this Scan. - */ - bool CacheBlocks() const; - - /** - * @brief Setting whether the caller wants to see the partial results that may - * be returned from the server. By default this value is false and the - * complete results will be assembled client side before being delivered to - * the caller. - * @param allow_partial_results - if true partial results will be returned. - */ - void SetAllowPartialResults(bool allow_partial_results); - - /** - * @brief true when the constructor of this scan understands that the results - * they will see may only represent a partial portion of a row. The entire row - * would be retrieved by subsequent calls to ResultScanner.next() - */ - bool AllowPartialResults() const; - - /** - * @brief Set the value indicating whether loading CFs on demand should be - * allowed (cluster default is false). On-demand CF loading doesn't load - * column families until necessary. - * @param load_column_families_on_demand - */ - void SetLoadColumnFamiliesOnDemand(bool load_column_families_on_demand); - - /** - * @brief Get the raw loadColumnFamiliesOnDemand setting. - */ - bool LoadColumnFamiliesOnDemand() const; - - /** - * @brief Get up to the specified number of versions of each column if - * specified else get default i.e. one. - * @param max_versions - maximum versions for each column. - */ - Scan &SetMaxVersions(uint32_t max_versions = 1); - - /** - * @brief the max number of versions to fetch - */ - int MaxVersions() const; - - /** - * @brief Set the maximum result size. The default is -1; this means that no - * specific maximum result size will be set for this scan, and the global - * configured value will be used instead. (Defaults to unlimited). - * @param The maximum result size in bytes. - */ - void SetMaxResultSize(int64_t max_result_size); - - /** - * @brief the maximum result size in bytes. - */ - int64_t MaxResultSize() const; - - /** - * @brief Get versions of columns only within the specified timestamp range, - * [min_stamp, max_stamp). Note, default maximum versions to return is 1. If - * your time range spans more than one version and you want all versions - * returned, up the number of versions beyond the default. - * @param min_stamp - minimum timestamp value, inclusive. - * @param max_stamp - maximum timestamp value, exclusive. - */ - Scan &SetTimeRange(int64_t min_stamp, int64_t max_stamp); - - /** - * @brief Get versions of columns with the specified timestamp. Note, default - * maximum versions to return is 1. If your time range spans more than one - * version and you want all versions returned, up the number of versions - * beyond the defaut. - * @param timestamp - version timestamp - */ - Scan &SetTimeStamp(int64_t timestamp); - - /** - * @brief Return Timerange - */ - const TimeRange &Timerange() const; - - /** - * @brief Returns true if family map is non empty false otherwise - */ - bool HasFamilies() const; - - /** - * @brief Returns the Scan family map for this Scan operation. - */ - const std::map<std::string, std::vector<std::string>> &FamilyMap() const; - - private: - std::string start_row_ = ""; - std::string stop_row_ = ""; - uint32_t max_versions_ = 1; - int32_t caching_ = -1; - int64_t max_result_size_ = -1; - bool cache_blocks_ = true; - bool load_column_families_on_demand_ = false; - bool reversed_ = false; - bool allow_partial_results_ = false; - hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG; - std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>(); - std::map<std::string, std::vector<std::string>> family_map_; - - /** - * @brief Checks for row length validity, throws if length check fails, - * returns null otherwise. - * @param row - row whose validity needs to be checked - * @throws std::runtime_error if row length equals 0 or greater than - * std::numeric_limits<short>::max(); - */ - void CheckRow(const std::string &row); -}; -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/scanner-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/scanner-test.cc b/hbase-native-client/core/scanner-test.cc deleted file mode 100644 index 1ecbd02..0000000 --- a/hbase-native-client/core/scanner-test.cc +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <folly/Conv.h> -#include <gtest/gtest.h> - -#include <chrono> -#include <thread> -#include <vector> - -#include "core/async-client-scanner.h" -#include "core/async-table-result-scanner.h" -#include "core/cell.h" -#include "core/client.h" -#include "core/configuration.h" -#include "core/filter.h" -#include "core/get.h" -#include "core/hbase-configuration-loader.h" -#include "core/put.h" -#include "core/result.h" -#include "core/row.h" -#include "core/table.h" -#include "if/Comparator.pb.h" -#include "if/Filter.pb.h" -#include "serde/table-name.h" -#include "test-util/test-util.h" -#include "utils/time-util.h" - -using hbase::Cell; -using hbase::ComparatorFactory; -using hbase::Comparator; -using hbase::Configuration; -using hbase::Get; -using hbase::Put; -using hbase::Result; -using hbase::Scan; -using hbase::Table; -using hbase::TestUtil; -using hbase::TimeUtil; -using hbase::AsyncClientScanner; -using hbase::AsyncTableResultScanner; -using hbase::FilterFactory; -using hbase::pb::CompareType; - -class ScannerTest : public ::testing::Test { - public: - static std::unique_ptr<hbase::TestUtil> test_util; - static const uint32_t num_rows; - - static void SetUpTestCase() { - google::InstallFailureSignalHandler(); - test_util = std::make_unique<hbase::TestUtil>(); - test_util->StartMiniCluster(2); - } -}; -std::unique_ptr<hbase::TestUtil> ScannerTest::test_util = nullptr; -const uint32_t ScannerTest::num_rows = 1000; - -std::string Family(uint32_t i) { return "f" + folly::to<std::string>(i); } - -std::string Row(uint32_t i, int width) { - std::ostringstream s; - s.fill('0'); - s.width(width); - s << i; - return "row" + s.str(); -} - -std::string Row(uint32_t i) { return Row(i, 3); } - -std::unique_ptr<Put> MakePut(const std::string &row, uint32_t num_families) { - auto put = std::make_unique<Put>(row); - - for (uint32_t i = 0; i < num_families; i++) { - put->AddColumn(Family(i), "q1", row); - put->AddColumn(Family(i), "q2", row + "-" + row); - } - - return std::move(put); -} - -void CheckResult(const Result &r, std::string expected_row, uint32_t num_families) { - VLOG(1) << r.DebugString(); - auto row = r.Row(); - ASSERT_EQ(row, expected_row); - ASSERT_EQ(r.Cells().size(), num_families * 2); - for (uint32_t i = 0; i < num_families; i++) { - ASSERT_EQ(*r.Value(Family(i), "q1"), row); - ASSERT_EQ(*r.Value(Family(i), "q2"), row + "-" + row); - } -} - -void CreateTable(std::string table_name, uint32_t num_families, uint32_t num_rows, - int32_t num_regions) { - LOG(INFO) << "Creating the table " << table_name - << " with num_regions:" << folly::to<std::string>(num_regions); - std::vector<std::string> families; - for (uint32_t i = 0; i < num_families; i++) { - families.push_back(Family(i)); - } - if (num_regions <= 1) { - ScannerTest::test_util->CreateTable(table_name, families); - } else { - std::vector<std::string> keys; - for (int32_t i = 0; i < num_regions - 1; i++) { - keys.push_back(Row(i * (num_rows / (num_regions - 1)))); - LOG(INFO) << "Split key:" << keys[keys.size() - 1]; - } - ScannerTest::test_util->CreateTable(table_name, families, keys); - } -} - -std::unique_ptr<hbase::Client> CreateTableAndWriteData(std::string table_name, - uint32_t num_families, uint32_t num_rows, - int32_t num_regions) { - CreateTable(table_name, num_families, num_rows, num_regions); - auto tn = folly::to<hbase::pb::TableName>(table_name); - auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf()); - auto table = client->Table(tn); - - LOG(INFO) << "Writing data to the table, num_rows:" << num_rows; - // Perform Puts - for (uint32_t i = 0; i < num_rows; i++) { - table->Put(*MakePut(Row(i), num_families)); - } - return std::move(client); -} - -void TestScan(const Scan &scan, uint32_t num_families, int32_t start, int32_t num_rows, - Table *table) { - LOG(INFO) << "Starting scan for the test with start:" << scan.StartRow() - << ", stop:" << scan.StopRow() << " expected_num_rows:" << num_rows; - auto scanner = table->Scan(scan); - - uint32_t i = start; - auto r = scanner->Next(); - while (r != nullptr) { - CheckResult(*r, Row(i++), num_families); - r = scanner->Next(); - } - ASSERT_EQ(i - start, num_rows); -} - -void TestScan(const Scan &scan, int32_t start, int32_t num_rows, Table *table) { - TestScan(scan, 1, start, num_rows, table); -} - -void TestScan(uint32_t num_families, int32_t start, int32_t stop, int32_t num_rows, Table *table) { - Scan scan{}; - if (start >= 0) { - scan.SetStartRow(Row(start)); - } else { - start = 0; // neded for below logic - } - if (stop >= 0) { - scan.SetStopRow(Row(stop)); - } - - TestScan(scan, num_families, start, num_rows, table); -} - -void TestScan(int32_t start, int32_t stop, int32_t num_rows, Table *table) { - TestScan(1, start, stop, num_rows, table); -} - -void TestScan(uint32_t num_families, std::string start, std::string stop, int32_t num_rows, - Table *table) { - Scan scan{}; - - scan.SetStartRow(start); - scan.SetStopRow(stop); - - LOG(INFO) << "Starting scan for the test with start:" << start << ", stop:" << stop - << " expected_num_rows:" << num_rows; - auto scanner = table->Scan(scan); - - uint32_t i = 0; - auto r = scanner->Next(); - while (r != nullptr) { - VLOG(1) << r->DebugString(); - i++; - ASSERT_EQ(r->Map().size(), num_families); - r = scanner->Next(); - } - ASSERT_EQ(i, num_rows); -} - -void TestScan(std::string start, std::string stop, int32_t num_rows, Table *table) { - TestScan(1, start, stop, num_rows, table); -} - -void TestScanCombinations(Table *table, uint32_t num_families) { - // full table - TestScan(num_families, -1, -1, 1000, table); - TestScan(num_families, -1, 999, 999, table); - TestScan(num_families, 0, -1, 1000, table); - TestScan(num_families, 0, 999, 999, table); - TestScan(num_families, 10, 990, 980, table); - TestScan(num_families, 1, 998, 997, table); - - TestScan(num_families, 123, 345, 222, table); - TestScan(num_families, 234, 456, 222, table); - TestScan(num_families, 345, 567, 222, table); - TestScan(num_families, 456, 678, 222, table); - - // single results - TestScan(num_families, 111, 111, 1, table); // split keys are like 111, 222, 333, etc - TestScan(num_families, 111, 112, 1, table); - TestScan(num_families, 332, 332, 1, table); - TestScan(num_families, 332, 333, 1, table); - TestScan(num_families, 333, 333, 1, table); - TestScan(num_families, 333, 334, 1, table); - TestScan(num_families, 42, 42, 1, table); - TestScan(num_families, 921, 921, 1, table); - TestScan(num_families, 0, 0, 1, table); - TestScan(num_families, 0, 1, 1, table); - TestScan(num_families, 999, 999, 1, table); - - // few results - TestScan(num_families, 0, 0, 1, table); - TestScan(num_families, 0, 2, 2, table); - TestScan(num_families, 0, 5, 5, table); - TestScan(num_families, 10, 15, 5, table); - TestScan(num_families, 105, 115, 10, table); - TestScan(num_families, 111, 221, 110, table); - TestScan(num_families, 111, 222, 111, table); // crossing region boundary 111-222 - TestScan(num_families, 111, 223, 112, table); - TestScan(num_families, 111, 224, 113, table); - TestScan(num_families, 990, 999, 9, table); - TestScan(num_families, 900, 998, 98, table); - - // empty results - TestScan(num_families, "a", "a", 0, table); - TestScan(num_families, "a", "r", 0, table); - TestScan(num_families, "", "r", 0, table); - TestScan(num_families, "s", "", 0, table); - TestScan(num_families, "s", "z", 0, table); - TestScan(num_families, Row(110) + "a", Row(111), 0, table); - TestScan(num_families, Row(111) + "a", Row(112), 0, table); - TestScan(num_families, Row(123) + "a", Row(124), 0, table); - - // custom - TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table); - TestScan(num_families, Row(0, 3), Row(0, 4), 1, table); - TestScan(num_families, Row(999, 3), Row(9999, 4), 1, table); - TestScan(num_families, Row(111, 3), Row(1111, 4), 1, table); - TestScan(num_families, Row(0, 3), Row(9999, 4), 1000, table); - TestScan(num_families, "a", "z", 1000, table); -} - -// some of these tests are from TestAsyncTableScan* and some from TestFromClientSide* and -// TestScannersFromClientSide* - -TEST_F(ScannerTest, SingleRegionScan) { - auto client = CreateTableAndWriteData("t_single_region_scan", 1, num_rows, 1); - auto table = client->Table(folly::to<hbase::pb::TableName>("t_single_region_scan")); - - TestScanCombinations(table.get(), 1); -} - -TEST_F(ScannerTest, MultiRegionScan) { - auto client = CreateTableAndWriteData("t_multi_region_scan", 1, num_rows, 10); - auto table = client->Table(folly::to<hbase::pb::TableName>("t_multi_region_scan")); - - TestScanCombinations(table.get(), 1); -} - -TEST_F(ScannerTest, ScanWithPauses) { - auto max_result_size = - ScannerTest::test_util->conf()->GetInt("hbase.client.scanner.max.result.size", 2097152); - ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", 100); - auto client = CreateTableAndWriteData("t_multi_region_scan", 1, num_rows, 5); - auto table = client->Table(folly::to<hbase::pb::TableName>("t_multi_region_scan")); - - VLOG(1) << "Starting scan for the test"; - Scan scan{}; - scan.SetCaching(100); - auto scanner = table->Scan(scan); - - uint32_t i = 0; - auto r = scanner->Next(); - while (r != nullptr) { - CheckResult(*r, Row(i++), 1); - r = scanner->Next(); - std::this_thread::sleep_for(TimeUtil::MillisToNanos(10)); - } - - auto s = static_cast<AsyncTableResultScanner *>(scanner.get()); - ASSERT_GT(s->num_prefetch_stopped(), 0); - - ASSERT_EQ(i, num_rows); - ScannerTest::test_util->conf()->SetInt("hbase.client.scanner.max.result.size", max_result_size); -} - -TEST_F(ScannerTest, ScanWithFilters) { - auto client = CreateTableAndWriteData("t_scan_with_filters", 1, num_rows, 1); - auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_with_filters")); - - Scan scan{}; - scan.SetFilter(FilterFactory::ValueFilter(CompareType::GREATER_OR_EQUAL, - *ComparatorFactory::BinaryComparator(Row(800)))); - - TestScan(scan, 800, 200, table.get()); -} - -TEST_F(ScannerTest, ScanMultiFamily) { - auto client = CreateTableAndWriteData("t_scan_multi_family", 3, num_rows, 1); - auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_multi_family")); - - TestScanCombinations(table.get(), 3); -} - -TEST_F(ScannerTest, ScanNullQualifier) { - std::string table_name{"t_scan_null_qualifier"}; - std::string row{"row"}; - CreateTable(table_name, 1, 1, 1); - - auto tn = folly::to<hbase::pb::TableName>(table_name); - auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf()); - auto table = client->Table(tn); - - // Perform Puts - Put put{row}; - put.AddColumn(Family(0), "q1", row); - put.AddColumn(Family(0), "", row); - table->Put(put); - - Scan scan1{}; - scan1.AddColumn(Family(0), ""); - auto scanner1 = table->Scan(scan1); - auto r1 = scanner1->Next(); - ASSERT_EQ(r1->Cells().size(), 1); - ASSERT_EQ(scanner1->Next(), nullptr); - - Scan scan2{}; - scan2.AddFamily(Family(0)); - auto scanner2 = table->Scan(scan2); - auto r2 = scanner2->Next(); - ASSERT_EQ(r2->Cells().size(), 2); - ASSERT_EQ(scanner2->Next(), nullptr); -} - -TEST_F(ScannerTest, ScanNoResults) { - std::string table_name{"t_scan_no_results"}; - auto client = CreateTableAndWriteData(table_name, 1, num_rows, 3); - auto table = client->Table(folly::to<hbase::pb::TableName>(table_name)); - - Scan scan{}; - scan.AddColumn(Family(0), "non_existing_qualifier"); - - TestScan(scan, 0, 0, table.get()); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/server-request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h deleted file mode 100644 index 85df9ed..0000000 --- a/hbase-native-client/core/server-request.h +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <map> -#include <memory> -#include <stdexcept> -#include <string> -#include "core/action.h" -#include "core/region-location.h" -#include "core/region-request.h" - -namespace hbase { - -class ServerRequest { - public: - // Concurrent - using ActionsByRegion = std::map<std::string, std::shared_ptr<RegionRequest>>; - - explicit ServerRequest(std::shared_ptr<RegionLocation> region_location) { - auto region_name = region_location->region_name(); - auto region_request = std::make_shared<RegionRequest>(region_location); - actions_by_region_[region_name] = region_request; - } - ~ServerRequest() {} - - void AddActionsByRegion(std::shared_ptr<RegionLocation> region_location, - std::shared_ptr<Action> action) { - auto region_name = region_location->region_name(); - auto search = actions_by_region_.find(region_name); - if (search == actions_by_region_.end()) { - auto region_request = std::make_shared<RegionRequest>(region_location); - actions_by_region_[region_name] = region_request; - actions_by_region_[region_name]->AddAction(action); - } else { - search->second->AddAction(action); - } - } - - const ActionsByRegion &actions_by_region() const { return actions_by_region_; } - - private: - ActionsByRegion actions_by_region_; -}; -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc deleted file mode 100644 index 6730248..0000000 --- a/hbase-native-client/core/simple-client.cc +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <folly/Logging.h> -#include <folly/Random.h> -#include <gflags/gflags.h> - -#include <atomic> -#include <chrono> -#include <iostream> -#include <thread> - -#include "connection/rpc-client.h" -#include "core/client.h" -#include "core/get.h" -#include "core/hbase-configuration-loader.h" -#include "core/put.h" -#include "core/scan.h" -#include "core/table.h" -#include "serde/server-name.h" -#include "serde/table-name.h" -#include "utils/time-util.h" - -using hbase::Client; -using hbase::Configuration; -using hbase::Get; -using hbase::HBaseConfigurationLoader; -using hbase::Scan; -using hbase::Put; -using hbase::Result; -using hbase::Table; -using hbase::pb::TableName; -using hbase::pb::ServerName; -using hbase::TimeUtil; - -DEFINE_string(table, "test_table", "What table to do the reads or writes"); -DEFINE_string(row, "row_", "row prefix"); -DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); -DEFINE_string(conf, "", "Conf directory to read the config from (optional)"); -DEFINE_uint64(num_rows, 10000, "How many rows to write and read"); -DEFINE_uint64(batch_num_rows, 10000, "How many rows batch for multi-gets and multi-puts"); -DEFINE_uint64(report_num_rows, 10000, "How frequent we should report the progress"); -DEFINE_bool(puts, true, "Whether to perform puts"); -DEFINE_bool(gets, true, "Whether to perform gets"); -DEFINE_bool(multigets, true, "Whether to perform multi-gets"); -DEFINE_bool(scans, true, "Whether to perform scans"); -DEFINE_bool(display_results, false, "Whether to display the Results from Gets"); -DEFINE_int32(threads, 6, "How many cpu threads"); - -std::unique_ptr<Put> MakePut(const std::string &row) { - auto put = std::make_unique<Put>(row); - put->AddColumn("f", "q", row); - return std::move(put); -} - -std::string Row(const std::string &prefix, uint64_t i) { - auto suf = folly::to<std::string>(i); - return prefix + suf; -} - -void ValidateResult(const Result &result, const std::string &row) { - CHECK(!result.IsEmpty()); - CHECK_EQ(result.Row(), row); - CHECK_EQ(result.Size(), 1); - CHECK_EQ(result.Value("f", "q").value(), row); -} - -int main(int argc, char *argv[]) { - gflags::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line"); - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - google::InstallFailureSignalHandler(); - FLAGS_logtostderr = 1; - FLAGS_stderrthreshold = 1; - - std::shared_ptr<Configuration> conf = nullptr; - if (FLAGS_conf == "") { - // Configuration - conf = std::make_shared<Configuration>(); - conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper); - conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads); - } else { - setenv("HBASE_CONF", FLAGS_conf.c_str(), 1); - hbase::HBaseConfigurationLoader loader; - conf = std::make_shared<Configuration>(loader.LoadDefaultResources().value()); - } - - auto row = FLAGS_row; - - auto tn = std::make_shared<TableName>(folly::to<TableName>(FLAGS_table)); - auto num_puts = FLAGS_num_rows; - - auto client = std::make_unique<Client>(*conf); - auto table = client->Table(*tn); - - auto start_ns = TimeUtil::GetNowNanos(); - - // Do the Put requests - if (FLAGS_puts) { - LOG(INFO) << "Sending put requests"; - for (uint64_t i = 0; i < num_puts; i++) { - table->Put(*MakePut(Row(FLAGS_row, i))); - if (i != 0 && i % FLAGS_report_num_rows == 0) { - LOG(INFO) << "Sent " << i << " Put requests in " << TimeUtil::ElapsedMillis(start_ns) - << " ms."; - } - } - - LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; - } - - // Do the Get requests - if (FLAGS_gets) { - LOG(INFO) << "Sending get requests"; - start_ns = TimeUtil::GetNowNanos(); - for (uint64_t i = 0; i < num_puts; i++) { - auto row = Row(FLAGS_row, i); - auto result = table->Get(Get{row}); - if (FLAGS_display_results) { - LOG(INFO) << result->DebugString(); - } else if (i != 0 && i % FLAGS_report_num_rows == 0) { - LOG(INFO) << "Sent " << i << " Get requests in " << TimeUtil::ElapsedMillis(start_ns) - << " ms."; - } - ValidateResult(*result, row); - } - - LOG(INFO) << "Successfully sent " << num_puts << " Get requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; - } - - // Do the Multi-Gets - if (FLAGS_multigets) { - LOG(INFO) << "Sending multi-get requests"; - start_ns = TimeUtil::GetNowNanos(); - std::vector<hbase::Get> gets; - - for (uint64_t i = 0; i < num_puts;) { - gets.clear(); - // accumulate batch_num_rows at a time - for (uint64_t j = 0; j < FLAGS_batch_num_rows && i < num_puts; ++j) { - hbase::Get get(Row(FLAGS_row, i)); - gets.push_back(get); - i++; - } - auto results = table->Get(gets); - - if (FLAGS_display_results) { - for (const auto &result : results) LOG(INFO) << result->DebugString(); - } else if (i != 0 && i % FLAGS_report_num_rows == 0) { - LOG(INFO) << "Sent " << i << " Multi-Get requests in " << TimeUtil::ElapsedMillis(start_ns) - << " ms."; - } - } - - LOG(INFO) << "Successfully sent " << num_puts << " Multi-Get requests in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; - } - - // Do the Scan - if (FLAGS_scans) { - LOG(INFO) << "Starting scanner"; - start_ns = TimeUtil::GetNowNanos(); - Scan scan{}; - auto scanner = table->Scan(scan); - - uint64_t i = 0; - auto r = scanner->Next(); - while (r != nullptr) { - if (FLAGS_display_results) { - LOG(INFO) << r->DebugString(); - } - r = scanner->Next(); - i++; - if (!FLAGS_display_results && i != 0 && i % FLAGS_report_num_rows == 0) { - LOG(INFO) << "Scan iterated over " << i << " results " << TimeUtil::ElapsedMillis(start_ns) - << " ms."; - } - } - - LOG(INFO) << "Successfully iterated over " << i << " Scan results in " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; - scanner->Close(); - } - - table->Close(); - client->Close(); - - return 0; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc deleted file mode 100644 index f93a029..0000000 --- a/hbase-native-client/core/table.cc +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/table.h" - -#include <chrono> -#include <limits> -#include <utility> -#include <vector> - -#include "core/async-connection.h" -#include "core/async-table-result-scanner.h" -#include "core/request-converter.h" -#include "core/response-converter.h" -#include "if/Client.pb.h" -#include "security/user.h" -#include "serde/server-name.h" -#include "utils/time-util.h" - -using hbase::pb::TableName; -using hbase::security::User; -using std::chrono::milliseconds; - -namespace hbase { - -Table::Table(const TableName &table_name, std::shared_ptr<AsyncConnection> async_connection) - : table_name_(std::make_shared<TableName>(table_name)), - async_connection_(async_connection), - conf_(async_connection->conf()) { - async_table_ = std::make_unique<RawAsyncTable>(table_name_, async_connection); -} - -Table::~Table() {} - -std::shared_ptr<hbase::Result> Table::Get(const hbase::Get &get) { - auto context = async_table_->Get(get); - return context.get(operation_timeout()); -} - -std::shared_ptr<ResultScanner> Table::Scan(const hbase::Scan &scan) { - auto max_cache_size = ResultSize2CacheSize( - scan.MaxResultSize() > 0 ? scan.MaxResultSize() - : async_connection_->connection_conf()->scanner_max_result_size()); - auto scanner = std::make_shared<AsyncTableResultScanner>(max_cache_size); - async_table_->Scan(scan, scanner); - return scanner; -} - -int64_t Table::ResultSize2CacheSize(int64_t max_results_size) const { - // * 2 if possible - return max_results_size > (std::numeric_limits<int64_t>::max() / 2) ? max_results_size - : max_results_size * 2; -} - -void Table::Put(const hbase::Put &put) { - auto future = async_table_->Put(put); - future.get(operation_timeout()); -} - -bool Table::CheckAndPut(const std::string &row, const std::string &family, - const std::string &qualifier, const std::string &value, - const hbase::Put &put, const pb::CompareType &compare_op) { - auto context = async_table_->CheckAndPut(row, family, qualifier, value, put, compare_op); - return context.get(operation_timeout()); -} - -bool Table::CheckAndDelete(const std::string &row, const std::string &family, - const std::string &qualifier, const std::string &value, - const hbase::Delete &del, const pb::CompareType &compare_op) { - auto context = async_table_->CheckAndDelete(row, family, qualifier, value, del, compare_op); - return context.get(operation_timeout()); -} - -void Table::Delete(const hbase::Delete &del) { - auto future = async_table_->Delete(del); - future.get(operation_timeout()); -} - -std::shared_ptr<hbase::Result> Table::Increment(const hbase::Increment &increment) { - auto context = async_table_->Increment(increment); - return context.get(operation_timeout()); -} - -std::shared_ptr<hbase::Result> Table::Append(const hbase::Append &append) { - auto context = async_table_->Append(append); - return context.get(operation_timeout()); -} - -milliseconds Table::operation_timeout() const { - return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout()); -} - -void Table::Close() { async_table_->Close(); } - -std::shared_ptr<RegionLocation> Table::GetRegionLocation(const std::string &row) { - return async_connection_->region_locator()->LocateRegion(*table_name_, row).get(); -} - -std::vector<std::shared_ptr<hbase::Result>> Table::Get(const std::vector<hbase::Get> &gets) { - auto tresults = async_table_->Get(gets).get(operation_timeout()); - std::vector<std::shared_ptr<hbase::Result>> results{}; - uint32_t num = 0; - for (auto tresult : tresults) { - if (tresult.hasValue()) { - results.push_back(tresult.value()); - } else if (tresult.hasException()) { - LOG(ERROR) << "Caught exception:- " << tresult.exception().what() << " for " - << gets[num++].row(); - throw tresult.exception(); - } - } - return results; -} - -void Table::Put(const std::vector<hbase::Put> &puts) { - auto tresults = async_table_->Put(puts).get(operation_timeout()); - uint32_t num = 0; - for (auto tresult : tresults) { - if (tresult.hasException()) { - LOG(ERROR) << "Caught exception:- " << tresult.exception().what() << " for " - << puts[num++].row(); - throw tresult.exception(); - } - } - return; -} - -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h deleted file mode 100644 index 6340494..0000000 --- a/hbase-native-client/core/table.h +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <chrono> -#include <memory> -#include <string> -#include <vector> - -#include "connection/rpc-client.h" -#include "core/async-connection.h" -#include "core/client.h" -#include "core/configuration.h" -#include "core/get.h" -#include "core/location-cache.h" -#include "core/put.h" -#include "core/raw-async-table.h" -#include "core/result-scanner.h" -#include "core/result.h" -#include "serde/table-name.h" - -namespace hbase { - -class Client; - -class Table { - public: - /** - * Constructors - */ - Table(const pb::TableName &table_name, std::shared_ptr<AsyncConnection> async_connection); - ~Table(); - - /** - * @brief - Returns a Result object for the constructed Get. - * @param - get Get object to perform HBase Get operation. - */ - std::shared_ptr<hbase::Result> Get(const hbase::Get &get); - - std::vector<std::shared_ptr<hbase::Result>> Get(const std::vector<hbase::Get> &gets); - - /** - * @brief - Puts some data in the table. - * @param - put Put object to perform HBase Put operation. - */ - void Put(const hbase::Put &put); - - /** - * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the put. If the passed value is null, the check - * is for the lack of column (ie: non-existance) - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param put data to put if check succeeds - * @param compare_op comparison operator to use - * @throws IOException e - * @return true if the new put was executed, false otherwise - */ - bool CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier, - const std::string &value, const hbase::Put &put, - const pb::CompareType &compare_op = pb::CompareType::EQUAL); - /** - * @brief - Deletes some data in the table. - * @param - del Delete object to perform HBase Delete operation. - */ - void Delete(const hbase::Delete &del); - - /** - * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the delete. If the passed value is null, the - * check is for the lack of column (ie: non-existence) - * - * The expected value argument of this call is on the left and the current - * value of the cell is on the right side of the comparison operator. - * - * Ie. eg. GREATER operator means expected value > existing <=> add the delete. - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compare_op comparison operator to use - * @param value the expected value - * @param del data to delete if check succeeds - * @return true if the new delete was executed, false otherwise - */ - bool CheckAndDelete(const std::string &row, const std::string &family, - const std::string &qualifier, const std::string &value, - const hbase::Delete &del, - const pb::CompareType &compare_op = pb::CompareType::EQUAL); - - /** - * @brief - Increments some data in the table. - * @param - increment Increment object to perform HBase Increment operation. - */ - std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment); - - /** - * @brief - Appends some data in the table. - * @param - append Append object to perform HBase Append operation. - */ - std::shared_ptr<hbase::Result> Append(const hbase::Append &append); - - std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan); - - /** - * @brief - Multi Puts. - * @param - puts vector of hbase::Put. - */ - void Put(const std::vector<hbase::Put> &puts); - /** - * @brief - Close the client connection. - */ - void Close(); - - /** - * @brief - Get region location for a row in current table. - */ - std::shared_ptr<RegionLocation> GetRegionLocation(const std::string &row); - - private: - std::shared_ptr<pb::TableName> table_name_; - std::shared_ptr<AsyncConnection> async_connection_; - std::shared_ptr<hbase::Configuration> conf_; - std::unique_ptr<RawAsyncTable> async_table_; - - private: - std::chrono::milliseconds operation_timeout() const; - - int64_t ResultSize2CacheSize(int64_t max_results_size) const; -}; -} /* namespace hbase */