Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 58ec20cac -> 8335e1529


HBASE-18174 Implement Table#checkAndPut()


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8335e152
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8335e152
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8335e152

Branch: refs/heads/HBASE-14850
Commit: 8335e1529c7626279f58fb283975b5b45c430335
Parents: 58ec20c
Author: tedyu <yuzhih...@gmail.com>
Authored: Fri Jun 9 17:19:57 2017 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Jun 9 17:19:57 2017 -0700

----------------------------------------------------------------------
 .../core/async-rpc-retrying-caller.cc           |  1 +
 hbase-native-client/core/client-test.cc         | 29 ++++++++++++++++++++
 hbase-native-client/core/raw-async-table.cc     | 26 ++++++++++++++++++
 hbase-native-client/core/raw-async-table.h      |  5 ++++
 hbase-native-client/core/request-converter.cc   | 21 ++++++++++++++
 hbase-native-client/core/request-converter.h    |  5 ++++
 hbase-native-client/core/response-converter.cc  |  5 ++++
 hbase-native-client/core/response-converter.h   |  2 ++
 hbase-native-client/core/table.cc               |  7 +++++
 hbase-native-client/core/table.h                | 17 ++++++++++++
 10 files changed, 118 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc 
b/hbase-native-client/core/async-rpc-retrying-caller.cc
index b9d01bb..0302ad3 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -216,4 +216,5 @@ class OpenScannerResponse;
 template class 
AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
 template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
 template class 
AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>;
+template class AsyncSingleRequestRpcRetryingCaller<bool>;
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc 
b/hbase-native-client/core/client-test.cc
index 4972e05..ed413e8 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -247,6 +247,35 @@ TEST_F(ClientTest, Increment) {
   EXPECT_EQ(incr1 + incr2, hbase::BytesUtil::ToInt64(*(result->Value("d", 
"1"))));
 }
 
+TEST_F(ClientTest, CheckAndPut) {
+  // Using TestUtil to populate test data
+  ClientTest::test_util->CreateTable("check", "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("check");
+  auto row = "test1";
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform Puts
+  table->Put(Put{row}.AddColumn("d", "1", "value1"));
+  auto result = table->CheckAndPut(row, "d", "1", "value1", 
Put{row}.AddColumn("d", "1", "value2"));
+  ASSERT_TRUE(result) << "CheckAndPut didn't replace value";
+
+  result = table->CheckAndPut(row, "d", "1", "value1", Put{row}.AddColumn("d", 
"1", "value3"));
+
+  // Perform the Get
+  hbase::Get get(row);
+  auto result1 = table->Get(get);
+  EXPECT_EQ("value2", *(result1->Value("d", "1")));
+  ASSERT_FALSE(result) << "CheckAndPut shouldn't replace value";
+}
+
 TEST_F(ClientTest, PutGet) {
   // Using TestUtil to populate test data
   ClientTest::test_util->CreateTable("t", "d");

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.cc 
b/hbase-native-client/core/raw-async-table.cc
index 413dc6c..46d9dfd 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -111,6 +111,32 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const 
hbase::Put& put) {
   return caller->Call().then([caller](const auto r) { return r; });
 }
 
+folly::Future<bool> RawAsyncTable::CheckAndPut(const std::string& row, const 
std::string& family,
+                                               const std::string& qualifier,
+                                               const std::string& value, const 
hbase::Put& put,
+                                               const pb::CompareType& 
compare_op) {
+  auto caller =
+      CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout())
+          ->action([=, &put](std::shared_ptr<hbase::HBaseRpcController> 
controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client) -> 
folly::Future<bool> {
+            return Call<hbase::Put, hbase::Request, hbase::Response, bool>(
+                rpc_client, controller, loc, put,
+                // request conversion
+                [=, &put](const hbase::Put& put,
+                          const std::string& region_name) -> 
std::unique_ptr<Request> {
+                  auto checkReq = RequestConverter::CheckAndPutToMutateRequest(
+                      row, family, qualifier, value, compare_op, put, 
region_name);
+                  return checkReq;
+                },
+                // response conversion
+                &ResponseConverter::BoolFromMutateResponse);
+          })
+          ->Build();
+
+  return caller->Call().then([caller](const auto r) { return r; });
+}
+
 folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
   auto caller =
       CreateCallerBuilder<folly::Unit>(del.row(), 
connection_conf_->write_rpc_timeout())

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/raw-async-table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.h 
b/hbase-native-client/core/raw-async-table.h
index 6088d1b..068f230 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -67,6 +67,11 @@ class RawAsyncTable {
 
   folly::Future<folly::Unit> Put(const hbase::Put& put);
 
+  folly::Future<bool> CheckAndPut(const std::string& row, const std::string& 
family,
+                                  const std::string& qualifier, const 
std::string& value,
+                                  const hbase::Put& put,
+                                  const pb::CompareType& compare_op = 
pb::CompareType::EQUAL);
+
   void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> 
consumer);
 
   void Close() {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/request-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.cc 
b/hbase-native-client/core/request-converter.cc
index 042d5b1..0cd9c7c 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -276,6 +276,27 @@ std::unique_ptr<Request> 
RequestConverter::ToMutateRequest(const Put &put,
   return pb_req;
 }
 
+std::unique_ptr<Request> RequestConverter::CheckAndPutToMutateRequest(
+    const std::string &row, const std::string &family, const std::string 
&qualifier,
+    const std::string &value, const pb::CompareType compare_op, const 
hbase::Put &put,
+    const std::string &region_name) {
+  auto pb_req = Request::mutate();
+  auto pb_msg = 
std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg());
+
+  pb_msg->set_allocated_mutation(
+      ToMutation(MutationType::MutationProto_MutationType_PUT, put, 
-1).release());
+  ::hbase::pb::Condition *cond = pb_msg->mutable_condition();
+  cond->set_row(row);
+  cond->set_family(family);
+  cond->set_qualifier(qualifier);
+  cond->set_allocated_comparator(
+      
Comparator::ToProto(*(ComparatorFactory::BinaryComparator(value).get())).release());
+  cond->set_compare_type(compare_op);
+
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+  return pb_req;
+}
+
 std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete 
