GitHub user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/4708#discussion_r27065074
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -210,40 +210,58 @@ class DAGScheduler(
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
*/
- private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
+ private def getShuffleMapStage(
+ shuffleDep: ShuffleDependency[_, _, _],
+ jobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, jobId)
// Then register current shuffleDep
- val stage =
- newOrUsedStage(
- shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
- shuffleDep.rdd.creationSite)
+ val stage = newOrUsedShuffleStage(shuffleDep, jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
-
+
stage
}
}
/**
- * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
- * of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
- * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
- * directly.
+ * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
+ * newOrUsedShuffleStage. The stage will be associated with the provided jobId.
+ * Production of shuffle map stages should always use newOrUsedShuffleStage, not
+ * newShuffleMapStage directly.
+ */
+ private def newShuffleMapStage(
+ rdd: RDD[_],
+ numTasks: Int,
+ shuffleDep: ShuffleDependency[_, _, _],
+ jobId: Int,
+ callSite: CallSite): ShuffleMapStage = {
+ val parentStages = getParentStages(rdd, jobId)
+ val id = nextStageId.getAndIncrement()
+ val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
+ jobId, callSite, shuffleDep)
+
+ stageIdToStage(id) = stage
+ updateJobIdStageIdMaps(jobId, stage)
+ stage
--- End diff --
These three lines seem to be duplicated at L265-267 of the new code (in
`newResultStage`). Is there a way to abstract them?
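
One possible shape for such an abstraction, as a rough sketch only. It assumes the three lines in question are the `stageIdToStage(id) = stage` assignment, the `updateJobIdStageIdMaps(jobId, stage)` call, and the final `stage` return. The helper name `registerStage` is hypothetical and not part of this PR, and the classes below are simplified stand-ins for the real scheduler types so the example is self-contained:

```scala
import scala.collection.mutable.HashMap

// Simplified stand-ins for the scheduler's stage types (illustration only;
// the real classes take an RDD, task count, parent stages, call site, etc.).
class Stage(val id: Int, val jobId: Int)
class ShuffleMapStage(id: Int, jobId: Int) extends Stage(id, jobId)
class ResultStage(id: Int, jobId: Int) extends Stage(id, jobId)

object StageRegistry {
  val stageIdToStage = new HashMap[Int, Stage]
  val jobIdToStageIds = new HashMap[Int, Set[Int]]

  // Simplified stand-in: only records the stage itself under the job,
  // whereas the real method is assumed to do more bookkeeping.
  def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
    val known = jobIdToStageIds.getOrElse(jobId, Set.empty[Int])
    jobIdToStageIds(jobId) = known + stage.id
  }

  // Hypothetical helper: performs the shared bookkeeping once and returns
  // the stage with its precise static type, so both the shuffle map stage
  // and result stage constructors could end with a single call to it.
  def registerStage[S <: Stage](jobId: Int, stage: S): S = {
    stageIdToStage(stage.id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
}
```

With something like this, `newShuffleMapStage` and `newResultStage` could each build their stage and finish with `registerStage(jobId, stage)`. The type parameter keeps the return type as `ShuffleMapStage` or `ResultStage` without a cast.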