Added test cases for PARTITION_AWARE behavior. Review: https://reviews.apache.org/r/50706/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ba79dae Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ba79dae Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ba79dae Branch: refs/heads/master Commit: 9ba79daead85cb84759582e1aacf41eea0d43054 Parents: 937c85f Author: Neil Conway <neil.con...@gmail.com> Authored: Mon Sep 19 15:47:09 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Mon Sep 19 15:47:09 2016 -0700 ---------------------------------------------------------------------- src/tests/partition_tests.cpp | 514 ++++++++++++++++++++++++++++++++++++- 1 file changed, 510 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9ba79dae/src/tests/partition_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp index 34622b3..efdb49e 100644 --- a/src/tests/partition_tests.cpp +++ b/src/tests/partition_tests.cpp @@ -179,7 +179,7 @@ TEST_P(PartitionTest, ReregisterSlavePartitionAware) // Allow the master to PING the slave, but drop all PONG messages // from the slave. Note that we don't match on the master / slave - // PIDs because it's actually the SlaveObserver Process that sends + // PIDs because it's actually the `SlaveObserver` process that sends // the pings. Future<Message> ping = FUTURE_MESSAGE( Eq(PingSlaveMessage().GetTypeName()), _, _); @@ -326,7 +326,7 @@ TEST_P(PartitionTest, ReregisterSlaveNotPartitionAware) // Allow the master to PING the slave, but drop all PONG messages // from the slave. Note that we don't match on the master / slave - // PIDs because it's actually the SlaveObserver Process that sends + // PIDs because it's actually the `SlaveObserver` process that sends // the pings. 
Future<Message> ping = FUTURE_MESSAGE( Eq(PingSlaveMessage().GetTypeName()), _, _); @@ -458,6 +458,512 @@ TEST_P(PartitionTest, ReregisterSlaveNotPartitionAware) } +// This tests that an agent can reregister with the master after a +// partition in which the master has failed over while the agent was +// partitioned. We use one agent and two schedulers; one scheduler +// enables the PARTITION_AWARE capability, while the other does +// not. Both tasks should survive the reregistration of the partitioned +// agent: we allow the non-partition-aware task to continue running for +// backward compatibility with the "non-strict" Mesos 1.0 behavior. +TEST_P(PartitionTest, PartitionedSlaveReregistrationMasterFailover) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + masterFlags.registry_strict = GetParam(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + // Allow the master to PING the slave, but drop all PONG messages + // from the slave. Note that we don't match on the master / slave + // PIDs because it's actually the `SlaveObserver` process that sends + // the pings. + Future<Message> ping = FUTURE_MESSAGE( + Eq(PingSlaveMessage().GetTypeName()), _, _); + + DROP_PROTOBUFS(PongSlaveMessage(), _, _); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.resources = "cpus:2;mem:1024"; + + StandaloneMasterDetector detector(master.get()->pid); + + Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags); + ASSERT_SOME(slave); + + // Connect the first scheduler (not PARTITION_AWARE). + MockScheduler sched1; + TestingMesosSchedulerDriver driver1(&sched1, &detector); + + EXPECT_CALL(sched1, registered(&driver1, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + driver1.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers.get().empty()); + + Offer offer = offers.get()[0]; + + Resources taskResources = Resources::parse("cpus:1;mem:512").get(); + + EXPECT_TRUE(Resources(offer.resources()).contains(taskResources)); + + // Launch `task1` using `sched1`. + TaskInfo task1 = createTask(offer.slave_id(), taskResources, "sleep 60"); + + Future<TaskStatus> runningStatus1; + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&runningStatus1)); + + Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + driver1.launchTasks(offer.id(), {task1}); + + AWAIT_READY(runningStatus1); + EXPECT_EQ(TASK_RUNNING, runningStatus1.get().state()); + EXPECT_EQ(task1.task_id(), runningStatus1.get().task_id()); + + const SlaveID slaveId = runningStatus1.get().slave_id(); + + AWAIT_READY(statusUpdateAck1); + + // Connect the second scheduler (PARTITION_AWARE). + FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO; + frameworkInfo2.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + + MockScheduler sched2; + TestingMesosSchedulerDriver driver2(&sched2, &detector, frameworkInfo2); + + EXPECT_CALL(sched2, registered(&driver2, _, _)); + + EXPECT_CALL(sched2, resourceOffers(&driver2, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver2.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers.get().empty()); + + offer = offers.get()[0]; + + EXPECT_TRUE(Resources(offer.resources()).contains(taskResources)); + + // Launch the second task. 
+ TaskInfo task2 = createTask(offer.slave_id(), taskResources, "sleep 60"); + + Future<TaskStatus> runningStatus2; + EXPECT_CALL(sched2, statusUpdate(&driver2, _)) + .WillOnce(FutureArg<1>(&runningStatus2)); + + Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + driver2.launchTasks(offer.id(), {task2}); + + AWAIT_READY(runningStatus2); + EXPECT_EQ(TASK_RUNNING, runningStatus2.get().state()); + EXPECT_EQ(task2.task_id(), runningStatus2.get().task_id()); + EXPECT_EQ(slaveId, runningStatus2.get().slave_id()); + + AWAIT_READY(statusUpdateAck2); + + // Now, induce a partition of the slave by having the master + // timeout the slave. + Future<TaskStatus> lostStatus; + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&lostStatus)); + + Future<TaskStatus> unreachableStatus; + EXPECT_CALL(sched2, statusUpdate(&driver2, _)) + .WillOnce(FutureArg<1>(&unreachableStatus)); + + // Note that we expect to get `slaveLost` callbacks in both + // schedulers, regardless of PARTITION_AWARE. + Future<Nothing> slaveLost1; + EXPECT_CALL(sched1, slaveLost(&driver1, _)) + .WillOnce(FutureSatisfy(&slaveLost1)); + + Future<Nothing> slaveLost2; + EXPECT_CALL(sched2, slaveLost(&driver2, _)) + .WillOnce(FutureSatisfy(&slaveLost2)); + + size_t pings = 0; + while (true) { + AWAIT_READY(ping); + pings++; + if (pings == masterFlags.max_agent_ping_timeouts) { + break; + } + ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _); + Clock::advance(masterFlags.agent_ping_timeout); + } + + Clock::advance(masterFlags.agent_ping_timeout); + + // `sched1` should see TASK_LOST. + AWAIT_READY(lostStatus); + EXPECT_EQ(TASK_LOST, lostStatus.get().state()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, lostStatus.get().reason()); + EXPECT_EQ(task1.task_id(), lostStatus.get().task_id()); + EXPECT_EQ(slaveId, lostStatus.get().slave_id()); + + // `sched2` should see TASK_UNREACHABLE. 
+ AWAIT_READY(unreachableStatus); + EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus.get().reason()); + EXPECT_EQ(task2.task_id(), unreachableStatus.get().task_id()); + EXPECT_EQ(slaveId, unreachableStatus.get().slave_id()); + + // The master should notify both schedulers that the slave was lost. + AWAIT_READY(slaveLost1); + AWAIT_READY(slaveLost2); + + EXPECT_CALL(sched1, disconnected(&driver1)); + EXPECT_CALL(sched2, disconnected(&driver2)); + + // Simulate master failover. + master->reset(); + master = StartMaster(); + ASSERT_SOME(master); + + // Settle the clock to ensure the master finishes recovering the registry. + Clock::settle(); + + Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF( + SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid); + + Future<Nothing> registered1; + EXPECT_CALL(sched1, registered(&driver1, _, _)) + .WillOnce(FutureSatisfy(®istered1)); + + Future<Nothing> registered2; + EXPECT_CALL(sched2, registered(&driver2, _, _)) + .WillOnce(FutureSatisfy(®istered2)); + + // Simulate a new master detected event to the slave and the schedulers. + detector.appoint(master.get()->pid); + + // Wait for slave to reregister. + AWAIT_READY(slaveReregistered); + + // Wait for both schedulers to reregister. + AWAIT_READY(registered1); + AWAIT_READY(registered2); + + // Have each scheduler perform explicit reconciliation. Both `task1` and + // `task2` should be running: `task2` because it is PARTITION_AWARE, + // `task1` because the master has failed over and we emulate the old + // "non-strict" semantics. + TaskStatus status1; + status1.mutable_task_id()->CopyFrom(task1.task_id()); + status1.mutable_slave_id()->CopyFrom(slaveId); + status1.set_state(TASK_STAGING); // Dummy value. 
+ + Future<TaskStatus> reconcileUpdate1; + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&reconcileUpdate1)); + + driver1.reconcileTasks({status1}); + + AWAIT_READY(reconcileUpdate1); + EXPECT_EQ(TASK_RUNNING, reconcileUpdate1.get().state()); + EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason()); + + TaskStatus status2; + status2.mutable_task_id()->CopyFrom(task2.task_id()); + status2.mutable_slave_id()->CopyFrom(slaveId); + status2.set_state(TASK_STAGING); // Dummy value. + + Future<TaskStatus> reconcileUpdate2; + EXPECT_CALL(sched2, statusUpdate(&driver2, _)) + .WillOnce(FutureArg<1>(&reconcileUpdate2)); + + driver2.reconcileTasks({status2}); + + AWAIT_READY(reconcileUpdate2); + EXPECT_EQ(TASK_RUNNING, reconcileUpdate2.get().state()); + EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason()); + + Clock::resume(); + + driver1.stop(); + driver1.join(); + + driver2.stop(); + driver2.join(); +} + + +// This test case causes a slave to be partitioned while it is running +// a task for a PARTITION_AWARE scheduler. The scheduler disconnects +// before the partition heals. Right now, the task is left running as +// an orphan; once MESOS-4659 is fixed, the task should be shut down. +TEST_P(PartitionTest, PartitionedSlaveOrphanedTask) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + masterFlags.registry_strict = GetParam(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + // Allow the master to PING the slave, but drop all PONG messages + // from the slave. Note that we don't match on the master / slave + // PIDs because it's actually the `SlaveObserver` process that sends + // the pings. 
+ Future<Message> ping = FUTURE_MESSAGE( + Eq(PingSlaveMessage().GetTypeName()), _, _); + + DROP_PROTOBUFS(PongSlaveMessage(), _, _); + + StandaloneMasterDetector detector(master.get()->pid); + + Try<Owned<cluster::Slave>> slave = StartSlave(&detector); + ASSERT_SOME(slave); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + AWAIT_READY(frameworkId); + AWAIT_READY(offers); + ASSERT_FALSE(offers.get().empty()); + + Offer offer = offers.get()[0]; + + // Launch `task` using `sched`. + TaskInfo task = createTask(offer, "sleep 60"); + + Future<TaskStatus> runningStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&runningStatus)); + + Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + driver.launchTasks(offer.id(), {task}); + + AWAIT_READY(runningStatus); + EXPECT_EQ(TASK_RUNNING, runningStatus.get().state()); + EXPECT_EQ(task.task_id(), runningStatus.get().task_id()); + + const SlaveID slaveId = runningStatus.get().slave_id(); + + AWAIT_READY(statusUpdateAck); + + // Now, induce a partition of the slave by having the master + // timeout the slave. 
+ + Future<TaskStatus> unreachableStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&unreachableStatus)); + + Future<Nothing> slaveLost; + EXPECT_CALL(sched, slaveLost(&driver, _)) + .WillOnce(FutureSatisfy(&slaveLost)); + + size_t pings = 0; + while (true) { + AWAIT_READY(ping); + pings++; + if (pings == masterFlags.max_agent_ping_timeouts) { + break; + } + ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _); + Clock::advance(masterFlags.agent_ping_timeout); + } + + Clock::advance(masterFlags.agent_ping_timeout); + + AWAIT_READY(unreachableStatus); + EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus.get().state()); + EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus.get().reason()); + EXPECT_EQ(task.task_id(), unreachableStatus.get().task_id()); + EXPECT_EQ(slaveId, unreachableStatus.get().slave_id()); + + AWAIT_READY(slaveLost); + + // Disconnect the scheduler. The default `failover_timeout` is 0, so + // the framework's tasks should be shut down when the slave + // reregisters, but this is currently not implemented (MESOS-4659). + driver.stop(); + driver.join(); + + // Simulate a master loss event at the slave and then cause the + // slave to reregister with the master. + detector.appoint(None()); + + Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF( + SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid); + + detector.appoint(master.get()->pid); + + AWAIT_READY(slaveReregistered); + + // Check if `task` is still running by querying master's state endpoint. 
+ Future<Response> response = process::http::get( + master.get()->pid, + "state", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); + ASSERT_SOME(parse); + + JSON::Object state = parse.get(); + JSON::Array completedFrameworks = + state.values["completed_frameworks"].as<JSON::Array>(); + + EXPECT_EQ(1u, completedFrameworks.values.size()); + + JSON::Object jsonFramework = + completedFrameworks.values.front().as<JSON::Object>(); + + JSON::String jsonFrameworkId = jsonFramework.values["id"].as<JSON::String>(); + + EXPECT_EQ(frameworkId.get(), jsonFrameworkId.value); + + // TODO(neilc): Update this when MESOS-4659 is fixed. + JSON::Array orphanTasks = state.values["orphan_tasks"].as<JSON::Array>(); + + EXPECT_EQ(1u, orphanTasks.values.size()); + + JSON::Object jsonTask = orphanTasks.values.front().as<JSON::Object>(); + JSON::String jsonTaskId = jsonTask.values["id"].as<JSON::String>(); + + EXPECT_EQ(task.task_id(), jsonTaskId.value); + + Clock::resume(); +} + + +// This test checks that when a registered slave reregisters with the +// master (e.g., because of a spurious Zk leader flag at the slave), +// the master does not kill any tasks on the slave, even if those +// tasks are not PARTITION_AWARE. +TEST_P(PartitionTest, SpuriousSlaveReregistration) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + masterFlags.registry_strict = GetParam(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + StandaloneMasterDetector detector(master.get()->pid); + + Try<Owned<cluster::Slave>> slave = StartSlave(&detector); + ASSERT_SOME(slave); + + // The framework should not be PARTITION_AWARE, since tasks started + // by PARTITION_AWARE frameworks will never be killed on reregistration. 
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + CHECK(!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + AWAIT_READY(frameworkId); + AWAIT_READY(offers); + ASSERT_FALSE(offers.get().empty()); + + Offer offer = offers.get()[0]; + + // Launch `task` using `sched`. + TaskInfo task = createTask(offer, "sleep 60"); + + Future<TaskStatus> runningStatus; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&runningStatus)); + + Future<Nothing> statusUpdateAck = FUTURE_DISPATCH( + slave.get()->pid, &Slave::_statusUpdateAcknowledgement); + + driver.launchTasks(offer.id(), {task}); + + AWAIT_READY(runningStatus); + EXPECT_EQ(TASK_RUNNING, runningStatus.get().state()); + EXPECT_EQ(task.task_id(), runningStatus.get().task_id()); + + const SlaveID slaveId = runningStatus.get().slave_id(); + + AWAIT_READY(statusUpdateAck); + + // Simulate a master loss event at the slave and then cause the + // slave to reregister with the master. From the master's + // perspective, the slave reregisters while it was still both + // connected and registered. + detector.appoint(None()); + + Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF( + SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid); + + detector.appoint(master.get()->pid); + + AWAIT_READY(slaveReregistered); + + // Perform explicit reconciliation. The task should still be running. 
+ TaskStatus status; + status.mutable_task_id()->CopyFrom(task.task_id()); + status.mutable_slave_id()->CopyFrom(slaveId); + status.set_state(TASK_STAGING); // Dummy value. + + Future<TaskStatus> reconcileUpdate; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&reconcileUpdate)); + + driver.reconcileTasks({status}); + + AWAIT_READY(reconcileUpdate); + EXPECT_EQ(TASK_RUNNING, reconcileUpdate.get().state()); + EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate.get().reason()); + + driver.stop(); + driver.join(); + + Clock::resume(); +} + + // This test checks how Mesos behaves when a slave is removed because // it fails health checks, and then the slave sends a status update // (because it does not realize that it is partitioned from the @@ -476,7 +982,7 @@ TEST_P(PartitionTest, PartitionedSlaveStatusUpdates) // Drop both PINGs from master to slave and PONGs from slave to // master. Note that we don't match on the master / slave PIDs - // because it's actually the SlaveObserver Process that sends pings + // because it's actually the `SlaveObserver` process that sends pings // and receives pongs. DROP_PROTOBUFS(PingSlaveMessage(), _, _); DROP_PROTOBUFS(PongSlaveMessage(), _, _); @@ -627,7 +1133,7 @@ TEST_P(PartitionTest, PartitionedSlaveExitedExecutor) // Allow the master to PING the slave, but drop all PONG messages // from the slave. Note that we don't match on the master / slave - // PIDs because it's actually the SlaveObserver Process that sends + // PIDs because it's actually the `SlaveObserver` process that sends // the pings. Future<Message> ping = FUTURE_MESSAGE( Eq(PingSlaveMessage().GetTypeName()), _, _);