&del,
                                                                  const 
std::string &region_name) {
   auto pb_req = Request::mutate();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/request-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.h 
b/hbase-native-client/core/request-converter.h
index 51440d8..0755f42 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -84,6 +84,11 @@ class RequestConverter {
 
   static std::unique_ptr<Request> ToMutateRequest(const Put &put, const 
std::string &region_name);
 
+  static std::unique_ptr<Request> CheckAndPutToMutateRequest(
+      const std::string &row, const std::string &family, const std::string 
&qualifier,
+      const std::string &value, const pb::CompareType compare_op, const 
hbase::Put &put,
+      const std::string &region_name);
+
   static std::unique_ptr<Request> IncrementToMutateRequest(const Increment 
&incr,
                                                            const std::string 
&region_name);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc 
b/hbase-native-client/core/response-converter.cc
index d2d719b..9bc4892 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -53,6 +53,11 @@ std::shared_ptr<Result> 
ResponseConverter::FromMutateResponse(const Response& re
   return ToResult(mutate_resp->result(), resp.cell_scanner());
 }
 
+bool ResponseConverter::BoolFromMutateResponse(const Response& resp) {
+  auto mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg());
+  return mutate_resp->processed();
+}
+
 std::shared_ptr<Result> ResponseConverter::ToResult(
     const hbase::pb::Result& result, const std::shared_ptr<CellScanner> 
cell_scanner) {
   std::vector<std::shared_ptr<Cell>> vcells;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/response-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.h 
b/hbase-native-client/core/response-converter.h
index b518d1c..2f8f279 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -49,6 +49,8 @@ class ResponseConverter {
 
   static std::shared_ptr<hbase::Result> FromMutateResponse(const Response& 
resp);
 
+  static bool BoolFromMutateResponse(const Response& resp);
+
   static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& 
resp);
 
   static std::vector<std::shared_ptr<Result>> FromScanResponse(

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc 
b/hbase-native-client/core/table.cc
index bf22169..f20078d 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -73,6 +73,13 @@ void Table::Put(const hbase::Put &put) {
   future.get(operation_timeout());
 }
 
+bool Table::CheckAndPut(const std::string &row, const std::string &family,
+                        const std::string &qualifier, const std::string &value,
+                        const hbase::Put &put, const pb::CompareType 
&compare_op) {
+  auto context = async_table_->CheckAndPut(row, family, qualifier, value, put, 
compare_op);
+  return context.get(operation_timeout());
+}
+
 void Table::Delete(const hbase::Delete &del) {
   auto future = async_table_->Delete(del);
   future.get(operation_timeout());

http://git-wip-us.apache.org/repos/asf/hbase/blob/8335e152/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 781c6f1..3c486af 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -63,6 +63,23 @@ class Table {
   void Put(const hbase::Put &put);
 
   /**
+   * Atomically checks if a row/family/qualifier value matches the expected
+   * value. If it does, it adds the put.  If the passed value is null, the 
check
+   * is for the lack of column (ie: non-existance)
+   *
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @param compare_op comparison operator to use
+   * @throws IOException e
+   * @return true if the new put was executed, false otherwise
+   */
+  bool CheckAndPut(const std::string &row, const std::string &family, const 
std::string &qualifier,
+                   const std::string &value, const hbase::Put &put,
+                   const pb::CompareType &compare_op = pb::CompareType::EQUAL);
+  /**
    * @brief - Deletes some data in the table.
    * @param - del Delete object to perform HBase Delete operation.
    */

Reply via email to