peter-toth commented on code in PR #54330:
URL: https://github.com/apache/spark/pull/54330#discussion_r2847523920
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -346,43 +348,105 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
 }
 /**
- * Represents a partitioning where rows are split across partitions based on transforms defined
- * by `expressions`. `partitionValues`, if defined, should contain value of partition key(s) in
- * ascending order, after evaluated by the transforms in `expressions`, for each input partition.
- * In addition, its length must be the same as the number of Spark partitions (and thus is a 1-1
- * mapping), and each row in `partitionValues` must be unique.
+ * Represents a partitioning where rows are split across partitions based on transforms defined by
+ * `expressions`.
+ *
+ * == Partition Keys ==
+ * This partitioning has two sets of partition keys:
+ *
+ * - `partitionKeys`: The current partition key for each partition, in ascending order. May contain
+ *   duplicates when first created from a data source, but becomes unique after grouping.
+ *
+ * - `originalPartitionKeys`: The original partition keys from the data source, in ascending order.
+ *   Always preserves the original values, even after grouping. Used to track the original
+ *   distribution for optimization purposes.
  *
- * The `originalPartitionValues`, on the other hand, are partition values from the original input
- * splits returned by data sources. It may contain duplicated values.
+ * == Grouping State ==
+ * A KeyedPartitioning can be in two states:
  *
- * For example, if a data source reports partition transform expressions `[years(ts_col)]` with 4
- * input splits whose corresponding partition values are `[0, 1, 2, 2]`, then the `expressions`
- * in this case is `[years(ts_col)]`, while `partitionValues` is `[0, 1, 2]`, which
- * represents 3 input partitions with distinct partition values. All rows in each partition have
- * the same value for column `ts_col` (which is of timestamp type), after being applied by the
- * `years` transform. This is generated after combining the two splits with partition value `2`
- * into a single Spark partition.
+ * - '''Ungrouped''' (when `isGrouped == false`): `partitionKeys` contains duplicates. Multiple
+ *   input partitions share the same key. This is the initial state when created from a data source.
  *
- * On the other hand, in this example `[0, 1, 2, 2]` is the value of `originalPartitionValues`
- * which is calculated from the original input splits.
+ * - '''Grouped''' (when `isGrouped == true`): `partitionKeys` contains only unique values. Each
+ *   partition has a distinct key. This state is achieved by applying `GroupPartitionsExec`, which
+ *   coalesces partitions with the same key.
  *
- * @param expressions partition expressions for the partitioning.
- * @param numPartitions the number of partitions
- * @param partitionValues the values for the final cluster keys (that is, after applying grouping
- *                        on the input splits according to `expressions`) of the distribution,
- *                        must be in ascending order, and must NOT contain duplicated values.
- * @param originalPartitionValues the original input partition values before any grouping has been
- *                                applied, must be in ascending order, and may contain duplicated
- *                                values
+ * == Example ==
+ * Consider a data source with partition transform `[years(ts_col)]` and 4 input splits:
+ *
+ * '''Before GroupPartitionsExec''' (ungrouped):
+ * {{{
+ *   expressions:           [years(ts_col)]
+ *   partitionKeys:         [0, 1, 2, 2]   // partition 2 and 3 have the same key
+ *   originalPartitionKeys: [0, 1, 2, 2]
+ *   numPartitions:         4
+ *   isGrouped:             false
+ * }}}
+ *
+ * '''After GroupPartitionsExec''' (grouped):
+ * {{{
+ *   expressions:           [years(ts_col)]
+ *   partitionKeys:         [0, 1, 2]      // duplicates removed, partitions coalesced
+ *   originalPartitionKeys: [0, 1, 2, 2]   // unchanged, preserves original distribution
+ *   numPartitions:         3
+ *   isGrouped:             true
+ * }}}
+ *
+ * @param expressions Partition transform expressions (e.g., `years(col)`, `bucket(10, col)`).
+ * @param partitionKeys Current partition keys, one per partition, in ascending order.
+ *                      May contain duplicates before grouping.
+ * @param originalPartitionKeys Original partition keys from the data source, in ascending order.
+ *                              Preserves the initial distribution even after grouping.
  */
-case class KeyGroupedPartitioning(
+case class KeyedPartitioning(
     expressions: Seq[Expression],
-    numPartitions: Int,
-    partitionValues: Seq[InternalRow] = Seq.empty,
-    originalPartitionValues: Seq[InternalRow] = Seq.empty) extends HashPartitioningLike {
+    partitionKeys: Seq[InternalRow],
+    originalPartitionKeys: Seq[InternalRow]) extends Expression with Partitioning with Unevaluable {
+  override val numPartitions = partitionKeys.length
+
+  override def children: Seq[Expression] = expressions
+  override def nullable: Boolean = false
+  override def dataType: DataType = IntegerType
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): KeyedPartitioning =
+    copy(expressions = newChildren)
+
+  @transient private lazy val dataTypes: Seq[DataType] = expressions.map(_.dataType)
+
+  @transient private lazy val comparableWrapperFactory =
+    InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(dataTypes)
+
+  @transient private lazy val rowOrdering = RowOrdering.createNaturalAscendingOrdering(dataTypes)
+
+  @transient lazy val isGrouped: Boolean = {
+    partitionKeys.map(comparableWrapperFactory).distinct.size == partitionKeys.size
+  }
+
+  def toGrouped: KeyedPartitioning = {
+    val distinctSortedPartitionKeys =
+      partitionKeys.distinctBy(comparableWrapperFactory).sorted(rowOrdering)
Review Comment:
   Indeed. Fixed in https://github.com/apache/spark/pull/54330/commits/b8b2faa86dc8e80f374dfface97c097120e11ab4.
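For context on the semantics discussed above, the `isGrouped`/`toGrouped` behavior documented in the new scaladoc can be sketched as a tiny standalone Scala program. This is a hedged illustration only, not Spark code: the `Partitioning` case class and the use of plain `Int` keys in place of `InternalRow` are simplifications invented here, and the `InternalRowComparableWrapper` machinery from the diff (needed because `InternalRow` lacks value-based equality) is elided.

```scala
// Hypothetical sketch of the grouping semantics described in the scaladoc.
// NOT Spark's implementation: keys are modeled as plain Ints instead of
// InternalRow, so ordinary distinct/sorted stand in for distinctBy with a
// comparable wrapper and sorting with a natural row ordering.
object KeyedPartitioningSketch {
  final case class Partitioning(
      partitionKeys: Seq[Int],           // one key per partition, may repeat
      originalPartitionKeys: Seq[Int]) { // keys as reported by the source

    val numPartitions: Int = partitionKeys.length

    // Grouped when every partition has a distinct key.
    lazy val isGrouped: Boolean = partitionKeys.distinct.size == partitionKeys.size

    // Coalesce partitions that share a key, keeping ascending key order.
    // originalPartitionKeys is intentionally left untouched, preserving the
    // source's initial distribution.
    def toGrouped: Partitioning = copy(partitionKeys = partitionKeys.distinct.sorted)
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the [0, 1, 2, 2] example from the scaladoc.
    val ungrouped = Partitioning(Seq(0, 1, 2, 2), Seq(0, 1, 2, 2))
    assert(!ungrouped.isGrouped) // duplicate key 2, so still ungrouped

    val grouped = ungrouped.toGrouped
    assert(grouped.isGrouped)
    assert(grouped.partitionKeys == Seq(0, 1, 2))
    assert(grouped.originalPartitionKeys == Seq(0, 1, 2, 2))
    assert(grouped.numPartitions == 3)
  }
}
```

The real `toGrouped` must go through the comparable-wrapper factory because two `InternalRow`s holding equal values are not `==` to each other, which is exactly why `.distinctBy(comparableWrapperFactory)` appears in the diff rather than a bare `.distinct`.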
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]