Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/19374#discussion_r143969946 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala --- @@ -804,45 +814,52 @@ private[spark] class MesosClusterScheduler( logInfo(s"Received status update: taskId=${taskId}" + s" state=${status.getState}" + s" message=${status.getMessage}" + - s" reason=${status.getReason}"); + s" reason=${status.getReason}") stateLock.synchronized { - if (launchedDrivers.contains(taskId)) { + val subId = getSumbmissionIdFromTaskId(taskId) + if (launchedDrivers.contains(subId)) { if (status.getReason == Reason.REASON_RECONCILIATION && !pendingRecover.contains(taskId)) { // Task has already received update and no longer requires reconciliation. return } - val state = launchedDrivers(taskId) + val state = launchedDrivers(subId) // Check if the driver is supervise enabled and can be relaunched. if (state.driverDescription.supervise && shouldRelaunch(status.getState)) { - removeFromLaunchedDrivers(taskId) + removeFromLaunchedDrivers(subId) state.finishDate = Some(new Date()) val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState val (retries, waitTimeSec) = retryState .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) } .getOrElse{ (1, 1) } val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L) - val newDriverDescription = state.driverDescription.copy( retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec))) - addDriverToPending(newDriverDescription, taskId); + addDriverToPending(newDriverDescription, newDriverDescription.submissionId) } else if (TaskState.isFinished(mesosToTaskState(status.getState))) { - removeFromLaunchedDrivers(taskId) - state.finishDate = Some(new Date()) - if (finishedDrivers.size >= retainedDrivers) { - val toRemove = math.max(retainedDrivers / 10, 1) - 
finishedDrivers.trimStart(toRemove) - } - finishedDrivers += state + retireDriver(subId, state, status) } state.mesosTaskStatus = Option(status) } else { - logError(s"Unable to find driver $taskId in status update") + logError(s"Unable to find driver with $taskId in status update") } } } + private def retireDriver( + submissionId: String, + state: MesosClusterSubmissionState, + status: TaskStatus) = { --- End diff -- Will fix, thanks.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org