Implemented growing and shrinking of persistent volumes. The new offer operations are implemented as speculative operations, but we use validation to make them behave as non-speculative at the API level, so that we can transition them to truly non-speculative operations later without a breaking change.
Review: https://reviews.apache.org/r/66050/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8251087b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8251087b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8251087b Branch: refs/heads/master Commit: 8251087b041ebdfd70ad5aa1e0296e139d4663f0 Parents: 9f2b497 Author: Zhitao Li <zhitaoli...@gmail.com> Authored: Thu May 3 17:04:36 2018 -0700 Committer: Chun-Hung Hsiao <chhs...@mesosphere.io> Committed: Thu May 3 17:04:36 2018 -0700 ---------------------------------------------------------------------- src/common/protobuf_utils.cpp | 49 +++++++---- src/common/resources_utils.cpp | 104 +++++++++++++++++++++-- src/master/master.cpp | 160 +++++++++++++++++++++++++++++++----- src/master/validation.cpp | 129 +++++++++++++++++++++++++++++ src/master/validation.hpp | 10 +++ 5 files changed, 411 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index c5d873c..82ba141 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -37,7 +37,6 @@ #include <stout/foreach.hpp> #include <stout/net.hpp> #include <stout/stringify.hpp> -#include <stout/unimplemented.hpp> #include <stout/unreachable.hpp> #include <stout/uuid.hpp> @@ -708,9 +707,24 @@ void injectAllocationInfo( break; } - case Offer::Operation::GROW_VOLUME: + case Offer::Operation::GROW_VOLUME: { + inject( + *operation->mutable_grow_volume()->mutable_volume(), + allocationInfo); + + inject( + *operation->mutable_grow_volume()->mutable_addition(), + allocationInfo); + + break; + } + case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; + inject( + 
*operation->mutable_shrink_volume()->mutable_volume(), + allocationInfo); + + break; } case Offer::Operation::CREATE_VOLUME: { @@ -828,9 +842,17 @@ void stripAllocationInfo(Offer::Operation* operation) break; } - case Offer::Operation::GROW_VOLUME: + case Offer::Operation::GROW_VOLUME: { + strip(*operation->mutable_grow_volume()->mutable_volume()); + strip(*operation->mutable_grow_volume()->mutable_addition()); + + break; + } + case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; + strip(*operation->mutable_shrink_volume()->mutable_volume()); + + break; } case Offer::Operation::CREATE_VOLUME: { @@ -877,11 +899,12 @@ bool isSpeculativeOperation(const Offer::Operation& operation) case Offer::Operation::UNRESERVE: case Offer::Operation::CREATE: case Offer::Operation::DESTROY: - return true; + // TODO(zhitao): Convert `GROW_VOLUME` and `SHRINK_VOLUME` to + // non-speculative operations once we can support non-speculative operator + // API. case Offer::Operation::GROW_VOLUME: - case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; - } + case Offer::Operation::SHRINK_VOLUME: + return true; case Offer::Operation::UNKNOWN: UNREACHABLE(); } @@ -1020,7 +1043,9 @@ Try<Resources> getConsumedResources(const Offer::Operation& operation) case Offer::Operation::RESERVE: case Offer::Operation::UNRESERVE: case Offer::Operation::CREATE: - case Offer::Operation::DESTROY: { + case Offer::Operation::DESTROY: + case Offer::Operation::GROW_VOLUME: + case Offer::Operation::SHRINK_VOLUME: { Try<vector<ResourceConversion>> conversions = getResourceConversions(operation); @@ -1035,10 +1060,6 @@ Try<Resources> getConsumedResources(const Offer::Operation& operation) return consumed; } - case Offer::Operation::GROW_VOLUME: - case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; - } case Offer::Operation::LAUNCH: case Offer::Operation::LAUNCH_GROUP: // TODO(bbannier): Consider adding support for 'LAUNCH' and 
http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/common/resources_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp index 6f56026..eb72995 100644 --- a/src/common/resources_utils.cpp +++ b/src/common/resources_utils.cpp @@ -16,7 +16,6 @@ #include <stout/foreach.hpp> #include <stout/stringify.hpp> -#include <stout/unimplemented.hpp> #include "common/resources_utils.hpp" @@ -196,9 +195,56 @@ Try<vector<TResourceConversion>> getResourceConversions( } break; } - case TOperation::GROW_VOLUME: + + case TOperation::GROW_VOLUME: { + const TResource& volume = operation.grow_volume().volume(); + const TResource& addition = operation.grow_volume().addition(); + + if (TResources::hasResourceProvider(volume)) { + return Error("Operation not supported for resource provider"); + } + + // To grow a persistent volume, we consume the original volume and the + // additional resource and convert into a single volume with the new size. + TResource converted = volume; + *converted.mutable_scalar() += addition.scalar(); + + conversions.emplace_back(TResources(volume) + addition, converted); + break; + } + case TOperation::SHRINK_VOLUME: { - UNIMPLEMENTED; + const TResource& volume = operation.shrink_volume().volume(); + + if (TResources::hasResourceProvider(volume)) { + return Error("Operation not supported for resource provider"); + } + + // To shrink a persistent volume, we consume the original volume and + // convert into a new volume with reduced size and a freed disk resource + // without persistent volume info. + TResource freed = volume; + + *freed.mutable_scalar() = operation.shrink_volume().subtract(); + + // TODO(zhitao): Move this to helper function + // `Resources::stripPersistentVolume`. 
+ if (freed.disk().has_source()) { + freed.mutable_disk()->clear_persistence(); + freed.mutable_disk()->clear_volume(); + } else { + freed.clear_disk(); + } + + // Since we only allow persistent volumes to be shared, the + // freed resource must be non-shared. + freed.clear_shared(); + + TResource shrunk = volume; + *shrunk.mutable_scalar() -= operation.shrink_volume().subtract(); + + conversions.emplace_back(volume, TResources(shrunk) + freed); + break; } } @@ -265,9 +311,11 @@ Result<ResourceProviderID> getResourceProviderId( resource = operation.destroy().volumes(0); break; case Offer::Operation::GROW_VOLUME: - case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; - } + resource = operation.grow_volume().volume(); + break; + case Offer::Operation::SHRINK_VOLUME: + resource = operation.shrink_volume().volume(); + break; case Offer::Operation::CREATE_VOLUME: resource = operation.create_volume().source(); break; @@ -640,9 +688,49 @@ Option<Error> validateAndUpgradeResources(Offer::Operation* operation) break; } - case Offer::Operation::GROW_VOLUME: + case Offer::Operation::GROW_VOLUME: { + // TODO(mpark): Once we perform a sanity check validation for + // offer operations as specified in MESOS-7760, this should no + // longer have to be handled in this function. + if (!operation->has_grow_volume()) { + return Error( + "A GROW_VOLUME operation must have" + " the Offer.Operation.grow_volume field set"); + } + + Option<Error> error = Resources::validate( + operation->grow_volume().volume()); + + if (error.isSome()) { + return error; + } + + error = Resources::validate(operation->grow_volume().addition()); + + if (error.isSome()) { + return error; + } + + break; + } case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; + // TODO(mpark): Once we perform a sanity check validation for + // offer operations as specified in MESOS-7760, this should no + // longer have to be handled in this function. 
+ if (!operation->has_shrink_volume()) { + return Error( + "A SHRINK_VOLUME offer operation must have" + " the Offer.Operation.shrink_volume field set"); + } + + Option<Error> error = Resources::validate( + operation->shrink_volume().volume()); + + if (error.isSome()) { + return error; + } + + break; } case Offer::Operation::LAUNCH: { // TODO(mpark): Once we perform a sanity check validation for http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 56cf61f..b9946b5 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -69,7 +69,6 @@ #include <stout/option.hpp> #include <stout/path.hpp> #include <stout/stringify.hpp> -#include <stout/unimplemented.hpp> #include <stout/unreachable.hpp> #include <stout/utils.hpp> #include <stout/uuid.hpp> @@ -4107,6 +4106,8 @@ void Master::accept( case Offer::Operation::UNRESERVE: case Offer::Operation::CREATE: case Offer::Operation::DESTROY: + case Offer::Operation::GROW_VOLUME: + case Offer::Operation::SHRINK_VOLUME: case Offer::Operation::CREATE_VOLUME: case Offer::Operation::DESTROY_VOLUME: case Offer::Operation::CREATE_BLOCK: @@ -4133,10 +4134,6 @@ void Master::accept( break; } - case Offer::Operation::GROW_VOLUME: - case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; - } case Offer::Operation::UNKNOWN: { LOG(WARNING) << "Ignoring unknown operation"; break; @@ -4172,6 +4169,8 @@ void Master::accept( case Offer::Operation::UNRESERVE: case Offer::Operation::CREATE: case Offer::Operation::DESTROY: + case Offer::Operation::GROW_VOLUME: + case Offer::Operation::SHRINK_VOLUME: case Offer::Operation::CREATE_VOLUME: case Offer::Operation::DESTROY_VOLUME: case Offer::Operation::CREATE_BLOCK: @@ -4208,10 +4207,6 @@ void Master::accept( accept.add_operations()->CopyFrom(operation); break; } - case Offer::Operation::GROW_VOLUME: - case 
Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; - } case Offer::Operation::UNKNOWN: { LOG(WARNING) << "Ignoring unknown operation"; break; @@ -4241,6 +4236,8 @@ void Master::accept( case Offer::Operation::UNRESERVE: case Offer::Operation::CREATE: case Offer::Operation::DESTROY: + case Offer::Operation::GROW_VOLUME: + case Offer::Operation::SHRINK_VOLUME: case Offer::Operation::CREATE_VOLUME: case Offer::Operation::DESTROY_VOLUME: case Offer::Operation::CREATE_BLOCK: @@ -4248,10 +4245,6 @@ void Master::accept( // No-op. break; } - case Offer::Operation::GROW_VOLUME: - case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; - } case Offer::Operation::LAUNCH: { foreach ( TaskInfo& task, *operation.mutable_launch()->mutable_task_infos()) { @@ -4416,9 +4409,14 @@ void Master::accept( break; } - case Offer::Operation::GROW_VOLUME: + case Offer::Operation::GROW_VOLUME: { + // TODO(zhitao): Add support for authorization of grow volume. + break; + } + case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; + // TODO(zhitao): Add support for authorization of shrink volume. + break; } case Offer::Operation::CREATE_VOLUME: { @@ -4563,6 +4561,12 @@ void Master::_accept( // launched, we remove its resource from offered resources. Resources _offeredResources = offeredResources; + // Converted resources from volume resizes. These converted resources are not + // put into `_offeredResources`, so no other operations can consume them. + // TODO(zhitao): This will be unnecessary once `GROW_VOLUME` and + // `SHRINK_VOLUME` become non-speculative. + Resources resizedResources; + // We keep track of the shared resources from the offers separately. // `offeredSharedResources` can be modified by CREATE/DESTROY but we // don't remove from it when a task is successfully launched so this @@ -4902,9 +4906,124 @@ void Master::_accept( break; } - case Offer::Operation::GROW_VOLUME: + case Offer::Operation::GROW_VOLUME: { + // TODO(zhitao): Authorize GROW_VOLUME from `authorizations`. 
+ + // Make sure this grow volume operation is valid. + Option<Error> error = validation::operation::validate( + operation.grow_volume(), slave->capabilities); + + if (error.isSome()) { + drop( + framework, + operation, + error->message + "; on agent " + stringify(*slave)); + continue; + } + + // TODO(zhitao): Convert this operation to non-speculative once we can + // support that in the operator API. + Try<vector<ResourceConversion>> _conversions = + getResourceConversions(operation); + + if (_conversions.isError()) { + drop(framework, operation, _conversions.error()); + continue; + } + + CHECK_EQ(1u, _conversions->size()); + const Resources& consumed = _conversions->at(0).consumed; + const Resources& converted = _conversions->at(0).converted; + + if (!_offeredResources.contains(consumed)) { + drop( + framework, + operation, + "Invalid GROW_VOLUME operation: " + + stringify(_offeredResources) + " does not contain " + + stringify(consumed)); + + continue; + } + + _offeredResources -= consumed; + resizedResources += converted; + + LOG(INFO) << "Processing GROW_VOLUME operation for volume " + << operation.grow_volume().volume() + << " with additional resource " + << operation.grow_volume().addition() + << " from framework " + << *framework << " on agent " << *slave; + + _apply(slave, framework, operation); + + conversions.insert( + conversions.end(), + _conversions->begin(), + _conversions->end()); + + break; + } + case Offer::Operation::SHRINK_VOLUME: { - UNIMPLEMENTED; + // TODO(zhitao): Authorize SHRINK_VOLUME from `authorizations`. + + // Make sure this shrink volume operation is valid. + Option<Error> error = validation::operation::validate( + operation.shrink_volume(), slave->capabilities); + + if (error.isSome()) { + drop( + framework, + operation, + error->message + "; on agent " + stringify(*slave)); + continue; + } + + // TODO(zhitao): Convert this operation to non-speculative once we can + // support that in the operator API. 
+ Try<vector<ResourceConversion>> _conversions = + getResourceConversions(operation); + + if (_conversions.isError()) { + drop(framework, operation, _conversions.error()); + continue; + } + + CHECK_EQ(1u, _conversions->size()); + const Resources& consumed = _conversions->at(0).consumed; + const Resources& converted = _conversions->at(0).converted; + + if (!_offeredResources.contains(consumed)) { + drop( + framework, + operation, + "Invalid SHRINK_VOLUME operation: " + + stringify(_offeredResources) + " does not contain " + + stringify(consumed)); + + continue; + } + + _offeredResources -= consumed; + resizedResources += converted; + + LOG(INFO) << "Processing SHRINK_VOLUME operation for volume " + << operation.shrink_volume().volume() + << " subtracting scalar value " + << operation.shrink_volume().subtract() + << " from framework " + << *framework << " on agent " << *slave; + + _apply(slave, framework, operation); + + conversions.insert( + conversions.end(), + _conversions->begin(), + _conversions->end()); + + break; } case Offer::Operation::LAUNCH: { @@ -5500,12 +5619,15 @@ void Master::_accept( conversions); } - if (!_offeredResources.empty()) { + + // TODO(zhitao): Remove `resizedResources` once `GROW_VOLUME` and + // `SHRINK_VOLUME` become non-speculative. + if (!_offeredResources.empty() || !resizedResources.empty()) { // Tell the allocator about the unused (e.g., refused) resources. 
allocator->recoverResources( frameworkId, slaveId, - _offeredResources, + _offeredResources + resizedResources, accept.filters()); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 15dfa8a..74ed171 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -2342,6 +2342,135 @@ Option<Error> validate( } +Option<Error> validate( + const Offer::Operation::GrowVolume& growVolume, + const protobuf::slave::Capabilities& agentCapabilities) +{ + Option<Error> error = Resources::validate(growVolume.volume()); + if (error.isSome()) { + return Error( + "Invalid resource in the 'GrowVolume.volume' field: " + + error->message); + } + + error = Resources::validate(growVolume.addition()); + if (error.isSome()) { + return Error( + "Invalid resource in the 'GrowVolume.addition' field: " + + error->message); + } + + Value::Scalar zero; + zero.set_value(0); + + // The `Scalar` comparison contains a fixed-point conversion. + if (growVolume.addition().scalar() <= zero) { + return Error( + "The size of 'GrowVolume.addition' field must be greater than zero"); + } + + if (Resources::hasResourceProvider(growVolume.volume())) { + return Error("Growing a volume from a resource provider is not supported"); + } + + error = resource::validatePersistentVolume(Resources(growVolume.volume())); + if (error.isSome()) { + return Error( + "Invalid persistent volume in the 'GrowVolume.volume' field: " + + error->message); + } + + if (growVolume.volume().has_shared()) { + return Error("Growing a shared persistent volume is not supported"); + } + + // TODO(zhitao): Move this to a helper function + // `Resources::stripPersistentVolume`. + Resource stripped = growVolume.volume(); + + if (stripped.disk().has_source()) { + // PATH/MOUNT disk. 
+ stripped.mutable_disk()->clear_persistence(); + stripped.mutable_disk()->clear_volume(); + } else { + // ROOT disk. + stripped.clear_disk(); + } + + if ((Resources(stripped) + growVolume.addition()).size() != 1) { + return Error( + "Incompatible resources in the 'GrowVolume.volume' and " + "'GrowVolume.addition' fields"); + } + + if (!agentCapabilities.resizeVolume) { + return Error( + "Volume " + stringify(growVolume.volume()) + + " cannot be grown on an agent without RESIZE_VOLUME capability"); + } + + return None(); +} + + +Option<Error> validate( + const Offer::Operation::ShrinkVolume& shrinkVolume, + const protobuf::slave::Capabilities& agentCapabilities) +{ + Option<Error> error = Resources::validate(shrinkVolume.volume()); + if (error.isSome()) { + return Error( + "Invalid resource in the 'ShrinkVolume.volume' field: " + + error->message); + } + + Value::Scalar zero; + zero.set_value(0); + + // The `Scalar` comparison contains a fixed-point conversion. + if (shrinkVolume.subtract() <= zero) { + return Error( + "Value of 'ShrinkVolume.subtract' must be greater than zero"); + } + + if (shrinkVolume.volume().scalar() <= shrinkVolume.subtract()) { + return Error( + "Value of 'ShrinkVolume.subtract' must be smaller than the size " + "of 'ShrinkVolume.volume'"); + } + + if (Resources::hasResourceProvider(shrinkVolume.volume())) { + return Error( + "Shrinking a volume from a resource provider is not supported"); + } + + if (shrinkVolume.volume().disk().source().type() == + Resource::DiskInfo::Source::MOUNT) { + return Error( + "Shrinking a volume on a MOUNT disk is not supported"); + } + + error = resource::validatePersistentVolume(Resources(shrinkVolume.volume())); + if (error.isSome()) { + return Error( + "Invalid persistent volume in the 'ShrinkVolume.volume' field: " + + error->message); + } + + if (shrinkVolume.volume().has_shared()) { + return Error("Shrinking a shared persistent volume is not supported"); + } + + if (!agentCapabilities.resizeVolume) { + 
return Error( + "Volume " + stringify(shrinkVolume.volume()) + + " cannot be shrunk on an agent without RESIZE_VOLUME capability"); + } + + return None(); +} + + Option<Error> validate(const Offer::Operation::CreateVolume& createVolume) { const Resource& source = createVolume.source(); http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/validation.hpp ---------------------------------------------------------------------- diff --git a/src/master/validation.hpp b/src/master/validation.hpp index c1ab754..1ba6d65 100644 --- a/src/master/validation.hpp +++ b/src/master/validation.hpp @@ -302,6 +302,16 @@ Option<Error> validate( const Option<FrameworkInfo>& frameworkInfo = None()); +Option<Error> validate( + const Offer::Operation::GrowVolume& growVolume, + const protobuf::slave::Capabilities& agentCapabilities); + + +Option<Error> validate( + const Offer::Operation::ShrinkVolume& shrinkVolume, + const protobuf::slave::Capabilities& agentCapabilities); + + Option<Error> validate(const Offer::Operation::CreateVolume& createVolume); Option<Error> validate(const Offer::Operation::DestroyVolume& destroyVolume); Option<Error> validate(const Offer::Operation::CreateBlock& createBlock);