tgravescs commented on a change in pull request #30650:
URL: https://github.com/apache/spark/pull/30650#discussion_r551985269



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -661,35 +667,52 @@ private[spark] class TaskSchedulerImpl(
         }
 
         if (launchedAnyTask && taskSet.isBarrier) {
+          val barrierPendingLaunchTasks = 
taskSet.barrierPendingLaunchTasks.values.toArray
           // Check whether the barrier tasks are partially launched.
-          // TODO SPARK-24818 handle the assert failure case (that can happen 
when some locality
-          // requirements are not fulfilled, and we should revert the launched 
tasks).
-          if (addressesWithDescs.size != taskSet.numTasks) {
-            val errorMsg =
-              s"Fail resource offers for barrier stage ${taskSet.stageId} 
because only " +
-                s"${addressesWithDescs.size} out of a total number of 
${taskSet.numTasks}" +
-                s" tasks got resource offers. This happens because barrier 
execution currently " +
-                s"does not work gracefully with delay scheduling. We highly 
recommend you to " +
-                s"disable delay scheduling by setting spark.locality.wait=0 as 
a workaround if " +
-                s"you see this error frequently."
-            logWarning(errorMsg)
-            taskSet.abort(errorMsg)
-            throw new SparkException(errorMsg)
-          }
+          if (barrierPendingLaunchTasks.length != taskSet.numTasks) {
+            barrierPendingLaunchTasks.foreach { task =>
+              // revert all assigned resources
+              availableCpus(task.assignedOfferIndex) += task.assignedCores
+              task.assignedResources.foreach { case (rName, rInfo) =>
+                
availableResources(task.assignedOfferIndex)(rName).appendAll(rInfo.addresses)
+              }
+              // re-add the task to the schedule pending list
+              taskSet.addPendingTask(task.index)

Review comment:
       this is going to cause the rack to be re-resolved - at least on the yarn 
side.  probably not a big deal, but if we start seeing a performance issue 
from it then we may want to handle it differently. 

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -439,77 +446,109 @@ private[spark] class TaskSetManager(
         }
       }
 
+      var dequeuedTaskIndex: Option[Int] = None
       val taskDescription =
         dequeueTask(execId, host, allowedLocality)
           .map { case (index, taskLocality, speculative) =>
-        // Found a task; do some bookkeeping and return a task description
-        val task = tasks(index)
-        val taskId = sched.newTaskId()
-        // Do various bookkeeping
-        copiesRunning(index) += 1
-        val attemptNum = taskAttempts(index).size
-        val info = new TaskInfo(taskId, index, attemptNum, curTime,
-          execId, host, taskLocality, speculative)
-        taskInfos(taskId) = info
-        taskAttempts(index) = info :: taskAttempts(index)
-        if (legacyLocalityWaitReset && maxLocality != TaskLocality.NO_PREF) {
-          resetDelayScheduleTimer(Some(taskLocality))
-        }
-        // Serialize and return the task
-        val serializedTask: ByteBuffer = try {
-          ser.serialize(task)
-        } catch {
-          // If the task cannot be serialized, then there's no point to 
re-attempt the task,
-          // as it will always fail. So just abort the whole task-set.
-          case NonFatal(e) =>
-            val msg = s"Failed to serialize task $taskId, not attempting to 
retry it."
-            logError(msg, e)
-            abort(s"$msg Exception during serialization: $e")
-            throw new TaskNotSerializableException(e)
-        }
-        if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 
1024 &&
-          !emittedTaskSizeWarning) {
-          emittedTaskSizeWarning = true
-          logWarning(s"Stage ${task.stageId} contains a task of very large 
size " +
-            s"(${serializedTask.limit() / 1024} KiB). The maximum recommended 
task size is " +
-            s"${TaskSetManager.TASK_SIZE_TO_WARN_KIB} KiB.")
-        }
-        addRunningTask(taskId)
-
-        // We used to log the time it takes to serialize the task, but task 
size is already
-        // a good proxy to task serialization time.
-        // val timeTaken = clock.getTime() - startTime
-        val tName = taskName(taskId)
-        logInfo(s"Starting $tName ($host, executor ${info.executorId}, " +
-          s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit()} bytes) " +
-          s"taskResourceAssignments ${taskResourceAssignments}")
-
-        sched.dagScheduler.taskStarted(task, info)
-        new TaskDescription(
-          taskId,
-          attemptNum,
-          execId,
-          tName,
-          index,
-          task.partitionId,
-          addedFiles,
-          addedJars,
-          addedArchives,
-          task.localProperties,
-          taskResourceAssignments,
-          serializedTask)
-      }
+            dequeuedTaskIndex = Some(index)
+            if (legacyLocalityWaitReset && maxLocality != 
TaskLocality.NO_PREF) {
+              resetDelayScheduleTimer(Some(taskLocality))

Review comment:
       So as @mridulm pointed out, since the legacy locality wait is reset 
every time we could schedule something, I think that means you will never look 
at other levels, correct?  I think this was the behavior before, but it doesn't 
seem right. I think it means you could have enough slots available and some 
are node local, but you won't ever be able to use the ones that aren't node 
local.  Please correct me if I'm wrong.  I guess that could be a separate issue 
since it is the same behavior as before.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to