Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21587#discussion_r196616725
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
    @@ -68,50 +68,42 @@ case object AllTuples extends Distribution {
       }
     }
     
    -/**
    - * Represents data where tuples that share the same values for the 
`clustering`
    - * [[Expression Expressions]] will be co-located. Based on the context, 
this
    - * can mean such tuples are either co-located in the same partition or 
they will be contiguous
    - * within a single partition.
    - */
    -case class ClusteredDistribution(
    -    clustering: Seq[Expression],
    -    requiredNumPartitions: Option[Int] = None) extends Distribution {
    +abstract class ClusteredDistributionBase(exprs: Seq[Expression]) extends 
Distribution {
       require(
    -    clustering != Nil,
    -    "The clustering expressions of a ClusteredDistribution should not be 
Nil. " +
    +    exprs.nonEmpty,
    +    s"The clustering expressions of a ${getClass.getSimpleName} should not 
be empty. " +
           "An AllTuples should be used to represent a distribution that only 
has " +
           "a single partition.")
     
       override def createPartitioning(numPartitions: Int): Partitioning = {
         assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == 
numPartitions,
    -      s"This ClusteredDistribution requires ${requiredNumPartitions.get} 
partitions, but " +
    +      s"This ${getClass.getSimpleName} requires 
${requiredNumPartitions.get} partitions, but " +
             s"the actual number of partitions is $numPartitions.")
    -    HashPartitioning(clustering, numPartitions)
    +    HashPartitioning(exprs, numPartitions)
       }
     }
     
     /**
    - * Represents data where tuples have been clustered according to the hash 
of the given
    - * `expressions`. The hash function is defined as 
`HashPartitioning.partitionIdExpression`, so only
    + * Represents data where tuples that share the same values for the 
`clustering`
    + * [[Expression Expressions]] will be co-located. Based on the context, 
this
    + * can mean such tuples are either co-located in the same partition or 
they will be contiguous
    + * within a single partition.
    + */
    +case class ClusteredDistribution(
    +    clustering: Seq[Expression],
    +    requiredNumPartitions: Option[Int] = None) extends 
ClusteredDistributionBase(clustering)
    +
    +/**
    + * Represents data where tuples have been clustered according to the hash 
of the given expressions.
    + * The hash function is defined as 
[[HashPartitioning.partitionIdExpression]], so only
      * [[HashPartitioning]] can satisfy this distribution.
      *
      * This is a strictly stronger guarantee than [[ClusteredDistribution]]. 
Given a tuple and the
      * number of partitions, this distribution strictly requires which 
partition the tuple should be in.
      */
    -case class HashClusteredDistribution(expressions: Seq[Expression]) extends 
Distribution {
    --- End diff --
    
    I do not see any new tests in the DistributionSuite. I feel like issues 
likes this should have specified unit tests in DistributionSuite and shouldnt 
have to rely on StreamingJoinSuite.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to