Returned profiles based on provider selectors in UriDiskProfileAdaptor. When a storage resource provider watches for profiles, the URI disk profile adaptor module now returns the set of profiles that are either already known to that resource provider or apply to it (based on the profile's resource provider selector).
Review: https://reviews.apache.org/r/65566/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58add5a2 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58add5a2 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58add5a2 Branch: refs/heads/master Commit: 58add5a2c615f0dc5620da9a125ca121c632908c Parents: 921f61f Author: Chun-Hung Hsiao <chhs...@mesosphere.io> Authored: Thu Feb 8 14:41:31 2018 -0800 Committer: Jie Yu <yujie....@gmail.com> Committed: Thu Feb 8 16:24:02 2018 -0800 ---------------------------------------------------------------------- .../storage/uri_disk_profile.cpp | 75 +++++++++++++------- .../storage/uri_disk_profile.hpp | 35 ++++----- 2 files changed, 68 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/58add5a2/src/resource_provider/storage/uri_disk_profile.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/uri_disk_profile.cpp b/src/resource_provider/storage/uri_disk_profile.cpp index 5a48656..665798f 100644 --- a/src/resource_provider/storage/uri_disk_profile.cpp +++ b/src/resource_provider/storage/uri_disk_profile.cpp @@ -124,7 +124,7 @@ UriDiskProfileAdaptorProcess::UriDiskProfileAdaptorProcess( const Flags& _flags) : ProcessBase(ID::generate("uri-volume-profile")), flags(_flags), - watchPromise(new Promise<hashset<string>>()) {} + watchPromise(new Promise<Nothing>()) {} void UriDiskProfileAdaptorProcess::initialize() @@ -138,11 +138,21 @@ Future<DiskProfileAdaptor::ProfileInfo> const string& profile, const ResourceProviderInfo& resourceProviderInfo) { - if (data.count(profile) != 1) { + if (profileMatrix.count(profile) != 1) { return Failure("Profile '" + profile + "' not found"); } - return data.at(profile); + const DiskProfileMapping::CSIManifest& manifest = profileMatrix.at(profile); + + // 
TODO(chhsiao): A storage resource provider may need to translate + // a profile that no longer applies to it to replay a `CREATE_VOLUME` + // or `CREATE_BLOCK` operation during recovery, so resource provider + // selection is only done in `watch()` but not here. We should do the + // selection once profiles are checkpointed in the resource provider. + return DiskProfileAdaptor::ProfileInfo{ + manifest.volume_capabilities(), + manifest.create_parameters() + }; } @@ -150,11 +160,31 @@ Future<hashset<string>> UriDiskProfileAdaptorProcess::watch( const hashset<string>& knownProfiles, const ResourceProviderInfo& resourceProviderInfo) { - if (profiles != knownProfiles) { - return profiles; + // Calculate the new set of profiles for the resource provider. + // TODO(chhsiao): A storage resource provider assumes that the new set + // should be a superset of `knownProfiles`, so we bypass resource + // provider selection if a profile is already known. We should do the + // selection once profiles are checkpointed in the resource provider. + hashset<string> newProfiles = knownProfiles; + foreachpair (const string& profile, + const DiskProfileMapping::CSIManifest& manifest, + profileMatrix) { + if (knownProfiles.contains(profile)) { + continue; + } + + if (isSelectedResourceProvider(manifest, resourceProviderInfo)) { + newProfiles.insert(profile); + } } - return watchPromise->future(); + if (newProfiles != knownProfiles) { + return newProfiles; + } + + // Wait for the next update if there is no change. 
+ return watchPromise->future() + .then(defer(self(), &Self::watch, knownProfiles, resourceProviderInfo)); } @@ -212,7 +242,9 @@ void UriDiskProfileAdaptorProcess::notify( { bool hasErrors = false; - foreachkey (const string& profile, data) { + foreachpair (const string& profile, + const DiskProfileMapping::CSIManifest& manifest, + profileMatrix) { if (parsed.profile_matrix().count(profile) != 1) { hasErrors = true; @@ -223,11 +255,11 @@ void UriDiskProfileAdaptorProcess::notify( } bool matchingCapability = - data.at(profile).capability == + manifest.volume_capabilities() == parsed.profile_matrix().at(profile).volume_capabilities(); bool matchingParameters = - data.at(profile).parameters == + manifest.create_parameters() == parsed.profile_matrix().at(profile).create_parameters(); if (!matchingCapability || !matchingParameters) { @@ -249,35 +281,26 @@ void UriDiskProfileAdaptorProcess::notify( // Profiles can only be added, so if the parsed data is the same size, // nothing has changed and no notifications need to be sent. - if (parsed.profile_matrix().size() <= data.size()) { + if (parsed.profile_matrix().size() <= profileMatrix.size()) { return; } // The fetched mapping satisfies our invariants. - // Save the protobuf as a map we can expose through the module interface. - // And update the convenience set of profile names. - profiles.clear(); - auto iterator = parsed.profile_matrix().begin(); - while (iterator != parsed.profile_matrix().end()) { - data[iterator->first] = { - iterator->second.volume_capabilities(), - iterator->second.create_parameters() - }; - - profiles.insert(iterator->first); - iterator++; - } + // Save the protobuf as a map. + profileMatrix = map<string, DiskProfileMapping::CSIManifest>( + parsed.profile_matrix().begin(), + parsed.profile_matrix().end()); // Notify any watchers and then prepare a new promise for the next // iteration of polling. // // TODO(josephw): Delay this based on the `--max_random_wait` option. 
- watchPromise->set(profiles); - watchPromise.reset(new Promise<hashset<string>>()); + watchPromise->set(Nothing()); + watchPromise.reset(new Promise<Nothing>()); LOG(INFO) - << "Updated disk profile mapping to " << profiles.size() + << "Updated disk profile mapping to " << profileMatrix.size() << " total profiles"; } http://git-wip-us.apache.org/repos/asf/mesos/blob/58add5a2/src/resource_provider/storage/uri_disk_profile.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/uri_disk_profile.hpp b/src/resource_provider/storage/uri_disk_profile.hpp index 746111e..22e9d8b 100644 --- a/src/resource_provider/storage/uri_disk_profile.hpp +++ b/src/resource_provider/storage/uri_disk_profile.hpp @@ -58,17 +58,23 @@ struct Flags : public virtual flags::FlagsBase "This module supports both HTTP(s) and file URIs\n." "\n" "The JSON object should consist of some top-level string keys\n" - "corresponding to the disk profile name. Each value should\n" - "contain a `VolumeCapability` under a 'volume_capabilities'\n" + "corresponding to the disk profile name. 
Each value should contain\n" + "a `ResourceProviderSelector` under 'resource_provider_selector' or\n" + "a `CSIPluginTypeSelector` under 'csi_plugin_type_selector' to\n" + "specify the set of resource providers this profile applies to,\n" + "followed by a `VolumeCapability` under 'volume_capabilities'\n" "and a free-form string-string mapping under 'create_parameters'.\n" "\n" "The JSON is modeled after a protobuf found in\n" - "`src/csi/uri_disk_profile.proto`.\n" + "`src/resource_provider/storage/disk_profile.proto`.\n" "\n" "For example:\n" "{\n" " \"profile_matrix\" : {\n" " \"my-profile\" : {\n" + " \"csi_plugin_type_selector\": {\n" + " \"plugin_type\" : \"org.apache.mesos.csi.test\"\n" + " \"},\n" " \"volume_capabilities\" : {\n" " \"block\" : {},\n" " \"access_mode\" : { \"mode\" : \"SINGLE_NODE_WRITER\" }\n" @@ -171,14 +177,14 @@ struct Flags : public virtual flags::FlagsBase // and assumes that all fetched profiles are meant for all resource providers. // // See `Flags` above for more information. -class UriDiskProfileAdaptor : public mesos::DiskProfileAdaptor +class UriDiskProfileAdaptor : public DiskProfileAdaptor { public: UriDiskProfileAdaptor(const Flags& _flags); virtual ~UriDiskProfileAdaptor(); - virtual process::Future<mesos::DiskProfileAdaptor::ProfileInfo> translate( + virtual process::Future<DiskProfileAdaptor::ProfileInfo> translate( const std::string& profile, const ResourceProviderInfo& resourceProviderInfo) override; @@ -200,7 +206,7 @@ public: virtual void initialize() override; - process::Future<mesos::DiskProfileAdaptor::ProfileInfo> translate( + process::Future<DiskProfileAdaptor::ProfileInfo> translate( const std::string& profile, const ResourceProviderInfo& resourceProviderInfo); @@ -226,19 +232,16 @@ private: Flags flags; // The last fetched profile mapping. - // This module assumes that profiles can only be added and never removed. - // Once added, profiles cannot be changed either. 
+ // This module assumes that profiles can only be added and never + // removed. Once added, a profile's volume capability and parameters + // cannot be changed either. // // TODO(josephw): Consider persisting this mapping across agent restarts. - std::map<std::string, DiskProfileAdaptor::ProfileInfo> data; + std::map<std::string, resource_provider::DiskProfileMapping::CSIManifest> + profileMatrix; - // Convenience set of the keys in `data` above. - // This module does not filter based on `CSIPluginInfo::type`, so this - // is valid for all input to `watch(...)`. - hashset<std::string> profiles; - - // Will be satisfied whenever `data` is changed. - process::Owned<process::Promise<hashset<std::string>>> watchPromise; + // Will be satisfied whenever `profileMatrix` is changed. + process::Owned<process::Promise<Nothing>> watchPromise; }; } // namespace profile {