asfgit closed pull request #23249: [SPARK-26297][SQL] improve the doc of 
Distribution/Partitioning
URL: https://github.com/apache/spark/pull/23249
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index cc1a5e835d9cd..17e1cb416fc8a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -22,13 +22,11 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 
 /**
  * Specifies how tuples that share common expressions will be distributed when 
a query is executed
- * in parallel on many machines.  Distribution can be used to refer to two 
distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution describes 
how tuples are
- *    partitioned across physical machines in a cluster.  Knowing this 
property allows some
- *    operators (e.g., Aggregate) to perform partition local operations 
instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution 
describes guarantees made
- *    about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data. That is, it 
describes how tuples
+ * are partitioned across physical machines in a cluster. Knowing this 
property allows some
+ * operators (e.g., Aggregate) to perform partition local operations instead 
of global ones.
  */
 sealed trait Distribution {
   /**
@@ -70,9 +68,7 @@ case object AllTuples extends Distribution {
 
 /**
  * Represents data where tuples that share the same values for the `clustering`
- * [[Expression Expressions]] will be co-located. Based on the context, this
- * can mean such tuples are either co-located in the same partition or they 
will be contiguous
- * within a single partition.
+ * [[Expression Expressions]] will be co-located in the same partition.
  */
 case class ClusteredDistribution(
     clustering: Seq[Expression],
@@ -118,10 +114,12 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the `ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share 
the
- * same value for the ordering expressions are contiguous and will never be 
split across
- * partitions.
+ * [[Expression Expressions]]. Its requirement is defined as the following:
+ *   - Given any 2 adjacent partitions, all the rows of the second partition 
must be larger than or
+ *     equal to any row in the first partition, according to the `ordering` 
expressions.
+ *
+ * In other words, this distribution requires the rows to be ordered across 
partitions, but not
+ * necessarily within a partition.
  */
 case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
   require(
@@ -241,12 +239,12 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
 
 /**
  * Represents a partitioning where rows are split across partitions based on 
some total ordering of
- * the expressions specified in `ordering`.  When data is partitioned in this 
manner the following
- * two conditions are guaranteed to hold:
- *  - All row where the expressions in `ordering` evaluate to the same values 
will be in the same
- *    partition.
- *  - Each partition will have a `min` and `max` row, relative to the given 
ordering.  All rows
- *    that are in between `min` and `max` in this `ordering` will reside in 
this partition.
+ * the expressions specified in `ordering`.  When data is partitioned in this 
manner, it guarantees:
+ * Given any 2 adjacent partitions, all the rows of the second partition must 
be larger than any row
+ * in the first partition, according to the `ordering` expressions.
+ *
+ * This is a strictly stronger guarantee than what 
`OrderedDistribution(ordering)` requires, as
+ * there is no overlap between partitions.
  *
  * This class extends expression primarily so that transformations over 
expression will descend
  * into its child.
@@ -262,6 +260,22 @@ case class RangePartitioning(ordering: Seq[SortOrder], 
numPartitions: Int)
     super.satisfies0(required) || {
       required match {
         case OrderedDistribution(requiredOrdering) =>
+          // If `ordering` is a prefix of `requiredOrdering`:
+          //   Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, 
c]. According to the
+          //   RangePartitioning definition, any [a, b] in a previous 
partition must be smaller
+          //   than any [a, b] in the following partition. This also means any 
[a, b, c] in a
+          //   previous partition must be smaller than any [a, b, c] in the 
following partition.
+          //   Thus `RangePartitioning(a, b)` satisfies 
`OrderedDistribution(a, b, c)`.
+          //
+          // If `requiredOrdering` is a prefix of `ordering`:
+          //   Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, 
b]. According to the
+          //   RangePartitioning definition, any [a, b, c] in a previous 
partition must be smaller
+          //   than any [a, b, c] in the following partition. If there is a 
[a1, b1] from a previous
+          //   partition which is larger than a [a2, b2] from the following 
partition, then there
+          //   must be a [a1, b1 c1] larger than [a2, b2, c2], which violates 
RangePartitioning
+          //   definition. So it's guaranteed that, any [a, b] in a previous 
partition must not be
+          //   greater(i.e. smaller or equal to) than any [a, b] in the 
following partition. Thus
+          //   `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, 
b)`.
           val minSize = Seq(requiredOrdering.size, ordering.size).min
           requiredOrdering.take(minSize) == ordering.take(minSize)
         case ClusteredDistribution(requiredClustering, _) =>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to