Added tests to check that executors which fail to launch are removed. These tests ensure that the agent sends `ExitedExecutorMessage` when a task group fails to launch due to a GC unschedule failure, or when a task fails to launch due to a task authorization failure.
Review: https://reviews.apache.org/r/65593/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e3c582f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e3c582f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e3c582f Branch: refs/heads/master Commit: 3e3c582f10e8154e4a76c2b481cc33c8d4d0310c Parents: a8e723b Author: Meng Zhu <m...@mesosphere.io> Authored: Tue Feb 13 22:45:23 2018 -0800 Committer: Greg Mann <gregorywm...@gmail.com> Committed: Wed Feb 14 02:36:03 2018 -0800 ---------------------------------------------------------------------- src/tests/slave_tests.cpp | 305 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 305 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3e3c582f/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 6631c05..d2c242e 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -4727,6 +4727,311 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedLaunch) } +// This test ensures that agent sends ExitedExecutorMessage when the task group +// fails to launch due to unschedule GC failure and that master's executor +// bookkeeping entry is removed. +TEST_F(SlaveTest, RemoveExecutorUponFailedTaskGroupLaunch) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + // Start a mock slave. 
+ Try<Owned<cluster::Slave>> slave = + StartSlave(detector.get(), CreateSlaveFlags(), true); + + ASSERT_SOME(slave); + ASSERT_NE(nullptr, slave.get()->mock()); + + slave.get()->start(); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo = v1::DEFAULT_EXECUTOR_INFO; + executorInfo.clear_command(); + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_resources()->CopyFrom(resources); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + v1::TaskInfo task1 = v1::createTask(agentId, resources, "sleep 1000"); + v1::TaskInfo task2 = v1::createTask(agentId, resources, "sleep 1000"); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(executorInfo, taskGroup); + + // Saved arguments from `Slave::_run()`. 
+ Future<list<bool>> _unschedules; + FrameworkInfo _frameworkInfo; + ExecutorInfo _executorInfo; + Option<TaskGroupInfo> _taskGroup; + Option<TaskInfo> _task; + vector<ResourceVersionUUID> _resourceVersionUuids; + Option<bool> _launchExecutor; + + // Capture `_run` arguments. + Future<Nothing> _run; + EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureSatisfy(&_run), + SaveArg<0>(&_unschedules), + SaveArg<1>(&_frameworkInfo), + SaveArg<2>(&_executorInfo), + SaveArg<3>(&_task), + SaveArg<4>(&_taskGroup), + SaveArg<5>(&_resourceVersionUuids), + SaveArg<6>(&_launchExecutor))); + + Future<ExitedExecutorMessage> exitedExecutorMessage = + FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _); + + mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup})); + + AWAIT_READY(_run); + + // Induce a failed GC unschedule. + Promise<list<bool>> promise; + Future<list<bool>> failedFuture = promise.future(); + promise.fail(""); + + process::dispatch(slave.get()->pid, [&] { + slave.get()->mock()->unmocked__run( + failedFuture, + _frameworkInfo, + _executorInfo, + _task, + _taskGroup, + _resourceVersionUuids, + _launchExecutor); + }); + + AWAIT_READY(exitedExecutorMessage); + + // Helper function to post a request to '/api/v1' master endpoint + // and return the response. 
+ auto post = []( + const process::PID<master::Master>& pid, + const v1::master::Call& call, + const ContentType& contentType) + { + process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + headers["Accept"] = stringify(contentType); + + return process::http::post( + pid, + "api/v1", + headers, + serialize(contentType, call), + stringify(contentType)); + }; + + v1::master::Call v1Call; + v1Call.set_type(v1::master::Call::GET_EXECUTORS); + + Future<process::http::Response> response = + post(master.get()->pid, v1Call, ContentType::PROTOBUF); + + response.await(); + ASSERT_EQ(response->status, process::http::OK().status); + + Future<v1::master::Response> v1Response = + deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body); + + // Master has no executor entry because the executor never launched. + ASSERT_TRUE(v1Response->IsInitialized()); + ASSERT_EQ(v1::master::Response::GET_EXECUTORS, v1Response->type()); + ASSERT_EQ(0, v1Response->get_executors().executors_size()); +} + + +// This test ensures that agent sends ExitedExecutorMessage when the task +// fails to launch due to task authorization failure and that master's executor +// bookkeeping entry is removed. +TEST_F(SlaveTest, RemoveExecutorUponFailedTaskAuthorization) +{ + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + TestContainerizer containerizer(&exec); + + slave::Flags slaveFlags = CreateSlaveFlags(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + // Start a mock slave. 
+ Try<Owned<cluster::Slave>> slave = + StartSlave(detector.get(), &containerizer, slaveFlags, true); + + ASSERT_SOME(slave); + ASSERT_NE(nullptr, slave.get()->mock()); + + slave.get()->start(); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(_, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + Resources executorResources = Resources::parse("cpus:0.1;mem:32").get(); + executorResources.allocate("*"); + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); + task.mutable_resources()->MergeFrom( + Resources(offers.get()[0].resources()) - executorResources); + + task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); + task.mutable_executor()->mutable_resources()->CopyFrom(executorResources); + + EXPECT_CALL(exec, registered(_, _, _, _)) + .Times(0); + + Future<TaskStatus> killTaskStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&killTaskStatus)); + + Future<ExitedExecutorMessage> exitedExecutorMessage = + FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _); + + Future<list<bool>> _future; + FrameworkInfo _frameworkInfo; + ExecutorInfo _executorInfo; + Option<TaskInfo> _task; + Option<TaskGroupInfo> _taskGroup; + vector<ResourceVersionUUID> _resourceVersionUuids; + Option<bool> _launchExecutor; + + // Capture `__run` arguments. 
+ Future<Nothing> __run; + EXPECT_CALL(*slave.get()->mock(), __run(_, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureSatisfy(&__run), + SaveArg<0>(&_future), + SaveArg<1>(&_frameworkInfo), + SaveArg<2>(&_executorInfo), + SaveArg<3>(&_task), + SaveArg<4>(&_taskGroup), + SaveArg<5>(&_resourceVersionUuids), + SaveArg<6>(&_launchExecutor))); + + driver.launchTasks(offers.get()[0].id(), {task}); + + AWAIT_READY(__run); + + // Induce a task authorization failure. + Promise<list<bool>> promise; + Future<list<bool>> failedFuture = promise.future(); + promise.fail(""); + + process::dispatch(slave.get()->pid, [&] { + slave.get()->mock()->unmocked___run( + failedFuture, + _frameworkInfo, + _executorInfo, + _task, + _taskGroup, + _resourceVersionUuids, + _launchExecutor); + }); + + AWAIT_READY(exitedExecutorMessage); + + // Helper function to post a request to '/api/v1' master endpoint + // and return the response. + auto post = []( + const process::PID<master::Master>& pid, + const v1::master::Call& call, + const ContentType& contentType) + { + process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); + headers["Accept"] = stringify(contentType); + + return process::http::post( + pid, + "api/v1", + headers, + serialize(contentType, call), + stringify(contentType)); + }; + + v1::master::Call v1Call; + v1Call.set_type(v1::master::Call::GET_EXECUTORS); + + Future<process::http::Response> response = + post(master.get()->pid, v1Call, ContentType::PROTOBUF); + + response.await(); + ASSERT_EQ(response->status, process::http::OK().status); + + Future<v1::master::Response> v1Response = + deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body); + + // Master has no executor entry because the executor never launched. 
+ ASSERT_TRUE(v1Response->IsInitialized()); + ASSERT_EQ(v1::master::Response::GET_EXECUTORS, v1Response->type()); + ASSERT_EQ(0, v1Response->get_executors().executors_size()); + + driver.stop(); + driver.join(); +} + + // This test verifies that the executor is shutdown if all of its initial // tasks could not be delivered, even after the executor has been registered. // See MESOS-8411.