Repository: hbase Updated Branches: refs/heads/HBASE-14850 2aceb22f7 -> d308e0d35
HBASE-17585 [C++] Use KVCodec in the RPC request/response Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8b3954f5 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8b3954f5 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8b3954f5 Branch: refs/heads/HBASE-14850 Commit: 8b3954f5749066e820a7e60c073eefcb6bc65d4d Parents: 2aceb22 Author: Enis Soztutar <e...@apache.org> Authored: Mon Feb 13 18:50:36 2017 -0800 Committer: Enis Soztutar <e...@apache.org> Committed: Mon Feb 13 18:50:36 2017 -0800 ---------------------------------------------------------------------- .../connection/client-dispatcher.cc | 10 ++-- .../connection/client-dispatcher.h | 9 ++-- .../connection/client-handler.cc | 36 ++++++++++--- hbase-native-client/connection/client-handler.h | 7 +-- .../connection/connection-factory.cc | 5 +- .../connection/connection-factory.h | 3 +- .../connection/connection-pool-test.cc | 10 ++-- .../connection/connection-pool.cc | 11 ++-- .../connection/connection-pool.h | 3 +- hbase-native-client/connection/pipeline.cc | 5 +- hbase-native-client/connection/pipeline.h | 4 +- hbase-native-client/connection/response.h | 11 +++- hbase-native-client/connection/rpc-client.cc | 32 +++++------ hbase-native-client/connection/rpc-client.h | 21 ++++---- hbase-native-client/connection/rpc-connection.h | 2 +- hbase-native-client/connection/service.h | 2 +- hbase-native-client/core/client-test.cc | 6 +-- hbase-native-client/core/client.cc | 9 +++- hbase-native-client/core/client.h | 2 + hbase-native-client/core/location-cache-test.cc | 10 ++-- hbase-native-client/core/location-cache.cc | 20 +------ hbase-native-client/core/meta-utils.cc | 34 ++++++++++++ hbase-native-client/core/meta-utils.h | 7 +++ hbase-native-client/core/response_converter.cc | 56 ++++++++++++++++++-- hbase-native-client/core/response_converter.h | 9 +++- hbase-native-client/core/result.cc | 2 +- hbase-native-client/core/result.h | 2 +- hbase-native-client/core/simple-client.cc | 7 ++- hbase-native-client/core/table.cc | 4 +- .../serde/client-deserializer-test.cc | 8 +-- .../serde/client-serializer-test.cc | 8 +-- hbase-native-client/serde/rpc.cc | 14 ++++- hbase-native-client/serde/rpc.h | 13 ++++- 33 files changed, 270 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/client-dispatcher.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 1ace99c..626fc76 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -26,8 +26,8 @@ using namespace wangle; ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {} -void ClientDispatcher::read(Context *ctx, Response in) { - auto call_id = in.call_id(); +void ClientDispatcher::read(Context *ctx, std::unique_ptr<Response> in) { + auto call_id = in->call_id(); auto search = requests_.find(call_id); CHECK(search != requests_.end()); @@ -37,13 +37,13 @@ void ClientDispatcher::read(Context *ctx, Response in) { // TODO(eclark): check if the response // is an exception. If it is then set that. - p.setValue(in); + p.setValue(std::move(in)); } -Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) { +Future<std::unique_ptr<Response>> ClientDispatcher::operator()(std::unique_ptr<Request> arg) { auto call_id = current_call_id_++; arg->set_call_id(call_id); - requests_.insert(call_id, Promise<Response>{}); + requests_.insert(call_id, Promise<std::unique_ptr<Response>>{}); auto &p = requests_.find(call_id)->second; auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/client-dispatcher.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 0489717..857042c 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -36,21 +36,22 @@ namespace hbase { * future. */ class ClientDispatcher - : public wangle::ClientDispatcherBase<SerializePipeline, std::unique_ptr<Request>, Response> { + : public wangle::ClientDispatcherBase<SerializePipeline, std::unique_ptr<Request>, + std::unique_ptr<Response>> { public: /** Create a new ClientDispatcher */ ClientDispatcher(); /** Read a response off the pipeline. */ - void read(Context *ctx, Response in) override; + void read(Context *ctx, std::unique_ptr<Response> in) override; /** Take a request as a call and send it down the pipeline. */ - folly::Future<Response> operator()(std::unique_ptr<Request> arg) override; + folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> arg) override; /** Close the dispatcher and the associated pipeline. */ folly::Future<folly::Unit> close(Context *ctx) override; /** Close the dispatcher and the associated pipeline. */ folly::Future<folly::Unit> close() override; private: - folly::AtomicHashMap<uint32_t, folly::Promise<Response>> requests_; + folly::AtomicHashMap<uint32_t, folly::Promise<std::unique_ptr<Response>>> requests_; // Start at some number way above what could // be there for un-initialized call id counters. // http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 5a6dce2..af84572 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -36,9 +36,9 @@ using hbase::pb::ResponseHeader; using hbase::pb::GetResponse; using google::protobuf::Message; -ClientHandler::ClientHandler(std::string user_name) +ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec) : user_name_(user_name), - serde_(), + serde_(codec), once_flag_(std::make_unique<std::once_flag>()), resp_msgs_( make_unique<folly::AtomicHashMap<uint32_t, std::shared_ptr<google::protobuf::Message>>>( @@ -47,12 +47,12 @@ ClientHandler::ClientHandler(std::string user_name) void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { if (LIKELY(buf != nullptr)) { buf->coalesce(); - Response received; + auto received = std::make_unique<Response>(); ResponseHeader header; int used_bytes = serde_.ParseDelimited(buf.get(), &header); - LOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() - << " has_exception=" << header.has_exception(); + VLOG(1) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() + << " has_exception=" << header.has_exception(); // Get the response protobuf from the map auto search = resp_msgs_->find(header.call_id()); @@ -67,17 +67,34 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { // set the call_id. // This will be used to by the dispatcher to match up // the promise with the response. - received.set_call_id(header.call_id()); + received->set_call_id(header.call_id()); // If there was an exception then there's no // data left on the wire. if (header.has_exception() == false) { buf->trimStart(used_bytes); + + int cell_block_length = 0; used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get()); + if (header.has_cell_block_meta() && header.cell_block_meta().has_length()) { + cell_block_length = header.cell_block_meta().length(); + } + + VLOG(3) << "Read RPCResponse, buf length:" << buf->length() + << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length; + // Make sure that bytes were parsed. - CHECK(used_bytes == buf->length()); - received.set_resp_msg(resp_msg); + CHECK((used_bytes + cell_block_length) == buf->length()); + + if (cell_block_length > 0) { + auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length); + received->set_cell_scanner(std::move(cell_scanner)); + } + + received->set_resp_msg(resp_msg); } + // TODO: set exception in Response here + ctx->fireRead(std::move(received)); } } @@ -94,6 +111,9 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) { // Now store the call id to response. resp_msgs_->insert(r->call_id(), r->resp_msg()); + + VLOG(1) << "Writing RPC Request with call_id:" << r->call_id(); + // Send the data down the pipeline. return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/client-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index d860cc1..afb8e62 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -27,6 +27,7 @@ #include <string> #include <utility> +#include "serde/codec.h" #include "serde/rpc.h" // Forward decs. @@ -51,14 +52,14 @@ namespace hbase { * on first request. */ class ClientHandler - : public wangle::Handler<std::unique_ptr<folly::IOBuf>, Response, std::unique_ptr<Request>, - std::unique_ptr<folly::IOBuf>> { + : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Response>, + std::unique_ptr<Request>, std::unique_ptr<folly::IOBuf>> { public: /** * Create the handler * @param user_name the user name of the user running this process. */ - explicit ClientHandler(std::string user_name); + explicit ClientHandler(std::string user_name, std::shared_ptr<Codec> codec); /** * Get bytes from the wire. http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/connection-factory.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index ff83212..6aba351 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -26,8 +26,9 @@ using namespace folly; using namespace hbase; -ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool) - : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>()) {} +ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool, + std::shared_ptr<Codec> codec) + : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {} std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() { auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/connection-factory.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index da44c35..0d1e0d0 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -40,7 +40,8 @@ class ConnectionFactory { * Constructor. * There should only be one ConnectionFactory per client. */ - explicit ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool); + ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool, + std::shared_ptr<Codec> codec); /** Default Desctructor */ virtual ~ConnectionFactory() = default; http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/connection-pool-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index 0930095..623ce3c 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -36,7 +36,7 @@ using hbase::ConnectionId; class MockConnectionFactory : public ConnectionFactory { public: - MockConnectionFactory() : ConnectionFactory(nullptr) {} + MockConnectionFactory() : ConnectionFactory(nullptr, nullptr) {} MOCK_METHOD0(MakeBootstrap, std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>()); MOCK_METHOD3(Connect, std::shared_ptr<HBaseService>( std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>, @@ -47,17 +47,17 @@ class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {}; class MockServiceBase : public HBaseService { public: - folly::Future<Response> operator()(std::unique_ptr<Request> req) override { + folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> req) override { return do_operation(req.get()); } - virtual folly::Future<Response> do_operation(Request *req) { - return folly::makeFuture<Response>(Response{}); + virtual folly::Future<std::unique_ptr<Response>> do_operation(Request *req) { + return folly::makeFuture<std::unique_ptr<Response>>(std::make_unique<Response>()); } }; class MockService : public MockServiceBase { public: - MOCK_METHOD1(do_operation, folly::Future<Response>(Request *)); + MOCK_METHOD1(do_operation, folly::Future<std::unique_ptr<Response>>(Request *)); }; TEST(TestConnectionPool, TestOnlyCreateOnce) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/connection-pool.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index 15dd64e..6635a6d 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -33,8 +33,9 @@ using hbase::HBaseService; using folly::SharedMutexWritePriority; using folly::SocketAddress; -ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor) - : cf_(std::make_shared<ConnectionFactory>(io_executor)), +ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<Codec> codec) + : cf_(std::make_shared<ConnectionFactory>(io_executor, codec)), clients_(), connections_(), map_mutex_() {} @@ -88,12 +89,12 @@ std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection( auto clientBootstrap = cf_->MakeBootstrap(); auto dispatcher = cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); - auto conneciton = std::make_shared<RpcConnection>(remote_id, dispatcher); + auto connection = std::make_shared<RpcConnection>(remote_id, dispatcher); - connections_.insert(std::make_pair(remote_id, conneciton)); + connections_.insert(std::make_pair(remote_id, connection)); clients_.insert(std::make_pair(remote_id, clientBootstrap)); - return conneciton; + return connection; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/connection-pool.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 1f2a182..23e5e9a 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -46,7 +46,8 @@ namespace hbase { class ConnectionPool { public: /** Create connection pool wit default connection factory */ - explicit ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor); + ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<Codec> codec); /** * Desctructor. http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/pipeline.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc index 14ad73c..00dc05c 100644 --- a/hbase-native-client/connection/pipeline.cc +++ b/hbase-native-client/connection/pipeline.cc @@ -30,7 +30,8 @@ using namespace folly; using namespace hbase; using namespace wangle; -RpcPipelineFactory::RpcPipelineFactory() : user_util_() {} +RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec) + : user_util_(), codec_(codec) {} SerializePipeline::Ptr RpcPipelineFactory::newPipeline( std::shared_ptr<AsyncTransportWrapper> sock) { @@ -38,7 +39,7 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline( pipeline->addBack(AsyncSocketHandler{sock}); pipeline->addBack(EventBaseHandler{}); pipeline->addBack(LengthFieldBasedFrameDecoder{}); - pipeline->addBack(ClientHandler{user_util_.user_name()}); + pipeline->addBack(ClientHandler{user_util_.user_name(), codec_}); pipeline->finalize(); return pipeline; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/pipeline.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/pipeline.h b/hbase-native-client/connection/pipeline.h index 343219d..ea40cfd 100644 --- a/hbase-native-client/connection/pipeline.h +++ b/hbase-native-client/connection/pipeline.h @@ -25,6 +25,7 @@ #include "connection/request.h" #include "connection/response.h" +#include "serde/codec.h" #include "utils/user-util.h" namespace hbase { @@ -40,7 +41,7 @@ class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> { /** * Constructor. This will create user util. */ - RpcPipelineFactory(); + explicit RpcPipelineFactory(std::shared_ptr<Codec> codec); /** * Create a new pipeline. @@ -55,5 +56,6 @@ class RpcPipelineFactory : public wangle::PipelineFactory<SerializePipeline> { private: UserUtil user_util_; + std::shared_ptr<Codec> codec_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/response.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h index 560387c..1d60fed 100644 --- a/hbase-native-client/connection/response.h +++ b/hbase-native-client/connection/response.h @@ -22,6 +22,8 @@ #include <memory> #include <utility> +#include "serde/cell-scanner.h" + // Forward namespace google { namespace protobuf { @@ -42,7 +44,7 @@ class Response { * Constructor. * Initinalizes the call id to 0. 0 should never be a valid call id. */ - Response() : call_id_(0), resp_msg_(nullptr) {} + Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr) {} /** Get the call_id */ uint32_t call_id() { return call_id_; } @@ -62,8 +64,15 @@ class Response { resp_msg_ = std::move(response); } + void set_cell_scanner(std::unique_ptr<CellScanner> cell_scanner) { + cell_scanner_ = std::move(cell_scanner); + } + + const std::unique_ptr<CellScanner>& cell_scanner() const { return cell_scanner_; } + private: uint32_t call_id_; std::shared_ptr<google::protobuf::Message> resp_msg_; + std::unique_ptr<CellScanner> cell_scanner_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/rpc-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 7621193..cfbda3a 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -39,38 +39,40 @@ class RpcChannelImplementation : public AbstractRpcChannel { }; } // namespace hbase -RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor) +RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<Codec> codec) : io_executor_(io_executor) { - cp_ = std::make_shared<ConnectionPool>(io_executor_); + cp_ = std::make_shared<ConnectionPool>(io_executor_, codec); } void RpcClient::Close() { io_executor_->stop(); } -std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port, +std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port, std::unique_ptr<Request> req, std::shared_ptr<User> ticket) { - return std::make_shared<Response>(AsyncCall(host, port, std::move(req), ticket).get()); + return AsyncCall(host, port, std::move(req), ticket).get(); } -std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port, +std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port, std::unique_ptr<Request> req, std::shared_ptr<User> ticket, const std::string& service_name) { - return std::make_shared<Response>( - AsyncCall(host, port, std::move(req), ticket, service_name).get()); + return AsyncCall(host, port, std::move(req), ticket, service_name).get(); } -folly::Future<Response> RpcClient::AsyncCall(const std::string& host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<User> ticket) { +folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket) { auto remote_id = std::make_shared<ConnectionId>(host, port, ticket); return GetConnection(remote_id)->SendRequest(std::move(req)); } -folly::Future<Response> RpcClient::AsyncCall(const std::string& host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<User> ticket, - const std::string& service_name) { +folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket, + const std::string& service_name) { auto remote_id = std::make_shared<ConnectionId>(host, port, ticket, service_name); return GetConnection(remote_id)->SendRequest(std::move(req)); } @@ -99,5 +101,5 @@ void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* contro std::unique_ptr<Request> req = std::make_unique<Request>(shared_req, shared_resp, method->name()); AsyncCall(host, port, std::move(req), ticket, method->service()->name()) - .then([done, this](Response resp) { done->Run(); }); + .then([done, this](std::unique_ptr<Response> resp) { done->Run(); }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/rpc-client.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index aeb9b56..f4645a0 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -51,27 +51,28 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> { friend class RpcChannelImplementation; public: - RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor); + RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<Codec> codec); virtual ~RpcClient() { Close(); } - virtual std::shared_ptr<Response> SyncCall(const std::string &host, uint16_t port, + virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port, std::unique_ptr<Request> req, std::shared_ptr<User> ticket); - virtual std::shared_ptr<Response> SyncCall(const std::string &host, uint16_t port, + virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port, std::unique_ptr<Request> req, std::shared_ptr<User> ticket, const std::string &service_name); - virtual folly::Future<Response> AsyncCall(const std::string &host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<User> ticket); + virtual folly::Future<std::unique_ptr<Response>> AsyncCall(const std::string &host, uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket); - virtual folly::Future<Response> AsyncCall(const std::string &host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<User> ticket, - const std::string &service_name); + virtual folly::Future<std::unique_ptr<Response>> AsyncCall(const std::string &host, uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket, + const std::string &service_name); virtual void Close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/rpc-connection.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-connection.h b/hbase-native-client/connection/rpc-connection.h index e2500b2..c37b1e0 100644 --- a/hbase-native-client/connection/rpc-connection.h +++ b/hbase-native-client/connection/rpc-connection.h @@ -41,7 +41,7 @@ class RpcConnection { virtual std::shared_ptr<HBaseService> get_service() const { return hbase_service_; } - virtual folly::Future<Response> SendRequest(std::unique_ptr<Request> req) { + virtual folly::Future<std::unique_ptr<Response>> SendRequest(std::unique_ptr<Request> req) { return (*hbase_service_)(std::move(req)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/connection/service.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/service.h b/hbase-native-client/connection/service.h index 0d2e258..64d4f07 100644 --- a/hbase-native-client/connection/service.h +++ b/hbase-native-client/connection/service.h @@ -26,5 +26,5 @@ #include "connection/response.h" namespace hbase { -using HBaseService = wangle::Service<std::unique_ptr<Request>, Response>; +using HBaseService = wangle::Service<std::unique_ptr<Request>, std::unique_ptr<Response>>; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/client-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index 28eec6f..0a45fff 100644 --- a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -116,9 +116,9 @@ TEST(Client, Get) { // Using TestUtil to populate test data hbase::TestUtil *test_util = new hbase::TestUtil(); - test_util->RunShellCmd("create 't', 'd'"); - test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); - test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + test_util->RunShellCmd( + "create 't', 'd'; put 't', 'test2', 'd:2', 'value2'; put 't', 'test2', 'd:extra', 'value for " + "extra'"); // Create TableName and Row to be fetched from HBase auto tn = folly::to<hbase::pb::TableName>("t"); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index c1efd8b..685524f 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -46,7 +46,14 @@ void Client::init(const hbase::Configuration &conf) { std::make_shared<wangle::CPUThreadPoolExecutor>(4); // TODO: read num threads from conf io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN)); - rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_); + std::shared_ptr<Codec> codec = nullptr; + if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) == + std::string(KeyValueCodec::kJavaClassName)) { + codec = std::make_shared<hbase::KeyValueCodec>(); + } else { + LOG(WARNING) << "Not using RPC Cell Codec"; + } + rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec); location_cache_ = std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/client.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 730981d..0e436ba 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -30,6 +30,7 @@ #include "connection/rpc-client.h" #include "core/configuration.h" #include "core/hbase_configuration_loader.h" +#include "core/keyvalue-codec.h" #include "core/location-cache.h" #include "core/table.h" #include "if/Cell.pb.h" @@ -70,6 +71,7 @@ class Client { void init(const hbase::Configuration &conf); const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum"; const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181"; + const std::string kRpcCodec = "hbase.client.rpc.codec"; std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; std::shared_ptr<hbase::LocationCache> location_cache_; http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/location-cache-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc index 1ad6c65..9a778c6 100644 --- a/hbase-native-client/core/location-cache-test.cc +++ b/hbase-native-client/core/location-cache-test.cc @@ -23,6 +23,7 @@ #include <chrono> +#include "core/keyvalue-codec.h" #include "if/HBase.pb.h" #include "serde/table-name.h" #include "test-util/test-util.h" @@ -33,7 +34,8 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) { TestUtil test_util{}; auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4); auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4); - auto cp = std::make_shared<ConnectionPool>(io); + auto codec = std::make_shared<KeyValueCodec>(); + auto cp = std::make_shared<ConnectionPool>(io, codec); LocationCache cache{test_util.conf(), cpu, cp}; auto f = cache.LocateMeta(); auto result = f.get(); @@ -49,7 +51,8 @@ TEST(LocationCacheTest, TestGetRegionLocation) { TestUtil test_util{}; auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4); auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4); - auto cp = std::make_shared<ConnectionPool>(io); + auto codec = std::make_shared<KeyValueCodec>(); + auto cp = std::make_shared<ConnectionPool>(io, codec); LocationCache cache{test_util.conf(), cpu, cp}; // If there is no table this should throw an exception @@ -68,7 +71,8 @@ TEST(LocationCacheTest, TestCaching) { TestUtil test_util{}; auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4); auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4); - auto cp = std::make_shared<ConnectionPool>(io); + auto codec = std::make_shared<KeyValueCodec>(); + auto cp = std::make_shared<ConnectionPool>(io, codec); LocationCache cache{test_util.conf(), cpu, cp}; auto tn_1 = folly::to<hbase::pb::TableName>("t1"); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index dab5deb..da9f64a 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -128,10 +128,10 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const Tabl .then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) { return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row))); }) - .then([this](Response resp) { + .then([this](std::unique_ptr<Response> resp) { // take the protobuf response and make it into // a region location. - return this->CreateLocation(std::move(resp)); + return meta_util_.CreateLocation(std::move(*resp)); }) .then([tn, this](std::shared_ptr<RegionLocation> rl) { // Make sure that the correct location was found. @@ -166,22 +166,6 @@ Future<shared_ptr<RegionLocation>> LocationCache::LocateRegion(const hbase::pb:: } } -std::shared_ptr<RegionLocation> LocationCache::CreateLocation(const Response &resp) { - auto resp_msg = static_pointer_cast<ScanResponse>(resp.resp_msg()); - auto &results = resp_msg->results().Get(0); - auto &cells = results.cell(); - - // TODO(eclark): There should probably be some better error - // handling around this. - auto cell_zero = cells.Get(0).value(); - auto cell_one = cells.Get(1).value(); - auto row = cells.Get(0).row(); - - auto region_info = folly::to<RegionInfo>(cell_zero); - auto server_name = folly::to<ServerName>(cell_one); - return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr); -} - // must hold shared lock on locations_lock_ shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb::TableName &tn, const std::string &row) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/meta-utils.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc index f92300c..bd26607 100644 --- a/hbase-native-client/core/meta-utils.cc +++ b/hbase-native-client/core/meta-utils.cc @@ -24,18 +24,26 @@ #include "connection/request.h" #include "connection/response.h" +#include "core/response_converter.h" #include "if/Client.pb.h" +#include "serde/region-info.h" +#include "serde/server-name.h" #include "serde/table-name.h" using hbase::pb::TableName; using hbase::MetaUtil; using hbase::Request; using hbase::Response; +using hbase::RegionLocation; +using hbase::pb::RegionInfo; using hbase::pb::ScanRequest; using hbase::pb::ServerName; using hbase::pb::RegionSpecifier_RegionSpecifierType; static const std::string META_REGION = "1588230740"; +static const std::string CATALOG_FAMILY = "info"; +static const std::string REGION_INFO_COLUMN = "regioninfo"; +static const std::string SERVER_COLUMN = "server"; std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const { return folly::to<std::string>(tn, ",", row, ",", "999999999999999999"); @@ -79,3 +87,29 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::st scan->set_start_row(RegionLookupRowkey(tn, row)); return request; } + +std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp) { + std::vector<std::unique_ptr<Result>> results = ResponseConverter::FromScanResponse(resp); + if (results.size() != 1) { + throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" + + std::to_string(results.size())); + } + + auto result = *results[0]; + // VLOG(1) << "Creating RegionLocation from received Response " << *result; TODO + + std::shared_ptr<std::string> region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN); + std::shared_ptr<std::string> server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN); + + if (region_info_str == nullptr) { + throw std::runtime_error("regioninfo column null for location"); + } + if (server_str == nullptr) { + throw std::runtime_error("server column null for location"); + } + + auto row = result.Row(); + auto region_info = folly::to<RegionInfo>(*region_info_str); + auto server_name = folly::to<ServerName>(*server_str); + return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/meta-utils.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h index 075215e..d67f32d 100644 --- a/hbase-native-client/core/meta-utils.h +++ b/hbase-native-client/core/meta-utils.h @@ -22,6 +22,8 @@ #include <string> #include "connection/request.h" +#include "connection/response.h" +#include "core/region-location.h" #include "if/HBase.pb.h" #include "serde/table-name.h" @@ -43,5 +45,10 @@ class MetaUtil { * location. */ std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn, const std::string &row) const; + + /** + * Return a RegionLocation from the parsed Response + */ + std::shared_ptr<RegionLocation> CreateLocation(const Response &resp); }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/response_converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc index 3fe2ba9..19a3554 100644 --- a/hbase-native-client/core/response_converter.cc +++ b/hbase-native-client/core/response_converter.cc @@ -22,9 +22,9 @@ #include <vector> #include "core/cell.h" -#include "if/Client.pb.h" using hbase::pb::GetResponse; +using hbase::pb::ScanResponse; namespace hbase { @@ -32,19 +32,65 @@ ResponseConverter::ResponseConverter() {} ResponseConverter::~ResponseConverter() {} -std::unique_ptr<hbase::Result> ResponseConverter::FromGetResponse(const Response& resp) { +std::unique_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) { auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg()); + return ToResult(get_resp->result(), resp.cell_scanner()); +} + +std::unique_ptr<Result> ResponseConverter::ToResult( + const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) { std::vector<std::shared_ptr<Cell>> vcells; - for (auto cell : get_resp->result().cell()) { + for (auto cell : result.cell()) { std::shared_ptr<Cell> pcell = std::make_shared<Cell>(cell.row(), cell.family(), cell.qualifier(), cell.timestamp(), cell.value(), static_cast<hbase::CellType>(cell.cell_type())); vcells.push_back(pcell); } - return std::make_unique<hbase::Result>(vcells, get_resp->result().exists(), - get_resp->result().stale(), get_resp->result().partial()); + // iterate over the cells coming from rpc codec + if (cell_scanner != nullptr) { + while (cell_scanner->Advance()) { + vcells.push_back(cell_scanner->Current()); + } + // TODO: check associated cell count? + } + return std::make_unique<Result>(vcells, result.exists(), result.stale(), result.partial()); } +std::vector<std::unique_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) { + auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg()); + VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString(); + int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() + : scan_resp->results_size(); + + std::vector<std::unique_ptr<Result>> results{static_cast<size_t>(num_results)}; + for (int i = 0; i < num_results; i++) { + if (resp.cell_scanner() != nullptr) { + // Cells are out in cellblocks. Group them up again as Results. How many to read at a + // time will be found in getCellsLength -- length here is how many Cells in the i'th Result + int num_cells = scan_resp->cells_per_result(i); + + std::vector<std::shared_ptr<Cell>> vcells; + while (resp.cell_scanner()->Advance()) { + vcells.push_back(resp.cell_scanner()->Current()); + } + // TODO: check associated cell count? + + if (vcells.size() != num_cells) { + std::string msg = "Results sent from server=" + std::to_string(num_results) + + ". But only got " + std::to_string(i) + + " results completely at client. Resetting the scanner to scan again."; + LOG(ERROR) << msg; + throw std::runtime_error(msg); + } + // TODO: handle partial results per Result by checking partial_flag_per_result + results[i] = std::make_unique<Result>(vcells, false, scan_resp->stale(), false); + } else { + results[i] = ToResult(scan_resp->results(i), resp.cell_scanner()); + } + } + + return results; +} } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/response_converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response_converter.h b/hbase-native-client/core/response_converter.h index 86fb632..859644b 100644 --- a/hbase-native-client/core/response_converter.h +++ b/hbase-native-client/core/response_converter.h @@ -22,6 +22,8 @@ #include <memory> #include "connection/response.h" #include "core/result.h" +#include "if/Client.pb.h" +#include "serde/cell-scanner.h" namespace hbase { @@ -33,11 +35,16 @@ class ResponseConverter { public: ~ResponseConverter(); + static std::unique_ptr<Result> ToResult(const hbase::pb::Result& result, + const std::unique_ptr<CellScanner>& cell_scanner); + /** * @brief Returns a Result object created by PB Message in passed Response object. * @param resp - Response object having the PB message. */ - static std::unique_ptr<hbase::Result> FromGetResponse(const Response &resp); + static std::unique_ptr<hbase::Result> FromGetResponse(const Response& resp); + + static std::vector<std::unique_ptr<Result>> FromScanResponse(const Response& resp); private: // Constructor not required. We have all static methods to extract response from PB messages. http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/result.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result.cc b/hbase-native-client/core/result.cc index d73a5b2..4db3fca 100644 --- a/hbase-native-client/core/result.cc +++ b/hbase-native-client/core/result.cc @@ -87,7 +87,7 @@ bool Result::IsEmpty() const { return cells_.empty(); } const std::string &Result::Row() const { return row_; } -const int Result::Size() const { return cells_.size(); } +int Result::Size() const { return cells_.size(); } const ResultMap &Result::Map() const { return result_map_; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/result.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/result.h b/hbase-native-client/core/result.h index cd41cf0..8ff4311 100644 --- a/hbase-native-client/core/result.h +++ b/hbase-native-client/core/result.h @@ -100,7 +100,7 @@ class Result { /** * @brief Returns the size of the underlying Cell vector */ - const int Size() const; + int Size() const; /** * @brief Map of families to all versions of its qualifiers and values. http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index 3cd0a93..4b1144c 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -31,6 +31,7 @@ #include "connection/connection-pool.h" #include "core/client.h" +#include "core/keyvalue-codec.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" #include "serde/server-name.h" @@ -43,6 +44,7 @@ using hbase::Configuration; using hbase::Response; using hbase::Request; using hbase::HBaseService; +using hbase::KeyValueCodec; using hbase::LocationCache; using hbase::ConnectionPool; using hbase::ConnectionFactory; @@ -88,7 +90,8 @@ int main(int argc, char *argv[]) { // Set up thread pools. auto cpu_pool = std::make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_threads); auto io_pool = std::make_shared<wangle::IOThreadPoolExecutor>(5); - auto cp = std::make_shared<ConnectionPool>(io_pool); + auto codec = std::make_shared<KeyValueCodec>(); + auto cp = std::make_shared<ConnectionPool>(io_pool, codec); // Configuration auto conf = std::make_shared<Configuration>(); @@ -105,7 +108,7 @@ int main(int argc, char *argv[]) { auto num_puts = FLAGS_columns; - auto results = std::vector<Future<Response>>{}; + auto results = std::vector<Future<std::unique_ptr<Response>>>{}; auto col = uint64_t{0}; for (; col < num_puts; col++) { results.push_back( http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/core/table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index 58125f9..4e30d4b 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -56,12 +56,12 @@ std::unique_ptr<hbase::Result> Table::Get(const hbase::Get &get) { auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name()); auto user = User::defaultUser(); // TODO: make User::current() similar to UserUtil - Future<Response> f = + Future<std::unique_ptr<Response>> f = rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(req), user, "ClientService"); auto resp = f.get(); - return hbase::ResponseConverter::FromGetResponse(resp); + return hbase::ResponseConverter::FromGetResponse(*resp); } void Table::Close() { http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/serde/client-deserializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc index a30f904..054684d 100644 --- a/hbase-native-client/serde/client-deserializer-test.cc +++ b/hbase-native-client/serde/client-deserializer-test.cc @@ -30,12 +30,12 @@ using hbase::pb::RegionSpecifier; using hbase::pb::RegionSpecifier_RegionSpecifierType; TEST(TestRpcSerde, TestReturnFalseOnNullPtr) { - RpcSerde deser; + RpcSerde deser{nullptr}; ASSERT_LT(deser.ParseDelimited(nullptr, nullptr), 0); } TEST(TestRpcSerde, TestReturnFalseOnBadInput) { - RpcSerde deser; + RpcSerde deser{nullptr}; auto buf = IOBuf::copyBuffer("test"); GetRequest gr; @@ -44,8 +44,8 @@ TEST(TestRpcSerde, TestReturnFalseOnBadInput) { TEST(TestRpcSerde, TestGoodGetRequestFullRoundTrip) { GetRequest in; - RpcSerde ser; - RpcSerde deser; + RpcSerde ser{nullptr}; + RpcSerde deser{nullptr}; // fill up the GetRequest. in.mutable_region()->set_value("test_region_id"); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/serde/client-serializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc index 2bd17fb..33c48f3 100644 --- a/hbase-native-client/serde/client-serializer-test.cc +++ b/hbase-native-client/serde/client-serializer-test.cc @@ -32,7 +32,7 @@ using namespace folly; using namespace folly::io; TEST(RpcSerdeTest, PreambleIncludesHBas) { - RpcSerde ser; + RpcSerde ser{nullptr}; auto buf = ser.Preamble(); const char *p = reinterpret_cast<const char *>(buf->data()); // Take the first for chars and make sure they are the @@ -43,14 +43,14 @@ TEST(RpcSerdeTest, PreambleIncludesHBas) { } TEST(RpcSerdeTest, PreambleIncludesVersion) { - RpcSerde ser; + RpcSerde ser{nullptr}; auto buf = ser.Preamble(); EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]); EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]); } TEST(RpcSerdeTest, TestHeaderLengthPrefixed) { - RpcSerde ser; + RpcSerde ser{nullptr}; auto header = ser.Header("elliott"); // The header should be prefixed by 4 bytes of length. @@ -65,7 +65,7 @@ TEST(RpcSerdeTest, TestHeaderLengthPrefixed) { } TEST(RpcSerdeTest, TestHeaderDecode) { - RpcSerde ser; + RpcSerde ser{nullptr}; auto buf = ser.Header("elliott"); auto header_buf = buf->next(); ConnectionHeader h; http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/serde/rpc.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc index 3bdb489..d5bca62 100644 --- a/hbase-native-client/serde/rpc.cc +++ b/hbase-native-client/serde/rpc.cc @@ -83,7 +83,7 @@ int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { return coded_stream.CurrentPosition(); } -RpcSerde::RpcSerde() : auth_type_(DEFAULT_AUTH_TYPE) {} +RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : auth_type_(DEFAULT_AUTH_TYPE), codec_(codec) {} unique_ptr<IOBuf> RpcSerde::Preamble() { auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2); @@ -110,6 +110,10 @@ unique_ptr<IOBuf> RpcSerde::Header(const string &user) { // didn't. // TODO: send the service name and user from the RpcClient h.set_service_name(INTERFACE); + + if (codec_ != nullptr) { + h.set_cell_block_codec_class(codec_->java_class_name()); + } return PrependLength(SerializeMessage(h)); } @@ -128,6 +132,14 @@ unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const string &method return PrependLength(std::move(ser_header)); } +std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, + uint32_t offset, uint32_t length) { + if (codec_ == nullptr) { + return nullptr; + } + return codec_->CreateDecoder(std::move(buf), offset, length); +} + unique_ptr<IOBuf> RpcSerde::PrependLength(unique_ptr<IOBuf> msg) { // Java ints are 4 long. So create a buffer that large auto len_buf = IOBuf::create(4); http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3954f5/hbase-native-client/serde/rpc.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc.h index 7d060c7..c59f903 100644 --- a/hbase-native-client/serde/rpc.h +++ b/hbase-native-client/serde/rpc.h @@ -21,6 +21,9 @@ #include <memory> #include <string> +#include "serde/cell-scanner.h" +#include "serde/codec.h" + // Forward namespace folly { class IOBuf; @@ -44,7 +47,7 @@ class RpcSerde { /** * Constructor assumes the default auth type. */ - RpcSerde(); + RpcSerde(std::shared_ptr<Codec> codec); /** * Destructor. This is provided just for testing purposes. @@ -76,6 +79,13 @@ class RpcSerde { std::unique_ptr<folly::IOBuf> Header(const std::string &user); /** + * Take ownership of the passed buffer, and create a CellScanner using the + * Codec class to parse Cells out of the wire. + */ + std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset, + uint32_t length); + + /** * Serialize a request message into a protobuf. * Request consists of: * @@ -109,5 +119,6 @@ class RpcSerde { private: /* data */ uint8_t auth_type_; + std::shared_ptr<Codec> codec_; }; } // namespace hbase