tgravescs commented on a change in pull request #28257:
URL: https://github.com/apache/spark/pull/28257#discussion_r412194933



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -675,11 +675,13 @@ private[spark] class TaskSchedulerImpl(
           // Check whether the barrier tasks are partially launched.
           // TODO SPARK-24818 handle the assert failure case (that can happen 
when some locality
           // requirements are not fulfilled, and we should revert the launched 
tasks).
-          require(addressesWithDescs.size == taskSet.numTasks,
-            s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+          if (addressesWithDescs.size != taskSet.numTasks) {
+            dagScheduler.taskSetFailed(taskSet.taskSet,

Review comment:
       it might be better to call the TaskSetManager.abort function.  

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -401,9 +401,7 @@ private[spark] class TaskSchedulerImpl(
                 // addresses are the same as that we allocated in 
taskResourceAssignments since it's
                 // synchronized. We don't remove the exact addresses allocated 
because the current
                 // approach produces the identical result with less time 
complexity.
-                availableResources(i).getOrElse(rName,
-                  throw new SparkException(s"Try to acquire resource $rName 
that doesn't exist."))

Review comment:
       yeah this shouldn't happen I think it was there as just in case a bug or 
someone changed something that would get caught here.  I'm ok with this

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -675,11 +675,13 @@ private[spark] class TaskSchedulerImpl(
           // Check whether the barrier tasks are partially launched.
           // TODO SPARK-24818 handle the assert failure case (that can happen 
when some locality
           // requirements are not fulfilled, and we should revert the launched 
tasks).
-          require(addressesWithDescs.size == taskSet.numTasks,
-            s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
+          if (addressesWithDescs.size != taskSet.numTasks) {
+            dagScheduler.taskSetFailed(taskSet.taskSet,

Review comment:
       what gets displayed in the logs here?  We could still throw the 
exception for it to print message in the logs 

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -484,9 +482,11 @@ private[spark] class TaskSchedulerImpl(
         numTasksPerExecCores
       } else {
         val taskLimit = 
resourceProfile.taskResources.get(limitingResource).map(_.amount)
-          .getOrElse(throw new SparkException("limitingResource returns from 
ResourceProfile" +
-            s" $resourceProfile doesn't actually contain that task resource!")
-          )
+          .getOrElse {
+            dagScheduler.taskSetFailed(taskSet, "limitingResource returns from 
ResourceProfile " +

Review comment:
       similar use tasksetmanager.abort




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to