mridulm commented on a change in pull request #30650:
URL: https://github.com/apache/spark/pull/30650#discussion_r550323062
##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -439,77 +446,109 @@ private[spark] class TaskSetManager(
}
}
+    var dequeuedTaskIndex: Option[Int] = None
     val taskDescription =
       dequeueTask(execId, host, allowedLocality)
         .map { case (index, taskLocality, speculative) =>
-          // Found a task; do some bookkeeping and return a task description
-          val task = tasks(index)
-          val taskId = sched.newTaskId()
-          // Do various bookkeeping
-          copiesRunning(index) += 1
-          val attemptNum = taskAttempts(index).size
-          val info = new TaskInfo(taskId, index, attemptNum, curTime,
-            execId, host, taskLocality, speculative)
-          taskInfos(taskId) = info
-          taskAttempts(index) = info :: taskAttempts(index)
-          if (legacyLocalityWaitReset && maxLocality != TaskLocality.NO_PREF) {
-            resetDelayScheduleTimer(Some(taskLocality))
-          }
-          // Serialize and return the task
-          val serializedTask: ByteBuffer = try {
-            ser.serialize(task)
-          } catch {
-            // If the task cannot be serialized, then there's no point to re-attempt the task,
-            // as it will always fail. So just abort the whole task-set.
-            case NonFatal(e) =>
-              val msg = s"Failed to serialize task $taskId, not attempting to retry it."
-              logError(msg, e)
-              abort(s"$msg Exception during serialization: $e")
-              throw new TaskNotSerializableException(e)
-          }
-          if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024 &&
-            !emittedTaskSizeWarning) {
-            emittedTaskSizeWarning = true
-            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
-              s"(${serializedTask.limit() / 1024} KiB). The maximum recommended task size is " +
-              s"${TaskSetManager.TASK_SIZE_TO_WARN_KIB} KiB.")
-          }
-          addRunningTask(taskId)
-
-          // We used to log the time it takes to serialize the task, but task size is already
-          // a good proxy to task serialization time.
-          // val timeTaken = clock.getTime() - startTime
-          val tName = taskName(taskId)
-          logInfo(s"Starting $tName ($host, executor ${info.executorId}, " +
-            s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes) " +
-            s"taskResourceAssignments ${taskResourceAssignments}")
-
-          sched.dagScheduler.taskStarted(task, info)
-          new TaskDescription(
-            taskId,
-            attemptNum,
-            execId,
-            tName,
-            index,
-            task.partitionId,
-            addedFiles,
-            addedJars,
-            addedArchives,
-            task.localProperties,
-            taskResourceAssignments,
-            serializedTask)
-        }
+          dequeuedTaskIndex = Some(index)
+          if (legacyLocalityWaitReset && maxLocality != TaskLocality.NO_PREF) {
+            resetDelayScheduleTimer(Some(taskLocality))
Review comment:
If there are multiple TSMs (stages), the reset will adversely affect the scheduling for the barrier stage, no?
I agree that once the barrier stage is scheduled it does not matter (but then this won't get triggered anyway in that case, if I am not wrong).

> To prevent the behavior change, I think we might need to correct the locality level (like reverting the reset we did before) if the barrier taskset failed to get launched at the end of a single resourceOffer round. WDYT?

I agree, this should be sufficient.
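
To make the suggestion concrete, here is a minimal, self-contained Scala sketch of the "revert the reset" idea: snapshot the delay-scheduling state before a resourceOffer round for a barrier task set and restore it if the barrier tasks could not all be launched. All names here (`DelayState`, `FakeTaskSetManager`, `offerBarrierRound`) are illustrative stand-ins, not TaskSetManager's actual fields or methods.

```scala
// Illustrative sketch only (not TaskSetManager code): snapshot the delay-scheduling
// state before a barrier resourceOffer round and restore it if the barrier task set
// could not be fully launched, so the locality reset from this round has no effect.
object BarrierLocalityResetSketch {

  // Simplified stand-in for the delay-scheduling state a TaskSetManager keeps.
  final case class DelayState(localityIndex: Int, lastResetTimeMs: Long)

  final class FakeTaskSetManager(var delayState: DelayState) {
    // Stand-in for resetDelayScheduleTimer: jump back to the best locality level.
    def resetDelayTimer(nowMs: Long): Unit =
      delayState = DelayState(localityIndex = 0, lastResetTimeMs = nowMs)
  }

  // Offer resources for one round. `launchAll` returns true only if every barrier
  // task was launched; it may call resetDelayTimer internally while dequeuing.
  def offerBarrierRound(
      tsm: FakeTaskSetManager,
      launchAll: FakeTaskSetManager => Boolean): Boolean = {
    val before = tsm.delayState      // snapshot before the round
    val launched = launchAll(tsm)
    if (!launched) {
      tsm.delayState = before        // revert: a partial round must not leak state
    }
    launched
  }

  def main(args: Array[String]): Unit = {
    val tsm = new FakeTaskSetManager(DelayState(localityIndex = 2, lastResetTimeMs = 0L))
    // Simulate a round where the barrier task set cannot be fully launched.
    val ok = offerBarrierRound(tsm, t => { t.resetDelayTimer(nowMs = 100L); false })
    assert(!ok && tsm.delayState.localityIndex == 2) // state restored
    println(s"launched=$ok, delayState=${tsm.delayState}")
  }
}
```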