dongjoon-hyun commented on a change in pull request #24276: 
[SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks
URL: https://github.com/apache/spark/pull/24276#discussion_r282975909
 
 

 ##########
 File path: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
 ##########
 @@ -450,6 +450,84 @@ class MesosClusterSchedulerSuite extends SparkFunSuite 
with LocalSparkContext wi
     assert(state.finishedDrivers.size == 1)
   }
 
+  test("SPARK-27347: do not restart outdated supervised drivers") {
+    // Covers scenario where:
+    // - agent goes down
+    // - supervised job is relaunched on another agent
+    // - first agent re-registers and sends status update: TASK_FAILED
+    // - job should NOT be relaunched again
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("SparkMesosDriverRetries")
+    setScheduler(conf.getAll.toMap)
+
+    val mem = 1000
+    val cpu = 1
+    val offers = List(
+      Utils.createOffer("o1", "s1", mem, cpu, None),
+      Utils.createOffer("o2", "s2", mem, cpu, None),
+      Utils.createOffer("o3", "s1", mem, cpu, None))
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", 
"test")), "sub1", new Date()))
+    assert(response.success)
+
+    // Offer a resource to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+
+    // Signal that the agent was lost by sending a TASK_LOST status update
+    val agent1 = SlaveID.newBuilder().setValue("s1").build()
+    var taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
+      .setState(MesosTaskState.TASK_LOST)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.size == 1)
+    assert(state.launchedDrivers.isEmpty)
 
 Review comment:
   Thanks!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to