Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21353#discussion_r188974319
  
    --- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
    @@ -88,14 +96,53 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
       private[spark] val combinerClassName: Option[String] =
         Option(reflect.classTag[C]).map(_.runtimeClass.getName)
     
    -  val shuffleId: Int = _rdd.context.newShuffleId()
    +  val shuffleId: Int = if (isContinuous) {
    +    // This will not be reset in continuous processing, set an invalid 
value for now.
    +    Int.MinValue
    +  } else {
    +    _rdd.context.newShuffleId()
    +  }
     
    -  val shuffleHandle: ShuffleHandle = 
_rdd.context.env.shuffleManager.registerShuffle(
    -    shuffleId, _rdd.partitions.length, this)
    +  val shuffleHandle: ShuffleHandle = if (isContinuous) {
    +    null
    +  } else {
    +    _rdd.context.env.shuffleManager.registerShuffle(
    +      shuffleId, _rdd.partitions.length, this)
    +  }
     
    -  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    +  if (!isContinuous) {
    +    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    +  }
     }
     
    +/**
    + * :: DeveloperApi ::
    + * Represents a dependency on the output of a shuffle stage of continuous 
type.
    + * Different with ShuffleDependency, the continuous dependency only create 
on Executor side,
    + * so the rdd in param is deserialized from taskBinary.
    + */
    +@DeveloperApi
    +class ContinuousShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    --- End diff --
    
    The changes to ShuffleDependency here are the same as those in #21293.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to