GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/9276#discussion_r43719723
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---
@@ -181,7 +215,54 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
}
}
}
- new ShuffledRowRDD(rddWithPartitionIds, serializer, part.numPartitions)
+
+ // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds
+ // are in the form of (partitionId, row) and every partitionId is in the expected range
+ // [0, part.numPartitions - 1], the partitioner of this dependency is a PartitionIdPassthrough.
+ val dependency =
+ new ShuffleDependency[Int, InternalRow, InternalRow](
+ rddWithPartitionIds,
+ new PartitionIdPassthrough(part.numPartitions),
+ Some(serializer))
+
+ dependency
+ }
+
+ /**
+ * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
+ * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional
+ * partition start indices array. If this optional array is defined, the returned
+ * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
+ */
+ private[sql] def preparePostShuffleRDD(
+ shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
+ specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
+ // If an array of partition start indices is provided, we need to use this array
+ // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
+ // reflect the number of post-shuffle partitions.
+ specifiedPartitionStartIndices.foreach { indices =>
+ assert(newPartitioning.isInstanceOf[HashPartitioning])
+ newPartitioning = newPartitioning.withNumPartitions(indices.length)
--- End diff --
Why not create a new `UnknownPartitioning` with the specified number of
partitions? Based on your comments later in this file, it seems that it's not
really correct to compare a `HashPartitioning(k)` that was created via
coalescing to one that was not, so using `UnknownPartitioning` might help to
avoid this. Just wondering whether there are any tricky correctness corner
cases that we might hit as a result of this line.
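To make the concern concrete, here is a minimal, Spark-free sketch of the arithmetic involved. It assumes pre-shuffle rows are bucketed by a non-negative hash modulo the pre-shuffle partition count, that a passthrough partitioner routes each row to the partition id it already carries (the role `PartitionIdPassthrough` plays in the diff), and that the start indices are ascending and begin at 0. `PassthroughPartitioner`, `CoalescedPartitioningDemo`, and its helpers are hypothetical names for illustration only, not Spark APIs.

```scala
// Hypothetical, Spark-free model of the shuffle in the diff above.
// None of these names are Spark APIs; they only mimic the partition arithmetic.

// Pre-shuffle partitioner: the record key already *is* the target partition id.
final class PassthroughPartitioner(val numPartitions: Int) {
  def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

object CoalescedPartitioningDemo {
  // Assumed stand-in for hash partitioning: non-negative hash modulo n.
  def hashBucket(hash: Int, n: Int): Int = Math.floorMod(hash, n)

  // Given ascending start indices into the pre-shuffle partitions, post-shuffle
  // partition i reads the contiguous range [starts(i), starts(i + 1)); the last
  // post-shuffle partition reads up to numPre.
  def postShuffleRanges(starts: Array[Int], numPre: Int): Seq[Range] =
    starts.indices.map { i =>
      val end = if (i + 1 < starts.length) starts(i + 1) else numPre
      starts(i) until end
    }

  // The post-shuffle partition that ends up holding a given pre-shuffle partition.
  def coalescedPartitionOf(prePartition: Int, starts: Array[Int]): Int =
    starts.lastIndexWhere(_ <= prePartition)

  def main(args: Array[String]): Unit = {
    val numPre = 8
    val starts = Array(0, 3, 5) // 8 pre-shuffle partitions coalesced into 3
    val pre = new PassthroughPartitioner(numPre)

    val ranges = postShuffleRanges(starts, numPre)
    println(ranges.map(r => s"[${r.start}, ${r.end})").mkString(" "))

    // Compare where each hash lands after coalescing versus under a plain
    // hash partitioning with the same (coalesced) number of partitions.
    for (hash <- 0 until numPre) {
      val prePart = pre.getPartition(hashBucket(hash, numPre))
      val coalesced = coalescedPartitionOf(prePart, starts)
      val plain = hashBucket(hash, starts.length)
      println(s"hash=$hash coalesced=$coalesced plainHash(${starts.length})=$plain")
    }
  }
}
```

With these assumptions, a row whose hash is 3 lands in coalesced partition 1, while a plain 3-way hash partitioning would place it in partition 0. Both layouts have three partitions, yet they do not agree on where a given key lives, which is why treating the coalesced result as an ordinary `HashPartitioning(3)` (for example, when deciding whether two children are already co-partitioned) could be unsafe.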