szehon-ho commented on code in PR #42757:
URL: https://github.com/apache/spark/pull/42757#discussion_r1312674140


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -137,81 +136,63 @@ case class BatchScanExec(
 
       outputPartitioning match {
         case p: KeyGroupedPartitioning =>
-          if (conf.v2BucketingPushPartValuesEnabled &&
-              conf.v2BucketingPartiallyClusteredDistributionEnabled) {
-            assert(filteredPartitions.forall(_.size == 1),
-              "Expect partitions to be not grouped when " +
-                  s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
-                  "is enabled")
-
-            val groupedPartitions = groupPartitions(finalPartitions.map(_.head),
-              groupSplits = true).get
-
-            // This means the input partitions are not grouped by partition values. We'll need to
-            // check `groupByPartitionValues` and decide whether to group and replicate splits
-            // within a partition.
-            if (spjParams.commonPartitionValues.isDefined &&
-              spjParams.applyPartialClustering) {
-              // A mapping from the common partition values to how many splits the partition
-              // should contain.
-              val commonPartValuesMap = spjParams.commonPartitionValues
+          val groupedPartitions = filteredPartitions.map(splits => {
+            assert(splits.nonEmpty && splits.head.isInstanceOf[HasPartitionKey])
+            (splits.head.asInstanceOf[HasPartitionKey].partitionKey(), splits)
+          })
+
+          // This means the input partitions are not grouped by partition values. We'll need to

Review Comment:
   Nit: can we clarify what 'this' refers to? For example: `When
   partially-clustered, input partitions are not grouped by partition values`.
   
   Nit: `groupByPartitionValues` never seems to be defined anywhere; can we fix
   it? Does it refer to `groupedPartitions`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -327,11 +327,14 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
  * @param numPartitions the number of partitions
  * @param partitionValues the values for the cluster keys of the distribution, must be

Review Comment:
   What do you think about adding 'final cluster keys' to the javadoc, to make
   it even clearer?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala:
##########
@@ -217,3 +210,19 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
     }
   }
 }
+
+/**
+ * A key-grouped Spark partition, which could consist of multiple input splits
+ *
+ * @param value the partition value shared by all the input splits
+ * @param parts the input splits that are grouped into a single Spark partition
+ */
+private[v2] case class KeyGroupedPartition(value: InternalRow, parts: Seq[InputPartition])
+
+/**
+ * Information about key-grouped partitions, which contains a list of grouped partitions as well
+ * as the original input partitions before the grouping.
+ */
+private[v2] case class KeyGroupedPartitionInfo(

Review Comment:
   This name reads as if it refers to information about a single
   `KeyGroupedPartition`. How about `KeyGroupedPartitionInfos`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

