Repository: spark
Updated Branches:
  refs/heads/master 226d38840 -> 0fbecc736


[SPARK-19537] Move pendingPartitions to ShuffleMapStage.

The pendingPartitions instance variable should be moved to ShuffleMapStage,
because it is only used by ShuffleMapStages. This change is purely refactoring
and does not change functionality.

I fixed this in an attempt to clarify some of the discussion around #16620, 
which I was having trouble reasoning about.  I stole the helpful comment Imran 
wrote for pendingPartitions and used it here.

cc squito markhamstra jinxing64

Author: Kay Ousterhout <kayousterh...@gmail.com>

Closes #16876 from kayousterhout/SPARK-19537.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fbecc73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fbecc73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fbecc73

Branch: refs/heads/master
Commit: 0fbecc736df95bf757cb497c108ae3dbc5893829
Parents: 226d388
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Fri Feb 10 22:34:57 2017 -0800
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Fri Feb 10 22:34:57 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 20 +++++++++++++-------
 .../spark/scheduler/ShuffleMapStage.scala       | 13 +++++++++++++
 .../org/apache/spark/scheduler/Stage.scala      |  2 --
 3 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0fbecc73/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b9d7e13..69101ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -932,8 +932,6 @@ class DAGScheduler(
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
-    // Get our pending tasks and remember them in our pendingTasks entry
-    stage.pendingPartitions.clear()
 
     // First figure out the indexes of partition ids to compute.
     val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
@@ -1013,9 +1011,11 @@ class DAGScheduler(
       val serializedTaskMetrics = 
closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
       stage match {
         case stage: ShuffleMapStage =>
+          stage.pendingPartitions.clear()
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
+            stage.pendingPartitions += id
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, properties, serializedTaskMetrics, 
Option(jobId),
               Option(sc.applicationId), sc.applicationAttemptId)
@@ -1039,9 +1039,8 @@ class DAGScheduler(
     }
 
     if (tasks.size > 0) {
-      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " 
(" + stage.rdd + ")")
-      stage.pendingPartitions ++= tasks.map(_.partitionId)
-      logDebug("New pending partitions: " + stage.pendingPartitions)
+      logInfo(s"Submitting ${tasks.size} missing tasks from $stage 
(${stage.rdd}) (first 15 " +
+        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
       taskScheduler.submitTasks(new TaskSet(
         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, 
properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1147,7 +1146,6 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
             // Cast to ResultStage here because it's part of the ResultTask
@@ -1183,6 +1181,7 @@ class DAGScheduler(
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
+            shuffleStage.pendingPartitions -= task.partitionId
             updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
@@ -1235,7 +1234,14 @@ class DAGScheduler(
 
       case Resubmitted =>
         logInfo("Resubmitted " + task + ", so marking it as still running")
-        stage.pendingPartitions += task.partitionId
+        stage match {
+          case sms: ShuffleMapStage =>
+            sms.pendingPartitions += task.partitionId
+
+          case _ =>
+            assert(false, "TaskSetManagers should only send Resubmitted task 
statuses for " +
+              "tasks in ShuffleMapStages.")
+        }
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) 
=>
         val failedStage = stageIdToStage(task.stageId)

http://git-wip-us.apache.org/repos/asf/spark/blob/0fbecc73/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index 51416e5..db4d9ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.HashSet
+
 import org.apache.spark.ShuffleDependency
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
@@ -48,6 +50,17 @@ private[spark] class ShuffleMapStage(
   private[this] var _numAvailableOutputs: Int = 0
 
   /**
+   * Partitions that either haven't yet been computed, or that were computed 
on an executor
+   * that has since been lost, so should be re-computed.  This variable is 
used by the
+   * DAGScheduler to determine when a stage has completed. Task successes in 
either the active
+   * attempt for the stage or in earlier attempts for this stage can cause 
partition ids to get
+   * removed from pendingPartitions. As a result, this variable may be 
inconsistent with the pending
+   * tasks in the TaskSetManager for the active attempt for the stage (the 
partitions stored here
+   * will always be a subset of the partitions that the TaskSetManager thinks 
are pending).
+   */
+  val pendingPartitions = new HashSet[Int]
+
+  /**
    * List of [[MapStatus]] for each partition. The index of the array is the 
map partition id,
    * and each value in the array is the list of possible [[MapStatus]] for a 
partition
    * (a single task might run multiple times).

http://git-wip-us.apache.org/repos/asf/spark/blob/0fbecc73/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c628dd3..c6fc038 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -67,8 +67,6 @@ private[scheduler] abstract class Stage(
   /** Set of jobs that this stage belongs to. */
   val jobIds = new HashSet[Int]
 
-  val pendingPartitions = new HashSet[Int]
-
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to