This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 505928a3f51555bd3e45f2fc9787fdf890b28bfb
Author: Greg Mann <g...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:56 2019 -0700

    Added tests for task killing when draining the agent.
    
    Review: https://reviews.apache.org/r/70904/
---
 src/tests/slave_tests.cpp | 335 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 335 insertions(+)

diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 8098a1a..147967d 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -94,6 +94,8 @@
 #include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
+#include "tests/containerizer/mock_containerizer.hpp"
+
 using namespace mesos::internal::slave;
 
 #ifdef USE_SSL_SOCKET
@@ -11881,6 +11883,339 @@ TEST_F(SlaveTest, DrainInfoInAPIOutputs)
   }
 }
 
+
+// When an agent receives a `DrainSlaveMessage`, it should kill running tasks.
+TEST_F(SlaveTest, DrainAgentKillsRunningTask)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::Offer::Operation launch = v1::LAUNCH({taskInfo});
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {launch}));
+
+  AWAIT_READY(startingUpdate);
+  EXPECT_EQ(v1::TASK_STARTING, startingUpdate->status().state());
+
+  AWAIT_READY(runningUpdate);
+  EXPECT_EQ(v1::TASK_RUNNING, runningUpdate->status().state());
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate)); // Captured, not acknowledged.
+
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+
+  // A zero max grace period requests an immediate, forceful kill of the task.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(0);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  AWAIT_READY(killedUpdate);
+
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+}
+
+
+// When the agent receives a `DrainSlaveMessage`, it should kill queued tasks.
+TEST_F(SlaveTest, DrainAgentKillsQueuedTask)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  MockContainerizer mockContainerizer;
+  StandaloneMasterDetector detector(master.get()->pid);
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  EXPECT_CALL(mockContainerizer, recover(_))
+    .WillOnce(Return(Nothing()));
+
+  EXPECT_CALL(mockContainerizer, containers())
+    .WillOnce(Return(hashset<ContainerID>())); // No existing containers.
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      &detector,
+      &mockContainerizer,
+      slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::Offer::Operation launch = v1::LAUNCH({taskInfo});
+
+  // Return a pending future from the containerizer when launching the executor
+  // container so that the task remains queued.
+  Promise<slave::Containerizer::LaunchResult> launchResult; // Never satisfied.
+  Future<Nothing> launched;
+  EXPECT_CALL(mockContainerizer, launch(_, _, _, _))
+    .WillOnce(DoAll(
+        FutureSatisfy(&launched),
+        Return(launchResult.future())));
+
+  EXPECT_CALL(mockContainerizer, update(_, _))
+    .WillOnce(Return(Nothing()));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {launch}));
+
+  AWAIT_READY(launched);
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate)); // Captured, not acknowledged.
+
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+
+  // A zero max grace period requests an immediate, forceful kill of the task.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(0);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  EXPECT_CALL(mockContainerizer, destroy(_))
+    .WillOnce(Return(None()));
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  AWAIT_READY(killedUpdate);
+
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+}
+
+
+// When the agent receives a `DrainSlaveMessage`, it should kill pending tasks.
+TEST_F(SlaveTest, DrainAgentKillsPendingTask)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  MockAuthorizer mockAuthorizer;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      &detector,
+      &mockAuthorizer,
+      slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::Offer::Operation launch = v1::LAUNCH({taskInfo});
+
+  // Intercept authorization so that the task remains pending.
+  Future<Nothing> authorized;
+  Promise<bool> promise; // Never satisfied.
+  EXPECT_CALL(mockAuthorizer, authorized(_))
+    .WillOnce(DoAll(FutureSatisfy(&authorized),
+                    Return(promise.future())));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {launch}));
+
+  AWAIT_READY(authorized);
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate)); // Captured, not acknowledged.
+
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+
+  // A zero max grace period requests an immediate, forceful kill of the task.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(0);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  AWAIT_READY(killedUpdate);
+
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to