Added per-CSI-call RPC metrics for SLRP. For each CSI call, e.g., `csi.v0.Identity.Probe`, we the following metrics for SLRP: `csi_plugin/rpcs/csi.v0.Identity.Probe/pending` `csi_plugin/rpcs/csi.v0.Identity.Probe/successes` `csi_plugin/rpcs/csi.v0.Identity.Probe/errors` `csi_plugin/rpcs/csi.v0.Identity.Probe/cancelled`
To add these per-CSI-call metrics, each method in `csi::v0::Client`, e.g., `csi::v0::Client::Probe`, is changed to `csi::v0::Client::call<PROBE>`, to make RPC calls based on the RPC enum value. A `call` helper function in SLRP is also added to intercept CSI calls and update the corresponding metrics. Review: https://reviews.apache.org/r/67255 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/15fc86d2 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/15fc86d2 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/15fc86d2 Branch: refs/heads/master Commit: 15fc86d22f1fbe922ef878bb2e6f6462d2248b14 Parents: cae7d7a Author: Chun-Hung Hsiao <chhs...@mesosphere.io> Authored: Tue May 22 17:34:06 2018 -0700 Committer: Chun-Hung Hsiao <chhs...@mesosphere.io> Committed: Thu May 31 18:29:56 2018 -0700 ---------------------------------------------------------------------- src/csi/client.cpp | 68 +++++++--- src/csi/client.hpp | 139 ++++++++++++++------- src/resource_provider/storage/provider.cpp | 158 ++++++++++++++++++++---- src/tests/csi_client_tests.cpp | 80 ++++++------ 4 files changed, 326 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/csi/client.cpp ---------------------------------------------------------------------- diff --git a/src/csi/client.cpp b/src/csi/client.cpp index a4ba1f1..923ee6f 100644 --- a/src/csi/client.cpp +++ b/src/csi/client.cpp @@ -25,7 +25,9 @@ namespace mesos { namespace csi { namespace v0 { -Future<GetPluginInfoResponse> Client::GetPluginInfo( +template <> +Future<GetPluginInfoResponse> +Client::call<GET_PLUGIN_INFO>( const GetPluginInfoRequest& request) { return runtime @@ -37,7 +39,9 @@ Future<GetPluginInfoResponse> Client::GetPluginInfo( } -Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities( +template <> +Future<GetPluginCapabilitiesResponse> +Client::call<GET_PLUGIN_CAPABILITIES>( const GetPluginCapabilitiesRequest& request) { return runtime @@ -52,7 +56,9 @@ Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities( } -Future<ProbeResponse> Client::Probe( +template <> +Future<ProbeResponse> +Client::call<PROBE>( const ProbeRequest& request) { return runtime @@ -64,7 +70,9 @@ Future<ProbeResponse> Client::Probe( } -Future<CreateVolumeResponse> Client::CreateVolume( +template <> +Future<CreateVolumeResponse> +Client::call<CREATE_VOLUME>( const CreateVolumeRequest& request) { return runtime @@ -76,7 +84,9 @@ Future<CreateVolumeResponse> Client::CreateVolume( } -Future<DeleteVolumeResponse> Client::DeleteVolume( +template <> +Future<DeleteVolumeResponse> +Client::call<DELETE_VOLUME>( const DeleteVolumeRequest& request) { return runtime @@ -88,7 +98,9 @@ Future<DeleteVolumeResponse> Client::DeleteVolume( } -Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume( +template <> +Future<ControllerPublishVolumeResponse> +Client::call<CONTROLLER_PUBLISH_VOLUME>( const ControllerPublishVolumeRequest& request) { return runtime @@ -103,7 +115,9 @@ Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume( } -Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume( +template <> +Future<ControllerUnpublishVolumeResponse> +Client::call<CONTROLLER_UNPUBLISH_VOLUME>( const ControllerUnpublishVolumeRequest& request) { return runtime @@ -118,7 +132,9 @@ Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume( } -Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities( +template <> +Future<ValidateVolumeCapabilitiesResponse> +Client::call<VALIDATE_VOLUME_CAPABILITIES>( const ValidateVolumeCapabilitiesRequest& request) { return runtime @@ -133,7 +149,9 @@ Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities( } -Future<ListVolumesResponse> Client::ListVolumes( +template <> +Future<ListVolumesResponse> +Client::call<LIST_VOLUMES>( const ListVolumesRequest& request) { return runtime @@ -145,7 +163,9 @@ Future<ListVolumesResponse> Client::ListVolumes( } -Future<GetCapacityResponse> Client::GetCapacity( +template <> +Future<GetCapacityResponse> +Client::call<GET_CAPACITY>( const GetCapacityRequest& request) { return runtime @@ -157,7 +177,9 @@ Future<GetCapacityResponse> Client::GetCapacity( } -Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities( +template <> +Future<ControllerGetCapabilitiesResponse> +Client::call<CONTROLLER_GET_CAPABILITIES>( const ControllerGetCapabilitiesRequest& request) { return runtime @@ -172,7 +194,9 @@ Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities( } -Future<NodeStageVolumeResponse> Client::NodeStageVolume( +template <> +Future<NodeStageVolumeResponse> +Client::call<NODE_STAGE_VOLUME>( const NodeStageVolumeRequest& request) { return runtime @@ -184,7 +208,9 @@ Future<NodeStageVolumeResponse> Client::NodeStageVolume( } -Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume( +template <> +Future<NodeUnstageVolumeResponse> +Client::call<NODE_UNSTAGE_VOLUME>( const NodeUnstageVolumeRequest& request) { return runtime @@ -196,7 +222,9 @@ Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume( } -Future<NodePublishVolumeResponse> Client::NodePublishVolume( +template <> +Future<NodePublishVolumeResponse> +Client::call<NODE_PUBLISH_VOLUME>( const NodePublishVolumeRequest& request) { return runtime @@ -208,7 +236,9 @@ Future<NodePublishVolumeResponse> Client::NodePublishVolume( } -Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume( +template <> +Future<NodeUnpublishVolumeResponse> +Client::call<NODE_UNPUBLISH_VOLUME>( const NodeUnpublishVolumeRequest& request) { return runtime @@ -220,7 +250,9 @@ Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume( } -Future<NodeGetIdResponse> Client::NodeGetId( +template <> +Future<NodeGetIdResponse> +Client::call<NODE_GET_ID>( const NodeGetIdRequest& request) { return runtime @@ -232,7 +264,9 @@ Future<NodeGetIdResponse> Client::NodeGetId( } -Future<NodeGetCapabilitiesResponse> Client::NodeGetCapabilities( +template <> +Future<NodeGetCapabilitiesResponse> +Client::call<NODE_GET_CAPABILITIES>( const NodeGetCapabilitiesRequest& request) { return runtime http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/csi/client.hpp ---------------------------------------------------------------------- diff --git a/src/csi/client.hpp b/src/csi/client.hpp index 9d7019a..1c57ac5 100644 --- a/src/csi/client.hpp +++ b/src/csi/client.hpp @@ -17,12 +17,12 @@ #ifndef __CSI_CLIENT_HPP__ #define __CSI_CLIENT_HPP__ -#include <string> - #include <csi/spec.hpp> #include <process/grpc.hpp> +#include "csi/rpc.hpp" + namespace mesos { namespace csi { namespace v0 { @@ -34,65 +34,116 @@ public: const process::grpc::client::Runtime& _runtime) : connection(_connection), runtime(_runtime) {} - // RPCs for the Identity service. - process::Future<GetPluginInfoResponse> - GetPluginInfo(const GetPluginInfoRequest& request); + template <RPC rpc> + process::Future<typename RPCTraits<rpc>::response_type> call( + const typename RPCTraits<rpc>::request_type& request); - process::Future<GetPluginCapabilitiesResponse> - GetPluginCapabilities(const GetPluginCapabilitiesRequest& request); +private: + process::grpc::client::Connection connection; + process::grpc::client::Runtime runtime; +}; - process::Future<ProbeResponse> - Probe(const ProbeRequest& request); - // RPCs for the Controller service. - process::Future<CreateVolumeResponse> - CreateVolume(const CreateVolumeRequest& request); +template <> +process::Future<GetPluginInfoResponse> +Client::call<GET_PLUGIN_INFO>( + const GetPluginInfoRequest& request); - process::Future<DeleteVolumeResponse> - DeleteVolume(const DeleteVolumeRequest& request); - process::Future<ControllerPublishVolumeResponse> - ControllerPublishVolume(const ControllerPublishVolumeRequest& request); +template <> +process::Future<GetPluginCapabilitiesResponse> +Client::call<GET_PLUGIN_CAPABILITIES>( + const GetPluginCapabilitiesRequest& request); - process::Future<ControllerUnpublishVolumeResponse> - ControllerUnpublishVolume(const ControllerUnpublishVolumeRequest& request); - process::Future<ValidateVolumeCapabilitiesResponse> - ValidateVolumeCapabilities( - const ValidateVolumeCapabilitiesRequest& request); +template <> +process::Future<ProbeResponse> +Client::call<PROBE>( + const ProbeRequest& request); - process::Future<ListVolumesResponse> - ListVolumes(const ListVolumesRequest& request); - process::Future<GetCapacityResponse> - GetCapacity(const GetCapacityRequest& request); +template <> +process::Future<CreateVolumeResponse> +Client::call<CREATE_VOLUME>( + const CreateVolumeRequest& request); - process::Future<ControllerGetCapabilitiesResponse> - ControllerGetCapabilities(const ControllerGetCapabilitiesRequest& request); - // RPCs for the Node service. - process::Future<NodeStageVolumeResponse> - NodeStageVolume(const NodeStageVolumeRequest& request); +template <> +process::Future<DeleteVolumeResponse> +Client::call<DELETE_VOLUME>( + const DeleteVolumeRequest& request); - process::Future<NodeUnstageVolumeResponse> - NodeUnstageVolume(const NodeUnstageVolumeRequest& request); - process::Future<NodePublishVolumeResponse> - NodePublishVolume(const NodePublishVolumeRequest& request); +template <> +process::Future<ControllerPublishVolumeResponse> +Client::call<CONTROLLER_PUBLISH_VOLUME>( + const ControllerPublishVolumeRequest& request); - process::Future<NodeUnpublishVolumeResponse> - NodeUnpublishVolume(const NodeUnpublishVolumeRequest& request); - process::Future<NodeGetIdResponse> - NodeGetId(const NodeGetIdRequest& request); +template <> +process::Future<ControllerUnpublishVolumeResponse> +Client::call<CONTROLLER_UNPUBLISH_VOLUME>( + const ControllerUnpublishVolumeRequest& request); - process::Future<NodeGetCapabilitiesResponse> - NodeGetCapabilities(const NodeGetCapabilitiesRequest& request); -private: - process::grpc::client::Connection connection; - process::grpc::client::Runtime runtime; -}; +template <> +process::Future<ValidateVolumeCapabilitiesResponse> +Client::call<VALIDATE_VOLUME_CAPABILITIES>( + const ValidateVolumeCapabilitiesRequest& request); + + +template <> +process::Future<ListVolumesResponse> +Client::call<LIST_VOLUMES>( + const ListVolumesRequest& request); + + +template <> +process::Future<GetCapacityResponse> +Client::call<GET_CAPACITY>( + const GetCapacityRequest& request); + + +template <> +process::Future<ControllerGetCapabilitiesResponse> +Client::call<CONTROLLER_GET_CAPABILITIES>( + const ControllerGetCapabilitiesRequest& request); + + +template <> +process::Future<NodeStageVolumeResponse> +Client::call<NODE_STAGE_VOLUME>( + const NodeStageVolumeRequest& request); + + +template <> +process::Future<NodeUnstageVolumeResponse> +Client::call<NODE_UNSTAGE_VOLUME>( + const NodeUnstageVolumeRequest& request); + + +template <> +process::Future<NodePublishVolumeResponse> +Client::call<NODE_PUBLISH_VOLUME>( + const NodePublishVolumeRequest& request); + + +template <> +process::Future<NodeUnpublishVolumeResponse> +Client::call<NODE_UNPUBLISH_VOLUME>( + const NodeUnpublishVolumeRequest& request); + + +template <> +process::Future<NodeGetIdResponse> +Client::call<NODE_GET_ID>( + const NodeGetIdRequest& request); + + +template <> +process::Future<NodeGetCapabilitiesResponse> +Client::call<NODE_GET_CAPABILITIES>( + const NodeGetCapabilitiesRequest& request); } // namespace v0 { } // namespace csi { http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 8a4b037..333336e 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -59,6 +59,7 @@ #include "csi/client.hpp" #include "csi/paths.hpp" +#include "csi/rpc.hpp" #include "csi/state.hpp" #include "csi/utils.hpp" @@ -375,6 +376,11 @@ private: void reconcileOperations( const Event::ReconcileOperations& reconcile); + template <csi::v0::RPC rpc> + Future<typename csi::v0::RPCTraits<rpc>::response_type> call( + csi::v0::Client client, + const typename csi::v0::RPCTraits<rpc>::request_type& request); + Future<csi::v0::Client> connect(const string& endpoint); Future<csi::v0::Client> getService(const ContainerID& containerId); Future<Nothing> killService(const ContainerID& containerId); @@ -498,6 +504,10 @@ private: // CSI plugin metrics. Counter csi_plugin_container_terminations; + hashmap<csi::v0::RPC, PushGauge> csi_plugin_rpcs_pending; + hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_successes; + hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_errors; + hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_cancelled; // Operation state metrics. hashmap<Offer::Operation::Type, PushGauge> operations_pending; @@ -1752,6 +1762,29 @@ void StorageLocalResourceProviderProcess::reconcileOperations( } +template <csi::v0::RPC rpc> +Future<typename csi::v0::RPCTraits<rpc>::response_type> +StorageLocalResourceProviderProcess::call( + csi::v0::Client client, + const typename csi::v0::RPCTraits<rpc>::request_type& request) +{ + ++metrics.csi_plugin_rpcs_pending.at(rpc); + + return client.call<rpc>(request) + .onAny(defer(self(), [=]( + const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) { + --metrics.csi_plugin_rpcs_pending.at(rpc); + if (future.isReady()) { + ++metrics.csi_plugin_rpcs_successes.at(rpc); + } else if (future.isFailed()) { + ++metrics.csi_plugin_rpcs_errors.at(rpc); + } else { + ++metrics.csi_plugin_rpcs_cancelled.at(rpc); + } + })); +} + + // Returns a future of a CSI client that waits for the endpoint socket // to appear if necessary, then connects to the socket and check its // readiness. @@ -1786,7 +1819,7 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect( return future .then(defer(self(), [=](csi::v0::Client client) { - return client.Probe(csi::v0::ProbeRequest()) + return call<csi::v0::PROBE>(client, csi::v0::ProbeRequest()) .then(defer(self(), [=](const csi::v0::ProbeResponse& response) { return client; })); @@ -2011,7 +2044,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService() return getService(nodeContainerId.get()) .then(defer(self(), [=](csi::v0::Client client) { // Get the plugin info. - return client.GetPluginInfo(csi::v0::GetPluginInfoRequest()) + return call<csi::v0::GET_PLUGIN_INFO>( + client, csi::v0::GetPluginInfoRequest()) .then(defer(self(), [=]( const csi::v0::GetPluginInfoResponse& response) { pluginInfo = response; @@ -2024,8 +2058,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService() })) .then(defer(self(), [=](csi::v0::Client client) { // Get the plugin capabilities. - return client.GetPluginCapabilities( - csi::v0::GetPluginCapabilitiesRequest()) + return call<csi::v0::GET_PLUGIN_CAPABILITIES>( + client, csi::v0::GetPluginCapabilitiesRequest()) .then(defer(self(), [=]( const csi::v0::GetPluginCapabilitiesResponse& response) { pluginCapabilities = response.capabilities(); @@ -2053,7 +2087,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() return getService(controllerContainerId.get()) .then(defer(self(), [=](csi::v0::Client client) { // Get the controller plugin info and check for consistency. - return client.GetPluginInfo(csi::v0::GetPluginInfoRequest()) + return call<csi::v0::GET_PLUGIN_INFO>( + client, csi::v0::GetPluginInfoRequest()) .then(defer(self(), [=]( const csi::v0::GetPluginInfoResponse& response) { LOG(INFO) << "Controller plugin loaded: " << stringify(response); @@ -2071,8 +2106,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() })) .then(defer(self(), [=](csi::v0::Client client) { // Get the controller capabilities. - return client.ControllerGetCapabilities( - csi::v0::ControllerGetCapabilitiesRequest()) + return call<csi::v0::CONTROLLER_GET_CAPABILITIES>( + client, csi::v0::ControllerGetCapabilitiesRequest()) .then(defer(self(), [=]( const csi::v0::ControllerGetCapabilitiesResponse& response) { controllerCapabilities = response.capabilities(); @@ -2092,7 +2127,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() return getService(nodeContainerId.get()) .then(defer(self(), [=](csi::v0::Client client) { // Get the node capabilities. - return client.NodeGetCapabilities(csi::v0::NodeGetCapabilitiesRequest()) + return call<csi::v0::NODE_GET_CAPABILITIES>( + client, csi::v0::NodeGetCapabilitiesRequest()) .then(defer(self(), [=]( const csi::v0::NodeGetCapabilitiesResponse& response) -> Future<csi::v0::Client> { @@ -2107,7 +2143,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() } // Get the node ID. - return client.NodeGetId(csi::v0::NodeGetIdRequest()) + return call<csi::v0::NODE_GET_ID>(client, csi::v0::NodeGetIdRequest()) .then(defer(self(), [=]( const csi::v0::NodeGetIdResponse& response) { nodeId = response.node_id(); @@ -2161,7 +2197,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( request.set_readonly(false); *request.mutable_volume_attributes() = volume.state.volume_attributes(); - return client.ControllerPublishVolume(request) + return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>( + client, std::move(request)) .then(defer(self(), [this, volumeId]( const csi::v0::ControllerPublishVolumeResponse& response) { VolumeData& volume = volumes.at(volumeId); @@ -2217,7 +2254,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( request.set_volume_id(volumeId); request.set_node_id(nodeId.get()); - return client.ControllerUnpublishVolume(request) + return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>( + client, std::move(request)) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); @@ -2286,7 +2324,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage( ->CopyFrom(volume.state.volume_capability()); *request.mutable_volume_attributes() = volume.state.volume_attributes(); - return client.NodeStageVolume(request) + return call<csi::v0::NODE_STAGE_VOLUME>(client, std::move(request)) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); @@ -2349,7 +2387,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage( request.set_volume_id(volumeId); request.set_staging_target_path(stagingPath); - return client.NodeUnstageVolume(request) + return call<csi::v0::NODE_UNSTAGE_VOLUME>(client, std::move(request)) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); @@ -2420,7 +2458,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( request.set_staging_target_path(stagingPath); } - return client.NodePublishVolume(request) + return call<csi::v0::NODE_PUBLISH_VOLUME>(client, std::move(request)) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); @@ -2470,7 +2508,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( request.set_volume_id(volumeId); request.set_target_path(targetPath); - return client.NodeUnpublishVolume(request) + return call<csi::v0::NODE_UNPUBLISH_VOLUME>(client, std::move(request)) .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> { VolumeData& volume = volumes.at(volumeId); @@ -2515,7 +2553,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( request.add_volume_capabilities()->CopyFrom(profileInfo.capability); *request.mutable_parameters() = profileInfo.parameters; - return client.CreateVolume(request) + return call<csi::v0::CREATE_VOLUME>(client, std::move(request)) .then(defer(self(), [=](const csi::v0::CreateVolumeResponse& response) { const csi::v0::Volume& volume = response.volume(); @@ -2609,11 +2647,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( if (!preExisting) { deleted = deleted .then(defer(self(), &Self::getService, controllerContainerId.get())) - .then(defer(self(), [volumeId](csi::v0::Client client) { + .then(defer(self(), [this, volumeId](csi::v0::Client client) { csi::v0::DeleteVolumeRequest request; request.set_volume_id(volumeId); - return client.DeleteVolume(request) + return call<csi::v0::DELETE_VOLUME>(client, std::move(request)) .then([] { return Nothing(); }); })); } @@ -2681,7 +2719,8 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability( request.add_volume_capabilities()->CopyFrom(capability); *request.mutable_volume_attributes() = volumeAttributes; - return client.ValidateVolumeCapabilities(request) + return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>( + client, std::move(request)) .then(defer(self(), [=]( const csi::v0::ValidateVolumeCapabilitiesResponse& response) -> Future<string> { @@ -2722,7 +2761,7 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes() .then(defer(self(), [=](csi::v0::Client client) { // TODO(chhsiao): Set the max entries and use a loop to do // multiple `ListVolumes` calls. - return client.ListVolumes(csi::v0::ListVolumesRequest()) + return call<csi::v0::LIST_VOLUMES>(client, csi::v0::ListVolumesRequest()) .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) { Resources resources; @@ -2785,7 +2824,8 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities() request.add_volume_capabilities()->CopyFrom(profileInfo.capability); *request.mutable_parameters() = profileInfo.parameters; - futures.push_back(client.GetCapacity(request) + futures.push_back(call<csi::v0::GET_CAPACITY>( + client, std::move(request)) .then(defer(self(), [=]( const csi::v0::GetCapacityResponse& response) -> Resources { if (response.available_capacity() == 0) { @@ -3433,6 +3473,66 @@ StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix) { process::metrics::add(csi_plugin_container_terminations); + vector<csi::v0::RPC> rpcs; + + // NOTE: We use a switch statement here as a compile-time sanity check so we + // won't forget to add metrics for new RPCs in the future. + csi::v0::RPC firstRpc = csi::v0::GET_PLUGIN_INFO; + switch (firstRpc) { + case csi::v0::GET_PLUGIN_INFO: + rpcs.push_back(csi::v0::GET_PLUGIN_INFO); + case csi::v0::GET_PLUGIN_CAPABILITIES: + rpcs.push_back(csi::v0::GET_PLUGIN_CAPABILITIES); + case csi::v0::PROBE: + rpcs.push_back(csi::v0::PROBE); + case csi::v0::CREATE_VOLUME: + rpcs.push_back(csi::v0::CREATE_VOLUME); + case csi::v0::DELETE_VOLUME: + rpcs.push_back(csi::v0::DELETE_VOLUME); + case csi::v0::CONTROLLER_PUBLISH_VOLUME: + rpcs.push_back(csi::v0::CONTROLLER_PUBLISH_VOLUME); + case csi::v0::CONTROLLER_UNPUBLISH_VOLUME: + rpcs.push_back(csi::v0::CONTROLLER_UNPUBLISH_VOLUME); + case csi::v0::VALIDATE_VOLUME_CAPABILITIES: + rpcs.push_back(csi::v0::VALIDATE_VOLUME_CAPABILITIES); + case csi::v0::LIST_VOLUMES: + rpcs.push_back(csi::v0::LIST_VOLUMES); + case csi::v0::GET_CAPACITY: + rpcs.push_back(csi::v0::GET_CAPACITY); + case csi::v0::CONTROLLER_GET_CAPABILITIES: + rpcs.push_back(csi::v0::CONTROLLER_GET_CAPABILITIES); + case csi::v0::NODE_STAGE_VOLUME: + rpcs.push_back(csi::v0::NODE_STAGE_VOLUME); + case csi::v0::NODE_UNSTAGE_VOLUME: + rpcs.push_back(csi::v0::NODE_UNSTAGE_VOLUME); + case csi::v0::NODE_PUBLISH_VOLUME: + rpcs.push_back(csi::v0::NODE_PUBLISH_VOLUME); + case csi::v0::NODE_UNPUBLISH_VOLUME: + rpcs.push_back(csi::v0::NODE_UNPUBLISH_VOLUME); + case csi::v0::NODE_GET_ID: + rpcs.push_back(csi::v0::NODE_GET_ID); + case csi::v0::NODE_GET_CAPABILITIES: + rpcs.push_back(csi::v0::NODE_GET_CAPABILITIES); + } + + foreach (const csi::v0::RPC& rpc, rpcs) { + const string name = stringify(rpc); + + csi_plugin_rpcs_pending.put( + rpc, PushGauge(prefix + "csi_plugin/rpcs/" + name + "/pending")); + csi_plugin_rpcs_successes.put( + rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/successes")); + csi_plugin_rpcs_errors.put( + rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/errors")); + csi_plugin_rpcs_cancelled.put( + rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/cancelled")); + + process::metrics::add(csi_plugin_rpcs_pending.at(rpc)); + process::metrics::add(csi_plugin_rpcs_successes.at(rpc)); + process::metrics::add(csi_plugin_rpcs_errors.at(rpc)); + process::metrics::add(csi_plugin_rpcs_cancelled.at(rpc)); + } + vector<Offer::Operation::Type> operationTypes; // NOTE: We use a switch statement here as a compile-time sanity check so we @@ -3499,6 +3599,22 @@ StorageLocalResourceProviderProcess::Metrics::~Metrics() { process::metrics::remove(csi_plugin_container_terminations); + foreachvalue (const PushGauge& gauge, csi_plugin_rpcs_pending) { + process::metrics::remove(gauge); + } + + foreachvalue (const Counter& counter, csi_plugin_rpcs_successes) { + process::metrics::remove(counter); + } + + foreachvalue (const Counter& counter, csi_plugin_rpcs_errors) { + process::metrics::remove(counter); + } + + foreachvalue (const Counter& counter, csi_plugin_rpcs_cancelled) { + process::metrics::remove(counter); + } + foreachvalue (const PushGauge& gauge, operations_pending) { process::metrics::remove(gauge); } http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/tests/csi_client_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp index d5993d6..39dea56 100644 --- a/src/tests/csi_client_tests.cpp +++ b/src/tests/csi_client_tests.cpp @@ -14,22 +14,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include <functional> +#include <ostream> + #include <process/gtest.hpp> -#include <stout/lambda.hpp> -#include <stout/path.hpp> +#include <stout/nothing.hpp> #include <stout/strings.hpp> +#include <stout/unreachable.hpp> #include <stout/tests/utils.hpp> #include "csi/client.hpp" +#include "csi/rpc.hpp" #include "tests/mock_csi_plugin.hpp" +using std::ostream; using std::string; -using mesos::csi::v0::Client; - using process::Future; using process::grpc::client::Connection; @@ -47,22 +50,27 @@ struct RPCParam { struct Printer { - const string& operator()(const TestParamInfo<RPCParam>& info) const + string operator()(const TestParamInfo<RPCParam>& info) const { - return info.param.name; + return strings::replace(stringify(info.param.value), ".", "_"); } }; - template <typename Request, typename Response> - RPCParam(const string& _name, Future<Response>(Client::*rpc)(const Request&)) - : name(_name), - call([=](const Connection& connection, const Runtime runtime) { - return (Client(connection, runtime).*rpc)(Request()) + template <csi::v0::RPC rpc> + static RPCParam create() + { + return RPCParam{ + rpc, + [](csi::v0::Client client) { + return client + .call<rpc>(typename csi::v0::RPCTraits<rpc>::request_type()) .then([] { return Nothing(); }); - }) {} + } + }; + } - string name; - lambda::function<Future<Nothing>(const Connection&, const Runtime&)> call; + const csi::v0::RPC value; + const std::function<Future<Nothing>(csi::v0::Client)> call; }; @@ -95,51 +103,49 @@ protected: }; -#define RPC_PARAM(method) \ - RPCParam(strings::replace(#method, "::", "_"), &method) - - INSTANTIATE_TEST_CASE_P( Identity, CSIClientTest, Values( - RPC_PARAM(Client::GetPluginInfo), - RPC_PARAM(Client::GetPluginCapabilities), - RPC_PARAM(Client::Probe)), + RPCParam::create<csi::v0::GET_PLUGIN_INFO>(), + RPCParam::create<csi::v0::GET_PLUGIN_CAPABILITIES>(), + RPCParam::create<csi::v0::PROBE>()), RPCParam::Printer()); + INSTANTIATE_TEST_CASE_P( Controller, CSIClientTest, Values( - RPC_PARAM(Client::CreateVolume), - RPC_PARAM(Client::DeleteVolume), - RPC_PARAM(Client::ControllerPublishVolume), - RPC_PARAM(Client::ControllerUnpublishVolume), - RPC_PARAM(Client::ValidateVolumeCapabilities), - RPC_PARAM(Client::ListVolumes), - RPC_PARAM(Client::GetCapacity), - RPC_PARAM(Client::ControllerGetCapabilities)), + RPCParam::create<csi::v0::CREATE_VOLUME>(), + RPCParam::create<csi::v0::DELETE_VOLUME>(), + RPCParam::create<csi::v0::CONTROLLER_PUBLISH_VOLUME>(), + RPCParam::create<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(), + RPCParam::create<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(), + RPCParam::create<csi::v0::LIST_VOLUMES>(), + RPCParam::create<csi::v0::GET_CAPACITY>(), + RPCParam::create<csi::v0::CONTROLLER_GET_CAPABILITIES>()), RPCParam::Printer()); + INSTANTIATE_TEST_CASE_P( Node, CSIClientTest, Values( - RPC_PARAM(Client::NodeStageVolume), - RPC_PARAM(Client::NodeUnstageVolume), - RPC_PARAM(Client::NodePublishVolume), - RPC_PARAM(Client::NodeUnpublishVolume), - RPC_PARAM(Client::NodeGetId), - RPC_PARAM(Client::NodeGetCapabilities)), + RPCParam::create<csi::v0::NODE_STAGE_VOLUME>(), + RPCParam::create<csi::v0::NODE_UNSTAGE_VOLUME>(), + RPCParam::create<csi::v0::NODE_PUBLISH_VOLUME>(), + RPCParam::create<csi::v0::NODE_UNPUBLISH_VOLUME>(), + RPCParam::create<csi::v0::NODE_GET_ID>(), + RPCParam::create<csi::v0::NODE_GET_CAPABILITIES>()), RPCParam::Printer()); // This test verifies that the all methods of CSI clients work. TEST_P(CSIClientTest, Call) { - Future<Nothing> call = GetParam().call(connection.get(), runtime); - AWAIT_EXPECT_READY(call); + AWAIT_EXPECT_READY( + GetParam().call(csi::v0::Client(connection.get(), runtime))); } } // namespace tests {