This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 505928a3f51555bd3e45f2fc9787fdf890b28bfb Author: Greg Mann <g...@mesosphere.io> AuthorDate: Mon Jul 15 10:25:56 2019 -0700 Added tests for task killing when draining the agent. Review: https://reviews.apache.org/r/70904/ --- src/tests/slave_tests.cpp | 335 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 8098a1a..147967d 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -94,6 +94,8 @@ #include "tests/resources_utils.hpp" #include "tests/utils.hpp" +#include "tests/containerizer/mock_containerizer.hpp" + using namespace mesos::internal::slave; #ifdef USE_SSL_SOCKET @@ -11881,6 +11883,339 @@ TEST_F(SlaveTest, DrainInfoInAPIOutputs) } } + +// When the agent receives a `DrainSlaveMessage`, it should kill running tasks. +TEST_F(SlaveTest, DrainAgentKillsRunningTask) +{ + Clock::pause(); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + StandaloneMasterDetector detector(master.get()->pid); + + slave::Flags slaveFlags = CreateSlaveFlags(); + + Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags); + ASSERT_SOME(slave); + + Clock::advance(slaveFlags.registration_backoff_factor); + + AWAIT_READY(updateSlaveMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + + v1::Offer::Operation launch = v1::LAUNCH({taskInfo}); + + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {launch})); + + AWAIT_READY(startingUpdate); + EXPECT_EQ(v1::TASK_STARTING, startingUpdate->status().state()); + + AWAIT_READY(runningUpdate); + EXPECT_EQ(v1::TASK_RUNNING, runningUpdate->status().state()); + + Future<v1::scheduler::Event::Update> killedUpdate; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&killedUpdate)); + + // Simulate the master sending a `DrainSlaveMessage` to the agent. + + // Immediately kill the task forcefully by using a zero max grace period. 
+ DurationInfo maxGracePeriod; + maxGracePeriod.set_nanoseconds(0); + + DrainConfig drainConfig; + drainConfig.set_mark_gone(true); + drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod); + + DrainSlaveMessage drainSlaveMessage; + drainSlaveMessage.mutable_config()->CopyFrom(drainConfig); + + process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage); + + AWAIT_READY(killedUpdate); + + EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state()); +} + + +// When the agent receives a `DrainSlaveMessage`, it should kill queued tasks. +TEST_F(SlaveTest, DrainAgentKillsQueuedTask) +{ + Clock::pause(); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + MockContainerizer mockContainerizer; + StandaloneMasterDetector detector(master.get()->pid); + slave::Flags slaveFlags = CreateSlaveFlags(); + + EXPECT_CALL(mockContainerizer, recover(_)) + .WillOnce(Return(Nothing())); + + EXPECT_CALL(mockContainerizer, containers()) + .WillOnce(Return(hashset<ContainerID>())); + + Try<Owned<cluster::Slave>> slave = StartSlave( + &detector, + &mockContainerizer, + slaveFlags); + ASSERT_SOME(slave); + + Clock::advance(slaveFlags.registration_backoff_factor); + + AWAIT_READY(updateSlaveMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + + v1::Offer::Operation launch = v1::LAUNCH({taskInfo}); + + // Return a pending future from the containerizer when launching the executor + // container so that the task remains queued. + Promise<slave::Containerizer::LaunchResult> launchResult; + Future<Nothing> launched; + EXPECT_CALL(mockContainerizer, launch(_, _, _, _)) + .WillOnce(DoAll( + FutureSatisfy(&launched), + Return(launchResult.future()))); + + EXPECT_CALL(mockContainerizer, update(_, _)) + .WillOnce(Return(Nothing())); + + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {launch})); + + AWAIT_READY(launched); + + Future<v1::scheduler::Event::Update> killedUpdate; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&killedUpdate)); + + // Simulate the master sending a `DrainSlaveMessage` to the agent. + + // Immediately kill the task forcefully by using a zero max grace period. 
+ DurationInfo maxGracePeriod; + maxGracePeriod.set_nanoseconds(0); + + DrainConfig drainConfig; + drainConfig.set_mark_gone(true); + drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod); + + DrainSlaveMessage drainSlaveMessage; + drainSlaveMessage.mutable_config()->CopyFrom(drainConfig); + + EXPECT_CALL(mockContainerizer, destroy(_)) + .WillOnce(Return(None())); + + process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage); + + AWAIT_READY(killedUpdate); + + EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state()); +} + + +// When the agent receives a `DrainSlaveMessage`, it should kill pending tasks. +TEST_F(SlaveTest, DrainAgentKillsPendingTask) +{ + Clock::pause(); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + StandaloneMasterDetector detector(master.get()->pid); + slave::Flags slaveFlags = CreateSlaveFlags(); + MockAuthorizer mockAuthorizer; + + Try<Owned<cluster::Slave>> slave = StartSlave( + &detector, + &mockAuthorizer, + slaveFlags); + ASSERT_SOME(slave); + + Clock::advance(slaveFlags.registration_backoff_factor); + + AWAIT_READY(updateSlaveMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + + v1::Offer::Operation launch = v1::LAUNCH({taskInfo}); + + // Intercept authorization so that the task remains pending. + Future<Nothing> authorized; + Promise<bool> promise; // Never satisfied. + EXPECT_CALL(mockAuthorizer, authorized(_)) + .WillOnce(DoAll(FutureSatisfy(&authorized), + Return(promise.future()))); + + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {launch})); + + AWAIT_READY(authorized); + + Future<v1::scheduler::Event::Update> killedUpdate; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&killedUpdate)); + + // Simulate the master sending a `DrainSlaveMessage` to the agent. + + // Immediately kill the task forcefully by using a zero max grace period. + DurationInfo maxGracePeriod; + maxGracePeriod.set_nanoseconds(0); + + DrainConfig drainConfig; + drainConfig.set_mark_gone(true); + drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod); + + DrainSlaveMessage drainSlaveMessage; + drainSlaveMessage.mutable_config()->CopyFrom(drainConfig); + + process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage); + + AWAIT_READY(killedUpdate); + + EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state()); +} + } // namespace tests { } // namespace internal { } // namespace mesos {