Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9276#discussion_r43720590
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
    @@ -181,7 +215,54 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
             }
           }
         }
    -    new ShuffledRowRDD(rddWithPartitionIds, serializer, part.numPartitions)
    +
    +    // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds
    +    // are in the form of (partitionId, row) and every partitionId is in the expected range
    +    // [0, part.numPartitions - 1], the partitioner of this RDD is a PartitionIdPassthrough.
    +    val dependency =
    +      new ShuffleDependency[Int, InternalRow, InternalRow](
    +        rddWithPartitionIds,
    +        new PartitionIdPassthrough(part.numPartitions),
    +        Some(serializer))
    +
    +    dependency
    +  }
    +
    +  /**
    +   * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
    +   * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an
    +   * optional array of partition start indices. If this array is defined, the returned
    +   * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on the indices in this array.
    +   */
    +  private[sql] def preparePostShuffleRDD(
    +      shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
    +      specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
    +    // If an array of partition start indices is provided, we need to use this array
    +    // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
    +    // reflect the new number of post-shuffle partitions.
    +    specifiedPartitionStartIndices.foreach { indices =>
    +      assert(newPartitioning.isInstanceOf[HashPartitioning])
    +      newPartitioning = newPartitioning.withNumPartitions(indices.length)
    +    }
    +    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
    +  }
    +
    +  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
    +    coordinator match {
    +      case Some(exchangeCoordinator) =>
    +        val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
    +        assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
    +        shuffleRDD
    +      case None =>
    +        val shuffleDependency = prepareShuffleDependency()
    +        preparePostShuffleRDD(shuffleDependency)
    +    }
    +  }
    +}
    +
    +object Exchange {
    +  def apply(newPartitioning: Partitioning, child: SparkPlan): Exchange = {
    +    Exchange(newPartitioning, child, None: Option[ExchangeCoordinator])
    --- End diff ---
    
    sure.
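    
    For readers following along: `PartitionIdPassthrough` is a trivial `Partitioner` for the case where the keys already are the target partition ids, so `getPartition` just passes them through. A minimal sketch consistent with this patch (the exact visibility modifiers in Exchange.scala may differ):
    
    ```scala
    import org.apache.spark.Partitioner
    
    // A partitioner for (partitionId, row) pairs whose partition ids were already
    // computed upstream: getPartition simply returns the key as the partition id.
    private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }
    ```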
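    
    And to make the `specifiedPartitionStartIndices` contract concrete: each entry marks the first pre-shuffle partition of one post-shuffle partition, so the array implicitly defines contiguous ranges of pre-shuffle partitions. The snippet below is a hypothetical standalone illustration, not part of the patch or of Spark's API:
    
    ```scala
    // Maps an array of partition start indices to the ranges of pre-shuffle
    // partitions that each post-shuffle partition would fetch.
    object PartitionStartIndicesDemo {
      def postShufflePartitions(
          startIndices: Array[Int],
          numPreShufflePartitions: Int): Seq[Range] = {
        startIndices.indices.map { i =>
          val start = startIndices(i)
          // Each range ends where the next one starts; the last range runs to
          // the total number of pre-shuffle partitions.
          val end =
            if (i + 1 < startIndices.length) startIndices(i + 1)
            else numPreShufflePartitions
          start until end
        }
      }
    
      def main(args: Array[String]): Unit = {
        // With 6 pre-shuffle partitions and start indices [0, 2, 5], the
        // post-shuffle RDD has 3 partitions covering [0,1], [2,3,4] and [5].
        postShufflePartitions(Array(0, 2, 5), 6)
          .foreach(r => println(r.mkString("[", ", ", "]")))
      }
    }
    ```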

