holdenk commented on a change in pull request #29422:
URL: https://github.com/apache/spark/pull/29422#discussion_r470765386



##########
File path: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -323,6 +326,7 @@ private[spark] class CoarseGrainedExecutorBackend(
               // move forward.
               lastTaskRunningTime = System.nanoTime()
             }
+            Thread.sleep(sleep_time)

Review comment:
       This was moved so the initial sleep time didn't have sleep_time added to it
on the first pass through, right? Nothing else?

##########
File path: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -294,10 +294,13 @@ private[spark] class CoarseGrainedExecutorBackend(
         override def run(): Unit = {
           var lastTaskRunningTime = System.nanoTime()
           val sleep_time = 1000 // 1s
-
+          val initialSleepMillis = env.conf.getInt(

Review comment:
       Maybe just add a comment here that this is for testing only.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2012,7 +2013,8 @@ private[spark] class DAGScheduler(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
-      maybeEpoch: Option[Long] = None): Unit = {
+      maybeEpoch: Option[Long] = None,
+      ignoreShuffleVersion: Boolean = false): Unit = {

Review comment:
       Please add this new parameter to the method's doc comment. Also, I'm not
completely sure about the name of the variable.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -1027,7 +1036,15 @@ private[spark] class TaskSchedulerImpl(
       }
     }
 
-    executorsPendingDecommission -= executorId
+
+    val decomInfo = executorsPendingDecommission.get(executorId)
+    if (decomInfo.isDefined) {
+      val rememberSeconds =
+        conf.getInt("spark.decommissioningRememberAfterRemoval.seconds", 60)
+      val gcSecond = TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis()) + 
rememberSeconds
+      decommissioningExecutorsToGc.computeIfAbsent(gcSecond, _ => 
mutable.ArrayBuffer.empty) +=
+        executorId
+    }

Review comment:
       Seems like repeated logic.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2022,16 +2024,25 @@ private[spark] class DAGScheduler(
       blockManagerMaster.removeExecutor(execId)
       clearCacheLocs()
     }
-    if (fileLost &&
-        (!shuffleFileLostEpoch.contains(execId) || 
shuffleFileLostEpoch(execId) < currentEpoch)) {
-      shuffleFileLostEpoch(execId) = currentEpoch
-      hostToUnregisterOutputs match {
-        case Some(host) =>
-          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
-          mapOutputTracker.removeOutputsOnHost(host)
-        case None =>
-          logInfo(s"Shuffle files lost for executor: $execId (epoch 
$currentEpoch)")
-          mapOutputTracker.removeOutputsOnExecutor(execId)
+    if (fileLost) {

Review comment:
       Can we have a comment here clarifying the reasoning behind this logic?

##########
File path: 
core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
##########
@@ -212,22 +226,27 @@ class DecommissionWorkerSuite
       override def handleRootTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
         val taskInfo = taskEnd.taskInfo
         if (taskInfo.executorId == executorToDecom && taskInfo.attemptNumber 
== 0 &&
-          taskEnd.stageAttemptId == 0) {
+          taskEnd.stageAttemptId == 0 && taskEnd.stageId == 0) {
           decommissionWorkerOnMaster(workerToDecom,
             "decommission worker after task on it is done")
         }
       }
     }
-    TestUtils.withListener(sc, listener) { _ =>
+    withListener(sc, listener) { _ =>
       val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((_, _) 
=> {
         val executorId = SparkEnv.get.executorId
-        val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1
-        Thread.sleep(sleepTimeSeconds * 1000L)
+        val context = TaskContext.get()
+        if (context.attemptNumber() == 0 && context.stageAttemptNumber() == 0) 
{
+          val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1
+          Thread.sleep(sleepTimeSeconds * 1000L)
+        }

Review comment:
       I assume this is for a speed-up, right?

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -136,7 +137,9 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
-  private val executorsPendingDecommission = new HashMap[String, 
ExecutorDecommissionInfo]
+  val executorsPendingDecommission = new HashMap[String, 
ExecutorDecommissionInfo]
+  // map of second to list of executors to clear form the above map
+  val decommissioningExecutorsToGc = new util.TreeMap[Long, 
mutable.ArrayBuffer[String]]()

Review comment:
       Could we use 
https://guava.dev/releases/17.0/api/docs/com/google/common/cache/CacheBuilder.html
 or another similar class that handles the GC cleanup for us? We already use 
CacheBuilder in some places inside of Spark, which is why I suggest it. (Side note: 
I hadn't seen Java's TreeMap before, and it's a really neat use here.)

##########
File path: 
core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
##########
@@ -84,6 +84,19 @@ class DecommissionWorkerSuite
     }
   }
 
+  // Unlike TestUtils.withListener, it also waits for the job to be done

Review comment:
       Nice fix / cleanup




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to