This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1da706a5863727b60fdff9ef02f5152ca8c40683
Author: Chun-Hung Hsiao <chhs...@apache.org>
AuthorDate: Mon Apr 1 23:23:46 2019 -0700

    Cleaned up the recovery logic for refactoring SLRP.
    
    In addition to performing volume state recovery, the `recoverVolumes`
    function now also recovers the service manager and prepares services.
    This logic will later be moved out of SLRP into the v0 `VolumeManager`.
    
    During volume state recovery, we no longer recover all volumes to steady
    states, since transient states are properly handled in SLRP. To simplify
    the recovery logic, a `publishVolume` method that conforms to the volume
    manager's `publishVolume` interface is introduced.
    
    Review: https://reviews.apache.org/r/70216/
---
 src/resource_provider/storage/provider.cpp         | 827 +++++++++-------
 src/resource_provider/storage/provider_process.hpp |  51 +-
 2 files changed, 374 insertions(+), 504 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index b762bb5..428d239 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -97,6 +97,10 @@ namespace http = process::http;
 
+// TODO(chhsiao): Remove `using namespace` statements after refactoring.
+using namespace mesos::csi;
+using namespace mesos::csi::v0;
+
 using std::accumulate;
 using std::find;
 using std::list;
@@ -449,23 +453,19 @@ StorageLocalResourceProviderProcess::__call(
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  Try<string> _bootId = os::bootId();
-  if (_bootId.isError()) {
-    LOG(ERROR) << "Failed to get boot ID: " << _bootId.error();
-    return fatal();
-  }
-
-  bootId = _bootId.get();
-
   const Principal principal = LocalResourceProvider::principal(info);
   CHECK(principal.claims.contains("cid_prefix"));
 
   const string& containerPrefix = principal.claims.at("cid_prefix");
 
+  rootDir = slave::paths::getCsiRootDir(workDir);
+  pluginInfo = info.storage().plugin();
+  services = {CONTROLLER_SERVICE, NODE_SERVICE};
+
   serviceManager.reset(new ServiceManager(
       extractParentEndpoint(url),
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin(),
-      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
+      rootDir,
+      pluginInfo,
+      services,
       containerPrefix,
       authToken,
       runtime,
@@ -500,21 +500,81 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 {
   CHECK_EQ(RECOVERING, state);
 
-  // NOTE: The `Controller` service is supported if the plugin has the
-  // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if
-  // the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability. So
-  // we first launch the node plugin to get the plugin capabilities, then decide
-  // if we need to launch the controller plugin and get the node ID.
-  return serviceManager->recover()
-    .then(defer(self(), &Self::prepareIdentityService))
-    .then(defer(self(), &Self::prepareControllerService))
-    .then(defer(self(), &Self::prepareNodeService))
-    .then(defer(self(), &Self::recoverVolumes))
-    .then(defer(self(), &Self::recoverResourceProviderState))
+  return recoverVolumes()
     .then(defer(self(), [=]() -> Future<Nothing> {
-      LOG(INFO)
-        << "Finished recovery for resource provider with type '" << info.type()
-        << "' and name '" << info.name() << "'";
+      // Recover the resource provider ID and state from the latest symlink. If
+      // the symlink does not exist, this is a new resource provider, and the
+      // total resources will be empty, which is fine since new resources will
+      // be added during reconciliation.
+      Result<string> realpath =
+        os::realpath(slave::paths::getLatestResourceProviderPath(
+            metaDir, slaveId, info.type(), info.name()));
+
+      if (realpath.isError()) {
+        return Failure(
+            "Failed to read the latest symlink for resource provider with type "
+            "'" + info.type() + "' and name '" + info.name() + "': " +
+            realpath.error());
+      }
+
+      if (realpath.isSome()) {
+        info.mutable_id()->set_value(Path(realpath.get()).basename());
+
+        const string statePath = slave::paths::getResourceProviderStatePath(
+            metaDir, slaveId, info.type(), info.name(), info.id());
+
+        if (os::exists(statePath)) {
+          Result<ResourceProviderState> resourceProviderState =
+            slave::state::read<ResourceProviderState>(statePath);
+
+          if (resourceProviderState.isError()) {
+            return Failure(
+                "Failed to read resource provider state from '" + statePath +
+                "': " + resourceProviderState.error());
+          }
+
+          if (resourceProviderState.isSome()) {
+            foreach (const Operation& operation,
+                     resourceProviderState->operations()) {
+              Try<id::UUID> uuid =
+                id::UUID::fromBytes(operation.uuid().value());
+
+              operations[CHECK_NOTERROR(uuid)] = operation;
+            }
+
+            totalResources = resourceProviderState->resources();
+
+            const ResourceProviderState::Storage& storage =
+              resourceProviderState->storage();
+
+            using ProfileEntry = google::protobuf::
+              MapPair<string, ResourceProviderState::Storage::ProfileInfo>;
+
+            foreach (const ProfileEntry& entry, storage.profiles()) {
+              profileInfos.put(
+                  entry.first,
+                  {entry.second.capability(), entry.second.parameters()});
+            }
+
+            // We only checkpoint profiles associated with storage pools (i.e.,
+            // resources without IDs) in `checkpointResourceProviderState` as
+            // only these profiles might be used by pending operations, so we
+            // validate here that all such profiles exist.
+            foreach (const Resource& resource, totalResources) {
+              if (!resource.disk().source().has_id() &&
+                  resource.disk().source().has_profile() &&
+                  !profileInfos.contains(resource.disk().source().profile())) {
+                return Failure(
+                    "Cannot recover profile for storage pool '" +
+                    stringify(resource) + "' from '" + statePath + "'");
+              }
+            }
+          }
+        }
+      }
+
+      LOG(INFO) << "Finished recovery for resource provider with type '"
+                << info.type() << "' and name '" << info.name() << "'";
 
       state = DISCONNECTED;
 
@@ -543,338 +603,142 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 
 Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 {
-  // Recover the states of CSI volumes.
-  Try<list<string>> volumePaths = csi::paths::getVolumePaths(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name());
-
-  if (volumePaths.isError()) {
-    return Failure(
-        "Failed to find volumes for CSI plugin type '" +
-        info.storage().plugin().type() + "' and name '" +
-        info.storage().plugin().name() + "': " + volumePaths.error());
+  Try<string> bootId_ = os::bootId();
+  if (bootId_.isError()) {
+    return Failure("Failed to get boot ID: " + bootId_.error());
   }
 
-  vector<Future<Nothing>> futures;
+  bootId = bootId_.get();
 
-  foreach (const string& path, volumePaths.get()) {
-    Try<csi::paths::VolumePath> volumePath =
-      csi::paths::parseVolumePath(slave::paths::getCsiRootDir(workDir), path);
-
-    if (volumePath.isError()) {
-      return Failure(
-          "Failed to parse volume path '" + path + "': " + volumePath.error());
-    }
+  return serviceManager->recover()
+    .then(process::defer(self(), &Self::prepareServices))
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      // Recover the states of CSI volumes.
+      Try<list<string>> volumePaths =
+        paths::getVolumePaths(rootDir, pluginInfo.type(), pluginInfo.name());
 
-    CHECK_EQ(info.storage().plugin().type(), volumePath->type);
-    CHECK_EQ(info.storage().plugin().name(), volumePath->name);
+      if (volumePaths.isError()) {
+        return Failure(
+            "Failed to find volumes for CSI plugin type '" + pluginInfo.type() +
+            "' and name '" + pluginInfo.name() + "': " + volumePaths.error());
+      }
 
-    const string& volumeId = volumePath->volumeId;
-    const string statePath = csi::paths::getVolumeStatePath(
-        slave::paths::getCsiRootDir(workDir),
-        info.storage().plugin().type(),
-        info.storage().plugin().name(),
-        volumeId);
+      vector<Future<Nothing>> futures;
 
-    if (!os::exists(statePath)) {
-      continue;
-    }
+      foreach (const string& path, volumePaths.get()) {
+        Try<paths::VolumePath> volumePath =
+          paths::parseVolumePath(rootDir, path);
 
-    Result<VolumeState> volumeState =
-      slave::state::read<VolumeState>(statePath);
+        if (volumePath.isError()) {
+          return Failure(
+              "Failed to parse volume path '" + path +
+              "': " + volumePath.error());
        }
 
-    if (volumeState.isError()) {
-      return Failure(
-          "Failed to read volume state from '" + statePath + "': " +
-          volumeState.error());
-    }
+        CHECK_EQ(pluginInfo.type(), volumePath->type);
+        CHECK_EQ(pluginInfo.name(), volumePath->name);
 
-    if (volumeState.isSome()) {
-      volumes.put(volumeId, std::move(volumeState.get()));
+        const string& volumeId = volumePath->volumeId;
+        const string statePath = paths::getVolumeStatePath(
+            rootDir, pluginInfo.type(), pluginInfo.name(), volumeId);
 
-      // To avoid any race with, e.g., `deleteVolume` calls, we sequentialize
-      // this lambda with any other operation on the same volume below, so the
-      // volume is guaranteed to exist in the deferred execution.
-      std::function<Future<Nothing>()> recoverVolume = defer(self(), [=]()
-          -> Future<Nothing> {
-        VolumeData& volume = volumes.at(volumeId);
-        Future<Nothing> recovered = Nothing();
-
-        // First, bring the volume back to a "good" state.
-        if (VolumeState::State_IsValid(volume.state.state())) {
-          switch (volume.state.state()) {
-            case VolumeState::CREATED:
-            case VolumeState::NODE_READY: {
-              break;
-            }
-            case VolumeState::VOL_READY:
-            case VolumeState::PUBLISHED: {
-              if (volume.state.boot_id() != bootId) {
-                // The node has been restarted since the volume is made
-                // publishable, so it is reset to `NODE_READY` state.
-                volume.state.set_state(VolumeState::NODE_READY);
-                volume.state.clear_boot_id();
-                checkpointVolumeState(volumeId);
-              }
+        if (!os::exists(statePath)) {
+          continue;
+        }
 
-              break;
-            }
-            case VolumeState::CONTROLLER_PUBLISH: {
-              recovered = controllerPublish(volumeId);
+        Result<VolumeState> volumeState =
+          slave::state::read<VolumeState>(statePath);
 
-              break;
-            }
-            case VolumeState::CONTROLLER_UNPUBLISH: {
-              recovered = controllerUnpublish(volumeId);
+        if (volumeState.isError()) {
+          return Failure(
+              "Failed to read volume state from '" + statePath +
+              "': " + volumeState.error());
+        }
 
-              break;
-            }
-            case VolumeState::NODE_STAGE: {
-              recovered = nodeStage(volumeId);
+        if (volumeState.isNone()) {
+          continue;
+        }
 
-              break;
-            }
-            case VolumeState::NODE_UNSTAGE: {
-              if (volume.state.boot_id() != bootId) {
-                // The node has been restarted since the volume is made
-                // publishable, so it is reset to `NODE_READY` state.
-                volume.state.set_state(VolumeState::NODE_READY);
-                volume.state.clear_boot_id();
-                checkpointVolumeState(volumeId);
-              } else {
-                recovered = nodeUnstage(volumeId);
-              }
+        volumes.put(volumeId, std::move(volumeState.get()));
+        VolumeData& volume = volumes.at(volumeId);
 
-              break;
-            }
-            case VolumeState::NODE_PUBLISH: {
-              if (volume.state.boot_id() != bootId) {
-                // The node has been restarted since the volume is made
-                // publishable, so it is reset to `NODE_READY` state.
-                volume.state.set_state(VolumeState::NODE_READY);
-                volume.state.clear_boot_id();
-                checkpointVolumeState(volumeId);
-              } else {
-                recovered = nodePublish(volumeId);
-              }
+        if (!VolumeState::State_IsValid(volume.state.state())) {
+          return Failure("Volume '" + volumeId + "' is in INVALID state");
+        }
 
-              break;
+        // First, if there was a node reboot after the volume was made
+        // publishable, it should be reset to `NODE_READY`.
+        switch (volume.state.state()) {
+          case VolumeState::CREATED:
+          case VolumeState::NODE_READY:
+          case VolumeState::CONTROLLER_PUBLISH:
+          case VolumeState::CONTROLLER_UNPUBLISH:
+          case VolumeState::NODE_STAGE: {
+            break;
+          }
+          case VolumeState::VOL_READY:
+          case VolumeState::PUBLISHED:
+          case VolumeState::NODE_UNSTAGE:
+          case VolumeState::NODE_PUBLISH:
+          case VolumeState::NODE_UNPUBLISH: {
+            if (bootId != volume.state.boot_id()) {
+              // Since this is a no-op, no need to checkpoint here.
+              volume.state.set_state(VolumeState::NODE_READY);
+              volume.state.clear_boot_id();
             }
-            case VolumeState::NODE_UNPUBLISH: {
-              if (volume.state.boot_id() != bootId) {
-                // The node has been restarted since the volume is made
-                // publishable, so it is reset to `NODE_READY` state.
-                volume.state.set_state(VolumeState::NODE_READY);
-                volume.state.clear_boot_id();
-                checkpointVolumeState(volumeId);
-              } else {
-                recovered = nodeUnpublish(volumeId);
-              }
 
-              break;
-            }
-            case VolumeState::UNKNOWN: {
-              return Failure(
-                  "Volume '" + volumeId + "' is in " +
-                  stringify(volume.state.state()) + " state");
-            }
+            break;
+          }
+          case VolumeState::UNKNOWN: {
+            return Failure("Volume '" + volumeId + "' is in UNKNOWN state");
+          }
 
-            // NOTE: We avoid using a default clause for the following values in
-            // proto3's open enum to enable the compiler to detect missing enum
-            // cases for us.
-            // See: https://github.com/google/protobuf/issues/3917
-            case google::protobuf::kint32min:
-            case google::protobuf::kint32max: {
-              UNREACHABLE();
-            }
+          // NOTE: We avoid using a default clause for the following values in
+          // proto3's open enum to enable the compiler to detect missing enum
+          // cases for us. See: https://github.com/google/protobuf/issues/3917
+          case google::protobuf::kint32min:
+          case google::protobuf::kint32max: {
+            UNREACHABLE();
+          }
         }
-        } else {
-          return Failure("Volume '" + volumeId + "' is in UNDEFINED state");
-        }
 
-        auto err = [](const string& volumeId, const string& message) {
-          LOG(ERROR)
-            << "Failed to recover volume '" << volumeId << "': " << message;
-        };
 
         // Second, if the volume has been used by a container before recovery,
         // we have to bring the volume back to `PUBLISHED` so data can be
-        // cleaned up synchronously upon `DESTROY`. Otherwise, we skip the error
-        // and continue recovery.
+        // cleaned up synchronously when needed.
         if (volume.state.node_publish_required()) {
-          recovered = recovered
-            .then(defer(self(), [this, volumeId]() -> Future<Nothing> {
-              const VolumeData& volume = volumes.at(volumeId);
-              Future<Nothing> published = Nothing();
-
-              CHECK(VolumeState::State_IsValid(volume.state.state()));
-
-              switch (volume.state.state()) {
-                case VolumeState::NODE_READY: {
-                  published = published
-                    .then(defer(self(), &Self::nodeStage, volumeId));
-
-                  // NOTE: We continue to the next case to recover the volume in
-                  // `VOL_READY` state once the above is done.
-                }
-                case VolumeState::VOL_READY: {
-                  published = published
-                    .then(defer(self(), &Self::nodePublish, volumeId));
-
-                  // NOTE: We continue to the next case to recover the volume in
-                  // `PUBLISHED` state once the above is done.
-                }
-                case VolumeState::PUBLISHED: {
-                  break;
-                }
-                case VolumeState::UNKNOWN:
-                case VolumeState::CREATED:
-                case VolumeState::CONTROLLER_PUBLISH:
-                case VolumeState::CONTROLLER_UNPUBLISH:
-                case VolumeState::NODE_STAGE:
-                case VolumeState::NODE_UNSTAGE:
-                case VolumeState::NODE_PUBLISH:
-                case VolumeState::NODE_UNPUBLISH: {
-                  UNREACHABLE();
-                }
-
-                // NOTE: We avoid using a default clause for the following
-                // values in proto3's open enum to enable the compiler to detect
-                // missing enum cases for us. See:
-                // https://github.com/google/protobuf/issues/3917
-                case google::protobuf::kint32min:
-                case google::protobuf::kint32max: {
-                  UNREACHABLE();
-                }
-              }
-
-              return published;
-            }))
-            .onFailed(std::bind(err, volumeId, lambda::_1))
-            .onDiscarded(std::bind(err, volumeId, "future discarded"));
-        } else {
-          recovered = recovered
-            .onFailed(std::bind(err, volumeId, lambda::_1))
-            .onDiscarded(std::bind(err, volumeId, "future discarded"))
-            .recover([](const Future<Nothing>& future) { return Nothing(); });
+          futures.push_back(publishVolume(volumeId));
         }
-
-        return recovered;
-      });
-
-      futures.push_back(volumes.at(volumeId).sequence->add(recoverVolume));
-    }
-  }
-
-  // Garbage collect leftover mount paths that were failed to remove before.
-  const string mountRootDir = csi::paths::getMountRootDir(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name());
-
-  Try<list<string>> mountPaths = csi::paths::getMountPaths(mountRootDir);
-  if (mountPaths.isError()) {
-    // TODO(chhsiao): This could indicate that something is seriously wrong. To
-    // help debugging the problem, we should surface the error via MESOS-8745.
-    return Failure(
-        "Failed to find mount paths for CSI plugin type '" +
-        info.storage().plugin().type() + "' and name '" +
-        info.storage().plugin().name() + "': " + mountPaths.error());
-  }
-
-  foreach (const string& path, mountPaths.get()) {
-    Try<string> volumeId = csi::paths::parseMountPath(mountRootDir, path);
-    if (volumeId.isError()) {
-      return Failure(
-          "Failed to parse mount path '" + path + "': " + volumeId.error());
-    }
-
-    if (!volumes.contains(volumeId.get())) {
-      garbageCollectMountPath(volumeId.get());
-    }
-  }
-
-  return collect(futures).then([] { return Nothing(); });
-}
-
-
-Future<Nothing>
-StorageLocalResourceProviderProcess::recoverResourceProviderState()
-{
-  // Recover the resource provider ID and state from the latest
-  // symlink. If the symlink does not exist, this is a new resource
-  // provider, and the total resources will be empty, which is fine
-  // since new resources will be added during reconciliation.
-  Result<string> realpath = os::realpath(
-      slave::paths::getLatestResourceProviderPath(
-          metaDir, slaveId, info.type(), info.name()));
-
-  if (realpath.isError()) {
-    return Failure(
-        "Failed to read the latest symlink for resource provider with type '" +
-        info.type() + "' and name '" + info.name() + "': " + realpath.error());
-  }
-
-  if (realpath.isSome()) {
-    info.mutable_id()->set_value(Path(realpath.get()).basename());
-
-    const string statePath = slave::paths::getResourceProviderStatePath(
-        metaDir, slaveId, info.type(), info.name(), info.id());
-
-    if (!os::exists(statePath)) {
-      return Nothing();
-    }
-
-    Result<ResourceProviderState> resourceProviderState =
-      slave::state::read<ResourceProviderState>(statePath);
-
-    if (resourceProviderState.isError()) {
-      return Failure(
-          "Failed to read resource provider state from '" + statePath +
-          "': " + resourceProviderState.error());
-    }
-
-    if (resourceProviderState.isSome()) {
-      foreach (const Operation& operation,
-               resourceProviderState->operations()) {
-        Try<id::UUID> uuid = id::UUID::fromBytes(operation.uuid().value());
-
-        CHECK_SOME(uuid);
-
-        operations[uuid.get()] = operation;
       }
 
-      totalResources = resourceProviderState->resources();
-
-      const ResourceProviderState::Storage& storage =
-        resourceProviderState->storage();
-
-      using ProfileEntry = google::protobuf::MapPair<
-          string, ResourceProviderState::Storage::ProfileInfo>;
+      // Garbage collect leftover mount paths that we failed to remove before.
+      const string mountRootDir =
+        paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name());
 
-      foreach (const ProfileEntry& entry, storage.profiles()) {
-        profileInfos.put(
-            entry.first,
-            {entry.second.capability(), entry.second.parameters()});
+      Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir);
+      if (mountPaths.isError()) {
+        // TODO(chhsiao): This could indicate that something is seriously
+        // wrong. To help debug the problem, we should surface the error via
+        // MESOS-8745.
+        return Failure(
+            "Failed to find mount paths for CSI plugin type '" +
+            pluginInfo.type() + "' and name '" + pluginInfo.name() +
+            "': " + mountPaths.error());
       }
 
-      // We only checkpoint profiles associated with storage pools (i.e.,
-      // resources without IDs) in `checkpointResourceProviderState` as only
-      // these profiles might be used by pending operations, so we validate here
-      // that all such profiles exist.
-      foreach (const Resource& resource, totalResources) {
-        if (!resource.disk().source().has_id() &&
-            resource.disk().source().has_profile() &&
-            !profileInfos.contains(resource.disk().source().profile())) {
+      foreach (const string& path, mountPaths.get()) {
+        Try<string> volumeId = paths::parseMountPath(mountRootDir, path);
+        if (volumeId.isError()) {
           return Failure(
-              "Cannot recover profile for storage pool '" +
-              stringify(resource) + "' from '" + statePath + "'");
+              "Failed to parse mount path '" + path + "': " + volumeId.error());
+        }
+
+        if (!volumes.contains(volumeId.get())) {
+          garbageCollectMountPath(volumeId.get());
         }
       }
-    }
-  }
 
-  return Nothing();
+      return process::collect(futures).then([] { return Nothing(); });
+    }));
 }
 
 
@@ -1486,87 +1350,7 @@ void StorageLocalResourceProviderProcess::publishResources(
   vector<Future<Nothing>> futures;
 
   foreach (const string& volumeId, volumeIds) {
-    // We check the state of the volume along with the CSI calls
-    // atomically with respect to other publish or deletion requests
-    // for the same volume through dispatching the whole lambda on the
-    // volume's sequence.
-    std::function<Future<Nothing>()> controllerAndNodePublish =
-      defer(self(), [=] {
-        CHECK(volumes.contains(volumeId));
-        const VolumeData& volume = volumes.at(volumeId);
-
-        Future<Nothing> published = Nothing();
-
-        CHECK(VolumeState::State_IsValid(volume.state.state()));
-
-        switch (volume.state.state()) {
-          case VolumeState::CONTROLLER_UNPUBLISH: {
-            published = published
-              .then(defer(self(), &Self::controllerUnpublish, volumeId));
-
-            // NOTE: We continue to the next case to publish the volume in
-            // `CREATED` state once the above is done.
-          }
-          case VolumeState::CREATED:
-          case VolumeState::CONTROLLER_PUBLISH: {
-            published = published
-              .then(defer(self(), &Self::controllerPublish, volumeId))
-              .then(defer(self(), &Self::nodeStage, volumeId))
-              .then(defer(self(), &Self::nodePublish, volumeId));
-
-            break;
-          }
-          case VolumeState::NODE_UNSTAGE: {
-            published = published
-              .then(defer(self(), &Self::nodeUnstage, volumeId));
-
-            // NOTE: We continue to the next case to publish the volume in
-            // `NODE_READY` state once the above is done.
-          }
-          case VolumeState::NODE_READY:
-          case VolumeState::NODE_STAGE: {
-            published = published
-              .then(defer(self(), &Self::nodeStage, volumeId))
-              .then(defer(self(), &Self::nodePublish, volumeId));
-
-            break;
-          }
-          case VolumeState::NODE_UNPUBLISH: {
-            published = published
-              .then(defer(self(), &Self::nodeUnpublish, volumeId));
-
-            // NOTE: We continue to the next case to publish the volume in
-            // `VOL_READY` state once the above is done.
-          }
-          case VolumeState::VOL_READY:
-          case VolumeState::NODE_PUBLISH: {
-            published = published
-              .then(defer(self(), &Self::nodePublish, volumeId));
-
-            break;
-          }
-          case VolumeState::PUBLISHED: {
-            break;
-          }
-          case VolumeState::UNKNOWN: {
-            UNREACHABLE();
-          }
-
-          // NOTE: We avoid using a default clause for the following
-          // values in proto3's open enum to enable the compiler to detect
-          // missing enum cases for us. See:
-          // https://github.com/google/protobuf/issues/3917
-          case google::protobuf::kint32min:
-          case google::protobuf::kint32max: {
-            UNREACHABLE();
-          }
-        }
-
-        return published;
-      });
-
-    futures.push_back(
-        volumes.at(volumeId).sequence->add(controllerAndNodePublish));
+    futures.push_back(publishVolume(volumeId));
   }
 
   allPublished = collect(futures);
@@ -1674,86 +1458,179 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
+Future<Nothing> StorageLocalResourceProviderProcess::prepareServices()
 {
-  // Get the plugin info.
-  return call<csi::v0::GET_PLUGIN_INFO>(
-      csi::NODE_SERVICE, csi::v0::GetPluginInfoRequest())
-    .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
-      pluginInfo = response;
-
-      LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
+  CHECK(!services.empty());
 
-      // Get the plugin capabilities.
-      return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
-          csi::NODE_SERVICE, csi::v0::GetPluginCapabilitiesRequest());
-    }))
-    .then(defer(self(), [=](
-        const csi::v0::GetPluginCapabilitiesResponse& response) {
+  // Get the plugin capabilities.
+  return call<GET_PLUGIN_CAPABILITIES>(
+      *services.begin(), GetPluginCapabilitiesRequest())
+    .then(process::defer(self(), [=](
+        const GetPluginCapabilitiesResponse& response) -> Future<Nothing> {
       pluginCapabilities = response.capabilities();
 
+      if (services.contains(CONTROLLER_SERVICE) &&
+          !pluginCapabilities->controllerService) {
+        return Failure(
+            "CONTROLLER_SERVICE plugin capability is not supported for CSI "
+            "plugin type '" +
+            pluginInfo.type() + "' and name '" + pluginInfo.name() + "'");
+      }
+
+      return Nothing();
+    }))
+    // Check if all services have consistent plugin infos.
+    .then(process::defer(self(), [this] {
+      vector<Future<GetPluginInfoResponse>> futures;
+      foreach (const Service& service, services) {
+        futures.push_back(
+            call<GET_PLUGIN_INFO>(service, GetPluginInfoRequest())
+              .onReady([service](const GetPluginInfoResponse& response) {
+                LOG(INFO) << service << " loaded: " << stringify(response);
+              }));
+      }
+
+      return process::collect(futures)
+        .then([](const vector<GetPluginInfoResponse>& pluginInfos) {
+          for (size_t i = 1; i < pluginInfos.size(); ++i) {
+            if (pluginInfos[i].name() != pluginInfos[0].name() ||
+                pluginInfos[i].vendor_version() !=
+                  pluginInfos[0].vendor_version()) {
+              LOG(WARNING)
+                << "Inconsistent plugin services. Please check with "
+                   "the plugin vendor to ensure compatibility.";
+            }
+          }
+
+          return Nothing();
+        });
+    }))
+    // Get the controller capabilities.
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      if (!services.contains(CONTROLLER_SERVICE)) {
+        controllerCapabilities = ControllerCapabilities();
+        return Nothing();
+      }
+
+      return call<CONTROLLER_GET_CAPABILITIES>(
+          CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest())
+        .then(process::defer(self(), [this](
+            const ControllerGetCapabilitiesResponse& response) {
+          controllerCapabilities = response.capabilities();
+          return Nothing();
+        }));
+    }))
+    // Get the node capabilities and ID.
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      if (!services.contains(NODE_SERVICE)) {
+        nodeCapabilities = NodeCapabilities();
+        return Nothing();
+      }
+
+      return call<NODE_GET_CAPABILITIES>(
+          NODE_SERVICE, NodeGetCapabilitiesRequest())
+        .then(process::defer(self(), [this](
+            const NodeGetCapabilitiesResponse& response) -> Future<Nothing> {
+          nodeCapabilities = response.capabilities();
+
+          if (controllerCapabilities->publishUnpublishVolume) {
+            return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest())
+              .then(process::defer(self(), [this](
+                  const NodeGetIdResponse& response) {
+                nodeId = response.node_id();
+                return Nothing();
+              }));
+          }
+
+          return Nothing();
+        }));
+    }));
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
+Future<Nothing> StorageLocalResourceProviderProcess::publishVolume(
+    const string& volumeId)
 {
-  CHECK_SOME(pluginInfo);
+  CHECK(volumes.contains(volumeId));
 
-  if (!pluginCapabilities.controllerService) {
-    return Nothing();
-  }
+  // To avoid any race with, e.g., `deleteVolume` calls, we sequentialize this
+  // lambda with any other operation on the same volume below, so the volume is
+  // guaranteed to exist in the deferred execution.
+  std::function<Future<Nothing>()> controllerAndNodePublish =
+    defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      const VolumeData& volume = volumes.at(volumeId);
 
-  // Get the controller plugin info and check for consistency.
-  return call<csi::v0::GET_PLUGIN_INFO>(
-      csi::CONTROLLER_SERVICE, csi::v0::GetPluginInfoRequest())
-    .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
-      LOG(INFO) << "Controller plugin loaded: " << stringify(response);
+      Future<Nothing> published = Nothing();
 
-      if (pluginInfo->name() != response.name() ||
-          pluginInfo->vendor_version() != response.vendor_version()) {
-        LOG(WARNING)
-          << "Inconsistent controller and node plugin components. Please check "
-             "with the plugin vendor to ensure compatibility.";
-      }
+      CHECK(VolumeState::State_IsValid(volume.state.state()));
 
-      // Get the controller capabilities.
-      return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
-          csi::CONTROLLER_SERVICE, csi::v0::ControllerGetCapabilitiesRequest());
-    }))
-    .then(defer(self(), [=](
-        const csi::v0::ControllerGetCapabilitiesResponse& response) {
-      controllerCapabilities = response.capabilities();
+      switch (volume.state.state()) {
+        case VolumeState::CONTROLLER_UNPUBLISH: {
+          published = published
+            .then(defer(self(), &Self::controllerUnpublish, volumeId));
 
-      return Nothing();
-    }));
-}
+          // NOTE: We continue to the next case to publish the volume in
+          // `CREATED` state once the above is done.
+        }
+        case VolumeState::CREATED:
+        case VolumeState::CONTROLLER_PUBLISH: {
+          published = published
+            .then(defer(self(), &Self::controllerPublish, volumeId))
+            .then(defer(self(), &Self::nodeStage, volumeId))
+            .then(defer(self(), &Self::nodePublish, volumeId));
 
+          break;
+        }
+        case VolumeState::NODE_UNSTAGE: {
+          published = published
+            .then(defer(self(), &Self::nodeUnstage, volumeId));
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
-{
-  // Get the node capabilities.
-  return call<csi::v0::NODE_GET_CAPABILITIES>(
-      csi::NODE_SERVICE, csi::v0::NodeGetCapabilitiesRequest())
-    .then(defer(self(), [=](
-        const csi::v0::NodeGetCapabilitiesResponse& response)
-        -> Future<Nothing> {
-      nodeCapabilities = response.capabilities();
+          // NOTE: We continue to the next case to publish the volume in
+          // `NODE_READY` state once the above is done.
+        }
+        case VolumeState::NODE_READY:
+        case VolumeState::NODE_STAGE: {
+          published = published
+            .then(defer(self(), &Self::nodeStage, volumeId))
+            .then(defer(self(), &Self::nodePublish, volumeId));
 
-      if (!controllerCapabilities.publishUnpublishVolume) {
-        return Nothing();
+          break;
+        }
+        case VolumeState::NODE_UNPUBLISH: {
+          published = published
+            .then(defer(self(), &Self::nodeUnpublish, volumeId));
+
+          // NOTE: We continue to the next case to publish the volume in
+          // `VOL_READY` state once the above is done.
+        }
+        case VolumeState::VOL_READY:
+        case VolumeState::NODE_PUBLISH: {
+          published = published
+            .then(defer(self(), &Self::nodePublish, volumeId));
+
+          break;
+        }
+        case VolumeState::PUBLISHED: {
+          break;
+        }
+        case VolumeState::UNKNOWN: {
+          UNREACHABLE();
+        }
+
+        // NOTE: We avoid using a default clause for the following
+        // values in proto3's open enum to enable the compiler to detect
+        // missing enum cases for us. See:
+        // https://github.com/google/protobuf/issues/3917
+        case google::protobuf::kint32min:
+        case google::protobuf::kint32max: {
+          UNREACHABLE();
+        }
       }
 
-      // Get the node ID.
-      return call<csi::v0::NODE_GET_ID>(
-          csi::NODE_SERVICE, csi::v0::NodeGetIdRequest())
-        .then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) {
-          nodeId = response.node_id();
+      return published;
+    });
 
-          return Nothing();
-        }));
-    }));
+  return volumes.at(volumeId).sequence->add(controllerAndNodePublish);
 }
 
 
@@ -1763,7 +1640,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
   CHECK(volumes.contains(volumeId));
   VolumeData& volume = volumes.at(volumeId);
 
-  if (!controllerCapabilities.publishUnpublishVolume) {
+  if (!controllerCapabilities->publishUnpublishVolume) {
     CHECK_EQ(VolumeState::CREATED, volume.state.state());
 
     volume.state.set_state(VolumeState::NODE_READY);
@@ -1810,7 +1687,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
   CHECK(volumes.contains(volumeId));
   VolumeData& volume = volumes.at(volumeId);
 
-  if (!controllerCapabilities.publishUnpublishVolume) {
+  if (!controllerCapabilities->publishUnpublishVolume) {
     CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
 
     volume.state.set_state(VolumeState::CREATED);
@@ -1856,11 +1733,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
   CHECK(volumes.contains(volumeId));
   VolumeData& volume = volumes.at(volumeId);
 
-  if (!nodeCapabilities.stageUnstageVolume) {
+  if (!nodeCapabilities->stageUnstageVolume) {
     CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
 
     volume.state.set_state(VolumeState::VOL_READY);
-    volume.state.set_boot_id(bootId);
+    volume.state.set_boot_id(CHECK_NOTNONE(bootId));
 
     checkpointVolumeState(volumeId);
 
     return Nothing();
@@ -1901,7 +1778,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
       VolumeData& volume = volumes.at(volumeId);
 
       volume.state.set_state(VolumeState::VOL_READY);
-      volume.state.set_boot_id(bootId);
+      volume.state.set_boot_id(CHECK_NOTNONE(bootId));
 
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -1915,7 +1792,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
   CHECK(volumes.contains(volumeId));
   VolumeData& volume = volumes.at(volumeId);
 
-  if (!nodeCapabilities.stageUnstageVolume) {
+  if (!nodeCapabilities->stageUnstageVolume) {
     CHECK_EQ(VolumeState::VOL_READY, volume.state.state());
 
     volume.state.set_state(VolumeState::NODE_READY);
@@ -2000,7 +1877,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
   request.set_readonly(false);
   *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-  if (nodeCapabilities.stageUnstageVolume) {
+  if (nodeCapabilities->stageUnstageVolume) {
     const string stagingPath = csi::paths::getMountStagingPath(
         csi::paths::getMountRootDir(
             slave::paths::getCsiRootDir(workDir),
@@ -2081,7 +1958,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
     const Bytes& capacity,
     const DiskProfileAdaptor::ProfileInfo& profileInfo)
 {
-  if (!controllerCapabilities.createDeleteVolume) {
+  if (!controllerCapabilities->createDeleteVolume) {
     return Failure(
         "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
@@ -2203,7 +2080,7 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
 
   // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
   // supported. Otherwise, we simply leave it as a preprovisioned volume.
-  if (controllerCapabilities.createDeleteVolume) {
+  if (controllerCapabilities->createDeleteVolume) {
    deleted = deleted.then(defer(self(), [this, volumeId] {
      csi::v0::DeleteVolumeRequest request;
      request.set_volume_id(volumeId);
@@ -2237,7 +2114,7 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
        garbageCollectMountPath(volumeId);
      }
 
-      return controllerCapabilities.createDeleteVolume;
+      return controllerCapabilities->createDeleteVolume;
    }));
 }
@@ -2264,7 +2141,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
     return Nothing();
   }
 
-  if (!pluginCapabilities.controllerService) {
+  if (!pluginCapabilities->controllerService) {
     return Failure(
         "Plugin capability 'CONTROLLER_SERVICE' is not supported");
   }
@@ -2312,7 +2189,7 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities.listVolumes) {
+  if (!controllerCapabilities->listVolumes) {
     return Resources();
   }
@@ -2358,7 +2235,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities.getCapacity) {
+  if (!controllerCapabilities->getCapacity) {
     return Resources();
   }
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 01f5fce..359db0c 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -147,7 +147,6 @@ private:
   // resource provider and CSI volumes from checkpointed data.
   process::Future<Nothing> recover();
   process::Future<Nothing> recoverVolumes();
-  process::Future<Nothing> recoverResourceProviderState();
 
   void doReliableRegistration();
 
@@ -189,56 +188,49 @@ private:
   void reconcileOperations(
       const resource_provider::Event::ReconcileOperations& reconcile);
 
-  process::Future<Nothing> prepareIdentityService();
+  process::Future<Nothing> prepareServices();
 
-  // NOTE: This can only be called after `prepareIdentityService`.
-  process::Future<Nothing> prepareControllerService();
-
-  // NOTE: This can only be called after `prepareIdentityService` and
-  // `prepareControllerService`.
-  process::Future<Nothing> prepareNodeService();
+  process::Future<Nothing> publishVolume(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `CREATED` or
   // `CONTROLLER_PUBLISH` to `NODE_READY`.
   //
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> controllerPublish(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `NODE_READY`,
   // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
   //
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> controllerUnpublish(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `NODE_READY` or
   // `NODE_STAGE` to `VOL_READY`.
   //
-  // NOTE: This can only be called after `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> nodeStage(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `VOL_READY`,
   // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`.
   //
-  // NOTE: This can only be called after `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> nodeUnstage(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `VOL_READY` or
   // `NODE_PUBLISH` to `PUBLISHED`.
   //
-  // NOTE: This can only be called after `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> nodePublish(const std::string& volumeId);
 
   // Transitions the state of the specified volume from `PUBLISHED`,
   // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
   //
-  // NOTE: This can only be called after `prepareNodeService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> nodeUnpublish(const std::string& volumeId);
 
   // Returns a CSI volume ID.
   //
-  // NOTE: This can only be called after `prepareControllerService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<std::string> createVolume(
       const std::string& name,
       const Bytes& capacity,
@@ -246,24 +238,21 @@ private:
 
   // Returns true if the volume has been deprovisioned.
   //
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<bool> deleteVolume(const std::string& volumeId);
 
   // Validates if a volume supports the capability of the specified profile.
   //
-  // NOTE: This can only be called after `prepareIdentityService`.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Nothing> validateVolume(
       const std::string& volumeId,
       const Option<Labels>& metadata,
       const DiskProfileAdaptor::ProfileInfo& profileInfo);
 
-  // NOTE: This can only be called after `prepareControllerService` and the
-  // resource provider ID has been obtained.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Resources> listVolumes();
 
-  // NOTE: This can only be called after `prepareControllerService` and the
-  // resource provider ID has been obtained.
+  // NOTE: This can only be called after `prepareServices`.
   process::Future<Resources> getCapacities();
 
   // Applies the operation.
   // Speculative operations will be synchronously applied.
@@ -333,7 +322,6 @@ private:
   std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
-  std::string bootId;
   process::grpc::client::Runtime runtime;
   process::Owned<v1::resource_provider::Driver> driver;
   OperationStatusUpdateManager statusUpdateManager;
@@ -343,10 +331,15 @@ private:
 
   process::Owned<csi::ServiceManager> serviceManager;
 
-  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
-  csi::v0::PluginCapabilities pluginCapabilities;
-  csi::v0::ControllerCapabilities controllerCapabilities;
-  csi::v0::NodeCapabilities nodeCapabilities;
+  // TODO(chhsiao): Remove the following variables after refactoring.
+  std::string rootDir;
+  CSIPluginInfo pluginInfo;
+  hashset<csi::Service> services;
+
+  Option<std::string> bootId;
+  Option<csi::v0::PluginCapabilities> pluginCapabilities;
+  Option<csi::v0::ControllerCapabilities> controllerCapabilities;
+  Option<csi::v0::NodeCapabilities> nodeCapabilities;
   Option<std::string> nodeId;
 
   // We maintain the following invariant: if one operation depends on
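
For readers skimming the diff: the reboot-detection rule that `recoverVolumes`
now applies can be summarized in isolation. The following is a minimal sketch,
not Mesos code — `VolumeStateKind`, `VolumeState`, and `resetIfRebooted` are
hypothetical, simplified stand-ins for the checkpointed `VolumeState` protobuf
and the switch in `recoverVolumes` above:

// Minimal standalone sketch (not Mesos code) of the boot-ID reset rule applied
// during volume recovery: states that are only meaningful within a single node
// boot are demoted to NODE_READY after a reboot, while controller-side and
// reboot-safe states are left alone for later CSI calls to re-drive.
#include <string>

enum class VolumeStateKind {
  CREATED, NODE_READY, VOL_READY, PUBLISHED,
  CONTROLLER_PUBLISH, CONTROLLER_UNPUBLISH,
  NODE_STAGE, NODE_UNSTAGE, NODE_PUBLISH, NODE_UNPUBLISH,
};

struct VolumeState {
  VolumeStateKind state;
  std::string bootId;  // Boot ID recorded when the volume became node-local.
};

// Returns true if the state was demoted. The demotion itself need not be
// checkpointed eagerly, since it is a no-op with respect to on-disk data.
bool resetIfRebooted(VolumeState& volume, const std::string& currentBootId) {
  switch (volume.state) {
    // States that survive a reboot unchanged.
    case VolumeStateKind::CREATED:
    case VolumeStateKind::NODE_READY:
    case VolumeStateKind::CONTROLLER_PUBLISH:
    case VolumeStateKind::CONTROLLER_UNPUBLISH:
    case VolumeStateKind::NODE_STAGE:
      return false;

    // Node-local states: mounts and staging do not survive a reboot, so the
    // volume falls back to NODE_READY and its recorded boot ID is cleared.
    case VolumeStateKind::VOL_READY:
    case VolumeStateKind::PUBLISHED:
    case VolumeStateKind::NODE_UNSTAGE:
    case VolumeStateKind::NODE_PUBLISH:
    case VolumeStateKind::NODE_UNPUBLISH:
      if (volume.bootId != currentBootId) {
        volume.state = VolumeStateKind::NODE_READY;
        volume.bootId.clear();
        return true;
      }
      return false;
  }
  return false;  // Unreachable with a well-formed state.
}

Relatedly, the new `publishVolume` in the diff relies on deliberate switch
fall-through: a volume in an intermediate state first completes or rolls back
the in-flight CSI call, then falls through to the remaining publish steps in
order, and the per-volume `sequence` serializes these lambdas against
concurrent calls such as `deleteVolume`.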