[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r240123260
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -262,6 +261,15 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
     super.satisfies0(required) || {
       required match {
         case OrderedDistribution(requiredOrdering) =>
+          // If `ordering` is a prefix of `requiredOrdering`:
+          //   - Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. If a row is
+          //     larger than another row w.r.t. [a, b], it's also larger w.r.t. [a, b, c]. So
+          //     `RangePartitioning(a, b)` satisfy `OrderedDistribution(a, b, c)`.
--- End diff --

nit `satisfy` -> `satisfies`
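
A minimal sketch of the prefix argument in the quoted comment, written in plain Scala without Spark. The names `PrefixOrderingSketch`, `isPrefix`, `rangePartition` and `localSortAndConcat` are hypothetical, rows are modelled as `(a, b, c)` integer tuples, and the bounds are invented for illustration; this is not the code in `RangePartitioning`.

```scala
object PrefixOrderingSketch {
  // A row is modelled as a tuple of three Ints standing in for columns (a, b, c).
  type Row = (Int, Int, Int)

  // Lexicographic ordering on [a, b, c], i.e. the required ordering in the example.
  val byABC: Ordering[Row] = Ordering.by((r: Row) => (r._1, r._2, r._3))

  // True when `ordering` is a prefix of `requiredOrdering` (columns given by name).
  def isPrefix(ordering: Seq[String], requiredOrdering: Seq[String]): Boolean =
    requiredOrdering.startsWith(ordering)

  // Range-partition rows on [a, b] only. `bounds` are inclusive upper bounds in
  // ascending order; rows above the last bound go to the final partition.
  def rangePartition(rows: Seq[Row], bounds: Seq[(Int, Int)]): Seq[Seq[Row]] = {
    val ord = implicitly[Ordering[(Int, Int)]]
    val groups = rows.groupBy { r =>
      val idx = bounds.indexWhere(b => ord.lteq((r._1, r._2), b))
      if (idx < 0) bounds.length else idx
    }
    (0 to bounds.length).map(i => groups.getOrElse(i, Seq.empty[Row]))
  }

  // Sort each partition locally by [a, b, c], then concatenate partitions in order.
  def localSortAndConcat(parts: Seq[Seq[Row]]): Seq[Row] =
    parts.flatMap(_.sorted(byABC))
}
```

Under these assumptions, partitioning on `[a, b]` and then sorting each partition by `[a, b, c]` yields the same sequence as sorting all rows by `[a, b, c]`, which is why the prefix case can be treated as satisfying the ordered requirement.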


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r240026485
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -118,10 +115,12 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the `ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
- * same value for the ordering expressions are contiguous and will never be split across
- * partitions.
+ * [[Expression Expressions]]. Its requirement is defined as the following:
+ *   - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
+ *     equal to any row in the first partition, according to the `ordering` expressions.
--- End diff --

Note that only sort requires `OrderedDistribution`, and a global sort doesn't care whether equal rows span partitions.

This is a definition of the requirement. When designing protocols, it's important to make requirements as weak as possible and guarantees as strong as possible.
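
A rough sketch of the requirement as worded in the diff, in plain Scala rather than Spark code. `OrderedDistributionSketch`, `satisfiesOrdered`, `satisfiesStrict` and `globalSort` are hypothetical names; skipping empty partitions is an assumption of this sketch, and the strict variant exists only to contrast with the weaker requirement.

```scala
object OrderedDistributionSketch {
  // Non-strict requirement: for any 2 adjacent partitions, every row of the second
  // partition is >= every row of the first. Empty partitions are skipped (assumption).
  def satisfiesOrdered[T](parts: Seq[Seq[T]])(implicit ord: Ordering[T]): Boolean = {
    val nonEmpty = parts.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1)).forall { case (first, second) =>
      ord.lteq(first.max, second.min)
    }
  }

  // Hypothetical stricter variant: equal rows may not span partitions. Comparison only.
  def satisfiesStrict[T](parts: Seq[Seq[T]])(implicit ord: Ordering[T]): Boolean = {
    val nonEmpty = parts.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1)).forall { case (first, second) =>
      ord.lt(first.max, second.min)
    }
  }

  // What a global sort needs once the requirement holds: sort locally, then concatenate.
  def globalSort[T](parts: Seq[Seq[T]])(implicit ord: Ordering[T]): Seq[T] =
    parts.flatMap(_.sorted)
}
```

With partitions such as `[1, 3, 2]`, `[3, 3]`, `[3, 5, 4]`, the non-strict check passes, the strict one fails, and the locally sorted concatenation is still globally sorted. The weaker requirement is therefore enough for sort.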


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239754619
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -118,10 +115,12 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the `ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
- * same value for the ordering expressions are contiguous and will never be split across
- * partitions.
+ * [[Expression Expressions]]. Its requirement is defined as the following:
+ *   - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
+ *     equal to any row in the first partition, according to the `ordering` expressions.
--- End diff --

Why do we need the equality here? Can we just require that all the rows in the second partition be larger than any row in the first partition?


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239694008
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -243,10 +248,19 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
  * Represents a partitioning where rows are split across partitions based on some total ordering of
  * the expressions specified in `ordering`.  When data is partitioned in this manner the following
  * two conditions are guaranteed to hold:
- *  - All row where the expressions in `ordering` evaluate to the same values will be in the same
--- End diff --

nit: "row" -> "rows", "where... `ordering`" -> "whose `ordering` 
expressions"


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239693849
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -243,10 +248,19 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
  * Represents a partitioning where rows are split across partitions based on some total ordering of
  * the expressions specified in `ordering`.  When data is partitioned in this manner the following
--- End diff --

nit: add "," after "this manner".


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239690226
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 
 /**
  * Specifies how tuples that share common expressions will be distributed when a query is executed
- * in parallel on many machines.  Distribution can be used to refer to two distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
- *    partitioned across physical machines in a cluster.  Knowing this property allows some
- *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
- *    about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data:
+ *   The distribution describes how tuples are partitioned across physical machines in a cluster.
+ *   Knowing this property allows some operators (e.g., Aggregate) to perform partition local
+ *   operations instead of global ones.
  */
--- End diff --

for ordering, I think people can look at `OrderedDistribution`?
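
The doc's remark that knowing the distribution lets an operator such as Aggregate work partition-locally can be sketched in plain Scala. This is an illustration under stated assumptions, not Spark's implementation: `PartitionLocalAggregateSketch`, `hashPartition`, `localSum` and `aggregate` are hypothetical names, and rows are simple `(key, value)` pairs.

```scala
object PartitionLocalAggregateSketch {
  // Place each (key, value) row in the partition chosen by a hash of its key, so that
  // all rows sharing a key land in the same partition (a clustered layout).
  def hashPartition(rows: Seq[(String, Int)], numPartitions: Int): Seq[Seq[(String, Int)]] = {
    val groups = rows.groupBy { case (k, _) =>
      java.lang.Math.floorMod(k.hashCode, numPartitions)
    }
    (0 until numPartitions).map(i => groups.getOrElse(i, Seq.empty[(String, Int)]))
  }

  // Sum values per key inside one partition only; other partitions are never consulted.
  def localSum(part: Seq[(String, Int)]): Map[String, Int] =
    part.groupBy(_._1).map { case (k, rs) => k -> rs.map(_._2).sum }

  // Because keys never span partitions, the union of local results is the global result.
  def aggregate(parts: Seq[Seq[(String, Int)]]): Map[String, Int] =
    parts.map(localSum).foldLeft(Map.empty[String, Int])(_ ++ _)
}
```

If the rows were not clustered by key, the same key could appear in several local results and a further global merge step would be needed.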


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239689874
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 
 /**
  * Specifies how tuples that share common expressions will be distributed when a query is executed
- * in parallel on many machines.  Distribution can be used to refer to two distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
- *    partitioned across physical machines in a cluster.  Knowing this property allows some
- *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
- *    about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data:
+ *   The distribution describes how tuples are partitioned across physical machines in a cluster.
+ *   Knowing this property allows some operators (e.g., Aggregate) to perform partition local
+ *   operations instead of global ones.
  */
--- End diff --

Yes, I understand that partitioning has nothing to do with intra-partition ordering, and that it was wrong to include intra-partition ordering among the distribution properties. But I was thinking that mentioning ordering as a side note would probably help people better understand how some operators work. Or maybe this isn't the best place to put it.


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239684697
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 
 /**
  * Specifies how tuples that share common expressions will be distributed when a query is executed
- * in parallel on many machines.  Distribution can be used to refer to two distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
- *    partitioned across physical machines in a cluster.  Knowing this property allows some
- *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
- *    about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data:
+ *   The distribution describes how tuples are partitioned across physical machines in a cluster.
+ *   Knowing this property allows some operators (e.g., Aggregate) to perform partition local
+ *   operations instead of global ones.
  */
--- End diff --

I intentionally removed everything about intra-partition ordering, as we never leverage it and no partitioning provides this property. Did I miss something?


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239540987
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 
 /**
  * Specifies how tuples that share common expressions will be distributed when a query is executed
- * in parallel on many machines.  Distribution can be used to refer to two distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
- *    partitioned across physical machines in a cluster.  Knowing this property allows some
- *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
- *    about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data:
+ *   The distribution describes how tuples are partitioned across physical machines in a cluster.
+ *   Knowing this property allows some operators (e.g., Aggregate) to perform partition local
+ *   operations instead of global ones.
  */
--- End diff --

Do we also need to mention that there's another related but orthogonal physical property, i.e., the intra-partition ordering, and maybe give an example here of how operators take advantage of these two physical properties together?
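
One possible illustration of an operator relying on both properties is a sort-merge join; that choice is an assumption made here, not something named in the thread. The plain-Scala sketch below co-partitions both sides by key (distribution), sorts each partition by key (intra-partition ordering), then merges per partition. `SortMergeJoinSketch` and its methods are hypothetical names and do not mirror Spark's operator.

```scala
object SortMergeJoinSketch {
  type Row = (Int, String) // (join key, payload)

  // Distribution: co-partition by key so matching keys on both sides share an index.
  def partitionByKey(rows: Seq[Row], n: Int): Seq[Seq[Row]] = {
    val groups = rows.groupBy(r => java.lang.Math.floorMod(r._1, n))
    (0 until n).map(i => groups.getOrElse(i, Seq.empty[Row]))
  }

  // Ordering: merge two key-sorted partitions, emitting (key, left payload, right payload).
  def mergeJoin(left: Seq[Row], right: Seq[Row]): Seq[(Int, String, String)] = {
    val out = Seq.newBuilder[(Int, String, String)]
    var l = left
    var r = right
    while (l.nonEmpty && r.nonEmpty) {
      val k = l.head._1
      if (k < r.head._1) l = l.tail
      else if (k > r.head._1) r = r.tail
      else {
        // Take the run of equal keys on each side and emit their cross product.
        val (lRun, lRest) = l.span(_._1 == k)
        val (rRun, rRest) = r.span(_._1 == k)
        for (a <- lRun; b <- rRun) out += ((k, a._2, b._2))
        l = lRest
        r = rRest
      }
    }
    out.result()
  }

  // Join partition i of the left side only with partition i of the right side.
  def join(left: Seq[Row], right: Seq[Row], n: Int): Seq[(Int, String, String)] =
    partitionByKey(left, n).zip(partitionByKey(right, n)).flatMap { case (lp, rp) =>
      mergeJoin(lp.sortBy(_._1), rp.sortBy(_._1))
    }
}
```

The partitioning step guarantees that no match is missed across partitions, and the sort step is what lets the merge run in a single pass; neither property implies the other.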


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239508488
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -118,10 +116,13 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the `ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
- * same value for the ordering expressions are contiguous and will never be split across
- * partitions.
+ * [[Expression Expressions]].
+ *
+ * Tuples that share the same values for the ordering expressions must be contiguous within a
+ * partition. They can also across partitions, but these partitions must be contiguous. For example,
+ * if value `v` is the biggest values in partition 3, it can also be in partition 4 as the smallest
+ * value. If all the values in partition 4 are `v`, it can also be in partition 5 as the smallest
+ * value.
  */
 case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
--- End diff --

This is only used by sort, and sort doesn't require rows with the same value to be colocated in the same partition.

Actually, we already use this knowledge to optimize `RangePartitioning.satisfies`.


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239508437
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -118,10 +116,13 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the `ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
- * same value for the ordering expressions are contiguous and will never be split across
- * partitions.
+ * [[Expression Expressions]].
+ *
+ * Tuples that share the same values for the ordering expressions must be contiguous within a
+ * partition. They can also across partitions, but these partitions must be contiguous. For example,
+ * if value `v` is the biggest values in partition 3, it can also be in partition 4 as the smallest
+ * value. If all the values in partition 4 are `v`, it can also be in partition 5 as the smallest
+ * value.
  */
 case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
--- End diff --

This is only used by sort, and sort doesn't require rows with the same value to be colocated in the same partition.

Actually, we already use this knowledge to optimize `RangePartitioning.satisfies`.


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23249

[SPARK-26297][SQL] improve the doc of Distribution/Partitioning

## What changes were proposed in this pull request?

Some of the documentation for `Distribution`/`Partitioning` is stale and misleading. This PR fixes it:
1. `ClusteredDistribution` doesn't have an intra-partition requirement.
2. `OrderedDistribution` does not require tuples that share the same value to be colocated in the same partition.
3. `RangePartitioning` can provide a weaker guarantee for a prefix of its `ordering` expressions.
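
To make the first point concrete, here is a small plain-Scala sketch of a clustering check that looks only at which partition each key lands in and never at row order inside a partition. `ClusteredCheckSketch` and `isClustered` are hypothetical names for illustration and are not part of Spark.

```scala
object ClusteredCheckSketch {
  // True when every key appears in at most one partition. Row order inside a
  // partition is deliberately not inspected.
  def isClustered[K, V](parts: Seq[Seq[(K, V)]]): Boolean = {
    val keySets = parts.map(_.map(_._1).toSet)
    // If no key is shared between partitions, per-partition key counts add up
    // exactly to the number of distinct keys overall.
    keySets.map(_.size).sum == keySets.flatten.toSet.size
  }
}
```

A partition holding `("a", 2), ("b", 1), ("a", 1)` passes even though rows with key `a` are neither adjacent nor sorted, whereas placing key `a` in two different partitions fails the check.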

## How was this patch tested?

comment-only PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23249.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23249


commit 24ea28abd5a385351703335df33b26838d203fe3
Author: Wenchen Fan 
Date:   2018-12-06T15:47:23Z

improve the doc of Distribution/Partitioning




---
