This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit b2f1d3bbf0fdae99c5945df15323bbd28a067b79
Author: Chun-Hung Hsiao <chhs...@apache.org>
AuthorDate: Mon Apr 1 23:23:55 2019 -0700

    Refactored SLRP to use v0 `VolumeManager`.

    This patch moves volume management code from SLRP to the v0
    `VolumeManager`, and makes SLRP use the `VolumeManager` interface
    polymorphically. However, since SLRP now no longer keeps track of CSI
    volume states, it will not be able to verify that a persistent volume is
    published before being destroyed (although this should be guaranteed by
    volume manager recovery).

    Review: https://reviews.apache.org/r/70222/
---
 src/csi/v0_volume_manager.cpp                      | 1015 ++++++++++++++++-
 src/csi/v0_volume_manager_process.hpp              |  107 ++
 src/resource_provider/storage/provider.cpp         | 1190 +-------------------
 src/resource_provider/storage/provider_process.hpp |  151 +--
 .../storage_local_resource_provider_tests.cpp      |   25 +-
 5 files changed, 1183 insertions(+), 1305 deletions(-)

diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
index 2a4d3eb..c6112be 100644
--- a/src/csi/v0_volume_manager.cpp
+++ b/src/csi/v0_volume_manager.cpp
@@ -16,26 +16,57 @@
 
 #include "csi/v0_volume_manager.hpp"
 
+#include <algorithm>
+#include <cstdlib>
+#include <functional>
+#include <list>
+
+#include <process/after.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
+#include <process/loop.hpp>
 #include <process/process.hpp>
 
 #include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/none.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+#include <stout/some.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+#include <stout/unreachable.hpp>
 
+#include "csi/client.hpp"
+#include "csi/paths.hpp"
+#include "csi/utils.hpp"
 #include "csi/v0_volume_manager_process.hpp"
 
+#include "slave/state.hpp"
+
 namespace http = process::http;
+namespace slave = mesos::internal::slave;
 
+using std::list;
 using std::string;
 using std::vector;
 
 using google::protobuf::Map;
 
+using mesos::csi::state::VolumeState;
+
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
 using process::Failure;
 using process::Future;
 using process::ProcessBase;
 
+using process::grpc::StatusError;
+
 using process::grpc::client::Runtime;
 
 namespace mesos{
@@ -76,13 +107,163 @@ VolumeManagerProcess::VolumeManagerProcess(
 
 Future<Nothing> VolumeManagerProcess::recover()
 {
-  return Failure("Unimplemented");
+  Try<string> bootId_ = os::bootId();
+  if (bootId_.isError()) {
+    return Failure("Failed to get boot ID: " + bootId_.error());
+  }
+
+  bootId = bootId_.get();
+
+  return serviceManager->recover()
+    .then(process::defer(self(), &Self::prepareServices))
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      // Recover the states of CSI volumes.
+      Try<list<string>> volumePaths =
+        paths::getVolumePaths(rootDir, info.type(), info.name());
+
+      if (volumePaths.isError()) {
+        return Failure(
+            "Failed to find volumes for CSI plugin type '" + info.type() +
+            "' and name '" + info.name() + "': " + volumePaths.error());
+      }
+
+      vector<Future<Nothing>> futures;
+
+      foreach (const string& path, volumePaths.get()) {
+        Try<paths::VolumePath> volumePath =
+          paths::parseVolumePath(rootDir, path);
+
+        if (volumePath.isError()) {
+          return Failure(
+              "Failed to parse volume path '" + path +
+              "': " + volumePath.error());
+        }
+
+        CHECK_EQ(info.type(), volumePath->type);
+        CHECK_EQ(info.name(), volumePath->name);
+
+        const string& volumeId = volumePath->volumeId;
+        const string statePath = paths::getVolumeStatePath(
+            rootDir, info.type(), info.name(), volumeId);
+
+        if (!os::exists(statePath)) {
+          continue;
+        }
+
+        Result<VolumeState> volumeState =
+          slave::state::read<VolumeState>(statePath);
+
+        if (volumeState.isError()) {
+          return Failure(
+              "Failed to read volume state from '" + statePath +
+              "': " + volumeState.error());
+        }
+
+        if (volumeState.isNone()) {
+          continue;
+        }
+
+        volumes.put(volumeId, std::move(volumeState.get()));
+        VolumeData& volume = volumes.at(volumeId);
+
+        if (!VolumeState::State_IsValid(volume.state.state())) {
+          return Failure("Volume '" + volumeId + "' is in INVALID state");
+        }
+
+        // First, if there is a node reboot after the volume is made
+        // publishable, it should be reset to `NODE_READY`.
+        switch (volume.state.state()) {
+          case VolumeState::CREATED:
+          case VolumeState::NODE_READY:
+          case VolumeState::CONTROLLER_PUBLISH:
+          case VolumeState::CONTROLLER_UNPUBLISH:
+          case VolumeState::NODE_STAGE: {
+            break;
+          }
+          case VolumeState::VOL_READY:
+          case VolumeState::PUBLISHED:
+          case VolumeState::NODE_UNSTAGE:
+          case VolumeState::NODE_PUBLISH:
+          case VolumeState::NODE_UNPUBLISH: {
+            if (bootId != volume.state.boot_id()) {
+              // Since this is a no-op, no need to checkpoint here.
+              volume.state.set_state(VolumeState::NODE_READY);
+              volume.state.clear_boot_id();
+            }
+
+            break;
+          }
+          case VolumeState::UNKNOWN: {
+            return Failure("Volume '" + volumeId + "' is in UNKNOWN state");
+          }
+
+          // NOTE: We avoid using a default clause for the following values in
+          // proto3's open enum to enable the compiler to detect missing enum
+          // cases for us. See: https://github.com/google/protobuf/issues/3917
+          case google::protobuf::kint32min:
+          case google::protobuf::kint32max: {
+            UNREACHABLE();
+          }
+        }
+
+        // Second, if the volume has been used by a container before recovery,
+        // we have to bring the volume back to `PUBLISHED` so data can be
+        // cleaned up synchronously when needed.
+        if (volume.state.node_publish_required()) {
+          futures.push_back(publishVolume(volumeId));
+        }
+      }
+
+      // Garbage collect leftover mount paths that we failed to remove before.
+      const string mountRootDir =
+        paths::getMountRootDir(rootDir, info.type(), info.name());
+
+      Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir);
+      if (mountPaths.isError()) {
+        // TODO(chhsiao): This could indicate that something is seriously wrong.
+        // To help debugging the problem, we should surface the error via
+        // MESOS-8745.
+        return Failure(
+            "Failed to find mount paths for CSI plugin type '" + info.type() +
+            "' and name '" + info.name() + "': " + mountPaths.error());
+      }
+
+      foreach (const string& path, mountPaths.get()) {
+        Try<string> volumeId = paths::parseMountPath(mountRootDir, path);
+        if (volumeId.isError()) {
+          return Failure(
+              "Failed to parse mount path '" + path + "': " + volumeId.error());
+        }
+
+        if (!volumes.contains(volumeId.get())) {
+          garbageCollectMountPath(volumeId.get());
+        }
+      }
+
+      return process::collect(futures).then([] { return Nothing(); });
+    }));
 }
 
 
 Future<vector<VolumeInfo>> VolumeManagerProcess::listVolumes()
 {
-  return Failure("Unimplemented");
+  if (!controllerCapabilities->listVolumes) {
+    return vector<VolumeInfo>();
+  }
+
+  // TODO(chhsiao): Set the max entries and use a loop to do multiple
+  // `ListVolumes` calls.
+  return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest())
+    .then(process::defer(self(), [](const ListVolumesResponse& response) {
+      vector<VolumeInfo> result;
+      foreach (const auto& entry, response.entries()) {
+        result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()),
+                                    entry.volume().id(),
+                                    entry.volume().attributes()});
+      }
+
+      return result;
+    }));
 }
 
 
@@ -90,7 +271,18 @@ Future<Bytes> VolumeManagerProcess::getCapacity(
     const types::VolumeCapability& capability,
     const Map<string, string>& parameters)
 {
-  return Failure("Unimplemented");
+  if (!controllerCapabilities->getCapacity) {
+    return Bytes(0);
+  }
+
+  GetCapacityRequest request;
+  *request.add_volume_capabilities() = evolve(capability);
+  *request.mutable_parameters() = parameters;
+
+  return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request))
+    .then([](const GetCapacityResponse& response) {
+      return Bytes(response.available_capacity());
+    });
 }
 
 
@@ -100,7 +292,46 @@ Future<VolumeInfo> VolumeManagerProcess::createVolume(
     const types::VolumeCapability& capability,
     const Map<string, string>& parameters)
 {
-  return Failure("Unimplemented");
+  if (!controllerCapabilities->createDeleteVolume) {
+    return Failure(
+        "CREATE_DELETE_VOLUME controller capability is not supported for CSI "
+        "plugin type '" + info.type() + "' and name '" + info.name() + "'");
+  }
+
+  LOG(INFO) << "Creating volume with name '" << name << "'";
+
+  CreateVolumeRequest request;
+  request.set_name(name);
+  request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
+  request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
+  *request.add_volume_capabilities() = evolve(capability);
+  *request.mutable_parameters() = parameters;
+
+  // We retry the `CreateVolume` call for MESOS-9517.
+  return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+    .then(process::defer(self(), [=](
+        const CreateVolumeResponse& response) -> Future<VolumeInfo> {
+      const string& volumeId = response.volume().id();
+
+      // NOTE: If the volume is already tracked, there might already be
+      // operations running in its sequence. Since this continuation runs
+      // outside the sequence, we fail the call here to avoid any race issue.
+      // This also means that this call is not idempotent.
+      if (volumes.contains(volumeId)) {
+        return Failure("Volume with name '" + name + "' already exists");
+      }
+
+      VolumeState volumeState;
+      volumeState.set_state(VolumeState::CREATED);
+      *volumeState.mutable_volume_capability() = capability;
+      *volumeState.mutable_parameters() = parameters;
+      *volumeState.mutable_volume_attributes() = response.volume().attributes();
+
+      volumes.put(volumeId, std::move(volumeState));
+      checkpointVolumeState(volumeId);
+
+      return VolumeInfo{capacity, volumeId, response.volume().attributes()};
+    }));
 }
 
 
