Returned profiles based on provider selectors in UriDiskProfileAdaptor.

Now the URI disk profile adaptor module will return the set of profiles
in which each profile is either known to a storage resource provider or
applies to it (based on the resource provider selector) when it watches
for profiles.

Review: https://reviews.apache.org/r/65566/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58add5a2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58add5a2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58add5a2

Branch: refs/heads/master
Commit: 58add5a2c615f0dc5620da9a125ca121c632908c
Parents: 921f61f
Author: Chun-Hung Hsiao <chhs...@mesosphere.io>
Authored: Thu Feb 8 14:41:31 2018 -0800
Committer: Jie Yu <yujie....@gmail.com>
Committed: Thu Feb 8 16:24:02 2018 -0800

----------------------------------------------------------------------
 .../storage/uri_disk_profile.cpp                | 75 +++++++++++++-------
 .../storage/uri_disk_profile.hpp                | 35 ++++-----
 2 files changed, 68 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/58add5a2/src/resource_provider/storage/uri_disk_profile.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/uri_disk_profile.cpp 
b/src/resource_provider/storage/uri_disk_profile.cpp
index 5a48656..665798f 100644
--- a/src/resource_provider/storage/uri_disk_profile.cpp
+++ b/src/resource_provider/storage/uri_disk_profile.cpp
@@ -124,7 +124,7 @@ UriDiskProfileAdaptorProcess::UriDiskProfileAdaptorProcess(
     const Flags& _flags)
   : ProcessBase(ID::generate("uri-volume-profile")),
     flags(_flags),
-    watchPromise(new Promise<hashset<string>>()) {}
+    watchPromise(new Promise<Nothing>()) {}
 
 
 void UriDiskProfileAdaptorProcess::initialize()
