asfgit closed pull request #23249: [SPARK-26297][SQL] improve the doc of Distribution/Partitioning
URL: https://github.com/apache/spark/pull/23249
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index cc1a5e835d9cd..17e1cb416fc8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -22,13 +22,11 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 
 /**
  * Specifies how tuples that share common expressions will be distributed when a query is executed
- * in parallel on many machines. Distribution can be used to refer to two distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
- *    partitioned across physical machines in a cluster. Knowing this property allows some
- *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
- *    about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data. That is, it describes how tuples
+ * are partitioned across physical machines in a cluster. Knowing this property allows some
+ * operators (e.g., Aggregate) to perform partition local operations instead of global ones.
  */
 sealed trait Distribution {
   /**
@@ -70,9 +68,7 @@ case object AllTuples extends Distribution {
 
 /**
  * Represents data where tuples that share the same values for the `clustering`
- * [[Expression Expressions]] will be co-located. Based on the context, this
- * can mean such tuples are either co-located in the same partition or they will be contiguous
- * within a single partition.
+ * [[Expression Expressions]] will be co-located in the same partition.
  */
 case class ClusteredDistribution(
     clustering: Seq[Expression],
@@ -118,10 +114,12 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the `ordering`
- * [[Expression Expressions]]. This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
- * same value for the ordering expressions are contiguous and will never be split across
- * partitions.
+ * [[Expression Expressions]]. Its requirement is defined as the following:
+ *   - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
+ *     equal to any row in the first partition, according to the `ordering` expressions.
+ *
+ * In other words, this distribution requires the rows to be ordered across partitions, but not
+ * necessarily within a partition.
  */
 case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
   require(
@@ -241,12 +239,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
 
 /**
  * Represents a partitioning where rows are split across partitions based on some total ordering of
- * the expressions specified in `ordering`. When data is partitioned in this manner the following
- * two conditions are guaranteed to hold:
- *  - All row where the expressions in `ordering` evaluate to the same values will be in the same
- *    partition.
- *  - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
- *    that are in between `min` and `max` in this `ordering` will reside in this partition.
+ * the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees:
+ * Given any 2 adjacent partitions, all the rows of the second partition must be larger than any row
+ * in the first partition, according to the `ordering` expressions.
+ *
+ * This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as
+ * there is no overlap between partitions.
  *
  * This class extends expression primarily so that transformations over expression will descend
  * into its child.
@@ -262,6 +260,22 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
     super.satisfies0(required) || {
       required match {
         case OrderedDistribution(requiredOrdering) =>
+          // If `ordering` is a prefix of `requiredOrdering`:
+          //   Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the
+          //   RangePartitioning definition, any [a, b] in a previous partition must be smaller
+          //   than any [a, b] in the following partition. This also means any [a, b, c] in a
+          //   previous partition must be smaller than any [a, b, c] in the following partition.
+          //   Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`.
+          //
+          // If `requiredOrdering` is a prefix of `ordering`:
+          //   Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the
+          //   RangePartitioning definition, any [a, b, c] in a previous partition must be smaller
+          //   than any [a, b, c] in the following partition. If there is a [a1, b1] from a previous
+          //   partition which is larger than a [a2, b2] from the following partition, then there
+          //   must be a [a1, b1 c1] larger than [a2, b2, c2], which violates RangePartitioning
+          //   definition. So it's guaranteed that, any [a, b] in a previous partition must not be
+          //   greater(i.e. smaller or equal to) than any [a, b] in the following partition. Thus
+          //   `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`.
           val minSize = Seq(requiredOrdering.size, ordering.size).min
           requiredOrdering.take(minSize) == ordering.take(minSize)
         case ClusteredDistribution(requiredClustering, _) =>
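The revised `Distribution` and `ClusteredDistribution` docs in this PR say only that tuples sharing the clustering values end up in the same partition, and that knowing this lets an operator such as Aggregate work partition-locally. The sketch below illustrates both points in a plain-Scala model, not Spark's API: a dataset is assumed to be a `Seq` of partitions, each a `Seq` of rows, and `isClustered` and `localSums` are hypothetical names. It shows that rows with the same key need not be contiguous inside a partition, and that per-partition sums are already final when the data is clustered.

```scala
// Plain-Scala model (not Spark's API): a dataset is a Seq of partitions, each a Seq of rows.

/** True if every value of the clustering key appears in at most one partition. */
def isClustered[R, K](partitions: Seq[Seq[R]])(key: R => K): Boolean =
  partitions.zipWithIndex
    .flatMap { case (rows, idx) => rows.map(row => (key(row), idx)) }
    .groupBy(_._1)
    .values
    .forall(pairs => pairs.map(_._2).distinct.size <= 1)

/** Partition-local SUM(salary) GROUP BY department; no rows move between partitions. */
def localSums(partitions: Seq[Seq[(String, Int)]]): Seq[(String, Int)] =
  partitions.flatMap { rows =>
    rows.groupBy(_._1).map { case (dept, group) => (dept, group.map(_._2).sum) }
  }

// Rows are (department, salary); the clustering expression is the department.
// The "eng" rows are not contiguous inside partition 0, which the revised doc allows.
val clustered = Seq(
  Seq(("eng", 100), ("ops", 90), ("eng", 120)),
  Seq(("sales", 80), ("sales", 85))
)
// Here the "eng" rows are split across partitions 0 and 1.
val notClustered = Seq(
  Seq(("eng", 100), ("ops", 90)),
  Seq(("eng", 120), ("sales", 80))
)

println(isClustered(clustered)(_._1))    // true
println(isClustered(notClustered)(_._1)) // false

// Clustered input: one local result per key, so no global merge step is needed.
println(localSums(clustered).sortBy(_._1)) // List((eng,220), (ops,90), (sales,165))
// Unclustered input: "eng" yields two partial sums that would still need combining.
println(localSums(notClustered).count(_._1 == "eng")) // 2
```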
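The new `OrderedDistribution` doc defines its requirement purely across partitions: every row of a partition must be larger than or equal to every row of the partition before it, while rows inside a partition may be in any order. A minimal sketch of that check in plain Scala (not Spark's API) follows; `isOrderedAcrossPartitions` is a hypothetical name, rows are tuples compared with Scala's standard lexicographic tuple `Ordering`, and skipping empty partitions is an assumption the doc itself does not spell out.

```scala
// Plain-Scala model of the OrderedDistribution requirement (not Spark's API).
// Assumption: empty partitions are skipped, so "adjacent" means adjacent non-empty partitions.

/** Every row of a later partition is >= every row of the partition before it. */
def isOrderedAcrossPartitions[R](partitions: Seq[Seq[R]])(implicit ord: Ordering[R]): Boolean = {
  val nonEmpty = partitions.filter(_.nonEmpty)
  // Comparing max(prev) with min(next) covers every cross-partition pair of rows at once.
  nonEmpty.zip(nonEmpty.drop(1)).forall { case (prev, next) => ord.lteq(prev.max, next.min) }
}

// Rows are (a, b) tuples ordered lexicographically.
val ok = Seq(
  Seq((1, 2), (1, 1)), // not sorted inside the partition: allowed
  Seq((1, 2), (2, 0)), // (1, 2) also occurs in the previous partition: allowed (>=)
  Seq((3, 5))
)
val broken = Seq(
  Seq((2, 0)),
  Seq((1, 5)) // smaller than a row of the previous partition
)

println(isOrderedAcrossPartitions(ok))     // true
println(isOrderedAcrossPartitions(broken)) // false
```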
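The updated `RangePartitioning` doc states that its guarantee (rows of a later partition are strictly larger) is stronger than what `OrderedDistribution(ordering)` requires (larger or equal), because range partitions never overlap. The sketch below contrasts the two conditions on single-column integer keys; it is a plain-Scala model, not Spark's API, and `acrossPartitions`, `isRangePartitioned` and `isOrderedDistribution` are hypothetical names. Empty partitions are assumed to be skipped.

```scala
// Plain-Scala model (not Spark's API): strict vs. non-strict ordering across partitions.
// Assumption: empty partitions are skipped when deciding which partitions are adjacent.

def acrossPartitions[R](partitions: Seq[Seq[R]], strict: Boolean)(
    implicit ord: Ordering[R]): Boolean = {
  val nonEmpty = partitions.filter(_.nonEmpty)
  nonEmpty.zip(nonEmpty.drop(1)).forall { case (prev, next) =>
    if (strict) ord.lt(prev.max, next.min) else ord.lteq(prev.max, next.min)
  }
}

/** RangePartitioning guarantee: no overlap, later partitions strictly larger. */
def isRangePartitioned[R: Ordering](partitions: Seq[Seq[R]]): Boolean =
  acrossPartitions(partitions, strict = true)

/** OrderedDistribution requirement: later partitions larger or equal. */
def isOrderedDistribution[R: Ordering](partitions: Seq[Seq[R]]): Boolean =
  acrossPartitions(partitions, strict = false)

// Key 5 straddles the boundary: fine for OrderedDistribution, impossible for RangePartitioning.
val overlapping = Seq(Seq(1, 5), Seq(5, 9))
val disjoint    = Seq(Seq(1, 5), Seq(6, 9))

println(isOrderedDistribution(overlapping)) // true
println(isRangePartitioned(overlapping))    // false
println(isRangePartitioned(disjoint))       // true
println(isOrderedDistribution(disjoint))    // true: the stronger guarantee implies the weaker one
```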
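The comment added to `satisfies0` argues that `RangePartitioning(ordering)` satisfies `OrderedDistribution(requiredOrdering)` whenever one ordering is a prefix of the other. The sketch below restates the prefix test and spot-checks both directions of the argument on small hand-picked data. It is a plain-Scala model, not Spark's API: orderings are assumed to be column-name strings rather than `SortOrder` values (so sort direction is not modelled), rows are `(a, b, c)` tuples compared lexicographically, and `prefixCompatible`, `orderedAcross` and `ab` are hypothetical names. It checks two examples, it does not prove the claim.

```scala
// Plain-Scala model of the prefix rule in RangePartitioning.satisfies0 (not Spark's API).
// Column names stand in for SortOrder; rows are (a, b, c) tuples compared lexicographically.

/** Mirrors the diff: the shorter ordering must be a prefix of the longer one. */
def prefixCompatible(ordering: Seq[String], requiredOrdering: Seq[String]): Boolean = {
  val minSize = Seq(requiredOrdering.size, ordering.size).min
  requiredOrdering.take(minSize) == ordering.take(minSize)
}

/** Later partitions hold keys >= (or > when strict) all keys of the previous non-empty one. */
def orderedAcross[K](parts: Seq[Seq[K]], strict: Boolean)(implicit ord: Ordering[K]): Boolean = {
  val nonEmpty = parts.filter(_.nonEmpty)
  nonEmpty.zip(nonEmpty.drop(1)).forall { case (prev, next) =>
    if (strict) ord.lt(prev.max, next.min) else ord.lteq(prev.max, next.min)
  }
}

type Row = (Int, Int, Int)                   // columns (a, b, c)
def ab(r: Row): (Int, Int) = (r._1, r._2)    // project a row onto [a, b]

// Case 1: range-partitioned on [a, b], required ordering [a, b, c].
val byAB: Seq[Seq[Row]] = Seq(Seq((1, 1, 9), (1, 2, 3)), Seq((2, 0, 0), (2, 0, 1)))
println(orderedAcross(byAB.map(_.map(ab)), strict = true)) // true: RangePartitioning(a, b)
println(orderedAcross(byAB, strict = false))               // true: OrderedDistribution(a, b, c)

// Case 2: range-partitioned on [a, b, c], required ordering [a, b].
val byABC: Seq[Seq[Row]] = Seq(Seq((1, 1, 5), (1, 2, 0)), Seq((1, 2, 7), (2, 0, 0)))
println(orderedAcross(byABC, strict = true))                // true: RangePartitioning(a, b, c)
println(orderedAcross(byABC.map(_.map(ab)), strict = false)) // true: OrderedDistribution(a, b)
println(orderedAcross(byABC.map(_.map(ab)), strict = true))  // false: (1, 2) is on both sides

println(prefixCompatible(Seq("a", "b"), Seq("a", "b", "c"))) // true
println(prefixCompatible(Seq("a", "b", "c"), Seq("a", "b"))) // true
println(prefixCompatible(Seq("b", "a"), Seq("a", "b")))      // false
```

Case 2 also shows why the comment says "must not be greater" rather than "must be smaller": after projecting onto `[a, b]`, equal keys may appear on both sides of a partition boundary.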
----------------------------------------------------------------
This is an automated message from the Apache Git Service.