Updated Branches: refs/heads/master 576448554 -> a8e36eb86
Fixed an issue where the Master unnecessarily sends a "Framework failed over" message when the scheduler driver retries an initial failover re-registration. Review: https://reviews.apache.org/r/13757 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a8e36eb8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a8e36eb8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a8e36eb8 Branch: refs/heads/master Commit: a8e36eb86ebed1ea328cb8ec798654ff24a27daa Parents: fb2a47a Author: Benjamin Mahler <bmah...@twitter.com> Authored: Thu Aug 22 16:58:40 2013 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Tue Oct 8 11:11:31 2013 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 20 +++++++- src/tests/fault_tolerance_tests.cpp | 81 ++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a8e36eb8/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 8e6a6ea..cdfae1d 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -746,6 +746,12 @@ void Master::reregisterFramework(const FrameworkInfo& frameworkInfo, framework->reregisteredTime = Clock::now(); if (failover) { + // We do not attempt to detect a duplicate re-registration + // message here because it is impossible to distinguish between + // a duplicate message, and a scheduler failover to the same + // pid, given the existing libprocess primitives (PID does not + // identify the libprocess Process instance). + // TODO: Should we check whether the new scheduler has given // us a different framework name, user name or executor info? 
LOG(INFO) << "Framework " << frameworkInfo.id() << " failed over"; @@ -2005,7 +2011,17 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) { const UPID& oldPid = framework->pid; - { + // There are a few failover cases to consider: + // 1. The pid has changed. In this case we definitely want to + // send a FrameworkErrorMessage to shut down the older + // scheduler. + // 2. The pid has not changed. + // 2.1 The old scheduler on that pid failed over to a new + // instance on the same pid. No need to shut down the old + // scheduler as it is necessarily dead. + // 2.2 This is a duplicate message. In this case, the scheduler + // has not failed over, so we do not want to shut it down. + if (oldPid != newPid) { FrameworkErrorMessage message; message.set_message("Framework failed over"); send(oldPid, message); @@ -2022,6 +2038,8 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) allocator->frameworkActivated(framework->id, framework->info); } + // The scheduler driver safely ignores any duplicate registration + // messages, so we don't need to compare the old and new pids here. { FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id); http://git-wip-us.apache.org/repos/asf/mesos/blob/a8e36eb8/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index 10e52c4..254eae4 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -711,6 +711,87 @@ TEST_F(FaultToleranceTest, SchedulerFailover) } +// This test was added to cover a fix for MESOS-659. +// Here, we drop the initial FrameworkRegisteredMessage from the +// master, so that the scheduler driver retries the initial failover +// re-registration. Previously, this caused a "Framework failed over" +// error to be sent to the new scheduler driver!
+TEST_F(FaultToleranceTest, SchedulerFailoverRetriedReregistration) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + // Launch the first (i.e., failing) scheduler and wait until + // registered gets called to launch the second (i.e., failover) + // scheduler. + + MockScheduler sched1; + MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master.get()); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched1, registered(&driver1, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + driver1.start(); + + AWAIT_READY(frameworkId); + + // Now launch the second (i.e., failover) scheduler using the + // framework id recorded from the first scheduler and wait until it + // gets a registered callback. + + MockScheduler sched2; + + FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line. + framework2 = DEFAULT_FRAMEWORK_INFO; + framework2.mutable_id()->MergeFrom(frameworkId.get()); + + MesosSchedulerDriver driver2(&sched2, framework2, master.get()); + + Clock::pause(); + + // Drop the initial FrameworkRegisteredMessage to the failed-over + // scheduler. This ensures the scheduler driver will retry the + // registration. + Future<process::Message> reregistrationMessage = DROP_MESSAGE( + Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); + + // There should be no error received: the master sends the error + // prior to sending the FrameworkRegisteredMessage, so we don't + // need to wait to ensure this does not occur. + EXPECT_CALL(sched2, error(&driver2, "Framework failed over")) + .Times(0); + + Future<Nothing> sched2Registered; + EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _)) + .WillOnce(FutureSatisfy(&sched2Registered)); + + Future<Nothing> sched1Error; + EXPECT_CALL(sched1, error(&driver1, "Framework failed over")) + .WillOnce(FutureSatisfy(&sched1Error)); + + driver2.start(); + + AWAIT_READY(reregistrationMessage); + + // Trigger the re-registration retry.
+ Clock::advance(Seconds(1)); + + AWAIT_READY(sched2Registered); + + AWAIT_READY(sched1Error); + + EXPECT_EQ(DRIVER_STOPPED, driver2.stop()); + EXPECT_EQ(DRIVER_STOPPED, driver2.join()); + + EXPECT_EQ(DRIVER_ABORTED, driver1.stop()); + EXPECT_EQ(DRIVER_STOPPED, driver1.join()); + + Shutdown(); + Clock::resume(); +} + + TEST_F(FaultToleranceTest, FrameworkReliableRegistration) { Clock::pause();