@@ -109,13 +340,85 @@ Future<Option<Error>> VolumeManagerProcess::validateVolume(
     const types::VolumeCapability& capability,
     const Map<string, string>& parameters)
 {
-  return Failure("Unimplemented");
+  // If the volume has been checkpointed, the validation succeeds only if the
+  // capability and parameters of the specified profile are the same as those in
+  // the checkpoint.
+  if (volumes.contains(volumeInfo.id)) {
+    const VolumeState& volumeState = volumes.at(volumeInfo.id).state;
+
+    if (volumeState.volume_capability() != capability) {
+      return Some(
+          Error("Mismatched capability for volume '" + volumeInfo.id + "'"));
+    }
+
+    if (volumeState.parameters() != parameters) {
+      return Some(
+          Error("Mismatched parameters for volume '" + volumeInfo.id + "'"));
+    }
+
+    return None();
+  }
+
+  if (!parameters.empty()) {
+    LOG(WARNING)
+      << "Validating volumes against parameters is not supported in CSI v0";
+  }
+
+  LOG(INFO) << "Validating volume '" << volumeInfo.id << "'";
+
+  ValidateVolumeCapabilitiesRequest request;
+  request.set_volume_id(volumeInfo.id);
+  *request.add_volume_capabilities() = evolve(capability);
+  *request.mutable_volume_attributes() = volumeInfo.context;
+
+  return call<VALIDATE_VOLUME_CAPABILITIES>(
+      CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [=](
+        const ValidateVolumeCapabilitiesResponse& response)
+          -> Future<Option<Error>> {
+      if (!response.supported()) {
+        return Error(
+            "Unsupported volume capability for volume '" + volumeInfo.id +
+            "': " + response.message());
+      }
+
+      // NOTE: If the volume is already tracked, there might already be
+      // operations running in its sequence. Since this continuation runs
+      // outside the sequence, we fail the call here to avoid any race issue.
+      // This also means that this call is not idempotent.
+      if (volumes.contains(volumeInfo.id)) {
+        return Failure("Volume '" + volumeInfo.id + "' already validated");
+      }
+
+      VolumeState volumeState;
+      volumeState.set_state(VolumeState::CREATED);
+      *volumeState.mutable_volume_capability() = capability;
+      *volumeState.mutable_parameters() = parameters;
+      *volumeState.mutable_volume_attributes() = volumeInfo.context;
+
+      volumes.put(volumeInfo.id, std::move(volumeState));
+      checkpointVolumeState(volumeInfo.id);
+
+      return None();
+    }));
 }
 
 
 Future<bool> VolumeManagerProcess::deleteVolume(const string& volumeId)
 {
-  return Failure("Unimplemented");
+  if (!volumes.contains(volumeId)) {
+    return __deleteVolume(volumeId);
+  }
+
+  VolumeData& volume = volumes.at(volumeId);
+
+  LOG(INFO) << "Deleting volume '" << volumeId << "' in "
+            << volume.state.state() << " state";
+
+  // Volume deletion is sequentialized with other operations on the same volume
+  // to avoid races.
+  return volume.sequence->add(std::function<Future<bool>()>(
+      process::defer(self(), &Self::_deleteVolume, volumeId)));
 }
 
 
@@ -133,7 +436,19 @@ Future<Nothing> VolumeManagerProcess::detachVolume(const string& volumeId)
 
 Future<Nothing> VolumeManagerProcess::publishVolume(const string& volumeId)
 {
-  return Failure("Unimplemented");
+  if (!volumes.contains(volumeId)) {
+    return Failure("Cannot publish unknown volume '" + volumeId + "'");
+  }
+
+  VolumeData& volume = volumes.at(volumeId);
+
+  LOG(INFO) << "Publishing volume '" << volumeId << "' in "
+            << volume.state.state() << " state";
+
+  // Volume publishing is serialized with other operations on the same volume to
+  // avoid races.
+  return volume.sequence->add(std::function<Future<Nothing>()>(
+      process::defer(self(), &Self::_publishVolume, volumeId)));
 }
 
 
@@ -143,6 +458,692 @@ Future<Nothing> VolumeManagerProcess::unpublishVolume(const string& volumeId)
 }
 
 
