This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 1da706a5863727b60fdff9ef02f5152ca8c40683
Author: Chun-Hung Hsiao <chhs...@apache.org>
AuthorDate: Mon Apr 1 23:23:46 2019 -0700

    Clean up the recovery logic for refactoring SLRP.
    
    In addition to performing volume state recovery, the `recoverVolumes`
    function now also recovers the service manager and prepares the services.
    The whole logic will later be moved out of SLRP into the v0
    `VolumeManager`.
    
    During volume state recovery, we no longer recover all volumes to steady
    states, since transient states are now properly handled by SLRP. To
    simplify the recovery logic, a `publishVolume` method that conforms to the
    volume manager's `publishVolume` interface is introduced.
    
    Review: https://reviews.apache.org/r/70216/
---
 src/resource_provider/storage/provider.cpp         | 827 +++++++++------------
 src/resource_provider/storage/provider_process.hpp |  51 +-
 2 files changed, 374 insertions(+), 504 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index b762bb5..428d239 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -97,6 +97,10 @@
 
 namespace http = process::http;
 
+// TODO(chhsiao): Remove `using namespace` statements after refactoring.
+using namespace mesos::csi;
+using namespace mesos::csi::v0;
+
 using std::accumulate;
 using std::find;
 using std::list;
@@ -449,23 +453,19 @@ StorageLocalResourceProviderProcess::__call(
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  Try<string> _bootId = os::bootId();
-  if (_bootId.isError()) {
-    LOG(ERROR) << "Failed to get boot ID: " << _bootId.error();
-    return fatal();
-  }
-
-  bootId = _bootId.get();
-
   const Principal principal = LocalResourceProvider::principal(info);
   CHECK(principal.claims.contains("cid_prefix"));
   const string& containerPrefix = principal.claims.at("cid_prefix");
 
+  rootDir = slave::paths::getCsiRootDir(workDir);
+  pluginInfo = info.storage().plugin();
+  services = {CONTROLLER_SERVICE, NODE_SERVICE};
+
   serviceManager.reset(new ServiceManager(
       extractParentEndpoint(url),
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin(),
-      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
+      rootDir,
+      pluginInfo,
+      services,
       containerPrefix,
       authToken,
       runtime,
@@ -500,21 +500,81 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 {
   CHECK_EQ(RECOVERING, state);
 
-  // NOTE: The `Controller` service is supported if the plugin has the
-  // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if
-  // the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability. So
-  // we first launch the node plugin to get the plugin capabilities, then decide
-  // if we need to launch the controller plugin and get the node ID.
-  return serviceManager->recover()
-    .then(defer(self(), &Self::prepareIdentityService))
-    .then(defer(self(), &Self::prepareControllerService))
-    .then(defer(self(), &Self::prepareNodeService))
-    .then(defer(self(), &Self::recoverVolumes))
-    .then(defer(self(), &Self::recoverResourceProviderState))
+  return recoverVolumes()
     .then(defer(self(), [=]() -> Future<Nothing> {
-      LOG(INFO)
-        << "Finished recovery for resource provider with type '" << info.type()
-        << "' and name '" << info.name() << "'";
+      // Recover the resource provider ID and state from the latest symlink. If
+      // the symlink does not exist, this is a new resource provider, and the
+      // total resources will be empty, which is fine since new resources will
+      // be added during reconciliation.
+      Result<string> realpath =
+        os::realpath(slave::paths::getLatestResourceProviderPath(
+            metaDir, slaveId, info.type(), info.name()));
+
+      if (realpath.isError()) {
+        return Failure(
+            "Failed to read the latest symlink for resource provider with type 
"
+            "'" + info.type() + "' and name '" + info.name() + "': " +
+            realpath.error());
+      }
+
+      if (realpath.isSome()) {
+        info.mutable_id()->set_value(Path(realpath.get()).basename());
+
+        const string statePath = slave::paths::getResourceProviderStatePath(
+            metaDir, slaveId, info.type(), info.name(), info.id());
+
+        if (os::exists(statePath)) {
+          Result<ResourceProviderState> resourceProviderState =
+            slave::state::read<ResourceProviderState>(statePath);
+
+          if (resourceProviderState.isError()) {
+            return Failure(
+                "Failed to read resource provider state from '" + statePath +
+                "': " + resourceProviderState.error());
+          }
+
+          if (resourceProviderState.isSome()) {
+            foreach (const Operation& operation,
+                     resourceProviderState->operations()) {
+              Try<id::UUID> uuid =
+                id::UUID::fromBytes(operation.uuid().value());
+
+              operations[CHECK_NOTERROR(uuid)] = operation;
+            }
+
+            totalResources = resourceProviderState->resources();
+
+            const ResourceProviderState::Storage& storage =
+              resourceProviderState->storage();
+
+            using ProfileEntry = google::protobuf::
+              MapPair<string, ResourceProviderState::Storage::ProfileInfo>;
+
+            foreach (const ProfileEntry& entry, storage.profiles()) {
+              profileInfos.put(
+                  entry.first,
+                  {entry.second.capability(), entry.second.parameters()});
+            }
+
+            // We only checkpoint profiles associated with storage pools (i.e.,
+            // resources without IDs) in `checkpointResourceProviderState` as
+            // only these profiles might be used by pending operations, so we
+            // validate here that all such profiles exist.
+            foreach (const Resource& resource, totalResources) {
+              if (!resource.disk().source().has_id() &&
+                  resource.disk().source().has_profile() &&
+                  !profileInfos.contains(resource.disk().source().profile())) {
+                return Failure(
+                    "Cannot recover profile for storage pool '" +
+                    stringify(resource) + "' from '" + statePath + "'");
+              }
+            }
+          }
+        }
+      }
+
+      LOG(INFO) << "Finished recovery for resource provider with type '"
+                << info.type() << "' and name '" << info.name() << "'";
 
       state = DISCONNECTED;
 
@@ -543,338 +603,142 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 
 Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 {
-  // Recover the states of CSI volumes.
-  Try<list<string>> volumePaths = csi::paths::getVolumePaths(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name());
-
-  if (volumePaths.isError()) {
-    return Failure(
-        "Failed to find volumes for CSI plugin type '" +
-        info.storage().plugin().type() + "' and name '" +
-        info.storage().plugin().name() + "': " + volumePaths.error());
+  Try<string> bootId_ = os::bootId();
+  if (bootId_.isError()) {
+    return Failure("Failed to get boot ID: " + bootId_.error());
   }
 
-  vector<Future<Nothing>> futures;
+  bootId = bootId_.get();
 
-  foreach (const string& path, volumePaths.get()) {
-    Try<csi::paths::VolumePath> volumePath =
-      csi::paths::parseVolumePath(slave::paths::getCsiRootDir(workDir), path);
-
-    if (volumePath.isError()) {
-      return Failure(
-          "Failed to parse volume path '" + path + "': " + volumePath.error());
-    }
+  return serviceManager->recover()
+    .then(process::defer(self(), &Self::prepareServices))
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      // Recover the states of CSI volumes.
+      Try<list<string>> volumePaths =
+        paths::getVolumePaths(rootDir, pluginInfo.type(), pluginInfo.name());
 
-    CHECK_EQ(info.storage().plugin().type(), volumePath->type);
-    CHECK_EQ(info.storage().plugin().name(), volumePath->name);
+      if (volumePaths.isError()) {
+        return Failure(
+            "Failed to find volumes for CSI plugin type '" + pluginInfo.type() 
+
+            "' and name '" + pluginInfo.name() + "': " + volumePaths.error());
+      }
 
-    const string& volumeId = volumePath->volumeId;
-    const string statePath = csi::paths::getVolumeStatePath(
-        slave::paths::getCsiRootDir(workDir),
-        info.storage().plugin().type(),
-        info.storage().plugin().name(),
-        volumeId);
+      vector<Future<Nothing>> futures;
 
-    if (!os::exists(statePath)) {
-      continue;
-    }
+      foreach (const string& path, volumePaths.get()) {
+        Try<paths::VolumePath> volumePath =
+          paths::parseVolumePath(rootDir, path);
 
-    Result<VolumeState> volumeState =
-      slave::state::read<VolumeState>(statePath);
+        if (volumePath.isError()) {
+          return Failure(
+              "Failed to parse volume path '" + path +
+              "': " + volumePath.error());
+        }
 
-    if (volumeState.isError()) {
-      return Failure(
-          "Failed to read volume state from '" + statePath + "': " +
-          volumeState.error());
-    }
+        CHECK_EQ(pluginInfo.type(), volumePath->type);
+        CHECK_EQ(pluginInfo.name(), volumePath->name);
 
-    if (volumeState.isSome()) {
-      volumes.put(volumeId, std::move(volumeState.get()));
+        const string& volumeId = volumePath->volumeId;
+        const string statePath = paths::getVolumeStatePath(
+            rootDir, pluginInfo.type(), pluginInfo.name(), volumeId);
 
-      // To avoid any race with, e.g., `deleteVolume` calls, we sequentialize
-      // this lambda with any other operation on the same volume below, so the
-      // volume is guaranteed to exist in the deferred execution.
-      std::function<Future<Nothing>()> recoverVolume = defer(self(), [=]()
-          -> Future<Nothing> {
-        VolumeData& volume = volumes.at(volumeId);
-        Future<Nothing> recovered = Nothing();
-
-        // First, bring the volume back to a "good" state.
-        if (VolumeState::State_IsValid(volume.state.state())) {
-          switch (volume.state.state()) {
-            case VolumeState::CREATED:
-            case VolumeState::NODE_READY: {
-              break;
-            }
-            case VolumeState::VOL_READY:
-            case VolumeState::PUBLISHED: {
-              if (volume.state.boot_id() != bootId) {
-                // The node has been restarted since the volume is made
-                // publishable, so it is reset to `NODE_READY` state.
-                volume.state.set_state(VolumeState::NODE_READY);
-                volume.state.clear_boot_id();
-                checkpointVolumeState(volumeId);
-              }
+        if (!os::exists(statePath)) {
+          continue;
+        }
 
-              break;
-            }
-            case VolumeState::CONTROLLER_PUBLISH: {
-              recovered = controllerPublish(volumeId);
+        Result<VolumeState> volumeState =
+          slave::state::read<VolumeState>(statePath);
 
-              break;
-            }
-            case VolumeState::CONTROLLER_UNPUBLISH: {
-              recovered = controllerUnpublish(volumeId);
+        if (volumeState.isError()) {
+          return Failure(
+              "Failed to read volume state from '" + statePath +
+              "': " + volumeState.error());
+        }
 
-              break;
-            }
-            case VolumeState::NODE_STAGE: {
-              recovered = nodeStage(volumeId);
+        if (volumeState.isNone()) {
+          continue;
+        }
 
-              break;
-            }
-            case VolumeState::NODE_UNSTAGE: {
-              if (volume.state.boot_id() != bootId) {
-                // The node has been restarted since the volume is made
-                // publishable, so it is reset to `NODE_READY` state.
-                volume.state.set_state(VolumeState::NODE_READY);
-                volume.state.clear_boot_id();
-                checkpointVolumeState(volumeId);
-              } else {
-                recovered = nodeUnstage(volumeId);
-              }
+        volumes.put(volumeId, std::move(volumeState.get()));
+        VolumeData& volume = volumes.at(volumeId);
 
-              break;
-            }
-            case VolumeState::NODE_PUBLISH: {
-              if (volume.state.boot_id() != bootId) {
-                // The node has been restarted since the volume is made
-                // publishable, so it is reset to `NODE_READY` state.
-                volume.state.set_state(VolumeState::NODE_READY);
-                volume.state.clear_boot_id();
-                checkpointVolumeState(volumeId);
-              } else {
-                recovered = nodePublish(volumeId);
-              }
+        if (!VolumeState::State_IsValid(volume.state.state())) {
+          return Failure("Volume '" + volumeId + "' is in INVALID state");
+        }
 
-              break;
+        // First, if there is a node reboot after the volume is made
+        // publishable, it should be reset to `NODE_READY`.
+        switch (volume.state.state()) {
+          case VolumeState::CREATED:
+          case VolumeState::NODE_READY:
+          case VolumeState::CONTROLLER_PUBLISH:
+          case VolumeState::CONTROLLER_UNPUBLISH:
+          case VolumeState::NODE_STAGE: {
+            break;
+          }
+          case VolumeState::VOL_READY:
+          case VolumeState::PUBLISHED:
+          case VolumeState::NODE_UNSTAGE:
+          case VolumeState::NODE_PUBLISH:
+          case VolumeState::NODE_UNPUBLISH: {
+            if (bootId != volume.state.boot_id()) {
+              // Since this is a no-op, no need to checkpoint here.
+              volume.state.set_state(VolumeState::NODE_READY);
+              volume.state.clear_boot_id();
             }
-            case VolumeState::NODE_UNPUBLISH: {
-              if (volume.state.boot_id() != bootId) {
-                // The node has been restarted since the volume is made
-                // publishable, so it is reset to `NODE_READY` state.
-                volume.state.set_state(VolumeState::NODE_READY);
-                volume.state.clear_boot_id();
-                checkpointVolumeState(volumeId);
-              } else {
-                recovered = nodeUnpublish(volumeId);
-              }
 
-              break;
-            }
-            case VolumeState::UNKNOWN: {
-              return Failure(
-                  "Volume '" + volumeId + "' is in " +
-                  stringify(volume.state.state()) + " state");
-            }
+            break;
+          }
+          case VolumeState::UNKNOWN: {
+            return Failure("Volume '" + volumeId + "' is in UNKNOWN state");
+          }
 
-            // NOTE: We avoid using a default clause for the following values in
-            // proto3's open enum to enable the compiler to detect missing enum
-            // cases for us. See: https://github.com/google/protobuf/issues/3917
-            case google::protobuf::kint32min:
-            case google::protobuf::kint32max: {
-              UNREACHABLE();
-            }
+          // NOTE: We avoid using a default clause for the following values in
+          // proto3's open enum to enable the compiler to detect missing enum
+          // cases for us. See: https://github.com/google/protobuf/issues/3917
+          case google::protobuf::kint32min:
+          case google::protobuf::kint32max: {
+            UNREACHABLE();
           }
-        } else {
-          return Failure("Volume '" + volumeId + "' is in UNDEFINED state");
         }
 
-        auto err = [](const string& volumeId, const string& message) {
-          LOG(ERROR)
-            << "Failed to recover volume '" << volumeId << "': " << message;
-        };
-
         // Second, if the volume has been used by a container before recovery,
         // we have to bring the volume back to `PUBLISHED` so data can be
-        // cleaned up synchronously upon `DESTROY`. Otherwise, we skip the error
-        // and continue recovery.
+        // cleaned up synchronously when needed.
         if (volume.state.node_publish_required()) {
-          recovered = recovered
-            .then(defer(self(), [this, volumeId]() -> Future<Nothing> {
-              const VolumeData& volume = volumes.at(volumeId);
-              Future<Nothing> published = Nothing();
-
-              CHECK(VolumeState::State_IsValid(volume.state.state()));
-
-              switch (volume.state.state()) {
-                case VolumeState::NODE_READY: {
-                  published = published
-                    .then(defer(self(), &Self::nodeStage, volumeId));
-
-                  // NOTE: We continue to the next case to recover the volume in
-                  // `VOL_READY` state once the above is done.
-                }
-                case VolumeState::VOL_READY: {
-                  published = published
-                    .then(defer(self(), &Self::nodePublish, volumeId));
-
-                  // NOTE: We continue to the next case to recover the volume in
-                  // `PUBLISHED` state once the above is done.
-                }
-                case VolumeState::PUBLISHED: {
-                  break;
-                }
-                case VolumeState::UNKNOWN:
-                case VolumeState::CREATED:
-                case VolumeState::CONTROLLER_PUBLISH:
-                case VolumeState::CONTROLLER_UNPUBLISH:
-                case VolumeState::NODE_STAGE:
-                case VolumeState::NODE_UNSTAGE:
-                case VolumeState::NODE_PUBLISH:
-                case VolumeState::NODE_UNPUBLISH: {
-                  UNREACHABLE();
-                }
-
-                // NOTE: We avoid using a default clause for the following
-                // values in proto3's open enum to enable the compiler to detect
-                // missing enum cases for us. See:
-                // https://github.com/google/protobuf/issues/3917
-                case google::protobuf::kint32min:
-                case google::protobuf::kint32max: {
-                  UNREACHABLE();
-                }
-              }
-
-              return published;
-            }))
-            .onFailed(std::bind(err, volumeId, lambda::_1))
-            .onDiscarded(std::bind(err, volumeId, "future discarded"));
-        } else {
-          recovered = recovered
-            .onFailed(std::bind(err, volumeId, lambda::_1))
-            .onDiscarded(std::bind(err, volumeId, "future discarded"))
-            .recover([](const Future<Nothing>& future) { return Nothing(); });
+          futures.push_back(publishVolume(volumeId));
         }
-
-        return recovered;
-      });
-
-      futures.push_back(volumes.at(volumeId).sequence->add(recoverVolume));
-    }
-  }
-
-  // Garbage collect leftover mount paths that were failed to remove before.
-  const string mountRootDir = csi::paths::getMountRootDir(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name());
-
-  Try<list<string>> mountPaths = csi::paths::getMountPaths(mountRootDir);
-  if (mountPaths.isError()) {
-    // TODO(chhsiao): This could indicate that something is seriously wrong. To
-    // help debugging the problem, we should surface the error via MESOS-8745.
-    return Failure(
-        "Failed to find mount paths for CSI plugin type '" +
-        info.storage().plugin().type() + "' and name '" +
-        info.storage().plugin().name() + "': " + mountPaths.error());
-  }
-
-  foreach (const string& path, mountPaths.get()) {
-    Try<string> volumeId = csi::paths::parseMountPath(mountRootDir, path);
-    if (volumeId.isError()) {
-      return Failure(
-          "Failed to parse mount path '" + path + "': " + volumeId.error());
-    }
-
-    if (!volumes.contains(volumeId.get())) {
-      garbageCollectMountPath(volumeId.get());
-    }
-  }
-
-  return collect(futures).then([] { return Nothing(); });
-}
-
-
-Future<Nothing>
-StorageLocalResourceProviderProcess::recoverResourceProviderState()
-{
-  // Recover the resource provider ID and state from the latest
-  // symlink. If the symlink does not exist, this is a new resource
-  // provider, and the total resources will be empty, which is fine
-  // since new resources will be added during reconciliation.
-  Result<string> realpath = os::realpath(
-      slave::paths::getLatestResourceProviderPath(
-          metaDir, slaveId, info.type(), info.name()));
-
-  if (realpath.isError()) {
-    return Failure(
-        "Failed to read the latest symlink for resource provider with type '" +
-        info.type() + "' and name '" + info.name() + "': " + realpath.error());
-  }
-
-  if (realpath.isSome()) {
-    info.mutable_id()->set_value(Path(realpath.get()).basename());
-
-    const string statePath = slave::paths::getResourceProviderStatePath(
-        metaDir, slaveId, info.type(), info.name(), info.id());
-
-    if (!os::exists(statePath)) {
-      return Nothing();
-    }
-
-    Result<ResourceProviderState> resourceProviderState =
-      slave::state::read<ResourceProviderState>(statePath);
-
-    if (resourceProviderState.isError()) {
-      return Failure(
-          "Failed to read resource provider state from '" + statePath +
-          "': " + resourceProviderState.error());
-    }
-
-    if (resourceProviderState.isSome()) {
-      foreach (const Operation& operation,
-               resourceProviderState->operations()) {
-        Try<id::UUID> uuid = id::UUID::fromBytes(operation.uuid().value());
-
-        CHECK_SOME(uuid);
-
-        operations[uuid.get()] = operation;
       }
 
-      totalResources = resourceProviderState->resources();
-
-      const ResourceProviderState::Storage& storage =
-        resourceProviderState->storage();
-
-      using ProfileEntry = google::protobuf::MapPair<
-          string, ResourceProviderState::Storage::ProfileInfo>;
+      // Garbage collect leftover mount paths that were failed to remove before.
+      const string mountRootDir =
+        paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name());
 
-      foreach (const ProfileEntry& entry, storage.profiles()) {
-        profileInfos.put(
-            entry.first,
-            {entry.second.capability(), entry.second.parameters()});
+      Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir);
+      if (mountPaths.isError()) {
+        // TODO(chhsiao): This could indicate that something is seriously wrong.
+        // To help debugging the problem, we should surface the error via
+        // MESOS-8745.
+        return Failure(
+            "Failed to find mount paths for CSI plugin type '" +
+            pluginInfo.type() + "' and name '" + pluginInfo.name() +
+            "': " + mountPaths.error());
       }
 
-      // We only checkpoint profiles associated with storage pools (i.e.,
-      // resources without IDs) in `checkpointResourceProviderState` as only
-      // these profiles might be used by pending operations, so we validate here
-      // that all such profiles exist.
-      foreach (const Resource& resource, totalResources) {
-        if (!resource.disk().source().has_id() &&
-            resource.disk().source().has_profile() &&
-            !profileInfos.contains(resource.disk().source().profile())) {
+      foreach (const string& path, mountPaths.get()) {
+        Try<string> volumeId = paths::parseMountPath(mountRootDir, path);
+        if (volumeId.isError()) {
           return Failure(
-              "Cannot recover profile for storage pool '" +
-              stringify(resource) + "' from '" + statePath + "'");
+              "Failed to parse mount path '" + path + "': " + 
volumeId.error());
+        }
+
+        if (!volumes.contains(volumeId.get())) {
+          garbageCollectMountPath(volumeId.get());
         }
       }
-    }
-  }
 
-  return Nothing();
+      return process::collect(futures).then([] { return Nothing(); });
+    }));
 }
 
 
@@ -1486,87 +1350,7 @@ void StorageLocalResourceProviderProcess::publishResources(
     vector<Future<Nothing>> futures;
 
     foreach (const string& volumeId, volumeIds) {
-      // We check the state of the volume along with the CSI calls
-      // atomically with respect to other publish or deletion requests
-      // for the same volume through dispatching the whole lambda on the
-      // volume's sequence.
-      std::function<Future<Nothing>()> controllerAndNodePublish =
-        defer(self(), [=] {
-          CHECK(volumes.contains(volumeId));
-          const VolumeData& volume = volumes.at(volumeId);
-
-          Future<Nothing> published = Nothing();
-
-          CHECK(VolumeState::State_IsValid(volume.state.state()));
-
-          switch (volume.state.state()) {
-            case VolumeState::CONTROLLER_UNPUBLISH: {
-              published = published
-                .then(defer(self(), &Self::controllerUnpublish, volumeId));
-
-              // NOTE: We continue to the next case to publish the volume in
-              // `CREATED` state once the above is done.
-            }
-            case VolumeState::CREATED:
-            case VolumeState::CONTROLLER_PUBLISH: {
-              published = published
-                .then(defer(self(), &Self::controllerPublish, volumeId))
-                .then(defer(self(), &Self::nodeStage, volumeId))
-                .then(defer(self(), &Self::nodePublish, volumeId));
-
-              break;
-            }
-            case VolumeState::NODE_UNSTAGE: {
-              published = published
-                .then(defer(self(), &Self::nodeUnstage, volumeId));
-
-              // NOTE: We continue to the next case to publish the volume in
-              // `NODE_READY` state once the above is done.
-            }
-            case VolumeState::NODE_READY:
-            case VolumeState::NODE_STAGE: {
-              published = published
-                .then(defer(self(), &Self::nodeStage, volumeId))
-                .then(defer(self(), &Self::nodePublish, volumeId));
-
-              break;
-            }
-            case VolumeState::NODE_UNPUBLISH: {
-              published = published
-                .then(defer(self(), &Self::nodeUnpublish, volumeId));
-
-              // NOTE: We continue to the next case to publish the volume in
-              // `VOL_READY` state once the above is done.
-            }
-            case VolumeState::VOL_READY:
-            case VolumeState::NODE_PUBLISH: {
-              published = published
-                .then(defer(self(), &Self::nodePublish, volumeId));
-
-              break;
-            }
-            case VolumeState::PUBLISHED: {
-              break;
-            }
-            case VolumeState::UNKNOWN: {
-              UNREACHABLE();
-            }
-
-            // NOTE: We avoid using a default clause for the following
-            // values in proto3's open enum to enable the compiler to detect
-            // missing enum cases for us. See:
-            // https://github.com/google/protobuf/issues/3917
-            case google::protobuf::kint32min:
-            case google::protobuf::kint32max: {
-              UNREACHABLE();
-            }
-          }
-
-          return published;
-        });
-
-      futures.push_back(
-          volumes.at(volumeId).sequence->add(controllerAndNodePublish));
+      futures.push_back(publishVolume(volumeId));
     }
 
     allPublished = collect(futures);
@@ -1674,86 +1458,179 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
+Future<Nothing> StorageLocalResourceProviderProcess::prepareServices()
 {
-  // Get the plugin info.
-  return call<csi::v0::GET_PLUGIN_INFO>(
-      csi::NODE_SERVICE, csi::v0::GetPluginInfoRequest())
-    .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
-      pluginInfo = response;
-
-      LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
+  CHECK(!services.empty());
 
-      // Get the plugin capabilities.
-      return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
-          csi::NODE_SERVICE, csi::v0::GetPluginCapabilitiesRequest());
-    }))
-    .then(defer(self(), [=](
-        const csi::v0::GetPluginCapabilitiesResponse& response) {
+  // Get the plugin capabilities.
+  return call<GET_PLUGIN_CAPABILITIES>(
+      *services.begin(), GetPluginCapabilitiesRequest())
+    .then(process::defer(self(), [=](
+        const GetPluginCapabilitiesResponse& response) -> Future<Nothing> {
       pluginCapabilities = response.capabilities();
 
+      if (services.contains(CONTROLLER_SERVICE) &&
+          !pluginCapabilities->controllerService) {
+        return Failure(
+            "CONTROLLER_SERVICE plugin capability is not supported for CSI "
+            "plugin type '" +
+            pluginInfo.type() + "' and name '" + pluginInfo.name() + "'");
+      }
+
       return Nothing();
+    }))
+    // Check if all services have consistent plugin infos.
+    .then(process::defer(self(), [this] {
+      vector<Future<GetPluginInfoResponse>> futures;
+      foreach (const Service& service, services) {
+        futures.push_back(
+            call<GET_PLUGIN_INFO>(CONTROLLER_SERVICE, GetPluginInfoRequest())
+              .onReady([service](const GetPluginInfoResponse& response) {
+                LOG(INFO) << service << " loaded: " << stringify(response);
+              }));
+      }
+
+      return process::collect(futures)
+        .then([](const vector<GetPluginInfoResponse>& pluginInfos) {
+          for (size_t i = 1; i < pluginInfos.size(); ++i) {
+            if (pluginInfos[i].name() != pluginInfos[0].name() ||
+                pluginInfos[i].vendor_version() !=
+                  pluginInfos[0].vendor_version()) {
+              LOG(WARNING) << "Inconsistent plugin services. Please check with "
+                              "the plugin vendor to ensure compatibility.";
+            }
+          }
+
+          return Nothing();
+        });
+    }))
+    // Get the controller capabilities.
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      if (!services.contains(CONTROLLER_SERVICE)) {
+        controllerCapabilities = ControllerCapabilities();
+        return Nothing();
+      }
+
+      return call<CONTROLLER_GET_CAPABILITIES>(
+          CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest())
+        .then(process::defer(self(), [this](
+            const ControllerGetCapabilitiesResponse& response) {
+          controllerCapabilities = response.capabilities();
+          return Nothing();
+        }));
+    }))
+    // Get the node capabilities and ID.
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      if (!services.contains(NODE_SERVICE)) {
+        nodeCapabilities = NodeCapabilities();
+        return Nothing();
+      }
+
+      return call<NODE_GET_CAPABILITIES>(
+          NODE_SERVICE, NodeGetCapabilitiesRequest())
+        .then(process::defer(self(), [this](
+            const NodeGetCapabilitiesResponse& response) -> Future<Nothing> {
+          nodeCapabilities = response.capabilities();
+
+          if (controllerCapabilities->publishUnpublishVolume) {
+            return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest())
+              .then(process::defer(self(), [this](
+                  const NodeGetIdResponse& response) {
+                nodeId = response.node_id();
+                return Nothing();
+              }));
+          }
+
+          return Nothing();
+        }));
     }));
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
+Future<Nothing> StorageLocalResourceProviderProcess::publishVolume(
+    const string& volumeId)
 {
-  CHECK_SOME(pluginInfo);
+  CHECK(volumes.contains(volumeId));
 
-  if (!pluginCapabilities.controllerService) {
-    return Nothing();
-  }
+  // To avoid any race with, e.g., `deleteVolume` calls, we sequentialize this
+  // lambda with any other operation on the same volume below, so the volume is
+  // guaranteed to exist in the deferred execution.
+  std::function<Future<Nothing>()> controllerAndNodePublish =
+    defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      const VolumeData& volume = volumes.at(volumeId);
 
-  // Get the controller plugin info and check for consistency.
-  return call<csi::v0::GET_PLUGIN_INFO>(
-      csi::CONTROLLER_SERVICE, csi::v0::GetPluginInfoRequest())
-    .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
-      LOG(INFO) << "Controller plugin loaded: " << stringify(response);
+      Future<Nothing> published = Nothing();
 
-      if (pluginInfo->name() != response.name() ||
-          pluginInfo->vendor_version() != response.vendor_version()) {
-        LOG(WARNING)
-          << "Inconsistent controller and node plugin components. Please check 
"
-             "with the plugin vendor to ensure compatibility.";
-      }
+      CHECK(VolumeState::State_IsValid(volume.state.state()));
 
-      // Get the controller capabilities.
-      return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
-          csi::CONTROLLER_SERVICE, csi::v0::ControllerGetCapabilitiesRequest());
-    }))
-    .then(defer(self(), [=](
-        const csi::v0::ControllerGetCapabilitiesResponse& response) {
-      controllerCapabilities = response.capabilities();
+      switch (volume.state.state()) {
+        case VolumeState::CONTROLLER_UNPUBLISH: {
+          published = published
+            .then(defer(self(), &Self::controllerUnpublish, volumeId));
 
-      return Nothing();
-    }));
-}
+          // NOTE: We continue to the next case to publish the volume in
+          // `CREATED` state once the above is done.
+        }
+        case VolumeState::CREATED:
+        case VolumeState::CONTROLLER_PUBLISH: {
+          published = published
+            .then(defer(self(), &Self::controllerPublish, volumeId))
+            .then(defer(self(), &Self::nodeStage, volumeId))
+            .then(defer(self(), &Self::nodePublish, volumeId));
 
+          break;
+        }
+        case VolumeState::NODE_UNSTAGE: {
+          published = published
+            .then(defer(self(), &Self::nodeUnstage, volumeId));
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
-{
-  // Get the node capabilities.
-  return call<csi::v0::NODE_GET_CAPABILITIES>(
-      csi::NODE_SERVICE, csi::v0::NodeGetCapabilitiesRequest())
-    .then(defer(self(), [=](
-        const csi::v0::NodeGetCapabilitiesResponse& response)
-        -> Future<Nothing> {
-      nodeCapabilities = response.capabilities();
+          // NOTE: We continue to the next case to publish the volume in
+          // `NODE_READY` state once the above is done.
+        }
+        case VolumeState::NODE_READY:
+        case VolumeState::NODE_STAGE: {
+          published = published
+            .then(defer(self(), &Self::nodeStage, volumeId))
+            .then(defer(self(), &Self::nodePublish, volumeId));
 
-      if (!controllerCapabilities.publishUnpublishVolume) {
-        return Nothing();
+          break;
+        }
+        case VolumeState::NODE_UNPUBLISH: {
+          published = published
+            .then(defer(self(), &Self::nodeUnpublish, volumeId));
+
+          // NOTE: We continue to the next case to publish the volume in
+          // `VOL_READY` state once the above is done.
+        }
+        case VolumeState::VOL_READY:
+        case VolumeState::NODE_PUBLISH: {
+          published = published
+            .then(defer(self(), &Self::nodePublish, volumeId));
+
+          break;
+        }
+        case VolumeState::PUBLISHED: {
+          break;
+        }
+        case VolumeState::UNKNOWN: {
+          UNREACHABLE();
+        }
+
+        // NOTE: We avoid using a default clause for the following
+        // values in proto3's open enum to enable the compiler to detect
+        // missing enum cases for us. See:
+        // https://github.com/google/protobuf/issues/3917
+        case google::protobuf::kint32min:
+        case google::protobuf::kint32max: {
+          UNREACHABLE();
+        }
       }
 
-      // Get the node ID.
-      return call<csi::v0::NODE_GET_ID>(
-          csi::NODE_SERVICE, csi::v0::NodeGetIdRequest())
-        .then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) {
-          nodeId = response.node_id();
+      return published;
+    });
 
-          return Nothing();
-        }));
-    }));
+    return volumes.at(volumeId).sequence->add(controllerAndNodePublish);
 }
 
 
@@ -1763,7 +1640,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
   CHECK(volumes.contains(volumeId));
   VolumeData& volume = volumes.at(volumeId);
 
-  if (!controllerCapabilities.publishUnpublishVolume) {
+  if (!controllerCapabilities->publishUnpublishVolume) {
     CHECK_EQ(VolumeState::CREATED, volume.state.state());
 
     volume.state.set_state(VolumeState::NODE_READY);
@@ -1810,7 +1687,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
   CHECK(volumes.contains(volumeId));
   VolumeData& volume = volumes.at(volumeId);
 
-  if (!controllerCapabilities.publishUnpublishVolume) {
+  if (!controllerCapabilities->publishUnpublishVolume) {
     CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
 
     volume.state.set_state(VolumeState::CREATED);
@@ -1856,11 +1733,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
   CHECK(volumes.contains(volumeId));
   VolumeData& volume = volumes.at(volumeId);
 
-  if (!nodeCapabilities.stageUnstageVolume) {
+  if (!nodeCapabilities->stageUnstageVolume) {
     CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
 
     volume.state.set_state(VolumeState::VOL_READY);
-    volume.state.set_boot_id(bootId);
+    volume.state.set_boot_id(CHECK_NOTNONE(bootId));
     checkpointVolumeState(volumeId);
 
     return Nothing();
@@ -1901,7 +1778,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
       VolumeData& volume = volumes.at(volumeId);
 
       volume.state.set_state(VolumeState::VOL_READY);
-      volume.state.set_boot_id(bootId);
+      volume.state.set_boot_id(CHECK_NOTNONE(bootId));
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -1915,7 +1792,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
   CHECK(volumes.contains(volumeId));
   VolumeData& volume = volumes.at(volumeId);
 
-  if (!nodeCapabilities.stageUnstageVolume) {
+  if (!nodeCapabilities->stageUnstageVolume) {
     CHECK_EQ(VolumeState::VOL_READY, volume.state.state());
 
     volume.state.set_state(VolumeState::NODE_READY);
@@ -2000,7 +1877,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
   request.set_readonly(false);
   *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-  if (nodeCapabilities.stageUnstageVolume) {
+  if (nodeCapabilities->stageUnstageVolume) {
     const string stagingPath = csi::paths::getMountStagingPath(
         csi::paths::getMountRootDir(
             slave::paths::getCsiRootDir(workDir),
@@ -2081,7 +1958,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
     const Bytes& capacity,
     const DiskProfileAdaptor::ProfileInfo& profileInfo)
 {
-  if (!controllerCapabilities.createDeleteVolume) {
+  if (!controllerCapabilities->createDeleteVolume) {
     return Failure(
         "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
@@ -2203,7 +2080,7 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
 
   // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
   // supported. Otherwise, we simply leave it as a preprovisioned volume.
-  if (controllerCapabilities.createDeleteVolume) {
+  if (controllerCapabilities->createDeleteVolume) {
     deleted = deleted.then(defer(self(), [this, volumeId] {
       csi::v0::DeleteVolumeRequest request;
       request.set_volume_id(volumeId);
@@ -2237,7 +2114,7 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
         garbageCollectMountPath(volumeId);
       }
 
-      return controllerCapabilities.createDeleteVolume;
+      return controllerCapabilities->createDeleteVolume;
     }));
 }
 
@@ -2264,7 +2141,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
     return Nothing();
   }
 
-  if (!pluginCapabilities.controllerService) {
+  if (!pluginCapabilities->controllerService) {
     return Failure(
         "Plugin capability 'CONTROLLER_SERVICE' is not supported");
   }
@@ -2312,7 +2189,7 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities.listVolumes) {
+  if (!controllerCapabilities->listVolumes) {
     return Resources();
   }
 
@@ -2358,7 +2235,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities.getCapacity) {
+  if (!controllerCapabilities->getCapacity) {
     return Resources();
   }
 
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 01f5fce..359db0c 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -147,7 +147,6 @@ private:
   // resource provider and CSI volumes from checkpointed data.
   process::Future<Nothing> recover();
   process::Future<Nothing> recoverVolumes();
-  process::Future<Nothing> recoverResourceProviderState();
 
   void doReliableRegistration();
 
@@ -189,56 +188,49 @@ private:
   void reconcileOperations(
       const resource_provider::Event::ReconcileOperations& reconcile);
 
-  process::Future<Nothing> prepareIdentityService();
+  process::Future<Nothing> prepareServices();
 
-  // NOTE: This can only be called after `prepareIdentityService`.
-  process::Future<Nothing> prepareControllerService();
-
-  // NOTE: This can only be called after `prepareIdentityService` and
-  // `prepareControllerService`.
-  process::Future<Nothing> prepareNodeService();
+  process::Future<Nothing> publishVolume(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `CREATED` or
   // `CONTROLLER_PUBLISH` to `NODE_READY`.
   //
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> controllerPublish(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `NODE_READY`,
   // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
   //
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> controllerUnpublish(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `NODE_READY` or
   // `NODE_STAGE` to `VOL_READY`.
   //
-  // NOTE: This can only be called after `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> nodeStage(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `VOL_READY`,
   // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`.
   //
-  // NOTE: This can only be called after `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> nodeUnstage(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `VOL_READY` or
   // `NODE_PUBLISH` to `PUBLISHED`.
   //
-  // NOTE: This can only be called after `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> nodePublish(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `PUBLISHED`,
   // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
   //
-  // NOTE: This can only be called after `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> nodeUnpublish(const std::string& volumeId);
 
   // Returns a CSI volume ID.
   //
-  // NOTE: This can only be called after `prepareControllerService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<std::string> createVolume(
       const std::string& name,
       const Bytes& capacity,
@@ -246,24 +238,21 @@ private:
 
   // Returns true if the volume has been deprovisioned.
   //
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<bool> deleteVolume(const std::string& volumeId);
 
   // Validates if a volume supports the capability of the specified profile.
   //
-  // NOTE: This can only be called after `prepareIdentityService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> validateVolume(
       const std::string& volumeId,
       const Option<Labels>& metadata,
       const DiskProfileAdaptor::ProfileInfo& profileInfo);
 
-  // NOTE: This can only be called after `prepareControllerService` and the
-  // resource provider ID has been obtained.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Resources> listVolumes();
 
-  // NOTE: This can only be called after `prepareControllerService` and the
-  // resource provider ID has been obtained.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Resources> getCapacities();
 
   // Applies the operation. Speculative operations will be synchronously
@@ -333,7 +322,6 @@ private:
 
   std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
-  std::string bootId;
   process::grpc::client::Runtime runtime;
   process::Owned<v1::resource_provider::Driver> driver;
   OperationStatusUpdateManager statusUpdateManager;
@@ -343,10 +331,15 @@ private:
 
   process::Owned<csi::ServiceManager> serviceManager;
 
-  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
-  csi::v0::PluginCapabilities pluginCapabilities;
-  csi::v0::ControllerCapabilities controllerCapabilities;
-  csi::v0::NodeCapabilities nodeCapabilities;
+  // TODO(chhsiao): Remove the following variables after refactoring.
+  std::string rootDir;
+  CSIPluginInfo pluginInfo;
+  hashset<csi::Service> services;
+
+  Option<std::string> bootId;
+  Option<csi::v0::PluginCapabilities> pluginCapabilities;
+  Option<csi::v0::ControllerCapabilities> controllerCapabilities;
+  Option<csi::v0::NodeCapabilities> nodeCapabilities;
   Option<std::string> nodeId;
 
   // We maintain the following invariant: if one operation depends on

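Below is a minimal, standalone sketch of the boot-ID rule that the reworked `recoverVolumes` applies above: a volume checkpointed in a node-local transient or published state falls back to `NODE_READY` when the recorded boot ID no longer matches the current one, while controller-side transient states are left for the normal publish path to resume. The `State` enum, `VolumeState` struct, and `resetAfterReboot` helper are simplified stand-ins for illustration only, not the actual SLRP or libprocess types.

// Simplified stand-ins for the checkpointed CSI volume state; the real
// recovery logic lives in StorageLocalResourceProviderProcess::recoverVolumes.
#include <iostream>
#include <string>

enum class State {
  CREATED, NODE_READY, CONTROLLER_PUBLISH, CONTROLLER_UNPUBLISH, NODE_STAGE,
  VOL_READY, PUBLISHED, NODE_UNSTAGE, NODE_PUBLISH, NODE_UNPUBLISH
};

struct VolumeState {
  State state;
  std::string bootId;  // Boot ID recorded when the volume was made publishable.
};

// If the node has rebooted since the volume was staged/published, its mounts
// are gone, so any node-local or published state falls back to NODE_READY.
// Controller-side transient states are left for the publish path to resume.
void resetAfterReboot(VolumeState& volume, const std::string& currentBootId) {
  switch (volume.state) {
    case State::VOL_READY:
    case State::PUBLISHED:
    case State::NODE_UNSTAGE:
    case State::NODE_PUBLISH:
    case State::NODE_UNPUBLISH:
      if (volume.bootId != currentBootId) {
        volume.state = State::NODE_READY;
        volume.bootId.clear();
      }
      break;
    default:  // CREATED, NODE_READY, CONTROLLER_*, NODE_STAGE: no reset.
      break;
  }
}

int main() {
  VolumeState volume{State::PUBLISHED, "boot-1"};
  resetAfterReboot(volume, "boot-2");
  std::cout << (volume.state == State::NODE_READY ? "reset to NODE_READY"
                                                  : "unchanged") << std::endl;
  return 0;
}

Compiling and running this prints "reset to NODE_READY", matching the first switch in the new recovery logic.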