HBASE-15687 Allow decoding more than GetResponse from the server

Summary:
The client needs to support more than Gets to be usable, so the
Request class now holds the request and response protobufs needed to
encode and decode RPCs.

I also added some helper methods to create initial requests.

Test Plan: It compiles and still gets data from HBase meta

Differential Revision: https://reviews.facebook.net/D57327


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5a222062
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5a222062
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5a222062

Branch: refs/heads/HBASE-14850
Commit: 5a222062859d416f2dc5d55f56bce47fd8a058d9
Parents: 14e2166
Author: Elliott Clark <ecl...@apache.org>
Authored: Wed Apr 27 15:27:09 2016 -0700
Committer: Elliott Clark <ecl...@apache.org>
Committed: Fri May 13 14:36:28 2016 -0700

----------------------------------------------------------------------
 hbase-native-client/connection/BUCK             |  1 +
 .../connection/client-dispatcher.cc             |  6 +--
 .../connection/client-dispatcher.h              |  6 +--
 .../connection/client-handler.cc                | 30 ++++++++++---
 hbase-native-client/connection/client-handler.h | 18 ++++++--
 .../connection/connection-factory.cc            |  6 +--
 .../connection/connection-factory.h             |  2 +-
 hbase-native-client/connection/pipeline.h       |  3 +-
 hbase-native-client/connection/request.cc       | 45 ++++++++++++++++++++
 hbase-native-client/connection/request.h        | 25 +++++++++--
 hbase-native-client/core/simple-client.cc       | 13 +++---
 11 files changed, 123 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/BUCK 
b/hbase-native-client/connection/BUCK
index 5067708..d393885 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -32,6 +32,7 @@ cxx_library(name="connection",
                 "client-handler.cc",
                 "connection-factory.cc",
                 "pipeline.cc",
+                "request.cc",
             ],
             deps=[
                 "//if:if",

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/client-dispatcher.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.cc 
b/hbase-native-client/connection/client-dispatcher.cc
index 25cff7d..eea0a17 100644
--- a/hbase-native-client/connection/client-dispatcher.cc
+++ b/hbase-native-client/connection/client-dispatcher.cc
@@ -35,16 +35,16 @@ void ClientDispatcher::read(Context *ctx, Response in) {
   p.setValue(in);
 }
 
-Future<Response> ClientDispatcher::operator()(Request arg) {
+Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) {
   auto call_id = ++current_call_id_;
 
-  arg.set_call_id(call_id);
+  arg->set_call_id(call_id);
   auto &p = requests_[call_id];
   auto f = p.getFuture();
   p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
     this->requests_.erase(call_id);
   });
-  this->pipeline_->write(arg);
+  this->pipeline_->write(std::move(arg));
 
   return f;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/client-dispatcher.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.h 
