Implemented grow and shrink of persistent volumes.

The new offer operations are implemented as speculative operations, but
we will use validation to make them non-speculative on API level so that
we can transition later without a breaking change.

Review: https://reviews.apache.org/r/66050/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8251087b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8251087b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8251087b

Branch: refs/heads/master
Commit: 8251087b041ebdfd70ad5aa1e0296e139d4663f0
Parents: 9f2b497
Author: Zhitao Li <zhitaoli...@gmail.com>
Authored: Thu May 3 17:04:36 2018 -0700
Committer: Chun-Hung Hsiao <chhs...@mesosphere.io>
Committed: Thu May 3 17:04:36 2018 -0700

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp  |  49 +++++++----
 src/common/resources_utils.cpp | 104 +++++++++++++++++++++--
 src/master/master.cpp          | 160 +++++++++++++++++++++++++++++++-----
 src/master/validation.cpp      | 129 +++++++++++++++++++++++++++++
 src/master/validation.hpp      |  10 +++
 5 files changed, 411 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index c5d873c..82ba141 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -37,7 +37,6 @@
 #include <stout/foreach.hpp>
 #include <stout/net.hpp>
 #include <stout/stringify.hpp>
-#include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 #include <stout/uuid.hpp>
 
@@ -708,9 +707,24 @@ void injectAllocationInfo(
       break;
     }
 
-    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::GROW_VOLUME: {
+      inject(
+          *operation->mutable_grow_volume()->mutable_volume(),
+          allocationInfo);
+
+      inject(
+          *operation->mutable_grow_volume()->mutable_addition(),
+          allocationInfo);
+
+      break;
+    }
+
     case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
+      inject(
+          *operation->mutable_shrink_volume()->mutable_volume(),
+          allocationInfo);
+
+      break;
     }
 
     case Offer::Operation::CREATE_VOLUME: {
@@ -828,9 +842,17 @@ void stripAllocationInfo(Offer::Operation* operation)
       break;
     }
 
-    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::GROW_VOLUME: {
+      strip(*operation->mutable_grow_volume()->mutable_volume());
+      strip(*operation->mutable_grow_volume()->mutable_addition());
+
+      break;
+    }
+
     case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
+      strip(*operation->mutable_shrink_volume()->mutable_volume());
+
+      break;
     }
 
     case Offer::Operation::CREATE_VOLUME: {
@@ -877,11 +899,12 @@ bool isSpeculativeOperation(const Offer::Operation& 
operation)
     case Offer::Operation::UNRESERVE:
     case Offer::Operation::CREATE:
     case Offer::Operation::DESTROY:
-      return true;
+    // TODO(zhitao): Convert `GROW_VOLUME` and `SHRINK_VOLUME` to
+    // non-speculative operations once we can support non-speculative operator
+    // API.
     case Offer::Operation::GROW_VOLUME:
-    case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
-    }
+    case Offer::Operation::SHRINK_VOLUME:
+      return true;
     case Offer::Operation::UNKNOWN:
       UNREACHABLE();
   }
@@ -1020,7 +1043,9 @@ Try<Resources> getConsumedResources(const 
Offer::Operation& operation)
     case Offer::Operation::RESERVE:
     case Offer::Operation::UNRESERVE:
     case Offer::Operation::CREATE:
-    case Offer::Operation::DESTROY: {
+    case Offer::Operation::DESTROY:
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
       Try<vector<ResourceConversion>> conversions =
         getResourceConversions(operation);
 
@@ -1035,10 +1060,6 @@ Try<Resources> getConsumedResources(const 
Offer::Operation& operation)
 
       return consumed;
     }
-    case Offer::Operation::GROW_VOLUME:
-    case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
-    }
     case Offer::Operation::LAUNCH:
     case Offer::Operation::LAUNCH_GROUP:
       // TODO(bbannier): Consider adding support for 'LAUNCH' and

http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 6f56026..eb72995 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -16,7 +16,6 @@
 
 #include <stout/foreach.hpp>
 #include <stout/stringify.hpp>
-#include <stout/unimplemented.hpp>
 
 #include "common/resources_utils.hpp"
 
