This is an automated email from the ASF dual-hosted git repository. bennoe pushed a commit to branch 1.9.x in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/1.9.x by this push: new b18ce53 Gracefully handled duplicated volumes from non-conforming CSI plugins. b18ce53 is described below commit b18ce53fe8e49e6f030efe89e0976a9f72ad8b50 Author: Chun-Hung Hsiao <chhs...@apache.org> AuthorDate: Fri Aug 30 13:05:37 2019 +0200 Gracefully handled duplicated volumes from non-conforming CSI plugins. If the SLRP uses a plugin that does not conform to the CSI spec and reports duplicated volumes, the duplicate would be removed. Review: https://reviews.apache.org/r/71414/ --- src/resource_provider/storage/provider.cpp | 78 +++++++++------ .../storage_local_resource_provider_tests.cpp | 109 +++++++++++++++++++++ 2 files changed, 159 insertions(+), 28 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index f180af8..0a8dc26 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -673,8 +673,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() } } - LOG(INFO) << "Finished recovery for resource provider with type '" - << info.type() << "' and name '" << info.name() << "'"; + LOG(INFO) + << "Recovered resources '" << totalResources << "' and " + << operations.size() << " operations for resource provider with type '" + << info.type() << "' and name '" << info.name() << "'"; state = DISCONNECTED; @@ -1081,45 +1083,73 @@ StorageLocalResourceProviderProcess::getExistingVolumes() return volumeManager->listVolumes() .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) { + // If a volume is duplicated or a volume context has been changed by a + // non-conforming CSI plugin, we need to construct a resources conversion + // to remove the duplicate and update the metadata, so we maintain the + // resources to be removed and those to be added here. + Resources toRemove; + Resources toAdd; + // Since we only support "exclusive" (MOUNT or BLOCK) disks, there should // be only one checkpointed resource for each volume ID. hashmap<string, Resource> checkpointedMap; foreach (const Resource& resource, totalResources) { if (resource.disk().source().has_id()) { - CHECK(!checkpointedMap.contains(resource.disk().source().id())); - checkpointedMap.put(resource.disk().source().id(), resource); + // If the checkpointed resources contain duplicated volumes because of + // a non-conforming CSI plugin, remove the duplicate. + if (checkpointedMap.contains(resource.disk().source().id())) { + LOG(WARNING) << "Removing duplicated volume '" << resource + << "' from the total resources"; + + toRemove += resource; + } else { + checkpointedMap.put(resource.disk().source().id(), resource); + } } } // The "discovered" resources consist of RAW disk resources, one for each // volume reported by the CSI plugin. Resources discovered; - - // If any volume context has been changed by a non-conforming CSI plugin, - // we need to construct a resources conversion to reflect the - // corresponding metadata changes, so we maintain the resources to be - // removed and those to be added here. - Resources metadataToRemove; - Resources metadataToAdd; + hashset<string> discoveredVolumeIds; foreach (const VolumeInfo& volumeInfo, volumeInfos) { - Option<string> profile; - Option<Labels> metadata = volumeInfo.context.empty() + const Option<string> profile = + checkpointedMap.contains(volumeInfo.id) && + checkpointedMap.at(volumeInfo.id).disk().source().has_profile() + ? checkpointedMap.at(volumeInfo.id).disk().source().profile() + : Option<string>::none(); + + const Option<Labels> metadata = volumeInfo.context.empty() ? Option<Labels>::none() : convertStringMapToLabels(volumeInfo.context); + const Resource resource = createRawDiskResource( + info, + volumeInfo.capacity, + profile, + vendor, + volumeInfo.id, + metadata); + + if (discoveredVolumeIds.contains(volumeInfo.id)) { + LOG(WARNING) << "Dropping duplicated volume '" << resource + << "' from the discovered resources"; + + continue; + } + + discovered += resource; + discoveredVolumeIds.insert(volumeInfo.id); + if (checkpointedMap.contains(volumeInfo.id)) { const Resource& resource = checkpointedMap.at(volumeInfo.id); - if (resource.disk().source().has_profile()) { - profile = resource.disk().source().profile(); - } - // If the volume context has been changed by a non-conforming CSI // plugin, the changes will be reflected in a resource conversion. if (resource.disk().source().metadata() != metadata.getOrElse(Labels())) { - metadataToRemove += resource; + toRemove += resource; Resource changed = resource; if (metadata.isSome()) { @@ -1129,21 +1159,13 @@ StorageLocalResourceProviderProcess::getExistingVolumes() changed.mutable_disk()->mutable_source()->clear_metadata(); } - metadataToAdd += changed; + toAdd += changed; } } - - discovered += createRawDiskResource( - info, - volumeInfo.capacity, - profile, - vendor, - volumeInfo.id, - metadata); } ResourceConversion metadataConversion( - std::move(metadataToRemove), std::move(metadataToAdd)); + std::move(toRemove), std::move(toAdd)); Resources checkpointed = CHECK_NOTERROR( totalResources.filter([](const Resource& resource) { diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 05daf2a..089aa97 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -1374,6 +1374,115 @@ TEST_P(StorageLocalResourceProviderTest, RecoverDiskWithChangedMetadata) } +// This test verifies that the storage local resource provider can properly +// handle duplicated volumes. This is a regression test for MESOS-9965. +TEST_P(StorageLocalResourceProviderTest, RecoverDuplicatedVolumes) +{ + const string mockCsiEndpoint = + "unix://" + path::join(sandbox.get(), "mock_csi.sock"); + + MockCSIPlugin plugin; + ASSERT_SOME(plugin.startup(mockCsiEndpoint)); + + setupResourceProviderConfig(Bytes(0), None(), mockCsiEndpoint); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + if (GetParam() == csi::v0::API_VERSION) { + EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>())) + .WillRepeatedly(Invoke([&]( + grpc::ServerContext* context, + const csi::v0::ListVolumesRequest* request, + csi::v0::ListVolumesResponse* response) { + csi::v0::Volume volume; + volume.set_capacity_bytes(Gigabytes(2).bytes()); + volume.set_id("volume1"); + + // Report duplicated volumes. + *response->add_entries()->mutable_volume() = volume; + *response->add_entries()->mutable_volume() = volume; + + return grpc::Status::OK; + })); + } else { + EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>())) + .WillRepeatedly(Invoke([&]( + grpc::ServerContext* context, + const csi::v1::ListVolumesRequest* request, + csi::v1::ListVolumesResponse* response) { + csi::v1::Volume volume; + volume.set_capacity_bytes(Gigabytes(2).bytes()); + volume.set_volume_id("volume1"); + + // Report duplicated volumes. + *response->add_entries()->mutable_volume() = volume; + *response->add_entries()->mutable_volume() = volume; + + return grpc::Status::OK; + })); + } + + slave::Flags slaveFlags = CreateSlaveFlags(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // Register a framework to exercise operations. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.set_roles(0, "storage/role"); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). + Filters declineFilters; + declineFilters.set_refuse_seconds(Days(365).secs()); + + // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(DeclineOffers(declineFilters)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + &Resources::hasResourceProvider))) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + AWAIT_READY(offers); + + // Restart the agent. + EXPECT_CALL(sched, offerRescinded(_, _)); + + slave.get()->terminate(); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + &Resources::hasResourceProvider))) + .WillOnce(FutureArg<1>(&offers)); + + slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // Wait for an offer to verify that the resource provider comes back. + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + Offer offer = offers->at(0); + + Resources resourceProviderResources = + Resources(offer.resources()).filter(&Resources::hasResourceProvider); + + EXPECT_SOME_EQ(Gigabytes(2), resourceProviderResources.disk()); +} + + // This test verifies that a framework cannot create a volume during and after // the profile disappears, and destroying a volume with a stale profile will // recover the freed disk with another appeared profile.