Updated Branches:
  refs/heads/master f00832a1d -> 5eb50d6e8

Added a recovery timeout for executor driver self-termination.

Review: https://reviews.apache.org/r/13791


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5eb50d6e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5eb50d6e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5eb50d6e

Branch: refs/heads/master
Commit: 5eb50d6e8da1f311de30ca5f1218bb5bcba2236c
Parents: f00832a
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Fri Aug 23 19:13:30 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Mon Aug 26 12:52:07 2013 -0700

----------------------------------------------------------------------
 src/exec/exec.cpp                  | 64 +++++++++++++++++++++--
 src/launcher/launcher.cpp          | 10 +++-
 src/launcher/launcher.hpp          |  7 ++-
 src/launcher/main.cpp              | 26 ++++++++-
 src/slave/cgroups_isolator.cpp     |  3 +-
 src/slave/constants.cpp            |  1 +
 src/slave/constants.hpp            |  1 +
 src/slave/flags.hpp                |  9 ++++
 src/slave/process_isolator.cpp     |  3 +-
 src/tests/slave_recovery_tests.cpp | 93 +++++++++++++++++++++++++++++++++
 10 files changed, 206 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index ca61892..d370560 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -106,7 +106,8 @@ public:
                   const ExecutorID& _executorId,
                   bool _local,
                   const string& _directory,
-                  bool _checkpoint)
+                  bool _checkpoint,
+                  Duration _recoveryTimeout)
     : ProcessBase(ID::generate("executor")),
       slave(_slave),
       driver(_driver),
@@ -115,10 +116,12 @@ public:
       frameworkId(_frameworkId),
       executorId(_executorId),
       connected(false),
+      connection(UUID::random()),
       local(_local),
       aborted(false),
       directory(_directory),
