Github user skonto commented on a diff in the pull request:
https://github.com/apache/spark/pull/19374#discussion_r143969946
--- Diff:
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
---
@@ -804,45 +814,52 @@ private[spark] class MesosClusterScheduler(
logInfo(s"Received status update: taskId=${taskId}" +
s" state=${status.getState}" +
s" message=${status.getMessage}" +
- s" reason=${status.getReason}");
+ s" reason=${status.getReason}")
stateLock.synchronized {
- if (launchedDrivers.contains(taskId)) {
+ val subId = getSumbmissionIdFromTaskId(taskId)
+ if (launchedDrivers.contains(subId)) {
if (status.getReason == Reason.REASON_RECONCILIATION &&
!pendingRecover.contains(taskId)) {
// Task has already received update and no longer requires
reconciliation.
return
}
- val state = launchedDrivers(taskId)
+ val state = launchedDrivers(subId)
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise &&
shouldRelaunch(status.getState)) {
- removeFromLaunchedDrivers(taskId)
+ removeFromLaunchedDrivers(subId)
state.finishDate = Some(new Date())
val retryState: Option[MesosClusterRetryState] =
state.driverDescription.retryState
val (retries, waitTimeSec) = retryState
.map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime,
rs.waitTime * 2)) }
.getOrElse{ (1, 1) }
val nextRetry = new Date(new Date().getTime + waitTimeSec *
1000L)
-
val newDriverDescription = state.driverDescription.copy(
retryState = Some(new MesosClusterRetryState(status, retries,
nextRetry, waitTimeSec)))
- addDriverToPending(newDriverDescription, taskId);
+ addDriverToPending(newDriverDescription,
newDriverDescription.submissionId)
} else if
(TaskState.isFinished(mesosToTaskState(status.getState))) {
- removeFromLaunchedDrivers(taskId)
- state.finishDate = Some(new Date())
- if (finishedDrivers.size >= retainedDrivers) {
- val toRemove = math.max(retainedDrivers / 10, 1)
- finishedDrivers.trimStart(toRemove)
- }
- finishedDrivers += state
+ retireDriver(subId, state, status)
}
state.mesosTaskStatus = Option(status)
} else {
- logError(s"Unable to find driver $taskId in status update")
+ logError(s"Unable to find driver with $taskId in status update")
}
}
}
+ private def retireDriver(
+ submissionId: String,
+ state: MesosClusterSubmissionState,
+ status: TaskStatus) = {
--- End diff --
Will fix, thanks.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org