Renamed `GRPC_RPC` and adapted the new gRPC async API. To better describe the `GRPC_RPC` macro and to avoid future name conflicts, we renamed it to `GRPC_CLIENT_METHOD`. Additionally, we adapted the new gRPC asynchronous client API. See: https://github.com/grpc/grpc/pull/12269
We also introduced the `MethodTraits` internal helper to simplify the declaration of `Runtime::call`. Review: https://reviews.apache.org/r/67155 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bac270b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bac270b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bac270b Branch: refs/heads/master Commit: 3bac270b0ed99315edf72d24653c1cb068247c20 Parents: 9a94eb6 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> Authored: Tue May 15 16:43:33 2018 -0700 Committer: Chun-Hung Hsiao <chhs...@mesosphere.io> Committed: Wed May 23 16:31:12 2018 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/grpc.hpp | 44 ++++++++++++++++++----- 3rdparty/libprocess/src/tests/grpc_tests.cpp | 20 +++++------ 2 files changed, 45 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/include/process/grpc.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp index f7f3171..68bdeb1 100644 --- a/3rdparty/libprocess/include/process/grpc.hpp +++ b/3rdparty/libprocess/include/process/grpc.hpp @@ -41,8 +41,8 @@ // interface to create an asynchrous gRPC call and return a `Future`. -#define GRPC_RPC(service, rpc) \ - (&service::Stub::Async##rpc) +#define GRPC_CLIENT_METHOD(service, rpc) \ + (&service::Stub::PrepareAsync##rpc) namespace process { namespace grpc { @@ -94,6 +94,28 @@ public: namespace client { +// Internal helper utilities. +namespace internal { + +template <typename T> +struct MethodTraits; // Undefined. + + +template <typename Stub, typename Request, typename Response> +struct MethodTraits< + std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)( + ::grpc::ClientContext*, + const Request&, + ::grpc::CompletionQueue*)> +{ + typedef Stub stub_type; + typedef Request request_type; + typedef Response response_type; +}; + +} // namespace internal { + + /** * A copyable interface to manage an internal gRPC runtime instance for * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC @@ -123,17 +145,19 @@ public: * * @param channel A connection to a gRPC server. * @param rpc The asynchronous gRPC call to make. This can be obtained - * by the `GRPC_RPC(Service, RPC)` macro. + * by the `GRPC_CLIENT_METHOD(service, rpc)` macro. * @param request The request protobuf for the gRPC call. * @return a `Future` of `Try` waiting for a response protobuf or an error. */ - template <typename Stub, typename Request, typename Response> + template < + typename Method, + typename Request = + typename internal::MethodTraits<Method>::request_type, + typename Response = + typename internal::MethodTraits<Method>::response_type> Future<Try<Response, StatusError>> call( const Channel& channel, - std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)( - ::grpc::ClientContext*, - const Request&, - ::grpc::CompletionQueue*), + Method&& method, const Request& request) { static_assert( @@ -170,8 +194,10 @@ public: std::shared_ptr<::grpc::Status> status(new ::grpc::Status()); std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader( - (Stub(channel.channel).*rpc)(context.get(), request, &data->queue)); + (typename internal::MethodTraits<Method>::stub_type( + channel.channel).*method)(context.get(), request, &data->queue)); + reader->StartCall(); reader->Finish( response.get(), status.get(), http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/src/tests/grpc_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp index 07c2f3e..f1cdb5e 100644 --- a/3rdparty/libprocess/src/tests/grpc_tests.cpp +++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp @@ -124,7 +124,7 @@ TEST_F(GRPCClientTest, Success) client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_ASSERT_READY(pong); EXPECT_SOME(pong.get()); @@ -171,13 +171,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs) client::Runtime runtime; Future<Try<Pong, StatusError>> pong1 = - runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); Future<Try<Pong, StatusError>> pong2 = - runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); Future<Try<Pong, StatusError>> pong3 = - runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_READY(processed1->future()); AWAIT_READY(processed2->future()); @@ -216,7 +216,7 @@ TEST_F(GRPCClientTest, StatusError) client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_ASSERT_READY(pong); EXPECT_ERROR(pong.get()); @@ -242,7 +242,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted) client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel, GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping()); pong.discard(); @@ -280,7 +280,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing) client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_READY(processed->future()); @@ -319,7 +319,7 @@ TEST_F(GRPCClientTest, ClientShutdown) client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_READY(processed->future()); @@ -347,7 +347,7 @@ TEST_F(GRPCClientTest, ServerUnreachable) client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel, GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping()); runtime.terminate(); AWAIT_ASSERT_READY(runtime.wait()); @@ -381,7 +381,7 @@ TEST_F(GRPCClientTest, ServerTimeout) client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); + runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); // TODO(chhsiao): The gRPC library returns a failure after the default timeout // (5 seconds) is passed. The timeout should be lowered once we support it.