Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4708#discussion_r25202931
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
    @@ -210,40 +210,58 @@ class DAGScheduler(
        * The jobId value passed in will be used if the stage doesn't already 
exist with
        * a lower jobId (jobId always increases across jobs.)
        */
    -  private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], 
jobId: Int): Stage = {
    +  private def getShuffleMapStage(
    +      shuffleDep: ShuffleDependency[_, _, _],
    +      jobId: Int): ShuffleMapStage = {
         shuffleToMapStage.get(shuffleDep.shuffleId) match {
           case Some(stage) => stage
           case None =>
             // We are going to register ancestor shuffle dependencies
             registerShuffleDependencies(shuffleDep, jobId)
             // Then register current shuffleDep
    -        val stage =
    -          newOrUsedStage(
    -            shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, 
jobId,
    -            shuffleDep.rdd.creationSite)
    +        val stage = newOrUsedShuffleStage(shuffleDep, jobId)
             shuffleToMapStage(shuffleDep.shuffleId) = stage
    - 
    +
             stage
         }
       }
     
       /**
    -   * Create a Stage -- either directly for use as a result stage, or as 
part of the (re)-creation
    -   * of a shuffle map stage in newOrUsedStage.  The stage will be 
associated with the provided
    -   * jobId. Production of shuffle map stages should always use 
newOrUsedStage, not newStage
    -   * directly.
    +   * Create a ShuffleMapStage as part of the (re)-creation of a shuffle 
map stage in
    +   * newOrUsedShuffleStage.  The stage will be associated with the provided
jobId.
    +   * Production of shuffle map stages should always use
newOrUsedShuffleStage, not
    +   * newShuffleMapStage directly.
        */
    -  private def newStage(
    +  private def newShuffleMapStage(
           rdd: RDD[_],
           numTasks: Int,
    -      shuffleDep: Option[ShuffleDependency[_, _, _]],
    +      shuffleDep: ShuffleDependency[_, _, _],
           jobId: Int,
    -      callSite: CallSite)
    -    : Stage =
    -  {
    +      callSite: CallSite): ShuffleMapStage = {
         val parentStages = getParentStages(rdd, jobId)
         val id = nextStageId.getAndIncrement()
    -    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, 
jobId, callSite)
    +    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, 
parentStages,
    +      jobId, callSite, shuffleDep)
    +
    +    stageIdToStage(id) = stage
    +    updateJobIdStageIdMaps(jobId, stage)
    +    stage
    +  }
    +
    +  /**
    +   * Create a ResultStage -- either directly for use as a result stage, or 
as part of the
    +   * (re)-creation of a shuffle map stage in newOrUsedShuffleStage.  The 
stage will be associated
    +   * with the provided jobId.
    +   */
    +  private def newResultStage(
    +      rdd: RDD[_],
    +      numTasks: Int,
    +      jobId: Int,
    +      callSite: CallSite): ResultStage = {
    +    val parentStages = getParentStages(rdd, jobId)
    +    val id = nextStageId.getAndIncrement()
    +    val stage: ResultStage = new ResultStage(id, rdd, numTasks, 
parentStages, jobId, callSite)
    +
    --- End diff --
    
    I'd rather avoid the code duplication in newShuffleMapStage and
newResultStage.  This can be done in a generic fashion via runtime reflection:
    ```scala
    import scala.reflect.runtime.{universe => ru}
    ...
      private def newStage[T <: Stage: ru.TypeTag](
          rdd: RDD[_],
          numTasks: Int,
          shuffleDep: Option[ShuffleDependency[_, _, _]],
          jobId: Int,
          callSite: CallSite): T = {
        val m = ru.runtimeMirror(getClass.getClassLoader)
        val classT = ru.typeOf[T].typeSymbol.asClass
        val cm = m.reflectClass(classT)
        val ctor = ru.typeOf[T].declaration(ru.nme.CONSTRUCTOR).asMethod
        val ctorm = cm.reflectConstructor(ctor)
        val parentStages = getParentStages(rdd, jobId)
        val id = nextStageId.getAndIncrement()
        val stage = shuffleDep.map { shufDep =>
          ctorm(id, rdd, numTasks, parentStages, jobId, callSite, shufDep)
        }.getOrElse(ctorm(id, rdd, numTasks, parentStages, jobId, 
callSite)).asInstanceOf[T]
    
        stageIdToStage(id) = stage
        updateJobIdStageIdMaps(jobId, stage)
        stage
      }
    ...
      val stage = newStage[ShuffleMapStage](rdd, numTasks, Some(shuffleDep), 
jobId, rdd.creationSite)
    ...
          finalStage = newStage[ResultStage](finalRDD, partitions.size, None, 
jobId, callSite)
    ```
    ...but I'd want to see the performance numbers on that before choosing it over
a less flexible approach that avoids reflection:
    ```scala
      private def newStage[T <: Stage](
          rdd: RDD[_],
          numTasks: Int,
          shuffleDep: Option[ShuffleDependency[_, _, _]],
          jobId: Int,
          callSite: CallSite): T = {
        val parentStages = getParentStages(rdd, jobId)
        val id = nextStageId.getAndIncrement()
        val stage = shuffleDep.map { shufDep =>
          new ShuffleMapStage(id, rdd, numTasks, parentStages, jobId, callSite, 
shufDep)
        }.getOrElse(new ResultStage(id, rdd, numTasks, parentStages, jobId, 
callSite)).asInstanceOf[T]
    
        stageIdToStage(id) = stage
        updateJobIdStageIdMaps(jobId, stage)
        stage
      }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to