szehon-ho commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1304941351


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -711,36 +718,82 @@ case class KeyGroupedShuffleSpec(
     case _ => false
   }
 
+  def isPartitioningCompatible(otherPartitioning: KeyGroupedPartitioning): 
Boolean = {
+    val joinKeyPositions = keyPositions.map(_.nonEmpty)
+    partitioning.partitionValues.zip(otherPartitioning.partitionValues)
+      .forall {
+        case (left, right) =>
+          KeyGroupedShuffleSpec.project(left, partitioning.expressions, 
joinKeyPositions)
+            .equals(
+              KeyGroupedShuffleSpec.project(right, partitioning.expressions, 
joinKeyPositions))
+      }
+  }
+
   // Whether the partition keys (i.e., partition expressions) are compatible 
between this and the
-  // `other` spec.
+  // other spec.
   def areKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
-    val expressions = partitioning.expressions
-    val otherExpressions = other.partitioning.expressions
-
-    expressions.length == otherExpressions.length && {
-      val otherKeyPositions = other.keyPositions
-      keyPositions.zip(otherKeyPositions).forall { case (left, right) =>
-        left.intersect(right).nonEmpty
+    partitionExpressionsCompatible(other) &&
+      KeyGroupedShuffleSpec.keyPositionsCompatible(
+        keyPositions, other.keyPositions
+      )
+  }
+
+  // Whether the partition keys (i.e., partition expressions) that also are in 
the set of
+  // join keys are compatible between this and the other spec.
+  def areJoinKeysCompatible(other: KeyGroupedShuffleSpec): Boolean = {
+    partitionExpressionsCompatible(other) &&
+      KeyGroupedShuffleSpec.keyPositionsCompatible(
+        keyPositions.filter(_.nonEmpty),
+        other.keyPositions.filter(_.nonEmpty)
+    )
+  }
+
+  private def partitionExpressionsCompatible(other: KeyGroupedShuffleSpec): 
Boolean = {
+    val left = partitioning.expressions
+    val right = other.partitioning.expressions
+    left.length == right.length &&
+      left.zip(right).forall {
+        case (l, r) => KeyGroupedShuffleSpec.isExpressionCompatible(l, r)
       }
-    } && expressions.zip(otherExpressions).forall {
-      case (l, r) => isExpressionCompatible(l, r)
-    }
   }
 
-  private def isExpressionCompatible(left: Expression, right: Expression): 
Boolean =
+  override def canCreatePartitioning: Boolean = 
SQLConf.get.v2BucketingShuffleEnabled &&
+    // Only support partition expressions are AttributeReference for now
+    partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning = 
{
+    KeyGroupedPartitioning(clustering, partitioning.numPartitions, 
partitioning.partitionValues)
+  }
+}
+
+object KeyGroupedShuffleSpec {
+
+  def isExpressionCompatible(left: Expression, right: Expression): Boolean =

Review Comment:
   This just groups the new static methods into the companion object, so the 
diff looks a bit bigger. Let me know if I should revert this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to