Github user markhamstra commented on a diff in the pull request:
https://github.com/apache/spark/pull/4708#discussion_r25202931
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -210,40 +210,58 @@ class DAGScheduler(
* The jobId value passed in will be used if the stage doesn't already
exist with
* a lower jobId (jobId always increases across jobs.)
*/
- private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _],
jobId: Int): Stage = {
+ private def getShuffleMapStage(
+ shuffleDep: ShuffleDependency[_, _, _],
+ jobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, jobId)
// Then register current shuffleDep
- val stage =
- newOrUsedStage(
- shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep,
jobId,
- shuffleDep.rdd.creationSite)
+ val stage = newOrUsedShuffleStage(shuffleDep, jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
-
+
stage
}
}
/**
- * Create a Stage -- either directly for use as a result stage, or as
part of the (re)-creation
- * of a shuffle map stage in newOrUsedStage. The stage will be
associated with the provided
- * jobId. Production of shuffle map stages should always use
newOrUsedStage, not newStage
- * directly.
+ * Create a ShuffleMapStage as part of the (re)-creation of a shuffle
map stage in
+ * newOrUsedShuffleStage. The stage will be associated with the provided
jobId.
+ * Production of shuffle map stages should always use
newOrUsedShuffleStage, not
+ * newShuffleMapStage directly.
*/
- private def newStage(
+ private def newShuffleMapStage(
rdd: RDD[_],
numTasks: Int,
- shuffleDep: Option[ShuffleDependency[_, _, _]],
+ shuffleDep: ShuffleDependency[_, _, _],
jobId: Int,
- callSite: CallSite)
- : Stage =
- {
+ callSite: CallSite): ShuffleMapStage = {
val parentStages = getParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
- val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages,
jobId, callSite)
+ val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks,
parentStages,
+ jobId, callSite, shuffleDep)
+
+ stageIdToStage(id) = stage
+ updateJobIdStageIdMaps(jobId, stage)
+ stage
+ }
+
+ /**
+ * Create a ResultStage for use directly as the final stage of a job. The
stage will be associated
+ * with the provided jobId.
+ */
+ private def newResultStage(
+ rdd: RDD[_],
+ numTasks: Int,
+ jobId: Int,
+ callSite: CallSite): ResultStage = {
+ val parentStages = getParentStages(rdd, jobId)
+ val id = nextStageId.getAndIncrement()
+ val stage: ResultStage = new ResultStage(id, rdd, numTasks,
parentStages, jobId, callSite)
+
--- End diff --
I'd rather avoid the code duplication in newShuffleMapStage and
newResultStage. This can be done in generic fashion via runtime reflection:
```scala
import scala.reflect.runtime.{universe => ru}
...
private def newStage[T <: Stage: ru.TypeTag](
rdd: RDD[_],
numTasks: Int,
shuffleDep: Option[ShuffleDependency[_, _, _]],
jobId: Int,
callSite: CallSite): T = {
val m = ru.runtimeMirror(getClass.getClassLoader)
val classT = ru.typeOf[T].typeSymbol.asClass
val cm = m.reflectClass(classT)
val ctor = ru.typeOf[T].declaration(ru.nme.CONSTRUCTOR).asMethod
val ctorm = cm.reflectConstructor(ctor)
val parentStages = getParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = shuffleDep.map { shufDep =>
ctorm(id, rdd, numTasks, parentStages, jobId, callSite, shufDep)
}.getOrElse(ctorm(id, rdd, numTasks, parentStages, jobId,
callSite)).asInstanceOf[T]
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
...
val stage = newStage[ShuffleMapStage](rdd, numTasks, Some(shuffleDep),
jobId, rdd.creationSite)
...
finalStage = newStage[ResultStage](finalRDD, partitions.size, None,
jobId, callSite)
```
...but I'd want to see the performance numbers on that before deciding not
to go with a less flexible approach that avoids reflection:
```scala
private def newStage[T <: Stage](
rdd: RDD[_],
numTasks: Int,
shuffleDep: Option[ShuffleDependency[_, _, _]],
jobId: Int,
callSite: CallSite): T = {
val parentStages = getParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = shuffleDep.map { shufDep =>
new ShuffleMapStage(id, rdd, numTasks, parentStages, jobId, callSite,
shufDep)
}.getOrElse(new ResultStage(id, rdd, numTasks, parentStages, jobId,
callSite)).asInstanceOf[T]
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]