This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit c8f8fbd328235fcd7aa1b32e079fb6f9a424b881 Author: Andrei Sekretenko <asekrete...@apache.org> AuthorDate: Fri Sep 4 20:23:25 2020 +0200 Added `OfferConstraints` to `struct Framework` in the master. This is a prerequisite for exposing framework's offer constraints via master API endpoints. Review: https://reviews.apache.org/r/72839 --- src/master/framework.cpp | 41 +++++++++++++++++++++++---- src/master/master.cpp | 74 +++++++++++++++++++++++++++++++++++++----------- src/master/master.hpp | 18 +++++++++++- 3 files changed, 110 insertions(+), 23 deletions(-) diff --git a/src/master/framework.cpp b/src/master/framework.cpp index e9e2d20..980828e 100644 --- a/src/master/framework.cpp +++ b/src/master/framework.cpp @@ -25,6 +25,7 @@ using process::Owned; using process::http::authentication::Principal; using mesos::authorization::ActionObject; +using mesos::scheduler::OfferConstraints; namespace mesos { namespace internal { @@ -34,10 +35,19 @@ Framework::Framework( Master* const master, const Flags& masterFlags, const FrameworkInfo& info, + Option<OfferConstraints>&& offerConstraints, const process::UPID& pid, const Owned<ObjectApprovers>& approvers, const process::Time& time) - : Framework(master, masterFlags, info, CONNECTED, true, approvers, time) + : Framework( + master, + masterFlags, + info, + std::move(offerConstraints), + CONNECTED, + true, + approvers, + time) { pid_ = pid; } @@ -47,10 +57,19 @@ Framework::Framework( Master* const master, const Flags& masterFlags, const FrameworkInfo& info, + Option<OfferConstraints>&& offerConstraints, const StreamingHttpConnection<v1::scheduler::Event>& http, const Owned<ObjectApprovers>& approvers, const process::Time& time) - : Framework(master, masterFlags, info, CONNECTED, true, approvers, time) + : Framework( + master, + masterFlags, + info, + std::move(offerConstraints), + CONNECTED, + true, + approvers, + time) { http_ = http; } @@ -61,7 +80,14 @@ Framework::Framework( const Flags& masterFlags, const FrameworkInfo& info) : Framework( - master, masterFlags, info, RECOVERED, false, nullptr, process::Time()) + master, + masterFlags, + info, + None(), + RECOVERED, + false, + nullptr, + process::Time()) {} @@ -69,6 +95,7 @@ Framework::Framework( Master* const _master, const Flags& masterFlags, const FrameworkInfo& _info, + Option<OfferConstraints>&& offerConstraints, State state, bool active_, const Owned<ObjectApprovers>& approvers, @@ -84,7 +111,8 @@ Framework::Framework( metrics(_info, masterFlags.publish_per_framework_metrics), active_(active_), state(state), - objectApprovers(approvers) + objectApprovers(approvers), + offerConstraints_(std::move(offerConstraints)) { CHECK(_info.has_id()); @@ -508,7 +536,9 @@ void Framework::removeOperation(Operation* operation) } -void Framework::update(const FrameworkInfo& newInfo) +void Framework::update( + const FrameworkInfo& newInfo, + Option<OfferConstraints>&& offerConstraints) { // We only merge 'info' from the same framework 'id'. CHECK_EQ(info.id(), newInfo.id()); @@ -523,6 +553,7 @@ void Framework::update(const FrameworkInfo& newInfo) CHECK_EQ(info.checkpoint(), newInfo.checkpoint()); info.CopyFrom(newInfo); + offerConstraints_ = std::move(offerConstraints); // Save the old list of roles for later. std::set<std::string> oldRoles = roles; diff --git a/src/master/master.cpp b/src/master/master.cpp index 438ef98..54b24b4 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -155,6 +155,8 @@ using mesos::master::contender::MasterContender; using mesos::master::detector::MasterDetector; +using mesos::scheduler::OfferConstraints; + static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo); @@ -2669,13 +2671,17 @@ void Master::subscribe( Option<Error> validationError = validateFramework(frameworkInfo, subscribe.suppressed_roles()); + Option<OfferConstraints> offerConstraints; + if (subscribe.has_offer_constraints()) { + offerConstraints = std::move(*subscribe.mutable_offer_constraints()); + } + allocator::FrameworkOptions allocatorOptions; // TODO(asekretenko): Validate roles in offer constraints (see MESOS-10176). - if (validationError.isNone() && subscribe.has_offer_constraints()) { + if (validationError.isNone() && offerConstraints.isSome()) { Try<OfferConstraintsFilter> filter = OfferConstraintsFilter::create( - offerConstraintsFilterOptions, - std::move(*subscribe.mutable_offer_constraints())); + offerConstraintsFilterOptions, OfferConstraints(*offerConstraints)); if (filter.isError()) { validationError = Error(std::move(filter.error())); @@ -2705,6 +2711,7 @@ void Master::subscribe( void (Master::*_subscribe)( StreamingHttpConnection<v1::scheduler::Event>, FrameworkInfo&&, + Option<OfferConstraints>&&, bool, FrameworkOptions&&, const Future<Owned<ObjectApprovers>>&) = &Self::_subscribe; @@ -2717,6 +2724,7 @@ void Master::subscribe( _subscribe, http, std::move(frameworkInfo), + std::move(offerConstraints), subscribe.force(), std::move(allocatorOptions), lambda::_1)); @@ -2726,6 +2734,7 @@ void Master::subscribe( void Master::_subscribe( StreamingHttpConnection<v1::scheduler::Event> http, FrameworkInfo&& frameworkInfo, + Option<OfferConstraints>&& offerConstraints, bool force, ::mesos::allocator::FrameworkOptions&& options, const Future<Owned<ObjectApprovers>>& objectApprovers) @@ -2760,8 +2769,13 @@ void Master::_subscribe( FrameworkInfo frameworkInfo_ = frameworkInfo; frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); - Framework* framework = - new Framework(this, flags, frameworkInfo_, http, objectApprovers.get()); + Framework* framework = new Framework( + this, + flags, + frameworkInfo_, + std::move(offerConstraints), + http, + objectApprovers.get()); addFramework(framework, std::move(options)); @@ -2823,7 +2837,11 @@ void Master::_subscribe( framework->metrics.incrementCall(scheduler::Call::SUBSCRIBE); - updateFramework(framework, frameworkInfo, std::move(options)); + updateFramework( + framework, + frameworkInfo, + std::move(offerConstraints), + std::move(options)); if (!framework->recovered()) { // The framework has previously been registered with this master; @@ -2915,13 +2933,17 @@ void Master::subscribe( validationError = validateFrameworkAuthentication(frameworkInfo, from); } + Option<OfferConstraints> offerConstraints; + if (subscribe.has_offer_constraints()) { + offerConstraints = std::move(*subscribe.mutable_offer_constraints()); + } + allocator::FrameworkOptions allocatorOptions; // TODO(asekretenko): Validate roles in offer constraints (see MESOS-10176). - if (validationError.isNone() && subscribe.has_offer_constraints()) { + if (validationError.isNone() && offerConstraints.isSome()) { Try<OfferConstraintsFilter> filter = OfferConstraintsFilter::create( - offerConstraintsFilterOptions, - std::move(*subscribe.mutable_offer_constraints())); + offerConstraintsFilterOptions, OfferConstraints(*offerConstraints)); if (filter.isError()) { validationError = Error(std::move(filter.error())); @@ -2965,6 +2987,7 @@ void Master::subscribe( void (Master::*_subscribe)( const UPID&, FrameworkInfo&&, + Option<OfferConstraints>&&, bool, ::mesos::allocator::FrameworkOptions&&, const Future<Owned<ObjectApprovers>>&) = &Self::_subscribe; @@ -2977,6 +3000,7 @@ void Master::subscribe( _subscribe, from, std::move(frameworkInfo), + std::move(offerConstraints), subscribe.force(), std::move(allocatorOptions), lambda::_1)); @@ -2986,6 +3010,7 @@ void Master::subscribe( void Master::_subscribe( const UPID& from, FrameworkInfo&& frameworkInfo, + Option<OfferConstraints>&& offerConstraints, bool force, ::mesos::allocator::FrameworkOptions&& options, const Future<Owned<ObjectApprovers>>& objectApprovers) @@ -3050,8 +3075,13 @@ void Master::_subscribe( // Assign a new FrameworkID. frameworkInfo.mutable_id()->CopyFrom(newFrameworkId()); - Framework* framework = - new Framework(this, flags, frameworkInfo, from, objectApprovers.get()); + Framework* framework = new Framework( + this, + flags, + frameworkInfo, + std::move(offerConstraints), + from, + objectApprovers.get()); addFramework(framework, std::move(options)); @@ -3138,7 +3168,11 @@ void Master::_subscribe( // It is now safe to update the framework fields since the request is now // guaranteed to be successful. We use the fields passed in during // re-registration. - updateFramework(framework, frameworkInfo, std::move(options)); + updateFramework( + framework, + frameworkInfo, + std::move(offerConstraints), + std::move(options)); if (!framework->recovered()) { // The framework has previously been registered with this master; @@ -3258,20 +3292,22 @@ Future<process::http::Response> Master::updateFramework( const bool frameworkInfoChanged = !typeutils::equivalent(framework->info, call.framework_info()); + Option<OfferConstraints> offerConstraints; allocator::FrameworkOptions allocatorOptions; if (call.has_offer_constraints()) { // TODO(asekretenko): Validate roles in offer constraints (see MESOS-10176). Try<OfferConstraintsFilter> filter = OfferConstraintsFilter::create( offerConstraintsFilterOptions, - std::move(*call.mutable_offer_constraints())); + OfferConstraints(call.offer_constraints())); if (filter.isError()) { return process::http::BadRequest( "'UpdateFramework.offer_constraints' are not valid: " + filter.error()); - } else { - allocatorOptions.offerConstraintsFilter = std::move(*filter); } + + allocatorOptions.offerConstraintsFilter = std::move(*filter); + offerConstraints = std::move(*call.mutable_offer_constraints()); } ActionObject actionObject = @@ -3296,7 +3332,10 @@ Future<process::http::Response> Master::updateFramework( make_move_iterator(call.mutable_suppressed_roles()->end())); updateFramework( - framework, call.framework_info(), std::move(allocatorOptions)); + framework, + call.framework_info(), + std::move(offerConstraints), + std::move(allocatorOptions)); if (frameworkInfoChanged) { // NOTE: Among the framework properties that can be changed by this call @@ -7461,6 +7500,7 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId) void Master::updateFramework( Framework* framework, const FrameworkInfo& frameworkInfo, + Option<OfferConstraints>&& offerConstraints, ::mesos::allocator::FrameworkOptions&& allocatorOptions) { LOG(INFO) << "Updating framework " << *framework << " with roles " @@ -7479,7 +7519,7 @@ void Master::updateFramework( } } - framework->update(frameworkInfo); + framework->update(frameworkInfo, std::move(offerConstraints)); } diff --git a/src/master/master.hpp b/src/master/master.hpp index 8c2f56a..95aca58 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -670,6 +670,7 @@ protected: void updateFramework( Framework* framework, const FrameworkInfo& frameworkInfo, + Option<::mesos::scheduler::OfferConstraints>&& offerConstraints, ::mesos::allocator::FrameworkOptions&& allocatorOptions); void sendFrameworkUpdates(const Framework& framework); @@ -903,6 +904,7 @@ private: void _subscribe( StreamingHttpConnection<v1::scheduler::Event> http, FrameworkInfo&& frameworkInfo, + Option<scheduler::OfferConstraints>&& offerConstraints, bool force, ::mesos::allocator::FrameworkOptions&& allocatorOptions, const process::Future<process::Owned<ObjectApprovers>>& objectApprovers); @@ -914,6 +916,7 @@ private: void _subscribe( const process::UPID& from, FrameworkInfo&& frameworkInfo, + Option<scheduler::OfferConstraints>&& offerConstraints, bool force, ::mesos::allocator::FrameworkOptions&& allocatorOptions, const process::Future<process::Owned<ObjectApprovers>>& objectApprovers); @@ -2413,6 +2416,7 @@ struct Framework Master* const master, const Flags& masterFlags, const FrameworkInfo& info, + Option<::mesos::scheduler::OfferConstraints>&& offerConstraints, const process::UPID& _pid, const process::Owned<ObjectApprovers>& objectApprovers, const process::Time& time = process::Clock::now()); @@ -2420,6 +2424,7 @@ struct Framework Framework(Master* const master, const Flags& masterFlags, const FrameworkInfo& info, + Option<::mesos::scheduler::OfferConstraints>&& offerConstraints, const StreamingHttpConnection<v1::scheduler::Event>& _http, const process::Owned<ObjectApprovers>& objectApprovers, const process::Time& time = process::Clock::now()); @@ -2486,7 +2491,9 @@ struct Framework // Update fields in 'info' using those in 'newInfo'. Currently this // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname', // 'webui_url', 'capabilities', and 'labels'. - void update(const FrameworkInfo& newInfo); + void update( + const FrameworkInfo& newInfo, + Option<::mesos::scheduler::OfferConstraints>&& offerConstraints); // Reactivate framework with new connection: update connection-related state // and mark the framework as CONNECTED, regardless of the previous state. @@ -2537,6 +2544,11 @@ struct Framework // action on object. Try<bool> approved(const authorization::ActionObject& actionObject) const; + const Option<::mesos::scheduler::OfferConstraints>& offerConstraints() const + { + return offerConstraints_; + } + Master* const master; FrameworkInfo info; @@ -2623,6 +2635,7 @@ private: Framework(Master* const _master, const Flags& masterFlags, const FrameworkInfo& _info, + Option<::mesos::scheduler::OfferConstraints>&& offerConstraints, State state, bool active, const process::Owned<ObjectApprovers>& objectApprovers, @@ -2656,6 +2669,9 @@ private: // ObjectApprovers for the framework's principal. process::Owned<ObjectApprovers> objectApprovers; + + // The last offer constraints with which the framework has been subscribed. + Option<::mesos::scheduler::OfferConstraints> offerConstraints_; };