[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r240123260

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -262,6 +261,15 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
         super.satisfies0(required) || {
           required match {
             case OrderedDistribution(requiredOrdering) =>
    +          // If `ordering` is a prefix of `requiredOrdering`:
    +          //   - Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. If a row is
    +          //     larger than another row w.r.t. [a, b], it's also larger w.r.t. [a, b, c]. So
    +          //     `RangePartitioning(a, b)` satisfy `OrderedDistribution(a, b, c)`.
    --- End diff --

    nit `satisfy` -> `satisfies`

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
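The prefix argument in the quoted code comment can be checked on a toy model. The following is only a sketch under stated assumptions: `Row`, `byAB`, `byABC`, `orderedAcross` and `parts` are hypothetical names invented for illustration, rows are plain case classes rather than Catalyst rows, and nothing here is Spark's actual `satisfies` logic.

```scala
object PrefixOrderingSketch {
  // Hypothetical row type with three integer columns a, b, c (not a Catalyst row).
  final case class Row(a: Int, b: Int, c: Int)

  // Lexicographic orderings on the prefix [a, b] and on the full [a, b, c].
  val byAB: Ordering[Row] = Ordering.by((r: Row) => (r.a, r.b))
  val byABC: Ordering[Row] = Ordering.by((r: Row) => (r.a, r.b, r.c))

  // True if, for each pair of adjacent non-empty partitions, every row of the
  // second partition is >= every row of the first one under `ord`.
  def orderedAcross(parts: Seq[Seq[Row]], ord: Ordering[Row]): Boolean =
    parts.filter(_.nonEmpty).sliding(2).forall {
      case Seq(first, second) => ord.gteq(second.min(ord), first.max(ord))
      case _ => true // zero or one non-empty partition: nothing to compare
    }

  // Toy data, range-partitioned on [a, b] with a split between (1, 2) and (2, 0).
  // Rows inside a partition are deliberately left unsorted on c.
  val parts: Seq[Seq[Row]] = Seq(
    Seq(Row(1, 1, 9), Row(1, 2, 0)),
    Seq(Row(2, 0, 5), Row(2, 0, 1))
  )
}
```

On this data, `orderedAcross(parts, byAB)` and `orderedAcross(parts, byABC)` both hold: any row that is strictly larger w.r.t. [a, b] is also strictly larger w.r.t. [a, b, c], and rows that tie on [a, b] sit in the same partition, so the extra column `c` can only reorder rows within a partition.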
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r240026485

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -118,10 +115,12 @@ case class HashClusteredDistribution(
     /**
      * Represents data where tuples have been ordered according to the `ordering`
    - * [[Expression Expressions]]. This is a strictly stronger guarantee than
    - * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
    - * same value for the ordering expressions are contiguous and will never be split across
    - * partitions.
    + * [[Expression Expressions]]. Its requirement is defined as the following:
    + *   - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
    + *     equal to any row in the first partition, according to the `ordering` expressions.
    --- End diff --

    Note that only sort requires `OrderedDistribution`, and global sort doesn't care if there are equal rows across partitions. What we have here is a definition of the requirement. When designing protocols, it's important to make the requirements as weak as possible and the guarantees as strong as possible.

---
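To make the "larger than or equal to" wording concrete, here is a minimal sketch of the requirement on plain integer keys. The object and method names (`OrderedDistributionSketch`, `satisfiesOrdered`, `satisfiesStrict`, `sortGlobally`, `skewed`) are hypothetical, empty partitions are skipped by assumption, and this is not how Spark itself evaluates `OrderedDistribution`.

```scala
object OrderedDistributionSketch {
  // Pairs of adjacent non-empty partitions (skipping empty ones is an assumption).
  private def adjacentPairs(parts: Seq[Seq[Int]]): Seq[(Seq[Int], Seq[Int])] = {
    val nonEmpty = parts.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1))
  }

  // Requirement as worded in the diff: every row of the second partition is
  // larger than or equal to every row of the first partition.
  def satisfiesOrdered(parts: Seq[Seq[Int]]): Boolean =
    adjacentPairs(parts).forall { case (first, second) => second.min >= first.max }

  // Stricter variant asked about in the review: equal rows may not span partitions.
  def satisfiesStrict(parts: Seq[Seq[Int]]): Boolean =
    adjacentPairs(parts).forall { case (first, second) => second.min > first.max }

  // A global sort only needs a local sort per partition, then concatenation.
  def sortGlobally(parts: Seq[Seq[Int]]): Seq[Int] = parts.flatMap(_.sorted)

  // Toy data where the value 3 spans all three partitions.
  val skewed: Seq[Seq[Int]] = Seq(Seq(3, 1, 3), Seq(3, 3), Seq(7, 3))
}
```

With `skewed`, `satisfiesOrdered` holds while `satisfiesStrict` does not, yet `sortGlobally(skewed)` still equals `skewed.flatten.sorted`. In this toy model the weaker requirement is already enough for a global sort.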
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239754619

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -118,10 +115,12 @@ case class HashClusteredDistribution(
     /**
      * Represents data where tuples have been ordered according to the `ordering`
    - * [[Expression Expressions]]. This is a strictly stronger guarantee than
    - * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
    - * same value for the ordering expressions are contiguous and will never be split across
    - * partitions.
    + * [[Expression Expressions]]. Its requirement is defined as the following:
    + *   - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
    + *     equal to any row in the first partition, according to the `ordering` expressions.
    --- End diff --

    Why do we need the equality here? Can we just require that all the rows in the second partition be larger than any row in the first partition?

---
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239694008

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -243,10 +248,19 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
      * Represents a partitioning where rows are split across partitions based on some total ordering of
      * the expressions specified in `ordering`. When data is partitioned in this manner the following
      * two conditions are guaranteed to hold:
    - *  - All row where the expressions in `ordering` evaluate to the same values will be in the same
    --- End diff --

    nit: "row" -> "rows", "where... `ordering`" -> "whose `ordering` expressions"

---
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239693849

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -243,10 +248,19 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
      * Represents a partitioning where rows are split across partitions based on some total ordering of
      * the expressions specified in `ordering`. When data is partitioned in this manner the following
    --- End diff --

    nit: add "," after "this manner".

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239690226

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
     /**
      * Specifies how tuples that share common expressions will be distributed when a query is executed
    - * in parallel on many machines. Distribution can be used to refer to two distinct physical
    - * properties:
    - *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
    - *    partitioned across physical machines in a cluster. Knowing this property allows some
    - *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
    - *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
    - *    about how tuples are distributed within a single partition.
    + * in parallel on many machines.
    + *
    + * Distribution here refers to inter-node partitioning of data:
    + *   The distribution describes how tuples are partitioned across physical machines in a cluster.
    + *   Knowing this property allows some operators (e.g., Aggregate) to perform partition local
    + *   operations instead of global ones.
      */
    --- End diff --

    for ordering, I think people can look at `OrderedDistribution`?

---
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239689874

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
     /**
      * Specifies how tuples that share common expressions will be distributed when a query is executed
    - * in parallel on many machines. Distribution can be used to refer to two distinct physical
    - * properties:
    - *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
    - *    partitioned across physical machines in a cluster. Knowing this property allows some
    - *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
    - *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
    - *    about how tuples are distributed within a single partition.
    + * in parallel on many machines.
    + *
    + * Distribution here refers to inter-node partitioning of data:
    + *   The distribution describes how tuples are partitioned across physical machines in a cluster.
    + *   Knowing this property allows some operators (e.g., Aggregate) to perform partition local
    + *   operations instead of global ones.
      */
    --- End diff --

    Yes, I understand that partitioning has nothing to do with intra-partition ordering at all, and it was wrong to include intra-partition ordering as part of the distribution properties. But I was thinking that mentioning ordering as a side note would probably help people better understand how some operators work. Or maybe this is not the best place to put it.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239684697

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
     /**
      * Specifies how tuples that share common expressions will be distributed when a query is executed
    - * in parallel on many machines. Distribution can be used to refer to two distinct physical
    - * properties:
    - *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
    - *    partitioned across physical machines in a cluster. Knowing this property allows some
    - *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
    - *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
    - *    about how tuples are distributed within a single partition.
    + * in parallel on many machines.
    + *
    + * Distribution here refers to inter-node partitioning of data:
    + *   The distribution describes how tuples are partitioned across physical machines in a cluster.
    + *   Knowing this property allows some operators (e.g., Aggregate) to perform partition local
    + *   operations instead of global ones.
      */
    --- End diff --

    I intentionally removed everything about intra-partition ordering, as we never leverage it and no partitioning provides this property. Did I miss something?

---
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239540987

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
     /**
      * Specifies how tuples that share common expressions will be distributed when a query is executed
    - * in parallel on many machines. Distribution can be used to refer to two distinct physical
    - * properties:
    - *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
    - *    partitioned across physical machines in a cluster. Knowing this property allows some
    - *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
    - *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
    - *    about how tuples are distributed within a single partition.
    + * in parallel on many machines.
    + *
    + * Distribution here refers to inter-node partitioning of data:
    + *   The distribution describes how tuples are partitioned across physical machines in a cluster.
    + *   Knowing this property allows some operators (e.g., Aggregate) to perform partition local
    + *   operations instead of global ones.
      */
    --- End diff --

    Do we also need to mention that there's another related but orthogonal physical property, i.e., the intra-partition ordering, and maybe give an example here of how operators take advantage of these two physical properties together?

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239508488

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -118,10 +116,13 @@ case class HashClusteredDistribution(
     /**
      * Represents data where tuples have been ordered according to the `ordering`
    - * [[Expression Expressions]]. This is a strictly stronger guarantee than
    - * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
    - * same value for the ordering expressions are contiguous and will never be split across
    - * partitions.
    + * [[Expression Expressions]].
    + *
    + * Tuples that share the same values for the ordering expressions must be contiguous within a
    + * partition. They can also across partitions, but these partitions must be contiguous. For example,
    + * if value `v` is the biggest values in partition 3, it can also be in partition 4 as the smallest
    + * value. If all the values in partition 4 are `v`, it can also be in partition 5 as the smallest
    + * value.
      */
     case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
    --- End diff --

    This is only used by sort, and sort doesn't require rows of same value to be colocated in the same partition. Actually we already use this knowledge to optimize `RangePartitioning.satisfy`

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23249#discussion_r239508437

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -118,10 +116,13 @@ case class HashClusteredDistribution(
     /**
      * Represents data where tuples have been ordered according to the `ordering`
    - * [[Expression Expressions]]. This is a strictly stronger guarantee than
    - * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
    - * same value for the ordering expressions are contiguous and will never be split across
    - * partitions.
    + * [[Expression Expressions]].
    + *
    + * Tuples that share the same values for the ordering expressions must be contiguous within a
    + * partition. They can also across partitions, but these partitions must be contiguous. For example,
    + * if value `v` is the biggest values in partition 3, it can also be in partition 4 as the smallest
    + * value. If all the values in partition 4 are `v`, it can also be in partition 5 as the smallest
    + * value.
      */
     case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
    --- End diff --

    This is only used by sort, and sort doesn't require rows of same value to be colocated in the same partition. Actually we already use this knowledge to optimize `RangePartitioning.satisfy`

---
GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/23249

    [SPARK-26297][SQL] improve the doc of Distribution/Partitioning

    ## What changes were proposed in this pull request?

    Some of the documentation of `Distribution/Partitioning` is stale and misleading; this PR fixes it:
    1. `ClusteredDistribution` doesn't have an intra-partition requirement.
    2. `OrderedDistribution` does not require tuples that share the same value to be colocated in the same partition.
    3. `RangePartitioning` can provide a weaker guarantee for a prefix of its `ordering` expressions.

    ## How was this patch tested?

    comment-only PR.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark doc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23249.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23249

----
commit 24ea28abd5a385351703335df33b26838d203fe3
Author: Wenchen Fan
Date:   2018-12-06T15:47:23Z

    improve the doc of Distribution/Partitioning

----
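Point 1 of the PR description, together with the "partition local operations" remark in the reviewed doc comment, can be illustrated on a toy model. This is a sketch under stated assumptions: `ClusteredDistributionSketch`, `isClustered` and `localCounts` are hypothetical names, keys are plain strings, and the code does not use or mirror any Spark API.

```scala
object ClusteredDistributionSketch {
  // True if no key appears in more than one partition. Nothing is required
  // about the order or contiguity of rows inside a partition.
  def isClustered(parts: Seq[Seq[String]]): Boolean = {
    val keySets = parts.map(_.toSet)
    keySets.map(_.size).sum == keySets.flatten.toSet.size
  }

  // Count rows per key inside each partition, then merge the per-partition
  // results without any cross-partition combination step. The merge is only
  // correct when the data is clustered, because then each key has one source.
  def localCounts(parts: Seq[Seq[String]]): Map[String, Int] =
    parts.flatMap(p => p.groupBy(identity).map { case (k, v) => k -> v.size }).toMap
}
```

For example, `Seq(Seq("a", "b", "a"), Seq("c", "c"))` is clustered even though the two `"a"` rows are not contiguous in the first partition, and `localCounts` on it gives `Map("a" -> 2, "b" -> 1, "c" -> 2)`, the same as a global count.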