-      checkpoint(_checkpoint)
+      checkpoint(_checkpoint),
+      recoveryTimeout(_recoveryTimeout)
   {
     install<ExecutorRegisteredMessage>(
         &ExecutorProcess::registered,
@@ -195,6 +198,7 @@ protected:
     VLOG(1) << "Executor registered on slave " << slaveId;
 
     connected = true;
+    connection = UUID::random();
 
     Stopwatch stopwatch;
     if (FLAGS_v >= 1) {
@@ -216,6 +220,9 @@ protected:
 
     VLOG(1) << "Executor re-registered on slave " << slaveId;
 
+    connected = true;
+    connection = UUID::random();
+
     Stopwatch stopwatch;
     if (FLAGS_v >= 1) {
       stopwatch.start();
@@ -391,6 +398,23 @@ protected:
     aborted = true;
   }
 
+  void _recoveryTimeout(UUID _connection)
+  {
+    // If we're connected, no need to shut down the driver!
+    if (connected) {
+      return;
+    }
+
+    // We need to compare the connections here to ensure there have
+    // not been any subsequent re-registrations with the slave in the
+    // interim.
+    if (connection == _connection) {
+      VLOG(1) << "Recovery timeout of " << recoveryTimeout << " exceeded; "
+              << "Shutting down";
+      shutdown();
+    }
+  }
+
   virtual void exited(const UPID& pid)
   {
     if (aborted) {
@@ -402,13 +426,21 @@ protected:
     // successfully registered with the slave, the slave can reconnect with
     // this executor when it comes back up and performs recovery!
     if (checkpoint && connected) {
+      connected = false;
+
       VLOG(1) << "Slave exited, but framework has checkpointing enabled. "
-              << "Waiting to reconnect with slave " << slaveId;
+              << "Waiting " << recoveryTimeout << " to reconnect with slave "
+              << slaveId;
+
+      delay(recoveryTimeout, self(), &Self::_recoveryTimeout, connection);
+
       return;
     }
 
     VLOG(1) << "Slave exited ... shutting down";
 
+    connected = false;
+
     if (!local) {
       // Start the Shutdown Process.
       spawn(new ShutdownProcess(), true);
@@ -494,10 +526,12 @@ private:
   FrameworkID frameworkId;
   ExecutorID executorId;
   bool connected; // Registered with the slave.
+  UUID connection; // UUID to identify the connection instance.
   bool local;
   bool aborted;
   const string directory;
   bool checkpoint;
+  Duration recoveryTimeout;
 
   LinkedHashMap<UUID, StatusUpdate> updates; // Unacknowledged updates.
 
@@ -587,7 +621,7 @@ Status MesosExecutorDriver::start()
   value = os::getenv("MESOS_SLAVE_PID");
   slave = UPID(value);
   if (!slave) {
-    fatal("cannot parse MESOS_SLAVE_PID");
+    fatal("Cannot parse MESOS_SLAVE_PID '%s'", value.c_str());
   }
 
   // Get slave ID from environment.
@@ -615,6 +649,25 @@ Status MesosExecutorDriver::start()
     checkpoint = false;
   }
 
+  Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
+
+  // Get the recovery timeout if checkpointing is enabled.
+  if (checkpoint) {
+    value = os::getenv("MESOS_RECOVERY_TIMEOUT", false);
+
+    if (!value.empty()) {
+      Try<Duration> _recoveryTimeout = Duration::parse(value);
+
+      if (_recoveryTimeout.isError()) {
+        fatal("Cannot parse MESOS_RECOVERY_TIMEOUT '%s': %s",
+              value.c_str(),
+              _recoveryTimeout.error().c_str());
+      }
+
+      recoveryTimeout = _recoveryTimeout.get();
+    }
+  }
+
   CHECK(process == NULL);
 
   process = new ExecutorProcess(
@@ -626,7 +679,8 @@ Status MesosExecutorDriver::start()
       executorId,
       local,
       workDirectory,
-      checkpoint);
+      checkpoint,
+      recoveryTimeout);
 
   spawn(process);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp
index 004d90e..7bf127a 100644
--- a/src/launcher/launcher.cpp
+++ b/src/launcher/launcher.cpp
@@ -68,7 +68,8 @@ ExecutorLauncher::ExecutorLauncher(
     const string& _hadoopHome,
     bool _redirectIO,
     bool _shouldSwitchUser,
-    bool _checkpoint)
+    bool _checkpoint,
+    Duration _recoveryTimeout)
   : slaveId(_slaveId),
     frameworkId(_frameworkId),
     executorId(_executorId),
@@ -82,7 +83,8 @@ ExecutorLauncher::ExecutorLauncher(
     hadoopHome(_hadoopHome),
     redirectIO(_redirectIO),
     shouldSwitchUser(_shouldSwitchUser),
-    checkpoint (_checkpoint) {}
+    checkpoint(_checkpoint),
+    recoveryTimeout(_recoveryTimeout) {}
 
 
 ExecutorLauncher::~ExecutorLauncher() {}
@@ -456,6 +458,10 @@ map<string, string> ExecutorLauncher::getEnvironment()
   env["MESOS_EXECUTOR_UUID"] = uuid.toString();
   env["MESOS_CHECKPOINT"] = checkpoint ? "1" : "0";
 
+  if (checkpoint) {
+    env["MESOS_RECOVERY_TIMEOUT"] = stringify(recoveryTimeout);
+  }
+
   return env;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/launcher.hpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.hpp b/src/launcher/launcher.hpp
index 637c9bc..104fe81 100644
--- a/src/launcher/launcher.hpp
+++ b/src/launcher/launcher.hpp
@@ -24,6 +24,7 @@
 
 #include <mesos/mesos.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/uuid.hpp>
 
 #include "slave/flags.hpp"
@@ -60,7 +61,8 @@ public:
       const std::string& hadoopHome,
       bool redirectIO,
       bool shouldSwitchUser,
-      bool checkpoint);
+      bool checkpoint,
+      Duration recoveryTimeout);
 
   virtual ~ExecutorLauncher();
 
@@ -111,6 +113,9 @@ protected:
   const bool redirectIO;   // Whether to redirect stdout and stderr to files.
   const bool shouldSwitchUser; // Whether to setuid to framework's user.
   const bool checkpoint; // Whether the framework enabled checkpointing.
+
+  // Executor suicide timeout for slave recovery.
+  const Duration recoveryTimeout;
 };
 
 } // namespace launcher {

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/launcher/main.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/main.cpp b/src/launcher/main.cpp
index 5674afb..de64609 100644
--- a/src/launcher/main.cpp
+++ b/src/launcher/main.cpp
@@ -16,8 +16,11 @@
  * limitations under the License.
  */
 
+#include <string>
+
 #include <mesos/mesos.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/strings.hpp>
 #include <stout/os.hpp>
 
@@ -26,6 +29,8 @@
 using namespace mesos;
 using namespace mesos::internal; // For 'utils'.
 
+using std::string;
+
 
 int main(int argc, char** argv)
 {
@@ -57,6 +62,24 @@ int main(int argc, char** argv)
     commandInfo.add_uris()->MergeFrom(uri);
   }
 
