Windows: Ported `slave_recovery_tests.cpp`. This commit enables the unit tests found in `slave_recovery_tests.cpp` to test agent recovery on Windows.
Review: https://reviews.apache.org/r/65408 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4b31991 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4b31991 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4b31991 Branch: refs/heads/master Commit: e4b3199110655c4a35755df047c3098cfaf35388 Parents: 5c8b1e7 Author: Andrew Schwartzmeyer <and...@schwartzmeyer.com> Authored: Mon Jan 29 11:32:27 2018 -0800 Committer: Andrew Schwartzmeyer <and...@schwartzmeyer.com> Committed: Fri Feb 9 12:08:24 2018 -0800 ---------------------------------------------------------------------- src/tests/CMakeLists.txt | 2 +- src/tests/slave_recovery_tests.cpp | 117 ++++++++++++++++++++------------ 2 files changed, 73 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e4b31991/src/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 6f1c67a..ade5180 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -119,6 +119,7 @@ set(MESOS_TESTS_SRC slave_authorization_tests.cpp slave_compatibility_tests.cpp slave_tests.cpp + slave_recovery_tests.cpp slave_validation_tests.cpp sorter_tests.cpp task_status_update_manager_tests.cpp @@ -178,7 +179,6 @@ if (NOT WIN32) registrar_tests.cpp reservation_endpoints_tests.cpp reservation_tests.cpp - slave_recovery_tests.cpp state_tests.cpp teardown_tests.cpp upgrade_tests.cpp) http://git-wip-us.apache.org/repos/asf/mesos/blob/e4b31991/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 10f8dd3..762cebe 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -14,7 +14,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#ifndef __WINDOWS__ #include <unistd.h> +#endif // __WINDOWS__ #include <string> @@ -45,6 +47,8 @@ #include <stout/path.hpp> #include <stout/uuid.hpp> +#include <stout/os/killtree.hpp> + #include "common/protobuf_utils.hpp" #include "master/master.hpp" @@ -219,7 +223,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) SlaveID slaveId = offers.get()[0].slave_id(); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); // Scheduler expectations. EXPECT_CALL(sched, statusUpdate(_, _)) @@ -390,7 +394,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTaskStatusUpdateManager) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); // Message expectations. Future<Message> registerExecutor = @@ -478,7 +482,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_ReconnectHTTPExecutor) ASSERT_FALSE(offers->empty()); // Launch a task with the HTTP based command executor. - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); Future<v1::executor::Call> updateCall = DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF); @@ -607,10 +611,10 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_ROOT_CGROUPS_ReconnectDefaultExecutor) const SlaveID slaveId = devolve(offer.agent_id()); v1::TaskInfo taskInfo1 = - evolve(createTask(slaveId, resources, "sleep 1000")); + evolve(createTask(slaveId, resources, SLEEP_COMMAND(1000))); v1::TaskInfo taskInfo2 = - evolve(createTask(slaveId, resources, "sleep 1000")); + evolve(createTask(slaveId, resources, SLEEP_COMMAND(1000))); v1::TaskGroupInfo taskGroup; taskGroup.add_tasks()->CopyFrom(taskInfo1); @@ -752,7 +756,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); // Drop the status updates from the executor. // We actually wait until we can drop the TASK_RUNNING update here @@ -854,7 +858,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); // Pause the clock to ensure the agent does not retry the // status update. We will ensure the acknowledgement is @@ -973,7 +977,7 @@ TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); Future<TaskStatus> statusUpdate0; Future<TaskStatus> statusUpdate1; @@ -1121,7 +1125,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoverUnregisteredHTTPExecutor) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); // Drop the executor subscribe message. Future<v1::executor::Call> subscribeCall = @@ -1233,7 +1237,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); // Drop the executor registration message. Future<Message> registerExecutor = @@ -1347,7 +1351,7 @@ TYPED_TEST(SlaveRecoveryTest, KillQueuedTaskDuringExecutorRegistration) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); // Drop the executor registration message so that the task stays // queued on the agent @@ -1451,7 +1455,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); EXPECT_CALL(sched, statusUpdate(_, _)) .WillRepeatedly(Return()); // Allow any number of subsequent status updates. @@ -1597,7 +1601,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); Future<Message> registerExecutor = FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); @@ -1745,7 +1749,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); EXPECT_CALL(sched, statusUpdate(_, _)); @@ -1972,7 +1976,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_CleanupHTTPExecutor) ASSERT_FALSE(offers->empty()); // Launch a task with the HTTP based command executor. - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); Future<v1::executor::Call> updateCall = DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF); @@ -2087,7 +2091,7 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); // Expect TASK_STARTING and TASK_RUNNING updates EXPECT_CALL(sched, statusUpdate(_, _)) @@ -2216,13 +2220,15 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework) Resources resources1 = allocatedResources( Resources::parse("cpus:1;mem:512").get(), frameworkInfo.roles(0)); offer1.mutable_resources()->CopyFrom(resources1); - tasks.push_back(createTask(offer1, "sleep 1000")); // Long-running task. + // Long-running task. + tasks.push_back(createTask(offer1, SLEEP_COMMAND(1000))); Offer offer2 = offer; Resources resources2 = allocatedResources( Resources::parse("cpus:1;mem:512").get(), frameworkInfo.roles(0)); offer2.mutable_resources()->CopyFrom(resources2); - tasks.push_back(createTask(offer2, "sleep 1000")); // Long-running task, + // Long-running task. + tasks.push_back(createTask(offer2, SLEEP_COMMAND(1000))); ASSERT_TRUE(Resources(offer.resources()).contains( Resources(offer1.resources()) + @@ -2329,7 +2335,7 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingFramework) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); Future<Nothing> update; EXPECT_CALL(sched, statusUpdate(_, _)) @@ -2427,7 +2433,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_KillTaskWithHTTPExecutor) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); EXPECT_CALL(sched, statusUpdate(_, _)); @@ -2542,7 +2548,7 @@ TYPED_TEST(SlaveRecoveryTest, KillTask) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); // Expect a TASK_STARTING and a TASK_RUNNING update EXPECT_CALL(sched, statusUpdate(_, _)) @@ -2666,7 +2672,7 @@ TYPED_TEST(SlaveRecoveryTest, Reboot) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); // Capture the slave and framework ids. SlaveID slaveId1 = offers1.get()[0].slave_id(); @@ -3040,7 +3046,7 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); // Capture the slave and framework ids. SlaveID slaveId = offers1.get()[0].slave_id(); @@ -3177,7 +3183,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave) ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); Future<Nothing> statusUpdate1, statusUpdate2; EXPECT_CALL(sched, statusUpdate(_, _)) @@ -3245,7 +3251,12 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave) } +#ifndef __WINDOWS__ // The slave should shutdown when it receives a SIGUSR1 signal. +// +// TODO(andschwa): The Windows agent shuts down with SIGUSR1, but it does so +// through a custom signal-handler for Ctrl-C, not through `kill`, so this test +// can be implemented later. See MESOS-8505. TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1) { Try<Owned<cluster::Master>> master = this->StartMaster(); @@ -3286,7 +3297,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1) ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); Future<TaskStatus> statusStarting, statusRunning; EXPECT_CALL(sched, statusUpdate(_, _)) @@ -3343,6 +3354,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1) driver.stop(); driver.join(); } +#endif // __WINDOWS__ // The slave fails to do recovery and tries to register as a new slave. The @@ -3397,7 +3409,7 @@ TYPED_TEST(SlaveRecoveryTest, RegisterDisconnectedSlave) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); // Capture the slave and framework ids. SlaveID slaveId = offers.get()[0].slave_id(); @@ -3527,7 +3539,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); // Capture the slave and framework ids. SlaveID slaveId = offers1.get()[0].slave_id(); @@ -3647,7 +3659,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework) Future<Nothing> _statusUpdateAcknowledgement2 = FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); driver.launchTasks(offers.get()[0].id(), {task}); @@ -3788,7 +3800,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) // Start a task on the slave so that the master has knowledge of it. // We'll ensure the slave does not have this task when it // re-registers by wiping the relevant meta directory. - TaskInfo task = createTask(offers1.get()[0], "sleep 10"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(10)); Future<TaskStatus> starting; Future<TaskStatus> running; @@ -3966,7 +3978,7 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover) ASSERT_FALSE(offers1->empty()); // Create a long running task. - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); // Expecting TASK_STARTING and TASK_RUNNING updates EXPECT_CALL(sched1, statusUpdate(_, _)) @@ -4079,7 +4091,9 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover) // This test verifies that if the master changes when the slave is // down, the slave can still recover the task when it restarts. We // verify its correctness by killing the task from the scheduler. -TYPED_TEST(SlaveRecoveryTest, MasterFailover) +// +// TODO(andschwa): Enable when replicated log is supported (MESOS-5932). +TYPED_TEST_TEMP_DISABLED_ON_WINDOWS(SlaveRecoveryTest, MasterFailover) { // Step 1. Run a task. master::Flags masterFlags = this->CreateMasterFlags(); @@ -4125,7 +4139,7 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover) AWAIT_READY(offers1); ASSERT_FALSE(offers1->empty()); - TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); EXPECT_CALL(sched, statusUpdate(_, _)) .Times(2); // TASK_STARTING and TASK_RUNNING @@ -4272,7 +4286,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks) Resources::parse("cpus:1;mem:512").get()); // Framework 1 launches a task. - TaskInfo task1 = createTask(offer1, "sleep 1000"); + TaskInfo task1 = createTask(offer1, SLEEP_COMMAND(1000)); EXPECT_CALL(sched1, statusUpdate(_, _)) .Times(2); @@ -4310,7 +4324,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks) ASSERT_FALSE(offers2->empty()); // Framework 2 launches a task. - TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); + TaskInfo task2 = createTask(offers2.get()[0], SLEEP_COMMAND(1000)); EXPECT_CALL(sched2, statusUpdate(_, _)) .Times(2); @@ -4427,9 +4441,12 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves) // Start the first slave. slave::Flags flags1 = this->CreateSlaveFlags(); - // NOTE: We cannot run multiple slaves simultaneously on a host if - // cgroups isolation is involved. +#ifndef __WINDOWS__ + // NOTE: We cannot run multiple slaves simultaneously on a host if cgroups + // isolation is involved. Since this does not apply to Windows, we use the + // default flags from `CreateSlaveFlags()` above. flags1.isolation = "filesystem/posix,posix/mem,posix/cpu"; +#endif // __WINDOWS__ Fetcher fetcher(flags1); @@ -4447,7 +4464,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves) ASSERT_FALSE(offers1->empty()); // Launch a long running task in the first slave. - TaskInfo task1 = createTask(offers1.get()[0], "sleep 1000"); + TaskInfo task1 = createTask(offers1.get()[0], SLEEP_COMMAND(1000)); EXPECT_CALL(sched, statusUpdate(_, _)) .Times(2); @@ -4471,9 +4488,12 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves) // Start the second slave. slave::Flags flags2 = this->CreateSlaveFlags(); - // NOTE: We cannot run multiple slaves simultaneously on a host if - // cgroups isolation is involved. +#ifndef __WINDOWS__ + // NOTE: We cannot run multiple slaves simultaneously on a host if cgroups + // isolation is involved. Since this does not apply to Windows, we use the + // default flags from `CreateSlaveFlags()` above. flags2.isolation = "filesystem/posix,posix/mem,posix/cpu"; +#endif // __WINDOWS__ Try<TypeParam*> _containerizer2 = TypeParam::create(flags2, true, &fetcher); ASSERT_SOME(_containerizer2); @@ -4487,7 +4507,7 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves) ASSERT_FALSE(offers2->empty()); // Launch a long running task in each slave. - TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); + TaskInfo task2 = createTask(offers2.get()[0], SLEEP_COMMAND(1000)); EXPECT_CALL(sched, statusUpdate(_, _)) .Times(2); @@ -4620,7 +4640,7 @@ TYPED_TEST(SlaveRecoveryTest, RestartBeforeContainerizerLaunch) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); // Expect the launch but don't do anything. Future<Nothing> launch; @@ -4705,6 +4725,13 @@ TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask) // Start a slave. slave::Flags flags = this->CreateSlaveFlags(); + + // NOTE: These tests will start with "zero" memory, and the default Windows + // isolators will enforce this, causing the task to crash. The default POSIX + // isolators don't actually perform isolation, and so this does not occur. + // However, these tests are not testing isolation, they're testing resource + // accounting, so we can just use "no" isolators. + flags.isolation = ""; flags.resources = "cpus:5;mem:0;disk:0;ports:0"; Fetcher fetcher(flags); @@ -4729,7 +4756,7 @@ TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask) SlaveID slaveId = offers1.get()[0].slave_id(); TaskInfo task = createTask( - slaveId, Resources::parse("cpus:3").get(), "sleep 1000"); + slaveId, Resources::parse("cpus:3").get(), SLEEP_COMMAND(1000)); Future<TaskStatus> statusStarting; Future<TaskStatus> statusRunning; @@ -5064,7 +5091,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics) AWAIT_READY(offers); ASSERT_FALSE(offers->empty()); - TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + TaskInfo task = createTask(offers.get()[0], SLEEP_COMMAND(1000)); // Message expectations. Future<Message> registerExecutor = @@ -5176,7 +5203,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceForward) SlaveID slaveId = offers1.get()[0].slave_id(); TaskInfo task1 = createTask( - slaveId, Resources::parse("cpus:0.5;mem:128").get(), "sleep 1000"); + slaveId, Resources::parse("cpus:0.5;mem:128").get(), SLEEP_COMMAND(1000)); // Message expectations. Future<Message> registerExecutorMessage = @@ -5291,7 +5318,7 @@ TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PidNamespaceBackward) SlaveID slaveId = offers1.get()[0].slave_id(); TaskInfo task1 = createTask( - slaveId, Resources::parse("cpus:0.5;mem:128").get(), "sleep 1000"); + slaveId, Resources::parse("cpus:0.5;mem:128").get(), SLEEP_COMMAND(1000)); // Message expectations. Future<Message> registerExecutorMessage =