@@ -196,9 +195,56 @@ Try<vector<TResourceConversion>> getResourceConversions(
       }
       break;
     }
-    case TOperation::GROW_VOLUME:
+
+    case TOperation::GROW_VOLUME: {
+      const TResource& volume = operation.grow_volume().volume();
+      const TResource& addition = operation.grow_volume().addition();
+
+      if (TResources::hasResourceProvider(volume)) {
+        return Error("Operation not supported for resource provider");
+      }
+
+      // To grow a persistent volume, we consume the original volume and the
+      // additional resource and convert into a single volume with the new 
size.
+      TResource converted = volume;
+      *converted.mutable_scalar() += addition.scalar();
+
+      conversions.emplace_back(TResources(volume) + addition, converted);
+      break;
+    }
+
     case TOperation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
+      const TResource& volume = operation.shrink_volume().volume();
+
+      if (TResources::hasResourceProvider(volume)) {
+        return Error("Operation not supported for resource provider");
+      }
+
+      // To shrink a persistent volume, we consume the original volume and
+      // convert into a new volume with reduced size and a freed disk resource
+      // without persistent volume info.
+      TResource freed = volume;
+
+      *freed.mutable_scalar() = operation.shrink_volume().subtract();
+
+      // TODO(zhitao): Move this to helper function
+      // `Resources::stripPersistentVolume`.
+      if (freed.disk().has_source()) {
+        freed.mutable_disk()->clear_persistence();
+        freed.mutable_disk()->clear_volume();
+      } else {
+        freed.clear_disk();
+      }
+
+      // Since we only allow persistent volumes to be shared, the
+      // freed resource must be non-shared.
+      freed.clear_shared();
+
+      TResource shrunk = volume;
+      *shrunk.mutable_scalar() -= operation.shrink_volume().subtract();
+
+      conversions.emplace_back(volume, TResources(shrunk) + freed);
+      break;
     }
   }
 
@@ -265,9 +311,11 @@ Result<ResourceProviderID> getResourceProviderId(
       resource = operation.destroy().volumes(0);
       break;
     case Offer::Operation::GROW_VOLUME:
-    case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
-    }
+      resource = operation.grow_volume().volume();
+      break;
+    case Offer::Operation::SHRINK_VOLUME:
+      resource = operation.shrink_volume().volume();
+      break;
     case Offer::Operation::CREATE_VOLUME:
       resource = operation.create_volume().source();
       break;
@@ -640,9 +688,49 @@ Option<Error> 
validateAndUpgradeResources(Offer::Operation* operation)
 
       break;
     }
