mridulm commented on a change in pull request #28257:
URL: https://github.com/apache/spark/pull/28257#discussion_r412639758
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -401,9 +401,7 @@ private[spark] class TaskSchedulerImpl(
        // addresses are the same as that we allocated in taskResourceAssignments since it's
        // synchronized. We don't remove the exact addresses allocated because the current
        // approach produces the identical result with less time complexity.
-      availableResources(i).getOrElse(rName,
-        throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
Review comment:
Agree, this can be removed and simplified as below.
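As a sketch of what the simplification could look like (a standalone mock, not the actual `TaskSchedulerImpl` code; `availableResources`, `i`, and `rName` mirror the names in the hunk above): once the caller has already verified that `rName` was assigned, the defensive `getOrElse`-with-throw can become a direct lookup, which would still fail fast on a missing key.

```scala
import scala.collection.mutable.Buffer

object DirectLookupSketch {
  def main(args: Array[String]): Unit = {
    // Mock of the per-offer resource map used in TaskSchedulerImpl.
    val availableResources: Array[Map[String, Buffer[String]]] =
      Array(Map("gpu" -> Buffer("0", "1", "2")))
    val i = 0
    val rName = "gpu"

    // Before: availableResources(i).getOrElse(rName,
    //   throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
    // After: direct apply. A missing key would still fail fast,
    // via NoSuchElementException, so the defensive branch is redundant.
    val addresses = availableResources(i)(rName)
    assert(addresses.size == 3)
    println(addresses.mkString(","))
  }
}
```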
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -468,8 +466,9 @@ private[spark] class TaskSchedulerImpl(
resourceProfileIds: Array[Int],
availableCpus: Array[Int],
availableResources: Array[Map[String, Buffer[String]]],
- rpId: Int): Int = {
- val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+ taskSet: TaskSetManager): Int = {
+ val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
+ taskSet.taskSet.resourceProfileId)
val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter {
case (id, _) =>
Review comment:
Any particular reason to change this?
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -675,11 +676,15 @@ private[spark] class TaskSchedulerImpl(
          // Check whether the barrier tasks are partially launched.
          // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
          // requirements are not fulfilled, and we should revert the launched tasks).
-          require(addressesWithDescs.size == taskSet.numTasks,
-            s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
+          if (addressesWithDescs.size != taskSet.numTasks) {
+            val errorMsg =
+              s"Fail current round of resource offers for barrier stage ${taskSet.stageId} " +
                s"because only ${addressesWithDescs.size} out of a total number of " +
                s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
-              "been blacklisted or cannot fulfill task locality requirements.")
+              "been blacklisted or cannot fulfill task locality requirements."
+            logError(errorMsg)
Review comment:
nit: logWarning
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -675,11 +676,15 @@ private[spark] class TaskSchedulerImpl(
          // Check whether the barrier tasks are partially launched.
          // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
          // requirements are not fulfilled, and we should revert the launched tasks).
-          require(addressesWithDescs.size == taskSet.numTasks,
-            s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
Review comment:
This is the primary issue causing the bug, and the check is overly strict.
Unfortunately, given how barrier scheduling is currently written, the only way to
make it work is to disable delay scheduling entirely.
We will need to revisit how to make barrier scheduling work more gracefully
with task locality and executor failures (and, in future, dynamic allocation).
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -484,9 +483,11 @@ private[spark] class TaskSchedulerImpl(
numTasksPerExecCores
} else {
         val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
-          .getOrElse(throw new SparkException("limitingResource returns from ResourceProfile" +
-            s" $resourceProfile doesn't actually contain that task resource!")
-          )
+          .getOrElse {
+            taskSet.abort("limitingResource returns from ResourceProfile " +
+              s"$resourceProfile doesn't actually contain that task resource!")
+            return -1
Review comment:
We still need to throw the exception after the abort; otherwise the subsequent
code will continue executing.
This is a repeated pattern in this PR: wherever an exception/require/etc (which
previously would have thrown) is replaced with a task set abort, please ensure
that the exception is still thrown.
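To illustrate the abort-then-throw pattern being requested (a self-contained mock, not the actual Spark code; `SparkException`, `taskSet.abort`, and the resource names are stand-ins for those in the PR): aborting the task set only schedules it for removal, so the current code path must also throw to stop executing.

```scala
// Minimal stand-in for org.apache.spark.SparkException.
class SparkException(msg: String) extends Exception(msg)

// Minimal stand-in for TaskSetManager, recording whether abort was called.
class MockTaskSet {
  var aborted = false
  def abort(message: String): Unit = { aborted = true }
}

object AbortPatternSketch {
  // Hypothetical helper mirroring the hunk above: look up the limiting
  // resource's task amount, aborting AND throwing when it is missing.
  def taskLimitFor(taskResources: Map[String, Double],
                   limitingResource: String,
                   taskSet: MockTaskSet): Double = {
    taskResources.get(limitingResource).getOrElse {
      val msg = s"limitingResource $limitingResource is not in the task resources!"
      taskSet.abort(msg)
      // Without this throw, control would fall through and the caller
      // would keep going with a meaningless sentinel value.
      throw new SparkException(msg)
    }
  }

  def main(args: Array[String]): Unit = {
    val ts = new MockTaskSet
    try {
      taskLimitFor(Map("cpus" -> 1.0), "gpu", ts)
      assert(false, "expected SparkException")
    } catch {
      case _: SparkException => assert(ts.aborted)
    }
  }
}
```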
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]