b/hbase-native-client/connection/client-dispatcher.h
index 89c7119..877e877 100644
--- a/hbase-native-client/connection/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -27,11 +27,11 @@
 
 namespace hbase {
 class ClientDispatcher
-    : public wangle::ClientDispatcherBase<SerializePipeline, Request,
-                                          Response> {
+    : public wangle::ClientDispatcherBase<SerializePipeline,
+                                          std::unique_ptr<Request>, Response> {
 public:
   void read(Context *ctx, Response in) override;
-  folly::Future<Response> operator()(Request arg) override;
+  folly::Future<Response> operator()(std::unique_ptr<Request> arg) override;
   folly::Future<folly::Unit> close(Context *ctx) override;
   folly::Future<folly::Unit> close() override;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc 
b/hbase-native-client/connection/client-handler.cc
index 205993a7..abcf5c1 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -47,14 +47,30 @@ void ClientHandler::read(Context *ctx, 
std::unique_ptr<IOBuf> buf) {
     LOG(INFO) << "Read ResponseHeader size=" << used_bytes
               << " call_id=" << header.call_id()
               << " has_exception=" << header.has_exception();
+
+    // Get the response protobuf from the map
+    auto search = resp_msgs_.find(header.call_id());
+    // It's an error if it's not there.
+    CHECK(search != resp_msgs_.end());
+    auto resp_msg = search->second;
+    CHECK(resp_msg != nullptr);
+
+    // Make sure we don't leak the protobuf
+    resp_msgs_.erase(search);
+
+    // set the call_id.
+    // This will be used by the dispatcher to match up
+    // the promise with the response.
     received.set_call_id(header.call_id());
 
+    // If there was an exception then there's no
+    // data left on the wire.
     if (header.has_exception() == false) {
       buf->trimStart(used_bytes);
-      // For now assume that everything was a get.
-      // We'll need to set this up later.
-      received.set_response(std::make_shared<GetResponse>());
-      used_bytes = deser_.parse_delimited(buf.get(), 
received.response().get());
+      used_bytes = deser_.parse_delimited(buf.get(), resp_msg.get());
+      // Make sure that bytes were parsed.
+      CHECK(used_bytes == buf->length());
+      received.set_response(resp_msg);
     }
     ctx->fireRead(std::move(received));
   }
@@ -62,7 +78,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> 
buf) {
 
 // TODO(eclark): Figure out how to handle the
 // network errors that are going to come.
-Future<Unit> ClientHandler::write(Context *ctx, Request r) {
+Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
   // Keep track of if we have sent the header.
   if (UNLIKELY(need_send_header_)) {
     need_send_header_ = false;
@@ -78,5 +94,7 @@ Future<Unit> ClientHandler::write(Context *ctx, Request r) {
     ctx->fireWrite(std::move(pre));
   }
 
-  return ctx->fireWrite(ser_.request(r.call_id(), r.method(), r.msg()));
+  resp_msgs_[r->call_id()] = r->resp_msg();
+  return ctx->fireWrite(
+      ser_.request(r->call_id(), r->method(), r->req_msg().get()));
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/client-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.h 
b/hbase-native-client/connection/client-handler.h
index dbaf5a0..41bb883 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -30,20 +30,30 @@ namespace hbase {
 class Request;
 class Response;
 }
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
 
 namespace hbase {
-class ClientHandler
-    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, Request,
-                             std::unique_ptr<folly::IOBuf>> {
+class ClientHandler : public wangle::Handler<std::unique_ptr<folly::IOBuf>,
+                                             Response, 
std::unique_ptr<Request>,
+                                             std::unique_ptr<folly::IOBuf>> {
 public:
   ClientHandler(std::string user_name);
   void read(Context *ctx, std::unique_ptr<folly::IOBuf> msg) override;
-  folly::Future<folly::Unit> write(Context *ctx, Request r) override;
+  folly::Future<folly::Unit> write(Context *ctx,
+                                   std::unique_ptr<Request> r) override;
 
 private:
   bool need_send_header_ = true;
   std::string user_name_;
   ClientSerializer ser_;
   ClientDeserializer deser_;
+
+  // in flight requests
+  std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>>
+      resp_msgs_;
 };
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc 
b/hbase-native-client/connection/connection-factory.cc
index 5d1b0da..7073f9d 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -45,14 +45,14 @@ ConnectionFactory::ConnectionFactory() {
   bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>());
 }
 
-std::shared_ptr<Service<Request, Response>>
+std::shared_ptr<Service<std::unique_ptr<Request>, Response>>
 ConnectionFactory::make_connection(std::string host, int port) {
   // Connect to a given server
   // Then when connected create a ClientDispactcher.
   auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get();
   auto dispatcher = std::make_shared<ClientDispatcher>();
   dispatcher->setPipeline(pipeline);
-  auto service =
-      std::make_shared<CloseOnReleaseFilter<Request, Response>>(dispatcher);
+  auto service = std::make_shared<
+      CloseOnReleaseFilter<std::unique_ptr<Request>, Response>>(dispatcher);
   return service;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/connection-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.h 
b/hbase-native-client/connection/connection-factory.h
index 73ac032..8d1d2f0 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -31,7 +31,7 @@ namespace hbase {
 class ConnectionFactory {
 public:
   ConnectionFactory();
-  std::shared_ptr<wangle::Service<Request, Response>>
+  std::shared_ptr<wangle::Service<std::unique_ptr<Request>, Response>>
   make_connection(std::string host, int port);
 
 private:

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/pipeline.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/pipeline.h 
b/hbase-native-client/connection/pipeline.h
index 8114fab..6c4f4ff 100644
--- a/hbase-native-client/connection/pipeline.h
+++ b/hbase-native-client/connection/pipeline.h
@@ -26,7 +26,8 @@
 #include "utils/user-util.h"
 
 namespace hbase {
-using SerializePipeline = wangle::Pipeline<folly::IOBufQueue &, Request>;
+using SerializePipeline =
+    wangle::Pipeline<folly::IOBufQueue &, std::unique_ptr<Request>>;
 
 class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> {
 public:

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/request.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/request.cc 
b/hbase-native-client/connection/request.cc
new file mode 100644
index 0000000..50ea029
--- /dev/null
+++ b/hbase-native-client/connection/request.cc
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/request.h"
+
+#include "if/Client.pb.h"
+
+using namespace hbase;
+
+Request::Request(std::shared_ptr<google::protobuf::Message> req,
+                 std::shared_ptr<google::protobuf::Message> resp,
+                 std::string method)
+    : req_msg_(req), resp_msg_(resp), method_(method), call_id_(0) {}
+
+std::unique_ptr<Request> Request::get() {
+  return std::make_unique<Request>(std::make_shared<hbase::pb::GetRequest>(),
+                                  std::make_shared<hbase::pb::GetResponse>(),
+                                  "Get");
+}
+std::unique_ptr<Request> Request::mutate() {
+  return 
std::make_unique<Request>(std::make_shared<hbase::pb::MutateRequest>(),
+                                  
std::make_shared<hbase::pb::MutateResponse>(),
+                                  "Mutate");
+}
+std::unique_ptr<Request> Request::scan() {
+  return std::make_unique<Request>(std::make_shared<hbase::pb::ScanRequest>(),
+                                  std::make_shared<hbase::pb::ScanResponse>(),
+                                  "Scan");
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/connection/request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/request.h 
b/hbase-native-client/connection/request.h
index e9e3e88..743c469 100644
--- a/hbase-native-client/connection/request.h
+++ b/hbase-native-client/connection/request.h
@@ -21,22 +21,39 @@
 #include <google/protobuf/message.h>
 
 #include <cstdint>
+#include <memory>
 #include <string>
 
 namespace hbase {
 class Request {
 public:
-  Request() : call_id_(0) {}
+  static std::unique_ptr<Request> get();
+  static std::unique_ptr<Request> mutate();
+  static std::unique_ptr<Request> scan();
+
+  Request(std::shared_ptr<google::protobuf::Message> req,
+          std::shared_ptr<google::protobuf::Message> resp, std::string method);
+
   uint32_t call_id() { return call_id_; }
   void set_call_id(uint32_t call_id) { call_id_ = call_id; }
-  google::protobuf::Message *msg() { return msg_.get(); }
-  void set_msg(std::shared_ptr<google::protobuf::Message> msg) { msg_ = msg; }
+
+  std::shared_ptr<google::protobuf::Message> req_msg() { return req_msg_; }
+  std::shared_ptr<google::protobuf::Message> resp_msg() { return resp_msg_; }
+
+  void set_req_msg(std::shared_ptr<google::protobuf::Message> msg) {
+    req_msg_ = msg;
+  }
+  void set_resp_msg(std::shared_ptr<google::protobuf::Message> msg) {
+    resp_msg_ = msg;
+  }
+
   std::string method() { return method_; }
   void set_method(std::string method) { method_ = method; }
 
 private:
   uint32_t call_id_;
-  std::shared_ptr<google::protobuf::Message> msg_ = nullptr;
+  std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr;
+  std::shared_ptr<google::protobuf::Message> resp_msg_ = nullptr;
   std::string method_ = "Get";
 };
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a222062/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc 
b/hbase-native-client/core/simple-client.cc
index 8b2fae5..2cb6200 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -58,24 +58,23 @@ int main(int argc, char *argv[]) {
   auto conn = cf.make_connection(result.host_name(), result.port());
 
   // Send the request
-  Request r;
+  auto r = Request::get();
 
   // This is a get request so make that
-  auto msg = make_shared<hbase::pb::GetRequest>();
+  auto req_msg = static_pointer_cast<hbase::pb::GetRequest>(r->req_msg());
 
   // Set what region
-  msg->mutable_region()->set_value(FLAGS_region);
+  req_msg->mutable_region()->set_value(FLAGS_region);
   // It's always this.
-  msg->mutable_region()->set_type(
+  req_msg->mutable_region()->set_type(
       RegionSpecifier_RegionSpecifierType::
           RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
 
   // What row.
-  msg->mutable_get()->set_row(FLAGS_row);
+  req_msg->mutable_get()->set_row(FLAGS_row);
 
   // Send it.
-  r.set_msg(msg);
-  auto resp = (*conn)(r).get(milliseconds(5000));
+  auto resp = (*conn)(std::move(r)).get(milliseconds(5000));
 
   auto get_resp = std::static_pointer_cast<GetResponse>(resp.response());
   cout << "GetResponse has_result = " << get_resp->has_result() << '\n';

Reply via email to