+template <RPC Rpc>
+Future<Response<Rpc>> VolumeManagerProcess::call(
+    const Service& service,
+    const Request<Rpc>& request,
+    const bool retry) // Made immutable in the following mutable lambda.
+{
+  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+  return process::loop(
+      self(),
+      [=] {
+        // Make the call to the latest service endpoint.
+        return serviceManager->getServiceEndpoint(service)
+          .then(process::defer(
+              self(), &VolumeManagerProcess::_call<Rpc>, lambda::_1, request));
+      },
+      [=](const Try<Response<Rpc>, StatusError>& result) mutable
+          -> Future<ControlFlow<Response<Rpc>>> {
+        Option<Duration> backoff = retry
+          ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
+          : Option<Duration>::none();
+
+        maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+        // We dispatch `__call` for testing purpose.
+        return process::dispatch(
+            self(), &VolumeManagerProcess::__call<Rpc>, result, backoff);
+      });
+}
+
+
+template <RPC Rpc>
+Future<Try<Response<Rpc>, StatusError>> VolumeManagerProcess::_call(
+    const string& endpoint, const Request<Rpc>& request)
+{
+  ++metrics->csi_plugin_rpcs_pending.at(Rpc);
+
+  return Client(endpoint, runtime).call<Rpc>(request)
+    .onAny(defer(self(), [=](
+        const Future<Try<Response<Rpc>, StatusError>>& future) {
+      --metrics->csi_plugin_rpcs_pending.at(Rpc);
+      if (future.isReady() && future->isSome()) {
+        ++metrics->csi_plugin_rpcs_successes.at(Rpc);
+      } else if (future.isDiscarded()) {
+        ++metrics->csi_plugin_rpcs_cancelled.at(Rpc);
+      } else {
+        ++metrics->csi_plugin_rpcs_errors.at(Rpc);
+      }
+    }));
+}
+
+
+template <RPC Rpc>
+Future<ControlFlow<Response<Rpc>>> VolumeManagerProcess::__call(
+    const Try<Response<Rpc>, StatusError>& result,
+    const Option<Duration>& backoff)
+{
+  if (result.isSome()) {
+    return Break(result.get());
+  }
+
+  if (backoff.isNone()) {
+    return Failure(result.error());
+  }
+
+  // See the link below for retryable status codes:
+  // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+  switch (result.error().status.error_code()) {
+    case grpc::DEADLINE_EXCEEDED:
+    case grpc::UNAVAILABLE: {
+      LOG(ERROR) << "Received '" << result.error() << "' while calling " << Rpc
+                 << ". Retrying in " << backoff.get();
+
+      return process::after(backoff.get())
+        .then([]() -> Future<ControlFlow<Response<Rpc>>> {
+          return Continue();
+        });
+    }
+    case grpc::CANCELLED:
+    case grpc::UNKNOWN:
+    case grpc::INVALID_ARGUMENT:
+    case grpc::NOT_FOUND:
+    case grpc::ALREADY_EXISTS:
+    case grpc::PERMISSION_DENIED:
+    case grpc::UNAUTHENTICATED:
+    case grpc::RESOURCE_EXHAUSTED:
+    case grpc::FAILED_PRECONDITION:
+    case grpc::ABORTED:
+    case grpc::OUT_OF_RANGE:
+    case grpc::UNIMPLEMENTED:
+    case grpc::INTERNAL:
+    case grpc::DATA_LOSS: {
+      return Failure(result.error());
+    }
+    case grpc::OK:
+    case grpc::DO_NOT_USE: {
+      UNREACHABLE();
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
+Future<Nothing> VolumeManagerProcess::prepareServices()
+{
+  CHECK(!services.empty());
+
+  // Get the plugin capabilities.
+  return call<GET_PLUGIN_CAPABILITIES>(
+      *services.begin(), GetPluginCapabilitiesRequest())
+    .then(process::defer(self(), [=](
+        const GetPluginCapabilitiesResponse& response) -> Future<Nothing> {
+      pluginCapabilities = response.capabilities();
+
+      if (services.contains(CONTROLLER_SERVICE) &&
+          !pluginCapabilities->controllerService) {
+        return Failure(
+            "CONTROLLER_SERVICE plugin capability is not supported for CSI "
+            "plugin type '" + info.type() + "' and name '" + info.name() + "'");
+      }
+
+      return Nothing();
+    }))
+    // Check if all services have consistent plugin infos.
+    .then(process::defer(self(), [this] {
+      vector<Future<GetPluginInfoResponse>> futures;
+      foreach (const Service& service, services) {
+        futures.push_back(
+            call<GET_PLUGIN_INFO>(service, GetPluginInfoRequest())
+              .onReady([service](const GetPluginInfoResponse& response) {
+                LOG(INFO) << service << " loaded: " << stringify(response);
+              }));
+      }
+
+      return process::collect(futures)
+        .then([](const vector<GetPluginInfoResponse>& pluginInfos) {
+          for (size_t i = 1; i < pluginInfos.size(); ++i) {
+            if (pluginInfos[i].name() != pluginInfos[0].name() ||
+                pluginInfos[i].vendor_version() !=
+                  pluginInfos[0].vendor_version()) {
+              LOG(WARNING) << "Inconsistent plugin services. Please check with "
+                              "the plugin vendor to ensure compatibility.";
+            }
+          }
+
+          return Nothing();
+        });
+    }))
+    // Get the controller capabilities.
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      if (!services.contains(CONTROLLER_SERVICE)) {
+        controllerCapabilities = ControllerCapabilities();
+        return Nothing();
+      }
+
+      return call<CONTROLLER_GET_CAPABILITIES>(
+          CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest())
+        .then(process::defer(self(), [this](
+            const ControllerGetCapabilitiesResponse& response) {
+          controllerCapabilities = response.capabilities();
+          return Nothing();
+        }));
+    }))
+    // Get the node capabilities and ID.
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      if (!services.contains(NODE_SERVICE)) {
+        nodeCapabilities = NodeCapabilities();
+        return Nothing();
+      }
+
+      return call<NODE_GET_CAPABILITIES>(
+          NODE_SERVICE, NodeGetCapabilitiesRequest())
+        .then(process::defer(self(), [this](
+            const NodeGetCapabilitiesResponse& response) -> Future<Nothing> {
+          nodeCapabilities = response.capabilities();
+
+          if (controllerCapabilities->publishUnpublishVolume) {
+            return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest())
+              .then(process::defer(self(), [this](
+                  const NodeGetIdResponse& response) {
+                nodeId = response.node_id();
+                return Nothing();
+              }));
+          }
+
+          return Nothing();
+        }));
+    }));
+}
+
+
+Future<bool> VolumeManagerProcess::_deleteVolume(const std::string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.node_publish_required()) {
+    CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
+
+    const string targetPath = paths::getMountTargetPath(
+        paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+    // NOTE: Normally the volume should have been cleaned up. However this may
+    // not be true for preprovisioned volumes (e.g., leftover from a previous
+    // resource provider instance). To prevent data leakage in such cases, we
+    // clean up the data (but not the target path) here.
+    Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
+    if (rmdir.isError()) {
+      return Failure(
+          "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
+    }
+
+    volumeState.set_node_publish_required(false);
+    checkpointVolumeState(volumeId);
+  }
+
+  if (volumeState.state() != VolumeState::CREATED) {
+    // Retry after transitioning the volume to `CREATED` state.
+    return _detachVolume(volumeId)
+      .then(process::defer(self(), &Self::_deleteVolume, volumeId));
+  }
+
+  // NOTE: The last asynchronous continuation, which is supposed to be run in
+  // the volume's sequence, would cause the sequence to be destructed, which
+  // would in turn discard the returned future. However, since the continuation
+  // would have already been run, the returned future will become ready, making
+  // the future returned by the sequence ready as well.
+  return __deleteVolume(volumeId)
+    .then(process::defer(self(), [this, volumeId](bool deleted) {
+      volumes.erase(volumeId);
+
+      const string volumePath =
+        paths::getVolumePath(rootDir, info.type(), info.name(), volumeId);
+
+      Try<Nothing> rmdir = os::rmdir(volumePath);
+      CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
+                        << volumePath << "': " << rmdir.error();
+
+      garbageCollectMountPath(volumeId);
+
+      return deleted;
+    }));
+}
+
+
+Future<bool> VolumeManagerProcess::__deleteVolume(
+    const string& volumeId)
+{
+  if (!controllerCapabilities->createDeleteVolume) {
+    return false;
+  }
+
+  LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '"
+            << volumeId << "'";
+
+  DeleteVolumeRequest request;
+  request.set_volume_id(volumeId);
+
+  // We retry the `DeleteVolume` call for MESOS-9517.
+  return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+    .then([] { return true; });
+}
+
+
+Future<Nothing> VolumeManagerProcess::_attachVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::CREATED &&
+      volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
+      volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
+    return Failure(
+        "Cannot attach volume '" + volumeId + "' in " +
+        stringify(volumeState.state()) + " state");
+  }
+
+  if (!controllerCapabilities->publishUnpublishVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::NODE_READY);
+    return Nothing();
+  }
+
+  // A previously failed `ControllerUnpublishVolume` call can be recovered
+  // through an extra `ControllerUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerunpublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::CONTROLLER_UNPUBLISH) {
+    // Retry after recovering the volume to `CREATED` state.
+    return _detachVolume(volumeId)
+      .then(process::defer(self(), &Self::_attachVolume, volumeId));
+  }
+
+  if (volumeState.state() == VolumeState::CREATED) {
+    volumeState.set_state(VolumeState::CONTROLLER_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
+
+  LOG(INFO)
+    << "Calling '/csi.v0.Controller/ControllerPublishVolume' for volume '"
+    << volumeId << "'";
+
+  ControllerPublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_node_id(CHECK_NOTNONE(nodeId));
+  *request.mutable_volume_capability() =
+    evolve(volumeState.volume_capability());
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+  return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId](
+        const ControllerPublishVolumeResponse& response) {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::NODE_READY);
+      *volumeState.mutable_publish_info() = response.publish_info();
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_detachVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::CREATED) {
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::NODE_READY &&
+      volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
+      volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
+    // Retry after transitioning the volume to `CREATED` state.
+    return _unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_detachVolume, volumeId));
+  }
+
+  if (!controllerCapabilities->publishUnpublishVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::CREATED);
+    return Nothing();
+  }
+
+  // A previously failed `ControllerPublishVolume` call can be recovered through
+  // the current `ControllerUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::NODE_READY ||
+      volumeState.state() == VolumeState::CONTROLLER_PUBLISH) {
+    volumeState.set_state(VolumeState::CONTROLLER_UNPUBLISH);
+    checkpointVolumeState(volumeId);
+  }
+
+  LOG(INFO)
+    << "Calling '/csi.v0.Controller/ControllerUnpublishVolume' for volume '"
+    << volumeId << "'";
+
+  ControllerUnpublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_node_id(CHECK_NOTNONE(nodeId));
+
+  return call<CONTROLLER_UNPUBLISH_VOLUME>(
+      CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::CREATED);
+      volumeState.mutable_publish_info()->clear();
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::PUBLISHED) {
+    CHECK(volumeState.node_publish_required());
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::VOL_READY &&
+      volumeState.state() != VolumeState::NODE_PUBLISH &&
+      volumeState.state() != VolumeState::NODE_UNPUBLISH) {
+    // Retry after transitioning the volume to `VOL_READY` state.
+    return __publishVolume(volumeId)
+      .then(process::defer(self(), &Self::_publishVolume, volumeId));
+  }
+
+  // A previously failed `NodeUnpublishVolume` call can be recovered through an
+  // extra `NodeUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunpublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::NODE_UNPUBLISH) {
+    // Retry after recovering the volume to `VOL_READY` state.
+    return __unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_publishVolume, volumeId));
+  }
+
+  const string targetPath = paths::getMountTargetPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  // NOTE: The target path will be cleaned up during volume removal.
+  Try<Nothing> mkdir = os::mkdir(targetPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount target path '" + targetPath +
+        "': " + mkdir.error());
+  }
+
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    volumeState.set_state(VolumeState::NODE_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodePublishVolume' for volume '"
+            << volumeId << "'";
+
+  NodePublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volumeState.publish_info();
+  request.set_target_path(targetPath);
+  *request.mutable_volume_capability() =
+    evolve(volumeState.volume_capability());
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+  if (nodeCapabilities->stageUnstageVolume) {
+    const string stagingPath = paths::getMountStagingPath(
+        paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+    CHECK(os::exists(stagingPath));
+    request.set_staging_target_path(stagingPath);
+  }
+
+  return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(defer(self(), [this, volumeId, targetPath] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+
+      volumeState.set_state(VolumeState::PUBLISHED);
+
+      // NOTE: This is the first time a container is going to consume the
+      // persistent volume, so the `node_publish_required` field is set to
+      // indicate that this volume must remain published so it can be
+      // synchronously cleaned up when the persistent volume is destroyed.
+      volumeState.set_node_publish_required(true);
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    CHECK(!volumeState.boot_id().empty());
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::NODE_READY &&
+      volumeState.state() != VolumeState::NODE_STAGE &&
+      volumeState.state() != VolumeState::NODE_UNSTAGE) {
+    // Retry after transitioning the volume to `NODE_READY` state.
+    return _attachVolume(volumeId)
+      .then(process::defer(self(), &Self::__publishVolume, volumeId));
+  }
+
+  if (!nodeCapabilities->stageUnstageVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::VOL_READY);
+    volumeState.set_boot_id(CHECK_NOTNONE(bootId));
+    return Nothing();
+  }
+
+  // A previously failed `NodeUnstageVolume` call can be recovered through an
+  // extra `NodeUnstageVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunstagevolume // NOLINT
+  if (volumeState.state() == VolumeState::NODE_UNSTAGE) {
+    // Retry after recovering the volume to `NODE_READY` state.
+    return _unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::__publishVolume, volumeId));
+  }
+
+  const string stagingPath = paths::getMountStagingPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  // NOTE: The staging path will be cleaned up during volume removal.
+  Try<Nothing> mkdir = os::mkdir(stagingPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount staging path '" + stagingPath +
+        "': " + mkdir.error());
+  }
+
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    volumeState.set_state(VolumeState::NODE_STAGE);
+    checkpointVolumeState(volumeId);
+  }
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeStageVolume' for volume '" << volumeId
+            << "'";
+
+  NodeStageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volumeState.publish_info();
+  request.set_staging_target_path(stagingPath);
+  *request.mutable_volume_capability() =
+    evolve(volumeState.volume_capability());
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+  return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::VOL_READY);
+      volumeState.set_boot_id(CHECK_NOTNONE(bootId));
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    CHECK(volumeState.boot_id().empty());
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::VOL_READY &&
+      volumeState.state() != VolumeState::NODE_STAGE &&
+      volumeState.state() != VolumeState::NODE_UNSTAGE) {
+    // Retry after transitioning the volume to `VOL_READY` state.
+    return __unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_unpublishVolume, volumeId));
+  }
+
+  if (!nodeCapabilities->stageUnstageVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::NODE_READY);
+    volumeState.clear_boot_id();
+    return Nothing();
+  }
+
+  // A previously failed `NodeStageVolume` call can be recovered through the
+  // current `NodeUnstageVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+  if (volumeState.state() == VolumeState::VOL_READY ||
+      volumeState.state() == VolumeState::NODE_STAGE) {
+    volumeState.set_state(VolumeState::NODE_UNSTAGE);
+    checkpointVolumeState(volumeId);
+  }
+
+  const string stagingPath = paths::getMountStagingPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  CHECK(os::exists(stagingPath));
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeUnstageVolume' for volume '"
+            << volumeId << "'";
+
+  NodeUnstageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_staging_target_path(stagingPath);
+
+  return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::NODE_READY);
+      volumeState.clear_boot_id();
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::__unpublishVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::PUBLISHED &&
+      volumeState.state() != VolumeState::NODE_PUBLISH &&
+      volumeState.state() != VolumeState::NODE_UNPUBLISH) {
+    return Failure(
+        "Cannot unpublish volume '" + volumeId + "' in " +
+        stringify(volumeState.state()) + " state");
+  }
+
+  // A previously failed `NodePublishVolume` call can be recovered through the
+  // current `NodeUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::PUBLISHED ||
+      volumeState.state() == VolumeState::NODE_PUBLISH) {
+    volumeState.set_state(VolumeState::NODE_UNPUBLISH);
+    checkpointVolumeState(volumeId);
+  }
+
+  const string targetPath = paths::getMountTargetPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  CHECK(os::exists(targetPath));
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeUnpublishVolume' for volume '"
+            << volumeId << "'";
+
+  NodeUnpublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_target_path(targetPath);
+
+  return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::VOL_READY);
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+void VolumeManagerProcess::checkpointVolumeState(const string& volumeId)
+{
+  const string statePath =
+    paths::getVolumeStatePath(rootDir, info.type(), info.name(), volumeId);
+
+  // NOTE: We ensure the checkpoint is synced to the filesystem to avoid
+  // resulting in a stale or empty checkpoint when a system crash happens.
+ Try<Nothing> checkpoint = + slave::state::checkpoint(statePath, volumes.at(volumeId).state, true); + + CHECK_SOME(checkpoint) + << "Failed to checkpoint volume state to '" << statePath << "':" + << checkpoint.error(); +} + + +void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId) +{ + CHECK(!volumes.contains(volumeId)); + + const string path = paths::getMountPath( + paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId); + + if (os::exists(path)) { + Try<Nothing> rmdir = os::rmdir(path); + if (rmdir.isError()) { + LOG(ERROR) << "Failed to remove directory '" << path + << "': " << rmdir.error(); + } + } +} + + VolumeManager::VolumeManager( const http::URL& agentUrl, const string& rootDir, diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp index 9db99de..214fc1f 100644 --- a/src/csi/v0_volume_manager_process.hpp +++ b/src/csi/v0_volume_manager_process.hpp @@ -29,17 +29,25 @@ #include <process/future.hpp> #include <process/grpc.hpp> #include <process/http.hpp> +#include <process/loop.hpp> #include <process/owned.hpp> #include <process/process.hpp> +#include <process/sequence.hpp> #include <stout/bytes.hpp> +#include <stout/duration.hpp> #include <stout/error.hpp> +#include <stout/hashmap.hpp> #include <stout/hashset.hpp> #include <stout/nothing.hpp> #include <stout/option.hpp> +#include <stout/try.hpp> #include "csi/metrics.hpp" +#include "csi/rpc.hpp" #include "csi/service_manager.hpp" +#include "csi/state.hpp" +#include "csi/utils.hpp" #include "csi/v0_volume_manager.hpp" #include "csi/volume_manager.hpp" @@ -47,6 +55,16 @@ namespace mesos { namespace csi { namespace v0 { +// The CSI volume manager initially picks a random amount of time between +// `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI calls. 
+// Subsequent retries are exponentially backed off based on this interval (e.g., +// 2nd retry uses a random value between `[0, b * 2^1]`, 3rd retry between +// `[0, b * 2^2]`, etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`. +// +// TODO(chhsiao): Make the retry parameters configurable. +constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10); +constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10); + class VolumeManagerProcess : public process::Process<VolumeManagerProcess> { @@ -90,7 +108,76 @@ public: process::Future<Nothing> unpublishVolume(const std::string& volumeId); + // Wrapper functions to make CSI calls and update RPC metrics. Made public for + // testing purpose. + // + // The call is made asynchronously and thus no guarantee is provided on the + // order in which calls are sent. Callers need to either ensure to not have + // multiple conflicting calls in flight, or treat results idempotently. + // + // NOTE: We currently ensure this by 1) resource locking to forbid concurrent + // calls on the same volume, and 2) no profile update while there are ongoing + // `CREATE_DISK` or `DESTROY_DISK` operations. + template <RPC Rpc> + process::Future<Response<Rpc>> call( + const Service& service, const Request<Rpc>& request, bool retry = false); + + template <RPC Rpc> + process::Future<Try<Response<Rpc>, process::grpc::StatusError>> + _call(const std::string& endpoint, const Request<Rpc>& request); + + template <RPC Rpc> + process::Future<process::ControlFlow<Response<Rpc>>> __call( + const Try<Response<Rpc>, process::grpc::StatusError>& result, + const Option<Duration>& backoff); + private: + process::Future<Nothing> prepareServices(); + + process::Future<bool> _deleteVolume(const std::string& volumeId); + process::Future<bool> __deleteVolume(const std::string& volumeId); + + // The following methods are used to manage volume lifecycles. Transient + // states are omitted. 
+ // + // +------------+ + // + + + | CREATED | ^ + // _attachVolume | | | +---+----^---+ | + // | | | | | | _detachVolume + // | | | +---v----+---+ | + // v + + | NODE_READY | + ^ + // | | +---+----^---+ | | + // __publishVolume | | | | | | _unpublishVolume + // | | +---v----+---+ | | + // v + | VOL_READY | + + ^ + // | +---+----^---+ | | | + // _publishVolume | | | | | | __unpublishVolume + // | +---v----+---+ | | | + // V | PUBLISHED | + + + + // +------------+ + + // Transition a volume to `NODE_READY` state from any state above. + process::Future<Nothing> _attachVolume(const std::string& volumeId); + + // Transition a volume to `CREATED` state from any state below. + process::Future<Nothing> _detachVolume(const std::string& volumeId); + + // Transition a volume to `PUBLISHED` state from any state above. + process::Future<Nothing> _publishVolume(const std::string& volumeId); + + // Transition a volume to `VOL_READY` state from any state above. + process::Future<Nothing> __publishVolume(const std::string& volumeId); + + // Transition a volume to `NODE_READY` state from any state below. + process::Future<Nothing> _unpublishVolume(const std::string& volumeId); + + // Transition a volume to `VOL_READY` state from any state below. 
+ process::Future<Nothing> __unpublishVolume(const std::string& volumeId); + + void checkpointVolumeState(const std::string& volumeId); + + void garbageCollectMountPath(const std::string& volumeId); + const std::string rootDir; const CSIPluginInfo info; const hashset<Service> services; @@ -98,6 +185,26 @@ private: process::grpc::client::Runtime runtime; Metrics* metrics; process::Owned<ServiceManager> serviceManager; + + Option<std::string> bootId; + Option<PluginCapabilities> pluginCapabilities; + Option<ControllerCapabilities> controllerCapabilities; + Option<NodeCapabilities> nodeCapabilities; + Option<std::string> nodeId; + + struct VolumeData + { + VolumeData(state::VolumeState&& _state) + : state(_state), sequence(new process::Sequence("csi-volume-sequence")) {} + + state::VolumeState state; + + // We call all CSI operations on the same volume in a sequence to ensure + // that they are processed in a sequential order. + process::Owned<process::Sequence> sequence; + }; + + hashmap<std::string, VolumeData> volumes; }; } // namespace v0 { diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index c5a5213..fba5b18 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -18,29 +18,24 @@ #include <algorithm> #include <cctype> -#include <cstdlib> #include <functional> #include <list> #include <memory> #include <numeric> #include <queue> -#include <type_traits> #include <utility> #include <vector> #include <glog/logging.h> -#include <process/after.hpp> #include <process/collect.hpp> #include <process/defer.hpp> #include <process/delay.hpp> -#include <process/dispatch.hpp> #include <process/future.hpp> #include <process/grpc.hpp> #include <process/id.hpp> #include <process/loop.hpp> #include <process/process.hpp> -#include <process/sequence.hpp> #include <process/metrics/counter.hpp> #include <process/metrics/metrics.hpp> @@ -74,13 +69,8 @@ #include 
"common/protobuf_utils.hpp" #include "common/resources_utils.hpp" -#include "csi/client.hpp" #include "csi/metrics.hpp" #include "csi/paths.hpp" -#include "csi/rpc.hpp" -#include "csi/service_manager.hpp" -#include "csi/state.hpp" -#include "csi/utils.hpp" #include "csi/volume_manager.hpp" #include "internal/devolve.hpp" @@ -98,10 +88,6 @@ namespace http = process::http; -// TODO(chhsiao): Remove `using namespace` statements after refactoring. -using namespace mesos::csi; -using namespace mesos::csi::v0; - using std::accumulate; using std::find; using std::list; @@ -110,24 +96,18 @@ using std::shared_ptr; using std::string; using std::vector; -using google::protobuf::Map; - -using process::after; using process::await; -using process::Break; using process::collect; using process::Continue; using process::ControlFlow; using process::defer; using process::delay; -using process::dispatch; using process::Failure; using process::Future; using process::loop; using process::Owned; using process::ProcessBase; using process::Promise; -using process::Sequence; using process::spawn; using process::grpc::StatusError; @@ -137,9 +117,8 @@ using process::http::authentication::Principal; using process::metrics::Counter; using process::metrics::PushGauge; -using mesos::csi::ServiceManager; - -using mesos::csi::state::VolumeState; +using mesos::csi::VolumeInfo; +using mesos::csi::VolumeManager; using mesos::internal::protobuf::convertLabelsToStringMap; using mesos::internal::protobuf::convertStringMapToLabels; @@ -339,140 +318,31 @@ void StorageLocalResourceProviderProcess::received(const Event& event) } -template < - csi::v0::RPC rpc, - typename std::enable_if<rpc != csi::v0::PROBE, int>::type> -Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call( - const csi::Service& service, - const csi::v0::Request<rpc>& request, - const bool retry) // Make immutable in the following mutable lambda. 
-{ - Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; - - return loop( - self(), - [=] { - // Make the call to the latest service endpoint. - return serviceManager->getServiceEndpoint(service) - .then(defer( - self(), - &StorageLocalResourceProviderProcess::_call<rpc>, - lambda::_1, - request)); - }, - [=](const Try<csi::v0::Response<rpc>, StatusError>& result) mutable - -> Future<ControlFlow<csi::v0::Response<rpc>>> { - Option<Duration> backoff = retry - ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX) - : Option<Duration>::none(); - - maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX); - - // We dispatch `__call` for testing purpose. - return dispatch( - self(), - &StorageLocalResourceProviderProcess::__call<rpc>, - result, - backoff); - }); -} - - -template <csi::v0::RPC rpc> -Future<Try<csi::v0::Response<rpc>, StatusError>> -StorageLocalResourceProviderProcess::_call( - const string& endpoint, const csi::v0::Request<rpc>& request) -{ - ++metrics.csi_plugin_rpcs_pending.at(rpc); - - return csi::v0::Client(endpoint, runtime).call<rpc>(request) - .onAny(defer(self(), [=]( - const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) { - --metrics.csi_plugin_rpcs_pending.at(rpc); - if (future.isReady() && future->isSome()) { - ++metrics.csi_plugin_rpcs_successes.at(rpc); - } else if (future.isDiscarded()) { - ++metrics.csi_plugin_rpcs_cancelled.at(rpc); - } else { - ++metrics.csi_plugin_rpcs_errors.at(rpc); - } - })); -} - - -template <csi::v0::RPC rpc> -Future<ControlFlow<csi::v0::Response<rpc>>> -StorageLocalResourceProviderProcess::__call( - const Try<csi::v0::Response<rpc>, StatusError>& result, - const Option<Duration>& backoff) -{ - if (result.isSome()) { - return Break(result.get()); - } - - if (backoff.isNone()) { - return Failure(result.error()); - } - - // See the link below for retryable status codes: - // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT - switch 
(result.error().status.error_code()) { - case grpc::DEADLINE_EXCEEDED: - case grpc::UNAVAILABLE: { - LOG(ERROR) - << "Received '" << result.error() << "' while calling " << rpc - << ". Retrying in " << backoff.get(); - - return after(backoff.get()) - .then([]() -> Future<ControlFlow<csi::v0::Response<rpc>>> { - return Continue(); - }); - } - case grpc::CANCELLED: - case grpc::UNKNOWN: - case grpc::INVALID_ARGUMENT: - case grpc::NOT_FOUND: - case grpc::ALREADY_EXISTS: - case grpc::PERMISSION_DENIED: - case grpc::UNAUTHENTICATED: - case grpc::RESOURCE_EXHAUSTED: - case grpc::FAILED_PRECONDITION: - case grpc::ABORTED: - case grpc::OUT_OF_RANGE: - case grpc::UNIMPLEMENTED: - case grpc::INTERNAL: - case grpc::DATA_LOSS: { - return Failure(result.error()); - } - case grpc::OK: - case grpc::DO_NOT_USE: { - UNREACHABLE(); - } - } - - UNREACHABLE(); -} - - void StorageLocalResourceProviderProcess::initialize() { const Principal principal = LocalResourceProvider::principal(info); CHECK(principal.claims.contains("cid_prefix")); const string& containerPrefix = principal.claims.at("cid_prefix"); - rootDir = slave::paths::getCsiRootDir(workDir); - pluginInfo = info.storage().plugin(); - services = {CONTROLLER_SERVICE, NODE_SERVICE}; - - serviceManager.reset(new ServiceManager( + Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create( extractParentEndpoint(url), - rootDir, - pluginInfo, - services, + slave::paths::getCsiRootDir(workDir), + info.storage().plugin(), + {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE}, containerPrefix, authToken, - runtime, - &metrics)); + &metrics); + + if (volumeManager_.isError()) { + LOG(ERROR) + << "Failed to create CSI volume manager for resource provider with type '" + << info.type() << "' and name '" << info.name() + << "': " << volumeManager_.error(); + + fatal(); + } + + volumeManager = std::move(volumeManager_).get(); auto die = [=](const string& message) { LOG(ERROR) @@ -503,7 +373,7 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recover() { CHECK_EQ(RECOVERING, state); - return recoverVolumes() + return volumeManager->recover() .then(defer(self(), [=]() -> Future<Nothing> { // Recover the resource provider ID and state from the latest symlink. If // the symlink does not exist, this is a new resource provider, and the @@ -604,147 +474,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() } -Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes() -{ - Try<string> bootId_ = os::bootId(); - if (bootId_.isError()) { - return Failure("Failed to get boot ID: " + bootId_.error()); - } - - bootId = bootId_.get(); - - return serviceManager->recover() - .then(process::defer(self(), &Self::prepareServices)) - .then(process::defer(self(), [this]() -> Future<Nothing> { - // Recover the states of CSI volumes. - Try<list<string>> volumePaths = - paths::getVolumePaths(rootDir, pluginInfo.type(), pluginInfo.name()); - - if (volumePaths.isError()) { - return Failure( - "Failed to find volumes for CSI plugin type '" + pluginInfo.type() + - "' and name '" + pluginInfo.name() + "': " + volumePaths.error()); - } - - vector<Future<Nothing>> futures; - - foreach (const string& path, volumePaths.get()) { - Try<paths::VolumePath> volumePath = - paths::parseVolumePath(rootDir, path); - - if (volumePath.isError()) { - return Failure( - "Failed to parse volume path '" + path + - "': " + volumePath.error()); - } - - CHECK_EQ(pluginInfo.type(), volumePath->type); - CHECK_EQ(pluginInfo.name(), volumePath->name); - - const string& volumeId = volumePath->volumeId; - const string statePath = paths::getVolumeStatePath( - rootDir, pluginInfo.type(), pluginInfo.name(), volumeId); - - if (!os::exists(statePath)) { - continue; - } - - Result<VolumeState> volumeState = - slave::state::read<VolumeState>(statePath); - - if (volumeState.isError()) { - return Failure( - "Failed to read volume state from '" + statePath + - "': " + volumeState.error()); - } - - if 
(volumeState.isNone()) { - continue; - } - - volumes.put(volumeId, std::move(volumeState.get())); - VolumeData& volume = volumes.at(volumeId); - - if (!VolumeState::State_IsValid(volume.state.state())) { - return Failure("Volume '" + volumeId + "' is in INVALID state"); - } - - // First, if there is a node reboot after the volume is made - // publishable, it should be reset to `NODE_READY`. - switch (volume.state.state()) { - case VolumeState::CREATED: - case VolumeState::NODE_READY: - case VolumeState::CONTROLLER_PUBLISH: - case VolumeState::CONTROLLER_UNPUBLISH: - case VolumeState::NODE_STAGE: { - break; - } - case VolumeState::VOL_READY: - case VolumeState::PUBLISHED: - case VolumeState::NODE_UNSTAGE: - case VolumeState::NODE_PUBLISH: - case VolumeState::NODE_UNPUBLISH: { - if (bootId != volume.state.boot_id()) { - // Since this is a no-op, no need to checkpoint here. - volume.state.set_state(VolumeState::NODE_READY); - volume.state.clear_boot_id(); - } - - break; - } - case VolumeState::UNKNOWN: { - return Failure("Volume '" + volumeId + "' is in UNKNOWN state"); - } - - // NOTE: We avoid using a default clause for the following values in - // proto3's open enum to enable the compiler to detect missing enum - // cases for us. See: https://github.com/google/protobuf/issues/3917 - case google::protobuf::kint32min: - case google::protobuf::kint32max: { - UNREACHABLE(); - } - } - - // Second, if the volume has been used by a container before recovery, - // we have to bring the volume back to `PUBLISHED` so data can be - // cleaned up synchronously when needed. - if (volume.state.node_publish_required()) { - futures.push_back(publishVolume(volumeId)); - } - } - - // Garbage collect leftover mount paths that were failed to remove before. 
- const string mountRootDir = - paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()); - - Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir); - if (mountPaths.isError()) { - // TODO(chhsiao): This could indicate that something is seriously wrong. - // To help debugging the problem, we should surface the error via - // MESOS-8745. - return Failure( - "Failed to find mount paths for CSI plugin type '" + - pluginInfo.type() + "' and name '" + pluginInfo.name() + - "': " + mountPaths.error()); - } - - foreach (const string& path, mountPaths.get()) { - Try<string> volumeId = paths::parseMountPath(mountRootDir, path); - if (volumeId.isError()) { - return Failure( - "Failed to parse mount path '" + path + "': " + volumeId.error()); - } - - if (!volumes.contains(volumeId.get())) { - garbageCollectMountPath(volumeId.get()); - } - } - - return process::collect(futures).then([] { return Nothing(); }); - })); -} - - void StorageLocalResourceProviderProcess::doReliableRegistration() { if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) { @@ -1113,7 +842,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getRawVolumes() { CHECK(info.has_id()); - return listVolumes() + return volumeManager->listVolumes() .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) { Resources resources; @@ -1156,15 +885,15 @@ Future<Resources> StorageLocalResourceProviderProcess::getStoragePools() foreachpair (const string& profile, const DiskProfileAdaptor::ProfileInfo& profileInfo, profileInfos) { - futures.push_back( - getCapacity(profileInfo.capability, profileInfo.parameters) - .then(defer(self(), [=](const Bytes& capacity) -> Resources { - if (capacity == 0) { - return Resources(); - } + futures.push_back(volumeManager->getCapacity( + profileInfo.capability, profileInfo.parameters) + .then(defer(self(), [=](const Bytes& capacity) -> Resources { + if (capacity == 0) { + return Resources(); + } - return createRawDiskResource(info, 
capacity, profile, vendor); - }))); + return createRawDiskResource(info, capacity, profile, vendor); + }))); } return collect(futures) @@ -1395,7 +1124,6 @@ void StorageLocalResourceProviderProcess::publishResources( case Resource::DiskInfo::Source::MOUNT: case Resource::DiskInfo::Source::BLOCK: { CHECK(resource.disk().source().has_id()); - CHECK(volumes.contains(resource.disk().source().id())); volumeIds.insert(resource.disk().source().id()); break; } @@ -1418,7 +1146,7 @@ void StorageLocalResourceProviderProcess::publishResources( vector<Future<Nothing>> futures; foreach (const string& volumeId, volumeIds) { - futures.push_back(publishVolume(volumeId)); + futures.push_back(volumeManager->publishVolume(volumeId)); } allPublished = collect(futures); @@ -1526,766 +1254,6 @@ void StorageLocalResourceProviderProcess::reconcileOperations( } -Future<Nothing> StorageLocalResourceProviderProcess::prepareServices() -{ - CHECK(!services.empty()); - - // Get the plugin capabilities. - return call<GET_PLUGIN_CAPABILITIES>( - *services.begin(), GetPluginCapabilitiesRequest()) - .then(process::defer(self(), [=]( - const GetPluginCapabilitiesResponse& response) -> Future<Nothing> { - pluginCapabilities = response.capabilities(); - - if (services.contains(CONTROLLER_SERVICE) && - !pluginCapabilities->controllerService) { - return Failure( - "CONTROLLER_SERVICE plugin capability is not supported for CSI " - "plugin type '" + - pluginInfo.type() + "' and name '" + pluginInfo.name() + "'"); - } - - return Nothing(); - })) - // Check if all services have consistent plugin infos. 
- .then(process::defer(self(), [this] { - vector<Future<GetPluginInfoResponse>> futures; - foreach (const Service& service, services) { - futures.push_back( - call<GET_PLUGIN_INFO>(CONTROLLER_SERVICE, GetPluginInfoRequest()) - .onReady([service](const GetPluginInfoResponse& response) { - LOG(INFO) << service << " loaded: " << stringify(response); - })); - } - - return process::collect(futures) - .then([](const vector<GetPluginInfoResponse>& pluginInfos) { - for (size_t i = 1; i < pluginInfos.size(); ++i) { - if (pluginInfos[i].name() != pluginInfos[0].name() || - pluginInfos[i].vendor_version() != - pluginInfos[0].vendor_version()) { - LOG(WARNING) << "Inconsistent plugin services. Please check with " - "the plugin vendor to ensure compatibility."; - } - } - - return Nothing(); - }); - })) - // Get the controller capabilities. - .then(process::defer(self(), [this]() -> Future<Nothing> { - if (!services.contains(CONTROLLER_SERVICE)) { - controllerCapabilities = ControllerCapabilities(); - return Nothing(); - } - - return call<CONTROLLER_GET_CAPABILITIES>( - CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest()) - .then(process::defer(self(), [this]( - const ControllerGetCapabilitiesResponse& response) { - controllerCapabilities = response.capabilities(); - return Nothing(); - })); - })) - // Get the node capabilities and ID. 
- .then(process::defer(self(), [this]() -> Future<Nothing> { - if (!services.contains(NODE_SERVICE)) { - nodeCapabilities = NodeCapabilities(); - return Nothing(); - } - - return call<NODE_GET_CAPABILITIES>( - NODE_SERVICE, NodeGetCapabilitiesRequest()) - .then(process::defer(self(), [this]( - const NodeGetCapabilitiesResponse& response) -> Future<Nothing> { - nodeCapabilities = response.capabilities(); - - if (controllerCapabilities->publishUnpublishVolume) { - return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest()) - .then(process::defer(self(), [this]( - const NodeGetIdResponse& response) { - nodeId = response.node_id(); - return Nothing(); - })); - } - - return Nothing(); - })); - })); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::publishVolume( - const string& volumeId) -{ - if (!volumes.contains(volumeId)) { - return Failure("Cannot publish unknown volume '" + volumeId + "'"); - } - - VolumeData& volume = volumes.at(volumeId); - - LOG(INFO) << "Publishing volume '" << volumeId << "' in " - << volume.state.state() << " state"; - - // Volume publishing is serialized with other operations on the same volume to - // avoid races. - return volume.sequence->add(std::function<Future<Nothing>()>( - process::defer(self(), &Self::_publishVolume, volumeId))); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::_attachVolume( - const string& volumeId) -{ - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - - if (volumeState.state() == VolumeState::NODE_READY) { - return Nothing(); - } - - if (volumeState.state() != VolumeState::CREATED && - volumeState.state() != VolumeState::CONTROLLER_PUBLISH && - volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) { - return Failure( - "Cannot attach volume '" + volumeId + "' in " + - stringify(volumeState.state()) + " state"); - } - - if (!controllerCapabilities->publishUnpublishVolume) { - // Since this is a no-op, no need to checkpoint here. 
- volumeState.set_state(VolumeState::NODE_READY); - return Nothing(); - } - - // A previously failed `ControllerUnpublishVolume` call can be recovered - // through an extra `ControllerUnpublishVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerunpublishvolume // NOLINT - if (volumeState.state() == VolumeState::CONTROLLER_UNPUBLISH) { - // Retry after recovering the volume to `CREATED` state. - return _detachVolume(volumeId) - .then(process::defer(self(), &Self::_attachVolume, volumeId)); - } - - if (volumeState.state() == VolumeState::CREATED) { - volumeState.set_state(VolumeState::CONTROLLER_PUBLISH); - checkpointVolumeState(volumeId); - } - - LOG(INFO) - << "Calling '/csi.v0.Controller/ControllerPublishVolume' for volume '" - << volumeId << "'"; - - ControllerPublishVolumeRequest request; - request.set_volume_id(volumeId); - request.set_node_id(CHECK_NOTNONE(nodeId)); - *request.mutable_volume_capability() = - csi::v0::evolve(volumeState.volume_capability()); - request.set_readonly(false); - *request.mutable_volume_attributes() = volumeState.volume_attributes(); - - return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, std::move(request)) - .then(process::defer(self(), [this, volumeId]( - const ControllerPublishVolumeResponse& response) { - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - volumeState.set_state(VolumeState::NODE_READY); - *volumeState.mutable_publish_info() = response.publish_info(); - - checkpointVolumeState(volumeId); - - return Nothing(); - })); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::_detachVolume( - const string& volumeId) -{ - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - - if (volumeState.state() == VolumeState::CREATED) { - return Nothing(); - } - - if (volumeState.state() != VolumeState::NODE_READY && - volumeState.state() != VolumeState::CONTROLLER_PUBLISH && - 
volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) { - // Retry after transitioning the volume to `CREATED` state. - return _unpublishVolume(volumeId) - .then(process::defer(self(), &Self::_detachVolume, volumeId)); - } - - if (!controllerCapabilities->publishUnpublishVolume) { - // Since this is a no-op, no need to checkpoint here. - volumeState.set_state(VolumeState::CREATED); - return Nothing(); - } - - // A previously failed `ControllerPublishVolume` call can be recovered through - // the current `ControllerUnpublishVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT - if (volumeState.state() == VolumeState::NODE_READY || - volumeState.state() == VolumeState::CONTROLLER_PUBLISH) { - volumeState.set_state(VolumeState::CONTROLLER_UNPUBLISH); - checkpointVolumeState(volumeId); - } - - LOG(INFO) - << "Calling '/csi.v0.Controller/ControllerUnpublishVolume' for volume '" - << volumeId << "'"; - - ControllerUnpublishVolumeRequest request; - request.set_volume_id(volumeId); - request.set_node_id(CHECK_NOTNONE(nodeId)); - - return call<CONTROLLER_UNPUBLISH_VOLUME>( - CONTROLLER_SERVICE, std::move(request)) - .then(process::defer(self(), [this, volumeId] { - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - volumeState.set_state(VolumeState::CREATED); - volumeState.mutable_publish_info()->clear(); - - checkpointVolumeState(volumeId); - - return Nothing(); - })); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::_publishVolume( - const string& volumeId) -{ - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - - if (volumeState.state() == VolumeState::PUBLISHED) { - CHECK(volumeState.node_publish_required()); - return Nothing(); - } - - if (volumeState.state() != VolumeState::VOL_READY && - volumeState.state() != VolumeState::NODE_PUBLISH && - volumeState.state() != VolumeState::NODE_UNPUBLISH) 
{ - // Retry after transitioning the volume to `VOL_READY` state. - return __publishVolume(volumeId) - .then(process::defer(self(), &Self::_publishVolume, volumeId)); - } - - // A previously failed `NodeUnpublishVolume` call can be recovered through an - // extra `NodeUnpublishVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunpublishvolume // NOLINT - if (volumeState.state() == VolumeState::NODE_UNPUBLISH) { - // Retry after recovering the volume to `VOL_READY` state. - return __unpublishVolume(volumeId) - .then(process::defer(self(), &Self::_publishVolume, volumeId)); - } - - const string targetPath = paths::getMountTargetPath( - paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()), - volumeId); - - // NOTE: The target path will be cleaned up during volume removal. - Try<Nothing> mkdir = os::mkdir(targetPath); - if (mkdir.isError()) { - return Failure( - "Failed to create mount target path '" + targetPath + - "': " + mkdir.error()); - } - - if (volumeState.state() == VolumeState::VOL_READY) { - volumeState.set_state(VolumeState::NODE_PUBLISH); - checkpointVolumeState(volumeId); - } - - LOG(INFO) << "Calling '/csi.v0.Node/NodePublishVolume' for volume '" - << volumeId << "'"; - - NodePublishVolumeRequest request; - request.set_volume_id(volumeId); - *request.mutable_publish_info() = volumeState.publish_info(); - request.set_target_path(targetPath); - *request.mutable_volume_capability() = - csi::v0::evolve(volumeState.volume_capability()); - request.set_readonly(false); - *request.mutable_volume_attributes() = volumeState.volume_attributes(); - - if (nodeCapabilities->stageUnstageVolume) { - const string stagingPath = paths::getMountStagingPath( - paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()), - volumeId); - - CHECK(os::exists(stagingPath)); - request.set_staging_target_path(stagingPath); - } - - return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request)) - 
.then(defer(self(), [this, volumeId, targetPath] { - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - - volumeState.set_state(VolumeState::PUBLISHED); - - // NOTE: This is the first time a container is going to consume the - // persistent volume, so the `node_publish_required` field is set to - // indicate that this volume must remain published so it can be - // synchronously cleaned up when the persistent volume is destroyed. - volumeState.set_node_publish_required(true); - - checkpointVolumeState(volumeId); - - return Nothing(); - })); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::__publishVolume( - const string& volumeId) -{ - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - - if (volumeState.state() == VolumeState::VOL_READY) { - CHECK(!volumeState.boot_id().empty()); - return Nothing(); - } - - if (volumeState.state() != VolumeState::NODE_READY && - volumeState.state() != VolumeState::NODE_STAGE && - volumeState.state() != VolumeState::NODE_UNSTAGE) { - // Retry after transitioning the volume to `NODE_READY` state. - return _attachVolume(volumeId) - .then(process::defer(self(), &Self::__publishVolume, volumeId)); - } - - if (!nodeCapabilities->stageUnstageVolume) { - // Since this is a no-op, no need to checkpoint here. - volumeState.set_state(VolumeState::VOL_READY); - volumeState.set_boot_id(CHECK_NOTNONE(bootId)); - return Nothing(); - } - - // A previously failed `NodeUnstageVolume` call can be recovered through an - // extra `NodeUnstageVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunstagevolume // NOLINT - if (volumeState.state() == VolumeState::NODE_UNSTAGE) { - // Retry after recovering the volume to `NODE_READY` state. 
- return _unpublishVolume(volumeId) - .then(process::defer(self(), &Self::__publishVolume, volumeId)); - } - - const string stagingPath = paths::getMountStagingPath( - paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()), - volumeId); - - // NOTE: The staging path will be cleaned up in during volume removal. - Try<Nothing> mkdir = os::mkdir(stagingPath); - if (mkdir.isError()) { - return Failure( - "Failed to create mount staging path '" + stagingPath + - "': " + mkdir.error()); - } - - if (volumeState.state() == VolumeState::NODE_READY) { - volumeState.set_state(VolumeState::NODE_STAGE); - checkpointVolumeState(volumeId); - } - - LOG(INFO) << "Calling '/csi.v0.Node/NodeStageVolume' for volume '" << volumeId - << "'"; - - NodeStageVolumeRequest request; - request.set_volume_id(volumeId); - *request.mutable_publish_info() = volumeState.publish_info(); - request.set_staging_target_path(stagingPath); - *request.mutable_volume_capability() = - csi::v0::evolve(volumeState.volume_capability()); - *request.mutable_volume_attributes() = volumeState.volume_attributes(); - - return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request)) - .then(process::defer(self(), [this, volumeId] { - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - volumeState.set_state(VolumeState::VOL_READY); - volumeState.set_boot_id(CHECK_NOTNONE(bootId)); - - checkpointVolumeState(volumeId); - - return Nothing(); - })); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::_unpublishVolume( - const string& volumeId) -{ - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - - if (volumeState.state() == VolumeState::NODE_READY) { - CHECK(volumeState.boot_id().empty()); - return Nothing(); - } - - if (volumeState.state() != VolumeState::VOL_READY && - volumeState.state() != VolumeState::NODE_STAGE && - volumeState.state() != VolumeState::NODE_UNSTAGE) { - // Retry after transitioning the 
volume to `VOL_READY` state. - return __unpublishVolume(volumeId) - .then(process::defer(self(), &Self::_unpublishVolume, volumeId)); - } - - if (!nodeCapabilities->stageUnstageVolume) { - // Since this is a no-op, no need to checkpoint here. - volumeState.set_state(VolumeState::NODE_READY); - volumeState.clear_boot_id(); - return Nothing(); - } - - // A previously failed `NodeStageVolume` call can be recovered through the - // current `NodeUnstageVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT - if (volumeState.state() == VolumeState::VOL_READY || - volumeState.state() == VolumeState::NODE_STAGE) { - volumeState.set_state(VolumeState::NODE_UNSTAGE); - checkpointVolumeState(volumeId); - } - - const string stagingPath = paths::getMountStagingPath( - paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()), - volumeId); - - CHECK(os::exists(stagingPath)); - - LOG(INFO) << "Calling '/csi.v0.Node/NodeUnstageVolume' for volume '" - << volumeId << "'"; - - NodeUnstageVolumeRequest request; - request.set_volume_id(volumeId); - request.set_staging_target_path(stagingPath); - - return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request)) - .then(process::defer(self(), [this, volumeId] { - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - volumeState.set_state(VolumeState::NODE_READY); - volumeState.clear_boot_id(); - - checkpointVolumeState(volumeId); - - return Nothing(); - })); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::__unpublishVolume( - const string& volumeId) -{ - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - - if (volumeState.state() == VolumeState::VOL_READY) { - return Nothing(); - } - - if (volumeState.state() != VolumeState::PUBLISHED && - volumeState.state() != VolumeState::NODE_PUBLISH && - volumeState.state() != VolumeState::NODE_UNPUBLISH) { - return Failure( 
- "Cannot unpublish volume '" + volumeId + "' in " + - stringify(volumeState.state()) + "state"); - } - - // A previously failed `NodePublishVolume` call can be recovered through the - // current `NodeUnpublishVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT - if (volumeState.state() == VolumeState::PUBLISHED || - volumeState.state() == VolumeState::NODE_PUBLISH) { - volumeState.set_state(VolumeState::NODE_UNPUBLISH); - checkpointVolumeState(volumeId); - } - - const string targetPath = paths::getMountTargetPath( - paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()), - volumeId); - - CHECK(os::exists(targetPath)); - - LOG(INFO) << "Calling '/csi.v0.Node/NodeUnpublishVolume' for volume '" - << volumeId << "'"; - - NodeUnpublishVolumeRequest request; - request.set_volume_id(volumeId); - request.set_target_path(targetPath); - - return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request)) - .then(process::defer(self(), [this, volumeId] { - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - volumeState.set_state(VolumeState::VOL_READY); - - checkpointVolumeState(volumeId); - - return Nothing(); - })); -} - - -Future<VolumeInfo> StorageLocalResourceProviderProcess::createVolume( - const string& name, - const Bytes& capacity, - const types::VolumeCapability& capability, - const Map<string, string>& parameters) -{ - if (!controllerCapabilities->createDeleteVolume) { - return Failure( - "CREATE_DELETE_VOLUME controller capability is not supported for CSI " - "plugin type '" + info.type() + "' and name '" + info.name()); - } - - LOG(INFO) << "Creating volume with name '" << name << "'"; - - CreateVolumeRequest request; - request.set_name(name); - request.mutable_capacity_range()->set_required_bytes(capacity.bytes()); - request.mutable_capacity_range()->set_limit_bytes(capacity.bytes()); - *request.add_volume_capabilities() = 
csi::v0::evolve(capability); - *request.mutable_parameters() = parameters; - - // We retry the `CreateVolume` call for MESOS-9517. - return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true) - .then(process::defer(self(), [=]( - const CreateVolumeResponse& response) -> Future<VolumeInfo> { - const string& volumeId = response.volume().id(); - - // NOTE: If the volume is already tracked, there might already be - // operations running in its sequence. Since this continuation runs - // outside the sequence, we fail the call here to avoid any race issue. - // This also means that this call is not idempotent. - if (volumes.contains(volumeId)) { - return Failure("Volume with name '" + name + "' already exists"); - } - - VolumeState volumeState; - volumeState.set_state(VolumeState::CREATED); - *volumeState.mutable_volume_capability() = capability; - *volumeState.mutable_parameters() = parameters; - *volumeState.mutable_volume_attributes() = response.volume().attributes(); - - volumes.put(volumeId, std::move(volumeState)); - checkpointVolumeState(volumeId); - - return VolumeInfo{capacity, volumeId, response.volume().attributes()}; - })); -} - - -Future<bool> StorageLocalResourceProviderProcess::deleteVolume( - const string& volumeId) -{ - if (!volumes.contains(volumeId)) { - return __deleteVolume(volumeId); - } - - VolumeData& volume = volumes.at(volumeId); - - LOG(INFO) << "Deleting volume '" << volumeId << "' in " - << volume.state.state() << " state"; - - // Volume deletion is sequentialized with other operations on the same volume - // to avoid races. 
- return volume.sequence->add(std::function<Future<bool>()>( - process::defer(self(), &Self::_deleteVolume, volumeId))); -} - - -Future<bool> StorageLocalResourceProviderProcess::_deleteVolume( - const std::string& volumeId) -{ - CHECK(volumes.contains(volumeId)); - VolumeState& volumeState = volumes.at(volumeId).state; - - if (volumeState.node_publish_required()) { - CHECK_EQ(VolumeState::PUBLISHED, volumeState.state()); - - const string targetPath = paths::getMountTargetPath( - paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()), - volumeId); - - // NOTE: Normally the volume should have been cleaned up. However this may - // not be true for preprovisioned volumes (e.g., leftover from a previous - // resource provider instance). To prevent data leakage in such cases, we - // clean up the data (but not the target path) here. - Try<Nothing> rmdir = os::rmdir(targetPath, true, false); - if (rmdir.isError()) { - return Failure( - "Failed to clean up volume '" + volumeId + "': " + rmdir.error()); - } - - volumeState.set_node_publish_required(false); - checkpointVolumeState(volumeId); - } - - if (volumeState.state() != VolumeState::CREATED) { - // Retry after transitioning the volume to `CREATED` state. - return _detachVolume(volumeId) - .then(process::defer(self(), &Self::_deleteVolume, volumeId)); - } - - // NOTE: The last asynchronous continuation, which is supposed to be run in - // the volume's sequence, would cause the sequence to be destructed, which - // would in turn discard the returned future. However, since the continuation - // would have already been run, the returned future will become ready, making - // the future returned by the sequence ready as well. 
- return __deleteVolume(volumeId) - .then(process::defer(self(), [this, volumeId](bool deleted) { - volumes.erase(volumeId); - - const string volumePath = paths::getVolumePath( - rootDir, pluginInfo.type(), pluginInfo.name(), volumeId); - - Try<Nothing> rmdir = os::rmdir(volumePath); - CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '" - << volumePath << "': " << rmdir.error(); - - garbageCollectMountPath(volumeId); - - return deleted; - })); -} - - -Future<bool> StorageLocalResourceProviderProcess::__deleteVolume( - const string& volumeId) -{ - if (!controllerCapabilities->createDeleteVolume) { - return false; - } - - LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '" - << volumeId << "'"; - - DeleteVolumeRequest request; - request.set_volume_id(volumeId); - - // We retry the `DeleteVolume` call for MESOS-9517. - return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true) - .then([] { return true; }); -} - - -Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume( - const VolumeInfo& volumeInfo, - const types::VolumeCapability& capability, - const Map<string, string>& parameters) -{ - // If the volume has been checkpointed, the validation succeeds only if the - // capability and parameters of the specified profile are the same as those in - // the checkpoint. 
- if (volumes.contains(volumeInfo.id)) { - const VolumeState& volumeState = volumes.at(volumeInfo.id).state; - - if (volumeState.volume_capability() != capability) { - return Some( - Error("Mismatched capability for volume '" + volumeInfo.id + "'")); - } - - if (volumeState.parameters() != parameters) { - return Some( - Error("Mismatched parameters for volume '" + volumeInfo.id + "'")); - } - - return None(); - } - - if (!parameters.empty()) { - LOG(WARNING) - << "Validating volumes against parameters is not supported in CSI v0"; - } - - LOG(INFO) << "Validating volume '" << volumeInfo.id << "'"; - - ValidateVolumeCapabilitiesRequest request; - request.set_volume_id(volumeInfo.id); - *request.add_volume_capabilities() = csi::v0::evolve(capability); - *request.mutable_volume_attributes() = volumeInfo.context; - - return call<VALIDATE_VOLUME_CAPABILITIES>( - CONTROLLER_SERVICE, std::move(request)) - .then(process::defer(self(), [=]( - const ValidateVolumeCapabilitiesResponse& response) - -> Future<Option<Error>> { - if (!response.supported()) { - return Error( - "Unsupported volume capability for volume '" + volumeInfo.id + - "': " + response.message()); - } - - // NOTE: If the volume is already tracked, there might already be - // operations running in its sequence. Since this continuation runs - // outside the sequence, we fail the call here to avoid any race issue. - // This also means that this call is not idempotent. 
- if (volumes.contains(volumeInfo.id)) { - return Failure("Volume '" + volumeInfo.id + "' already validated"); - } - - VolumeState volumeState; - volumeState.set_state(VolumeState::CREATED); - *volumeState.mutable_volume_capability() = capability; - *volumeState.mutable_parameters() = parameters; - *volumeState.mutable_volume_attributes() = volumeInfo.context; - - volumes.put(volumeInfo.id, std::move(volumeState)); - checkpointVolumeState(volumeInfo.id); - - return None(); - })); -} - - -Future<vector<VolumeInfo>> StorageLocalResourceProviderProcess::listVolumes() -{ - if (!controllerCapabilities->listVolumes) { - return vector<VolumeInfo>(); - } - - // TODO(chhsiao): Set the max entries and use a loop to do multiple - // `ListVolumes` calls. - return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest()) - .then(process::defer(self(), [](const ListVolumesResponse& response) { - vector<VolumeInfo> result; - foreach (const auto& entry, response.entries()) { - result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()), - entry.volume().id(), - entry.volume().attributes()}); - } - - return result; - })); -} - - -Future<Bytes> StorageLocalResourceProviderProcess::getCapacity( - const types::VolumeCapability& capability, - const Map<string, string>& parameters) -{ - if (!controllerCapabilities->getCapacity) { - return Bytes(0); - } - - GetCapacityRequest request; - *request.add_volume_capabilities() = csi::v0::evolve(capability); - *request.mutable_parameters() = parameters; - - return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request)) - .then([](const GetCapacityResponse& response) { - return Bytes(response.available_capacity()); - }); -} - - Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation( const id::UUID& operationUuid) { @@ -2519,7 +1487,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk( // afterward. See MESOS-9254. 
Future<VolumeInfo> created; if (resource.disk().source().has_profile()) { - created = createVolume( + created = volumeManager->createVolume( operationUuid.toString(), resource.scalar().value() * Bytes::MEGABYTES, profileInfo.capability, @@ -2532,7 +1500,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk( resource.disk().source().metadata())) }; - created = validateVolume( + created = volumeManager->validateVolume( volumeInfo, profileInfo.capability, profileInfo.parameters) .then([resource, profile, volumeInfo]( const Option<Error>& error) -> Future<VolumeInfo> { @@ -2596,7 +1564,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( CHECK(!Resources::isPersistentVolume(resource)); CHECK(resource.disk().source().has_id()); - return deleteVolume(resource.disk().source().id()) + return volumeManager->deleteVolume(resource.disk().source().id()) .then(defer(self(), [=](bool deprovisioned) { Resource converted = resource; converted.mutable_disk()->mutable_source()->set_type( @@ -2669,6 +1637,7 @@ StorageLocalResourceProviderProcess::applyCreate( foreach (const Resource& resource, operation.create().volumes()) { CHECK(Resources::isPersistentVolume(resource)); + CHECK(resource.disk().source().has_id()); // TODO(chhsiao): Support persistent BLOCK volumes. if (resource.disk().source().type() != Resource::DiskInfo::Source::MOUNT) { @@ -2677,6 +1646,12 @@ StorageLocalResourceProviderProcess::applyCreate( stringify(resource.disk().persistence().id()) + "' on a " + stringify(resource.disk().source().type()) + " disk"); } + + // TODO(chhsiao): Ideally, we could perform a sanity check to verify that + // the target path is empty before creating a new persistent volume. + // However, right now we cannot distinguish the case where a framework is + // recreating its own persistent volume after the agent ID changes from the + // case where existing data is being leaked to another framework. 
} return getResourceConversions(operation); @@ -2690,43 +1665,32 @@ StorageLocalResourceProviderProcess::applyDestroy( CHECK(operation.has_destroy()); foreach (const Resource& resource, operation.destroy().volumes()) { - // TODO(chhsiao): Support cleaning up persistent BLOCK volumes, presumably - // with `dd` or any other utility to zero out the block device. CHECK(Resources::isPersistentVolume(resource)); - CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT); CHECK(resource.disk().source().has_id()); - const string& volumeId = resource.disk().source().id(); - CHECK(volumes.contains(volumeId)); - - const VolumeState& volumeState = volumes.at(volumeId).state; - - // NOTE: Data can only be written to the persistent volume when when it is - // in `PUBLISHED` state (i.e., mounted). Once a volume has been transitioned - // to `PUBLISHED`, we will set the `node_publish_required` field and always - // recover it back to `PUBLISHED` after a failover, until a `DESTROY_DISK` - // is applied, which only comes after `DESTROY`. So we only need to clean up - // the volume if it has the field set. - if (!volumeState.node_publish_required()) { - continue; - } - - CHECK_EQ(VolumeState::PUBLISHED, volumeState.state()); + // TODO(chhsiao): Support cleaning up persistent BLOCK volumes, presumably + // with `dd` or any other utility to zero out the block device. + CHECK_EQ(Resource::DiskInfo::Source::MOUNT, + resource.disk().source().type()); const string targetPath = csi::paths::getMountTargetPath( csi::paths::getMountRootDir( slave::paths::getCsiRootDir(workDir), info.storage().plugin().type(), info.storage().plugin().name()), - volumeId); - - // Only the data in the target path, but not itself, should be removed. 
- Try<Nothing> rmdir = os::rmdir(targetPath, true, false); - if (rmdir.isError()) { - return Error( - "Failed to remove persistent volume '" + - stringify(resource.disk().persistence().id()) + "' at '" + - targetPath + "': " + rmdir.error()); + resource.disk().source().id()); + + if (os::exists(targetPath)) { + // NOTE: We always clean up the data in the target path (but not the + // directory itself) even if the volume is not published, in which case + // this should be a no-op. + Try<Nothing> rmdir = os::rmdir(targetPath, true, false); + if (rmdir.isError()) { + return Error( + "Failed to remove persistent volume '" + + stringify(resource.disk().persistence().id()) + "' at '" + + targetPath + "': " + rmdir.error()); + } } } @@ -2854,28 +1818,6 @@ void StorageLocalResourceProviderProcess::garbageCollectOperationPath( } -void StorageLocalResourceProviderProcess::garbageCollectMountPath( - const string& volumeId) -{ - CHECK(!volumes.contains(volumeId)); - - const string path = csi::paths::getMountPath( - csi::paths::getMountRootDir( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()), - volumeId); - - if (os::exists(path)) { - Try<Nothing> rmdir = os::rmdir(path); - if (rmdir.isError()) { - LOG(ERROR) - << "Failed to remove directory '" << path << "': " << rmdir.error(); - } - } -} - - void StorageLocalResourceProviderProcess::checkpointResourceProviderState() { ResourceProviderState state; @@ -2925,26 +1867,6 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState() } -void StorageLocalResourceProviderProcess::checkpointVolumeState( - const string& volumeId) -{ - const string statePath = csi::paths::getVolumeStatePath( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name(), - volumeId); - - // NOTE: We ensure the checkpoint is synced to the filesystem to avoid - // resulting in a stale or empty checkpoint when a system crash happens. 
- Try<Nothing> checkpoint = - slave::state::checkpoint(statePath, volumes.at(volumeId).state, true); - - CHECK_SOME(checkpoint) - << "Failed to checkpoint volume state to '" << statePath << "':" - << checkpoint.error(); -} - - void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate() { Call call; diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp index 56d3682..6e14d90 100644 --- a/src/resource_provider/storage/provider_process.hpp +++ b/src/resource_provider/storage/provider_process.hpp @@ -19,7 +19,6 @@ #include <memory> #include <string> -#include <type_traits> #include <vector> #include <mesos/mesos.hpp> @@ -32,9 +31,7 @@ #include <mesos/v1/resource_provider.hpp> #include <process/future.hpp> -#include <process/grpc.hpp> #include <process/http.hpp> -#include <process/loop.hpp> #include <process/owned.hpp> #include <process/process.hpp> #include <process/sequence.hpp> @@ -42,8 +39,6 @@ #include <process/metrics/counter.hpp> #include <process/metrics/push_gauge.hpp> -#include <stout/bytes.hpp> -#include <stout/duration.hpp> #include <stout/hashset.hpp> #include <stout/linkedhashmap.hpp> #include <stout/nothing.hpp> @@ -52,10 +47,6 @@ #include <stout/uuid.hpp> #include "csi/metrics.hpp" -#include "csi/rpc.hpp" -#include "csi/service_manager.hpp" -#include "csi/state.hpp" -#include "csi/utils.hpp" #include "csi/volume_manager.hpp" #include "status_update_manager/operation.hpp" @@ -63,18 +54,6 @@ namespace mesos { namespace internal { -// Storage local resource provider initially picks a random amount of time -// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI -// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent -// retries are exponentially backed off based on this interval (e.g., 2nd retry -// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`, -// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`. 
-// -// TODO(chhsiao): Make the retry parameters configurable. -constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10); -constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10); - - class StorageLocalResourceProviderProcess : public process::Process<StorageLocalResourceProviderProcess> { @@ -97,57 +76,11 @@ public: void disconnected(); void received(const resource_provider::Event& event); - // Wrapper functions to make CSI calls and update RPC metrics. Made public for - // testing purpose. - // - // The call is made asynchronously and thus no guarantee is provided on the - // order in which calls are sent. Callers need to either ensure to not have - // multiple conflicting calls in flight, or treat results idempotently. - // - // NOTE: We currently ensure this by 1) resource locking to forbid concurrent - // calls on the same volume, and 2) no profile update while there are ongoing - // `CREATE_DISK` or `DESTROY_DISK` operations. - // - // NOTE: Since this function uses `getService` to obtain the latest service - // future, which depends on probe results, it is disabled for making probe - // calls; `_call` should be used directly instead. 
- template < - csi::v0::RPC rpc, - typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0> - process::Future<csi::v0::Response<rpc>> call( - const csi::Service& service, - const csi::v0::Request<rpc>& request, - bool retry = false); - - template <csi::v0::RPC rpc> - process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>> - _call(const std::string& endpoint, const csi::v0::Request<rpc>& request); - - template <csi::v0::RPC rpc> - process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call( - const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result, - const Option<Duration>& backoff); - private: - struct VolumeData - { - VolumeData(csi::state::VolumeState&& _state) - : state(_state), sequence(new process::Sequence("volume-sequence")) {} - - csi::state::VolumeState state; - - // We run all CSI operations for the same volume on a sequence to - // ensure that they are processed in a sequential order. - process::Owned<process::Sequence> sequence; - }; - void initialize() override; void fatal(); - // The recover functions are responsible to recover the state of the - // resource provider and CSI volumes from checkpointed data. process::Future<Nothing> recover(); - process::Future<Nothing> recoverVolumes(); void doReliableRegistration(); @@ -192,73 +125,6 @@ private: void reconcileOperations( const resource_provider::Event::ReconcileOperations& reconcile); - process::Future<Nothing> prepareServices(); - - process::Future<Nothing> publishVolume(const std::string& volumeId); - - // The following methods are used to manage volume lifecycles. Transient - // states are omitted. 
- // - // +------------+ - // + + + | CREATED | ^ - // _attachVolume | | | +---+----^---+ | - // | | | | | | _detachVolume - // | | | +---v----+---+ | - // v + + | NODE_READY | + ^ - // | | +---+----^---+ | | - // __publishVolume | | | | | | _unpublishVolume - // | | +---v----+---+ | | - // v + | VOL_READY | + + ^ - // | +---+----^---+ | | | - // _publishVolume | | | | | | __unpublishVolume - // | +---v----+---+ | | | - // V | PUBLISHED | + + + - // +------------+ - - // Transition a volume to `NODE_READY` state from any state above. - process::Future<Nothing> _attachVolume(const std::string& volumeId); - - // Transition a volume to `CREATED` state from any state below. - process::Future<Nothing> _detachVolume(const std::string& volumeId); - - // Transition a volume to `PUBLISHED` state from any state above. - process::Future<Nothing> _publishVolume(const std::string& volumeId); - - // Transition a volume to `VOL_READY` state from any state above. - process::Future<Nothing> __publishVolume(const std::string& volumeId); - - // Transition a volume to `NODE_READY` state from any state below. - process::Future<Nothing> _unpublishVolume(const std::string& volumeId); - - // Transition a volume to `VOL_READY` state from any state below. - process::Future<Nothing> __unpublishVolume(const std::string& volumeId); - - // NOTE: This can only be called after `prepareServices`. - process::Future<csi::VolumeInfo> createVolume( - const std::string& name, - const Bytes& capacity, - const csi::types::VolumeCapability& capability, - const google::protobuf::Map<std::string, std::string>& parameters); - - // NOTE: This can only be called after `prepareServices`. - process::Future<bool> deleteVolume(const std::string& volumeId); - process::Future<bool> _deleteVolume(const std::string& volumeId); - process::Future<bool> __deleteVolume(const std::string& volumeId); - - // NOTE: This can only be called after `prepareServices`. 
- process::Future<Option<Error>> validateVolume( - const csi::VolumeInfo& volumeInfo, - const csi::types::VolumeCapability& capability, - const google::protobuf::Map<std::string, std::string>& parameters); - - // NOTE: This can only be called after `prepareServices`. - process::Future<std::vector<csi::VolumeInfo>> listVolumes(); - - // NOTE: This can only be called after `prepareServices`. - process::Future<Bytes> getCapacity( - const csi::types::VolumeCapability& capability, - const google::protobuf::Map<std::string, std::string>& parameters); - // Applies the operation. Speculative operations will be synchronously // applied. Do nothing if the operation is already in a terminal state. process::Future<Nothing> _applyOperation(const id::UUID& operationUuid); @@ -295,10 +161,8 @@ private: const Try<std::vector<ResourceConversion>>& conversions); void garbageCollectOperationPath(const id::UUID& operationUuid); - void garbageCollectMountPath(const std::string& volumeId); void checkpointResourceProviderState(); - void checkpointVolumeState(const std::string& volumeId); void sendResourceProviderStateUpdate(); @@ -326,25 +190,13 @@ private: std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor; - process::grpc::client::Runtime runtime; process::Owned<v1::resource_provider::Driver> driver; OperationStatusUpdateManager statusUpdateManager; // The mapping of known profiles fetched from the DiskProfileAdaptor. hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos; - process::Owned<csi::ServiceManager> serviceManager; - - // TODO(chhsiao): Remove the following variables after refactoring. 
- std::string rootDir; - CSIPluginInfo pluginInfo; - hashset<csi::Service> services; - - Option<std::string> bootId; - Option<csi::v0::PluginCapabilities> pluginCapabilities; - Option<csi::v0::ControllerCapabilities> controllerCapabilities; - Option<csi::v0::NodeCapabilities> nodeCapabilities; - Option<std::string> nodeId; + process::Owned<csi::VolumeManager> volumeManager; // We maintain the following invariant: if one operation depends on // another, they cannot be in PENDING state at the same time, i.e., @@ -356,7 +208,6 @@ private: LinkedHashMap<id::UUID, Operation> operations; Resources totalResources; id::UUID resourceVersion; - hashmap<std::string, VolumeData> volumes; // If pending, it means that the storage pools are being reconciled, and all // incoming operations that disallow reconciliation will be dropped. diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 0fbd602..bb71935 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -43,6 +43,7 @@ #include "csi/paths.hpp" #include "csi/rpc.hpp" #include "csi/state.hpp" +#include "csi/v0_volume_manager_process.hpp" #include "linux/fs.hpp" @@ -50,8 +51,6 @@ #include "module/manager.hpp" -#include "resource_provider/storage/provider_process.hpp" - #include "slave/container_daemon_process.hpp" #include "slave/paths.hpp" #include "slave/state.hpp" @@ -5054,7 +5053,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) ASSERT_EQ(0u, createVolumeRequests.size()); Future<Nothing> createVolumeCall = FUTURE_DISPATCH( - _, &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>); + _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>); // Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call. 
createVolumeResults.put( @@ -5062,7 +5061,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) AWAIT_READY(createVolumeCall); - Duration createVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; + Duration createVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR; // Settle the clock to ensure that the retry timer has been set, then advance // the clock by the maximum backoff to trigger a retry. @@ -5080,15 +5079,14 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) ASSERT_EQ(0u, createVolumeRequests.size()); createVolumeCall = FUTURE_DISPATCH( - _, - &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>); + _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>); createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, ""))); AWAIT_READY(createVolumeCall); - createVolumeBackoff = - std::min(createVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX); + createVolumeBackoff = std::min( + createVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX); // Settle the clock to ensure that the retry timer has been set, then // advance the clock by the maximum backoff to trigger a retry. @@ -5142,7 +5140,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) ASSERT_EQ(0u, deleteVolumeRequests.size()); Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH( - _, &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>); + _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>); // Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call. 
deleteVolumeResults.put( @@ -5150,7 +5148,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) AWAIT_READY(deleteVolumeCall); - Duration deleteVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; + Duration deleteVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR; // Settle the clock to ensure that the retry timer has been set, then advance // the clock by the maximum backoff to trigger a retry. @@ -5168,15 +5166,14 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) ASSERT_EQ(0u, deleteVolumeRequests.size()); deleteVolumeCall = FUTURE_DISPATCH( - _, - &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>); + _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>); deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, ""))); AWAIT_READY(deleteVolumeCall); - deleteVolumeBackoff = - std::min(deleteVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX); + deleteVolumeBackoff = std::min( + deleteVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX); // Settle the clock to ensure that the retry timer has been set, then // advance the clock by the maximum backoff to trigger a retry.