cloud-fan commented on a change in pull request #27493: [SPARK-30751][SQL]
Combine the skewed readers into one in AQE skew join optimizations
URL: https://github.com/apache/spark/pull/27493#discussion_r377151516
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -249,78 +321,36 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends
Rule[SparkPlan] {
}
/**
- * A wrapper of shuffle query stage, which submits one reduce task to read a
single
- * shuffle partition 'partitionIndex' produced by the mappers in range
[startMapIndex, endMapIndex).
- * This is used to increase the parallelism when reading skewed partitions.
- *
- * @param child It's usually `ShuffleQueryStageExec`, but can be the shuffle
exchange
- * node during canonicalization.
- * @param partitionIndex The pre shuffle partition index.
- * @param startMapIndex The start map index.
- * @param endMapIndex The end map index.
- */
-case class SkewedPartitionReaderExec(
- child: QueryStageExec,
- partitionIndex: Int,
- startMapIndex: Int,
- endMapIndex: Int) extends LeafExecNode {
-
- override def output: Seq[Attribute] = child.output
-
- override def outputPartitioning: Partitioning = {
- UnknownPartitioning(1)
- }
- private var cachedSkewedShuffleRDD: SkewedShuffledRowRDD = null
-
- override def doExecute(): RDD[InternalRow] = {
- if (cachedSkewedShuffleRDD == null) {
- cachedSkewedShuffleRDD = child match {
- case stage: ShuffleQueryStageExec =>
- stage.shuffle.createSkewedShuffleRDD(partitionIndex, startMapIndex,
endMapIndex)
- case _ =>
- throw new IllegalStateException("operating on canonicalization plan")
- }
- }
- cachedSkewedShuffleRDD
- }
-}
-
-/**
- * A wrapper of shuffle query stage, which skips some partitions when reading
the shuffle blocks.
+ * A wrapper of shuffle query stage, which follows the given partition
arrangement.
*
* @param child It's usually `ShuffleQueryStageExec`, but can be the shuffle
exchange node during
* canonicalization.
- * @param excludedPartitions The partitions to skip when reading.
+ * @param partitionSpecs The partition specs that defines the arrangement.
+ * @param skewDesc The description of the skewed partitions.
*/
-case class PartialShuffleReaderExec(
- child: QueryStageExec,
- excludedPartitions: Set[Int]) extends UnaryExecNode {
+case class SkewJoinShuffleReaderExec(
Review comment:
I think we still need multiple shuffle readers, but use the same RDD.
For example, `LocalShuffleReaderExec` override `outputPartitioning`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]