+  bool checkpoint = os::getenv("MESOS_CHECKPOINT", false) == "1";
+
+  Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
+
+  // Get the recovery timeout if checkpointing is enabled.
+  if (checkpoint) {
+    string value = os::getenv("MESOS_RECOVERY_TIMEOUT", false);
+
+    if (!value.empty()) {
+      Try<Duration> _recoveryTimeout = Duration::parse(value);
+
+      CHECK_SOME(_recoveryTimeout)
+        << "Cannot parse MESOS_RECOVERY_TIMEOUT '" + value + "'";
+
+      recoveryTimeout = _recoveryTimeout.get();
+    }
+  }
+
   return mesos::internal::launcher::ExecutorLauncher(
       slaveId,
       frameworkId,
@@ -71,6 +94,7 @@ int main(int argc, char** argv)
       os::getenv("MESOS_HADOOP_HOME"),
       os::getenv("MESOS_REDIRECT_IO") == "1",
       os::getenv("MESOS_SWITCH_USER") == "1",
-      os::getenv("MESOS_CHECKPOINT") == "1")
+      checkpoint,
+      recoveryTimeout)
     .run();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index d4ccd11..676768e 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -623,7 +623,8 @@ void CgroupsIsolator::launchExecutor(
         flags.hadoop_home,
         !local,
         flags.switch_user,
-        frameworkInfo.checkpoint());
+        frameworkInfo.checkpoint(),
+        flags.recovery_timeout);
 
     // First fetch the executor.
     if (launcher.setup() < 0) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index e8d16ca..8c74c00 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -29,6 +29,7 @@ const Duration STATUS_UPDATE_RETRY_INTERVAL = Seconds(10);
 const Duration GC_DELAY = Weeks(1);
 const double GC_DISK_HEADROOM = 0.1;
 const Duration DISK_WATCH_INTERVAL = Minutes(1);
+const Duration RECOVERY_TIMEOUT = Minutes(15);
 const Duration RESOURCE_MONITORING_INTERVAL = Seconds(5);
 const uint32_t MAX_COMPLETED_FRAMEWORKS = 50;
 const uint32_t MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK = 150;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index 901fdf2..bbbbfd3 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -37,6 +37,7 @@ namespace slave {
 extern const Duration EXECUTOR_REGISTRATION_TIMEOUT;
 extern const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD;
 extern const Duration EXECUTOR_REREGISTER_TIMEOUT;
+extern const Duration RECOVERY_TIMEOUT;
 extern const Duration STATUS_UPDATE_RETRY_INTERVAL;
 extern const Duration GC_DELAY;
 extern const Duration DISK_WATCH_INTERVAL;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 616be9b..ea1e4f7 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -141,6 +141,14 @@ public:
         "      and the slave registers with the master as a new slave.",
         "reconnect");
 
+    add(&Flags::recovery_timeout,
+        "recovery_timeout",
+        "Amount of time alloted for the slave to recover. If the slave takes\n"
+        "longer than recovery_timeout to recover, any executors that are\n"
+        "waiting to reconnect to the slave will self-terminate.\n"
+        "NOTE: This flag is only applicable when checkpoint is enabled.\n",
+        RECOVERY_TIMEOUT);
+
     add(&Flags::strict,
         "strict",
         "If strict=true, any and all recovery errors are considered fatal.\n"
@@ -189,6 +197,7 @@ public:
   Duration resource_monitoring_interval;
   bool checkpoint;
   std::string recover;
+  Duration recovery_timeout;
   bool strict;
 #ifdef __linux__
   std::string cgroups_hierarchy;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index 24a7fb2..fa80293 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -136,7 +136,8 @@ void ProcessIsolator::launchExecutor(
       flags.hadoop_home,
       !local,
       flags.switch_user,
-      frameworkInfo.checkpoint());
+      frameworkInfo.checkpoint(),
+      flags.recovery_timeout);
 
   // We get the environment map for launching mesos-launcher before
   // the fork, because we have seen deadlock issues with ostringstream

http://git-wip-us.apache.org/repos/asf/mesos/blob/5eb50d6e/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 0735dba..57636c1 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -690,6 +690,99 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
 }
 
 
+// The slave is stopped after a non-terminal update is received.
+// The command executor is expected to self-terminate while the slave
+// is down, because the recovery timeout elapses.
+// When the slave comes back up with recovery=reconnect, make
+// sure the task is properly transitioned to FAILED.
+TYPED_TEST(SlaveRecoveryTest, RecoveryTimeout)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  TypeParam isolator1;
+
+  // Set a short recovery timeout, as we can't control the executor
+  // driver time when using the process / cgroups isolators.
+  slave::Flags flags = this->CreateSlaveFlags();
+  flags.recovery_timeout = Milliseconds(1);
+
+  Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task.
+
+  EXPECT_CALL(sched, statusUpdate(_, _));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  this->Stop(slave.get());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Ensure the executor terminates by causing the recovery timeout
+  // to elapse while disconnected from the slave.
+  os::sleep(Milliseconds(1));
+
+  Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+  // Restart the slave (use same flags) with a new isolator.
+  TypeParam isolator2;
+
+  slave = this->StartSlave(&isolator2, flags);
+  ASSERT_SOME(slave);
+
+  Clock::pause();
+
+  AWAIT_READY(_recover);
+
+  Clock::advance(EXECUTOR_REREGISTER_TIMEOUT);
+  Clock::settle();
+
+  Clock::resume();
+
+  // Scheduler should receive the TASK_FAILED update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_FAILED, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}
+
+
 // The slave is stopped after an executor is completed (i.e., it has
 // terminated and all its updates have been acknowledged).
 // When it comes back up with recovery=reconnect, make

Reply via email to