Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 727d9b798 -> 05b59e8d4


HBASE-18578 [C++] Add pause for RPC test

Signed-off-by: Enis Soztutar <e...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/05b59e8d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/05b59e8d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/05b59e8d

Branch: refs/heads/HBASE-14850
Commit: 05b59e8d4eb819eb1f9f192c7d72e74e39088e16
Parents: 727d9b7
Author: Xiaobing Zhou <xz...@hortonworks.com>
Authored: Tue Aug 22 12:01:21 2017 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Tue Aug 22 12:02:39 2017 -0700

----------------------------------------------------------------------
 .../connection/rpc-test-server.cc               |  9 ++-
 hbase-native-client/connection/rpc-test.cc      | 61 ++++++++++++++++----
 2 files changed, 59 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/05b59e8d/hbase-native-client/connection/rpc-test-server.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
index 6132fbb..707bca7 100644
--- a/hbase-native-client/connection/rpc-test-server.cc
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -88,7 +88,14 @@ Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Req
     
response->set_exception(folly::make_exception_wrapper<RpcTestException>("server 
error!"));
 
   } else if (method_name == "pause") {
-    // TODO:
+    auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+    /* sleeping */
+    auto pb_req_msg = 
std::static_pointer_cast<PauseRequestProto>(request->req_msg());
+    std::this_thread::sleep_for(std::chrono::milliseconds(pb_req_msg->ms()));
+    response->set_resp_msg(pb_resp_msg);
+    VLOG(1) << "RPC server:"
+            << " pause called, " << pb_req_msg->ms() << " ms";
+
   } else if (method_name == "addr") {
     // TODO:
   } else if (method_name == "socketNotOpen") {

http://git-wip-us.apache.org/repos/asf/hbase/blob/05b59e8d/hbase-native-client/connection/rpc-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
index 4688950..d541397 100644
--- a/hbase-native-client/connection/rpc-test.cc
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -30,6 +30,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 #include <boost/thread.hpp>
+#include <chrono>
 
 #include "connection/rpc-client.h"
 #include "exceptions/exception.h"
@@ -41,11 +42,14 @@
 using namespace wangle;
 using namespace folly;
 using namespace hbase;
+using namespace std::chrono;
 
 DEFINE_int32(port, 0, "test server port");
 DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC 
result");
-DEFINE_string(fail_format, "Shouldn't get here, exception is expected for RPC 
{}.",
-              "output format of enforcing fail");
+DEFINE_string(fail_ex_format, "Shouldn't get here, exception is expected for 
RPC {}.",
+              "output format of enforcing fail with exception");
+DEFINE_string(fail_no_ex_format, "Shouldn't get here, exception is not 
expected for RPC {}.",
+              "output format of enforcing fail without exception");
 typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap;
 typedef std::shared_ptr<ServerTestBootstrap> ServerPtr;
 
@@ -110,8 +114,8 @@ TEST_F(RpcTest, Ping) {
         VLOG(1) << folly::sformat(FLAGS_result_format, method, "");
       })
       .onError([&](const folly::exception_wrapper& ew) {
-        FAIL() << folly::sformat(FLAGS_fail_format, method);
-      });
+        FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
+      }).get();
 
   server->stop();
   server->join();
@@ -144,8 +148,8 @@ TEST_F(RpcTest, Echo) {
         EXPECT_EQ(greetings, pb_resp->message());
       })
       .onError([&](const folly::exception_wrapper& ew) {
-        FAIL() << folly::sformat(FLAGS_fail_format, method);
-      });
+        FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
+      }).get();
 
   server->stop();
   server->join();
@@ -168,7 +172,7 @@ TEST_F(RpcTest, Error) {
       ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), 
std::move(request),
                   hbase::security::User::defaultUser())
       .then([&](std::unique_ptr<Response> response) {
-        FAIL() << folly::sformat(FLAGS_fail_format, method);
+        FAIL() << folly::sformat(FLAGS_fail_ex_format, method);
       })
       .onError([&](const folly::exception_wrapper& ew) {
         VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
@@ -184,7 +188,7 @@ TEST_F(RpcTest, Error) {
           EXPECT_EQ(kRpcTestException, e.exception_class_name());
           EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace());
         }));
-      });
+      }).get();
 
   server->stop();
   server->join();
@@ -208,7 +212,7 @@ TEST_F(RpcTest, SocketNotOpen) {
       ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), 
std::move(request),
                   hbase::security::User::defaultUser())
       .then([&](std::unique_ptr<Response> response) {
-        FAIL() << folly::sformat(FLAGS_fail_format, method);
+        FAIL() << folly::sformat(FLAGS_fail_ex_format, method);
       })
       .onError([&](const folly::exception_wrapper& ew) {
         VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
@@ -231,5 +235,42 @@ TEST_F(RpcTest, SocketNotOpen) {
             EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno());
           });
         }));
-      });
+      }).get();
+}
+
+/**
+ * test pause
+ */
+TEST_F(RpcTest, Pause) {
+  int ms = 500;
+
+  auto conf = CreateConf();
+  auto server = CreateRpcServer();
+  auto server_addr = GetRpcServerAddress(server);
+  auto client =
+      CreateRpcClient(conf, 
std::chrono::duration_cast<nanoseconds>(milliseconds(2 * ms)));
+
+  auto method = "pause";
+  auto request = 
std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
+                                           
std::make_shared<EmptyResponseProto>(), method);
+  auto pb_msg = 
std::static_pointer_cast<PauseRequestProto>(request->req_msg());
+
+  pb_msg->set_ms(ms);
+
+  /* sending out request */
+  client
+      ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), 
std::move(request),
+                  hbase::security::User::defaultUser())
+      .then([&](std::unique_ptr<Response> response) {
+        auto pb_resp = 
std::static_pointer_cast<EmptyResponseProto>(response->resp_msg());
+        EXPECT_TRUE(pb_resp != nullptr);
+        VLOG(1) << folly::sformat(FLAGS_result_format, method, "");
+      })
+      .onError([&](const folly::exception_wrapper& ew) {
+        VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what());
+        FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method);
+      }).get();
+
+  server->stop();
+  server->join();
 }

Reply via email to