mridulm commented on a change in pull request #28257:
URL: https://github.com/apache/spark/pull/28257#discussion_r412639758



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -401,9 +401,7 @@ private[spark] class TaskSchedulerImpl(
                 // addresses are the same as that we allocated in 
taskResourceAssignments since it's
                 // synchronized. We don't remove the exact addresses allocated 
because the current
                 // approach produces the identical result with less time 
complexity.
-                availableResources(i).getOrElse(rName,
-                  throw new SparkException(s"Try to acquire resource $rName 
that doesn't exist."))

Review comment:
       Agree, this can be removed and simplified as below.
   

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -468,8 +466,9 @@ private[spark] class TaskSchedulerImpl(
       resourceProfileIds: Array[Int],
       availableCpus: Array[Int],
       availableResources: Array[Map[String, Buffer[String]]],
-      rpId: Int): Int = {
-    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+      taskSet: TaskSetManager): Int = {
+    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
+      taskSet.taskSet.resourceProfileId)
     val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { 
case (id, _) =>

Review comment:
       Any particular reason to change this ?

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -675,11 +676,15 @@ private[spark] class TaskSchedulerImpl(
           // Check whether the barrier tasks are partially launched.
           // TODO SPARK-24818 handle the assert failure case (that can happen 
when some locality
           // requirements are not fulfilled, and we should revert the launched 
tasks).
-          require(addressesWithDescs.size == taskSet.numTasks,
-            s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+          if (addressesWithDescs.size != taskSet.numTasks) {
+            val errorMsg =
+              s"Fail current round of resource offers for barrier stage 
${taskSet.stageId} " +
               s"because only ${addressesWithDescs.size} out of a total number 
of " +
               s"${taskSet.numTasks} tasks got resource offers. The resource 
offers may have " +
-              "been blacklisted or cannot fulfill task locality requirements.")
+              "been blacklisted or cannot fulfill task locality requirements."
+            logError(errorMsg)

Review comment:
       nit: logWarning

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -675,11 +676,15 @@ private[spark] class TaskSchedulerImpl(
           // Check whether the barrier tasks are partially launched.
           // TODO SPARK-24818 handle the assert failure case (that can happen 
when some locality
           // requirements are not fulfilled, and we should revert the launched 
tasks).
-          require(addressesWithDescs.size == taskSet.numTasks,
-            s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +

Review comment:
       This is the primary issue causing the bug - and is overly strict.
   Unfortunately, the way barrier scheduling has been written, the only way to 
get it working currently is to disable delay scheduling entirely.
   We will need to relook how to get barrier scheduling working more gracefully 
with task locality and executor failures (and in future, dynamic allocation).

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -484,9 +483,11 @@ private[spark] class TaskSchedulerImpl(
         numTasksPerExecCores
       } else {
         val taskLimit = 
resourceProfile.taskResources.get(limitingResource).map(_.amount)
-          .getOrElse(throw new SparkException("limitingResource returns from 
ResourceProfile" +
-            s" $resourceProfile doesn't actually contain that task resource!")
-          )
+          .getOrElse {
+            taskSet.abort("limitingResource returns from ResourceProfile " +
+              s"$resourceProfile doesn't actually contain that task resource!")
+            return -1

Review comment:
       We still need to throw the exception after the abort - else subsequent 
code will continue.
   This is a repeated pattern in this PR - when replacing an 
exception/require/etc (which would have thrown an exception earlier) with a 
task set abort, please ensure that the exception continues to be thrown.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to