Updated Branches: refs/heads/master f00832a1d -> 5eb50d6e8
Added a recovery timeout for executor driver self-termination. Review: https://reviews.apache.org/r/13791 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5eb50d6e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5eb50d6e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5eb50d6e Branch: refs/heads/master Commit: 5eb50d6e8da1f311de30ca5f1218bb5bcba2236c Parents: f00832a Author: Benjamin Mahler <bmah...@twitter.com> Authored: Fri Aug 23 19:13:30 2013 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Mon Aug 26 12:52:07 2013 -0700 ---------------------------------------------------------------------- src/exec/exec.cpp | 64 +++++++++++++++++++++-- src/launcher/launcher.cpp | 10 +++- src/launcher/launcher.hpp | 7 ++- src/launcher/main.cpp | 26 ++++++++- src/slave/cgroups_isolator.cpp | 3 +- src/slave/constants.cpp | 1 + src/slave/constants.hpp | 1 + src/slave/flags.hpp | 9 ++++ src/slave/process_isolator.cpp | 3 +- src/tests/slave_recovery_tests.cpp | 93 +++++++++++++++++++++++++++++++++ 10 files changed, 206 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/exec/exec.cpp ---------------------------------------------------------------------- diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp index ca61892..d370560 100644 --- a/src/exec/exec.cpp +++ b/src/exec/exec.cpp @@ -106,7 +106,8 @@ public: const ExecutorID& _executorId, bool _local, const string& _directory, - bool _checkpoint) + bool _checkpoint, + Duration _recoveryTimeout) : ProcessBase(ID::generate("executor")), slave(_slave), driver(_driver), @@ -115,10 +116,12 @@ public: frameworkId(_frameworkId), executorId(_executorId), connected(false), + connection(UUID::random()), local(_local), aborted(false), directory(_directory), - checkpoint(_checkpoint) + checkpoint(_checkpoint), + recoveryTimeout(_recoveryTimeout) { install<ExecutorRegisteredMessage>( &ExecutorProcess::registered, @@ -195,6 +198,7 @@ protected: VLOG(1) << "Executor registered on slave " << slaveId; connected = true; + connection = UUID::random(); Stopwatch stopwatch; if (FLAGS_v >= 1) { @@ -216,6 +220,9 @@ protected: VLOG(1) << "Executor re-registered on slave " << slaveId; + connected = true; + connection = UUID::random(); + Stopwatch stopwatch; if (FLAGS_v >= 1) { stopwatch.start(); @@ -391,6 +398,23 @@ protected: aborted = true; } + void _recoveryTimeout(UUID _connection) + { + // If we're connected, no need to shut down the driver! + if (connected) { + return; + } + + // We need to compare the connections here to ensure there have + // not been any subsequent re-registrations with the slave in the + // interim. + if (connection == _connection) { + VLOG(1) << "Recovery timeout of " << recoveryTimeout << " exceeded; " + << "Shutting down"; + shutdown(); + } + } + virtual void exited(const UPID& pid) { if (aborted) { @@ -402,13 +426,21 @@ protected: // successfully registered with the slave, the slave can reconnect with // this executor when it comes back up and performs recovery! if (checkpoint && connected) { + connected = false; + VLOG(1) << "Slave exited, but framework has checkpointing enabled. " - << "Waiting to reconnect with slave " << slaveId; + << "Waiting " << recoveryTimeout << " to reconnect with slave " + << slaveId; + + delay(recoveryTimeout, self(), &Self::_recoveryTimeout, connection); + return; } VLOG(1) << "Slave exited ... shutting down"; + connected = false; + if (!local) { // Start the Shutdown Process. spawn(new ShutdownProcess(), true); @@ -494,10 +526,12 @@ private: FrameworkID frameworkId; ExecutorID executorId; bool connected; // Registered with the slave. + UUID connection; // UUID to identify the connection instance. bool local; bool aborted; const string directory; bool checkpoint; + Duration recoveryTimeout; LinkedHashMap<UUID, StatusUpdate> updates; // Unacknowledged updates. @@ -587,7 +621,7 @@ Status MesosExecutorDriver::start() value = os::getenv("MESOS_SLAVE_PID"); slave = UPID(value); if (!slave) { - fatal("cannot parse MESOS_SLAVE_PID"); + fatal("Cannot parse MESOS_SLAVE_PID '%s'", value.c_str()); } // Get slave ID from environment. @@ -615,6 +649,25 @@ Status MesosExecutorDriver::start() checkpoint = false; } + Duration recoveryTimeout = slave::RECOVERY_TIMEOUT; + + // Get the recovery timeout if checkpointing is enabled. + if (checkpoint) { + value = os::getenv("MESOS_RECOVERY_TIMEOUT", false); + + if (!value.empty()) { + Try<Duration> _recoveryTimeout = Duration::parse(value); + + if (_recoveryTimeout.isError()) { + fatal("Cannot parse MESOS_RECOVERY_TIMEOUT '%s': %s", + value.c_str(), + _recoveryTimeout.error().c_str()); + } + + recoveryTimeout = _recoveryTimeout.get(); + } + } + CHECK(process == NULL); process = new ExecutorProcess( @@ -626,7 +679,8 @@ Status MesosExecutorDriver::start() executorId, local, workDirectory, - checkpoint); + checkpoint, + recoveryTimeout); spawn(process); http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/launcher.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp index 004d90e..7bf127a 100644 --- a/src/launcher/launcher.cpp +++ b/src/launcher/launcher.cpp @@ -68,7 +68,8 @@ ExecutorLauncher::ExecutorLauncher( const string& _hadoopHome, bool _redirectIO, bool _shouldSwitchUser, - bool _checkpoint) + bool _checkpoint, + Duration _recoveryTimeout) : slaveId(_slaveId), frameworkId(_frameworkId), executorId(_executorId), @@ -82,7 +83,8 @@ ExecutorLauncher::ExecutorLauncher( hadoopHome(_hadoopHome), redirectIO(_redirectIO), shouldSwitchUser(_shouldSwitchUser), - checkpoint (_checkpoint) {} + checkpoint(_checkpoint), + recoveryTimeout(_recoveryTimeout) {} ExecutorLauncher::~ExecutorLauncher() {} @@ -456,6 +458,10 @@ map<string, string> ExecutorLauncher::getEnvironment() env["MESOS_EXECUTOR_UUID"] = uuid.toString(); env["MESOS_CHECKPOINT"] = checkpoint ? "1" : "0"; + if (checkpoint) { + env["MESOS_RECOVERY_TIMEOUT"] = stringify(recoveryTimeout); + } + return env; } http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/launcher.hpp ---------------------------------------------------------------------- diff --git a/src/launcher/launcher.hpp b/src/launcher/launcher.hpp index 637c9bc..104fe81 100644 --- a/src/launcher/launcher.hpp +++ b/src/launcher/launcher.hpp @@ -24,6 +24,7 @@ #include <mesos/mesos.hpp> +#include <stout/duration.hpp> #include <stout/uuid.hpp> #include "slave/flags.hpp" @@ -60,7 +61,8 @@ public: const std::string& hadoopHome, bool redirectIO, bool shouldSwitchUser, - bool checkpoint); + bool checkpoint, + Duration recoveryTimeout); virtual ~ExecutorLauncher(); @@ -111,6 +113,9 @@ protected: const bool redirectIO; // Whether to redirect stdout and stderr to files. const bool shouldSwitchUser; // Whether to setuid to framework's user. const bool checkpoint; // Whether the framework enabled checkpointing. + + // Executor suicide timeout for slave recovery. + const Duration recoveryTimeout; }; } // namespace launcher { http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/main.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/main.cpp b/src/launcher/main.cpp index 5674afb..de64609 100644 --- a/src/launcher/main.cpp +++ b/src/launcher/main.cpp @@ -16,8 +16,11 @@ * limitations under the License. */ +#include <string> + #include <mesos/mesos.hpp> +#include <stout/duration.hpp> #include <stout/strings.hpp> #include <stout/os.hpp> @@ -26,6 +29,8 @@ using namespace mesos; using namespace mesos::internal; // For 'utils'. +using std::string; + int main(int argc, char** argv) { @@ -57,6 +62,24 @@ int main(int argc, char** argv) commandInfo.add_uris()->MergeFrom(uri); } + bool checkpoint = os::getenv("MESOS_CHECKPOINT", false) == "1"; + + Duration recoveryTimeout = slave::RECOVERY_TIMEOUT; + + // Get the recovery timeout if checkpointing is enabled. + if (checkpoint) { + string value = os::getenv("MESOS_RECOVERY_TIMEOUT", false); + + if (!value.empty()) { + Try<Duration> _recoveryTimeout = Duration::parse(value); + + CHECK_SOME(_recoveryTimeout) + << "Cannot parse MESOS_RECOVERY_TIMEOUT '" + value + "'"; + + recoveryTimeout = _recoveryTimeout.get(); + } + } + return mesos::internal::launcher::ExecutorLauncher( slaveId, frameworkId, @@ -71,6 +94,7 @@ int main(int argc, char** argv) os::getenv("MESOS_HADOOP_HOME"), os::getenv("MESOS_REDIRECT_IO") == "1", os::getenv("MESOS_SWITCH_USER") == "1", - os::getenv("MESOS_CHECKPOINT") == "1") + checkpoint, + recoveryTimeout) .run(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/cgroups_isolator.cpp ---------------------------------------------------------------------- diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp index d4ccd11..676768e 100644 --- a/src/slave/cgroups_isolator.cpp +++ b/src/slave/cgroups_isolator.cpp @@ -623,7 +623,8 @@ void CgroupsIsolator::launchExecutor( flags.hadoop_home, !local, flags.switch_user, - frameworkInfo.checkpoint()); + frameworkInfo.checkpoint(), + flags.recovery_timeout); // First fetch the executor. if (launcher.setup() < 0) { http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/constants.cpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp index e8d16ca..8c74c00 100644 --- a/src/slave/constants.cpp +++ b/src/slave/constants.cpp @@ -29,6 +29,7 @@ const Duration STATUS_UPDATE_RETRY_INTERVAL = Seconds(10); const Duration GC_DELAY = Weeks(1); const double GC_DISK_HEADROOM = 0.1; const Duration DISK_WATCH_INTERVAL = Minutes(1); +const Duration RECOVERY_TIMEOUT = Minutes(15); const Duration RESOURCE_MONITORING_INTERVAL = Seconds(5); const uint32_t MAX_COMPLETED_FRAMEWORKS = 50; const uint32_t MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK = 150; http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/constants.hpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp index 901fdf2..bbbbfd3 100644 --- a/src/slave/constants.hpp +++ b/src/slave/constants.hpp @@ -37,6 +37,7 @@ namespace slave { extern const Duration EXECUTOR_REGISTRATION_TIMEOUT; extern const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD; extern const Duration EXECUTOR_REREGISTER_TIMEOUT; +extern const Duration RECOVERY_TIMEOUT; extern const Duration STATUS_UPDATE_RETRY_INTERVAL; extern const Duration GC_DELAY; extern const Duration DISK_WATCH_INTERVAL; http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/flags.hpp ---------------------------------------------------------------------- diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp index 616be9b..ea1e4f7 100644 --- a/src/slave/flags.hpp +++ b/src/slave/flags.hpp @@ -141,6 +141,14 @@ public: " and the slave registers with the master as a new slave.", "reconnect"); + add(&Flags::recovery_timeout, + "recovery_timeout", + "Amount of time alloted for the slave to recover. If the slave takes\n" + "longer than recovery_timeout to recover, any executors that are\n" + "waiting to reconnect to the slave will self-terminate.\n" + "NOTE: This flag is only applicable when checkpoint is enabled.\n", + RECOVERY_TIMEOUT); + add(&Flags::strict, "strict", "If strict=true, any and all recovery errors are considered fatal.\n" @@ -189,6 +197,7 @@ public: Duration resource_monitoring_interval; bool checkpoint; std::string recover; + Duration recovery_timeout; bool strict; #ifdef __linux__ std::string cgroups_hierarchy; http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/process_isolator.cpp ---------------------------------------------------------------------- diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp index 24a7fb2..fa80293 100644 --- a/src/slave/process_isolator.cpp +++ b/src/slave/process_isolator.cpp @@ -136,7 +136,8 @@ void ProcessIsolator::launchExecutor( flags.hadoop_home, !local, flags.switch_user, - frameworkInfo.checkpoint()); + frameworkInfo.checkpoint(), + flags.recovery_timeout); // We get the environment map for launching mesos-launcher before // the fork, because we have seen deadlock issues with ostringstream http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 0735dba..57636c1 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -690,6 +690,99 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor) } +// The slave is stopped after a non-terminal update is received. +// The command executor is expected to self-terminate while the slave +// is down, because the recovery timeout elapses. +// When the slave comes back up with recovery=reconnect, make +// sure the task is properly transitioned to FAILED. +TYPED_TEST(SlaveRecoveryTest, RecoveryTimeout) +{ + Try<PID<Master> > master = this->StartMaster(); + ASSERT_SOME(master); + + TypeParam isolator1; + + // Set a short recovery timeout, as we can't control the executor + // driver time when using the process / cgroups isolators. + slave::Flags flags = this->CreateSlaveFlags(); + flags.recovery_timeout = Milliseconds(1); + + Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags); + ASSERT_SOME(slave); + + MockScheduler sched; + + // Enable checkpointing for the framework. + FrameworkInfo frameworkInfo; + frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); + frameworkInfo.set_checkpoint(true); + + MesosSchedulerDriver driver(&sched, frameworkInfo, master.get()); + + EXPECT_CALL(sched, registered(_, _, _)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + TaskInfo task = createTask(offers.get()[0], "sleep 1000"); + vector<TaskInfo> tasks; + tasks.push_back(task); // Long-running task. + + EXPECT_CALL(sched, statusUpdate(_, _)); + + Future<Nothing> _statusUpdateAcknowledgement = + FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); + + driver.launchTasks(offers.get()[0].id(), tasks); + + // Wait for the ACK to be checkpointed. + AWAIT_READY(_statusUpdateAcknowledgement); + + this->Stop(slave.get()); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureArg<1>(&status)); + + // Ensure the executor terminates by causing the recovery timeout + // to elapse while disconnected from the slave. + os::sleep(Milliseconds(1)); + + Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); + + // Restart the slave (use same flags) with a new isolator. + TypeParam isolator2; + + slave = this->StartSlave(&isolator2, flags); + ASSERT_SOME(slave); + + Clock::pause(); + + AWAIT_READY(_recover); + + Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); + Clock::settle(); + + Clock::resume(); + + // Scheduler should receive the TASK_FAILED update. + AWAIT_READY(status); + ASSERT_EQ(TASK_FAILED, status.get().state()); + + driver.stop(); + driver.join(); + + this->Shutdown(); // Shutdown before isolator(s) get deallocated. +} + + // The slave is stopped after an executor is completed (i.e., it has // terminated and all its updates have been acknowledged). // When it comes back up with recovery=reconnect, make