GitHub user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19374#discussion_r143969946
  
    --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
    @@ -804,45 +814,52 @@ private[spark] class MesosClusterScheduler(
         logInfo(s"Received status update: taskId=${taskId}" +
           s" state=${status.getState}" +
           s" message=${status.getMessage}" +
    -      s" reason=${status.getReason}");
    +      s" reason=${status.getReason}")
     
         stateLock.synchronized {
    -      if (launchedDrivers.contains(taskId)) {
    +      val subId = getSumbmissionIdFromTaskId(taskId)
    +      if (launchedDrivers.contains(subId)) {
             if (status.getReason == Reason.REASON_RECONCILIATION &&
               !pendingRecover.contains(taskId)) {
               // Task has already received update and no longer requires 
reconciliation.
               return
             }
    -        val state = launchedDrivers(taskId)
    +        val state = launchedDrivers(subId)
             // Check if the driver is supervise enabled and can be relaunched.
             if (state.driverDescription.supervise && 
shouldRelaunch(status.getState)) {
    -          removeFromLaunchedDrivers(taskId)
    +          removeFromLaunchedDrivers(subId)
               state.finishDate = Some(new Date())
               val retryState: Option[MesosClusterRetryState] = 
state.driverDescription.retryState
               val (retries, waitTimeSec) = retryState
                 .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, 
rs.waitTime * 2)) }
                 .getOrElse{ (1, 1) }
               val nextRetry = new Date(new Date().getTime + waitTimeSec * 
1000L)
    -
               val newDriverDescription = state.driverDescription.copy(
                 retryState = Some(new MesosClusterRetryState(status, retries, 
nextRetry, waitTimeSec)))
    -          addDriverToPending(newDriverDescription, taskId);
    +          addDriverToPending(newDriverDescription, 
newDriverDescription.submissionId)
             } else if 
(TaskState.isFinished(mesosToTaskState(status.getState))) {
    -          removeFromLaunchedDrivers(taskId)
    -          state.finishDate = Some(new Date())
    -          if (finishedDrivers.size >= retainedDrivers) {
    -            val toRemove = math.max(retainedDrivers / 10, 1)
    -            finishedDrivers.trimStart(toRemove)
    -          }
    -          finishedDrivers += state
    +          retireDriver(subId, state, status)
             }
             state.mesosTaskStatus = Option(status)
           } else {
    -        logError(s"Unable to find driver $taskId in status update")
    +        logError(s"Unable to find driver with $taskId in status update")
           }
         }
       }
     
    +  private def retireDriver(
    +      submissionId: String,
    +      state: MesosClusterSubmissionState,
    +      status: TaskStatus) = {
    --- End diff --
    
    Will fix, thanks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to