This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch 1.9.x in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/1.9.x by this push: new b3b6dbb Added test for reactivation of a disconnected drained agent. b3b6dbb is described below commit b3b6dbb27a93a9ace4e4d2d1e83b16ea92f1a8e1 Author: Andrei Sekretenko <asekrete...@apache.org> AuthorDate: Tue Apr 14 18:55:59 2020 +0200 Added test for reactivation of a disconnected drained agent. Review: https://reviews.apache.org/r/72364 --- src/tests/master_draining_tests.cpp | 143 ++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp index f1a00df..50a0f20 100644 --- a/src/tests/master_draining_tests.cpp +++ b/src/tests/master_draining_tests.cpp @@ -63,8 +63,10 @@ using process::Owned; using testing::_; using testing::AllOf; using testing::DoAll; +using testing::Not; using testing::Return; using testing::Sequence; +using testing::Truly; using testing::WithParamInterface; namespace mesos { @@ -224,6 +226,147 @@ TEST_P(MasterAlreadyDrainedTest, DrainAgent) } +// This is a regression test for MESOS-10116. +// It verifies that reactivating an agent while it is not connected +// does not trigger generating offers for this agent, but instead makes +// the agent offered when it re-registers. +// +// Also, the test ensures that accepting the first offer for a reconnected agent +// does not crash the master and that this offer can be successfully used +// to launch a task. The latter serves as a regression test for MESOS-10118. 
+TEST_P(MasterAlreadyDrainedTest, ReactivateDisconnectedAgent)
+{
+  const ContentType contentType = GetParam();
+
+  const auto IsMarkAgentDrained =
+    [](const process::Owned<master::RegistryOperation>& operation) {
+      return dynamic_cast<master::MarkAgentDrained*>(operation.get()) !=
+        nullptr;
+    };
+
+  Future<Nothing> registrarApplyDrained;
+  EXPECT_CALL(*master->registrar, apply(Truly(IsMarkAgentDrained)))
+    .WillOnce(DoAll(
+        FutureSatisfy(&registrarApplyDrained),
+        Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply)));
+
+  EXPECT_CALL(*master->registrar, apply(Not(Truly(IsMarkAgentDrained))))
+    .WillRepeatedly(
+        Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply));
+
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.mutable_max_grace_period()->set_seconds(10);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  AWAIT_READY(registrarApplyDrained);
+
+  // The agent should apply draining as well.
+  Clock::settle();
+
+  // Simulate agent crash.
+  slave->terminate();
+  slave.reset();
+
+  Clock::settle();
+
+  // Set up the scheduler.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Expect no offers while the agent is disconnected, even after reactivation.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .Times(testing::AtMost(0));
+
+  // Subscribe the scheduler.
+  v1::scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler);
+
+  AWAIT_READY(subscribed);
+  const v1::FrameworkID frameworkId = subscribed->framework_id();
+
+  // Later, after agent reconnection, the scheduler will launch a task.
+  // It should expect and acknowledge all task status updates.
+  // Note that we will be specifically waiting for TASK_RUNNING update.
+  const auto sendAcknowledge =
+    v1::scheduler::SendAcknowledge(frameworkId, agentId);
+
+  EXPECT_CALL(
+      *scheduler, update(_, Not(TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .WillRepeatedly(sendAcknowledge);
+
+  Future<v1::scheduler::Event::Update> taskRunning;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING)))
+    .WillOnce(DoAll(FutureArg<1>(&taskRunning), sendAcknowledge));
+
+  // Reactivate the agent.
+  {
+    v1::master::Call::ReactivateAgent reactivateAgent;
+    reactivateAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::REACTIVATE_AGENT);
+    call.mutable_reactivate_agent()->CopyFrom(reactivateAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  // Trigger allocation to make sure that the agent is not offered.
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  // Expect to get an offer after the agent is brought back.
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  Try<Owned<cluster::Slave>> recoveredSlave =
+    StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(recoveredSlave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  EXPECT_EQ(agentId, offer.agent_id());
+
+  // Launch a task to verify that the offer is usable.
+  const v1::TaskInfo taskInfo =
+    v1::createTask(agentId, offer.resources(), SLEEP_COMMAND(1000));
+
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(taskRunning);
+}
+
+
 // When an operator submits a DRAIN_AGENT call with 'mark_gone == true',
 // and the agent is not running anything, the agent should immediately be
 // marked gone.