samvantran commented on a change in pull request #24276: [SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks
URL: https://github.com/apache/spark/pull/24276#discussion_r282973452
 
 

 ##########
 File path: resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
 ##########
 @@ -450,6 +450,84 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(state.finishedDrivers.size == 1)
   }
 
+  test("SPARK-27347: do not restart outdated supervised drivers") {
+    // Covers scenario where:
+    // - agent goes down
+    // - supervised job is relaunched on another agent
+    // - first agent re-registers and sends status update: TASK_FAILED
+    // - job should NOT be relaunched again
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("SparkMesosDriverRetries")
+    setScheduler(conf.getAll.toMap)
+
+    val mem = 1000
+    val cpu = 1
+    val offers = List(
+      Utils.createOffer("o1", "s1", mem, cpu, None),
+      Utils.createOffer("o2", "s2", mem, cpu, None),
+      Utils.createOffer("o3", "s1", mem, cpu, None))
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
+    assert(response.success)
+
+    // Offer a resource to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+
+    // Signal agent loss with a TASK_LOST status update
+    val agent1 = SlaveID.newBuilder().setValue("s1").build()
+    var taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
+      .setState(MesosTaskState.TASK_LOST)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.size == 1)
+    assert(state.launchedDrivers.isEmpty)
 
 Review comment:
   Hm, in the test `state` is a `MesosClusterSchedulerState` object and has neither a `taskId` nor a `driverDescription` field (those live in `MesosClusterSubmissionState`).
   
   However, at this point `state.pendingRetryDrivers` **does** contain the same `taskStatus.taskId` (i.e. `sub1` in the test). That's because the task failed, was added to the pending-retry queue, and is awaiting the next suitable offer. Later, on L506, a new offer comes in and the task is moved out of `pending` and into `launched` (with a new taskId: `sub1-retry-1`). From then on, what I'm testing is that any further updates carrying the old taskId (`sub1`) have no effect.
   
   That said, I've added two more tests that are more explicit about which IDs match where; a rough sketch of the scenario is below. Is that all right?
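   
   For context, here is a rough sketch (not the exact code from the PR; the specific offer used and the assertions are my assumptions) of how the rest of the scenario plays out, reusing the helpers from the diff above and the `sub1-retry-1` ID format mentioned in this comment:
   
   ```scala
   // A new offer arrives and the pending retry is relaunched on another agent
   // (per the comment above, under a new taskId of the form `sub1-retry-1`).
   scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
   state = scheduler.getSchedulerState()
   assert(state.pendingRetryDrivers.isEmpty)
   assert(state.launchedDrivers.size == 1)
   
   // The first agent re-registers and sends a late TASK_FAILED for the old taskId.
   taskStatus = TaskStatus.newBuilder()
     .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
     .setSlaveId(agent1)
     .setState(MesosTaskState.TASK_FAILED)
     .build()
   scheduler.statusUpdate(driver, taskStatus)
   
   // The outdated update must not push the already-relaunched driver back into
   // the retry queue or launch it a third time.
   state = scheduler.getSchedulerState()
   assert(state.pendingRetryDrivers.isEmpty)
   assert(state.launchedDrivers.size == 1)
   ```
   
   The property being exercised is that only status updates for the currently launched taskId should affect the retry bookkeeping.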
