Added a unit test for CSI plugin RPC metrics. This patch adds the `ROOT_CsiPluginRpcMetrics` test, which issues a `CREATE_VOLUME` followed by a `DESTROY_VOLUME`; the latter is expected to fail because the actual volume is deleted out of band beforehand.
Review: https://reviews.apache.org/r/67256

Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a1f0bab
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a1f0bab
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a1f0bab

Branch: refs/heads/master
Commit: 1a1f0bab2fde34095c643cefdb24d700441048d0
Parents: 15fc86d
Author: Chun-Hung Hsiao <chhs...@mesosphere.io>
Authored: Tue May 22 14:47:03 2018 -0700
Committer: Chun-Hung Hsiao <chhs...@mesosphere.io>
Committed: Thu May 31 18:29:56 2018 -0700

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp | 293 +++++++++++++++++++
 1 file changed, 293 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1a1f0bab/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 9bd8558..17df704 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -3116,6 +3116,299 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_OperationStateMetrics)
 }
 
 
+// This test verifies that storage local resource provider properly
+// reports metrics related to RPCs to CSI plugins.
+// TODO(chhsiao): Currently there is no way to test the `pending` and
+// `cancelled` metrics for RPCs since we have no control over the completion of
+// an operation. Once we support out-of-band CSI plugins through domain sockets,
+// we could test these metrics against a mock CSI plugin.
+TEST_F(StorageLocalResourceProviderTest, ROOT_CsiPluginRpcMetrics)
+{
+  loadUriDiskProfileAdaptorModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileMapping();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework with the "storage" role to exercise operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing the same MOUNT disk resource after a failed
+  //      `DESTROY_VOLUME`.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> operationFailedOffers;
+
+  Sequence offers;
+
+  // Offers without the wanted resources are declined with this filter so
+  // that they are not re-offered for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  // Matches a disk resource with the "volume-default" profile whose source
+  // is of the given `type`, e.g., a RAW storage pool or a MOUNT volume.
+  auto hasSourceType = [](
+      const Resource& r,
+      const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == "volume-default" &&
+      r.disk().source().type() == type;
+  };
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  JSON::Object snapshot = Metrics();  // Taken before any operation.
+
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+  EXPECT_EQ(1, snapshot.values.at( metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+  EXPECT_EQ(2, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
"csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+  EXPECT_EQ(2, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+
+  // Create a MOUNT volume from the RAW disk resource.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  // Accept with `refuse_seconds` set to 0 so that unused resources are not
+  // filtered for the default 5 seconds but are re-offered promptly.
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      acceptFilters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  snapshot = Metrics();  // Taken after `CREATE_VOLUME` succeeded.
+
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+  EXPECT_EQ(1, snapshot.values.at( metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+  EXPECT_EQ(2, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+  EXPECT_EQ(2, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+
+  // Remove the volume's "path" out of band so that `DESTROY_VOLUME` fails.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  ASSERT_SOME(os::rmdir(volumePath.get()));
+
+  // Destroy the volume; the `DeleteVolume` RPC is expected to fail.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(volume.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&operationFailedOffers))
+    .WillRepeatedly(DeclineOffers(declineFilters)); // Decline further offers.
+
+  driver.acceptOffers(
+      {volumeCreatedOffers->at(0).id()},
+      {DESTROY_VOLUME(volume.get())},
+      acceptFilters);
+
+  AWAIT_READY(operationFailedOffers);
+  ASSERT_FALSE(operationFailedOffers->empty());
+
+  snapshot = Metrics();  // Taken after `DESTROY_VOLUME` failed.
+
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+  EXPECT_EQ(1, snapshot.values.at( metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+  EXPECT_EQ(2, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+  EXPECT_EQ(2, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
"csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+}
+
+
 // Master reconciles operations that are missing from a reregistering slave.
 // In this case, the `ApplyOperationMessage` is dropped, so the resource
 // provider should send OPERATION_DROPPED. Operations on agent default