[mesos] 04/04: Replaced removeOffer + recoverResources pairs with specialized helpers.
This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git commit 927b012e96abeebbb02c293698be1ef43867e15f Author: Andrei Sekretenko AuthorDate: Fri Sep 6 14:15:54 2019 -0700 Replaced removeOffer + recoverResources pairs with specialized helpers. This patch adds helper methods `Master::rescindOffer()` / `Master::discardOffer()` that recover offer's resources in the allocator and remove the offer, and replaces paired calls of `removeOffer()` + `allocator->recoverResources()` with these helpers. Review: https://reviews.apache.org/r/71436/ --- src/master/http.cpp| 8 +- src/master/master.cpp | 242 - src/master/master.hpp | 19 +++- src/master/quota_handler.cpp | 20 +--- src/master/weights_handler.cpp | 8 +- 5 files changed, 115 insertions(+), 182 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index 0987d93..60765c9 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -4336,13 +4336,7 @@ Future Master::Http::_operation( // NOTE: However it's entirely possible that these resources are // offered to other frameworks in the next 'allocate' and the filter // cannot prevent it. -master->allocator->recoverResources( -offer->framework_id(), -offer->slave_id(), -offer->resources(), -Filters()); - -master->removeOffer(offer, true); // Rescind! +master->rescindOffer(offer, Filters()); // If we've rescinded enough offers to cover 'operation', we're done. Try updatedRecovered = totalRecovered.apply(operation); diff --git a/src/master/master.cpp b/src/master/master.cpp index 60eb3aa..a2c289a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1164,9 +1164,8 @@ void Master::finalize() } } -// Remove offers. foreach (Offer* offer, utils::copy(slave->offers)) { - removeOffer(offer); + discardOffer(offer); } // Remove inverse offers. 
@@ -3127,17 +3126,12 @@ void Master::_subscribe( LOG(INFO) << "Allowing framework " << *framework << " to subscribe with an already used id"; - // Remove any offers sent to this framework. + // Rescind any offers sent to this framework. // NOTE: We need to do this because the scheduler might have // replied to the offers but the driver might have dropped // those messages since it wasn't connected to the master. foreach (Offer* offer, utils::copy(framework->offers)) { -allocator->recoverResources( -offer->framework_id(), -offer->slave_id(), -offer->resources(), -None()); -removeOffer(offer, true); // Rescind. +rescindOffer(offer); } // Also remove inverse offers. @@ -3368,13 +3362,11 @@ void Master::deactivate(Framework* framework, bool rescind) // Remove the framework's offers. foreach (Offer* offer, utils::copy(framework->offers)) { -allocator->recoverResources( -offer->framework_id(), -offer->slave_id(), -offer->resources(), -None()); - -removeOffer(offer, rescind); +if (rescind) { + rescindOffer(offer); +} else { + discardOffer(offer); +} } // Remove the framework's inverse offers. @@ -3421,15 +3413,8 @@ void Master::deactivate(Slave* slave) allocator->deactivateSlave(slave->id); - // Remove and rescind offers. foreach (Offer* offer, utils::copy(slave->offers)) { -allocator->recoverResources( -offer->framework_id(), -slave->id, -offer->resources(), -None()); - -removeOffer(offer, true); // Rescind! +rescindOffer(offer); } // Remove and rescind inverse offers. @@ -4350,24 +4335,17 @@ void Master::accept( // 'drop' overload can handle both resource recovery and lost task // notifications. -// Remove existing offers and recover their resources. +// Discard existing offers. foreach (const OfferID& offerId, accept.offer_ids()) { Offer* offer = getOffer(offerId); - if (offer == nullptr) { + if (offer != nullptr) { +discardOffer(offer); + } else { // If the offer was not in our offer set, then this offer is no // longer valid. 
LOG(WARNING) << "Ignoring accept of offer " << offerId << " since it is no longer valid"; -continue; } - - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - - removeOffer(offer); } LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids() @@ -4913,21 +4891,14 @@ void Master::_accept( scheduler::Call::Accept&& accept, const Future>>& _authorizations)
[mesos] 01/04: Separated handling offer validation failure from handling success.
This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git commit a8050cafaa5465bd74a2ced1c37bb6b64c735445 Author: Andrei Sekretenko AuthorDate: Fri Sep 6 14:15:28 2019 -0700 Separated handling offer validation failure from handling success. This patch refactors the loop through offer IDs in `Master::accept()` into two simpler loops: one loop for the offer validation failure case, another for the case of validation success, thus bringing removal of offers and recovering their resources close together. This is a prerequisite for implementing `rescindOffer()`/ `declineOffer()` in the dependent patch. Review: https://reviews.apache.org/r/71433/ --- src/master/master.cpp | 111 +- 1 file changed, 64 insertions(+), 47 deletions(-) diff --git a/src/master/master.cpp b/src/master/master.cpp index f00906e..89435c4 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4335,74 +4335,49 @@ void Master::accept( // TODO(jieyu): Add metrics for non launch operations. } - // TODO(bmahler): We currently only support using multiple offers - // for a single slave. - Resources offeredResources; - Option slaveId = None(); Option error = None(); - Option allocationInfo = None(); if (accept.offer_ids().size() == 0) { error = Error("No offers specified"); } else { // Validate the offers. error = validation::offer::validate(accept.offer_ids(), this, framework); + } -size_t offersAccepted = 0; + if (error.isSome()) { +// TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to +// consistently handle message dropping. It would be ideal if the +// 'drop' overload can handle both resource recovery and lost task +// notifications. -// Compute offered resources and remove the offers. If the -// validation failed, return resources to the allocator. +// Remove existing offers and recover their resources. 
foreach (const OfferID& offerId, accept.offer_ids()) { Offer* offer = getOffer(offerId); - if (offer != nullptr) { -// Don't bother adding resources to `offeredResources` in case -// validation failed; just recover them. -if (error.isSome()) { - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); -} else { - slaveId = offer->slave_id(); - allocationInfo = offer->allocation_info(); - offeredResources += offer->resources(); - - offersAccepted++; -} - -removeOffer(offer); + if (offer == nullptr) { +// If the offer was not in our offer set, then this offer is no +// longer valid. +LOG(WARNING) << "Ignoring accept of offer " << offerId + << " since it is no longer valid"; continue; } - // If the offer was not in our offer set, then this offer is no - // longer valid. - LOG(WARNING) << "Ignoring accept of offer " << offerId - << " since it is no longer valid"; -} + allocator->recoverResources( + offer->framework_id(), + offer->slave_id(), + offer->resources(), + None()); -framework->metrics.offers_accepted += offersAccepted; - } + removeOffer(offer); +} - // If invalid, send TASK_DROPPED for the launch attempts. If the - // framework is not partition-aware, send TASK_LOST instead. If - // other operations have their `id` field set, then send - // OPERATION_ERROR updates for them. - // - // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to - // consistently handle message dropping. It would be ideal if the - // 'drop' overload can handle both resource recovery and lost task - // notifications. - if (error.isSome()) { LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids() << "': " << error->message; -TaskState newTaskState = TASK_DROPPED; -if (!framework->capabilities.partitionAware) { - newTaskState = TASK_LOST; -} +const TaskState newTaskState = + framework->capabilities.partitionAware ? 
TASK_DROPPED : TASK_LOST; foreach (const Offer::Operation& operation, accept.operations()) { + // Send OPERATION_ERROR for non-LAUNCH operations if (operation.type() != Offer::Operation::LAUNCH && operation.type() != Offer::Operation::LAUNCH_GROUP) { drop(framework, @@ -4411,6 +4386,7 @@ void Master::accept( continue; } + // Send task status updates for launch attempts. const RepeatedPtrField& tasks = [&]() { if (operation.type() == Offer::Operation::LAUNCH) { return operation.launch().task_infos();
[mesos] 02/04: Moved `removeOffers()` from `Master::accept()` into `Master::_accept()`.
This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git commit 7eb21c41ed255184988298e29644bf7f310c3374 Author: Andrei Sekretenko AuthorDate: Fri Sep 6 14:15:38 2019 -0700 Moved `removeOffers()` from `Master::accept()` into `Master::_accept()`. This patch moves offer removal on accept into the deferred continuation that follows authorization (if offers pass validation in `accept()`). Incrementing the `offers_accepted` metric is also moved to `_accept()`. This is a prerequisite for implementing `rescindOffer()` / `declineOffer()` in the dependent patch. Review: https://reviews.apache.org/r/71434/ --- src/master/master.cpp | 81 +-- src/master/master.hpp | 1 - 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/src/master/master.cpp b/src/master/master.cpp index 89435c4..60eb3aa 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4427,48 +4427,32 @@ void Master::accept( // From now on, we are handling the valid offers case. + // Get slave id and allocation info from some existing offer + // (as they are valid, they must have the same slave id and allocation info). + const Offer* existingOffer = ([this](const RepeatedPtrField<OfferID>& ids) { +for (const OfferID& id : ids) { + const Offer* offer = getOffer(id); + if (offer != nullptr) { +return offer; + } +} + +LOG(FATAL) << "No validated offer_ids correspond to existing offers"; + })(accept.offer_ids()); + // TODO(bmahler): We currently only support using multiple offers // for a single slave. - Option<SlaveID> slaveId = None(); + SlaveID slaveId = existingOffer->slave_id(); - // TODO(asekretenko): The code below is copying AllocationInfo (and + // TODO(asekretenko): The code below is copying AllocationInfo (and later // injecting it into operations) as a whole, but only the 'role' field is // subject to offer validation. As for now, this works fine, because // AllocationInfo has no other fields. 
However, this is fragile and can // silently break if more fields are added to AllocationInfo. - Option allocationInfo = None(); - Resources offeredResources; - - size_t offersAccepted = 0; - - // Compute offered resources and remove the offers. - foreach (const OfferID& offerId, accept.offer_ids()) { -Offer* offer = getOffer(offerId); -if (offer == nullptr) { - LOG(WARNING) << "Ignoring accept of offer " << offerId - << " since it is no longer valid"; - continue; -} - -if (slaveId.isNone()) { - slaveId = offer->slave_id(); -} - -if (allocationInfo.isNone()) { - allocationInfo = offer->allocation_info(); -} - -offeredResources += offer->resources(); -offersAccepted++; - -removeOffer(offer); - } - - framework->metrics.offers_accepted += offersAccepted; + Resource::AllocationInfo allocationInfo = existingOffer->allocation_info(); - CHECK_SOME(slaveId); - Slave* slave = slaves.registered.get(slaveId.get()); - CHECK(slave != nullptr) << slaveId.get(); + Slave* slave = slaves.registered.get(slaveId); + CHECK(slave != nullptr) << slaveId; // Validate and upgrade all of the resources in `accept.operations`: // @@ -4625,7 +4609,7 @@ void Master::accept( drop(framework, operation, "Operation requested feedback, but agent " + - stringify(slaveId.get()) + + stringify(slaveId) + " does not have the required RESOURCE_PROVIDER capability"); break; } @@ -4636,7 +4620,7 @@ void Master::accept( drop(framework, operation, "Operation on agent default resources requested feedback," - " but agent " + stringify(slaveId.get()) + + " but agent " + stringify(slaveId) + " does not have the required AGENT_OPERATION_FEEDBACK and" " RESOURCE_PROVIDER capabilities"); break; @@ -4666,8 +4650,7 @@ void Master::accept( // within an offer now contain an `AllocationInfo`. We therefore // inject the offer's allocation info into the operation's // resources if the scheduler has not done so already. 
-CHECK_SOME(allocationInfo); -protobuf::injectAllocationInfo(&operation, allocationInfo.get()); +protobuf::injectAllocationInfo(&operation, allocationInfo); switch (operation.type()) { case Offer::Operation::RESERVE: @@ -4918,8 +4901,7 @@ void Master::accept( .onAny(defer(self(), &Master::_accept, framework->id(), - slaveId.get(), - offeredResources, + slaveId, std::move(accept), lambda::_1)); } @@
[mesos] branch master updated (6c2a94c -> 927b012)
This is an automated email from the ASF dual-hosted git repository. mzhu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git. from 6c2a94c Added `SlaveOptions` for wrapping all parameters of `StartSlave`. new a8050ca Separated handling offer validation failure from handling success. new 7eb21c4 Moved `removeOffers()` from `Master::accept()` into `Master::_accept()`. new c20469e Moved setting expectation for recoverResources() to a proper place. new 927b012 Replaced removeOffer + recoverResources pairs with specialized helpers. The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: src/master/http.cpp | 8 +- src/master/master.cpp| 320 ++- src/master/master.hpp| 20 +- src/master/quota_handler.cpp | 20 +- src/master/weights_handler.cpp | 8 +- src/tests/master_authorization_tests.cpp | 30 +-- 6 files changed, 177 insertions(+), 229 deletions(-)
[mesos] 03/04: Moved setting expectation for recoverResources() to a proper place.
This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git commit c20469edb25348e70bf1647a57694e6dff8041c4 Author: Andrei Sekretenko AuthorDate: Fri Sep 6 14:15:48 2019 -0700 Moved setting expectation for recoverResources() to a proper place. Currently, when accepting offers for a slave already removed from the master, Master::_accept() dispatches `recoverResources()` unconditionally (with empty resources if, due to framework/slave removal, the offers were already rescinded and their resources recovered). The dependent patch removes this redundant recovery of empty resources, thus exposing a race in some of `MasterAuthorizationTest.*` tests between `FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources)` and the dispatch which performs the actual resource recovery. This patch fixes these tests to wait for the first dispatch of `recoverResources()`. Review: https://reviews.apache.org/r/71435/ --- src/tests/master_authorization_tests.cpp | 30 +++--- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index 2932564..06471aa 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -433,6 +433,9 @@ TEST_F(MasterAuthorizationTest, KillTask) EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status)); + Future<Nothing> recoverResources = +FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); + // Now kill the task. driver.killTask(task.task_id()); @@ -441,9 +444,6 @@ TEST_F(MasterAuthorizationTest, KillTask) EXPECT_EQ(TASK_KILLED, status->state()); EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, status->reason()); - Future<Nothing> recoverResources = -FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - // Now complete authorization. 
promise.set(true); @@ -556,6 +556,9 @@ TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup) AWAIT_READY(authorize1); AWAIT_READY(authorize2); + Future<Nothing> recoverResources = +FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); + // Now kill task1. driver.killTask(task1.task_id()); @@ -566,9 +569,6 @@ TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup) EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, task1Status->reason()); - Future<Nothing> recoverResources = -FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - // Now complete authorizations for task1 and task2. promise1.set(true); promise2.set(true); @@ -651,6 +651,9 @@ TEST_F(MasterAuthorizationTest, SlaveRemovedLost) EXPECT_CALL(sched, slaveLost(&driver, _)) .WillOnce(FutureSatisfy(&slaveLost)); + Future<Nothing> recoverResources = +FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); + // Stop the slave with explicit shutdown as otherwise with // checkpointing the master will wait for the slave to reconnect. slave.get()->shutdown(); @@ -662,9 +665,6 @@ TEST_F(MasterAuthorizationTest, SlaveRemovedLost) EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status)); - Future<Nothing> recoverResources = -FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - // Now complete authorization. promise.set(true); @@ -756,6 +756,9 @@ TEST_F(MasterAuthorizationTest, SlaveRemovedDropped) EXPECT_CALL(sched, slaveLost(&driver, _)) .WillOnce(FutureSatisfy(&slaveLost)); + Future<Nothing> recoverResources = +FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); + // Stop the slave with explicit shutdown as otherwise with // checkpointing the master will wait for the slave to reconnect. slave.get()->shutdown(); @@ -767,9 +770,6 @@ TEST_F(MasterAuthorizationTest, SlaveRemovedDropped) EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status)); - Future<Nothing> recoverResources = -FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - // Now complete authorization. 
promise.set(true); @@ -856,15 +856,15 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved) Future<Nothing> removeFramework = FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework); + Future<Nothing> recoverResources = +FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); + // Now stop the framework. driver.stop(); driver.join(); AWAIT_READY(removeFramework); - Future<Nothing> recoverResources = -FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - // Now complete authorization. promise.set(true);
[mesos] branch master updated: Added `SlaveOptions` for wrapping all parameters of `StartSlave`.
This is an automated email from the ASF dual-hosted git repository. abudnik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git The following commit(s) were added to refs/heads/master by this push: new 6c2a94c Added `SlaveOptions` for wrapping all parameters of `StartSlave`. 6c2a94c is described below commit 6c2a94ca0eca90e6d3517e4400f4529ddce0b84c Author: Andrei Budnik AuthorDate: Mon Sep 2 17:15:52 2019 +0200 Added `SlaveOptions` for wrapping all parameters of `StartSlave`. This patch introduces a `SlaveOptions` struct which holds optional parameters accepted by `cluster::Slave::create`. Added an overload of `StartSlave` that accepts `SlaveOptions`. It's a preferred way of creating and starting an instance of `cluster::Slave` in tests, since underlying `cluster::Slave::create` accepts a long list of optional arguments, which might be extended in the future. Review: https://reviews.apache.org/r/71424 --- src/tests/mesos.cpp | 281 +--- src/tests/mesos.hpp | 104 +++ 2 files changed, 153 insertions(+), 232 deletions(-) diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index 0396ce7..e77db22 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -334,25 +334,22 @@ Try> MesosTest::StartMaster( } -Try> MesosTest::StartSlave( -MasterDetector* detector, -const Option& flags, -bool mock) +Try> MesosTest::StartSlave(const SlaveOptions& options) { Try> slave = cluster::Slave::create( - detector, - flags.isNone() ? CreateSlaveFlags() : flags.get(), - None(), - None(), - None(), - None(), - None(), - None(), - None(), - None(), - mock); - - if (slave.isSome() && !mock) { + options.detector, + options.flags.isNone() ? 
CreateSlaveFlags() : options.flags.get(), + options.id, + options.containerizer, + options.gc, + options.taskStatusUpdateManager, + options.resourceEstimator, + options.qosController, + options.secretGenerator, + options.authorizer, + options.mock); + + if (slave.isSome() && !options.mock) { slave.get()->start(); } @@ -362,28 +359,23 @@ Try> MesosTest::StartSlave( Try> MesosTest::StartSlave( MasterDetector* detector, -slave::Containerizer* containerizer, const Option& flags, bool mock) { - Try> slave = cluster::Slave::create( - detector, - flags.isNone() ? CreateSlaveFlags() : flags.get(), - None(), - containerizer, - None(), - None(), - None(), - None(), - None(), - None(), - mock); + return StartSlave(SlaveOptions(detector, mock) +.withFlags(flags)); +} - if (slave.isSome() && !mock) { -slave.get()->start(); - } - return slave; +Try> MesosTest::StartSlave( +MasterDetector* detector, +slave::Containerizer* containerizer, +const Option& flags, +bool mock) +{ + return StartSlave(SlaveOptions(detector, mock) +.withFlags(flags) +.withContainerizer(containerizer)); } @@ -393,24 +385,9 @@ Try> MesosTest::StartSlave( const Option& flags, bool mock) { - Try> slave = cluster::Slave::create( - detector, - flags.isNone() ? CreateSlaveFlags() : flags.get(), - id, - None(), - None(), - None(), - None(), - None(), - None(), - None(), - mock); - - if (slave.isSome() && !mock) { -slave.get()->start(); - } - - return slave; + return StartSlave(SlaveOptions(detector, mock) +.withFlags(flags) +.withId(id)); } @@ -420,17 +397,10 @@ Try> MesosTest::StartSlave( const string& id, const Option& flags) { - Try> slave = cluster::Slave::create( - detector, - flags.isNone() ? 
CreateSlaveFlags() : flags.get(), - id, - containerizer); - - if (slave.isSome()) { -slave.get()->start(); - } - - return slave; + return StartSlave(SlaveOptions(detector) +.withFlags(flags) +.withId(id) +.withContainerizer(containerizer)); } @@ -440,24 +410,9 @@ Try> MesosTest::StartSlave( const Option& flags, bool mock) { - Try> slave = cluster::Slave::create( - detector, - flags.isNone() ? CreateSlaveFlags() : flags.get(), - None(), - None(), - gc, - None(), - None(), - None(), - None(), - None(), - mock); - - if (slave.isSome() && !mock) { -slave.get()->start(); - } - - return slave; + return StartSlave(SlaveOptions(detector, mock) +.withFlags(flags) +.withGc(gc)); } @@ -466,20 +421,9 @@ Try> MesosTest::StartSlave( mesos::slave::ResourceEstimator* resourceEstimator, const Option& flags) { - Try> slave = cluster::Slave::create( - detector, - flags.isNone() ? CreateSlaveFlags() : flags.get(), - None(), - None(), - None(), - None(), -