This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit c8f8fbd328235fcd7aa1b32e079fb6f9a424b881
Author: Andrei Sekretenko <asekrete...@apache.org>
AuthorDate: Fri Sep 4 20:23:25 2020 +0200

    Added `OfferConstraints` to `struct Framework` in the master.
    
    This is a prerequisite for exposing framework's offer constraints
    via master API endpoints.
    
    Review: https://reviews.apache.org/r/72839
---
 src/master/framework.cpp | 41 +++++++++++++++++++++++----
 src/master/master.cpp    | 74 +++++++++++++++++++++++++++++++++++++-----------
 src/master/master.hpp    | 18 +++++++++++-
 3 files changed, 110 insertions(+), 23 deletions(-)

diff --git a/src/master/framework.cpp b/src/master/framework.cpp
index e9e2d20..980828e 100644
--- a/src/master/framework.cpp
+++ b/src/master/framework.cpp
@@ -25,6 +25,7 @@ using process::Owned;
 using process::http::authentication::Principal;
 
 using mesos::authorization::ActionObject;
+using mesos::scheduler::OfferConstraints;
 
 namespace mesos {
 namespace internal {
@@ -34,10 +35,19 @@ Framework::Framework(
     Master* const master,
     const Flags& masterFlags,
     const FrameworkInfo& info,
+    Option<OfferConstraints>&& offerConstraints,
     const process::UPID& pid,
     const Owned<ObjectApprovers>& approvers,
     const process::Time& time)
-  : Framework(master, masterFlags, info, CONNECTED, true, approvers, time)
+  : Framework(
+        master,
+        masterFlags,
+        info,
+        std::move(offerConstraints),
+        CONNECTED,
+        true,
+        approvers,
+        time)
 {
   pid_ = pid;
 }
@@ -47,10 +57,19 @@ Framework::Framework(
     Master* const master,
     const Flags& masterFlags,
     const FrameworkInfo& info,
+    Option<OfferConstraints>&& offerConstraints,
     const StreamingHttpConnection<v1::scheduler::Event>& http,
     const Owned<ObjectApprovers>& approvers,
     const process::Time& time)
-  : Framework(master, masterFlags, info, CONNECTED, true, approvers, time)
+  : Framework(
+        master,
+        masterFlags,
+        info,
+        std::move(offerConstraints),
+        CONNECTED,
+        true,
+        approvers,
+        time)
 {
   http_ = http;
 }
@@ -61,7 +80,14 @@ Framework::Framework(
     const Flags& masterFlags,
     const FrameworkInfo& info)
   : Framework(
-      master, masterFlags, info, RECOVERED, false, nullptr, process::Time())
+        master,
+        masterFlags,
+        info,
+        None(),
+        RECOVERED,
+        false,
+        nullptr,
+        process::Time())
 {}
 
 
@@ -69,6 +95,7 @@ Framework::Framework(
     Master* const _master,
     const Flags& masterFlags,
     const FrameworkInfo& _info,
+    Option<OfferConstraints>&& offerConstraints,
     State state,
     bool active_,
     const Owned<ObjectApprovers>& approvers,
@@ -84,7 +111,8 @@ Framework::Framework(
     metrics(_info, masterFlags.publish_per_framework_metrics),
     active_(active_),
     state(state),
-    objectApprovers(approvers)
+    objectApprovers(approvers),
+    offerConstraints_(std::move(offerConstraints))
 {
   CHECK(_info.has_id());
 
@@ -508,7 +536,9 @@ void Framework::removeOperation(Operation* operation)
 }
 
 
-void Framework::update(const FrameworkInfo& newInfo)
+void Framework::update(
+    const FrameworkInfo& newInfo,
+    Option<OfferConstraints>&& offerConstraints)
 {
   // We only merge 'info' from the same framework 'id'.
   CHECK_EQ(info.id(), newInfo.id());
@@ -523,6 +553,7 @@ void Framework::update(const FrameworkInfo& newInfo)
   CHECK_EQ(info.checkpoint(), newInfo.checkpoint());
 
   info.CopyFrom(newInfo);
+  offerConstraints_ = std::move(offerConstraints);
 
   // Save the old list of roles for later.
   std::set<std::string> oldRoles = roles;
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 438ef98..54b24b4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -155,6 +155,8 @@ using mesos::master::contender::MasterContender;
 
 using mesos::master::detector::MasterDetector;
 
+using mesos::scheduler::OfferConstraints;
+
 static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo);
 
 
@@ -2669,13 +2671,17 @@ void Master::subscribe(
   Option<Error> validationError =
     validateFramework(frameworkInfo, subscribe.suppressed_roles());
 
+  Option<OfferConstraints> offerConstraints;
+  if (subscribe.has_offer_constraints()) {
+    offerConstraints = std::move(*subscribe.mutable_offer_constraints());
+  }
+
   allocator::FrameworkOptions allocatorOptions;
 
   // TODO(asekretenko): Validate roles in offer constraints (see MESOS-10176).
-  if (validationError.isNone() && subscribe.has_offer_constraints()) {
+  if (validationError.isNone() && offerConstraints.isSome()) {
     Try<OfferConstraintsFilter> filter = OfferConstraintsFilter::create(
-        offerConstraintsFilterOptions,
-        std::move(*subscribe.mutable_offer_constraints()));
+        offerConstraintsFilterOptions, OfferConstraints(*offerConstraints));
 
     if (filter.isError()) {
       validationError = Error(std::move(filter.error()));
@@ -2705,6 +2711,7 @@ void Master::subscribe(
   void (Master::*_subscribe)(
       StreamingHttpConnection<v1::scheduler::Event>,
       FrameworkInfo&&,
+      Option<OfferConstraints>&&,
       bool,
       FrameworkOptions&&,
       const Future<Owned<ObjectApprovers>>&) = &Self::_subscribe;
@@ -2717,6 +2724,7 @@ void Master::subscribe(
       _subscribe,
       http,
       std::move(frameworkInfo),
+      std::move(offerConstraints),
       subscribe.force(),
       std::move(allocatorOptions),
       lambda::_1));
@@ -2726,6 +2734,7 @@ void Master::subscribe(
 void Master::_subscribe(
     StreamingHttpConnection<v1::scheduler::Event> http,
     FrameworkInfo&& frameworkInfo,
+    Option<OfferConstraints>&& offerConstraints,
     bool force,
     ::mesos::allocator::FrameworkOptions&& options,
     const Future<Owned<ObjectApprovers>>& objectApprovers)
@@ -2760,8 +2769,13 @@ void Master::_subscribe(
     FrameworkInfo frameworkInfo_ = frameworkInfo;
     frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
 
-    Framework* framework =
-      new Framework(this, flags, frameworkInfo_, http, objectApprovers.get());
+    Framework* framework = new Framework(
+        this,
+        flags,
+        frameworkInfo_,
+        std::move(offerConstraints),
+        http,
+        objectApprovers.get());
 
     addFramework(framework, std::move(options));
 
@@ -2823,7 +2837,11 @@ void Master::_subscribe(
 
   framework->metrics.incrementCall(scheduler::Call::SUBSCRIBE);
 
-  updateFramework(framework, frameworkInfo, std::move(options));
+  updateFramework(
+      framework,
+      frameworkInfo,
+      std::move(offerConstraints),
+      std::move(options));
 
   if (!framework->recovered()) {
     // The framework has previously been registered with this master;
@@ -2915,13 +2933,17 @@ void Master::subscribe(
     validationError = validateFrameworkAuthentication(frameworkInfo, from);
   }
 
+  Option<OfferConstraints> offerConstraints;
+  if (subscribe.has_offer_constraints()) {
+    offerConstraints = std::move(*subscribe.mutable_offer_constraints());
+  }
+
   allocator::FrameworkOptions allocatorOptions;
 
   // TODO(asekretenko): Validate roles in offer constraints (see MESOS-10176).
-  if (validationError.isNone() && subscribe.has_offer_constraints()) {
+  if (validationError.isNone() && offerConstraints.isSome()) {
     Try<OfferConstraintsFilter> filter = OfferConstraintsFilter::create(
-        offerConstraintsFilterOptions,
-        std::move(*subscribe.mutable_offer_constraints()));
+        offerConstraintsFilterOptions, OfferConstraints(*offerConstraints));
 
     if (filter.isError()) {
       validationError = Error(std::move(filter.error()));
@@ -2965,6 +2987,7 @@ void Master::subscribe(
   void (Master::*_subscribe)(
       const UPID&,
       FrameworkInfo&&,
+      Option<OfferConstraints>&&,
       bool,
       ::mesos::allocator::FrameworkOptions&&,
       const Future<Owned<ObjectApprovers>>&) = &Self::_subscribe;
@@ -2977,6 +3000,7 @@ void Master::subscribe(
       _subscribe,
       from,
       std::move(frameworkInfo),
+      std::move(offerConstraints),
       subscribe.force(),
       std::move(allocatorOptions),
       lambda::_1));
@@ -2986,6 +3010,7 @@ void Master::subscribe(
 void Master::_subscribe(
     const UPID& from,
     FrameworkInfo&& frameworkInfo,
+    Option<OfferConstraints>&& offerConstraints,
     bool force,
     ::mesos::allocator::FrameworkOptions&& options,
     const Future<Owned<ObjectApprovers>>& objectApprovers)
@@ -3050,8 +3075,13 @@ void Master::_subscribe(
     // Assign a new FrameworkID.
     frameworkInfo.mutable_id()->CopyFrom(newFrameworkId());
 
-    Framework* framework =
-      new Framework(this, flags, frameworkInfo, from, objectApprovers.get());
+    Framework* framework = new Framework(
+        this,
+        flags,
+        frameworkInfo,
+        std::move(offerConstraints),
+        from,
+        objectApprovers.get());
 
     addFramework(framework, std::move(options));
 
@@ -3138,7 +3168,11 @@ void Master::_subscribe(
   // It is now safe to update the framework fields since the request is now
   // guaranteed to be successful. We use the fields passed in during
   // re-registration.
-  updateFramework(framework, frameworkInfo, std::move(options));
+  updateFramework(
+      framework,
+      frameworkInfo,
+      std::move(offerConstraints),
+      std::move(options));
 
   if (!framework->recovered()) {
     // The framework has previously been registered with this master;
@@ -3258,20 +3292,22 @@ Future<process::http::Response> Master::updateFramework(
   const bool frameworkInfoChanged =
     !typeutils::equivalent(framework->info, call.framework_info());
 
+  Option<OfferConstraints> offerConstraints;
   allocator::FrameworkOptions allocatorOptions;
   if (call.has_offer_constraints()) {
     // TODO(asekretenko): Validate roles in offer constraints (see MESOS-10176).
     Try<OfferConstraintsFilter> filter = OfferConstraintsFilter::create(
         offerConstraintsFilterOptions,
-        std::move(*call.mutable_offer_constraints()));
+        OfferConstraints(call.offer_constraints()));
 
     if (filter.isError()) {
       return process::http::BadRequest(
           "'UpdateFramework.offer_constraints' are not valid: " +
           filter.error());
-    } else {
-      allocatorOptions.offerConstraintsFilter = std::move(*filter);
     }
+
+    allocatorOptions.offerConstraintsFilter = std::move(*filter);
+    offerConstraints = std::move(*call.mutable_offer_constraints());
   }
 
   ActionObject actionObject =
@@ -3296,7 +3332,10 @@ Future<process::http::Response> Master::updateFramework(
     make_move_iterator(call.mutable_suppressed_roles()->end()));
 
   updateFramework(
-      framework, call.framework_info(), std::move(allocatorOptions));
+      framework,
+      call.framework_info(),
+      std::move(offerConstraints),
+      std::move(allocatorOptions));
 
   if (frameworkInfoChanged) {
     // NOTE: Among the framework properties that can be changed by this call
@@ -7461,6 +7500,7 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 void Master::updateFramework(
     Framework* framework,
     const FrameworkInfo& frameworkInfo,
+    Option<OfferConstraints>&& offerConstraints,
     ::mesos::allocator::FrameworkOptions&& allocatorOptions)
 {
   LOG(INFO) << "Updating framework " << *framework << " with roles "
@@ -7479,7 +7519,7 @@ void Master::updateFramework(
     }
   }
 
-  framework->update(frameworkInfo);
+  framework->update(frameworkInfo, std::move(offerConstraints));
 }
 
 
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8c2f56a..95aca58 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -670,6 +670,7 @@ protected:
   void updateFramework(
       Framework* framework,
       const FrameworkInfo& frameworkInfo,
+      Option<::mesos::scheduler::OfferConstraints>&& offerConstraints,
       ::mesos::allocator::FrameworkOptions&& allocatorOptions);
 
   void sendFrameworkUpdates(const Framework& framework);
@@ -903,6 +904,7 @@ private:
   void _subscribe(
       StreamingHttpConnection<v1::scheduler::Event> http,
       FrameworkInfo&& frameworkInfo,
+      Option<scheduler::OfferConstraints>&& offerConstraints,
       bool force,
       ::mesos::allocator::FrameworkOptions&& allocatorOptions,
       const process::Future<process::Owned<ObjectApprovers>>& objectApprovers);
@@ -914,6 +916,7 @@ private:
   void _subscribe(
       const process::UPID& from,
       FrameworkInfo&& frameworkInfo,
+      Option<scheduler::OfferConstraints>&& offerConstraints,
       bool force,
       ::mesos::allocator::FrameworkOptions&& allocatorOptions,
       const process::Future<process::Owned<ObjectApprovers>>& objectApprovers);
@@ -2413,6 +2416,7 @@ struct Framework
       Master* const master,
       const Flags& masterFlags,
       const FrameworkInfo& info,
+      Option<::mesos::scheduler::OfferConstraints>&& offerConstraints,
       const process::UPID& _pid,
       const process::Owned<ObjectApprovers>& objectApprovers,
       const process::Time& time = process::Clock::now());
@@ -2420,6 +2424,7 @@ struct Framework
   Framework(Master* const master,
             const Flags& masterFlags,
             const FrameworkInfo& info,
+            Option<::mesos::scheduler::OfferConstraints>&& offerConstraints,
             const StreamingHttpConnection<v1::scheduler::Event>& _http,
             const process::Owned<ObjectApprovers>& objectApprovers,
             const process::Time& time = process::Clock::now());
@@ -2486,7 +2491,9 @@ struct Framework
   // Update fields in 'info' using those in 'newInfo'. Currently this
   // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
   // 'webui_url', 'capabilities', and 'labels'.
-  void update(const FrameworkInfo& newInfo);
+  void update(
+      const FrameworkInfo& newInfo,
+      Option<::mesos::scheduler::OfferConstraints>&& offerConstraints);
 
   // Reactivate framework with new connection: update connection-related state
   // and mark the framework as CONNECTED, regardless of the previous state.
@@ -2537,6 +2544,11 @@ struct Framework
   // action on object.
   Try<bool> approved(const authorization::ActionObject& actionObject) const;
 
+  const Option<::mesos::scheduler::OfferConstraints>& offerConstraints() const
+  {
+    return offerConstraints_;
+  }
+
   Master* const master;
 
   FrameworkInfo info;
@@ -2623,6 +2635,7 @@ private:
   Framework(Master* const _master,
             const Flags& masterFlags,
             const FrameworkInfo& _info,
+            Option<::mesos::scheduler::OfferConstraints>&& offerConstraints,
             State state,
             bool active,
             const process::Owned<ObjectApprovers>& objectApprovers,
@@ -2656,6 +2669,9 @@ private:
 
   // ObjectApprovers for the framework's principal.
   process::Owned<ObjectApprovers> objectApprovers;
+
+  // The last offer constraints with which the framework has been subscribed.
+  Option<::mesos::scheduler::OfferConstraints> offerConstraints_;
 };
 
 

Reply via email to