Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21587#discussion_r196616725
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
---
@@ -68,50 +68,42 @@ case object AllTuples extends Distribution {
}
}
-/**
- * Represents data where tuples that share the same values for the
`clustering`
- * [[Expression Expressions]] will be co-located. Based on the context,
this
- * can mean such tuples are either co-located in the same partition or
they will be contiguous
- * within a single partition.
- */
-case class ClusteredDistribution(
- clustering: Seq[Expression],
- requiredNumPartitions: Option[Int] = None) extends Distribution {
+abstract class ClusteredDistributionBase(exprs: Seq[Expression]) extends
Distribution {
require(
- clustering != Nil,
- "The clustering expressions of a ClusteredDistribution should not be
Nil. " +
+ exprs.nonEmpty,
+ s"The clustering expressions of a ${getClass.getSimpleName} should not
be empty. " +
"An AllTuples should be used to represent a distribution that only
has " +
"a single partition.")
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get ==
numPartitions,
- s"This ClusteredDistribution requires ${requiredNumPartitions.get}
partitions, but " +
+ s"This ${getClass.getSimpleName} requires
${requiredNumPartitions.get} partitions, but " +
s"the actual number of partitions is $numPartitions.")
- HashPartitioning(clustering, numPartitions)
+ HashPartitioning(exprs, numPartitions)
}
}
/**
- * Represents data where tuples have been clustered according to the hash
of the given
- * `expressions`. The hash function is defined as
`HashPartitioning.partitionIdExpression`, so only
+ * Represents data where tuples that share the same values for the
`clustering`
+ * [[Expression Expressions]] will be co-located. Based on the context,
this
+ * can mean such tuples are either co-located in the same partition or
they will be contiguous
+ * within a single partition.
+ */
+case class ClusteredDistribution(
+ clustering: Seq[Expression],
+ requiredNumPartitions: Option[Int] = None) extends
ClusteredDistributionBase(clustering)
+
+/**
+ * Represents data where tuples have been clustered according to the hash
of the given expressions.
+ * The hash function is defined as
[[HashPartitioning.partitionIdExpression]], so only
* [[HashPartitioning]] can satisfy this distribution.
*
* This is a strictly stronger guarantee than [[ClusteredDistribution]].
Given a tuple and the
* number of partitions, this distribution strictly requires which
partition the tuple should be in.
*/
-case class HashClusteredDistribution(expressions: Seq[Expression]) extends
Distribution {
--- End diff --
I do not see any new tests in the DistributionSuite. I feel like issues
like this should have specific unit tests in DistributionSuite and shouldn't
have to rely on StreamingJoinSuite.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]