sunchao commented on code in PR #46255:
URL: https://github.com/apache/spark/pull/46255#discussion_r1619573898


##########
core/src/main/scala/org/apache/spark/Partitioner.scala:
##########
@@ -149,7 +150,9 @@ private[spark] class KeyGroupedPartitioner(
     override val numPartitions: Int) extends Partitioner {
   override def getPartition(key: Any): Int = {
     val keys = key.asInstanceOf[Seq[Any]]
-    valueMap.getOrElseUpdate(keys, Utils.nonNegativeMod(keys.hashCode, 
numPartitions))
+    val normalizedKeys = ArraySeq.from(keys)

Review Comment:
   curious what does this do? why it is normalized?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -870,12 +870,30 @@ case class KeyGroupedShuffleSpec(
     if (results.forall(p => p.isEmpty)) None else Some(results)
   }
 
-  override def canCreatePartitioning: Boolean = 
SQLConf.get.v2BucketingShuffleEnabled &&
-    // Only support partition expressions are AttributeReference for now
-    partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
+  override def canCreatePartitioning: Boolean = {
+    // Allow one side shuffle for SPJ for now only if partially-clustered is 
not enabled
+    // and for join keys less than partition keys only if transforms are not 
enabled.
+    val checkExprType = if 
(SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) {

Review Comment:
   Trying to understand the reason behind this. Also, it might be better to add 
some logging here if it is easy.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -113,4 +116,23 @@ case class TransformExpression(
 
   override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[Expression]): Expression =
     copy(children = newChildren)
+
+  lazy val resolvedFunction: Option[Expression] = this match {

Review Comment:
   nit: this can be private



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -870,12 +870,30 @@ case class KeyGroupedShuffleSpec(
     if (results.forall(p => p.isEmpty)) None else Some(results)
   }
 
-  override def canCreatePartitioning: Boolean = 
SQLConf.get.v2BucketingShuffleEnabled &&
-    // Only support partition expressions are AttributeReference for now
-    partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
+  override def canCreatePartitioning: Boolean = {
+    // Allow one side shuffle for SPJ for now only if partially-clustered is 
not enabled
+    // and for join keys less than partition keys only if transforms are not 
enabled.
+    val checkExprType = if 
(SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) {
+      e: Expression => e.isInstanceOf[AttributeReference]
+    } else {
+      e: Expression => e.isInstanceOf[AttributeReference] || 
e.isInstanceOf[TransformExpression]
+    }
+    SQLConf.get.v2BucketingShuffleEnabled &&
+      !SQLConf.get.v2BucketingPartiallyClusteredDistributionEnabled &&
+      partitioning.expressions.forall(checkExprType)
+  }
+
+
 
   override def createPartitioning(clustering: Seq[Expression]): Partitioning = 
{
-    KeyGroupedPartitioning(clustering, partitioning.numPartitions, 
partitioning.partitionValues)
+    val newExpressions: Seq[Expression] = 
clustering.zip(partitioning.expressions).map{

Review Comment:
   nit: space before `{`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to