Added gRPC support in libprocess. A gRPC client can create a `process::grpc::client::Runtime` instance and use its `call()` method to send an asynchronous gRPC call to `Channel` (representing a connection to a gRPC server), and get a future waiting for the response. The `Runtime` class maintains a `CompletionQueue` to manage all pending asynchronous gRPC calls, and spawns a thread waiting for any response from the `CompletionQueue`. All gRPC calls using the same `Runtime` copy would share the same `CompletionQueue` and thread.
Review: https://reviews.apache.org/r/61097 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ce40531 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ce40531 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ce40531 Branch: refs/heads/master Commit: 7ce405318fad9284c67cf507480f8007f8219549 Parents: 70a4109 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> Authored: Mon Jul 24 15:55:09 2017 -0700 Committer: Jie Yu <yujie....@gmail.com> Committed: Thu Aug 10 16:55:05 2017 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/grpc.hpp | 204 ++++++++++++++++++++++ 3rdparty/libprocess/src/grpc.cpp | 96 ++++++++++ 2 files changed, 300 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7ce40531/3rdparty/libprocess/include/process/grpc.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp new file mode 100644 index 0000000..91c1cd9 --- /dev/null +++ b/3rdparty/libprocess/include/process/grpc.hpp @@ -0,0 +1,204 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +#ifndef __PROCESS_GRPC_HPP__ +#define __PROCESS_GRPC_HPP__ + +#include <atomic> +#include <chrono> +#include <memory> +#include <thread> +#include <type_traits> + +#include <google/protobuf/message.h> + +#include <grpc++/grpc++.h> + +#include <process/future.hpp> +#include <process/owned.hpp> +#include <process/process.hpp> + +#include <stout/duration.hpp> +#include <stout/lambda.hpp> +#include <stout/synchronized.hpp> +#include <stout/try.hpp> + + +// This file provides libprocess "support" for using gRPC. In +// particular, it defines two wrapper classes: `Channel` (representing a +// connection to a gRPC server) and `client::Runtime`, which integrates +// an event loop waiting for gRPC responses, and provides the `call` +// interface to create an asynchrous gRPC call and return a `Future`. + + +#define GRPC_RPC(service, rpc) \ + (&service::Stub::Async##rpc) + +namespace process { +namespace grpc { + +// Forward declarations. +namespace client { class Runtime; } + + +/** + * A copyable interface to manage a connection to a gRPC server. + * All `Channel` copies share the same connection. Note that the + * connection is established lazily by the gRPC runtime library: the + * actual connection is delayed till an RPC call is made. + */ +class Channel +{ +public: + Channel(const std::string& uri, + const std::shared_ptr<::grpc::ChannelCredentials>& credentials = + ::grpc::InsecureChannelCredentials()) + : channel(::grpc::CreateChannel(uri, credentials)) {} + +private: + std::shared_ptr<::grpc::Channel> channel; + + friend class client::Runtime; +}; + + +namespace client { + +/** + * A copyable interface to manage an internal gRPC runtime instance for + * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC + * `CompletionQueue` to manage outstanding requests, a looper thread to + * wait for any incoming responses from the `CompletionQueue`, and a + * process to handle the responses. All `Runtime` copies share the same + * gRPC runtime instance. Usually we only need a single gRPC runtime + * instance to handle all gRPC calls, but multiple instances can be + * instantiated for more parallelism or isolation. + * NOTE: The destruction of the internal gRPC runtime instance is a + * blocking operation: it waits for the managed process to terminate. + * The user should ensure that this only happens at shutdown. + */ +class Runtime +{ +public: + Runtime() : data(new Data()) {} + + /** + * Sends an asynchronous gRPC call. + * + * @param channel A connection to a gRPC server. + * @param rpc The asynchronous gRPC call to make. This can be obtained + * by the `GRPC_RPC(Service, RPC)` macro. + * @param request The request protobuf for the gRPC call. + * @return a `Future` waiting for a response protobuf. + */ + template <typename Stub, typename Request, typename Response> + Future<Response> call( + const Channel& channel, + std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)( + ::grpc::ClientContext*, + const Request&, + ::grpc::CompletionQueue*), + const Request& request) + { + static_assert( + std::is_convertible<Request*, google::protobuf::Message*>::value, + "Request must be a protobuf message"); + + synchronized (data->lock) { + if (data->terminating) { + return Failure("Runtime has been terminated."); + } + + std::shared_ptr<::grpc::ClientContext> context( + new ::grpc::ClientContext()); + + // TODO(chhsiao): Allow the caller to specify a timeout. + context->set_deadline( + std::chrono::system_clock::now() + std::chrono::seconds(5)); + + // Create a `Promise` and a callback lambda as a tag and invokes + // an asynchronous gRPC call through the `CompletionQueue` + // managed by `data`. The `Promise` will be set by the callback + // upon server response. + std::shared_ptr<Promise<Response>> promise(new Promise<Response>); + promise->future().onDiscard([=] { context->TryCancel(); }); + + std::shared_ptr<Response> response(new Response()); + std::shared_ptr<::grpc::Status> status(new ::grpc::Status()); + + std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader( + (Stub(channel.channel).*rpc)(context.get(), request, &data->queue)); + + reader->Finish( + response.get(), + status.get(), + new lambda::function<void()>( + // NOTE: `context` and `reader` need to be held on in + // order to get updates for the ongoing RPC, and thus + // are captured here. The lambda itself will later be + // retrieved and managed in `Data::loop()`. + [context, reader, response, status, promise]() { + CHECK(promise->future().isPending()); + if (promise->future().hasDiscard()) { + promise->discard(); + } else if (status->ok()) { + promise->set(*response); + } else { + promise->fail(status->error_message()); + } + })); + + return promise->future(); + } + } + + /** + * Asks the internal gRPC runtime instance to shut down the + * `CompletionQueue`, which would stop its looper thread, drain and + * fail all pending gRPC calls in the `CompletionQueue`, then + * asynchronously join the looper thread. + */ + void terminate(); + + /** + * @return A `Future` waiting for all pending gRPC calls in the + * `CompletionQueue` of the internal gRPC runtime instance to be + * drained and the looper thread to be joined. + */ + Future<Nothing> wait(); + +private: + struct Data + { + Data(); + ~Data(); + + void loop(); + void terminate(); + + std::unique_ptr<std::thread> looper; + ::grpc::CompletionQueue queue; + ProcessBase process; + std::atomic_flag lock; + bool terminating; + Promise<Nothing> terminated; + }; + + std::shared_ptr<Data> data; +}; + +} // namespace client { + +} // namespace grpc { +} // namespace process { + +#endif // __PROCESS_GRPC_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/7ce40531/3rdparty/libprocess/src/grpc.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/grpc.cpp b/3rdparty/libprocess/src/grpc.cpp new file mode 100644 index 0000000..3ba5bc5 --- /dev/null +++ b/3rdparty/libprocess/src/grpc.cpp @@ -0,0 +1,96 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +#include <process/dispatch.hpp> +#include <process/grpc.hpp> +#include <process/id.hpp> + +namespace process { +namespace grpc { + +namespace client { + +void Runtime::terminate() +{ + data->terminate(); +} + + +Future<Nothing> Runtime::wait() +{ + return data->terminated.future(); +} + + +Runtime::Data::Data() + : process(ID::generate("__grpc_client__")), + lock(ATOMIC_FLAG_INIT), + terminating(false) +{ + spawn(process); + + // The looper thread can only be created here since it need to happen + // after `queue` is initialized. + looper.reset(new std::thread(&Runtime::Data::loop, this)); +} + + +Runtime::Data::~Data() +{ + terminate(); + process::wait(process); +} + + +void Runtime::Data::loop() +{ + void* tag; + bool ok; + + while (queue.Next(&tag, &ok)) { + // The returned callback object is managed by the `callback` shared + // pointer, so if we get a regular event from the `CompletionQueue`, + // then the object would be captured by the following lambda + // dispatched to `process`; otherwise it would be reclaimed here. + std::shared_ptr<lambda::function<void()>> callback( + reinterpret_cast<lambda::function<void()>*>(tag)); + if (ok) { + dispatch(process, [=] { (*callback)(); }); + } + } + + dispatch(process, [this] { + // NOTE: This is a blocking call. However, the thread is guaranteed + // to be exiting, therefore the amount of blocking time should be + // short (just like other syscalls we invoke). + looper->join(); + // Terminate `process` after all events are drained. + process::terminate(process, false); + terminated.set(Nothing()); + }); +} + + +void Runtime::Data::terminate() +{ + synchronized (lock) { + if (!terminating) { + terminating = true; + queue.Shutdown(); + } + } +} + +} // namespace client { + +} // namespace grpc { +} // namespace process {