Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9276#discussion_r43719723
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
    @@ -181,7 +215,54 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
             }
           }
         }
    -    new ShuffledRowRDD(rddWithPartitionIds, serializer, part.numPartitions)
    +
    +    // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds
    +    // are in the form of (partitionId, row) and every partitionId is in the expected
    +    // range [0, part.numPartitions - 1], this dependency's partitioner is a
    +    // PartitionIdPassthrough.
    +    val dependency =
    +      new ShuffleDependency[Int, InternalRow, InternalRow](
    +        rddWithPartitionIds,
    +        new PartitionIdPassthrough(part.numPartitions),
    +        Some(serializer))
    +
    +    dependency
    +  }
    +
    +  /**
    +   * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
    +   * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an
    +   * optional partition start indices array. If this optional array is defined, the
    +   * returned [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on the indices
    +   * in this array.
    +   */
    +  private[sql] def preparePostShuffleRDD(
    +      shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
    +      specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
    +    // If an array of partition start indices is provided, we need to use this array
    +    // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
    +    // reflect the new number of post-shuffle partitions.
    +    specifiedPartitionStartIndices.foreach { indices =>
    +      assert(newPartitioning.isInstanceOf[HashPartitioning])
    +      newPartitioning = newPartitioning.withNumPartitions(indices.length)
    --- End diff --
    
    Why not create a new `UnknownPartitioning` with the specified number of
    partitions? Based on your comments later in this file, it seems like it's not
    really correct to compare a `HashPartitioning(k)` that was created via
    coalescing to one that was non-coalesced, so using `UnknownPartitioning` might
    help to avoid this. Just wondering if there are any tricky correctness
    corner-cases that we might hit as a result of this line.
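
    To make the concern concrete, here is a minimal sketch using simplified
    stand-in case classes for Catalyst's output-partitioning types (the names
    mirror the real classes in `org.apache.spark.sql.catalyst.plans.physical`,
    but these toy definitions are assumptions for illustration, not Spark's
    actual implementation):

    ```scala
    // Simplified stand-ins for Catalyst's output-partitioning classes.
    sealed trait Partitioning { def numPartitions: Int }
    case class HashPartitioning(numPartitions: Int) extends Partitioning
    case class UnknownPartitioning(numPartitions: Int) extends Partitioning

    object PartitioningSketch {
      def main(args: Array[String]): Unit = {
        // A plan whose output was hash-partitioned directly into 4 partitions ...
        val direct = HashPartitioning(4)
        // ... and one whose 4 partitions came from coalescing a larger shuffle.
        val coalesced = HashPartitioning(4)

        // Structural equality says the two are interchangeable, so a downstream
        // operator could skip a shuffle even though coalescing changed which
        // rows live in which partition.
        println(direct == coalesced)  // true: the potentially unsafe comparison

        // Reporting UnknownPartitioning for the coalesced side never satisfies
        // a HashPartitioning requirement, so the planner keeps the shuffle.
        val safe = UnknownPartitioning(4)
        println(safe == direct)       // false: the conservative choice
      }
    }
    ```

    The point of the sketch is only that case-class equality on
    `HashPartitioning(k)` cannot distinguish a coalesced layout from a direct
    one, while an `UnknownPartitioning` with the same partition count never
    compares equal to either.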


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
