Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/6413#discussion_r33999240
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
---
@@ -76,144 +72,267 @@ case class OrderedDistribution(ordering:
Seq[SortOrder]) extends Distribution {
def clustering: Set[Expression] = ordering.map(_.child).toSet
}
-sealed trait Partitioning {
- /** Returns the number of partitions that the data is split across */
- val numPartitions: Int
+/**
+ * To a child operator, a `Gap` represents what need to be done for
satisfying its parent operator
+ * in the data distribution.
+ *
+ * NOTE: This trait and its inherits are not used by the physical
operators directly,
+ */
+private[sql] sealed trait Gap
- /**
- * Returns true iff the guarantees made by this [[Partitioning]] are
sufficient
- * to satisfy the partitioning scheme mandated by the `required`
[[Distribution]],
- * i.e. the current dataset does not need to be re-partitioned for the
`required`
- * Distribution (it is possible that tuples within a partition need to
be reorganized).
- */
- def satisfies(required: Distribution): Boolean
+/**
+ * Needn't do anything for the data distribution.
+ */
+private[sql] case object NoGap extends Gap
- /**
- * Returns true iff all distribution guarantees made by this
partitioning can also be made
- * for the `other` specified partitioning.
- * For example, two [[HashPartitioning HashPartitioning]]s are
- * only compatible if the `numPartitions` of them is the same.
- */
- def compatibleWith(other: Partitioning): Boolean
+/**
+ * Need to sort the data within the current partition.
+ * @param sortKeys the sorting keys
+ */
+private[sql] case class SortKeyWithinPartition(sortKeys: Seq[SortOrder])
extends Gap
- /** Returns the expressions that are used to key the partitioning. */
- def keyExpressions: Seq[Expression]
-}
+/**
+ * Need a global sorting for the distribution according to the specified
sorting keys.
+ * @param ordering the sorting keys
+ */
+private[sql] case class GlobalOrdering(ordering: Seq[SortOrder]) extends
Gap
-case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
- override def satisfies(required: Distribution): Boolean = required match
{
- case UnspecifiedDistribution => true
- case _ => false
- }
+/**
+ * Repartition the data according to the new clustering expression, and
it's possible that
+ * only a single partition needed, if in that cases, the clustering
expression would be ignored.
+ * @param clustering the clustering keys
+ */
+private[sql] case class RepartitionKey(
+ clustering: Seq[Expression]) extends Gap
- override def compatibleWith(other: Partitioning): Boolean = other match {
- case UnknownPartitioning(_) => true
- case _ => false
- }
+/**
+ * Repartition the data according to the the new clustering expression,
and we also need to
+ * sort the data within the partition according to the clustering
expression.
+ * Notice: The clustering expressions should be the same with the sort
keys.
+ * @param clustering the clustering expression
+ * @param sortKeys the sorting keys, should be the same with clustering
expression, but with
+ * sorting direction.
+ */
+private[sql] case class RepartitionKeyAndSort(
+ clustering: Seq[Expression],
+ sortKeys: Seq[SortOrder]) extends Gap
- override def keyExpressions: Seq[Expression] = Nil
-}
-case object SinglePartition extends Partitioning {
- val numPartitions = 1
+/**
+ * Represent the output data distribution for a physical operator.
+ *
+ * @param numPartitions
+ * @param clusterKeys
+ * @param sortKeys
+ * @param globalOrdered
+ * @param additionalNullClusterKeyGenerated
+ */
+sealed case class Partitioning(
+ /** the number of partitions that the data is split across */
+ numPartitions: Option[Int] = None,
+
+ /** the expressions that are used to key the partitioning. */
+ clusterKeys: Seq[Expression] = Nil,
+
+ /** the expression that are used to sort the data. */
+ sortKeys: Seq[SortOrder] = Nil,
+
+ /** work with `sortKeys` if the sorting cross or just within the
partition. */
+ globalOrdered: Boolean = false,
- override def satisfies(required: Distribution): Boolean = true
+ /** to indicate if new null clustering key will be generated in THIS
operator. */
+ additionalNullClusterKeyGenerated: Boolean = false) {
- override def compatibleWith(other: Partitioning): Boolean = other match {
- case SinglePartition => true
- case _ => false
+ def withNumPartitions(num: Int): Partitioning = {
+ new Partitioning(
+ numPartitions = Some(num),
+ clusterKeys,
+ sortKeys,
+ globalOrdered,
+ additionalNullClusterKeyGenerated)
--- End diff --
You may use the `copy` method that comes with all case classes:
```scala
this.copy(numPartitions = Some(num))
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]