Added tests to check that executors which fail to launch are removed.

These tests ensure that the agent sends `ExitedExecutorMessage` when
a task group fails to launch due to a GC unschedule failure, or when a
task fails to launch due to a task authorization failure.

Review: https://reviews.apache.org/r/65593/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e3c582f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e3c582f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e3c582f

Branch: refs/heads/master
Commit: 3e3c582f10e8154e4a76c2b481cc33c8d4d0310c
Parents: a8e723b
Author: Meng Zhu <m...@mesosphere.io>
Authored: Tue Feb 13 22:45:23 2018 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Wed Feb 14 02:36:03 2018 -0800

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 305 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 305 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3c582f/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 6631c05..d2c242e 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -4727,6 +4727,311 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedLaunch)
 }
 
 
+// This test ensures that the agent sends an ExitedExecutorMessage when a task
+// group fails to launch due to a GC unschedule failure, and that the master's
+// executor bookkeeping entry is removed as a result.
+TEST_F(SlaveTest, RemoveExecutorUponFailedTaskGroupLaunch)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  // Start a mock slave; its `mock()` is used below to intercept `_run()`.
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), CreateSlaveFlags(), true);
+
+  ASSERT_SOME(slave);
+  ASSERT_NE(nullptr, slave.get()->mock());
+
+  slave.get()->start();
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo = v1::DEFAULT_EXECUTOR_INFO;
+  executorInfo.clear_command();
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+    master.get()->pid,
+    ContentType::PROTOBUF,
+    scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::TaskInfo task1 = v1::createTask(agentId, resources, "sleep 1000");
+  v1::TaskInfo task2 = v1::createTask(agentId, resources, "sleep 1000");
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(task1);
+  taskGroup.add_tasks()->CopyFrom(task2);
+
+  v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(executorInfo, taskGroup);
+
+  // Saved arguments from `Slave::_run()`. The real unschedule future
+  // (`_unschedules`) is captured but deliberately replaced below by a
+  // failed one when `_run()` is re-dispatched.
+  Future<list<bool>> _unschedules;
+  FrameworkInfo _frameworkInfo;
+  ExecutorInfo _executorInfo;
+  Option<TaskGroupInfo> _taskGroup;
+  Option<TaskInfo> _task;
+  vector<ResourceVersionUUID> _resourceVersionUuids;
+  Option<bool> _launchExecutor;
+
+  // Capture `_run` arguments (this intercepts the real call).
+  Future<Nothing> _run;
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureSatisfy(&_run),
+                  SaveArg<0>(&_unschedules),
+                  SaveArg<1>(&_frameworkInfo),
+                  SaveArg<2>(&_executorInfo),
+                  SaveArg<3>(&_task),
+                  SaveArg<4>(&_taskGroup),
+                  SaveArg<5>(&_resourceVersionUuids),
+                  SaveArg<6>(&_launchExecutor)));
+
+  Future<ExitedExecutorMessage> exitedExecutorMessage =
+    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
+
+  mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+
+  AWAIT_READY(_run);
+
+  // Induce a failed GC unschedule.
+  Promise<list<bool>> promise;
+  Future<list<bool>> failedFuture = promise.future();
+  promise.fail("");
+
+  // Re-run the unmocked `_run()` on the agent's actor with the failed
+  // future substituted for the captured one.
+  process::dispatch(slave.get()->pid, [&] {
+    slave.get()->mock()->unmocked__run(
+        failedFuture,
+        _frameworkInfo,
+        _executorInfo,
+        _task,
+        _taskGroup,
+        _resourceVersionUuids,
+        _launchExecutor);
+  });
+
+  AWAIT_READY(exitedExecutorMessage);
+
+  // Helper function to post a request to the '/api/v1' master endpoint
+  // and return the response.
+  auto post = [](
+      const process::PID<master::Master>& pid,
+      const v1::master::Call& call,
+      const ContentType& contentType)
+  {
+    process::http::Headers headers =
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    return process::http::post(
+        pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType));
+  };
+
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::GET_EXECUTORS);
+
+  Future<process::http::Response> response =
+    post(master.get()->pid, v1Call, ContentType::PROTOBUF);
+
+  // Fail the test (instead of aborting on `response->`) if the request
+  // fails, is discarded, or does not return 200 OK.
+  AWAIT_ASSERT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+
+  // Keep the `Try` so a deserialization error is reported as a test
+  // failure rather than dereferencing a failed value.
+  Try<v1::master::Response> v1Response =
+    deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body);
+
+  ASSERT_SOME(v1Response);
+
+  // The master has no executor entry because the executor never launched.
+  ASSERT_TRUE(v1Response->IsInitialized());
+  ASSERT_EQ(v1::master::Response::GET_EXECUTORS, v1Response->type());
+  ASSERT_EQ(0, v1Response->get_executors().executors_size());
+}
+
+
+// This test ensures that the agent sends an ExitedExecutorMessage when a task
+// fails to launch due to a task authorization failure, and that the master's
+// executor bookkeeping entry is removed as a result.
+TEST_F(SlaveTest, RemoveExecutorUponFailedTaskAuthorization)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  // Start a mock slave; its `mock()` is used below to intercept `__run()`.
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, slaveFlags, true);
+
+  ASSERT_SOME(slave);
+  ASSERT_NE(nullptr, slave.get()->mock());
+
+  slave.get()->start();
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  Resources executorResources = Resources::parse("cpus:0.1;mem:32").get();
+  executorResources.allocate("*");
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(
+      Resources(offers.get()[0].resources()) - executorResources);
+
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  task.mutable_executor()->mutable_resources()->CopyFrom(executorResources);
+
+  // The executor must never register since the task is never authorized.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(0);
+
+  Future<TaskStatus> taskStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskStatus));
+
+  Future<ExitedExecutorMessage> exitedExecutorMessage =
+    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
+
+  // Saved arguments from `Slave::__run()`. The real authorization future
+  // (`_authorizations`) is captured but deliberately replaced below by a
+  // failed one when `__run()` is re-dispatched.
+  Future<list<bool>> _authorizations;
+  FrameworkInfo _frameworkInfo;
+  ExecutorInfo _executorInfo;
+  Option<TaskInfo> _task;
+  Option<TaskGroupInfo> _taskGroup;
+  vector<ResourceVersionUUID> _resourceVersionUuids;
+  Option<bool> _launchExecutor;
+
+  // Capture `__run` arguments (this intercepts the real call).
+  Future<Nothing> __run;
+  EXPECT_CALL(*slave.get()->mock(), __run(_, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureSatisfy(&__run),
+                  SaveArg<0>(&_authorizations),
+                  SaveArg<1>(&_frameworkInfo),
+                  SaveArg<2>(&_executorInfo),
+                  SaveArg<3>(&_task),
+                  SaveArg<4>(&_taskGroup),
+                  SaveArg<5>(&_resourceVersionUuids),
+                  SaveArg<6>(&_launchExecutor)));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(__run);
+
+  // Induce a task authorization failure.
+  Promise<list<bool>> promise;
+  Future<list<bool>> failedFuture = promise.future();
+  promise.fail("");
+
+  // Re-run the unmocked `__run()` on the agent's actor with the failed
+  // future substituted for the captured one.
+  process::dispatch(slave.get()->pid, [&] {
+    slave.get()->mock()->unmocked___run(
+        failedFuture,
+        _frameworkInfo,
+        _executorInfo,
+        _task,
+        _taskGroup,
+        _resourceVersionUuids,
+        _launchExecutor);
+  });
+
+  AWAIT_READY(exitedExecutorMessage);
+
+  // The scheduler receives its single expected status update, for the
+  // task that failed authorization.
+  AWAIT_READY(taskStatus);
+  EXPECT_EQ(task.task_id(), taskStatus->task_id());
+
+  // Helper function to post a request to the '/api/v1' master endpoint
+  // and return the response.
+  auto post = [](
+      const process::PID<master::Master>& pid,
+      const v1::master::Call& call,
+      const ContentType& contentType)
+  {
+    process::http::Headers headers =
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    return process::http::post(
+        pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType));
+  };
+
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::GET_EXECUTORS);
+
+  Future<process::http::Response> response =
+    post(master.get()->pid, v1Call, ContentType::PROTOBUF);
+
+  // Fail the test (instead of aborting on `response->`) if the request
+  // fails, is discarded, or does not return 200 OK.
+  AWAIT_ASSERT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+
+  // Keep the `Try` so a deserialization error is reported as a test
+  // failure rather than dereferencing a failed value.
+  Try<v1::master::Response> v1Response =
+    deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body);
+
+  ASSERT_SOME(v1Response);
+
+  // The master has no executor entry because the executor never launched.
+  ASSERT_TRUE(v1Response->IsInitialized());
+  ASSERT_EQ(v1::master::Response::GET_EXECUTORS, v1Response->type());
+  ASSERT_EQ(0, v1Response->get_executors().executors_size());
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test verifies that the executor is shutdown if all of its initial
 // tasks could not be delivered, even after the executor has been registered.
 // See MESOS-8411.

Reply via email to