tgravescs commented on a change in pull request #28257:
URL: https://github.com/apache/spark/pull/28257#discussion_r412194933
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -675,11 +675,13 @@ private[spark] class TaskSchedulerImpl(
       // Check whether the barrier tasks are partially launched.
       // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
       // requirements are not fulfilled, and we should revert the launched tasks).
-      require(addressesWithDescs.size == taskSet.numTasks,
-        s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
+      if (addressesWithDescs.size != taskSet.numTasks) {
+        dagScheduler.taskSetFailed(taskSet.taskSet,
Review comment:
   It might be better to call the TaskSetManager.abort function.
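
   Not part of the PR itself, just a rough sketch of the suggestion above (assuming, as in the diff, that the `taskSet` in scope here is the TaskSetManager; the message text is illustrative, not the PR's actual wording):

   ```scala
   // Hedged sketch: abort through the TaskSetManager instead of calling
   // dagScheduler.taskSetFailed directly.
   if (addressesWithDescs.size != taskSet.numTasks) {
     taskSet.abort(s"Fail resource offers for barrier stage ${taskSet.stageId} " +
       s"because only ${addressesWithDescs.size} out of ${taskSet.numTasks} " +
       "tasks got resource offers.")
   }
   ```

   TaskSetManager.abort routes to dagScheduler.taskSetFailed internally, so the net effect should be similar while keeping the abort bookkeeping (e.g. marking the task set as zombie) in one place.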
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -401,9 +401,7 @@ private[spark] class TaskSchedulerImpl(
           // addresses are the same as that we allocated in taskResourceAssignments since it's
           // synchronized. We don't remove the exact addresses allocated because the current
           // approach produces the identical result with less time complexity.
-          availableResources(i).getOrElse(rName,
-            throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
Review comment:
   Yeah, this shouldn't happen. I think it was there just in case a bug, or a
   later change by someone, would get caught here. I'm OK with this.
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -675,11 +675,13 @@ private[spark] class TaskSchedulerImpl(
       // Check whether the barrier tasks are partially launched.
       // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
       // requirements are not fulfilled, and we should revert the launched tasks).
-      require(addressesWithDescs.size == taskSet.numTasks,
-        s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
+      if (addressesWithDescs.size != taskSet.numTasks) {
+        dagScheduler.taskSetFailed(taskSet.taskSet,
Review comment:
   What gets displayed in the logs here? We could still throw the exception so
   the message gets printed in the logs.
##########
File path:
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -484,9 +482,11 @@ private[spark] class TaskSchedulerImpl(
         numTasksPerExecCores
       } else {
         val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
-          .getOrElse(throw new SparkException("limitingResource returns from ResourceProfile" +
-            s" $resourceProfile doesn't actually contain that task resource!")
-          )
+          .getOrElse {
+            dagScheduler.taskSetFailed(taskSet, "limitingResource returns from ResourceProfile " +
Review comment:
   Similar to the above: use TaskSetManager.abort.
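
   Again, not from the PR, only a sketch of the same suggestion for this spot. Here `manager` stands in for the stage's TaskSetManager, which may not actually be in scope at this point in the code; treat it as an assumption:

   ```scala
   // Hedged sketch: abort via the TaskSetManager rather than calling
   // dagScheduler.taskSetFailed directly. abort returns normally, so a
   // placeholder value keeps the getOrElse expression well-typed.
   val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
     .getOrElse {
       manager.abort("limitingResource returns from ResourceProfile " +
         s"$resourceProfile doesn't actually contain that task resource!")
       0.0
     }
   ```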
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go
to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]