@@ -138,11 +138,21 @@ Future<DiskProfileAdaptor::ProfileInfo>
       const string& profile,
       const ResourceProviderInfo& resourceProviderInfo)
 {
-  if (data.count(profile) != 1) {
+  if (profileMatrix.count(profile) != 1) {
     return Failure("Profile '" + profile + "' not found");
   }
 
-  return data.at(profile);
+  const DiskProfileMapping::CSIManifest& manifest = profileMatrix.at(profile);
+
+  // TODO(chhsiao): A storage resource provider may need to translate
+  // a profile that no longer applies to it to replay a `CREATE_VOLUME`
+  // or `CREATE_BLOCK` operation during recovery, so resource provider
+  // selection is only done in `watch()` but not here. We should do the
+  // selection once profiles are checkpointed in the resource provider.
+  return DiskProfileAdaptor::ProfileInfo{
+    manifest.volume_capabilities(),
+    manifest.create_parameters()
+  };
 }
 
 
@@ -150,11 +160,31 @@ Future<hashset<string>> 
UriDiskProfileAdaptorProcess::watch(
     const hashset<string>& knownProfiles,
     const ResourceProviderInfo& resourceProviderInfo)
 {
-  if (profiles != knownProfiles) {
-    return profiles;
+  // Calculate the new set of profiles for the resource provider.
+  // TODO(chhsiao): A storage resource provider assumes that the new set
+  // should be a superset of `knownProfiles`, so we bypass resource
+  // provider selection if a profile is already known. We should do the
+  // selection once profiles are checkpointed in the resource provider.
+  hashset<string> newProfiles = knownProfiles;
+  foreachpair (const string& profile,
+               const DiskProfileMapping::CSIManifest& manifest,
+               profileMatrix) {
+    if (knownProfiles.contains(profile)) {
+      continue;
+    }
+
+    if (isSelectedResourceProvider(manifest, resourceProviderInfo)) {
+      newProfiles.insert(profile);
+    }
   }
 
-  return watchPromise->future();
+  if (newProfiles != knownProfiles) {
+    return newProfiles;
+  }
+
+  // Wait for the next update if there is no change.
+  return watchPromise->future()
+    .then(defer(self(), &Self::watch, knownProfiles, resourceProviderInfo));
 }
 
 
@@ -212,7 +242,9 @@ void UriDiskProfileAdaptorProcess::notify(
 {
   bool hasErrors = false;
 
-  foreachkey (const string& profile, data) {
+  foreachpair (const string& profile,
+               const DiskProfileMapping::CSIManifest& manifest,
+               profileMatrix) {
     if (parsed.profile_matrix().count(profile) != 1) {
       hasErrors = true;
 
@@ -223,11 +255,11 @@ void UriDiskProfileAdaptorProcess::notify(
     }
 
     bool matchingCapability =
-      data.at(profile).capability ==
+      manifest.volume_capabilities() ==
         parsed.profile_matrix().at(profile).volume_capabilities();
 
     bool matchingParameters =
-      data.at(profile).parameters ==
+      manifest.create_parameters() ==
         parsed.profile_matrix().at(profile).create_parameters();
 
     if (!matchingCapability || !matchingParameters) {
@@ -249,35 +281,26 @@ void UriDiskProfileAdaptorProcess::notify(
 
   // Profiles can only be added, so if the parsed data is the same size,
   // nothing has changed and no notifications need to be sent.
-  if (parsed.profile_matrix().size() <= data.size()) {
+  if (parsed.profile_matrix().size() <= profileMatrix.size()) {
     return;
   }
 
   // The fetched mapping satisfies our invariants.
 
-  // Save the protobuf as a map we can expose through the module interface.
-  // And update the convenience set of profile names.
-  profiles.clear();
-  auto iterator = parsed.profile_matrix().begin();
-  while (iterator != parsed.profile_matrix().end()) {
-    data[iterator->first] = {
-      iterator->second.volume_capabilities(),
-      iterator->second.create_parameters()
-    };
-
-    profiles.insert(iterator->first);
-    iterator++;
-  }
+  // Save the protobuf as a map.
+  profileMatrix = map<string, DiskProfileMapping::CSIManifest>(
+      parsed.profile_matrix().begin(),
+      parsed.profile_matrix().end());
 
   // Notify any watchers and then prepare a new promise for the next
   // iteration of polling.
   //
   // TODO(josephw): Delay this based on the `--max_random_wait` option.
-  watchPromise->set(profiles);
-  watchPromise.reset(new Promise<hashset<string>>());
+  watchPromise->set(Nothing());
+  watchPromise.reset(new Promise<Nothing>());
 
   LOG(INFO)
-    << "Updated disk profile mapping to " << profiles.size()
+    << "Updated disk profile mapping to " << profileMatrix.size()
     << " total profiles";
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/58add5a2/src/resource_provider/storage/uri_disk_profile.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/uri_disk_profile.hpp 
b/src/resource_provider/storage/uri_disk_profile.hpp
index 746111e..22e9d8b 100644
--- a/src/resource_provider/storage/uri_disk_profile.hpp
+++ b/src/resource_provider/storage/uri_disk_profile.hpp
@@ -58,17 +58,23 @@ struct Flags : public virtual flags::FlagsBase
         "This module supports both HTTP(s) and file URIs\n."
         "\n"
         "The JSON object should consist of some top-level string keys\n"
-        "corresponding to the disk profile name. Each value should\n"
-        "contain a `VolumeCapability` under a 'volume_capabilities'\n"
+        "corresponding to the disk profile name. Each value should contain\n"
+        "a `ResourceProviderSelector` under 'resource_provider_selector' or\n"
+        "a `CSIPluginTypeSelector` under 'csi_plugin_type_selector' to\n"
+        "specify the set of resource providers this profile applies to,\n"
+        "followed by a `VolumeCapability` under 'volume_capabilities'\n"
         "and a free-form string-string mapping under 'create_parameters'.\n"
         "\n"
         "The JSON is modeled after a protobuf found in\n"
-        "`src/csi/uri_disk_profile.proto`.\n"
+        "`src/resource_provider/storage/disk_profile.proto`.\n"
         "\n"
         "For example:\n"
         "{\n"
         "  \"profile_matrix\" : {\n"
         "    \"my-profile\" : {\n"
+        "      \"csi_plugin_type_selector\": {\n"
+        "        \"plugin_type\" : \"org.apache.mesos.csi.test\"\n"
+        "      \"},\n"
         "      \"volume_capabilities\" : {\n"
         "        \"block\" : {},\n"
         "        \"access_mode\" : { \"mode\" : \"SINGLE_NODE_WRITER\" }\n"
@@ -171,14 +177,14 @@ struct Flags : public virtual flags::FlagsBase
 // and assumes that all fetched profiles are meant for all resource providers.
 //
 // See `Flags` above for more information.
-class UriDiskProfileAdaptor : public mesos::DiskProfileAdaptor
+class UriDiskProfileAdaptor : public DiskProfileAdaptor
 {
 public:
   UriDiskProfileAdaptor(const Flags& _flags);
 
   virtual ~UriDiskProfileAdaptor();
 
-  virtual process::Future<mesos::DiskProfileAdaptor::ProfileInfo> translate(
+  virtual process::Future<DiskProfileAdaptor::ProfileInfo> translate(
       const std::string& profile,
       const ResourceProviderInfo& resourceProviderInfo) override;
 
@@ -200,7 +206,7 @@ public:
 
   virtual void initialize() override;
 
-  process::Future<mesos::DiskProfileAdaptor::ProfileInfo> translate(
+  process::Future<DiskProfileAdaptor::ProfileInfo> translate(
       const std::string& profile,
       const ResourceProviderInfo& resourceProviderInfo);
 
@@ -226,19 +232,16 @@ private:
   Flags flags;
 
   // The last fetched profile mapping.
-  // This module assumes that profiles can only be added and never removed.
-  // Once added, profiles cannot be changed either.
+  // This module assumes that profiles can only be added and never
+  // removed. Once added, a profile's volume capability and parameters
+  // cannot be changed either.
   //
   // TODO(josephw): Consider persisting this mapping across agent restarts.
-  std::map<std::string, DiskProfileAdaptor::ProfileInfo> data;
+  std::map<std::string, resource_provider::DiskProfileMapping::CSIManifest>
+    profileMatrix;
 
-  // Convenience set of the keys in `data` above.
-  // This module does not filter based on `CSIPluginInfo::type`, so this
-  // is valid for all input to `watch(...)`.
-  hashset<std::string> profiles;
-
-  // Will be satisfied whenever `data` is changed.
-  process::Owned<process::Promise<hashset<std::string>>> watchPromise;
+  // Will be satisfied whenever `profileMatrix` is changed.
+  process::Owned<process::Promise<Nothing>> watchPromise;
 };
 
 } // namespace profile {

Reply via email to