-    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::GROW_VOLUME: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_grow_volume()) {
+        return Error(
+            "A GROW_VOLUME operation must have"
+            " the Offer.Operation.grow_volume field set");
+      }
+
+      Option<Error> error = Resources::validate(
+          operation->grow_volume().volume());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      error = Resources::validate(operation->grow_volume().addition());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      break;
+    }
     case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_shrink_volume()) {
+        return Error(
+            "A SHRINK_VOLUME offer operation must have"
+            " the Offer.Operation.shrink_volume field set");
+      }
+
+      Option<Error> error = Resources::validate(
+          operation->shrink_volume().volume());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      break;
     }
     case Offer::Operation::LAUNCH: {
       // TODO(mpark): Once we perform a sanity check validation for

http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 56cf61f..b9946b5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -69,7 +69,6 @@
 #include <stout/option.hpp>
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
-#include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 #include <stout/utils.hpp>
 #include <stout/uuid.hpp>
@@ -4107,6 +4106,8 @@ void Master::accept(
           case Offer::Operation::UNRESERVE:
           case Offer::Operation::CREATE:
           case Offer::Operation::DESTROY:
+          case Offer::Operation::GROW_VOLUME:
+          case Offer::Operation::SHRINK_VOLUME:
           case Offer::Operation::CREATE_VOLUME:
           case Offer::Operation::DESTROY_VOLUME:
           case Offer::Operation::CREATE_BLOCK:
@@ -4133,10 +4134,6 @@ void Master::accept(
 
             break;
           }
-          case Offer::Operation::GROW_VOLUME:
-          case Offer::Operation::SHRINK_VOLUME: {
-            UNIMPLEMENTED;
-          }
           case Offer::Operation::UNKNOWN: {
             LOG(WARNING) << "Ignoring unknown operation";
             break;
@@ -4172,6 +4169,8 @@ void Master::accept(
           case Offer::Operation::UNRESERVE:
           case Offer::Operation::CREATE:
           case Offer::Operation::DESTROY:
+          case Offer::Operation::GROW_VOLUME:
+          case Offer::Operation::SHRINK_VOLUME:
           case Offer::Operation::CREATE_VOLUME:
           case Offer::Operation::DESTROY_VOLUME:
           case Offer::Operation::CREATE_BLOCK:
@@ -4208,10 +4207,6 @@ void Master::accept(
             accept.add_operations()->CopyFrom(operation);
             break;
           }
-          case Offer::Operation::GROW_VOLUME:
-          case Offer::Operation::SHRINK_VOLUME: {
-            UNIMPLEMENTED;
-          }
           case Offer::Operation::UNKNOWN: {
             LOG(WARNING) << "Ignoring unknown operation";
             break;
@@ -4241,6 +4236,8 @@ void Master::accept(
       case Offer::Operation::UNRESERVE:
       case Offer::Operation::CREATE:
       case Offer::Operation::DESTROY:
+      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::SHRINK_VOLUME:
       case Offer::Operation::CREATE_VOLUME:
       case Offer::Operation::DESTROY_VOLUME:
       case Offer::Operation::CREATE_BLOCK:
@@ -4248,10 +4245,6 @@ void Master::accept(
         // No-op.
         break;
       }
-      case Offer::Operation::GROW_VOLUME:
-      case Offer::Operation::SHRINK_VOLUME: {
-        UNIMPLEMENTED;
-      }
       case Offer::Operation::LAUNCH: {
         foreach (
             TaskInfo& task, *operation.mutable_launch()->mutable_task_infos()) 
{
@@ -4416,9 +4409,14 @@ void Master::accept(
         break;
       }
 
-      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::GROW_VOLUME: {
+        // TODO(zhitao): Add support for authorization of grow volume.
+        break;
+      }
+
       case Offer::Operation::SHRINK_VOLUME: {
-        UNIMPLEMENTED;
+        // TODO(zhitao): Add support for authorization of shrink volume.
+        break;
       }
 
       case Offer::Operation::CREATE_VOLUME: {
@@ -4563,6 +4561,12 @@ void Master::_accept(
   // launched, we remove its resource from offered resources.
   Resources _offeredResources = offeredResources;
 
+  // Converted resources from volume resizes. These converted resources are not
+  // put into `_offeredResources`, so no other operations can consume them.
+  // TODO(zhitao): This will be unnecessary once `GROW_VOLUME` and
+  // `SHRINK_VOLUME` become non-speculative.
+  Resources resizedResources;
+
   // We keep track of the shared resources from the offers separately.
   // `offeredSharedResources` can be modified by CREATE/DESTROY but we
   // don't remove from it when a task is successfully launched so this
@@ -4902,9 +4906,124 @@ void Master::_accept(
         break;
       }
 
-      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::GROW_VOLUME: {
+        // TODO(zhitao): Authorize GROW_VOLUME from `authorizations`.
+
+        // Make sure this grow volume operation is valid.
+        Option<Error> error = validation::operation::validate(
+            operation.grow_volume(), slave->capabilities);
+
+        if (error.isSome()) {
+          drop(
+              framework,
+              operation,
+              error->message + "; on agent " + stringify(*slave));
+          continue;
+        }
+
+        // TODO(zhitao): Convert this operation to non-speculative once we can
+        // support that in the operator API.
+        Try<vector<ResourceConversion>> _conversions =
+          getResourceConversions(operation);
+
+        if (_conversions.isError()) {
+          drop(framework, operation, _conversions.error());
+          continue;
+        }
+
+        CHECK_EQ(1u, _conversions->size());
+        const Resources& consumed = _conversions->at(0).consumed;
+        const Resources& converted = _conversions->at(0).converted;
+
+        if (!_offeredResources.contains(consumed)) {
+          drop(
+              framework,
+              operation,
+              "Invalid GROW_VOLUME operation: " +
+              stringify(_offeredResources) + " does not contain " +
+              stringify(consumed));
+
+          continue;
+        }
+
+        _offeredResources -= consumed;
+        resizedResources += converted;
+
+        LOG(INFO) << "Processing GROW_VOLUME operation for volume "
+                  << operation.grow_volume().volume()
+                  << " with additional resource "
+                  << operation.grow_volume().addition()
+                  << " from framework "
+                  << *framework << " on agent " << *slave;
+
+        _apply(slave, framework, operation);
+
+        conversions.insert(
+            conversions.end(),
+            _conversions->begin(),
+            _conversions->end());
+
+        break;
+      }
+
       case Offer::Operation::SHRINK_VOLUME: {
-        UNIMPLEMENTED;
+        // TODO(zhitao): Authorize SHRINK_VOLUME from `authorizations`.
+
+        // Make sure this shrink volume operation is valid.
+        Option<Error> error = validation::operation::validate(
+            operation.shrink_volume(), slave->capabilities);
+
+        if (error.isSome()) {
+          drop(
+              framework,
+              operation,
+              error->message + "; on agent " + stringify(*slave));
+          continue;
+        }
+
+        // TODO(zhitao): Convert this operation to non-speculative once we can
+        // support that in the operator API.
+        Try<vector<ResourceConversion>> _conversions =
+          getResourceConversions(operation);
+
+        if (_conversions.isError()) {
+          drop(framework, operation, _conversions.error());
+          continue;
+        }
+
+        CHECK_EQ(1u, _conversions->size());
+        const Resources& consumed = _conversions->at(0).consumed;
+        const Resources& converted = _conversions->at(0).converted;
+
+        if (!_offeredResources.contains(consumed)) {
+          drop(
+              framework,
+              operation,
+              "Invalid SHRINK_VOLUME operation: " +
+              stringify(_offeredResources) + " does not contain " +
+              stringify(consumed));
+
+          continue;
+        }
+
+        _offeredResources -= consumed;
+        resizedResources += converted;
+
+        LOG(INFO) << "Processing SHRINK_VOLUME operation for volume "
+                  << operation.shrink_volume().volume()
+                  << " subtracting scalar value "
+                  << operation.shrink_volume().subtract()
+                  << " from framework "
+                  << *framework << " on agent " << *slave;
+
+        _apply(slave, framework, operation);
+
+        conversions.insert(
+            conversions.end(),
+            _conversions->begin(),
+            _conversions->end());
+
+        break;
       }
 
       case Offer::Operation::LAUNCH: {
@@ -5500,12 +5619,15 @@ void Master::_accept(
         conversions);
   }
 
-  if (!_offeredResources.empty()) {
+
+  // TODO(zhitao): Remove `resizedResources` once `GROW_VOLUME` and
+  // `SHRINK_VOLUME` become non-speculative.
+  if (!_offeredResources.empty() || !resizedResources.empty()) {
     // Tell the allocator about the unused (e.g., refused) resources.
     allocator->recoverResources(
         frameworkId,
         slaveId,
-        _offeredResources,
+        _offeredResources + resizedResources,
         accept.filters());
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 15dfa8a..74ed171 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2342,6 +2342,135 @@ Option<Error> validate(
 }
 
 
+Option<Error> validate(
+    const Offer::Operation::GrowVolume& growVolume,
+    const protobuf::slave::Capabilities& agentCapabilities)
+{
+  Option<Error> error = Resources::validate(growVolume.volume());
+  if (error.isSome()) {
+    return Error(
+        "Invalid resource in the 'GrowVolume.volume' field: " +
+        error->message);
+  }
+
+  error = Resources::validate(growVolume.addition());
+  if (error.isSome()) {
+    return Error(
+        "Invalid resource in the 'GrowVolume.addition' field: " +
+        error->message);
+  }
+
+  Value::Scalar zero;
+  zero.set_value(0);
+
+  // The `Scalar` comparison contains a fixed-point conversion.
+  if (growVolume.addition().scalar() <= zero) {
+    return Error(
+        "The size of 'GrowVolume.addition' field must be greater than zero");
+  }
+
+  if (Resources::hasResourceProvider(growVolume.volume())) {
+    return Error("Growing a volume from a resource provider is not supported");
+  }
+
+  error = resource::validatePersistentVolume(Resources(growVolume.volume()));
+  if (error.isSome()) {
+    return Error(
+        "Invalid persistent volume in the 'GrowVolume.volume' field: " +
+        error->message);
+  }
+
+  if (growVolume.volume().has_shared()) {
+    return Error("Growing a shared persistent volume is not supported");
+  }
+
+  // TODO(zhitao): Move this to a helper function
+  // `Resources::stripPersistentVolume`.
+  Resource stripped = growVolume.volume();
+
+  if (stripped.disk().has_source()) {
+    // PATH/MOUNT disk.
+    stripped.mutable_disk()->clear_persistence();
+    stripped.mutable_disk()->clear_volume();
+  } else {
+    // ROOT disk.
+    stripped.clear_disk();
+  }
+
+  if ((Resources(stripped) + growVolume.addition()).size() != 1) {
+    return Error(
+        "Incompatible resources in the 'GrowVolume.volume' and "
+        "'GrowVolume.addition' fields");
+  }
+
+  if (!agentCapabilities.resizeVolume) {
+    return Error(
+        "Volume " + stringify(growVolume.volume()) +
+        " cannot be grown on an agent without RESIZE_VOLUME capability");
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(
+    const Offer::Operation::ShrinkVolume& shrinkVolume,
+    const protobuf::slave::Capabilities& agentCapabilities)
+{
+  Option<Error> error = Resources::validate(shrinkVolume.volume());
+  if (error.isSome()) {
+    return Error(
+        "Invalid resource in the 'ShrinkVolume.volume' field: " +
+        error->message);
+  }
+
+  Value::Scalar zero;
+  zero.set_value(0);
+
+  // The `Scalar` comparison contains a fixed-point conversion.
+  if (shrinkVolume.subtract() <= zero) {
+    return Error(
+        "Value of 'ShrinkVolume.subtract' must be greater than zero");
+  }
+
+  if (shrinkVolume.volume().scalar() <= shrinkVolume.subtract()) {
+    return Error(
+        "Value of 'ShrinkVolume.subtract' must be smaller than the size "
+        "of 'ShrinkVolume.volume'");
+  }
+
+  if (Resources::hasResourceProvider(shrinkVolume.volume())) {
+    return Error(
+        "Shrinking a volume from a resource provider is not supported");
+  }
+
+  if (shrinkVolume.volume().disk().source().type() ==
+      Resource::DiskInfo::Source::MOUNT) {
+    return Error(
+        "Shrinking a volume on a MOUNT disk is not supported");
+  }
+
+  error = resource::validatePersistentVolume(Resources(shrinkVolume.volume()));
+  if (error.isSome()) {
+    return Error(
+        "Invalid persistent volume in the 'ShrinkVolume.volume' field: " +
+        error->message);
+  }
+
+  if (shrinkVolume.volume().has_shared()) {
+    return Error("Shrinking a shared persistent volume is not supported");
+  }
+
+  if (!agentCapabilities.resizeVolume) {
+    return Error(
+        "Volume " + stringify(shrinkVolume.volume()) +
+        " cannot be shrunk on an agent without RESIZE_VOLUME capability");
+  }
+
+  return None();
+}
+
+
 Option<Error> validate(const Offer::Operation::CreateVolume& createVolume)
 {
   const Resource& source = createVolume.source();

http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index c1ab754..1ba6d65 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -302,6 +302,16 @@ Option<Error> validate(
     const Option<FrameworkInfo>& frameworkInfo = None());
 
 
+Option<Error> validate(
+    const Offer::Operation::GrowVolume& growVolume,
+    const protobuf::slave::Capabilities& agentCapabilities);
+
+
+Option<Error> validate(
+    const Offer::Operation::ShrinkVolume& shrinkVolume,
+    const protobuf::slave::Capabilities& agentCapabilities);
+
+
 Option<Error> validate(const Offer::Operation::CreateVolume& createVolume);
 Option<Error> validate(const Offer::Operation::DestroyVolume& destroyVolume);
 Option<Error> validate(const Offer::Operation::CreateBlock& createBlock);

Reply via email to